new PubsubTopic type

This commit is contained in:
Ivan Folgueira Bande 2024-11-27 23:20:58 +01:00
parent 2fd169bda6
commit 8cf8c41f19
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
10 changed files with 65 additions and 44 deletions

View File

@ -3,7 +3,8 @@ use std::str::from_utf8;
use std::time::SystemTime;
use tokio::time::{sleep, Duration};
use waku::{
waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic, WakuMessage, WakuNodeConfig,
general::pubsubtopic::PubsubTopic, waku_new, Encoding, Event, LibwakuResponse,
WakuContentTopic, WakuMessage, WakuNodeConfig,
};
#[tokio::main]
@ -73,7 +74,7 @@ async fn main() -> Result<(), Error> {
// ========================================================================
// Subscribe to pubsub topic
let topic = "test".to_string();
let topic = PubsubTopic::new("test");
node1
.relay_subscribe(&topic)

View File

@ -7,7 +7,8 @@ use std::time::{SystemTime, Duration};
use tokio::sync::mpsc;
use waku::{
waku_new, Encoding, Event, LibwakuResponse, WakuContentTopic,
WakuMessage, WakuNodeConfig, WakuNodeHandle, Initialized, Running
WakuMessage, WakuNodeConfig, WakuNodeHandle, Initialized, Running,
general::pubsubtopic::PubsubTopic,
};
#[derive(Serialize, Deserialize, PartialEq, Debug, Copy, Clone)]
@ -26,7 +27,7 @@ struct GameState {
struct TicTacToeApp<State> {
game_state: Arc<Mutex<GameState>>,
waku: WakuNodeHandle<State>,
game_topic: &'static str,
game_topic: PubsubTopic,
tx: mpsc::Sender<String>, // Sender to send `msg` to main thread
player_role: Option<Player>, // Store the player's role (X or O)
}
@ -34,7 +35,7 @@ struct TicTacToeApp<State> {
impl TicTacToeApp<Initialized> {
fn new(
waku: WakuNodeHandle<Initialized>,
game_topic: &'static str,
game_topic: PubsubTopic,
game_state: Arc<Mutex<GameState>>,
tx: mpsc::Sender<String>,
) -> Self {
@ -92,7 +93,7 @@ impl TicTacToeApp<Initialized> {
let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let content_topics = vec![ctopic];
waku.filter_subscribe(&self.game_topic.to_string(), content_topics).expect("waku should subscribe");
waku.filter_subscribe(&self.game_topic, content_topics).expect("waku should subscribe");
// Connect to hard-coded node
// let target_node_multi_addr =
@ -135,7 +136,7 @@ impl TicTacToeApp<Running> {
// self.waku.relay_publish_message(&message, &self.game_topic.to_string(), None)
// .expect("Failed to send message");
self.waku.lightpush_publish_message(&message, &self.game_topic.to_string()).expect("Failed to send message");
self.waku.lightpush_publish_message(&message, &self.game_topic).expect("Failed to send message");
}
fn make_move(&mut self, row: usize, col: usize) {
@ -306,7 +307,7 @@ impl eframe::App for TicTacToeApp<Running> {
async fn main() -> eframe::Result<()> {
let (tx, mut rx) = mpsc::channel::<String>(3200); // Channel to communicate between threads
let game_topic = "/waku/2/rs/16/32";
let game_topic = PubsubTopic::new("/waku/2/rs/16/32");
// Create a Waku instance
let waku = waku_new(Some(WakuNodeConfig {
tcp_port: Some(60010),
@ -314,7 +315,7 @@ async fn main() -> eframe::Result<()> {
shards: vec![1, 32, 64, 128, 256],
// node_key: Some(SecretKey::from_str("2fc0515879e52b7b73297cfd6ab3abf7c344ef84b7a90ff6f4cc19e05a198027").unwrap()),
max_message_size: Some("1024KiB".to_string()),
relay_topics: vec![game_topic.to_string()],
relay_topics: vec![String::from(&game_topic)],
log_level: Some("DEBUG"), // Supported: TRACE, DEBUG, INFO, NOTICE, WARN, ERROR or FATAL
keep_alive: Some(true),

View File

@ -1,6 +1,7 @@
//! Waku [general](https://rfc.vac.dev/spec/36/#general) types
pub mod contenttopic;
pub mod pubsubtopic;
// crates
use contenttopic::WakuContentTopic;

View File

@ -0,0 +1,16 @@
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PubsubTopic(String);
impl PubsubTopic {
// Constructor to create a new MyString
pub fn new(value: &str) -> Self {
PubsubTopic(value.to_string())
}
}
// to allow conversion from `PubsubTopic` to `String`
impl From<&PubsubTopic> for String {
fn from(topic: &PubsubTopic) -> Self {
topic.0.to_string()
}
}

View File

@ -1,7 +1,7 @@
//! # Waku
//!
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
mod general;
pub mod general;
pub mod node;
pub mod utils;

View File

@ -6,19 +6,19 @@ use std::ffi::CString;
use libc::*;
// internal
use crate::general::contenttopic::WakuContentTopic;
use crate::general::pubsubtopic::PubsubTopic;
use crate::general::Result;
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, LibwakuResponse};
pub fn waku_filter_subscribe(
ctx: &WakuNodeContext,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let content_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let content_topics_ptr = CString::new(content_topics)
@ -49,13 +49,12 @@ pub fn waku_filter_subscribe(
pub fn waku_filter_unsubscribe(
ctx: &WakuNodeContext,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let content_topics_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
let content_topics_topics_ptr = CString::new(content_topics_topics)

View File

@ -9,20 +9,20 @@ use crate::general::{MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_response, LibwakuResponse};
use crate::general::pubsubtopic::PubsubTopic;
pub fn waku_lightpush_publish_message(
ctx: &WakuNodeContext,
message: &WakuMessage,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> {
let pubsub_topic = pubsub_topic.to_string();
let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw();
let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();

View File

@ -17,6 +17,7 @@ use std::marker::PhantomData;
use std::time::Duration;
// internal
use crate::general::contenttopic::{Encoding, WakuContentTopic};
pub use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{MessageHash, Result, WakuMessage};
use crate::utils::LibwakuResponse;
@ -105,7 +106,7 @@ impl WakuNodeHandle<Running> {
pub fn relay_publish_txt(
&self,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
msg_txt: &String,
content_topic_name: &'static str,
timeout: Option<Duration>,
@ -134,25 +135,25 @@ impl WakuNodeHandle<Running> {
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
timeout: Option<Duration>,
) -> Result<MessageHash> {
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout)
}
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, pubsub_topic: &String) -> Result<()> {
pub fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_subscribe(&self.ctx, pubsub_topic)
}
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: &String) -> Result<()> {
pub fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
}
pub fn filter_subscribe(
&self,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics)
@ -160,7 +161,7 @@ impl WakuNodeHandle<Running> {
pub fn filter_unsubscribe(
&self,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics)
@ -173,7 +174,7 @@ impl WakuNodeHandle<Running> {
pub fn lightpush_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: &String,
pubsub_topic: &PubsubTopic,
) -> Result<MessageHash> {
lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic)
}

View File

@ -7,6 +7,7 @@ use std::time::Duration;
use libc::*;
// internal
use crate::general::contenttopic::{Encoding, WakuContentTopic};
use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_no_response, handle_response, LibwakuResponse};
@ -61,18 +62,16 @@ pub fn waku_create_content_topic(
pub fn waku_relay_publish_message(
ctx: &WakuNodeContext,
message: &WakuMessage,
pubsub_topic: &str,
pubsub_topic: &PubsubTopic,
timeout: Option<Duration>,
) -> Result<MessageHash> {
let pubsub_topic = pubsub_topic.to_string();
let message_ptr = CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw();
let pubsub_topic_ptr = CString::new(pubsub_topic)
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
@ -106,9 +105,8 @@ pub fn waku_relay_publish_message(
handle_response(code, result)
}
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &str) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();
@ -132,9 +130,8 @@ pub fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &str) -> Result
handle_no_response(code, result)
}
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &String) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
pub fn waku_relay_unsubscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> {
let pubsub_topic_ptr = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic")
.into_raw();

View File

@ -8,6 +8,7 @@ use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8};
use tokio::time;
use tokio::time::sleep;
use waku_bindings::node::PubsubTopic;
use waku_bindings::{
waku_new, Encoding, Event, Initialized, MessageHash, WakuContentTopic, WakuMessage,
WakuNodeConfig, WakuNodeHandle,
@ -21,10 +22,11 @@ fn try_publish_relay_messages(
node: &WakuNodeHandle<Running>,
msg: &WakuMessage,
) -> Result<HashSet<MessageHash>, String> {
let topic = TEST_PUBSUBTOPIC.to_string();
Ok(HashSet::from([
node.relay_publish_message(msg, &topic, None)?
]))
Ok(HashSet::from([node.relay_publish_message(
msg,
&PubsubTopic::new(TEST_PUBSUBTOPIC),
None,
)?]))
}
async fn test_echo_messages(
@ -67,9 +69,12 @@ async fn test_echo_messages(
let node1 = node1.start()?;
let node2 = node2.start()?;
let topic = TEST_PUBSUBTOPIC.to_string();
node1.relay_subscribe(&topic).unwrap();
node2.relay_subscribe(&topic).unwrap();
node1
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.unwrap();
node2
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.unwrap();
sleep(Duration::from_secs(3)).await;