diff --git a/Cargo.toml b/Cargo.toml index f1744e0e..0bde0c96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ members = [ "nomos-services/network", "nomos-services/storage", "nomos-services/carnot-consensus", + "nomos-services/cryptarchia-consensus", "nomos-services/mempool", "nomos-services/http", "nomos-services/data-availability", diff --git a/consensus/carnot-engine/src/lib.rs b/consensus/carnot-engine/src/lib.rs index 63f8cf0f..a56cfcce 100644 --- a/consensus/carnot-engine/src/lib.rs +++ b/consensus/carnot-engine/src/lib.rs @@ -8,12 +8,6 @@ mod types; pub use overlay::Overlay; pub use types::*; -/// Re-export of the OpenAPI types -#[cfg(feature = "openapi")] -pub mod openapi { - pub use crate::types::BlockId; -} - #[derive(Clone, Debug, PartialEq)] pub struct Carnot { id: NodeId, diff --git a/consensus/cryptarchia-engine/src/config.rs b/consensus/cryptarchia-engine/src/config.rs index 72f58dae..3c2dbba4 100644 --- a/consensus/cryptarchia-engine/src/config.rs +++ b/consensus/cryptarchia-engine/src/config.rs @@ -1,3 +1,4 @@ +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug, PartialEq)] pub struct Config { // The k parameter in the Common Prefix property. diff --git a/consensus/cryptarchia-engine/src/time.rs b/consensus/cryptarchia-engine/src/time.rs index 96d8bb36..7260d454 100644 --- a/consensus/cryptarchia-engine/src/time.rs +++ b/consensus/cryptarchia-engine/src/time.rs @@ -4,6 +4,7 @@ use std::ops::Add; #[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)] pub struct Slot(u64); +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)] pub struct Epoch(u32); diff --git a/ledger/cryptarchia-ledger/Cargo.toml b/ledger/cryptarchia-ledger/Cargo.toml index 73441093..fd819d29 100644 --- a/ledger/cryptarchia-ledger/Cargo.toml +++ b/ledger/cryptarchia-ledger/Cargo.toml @@ -15,4 +15,4 @@ cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" } nomos-utils = { path = "../../nomos-utils", optional = true } [features] -serde = ["dep:serde", "nomos-utils/serde"] \ No newline at end of file +serde = ["dep:serde", "nomos-utils/serde", "rpds/serde"] \ No newline at end of file diff --git a/ledger/cryptarchia-ledger/src/config.rs b/ledger/cryptarchia-ledger/src/config.rs index 17761940..d7fe1940 100644 --- a/ledger/cryptarchia-ledger/src/config.rs +++ b/ledger/cryptarchia-ledger/src/config.rs @@ -1,5 +1,6 @@ use cryptarchia_engine::{Epoch, Slot}; +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug, PartialEq)] pub struct Config { // The stake distribution is always taken at the beginning of the previous epoch. diff --git a/ledger/cryptarchia-ledger/src/lib.rs b/ledger/cryptarchia-ledger/src/lib.rs index 971a27c1..105f239e 100644 --- a/ledger/cryptarchia-ledger/src/lib.rs +++ b/ledger/cryptarchia-ledger/src/lib.rs @@ -7,10 +7,11 @@ mod utils; use blake2::Digest; use cryptarchia_engine::{Epoch, Slot}; use crypto::Blake2b; -use rpds::HashTrieSet; use std::{collections::HashMap, hash::Hash}; use thiserror::Error; +type HashTrieSet = rpds::HashTrieSetSync; + pub use config::Config; pub use leader_proof::*; pub use nonce::*; @@ -31,6 +32,7 @@ pub enum LedgerError { OrphanMissing(Id), } +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Debug, Eq, PartialEq)] pub struct EpochState { // The epoch this snapshot is for @@ -131,6 +133,7 @@ where } } +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[derive(Clone, Eq, PartialEq)] pub struct LedgerState { // commitments to coins that can be used to propose new blocks diff --git a/nomos-services/cryptarchia-consensus/Cargo.toml b/nomos-services/cryptarchia-consensus/Cargo.toml new file mode 100644 index 00000000..25aec05a --- /dev/null +++ b/nomos-services/cryptarchia-consensus/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "cryptarchia-consensus" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "0.1" +bytes = "1.3" +chrono = "0.4" +cryptarchia-engine = { path = "../../consensus/cryptarchia-engine", features = ["serde"] } +cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger", features = ["serde"] } +futures = "0.3" +nomos-network = { path = "../network" } +nomos-mempool = { path = "../mempool" } +nomos-core = { path = "../../nomos-core" } +overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } +nomos-storage = { path = "../storage" } +rand_chacha = "0.3" +rand = "0.8" +serde = { version = "1", features = ["derive"] } +thiserror = "1.0" +tokio = { version = "1", features = ["sync"] } +tokio-stream = "0.1" +tokio-util = "0.7" +tracing = "0.1" +bls-signatures = "0.14" +serde_with = "3.0.0" +nomos-libp2p = { path = "../../nomos-libp2p", optional = true } +blake2 = "0.10" + +utoipa = { version = "4.0", optional = true } +serde_json = { version = "1", optional = true } + +[features] +default = [] +libp2p = ["nomos-network/libp2p", "nomos-libp2p"] +openapi = ["dep:utoipa", "serde_json"] + +[dev-dependencies] +serde_json = "1.0.96" diff --git a/nomos-services/cryptarchia-consensus/src/leadership.rs b/nomos-services/cryptarchia-consensus/src/leadership.rs new file mode 100644 index 00000000..e69de29b diff --git a/nomos-services/cryptarchia-consensus/src/lib.rs b/nomos-services/cryptarchia-consensus/src/lib.rs new file mode 100644 index 00000000..a989ef1a --- /dev/null +++ b/nomos-services/cryptarchia-consensus/src/lib.rs @@ -0,0 +1,418 @@ +mod leadership; +pub mod network; +use core::fmt::Debug; +use cryptarchia_ledger::LedgerState; +use futures::StreamExt; +use network::NetworkAdapter; +use nomos_core::block::Block; +use nomos_core::da::certificate::{BlobCertificateSelect, Certificate}; +use nomos_core::header::{Header, HeaderId}; +use nomos_core::tx::{Transaction, TxSelect}; +use nomos_mempool::{ + backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant, + MempoolMsg, MempoolService, Transaction as TxDiscriminant, +}; +use nomos_network::NetworkService; +use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService}; +use overwatch_rs::services::life_cycle::LifecycleMessage; +use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage}; +use overwatch_rs::services::{ + handle::ServiceStateHandle, + state::{NoOperator, NoState}, + ServiceCore, ServiceData, ServiceId, +}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde_with::serde_as; +use std::hash::Hash; +use thiserror::Error; +use tokio::sync::oneshot::Sender; +use tracing::{error, instrument}; + +#[derive(Debug, Clone, Error)] +pub enum Error { + #[error("Ledger error: {0}")] + Ledger(#[from] cryptarchia_ledger::LedgerError), + #[error("Consensus error: {0}")] + Consensus(#[from] cryptarchia_engine::Error), +} + +struct Cryptarchia { + ledger: cryptarchia_ledger::Ledger, + consensus: cryptarchia_engine::Cryptarchia, +} + +impl Cryptarchia { + fn tip(&self) -> HeaderId { + self.consensus.tip() + } + + fn try_apply_header(&self, header: &Header) -> Result { + let header = header.cryptarchia(); + let id = header.id(); + let parent = header.parent(); + let slot = header.slot(); + let ledger = self.ledger.try_update( + id, + parent, + header.slot(), + header.leader_proof(), + header.orphaned_proofs().into_iter().map(|imported_header| { + (imported_header.id(), imported_header.leader_proof().clone()) + }), + )?; + let consensus = self.consensus.receive_block(id, parent, slot)?; + + Ok(Self { ledger, consensus }) + } +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct CryptarchiaSettings { + #[serde(default)] + pub transaction_selector_settings: Ts, + #[serde(default)] + pub blob_selector_settings: Bs, + pub consensus_config: cryptarchia_engine::Config, + pub ledger_config: cryptarchia_ledger::Config, + pub genesis_state: LedgerState, +} + +impl CryptarchiaSettings { + #[inline] + pub const fn new( + transaction_selector_settings: Ts, + blob_selector_settings: Bs, + consensus_config: cryptarchia_engine::Config, + ledger_config: cryptarchia_ledger::Config, + genesis_state: LedgerState, + ) -> Self { + Self { + transaction_selector_settings, + blob_selector_settings, + consensus_config, + ledger_config, + genesis_state, + } + } +} + +pub struct CryptarchiaConsensus +where + A: NetworkAdapter, + ClPoolAdapter: MempoolAdapter, + ClPool: MemPool, + DaPool: MemPool, + DaPoolAdapter: MempoolAdapter, + + ClPool::Item: Debug + 'static, + ClPool::Key: Debug + 'static, + DaPool::Item: Debug + 'static, + DaPool::Key: Debug + 'static, + A::Backend: 'static, + TxS: TxSelect, + BS: BlobCertificateSelect, + Storage: StorageBackend + Send + Sync + 'static, +{ + service_state: ServiceStateHandle, + // underlying networking backend. We need this so we can relay and check the types properly + // when implementing ServiceCore for CryptarchiaConsensus + network_relay: Relay>, + cl_mempool_relay: Relay>, + da_mempool_relay: Relay>, + storage_relay: Relay>, +} + +impl ServiceData + for CryptarchiaConsensus +where + A: NetworkAdapter, + ClPool: MemPool, + ClPool::Item: Debug, + ClPool::Key: Debug, + DaPool: MemPool, + DaPool::Item: Debug, + DaPool::Key: Debug, + ClPoolAdapter: MempoolAdapter, + DaPoolAdapter: MempoolAdapter, + TxS: TxSelect, + BS: BlobCertificateSelect, + Storage: StorageBackend + Send + Sync + 'static, +{ + const SERVICE_ID: ServiceId = "Cryptarchia"; + type Settings = CryptarchiaSettings; + type State = NoState; + type StateOperator = NoOperator; + type Message = ConsensusMsg; +} + +#[async_trait::async_trait] +impl ServiceCore + for CryptarchiaConsensus +where + A: NetworkAdapter + + Clone + + Send + + Sync + + 'static, + ClPool: MemPool + Send + Sync + 'static, + ClPool::Settings: Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, + DaPool::Settings: Send + Sync + 'static, + ClPool::Item: Transaction + + Debug + + Clone + + Eq + + Hash + + Serialize + + serde::de::DeserializeOwned + + Send + + Sync + + 'static, + DaPool::Item: Certificate + + Debug + + Clone + + Eq + + Hash + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + ClPool::Key: Debug + Send + Sync, + DaPool::Key: Debug + Send + Sync, + ClPoolAdapter: MempoolAdapter + Send + Sync + 'static, + DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, + TxS: TxSelect + Clone + Send + Sync + 'static, + TxS::Settings: Send + Sync + 'static, + BS: BlobCertificateSelect + Clone + Send + Sync + 'static, + BS::Settings: Send + Sync + 'static, + Storage: StorageBackend + Send + Sync + 'static, +{ + fn init(service_state: ServiceStateHandle) -> Result { + let network_relay = service_state.overwatch_handle.relay(); + let cl_mempool_relay = service_state.overwatch_handle.relay(); + let da_mempool_relay = service_state.overwatch_handle.relay(); + let storage_relay = service_state.overwatch_handle.relay(); + Ok(Self { + service_state, + network_relay, + cl_mempool_relay, + da_mempool_relay, + storage_relay, + }) + } + + async fn run(mut self) -> Result<(), overwatch_rs::DynError> { + let network_relay: OutboundRelay<_> = self + .network_relay + .connect() + .await + .expect("Relay connection with NetworkService should succeed"); + + let cl_mempool_relay: OutboundRelay<_> = self + .cl_mempool_relay + .connect() + .await + .expect("Relay connection with MemPoolService should succeed"); + + let da_mempool_relay: OutboundRelay<_> = self + .da_mempool_relay + .connect() + .await + .expect("Relay connection with MemPoolService should succeed"); + + let storage_relay: OutboundRelay<_> = self + .storage_relay + .connect() + .await + .expect("Relay connection with StorageService should succeed"); + + let CryptarchiaSettings { + consensus_config, + ledger_config, + genesis_state, + .. + } = self.service_state.settings_reader.get_updated_settings(); + + let genesis_id = HeaderId::from([0; 32]); + let mut cryptarchia = Cryptarchia { + ledger: >::from_genesis( + genesis_id, + genesis_state, + ledger_config, + ), + consensus: >::from_genesis( + genesis_id, + consensus_config, + ), + }; + let adapter = A::new(network_relay).await; + + let mut incoming_blocks = adapter.blocks_stream().await; + + let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream(); + loop { + tokio::select! { + Some(block) = incoming_blocks.next() => { + cryptarchia = Self::process_block( + cryptarchia, + block, + storage_relay.clone(), + cl_mempool_relay.clone(), + da_mempool_relay.clone(), + ) + .await + } + + Some(msg) = self.service_state.inbound_relay.next() => { + Self::process_message(&cryptarchia, msg); + } + Some(msg) = lifecycle_stream.next() => { + if Self::should_stop_service(msg).await { + break; + } + } + } + } + Ok(()) + } +} + +impl + CryptarchiaConsensus +where + A: NetworkAdapter + Clone + Send + Sync + 'static, + ClPool: MemPool + Send + Sync + 'static, + ClPool::Settings: Send + Sync + 'static, + DaPool: MemPool + Send + Sync + 'static, + DaPool::Settings: Send + Sync + 'static, + ClPool::Item: Transaction + + Debug + + Clone + + Eq + + Hash + + Serialize + + serde::de::DeserializeOwned + + Send + + Sync + + 'static, + DaPool::Item: Certificate + + Debug + + Clone + + Eq + + Hash + + Serialize + + DeserializeOwned + + Send + + Sync + + 'static, + TxS: TxSelect + Clone + Send + Sync + 'static, + BS: BlobCertificateSelect + Clone + Send + Sync + 'static, + ClPool::Key: Debug + Send + Sync, + DaPool::Key: Debug + Send + Sync, + ClPoolAdapter: MempoolAdapter + Send + Sync + 'static, + DaPoolAdapter: MempoolAdapter + Send + Sync + 'static, + Storage: StorageBackend + Send + Sync + 'static, +{ + async fn should_stop_service(message: LifecycleMessage) -> bool { + match message { + LifecycleMessage::Shutdown(sender) => { + if sender.send(()).is_err() { + error!( + "Error sending successful shutdown signal from service {}", + Self::SERVICE_ID + ); + } + true + } + LifecycleMessage::Kill => true, + } + } + + fn process_message(cryptarchia: &Cryptarchia, msg: ConsensusMsg) { + match msg { + ConsensusMsg::Info { tx } => { + let info = CryptarchiaInfo { + tip: cryptarchia.tip(), + }; + tx.send(info).unwrap_or_else(|e| { + tracing::error!("Could not send consensus info through channel: {:?}", e) + }); + } + } + } + + #[allow(clippy::type_complexity, clippy::too_many_arguments)] + #[instrument( + level = "debug", + skip(cryptarchia, storage_relay, cl_mempool_relay, da_mempool_relay) + )] + async fn process_block( + mut cryptarchia: Cryptarchia, + block: Block, + storage_relay: OutboundRelay>, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, + ) -> Cryptarchia { + tracing::debug!("received proposal {:?}", block); + + let header = block.header(); + let id = header.id(); + match cryptarchia.try_apply_header(block.header()) { + Ok(new_state) => { + // remove included content from mempool + mark_in_block( + cl_mempool_relay, + block.transactions().map(Transaction::hash), + id, + ) + .await; + + mark_in_block(da_mempool_relay, block.blobs().map(Certificate::hash), id).await; + + // store block + let msg = >::new_store_message(header.id(), block); + if let Err((e, _msg)) = storage_relay.send(msg).await { + tracing::error!("Could not send block to storage: {e}"); + } + + cryptarchia = new_state; + } + Err(Error::Consensus(cryptarchia_engine::Error::ParentMissing(parent))) => { + tracing::debug!("missing parent {:?}", parent); + // TODO: request parent block + } + Err(e) => tracing::debug!("invalid block {:?}: {e:?}", block), + } + + cryptarchia + } +} + +#[derive(Debug)] +pub enum ConsensusMsg { + Info { tx: Sender }, +} + +impl RelayMessage for ConsensusMsg {} + +#[serde_as] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))] +pub struct CryptarchiaInfo { + pub tip: HeaderId, +} + +async fn mark_in_block( + mempool: OutboundRelay>, + ids: impl Iterator, + block: HeaderId, +) { + mempool + .send(MempoolMsg::MarkInBlock { + ids: ids.collect(), + block, + }) + .await + .unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}")) +} diff --git a/nomos-services/cryptarchia-consensus/src/network/adapters/libp2p.rs b/nomos-services/cryptarchia-consensus/src/network/adapters/libp2p.rs new file mode 100644 index 00000000..306f3c39 --- /dev/null +++ b/nomos-services/cryptarchia-consensus/src/network/adapters/libp2p.rs @@ -0,0 +1,123 @@ +// std +use std::hash::Hash; +// crates +use serde::{de::DeserializeOwned, Serialize}; +use tokio::sync::broadcast::error::RecvError; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; +// internal +use crate::network::{messages::NetworkMessage, BoxedStream, NetworkAdapter}; +use nomos_core::{block::Block, wire}; +use nomos_network::{ + backends::libp2p::{Command, Event, EventKind, Libp2p}, + NetworkMsg, NetworkService, +}; +use overwatch_rs::services::{relay::OutboundRelay, ServiceData}; + +const TOPIC: &str = "/cryptarchia/proto"; +const BUFFER_SIZE: usize = 64; +type Relay = OutboundRelay< as ServiceData>::Message>; + +#[derive(Clone)] +pub struct LibP2pAdapter +where + Tx: Clone + Eq + Hash, + BlobCert: Clone + Eq + Hash, +{ + network_relay: OutboundRelay< as ServiceData>::Message>, + blocks: tokio::sync::broadcast::Sender>, +} + +impl LibP2pAdapter +where + Tx: Clone + Eq + Hash + Serialize, + BlobCert: Clone + Eq + Hash + Serialize, +{ + async fn subscribe(relay: &Relay, topic: &str) { + if let Err((e, _)) = relay + .send(NetworkMsg::Process(Command::Subscribe(topic.into()))) + .await + { + tracing::error!("error subscribing to {topic}: {e}"); + }; + } +} + +#[async_trait::async_trait] +impl NetworkAdapter for LibP2pAdapter +where + Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static, + BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static, +{ + type Backend = Libp2p; + type Tx = Tx; + type BlobCertificate = BlobCert; + + async fn new(network_relay: Relay) -> Self { + let relay = network_relay.clone(); + Self::subscribe(&relay, TOPIC).await; + let blocks = tokio::sync::broadcast::Sender::new(BUFFER_SIZE); + let blocks_sender = blocks.clone(); + tracing::debug!("Starting up..."); + // this wait seems to be helpful in some cases since we give the time + // to the network to establish connections before we start sending messages + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + // TODO: maybe we need the runtime handle here? + tokio::spawn(async move { + let (sender, receiver) = tokio::sync::oneshot::channel(); + if let Err((e, _)) = relay + .send(NetworkMsg::Subscribe { + kind: EventKind::Message, + sender, + }) + .await + { + tracing::error!("error subscribing to incoming messages: {e}"); + } + + let mut incoming_messages = receiver.await.unwrap(); + loop { + match incoming_messages.recv().await { + Ok(Event::Message(message)) => { + match nomos_core::wire::deserialize(&message.data) { + Ok(msg) => match msg { + NetworkMessage::Block(block) => { + tracing::debug!("received block {:?}", block.header().id()); + if let Err(err) = blocks_sender.send(block) { + tracing::error!("error sending block to consensus: {err}"); + } + } + }, + _ => tracing::debug!("unrecognized gossipsub message"), + } + } + Err(RecvError::Lagged(n)) => { + tracing::error!("lagged messages: {n}") + } + Err(RecvError::Closed) => unreachable!(), + } + } + }); + Self { + network_relay, + blocks, + } + } + + async fn blocks_stream(&self) -> BoxedStream> { + Box::new(BroadcastStream::new(self.blocks.subscribe()).filter_map(Result::ok)) + } + + async fn broadcast(&self, message: NetworkMessage) { + if let Err((e, message)) = self + .network_relay + .send(NetworkMsg::Process(Command::Broadcast { + message: wire::serialize(&message).unwrap().into_boxed_slice(), + topic: TOPIC.into(), + })) + .await + { + tracing::error!("error broadcasting {message:?}: {e}"); + }; + } +} diff --git a/nomos-services/cryptarchia-consensus/src/network/adapters/mod.rs b/nomos-services/cryptarchia-consensus/src/network/adapters/mod.rs new file mode 100644 index 00000000..a22ade97 --- /dev/null +++ b/nomos-services/cryptarchia-consensus/src/network/adapters/mod.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "libp2p")] +pub mod libp2p; diff --git a/nomos-services/cryptarchia-consensus/src/network/messages.rs b/nomos-services/cryptarchia-consensus/src/network/messages.rs new file mode 100644 index 00000000..d0caf7c4 --- /dev/null +++ b/nomos-services/cryptarchia-consensus/src/network/messages.rs @@ -0,0 +1,15 @@ +// std +use std::hash::Hash; +// crates +use serde::{Deserialize, Serialize}; +// internal +use nomos_core::block::Block; + +#[derive(Serialize, Deserialize)] +pub enum NetworkMessage +where + Tx: Clone + Eq + Hash, + Blob: Clone + Eq + Hash, +{ + Block(Block), +} diff --git a/nomos-services/cryptarchia-consensus/src/network/mod.rs b/nomos-services/cryptarchia-consensus/src/network/mod.rs new file mode 100644 index 00000000..409669bc --- /dev/null +++ b/nomos-services/cryptarchia-consensus/src/network/mod.rs @@ -0,0 +1,29 @@ +pub mod adapters; +pub mod messages; + +// std +use std::hash::Hash; +// crates +use futures::Stream; +use nomos_core::block::Block; +// internal +use crate::network::messages::NetworkMessage; +use nomos_network::backends::NetworkBackend; +use nomos_network::NetworkService; +use overwatch_rs::services::relay::OutboundRelay; +use overwatch_rs::services::ServiceData; +use serde::{de::DeserializeOwned, Serialize}; + +type BoxedStream = Box + Send + Sync + Unpin>; + +#[async_trait::async_trait] +pub trait NetworkAdapter { + type Backend: NetworkBackend + 'static; + type Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static; + type BlobCertificate: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static; + async fn new( + network_relay: OutboundRelay< as ServiceData>::Message>, + ) -> Self; + async fn blocks_stream(&self) -> BoxedStream>; + async fn broadcast(&self, message: NetworkMessage); +}