Move dispersal proto into subnets poc

This commit is contained in:
Gusto 2024-07-17 18:50:41 +03:00
parent 4317d46e3f
commit 7e4929de8c
No known key found for this signature in database
12 changed files with 335 additions and 221 deletions

View File

@ -1,9 +0,0 @@
# Zone Executor to Nomos DA communication
## Protobuf
To generate the updated protobuf serializer from `dispersal.proto` run:
``` bash
protoc --python_out=. dispersal.proto
```

View File

@ -1,52 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: dispersal.proto
# Protobuf Python Version: 5.27.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
5,
27,
1,
'',
'dispersal.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x64ispersal.proto\x12\tdispersal\"%\n\x04\x42lob\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x1c\n\x05\x45rror\x12\x13\n\x0b\x64\x65scription\x18\x01 \x01(\t\"x\n\x0c\x44ispersalErr\x12*\n\x0e\x63hunk_size_err\x18\x01 \x01(\x0b\x32\x10.dispersal.ErrorH\x00\x12,\n\x10verification_err\x18\x02 \x01(\x0b\x32\x10.dispersal.ErrorH\x00\x42\x0e\n\x0cmessage_type\"-\n\x0c\x44ispersalReq\x12\x1d\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x0f.dispersal.Blob\"Y\n\x0c\x44ispersalRes\x12\x11\n\x07\x62lob_id\x18\x01 \x01(\x0cH\x00\x12&\n\x03\x65rr\x18\x02 \x01(\x0b\x32\x17.dispersal.DispersalErrH\x00\x42\x0e\n\x0cmessage_type\"B\n\tSampleErr\x12%\n\tnot_found\x18\x01 \x01(\x0b\x32\x10.dispersal.ErrorH\x00\x42\x0e\n\x0cmessage_type\"\x1c\n\tSampleReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\"a\n\tSampleRes\x12\x1f\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x0f.dispersal.BlobH\x00\x12#\n\x03\x65rr\x18\x02 \x01(\x0b\x32\x14.dispersal.SampleErrH\x00\x42\x0e\n\x0cmessage_type\"\xde\x01\n\x10\x44ispersalMessage\x12\x30\n\rdispersal_req\x18\x01 \x01(\x0b\x32\x17.dispersal.DispersalReqH\x00\x12\x30\n\rdispersal_res\x18\x02 \x01(\x0b\x32\x17.dispersal.DispersalResH\x00\x12*\n\nsample_req\x18\x03 \x01(\x0b\x32\x14.dispersal.SampleReqH\x00\x12*\n\nsample_res\x18\x04 \x01(\x0b\x32\x14.dispersal.SampleResH\x00\x42\x0e\n\x0cmessage_typeb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispersal_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None
_globals['_BLOB']._serialized_start=30
_globals['_BLOB']._serialized_end=67
_globals['_ERROR']._serialized_start=69
_globals['_ERROR']._serialized_end=97
_globals['_DISPERSALERR']._serialized_start=99
_globals['_DISPERSALERR']._serialized_end=219
_globals['_DISPERSALREQ']._serialized_start=221
_globals['_DISPERSALREQ']._serialized_end=266
_globals['_DISPERSALRES']._serialized_start=268
_globals['_DISPERSALRES']._serialized_end=357
_globals['_SAMPLEERR']._serialized_start=359
_globals['_SAMPLEERR']._serialized_end=425
_globals['_SAMPLEREQ']._serialized_start=427
_globals['_SAMPLEREQ']._serialized_end=455
_globals['_SAMPLERES']._serialized_start=457
_globals['_SAMPLERES']._serialized_end=554
_globals['_DISPERSALMESSAGE']._serialized_start=557
_globals['_DISPERSALMESSAGE']._serialized_end=779
# @@protoc_insertion_point(module_scope)

View File

@ -1,46 +0,0 @@
import asyncio
import proto
from transport import Transport
class Executor:
def __init__(self, addr, port, col_num):
self.addr = addr
self.port = port
self.col_num = col_num
self.connections = []
self.interval = 10
async def execute(self):
message = proto.new_dispersal_req_msg(b"dummy_blob_id", b"dummy_data")
while True:
try:
# TODO: Mock original data conversion into blobs.
await asyncio.gather(*[t.write(message) for t in self.connections])
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
break
except Exception as e:
print(f"Executor: Error during message sending: {e}")
async def connect(self):
try:
reader, writer = await asyncio.open_connection(self.addr, self.port)
conn_id = len(self.connections)
transport = Transport(conn_id, reader, writer, self._handle)
self.connections.append(transport)
print(f"Executor: Connected to {self.addr}:{self.port}, ID: {conn_id}")
asyncio.create_task(transport.read_and_process())
except Exception as e:
print(f"Executor: Failed to connect or lost connection: {e}")
async def _handle(self, conn_id, writer, message):
if message.HasField('dispersal_res'):
print(f"Executor: Received DispersalRes: blob_id={message.dispersal_res.blob_id}")
elif message.HasField('sample_res'):
print(f"Executor: Received SampleRes: blob_id={message.sample_res.blob_id}")
else:
print(f"Executor: Received unknown message: {message}")
async def run(self):
await asyncio.gather(*(self.connect() for _ in range(self.col_num)))
await self.execute()

View File

@ -1,30 +0,0 @@
import asyncio
import argparse
from node import Node
from executor import Executor
class App:
def __init__(self, addr='localhost'):
self.addr = addr
async def run_nodes(self, start_port, num_nodes):
nodes = [Node(self.addr, start_port + i) for i in range(num_nodes)]
await asyncio.gather(*(node.run() for node in nodes))
async def run_node_with_executor(self, col_number):
node = Node(self.addr, 8888)
executor = Executor(self.addr, 8888, col_number)
await asyncio.gather(node.run(), executor.run())
async def run_executor(self, remote_addr, start_port, col_number):
executor = Executor(remote_addr, start_port, col_number)
await asyncio.gather(executor.run())
def main():
# TODO: Add args parser.
app = App()
asyncio.run(app.run_node_with_executor(1))
if __name__ == '__main__':
main()

View File

@ -1,40 +0,0 @@
import asyncio
import struct
import proto
from itertools import count
from transport import Transport
conn_id_counter = count(start=1)
class Node:
def __init__(self, addr, port):
self.addr = addr
self.port = port
async def _on_conn(self, reader, writer):
conn_id = next(conn_id_counter)
transport = Transport(conn_id, reader, writer, self._handle)
await transport.read_and_process()
async def listen(self):
server = await asyncio.start_server(
self._on_conn, self.addr, self.port
)
print(f"Node: Server started at {self.addr}:{self.port}")
async with server:
await server.serve_forever()
async def _handle(self, conn_id, writer, message):
if message.HasField('dispersal_req'):
blob_id = message.dispersal_req.blob.blob_id
data = message.dispersal_req.blob.data
print(f"Node: Received DispersalRes: blob_id={blob_id}; data={data}")
# Imitate succesful verification.
writer.write(proto.new_dispersal_res_success_msg(blob_id))
elif message.HasField('sample_req'):
print(f"Node: Received SampleRes: blob_id={message.sample_req.blob_id}")
else:
print(f"Node: Received unknown message: {message} ")
async def run(self):
await self.listen()

View File

@ -1,26 +0,0 @@
import asyncio
import proto
class Transport:
def __init__(self, conn_id, reader, writer, handler):
self.conn_id = conn_id
self.reader = reader
self.writer = writer
self.handler = handler
async def read_and_process(self):
try:
while True:
message = await proto.parse_from_reader(self.reader)
await self.handler(self.conn_id, self.writer, message)
except asyncio.IncompleteReadError:
print("Transport: Connection closed by the peer.")
except Exception as e:
print(f"Transport: An error occurred: {e}")
finally:
self.writer.close()
await self.writer.wait_closed()
async def write(self, message):
self.writer.write(message)
await self.writer.drain()

View File

@ -0,0 +1,76 @@
# Zone Executor to Nomos DA Communication
Protocol for communication between the Zone Executor and Nomos DA using Protocol Buffers (protobuf).
## Overview
The protocol defines messages used to request and respond to data dispersal, sampling operations, and session control within the Nomos DA system. The communication involves the exchange of blobs (binary large objects) and error handling for various operations.
## Messages
### Blob
- **Blob**: Represents the binary data to be dispersed.
- `bytes blob_id`: Unique identifier for the blob.
- `bytes data`: The binary data of the blob.
### Error Handling
- **DispersalErr**: Represents errors related to dispersal operations.
- `bytes blob_id`: Unique identifier of the blob related to the error.
- `enum DispersalErrType`: Enumeration of dispersal error types.
- `CHUNK_SIZE`: Error due to incorrect chunk size.
- `VERIFICATION`: Error due to verification failure.
- `string err_description`: Description of the error.
- **SampleErr**: Represents errors related to sample operations.
- `bytes blob_id`: Unique identifier of the blob related to the error.
- `enum SampleErrType`: Enumeration of sample error types.
- `NOT_FOUND`: Error when a blob is not found.
- `string err_description`: Description of the error.
### Dispersal
- **DispersalReq**: Request message for dispersing a blob.
- `Blob blob`: The blob to be dispersed.
- **DispersalRes**: Response message for a dispersal request.
- `oneof message_type`: Contains either a success response or an error.
- `bytes blob_id`: Unique identifier of the dispersed blob.
- `DispersalErr err`: Error occurred during dispersal.
### Sample
- **SampleReq**: Request message for sampling a blob.
- `bytes blob_id`: Unique identifier of the blob to be sampled.
- **SampleRes**: Response message for a sample request.
- `oneof message_type`: Contains either a success response or an error.
- `Blob blob`: The sampled blob.
- `SampleErr err`: Error occurred during sampling.
### Session Control
- **CloseMsg**: Message to close a session with a reason.
- `enum CloseReason`: Enumeration of close reasons.
- `GRACEFUL_SHUTDOWN`: Graceful shutdown of the session.
- `SUBNET_CHANGE`: Change in the subnet.
- `SUBNET_SAMPLE_FAIL`: Subnet sample failure.
- `CloseReason reason`: Reason for closing the session.
- **SessionReq**: Request message for session control.
- `oneof message_type`: Contains one of the following message types.
- `CloseMsg close_msg`: Message to close the session.
### DispersalMessage
- **DispersalMessage**: Wrapper message for different types of dispersal and sampling messages.
- `oneof message_type`: Contains one of the following message types.
- `DispersalReq dispersal_req`: Dispersal request.
- `DispersalRes dispersal_res`: Dispersal response.
- `SampleReq sample_req`: Sample request.
- `SampleRes sample_res`: Sample response.
## Protobuf
To generate the updated protobuf serializer from `dispersal.proto`, run the following command:
```bash
protoc --python_out=. dispersal.proto
```
This will generate the necessary Python code to serialize and deserialize the messages defined in the `dispersal.proto` file.

View File

@ -1,21 +1,24 @@
syntax = "proto3";
package nomos.da.dispersal;
package nomos.da.dispersal.v1;
message Blob {
bytes blob_id = 1;
bytes data = 2;
}
message Error {
string description = 1;
}
// DISPERSAL
message DispersalErr {
oneof message_type {
Error chunk_size_err = 1;
Error verification_err = 2;
bytes blob_id = 1;
enum DispersalErrType {
CHUNK_SIZE = 0;
VERIFICATION = 1;
}
DispersalErrType err_type = 2;
string err_description = 3;
}
message DispersalReq {
@ -29,10 +32,17 @@ message DispersalRes {
}
}
// SAMPLING
message SampleErr {
oneof message_type {
Error not_found = 1;
bytes blob_id = 1;
enum SampleErrType {
NOT_FOUND = 0;
}
SampleErrType err_type = 2;
string err_description = 3;
}
message SampleReq {
@ -46,6 +56,26 @@ message SampleRes {
}
}
// SESSION CONTROL
message CloseMsg {
enum CloseReason {
GRACEFUL_SHUTDOWN = 0;
SUBNET_CHANGE = 1;
SUBNET_SAMPLE_FAIL = 2;
}
CloseReason reason = 1;
}
message SessionReq {
oneof message_type {
CloseMsg close_msg = 1;
}
}
// WRAPPER MESSAGE
message DispersalMessage {
oneof message_type {
DispersalReq dispersal_req = 1;

View File

@ -0,0 +1,54 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# NO CHECKED-IN PROTOBUF GENCODE
# source: dispersal.proto
# Protobuf Python Version: 5.27.1
"""Generated protocol buffer code."""
from google.protobuf import descriptor as _descriptor
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import runtime_version as _runtime_version
from google.protobuf import symbol_database as _symbol_database
from google.protobuf.internal import builder as _builder
_runtime_version.ValidateProtobufRuntimeVersion(
_runtime_version.Domain.PUBLIC,
5,
27,
1,
'',
'dispersal.proto'
)
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x0f\x64ispersal.proto\x12\x12nomos.da.dispersal\"%\n\x04\x42lob\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\xb3\x01\n\x0c\x44ispersalErr\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12\x43\n\x08\x65rr_type\x18\x02 \x01(\x0e\x32\x31.nomos.da.dispersal.DispersalErr.DispersalErrType\x12\x17\n\x0f\x65rr_description\x18\x03 \x01(\t\"4\n\x10\x44ispersalErrType\x12\x0e\n\nCHUNK_SIZE\x10\x00\x12\x10\n\x0cVERIFICATION\x10\x01\"6\n\x0c\x44ispersalReq\x12&\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x18.nomos.da.dispersal.Blob\"b\n\x0c\x44ispersalRes\x12\x11\n\x07\x62lob_id\x18\x01 \x01(\x0cH\x00\x12/\n\x03\x65rr\x18\x02 \x01(\x0b\x32 .nomos.da.dispersal.DispersalErrH\x00\x42\x0e\n\x0cmessage_type\"\x94\x01\n\tSampleErr\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\x12=\n\x08\x65rr_type\x18\x02 \x01(\x0e\x32+.nomos.da.dispersal.SampleErr.SampleErrType\x12\x17\n\x0f\x65rr_description\x18\x03 \x01(\t\"\x1e\n\rSampleErrType\x12\r\n\tNOT_FOUND\x10\x00\"\x1c\n\tSampleReq\x12\x0f\n\x07\x62lob_id\x18\x01 \x01(\x0c\"s\n\tSampleRes\x12(\n\x04\x62lob\x18\x01 \x01(\x0b\x32\x18.nomos.da.dispersal.BlobH\x00\x12,\n\x03\x65rr\x18\x02 \x01(\x0b\x32\x1d.nomos.da.dispersal.SampleErrH\x00\x42\x0e\n\x0cmessage_type\"\x82\x02\n\x10\x44ispersalMessage\x12\x39\n\rdispersal_req\x18\x01 \x01(\x0b\x32 .nomos.da.dispersal.DispersalReqH\x00\x12\x39\n\rdispersal_res\x18\x02 \x01(\x0b\x32 .nomos.da.dispersal.DispersalResH\x00\x12\x33\n\nsample_req\x18\x03 \x01(\x0b\x32\x1d.nomos.da.dispersal.SampleReqH\x00\x12\x33\n\nsample_res\x18\x04 \x01(\x0b\x32\x1d.nomos.da.dispersal.SampleResH\x00\x42\x0e\n\x0cmessage_typeb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'dispersal_pb2', _globals)
if not _descriptor._USE_C_DESCRIPTORS:
DESCRIPTOR._loaded_options = None
_globals['_BLOB']._serialized_start=39
_globals['_BLOB']._serialized_end=76
_globals['_DISPERSALERR']._serialized_start=79
_globals['_DISPERSALERR']._serialized_end=258
_globals['_DISPERSALERR_DISPERSALERRTYPE']._serialized_start=206
_globals['_DISPERSALERR_DISPERSALERRTYPE']._serialized_end=258
_globals['_DISPERSALREQ']._serialized_start=260
_globals['_DISPERSALREQ']._serialized_end=314
_globals['_DISPERSALRES']._serialized_start=316
_globals['_DISPERSALRES']._serialized_end=414
_globals['_SAMPLEERR']._serialized_start=417
_globals['_SAMPLEERR']._serialized_end=565
_globals['_SAMPLEERR_SAMPLEERRTYPE']._serialized_start=535
_globals['_SAMPLEERR_SAMPLEERRTYPE']._serialized_end=565
_globals['_SAMPLEREQ']._serialized_start=567
_globals['_SAMPLEREQ']._serialized_end=595
_globals['_SAMPLERES']._serialized_start=597
_globals['_SAMPLERES']._serialized_end=712
_globals['_DISPERSALMESSAGE']._serialized_start=715
_globals['_DISPERSALMESSAGE']._serialized_end=973
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,123 @@
import asyncio
import argparse
import proto
from itertools import count
conn_id_counter = count(start=1)
class MockTransport:
def __init__(self, conn_id, reader, writer, handler):
self.conn_id = conn_id
self.reader = reader
self.writer = writer
self.handler = handler
async def read_and_process(self):
try:
while True:
message = await proto.parse_from_reader(self.reader)
await self.handler(self.conn_id, self.writer, message)
except Exception as e:
print(f"MockTransport: An error occurred: {e}")
finally:
self.writer.close()
await self.writer.wait_closed()
async def write(self, message):
self.writer.write(message)
await self.writer.drain()
class MockNode:
def __init__(self, addr, port, handler=None):
self.addr = addr
self.port = port
self.handler = handler if handler else self._handle
async def _on_conn(self, reader, writer):
conn_id = next(conn_id_counter)
transport = MockTransport(conn_id, reader, writer, self.handler)
await transport.read_and_process()
async def _handle(self, conn_id, writer, message):
if message.HasField('dispersal_req'):
blob_id = message.dispersal_req.blob.blob_id
data = message.dispersal_req.blob.data
print(f"MockNode: Received DispersalRes: blob_id={blob_id}; data={data}")
# Imitate succesful verification.
writer.write(proto.new_dispersal_res_success_msg(blob_id))
elif message.HasField('sample_req'):
print(f"MockNode: Received SampleRes: blob_id={message.sample_req.blob_id}")
else:
print(f"MockNode: Received unknown message: {message} ")
async def run(self):
server = await asyncio.start_server(
self._on_conn, self.addr, self.port
)
print(f"MockNode: Server started at {self.addr}:{self.port}")
async with server:
await server.serve_forever()
class MockExecutor:
def __init__(self, addr, port, col_num, executor=None, handler=None):
self.addr = addr
self.port = port
self.col_num = col_num
self.connections = []
self.interval = 10
self.executor = executor if executor else self._execute
self.handler = handler if handler else self._handle
async def _execute(self):
message = proto.new_dispersal_req_msg(b"dummy_blob_id", b"dummy_data")
while True:
try:
await asyncio.gather(*[t.write(message) for t in self.connections])
await asyncio.sleep(self.interval)
except asyncio.CancelledError:
break
except Exception as e:
print(f"MockExecutor: Error during message sending: {e}")
async def _handle(self, conn_id, writer, message):
if message.HasField('dispersal_res'):
print(f"MockExecutor: Received DispersalRes: blob_id={message.dispersal_res.blob_id}")
elif message.HasField('sample_res'):
print(f"MockExecutor: Received SampleRes: blob_id={message.sample_res.blob_id}")
else:
print(f"MockExecutor: Received unknown message: {message}")
async def _connect(self):
try:
reader, writer = await asyncio.open_connection(self.addr, self.port)
conn_id = len(self.connections)
transport = MockTransport(conn_id, reader, writer, self.handler)
self.connections.append(transport)
print(f"MockExecutor: Connected to {self.addr}:{self.port}, ID: {conn_id}")
asyncio.create_task(transport.read_and_process())
except Exception as e:
print(f"MockExecutor: Failed to connect or lost connection: {e}")
async def run(self):
await asyncio.gather(*(self._connect() for _ in range(self.col_num)))
await self._execute()
class MockSystem:
def __init__(self, addr='localhost'):
self.addr = addr
async def run_node_with_executor(self, col_number):
node = MockNode(self.addr, 8888)
executor = MockExecutor(self.addr, 8888, col_number)
await asyncio.gather(node.run(), executor.run())
def main():
app = MockSystem()
asyncio.run(app.run_node_with_executor(1))
if __name__ == '__main__':
main()

View File

@ -20,6 +20,9 @@ def unpack_message(data):
message.ParseFromString(data)
return message
# DISPERSAL
def new_dispersal_req_msg(blob_id, data):
blob = dispersal_pb2.Blob(blob_id=blob_id, data=data)
dispersal_req = dispersal_pb2.DispersalReq(blob=blob)
@ -31,20 +34,27 @@ def new_dispersal_res_success_msg(blob_id):
dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
return pack_message(dispersal_message)
def new_dispersal_res_chunk_size_error_msg(description):
error = dispersal_pb2.Error(description=description)
dispersal_err = dispersal_pb2.DispersalErr(chunk_size_err=error)
def new_dispersal_res_chunk_size_error_msg(blob_id, description):
dispersal_err = dispersal_pb2.DispersalErr(
blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.CHUNK_SIZE,
err_description=description
)
dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
return pack_message(dispersal_message)
def new_dispersal_res_verification_error_msg(description):
error = dispersal_pb2.Error(description=description)
dispersal_err = dispersal_pb2.DispersalErr(verification_err=error)
def new_dispersal_res_verification_error_msg(blob_id, description):
dispersal_err = dispersal_pb2.DispersalErr(
blob_id=blob_id, err_type=dispersal_pb2.DispersalErr.VERIFICATION,
err_description=description
)
dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
return pack_message(dispersal_message)
# SAMPLING
def new_sample_req_msg(blob_id):
sample_req = dispersal_pb2.SampleReq(blob_id=blob_id)
dispersal_message = dispersal_pb2.DispersalMessage(sample_req=sample_req)
@ -56,9 +66,33 @@ def new_sample_res_success_msg(blob_id, data):
dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
return pack_message(dispersal_message)
def new_sample_res_not_found_error_msg(description):
error = dispersal_pb2.Error(description=description)
sample_err = dispersal_pb2.SampleErr(not_found=error)
def new_sample_res_not_found_error_msg(blob_id, description):
sample_err = dispersal_pb2.SampleErr(
blob_id=blob_id, err_type=dispersal_pb2.SampleErr.NOT_FOUND,
err_description=description
)
sample_res = dispersal_pb2.SampleRes(err=sample_err)
dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
return pack_message(dispersal_message)
# SESSION CONTROL
def new_close_msg(reason):
close_msg = dispersal_pb2.CloseMsg(reason=reason)
return close_msg
def new_session_req_close_msg(reason):
close_msg = new_close_msg(reason)
session_req = dispersal_pb2.SessionReq(close_msg=close_msg)
dispersal_message = dispersal_pb2.DispersalMessage(session_req=session_req)
return pack_message(dispersal_message)
def new_session_req_graceful_shutdown_msg():
new_session_req_close_msg(dispersal_pb2.CloseMsg.GRACEFUL_SHUTDOWN)
def new_session_req_subnet_change_msg():
new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_CHANGE)
def new_session_req_subnet_sample_fail_msg():
new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_SAMPLE_FAIL)