Dispersal wrapper messages for executor-to-node communication

This commit is contained in:
Gusto 2024-07-11 14:03:13 +03:00
parent 775d9fc6b4
commit dc1fa97fa3
No known key found for this signature in database
8 changed files with 129 additions and 63 deletions

View File

@ -0,0 +1,31 @@
// Wire-protocol messages for executor-to-node dispersal traffic.
syntax = "proto3";
package dispersal;
// Request to store (disperse) a blob on a node.
message DispersalReq {
// NOTE(review): blob_id is bytes here but int32 in every other message
// below — confirm the id type mismatch is intentional.
bytes blob_id = 1;
bytes data = 2;
}
// Node's acknowledgement of a dispersal request.
message DispersalRes {
int32 blob_id = 1;
}
// Request to sample (read back) a previously dispersed blob.
message SampleReq {
int32 blob_id = 1;
}
// Response carrying the sampled blob data.
message SampleRes {
int32 blob_id = 1;
bytes data = 2;
}
// Envelope: exactly one of the four message variants per wire frame.
message DispersalMessage {
oneof message_type {
DispersalReq dispersal_req = 1;
DispersalRes dispersal_res = 2;
SampleReq sample_req = 3;
SampleRes sample_res = 4;
}
}

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: dispersal.proto
# Protobuf Python Version: 5.27.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
# Refuses to load under an incompatible protobuf runtime (gencode is 5.27.1).
_runtime_version.ValidateProtobufRuntimeVersion(
    _runtime_version.Domain.PUBLIC,
    5,
    27,
    1,
    '',
    'dispersal.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
# Serialized FileDescriptorProto for dispersal.proto; regenerate with protoc
# if the .proto source changes.
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x64ispersal.proto\x12\tdispersal\"-\n\x0c\x44ispersalReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x1f\n\x0c\x44ispersalRes\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x05\"\x1c\n\tSampleReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x05\"*\n\tSampleRes\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x05\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\xde\x01\n\x10\x44ispersalMessage\x12\x30\n\rdispersal_req\x18\x01 \x01(\x0b\x32\x17.dispersal.DispersalReqH\x00\x12\x30\n\rdispersal_res\x18\x02 \x01(\x0b\x32\x17.dispersal.DispersalResH\x00\x12*\n\nsample_req\x18\x03 \x01(\x0b\x32\x14.dispersal.SampleReqH\x00\x12*\n\nsample_res\x18\x04 \x01(\x0b\x32\x14.dispersal.SampleResH\x00\x42\x0e\n\x0cmessage_typeb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispersal_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
    # Pure-Python descriptors: record byte offsets of each message type
    # within the serialized descriptor above.
    DESCRIPTOR._loaded_options = None
    _globals['_DISPERSALREQ']._serialized_start=30
    _globals['_DISPERSALREQ']._serialized_end=75
    _globals['_DISPERSALRES']._serialized_start=77
    _globals['_DISPERSALRES']._serialized_end=108
    _globals['_SAMPLEREQ']._serialized_start=110
    _globals['_SAMPLEREQ']._serialized_end=138
    _globals['_SAMPLERES']._serialized_start=140
    _globals['_SAMPLERES']._serialized_end=182
    _globals['_DISPERSALMESSAGE']._serialized_start=185
    _globals['_DISPERSALMESSAGE']._serialized_end=407
# @@protoc_insertion_point(module_scope)

View File

@ -11,11 +11,12 @@ class Executor:
self.interval = 10
async def execute(self):
data = "TEST DATA"
blob_id = b"test"
data = b"TEST DATA"
while True:
try:
for transport in self.connections:
message = proto.new_dispersal_put_msg(data)
message = proto.new_dispersal_req_msg(blob_id, data)
await transport.write(message)
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
@ -35,11 +36,12 @@ class Executor:
print(f"Failed to connect or lost connection: {e}")
async def _handle(self, conn_id, writer, message):
    """Log responses arriving on one node connection.

    `message` is a decoded DispersalMessage envelope; the executor side
    only expects the response variants.
    """
    # Pre-protobuf residue removed: `msg_type, msg_id, data = message`
    # would raise TypeError now that `message` is a protobuf object, and
    # the DISPERSAL_OK/SAMPLE_OK constants no longer exist in proto.py.
    if message.HasField('dispersal_res'):
        print(f"Received DispersalRes: blob_id={message.dispersal_res.blob_id}")
    elif message.HasField('sample_res'):
        print(f"Received SampleRes: blob_id={message.sample_res.blob_id}")
    else:
        print("Received unknown message type")
async def run(self):
    """Open col_num connections to the node concurrently."""
    attempts = [self.connect() for _ in range(self.col_num)]
    await asyncio.gather(*attempts)

View File

@ -11,7 +11,7 @@ class App:
nodes = [Node(self.addr, start_port + i) for i in range(num_nodes)]
await asyncio.gather(*(node.run() for node in nodes))
async def run_node_with_executor(self, col_number):
    """Run a single node and one executor (with col_number connections) side by side."""
    # Stale pre-rename signature `run_nodes_with_executor` dropped; the
    # caller in main() uses run_node_with_executor.
    node = Node(self.addr, 8888)
    executor = Executor(self.addr, 8888, col_number)
    await asyncio.gather(node.run(), executor.run())
@ -26,7 +26,7 @@ def main():
# asyncio.run(app.run_nodes(10000, 4096))
# asyncio.run(app.run_executor('localhost', 8888, 1))
asyncio.run(app.run_nodes_with_executor(10))
asyncio.run(app.run_node_with_executor(1))
if __name__ == '__main__':
main()

View File

@ -25,12 +25,12 @@ class Node:
await server.serve_forever()
async def _handle(self, conn_id, writer, message):
    """Log requests arriving from the executor.

    `message` is a decoded DispersalMessage envelope; the node side only
    expects the request variants.
    """
    # Pre-protobuf residue removed: `message` is no longer a
    # (msg_type, msg_id, data) tuple, and new_dispersal_ok_msg is gone.
    if message.HasField('dispersal_req'):
        # BUG FIX: the log labels said "DispersalRes"/"SampleRes" while
        # handling the *request* variants.
        print(f"Received DispersalReq: blob_id={message.dispersal_req.blob_id}")
    elif message.HasField('sample_req'):
        print(f"Received SampleReq: blob_id={message.sample_req.blob_id}")
    else:
        print("Received unknown message type")
async def run(self):
    """Start the node by listening for incoming connections."""
    serving = self.listen()
    await serving

View File

@ -1,50 +1,35 @@
import struct
import dispersal_pb2
from itertools import count
# Legacy one-byte message-type tags (pre-protobuf framing).
DISPERSAL_PUT = 0x01
DISPERSAL_OK = 0x02
SAMPLE_PUT = 0x03
SAMPLE_OK = 0x04
# New framing: each protobuf message is prefixed by a 2-byte big-endian length.
MAX_MSG_LEN_BYTES = 2
# Legacy header: 1-byte type + 4-byte msg id + 4-byte payload length, network order.
HEADER_SIZE = 9
HEADER_FORMAT = "!B I I"
def pack_message(message):
    """Frame a protobuf message: 2-byte big-endian length prefix + payload."""
    payload = message.SerializeToString()  # always bytes
    prefix = len(payload).to_bytes(MAX_MSG_LEN_BYTES, byteorder='big')
    return prefix + payload
# Legacy dispersal payload prefix size in bytes.
DISPERSAL_HASH_COL_SIZE = 6
# First 4 bytes for the hash and the next 2 bytes for the column index.
DISPERSAL_HASH_COL_FORMAT = "!I H"
def unpack_message(data):
    """Decode raw payload bytes into a DispersalMessage envelope."""
    envelope = dispersal_pb2.DispersalMessage()
    envelope.ParseFromString(data)
    return envelope
# Legacy monotonically increasing message-id source (starts at 1).
msg_id_counter = count(start=1)
def new_dispersal_req_msg(blob_id, data):
    """Build and frame a DispersalReq wrapped in a DispersalMessage."""
    envelope = dispersal_pb2.DispersalMessage(
        dispersal_req=dispersal_pb2.DispersalReq(blob_id=blob_id, data=data)
    )
    return pack_message(envelope)
def pack_header(msg_type, msg_id, data):
    """Legacy framing: 9-byte (type, id, length) header + UTF-8 payload."""
    payload = data.encode()
    header = struct.pack(HEADER_FORMAT, msg_type, msg_id, len(payload))
    return header + payload
def new_dispersal_res_msg(blob_id):
    """Build and frame a DispersalRes acknowledgement."""
    res = dispersal_pb2.DispersalRes(blob_id=blob_id)
    return pack_message(dispersal_pb2.DispersalMessage(dispersal_res=res))
def unpack_header(data):
    """Legacy framing: parse (msg_type, msg_id, data_length) from a 9-byte header.

    Returns (None, None, None) when fewer than HEADER_SIZE bytes are available.
    """
    if len(data) < HEADER_SIZE:
        # BUG FIX: previously returned FOUR Nones here while the success
        # path returns THREE values, breaking 3-way unpacking at call sites.
        return None, None, None
    msg_type, msg_id, data_length = struct.unpack(HEADER_FORMAT, data[:HEADER_SIZE])
    return msg_type, msg_id, data_length
def new_sample_req_msg(blob_id):
    """Build and frame a SampleReq."""
    req = dispersal_pb2.SampleReq(blob_id=blob_id)
    return pack_message(dispersal_pb2.DispersalMessage(sample_req=req))
def new_dispersal_put_msg(data):
    """Legacy: frame a DISPERSAL_PUT carrying data, with a fresh message id."""
    return pack_header(DISPERSAL_PUT, next(msg_id_counter), data)
def new_dispersal_ok_msg(msg_id):
    """Legacy: empty-bodied DISPERSAL_OK acknowledgement echoing msg_id."""
    empty_payload = ""
    return pack_header(DISPERSAL_OK, msg_id, empty_payload)
def new_sample_put_msg(data):
    """Legacy: frame a SAMPLE_PUT carrying data, with a fresh message id."""
    return pack_header(SAMPLE_PUT, next(msg_id_counter), data)
def new_sample_ok_msg(msg_id, response_data):
    """Legacy: frame a SAMPLE_OK carrying response_data, echoing msg_id."""
    ack = pack_header(SAMPLE_OK, msg_id, response_data)
    return ack
def parse_dispersal_data(data):
    """Legacy: split (hash, column index, remaining payload) out of a dispersal payload.

    Raises ValueError when data is shorter than the 6-byte hash/column prefix.
    """
    if len(data) < DISPERSAL_HASH_COL_SIZE:
        raise ValueError("Data is too short to unpack hash and col.")
    hash_value, col_index = struct.unpack(
        DISPERSAL_HASH_COL_FORMAT, data[:DISPERSAL_HASH_COL_SIZE]
    )
    return hash_value, col_index, data[DISPERSAL_HASH_COL_SIZE:]
def new_sample_res_msg(blob_id, data):
    """Build and frame a SampleRes carrying the sampled data."""
    res = dispersal_pb2.SampleRes(blob_id=blob_id, data=data)
    return pack_message(dispersal_pb2.DispersalMessage(sample_res=res))

View File

@ -1,5 +1,4 @@
import asyncio
import struct
import proto
class Transport:
@ -12,10 +11,13 @@ class Transport:
async def read_and_process(self):
try:
while True:
header = await self.reader.readexactly(9) # Assuming the header is 9 bytes long
msg_type, msg_id, data_length = proto.unpack_header(header)
length_prefix = await self.reader.readexactly(proto.MAX_MSG_LEN_BYTES)
data_length = int.from_bytes(length_prefix, byteorder='big')
data = await self.reader.readexactly(data_length)
await self.handler(self.conn_id, self.writer, (msg_type, msg_id, data))
message = proto.unpack_message(data)
await self.handler(self.conn_id, self.writer, message)
except asyncio.IncompleteReadError:
print("Transport: Connection closed by the peer.")
except Exception as e:
@ -24,7 +26,8 @@ class Transport:
self.writer.close()
await self.writer.wait_closed()
async def write(self, message):
    """Write an already-framed message to the peer and flush the transport."""
    # Stale duplicate `async def write(self, data)` (non-flushing,
    # pre-protobuf) removed; only the draining variant remains.
    self.writer.write(message)
    await self.writer.drain()

View File

@ -13,3 +13,4 @@ portalocker==2.8.2 # portable file locking
keum==0.2.0 # for CL's use of more obscure curves
poseidon-hash==0.1.4 # used as the algebraic hash in CL
hypothesis==6.103.0
protobuf==5.27.2