Use custom buffer for reading from socket
Use the recv() method to read data from the socket instead of makefile() to allow reading of binary data that is not text. This requires using a custom buffer.
This commit is contained in:
parent
83990c8796
commit
c53681ea82
1 changed files with 67 additions and 17 deletions
|
@ -124,6 +124,9 @@ class Client(Base):
|
|||
SIGNAL_CUSTOM = 'custom'
|
||||
# Signal: error
|
||||
SIGNAL_ERROR = 'error'
|
||||
# Buffer size for reading from socket
|
||||
SOCKET_BUFSIZE = 4096
|
||||
|
||||
|
||||
|
||||
def __init__(self):
|
||||
|
@ -131,7 +134,7 @@ class Client(Base):
|
|||
Base.__init__(self)
|
||||
self._logger = logging.getLogger(__name__)
|
||||
self._sock = None
|
||||
self._sock_read = None
|
||||
self._buffer = bytearray()
|
||||
self._sock_write = None
|
||||
self._stop = threading.Event()
|
||||
self._actions = queue.Queue()
|
||||
|
@ -295,7 +298,6 @@ class Client(Base):
|
|||
return
|
||||
try:
|
||||
self._sock = self._connect_socket(host, port)
|
||||
self._sock_read = self._sock.makefile("r", encoding="utf-8")
|
||||
self._sock_write = self._sock.makefile("w", encoding="utf-8")
|
||||
self._greet()
|
||||
self._logger.info("connected")
|
||||
|
@ -329,11 +331,8 @@ class Client(Base):
|
|||
|
||||
|
||||
def _greet(self):
|
||||
greeting = self._sock_read.readline()
|
||||
greeting = self._read_line()
|
||||
self._logger.debug("greeting: %s", greeting.strip())
|
||||
if not greeting.endswith("\n"):
|
||||
self._disconnect_socket()
|
||||
raise ConnectionException("incomplete line")
|
||||
if not greeting.startswith(Client.PROTOCOL_GREETING):
|
||||
self._disconnect_socket()
|
||||
raise ProtocolException("invalid greeting: {}".format(greeting))
|
||||
|
@ -347,9 +346,6 @@ class Client(Base):
|
|||
|
||||
|
||||
def _disconnect_socket(self):
|
||||
if self._sock_read is not None:
|
||||
self._sock_read.close()
|
||||
self._sock_read = None
|
||||
if self._sock_write is not None:
|
||||
self._sock_write.close()
|
||||
self._sock_write = None
|
||||
|
@ -698,16 +694,10 @@ class Client(Base):
|
|||
def _read(self):
|
||||
self._logger.debug("reading response")
|
||||
response = []
|
||||
line = self._sock_read.readline()
|
||||
if not line.endswith("\n"):
|
||||
self._disconnect_socket()
|
||||
raise ConnectionException("incomplete line")
|
||||
line = self._read_line()
|
||||
while not line.startswith(Client.PROTOCOL_COMPLETION) and not line.startswith(Client.PROTOCOL_ERROR):
|
||||
response.append(line.strip())
|
||||
line = self._sock_read.readline()
|
||||
if not line.endswith("\n"):
|
||||
self._disconnect_socket()
|
||||
raise ConnectionException("incomplete line")
|
||||
line = self._read_line()
|
||||
if line.startswith(Client.PROTOCOL_COMPLETION):
|
||||
self._logger.debug("response complete")
|
||||
if line.startswith(Client.PROTOCOL_ERROR):
|
||||
|
@ -718,6 +708,66 @@ class Client(Base):
|
|||
return response
|
||||
|
||||
|
||||
def _read_line(self):
|
||||
self._logger.debug("reading line")
|
||||
|
||||
# Read from the buffer
|
||||
data = self._buffer_get_char(b'\x0A')
|
||||
if not data.endswith(b'\x0A'):
|
||||
# Read more from socket until next line break
|
||||
while b'\x0A' not in data:
|
||||
buf = self._sock.recv(Client.SOCKET_BUFSIZE)
|
||||
if buf:
|
||||
data += buf
|
||||
else:
|
||||
break
|
||||
|
||||
# Get first line from data, add rest to buffer
|
||||
if data:
|
||||
lines = data.split(b'\x0A', 1)
|
||||
data = lines[0]
|
||||
self._buffer_set(lines[1])
|
||||
if data:
|
||||
return data.decode('utf-8')
|
||||
return None
|
||||
|
||||
|
||||
def _read_bytes(self, buf, nbytes):
|
||||
self._logger.debug("reading bytes")
|
||||
# Use already buffered data
|
||||
buf_read = self._buffer_get_size(nbytes)
|
||||
nbytes_read = len(buf_read)
|
||||
buf[0:nbytes_read] = buf_read
|
||||
# Read additional data from socket
|
||||
nbytes = nbytes - nbytes_read
|
||||
if nbytes > 0:
|
||||
buf_view = memoryview(buf)[nbytes_read:]
|
||||
nbytes_read += self._sock.recv_into(buf_view, nbytes)
|
||||
return nbytes_read
|
||||
|
||||
|
||||
def _buffer_get_char(self, char):
|
||||
pos = self._buffer.find(char)
|
||||
if pos < 0:
|
||||
pos = len(self._buffer)-1
|
||||
buf = self._buffer[0:pos+1]
|
||||
self._buffer = self._buffer[pos+1:]
|
||||
return buf
|
||||
|
||||
|
||||
def _buffer_get_size(self, size):
|
||||
buf = self._buffer[0:size]
|
||||
self._logger.debug("get %d bytes from buffer", len(buf))
|
||||
self._buffer = self._buffer[size:]
|
||||
self._logger.debug("leaving %d in the buffer", len(self._buffer))
|
||||
return buf
|
||||
|
||||
|
||||
def _buffer_set(self, buf):
|
||||
self._logger.debug("set %d %s as buffer", len(buf), type(buf))
|
||||
self._buffer = buf
|
||||
|
||||
|
||||
def _parse_dict(self, response):
|
||||
dict = {}
|
||||
if response:
|
||||
|
|
Loading…
Reference in a new issue