add cryptarchia leadership (#613)

This commit is contained in:
Giacomo Pasini 2024-03-18 16:04:16 +01:00 committed by GitHub
parent 1b925d9a3a
commit a933c73245
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 456 additions and 90 deletions

View File

@ -0,0 +1,157 @@
use crate::{crypto::Blake2b, Commitment, EpochState, LeaderProof, Nonce, Nullifier};
use blake2::digest::Digest;
use cryptarchia_engine::config::Config;
use cryptarchia_engine::Slot;
type SecretKey = [u8; 32];
type PublicKey = [u8; 32];
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[derive(Clone, Debug, Eq, PartialEq, Copy, Hash)]
pub struct Value(u32);
impl From<u32> for Value {
fn from(value: u32) -> Self {
Self(value)
}
}
// This implementatio is only a stub
// see https://github.com/logos-co/nomos-specs/blob/master/cryptarchia/cryptarchia.py for a spec
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
pub struct Coin {
sk: SecretKey,
nonce: Nonce,
value: Value,
}
impl Coin {
pub fn new(sk: SecretKey, nonce: Nonce, value: Value) -> Self {
Self { sk, nonce, value }
}
pub fn evolve(&self) -> Self {
let mut h = Blake2b::new();
h.update("coin-evolve");
h.update(self.sk);
h.update(self.nonce);
Self {
sk: self.sk,
nonce: <[u8; 32]>::from(h.finalize()).into(),
value: self.value,
}
}
fn value_bytes(&self) -> [u8; 32] {
let mut value_bytes = [0; 32];
value_bytes[28..].copy_from_slice(&self.value.0.to_be_bytes());
value_bytes
}
pub fn pk(&self) -> &PublicKey {
&self.sk
}
pub fn commitment(&self) -> Commitment {
let mut h = Blake2b::new();
h.update("coin-commitment");
h.update(self.nonce);
h.update(self.pk());
h.update(self.value_bytes());
<[u8; 32]>::from(h.finalize()).into()
}
pub fn nullifier(&self) -> Nullifier {
let mut h = Blake2b::new();
h.update("coin-nullifier");
h.update(self.nonce);
h.update(self.pk());
h.update(self.value_bytes());
<[u8; 32]>::from(h.finalize()).into()
}
// TODO: better precision
fn phi(active_slot_coeff: f64, relative_stake: f64) -> f64 {
1.0 - (1.0 - active_slot_coeff).powf(relative_stake)
}
pub fn vrf(&self, epoch_nonce: Nonce, slot: Slot) -> [u8; 32] {
let mut h = Blake2b::new();
h.update("lead");
h.update(epoch_nonce);
h.update(u64::from(slot).to_be_bytes());
h.update(self.sk);
h.update(self.nonce);
h.finalize().into()
}
pub fn is_slot_leader(&self, epoch: &EpochState, slot: Slot, config: &Config) -> bool {
// TODO: check slot and epoch state are consistent
let relative_stake = f64::from(self.value.0) / f64::from(epoch.total_stake.0);
let vrf = self.vrf(epoch.nonce, slot);
// HACK: we artificially restrict the VRF output so that it fits into a u32
// this is not going to be the final implementation anyway and the last 4 bytes
// should have a similar (statistical) distribution as the full output
let vrf = u32::from_be_bytes(vrf[28..].try_into().unwrap());
vrf < (f64::from(u32::MAX) * Self::phi(config.active_slot_coeff, relative_stake)) as u32
}
pub fn to_proof(&self, slot: Slot) -> LeaderProof {
LeaderProof::new(
self.commitment(),
self.nullifier(),
slot,
self.evolve().commitment(),
)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_slot_leader_statistics() {
let epoch = EpochState {
epoch: 0.into(),
nonce: [1; 32].into(),
commitments: Default::default(),
total_stake: 1000.into(),
};
let coin = Coin {
sk: [0; 32],
nonce: [0; 32].into(),
value: 10.into(),
};
let config = Config {
active_slot_coeff: 0.05,
security_param: 1,
};
// We'll use the Margin of Error equation to decide how many samples we need.
// https://en.wikipedia.org/wiki/Margin_of_error
let margin_of_error = 1e-4;
let p = Coin::phi(config.active_slot_coeff, 10.0 / 1000.0);
let std = (p * (1.0 - p)).sqrt();
let z = 3.0; // we want 3 std from the mean to be within the margin of error
let n = (z * std / margin_of_error).powi(2).ceil();
// After N slots, the measured leader rate should be within the
// interval `p +- margin_of_error` with high probabiltiy
let leader_rate = (0..n as u64)
.map(|slot| coin.is_slot_leader(&epoch, slot.into(), &config))
.filter(|x| *x)
.count();
println!("leader_rate: {n} {} / {}", leader_rate, p);
assert!(
(leader_rate as f64 / n - p).abs() < margin_of_error,
"{leader_rate} != {p}, err={} > {margin_of_error}",
(leader_rate as f64 / n - p).abs()
);
}
}

View File

@ -31,15 +31,53 @@ impl Config {
pub fn nonce_snapshot(&self, epoch: Epoch) -> Slot { pub fn nonce_snapshot(&self, epoch: Epoch) -> Slot {
let offset = self.base_period_length() let offset = self.base_period_length()
* (self.epoch_period_nonce_buffer + self.epoch_stake_distribution_stabilization) as u64; * (self.epoch_period_nonce_buffer + self.epoch_stake_distribution_stabilization) as u64;
let base = u32::from(epoch) as u64 * self.epoch_length(); let base = (u32::from(epoch) - 1) as u64 * self.epoch_length();
(base + offset).into() (base + offset).into()
} }
pub fn stake_distribution_snapshot(&self, epoch: Epoch) -> Slot { pub fn stake_distribution_snapshot(&self, epoch: Epoch) -> Slot {
(u32::from(epoch) as u64 * self.epoch_length()).into() ((u32::from(epoch) - 1) as u64 * self.epoch_length()).into()
} }
pub fn epoch(&self, slot: Slot) -> Epoch { pub fn epoch(&self, slot: Slot) -> Epoch {
((u64::from(slot) / self.epoch_length()) as u32).into() ((u64::from(slot) / self.epoch_length()) as u32).into()
} }
} }
#[cfg(test)]
mod tests {
#[test]
fn epoch_snapshots() {
let config = super::Config {
epoch_stake_distribution_stabilization: 3,
epoch_period_nonce_buffer: 3,
epoch_period_nonce_stabilization: 4,
consensus_config: cryptarchia_engine::Config {
security_param: 5,
active_slot_coeff: 0.5,
},
};
assert_eq!(config.epoch_length(), 100);
assert_eq!(config.nonce_snapshot(1.into()), 60.into());
assert_eq!(config.nonce_snapshot(2.into()), 160.into());
assert_eq!(config.stake_distribution_snapshot(1.into()), 0.into());
assert_eq!(config.stake_distribution_snapshot(2.into()), 100.into());
}
#[test]
fn slot_to_epoch() {
let config = super::Config {
epoch_stake_distribution_stabilization: 3,
epoch_period_nonce_buffer: 3,
epoch_period_nonce_stabilization: 4,
consensus_config: cryptarchia_engine::Config {
security_param: 5,
active_slot_coeff: 0.5,
},
};
assert_eq!(config.epoch(1.into()), 0.into());
assert_eq!(config.epoch(100.into()), 1.into());
assert_eq!(config.epoch(101.into()), 1.into());
assert_eq!(config.epoch(200.into()), 2.into());
}
}

View File

@ -1,3 +1,4 @@
mod coin;
mod config; mod config;
mod crypto; mod crypto;
mod leader_proof; mod leader_proof;
@ -12,6 +13,7 @@ use thiserror::Error;
type HashTrieSet<T> = rpds::HashTrieSetSync<T>; type HashTrieSet<T> = rpds::HashTrieSetSync<T>;
pub use coin::{Coin, Value};
pub use config::Config; pub use config::Config;
pub use leader_proof::*; pub use leader_proof::*;
pub use nonce::*; pub use nonce::*;
@ -42,6 +44,7 @@ pub struct EpochState {
// stake distribution snapshot taken at the beginning of the epoch // stake distribution snapshot taken at the beginning of the epoch
// (in practice, this is equivalent to the coins the are spendable at the beginning of the epoch) // (in practice, this is equivalent to the coins the are spendable at the beginning of the epoch)
commitments: HashTrieSet<Commitment>, commitments: HashTrieSet<Commitment>,
total_stake: Value,
} }
impl EpochState { impl EpochState {
@ -63,12 +66,17 @@ impl EpochState {
epoch: self.epoch, epoch: self.epoch,
nonce, nonce,
commitments, commitments,
total_stake: self.total_stake,
} }
} }
fn is_eligible_leader(&self, commitment: &Commitment) -> bool { fn is_eligible_leader(&self, commitment: &Commitment) -> bool {
self.commitments.contains(commitment) self.commitments.contains(commitment)
} }
pub fn epoch(&self) -> Epoch {
self.epoch
}
} }
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
@ -131,6 +139,10 @@ where
pub fn state(&self, id: &Id) -> Option<&LedgerState> { pub fn state(&self, id: &Id) -> Option<&LedgerState> {
self.states.get(id) self.states.get(id)
} }
pub fn config(&self) -> &Config {
&self.config
}
} }
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
@ -170,6 +182,8 @@ impl LedgerState {
}); });
} }
// TODO: update once supply can change
let total_stake = self.epoch_state.total_stake;
let current_epoch = config.epoch(self.slot); let current_epoch = config.epoch(self.slot);
let new_epoch = config.epoch(slot); let new_epoch = config.epoch(slot);
@ -198,6 +212,7 @@ impl LedgerState {
epoch: new_epoch + 1, epoch: new_epoch + 1,
nonce: self.nonce, nonce: self.nonce,
commitments: self.spend_commitments.clone(), commitments: self.spend_commitments.clone(),
total_stake,
}; };
Ok(Self { Ok(Self {
slot, slot,
@ -211,11 +226,13 @@ impl LedgerState {
epoch: new_epoch, epoch: new_epoch,
nonce: self.nonce, nonce: self.nonce,
commitments: self.spend_commitments.clone(), commitments: self.spend_commitments.clone(),
total_stake,
}; };
let next_epoch_state = EpochState { let next_epoch_state = EpochState {
epoch: new_epoch + 1, epoch: new_epoch + 1,
nonce: self.nonce, nonce: self.nonce,
commitments: self.spend_commitments.clone(), commitments: self.spend_commitments.clone(),
total_stake,
}; };
Ok(Self { Ok(Self {
slot, slot,
@ -318,14 +335,28 @@ impl LedgerState {
epoch: 1.into(), epoch: 1.into(),
nonce: [0; 32].into(), nonce: [0; 32].into(),
commitments: Default::default(), commitments: Default::default(),
total_stake: 1.into(),
}, },
epoch_state: EpochState { epoch_state: EpochState {
epoch: 0.into(), epoch: 0.into(),
nonce: [0; 32].into(), nonce: [0; 32].into(),
commitments: Default::default(), commitments: Default::default(),
total_stake: 1.into(),
}, },
} }
} }
pub fn slot(&self) -> Slot {
self.slot
}
pub fn epoch_state(&self) -> &EpochState {
&self.epoch_state
}
pub fn next_epoch_state(&self) -> &EpochState {
&self.next_epoch_state
}
} }
impl core::fmt::Debug for LedgerState { impl core::fmt::Debug for LedgerState {
@ -348,14 +379,19 @@ impl core::fmt::Debug for LedgerState {
#[cfg(test)] #[cfg(test)]
pub mod tests { pub mod tests {
use super::{EpochState, Ledger, LedgerState}; use super::{Coin, EpochState, Ledger, LedgerState};
use crate::{crypto::Blake2b, Commitment, Config, LeaderProof, LedgerError, Nullifier}; use crate::{crypto::Blake2b, Commitment, Config, LedgerError};
use blake2::Digest; use blake2::Digest;
use cryptarchia_engine::Slot; use cryptarchia_engine::Slot;
use std::hash::{DefaultHasher, Hash, Hasher};
type HeaderId = [u8; 32]; type HeaderId = [u8; 32];
fn coin(id: u64) -> Coin {
let mut sk = [0; 32];
sk[..8].copy_from_slice(&id.to_be_bytes());
Coin::new(sk, [0; 32].into(), 1.into())
}
fn update_ledger( fn update_ledger(
ledger: &mut Ledger<HeaderId>, ledger: &mut Ledger<HeaderId>,
parent: HeaderId, parent: HeaderId,
@ -369,8 +405,7 @@ pub mod tests {
Blake2b::new() Blake2b::new()
.chain_update(parent) .chain_update(parent)
.chain_update(slot.into().to_be_bytes()) .chain_update(slot.into().to_be_bytes())
.chain_update(coin.sk.to_be_bytes()) .chain_update(coin.vrf([0; 32].into(), 0.into()))
.chain_update(coin.nonce.to_be_bytes())
.finalize() .finalize()
.into() .into()
} }
@ -408,54 +443,6 @@ pub mod tests {
} }
} }
#[derive(Debug, Clone, Copy)]
pub struct Coin {
sk: u64,
nonce: u64,
}
impl Coin {
pub fn new(sk: u64) -> Self {
Self { sk, nonce: 0 }
}
pub fn commitment(&self) -> Commitment {
<[u8; 32]>::from(
Blake2b::new_with_prefix("commitment".as_bytes())
.chain_update(self.sk.to_be_bytes())
.chain_update(self.nonce.to_be_bytes())
.finalize(),
)
.into()
}
pub fn nullifier(&self) -> Nullifier {
<[u8; 32]>::from(
Blake2b::new_with_prefix("nullifier".as_bytes())
.chain_update(self.sk.to_be_bytes())
.chain_update(self.nonce.to_be_bytes())
.finalize(),
)
.into()
}
pub fn evolve(&self) -> Self {
let mut h = DefaultHasher::new();
self.nonce.hash(&mut h);
let nonce = h.finish();
Self { sk: self.sk, nonce }
}
pub fn to_proof(&self, slot: Slot) -> LeaderProof {
LeaderProof::new(
self.commitment(),
self.nullifier(),
slot,
self.evolve().commitment(),
)
}
}
pub fn genesis_state(commitments: &[Commitment]) -> LedgerState { pub fn genesis_state(commitments: &[Commitment]) -> LedgerState {
LedgerState { LedgerState {
lead_commitments: commitments.iter().cloned().collect(), lead_commitments: commitments.iter().cloned().collect(),
@ -467,11 +454,13 @@ pub mod tests {
epoch: 1.into(), epoch: 1.into(),
nonce: [0; 32].into(), nonce: [0; 32].into(),
commitments: commitments.iter().cloned().collect(), commitments: commitments.iter().cloned().collect(),
total_stake: 1.into(),
}, },
epoch_state: EpochState { epoch_state: EpochState {
epoch: 0.into(), epoch: 0.into(),
nonce: [0; 32].into(), nonce: [0; 32].into(),
commitments: commitments.iter().cloned().collect(), commitments: commitments.iter().cloned().collect(),
total_stake: 1.into(),
}, },
} }
} }
@ -503,7 +492,7 @@ pub mod tests {
#[test] #[test]
fn test_ledger_state_prevents_coin_reuse() { fn test_ledger_state_prevents_coin_reuse() {
let coin = Coin::new(0); let coin = coin(0);
let (mut ledger, genesis) = ledger(&[coin.commitment()]); let (mut ledger, genesis) = ledger(&[coin.commitment()]);
let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap();
@ -517,7 +506,7 @@ pub mod tests {
#[test] #[test]
fn test_ledger_state_uncommited_coin() { fn test_ledger_state_uncommited_coin() {
let coin = Coin::new(0); let coin = coin(0);
let (mut ledger, genesis) = ledger(&[]); let (mut ledger, genesis) = ledger(&[]);
assert!(matches!( assert!(matches!(
update_ledger(&mut ledger, genesis, 1, coin), update_ledger(&mut ledger, genesis, 1, coin),
@ -527,9 +516,9 @@ pub mod tests {
#[test] #[test]
fn test_ledger_state_is_properly_updated_on_reorg() { fn test_ledger_state_is_properly_updated_on_reorg() {
let coin_1 = Coin::new(0); let coin_1 = coin(0);
let coin_2 = Coin::new(1); let coin_2 = coin(1);
let coin_3 = Coin::new(2); let coin_3 = coin(2);
let (mut ledger, genesis) = ledger(&[ let (mut ledger, genesis) = ledger(&[
coin_1.commitment(), coin_1.commitment(),
@ -550,9 +539,9 @@ pub mod tests {
#[test] #[test]
fn test_epoch_transition() { fn test_epoch_transition() {
let coins = (0..4).map(Coin::new).collect::<Vec<_>>(); let coins = (0..4).map(coin).collect::<Vec<_>>();
let coin_4 = Coin::new(4); let coin_4 = coin(4);
let coin_5 = Coin::new(5); let coin_5 = coin(5);
let (mut ledger, genesis) = let (mut ledger, genesis) =
ledger(&coins.iter().map(|c| c.commitment()).collect::<Vec<_>>()); ledger(&coins.iter().map(|c| c.commitment()).collect::<Vec<_>>());
@ -597,7 +586,7 @@ pub mod tests {
#[test] #[test]
fn test_evolved_coin_is_eligible_for_leadership() { fn test_evolved_coin_is_eligible_for_leadership() {
let coin = Coin::new(0); let coin = coin(0);
let (mut ledger, genesis) = ledger(&[coin.commitment()]); let (mut ledger, genesis) = ledger(&[coin.commitment()]);
let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap(); let h = update_ledger(&mut ledger, genesis, 1, coin).unwrap();
@ -620,8 +609,9 @@ pub mod tests {
#[test] #[test]
fn test_new_coins_becoming_eligible_after_stake_distribution_stabilizes() { fn test_new_coins_becoming_eligible_after_stake_distribution_stabilizes() {
let coin = Coin::new(0); let coin_1 = coin(1);
let coin_1 = Coin::new(1); let coin = coin(0);
let (mut ledger, genesis) = ledger(&[coin.commitment()]); let (mut ledger, genesis) = ledger(&[coin.commitment()]);
// EPOCH 0 // EPOCH 0
@ -657,7 +647,7 @@ pub mod tests {
#[test] #[test]
fn test_orphan_proof_import() { fn test_orphan_proof_import() {
let coin = Coin::new(0); let coin = coin(0);
let (mut ledger, genesis) = ledger(&[coin.commitment()]); let (mut ledger, genesis) = ledger(&[coin.commitment()]);
let coin_new = coin.evolve(); let coin_new = coin.evolve();

View File

@ -14,4 +14,9 @@ impl From<Nonce> for [u8; 32] {
} }
} }
impl AsRef<[u8]> for Nonce {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
serialize_bytes_newtype!(Nonce); serialize_bytes_newtype!(Nonce);

View File

@ -4,4 +4,4 @@ pub type PublicKey = [u8; 32];
pub type PrivateKey = [u8; 32]; pub type PrivateKey = [u8; 32];
pub type Signature = [u8; 32]; pub type Signature = [u8; 32];
pub(crate) type Blake2b = blake2::Blake2b<U32>; pub type Blake2b = blake2::Blake2b<U32>;

View File

@ -91,10 +91,10 @@ pub struct Builder {
} }
impl Builder { impl Builder {
pub fn new(parent: HeaderId, slot: Slot, leader_proof: LeaderProof) -> Self { pub fn new(parent: HeaderId, leader_proof: LeaderProof) -> Self {
Self { Self {
parent, parent,
slot, slot: leader_proof.slot(),
leader_proof, leader_proof,
orphaned_leader_proofs: vec![], orphaned_leader_proofs: vec![],
} }

View File

@ -29,6 +29,7 @@ bls-signatures = "0.14"
serde_with = "3.0.0" serde_with = "3.0.0"
nomos-libp2p = { path = "../../nomos-libp2p", optional = true } nomos-libp2p = { path = "../../nomos-libp2p", optional = true }
blake2 = "0.10" blake2 = "0.10"
time = { version = "0.3", features = ["serde"] }
utoipa = { version = "4.0", optional = true } utoipa = { version = "4.0", optional = true }
serde_json = { version = "1", optional = true } serde_json = { version = "1", optional = true }

View File

@ -1 +1,22 @@
use cryptarchia_engine::Slot;
use cryptarchia_ledger::{Coin, Config, EpochState, LeaderProof};
pub struct Leader {
coins: Vec<Coin>,
config: cryptarchia_ledger::Config,
}
impl Leader {
pub fn new(coins: Vec<Coin>, config: Config) -> Self {
Leader { coins, config }
}
pub fn build_proof_for(&mut self, epoch_state: &EpochState, slot: Slot) -> Option<LeaderProof> {
for coin in &self.coins {
if coin.is_slot_leader(epoch_state, slot, &self.config.consensus_config) {
return Some(coin.to_proof(slot));
}
}
None
}
}

View File

@ -1,13 +1,19 @@
mod leadership; mod leadership;
pub mod network; pub mod network;
mod time;
use core::fmt::Debug; use core::fmt::Debug;
use cryptarchia_ledger::LedgerState; use cryptarchia_engine::Slot;
use cryptarchia_ledger::{LeaderProof, LedgerState};
use futures::StreamExt; use futures::StreamExt;
use network::NetworkAdapter; use network::{messages::NetworkMessage, NetworkAdapter};
use nomos_core::block::Block;
use nomos_core::da::certificate::{BlobCertificateSelect, Certificate}; use nomos_core::da::certificate::{BlobCertificateSelect, Certificate};
use nomos_core::header::{cryptarchia, HeaderId}; use nomos_core::header::{cryptarchia::Header, HeaderId};
use nomos_core::tx::{Transaction, TxSelect}; use nomos_core::tx::{Transaction, TxSelect};
use nomos_core::{
block::{builder::BlockBuilder, Block},
header::cryptarchia::Builder,
};
use nomos_mempool::{ use nomos_mempool::{
backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant, backend::MemPool, network::NetworkAdapter as MempoolAdapter, Certificate as CertDiscriminant,
MempoolMsg, MempoolService, Transaction as TxDiscriminant, MempoolMsg, MempoolService, Transaction as TxDiscriminant,
@ -26,6 +32,7 @@ use serde_with::serde_as;
use std::hash::Hash; use std::hash::Hash;
use thiserror::Error; use thiserror::Error;
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
use tokio_stream::wrappers::IntervalStream;
use tracing::{error, instrument}; use tracing::{error, instrument};
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
@ -46,7 +53,7 @@ impl Cryptarchia {
self.consensus.tip() self.consensus.tip()
} }
fn try_apply_header(&self, header: &cryptarchia::Header) -> Result<Self, Error> { fn try_apply_header(&self, header: &Header) -> Result<Self, Error> {
let id = header.id(); let id = header.id();
let parent = header.parent(); let parent = header.parent();
let slot = header.slot(); let slot = header.slot();
@ -64,6 +71,19 @@ impl Cryptarchia {
Ok(Self { ledger, consensus }) Ok(Self { ledger, consensus })
} }
fn epoch_state_for_slot(&self, slot: Slot) -> Option<&cryptarchia_ledger::EpochState> {
let tip = self.tip();
let state = self.ledger.state(&tip).expect("no state for tip");
let requested_epoch = self.ledger.config().epoch(slot);
if state.epoch_state().epoch() == requested_epoch {
Some(state.epoch_state())
} else if requested_epoch == state.next_epoch_state().epoch() {
Some(state.next_epoch_state())
} else {
None
}
}
} }
#[derive(Debug, Deserialize, Serialize, Clone)] #[derive(Debug, Deserialize, Serialize, Clone)]
@ -72,9 +92,9 @@ pub struct CryptarchiaSettings<Ts, Bs> {
pub transaction_selector_settings: Ts, pub transaction_selector_settings: Ts,
#[serde(default)] #[serde(default)]
pub blob_selector_settings: Bs, pub blob_selector_settings: Bs,
pub consensus_config: cryptarchia_engine::Config, pub config: cryptarchia_ledger::Config,
pub ledger_config: cryptarchia_ledger::Config,
pub genesis_state: LedgerState, pub genesis_state: LedgerState,
pub time: time::Config,
} }
impl<Ts, Bs> CryptarchiaSettings<Ts, Bs> { impl<Ts, Bs> CryptarchiaSettings<Ts, Bs> {
@ -82,16 +102,16 @@ impl<Ts, Bs> CryptarchiaSettings<Ts, Bs> {
pub const fn new( pub const fn new(
transaction_selector_settings: Ts, transaction_selector_settings: Ts,
blob_selector_settings: Bs, blob_selector_settings: Bs,
consensus_config: cryptarchia_engine::Config, config: cryptarchia_ledger::Config,
ledger_config: cryptarchia_ledger::Config,
genesis_state: LedgerState, genesis_state: LedgerState,
time: time::Config,
) -> Self { ) -> Self {
Self { Self {
transaction_selector_settings, transaction_selector_settings,
blob_selector_settings, blob_selector_settings,
consensus_config, config,
ledger_config,
genesis_state, genesis_state,
time,
} }
} }
} }
@ -228,27 +248,34 @@ where
.expect("Relay connection with StorageService should succeed"); .expect("Relay connection with StorageService should succeed");
let CryptarchiaSettings { let CryptarchiaSettings {
consensus_config, config,
ledger_config,
genesis_state, genesis_state,
.. transaction_selector_settings,
blob_selector_settings,
time,
} = self.service_state.settings_reader.get_updated_settings(); } = self.service_state.settings_reader.get_updated_settings();
let genesis_id = HeaderId::from([0; 32]); let genesis_id = HeaderId::from([0; 32]);
let mut cryptarchia = Cryptarchia { let mut cryptarchia = Cryptarchia {
consensus: <cryptarchia_engine::Cryptarchia<_>>::from_genesis(
genesis_id,
config.consensus_config.clone(),
),
ledger: <cryptarchia_ledger::Ledger<_>>::from_genesis( ledger: <cryptarchia_ledger::Ledger<_>>::from_genesis(
genesis_id, genesis_id,
genesis_state, genesis_state,
ledger_config, config.clone(),
),
consensus: <cryptarchia_engine::Cryptarchia<_>>::from_genesis(
genesis_id,
consensus_config,
), ),
}; };
let adapter = A::new(network_relay).await; let adapter = A::new(network_relay).await;
let tx_selector = TxS::new(transaction_selector_settings);
let blob_selector = BS::new(blob_selector_settings);
let mut incoming_blocks = adapter.blocks_stream().await; let mut incoming_blocks = adapter.blocks_stream().await;
let mut leader = leadership::Leader::new(vec![], config);
let timer = time::Timer::new(time);
let mut slot_timer = IntervalStream::new(timer.slot_interval());
let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream(); let mut lifecycle_stream = self.service_state.lifecycle_handle.message_stream();
loop { loop {
@ -261,7 +288,32 @@ where
cl_mempool_relay.clone(), cl_mempool_relay.clone(),
da_mempool_relay.clone(), da_mempool_relay.clone(),
) )
.await .await;
}
_ = slot_timer.next() => {
let slot = timer.current_slot();
let parent = cryptarchia.tip();
let Some(epoch_state) = cryptarchia.epoch_state_for_slot(slot) else {
tracing::error!("trying to propose a block for slot {} but epoch state is not available", u64::from(slot));
continue;
};
if let Some(proof) = leader.build_proof_for(epoch_state, slot) {
// TODO: spawn as a separate task?
let block = Self::propose_block(
parent,
proof,
tx_selector.clone(),
blob_selector.clone(),
cl_mempool_relay.clone(),
da_mempool_relay.clone(),
).await;
if let Some(block) = block {
let _ = adapter.broadcast(NetworkMessage::Block(block)).await;
}
}
} }
Some(msg) = self.service_state.inbound_relay.next() => { Some(msg) = self.service_state.inbound_relay.next() => {
@ -356,6 +408,8 @@ where
) -> Cryptarchia { ) -> Cryptarchia {
tracing::debug!("received proposal {:?}", block); tracing::debug!("received proposal {:?}", block);
// TODO: filter on time?
let header = block.header(); let header = block.header();
let id = header.id(); let id = header.id();
match cryptarchia.try_apply_header(block.header().cryptarchia()) { match cryptarchia.try_apply_header(block.header().cryptarchia()) {
@ -387,6 +441,40 @@ where
cryptarchia cryptarchia
} }
#[instrument(
level = "debug",
skip(cl_mempool_relay, da_mempool_relay, tx_selector, blob_selector)
)]
async fn propose_block(
parent: HeaderId,
proof: LeaderProof,
tx_selector: TxS,
blob_selector: BS,
cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>,
) -> Option<Block<ClPool::Item, DaPool::Item>> {
let mut output = None;
let cl_txs = get_mempool_contents(cl_mempool_relay);
let da_certs = get_mempool_contents(da_mempool_relay);
match futures::join!(cl_txs, da_certs) {
(Ok(cl_txs), Ok(da_certs)) => {
let Ok(block) = BlockBuilder::new(tx_selector, blob_selector)
.with_cryptarchia_builder(Builder::new(parent, proof))
.with_transactions(cl_txs)
.with_blobs_certificates(da_certs)
.build()
else {
panic!("Proposal block should always succeed to be built")
};
output = Some(block);
}
(Err(_), _) => tracing::error!("Could not fetch block cl transactions"),
(_, Err(_)) => tracing::error!("Could not fetch block da certificates"),
}
output
}
} }
#[derive(Debug)] #[derive(Debug)]
@ -403,6 +491,22 @@ pub struct CryptarchiaInfo {
pub tip: HeaderId, pub tip: HeaderId,
} }
async fn get_mempool_contents<Item, Key>(
mempool: OutboundRelay<MempoolMsg<HeaderId, Item, Key>>,
) -> Result<Box<dyn Iterator<Item = Item> + Send>, tokio::sync::oneshot::error::RecvError> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
mempool
.send(MempoolMsg::View {
ancestor_hint: [0; 32].into(),
reply_channel,
})
.await
.unwrap_or_else(|(e, _)| eprintln!("Could not get transactions from mempool {e}"));
rx.await
}
async fn mark_in_block<Item, Key>( async fn mark_in_block<Item, Key>(
mempool: OutboundRelay<MempoolMsg<HeaderId, Item, Key>>, mempool: OutboundRelay<MempoolMsg<HeaderId, Item, Key>>,
ids: impl Iterator<Item = Key>, ids: impl Iterator<Item = Key>,

View File

@ -0,0 +1,50 @@
use cryptarchia_engine::Slot;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use time::OffsetDateTime;
use tokio::time::{Interval, MissedTickBehavior};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Config {
pub slot_duration: Duration,
/// Start of the first epoch
pub chain_start_time: OffsetDateTime,
}
#[derive(Clone, Debug)]
pub struct Timer {
config: Config,
}
impl Timer {
pub fn new(config: Config) -> Self {
Timer { config }
}
pub fn current_slot(&self) -> Slot {
// TODO: leap seconds / weird time stuff
let since_start = OffsetDateTime::now_utc() - self.config.chain_start_time;
if since_start.is_negative() {
tracing::warn!("Current slot is before the start of the chain");
Slot::genesis()
} else {
Slot::from(since_start.whole_seconds() as u64 / self.config.slot_duration.as_secs())
}
}
/// Ticks at the start of each slot, starting from the next slot
pub fn slot_interval(&self) -> Interval {
let slot_duration = self.config.slot_duration;
let now = OffsetDateTime::now_utc();
let next_slot_start = self.config.chain_start_time
+ slot_duration * u64::from(self.current_slot() + 1) as u32;
let delay = next_slot_start - now;
let mut interval = tokio::time::interval_at(
tokio::time::Instant::now()
+ Duration::try_from(delay).expect("could not set slot timer duration"),
slot_duration,
);
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
interval
}
}