From 7e4929de8c9d94e29fccf5202e3d6443c44a9474 Mon Sep 17 00:00:00 2001 From: Gusto Date: Wed, 17 Jul 2024 18:50:41 +0300 Subject: [PATCH] Move dispersal proto into subnets poc --- da/executor/README.md | 9 -- da/executor/dispersal_pb2.py | 52 -------- da/executor/executor.py | 46 ------- da/executor/mock_network.py | 30 ----- da/executor/node.py | 40 ------ da/executor/transport.py | 26 ---- da/network/dispersal/README.md | 76 +++++++++++ .../dispersal}/__init__.py | 0 .../dispersal}/dispersal.proto | 48 +++++-- da/network/dispersal/dispersal_pb2.py | 54 ++++++++ da/network/dispersal/mock_system.py | 123 ++++++++++++++++++ da/{executor => network/dispersal}/proto.py | 52 ++++++-- 12 files changed, 335 insertions(+), 221 deletions(-) delete mode 100644 da/executor/README.md delete mode 100644 da/executor/dispersal_pb2.py delete mode 100644 da/executor/executor.py delete mode 100644 da/executor/mock_network.py delete mode 100644 da/executor/node.py delete mode 100644 da/executor/transport.py create mode 100644 da/network/dispersal/README.md rename da/{executor => network/dispersal}/__init__.py (100%) rename da/{executor => network/dispersal}/dispersal.proto (52%) create mode 100644 da/network/dispersal/dispersal_pb2.py create mode 100644 da/network/dispersal/mock_system.py rename da/{executor => network/dispersal}/proto.py (61%) diff --git a/da/executor/README.md b/da/executor/README.md deleted file mode 100644 index 30974de..0000000 --- a/da/executor/README.md +++ /dev/null @@ -1,9 +0,0 @@ -# Zone Executor to Nomos DA communication - -## Protobuf - -To generate the updated protobuf serializer from `dispersal.proto` run: - -``` bash -protoc --python_out=. dispersal.proto -``` diff --git a/da/executor/dispersal_pb2.py b/da/executor/dispersal_pb2.py deleted file mode 100644 index 4d4828f..0000000 --- a/da/executor/dispersal_pb2.py +++ /dev/null @@ -1,52 +0,0 @@ -# -*- coding: utf-8 -*- -# Generated by the protocol buffer compiler. DO NOT EDIT! -# NO CHECKED-IN PROTOBUF GENCODE -# source: dispersal.proto -# Protobuf Python Version: 5.27.1 -"""Generated protocol buffer code.""" -from google.protobuf import descriptor as _descriptor -from google.protobuf import descriptor_pool as _descriptor_pool -from google.protobuf import runtime_version as _runtime_version -from google.protobuf import symbol_database as _symbol_database -from google.protobuf.internal import builder as _builder -_runtime_version.ValidateProtobufRuntimeVersion( - _runtime_version.Domain.PUBLIC, - 5, - 27, - 1, - '', - 'dispersal.proto' -) -# @@protoc_insertion_point(imports) - -_sym_db = _symbol_database.Default() - - - - -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x64ispersal.proto\x12\tdispersal\"%\n\x04\x42lob\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x1c\n\x05\x45rror\x12\x13\n\x0b\x64\x65scription\x18\x01 \x01(\t\"x\n\x0c\x44ispersalErr\x12*\n\x0e\x63hunk_size_err\x18\x01 \x01(\x0b\x32\x10.dispersal.ErrorH\x00\x12,\n\x10verification_err\x18\x02 \x01(\x0b\x32\x10.dispersal.ErrorH\x00\x42\x0e\n\x0cmessage_type\"-\n\x0c\x44ispersalReq\x12\x1d\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x0f.dispersal.Blob\"Y\n\x0c\x44ispersalRes\x12\x11\n\x07\x62lob_id\x18\x01 \x01(\x0cH\x00\x12&\n\x03\x65rr\x18\x02 \x01(\x0b\x32\x17.dispersal.DispersalErrH\x00\x42\x0e\n\x0cmessage_type\"B\n\tSampleErr\x12%\n\tnot_found\x18\x01 \x01(\x0b\x32\x10.dispersal.ErrorH\x00\x42\x0e\n\x0cmessage_type\"\x1c\n\tSampleReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\"a\n\tSampleRes\x12\x1f\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x0f.dispersal.BlobH\x00\x12#\n\x03\x65rr\x18\x02 \x01(\x0b\x32\x14.dispersal.SampleErrH\x00\x42\x0e\n\x0cmessage_type\"\xde\x01\n\x10\x44ispersalMessage\x12\x30\n\rdispersal_req\x18\x01 \x01(\x0b\x32\x17.dispersal.DispersalReqH\x00\x12\x30\n\rdispersal_res\x18\x02 \x01(\x0b\x32\x17.dispersal.DispersalResH\x00\x12*\n\nsample_req\x18\x03 \x01(\x0b\x32\x14.dispersal.SampleReqH\x00\x12*\n\nsample_res\x18\x04 \x01(\x0b\x32\x14.dispersal.SampleResH\x00\x42\x0e\n\x0cmessage_typeb\x06proto3') - -_globals = globals() -_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) -_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispersal_pb2', _globals) -if not _descriptor._USE_C_DESCRIPTORS: - DESCRIPTOR._loaded_options = None - _globals['_BLOB']._serialized_start=30 - _globals['_BLOB']._serialized_end=67 - _globals['_ERROR']._serialized_start=69 - _globals['_ERROR']._serialized_end=97 - _globals['_DISPERSALERR']._serialized_start=99 - _globals['_DISPERSALERR']._serialized_end=219 - _globals['_DISPERSALREQ']._serialized_start=221 - _globals['_DISPERSALREQ']._serialized_end=266 - _globals['_DISPERSALRES']._serialized_start=268 - _globals['_DISPERSALRES']._serialized_end=357 - _globals['_SAMPLEERR']._serialized_start=359 - _globals['_SAMPLEERR']._serialized_end=425 - _globals['_SAMPLEREQ']._serialized_start=427 - _globals['_SAMPLEREQ']._serialized_end=455 - _globals['_SAMPLERES']._serialized_start=457 - _globals['_SAMPLERES']._serialized_end=554 - _globals['_DISPERSALMESSAGE']._serialized_start=557 - _globals['_DISPERSALMESSAGE']._serialized_end=779 -# @@protoc_insertion_point(module_scope) diff --git a/da/executor/executor.py b/da/executor/executor.py deleted file mode 100644 index 11458af..0000000 --- a/da/executor/executor.py +++ /dev/null @@ -1,46 +0,0 @@ -import asyncio -import proto -from transport import Transport - -class Executor: - def __init__(self, addr, port, col_num): - self.addr = addr - self.port = port - self.col_num = col_num - self.connections = [] - self.interval = 10 - - async def execute(self): - message = proto.new_dispersal_req_msg(b"dummy_blob_id", b"dummy_data") - while True: - try: - # TODO: Mock original data conversion into blobs. - await asyncio.gather(*[t.write(message) for t in self.connections]) - await asyncio.sleep(self.interval) - except asyncio.CancelledError: - break - except Exception as e: - print(f"Executor: Error during message sending: {e}") - - async def connect(self): - try: - reader, writer = await asyncio.open_connection(self.addr, self.port) - conn_id = len(self.connections) - transport = Transport(conn_id, reader, writer, self._handle) - self.connections.append(transport) - print(f"Executor: Connected to {self.addr}:{self.port}, ID: {conn_id}") - asyncio.create_task(transport.read_and_process()) - except Exception as e: - print(f"Executor: Failed to connect or lost connection: {e}") - - async def _handle(self, conn_id, writer, message): - if message.HasField('dispersal_res'): - print(f"Executor: Received DispersalRes: blob_id={message.dispersal_res.blob_id}") - elif message.HasField('sample_res'): - print(f"Executor: Received SampleRes: blob_id={message.sample_res.blob_id}") - else: - print(f"Executor: Received unknown message: {message}") - - async def run(self): - await asyncio.gather(*(self.connect() for _ in range(self.col_num))) - await self.execute() diff --git a/da/executor/mock_network.py b/da/executor/mock_network.py deleted file mode 100644 index f6dbaa6..0000000 --- a/da/executor/mock_network.py +++ /dev/null @@ -1,30 +0,0 @@ -import asyncio -import argparse -from node import Node -from executor import Executor - -class App: - def __init__(self, addr='localhost'): - self.addr = addr - - async def run_nodes(self, start_port, num_nodes): - nodes = [Node(self.addr, start_port + i) for i in range(num_nodes)] - await asyncio.gather(*(node.run() for node in nodes)) - - async def run_node_with_executor(self, col_number): - node = Node(self.addr, 8888) - executor = Executor(self.addr, 8888, col_number) - await asyncio.gather(node.run(), executor.run()) - - async def run_executor(self, remote_addr, start_port, col_number): - executor = Executor(remote_addr, start_port, col_number) - await asyncio.gather(executor.run()) - -def main(): - # TODO: Add args parser. - app = App() - - asyncio.run(app.run_node_with_executor(1)) - -if __name__ == '__main__': - main() diff --git a/da/executor/node.py b/da/executor/node.py deleted file mode 100644 index 363de35..0000000 --- a/da/executor/node.py +++ /dev/null @@ -1,40 +0,0 @@ -import asyncio -import struct -import proto -from itertools import count -from transport import Transport - -conn_id_counter = count(start=1) - -class Node: - def __init__(self, addr, port): - self.addr = addr - self.port = port - - async def _on_conn(self, reader, writer): - conn_id = next(conn_id_counter) - transport = Transport(conn_id, reader, writer, self._handle) - await transport.read_and_process() - - async def listen(self): - server = await asyncio.start_server( - self._on_conn, self.addr, self.port - ) - print(f"Node: Server started at {self.addr}:{self.port}") - async with server: - await server.serve_forever() - - async def _handle(self, conn_id, writer, message): - if message.HasField('dispersal_req'): - blob_id = message.dispersal_req.blob.blob_id - data = message.dispersal_req.blob.data - print(f"Node: Received DispersalRes: blob_id={blob_id}; data={data}") - # Imitate succesful verification. - writer.write(proto.new_dispersal_res_success_msg(blob_id)) - elif message.HasField('sample_req'): - print(f"Node: Received SampleRes: blob_id={message.sample_req.blob_id}") - else: - print(f"Node: Received unknown message: {message} ") - - async def run(self): - await self.listen() diff --git a/da/executor/transport.py b/da/executor/transport.py deleted file mode 100644 index da33d8c..0000000 --- a/da/executor/transport.py +++ /dev/null @@ -1,26 +0,0 @@ -import asyncio -import proto - -class Transport: - def __init__(self, conn_id, reader, writer, handler): - self.conn_id = conn_id - self.reader = reader - self.writer = writer - self.handler = handler - - async def read_and_process(self): - try: - while True: - message = await proto.parse_from_reader(self.reader) - await self.handler(self.conn_id, self.writer, message) - except asyncio.IncompleteReadError: - print("Transport: Connection closed by the peer.") - except Exception as e: - print(f"Transport: An error occurred: {e}") - finally: - self.writer.close() - await self.writer.wait_closed() - - async def write(self, message): - self.writer.write(message) - await self.writer.drain() diff --git a/da/network/dispersal/README.md b/da/network/dispersal/README.md new file mode 100644 index 0000000..ab9086d --- /dev/null +++ b/da/network/dispersal/README.md @@ -0,0 +1,76 @@ +# Zone Executor to Nomos DA Communication + +Protocol for communication between the Zone Executor and Nomos DA using Protocol Buffers (protobuf). + +## Overview + +The protocol defines messages used to request and respond to data dispersal, sampling operations, and session control within the Nomos DA system. The communication involves the exchange of blobs (binary large objects) and error handling for various operations. + +## Messages + +### Blob +- **Blob**: Represents the binary data to be dispersed. + - `bytes blob_id`: Unique identifier for the blob. + - `bytes data`: The binary data of the blob. + +### Error Handling +- **DispersalErr**: Represents errors related to dispersal operations. + - `bytes blob_id`: Unique identifier of the blob related to the error. + - `enum DispersalErrType`: Enumeration of dispersal error types. + - `CHUNK_SIZE`: Error due to incorrect chunk size. + - `VERIFICATION`: Error due to verification failure. + - `string err_description`: Description of the error. + +- **SampleErr**: Represents errors related to sample operations. + - `bytes blob_id`: Unique identifier of the blob related to the error. + - `enum SampleErrType`: Enumeration of sample error types. + - `NOT_FOUND`: Error when a blob is not found. + - `string err_description`: Description of the error. + +### Dispersal +- **DispersalReq**: Request message for dispersing a blob. + - `Blob blob`: The blob to be dispersed. + +- **DispersalRes**: Response message for a dispersal request. + - `oneof message_type`: Contains either a success response or an error. + - `bytes blob_id`: Unique identifier of the dispersed blob. + - `DispersalErr err`: Error occurred during dispersal. + +### Sample +- **SampleReq**: Request message for sampling a blob. + - `bytes blob_id`: Unique identifier of the blob to be sampled. + +- **SampleRes**: Response message for a sample request. + - `oneof message_type`: Contains either a success response or an error. + - `Blob blob`: The sampled blob. + - `SampleErr err`: Error occurred during sampling. + +### Session Control +- **CloseMsg**: Message to close a session with a reason. + - `enum CloseReason`: Enumeration of close reasons. + - `GRACEFUL_SHUTDOWN`: Graceful shutdown of the session. + - `SUBNET_CHANGE`: Change in the subnet. + - `SUBNET_SAMPLE_FAIL`: Subnet sample failure. + - `CloseReason reason`: Reason for closing the session. + +- **SessionReq**: Request message for session control. + - `oneof message_type`: Contains one of the following message types. + - `CloseMsg close_msg`: Message to close the session. + +### DispersalMessage +- **DispersalMessage**: Wrapper message for different types of dispersal and sampling messages. + - `oneof message_type`: Contains one of the following message types. + - `DispersalReq dispersal_req`: Dispersal request. + - `DispersalRes dispersal_res`: Dispersal response. + - `SampleReq sample_req`: Sample request. + - `SampleRes sample_res`: Sample response. + +## Protobuf + +To generate the updated protobuf serializer from `dispersal.proto`, run the following command: + +```bash +protoc --python_out=. dispersal.proto +``` + +This will generate the necessary Python code to serialize and deserialize the messages defined in the `dispersal.proto` file. diff --git a/da/executor/__init__.py b/da/network/dispersal/__init__.py similarity index 100% rename from da/executor/__init__.py rename to da/network/dispersal/__init__.py diff --git a/da/executor/dispersal.proto b/da/network/dispersal/dispersal.proto similarity index 52% rename from da/executor/dispersal.proto rename to da/network/dispersal/dispersal.proto index 615c994..99fe9eb 100644 --- a/da/executor/dispersal.proto +++ b/da/network/dispersal/dispersal.proto @@ -1,21 +1,24 @@ syntax = "proto3"; -package nomos.da.dispersal; +package nomos.da.dispersal.v1; message Blob { bytes blob_id = 1; bytes data = 2; } -message Error { - string description = 1; -} +// DISPERSAL message DispersalErr { - oneof message_type { - Error chunk_size_err = 1; - Error verification_err = 2; + bytes blob_id = 1; + + enum DispersalErrType { + CHUNK_SIZE = 0; + VERIFICATION = 1; } + + DispersalErrType err_type = 2; + string err_description = 3; } message DispersalReq { @@ -29,10 +32,17 @@ message DispersalRes { } } +// SAMPLING + message SampleErr { - oneof message_type { - Error not_found = 1; + bytes blob_id = 1; + + enum SampleErrType { + NOT_FOUND = 0; } + + SampleErrType err_type = 2; + string err_description = 3; } message SampleReq { @@ -46,6 +56,26 @@ message SampleRes { } } +// SESSION CONTROL + +message CloseMsg { + enum CloseReason { + GRACEFUL_SHUTDOWN = 0; + SUBNET_CHANGE = 1; + SUBNET_SAMPLE_FAIL = 2; + } + + CloseReason reason = 1; +} + +message SessionReq { + oneof message_type { + CloseMsg close_msg = 1; + } +} + +// WRAPPER MESSAGE + message DispersalMessage { oneof message_type { DispersalReq dispersal_req = 1; diff --git a/da/network/dispersal/dispersal_pb2.py b/da/network/dispersal/dispersal_pb2.py new file mode 100644 index 0000000..5d7ae35 --- /dev/null +++ b/da/network/dispersal/dispersal_pb2.py @@ -0,0 +1,54 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: dispersal.proto +# Protobuf Python Version: 5.27.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 27, + 1, + '', + 'dispersal.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x64ispersal.proto\x12\x12nomos.da.dispersal\"%\n\x04\x42lob\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\xb3\x01\n\x0c\x44ispersalErr\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x43\n\x08\x65rr_type\x18\x02 \x01(\x0e\x32\x31.nomos.da.dispersal.DispersalErr.DispersalErrType\x12\x17\n\x0f\x65rr_description\x18\x03 \x01(\t\"4\n\x10\x44ispersalErrType\x12\x0e\n\nCHUNK_SIZE\x10\x00\x12\x10\n\x0cVERIFICATION\x10\x01\"6\n\x0c\x44ispersalReq\x12&\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x18.nomos.da.dispersal.Blob\"b\n\x0c\x44ispersalRes\x12\x11\n\x07\x62lob_id\x18\x01 \x01(\x0cH\x00\x12/\n\x03\x65rr\x18\x02 \x01(\x0b\x32 .nomos.da.dispersal.DispersalErrH\x00\x42\x0e\n\x0cmessage_type\"\x94\x01\n\tSampleErr\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12=\n\x08\x65rr_type\x18\x02 \x01(\x0e\x32+.nomos.da.dispersal.SampleErr.SampleErrType\x12\x17\n\x0f\x65rr_description\x18\x03 \x01(\t\"\x1e\n\rSampleErrType\x12\r\n\tNOT_FOUND\x10\x00\"\x1c\n\tSampleReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\"s\n\tSampleRes\x12(\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x18.nomos.da.dispersal.BlobH\x00\x12,\n\x03\x65rr\x18\x02 \x01(\x0b\x32\x1d.nomos.da.dispersal.SampleErrH\x00\x42\x0e\n\x0cmessage_type\"\x82\x02\n\x10\x44ispersalMessage\x12\x39\n\rdispersal_req\x18\x01 \x01(\x0b\x32 .nomos.da.dispersal.DispersalReqH\x00\x12\x39\n\rdispersal_res\x18\x02 \x01(\x0b\x32 .nomos.da.dispersal.DispersalResH\x00\x12\x33\n\nsample_req\x18\x03 \x01(\x0b\x32\x1d.nomos.da.dispersal.SampleReqH\x00\x12\x33\n\nsample_res\x18\x04 \x01(\x0b\x32\x1d.nomos.da.dispersal.SampleResH\x00\x42\x0e\n\x0cmessage_typeb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispersal_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_BLOB']._serialized_start=39 + _globals['_BLOB']._serialized_end=76 + _globals['_DISPERSALERR']._serialized_start=79 + _globals['_DISPERSALERR']._serialized_end=258 + _globals['_DISPERSALERR_DISPERSALERRTYPE']._serialized_start=206 + _globals['_DISPERSALERR_DISPERSALERRTYPE']._serialized_end=258 + _globals['_DISPERSALREQ']._serialized_start=260 + _globals['_DISPERSALREQ']._serialized_end=314 + _globals['_DISPERSALRES']._serialized_start=316 + _globals['_DISPERSALRES']._serialized_end=414 + _globals['_SAMPLEERR']._serialized_start=417 + _globals['_SAMPLEERR']._serialized_end=565 + _globals['_SAMPLEERR_SAMPLEERRTYPE']._serialized_start=535 + _globals['_SAMPLEERR_SAMPLEERRTYPE']._serialized_end=565 + _globals['_SAMPLEREQ']._serialized_start=567 + _globals['_SAMPLEREQ']._serialized_end=595 + _globals['_SAMPLERES']._serialized_start=597 + _globals['_SAMPLERES']._serialized_end=712 + _globals['_DISPERSALMESSAGE']._serialized_start=715 + _globals['_DISPERSALMESSAGE']._serialized_end=973 +# @@protoc_insertion_point(module_scope) diff --git a/da/network/dispersal/mock_system.py b/da/network/dispersal/mock_system.py new file mode 100644 index 0000000..6d7ebd2 --- /dev/null +++ b/da/network/dispersal/mock_system.py @@ -0,0 +1,123 @@ +import asyncio +import argparse +import proto +from itertools import count + +conn_id_counter = count(start=1) + +class MockTransport: + def __init__(self, conn_id, reader, writer, handler): + self.conn_id = conn_id + self.reader = reader + self.writer = writer + self.handler = handler + + async def read_and_process(self): + try: + while True: + message = await proto.parse_from_reader(self.reader) + await self.handler(self.conn_id, self.writer, message) + except Exception as e: + print(f"MockTransport: An error occurred: {e}") + finally: + self.writer.close() + await self.writer.wait_closed() + + async def write(self, message): + self.writer.write(message) + await self.writer.drain() + + +class MockNode: + def __init__(self, addr, port, handler=None): + self.addr = addr + self.port = port + self.handler = handler if handler else self._handle + + async def _on_conn(self, reader, writer): + conn_id = next(conn_id_counter) + transport = MockTransport(conn_id, reader, writer, self.handler) + await transport.read_and_process() + + async def _handle(self, conn_id, writer, message): + if message.HasField('dispersal_req'): + blob_id = message.dispersal_req.blob.blob_id + data = message.dispersal_req.blob.data + print(f"MockNode: Received DispersalRes: blob_id={blob_id}; data={data}") + # Imitate succesful verification. + writer.write(proto.new_dispersal_res_success_msg(blob_id)) + elif message.HasField('sample_req'): + print(f"MockNode: Received SampleRes: blob_id={message.sample_req.blob_id}") + else: + print(f"MockNode: Received unknown message: {message} ") + + async def run(self): + server = await asyncio.start_server( + self._on_conn, self.addr, self.port + ) + print(f"MockNode: Server started at {self.addr}:{self.port}") + async with server: + await server.serve_forever() + + +class MockExecutor: + def __init__(self, addr, port, col_num, executor=None, handler=None): + self.addr = addr + self.port = port + self.col_num = col_num + self.connections = [] + self.interval = 10 + self.executor = executor if executor else self._execute + self.handler = handler if handler else self._handle + + async def _execute(self): + message = proto.new_dispersal_req_msg(b"dummy_blob_id", b"dummy_data") + while True: + try: + await asyncio.gather(*[t.write(message) for t in self.connections]) + await asyncio.sleep(self.interval) + except asyncio.CancelledError: + break + except Exception as e: + print(f"MockExecutor: Error during message sending: {e}") + + async def _handle(self, conn_id, writer, message): + if message.HasField('dispersal_res'): + print(f"MockExecutor: Received DispersalRes: blob_id={message.dispersal_res.blob_id}") + elif message.HasField('sample_res'): + print(f"MockExecutor: Received SampleRes: blob_id={message.sample_res.blob_id}") + else: + print(f"MockExecutor: Received unknown message: {message}") + + async def _connect(self): + try: + reader, writer = await asyncio.open_connection(self.addr, self.port) + conn_id = len(self.connections) + transport = MockTransport(conn_id, reader, writer, self.handler) + self.connections.append(transport) + print(f"MockExecutor: Connected to {self.addr}:{self.port}, ID: {conn_id}") + asyncio.create_task(transport.read_and_process()) + except Exception as e: + print(f"MockExecutor: Failed to connect or lost connection: {e}") + + async def run(self): + await asyncio.gather(*(self._connect() for _ in range(self.col_num))) + await self._execute() + + +class MockSystem: + def __init__(self, addr='localhost'): + self.addr = addr + + async def run_node_with_executor(self, col_number): + node = MockNode(self.addr, 8888) + executor = MockExecutor(self.addr, 8888, col_number) + await asyncio.gather(node.run(), executor.run()) + + +def main(): + app = MockSystem() + asyncio.run(app.run_node_with_executor(1)) + +if __name__ == '__main__': + main() diff --git a/da/executor/proto.py b/da/network/dispersal/proto.py similarity index 61% rename from da/executor/proto.py rename to da/network/dispersal/proto.py index 6c5baa8..022c833 100644 --- a/da/executor/proto.py +++ b/da/network/dispersal/proto.py @@ -20,6 +20,9 @@ def unpack_message(data): message.ParseFromString(data) return message + +# DISPERSAL + def new_dispersal_req_msg(blob_id, data): blob = dispersal_pb2.Blob(blob_id=blob_id, data=data) dispersal_req = dispersal_pb2.DispersalReq(blob=blob) @@ -31,20 +34,27 @@ def new_dispersal_res_success_msg(blob_id): dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res) return pack_message(dispersal_message) -def new_dispersal_res_chunk_size_error_msg(description): - error = dispersal_pb2.Error(description=description) - dispersal_err = dispersal_pb2.DispersalErr(chunk_size_err=error) +def new_dispersal_res_chunk_size_error_msg(blob_id, description): + dispersal_err = dispersal_pb2.DispersalErr( + blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.CHUNK_SIZE, + err_description=description + ) dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err) dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res) return pack_message(dispersal_message) -def new_dispersal_res_verification_error_msg(description): - error = dispersal_pb2.Error(description=description) - dispersal_err = dispersal_pb2.DispersalErr(verification_err=error) +def new_dispersal_res_verification_error_msg(blob_id, description): + dispersal_err = dispersal_pb2.DispersalErr( + blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.VERIFICATION, + err_description=description + ) dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err) dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res) return pack_message(dispersal_message) + +# SAMPLING + def new_sample_req_msg(blob_id): sample_req = dispersal_pb2.SampleReq(blob_id=blob_id) dispersal_message = dispersal_pb2.DispersalMessage(sample_req=sample_req) @@ -56,9 +66,33 @@ def new_sample_res_success_msg(blob_id, data): dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res) return pack_message(dispersal_message) -def new_sample_res_not_found_error_msg(description): - error = dispersal_pb2.Error(description=description) - sample_err = dispersal_pb2.SampleErr(not_found=error) +def new_sample_res_not_found_error_msg(blob_id, description): + sample_err = dispersal_pb2.SampleErr( + blob_id=blob_id, err_type=dispersal_pb2.SampleErr.NOT_FOUND, + err_description=description + ) sample_res = dispersal_pb2.SampleRes(err=sample_err) dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res) return pack_message(dispersal_message) + + +# SESSION CONTROL + +def new_close_msg(reason): + close_msg = dispersal_pb2.CloseMsg(reason=reason) + return close_msg + +def new_session_req_close_msg(reason): + close_msg = new_close_msg(reason) + session_req = dispersal_pb2.SessionReq(close_msg=close_msg) + dispersal_message = dispersal_pb2.DispersalMessage(session_req=session_req) + return pack_message(dispersal_message) + +def new_session_req_graceful_shutdown_msg(): + new_session_req_close_msg(dispersal_pb2.CloseMsg.GRACEFUL_SHUTDOWN) + +def new_session_req_subnet_change_msg(): + new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_CHANGE) + +def new_session_req_subnet_sample_fail_msg(): + new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_SAMPLE_FAIL)