Merge branch 'master' into chore-cluster-tests

This commit is contained in:
Roman Zajic 2024-12-12 18:42:53 +08:00 committed by GitHub
commit 8a9ee5e37f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
68 changed files with 605 additions and 462 deletions

View File

@ -23,10 +23,10 @@ members = [
"nomos-services/data-availability/verifier",
"nomos-services/data-availability/dispersal",
"nomos-services/data-availability/tests",
"nomos-services/mix",
"nomos-mix/core",
"nomos-mix/message",
"nomos-mix/network",
"nomos-services/blend",
"nomos-blend/core",
"nomos-blend/message",
"nomos-blend/network",
"nomos-tracing",
"nomos-cli",
"nomos-utils",

View File

@ -23,7 +23,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = [
"libp2p",
] }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] }
nomos-blend-service = { path = "../../nomos-services/blend", features = ["libp2p"] }
nomos-node = { path = "../nomos-node" }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }

View File

@ -1,14 +1,16 @@
// std
// crates
use color_eyre::eyre::Result;
use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend;
use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter;
use nomos_blend_service::BlendService;
use nomos_da_network_service::backends::libp2p::executor::DaNetworkExecutorBackend;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_mix_service::MixService;
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_node::{
config::{update_cryptarchia_consensus, update_mix, update_network, update_tracing, MixArgs},
config::{
update_blend, update_cryptarchia_consensus, update_network, update_tracing, BlendArgs,
},
CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, NetworkService, Tracing, Wire,
};
use nomos_storage::backends::rocksdb::RocksBackend;
@ -22,7 +24,7 @@ use crate::ExecutorApiService;
pub struct Config {
pub tracing: <Tracing as ServiceData>::Settings,
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
pub mix: <MixService<MixBackend, MixNetworkAdapter> as ServiceData>::Settings,
pub blend: <BlendService<BlendBackend, BlendNetworkAdapter> as ServiceData>::Settings,
pub da_dispersal: <crate::DaDispersal as ServiceData>::Settings,
pub da_network:
<DaNetworkService<DaNetworkExecutorBackend<FillFromNodeList>> as ServiceData>::Settings,
@ -39,13 +41,13 @@ impl Config {
mut self,
log_args: LogArgs,
network_args: NetworkArgs,
mix_args: MixArgs,
blend_args: BlendArgs,
http_args: HttpArgs,
cryptarchia_args: CryptarchiaArgs,
) -> Result<Self> {
update_tracing(&mut self.tracing, log_args)?;
update_network(&mut self.network, network_args)?;
update_mix(&mut self.mix, mix_args)?;
update_blend(&mut self.blend, blend_args)?;
update_http(&mut self.http, http_args)?;
update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?;
Ok(self)

View File

@ -8,6 +8,9 @@ use rand_chacha::ChaCha20Rng;
use api::backend::AxumBackend;
use kzgrs_backend::common::blob::DaBlob;
use nomos_api::ApiService;
use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend;
use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter;
use nomos_blend_service::BlendService;
use nomos_da_dispersal::adapters::mempool::kzgrs::KzgrsMempoolAdapter;
use nomos_da_dispersal::adapters::network::libp2p::Libp2pNetworkAdapter as DispersalNetworkAdapter;
use nomos_da_dispersal::backend::kzgrs::DispersalKZGRSBackend;
@ -18,9 +21,6 @@ use nomos_da_sampling::storage::adapters::rocksdb::RocksAdapter as SamplingStora
use nomos_da_verifier::backend::kzgrs::KzgrsDaVerifier;
use nomos_da_verifier::network::adapters::executor::Libp2pAdapter as VerifierNetworkAdapter;
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_mix_service::MixService;
use nomos_node::DispersedBlobInfo;
use nomos_node::HeaderId;
use nomos_node::MempoolNetworkAdapter;
@ -88,7 +88,7 @@ pub struct NomosExecutor {
#[cfg(feature = "tracing")]
tracing: ServiceHandle<Tracing>,
network: ServiceHandle<NetworkService<NetworkBackend>>,
mix: ServiceHandle<MixService<MixBackend, MixNetworkAdapter>>,
blend: ServiceHandle<BlendService<BlendBackend, BlendNetworkAdapter>>,
da_dispersal: ServiceHandle<DaDispersal>,
da_indexer: ServiceHandle<ExecutorDaIndexer>,
da_verifier: ServiceHandle<ExecutorDaVerifier>,

View File

@ -5,7 +5,7 @@ use color_eyre::eyre::{eyre, Result};
use nomos_executor::config::Config as ExecutorConfig;
use nomos_executor::{NomosExecutor, NomosExecutorServiceSettings};
use nomos_node::{
config::MixArgs, BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs,
config::BlendArgs, BlobInfo, CryptarchiaArgs, DaMempoolSettings, DispersedBlobInfo, HttpArgs,
LogArgs, MempoolAdapterSettings, NetworkArgs, Transaction, Tx, TxMempoolSettings, CL_TOPIC,
DA_TOPIC,
};
@ -25,9 +25,9 @@ struct Args {
/// Overrides network config.
#[clap(flatten)]
network_args: NetworkArgs,
/// Overrides mix config.
/// Overrides blend config.
#[clap(flatten)]
mix_args: MixArgs,
blend_args: BlendArgs,
/// Overrides http config.
#[clap(flatten)]
http_args: HttpArgs,
@ -41,14 +41,14 @@ fn main() -> Result<()> {
log_args,
http_args,
network_args,
mix_args,
blend_args,
cryptarchia_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, ExecutorConfig>(std::fs::File::open(config)?)?
.update_from_args(
log_args,
network_args,
mix_args,
blend_args,
http_args,
cryptarchia_args,
)?;
@ -63,7 +63,7 @@ fn main() -> Result<()> {
let app = OverwatchRunner::<NomosExecutor>::run(
NomosExecutorServiceSettings {
network: config.network,
mix: config.mix,
blend: config.blend,
#[cfg(feature = "tracing")]
tracing: config.tracing,
http: config.http,

View File

@ -28,7 +28,7 @@ nomos-da-verifier = { path = "../../nomos-services/data-availability/verifier",
nomos-da-indexer = { path = "../../nomos-services/data-availability/indexer", features = ["rocksdb-backend"] }
nomos-da-network-service = { path = "../../nomos-services/data-availability/network" }
nomos-network = { path = "../../nomos-services/network", features = ["libp2p"] }
nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] }
nomos-blend-service = { path = "../../nomos-services/blend", features = ["libp2p"] }
nomos-api = { path = "../../nomos-services/api" }
nomos-tracing = { path = "../../nomos-tracing" }
nomos-tracing-service = { path = "../../nomos-services/tracing" }

View File

@ -48,7 +48,7 @@ network:
node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3
initial_peers: []
mix:
blend:
backend:
listening_address: /ip4/0.0.0.0/udp/3001/quic-v1
node_key: 40fb62acf1604000c1b8d3bd0880e43eb2f6ae52029fde75d992ba0fed6e01c3
@ -59,7 +59,7 @@ mix:
message_blend:
cryptographic_processor:
private_key: [0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]
num_mix_layers: 1
num_blend_layers: 1
temporal_processor:
max_delay_seconds: 5
cover_traffic:

View File

@ -13,13 +13,13 @@ use serde::{Deserialize, Serialize};
use tracing::Level;
// internal
use crate::{NomosApiService, NomosDaMembership, Wire};
use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend;
use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter;
use nomos_blend_service::BlendService;
use nomos_core::{proofs::covenant::CovenantProof, staking::NMO_UNIT};
use nomos_da_network_service::backends::libp2p::validator::DaNetworkValidatorBackend;
use nomos_da_network_service::NetworkService as DaNetworkService;
use nomos_libp2p::{ed25519::SecretKey, Multiaddr};
use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_mix_service::MixService;
use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
use nomos_network::NetworkService;
use nomos_storage::backends::rocksdb::RocksBackend;
@ -75,16 +75,16 @@ pub struct NetworkArgs {
}
#[derive(Parser, Debug, Clone)]
pub struct MixArgs {
#[clap(long = "mix-addr", env = "MIX_ADDR")]
mix_addr: Option<Multiaddr>,
pub struct BlendArgs {
#[clap(long = "blend-addr", env = "BLEND_ADDR")]
blend_addr: Option<Multiaddr>,
// TODO: Use either the raw bytes or the key type directly to delegate error handling to clap
#[clap(long = "mix-node-key", env = "MIX_NODE_KEY")]
mix_node_key: Option<String>,
#[clap(long = "blend-node-key", env = "BLEND_NODE_KEY")]
blend_node_key: Option<String>,
#[clap(long = "mix-num-mix-layers", env = "MIX_NUM_MIX_LAYERS")]
mix_num_mix_layers: Option<usize>,
#[clap(long = "blend-num-blend-layers", env = "BLEND_NUM_BLEND_LAYERS")]
blend_num_blend_layers: Option<usize>,
}
#[derive(Parser, Debug, Clone)]
@ -130,7 +130,7 @@ pub struct CryptarchiaArgs {
pub struct Config {
pub tracing: <Tracing as ServiceData>::Settings,
pub network: <NetworkService<NetworkBackend> as ServiceData>::Settings,
pub mix: <MixService<MixBackend, MixNetworkAdapter> as ServiceData>::Settings,
pub blend: <BlendService<BlendBackend, BlendNetworkAdapter> as ServiceData>::Settings,
pub da_network:
<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>> as ServiceData>::Settings,
pub da_indexer: <crate::NodeDaIndexer as ServiceData>::Settings,
@ -148,13 +148,13 @@ impl Config {
mut self,
log_args: LogArgs,
network_args: NetworkArgs,
mix_args: MixArgs,
blend_args: BlendArgs,
http_args: HttpArgs,
cryptarchia_args: CryptarchiaArgs,
) -> Result<Self> {
update_tracing(&mut self.tracing, log_args)?;
update_network(&mut self.network, network_args)?;
update_mix(&mut self.mix, mix_args)?;
update_blend(&mut self.blend, blend_args)?;
update_http(&mut self.http, http_args)?;
update_cryptarchia_consensus(&mut self.cryptarchia, cryptarchia_args)?;
Ok(self)
@ -237,27 +237,27 @@ pub fn update_network(
Ok(())
}
pub fn update_mix(
mix: &mut <MixService<MixBackend, MixNetworkAdapter> as ServiceData>::Settings,
mix_args: MixArgs,
pub fn update_blend(
blend: &mut <BlendService<BlendBackend, BlendNetworkAdapter> as ServiceData>::Settings,
blend_args: BlendArgs,
) -> Result<()> {
let MixArgs {
mix_addr,
mix_node_key,
mix_num_mix_layers,
} = mix_args;
let BlendArgs {
blend_addr,
blend_node_key,
blend_num_blend_layers,
} = blend_args;
if let Some(addr) = mix_addr {
mix.backend.listening_address = addr;
if let Some(addr) = blend_addr {
blend.backend.listening_address = addr;
}
if let Some(node_key) = mix_node_key {
if let Some(node_key) = blend_node_key {
let mut key_bytes = hex::decode(node_key)?;
mix.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
blend.backend.node_key = SecretKey::try_from_bytes(key_bytes.as_mut_slice())?;
}
if let Some(num_mix_layers) = mix_num_mix_layers {
mix.message_blend.cryptographic_processor.num_mix_layers = num_mix_layers;
if let Some(num_blend_layers) = blend_num_blend_layers {
blend.message_blend.cryptographic_processor.num_blend_layers = num_blend_layers;
}
Ok(())

View File

@ -11,6 +11,9 @@ pub use config::{Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs};
use kzgrs_backend::common::blob::DaBlob;
pub use kzgrs_backend::dispersal::BlobInfo;
use nomos_api::ApiService;
pub use nomos_blend_service::backends::libp2p::Libp2pBlendBackend as BlendBackend;
pub use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter;
pub use nomos_blend_service::BlendService;
pub use nomos_core::da::blob::info::DispersedBlobInfo;
pub use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
@ -35,9 +38,6 @@ pub use nomos_mempool::network::adapters::libp2p::{
};
pub use nomos_mempool::TxMempoolSettings;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
pub use nomos_mix_service::backends::libp2p::Libp2pMixBackend as MixBackend;
pub use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
pub use nomos_mix_service::MixService;
pub use nomos_network::backends::libp2p::Libp2p as NetworkBackend;
pub use nomos_network::NetworkService;
pub use nomos_storage::{
@ -86,7 +86,11 @@ pub const MB16: usize = 1024 * 1024 * 16;
pub type Cryptarchia<SamplingAdapter> = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter<MixNetworkAdapter, Tx, BlobInfo>,
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter<
BlendNetworkAdapter,
Tx,
BlobInfo,
>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
@ -124,7 +128,11 @@ pub type DaIndexer<SamplingAdapter> = DataIndexerService<
CryptarchiaConsensusAdapter<Tx, BlobInfo>,
// Cryptarchia specific, should be the same as in `Cryptarchia` type above.
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter<MixNetworkAdapter, Tx, BlobInfo>,
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter<
BlendNetworkAdapter,
Tx,
BlobInfo,
>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, BlobInfo, <BlobInfo as DispersedBlobInfo>::BlobId>,
@ -160,7 +168,7 @@ pub struct Nomos {
#[cfg(feature = "tracing")]
tracing: ServiceHandle<Tracing>,
network: ServiceHandle<NetworkService<NetworkBackend>>,
mix: ServiceHandle<MixService<MixBackend, MixNetworkAdapter>>,
blend: ServiceHandle<BlendService<BlendBackend, BlendNetworkAdapter>>,
da_indexer: ServiceHandle<NodeDaIndexer>,
da_verifier: ServiceHandle<NodeDaVerifier>,
da_sampling: ServiceHandle<NodeDaSampling>,

View File

@ -1,6 +1,6 @@
use kzgrs_backend::dispersal::BlobInfo;
use nomos_node::{
config::MixArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos,
config::BlendArgs, Config, CryptarchiaArgs, HttpArgs, LogArgs, NetworkArgs, Nomos,
NomosServiceSettings, Tx,
};
@ -25,9 +25,9 @@ struct Args {
/// Overrides network config.
#[clap(flatten)]
network_args: NetworkArgs,
/// Overrides mix config.
/// Overrides blend config.
#[clap(flatten)]
mix_args: MixArgs,
blend_args: BlendArgs,
/// Overrides http config.
#[clap(flatten)]
http_args: HttpArgs,
@ -41,14 +41,14 @@ fn main() -> Result<()> {
log_args,
http_args,
network_args,
mix_args,
blend_args,
cryptarchia_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
.update_from_args(
log_args,
network_args,
mix_args,
blend_args,
http_args,
cryptarchia_args,
)?;
@ -63,7 +63,7 @@ fn main() -> Result<()> {
let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings {
network: config.network,
mix: config.mix,
blend: config.blend,
#[cfg(feature = "tracing")]
tracing: config.tracing,
http: config.http,

View File

@ -1,5 +1,5 @@
[package]
name = "nomos-mix"
name = "nomos-blend"
version = "0.1.0"
edition = "2021"
@ -11,7 +11,7 @@ tokio-stream = "0.1"
tracing = "0.1"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
nomos-mix-message = { path = "../message" }
nomos-blend-message = { path = "../message" }
futures = "0.3"
multiaddr = "0.18"
x25519-dalek = { version = "2", features = ["getrandom", "static_secrets"] }

View File

@ -6,7 +6,7 @@ use std::{
use fixed::types::U57F7;
use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage;
use nomos_blend_message::BlendMessage;
use rand::RngCore;
use serde::{Deserialize, Serialize};
@ -24,7 +24,7 @@ pub struct ConnectionMaintenanceSettings {
/// based on the number of messages sent by each peer in time windows
pub struct ConnectionMaintenance<M, R>
where
M: MixMessage,
M: BlendMessage,
R: RngCore,
{
settings: ConnectionMaintenanceSettings,
@ -39,7 +39,7 @@ where
impl<M, R> ConnectionMaintenance<M, R>
where
M: MixMessage,
M: BlendMessage,
M::PublicKey: PartialEq,
R: RngCore,
{
@ -260,7 +260,7 @@ impl ConnectionMonitor {
#[cfg(test)]
mod tests {
use nomos_mix_message::mock::MockMixMessage;
use nomos_blend_message::mock::MockBlendMessage;
use rand::{rngs::ThreadRng, thread_rng};
use crate::membership::Node;
@ -400,9 +400,9 @@ mod tests {
fn init_maintenance(
settings: ConnectionMaintenanceSettings,
node_count: usize,
) -> ConnectionMaintenance<MockMixMessage, ThreadRng> {
) -> ConnectionMaintenance<MockBlendMessage, ThreadRng> {
let nodes = nodes(node_count);
let mut maintenance = ConnectionMaintenance::<MockMixMessage, ThreadRng>::new(
let mut maintenance = ConnectionMaintenance::<MockBlendMessage, ThreadRng>::new(
settings,
Membership::new(nodes.clone(), nodes[0].public_key),
thread_rng(),
@ -414,7 +414,7 @@ mod tests {
maintenance
}
fn nodes(count: usize) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
fn nodes(count: usize) -> Vec<Node<<MockBlendMessage as BlendMessage>::PublicKey>> {
(0..count)
.map(|i| Node {
address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", 1000 + i)

View File

@ -1,7 +1,7 @@
use blake2::digest::consts::U4;
use blake2::Digest;
use futures::{Stream, StreamExt};
use nomos_mix_message::MixMessage;
use nomos_blend_message::BlendMessage;
use serde::Deserialize;
use std::collections::HashSet;
use std::hash::Hash;
@ -53,7 +53,7 @@ impl<EpochStream, SlotStream, Message> Stream for CoverTraffic<EpochStream, Slot
where
EpochStream: Stream<Item = usize> + Send + Sync + Unpin,
SlotStream: Stream<Item = usize> + Send + Sync + Unpin,
Message: MixMessage + Send + Sync + Unpin,
Message: BlendMessage + Send + Sync + Unpin,
{
type Item = Vec<u8>;

View File

@ -0,0 +1,19 @@
pub mod conn_maintenance;
pub mod cover_traffic;
pub mod membership;
pub mod message_blend;
pub mod persistent_transmission;
pub enum BlendOutgoingMessage {
FullyUnwrapped(Vec<u8>),
Outbound(Vec<u8>),
}
impl From<BlendOutgoingMessage> for Vec<u8> {
fn from(value: BlendOutgoingMessage) -> Self {
match value {
BlendOutgoingMessage::FullyUnwrapped(v) => v,
BlendOutgoingMessage::Outbound(v) => v,
}
}
}

View File

@ -1,7 +1,7 @@
use std::collections::HashSet;
use multiaddr::Multiaddr;
use nomos_mix_message::MixMessage;
use nomos_blend_message::BlendMessage;
use rand::{
seq::{IteratorRandom, SliceRandom},
Rng,
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug)]
pub struct Membership<M>
where
M: MixMessage,
M: BlendMessage,
{
remote_nodes: Vec<Node<M::PublicKey>>,
local_node: Node<M::PublicKey>,
@ -25,7 +25,7 @@ pub struct Node<K> {
impl<M> Membership<M>
where
M: MixMessage,
M: BlendMessage,
M::PublicKey: PartialEq,
{
pub fn new(nodes: Vec<Node<M::PublicKey>>, local_public_key: M::PublicKey) -> Self {

View File

@ -1,5 +1,5 @@
use crate::membership::Membership;
use nomos_mix_message::MixMessage;
use nomos_blend_message::BlendMessage;
use rand::RngCore;
use serde::{Deserialize, Serialize};
@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
/// for the message indistinguishability.
pub struct CryptographicProcessor<R, M>
where
M: MixMessage,
M: BlendMessage,
{
settings: CryptographicProcessorSettings<M::PrivateKey>,
membership: Membership<M>,
@ -17,13 +17,13 @@ where
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CryptographicProcessorSettings<K> {
pub private_key: K,
pub num_mix_layers: usize,
pub num_blend_layers: usize,
}
impl<R, M> CryptographicProcessor<R, M>
where
R: RngCore,
M: MixMessage,
M: BlendMessage,
M::PublicKey: Clone + PartialEq,
{
pub fn new(
@ -42,7 +42,7 @@ where
// TODO: Use the actual Sphinx encoding instead of mock.
let public_keys = self
.membership
.choose_remote_nodes(&mut self.rng, self.settings.num_mix_layers)
.choose_remote_nodes(&mut self.rng, self.settings.num_blend_layers)
.iter()
.map(|node| node.public_key.clone())
.collect::<Vec<_>>();

View File

@ -13,8 +13,8 @@ pub use temporal::TemporalSchedulerSettings;
use crate::membership::Membership;
use crate::message_blend::crypto::CryptographicProcessor;
use crate::message_blend::temporal::TemporalProcessorExt;
use crate::MixOutgoingMessage;
use nomos_mix_message::MixMessage;
use crate::BlendOutgoingMessage;
use nomos_blend_message::BlendMessage;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
@ -24,23 +24,23 @@ use tokio_stream::wrappers::UnboundedReceiverStream;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MessageBlendSettings<M>
where
M: MixMessage,
M: BlendMessage,
M::PrivateKey: Serialize + DeserializeOwned,
{
pub cryptographic_processor: CryptographicProcessorSettings<M::PrivateKey>,
pub temporal_processor: TemporalSchedulerSettings,
}
/// [`MessageBlendStream`] handles the entire mixing tiers process
/// [`MessageBlendStream`] handles the entire blending tiers process
/// - Unwraps incoming messages received from network using [`CryptographicProcessor`]
/// - Pushes unwrapped messages to [`TemporalProcessor`]
pub struct MessageBlendStream<S, Rng, M, Scheduler>
where
M: MixMessage,
M: BlendMessage,
{
input_stream: S,
output_stream: Pin<Box<dyn Stream<Item = MixOutgoingMessage> + Send + Sync + 'static>>,
temporal_sender: UnboundedSender<MixOutgoingMessage>,
output_stream: Pin<Box<dyn Stream<Item = BlendOutgoingMessage> + Send + Sync + 'static>>,
temporal_sender: UnboundedSender<BlendOutgoingMessage>,
cryptographic_processor: CryptographicProcessor<Rng, M>,
_rng: PhantomData<Rng>,
_scheduler: PhantomData<Scheduler>,
@ -50,7 +50,7 @@ impl<S, Rng, M, Scheduler> MessageBlendStream<S, Rng, M, Scheduler>
where
S: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static,
M: MixMessage,
M: BlendMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
M::Error: Debug,
@ -85,9 +85,9 @@ where
match self.cryptographic_processor.unwrap_message(&message) {
Ok((unwrapped_message, fully_unwrapped)) => {
let message = if fully_unwrapped {
MixOutgoingMessage::FullyUnwrapped(unwrapped_message)
BlendOutgoingMessage::FullyUnwrapped(unwrapped_message)
} else {
MixOutgoingMessage::Outbound(unwrapped_message)
BlendOutgoingMessage::Outbound(unwrapped_message)
};
if let Err(e) = self.temporal_sender.send(message) {
tracing::error!("Failed to send message to the outbound channel: {e:?}");
@ -104,13 +104,13 @@ impl<S, Rng, M, Scheduler> Stream for MessageBlendStream<S, Rng, M, Scheduler>
where
S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin + Send + 'static,
M: MixMessage + Unpin,
M: BlendMessage + Unpin,
M::PrivateKey: Serialize + DeserializeOwned + Unpin,
M::PublicKey: Clone + PartialEq + Unpin,
M::Error: Debug,
Scheduler: Stream<Item = ()> + Unpin + Send + Sync + 'static,
{
type Item = MixOutgoingMessage;
type Item = BlendOutgoingMessage;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(message)) = self.input_stream.poll_next_unpin(cx) {
@ -123,7 +123,7 @@ where
pub trait MessageBlendExt<Rng, M, Scheduler>: Stream<Item = Vec<u8>>
where
Rng: RngCore + Send + Unpin + 'static,
M: MixMessage,
M: BlendMessage,
M::PrivateKey: Serialize + DeserializeOwned,
M::PublicKey: Clone + PartialEq,
M::Error: Debug,
@ -153,7 +153,7 @@ impl<T, Rng, M, S> MessageBlendExt<Rng, M, S> for T
where
T: Stream<Item = Vec<u8>>,
Rng: RngCore + Unpin + Send + 'static,
M: MixMessage,
M: BlendMessage,
M::PrivateKey: Clone + Serialize + DeserializeOwned + PartialEq,
M::PublicKey: Clone + Serialize + DeserializeOwned + PartialEq,
M::Error: Debug,

View File

@ -1,5 +1,5 @@
use futures::{Stream, StreamExt};
use nomos_mix_message::MixMessage;
use nomos_blend_message::BlendMessage;
use rand::{distributions::Uniform, prelude::Distribution, Rng, RngCore};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
@ -33,14 +33,14 @@ where
coin: Coin<Rng>,
stream: S,
scheduler: Scheduler,
_mix_message: PhantomData<M>,
_blend_message: PhantomData<M>,
}
impl<S, Rng, M, Scheduler> PersistentTransmissionStream<S, Rng, M, Scheduler>
where
S: Stream,
Rng: RngCore,
M: MixMessage,
M: BlendMessage,
Scheduler: Stream<Item = ()>,
{
pub fn new(
@ -54,7 +54,7 @@ where
coin,
stream,
scheduler,
_mix_message: Default::default(),
_blend_message: Default::default(),
}
}
}
@ -63,7 +63,7 @@ impl<S, Rng, M, Scheduler> Stream for PersistentTransmissionStream<S, Rng, M, Sc
where
S: Stream<Item = Vec<u8>> + Unpin,
Rng: RngCore + Unpin,
M: MixMessage + Unpin,
M: BlendMessage + Unpin,
Scheduler: Stream<Item = ()> + Unpin,
{
type Item = Vec<u8>;
@ -91,7 +91,7 @@ where
pub trait PersistentTransmissionExt<Rng, M, Scheduler>: Stream
where
Rng: RngCore,
M: MixMessage,
M: BlendMessage,
Scheduler: Stream<Item = ()>,
{
fn persistent_transmission(
@ -111,7 +111,7 @@ impl<S, Rng, M, Scheduler> PersistentTransmissionExt<Rng, M, Scheduler> for S
where
S: Stream,
Rng: RngCore,
M: MixMessage,
M: BlendMessage,
M::PublicKey: Clone + Serialize + DeserializeOwned,
Scheduler: Stream<Item = ()>,
{
@ -150,7 +150,7 @@ enum CoinError {
mod tests {
use super::*;
use futures::StreamExt;
use nomos_mix_message::mock::MockMixMessage;
use nomos_blend_message::mock::MockBlendMessage;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use std::time::Duration;
@ -199,7 +199,7 @@ mod tests {
let mut persistent_transmission_stream: PersistentTransmissionStream<
_,
_,
MockMixMessage,
MockBlendMessage,
_,
> = stream.persistent_transmission(
settings,
@ -230,12 +230,12 @@ mod tests {
);
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert!(MockMixMessage::is_drop_message(
assert!(MockBlendMessage::is_drop_message(
&persistent_transmission_stream.next().await.unwrap()
));
assert_interval!(&mut last_time, lower_bound, upper_bound);
assert!(MockMixMessage::is_drop_message(
assert!(MockBlendMessage::is_drop_message(
&persistent_transmission_stream.next().await.unwrap()
));
assert_interval!(&mut last_time, lower_bound, upper_bound);

View File

@ -1,5 +1,5 @@
[package]
name = "nomos-mix-message"
name = "nomos-blend-message"
version = "0.1.0"
edition = "2021"

View File

@ -1,7 +1,7 @@
pub mod mock;
pub mod sphinx;
pub trait MixMessage {
pub trait BlendMessage {
type PublicKey;
type PrivateKey;
type Error;
@ -17,7 +17,7 @@ pub trait MixMessage {
/// (False if the message still has layers to be unwrapped, true otherwise)
///
/// If the input message was already fully unwrapped, or if its format is invalid,
/// this function returns `[Error::InvalidMixMessage]`.
/// this function returns `[Error::InvalidBlendMessage]`.
fn unwrap_message(
message: &[u8],
private_key: &Self::PrivateKey,

View File

@ -1,7 +1,7 @@
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("Invalid mix message format")]
InvalidMixMessage,
#[error("Invalid blend message format")]
InvalidBlendMessage,
#[error("Payload is too large")]
PayloadTooLarge,
#[error("Invalid number of layers")]

View File

@ -2,7 +2,7 @@ pub mod error;
use error::Error;
use crate::MixMessage;
use crate::BlendMessage;
const NODE_ID_SIZE: usize = 32;
// TODO: Move MAX_PAYLOAD_SIZE and MAX_LAYERS to the upper layer (service layer).
@ -15,9 +15,9 @@ pub const MESSAGE_SIZE: usize =
/// A mock implementation of the Sphinx encoding.
#[derive(Clone, Debug)]
pub struct MockMixMessage;
pub struct MockBlendMessage;
impl MixMessage for MockMixMessage {
impl BlendMessage for MockBlendMessage {
type PublicKey = [u8; NODE_ID_SIZE];
type PrivateKey = [u8; NODE_ID_SIZE];
type Error = Error;
@ -60,7 +60,7 @@ impl MixMessage for MockMixMessage {
private_key: &Self::PrivateKey,
) -> Result<(Vec<u8>, bool), Self::Error> {
if message.len() != MESSAGE_SIZE {
return Err(Error::InvalidMixMessage);
return Err(Error::InvalidBlendMessage);
}
// In this mock, we don't decrypt anything. So, we use private key as just a node ID.
@ -80,7 +80,7 @@ impl MixMessage for MockMixMessage {
Some(pos) => {
return Ok((padded_payload[0..pos].to_vec(), true));
}
_ => return Err(Error::InvalidMixMessage),
_ => return Err(Error::InvalidBlendMessage),
}
}
@ -100,21 +100,21 @@ mod tests {
fn message() {
let node_ids = (0..3).map(|i| [i; NODE_ID_SIZE]).collect::<Vec<_>>();
let payload = [7; 10];
let message = MockMixMessage::build_message(&payload, &node_ids).unwrap();
let message = MockBlendMessage::build_message(&payload, &node_ids).unwrap();
assert_eq!(message.len(), MESSAGE_SIZE);
let (message, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &node_ids[0]).unwrap();
MockBlendMessage::unwrap_message(&message, &node_ids[0]).unwrap();
assert!(!is_fully_unwrapped);
assert_eq!(message.len(), MESSAGE_SIZE);
let (message, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &node_ids[1]).unwrap();
MockBlendMessage::unwrap_message(&message, &node_ids[1]).unwrap();
assert!(!is_fully_unwrapped);
assert_eq!(message.len(), MESSAGE_SIZE);
let (unwrapped_payload, is_fully_unwrapped) =
MockMixMessage::unwrap_message(&message, &node_ids[2]).unwrap();
MockBlendMessage::unwrap_message(&message, &node_ids[2]).unwrap();
assert!(is_fully_unwrapped);
assert_eq!(unwrapped_payload, payload);
}

View File

@ -1,7 +1,7 @@
use error::Error;
use packet::{Packet, UnpackedPacket};
use crate::MixMessage;
use crate::BlendMessage;
pub mod error;
mod layered_cipher;
@ -16,7 +16,7 @@ const ASYM_KEY_SIZE: usize = 32;
const MAX_PAYLOAD_SIZE: usize = 2048;
const MAX_LAYERS: usize = 5;
impl MixMessage for SphinxMessage {
impl BlendMessage for SphinxMessage {
type PublicKey = [u8; ASYM_KEY_SIZE];
type PrivateKey = [u8; ASYM_KEY_SIZE];
type Error = Error;

View File

@ -1,5 +1,5 @@
[package]
name = "nomos-mix-network"
name = "nomos-blend-network"
version = "0.1.0"
edition = "2021"
@ -9,10 +9,11 @@ futures = "0.3.30"
futures-timer = "3.0.3"
libp2p = "0.53"
tracing = "0.1"
nomos-mix = { path = "../core" }
nomos-mix-message = { path = "../message" }
nomos-blend = { path = "../core" }
nomos-blend-message = { path = "../message" }
sha2 = "0.10"
rand = "0.8"
opentelemetry = "0.27.1"
[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread", "time"] }

View File

@ -1,6 +1,6 @@
use crate::{
error::Error,
handler::{FromBehaviour, MixConnectionHandler, ToBehaviour},
handler::{BlendConnectionHandler, FromBehaviour, ToBehaviour},
};
use cached::{Cached, TimedCache};
use futures::Stream;
@ -13,11 +13,11 @@ use libp2p::{
},
Multiaddr, PeerId,
};
use nomos_mix::{
use nomos_blend::{
conn_maintenance::{ConnectionMaintenance, ConnectionMaintenanceSettings},
membership::Membership,
};
use nomos_mix_message::MixMessage;
use nomos_blend_message::BlendMessage;
use rand::RngCore;
use sha2::{Digest, Sha256};
use std::marker::PhantomData;
@ -32,7 +32,7 @@ use std::{
/// - receives messages from all connected peers.
pub struct Behaviour<M, R, Interval>
where
M: MixMessage,
M: BlendMessage,
R: RngCore,
{
config: Config<Interval>,
@ -46,7 +46,7 @@ where
/// An LRU time cache for storing seen messages (based on their ID). This cache prevents
/// duplicates from being propagated on the network.
duplicate_cache: TimedCache<Vec<u8>, ()>,
_mix_message: PhantomData<M>,
_blend_message: PhantomData<M>,
}
#[derive(Debug)]
@ -65,7 +65,7 @@ pub enum Event {
impl<M, R, Interval> Behaviour<M, R, Interval>
where
M: MixMessage,
M: BlendMessage,
M::PublicKey: PartialEq,
R: RngCore,
{
@ -90,7 +90,7 @@ where
events,
waker: None,
duplicate_cache,
_mix_message: Default::default(),
_blend_message: Default::default(),
}
}
@ -117,7 +117,7 @@ where
/// Forwards a message to all connected peers except the excluded peer.
///
/// Returns [`Error::NoPeers`] if there are no connected peers that support the mix protocol.
/// Returns [`Error::NoPeers`] if there are no connected peers that support the blend protocol.
fn forward_message(
&mut self,
message: Vec<u8>,
@ -185,12 +185,12 @@ where
impl<M, R, Interval> NetworkBehaviour for Behaviour<M, R, Interval>
where
M: MixMessage + 'static,
M: BlendMessage + 'static,
M::PublicKey: PartialEq + 'static,
R: RngCore + 'static,
Interval: Stream + Unpin + 'static,
{
type ConnectionHandler = MixConnectionHandler;
type ConnectionHandler = BlendConnectionHandler;
type ToSwarm = Event;
fn handle_established_inbound_connection(
@ -202,7 +202,7 @@ where
) -> Result<THandler<Self>, ConnectionDenied> {
// Keep PeerId <> Multiaddr mapping
self.peer_address_map.add(peer_id, remote_addr.clone());
Ok(MixConnectionHandler::new())
Ok(BlendConnectionHandler::new())
}
fn handle_established_outbound_connection(
@ -214,7 +214,7 @@ where
) -> Result<THandler<Self>, ConnectionDenied> {
// Keep PeerId <> Multiaddr mapping
self.peer_address_map.add(peer_id, addr.clone());
Ok(MixConnectionHandler::new())
Ok(BlendConnectionHandler::new())
}
/// Informs the behaviour about an event from the [`Swarm`].
@ -233,7 +233,7 @@ where
}
}
/// Handles an event generated by the [`MixConnectionHandler`]
/// Handles an event generated by the [`BlendConnectionHandler`]
/// dedicated to the connection identified by `peer_id` and `connection_id`.
fn on_connection_handler_event(
&mut self,
@ -277,7 +277,7 @@ where
.push_back(ToSwarm::GenerateEvent(Event::Message(message)));
}
// The connection was fully negotiated by the peer,
// which means that the peer supports the mix protocol.
// which means that the peer supports the blend protocol.
ToBehaviour::FullyNegotiatedOutbound => {
if let Some(addr) = self.peer_address_map.address(&peer_id) {
self.conn_maintenance.add_connected_peer(addr.clone());

View File

@ -14,12 +14,18 @@ use libp2p::{
Stream, StreamProtocol,
};
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/mix/0.1.0");
// Metrics
const VALUE_FULLY_NEGOTIATED_INBOUND: &str = "fully_negotiated_inbound";
const VALUE_FULLY_NEGOTIATED_OUTBOUND: &str = "fully_negotiated_outbound";
const VALUE_DIAL_UPGRADE_ERROR: &str = "dial_upgrade_error";
const VALUE_IGNORED: &str = "ignored";
const PROTOCOL_NAME: StreamProtocol = StreamProtocol::new("/nomos/blend/0.1.0");
// TODO: Consider replacing this struct with libp2p_stream ConnectionHandler
// because we don't implement persistent emission in the per-connection level anymore.
/// A [`ConnectionHandler`] that handles the mix protocol.
pub struct MixConnectionHandler {
/// A [`ConnectionHandler`] that handles the blend protocol.
pub struct BlendConnectionHandler {
inbound_substream: Option<MsgRecvFuture>,
outbound_substream: Option<OutboundSubstreamState>,
outbound_msgs: VecDeque<Vec<u8>>,
@ -39,7 +45,7 @@ enum OutboundSubstreamState {
PendingSend(MsgSendFuture),
}
impl MixConnectionHandler {
impl BlendConnectionHandler {
pub fn new() -> Self {
Self {
inbound_substream: None,
@ -57,7 +63,7 @@ impl MixConnectionHandler {
}
}
impl Default for MixConnectionHandler {
impl Default for BlendConnectionHandler {
fn default() -> Self {
Self::new()
}
@ -71,9 +77,9 @@ pub enum FromBehaviour {
#[derive(Debug)]
pub enum ToBehaviour {
/// An outbound substream has been successfully upgraded for the mix protocol.
/// An outbound substream has been successfully upgraded for the blend protocol.
FullyNegotiatedOutbound,
/// An outbound substream was failed to be upgraded for the mix protocol.
/// An outbound substream was failed to be upgraded for the blend protocol.
NegotiationFailed,
/// A message has been received from the connection.
Message(Vec<u8>),
@ -81,7 +87,7 @@ pub enum ToBehaviour {
IOError(io::Error),
}
impl ConnectionHandler for MixConnectionHandler {
impl ConnectionHandler for BlendConnectionHandler {
type FromBehaviour = FromBehaviour;
type ToBehaviour = ToBehaviour;
type InboundProtocol = ReadyUpgrade<StreamProtocol>;
@ -99,6 +105,11 @@ impl ConnectionHandler for MixConnectionHandler {
) -> Poll<
ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
> {
tracing::info!(gauge.pending_outbound_messages = self.outbound_msgs.len() as u64,);
tracing::info!(
gauge.pending_events_to_behaviour = self.pending_events_to_behaviour.len() as u64,
);
// Process pending events to be sent to the behaviour
if let Some(event) = self.pending_events_to_behaviour.pop_front() {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
@ -202,13 +213,14 @@ impl ConnectionHandler for MixConnectionHandler {
Self::OutboundOpenInfo,
>,
) {
match event {
let event_name = match event {
ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound {
protocol: stream,
..
}) => {
tracing::debug!("FullyNegotiatedInbound: Creating inbound substream");
self.inbound_substream = Some(recv_msg(stream).boxed())
self.inbound_substream = Some(recv_msg(stream).boxed());
VALUE_FULLY_NEGOTIATED_INBOUND
}
ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound {
protocol: stream,
@ -218,6 +230,7 @@ impl ConnectionHandler for MixConnectionHandler {
self.outbound_substream = Some(OutboundSubstreamState::Idle(stream));
self.pending_events_to_behaviour
.push_back(ToBehaviour::FullyNegotiatedOutbound);
VALUE_FULLY_NEGOTIATED_OUTBOUND
}
ConnectionEvent::DialUpgradeError(e) => {
tracing::error!("DialUpgradeError: {:?}", e);
@ -234,17 +247,20 @@ impl ConnectionHandler for MixConnectionHandler {
self.pending_events_to_behaviour
.push_back(ToBehaviour::IOError(io::Error::new(
io::ErrorKind::TimedOut,
"mix protocol negotiation timed out",
"blend protocol negotiation timed out",
)));
}
StreamUpgradeError::Apply(_) => unreachable!(),
}
};
VALUE_DIAL_UPGRADE_ERROR
}
event => {
tracing::debug!("Ignoring connection event: {:?}", event)
tracing::debug!("Ignoring connection event: {:?}", event);
VALUE_IGNORED
}
}
};
tracing::info!(counter.connection_event = 1, event = event_name);
self.try_wake();
}
}

View File

@ -14,11 +14,11 @@ mod test {
swarm::{dummy, NetworkBehaviour, SwarmEvent},
Multiaddr, Swarm, SwarmBuilder,
};
use nomos_mix::{
use nomos_blend::{
conn_maintenance::{ConnectionMaintenanceSettings, ConnectionMonitorSettings},
membership::{Membership, Node},
};
use nomos_mix_message::{mock::MockMixMessage, MixMessage};
use nomos_blend_message::{mock::MockBlendMessage, BlendMessage};
use rand::{rngs::ThreadRng, thread_rng};
use tokio::select;
use tokio_stream::wrappers::IntervalStream;
@ -28,7 +28,7 @@ mod test {
/// Check that a published messsage arrives in the peers successfully.
#[tokio::test]
async fn behaviour() {
// Initialize two swarms that support the mix protocol.
// Initialize two swarms that support the blend protocol.
let nodes = nodes(2, 8090);
let mut swarm1 = new_swarm(Membership::new(nodes.clone(), nodes[0].public_key));
let mut swarm2 = new_swarm(Membership::new(nodes.clone(), nodes[1].public_key));
@ -66,16 +66,16 @@ mod test {
.is_ok());
}
/// If the peer doesn't support the mix protocol, the message should not be forwarded to the peer.
/// If the peer doesn't support the blend protocol, the message should not be forwarded to the peer.
#[tokio::test]
async fn peer_not_support_mix_protocol() {
// Only swarm2 supports the mix protocol.
async fn peer_not_support_blend_protocol() {
// Only swarm2 supports the blend protocol.
let nodes = nodes(2, 8190);
let mut swarm1 = new_swarm_without_mix(nodes[0].address.clone());
let mut swarm1 = new_swarm_without_blend(nodes[0].address.clone());
let mut swarm2 = new_swarm(Membership::new(nodes.clone(), nodes[1].public_key));
// Expect all publish attempts to fail with [`Error::NoPeers`]
// because swarm2 doesn't have any peers that support the mix protocol.
// because swarm2 doesn't have any peers that support the blend protocol.
let msg = vec![1; 10];
let mut publish_try_interval = tokio::time::interval(Duration::from_secs(1));
let mut publish_try_count = 0;
@ -95,8 +95,8 @@ mod test {
}
fn new_swarm(
membership: Membership<MockMixMessage>,
) -> Swarm<Behaviour<MockMixMessage, ThreadRng, IntervalStream>> {
membership: Membership<MockBlendMessage>,
) -> Swarm<Behaviour<MockBlendMessage, ThreadRng, IntervalStream>> {
let conn_maintenance_settings = ConnectionMaintenanceSettings {
peering_degree: membership.size() - 1, // excluding the local node
max_peering_degree: membership.size() * 2,
@ -127,7 +127,7 @@ mod test {
swarm
}
fn new_swarm_without_mix(addr: Multiaddr) -> Swarm<dummy::Behaviour> {
fn new_swarm_without_blend(addr: Multiaddr) -> Swarm<dummy::Behaviour> {
let mut swarm = new_swarm_with_behaviour(dummy::Behaviour);
swarm.listen_on(addr).unwrap();
swarm
@ -151,7 +151,7 @@ mod test {
fn nodes(
count: usize,
base_port: usize,
) -> Vec<Node<<MockMixMessage as MixMessage>::PublicKey>> {
) -> Vec<Node<<MockBlendMessage as BlendMessage>::PublicKey>> {
(0..count)
.map(|i| Node {
address: format!("/ip4/127.0.0.1/udp/{}/quic-v1", base_port + i)

View File

@ -47,4 +47,12 @@ impl<
pub fn from_bytes(bytes: &[u8]) -> Self {
wire::deserialize(bytes).unwrap()
}
pub fn cl_transactions_len(&self) -> usize {
self.cl_transactions.len()
}
pub fn bl_blobs_len(&self) -> usize {
self.bl_blobs.len()
}
}

View File

@ -70,6 +70,10 @@ impl Header {
&self.orphaned_leader_proofs
}
pub fn content_size(&self) -> u32 {
self.content_size
}
pub fn new(
parent: HeaderId,
content_size: u32,

View File

@ -24,6 +24,17 @@ pub enum DispersalEvent {
/// Received a n
IncomingMessage { message: DispersalReq },
}
impl DispersalEvent {
pub fn blob_size(&self) -> Option<usize> {
match self {
DispersalEvent::IncomingMessage { message } => {
message.blob.as_ref().map(|blob| blob.data.len())
}
}
}
}
pub struct DispersalValidatorBehaviour<Membership> {
stream_behaviour: libp2p_stream::Behaviour,
incoming_streams: IncomingStreams,

View File

@ -12,7 +12,6 @@ use libp2p::swarm::{
};
use libp2p::{Multiaddr, PeerId};
use log::{error, trace};
use subnetworks_assignations::MembershipHandler;
use crate::SubnetworkId;
@ -31,6 +30,16 @@ pub enum ReplicationEvent {
IncomingMessage { peer_id: PeerId, message: DaMessage },
}
impl ReplicationEvent {
pub fn blob_size(&self) -> Option<usize> {
match self {
ReplicationEvent::IncomingMessage { message, .. } => {
message.blob.as_ref().map(|blob| blob.data.len())
}
}
}
}
/// Nomos DA broadcas network behaviour
/// This item handles the logic of the nomos da subnetworks broadcasting
/// DA subnetworks are a logical distribution of subsets.

View File

@ -29,6 +29,12 @@ use crate::swarm::validator::ValidatorEventsStream;
use crate::SubnetworkId;
use subnetworks_assignations::MembershipHandler;
// Metrics
const EVENT_SAMPLING: &str = "sampling";
const EVENT_DISPERSAL_EXECUTOR_DISPERSAL: &str = "dispersal_executor_event";
const EVENT_VALIDATOR_DISPERSAL: &str = "validator_dispersal";
const EVENT_REPLICATION: &str = "replication";
pub struct ExecutorEventsStream {
pub validator_events_stream: ValidatorEventsStream,
pub dispersal_events_receiver: UnboundedReceiverStream<DispersalExecutorEvent>,
@ -58,6 +64,7 @@ where
let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver);
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
let dispersal_events_receiver = UnboundedReceiverStream::new(dispersal_events_receiver);
(
Self {
swarm: Self::build_swarm(key, membership, addresses),
@ -161,15 +168,33 @@ where
async fn handle_behaviour_event(&mut self, event: ExecutorBehaviourEvent<Membership>) {
match event {
ExecutorBehaviourEvent::Sampling(event) => {
tracing::info!(
counter.behaviour_events_received = 1,
event = EVENT_SAMPLING
);
self.handle_sampling_event(event).await;
}
ExecutorBehaviourEvent::ExecutorDispersal(event) => {
tracing::info!(
counter.behaviour_events_received = 1,
event = EVENT_DISPERSAL_EXECUTOR_DISPERSAL
);
self.handle_executor_dispersal_event(event).await;
}
ExecutorBehaviourEvent::ValidatorDispersal(event) => {
tracing::info!(
counter.behaviour_events_received = 1,
event = EVENT_VALIDATOR_DISPERSAL,
blob_size = event.blob_size()
);
self.handle_validator_dispersal_event(event).await;
}
ExecutorBehaviourEvent::Replication(event) => {
tracing::info!(
counter.behaviour_events_received = 1,
event = EVENT_REPLICATION,
blob_size = event.blob_size()
);
self.handle_replication_event(event).await;
}
}

View File

@ -25,6 +25,11 @@ use crate::swarm::common::{
use crate::SubnetworkId;
use subnetworks_assignations::MembershipHandler;
// Metrics
const EVENT_SAMPLING: &str = "sampling";
const EVENT_VALIDATOR_DISPERSAL: &str = "validator_dispersal";
const EVENT_REPLICATION: &str = "replication";
pub struct ValidatorEventsStream {
pub sampling_events_receiver: UnboundedReceiverStream<SamplingEvent>,
pub validation_events_receiver: UnboundedReceiverStream<DaBlob>,
@ -52,6 +57,7 @@ where
let sampling_events_receiver = UnboundedReceiverStream::new(sampling_events_receiver);
let validation_events_receiver = UnboundedReceiverStream::new(validation_events_receiver);
(
Self {
swarm: Self::build_swarm(key, membership, addresses),
@ -131,12 +137,26 @@ where
async fn handle_behaviour_event(&mut self, event: ValidatorBehaviourEvent<Membership>) {
match event {
ValidatorBehaviourEvent::Sampling(event) => {
tracing::info!(
counter.behaviour_events_received = 1,
event = EVENT_SAMPLING
);
self.handle_sampling_event(event).await;
}
ValidatorBehaviourEvent::Dispersal(event) => {
tracing::info!(
counter.behaviour_events_received = 1,
event = EVENT_VALIDATOR_DISPERSAL,
blob_size = event.blob_size()
);
self.handle_dispersal_event(event).await;
}
ValidatorBehaviourEvent::Replication(event) => {
tracing::info!(
counter.behaviour_events_received = 1,
event = EVENT_REPLICATION,
blob_size = event.blob_size()
);
self.handle_replication_event(event).await;
}
}

View File

@ -1,19 +0,0 @@
pub mod conn_maintenance;
pub mod cover_traffic;
pub mod membership;
pub mod message_blend;
pub mod persistent_transmission;
pub enum MixOutgoingMessage {
FullyUnwrapped(Vec<u8>),
Outbound(Vec<u8>),
}
impl From<MixOutgoingMessage> for Vec<u8> {
fn from(value: MixOutgoingMessage) -> Self {
match value {
MixOutgoingMessage::FullyUnwrapped(v) => v,
MixOutgoingMessage::Outbound(v) => v,
}
}
}

View File

@ -16,7 +16,7 @@ tracing = "0.1"
nomos-core = { path = "../../nomos-core/chain-defs" }
cryptarchia-consensus = { path = "../cryptarchia-consensus" }
nomos-network = { path = "../../nomos-services/network" }
nomos-mix-service = { path = "../../nomos-services/mix", features = ["libp2p"] }
nomos-blend-service = { path = "../../nomos-services/blend", features = ["libp2p"] }
nomos-mempool = { path = "../../nomos-services/mempool", features = [
"mock",
"libp2p",

View File

@ -7,11 +7,12 @@ use tokio::sync::oneshot;
use crate::http::DynError;
use cryptarchia_consensus::{
mix::adapters::libp2p::LibP2pAdapter as MixAdapter,
blend::adapters::libp2p::LibP2pAdapter as BlendAdapter,
network::adapters::libp2p::LibP2pAdapter as ConsensusNetworkAdapter, ConsensusMsg,
CryptarchiaConsensus, CryptarchiaInfo,
};
use kzgrs_backend::dispersal::BlobInfo;
use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter;
use nomos_core::{
da::{
blob::{self, select::FillSize as FillSizeWithBlobs},
@ -24,7 +25,6 @@ use nomos_da_sampling::backend::DaSamplingServiceBackend;
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter,
};
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde};
pub type Cryptarchia<
@ -37,7 +37,7 @@ pub type Cryptarchia<
const SIZE: usize,
> = CryptarchiaConsensus<
ConsensusNetworkAdapter<Tx, BlobInfo>,
MixAdapter<MixNetworkAdapter, Tx, BlobInfo>,
BlendAdapter<BlendNetworkAdapter, Tx, BlobInfo>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, BlobInfo, <BlobInfo as blob::info::DispersedBlobInfo>::BlobId>,

View File

@ -1,5 +1,6 @@
use bytes::Bytes;
use core::ops::Range;
use nomos_blend_service::network::libp2p::Libp2pAdapter as BlendNetworkAdapter;
use nomos_core::da::blob::info::DispersedBlobInfo;
use nomos_core::da::blob::{metadata, select::FillSize as FillSizeWithBlobs, Blob};
use nomos_core::da::{BlobId, DaVerifier as CoreDaVerifier};
@ -23,7 +24,6 @@ use nomos_da_verifier::{DaVerifierMsg, DaVerifierService};
use nomos_libp2p::PeerId;
use nomos_mempool::backend::mockpool::MockPool;
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAdapter;
use nomos_mix_service::network::libp2p::Libp2pAdapter as MixNetworkAdapter;
use nomos_storage::backends::rocksdb::RocksBackend;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
@ -54,7 +54,7 @@ pub type DaIndexer<
CryptarchiaConsensusAdapter<Tx, V>,
// Cryptarchia specific, should be the same as in `Cryptarchia` type above.
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, V>,
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter<MixNetworkAdapter, Tx, V>,
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter<BlendNetworkAdapter, Tx, V>,
MockPool<HeaderId, Tx, <Tx as Transaction>::Hash>,
MempoolNetworkAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<HeaderId, V, [u8; 32]>,

View File

@ -1,5 +1,5 @@
[package]
name = "nomos-mix-service"
name = "nomos-blend-service"
version = "0.1.0"
edition = "2021"
@ -8,10 +8,10 @@ async-trait = "0.1"
futures = "0.3"
libp2p = { version = "0.53", features = ["ed25519"] }
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
nomos-mix = { path = "../../nomos-mix/core" }
nomos-blend = { path = "../../nomos-blend/core" }
nomos-core = { path = "../../nomos-core/chain-defs" }
nomos-mix-network = { path = "../../nomos-mix/network" }
nomos-mix-message = { path = "../../nomos-mix/message" }
nomos-blend-network = { path = "../../nomos-blend/network" }
nomos-blend-message = { path = "../../nomos-blend/message" }
nomos-network = { path = "../network" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
rand = "0.8.5"

View File

@ -1,6 +1,6 @@
use std::{pin::Pin, time::Duration};
use super::MixBackend;
use super::BlendBackend;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use libp2p::{
@ -8,9 +8,9 @@ use libp2p::{
swarm::SwarmEvent,
Multiaddr, Swarm, SwarmBuilder,
};
use nomos_blend::{conn_maintenance::ConnectionMaintenanceSettings, membership::Membership};
use nomos_blend_message::sphinx::SphinxMessage;
use nomos_libp2p::secret_key_serde;
use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Membership};
use nomos_mix_message::sphinx::SphinxMessage;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::RngCore;
use serde::{Deserialize, Serialize};
@ -20,16 +20,16 @@ use tokio::{
};
use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
/// A mix backend that uses the libp2p network stack.
pub struct Libp2pMixBackend {
/// A blend backend that uses the libp2p network stack.
pub struct Libp2pBlendBackend {
#[allow(dead_code)]
task: JoinHandle<()>,
swarm_message_sender: mpsc::Sender<MixSwarmMessage>,
swarm_message_sender: mpsc::Sender<BlendSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Libp2pMixBackendSettings {
pub struct Libp2pBlendBackendSettings {
pub listening_address: Multiaddr,
// A key for deriving PeerId and establishing secure connections (TLS 1.3 by QUIC)
#[serde(with = "secret_key_serde", default = "ed25519::SecretKey::generate")]
@ -40,8 +40,8 @@ pub struct Libp2pMixBackendSettings {
const CHANNEL_SIZE: usize = 64;
#[async_trait]
impl MixBackend for Libp2pMixBackend {
type Settings = Libp2pMixBackendSettings;
impl BlendBackend for Libp2pBlendBackend {
type Settings = Libp2pBlendBackendSettings;
fn new<R>(
config: Self::Settings,
@ -55,7 +55,7 @@ impl MixBackend for Libp2pMixBackend {
let (swarm_message_sender, swarm_message_receiver) = mpsc::channel(CHANNEL_SIZE);
let (incoming_message_sender, _) = broadcast::channel(CHANNEL_SIZE);
let mut swarm = MixSwarm::new(
let mut swarm = BlendSwarm::new(
config,
membership,
rng,
@ -77,10 +77,10 @@ impl MixBackend for Libp2pMixBackend {
async fn publish(&self, msg: Vec<u8>) {
if let Err(e) = self
.swarm_message_sender
.send(MixSwarmMessage::Publish(msg))
.send(BlendSwarmMessage::Publish(msg))
.await
{
tracing::error!("Failed to send message to MixSwarm: {e}");
tracing::error!("Failed to send message to BlendSwarm: {e}");
}
}
@ -92,29 +92,29 @@ impl MixBackend for Libp2pMixBackend {
}
}
struct MixSwarm<R>
struct BlendSwarm<R>
where
R: RngCore + 'static,
{
swarm: Swarm<nomos_mix_network::Behaviour<SphinxMessage, R, IntervalStream>>,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
swarm: Swarm<nomos_blend_network::Behaviour<SphinxMessage, R, IntervalStream>>,
swarm_messages_receiver: mpsc::Receiver<BlendSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
}
#[derive(Debug)]
pub enum MixSwarmMessage {
pub enum BlendSwarmMessage {
Publish(Vec<u8>),
}
impl<R> MixSwarm<R>
impl<R> BlendSwarm<R>
where
R: RngCore + 'static,
{
fn new(
config: Libp2pMixBackendSettings,
config: Libp2pBlendBackendSettings,
membership: Membership<SphinxMessage>,
rng: R,
swarm_messages_receiver: mpsc::Receiver<MixSwarmMessage>,
swarm_messages_receiver: mpsc::Receiver<BlendSwarmMessage>,
incoming_message_sender: broadcast::Sender<Vec<u8>>,
) -> Self {
let keypair = Keypair::from(ed25519::Keypair::from(config.node_key.clone()));
@ -126,8 +126,8 @@ where
config.conn_maintenance.monitor.as_ref().map(|monitor| {
IntervalStream::new(tokio::time::interval(monitor.time_window))
});
nomos_mix_network::Behaviour::new(
nomos_mix_network::Config {
nomos_blend_network::Behaviour::new(
nomos_blend_network::Config {
duplicate_cache_lifespan: 60,
conn_maintenance_settings: config.conn_maintenance,
conn_maintenance_interval,
@ -136,7 +136,7 @@ where
rng,
)
})
.expect("Mix Behaviour should be built")
.expect("Blend Behaviour should be built")
.with_swarm_config(|cfg| {
cfg.with_idle_connection_timeout(Duration::from_secs(u64::MAX))
})
@ -145,7 +145,7 @@ where
swarm
.listen_on(config.listening_address)
.unwrap_or_else(|e| {
panic!("Failed to listen on Mix network: {e:?}");
panic!("Failed to listen on Blend network: {e:?}");
});
Self {
@ -168,29 +168,42 @@ where
}
}
async fn handle_swarm_message(&mut self, msg: MixSwarmMessage) {
async fn handle_swarm_message(&mut self, msg: BlendSwarmMessage) {
match msg {
MixSwarmMessage::Publish(msg) => {
BlendSwarmMessage::Publish(msg) => {
let msg_size = msg.len();
if let Err(e) = self.swarm.behaviour_mut().publish(msg) {
tracing::error!("Failed to publish message to mix network: {e:?}");
tracing::error!("Failed to publish message to blend network: {e:?}");
tracing::info!(counter.failed_outbound_messages = 1);
} else {
tracing::info!(counter.successful_outbound_messages = 1);
tracing::info!(histogram.sent_data = msg_size as u64);
}
}
}
}
fn handle_event(&mut self, event: SwarmEvent<nomos_mix_network::Event>) {
fn handle_event(&mut self, event: SwarmEvent<nomos_blend_network::Event>) {
match event {
SwarmEvent::Behaviour(nomos_mix_network::Event::Message(msg)) => {
SwarmEvent::Behaviour(nomos_blend_network::Event::Message(msg)) => {
tracing::debug!("Received message from a peer: {msg:?}");
let msg_size = msg.len();
if let Err(e) = self.incoming_message_sender.send(msg) {
tracing::error!("Failed to send incoming message to channel: {e}");
tracing::info!(counter.failed_inbound_messages = 1);
} else {
tracing::info!(counter.successful_inbound_messages = 1);
tracing::info!(histogram.received_data = msg_size as u64);
}
}
SwarmEvent::Behaviour(nomos_mix_network::Event::Error(e)) => {
tracing::error!("Received error from mix network: {e:?}");
SwarmEvent::Behaviour(nomos_blend_network::Event::Error(e)) => {
tracing::error!("Received error from blend network: {e:?}");
tracing::info!(counter.error = 1);
}
_ => {
tracing::debug!("Received event from mix network: {event:?}");
tracing::debug!("Received event from blend network: {event:?}");
tracing::info!(counter.ignored_event = 1);
}
}
}

View File

@ -4,14 +4,14 @@ pub mod libp2p;
use std::{fmt::Debug, pin::Pin};
use futures::Stream;
use nomos_mix::membership::Membership;
use nomos_mix_message::sphinx::SphinxMessage;
use nomos_blend::membership::Membership;
use nomos_blend_message::sphinx::SphinxMessage;
use overwatch_rs::overwatch::handle::OverwatchHandle;
use rand::RngCore;
/// A trait for mix backends that send messages to the mix network.
/// A trait for blend backends that send messages to the blend network.
#[async_trait::async_trait]
pub trait MixBackend {
pub trait BlendBackend {
type Settings: Clone + Debug + Send + Sync + 'static;
fn new<R>(
@ -22,8 +22,8 @@ pub trait MixBackend {
) -> Self
where
R: RngCore + Send + 'static;
/// Publish a message to the mix network.
/// Publish a message to the blend network.
async fn publish(&self, msg: Vec<u8>);
/// Listen to messages received from the mix network.
/// Listen to messages received from the blend network.
fn listen_to_incoming_messages(&mut self) -> Pin<Box<dyn Stream<Item = Vec<u8>> + Send>>;
}

View File

@ -2,22 +2,22 @@ pub mod backends;
pub mod network;
use async_trait::async_trait;
use backends::MixBackend;
use backends::BlendBackend;
use futures::StreamExt;
use network::NetworkAdapter;
use nomos_core::wire;
use nomos_mix::message_blend::temporal::TemporalScheduler;
use nomos_mix::message_blend::{crypto::CryptographicProcessor, CryptographicProcessorSettings};
use nomos_mix::message_blend::{MessageBlendExt, MessageBlendSettings};
use nomos_mix::persistent_transmission::{
use nomos_blend::message_blend::temporal::TemporalScheduler;
use nomos_blend::message_blend::{crypto::CryptographicProcessor, CryptographicProcessorSettings};
use nomos_blend::message_blend::{MessageBlendExt, MessageBlendSettings};
use nomos_blend::persistent_transmission::{
PersistentTransmissionExt, PersistentTransmissionSettings, PersistentTransmissionStream,
};
use nomos_mix::MixOutgoingMessage;
use nomos_mix::{
use nomos_blend::BlendOutgoingMessage;
use nomos_blend::{
cover_traffic::{CoverTraffic, CoverTrafficSettings},
membership::{Membership, Node},
};
use nomos_mix_message::{sphinx::SphinxMessage, MixMessage};
use nomos_blend_message::{sphinx::SphinxMessage, BlendMessage};
use nomos_core::wire;
use nomos_network::NetworkService;
use overwatch_rs::services::{
handle::ServiceStateHandle,
@ -35,14 +35,14 @@ use tokio::sync::mpsc;
use tokio::time;
use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream};
/// A mix service that sends messages to the mix network
/// A blend service that sends messages to the blend network
/// and broadcasts fully unwrapped messages through the [`NetworkService`].
///
/// The mix backend and the network adapter are generic types that are independent with each other.
/// For example, the mix backend can use the libp2p network stack, while the network adapter can use the other network backend.
pub struct MixService<Backend, Network>
/// The blend backend and the network adapter are generic types that are independent with each other.
/// For example, the blend backend can use the libp2p network stack, while the network adapter can use the other network backend.
pub struct BlendService<Backend, Network>
where
Backend: MixBackend + 'static,
Backend: BlendBackend + 'static,
Backend::Settings: Clone + Debug,
Network: NetworkAdapter,
Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned,
@ -53,24 +53,24 @@ where
membership: Membership<SphinxMessage>,
}
impl<Backend, Network> ServiceData for MixService<Backend, Network>
impl<Backend, Network> ServiceData for BlendService<Backend, Network>
where
Backend: MixBackend + 'static,
Backend: BlendBackend + 'static,
Backend::Settings: Clone,
Network: NetworkAdapter,
Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned,
{
const SERVICE_ID: ServiceId = "Mix";
type Settings = MixConfig<Backend::Settings>;
const SERVICE_ID: ServiceId = "Blend";
type Settings = BlendConfig<Backend::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = ServiceMessage<Network::BroadcastSettings>;
}
#[async_trait]
impl<Backend, Network> ServiceCore for MixService<Backend, Network>
impl<Backend, Network> ServiceCore for BlendService<Backend, Network>
where
Backend: MixBackend + Send + 'static,
Backend: BlendBackend + Send + 'static,
Backend::Settings: Clone,
Network: NetworkAdapter + Send + Sync + 'static,
Network::BroadcastSettings:
@ -78,17 +78,17 @@ where
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let mix_config = service_state.settings_reader.get_updated_settings();
let blend_config = service_state.settings_reader.get_updated_settings();
Ok(Self {
backend: <Backend as MixBackend>::new(
backend: <Backend as BlendBackend>::new(
service_state.settings_reader.get_updated_settings().backend,
service_state.overwatch_handle.clone(),
mix_config.membership(),
blend_config.membership(),
ChaCha12Rng::from_entropy(),
),
service_state,
network_relay,
membership: mix_config.membership(),
membership: blend_config.membership(),
})
}
@ -99,9 +99,9 @@ where
network_relay,
membership,
} = self;
let mix_config = service_state.settings_reader.get_updated_settings();
let blend_config = service_state.settings_reader.get_updated_settings();
let mut cryptographic_processor = CryptographicProcessor::new(
mix_config.message_blend.cryptographic_processor.clone(),
blend_config.message_blend.cryptographic_processor.clone(),
membership.clone(),
ChaCha12Rng::from_entropy(),
);
@ -116,21 +116,21 @@ where
SphinxMessage,
_,
> = UnboundedReceiverStream::new(persistent_receiver).persistent_transmission(
mix_config.persistent_transmission,
blend_config.persistent_transmission,
ChaCha12Rng::from_entropy(),
IntervalStream::new(time::interval(Duration::from_secs_f64(
1.0 / mix_config.persistent_transmission.max_emission_frequency,
1.0 / blend_config.persistent_transmission.max_emission_frequency,
)))
.map(|_| ()),
);
// tier 2 blend
let temporal_scheduler = TemporalScheduler::new(
mix_config.message_blend.temporal_processor,
blend_config.message_blend.temporal_processor,
ChaCha12Rng::from_entropy(),
);
let mut blend_messages = backend.listen_to_incoming_messages().blend(
mix_config.message_blend.clone(),
blend_config.message_blend.clone(),
membership.clone(),
temporal_scheduler,
ChaCha12Rng::from_entropy(),
@ -138,21 +138,22 @@ where
// tier 3 cover traffic
let mut cover_traffic: CoverTraffic<_, _, SphinxMessage> = CoverTraffic::new(
mix_config.cover_traffic.cover_traffic_settings(
blend_config.cover_traffic.cover_traffic_settings(
&membership,
&mix_config.message_blend.cryptographic_processor,
&blend_config.message_blend.cryptographic_processor,
),
mix_config.cover_traffic.epoch_stream(),
mix_config.cover_traffic.slot_stream(),
blend_config.cover_traffic.epoch_stream(),
blend_config.cover_traffic.slot_stream(),
);
// local messages, are bypassed and send immediately
let mut local_messages = service_state
.inbound_relay
.map(|ServiceMessage::Mix(message)| {
wire::serialize(&message)
.expect("Message from internal services should not fail to serialize")
});
let mut local_messages =
service_state
.inbound_relay
.map(|ServiceMessage::Blend(message)| {
wire::serialize(&message)
.expect("Message from internal services should not fail to serialize")
});
let mut lifecycle_stream = service_state.lifecycle_handle.message_stream();
loop {
@ -163,19 +164,19 @@ where
// Already processed blend messages
Some(msg) = blend_messages.next() => {
match msg {
MixOutgoingMessage::Outbound(msg) => {
BlendOutgoingMessage::Outbound(msg) => {
if let Err(e) = persistent_sender.send(msg) {
tracing::error!("Error sending message to persistent stream: {e}");
}
}
MixOutgoingMessage::FullyUnwrapped(msg) => {
BlendOutgoingMessage::FullyUnwrapped(msg) => {
tracing::debug!("Broadcasting fully unwrapped message");
match wire::deserialize::<NetworkMessage<Network::BroadcastSettings>>(&msg) {
Ok(msg) => {
network_adapter.broadcast(msg.message, msg.broadcast_settings).await;
},
_ => {
tracing::debug!("unrecognized message from mix backend");
tracing::debug!("unrecognized message from blend backend");
}
}
}
@ -199,9 +200,9 @@ where
}
}
impl<Backend, Network> MixService<Backend, Network>
impl<Backend, Network> BlendService<Backend, Network>
where
Backend: MixBackend + Send + 'static,
Backend: BlendBackend + Send + 'static,
Backend::Settings: Clone,
Network: NetworkAdapter,
Network::BroadcastSettings: Clone + Debug + Serialize + DeserializeOwned,
@ -241,12 +242,12 @@ where
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct MixConfig<BackendSettings> {
pub struct BlendConfig<BackendSettings> {
pub backend: BackendSettings,
pub message_blend: MessageBlendSettings<SphinxMessage>,
pub persistent_transmission: PersistentTransmissionSettings,
pub cover_traffic: CoverTrafficExtSettings,
pub membership: Vec<Node<<SphinxMessage as nomos_mix_message::MixMessage>::PublicKey>>,
pub membership: Vec<Node<<SphinxMessage as nomos_blend_message::BlendMessage>::PublicKey>>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -260,12 +261,12 @@ impl CoverTrafficExtSettings {
&self,
membership: &Membership<SphinxMessage>,
cryptographic_processor_settings: &CryptographicProcessorSettings<
<SphinxMessage as MixMessage>::PrivateKey,
<SphinxMessage as BlendMessage>::PrivateKey,
>,
) -> CoverTrafficSettings {
CoverTrafficSettings {
node_id: membership.local_node().public_key,
number_of_hops: cryptographic_processor_settings.num_mix_layers,
number_of_hops: cryptographic_processor_settings.num_blend_layers,
slots_per_epoch: self.slots_per_epoch(),
network_size: membership.size(),
}
@ -301,7 +302,7 @@ impl CoverTrafficExtSettings {
}
}
impl<BackendSettings> MixConfig<BackendSettings> {
impl<BackendSettings> BlendConfig<BackendSettings> {
fn membership(&self) -> Membership<SphinxMessage> {
let public_key = x25519_dalek::PublicKey::from(&x25519_dalek::StaticSecret::from(
self.message_blend.cryptographic_processor.private_key,
@ -311,11 +312,11 @@ impl<BackendSettings> MixConfig<BackendSettings> {
}
}
/// A message that is handled by [`MixService`].
/// A message that is handled by [`BlendService`].
#[derive(Debug)]
pub enum ServiceMessage<BroadcastSettings> {
/// To send a message to the mix network and eventually broadcast it to the [`NetworkService`].
Mix(NetworkMessage<BroadcastSettings>),
/// To send a message to the blend network and eventually broadcast it to the [`NetworkService`].
Blend(NetworkMessage<BroadcastSettings>),
}
impl<BroadcastSettings> RelayMessage for ServiceMessage<BroadcastSettings> where
@ -323,7 +324,7 @@ impl<BroadcastSettings> RelayMessage for ServiceMessage<BroadcastSettings> where
{
}
/// A message that is sent to the mix network.
/// A message that is sent to the blend network.
/// To eventually broadcast the message to the network service,
/// [`BroadcastSettings`] must be included in the [`NetworkMessage`].
/// [`BroadcastSettings`] is a generic type defined by [`NetworkAdapter`].

View File

@ -10,7 +10,7 @@ use overwatch_rs::services::ServiceData;
use serde::{de::DeserializeOwned, Serialize};
/// A trait for communicating with the network service, which is used to broadcast
/// fully unwrapped messages returned from the mix backend.
/// fully unwrapped messages returned from the blend backend.
#[async_trait::async_trait]
pub trait NetworkAdapter {
/// The network backend used by the network service.

View File

@ -16,7 +16,7 @@ cl = { path = "../../nomos-core/cl" }
futures = "0.3"
nomos-da-sampling = { path = "../data-availability/sampling" }
nomos-network = { path = "../network" }
nomos-mix-service = { path = "../mix" }
nomos-blend-service = { path = "../blend" }
nomos-mempool = { path = "../mempool" }
nomos-core = { path = "../../nomos-core/chain-defs" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
@ -42,7 +42,7 @@ serde_json = { version = "1", optional = true }
[features]
default = []
libp2p = ["nomos-network/libp2p", "nomos-mix-service/libp2p", "nomos-libp2p"]
libp2p = ["nomos-network/libp2p", "nomos-blend-service/libp2p", "nomos-libp2p"]
openapi = ["dep:utoipa", "serde_json"]
[dev-dependencies]

View File

@ -1,13 +1,13 @@
use std::{hash::Hash, marker::PhantomData};
use nomos_core::{block::Block, wire};
use nomos_mix_service::{
backends::libp2p::Libp2pMixBackend, network::NetworkAdapter, MixService, ServiceMessage,
use nomos_blend_service::{
backends::libp2p::Libp2pBlendBackend, network::NetworkAdapter, BlendService, ServiceMessage,
};
use nomos_core::{block::Block, wire};
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::{messages::NetworkMessage, mix::MixAdapter};
use crate::{blend::BlendAdapter, messages::NetworkMessage};
#[derive(Clone)]
pub struct LibP2pAdapter<Network, Tx, BlobCert>
@ -18,13 +18,13 @@ where
BlobCert: Clone + Eq + Hash,
{
settings: LibP2pAdapterSettings<Network::BroadcastSettings>,
mix_relay: OutboundRelay<<MixService<Libp2pMixBackend, Network> as ServiceData>::Message>,
blend_relay: OutboundRelay<<BlendService<Libp2pBlendBackend, Network> as ServiceData>::Message>,
_tx: PhantomData<Tx>,
_blob_cert: PhantomData<BlobCert>,
}
#[async_trait::async_trait]
impl<Network, Tx, BlobCert> MixAdapter for LibP2pAdapter<Network, Tx, BlobCert>
impl<Network, Tx, BlobCert> BlendAdapter for LibP2pAdapter<Network, Tx, BlobCert>
where
Network: NetworkAdapter + 'static,
Network::BroadcastSettings: Clone,
@ -32,15 +32,15 @@ where
BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + Sync + 'static,
{
type Settings = LibP2pAdapterSettings<Network::BroadcastSettings>;
type Backend = Libp2pMixBackend;
type Backend = Libp2pBlendBackend;
type Network = Network;
type Tx = Tx;
type BlobCertificate = BlobCert;
async fn new(
settings: Self::Settings,
mix_relay: OutboundRelay<
<MixService<Self::Backend, Self::Network> as ServiceData>::Message,
blend_relay: OutboundRelay<
<BlendService<Self::Backend, Self::Network> as ServiceData>::Message,
>,
) -> Self {
// this wait seems to be helpful in some cases since we give the time
@ -50,22 +50,22 @@ where
Self {
settings,
mix_relay,
blend_relay,
_tx: PhantomData,
_blob_cert: PhantomData,
}
}
async fn mix(&self, block: Block<Self::Tx, Self::BlobCertificate>) {
async fn blend(&self, block: Block<Self::Tx, Self::BlobCertificate>) {
if let Err((e, msg)) = self
.mix_relay
.send(ServiceMessage::Mix(nomos_mix_service::NetworkMessage {
.blend_relay
.send(ServiceMessage::Blend(nomos_blend_service::NetworkMessage {
message: wire::serialize(&NetworkMessage::Block(block)).unwrap(),
broadcast_settings: self.settings.broadcast_settings.clone(),
}))
.await
{
tracing::error!("error sending message to mix network: {e}: {msg:?}",);
tracing::error!("error sending message to blend network: {e}: {msg:?}",);
}
}
}

View File

@ -4,8 +4,8 @@ pub mod adapters;
use nomos_core::block::Block;
use std::hash::Hash;
// crates
use nomos_mix_service::network::NetworkAdapter;
use nomos_mix_service::{backends::MixBackend, MixService};
use nomos_blend_service::network::NetworkAdapter;
use nomos_blend_service::{backends::BlendBackend, BlendService};
// internal
use overwatch_rs::services::relay::OutboundRelay;
use overwatch_rs::services::ServiceData;
@ -13,17 +13,17 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
#[async_trait::async_trait]
pub trait MixAdapter {
pub trait BlendAdapter {
type Settings: Clone + 'static;
type Backend: MixBackend + 'static;
type Backend: BlendBackend + 'static;
type Network: NetworkAdapter + 'static;
type Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static;
type BlobCertificate: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static;
async fn new(
settings: Self::Settings,
mix_relay: OutboundRelay<
<MixService<Self::Backend, Self::Network> as ServiceData>::Message,
blend_relay: OutboundRelay<
<BlendService<Self::Backend, Self::Network> as ServiceData>::Message,
>,
) -> Self;
async fn mix(&self, block: Block<Self::Tx, Self::BlobCertificate>);
async fn blend(&self, block: Block<Self::Tx, Self::BlobCertificate>);
}

View File

@ -1,6 +1,6 @@
pub mod blend;
mod leadership;
mod messages;
pub mod mix;
pub mod network;
mod time;
@ -119,7 +119,7 @@ impl Cryptarchia {
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct CryptarchiaSettings<Ts, Bs, NetworkAdapterSettings, MixAdapterSettings> {
pub struct CryptarchiaSettings<Ts, Bs, NetworkAdapterSettings, BlendAdapterSettings> {
#[serde(default)]
pub transaction_selector_settings: Ts,
#[serde(default)]
@ -129,12 +129,12 @@ pub struct CryptarchiaSettings<Ts, Bs, NetworkAdapterSettings, MixAdapterSetting
pub time: TimeConfig,
pub leader_config: LeaderConfig,
pub network_adapter_settings: NetworkAdapterSettings,
pub mix_adapter_settings: MixAdapterSettings,
pub blend_adapter_settings: BlendAdapterSettings,
}
pub struct CryptarchiaConsensus<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -148,7 +148,7 @@ pub struct CryptarchiaConsensus<
SamplingStorage,
> where
A: NetworkAdapter,
MixAdapter: mix::MixAdapter,
BlendAdapter: blend::BlendAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
@ -174,7 +174,8 @@ pub struct CryptarchiaConsensus<
// underlying networking backend. We need this so we can relay and check the types properly
// when implementing ServiceCore for CryptarchiaConsensus
network_relay: Relay<NetworkService<A::Backend>>,
mix_relay: Relay<nomos_mix_service::MixService<MixAdapter::Backend, MixAdapter::Network>>,
blend_relay:
Relay<nomos_blend_service::BlendService<BlendAdapter::Backend, BlendAdapter::Network>>,
cl_mempool_relay: Relay<TxMempoolService<ClPoolAdapter, ClPool>>,
da_mempool_relay: Relay<
DaMempoolService<
@ -195,7 +196,7 @@ pub struct CryptarchiaConsensus<
impl<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -210,7 +211,7 @@ impl<
> ServiceData
for CryptarchiaConsensus<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -225,7 +226,7 @@ impl<
>
where
A: NetworkAdapter,
MixAdapter: mix::MixAdapter,
BlendAdapter: blend::BlendAdapter,
ClPool: MemPool<BlockId = HeaderId>,
ClPool::Item: Clone + Eq + Hash + Debug,
ClPool::Key: Debug,
@ -248,7 +249,7 @@ where
{
const SERVICE_ID: ServiceId = CRYPTARCHIA_ID;
type Settings =
CryptarchiaSettings<TxS::Settings, BS::Settings, A::Settings, MixAdapter::Settings>;
CryptarchiaSettings<TxS::Settings, BS::Settings, A::Settings, BlendAdapter::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = ConsensusMsg<Block<ClPool::Item, DaPool::Item>>;
@ -257,7 +258,7 @@ where
#[async_trait::async_trait]
impl<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -272,7 +273,7 @@ impl<
> ServiceCore
for CryptarchiaConsensus<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -292,12 +293,12 @@ where
+ Sync
+ 'static,
A::Settings: Send + Sync + 'static,
MixAdapter: mix::MixAdapter<Tx = ClPool::Item, BlobCertificate = DaPool::Item>
BlendAdapter: blend::BlendAdapter<Tx = ClPool::Item, BlobCertificate = DaPool::Item>
+ Clone
+ Send
+ Sync
+ 'static,
MixAdapter::Settings: Send + Sync + 'static,
BlendAdapter::Settings: Send + Sync + 'static,
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool<BlockId = HeaderId, Key = SamplingBackend::BlobId> + Send + Sync + 'static,
@ -345,16 +346,17 @@ where
{
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
let network_relay = service_state.overwatch_handle.relay();
let mix_relay = service_state.overwatch_handle.relay();
let blend_relay = service_state.overwatch_handle.relay();
let cl_mempool_relay = service_state.overwatch_handle.relay();
let da_mempool_relay = service_state.overwatch_handle.relay();
let storage_relay = service_state.overwatch_handle.relay();
let sampling_relay = service_state.overwatch_handle.relay();
let (block_subscription_sender, _) = broadcast::channel(16);
Ok(Self {
service_state,
network_relay,
mix_relay,
blend_relay,
cl_mempool_relay,
da_mempool_relay,
block_subscription_sender,
@ -370,11 +372,11 @@ where
.await
.expect("Relay connection with NetworkService should succeed");
let mix_relay: OutboundRelay<_> = self
.mix_relay
let blend_relay: OutboundRelay<_> = self
.blend_relay
.connect()
.await
.expect("Relay connection with nomos_mix_service::MixService should succeed");
.expect("Relay connection with nomos_blend_service::BlendService should succeed");
let cl_mempool_relay: OutboundRelay<_> = self
.cl_mempool_relay
@ -408,7 +410,7 @@ where
time,
leader_config,
network_adapter_settings,
mix_adapter_settings,
blend_adapter_settings,
} = self.service_state.settings_reader.get_updated_settings();
let genesis_id = HeaderId::from([0; 32]);
@ -434,7 +436,7 @@ where
let mut slot_timer = IntervalStream::new(timer.slot_interval());
let mix_adapter = MixAdapter::new(mix_adapter_settings, mix_relay).await;
let blend_adapter = BlendAdapter::new(blend_adapter_settings, blend_relay).await;
let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
@ -442,6 +444,7 @@ where
loop {
tokio::select! {
Some(block) = incoming_blocks.next() => {
Self::log_received_block(&block);
cryptarchia = Self::process_block(
cryptarchia,
&mut leader,
@ -480,7 +483,7 @@ where
).await;
if let Some(block) = block {
mix_adapter.mix(block).await;
blend_adapter.blend(block).await;
}
}
}
@ -505,7 +508,7 @@ where
impl<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -520,7 +523,7 @@ impl<
>
CryptarchiaConsensus<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -535,7 +538,7 @@ impl<
>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
MixAdapter: mix::MixAdapter + Clone + Send + Sync + 'static,
BlendAdapter: blend::BlendAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
@ -804,6 +807,24 @@ where
.all(|blob| sampled_blobs_ids.contains(&blob.blob_id()));
validated_blobs
}
fn log_received_block(block: &Block<ClPool::Item, DaPool::Item>) {
let content_size = block.header().content_size();
let transactions = block.cl_transactions_len();
let blobs = block.bl_blobs_len();
tracing::info!(
counter.received_blocks = 1,
transactions = transactions,
blobs = blobs,
bytes = content_size
);
tracing::info!(
histogram.received_blocks_data = content_size,
transactions = transactions,
blobs = blobs
);
}
}
#[derive(Debug)]

View File

@ -32,7 +32,7 @@ use tracing::error;
pub type ConsensusRelay<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -47,7 +47,7 @@ pub type ConsensusRelay<
> = Relay<
CryptarchiaConsensus<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -67,7 +67,7 @@ pub struct DataIndexerService<
DaStorage,
Consensus,
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -82,7 +82,7 @@ pub struct DataIndexerService<
> where
B: 'static,
A: NetworkAdapter,
MixAdapter: cryptarchia_consensus::mix::MixAdapter,
BlendAdapter: cryptarchia_consensus::blend::BlendAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
@ -110,7 +110,7 @@ pub struct DataIndexerService<
#[allow(clippy::type_complexity)]
consensus_relay: ConsensusRelay<
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -156,7 +156,7 @@ impl<
DaStorage,
Consensus,
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -174,7 +174,7 @@ impl<
DaStorage,
Consensus,
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -190,7 +190,7 @@ impl<
where
B: 'static,
A: NetworkAdapter,
MixAdapter: cryptarchia_consensus::mix::MixAdapter,
BlendAdapter: cryptarchia_consensus::blend::BlendAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
@ -225,7 +225,7 @@ impl<
DaStorage,
Consensus,
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -243,7 +243,7 @@ impl<
DaStorage,
Consensus,
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -259,7 +259,7 @@ impl<
where
B: Send + Sync + 'static,
A: NetworkAdapter,
MixAdapter: cryptarchia_consensus::mix::MixAdapter,
BlendAdapter: cryptarchia_consensus::blend::BlendAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,
@ -336,7 +336,7 @@ impl<
DaStorage,
Consensus,
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -354,7 +354,7 @@ impl<
DaStorage,
Consensus,
A,
MixAdapter,
BlendAdapter,
ClPool,
ClPoolAdapter,
DaPool,
@ -370,7 +370,7 @@ impl<
where
B: Debug + Send + Sync,
A: NetworkAdapter,
MixAdapter: cryptarchia_consensus::mix::MixAdapter,
BlendAdapter: cryptarchia_consensus::blend::BlendAdapter,
ClPoolAdapter: MempoolAdapter<Payload = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool<BlockId = HeaderId>,
DaPool: MemPool<BlockId = HeaderId>,

View File

@ -23,9 +23,9 @@ nomos-node = { path = "../../../nodes/nomos-node" }
nomos-mempool = { path = "../../../nomos-services/mempool" }
nomos-storage = { path = "../../../nomos-services/storage", features = ["rocksdb-backend"] }
nomos-network = { path = "../../network", features = ["mock"] }
nomos-mix-service = { path = "../../mix" }
nomos-mix = { path = "../../../nomos-mix/core" }
nomos-mix-message = { path = "../../../nomos-mix/message" }
nomos-blend-service = { path = "../../blend" }
nomos-blend = { path = "../../../nomos-blend/core" }
nomos-blend-message = { path = "../../../nomos-blend/message" }
nomos-libp2p = { path = "../../../nomos-libp2p" }
libp2p = { version = "0.53.2", features = ["ed25519"] }
once_cell = "1.19"

View File

@ -1,11 +1,11 @@
use cryptarchia_consensus::LeaderConfig;
// std
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use nomos_mix::message_blend::{
use nomos_blend::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node};
use nomos_mix_message::{sphinx::SphinxMessage, MixMessage};
use nomos_blend::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node};
use nomos_blend_message::{sphinx::SphinxMessage, BlendMessage};
use nomos_da_network_service::backends::libp2p::common::DaNetworkBackendSettings;
use std::path::PathBuf;
use std::time::Duration;
// crates
@ -19,6 +19,10 @@ use libp2p::identity::{
ed25519::{self, Keypair as Ed25519Keypair},
Keypair, PeerId,
};
use nomos_blend_service::backends::libp2p::{
Libp2pBlendBackend as BlendBackend, Libp2pBlendBackendSettings,
};
use nomos_blend_service::{BlendConfig, BlendService};
use nomos_core::{da::blob::info::DispersedBlobInfo, header::HeaderId, tx::Transaction};
pub use nomos_core::{
da::blob::select::FillSize as FillSizeWithBlobs, tx::select::FillSize as FillSizeWithTx,
@ -54,10 +58,6 @@ use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolNetworkAda
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use nomos_mempool::{backend::mockpool::MockPool, TxMempoolService};
use nomos_mempool::{DaMempoolSettings, TxMempoolSettings};
use nomos_mix_service::backends::libp2p::{
Libp2pMixBackend as MixBackend, Libp2pMixBackendSettings,
};
use nomos_mix_service::{MixConfig, MixService};
use nomos_network::backends::libp2p::{Libp2p as NetworkBackend, Libp2pConfig};
use nomos_network::NetworkConfig;
use nomos_network::NetworkService;
@ -92,8 +92,8 @@ pub static ENCODER: Lazy<DaEncoder> = Lazy::new(|| DaEncoder::new(PARAMS.clone()
pub(crate) type Cryptarchia = cryptarchia_consensus::CryptarchiaConsensus<
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter<
nomos_mix_service::network::libp2p::Libp2pAdapter,
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter<
nomos_blend_service::network::libp2p::Libp2pAdapter,
Tx,
BlobInfo,
>,
@ -124,8 +124,8 @@ pub(crate) type DaIndexer = DataIndexerService<
CryptarchiaConsensusAdapter<Tx, BlobInfo>,
// Cryptarchia specific, should be the same as in `Cryptarchia` type above.
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapter<Tx, BlobInfo>,
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapter<
nomos_mix_service::network::libp2p::Libp2pAdapter,
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapter<
nomos_blend_service::network::libp2p::Libp2pAdapter,
Tx,
BlobInfo,
>,
@ -168,7 +168,9 @@ pub(crate) const MB16: usize = 1024 * 1024 * 16;
pub struct TestNode {
//logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<NetworkBackend>>,
mix: ServiceHandle<MixService<MixBackend, nomos_mix_service::network::libp2p::Libp2pAdapter>>,
blend: ServiceHandle<
BlendService<BlendBackend, nomos_blend_service::network::libp2p::Libp2pAdapter>,
>,
cl_mempool: ServiceHandle<TxMempool>,
da_network: ServiceHandle<DaNetworkService<DaNetworkValidatorBackend<FillFromNodeList>>>,
da_mempool: ServiceHandle<DaMempool>,
@ -188,10 +190,10 @@ pub struct TestDaNetworkSettings {
pub node_key: ed25519::SecretKey,
}
pub struct TestMixSettings {
pub backend: Libp2pMixBackendSettings,
pub struct TestBlendSettings {
pub backend: Libp2pBlendBackendSettings,
pub private_key: x25519_dalek::StaticSecret,
pub membership: Vec<Node<<SphinxMessage as MixMessage>::PublicKey>>,
pub membership: Vec<Node<<SphinxMessage as BlendMessage>::PublicKey>>,
}
pub fn new_node(
@ -200,7 +202,7 @@ pub fn new_node(
genesis_state: &LedgerState,
time_config: &TimeConfig,
swarm_config: &SwarmConfig,
mix_config: &TestMixSettings,
blend_config: &TestBlendSettings,
db_path: PathBuf,
blobs_dir: &PathBuf,
initial_peers: Vec<Multiaddr>,
@ -216,23 +218,23 @@ pub fn new_node(
initial_peers,
},
},
mix: MixConfig {
backend: mix_config.backend.clone(),
blend: BlendConfig {
backend: blend_config.backend.clone(),
persistent_transmission: Default::default(),
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings {
private_key: mix_config.private_key.to_bytes(),
num_mix_layers: 1,
private_key: blend_config.private_key.to_bytes(),
num_blend_layers: 1,
},
temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: 2,
},
},
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
cover_traffic: nomos_blend_service::CoverTrafficExtSettings {
epoch_duration: Duration::from_secs(432000),
slot_duration: Duration::from_secs(20),
},
membership: mix_config.membership.clone(),
membership: blend_config.membership.clone(),
},
da_network: DaNetworkConfig {
backend: DaNetworkBackendSettings {
@ -285,10 +287,10 @@ pub fn new_node(
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
mix_adapter_settings:
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings {
blend_adapter_settings:
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapterSettings {
broadcast_settings:
nomos_mix_service::network::libp2p::Libp2pBroadcastSettings {
nomos_blend_service::network::libp2p::Libp2pBroadcastSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
},
@ -321,12 +323,12 @@ pub fn new_node(
.unwrap()
}
pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettings> {
pub fn new_blend_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestBlendSettings> {
let settings = listening_addresses
.iter()
.map(|listening_address| {
(
Libp2pMixBackendSettings {
Libp2pBlendBackendSettings {
listening_address: listening_address.clone(),
node_key: ed25519::SecretKey::generate(),
conn_maintenance: ConnectionMaintenanceSettings {
@ -353,7 +355,7 @@ pub fn new_mix_configs(listening_addresses: Vec<Multiaddr>) -> Vec<TestMixSettin
settings
.into_iter()
.map(|(backend, private_key)| TestMixSettings {
.map(|(backend, private_key)| TestBlendSettings {
backend,
private_key,
membership: membership.clone(),

View File

@ -92,7 +92,7 @@ fn test_indexer() {
port: 7772,
..Default::default()
};
let mix_configs = new_mix_configs(vec![
let blend_configs = new_blend_configs(vec![
Multiaddr::from_str("/ip4/127.0.0.1/udp/7781/quic-v1").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/udp/7782/quic-v1").unwrap(),
]);
@ -123,7 +123,7 @@ fn test_indexer() {
&genesis_state,
&time_config,
&swarm_config1,
&mix_configs[0],
&blend_configs[0],
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config2)],
@ -151,7 +151,7 @@ fn test_indexer() {
&genesis_state,
&time_config,
&swarm_config2,
&mix_configs[1],
&blend_configs[1],
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config1)],

View File

@ -83,7 +83,7 @@ fn test_verifier() {
..Default::default()
};
let mix_configs = new_mix_configs(vec![
let blend_configs = new_blend_configs(vec![
Multiaddr::from_str("/ip4/127.0.0.1/udp/7783/quic-v1").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/udp/7784/quic-v1").unwrap(),
Multiaddr::from_str("/ip4/127.0.0.1/udp/7785/quic-v1").unwrap(),
@ -124,7 +124,7 @@ fn test_verifier() {
&genesis_state,
&time_config,
&swarm_config1,
&mix_configs[0],
&blend_configs[0],
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config2)],
@ -152,7 +152,7 @@ fn test_verifier() {
&genesis_state,
&time_config,
&swarm_config2,
&mix_configs[1],
&blend_configs[1],
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config1)],
@ -180,7 +180,7 @@ fn test_verifier() {
&genesis_state,
&time_config,
&swarm_config3,
&mix_configs[2],
&blend_configs[2],
NamedTempFile::new().unwrap().path().to_path_buf(),
&blobs_dir,
vec![node_address(&swarm_config2)],

View File

@ -9,8 +9,8 @@ clap = { version = "4", features = ["derive"] }
nomos-executor = { path = "../../nodes/nomos-executor" }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-node = { path = "../../nodes/nomos-node" }
nomos-mix = { path = "../../nomos-mix/core" }
nomos-mix-message = { path = "../../nomos-mix/message" }
nomos-blend = { path = "../../nomos-blend/core" }
nomos-blend-message = { path = "../../nomos-blend/message" }
nomos-tracing = { path = "../../nomos-tracing" }
nomos-tracing-service = { path = "../../nomos-services/tracing" }
rand = "0.8"

View File

@ -1,9 +1,9 @@
// std
use std::{collections::HashMap, net::Ipv4Addr, str::FromStr};
// crates
use nomos_blend::membership::Node;
use nomos_blend_message::{sphinx::SphinxMessage, BlendMessage};
use nomos_libp2p::{Multiaddr, PeerId};
use nomos_mix::membership::Node;
use nomos_mix_message::{sphinx::SphinxMessage, MixMessage};
use nomos_tracing::{
logging::loki::LokiConfig, metrics::otlp::OtlpMetricsConfig, tracing::otlp::OtlpTracingConfig,
};
@ -11,9 +11,9 @@ use nomos_tracing_service::{FilterLayer, LoggerLayer, MetricsLayer, TracingSetti
use rand::{thread_rng, Rng};
use tests::topology::configs::{
api::GeneralApiConfig,
blend::create_blend_configs,
consensus::{create_consensus_configs, ConsensusParams},
da::{create_da_configs, DaParams},
mix::create_mix_configs,
network::create_network_configs,
tracing::GeneralTracingConfig,
GeneralConfig,
@ -24,7 +24,7 @@ use crate::TracingParams;
const DEFAULT_LIBP2P_NETWORK_PORT: u16 = 3000;
const DEFAULT_DA_NETWORK_PORT: u16 = 3300;
const DEFAULT_MIX_PORT: u16 = 3400;
const DEFAULT_BLEND_PORT: u16 = 3400;
const DEFAULT_API_PORT: u16 = 18080;
#[derive(Eq, PartialEq, Hash, Clone)]
@ -40,7 +40,7 @@ pub struct Host {
pub identifier: String,
pub network_port: u16,
pub da_network_port: u16,
pub mix_port: u16,
pub blend_port: u16,
}
impl Host {
@ -51,7 +51,7 @@ impl Host {
identifier,
network_port: DEFAULT_LIBP2P_NETWORK_PORT,
da_network_port: DEFAULT_DA_NETWORK_PORT,
mix_port: DEFAULT_MIX_PORT,
blend_port: DEFAULT_BLEND_PORT,
}
}
@ -62,7 +62,7 @@ impl Host {
identifier,
network_port: DEFAULT_LIBP2P_NETWORK_PORT,
da_network_port: DEFAULT_DA_NETWORK_PORT,
mix_port: DEFAULT_MIX_PORT,
blend_port: DEFAULT_BLEND_PORT,
}
}
}
@ -81,7 +81,7 @@ pub fn create_node_configs(
let consensus_configs = create_consensus_configs(&ids, consensus_params);
let da_configs = create_da_configs(&ids, da_params);
let network_configs = create_network_configs(&ids, Default::default());
let mix_configs = create_mix_configs(&ids);
let blend_configs = create_blend_configs(&ids);
let api_configs = ids
.iter()
.map(|_| GeneralApiConfig {
@ -94,8 +94,8 @@ pub fn create_node_configs(
let peer_addresses = da_configs[0].addresses.clone();
let host_network_init_peers = update_network_init_peers(hosts.clone());
let host_da_peer_addresses = update_da_peer_addresses(hosts.clone(), peer_addresses);
let host_mix_membership =
update_mix_membership(hosts.clone(), mix_configs[0].membership.clone());
let host_blend_membership =
update_blend_membership(hosts.clone(), blend_configs[0].membership.clone());
let new_peer_addresses: HashMap<PeerId, Multiaddr> = host_da_peer_addresses
.clone()
@ -122,11 +122,11 @@ pub fn create_node_configs(
network_config.swarm_config.port = host.network_port;
network_config.initial_peers = host_network_init_peers.clone();
// Mix config.
let mut mix_config = mix_configs[i].to_owned();
mix_config.backend.listening_address =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.mix_port)).unwrap();
mix_config.membership = host_mix_membership.clone();
// Blend config.
let mut blend_config = blend_configs[i].to_owned();
blend_config.backend.listening_address =
Multiaddr::from_str(&format!("/ip4/0.0.0.0/udp/{}/quic-v1", host.blend_port)).unwrap();
blend_config.membership = host_blend_membership.clone();
// Tracing config.
let tracing_config =
@ -138,7 +138,7 @@ pub fn create_node_configs(
consensus_config,
da_config,
network_config,
mix_config,
blend_config,
api_config,
tracing_config,
},
@ -174,16 +174,16 @@ fn update_da_peer_addresses(
.collect()
}
fn update_mix_membership(
fn update_blend_membership(
hosts: Vec<Host>,
membership: Vec<Node<<SphinxMessage as MixMessage>::PublicKey>>,
) -> Vec<Node<<SphinxMessage as MixMessage>::PublicKey>> {
membership: Vec<Node<<SphinxMessage as BlendMessage>::PublicKey>>,
) -> Vec<Node<<SphinxMessage as BlendMessage>::PublicKey>> {
membership
.into_iter()
.zip(hosts)
.map(|(mut node, host)| {
node.address =
Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.mix_port))
Multiaddr::from_str(&format!("/ip4/{}/udp/{}/quic-v1", host.ip, host.blend_port))
.unwrap();
node
})
@ -235,7 +235,7 @@ mod cfgsync_tests {
identifier: "node".into(),
network_port: 3000,
da_network_port: 4044,
mix_port: 5000,
blend_port: 5000,
})
.collect();
@ -265,11 +265,11 @@ mod cfgsync_tests {
for (host, config) in configs.iter() {
let network_port = config.network_config.swarm_config.port;
let da_network_port = extract_port(&config.da_config.listening_address);
let mix_port = extract_port(&config.mix_config.backend.listening_address);
let blend_port = extract_port(&config.blend_config.backend.listening_address);
assert_eq!(network_port, host.network_port);
assert_eq!(da_network_port, host.da_network_port);
assert_eq!(mix_port, host.mix_port);
assert_eq!(blend_port, host.blend_port);
}
}

View File

@ -10,9 +10,9 @@ executor-http-client = { path = "../clients/executor-http-client" }
nomos-node = { path = "../nodes/nomos-node", default-features = false }
nomos-executor = { path = "../nodes/nomos-executor", default-features = false }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"] }
nomos-mix-service = { path = "../nomos-services/mix", features = ["libp2p"] }
nomos-mix = { path = "../nomos-mix/core" }
nomos-mix-message = { path = "../nomos-mix/message" }
nomos-blend-service = { path = "../nomos-services/blend", features = ["libp2p"] }
nomos-blend = { path = "../nomos-blend/core" }
nomos-blend-message = { path = "../nomos-blend/message" }
cryptarchia-consensus = { path = "../nomos-services/cryptarchia-consensus" }
nomos-tracing = { path = "../nomos-tracing" }
nomos-tracing-service = { path = "../nomos-services/tracing" }

View File

@ -6,6 +6,9 @@ use std::{net::SocketAddr, process::Child};
use crate::adjust_timeout;
use crate::topology::configs::GeneralConfig;
use cryptarchia_consensus::CryptarchiaSettings;
use nomos_blend::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_da_dispersal::backend::kzgrs::{DispersalKZGRSBackendSettings, EncoderSettings};
use nomos_da_dispersal::DispersalServiceSettings;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
@ -22,9 +25,6 @@ use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as Verif
use nomos_da_verifier::DaVerifierServiceSettings;
use nomos_executor::api::backend::AxumBackendSettings;
use nomos_executor::config::Config;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{CL_METRICS, DA_GET_RANGE};
use nomos_node::RocksBackendSettings;
@ -154,23 +154,23 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
initial_peers: config.network_config.initial_peers,
},
},
mix: nomos_mix_service::MixConfig {
backend: config.mix_config.backend,
blend: nomos_blend_service::BlendConfig {
backend: config.blend_config.backend,
persistent_transmission: Default::default(),
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings {
private_key: config.mix_config.private_key.to_bytes(),
num_mix_layers: 1,
private_key: config.blend_config.private_key.to_bytes(),
num_blend_layers: 1,
},
temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: 2,
},
},
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
cover_traffic: nomos_blend_service::CoverTrafficExtSettings {
epoch_duration: Duration::from_secs(432000),
slot_duration: Duration::from_secs(20),
},
membership: config.mix_config.membership,
membership: config.blend_config.membership,
},
cryptarchia: CryptarchiaSettings {
leader_config: config.consensus_config.leader_config,
@ -183,10 +183,10 @@ pub fn create_executor_config(config: GeneralConfig) -> Config {
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
mix_adapter_settings:
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings {
blend_adapter_settings:
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapterSettings {
broadcast_settings:
nomos_mix_service::network::libp2p::Libp2pBroadcastSettings {
nomos_blend_service::network::libp2p::Libp2pBroadcastSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
},

View File

@ -4,6 +4,9 @@ use std::time::Duration;
use std::{net::SocketAddr, process::Child};
use cryptarchia_consensus::{CryptarchiaInfo, CryptarchiaSettings};
use nomos_blend::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_core::block::Block;
use nomos_da_indexer::storage::adapters::rocksdb::RocksAdapterSettings as IndexerStorageAdapterSettings;
use nomos_da_indexer::IndexerSettings;
@ -14,9 +17,6 @@ use nomos_da_sampling::{backend::kzgrs::KzgrsSamplingBackendSettings, DaSampling
use nomos_da_verifier::storage::adapters::rocksdb::RocksAdapterSettings as VerifierStorageAdapterSettings;
use nomos_da_verifier::{backend::kzgrs::KzgrsDaVerifierSettings, DaVerifierServiceSettings};
use nomos_mempool::MempoolMetrics;
use nomos_mix::message_blend::{
CryptographicProcessorSettings, MessageBlendSettings, TemporalSchedulerSettings,
};
use nomos_network::{backends::libp2p::Libp2pConfig, NetworkConfig};
use nomos_node::api::paths::{
CL_METRICS, CRYPTARCHIA_HEADERS, CRYPTARCHIA_INFO, DA_GET_RANGE, STORAGE_BLOCK,
@ -240,23 +240,23 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
initial_peers: config.network_config.initial_peers,
},
},
mix: nomos_mix_service::MixConfig {
backend: config.mix_config.backend,
blend: nomos_blend_service::BlendConfig {
backend: config.blend_config.backend,
persistent_transmission: Default::default(),
message_blend: MessageBlendSettings {
cryptographic_processor: CryptographicProcessorSettings {
private_key: config.mix_config.private_key.to_bytes(),
num_mix_layers: 1,
private_key: config.blend_config.private_key.to_bytes(),
num_blend_layers: 1,
},
temporal_processor: TemporalSchedulerSettings {
max_delay_seconds: 2,
},
},
cover_traffic: nomos_mix_service::CoverTrafficExtSettings {
cover_traffic: nomos_blend_service::CoverTrafficExtSettings {
epoch_duration: Duration::from_secs(432000),
slot_duration: Duration::from_secs(20),
},
membership: config.mix_config.membership,
membership: config.blend_config.membership,
},
cryptarchia: CryptarchiaSettings {
leader_config: config.consensus_config.leader_config,
@ -269,10 +269,10 @@ pub fn create_validator_config(config: GeneralConfig) -> Config {
cryptarchia_consensus::network::adapters::libp2p::LibP2pAdapterSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
mix_adapter_settings:
cryptarchia_consensus::mix::adapters::libp2p::LibP2pAdapterSettings {
blend_adapter_settings:
cryptarchia_consensus::blend::adapters::libp2p::LibP2pAdapterSettings {
broadcast_settings:
nomos_mix_service::network::libp2p::Libp2pBroadcastSettings {
nomos_blend_service::network::libp2p::Libp2pBroadcastSettings {
topic: String::from(nomos_node::CONSENSUS_TOPIC),
},
},

View File

@ -28,7 +28,7 @@ async fn disseminate(executor: &Executor, data: &[u8]) {
client.publish_blob(data.to_vec(), metadata).await.unwrap();
}
#[ignore = "todo: reenable after mixnet is tested"]
#[ignore = "todo: reenable after blendnet is tested"]
#[tokio::test]
async fn disseminate_and_retrieve() {
let topology = Topology::spawn(TopologyConfig::validator_and_executor()).await;

View File

@ -1,29 +1,29 @@
use std::str::FromStr;
use nomos_blend::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node};
use nomos_blend_message::{sphinx::SphinxMessage, BlendMessage};
use nomos_blend_service::backends::libp2p::Libp2pBlendBackendSettings;
use nomos_libp2p::{ed25519, Multiaddr};
use nomos_mix::{conn_maintenance::ConnectionMaintenanceSettings, membership::Node};
use nomos_mix_message::{sphinx::SphinxMessage, MixMessage};
use nomos_mix_service::backends::libp2p::Libp2pMixBackendSettings;
use crate::get_available_port;
#[derive(Clone)]
pub struct GeneralMixConfig {
pub backend: Libp2pMixBackendSettings,
pub struct GeneralBlendConfig {
pub backend: Libp2pBlendBackendSettings,
pub private_key: x25519_dalek::StaticSecret,
pub membership: Vec<Node<<SphinxMessage as MixMessage>::PublicKey>>,
pub membership: Vec<Node<<SphinxMessage as BlendMessage>::PublicKey>>,
}
pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
let mut configs: Vec<GeneralMixConfig> = ids
pub fn create_blend_configs(ids: &[[u8; 32]]) -> Vec<GeneralBlendConfig> {
let mut configs: Vec<GeneralBlendConfig> = ids
.iter()
.map(|id| {
let mut node_key_bytes = *id;
let node_key = ed25519::SecretKey::try_from_bytes(&mut node_key_bytes)
.expect("Failed to generate secret key from bytes");
GeneralMixConfig {
backend: Libp2pMixBackendSettings {
GeneralBlendConfig {
backend: Libp2pBlendBackendSettings {
listening_address: Multiaddr::from_str(&format!(
"/ip4/127.0.0.1/udp/{}/quic-v1",
get_available_port(),
@ -42,7 +42,7 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
})
.collect();
let nodes = mix_nodes(&configs);
let nodes = blend_nodes(&configs);
configs.iter_mut().for_each(|config| {
config.membership = nodes.clone();
});
@ -50,7 +50,9 @@ pub fn create_mix_configs(ids: &[[u8; 32]]) -> Vec<GeneralMixConfig> {
configs
}
fn mix_nodes(configs: &[GeneralMixConfig]) -> Vec<Node<<SphinxMessage as MixMessage>::PublicKey>> {
fn blend_nodes(
configs: &[GeneralBlendConfig],
) -> Vec<Node<<SphinxMessage as BlendMessage>::PublicKey>> {
configs
.iter()
.map(|config| Node {

View File

@ -1,14 +1,14 @@
pub mod api;
pub mod blend;
pub mod consensus;
pub mod da;
pub mod mix;
pub mod network;
pub mod tracing;
use api::GeneralApiConfig;
use blend::GeneralBlendConfig;
use consensus::GeneralConsensusConfig;
use da::GeneralDaConfig;
use mix::GeneralMixConfig;
use network::GeneralNetworkConfig;
use tracing::GeneralTracingConfig;
@ -18,6 +18,6 @@ pub struct GeneralConfig {
pub consensus_config: GeneralConsensusConfig,
pub da_config: GeneralDaConfig,
pub network_config: GeneralNetworkConfig,
pub mix_config: GeneralMixConfig,
pub blend_config: GeneralBlendConfig,
pub tracing_config: GeneralTracingConfig,
}

View File

@ -15,8 +15,8 @@ use crate::{
},
topology::configs::{
api::create_api_configs,
blend::create_blend_configs,
consensus::{create_consensus_configs, ConsensusParams},
mix::create_mix_configs,
},
};
@ -77,7 +77,7 @@ impl Topology {
let consensus_configs = create_consensus_configs(&ids, config.consensus_params);
let da_configs = create_da_configs(&ids, config.da_params);
let network_configs = create_network_configs(&ids, config.network_params);
let mix_configs = create_mix_configs(&ids);
let blend_configs = create_blend_configs(&ids);
let api_configs = create_api_configs(&ids);
let tracing_configs = create_tracing_configs(&ids);
@ -87,7 +87,7 @@ impl Topology {
consensus_config: consensus_configs[i].to_owned(),
da_config: da_configs[i].to_owned(),
network_config: network_configs[i].to_owned(),
mix_config: mix_configs[i].to_owned(),
blend_config: blend_configs[i].to_owned(),
api_config: api_configs[i].to_owned(),
tracing_config: tracing_configs[i].to_owned(),
});
@ -100,7 +100,7 @@ impl Topology {
consensus_config: consensus_configs[i].to_owned(),
da_config: da_configs[i].to_owned(),
network_config: network_configs[i].to_owned(),
mix_config: mix_configs[i].to_owned(),
blend_config: blend_configs[i].to_owned(),
api_config: api_configs[i].to_owned(),
tracing_config: tracing_configs[i].to_owned(),
});