diff --git a/mcg/client.py b/mcg/client.py index b191511..f72443a 100644 --- a/mcg/client.py +++ b/mcg/client.py @@ -124,6 +124,9 @@ class Client(Base): SIGNAL_CUSTOM = 'custom' # Signal: error SIGNAL_ERROR = 'error' + # Buffer size for reading from socket + SOCKET_BUFSIZE = 4096 + def __init__(self): @@ -131,7 +134,7 @@ class Client(Base): Base.__init__(self) self._logger = logging.getLogger(__name__) self._sock = None - self._sock_read = None + self._buffer = bytearray() self._sock_write = None self._stop = threading.Event() self._actions = queue.Queue() @@ -295,7 +298,6 @@ class Client(Base): return try: self._sock = self._connect_socket(host, port) - self._sock_read = self._sock.makefile("r", encoding="utf-8") self._sock_write = self._sock.makefile("w", encoding="utf-8") self._greet() self._logger.info("connected") @@ -329,11 +331,8 @@ class Client(Base): def _greet(self): - greeting = self._sock_read.readline() + greeting = self._read_line() self._logger.debug("greeting: %s", greeting.strip()) - if not greeting.endswith("\n"): - self._disconnect_socket() - raise ConnectionException("incomplete line") if not greeting.startswith(Client.PROTOCOL_GREETING): self._disconnect_socket() raise ProtocolException("invalid greeting: {}".format(greeting)) @@ -347,9 +346,6 @@ class Client(Base): def _disconnect_socket(self): - if self._sock_read is not None: - self._sock_read.close() - self._sock_read = None if self._sock_write is not None: self._sock_write.close() self._sock_write = None @@ -698,16 +694,10 @@ class Client(Base): def _read(self): self._logger.debug("reading response") response = [] - line = self._sock_read.readline() - if not line.endswith("\n"): - self._disconnect_socket() - raise ConnectionException("incomplete line") + line = self._read_line() while not line.startswith(Client.PROTOCOL_COMPLETION) and not line.startswith(Client.PROTOCOL_ERROR): response.append(line.strip()) - line = self._sock_read.readline() - if not line.endswith("\n"): - self._disconnect_socket() - raise ConnectionException("incomplete line") + line = self._read_line() if line.startswith(Client.PROTOCOL_COMPLETION): self._logger.debug("response complete") if line.startswith(Client.PROTOCOL_ERROR): @@ -718,6 +708,66 @@ class Client(Base): return response + def _read_line(self): + self._logger.debug("reading line") + + # Read from the buffer + data = self._buffer_get_char(b'\x0A') + if not data.endswith(b'\x0A'): + # Read more from socket until next line break + while b'\x0A' not in data: + buf = self._sock.recv(Client.SOCKET_BUFSIZE) + if buf: + data += buf + else: + break + + # Get first line from data, add rest to buffer + if data: + lines = data.split(b'\x0A', 1) + data = lines[0] + self._buffer_set(lines[1]) + if data: + return data.decode('utf-8') + return None + + + def _read_bytes(self, buf, nbytes): + self._logger.debug("reading bytes") + # Use already buffered data + buf_read = self._buffer_get_size(nbytes) + nbytes_read = len(buf_read) + buf[0:nbytes_read] = buf_read + # Read additional data from socket + nbytes = nbytes - nbytes_read + if nbytes > 0: + buf_view = memoryview(buf)[nbytes_read:] + nbytes_read += self._sock.recv_into(buf_view, nbytes) + return nbytes_read + + + def _buffer_get_char(self, char): + pos = self._buffer.find(char) + if pos < 0: + pos = len(self._buffer)-1 + buf = self._buffer[0:pos+1] + self._buffer = self._buffer[pos+1:] + return buf + + + def _buffer_get_size(self, size): + buf = self._buffer[0:size] + self._logger.debug("get %d bytes from buffer", len(buf)) + self._buffer = self._buffer[size:] + self._logger.debug("leaving %d in the buffer", len(self._buffer)) + return buf + + + def _buffer_set(self, buf): + self._logger.debug("set %d %s as buffer", len(buf), type(buf)) + self._buffer = buf + + def _parse_dict(self, response): dict = {} if response: