From 44689d6e0c47c59fb9f7df6bfa50daedeafbb81a Mon Sep 17 00:00:00 2001 From: Gusto Date: Thu, 11 Jul 2024 19:10:09 +0300 Subject: [PATCH] Write to connections in parallel --- da/executor/executor.py | 10 ++++------ da/executor/node.py | 5 +++-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/da/executor/executor.py b/da/executor/executor.py index 6163f9a..f74ca0c 100644 --- a/da/executor/executor.py +++ b/da/executor/executor.py @@ -11,13 +11,11 @@ class Executor: self.interval = 10 async def execute(self): - blob_id = b"test" - data = b"TEST DATA" + message = proto.new_dispersal_req_msg(b"dummy_blob_id", b"dummy_data") while True: try: - for transport in self.connections: - message = proto.new_dispersal_req_msg(blob_id, data) - await transport.write(message) + # TODO: Mock original data conversion into blobs. + await asyncio.gather(*[t.write(message) for t in self.connections]) await asyncio.sleep(self.interval) except asyncio.CancelledError: break @@ -41,7 +39,7 @@ class Executor: elif message.HasField('sample_res'): print(f"Executor: Received SampleRes: blob_id={message.sample_res.blob_id}") else: - print("Executor: Received unknown message type") + print(f"Executor: Received unknown message: {message}") async def run(self): await asyncio.gather(*(self.connect() for _ in range(self.col_num))) diff --git a/da/executor/node.py b/da/executor/node.py index ac73196..363de35 100644 --- a/da/executor/node.py +++ b/da/executor/node.py @@ -27,13 +27,14 @@ class Node: async def _handle(self, conn_id, writer, message): if message.HasField('dispersal_req'): blob_id = message.dispersal_req.blob.blob_id - print(f"Node: Received DispersalRes: blob_id={blob_id}") + data = message.dispersal_req.blob.data + print(f"Node: Received DispersalRes: blob_id={blob_id}; data={data}") # Imitate succesful verification. writer.write(proto.new_dispersal_res_success_msg(blob_id)) elif message.HasField('sample_req'): print(f"Node: Received SampleRes: blob_id={message.sample_req.blob_id}") else: - print("Node: Received unknown message type") + print(f"Node: Received unknown message: {message} ") async def run(self): await self.listen()