diff --git a/waku-bindings/src/general/contenttopic.rs b/waku-bindings/src/general/contenttopic.rs index ad987be..f8d8d9f 100644 --- a/waku-bindings/src/general/contenttopic.rs +++ b/waku-bindings/src/general/contenttopic.rs @@ -1,6 +1,6 @@ // std +use crate::general::waku_decode::WakuDecode; use crate::general::Result; -use crate::utils::WakuDecode; use std::borrow::Cow; use std::fmt::{Display, Formatter}; use std::str::FromStr; diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/general/libwaku_response.rs similarity index 62% rename from waku-bindings/src/utils.rs rename to waku-bindings/src/general/libwaku_response.rs index 3a6e606..d73953e 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/general/libwaku_response.rs @@ -1,7 +1,7 @@ +use crate::general::waku_decode::WakuDecode; use crate::general::Result; use std::convert::TryFrom; -use std::{slice, str}; -use waku_sys::WakuCallBack; +use std::str; use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK}; #[derive(Debug, Clone, Default, PartialEq)] @@ -30,51 +30,8 @@ impl TryFrom<(u32, &str)> for LibwakuResponse { } } -// Define the WakuDecode trait -pub trait WakuDecode: Sized { - fn decode(input: &str) -> Result; -} - -impl WakuDecode for String { - fn decode(input: &str) -> Result { - Ok(input.to_string()) - } -} - -pub fn decode(input: String) -> Result { - T::decode(input.as_str()) -} - -unsafe extern "C" fn trampoline( - ret_code: ::std::os::raw::c_int, - data: *const ::std::os::raw::c_char, - data_len: usize, - user_data: *mut ::std::os::raw::c_void, -) where - F: FnMut(LibwakuResponse), -{ - let closure = &mut *(user_data as *mut F); - - let response = if data.is_null() { - "" - } else { - str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len)) - .expect("could not retrieve response") - }; - - let result = LibwakuResponse::try_from((ret_code as u32, response)) - .expect("invalid response obtained from libwaku"); - - closure(result); -} - -pub fn get_trampoline(_closure: &F) -> WakuCallBack -where - F: FnMut(LibwakuResponse), -{ - Some(trampoline::) -} - +/// Used in cases where the FFI call doesn't return additional infomation in the +/// callback. Instead, it returns RET_OK, RET_ERR, etc. pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> { if result == LibwakuResponse::Undefined && code as u32 == RET_OK { // Some functions will only execute the callback on error @@ -92,9 +49,11 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> { } } +/// Used in cases where the FFI function returns a code (RET_OK, RET_ERR, etc) plus additional +/// information, i.e. LibwakuResponse pub fn handle_response(code: i32, result: LibwakuResponse) -> Result { match result { - LibwakuResponse::Success(v) => decode(v.unwrap_or_default()), + LibwakuResponse::Success(v) => WakuDecode::decode(&v.unwrap_or_default()), LibwakuResponse::Failure(v) => Err(v), LibwakuResponse::MissingCallback => panic!("callback is required"), LibwakuResponse::Undefined => panic!( diff --git a/waku-bindings/src/general/messagehash.rs b/waku-bindings/src/general/messagehash.rs index 0f07a22..a3dce99 100644 --- a/waku-bindings/src/general/messagehash.rs +++ b/waku-bindings/src/general/messagehash.rs @@ -1,4 +1,4 @@ -use crate::utils::WakuDecode; +use crate::general::waku_decode::WakuDecode; use hex::FromHex; use serde::{Deserialize, Deserializer, Serialize}; use std::convert::TryInto; diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index c63db6b..011178c 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -1,8 +1,10 @@ //! Waku [general](https://rfc.vac.dev/spec/36/#general) types pub mod contenttopic; +pub mod libwaku_response; pub mod messagehash; pub mod pubsubtopic; +pub mod waku_decode; // crates use contenttopic::WakuContentTopic; diff --git a/waku-bindings/src/general/waku_decode.rs b/waku-bindings/src/general/waku_decode.rs new file mode 100644 index 0000000..892e37f --- /dev/null +++ b/waku-bindings/src/general/waku_decode.rs @@ -0,0 +1,26 @@ +use crate::general::Result; +use multiaddr::Multiaddr; +// Define the WakuDecode trait +pub trait WakuDecode: Sized { + fn decode(input: &str) -> Result; +} + +impl WakuDecode for String { + fn decode(input: &str) -> Result { + Ok(input.to_string()) + } +} + +pub fn decode(input: String) -> Result { + T::decode(input.as_str()) +} + +impl WakuDecode for Vec { + fn decode(input: &str) -> Result { + input + .split(',') + .map(|s| s.trim().parse::().map_err(|err| err.to_string())) + .collect::>>() // Collect results into a Vec + .map_err(|err| format!("could not parse Multiaddr: {}", err)) + } +} diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index 03d2e45..ca01ae9 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -2,11 +2,11 @@ //! //! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/) pub mod general; +mod macros; pub mod node; -pub mod utils; // Re-export the LibwakuResponse type to make it accessible outside this module -pub use utils::LibwakuResponse; +pub use general::libwaku_response::LibwakuResponse; // Required so functions inside libwaku can call RLN functions even if we // use it within the bindings functions diff --git a/waku-bindings/src/macros.rs b/waku-bindings/src/macros.rs new file mode 100644 index 0000000..b1440d6 --- /dev/null +++ b/waku-bindings/src/macros.rs @@ -0,0 +1,73 @@ +use crate::general::libwaku_response::LibwakuResponse; + +use std::{slice, str}; +use waku_sys::WakuCallBack; + +unsafe extern "C" fn trampoline( + ret_code: ::std::os::raw::c_int, + data: *const ::std::os::raw::c_char, + data_len: usize, + user_data: *mut ::std::os::raw::c_void, +) where + F: FnMut(LibwakuResponse), +{ + let closure = &mut *(user_data as *mut F); + + let response = if data.is_null() { + "" + } else { + str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len)) + .expect("could not retrieve response") + }; + + let result = LibwakuResponse::try_from((ret_code as u32, response)) + .expect("invalid response obtained from libwaku"); + + closure(result); +} + +pub fn get_trampoline(_closure: &F) -> WakuCallBack +where + F: FnMut(LibwakuResponse), +{ + Some(trampoline::) +} + +#[macro_export] +macro_rules! handle_ffi_call { + // Case: With or without additional arguments + ($waku_fn:expr, $resp_hndlr:expr, $ctx:expr $(, $($arg:expr),*)?) => {{ + use crate::macros::get_trampoline; + use std::sync::Arc; + use tokio::sync::Notify; + use libc::*; + + let mut result = LibwakuResponse::default(); + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + + // Callback to update the result and notify the waiter + let result_cb = |r: LibwakuResponse| { + result = r; + notify_clone.notify_one(); + }; + + // Create trampoline and invoke the `waku_sys` function + let code = unsafe { + let mut closure = result_cb; + let cb = get_trampoline(&closure); + $waku_fn( + $ctx, // Pass the context + $($($arg),*,)? // Expand the variadic arguments if provided + cb, // Pass the callback trampoline + &mut closure as *mut _ as *mut c_void + ) + }; + + // Wait for the callback to notify us + notify.notified().await; + + // Handle the response + $resp_hndlr(code, result) + }}; +} diff --git a/waku-bindings/src/node/context.rs b/waku-bindings/src/node/context.rs index 5305a4a..87afae5 100644 --- a/waku-bindings/src/node/context.rs +++ b/waku-bindings/src/node/context.rs @@ -2,7 +2,8 @@ use std::ffi::c_void; use std::ptr::null_mut; use std::sync::{Arc, Mutex}; -use crate::utils::{get_trampoline, LibwakuResponse}; +use crate::general::libwaku_response::LibwakuResponse; +use crate::macros::get_trampoline; type LibwakuResponseClosure = dyn FnMut(LibwakuResponse) + Send + Sync; diff --git a/waku-bindings/src/node/filter.rs b/waku-bindings/src/node/filter.rs index d9f45ab..0440bba 100644 --- a/waku-bindings/src/node/filter.rs +++ b/waku-bindings/src/node/filter.rs @@ -8,10 +8,11 @@ use std::sync::Arc; use tokio::sync::Notify; // internal use crate::general::contenttopic::WakuContentTopic; +use crate::general::libwaku_response::{handle_no_response, LibwakuResponse}; use crate::general::pubsubtopic::PubsubTopic; use crate::general::Result; +use crate::macros::get_trampoline; use crate::node::context::WakuNodeContext; -use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse}; pub async fn waku_filter_subscribe( ctx: &WakuNodeContext, diff --git a/waku-bindings/src/node/lightpush.rs b/waku-bindings/src/node/lightpush.rs index 0161a96..060a152 100644 --- a/waku-bindings/src/node/lightpush.rs +++ b/waku-bindings/src/node/lightpush.rs @@ -3,13 +3,11 @@ // std use std::ffi::CString; // crates -use libc::*; -use std::sync::Arc; -use tokio::sync::Notify; // internal +use crate::general::libwaku_response::{handle_response, LibwakuResponse}; use crate::general::{messagehash::MessageHash, Result, WakuMessage}; +use crate::handle_ffi_call; use crate::node::context::WakuNodeContext; -use crate::utils::{get_trampoline, handle_response, LibwakuResponse}; use crate::general::pubsubtopic::PubsubTopic; @@ -23,32 +21,15 @@ pub async fn waku_lightpush_publish_message( .expect("WakuMessages should always be able to success serializing"), ) .expect("CString should build properly from the serialized waku message"); - let message_ptr = message.as_ptr(); let pubsub_topic = CString::new(String::from(pubsub_topic)) .expect("CString should build properly from pubsub topic"); - let pubsub_topic_ptr = pubsub_topic.as_ptr(); - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_lightpush_publish( - ctx.get_ptr(), - pubsub_topic_ptr, - message_ptr, - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_response(code, result) + handle_ffi_call!( + waku_sys::waku_lightpush_publish, + handle_response, + ctx.get_ptr(), + pubsub_topic.as_ptr(), + message.as_ptr() + ) } diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index 107473f..cad28f4 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -9,11 +9,11 @@ use std::sync::Arc; use tokio::sync::Notify; // internal use super::config::WakuNodeConfig; +use crate::general::libwaku_response::{handle_no_response, handle_response, LibwakuResponse}; use crate::general::Result; +use crate::handle_ffi_call; +use crate::macros::get_trampoline; use crate::node::context::WakuNodeContext; -use crate::utils::LibwakuResponse; -use crate::utils::WakuDecode; -use crate::utils::{get_trampoline, handle_no_response, handle_response}; /// Instantiates a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) @@ -49,113 +49,34 @@ pub async fn waku_new(config: Option) -> Result } pub async fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> { - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_destroy(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) - }; - notify.notified().await; // Wait until a result is received - - handle_no_response(code, result) + handle_ffi_call!(waku_sys::waku_destroy, handle_no_response, ctx.get_ptr()) } /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) pub async fn waku_start(ctx: &WakuNodeContext) -> Result<()> { - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_start(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!(waku_sys::waku_start, handle_no_response, ctx.get_ptr()) } /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) pub async fn waku_stop(ctx: &WakuNodeContext) -> Result<()> { - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_stop(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!(waku_sys::waku_stop, handle_no_response, ctx.get_ptr()) } /// nwaku version -#[allow(clippy::not_unsafe_ptr_arg_deref)] pub async fn waku_version(ctx: &WakuNodeContext) -> Result { - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_version(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) - }; - - notify.notified().await; // Wait until a result is received - handle_response(code, result) -} - -// Implement WakuDecode for Vec -impl WakuDecode for Vec { - fn decode(input: &str) -> Result { - input - .split(',') - .map(|s| s.trim().parse::().map_err(|err| err.to_string())) - .collect::>>() // Collect results into a Vec - .map_err(|err| format!("could not parse Multiaddr: {}", err)) - } + handle_ffi_call!(waku_sys::waku_version, handle_response, ctx.get_ptr()) } /// Get the multiaddresses the Waku node is listening to /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) pub async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result> { - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_listen_addresses(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) - }; - - notify.notified().await; // Wait until a result is received - handle_response(code, result) + handle_ffi_call!( + waku_sys::waku_listen_addresses, + handle_response, + ctx.get_ptr() + ) } #[cfg(test)] diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 00c858b..5ab9238 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -20,9 +20,9 @@ use std::time::Duration; use store::StoreWakuMessageResponse; // internal use crate::general::contenttopic::{Encoding, WakuContentTopic}; +use crate::general::libwaku_response::LibwakuResponse; pub use crate::general::pubsubtopic::PubsubTopic; use crate::general::{messagehash::MessageHash, Result, WakuMessage}; -use crate::utils::LibwakuResponse; use crate::node::context::WakuNodeContext; pub use config::RLNConfig; diff --git a/waku-bindings/src/node/peers.rs b/waku-bindings/src/node/peers.rs index 450a443..7b7d875 100644 --- a/waku-bindings/src/node/peers.rs +++ b/waku-bindings/src/node/peers.rs @@ -4,15 +4,12 @@ use std::ffi::CString; use std::time::Duration; // crates -use libc::*; use multiaddr::Multiaddr; -use std::sync::Arc; -use tokio::sync::Notify; // internal +use crate::general::libwaku_response::{handle_no_response, LibwakuResponse}; use crate::general::Result; +use crate::handle_ffi_call; use crate::node::context::WakuNodeContext; -use crate::utils::LibwakuResponse; -use crate::utils::{get_trampoline, handle_no_response}; /// Dial peer using a multiaddress /// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`] @@ -27,29 +24,13 @@ pub async fn waku_connect( let address = CString::new(address.to_string()).expect("CString should build properly from multiaddress"); - let address_ptr = address.as_ptr(); - - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_connect( - ctx.get_ptr(), - address_ptr, - timeout - .map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX)) - .unwrap_or(0), - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!( + waku_sys::waku_connect, + handle_no_response, + ctx.get_ptr(), + address.as_ptr(), + timeout + .map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX)) + .unwrap_or(0) + ) } diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index 738b3ab..c271357 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -2,17 +2,14 @@ // std use std::ffi::CString; -use std::sync::Arc; use std::time::Duration; -use tokio::sync::Notify; -// crates -use libc::*; // internal use crate::general::contenttopic::{Encoding, WakuContentTopic}; +use crate::general::libwaku_response::{handle_no_response, handle_response, LibwakuResponse}; use crate::general::pubsubtopic::PubsubTopic; use crate::general::{messagehash::MessageHash, Result, WakuMessage}; +use crate::handle_ffi_call; use crate::node::context::WakuNodeContext; -use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse}; /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) @@ -23,42 +20,23 @@ pub async fn waku_create_content_topic( application_version: u32, content_topic_name: &str, encoding: Encoding, -) -> WakuContentTopic { +) -> Result { let application_name = CString::new(application_name) .expect("Application name should always transform to CString"); - let application_name_ptr = application_name.as_ptr(); - let content_topic_name = CString::new(content_topic_name).expect("Content topic should always transform to CString"); - let content_topic_name_ptr = content_topic_name.as_ptr(); - let encoding = CString::new(encoding.to_string()).expect("Encoding should always transform to CString"); - let encoding_ptr = encoding.as_ptr(); - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_content_topic( - ctx.get_ptr(), - application_name_ptr, - application_version, - content_topic_name_ptr, - encoding_ptr, - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_response(code, result).expect("&str from result should always be extracted") + handle_ffi_call!( + waku_sys::waku_content_topic, + handle_response, + ctx.get_ptr(), + application_name.as_ptr(), + application_version, + content_topic_name.as_ptr(), + encoding.as_ptr() + ) } /// Publish a message using Waku Relay @@ -78,62 +56,33 @@ pub async fn waku_relay_publish_message( let pubsub_topic = CString::new(String::from(pubsub_topic)) .expect("CString should build properly from pubsub topic"); - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_relay_publish( - ctx.get_ptr(), - pubsub_topic.as_ptr(), - message.as_ptr(), - timeout - .map(|duration| { - duration - .as_millis() - .try_into() - .expect("Duration as milliseconds should fit in a u32") - }) - .unwrap_or(0), - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_response(code, result) + handle_ffi_call!( + waku_sys::waku_relay_publish, + handle_response, + ctx.get_ptr(), + pubsub_topic.as_ptr(), + message.as_ptr(), + timeout + .map(|duration| { + duration + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a u32") + }) + .unwrap_or(0) + ) } pub async fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> { let pubsub_topic = CString::new(String::from(pubsub_topic)) .expect("CString should build properly from pubsub topic"); - let pubsub_topic_ptr = pubsub_topic.as_ptr(); - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_relay_subscribe( - ctx.get_ptr(), - pubsub_topic_ptr, - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!( + waku_sys::waku_relay_subscribe, + handle_no_response, + ctx.get_ptr(), + pubsub_topic.as_ptr() + ) } pub async fn waku_relay_unsubscribe( @@ -142,26 +91,11 @@ pub async fn waku_relay_unsubscribe( ) -> Result<()> { let pubsub_topic = CString::new(String::from(pubsub_topic)) .expect("CString should build properly from pubsub topic"); - let pubsub_topic_ptr = pubsub_topic.as_ptr(); - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_relay_subscribe( - ctx.get_ptr(), - pubsub_topic_ptr, - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!( + waku_sys::waku_relay_unsubscribe, + handle_no_response, + ctx.get_ptr(), + pubsub_topic.as_ptr() + ) } diff --git a/waku-bindings/src/node/store.rs b/waku-bindings/src/node/store.rs index 8709a98..51ac707 100644 --- a/waku-bindings/src/node/store.rs +++ b/waku-bindings/src/node/store.rs @@ -2,17 +2,15 @@ // std use std::ffi::CString; -// crates -use libc::*; -use std::sync::Arc; -use tokio::sync::Notify; // internal +use crate::general::libwaku_response::{handle_response, LibwakuResponse}; +use crate::general::waku_decode::WakuDecode; use crate::general::{ contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result, WakuStoreRespMessage, }; +use crate::handle_ffi_call; use crate::node::context::WakuNodeContext; -use crate::utils::{get_trampoline, handle_response, LibwakuResponse, WakuDecode}; use multiaddr::Multiaddr; use serde::{Deserialize, Serialize}; @@ -110,36 +108,18 @@ pub async fn waku_store_query( serde_json::to_string(&query).expect("StoreQuery should always be able to be serialized"), ) .expect("CString should build properly from the serialized filter subscription"); - let json_query_ptr = json_query.as_ptr(); peer_addr .parse::() .expect("correct multiaddress in store query"); let peer_addr = CString::new(peer_addr).expect("peer_addr CString should be created"); - let peer_addr_ptr = peer_addr.as_ptr(); - let timeout_millis = timeout_millis.unwrap_or(10000i32); - - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_store_query( - ctx.get_ptr(), - json_query_ptr, - peer_addr_ptr, - timeout_millis, - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_response(code, result) + handle_ffi_call!( + waku_sys::waku_store_query, + handle_response, + ctx.get_ptr(), + json_query.as_ptr(), + peer_addr.as_ptr(), + timeout_millis.unwrap_or(10000i32) + ) }