Source code for testplan.common.utils.sockets.fix.client

"""Fix TCP client module."""
import ssl
import time
import socket
from typing import Optional

from testplan.common.utils.sockets.fix.utils import utc_timestamp

from .parser import tagsoverride, FixParser
from ..tls import TLSConfig


[docs] class Client: """ A Basic FIX Client Connects to a FIX server via the FIX session protocol. """ def __init__( self, msgclass, codec, host, port, sender, target, version="FIX.4.2", sendersub=None, interface=None, logger=None, tls_config: Optional[TLSConfig] = None, ): """ Create a new FIX client. This constructor takes parameters that specify the address (host, port) to connect to and identifiers necessary to uniquely identify the connection (sender, target). :param msgclass: Type used to construct logon, logoff and received FIX messages. :type msgclass: ``type`` :param codec: A Codec to use to encode and decode FIX messages. :type codec: a ``Codec`` instance :param host: hostname or IP address to connect to. :type host: ``str`` :param port: port to connect to. :type port: ``str`` or ``int`` :param sender: Value written to tag 49 (SenderCompID). Used to identify the firm sending the message. :type sender: ``str`` :param target: Value written to tag 56 (TargetCompID). Used to identify the firm receiving the message. :type target: ``str`` :param version: FIX version, defaults to "FIX.4.2". This string is used as the contents of tag 8 (BeginString). :type version: ``str`` :param sendersub: Value to be used as default value tag 50 (SenderSubID, a.k.a. OwnerID). Only used if tag 50 does not have a value. Used to identify the message originator. :type sendersub: ``str`` :param interface: Local interface to bind to. Defaults to None, in which case the socket does not bind before connecting :type interface: (``str``, ``str`` or ``int``) tuple :param logger: Logger instance. :type logger: ``logging.Logger`` """ self.host = host self.port = int(port) self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if interface is not None: self.socket.bind(interface) self.version = version self.sender = sender self.target = target self.sendersub = sendersub self.in_seqno = 1 self.out_seqno = 1 self.timeout = 30 self.msgclass = msgclass self.log_callback = logger.debug if logger else lambda msg: None self.tls_config = tls_config self.codec = codec self.connection_name = "{}:{}:{}_{}{}".format( self.sender, self.target, self.sendersub, self.host, self.port ) if self.tls_config: self.socket = self.tls_config.get_context( purpose=ssl.Purpose.SERVER_AUTH ).wrap_socket(self.socket, server_hostname=self.host) @property def address(self): """ Returns the host and port information of socket. """ return self.socket.getsockname()
[docs] def connect(self): """ Transport connection. """ self.log_callback( "Connecting socket to {}:{}".format(self.host, self.port) ) return self.socket.connect((self.host, self.port))
[docs] def sendlogon(self, custom_tags=None): """ Send logon message. """ req = self.msgclass.from_dict({35: "A", 98: "0", 108: "600", 141: "Y"}) tagsoverride(req, custom_tags or {}) if 34 in req: self.out_seqno = int(req[34]) self.log_callback("Sending logon msg {}.".format(req)) return self.send(req)
def _populate_tags(self, msg): msg[8] = self.version msg[49] = self.sender msg[56] = self.target if 50 not in msg and self.sendersub: msg[50] = self.sendersub msg[52] = getattr(self.codec, "utc_timestamp", utc_timestamp)() msg[34] = self.out_seqno self.out_seqno += 1 if msg[35] in (b"4", "4"): self.out_seqno = int(msg[36]) return msg
[docs] def send(self, msg): """ Regular send. """ return self.rawsend_tsp(self._populate_tags(msg))
[docs] def rawsend(self, msg): """ Raw send (without stamping any session tags). """ return self.rawsend_tsp(msg)[1]
[docs] def rawsend_tsp(self, msg): """ Raw send (without stamping any session tags). """ self.log_callback("Sending msg {}.".format(msg)) msgstr = msg.to_wire(self.codec) tsp = time.time() * 1000000 self.socket.send(msgstr) return tsp, msg
[docs] def receive(self, timeout=30): """ Receive a FIX message. """ self.socket.settimeout(float(timeout)) data = self.socket.recv(1) if not data: self.log_callback("Received empty data, peer closed?") raise socket.error("Received empty data") parser = FixParser() size = parser.consume(data) while size: size = parser.consume(self.socket.recv(size)) self.in_seqno += 1 return self.msgclass.from_buffer(parser.buffer, self.codec)
[docs] def sendlogoff(self, custom_tags=None): """ Send logoff message. """ req = self.msgclass.from_dict({35: "5"}) tagsoverride(req, custom_tags or {}) self.log_callback("Sending logoff msg {}.".format(req)) return self.send(req)
[docs] def close(self): """ Close the connection. """ self.socket.close() self.socket = None self.log_callback("Closed socket.")