127 lines
4.0 KiB
Python
127 lines
4.0 KiB
Python
from itertools import count
|
|
|
|
import dispersal.dispersal_pb2 as dispersal_pb2
|
|
|
|
MAX_MSG_LEN_BYTES = 2
|
|
|
|
|
|
def pack_message(message):
|
|
# SerializeToString method returns an instance of bytes.
|
|
data = message.SerializeToString()
|
|
length_prefix = len(data).to_bytes(MAX_MSG_LEN_BYTES, byteorder="big")
|
|
return length_prefix + data
|
|
|
|
|
|
async def unpack_from_reader(reader):
|
|
length_prefix = await reader.readexactly(MAX_MSG_LEN_BYTES)
|
|
data_length = int.from_bytes(length_prefix, byteorder="big")
|
|
data = await reader.readexactly(data_length)
|
|
return parse(data)
|
|
|
|
|
|
def unpack_from_bytes(data):
|
|
length_prefix = data[:MAX_MSG_LEN_BYTES]
|
|
data_length = int.from_bytes(length_prefix, byteorder="big")
|
|
return parse(data[MAX_MSG_LEN_BYTES : MAX_MSG_LEN_BYTES + data_length])
|
|
|
|
|
|
def parse(data):
|
|
message = dispersal_pb2.DispersalMessage()
|
|
message.ParseFromString(data)
|
|
return message
|
|
|
|
|
|
# DISPERSAL
|
|
|
|
|
|
def new_dispersal_req_msg(blob_id, data):
|
|
blob = dispersal_pb2.Blob(blob_id=blob_id, data=data)
|
|
dispersal_req = dispersal_pb2.DispersalReq(blob=blob)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(dispersal_req=dispersal_req)
|
|
return pack_message(dispersal_message)
|
|
|
|
|
|
def new_dispersal_res_success_msg(blob_id):
|
|
dispersal_res = dispersal_pb2.DispersalRes(blob_id=blob_id)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
|
|
return pack_message(dispersal_message)
|
|
|
|
|
|
def new_dispersal_res_chunk_size_error_msg(blob_id, description):
|
|
dispersal_err = dispersal_pb2.DispersalErr(
|
|
blob_id=blob_id,
|
|
err_type=dispersal_pb2.DispersalErr.CHUNK_SIZE,
|
|
err_description=description,
|
|
)
|
|
dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
|
|
return pack_message(dispersal_message)
|
|
|
|
|
|
def new_dispersal_res_verification_error_msg(blob_id, description):
|
|
dispersal_err = dispersal_pb2.DispersalErr(
|
|
blob_id=blob_id,
|
|
err_type=dispersal_pb2.DispersalErr.VERIFICATION,
|
|
err_description=description,
|
|
)
|
|
dispersal_res = dispersal_pb2.DispersalRes(err=dispersal_err)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(dispersal_res=dispersal_res)
|
|
return pack_message(dispersal_message)
|
|
|
|
|
|
# SAMPLING
|
|
|
|
|
|
def new_sample_req_msg(blob_id):
|
|
sample_req = dispersal_pb2.SampleReq(blob_id=blob_id)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(sample_req=sample_req)
|
|
return pack_message(dispersal_message)
|
|
|
|
|
|
def new_sample_res_success_msg(blob_id, data):
|
|
blob = dispersal_pb2.Blob(blob_id=blob_id, data=data)
|
|
sample_res = dispersal_pb2.SampleRes(blob=blob)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
|
|
return pack_message(dispersal_message)
|
|
|
|
|
|
def new_sample_res_not_found_error_msg(blob_id, description):
|
|
sample_err = dispersal_pb2.SampleErr(
|
|
blob_id=blob_id,
|
|
err_type=dispersal_pb2.SampleErr.NOT_FOUND,
|
|
err_description=description,
|
|
)
|
|
sample_res = dispersal_pb2.SampleRes(err=sample_err)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(sample_res=sample_res)
|
|
return pack_message(dispersal_message)
|
|
|
|
|
|
# SESSION CONTROL
|
|
|
|
|
|
def new_close_msg(reason):
|
|
close_msg = dispersal_pb2.CloseMsg(reason=reason)
|
|
return close_msg
|
|
|
|
|
|
def new_session_req_close_msg(reason):
|
|
close_msg = new_close_msg(reason)
|
|
session_req = dispersal_pb2.SessionReq(close_msg=close_msg)
|
|
dispersal_message = dispersal_pb2.DispersalMessage(session_req=session_req)
|
|
return dispersal_message
|
|
|
|
|
|
def new_session_req_graceful_shutdown_msg():
|
|
message = new_session_req_close_msg(dispersal_pb2.CloseMsg.GRACEFUL_SHUTDOWN)
|
|
return pack_message(message)
|
|
|
|
|
|
def new_session_req_subnet_change_msg():
|
|
message = new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_CHANGE)
|
|
return pack_message(message)
|
|
|
|
|
|
def new_session_req_subnet_sample_fail_msg():
|
|
message = new_session_req_close_msg(dispersal_pb2.CloseMsg.SUBNET_SAMPLE_FAIL)
|
|
return pack_message(message)
|