From 24d7dd840c2117634640d6c80ae246bf7164bc0c Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 14 Feb 2024 16:52:21 -0400 Subject: [PATCH] test: attempt to fix event handler --- waku-bindings/src/general/mod.rs | 10 ----- waku-bindings/src/lib.rs | 6 +-- waku-bindings/src/node/config.rs | 3 +- waku-bindings/src/node/events.rs | 65 +++++++++++------------------- waku-bindings/src/node/mod.rs | 12 +++--- waku-bindings/src/node/relay.rs | 12 +++--- waku-bindings/tests/node.rs | 69 ++++++++++++++++++-------------- waku-sys/vendor | 2 +- 8 files changed, 79 insertions(+), 100 deletions(-) diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index d15bb66..33e4b6a 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -13,8 +13,6 @@ use sscanf::{scanf, RegexRepresentation}; pub type WakuMessageVersion = usize; /// Waku message id, hex encoded sha256 digest of the message pub type MessageId = String; -/// Waku pubsub topic -pub type WakuPubSubTopic = String; /// Waku response, just a `Result` with an `String` error. pub type Result = std::result::Result; @@ -29,7 +27,6 @@ pub struct WakuMessage { payload: Vec, /// The content topic to be set on the message content_topic: WakuContentTopic, - // TODO: check if missing default should be 0 /// The Waku Message version number #[serde(default)] version: WakuMessageVersion, @@ -238,13 +235,6 @@ mod base64_serde { #[cfg(test)] mod tests { use super::*; - use crate::WakuPubSubTopic; - - #[test] - fn parse_waku_topic() { - let s = "/waku/2/default-waku/proto"; - let _: WakuPubSubTopic = s.parse().unwrap(); - } #[test] fn deserialize_waku_message() { diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index cc40f39..0a2e7be 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -13,9 +13,7 @@ use rln; pub use node::{ waku_create_content_topic, waku_default_pubsub_topic, waku_new, Event, Key, Multiaddr, - PublicKey, SecretKey, Signal, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, + PublicKey, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, }; -pub use general::{ - Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic, -}; +pub use general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion}; diff --git a/waku-bindings/src/node/config.rs b/waku-bindings/src/node/config.rs index 426b2ef..8a508ca 100644 --- a/waku-bindings/src/node/config.rs +++ b/waku-bindings/src/node/config.rs @@ -2,7 +2,6 @@ // std // crates -use crate::WakuPubSubTopic; use multiaddr::Multiaddr; use secp256k1::SecretKey; use serde::{Deserialize, Serialize}; @@ -29,7 +28,7 @@ pub struct WakuNodeConfig { /// Enable relay protocol. Default `true` #[default(Some(true))] pub relay: Option, - pub relay_topics: Vec, + pub relay_topics: Vec, // /// Enable store protocol to persist message history // #[default(Some(false))] // pub store: Option, diff --git a/waku-bindings/src/node/events.rs b/waku-bindings/src/node/events.rs index 72634e4..e9fdb46 100644 --- a/waku-bindings/src/node/events.rs +++ b/waku-bindings/src/node/events.rs @@ -2,40 +2,25 @@ //! //! Asynchronous events require a callback to be registered. //! An example of an asynchronous event that might be emitted is receiving a message. -//! When an event is emitted, this callback will be triggered receiving a [`Signal`] +//! When an event is emitted, this callback will be triggered receiving an [`Event`] // std use std::ffi::c_void; // crates use serde::{Deserialize, Serialize}; // internal -use crate::general::{WakuMessage, WakuPubSubTopic}; +use crate::general::WakuMessage; use crate::utils::get_trampoline; use crate::MessageId; use waku_sys::WakuCallBack; -/// Event signal -#[derive(Serialize, Deserialize)] -pub struct Signal { - /// Type of signal being emitted. Currently, only message is available - #[serde(alias = "type")] - _type: String, - /// Format depends on the type of signal - event: Event, -} - -impl Signal { - pub fn event(&self) -> &Event { - &self.event - } -} - /// Waku event /// For now just WakuMessage is supported #[non_exhaustive] #[derive(Serialize, Deserialize)] -#[serde(untagged, rename_all = "camelCase")] +#[serde(tag = "eventType", rename_all = "camelCase")] pub enum Event { + #[serde(rename = "message")] WakuMessage(WakuMessageEvent), Unrecognized(serde_json::Value), } @@ -45,7 +30,7 @@ pub enum Event { #[serde(rename_all = "camelCase")] pub struct WakuMessageEvent { /// The pubsub topic on which the message was received - pubsub_topic: WakuPubSubTopic, + pubsub_topic: String, /// The message id message_id: MessageId, /// The message in [`WakuMessage`] format @@ -53,7 +38,7 @@ pub struct WakuMessageEvent { } impl WakuMessageEvent { - pub fn pubsub_topic(&self) -> &WakuPubSubTopic { + pub fn pubsub_topic(&self) -> &String { &self.pubsub_topic } @@ -66,43 +51,39 @@ impl WakuMessageEvent { } } -/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`] -fn callback(mut f: F) -> WakuCallBack { - let cb = |v: &str| { - let data: Signal = serde_json::from_str(v).expect("Parsing signal to succeed"); +/// Wrapper callback, it transformst the `*const c_char` into an [`Event`] +fn callback(mut f: F) -> WakuCallBack { + let cb = move |v: &str| { + let data: Event = serde_json::from_str(v).expect("Parsing event to succeed"); + println!("EXEC CALLBACK"); f(data); + println!("SUCCESS!"); }; get_trampoline(&cb) } -/// Register callback to act as event handler and receive application signals, +/// Register callback to act as event handler and receive application events, /// which are used to react to asynchronous events in Waku -pub fn waku_set_event_callback(ctx: *mut c_void, f: F) { - // , , f: F +pub fn waku_set_event_callback(ctx: *mut c_void, f: F) { unsafe { waku_sys::waku_set_event_callback(ctx, callback(f), std::ptr::null_mut()) }; } #[cfg(test)] mod tests { - /*use crate::events::waku_set_event_callback; - use crate::{Event, Signal}; + use crate::node::events::callback; + use crate::Event; - // TODO: how to actually send a signal and check if the callback is run? + // TODO: how to actually send an event and check if the callback is run? #[test] - fn set_event_callback() { - waku_set_event_callback(|_signal| {}); + fn set_callback() { + callback(|_event| {}); } #[test] - fn deserialize_signal() { - let s = "{\"type\":\"message\",\"event\":{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}}"; - let _: Signal = serde_json::from_str(s).unwrap(); + fn deserialize_message_event() { + let s = "{\"eventType\":\"message\",\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; + let evt: Event = serde_json::from_str(s).unwrap(); + assert!(matches!(evt, Event::WakuMessage(_))); } - - #[test] - fn deserialize_event() { - let e = "{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; - let _: Event = serde_json::from_str(e).unwrap(); - }*/ } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 9ed60f2..cd27dd7 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -15,10 +15,10 @@ use std::time::Duration; use libc::c_void; // internal -use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic}; +use crate::general::{MessageId, Result, WakuMessage}; pub use config::WakuNodeConfig; -pub use events::{Event, Signal, WakuMessageEvent}; +pub use events::{Event, WakuMessageEvent}; pub use relay::{waku_create_content_topic, waku_default_pubsub_topic}; /// Handle to the underliying waku node @@ -54,23 +54,23 @@ impl WakuNodeHandle { pub fn relay_publish_message( &self, message: &WakuMessage, - pubsub_topic: &WakuPubSubTopic, + pubsub_topic: &String, timeout: Option, ) -> Result { relay::waku_relay_publish_message(self.ctx, message, pubsub_topic, timeout) } /// Subscribe to WakuRelay to receive messages matching a content filter. - pub fn relay_subscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> { + pub fn relay_subscribe(&self, pubsub_topic: &String) -> Result<()> { relay::waku_relay_subscribe(self.ctx, pubsub_topic) } /// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic - pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> { + pub fn relay_unsubscribe(&self, pubsub_topic: &String) -> Result<()> { relay::waku_relay_unsubscribe(self.ctx, pubsub_topic) } - pub fn set_event_callback(&self, f: F) { + pub fn set_event_callback(&self, f: F) { events::waku_set_event_callback(self.ctx, f) } } diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index 427949a..9339e50 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -6,7 +6,7 @@ use std::time::Duration; // crates use libc::*; // internal -use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic}; +use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage}; use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response}; /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) @@ -57,7 +57,7 @@ pub fn waku_create_content_topic( /// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/) #[allow(clippy::not_unsafe_ptr_arg_deref)] -pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic { +pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> String { let mut result: String = Default::default(); let result_cb = |v: &str| result = v.to_string(); let code = unsafe { @@ -74,7 +74,7 @@ pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic { pub fn waku_relay_publish_message( ctx: *mut c_void, message: &WakuMessage, - pubsub_topic: &WakuPubSubTopic, + pubsub_topic: &String, timeout: Option, ) -> Result { let pubsub_topic = pubsub_topic.to_string(); @@ -96,8 +96,8 @@ pub fn waku_relay_publish_message( let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_publish( ctx, - message_ptr, pubsub_topic_ptr, + message_ptr, timeout .map(|duration| { duration @@ -119,7 +119,7 @@ pub fn waku_relay_publish_message( handle_response(code, &result) } -pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> { +pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &String) -> Result<()> { let pubsub_topic = pubsub_topic.to_string(); let pubsub_topic_ptr = CString::new(pubsub_topic) .expect("CString should build properly from pubsub topic") @@ -145,7 +145,7 @@ pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> handle_no_response(code, &error) } -pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> { +pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &String) -> Result<()> { let pubsub_topic = pubsub_topic.to_string(); let pubsub_topic_ptr = CString::new(pubsub_topic) .expect("CString should build properly from pubsub topic") diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 90ded78..52603ba 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -1,6 +1,4 @@ -use aes_gcm::{Aes256Gcm, KeyInit}; use multiaddr::Multiaddr; -use rand::thread_rng; use secp256k1::SecretKey; use serial_test::serial; use std::str::FromStr; @@ -9,24 +7,23 @@ use std::{collections::HashSet, str::from_utf8}; use tokio::sync::mpsc::{self, Sender}; use tokio::time; use waku_bindings::{ - waku_new, Encoding, Event, Key, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig, - WakuNodeHandle, WakuPubSubTopic, + waku_new, Encoding, Event, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig, + WakuNodeHandle, }; - const ECHO_TIMEOUT: u64 = 10; const ECHO_MESSAGE: &str = "Hi from 🦀!"; -const NODES: &[&str] = &[ - "/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm", - "/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ", - "/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS" -]; +const NODES: &[&str] = + &["/dns4/node-01.do-ams3.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D"]; fn try_publish_relay_messages( node: &WakuNodeHandle, msg: &WakuMessage, ) -> Result, String> { let topic = "test".to_string(); + node.relay_publish_message(msg, &topic, None)?; + node.relay_publish_message(msg, &topic, None)?; + Ok(HashSet::from([ node.relay_publish_message(msg, &topic, None)? ])) @@ -38,12 +35,30 @@ struct Response { payload: Vec, } +fn set_callback(node: &WakuNodeHandle, tx: Sender) { + node.set_event_callback(move |event| { + if let Event::WakuMessage(message) = event { + let id = message.message_id(); + let message = message.waku_message(); + let payload = message.payload().to_vec(); + + println!("==============="); + println!("ID: {}", id); + println!("Sending to channel...."); + futures::executor::block_on(tx.send(Response { + id: id.to_string(), + payload, + })) + .expect("send response to the receiver"); + println!("Sent!"); + } + }); +} + async fn test_echo_messages( node: &WakuNodeHandle, content: &'static str, content_topic: WakuContentTopic, - sk: SecretKey, - ssk: Key, ) { let message = WakuMessage::new( content, @@ -59,25 +74,23 @@ async fn test_echo_messages( false, ); - /* - // let (tx, mut rx) = mpsc::channel(1); - //set_callback(tx, sk, ssk); + let (tx, mut rx) = mpsc::channel(1); + set_callback(node, tx); - let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages"); + let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages"); - while let Some(res) = rx.recv().await { - if ids.take(&res.id).is_some() { - let msg = from_utf8(&res.payload).expect("should be valid message"); - assert_eq!(content, msg); - } + while let Some(res) = rx.recv().await { + if ids.take(&res.id).is_some() { + let msg = from_utf8(&res.payload).expect("should be valid message"); + assert_eq!(content, msg); + } - if ids.is_empty() { - break; - } - }*/ + if ids.is_empty() { + break; + } + } } -#[ignore] #[tokio::test] #[serial] async fn default_echo() -> Result<(), String> { @@ -97,8 +110,6 @@ async fn default_echo() -> Result<(), String> { let address: Multiaddr = node_address.parse().unwrap(); node.connect(&address, None)?; } - let sk = SecretKey::new(&mut thread_rng()); - let ssk = Aes256Gcm::generate_key(&mut thread_rng()); // subscribe to default channel let topic = "test".to_string(); @@ -113,7 +124,7 @@ async fn default_echo() -> Result<(), String> { // Send and receive messages. Waits until all messages received. let got_all = tokio::select! { _ = sleep => false, - _ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true, + _ = test_echo_messages(&node, ECHO_MESSAGE, content_topic) => true, }; assert!(got_all); diff --git a/waku-sys/vendor b/waku-sys/vendor index b64b017..b6be1ac 160000 --- a/waku-sys/vendor +++ b/waku-sys/vendor @@ -1 +1 @@ -Subproject commit b64b017060337dfc427c74c4688e55ca8165caef +Subproject commit b6be1aca8102b614d37bedf2fca8ec59b8b30257