From 4317d46e3fa594dabc7dd3be03cb9efe98f3a9d6 Mon Sep 17 00:00:00 2001 From: Gusto Date: Tue, 16 Jul 2024 16:04:59 +0300 Subject: [PATCH] Parse length+message in the proto module --- da/executor/executor.py | 1 - da/executor/mock_network.py | 1 - da/executor/proto.py | 7 ++++++- da/executor/transport.py | 9 +-------- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/da/executor/executor.py b/da/executor/executor.py index f74ca0c..11458af 100644 --- a/da/executor/executor.py +++ b/da/executor/executor.py @@ -44,4 +44,3 @@ class Executor: async def run(self): await asyncio.gather(*(self.connect() for _ in range(self.col_num))) await self.execute() - diff --git a/da/executor/mock_network.py b/da/executor/mock_network.py index da4a1aa..f6dbaa6 100644 --- a/da/executor/mock_network.py +++ b/da/executor/mock_network.py @@ -28,4 +28,3 @@ def main(): if __name__ == '__main__': main() - diff --git a/da/executor/proto.py b/da/executor/proto.py index 673c794..6c5baa8 100644 --- a/da/executor/proto.py +++ b/da/executor/proto.py @@ -3,6 +3,12 @@ from itertools import count MAX_MSG_LEN_BYTES = 2 +async def parse_from_reader(reader): + length_prefix = await reader.readexactly(MAX_MSG_LEN_BYTES) + data_length = int.from_bytes(length_prefix, byteorder='big') + data = await reader.readexactly(data_length) + return unpack_message(data) + def pack_message(message): # SerializeToString method returns an instance of bytes. data = message.SerializeToString() @@ -56,4 +62,3 @@ def new_sample_res_not_found_error_msg(description): sample_res = dispersal_pb2.SampleRes(err=sample_err) dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res) return pack_message(dispersal_message) - diff --git a/da/executor/transport.py b/da/executor/transport.py index 9d928a5..da33d8c 100644 --- a/da/executor/transport.py +++ b/da/executor/transport.py @@ -11,12 +11,7 @@ class Transport: async def read_and_process(self): try: while True: - length_prefix = await self.reader.readexactly(proto.MAX_MSG_LEN_BYTES) - data_length = int.from_bytes(length_prefix, byteorder='big') - - data = await self.reader.readexactly(data_length) - message = proto.unpack_message(data) - + message = await proto.parse_from_reader(self.reader) await self.handler(self.conn_id, self.writer, message) except asyncio.IncompleteReadError: print("Transport: Connection closed by the peer.") @@ -29,5 +24,3 @@ class Transport: async def write(self, message): self.writer.write(message) await self.writer.drain() - -