import codecs
[docs]class SourceProtocolABC(object):
'''
Source protocol is a handler class, that basically gets the socket (in reader)
and extract the payload from it in a way that is conformant to expected protocol.
That is happening in the `handle()` method.
The output is to be shipped to source.process() method.
'''
[docs] def __init__(self, app, pipeline, config):
"""
Description:
"""
self.Loop = app.Loop
[docs] async def handle(self, source, stream, context):
"""
Description:
"""
raise NotImplementedError()
[docs]class LineSourceProtocol(SourceProtocolABC):
'''
Description: Basically readline() for reading lines from a socket.
'''
[docs] def __init__(self, app, pipeline, config):
"""
Description:
"""
super().__init__(app, pipeline, config)
# TODO: All following values could be read from configuration
self.EOL = b'\n'
self.SaneBufferSize = 64 * 1024 # The maximum buffer size considered as sane
# Line decoder
decode_codec = config['decode']
if decode_codec == "bytes":
self.Codec = None
self.LineDecoder = self._line_bytes_decoder
else:
self.Codec = codecs.lookup(decode_codec)
self.LineDecoder = self._line_codec_decoder
[docs] async def handle(self, source, stream, context):
"""
Description:
"""
pipeline = source.Pipeline
input_buffer = bytearray(b' ' * 8)
input_buffer_mv = memoryview(input_buffer)
input_buffer_pos = 0
last_eol_pos = 0
while True:
recv_bytes = await stream.recv_into(input_buffer_mv[input_buffer_pos:])
if recv_bytes <= 0:
# Client closed the connection
if recv_bytes < 0:
raise RuntimeError("Client sock_recv_into returned {}".format(recv_bytes))
return
input_buffer_pos += recv_bytes
if len(input_buffer) == input_buffer_pos:
if len(input_buffer) > self.SaneBufferSize:
raise RuntimeError("Insane buffer size requested")
# Grow the input_buffer if the size touches the top
new_input_buffer = bytearray(b' ' * (len(input_buffer) * 2))
input_buffer_mv = memoryview(new_input_buffer)
input_buffer_mv[:len(input_buffer)] = input_buffer # Copy the content of the old buffer
input_buffer = new_input_buffer
while last_eol_pos < input_buffer_pos:
# Seek for end of line symbol in the buffer
eol_pos = input_buffer[:input_buffer_pos].find(self.EOL, last_eol_pos)
if eol_pos == -1:
break
line = self.LineDecoder(
line_bytes=input_buffer[last_eol_pos:eol_pos]
)
last_eol_pos = eol_pos + 1
await pipeline.ready()
await source.process(line, context=context.copy())
# TODO: HIGH PRIORITY This definitively doesn't cover all possible cases
# If the '\n' is at the end of the buffer, reset the buffer position
if last_eol_pos == input_buffer_pos:
input_buffer_pos = 0
last_eol_pos = 0
continue
def _line_codec_decoder(self, line_bytes):
line, _ = self.Codec.decode(
line_bytes
)
return line
def _line_bytes_decoder(self, line_bytes):
return line_bytes