fix: event handler

This commit is contained in:
Richard Ramos 2024-02-14 16:52:21 -04:00
parent 1f9283a849
commit 685a6aef0a
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
8 changed files with 89 additions and 109 deletions

View File

@ -13,8 +13,6 @@ use sscanf::{scanf, RegexRepresentation};
pub type WakuMessageVersion = usize;
/// Waku message id, hex encoded sha256 digest of the message
pub type MessageId = String;
/// Waku pubsub topic
pub type WakuPubSubTopic = String;
/// Waku response, just a `Result` with an `String` error.
pub type Result<T> = std::result::Result<T, String>;
@ -29,7 +27,6 @@ pub struct WakuMessage {
payload: Vec<u8>,
/// The content topic to be set on the message
content_topic: WakuContentTopic,
// TODO: check if missing default should be 0
/// The Waku Message version number
#[serde(default)]
version: WakuMessageVersion,
@ -238,13 +235,6 @@ mod base64_serde {
#[cfg(test)]
mod tests {
use super::*;
use crate::WakuPubSubTopic;
#[test]
fn parse_waku_topic() {
let s = "/waku/2/default-waku/proto";
let _: WakuPubSubTopic = s.parse().unwrap();
}
#[test]
fn deserialize_waku_message() {

View File

@ -13,9 +13,7 @@ use rln;
pub use node::{
waku_create_content_topic, waku_default_pubsub_topic, waku_new, Event, Key, Multiaddr,
PublicKey, SecretKey, Signal, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
PublicKey, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
};
pub use general::{
Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
};
pub use general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion};

View File

@ -2,7 +2,6 @@
// std
// crates
use crate::WakuPubSubTopic;
use multiaddr::Multiaddr;
use secp256k1::SecretKey;
use serde::{Deserialize, Serialize};
@ -29,7 +28,7 @@ pub struct WakuNodeConfig {
/// Enable relay protocol. Default `true`
#[default(Some(true))]
pub relay: Option<bool>,
pub relay_topics: Vec<WakuPubSubTopic>,
pub relay_topics: Vec<String>,
// /// Enable store protocol to persist message history
// #[default(Some(false))]
// pub store: Option<bool>,

View File

@ -2,40 +2,27 @@
//!
//! Asynchronous events require a callback to be registered.
//! An example of an asynchronous event that might be emitted is receiving a message.
//! When an event is emitted, this callback will be triggered receiving a [`Signal`]
//! When an event is emitted, this callback will be triggered receiving an [`Event`]
// std
use std::ffi::c_void;
// crates
use serde::{Deserialize, Serialize};
// internal
use crate::general::{WakuMessage, WakuPubSubTopic};
use crate::general::WakuMessage;
use crate::utils::get_trampoline;
use crate::MessageId;
use core::str::FromStr;
use waku_sys::WakuCallBack;
/// Event signal
#[derive(Serialize, Deserialize)]
pub struct Signal {
/// Type of signal being emitted. Currently, only message is available
#[serde(alias = "type")]
_type: String,
/// Format depends on the type of signal
event: Event,
}
impl Signal {
pub fn event(&self) -> &Event {
&self.event
}
}
/// Waku event
/// For now just WakuMessage is supported
#[non_exhaustive]
#[derive(Serialize, Deserialize)]
#[serde(untagged, rename_all = "camelCase")]
#[serde(tag = "eventType", rename_all = "camelCase")]
pub enum Event {
#[serde(rename = "message")]
WakuMessage(WakuMessageEvent),
Unrecognized(serde_json::Value),
}
@ -45,7 +32,7 @@ pub enum Event {
#[serde(rename_all = "camelCase")]
pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received
pubsub_topic: WakuPubSubTopic,
pubsub_topic: String,
/// The message id
message_id: MessageId,
/// The message in [`WakuMessage`] format
@ -53,7 +40,7 @@ pub struct WakuMessageEvent {
}
impl WakuMessageEvent {
pub fn pubsub_topic(&self) -> &WakuPubSubTopic {
pub fn pubsub_topic(&self) -> &String {
&self.pubsub_topic
}
@ -66,43 +53,38 @@ impl WakuMessageEvent {
}
}
/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`]
fn callback<F: FnMut(Signal) + Send + Sync>(mut f: F) -> WakuCallBack {
/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Event) + Send + Sync>(ctx: *mut c_void, mut f: F) {
let cb = |v: &str| {
let data: Signal = serde_json::from_str(v).expect("Parsing signal to succeed");
let data: Event = serde_json::from_str(v).expect("Parsing event to succeed");
println!("EXEC CALLBACK");
f(data);
println!("SUCCESS!");
};
get_trampoline(&cb)
}
unsafe {
let mut closure = cb;
let cb = get_trampoline(&closure);
/// Register callback to act as event handler and receive application signals,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Signal) + Send + Sync>(ctx: *mut c_void, f: F) {
// <F: FnMut(Signal) + Send + Sync + 'static> , , f: F
unsafe { waku_sys::waku_set_event_callback(ctx, callback(f), std::ptr::null_mut()) };
waku_sys::waku_set_event_callback(ctx, cb, &mut closure as *mut _ as *mut c_void)
};
}
#[cfg(test)]
mod tests {
/*use crate::events::waku_set_event_callback;
use crate::{Event, Signal};
use crate::Event;
// TODO: how to actually send a signal and check if the callback is run?
#[test]
fn set_event_callback() {
waku_set_event_callback(|_signal| {});
}
#[test]
fn deserialize_signal() {
let s = "{\"type\":\"message\",\"event\":{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}}";
let _: Signal = serde_json::from_str(s).unwrap();
}
#[test]
fn deserialize_event() {
let e = "{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let _: Event = serde_json::from_str(e).unwrap();
// TODO: how to actually send an event and check if the callback is run?
//#[test]
/*fn set_callback() {
callback(|_event| {});
}*/
#[test]
fn deserialize_message_event() {
let s = "{\"eventType\":\"message\",\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let evt: Event = serde_json::from_str(s).unwrap();
assert!(matches!(evt, Event::WakuMessage(_)));
}
}

View File

@ -15,10 +15,10 @@ use std::time::Duration;
use libc::c_void;
// internal
use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic};
use crate::general::{MessageId, Result, WakuMessage};
pub use config::WakuNodeConfig;
pub use events::{Event, Signal, WakuMessageEvent};
pub use events::{Event, WakuMessageEvent};
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};
/// Handle to the underliying waku node
@ -54,23 +54,23 @@ impl WakuNodeHandle {
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &WakuPubSubTopic,
pubsub_topic: &String,
timeout: Option<Duration>,
) -> Result<MessageId> {
relay::waku_relay_publish_message(self.ctx, message, pubsub_topic, timeout)
}
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn relay_subscribe(&self, pubsub_topic: &String) -> Result<()> {
relay::waku_relay_subscribe(self.ctx, pubsub_topic)
}
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn relay_unsubscribe(&self, pubsub_topic: &String) -> Result<()> {
relay::waku_relay_unsubscribe(self.ctx, pubsub_topic)
}
pub fn set_event_callback<F: FnMut(Signal) + Send + Sync>(&self, f: F) {
pub fn set_event_callback<F: FnMut(Event) + Send + Sync + 'static>(&self, f: F) {
events::waku_set_event_callback(self.ctx, f)
}
}

View File

@ -6,7 +6,7 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic};
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage};
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
@ -57,7 +57,7 @@ pub fn waku_create_content_topic(
/// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/)
#[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic {
pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> String {
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
@ -74,7 +74,7 @@ pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic {
pub fn waku_relay_publish_message(
ctx: *mut c_void,
message: &WakuMessage,
pubsub_topic: &WakuPubSubTopic,
pubsub_topic: &String,
timeout: Option<Duration>,
) -> Result<MessageId> {
let pubsub_topic = pubsub_topic.to_string();
@ -96,8 +96,8 @@ pub fn waku_relay_publish_message(
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_publish(
ctx,
message_ptr,
pubsub_topic_ptr,
message_ptr,
timeout
.map(|duration| {
duration
@ -119,7 +119,7 @@ pub fn waku_relay_publish_message(
handle_response(code, &result)
}
pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
@ -145,7 +145,7 @@ pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) ->
handle_no_response(code, &error)
}
pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")

View File

@ -1,49 +1,64 @@
use aes_gcm::{Aes256Gcm, KeyInit};
use multiaddr::Multiaddr;
use rand::thread_rng;
use secp256k1::SecretKey;
use serial_test::serial;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8};
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::broadcast::{self, Sender};
use tokio::time;
use waku_bindings::{
waku_new, Encoding, Event, Key, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig,
WakuNodeHandle, WakuPubSubTopic,
waku_new, Encoding, Event, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig,
WakuNodeHandle,
};
const ECHO_TIMEOUT: u64 = 10;
const ECHO_MESSAGE: &str = "Hi from 🦀!";
const NODES: &[&str] = &[
"/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm",
"/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ",
"/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS"
];
const NODES: &[&str] =
&["/dns4/node-01.do-ams3.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D"];
fn try_publish_relay_messages(
node: &WakuNodeHandle,
msg: &WakuMessage,
) -> Result<HashSet<MessageId>, String> {
let topic = "test".to_string();
node.relay_publish_message(msg, &topic, None)?;
node.relay_publish_message(msg, &topic, None)?;
Ok(HashSet::from([
node.relay_publish_message(msg, &topic, None)?
]))
}
#[derive(Debug)]
#[derive(Debug, Clone)]
struct Response {
id: MessageId,
payload: Vec<u8>,
}
fn set_callback(node: &WakuNodeHandle, tx: Sender<Response>) {
node.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let id = message.message_id();
let message = message.waku_message();
let payload = message.payload().to_vec();
println!("===============");
println!("ID: {}", id);
println!("Sending to channel....");
tx.send(Response {
id: id.to_string(),
payload,
})
.expect("send response to the receiver");
println!("Sent!");
}
});
}
async fn test_echo_messages(
node: &WakuNodeHandle,
content: &'static str,
content_topic: WakuContentTopic,
sk: SecretKey,
ssk: Key<Aes256Gcm>,
) {
let message = WakuMessage::new(
content,
@ -59,25 +74,23 @@ async fn test_echo_messages(
false,
);
/*
// let (tx, mut rx) = mpsc::channel(1);
//set_callback(tx, sk, ssk);
let (tx, mut rx) = broadcast::channel(1);
set_callback(node, tx);
let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages");
let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages");
while let Some(res) = rx.recv().await {
if ids.take(&res.id).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message");
assert_eq!(content, msg);
}
while let Ok(res) = rx.recv().await {
if ids.take(&res.id).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message");
assert_eq!(content, msg);
}
if ids.is_empty() {
break;
}
}*/
if ids.is_empty() {
break;
}
}
}
#[ignore]
#[tokio::test]
#[serial]
async fn default_echo() -> Result<(), String> {
@ -97,8 +110,6 @@ async fn default_echo() -> Result<(), String> {
let address: Multiaddr = node_address.parse().unwrap();
node.connect(&address, None)?;
}
let sk = SecretKey::new(&mut thread_rng());
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// subscribe to default channel
let topic = "test".to_string();
@ -113,7 +124,7 @@ async fn default_echo() -> Result<(), String> {
// Send and receive messages. Waits until all messages received.
let got_all = tokio::select! {
_ = sleep => false,
_ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true,
_ = test_echo_messages(&node, ECHO_MESSAGE, content_topic) => true,
};
assert!(got_all);

@ -1 +1 @@
Subproject commit b64b017060337dfc427c74c4688e55ca8165caef
Subproject commit 9d9622265b7382e9f1a2b9f7f1b68c6beb93a0af