From b3c5379b6a5cd55ec541b67bb21899a29d4331a4 Mon Sep 17 00:00:00 2001 From: holisticode Date: Thu, 29 Aug 2024 12:44:07 -0500 Subject: [PATCH] abstract rng --- .../sampling/src/backend/kzgrs.rs | 14 ++++---- .../sampling/src/backend/mod.rs | 5 +-- .../data-availability/sampling/src/lib.rs | 34 +++++++++++++------ 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs index 37a10c75..cd0ae5e2 100644 --- a/nomos-services/data-availability/sampling/src/backend/kzgrs.rs +++ b/nomos-services/data-availability/sampling/src/backend/kzgrs.rs @@ -6,7 +6,7 @@ use std::time::{Duration, Instant}; // crates use rand::distributions::Standard; use rand::prelude::*; -use rand_chacha::ChaCha20Rng; +use rand::SeedableRng; use kzgrs_backend::common::blob::DaBlob; @@ -28,16 +28,16 @@ pub struct KzgrsDaSamplerSettings { pub blobs_validity_duration: Duration, } -pub struct KzgrsDaSampler { +pub struct KzgrsDaSampler { settings: KzgrsDaSamplerSettings, validated_blobs: BTreeSet, pending_sampling_blobs: HashMap, // TODO: is there a better place for this? Do we need to have this even globally? // Do we already have some source of randomness already? - rng: ChaCha20Rng, + rng: R, } -impl KzgrsDaSampler { +impl KzgrsDaSampler { fn prune_by_time(&mut self) { self.pending_sampling_blobs.retain(|_blob_id, context| { context.started.elapsed() < self.settings.old_blobs_check_duration @@ -46,18 +46,18 @@ impl KzgrsDaSampler { } #[async_trait::async_trait] -impl DaSamplingServiceBackend for KzgrsDaSampler { +impl DaSamplingServiceBackend for KzgrsDaSampler { type Settings = KzgrsDaSamplerSettings; type BlobId = BlobId; type Blob = DaBlob; - fn new(settings: Self::Settings) -> Self { + fn new(settings: Self::Settings, rng: R) -> Self { let bt: BTreeSet = BTreeSet::new(); Self { settings, validated_blobs: bt, pending_sampling_blobs: HashMap::new(), - rng: ChaCha20Rng::from_entropy(), + rng: rng, } } diff --git a/nomos-services/data-availability/sampling/src/backend/mod.rs b/nomos-services/data-availability/sampling/src/backend/mod.rs index 64903831..4e3de12b 100644 --- a/nomos-services/data-availability/sampling/src/backend/mod.rs +++ b/nomos-services/data-availability/sampling/src/backend/mod.rs @@ -4,17 +4,18 @@ pub mod kzgrs; use std::collections::BTreeSet; // crates +use rand::Rng; // // internal use nomos_da_network_core::SubnetworkId; #[async_trait::async_trait] -pub trait DaSamplingServiceBackend { +pub trait DaSamplingServiceBackend { type Settings; type BlobId; type Blob; - fn new(settings: Self::Settings) -> Self; + fn new(settings: Self::Settings, rng: R) -> Self; async fn get_validated_blobs(&self) -> BTreeSet; async fn mark_in_block(&mut self, blobs_ids: &[Self::BlobId]); async fn handle_sampling_success(&mut self, blob_id: Self::BlobId, blob: Self::Blob); diff --git a/nomos-services/data-availability/sampling/src/lib.rs b/nomos-services/data-availability/sampling/src/lib.rs index 875954dd..fe388f61 100644 --- a/nomos-services/data-availability/sampling/src/lib.rs +++ b/nomos-services/data-availability/sampling/src/lib.rs @@ -6,6 +6,10 @@ use std::collections::BTreeSet; use std::fmt::Debug; // crates +use rand::prelude::*; +use rand::Rng; +use rand_chacha::rand_core::CryptoRngCore; +use rand_chacha::ChaCha20Rng; use tokio_stream::StreamExt; use tracing::{error, span, Instrument, Level}; // internal @@ -46,9 +50,10 @@ pub struct DaSamplingServiceSettings { impl RelayMessage for DaSamplingServiceMsg {} -pub struct DaSamplingService +pub struct DaSamplingService where - Backend: DaSamplingServiceBackend + Send, + R: Rng, + Backend: DaSamplingServiceBackend + Send, Backend::Settings: Clone, Backend::Blob: Debug + 'static, Backend::BlobId: Debug + 'static, @@ -60,9 +65,10 @@ where sampler: Backend, } -impl DaSamplingService +impl DaSamplingService where - Backend: DaSamplingServiceBackend + Send + 'static, + R: Rng, + Backend: DaSamplingServiceBackend + Send + 'static, Backend::Settings: Clone, N: NetworkAdapter + Send + 'static, N::Settings: Clone, @@ -126,9 +132,10 @@ where } } -impl ServiceData for DaSamplingService +impl ServiceData for DaSamplingService where - Backend: DaSamplingServiceBackend + Send, + R: Rng, + Backend: DaSamplingServiceBackend + Send, Backend::Settings: Clone, Backend::Blob: Debug + 'static, Backend::BlobId: Debug + 'static, @@ -143,9 +150,10 @@ where } #[async_trait::async_trait] -impl ServiceCore for DaSamplingService +impl ServiceCore for DaSamplingService where - Backend: DaSamplingServiceBackend + Send + Sync + 'static, + R: Rng, + Backend: DaSamplingServiceBackend + Send + Sync + 'static, Backend::Settings: Clone + Send + Sync + 'static, N: NetworkAdapter + Send + Sync + 'static, N::Settings: Clone + Send + Sync + 'static, @@ -156,11 +164,12 @@ where } = service_state.settings_reader.get_updated_settings(); let network_relay = service_state.overwatch_handle.relay(); + let rng: R = ChaCha20Rng::from_entropy(); Ok(Self { network_relay, service_state, - sampler: Backend::new(sampling_settings), + sampler: Backend::new(sampling_settings, rng), }) } @@ -187,18 +196,21 @@ where tokio::select! { Some(service_message) = service_state.inbound_relay.recv() => { Self::handle_service_message(service_message, &mut network_adapter, &mut sampler).await; + // cleanup not on time samples + sampler.prune(); } Some(sampling_message) = sampling_message_stream.next() => { Self::handle_sampling_message(sampling_message, &mut sampler).await; + // cleanup not on time samples + sampler.prune(); } + Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { break; } } } - // cleanup not on time samples - sampler.prune(); } } .instrument(span!(Level::TRACE, DA_SAMPLING_TAG))