From c53681ea82a8d16716b1f3d445cce657b710b5ce Mon Sep 17 00:00:00 2001 From: coderkun Date: Sun, 4 Nov 2018 12:17:09 +0100 Subject: [PATCH] Use custom buffer for reading from socket Use the recv() method to read data from the socket instead of makefile() to allow reading of binary data that is not text. This requires using a custom buffer. --- mcg/client.py | 84 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 67 insertions(+), 17 deletions(-) diff --git a/mcg/client.py b/mcg/client.py index b191511..f72443a 100644 --- a/mcg/client.py +++ b/mcg/client.py @@ -124,6 +124,9 @@ class Client(Base): SIGNAL_CUSTOM = 'custom' # Signal: error SIGNAL_ERROR = 'error' + # Buffer size for reading from socket + SOCKET_BUFSIZE = 4096 + def __init__(self): @@ -131,7 +134,7 @@ class Client(Base): Base.__init__(self) self._logger = logging.getLogger(__name__) self._sock = None - self._sock_read = None + self._buffer = bytearray() self._sock_write = None self._stop = threading.Event() self._actions = queue.Queue() @@ -295,7 +298,6 @@ class Client(Base): return try: self._sock = self._connect_socket(host, port) - self._sock_read = self._sock.makefile("r", encoding="utf-8") self._sock_write = self._sock.makefile("w", encoding="utf-8") self._greet() self._logger.info("connected") @@ -329,11 +331,8 @@ class Client(Base): def _greet(self): - greeting = self._sock_read.readline() + greeting = self._read_line() self._logger.debug("greeting: %s", greeting.strip()) - if not greeting.endswith("\n"): - self._disconnect_socket() - raise ConnectionException("incomplete line") if not greeting.startswith(Client.PROTOCOL_GREETING): self._disconnect_socket() raise ProtocolException("invalid greeting: {}".format(greeting)) @@ -347,9 +346,6 @@ class Client(Base): def _disconnect_socket(self): - if self._sock_read is not None: - self._sock_read.close() - self._sock_read = None if self._sock_write is not None: self._sock_write.close() self._sock_write = None @@ -698,16 +694,10 @@ class Client(Base): def _read(self): self._logger.debug("reading response") response = [] - line = self._sock_read.readline() - if not line.endswith("\n"): - self._disconnect_socket() - raise ConnectionException("incomplete line") + line = self._read_line() while not line.startswith(Client.PROTOCOL_COMPLETION) and not line.startswith(Client.PROTOCOL_ERROR): response.append(line.strip()) - line = self._sock_read.readline() - if not line.endswith("\n"): - self._disconnect_socket() - raise ConnectionException("incomplete line") + line = self._read_line() if line.startswith(Client.PROTOCOL_COMPLETION): self._logger.debug("response complete") if line.startswith(Client.PROTOCOL_ERROR): @@ -718,6 +708,66 @@ class Client(Base): return response + def _read_line(self): + self._logger.debug("reading line") + + # Read from the buffer + data = self._buffer_get_char(b'\x0A') + if not data.endswith(b'\x0A'): + # Read more from socket until next line break + while b'\x0A' not in data: + buf = self._sock.recv(Client.SOCKET_BUFSIZE) + if buf: + data += buf + else: + break + + # Get first line from data, add rest to buffer + if data: + lines = data.split(b'\x0A', 1) + data = lines[0] + self._buffer_set(lines[1]) + if data: + return data.decode('utf-8') + return None + + + def _read_bytes(self, buf, nbytes): + self._logger.debug("reading bytes") + # Use already buffered data + buf_read = self._buffer_get_size(nbytes) + nbytes_read = len(buf_read) + buf[0:nbytes_read] = buf_read + # Read additional data from socket + nbytes = nbytes - nbytes_read + if nbytes > 0: + buf_view = memoryview(buf)[nbytes_read:] + nbytes_read += self._sock.recv_into(buf_view, nbytes) + return nbytes_read + + + def _buffer_get_char(self, char): + pos = self._buffer.find(char) + if pos < 0: + pos = len(self._buffer)-1 + buf = self._buffer[0:pos+1] + self._buffer = self._buffer[pos+1:] + return buf + + + def _buffer_get_size(self, size): + buf = self._buffer[0:size] + self._logger.debug("get %d bytes from buffer", len(buf)) + self._buffer = self._buffer[size:] + self._logger.debug("leaving %d in the buffer", len(self._buffer)) + return buf + + + def _buffer_set(self, buf): + self._logger.debug("set %d %s as buffer", len(buf), type(buf)) + self._buffer = buf + + def _parse_dict(self, response): dict = {} if response: