From 0fcfd92a555e117c9eb1ceae65a14bcc825118cd Mon Sep 17 00:00:00 2001 From: Giacomo Pasini Date: Thu, 3 Nov 2022 14:28:37 +0100 Subject: [PATCH] Initial network service (#2) * initial network service * [style] unpack inbound relay --- Cargo.toml | 5 + nomos-services/Cargo.toml | 12 +++ nomos-services/src/lib.rs | 1 + nomos-services/src/network/backends/mod.rs | 12 +++ nomos-services/src/network/mod.rs | 117 +++++++++++++++++++++ 5 files changed, 147 insertions(+) create mode 100644 Cargo.toml create mode 100644 nomos-services/Cargo.toml create mode 100644 nomos-services/src/lib.rs create mode 100644 nomos-services/src/network/backends/mod.rs create mode 100644 nomos-services/src/network/mod.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 00000000..38df4a60 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,5 @@ +[workspace] + +members = [ + "nomos-services" +] \ No newline at end of file diff --git a/nomos-services/Cargo.toml b/nomos-services/Cargo.toml new file mode 100644 index 00000000..f9e261b2 --- /dev/null +++ b/nomos-services/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "nomos-services" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +overwatch = { git = "https://github.com/logos-co/Overwatch", branch = "main" } +async-trait = "0.1" +tokio = { version = "1", features = ["sync"] } +tracing = "0.1" \ No newline at end of file diff --git a/nomos-services/src/lib.rs b/nomos-services/src/lib.rs new file mode 100644 index 00000000..a61610bd --- /dev/null +++ b/nomos-services/src/lib.rs @@ -0,0 +1 @@ +pub mod network; diff --git a/nomos-services/src/network/backends/mod.rs b/nomos-services/src/network/backends/mod.rs new file mode 100644 index 00000000..3221c6e1 --- /dev/null +++ b/nomos-services/src/network/backends/mod.rs @@ -0,0 +1,12 @@ +use super::*; +use overwatch::services::state::ServiceState; +use tokio::sync::broadcast::Receiver; + +pub trait NetworkBackend { + type Config: Clone + Send + Sync + 'static; + type State: ServiceState + Clone;; + + fn new(config: Self::Config) -> Self; + fn broadcast(&self, msg: NetworkData); + fn subscribe(&mut self, event: EventKind) -> Receiver; +} diff --git a/nomos-services/src/network/mod.rs b/nomos-services/src/network/mod.rs new file mode 100644 index 00000000..b4b56490 --- /dev/null +++ b/nomos-services/src/network/mod.rs @@ -0,0 +1,117 @@ +pub mod backends; + +use async_trait::async_trait; +use backends::NetworkBackend; +use overwatch::services::{ + handle::ServiceStateHandle, + relay::RelayMessage, + state::{NoOperator, ServiceState}, + ServiceCore, ServiceData, ServiceId, +}; +use std::fmt::Debug; +use tokio::sync::broadcast; +use tokio::sync::oneshot; + +pub type NetworkData = Box<[u8]>; + +#[derive(Debug)] +pub enum NetworkMsg { + Broadcast(NetworkData), + Subscribe { + kind: EventKind, + sender: oneshot::Sender>, + }, +} + +impl RelayMessage for NetworkMsg {} + +#[derive(Debug)] +pub enum EventKind { + Message, +} + +#[derive(Debug)] +pub enum NetworkEvent { + RawMessage(NetworkData), +} + +pub struct NetworkConfig { + pub backend: I::Config, +} + +pub struct NetworkService { + backend: I, + service_state: ServiceStateHandle, +} + +pub struct NetworkState { + _backend: I::State, +} + +impl ServiceData for NetworkService { + const SERVICE_ID: ServiceId = "Network"; + type Settings = NetworkConfig; + type State = NetworkState; + type StateOperator = NoOperator; + type Message = NetworkMsg; +} + +#[async_trait] +impl ServiceCore for NetworkService { + fn init(mut service_state: ServiceStateHandle) -> Self { + Self { + backend: ::new( + service_state.settings_reader.get_updated_settings().backend, + ), + service_state, + } + } + + async fn run(mut self) { + let Self { + service_state: ServiceStateHandle { + mut inbound_relay, .. + }, + mut backend, + } = self; + + while let Some(msg) = inbound_relay.recv().await { + match msg { + NetworkMsg::Broadcast(msg) => backend.broadcast(msg), + NetworkMsg::Subscribe { kind, sender } => { + sender.send(backend.subscribe(kind)).unwrap_or_else(|_| { + tracing::warn!( + "client hung up before as subscription handle could be established" + ) + }) + } + } + } + } +} + +impl Clone for NetworkConfig { + fn clone(&self) -> Self { + NetworkConfig { + backend: self.backend.clone(), + } + } +} + +impl Clone for NetworkState { + fn clone(&self) -> Self { + NetworkState { + _backend: self._backend.clone(), + } + } +} + +impl ServiceState for NetworkState { + type Settings = NetworkConfig; + + fn from_settings(settings: &Self::Settings) -> Self { + Self { + _backend: I::State::from_settings(&settings.backend), + } + } +}