Tic tac toe example (#104)

* update nwaku vendor to v0.33.1
* build.rs: add negentropy dependency and cmdCount cmdLine dependencies
* fix: call waku_setup when instantiating waku_new
* Properly decode a Vec<Multiaddr>
* First commit tic-tac-toe
* adding some simple game logic to coordinate the turns a little
* some logic to panic if a proper event callback hasn't been set
* restoring back the type state pattern introduced by Richard
* new PubsubTopic type
* fix clippy issues

---------

Co-authored-by: Richard Ramos <info@richardramos.me>
This commit is contained in:
Ivan FB 2024-11-28 10:35:41 +01:00 committed by GitHub
parent 7a2e4d1d01
commit fd7e73a7f0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 3226 additions and 351 deletions

2
Cargo.lock generated
View File

@ -380,6 +380,7 @@ name = "basic"
version = "0.1.0"
dependencies = [
"futures",
"serde_json",
"tokio",
"tokio-util",
"waku-bindings",
@ -3059,6 +3060,7 @@ dependencies = [
"multiaddr",
"once_cell",
"rand",
"regex",
"rln",
"secp256k1 0.26.0",
"serde",

2122
examples/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,6 @@
[workspace]
members = [
"basic"
"basic",
"tic-tac-toe-gui"
]

View File

@ -10,3 +10,4 @@ futures = "0.3.30"
tokio = { version = "1.36.0", features = ["full"] }
tokio-util = { version = "0.7.10", features = ["rt"] }
waku = { path = "../../waku-bindings", package = "waku-bindings" }
serde_json = "1.0"

View File

@ -3,53 +3,78 @@ use std::str::from_utf8;
use std::time::SystemTime;
use tokio::time::{sleep, Duration};
use waku::{
waku_destroy, waku_new, Encoding, Event, WakuContentTopic, WakuMessage, WakuNodeConfig,
general::pubsubtopic::PubsubTopic, waku_new, Encoding, Event, LibwakuResponse,
WakuContentTopic, WakuMessage, WakuNodeConfig,
};
#[tokio::main]
async fn main() -> Result<(), Error> {
let node1 = waku_new(Some(WakuNodeConfig {
port: Some(60010), // TODO: use any available port.
tcp_port: Some(60010), // TODO: use any available port.
..Default::default()
}))
.expect("should instantiate");
let node2 = waku_new(Some(WakuNodeConfig {
port: Some(60020), // TODO: use any available port.
tcp_port: Some(60020), // TODO: use any available port.
..Default::default()
}))
.expect("should instantiate");
// ========================================================================
// Setting an event callback to be executed each time a message is received
node2
.set_event_callback(|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
})
.expect("set event call back working");
node1
.set_event_callback(|response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
})
.expect("set event call back working");
let node1 = node1.start().expect("node1 should start");
let node2 = node2.start().expect("node2 should start");
// ========================================================================
// Setting an event callback to be executed each time a message is received
node2.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let message = message.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 2: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
});
node1.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let message = message.waku_message;
let payload = message.payload.to_vec();
let msg = from_utf8(&payload).expect("should be valid message");
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received in NODE 1: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
}
});
// ========================================================================
// Subscribe to pubsub topic
let topic = "test".to_string();
let topic = PubsubTopic::new("test");
node1
.relay_subscribe(&topic)
@ -109,8 +134,8 @@ async fn main() -> Result<(), Error> {
// ========================================================================
// Free resources
waku_destroy(node1).expect("should deallocate");
waku_destroy(node2).expect("should deallocate");
node1.waku_destroy().expect("should deallocate");
node2.waku_destroy().expect("should deallocate");
Ok(())
}

View File

@ -0,0 +1,17 @@
[package]
name = "tic-tac-toe-gui"
version = "0.1.0"
edition = "2021"
[dependencies]
waku = { path = "../../waku-bindings", package = "waku-bindings" }
serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
ark-std = "0.4"
ctrlc = "3.2.4"
tokio = { version = "1", features = ["full"] }
tokio-util = "0.6" # for utility functions if needed
egui = "0.22"
eframe = "0.22"
secp256k1 = { version = "0.26", features = ["rand", "recovery", "serde"] }

View File

@ -0,0 +1,374 @@
use eframe::egui;
use serde::{Deserialize, Serialize};
use std::str::from_utf8;
use std::sync::{Arc, Mutex};
use std::time::{SystemTime, Duration};
use tokio::sync::mpsc;
use waku::{
waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic,
WakuMessage, WakuNodeConfig, WakuNodeHandle, Initialized, Running,
general::pubsubtopic::PubsubTopic,
};
#[derive(Serialize, Deserialize, PartialEq, Debug, Copy, Clone)]
enum Player {
X,
O,
}
#[derive(Serialize, Deserialize, Clone)]
struct GameState {
board: [[Option<Player>; 3]; 3],
current_turn: Player,
moves_left: usize,
}
struct TicTacToeApp<State> {
game_state: Arc<Mutex<GameState>>,
waku: WakuNodeHandle<State>,
game_topic: PubsubTopic,
tx: mpsc::Sender<String>, // Sender to send `msg` to main thread
player_role: Option<Player>, // Store the player's role (X or O)
}
impl TicTacToeApp<Initialized> {
fn new(
waku: WakuNodeHandle<Initialized>,
game_topic: PubsubTopic,
game_state: Arc<Mutex<GameState>>,
tx: mpsc::Sender<String>,
) -> Self {
Self {
game_state,
waku,
game_topic,
tx,
player_role: None,
}
}
fn start(self) -> TicTacToeApp<Running> {
let tx_clone = self.tx.clone();
let my_closure = move |response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
Event::WakuMessage(evt) => {
// println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec();
match from_utf8(&payload) {
Ok(msg) => {
// Lock succeeded, proceed to send the message
if tx_clone.blocking_send(msg.to_string()).is_err() {
eprintln!("Failed to send message to async task");
}
}
Err(e) => {
eprintln!("Failed to decode payload as UTF-8: {}", e);
// Handle the error as needed, or just log and skip
}
}
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
};
// Establish a closure that handles the incoming messages
self.waku.set_event_callback(my_closure).expect("set event call back working");
let _ = self.waku.version();
// Start the waku node
let waku = self.waku.start().expect("waku should start");
// Subscribe to desired topic using the relay protocol
// self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe");
let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let content_topics = vec![ctopic];
waku.filter_subscribe(&self.game_topic, content_topics).expect("waku should subscribe");
// Connect to hard-coded node
// let target_node_multi_addr =
// "/ip4/159.223.242.94/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT"
// // "/dns4/store-01.do-ams3.status.prod.status.im/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT"
// // "/ip4/24.144.78.119/tcp/30303/p2p/16Uiu2HAm3xVDaz6SRJ6kErwC21zBJEZjavVXg7VSkoWzaV1aMA3F"
// .parse::<Multiaddr>().expect("parse multiaddress");
// self.waku.connect(&target_node_multi_addr, None)
// .expect("waku should connect to other node");
TicTacToeApp {
game_state: self.game_state,
waku: waku,
game_topic: self.game_topic,
tx: self.tx,
player_role: self.player_role,
}
}
}
impl TicTacToeApp<Running> {
fn send_game_state(&self, game_state: &GameState) {
let serialized_game_state = serde_json::to_string(game_state).unwrap();
let content_topic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let message = WakuMessage::new(
&serialized_game_state,
content_topic,
0,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap(),
Vec::new(),
false,
);
// self.waku.relay_publish_message(&message, &self.game_topic.to_string(), None)
// .expect("Failed to send message");
self.waku.lightpush_publish_message(&message, &self.game_topic).expect("Failed to send message");
}
fn make_move(&mut self, row: usize, col: usize) {
if let Ok(mut game_state) = self.game_state.try_lock() {
if let Some(my_role) = self.player_role {
if (*game_state).current_turn != my_role {
return; // skip click if not my turn
}
}
if (*game_state).board[row][col].is_none() && (*game_state).moves_left > 0 {
(*game_state).board[row][col] = Some((*game_state).current_turn);
(*game_state).moves_left -= 1;
if let Some(winner) = self.check_winner(&game_state) {
(*game_state).current_turn = winner;
} else {
(*game_state).current_turn = match (*game_state).current_turn {
Player::X => Player::O,
Player::O => Player::X,
};
}
self.send_game_state(&game_state); // Send updated state after a move
}
}
}
fn check_winner(&self, game_state: &GameState) -> Option<Player> {
// Check rows, columns, and diagonals
for i in 0..3 {
if game_state.board[i][0] == game_state.board[i][1] &&
game_state.board[i][1] == game_state.board[i][2] {
if let Some(player) = game_state.board[i][0] {
return Some(player);
}
}
if game_state.board[0][i] == game_state.board[1][i] &&
game_state.board[1][i] == game_state.board[2][i] {
if let Some(player) = game_state.board[0][i] {
return Some(player);
}
}
}
if game_state.board[0][0] == game_state.board[1][1] &&
game_state.board[1][1] == game_state.board[2][2] {
if let Some(player) = game_state.board[0][0] {
return Some(player);
}
}
if game_state.board[0][2] == game_state.board[1][1] &&
game_state.board[1][1] == game_state.board[2][0] {
if let Some(player) = game_state.board[0][2] {
return Some(player);
}
}
None
}
fn reset_game(&mut self) {
self.game_state = Arc::new(Mutex::new(GameState {
board: [[None; 3]; 3],
current_turn: Player::X,
moves_left: 9,
}));
self.player_role = None
}
}
impl eframe::App for TicTacToeApp<Running> {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
// Request a repaint every second
ctx.request_repaint_after(Duration::from_secs(1));
egui::CentralPanel::default().show(ctx, |ui| {
ui.heading("Tic-Tac-Toe");
// If the player hasn't selected a role, show the role selection buttons
if self.player_role.is_none() {
ui.label("Select your role:");
if ui.button("Play as X").clicked() {
self.player_role = Some(Player::X);
}
if ui.button("Play as O").clicked() {
self.player_role = Some(Player::O);
if let Ok(mut game_state) = self.game_state.try_lock() {
(*game_state).current_turn = Player::X; // player X should start
}
}
return; // Exit early until a role is selected
}
let player_role = self.player_role.unwrap(); // Safe to unwrap because we've ensured it's Some
// Main game UI
ui.label(format!("You are playing as: {:?}", player_role));
// Draw the game board and handle the game state
let board_size = ui.available_size();
let cell_size = board_size.x / 4.0;
ui.horizontal(|ui| {
for row in 0..3 {
ui.vertical(|ui| {
for col in 0..3 {
let label;
{
if let Ok(game_state) = self.game_state.try_lock() {
label = match game_state.board[row][col] {
Some(Player::X) => "X",
Some(Player::O) => "O",
None => "-",
};
}
else {
label = "#";
}
}
let button = ui.add(egui::Button::new(label).min_size(egui::vec2(cell_size, cell_size)).sense(egui::Sense::click()));
if button.clicked() {
self.make_move(row, col);
}
}
});
if row < 2 {
ui.add_space(4.0);
}
}
});
if let Ok(game_state) = self.game_state.try_lock() {
if let Some(winner) = self.check_winner(&game_state) {
ui.label(format!(
"Player {} wins!",
match winner {
Player::X => "X",
Player::O => "O",
}
));
} else if game_state.moves_left == 0 {
ui.label("It's a tie!");
} else {
ui.label(format!(
"Player {}'s turn",
match game_state.current_turn {
Player::X => "X",
Player::O => "O",
}
));
}
}
if ui.add(egui::Button::new("Restart Game")).clicked() {
self.reset_game();
}
});
}
}
#[tokio::main]
async fn main() -> eframe::Result<()> {
let (tx, mut rx) = mpsc::channel::<String>(3200); // Channel to communicate between threads
let game_topic = PubsubTopic::new("/waku/2/rs/16/32");
// Create a Waku instance
let waku = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60010),
cluster_id: Some(16),
shards: vec![1, 32, 64, 128, 256],
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),
max_message_size: Some("1024KiB".to_string()),
relay_topics: vec![String::from(&game_topic)],
log_level: Some("DEBUG"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
keep_alive: Some(true),
// Discovery
dns_discovery: Some(true),
dns_discovery_url: Some("enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"),
// discv5_discovery: Some(true),
// discv5_udp_port: Some(9001),
// discv5_enr_auto_update: Some(false),
..Default::default()
}))
.expect("should instantiate");
let game_state = GameState {
board: [[None; 3]; 3],
current_turn: Player::X,
moves_left: 9,
};
let shared_state = Arc::new(Mutex::new(game_state));
let clone = shared_state.clone();
let app = TicTacToeApp::new(waku, game_topic, clone, tx);
let app = app.start();
let clone = shared_state.clone();
// Listen for messages in the main thread
tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
// println!("MSG received: {}", msg);
// Handle the received message, e.g., update the UI or game state
if let Ok(parsed_value) = serde_json::from_str::<GameState>(&msg)
{
if let Ok(mut unclocked_game_state) = clone.lock(){
*unclocked_game_state = parsed_value;
}
}
else {
eprintln!("Failed to parse JSON");
}
}
});
eframe::run_native(
"Tic-Tac-Toe Multiplayer via Waku",
eframe::NativeOptions {
initial_window_size: Some(egui::vec2(400.0, 400.0)),
..Default::default()
},
Box::new(|_cc| Box::new(app)),
)?;
Ok(())
}

View File

@ -31,6 +31,8 @@ waku-sys = { version = "0.5.0", path = "../waku-sys" }
libc = "0.2"
serde-aux = "4.3.1"
rln = "0.3.4"
tokio = { version = "1", features = ["full"] }
regex = "1"
[dev-dependencies]
futures = "0.3.25"

View File

@ -0,0 +1,134 @@
// std
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use sscanf::{scanf, RegexRepresentation};
/// WakuMessage encoding scheme
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub enum Encoding {
#[default]
Proto,
Rlp,
Rfc26,
Unknown(String),
}
impl Display for Encoding {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let s = match self {
Encoding::Proto => "proto",
Encoding::Rlp => "rlp",
Encoding::Rfc26 => "rfc26",
Encoding::Unknown(value) => value,
};
f.write_str(s)
}
}
impl FromStr for Encoding {
type Err = std::io::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"proto" => Ok(Self::Proto),
"rlp" => Ok(Self::Rlp),
"rfc26" => Ok(Self::Rfc26),
encoding => Ok(Self::Unknown(encoding.to_string())),
}
}
}
impl RegexRepresentation for Encoding {
const REGEX: &'static str = r"\w";
}
/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}`
#[derive(Clone, Debug, Eq, PartialEq, Default)]
pub struct WakuContentTopic {
pub application_name: Cow<'static, str>,
pub version: Cow<'static, str>,
pub content_topic_name: Cow<'static, str>,
pub encoding: Encoding,
}
impl WakuContentTopic {
pub const fn new(
application_name: &'static str,
version: &'static str,
content_topic_name: &'static str,
encoding: Encoding,
) -> Self {
Self {
application_name: Cow::Borrowed(application_name),
version: Cow::Borrowed(version),
content_topic_name: Cow::Borrowed(content_topic_name),
encoding,
}
}
pub fn join_content_topics(topics: Vec<WakuContentTopic>) -> String {
topics
.iter()
.map(|topic| topic.to_string())
.collect::<Vec<_>>()
.join(",")
}
}
impl FromStr for WakuContentTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((application_name, version, content_topic_name, encoding)) =
scanf!(s, "/{}/{}/{}/{:/.+?/}", String, String, String, Encoding)
{
Ok(WakuContentTopic {
application_name: Cow::Owned(application_name),
version: Cow::Owned(version),
content_topic_name: Cow::Owned(content_topic_name),
encoding,
})
} else {
Err(
format!(
"Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {s}"
)
)
}
}
}
impl Display for WakuContentTopic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"/{}/{}/{}/{}",
self.application_name, self.version, self.content_topic_name, self.encoding
)
}
}
impl Serialize for WakuContentTopic {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.to_string().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for WakuContentTopic {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let as_string: String = String::deserialize(deserializer)?;
as_string
.parse::<WakuContentTopic>()
.map_err(D::Error::custom)
}
}

View File

@ -1,26 +1,24 @@
//! Waku [general](https://rfc.vac.dev/spec/36/#general) types
// std
use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
pub mod contenttopic;
pub mod pubsubtopic;
// crates
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use contenttopic::WakuContentTopic;
use serde::{Deserialize, Serialize};
use serde_aux::prelude::*;
use sscanf::{scanf, RegexRepresentation};
/// Waku message version
pub type WakuMessageVersion = usize;
/// Waku message hash, hex encoded sha256 digest of the message
pub type MessageHash = String;
/// Waku response, just a `Result` with an `String` error.
pub type Result<T> = std::result::Result<T, String>;
// TODO: Properly type and deserialize payload form base64 encoded string
/// Waku message in JSON format.
/// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct WakuMessage {
#[serde(with = "base64_serde", default = "Vec::new")]
@ -66,123 +64,6 @@ impl WakuMessage {
}
}
/// WakuMessage encoding scheme
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum Encoding {
Proto,
Rlp,
Rfc26,
Unknown(String),
}
impl Display for Encoding {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let s = match self {
Encoding::Proto => "proto",
Encoding::Rlp => "rlp",
Encoding::Rfc26 => "rfc26",
Encoding::Unknown(value) => value,
};
f.write_str(s)
}
}
impl FromStr for Encoding {
type Err = std::io::Error;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"proto" => Ok(Self::Proto),
"rlp" => Ok(Self::Rlp),
"rfc26" => Ok(Self::Rfc26),
encoding => Ok(Self::Unknown(encoding.to_string())),
}
}
}
impl RegexRepresentation for Encoding {
const REGEX: &'static str = r"\w";
}
/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}`
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WakuContentTopic {
pub application_name: Cow<'static, str>,
pub version: Cow<'static, str>,
pub content_topic_name: Cow<'static, str>,
pub encoding: Encoding,
}
impl WakuContentTopic {
pub const fn new(
application_name: &'static str,
version: &'static str,
content_topic_name: &'static str,
encoding: Encoding,
) -> Self {
Self {
application_name: Cow::Borrowed(application_name),
version: Cow::Borrowed(version),
content_topic_name: Cow::Borrowed(content_topic_name),
encoding,
}
}
}
impl FromStr for WakuContentTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((application_name, version, content_topic_name, encoding)) =
scanf!(s, "/{}/{}/{}/{:/.+?/}", String, String, String, Encoding)
{
Ok(WakuContentTopic {
application_name: Cow::Owned(application_name),
version: Cow::Owned(version),
content_topic_name: Cow::Owned(content_topic_name),
encoding,
})
} else {
Err(
format!(
"Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {s}"
)
)
}
}
}
impl Display for WakuContentTopic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"/{}/{}/{}/{}",
self.application_name, self.version, self.content_topic_name, self.encoding
)
}
}
impl Serialize for WakuContentTopic {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.to_string().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for WakuContentTopic {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let as_string: String = String::deserialize(deserializer)?;
as_string
.parse::<WakuContentTopic>()
.map_err(D::Error::custom)
}
}
mod base64_serde {
use base64::Engine;
use serde::de::Error;

View File

@ -0,0 +1,16 @@
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PubsubTopic(String);
impl PubsubTopic {
// Constructor to create a new MyString
pub fn new(value: &str) -> Self {
PubsubTopic(value.to_string())
}
}
// to allow conversion from `PubsubTopic` to `String`
impl From<&PubsubTopic> for String {
fn from(topic: &PubsubTopic) -> Self {
topic.0.to_string()
}
}

View File

@ -1,9 +1,12 @@
//! # Waku
//!
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
mod general;
mod node;
mod utils;
pub mod general;
pub mod node;
pub mod utils;
// Re-export the LibwakuResponse type to make it accessible outside this module
pub use utils::LibwakuResponse;
// Required so functions inside libwaku can call RLN functions even if we
// use it within the bindings functions
@ -12,10 +15,9 @@ mod utils;
use rln;
pub use node::{
waku_create_content_topic, waku_destroy, waku_new, Event, Initialized, Key, Multiaddr,
PublicKey, RLNConfig, Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
waku_create_content_topic, waku_new, Event, Initialized, Key, Multiaddr, PublicKey, RLNConfig,
Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
};
pub use general::{
Encoding, MessageHash, Result, WakuContentTopic, WakuMessage, WakuMessageVersion,
};
pub use general::contenttopic::{Encoding, WakuContentTopic};
pub use general::{MessageHash, Result, WakuMessage, WakuMessageVersion};

View File

@ -16,20 +16,45 @@ pub struct WakuNodeConfig {
pub host: Option<std::net::IpAddr>,
/// Libp2p TCP listening port. Default `60000`. Use `0` for **random**
#[default(Some(60000))]
pub port: Option<usize>,
pub tcp_port: Option<usize>,
/// Secp256k1 private key in Hex format (`0x123...abc`). Default random
#[serde(with = "secret_key_serde", rename = "key")]
pub node_key: Option<SecretKey>,
/// Cluster id that the node is running in
#[default(Some(0))]
pub cluster_id: Option<usize>,
/// Enable relay protocol. Default `true`
/// Relay protocol
#[default(Some(true))]
pub relay: Option<bool>,
pub relay_topics: Vec<String>,
#[default(vec![1])]
pub shards: Vec<usize>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_message_size: Option<String>,
/// RLN configuration
#[serde(skip_serializing_if = "Option::is_none")]
pub rln_relay: Option<RLNConfig>,
// Discovery
#[default(Some(false))]
pub dns_discovery: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub dns_discovery_url: Option<&'static str>,
#[default(Some(false))]
pub discv5_discovery: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub discv5_udp_port: Option<usize>,
#[default(Some(false))]
pub discv5_enr_auto_update: Option<bool>,
// other settings
#[serde(skip_serializing_if = "Option::is_none")]
pub log_level: Option<&'static str>,
#[serde(skip_serializing_if = "Option::is_none")]
pub keep_alive: Option<bool>,
}
/// RLN Relay configuration

View File

@ -1,6 +1,62 @@
// crates
use libc::c_void;
use std::ffi::c_void;
use std::ptr::null_mut;
use std::sync::{Arc, Mutex};
use crate::utils::{get_trampoline, LibwakuResponse};
type LibwakuResponseClosure = dyn FnMut(LibwakuResponse) + Send + Sync;
pub struct WakuNodeContext {
pub obj_ptr: *mut c_void,
obj_ptr: *mut c_void,
msg_observer: Arc<Mutex<Box<LibwakuResponseClosure>>>,
}
impl WakuNodeContext {
pub fn new(obj_ptr: *mut c_void) -> Self {
let me = Self {
obj_ptr,
msg_observer: Arc::new(Mutex::new(Box::new(|_| {}))),
};
// By default we set a callback that will panic if the user didn't specify a valid callback.
// And by valid callback we mean a callback that can properly handle the waku events.
me.waku_set_event_callback(WakuNodeContext::panic_callback)
.expect("correctly set default callback");
me
}
// default callback that does nothing. A valid callback should be set
fn panic_callback(_response: LibwakuResponse) {
panic!("callback not set. Please use waku_set_event_callback to set a valid callback")
}
pub fn get_ptr(&self) -> *mut c_void {
self.obj_ptr
}
pub fn reset_ptr(mut self) {
self.obj_ptr = null_mut();
}
/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
&self,
closure: F,
) -> Result<(), String> {
if let Ok(mut boxed_closure) = self.msg_observer.lock() {
*boxed_closure = Box::new(closure);
unsafe {
let cb = get_trampoline(&(*boxed_closure));
waku_sys::waku_set_event_callback(
self.obj_ptr,
cb,
&mut (*boxed_closure) as *mut _ as *mut c_void,
)
};
Ok(())
} else {
Err("Failed to acquire lock in waku_set_event_callback!".to_string())
}
}
}

View File

@ -4,20 +4,18 @@
//! An example of an asynchronous event that might be emitted is receiving a message.
//! When an event is emitted, this callback will be triggered receiving an [`Event`]
// std
use std::ffi::c_void;
// crates
use serde::{Deserialize, Serialize};
// internal
use crate::general::WakuMessage;
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, LibwakuResponse};
use std::str;
use crate::MessageHash;
/// Waku event
/// For now just WakuMessage is supported
#[non_exhaustive]
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "eventType", rename_all = "camelCase")]
pub enum Event {
#[serde(rename = "message")]
@ -26,7 +24,7 @@ pub enum Event {
}
/// Type of `event` field for a `message` event
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received
@ -37,25 +35,6 @@ pub struct WakuMessageEvent {
pub waku_message: WakuMessage,
}
/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Event) + Send + Sync>(ctx: &WakuNodeContext, mut f: F) {
let cb = |response: LibwakuResponse| {
if let LibwakuResponse::Success(v) = response {
let data: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
f(data);
};
};
unsafe {
let mut closure = cb;
let cb = get_trampoline(&closure);
waku_sys::waku_set_event_callback(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
};
}
#[cfg(test)]
mod tests {
use crate::Event;

View File

@ -0,0 +1,100 @@
//! Waku filter protocol related methods
// std
use std::ffi::CString;
// crates
use libc::*;
// internal
use crate::general::contenttopic::WakuContentTopic;
use crate::general::pubsubtopic::PubsubTopic;
use crate::general::Result;
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};
pub fn waku_filter_subscribe(
ctx: &WakuNodeContext,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
let content_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let content_topics_ptr = CString::new(content_topics)
.expect("CString should build properly from content topic")
.into_raw();
let mut result: LibwakuResponse = Default::default();
let result_cb = |r: LibwakuResponse| result = r;
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_subscribe(
ctx.get_ptr(),
pubsub_topic_ptr,
content_topics_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_topics_ptr));
out
};
handle_no_response(code, result)
}
pub fn waku_filter_unsubscribe(
ctx: &WakuNodeContext,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
) -> Result<()> {
let content_topics_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let content_topics_topics_ptr = CString::new(content_topics_topics)
.expect("CString should build properly from content topic")
.into_raw();
let mut result: LibwakuResponse = Default::default();
let result_cb = |r: LibwakuResponse| result = r;
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_unsubscribe(
ctx.get_ptr(),
pubsub_topic_ptr,
content_topics_topics_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_topics_topics_ptr));
out
};
handle_no_response(code, result)
}
pub fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> {
let mut result: LibwakuResponse = Default::default();
let result_cb = |r: LibwakuResponse| result = r;
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_filter_unsubscribe_all(
ctx.get_ptr(),
cb,
&mut closure as *mut _ as *mut c_void,
)
};
handle_no_response(code, result)
}

View File

@ -0,0 +1,49 @@
//! Waku lightpush protocol related methods
// std
use std::ffi::CString;
// crates
use libc::*;
// internal
use crate::general::{MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_response, LibwakuResponse};
use crate::general::pubsubtopic::PubsubTopic;
pub fn waku_lightpush_publish_message(
ctx: &WakuNodeContext,
message: &WakuMessage,
pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> {
let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw();
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let mut result: LibwakuResponse = Default::default();
let result_cb = |r: LibwakuResponse| result = r;
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_lightpush_publish(
ctx.get_ptr(),
pubsub_topic_ptr,
message_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(message_ptr));
drop(CString::from_raw(pubsub_topic_ptr));
out
};
handle_response(code, result)
}

View File

@ -10,13 +10,17 @@ use super::config::WakuNodeConfig;
use crate::general::Result;
use crate::node::context::WakuNodeContext;
use crate::utils::LibwakuResponse;
use crate::utils::WakuDecode;
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Instantiates a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
let config = config.unwrap_or_default();
unsafe {
waku_sys::waku_setup();
}
let config = config.unwrap_or_default();
let config_ptr = CString::new(
serde_json::to_string(&config)
.expect("Serialization from properly built NodeConfig should never fail"),
@ -39,7 +43,7 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
match result {
LibwakuResponse::MissingCallback => panic!("callback is required"),
LibwakuResponse::Failure(v) => Err(v),
_ => Ok(WakuNodeContext { obj_ptr }),
_ => Ok(WakuNodeContext::new(obj_ptr)),
}
}
@ -49,7 +53,7 @@ pub fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> {
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_destroy(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
waku_sys::waku_destroy(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
};
handle_no_response(code, result)
@ -63,7 +67,7 @@ pub fn waku_start(ctx: &WakuNodeContext) -> Result<()> {
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_start(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
waku_sys::waku_start(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
};
handle_no_response(code, result)
@ -77,7 +81,7 @@ pub fn waku_stop(ctx: &WakuNodeContext) -> Result<()> {
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_stop(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
waku_sys::waku_stop(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
};
handle_no_response(code, result)
@ -91,12 +95,23 @@ pub fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_version(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
waku_sys::waku_version(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
};
handle_response(code, result)
}
// Implement WakuDecode for Vec<Multiaddr>
impl WakuDecode for Vec<Multiaddr> {
fn decode(input: &str) -> Result<Self> {
input
.split(',')
.map(|s| s.trim().parse::<Multiaddr>().map_err(|err| err.to_string()))
.collect::<Result<Vec<Multiaddr>>>() // Collect results into a Vec
.map_err(|err| format!("could not parse Multiaddr: {}", err))
}
}
/// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
@ -105,7 +120,7 @@ pub fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_listen_addresses(ctx.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
waku_sys::waku_listen_addresses(ctx.get_ptr(), cb, &mut closure as *mut _ as *mut c_void)
};
handle_json_response(code, result)
@ -137,6 +152,7 @@ mod test {
fn nwaku_version() {
let node = waku_new(None).unwrap();
let version = waku_version(&node).expect("should return the version");
print!("Current version: {}", version);
assert!(!version.is_empty());
}
}

View File

@ -3,6 +3,8 @@
mod config;
mod context;
mod events;
mod filter;
mod lightpush;
mod management;
mod peers;
mod relay;
@ -14,30 +16,27 @@ pub use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData;
use std::time::Duration;
// internal
use crate::general::contenttopic::{Encoding, WakuContentTopic};
pub use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{MessageHash, Result, WakuMessage};
use context::WakuNodeContext;
use crate::utils::LibwakuResponse;
use crate::node::context::WakuNodeContext;
pub use config::RLNConfig;
pub use config::WakuNodeConfig;
pub use events::{Event, WakuMessageEvent};
pub use relay::waku_create_content_topic;
/// Marker trait to disallow undesired waku node states in the handle
pub trait WakuNodeState {}
use std::time::SystemTime;
/// Waku node initialized state
// Define state marker types
pub struct Initialized;
/// Waku node running state
pub struct Running;
impl WakuNodeState for Initialized {}
impl WakuNodeState for Running {}
/// Handle to the underliying waku node
pub struct WakuNodeHandle<State: WakuNodeState> {
pub struct WakuNodeHandle<State> {
ctx: WakuNodeContext,
phantom: PhantomData<State>,
_state: PhantomData<State>,
}
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
@ -45,12 +44,21 @@ pub struct WakuNodeHandle<State: WakuNodeState> {
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
Ok(WakuNodeHandle {
ctx: management::waku_new(config)?,
phantom: PhantomData,
_state: PhantomData,
})
}
pub fn waku_destroy(node: WakuNodeHandle<Initialized>) -> Result<()> {
management::waku_destroy(&node.ctx)
impl<State> WakuNodeHandle<State> {
/// Get the nwaku version
pub fn version(&self) -> Result<String> {
management::waku_version(&self.ctx)
}
pub fn waku_destroy(self) -> Result<()> {
let res = management::waku_destroy(&self.ctx);
self.ctx.reset_ptr();
res
}
}
impl WakuNodeHandle<Initialized> {
@ -59,9 +67,16 @@ impl WakuNodeHandle<Initialized> {
pub fn start(self) -> Result<WakuNodeHandle<Running>> {
management::waku_start(&self.ctx).map(|_| WakuNodeHandle {
ctx: self.ctx,
phantom: PhantomData,
_state: PhantomData,
})
}
pub fn set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
&self,
closure: F,
) -> Result<()> {
self.ctx.waku_set_event_callback(closure)
}
}
impl WakuNodeHandle<Running> {
@ -70,7 +85,7 @@ impl WakuNodeHandle<Running> {
pub fn stop(self) -> Result<WakuNodeHandle<Initialized>> {
management::waku_stop(&self.ctx).map(|_| WakuNodeHandle {
ctx: self.ctx,
phantom: PhantomData,
_state: PhantomData,
})
}
@ -80,11 +95,6 @@ impl WakuNodeHandle<Running> {
management::waku_listen_addresses(&self.ctx)
}
/// Get the nwaku version
pub fn version(&self) -> Result<String> {
management::waku_version(&self.ctx)
}
/// Dial peer using a multiaddress
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
@ -94,29 +104,78 @@ impl WakuNodeHandle<Running> {
peers::waku_connect(&self.ctx, address, timeout)
}
pub fn relay_publish_txt(
&self,
pubsub_topic: &PubsubTopic,
msg_txt: &String,
content_topic_name: &'static str,
timeout: Option<Duration>,
) -> Result<MessageHash> {
let content_topic = WakuContentTopic::new("waku", "2", content_topic_name, Encoding::Proto);
let message = WakuMessage::new(
msg_txt,
content_topic,
0,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap(),
Vec::new(),
false,
);
relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout)
}
/// Publish a message using Waku Relay.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
timeout: Option<Duration>,
) -> Result<MessageHash> {
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout)
}
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, pubsub_topic: &String) -> Result<()> {
pub fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_subscribe(&self.ctx, pubsub_topic)
}
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: &String) -> Result<()> {
pub fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
}
pub fn set_event_callback<F: FnMut(Event) + Send + Sync + 'static>(&self, f: F) {
events::waku_set_event_callback(&self.ctx, f)
pub fn filter_subscribe(
&self,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics)
}
pub fn filter_unsubscribe(
&self,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics)
}
pub fn filter_unsubscribe_all(&self) -> Result<()> {
filter::waku_filter_unsubscribe_all(&self.ctx)
}
pub fn lightpush_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> {
lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic)
}
}

View File

@ -32,7 +32,7 @@ pub fn waku_connect(
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_connect(
ctx.obj_ptr,
ctx.get_ptr(),
address_ptr,
timeout
.map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX))

View File

@ -6,7 +6,9 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{Encoding, MessageHash, Result, WakuContentTopic, WakuMessage};
use crate::general::contenttopic::{Encoding, WakuContentTopic};
use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};
@ -36,7 +38,7 @@ pub fn waku_create_content_topic(
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_content_topic(
ctx.obj_ptr,
ctx.get_ptr(),
application_name_ptr,
application_version,
content_topic_name_ptr,
@ -60,18 +62,16 @@ pub fn waku_create_content_topic(
pub fn waku_relay_publish_message(
ctx: &WakuNodeContext,
message: &WakuMessage,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
timeout: Option<Duration>,
) -> Result<MessageHash> {
let pubsub_topic = pubsub_topic.to_string();
let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw();
let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
@ -81,7 +81,7 @@ pub fn waku_relay_publish_message(
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_publish(
ctx.obj_ptr,
ctx.get_ptr(),
pubsub_topic_ptr,
message_ptr,
timeout
@ -105,9 +105,8 @@ pub fn waku_relay_publish_message(
handle_response(code, result)
}
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
@ -117,7 +116,7 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Res
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
ctx.obj_ptr,
ctx.get_ptr(),
pubsub_topic_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
@ -131,9 +130,8 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Res
handle_no_response(code, result)
}
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
@ -143,7 +141,7 @@ pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> R
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
ctx.obj_ptr,
ctx.get_ptr(),
pubsub_topic_ptr,
cb,
&mut closure as *mut _ as *mut c_void,

View File

@ -1,6 +1,5 @@
use crate::general::Result;
use core::str::FromStr;
use serde::de::DeserializeOwned;
use std::convert::TryFrom;
use std::{slice, str};
use waku_sys::WakuCallBack;
@ -32,9 +31,13 @@ impl TryFrom<(u32, &str)> for LibwakuResponse {
}
}
pub fn decode<T: DeserializeOwned>(input: String) -> Result<T> {
serde_json::from_str(input.as_str())
.map_err(|err| format!("could not deserialize waku response: {}", err))
// Define the WakuDecode trait
pub trait WakuDecode: Sized {
fn decode(input: &str) -> Result<Self>;
}
pub fn decode<T: WakuDecode>(input: String) -> Result<T> {
T::decode(input.as_str())
}
unsafe extern "C" fn trampoline<F>(
@ -45,7 +48,7 @@ unsafe extern "C" fn trampoline<F>(
) where
F: FnMut(LibwakuResponse),
{
let user_data = &mut *(user_data as *mut F);
let closure = &mut *(user_data as *mut F);
let response = if data.is_null() {
""
@ -57,7 +60,7 @@ unsafe extern "C" fn trampoline<F>(
let result = LibwakuResponse::try_from((ret_code as u32, response))
.expect("invalid response obtained from libwaku");
user_data(result);
closure(result);
}
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack
@ -84,7 +87,7 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
}
}
pub fn handle_json_response<F: DeserializeOwned>(code: i32, result: LibwakuResponse) -> Result<F> {
pub fn handle_json_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> {
match result {
LibwakuResponse::Success(v) => decode(v.unwrap_or_default()),
LibwakuResponse::Failure(v) => Err(v),

View File

@ -1,16 +1,20 @@
use multiaddr::Multiaddr;
use regex::Regex;
use secp256k1::SecretKey;
use serial_test::serial;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8};
use tokio::sync::broadcast::{self, Sender};
use tokio::time;
use tokio::time::sleep;
use waku_bindings::node::PubsubTopic;
use waku_bindings::{
waku_destroy, waku_new, Encoding, Event, MessageHash, Running, WakuContentTopic, WakuMessage,
waku_new, Encoding, Event, Initialized, MessageHash, WakuContentTopic, WakuMessage,
WakuNodeConfig, WakuNodeHandle,
};
const ECHO_TIMEOUT: u64 = 10;
use waku_bindings::{LibwakuResponse, Running};
const ECHO_TIMEOUT: u64 = 1000;
const ECHO_MESSAGE: &str = "Hi from 🦀!";
const TEST_PUBSUBTOPIC: &str = "test";
@ -18,40 +22,79 @@ fn try_publish_relay_messages(
node: &WakuNodeHandle<Running>,
msg: &WakuMessage,
) -> Result<HashSet<MessageHash>, String> {
let topic = TEST_PUBSUBTOPIC.to_string();
Ok(HashSet::from([
node.relay_publish_message(msg, &topic, None)?
]))
}
#[derive(Debug, Clone)]
struct Response {
hash: MessageHash,
payload: Vec<u8>,
}
fn set_callback(node: &WakuNodeHandle<Running>, tx: Sender<Response>) {
node.set_event_callback(move |event| {
if let Event::WakuMessage(message) = event {
let hash = message.message_hash;
let message = message.waku_message;
let payload = message.payload.to_vec();
tx.send(Response {
hash: hash.to_string(),
payload,
})
.expect("send response to the receiver");
}
});
Ok(HashSet::from([node.relay_publish_message(
msg,
&PubsubTopic::new(TEST_PUBSUBTOPIC),
None,
)?]))
}
async fn test_echo_messages(
node1: &WakuNodeHandle<Running>,
node2: &WakuNodeHandle<Running>,
node1: WakuNodeHandle<Initialized>,
node2: WakuNodeHandle<Initialized>,
content: &'static str,
content_topic: WakuContentTopic,
) {
) -> Result<(), String> {
// setting a naïve event handler to avoid appearing ERR messages in logs
node1
.set_event_callback(&|_| {})
.expect("set event call back working");
let rx_waku_message: Arc<Mutex<WakuMessage>> = Arc::new(Mutex::new(WakuMessage::default()));
let rx_waku_message_cloned = rx_waku_message.clone();
let closure = move |response| {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
match event {
Event::WakuMessage(evt) => {
if let Ok(mut msg_lock) = rx_waku_message_cloned.lock() {
*msg_lock = evt.waku_message;
}
}
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
};
println!("Before setting event callback");
node2
.set_event_callback(closure)
.expect("set event call back working"); // Set the event callback with the closure
let node1 = node1.start()?;
let node2 = node2.start()?;
node1
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.unwrap();
node2
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.unwrap();
sleep(Duration::from_secs(3)).await;
// Interconnect nodes
// Replace all matches with 127.0.0.1 to avoid issue with NAT or firewall.
let addresses1 = node1.listen_addresses().unwrap();
let addresses1 = &addresses1[0].to_string();
let re = Regex::new(r"\b(?:\d{1,3}\.){3}\d{1,3}\b").unwrap();
let addresses1 = re.replace_all(addresses1, "127.0.0.1").to_string();
let addresses1 = addresses1.parse::<Multiaddr>().expect("parse multiaddress");
println!("Connecting node1 to node2: {}", addresses1);
node2.connect(&addresses1, None).unwrap();
// Wait for mesh to form
sleep(Duration::from_secs(3)).await;
dbg!("Before publish");
let message = WakuMessage::new(
content,
content_topic,
@ -65,51 +108,47 @@ async fn test_echo_messages(
Vec::new(),
false,
);
let _ids = try_publish_relay_messages(&node1, &message).expect("send relay messages");
node1.set_event_callback(move |_event| {});
let (tx, mut rx) = broadcast::channel(1);
set_callback(node2, tx);
let mut ids = try_publish_relay_messages(node1, &message).expect("send relay messages");
while let Ok(res) = rx.recv().await {
if ids.take(&res.hash).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message");
assert_eq!(content, msg);
}
if ids.is_empty() {
break;
// Wait for the msg to arrive
let rx_waku_message_cloned = rx_waku_message.clone();
for _ in 0..50 {
if let Ok(msg) = rx_waku_message_cloned.lock() {
// dbg!("The waku message value is: {:?}", msg);
let payload = msg.payload.to_vec();
let payload_str = from_utf8(&payload).expect("should be valid message");
if payload_str == ECHO_MESSAGE {
node1.stop()?;
node2.stop()?;
return Ok(());
}
} else {
sleep(Duration::from_millis(100)).await;
}
}
let node1 = node1.stop()?;
let node2 = node2.stop()?;
node1.waku_destroy()?;
node2.waku_destroy()?;
return Err("Unexpected test ending".to_string());
}
#[tokio::test]
#[serial]
async fn default_echo() -> Result<(), String> {
println!("Test default_echo");
let node1 = waku_new(Some(WakuNodeConfig {
port: Some(60010),
tcp_port: Some(60010),
..Default::default()
}))?;
let node2 = waku_new(Some(WakuNodeConfig {
port: Some(60020),
tcp_port: Some(60020),
..Default::default()
}))?;
let node1 = node1.start()?;
let node2 = node2.start()?;
let addresses1 = node1.listen_addresses()?;
node2.connect(&addresses1[0], None)?;
let topic = TEST_PUBSUBTOPIC.to_string();
node1.relay_subscribe(&topic)?;
node2.relay_subscribe(&topic)?;
// Wait for mesh to form
sleep(Duration::from_secs(3)).await;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));
@ -118,17 +157,11 @@ async fn default_echo() -> Result<(), String> {
// Send and receive messages. Waits until all messages received.
let got_all = tokio::select! {
_ = sleep => false,
_ = test_echo_messages(&node1, &node2, ECHO_MESSAGE, content_topic) => true,
_ = test_echo_messages(node1, node2, ECHO_MESSAGE, content_topic) => true,
};
assert!(got_all);
let node2 = node2.stop()?;
let node1 = node1.stop()?;
waku_destroy(node1)?;
waku_destroy(node2)?;
Ok(())
}
@ -145,9 +178,8 @@ fn node_restart() {
for _ in 0..3 {
let node = waku_new(config.clone().into()).expect("default config should be valid");
let node = node.start().expect("node should start with valid config");
node.stop().expect("node should stop");
let node = node.stop().expect("node should stop");
node.waku_destroy().expect("free resources");
}
}

View File

@ -36,4 +36,4 @@ crate-type = ["rlib"]
[build-dependencies]
bindgen = "0.64"
cc = "1.0"
cc = "1.0.73"

View File

@ -38,6 +38,7 @@ fn generate_bindgen_code(project_dir: &Path) {
vendor_path.join("build").display()
);
println!("cargo:rustc-link-lib=static=waku");
println!(
"cargo:rustc-link-search={}",
vendor_path
@ -45,6 +46,7 @@ fn generate_bindgen_code(project_dir: &Path) {
.display()
);
println!("cargo:rustc-link-lib=static=miniupnpc");
println!(
"cargo:rustc-link-search={}",
vendor_path
@ -52,8 +54,10 @@ fn generate_bindgen_code(project_dir: &Path) {
.display()
);
println!("cargo:rustc-link-lib=static=natpmp");
println!("cargo:rustc-link-lib=dl");
println!("cargo:rustc-link-lib=m");
println!(
"cargo:rustc-link-search=native={}",
vendor_path
@ -62,7 +66,25 @@ fn generate_bindgen_code(project_dir: &Path) {
);
println!("cargo:rustc-link-lib=static=backtrace");
println!("cargo:rustc-link-lib=stdc++");
println!(
"cargo:rustc-link-search={}",
vendor_path.join("vendor/negentropy/cpp").display()
);
println!("cargo:rustc-link-lib=static=negentropy");
println!("cargo:rustc-link-lib=ssl");
println!("cargo:rustc-link-lib=crypto");
cc::Build::new()
.file("src/cmd.c") // Compile the C file
.compile("cmditems"); // Compile it as a library
println!("cargo:rustc-link-lib=static=cmditems");
// TODO: Determine if pthread is automatically included
println!("cargo:rustc-link-lib=pthread");
// TODO: Test in other architectures
// Generate waku bindings with bindgen

13
waku-sys/src/cmd.c Normal file
View File

@ -0,0 +1,13 @@
/*
This file is needed to avoid errors like the following when linking the waku-sys lib crate:
<<undefined reference to `cmdCount'>>
and
<<undefined reference to `cmdLine'>>
*/
#include <stdio.h>
int cmdCount = 0;
char** cmdLine = NULL;

@ -1 +1 @@
Subproject commit 964d7ab7dc3dc38c9a05087b998a0cc7a1475cc0
Subproject commit d814519578380bf01398c29424a5fd1005ed3a29