diff --git a/waku-bindings/src/events/mod.rs b/waku-bindings/src/node/events.rs similarity index 83% rename from waku-bindings/src/events/mod.rs rename to waku-bindings/src/node/events.rs index 2222b51..e66750f 100644 --- a/waku-bindings/src/events/mod.rs +++ b/waku-bindings/src/node/events.rs @@ -13,7 +13,9 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; // internal use crate::general::{WakuMessage, WakuPubSubTopic}; +use crate::utils::get_trampoline; use crate::MessageId; +use waku_sys::WakuCallBack; /// Event signal #[derive(Serialize, Deserialize)] @@ -67,31 +69,28 @@ impl WakuMessageEvent { } } - /// Wrapper callback, it transformst the `*const c_char` into a [`Signal`] /// and executes the [`CALLBACK`] funtion with it -extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c_void) { - let raw_response = unsafe { CStr::from_ptr(data) } - .to_str() - .expect("Not null ptr"); - let data: Signal = serde_json::from_str(raw_response).expect("Parsing signal to succeed"); - (CALLBACK - .deref() - .lock() - .expect("Access to the shared callback") - .as_mut())(data) +fn callback() -> WakuCallBack { + let cb = |v: &str| { + print!("{}", v); + let data: Signal = serde_json::from_str(v).expect("Parsing signal to succeed"); + }; + + let mut closure = cb; + get_trampoline(&closure) } /// Register callback to act as event handler and receive application signals, /// which are used to react to asynchronous events in Waku -pub fn waku_set_event_callback(f: F) { - // TODO: - unsafe { waku_sys::waku_set_event_callback(Some(callback)) }; +pub fn waku_set_event_callback(ctx: *mut c_void) { + // , , f: F + unsafe { waku_sys::waku_set_event_callback(ctx, callback(), std::ptr::null_mut()) }; } #[cfg(test)] mod tests { - use crate::events::waku_set_event_callback; + /*use crate::events::waku_set_event_callback; use crate::{Event, Signal}; // TODO: how to actually send a signal and check if the callback is run? @@ -110,5 +109,5 @@ mod tests { fn deserialize_event() { let e = "{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; let _: Event = serde_json::from_str(e).unwrap(); - } + }*/ } diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index d25cf4b..3f5a914 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -2,6 +2,7 @@ // std use std::ffi::CString; +use std::ptr; // crates use libc::c_void; // internal @@ -33,6 +34,12 @@ pub fn waku_new(config: Option) -> Result<*mut c_void> { out }; + // TODO: create error handler function, format of err message is + // {"message":"The actual message","eventType":"error"} + if error != "" { + return Err(error); + } + Ok(node_ptr) } @@ -68,12 +75,24 @@ pub fn waku_stop(ctx: *mut c_void) -> Result<()> { mod test { use super::waku_new; use crate::node::management::{waku_start, waku_stop}; + use crate::WakuNodeConfig; + use secp256k1::SecretKey; use serial_test::serial; + use std::str::FromStr; #[test] #[serial] fn waku_flow() { - let node = waku_new(None).unwrap(); + let node = waku_new(Some(WakuNodeConfig { + node_key: Some( + SecretKey::from_str( + "05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609", + ) + .unwrap(), + ), // TODO: consider making this optional + ..Default::default() + })) + .unwrap(); waku_start(node).unwrap(); waku_stop(node).unwrap(); } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index e4f8c51..f18c86e 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -1,6 +1,7 @@ //! Waku node implementation mod config; +mod events; mod management; mod peers; mod relay; @@ -17,6 +18,7 @@ use libc::c_void; use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic}; pub use config::WakuNodeConfig; +pub use events::{Event, Signal, WakuMessageEvent}; pub use relay::{waku_create_content_topic, waku_default_pubsub_topic}; /// Handle to the underliying waku node @@ -27,13 +29,13 @@ pub struct WakuNodeHandle { impl WakuNodeHandle { /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) - pub fn start(self) -> Result<()> { + pub fn start(&self) -> Result<()> { management::waku_start(self.ctx) } /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) - pub fn stop(self) -> Result<()> { + pub fn stop(&self) -> Result<()> { management::waku_stop(self.ctx) } @@ -67,6 +69,10 @@ impl WakuNodeHandle { pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> { relay::waku_relay_unsubscribe(self.ctx, pubsub_topic) } + + pub fn set_event_callback(&self) { + events::waku_set_event_callback(self.ctx) + } } /// Spawn a new Waku node with the given configuration (default configuration if `None` provided) diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index d4b3eb2..90ded78 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -3,13 +3,14 @@ use multiaddr::Multiaddr; use rand::thread_rng; use secp256k1::SecretKey; use serial_test::serial; +use std::str::FromStr; use std::time::{Duration, SystemTime}; use std::{collections::HashSet, str::from_utf8}; use tokio::sync::mpsc::{self, Sender}; use tokio::time; use waku_bindings::{ - waku_new, waku_set_event_callback, Encoding, Event, Key, MessageId, WakuContentTopic, - WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic, + waku_new, Encoding, Event, Key, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig, + WakuNodeHandle, WakuPubSubTopic, }; const ECHO_TIMEOUT: u64 = 10; @@ -25,9 +26,10 @@ fn try_publish_relay_messages( node: &WakuNodeHandle, msg: &WakuMessage, ) -> Result, String> { - Ok(HashSet::from( - [node.relay_publish_message(msg, None, None)?], - )) + let topic = "test".to_string(); + Ok(HashSet::from([ + node.relay_publish_message(msg, &topic, None)? + ])) } #[derive(Debug)] @@ -36,23 +38,6 @@ struct Response { payload: Vec, } -fn set_callback(tx: Sender, sk: SecretKey, ssk: Key) { - waku_set_event_callback(move |signal| { - if let Event::WakuMessage(message) = signal.event() { - let id = message.message_id(); - let message = message.waku_message(); - - let payload = message.payload().to_vec(); - - futures::executor::block_on(tx.send(Response { - id: id.to_string(), - payload, - })) - .expect("send response to the receiver"); - } - }); -} - async fn test_echo_messages( node: &WakuNodeHandle, content: &'static str, @@ -74,21 +59,22 @@ async fn test_echo_messages( false, ); - let (tx, mut rx) = mpsc::channel(1); - set_callback(tx, sk, ssk); + /* + // let (tx, mut rx) = mpsc::channel(1); + //set_callback(tx, sk, ssk); - let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages"); + let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages"); - while let Some(res) = rx.recv().await { - if ids.take(&res.id).is_some() { - let msg = from_utf8(&res.payload).expect("should be valid message"); - assert_eq!(content, msg); - } + while let Some(res) = rx.recv().await { + if ids.take(&res.id).is_some() { + let msg = from_utf8(&res.payload).expect("should be valid message"); + assert_eq!(content, msg); + } - if ids.is_empty() { - break; - } - } + if ids.is_empty() { + break; + } + }*/ } #[ignore] @@ -96,6 +82,10 @@ async fn test_echo_messages( #[serial] async fn default_echo() -> Result<(), String> { let config = WakuNodeConfig { + node_key: Some( + SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609") + .unwrap(), + ), // TODO: consider making this optional ..Default::default() }; @@ -111,7 +101,9 @@ async fn default_echo() -> Result<(), String> { let ssk = Aes256Gcm::generate_key(&mut thread_rng()); // subscribe to default channel - node.relay_subscribe(&content_filter)?; + let topic = "test".to_string(); + + node.relay_subscribe(&topic)?; let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); @@ -133,7 +125,13 @@ async fn default_echo() -> Result<(), String> { #[test] #[serial] fn node_restart() { - let config = WakuNodeConfig::default(); + let config = WakuNodeConfig { + node_key: Some( + SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609") + .unwrap(), + ), // TODO: consider making this optional + ..Default::default() + }; for _ in 0..3 { let node = waku_new(config.clone().into()).expect("default config should be valid"); diff --git a/waku-sys/build.rs b/waku-sys/build.rs index 43edfef..e3cc837 100644 --- a/waku-sys/build.rs +++ b/waku-sys/build.rs @@ -40,6 +40,7 @@ fn generate_bindgen_code(project_dir: &Path) { println!("cargo:rustc-link-lib=static=backtrace"); // TODO: Determine if pthread is automatically included + // TODO: Test in other architectures // Generate waku bindings with bindgen let bindings = bindgen::Builder::default()