1
0
mirror of synced 2025-01-10 15:56:03 +00:00

DA Prost integration (#678)

* Prost crate integration for DA

* Packing and unpacking for protobuf messages

* Minimal helper macro for dispersal message

* Use protoc in gh actions

* Move dispersal related protocol to nomos-da-dispersal
This commit is contained in:
gusto 2024-07-23 18:24:21 +03:00 committed by GitHub
parent f58c296959
commit fcda9a6da8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 221 additions and 0 deletions

View File

@ -21,6 +21,7 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- uses: arduino/setup-protoc@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@ -44,6 +45,7 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- uses: arduino/setup-protoc@v3
# Setup Rust toolchain with GNU for Windows
- name: Setup Rust with GNU toolchain (Windows)
if: matrix.os == 'windows-latest'
@ -100,6 +102,7 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- uses: arduino/setup-protoc@v3
- uses: actions-rs/toolchain@v1
with:
profile: minimal

View File

@ -16,6 +16,7 @@ jobs:
- uses: actions/checkout@v3
with:
submodules: true
- uses: arduino/setup-protoc@v3
- name: Checkout submodules
run: git submodule update --init --recursive
- uses: actions-rs/toolchain@v1

View File

@ -4,6 +4,7 @@ members = [
"nomos-da/kzgrs",
"nomos-da/kzgrs-backend",
"nomos-da/storage",
"nomos-da/network/dispersal",
"nomos-libp2p",
"nomos-services/api",
"nomos-services/log",

View File

@ -0,0 +1,15 @@
[package]
name = "nomos-da-dispersal"
version = "0.1.0"
edition = "2021"
[dependencies]
bytes = "1.6.1"
futures = "0.3.30"
prost = "0.13.1"
[build-dependencies]
prost-build = "0.13.1"
[dev-dependencies]
tokio = { version = "1.38.1", features = ["rt", "macros"] }

View File

@ -0,0 +1,9 @@
use std::{env, path::PathBuf};
fn main() {
let project_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap());
let dispersal_includes = project_dir.join("proto");
let dispersal_proto = project_dir.join("proto/dispersal.proto");
prost_build::compile_protos(&[dispersal_proto], &[dispersal_includes]).unwrap();
}

View File

@ -0,0 +1,87 @@
syntax = "proto3";
package nomos.da.dispersal.v1;
message Blob {
bytes blob_id = 1;
bytes data = 2;
}
// DISPERSAL
message DispersalErr {
bytes blob_id = 1;
enum DispersalErrType {
CHUNK_SIZE = 0;
VERIFICATION = 1;
}
DispersalErrType err_type = 2;
string err_description = 3;
}
message DispersalReq {
Blob blob = 1;
}
message DispersalRes {
oneof message_type {
bytes blob_id = 1;
DispersalErr err = 2;
}
}
// SAMPLING
message SampleErr {
bytes blob_id = 1;
enum SampleErrType {
NOT_FOUND = 0;
}
SampleErrType err_type = 2;
string err_description = 3;
}
message SampleReq {
bytes blob_id = 1;
}
message SampleRes {
oneof message_type {
Blob blob = 1;
SampleErr err = 2;
}
}
// SESSION CONTROL
message CloseMsg {
enum CloseReason {
GRACEFUL_SHUTDOWN = 0;
SUBNET_CHANGE = 1;
SUBNET_SAMPLE_FAIL = 2;
}
CloseReason reason = 1;
}
message SessionReq {
oneof message_type {
CloseMsg close_msg = 1;
}
}
// WRAPPER MESSAGE
message DispersalMessage {
oneof message_type {
DispersalReq dispersal_req = 1;
DispersalRes dispersal_res = 2;
SampleReq sample_req = 3;
SampleRes sample_res = 4;
SessionReq session_req = 5;
}
}

View File

@ -0,0 +1 @@
pub mod proto;

View File

@ -0,0 +1,40 @@
include!(concat!(env!("OUT_DIR"), "/nomos.da.dispersal.v1.rs"));
// Macro to implement From trait for DispersalMessage
macro_rules! impl_from_for_dispersal_message {
($($type:ty => $variant:ident),+ $(,)?) => {
$(
impl From<$type> for DispersalMessage {
fn from(msg: $type) -> Self {
DispersalMessage {
message_type: Some(dispersal_message::MessageType::$variant(msg)),
}
}
}
)+
}
}
impl_from_for_dispersal_message!(
DispersalReq => DispersalReq,
DispersalRes => DispersalRes,
SampleReq => SampleReq,
SampleRes => SampleRes,
SessionReq => SessionReq,
);
#[cfg(test)]
mod tests {
use crate::proto::dispersal;
#[test]
fn dispersal_message() {
let blob = dispersal::Blob {
blob_id: vec![0; 32],
data: vec![1; 32],
};
let req = dispersal::DispersalReq { blob: Some(blob) };
assert_eq!(req.blob.unwrap().blob_id, vec![0; 32]);
}
}

View File

@ -0,0 +1,64 @@
use std::io;
use bytes::Bytes;
use futures::AsyncReadExt;
use prost::Message;
pub mod dispersal;
const MAX_MSG_LEN_BYTES: usize = 2;
pub fn pack_message(message: &impl Message) -> Result<Vec<u8>, io::Error> {
let data_len = message.encoded_len();
if data_len > (1 << (MAX_MSG_LEN_BYTES * 8)) {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
"Message too large",
));
}
let mut buf = Vec::with_capacity(MAX_MSG_LEN_BYTES + data_len);
buf.extend_from_slice(&(data_len as u16).to_be_bytes());
message.encode(&mut buf).unwrap();
Ok(buf)
}
pub async fn unpack_from_reader<M, R>(reader: &mut R) -> Result<M, io::Error>
where
M: Message + Default,
R: AsyncReadExt + Unpin,
{
let mut length_prefix = [0u8; MAX_MSG_LEN_BYTES];
reader.read_exact(&mut length_prefix).await?;
let data_length = u16::from_be_bytes(length_prefix) as usize;
let mut data = vec![0u8; data_length];
reader.read_exact(&mut data).await?;
M::decode(Bytes::from(data)).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
#[cfg(test)]
mod tests {
use futures::io::BufReader;
use crate::proto::{dispersal, pack_message, unpack_from_reader};
#[tokio::test]
async fn pack_and_unpack_from_reader() {
let blob = dispersal::Blob {
blob_id: vec![0; 32],
data: vec![1; 32],
};
let message: dispersal::DispersalMessage =
dispersal::DispersalReq { blob: Some(blob) }.into();
let packed = pack_message(&message).unwrap();
let mut reader = BufReader::new(&packed[..]);
let unpacked: dispersal::DispersalMessage = unpack_from_reader(&mut reader).await.unwrap();
assert_eq!(message, unpacked);
}
}