DA: Network message types (#681)

* Split message definitions for broadcast, dispersal and sampling

* Expose dispersal messages via crate

* Create nomos-da-messages crate

* From trait implementations for messages

* Rename broadcast to replication
This commit is contained in:
gusto 2024-07-25 12:27:55 +03:00 committed by GitHub
parent 5c17221d16
commit 9ff90e7d1c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 220 additions and 144 deletions

View File

@ -4,7 +4,7 @@ members = [
"nomos-da/kzgrs",
"nomos-da/kzgrs-backend",
"nomos-da/storage",
"nomos-da/network/dispersal",
"nomos-da/network/messages",
"nomos-libp2p",
"nomos-services/api",
"nomos-services/log",

View File

@ -1,9 +0,0 @@
use std::{env, path::PathBuf};
fn main() {
let project_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap());
let dispersal_includes = project_dir.join("proto");
let dispersal_proto = project_dir.join("proto/dispersal.proto");
prost_build::compile_protos(&[dispersal_proto], &[dispersal_includes]).unwrap();
}

View File

@ -1,87 +0,0 @@
syntax = "proto3";
package nomos.da.dispersal.v1;
message Blob {
bytes blob_id = 1;
bytes data = 2;
}
// DISPERSAL
message DispersalErr {
bytes blob_id = 1;
enum DispersalErrType {
CHUNK_SIZE = 0;
VERIFICATION = 1;
}
DispersalErrType err_type = 2;
string err_description = 3;
}
message DispersalReq {
Blob blob = 1;
}
message DispersalRes {
oneof message_type {
bytes blob_id = 1;
DispersalErr err = 2;
}
}
// SAMPLING
message SampleErr {
bytes blob_id = 1;
enum SampleErrType {
NOT_FOUND = 0;
}
SampleErrType err_type = 2;
string err_description = 3;
}
message SampleReq {
bytes blob_id = 1;
}
message SampleRes {
oneof message_type {
Blob blob = 1;
SampleErr err = 2;
}
}
// SESSION CONTROL
message CloseMsg {
enum CloseReason {
GRACEFUL_SHUTDOWN = 0;
SUBNET_CHANGE = 1;
SUBNET_SAMPLE_FAIL = 2;
}
CloseReason reason = 1;
}
message SessionReq {
oneof message_type {
CloseMsg close_msg = 1;
}
}
// WRAPPER MESSAGE
message DispersalMessage {
oneof message_type {
DispersalReq dispersal_req = 1;
DispersalRes dispersal_res = 2;
SampleReq sample_req = 3;
SampleRes sample_res = 4;
SessionReq session_req = 5;
}
}

View File

@ -1 +0,0 @@
pub mod proto;

View File

@ -1,40 +0,0 @@
include!(concat!(env!("OUT_DIR"), "/nomos.da.dispersal.v1.rs"));
// Macro to implement From trait for DispersalMessage
macro_rules! impl_from_for_dispersal_message {
($($type:ty => $variant:ident),+ $(,)?) => {
$(
impl From<$type> for DispersalMessage {
fn from(msg: $type) -> Self {
DispersalMessage {
message_type: Some(dispersal_message::MessageType::$variant(msg)),
}
}
}
)+
}
}
impl_from_for_dispersal_message!(
DispersalReq => DispersalReq,
DispersalRes => DispersalRes,
SampleReq => SampleReq,
SampleRes => SampleRes,
SessionReq => SessionReq,
);
#[cfg(test)]
mod tests {
use crate::proto::dispersal;
#[test]
fn dispersal_message() {
let blob = dispersal::Blob {
blob_id: vec![0; 32],
data: vec![1; 32],
};
let req = dispersal::DispersalReq { blob: Some(blob) };
assert_eq!(req.blob.unwrap().blob_id, vec![0; 32]);
}
}

View File

@ -1,5 +1,5 @@
[package]
name = "nomos-da-dispersal"
name = "nomos-da-messages"
version = "0.1.0"
edition = "2021"

View File

@ -0,0 +1,16 @@
use std::{env, path::PathBuf};
fn main() {
let project_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap());
let proto_includes = project_dir.join("proto");
let dispersal_proto = project_dir.join("proto/dispersal.proto");
let replication_proto = project_dir.join("proto/replication.proto");
let sampling_proto = project_dir.join("proto/sampling.proto");
prost_build::compile_protos(
&[dispersal_proto, replication_proto, sampling_proto],
&[proto_includes],
)
.unwrap();
}

View File

@ -0,0 +1,26 @@
syntax = "proto3";
package nomos.da.v1.common;
message Blob {
bytes blob_id = 1;
bytes data = 2;
}
// SESSION CONTROL
message CloseMsg {
enum CloseReason {
GRACEFUL_SHUTDOWN = 0;
SUBNET_CHANGE = 1;
SUBNET_SAMPLE_FAIL = 2;
}
CloseReason reason = 1;
}
message SessionReq {
oneof message_type {
CloseMsg close_msg = 1;
}
}

View File

@ -0,0 +1,40 @@
syntax = "proto3";
package nomos.da.v1.dispersal;
import "common.proto";
// DISPERSAL
message DispersalErr {
bytes blob_id = 1;
enum DispersalErrType {
CHUNK_SIZE = 0;
VERIFICATION = 1;
}
DispersalErrType err_type = 2;
string err_description = 3;
}
message DispersalReq {
common.Blob blob = 1;
}
message DispersalRes {
oneof message_type {
bytes blob_id = 1;
DispersalErr err = 2;
}
}
// WRAPPER MESSAGE
message Message {
oneof message_type {
DispersalReq dispersal_req = 1;
DispersalRes dispersal_res = 2;
common.SessionReq session_req = 3;
}
}

View File

@ -0,0 +1,20 @@
syntax = "proto3";
package nomos.da.v1.replication;
import "common.proto";
// REPLICATION
message ReplicationReq {
common.Blob blob = 1;
}
// WRAPPER MESSAGE
message Message {
oneof message_type {
ReplicationReq replication_req = 1;
common.SessionReq session_req = 2;
}
}

View File

@ -0,0 +1,38 @@
syntax = "proto3";
package nomos.da.v1.sampling;
import "common.proto";
// SAMPLING
message SampleErr {
bytes blob_id = 1;
enum SampleErrType {
NOT_FOUND = 0;
}
SampleErrType err_type = 2;
string err_description = 3;
}
message SampleReq {
bytes blob_id = 1;
}
message SampleRes {
oneof message_type {
common.Blob blob = 1;
SampleErr err = 2;
}
}
// WRAPPER MESSAGE
message Message {
oneof message_type {
SampleReq sample_req = 1;
SampleRes sample_res = 2;
}
}

View File

@ -0,0 +1,26 @@
use crate::{common, impl_from_for_message};
include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.dispersal.rs"));
impl_from_for_message!(
Message,
DispersalReq => DispersalReq,
DispersalRes => DispersalRes,
common::SessionReq => SessionReq,
);
#[cfg(test)]
mod tests {
use crate::{common, dispersal};
#[test]
fn dispersal_message() {
let blob = common::Blob {
blob_id: vec![0; 32],
data: vec![1; 32],
};
let req = dispersal::DispersalReq { blob: Some(blob) };
assert_eq!(req.blob.unwrap().blob_id, vec![0; 32]);
}
}

View File

@ -5,9 +5,15 @@ use futures::AsyncReadExt;
use prost::Message;
pub mod dispersal;
pub mod replication;
pub mod sampling;
const MAX_MSG_LEN_BYTES: usize = 2;
pub mod common {
include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.common.rs"));
}
pub fn pack_message(message: &impl Message) -> Result<Vec<u8>, io::Error> {
let data_len = message.encoded_len();
@ -39,25 +45,48 @@ where
M::decode(Bytes::from(data)).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
/// Macro to implement From trait for Wrapper Messages.
///
/// Usage:
/// ```ignore
/// impl_from_for_message!(
/// WrapperMessage, // impl From<Req> for WrappedMessage {
/// Req => WrappedReq, // .. return WrappedMsg::MessageType::WrappedReq(Req);
/// );
/// ```
#[macro_export]
macro_rules! impl_from_for_message {
($message:path, $($type:path => $variant:ident),+ $(,)?) => {
$(
impl From<$type> for $message {
fn from(msg: $type) -> Self {
$message {
message_type: Some(message::MessageType::$variant(msg)),
}
}
}
)+
}
}
#[cfg(test)]
mod tests {
use futures::io::BufReader;
use crate::proto::{dispersal, pack_message, unpack_from_reader};
use crate::{common, dispersal, pack_message, unpack_from_reader};
#[tokio::test]
async fn pack_and_unpack_from_reader() {
let blob = dispersal::Blob {
let blob = common::Blob {
blob_id: vec![0; 32],
data: vec![1; 32],
};
let message: dispersal::DispersalMessage =
dispersal::DispersalReq { blob: Some(blob) }.into();
let message: dispersal::Message = dispersal::DispersalReq { blob: Some(blob) }.into();
let packed = pack_message(&message).unwrap();
let mut reader = BufReader::new(&packed[..]);
let unpacked: dispersal::DispersalMessage = unpack_from_reader(&mut reader).await.unwrap();
let unpacked: dispersal::Message = unpack_from_reader(&mut reader).await.unwrap();
assert_eq!(message, unpacked);
}

View File

@ -0,0 +1,9 @@
use crate::{common, impl_from_for_message};
include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.replication.rs"));
impl_from_for_message!(
Message,
ReplicationReq => ReplicationReq,
common::SessionReq => SessionReq,
);

View File

@ -0,0 +1,9 @@
use crate::impl_from_for_message;
include!(concat!(env!("OUT_DIR"), "/nomos.da.v1.sampling.rs"));
impl_from_for_message!(
Message,
SampleReq => SampleReq,
SampleRes => SampleRes,
);