From a933c7324541f5aaaa482606b8049a2c253edf9a Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Mon, 18 Mar 2024 16:04:16 +0100 Subject: [PATCH] add cryptarchia leadership (#613) --- ledger/cryptarchia-ledger/src/coin.rs | 157 ++++++++++++++++++ ledger/cryptarchia-ledger/src/config.rs | 42 ++++- ledger/cryptarchia-ledger/src/lib.rs | 120 ++++++------- ledger/cryptarchia-ledger/src/nonce.rs | 5 + nomos-core/src/crypto.rs | 2 +- nomos-core/src/header/cryptarchia.rs | 4 +- .../cryptarchia-consensus/Cargo.toml | 1 + .../cryptarchia-consensus/src/leadership.rs | 21 +++ .../cryptarchia-consensus/src/lib.rs | 144 +++++++++++++--- .../cryptarchia-consensus/src/time.rs | 50 ++++++ 10 files changed, 456 insertions(+), 90 deletions(-) create mode 100644 ledger/cryptarchia-ledger/src/coin.rs create mode 100644 nomos-services/cryptarchia-consensus/src/time.rs diff --git a/ledger/cryptarchia-ledger/src/coin.rs b/ledger/cryptarchia-ledger/src/coin.rs new file mode 100644 index 00000000..81f76d24 --- /dev/null +++ b/ledger/cryptarchia-ledger/src/coin.rs @@ -0,0 +1,157 @@ +use crate::{crypto::Blake2b, Commitment, EpochState, LeaderProof, Nonce, Nullifier}; +use blake2::digest::Digest; +use cryptarchia_engine::config::Config; +use cryptarchia_engine::Slot; + +type SecretKey = [u8; 32]; +type PublicKey = [u8; 32]; + +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)] +pub struct Value(u32); + +impl From for Value { + fn from(value: u32) -> Self { + Self(value) + } +} + +// This implementatio is only a stub +// see https://github.com/logos-co/nomos-specs/blob/master/cryptarchia/cryptarchia.py for a spec +#[derive(Debug, Clone, PartialEq, Eq, Copy)] +pub struct Coin { + sk: SecretKey, + nonce: Nonce, + value: Value, +} + +impl Coin { + pub fn new(sk: SecretKey, nonce: Nonce, value: Value) -> Self { + Self { sk, nonce, value } + } + + pub fn evolve(&self) -> Self { + let mut h = Blake2b::new(); + h.update("coin-evolve"); + h.update(self.sk); + h.update(self.nonce); + Self { + sk: self.sk, + nonce: <[u8; 32]>::from(h.finalize()).into(), + value: self.value, + } + } + + fn value_bytes(&self) -> [u8; 32] { + let mut value_bytes = [0; 32]; + value_bytes[28..].copy_from_slice(&self.value.0.to_be_bytes()); + value_bytes + } + + pub fn pk(&self) -> &PublicKey { + &self.sk + } + + pub fn commitment(&self) -> Commitment { + let mut h = Blake2b::new(); + h.update("coin-commitment"); + h.update(self.nonce); + h.update(self.pk()); + h.update(self.value_bytes()); + <[u8; 32]>::from(h.finalize()).into() + } + + pub fn nullifier(&self) -> Nullifier { + let mut h = Blake2b::new(); + h.update("coin-nullifier"); + h.update(self.nonce); + h.update(self.pk()); + h.update(self.value_bytes()); + <[u8; 32]>::from(h.finalize()).into() + } + + // TODO: better precision + fn phi(active_slot_coeff: f64, relative_stake: f64) -> f64 { + 1.0 - (1.0 - active_slot_coeff).powf(relative_stake) + } + + pub fn vrf(&self, epoch_nonce: Nonce, slot: Slot) -> [u8; 32] { + let mut h = Blake2b::new(); + h.update("lead"); + h.update(epoch_nonce); + h.update(u64::from(slot).to_be_bytes()); + h.update(self.sk); + h.update(self.nonce); + h.finalize().into() + } + + pub fn is_slot_leader(&self, epoch: &EpochState, slot: Slot, config: &Config) -> bool { + // TODO: check slot and epoch state are consistent + let relative_stake = f64::from(self.value.0) / f64::from(epoch.total_stake.0); + + let vrf = self.vrf(epoch.nonce, slot); + + // HACK: we artificially restrict the VRF output so that it fits into a u32 + // this is not going to be the final implementation anyway and the last 4 bytes + // should have a similar (statistical) distribution as the full output + let vrf = u32::from_be_bytes(vrf[28..].try_into().unwrap()); + vrf < (f64::from(u32::MAX) * Self::phi(config.active_slot_coeff, relative_stake)) as u32 + } + + pub fn to_proof(&self, slot: Slot) -> LeaderProof { + LeaderProof::new( + self.commitment(), + self.nullifier(), + slot, + self.evolve().commitment(), + ) + } +} + +#[cfg(test)] +mod tests { + use super::*; + #[test] + fn test_slot_leader_statistics() { + let epoch = EpochState { + epoch: 0.into(), + nonce: [1; 32].into(), + commitments: Default::default(), + total_stake: 1000.into(), + }; + + let coin = Coin { + sk: [0; 32], + nonce: [0; 32].into(), + value: 10.into(), + }; + + let config = Config { + active_slot_coeff: 0.05, + security_param: 1, + }; + + // We'll use the Margin of Error equation to decide how many samples we need. + // https://en.wikipedia.org/wiki/Margin_of_error + let margin_of_error = 1e-4; + let p = Coin::phi(config.active_slot_coeff, 10.0 / 1000.0); + let std = (p * (1.0 - p)).sqrt(); + let z = 3.0; // we want 3 std from the mean to be within the margin of error + let n = (z * std / margin_of_error).powi(2).ceil(); + + // After N slots, the measured leader rate should be within the + // interval `p +- margin_of_error` with high probabiltiy + let leader_rate = (0..n as u64) + .map(|slot| coin.is_slot_leader(&epoch, slot.into(), &config)) + .filter(|x| *x) + .count(); + + println!("leader_rate: {n} {} / {}", leader_rate, p); + + assert!( + (leader_rate as f64 / n - p).abs() < margin_of_error, + "{leader_rate} != {p}, err={} > {margin_of_error}", + (leader_rate as f64 / n - p).abs() + ); + } +} diff --git a/ledger/cryptarchia-ledger/src/config.rs b/ledger/cryptarchia-ledger/src/config.rs index d7fe1940..06acb907 100644 --- a/ledger/cryptarchia-ledger/src/config.rs +++ b/ledger/cryptarchia-ledger/src/config.rs @@ -31,15 +31,53 @@ impl Config { pub fn nonce_snapshot(&self, epoch: Epoch) -> Slot { let offset = self.base_period_length() * (self.epoch_period_nonce_buffer + self.epoch_stake_distribution_stabilization) as u64; - let base = u32::from(epoch) as u64 * self.epoch_length(); + let base = (u32::from(epoch) - 1) as u64 * self.epoch_length(); (base + offset).into() } pub fn stake_distribution_snapshot(&self, epoch: Epoch) -> Slot { - (u32::from(epoch) as u64 * self.epoch_length()).into() + ((u32::from(epoch) - 1) as u64 * self.epoch_length()).into() } pub fn epoch(&self, slot: Slot) -> Epoch { ((u64::from(slot) / self.epoch_length()) as u32).into() } } + +#[cfg(test)] +mod tests { + #[test] + fn epoch_snapshots() { + let config = super::Config { + epoch_stake_distribution_stabilization: 3, + epoch_period_nonce_buffer: 3, + epoch_period_nonce_stabilization: 4, + consensus_config: cryptarchia_engine::Config { + security_param: 5, + active_slot_coeff: 0.5, + }, + }; + assert_eq!(config.epoch_length(), 100); + assert_eq!(config.nonce_snapshot(1.into()), 60.into()); + assert_eq!(config.nonce_snapshot(2.into()), 160.into()); + assert_eq!(config.stake_distribution_snapshot(1.into()), 0.into()); + assert_eq!(config.stake_distribution_snapshot(2.into()), 100.into()); + } + + #[test] + fn slot_to_epoch() { + let config = super::Config { + epoch_stake_distribution_stabilization: 3, + epoch_period_nonce_buffer: 3, + epoch_period_nonce_stabilization: 4, + consensus_config: cryptarchia_engine::Config { + security_param: 5, + active_slot_coeff: 0.5, + }, + }; + assert_eq!(config.epoch(1.into()), 0.into()); + assert_eq!(config.epoch(100.into()), 1.into()); + assert_eq!(config.epoch(101.into()), 1.into()); + assert_eq!(config.epoch(200.into()), 2.into()); + } +} diff --git a/ledger/cryptarchia-ledger/src/lib.rs b/ledger/cryptarchia-ledger/src/lib.rs index 105f239e..32f1e0e9 100644 --- a/ledger/cryptarchia-ledger/src/lib.rs +++ b/ledger/cryptarchia-ledger/src/lib.rs @@ -1,3 +1,4 @@ +mod coin; mod config; mod crypto; mod leader_proof; @@ -12,6 +13,7 @@ use thiserror::Error; type HashTrieSet = rpds::HashTrieSetSync; +pub use coin::{Coin, Value}; pub use config::Config; pub use leader_proof::*; pub use nonce::*; @@ -42,6 +44,7 @@ pub struct EpochState { // stake distribution snapshot taken at the beginning of the epoch // (in practice, this is equivalent to the coins the are spendable at the beginning of the epoch) commitments: HashTrieSet, + total_stake: Value, } impl EpochState { @@ -63,12 +66,17 @@ impl EpochState { epoch: self.epoch, nonce, commitments, + total_stake: self.total_stake, } } fn is_eligible_leader(&self, commitment: &Commitment) -> bool { self.commitments.contains(commitment) } + + pub fn epoch(&self) -> Epoch { + self.epoch + } } #[derive(Clone, Debug, PartialEq)] @@ -131,6 +139,10 @@ where pub fn state(&self, id: &Id) -> Option<&LedgerState> { self.states.get(id) } + + pub fn config(&self) -> &Config { + &self.config + } } #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] @@ -170,6 +182,8 @@ impl LedgerState { }); } + // TODO: update once supply can change + let total_stake = self.epoch_state.total_stake; let current_epoch = config.epoch(self.slot); let new_epoch = config.epoch(slot); @@ -198,6 +212,7 @@ impl LedgerState { epoch: new_epoch + 1, nonce: self.nonce, commitments: self.spend_commitments.clone(), + total_stake, }; Ok(Self { slot, @@ -211,11 +226,13 @@ impl LedgerState { epoch: new_epoch, nonce: self.nonce, commitments: self.spend_commitments.clone(), + total_stake, }; let next_epoch_state = EpochState { epoch: new_epoch + 1, nonce: self.nonce, commitments: self.spend_commitments.clone(), + total_stake, }; Ok(Self { slot, @@ -318,14 +335,28 @@ impl LedgerState { epoch: 1.into(), nonce: [0; 32].into(), commitments: Default::default(), + total_stake: 1.into(), }, epoch_state: EpochState { epoch: 0.into(), nonce: [0; 32].into(), commitments: Default::default(), + total_stake: 1.into(), }, } } + + pub fn slot(&self) -> Slot { + self.slot + } + + pub fn epoch_state(&self) -> &EpochState { + &self.epoch_state + } + + pub fn next_epoch_state(&self) -> &EpochState { + &self.next_epoch_state + } } impl core::fmt::Debug for LedgerState { @@ -348,14 +379,19 @@ impl core::fmt::Debug for LedgerState { #[cfg(test)] pub mod tests { - use super::{EpochState, Ledger, LedgerState}; - use crate::{crypto::Blake2b, Commitment, Config, LeaderProof, LedgerError, Nullifier}; + use super::{Coin, EpochState, Ledger, LedgerState}; + use crate::{crypto::Blake2b, Commitment, Config, LedgerError}; use blake2::Digest; use cryptarchia_engine::Slot; - use std::hash::{DefaultHasher, Hash, Hasher}; type HeaderId = [u8; 32]; + fn coin(id: u64) -> Coin { + let mut sk = [0; 32]; + sk[..8].copy_from_slice(&id.to_be_bytes()); + Coin::new(sk, [0; 32].into(), 1.into()) + } + fn update_ledger( ledger: &mut Ledger, parent: HeaderId, @@ -369,8 +405,7 @@ pub mod tests { Blake2b::new() .chain_update(parent) .chain_update(slot.into().to_be_bytes()) - .chain_update(coin.sk.to_be_bytes()) - .chain_update(coin.nonce.to_be_bytes()) + .chain_update(coin.vrf([0; 32].into(), 0.into())) .finalize() .into() } @@ -408,54 +443,6 @@ pub mod tests { } } - #[derive(Debug, Clone, Copy)] - pub struct Coin { - sk: u64, - nonce: u64, - } - - impl Coin { - pub fn new(sk: u64) -> Self { - Self { sk, nonce: 0 } - } - - pub fn commitment(&self) -> Commitment { - <[u8; 32]>::from( - Blake2b::new_with_prefix("commitment".as_bytes()) - .chain_update(self.sk.to_be_bytes()) - .chain_update(self.nonce.to_be_bytes()) - .finalize(), - ) - .into() - } - - pub fn nullifier(&self) -> Nullifier { - <[u8; 32]>::from( - Blake2b::new_with_prefix("nullifier".as_bytes()) - .chain_update(self.sk.to_be_bytes()) - .chain_update(self.nonce.to_be_bytes()) - .finalize(), - ) - .into() - } - - pub fn evolve(&self) -> Self { - let mut h = DefaultHasher::new(); - self.nonce.hash(&mut h); - let nonce = h.finish(); - Self { sk: self.sk, nonce } - } - - pub fn to_proof(&self, slot: Slot) -> LeaderProof { - LeaderProof::new( - self.commitment(), - self.nullifier(), - slot, - self.evolve().commitment(), - ) - } - } - pub fn genesis_state(commitments: &[Commitment]) -> LedgerState { LedgerState { lead_commitments: commitments.iter().cloned().collect(), @@ -467,11 +454,13 @@ pub mod tests { epoch: 1.into(), nonce: [0; 32].into(), commitments: commitments.iter().cloned().collect(), + total_stake: 1.into(), }, epoch_state: EpochState { epoch: 0.into(), nonce: [0; 32].into(), commitments: commitments.iter().cloned().collect(), + total_stake: 1.into(), }, } } @@ -503,7 +492,7 @@ pub mod tests { #[test] fn test_ledger_state_prevents_coin_reuse() { - let coin = Coin::new(0); + let coin = coin(0); let (mut ledger, genesis) = ledger(&[coin.commitment()]); let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); @@ -517,7 +506,7 @@ pub mod tests { #[test] fn test_ledger_state_uncommited_coin() { - let coin = Coin::new(0); + let coin = coin(0); let (mut ledger, genesis) = ledger(&[]); assert!(matches!( update_ledger(&mut ledger, genesis, 1, coin), @@ -527,9 +516,9 @@ pub mod tests { #[test] fn test_ledger_state_is_properly_updated_on_reorg() { - let coin_1 = Coin::new(0); - let coin_2 = Coin::new(1); - let coin_3 = Coin::new(2); + let coin_1 = coin(0); + let coin_2 = coin(1); + let coin_3 = coin(2); let (mut ledger, genesis) = ledger(&[ coin_1.commitment(), @@ -550,9 +539,9 @@ pub mod tests { #[test] fn test_epoch_transition() { - let coins = (0..4).map(Coin::new).collect::>(); - let coin_4 = Coin::new(4); - let coin_5 = Coin::new(5); + let coins = (0..4).map(coin).collect::>(); + let coin_4 = coin(4); + let coin_5 = coin(5); let (mut ledger, genesis) = ledger(&coins.iter().map(|c| c.commitment()).collect::>()); @@ -597,7 +586,7 @@ pub mod tests { #[test] fn test_evolved_coin_is_eligible_for_leadership() { - let coin = Coin::new(0); + let coin = coin(0); let (mut ledger, genesis) = ledger(&[coin.commitment()]); let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); @@ -620,8 +609,9 @@ pub mod tests { #[test] fn test_new_coins_becoming_eligible_after_stake_distribution_stabilizes() { - let coin = Coin::new(0); - let coin_1 = Coin::new(1); + let coin_1 = coin(1); + let coin = coin(0); + let (mut ledger, genesis) = ledger(&[coin.commitment()]); // EPOCH 0 @@ -657,7 +647,7 @@ pub mod tests { #[test] fn test_orphan_proof_import() { - let coin = Coin::new(0); + let coin = coin(0); let (mut ledger, genesis) = ledger(&[coin.commitment()]); let coin_new = coin.evolve(); diff --git a/ledger/cryptarchia-ledger/src/nonce.rs b/ledger/cryptarchia-ledger/src/nonce.rs index 6dd32e21..d7ac9b6d 100644 --- a/ledger/cryptarchia-ledger/src/nonce.rs +++ b/ledger/cryptarchia-ledger/src/nonce.rs @@ -14,4 +14,9 @@ impl From for [u8; 32] { } } +impl AsRef<[u8]> for Nonce { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} serialize_bytes_newtype!(Nonce); diff --git a/nomos-core/src/crypto.rs b/nomos-core/src/crypto.rs index 4c38c383..c1775e4b 100644 --- a/nomos-core/src/crypto.rs +++ b/nomos-core/src/crypto.rs @@ -4,4 +4,4 @@ pub type PublicKey = [u8; 32]; pub type PrivateKey = [u8; 32]; pub type Signature = [u8; 32]; -pub(crate) type Blake2b = blake2::Blake2b; +pub type Blake2b = blake2::Blake2b; diff --git a/nomos-core/src/header/cryptarchia.rs b/nomos-core/src/header/cryptarchia.rs index 2dc3a042..ba2f0144 100644 --- a/nomos-core/src/header/cryptarchia.rs +++ b/nomos-core/src/header/cryptarchia.rs @@ -91,10 +91,10 @@ pub struct Builder { } impl Builder { - pub fn new(parent: HeaderId, slot: Slot, leader_proof: LeaderProof) -> Self { + pub fn new(parent: HeaderId, leader_proof: LeaderProof) -> Self { Self { parent, - slot, + slot: leader_proof.slot(), leader_proof, orphaned_leader_proofs: vec![], } diff --git a/nomos-services/cryptarchia-consensus/Cargo.toml b/nomos-services/cryptarchia-consensus/Cargo.toml index 25aec05a..e52bdb73 100644 --- a/nomos-services/cryptarchia-consensus/Cargo.toml +++ b/nomos-services/cryptarchia-consensus/Cargo.toml @@ -29,6 +29,7 @@ bls-signatures = "0.14" serde_with = "3.0.0" nomos-libp2p = { path = "../../nomos-libp2p", optional = true } blake2 = "0.10" +time = { version = "0.3", features = ["serde"] } utoipa = { version = "4.0", optional = true } serde_json = { version = "1", optional = true } diff --git a/nomos-services/cryptarchia-consensus/src/leadership.rs b/nomos-services/cryptarchia-consensus/src/leadership.rs index 8b137891..cf4ec2c2 100644 --- a/nomos-services/cryptarchia-consensus/src/leadership.rs +++ b/nomos-services/cryptarchia-consensus/src/leadership.rs @@ -1 +1,22 @@ +use cryptarchia_engine::Slot; +use cryptarchia_ledger::{Coin, Config, EpochState, LeaderProof}; +pub struct Leader { + coins: Vec, + config: cryptarchia_ledger::Config, +} + +impl Leader { + pub fn new(coins: Vec, config: Config) -> Self { + Leader { coins, config } + } + + pub fn build_proof_for(&mut self, epoch_state: &EpochState, slot: Slot) -> Option { + for coin in &self.coins { + if coin.is_slot_leader(epoch_state, slot, &self.config.consensus_config) { + return Some(coin.to_proof(slot)); + } + } + None + } +} diff --git a/nomos-services/cryptarchia-consensus/src/lib.rs b/nomos-services/cryptarchia-consensus/src/lib.rs index db430a96..35b4ba80 100644 --- a/nomos-services/cryptarchia-consensus/src/lib.rs +++ b/nomos-services/cryptarchia-consensus/src/lib.rs @@ -1,13 +1,19 @@ mod leadership; pub mod network; +mod time; + use core::fmt::Debug; -use cryptarchia_ledger::LedgerState; +use cryptarchia_engine::Slot; +use cryptarchia_ledger::{LeaderProof, LedgerState}; use futures::StreamExt; -use network::NetworkAdapter; -use nomos_core::block::Block; +use network::{messages::NetworkMessage, NetworkAdapter}; use nomos_core::da::certificate::{BlobCertificateSelect, Certificate}; -use nomos_core::header::{cryptarchia, HeaderId}; +use nomos_core::header::{cryptarchia::Header, HeaderId}; use nomos_core::tx::{Transaction, TxSelect}; +use nomos_core::{ + block::{builder::BlockBuilder, Block}, + header::cryptarchia::Builder, +}; use nomos_mempool::{ backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant, MempoolMsg, MempoolService, Transaction as TxDiscriminant, @@ -26,6 +32,7 @@ use serde_with::serde_as; use std::hash::Hash; use thiserror::Error; use tokio::sync::oneshot::Sender; +use tokio_stream::wrappers::IntervalStream; use tracing::{error, instrument}; #[derive(Debug, Clone, Error)] @@ -46,7 +53,7 @@ impl Cryptarchia { self.consensus.tip() } - fn try_apply_header(&self, header: &cryptarchia::Header) -> Result { + fn try_apply_header(&self, header: &Header) -> Result { let id = header.id(); let parent = header.parent(); let slot = header.slot(); @@ -64,6 +71,19 @@ impl Cryptarchia { Ok(Self { ledger, consensus }) } + + fn epoch_state_for_slot(&self, slot: Slot) -> Option<&cryptarchia_ledger::EpochState> { + let tip = self.tip(); + let state = self.ledger.state(&tip).expect("no state for tip"); + let requested_epoch = self.ledger.config().epoch(slot); + if state.epoch_state().epoch() == requested_epoch { + Some(state.epoch_state()) + } else if requested_epoch == state.next_epoch_state().epoch() { + Some(state.next_epoch_state()) + } else { + None + } + } } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -72,9 +92,9 @@ pub struct CryptarchiaSettings { pub transaction_selector_settings: Ts, #[serde(default)] pub blob_selector_settings: Bs, - pub consensus_config: cryptarchia_engine::Config, - pub ledger_config: cryptarchia_ledger::Config, + pub config: cryptarchia_ledger::Config, pub genesis_state: LedgerState, + pub time: time::Config, } impl CryptarchiaSettings { @@ -82,16 +102,16 @@ impl CryptarchiaSettings { pub const fn new( transaction_selector_settings: Ts, blob_selector_settings: Bs, - consensus_config: cryptarchia_engine::Config, - ledger_config: cryptarchia_ledger::Config, + config: cryptarchia_ledger::Config, genesis_state: LedgerState, + time: time::Config, ) -> Self { Self { transaction_selector_settings, blob_selector_settings, - consensus_config, - ledger_config, + config, genesis_state, + time, } } } @@ -228,27 +248,34 @@ where .expect("Relay connection with StorageService should succeed"); let CryptarchiaSettings { - consensus_config, - ledger_config, + config, genesis_state, - .. + transaction_selector_settings, + blob_selector_settings, + time, } = self.service_state.settings_reader.get_updated_settings(); let genesis_id = HeaderId::from([0; 32]); let mut cryptarchia = Cryptarchia { + consensus: >::from_genesis( + genesis_id, + config.consensus_config.clone(), + ), ledger: >::from_genesis( genesis_id, genesis_state, - ledger_config, - ), - consensus: >::from_genesis( - genesis_id, - consensus_config, + config.clone(), ), }; let adapter = A::new(network_relay).await; + let tx_selector = TxS::new(transaction_selector_settings); + let blob_selector = BS::new(blob_selector_settings); let mut incoming_blocks = adapter.blocks_stream().await; + let mut leader = leadership::Leader::new(vec![], config); + let timer = time::Timer::new(time); + + let mut slot_timer = IntervalStream::new(timer.slot_interval()); let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream(); loop { @@ -261,7 +288,32 @@ where cl_mempool_relay.clone(), da_mempool_relay.clone(), ) - .await + .await; + } + + _ = slot_timer.next() => { + let slot = timer.current_slot(); + let parent = cryptarchia.tip(); + + let Some(epoch_state) = cryptarchia.epoch_state_for_slot(slot) else { + tracing::error!("trying to propose a block for slot {} but epoch state is not available", u64::from(slot)); + continue; + }; + if let Some(proof) = leader.build_proof_for(epoch_state, slot) { + // TODO: spawn as a separate task? + let block = Self::propose_block( + parent, + proof, + tx_selector.clone(), + blob_selector.clone(), + cl_mempool_relay.clone(), + da_mempool_relay.clone(), + ).await; + + if let Some(block) = block { + let _ = adapter.broadcast(NetworkMessage::Block(block)).await; + } + } } Some(msg) = self.service_state.inbound_relay.next() => { @@ -356,6 +408,8 @@ where ) -> Cryptarchia { tracing::debug!("received proposal {:?}", block); + // TODO: filter on time? + let header = block.header(); let id = header.id(); match cryptarchia.try_apply_header(block.header().cryptarchia()) { @@ -387,6 +441,40 @@ where cryptarchia } + + #[instrument( + level = "debug", + skip(cl_mempool_relay, da_mempool_relay, tx_selector, blob_selector) + )] + async fn propose_block( + parent: HeaderId, + proof: LeaderProof, + tx_selector: TxS, + blob_selector: BS, + cl_mempool_relay: OutboundRelay>, + da_mempool_relay: OutboundRelay>, + ) -> Option> { + let mut output = None; + let cl_txs = get_mempool_contents(cl_mempool_relay); + let da_certs = get_mempool_contents(da_mempool_relay); + + match futures::join!(cl_txs, da_certs) { + (Ok(cl_txs), Ok(da_certs)) => { + let Ok(block) = BlockBuilder::new(tx_selector, blob_selector) + .with_cryptarchia_builder(Builder::new(parent, proof)) + .with_transactions(cl_txs) + .with_blobs_certificates(da_certs) + .build() + else { + panic!("Proposal block should always succeed to be built") + }; + output = Some(block); + } + (Err(_), _) => tracing::error!("Could not fetch block cl transactions"), + (_, Err(_)) => tracing::error!("Could not fetch block da certificates"), + } + output + } } #[derive(Debug)] @@ -403,6 +491,22 @@ pub struct CryptarchiaInfo { pub tip: HeaderId, } +async fn get_mempool_contents( + mempool: OutboundRelay>, +) -> Result + Send>, tokio::sync::oneshot::error::RecvError> { + let (reply_channel, rx) = tokio::sync::oneshot::channel(); + + mempool + .send(MempoolMsg::View { + ancestor_hint: [0; 32].into(), + reply_channel, + }) + .await + .unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}")); + + rx.await +} + async fn mark_in_block( mempool: OutboundRelay>, ids: impl Iterator, diff --git a/nomos-services/cryptarchia-consensus/src/time.rs b/nomos-services/cryptarchia-consensus/src/time.rs new file mode 100644 index 00000000..4dd700b3 --- /dev/null +++ b/nomos-services/cryptarchia-consensus/src/time.rs @@ -0,0 +1,50 @@ +use cryptarchia_engine::Slot; +use serde::{Deserialize, Serialize}; +use std::time::Duration; +use time::OffsetDateTime; +use tokio::time::{Interval, MissedTickBehavior}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Config { + pub slot_duration: Duration, + /// Start of the first epoch + pub chain_start_time: OffsetDateTime, +} + +#[derive(Clone, Debug)] +pub struct Timer { + config: Config, +} + +impl Timer { + pub fn new(config: Config) -> Self { + Timer { config } + } + + pub fn current_slot(&self) -> Slot { + // TODO: leap seconds / weird time stuff + let since_start = OffsetDateTime::now_utc() - self.config.chain_start_time; + if since_start.is_negative() { + tracing::warn!("Current slot is before the start of the chain"); + Slot::genesis() + } else { + Slot::from(since_start.whole_seconds() as u64 / self.config.slot_duration.as_secs()) + } + } + + /// Ticks at the start of each slot, starting from the next slot + pub fn slot_interval(&self) -> Interval { + let slot_duration = self.config.slot_duration; + let now = OffsetDateTime::now_utc(); + let next_slot_start = self.config.chain_start_time + + slot_duration * u64::from(self.current_slot() + 1) as u32; + let delay = next_slot_start - now; + let mut interval = tokio::time::interval_at( + tokio::time::Instant::now() + + Duration::try_from(delay).expect("could not set slot timer duration"), + slot_duration, + ); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + interval + } +}