1
0
mirror of synced 2025-01-09 23:35:46 +00:00

Cryptarchia block subscribe (#617)

* Pipe broadcast channel

* Add subscription to cryptarchia services message

* Clippy happy
This commit is contained in:
Daniel Sanchez 2024-03-22 10:45:54 +01:00 committed by GitHub
parent 36d441ec21
commit 2677199ed6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -32,6 +32,7 @@ use serde_with::serde_as;
use std::hash::Hash; use std::hash::Hash;
use thiserror::Error; use thiserror::Error;
use tokio::sync::oneshot::Sender; use tokio::sync::oneshot::Sender;
use tokio::sync::{broadcast, oneshot};
use tokio_stream::wrappers::IntervalStream; use tokio_stream::wrappers::IntervalStream;
use tracing::{error, instrument}; use tracing::{error, instrument};
@ -124,9 +125,9 @@ where
DaPool: MemPool<BlockId = HeaderId>, DaPool: MemPool<BlockId = HeaderId>,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>, DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
ClPool::Item: Debug + 'static, ClPool::Item: Clone + Eq + Hash + Debug + 'static,
ClPool::Key: Debug + 'static, ClPool::Key: Debug + 'static,
DaPool::Item: Debug + 'static, DaPool::Item: Clone + Eq + Hash + Debug + 'static,
DaPool::Key: Debug + 'static, DaPool::Key: Debug + 'static,
A::Backend: 'static, A::Backend: 'static,
TxS: TxSelect<Tx = ClPool::Item>, TxS: TxSelect<Tx = ClPool::Item>,
@ -139,6 +140,7 @@ where
network_relay: Relay<NetworkService<A::Backend>>, network_relay: Relay<NetworkService<A::Backend>>,
cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>, cl_mempool_relay: Relay<MempoolService<ClPoolAdapter, ClPool, TxDiscriminant>>,
da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>, da_mempool_relay: Relay<MempoolService<DaPoolAdapter, DaPool, CertDiscriminant>>,
block_subscription_sender: broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
storage_relay: Relay<StorageService<Storage>>, storage_relay: Relay<StorageService<Storage>>,
} }
@ -147,10 +149,10 @@ impl<A, ClPool, ClPoolAdapter, DaPool, DaPoolAdapter, TxS, BS, Storage> ServiceD
where where
A: NetworkAdapter, A: NetworkAdapter,
ClPool: MemPool<BlockId = HeaderId>, ClPool: MemPool<BlockId = HeaderId>,
ClPool::Item: Debug, ClPool::Item: Clone + Eq + Hash + Debug,
ClPool::Key: Debug, ClPool::Key: Debug,
DaPool: MemPool<BlockId = HeaderId>, DaPool: MemPool<BlockId = HeaderId>,
DaPool::Item: Debug, DaPool::Item: Clone + Eq + Hash + Debug,
DaPool::Key: Debug, DaPool::Key: Debug,
ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>, ClPoolAdapter: MempoolAdapter<Item = ClPool::Item, Key = ClPool::Key>,
DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>, DaPoolAdapter: MempoolAdapter<Item = DaPool::Item, Key = DaPool::Key>,
@ -162,7 +164,7 @@ where
type Settings = CryptarchiaSettings<TxS::Settings, BS::Settings>; type Settings = CryptarchiaSettings<TxS::Settings, BS::Settings>;
type State = NoState<Self::Settings>; type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>; type StateOperator = NoOperator<Self::State>;
type Message = ConsensusMsg; type Message = ConsensusMsg<Block<ClPool::Item, DaPool::Item>>;
} }
#[async_trait::async_trait] #[async_trait::async_trait]
@ -213,11 +215,13 @@ where
let cl_mempool_relay = service_state.overwatch_handle.relay(); let cl_mempool_relay = service_state.overwatch_handle.relay();
let da_mempool_relay = service_state.overwatch_handle.relay(); let da_mempool_relay = service_state.overwatch_handle.relay();
let storage_relay = service_state.overwatch_handle.relay(); let storage_relay = service_state.overwatch_handle.relay();
let (block_subscription_sender, _) = broadcast::channel(16);
Ok(Self { Ok(Self {
service_state, service_state,
network_relay, network_relay,
cl_mempool_relay, cl_mempool_relay,
da_mempool_relay, da_mempool_relay,
block_subscription_sender,
storage_relay, storage_relay,
}) })
} }
@ -287,6 +291,7 @@ where
storage_relay.clone(), storage_relay.clone(),
cl_mempool_relay.clone(), cl_mempool_relay.clone(),
da_mempool_relay.clone(), da_mempool_relay.clone(),
&mut self.block_subscription_sender
) )
.await; .await;
} }
@ -317,7 +322,7 @@ where
} }
Some(msg) = self.service_state.inbound_relay.next() => { Some(msg) = self.service_state.inbound_relay.next() => {
Self::process_message(&cryptarchia, msg); Self::process_message(&cryptarchia, &self.block_subscription_sender, msg);
} }
Some(msg) = lifecycle_stream.next() => { Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await { if Self::should_stop_service(msg).await {
@ -381,7 +386,11 @@ where
} }
} }
fn process_message(cryptarchia: &Cryptarchia, msg: ConsensusMsg) { fn process_message(
cryptarchia: &Cryptarchia,
block_channel: &broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
msg: ConsensusMsg<Block<ClPool::Item, DaPool::Item>>,
) {
match msg { match msg {
ConsensusMsg::Info { tx } => { ConsensusMsg::Info { tx } => {
let info = CryptarchiaInfo { let info = CryptarchiaInfo {
@ -391,6 +400,11 @@ where
tracing::error!("Could not send consensus info through channel: {:?}", e) tracing::error!("Could not send consensus info through channel: {:?}", e)
}); });
} }
ConsensusMsg::BlockSubscribe { sender } => {
sender.send(block_channel.subscribe()).unwrap_or_else(|_| {
tracing::error!("Could not subscribe to block subscription channel")
});
}
} }
} }
@ -405,6 +419,7 @@ where
storage_relay: OutboundRelay<StorageMsg<Storage>>, storage_relay: OutboundRelay<StorageMsg<Storage>>,
cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>, cl_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, ClPool::Item, ClPool::Key>>,
da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>, da_mempool_relay: OutboundRelay<MempoolMsg<HeaderId, DaPool::Item, DaPool::Key>>,
block_broadcaster: &mut broadcast::Sender<Block<ClPool::Item, DaPool::Item>>,
) -> Cryptarchia { ) -> Cryptarchia {
tracing::debug!("received proposal {:?}", block); tracing::debug!("received proposal {:?}", block);
@ -425,11 +440,15 @@ where
mark_in_block(da_mempool_relay, block.blobs().map(Certificate::hash), id).await; mark_in_block(da_mempool_relay, block.blobs().map(Certificate::hash), id).await;
// store block // store block
let msg = <StorageMsg<_>>::new_store_message(header.id(), block); let msg = <StorageMsg<_>>::new_store_message(header.id(), block.clone());
if let Err((e, _msg)) = storage_relay.send(msg).await { if let Err((e, _msg)) = storage_relay.send(msg).await {
tracing::error!("Could not send block to storage: {e}"); tracing::error!("Could not send block to storage: {e}");
} }
if let Err(e) = block_broadcaster.send(block) {
tracing::error!("Could not notify block to services {e}");
}
cryptarchia = new_state; cryptarchia = new_state;
} }
Err(Error::Consensus(cryptarchia_engine::Error::ParentMissing(parent))) => { Err(Error::Consensus(cryptarchia_engine::Error::ParentMissing(parent))) => {
@ -478,11 +497,16 @@ where
} }
#[derive(Debug)] #[derive(Debug)]
pub enum ConsensusMsg { pub enum ConsensusMsg<Block> {
Info { tx: Sender<CryptarchiaInfo> }, Info {
tx: Sender<CryptarchiaInfo>,
},
BlockSubscribe {
sender: oneshot::Sender<broadcast::Receiver<Block>>,
},
} }
impl RelayMessage for ConsensusMsg {} impl<Block: 'static> RelayMessage for ConsensusMsg<Block> {}
#[serde_as] #[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]