Direct DA connection protocol (#100)

* WIP: Direct DA connection protocol

* Dispersal wrapper messages for executor to node communication

* Reusable types in dispersal.proto

* Add helper methods for dispersal errors

* Remove redundant BlobId type

* Write to connections in parallel

* Explicit package name for dispersal proto messages

* Parse length+message in the proto module

* Move dispersal proto into subnets poc

* Test for protocol message helper functions

* Use dispersal proto message in poc executor
gusto 2024-07-19 10:10:56 +03:00 committed by GitHub
parent 376c66485b
commit 4d49fba52a
9 changed files with 535 additions and 1 deletion

View File

@@ -0,0 +1,76 @@
# Zone Executor to Nomos DA Communication
Protocol for communication between the Zone Executor and Nomos DA using Protocol Buffers (protobuf).
## Overview
The protocol defines the messages used for data dispersal requests and responses, sampling operations, and session control within the Nomos DA system. Communication is based on the exchange of blobs (binary large objects), with dedicated error messages for each kind of operation.
## Messages
### Blob
- **Blob**: Represents the binary data to be dispersed.
  - `bytes blob_id`: Unique identifier for the blob.
  - `bytes data`: The binary data of the blob.
### Error Handling
- **DispersalErr**: Represents errors related to dispersal operations.
  - `bytes blob_id`: Unique identifier of the blob related to the error.
  - `enum DispersalErrType`: Enumeration of dispersal error types.
    - `CHUNK_SIZE`: Error due to incorrect chunk size.
    - `VERIFICATION`: Error due to verification failure.
  - `string err_description`: Description of the error.
- **SampleErr**: Represents errors related to sample operations.
  - `bytes blob_id`: Unique identifier of the blob related to the error.
  - `enum SampleErrType`: Enumeration of sample error types.
    - `NOT_FOUND`: Error when a blob is not found.
  - `string err_description`: Description of the error.
### Dispersal
- **DispersalReq**: Request message for dispersing a blob.
  - `Blob blob`: The blob to be dispersed.
- **DispersalRes**: Response message for a dispersal request.
  - `oneof message_type`: Contains either a success response or an error.
    - `bytes blob_id`: Unique identifier of the dispersed blob.
    - `DispersalErr err`: Error that occurred during dispersal.
### Sample
- **SampleReq**: Request message for sampling a blob.
  - `bytes blob_id`: Unique identifier of the blob to be sampled.
- **SampleRes**: Response message for a sample request.
  - `oneof message_type`: Contains either a success response or an error.
    - `Blob blob`: The sampled blob.
    - `SampleErr err`: Error that occurred during sampling.
### Session Control
- **CloseMsg**: Message to close a session with a reason.
  - `enum CloseReason`: Enumeration of close reasons.
    - `GRACEFUL_SHUTDOWN`: Graceful shutdown of the session.
    - `SUBNET_CHANGE`: Change in the subnet.
    - `SUBNET_SAMPLE_FAIL`: Subnet sample failure.
  - `CloseReason reason`: Reason for closing the session.
- **SessionReq**: Request message for session control.
  - `oneof message_type`: Contains one of the following message types.
    - `CloseMsg close_msg`: Message to close the session.
### DispersalMessage
- **DispersalMessage**: Wrapper message for the different dispersal, sampling, and session control messages (see the sketch below).
  - `oneof message_type`: Contains one of the following message types.
    - `DispersalReq dispersal_req`: Dispersal request.
    - `DispersalRes dispersal_res`: Dispersal response.
    - `SampleReq sample_req`: Sample request.
    - `SampleRes sample_res`: Sample response.
    - `SessionReq session_req`: Session control request.
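
The wrapper lets a single connection carry every protocol message, and the receiver dispatches on whichever `oneof` field is set. A minimal sketch, assuming the `dispersal_pb2` bindings have been generated with the `protoc` command in the next section (the blob id here is illustrative):

```python
import dispersal_pb2

# Wrap a sample request in the DispersalMessage oneof.
message = dispersal_pb2.DispersalMessage(
    sample_req=dispersal_pb2.SampleReq(blob_id=b"example_blob_id")
)

# The receiver inspects which oneof field is set and dispatches on it.
kind = message.WhichOneof("message_type")
if kind == "sample_req":
    print("sample request for blob", message.sample_req.blob_id)
elif kind == "dispersal_req":
    print("dispersal request for blob", message.dispersal_req.blob.blob_id)
```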
## Protobuf
To generate the updated protobuf serializer from `dispersal.proto`, run the following command:
```bash
protoc --python_out=. dispersal.proto
```
This will generate the necessary Python code to serialize and deserialize the messages defined in the `dispersal.proto` file.
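
The accompanying `proto.py` helpers frame each serialized `DispersalMessage` with a 2-byte big-endian length prefix before writing it to a connection. A minimal round-trip sketch of that framing, assuming the generated `dispersal_pb2` module is importable (the helper names below are illustrative, not the module's own API):

```python
import dispersal_pb2

MAX_MSG_LEN_BYTES = 2  # same prefix width used by proto.py


def frame(message: dispersal_pb2.DispersalMessage) -> bytes:
    # Serialize the message and prepend its length as a 2-byte big-endian prefix.
    data = message.SerializeToString()
    return len(data).to_bytes(MAX_MSG_LEN_BYTES, byteorder="big") + data


def unframe(frame_bytes: bytes) -> dispersal_pb2.DispersalMessage:
    # Read the length prefix, then parse exactly that many bytes as a DispersalMessage.
    length = int.from_bytes(frame_bytes[:MAX_MSG_LEN_BYTES], byteorder="big")
    message = dispersal_pb2.DispersalMessage()
    message.ParseFromString(frame_bytes[MAX_MSG_LEN_BYTES:MAX_MSG_LEN_BYTES + length])
    return message


# Round trip a dispersal request for a dummy blob.
blob = dispersal_pb2.Blob(blob_id=b"example_blob_id", data=b"example_data")
request = dispersal_pb2.DispersalMessage(
    dispersal_req=dispersal_pb2.DispersalReq(blob=blob)
)
assert unframe(frame(request)).dispersal_req.blob.blob_id == b"example_blob_id"
```

A 2-byte prefix caps a single frame at 65,535 bytes.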

View File

View File

@@ -0,0 +1,87 @@
syntax = "proto3";

package nomos.da.dispersal.v1;

message Blob {
  bytes blob_id = 1;
  bytes data = 2;
}

// DISPERSAL

message DispersalErr {
  bytes blob_id = 1;
  enum DispersalErrType {
    CHUNK_SIZE = 0;
    VERIFICATION = 1;
  }
  DispersalErrType err_type = 2;
  string err_description = 3;
}

message DispersalReq {
  Blob blob = 1;
}

message DispersalRes {
  oneof message_type {
    bytes blob_id = 1;
    DispersalErr err = 2;
  }
}

// SAMPLING

message SampleErr {
  bytes blob_id = 1;
  enum SampleErrType {
    NOT_FOUND = 0;
  }
  SampleErrType err_type = 2;
  string err_description = 3;
}

message SampleReq {
  bytes blob_id = 1;
}

message SampleRes {
  oneof message_type {
    Blob blob = 1;
    SampleErr err = 2;
  }
}

// SESSION CONTROL

message CloseMsg {
  enum CloseReason {
    GRACEFUL_SHUTDOWN = 0;
    SUBNET_CHANGE = 1;
    SUBNET_SAMPLE_FAIL = 2;
  }
  CloseReason reason = 1;
}

message SessionReq {
  oneof message_type {
    CloseMsg close_msg = 1;
  }
}

// WRAPPER MESSAGE

message DispersalMessage {
  oneof message_type {
    DispersalReq dispersal_req = 1;
    DispersalRes dispersal_res = 2;
    SampleReq sample_req = 3;
    SampleRes sample_res = 4;
    SessionReq session_req = 5;
  }
}

View File

@@ -0,0 +1,60 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: dispersal.proto
# Protobuf Python Version: 5.27.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
    _runtime_version.Domain.PUBLIC,
    5,
    27,
    1,
    '',
    'dispersal.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x64ispersal.proto\x12\x15nomos.da.dispersal.v1\"%\n\x04\x42lob\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\xb6\x01\n\x0c\x44ispersalErr\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x46\n\x08\x65rr_type\x18\x02 \x01(\x0e\x32\x34.nomos.da.dispersal.v1.DispersalErr.DispersalErrType\x12\x17\n\x0f\x65rr_description\x18\x03 \x01(\t\"4\n\x10\x44ispersalErrType\x12\x0e\n\nCHUNK_SIZE\x10\x00\x12\x10\n\x0cVERIFICATION\x10\x01\"9\n\x0c\x44ispersalReq\x12)\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x1b.nomos.da.dispersal.v1.Blob\"e\n\x0c\x44ispersalRes\x12\x11\n\x07\x62lob_id\x18\x01 \x01(\x0cH\x00\x12\x32\n\x03\x65rr\x18\x02 \x01(\x0b\x32#.nomos.da.dispersal.v1.DispersalErrH\x00\x42\x0e\n\x0cmessage_type\"\x97\x01\n\tSampleErr\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12@\n\x08\x65rr_type\x18\x02 \x01(\x0e\x32..nomos.da.dispersal.v1.SampleErr.SampleErrType\x12\x17\n\x0f\x65rr_description\x18\x03 \x01(\t\"\x1e\n\rSampleErrType\x12\r\n\tNOT_FOUND\x10\x00\"\x1c\n\tSampleReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\"y\n\tSampleRes\x12+\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x1b.nomos.da.dispersal.v1.BlobH\x00\x12/\n\x03\x65rr\x18\x02 \x01(\x0b\x32 .nomos.da.dispersal.v1.SampleErrH\x00\x42\x0e\n\x0cmessage_type\"\x98\x01\n\x08\x43loseMsg\x12;\n\x06reason\x18\x01 \x01(\x0e\x32+.nomos.da.dispersal.v1.CloseMsg.CloseReason\"O\n\x0b\x43loseReason\x12\x15\n\x11GRACEFUL_SHUTDOWN\x10\x00\x12\x11\n\rSUBNET_CHANGE\x10\x01\x12\x16\n\x12SUBNET_SAMPLE_FAIL\x10\x02\"R\n\nSessionReq\x12\x34\n\tclose_msg\x18\x01 \x01(\x0b\x32\x1f.nomos.da.dispersal.v1.CloseMsgH\x00\x42\x0e\n\x0cmessage_type\"\xc8\x02\n\x10\x44ispersalMessage\x12<\n\rdispersal_req\x18\x01 \x01(\x0b\x32#.nomos.da.dispersal.v1.DispersalReqH\x00\x12<\n\rdispersal_res\x18\x02 \x01(\x0b\x32#.nomos.da.dispersal.v1.DispersalResH\x00\x12\x36\n\nsample_req\x18\x03 \x01(\x0b\x32 .nomos.da.dispersal.v1.SampleReqH\x00\x12\x36\n\nsample_res\x18\x04 \x01(\x0b\x32 .nomos.da.dispersal.v1.SampleResH\x00\x12\x38\n\x0bsession_req\x18\x05 \x01(\x0b\x32!.nomos.da.dispersal.v1.SessionReqH\x00\x42\x0e\n\x0cmessage_typeb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispersal_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
  DESCRIPTOR._loaded_options = None
  _globals['_BLOB']._serialized_start=42
  _globals['_BLOB']._serialized_end=79
  _globals['_DISPERSALERR']._serialized_start=82
  _globals['_DISPERSALERR']._serialized_end=264
  _globals['_DISPERSALERR_DISPERSALERRTYPE']._serialized_start=212
  _globals['_DISPERSALERR_DISPERSALERRTYPE']._serialized_end=264
  _globals['_DISPERSALREQ']._serialized_start=266
  _globals['_DISPERSALREQ']._serialized_end=323
  _globals['_DISPERSALRES']._serialized_start=325
  _globals['_DISPERSALRES']._serialized_end=426
  _globals['_SAMPLEERR']._serialized_start=429
  _globals['_SAMPLEERR']._serialized_end=580
  _globals['_SAMPLEERR_SAMPLEERRTYPE']._serialized_start=550
  _globals['_SAMPLEERR_SAMPLEERRTYPE']._serialized_end=580
  _globals['_SAMPLEREQ']._serialized_start=582
  _globals['_SAMPLEREQ']._serialized_end=610
  _globals['_SAMPLERES']._serialized_start=612
  _globals['_SAMPLERES']._serialized_end=733
  _globals['_CLOSEMSG']._serialized_start=736
  _globals['_CLOSEMSG']._serialized_end=888
  _globals['_CLOSEMSG_CLOSEREASON']._serialized_start=809
  _globals['_CLOSEMSG_CLOSEREASON']._serialized_end=888
  _globals['_SESSIONREQ']._serialized_start=890
  _globals['_SESSIONREQ']._serialized_end=972
  _globals['_DISPERSALMESSAGE']._serialized_start=975
  _globals['_DISPERSALMESSAGE']._serialized_end=1303
# @@protoc_insertion_point(module_scope)

View File

@@ -0,0 +1,123 @@
import asyncio
import argparse
import proto

from itertools import count

conn_id_counter = count(start=1)


class MockTransport:
    def __init__(self, conn_id, reader, writer, handler):
        self.conn_id = conn_id
        self.reader = reader
        self.writer = writer
        self.handler = handler

    async def read_and_process(self):
        try:
            while True:
                message = await proto.unpack_from_reader(self.reader)
                await self.handler(self.conn_id, self.writer, message)
        except Exception as e:
            print(f"MockTransport: An error occurred: {e}")
        finally:
            self.writer.close()
            await self.writer.wait_closed()

    async def write(self, message):
        self.writer.write(message)
        await self.writer.drain()


class MockNode:
    def __init__(self, addr, port, handler=None):
        self.addr = addr
        self.port = port
        self.handler = handler if handler else self._handle

    async def _on_conn(self, reader, writer):
        conn_id = next(conn_id_counter)
        transport = MockTransport(conn_id, reader, writer, self.handler)
        await transport.read_and_process()

    async def _handle(self, conn_id, writer, message):
        if message.HasField('dispersal_req'):
            blob_id = message.dispersal_req.blob.blob_id
            data = message.dispersal_req.blob.data
            print(f"MockNode: Received DispersalReq: blob_id={blob_id}; data={data}")
            # Imitate successful verification.
            writer.write(proto.new_dispersal_res_success_msg(blob_id))
        elif message.HasField('sample_req'):
            print(f"MockNode: Received SampleReq: blob_id={message.sample_req.blob_id}")
        else:
            print(f"MockNode: Received unknown message: {message}")

    async def run(self):
        server = await asyncio.start_server(
            self._on_conn, self.addr, self.port
        )
        print(f"MockNode: Server started at {self.addr}:{self.port}")
        async with server:
            await server.serve_forever()


class MockExecutor:
    def __init__(self, addr, port, col_num, executor=None, handler=None):
        self.addr = addr
        self.port = port
        self.col_num = col_num
        self.connections = []
        self.interval = 10
        self.executor = executor if executor else self._execute
        self.handler = handler if handler else self._handle

    async def _execute(self):
        message = proto.new_dispersal_req_msg(b"dummy_blob_id", b"dummy_data")
        while True:
            try:
                await asyncio.gather(*[t.write(message) for t in self.connections])
                await asyncio.sleep(self.interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                print(f"MockExecutor: Error during message sending: {e}")

    async def _handle(self, conn_id, writer, message):
        if message.HasField('dispersal_res'):
            print(f"MockExecutor: Received DispersalRes: blob_id={message.dispersal_res.blob_id}")
        elif message.HasField('sample_res'):
            print(f"MockExecutor: Received SampleRes: blob_id={message.sample_res.blob_id}")
        else:
            print(f"MockExecutor: Received unknown message: {message}")

    async def _connect(self):
        try:
            reader, writer = await asyncio.open_connection(self.addr, self.port)
            conn_id = len(self.connections)
            transport = MockTransport(conn_id, reader, writer, self.handler)
            self.connections.append(transport)
            print(f"MockExecutor: Connected to {self.addr}:{self.port}, ID: {conn_id}")
            asyncio.create_task(transport.read_and_process())
        except Exception as e:
            print(f"MockExecutor: Failed to connect or lost connection: {e}")

    async def run(self):
        await asyncio.gather(*(self._connect() for _ in range(self.col_num)))
        await self.executor()


class MockSystem:
    def __init__(self, addr='localhost'):
        self.addr = addr

    async def run_node_with_executor(self, col_number):
        node = MockNode(self.addr, 8888)
        executor = MockExecutor(self.addr, 8888, col_number)
        await asyncio.gather(node.run(), executor.run())


def main():
    app = MockSystem()
    asyncio.run(app.run_node_with_executor(1))


if __name__ == '__main__':
    main()

View File

@@ -0,0 +1,106 @@
import dispersal_pb2

from itertools import count

MAX_MSG_LEN_BYTES = 2


def pack_message(message):
    # SerializeToString method returns an instance of bytes.
    data = message.SerializeToString()
    length_prefix = len(data).to_bytes(MAX_MSG_LEN_BYTES, byteorder='big')
    return length_prefix + data


async def unpack_from_reader(reader):
    length_prefix = await reader.readexactly(MAX_MSG_LEN_BYTES)
    data_length = int.from_bytes(length_prefix, byteorder='big')
    data = await reader.readexactly(data_length)
    return parse(data)


def unpack_from_bytes(data):
    length_prefix = data[:MAX_MSG_LEN_BYTES]
    data_length = int.from_bytes(length_prefix, byteorder='big')
    return parse(data[MAX_MSG_LEN_BYTES:MAX_MSG_LEN_BYTES + data_length])


def parse(data):
    message = dispersal_pb2.DispersalMessage()
    message.ParseFromString(data)
    return message


# DISPERSAL

def new_dispersal_req_msg(blob_id, data):
    blob = dispersal_pb2.Blob(blob_id=blob_id, data=data)
    dispersal_req = dispersal_pb2.DispersalReq(blob=blob)
    dispersal_message = dispersal_pb2.DispersalMessage(dispersal_req=dispersal_req)
    return pack_message(dispersal_message)


def new_dispersal_res_success_msg(blob_id):
    dispersal_res = dispersal_pb2.DispersalRes(blob_id=blob_id)
    dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
    return pack_message(dispersal_message)


def new_dispersal_res_chunk_size_error_msg(blob_id, description):
    dispersal_err = dispersal_pb2.DispersalErr(
        blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.CHUNK_SIZE,
        err_description=description
    )
    dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
    dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
    return pack_message(dispersal_message)


def new_dispersal_res_verification_error_msg(blob_id, description):
    dispersal_err = dispersal_pb2.DispersalErr(
        blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.VERIFICATION,
        err_description=description
    )
    dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
    dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
    return pack_message(dispersal_message)


# SAMPLING

def new_sample_req_msg(blob_id):
    sample_req = dispersal_pb2.SampleReq(blob_id=blob_id)
    dispersal_message = dispersal_pb2.DispersalMessage(sample_req=sample_req)
    return pack_message(dispersal_message)


def new_sample_res_success_msg(blob_id, data):
    blob = dispersal_pb2.Blob(blob_id=blob_id, data=data)
    sample_res = dispersal_pb2.SampleRes(blob=blob)
    dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
    return pack_message(dispersal_message)


def new_sample_res_not_found_error_msg(blob_id, description):
    sample_err = dispersal_pb2.SampleErr(
        blob_id=blob_id, err_type=dispersal_pb2.SampleErr.NOT_FOUND,
        err_description=description
    )
    sample_res = dispersal_pb2.SampleRes(err=sample_err)
    dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
    return pack_message(dispersal_message)


# SESSION CONTROL

def new_close_msg(reason):
    close_msg = dispersal_pb2.CloseMsg(reason=reason)
    return close_msg


def new_session_req_close_msg(reason):
    close_msg = new_close_msg(reason)
    session_req = dispersal_pb2.SessionReq(close_msg=close_msg)
    dispersal_message = dispersal_pb2.DispersalMessage(session_req=session_req)
    return dispersal_message


def new_session_req_graceful_shutdown_msg():
    message = new_session_req_close_msg(dispersal_pb2.CloseMsg.GRACEFUL_SHUTDOWN)
    return pack_message(message)


def new_session_req_subnet_change_msg():
    message = new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_CHANGE)
    return pack_message(message)


def new_session_req_subnet_sample_fail_msg():
    message = new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_SAMPLE_FAIL)
    return pack_message(message)

View File

@@ -0,0 +1,75 @@
import dispersal_pb2
import proto

from unittest import TestCase


class TestMessageSerialization(TestCase):
    def test_dispersal_req_msg(self):
        blob_id = b"dummy_blob_id"
        data = b"dummy_data"
        packed_message = proto.new_dispersal_req_msg(blob_id, data)
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('dispersal_req'))
        self.assertEqual(message.dispersal_req.blob.blob_id, blob_id)
        self.assertEqual(message.dispersal_req.blob.data, data)

    def test_dispersal_res_success_msg(self):
        blob_id = b"dummy_blob_id"
        packed_message = proto.new_dispersal_res_success_msg(blob_id)
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('dispersal_res'))
        self.assertEqual(message.dispersal_res.blob_id, blob_id)

    def test_dispersal_res_chunk_size_error_msg(self):
        blob_id = b"dummy_blob_id"
        description = "Chunk size error"
        packed_message = proto.new_dispersal_res_chunk_size_error_msg(blob_id, description)
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('dispersal_res'))
        self.assertEqual(message.dispersal_res.err.blob_id, blob_id)
        self.assertEqual(message.dispersal_res.err.err_type, dispersal_pb2.DispersalErr.CHUNK_SIZE)
        self.assertEqual(message.dispersal_res.err.err_description, description)

    def test_dispersal_res_verification_error_msg(self):
        blob_id = b"dummy_blob_id"
        description = "Verification error"
        packed_message = proto.new_dispersal_res_verification_error_msg(blob_id, description)
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('dispersal_res'))
        self.assertEqual(message.dispersal_res.err.blob_id, blob_id)
        self.assertEqual(message.dispersal_res.err.err_type, dispersal_pb2.DispersalErr.VERIFICATION)
        self.assertEqual(message.dispersal_res.err.err_description, description)

    def test_sample_req_msg(self):
        blob_id = b"dummy_blob_id"
        packed_message = proto.new_sample_req_msg(blob_id)
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('sample_req'))
        self.assertEqual(message.sample_req.blob_id, blob_id)

    def test_sample_res_success_msg(self):
        blob_id = b"dummy_blob_id"
        data = b"dummy_data"
        packed_message = proto.new_sample_res_success_msg(blob_id, data)
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('sample_res'))
        self.assertEqual(message.sample_res.blob.blob_id, blob_id)
        self.assertEqual(message.sample_res.blob.data, data)

    def test_sample_res_not_found_error_msg(self):
        blob_id = b"dummy_blob_id"
        description = "Blob not found"
        packed_message = proto.new_sample_res_not_found_error_msg(blob_id, description)
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('sample_res'))
        self.assertEqual(message.sample_res.err.blob_id, blob_id)
        self.assertEqual(message.sample_res.err.err_type, dispersal_pb2.SampleErr.NOT_FOUND)
        self.assertEqual(message.sample_res.err.err_description, description)

    def test_session_req_close_msg(self):
        reason = dispersal_pb2.CloseMsg.GRACEFUL_SHUTDOWN
        packed_message = proto.new_session_req_graceful_shutdown_msg()
        message = proto.unpack_from_bytes(packed_message)
        self.assertTrue(message.HasField('session_req'))
        self.assertTrue(message.session_req.HasField('close_msg'))
        self.assertEqual(message.session_req.close_msg.reason, reason)

View File

@@ -9,6 +9,7 @@ from constants import HASH_LENGTH, PROTOCOL_ID
from libp2p import host, new_host
from libp2p.network.stream.net_stream_interface import INetStream
from libp2p.peer.peerinfo import info_from_p2p_addr
from da.network.dispersal import proto
class Executor:
@@ -94,4 +95,9 @@ class Executor:
        Send data to peer (async)
        The index is the subnet number
        """
        await stream.write(self.data[index])
        blob_id = sha256(self.data)
        blob_data = self.data[index]
        message = proto.new_dispersal_req_msg(blob_id, blob_data)
        await stream.write(message)

View File

@@ -13,3 +13,4 @@ portalocker==2.8.2 # portable file locking
keum==0.2.0 # for CL's use of more obscure curves
poseidon-hash==0.1.4 # used as the algebraic hash in CL
hypothesis==6.103.0
protobuf==5.27.2