Initial network service (#2)

* initial network service

* [style] unpack inbound relay
This commit is contained in:
Giacomo Pasini 2022-11-03 14:28:37 +01:00 committed by GitHub
parent e9857d979b
commit 0fcfd92a55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 147 additions and 0 deletions

5
Cargo.toml Normal file
View File

@ -0,0 +1,5 @@
[workspace]
members = [
"nomos-services"
]

12
nomos-services/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "nomos-services"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
overwatch = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
async-trait = "0.1"
tokio = { version = "1", features = ["sync"] }
tracing = "0.1"

View File

@ -0,0 +1 @@
pub mod network;

View File

@ -0,0 +1,12 @@
use super::*;
use overwatch::services::state::ServiceState;
use tokio::sync::broadcast::Receiver;
pub trait NetworkBackend {
type Config: Clone + Send + Sync + 'static;
type State: ServiceState<Settings = Self::Config> + Clone;;
fn new(config: Self::Config) -> Self;
fn broadcast(&self, msg: NetworkData);
fn subscribe(&mut self, event: EventKind) -> Receiver<NetworkEvent>;
}

View File

@ -0,0 +1,117 @@
pub mod backends;
use async_trait::async_trait;
use backends::NetworkBackend;
use overwatch::services::{
handle::ServiceStateHandle,
relay::RelayMessage,
state::{NoOperator, ServiceState},
ServiceCore, ServiceData, ServiceId,
};
use std::fmt::Debug;
use tokio::sync::broadcast;
use tokio::sync::oneshot;
pub type NetworkData = Box<[u8]>;
#[derive(Debug)]
pub enum NetworkMsg {
Broadcast(NetworkData),
Subscribe {
kind: EventKind,
sender: oneshot::Sender<broadcast::Receiver<NetworkEvent>>,
},
}
impl RelayMessage for NetworkMsg {}
#[derive(Debug)]
pub enum EventKind {
Message,
}
#[derive(Debug)]
pub enum NetworkEvent {
RawMessage(NetworkData),
}
pub struct NetworkConfig<I: NetworkBackend> {
pub backend: I::Config,
}
pub struct NetworkService<I: NetworkBackend + Send + 'static> {
backend: I,
service_state: ServiceStateHandle<Self>,
}
pub struct NetworkState<I: NetworkBackend> {
_backend: I::State,
}
impl<I: NetworkBackend + Send + 'static> ServiceData for NetworkService<I> {
const SERVICE_ID: ServiceId = "Network";
type Settings = NetworkConfig<I>;
type State = NetworkState<I>;
type StateOperator = NoOperator<Self::State>;
type Message = NetworkMsg;
}
#[async_trait]
impl<I: NetworkBackend + Send + 'static> ServiceCore for NetworkService<I> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
Self {
backend: <I as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings().backend,
),
service_state,
}
}
async fn run(mut self) {
let Self {
service_state: ServiceStateHandle {
mut inbound_relay, ..
},
mut backend,
} = self;
while let Some(msg) = inbound_relay.recv().await {
match msg {
NetworkMsg::Broadcast(msg) => backend.broadcast(msg),
NetworkMsg::Subscribe { kind, sender } => {
sender.send(backend.subscribe(kind)).unwrap_or_else(|_| {
tracing::warn!(
"client hung up before as subscription handle could be established"
)
})
}
}
}
}
}
impl<I: NetworkBackend> Clone for NetworkConfig<I> {
fn clone(&self) -> Self {
NetworkConfig {
backend: self.backend.clone(),
}
}
}
impl<I: NetworkBackend> Clone for NetworkState<I> {
fn clone(&self) -> Self {
NetworkState {
_backend: self._backend.clone(),
}
}
}
impl<I: NetworkBackend + Send + 'static> ServiceState for NetworkState<I> {
type Settings = NetworkConfig<I>;
fn from_settings(settings: &Self::Settings) -> Self {
Self {
_backend: I::State::from_settings(&settings.backend),
}
}
}