diff --git a/Cargo.toml b/Cargo.toml index 08a335f5..7f067b58 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,10 +23,10 @@ members = [ "nomos-services/data-availability/verifier", "nomos-services/data-availability/dispersal", "nomos-services/data-availability/tests", - "nomos-services/mix", - "nomos-mix/core", - "nomos-mix/message", - "nomos-mix/network", + "nomos-services/blend", + "nomos-blend/core", + "nomos-blend/message", + "nomos-blend/network", "nomos-tracing", "nomos-cli", "nomos-utils", diff --git a/nodes/nomos-executor/Cargo.toml b/nodes/nomos-executor/Cargo.toml index 808a6486..d0aac2ba 100644 --- a/nodes/nomos-executor/Cargo.toml +++ b/nodes/nomos-executor/Cargo.toml @@ -23,7 +23,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [ "libp2p", ] } nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } -nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] } +nomos-blend-service = { path = "../../nomos-services/blend", features = ["libp2p"] } nomos-node = { path = "../nomos-node" } nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } diff --git a/nodes/nomos-executor/src/config.rs b/nodes/nomos-executor/src/config.rs index 26a4d296..3cba7740 100644 --- a/nodes/nomos-executor/src/config.rs +++ b/nodes/nomos-executor/src/config.rs @@ -1,14 +1,16 @@ // std // crates use color_eyre::eyre::Result; +use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend; +use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter; +use nomos_blend_service::BlendService; use nomos_da_network_service::backends::libp2p::executor::DaNetworkExecutorBackend; use nomos_da_network_service::NetworkService as DaNetworkService; -use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend; -use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; -use nomos_mix_service::MixService; use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_node::{ - config::{update_cryptarchia_consensus, update_mix, update_network, update_tracing, MixArgs}, + config::{ + update_blend, update_cryptarchia_consensus, update_network, update_tracing, BlendArgs, + }, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, NetworkService, Tracing, Wire, }; use nomos_storage::backends::rocksdb::RocksBackend; @@ -22,7 +24,7 @@ use crate::ExecutorApiService; pub struct Config { pub tracing: ::Settings, pub network: as ServiceData>::Settings, - pub mix: as ServiceData>::Settings, + pub blend: as ServiceData>::Settings, pub da_dispersal: ::Settings, pub da_network: > as ServiceData>::Settings, @@ -39,13 +41,13 @@ impl Config { mut self, log_args: LogArgs, network_args: NetworkArgs, - mix_args: MixArgs, + blend_args: BlendArgs, http_args: HttpArgs, cryptarchia_args: CryptarchiaArgs, ) -> Result { update_tracing(&mut self.tracing, log_args)?; update_network(&mut self.network, network_args)?; - update_mix(&mut self.mix, mix_args)?; + update_blend(&mut self.blend, blend_args)?; update_http(&mut self.http, http_args)?; update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?; Ok(self) diff --git a/nodes/nomos-executor/src/lib.rs b/nodes/nomos-executor/src/lib.rs index 17ed141c..209a5397 100644 --- a/nodes/nomos-executor/src/lib.rs +++ b/nodes/nomos-executor/src/lib.rs @@ -8,6 +8,9 @@ use rand_chacha::ChaCha20Rng; use api::backend::AxumBackend; use kzgrs_backend::common::blob::DaBlob; use nomos_api::ApiService; +use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend; +use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter; +use nomos_blend_service::BlendService; use nomos_da_dispersal::adapters::mempool::kzgrs::KzgrsMempoolAdapter; use nomos_da_dispersal::adapters::network::libp2p::Libp2pNetworkAdapter as DispersalNetworkAdapter; use nomos_da_dispersal::backend::kzgrs::DispersalKZGRSBackend; @@ -18,9 +21,6 @@ use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStora use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier; use nomos_da_verifier::network::adapters::executor::Libp2pAdapter as VerifierNetworkAdapter; use nomos_mempool::backend::mockpool::MockPool; -use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend; -use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; -use nomos_mix_service::MixService; use nomos_node::DispersedBlobInfo; use nomos_node::HeaderId; use nomos_node::MempoolNetworkAdapter; @@ -88,7 +88,7 @@ pub struct NomosExecutor { #[cfg(feature = "tracing")] tracing: ServiceHandle, network: ServiceHandle>, - mix: ServiceHandle>, + blend: ServiceHandle>, da_dispersal: ServiceHandle, da_indexer: ServiceHandle, da_verifier: ServiceHandle, diff --git a/nodes/nomos-executor/src/main.rs b/nodes/nomos-executor/src/main.rs index 310a88b9..fcd3e66d 100644 --- a/nodes/nomos-executor/src/main.rs +++ b/nodes/nomos-executor/src/main.rs @@ -5,7 +5,7 @@ use color_eyre::eyre::{eyre, Result}; use nomos_executor::config::Config as ExecutorConfig; use nomos_executor::{NomosExecutor, NomosExecutorServiceSettings}; use nomos_node::{ - config::MixArgs, BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs, + config::BlendArgs, BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs, LogArgs, MempoolAdapterSettings, NetworkArgs, Transaction, Tx, TxMempoolSettings, CL_TOPIC, DA_TOPIC, }; @@ -25,9 +25,9 @@ struct Args { /// Overrides network config. #[clap(flatten)] network_args: NetworkArgs, - /// Overrides mix config. + /// Overrides blend config. #[clap(flatten)] - mix_args: MixArgs, + blend_args: BlendArgs, /// Overrides http config. #[clap(flatten)] http_args: HttpArgs, @@ -41,14 +41,14 @@ fn main() -> Result<()> { log_args, http_args, network_args, - mix_args, + blend_args, cryptarchia_args, } = Args::parse(); let config = serde_yaml::from_reader::<_, ExecutorConfig>(std::fs::File::open(config)?)? .update_from_args( log_args, network_args, - mix_args, + blend_args, http_args, cryptarchia_args, )?; @@ -63,7 +63,7 @@ fn main() -> Result<()> { let app = OverwatchRunner::::run( NomosExecutorServiceSettings { network: config.network, - mix: config.mix, + blend: config.blend, #[cfg(feature = "tracing")] tracing: config.tracing, http: config.http, diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 3f54f53e..4167f34b 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -28,7 +28,7 @@ nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier", nomos-da-indexer = { path = "../../nomos-services/data-availability/indexer", features = ["rocksdb-backend"] } nomos-da-network-service = { path = "../../nomos-services/data-availability/network" } nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] } -nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] } +nomos-blend-service = { path = "../../nomos-services/blend", features = ["libp2p"] } nomos-api = { path = "../../nomos-services/api" } nomos-tracing = { path = "../../nomos-tracing" } nomos-tracing-service = { path = "../../nomos-services/tracing" } diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index 339f5334..1a39f384 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -48,7 +48,7 @@ network: node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3 initial_peers: [] -mix: +blend: backend: listening_address: /ip4/0.0.0.0/udp/3001/quic-v1 node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3 @@ -59,7 +59,7 @@ mix: message_blend: cryptographic_processor: private_key: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0] - num_mix_layers: 1 + num_blend_layers: 1 temporal_processor: max_delay_seconds: 5 cover_traffic: diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index fb237c7a..c78c2a93 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -13,13 +13,13 @@ use serde::{Deserialize, Serialize}; use tracing::Level; // internal use crate::{NomosApiService, NomosDaMembership, Wire}; +use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend; +use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter; +use nomos_blend_service::BlendService; use nomos_core::{proofs::covenant::CovenantProof, staking::NMO_UNIT}; use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend; use nomos_da_network_service::NetworkService as DaNetworkService; use nomos_libp2p::{ed25519::SecretKey, Multiaddr}; -use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend; -use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; -use nomos_mix_service::MixService; use nomos_network::backends::libp2p::Libp2p as NetworkBackend; use nomos_network::NetworkService; use nomos_storage::backends::rocksdb::RocksBackend; @@ -75,16 +75,16 @@ pub struct NetworkArgs { } #[derive(Parser, Debug, Clone)] -pub struct MixArgs { - #[clap(long = "mix-addr", env = "MIX_ADDR")] - mix_addr: Option, +pub struct BlendArgs { + #[clap(long = "blend-addr", env = "BLEND_ADDR")] + blend_addr: Option, // TODO: Use either the raw bytes or the key type directly to delegate error handling to clap - #[clap(long = "mix-node-key", env = "MIX_NODE_KEY")] - mix_node_key: Option, + #[clap(long = "blend-node-key", env = "BLEND_NODE_KEY")] + blend_node_key: Option, - #[clap(long = "mix-num-mix-layers", env = "MIX_NUM_MIX_LAYERS")] - mix_num_mix_layers: Option, + #[clap(long = "blend-num-blend-layers", env = "BLEND_NUM_BLEND_LAYERS")] + blend_num_blend_layers: Option, } #[derive(Parser, Debug, Clone)] @@ -130,7 +130,7 @@ pub struct CryptarchiaArgs { pub struct Config { pub tracing: ::Settings, pub network: as ServiceData>::Settings, - pub mix: as ServiceData>::Settings, + pub blend: as ServiceData>::Settings, pub da_network: > as ServiceData>::Settings, pub da_indexer: ::Settings, @@ -148,13 +148,13 @@ impl Config { mut self, log_args: LogArgs, network_args: NetworkArgs, - mix_args: MixArgs, + blend_args: BlendArgs, http_args: HttpArgs, cryptarchia_args: CryptarchiaArgs, ) -> Result { update_tracing(&mut self.tracing, log_args)?; update_network(&mut self.network, network_args)?; - update_mix(&mut self.mix, mix_args)?; + update_blend(&mut self.blend, blend_args)?; update_http(&mut self.http, http_args)?; update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?; Ok(self) @@ -237,27 +237,27 @@ pub fn update_network( Ok(()) } -pub fn update_mix( - mix: &mut as ServiceData>::Settings, - mix_args: MixArgs, +pub fn update_blend( + blend: &mut as ServiceData>::Settings, + blend_args: BlendArgs, ) -> Result<()> { - let MixArgs { - mix_addr, - mix_node_key, - mix_num_mix_layers, - } = mix_args; + let BlendArgs { + blend_addr, + blend_node_key, + blend_num_blend_layers, + } = blend_args; - if let Some(addr) = mix_addr { - mix.backend.listening_address = addr; + if let Some(addr) = blend_addr { + blend.backend.listening_address = addr; } - if let Some(node_key) = mix_node_key { + if let Some(node_key) = blend_node_key { let mut key_bytes = hex::decode(node_key)?; - mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?; + blend.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?; } - if let Some(num_mix_layers) = mix_num_mix_layers { - mix.message_blend.cryptographic_processor.num_mix_layers = num_mix_layers; + if let Some(num_blend_layers) = blend_num_blend_layers { + blend.message_blend.cryptographic_processor.num_blend_layers = num_blend_layers; } Ok(()) diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index ea3c8031..cd651073 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -11,6 +11,9 @@ pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs}; use kzgrs_backend::common::blob::DaBlob; pub use kzgrs_backend::dispersal::BlobInfo; use nomos_api::ApiService; +pub use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend; +pub use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter; +pub use nomos_blend_service::BlendService; pub use nomos_core::da::blob::info::DispersedBlobInfo; pub use nomos_core::{ da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, @@ -35,9 +38,6 @@ pub use nomos_mempool::network::adapters::libp2p::{ }; pub use nomos_mempool::TxMempoolSettings; use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; -pub use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend; -pub use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; -pub use nomos_mix_service::MixService; pub use nomos_network::backends::libp2p::Libp2p as NetworkBackend; pub use nomos_network::NetworkService; pub use nomos_storage::{ @@ -86,7 +86,11 @@ pub const MB16: usize = 1024 * 1024 * 16; pub type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter, + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter< + BlendNetworkAdapter, + Tx, + BlobInfo, + >, MockPool::Hash>, MempoolNetworkAdapter::Hash>, MockPool::BlobId>, @@ -124,7 +128,11 @@ pub type DaIndexer = DataIndexerService< CryptarchiaConsensusAdapter, // Cryptarchia specific, should be the same as in `Cryptarchia` type above. cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter, + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter< + BlendNetworkAdapter, + Tx, + BlobInfo, + >, MockPool::Hash>, MempoolNetworkAdapter::Hash>, MockPool::BlobId>, @@ -160,7 +168,7 @@ pub struct Nomos { #[cfg(feature = "tracing")] tracing: ServiceHandle, network: ServiceHandle>, - mix: ServiceHandle>, + blend: ServiceHandle>, da_indexer: ServiceHandle, da_verifier: ServiceHandle, da_sampling: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 6b44d9d7..165a12c2 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -1,6 +1,6 @@ use kzgrs_backend::dispersal::BlobInfo; use nomos_node::{ - config::MixArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, + config::BlendArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos, NomosServiceSettings, Tx, }; @@ -25,9 +25,9 @@ struct Args { /// Overrides network config. #[clap(flatten)] network_args: NetworkArgs, - /// Overrides mix config. + /// Overrides blend config. #[clap(flatten)] - mix_args: MixArgs, + blend_args: BlendArgs, /// Overrides http config. #[clap(flatten)] http_args: HttpArgs, @@ -41,14 +41,14 @@ fn main() -> Result<()> { log_args, http_args, network_args, - mix_args, + blend_args, cryptarchia_args, } = Args::parse(); let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)? .update_from_args( log_args, network_args, - mix_args, + blend_args, http_args, cryptarchia_args, )?; @@ -63,7 +63,7 @@ fn main() -> Result<()> { let app = OverwatchRunner::::run( NomosServiceSettings { network: config.network, - mix: config.mix, + blend: config.blend, #[cfg(feature = "tracing")] tracing: config.tracing, http: config.http, diff --git a/nomos-mix/core/Cargo.toml b/nomos-blend/core/Cargo.toml similarity index 88% rename from nomos-mix/core/Cargo.toml rename to nomos-blend/core/Cargo.toml index 4a228442..7ba8d723 100644 --- a/nomos-mix/core/Cargo.toml +++ b/nomos-blend/core/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "nomos-mix" +name = "nomos-blend" version = "0.1.0" edition = "2021" @@ -11,7 +11,7 @@ tokio-stream = "0.1" tracing = "0.1" rand = "0.8" serde = { version = "1.0", features = ["derive"] } -nomos-mix-message = { path = "../message" } +nomos-blend-message = { path = "../message" } futures = "0.3" multiaddr = "0.18" x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] } diff --git a/nomos-mix/core/src/conn_maintenance.rs b/nomos-blend/core/src/conn_maintenance.rs similarity index 97% rename from nomos-mix/core/src/conn_maintenance.rs rename to nomos-blend/core/src/conn_maintenance.rs index fa8208e8..a15d39f9 100644 --- a/nomos-mix/core/src/conn_maintenance.rs +++ b/nomos-blend/core/src/conn_maintenance.rs @@ -6,7 +6,7 @@ use std::{ use fixed::types::U57F7; use multiaddr::Multiaddr; -use nomos_mix_message::MixMessage; +use nomos_blend_message::BlendMessage; use rand::RngCore; use serde::{Deserialize, Serialize}; @@ -24,7 +24,7 @@ pub struct ConnectionMaintenanceSettings { /// based on the number of messages sent by each peer in time windows pub struct ConnectionMaintenance where - M: MixMessage, + M: BlendMessage, R: RngCore, { settings: ConnectionMaintenanceSettings, @@ -39,7 +39,7 @@ where impl ConnectionMaintenance where - M: MixMessage, + M: BlendMessage, M::PublicKey: PartialEq, R: RngCore, { @@ -260,7 +260,7 @@ impl ConnectionMonitor { #[cfg(test)] mod tests { - use nomos_mix_message::mock::MockMixMessage; + use nomos_blend_message::mock::MockBlendMessage; use rand::{rngs::ThreadRng, thread_rng}; use crate::membership::Node; @@ -400,9 +400,9 @@ mod tests { fn init_maintenance( settings: ConnectionMaintenanceSettings, node_count: usize, - ) -> ConnectionMaintenance { + ) -> ConnectionMaintenance { let nodes = nodes(node_count); - let mut maintenance = ConnectionMaintenance::::new( + let mut maintenance = ConnectionMaintenance::::new( settings, Membership::new(nodes.clone(), nodes[0].public_key), thread_rng(), @@ -414,7 +414,7 @@ mod tests { maintenance } - fn nodes(count: usize) -> Vec::PublicKey>> { + fn nodes(count: usize) -> Vec::PublicKey>> { (0..count) .map(|i| Node { address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", 1000 + i) diff --git a/nomos-mix/core/src/cover_traffic.rs b/nomos-blend/core/src/cover_traffic.rs similarity index 97% rename from nomos-mix/core/src/cover_traffic.rs rename to nomos-blend/core/src/cover_traffic.rs index 8042ed2b..f24ef927 100644 --- a/nomos-mix/core/src/cover_traffic.rs +++ b/nomos-blend/core/src/cover_traffic.rs @@ -1,7 +1,7 @@ use blake2::digest::consts::U4; use blake2::Digest; use futures::{Stream, StreamExt}; -use nomos_mix_message::MixMessage; +use nomos_blend_message::BlendMessage; use serde::Deserialize; use std::collections::HashSet; use std::hash::Hash; @@ -53,7 +53,7 @@ impl Stream for CoverTraffic + Send + Sync + Unpin, SlotStream: Stream + Send + Sync + Unpin, - Message: MixMessage + Send + Sync + Unpin, + Message: BlendMessage + Send + Sync + Unpin, { type Item = Vec; diff --git a/nomos-blend/core/src/lib.rs b/nomos-blend/core/src/lib.rs new file mode 100644 index 00000000..654a5792 --- /dev/null +++ b/nomos-blend/core/src/lib.rs @@ -0,0 +1,19 @@ +pub mod conn_maintenance; +pub mod cover_traffic; +pub mod membership; +pub mod message_blend; +pub mod persistent_transmission; + +pub enum BlendOutgoingMessage { + FullyUnwrapped(Vec), + Outbound(Vec), +} + +impl From for Vec { + fn from(value: BlendOutgoingMessage) -> Self { + match value { + BlendOutgoingMessage::FullyUnwrapped(v) => v, + BlendOutgoingMessage::Outbound(v) => v, + } + } +} diff --git a/nomos-mix/core/src/membership.rs b/nomos-blend/core/src/membership.rs similarity index 95% rename from nomos-mix/core/src/membership.rs rename to nomos-blend/core/src/membership.rs index 04dd8386..0207b83d 100644 --- a/nomos-mix/core/src/membership.rs +++ b/nomos-blend/core/src/membership.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; use multiaddr::Multiaddr; -use nomos_mix_message::MixMessage; +use nomos_blend_message::BlendMessage; use rand::{ seq::{IteratorRandom, SliceRandom}, Rng, @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug)] pub struct Membership where - M: MixMessage, + M: BlendMessage, { remote_nodes: Vec>, local_node: Node, @@ -25,7 +25,7 @@ pub struct Node { impl Membership where - M: MixMessage, + M: BlendMessage, M::PublicKey: PartialEq, { pub fn new(nodes: Vec>, local_public_key: M::PublicKey) -> Self { diff --git a/nomos-mix/core/src/message_blend/crypto.rs b/nomos-blend/core/src/message_blend/crypto.rs similarity index 91% rename from nomos-mix/core/src/message_blend/crypto.rs rename to nomos-blend/core/src/message_blend/crypto.rs index 62a8a865..506f0274 100644 --- a/nomos-mix/core/src/message_blend/crypto.rs +++ b/nomos-blend/core/src/message_blend/crypto.rs @@ -1,5 +1,5 @@ use crate::membership::Membership; -use nomos_mix_message::MixMessage; +use nomos_blend_message::BlendMessage; use rand::RngCore; use serde::{Deserialize, Serialize}; @@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize}; /// for the message indistinguishability. pub struct CryptographicProcessor where - M: MixMessage, + M: BlendMessage, { settings: CryptographicProcessorSettings, membership: Membership, @@ -17,13 +17,13 @@ where #[derive(Clone, Debug, Serialize, Deserialize)] pub struct CryptographicProcessorSettings { pub private_key: K, - pub num_mix_layers: usize, + pub num_blend_layers: usize, } impl CryptographicProcessor where R: RngCore, - M: MixMessage, + M: BlendMessage, M::PublicKey: Clone + PartialEq, { pub fn new( @@ -42,7 +42,7 @@ where // TODO: Use the actual Sphinx encoding instead of mock. let public_keys = self .membership - .choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers) + .choose_remote_nodes(&mut self.rng, self.settings.num_blend_layers) .iter() .map(|node| node.public_key.clone()) .collect::>(); diff --git a/nomos-mix/core/src/message_blend/mod.rs b/nomos-blend/core/src/message_blend/mod.rs similarity index 88% rename from nomos-mix/core/src/message_blend/mod.rs rename to nomos-blend/core/src/message_blend/mod.rs index 2cc988b6..f702d0bf 100644 --- a/nomos-mix/core/src/message_blend/mod.rs +++ b/nomos-blend/core/src/message_blend/mod.rs @@ -13,8 +13,8 @@ pub use temporal::TemporalSchedulerSettings; use crate::membership::Membership; use crate::message_blend::crypto::CryptographicProcessor; use crate::message_blend::temporal::TemporalProcessorExt; -use crate::MixOutgoingMessage; -use nomos_mix_message::MixMessage; +use crate::BlendOutgoingMessage; +use nomos_blend_message::BlendMessage; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -24,23 +24,23 @@ use tokio_stream::wrappers::UnboundedReceiverStream; #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MessageBlendSettings where - M: MixMessage, + M: BlendMessage, M::PrivateKey: Serialize + DeserializeOwned, { pub cryptographic_processor: CryptographicProcessorSettings, pub temporal_processor: TemporalSchedulerSettings, } -/// [`MessageBlendStream`] handles the entire mixing tiers process +/// [`MessageBlendStream`] handles the entire blending tiers process /// - Unwraps incoming messages received from network using [`CryptographicProcessor`] /// - Pushes unwrapped messages to [`TemporalProcessor`] pub struct MessageBlendStream where - M: MixMessage, + M: BlendMessage, { input_stream: S, - output_stream: Pin + Send + Sync + 'static>>, - temporal_sender: UnboundedSender, + output_stream: Pin + Send + Sync + 'static>>, + temporal_sender: UnboundedSender, cryptographic_processor: CryptographicProcessor, _rng: PhantomData, _scheduler: PhantomData, @@ -50,7 +50,7 @@ impl MessageBlendStream where S: Stream>, Rng: RngCore + Unpin + Send + 'static, - M: MixMessage, + M: BlendMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, M::Error: Debug, @@ -85,9 +85,9 @@ where match self.cryptographic_processor.unwrap_message(&message) { Ok((unwrapped_message, fully_unwrapped)) => { let message = if fully_unwrapped { - MixOutgoingMessage::FullyUnwrapped(unwrapped_message) + BlendOutgoingMessage::FullyUnwrapped(unwrapped_message) } else { - MixOutgoingMessage::Outbound(unwrapped_message) + BlendOutgoingMessage::Outbound(unwrapped_message) }; if let Err(e) = self.temporal_sender.send(message) { tracing::error!("Failed to send message to the outbound channel: {e:?}"); @@ -104,13 +104,13 @@ impl Stream for MessageBlendStream where S: Stream> + Unpin, Rng: RngCore + Unpin + Send + 'static, - M: MixMessage + Unpin, + M: BlendMessage + Unpin, M::PrivateKey: Serialize + DeserializeOwned + Unpin, M::PublicKey: Clone + PartialEq + Unpin, M::Error: Debug, Scheduler: Stream + Unpin + Send + Sync + 'static, { - type Item = MixOutgoingMessage; + type Item = BlendOutgoingMessage; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { if let Poll::Ready(Some(message)) = self.input_stream.poll_next_unpin(cx) { @@ -123,7 +123,7 @@ where pub trait MessageBlendExt: Stream> where Rng: RngCore + Send + Unpin + 'static, - M: MixMessage, + M: BlendMessage, M::PrivateKey: Serialize + DeserializeOwned, M::PublicKey: Clone + PartialEq, M::Error: Debug, @@ -153,7 +153,7 @@ impl MessageBlendExt for T where T: Stream>, Rng: RngCore + Unpin + Send + 'static, - M: MixMessage, + M: BlendMessage, M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq, M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq, M::Error: Debug, diff --git a/nomos-mix/core/src/message_blend/temporal.rs b/nomos-blend/core/src/message_blend/temporal.rs similarity index 100% rename from nomos-mix/core/src/message_blend/temporal.rs rename to nomos-blend/core/src/message_blend/temporal.rs diff --git a/nomos-mix/core/src/persistent_transmission.rs b/nomos-blend/core/src/persistent_transmission.rs similarity index 94% rename from nomos-mix/core/src/persistent_transmission.rs rename to nomos-blend/core/src/persistent_transmission.rs index 72531a49..f05cddb3 100644 --- a/nomos-mix/core/src/persistent_transmission.rs +++ b/nomos-blend/core/src/persistent_transmission.rs @@ -1,5 +1,5 @@ use futures::{Stream, StreamExt}; -use nomos_mix_message::MixMessage; +use nomos_blend_message::BlendMessage; use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore}; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; @@ -33,14 +33,14 @@ where coin: Coin, stream: S, scheduler: Scheduler, - _mix_message: PhantomData, + _blend_message: PhantomData, } impl PersistentTransmissionStream where S: Stream, Rng: RngCore, - M: MixMessage, + M: BlendMessage, Scheduler: Stream, { pub fn new( @@ -54,7 +54,7 @@ where coin, stream, scheduler, - _mix_message: Default::default(), + _blend_message: Default::default(), } } } @@ -63,7 +63,7 @@ impl Stream for PersistentTransmissionStream> + Unpin, Rng: RngCore + Unpin, - M: MixMessage + Unpin, + M: BlendMessage + Unpin, Scheduler: Stream + Unpin, { type Item = Vec; @@ -91,7 +91,7 @@ where pub trait PersistentTransmissionExt: Stream where Rng: RngCore, - M: MixMessage, + M: BlendMessage, Scheduler: Stream, { fn persistent_transmission( @@ -111,7 +111,7 @@ impl PersistentTransmissionExt for S where S: Stream, Rng: RngCore, - M: MixMessage, + M: BlendMessage, M::PublicKey: Clone + Serialize + DeserializeOwned, Scheduler: Stream, { @@ -150,7 +150,7 @@ enum CoinError { mod tests { use super::*; use futures::StreamExt; - use nomos_mix_message::mock::MockMixMessage; + use nomos_blend_message::mock::MockBlendMessage; use rand::SeedableRng; use rand_chacha::ChaCha8Rng; use std::time::Duration; @@ -199,7 +199,7 @@ mod tests { let mut persistent_transmission_stream: PersistentTransmissionStream< _, _, - MockMixMessage, + MockBlendMessage, _, > = stream.persistent_transmission( settings, @@ -230,12 +230,12 @@ mod tests { ); assert_interval!(&mut last_time, lower_bound, upper_bound); - assert!(MockMixMessage::is_drop_message( + assert!(MockBlendMessage::is_drop_message( &persistent_transmission_stream.next().await.unwrap() )); assert_interval!(&mut last_time, lower_bound, upper_bound); - assert!(MockMixMessage::is_drop_message( + assert!(MockBlendMessage::is_drop_message( &persistent_transmission_stream.next().await.unwrap() )); assert_interval!(&mut last_time, lower_bound, upper_bound); diff --git a/nomos-mix/message/Cargo.toml b/nomos-blend/message/Cargo.toml similarity index 92% rename from nomos-mix/message/Cargo.toml rename to nomos-blend/message/Cargo.toml index 3a4c33c1..1db0dd25 100644 --- a/nomos-mix/message/Cargo.toml +++ b/nomos-blend/message/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "nomos-mix-message" +name = "nomos-blend-message" version = "0.1.0" edition = "2021" diff --git a/nomos-mix/message/src/lib.rs b/nomos-blend/message/src/lib.rs similarity index 90% rename from nomos-mix/message/src/lib.rs rename to nomos-blend/message/src/lib.rs index cc8e4c27..d17b4342 100644 --- a/nomos-mix/message/src/lib.rs +++ b/nomos-blend/message/src/lib.rs @@ -1,7 +1,7 @@ pub mod mock; pub mod sphinx; -pub trait MixMessage { +pub trait BlendMessage { type PublicKey; type PrivateKey; type Error; @@ -17,7 +17,7 @@ pub trait MixMessage { /// (False if the message still has layers to be unwrapped, true otherwise) /// /// If the input message was already fully unwrapped, or if its format is invalid, - /// this function returns `[Error::InvalidMixMessage]`. + /// this function returns `[Error::InvalidBlendMessage]`. fn unwrap_message( message: &[u8], private_key: &Self::PrivateKey, diff --git a/nomos-mix/message/src/mock/error.rs b/nomos-blend/message/src/mock/error.rs similarity index 83% rename from nomos-mix/message/src/mock/error.rs rename to nomos-blend/message/src/mock/error.rs index bc54d0fc..1fd4999e 100644 --- a/nomos-mix/message/src/mock/error.rs +++ b/nomos-blend/message/src/mock/error.rs @@ -1,7 +1,7 @@ #[derive(thiserror::Error, Debug)] pub enum Error { - #[error("Invalid mix message format")] - InvalidMixMessage, + #[error("Invalid blend message format")] + InvalidBlendMessage, #[error("Payload is too large")] PayloadTooLarge, #[error("Invalid number of layers")] diff --git a/nomos-mix/message/src/mock/mod.rs b/nomos-blend/message/src/mock/mod.rs similarity index 87% rename from nomos-mix/message/src/mock/mod.rs rename to nomos-blend/message/src/mock/mod.rs index bdececee..74abeacb 100644 --- a/nomos-mix/message/src/mock/mod.rs +++ b/nomos-blend/message/src/mock/mod.rs @@ -2,7 +2,7 @@ pub mod error; use error::Error; -use crate::MixMessage; +use crate::BlendMessage; const NODE_ID_SIZE: usize = 32; // TODO: Move MAX_PAYLOAD_SIZE and MAX_LAYERS to the upper layer (service layer). @@ -15,9 +15,9 @@ pub const MESSAGE_SIZE: usize = /// A mock implementation of the Sphinx encoding. #[derive(Clone, Debug)] -pub struct MockMixMessage; +pub struct MockBlendMessage; -impl MixMessage for MockMixMessage { +impl BlendMessage for MockBlendMessage { type PublicKey = [u8; NODE_ID_SIZE]; type PrivateKey = [u8; NODE_ID_SIZE]; type Error = Error; @@ -60,7 +60,7 @@ impl MixMessage for MockMixMessage { private_key: &Self::PrivateKey, ) -> Result<(Vec, bool), Self::Error> { if message.len() != MESSAGE_SIZE { - return Err(Error::InvalidMixMessage); + return Err(Error::InvalidBlendMessage); } // In this mock, we don't decrypt anything. So, we use private key as just a node ID. @@ -80,7 +80,7 @@ impl MixMessage for MockMixMessage { Some(pos) => { return Ok((padded_payload[0..pos].to_vec(), true)); } - _ => return Err(Error::InvalidMixMessage), + _ => return Err(Error::InvalidBlendMessage), } } @@ -100,21 +100,21 @@ mod tests { fn message() { let node_ids = (0..3).map(|i| [i; NODE_ID_SIZE]).collect::>(); let payload = [7; 10]; - let message = MockMixMessage::build_message(&payload, &node_ids).unwrap(); + let message = MockBlendMessage::build_message(&payload, &node_ids).unwrap(); assert_eq!(message.len(), MESSAGE_SIZE); let (message, is_fully_unwrapped) = - MockMixMessage::unwrap_message(&message, &node_ids[0]).unwrap(); + MockBlendMessage::unwrap_message(&message, &node_ids[0]).unwrap(); assert!(!is_fully_unwrapped); assert_eq!(message.len(), MESSAGE_SIZE); let (message, is_fully_unwrapped) = - MockMixMessage::unwrap_message(&message, &node_ids[1]).unwrap(); + MockBlendMessage::unwrap_message(&message, &node_ids[1]).unwrap(); assert!(!is_fully_unwrapped); assert_eq!(message.len(), MESSAGE_SIZE); let (unwrapped_payload, is_fully_unwrapped) = - MockMixMessage::unwrap_message(&message, &node_ids[2]).unwrap(); + MockBlendMessage::unwrap_message(&message, &node_ids[2]).unwrap(); assert!(is_fully_unwrapped); assert_eq!(unwrapped_payload, payload); } diff --git a/nomos-mix/message/src/sphinx/error.rs b/nomos-blend/message/src/sphinx/error.rs similarity index 100% rename from nomos-mix/message/src/sphinx/error.rs rename to nomos-blend/message/src/sphinx/error.rs diff --git a/nomos-mix/message/src/sphinx/layered_cipher.rs b/nomos-blend/message/src/sphinx/layered_cipher.rs similarity index 100% rename from nomos-mix/message/src/sphinx/layered_cipher.rs rename to nomos-blend/message/src/sphinx/layered_cipher.rs diff --git a/nomos-mix/message/src/sphinx/mod.rs b/nomos-blend/message/src/sphinx/mod.rs similarity index 96% rename from nomos-mix/message/src/sphinx/mod.rs rename to nomos-blend/message/src/sphinx/mod.rs index 137a3c6a..74f427d3 100644 --- a/nomos-mix/message/src/sphinx/mod.rs +++ b/nomos-blend/message/src/sphinx/mod.rs @@ -1,7 +1,7 @@ use error::Error; use packet::{Packet, UnpackedPacket}; -use crate::MixMessage; +use crate::BlendMessage; pub mod error; mod layered_cipher; @@ -16,7 +16,7 @@ const ASYM_KEY_SIZE: usize = 32; const MAX_PAYLOAD_SIZE: usize = 2048; const MAX_LAYERS: usize = 5; -impl MixMessage for SphinxMessage { +impl BlendMessage for SphinxMessage { type PublicKey = [u8; ASYM_KEY_SIZE]; type PrivateKey = [u8; ASYM_KEY_SIZE]; type Error = Error; diff --git a/nomos-mix/message/src/sphinx/packet.rs b/nomos-blend/message/src/sphinx/packet.rs similarity index 100% rename from nomos-mix/message/src/sphinx/packet.rs rename to nomos-blend/message/src/sphinx/packet.rs diff --git a/nomos-mix/message/src/sphinx/routing.rs b/nomos-blend/message/src/sphinx/routing.rs similarity index 100% rename from nomos-mix/message/src/sphinx/routing.rs rename to nomos-blend/message/src/sphinx/routing.rs diff --git a/nomos-mix/network/Cargo.toml b/nomos-blend/network/Cargo.toml similarity index 75% rename from nomos-mix/network/Cargo.toml rename to nomos-blend/network/Cargo.toml index d22baaf1..123b9cee 100644 --- a/nomos-mix/network/Cargo.toml +++ b/nomos-blend/network/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "nomos-mix-network" +name = "nomos-blend-network" version = "0.1.0" edition = "2021" @@ -9,10 +9,11 @@ futures = "0.3.30" futures-timer = "3.0.3" libp2p = "0.53" tracing = "0.1" -nomos-mix = { path = "../core" } -nomos-mix-message = { path = "../message" } +nomos-blend = { path = "../core" } +nomos-blend-message = { path = "../message" } sha2 = "0.10" rand = "0.8" +opentelemetry = "0.27.1" [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] } diff --git a/nomos-mix/network/src/behaviour.rs b/nomos-blend/network/src/behaviour.rs similarity index 94% rename from nomos-mix/network/src/behaviour.rs rename to nomos-blend/network/src/behaviour.rs index 0a1b8f78..60d62f84 100644 --- a/nomos-mix/network/src/behaviour.rs +++ b/nomos-blend/network/src/behaviour.rs @@ -1,6 +1,6 @@ use crate::{ error::Error, - handler::{FromBehaviour, MixConnectionHandler, ToBehaviour}, + handler::{BlendConnectionHandler, FromBehaviour, ToBehaviour}, }; use cached::{Cached, TimedCache}; use futures::Stream; @@ -13,11 +13,11 @@ use libp2p::{ }, Multiaddr, PeerId, }; -use nomos_mix::{ +use nomos_blend::{ conn_maintenance::{ConnectionMaintenance, ConnectionMaintenanceSettings}, membership::Membership, }; -use nomos_mix_message::MixMessage; +use nomos_blend_message::BlendMessage; use rand::RngCore; use sha2::{Digest, Sha256}; use std::marker::PhantomData; @@ -32,7 +32,7 @@ use std::{ /// - receives messages from all connected peers. pub struct Behaviour where - M: MixMessage, + M: BlendMessage, R: RngCore, { config: Config, @@ -46,7 +46,7 @@ where /// An LRU time cache for storing seen messages (based on their ID). This cache prevents /// duplicates from being propagated on the network. duplicate_cache: TimedCache, ()>, - _mix_message: PhantomData, + _blend_message: PhantomData, } #[derive(Debug)] @@ -65,7 +65,7 @@ pub enum Event { impl Behaviour where - M: MixMessage, + M: BlendMessage, M::PublicKey: PartialEq, R: RngCore, { @@ -90,7 +90,7 @@ where events, waker: None, duplicate_cache, - _mix_message: Default::default(), + _blend_message: Default::default(), } } @@ -117,7 +117,7 @@ where /// Forwards a message to all connected peers except the excluded peer. /// - /// Returns [`Error::NoPeers`] if there are no connected peers that support the mix protocol. + /// Returns [`Error::NoPeers`] if there are no connected peers that support the blend protocol. fn forward_message( &mut self, message: Vec, @@ -185,12 +185,12 @@ where impl NetworkBehaviour for Behaviour where - M: MixMessage + 'static, + M: BlendMessage + 'static, M::PublicKey: PartialEq + 'static, R: RngCore + 'static, Interval: Stream + Unpin + 'static, { - type ConnectionHandler = MixConnectionHandler; + type ConnectionHandler = BlendConnectionHandler; type ToSwarm = Event; fn handle_established_inbound_connection( @@ -202,7 +202,7 @@ where ) -> Result, ConnectionDenied> { // Keep PeerId <> Multiaddr mapping self.peer_address_map.add(peer_id, remote_addr.clone()); - Ok(MixConnectionHandler::new()) + Ok(BlendConnectionHandler::new()) } fn handle_established_outbound_connection( @@ -214,7 +214,7 @@ where ) -> Result, ConnectionDenied> { // Keep PeerId <> Multiaddr mapping self.peer_address_map.add(peer_id, addr.clone()); - Ok(MixConnectionHandler::new()) + Ok(BlendConnectionHandler::new()) } /// Informs the behaviour about an event from the [`Swarm`]. @@ -233,7 +233,7 @@ where } } - /// Handles an event generated by the [`MixConnectionHandler`] + /// Handles an event generated by the [`BlendConnectionHandler`] /// dedicated to the connection identified by `peer_id` and `connection_id`. fn on_connection_handler_event( &mut self, @@ -277,7 +277,7 @@ where .push_back(ToSwarm::GenerateEvent(Event::Message(message))); } // The connection was fully negotiated by the peer, - // which means that the peer supports the mix protocol. + // which means that the peer supports the blend protocol. ToBehaviour::FullyNegotiatedOutbound => { if let Some(addr) = self.peer_address_map.address(&peer_id) { self.conn_maintenance.add_connected_peer(addr.clone()); diff --git a/nomos-mix/network/src/error.rs b/nomos-blend/network/src/error.rs similarity index 100% rename from nomos-mix/network/src/error.rs rename to nomos-blend/network/src/error.rs diff --git a/nomos-mix/network/src/handler.rs b/nomos-blend/network/src/handler.rs similarity index 90% rename from nomos-mix/network/src/handler.rs rename to nomos-blend/network/src/handler.rs index a01cc2a6..2ef40b9f 100644 --- a/nomos-mix/network/src/handler.rs +++ b/nomos-blend/network/src/handler.rs @@ -14,12 +14,18 @@ use libp2p::{ Stream, StreamProtocol, }; -const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0"); +// Metrics +const VALUE_FULLY_NEGOTIATED_INBOUND: &str = "fully_negotiated_inbound"; +const VALUE_FULLY_NEGOTIATED_OUTBOUND: &str = "fully_negotiated_outbound"; +const VALUE_DIAL_UPGRADE_ERROR: &str = "dial_upgrade_error"; +const VALUE_IGNORED: &str = "ignored"; + +const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/blend/0.1.0"); // TODO: Consider replacing this struct with libp2p_stream ConnectionHandler // because we don't implement persistent emission in the per-connection level anymore. -/// A [`ConnectionHandler`] that handles the mix protocol. -pub struct MixConnectionHandler { +/// A [`ConnectionHandler`] that handles the blend protocol. +pub struct BlendConnectionHandler { inbound_substream: Option, outbound_substream: Option, outbound_msgs: VecDeque>, @@ -39,7 +45,7 @@ enum OutboundSubstreamState { PendingSend(MsgSendFuture), } -impl MixConnectionHandler { +impl BlendConnectionHandler { pub fn new() -> Self { Self { inbound_substream: None, @@ -57,7 +63,7 @@ impl MixConnectionHandler { } } -impl Default for MixConnectionHandler { +impl Default for BlendConnectionHandler { fn default() -> Self { Self::new() } @@ -71,9 +77,9 @@ pub enum FromBehaviour { #[derive(Debug)] pub enum ToBehaviour { - /// An outbound substream has been successfully upgraded for the mix protocol. + /// An outbound substream has been successfully upgraded for the blend protocol. FullyNegotiatedOutbound, - /// An outbound substream was failed to be upgraded for the mix protocol. + /// An outbound substream was failed to be upgraded for the blend protocol. NegotiationFailed, /// A message has been received from the connection. Message(Vec), @@ -81,7 +87,7 @@ pub enum ToBehaviour { IOError(io::Error), } -impl ConnectionHandler for MixConnectionHandler { +impl ConnectionHandler for BlendConnectionHandler { type FromBehaviour = FromBehaviour; type ToBehaviour = ToBehaviour; type InboundProtocol = ReadyUpgrade; @@ -99,6 +105,11 @@ impl ConnectionHandler for MixConnectionHandler { ) -> Poll< ConnectionHandlerEvent, > { + tracing::info!(gauge.pending_outbound_messages = self.outbound_msgs.len() as u64,); + tracing::info!( + gauge.pending_events_to_behaviour = self.pending_events_to_behaviour.len() as u64, + ); + // Process pending events to be sent to the behaviour if let Some(event) = self.pending_events_to_behaviour.pop_front() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)); @@ -202,13 +213,14 @@ impl ConnectionHandler for MixConnectionHandler { Self::OutboundOpenInfo, >, ) { - match event { + let event_name = match event { ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { protocol: stream, .. }) => { tracing::debug!("FullyNegotiatedInbound: Creating inbound substream"); - self.inbound_substream = Some(recv_msg(stream).boxed()) + self.inbound_substream = Some(recv_msg(stream).boxed()); + VALUE_FULLY_NEGOTIATED_INBOUND } ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { protocol: stream, @@ -218,6 +230,7 @@ impl ConnectionHandler for MixConnectionHandler { self.outbound_substream = Some(OutboundSubstreamState::Idle(stream)); self.pending_events_to_behaviour .push_back(ToBehaviour::FullyNegotiatedOutbound); + VALUE_FULLY_NEGOTIATED_OUTBOUND } ConnectionEvent::DialUpgradeError(e) => { tracing::error!("DialUpgradeError: {:?}", e); @@ -234,17 +247,20 @@ impl ConnectionHandler for MixConnectionHandler { self.pending_events_to_behaviour .push_back(ToBehaviour::IOError(io::Error::new( io::ErrorKind::TimedOut, - "mix protocol negotiation timed out", + "blend protocol negotiation timed out", ))); } StreamUpgradeError::Apply(_) => unreachable!(), - } + }; + VALUE_DIAL_UPGRADE_ERROR } event => { - tracing::debug!("Ignoring connection event: {:?}", event) + tracing::debug!("Ignoring connection event: {:?}", event); + VALUE_IGNORED } - } + }; + tracing::info!(counter.connection_event = 1, event = event_name); self.try_wake(); } } diff --git a/nomos-mix/network/src/lib.rs b/nomos-blend/network/src/lib.rs similarity index 88% rename from nomos-mix/network/src/lib.rs rename to nomos-blend/network/src/lib.rs index 9cc4420b..cad75657 100644 --- a/nomos-mix/network/src/lib.rs +++ b/nomos-blend/network/src/lib.rs @@ -14,11 +14,11 @@ mod test { swarm::{dummy, NetworkBehaviour, SwarmEvent}, Multiaddr, Swarm, SwarmBuilder, }; - use nomos_mix::{ + use nomos_blend::{ conn_maintenance::{ConnectionMaintenanceSettings, ConnectionMonitorSettings}, membership::{Membership, Node}, }; - use nomos_mix_message::{mock::MockMixMessage, MixMessage}; + use nomos_blend_message::{mock::MockBlendMessage, BlendMessage}; use rand::{rngs::ThreadRng, thread_rng}; use tokio::select; use tokio_stream::wrappers::IntervalStream; @@ -28,7 +28,7 @@ mod test { /// Check that a published messsage arrives in the peers successfully. #[tokio::test] async fn behaviour() { - // Initialize two swarms that support the mix protocol. + // Initialize two swarms that support the blend protocol. let nodes = nodes(2, 8090); let mut swarm1 = new_swarm(Membership::new(nodes.clone(), nodes[0].public_key)); let mut swarm2 = new_swarm(Membership::new(nodes.clone(), nodes[1].public_key)); @@ -66,16 +66,16 @@ mod test { .is_ok()); } - /// If the peer doesn't support the mix protocol, the message should not be forwarded to the peer. + /// If the peer doesn't support the blend protocol, the message should not be forwarded to the peer. #[tokio::test] - async fn peer_not_support_mix_protocol() { - // Only swarm2 supports the mix protocol. + async fn peer_not_support_blend_protocol() { + // Only swarm2 supports the blend protocol. let nodes = nodes(2, 8190); - let mut swarm1 = new_swarm_without_mix(nodes[0].address.clone()); + let mut swarm1 = new_swarm_without_blend(nodes[0].address.clone()); let mut swarm2 = new_swarm(Membership::new(nodes.clone(), nodes[1].public_key)); // Expect all publish attempts to fail with [`Error::NoPeers`] - // because swarm2 doesn't have any peers that support the mix protocol. + // because swarm2 doesn't have any peers that support the blend protocol. let msg = vec![1; 10]; let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1)); let mut publish_try_count = 0; @@ -95,8 +95,8 @@ mod test { } fn new_swarm( - membership: Membership, - ) -> Swarm> { + membership: Membership, + ) -> Swarm> { let conn_maintenance_settings = ConnectionMaintenanceSettings { peering_degree: membership.size() - 1, // excluding the local node max_peering_degree: membership.size() * 2, @@ -127,7 +127,7 @@ mod test { swarm } - fn new_swarm_without_mix(addr: Multiaddr) -> Swarm { + fn new_swarm_without_blend(addr: Multiaddr) -> Swarm { let mut swarm = new_swarm_with_behaviour(dummy::Behaviour); swarm.listen_on(addr).unwrap(); swarm @@ -151,7 +151,7 @@ mod test { fn nodes( count: usize, base_port: usize, - ) -> Vec::PublicKey>> { + ) -> Vec::PublicKey>> { (0..count) .map(|i| Node { address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", base_port + i) diff --git a/nomos-core/chain-defs/src/block/mod.rs b/nomos-core/chain-defs/src/block/mod.rs index 14bea461..821baabf 100644 --- a/nomos-core/chain-defs/src/block/mod.rs +++ b/nomos-core/chain-defs/src/block/mod.rs @@ -47,4 +47,12 @@ impl< pub fn from_bytes(bytes: &[u8]) -> Self { wire::deserialize(bytes).unwrap() } + + pub fn cl_transactions_len(&self) -> usize { + self.cl_transactions.len() + } + + pub fn bl_blobs_len(&self) -> usize { + self.bl_blobs.len() + } } diff --git a/nomos-core/chain-defs/src/header/mod.rs b/nomos-core/chain-defs/src/header/mod.rs index fe10a5cc..d38de0bc 100644 --- a/nomos-core/chain-defs/src/header/mod.rs +++ b/nomos-core/chain-defs/src/header/mod.rs @@ -70,6 +70,10 @@ impl Header { &self.orphaned_leader_proofs } + pub fn content_size(&self) -> u32 { + self.content_size + } + pub fn new( parent: HeaderId, content_size: u32, diff --git a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs index f3b46abe..a7b5f533 100644 --- a/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs +++ b/nomos-da/network/core/src/protocols/dispersal/validator/behaviour.rs @@ -24,6 +24,17 @@ pub enum DispersalEvent { /// Received a n IncomingMessage { message: DispersalReq }, } + +impl DispersalEvent { + pub fn blob_size(&self) -> Option { + match self { + DispersalEvent::IncomingMessage { message } => { + message.blob.as_ref().map(|blob| blob.data.len()) + } + } + } +} + pub struct DispersalValidatorBehaviour { stream_behaviour: libp2p_stream::Behaviour, incoming_streams: IncomingStreams, diff --git a/nomos-da/network/core/src/protocols/replication/behaviour.rs b/nomos-da/network/core/src/protocols/replication/behaviour.rs index 4ae0bea2..07f7aa4a 100644 --- a/nomos-da/network/core/src/protocols/replication/behaviour.rs +++ b/nomos-da/network/core/src/protocols/replication/behaviour.rs @@ -12,7 +12,6 @@ use libp2p::swarm::{ }; use libp2p::{Multiaddr, PeerId}; use log::{error, trace}; - use subnetworks_assignations::MembershipHandler; use crate::SubnetworkId; @@ -31,6 +30,16 @@ pub enum ReplicationEvent { IncomingMessage { peer_id: PeerId, message: DaMessage }, } +impl ReplicationEvent { + pub fn blob_size(&self) -> Option { + match self { + ReplicationEvent::IncomingMessage { message, .. } => { + message.blob.as_ref().map(|blob| blob.data.len()) + } + } + } +} + /// Nomos DA broadcas network behaviour /// This item handles the logic of the nomos da subnetworks broadcasting /// DA subnetworks are a logical distribution of subsets. diff --git a/nomos-da/network/core/src/swarm/executor.rs b/nomos-da/network/core/src/swarm/executor.rs index 406ddf1a..55554e60 100644 --- a/nomos-da/network/core/src/swarm/executor.rs +++ b/nomos-da/network/core/src/swarm/executor.rs @@ -29,6 +29,12 @@ use crate::swarm::validator::ValidatorEventsStream; use crate::SubnetworkId; use subnetworks_assignations::MembershipHandler; +// Metrics +const EVENT_SAMPLING: &str = "sampling"; +const EVENT_DISPERSAL_EXECUTOR_DISPERSAL: &str = "dispersal_executor_event"; +const EVENT_VALIDATOR_DISPERSAL: &str = "validator_dispersal"; +const EVENT_REPLICATION: &str = "replication"; + pub struct ExecutorEventsStream { pub validator_events_stream: ValidatorEventsStream, pub dispersal_events_receiver: UnboundedReceiverStream, @@ -58,6 +64,7 @@ where let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver); let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver); let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver); + ( Self { swarm: Self::build_swarm(key, membership, addresses), @@ -161,15 +168,33 @@ where async fn handle_behaviour_event(&mut self, event: ExecutorBehaviourEvent) { match event { ExecutorBehaviourEvent::Sampling(event) => { + tracing::info!( + counter.behaviour_events_received = 1, + event = EVENT_SAMPLING + ); self.handle_sampling_event(event).await; } ExecutorBehaviourEvent::ExecutorDispersal(event) => { + tracing::info!( + counter.behaviour_events_received = 1, + event = EVENT_DISPERSAL_EXECUTOR_DISPERSAL + ); self.handle_executor_dispersal_event(event).await; } ExecutorBehaviourEvent::ValidatorDispersal(event) => { + tracing::info!( + counter.behaviour_events_received = 1, + event = EVENT_VALIDATOR_DISPERSAL, + blob_size = event.blob_size() + ); self.handle_validator_dispersal_event(event).await; } ExecutorBehaviourEvent::Replication(event) => { + tracing::info!( + counter.behaviour_events_received = 1, + event = EVENT_REPLICATION, + blob_size = event.blob_size() + ); self.handle_replication_event(event).await; } } diff --git a/nomos-da/network/core/src/swarm/validator.rs b/nomos-da/network/core/src/swarm/validator.rs index 4afc008d..d778119d 100644 --- a/nomos-da/network/core/src/swarm/validator.rs +++ b/nomos-da/network/core/src/swarm/validator.rs @@ -25,6 +25,11 @@ use crate::swarm::common::{ use crate::SubnetworkId; use subnetworks_assignations::MembershipHandler; +// Metrics +const EVENT_SAMPLING: &str = "sampling"; +const EVENT_VALIDATOR_DISPERSAL: &str = "validator_dispersal"; +const EVENT_REPLICATION: &str = "replication"; + pub struct ValidatorEventsStream { pub sampling_events_receiver: UnboundedReceiverStream, pub validation_events_receiver: UnboundedReceiverStream, @@ -52,6 +57,7 @@ where let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver); let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver); + ( Self { swarm: Self::build_swarm(key, membership, addresses), @@ -131,12 +137,26 @@ where async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent) { match event { ValidatorBehaviourEvent::Sampling(event) => { + tracing::info!( + counter.behaviour_events_received = 1, + event = EVENT_SAMPLING + ); self.handle_sampling_event(event).await; } ValidatorBehaviourEvent::Dispersal(event) => { + tracing::info!( + counter.behaviour_events_received = 1, + event = EVENT_VALIDATOR_DISPERSAL, + blob_size = event.blob_size() + ); self.handle_dispersal_event(event).await; } ValidatorBehaviourEvent::Replication(event) => { + tracing::info!( + counter.behaviour_events_received = 1, + event = EVENT_REPLICATION, + blob_size = event.blob_size() + ); self.handle_replication_event(event).await; } } diff --git a/nomos-mix/core/src/lib.rs b/nomos-mix/core/src/lib.rs deleted file mode 100644 index 650b4299..00000000 --- a/nomos-mix/core/src/lib.rs +++ /dev/null @@ -1,19 +0,0 @@ -pub mod conn_maintenance; -pub mod cover_traffic; -pub mod membership; -pub mod message_blend; -pub mod persistent_transmission; - -pub enum MixOutgoingMessage { - FullyUnwrapped(Vec), - Outbound(Vec), -} - -impl From for Vec { - fn from(value: MixOutgoingMessage) -> Self { - match value { - MixOutgoingMessage::FullyUnwrapped(v) => v, - MixOutgoingMessage::Outbound(v) => v, - } - } -} diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index 2b812678..23c3f8d2 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -16,7 +16,7 @@ tracing = "0.1" nomos-core = { path = "../../nomos-core/chain-defs" } cryptarchia-consensus = { path = "../cryptarchia-consensus" } nomos-network = { path = "../../nomos-services/network" } -nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] } +nomos-blend-service = { path = "../../nomos-services/blend", features = ["libp2p"] } nomos-mempool = { path = "../../nomos-services/mempool", features = [ "mock", "libp2p", diff --git a/nomos-services/api/src/http/consensus/cryptarchia.rs b/nomos-services/api/src/http/consensus/cryptarchia.rs index 2ab3d143..a438d54a 100644 --- a/nomos-services/api/src/http/consensus/cryptarchia.rs +++ b/nomos-services/api/src/http/consensus/cryptarchia.rs @@ -7,11 +7,12 @@ use tokio::sync::oneshot; use crate::http::DynError; use cryptarchia_consensus::{ - mix::adapters::libp2p::LibP2pAdapter as MixAdapter, + blend::adapters::libp2p::LibP2pAdapter as BlendAdapter, network::adapters::libp2p::LibP2pAdapter as ConsensusNetworkAdapter, ConsensusMsg, CryptarchiaConsensus, CryptarchiaInfo, }; use kzgrs_backend::dispersal::BlobInfo; +use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter; use nomos_core::{ da::{ blob::{self, select::FillSize as FillSizeWithBlobs}, @@ -24,7 +25,6 @@ use nomos_da_sampling::backend::DaSamplingServiceBackend; use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter, }; -use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde}; pub type Cryptarchia< @@ -37,7 +37,7 @@ pub type Cryptarchia< const SIZE: usize, > = CryptarchiaConsensus< ConsensusNetworkAdapter, - MixAdapter, + BlendAdapter, MockPool::Hash>, MempoolNetworkAdapter::Hash>, MockPool::BlobId>, diff --git a/nomos-services/api/src/http/da.rs b/nomos-services/api/src/http/da.rs index 7223a955..b0a0aa87 100644 --- a/nomos-services/api/src/http/da.rs +++ b/nomos-services/api/src/http/da.rs @@ -1,5 +1,6 @@ use bytes::Bytes; use core::ops::Range; +use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter; use nomos_core::da::blob::info::DispersedBlobInfo; use nomos_core::da::blob::{metadata, select::FillSize as FillSizeWithBlobs, Blob}; use nomos_core::da::{BlobId, DaVerifier as CoreDaVerifier}; @@ -23,7 +24,6 @@ use nomos_da_verifier::{DaVerifierMsg, DaVerifierService}; use nomos_libp2p::PeerId; use nomos_mempool::backend::mockpool::MockPool; use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter; -use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter; use nomos_storage::backends::rocksdb::RocksBackend; use nomos_storage::backends::StorageSerde; use overwatch_rs::overwatch::handle::OverwatchHandle; @@ -54,7 +54,7 @@ pub type DaIndexer< CryptarchiaConsensusAdapter, // Cryptarchia specific, should be the same as in `Cryptarchia` type above. cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter, + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter, MockPool::Hash>, MempoolNetworkAdapter::Hash>, MockPool, diff --git a/nomos-services/mix/Cargo.toml b/nomos-services/blend/Cargo.toml similarity index 78% rename from nomos-services/mix/Cargo.toml rename to nomos-services/blend/Cargo.toml index c5ad3c75..95b2375f 100644 --- a/nomos-services/mix/Cargo.toml +++ b/nomos-services/blend/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "nomos-mix-service" +name = "nomos-blend-service" version = "0.1.0" edition = "2021" @@ -8,10 +8,10 @@ async-trait = "0.1" futures = "0.3" libp2p = { version = "0.53", features = ["ed25519"] } nomos-libp2p = { path = "../../nomos-libp2p", optional = true } -nomos-mix = { path = "../../nomos-mix/core" } +nomos-blend = { path = "../../nomos-blend/core" } nomos-core = { path = "../../nomos-core/chain-defs" } -nomos-mix-network = { path = "../../nomos-mix/network" } -nomos-mix-message = { path = "../../nomos-mix/message" } +nomos-blend-network = { path = "../../nomos-blend/network" } +nomos-blend-message = { path = "../../nomos-blend/message" } nomos-network = { path = "../network" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } rand = "0.8.5" diff --git a/nomos-services/mix/src/backends/libp2p.rs b/nomos-services/blend/src/backends/libp2p.rs similarity index 65% rename from nomos-services/mix/src/backends/libp2p.rs rename to nomos-services/blend/src/backends/libp2p.rs index 962b7f68..5c990f37 100644 --- a/nomos-services/mix/src/backends/libp2p.rs +++ b/nomos-services/blend/src/backends/libp2p.rs @@ -1,6 +1,6 @@ use std::{pin::Pin, time::Duration}; -use super::MixBackend; +use super::BlendBackend; use async_trait::async_trait; use futures::{Stream, StreamExt}; use libp2p::{ @@ -8,9 +8,9 @@ use libp2p::{ swarm::SwarmEvent, Multiaddr, Swarm, SwarmBuilder, }; +use nomos_blend::{conn_maintenance::ConnectionMaintenanceSettings, membership::Membership}; +use nomos_blend_message::sphinx::SphinxMessage; use nomos_libp2p::secret_key_serde; -use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Membership}; -use nomos_mix_message::sphinx::SphinxMessage; use overwatch_rs::overwatch::handle::OverwatchHandle; use rand::RngCore; use serde::{Deserialize, Serialize}; @@ -20,16 +20,16 @@ use tokio::{ }; use tokio_stream::wrappers::{BroadcastStream, IntervalStream}; -/// A mix backend that uses the libp2p network stack. -pub struct Libp2pMixBackend { +/// A blend backend that uses the libp2p network stack. +pub struct Libp2pBlendBackend { #[allow(dead_code)] task: JoinHandle<()>, - swarm_message_sender: mpsc::Sender, + swarm_message_sender: mpsc::Sender, incoming_message_sender: broadcast::Sender>, } #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct Libp2pMixBackendSettings { +pub struct Libp2pBlendBackendSettings { pub listening_address: Multiaddr, // A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC) #[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")] @@ -40,8 +40,8 @@ pub struct Libp2pMixBackendSettings { const CHANNEL_SIZE: usize = 64; #[async_trait] -impl MixBackend for Libp2pMixBackend { - type Settings = Libp2pMixBackendSettings; +impl BlendBackend for Libp2pBlendBackend { + type Settings = Libp2pBlendBackendSettings; fn new( config: Self::Settings, @@ -55,7 +55,7 @@ impl MixBackend for Libp2pMixBackend { let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE); let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE); - let mut swarm = MixSwarm::new( + let mut swarm = BlendSwarm::new( config, membership, rng, @@ -77,10 +77,10 @@ impl MixBackend for Libp2pMixBackend { async fn publish(&self, msg: Vec) { if let Err(e) = self .swarm_message_sender - .send(MixSwarmMessage::Publish(msg)) + .send(BlendSwarmMessage::Publish(msg)) .await { - tracing::error!("Failed to send message to MixSwarm: {e}"); + tracing::error!("Failed to send message to BlendSwarm: {e}"); } } @@ -92,29 +92,29 @@ impl MixBackend for Libp2pMixBackend { } } -struct MixSwarm +struct BlendSwarm where R: RngCore + 'static, { - swarm: Swarm>, - swarm_messages_receiver: mpsc::Receiver, + swarm: Swarm>, + swarm_messages_receiver: mpsc::Receiver, incoming_message_sender: broadcast::Sender>, } #[derive(Debug)] -pub enum MixSwarmMessage { +pub enum BlendSwarmMessage { Publish(Vec), } -impl MixSwarm +impl BlendSwarm where R: RngCore + 'static, { fn new( - config: Libp2pMixBackendSettings, + config: Libp2pBlendBackendSettings, membership: Membership, rng: R, - swarm_messages_receiver: mpsc::Receiver, + swarm_messages_receiver: mpsc::Receiver, incoming_message_sender: broadcast::Sender>, ) -> Self { let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone())); @@ -126,8 +126,8 @@ where config.conn_maintenance.monitor.as_ref().map(|monitor| { IntervalStream::new(tokio::time::interval(monitor.time_window)) }); - nomos_mix_network::Behaviour::new( - nomos_mix_network::Config { + nomos_blend_network::Behaviour::new( + nomos_blend_network::Config { duplicate_cache_lifespan: 60, conn_maintenance_settings: config.conn_maintenance, conn_maintenance_interval, @@ -136,7 +136,7 @@ where rng, ) }) - .expect("Mix Behaviour should be built") + .expect("Blend Behaviour should be built") .with_swarm_config(|cfg| { cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX)) }) @@ -145,7 +145,7 @@ where swarm .listen_on(config.listening_address) .unwrap_or_else(|e| { - panic!("Failed to listen on Mix network: {e:?}"); + panic!("Failed to listen on Blend network: {e:?}"); }); Self { @@ -168,29 +168,42 @@ where } } - async fn handle_swarm_message(&mut self, msg: MixSwarmMessage) { + async fn handle_swarm_message(&mut self, msg: BlendSwarmMessage) { match msg { - MixSwarmMessage::Publish(msg) => { + BlendSwarmMessage::Publish(msg) => { + let msg_size = msg.len(); if let Err(e) = self.swarm.behaviour_mut().publish(msg) { - tracing::error!("Failed to publish message to mix network: {e:?}"); + tracing::error!("Failed to publish message to blend network: {e:?}"); + tracing::info!(counter.failed_outbound_messages = 1); + } else { + tracing::info!(counter.successful_outbound_messages = 1); + tracing::info!(histogram.sent_data = msg_size as u64); } } } } - fn handle_event(&mut self, event: SwarmEvent) { + fn handle_event(&mut self, event: SwarmEvent) { match event { - SwarmEvent::Behaviour(nomos_mix_network::Event::Message(msg)) => { + SwarmEvent::Behaviour(nomos_blend_network::Event::Message(msg)) => { tracing::debug!("Received message from a peer: {msg:?}"); + + let msg_size = msg.len(); if let Err(e) = self.incoming_message_sender.send(msg) { tracing::error!("Failed to send incoming message to channel: {e}"); + tracing::info!(counter.failed_inbound_messages = 1); + } else { + tracing::info!(counter.successful_inbound_messages = 1); + tracing::info!(histogram.received_data = msg_size as u64); } } - SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => { - tracing::error!("Received error from mix network: {e:?}"); + SwarmEvent::Behaviour(nomos_blend_network::Event::Error(e)) => { + tracing::error!("Received error from blend network: {e:?}"); + tracing::info!(counter.error = 1); } _ => { - tracing::debug!("Received event from mix network: {event:?}"); + tracing::debug!("Received event from blend network: {event:?}"); + tracing::info!(counter.ignored_event = 1); } } } diff --git a/nomos-services/mix/src/backends/mod.rs b/nomos-services/blend/src/backends/mod.rs similarity index 67% rename from nomos-services/mix/src/backends/mod.rs rename to nomos-services/blend/src/backends/mod.rs index 8bab1449..50f47ecf 100644 --- a/nomos-services/mix/src/backends/mod.rs +++ b/nomos-services/blend/src/backends/mod.rs @@ -4,14 +4,14 @@ pub mod libp2p; use std::{fmt::Debug, pin::Pin}; use futures::Stream; -use nomos_mix::membership::Membership; -use nomos_mix_message::sphinx::SphinxMessage; +use nomos_blend::membership::Membership; +use nomos_blend_message::sphinx::SphinxMessage; use overwatch_rs::overwatch::handle::OverwatchHandle; use rand::RngCore; -/// A trait for mix backends that send messages to the mix network. +/// A trait for blend backends that send messages to the blend network. #[async_trait::async_trait] -pub trait MixBackend { +pub trait BlendBackend { type Settings: Clone + Debug + Send + Sync + 'static; fn new( @@ -22,8 +22,8 @@ pub trait MixBackend { ) -> Self where R: RngCore + Send + 'static; - /// Publish a message to the mix network. + /// Publish a message to the blend network. async fn publish(&self, msg: Vec); - /// Listen to messages received from the mix network. + /// Listen to messages received from the blend network. fn listen_to_incoming_messages(&mut self) -> Pin> + Send>>; } diff --git a/nomos-services/mix/src/lib.rs b/nomos-services/blend/src/lib.rs similarity index 76% rename from nomos-services/mix/src/lib.rs rename to nomos-services/blend/src/lib.rs index 2338fbc4..dd663a1f 100644 --- a/nomos-services/mix/src/lib.rs +++ b/nomos-services/blend/src/lib.rs @@ -2,22 +2,22 @@ pub mod backends; pub mod network; use async_trait::async_trait; -use backends::MixBackend; +use backends::BlendBackend; use futures::StreamExt; use network::NetworkAdapter; -use nomos_core::wire; -use nomos_mix::message_blend::temporal::TemporalScheduler; -use nomos_mix::message_blend::{crypto::CryptographicProcessor, CryptographicProcessorSettings}; -use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings}; -use nomos_mix::persistent_transmission::{ +use nomos_blend::message_blend::temporal::TemporalScheduler; +use nomos_blend::message_blend::{crypto::CryptographicProcessor, CryptographicProcessorSettings}; +use nomos_blend::message_blend::{MessageBlendExt, MessageBlendSettings}; +use nomos_blend::persistent_transmission::{ PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream, }; -use nomos_mix::MixOutgoingMessage; -use nomos_mix::{ +use nomos_blend::BlendOutgoingMessage; +use nomos_blend::{ cover_traffic::{CoverTraffic, CoverTrafficSettings}, membership::{Membership, Node}, }; -use nomos_mix_message::{sphinx::SphinxMessage, MixMessage}; +use nomos_blend_message::{sphinx::SphinxMessage, BlendMessage}; +use nomos_core::wire; use nomos_network::NetworkService; use overwatch_rs::services::{ handle::ServiceStateHandle, @@ -35,14 +35,14 @@ use tokio::sync::mpsc; use tokio::time; use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream}; -/// A mix service that sends messages to the mix network +/// A blend service that sends messages to the blend network /// and broadcasts fully unwrapped messages through the [`NetworkService`]. /// -/// The mix backend and the network adapter are generic types that are independent with each other. -/// For example, the mix backend can use the libp2p network stack, while the network adapter can use the other network backend. -pub struct MixService +/// The blend backend and the network adapter are generic types that are independent with each other. +/// For example, the blend backend can use the libp2p network stack, while the network adapter can use the other network backend. +pub struct BlendService where - Backend: MixBackend + 'static, + Backend: BlendBackend + 'static, Backend::Settings: Clone + Debug, Network: NetworkAdapter, Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned, @@ -53,24 +53,24 @@ where membership: Membership, } -impl ServiceData for MixService +impl ServiceData for BlendService where - Backend: MixBackend + 'static, + Backend: BlendBackend + 'static, Backend::Settings: Clone, Network: NetworkAdapter, Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned, { - const SERVICE_ID: ServiceId = "Mix"; - type Settings = MixConfig; + const SERVICE_ID: ServiceId = "Blend"; + type Settings = BlendConfig; type State = NoState; type StateOperator = NoOperator; type Message = ServiceMessage; } #[async_trait] -impl ServiceCore for MixService +impl ServiceCore for BlendService where - Backend: MixBackend + Send + 'static, + Backend: BlendBackend + Send + 'static, Backend::Settings: Clone, Network: NetworkAdapter + Send + Sync + 'static, Network::BroadcastSettings: @@ -78,17 +78,17 @@ where { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); - let mix_config = service_state.settings_reader.get_updated_settings(); + let blend_config = service_state.settings_reader.get_updated_settings(); Ok(Self { - backend: ::new( + backend: ::new( service_state.settings_reader.get_updated_settings().backend, service_state.overwatch_handle.clone(), - mix_config.membership(), + blend_config.membership(), ChaCha12Rng::from_entropy(), ), service_state, network_relay, - membership: mix_config.membership(), + membership: blend_config.membership(), }) } @@ -99,9 +99,9 @@ where network_relay, membership, } = self; - let mix_config = service_state.settings_reader.get_updated_settings(); + let blend_config = service_state.settings_reader.get_updated_settings(); let mut cryptographic_processor = CryptographicProcessor::new( - mix_config.message_blend.cryptographic_processor.clone(), + blend_config.message_blend.cryptographic_processor.clone(), membership.clone(), ChaCha12Rng::from_entropy(), ); @@ -116,21 +116,21 @@ where SphinxMessage, _, > = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission( - mix_config.persistent_transmission, + blend_config.persistent_transmission, ChaCha12Rng::from_entropy(), IntervalStream::new(time::interval(Duration::from_secs_f64( - 1.0 / mix_config.persistent_transmission.max_emission_frequency, + 1.0 / blend_config.persistent_transmission.max_emission_frequency, ))) .map(|_| ()), ); // tier 2 blend let temporal_scheduler = TemporalScheduler::new( - mix_config.message_blend.temporal_processor, + blend_config.message_blend.temporal_processor, ChaCha12Rng::from_entropy(), ); let mut blend_messages = backend.listen_to_incoming_messages().blend( - mix_config.message_blend.clone(), + blend_config.message_blend.clone(), membership.clone(), temporal_scheduler, ChaCha12Rng::from_entropy(), @@ -138,21 +138,22 @@ where // tier 3 cover traffic let mut cover_traffic: CoverTraffic<_, _, SphinxMessage> = CoverTraffic::new( - mix_config.cover_traffic.cover_traffic_settings( + blend_config.cover_traffic.cover_traffic_settings( &membership, - &mix_config.message_blend.cryptographic_processor, + &blend_config.message_blend.cryptographic_processor, ), - mix_config.cover_traffic.epoch_stream(), - mix_config.cover_traffic.slot_stream(), + blend_config.cover_traffic.epoch_stream(), + blend_config.cover_traffic.slot_stream(), ); // local messages, are bypassed and send immediately - let mut local_messages = service_state - .inbound_relay - .map(|ServiceMessage::Mix(message)| { - wire::serialize(&message) - .expect("Message from internal services should not fail to serialize") - }); + let mut local_messages = + service_state + .inbound_relay + .map(|ServiceMessage::Blend(message)| { + wire::serialize(&message) + .expect("Message from internal services should not fail to serialize") + }); let mut lifecycle_stream = service_state.lifecycle_handle.message_stream(); loop { @@ -163,19 +164,19 @@ where // Already processed blend messages Some(msg) = blend_messages.next() => { match msg { - MixOutgoingMessage::Outbound(msg) => { + BlendOutgoingMessage::Outbound(msg) => { if let Err(e) = persistent_sender.send(msg) { tracing::error!("Error sending message to persistent stream: {e}"); } } - MixOutgoingMessage::FullyUnwrapped(msg) => { + BlendOutgoingMessage::FullyUnwrapped(msg) => { tracing::debug!("Broadcasting fully unwrapped message"); match wire::deserialize::>(&msg) { Ok(msg) => { network_adapter.broadcast(msg.message, msg.broadcast_settings).await; }, _ => { - tracing::debug!("unrecognized message from mix backend"); + tracing::debug!("unrecognized message from blend backend"); } } } @@ -199,9 +200,9 @@ where } } -impl MixService +impl BlendService where - Backend: MixBackend + Send + 'static, + Backend: BlendBackend + Send + 'static, Backend::Settings: Clone, Network: NetworkAdapter, Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned, @@ -241,12 +242,12 @@ where } #[derive(Serialize, Deserialize, Clone, Debug)] -pub struct MixConfig { +pub struct BlendConfig { pub backend: BackendSettings, pub message_blend: MessageBlendSettings, pub persistent_transmission: PersistentTransmissionSettings, pub cover_traffic: CoverTrafficExtSettings, - pub membership: Vec::PublicKey>>, + pub membership: Vec::PublicKey>>, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -260,12 +261,12 @@ impl CoverTrafficExtSettings { &self, membership: &Membership, cryptographic_processor_settings: &CryptographicProcessorSettings< - ::PrivateKey, + ::PrivateKey, >, ) -> CoverTrafficSettings { CoverTrafficSettings { node_id: membership.local_node().public_key, - number_of_hops: cryptographic_processor_settings.num_mix_layers, + number_of_hops: cryptographic_processor_settings.num_blend_layers, slots_per_epoch: self.slots_per_epoch(), network_size: membership.size(), } @@ -301,7 +302,7 @@ impl CoverTrafficExtSettings { } } -impl MixConfig { +impl BlendConfig { fn membership(&self) -> Membership { let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from( self.message_blend.cryptographic_processor.private_key, @@ -311,11 +312,11 @@ impl MixConfig { } } -/// A message that is handled by [`MixService`]. +/// A message that is handled by [`BlendService`]. #[derive(Debug)] pub enum ServiceMessage { - /// To send a message to the mix network and eventually broadcast it to the [`NetworkService`]. - Mix(NetworkMessage), + /// To send a message to the blend network and eventually broadcast it to the [`NetworkService`]. + Blend(NetworkMessage), } impl RelayMessage for ServiceMessage where @@ -323,7 +324,7 @@ impl RelayMessage for ServiceMessage where { } -/// A message that is sent to the mix network. +/// A message that is sent to the blend network. /// To eventually broadcast the message to the network service, /// [`BroadcastSettings`] must be included in the [`NetworkMessage`]. /// [`BroadcastSettings`] is a generic type defined by [`NetworkAdapter`]. diff --git a/nomos-services/mix/src/network/libp2p.rs b/nomos-services/blend/src/network/libp2p.rs similarity index 100% rename from nomos-services/mix/src/network/libp2p.rs rename to nomos-services/blend/src/network/libp2p.rs diff --git a/nomos-services/mix/src/network/mod.rs b/nomos-services/blend/src/network/mod.rs similarity index 94% rename from nomos-services/mix/src/network/mod.rs rename to nomos-services/blend/src/network/mod.rs index 9c2678a4..0df48cc5 100644 --- a/nomos-services/mix/src/network/mod.rs +++ b/nomos-services/blend/src/network/mod.rs @@ -10,7 +10,7 @@ use overwatch_rs::services::ServiceData; use serde::{de::DeserializeOwned, Serialize}; /// A trait for communicating with the network service, which is used to broadcast -/// fully unwrapped messages returned from the mix backend. +/// fully unwrapped messages returned from the blend backend. #[async_trait::async_trait] pub trait NetworkAdapter { /// The network backend used by the network service. diff --git a/nomos-services/cryptarchia-consensus/Cargo.toml b/nomos-services/cryptarchia-consensus/Cargo.toml index 355778f2..83698d24 100644 --- a/nomos-services/cryptarchia-consensus/Cargo.toml +++ b/nomos-services/cryptarchia-consensus/Cargo.toml @@ -16,7 +16,7 @@ cl = { path = "../../nomos-core/cl" } futures = "0.3" nomos-da-sampling = { path = "../data-availability/sampling" } nomos-network = { path = "../network" } -nomos-mix-service = { path = "../mix" } +nomos-blend-service = { path = "../blend" } nomos-mempool = { path = "../mempool" } nomos-core = { path = "../../nomos-core/chain-defs" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } @@ -42,7 +42,7 @@ serde_json = { version = "1", optional = true } [features] default = [] -libp2p = ["nomos-network/libp2p", "nomos-mix-service/libp2p", "nomos-libp2p"] +libp2p = ["nomos-network/libp2p", "nomos-blend-service/libp2p", "nomos-libp2p"] openapi = ["dep:utoipa", "serde_json"] [dev-dependencies] diff --git a/nomos-services/cryptarchia-consensus/src/mix/adapters/libp2p.rs b/nomos-services/cryptarchia-consensus/src/blend/adapters/libp2p.rs similarity index 70% rename from nomos-services/cryptarchia-consensus/src/mix/adapters/libp2p.rs rename to nomos-services/cryptarchia-consensus/src/blend/adapters/libp2p.rs index 09a64f2b..107d6e13 100644 --- a/nomos-services/cryptarchia-consensus/src/mix/adapters/libp2p.rs +++ b/nomos-services/cryptarchia-consensus/src/blend/adapters/libp2p.rs @@ -1,13 +1,13 @@ use std::{hash::Hash, marker::PhantomData}; -use nomos_core::{block::Block, wire}; -use nomos_mix_service::{ - backends::libp2p::Libp2pMixBackend, network::NetworkAdapter, MixService, ServiceMessage, +use nomos_blend_service::{ + backends::libp2p::Libp2pBlendBackend, network::NetworkAdapter, BlendService, ServiceMessage, }; +use nomos_core::{block::Block, wire}; use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use crate::{messages::NetworkMessage, mix::MixAdapter}; +use crate::{blend::BlendAdapter, messages::NetworkMessage}; #[derive(Clone)] pub struct LibP2pAdapter @@ -18,13 +18,13 @@ where BlobCert: Clone + Eq + Hash, { settings: LibP2pAdapterSettings, - mix_relay: OutboundRelay< as ServiceData>::Message>, + blend_relay: OutboundRelay< as ServiceData>::Message>, _tx: PhantomData, _blob_cert: PhantomData, } #[async_trait::async_trait] -impl MixAdapter for LibP2pAdapter +impl BlendAdapter for LibP2pAdapter where Network: NetworkAdapter + 'static, Network::BroadcastSettings: Clone, @@ -32,15 +32,15 @@ where BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static, { type Settings = LibP2pAdapterSettings; - type Backend = Libp2pMixBackend; + type Backend = Libp2pBlendBackend; type Network = Network; type Tx = Tx; type BlobCertificate = BlobCert; async fn new( settings: Self::Settings, - mix_relay: OutboundRelay< - as ServiceData>::Message, + blend_relay: OutboundRelay< + as ServiceData>::Message, >, ) -> Self { // this wait seems to be helpful in some cases since we give the time @@ -50,22 +50,22 @@ where Self { settings, - mix_relay, + blend_relay, _tx: PhantomData, _blob_cert: PhantomData, } } - async fn mix(&self, block: Block) { + async fn blend(&self, block: Block) { if let Err((e, msg)) = self - .mix_relay - .send(ServiceMessage::Mix(nomos_mix_service::NetworkMessage { + .blend_relay + .send(ServiceMessage::Blend(nomos_blend_service::NetworkMessage { message: wire::serialize(&NetworkMessage::Block(block)).unwrap(), broadcast_settings: self.settings.broadcast_settings.clone(), })) .await { - tracing::error!("error sending message to mix network: {e}: {msg:?}",); + tracing::error!("error sending message to blend network: {e}: {msg:?}",); } } } diff --git a/nomos-services/cryptarchia-consensus/src/mix/adapters/mod.rs b/nomos-services/cryptarchia-consensus/src/blend/adapters/mod.rs similarity index 100% rename from nomos-services/cryptarchia-consensus/src/mix/adapters/mod.rs rename to nomos-services/cryptarchia-consensus/src/blend/adapters/mod.rs diff --git a/nomos-services/cryptarchia-consensus/src/mix/mod.rs b/nomos-services/cryptarchia-consensus/src/blend/mod.rs similarity index 61% rename from nomos-services/cryptarchia-consensus/src/mix/mod.rs rename to nomos-services/cryptarchia-consensus/src/blend/mod.rs index b34d7112..e5912d6f 100644 --- a/nomos-services/cryptarchia-consensus/src/mix/mod.rs +++ b/nomos-services/cryptarchia-consensus/src/blend/mod.rs @@ -4,8 +4,8 @@ pub mod adapters; use nomos_core::block::Block; use std::hash::Hash; // crates -use nomos_mix_service::network::NetworkAdapter; -use nomos_mix_service::{backends::MixBackend, MixService}; +use nomos_blend_service::network::NetworkAdapter; +use nomos_blend_service::{backends::BlendBackend, BlendService}; // internal use overwatch_rs::services::relay::OutboundRelay; use overwatch_rs::services::ServiceData; @@ -13,17 +13,17 @@ use serde::de::DeserializeOwned; use serde::Serialize; #[async_trait::async_trait] -pub trait MixAdapter { +pub trait BlendAdapter { type Settings: Clone + 'static; - type Backend: MixBackend + 'static; + type Backend: BlendBackend + 'static; type Network: NetworkAdapter + 'static; type Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static; type BlobCertificate: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static; async fn new( settings: Self::Settings, - mix_relay: OutboundRelay< - as ServiceData>::Message, + blend_relay: OutboundRelay< + as ServiceData>::Message, >, ) -> Self; - async fn mix(&self, block: Block); + async fn blend(&self, block: Block); } diff --git a/nomos-services/cryptarchia-consensus/src/lib.rs b/nomos-services/cryptarchia-consensus/src/lib.rs index 5f3ed6b7..fd8404e7 100644 --- a/nomos-services/cryptarchia-consensus/src/lib.rs +++ b/nomos-services/cryptarchia-consensus/src/lib.rs @@ -1,6 +1,6 @@ +pub mod blend; mod leadership; mod messages; -pub mod mix; pub mod network; mod time; @@ -119,7 +119,7 @@ impl Cryptarchia { } #[derive(Debug, Deserialize, Serialize, Clone)] -pub struct CryptarchiaSettings { +pub struct CryptarchiaSettings { #[serde(default)] pub transaction_selector_settings: Ts, #[serde(default)] @@ -129,12 +129,12 @@ pub struct CryptarchiaSettings where A: NetworkAdapter, - MixAdapter: mix::MixAdapter, + BlendAdapter: blend::BlendAdapter, ClPoolAdapter: MempoolAdapter, ClPool: MemPool, DaPool: MemPool, @@ -174,7 +174,8 @@ pub struct CryptarchiaConsensus< // underlying networking backend. We need this so we can relay and check the types properly // when implementing ServiceCore for CryptarchiaConsensus network_relay: Relay>, - mix_relay: Relay>, + blend_relay: + Relay>, cl_mempool_relay: Relay>, da_mempool_relay: Relay< DaMempoolService< @@ -195,7 +196,7 @@ pub struct CryptarchiaConsensus< impl< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -210,7 +211,7 @@ impl< > ServiceData for CryptarchiaConsensus< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -225,7 +226,7 @@ impl< > where A: NetworkAdapter, - MixAdapter: mix::MixAdapter, + BlendAdapter: blend::BlendAdapter, ClPool: MemPool, ClPool::Item: Clone + Eq + Hash + Debug, ClPool::Key: Debug, @@ -248,7 +249,7 @@ where { const SERVICE_ID: ServiceId = CRYPTARCHIA_ID; type Settings = - CryptarchiaSettings; + CryptarchiaSettings; type State = NoState; type StateOperator = NoOperator; type Message = ConsensusMsg>; @@ -257,7 +258,7 @@ where #[async_trait::async_trait] impl< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -272,7 +273,7 @@ impl< > ServiceCore for CryptarchiaConsensus< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -292,12 +293,12 @@ where + Sync + 'static, A::Settings: Send + Sync + 'static, - MixAdapter: mix::MixAdapter + BlendAdapter: blend::BlendAdapter + Clone + Send + Sync + 'static, - MixAdapter::Settings: Send + Sync + 'static, + BlendAdapter::Settings: Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, DaPool: MemPool + Send + Sync + 'static, @@ -345,16 +346,17 @@ where { fn init(service_state: ServiceStateHandle) -> Result { let network_relay = service_state.overwatch_handle.relay(); - let mix_relay = service_state.overwatch_handle.relay(); + let blend_relay = service_state.overwatch_handle.relay(); let cl_mempool_relay = service_state.overwatch_handle.relay(); let da_mempool_relay = service_state.overwatch_handle.relay(); let storage_relay = service_state.overwatch_handle.relay(); let sampling_relay = service_state.overwatch_handle.relay(); let (block_subscription_sender, _) = broadcast::channel(16); + Ok(Self { service_state, network_relay, - mix_relay, + blend_relay, cl_mempool_relay, da_mempool_relay, block_subscription_sender, @@ -370,11 +372,11 @@ where .await .expect("Relay connection with NetworkService should succeed"); - let mix_relay: OutboundRelay<_> = self - .mix_relay + let blend_relay: OutboundRelay<_> = self + .blend_relay .connect() .await - .expect("Relay connection with nomos_mix_service::MixService should succeed"); + .expect("Relay connection with nomos_blend_service::BlendService should succeed"); let cl_mempool_relay: OutboundRelay<_> = self .cl_mempool_relay @@ -408,7 +410,7 @@ where time, leader_config, network_adapter_settings, - mix_adapter_settings, + blend_adapter_settings, } = self.service_state.settings_reader.get_updated_settings(); let genesis_id = HeaderId::from([0; 32]); @@ -434,7 +436,7 @@ where let mut slot_timer = IntervalStream::new(timer.slot_interval()); - let mix_adapter = MixAdapter::new(mix_adapter_settings, mix_relay).await; + let blend_adapter = BlendAdapter::new(blend_adapter_settings, blend_relay).await; let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream(); @@ -442,6 +444,7 @@ where loop { tokio::select! { Some(block) = incoming_blocks.next() => { + Self::log_received_block(&block); cryptarchia = Self::process_block( cryptarchia, &mut leader, @@ -480,7 +483,7 @@ where ).await; if let Some(block) = block { - mix_adapter.mix(block).await; + blend_adapter.blend(block).await; } } } @@ -505,7 +508,7 @@ where impl< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -520,7 +523,7 @@ impl< > CryptarchiaConsensus< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -535,7 +538,7 @@ impl< > where A: NetworkAdapter + Clone + Send + Sync + 'static, - MixAdapter: mix::MixAdapter + Clone + Send + Sync + 'static, + BlendAdapter: blend::BlendAdapter + Clone + Send + Sync + 'static, ClPool: MemPool + Send + Sync + 'static, ClPool::Settings: Send + Sync + 'static, ClPool::Item: Transaction @@ -804,6 +807,24 @@ where .all(|blob| sampled_blobs_ids.contains(&blob.blob_id())); validated_blobs } + + fn log_received_block(block: &Block) { + let content_size = block.header().content_size(); + let transactions = block.cl_transactions_len(); + let blobs = block.bl_blobs_len(); + + tracing::info!( + counter.received_blocks = 1, + transactions = transactions, + blobs = blobs, + bytes = content_size + ); + tracing::info!( + histogram.received_blocks_data = content_size, + transactions = transactions, + blobs = blobs + ); + } } #[derive(Debug)] diff --git a/nomos-services/data-availability/indexer/src/lib.rs b/nomos-services/data-availability/indexer/src/lib.rs index e14821aa..8a094806 100644 --- a/nomos-services/data-availability/indexer/src/lib.rs +++ b/nomos-services/data-availability/indexer/src/lib.rs @@ -32,7 +32,7 @@ use tracing::error; pub type ConsensusRelay< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -47,7 +47,7 @@ pub type ConsensusRelay< > = Relay< CryptarchiaConsensus< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -67,7 +67,7 @@ pub struct DataIndexerService< DaStorage, Consensus, A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -82,7 +82,7 @@ pub struct DataIndexerService< > where B: 'static, A: NetworkAdapter, - MixAdapter: cryptarchia_consensus::mix::MixAdapter, + BlendAdapter: cryptarchia_consensus::blend::BlendAdapter, ClPoolAdapter: MempoolAdapter, ClPool: MemPool, DaPool: MemPool, @@ -110,7 +110,7 @@ pub struct DataIndexerService< #[allow(clippy::type_complexity)] consensus_relay: ConsensusRelay< A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -156,7 +156,7 @@ impl< DaStorage, Consensus, A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -174,7 +174,7 @@ impl< DaStorage, Consensus, A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -190,7 +190,7 @@ impl< where B: 'static, A: NetworkAdapter, - MixAdapter: cryptarchia_consensus::mix::MixAdapter, + BlendAdapter: cryptarchia_consensus::blend::BlendAdapter, ClPoolAdapter: MempoolAdapter, ClPool: MemPool, DaPool: MemPool, @@ -225,7 +225,7 @@ impl< DaStorage, Consensus, A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -243,7 +243,7 @@ impl< DaStorage, Consensus, A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -259,7 +259,7 @@ impl< where B: Send + Sync + 'static, A: NetworkAdapter, - MixAdapter: cryptarchia_consensus::mix::MixAdapter, + BlendAdapter: cryptarchia_consensus::blend::BlendAdapter, ClPoolAdapter: MempoolAdapter, ClPool: MemPool, DaPool: MemPool, @@ -336,7 +336,7 @@ impl< DaStorage, Consensus, A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -354,7 +354,7 @@ impl< DaStorage, Consensus, A, - MixAdapter, + BlendAdapter, ClPool, ClPoolAdapter, DaPool, @@ -370,7 +370,7 @@ impl< where B: Debug + Send + Sync, A: NetworkAdapter, - MixAdapter: cryptarchia_consensus::mix::MixAdapter, + BlendAdapter: cryptarchia_consensus::blend::BlendAdapter, ClPoolAdapter: MempoolAdapter, ClPool: MemPool, DaPool: MemPool, diff --git a/nomos-services/data-availability/tests/Cargo.toml b/nomos-services/data-availability/tests/Cargo.toml index 0baffd23..61a9e8f5 100644 --- a/nomos-services/data-availability/tests/Cargo.toml +++ b/nomos-services/data-availability/tests/Cargo.toml @@ -23,9 +23,9 @@ nomos-node = { path = "../../../nodes/nomos-node" } nomos-mempool = { path = "../../../nomos-services/mempool" } nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb-backend"] } nomos-network = { path = "../../network", features = ["mock"] } -nomos-mix-service = { path = "../../mix" } -nomos-mix = { path = "../../../nomos-mix/core" } -nomos-mix-message = { path = "../../../nomos-mix/message" } +nomos-blend-service = { path = "../../blend" } +nomos-blend = { path = "../../../nomos-blend/core" } +nomos-blend-message = { path = "../../../nomos-blend/message" } nomos-libp2p = { path = "../../../nomos-libp2p" } libp2p = { version = "0.53.2", features = ["ed25519"] } once_cell = "1.19" diff --git a/nomos-services/data-availability/tests/src/common.rs b/nomos-services/data-availability/tests/src/common.rs index 849a7b66..2d43c057 100644 --- a/nomos-services/data-availability/tests/src/common.rs +++ b/nomos-services/data-availability/tests/src/common.rs @@ -1,11 +1,11 @@ use cryptarchia_consensus::LeaderConfig; // std -use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; -use nomos_mix::message_blend::{ +use nomos_blend::message_blend::{ CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, }; -use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node}; -use nomos_mix_message::{sphinx::SphinxMessage, MixMessage}; +use nomos_blend::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node}; +use nomos_blend_message::{sphinx::SphinxMessage, BlendMessage}; +use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings; use std::path::PathBuf; use std::time::Duration; // crates @@ -19,6 +19,10 @@ use libp2p::identity::{ ed25519::{self, Keypair as Ed25519Keypair}, Keypair, PeerId, }; +use nomos_blend_service::backends::libp2p::{ + Libp2pBlendBackend as BlendBackend, Libp2pBlendBackendSettings, +}; +use nomos_blend_service::{BlendConfig, BlendService}; use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transaction}; pub use nomos_core::{ da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx, @@ -54,10 +58,6 @@ use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAda use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService}; use nomos_mempool::{DaMempoolSettings, TxMempoolSettings}; -use nomos_mix_service::backends::libp2p::{ - Libp2pMixBackend as MixBackend, Libp2pMixBackendSettings, -}; -use nomos_mix_service::{MixConfig, MixService}; use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig}; use nomos_network::NetworkConfig; use nomos_network::NetworkService; @@ -92,8 +92,8 @@ pub static ENCODER: Lazy = Lazy::new(|| DaEncoder::new(PARAMS.clone() pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus< cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter< - nomos_mix_service::network::libp2p::Libp2pAdapter, + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter< + nomos_blend_service::network::libp2p::Libp2pAdapter, Tx, BlobInfo, >, @@ -124,8 +124,8 @@ pub(crate) type DaIndexer = DataIndexerService< CryptarchiaConsensusAdapter, // Cryptarchia specific, should be the same as in `Cryptarchia` type above. cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter, - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter< - nomos_mix_service::network::libp2p::Libp2pAdapter, + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter< + nomos_blend_service::network::libp2p::Libp2pAdapter, Tx, BlobInfo, >, @@ -168,7 +168,9 @@ pub(crate) const MB16: usize = 1024 * 1024 * 16; pub struct TestNode { //logging: ServiceHandle, network: ServiceHandle>, - mix: ServiceHandle>, + blend: ServiceHandle< + BlendService, + >, cl_mempool: ServiceHandle, da_network: ServiceHandle>>, da_mempool: ServiceHandle, @@ -188,10 +190,10 @@ pub struct TestDaNetworkSettings { pub node_key: ed25519::SecretKey, } -pub struct TestMixSettings { - pub backend: Libp2pMixBackendSettings, +pub struct TestBlendSettings { + pub backend: Libp2pBlendBackendSettings, pub private_key: x25519_dalek::StaticSecret, - pub membership: Vec::PublicKey>>, + pub membership: Vec::PublicKey>>, } pub fn new_node( @@ -200,7 +202,7 @@ pub fn new_node( genesis_state: &LedgerState, time_config: &TimeConfig, swarm_config: &SwarmConfig, - mix_config: &TestMixSettings, + blend_config: &TestBlendSettings, db_path: PathBuf, blobs_dir: &PathBuf, initial_peers: Vec, @@ -216,23 +218,23 @@ pub fn new_node( initial_peers, }, }, - mix: MixConfig { - backend: mix_config.backend.clone(), + blend: BlendConfig { + backend: blend_config.backend.clone(), persistent_transmission: Default::default(), message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { - private_key: mix_config.private_key.to_bytes(), - num_mix_layers: 1, + private_key: blend_config.private_key.to_bytes(), + num_blend_layers: 1, }, temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 2, }, }, - cover_traffic: nomos_mix_service::CoverTrafficExtSettings { + cover_traffic: nomos_blend_service::CoverTrafficExtSettings { epoch_duration: Duration::from_secs(432000), slot_duration: Duration::from_secs(20), }, - membership: mix_config.membership.clone(), + membership: blend_config.membership.clone(), }, da_network: DaNetworkConfig { backend: DaNetworkBackendSettings { @@ -285,10 +287,10 @@ pub fn new_node( cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings { topic: String::from(nomos_node::CONSENSUS_TOPIC), }, - mix_adapter_settings: - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings { + blend_adapter_settings: + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapterSettings { broadcast_settings: - nomos_mix_service::network::libp2p::Libp2pBroadcastSettings { + nomos_blend_service::network::libp2p::Libp2pBroadcastSettings { topic: String::from(nomos_node::CONSENSUS_TOPIC), }, }, @@ -321,12 +323,12 @@ pub fn new_node( .unwrap() } -pub fn new_mix_configs(listening_addresses: Vec) -> Vec { +pub fn new_blend_configs(listening_addresses: Vec) -> Vec { let settings = listening_addresses .iter() .map(|listening_address| { ( - Libp2pMixBackendSettings { + Libp2pBlendBackendSettings { listening_address: listening_address.clone(), node_key: ed25519::SecretKey::generate(), conn_maintenance: ConnectionMaintenanceSettings { @@ -353,7 +355,7 @@ pub fn new_mix_configs(listening_addresses: Vec) -> Vec = host_da_peer_addresses .clone() @@ -122,11 +122,11 @@ pub fn create_node_configs( network_config.swarm_config.port = host.network_port; network_config.initial_peers = host_network_init_peers.clone(); - // Mix config. - let mut mix_config = mix_configs[i].to_owned(); - mix_config.backend.listening_address = - Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap(); - mix_config.membership = host_mix_membership.clone(); + // Blend config. + let mut blend_config = blend_configs[i].to_owned(); + blend_config.backend.listening_address = + Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.blend_port)).unwrap(); + blend_config.membership = host_blend_membership.clone(); // Tracing config. let tracing_config = @@ -138,7 +138,7 @@ pub fn create_node_configs( consensus_config, da_config, network_config, - mix_config, + blend_config, api_config, tracing_config, }, @@ -174,16 +174,16 @@ fn update_da_peer_addresses( .collect() } -fn update_mix_membership( +fn update_blend_membership( hosts: Vec, - membership: Vec::PublicKey>>, -) -> Vec::PublicKey>> { + membership: Vec::PublicKey>>, +) -> Vec::PublicKey>> { membership .into_iter() .zip(hosts) .map(|(mut node, host)| { node.address = - Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.mix_port)) + Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.blend_port)) .unwrap(); node }) @@ -235,7 +235,7 @@ mod cfgsync_tests { identifier: "node".into(), network_port: 3000, da_network_port: 4044, - mix_port: 5000, + blend_port: 5000, }) .collect(); @@ -265,11 +265,11 @@ mod cfgsync_tests { for (host, config) in configs.iter() { let network_port = config.network_config.swarm_config.port; let da_network_port = extract_port(&config.da_config.listening_address); - let mix_port = extract_port(&config.mix_config.backend.listening_address); + let blend_port = extract_port(&config.blend_config.backend.listening_address); assert_eq!(network_port, host.network_port); assert_eq!(da_network_port, host.da_network_port); - assert_eq!(mix_port, host.mix_port); + assert_eq!(blend_port, host.blend_port); } } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index c4ae7b46..bde318d7 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -10,9 +10,9 @@ executor-http-client = { path = "../clients/executor-http-client" } nomos-node = { path = "../nodes/nomos-node", default-features = false } nomos-executor = { path = "../nodes/nomos-executor", default-features = false } nomos-network = { path = "../nomos-services/network", features = ["libp2p"] } -nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] } -nomos-mix = { path = "../nomos-mix/core" } -nomos-mix-message = { path = "../nomos-mix/message" } +nomos-blend-service = { path = "../nomos-services/blend", features = ["libp2p"] } +nomos-blend = { path = "../nomos-blend/core" } +nomos-blend-message = { path = "../nomos-blend/message" } cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" } nomos-tracing = { path = "../nomos-tracing" } nomos-tracing-service = { path = "../nomos-services/tracing" } diff --git a/tests/src/nodes/executor.rs b/tests/src/nodes/executor.rs index 382b56eb..4515098c 100644 --- a/tests/src/nodes/executor.rs +++ b/tests/src/nodes/executor.rs @@ -6,6 +6,9 @@ use std::{net::SocketAddr, process::Child}; use crate::adjust_timeout; use crate::topology::configs::GeneralConfig; use cryptarchia_consensus::CryptarchiaSettings; +use nomos_blend::message_blend::{ + CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, +}; use nomos_da_dispersal::backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings}; use nomos_da_dispersal::DispersalServiceSettings; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings; @@ -22,9 +25,6 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif use nomos_da_verifier::DaVerifierServiceSettings; use nomos_executor::api::backend::AxumBackendSettings; use nomos_executor::config::Config; -use nomos_mix::message_blend::{ - CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, -}; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE}; use nomos_node::RocksBackendSettings; @@ -154,23 +154,23 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { initial_peers: config.network_config.initial_peers, }, }, - mix: nomos_mix_service::MixConfig { - backend: config.mix_config.backend, + blend: nomos_blend_service::BlendConfig { + backend: config.blend_config.backend, persistent_transmission: Default::default(), message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { - private_key: config.mix_config.private_key.to_bytes(), - num_mix_layers: 1, + private_key: config.blend_config.private_key.to_bytes(), + num_blend_layers: 1, }, temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 2, }, }, - cover_traffic: nomos_mix_service::CoverTrafficExtSettings { + cover_traffic: nomos_blend_service::CoverTrafficExtSettings { epoch_duration: Duration::from_secs(432000), slot_duration: Duration::from_secs(20), }, - membership: config.mix_config.membership, + membership: config.blend_config.membership, }, cryptarchia: CryptarchiaSettings { leader_config: config.consensus_config.leader_config, @@ -183,10 +183,10 @@ pub fn create_executor_config(config: GeneralConfig) -> Config { cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings { topic: String::from(nomos_node::CONSENSUS_TOPIC), }, - mix_adapter_settings: - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings { + blend_adapter_settings: + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapterSettings { broadcast_settings: - nomos_mix_service::network::libp2p::Libp2pBroadcastSettings { + nomos_blend_service::network::libp2p::Libp2pBroadcastSettings { topic: String::from(nomos_node::CONSENSUS_TOPIC), }, }, diff --git a/tests/src/nodes/validator.rs b/tests/src/nodes/validator.rs index 1ce45546..655322e2 100644 --- a/tests/src/nodes/validator.rs +++ b/tests/src/nodes/validator.rs @@ -4,6 +4,9 @@ use std::time::Duration; use std::{net::SocketAddr, process::Child}; use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings}; +use nomos_blend::message_blend::{ + CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, +}; use nomos_core::block::Block; use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings; use nomos_da_indexer::IndexerSettings; @@ -14,9 +17,6 @@ use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSampling use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings; use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings}; use nomos_mempool::MempoolMetrics; -use nomos_mix::message_blend::{ - CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings, -}; use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig}; use nomos_node::api::paths::{ CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK, @@ -240,23 +240,23 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { initial_peers: config.network_config.initial_peers, }, }, - mix: nomos_mix_service::MixConfig { - backend: config.mix_config.backend, + blend: nomos_blend_service::BlendConfig { + backend: config.blend_config.backend, persistent_transmission: Default::default(), message_blend: MessageBlendSettings { cryptographic_processor: CryptographicProcessorSettings { - private_key: config.mix_config.private_key.to_bytes(), - num_mix_layers: 1, + private_key: config.blend_config.private_key.to_bytes(), + num_blend_layers: 1, }, temporal_processor: TemporalSchedulerSettings { max_delay_seconds: 2, }, }, - cover_traffic: nomos_mix_service::CoverTrafficExtSettings { + cover_traffic: nomos_blend_service::CoverTrafficExtSettings { epoch_duration: Duration::from_secs(432000), slot_duration: Duration::from_secs(20), }, - membership: config.mix_config.membership, + membership: config.blend_config.membership, }, cryptarchia: CryptarchiaSettings { leader_config: config.consensus_config.leader_config, @@ -269,10 +269,10 @@ pub fn create_validator_config(config: GeneralConfig) -> Config { cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings { topic: String::from(nomos_node::CONSENSUS_TOPIC), }, - mix_adapter_settings: - cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings { + blend_adapter_settings: + cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapterSettings { broadcast_settings: - nomos_mix_service::network::libp2p::Libp2pBroadcastSettings { + nomos_blend_service::network::libp2p::Libp2pBroadcastSettings { topic: String::from(nomos_node::CONSENSUS_TOPIC), }, }, diff --git a/tests/src/tests/da.rs b/tests/src/tests/da.rs index 43764856..3e7c103b 100644 --- a/tests/src/tests/da.rs +++ b/tests/src/tests/da.rs @@ -28,7 +28,7 @@ async fn disseminate(executor: &Executor, data: &[u8]) { client.publish_blob(data.to_vec(), metadata).await.unwrap(); } -#[ignore = "todo: reenable after mixnet is tested"] +#[ignore = "todo: reenable after blendnet is tested"] #[tokio::test] async fn disseminate_and_retrieve() { let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await; diff --git a/tests/src/topology/configs/mix.rs b/tests/src/topology/configs/blend.rs similarity index 66% rename from tests/src/topology/configs/mix.rs rename to tests/src/topology/configs/blend.rs index b8ad089d..e50417ae 100644 --- a/tests/src/topology/configs/mix.rs +++ b/tests/src/topology/configs/blend.rs @@ -1,29 +1,29 @@ use std::str::FromStr; +use nomos_blend::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node}; +use nomos_blend_message::{sphinx::SphinxMessage, BlendMessage}; +use nomos_blend_service::backends::libp2p::Libp2pBlendBackendSettings; use nomos_libp2p::{ed25519, Multiaddr}; -use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node}; -use nomos_mix_message::{sphinx::SphinxMessage, MixMessage}; -use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings; use crate::get_available_port; #[derive(Clone)] -pub struct GeneralMixConfig { - pub backend: Libp2pMixBackendSettings, +pub struct GeneralBlendConfig { + pub backend: Libp2pBlendBackendSettings, pub private_key: x25519_dalek::StaticSecret, - pub membership: Vec::PublicKey>>, + pub membership: Vec::PublicKey>>, } -pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { - let mut configs: Vec = ids +pub fn create_blend_configs(ids: &[[u8; 32]]) -> Vec { + let mut configs: Vec = ids .iter() .map(|id| { let mut node_key_bytes = *id; let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes) .expect("Failed to generate secret key from bytes"); - GeneralMixConfig { - backend: Libp2pMixBackendSettings { + GeneralBlendConfig { + backend: Libp2pBlendBackendSettings { listening_address: Multiaddr::from_str(&format!( "/ip4/127.0.0.1/udp/{}/quic-v1", get_available_port(), @@ -42,7 +42,7 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { }) .collect(); - let nodes = mix_nodes(&configs); + let nodes = blend_nodes(&configs); configs.iter_mut().for_each(|config| { config.membership = nodes.clone(); }); @@ -50,7 +50,9 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec { configs } -fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec::PublicKey>> { +fn blend_nodes( + configs: &[GeneralBlendConfig], +) -> Vec::PublicKey>> { configs .iter() .map(|config| Node { diff --git a/tests/src/topology/configs/mod.rs b/tests/src/topology/configs/mod.rs index b4b62d03..91d9c8b4 100644 --- a/tests/src/topology/configs/mod.rs +++ b/tests/src/topology/configs/mod.rs @@ -1,14 +1,14 @@ pub mod api; +pub mod blend; pub mod consensus; pub mod da; -pub mod mix; pub mod network; pub mod tracing; use api::GeneralApiConfig; +use blend::GeneralBlendConfig; use consensus::GeneralConsensusConfig; use da::GeneralDaConfig; -use mix::GeneralMixConfig; use network::GeneralNetworkConfig; use tracing::GeneralTracingConfig; @@ -18,6 +18,6 @@ pub struct GeneralConfig { pub consensus_config: GeneralConsensusConfig, pub da_config: GeneralDaConfig, pub network_config: GeneralNetworkConfig, - pub mix_config: GeneralMixConfig, + pub blend_config: GeneralBlendConfig, pub tracing_config: GeneralTracingConfig, } diff --git a/tests/src/topology/mod.rs b/tests/src/topology/mod.rs index b2d15529..d531c933 100644 --- a/tests/src/topology/mod.rs +++ b/tests/src/topology/mod.rs @@ -15,8 +15,8 @@ use crate::{ }, topology::configs::{ api::create_api_configs, + blend::create_blend_configs, consensus::{create_consensus_configs, ConsensusParams}, - mix::create_mix_configs, }, }; @@ -77,7 +77,7 @@ impl Topology { let consensus_configs = create_consensus_configs(&ids, config.consensus_params); let da_configs = create_da_configs(&ids, config.da_params); let network_configs = create_network_configs(&ids, config.network_params); - let mix_configs = create_mix_configs(&ids); + let blend_configs = create_blend_configs(&ids); let api_configs = create_api_configs(&ids); let tracing_configs = create_tracing_configs(&ids); @@ -87,7 +87,7 @@ impl Topology { consensus_config: consensus_configs[i].to_owned(), da_config: da_configs[i].to_owned(), network_config: network_configs[i].to_owned(), - mix_config: mix_configs[i].to_owned(), + blend_config: blend_configs[i].to_owned(), api_config: api_configs[i].to_owned(), tracing_config: tracing_configs[i].to_owned(), }); @@ -100,7 +100,7 @@ impl Topology { consensus_config: consensus_configs[i].to_owned(), da_config: da_configs[i].to_owned(), network_config: network_configs[i].to_owned(), - mix_config: mix_configs[i].to_owned(), + blend_config: blend_configs[i].to_owned(), api_config: api_configs[i].to_owned(), tracing_config: tracing_configs[i].to_owned(), });