From dc1fa97fa3c5ecfae497998e04b6ab1221402a9d Mon Sep 17 00:00:00 2001 From: Gusto Date: Thu, 11 Jul 2024 14:03:13 +0300 Subject: [PATCH] Dispersal wrapper messages for executor to node communication --- da/executor/dispersal.proto | 31 ++++++++++++++++ da/executor/dispersal_pb2.py | 44 +++++++++++++++++++++++ da/executor/executor.py | 16 +++++---- da/executor/mock_network.py | 4 +-- da/executor/node.py | 12 +++---- da/executor/proto.py | 69 ++++++++++++++---------------------- da/executor/transport.py | 15 ++++---- requirements.txt | 1 + 8 files changed, 129 insertions(+), 63 deletions(-) create mode 100644 da/executor/dispersal.proto create mode 100644 da/executor/dispersal_pb2.py diff --git a/da/executor/dispersal.proto b/da/executor/dispersal.proto new file mode 100644 index 0000000..e4af369 --- /dev/null +++ b/da/executor/dispersal.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +package dispersal; + +message DispersalReq { + bytes blob_id = 1; + bytes data = 2; +} + +message DispersalRes { + int32 blob_id = 1; +} + +message SampleReq { + int32 blob_id = 1; +} + +message SampleRes { + int32 blob_id = 1; + bytes data = 2; +} + +message DispersalMessage { + oneof message_type { + DispersalReq dispersal_req = 1; + DispersalRes dispersal_res = 2; + SampleReq sample_req = 3; + SampleRes sample_res = 4; + } +} + diff --git a/da/executor/dispersal_pb2.py b/da/executor/dispersal_pb2.py new file mode 100644 index 0000000..e696ea9 --- /dev/null +++ b/da/executor/dispersal_pb2.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: dispersal.proto +# Protobuf Python Version: 5.27.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 27, + 1, + '', + 'dispersal.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x64ispersal.proto\x12\tdispersal\"-\n\x0c\x44ispersalReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x1f\n\x0c\x44ispersalRes\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x05\"\x1c\n\tSampleReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x05\"*\n\tSampleRes\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\xde\x01\n\x10\x44ispersalMessage\x12\x30\n\rdispersal_req\x18\x01 \x01(\x0b\x32\x17.dispersal.DispersalReqH\x00\x12\x30\n\rdispersal_res\x18\x02 \x01(\x0b\x32\x17.dispersal.DispersalResH\x00\x12*\n\nsample_req\x18\x03 \x01(\x0b\x32\x14.dispersal.SampleReqH\x00\x12*\n\nsample_res\x18\x04 \x01(\x0b\x32\x14.dispersal.SampleResH\x00\x42\x0e\n\x0cmessage_typeb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispersal_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_DISPERSALREQ']._serialized_start=30 + _globals['_DISPERSALREQ']._serialized_end=75 + _globals['_DISPERSALRES']._serialized_start=77 + _globals['_DISPERSALRES']._serialized_end=108 + _globals['_SAMPLEREQ']._serialized_start=110 + _globals['_SAMPLEREQ']._serialized_end=138 + _globals['_SAMPLERES']._serialized_start=140 + _globals['_SAMPLERES']._serialized_end=182 + _globals['_DISPERSALMESSAGE']._serialized_start=185 + _globals['_DISPERSALMESSAGE']._serialized_end=407 +# @@protoc_insertion_point(module_scope) diff --git a/da/executor/executor.py b/da/executor/executor.py index da965a8..b6ded13 100644 --- a/da/executor/executor.py +++ b/da/executor/executor.py @@ -11,11 +11,12 @@ class Executor: self.interval = 10 async def execute(self): - data = "TEST DATA" + blob_id = b"test" + data = b"TEST DATA" while True: try: for transport in self.connections: - message = proto.new_dispersal_put_msg(data) + message = proto.new_dispersal_req_msg(blob_id, data) await transport.write(message) await asyncio.sleep(self.interval) except asyncio.CancelledError: @@ -35,11 +36,12 @@ class Executor: print(f"Failed to connect or lost connection: {e}") async def _handle(self, conn_id, writer, message): - msg_type, msg_id, data = message - if msg_type == proto.DISPERSAL_OK: - print(f"Executor: Dispersal OK from connection {conn_id}, message ID {msg_id}.") - elif msg_type == proto.SAMPLE_OK: - print(f"Executor: Sample OK from connection {conn_id}, message ID {msg_id}.") + if message.HasField('dispersal_res'): + print(f"Received DispersalRes: blob_id={message.dispersal_res.blob_id}") + elif message.HasField('sample_res'): + print(f"Received SampleRes: blob_id={message.sample_res.blob_id}") + else: + print("Received unknown message type") async def run(self): await asyncio.gather(*(self.connect() for _ in range(self.col_num))) diff --git a/da/executor/mock_network.py b/da/executor/mock_network.py index 8f57229..1873283 100644 --- a/da/executor/mock_network.py +++ b/da/executor/mock_network.py @@ -11,7 +11,7 @@ class App: nodes = [Node(self.addr, start_port + i) for i in range(num_nodes)] await asyncio.gather(*(node.run() for node in nodes)) - async def run_nodes_with_executor(self, col_number): + async def run_node_with_executor(self, col_number): node = Node(self.addr, 8888) executor = Executor(self.addr, 8888, col_number) await asyncio.gather(node.run(), executor.run()) @@ -26,7 +26,7 @@ def main(): # asyncio.run(app.run_nodes(10000, 4096)) # asyncio.run(app.run_executor('localhost', 8888, 1)) - asyncio.run(app.run_nodes_with_executor(10)) + asyncio.run(app.run_node_with_executor(1)) if __name__ == '__main__': main() diff --git a/da/executor/node.py b/da/executor/node.py index 93b3749..4035943 100644 --- a/da/executor/node.py +++ b/da/executor/node.py @@ -25,12 +25,12 @@ class Node: await server.serve_forever() async def _handle(self, conn_id, writer, message): - msg_type, msg_id, data = message - if msg_type == proto.DISPERSAL_PUT: - response = proto.new_dispersal_ok_msg(msg_id) - writer.write(response) - elif msg_type == proto.SAMPLE_PUT: - pass + if message.HasField('dispersal_req'): + print(f"Received DispersalRes: blob_id={message.dispersal_req.blob_id}") + elif message.HasField('sample_req'): + print(f"Received SampleRes: blob_id={message.sample_req.blob_id}") + else: + print("Received unknown message type") async def run(self): await self.listen() diff --git a/da/executor/proto.py b/da/executor/proto.py index dc6ed8b..4d1e534 100644 --- a/da/executor/proto.py +++ b/da/executor/proto.py @@ -1,50 +1,35 @@ -import struct +import dispersal_pb2 from itertools import count -DISPERSAL_PUT = 0x01 -DISPERSAL_OK = 0x02 -SAMPLE_PUT = 0x03 -SAMPLE_OK = 0x04 +MAX_MSG_LEN_BYTES = 2 -HEADER_SIZE = 9 -HEADER_FORMAT = "!B I I" +def pack_message(message): + # SerializeToString method returns an instance of bytes. + data = message.SerializeToString() + length_prefix = len(data).to_bytes(MAX_MSG_LEN_BYTES, byteorder='big') + return length_prefix + data -DISPERSAL_HASH_COL_SIZE = 6 -# First 4 bytes for the hash and the next 2 bytes for the column index. -DISPERSAL_HASH_COL_FORMAT = "!I H" +def unpack_message(data): + message = dispersal_pb2.DispersalMessage() + message.ParseFromString(data) + return message -msg_id_counter = count(start=1) +def new_dispersal_req_msg(blob_id, data): + dispersal_req = dispersal_pb2.DispersalReq(blob_id=blob_id, data=data) + dispersal_message = dispersal_pb2.DispersalMessage(dispersal_req=dispersal_req) + return pack_message(dispersal_message) -def pack_header(msg_type, msg_id, data): - encoded_data = data.encode() - data_length = len(encoded_data) - header = struct.pack(HEADER_FORMAT, msg_type, msg_id, data_length) - return header + encoded_data +def new_dispersal_res_msg(blob_id): + dispersal_res = dispersal_pb2.DispersalRes(blob_id=blob_id) + dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res) + return pack_message(dispersal_message) -def unpack_header(data): - if len(data) < HEADER_SIZE: - return None, None, None, None - msg_type, msg_id, data_length = struct.unpack(HEADER_FORMAT, data[:HEADER_SIZE]) - return msg_type, msg_id, data_length +def new_sample_req_msg(blob_id): + sample_req = dispersal_pb2.SampleReq(blob_id=blob_id) + dispersal_message = dispersal_pb2.DispersalMessage(sample_req=sample_req) + return pack_message(dispersal_message) -def new_dispersal_put_msg(data): - msg_id = next(msg_id_counter) - return pack_header(DISPERSAL_PUT, msg_id, data) - -def new_dispersal_ok_msg(msg_id): - return pack_header(DISPERSAL_OK, msg_id, "") - -def new_sample_put_msg(data): - msg_id = next(msg_id_counter) - return pack_header(SAMPLE_PUT, msg_id, data) - -def new_sample_ok_msg(msg_id, response_data): - return pack_header(SAMPLE_OK, msg_id, response_data) - -def parse_dispersal_data(data): - if len(data) >= DISPERSAL_HASH_COL_SIZE: - hash_value, col_index = struct.unpack(DISPERSAL_HASH_COL_FORMAT, data[:DISPERSAL_HASH_COL_SIZE]) - remaining_data = data[DISPERSAL_HASH_COL_SIZE:] - return hash_value, col_index, remaining_data - else: - raise ValueError("Data is too short to unpack hash and col.") +def new_sample_res_msg(blob_id, data): + sample_res = dispersal_pb2.SampleRes(blob_id=blob_id, data=data) + dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res) + return pack_message(dispersal_message) diff --git a/da/executor/transport.py b/da/executor/transport.py index de09fdb..9d928a5 100644 --- a/da/executor/transport.py +++ b/da/executor/transport.py @@ -1,5 +1,4 @@ import asyncio -import struct import proto class Transport: @@ -12,10 +11,13 @@ class Transport: async def read_and_process(self): try: while True: - header = await self.reader.readexactly(9) # Assuming the header is 9 bytes long - msg_type, msg_id, data_length = proto.unpack_header(header) + length_prefix = await self.reader.readexactly(proto.MAX_MSG_LEN_BYTES) + data_length = int.from_bytes(length_prefix, byteorder='big') + data = await self.reader.readexactly(data_length) - await self.handler(self.conn_id, self.writer, (msg_type, msg_id, data)) + message = proto.unpack_message(data) + + await self.handler(self.conn_id, self.writer, message) except asyncio.IncompleteReadError: print("Transport: Connection closed by the peer.") except Exception as e: @@ -24,7 +26,8 @@ class Transport: self.writer.close() await self.writer.wait_closed() - async def write(self, data): - self.writer.write(data) + async def write(self, message): + self.writer.write(message) await self.writer.drain() + diff --git a/requirements.txt b/requirements.txt index 38ae3bd..40cf5b8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,4 @@ portalocker==2.8.2 # portable file locking keum==0.2.0 # for CL's use of more obscure curves poseidon-hash==0.1.4 # used as the algebraic hash in CL hypothesis==6.103.0 +protobuf==5.27.2