bit improved

This commit is contained in:
holisticode 2024-09-10 22:03:52 -05:00
parent d4a68a9cad
commit e042000903
2 changed files with 24 additions and 21 deletions

View File

@ -33,7 +33,7 @@ use nomos_node::{Tx, Wire};
use nomos_storage::{backends::rocksdb::RocksBackend, StorageService};
use overwatch_derive::*;
use overwatch_rs::overwatch::{Overwatch, OverwatchRunner};
use overwatch_rs::services::handle::ServiceHandle;
use overwatch_rs::services::{handle::ServiceHandle, ServiceData};
use rand::{thread_rng, Rng};
use subnetworks_assignations::versions::v1::FillFromNodeList;
use tempfile::{NamedTempFile, TempDir};
@ -41,6 +41,7 @@ use time::OffsetDateTime;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};
// internal
use crate::common::*;
use crate::service_wrapper::ServiceWrapper;
#[derive(Services)]
struct IndexerNode {
@ -195,6 +196,8 @@ fn test_indexer() {
vec![node_address(&swarm_config1)],
);
let wrapper1 = ServiceWrapper::new(node1.handle());
let mempool = node1.handle().relay::<DaMempool>();
let storage = node1.handle().relay::<StorageService<RocksBackend<Wire>>>();
let indexer = node1.handle().relay::<DaIndexer>();
@ -237,6 +240,11 @@ fn test_indexer() {
let indexer_outbound = indexer.connect().await.unwrap();
let consensus_outbound = consensus.connect().await.unwrap();
wrapper1.wrap(mempool_outbound);
wrapper1.wrap(&storage_outbound);
wrapper1.wrap(&indexer_outbound);
wrapper1.wrap(&consensus_outbound);
let (sender, receiver) = tokio::sync::oneshot::channel();
consensus_outbound
.send(ConsensusMsg::BlockSubscribe { sender })

View File

@ -1,30 +1,25 @@
use overwatch_rs::{
overwatch::{handle::OverwatchHandle, OverwatchRunner, Services},
services::ServiceData,
overwatch::{handle::OverwatchHandle, Services},
services::{
relay::{AnyMessage, OutboundRelay},
ServiceData,
},
};
pub struct ServiceWrapper<S> {
ow_handle: OverwatchHandle,
wrapped_services: Vec<S>,
pub struct ServiceWrapper<'a> {
ow_handle: &'a OverwatchHandle,
wrapped_services: Vec<&'a OutboundRelay<ServiceData::Message>>,
}
impl<S: Services> ServiceWrapper<S>
where
S: Services + Send,
{
fn new(settings: S::Settings, relays: Vec<ServiceData>) -> Self {
let ow = OverwatchRunner::<S>::run(settings, None)
.map_err(|e| eprintln!("Error encountered: {}", e))
.unwrap();
let handle = ow.handle();
let mut services = vec![];
for s in relays.iter() {
let relay = handle.relay::<s>();
services.push(relay);
}
impl<'a> ServiceWrapper<'a> {
pub fn new(handle: &'a OverwatchHandle) -> Self {
Self {
ow_handle: handle,
wrapped_services: services,
wrapped_services: vec![],
}
}
pub async fn wrap(&mut self, relay: &'a OutboundRelay<ServiceData::Message>) {
self.wrapped_services.push(relay);
}
}