Add leadership stub (#22)
* add leadership stub * move types to core * clippy happy
This commit is contained in:
parent
9721e6f5fa
commit
0229337414
|
@ -2,3 +2,4 @@ pub mod block;
|
||||||
pub mod crypto;
|
pub mod crypto;
|
||||||
pub mod fountain;
|
pub mod fountain;
|
||||||
pub mod staking;
|
pub mod staking;
|
||||||
|
pub mod tx;
|
||||||
|
|
|
@ -0,0 +1,5 @@
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Tx;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
pub struct Id;
|
|
@ -12,6 +12,7 @@ rand = "0.8"
|
||||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
nomos-network = { path = "../network" }
|
nomos-network = { path = "../network" }
|
||||||
|
nomos-mempool = { path = "../mempool" }
|
||||||
nomos-core = { path = "../../nomos-core" }
|
nomos-core = { path = "../../nomos-core" }
|
||||||
tokio = { version = "1", features = ["sync"] }
|
tokio = { version = "1", features = ["sync"] }
|
||||||
tokio-stream = "0.1"
|
tokio-stream = "0.1"
|
||||||
|
|
|
@ -0,0 +1,58 @@
|
||||||
|
// std
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
// crates
|
||||||
|
// internal
|
||||||
|
use nomos_core::crypto::PrivateKey;
|
||||||
|
use nomos_mempool::MempoolMsg;
|
||||||
|
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
// TODO: take care of sensitve material
|
||||||
|
struct Enclave {
|
||||||
|
key: PrivateKey,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct Leadership<Tx, Id> {
|
||||||
|
key: Enclave,
|
||||||
|
mempool: OutboundRelay<MempoolMsg<Tx, Id>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub enum LeadershipResult<'view> {
|
||||||
|
Leader {
|
||||||
|
block: Block,
|
||||||
|
_view: PhantomData<&'view u8>,
|
||||||
|
},
|
||||||
|
NotLeader {
|
||||||
|
_view: PhantomData<&'view u8>,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Tx, Id> Leadership<Tx, Id> {
|
||||||
|
pub fn new(key: PrivateKey, mempool: OutboundRelay<MempoolMsg<Tx, Id>>) -> Self {
|
||||||
|
Self {
|
||||||
|
key: Enclave { key },
|
||||||
|
mempool,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[allow(unused, clippy::diverging_sub_expression)]
|
||||||
|
pub async fn try_propose_block<'view>(
|
||||||
|
&self,
|
||||||
|
view: &'view View,
|
||||||
|
tip: &Tip,
|
||||||
|
) -> LeadershipResult<'view> {
|
||||||
|
let ancestor_hint = todo!("get the ancestor from the tip");
|
||||||
|
if view.is_leader(self.key.key) {
|
||||||
|
let (tx, rx) = tokio::sync::oneshot::channel();
|
||||||
|
self.mempool.send(MempoolMsg::View { ancestor_hint, tx });
|
||||||
|
let _iter = rx.await;
|
||||||
|
|
||||||
|
LeadershipResult::Leader {
|
||||||
|
_view: PhantomData,
|
||||||
|
block: todo!("form a block from the returned iterator"),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LeadershipResult::NotLeader { _view: PhantomData }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,17 +4,22 @@
|
||||||
//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views).
|
//! are always synchronized (i.e. it cannot happen that we accidentally use committees from different views).
|
||||||
//! It's obviously extremely important that the information contained in `View` is synchronized across different
|
//! It's obviously extremely important that the information contained in `View` is synchronized across different
|
||||||
//! nodes, but that has to be achieved through different means.
|
//! nodes, but that has to be achieved through different means.
|
||||||
|
mod leadership;
|
||||||
mod network;
|
mod network;
|
||||||
pub mod overlay;
|
pub mod overlay;
|
||||||
|
mod tip;
|
||||||
|
|
||||||
// std
|
// std
|
||||||
use std::collections::{BTreeMap, HashSet};
|
use std::collections::{BTreeMap, HashSet};
|
||||||
|
use std::fmt::Debug;
|
||||||
// crates
|
// crates
|
||||||
// internal
|
// internal
|
||||||
use crate::network::NetworkAdapter;
|
use crate::network::NetworkAdapter;
|
||||||
|
use leadership::{Leadership, LeadershipResult};
|
||||||
use nomos_core::block::Block;
|
use nomos_core::block::Block;
|
||||||
use nomos_core::crypto::PublicKey;
|
use nomos_core::crypto::PublicKey;
|
||||||
use nomos_core::staking::Stake;
|
use nomos_core::staking::Stake;
|
||||||
|
use nomos_mempool::{backend::Pool, Mempool};
|
||||||
use nomos_network::NetworkService;
|
use nomos_network::NetworkService;
|
||||||
use overlay::{Member, Overlay};
|
use overlay::{Member, Overlay};
|
||||||
use overwatch_rs::services::relay::{OutboundRelay, Relay};
|
use overwatch_rs::services::relay::{OutboundRelay, Relay};
|
||||||
|
@ -24,6 +29,7 @@ use overwatch_rs::services::{
|
||||||
state::{NoOperator, NoState},
|
state::{NoOperator, NoState},
|
||||||
ServiceCore, ServiceData, ServiceId,
|
ServiceCore, ServiceData, ServiceId,
|
||||||
};
|
};
|
||||||
|
use tip::Tip;
|
||||||
|
|
||||||
// Raw bytes for now, could be a ed25519 public key
|
// Raw bytes for now, could be a ed25519 public key
|
||||||
pub type NodeId = PublicKey;
|
pub type NodeId = PublicKey;
|
||||||
|
@ -37,14 +43,27 @@ pub struct CarnotSettings {
|
||||||
private_key: [u8; 32],
|
private_key: [u8; 32],
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CarnotConsensus<Network: NetworkAdapter + Send + Sync + 'static> {
|
pub struct CarnotConsensus<A, P>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter + Send + Sync + 'static,
|
||||||
|
P: Pool + Send + Sync + 'static,
|
||||||
|
P::Tx: Debug + Send + Sync + 'static,
|
||||||
|
P::Id: Debug + Send + Sync + 'static,
|
||||||
|
{
|
||||||
service_state: ServiceStateHandle<Self>,
|
service_state: ServiceStateHandle<Self>,
|
||||||
// underlying networking backend. We need this so we can relay and check the types properly
|
// underlying networking backend. We need this so we can relay and check the types properly
|
||||||
// when implementing ServiceCore for CarnotConsensus
|
// when implementing ServiceCore for CarnotConsensus
|
||||||
network_relay: Relay<NetworkService<<Network as NetworkAdapter>::Backend>>,
|
network_relay: Relay<NetworkService<A::Backend>>,
|
||||||
|
mempool_relay: Relay<Mempool<A::Backend, P>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceData for CarnotConsensus<Network> {
|
impl<A, P> ServiceData for CarnotConsensus<A, P>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter + Send + Sync + 'static,
|
||||||
|
P: Pool + Send + Sync + 'static,
|
||||||
|
P::Tx: Debug + Send + Sync + 'static,
|
||||||
|
P::Id: Debug + Send + Sync + 'static,
|
||||||
|
{
|
||||||
const SERVICE_ID: ServiceId = "Carnot";
|
const SERVICE_ID: ServiceId = "Carnot";
|
||||||
type Settings = CarnotSettings;
|
type Settings = CarnotSettings;
|
||||||
type State = NoState<Self::Settings>;
|
type State = NoState<Self::Settings>;
|
||||||
|
@ -53,12 +72,20 @@ impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceData for CarnotCons
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceCore for CarnotConsensus<Network> {
|
impl<A, P> ServiceCore for CarnotConsensus<A, P>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter + Send + Sync + 'static,
|
||||||
|
P: Pool + Send + Sync + 'static,
|
||||||
|
P::Tx: Debug + Send + Sync + 'static,
|
||||||
|
P::Id: Debug + Send + Sync + 'static,
|
||||||
|
{
|
||||||
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
|
||||||
let network_relay = service_state.overwatch_handle.relay();
|
let network_relay = service_state.overwatch_handle.relay();
|
||||||
|
let mempool_relay = service_state.overwatch_handle.relay();
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
service_state,
|
service_state,
|
||||||
network_relay,
|
network_relay,
|
||||||
|
mempool_relay,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,27 +98,48 @@ impl<Network: NetworkAdapter + Send + Sync + 'static> ServiceCore for CarnotCons
|
||||||
.await
|
.await
|
||||||
.expect("Relay connection with NetworkService should succeed");
|
.expect("Relay connection with NetworkService should succeed");
|
||||||
|
|
||||||
let network_adapter = Network::new(network_relay).await;
|
let mempool_relay: OutboundRelay<_> = self
|
||||||
|
.mempool_relay
|
||||||
|
.connect()
|
||||||
|
.await
|
||||||
|
.expect("Relay connection with MempoolService should succeed");
|
||||||
|
|
||||||
|
let network_adapter = A::new(network_relay).await;
|
||||||
|
|
||||||
|
let tip = Tip;
|
||||||
|
|
||||||
// TODO: fix
|
// TODO: fix
|
||||||
let node_id = self
|
let priv_key = self
|
||||||
.service_state
|
.service_state
|
||||||
.settings_reader
|
.settings_reader
|
||||||
.get_updated_settings()
|
.get_updated_settings()
|
||||||
.private_key;
|
.private_key;
|
||||||
|
let node_id = priv_key;
|
||||||
|
|
||||||
|
let leadership = Leadership::new(priv_key, mempool_relay);
|
||||||
loop {
|
loop {
|
||||||
let view = view_generator.next().await;
|
let view = view_generator.next().await;
|
||||||
// if we want to process multiple views at the same time this can
|
// if we want to process multiple views at the same time this can
|
||||||
// be spawned as a separate future
|
// be spawned as a separate future
|
||||||
// TODO: add leadership module
|
// TODO: add leadership module
|
||||||
view.resolve::<Network, Member<'_, COMMITTEE_SIZE>>(node_id, &network_adapter)
|
view.resolve::<A, Member<'_, COMMITTEE_SIZE>, _, _>(
|
||||||
|
node_id,
|
||||||
|
&tip,
|
||||||
|
&network_adapter,
|
||||||
|
&leadership,
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<Network: NetworkAdapter + Send + Sync + 'static> CarnotConsensus<Network> {
|
impl<A, P> CarnotConsensus<A, P>
|
||||||
|
where
|
||||||
|
A: NetworkAdapter + Send + Sync + 'static,
|
||||||
|
P: Pool + Send + Sync + 'static,
|
||||||
|
P::Tx: Debug + Send + Sync + 'static,
|
||||||
|
P::Id: Debug + Send + Sync + 'static,
|
||||||
|
{
|
||||||
// Build a service that generates new views as they become available
|
// Build a service that generates new views as they become available
|
||||||
async fn view_generator(&self) -> ViewGenerator {
|
async fn view_generator(&self) -> ViewGenerator {
|
||||||
todo!()
|
todo!()
|
||||||
|
@ -126,14 +174,25 @@ impl View {
|
||||||
const APPROVAL_THRESHOLD: usize = 1;
|
const APPROVAL_THRESHOLD: usize = 1;
|
||||||
|
|
||||||
// TODO: might want to encode steps in the type system
|
// TODO: might want to encode steps in the type system
|
||||||
async fn resolve<'view, Network: NetworkAdapter, O: Overlay<'view, Network>>(
|
async fn resolve<'view, A, O, Tx, Id>(
|
||||||
&'view self,
|
&'view self,
|
||||||
node_id: NodeId,
|
node_id: NodeId,
|
||||||
adapter: &Network,
|
tip: &Tip,
|
||||||
) {
|
adapter: &A,
|
||||||
|
leadership: &Leadership<Tx, Id>,
|
||||||
|
) where
|
||||||
|
A: NetworkAdapter + Send + Sync + 'static,
|
||||||
|
O: Overlay<'view, A>,
|
||||||
|
{
|
||||||
let overlay = O::new(self, node_id);
|
let overlay = O::new(self, node_id);
|
||||||
|
|
||||||
let block = overlay.reconstruct_proposal_block(adapter).await;
|
let block = if let LeadershipResult::Leader { block, .. } =
|
||||||
|
leadership.try_propose_block(self, tip).await
|
||||||
|
{
|
||||||
|
block
|
||||||
|
} else {
|
||||||
|
overlay.reconstruct_proposal_block(adapter).await
|
||||||
|
};
|
||||||
// TODO: verify?
|
// TODO: verify?
|
||||||
overlay.broadcast_block(block.clone(), adapter).await;
|
overlay.broadcast_block(block.clone(), adapter).await;
|
||||||
self.approve(&overlay, block, adapter).await;
|
self.approve(&overlay, block, adapter).await;
|
||||||
|
@ -161,4 +220,8 @@ impl View {
|
||||||
fn craft_proof_of_approval(&self, _approvals: impl Iterator<Item = Approval>) -> Approval {
|
fn craft_proof_of_approval(&self, _approvals: impl Iterator<Item = Approval>) -> Approval {
|
||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn is_leader(&self, _node_id: NodeId) -> bool {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,2 @@
|
||||||
|
/// Assuming determining which tip to consider is integral part of consensus
|
||||||
|
pub struct Tip;
|
|
@ -1,5 +1,5 @@
|
||||||
[package]
|
[package]
|
||||||
name = "mempool"
|
name = "nomos-mempool"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,7 @@ pub enum MempoolMsg<Tx, Id> {
|
||||||
},
|
},
|
||||||
View {
|
View {
|
||||||
ancestor_hint: BlockId,
|
ancestor_hint: BlockId,
|
||||||
rx: Sender<Box<dyn Iterator<Item = Tx> + Send>>,
|
tx: Sender<Box<dyn Iterator<Item = Tx> + Send>>,
|
||||||
},
|
},
|
||||||
Prune {
|
Prune {
|
||||||
ids: Vec<Id>,
|
ids: Vec<Id>,
|
||||||
|
@ -133,8 +133,8 @@ where
|
||||||
tracing::debug!("could not add tx to the pool due to: {}", e)
|
tracing::debug!("could not add tx to the pool due to: {}", e)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
MempoolMsg::View { ancestor_hint, rx } => {
|
MempoolMsg::View { ancestor_hint, tx } => {
|
||||||
rx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| {
|
tx.send(pool.view(ancestor_hint)).unwrap_or_else(|_| {
|
||||||
tracing::debug!("could not send back pool view")
|
tracing::debug!("could not send back pool view")
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue