From 69a48725ca515661a257974150266356feff803c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Fri, 1 Mar 2024 11:31:00 -0400 Subject: [PATCH] refactor: node handle constructor and messageHash on publish (#98) * refactor: node handle constructor and messageId on publish * refactor: add back typestate * chore: rename messageId to messageHash --- waku-bindings/src/general/mod.rs | 4 +-- waku-bindings/src/lib.rs | 8 +++-- waku-bindings/src/node/events.rs | 19 +++++------ waku-bindings/src/node/mod.rs | 55 ++++++++++++++++++++++---------- waku-bindings/src/node/relay.rs | 6 ++-- waku-bindings/src/utils.rs | 7 ++-- waku-bindings/tests/node.rs | 45 +++++++++++++++----------- waku-sys/vendor | 2 +- 8 files changed, 89 insertions(+), 57 deletions(-) diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index 82ca5f7..bd880a7 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -11,8 +11,8 @@ use sscanf::{scanf, RegexRepresentation}; /// Waku message version pub type WakuMessageVersion = usize; -/// Waku message id, hex encoded sha256 digest of the message -pub type MessageId = String; +/// Waku message hash, hex encoded sha256 digest of the message +pub type MessageHash = String; /// Waku response, just a `Result` with an `String` error. pub type Result = std::result::Result; diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index e295340..7d268f6 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -12,8 +12,10 @@ mod utils; use rln; pub use node::{ - waku_create_content_topic, waku_new, Event, Key, Multiaddr, PublicKey, SecretKey, - WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, + waku_create_content_topic, waku_new, Event, Initialized, Key, Multiaddr, PublicKey, Running, + SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, }; -pub use general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion}; +pub use general::{ + Encoding, MessageHash, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, +}; diff --git a/waku-bindings/src/node/events.rs b/waku-bindings/src/node/events.rs index 8f66c53..fd71b7a 100644 --- a/waku-bindings/src/node/events.rs +++ b/waku-bindings/src/node/events.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use crate::general::WakuMessage; use crate::node::context::WakuNodeContext; use crate::utils::{get_trampoline, LibwakuResponse}; -use crate::MessageId; +use crate::MessageHash; /// Waku event /// For now just WakuMessage is supported @@ -31,8 +31,8 @@ pub enum Event { pub struct WakuMessageEvent { /// The pubsub topic on which the message was received pub pubsub_topic: String, - /// The message id - pub message_id: MessageId, + /// The message hash + pub message_hash: MessageHash, /// The message in [`WakuMessage`] format pub waku_message: WakuMessage, } @@ -41,13 +41,10 @@ pub struct WakuMessageEvent { /// which are used to react to asynchronous events in Waku pub fn waku_set_event_callback(ctx: &WakuNodeContext, mut f: F) { let cb = |response: LibwakuResponse| { - match response { - LibwakuResponse::Success(v) => { - let data: Event = - serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); - f(data); - } - _ => {} // Do nothing + if let LibwakuResponse::Success(v) = response { + let data: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + f(data); }; }; @@ -65,7 +62,7 @@ mod tests { #[test] fn deserialize_message_event() { - let s = "{\"eventType\":\"message\",\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; + let s = "{\"eventType\":\"message\",\"messageHash\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; let evt: Event = serde_json::from_str(s).unwrap(); assert!(matches!(evt, Event::WakuMessage(_))); } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 37269bd..c5d4105 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -11,31 +11,62 @@ mod relay; pub use aes_gcm::Key; pub use multiaddr::Multiaddr; pub use secp256k1::{PublicKey, SecretKey}; +use std::marker::PhantomData; use std::time::Duration; // internal -use crate::general::{Result, WakuMessage}; +use crate::general::{MessageHash, Result, WakuMessage}; use context::WakuNodeContext; pub use config::WakuNodeConfig; pub use events::{Event, WakuMessageEvent}; pub use relay::waku_create_content_topic; +/// Marker trait to disallow undesired waku node states in the handle +pub trait WakuNodeState {} + +/// Waku node initialized state +pub struct Initialized; + +/// Waku node running state +pub struct Running; + +impl WakuNodeState for Initialized {} +impl WakuNodeState for Running {} + /// Handle to the underliying waku node -pub struct WakuNodeHandle { +pub struct WakuNodeHandle { ctx: WakuNodeContext, + phantom: PhantomData, } -impl WakuNodeHandle { +/// Spawn a new Waku node with the given configuration (default configuration if `None` provided) +/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) +pub fn waku_new(config: Option) -> Result> { + Ok(WakuNodeHandle { + ctx: management::waku_new(config)?, + phantom: PhantomData, + }) +} + +impl WakuNodeHandle { /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) - pub fn start(&self) -> Result<()> { - management::waku_start(&self.ctx) + pub fn start(self) -> Result> { + management::waku_start(&self.ctx).map(|_| WakuNodeHandle { + ctx: self.ctx, + phantom: PhantomData, + }) } +} +impl WakuNodeHandle { /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) - pub fn stop(&self) -> Result<()> { - management::waku_stop(&self.ctx) + pub fn stop(self) -> Result> { + management::waku_stop(&self.ctx).map(|_| WakuNodeHandle { + ctx: self.ctx, + phantom: PhantomData, + }) } /// Get the multiaddresses the Waku node is listening to @@ -66,7 +97,7 @@ impl WakuNodeHandle { message: &WakuMessage, pubsub_topic: &String, timeout: Option, - ) -> Result<()> { + ) -> Result { relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout) } @@ -84,11 +115,3 @@ impl WakuNodeHandle { events::waku_set_event_callback(&self.ctx, f) } } - -/// Spawn a new Waku node with the given configuration (default configuration if `None` provided) -/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) -pub fn waku_new(config: Option) -> Result { - Ok(WakuNodeHandle { - ctx: management::waku_new(config)?, - }) -} diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index 629ef85..e42caf4 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -6,7 +6,7 @@ use std::time::Duration; // crates use libc::*; // internal -use crate::general::{Encoding, Result, WakuContentTopic, WakuMessage}; +use crate::general::{Encoding, MessageHash, Result, WakuContentTopic, WakuMessage}; use crate::node::context::WakuNodeContext; use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse}; @@ -62,7 +62,7 @@ pub fn waku_relay_publish_message( message: &WakuMessage, pubsub_topic: &String, timeout: Option, -) -> Result<()> { +) -> Result { let pubsub_topic = pubsub_topic.to_string(); let message_ptr = CString::new( @@ -102,7 +102,7 @@ pub fn waku_relay_publish_message( out }; - handle_no_response(code, result) + handle_response(code, result) } pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> { diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index ac5250a..8e50cba 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -22,7 +22,10 @@ impl TryFrom<(u32, &str)> for LibwakuResponse { let opt_value = Some(response.to_string()).filter(|s| !s.is_empty()); match ret_code { RET_OK => Ok(LibwakuResponse::Success(opt_value)), - RET_ERR => Ok(LibwakuResponse::Failure(format!("waku error: {}", response))), + RET_ERR => Ok(LibwakuResponse::Failure(format!( + "waku error: {}", + response + ))), RET_MISSING_CALLBACK => Ok(LibwakuResponse::MissingCallback), _ => Err(format!("undefined return code {}", ret_code)), } @@ -98,7 +101,7 @@ pub fn handle_response(code: i32, result: LibwakuResponse) -> Result LibwakuResponse::Success(v) => v .unwrap_or_default() .parse() - .map_err(|_| format!("could not parse value")), + .map_err(|_| "could not parse value".into()), LibwakuResponse::Failure(v) => Err(v), LibwakuResponse::MissingCallback => panic!("callback is required"), LibwakuResponse::Undefined => panic!( diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 5769063..7d33dfc 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -2,12 +2,12 @@ use secp256k1::SecretKey; use serial_test::serial; use std::str::FromStr; use std::time::{Duration, SystemTime}; -use std::{str::from_utf8}; +use std::{collections::HashSet, str::from_utf8}; use tokio::sync::broadcast::{self, Sender}; use tokio::time; use tokio::time::sleep; use waku_bindings::{ - waku_new, Encoding, Event, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig, + waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; const ECHO_TIMEOUT: u64 = 10; @@ -15,28 +15,30 @@ const ECHO_MESSAGE: &str = "Hi from 🦀!"; const TEST_PUBSUBTOPIC: &str = "test"; fn try_publish_relay_messages( - node: &WakuNodeHandle, + node: &WakuNodeHandle, msg: &WakuMessage, -) -> Result<(), String> { +) -> Result, String> { let topic = TEST_PUBSUBTOPIC.to_string(); - Ok(node.relay_publish_message(msg, &topic, None)?) + Ok(HashSet::from([ + node.relay_publish_message(msg, &topic, None)? + ])) } #[derive(Debug, Clone)] struct Response { - id: MessageId, + hash: MessageHash, payload: Vec, } -fn set_callback(node: &WakuNodeHandle, tx: Sender) { +fn set_callback(node: &WakuNodeHandle, tx: Sender) { node.set_event_callback(move |event| { if let Event::WakuMessage(message) = event { - let id = message.message_id; + let hash = message.message_hash; let message = message.waku_message; let payload = message.payload.to_vec(); tx.send(Response { - id: id.to_string(), + hash: hash.to_string(), payload, }) .expect("send response to the receiver"); @@ -45,8 +47,8 @@ fn set_callback(node: &WakuNodeHandle, tx: Sender) { } async fn test_echo_messages( - node1: &WakuNodeHandle, - node2: &WakuNodeHandle, + node1: &WakuNodeHandle, + node2: &WakuNodeHandle, content: &'static str, content_topic: WakuContentTopic, ) { @@ -69,11 +71,16 @@ async fn test_echo_messages( let (tx, mut rx) = broadcast::channel(1); set_callback(node2, tx); - try_publish_relay_messages(node1, &message).expect("send relay messages"); - + let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); while let Ok(res) = rx.recv().await { - assert!(!res.id.is_empty()); - from_utf8(&res.payload).expect("should be valid message"); + if ids.take(&res.hash).is_some() { + let msg = from_utf8(&res.payload).expect("should be valid message"); + assert_eq!(content, msg); + } + + if ids.is_empty() { + break; + } } } @@ -89,8 +96,8 @@ async fn default_echo() -> Result<(), String> { ..Default::default() }))?; - node1.start()?; - node2.start()?; + let node1 = node1.start()?; + let node2 = node2.start()?; let addresses1 = node1.listen_addresses()?; node2.connect(&addresses1[0], None)?; @@ -101,7 +108,7 @@ async fn default_echo() -> Result<(), String> { node2.relay_subscribe(&topic)?; // Wait for mesh to form - sleep(Duration::from_secs(5)).await; + sleep(Duration::from_secs(3)).await; let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); @@ -136,7 +143,7 @@ fn node_restart() { for _ in 0..3 { let node = waku_new(config.clone().into()).expect("default config should be valid"); - node.start().expect("node should start with valid config"); + let node = node.start().expect("node should start with valid config"); node.stop().expect("node should stop"); } diff --git a/waku-sys/vendor b/waku-sys/vendor index 7aea145..91e3f8c 160000 --- a/waku-sys/vendor +++ b/waku-sys/vendor @@ -1 +1 @@ -Subproject commit 7aea145efe37dd3b8e5d623807016b1c82e97695 +Subproject commit 91e3f8cde61b8ef3ca4332710b9ec9d7bc4fb786