wide adaptations to make the waku crate behave tokio-based async

This commit is contained in:
Ivan Folgueira Bande 2024-12-19 23:05:39 +01:00
parent 9d73660c32
commit e937e0541c
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
18 changed files with 443 additions and 284 deletions

1
examples/Cargo.lock generated
View File

@ -4736,6 +4736,7 @@ dependencies = [
"prost", "prost",
"serde", "serde",
"serde_json", "serde_json",
"tokio",
"tui", "tui",
"unicode-width 0.1.12", "unicode-width 0.1.12",
"waku-bindings", "waku-bindings",

View File

@ -13,12 +13,14 @@ async fn main() -> Result<(), Error> {
tcp_port: Some(60010), // TODO: use any available port. tcp_port: Some(60010), // TODO: use any available port.
..Default::default() ..Default::default()
})) }))
.await
.expect("should instantiate"); .expect("should instantiate");
let node2 = waku_new(Some(WakuNodeConfig { let node2 = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60020), // TODO: use any available port. tcp_port: Some(60020), // TODO: use any available port.
..Default::default() ..Default::default()
})) }))
.await
.expect("should instantiate"); .expect("should instantiate");
// ======================================================================== // ========================================================================
@ -31,7 +33,7 @@ async fn main() -> Result<(), Error> {
match event { match event {
WakuEvent::WakuMessage(evt) => { WakuEvent::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message); // println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message; let message = evt.waku_message;
let payload = message.payload.to_vec(); let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message"); let msg = from_utf8(&payload).expect("should be valid message");
@ -54,7 +56,7 @@ async fn main() -> Result<(), Error> {
match event { match event {
WakuEvent::WakuMessage(evt) => { WakuEvent::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message); // println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message; let message = evt.waku_message;
let payload = message.payload.to_vec(); let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message"); let msg = from_utf8(&payload).expect("should be valid message");
@ -69,8 +71,8 @@ async fn main() -> Result<(), Error> {
}) })
.expect("set event call back working"); .expect("set event call back working");
let node1 = node1.start().expect("node1 should start"); let node1 = node1.start().await.expect("node1 should start");
let node2 = node2.start().expect("node2 should start"); let node2 = node2.start().await.expect("node2 should start");
// ======================================================================== // ========================================================================
// Subscribe to pubsub topic // Subscribe to pubsub topic
@ -78,10 +80,12 @@ async fn main() -> Result<(), Error> {
node1 node1
.relay_subscribe(&topic) .relay_subscribe(&topic)
.await
.expect("node1 should subscribe"); .expect("node1 should subscribe");
node2 node2
.relay_subscribe(&topic) .relay_subscribe(&topic)
.await
.expect("node2 should subscribe"); .expect("node2 should subscribe");
// ======================================================================== // ========================================================================
@ -89,10 +93,12 @@ async fn main() -> Result<(), Error> {
let addresses2 = node2 let addresses2 = node2
.listen_addresses() .listen_addresses()
.await
.expect("should obtain the addresses"); .expect("should obtain the addresses");
node1 node1
.connect(&addresses2[0], None) .connect(&addresses2[0], None)
.await
.expect("node1 should connect to node2"); .expect("node1 should connect to node2");
// ======================================================================== // ========================================================================
@ -119,6 +125,7 @@ async fn main() -> Result<(), Error> {
); );
node1 node1
.relay_publish_message(&message, &topic, None) .relay_publish_message(&message, &topic, None)
.await
.expect("should have sent the message"); .expect("should have sent the message");
// ======================================================================== // ========================================================================
@ -129,13 +136,13 @@ async fn main() -> Result<(), Error> {
// ======================================================================== // ========================================================================
// Stop both instances // Stop both instances
let node1 = node1.stop().expect("should stop"); let node1 = node1.stop().await.expect("should stop");
let node2 = node2.stop().expect("should stop"); let node2 = node2.stop().await.expect("should stop");
// ======================================================================== // ========================================================================
// Free resources // Free resources
node1.waku_destroy().expect("should deallocate"); node1.waku_destroy().await.expect("should deallocate");
node2.waku_destroy().expect("should deallocate"); node2.waku_destroy().await.expect("should deallocate");
Ok(()) Ok(())
} }

View File

@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use std::str::from_utf8; use std::str::from_utf8;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::{SystemTime, Duration}; use std::time::{SystemTime, Duration};
use tokio::task;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use waku::{ use waku::{
@ -48,7 +49,7 @@ impl TicTacToeApp<Initialized> {
} }
} }
fn start(self) -> TicTacToeApp<Running> { async fn start(self) -> TicTacToeApp<Running> {
let tx_clone = self.tx.clone(); let tx_clone = self.tx.clone();
let my_closure = move |response| { let my_closure = move |response| {
@ -84,14 +85,14 @@ impl TicTacToeApp<Initialized> {
self.waku.set_event_callback(my_closure).expect("set event call back working"); self.waku.set_event_callback(my_closure).expect("set event call back working");
// Start the waku node // Start the waku node
let waku = self.waku.start().expect("waku should start"); let waku = self.waku.start().await.expect("waku should start");
// Subscribe to desired topic using the relay protocol // Subscribe to desired topic using the relay protocol
// self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe"); waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe");
let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); // let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let content_topics = vec![ctopic]; // let content_topics = vec![ctopic];
waku.filter_subscribe(&self.game_topic, content_topics).expect("waku should subscribe"); // waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe");
// Connect to hard-coded node // Connect to hard-coded node
// let target_node_multi_addr = // let target_node_multi_addr =
@ -114,7 +115,7 @@ impl TicTacToeApp<Initialized> {
} }
impl TicTacToeApp<Running> { impl TicTacToeApp<Running> {
fn send_game_state(&self, game_state: &GameState) { async fn send_game_state(&self, game_state: &GameState) {
let serialized_game_state = serde_json::to_string(game_state).unwrap(); let serialized_game_state = serde_json::to_string(game_state).unwrap();
let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
@ -132,9 +133,11 @@ impl TicTacToeApp<Running> {
false, false,
); );
// self.waku.relay_publish_message(&message, &self.game_topic.to_string(), None) if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await {
// .expect("Failed to send message"); dbg!(format!("message hash published: {}", msg_hash));
self.waku.lightpush_publish_message(&message, &self.game_topic).expect("Failed to send message"); }
// self.waku.lightpush_publish_message(&message, &self.game_topic);
} }
fn make_move(&mut self, row: usize, col: usize) { fn make_move(&mut self, row: usize, col: usize) {
@ -159,7 +162,17 @@ impl TicTacToeApp<Running> {
}; };
} }
self.send_game_state(&game_state); // Send updated state after a move // Call the async function in a blocking context
task::block_in_place(|| {
// Obtain the current runtime handle
let handle = tokio::runtime::Handle::current();
// Block on the async function
handle.block_on(async {
// Assuming `self` is available in the current context
self.send_game_state(&game_state).await;
});
});
} }
} }
} }
@ -314,7 +327,7 @@ async fn main() -> eframe::Result<()> {
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()), // node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),
max_message_size: Some("1024KiB".to_string()), max_message_size: Some("1024KiB".to_string()),
relay_topics: vec![String::from(&game_topic)], relay_topics: vec![String::from(&game_topic)],
log_level: Some("DEBUG"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL log_level: Some("FATAL"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
keep_alive: Some(true), keep_alive: Some(true),
@ -326,7 +339,7 @@ async fn main() -> eframe::Result<()> {
// discv5_enr_auto_update: Some(false), // discv5_enr_auto_update: Some(false),
..Default::default() ..Default::default()
})) })).await
.expect("should instantiate"); .expect("should instantiate");
let game_state = GameState { let game_state = GameState {
@ -339,7 +352,7 @@ async fn main() -> eframe::Result<()> {
let clone = shared_state.clone(); let clone = shared_state.clone();
let app = TicTacToeApp::new(waku, game_topic, clone, tx); let app = TicTacToeApp::new(waku, game_topic, clone, tx);
let app = app.start(); let app = app.start().await;
let clone = shared_state.clone(); let clone = shared_state.clone();
// Listen for messages in the main thread // Listen for messages in the main thread

View File

@ -15,4 +15,5 @@ tui = "0.19"
crossterm = "0.25" crossterm = "0.25"
unicode-width = "0.1" unicode-width = "0.1"
prost = "0.11" prost = "0.11"
chrono = "0.4" chrono = "0.4"
tokio = { version = "1", features = ["full"] }

View File

@ -1,6 +1,7 @@
mod protocol; mod protocol;
use crate::protocol::{Chat2Message, TOY_CHAT_CONTENT_TOPIC}; use crate::protocol::{Chat2Message, TOY_CHAT_CONTENT_TOPIC};
use tokio::task;
use chrono::Utc; use chrono::Utc;
use crossterm::{ use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode}, event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
@ -48,7 +49,7 @@ struct App<State> {
} }
impl App<Initialized> { impl App<Initialized> {
fn new(nick: String) -> Result<App<Initialized>> { async fn new(nick: String) -> Result<App<Initialized>> {
let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC); let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
let waku = waku_new(Some(WakuNodeConfig { let waku = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60010), tcp_port: Some(60010),
@ -69,7 +70,7 @@ impl App<Initialized> {
// discv5_enr_auto_update: Some(false), // discv5_enr_auto_update: Some(false),
..Default::default() ..Default::default()
}))?; })).await?;
Ok(App { Ok(App {
input: String::new(), input: String::new(),
@ -80,7 +81,7 @@ impl App<Initialized> {
}) })
} }
fn start_waku_node(self) -> Result<App<Running>> { async fn start_waku_node(self) -> Result<App<Running>> {
let shared_messages = Arc::clone(&self.messages); let shared_messages = Arc::clone(&self.messages);
@ -116,10 +117,10 @@ impl App<Initialized> {
} }
})?; })?;
let waku = self.waku.start()?; let waku = self.waku.start().await?;
let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC); let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
waku.relay_subscribe(&pubsub_topic)?; waku.relay_subscribe(&pubsub_topic).await?;
Ok(App { Ok(App {
input: self.input, input: self.input,
@ -133,8 +134,8 @@ impl App<Initialized> {
impl App<Running> { impl App<Running> {
fn retrieve_history(&mut self) { async fn retrieve_history(&mut self) {
let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).unwrap(); let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).await.unwrap();
let messages:Vec<_> = messages let messages:Vec<_> = messages
.iter() .iter()
.map(|store_resp_msg| { .map(|store_resp_msg| {
@ -183,15 +184,25 @@ impl App<Running> {
false, false,
); );
let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC); // Call the async function in a blocking context
if let Err(e) = self.waku.relay_publish_message( task::block_in_place(|| {
&waku_message, // Obtain the current runtime handle
&pubsub_topic, let handle = tokio::runtime::Handle::current();
None,
) { // Block on the async function
let mut out = std::io::stderr(); handle.block_on(async {
write!(out, "{e:?}").unwrap(); // Assuming `self` is available in the current context
} let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
if let Err(e) = self.waku.relay_publish_message(
&waku_message,
&pubsub_topic,
None,
).await {
let mut out = std::io::stderr();
write!(out, "{e:?}").unwrap();
}
});
});
} }
KeyCode::Char(c) => { KeyCode::Char(c) => {
self.input.push(c); self.input.push(c);
@ -210,16 +221,17 @@ impl App<Running> {
} }
} }
fn stop_app(self) { async fn stop_app(self) {
self.waku.stop().expect("the node should stop properly"); self.waku.stop().await.expect("the node should stop properly");
} }
} }
fn main() -> std::result::Result<(), Box<dyn Error>> { #[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn Error>> {
let nick = std::env::args().nth(1).expect("Nick to be set"); let nick = std::env::args().nth(1).expect("Nick to be set");
let app = App::new(nick)?; let app = App::new(nick).await?;
let mut app = app.start_waku_node()?; let mut app = app.start_waku_node().await?;
// setup terminal // setup terminal
enable_raw_mode()?; enable_raw_mode()?;
@ -228,9 +240,9 @@ fn main() -> std::result::Result<(), Box<dyn Error>> {
let backend = CrosstermBackend::new(stdout); let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?; let mut terminal = Terminal::new(backend)?;
app.retrieve_history(); app.retrieve_history().await;
let res = app.run_main_loop(&mut terminal); let res = app.run_main_loop(&mut terminal);
app.stop_app(); app.stop_app().await;
// restore terminal // restore terminal
disable_raw_mode()?; disable_raw_mode()?;

View File

@ -1,4 +1,6 @@
// std // std
use crate::general::Result;
use crate::utils::WakuDecode;
use std::borrow::Cow; use std::borrow::Cow;
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
use std::str::FromStr; use std::str::FromStr;
@ -79,6 +81,12 @@ impl WakuContentTopic {
} }
} }
impl WakuDecode for WakuContentTopic {
fn decode(input: &str) -> Result<Self> {
Ok(serde_json::from_str(input).expect("could not parse store resp"))
}
}
impl FromStr for WakuContentTopic { impl FromStr for WakuContentTopic {
type Err = String; type Err = String;

View File

@ -2,12 +2,28 @@ use crate::utils::WakuDecode;
use hex::FromHex; use hex::FromHex;
use serde::{Deserialize, Deserializer, Serialize}; use serde::{Deserialize, Deserializer, Serialize};
use std::convert::TryInto; use std::convert::TryInto;
use std::fmt;
use std::hash::{Hash, Hasher};
use std::str::FromStr; use std::str::FromStr;
/// Waku message hash, hex encoded sha256 digest of the message /// Waku message hash, hex encoded sha256 digest of the message
#[derive(Debug, Serialize, Clone)] #[derive(Debug, Serialize, PartialEq, Eq, Clone)]
pub struct MessageHash([u8; 32]); pub struct MessageHash([u8; 32]);
impl MessageHash {
fn to_hex_string(&self) -> String {
let hex: String = self.0.iter().map(|b| format!("{:02x}", b)).collect();
format!("0x{}", hex)
}
}
impl Hash for MessageHash {
fn hash<H: Hasher>(&self, state: &mut H) {
// Use the inner array to contribute to the hash
self.0.hash(state);
}
}
impl FromStr for MessageHash { impl FromStr for MessageHash {
type Err = String; type Err = String;
@ -45,6 +61,13 @@ impl<'de> Deserialize<'de> for MessageHash {
impl WakuDecode for MessageHash { impl WakuDecode for MessageHash {
fn decode(input: &str) -> Result<Self, String> { fn decode(input: &str) -> Result<Self, String> {
serde_json::from_str(input).expect("could not parse MessageHash") MessageHash::from_str(input)
}
}
// Implement the Display trait
impl fmt::Display for MessageHash {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.to_hex_string())
} }
} }

View File

@ -41,7 +41,7 @@ mod tests {
#[test] #[test]
fn deserialize_message_event() { fn deserialize_message_event() {
let s = "{\"eventType\":\"message\",\"messageHash\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}"; let s = "{\"eventType\":\"message\",\"messageHash\":[91, 70, 26, 8, 141, 232, 150, 200, 26, 206, 224, 175, 249, 74, 61, 140, 231, 126, 224, 160, 91, 80, 162, 65, 250, 171, 84, 149, 133, 110, 214, 101],\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let evt: WakuEvent = serde_json::from_str(s).unwrap(); let evt: WakuEvent = serde_json::from_str(s).unwrap();
assert!(matches!(evt, WakuEvent::WakuMessage(_))); assert!(matches!(evt, WakuEvent::WakuMessage(_)));
} }

View File

@ -4,6 +4,8 @@
use std::ffi::CString; use std::ffi::CString;
// crates // crates
use libc::*; use libc::*;
use std::sync::Arc;
use tokio::sync::Notify;
// internal // internal
use crate::general::contenttopic::WakuContentTopic; use crate::general::contenttopic::WakuContentTopic;
use crate::general::pubsubtopic::PubsubTopic; use crate::general::pubsubtopic::PubsubTopic;
@ -11,22 +13,29 @@ use crate::general::Result;
use crate::node::context::WakuNodeContext; use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse}; use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};
pub fn waku_filter_subscribe( pub async fn waku_filter_subscribe(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, content_topics: Vec<WakuContentTopic>,
) -> Result<()> { ) -> Result<()> {
let content_topics = WakuContentTopic::join_content_topics(content_topics); let content_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) let pubsub_topic = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic") .expect("CString should build properly from pubsub topic");
.into_raw(); let pubsub_topic_ptr = pubsub_topic.as_ptr();
let content_topics_ptr = CString::new(content_topics)
.expect("CString should build properly from content topic") let content_topics =
.into_raw(); CString::new(content_topics).expect("CString should build properly from content topic");
let content_topics_ptr = content_topics.as_ptr();
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let mut result: LibwakuResponse = Default::default();
let result_cb = |r: LibwakuResponse| result = r;
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -38,31 +47,35 @@ pub fn waku_filter_subscribe(
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_topics_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }
pub fn waku_filter_unsubscribe( pub async fn waku_filter_unsubscribe(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
) -> Result<()> { ) -> Result<()> {
let content_topics_topics = WakuContentTopic::join_content_topics(content_topics); let content_topics_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) let pubsub_topic = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic") .expect("CString should build properly from pubsub topic");
.into_raw(); let pubsub_topic_ptr = pubsub_topic.as_ptr();
let content_topics_topics_ptr = CString::new(content_topics_topics)
.expect("CString should build properly from content topic")
.into_raw();
let mut result: LibwakuResponse = Default::default(); let content_topics_topics = CString::new(content_topics_topics)
let result_cb = |r: LibwakuResponse| result = r; .expect("CString should build properly from content topic");
let content_topics_topics_ptr = content_topics_topics.as_ptr();
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -74,18 +87,21 @@ pub fn waku_filter_unsubscribe(
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_topics_topics_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }
pub fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> { pub async fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> {
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -96,5 +112,6 @@ pub fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> {
) )
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }

View File

@ -4,6 +4,8 @@
use std::ffi::CString; use std::ffi::CString;
// crates // crates
use libc::*; use libc::*;
use std::sync::Arc;
use tokio::sync::Notify;
// internal // internal
use crate::general::{messagehash::MessageHash, Result, WakuMessage}; use crate::general::{messagehash::MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext; use crate::node::context::WakuNodeContext;
@ -11,23 +13,30 @@ use crate::utils::{get_trampoline, handle_response, LibwakuResponse};
use crate::general::pubsubtopic::PubsubTopic; use crate::general::pubsubtopic::PubsubTopic;
pub fn waku_lightpush_publish_message( pub async fn waku_lightpush_publish_message(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
let message_ptr = CString::new( let message = CString::new(
serde_json::to_string(&message) serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"), .expect("WakuMessages should always be able to success serializing"),
) )
.expect("CString should build properly from the serialized waku message") .expect("CString should build properly from the serialized waku message");
.into_raw(); let message_ptr = message.as_ptr();
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic") let pubsub_topic = CString::new(String::from(pubsub_topic))
.into_raw(); .expect("CString should build properly from pubsub topic");
let pubsub_topic_ptr = pubsub_topic.as_ptr();
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let mut result: LibwakuResponse = Default::default();
let result_cb = |r: LibwakuResponse| result = r;
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -39,11 +48,9 @@ pub fn waku_lightpush_publish_message(
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(message_ptr));
drop(CString::from_raw(pubsub_topic_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_response(code, result) handle_response(code, result)
} }

View File

@ -5,37 +5,44 @@ use std::ffi::CString;
// crates // crates
use libc::c_void; use libc::c_void;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use std::sync::Arc;
use tokio::sync::Notify;
// internal // internal
use super::config::WakuNodeConfig; use super::config::WakuNodeConfig;
use crate::general::Result; use crate::general::Result;
use crate::node::context::WakuNodeContext; use crate::node::context::WakuNodeContext;
use crate::utils::LibwakuResponse; use crate::utils::LibwakuResponse;
use crate::utils::WakuDecode; use crate::utils::WakuDecode;
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response}; use crate::utils::{get_trampoline, handle_no_response, handle_response};
/// Instantiates a Waku node /// Instantiates a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> { pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
let config = config.unwrap_or_default(); let config = config.unwrap_or_default();
let config_ptr = CString::new( let config = CString::new(
serde_json::to_string(&config) serde_json::to_string(&config)
.expect("Serialization from properly built NodeConfig should never fail"), .expect("Serialization from properly built NodeConfig should never fail"),
) )
.expect("CString should build properly from the config") .expect("CString should build properly from the config");
.into_raw(); let config_ptr = config.as_ptr();
let mut result: LibwakuResponse = Default::default(); let notify = Arc::new(Notify::new());
let result_cb = |r: LibwakuResponse| result = r; let notify_clone = notify.clone();
let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let obj_ptr = unsafe { let obj_ptr = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void); let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void);
drop(CString::from_raw(config_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
match result { match result {
LibwakuResponse::MissingCallback => panic!("callback is required"), LibwakuResponse::MissingCallback => panic!("callback is required"),
LibwakuResponse::Failure(v) => Err(v), LibwakuResponse::Failure(v) => Err(v),
@ -43,57 +50,81 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
} }
} }
pub fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> { pub async fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> {
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_destroy(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_destroy(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub fn waku_start(ctx: &WakuNodeContext) -> Result<()> { pub async fn waku_start(ctx: &WakuNodeContext) -> Result<()> {
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_start(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_start(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }
/// Stops a Waku node /// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn waku_stop(ctx: &WakuNodeContext) -> Result<()> { pub async fn waku_stop(ctx: &WakuNodeContext) -> Result<()> {
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_stop(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_stop(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }
/// nwaku version /// nwaku version
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn waku_version(ctx: &WakuNodeContext) -> Result<String> { pub async fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_version(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_version(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
}; };
notify.notified().await; // Wait until a result is received
handle_response(code, result) handle_response(code, result)
} }
@ -110,45 +141,54 @@ impl WakuDecode for Vec<Multiaddr> {
/// Get the multiaddresses the Waku node is listening to /// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> { pub async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_listen_addresses(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_listen_addresses(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
}; };
handle_json_response(code, result) notify.notified().await; // Wait until a result is received
handle_response(code, result)
} }
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::waku_new; use super::waku_new;
use crate::node::management::{waku_listen_addresses, waku_start, waku_stop, waku_version}; use crate::node::management::{waku_listen_addresses, waku_start, waku_stop, waku_version};
use serial_test::serial;
#[test] #[tokio::test]
#[serial] async fn waku_flow() {
fn waku_flow() { let node = waku_new(None).await.unwrap();
let node = waku_new(None).unwrap();
waku_start(&node).unwrap(); waku_start(&node).await.unwrap();
// test addresses // test addresses
let addresses = waku_listen_addresses(&node).unwrap(); let addresses = waku_listen_addresses(&node).await.unwrap();
dbg!(&addresses); dbg!(&addresses);
assert!(!addresses.is_empty()); assert!(!addresses.is_empty());
waku_stop(&node).unwrap(); waku_stop(&node).await.unwrap();
} }
#[test] #[tokio::test]
#[serial] async fn nwaku_version() {
fn nwaku_version() { let node = waku_new(None).await.unwrap();
let node = waku_new(None).unwrap();
let version = waku_version(&node).expect("should return the version"); let version = waku_version(&node)
.await
.expect("should return the version");
print!("Current version: {}", version); print!("Current version: {}", version);
assert!(!version.is_empty()); assert!(!version.is_empty());
} }
} }

View File

@ -44,34 +44,41 @@ pub struct WakuNodeHandle<State> {
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided) /// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> { pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
Ok(WakuNodeHandle { Ok(WakuNodeHandle {
ctx: management::waku_new(config)?, ctx: management::waku_new(config).await?,
_state: PhantomData, _state: PhantomData,
}) })
} }
impl<State> WakuNodeHandle<State> { impl<State> WakuNodeHandle<State> {
/// Get the nwaku version /// Get the nwaku version
pub fn version(&self) -> Result<String> { pub async fn version(&self) -> Result<String> {
management::waku_version(&self.ctx) management::waku_version(&self.ctx).await
} }
pub fn waku_destroy(self) -> Result<()> { pub async fn waku_destroy(self) -> Result<()> {
let res = management::waku_destroy(&self.ctx); let res = management::waku_destroy(&self.ctx).await;
self.ctx.reset_ptr(); self.ctx.reset_ptr();
res res
} }
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub async fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_subscribe(&self.ctx, pubsub_topic).await
}
} }
impl WakuNodeHandle<Initialized> { impl WakuNodeHandle<Initialized> {
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub fn start(self) -> Result<WakuNodeHandle<Running>> { pub async fn start(self) -> Result<WakuNodeHandle<Running>> {
management::waku_start(&self.ctx).map(|_| WakuNodeHandle { management::waku_start(&self.ctx)
ctx: self.ctx, .await
_state: PhantomData, .map(|_| WakuNodeHandle {
}) ctx: self.ctx,
_state: PhantomData,
})
} }
pub fn set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>( pub fn set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
@ -85,17 +92,19 @@ impl WakuNodeHandle<Initialized> {
impl WakuNodeHandle<Running> { impl WakuNodeHandle<Running> {
/// Stops a Waku node /// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn stop(self) -> Result<WakuNodeHandle<Initialized>> { pub async fn stop(self) -> Result<WakuNodeHandle<Initialized>> {
management::waku_stop(&self.ctx).map(|_| WakuNodeHandle { management::waku_stop(&self.ctx)
ctx: self.ctx, .await
_state: PhantomData, .map(|_| WakuNodeHandle {
}) ctx: self.ctx,
_state: PhantomData,
})
} }
/// Get the multiaddresses the Waku node is listening to /// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> { pub async fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
management::waku_listen_addresses(&self.ctx) management::waku_listen_addresses(&self.ctx).await
} }
/// Dial peer using a multiaddress /// Dial peer using a multiaddress
@ -103,11 +112,11 @@ impl WakuNodeHandle<Running> {
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout /// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub fn connect(&self, address: &Multiaddr, timeout: Option<Duration>) -> Result<()> { pub async fn connect(&self, address: &Multiaddr, timeout: Option<Duration>) -> Result<()> {
peers::waku_connect(&self.ctx, address, timeout) peers::waku_connect(&self.ctx, address, timeout).await
} }
pub fn relay_publish_txt( pub async fn relay_publish_txt(
&self, &self,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
msg_txt: &String, msg_txt: &String,
@ -129,60 +138,55 @@ impl WakuNodeHandle<Running> {
false, false,
); );
relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout) relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout).await
} }
/// Publish a message using Waku Relay. /// Publish a message using Waku Relay.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic. /// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub fn relay_publish_message( pub async fn relay_publish_message(
&self, &self,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout) relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout).await
}
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_subscribe(&self.ctx, pubsub_topic)
} }
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic /// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> { pub async fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic) relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic).await
} }
pub fn filter_subscribe( pub async fn filter_subscribe(
&self, &self,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, content_topics: Vec<WakuContentTopic>,
) -> Result<()> { ) -> Result<()> {
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics) filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics).await
} }
pub fn filter_unsubscribe( pub async fn filter_unsubscribe(
&self, &self,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, content_topics: Vec<WakuContentTopic>,
) -> Result<()> { ) -> Result<()> {
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics) filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics).await
} }
pub fn filter_unsubscribe_all(&self) -> Result<()> { pub async fn filter_unsubscribe_all(&self) -> Result<()> {
filter::waku_filter_unsubscribe_all(&self.ctx) filter::waku_filter_unsubscribe_all(&self.ctx).await
} }
pub fn lightpush_publish_message( pub async fn lightpush_publish_message(
&self, &self,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic) lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic).await
} }
pub fn store_query( pub async fn store_query(
&self, &self,
pubsub_topic: Option<PubsubTopic>, pubsub_topic: Option<PubsubTopic>,
content_topics: Vec<WakuContentTopic>, content_topics: Vec<WakuContentTopic>,
@ -212,7 +216,8 @@ impl WakuNodeHandle<Running> {
Some(25), // pagination_limit, Some(25), // pagination_limit,
peer_addr, peer_addr,
None, // timeout_millis None, // timeout_millis
)?; )
.await?;
messages.extend(response.messages); messages.extend(response.messages);

View File

@ -6,6 +6,8 @@ use std::time::Duration;
// crates // crates
use libc::*; use libc::*;
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use std::sync::Arc;
use tokio::sync::Notify;
// internal // internal
use crate::general::Result; use crate::general::Result;
use crate::node::context::WakuNodeContext; use crate::node::context::WakuNodeContext;
@ -17,17 +19,23 @@ use crate::utils::{get_trampoline, handle_no_response};
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout /// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub fn waku_connect( pub async fn waku_connect(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
address: &Multiaddr, address: &Multiaddr,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<()> { ) -> Result<()> {
let address_ptr = CString::new(address.to_string()) let address =
.expect("CString should build properly from multiaddress") CString::new(address.to_string()).expect("CString should build properly from multiaddress");
.into_raw();
let mut result: LibwakuResponse = Default::default(); let address_ptr = address.as_ptr();
let result_cb = |r: LibwakuResponse| result = r;
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -41,10 +49,9 @@ pub fn waku_connect(
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(address_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }

View File

@ -2,7 +2,9 @@
// std // std
use std::ffi::CString; use std::ffi::CString;
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::sync::Notify;
// crates // crates
use libc::*; use libc::*;
// internal // internal
@ -15,25 +17,32 @@ use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuR
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding)
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub fn waku_create_content_topic( pub async fn waku_create_content_topic(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
application_name: &str, application_name: &str,
application_version: u32, application_version: u32,
content_topic_name: &str, content_topic_name: &str,
encoding: Encoding, encoding: Encoding,
) -> WakuContentTopic { ) -> WakuContentTopic {
let application_name_ptr = CString::new(application_name) let application_name = CString::new(application_name)
.expect("Application name should always transform to CString") .expect("Application name should always transform to CString");
.into_raw(); let application_name_ptr = application_name.as_ptr();
let content_topic_name_ptr = CString::new(content_topic_name)
.expect("Content topic should always transform to CString")
.into_raw();
let encoding_ptr = CString::new(encoding.to_string())
.expect("Encoding should always transform to CString")
.into_raw();
let mut result: LibwakuResponse = Default::default(); let content_topic_name =
let result_cb = |r: LibwakuResponse| result = r; CString::new(content_topic_name).expect("Content topic should always transform to CString");
let content_topic_name_ptr = content_topic_name.as_ptr();
let encoding =
CString::new(encoding.to_string()).expect("Encoding should always transform to CString");
let encoding_ptr = encoding.as_ptr();
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -47,43 +56,44 @@ pub fn waku_create_content_topic(
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(application_name_ptr));
drop(CString::from_raw(content_topic_name_ptr));
drop(CString::from_raw(encoding_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_response(code, result).expect("&str from result should always be extracted") handle_response(code, result).expect("&str from result should always be extracted")
} }
/// Publish a message using Waku Relay /// Publish a message using Waku Relay
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
pub fn waku_relay_publish_message( pub async fn waku_relay_publish_message(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: &PubsubTopic,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
let message_ptr = CString::new( let message = CString::new(
serde_json::to_string(&message) serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"), .expect("WakuMessages should always be able to success serializing"),
) )
.expect("CString should build properly from the serialized waku message") .expect("CString should build properly from the serialized waku message");
.into_raw();
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let mut result: LibwakuResponse = Default::default(); let pubsub_topic = CString::new(String::from(pubsub_topic))
let result_cb = |r: LibwakuResponse| result = r; .expect("CString should build properly from pubsub topic");
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_publish( let out = waku_sys::waku_relay_publish(
ctx.get_ptr(), ctx.get_ptr(),
pubsub_topic_ptr, pubsub_topic.as_ptr(),
message_ptr, message.as_ptr(),
timeout timeout
.map(|duration| { .map(|duration| {
duration duration
@ -96,22 +106,25 @@ pub fn waku_relay_publish_message(
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(message_ptr));
drop(CString::from_raw(pubsub_topic_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_response(code, result) handle_response(code, result)
} }
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> { pub async fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) let pubsub_topic = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic") .expect("CString should build properly from pubsub topic");
.into_raw(); let pubsub_topic_ptr = pubsub_topic.as_ptr();
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -122,21 +135,28 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(pubsub_topic_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> { pub async fn waku_relay_unsubscribe(
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic)) ctx: &WakuNodeContext,
.expect("CString should build properly from pubsub topic") pubsub_topic: &PubsubTopic,
.into_raw(); ) -> Result<()> {
let pubsub_topic = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic");
let pubsub_topic_ptr = pubsub_topic.as_ptr();
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
@ -147,10 +167,9 @@ pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic)
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(pubsub_topic_ptr));
out out
}; };
notify.notified().await; // Wait until a result is received
handle_no_response(code, result) handle_no_response(code, result)
} }

View File

@ -4,13 +4,15 @@
use std::ffi::CString; use std::ffi::CString;
// crates // crates
use libc::*; use libc::*;
use std::sync::Arc;
use tokio::sync::Notify;
// internal // internal
use crate::general::{ use crate::general::{
contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result, contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result,
WakuStoreRespMessage, WakuStoreRespMessage,
}; };
use crate::node::context::WakuNodeContext; use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_json_response, LibwakuResponse, WakuDecode}; use crate::utils::{get_trampoline, handle_response, LibwakuResponse, WakuDecode};
use multiaddr::Multiaddr; use multiaddr::Multiaddr;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -76,7 +78,7 @@ impl WakuDecode for StoreResponse {
} }
} }
pub fn waku_store_query( pub async fn waku_store_query(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
request_id: String, request_id: String,
include_data: bool, include_data: bool,
@ -107,37 +109,39 @@ pub fn waku_store_query(
let json_query = CString::new( let json_query = CString::new(
serde_json::to_string(&query).expect("StoreQuery should always be able to be serialized"), serde_json::to_string(&query).expect("StoreQuery should always be able to be serialized"),
) )
.expect("CString should build properly from the serialized filter subscription") .expect("CString should build properly from the serialized filter subscription");
.into_raw(); let json_query_ptr = json_query.as_ptr();
peer_addr peer_addr
.parse::<Multiaddr>() .parse::<Multiaddr>()
.expect("correct multiaddress in store query"); .expect("correct multiaddress in store query");
let peer_addr = CString::new(peer_addr) let peer_addr = CString::new(peer_addr).expect("peer_addr CString should be created");
.expect("peer_addr CString should be created") let peer_addr_ptr = peer_addr.as_ptr();
.into_raw();
let timeout_millis = timeout_millis.unwrap_or(10000i32); let timeout_millis = timeout_millis.unwrap_or(10000i32);
let mut result: LibwakuResponse = Default::default(); let mut result = LibwakuResponse::default();
let result_cb = |r: LibwakuResponse| result = r; let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_store_query( let out = waku_sys::waku_store_query(
ctx.get_ptr(), ctx.get_ptr(),
json_query, json_query_ptr,
peer_addr, peer_addr_ptr,
timeout_millis, timeout_millis,
cb, cb,
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(json_query));
drop(CString::from_raw(peer_addr));
out out
}; };
handle_json_response(code, result) notify.notified().await; // Wait until a result is received
handle_response(code, result)
} }

View File

@ -1,11 +1,10 @@
use crate::general::Result; use crate::general::Result;
use core::str::FromStr;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::{slice, str}; use std::{slice, str};
use waku_sys::WakuCallBack; use waku_sys::WakuCallBack;
use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK}; use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK};
#[derive(Debug, Default, PartialEq)] #[derive(Debug, Clone, Default, PartialEq)]
pub enum LibwakuResponse { pub enum LibwakuResponse {
Success(Option<String>), Success(Option<String>),
Failure(String), Failure(String),
@ -36,6 +35,12 @@ pub trait WakuDecode: Sized {
fn decode(input: &str) -> Result<Self>; fn decode(input: &str) -> Result<Self>;
} }
impl WakuDecode for String {
fn decode(input: &str) -> Result<Self> {
Ok(input.to_string())
}
}
pub fn decode<T: WakuDecode>(input: String) -> Result<T> { pub fn decode<T: WakuDecode>(input: String) -> Result<T> {
T::decode(input.as_str()) T::decode(input.as_str())
} }
@ -87,7 +92,7 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
} }
} }
pub fn handle_json_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> { pub fn handle_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> {
match result { match result {
LibwakuResponse::Success(v) => decode(v.unwrap_or_default()), LibwakuResponse::Success(v) => decode(v.unwrap_or_default()),
LibwakuResponse::Failure(v) => Err(v), LibwakuResponse::Failure(v) => Err(v),
@ -98,21 +103,3 @@ pub fn handle_json_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -
), ),
} }
} }
pub fn handle_response<F: FromStr>(code: i32, result: LibwakuResponse) -> Result<F>
where
<F as FromStr>::Err: std::fmt::Debug,
{
match result {
LibwakuResponse::Success(v) => v
.unwrap_or_default()
.parse()
.map_err(|e| format!("could not parse value: {:?}", e)),
LibwakuResponse::Failure(v) => Err(v),
LibwakuResponse::MissingCallback => panic!("callback is required"),
LibwakuResponse::Undefined => panic!(
"undefined ffi state: code({}) was returned but callback was not executed",
code
),
}
}

View File

@ -18,15 +18,13 @@ const ECHO_TIMEOUT: u64 = 1000;
const ECHO_MESSAGE: &str = "Hi from 🦀!"; const ECHO_MESSAGE: &str = "Hi from 🦀!";
const TEST_PUBSUBTOPIC: &str = "test"; const TEST_PUBSUBTOPIC: &str = "test";
fn try_publish_relay_messages( async fn try_publish_relay_messages(
node: &WakuNodeHandle<Running>, node: &WakuNodeHandle<Running>,
msg: &WakuMessage, msg: &WakuMessage,
) -> Result<HashSet<MessageHash>, String> { ) -> Result<HashSet<MessageHash>, String> {
Ok(HashSet::from([node.relay_publish_message( Ok(HashSet::from([node
msg, .relay_publish_message(msg, &PubsubTopic::new(TEST_PUBSUBTOPIC), None)
&PubsubTopic::new(TEST_PUBSUBTOPIC), .await?]))
None,
)?]))
} }
async fn test_echo_messages( async fn test_echo_messages(
@ -66,21 +64,23 @@ async fn test_echo_messages(
.set_event_callback(closure) .set_event_callback(closure)
.expect("set event call back working"); // Set the event callback with the closure .expect("set event call back working"); // Set the event callback with the closure
let node1 = node1.start()?; let node1 = node1.start().await?;
let node2 = node2.start()?; let node2 = node2.start().await?;
node1 node1
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC)) .relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.await
.unwrap(); .unwrap();
node2 node2
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC)) .relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.await
.unwrap(); .unwrap();
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
// Interconnect nodes // Interconnect nodes
// Replace all matches with 127.0.0.1 to avoid issue with NAT or firewall. // Replace all matches with 127.0.0.1 to avoid issue with NAT or firewall.
let addresses1 = node1.listen_addresses().unwrap(); let addresses1 = node1.listen_addresses().await.unwrap();
let addresses1 = &addresses1[0].to_string(); let addresses1 = &addresses1[0].to_string();
let re = Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").unwrap(); let re = Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").unwrap();
@ -89,7 +89,7 @@ async fn test_echo_messages(
let addresses1 = addresses1.parse::<Multiaddr>().expect("parse multiaddress"); let addresses1 = addresses1.parse::<Multiaddr>().expect("parse multiaddress");
println!("Connecting node1 to node2: {}", addresses1); println!("Connecting node1 to node2: {}", addresses1);
node2.connect(&addresses1, None).unwrap(); node2.connect(&addresses1, None).await.unwrap();
// Wait for mesh to form // Wait for mesh to form
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
@ -108,7 +108,9 @@ async fn test_echo_messages(
Vec::new(), Vec::new(),
false, false,
); );
let _ids = try_publish_relay_messages(&node1, &message).expect("send relay messages"); let _ids = try_publish_relay_messages(&node1, &message)
.await
.expect("send relay messages");
// Wait for the msg to arrive // Wait for the msg to arrive
let rx_waku_message_cloned = rx_waku_message.clone(); let rx_waku_message_cloned = rx_waku_message.clone();
@ -118,8 +120,8 @@ async fn test_echo_messages(
let payload = msg.payload.to_vec(); let payload = msg.payload.to_vec();
let payload_str = from_utf8(&payload).expect("should be valid message"); let payload_str = from_utf8(&payload).expect("should be valid message");
if payload_str == ECHO_MESSAGE { if payload_str == ECHO_MESSAGE {
node1.stop()?; node1.stop().await?;
node2.stop()?; node2.stop().await?;
return Ok(()); return Ok(());
} }
} else { } else {
@ -127,11 +129,11 @@ async fn test_echo_messages(
} }
} }
let node1 = node1.stop()?; let node1 = node1.stop().await?;
let node2 = node2.stop()?; let node2 = node2.stop().await?;
node1.waku_destroy()?; node1.waku_destroy().await?;
node2.waku_destroy()?; node2.waku_destroy().await?;
return Err("Unexpected test ending".to_string()); return Err("Unexpected test ending".to_string());
} }
@ -143,11 +145,13 @@ async fn default_echo() -> Result<(), String> {
let node1 = waku_new(Some(WakuNodeConfig { let node1 = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60010), tcp_port: Some(60010),
..Default::default() ..Default::default()
}))?; }))
.await?;
let node2 = waku_new(Some(WakuNodeConfig { let node2 = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60020), tcp_port: Some(60020),
..Default::default() ..Default::default()
}))?; }))
.await?;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
@ -165,9 +169,8 @@ async fn default_echo() -> Result<(), String> {
Ok(()) Ok(())
} }
#[test] #[tokio::test]
#[serial] async fn node_restart() {
fn node_restart() {
let config = WakuNodeConfig { let config = WakuNodeConfig {
node_key: Some( node_key: Some(
SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609") SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609")
@ -177,9 +180,14 @@ fn node_restart() {
}; };
for _ in 0..3 { for _ in 0..3 {
let node = waku_new(config.clone().into()).expect("default config should be valid"); let node = waku_new(config.clone().into())
let node = node.start().expect("node should start with valid config"); .await
let node = node.stop().expect("node should stop"); .expect("default config should be valid");
node.waku_destroy().expect("free resources"); let node = node
.start()
.await
.expect("node should start with valid config");
let node = node.stop().await.expect("node should stop");
node.waku_destroy().await.expect("free resources");
} }
} }

@ -1 +1 @@
Subproject commit 1fa916589d3a69a2bb770aba27d4124b929cc4b2 Subproject commit 1d206a5f5e89f7bac855bfd2e25066bcf1187ade