Add cryptarchia consensus service (#612)
* add cryptarchia consensus service * fmt * clippy happy * review comments
This commit is contained in:
parent
dbda061f04
commit
1b925d9a3a
|
@ -9,6 +9,7 @@ members = [
|
|||
"nomos-services/network",
|
||||
"nomos-services/storage",
|
||||
"nomos-services/carnot-consensus",
|
||||
"nomos-services/cryptarchia-consensus",
|
||||
"nomos-services/mempool",
|
||||
"nomos-services/http",
|
||||
"nomos-services/data-availability",
|
||||
|
|
|
@ -8,12 +8,6 @@ mod types;
|
|||
pub use overlay::Overlay;
|
||||
pub use types::*;
|
||||
|
||||
/// Re-export of the OpenAPI types
|
||||
#[cfg(feature = "openapi")]
|
||||
pub mod openapi {
|
||||
pub use crate::types::BlockId;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Carnot<O: Overlay, Id: Eq + Hash> {
|
||||
id: NodeId,
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Config {
|
||||
// The k parameter in the Common Prefix property.
|
||||
|
|
|
@ -4,6 +4,7 @@ use std::ops::Add;
|
|||
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
|
||||
pub struct Slot(u64);
|
||||
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
|
||||
pub struct Epoch(u32);
|
||||
|
||||
|
|
|
@ -15,4 +15,4 @@ cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" }
|
|||
nomos-utils = { path = "../../nomos-utils", optional = true }
|
||||
|
||||
[features]
|
||||
serde = ["dep:serde", "nomos-utils/serde"]
|
||||
serde = ["dep:serde", "nomos-utils/serde", "rpds/serde"]
|
|
@ -1,5 +1,6 @@
|
|||
use cryptarchia_engine::{Epoch, Slot};
|
||||
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
pub struct Config {
|
||||
// The stake distribution is always taken at the beginning of the previous epoch.
|
||||
|
|
|
@ -7,10 +7,11 @@ mod utils;
|
|||
use blake2::Digest;
|
||||
use cryptarchia_engine::{Epoch, Slot};
|
||||
use crypto::Blake2b;
|
||||
use rpds::HashTrieSet;
|
||||
use std::{collections::HashMap, hash::Hash};
|
||||
use thiserror::Error;
|
||||
|
||||
type HashTrieSet<T> = rpds::HashTrieSetSync<T>;
|
||||
|
||||
pub use config::Config;
|
||||
pub use leader_proof::*;
|
||||
pub use nonce::*;
|
||||
|
@ -31,6 +32,7 @@ pub enum LedgerError<Id> {
|
|||
OrphanMissing(Id),
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub struct EpochState {
|
||||
// The epoch this snapshot is for
|
||||
|
@ -131,6 +133,7 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||
#[derive(Clone, Eq, PartialEq)]
|
||||
pub struct LedgerState {
|
||||
// commitments to coins that can be used to propose new blocks
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
[package]
|
||||
name = "cryptarchia-consensus"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
async-trait = "0.1"
|
||||
bytes = "1.3"
|
||||
chrono = "0.4"
|
||||
cryptarchia-engine = { path = "../../consensus/cryptarchia-engine", features = ["serde"] }
|
||||
cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger", features = ["serde"] }
|
||||
futures = "0.3"
|
||||
nomos-network = { path = "../network" }
|
||||
nomos-mempool = { path = "../mempool" }
|
||||
nomos-core = { path = "../../nomos-core" }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||
nomos-storage = { path = "../storage" }
|
||||
rand_chacha = "0.3"
|
||||
rand = "0.8"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
thiserror = "1.0"
|
||||
tokio = { version = "1", features = ["sync"] }
|
||||
tokio-stream = "0.1"
|
||||
tokio-util = "0.7"
|
||||
tracing = "0.1"
|
||||
bls-signatures = "0.14"
|
||||
serde_with = "3.0.0"
|
||||
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
|
||||
blake2 = "0.10"
|
||||
|
||||
utoipa = { version = "4.0", optional = true }
|
||||
serde_json = { version = "1", optional = true }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
libp2p = ["nomos-network/libp2p", "nomos-libp2p"]
|
||||
openapi = ["dep:utoipa", "serde_json"]
|
||||
|
||||
[dev-dependencies]
|
||||
serde_json = "1.0.96"
|
|
@ -0,0 +1 @@
|
|||
|
|
@ -0,0 +1,418 @@
|
|||
mod leadership;
|
||||
pub mod network;
|
||||
use core::fmt::Debug;
|
||||
use cryptarchia_ledger::LedgerState;
|
||||
use futures::StreamExt;
|
||||
use network::NetworkAdapter;
|
||||
use nomos_core::block::Block;
|
||||
use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
|
||||
use nomos_core::header::{cryptarchia, HeaderId};
|
||||
use nomos_core::tx::{Transaction, TxSelect};
|
||||
use nomos_mempool::{
|
||||
backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant,
|
||||
MempoolMsg, MempoolService, Transaction as TxDiscriminant,
|
||||
};
|
||||
use nomos_network::NetworkService;
|
||||
use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService};
|
||||
use overwatch_rs::services::life_cycle::LifecycleMessage;
|
||||
use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
|
||||
use overwatch_rs::services::{
|
||||
handle::ServiceStateHandle,
|
||||
state::{NoOperator, NoState},
|
||||
ServiceCore, ServiceData, ServiceId,
|
||||
};
|
||||
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||
use serde_with::serde_as;
|
||||
use std::hash::Hash;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::oneshot::Sender;
|
||||
use tracing::{error, instrument};
|
||||
|
||||
#[derive(Debug, Clone, Error)]
|
||||
pub enum Error {
|
||||
#[error("Ledger error: {0}")]
|
||||
Ledger(#[from] cryptarchia_ledger::LedgerError<HeaderId>),
|
||||
#[error("Consensus error: {0}")]
|
||||
Consensus(#[from] cryptarchia_engine::Error<HeaderId>),
|
||||
}
|
||||
|
||||
struct Cryptarchia {
|
||||
ledger: cryptarchia_ledger::Ledger<HeaderId>,
|
||||
consensus: cryptarchia_engine::Cryptarchia<HeaderId>,
|
||||
}
|
||||
|
||||
impl Cryptarchia {
|
||||
fn tip(&self) -> HeaderId {
|
||||
self.consensus.tip()
|
||||
}
|
||||
|
||||
fn try_apply_header(&self, header: &cryptarchia::Header) -> Result<Self, Error> {
|
||||
let id = header.id();
|
||||
let parent = header.parent();
|
||||
let slot = header.slot();
|
||||
let ledger = self.ledger.try_update(
|
||||
id,
|
||||
parent,
|
||||
slot,
|
||||
header.leader_proof(),
|
||||
header
|
||||
.orphaned_proofs()
|
||||
.iter()
|
||||
.map(|imported_header| (imported_header.id(), *imported_header.leader_proof())),
|
||||
)?;
|
||||
let consensus = self.consensus.receive_block(id, parent, slot)?;
|
||||
|
||||
Ok(Self { ledger, consensus })
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||
pub struct CryptarchiaSettings<Ts, Bs> {
|
||||
#[serde(default)]
|
||||
pub transaction_selector_settings: Ts,
|
||||
#[serde(default)]
|
||||
pub blob_selector_settings: Bs,
|
||||
pub consensus_config: cryptarchia_engine::Config,
|
||||
pub ledger_config: cryptarchia_ledger::Config,
|
||||
pub genesis_state: LedgerState,
|
||||
}
|
||||
|
||||
impl<Ts, Bs> CryptarchiaSettings<Ts, Bs> {
|
||||
#[inline]
|
||||
pub const fn new(
|
||||
transaction_selector_settings: Ts,
|
||||
blob_selector_settings: Bs,
|
||||
consensus_config: cryptarchia_engine::Config,
|
||||
ledger_config: cryptarchia_ledger::Config,
|
||||
genesis_state: LedgerState,
|
||||
) -> Self {
|
||||
Self {
|
||||
transaction_selector_settings,
|
||||
blob_selector_settings,
|
||||
consensus_config,
|
||||
ledger_config,
|
||||
genesis_state,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||
where
|
||||
A: NetworkAdapter,
|
||||
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
|
||||
ClPool: MemPool<BlockId = HeaderId>,
|
||||
DaPool: MemPool<BlockId = HeaderId>,
|
||||
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
|
||||
|
||||
ClPool::Item: Debug + 'static,
|
||||
ClPool::Key: Debug + 'static,
|
||||
DaPool::Item: Debug + 'static,
|
||||
DaPool::Key: Debug + 'static,
|
||||
A::Backend: 'static,
|
||||
TxS: TxSelect<Tx = ClPool::Item>,
|
||||
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
|
||||
Storage: StorageBackend + Send + Sync + 'static,
|
||||
{
|
||||
service_state: ServiceStateHandle<Self>,
|
||||
// underlying networking backend. We need this so we can relay and check the types properly
|
||||
// when implementing ServiceCore for CryptarchiaConsensus
|
||||
network_relay: Relay<NetworkService<A::Backend>>,
|
||||
cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>,
|
||||
da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>,
|
||||
storage_relay: Relay<StorageService<Storage>>,
|
||||
}
|
||||
|
||||
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceData
|
||||
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||
where
|
||||
A: NetworkAdapter,
|
||||
ClPool: MemPool<BlockId = HeaderId>,
|
||||
ClPool::Item: Debug,
|
||||
ClPool::Key: Debug,
|
||||
DaPool: MemPool<BlockId = HeaderId>,
|
||||
DaPool::Item: Debug,
|
||||
DaPool::Key: Debug,
|
||||
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
|
||||
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
|
||||
TxS: TxSelect<Tx = ClPool::Item>,
|
||||
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
|
||||
Storage: StorageBackend + Send + Sync + 'static,
|
||||
{
|
||||
const SERVICE_ID: ServiceId = "Cryptarchia";
|
||||
type Settings = CryptarchiaSettings<TxS::Settings, BS::Settings>;
|
||||
type State = NoState<Self::Settings>;
|
||||
type StateOperator = NoOperator<Self::State>;
|
||||
type Message = ConsensusMsg;
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceCore
|
||||
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||
where
|
||||
A: NetworkAdapter<Tx = ClPool::Item, BlobCertificate = DaPool::Item>
|
||||
+ Clone
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||
ClPool::Settings: Send + Sync + 'static,
|
||||
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||
DaPool::Settings: Send + Sync + 'static,
|
||||
ClPool::Item: Transaction<Hash = ClPool::Key>
|
||||
+ Debug
|
||||
+ Clone
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ serde::de::DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
DaPool::Item: Certificate<Hash = DaPool::Key>
|
||||
+ Debug
|
||||
+ Clone
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
ClPool::Key: Debug + Send + Sync,
|
||||
DaPool::Key: Debug + Send + Sync,
|
||||
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
|
||||
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
|
||||
TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
|
||||
TxS::Settings: Send + Sync + 'static,
|
||||
BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
|
||||
BS::Settings: Send + Sync + 'static,
|
||||
Storage: StorageBackend + Send + Sync + 'static,
|
||||
{
|
||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||
let network_relay = service_state.overwatch_handle.relay();
|
||||
let cl_mempool_relay = service_state.overwatch_handle.relay();
|
||||
let da_mempool_relay = service_state.overwatch_handle.relay();
|
||||
let storage_relay = service_state.overwatch_handle.relay();
|
||||
Ok(Self {
|
||||
service_state,
|
||||
network_relay,
|
||||
cl_mempool_relay,
|
||||
da_mempool_relay,
|
||||
storage_relay,
|
||||
})
|
||||
}
|
||||
|
||||
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
||||
let network_relay: OutboundRelay<_> = self
|
||||
.network_relay
|
||||
.connect()
|
||||
.await
|
||||
.expect("Relay connection with NetworkService should succeed");
|
||||
|
||||
let cl_mempool_relay: OutboundRelay<_> = self
|
||||
.cl_mempool_relay
|
||||
.connect()
|
||||
.await
|
||||
.expect("Relay connection with MemPoolService should succeed");
|
||||
|
||||
let da_mempool_relay: OutboundRelay<_> = self
|
||||
.da_mempool_relay
|
||||
.connect()
|
||||
.await
|
||||
.expect("Relay connection with MemPoolService should succeed");
|
||||
|
||||
let storage_relay: OutboundRelay<_> = self
|
||||
.storage_relay
|
||||
.connect()
|
||||
.await
|
||||
.expect("Relay connection with StorageService should succeed");
|
||||
|
||||
let CryptarchiaSettings {
|
||||
consensus_config,
|
||||
ledger_config,
|
||||
genesis_state,
|
||||
..
|
||||
} = self.service_state.settings_reader.get_updated_settings();
|
||||
|
||||
let genesis_id = HeaderId::from([0; 32]);
|
||||
let mut cryptarchia = Cryptarchia {
|
||||
ledger: <cryptarchia_ledger::Ledger<_>>::from_genesis(
|
||||
genesis_id,
|
||||
genesis_state,
|
||||
ledger_config,
|
||||
),
|
||||
consensus: <cryptarchia_engine::Cryptarchia<_>>::from_genesis(
|
||||
genesis_id,
|
||||
consensus_config,
|
||||
),
|
||||
};
|
||||
let adapter = A::new(network_relay).await;
|
||||
|
||||
let mut incoming_blocks = adapter.blocks_stream().await;
|
||||
|
||||
let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
|
||||
loop {
|
||||
tokio::select! {
|
||||
Some(block) = incoming_blocks.next() => {
|
||||
cryptarchia = Self::process_block(
|
||||
cryptarchia,
|
||||
block,
|
||||
storage_relay.clone(),
|
||||
cl_mempool_relay.clone(),
|
||||
da_mempool_relay.clone(),
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
Some(msg) = self.service_state.inbound_relay.next() => {
|
||||
Self::process_message(&cryptarchia, msg);
|
||||
}
|
||||
Some(msg) = lifecycle_stream.next() => {
|
||||
if Self::should_stop_service(msg).await {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||
CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||
where
|
||||
A: NetworkAdapter + Clone + Send + Sync + 'static,
|
||||
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||
ClPool::Settings: Send + Sync + 'static,
|
||||
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||
DaPool::Settings: Send + Sync + 'static,
|
||||
ClPool::Item: Transaction<Hash = ClPool::Key>
|
||||
+ Debug
|
||||
+ Clone
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ serde::de::DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
DaPool::Item: Certificate<Hash = DaPool::Key>
|
||||
+ Debug
|
||||
+ Clone
|
||||
+ Eq
|
||||
+ Hash
|
||||
+ Serialize
|
||||
+ DeserializeOwned
|
||||
+ Send
|
||||
+ Sync
|
||||
+ 'static,
|
||||
TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
|
||||
BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
|
||||
ClPool::Key: Debug + Send + Sync,
|
||||
DaPool::Key: Debug + Send + Sync,
|
||||
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
|
||||
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
|
||||
Storage: StorageBackend + Send + Sync + 'static,
|
||||
{
|
||||
async fn should_stop_service(message: LifecycleMessage) -> bool {
|
||||
match message {
|
||||
LifecycleMessage::Shutdown(sender) => {
|
||||
if sender.send(()).is_err() {
|
||||
error!(
|
||||
"Error sending successful shutdown signal from service {}",
|
||||
Self::SERVICE_ID
|
||||
);
|
||||
}
|
||||
true
|
||||
}
|
||||
LifecycleMessage::Kill => true,
|
||||
}
|
||||
}
|
||||
|
||||
fn process_message(cryptarchia: &Cryptarchia, msg: ConsensusMsg) {
|
||||
match msg {
|
||||
ConsensusMsg::Info { tx } => {
|
||||
let info = CryptarchiaInfo {
|
||||
tip: cryptarchia.tip(),
|
||||
};
|
||||
tx.send(info).unwrap_or_else(|e| {
|
||||
tracing::error!("Could not send consensus info through channel: {:?}", e)
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
|
||||
#[instrument(
|
||||
level = "debug",
|
||||
skip(cryptarchia, storage_relay, cl_mempool_relay, da_mempool_relay)
|
||||
)]
|
||||
async fn process_block(
|
||||
mut cryptarchia: Cryptarchia,
|
||||
block: Block<ClPool::Item, DaPool::Item>,
|
||||
storage_relay: OutboundRelay<StorageMsg<Storage>>,
|
||||
cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>,
|
||||
da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>,
|
||||
) -> Cryptarchia {
|
||||
tracing::debug!("received proposal {:?}", block);
|
||||
|
||||
let header = block.header();
|
||||
let id = header.id();
|
||||
match cryptarchia.try_apply_header(block.header().cryptarchia()) {
|
||||
Ok(new_state) => {
|
||||
// remove included content from mempool
|
||||
mark_in_block(
|
||||
cl_mempool_relay,
|
||||
block.transactions().map(Transaction::hash),
|
||||
id,
|
||||
)
|
||||
.await;
|
||||
|
||||
mark_in_block(da_mempool_relay, block.blobs().map(Certificate::hash), id).await;
|
||||
|
||||
// store block
|
||||
let msg = <StorageMsg<_>>::new_store_message(header.id(), block);
|
||||
if let Err((e, _msg)) = storage_relay.send(msg).await {
|
||||
tracing::error!("Could not send block to storage: {e}");
|
||||
}
|
||||
|
||||
cryptarchia = new_state;
|
||||
}
|
||||
Err(Error::Consensus(cryptarchia_engine::Error::ParentMissing(parent))) => {
|
||||
tracing::debug!("missing parent {:?}", parent);
|
||||
// TODO: request parent block
|
||||
}
|
||||
Err(e) => tracing::debug!("invalid block {:?}: {e:?}", block),
|
||||
}
|
||||
|
||||
cryptarchia
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum ConsensusMsg {
|
||||
Info { tx: Sender<CryptarchiaInfo> },
|
||||
}
|
||||
|
||||
impl RelayMessage for ConsensusMsg {}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||
pub struct CryptarchiaInfo {
|
||||
pub tip: HeaderId,
|
||||
}
|
||||
|
||||
async fn mark_in_block<Item, Key>(
|
||||
mempool: OutboundRelay<MempoolMsg<HeaderId, Item, Key>>,
|
||||
ids: impl Iterator<Item = Key>,
|
||||
block: HeaderId,
|
||||
) {
|
||||
mempool
|
||||
.send(MempoolMsg::MarkInBlock {
|
||||
ids: ids.collect(),
|
||||
block,
|
||||
})
|
||||
.await
|
||||
.unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}"))
|
||||
}
|
|
@ -0,0 +1,123 @@
|
|||
// std
|
||||
use std::hash::Hash;
|
||||
// crates
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
||||
// internal
|
||||
use crate::network::{messages::NetworkMessage, BoxedStream, NetworkAdapter};
|
||||
use nomos_core::{block::Block, wire};
|
||||
use nomos_network::{
|
||||
backends::libp2p::{Command, Event, EventKind, Libp2p},
|
||||
NetworkMsg, NetworkService,
|
||||
};
|
||||
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
|
||||
|
||||
const TOPIC: &str = "/cryptarchia/proto";
|
||||
const BUFFER_SIZE: usize = 64;
|
||||
type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LibP2pAdapter<Tx, BlobCert>
|
||||
where
|
||||
Tx: Clone + Eq + Hash,
|
||||
BlobCert: Clone + Eq + Hash,
|
||||
{
|
||||
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
|
||||
blocks: tokio::sync::broadcast::Sender<Block<Tx, BlobCert>>,
|
||||
}
|
||||
|
||||
impl<Tx, BlobCert> LibP2pAdapter<Tx, BlobCert>
|
||||
where
|
||||
Tx: Clone + Eq + Hash + Serialize,
|
||||
BlobCert: Clone + Eq + Hash + Serialize,
|
||||
{
|
||||
async fn subscribe(relay: &Relay<Libp2p>, topic: &str) {
|
||||
if let Err((e, _)) = relay
|
||||
.send(NetworkMsg::Process(Command::Subscribe(topic.into())))
|
||||
.await
|
||||
{
|
||||
tracing::error!("error subscribing to {topic}: {e}");
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<Tx, BlobCert> NetworkAdapter for LibP2pAdapter<Tx, BlobCert>
|
||||
where
|
||||
Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static,
|
||||
BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static,
|
||||
{
|
||||
type Backend = Libp2p;
|
||||
type Tx = Tx;
|
||||
type BlobCertificate = BlobCert;
|
||||
|
||||
async fn new(network_relay: Relay<Libp2p>) -> Self {
|
||||
let relay = network_relay.clone();
|
||||
Self::subscribe(&relay, TOPIC).await;
|
||||
let blocks = tokio::sync::broadcast::Sender::new(BUFFER_SIZE);
|
||||
let blocks_sender = blocks.clone();
|
||||
tracing::debug!("Starting up...");
|
||||
// this wait seems to be helpful in some cases since we give the time
|
||||
// to the network to establish connections before we start sending messages
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
|
||||
// TODO: maybe we need the runtime handle here?
|
||||
tokio::spawn(async move {
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
if let Err((e, _)) = relay
|
||||
.send(NetworkMsg::Subscribe {
|
||||
kind: EventKind::Message,
|
||||
sender,
|
||||
})
|
||||
.await
|
||||
{
|
||||
tracing::error!("error subscribing to incoming messages: {e}");
|
||||
}
|
||||
|
||||
let mut incoming_messages = receiver.await.unwrap();
|
||||
loop {
|
||||
match incoming_messages.recv().await {
|
||||
Ok(Event::Message(message)) => {
|
||||
match nomos_core::wire::deserialize(&message.data) {
|
||||
Ok(msg) => match msg {
|
||||
NetworkMessage::Block(block) => {
|
||||
tracing::debug!("received block {:?}", block.header().id());
|
||||
if let Err(err) = blocks_sender.send(block) {
|
||||
tracing::error!("error sending block to consensus: {err}");
|
||||
}
|
||||
}
|
||||
},
|
||||
_ => tracing::debug!("unrecognized gossipsub message"),
|
||||
}
|
||||
}
|
||||
Err(RecvError::Lagged(n)) => {
|
||||
tracing::error!("lagged messages: {n}")
|
||||
}
|
||||
Err(RecvError::Closed) => unreachable!(),
|
||||
}
|
||||
}
|
||||
});
|
||||
Self {
|
||||
network_relay,
|
||||
blocks,
|
||||
}
|
||||
}
|
||||
|
||||
async fn blocks_stream(&self) -> BoxedStream<Block<Self::Tx, Self::BlobCertificate>> {
|
||||
Box::new(BroadcastStream::new(self.blocks.subscribe()).filter_map(Result::ok))
|
||||
}
|
||||
|
||||
async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>) {
|
||||
if let Err((e, message)) = self
|
||||
.network_relay
|
||||
.send(NetworkMsg::Process(Command::Broadcast {
|
||||
message: wire::serialize(&message).unwrap().into_boxed_slice(),
|
||||
topic: TOPIC.into(),
|
||||
}))
|
||||
.await
|
||||
{
|
||||
tracing::error!("error broadcasting {message:?}: {e}");
|
||||
};
|
||||
}
|
||||
}
|
|
@ -0,0 +1,2 @@
|
|||
#[cfg(feature = "libp2p")]
|
||||
pub mod libp2p;
|
|
@ -0,0 +1,15 @@
|
|||
// std
|
||||
use std::hash::Hash;
|
||||
// crates
|
||||
use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use nomos_core::block::Block;
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
pub enum NetworkMessage<Tx, Blob>
|
||||
where
|
||||
Tx: Clone + Eq + Hash,
|
||||
Blob: Clone + Eq + Hash,
|
||||
{
|
||||
Block(Block<Tx, Blob>),
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
pub mod adapters;
|
||||
pub mod messages;
|
||||
|
||||
// std
|
||||
use std::hash::Hash;
|
||||
// crates
|
||||
use futures::Stream;
|
||||
use nomos_core::block::Block;
|
||||
// internal
|
||||
use crate::network::messages::NetworkMessage;
|
||||
use nomos_network::backends::NetworkBackend;
|
||||
use nomos_network::NetworkService;
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use overwatch_rs::services::ServiceData;
|
||||
use serde::{de::DeserializeOwned, Serialize};
|
||||
|
||||
type BoxedStream<T> = Box<dyn Stream<Item = T> + Send + Sync + Unpin>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait NetworkAdapter {
|
||||
type Backend: NetworkBackend + 'static;
|
||||
type Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static;
|
||||
type BlobCertificate: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static;
|
||||
async fn new(
|
||||
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||
) -> Self;
|
||||
async fn blocks_stream(&self) -> BoxedStream<Block<Self::Tx, Self::BlobCertificate>>;
|
||||
async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>);
|
||||
}
|
Loading…
Reference in New Issue