Abort tasks for da networks backends (#853)

* Abort tasks for da networks backends

* fmt
This commit is contained in:
Daniel Sanchez 2024-10-24 18:17:33 +02:00 committed by GitHub
parent 193ff82980
commit 733f50b521
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 84 additions and 36 deletions

View File

@ -3,7 +3,11 @@ use crate::backends::libp2p::common::{
DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE,
};
use crate::backends::NetworkBackend;
use futures::{Stream, StreamExt};
use futures::future::Aborted;
use futures::{
stream::{AbortHandle, Abortable},
Stream, StreamExt,
};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::PeerId;
use log::error;
@ -71,15 +75,9 @@ pub struct DaNetworkExecutorBackend<Membership>
where
Membership: MembershipHandler,
{
// TODO: this join handles should be cancelable tasks. We should add an stop method for
// the `NetworkBackend` trait so if the service is stopped the backend can gracefully handle open
// sub-tasks as well.
#[allow(dead_code)]
task: JoinHandle<()>,
#[allow(dead_code)]
verifier_replies_task: JoinHandle<()>,
#[allow(dead_code)]
executor_replies_task: JoinHandle<()>,
task: (AbortHandle, JoinHandle<Result<(), Aborted>>),
verifier_replies_task: (AbortHandle, JoinHandle<Result<(), Aborted>>),
executor_replies_task: (AbortHandle, JoinHandle<Result<(), Aborted>>),
sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>,
sampling_broadcast_receiver: broadcast::Receiver<SamplingEvent>,
verifying_broadcast_receiver: broadcast::Receiver<DaBlob>,
@ -147,7 +145,13 @@ where
let dispersal_blobs_sender = executor_swarm.dispersal_blobs_channel();
let executor_open_stream_sender = executor_swarm.dispersal_open_stream_sender();
let task = overwatch_handle.runtime().spawn(executor_swarm.run());
let (task_abort_handle, abort_registration) = AbortHandle::new_pair();
let task = (
task_abort_handle,
overwatch_handle
.runtime()
.spawn(Abortable::new(executor_swarm.run(), abort_registration)),
);
std::thread::sleep(Duration::from_secs(1));
// open streams to dispersal peers
@ -161,21 +165,32 @@ where
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let (dispersal_broadcast_sender, dispersal_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let verifier_replies_task =
overwatch_handle
.runtime()
.spawn(handle_validator_events_stream(
let (verifier_replies_task_abort_handle, verifier_replies_task_abort_registration) =
AbortHandle::new_pair();
let verifier_replies_task = (
verifier_replies_task_abort_handle,
overwatch_handle.runtime().spawn(Abortable::new(
handle_validator_events_stream(
executor_events_stream.validator_events_stream,
sampling_broadcast_sender,
verifying_broadcast_sender,
));
let executor_replies_task =
overwatch_handle
.runtime()
.spawn(handle_executor_dispersal_events_stream(
),
verifier_replies_task_abort_registration,
)),
);
let (executor_replies_task_abort_handle, executor_replies_task_abort_registration) =
AbortHandle::new_pair();
let executor_replies_task = (
executor_replies_task_abort_handle,
overwatch_handle.runtime().spawn(Abortable::new(
handle_executor_dispersal_events_stream(
executor_events_stream.dispersal_events_receiver,
dispersal_broadcast_sender,
));
),
executor_replies_task_abort_registration,
)),
);
Self {
task,
@ -190,6 +205,18 @@ where
}
}
fn shutdown(&mut self) {
let Self {
task: (task_handle, _),
verifier_replies_task: (verifier_handle, _),
executor_replies_task: (executor_handle, _),
..
} = self;
task_handle.abort();
verifier_handle.abort();
executor_handle.abort();
}
async fn process(&self, msg: Self::Message) {
match msg {
ExecutorDaNetworkMessage::RequestSample {

View File

@ -3,6 +3,7 @@ use crate::backends::libp2p::common::{
DaNetworkBackendSettings, SamplingEvent, BROADCAST_CHANNEL_SIZE,
};
use crate::backends::NetworkBackend;
use futures::future::{AbortHandle, Abortable, Aborted};
use futures::{Stream, StreamExt};
use kzgrs_backend::common::blob::DaBlob;
use libp2p::PeerId;
@ -51,13 +52,8 @@ pub enum DaNetworkEvent {
/// Internally uses a libp2p swarm composed of the [`ValidatorBehaviour`]
/// It forwards network messages to the corresponding subscription channels/streams
pub struct DaNetworkValidatorBackend<Membership> {
// TODO: this join handles should be cancelable tasks. We should add an stop method for
// the `NetworkBackend` trait so if the service is stopped the backend can gracefully handle open
// sub-tasks as well.
#[allow(dead_code)]
task: JoinHandle<()>,
#[allow(dead_code)]
replies_task: JoinHandle<()>,
task: (AbortHandle, JoinHandle<Result<(), Aborted>>),
replies_task: (AbortHandle, JoinHandle<Result<(), Aborted>>),
sampling_request_channel: UnboundedSender<(SubnetworkId, BlobId)>,
sampling_broadcast_receiver: broadcast::Receiver<SamplingEvent>,
verifying_broadcast_receiver: broadcast::Receiver<DaBlob>,
@ -108,18 +104,29 @@ where
let sampling_request_channel = validator_swarm.sample_request_channel();
let task = overwatch_handle.runtime().spawn(validator_swarm.run());
let (task_abort_handle, abort_registration) = AbortHandle::new_pair();
let task = (
task_abort_handle,
overwatch_handle
.runtime()
.spawn(Abortable::new(validator_swarm.run(), abort_registration)),
);
let (sampling_broadcast_sender, sampling_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let (verifying_broadcast_sender, verifying_broadcast_receiver) =
broadcast::channel(BROADCAST_CHANNEL_SIZE);
let replies_task = overwatch_handle
.runtime()
.spawn(handle_validator_events_stream(
validator_events_stream,
sampling_broadcast_sender,
verifying_broadcast_sender,
));
let (replies_task_abort_handle, replies_task_abort_registration) = AbortHandle::new_pair();
let replies_task = (
replies_task_abort_handle,
overwatch_handle.runtime().spawn(Abortable::new(
handle_validator_events_stream(
validator_events_stream,
sampling_broadcast_sender,
verifying_broadcast_sender,
),
replies_task_abort_registration,
)),
);
Self {
task,
@ -131,6 +138,16 @@ where
}
}
fn shutdown(&mut self) {
let Self {
task: (task_handle, _),
replies_task: (replies_handle, _),
..
} = self;
task_handle.abort();
replies_handle.abort();
}
async fn process(&self, msg: Self::Message) {
match msg {
DaNetworkMessage::RequestSample {

View File

@ -80,6 +80,8 @@ impl NetworkBackend for MockExecutorBackend {
}
}
fn shutdown(&mut self) {}
async fn process(&self, msg: Self::Message) {
match msg {
Command::Disperse {

View File

@ -15,6 +15,7 @@ pub trait NetworkBackend {
type NetworkEvent: Debug + Send + Sync + 'static;
fn new(config: Self::Settings, overwatch_handle: OverwatchHandle) -> Self;
fn shutdown(&mut self);
async fn process(&self, msg: Self::Message);
async fn subscribe(
&mut self,

View File

@ -102,6 +102,7 @@ where
}
Some(msg) = lifecycle_stream.next() => {
if Self::should_stop_service(msg).await {
backend.shutdown();
break;
}
}