add cryptarchia consensus service
This commit is contained in:
parent
dbda061f04
commit
3c18ac4fe9
|
@ -9,6 +9,7 @@ members = [
|
||||||
"nomos-services/network",
|
"nomos-services/network",
|
||||||
"nomos-services/storage",
|
"nomos-services/storage",
|
||||||
"nomos-services/carnot-consensus",
|
"nomos-services/carnot-consensus",
|
||||||
|
"nomos-services/cryptarchia-consensus",
|
||||||
"nomos-services/mempool",
|
"nomos-services/mempool",
|
||||||
"nomos-services/http",
|
"nomos-services/http",
|
||||||
"nomos-services/data-availability",
|
"nomos-services/data-availability",
|
||||||
|
|
|
@ -8,12 +8,6 @@ mod types;
|
||||||
pub use overlay::Overlay;
|
pub use overlay::Overlay;
|
||||||
pub use types::*;
|
pub use types::*;
|
||||||
|
|
||||||
/// Re-export of the OpenAPI types
|
|
||||||
#[cfg(feature = "openapi")]
|
|
||||||
pub mod openapi {
|
|
||||||
pub use crate::types::BlockId;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct Carnot<O: Overlay, Id: Eq + Hash> {
|
pub struct Carnot<O: Overlay, Id: Eq + Hash> {
|
||||||
id: NodeId,
|
id: NodeId,
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
// The k parameter in the Common Prefix property.
|
// The k parameter in the Common Prefix property.
|
||||||
|
|
|
@ -4,6 +4,7 @@ use std::ops::Add;
|
||||||
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
|
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
|
||||||
pub struct Slot(u64);
|
pub struct Slot(u64);
|
||||||
|
|
||||||
|
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||||
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
|
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash, PartialOrd, Ord)]
|
||||||
pub struct Epoch(u32);
|
pub struct Epoch(u32);
|
||||||
|
|
||||||
|
|
|
@ -15,4 +15,4 @@ cryptarchia-engine = { path = "../../consensus/cryptarchia-engine" }
|
||||||
nomos-utils = { path = "../../nomos-utils", optional = true }
|
nomos-utils = { path = "../../nomos-utils", optional = true }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
serde = ["dep:serde", "nomos-utils/serde"]
|
serde = ["dep:serde", "nomos-utils/serde", "rpds/serde"]
|
|
@ -1,5 +1,6 @@
|
||||||
use cryptarchia_engine::{Epoch, Slot};
|
use cryptarchia_engine::{Epoch, Slot};
|
||||||
|
|
||||||
|
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||||
#[derive(Clone, Debug, PartialEq)]
|
#[derive(Clone, Debug, PartialEq)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
// The stake distribution is always taken at the beginning of the previous epoch.
|
// The stake distribution is always taken at the beginning of the previous epoch.
|
||||||
|
|
|
@ -7,10 +7,11 @@ mod utils;
|
||||||
use blake2::Digest;
|
use blake2::Digest;
|
||||||
use cryptarchia_engine::{Epoch, Slot};
|
use cryptarchia_engine::{Epoch, Slot};
|
||||||
use crypto::Blake2b;
|
use crypto::Blake2b;
|
||||||
use rpds::HashTrieSet;
|
|
||||||
use std::{collections::HashMap, hash::Hash};
|
use std::{collections::HashMap, hash::Hash};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
|
type HashTrieSet<T> = rpds::HashTrieSetSync<T>;
|
||||||
|
|
||||||
pub use config::Config;
|
pub use config::Config;
|
||||||
pub use leader_proof::*;
|
pub use leader_proof::*;
|
||||||
pub use nonce::*;
|
pub use nonce::*;
|
||||||
|
@ -31,6 +32,7 @@ pub enum LedgerError<Id> {
|
||||||
OrphanMissing(Id),
|
OrphanMissing(Id),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||||
pub struct EpochState {
|
pub struct EpochState {
|
||||||
// The epoch this snapshot is for
|
// The epoch this snapshot is for
|
||||||
|
@ -131,6 +133,7 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
|
||||||
#[derive(Clone, Eq, PartialEq)]
|
#[derive(Clone, Eq, PartialEq)]
|
||||||
pub struct LedgerState {
|
pub struct LedgerState {
|
||||||
// commitments to coins that can be used to propose new blocks
|
// commitments to coins that can be used to propose new blocks
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
[package]
|
||||||
|
name = "cryptarchia-consensus"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
async-trait = "0.1"
|
||||||
|
bytes = "1.3"
|
||||||
|
chrono = "0.4"
|
||||||
|
cryptarchia-engine = { path = "../../consensus/cryptarchia-engine", features = ["serde"] }
|
||||||
|
cryptarchia-ledger = { path = "../../ledger/cryptarchia-ledger", features = ["serde"] }
|
||||||
|
futures = "0.3"
|
||||||
|
nomos-network = { path = "../network" }
|
||||||
|
nomos-mempool = { path = "../mempool" }
|
||||||
|
nomos-core = { path = "../../nomos-core" }
|
||||||
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
|
||||||
|
nomos-storage = { path = "../storage" }
|
||||||
|
rand_chacha = "0.3"
|
||||||
|
rand = "0.8"
|
||||||
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
thiserror = "1.0"
|
||||||
|
tokio = { version = "1", features = ["sync"] }
|
||||||
|
tokio-stream = "0.1"
|
||||||
|
tokio-util = "0.7"
|
||||||
|
tracing = "0.1"
|
||||||
|
bls-signatures = "0.14"
|
||||||
|
serde_with = "3.0.0"
|
||||||
|
nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
|
||||||
|
blake2 = "0.10"
|
||||||
|
|
||||||
|
utoipa = { version = "4.0", optional = true }
|
||||||
|
serde_json = { version = "1", optional = true }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
libp2p = ["nomos-network/libp2p", "nomos-libp2p"]
|
||||||
|
openapi = ["dep:utoipa", "serde_json"]
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
serde_json = "1.0.96"
|
|
@ -0,0 +1,418 @@
|
||||||
|
mod leadership;
|
||||||
|
pub mod network;
|
||||||
|
use core::fmt::Debug;
|
||||||
|
use cryptarchia_ledger::LedgerState;
|
||||||
|
use futures::StreamExt;
|
||||||
|
use network::NetworkAdapter;
|
||||||
|
use nomos_core::block::Block;
|
||||||
|
use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
|
||||||
|
use nomos_core::header::{Header, HeaderId};
|
||||||
|
use nomos_core::tx::{Transaction, TxSelect};
|
||||||
|
use nomos_mempool::{
|
||||||
|
backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant,
|
||||||
|
MempoolMsg, MempoolService, Transaction as TxDiscriminant,
|
||||||
|
};
|
||||||
|
use nomos_network::NetworkService;
|
||||||
|
use nomos_storage::{backends::StorageBackend, StorageMsg, StorageService};
|
||||||
|
use overwatch_rs::services::life_cycle::LifecycleMessage;
|
||||||
|
use overwatch_rs::services::relay::{OutboundRelay, Relay, RelayMessage};
|
||||||
|
use overwatch_rs::services::{
|
||||||
|
handle::ServiceStateHandle,
|
||||||
|
state::{NoOperator, NoState},
|
||||||
|
ServiceCore, ServiceData, ServiceId,
|
||||||
|
};
|
||||||
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||||
|
use serde_with::serde_as;
|
||||||
|
use std::hash::Hash;
|
||||||
|
use thiserror::Error;
|
||||||
|
use tokio::sync::oneshot::Sender;
|
||||||
|
use tracing::{error, instrument};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Error)]
|
||||||
|
pub enum Error {
|
||||||
|
#[error("Ledger error: {0}")]
|
||||||
|
Ledger(#[from] cryptarchia_ledger::LedgerError<HeaderId>),
|
||||||
|
#[error("Consensus error: {0}")]
|
||||||
|
Consensus(#[from] cryptarchia_engine::Error<HeaderId>),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct Cryptarchia {
|
||||||
|
ledger: cryptarchia_ledger::Ledger<HeaderId>,
|
||||||
|
consensus: cryptarchia_engine::Cryptarchia<HeaderId>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Cryptarchia {
|
||||||
|
fn tip(&self) -> HeaderId {
|
||||||
|
self.consensus.tip()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_apply_header(&self, header: &Header) -> Result<Self, Error> {
|
||||||
|
let header = header.cryptarchia();
|
||||||
|
let id = header.id();
|
||||||
|
let parent = header.parent();
|
||||||
|
let slot = header.slot();
|
||||||
|
let ledger = self.ledger.try_update(
|
||||||
|
id,
|
||||||
|
parent,
|
||||||
|
header.slot(),
|
||||||
|
header.leader_proof(),
|
||||||
|
header.orphaned_proofs().into_iter().map(|imported_header| {
|
||||||
|
(imported_header.id(), imported_header.leader_proof().clone())
|
||||||
|
}),
|
||||||
|
)?;
|
||||||
|
let consensus = self.consensus.receive_block(id, parent, slot)?;
|
||||||
|
|
||||||
|
Ok(Self { ledger, consensus })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize, Clone)]
|
||||||
|
pub struct CryptarchiaSettings<Ts, Bs> {
|
||||||
|
#[serde(default)]
|
||||||
|
pub transaction_selector_settings: Ts,
|
||||||
|
#[serde(default)]
|
||||||
|
pub blob_selector_settings: Bs,
|
||||||
|
pub consensus_config: cryptarchia_engine::Config,
|
||||||
|
pub ledger_config: cryptarchia_ledger::Config,
|
||||||
|
pub genesis_state: LedgerState,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Ts, Bs> CryptarchiaSettings<Ts, Bs> {
|
||||||
|
#[inline]
|
||||||
|
pub const fn new(
|
||||||
|
transaction_selector_settings: Ts,
|
||||||
|
blob_selector_settings: Bs,
|
||||||
|
consensus_config: cryptarchia_engine::Config,
|
||||||
|
ledger_config: cryptarchia_ledger::Config,
|
||||||
|
genesis_state: LedgerState,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
transaction_selector_settings,
|
||||||
|
blob_selector_settings,
|
||||||
|
consensus_config,
|
||||||
|
ledger_config,
|
||||||
|
genesis_state,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter,
|
||||||
|
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
|
||||||
|
ClPool: MemPool<BlockId = HeaderId>,
|
||||||
|
DaPool: MemPool<BlockId = HeaderId>,
|
||||||
|
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
|
||||||
|
|
||||||
|
ClPool::Item: Debug + 'static,
|
||||||
|
ClPool::Key: Debug + 'static,
|
||||||
|
DaPool::Item: Debug + 'static,
|
||||||
|
DaPool::Key: Debug + 'static,
|
||||||
|
A::Backend: 'static,
|
||||||
|
TxS: TxSelect<Tx = ClPool::Item>,
|
||||||
|
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
|
||||||
|
Storage: StorageBackend + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
service_state: ServiceStateHandle<Self>,
|
||||||
|
// underlying networking backend. We need this so we can relay and check the types properly
|
||||||
|
// when implementing ServiceCore for CryptarchiaConsensus
|
||||||
|
network_relay: Relay<NetworkService<A::Backend>>,
|
||||||
|
cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>,
|
||||||
|
da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>,
|
||||||
|
storage_relay: Relay<StorageService<Storage>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceData
|
||||||
|
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter,
|
||||||
|
ClPool: MemPool<BlockId = HeaderId>,
|
||||||
|
ClPool::Item: Debug,
|
||||||
|
ClPool::Key: Debug,
|
||||||
|
DaPool: MemPool<BlockId = HeaderId>,
|
||||||
|
DaPool::Item: Debug,
|
||||||
|
DaPool::Key: Debug,
|
||||||
|
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
|
||||||
|
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
|
||||||
|
TxS: TxSelect<Tx = ClPool::Item>,
|
||||||
|
BS: BlobCertificateSelect<Certificate = DaPool::Item>,
|
||||||
|
Storage: StorageBackend + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
const SERVICE_ID: ServiceId = "Cryptarchia";
|
||||||
|
type Settings = CryptarchiaSettings<TxS::Settings, BS::Settings>;
|
||||||
|
type State = NoState<Self::Settings>;
|
||||||
|
type StateOperator = NoOperator<Self::State>;
|
||||||
|
type Message = ConsensusMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceCore
|
||||||
|
for CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter<Tx = ClPool::Item, BlobCertificate = DaPool::Item>
|
||||||
|
+ Clone
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||||
|
ClPool::Settings: Send + Sync + 'static,
|
||||||
|
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||||
|
DaPool::Settings: Send + Sync + 'static,
|
||||||
|
ClPool::Item: Transaction<Hash = ClPool::Key>
|
||||||
|
+ Debug
|
||||||
|
+ Clone
|
||||||
|
+ Eq
|
||||||
|
+ Hash
|
||||||
|
+ Serialize
|
||||||
|
+ serde::de::DeserializeOwned
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
DaPool::Item: Certificate<Hash = DaPool::Key>
|
||||||
|
+ Debug
|
||||||
|
+ Clone
|
||||||
|
+ Eq
|
||||||
|
+ Hash
|
||||||
|
+ Serialize
|
||||||
|
+ DeserializeOwned
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
ClPool::Key: Debug + Send + Sync,
|
||||||
|
DaPool::Key: Debug + Send + Sync,
|
||||||
|
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
|
||||||
|
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
|
||||||
|
TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
|
||||||
|
TxS::Settings: Send + Sync + 'static,
|
||||||
|
BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
|
||||||
|
BS::Settings: Send + Sync + 'static,
|
||||||
|
Storage: StorageBackend + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||||
|
let network_relay = service_state.overwatch_handle.relay();
|
||||||
|
let cl_mempool_relay = service_state.overwatch_handle.relay();
|
||||||
|
let da_mempool_relay = service_state.overwatch_handle.relay();
|
||||||
|
let storage_relay = service_state.overwatch_handle.relay();
|
||||||
|
Ok(Self {
|
||||||
|
service_state,
|
||||||
|
network_relay,
|
||||||
|
cl_mempool_relay,
|
||||||
|
da_mempool_relay,
|
||||||
|
storage_relay,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn run(mut self) -> Result<(), overwatch_rs::DynError> {
|
||||||
|
let network_relay: OutboundRelay<_> = self
|
||||||
|
.network_relay
|
||||||
|
.connect()
|
||||||
|
.await
|
||||||
|
.expect("Relay connection with NetworkService should succeed");
|
||||||
|
|
||||||
|
let cl_mempool_relay: OutboundRelay<_> = self
|
||||||
|
.cl_mempool_relay
|
||||||
|
.connect()
|
||||||
|
.await
|
||||||
|
.expect("Relay connection with MemPoolService should succeed");
|
||||||
|
|
||||||
|
let da_mempool_relay: OutboundRelay<_> = self
|
||||||
|
.da_mempool_relay
|
||||||
|
.connect()
|
||||||
|
.await
|
||||||
|
.expect("Relay connection with MemPoolService should succeed");
|
||||||
|
|
||||||
|
let storage_relay: OutboundRelay<_> = self
|
||||||
|
.storage_relay
|
||||||
|
.connect()
|
||||||
|
.await
|
||||||
|
.expect("Relay connection with StorageService should succeed");
|
||||||
|
|
||||||
|
let CryptarchiaSettings {
|
||||||
|
consensus_config,
|
||||||
|
ledger_config,
|
||||||
|
genesis_state,
|
||||||
|
..
|
||||||
|
} = self.service_state.settings_reader.get_updated_settings();
|
||||||
|
|
||||||
|
let genesis_id = HeaderId::from([0; 32]);
|
||||||
|
let mut cryptarchia = Cryptarchia {
|
||||||
|
ledger: <cryptarchia_ledger::Ledger<_>>::from_genesis(
|
||||||
|
genesis_id,
|
||||||
|
genesis_state,
|
||||||
|
ledger_config,
|
||||||
|
),
|
||||||
|
consensus: <cryptarchia_engine::Cryptarchia<_>>::from_genesis(
|
||||||
|
genesis_id,
|
||||||
|
consensus_config,
|
||||||
|
),
|
||||||
|
};
|
||||||
|
let adapter = A::new(network_relay).await;
|
||||||
|
|
||||||
|
let mut incoming_blocks = adapter.blocks_stream().await;
|
||||||
|
|
||||||
|
let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
Some(block) = incoming_blocks.next() => {
|
||||||
|
cryptarchia = Self::process_block(
|
||||||
|
cryptarchia,
|
||||||
|
block,
|
||||||
|
storage_relay.clone(),
|
||||||
|
cl_mempool_relay.clone(),
|
||||||
|
da_mempool_relay.clone(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(msg) = self.service_state.inbound_relay.next() => {
|
||||||
|
Self::process_message(&cryptarchia, msg);
|
||||||
|
}
|
||||||
|
Some(msg) = lifecycle_stream.next() => {
|
||||||
|
if Self::should_stop_service(msg).await {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||||
|
CryptarchiaConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter + Clone + Send + Sync + 'static,
|
||||||
|
ClPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||||
|
ClPool::Settings: Send + Sync + 'static,
|
||||||
|
DaPool: MemPool<BlockId = HeaderId> + Send + Sync + 'static,
|
||||||
|
DaPool::Settings: Send + Sync + 'static,
|
||||||
|
ClPool::Item: Transaction<Hash = ClPool::Key>
|
||||||
|
+ Debug
|
||||||
|
+ Clone
|
||||||
|
+ Eq
|
||||||
|
+ Hash
|
||||||
|
+ Serialize
|
||||||
|
+ serde::de::DeserializeOwned
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
DaPool::Item: Certificate<Hash = DaPool::Key>
|
||||||
|
+ Debug
|
||||||
|
+ Clone
|
||||||
|
+ Eq
|
||||||
|
+ Hash
|
||||||
|
+ Serialize
|
||||||
|
+ DeserializeOwned
|
||||||
|
+ Send
|
||||||
|
+ Sync
|
||||||
|
+ 'static,
|
||||||
|
TxS: TxSelect<Tx = ClPool::Item> + Clone + Send + Sync + 'static,
|
||||||
|
BS: BlobCertificateSelect<Certificate = DaPool::Item> + Clone + Send + Sync + 'static,
|
||||||
|
ClPool::Key: Debug + Send + Sync,
|
||||||
|
DaPool::Key: Debug + Send + Sync,
|
||||||
|
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key> + Send + Sync + 'static,
|
||||||
|
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key> + Send + Sync + 'static,
|
||||||
|
Storage: StorageBackend + Send + Sync + 'static,
|
||||||
|
{
|
||||||
|
async fn should_stop_service(message: LifecycleMessage) -> bool {
|
||||||
|
match message {
|
||||||
|
LifecycleMessage::Shutdown(sender) => {
|
||||||
|
if sender.send(()).is_err() {
|
||||||
|
error!(
|
||||||
|
"Error sending successful shutdown signal from service {}",
|
||||||
|
Self::SERVICE_ID
|
||||||
|
);
|
||||||
|
}
|
||||||
|
true
|
||||||
|
}
|
||||||
|
LifecycleMessage::Kill => true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn process_message(cryptarchia: &Cryptarchia, msg: ConsensusMsg) {
|
||||||
|
match msg {
|
||||||
|
ConsensusMsg::Info { tx } => {
|
||||||
|
let info = CryptarchiaInfo {
|
||||||
|
tip: cryptarchia.tip(),
|
||||||
|
};
|
||||||
|
tx.send(info).unwrap_or_else(|e| {
|
||||||
|
tracing::error!("Could not send consensus info through channel: {:?}", e)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::type_complexity, clippy::too_many_arguments)]
|
||||||
|
#[instrument(
|
||||||
|
level = "debug",
|
||||||
|
skip(cryptarchia, storage_relay, cl_mempool_relay, da_mempool_relay)
|
||||||
|
)]
|
||||||
|
async fn process_block(
|
||||||
|
mut cryptarchia: Cryptarchia,
|
||||||
|
block: Block<ClPool::Item, DaPool::Item>,
|
||||||
|
storage_relay: OutboundRelay<StorageMsg<Storage>>,
|
||||||
|
cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>,
|
||||||
|
da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>,
|
||||||
|
) -> Cryptarchia {
|
||||||
|
tracing::debug!("received proposal {:?}", block);
|
||||||
|
|
||||||
|
let header = block.header();
|
||||||
|
let id = header.id();
|
||||||
|
match cryptarchia.try_apply_header(block.header()) {
|
||||||
|
Ok(new_state) => {
|
||||||
|
// remove included content from mempool
|
||||||
|
mark_in_block(
|
||||||
|
cl_mempool_relay,
|
||||||
|
block.transactions().map(Transaction::hash),
|
||||||
|
id,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
mark_in_block(da_mempool_relay, block.blobs().map(Certificate::hash), id).await;
|
||||||
|
|
||||||
|
// store block
|
||||||
|
let msg = <StorageMsg<_>>::new_store_message(header.id(), block);
|
||||||
|
if let Err((e, _msg)) = storage_relay.send(msg).await {
|
||||||
|
tracing::error!("Could not send block to storage: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
cryptarchia = new_state;
|
||||||
|
}
|
||||||
|
Err(Error::Consensus(cryptarchia_engine::Error::ParentMissing(parent))) => {
|
||||||
|
tracing::debug!("missing parent {:?}", parent);
|
||||||
|
// TODO: request parent block
|
||||||
|
}
|
||||||
|
Err(e) => tracing::debug!("invalid block {:?}: {e:?}", block),
|
||||||
|
}
|
||||||
|
|
||||||
|
cryptarchia
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ConsensusMsg {
|
||||||
|
Info { tx: Sender<CryptarchiaInfo> },
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RelayMessage for ConsensusMsg {}
|
||||||
|
|
||||||
|
#[serde_as]
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
|
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
|
||||||
|
pub struct CryptarchiaInfo {
|
||||||
|
pub tip: HeaderId,
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn mark_in_block<Item, Key>(
|
||||||
|
mempool: OutboundRelay<MempoolMsg<HeaderId, Item, Key>>,
|
||||||
|
ids: impl Iterator<Item = Key>,
|
||||||
|
block: HeaderId,
|
||||||
|
) {
|
||||||
|
mempool
|
||||||
|
.send(MempoolMsg::MarkInBlock {
|
||||||
|
ids: ids.collect(),
|
||||||
|
block,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|(e, _)| tracing::error!("Could not mark items in block: {e}"))
|
||||||
|
}
|
|
@ -0,0 +1,123 @@
|
||||||
|
// std
|
||||||
|
use std::hash::Hash;
|
||||||
|
// crates
|
||||||
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
|
use tokio::sync::broadcast::error::RecvError;
|
||||||
|
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
|
||||||
|
// internal
|
||||||
|
use crate::network::{messages::NetworkMessage, BoxedStream, NetworkAdapter};
|
||||||
|
use nomos_core::{block::Block, wire};
|
||||||
|
use nomos_network::{
|
||||||
|
backends::libp2p::{Command, Event, EventKind, Libp2p},
|
||||||
|
NetworkMsg, NetworkService,
|
||||||
|
};
|
||||||
|
use overwatch_rs::services::{relay::OutboundRelay, ServiceData};
|
||||||
|
|
||||||
|
const TOPIC: &str = "/cryptarchia/proto";
|
||||||
|
const BUFFER_SIZE: usize = 64;
|
||||||
|
type Relay<T> = OutboundRelay<<NetworkService<T> as ServiceData>::Message>;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct LibP2pAdapter<Tx, BlobCert>
|
||||||
|
where
|
||||||
|
Tx: Clone + Eq + Hash,
|
||||||
|
BlobCert: Clone + Eq + Hash,
|
||||||
|
{
|
||||||
|
network_relay: OutboundRelay<<NetworkService<Libp2p> as ServiceData>::Message>,
|
||||||
|
blocks: tokio::sync::broadcast::Sender<Block<Tx, BlobCert>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Tx, BlobCert> LibP2pAdapter<Tx, BlobCert>
|
||||||
|
where
|
||||||
|
Tx: Clone + Eq + Hash + Serialize,
|
||||||
|
BlobCert: Clone + Eq + Hash + Serialize,
|
||||||
|
{
|
||||||
|
async fn subscribe(relay: &Relay<Libp2p>, topic: &str) {
|
||||||
|
if let Err((e, _)) = relay
|
||||||
|
.send(NetworkMsg::Process(Command::Subscribe(topic.into())))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("error subscribing to {topic}: {e}");
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl<Tx, BlobCert> NetworkAdapter for LibP2pAdapter<Tx, BlobCert>
|
||||||
|
where
|
||||||
|
Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static,
|
||||||
|
BlobCert: Serialize + DeserializeOwned + Clone + Eq + Hash + Send + 'static,
|
||||||
|
{
|
||||||
|
type Backend = Libp2p;
|
||||||
|
type Tx = Tx;
|
||||||
|
type BlobCertificate = BlobCert;
|
||||||
|
|
||||||
|
async fn new(network_relay: Relay<Libp2p>) -> Self {
|
||||||
|
let relay = network_relay.clone();
|
||||||
|
Self::subscribe(&relay, TOPIC).await;
|
||||||
|
let blocks = tokio::sync::broadcast::Sender::new(BUFFER_SIZE);
|
||||||
|
let blocks_sender = blocks.clone();
|
||||||
|
tracing::debug!("Starting up...");
|
||||||
|
// this wait seems to be helpful in some cases since we give the time
|
||||||
|
// to the network to establish connections before we start sending messages
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
// TODO: maybe we need the runtime handle here?
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||||
|
if let Err((e, _)) = relay
|
||||||
|
.send(NetworkMsg::Subscribe {
|
||||||
|
kind: EventKind::Message,
|
||||||
|
sender,
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("error subscribing to incoming messages: {e}");
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut incoming_messages = receiver.await.unwrap();
|
||||||
|
loop {
|
||||||
|
match incoming_messages.recv().await {
|
||||||
|
Ok(Event::Message(message)) => {
|
||||||
|
match nomos_core::wire::deserialize(&message.data) {
|
||||||
|
Ok(msg) => match msg {
|
||||||
|
NetworkMessage::Block(block) => {
|
||||||
|
tracing::debug!("received block {:?}", block.header().id());
|
||||||
|
if let Err(err) = blocks_sender.send(block) {
|
||||||
|
tracing::error!("error sending block to consensus: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => tracing::debug!("unrecognized gossipsub message"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(RecvError::Lagged(n)) => {
|
||||||
|
tracing::error!("lagged messages: {n}")
|
||||||
|
}
|
||||||
|
Err(RecvError::Closed) => unreachable!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Self {
|
||||||
|
network_relay,
|
||||||
|
blocks,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn blocks_stream(&self) -> BoxedStream<Block<Self::Tx, Self::BlobCertificate>> {
|
||||||
|
Box::new(BroadcastStream::new(self.blocks.subscribe()).filter_map(Result::ok))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>) {
|
||||||
|
if let Err((e, message)) = self
|
||||||
|
.network_relay
|
||||||
|
.send(NetworkMsg::Process(Command::Broadcast {
|
||||||
|
message: wire::serialize(&message).unwrap().into_boxed_slice(),
|
||||||
|
topic: TOPIC.into(),
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::error!("error broadcasting {message:?}: {e}");
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,2 @@
|
||||||
|
#[cfg(feature = "libp2p")]
|
||||||
|
pub mod libp2p;
|
|
@ -0,0 +1,15 @@
|
||||||
|
// std
|
||||||
|
use std::hash::Hash;
|
||||||
|
// crates
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
// internal
|
||||||
|
use nomos_core::block::Block;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub enum NetworkMessage<Tx, Blob>
|
||||||
|
where
|
||||||
|
Tx: Clone + Eq + Hash,
|
||||||
|
Blob: Clone + Eq + Hash,
|
||||||
|
{
|
||||||
|
Block(Block<Tx, Blob>),
|
||||||
|
}
|
|
@ -0,0 +1,29 @@
|
||||||
|
pub mod adapters;
|
||||||
|
pub mod messages;
|
||||||
|
|
||||||
|
// std
|
||||||
|
use std::hash::Hash;
|
||||||
|
// crates
|
||||||
|
use futures::Stream;
|
||||||
|
use nomos_core::block::Block;
|
||||||
|
// internal
|
||||||
|
use crate::network::messages::NetworkMessage;
|
||||||
|
use nomos_network::backends::NetworkBackend;
|
||||||
|
use nomos_network::NetworkService;
|
||||||
|
use overwatch_rs::services::relay::OutboundRelay;
|
||||||
|
use overwatch_rs::services::ServiceData;
|
||||||
|
use serde::{de::DeserializeOwned, Serialize};
|
||||||
|
|
||||||
|
type BoxedStream<T> = Box<dyn Stream<Item = T> + Send + Sync + Unpin>;
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
pub trait NetworkAdapter {
|
||||||
|
type Backend: NetworkBackend + 'static;
|
||||||
|
type Tx: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static;
|
||||||
|
type BlobCertificate: Serialize + DeserializeOwned + Clone + Eq + Hash + 'static;
|
||||||
|
async fn new(
|
||||||
|
network_relay: OutboundRelay<<NetworkService<Self::Backend> as ServiceData>::Message>,
|
||||||
|
) -> Self;
|
||||||
|
async fn blocks_stream(&self) -> BoxedStream<Block<Self::Tx, Self::BlobCertificate>>;
|
||||||
|
async fn broadcast(&self, message: NetworkMessage<Self::Tx, Self::BlobCertificate>);
|
||||||
|
}
|
Loading…
Reference in New Issue