mirror of
https://github.com/logos-messaging/logos-messaging-rust-bindings.git
synced 2026-01-02 22:13:07 +00:00
avoid duplicated code and better separate structs in modules
This commit is contained in:
parent
f27cda7d88
commit
08b8190bdb
@ -1,6 +1,6 @@
|
||||
// std
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use crate::general::Result;
|
||||
use crate::utils::WakuDecode;
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use crate::general::Result;
|
||||
use std::convert::TryFrom;
|
||||
use std::{slice, str};
|
||||
use waku_sys::WakuCallBack;
|
||||
use std::str;
|
||||
use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK};
|
||||
|
||||
#[derive(Debug, Clone, Default, PartialEq)]
|
||||
@ -30,51 +30,8 @@ impl TryFrom<(u32, &str)> for LibwakuResponse {
|
||||
}
|
||||
}
|
||||
|
||||
// Define the WakuDecode trait
|
||||
pub trait WakuDecode: Sized {
|
||||
fn decode(input: &str) -> Result<Self>;
|
||||
}
|
||||
|
||||
impl WakuDecode for String {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
Ok(input.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode<T: WakuDecode>(input: String) -> Result<T> {
|
||||
T::decode(input.as_str())
|
||||
}
|
||||
|
||||
unsafe extern "C" fn trampoline<F>(
|
||||
ret_code: ::std::os::raw::c_int,
|
||||
data: *const ::std::os::raw::c_char,
|
||||
data_len: usize,
|
||||
user_data: *mut ::std::os::raw::c_void,
|
||||
) where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
let closure = &mut *(user_data as *mut F);
|
||||
|
||||
let response = if data.is_null() {
|
||||
""
|
||||
} else {
|
||||
str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len))
|
||||
.expect("could not retrieve response")
|
||||
};
|
||||
|
||||
let result = LibwakuResponse::try_from((ret_code as u32, response))
|
||||
.expect("invalid response obtained from libwaku");
|
||||
|
||||
closure(result);
|
||||
}
|
||||
|
||||
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack
|
||||
where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
Some(trampoline::<F>)
|
||||
}
|
||||
|
||||
/// Used in cases where the FFI call doesn't return additional infomation in the
|
||||
/// callback. Instead, it returns RET_OK, RET_ERR, etc.
|
||||
pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
|
||||
if result == LibwakuResponse::Undefined && code as u32 == RET_OK {
|
||||
// Some functions will only execute the callback on error
|
||||
@ -92,9 +49,11 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Used in cases where the FFI function returns a code (RET_OK, RET_ERR, etc) plus additional
|
||||
/// information, i.e. LibwakuResponse
|
||||
pub fn handle_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> {
|
||||
match result {
|
||||
LibwakuResponse::Success(v) => decode(v.unwrap_or_default()),
|
||||
LibwakuResponse::Success(v) => WakuDecode::decode(&v.unwrap_or_default()),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Undefined => panic!(
|
||||
@ -1,4 +1,4 @@
|
||||
use crate::utils::WakuDecode;
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use hex::FromHex;
|
||||
use serde::{Deserialize, Deserializer, Serialize};
|
||||
use std::convert::TryInto;
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
//! Waku [general](https://rfc.vac.dev/spec/36/#general) types
|
||||
|
||||
pub mod contenttopic;
|
||||
pub mod libwaku_response;
|
||||
pub mod messagehash;
|
||||
pub mod pubsubtopic;
|
||||
pub mod waku_decode;
|
||||
|
||||
// crates
|
||||
use contenttopic::WakuContentTopic;
|
||||
|
||||
26
waku-bindings/src/general/waku_decode.rs
Normal file
26
waku-bindings/src/general/waku_decode.rs
Normal file
@ -0,0 +1,26 @@
|
||||
use crate::general::Result;
|
||||
use multiaddr::Multiaddr;
|
||||
// Define the WakuDecode trait
|
||||
pub trait WakuDecode: Sized {
|
||||
fn decode(input: &str) -> Result<Self>;
|
||||
}
|
||||
|
||||
impl WakuDecode for String {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
Ok(input.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode<T: WakuDecode>(input: String) -> Result<T> {
|
||||
T::decode(input.as_str())
|
||||
}
|
||||
|
||||
impl WakuDecode for Vec<Multiaddr> {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
input
|
||||
.split(',')
|
||||
.map(|s| s.trim().parse::<Multiaddr>().map_err(|err| err.to_string()))
|
||||
.collect::<Result<Vec<Multiaddr>>>() // Collect results into a Vec
|
||||
.map_err(|err| format!("could not parse Multiaddr: {}", err))
|
||||
}
|
||||
}
|
||||
@ -2,11 +2,11 @@
|
||||
//!
|
||||
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
|
||||
pub mod general;
|
||||
mod macros;
|
||||
pub mod node;
|
||||
pub mod utils;
|
||||
|
||||
// Re-export the LibwakuResponse type to make it accessible outside this module
|
||||
pub use utils::LibwakuResponse;
|
||||
pub use general::libwaku_response::LibwakuResponse;
|
||||
|
||||
// Required so functions inside libwaku can call RLN functions even if we
|
||||
// use it within the bindings functions
|
||||
|
||||
73
waku-bindings/src/macros.rs
Normal file
73
waku-bindings/src/macros.rs
Normal file
@ -0,0 +1,73 @@
|
||||
use crate::general::libwaku_response::LibwakuResponse;
|
||||
|
||||
use std::{slice, str};
|
||||
use waku_sys::WakuCallBack;
|
||||
|
||||
unsafe extern "C" fn trampoline<F>(
|
||||
ret_code: ::std::os::raw::c_int,
|
||||
data: *const ::std::os::raw::c_char,
|
||||
data_len: usize,
|
||||
user_data: *mut ::std::os::raw::c_void,
|
||||
) where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
let closure = &mut *(user_data as *mut F);
|
||||
|
||||
let response = if data.is_null() {
|
||||
""
|
||||
} else {
|
||||
str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len))
|
||||
.expect("could not retrieve response")
|
||||
};
|
||||
|
||||
let result = LibwakuResponse::try_from((ret_code as u32, response))
|
||||
.expect("invalid response obtained from libwaku");
|
||||
|
||||
closure(result);
|
||||
}
|
||||
|
||||
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack
|
||||
where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
Some(trampoline::<F>)
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! handle_ffi_call {
|
||||
// Case: With or without additional arguments
|
||||
($waku_fn:expr, $resp_hndlr:expr, $ctx:expr $(, $($arg:expr),*)?) => {{
|
||||
use crate::macros::get_trampoline;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
use libc::*;
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
|
||||
// Callback to update the result and notify the waiter
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one();
|
||||
};
|
||||
|
||||
// Create trampoline and invoke the `waku_sys` function
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
$waku_fn(
|
||||
$ctx, // Pass the context
|
||||
$($($arg),*,)? // Expand the variadic arguments if provided
|
||||
cb, // Pass the callback trampoline
|
||||
&mut closure as *mut _ as *mut c_void
|
||||
)
|
||||
};
|
||||
|
||||
// Wait for the callback to notify us
|
||||
notify.notified().await;
|
||||
|
||||
// Handle the response
|
||||
$resp_hndlr(code, result)
|
||||
}};
|
||||
}
|
||||
@ -2,7 +2,8 @@ use std::ffi::c_void;
|
||||
use std::ptr::null_mut;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::utils::{get_trampoline, LibwakuResponse};
|
||||
use crate::general::libwaku_response::LibwakuResponse;
|
||||
use crate::macros::get_trampoline;
|
||||
|
||||
type LibwakuResponseClosure = dyn FnMut(LibwakuResponse) + Send + Sync;
|
||||
|
||||
|
||||
@ -8,10 +8,11 @@ use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
// internal
|
||||
use crate::general::contenttopic::WakuContentTopic;
|
||||
use crate::general::libwaku_response::{handle_no_response, LibwakuResponse};
|
||||
use crate::general::pubsubtopic::PubsubTopic;
|
||||
use crate::general::Result;
|
||||
use crate::macros::get_trampoline;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};
|
||||
|
||||
pub async fn waku_filter_subscribe(
|
||||
ctx: &WakuNodeContext,
|
||||
|
||||
@ -3,13 +3,11 @@
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
// crates
|
||||
use libc::*;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
// internal
|
||||
use crate::general::libwaku_response::{handle_response, LibwakuResponse};
|
||||
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_response, LibwakuResponse};
|
||||
|
||||
use crate::general::pubsubtopic::PubsubTopic;
|
||||
|
||||
@ -23,32 +21,15 @@ pub async fn waku_lightpush_publish_message(
|
||||
.expect("WakuMessages should always be able to success serializing"),
|
||||
)
|
||||
.expect("CString should build properly from the serialized waku message");
|
||||
let message_ptr = message.as_ptr();
|
||||
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
let pubsub_topic_ptr = pubsub_topic.as_ptr();
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_lightpush_publish(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
message_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_lightpush_publish,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr(),
|
||||
message.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
@ -9,11 +9,11 @@ use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
// internal
|
||||
use super::config::WakuNodeConfig;
|
||||
use crate::general::libwaku_response::{handle_no_response, handle_response, LibwakuResponse};
|
||||
use crate::general::Result;
|
||||
use crate::handle_ffi_call;
|
||||
use crate::macros::get_trampoline;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::LibwakuResponse;
|
||||
use crate::utils::WakuDecode;
|
||||
use crate::utils::{get_trampoline, handle_no_response, handle_response};
|
||||
|
||||
/// Instantiates a Waku node
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
|
||||
@ -49,113 +49,34 @@ pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext>
|
||||
}
|
||||
|
||||
pub async fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_destroy(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
notify.notified().await; // Wait until a result is received
|
||||
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(waku_sys::waku_destroy, handle_no_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
|
||||
pub async fn waku_start(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_start(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(waku_sys::waku_start, handle_no_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// Stops a Waku node
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
|
||||
pub async fn waku_stop(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_stop(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(waku_sys::waku_stop, handle_no_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// nwaku version
|
||||
#[allow(clippy::not_unsafe_ptr_arg_deref)]
|
||||
pub async fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_version(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_response(code, result)
|
||||
}
|
||||
|
||||
// Implement WakuDecode for Vec<Multiaddr>
|
||||
impl WakuDecode for Vec<Multiaddr> {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
input
|
||||
.split(',')
|
||||
.map(|s| s.trim().parse::<Multiaddr>().map_err(|err| err.to_string()))
|
||||
.collect::<Result<Vec<Multiaddr>>>() // Collect results into a Vec
|
||||
.map_err(|err| format!("could not parse Multiaddr: {}", err))
|
||||
}
|
||||
handle_ffi_call!(waku_sys::waku_version, handle_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// Get the multiaddresses the Waku node is listening to
|
||||
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
|
||||
pub async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_listen_addresses(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_listen_addresses,
|
||||
handle_response,
|
||||
ctx.get_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -20,9 +20,9 @@ use std::time::Duration;
|
||||
use store::StoreWakuMessageResponse;
|
||||
// internal
|
||||
use crate::general::contenttopic::{Encoding, WakuContentTopic};
|
||||
use crate::general::libwaku_response::LibwakuResponse;
|
||||
pub use crate::general::pubsubtopic::PubsubTopic;
|
||||
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
|
||||
use crate::utils::LibwakuResponse;
|
||||
|
||||
use crate::node::context::WakuNodeContext;
|
||||
pub use config::RLNConfig;
|
||||
|
||||
@ -4,15 +4,12 @@
|
||||
use std::ffi::CString;
|
||||
use std::time::Duration;
|
||||
// crates
|
||||
use libc::*;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
// internal
|
||||
use crate::general::libwaku_response::{handle_no_response, LibwakuResponse};
|
||||
use crate::general::Result;
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::LibwakuResponse;
|
||||
use crate::utils::{get_trampoline, handle_no_response};
|
||||
|
||||
/// Dial peer using a multiaddress
|
||||
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
|
||||
@ -27,29 +24,13 @@ pub async fn waku_connect(
|
||||
let address =
|
||||
CString::new(address.to_string()).expect("CString should build properly from multiaddress");
|
||||
|
||||
let address_ptr = address.as_ptr();
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_connect(
|
||||
ctx.get_ptr(),
|
||||
address_ptr,
|
||||
timeout
|
||||
.map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX))
|
||||
.unwrap_or(0),
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_connect,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
address.as_ptr(),
|
||||
timeout
|
||||
.map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX))
|
||||
.unwrap_or(0)
|
||||
)
|
||||
}
|
||||
|
||||
@ -2,17 +2,14 @@
|
||||
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::Notify;
|
||||
// crates
|
||||
use libc::*;
|
||||
// internal
|
||||
use crate::general::contenttopic::{Encoding, WakuContentTopic};
|
||||
use crate::general::libwaku_response::{handle_no_response, handle_response, LibwakuResponse};
|
||||
use crate::general::pubsubtopic::PubsubTopic;
|
||||
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};
|
||||
|
||||
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
|
||||
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding)
|
||||
@ -23,42 +20,23 @@ pub async fn waku_create_content_topic(
|
||||
application_version: u32,
|
||||
content_topic_name: &str,
|
||||
encoding: Encoding,
|
||||
) -> WakuContentTopic {
|
||||
) -> Result<WakuContentTopic> {
|
||||
let application_name = CString::new(application_name)
|
||||
.expect("Application name should always transform to CString");
|
||||
let application_name_ptr = application_name.as_ptr();
|
||||
|
||||
let content_topic_name =
|
||||
CString::new(content_topic_name).expect("Content topic should always transform to CString");
|
||||
let content_topic_name_ptr = content_topic_name.as_ptr();
|
||||
|
||||
let encoding =
|
||||
CString::new(encoding.to_string()).expect("Encoding should always transform to CString");
|
||||
let encoding_ptr = encoding.as_ptr();
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_content_topic(
|
||||
ctx.get_ptr(),
|
||||
application_name_ptr,
|
||||
application_version,
|
||||
content_topic_name_ptr,
|
||||
encoding_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_response(code, result).expect("&str from result should always be extracted")
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_content_topic,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
application_name.as_ptr(),
|
||||
application_version,
|
||||
content_topic_name.as_ptr(),
|
||||
encoding.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
/// Publish a message using Waku Relay
|
||||
@ -78,62 +56,33 @@ pub async fn waku_relay_publish_message(
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_relay_publish(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr(),
|
||||
message.as_ptr(),
|
||||
timeout
|
||||
.map(|duration| {
|
||||
duration
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.expect("Duration as milliseconds should fit in a u32")
|
||||
})
|
||||
.unwrap_or(0),
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_relay_publish,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr(),
|
||||
message.as_ptr(),
|
||||
timeout
|
||||
.map(|duration| {
|
||||
duration
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.expect("Duration as milliseconds should fit in a u32")
|
||||
})
|
||||
.unwrap_or(0)
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
let pubsub_topic_ptr = pubsub_topic.as_ptr();
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_relay_subscribe(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_relay_subscribe,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn waku_relay_unsubscribe(
|
||||
@ -142,26 +91,11 @@ pub async fn waku_relay_unsubscribe(
|
||||
) -> Result<()> {
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
let pubsub_topic_ptr = pubsub_topic.as_ptr();
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_relay_subscribe(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_relay_unsubscribe,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
@ -2,17 +2,15 @@
|
||||
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
// crates
|
||||
use libc::*;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
// internal
|
||||
use crate::general::libwaku_response::{handle_response, LibwakuResponse};
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use crate::general::{
|
||||
contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result,
|
||||
WakuStoreRespMessage,
|
||||
};
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_response, LibwakuResponse, WakuDecode};
|
||||
use multiaddr::Multiaddr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@ -110,36 +108,18 @@ pub async fn waku_store_query(
|
||||
serde_json::to_string(&query).expect("StoreQuery should always be able to be serialized"),
|
||||
)
|
||||
.expect("CString should build properly from the serialized filter subscription");
|
||||
let json_query_ptr = json_query.as_ptr();
|
||||
|
||||
peer_addr
|
||||
.parse::<Multiaddr>()
|
||||
.expect("correct multiaddress in store query");
|
||||
let peer_addr = CString::new(peer_addr).expect("peer_addr CString should be created");
|
||||
let peer_addr_ptr = peer_addr.as_ptr();
|
||||
|
||||
let timeout_millis = timeout_millis.unwrap_or(10000i32);
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_store_query(
|
||||
ctx.get_ptr(),
|
||||
json_query_ptr,
|
||||
peer_addr_ptr,
|
||||
timeout_millis,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
handle_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_store_query,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
json_query.as_ptr(),
|
||||
peer_addr.as_ptr(),
|
||||
timeout_millis.unwrap_or(10000i32)
|
||||
)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user