Update Deps and fixes

This commit is contained in:
SionoiS 2025-09-03 12:33:41 -04:00
parent 57505a1c06
commit a5f30f2aa8
No known key found for this signature in database
GPG Key ID: C9458A8CB1852951
18 changed files with 1352 additions and 1642 deletions

2533
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -2,20 +2,20 @@ use std::io::Error;
use std::str::from_utf8; use std::str::from_utf8;
use tokio::time::{sleep, Duration}; use tokio::time::{sleep, Duration};
use waku::{ use waku::{
general::pubsubtopic::PubsubTopic, waku_new, Encoding, LibwakuResponse, WakuContentTopic, Encoding, LibwakuResponse, WakuContentTopic, WakuEvent, WakuMessage, WakuNodeConfig,
WakuEvent, WakuMessage, WakuNodeConfig, WakuNodeHandle,
}; };
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Error> { async fn main() -> Result<(), Error> {
let node1 = waku_new(Some(WakuNodeConfig { let node1 = WakuNodeHandle::new(Some(WakuNodeConfig {
tcp_port: Some(60010), // TODO: use any available port. tcp_port: Some(60010), // TODO: use any available port.
..Default::default() ..Default::default()
})) }))
.await .await
.expect("should instantiate"); .expect("should instantiate");
let node2 = waku_new(Some(WakuNodeConfig { let node2 = WakuNodeHandle::new(Some(WakuNodeConfig {
tcp_port: Some(60020), // TODO: use any available port. tcp_port: Some(60020), // TODO: use any available port.
..Default::default() ..Default::default()
})) }))
@ -87,15 +87,15 @@ async fn main() -> Result<(), Error> {
// ======================================================================== // ========================================================================
// Subscribe to pubsub topic // Subscribe to pubsub topic
let topic = PubsubTopic::new("test"); let topic = "test";
node1 node1
.relay_subscribe(&topic) .relay_subscribe(topic)
.await .await
.expect("node1 should subscribe"); .expect("node1 should subscribe");
node2 node2
.relay_subscribe(&topic) .relay_subscribe(topic)
.await .await
.expect("node2 should subscribe"); .expect("node2 should subscribe");
@ -121,9 +121,14 @@ async fn main() -> Result<(), Error> {
// Publish a message // Publish a message
let content_topic = WakuContentTopic::new("waku", "2", "test", Encoding::Proto); let content_topic = WakuContentTopic::new("waku", "2", "test", Encoding::Proto);
let message = WakuMessage::new("Hello world", content_topic, 0, Vec::new(), false); let message = WakuMessage {
payload: "Hello world".to_string().into_bytes(),
content_topic,
..Default::default()
};
node1 node1
.relay_publish_message(&message, &topic, None) .relay_publish_message(&message, topic, None)
.await .await
.expect("should have sent the message"); .expect("should have sent the message");

View File

@ -16,28 +16,29 @@ categories = ["network-programming"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
aes-gcm = { version = "0.10", features = ["aes"] } aes-gcm = { version = "0.10", features = ["aes"] }
base64 = "0.21" base64 = "0.22"
enr = { version = "0.7", features = ["serde", "rust-secp256k1"] }
hex = "0.4"
multiaddr = "0.17"
once_cell = "1.15"
rand = "0.8"
secp256k1 = { version = "0.26", features = ["rand", "recovery", "serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sscanf = "0.4"
smart-default = "0.6"
url = "2.3"
waku-sys = { version = "1.0.0", path = "../waku-sys" }
libc = "0.2"
serde-aux = "4.3.1"
rln = "0.3.4"
tokio = { version = "1", features = ["full"] }
regex = "1"
chrono = "0.4" chrono = "0.4"
uuid = { version = "1.3", features = ["v4"] } enr = { version = "0.13", features = ["serde", "rust-secp256k1"] }
hex = "0.4"
libc = "0.2"
multiaddr = "0.18"
once_cell = "1"
rand = "0.9"
regex = "1"
rln = "0.8"
secp256k1 = { version = "0.31", features = ["rand", "recovery", "serde"] }
serde = { version = "1", features = ["derive"] }
serde-aux = "4"
serde_json = "1"
sscanf = "0.4"
smart-default = "0.7"
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = [] }
url = "2"
uuid = { version = "1", features = ["v4"] }
waku-sys = { version = "1", path = "../waku-sys" }
[dev-dependencies] [dev-dependencies]
futures = "0.3.25" futures = "0.3"
serial_test = "1.0.0" serial_test = "3"
tokio = { version = "1.24.2", features = ["macros", "rt", "sync", "time"] } tokio = { version = "1", features = ["full"] }

View File

@ -32,7 +32,7 @@ impl TryFrom<(u32, &str)> for LibwakuResponse {
/// Used in cases where the FFI call doesn't return additional information in the /// Used in cases where the FFI call doesn't return additional information in the
/// callback. Instead, it returns RET_OK, RET_ERR, etc. /// callback. Instead, it returns RET_OK, RET_ERR, etc.
pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> { pub(crate) fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
if result == LibwakuResponse::Undefined && code as u32 == RET_OK { if result == LibwakuResponse::Undefined && code as u32 == RET_OK {
// Some functions will only execute the callback on error // Some functions will only execute the callback on error
return Ok(()); return Ok(());
@ -51,7 +51,7 @@ pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
/// Used in cases where the FFI function returns a code (RET_OK, RET_ERR, etc) plus additional /// Used in cases where the FFI function returns a code (RET_OK, RET_ERR, etc) plus additional
/// information, i.e. LibwakuResponse /// information, i.e. LibwakuResponse
pub fn handle_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> { pub(crate) fn handle_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -> Result<F> {
match result { match result {
LibwakuResponse::Success(v) => WakuDecode::decode(&v.unwrap_or_default()), LibwakuResponse::Success(v) => WakuDecode::decode(&v.unwrap_or_default()),
LibwakuResponse::Failure(v) => Err(v), LibwakuResponse::Failure(v) => Err(v),

View File

@ -32,15 +32,15 @@ pub struct WakuMessage {
#[serde(default)] #[serde(default)]
pub version: WakuMessageVersion, pub version: WakuMessageVersion,
/// Unix timestamp in nanoseconds /// Unix timestamp in nanoseconds
#[serde(deserialize_with = "deserialize_number_from_string")] #[serde(
deserialize_with = "deserialize_number_from_string",
default = "get_now_in_nanosecs"
)]
pub timestamp: u64, pub timestamp: u64,
#[serde(with = "base64_serde", default = "Vec::new")] #[serde(with = "base64_serde", default = "Vec::new")]
pub meta: Vec<u8>, pub meta: Vec<u8>,
#[serde(default)] #[serde(default)]
pub ephemeral: bool, pub ephemeral: bool,
// TODO: implement RLN fields
#[serde(flatten)]
_extras: serde_json::Value,
} }
#[derive(Clone, Serialize, Deserialize, Debug, Default)] #[derive(Clone, Serialize, Deserialize, Debug, Default)]
@ -63,39 +63,6 @@ pub struct WakuStoreRespMessage {
pub proof: Vec<u8>, pub proof: Vec<u8>,
} }
impl WakuMessage {
pub fn new<PAYLOAD: AsRef<[u8]>, META: AsRef<[u8]>>(
payload: PAYLOAD,
content_topic: WakuContentTopic,
version: WakuMessageVersion,
meta: META,
ephemeral: bool,
) -> Self {
let payload = payload.as_ref().to_vec();
let meta = meta.as_ref().to_vec();
Self {
payload,
content_topic,
version,
timestamp: get_now_in_nanosecs(),
meta,
ephemeral,
_extras: Default::default(),
}
}
pub fn payload(&self) -> &[u8] {
&self.payload
}
}
impl WakuStoreRespMessage {
pub fn payload(&self) -> &[u8] {
&self.payload
}
}
mod base64_serde { mod base64_serde {
use base64::Engine; use base64::Engine;
use serde::de::Error; use serde::de::Error;

View File

@ -4,16 +4,20 @@ use serde::{Deserialize, Serialize};
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct PubsubTopic(String); pub struct PubsubTopic(String);
impl PubsubTopic { impl From<String> for PubsubTopic {
// Constructor to create a new MyString fn from(value: String) -> Self {
pub fn new(value: &str) -> Self { PubsubTopic(value)
}
}
impl From<&str> for PubsubTopic {
fn from(value: &str) -> Self {
PubsubTopic(value.to_string()) PubsubTopic(value.to_string())
} }
} }
// to allow conversion from `PubsubTopic` to `String` impl Into<Vec<u8>> for PubsubTopic {
impl From<&PubsubTopic> for String { fn into(self) -> Vec<u8> {
fn from(topic: &PubsubTopic) -> Self { self.0.into()
topic.0.to_string()
} }
} }

View File

@ -15,8 +15,8 @@ pub use general::libwaku_response::LibwakuResponse;
use rln; use rln;
pub use node::{ pub use node::{
waku_create_content_topic, waku_new, Initialized, Key, Multiaddr, PublicKey, RLNConfig, Initialized, Key, Multiaddr, PublicKey, RLNConfig, Running, SecretKey, WakuEvent,
Running, SecretKey, WakuEvent, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
}; };
pub use general::contenttopic::{Encoding, WakuContentTopic}; pub use general::contenttopic::{Encoding, WakuContentTopic};

View File

@ -26,7 +26,7 @@ unsafe extern "C" fn trampoline<F>(
closure(result); closure(result);
} }
pub fn get_trampoline<F>(_closure: &F) -> WakuCallBack pub(crate) fn get_trampoline<F>(_closure: &F) -> WakuCallBack
where where
F: FnMut(LibwakuResponse), F: FnMut(LibwakuResponse),
{ {

View File

@ -7,13 +7,13 @@ use crate::macros::get_trampoline;
type LibwakuResponseClosure = dyn FnMut(LibwakuResponse) + Send + Sync; type LibwakuResponseClosure = dyn FnMut(LibwakuResponse) + Send + Sync;
pub struct WakuNodeContext { pub(crate) struct WakuNodeContext {
obj_ptr: *mut c_void, obj_ptr: *mut c_void,
msg_observer: Arc<Mutex<Box<LibwakuResponseClosure>>>, msg_observer: Arc<Mutex<Box<LibwakuResponseClosure>>>,
} }
impl WakuNodeContext { impl WakuNodeContext {
pub fn new(obj_ptr: *mut c_void) -> Self { pub(crate) fn new(obj_ptr: *mut c_void) -> Self {
let me = Self { let me = Self {
obj_ptr, obj_ptr,
msg_observer: Arc::new(Mutex::new(Box::new(|_| {}))), msg_observer: Arc::new(Mutex::new(Box::new(|_| {}))),
@ -31,17 +31,17 @@ impl WakuNodeContext {
panic!("callback not set. Please use waku_set_event_callback to set a valid callback") panic!("callback not set. Please use waku_set_event_callback to set a valid callback")
} }
pub fn get_ptr(&self) -> *mut c_void { pub(crate) fn get_ptr(&self) -> *mut c_void {
self.obj_ptr self.obj_ptr
} }
pub fn reset_ptr(mut self) { pub(crate) fn reset_ptr(mut self) {
self.obj_ptr = null_mut(); self.obj_ptr = null_mut();
} }
/// Register callback to act as event handler and receive application events, /// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku /// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>( pub(crate) fn waku_set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
&self, &self,
closure: F, closure: F,
) -> Result<(), String> { ) -> Result<(), String> {

View File

@ -10,13 +10,13 @@ use crate::general::Result;
use crate::handle_ffi_call; use crate::handle_ffi_call;
use crate::node::context::WakuNodeContext; use crate::node::context::WakuNodeContext;
pub async fn waku_filter_subscribe( pub(crate) async fn waku_filter_subscribe(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
pubsub_topic: &PubsubTopic, pubsub_topic: PubsubTopic,
content_topics: Vec<WakuContentTopic>, content_topics: Vec<WakuContentTopic>,
) -> Result<()> { ) -> Result<()> {
let pubsub_topic = CString::new(String::from(pubsub_topic)) let pubsub_topic =
.expect("CString should build properly from pubsub topic"); CString::new(pubsub_topic).expect("CString should build properly from pubsub topic");
let content_topics = WakuContentTopic::join_content_topics(content_topics); let content_topics = WakuContentTopic::join_content_topics(content_topics);
let content_topics = let content_topics =
CString::new(content_topics).expect("CString should build properly from content topic"); CString::new(content_topics).expect("CString should build properly from content topic");
@ -30,13 +30,13 @@ pub async fn waku_filter_subscribe(
) )
} }
pub async fn waku_filter_unsubscribe( pub(crate) async fn waku_filter_unsubscribe(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
pubsub_topic: &PubsubTopic, pubsub_topic: PubsubTopic,
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
) -> Result<()> { ) -> Result<()> {
let pubsub_topic = CString::new(String::from(pubsub_topic)) let pubsub_topic =
.expect("CString should build properly from pubsub topic"); CString::new(pubsub_topic).expect("CString should build properly from pubsub topic");
let content_topics = WakuContentTopic::join_content_topics(content_topics); let content_topics = WakuContentTopic::join_content_topics(content_topics);
let content_topics = let content_topics =
CString::new(content_topics).expect("CString should build properly from content topic"); CString::new(content_topics).expect("CString should build properly from content topic");
@ -50,7 +50,7 @@ pub async fn waku_filter_unsubscribe(
) )
} }
pub async fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> { pub(crate) async fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> {
handle_ffi_call!( handle_ffi_call!(
waku_sys::waku_filter_unsubscribe_all, waku_sys::waku_filter_unsubscribe_all,
handle_no_response, handle_no_response,

View File

@ -10,10 +10,10 @@ use crate::node::context::WakuNodeContext;
use crate::general::pubsubtopic::PubsubTopic; use crate::general::pubsubtopic::PubsubTopic;
pub async fn waku_lightpush_publish_message( pub(crate) async fn waku_lightpush_publish_message(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: PubsubTopic,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
let message = CString::new( let message = CString::new(
serde_json::to_string(&message) serde_json::to_string(&message)
@ -21,8 +21,8 @@ pub async fn waku_lightpush_publish_message(
) )
.expect("CString should build properly from the serialized waku message"); .expect("CString should build properly from the serialized waku message");
let pubsub_topic = CString::new(String::from(pubsub_topic)) let pubsub_topic =
.expect("CString should build properly from pubsub topic"); CString::new(pubsub_topic).expect("CString should build properly from pubsub topic");
handle_ffi_call!( handle_ffi_call!(
waku_sys::waku_lightpush_publish, waku_sys::waku_lightpush_publish,

View File

@ -17,10 +17,9 @@ use crate::node::context::WakuNodeContext;
/// Instantiates a Waku node /// Instantiates a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> { pub(crate) async fn waku_new(config: &WakuNodeConfig) -> Result<WakuNodeContext> {
let config = config.unwrap_or_default();
let config = CString::new( let config = CString::new(
serde_json::to_string(&config) serde_json::to_string(config)
.expect("Serialization from properly built NodeConfig should never fail"), .expect("Serialization from properly built NodeConfig should never fail"),
) )
.expect("CString should build properly from the config"); .expect("CString should build properly from the config");
@ -48,30 +47,30 @@ pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext>
} }
} }
pub async fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> { pub(crate) async fn waku_destroy(ctx: &WakuNodeContext) -> Result<()> {
handle_ffi_call!(waku_sys::waku_destroy, handle_no_response, ctx.get_ptr()) handle_ffi_call!(waku_sys::waku_destroy, handle_no_response, ctx.get_ptr())
} }
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub async fn waku_start(ctx: &WakuNodeContext) -> Result<()> { pub(crate) async fn waku_start(ctx: &WakuNodeContext) -> Result<()> {
handle_ffi_call!(waku_sys::waku_start, handle_no_response, ctx.get_ptr()) handle_ffi_call!(waku_sys::waku_start, handle_no_response, ctx.get_ptr())
} }
/// Stops a Waku node /// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub async fn waku_stop(ctx: &WakuNodeContext) -> Result<()> { pub(crate) async fn waku_stop(ctx: &WakuNodeContext) -> Result<()> {
handle_ffi_call!(waku_sys::waku_stop, handle_no_response, ctx.get_ptr()) handle_ffi_call!(waku_sys::waku_stop, handle_no_response, ctx.get_ptr())
} }
/// nwaku version /// nwaku version
pub async fn waku_version(ctx: &WakuNodeContext) -> Result<String> { pub(crate) async fn waku_version(ctx: &WakuNodeContext) -> Result<String> {
handle_ffi_call!(waku_sys::waku_version, handle_response, ctx.get_ptr()) handle_ffi_call!(waku_sys::waku_version, handle_response, ctx.get_ptr())
} }
/// Get the multiaddresses the Waku node is listening to /// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> { pub(crate) async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiaddr>> {
handle_ffi_call!( handle_ffi_call!(
waku_sys::waku_listen_addresses, waku_sys::waku_listen_addresses,
handle_response, handle_response,
@ -81,40 +80,34 @@ pub async fn waku_listen_addresses(ctx: &WakuNodeContext) -> Result<Vec<Multiadd
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::waku_new; use crate::WakuNodeHandle;
use crate::node::management::{
waku_destroy, waku_listen_addresses, waku_start, waku_stop, waku_version,
};
use serial_test::serial; use serial_test::serial;
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn waku_flow() { async fn waku_flow() {
let node = waku_new(None).await.unwrap(); let node = WakuNodeHandle::new(None).await.unwrap();
let node = node.start().await.unwrap();
waku_start(&node).await.unwrap();
// test addresses // test addresses
let addresses = waku_listen_addresses(&node).await.unwrap(); let addresses = node.listen_addresses().await.unwrap();
dbg!(&addresses); dbg!(&addresses);
assert!(!addresses.is_empty()); assert!(!addresses.is_empty());
waku_stop(&node).await.unwrap(); let node = node.stop().await.unwrap();
waku_destroy(&node).await.unwrap(); node.waku_destroy().await.unwrap();
} }
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn nwaku_version() { async fn nwaku_version() {
let node = waku_new(None).await.unwrap(); let node = WakuNodeHandle::new(None).await.unwrap();
let version = waku_version(&node) let version = node.version().await.expect("should return the version");
.await
.expect("should return the version");
print!("Current version: {}", version); print!("Current version: {}", version);
assert!(!version.is_empty()); assert!(!version.is_empty());
waku_destroy(&node).await.unwrap(); node.waku_destroy().await.unwrap();
} }
} }

View File

@ -17,8 +17,11 @@ pub use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData; use std::marker::PhantomData;
use std::time::Duration; use std::time::Duration;
use store::{StoreQueryRequest, StoreWakuMessageResponse}; use store::{StoreQueryRequest, StoreWakuMessageResponse};
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
use tokio_stream::Stream;
// internal // internal
use crate::general::contenttopic::{Encoding, WakuContentTopic}; use crate::general::contenttopic::WakuContentTopic;
use crate::general::libwaku_response::LibwakuResponse; use crate::general::libwaku_response::LibwakuResponse;
pub use crate::general::pubsubtopic::PubsubTopic; pub use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{messagehash::MessageHash, Result, WakuMessage}; use crate::general::{messagehash::MessageHash, Result, WakuMessage};
@ -27,7 +30,6 @@ use crate::node::context::WakuNodeContext;
pub use config::RLNConfig; pub use config::RLNConfig;
pub use config::WakuNodeConfig; pub use config::WakuNodeConfig;
pub use events::{WakuEvent, WakuMessageEvent}; pub use events::{WakuEvent, WakuMessageEvent};
pub use relay::waku_create_content_topic;
// Define state marker types // Define state marker types
pub struct Initialized; pub struct Initialized;
@ -36,18 +38,10 @@ pub struct Running;
/// Handle to the underliying waku node /// Handle to the underliying waku node
pub struct WakuNodeHandle<State> { pub struct WakuNodeHandle<State> {
ctx: WakuNodeContext, ctx: WakuNodeContext,
config: WakuNodeConfig,
_state: PhantomData<State>, _state: PhantomData<State>,
} }
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub async fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
Ok(WakuNodeHandle {
ctx: management::waku_new(config).await?,
_state: PhantomData,
})
}
impl<State> WakuNodeHandle<State> { impl<State> WakuNodeHandle<State> {
/// Get the nwaku version /// Get the nwaku version
pub async fn version(&self) -> Result<String> { pub async fn version(&self) -> Result<String> {
@ -59,23 +53,37 @@ impl<State> WakuNodeHandle<State> {
self.ctx.reset_ptr(); self.ctx.reset_ptr();
res res
} }
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub async fn relay_subscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> {
relay::waku_relay_subscribe(&self.ctx, pubsub_topic).await
}
} }
impl WakuNodeHandle<Initialized> { impl WakuNodeHandle<Initialized> {
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub async fn new(config: Option<WakuNodeConfig>) -> Result<Self> {
let config = config.unwrap_or_default();
let ctx = management::waku_new(&config).await?;
let node = Self {
ctx,
config,
_state: PhantomData,
};
Ok(node)
}
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub async fn start(self) -> Result<WakuNodeHandle<Running>> { pub async fn start(self) -> Result<WakuNodeHandle<Running>> {
management::waku_start(&self.ctx) management::waku_start(&self.ctx).await?;
.await
.map(|_| WakuNodeHandle { let running_node = WakuNodeHandle {
ctx: self.ctx, ctx: self.ctx,
_state: PhantomData, config: self.config,
}) _state: PhantomData,
};
Ok(running_node)
} }
pub fn set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>( pub fn set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
@ -84,18 +92,41 @@ impl WakuNodeHandle<Initialized> {
) -> Result<()> { ) -> Result<()> {
self.ctx.waku_set_event_callback(closure) self.ctx.waku_set_event_callback(closure)
} }
/// Return a stream of all Waku responses.
pub fn response_stream(&self) -> impl Stream<Item = LibwakuResponse> {
let (tx, rx) = unbounded_channel();
let tx_clone = tx.clone();
let callback = {
move |event: LibwakuResponse| {
let _ = tx_clone.send(event);
}
};
if let Err(error) = self.ctx.waku_set_event_callback(callback) {
tx.send(LibwakuResponse::Failure(error)).unwrap();
}
let stream = UnboundedReceiverStream::new(rx);
stream
}
} }
impl WakuNodeHandle<Running> { impl WakuNodeHandle<Running> {
/// Stops a Waku node /// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub async fn stop(self) -> Result<WakuNodeHandle<Initialized>> { pub async fn stop(self) -> Result<WakuNodeHandle<Initialized>> {
management::waku_stop(&self.ctx) management::waku_stop(&self.ctx).await?;
.await
.map(|_| WakuNodeHandle { let init_node = WakuNodeHandle {
ctx: self.ctx, ctx: self.ctx,
_state: PhantomData, config: self.config,
}) _state: PhantomData,
};
Ok(init_node)
} }
/// Get the multiaddresses the Waku node is listening to /// Get the multiaddresses the Waku node is listening to
@ -113,38 +144,55 @@ impl WakuNodeHandle<Running> {
peers::waku_connect(&self.ctx, address, timeout).await peers::waku_connect(&self.ctx, address, timeout).await
} }
pub async fn relay_publish_txt(
&self,
pubsub_topic: &PubsubTopic,
msg_txt: &String,
content_topic_name: &'static str,
timeout: Option<Duration>,
) -> Result<MessageHash> {
let content_topic = WakuContentTopic::new("waku", "2", content_topic_name, Encoding::Proto);
let message = WakuMessage::new(msg_txt, content_topic, 0, Vec::new(), false);
relay::waku_relay_publish_message(&self.ctx, &message, pubsub_topic, timeout).await
}
/// Publish a message using Waku Relay. /// Publish a message using Waku Relay.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic. /// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub async fn relay_publish_message( pub async fn relay_publish_message(
&self, &self,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: impl Into<PubsubTopic>,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic, timeout).await if self.config.relay.is_none_or(|enabled| !enabled) {
//TODO add error types
return Err(
"Relay is disabled. Restart the waku node with Relay enabled to use this function."
.to_string(),
);
}
relay::waku_relay_publish_message(&self.ctx, message, pubsub_topic.into(), timeout).await
} }
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic /// Subscribe to receive messages matching a pubsub topic.
pub async fn relay_unsubscribe(&self, pubsub_topic: &PubsubTopic) -> Result<()> { pub async fn relay_subscribe(&self, pubsub_topic: impl Into<PubsubTopic>) -> Result<()> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic).await if self.config.relay.is_none_or(|enabled| !enabled) {
//TODO add error types
return Err(
"Relay is disabled. Restart the waku node with Relay enabled to use this function."
.to_string(),
);
}
relay::waku_relay_subscribe(&self.ctx, pubsub_topic.into()).await
}
/// Unsubscribe to stop receiving messages matching a pubsub topic.
pub async fn relay_unsubscribe(&self, pubsub_topic: impl Into<PubsubTopic>) -> Result<()> {
if self.config.relay.is_none_or(|enabled| !enabled) {
//TODO add error types
return Err(
"Relay is disabled. Restart the waku node with Relay enabled to use this function."
.to_string(),
);
}
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic.into()).await
} }
pub async fn filter_subscribe( pub async fn filter_subscribe(
&self, &self,
pubsub_topic: &PubsubTopic, pubsub_topic: PubsubTopic,
content_topics: Vec<WakuContentTopic>, content_topics: Vec<WakuContentTopic>,
) -> Result<()> { ) -> Result<()> {
filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics).await filter::waku_filter_subscribe(&self.ctx, pubsub_topic, content_topics).await
@ -152,7 +200,7 @@ impl WakuNodeHandle<Running> {
pub async fn filter_unsubscribe( pub async fn filter_unsubscribe(
&self, &self,
pubsub_topic: &PubsubTopic, pubsub_topic: PubsubTopic,
content_topics: Vec<WakuContentTopic>, content_topics: Vec<WakuContentTopic>,
) -> Result<()> { ) -> Result<()> {
filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics).await filter::waku_filter_unsubscribe(&self.ctx, pubsub_topic, content_topics).await
@ -165,7 +213,7 @@ impl WakuNodeHandle<Running> {
pub async fn lightpush_publish_message( pub async fn lightpush_publish_message(
&self, &self,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: PubsubTopic,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic).await lightpush::waku_lightpush_publish_message(&self.ctx, message, pubsub_topic).await
} }

View File

@ -16,7 +16,7 @@ use crate::node::context::WakuNodeContext;
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout /// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub async fn waku_connect( pub(crate) async fn waku_connect(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
address: &Multiaddr, address: &Multiaddr,
timeout: Option<Duration>, timeout: Option<Duration>,

View File

@ -4,17 +4,21 @@
use std::ffi::CString; use std::ffi::CString;
use std::time::Duration; use std::time::Duration;
// internal // internal
use crate::general::contenttopic::{Encoding, WakuContentTopic}; use crate::{
use crate::general::libwaku_response::{handle_no_response, handle_response, LibwakuResponse}; general::{
use crate::general::pubsubtopic::PubsubTopic; contenttopic::{Encoding, WakuContentTopic},
use crate::general::{messagehash::MessageHash, Result, WakuMessage}; libwaku_response::{handle_no_response, handle_response, LibwakuResponse},
use crate::handle_ffi_call; messagehash::MessageHash,
use crate::node::context::WakuNodeContext; Result, WakuMessage,
},
handle_ffi_call,
node::context::WakuNodeContext,
};
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding)
#[allow(clippy::not_unsafe_ptr_arg_deref)] #[allow(clippy::not_unsafe_ptr_arg_deref)]
pub async fn waku_create_content_topic( pub(crate) async fn _waku_create_content_topic(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
application_name: &str, application_name: &str,
application_version: u32, application_version: u32,
@ -41,10 +45,10 @@ pub async fn waku_create_content_topic(
/// Publish a message using Waku Relay /// Publish a message using Waku Relay
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
pub async fn waku_relay_publish_message( pub(crate) async fn waku_relay_publish_message(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &PubsubTopic, pubsub_topic: impl Into<Vec<u8>>,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<MessageHash> { ) -> Result<MessageHash> {
let message = CString::new( let message = CString::new(
@ -53,8 +57,8 @@ pub async fn waku_relay_publish_message(
) )
.expect("CString should build properly from the serialized waku message"); .expect("CString should build properly from the serialized waku message");
let pubsub_topic = CString::new(String::from(pubsub_topic)) let pubsub_topic =
.expect("CString should build properly from pubsub topic"); CString::new(pubsub_topic).expect("CString should build properly from pubsub topic");
handle_ffi_call!( handle_ffi_call!(
waku_sys::waku_relay_publish, waku_sys::waku_relay_publish,
@ -73,9 +77,12 @@ pub async fn waku_relay_publish_message(
) )
} }
pub async fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTopic) -> Result<()> { pub(crate) async fn waku_relay_subscribe(
let pubsub_topic = CString::new(String::from(pubsub_topic)) ctx: &WakuNodeContext,
.expect("CString should build properly from pubsub topic"); pubsub_topic: impl Into<Vec<u8>>,
) -> Result<()> {
let pubsub_topic =
CString::new(pubsub_topic).expect("CString should build properly from pubsub topic");
handle_ffi_call!( handle_ffi_call!(
waku_sys::waku_relay_subscribe, waku_sys::waku_relay_subscribe,
@ -85,12 +92,12 @@ pub async fn waku_relay_subscribe(ctx: &WakuNodeContext, pubsub_topic: &PubsubTo
) )
} }
pub async fn waku_relay_unsubscribe( pub(crate) async fn waku_relay_unsubscribe(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
pubsub_topic: &PubsubTopic, pubsub_topic: impl Into<Vec<u8>>,
) -> Result<()> { ) -> Result<()> {
let pubsub_topic = CString::new(String::from(pubsub_topic)) let pubsub_topic =
.expect("CString should build properly from pubsub topic"); CString::new(pubsub_topic).expect("CString should build properly from pubsub topic");
handle_ffi_call!( handle_ffi_call!(
waku_sys::waku_relay_unsubscribe, waku_sys::waku_relay_unsubscribe,

View File

@ -20,7 +20,7 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct PagingOptions { pub(crate) struct PagingOptions {
pub page_size: usize, pub page_size: usize,
pub cursor: Option<MessageHash>, pub cursor: Option<MessageHash>,
pub forward: bool, pub forward: bool,
@ -28,7 +28,7 @@ pub struct PagingOptions {
/// Criteria used to retrieve historical messages /// Criteria used to retrieve historical messages
#[derive(Clone, Serialize, Debug)] #[derive(Clone, Serialize, Debug)]
pub struct StoreQueryRequest { pub(crate) struct StoreQueryRequest {
/// if true, the store-response will include the full message content. If false, /// if true, the store-response will include the full message content. If false,
/// the store-response will only include a list of message hashes. /// the store-response will only include a list of message hashes.
#[serde(rename = "requestId")] #[serde(rename = "requestId")]
@ -54,7 +54,7 @@ pub struct StoreQueryRequest {
} }
impl StoreQueryRequest { impl StoreQueryRequest {
pub fn new() -> Self { pub(crate) fn new() -> Self {
StoreQueryRequest { StoreQueryRequest {
request_id: Uuid::new_v4().to_string(), request_id: Uuid::new_v4().to_string(),
include_data: true, include_data: true,
@ -69,43 +69,43 @@ impl StoreQueryRequest {
} }
} }
pub fn with_include_data(mut self, include_data: bool) -> Self { pub(crate) fn with_include_data(mut self, include_data: bool) -> Self {
self.include_data = include_data; self.include_data = include_data;
self self
} }
pub fn with_pubsub_topic(mut self, pubsub_topic: Option<PubsubTopic>) -> Self { pub(crate) fn with_pubsub_topic(mut self, pubsub_topic: Option<PubsubTopic>) -> Self {
self.pubsub_topic = pubsub_topic; self.pubsub_topic = pubsub_topic;
self self
} }
pub fn with_content_topics(mut self, content_topics: Vec<WakuContentTopic>) -> Self { pub(crate) fn with_content_topics(mut self, content_topics: Vec<WakuContentTopic>) -> Self {
self.content_topics = content_topics; self.content_topics = content_topics;
self self
} }
pub fn with_time_start(mut self, time_start: Option<u64>) -> Self { pub(crate) fn with_time_start(mut self, time_start: Option<u64>) -> Self {
self.time_start = time_start; self.time_start = time_start;
self self
} }
pub fn with_time_end(mut self, time_end: Option<u64>) -> Self { pub(crate) fn with_time_end(mut self, time_end: Option<u64>) -> Self {
self.time_end = time_end; self.time_end = time_end;
self self
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn with_message_hashes(mut self, message_hashes: Vec<MessageHash>) -> Self { pub(crate) fn with_message_hashes(mut self, message_hashes: Vec<MessageHash>) -> Self {
self.message_hashes = Some(message_hashes); self.message_hashes = Some(message_hashes);
self self
} }
pub fn with_pagination_cursor(mut self, pagination_cursor: Option<MessageHash>) -> Self { pub(crate) fn with_pagination_cursor(mut self, pagination_cursor: Option<MessageHash>) -> Self {
self.pagination_cursor = pagination_cursor; self.pagination_cursor = pagination_cursor;
self self
} }
pub fn with_pagination_forward(mut self, pagination_forward: bool) -> Self { pub(crate) fn with_pagination_forward(mut self, pagination_forward: bool) -> Self {
self.pagination_forward = pagination_forward; self.pagination_forward = pagination_forward;
self self
} }
@ -121,7 +121,7 @@ pub struct StoreWakuMessageResponse {
#[derive(Clone, Deserialize, Debug)] #[derive(Clone, Deserialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct StoreResponse { pub(crate) struct StoreResponse {
#[allow(unused)] #[allow(unused)]
pub request_id: String, pub request_id: String,
@ -142,12 +142,13 @@ pub struct StoreResponse {
// Implement WakuDecode for Vec<Multiaddr> // Implement WakuDecode for Vec<Multiaddr>
impl WakuDecode for StoreResponse { impl WakuDecode for StoreResponse {
//TODO impl TryFrom instead
fn decode(input: &str) -> Result<Self> { fn decode(input: &str) -> Result<Self> {
Ok(serde_json::from_str(input).expect("could not parse store resp")) Ok(serde_json::from_str(input).expect("could not parse store resp"))
} }
} }
pub async fn waku_store_query( pub(crate) async fn waku_store_query(
ctx: &WakuNodeContext, ctx: &WakuNodeContext,
query: StoreQueryRequest, query: StoreQueryRequest,
peer_addr: &str, peer_addr: &str,

View File

@ -8,12 +8,12 @@ use std::time::Duration;
use std::{collections::HashSet, str::from_utf8}; use std::{collections::HashSet, str::from_utf8};
use tokio::time; use tokio::time;
use tokio::time::sleep; use tokio::time::sleep;
use waku_bindings::node::PubsubTopic;
use waku_bindings::{ use waku_bindings::{
waku_new, Encoding, Initialized, MessageHash, WakuContentTopic, WakuEvent, WakuMessage, Encoding, Initialized, MessageHash, WakuContentTopic, WakuEvent, WakuMessage, WakuNodeConfig,
WakuNodeConfig, WakuNodeHandle, WakuNodeHandle,
}; };
use waku_bindings::{LibwakuResponse, Running}; use waku_bindings::{LibwakuResponse, Running};
const ECHO_TIMEOUT: u64 = 1000; const ECHO_TIMEOUT: u64 = 1000;
const ECHO_MESSAGE: &str = "Hi from 🦀!"; const ECHO_MESSAGE: &str = "Hi from 🦀!";
const TEST_PUBSUBTOPIC: &str = "test"; const TEST_PUBSUBTOPIC: &str = "test";
@ -23,7 +23,7 @@ async fn try_publish_relay_messages(
msg: &WakuMessage, msg: &WakuMessage,
) -> Result<HashSet<MessageHash>, String> { ) -> Result<HashSet<MessageHash>, String> {
Ok(HashSet::from([node Ok(HashSet::from([node
.relay_publish_message(msg, &PubsubTopic::new(TEST_PUBSUBTOPIC), None) .relay_publish_message(msg, TEST_PUBSUBTOPIC, None)
.await?])) .await?]))
} }
@ -73,14 +73,8 @@ async fn test_echo_messages(
let node1 = node1.start().await?; let node1 = node1.start().await?;
let node2 = node2.start().await?; let node2 = node2.start().await?;
node1 node1.relay_subscribe(TEST_PUBSUBTOPIC).await.unwrap();
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC)) node2.relay_subscribe(TEST_PUBSUBTOPIC).await.unwrap();
.await
.unwrap();
node2
.relay_subscribe(&PubsubTopic::new(TEST_PUBSUBTOPIC))
.await
.unwrap();
sleep(Duration::from_secs(5)).await; sleep(Duration::from_secs(5)).await;
@ -101,7 +95,12 @@ async fn test_echo_messages(
sleep(Duration::from_secs(3)).await; sleep(Duration::from_secs(3)).await;
dbg!("Before publish"); dbg!("Before publish");
let message = WakuMessage::new(content, content_topic, 1, Vec::new(), false); let message = WakuMessage {
payload: content.to_owned().into_bytes(),
content_topic,
version: 1,
..Default::default()
};
let _ids = try_publish_relay_messages(&node1, &message) let _ids = try_publish_relay_messages(&node1, &message)
.await .await
.expect("send relay messages"); .expect("send relay messages");
@ -136,12 +135,12 @@ async fn test_echo_messages(
#[serial] #[serial]
async fn default_echo() -> Result<(), String> { async fn default_echo() -> Result<(), String> {
println!("Test default_echo"); println!("Test default_echo");
let node1 = waku_new(Some(WakuNodeConfig { let node1 = WakuNodeHandle::new(Some(WakuNodeConfig {
tcp_port: Some(60010), tcp_port: Some(60010),
..Default::default() ..Default::default()
})) }))
.await?; .await?;
let node2 = waku_new(Some(WakuNodeConfig { let node2 = WakuNodeHandle::new(Some(WakuNodeConfig {
tcp_port: Some(60020), tcp_port: Some(60020),
..Default::default() ..Default::default()
})) }))
@ -175,7 +174,7 @@ async fn node_restart() {
}; };
for _ in 0..3 { for _ in 0..3 {
let node = waku_new(config.clone().into()) let node = WakuNodeHandle::new(config.clone().into())
.await .await
.expect("default config should be valid"); .expect("default config should be valid");
let node = node let node = node

View File

@ -34,5 +34,5 @@ crate-type = ["rlib"]
[dependencies] [dependencies]
[build-dependencies] [build-dependencies]
bindgen = "0.64" bindgen = "0.72"
cc = "1.0.73" cc = "1.0.73"