mirror of
https://github.com/logos-messaging/logos-messaging-rust-bindings.git
synced 2026-01-04 06:53:06 +00:00
refactor: use a enum for handling the responses, and don't ignore the return code (#97)
* refactor: handle return code and use an enum to handle responses * fix: nwaku does not return an envelope hash on publish
This commit is contained in:
parent
646f6f0080
commit
a10a5c2d22
@ -11,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
||||
// internal
|
||||
use crate::general::WakuMessage;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::get_trampoline;
|
||||
use crate::utils::{get_trampoline, LibwakuResponse};
|
||||
use crate::MessageId;
|
||||
|
||||
/// Waku event
|
||||
@ -40,9 +40,15 @@ pub struct WakuMessageEvent {
|
||||
/// Register callback to act as event handler and receive application events,
|
||||
/// which are used to react to asynchronous events in Waku
|
||||
pub fn waku_set_event_callback<F: FnMut(Event) + Send + Sync>(ctx: &WakuNodeContext, mut f: F) {
|
||||
let cb = |v: &str| {
|
||||
let data: Event = serde_json::from_str(v).expect("Parsing event to succeed");
|
||||
f(data);
|
||||
let cb = |response: LibwakuResponse| {
|
||||
match response {
|
||||
LibwakuResponse::Success(v) => {
|
||||
let data: Event =
|
||||
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||
f(data);
|
||||
}
|
||||
_ => {} // Do nothing
|
||||
};
|
||||
};
|
||||
|
||||
unsafe {
|
||||
|
||||
@ -9,6 +9,7 @@ use multiaddr::Multiaddr;
|
||||
use super::config::WakuNodeConfig;
|
||||
use crate::general::Result;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::LibwakuResponse;
|
||||
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
|
||||
|
||||
/// Instantiates a Waku node
|
||||
@ -23,10 +24,10 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
|
||||
.expect("CString should build properly from the config")
|
||||
.into_raw();
|
||||
|
||||
let mut error: String = Default::default();
|
||||
let error_cb = |v: &str| error = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let obj_ptr = unsafe {
|
||||
let mut closure = error_cb;
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void);
|
||||
|
||||
@ -35,67 +36,67 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
|
||||
out
|
||||
};
|
||||
|
||||
if !error.is_empty() {
|
||||
Err(error)
|
||||
} else {
|
||||
Ok(WakuNodeContext { obj_ptr })
|
||||
match result {
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
_ => Ok(WakuNodeContext { obj_ptr }),
|
||||
}
|
||||
}
|
||||
|
||||
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
|
||||
pub fn waku_start(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut error: String = Default::default();
|
||||
let error_cb = |v: &str| error = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = error_cb;
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_start(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_no_response(code, &error)
|
||||
handle_no_response(code, result)
|
||||
}
|
||||
|
||||
/// Stops a Waku node
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
|
||||
pub fn waku_stop(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut error: String = Default::default();
|
||||
let error_cb = |v: &str| error = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = error_cb;
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_stop(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_no_response(code, &error)
|
||||
handle_no_response(code, result)
|
||||
}
|
||||
|
||||
/// nwaku version
|
||||
#[allow(clippy::not_unsafe_ptr_arg_deref)]
|
||||
pub fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
|
||||
let mut result: String = Default::default();
|
||||
let result_cb = |v: &str| result = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_version(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_response(code, &result)
|
||||
handle_response(code, result)
|
||||
}
|
||||
|
||||
/// Get the multiaddresses the Waku node is listening to
|
||||
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
|
||||
pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
|
||||
let mut result: String = Default::default();
|
||||
let result_cb = |v: &str| result = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_listen_addresses(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_json_response(code, &result)
|
||||
handle_json_response(code, result)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@ -13,7 +13,7 @@ pub use multiaddr::Multiaddr;
|
||||
pub use secp256k1::{PublicKey, SecretKey};
|
||||
use std::time::Duration;
|
||||
// internal
|
||||
use crate::general::{MessageId, Result, WakuMessage};
|
||||
use crate::general::{Result, WakuMessage};
|
||||
use context::WakuNodeContext;
|
||||
|
||||
pub use config::WakuNodeConfig;
|
||||
@ -66,7 +66,7 @@ impl WakuNodeHandle {
|
||||
message: &WakuMessage,
|
||||
pubsub_topic: &String,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<MessageId> {
|
||||
) -> Result<()> {
|
||||
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout)
|
||||
}
|
||||
|
||||
|
||||
@ -9,6 +9,7 @@ use multiaddr::Multiaddr;
|
||||
// internal
|
||||
use crate::general::Result;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::LibwakuResponse;
|
||||
use crate::utils::{get_trampoline, handle_no_response};
|
||||
|
||||
/// Dial peer using a multiaddress
|
||||
@ -25,10 +26,10 @@ pub fn waku_connect(
|
||||
.expect("CString should build properly from multiaddress")
|
||||
.into_raw();
|
||||
|
||||
let mut error: String = Default::default();
|
||||
let error_cb = |v: &str| error = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = error_cb;
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_connect(
|
||||
ctx.obj_ptr,
|
||||
@ -45,5 +46,5 @@ pub fn waku_connect(
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, &error)
|
||||
handle_no_response(code, result)
|
||||
}
|
||||
|
||||
@ -6,9 +6,9 @@ use std::time::Duration;
|
||||
// crates
|
||||
use libc::*;
|
||||
// internal
|
||||
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage};
|
||||
use crate::general::{Encoding, Result, WakuContentTopic, WakuMessage};
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_no_response, handle_response};
|
||||
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};
|
||||
|
||||
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
|
||||
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding)
|
||||
@ -30,8 +30,8 @@ pub fn waku_create_content_topic(
|
||||
.expect("Encoding should always transform to CString")
|
||||
.into_raw();
|
||||
|
||||
let mut result: String = Default::default();
|
||||
let result_cb = |v: &str| result = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
@ -52,8 +52,7 @@ pub fn waku_create_content_topic(
|
||||
out
|
||||
};
|
||||
|
||||
handle_response::<WakuContentTopic>(code, &result)
|
||||
.expect("&str from result should always be extracted")
|
||||
handle_response(code, result).expect("&str from result should always be extracted")
|
||||
}
|
||||
|
||||
/// Publish a message using Waku Relay
|
||||
@ -63,7 +62,7 @@ pub fn waku_relay_publish_message(
|
||||
message: &WakuMessage,
|
||||
pubsub_topic: &String,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<MessageId> {
|
||||
) -> Result<()> {
|
||||
let pubsub_topic = pubsub_topic.to_string();
|
||||
|
||||
let message_ptr = CString::new(
|
||||
@ -76,8 +75,8 @@ pub fn waku_relay_publish_message(
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
|
||||
let mut result: String = Default::default();
|
||||
let result_cb = |v: &str| result = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
@ -103,7 +102,7 @@ pub fn waku_relay_publish_message(
|
||||
out
|
||||
};
|
||||
|
||||
handle_response(code, &result)
|
||||
handle_no_response(code, result)
|
||||
}
|
||||
|
||||
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> {
|
||||
@ -112,10 +111,10 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Res
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
|
||||
let mut error: String = Default::default();
|
||||
let error_cb = |v: &str| error = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = error_cb;
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_relay_subscribe(
|
||||
ctx.obj_ptr,
|
||||
@ -129,7 +128,7 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Res
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, &error)
|
||||
handle_no_response(code, result)
|
||||
}
|
||||
|
||||
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> {
|
||||
@ -138,10 +137,10 @@ pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> R
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
|
||||
let mut error: String = Default::default();
|
||||
let error_cb = |v: &str| error = v.to_string();
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = error_cb;
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_relay_subscribe(
|
||||
ctx.obj_ptr,
|
||||
@ -155,5 +154,5 @@ pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> R
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, &error)
|
||||
handle_no_response(code, result)
|
||||
}
|
||||
|
||||
@ -1,22 +1,46 @@
|
||||
use crate::general::Result;
|
||||
use core::str::FromStr;
|
||||
use serde::de::DeserializeOwned;
|
||||
use std::convert::TryFrom;
|
||||
use std::{slice, str};
|
||||
use waku_sys::WakuCallBack;
|
||||
use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK};
|
||||
|
||||
pub fn decode<T: DeserializeOwned>(input: &str) -> Result<T> {
|
||||
serde_json::from_str(input)
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
pub enum LibwakuResponse {
|
||||
Success(Option<String>),
|
||||
Failure(String),
|
||||
MissingCallback,
|
||||
#[default]
|
||||
Undefined,
|
||||
}
|
||||
|
||||
impl TryFrom<(u32, &str)> for LibwakuResponse {
|
||||
type Error = String;
|
||||
|
||||
fn try_from((ret_code, response): (u32, &str)) -> std::result::Result<Self, Self::Error> {
|
||||
let opt_value = Some(response.to_string()).filter(|s| !s.is_empty());
|
||||
match ret_code {
|
||||
RET_OK => Ok(LibwakuResponse::Success(opt_value)),
|
||||
RET_ERR => Ok(LibwakuResponse::Failure(format!("waku error: {}", response))),
|
||||
RET_MISSING_CALLBACK => Ok(LibwakuResponse::MissingCallback),
|
||||
_ => Err(format!("undefined return code {}", ret_code)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode<T: DeserializeOwned>(input: String) -> Result<T> {
|
||||
serde_json::from_str(input.as_str())
|
||||
.map_err(|err| format!("could not deserialize waku response: {}", err))
|
||||
}
|
||||
|
||||
unsafe extern "C" fn trampoline<F>(
|
||||
_ret_code: ::std::os::raw::c_int,
|
||||
ret_code: ::std::os::raw::c_int,
|
||||
data: *const ::std::os::raw::c_char,
|
||||
data_len: usize,
|
||||
user_data: *mut ::std::os::raw::c_void,
|
||||
) where
|
||||
F: FnMut(&str),
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
let user_data = &mut *(user_data as *mut F);
|
||||
|
||||
@ -27,41 +51,59 @@ unsafe extern "C" fn trampoline<F>(
|
||||
.expect("could not retrieve response")
|
||||
};
|
||||
|
||||
user_data(response);
|
||||
let result = LibwakuResponse::try_from((ret_code as u32, response))
|
||||
.expect("invalid response obtained from libwaku");
|
||||
|
||||
user_data(result);
|
||||
}
|
||||
|
||||
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack
|
||||
where
|
||||
F: FnMut(&str),
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
Some(trampoline::<F>)
|
||||
}
|
||||
|
||||
pub fn handle_no_response(code: i32, error: &str) -> Result<()> {
|
||||
match code as u32 {
|
||||
RET_OK => Ok(()),
|
||||
RET_ERR => Err(format!("waku error: {}", error)),
|
||||
RET_MISSING_CALLBACK => Err("missing callback".to_string()),
|
||||
_ => Err(format!("undefined return code {}", code)),
|
||||
pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
|
||||
if result == LibwakuResponse::Undefined && code as u32 == RET_OK {
|
||||
// Some functions will only execute the callback on error
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match result {
|
||||
LibwakuResponse::Success(_) => Ok(()),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Undefined => panic!(
|
||||
"undefined ffi state: code({}) was returned but callback was not executed",
|
||||
code
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_json_response<F: DeserializeOwned>(code: i32, result: &str) -> Result<F> {
|
||||
match code as u32 {
|
||||
RET_OK => decode(result),
|
||||
RET_ERR => Err(format!("waku error: {}", result)),
|
||||
RET_MISSING_CALLBACK => Err("missing callback".to_string()),
|
||||
_ => Err(format!("undefined return code {}", code)),
|
||||
pub fn handle_json_response<F: DeserializeOwned>(code: i32, result: LibwakuResponse) -> Result<F> {
|
||||
match result {
|
||||
LibwakuResponse::Success(v) => decode(v.unwrap_or_default()),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Undefined => panic!(
|
||||
"undefined ffi state: code({}) was returned but callback was not executed",
|
||||
code
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_response<F: FromStr>(code: i32, result: &str) -> Result<F> {
|
||||
match code as u32 {
|
||||
RET_OK => result
|
||||
pub fn handle_response<F: FromStr>(code: i32, result: LibwakuResponse) -> Result<F> {
|
||||
match result {
|
||||
LibwakuResponse::Success(v) => v
|
||||
.unwrap_or_default()
|
||||
.parse()
|
||||
.map_err(|_| format!("could not parse value: {}", result)),
|
||||
RET_ERR => Err(format!("waku error: {}", result)),
|
||||
RET_MISSING_CALLBACK => Err("missing callback".to_string()),
|
||||
_ => Err(format!("undefined return code {}", code)),
|
||||
.map_err(|_| format!("could not parse value")),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Undefined => panic!(
|
||||
"undefined ffi state: code({}) was returned but callback was not executed",
|
||||
code
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,7 +2,7 @@ use secp256k1::SecretKey;
|
||||
use serial_test::serial;
|
||||
use std::str::FromStr;
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::{collections::HashSet, str::from_utf8};
|
||||
use std::{str::from_utf8};
|
||||
use tokio::sync::broadcast::{self, Sender};
|
||||
use tokio::time;
|
||||
use tokio::time::sleep;
|
||||
@ -17,11 +17,9 @@ const TEST_PUBSUBTOPIC: &str = "test";
|
||||
fn try_publish_relay_messages(
|
||||
node: &WakuNodeHandle,
|
||||
msg: &WakuMessage,
|
||||
) -> Result<HashSet<MessageId>, String> {
|
||||
) -> Result<(), String> {
|
||||
let topic = TEST_PUBSUBTOPIC.to_string();
|
||||
Ok(HashSet::from([
|
||||
node.relay_publish_message(msg, &topic, None)?
|
||||
]))
|
||||
Ok(node.relay_publish_message(msg, &topic, None)?)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@ -71,17 +69,11 @@ async fn test_echo_messages(
|
||||
let (tx, mut rx) = broadcast::channel(1);
|
||||
set_callback(node2, tx);
|
||||
|
||||
let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
|
||||
try_publish_relay_messages(node1, &message).expect("send relay messages");
|
||||
|
||||
while let Ok(res) = rx.recv().await {
|
||||
if ids.take(&res.id).is_some() {
|
||||
let msg = from_utf8(&res.payload).expect("should be valid message");
|
||||
assert_eq!(content, msg);
|
||||
}
|
||||
|
||||
if ids.is_empty() {
|
||||
break;
|
||||
}
|
||||
assert!(!res.id.is_empty());
|
||||
from_utf8(&res.payload).expect("should be valid message");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user