Make block id generic in mempool (#605)

This commit is contained in:
Giacomo Pasini 2024-03-11 11:07:58 +01:00 committed by GitHub
parent 31c5f69121
commit 06d225db20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 78 additions and 52 deletions

View File

@ -53,7 +53,7 @@ pub struct AxumBackend<T, S, const SIZE: usize> {
da_status,
),
components(
schemas(Status, MempoolMetrics)
schemas(Status<BlockId>, MempoolMetrics)
),
tags(
(name = "da", description = "data availibility related APIs")

View File

@ -15,6 +15,7 @@ use bytes::Bytes;
use carnot_consensus::CarnotConsensus;
use nomos_api::ApiService;
use nomos_core::{
block::BlockId,
da::{blob, certificate},
tx::Transaction,
wire,
@ -58,9 +59,13 @@ const MB16: usize = 1024 * 1024 * 16;
pub type Carnot = CarnotConsensus<
ConsensusLibp2pAdapter,
MockPool<Tx, <Tx as Transaction>::Hash>,
MockPool<BlockId, Tx, <Tx as Transaction>::Hash>,
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<Certificate, <<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash>,
MockPool<
BlockId,
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
MempoolLibp2pAdapter<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
@ -77,7 +82,7 @@ pub type DataAvailability = DataAvailabilityService<
DaLibp2pAdapter<Blob, Attestation>,
>;
type Mempool<K, V, D> = MempoolService<MempoolLibp2pAdapter<K, V>, MockPool<K, V>, D>;
type Mempool<K, V, D> = MempoolService<MempoolLibp2pAdapter<K, V>, MockPool<BlockId, K, V>, D>;
#[derive(Services)]
pub struct Nomos {

View File

@ -1,5 +1,6 @@
use core::{fmt::Debug, hash::Hash};
use nomos_core::block::BlockId;
use nomos_core::tx::Transaction;
use nomos_mempool::{
backend::mockpool::MockPool,
@ -12,7 +13,7 @@ use tokio::sync::oneshot;
type ClMempoolService<T> = MempoolService<
Libp2pAdapter<T, <T as Transaction>::Hash>,
MockPool<T, <T as Transaction>::Hash>,
MockPool<BlockId, T, <T as Transaction>::Hash>,
TxDiscriminant,
>;
@ -46,7 +47,7 @@ where
pub async fn cl_mempool_status<T>(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<T as Transaction>::Hash>,
) -> Result<Vec<Status>, super::DynError>
) -> Result<Vec<Status<BlockId>>, super::DynError>
where
T: Transaction
+ Clone

View File

@ -27,9 +27,13 @@ use nomos_storage::backends::{sled::SledBackend, StorageSerde};
pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
ConsensusLibp2pAdapter,
MockPool<Tx, <Tx as Transaction>::Hash>,
MockPool<BlockId, Tx, <Tx as Transaction>::Hash>,
MempoolLibp2pAdapter<Tx, <Tx as Transaction>::Hash>,
MockPool<Certificate, <<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash>,
MockPool<
BlockId,
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,
>,
MempoolLibp2pAdapter<
Certificate,
<<Certificate as certificate::Certificate>::Blob as blob::Blob>::Hash,

View File

@ -1,4 +1,5 @@
use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication};
use nomos_core::block::BlockId;
use nomos_core::da::blob;
use nomos_da::{
backend::memory_cache::BlobCache, network::adapters::libp2p::Libp2pAdapter as DaLibp2pAdapter,
@ -14,7 +15,7 @@ use tokio::sync::oneshot;
pub type DaMempoolService = MempoolService<
Libp2pAdapter<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<Certificate, <Blob as blob::Blob>::Hash>,
MockPool<BlockId, Certificate, <Blob as blob::Blob>::Hash>,
CertDiscriminant,
>;
@ -42,7 +43,7 @@ pub async fn da_mempool_metrics(
pub async fn da_mempool_status(
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
items: Vec<<Blob as blob::Blob>::Hash>,
) -> Result<Vec<Status>, super::DynError> {
) -> Result<Vec<Status<BlockId>>, super::DynError> {
let relay = handle.relay::<DaMempoolService>().connect().await?;
let (sender, receiver) = oneshot::channel();
relay

View File

@ -1,5 +1,5 @@
use core::{fmt::Debug, hash::Hash};
use nomos_core::block::BlockId;
use nomos_mempool::{
backend::mockpool::MockPool, network::NetworkAdapter, Discriminant, MempoolMsg, MempoolService,
};
@ -20,7 +20,7 @@ where
Key: Clone + Debug + Ord + Hash + 'static,
{
let relay = handle
.relay::<MempoolService<A, MockPool<Item, Key>, D>>()
.relay::<MempoolService<A, MockPool<BlockId, Item, Key>, D>>()
.connect()
.await?;
let (sender, receiver) = oneshot::channel();

View File

@ -113,8 +113,8 @@ pub struct CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, T
where
A: NetworkAdapter,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
ClPool: MemPool,
DaPool: MemPool,
ClPool: MemPool<BlockId = BlockId>,
DaPool: MemPool<BlockId = BlockId>,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
O: Overlay + Debug,
ClPool::Item: Debug + 'static,
@ -140,10 +140,10 @@ impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage> Servi
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where
A: NetworkAdapter,
ClPool: MemPool,
ClPool: MemPool<BlockId = BlockId>,
ClPool::Item: Debug,
ClPool::Key: Debug,
DaPool: MemPool,
DaPool: MemPool<BlockId = BlockId>,
DaPool::Item: Debug,
DaPool::Key: Debug,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
@ -165,9 +165,9 @@ impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage> Servi
for CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool + Send + Sync + 'static,
ClPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static,
DaPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
@ -364,9 +364,9 @@ impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
CarnotConsensus<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, O, TxS, BS, Storage>
where
A: NetworkAdapter + Clone + Send + Sync + 'static,
ClPool: MemPool + Send + Sync + 'static,
ClPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
ClPool::Settings: Send + Sync + 'static,
DaPool: MemPool + Send + Sync + 'static,
DaPool: MemPool<BlockId = BlockId> + Send + Sync + 'static,
DaPool::Settings: Send + Sync + 'static,
ClPool::Item: Transaction<Hash = ClPool::Key>
+ Debug
@ -462,8 +462,8 @@ where
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
private_key: PrivateKey,
cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<BlockId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<BlockId, DaPool::Item, DaPool::Key>>,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
tx_selector: TxS,
blobl_selector: BS,
@ -577,8 +577,8 @@ where
task_manager: &mut TaskManager<View, Event<ClPool::Item, DaPool::Item>>,
adapter: A,
storage_relay: OutboundRelay<StorageMsg<Storage>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<BlockId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<BlockId, DaPool::Item, DaPool::Key>>,
) -> (Carnot<O>, Option<Output<ClPool::Item, DaPool::Item>>) {
tracing::debug!("received proposal {:?}", block);
if carnot.highest_voted_view() >= block.header().view {
@ -793,8 +793,8 @@ where
qc: Qc,
tx_selector: TxS,
blob_selector: BS,
cl_mempool_relay: OutboundRelay<MempoolMsg<ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<DaPool::Item, DaPool::Key>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<BlockId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<BlockId, DaPool::Item, DaPool::Key>>,
) -> Option<Output<ClPool::Item, DaPool::Item>> {
let mut output = None;
let cl_txs = get_mempool_contents(cl_mempool_relay);
@ -1127,7 +1127,7 @@ pub struct CarnotInfo {
}
async fn get_mempool_contents<Item, Key>(
mempool: OutboundRelay<MempoolMsg<Item, Key>>,
mempool: OutboundRelay<MempoolMsg<BlockId, Item, Key>>,
) -> Result<Box<dyn Iterator<Item = Item> + Send>, tokio::sync::oneshot::error::RecvError> {
let (reply_channel, rx) = tokio::sync::oneshot::channel();
@ -1143,7 +1143,7 @@ async fn get_mempool_contents<Item, Key>(
}
async fn mark_in_block<Item, Key>(
mempool: OutboundRelay<MempoolMsg<Item, Key>>,
mempool: OutboundRelay<MempoolMsg<BlockId, Item, Key>>,
ids: impl Iterator<Item = Key>,
block: BlockId,
) {

View File

@ -6,19 +6,18 @@ use std::{collections::BTreeMap, time::UNIX_EPOCH};
// crates
// internal
use crate::backend::{MemPool, MempoolError};
use nomos_core::block::BlockId;
use super::Status;
/// A mock mempool implementation that stores all transactions in memory in the order received.
pub struct MockPool<Item, Key> {
pub struct MockPool<BlockId, Item, Key> {
pending_items: LinkedHashMap<Key, Item>,
in_block_items: BTreeMap<BlockId, Vec<Item>>,
in_block_items_by_id: BTreeMap<Key, BlockId>,
last_item_timestamp: u64,
}
impl<Item, Key> Default for MockPool<Item, Key>
impl<BlockId, Item, Key> Default for MockPool<BlockId, Item, Key>
where
Key: Hash + Eq,
{
@ -32,7 +31,7 @@ where
}
}
impl<Item, Key> MockPool<Item, Key>
impl<BlockId, Item, Key> MockPool<BlockId, Item, Key>
where
Key: Hash + Eq + Clone,
{
@ -41,14 +40,16 @@ where
}
}
impl<Item, Key> MemPool for MockPool<Item, Key>
impl<BlockId, Item, Key> MemPool for MockPool<BlockId, Item, Key>
where
Item: Clone + Send + Sync + 'static + Hash,
Key: Clone + Ord + Hash,
BlockId: Copy + Ord,
{
type Settings = ();
type Item = Item;
type Key = Key;
type BlockId = BlockId;
fn new(_settings: Self::Settings) -> Self {
Self::new()
@ -108,7 +109,7 @@ where
self.last_item_timestamp
}
fn status(&self, items: &[Self::Key]) -> Vec<Status> {
fn status(&self, items: &[Self::Key]) -> Vec<Status<BlockId>> {
items
.iter()
.map(|key| {

View File

@ -1,7 +1,6 @@
#[cfg(feature = "mock")]
pub mod mockpool;
use nomos_core::block::BlockId;
use serde::{Deserialize, Serialize};
#[derive(thiserror::Error, Debug)]
@ -16,6 +15,7 @@ pub trait MemPool {
type Settings: Clone;
type Item;
type Key;
type BlockId;
/// Construct a new empty pool
fn new(settings: Self::Settings) -> Self;
@ -28,14 +28,17 @@ pub trait MemPool {
/// in a block.
/// The hint on the ancestor *can* be used by the implementation to display additional
/// items that were not included up to that point if available.
fn view(&self, ancestor_hint: BlockId) -> Box<dyn Iterator<Item = Self::Item> + Send>;
fn view(&self, ancestor_hint: Self::BlockId) -> Box<dyn Iterator<Item = Self::Item> + Send>;
/// Record that a set of items were included in a block
fn mark_in_block(&mut self, items: &[Self::Key], block: BlockId);
fn mark_in_block(&mut self, items: &[Self::Key], block: Self::BlockId);
/// Returns all of the transactions for the block
#[cfg(test)]
fn block_items(&self, block: BlockId) -> Option<Box<dyn Iterator<Item = Self::Item> + Send>>;
fn block_items(
&self,
block: Self::BlockId,
) -> Option<Box<dyn Iterator<Item = Self::Item> + Send>>;
/// Signal that a set of transactions can't be possibly requested anymore and can be
/// discarded.
@ -46,12 +49,12 @@ pub trait MemPool {
// Return the status of a set of items.
// This is a best effort attempt, and implementations are free to return `Unknown` for all of them.
fn status(&self, items: &[Self::Key]) -> Vec<Status>;
fn status(&self, items: &[Self::Key]) -> Vec<Status<Self::BlockId>>;
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum Status {
pub enum Status<BlockId> {
/// Unknown status
Unknown,
/// Pending status

View File

@ -24,7 +24,6 @@ use tokio::sync::oneshot::Sender;
// internal
use crate::network::NetworkAdapter;
use backend::{MemPool, Status};
use nomos_core::block::BlockId;
use nomos_network::{NetworkMsg, NetworkService};
use overwatch_rs::services::life_cycle::LifecycleMessage;
use overwatch_rs::services::{
@ -42,6 +41,7 @@ where
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
D: Discriminant,
{
service_state: ServiceStateHandle<Self>,
@ -63,7 +63,7 @@ pub struct MempoolMetrics {
pub last_item_timestamp: u64,
}
pub enum MempoolMsg<Item, Key> {
pub enum MempoolMsg<BlockId, Item, Key> {
Add {
item: Item,
key: Key,
@ -90,12 +90,13 @@ pub enum MempoolMsg<Item, Key> {
},
Status {
items: Vec<Key>,
reply_channel: Sender<Vec<Status>>,
reply_channel: Sender<Vec<Status<BlockId>>>,
},
}
impl<Item, Key> Debug for MempoolMsg<Item, Key>
impl<BlockId, Item, Key> Debug for MempoolMsg<BlockId, Item, Key>
where
BlockId: Debug,
Item: Debug,
Key: Debug,
{
@ -122,7 +123,10 @@ where
}
}
impl<Item: 'static, Key: 'static> RelayMessage for MempoolMsg<Item, Key> {}
impl<BlockId: 'static, Item: 'static, Key: 'static> RelayMessage
for MempoolMsg<BlockId, Item, Key>
{
}
pub struct Transaction;
pub struct Certificate;
@ -146,13 +150,14 @@ where
P::Settings: Clone,
P::Item: Debug + 'static,
P::Key: Debug + 'static,
P::BlockId: Debug + 'static,
D: Discriminant,
{
const SERVICE_ID: ServiceId = D::ID;
type Settings = Settings<P::Settings, N::Settings>;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = MempoolMsg<<P as MemPool>::Item, <P as MemPool>::Key>;
type Message = MempoolMsg<<P as MemPool>::BlockId, <P as MemPool>::Item, <P as MemPool>::Key>;
}
#[async_trait::async_trait]
@ -163,6 +168,7 @@ where
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Send + Debug + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
D: Discriminant + Send,
{
@ -237,6 +243,7 @@ where
N::Settings: Clone + Send + Sync + 'static,
P::Item: Clone + Debug + Send + Sync + 'static,
P::Key: Debug + Send + Sync + 'static,
P::BlockId: Debug + Send + 'static,
N: NetworkAdapter<Item = P::Item, Key = P::Key> + Send + Sync + 'static,
D: Discriminant + Send,
{
@ -256,7 +263,7 @@ where
}
async fn handle_mempool_message(
message: MempoolMsg<P::Item, P::Key>,
message: MempoolMsg<P::BlockId, P::Item, P::Key>,
pool: &mut P,
network_relay: &mut OutboundRelay<NetworkMsg<N::Backend>>,
service_state: &mut ServiceStateHandle<Self>,

View File

@ -18,12 +18,12 @@ enum MempoolMsgType {
MarkInBlock,
}
impl<I, K> From<&MempoolMsg<I, K>> for MempoolMsgType
impl<BlockId, I, K> From<&MempoolMsg<BlockId, I, K>> for MempoolMsgType
where
I: 'static + Debug,
K: 'static + Debug,
{
fn from(event: &MempoolMsg<I, K>) -> Self {
fn from(event: &MempoolMsg<BlockId, I, K>) -> Self {
match event {
MempoolMsg::Add { .. } => MempoolMsgType::Add,
MempoolMsg::View { .. } => MempoolMsgType::View,
@ -60,7 +60,7 @@ impl Metrics {
Self { messages }
}
pub(crate) fn record<I, K>(&self, msg: &MempoolMsg<I, K>)
pub(crate) fn record<BlockId, I, K>(&self, msg: &MempoolMsg<BlockId, I, K>)
where
I: 'static + Debug,
K: 'static + Debug,

View File

@ -21,7 +21,11 @@ struct MockPoolNode {
logging: ServiceHandle<Logger>,
network: ServiceHandle<NetworkService<Mock>>,
mockpool: ServiceHandle<
MempoolService<MockAdapter, MockPool<MockTransaction<MockMessage>, MockTxId>, Transaction>,
MempoolService<
MockAdapter,
MockPool<BlockId, MockTransaction<MockMessage>, MockTxId>,
Transaction,
>,
>,
}
@ -76,7 +80,7 @@ fn test_mockmempool() {
let network = app.handle().relay::<NetworkService<Mock>>();
let mempool = app.handle().relay::<MempoolService<
MockAdapter,
MockPool<MockTransaction<MockMessage>, MockTxId>,
MockPool<BlockId, MockTransaction<MockMessage>, MockTxId>,
Transaction,
>>();