diff --git a/waku-bindings/src/node/events.rs b/waku-bindings/src/node/events.rs index aff8624..8f66c53 100644 --- a/waku-bindings/src/node/events.rs +++ b/waku-bindings/src/node/events.rs @@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize}; // internal use crate::general::WakuMessage; use crate::node::context::WakuNodeContext; -use crate::utils::get_trampoline; +use crate::utils::{get_trampoline, LibwakuResponse}; use crate::MessageId; /// Waku event @@ -40,9 +40,15 @@ pub struct WakuMessageEvent { /// Register callback to act as event handler and receive application events, /// which are used to react to asynchronous events in Waku pub fn waku_set_event_callback(ctx: &WakuNodeContext, mut f: F) { - let cb = |v: &str| { - let data: Event = serde_json::from_str(v).expect("Parsing event to succeed"); - f(data); + let cb = |response: LibwakuResponse| { + match response { + LibwakuResponse::Success(v) => { + let data: Event = + serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + f(data); + } + _ => {} // Do nothing + }; }; unsafe { diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index 9bf779d..7c66986 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -9,6 +9,7 @@ use multiaddr::Multiaddr; use super::config::WakuNodeConfig; use crate::general::Result; use crate::node::context::WakuNodeContext; +use crate::utils::LibwakuResponse; use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response}; /// Instantiates a Waku node @@ -23,10 +24,10 @@ pub fn waku_new(config: Option) -> Result { .expect("CString should build properly from the config") .into_raw(); - let mut error: String = Default::default(); - let error_cb = |v: &str| error = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let obj_ptr = unsafe { - let mut closure = error_cb; + let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void); @@ -35,67 +36,67 @@ pub fn waku_new(config: Option) -> Result { out }; - if !error.is_empty() { - Err(error) - } else { - Ok(WakuNodeContext { obj_ptr }) + match result { + LibwakuResponse::MissingCallback => panic!("callback is required"), + LibwakuResponse::Failure(v) => Err(v), + _ => Ok(WakuNodeContext { obj_ptr }), } } /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) pub fn waku_start(ctx: &WakuNodeContext) -> Result<()> { - let mut error: String = Default::default(); - let error_cb = |v: &str| error = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { - let mut closure = error_cb; + let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_start(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void) }; - handle_no_response(code, &error) + handle_no_response(code, result) } /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) pub fn waku_stop(ctx: &WakuNodeContext) -> Result<()> { - let mut error: String = Default::default(); - let error_cb = |v: &str| error = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { - let mut closure = error_cb; + let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_stop(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void) }; - handle_no_response(code, &error) + handle_no_response(code, result) } /// nwaku version #[allow(clippy::not_unsafe_ptr_arg_deref)] pub fn waku_version(ctx: &WakuNodeContext) -> Result { - let mut result: String = Default::default(); - let result_cb = |v: &str| result = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_version(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void) }; - handle_response(code, &result) + handle_response(code, result) } /// Get the multiaddresses the Waku node is listening to /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result> { - let mut result: String = Default::default(); - let result_cb = |v: &str| result = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); waku_sys::waku_listen_addresses(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void) }; - handle_json_response(code, &result) + handle_json_response(code, result) } #[cfg(test)] diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 20102e1..37269bd 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -13,7 +13,7 @@ pub use multiaddr::Multiaddr; pub use secp256k1::{PublicKey, SecretKey}; use std::time::Duration; // internal -use crate::general::{MessageId, Result, WakuMessage}; +use crate::general::{Result, WakuMessage}; use context::WakuNodeContext; pub use config::WakuNodeConfig; @@ -66,7 +66,7 @@ impl WakuNodeHandle { message: &WakuMessage, pubsub_topic: &String, timeout: Option, - ) -> Result { + ) -> Result<()> { relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout) } diff --git a/waku-bindings/src/node/peers.rs b/waku-bindings/src/node/peers.rs index b2d37d0..2fa1087 100644 --- a/waku-bindings/src/node/peers.rs +++ b/waku-bindings/src/node/peers.rs @@ -9,6 +9,7 @@ use multiaddr::Multiaddr; // internal use crate::general::Result; use crate::node::context::WakuNodeContext; +use crate::utils::LibwakuResponse; use crate::utils::{get_trampoline, handle_no_response}; /// Dial peer using a multiaddress @@ -25,10 +26,10 @@ pub fn waku_connect( .expect("CString should build properly from multiaddress") .into_raw(); - let mut error: String = Default::default(); - let error_cb = |v: &str| error = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { - let mut closure = error_cb; + let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_connect( ctx.obj_ptr, @@ -45,5 +46,5 @@ pub fn waku_connect( out }; - handle_no_response(code, &error) + handle_no_response(code, result) } diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index 2ab8a51..629ef85 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -6,9 +6,9 @@ use std::time::Duration; // crates use libc::*; // internal -use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage}; +use crate::general::{Encoding, Result, WakuContentTopic, WakuMessage}; use crate::node::context::WakuNodeContext; -use crate::utils::{get_trampoline, handle_no_response, handle_response}; +use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse}; /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) @@ -30,8 +30,8 @@ pub fn waku_create_content_topic( .expect("Encoding should always transform to CString") .into_raw(); - let mut result: String = Default::default(); - let result_cb = |v: &str| result = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -52,8 +52,7 @@ pub fn waku_create_content_topic( out }; - handle_response::(code, &result) - .expect("&str from result should always be extracted") + handle_response(code, result).expect("&str from result should always be extracted") } /// Publish a message using Waku Relay @@ -63,7 +62,7 @@ pub fn waku_relay_publish_message( message: &WakuMessage, pubsub_topic: &String, timeout: Option, -) -> Result { +) -> Result<()> { let pubsub_topic = pubsub_topic.to_string(); let message_ptr = CString::new( @@ -76,8 +75,8 @@ pub fn waku_relay_publish_message( .expect("CString should build properly from pubsub topic") .into_raw(); - let mut result: String = Default::default(); - let result_cb = |v: &str| result = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); @@ -103,7 +102,7 @@ pub fn waku_relay_publish_message( out }; - handle_response(code, &result) + handle_no_response(code, result) } pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> { @@ -112,10 +111,10 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Res .expect("CString should build properly from pubsub topic") .into_raw(); - let mut error: String = Default::default(); - let error_cb = |v: &str| error = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { - let mut closure = error_cb; + let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( ctx.obj_ptr, @@ -129,7 +128,7 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Res out }; - handle_no_response(code, &error) + handle_no_response(code, result) } pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> { @@ -138,10 +137,10 @@ pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> R .expect("CString should build properly from pubsub topic") .into_raw(); - let mut error: String = Default::default(); - let error_cb = |v: &str| error = v.to_string(); + let mut result: LibwakuResponse = Default::default(); + let result_cb = |r: LibwakuResponse| result = r; let code = unsafe { - let mut closure = error_cb; + let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( ctx.obj_ptr, @@ -155,5 +154,5 @@ pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> R out }; - handle_no_response(code, &error) + handle_no_response(code, result) } diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index bc2df6a..ac5250a 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -1,22 +1,46 @@ use crate::general::Result; use core::str::FromStr; use serde::de::DeserializeOwned; +use std::convert::TryFrom; use std::{slice, str}; use waku_sys::WakuCallBack; use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK}; -pub fn decode(input: &str) -> Result { - serde_json::from_str(input) +#[derive(Debug, Default, PartialEq)] +pub enum LibwakuResponse { + Success(Option), + Failure(String), + MissingCallback, + #[default] + Undefined, +} + +impl TryFrom<(u32, &str)> for LibwakuResponse { + type Error = String; + + fn try_from((ret_code, response): (u32, &str)) -> std::result::Result { + let opt_value = Some(response.to_string()).filter(|s| !s.is_empty()); + match ret_code { + RET_OK => Ok(LibwakuResponse::Success(opt_value)), + RET_ERR => Ok(LibwakuResponse::Failure(format!("waku error: {}", response))), + RET_MISSING_CALLBACK => Ok(LibwakuResponse::MissingCallback), + _ => Err(format!("undefined return code {}", ret_code)), + } + } +} + +pub fn decode(input: String) -> Result { + serde_json::from_str(input.as_str()) .map_err(|err| format!("could not deserialize waku response: {}", err)) } unsafe extern "C" fn trampoline( - _ret_code: ::std::os::raw::c_int, + ret_code: ::std::os::raw::c_int, data: *const ::std::os::raw::c_char, data_len: usize, user_data: *mut ::std::os::raw::c_void, ) where - F: FnMut(&str), + F: FnMut(LibwakuResponse), { let user_data = &mut *(user_data as *mut F); @@ -27,41 +51,59 @@ unsafe extern "C" fn trampoline( .expect("could not retrieve response") }; - user_data(response); + let result = LibwakuResponse::try_from((ret_code as u32, response)) + .expect("invalid response obtained from libwaku"); + + user_data(result); } pub fn get_trampoline(_closure: &F) -> WakuCallBack where - F: FnMut(&str), + F: FnMut(LibwakuResponse), { Some(trampoline::) } -pub fn handle_no_response(code: i32, error: &str) -> Result<()> { - match code as u32 { - RET_OK => Ok(()), - RET_ERR => Err(format!("waku error: {}", error)), - RET_MISSING_CALLBACK => Err("missing callback".to_string()), - _ => Err(format!("undefined return code {}", code)), +pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> { + if result == LibwakuResponse::Undefined && code as u32 == RET_OK { + // Some functions will only execute the callback on error + return Ok(()); + } + + match result { + LibwakuResponse::Success(_) => Ok(()), + LibwakuResponse::Failure(v) => Err(v), + LibwakuResponse::MissingCallback => panic!("callback is required"), + LibwakuResponse::Undefined => panic!( + "undefined ffi state: code({}) was returned but callback was not executed", + code + ), } } -pub fn handle_json_response(code: i32, result: &str) -> Result { - match code as u32 { - RET_OK => decode(result), - RET_ERR => Err(format!("waku error: {}", result)), - RET_MISSING_CALLBACK => Err("missing callback".to_string()), - _ => Err(format!("undefined return code {}", code)), +pub fn handle_json_response(code: i32, result: LibwakuResponse) -> Result { + match result { + LibwakuResponse::Success(v) => decode(v.unwrap_or_default()), + LibwakuResponse::Failure(v) => Err(v), + LibwakuResponse::MissingCallback => panic!("callback is required"), + LibwakuResponse::Undefined => panic!( + "undefined ffi state: code({}) was returned but callback was not executed", + code + ), } } -pub fn handle_response(code: i32, result: &str) -> Result { - match code as u32 { - RET_OK => result +pub fn handle_response(code: i32, result: LibwakuResponse) -> Result { + match result { + LibwakuResponse::Success(v) => v + .unwrap_or_default() .parse() - .map_err(|_| format!("could not parse value: {}", result)), - RET_ERR => Err(format!("waku error: {}", result)), - RET_MISSING_CALLBACK => Err("missing callback".to_string()), - _ => Err(format!("undefined return code {}", code)), + .map_err(|_| format!("could not parse value")), + LibwakuResponse::Failure(v) => Err(v), + LibwakuResponse::MissingCallback => panic!("callback is required"), + LibwakuResponse::Undefined => panic!( + "undefined ffi state: code({}) was returned but callback was not executed", + code + ), } } diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 5b91bbe..5769063 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -2,7 +2,7 @@ use secp256k1::SecretKey; use serial_test::serial; use std::str::FromStr; use std::time::{Duration, SystemTime}; -use std::{collections::HashSet, str::from_utf8}; +use std::{str::from_utf8}; use tokio::sync::broadcast::{self, Sender}; use tokio::time; use tokio::time::sleep; @@ -17,11 +17,9 @@ const TEST_PUBSUBTOPIC: &str = "test"; fn try_publish_relay_messages( node: &WakuNodeHandle, msg: &WakuMessage, -) -> Result, String> { +) -> Result<(), String> { let topic = TEST_PUBSUBTOPIC.to_string(); - Ok(HashSet::from([ - node.relay_publish_message(msg, &topic, None)? - ])) + Ok(node.relay_publish_message(msg, &topic, None)?) } #[derive(Debug, Clone)] @@ -71,17 +69,11 @@ async fn test_echo_messages( let (tx, mut rx) = broadcast::channel(1); set_callback(node2, tx); - let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); + try_publish_relay_messages(node1, &message).expect("send relay messages"); while let Ok(res) = rx.recv().await { - if ids.take(&res.id).is_some() { - let msg = from_utf8(&res.payload).expect("should be valid message"); - assert_eq!(content, msg); - } - - if ids.is_empty() { - break; - } + assert!(!res.id.is_empty()); + from_utf8(&res.payload).expect("should be valid message"); } }