mirror of
https://github.com/waku-org/waku-rust-bindings.git
synced 2025-02-16 16:47:33 +00:00
final set of adaptations to current nwaku
This commit is contained in:
parent
e5aaa4d90b
commit
dfe068222d
1
Cargo.lock
generated
1
Cargo.lock
generated
@ -380,6 +380,7 @@ name = "basic"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"futures",
|
"futures",
|
||||||
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"waku-bindings",
|
"waku-bindings",
|
||||||
|
@ -10,3 +10,4 @@ futures = "0.3.30"
|
|||||||
tokio = { version = "1.36.0", features = ["full"] }
|
tokio = { version = "1.36.0", features = ["full"] }
|
||||||
tokio-util = { version = "0.7.10", features = ["rt"] }
|
tokio-util = { version = "0.7.10", features = ["rt"] }
|
||||||
waku = { path = "../../waku-bindings", package = "waku-bindings" }
|
waku = { path = "../../waku-bindings", package = "waku-bindings" }
|
||||||
|
serde_json = "1.0"
|
||||||
|
@ -3,7 +3,7 @@ use std::str::from_utf8;
|
|||||||
use std::time::SystemTime;
|
use std::time::SystemTime;
|
||||||
use tokio::time::{sleep, Duration};
|
use tokio::time::{sleep, Duration};
|
||||||
use waku::{
|
use waku::{
|
||||||
waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig,
|
waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig, LibwakuResponse,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@ -25,25 +25,45 @@ async fn main() -> Result<(), Error> {
|
|||||||
|
|
||||||
// ========================================================================
|
// ========================================================================
|
||||||
// Setting an event callback to be executed each time a message is received
|
// Setting an event callback to be executed each time a message is received
|
||||||
node2.set_event_callback(move |event| {
|
node2.set_event_callback(&|response| {
|
||||||
if let Event::WakuMessage(message) = event {
|
if let LibwakuResponse::Success(v) = response {
|
||||||
let message = message.waku_message;
|
let event: Event =
|
||||||
let payload = message.payload.to_vec();
|
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||||
let msg = from_utf8(&payload).expect("should be valid message");
|
|
||||||
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
match event {
|
||||||
println!("Message Received in NODE 2: {}", msg);
|
Event::WakuMessage(evt) => {
|
||||||
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
println!("WakuMessage event received: {:?}", evt.waku_message);
|
||||||
|
let message = evt.waku_message;
|
||||||
|
let payload = message.payload.to_vec();
|
||||||
|
let msg = from_utf8(&payload).expect("should be valid message");
|
||||||
|
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
||||||
|
println!("Message Received in NODE 2: {}", msg);
|
||||||
|
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
||||||
|
}
|
||||||
|
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||||
|
_ => panic!("event case not expected"),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
node1.set_event_callback(move |event| {
|
node1.set_event_callback(&|response| {
|
||||||
if let Event::WakuMessage(message) = event {
|
if let LibwakuResponse::Success(v) = response {
|
||||||
let message = message.waku_message;
|
let event: Event =
|
||||||
let payload = message.payload.to_vec();
|
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||||
let msg = from_utf8(&payload).expect("should be valid message");
|
|
||||||
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
match event {
|
||||||
println!("Message Received in NODE 1: {}", msg);
|
Event::WakuMessage(evt) => {
|
||||||
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
println!("WakuMessage event received: {:?}", evt.waku_message);
|
||||||
|
let message = evt.waku_message;
|
||||||
|
let payload = message.payload.to_vec();
|
||||||
|
let msg = from_utf8(&payload).expect("should be valid message");
|
||||||
|
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
||||||
|
println!("Message Received in NODE 1: {}", msg);
|
||||||
|
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
||||||
|
}
|
||||||
|
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||||
|
_ => panic!("event case not expected"),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -31,6 +31,7 @@ waku-sys = { version = "0.5.0", path = "../waku-sys" }
|
|||||||
libc = "0.2"
|
libc = "0.2"
|
||||||
serde-aux = "4.3.1"
|
serde-aux = "4.3.1"
|
||||||
rln = "0.3.4"
|
rln = "0.3.4"
|
||||||
|
tokio = { version = "1", features = ["full"] }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
futures = "0.3.25"
|
futures = "0.3.25"
|
||||||
|
@ -39,10 +39,10 @@ pub struct WakuMessageEvent {
|
|||||||
|
|
||||||
/// Register callback to act as event handler and receive application events,
|
/// Register callback to act as event handler and receive application events,
|
||||||
/// which are used to react to asynchronous events in Waku
|
/// which are used to react to asynchronous events in Waku
|
||||||
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse)>(ctx: &WakuNodeContext, closure: F) {
|
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse)>(ctx: &WakuNodeContext, closure: &F) {
|
||||||
unsafe {
|
unsafe {
|
||||||
let cb = get_trampoline(&closure);
|
let cb = get_trampoline(closure);
|
||||||
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &closure as *const _ as *mut c_void)
|
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, closure as *const _ as *mut c_void)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,7 +117,7 @@ impl WakuNodeHandle<Running> {
|
|||||||
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
|
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_event_callback<F: FnMut(LibwakuResponse)>(&self, f: F) {
|
pub fn set_event_callback<F: Fn(LibwakuResponse)>(&self, closure: &F) {
|
||||||
events::waku_set_event_callback(&self.ctx, f)
|
events::waku_set_event_callback(&self.ctx, closure)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,17 +4,13 @@ use std::str::FromStr;
|
|||||||
use std::time::{Duration, SystemTime};
|
use std::time::{Duration, SystemTime};
|
||||||
use std::{collections::HashSet, str::from_utf8};
|
use std::{collections::HashSet, str::from_utf8};
|
||||||
use std::cell::OnceCell;
|
use std::cell::OnceCell;
|
||||||
use tokio::sync::broadcast::{self, Sender};
|
|
||||||
use waku_bindings::LibwakuResponse;
|
use waku_bindings::LibwakuResponse;
|
||||||
use std::sync::{Arc, OnceLock, Mutex}; // Import Arc and Mutex
|
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
use waku_bindings::utils;
|
|
||||||
use waku_bindings::{
|
use waku_bindings::{
|
||||||
waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage,
|
waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage,
|
||||||
WakuNodeConfig, WakuNodeHandle,
|
WakuNodeConfig, WakuNodeHandle,
|
||||||
};
|
};
|
||||||
use std::ffi::c_void;
|
|
||||||
const ECHO_TIMEOUT: u64 = 1000;
|
const ECHO_TIMEOUT: u64 = 1000;
|
||||||
const ECHO_MESSAGE: &str = "Hi from 🦀!";
|
const ECHO_MESSAGE: &str = "Hi from 🦀!";
|
||||||
const TEST_PUBSUBTOPIC: &str = "test";
|
const TEST_PUBSUBTOPIC: &str = "test";
|
||||||
@ -29,24 +25,18 @@ fn try_publish_relay_messages(
|
|||||||
]))
|
]))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct Response {
|
|
||||||
hash: MessageHash,
|
|
||||||
payload: Vec<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn test_echo_messages(
|
async fn test_echo_messages(
|
||||||
node1: &WakuNodeHandle<Running>,
|
node1: &WakuNodeHandle<Running>,
|
||||||
node2: &WakuNodeHandle<Running>,
|
node2: &WakuNodeHandle<Running>,
|
||||||
content: &'static str,
|
content: &'static str,
|
||||||
content_topic: WakuContentTopic,
|
content_topic: WakuContentTopic,
|
||||||
) {
|
) -> Result<(), String> {
|
||||||
// setting a naïve event handler to avoid appearing ERR messages in logs
|
// setting a naïve event handler to avoid appearing ERR messages in logs
|
||||||
node1.set_event_callback(|_LibwakuResponse| {});
|
node1.set_event_callback(&|_| {});
|
||||||
|
|
||||||
let rx_waku_message: OnceCell<WakuMessage> = OnceCell::new();
|
let rx_waku_message: OnceCell<WakuMessage> = OnceCell::new();
|
||||||
|
|
||||||
let closure = |response: LibwakuResponse| {
|
let closure = |response| {
|
||||||
if let LibwakuResponse::Success(v) = response {
|
if let LibwakuResponse::Success(v) = response {
|
||||||
let event: Event =
|
let event: Event =
|
||||||
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||||
@ -65,12 +55,7 @@ async fn test_echo_messages(
|
|||||||
|
|
||||||
println!("Before setting event callback");
|
println!("Before setting event callback");
|
||||||
|
|
||||||
unsafe {
|
node2.set_event_callback(&closure); // Set the event callback with the closure
|
||||||
let cb = utils::get_trampoline(&closure);
|
|
||||||
waku_sys::waku_set_event_callback(node2.ctx.obj_ptr, cb, &closure as *const _ as *mut c_void)
|
|
||||||
};
|
|
||||||
|
|
||||||
// node2.set_event_callback(closure); // Set the event callback with the closure
|
|
||||||
|
|
||||||
let topic = TEST_PUBSUBTOPIC.to_string();
|
let topic = TEST_PUBSUBTOPIC.to_string();
|
||||||
node1.relay_subscribe(&topic).unwrap();
|
node1.relay_subscribe(&topic).unwrap();
|
||||||
@ -100,22 +85,28 @@ async fn test_echo_messages(
|
|||||||
Vec::new(),
|
Vec::new(),
|
||||||
false,
|
false,
|
||||||
);
|
);
|
||||||
let ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
|
let _ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
|
||||||
println!("After publish");
|
|
||||||
|
|
||||||
// Wait for the msg to arrive
|
// Wait for the msg to arrive
|
||||||
for _ in 0..50 {
|
for _ in 0..50 {
|
||||||
if let Some(value) = rx_waku_message.get() {
|
if let Some(msg) = rx_waku_message.get() {
|
||||||
println!("The waku message value is: {:?}", value);
|
println!("The waku message value is: {:?}", msg);
|
||||||
break;
|
let payload = msg.payload.to_vec();
|
||||||
|
let payload_str = from_utf8(&payload).expect("should be valid message");
|
||||||
|
println!("payload: {:?}", payload_str);
|
||||||
|
if payload_str == ECHO_MESSAGE {
|
||||||
|
return Ok(())
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
sleep(Duration::from_millis(100)).await;
|
sleep(Duration::from_millis(100)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let None = rx_waku_message.get() {
|
if let None = rx_waku_message.get() {
|
||||||
println!("ERROR could not get waku message");
|
return Err("could not get waku message".to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return Err("Unexpected test ending".to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@ -134,7 +125,6 @@ async fn default_echo() -> Result<(), String> {
|
|||||||
let node1 = node1.start()?;
|
let node1 = node1.start()?;
|
||||||
let node2 = node2.start()?;
|
let node2 = node2.start()?;
|
||||||
|
|
||||||
let waku_version = node2.version()?;
|
|
||||||
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
|
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
|
||||||
|
|
||||||
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));
|
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));
|
||||||
@ -148,12 +138,8 @@ async fn default_echo() -> Result<(), String> {
|
|||||||
|
|
||||||
assert!(got_all);
|
assert!(got_all);
|
||||||
|
|
||||||
let node2 = node2.stop()?;
|
|
||||||
let node1 = node1.stop()?;
|
let node1 = node1.stop()?;
|
||||||
|
let node2 = node2.stop()?;
|
||||||
let sleep = time::sleep(Duration::from_secs(5));
|
|
||||||
tokio::pin!(sleep);
|
|
||||||
|
|
||||||
waku_destroy(node1)?;
|
waku_destroy(node1)?;
|
||||||
waku_destroy(node2)?;
|
waku_destroy(node2)?;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user