temporary changes. pendin to fine tune them

This commit is contained in:
Ivan Folgueira Bande 2024-10-30 14:58:10 +01:00
parent 223762d426
commit ef44d11e81
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
6 changed files with 52 additions and 54 deletions

View File

@ -20,7 +20,7 @@ pub type Result<T> = std::result::Result<T, String>;
// TODO: Properly type and deserialize payload form base64 encoded string // TODO: Properly type and deserialize payload form base64 encoded string
/// Waku message in JSON format. /// Waku message in JSON format.
/// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type) /// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type)
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct WakuMessage { pub struct WakuMessage {
#[serde(with = "base64_serde", default = "Vec::new")] #[serde(with = "base64_serde", default = "Vec::new")]
@ -67,8 +67,9 @@ impl WakuMessage {
} }
/// WakuMessage encoding scheme /// WakuMessage encoding scheme
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq, Default)]
pub enum Encoding { pub enum Encoding {
#[default]
Proto, Proto,
Rlp, Rlp,
Rfc26, Rfc26,
@ -105,7 +106,7 @@ impl RegexRepresentation for Encoding {
} }
/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}` /// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}`
#[derive(Clone, Debug, Eq, PartialEq)] #[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct WakuContentTopic { pub struct WakuContentTopic {
pub application_name: Cow<'static, str>, pub application_name: Cow<'static, str>,
pub version: Cow<'static, str>, pub version: Cow<'static, str>,

View File

@ -3,7 +3,10 @@
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/) //! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
mod general; mod general;
mod node; mod node;
mod utils; pub mod utils;
// Re-export the LibwakuResponse type to make it accessible outside this module
pub use utils::LibwakuResponse;
// Required so functions inside libwaku can call RLN functions even if we // Required so functions inside libwaku can call RLN functions even if we
// use it within the bindings functions // use it within the bindings functions

View File

@ -17,7 +17,7 @@ use crate::MessageHash;
/// Waku event /// Waku event
/// For now just WakuMessage is supported /// For now just WakuMessage is supported
#[non_exhaustive] #[non_exhaustive]
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "eventType", rename_all = "camelCase")] #[serde(tag = "eventType", rename_all = "camelCase")]
pub enum Event { pub enum Event {
#[serde(rename = "message")] #[serde(rename = "message")]
@ -26,7 +26,7 @@ pub enum Event {
} }
/// Type of `event` field for a `message` event /// Type of `event` field for a `message` event
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct WakuMessageEvent { pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received /// The pubsub topic on which the message was received
@ -39,20 +39,10 @@ pub struct WakuMessageEvent {
/// Register callback to act as event handler and receive application events, /// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku /// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Event) + Send + Sync>(ctx: &WakuNodeContext, mut f: F) { pub fn waku_set_event_callback<F: FnMut(LibwakuResponse)>(ctx: &WakuNodeContext, closure: F) {
let cb = |response: LibwakuResponse| {
if let LibwakuResponse::Success(v) = response {
let data: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
f(data);
};
};
unsafe { unsafe {
let mut closure = cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &closure as *const _ as *mut c_void)
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
}; };
} }

View File

@ -15,6 +15,7 @@ use std::marker::PhantomData;
use std::time::Duration; use std::time::Duration;
// internal // internal
use crate::general::{MessageHash, Result, WakuMessage}; use crate::general::{MessageHash, Result, WakuMessage};
use crate::LibwakuResponse;
use context::WakuNodeContext; use context::WakuNodeContext;
pub use config::RLNConfig; pub use config::RLNConfig;
@ -116,7 +117,7 @@ impl WakuNodeHandle<Running> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic) relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
} }
pub fn set_event_callback<F: FnMut(Event) + Send + Sync + 'static>(&self, f: F) { pub fn set_event_callback<F: FnMut(LibwakuResponse)>(&self, f: F) {
events::waku_set_event_callback(&self.ctx, f) events::waku_set_event_callback(&self.ctx, f)
} }
} }

View File

@ -48,7 +48,7 @@ unsafe extern "C" fn trampoline<F>(
) where ) where
F: FnMut(LibwakuResponse), F: FnMut(LibwakuResponse),
{ {
let user_data = &mut *(user_data as *mut F); let closure = &mut *(user_data as *mut F);
let response = if data.is_null() { let response = if data.is_null() {
"" ""
@ -60,7 +60,7 @@ unsafe extern "C" fn trampoline<F>(
let result = LibwakuResponse::try_from((ret_code as u32, response)) let result = LibwakuResponse::try_from((ret_code as u32, response))
.expect("invalid response obtained from libwaku"); .expect("invalid response obtained from libwaku");
user_data(result); closure(result);
} }
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack

View File

@ -3,14 +3,16 @@ use serial_test::serial;
use std::str::FromStr; use std::str::FromStr;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8}; use std::{collections::HashSet, str::from_utf8};
use std::cell::OnceCell;
use tokio::sync::broadcast::{self, Sender}; use tokio::sync::broadcast::{self, Sender};
use waku_bindings::LibwakuResponse;
use tokio::time; use tokio::time;
use tokio::time::sleep; use tokio::time::sleep;
use waku_bindings::{ use waku_bindings::{
waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage, waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage,
WakuNodeConfig, WakuNodeHandle, WakuNodeConfig, WakuNodeHandle,
}; };
const ECHO_TIMEOUT: u64 = 10; const ECHO_TIMEOUT: u64 = 100;
const ECHO_MESSAGE: &str = "Hi from 🦀!"; const ECHO_MESSAGE: &str = "Hi from 🦀!";
const TEST_PUBSUBTOPIC: &str = "test"; const TEST_PUBSUBTOPIC: &str = "test";
@ -30,22 +32,6 @@ struct Response {
payload: Vec<u8>, payload: Vec<u8>,
} }
fn set_callback(node: &WakuNodeHandle<Running>, tx: Sender<Response>) {
node.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let hash = message.message_hash;
let message = message.waku_message;
let payload = message.payload.to_vec();
tx.send(Response {
hash: hash.to_string(),
payload,
})
.expect("send response to the receiver");
}
});
}
async fn test_echo_messages( async fn test_echo_messages(
node1: &WakuNodeHandle<Running>, node1: &WakuNodeHandle<Running>,
node2: &WakuNodeHandle<Running>, node2: &WakuNodeHandle<Running>,
@ -66,27 +52,39 @@ async fn test_echo_messages(
false, false,
); );
node1.set_event_callback(move |_event| {}); // setting a naïve event handler to avoid appearing ERR messages in logs
node1.set_event_callback(|_LibwakuResponse| {});
let (tx, mut rx) = broadcast::channel(1); let rx_waku_message: OnceCell<WakuMessage> = OnceCell::new();
set_callback(node2, tx);
let closure = |response: LibwakuResponse| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
// let _ = rx_waku_message.set(evt.waku_message); // <-- this produces segfault
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
};
};
node2.set_event_callback(closure);
let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages"); let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
while let Ok(res) = rx.recv().await {
if ids.take(&res.hash).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message");
assert_eq!(content, msg);
}
if ids.is_empty() { // Wait for the msg to arrive to form
break; sleep(Duration::from_secs(1)).await;
}
}
} }
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn default_echo() -> Result<(), String> { async fn default_echo() -> Result<(), String> {
println!("Test default_echo");
let node1 = waku_new(Some(WakuNodeConfig { let node1 = waku_new(Some(WakuNodeConfig {
port: Some(60010), port: Some(60010),
..Default::default() ..Default::default()
@ -99,16 +97,20 @@ async fn default_echo() -> Result<(), String> {
let node1 = node1.start()?; let node1 = node1.start()?;
let node2 = node2.start()?; let node2 = node2.start()?;
let addresses1 = node1.listen_addresses()?;
node2.connect(&addresses1[0], None)?;
let topic = TEST_PUBSUBTOPIC.to_string(); let topic = TEST_PUBSUBTOPIC.to_string();
node1.relay_subscribe(&topic)?; node1.relay_subscribe(&topic)?;
node2.relay_subscribe(&topic)?; node2.relay_subscribe(&topic)?;
sleep(Duration::from_secs(35)).await;
// Interconnect nodes
println!("Connecting node1 to node2");
let addresses1 = node1.listen_addresses()?;
node2.connect(&addresses1[0], None)?;
// Wait for mesh to form // Wait for mesh to form
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(25)).await;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
@ -135,6 +137,7 @@ async fn default_echo() -> Result<(), String> {
#[test] #[test]
#[serial] #[serial]
fn node_restart() { fn node_restart() {
println!("Test node_restart");
let config = WakuNodeConfig { let config = WakuNodeConfig {
node_key: Some( node_key: Some(
SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609") SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609")