mirror of
https://github.com/logos-messaging/logos-messaging-rust-bindings.git
synced 2026-01-02 14:03:12 +00:00
Recover toy chat app powered by libwaku/nwaku (#105)
* bump nwaku to the current master branch ( commit: 625c8ee5 ) * make the waku crate to behave tokio-asynchronously * use of store * use of lightpush and filter * add waku-bindings/src/general/messagehash.rs * add waku-bindings/src/general/time.rs * add waku-bindings/src/general/waku_decode.rs * add WakuEvent management (WakuMessage, ConnectionChange, TopicHealthChange.) * add waku-bindings/src/macros.rs
This commit is contained in:
parent
fd7e73a7f0
commit
0c0b834aa0
3
.gitignore
vendored
3
.gitignore
vendored
@ -1,4 +1,5 @@
|
||||
**target
|
||||
/Cargo.lock
|
||||
/.idea
|
||||
/.fleet
|
||||
/.fleet
|
||||
nimcache/
|
||||
|
||||
29
Cargo.lock
generated
29
Cargo.lock
generated
@ -530,8 +530,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1"
|
||||
dependencies = [
|
||||
"iana-time-zone",
|
||||
"js-sys",
|
||||
"num-integer",
|
||||
"num-traits",
|
||||
"time",
|
||||
"wasm-bindgen",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
@ -1292,7 +1295,7 @@ checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"wasi",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -1695,7 +1698,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"wasi",
|
||||
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||
"windows-sys 0.48.0",
|
||||
]
|
||||
|
||||
@ -2812,6 +2815,17 @@ dependencies = [
|
||||
"once_cell",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "time"
|
||||
version = "0.1.45"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"wasi 0.10.0+wasi-snapshot-preview1",
|
||||
"winapi",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tiny-keccak"
|
||||
version = "2.0.2"
|
||||
@ -3034,6 +3048,9 @@ name = "uuid"
|
||||
version = "1.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f00cc9702ca12d3c81455259621e676d0f7251cec66a21e98fe2e9a37db93b2a"
|
||||
dependencies = [
|
||||
"getrandom",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "valuable"
|
||||
@ -3053,6 +3070,7 @@ version = "0.5.0"
|
||||
dependencies = [
|
||||
"aes-gcm",
|
||||
"base64 0.21.0",
|
||||
"chrono",
|
||||
"enr",
|
||||
"futures",
|
||||
"hex",
|
||||
@ -3071,6 +3089,7 @@ dependencies = [
|
||||
"sscanf",
|
||||
"tokio",
|
||||
"url",
|
||||
"uuid",
|
||||
"waku-sys",
|
||||
]
|
||||
|
||||
@ -3092,6 +3111,12 @@ dependencies = [
|
||||
"winapi-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.10.0+wasi-snapshot-preview1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f"
|
||||
|
||||
[[package]]
|
||||
name = "wasi"
|
||||
version = "0.11.0+wasi-snapshot-preview1"
|
||||
|
||||
1226
examples/Cargo.lock
generated
1226
examples/Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@ -2,5 +2,6 @@
|
||||
|
||||
members = [
|
||||
"basic",
|
||||
"tic-tac-toe-gui"
|
||||
"tic-tac-toe-gui",
|
||||
"toy-chat"
|
||||
]
|
||||
|
||||
@ -1,10 +1,9 @@
|
||||
use std::io::Error;
|
||||
use std::str::from_utf8;
|
||||
use std::time::SystemTime;
|
||||
use tokio::time::{sleep, Duration};
|
||||
use waku::{
|
||||
general::pubsubtopic::PubsubTopic, waku_new, Encoding, Event, LibwakuResponse,
|
||||
WakuContentTopic, WakuMessage, WakuNodeConfig,
|
||||
general::pubsubtopic::PubsubTopic, waku_new, Encoding, LibwakuResponse, WakuContentTopic,
|
||||
WakuEvent, WakuMessage, WakuNodeConfig,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
@ -13,12 +12,14 @@ async fn main() -> Result<(), Error> {
|
||||
tcp_port: Some(60010), // TODO: use any available port.
|
||||
..Default::default()
|
||||
}))
|
||||
.await
|
||||
.expect("should instantiate");
|
||||
|
||||
let node2 = waku_new(Some(WakuNodeConfig {
|
||||
tcp_port: Some(60020), // TODO: use any available port.
|
||||
..Default::default()
|
||||
}))
|
||||
.await
|
||||
.expect("should instantiate");
|
||||
|
||||
// ========================================================================
|
||||
@ -26,12 +27,12 @@ async fn main() -> Result<(), Error> {
|
||||
node2
|
||||
.set_event_callback(|response| {
|
||||
if let LibwakuResponse::Success(v) = response {
|
||||
let event: Event =
|
||||
let event: WakuEvent =
|
||||
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||
|
||||
match event {
|
||||
Event::WakuMessage(evt) => {
|
||||
println!("WakuMessage event received: {:?}", evt.waku_message);
|
||||
WakuEvent::WakuMessage(evt) => {
|
||||
// println!("WakuMessage event received: {:?}", evt.waku_message);
|
||||
let message = evt.waku_message;
|
||||
let payload = message.payload.to_vec();
|
||||
let msg = from_utf8(&payload).expect("should be valid message");
|
||||
@ -39,7 +40,13 @@ async fn main() -> Result<(), Error> {
|
||||
println!("Message Received in NODE 2: {}", msg);
|
||||
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
||||
}
|
||||
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
WakuEvent::RelayTopicHealthChange(_evt) => {
|
||||
// dbg!("Relay topic change evt", evt);
|
||||
}
|
||||
WakuEvent::ConnectionChange(_evt) => {
|
||||
// dbg!("Conn change evt", evt);
|
||||
}
|
||||
WakuEvent::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
_ => panic!("event case not expected"),
|
||||
};
|
||||
}
|
||||
@ -49,12 +56,12 @@ async fn main() -> Result<(), Error> {
|
||||
node1
|
||||
.set_event_callback(|response| {
|
||||
if let LibwakuResponse::Success(v) = response {
|
||||
let event: Event =
|
||||
let event: WakuEvent =
|
||||
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||
|
||||
match event {
|
||||
Event::WakuMessage(evt) => {
|
||||
println!("WakuMessage event received: {:?}", evt.waku_message);
|
||||
WakuEvent::WakuMessage(evt) => {
|
||||
// println!("WakuMessage event received: {:?}", evt.waku_message);
|
||||
let message = evt.waku_message;
|
||||
let payload = message.payload.to_vec();
|
||||
let msg = from_utf8(&payload).expect("should be valid message");
|
||||
@ -62,15 +69,21 @@ async fn main() -> Result<(), Error> {
|
||||
println!("Message Received in NODE 1: {}", msg);
|
||||
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
|
||||
}
|
||||
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
WakuEvent::RelayTopicHealthChange(_evt) => {
|
||||
// dbg!("Relay topic change evt", evt);
|
||||
}
|
||||
WakuEvent::ConnectionChange(_evt) => {
|
||||
// dbg!("Conn change evt", evt);
|
||||
}
|
||||
WakuEvent::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
_ => panic!("event case not expected"),
|
||||
};
|
||||
}
|
||||
})
|
||||
.expect("set event call back working");
|
||||
|
||||
let node1 = node1.start().expect("node1 should start");
|
||||
let node2 = node2.start().expect("node2 should start");
|
||||
let node1 = node1.start().await.expect("node1 should start");
|
||||
let node2 = node2.start().await.expect("node2 should start");
|
||||
|
||||
// ========================================================================
|
||||
// Subscribe to pubsub topic
|
||||
@ -78,10 +91,12 @@ async fn main() -> Result<(), Error> {
|
||||
|
||||
node1
|
||||
.relay_subscribe(&topic)
|
||||
.await
|
||||
.expect("node1 should subscribe");
|
||||
|
||||
node2
|
||||
.relay_subscribe(&topic)
|
||||
.await
|
||||
.expect("node2 should subscribe");
|
||||
|
||||
// ========================================================================
|
||||
@ -89,10 +104,12 @@ async fn main() -> Result<(), Error> {
|
||||
|
||||
let addresses2 = node2
|
||||
.listen_addresses()
|
||||
.await
|
||||
.expect("should obtain the addresses");
|
||||
|
||||
node1
|
||||
.connect(&addresses2[0], None)
|
||||
.await
|
||||
.expect("node1 should connect to node2");
|
||||
|
||||
// ========================================================================
|
||||
@ -104,21 +121,10 @@ async fn main() -> Result<(), Error> {
|
||||
// Publish a message
|
||||
|
||||
let content_topic = WakuContentTopic::new("waku", "2", "test", Encoding::Proto);
|
||||
let message = WakuMessage::new(
|
||||
"Hello world",
|
||||
content_topic,
|
||||
0,
|
||||
SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
Vec::new(),
|
||||
false,
|
||||
);
|
||||
let message = WakuMessage::new("Hello world", content_topic, 0, Vec::new(), false);
|
||||
node1
|
||||
.relay_publish_message(&message, &topic, None)
|
||||
.await
|
||||
.expect("should have sent the message");
|
||||
|
||||
// ========================================================================
|
||||
@ -129,13 +135,13 @@ async fn main() -> Result<(), Error> {
|
||||
// ========================================================================
|
||||
// Stop both instances
|
||||
|
||||
let node1 = node1.stop().expect("should stop");
|
||||
let node2 = node2.stop().expect("should stop");
|
||||
let node1 = node1.stop().await.expect("should stop");
|
||||
let node2 = node2.stop().await.expect("should stop");
|
||||
|
||||
// ========================================================================
|
||||
// Free resources
|
||||
node1.waku_destroy().expect("should deallocate");
|
||||
node2.waku_destroy().expect("should deallocate");
|
||||
node1.waku_destroy().await.expect("should deallocate");
|
||||
node2.waku_destroy().await.expect("should deallocate");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -2,11 +2,12 @@ use eframe::egui;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::str::from_utf8;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{SystemTime, Duration};
|
||||
use std::time::Duration;
|
||||
use tokio::task;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use waku::{
|
||||
waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic,
|
||||
waku_new, Encoding, WakuEvent, LibwakuResponse, WakuContentTopic,
|
||||
WakuMessage, WakuNodeConfig, WakuNodeHandle, Initialized, Running,
|
||||
general::pubsubtopic::PubsubTopic,
|
||||
};
|
||||
@ -48,16 +49,16 @@ impl TicTacToeApp<Initialized> {
|
||||
}
|
||||
}
|
||||
|
||||
fn start(self) -> TicTacToeApp<Running> {
|
||||
async fn start(self) -> TicTacToeApp<Running> {
|
||||
let tx_clone = self.tx.clone();
|
||||
|
||||
let my_closure = move |response| {
|
||||
if let LibwakuResponse::Success(v) = response {
|
||||
let event: Event =
|
||||
let event: WakuEvent =
|
||||
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||
|
||||
match event {
|
||||
Event::WakuMessage(evt) => {
|
||||
WakuEvent::WakuMessage(evt) => {
|
||||
// println!("WakuMessage event received: {:?}", evt.waku_message);
|
||||
let message = evt.waku_message;
|
||||
let payload = message.payload.to_vec();
|
||||
@ -73,8 +74,14 @@ impl TicTacToeApp<Initialized> {
|
||||
// Handle the error as needed, or just log and skip
|
||||
}
|
||||
}
|
||||
}
|
||||
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
},
|
||||
WakuEvent::RelayTopicHealthChange(_evt) => {
|
||||
// dbg!("Relay topic change evt", evt);
|
||||
},
|
||||
WakuEvent::ConnectionChange(_evt) => {
|
||||
// dbg!("Conn change evt", evt);
|
||||
},
|
||||
WakuEvent::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
_ => panic!("event case not expected"),
|
||||
};
|
||||
}
|
||||
@ -83,17 +90,22 @@ impl TicTacToeApp<Initialized> {
|
||||
// Establish a closure that handles the incoming messages
|
||||
self.waku.set_event_callback(my_closure).expect("set event call back working");
|
||||
|
||||
let _ = self.waku.version();
|
||||
|
||||
// Start the waku node
|
||||
let waku = self.waku.start().expect("waku should start");
|
||||
let waku = self.waku.start().await.expect("waku should start");
|
||||
|
||||
// Subscribe to desired topic using the relay protocol
|
||||
// self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe");
|
||||
waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe");
|
||||
|
||||
let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
|
||||
let content_topics = vec![ctopic];
|
||||
waku.filter_subscribe(&self.game_topic, content_topics).expect("waku should subscribe");
|
||||
// Example filter subscription. This is needed in edge nodes (resource-restricted devices)
|
||||
// Nodes usually use either relay or lightpush/filter protocols
|
||||
|
||||
// let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
|
||||
// let content_topics = vec![ctopic];
|
||||
// waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe");
|
||||
|
||||
// End filter example ----------------------------------------
|
||||
|
||||
// Example to establish direct connection to a well-known node
|
||||
|
||||
// Connect to hard-coded node
|
||||
// let target_node_multi_addr =
|
||||
@ -105,9 +117,11 @@ impl TicTacToeApp<Initialized> {
|
||||
// self.waku.connect(&target_node_multi_addr, None)
|
||||
// .expect("waku should connect to other node");
|
||||
|
||||
// End example direct connection
|
||||
|
||||
TicTacToeApp {
|
||||
game_state: self.game_state,
|
||||
waku: waku,
|
||||
waku,
|
||||
game_topic: self.game_topic,
|
||||
tx: self.tx,
|
||||
player_role: self.player_role,
|
||||
@ -116,7 +130,7 @@ impl TicTacToeApp<Initialized> {
|
||||
}
|
||||
|
||||
impl TicTacToeApp<Running> {
|
||||
fn send_game_state(&self, game_state: &GameState) {
|
||||
async fn send_game_state(&self, game_state: &GameState) {
|
||||
let serialized_game_state = serde_json::to_string(game_state).unwrap();
|
||||
let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
|
||||
|
||||
@ -124,44 +138,58 @@ impl TicTacToeApp<Running> {
|
||||
&serialized_game_state,
|
||||
content_topic,
|
||||
0,
|
||||
SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
Vec::new(),
|
||||
false,
|
||||
);
|
||||
|
||||
// self.waku.relay_publish_message(&message, &self.game_topic.to_string(), None)
|
||||
// .expect("Failed to send message");
|
||||
self.waku.lightpush_publish_message(&message, &self.game_topic).expect("Failed to send message");
|
||||
if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await {
|
||||
dbg!(format!("message hash published: {}", msg_hash));
|
||||
}
|
||||
|
||||
// Example lightpush publish message. This is needed in edge nodes (resource-restricted devices)
|
||||
// Nodes usually use either relay or lightpush/filter protocols
|
||||
//
|
||||
// let msg_hash_ret = self.waku.lightpush_publish_message(&message, &self.game_topic).await;
|
||||
// match msg_hash_ret {
|
||||
// Ok(msg_hash) => println!("Published message hash {:?}", msg_hash.to_string()),
|
||||
// Err(error) => println!("Failed to publish with lightpush: {}", error)
|
||||
// }
|
||||
// End example lightpush publish message
|
||||
}
|
||||
|
||||
fn make_move(&mut self, row: usize, col: usize) {
|
||||
if let Ok(mut game_state) = self.game_state.try_lock() {
|
||||
|
||||
if let Some(my_role) = self.player_role {
|
||||
if (*game_state).current_turn != my_role {
|
||||
if game_state.current_turn != my_role {
|
||||
return; // skip click if not my turn
|
||||
}
|
||||
}
|
||||
|
||||
if (*game_state).board[row][col].is_none() && (*game_state).moves_left > 0 {
|
||||
(*game_state).board[row][col] = Some((*game_state).current_turn);
|
||||
(*game_state).moves_left -= 1;
|
||||
if game_state.board[row][col].is_none() && game_state.moves_left > 0 {
|
||||
game_state.board[row][col] = Some(game_state.current_turn);
|
||||
game_state.moves_left -= 1;
|
||||
|
||||
if let Some(winner) = self.check_winner(&game_state) {
|
||||
(*game_state).current_turn = winner;
|
||||
game_state.current_turn = winner;
|
||||
} else {
|
||||
(*game_state).current_turn = match (*game_state).current_turn {
|
||||
game_state.current_turn = match game_state.current_turn {
|
||||
Player::X => Player::O,
|
||||
Player::O => Player::X,
|
||||
};
|
||||
}
|
||||
|
||||
self.send_game_state(&game_state); // Send updated state after a move
|
||||
// Call the async function in a blocking context
|
||||
task::block_in_place(|| {
|
||||
// Obtain the current runtime handle
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
// Block on the async function
|
||||
handle.block_on(async {
|
||||
// Assuming `self` is available in the current context
|
||||
self.send_game_state(&game_state).await;
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -227,7 +255,7 @@ impl eframe::App for TicTacToeApp<Running> {
|
||||
if ui.button("Play as O").clicked() {
|
||||
self.player_role = Some(Player::O);
|
||||
if let Ok(mut game_state) = self.game_state.try_lock() {
|
||||
(*game_state).current_turn = Player::X; // player X should start
|
||||
game_state.current_turn = Player::X; // player X should start
|
||||
}
|
||||
}
|
||||
|
||||
@ -301,6 +329,10 @@ impl eframe::App for TicTacToeApp<Running> {
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn on_exit(&mut self, _gl: Option<&eframe::glow::Context>) {
|
||||
// TODO: implement the cleanup an proper stop of waku node
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
@ -316,7 +348,7 @@ async fn main() -> eframe::Result<()> {
|
||||
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),
|
||||
max_message_size: Some("1024KiB".to_string()),
|
||||
relay_topics: vec![String::from(&game_topic)],
|
||||
log_level: Some("DEBUG"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
|
||||
log_level: Some("FATAL"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
|
||||
|
||||
keep_alive: Some(true),
|
||||
|
||||
@ -328,7 +360,7 @@ async fn main() -> eframe::Result<()> {
|
||||
// discv5_enr_auto_update: Some(false),
|
||||
|
||||
..Default::default()
|
||||
}))
|
||||
})).await
|
||||
.expect("should instantiate");
|
||||
|
||||
let game_state = GameState {
|
||||
@ -341,7 +373,7 @@ async fn main() -> eframe::Result<()> {
|
||||
let clone = shared_state.clone();
|
||||
let app = TicTacToeApp::new(waku, game_topic, clone, tx);
|
||||
|
||||
let app = app.start();
|
||||
let app = app.start().await;
|
||||
|
||||
let clone = shared_state.clone();
|
||||
// Listen for messages in the main thread
|
||||
|
||||
19
examples/toy-chat/Cargo.toml
Normal file
19
examples/toy-chat/Cargo.toml
Normal file
@ -0,0 +1,19 @@
|
||||
[package]
|
||||
name = "toy-chat"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
authors = [
|
||||
"Daniel Sanchez Quiros <danielsq@status.im>"
|
||||
]
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
waku = { path = "../../waku-bindings", package = "waku-bindings" }
|
||||
serde_json = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
tui = "0.19"
|
||||
crossterm = "0.25"
|
||||
unicode-width = "0.1"
|
||||
prost = "0.11"
|
||||
chrono = "0.4"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
361
examples/toy-chat/src/main.rs
Normal file
361
examples/toy-chat/src/main.rs
Normal file
@ -0,0 +1,361 @@
|
||||
mod protocol;
|
||||
|
||||
use crate::protocol::{Chat2Message, TOY_CHAT_CONTENT_TOPIC};
|
||||
use tokio::task;
|
||||
use crossterm::{
|
||||
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
|
||||
execute,
|
||||
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
|
||||
};
|
||||
use prost::Message;
|
||||
use chrono::Utc;
|
||||
use std::io::Write;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::{error::Error, io};
|
||||
use std::time::Duration;
|
||||
use tui::{
|
||||
backend::{Backend, CrosstermBackend},
|
||||
layout::{Constraint, Direction, Layout},
|
||||
style::{Color, Modifier, Style},
|
||||
text::{Span, Spans, Text},
|
||||
widgets::{Block, Borders, List, ListItem, Paragraph},
|
||||
Frame, Terminal,
|
||||
};
|
||||
use unicode_width::UnicodeWidthStr;
|
||||
use waku::{
|
||||
general::pubsubtopic::PubsubTopic, general::Result, waku_new, Initialized, LibwakuResponse, Running, WakuEvent,
|
||||
WakuMessage, WakuNodeConfig, WakuNodeHandle,
|
||||
};
|
||||
|
||||
enum InputMode {
|
||||
Normal,
|
||||
Editing,
|
||||
}
|
||||
|
||||
const STORE_NODE: &str = "/dns4/store-01.do-ams3.status.staging.status.im/tcp/30303/p2p/16Uiu2HAm3xVDaz6SRJ6kErwC21zBJEZjavVXg7VSkoWzaV1aMA3F";
|
||||
|
||||
const DEFAULT_PUBSUB_TOPIC: &str = "/waku/2/rs/16/32";
|
||||
|
||||
/// App holds the state of the application
|
||||
struct App<State> {
|
||||
/// Current value of the input box
|
||||
input: String,
|
||||
nick: String,
|
||||
/// Current input mode
|
||||
input_mode: InputMode,
|
||||
/// History of recorded messages
|
||||
messages: Arc<RwLock<Vec<Chat2Message>>>,
|
||||
waku: WakuNodeHandle<State>,
|
||||
}
|
||||
|
||||
impl App<Initialized> {
|
||||
async fn new(nick: String) -> Result<App<Initialized>> {
|
||||
let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
|
||||
let waku = waku_new(Some(WakuNodeConfig {
|
||||
tcp_port: Some(60010),
|
||||
cluster_id: Some(16),
|
||||
shards: vec![1, 32, 64, 128, 256],
|
||||
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),
|
||||
max_message_size: Some("1024KiB".to_string()),
|
||||
relay_topics: vec![String::from(&pubsub_topic)],
|
||||
log_level: Some("FATAL"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
|
||||
|
||||
keep_alive: Some(true),
|
||||
|
||||
// Discovery
|
||||
dns_discovery: Some(true),
|
||||
dns_discovery_url: Some("enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"),
|
||||
// discv5_discovery: Some(true),
|
||||
// discv5_udp_port: Some(9001),
|
||||
// discv5_enr_auto_update: Some(false),
|
||||
|
||||
..Default::default()
|
||||
})).await?;
|
||||
|
||||
Ok(App {
|
||||
input: String::new(),
|
||||
input_mode: InputMode::Normal,
|
||||
messages: Arc::new(RwLock::new(Vec::new())),
|
||||
nick,
|
||||
waku,
|
||||
})
|
||||
}
|
||||
|
||||
async fn start_waku_node(self) -> Result<App<Running>> {
|
||||
|
||||
let shared_messages = Arc::clone(&self.messages);
|
||||
|
||||
self.waku.set_event_callback(move|response| {
|
||||
if let LibwakuResponse::Success(v) = response {
|
||||
let event: WakuEvent =
|
||||
serde_json::from_str(v.unwrap().as_str()).expect("failed parsing event in set_event_callback");
|
||||
|
||||
match event {
|
||||
WakuEvent::WakuMessage(evt) => {
|
||||
|
||||
if evt.waku_message.content_topic != TOY_CHAT_CONTENT_TOPIC {
|
||||
return; // skip the messages that don't belong to the toy chat
|
||||
}
|
||||
|
||||
match <Chat2Message as Message>::decode(evt.waku_message.payload()) {
|
||||
Ok(chat_message) => {
|
||||
// Add the new message to the front
|
||||
{
|
||||
let mut messages_lock = shared_messages.write().unwrap();
|
||||
messages_lock.insert(0, chat_message); // Insert at the front (index 0)
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let mut out = std::io::stderr();
|
||||
write!(out, "{e:?}").unwrap();
|
||||
}
|
||||
}
|
||||
},
|
||||
WakuEvent::RelayTopicHealthChange(_evt) => {
|
||||
// dbg!("Relay topic change evt", evt);
|
||||
},
|
||||
WakuEvent::ConnectionChange(_evt) => {
|
||||
// dbg!("Conn change evt", evt);
|
||||
},
|
||||
WakuEvent::Unrecognized(err) => eprintln!("Unrecognized waku event: {:?}", err),
|
||||
_ => eprintln!("event case not expected"),
|
||||
};
|
||||
}
|
||||
})?;
|
||||
|
||||
let waku = self.waku.start().await?;
|
||||
|
||||
Ok(App {
|
||||
input: self.input,
|
||||
nick: self.nick,
|
||||
input_mode: self.input_mode,
|
||||
messages: self.messages,
|
||||
waku,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl App<Running> {
|
||||
|
||||
async fn retrieve_history(&mut self) {
|
||||
let one_day_in_secs = 60 * 60 * 24;
|
||||
let time_start = (Duration::from_secs(Utc::now().timestamp() as u64)
|
||||
- Duration::from_secs(one_day_in_secs))
|
||||
.as_nanos() as u64;
|
||||
|
||||
let include_data = true;
|
||||
|
||||
let messages = self.waku.store_query(None,
|
||||
vec![TOY_CHAT_CONTENT_TOPIC.clone()],
|
||||
STORE_NODE,
|
||||
include_data,
|
||||
Some(time_start),
|
||||
None,
|
||||
None).await.unwrap();
|
||||
|
||||
let messages: Vec<_> = messages
|
||||
.into_iter()
|
||||
// we expect messages because the query was passed with include_data == true
|
||||
.filter(|item| item.message.is_some())
|
||||
.map(|store_resp_msg| {
|
||||
<Chat2Message as Message>::decode(store_resp_msg.message.unwrap().payload())
|
||||
.expect("Toy chat messages should be decodeable")
|
||||
})
|
||||
.collect();
|
||||
|
||||
if !messages.is_empty() {
|
||||
*self.messages.write().unwrap() = messages;
|
||||
}
|
||||
}
|
||||
|
||||
fn run_main_loop<B: Backend>(
|
||||
&mut self,
|
||||
terminal: &mut Terminal<B>,
|
||||
) -> std::result::Result<(), Box<dyn Error>> {
|
||||
loop {
|
||||
terminal.draw(|f| ui(f, self))?;
|
||||
|
||||
if event::poll(Duration::from_millis(500)).unwrap() {
|
||||
if let Event::Key(key) = event::read()? {
|
||||
match self.input_mode {
|
||||
InputMode::Normal => match key.code {
|
||||
KeyCode::Char('e') => {
|
||||
self.input_mode = InputMode::Editing;
|
||||
}
|
||||
KeyCode::Char('q') => {
|
||||
return Ok(());
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
InputMode::Editing => match key.code {
|
||||
KeyCode::Enter => {
|
||||
let message_content: String = self.input.drain(..).collect();
|
||||
let message = Chat2Message::new(&self.nick, &message_content);
|
||||
let mut buff = Vec::new();
|
||||
let meta = Vec::new();
|
||||
Message::encode(&message, &mut buff)?;
|
||||
let waku_message = WakuMessage::new(
|
||||
buff,
|
||||
TOY_CHAT_CONTENT_TOPIC.clone(),
|
||||
1,
|
||||
meta,
|
||||
false,
|
||||
);
|
||||
|
||||
// Call the async function in a blocking context
|
||||
task::block_in_place(|| {
|
||||
// Obtain the current runtime handle
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
|
||||
// Block on the async function
|
||||
handle.block_on(async {
|
||||
// Assuming `self` is available in the current context
|
||||
let pubsub_topic = PubsubTopic::new(DEFAULT_PUBSUB_TOPIC);
|
||||
if let Err(e) = self.waku.relay_publish_message(
|
||||
&waku_message,
|
||||
&pubsub_topic,
|
||||
None,
|
||||
).await {
|
||||
let mut out = std::io::stderr();
|
||||
write!(out, "{e:?}").unwrap();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
KeyCode::Char(c) => {
|
||||
self.input.push(c);
|
||||
}
|
||||
KeyCode::Backspace => {
|
||||
self.input.pop();
|
||||
}
|
||||
KeyCode::Esc => {
|
||||
self.input_mode = InputMode::Normal;
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn stop_app(self) {
|
||||
self.waku.stop().await.expect("the node should stop properly");
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> std::result::Result<(), Box<dyn Error>> {
|
||||
let nick = std::env::args().nth(1).expect("Nick to be set");
|
||||
|
||||
let app = App::new(nick).await?;
|
||||
let mut app = app.start_waku_node().await?;
|
||||
|
||||
// setup terminal
|
||||
enable_raw_mode()?;
|
||||
let mut stdout = io::stdout();
|
||||
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
|
||||
let backend = CrosstermBackend::new(stdout);
|
||||
let mut terminal = Terminal::new(backend)?;
|
||||
|
||||
app.retrieve_history().await;
|
||||
let res = app.run_main_loop(&mut terminal);
|
||||
app.stop_app().await;
|
||||
|
||||
// restore terminal
|
||||
disable_raw_mode()?;
|
||||
execute!(
|
||||
terminal.backend_mut(),
|
||||
LeaveAlternateScreen,
|
||||
DisableMouseCapture
|
||||
)?;
|
||||
terminal.show_cursor()?;
|
||||
|
||||
if let Err(err) = res {
|
||||
println!("{err:?}")
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn ui<B: Backend, State>(f: &mut Frame<B>, app: &App<State>) {
|
||||
let chunks = Layout::default()
|
||||
.direction(Direction::Vertical)
|
||||
.margin(2)
|
||||
.constraints(
|
||||
[
|
||||
Constraint::Length(1),
|
||||
Constraint::Length(3),
|
||||
Constraint::Min(1),
|
||||
]
|
||||
.as_ref(),
|
||||
)
|
||||
.split(f.size());
|
||||
|
||||
let (msg, style) = match app.input_mode {
|
||||
InputMode::Normal => (
|
||||
vec![
|
||||
Span::raw("Press "),
|
||||
Span::styled("q", Style::default().add_modifier(Modifier::BOLD)),
|
||||
Span::raw(" to exit, "),
|
||||
Span::styled("e", Style::default().add_modifier(Modifier::BOLD)),
|
||||
Span::raw(" to start writing a message."),
|
||||
],
|
||||
Style::default().add_modifier(Modifier::RAPID_BLINK),
|
||||
),
|
||||
InputMode::Editing => (
|
||||
vec![
|
||||
Span::raw("Press "),
|
||||
Span::styled("Esc", Style::default().add_modifier(Modifier::BOLD)),
|
||||
Span::raw(" to stop editing, "),
|
||||
Span::styled("Enter", Style::default().add_modifier(Modifier::BOLD)),
|
||||
Span::raw(" to record the message"),
|
||||
],
|
||||
Style::default(),
|
||||
),
|
||||
};
|
||||
let mut text = Text::from(Spans::from(msg));
|
||||
text.patch_style(style);
|
||||
let help_message = Paragraph::new(text);
|
||||
f.render_widget(help_message, chunks[0]);
|
||||
|
||||
let input = Paragraph::new(app.input.as_ref())
|
||||
.style(match app.input_mode {
|
||||
InputMode::Normal => Style::default(),
|
||||
InputMode::Editing => Style::default().fg(Color::Yellow),
|
||||
})
|
||||
.block(Block::default().borders(Borders::ALL).title("Input"));
|
||||
f.render_widget(input, chunks[1]);
|
||||
match app.input_mode {
|
||||
InputMode::Normal =>
|
||||
// Hide the cursor. `Frame` does this by default, so we don't need to do anything here
|
||||
{}
|
||||
|
||||
InputMode::Editing => {
|
||||
// Make the cursor visible and ask tui-rs to put it at the specified coordinates after rendering
|
||||
f.set_cursor(
|
||||
// Put cursor past the end of the input text
|
||||
chunks[1].x + app.input.width() as u16 + 1,
|
||||
// Move one line down, from the border to the input line
|
||||
chunks[1].y + 1,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
let messages: Vec<ListItem> = app
|
||||
.messages
|
||||
.read()
|
||||
.unwrap()
|
||||
.iter()
|
||||
.map(|message| {
|
||||
let content = vec![Spans::from(Span::raw(format!(
|
||||
"[{} - {}]: {}",
|
||||
message.timestamp().unwrap().format("%d-%m-%y %H:%M"),
|
||||
message.nick(),
|
||||
message.message()
|
||||
)))];
|
||||
ListItem::new(content)
|
||||
})
|
||||
.collect();
|
||||
let messages = List::new(messages).block(Block::default().borders(Borders::ALL).title("Chat"));
|
||||
f.render_widget(messages, chunks[2]);
|
||||
}
|
||||
37
examples/toy-chat/src/protocol.rs
Normal file
37
examples/toy-chat/src/protocol.rs
Normal file
@ -0,0 +1,37 @@
|
||||
use chrono::{DateTime, LocalResult, TimeZone, Utc};
|
||||
use prost::Message;
|
||||
use waku::{Encoding, WakuContentTopic};
|
||||
|
||||
pub static TOY_CHAT_CONTENT_TOPIC: WakuContentTopic =
|
||||
WakuContentTopic::new("toy-chat", "2", "huilong", Encoding::Proto);
|
||||
|
||||
#[derive(Clone, Message)]
|
||||
pub struct Chat2Message {
|
||||
#[prost(uint64, tag = "1")]
|
||||
timestamp: u64,
|
||||
#[prost(string, tag = "2")]
|
||||
nick: String,
|
||||
#[prost(bytes, tag = "3")]
|
||||
payload: Vec<u8>,
|
||||
}
|
||||
|
||||
impl Chat2Message {
|
||||
pub fn new(nick: &str, payload: &str) -> Self {
|
||||
Self {
|
||||
timestamp: Utc::now().timestamp() as u64,
|
||||
nick: nick.to_string(),
|
||||
payload: payload.as_bytes().to_vec(),
|
||||
}
|
||||
}
|
||||
pub fn message(&self) -> String {
|
||||
String::from_utf8(self.payload.clone()).unwrap()
|
||||
}
|
||||
|
||||
pub fn nick(&self) -> &str {
|
||||
&self.nick
|
||||
}
|
||||
|
||||
pub fn timestamp(&self) -> LocalResult<DateTime<Utc>> {
|
||||
Utc.timestamp_opt(self.timestamp as i64, 0)
|
||||
}
|
||||
}
|
||||
@ -33,6 +33,8 @@ serde-aux = "4.3.1"
|
||||
rln = "0.3.4"
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
regex = "1"
|
||||
chrono = "0.4"
|
||||
uuid = { version = "1.3", features = ["v4"] }
|
||||
|
||||
[dev-dependencies]
|
||||
futures = "0.3.25"
|
||||
|
||||
@ -1,4 +1,6 @@
|
||||
// std
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use crate::general::Result;
|
||||
use std::borrow::Cow;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::str::FromStr;
|
||||
@ -79,6 +81,12 @@ impl WakuContentTopic {
|
||||
}
|
||||
}
|
||||
|
||||
impl WakuDecode for WakuContentTopic {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
Ok(serde_json::from_str(input).expect("could not parse store resp"))
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for WakuContentTopic {
|
||||
type Err = String;
|
||||
|
||||
|
||||
@ -1,11 +1,10 @@
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use crate::general::Result;
|
||||
use core::str::FromStr;
|
||||
use std::convert::TryFrom;
|
||||
use std::{slice, str};
|
||||
use waku_sys::WakuCallBack;
|
||||
use std::str;
|
||||
use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK};
|
||||
|
||||
#[derive(Debug, Default, PartialEq)]
|
||||
#[derive(Debug, Clone, Default, PartialEq)]
|
||||
pub enum LibwakuResponse {
|
||||
Success(Option<String>),
|
||||
Failure(String),
|
||||
@ -31,45 +30,8 @@ impl TryFrom<(u32, &str)> for LibwakuResponse {
|
||||
}
|
||||
}
|
||||
|
||||
// Define the WakuDecode trait
|
||||
pub trait WakuDecode: Sized {
|
||||
fn decode(input: &str) -> Result<Self>;
|
||||
}
|
||||
|
||||
pub fn decode<T: WakuDecode>(input: String) -> Result<T> {
|
||||
T::decode(input.as_str())
|
||||
}
|
||||
|
||||
unsafe extern "C" fn trampoline<F>(
|
||||
ret_code: ::std::os::raw::c_int,
|
||||
data: *const ::std::os::raw::c_char,
|
||||
data_len: usize,
|
||||
user_data: *mut ::std::os::raw::c_void,
|
||||
) where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
let closure = &mut *(user_data as *mut F);
|
||||
|
||||
let response = if data.is_null() {
|
||||
""
|
||||
} else {
|
||||
str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len))
|
||||
.expect("could not retrieve response")
|
||||
};
|
||||
|
||||
let result = LibwakuResponse::try_from((ret_code as u32, response))
|
||||
.expect("invalid response obtained from libwaku");
|
||||
|
||||
closure(result);
|
||||
}
|
||||
|
||||
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack
|
||||
where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
Some(trampoline::<F>)
|
||||
}
|
||||
|
||||
/// Used in cases where the FFI call doesn't return additional information in the
|
||||
/// callback. Instead, it returns RET_OK, RET_ERR, etc.
|
||||
pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
|
||||
if result == LibwakuResponse::Undefined && code as u32 == RET_OK {
|
||||
// Some functions will only execute the callback on error
|
||||
@ -87,24 +49,11 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_json_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> {
|
||||
/// Used in cases where the FFI function returns a code (RET_OK, RET_ERR, etc) plus additional
|
||||
/// information, i.e. LibwakuResponse
|
||||
pub fn handle_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> {
|
||||
match result {
|
||||
LibwakuResponse::Success(v) => decode(v.unwrap_or_default()),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Undefined => panic!(
|
||||
"undefined ffi state: code({}) was returned but callback was not executed",
|
||||
code
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_response<F: FromStr>(code: i32, result: LibwakuResponse) -> Result<F> {
|
||||
match result {
|
||||
LibwakuResponse::Success(v) => v
|
||||
.unwrap_or_default()
|
||||
.parse()
|
||||
.map_err(|_| "could not parse value".into()),
|
||||
LibwakuResponse::Success(v) => WakuDecode::decode(&v.unwrap_or_default()),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Undefined => panic!(
|
||||
52
waku-bindings/src/general/messagehash.rs
Normal file
52
waku-bindings/src/general/messagehash.rs
Normal file
@ -0,0 +1,52 @@
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use hex::FromHex;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::convert::TryInto;
|
||||
use std::fmt;
|
||||
use std::fmt::Write;
|
||||
use std::hash::Hash;
|
||||
use std::str::FromStr;
|
||||
|
||||
/// Waku message hash, hex encoded sha256 digest of the message
|
||||
#[derive(Debug, Deserialize, Serialize, PartialEq, Eq, Clone, Hash)]
|
||||
pub struct MessageHash([u8; 32]);
|
||||
|
||||
impl MessageHash {
|
||||
fn to_hex_string(&self) -> String {
|
||||
self.0.iter().fold(String::new(), |mut output, b| {
|
||||
let _ = write!(output, "{b:02X}");
|
||||
output
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for MessageHash {
|
||||
type Err = String;
|
||||
|
||||
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
|
||||
let s = s.strip_prefix("0x").unwrap_or(s);
|
||||
// Decode the hexadecimal string to a Vec<u8>
|
||||
// We expect a string format like: d38220de82fbcf2df865b680692fce98c36600fdd1d954b8a71e916dc4222b8e
|
||||
let bytes = Vec::from_hex(s).map_err(|e| format!("Hex decode error MessageHash: {}", e))?;
|
||||
|
||||
// Ensure the length is exactly 32 bytes
|
||||
let res = bytes
|
||||
.try_into()
|
||||
.map_err(|_| "Hex string must represent exactly 32 bytes".to_string())?;
|
||||
|
||||
Ok(MessageHash(res))
|
||||
}
|
||||
}
|
||||
|
||||
impl WakuDecode for MessageHash {
|
||||
fn decode(input: &str) -> Result<Self, String> {
|
||||
MessageHash::from_str(input)
|
||||
}
|
||||
}
|
||||
|
||||
// Implement the Display trait
|
||||
impl fmt::Display for MessageHash {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", self.to_hex_string())
|
||||
}
|
||||
}
|
||||
@ -1,17 +1,20 @@
|
||||
//! Waku [general](https://rfc.vac.dev/spec/36/#general) types
|
||||
|
||||
pub mod contenttopic;
|
||||
pub mod libwaku_response;
|
||||
pub mod messagehash;
|
||||
pub mod pubsubtopic;
|
||||
pub mod time;
|
||||
pub mod waku_decode;
|
||||
|
||||
// crates
|
||||
use crate::general::time::get_now_in_nanosecs;
|
||||
use contenttopic::WakuContentTopic;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_aux::prelude::*;
|
||||
|
||||
/// Waku message version
|
||||
pub type WakuMessageVersion = usize;
|
||||
/// Waku message hash, hex encoded sha256 digest of the message
|
||||
pub type MessageHash = String;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, String>;
|
||||
|
||||
@ -30,7 +33,7 @@ pub struct WakuMessage {
|
||||
pub version: WakuMessageVersion,
|
||||
/// Unix timestamp in nanoseconds
|
||||
#[serde(deserialize_with = "deserialize_number_from_string")]
|
||||
pub timestamp: usize,
|
||||
pub timestamp: u64,
|
||||
#[serde(with = "base64_serde", default = "Vec::new")]
|
||||
pub meta: Vec<u8>,
|
||||
#[serde(default)]
|
||||
@ -40,12 +43,31 @@ pub struct WakuMessage {
|
||||
_extras: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct WakuStoreRespMessage {
|
||||
// #[serde(with = "base64_serde", default = "Vec::new")]
|
||||
pub payload: Vec<u8>,
|
||||
/// The content topic to be set on the message
|
||||
// #[serde(rename = "contentTopic")]
|
||||
pub content_topic: String,
|
||||
// #[serde(with = "base64_serde", default = "Vec::new")]
|
||||
pub meta: Vec<u8>,
|
||||
/// The Waku Message version number
|
||||
#[serde(default)]
|
||||
pub version: WakuMessageVersion,
|
||||
/// Unix timestamp in nanoseconds
|
||||
pub timestamp: usize,
|
||||
#[serde(default)]
|
||||
pub ephemeral: bool,
|
||||
pub proof: Vec<u8>,
|
||||
}
|
||||
|
||||
impl WakuMessage {
|
||||
pub fn new<PAYLOAD: AsRef<[u8]>, META: AsRef<[u8]>>(
|
||||
payload: PAYLOAD,
|
||||
content_topic: WakuContentTopic,
|
||||
version: WakuMessageVersion,
|
||||
timestamp: usize,
|
||||
meta: META,
|
||||
ephemeral: bool,
|
||||
) -> Self {
|
||||
@ -56,12 +78,22 @@ impl WakuMessage {
|
||||
payload,
|
||||
content_topic,
|
||||
version,
|
||||
timestamp,
|
||||
timestamp: get_now_in_nanosecs(),
|
||||
meta,
|
||||
ephemeral,
|
||||
_extras: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn payload(&self) -> &[u8] {
|
||||
&self.payload
|
||||
}
|
||||
}
|
||||
|
||||
impl WakuStoreRespMessage {
|
||||
pub fn payload(&self) -> &[u8] {
|
||||
&self.payload
|
||||
}
|
||||
}
|
||||
|
||||
mod base64_serde {
|
||||
|
||||
@ -1,4 +1,7 @@
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct PubsubTopic(String);
|
||||
|
||||
impl PubsubTopic {
|
||||
|
||||
7
waku-bindings/src/general/time.rs
Normal file
7
waku-bindings/src/general/time.rs
Normal file
@ -0,0 +1,7 @@
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
pub fn get_now_in_nanosecs() -> u64 {
|
||||
let now = SystemTime::now();
|
||||
let since_epoch = now.duration_since(UNIX_EPOCH).expect("Time went backwards");
|
||||
since_epoch.as_secs() * 1_000_000_000 + since_epoch.subsec_nanos() as u64
|
||||
}
|
||||
26
waku-bindings/src/general/waku_decode.rs
Normal file
26
waku-bindings/src/general/waku_decode.rs
Normal file
@ -0,0 +1,26 @@
|
||||
use crate::general::Result;
|
||||
use multiaddr::Multiaddr;
|
||||
// Define the WakuDecode trait
|
||||
pub trait WakuDecode: Sized {
|
||||
fn decode(input: &str) -> Result<Self>;
|
||||
}
|
||||
|
||||
impl WakuDecode for String {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
Ok(input.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn decode<T: WakuDecode>(input: String) -> Result<T> {
|
||||
T::decode(input.as_str())
|
||||
}
|
||||
|
||||
impl WakuDecode for Vec<Multiaddr> {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
input
|
||||
.split(',')
|
||||
.map(|s| s.trim().parse::<Multiaddr>().map_err(|err| err.to_string()))
|
||||
.collect::<Result<Vec<Multiaddr>>>() // Collect results into a Vec
|
||||
.map_err(|err| format!("could not parse Multiaddr: {}", err))
|
||||
}
|
||||
}
|
||||
@ -2,11 +2,11 @@
|
||||
//!
|
||||
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
|
||||
pub mod general;
|
||||
mod macros;
|
||||
pub mod node;
|
||||
pub mod utils;
|
||||
|
||||
// Re-export the LibwakuResponse type to make it accessible outside this module
|
||||
pub use utils::LibwakuResponse;
|
||||
pub use general::libwaku_response::LibwakuResponse;
|
||||
|
||||
// Required so functions inside libwaku can call RLN functions even if we
|
||||
// use it within the bindings functions
|
||||
@ -15,9 +15,9 @@ pub use utils::LibwakuResponse;
|
||||
use rln;
|
||||
|
||||
pub use node::{
|
||||
waku_create_content_topic, waku_new, Event, Initialized, Key, Multiaddr, PublicKey, RLNConfig,
|
||||
Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
|
||||
waku_create_content_topic, waku_new, Initialized, Key, Multiaddr, PublicKey, RLNConfig,
|
||||
Running, SecretKey, WakuEvent, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
|
||||
};
|
||||
|
||||
pub use general::contenttopic::{Encoding, WakuContentTopic};
|
||||
pub use general::{MessageHash, Result, WakuMessage, WakuMessageVersion};
|
||||
pub use general::{messagehash::MessageHash, Result, WakuMessage, WakuMessageVersion};
|
||||
|
||||
73
waku-bindings/src/macros.rs
Normal file
73
waku-bindings/src/macros.rs
Normal file
@ -0,0 +1,73 @@
|
||||
use crate::general::libwaku_response::LibwakuResponse;
|
||||
|
||||
use std::{slice, str};
|
||||
use waku_sys::WakuCallBack;
|
||||
|
||||
unsafe extern "C" fn trampoline<F>(
|
||||
ret_code: ::std::os::raw::c_int,
|
||||
data: *const ::std::os::raw::c_char,
|
||||
data_len: usize,
|
||||
user_data: *mut ::std::os::raw::c_void,
|
||||
) where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
let closure = &mut *(user_data as *mut F);
|
||||
|
||||
let response = if data.is_null() {
|
||||
""
|
||||
} else {
|
||||
str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len))
|
||||
.expect("could not retrieve response")
|
||||
};
|
||||
|
||||
let result = LibwakuResponse::try_from((ret_code as u32, response))
|
||||
.expect("invalid response obtained from libwaku");
|
||||
|
||||
closure(result);
|
||||
}
|
||||
|
||||
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack
|
||||
where
|
||||
F: FnMut(LibwakuResponse),
|
||||
{
|
||||
Some(trampoline::<F>)
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! handle_ffi_call {
|
||||
// Case: With or without additional arguments
|
||||
($waku_fn:expr, $resp_hndlr:expr, $ctx:expr $(, $($arg:expr),*)?) => {{
|
||||
use $crate::macros::get_trampoline;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
use libc::*;
|
||||
|
||||
let mut result = LibwakuResponse::default();
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
|
||||
// Callback to update the result and notify the waiter
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one();
|
||||
};
|
||||
|
||||
// Create trampoline and invoke the `waku_sys` function
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
$waku_fn(
|
||||
$ctx, // Pass the context
|
||||
$($($arg),*,)? // Expand the variadic arguments if provided
|
||||
cb, // Pass the callback trampoline
|
||||
&mut closure as *mut _ as *mut c_void
|
||||
)
|
||||
};
|
||||
|
||||
// Wait for the callback to notify us
|
||||
notify.notified().await;
|
||||
|
||||
// Handle the response
|
||||
$resp_hndlr(code, result)
|
||||
}};
|
||||
}
|
||||
@ -33,6 +33,10 @@ pub struct WakuNodeConfig {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub max_message_size: Option<String>,
|
||||
|
||||
/// Store protocol
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub storenode: Option<&'static str>,
|
||||
|
||||
/// RLN configuration
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub rln_relay: Option<RLNConfig>,
|
||||
|
||||
@ -2,7 +2,8 @@ use std::ffi::c_void;
|
||||
use std::ptr::null_mut;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use crate::utils::{get_trampoline, LibwakuResponse};
|
||||
use crate::general::libwaku_response::LibwakuResponse;
|
||||
use crate::macros::get_trampoline;
|
||||
|
||||
type LibwakuResponseClosure = dyn FnMut(LibwakuResponse) + Send + Sync;
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
//!
|
||||
//! Asynchronous events require a callback to be registered.
|
||||
//! An example of an asynchronous event that might be emitted is receiving a message.
|
||||
//! When an event is emitted, this callback will be triggered receiving an [`Event`]
|
||||
//! When an event is emitted, this callback will be triggered receiving an [`WakuEvent`]
|
||||
|
||||
// crates
|
||||
use serde::{Deserialize, Serialize};
|
||||
@ -17,9 +17,16 @@ use crate::MessageHash;
|
||||
#[non_exhaustive]
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(tag = "eventType", rename_all = "camelCase")]
|
||||
pub enum Event {
|
||||
pub enum WakuEvent {
|
||||
#[serde(rename = "message")]
|
||||
WakuMessage(WakuMessageEvent),
|
||||
|
||||
#[serde(rename = "relay_topic_health_change")]
|
||||
RelayTopicHealthChange(TopicHealthEvent),
|
||||
|
||||
#[serde(rename = "connection_change")]
|
||||
ConnectionChange(ConnectionChangeEvent),
|
||||
|
||||
Unrecognized(serde_json::Value),
|
||||
}
|
||||
|
||||
@ -35,14 +42,64 @@ pub struct WakuMessageEvent {
|
||||
pub waku_message: WakuMessage,
|
||||
}
|
||||
|
||||
/// Type of `event` field for a `topic health` event
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct TopicHealthEvent {
|
||||
/// The pubsub topic on which the message was received
|
||||
pub pubsub_topic: String,
|
||||
/// The message hash
|
||||
pub topic_health: String,
|
||||
}
|
||||
|
||||
/// Type of `event` field for a `connection change` event
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ConnectionChangeEvent {
|
||||
/// The pubsub topic on which the message was received
|
||||
pub peer_id: String,
|
||||
/// The message hash
|
||||
pub peer_event: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::Event;
|
||||
use crate::WakuEvent;
|
||||
use crate::WakuEvent::{ConnectionChange, RelayTopicHealthChange};
|
||||
|
||||
#[test]
|
||||
fn deserialize_message_event() {
|
||||
let s = "{\"eventType\":\"message\",\"messageHash\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
|
||||
let evt: Event = serde_json::from_str(s).unwrap();
|
||||
assert!(matches!(evt, Event::WakuMessage(_)));
|
||||
let s = "{\"eventType\":\"message\",\"messageHash\":[91, 70, 26, 8, 141, 232, 150, 200, 26, 206, 224, 175, 249, 74, 61, 140, 231, 126, 224, 160, 91, 80, 162, 65, 250, 171, 84, 149, 133, 110, 214, 101],\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
|
||||
let evt: WakuEvent = serde_json::from_str(s).unwrap();
|
||||
assert!(matches!(evt, WakuEvent::WakuMessage(_)));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_topic_health_change_event() {
|
||||
let s = "{\"eventType\":\"relay_topic_health_change\", \"pubsubTopic\":\"/waku/2/rs/16/1\",\"topicHealth\":\"MinimallyHealthy\"}";
|
||||
let evt: WakuEvent = serde_json::from_str(s).unwrap();
|
||||
match evt {
|
||||
RelayTopicHealthChange(topic_health_event) => {
|
||||
assert_eq!(topic_health_event.pubsub_topic, "/waku/2/rs/16/1");
|
||||
assert_eq!(topic_health_event.topic_health, "MinimallyHealthy");
|
||||
}
|
||||
_ => panic!("Expected RelayTopicHealthChange event, but got {:?}", evt),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deserialize_connection_change_event() {
|
||||
let s = "{\"eventType\":\"connection_change\", \"peerId\":\"16Uiu2HAmAR24Mbb6VuzoyUiGx42UenDkshENVDj4qnmmbabLvo31\",\"peerEvent\":\"Joined\"}";
|
||||
let evt: WakuEvent = serde_json::from_str(s).unwrap();
|
||||
match evt {
|
||||
ConnectionChange(conn_change_event) => {
|
||||
assert_eq!(
|
||||
conn_change_event.peer_id,
|
||||
"16Uiu2HAmAR24Mbb6VuzoyUiGx42UenDkshENVDj4qnmmbabLvo31"
|
||||
);
|
||||
assert_eq!(conn_change_event.peer_event, "Joined");
|
||||
}
|
||||
_ => panic!("Expected RelayTopicHealthChange event, but got {:?}", evt),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,99 +2,58 @@
|
||||
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
// crates
|
||||
use libc::*;
|
||||
// internal
|
||||
use crate::general::contenttopic::WakuContentTopic;
|
||||
use crate::general::libwaku_response::{handle_no_response, LibwakuResponse};
|
||||
use crate::general::pubsubtopic::PubsubTopic;
|
||||
use crate::general::Result;
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};
|
||||
|
||||
pub fn waku_filter_subscribe(
|
||||
pub async fn waku_filter_subscribe(
|
||||
ctx: &WakuNodeContext,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
) -> Result<()> {
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
let content_topics = WakuContentTopic::join_content_topics(content_topics);
|
||||
let content_topics =
|
||||
CString::new(content_topics).expect("CString should build properly from content topic");
|
||||
|
||||
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
let content_topics_ptr = CString::new(content_topics)
|
||||
.expect("CString should build properly from content topic")
|
||||
.into_raw();
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_filter_subscribe(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
content_topics_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
|
||||
drop(CString::from_raw(pubsub_topic_ptr));
|
||||
drop(CString::from_raw(content_topics_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_filter_subscribe,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr(),
|
||||
content_topics.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
pub fn waku_filter_unsubscribe(
|
||||
pub async fn waku_filter_unsubscribe(
|
||||
ctx: &WakuNodeContext,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
|
||||
) -> Result<()> {
|
||||
let content_topics_topics = WakuContentTopic::join_content_topics(content_topics);
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
let content_topics = WakuContentTopic::join_content_topics(content_topics);
|
||||
let content_topics =
|
||||
CString::new(content_topics).expect("CString should build properly from content topic");
|
||||
|
||||
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
let content_topics_topics_ptr = CString::new(content_topics_topics)
|
||||
.expect("CString should build properly from content topic")
|
||||
.into_raw();
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_filter_unsubscribe(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
content_topics_topics_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
|
||||
drop(CString::from_raw(pubsub_topic_ptr));
|
||||
drop(CString::from_raw(content_topics_topics_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_filter_unsubscribe,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr(),
|
||||
content_topics.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
pub fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_filter_unsubscribe_all(
|
||||
ctx.get_ptr(),
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
)
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
pub async fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> {
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_filter_unsubscribe_all,
|
||||
handle_no_response,
|
||||
ctx.get_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
@ -2,48 +2,33 @@
|
||||
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
// crates
|
||||
use libc::*;
|
||||
// internal
|
||||
use crate::general::{MessageHash, Result, WakuMessage};
|
||||
use crate::general::libwaku_response::{handle_response, LibwakuResponse};
|
||||
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_response, LibwakuResponse};
|
||||
|
||||
use crate::general::pubsubtopic::PubsubTopic;
|
||||
|
||||
pub fn waku_lightpush_publish_message(
|
||||
pub async fn waku_lightpush_publish_message(
|
||||
ctx: &WakuNodeContext,
|
||||
message: &WakuMessage,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
) -> Result<MessageHash> {
|
||||
let message_ptr = CString::new(
|
||||
let message = CString::new(
|
||||
serde_json::to_string(&message)
|
||||
.expect("WakuMessages should always be able to success serializing"),
|
||||
)
|
||||
.expect("CString should build properly from the serialized waku message")
|
||||
.into_raw();
|
||||
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
.expect("CString should build properly from the serialized waku message");
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_lightpush_publish(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
message_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
|
||||
drop(CString::from_raw(message_ptr));
|
||||
drop(CString::from_raw(pubsub_topic_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_lightpush_publish,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr(),
|
||||
message.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
@ -5,41 +5,42 @@ use std::ffi::CString;
|
||||
// crates
|
||||
use libc::c_void;
|
||||
use multiaddr::Multiaddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::Notify;
|
||||
// internal
|
||||
use super::config::WakuNodeConfig;
|
||||
use crate::general::libwaku_response::{handle_no_response, handle_response, LibwakuResponse};
|
||||
use crate::general::Result;
|
||||
use crate::handle_ffi_call;
|
||||
use crate::macros::get_trampoline;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::LibwakuResponse;
|
||||
use crate::utils::WakuDecode;
|
||||
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
|
||||
|
||||
/// Instantiates a Waku node
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
|
||||
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
|
||||
unsafe {
|
||||
waku_sys::waku_setup();
|
||||
}
|
||||
|
||||
pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
|
||||
let config = config.unwrap_or_default();
|
||||
let config_ptr = CString::new(
|
||||
let config = CString::new(
|
||||
serde_json::to_string(&config)
|
||||
.expect("Serialization from properly built NodeConfig should never fail"),
|
||||
)
|
||||
.expect("CString should build properly from the config")
|
||||
.into_raw();
|
||||
.expect("CString should build properly from the config");
|
||||
let config_ptr = config.as_ptr();
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let notify = Arc::new(Notify::new());
|
||||
let notify_clone = notify.clone();
|
||||
let mut result = LibwakuResponse::default();
|
||||
let result_cb = |r: LibwakuResponse| {
|
||||
result = r;
|
||||
notify_clone.notify_one(); // Notify that the value has been updated
|
||||
};
|
||||
let obj_ptr = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void);
|
||||
|
||||
drop(CString::from_raw(config_ptr));
|
||||
|
||||
out
|
||||
waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
notify.notified().await; // Wait until a result is received
|
||||
|
||||
match result {
|
||||
LibwakuResponse::MissingCallback => panic!("callback is required"),
|
||||
LibwakuResponse::Failure(v) => Err(v),
|
||||
@ -47,112 +48,73 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_destroy(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
pub async fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> {
|
||||
handle_ffi_call!(waku_sys::waku_destroy, handle_no_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
|
||||
pub fn waku_start(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_start(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
pub async fn waku_start(ctx: &WakuNodeContext) -> Result<()> {
|
||||
handle_ffi_call!(waku_sys::waku_start, handle_no_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// Stops a Waku node
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
|
||||
pub fn waku_stop(ctx: &WakuNodeContext) -> Result<()> {
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_stop(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
pub async fn waku_stop(ctx: &WakuNodeContext) -> Result<()> {
|
||||
handle_ffi_call!(waku_sys::waku_stop, handle_no_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// nwaku version
|
||||
#[allow(clippy::not_unsafe_ptr_arg_deref)]
|
||||
pub fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_version(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_response(code, result)
|
||||
}
|
||||
|
||||
// Implement WakuDecode for Vec<Multiaddr>
|
||||
impl WakuDecode for Vec<Multiaddr> {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
input
|
||||
.split(',')
|
||||
.map(|s| s.trim().parse::<Multiaddr>().map_err(|err| err.to_string()))
|
||||
.collect::<Result<Vec<Multiaddr>>>() // Collect results into a Vec
|
||||
.map_err(|err| format!("could not parse Multiaddr: {}", err))
|
||||
}
|
||||
pub async fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
|
||||
handle_ffi_call!(waku_sys::waku_version, handle_response, ctx.get_ptr())
|
||||
}
|
||||
|
||||
/// Get the multiaddresses the Waku node is listening to
|
||||
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
|
||||
pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
waku_sys::waku_listen_addresses(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
|
||||
};
|
||||
|
||||
handle_json_response(code, result)
|
||||
pub async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_listen_addresses,
|
||||
handle_response,
|
||||
ctx.get_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::waku_new;
|
||||
use crate::node::management::{waku_listen_addresses, waku_start, waku_stop, waku_version};
|
||||
use crate::node::management::{
|
||||
waku_destroy, waku_listen_addresses, waku_start, waku_stop, waku_version,
|
||||
};
|
||||
use serial_test::serial;
|
||||
|
||||
#[test]
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
fn waku_flow() {
|
||||
let node = waku_new(None).unwrap();
|
||||
async fn waku_flow() {
|
||||
let node = waku_new(None).await.unwrap();
|
||||
|
||||
waku_start(&node).unwrap();
|
||||
waku_start(&node).await.unwrap();
|
||||
|
||||
// test addresses
|
||||
let addresses = waku_listen_addresses(&node).unwrap();
|
||||
let addresses = waku_listen_addresses(&node).await.unwrap();
|
||||
dbg!(&addresses);
|
||||
assert!(!addresses.is_empty());
|
||||
|
||||
waku_stop(&node).unwrap();
|
||||
waku_stop(&node).await.unwrap();
|
||||
waku_destroy(&node).await.unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
fn nwaku_version() {
|
||||
let node = waku_new(None).unwrap();
|
||||
let version = waku_version(&node).expect("should return the version");
|
||||
async fn nwaku_version() {
|
||||
let node = waku_new(None).await.unwrap();
|
||||
|
||||
let version = waku_version(&node)
|
||||
.await
|
||||
.expect("should return the version");
|
||||
|
||||
print!("Current version: {}", version);
|
||||
|
||||
assert!(!version.is_empty());
|
||||
waku_destroy(&node).await.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
@ -8,6 +8,7 @@ mod lightpush;
|
||||
mod management;
|
||||
mod peers;
|
||||
mod relay;
|
||||
mod store;
|
||||
|
||||
// std
|
||||
pub use aes_gcm::Key;
|
||||
@ -15,20 +16,19 @@ pub use multiaddr::Multiaddr;
|
||||
pub use secp256k1::{PublicKey, SecretKey};
|
||||
use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
use store::{StoreQueryRequest, StoreWakuMessageResponse};
|
||||
// internal
|
||||
use crate::general::contenttopic::{Encoding, WakuContentTopic};
|
||||
use crate::general::libwaku_response::LibwakuResponse;
|
||||
pub use crate::general::pubsubtopic::PubsubTopic;
|
||||
use crate::general::{MessageHash, Result, WakuMessage};
|
||||
use crate::utils::LibwakuResponse;
|
||||
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
|
||||
|
||||
use crate::node::context::WakuNodeContext;
|
||||
pub use config::RLNConfig;
|
||||
pub use config::WakuNodeConfig;
|
||||
pub use events::{Event, WakuMessageEvent};
|
||||
pub use events::{WakuEvent, WakuMessageEvent};
|
||||
pub use relay::waku_create_content_topic;
|
||||
|
||||
use std::time::SystemTime;
|
||||
|
||||
// Define state marker types
|
||||
pub struct Initialized;
|
||||
pub struct Running;
|
||||
@ -41,34 +41,41 @@ pub struct WakuNodeHandle<State> {
|
||||
|
||||
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
|
||||
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
|
||||
pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
|
||||
Ok(WakuNodeHandle {
|
||||
ctx: management::waku_new(config)?,
|
||||
ctx: management::waku_new(config).await?,
|
||||
_state: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
impl<State> WakuNodeHandle<State> {
|
||||
/// Get the nwaku version
|
||||
pub fn version(&self) -> Result<String> {
|
||||
management::waku_version(&self.ctx)
|
||||
pub async fn version(&self) -> Result<String> {
|
||||
management::waku_version(&self.ctx).await
|
||||
}
|
||||
|
||||
pub fn waku_destroy(self) -> Result<()> {
|
||||
let res = management::waku_destroy(&self.ctx);
|
||||
pub async fn waku_destroy(self) -> Result<()> {
|
||||
let res = management::waku_destroy(&self.ctx).await;
|
||||
self.ctx.reset_ptr();
|
||||
res
|
||||
}
|
||||
|
||||
/// Subscribe to WakuRelay to receive messages matching a content filter.
|
||||
pub async fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
relay::waku_relay_subscribe(&self.ctx, pubsub_topic).await
|
||||
}
|
||||
}
|
||||
|
||||
impl WakuNodeHandle<Initialized> {
|
||||
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
|
||||
pub fn start(self) -> Result<WakuNodeHandle<Running>> {
|
||||
management::waku_start(&self.ctx).map(|_| WakuNodeHandle {
|
||||
ctx: self.ctx,
|
||||
_state: PhantomData,
|
||||
})
|
||||
pub async fn start(self) -> Result<WakuNodeHandle<Running>> {
|
||||
management::waku_start(&self.ctx)
|
||||
.await
|
||||
.map(|_| WakuNodeHandle {
|
||||
ctx: self.ctx,
|
||||
_state: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
|
||||
@ -82,17 +89,19 @@ impl WakuNodeHandle<Initialized> {
|
||||
impl WakuNodeHandle<Running> {
|
||||
/// Stops a Waku node
|
||||
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
|
||||
pub fn stop(self) -> Result<WakuNodeHandle<Initialized>> {
|
||||
management::waku_stop(&self.ctx).map(|_| WakuNodeHandle {
|
||||
ctx: self.ctx,
|
||||
_state: PhantomData,
|
||||
})
|
||||
pub async fn stop(self) -> Result<WakuNodeHandle<Initialized>> {
|
||||
management::waku_stop(&self.ctx)
|
||||
.await
|
||||
.map(|_| WakuNodeHandle {
|
||||
ctx: self.ctx,
|
||||
_state: PhantomData,
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the multiaddresses the Waku node is listening to
|
||||
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
|
||||
pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
|
||||
management::waku_listen_addresses(&self.ctx)
|
||||
pub async fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
|
||||
management::waku_listen_addresses(&self.ctx).await
|
||||
}
|
||||
|
||||
/// Dial peer using a multiaddress
|
||||
@ -100,11 +109,11 @@ impl WakuNodeHandle<Running> {
|
||||
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
|
||||
/// Use 0 for no timeout
|
||||
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
|
||||
pub fn connect(&self, address: &Multiaddr, timeout: Option<Duration>) -> Result<()> {
|
||||
peers::waku_connect(&self.ctx, address, timeout)
|
||||
pub async fn connect(&self, address: &Multiaddr, timeout: Option<Duration>) -> Result<()> {
|
||||
peers::waku_connect(&self.ctx, address, timeout).await
|
||||
}
|
||||
|
||||
pub fn relay_publish_txt(
|
||||
pub async fn relay_publish_txt(
|
||||
&self,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
msg_txt: &String,
|
||||
@ -112,70 +121,93 @@ impl WakuNodeHandle<Running> {
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<MessageHash> {
|
||||
let content_topic = WakuContentTopic::new("waku", "2", content_topic_name, Encoding::Proto);
|
||||
let message = WakuMessage::new(
|
||||
msg_txt,
|
||||
content_topic,
|
||||
0,
|
||||
SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
Vec::new(),
|
||||
false,
|
||||
);
|
||||
|
||||
relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout)
|
||||
let message = WakuMessage::new(msg_txt, content_topic, 0, Vec::new(), false);
|
||||
relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout).await
|
||||
}
|
||||
|
||||
/// Publish a message using Waku Relay.
|
||||
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
|
||||
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
|
||||
pub fn relay_publish_message(
|
||||
pub async fn relay_publish_message(
|
||||
&self,
|
||||
message: &WakuMessage,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<MessageHash> {
|
||||
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout)
|
||||
}
|
||||
|
||||
/// Subscribe to WakuRelay to receive messages matching a content filter.
|
||||
pub fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
relay::waku_relay_subscribe(&self.ctx, pubsub_topic)
|
||||
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout).await
|
||||
}
|
||||
|
||||
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
|
||||
pub fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
|
||||
pub async fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic).await
|
||||
}
|
||||
|
||||
pub fn filter_subscribe(
|
||||
pub async fn filter_subscribe(
|
||||
&self,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
) -> Result<()> {
|
||||
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics)
|
||||
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics).await
|
||||
}
|
||||
|
||||
pub fn filter_unsubscribe(
|
||||
pub async fn filter_unsubscribe(
|
||||
&self,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
) -> Result<()> {
|
||||
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics)
|
||||
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics).await
|
||||
}
|
||||
|
||||
pub fn filter_unsubscribe_all(&self) -> Result<()> {
|
||||
filter::waku_filter_unsubscribe_all(&self.ctx)
|
||||
pub async fn filter_unsubscribe_all(&self) -> Result<()> {
|
||||
filter::waku_filter_unsubscribe_all(&self.ctx).await
|
||||
}
|
||||
|
||||
pub fn lightpush_publish_message(
|
||||
pub async fn lightpush_publish_message(
|
||||
&self,
|
||||
message: &WakuMessage,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
) -> Result<MessageHash> {
|
||||
lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic)
|
||||
lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic).await
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn store_query(
|
||||
&self,
|
||||
pubsub_topic: Option<PubsubTopic>,
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
peer_addr: &str,
|
||||
include_data: bool, // is true, resp contains payload, etc. Only msg_hashes otherwise
|
||||
time_start: Option<u64>, // unix time nanoseconds
|
||||
time_end: Option<u64>, // unix time nanoseconds
|
||||
timeout_millis: Option<Duration>,
|
||||
) -> Result<Vec<StoreWakuMessageResponse>> {
|
||||
let mut cursor: Option<MessageHash> = None;
|
||||
|
||||
let mut messages: Vec<StoreWakuMessageResponse> = Vec::new();
|
||||
|
||||
loop {
|
||||
let query = StoreQueryRequest::new()
|
||||
.with_pubsub_topic(pubsub_topic.clone())
|
||||
.with_content_topics(content_topics.clone())
|
||||
.with_include_data(include_data)
|
||||
.with_time_start(time_start)
|
||||
.with_time_end(time_end)
|
||||
.with_pagination_cursor(cursor)
|
||||
.with_pagination_forward(true);
|
||||
|
||||
let response =
|
||||
store::waku_store_query(&self.ctx, query, peer_addr, timeout_millis).await?;
|
||||
|
||||
messages.extend(response.messages);
|
||||
|
||||
if response.pagination_cursor.is_none() {
|
||||
break;
|
||||
}
|
||||
cursor = response.pagination_cursor;
|
||||
}
|
||||
|
||||
messages.reverse();
|
||||
|
||||
Ok(messages)
|
||||
}
|
||||
}
|
||||
|
||||
@ -4,47 +4,33 @@
|
||||
use std::ffi::CString;
|
||||
use std::time::Duration;
|
||||
// crates
|
||||
use libc::*;
|
||||
use multiaddr::Multiaddr;
|
||||
// internal
|
||||
use crate::general::libwaku_response::{handle_no_response, LibwakuResponse};
|
||||
use crate::general::Result;
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::LibwakuResponse;
|
||||
use crate::utils::{get_trampoline, handle_no_response};
|
||||
|
||||
/// Dial peer using a multiaddress
|
||||
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
|
||||
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
|
||||
/// Use 0 for no timeout
|
||||
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
|
||||
pub fn waku_connect(
|
||||
pub async fn waku_connect(
|
||||
ctx: &WakuNodeContext,
|
||||
address: &Multiaddr,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<()> {
|
||||
let address_ptr = CString::new(address.to_string())
|
||||
.expect("CString should build properly from multiaddress")
|
||||
.into_raw();
|
||||
let address =
|
||||
CString::new(address.to_string()).expect("CString should build properly from multiaddress");
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_connect(
|
||||
ctx.get_ptr(),
|
||||
address_ptr,
|
||||
timeout
|
||||
.map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX))
|
||||
.unwrap_or(0),
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
|
||||
drop(CString::from_raw(address_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_connect,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
address.as_ptr(),
|
||||
timeout
|
||||
.map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX))
|
||||
.unwrap_or(0)
|
||||
)
|
||||
}
|
||||
|
||||
@ -3,154 +3,99 @@
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
use std::time::Duration;
|
||||
// crates
|
||||
use libc::*;
|
||||
// internal
|
||||
use crate::general::contenttopic::{Encoding, WakuContentTopic};
|
||||
use crate::general::libwaku_response::{handle_no_response, handle_response, LibwakuResponse};
|
||||
use crate::general::pubsubtopic::PubsubTopic;
|
||||
use crate::general::{MessageHash, Result, WakuMessage};
|
||||
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};
|
||||
|
||||
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
|
||||
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding)
|
||||
#[allow(clippy::not_unsafe_ptr_arg_deref)]
|
||||
pub fn waku_create_content_topic(
|
||||
pub async fn waku_create_content_topic(
|
||||
ctx: &WakuNodeContext,
|
||||
application_name: &str,
|
||||
application_version: u32,
|
||||
content_topic_name: &str,
|
||||
encoding: Encoding,
|
||||
) -> WakuContentTopic {
|
||||
let application_name_ptr = CString::new(application_name)
|
||||
.expect("Application name should always transform to CString")
|
||||
.into_raw();
|
||||
let content_topic_name_ptr = CString::new(content_topic_name)
|
||||
.expect("Content topic should always transform to CString")
|
||||
.into_raw();
|
||||
let encoding_ptr = CString::new(encoding.to_string())
|
||||
.expect("Encoding should always transform to CString")
|
||||
.into_raw();
|
||||
) -> Result<WakuContentTopic> {
|
||||
let application_name = CString::new(application_name)
|
||||
.expect("Application name should always transform to CString");
|
||||
let content_topic_name =
|
||||
CString::new(content_topic_name).expect("Content topic should always transform to CString");
|
||||
let encoding =
|
||||
CString::new(encoding.to_string()).expect("Encoding should always transform to CString");
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_content_topic(
|
||||
ctx.get_ptr(),
|
||||
application_name_ptr,
|
||||
application_version,
|
||||
content_topic_name_ptr,
|
||||
encoding_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
|
||||
drop(CString::from_raw(application_name_ptr));
|
||||
drop(CString::from_raw(content_topic_name_ptr));
|
||||
drop(CString::from_raw(encoding_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_response(code, result).expect("&str from result should always be extracted")
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_content_topic,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
application_name.as_ptr(),
|
||||
application_version,
|
||||
content_topic_name.as_ptr(),
|
||||
encoding.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
/// Publish a message using Waku Relay
|
||||
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
|
||||
pub fn waku_relay_publish_message(
|
||||
pub async fn waku_relay_publish_message(
|
||||
ctx: &WakuNodeContext,
|
||||
message: &WakuMessage,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<MessageHash> {
|
||||
let message_ptr = CString::new(
|
||||
let message = CString::new(
|
||||
serde_json::to_string(&message)
|
||||
.expect("WakuMessages should always be able to success serializing"),
|
||||
)
|
||||
.expect("CString should build properly from the serialized waku message")
|
||||
.into_raw();
|
||||
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
.expect("CString should build properly from the serialized waku message");
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_relay_publish(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
message_ptr,
|
||||
timeout
|
||||
.map(|duration| {
|
||||
duration
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.expect("Duration as milliseconds should fit in a u32")
|
||||
})
|
||||
.unwrap_or(0),
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
|
||||
drop(CString::from_raw(message_ptr));
|
||||
drop(CString::from_raw(pubsub_topic_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_relay_publish,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr(),
|
||||
message.as_ptr(),
|
||||
timeout
|
||||
.map(|duration| {
|
||||
duration
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.expect("Duration as milliseconds should fit in a u32")
|
||||
})
|
||||
.unwrap_or(0)
|
||||
)
|
||||
}
|
||||
|
||||
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
pub async fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_relay_subscribe(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
|
||||
drop(CString::from_raw(pubsub_topic_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_relay_subscribe,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
|
||||
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic")
|
||||
.into_raw();
|
||||
pub async fn waku_relay_unsubscribe(
|
||||
ctx: &WakuNodeContext,
|
||||
pubsub_topic: &PubsubTopic,
|
||||
) -> Result<()> {
|
||||
let pubsub_topic = CString::new(String::from(pubsub_topic))
|
||||
.expect("CString should build properly from pubsub topic");
|
||||
|
||||
let mut result: LibwakuResponse = Default::default();
|
||||
let result_cb = |r: LibwakuResponse| result = r;
|
||||
let code = unsafe {
|
||||
let mut closure = result_cb;
|
||||
let cb = get_trampoline(&closure);
|
||||
let out = waku_sys::waku_relay_subscribe(
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic_ptr,
|
||||
cb,
|
||||
&mut closure as *mut _ as *mut c_void,
|
||||
);
|
||||
|
||||
drop(CString::from_raw(pubsub_topic_ptr));
|
||||
|
||||
out
|
||||
};
|
||||
|
||||
handle_no_response(code, result)
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_relay_unsubscribe,
|
||||
handle_no_response,
|
||||
ctx.get_ptr(),
|
||||
pubsub_topic.as_ptr()
|
||||
)
|
||||
}
|
||||
|
||||
172
waku-bindings/src/node/store.rs
Normal file
172
waku-bindings/src/node/store.rs
Normal file
@ -0,0 +1,172 @@
|
||||
//! Waku store protocol related methods
|
||||
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
use uuid::Uuid;
|
||||
// crates
|
||||
use tokio::time::Duration;
|
||||
// internal
|
||||
use crate::general::libwaku_response::{handle_response, LibwakuResponse};
|
||||
use crate::general::time::get_now_in_nanosecs;
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use crate::general::{
|
||||
contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result,
|
||||
WakuStoreRespMessage,
|
||||
};
|
||||
use crate::handle_ffi_call;
|
||||
use crate::node::context::WakuNodeContext;
|
||||
use multiaddr::Multiaddr;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct PagingOptions {
|
||||
pub page_size: usize,
|
||||
pub cursor: Option<MessageHash>,
|
||||
pub forward: bool,
|
||||
}
|
||||
|
||||
/// Criteria used to retrieve historical messages
|
||||
#[derive(Clone, Serialize, Debug)]
|
||||
pub struct StoreQueryRequest {
|
||||
/// if true, the store-response will include the full message content. If false,
|
||||
/// the store-response will only include a list of message hashes.
|
||||
request_id: String,
|
||||
include_data: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pubsub_topic: Option<PubsubTopic>,
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
time_start: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
time_end: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
message_hashes: Option<Vec<MessageHash>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pagination_cursor: Option<MessageHash>, // Message hash (key) from where to start query (exclusive)
|
||||
pagination_forward: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pagination_limit: Option<u64>,
|
||||
}
|
||||
|
||||
impl StoreQueryRequest {
|
||||
pub fn new() -> Self {
|
||||
StoreQueryRequest {
|
||||
request_id: Uuid::new_v4().to_string(),
|
||||
include_data: true,
|
||||
pubsub_topic: None,
|
||||
content_topics: Vec::new(),
|
||||
time_start: Some(get_now_in_nanosecs()),
|
||||
time_end: Some(get_now_in_nanosecs()),
|
||||
message_hashes: None,
|
||||
pagination_cursor: None,
|
||||
pagination_forward: true,
|
||||
pagination_limit: Some(25),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_include_data(mut self, include_data: bool) -> Self {
|
||||
self.include_data = include_data;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_pubsub_topic(mut self, pubsub_topic: Option<PubsubTopic>) -> Self {
|
||||
self.pubsub_topic = pubsub_topic;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_content_topics(mut self, content_topics: Vec<WakuContentTopic>) -> Self {
|
||||
self.content_topics = content_topics;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_time_start(mut self, time_start: Option<u64>) -> Self {
|
||||
self.time_start = time_start;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_time_end(mut self, time_end: Option<u64>) -> Self {
|
||||
self.time_end = time_end;
|
||||
self
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn with_message_hashes(mut self, message_hashes: Vec<MessageHash>) -> Self {
|
||||
self.message_hashes = Some(message_hashes);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_pagination_cursor(mut self, pagination_cursor: Option<MessageHash>) -> Self {
|
||||
self.pagination_cursor = pagination_cursor;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_pagination_forward(mut self, pagination_forward: bool) -> Self {
|
||||
self.pagination_forward = pagination_forward;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StoreWakuMessageResponse {
|
||||
pub message_hash: MessageHash,
|
||||
pub message: Option<WakuStoreRespMessage>, // None if include_data == false
|
||||
pub pubsub_topic: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StoreResponse {
|
||||
#[allow(unused)]
|
||||
pub request_id: String,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[allow(unused)]
|
||||
pub status_code: u32,
|
||||
|
||||
#[allow(unused)]
|
||||
pub status_desc: String,
|
||||
|
||||
/// Array of retrieved historical messages in [`WakuMessage`] format
|
||||
// #[serde(default)]
|
||||
pub messages: Vec<StoreWakuMessageResponse>,
|
||||
/// Paging information in [`PagingOptions`] format from which to resume further historical queries
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pagination_cursor: Option<MessageHash>,
|
||||
}
|
||||
|
||||
// Implement WakuDecode for Vec<Multiaddr>
|
||||
impl WakuDecode for StoreResponse {
|
||||
fn decode(input: &str) -> Result<Self> {
|
||||
Ok(serde_json::from_str(input).expect("could not parse store resp"))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn waku_store_query(
|
||||
ctx: &WakuNodeContext,
|
||||
query: StoreQueryRequest,
|
||||
peer_addr: &str,
|
||||
timeout_millis: Option<Duration>,
|
||||
) -> Result<StoreResponse> {
|
||||
let json_query = CString::new(
|
||||
serde_json::to_string(&query).expect("StoreQuery should always be able to be serialized"),
|
||||
)
|
||||
.expect("CString should build properly from the serialized filter subscription");
|
||||
|
||||
peer_addr
|
||||
.parse::<Multiaddr>()
|
||||
.expect("correct multiaddress in store query");
|
||||
let peer_addr = CString::new(peer_addr).expect("peer_addr CString should be created");
|
||||
|
||||
let timeout_millis = timeout_millis.unwrap_or(Duration::from_secs(10));
|
||||
|
||||
handle_ffi_call!(
|
||||
waku_sys::waku_store_query,
|
||||
handle_response,
|
||||
ctx.get_ptr(),
|
||||
json_query.as_ptr(),
|
||||
peer_addr.as_ptr(),
|
||||
timeout_millis.as_millis() as i32
|
||||
)
|
||||
}
|
||||
@ -4,13 +4,13 @@ use secp256k1::SecretKey;
|
||||
use serial_test::serial;
|
||||
use std::str::FromStr;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, SystemTime};
|
||||
use std::time::Duration;
|
||||
use std::{collections::HashSet, str::from_utf8};
|
||||
use tokio::time;
|
||||
use tokio::time::sleep;
|
||||
use waku_bindings::node::PubsubTopic;
|
||||
use waku_bindings::{
|
||||
waku_new, Encoding, Event, Initialized, MessageHash, WakuContentTopic, WakuMessage,
|
||||
waku_new, Encoding, Initialized, MessageHash, WakuContentTopic, WakuEvent, WakuMessage,
|
||||
WakuNodeConfig, WakuNodeHandle,
|
||||
};
|
||||
use waku_bindings::{LibwakuResponse, Running};
|
||||
@ -18,15 +18,13 @@ const ECHO_TIMEOUT: u64 = 1000;
|
||||
const ECHO_MESSAGE: &str = "Hi from 🦀!";
|
||||
const TEST_PUBSUBTOPIC: &str = "test";
|
||||
|
||||
fn try_publish_relay_messages(
|
||||
async fn try_publish_relay_messages(
|
||||
node: &WakuNodeHandle<Running>,
|
||||
msg: &WakuMessage,
|
||||
) -> Result<HashSet<MessageHash>, String> {
|
||||
Ok(HashSet::from([node.relay_publish_message(
|
||||
msg,
|
||||
&PubsubTopic::new(TEST_PUBSUBTOPIC),
|
||||
None,
|
||||
)?]))
|
||||
Ok(HashSet::from([node
|
||||
.relay_publish_message(msg, &PubsubTopic::new(TEST_PUBSUBTOPIC), None)
|
||||
.await?]))
|
||||
}
|
||||
|
||||
async fn test_echo_messages(
|
||||
@ -45,16 +43,22 @@ async fn test_echo_messages(
|
||||
let rx_waku_message_cloned = rx_waku_message.clone();
|
||||
let closure = move |response| {
|
||||
if let LibwakuResponse::Success(v) = response {
|
||||
let event: Event =
|
||||
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
|
||||
let event: WakuEvent = serde_json::from_str(v.unwrap().as_str())
|
||||
.expect("Parsing event to succeed test_echo_messages");
|
||||
|
||||
match event {
|
||||
Event::WakuMessage(evt) => {
|
||||
WakuEvent::WakuMessage(evt) => {
|
||||
if let Ok(mut msg_lock) = rx_waku_message_cloned.lock() {
|
||||
*msg_lock = evt.waku_message;
|
||||
}
|
||||
}
|
||||
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
WakuEvent::RelayTopicHealthChange(_evt) => {
|
||||
// dbg!("Relay topic change evt", evt);
|
||||
}
|
||||
WakuEvent::ConnectionChange(_evt) => {
|
||||
// dbg!("Conn change evt", evt);
|
||||
}
|
||||
WakuEvent::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
|
||||
_ => panic!("event case not expected"),
|
||||
};
|
||||
}
|
||||
@ -66,21 +70,23 @@ async fn test_echo_messages(
|
||||
.set_event_callback(closure)
|
||||
.expect("set event call back working"); // Set the event callback with the closure
|
||||
|
||||
let node1 = node1.start()?;
|
||||
let node2 = node2.start()?;
|
||||
let node1 = node1.start().await?;
|
||||
let node2 = node2.start().await?;
|
||||
|
||||
node1
|
||||
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
|
||||
.await
|
||||
.unwrap();
|
||||
node2
|
||||
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
sleep(Duration::from_secs(3)).await;
|
||||
sleep(Duration::from_secs(5)).await;
|
||||
|
||||
// Interconnect nodes
|
||||
// Replace all matches with 127.0.0.1 to avoid issue with NAT or firewall.
|
||||
let addresses1 = node1.listen_addresses().unwrap();
|
||||
let addresses1 = node1.listen_addresses().await.unwrap();
|
||||
let addresses1 = &addresses1[0].to_string();
|
||||
|
||||
let re = Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").unwrap();
|
||||
@ -89,26 +95,16 @@ async fn test_echo_messages(
|
||||
let addresses1 = addresses1.parse::<Multiaddr>().expect("parse multiaddress");
|
||||
|
||||
println!("Connecting node1 to node2: {}", addresses1);
|
||||
node2.connect(&addresses1, None).unwrap();
|
||||
node2.connect(&addresses1, None).await.unwrap();
|
||||
|
||||
// Wait for mesh to form
|
||||
sleep(Duration::from_secs(3)).await;
|
||||
|
||||
dbg!("Before publish");
|
||||
let message = WakuMessage::new(
|
||||
content,
|
||||
content_topic,
|
||||
1,
|
||||
SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
.try_into()
|
||||
.unwrap(),
|
||||
Vec::new(),
|
||||
false,
|
||||
);
|
||||
let _ids = try_publish_relay_messages(&node1, &message).expect("send relay messages");
|
||||
let message = WakuMessage::new(content, content_topic, 1, Vec::new(), false);
|
||||
let _ids = try_publish_relay_messages(&node1, &message)
|
||||
.await
|
||||
.expect("send relay messages");
|
||||
|
||||
// Wait for the msg to arrive
|
||||
let rx_waku_message_cloned = rx_waku_message.clone();
|
||||
@ -118,8 +114,8 @@ async fn test_echo_messages(
|
||||
let payload = msg.payload.to_vec();
|
||||
let payload_str = from_utf8(&payload).expect("should be valid message");
|
||||
if payload_str == ECHO_MESSAGE {
|
||||
node1.stop()?;
|
||||
node2.stop()?;
|
||||
node1.stop().await?;
|
||||
node2.stop().await?;
|
||||
return Ok(());
|
||||
}
|
||||
} else {
|
||||
@ -127,11 +123,11 @@ async fn test_echo_messages(
|
||||
}
|
||||
}
|
||||
|
||||
let node1 = node1.stop()?;
|
||||
let node2 = node2.stop()?;
|
||||
let node1 = node1.stop().await?;
|
||||
let node2 = node2.stop().await?;
|
||||
|
||||
node1.waku_destroy()?;
|
||||
node2.waku_destroy()?;
|
||||
node1.waku_destroy().await?;
|
||||
node2.waku_destroy().await?;
|
||||
|
||||
return Err("Unexpected test ending".to_string());
|
||||
}
|
||||
@ -143,11 +139,13 @@ async fn default_echo() -> Result<(), String> {
|
||||
let node1 = waku_new(Some(WakuNodeConfig {
|
||||
tcp_port: Some(60010),
|
||||
..Default::default()
|
||||
}))?;
|
||||
}))
|
||||
.await?;
|
||||
let node2 = waku_new(Some(WakuNodeConfig {
|
||||
tcp_port: Some(60020),
|
||||
..Default::default()
|
||||
}))?;
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
|
||||
|
||||
@ -165,9 +163,9 @@ async fn default_echo() -> Result<(), String> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[tokio::test]
|
||||
#[serial]
|
||||
fn node_restart() {
|
||||
async fn node_restart() {
|
||||
let config = WakuNodeConfig {
|
||||
node_key: Some(
|
||||
SecretKey::from_str("05f381866cc21f6c1e2e80e07fa732008e36d942dce3206ad6dcd6793c98d609")
|
||||
@ -177,9 +175,14 @@ fn node_restart() {
|
||||
};
|
||||
|
||||
for _ in 0..3 {
|
||||
let node = waku_new(config.clone().into()).expect("default config should be valid");
|
||||
let node = node.start().expect("node should start with valid config");
|
||||
let node = node.stop().expect("node should stop");
|
||||
node.waku_destroy().expect("free resources");
|
||||
let node = waku_new(config.clone().into())
|
||||
.await
|
||||
.expect("default config should be valid");
|
||||
let node = node
|
||||
.start()
|
||||
.await
|
||||
.expect("node should start with valid config");
|
||||
let node = node.stop().await.expect("node should stop");
|
||||
node.waku_destroy().await.expect("free resources");
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,27 +66,11 @@ fn generate_bindgen_code(project_dir: &Path) {
|
||||
);
|
||||
println!("cargo:rustc-link-lib=static=backtrace");
|
||||
|
||||
println!("cargo:rustc-link-lib=stdc++");
|
||||
|
||||
println!(
|
||||
"cargo:rustc-link-search={}",
|
||||
vendor_path.join("vendor/negentropy/cpp").display()
|
||||
);
|
||||
println!("cargo:rustc-link-lib=static=negentropy");
|
||||
|
||||
println!("cargo:rustc-link-lib=ssl");
|
||||
println!("cargo:rustc-link-lib=crypto");
|
||||
|
||||
cc::Build::new()
|
||||
.file("src/cmd.c") // Compile the C file
|
||||
.compile("cmditems"); // Compile it as a library
|
||||
println!("cargo:rustc-link-lib=static=cmditems");
|
||||
|
||||
// TODO: Determine if pthread is automatically included
|
||||
println!("cargo:rustc-link-lib=pthread");
|
||||
|
||||
// TODO: Test in other architectures
|
||||
|
||||
// Generate waku bindings with bindgen
|
||||
let bindings = bindgen::Builder::default()
|
||||
// The input header we would like to generate
|
||||
|
||||
@ -1 +1 @@
|
||||
Subproject commit d814519578380bf01398c29424a5fd1005ed3a29
|
||||
Subproject commit 625c8ee51bc3e065da0e2e8d3a53d3634589f548
|
||||
Loading…
x
Reference in New Issue
Block a user