diff --git a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs index f8c11629..9330a63d 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/executor.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/executor.rs @@ -3,7 +3,11 @@ use crate::backends::libp2p::common::{ DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE, }; use crate::backends::NetworkBackend; -use futures::{Stream, StreamExt}; +use futures::future::Aborted; +use futures::{ + stream::{AbortHandle, Abortable}, + Stream, StreamExt, +}; use kzgrs_backend::common::blob::DaBlob; use libp2p::PeerId; use log::error; @@ -71,15 +75,9 @@ pub struct DaNetworkExecutorBackend where Membership: MembershipHandler, { - // TODO: this join handles should be cancelable tasks. We should add an stop method for - // the `NetworkBackend` trait so if the service is stopped the backend can gracefully handle open - // sub-tasks as well. - #[allow(dead_code)] - task: JoinHandle<()>, - #[allow(dead_code)] - verifier_replies_task: JoinHandle<()>, - #[allow(dead_code)] - executor_replies_task: JoinHandle<()>, + task: (AbortHandle, JoinHandle>), + verifier_replies_task: (AbortHandle, JoinHandle>), + executor_replies_task: (AbortHandle, JoinHandle>), sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>, sampling_broadcast_receiver: broadcast::Receiver, verifying_broadcast_receiver: broadcast::Receiver, @@ -147,7 +145,13 @@ where let dispersal_blobs_sender = executor_swarm.dispersal_blobs_channel(); let executor_open_stream_sender = executor_swarm.dispersal_open_stream_sender(); - let task = overwatch_handle.runtime().spawn(executor_swarm.run()); + let (task_abort_handle, abort_registration) = AbortHandle::new_pair(); + let task = ( + task_abort_handle, + overwatch_handle + .runtime() + .spawn(Abortable::new(executor_swarm.run(), abort_registration)), + ); std::thread::sleep(Duration::from_secs(1)); // open streams to dispersal peers @@ -161,21 +165,32 @@ where broadcast::channel(BROADCAST_CHANNEL_SIZE); let (dispersal_broadcast_sender, dispersal_broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE); - let verifier_replies_task = - overwatch_handle - .runtime() - .spawn(handle_validator_events_stream( + let (verifier_replies_task_abort_handle, verifier_replies_task_abort_registration) = + AbortHandle::new_pair(); + let verifier_replies_task = ( + verifier_replies_task_abort_handle, + overwatch_handle.runtime().spawn(Abortable::new( + handle_validator_events_stream( executor_events_stream.validator_events_stream, sampling_broadcast_sender, verifying_broadcast_sender, - )); - let executor_replies_task = - overwatch_handle - .runtime() - .spawn(handle_executor_dispersal_events_stream( + ), + verifier_replies_task_abort_registration, + )), + ); + let (executor_replies_task_abort_handle, executor_replies_task_abort_registration) = + AbortHandle::new_pair(); + + let executor_replies_task = ( + executor_replies_task_abort_handle, + overwatch_handle.runtime().spawn(Abortable::new( + handle_executor_dispersal_events_stream( executor_events_stream.dispersal_events_receiver, dispersal_broadcast_sender, - )); + ), + executor_replies_task_abort_registration, + )), + ); Self { task, @@ -190,6 +205,18 @@ where } } + fn shutdown(&mut self) { + let Self { + task: (task_handle, _), + verifier_replies_task: (verifier_handle, _), + executor_replies_task: (executor_handle, _), + .. + } = self; + task_handle.abort(); + verifier_handle.abort(); + executor_handle.abort(); + } + async fn process(&self, msg: Self::Message) { match msg { ExecutorDaNetworkMessage::RequestSample { diff --git a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs index 1682595c..5b033306 100644 --- a/nomos-services/data-availability/network/src/backends/libp2p/validator.rs +++ b/nomos-services/data-availability/network/src/backends/libp2p/validator.rs @@ -3,6 +3,7 @@ use crate::backends::libp2p::common::{ DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE, }; use crate::backends::NetworkBackend; +use futures::future::{AbortHandle, Abortable, Aborted}; use futures::{Stream, StreamExt}; use kzgrs_backend::common::blob::DaBlob; use libp2p::PeerId; @@ -51,13 +52,8 @@ pub enum DaNetworkEvent { /// Internally uses a libp2p swarm composed of the [`ValidatorBehaviour`] /// It forwards network messages to the corresponding subscription channels/streams pub struct DaNetworkValidatorBackend { - // TODO: this join handles should be cancelable tasks. We should add an stop method for - // the `NetworkBackend` trait so if the service is stopped the backend can gracefully handle open - // sub-tasks as well. - #[allow(dead_code)] - task: JoinHandle<()>, - #[allow(dead_code)] - replies_task: JoinHandle<()>, + task: (AbortHandle, JoinHandle>), + replies_task: (AbortHandle, JoinHandle>), sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>, sampling_broadcast_receiver: broadcast::Receiver, verifying_broadcast_receiver: broadcast::Receiver, @@ -108,18 +104,29 @@ where let sampling_request_channel = validator_swarm.sample_request_channel(); - let task = overwatch_handle.runtime().spawn(validator_swarm.run()); + let (task_abort_handle, abort_registration) = AbortHandle::new_pair(); + let task = ( + task_abort_handle, + overwatch_handle + .runtime() + .spawn(Abortable::new(validator_swarm.run(), abort_registration)), + ); let (sampling_broadcast_sender, sampling_broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE); let (verifying_broadcast_sender, verifying_broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE); - let replies_task = overwatch_handle - .runtime() - .spawn(handle_validator_events_stream( - validator_events_stream, - sampling_broadcast_sender, - verifying_broadcast_sender, - )); + let (replies_task_abort_handle, replies_task_abort_registration) = AbortHandle::new_pair(); + let replies_task = ( + replies_task_abort_handle, + overwatch_handle.runtime().spawn(Abortable::new( + handle_validator_events_stream( + validator_events_stream, + sampling_broadcast_sender, + verifying_broadcast_sender, + ), + replies_task_abort_registration, + )), + ); Self { task, @@ -131,6 +138,16 @@ where } } + fn shutdown(&mut self) { + let Self { + task: (task_handle, _), + replies_task: (replies_handle, _), + .. + } = self; + task_handle.abort(); + replies_handle.abort(); + } + async fn process(&self, msg: Self::Message) { match msg { DaNetworkMessage::RequestSample { diff --git a/nomos-services/data-availability/network/src/backends/mock/executor.rs b/nomos-services/data-availability/network/src/backends/mock/executor.rs index 180dd6ee..9a827f49 100644 --- a/nomos-services/data-availability/network/src/backends/mock/executor.rs +++ b/nomos-services/data-availability/network/src/backends/mock/executor.rs @@ -80,6 +80,8 @@ impl NetworkBackend for MockExecutorBackend { } } + fn shutdown(&mut self) {} + async fn process(&self, msg: Self::Message) { match msg { Command::Disperse { diff --git a/nomos-services/data-availability/network/src/backends/mod.rs b/nomos-services/data-availability/network/src/backends/mod.rs index 95338a4e..80c3fd67 100644 --- a/nomos-services/data-availability/network/src/backends/mod.rs +++ b/nomos-services/data-availability/network/src/backends/mod.rs @@ -15,6 +15,7 @@ pub trait NetworkBackend { type NetworkEvent: Debug + Send + Sync + 'static; fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self; + fn shutdown(&mut self); async fn process(&self, msg: Self::Message); async fn subscribe( &mut self, diff --git a/nomos-services/data-availability/network/src/lib.rs b/nomos-services/data-availability/network/src/lib.rs index 294721f2..a7ea6e4b 100644 --- a/nomos-services/data-availability/network/src/lib.rs +++ b/nomos-services/data-availability/network/src/lib.rs @@ -102,6 +102,7 @@ where } Some(msg) = lifecycle_stream.next() => { if Self::should_stop_service(msg).await { + backend.shutdown(); break; } }