Add waku-chat example (#1)

* add waku-chat example

* clippy happy

* update dependencies

* remove unneeded tokio features

* use separate thread for stdin

* only build on linux

Co-authored-by: Giacomo Pasini <giacomo@status.im>
This commit is contained in:
Giacomo Pasini 2022-10-24 13:21:39 +02:00 committed by GitHub
parent f8f33ee781
commit cc20ecc918
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1298 additions and 3 deletions

971
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -3,6 +3,7 @@
members = [
"overwatch",
"overwatch-derive",
"examples/waku-chat"
]
[profile.release-opt]

View File

@ -0,0 +1,19 @@
[package]
name = "waku-chat"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { version = "1", features = ["rt"] }
waku = { git = "https://github.com/waku-org/waku-rust-bindings" }
serde = "1"
bincode = "1"
overwatch = { path = "../../overwatch" }
overwatch-derive = { path = "../../overwatch-derive" }
tracing = "*"
async-trait = "0.1"
tracing-subscriber = "0.3"
clap = { version = "4.0.18", features = ["derive"] }
rand = "0.8"

View File

@ -0,0 +1,90 @@
use crate::network::*;
use async_trait::async_trait;
use overwatch::services::handle::ServiceStateHandle;
use overwatch::services::relay::{NoMessage, Relay};
use overwatch::services::state::{NoOperator, NoState};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::channel;
/// Chat service handler
/// displays received messages, send new ones
pub struct ChatService {
service_state: ServiceStateHandle<Self>,
}
#[derive(Deserialize, Serialize)]
struct Message {
user: usize,
msg: Box<[u8]>,
}
impl ServiceData for ChatService {
const SERVICE_ID: ServiceId = "Chat";
type Settings = usize;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NoMessage;
}
#[async_trait]
impl ServiceCore for ChatService {
fn init(service_state: ServiceStateHandle<Self>) -> Self {
Self { service_state }
}
async fn run(self) {
let Self {
mut service_state, ..
} = self;
// TODO: waku should not end up in the public interface of the network service, at least not as a type
let mut network_relay: Relay<NetworkService<waku::Waku>> =
service_state.overwatch_handle.relay();
let user = service_state.settings_reader.get_updated_settings();
network_relay.connect().await.unwrap();
let (sender, mut receiver) = channel(1);
// TODO: typestate so I can't call send if it's not connected
network_relay
.send(NetworkMsg::Subscribe {
kind: EventKind::Message,
sender,
})
.await
.unwrap();
// send new messages
// for interactive stdin I/O it's recommended to
// use an external thread, see https://docs.rs/tokio/latest/tokio/io/struct.Stdin.html
std::thread::spawn(move || loop {
let mut input = String::new();
std::io::stdin()
.read_line(&mut input)
.expect("error reading message");
input.truncate(input.trim().len());
network_relay
.blocking_send(NetworkMsg::Broadcast(
bincode::serialize(&Message {
user,
msg: input.as_bytes().to_vec().into_boxed_slice(),
})
.unwrap()
.into_boxed_slice(),
))
.unwrap();
tracing::debug!("[sending]: {}...", input);
});
// print received messages
while let Some(NetworkEvent::RawMessage(message)) = receiver.recv().await {
if let Ok(msg) = bincode::deserialize::<Message>(&message) {
if msg.user != user {
println!(
"[received][{}]: {}",
msg.user,
String::from_utf8_lossy(&msg.msg)
);
}
}
}
}
}

View File

@ -0,0 +1,51 @@
#![allow(dead_code)]
#![allow(unused)]
// public chat service
// messages are disseminated through waku,
// no consensus, no blocks
mod network;
// TODO: different chat rooms with different contentTopicId
mod chat;
use chat::*;
use clap::Parser;
use network::*;
use overwatch::{overwatch::*, services::handle::ServiceHandle};
use overwatch_derive::*;
/// Simple program to greet a person
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Multiaddrs of other nodes participating in the protocol
#[arg(short, long)]
peers: Vec<String>,
/// Listening port
port: u16,
}
#[derive(Services)]
struct Services {
chat: ServiceHandle<ChatService>,
network: ServiceHandle<NetworkService<network::waku::Waku>>,
}
#[cfg(target_os = "linux")]
fn main() {
tracing_subscriber::fmt::init();
let Args { peers, port } = Args::parse();
let app = OverwatchRunner::<Services>::run(
ServicesServiceSettings {
chat: rand::random(),
network: NetworkConfig { peers, port },
},
None,
);
app.wait_finished();
}
#[cfg(not(target_os = "linux"))]
fn main() {
println!("waku is only supported on linux");
}

View File

@ -0,0 +1,81 @@
pub mod waku;
use async_trait::async_trait;
use overwatch::services::handle::ServiceStateHandle;
use overwatch::services::relay::RelayMessage;
use overwatch::services::state::{NoOperator, NoState};
use overwatch::services::{ServiceCore, ServiceData, ServiceId};
use std::fmt::Debug;
use tokio::sync::mpsc::Sender;
#[derive(Debug)]
pub enum NetworkMsg {
Broadcast(Box<[u8]>),
Subscribe {
kind: EventKind,
sender: Sender<NetworkEvent>,
},
}
impl RelayMessage for NetworkMsg {}
#[derive(Debug)]
pub enum EventKind {
Message,
}
#[derive(Debug)]
pub enum NetworkEvent {
RawMessage(Box<[u8]>),
}
#[derive(Clone, Debug)]
pub struct NetworkConfig {
pub port: u16,
pub peers: Vec<String>,
}
pub struct NetworkService<I: NetworkBackend + Send + 'static> {
implem: I,
service_state: ServiceStateHandle<Self>,
}
impl<I: NetworkBackend + Send + 'static> ServiceData for NetworkService<I> {
const SERVICE_ID: ServiceId = "Network";
type Settings = NetworkConfig;
type State = NoState<Self::Settings>;
type StateOperator = NoOperator<Self::State>;
type Message = NetworkMsg;
}
#[async_trait]
impl<I: NetworkBackend + Send + 'static> ServiceCore for NetworkService<I> {
fn init(mut service_state: ServiceStateHandle<Self>) -> Self {
Self {
implem: <I as NetworkBackend>::new(
service_state.settings_reader.get_updated_settings(),
),
service_state,
}
}
async fn run(mut self) {
let Self {
service_state,
mut implem,
} = self;
let mut relay = service_state.inbound_relay;
while let Some(msg) = relay.recv().await {
match msg {
NetworkMsg::Broadcast(msg) => implem.broadcast(msg),
NetworkMsg::Subscribe { kind: _, sender } => implem.subscribe(sender),
}
}
}
}
pub trait NetworkBackend {
fn new(config: NetworkConfig) -> Self;
fn broadcast(&self, msg: Box<[u8]>);
fn subscribe(&mut self, sender: Sender<NetworkEvent>);
}

View File

@ -0,0 +1,76 @@
use super::*;
use ::waku::*;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
use std::time::SystemTime;
use tokio::sync::mpsc::Sender;
pub struct Waku {
waku: WakuNodeHandle<Running>,
subscribers: Arc<RwLock<Vec<Sender<NetworkEvent>>>>,
}
impl NetworkBackend for Waku {
fn new(config: NetworkConfig) -> Self {
let waku_config = WakuNodeConfig {
port: Some(config.port.into()),
..Default::default()
};
let waku = waku_new(Some(waku_config)).unwrap().start().unwrap();
for peer in config.peers {
let addr = Multiaddr::from_str(&peer).unwrap();
let peer_id = waku.add_peer(&addr, waku::ProtocolId::Relay).unwrap();
waku.connect_peer_with_id(peer_id, None).unwrap();
}
waku.relay_subscribe(None).unwrap();
assert!(waku.relay_enough_peers(None).unwrap());
tracing::info!("waku listening on {}", waku.listen_addresses().unwrap()[0]);
Self {
waku,
subscribers: Arc::new(RwLock::new(Vec::new())),
}
}
fn subscribe(&mut self, sender: Sender<NetworkEvent>) {
self.subscribers.write().unwrap().push(sender);
tracing::debug!("someone subscribed");
let subscribers = Arc::clone(&self.subscribers);
waku_set_event_callback(move |sig| {
match sig.event() {
Event::WakuMessage(ref message_event) => {
tracing::debug!("received message event");
// we can probably avoid sending a copy to each subscriber and just borrow / clone on demand
for s in subscribers.read().unwrap().iter() {
s.try_send(NetworkEvent::RawMessage(
message_event
.waku_message()
.payload()
.to_vec()
.into_boxed_slice(),
))
.unwrap()
}
}
_ => tracing::debug!("unsupported event"),
}
});
}
fn broadcast(&self, msg: Box<[u8]>) {
let content_topic = WakuContentTopic::from_str("/waku/2/default-waku/proto").unwrap();
let message = WakuMessage::new(
msg,
content_topic,
1,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs() as usize,
);
let msg_id = self
.waku
.relay_publish_message(&message, None, None)
.unwrap();
tracing::debug!("sent msg {:?} with id {}", message.payload(), msg_id);
}
}

View File

@ -164,6 +164,18 @@ impl<S: ServiceCore> Relay<S> {
}
}
#[instrument(skip_all, err(Debug))]
pub fn blocking_send(&mut self, message: S::Message) -> Result<(), RelayError> {
if let RelayState::Connected(outbound_relay) = &mut self.state {
outbound_relay
.sender
.blocking_send(message)
.map_err(|_| RelayError::Send)
} else {
Err(RelayError::Disconnected)
}
}
async fn request_relay(&mut self, reply: oneshot::Sender<RelayResult>) {
let relay_command = OverwatchCommand::Relay(RelayCommand {
service_id: S::SERVICE_ID,