fix(blend): panic on empty membership before core and edge services are gracefully shut down (#2233)

This commit is contained in:
Antonio 2026-02-19 19:48:10 +01:00 committed by GitHub
parent 675b9b4f23
commit 7f32c764d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 407 additions and 71 deletions

2
Cargo.lock generated
View File

@ -3961,6 +3961,7 @@ dependencies = [
"const-hex",
"ed25519-dalek",
"generic-array 1.3.5",
"hex",
"logos-blockchain-blend-crypto",
"logos-blockchain-core",
"logos-blockchain-groth16",
@ -4006,6 +4007,7 @@ dependencies = [
"async-trait",
"fork_stream",
"futures",
"hex",
"libp2p",
"libp2p-stream",
"libp2p-swarm-test",

View File

@ -15,6 +15,7 @@ workspace = true
[dependencies]
ed25519-dalek = { workspace = true }
generic-array = { default-features = false, version = "1.2" }
hex = { workspace = true }
lb-blend-crypto = { workspace = true }
lb-groth16 = { workspace = true }
lb-pol = { workspace = true }

View File

@ -1,3 +1,5 @@
use core::fmt::{self, Debug, Formatter};
use lb_poq::NotePathAndSelectors;
use crate::{CorePathAndSelectors, ZkHash};
@ -50,12 +52,21 @@ impl Inputs {
}
}
#[derive(Debug, Clone)]
#[derive(Clone)]
pub enum ProofType {
CoreQuota(Box<ProofOfCoreQuotaInputs>),
LeadershipQuota(Box<ProofOfLeadershipQuotaInputs>),
}
impl Debug for ProofType {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Self::CoreQuota(_) => f.write_str("ProofType::CoreQuota"),
Self::LeadershipQuota(_) => f.write_str("ProofType::LeadershipQuota"),
}
}
}
impl ProofType {
#[must_use]
pub const fn proof_selector(&self) -> bool {
@ -66,7 +77,7 @@ impl ProofType {
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct ProofOfCoreQuotaInputs {
pub core_sk: ZkHash,
pub core_path_and_selectors: CorePathAndSelectors,
@ -78,7 +89,7 @@ impl From<ProofOfCoreQuotaInputs> for ProofType {
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Clone, PartialEq, Eq)]
pub struct ProofOfLeadershipQuotaInputs {
pub slot: u64,
pub note_value: u64,

View File

@ -1,10 +1,12 @@
use lb_groth16::Fr;
use core::fmt::{self, Debug, Formatter};
use lb_groth16::{Fr, fr_to_bytes};
use serde::{Deserialize, Serialize};
use crate::{ZkHash, quota::Ed25519PublicKey};
/// Public inputs for all types of Proof of Quota. Spec: <https://www.notion.so/nomos-tech/Proof-of-Quota-Specification-215261aa09df81d88118ee22205cbafe?source=copy_link#25a261aa09df80ce943dce35dd5403ac>.
#[derive(Debug, Clone, Copy)]
#[derive(Clone, Copy)]
pub struct Inputs {
pub signing_key: Ed25519PublicKey,
pub session: u64,
@ -12,6 +14,17 @@ pub struct Inputs {
pub leader: LeaderInputs,
}
impl Debug for Inputs {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Inputs")
.field("signing_key", &hex::encode(self.signing_key.as_bytes()))
.field("session", &self.session)
.field("core", &self.core)
.field("leader", &self.leader)
.finish()
}
}
#[cfg(test)]
impl Default for Inputs {
fn default() -> Self {
@ -26,7 +39,7 @@ impl Default for Inputs {
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(test, derive(Default))]
pub struct CoreInputs {
#[serde(with = "lb_groth16::serde::serde_fr")]
@ -34,7 +47,16 @@ pub struct CoreInputs {
pub quota: u64,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
impl Debug for CoreInputs {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("CoreInputs")
.field("zk_root", &hex::encode(fr_to_bytes(&self.zk_root)))
.field("quota", &self.quota)
.finish()
}
}
#[derive(Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(test, derive(Default))]
pub struct LeaderInputs {
#[serde(with = "lb_groth16::serde::serde_fr")]
@ -47,3 +69,21 @@ pub struct LeaderInputs {
#[serde(with = "lb_groth16::serde::serde_fr")]
pub lottery_1: Fr,
}
impl Debug for LeaderInputs {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("LeaderInputs")
.field(
"pol_ledger_aged",
&hex::encode(fr_to_bytes(&self.pol_ledger_aged)),
)
.field(
"pol_epoch_nonce",
&hex::encode(fr_to_bytes(&self.pol_epoch_nonce)),
)
.field("message_quota", &self.message_quota)
.field("lottery_0", &hex::encode(fr_to_bytes(&self.lottery_0)))
.field("lottery_1", &hex::encode(fr_to_bytes(&self.lottery_1)))
.finish()
}
}

View File

@ -136,6 +136,11 @@ impl<NodeId> Membership<NodeId> {
pub fn size(&self) -> usize {
self.core_nodes.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.core_nodes.is_empty()
}
}
#[cfg(test)]

View File

@ -302,6 +302,6 @@ mod test {
processor.set_epoch_private(new_private_inputs.clone());
assert_eq!(processor.proofs_generator.1, Some(new_private_inputs));
assert!(processor.proofs_generator.1 == Some(new_private_inputs));
}
}

View File

@ -213,6 +213,6 @@ mod test {
processor.proofs_generator.0.public_inputs.leader,
new_leader_inputs
);
assert_eq!(processor.proofs_generator.1, new_private_inputs);
assert!(processor.proofs_generator.1 == new_private_inputs);
}
}

View File

@ -95,6 +95,10 @@ where
}
}
pub fn consume(self) -> OldSessionMessageScheduler<Rng, ProcessedMessage> {
OldSessionMessageScheduler(self.release_delayer)
}
pub fn rotate_session(
self,
new_session_info: SessionInfo,

View File

@ -54,7 +54,7 @@ where
}
}
#[derive(Clone)]
#[derive(Clone, Debug)]
pub enum SessionEvent<Session> {
NewSession(Session),
TransitionPeriodExpired,

View File

@ -16,6 +16,7 @@ workspace = true
async-trait = "0.1"
fork_stream = { workspace = true }
futures = { default-features = false, version = "0.3" }
hex = { workspace = true }
lb-blend = { workspace = true }
lb-chain-broadcast-service = { workspace = true }
lb-chain-service = { workspace = true }

View File

@ -75,7 +75,7 @@ use overwatch::{
use rand::{RngCore, SeedableRng as _, seq::SliceRandom as _};
use serde::{Deserialize, Serialize};
use tokio::sync::oneshot;
use tracing::{error, info};
use tracing::{debug, error, info};
use crate::{
core::{
@ -96,7 +96,7 @@ use crate::{
kms::PreloadKmsService,
membership::{self, MembershipInfo, ZkInfo},
message::{NetworkMessage, ProcessedMessage, ServiceMessage},
session::{CoreSessionInfo, CoreSessionPublicInfo},
session::{CoreSessionInfo, CoreSessionPublicInfo, MaybeEmptyCoreSessionInfo},
settings::FIRST_STREAM_ITEM_READY_TIMEOUT,
};
@ -222,7 +222,7 @@ impl<
>
where
Backend: BlendBackend<NodeId, BlakeRng, ProofsVerifier, RuntimeServiceId> + Send + Sync,
NodeId: Clone + Send + Eq + Hash + Sync + 'static,
NodeId: Clone + Debug + Send + Eq + Hash + Sync + 'static,
Network: NetworkAdapter<RuntimeServiceId, BroadcastSettings: Eq + Hash + Unpin> + Send + Sync,
MembershipAdapter: membership::Adapter<NodeId = NodeId, Error: Send + Sync + 'static> + Send,
membership::ServiceMessage<MembershipAdapter>: Send + Sync + 'static,
@ -521,7 +521,7 @@ async fn initialize<
Option<RecoveryServiceState<Backend::Settings, NetAdapter::BroadcastSettings>>,
>,
) -> (
impl Stream<Item = SessionEvent<CoreSessionInfo<NodeId, KmsAdapter::CorePoQGenerator>>>
impl Stream<Item = SessionEvent<MaybeEmptyCoreSessionInfo<NodeId, KmsAdapter::CorePoQGenerator>>>
+ Unpin
+ Send
+ 'static,
@ -543,7 +543,7 @@ async fn initialize<
BlakeRng,
)
where
NodeId: Clone + Eq + Hash + Send + 'static,
NodeId: Clone + Debug + Eq + Hash + Send + 'static,
Backend: BlendBackend<NodeId, BlakeRng, ProofsVerifier, RuntimeServiceId> + Sync,
NetAdapter: NetworkAdapter<RuntimeServiceId, BroadcastSettings: Eq + Hash + Unpin>,
ChainService: ChainApi<RuntimeServiceId> + Sync,
@ -566,10 +566,16 @@ where
session_number,
zk,
}| {
let ZkInfo {
// This can be empty in case of an empty membership set.
let Some(ZkInfo {
root,
core_and_path_selectors,
} = zk.expect("ZK info should be present for the membership set.");
}) = zk
else {
return MaybeEmptyCoreSessionInfo::Empty {
session: session_number,
};
};
CoreSessionInfo {
public: CoreSessionPublicInfo {
poq_core_public_inputs: CoreInputs {
@ -587,6 +593,7 @@ where
),
),
}
.into()
},
)
}
@ -601,7 +608,10 @@ where
)
.await
.map(|(membership_info, remaining_session_stream)| {
(membership_info, remaining_session_stream.fork())
let MaybeEmptyCoreSessionInfo::NonEmpty(core_session_info) = membership_info else {
panic!("First retrieved session for Blend core startup must be available.");
};
(core_session_info, remaining_session_stream.fork())
})
.expect("The current session info must be available.");
@ -629,8 +639,8 @@ where
info!(
target: LOG_TARGET,
"The current membership is ready: {} nodes.",
current_membership_info.public.membership.size()
"The current membership is ready: {:?}",
current_membership_info.public
);
let current_public_info = PublicInfo {
@ -648,6 +658,8 @@ where
},
};
debug!(target: LOG_TARGET, "Current public info: {:?}", current_public_info);
let crypto_processor = CoreCryptographicProcessor::<
_,
KmsAdapter::CorePoQGenerator,
@ -784,7 +796,8 @@ async fn run_event_loop<
remaining_clock_stream: &mut (impl Stream<Item = SlotTick> + Send + Sync + Unpin + 'static),
mut secret_pol_info_stream: impl Stream<Item = PolEpochInfo> + Unpin,
remaining_session_stream: &mut (
impl Stream<Item = SessionEvent<CoreSessionInfo<NodeId, CorePoQGenerator>>> + Unpin
impl Stream<Item = SessionEvent<MaybeEmptyCoreSessionInfo<NodeId, CorePoQGenerator>>>
+ Unpin
),
blend_config: &RunningBlendConfig<Backend::Settings>,
@ -928,7 +941,7 @@ async fn retire<
+ 'static,
mut remaining_clock_stream: impl Stream<Item = SlotTick> + Send + Sync + Unpin + 'static,
mut remaining_session_stream: impl Stream<
Item = SessionEvent<CoreSessionInfo<NodeId, CorePoQGenerator>>,
Item = SessionEvent<MaybeEmptyCoreSessionInfo<NodeId, CorePoQGenerator>>,
> + Unpin,
blend_config: &RunningBlendConfig<Backend::Settings>,
mut backend: Backend,
@ -998,6 +1011,7 @@ async fn retire<
/// for `PoQ` verification in this new session. It ignores the transition period
/// expiration event and returns the previous cryptographic processor as is.
#[expect(clippy::too_many_arguments, reason = "necessary for session handling")]
#[expect(clippy::too_many_lines, reason = "necessary for session handling")]
async fn handle_session_event<
NodeId,
ProofsGenerator,
@ -1008,7 +1022,7 @@ async fn handle_session_event<
CorePoQGenerator,
RuntimeServiceId,
>(
event: SessionEvent<CoreSessionInfo<NodeId, CorePoQGenerator>>,
event: SessionEvent<MaybeEmptyCoreSessionInfo<NodeId, CorePoQGenerator>>,
settings: &RunningBlendConfig<Backend::Settings>,
current_cryptographic_processor: CoreCryptographicProcessor<
NodeId,
@ -1043,7 +1057,7 @@ where
Backend: BlendBackend<NodeId, BlakeRng, ProofsVerifier, RuntimeServiceId>,
{
match event {
SessionEvent::NewSession(CoreSessionInfo {
SessionEvent::NewSession(MaybeEmptyCoreSessionInfo::NonEmpty(CoreSessionInfo {
core_poq_generator,
public:
CoreSessionPublicInfo {
@ -1051,7 +1065,7 @@ where
session: new_session,
membership: new_membership,
},
}) => {
})) => {
let (_, _, _, _, current_session_blending_token_collector, _, state_updater) =
current_recovery_checkpoint.into_components();
@ -1128,6 +1142,27 @@ where
.expect("service state should be created successfully"),
}
}
SessionEvent::NewSession(MaybeEmptyCoreSessionInfo::Empty { session }) => {
tracing::info!(target: LOG_TARGET, "New session event received, but no session info is available due to empty membership set.");
let (_, _, _, _, current_session_blending_token_collector, _, _) =
current_recovery_checkpoint.into_components();
let new_reward_session_info = reward::SessionInfo::new(
session,
&current_public_info.epoch.pol_epoch_nonce,
0,
0,
settings.activity_threshold_sensitivity,
)
.expect("Reward session info must be created successfully. Panicking since the service cannot continue with this session");
let (_, old_session_blending_token_collector) =
current_session_blending_token_collector.rotate_session(&new_reward_session_info);
HandleSessionEventOutput::Retiring {
old_crypto_processor: current_cryptographic_processor,
old_scheduler: current_scheduler.consume(),
old_token_collector: old_session_blending_token_collector,
old_public_info: current_public_info,
}
}
SessionEvent::TransitionPeriodExpired => {
let mut state_updater = current_recovery_checkpoint.start_updating();

View File

@ -374,14 +374,17 @@ async fn test_handle_session_event() {
// Handle a NewSession event, expecting Transitioning output.
let output = handle_session_event(
SessionEvent::NewSession(CoreSessionInfo {
public: CoreSessionPublicInfo {
membership: membership.clone(),
session: session + 1,
poq_core_public_inputs: public_info.session.core_public_inputs,
},
core_poq_generator: (),
}),
SessionEvent::NewSession(
CoreSessionInfo {
public: CoreSessionPublicInfo {
membership: membership.clone(),
session: session + 1,
poq_core_public_inputs: public_info.session.core_public_inputs,
},
core_poq_generator: (),
}
.into(),
),
&settings,
crypto_processor,
scheduler,
@ -466,14 +469,17 @@ async fn test_handle_session_event() {
// Handle a NewSession event with a new too small membership,
// expecting Retiring output.
let output = handle_session_event(
SessionEvent::NewSession(CoreSessionInfo {
public: CoreSessionPublicInfo {
membership: new_membership(minimal_network_size - 1).0,
session: session + 2,
poq_core_public_inputs: current_public_info.session.core_public_inputs,
},
core_poq_generator: (),
}),
SessionEvent::NewSession(
CoreSessionInfo {
public: CoreSessionPublicInfo {
membership: new_membership(minimal_network_size - 1).0,
session: session + 2,
poq_core_public_inputs: current_public_info.session.core_public_inputs,
},
core_poq_generator: (),
}
.into(),
),
&settings,
current_crypto_processor,
current_scheduler,
@ -673,3 +679,166 @@ async fn complete_old_session_after_main_loop_done() {
.await
.expect("the service should stop without error");
}
/// Check that the service handles a new session with empty providers (zk: None)
/// without panicking. It should retire gracefully.
#[test_log::test(tokio::test)]
async fn stop_on_empty_session() {
let minimal_network_size = 2;
let (membership, local_private_key) = new_membership(minimal_network_size);
// Create settings.
let (settings, _recovery_file) = settings(
local_private_key.clone(),
u64::from(minimal_network_size).try_into().unwrap(),
(),
0,
);
// Prepare streams.
let (inbound_relay, _inbound_message_sender) = new_stream();
let (mut blend_message_stream, _blend_message_sender) = new_stream();
let (membership_stream, membership_sender) = new_stream();
let (clock_stream, clock_sender) = new_stream();
// Send the initial membership info that the service will expect to receive
// immediately.
let initial_session = 0;
let membership_info = MembershipInfo {
membership: membership.clone(),
zk: Some(ZkInfo {
root: ZkHash::ZERO,
core_and_path_selectors: Some([(ZkHash::ZERO, false); CORE_MERKLE_TREE_HEIGHT]),
}),
session_number: initial_session,
};
membership_sender
.send(membership_info.clone())
.await
.unwrap();
let (sdp_relay, _sdp_relay_receiver) = sdp_relay();
// Send the initial slot tick that the service will expect to receive
// immediately.
clock_sender
.send(SlotTick {
epoch: 0.into(),
slot: 0.into(),
})
.await
.unwrap();
// Prepare an epoch handler with the mock chain service that always returns the
// same epoch state.
let mut epoch_handler = EpochHandler::new(
TestChainService,
settings.time.epoch_transition_period_in_slots,
);
// Prepare dummy Overwatch resources.
let (overwatch_handle, _overwatch_cmd_receiver, state_updater, _state_receiver) =
dummy_overwatch_resources();
// Initialize the service.
let (
mut remaining_session_stream,
mut remaining_clock_stream,
current_public_info,
crypto_processor,
current_recovery_checkpoint,
message_scheduler,
mut backend,
mut rng,
) = initialize::<
NodeId,
TestBlendBackend,
TestNetworkAdapter,
TestChainService,
MockCoreAndLeaderProofsGenerator,
MockProofsVerifier,
MockKmsAdapter,
RuntimeServiceId,
>(
settings.clone(),
membership_stream,
clock_stream,
&mut epoch_handler,
overwatch_handle.clone(),
MockKmsAdapter,
&sdp_relay,
None,
state_updater,
)
.await;
let mut backend_event_receiver = backend.subscribe_to_events();
// Run the event loop of the service in a separate task.
let settings_cloned = settings.clone();
let join_handle = tokio::spawn(async move {
let secret_pol_info_stream =
post_initialize::<OncePolStreamProvider, RuntimeServiceId>(&overwatch_handle).await;
let (
old_session_crypto_processor,
old_session_message_scheduler,
old_session_blending_token_collector,
old_session_public_info,
) = run_event_loop(
inbound_relay,
&mut blend_message_stream,
&mut remaining_clock_stream,
secret_pol_info_stream,
&mut remaining_session_stream,
&settings_cloned,
&mut backend,
&TestNetworkAdapter,
&sdp_relay,
&mut epoch_handler,
message_scheduler.into(),
&mut rng,
crypto_processor,
current_public_info,
current_recovery_checkpoint,
)
.await;
retire(
blend_message_stream,
remaining_clock_stream,
remaining_session_stream,
&settings_cloned,
backend,
TestNetworkAdapter,
sdp_relay,
epoch_handler,
old_session_message_scheduler,
rng,
old_session_blending_token_collector,
old_session_crypto_processor,
old_session_public_info,
)
.await;
});
// Send a new session with empty providers (zk: None).
// This simulates a session where no providers are available.
membership_sender
.send(MembershipInfo {
membership: membership.clone(),
zk: None,
session_number: initial_session + 1,
})
.await
.unwrap();
wait_for_blend_backend_event(
&mut backend_event_receiver,
TestBlendBackendEvent::SessionTransitionCompleted,
)
.await;
// The service should stop without panicking.
join_handle
.await
.expect("the service should stop without panic on empty session");
}

View File

@ -152,7 +152,7 @@ impl<
>
where
Backend: BlendBackend<NodeId, RuntimeServiceId> + Send + Sync,
NodeId: Clone + Eq + Hash + Send + Sync + 'static,
NodeId: Clone + Debug + Eq + Hash + Send + Sync + 'static,
BroadcastSettings: Serialize + DeserializeOwned + Send,
MembershipAdapter: membership::Adapter<NodeId = NodeId, Error: Send + Sync + 'static> + Send,
membership::ServiceMessage<MembershipAdapter>: Send + Sync + 'static,
@ -343,7 +343,7 @@ async fn run<Backend, NodeId, ProofsGenerator, ChainService, PolInfoProvider, Ru
) -> Result<(), Error>
where
Backend: BlendBackend<NodeId, RuntimeServiceId> + Sync + Send,
NodeId: Clone + Eq + Hash + Send + Sync + 'static,
NodeId: Clone + Debug + Eq + Hash + Send + Sync + 'static,
ProofsGenerator: LeaderProofsGenerator + Send,
ChainService: ChainApi<RuntimeServiceId> + Send + Sync,
PolInfoProvider: PolInfoProviderTrait<RuntimeServiceId, Stream: Unpin>,
@ -354,6 +354,12 @@ where
.await
.expect("The current session info must be available.");
info!(
target: LOG_TARGET,
"The current membership is ready: {:?}",
current_membership_info
);
let (current_epoch_info, mut remaining_clock_stream) = async {
let (slot_tick, remaining_clock_stream) = clock_stream
.first()
@ -385,11 +391,7 @@ where
}
.await;
info!(
target: LOG_TARGET,
"The current membership is ready: {} nodes.",
current_membership_info.membership.size()
);
debug!(target: LOG_TARGET, "Current epoch info: {:?}", current_epoch_info);
notify_ready();
@ -417,6 +419,8 @@ where
}
.await;
debug!(target: LOG_TARGET, "Current secret leader info: {:?}", current_private_leader_info);
let mut current_public_inputs = PoQVerificationInputsMinusSigningKey {
core: CoreInputs {
zk_root: current_membership_info
@ -433,6 +437,8 @@ where
session: current_membership_info.session_number,
};
debug!(target: LOG_TARGET, "Current public info: {current_public_inputs:?}");
let mut message_handler =
MessageHandler::<Backend, _, ProofsGenerator, _>::try_new_with_edge_condition_check(
settings.clone(),
@ -446,9 +452,20 @@ where
loop {
tokio::select! {
Some(SessionEvent::NewSession(new_session_info)) = remaining_session_stream.next() => {
let (new_message_handler, new_public_inputs) = handle_new_session(new_session_info, settings.clone(), current_private_leader_info.poq_private_inputs.clone(), overwatch_handle.clone(), current_public_inputs, message_handler)?;
message_handler = new_message_handler;
current_public_inputs = new_public_inputs;
match handle_new_session(new_session_info, settings.clone(), current_private_leader_info.poq_private_inputs.clone(), overwatch_handle.clone(), current_public_inputs, message_handler) {
Ok((new_message_handler, new_public_inputs)) => {
message_handler = new_message_handler;
current_public_inputs = new_public_inputs;
},
Err(Error::NetworkIsTooSmall(_)) => {
info!(target: LOG_TARGET, "New membership does not satisfy edge node condition, edge service shutting down.");
return Ok(());
}
Err(e) => {
error!(target: LOG_TARGET, "Error when handling new session: {e:?}, edge service shutting down.");
return Err(e);
}
}
}
Some(message) = incoming_message_stream.next() => {
let message_copies = settings.data_replication_factor.checked_add(1).unwrap();
@ -504,6 +521,9 @@ where
ProofsGenerator: LeaderProofsGenerator,
RuntimeServiceId: Clone,
{
let Some(zk_info) = zk else {
return Err(Error::NetworkIsTooSmall(0));
};
debug!(target: LOG_TARGET, "Trying to create a new message handler");
// Update current public inputs with new session info.
let new_public_inputs = PoQVerificationInputsMinusSigningKey {
@ -514,7 +534,7 @@ where
&settings.time,
new_membership.size(),
),
zk_root: zk.expect("Membership should have ZK info").root,
zk_root: zk_info.root,
},
..current_public_inputs
};

View File

@ -86,10 +86,10 @@ async fn run_panics_with_local_is_core_in_initial_membership() {
resume_panic_from(join_handle).await;
}
/// [`run`] fails if a new membership is smaller than the minimum network
/// size.
/// [`run`] shuts down gracefully if a new membership is smaller than the
/// minimum network size.
#[test_log::test(tokio::test)]
async fn run_fails_if_new_membership_is_small() {
async fn run_shuts_down_if_new_membership_is_small() {
let local_node = NodeId(99);
let core_node = NodeId(0);
let minimal_network_size = 1;
@ -105,10 +105,7 @@ async fn run_fails_if_new_membership_is_small() {
.send(membership(&[], local_node))
.await
.expect("channel opened");
assert!(matches!(
join_handle.await.unwrap(),
Err(Error::NetworkIsTooSmall(0))
));
assert!(matches!(join_handle.await.unwrap(), Ok(())));
}
/// [`run`] fails if the local node is not edge in a new membership.

View File

@ -1,4 +1,9 @@
use core::{fmt::Debug, marker::PhantomData, num::NonZeroU64, ops::Deref};
use core::{
fmt::{self, Debug, Formatter},
marker::PhantomData,
num::NonZeroU64,
ops::Deref,
};
use async_trait::async_trait;
use futures::Stream;
@ -6,14 +11,14 @@ use lb_blend::proofs::quota::inputs::prove::private::ProofOfLeadershipQuotaInput
use lb_chain_service::api::{CryptarchiaServiceApi, CryptarchiaServiceData};
use lb_core::crypto::ZkHash;
use lb_cryptarchia_engine::{Epoch, Slot};
use lb_groth16::Fr;
use lb_groth16::{Fr, fr_to_bytes};
use lb_ledger::EpochState;
use lb_time_service::SlotTick;
use overwatch::overwatch::OverwatchHandle;
/// Secret `PoL` info associated to an epoch, as returned by the `PoL` info
/// provider.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct PolEpochInfo {
/// Epoch nonce.
pub nonce: ZkHash,
@ -22,6 +27,15 @@ pub struct PolEpochInfo {
pub poq_private_inputs: ProofOfLeadershipQuotaInputs,
}
impl Debug for PolEpochInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("PolEpochInfo")
.field("nonce", &hex::encode(fr_to_bytes(&self.nonce)))
.field("poq_private_inputs", &"<redacted>")
.finish()
}
}
#[async_trait]
pub trait PolInfoProvider<RuntimeServiceId> {
type Stream: Stream<Item = PolEpochInfo>;

View File

@ -19,7 +19,7 @@ use overwatch::{
state::{NoOperator, NoState},
},
};
use tracing::{debug, error, info};
use tracing::{error, info};
use crate::{
core::{
@ -92,7 +92,7 @@ where
> + Send
+ Sync
+ 'static,
NodeId: Clone + Hash + Eq + Send + Sync + 'static,
NodeId: Clone + Debug + Hash + Eq + Send + Sync + 'static,
BackendSettings: Clone + Send + Sync,
> + Send
+ 'static,
@ -192,8 +192,7 @@ where
info!(
target: LOG_TARGET,
"The current membership is ready: {} nodes.",
membership.size()
"The current membership is ready: {membership:?}.",
);
let mut instance = Instance::<CoreService, EdgeService, RuntimeServiceId>::new(
@ -212,7 +211,7 @@ where
loop {
tokio::select! {
Some(session_event) = remaining_session_stream.next() => {
debug!(target: LOG_TARGET, "Received a new session event");
info!(target: LOG_TARGET, "Received a new session event: {session_event:?}");
instance = instance.handle_session_event(session_event, overwatch_handle, minimal_network_size).await?;
},
Some(message) = inbound_relay.next() => {

View File

@ -1,11 +1,13 @@
pub mod node_id;
pub mod service;
use core::fmt::{self, Debug, Formatter};
use std::pin::Pin;
use futures::Stream;
use lb_blend::scheduling::membership::Membership;
use lb_core::crypto::ZkHash;
use lb_groth16::fr_to_bytes;
use lb_key_management_system_service::keys::{Ed25519PublicKey, ZkPublicKey};
use lb_poq::CorePathAndSelectors;
use overwatch::services::{ServiceData, relay::OutboundRelay};
@ -25,15 +27,20 @@ impl<NodeId> MembershipInfo<NodeId> {
membership: Membership<NodeId>,
session_number: u64,
) -> Self {
let zk = if membership.is_empty() {
None
} else {
Some(ZkInfo::default())
};
Self {
membership,
zk,
session_number,
zk: Some(ZkInfo::default()),
}
}
}
#[derive(Clone, Debug)]
#[derive(Clone)]
#[cfg_attr(test, derive(Default))]
/// ZK info for a new session.
pub struct ZkInfo {
@ -44,6 +51,15 @@ pub struct ZkInfo {
pub core_and_path_selectors: Option<CorePathAndSelectors>,
}
impl Debug for ZkInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("ZkInfo")
.field("root", &hex::encode(fr_to_bytes(&self.root)))
.field("core_and_path_selectors", &"<redacted>")
.finish()
}
}
pub type MembershipStream<NodeId> =
Pin<Box<dyn Stream<Item = MembershipInfo<NodeId>> + Send + Sync + 'static>>;

View File

@ -2,7 +2,29 @@ use lb_blend::{
proofs::quota::inputs::prove::public::CoreInputs, scheduling::membership::Membership,
};
#[derive(Clone)]
#[derive(Clone, Debug)]
// TODO: Refactor this so that it's a struct with the common fields, and
// everything case-specific is an enum.
pub enum MaybeEmptyCoreSessionInfo<NodeId, CorePoQGenerator> {
Empty { session: u64 },
NonEmpty(CoreSessionInfo<NodeId, CorePoQGenerator>),
}
impl<NodeId, CorePoQGenerator> From<u64> for MaybeEmptyCoreSessionInfo<NodeId, CorePoQGenerator> {
fn from(session: u64) -> Self {
Self::Empty { session }
}
}
impl<NodeId, CorePoQGenerator> From<CoreSessionInfo<NodeId, CorePoQGenerator>>
for MaybeEmptyCoreSessionInfo<NodeId, CorePoQGenerator>
{
fn from(core_session_info: CoreSessionInfo<NodeId, CorePoQGenerator>) -> Self {
Self::NonEmpty(core_session_info)
}
}
#[derive(Clone, Debug)]
/// All info that Blend services need to be available on new sessions.
pub struct CoreSessionInfo<NodeId, CorePoQGenerator> {
/// The session info available to all nodes.
@ -11,7 +33,7 @@ pub struct CoreSessionInfo<NodeId, CorePoQGenerator> {
pub core_poq_generator: CorePoQGenerator,
}
#[derive(Clone)]
#[derive(Clone, Debug)]
/// All public info that Blend services need to be available on new sessions.
pub struct CoreSessionPublicInfo<NodeId> {
/// The list of core Blend nodes for the new session.