chore: remove code no longer needed (for now)

This commit is contained in:
Richard Ramos 2024-02-12 20:13:49 -04:00
parent 8755d9a7c8
commit 1d8626b110
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
9 changed files with 66 additions and 356 deletions

View File

@ -67,15 +67,6 @@ impl WakuMessageEvent {
} }
} }
/// Shared callback slot. Callbacks are registered here so they can be accessed by the extern "C"
#[allow(clippy::type_complexity)]
static CALLBACK: Lazy<Mutex<Box<dyn FnMut(Signal) + Send + Sync>>> =
Lazy::new(|| Mutex::new(Box::new(|_| {})));
/// Register global callback
fn set_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) {
*CALLBACK.lock().unwrap() = Box::new(f);
}
/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`] /// Wrapper callback, it transformst the `*const c_char` into a [`Signal`]
/// and executes the [`CALLBACK`] funtion with it /// and executes the [`CALLBACK`] funtion with it
@ -94,7 +85,7 @@ extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c
/// Register callback to act as event handler and receive application signals, /// Register callback to act as event handler and receive application signals,
/// which are used to react to asynchronous events in Waku /// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) { pub fn waku_set_event_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) {
set_callback(f); // TODO:
unsafe { waku_sys::waku_set_event_callback(Some(callback)) }; unsafe { waku_sys::waku_set_event_callback(Some(callback)) };
} }

View File

@ -239,8 +239,7 @@ mod base64_serde {
mod tests { mod tests {
use super::*; use super::*;
use crate::WakuPubSubTopic; use crate::WakuPubSubTopic;
use secp256k1::{rand, Secp256k1};
use std::time::SystemTime;
#[test] #[test]
fn parse_waku_topic() { fn parse_waku_topic() {
let s = "/waku/2/default-waku/proto"; let s = "/waku/2/default-waku/proto";

View File

@ -7,12 +7,12 @@ mod node;
mod utils; mod utils;
pub use node::{ pub use node::{
waku_create_content_topic, waku_default_pubsub_topic, waku_new, Initialized, Key, Multiaddr, waku_create_content_topic, waku_default_pubsub_topic, waku_new, Key, Multiaddr, PublicKey,
PublicKey, Running, SecretKey, WakuNodeConfig, WakuNodeHandle, SecretKey, WakuNodeConfig, WakuNodeHandle,
}; };
pub use general::{ pub use general::{
MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic, Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
}; };
pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent}; pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent};

View File

@ -3,7 +3,7 @@
// std // std
use std::ffi::CString; use std::ffi::CString;
// crates // crates
use libc::*; use libc::c_void;
// internal // internal
use super::config::WakuNodeConfig; use super::config::WakuNodeConfig;
use crate::general::Result; use crate::general::Result;
@ -11,7 +11,7 @@ use crate::utils::{get_trampoline, handle_json_response, handle_no_response, han
/// Instantiates a Waku node /// Instantiates a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<()> { pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<*mut c_void> {
let config = config.unwrap_or_default(); let config = config.unwrap_or_default();
let config_ptr = CString::new( let config_ptr = CString::new(
@ -23,7 +23,7 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<()> {
let mut error: String = Default::default(); let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string(); let error_cb = |v: &str| error = v.to_string();
let code = unsafe { let node_ptr = unsafe {
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void); let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void);
@ -33,18 +33,18 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<()> {
out out
}; };
handle_no_response(code, &error) Ok(node_ptr)
} }
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub fn waku_start() -> Result<()> { pub fn waku_start(ctx: *mut c_void) -> Result<()> {
let mut error: String = Default::default(); let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string(); let error_cb = |v: &str| error = v.to_string();
let code = unsafe { let code = unsafe {
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_start(cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_start(ctx, cb, &mut closure as *mut _ as *mut c_void)
}; };
handle_no_response(code, &error) handle_no_response(code, &error)
@ -52,13 +52,13 @@ pub fn waku_start() -> Result<()> {
/// Stops a Waku node /// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn waku_stop() -> Result<()> { pub fn waku_stop(ctx: *mut c_void) -> Result<()> {
let mut error: String = Default::default(); let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string(); let error_cb = |v: &str| error = v.to_string();
let code = unsafe { let code = unsafe {
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_stop(cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_stop(ctx, cb, &mut closure as *mut _ as *mut c_void)
}; };
handle_no_response(code, &error) handle_no_response(code, &error)
@ -67,28 +67,14 @@ pub fn waku_stop() -> Result<()> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use super::waku_new; use super::waku_new;
use crate::node::management::{waku_listen_addresses, waku_peer_id, waku_start, waku_stop}; use crate::node::management::{waku_start, waku_stop};
use crate::node::peers::waku_peer_count;
use serial_test::serial; use serial_test::serial;
#[test] #[test]
#[serial] #[serial]
fn waku_flow() { fn waku_flow() {
waku_new(None).unwrap(); let node = waku_new(None).unwrap();
waku_start().unwrap(); waku_start(node).unwrap();
// test peer id call, since we cannot start different instances of the node waku_stop(node).unwrap();
let id = waku_peer_id().unwrap();
dbg!(&id);
assert!(!id.is_empty());
let peer_cnt = waku_peer_count().unwrap();
dbg!(peer_cnt);
// test addresses, since we cannot start different instances of the node
let addresses = waku_listen_addresses().unwrap();
dbg!(&addresses);
assert!(!addresses.is_empty());
waku_stop().unwrap();
} }
} }

View File

@ -9,10 +9,9 @@ mod relay;
pub use aes_gcm::{Aes256Gcm, Key}; pub use aes_gcm::{Aes256Gcm, Key};
pub use multiaddr::Multiaddr; pub use multiaddr::Multiaddr;
pub use secp256k1::{PublicKey, SecretKey}; pub use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData;
use std::sync::Mutex;
use std::time::Duration; use std::time::Duration;
// crates // crates
use libc::c_void;
// internal // internal
use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic}; use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic};
@ -20,82 +19,22 @@ use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic};
pub use config::WakuNodeConfig; pub use config::WakuNodeConfig;
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic}; pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};
/// Shared flag to check if a waku node is already running in the current process
static WAKU_NODE_INITIALIZED: Mutex<bool> = Mutex::new(false);
/// Marker trait to disallow undesired waku node states in the handle
pub trait WakuNodeState {}
/// Waku node initialized state
pub struct Initialized;
/// Waku node running state
pub struct Running;
impl WakuNodeState for Initialized {}
impl WakuNodeState for Running {}
/// Handle to the underliying waku node /// Handle to the underliying waku node
/// Safe to sendt to/through threads. pub struct WakuNodeHandle {
/// Only a waku node can be running at a time. ctx: *mut c_void,
/// Referenes (`&`) to the handle can call queries and perform operations in a thread safe way.
/// Only an owned version of the handle can `start` or `stop` the node.
pub struct WakuNodeHandle<State: WakuNodeState>(PhantomData<State>);
/// We do not have any inner state, so the handle should be safe to be send among threads.
unsafe impl<State: WakuNodeState> Send for WakuNodeHandle<State> {}
/// References to the handle are safe to share, as they do not mutate the handle itself and
/// operations are performed by the bindings backend, which is supposed to be thread safe.
unsafe impl<State: WakuNodeState> Sync for WakuNodeHandle<State> {}
impl<State: WakuNodeState> WakuNodeHandle<State> {
// /// If the execution is successful, the result is the peer ID as a string (base58 encoded)
// /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
// pub fn peer_id(&self) -> Result<PeerId> {
// management::waku_peer_id()
// }
// /// Get the multiaddresses the Waku node is listening to
// /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
// pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
// management::waku_listen_addresses()
// }
// /// Add a node multiaddress and protocol to the waku nodes peerstore.
// /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_add_peerchar-address-char-protocolid)
// pub fn add_peer(&self, address: &Multiaddr, protocol_id: ProtocolId) -> Result<PeerId> {
// peers::waku_add_peers(address, protocol_id)
// }
} }
fn stop_node() -> Result<()> { impl WakuNodeHandle {
let mut node_initialized = WAKU_NODE_INITIALIZED
.lock()
.expect("Access to the mutex at some point");
*node_initialized = false;
management::waku_stop().map(|_| ())
}
impl WakuNodeHandle<Initialized> {
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation.
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start)
pub fn start(self) -> Result<WakuNodeHandle<Running>> { pub fn start(self) -> Result<()> {
management::waku_start().map(|_| WakuNodeHandle(Default::default())) management::waku_start(self.ctx)
} }
/// Stops a Waku node /// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn stop(self) -> Result<()> { pub fn stop(self) -> Result<()> {
stop_node() management::waku_stop(self.ctx)
}
}
impl WakuNodeHandle<Running> {
/// Stops a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn stop(self) -> Result<()> {
stop_node()
} }
/// Dial peer using a multiaddress /// Dial peer using a multiaddress
@ -104,7 +43,7 @@ impl WakuNodeHandle<Running> {
/// Use 0 for no timeout /// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub fn connect(&self, address: &Multiaddr, timeout: Option<Duration>) -> Result<()> { pub fn connect(&self, address: &Multiaddr, timeout: Option<Duration>) -> Result<()> {
peers::waku_connect(address, timeout) peers::waku_connect(self.ctx, address, timeout)
} }
/// Publish a message using Waku Relay. /// Publish a message using Waku Relay.
@ -116,45 +55,24 @@ impl WakuNodeHandle<Running> {
pubsub_topic: &WakuPubSubTopic, pubsub_topic: &WakuPubSubTopic,
timeout: Option<Duration>, timeout: Option<Duration>,
) -> Result<MessageId> { ) -> Result<MessageId> {
relay::waku_relay_publish_message(message, pubsub_topic, timeout) relay::waku_relay_publish_message(self.ctx, message, pubsub_topic, timeout)
} }
/// Subscribe to WakuRelay to receive messages matching a content filter. /// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> { pub fn relay_subscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
relay::waku_relay_subscribe(pubsub_topic) relay::waku_relay_subscribe(self.ctx, pubsub_topic)
} }
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic /// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> { pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
relay::waku_relay_unsubscribe(pubsub_topic) relay::waku_relay_unsubscribe(self.ctx, pubsub_topic)
} }
} }
/// Spawn a new Waku node with the given configuration (default configuration if `None` provided) /// Spawn a new Waku node with the given configuration (default configuration if `None` provided)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> { pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle> {
let mut node_initialized = WAKU_NODE_INITIALIZED Ok(WakuNodeHandle {
.lock() ctx: management::waku_new(config)?,
.expect("Access to the mutex at some point"); })
if *node_initialized {
return Err("Waku node is already initialized".into());
}
*node_initialized = true;
management::waku_new(config).map(|_| WakuNodeHandle(Default::default()))
}
#[cfg(test)]
mod tests {
use super::waku_new;
use serial_test::serial;
#[test]
#[serial]
fn exclusive_running() {
let handle1 = waku_new(None).unwrap();
let handle2 = waku_new(None);
assert!(handle2.is_err());
let stop_handle = handle1.start().unwrap();
stop_handle.stop().unwrap();
}
} }

View File

@ -15,7 +15,11 @@ use crate::utils::{get_trampoline, handle_json_response, handle_no_response, han
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout /// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub fn waku_connect(address: &Multiaddr, timeout: Option<Duration>) -> Result<()> { pub fn waku_connect(
ctx: *mut c_void,
address: &Multiaddr,
timeout: Option<Duration>,
) -> Result<()> {
let address_ptr = CString::new(address.to_string()) let address_ptr = CString::new(address.to_string())
.expect("CString should build properly from multiaddress") .expect("CString should build properly from multiaddress")
.into_raw(); .into_raw();
@ -26,9 +30,10 @@ pub fn waku_connect(address: &Multiaddr, timeout: Option<Duration>) -> Result<()
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_connect( let out = waku_sys::waku_connect(
ctx,
address_ptr, address_ptr,
timeout timeout
.map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX)) .map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX))
.unwrap_or(0), .unwrap_or(0),
cb, cb,
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,

View File

@ -12,17 +12,15 @@ use crate::utils::{get_trampoline, handle_json_response, handle_no_response, han
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding)
pub fn waku_create_content_topic( pub fn waku_create_content_topic(
ctx: *mut c_void,
application_name: &str, application_name: &str,
application_version: &str, application_version: u32,
content_topic_name: &str, content_topic_name: &str,
encoding: Encoding, encoding: Encoding,
) -> WakuContentTopic { ) -> WakuContentTopic {
let application_name_ptr = CString::new(application_name) let application_name_ptr = CString::new(application_name)
.expect("Application name should always transform to CString") .expect("Application name should always transform to CString")
.into_raw(); .into_raw();
let application_version_ptr = CString::new(application_version)
.expect("Application version should always transform to CString")
.into_raw();
let content_topic_name_ptr = CString::new(content_topic_name) let content_topic_name_ptr = CString::new(content_topic_name)
.expect("Content topic should always transform to CString") .expect("Content topic should always transform to CString")
.into_raw(); .into_raw();
@ -36,8 +34,9 @@ pub fn waku_create_content_topic(
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_content_topic( let out = waku_sys::waku_content_topic(
ctx,
application_name_ptr, application_name_ptr,
application_version_ptr, application_version,
content_topic_name_ptr, content_topic_name_ptr,
encoding_ptr, encoding_ptr,
cb, cb,
@ -45,7 +44,6 @@ pub fn waku_create_content_topic(
); );
drop(CString::from_raw(application_name_ptr)); drop(CString::from_raw(application_name_ptr));
drop(CString::from_raw(application_version_ptr));
drop(CString::from_raw(content_topic_name_ptr)); drop(CString::from_raw(content_topic_name_ptr));
drop(CString::from_raw(encoding_ptr)); drop(CString::from_raw(encoding_ptr));
@ -57,13 +55,13 @@ pub fn waku_create_content_topic(
} }
/// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/) /// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/)
pub fn waku_default_pubsub_topic() -> WakuPubSubTopic { pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic {
let mut result: String = Default::default(); let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string(); let result_cb = |v: &str| result = v.to_string();
let code = unsafe { let code = unsafe {
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
waku_sys::waku_default_pubsub_topic(cb, &mut closure as *mut _ as *mut c_void) waku_sys::waku_default_pubsub_topic(ctx, cb, &mut closure as *mut _ as *mut c_void)
}; };
handle_response(code, &result).expect("&str from result should always be extracted") handle_response(code, &result).expect("&str from result should always be extracted")
@ -72,6 +70,7 @@ pub fn waku_default_pubsub_topic() -> WakuPubSubTopic {
/// Publish a message using Waku Relay /// Publish a message using Waku Relay
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
pub fn waku_relay_publish_message( pub fn waku_relay_publish_message(
ctx: *mut c_void,
message: &WakuMessage, message: &WakuMessage,
pubsub_topic: &WakuPubSubTopic, pubsub_topic: &WakuPubSubTopic,
timeout: Option<Duration>, timeout: Option<Duration>,
@ -94,6 +93,7 @@ pub fn waku_relay_publish_message(
let mut closure = result_cb; let mut closure = result_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_publish( let out = waku_sys::waku_relay_publish(
ctx,
message_ptr, message_ptr,
pubsub_topic_ptr, pubsub_topic_ptr,
timeout timeout
@ -101,7 +101,7 @@ pub fn waku_relay_publish_message(
duration duration
.as_millis() .as_millis()
.try_into() .try_into()
.expect("Duration as milliseconds should fit in a i32") .expect("Duration as milliseconds should fit in a u32")
}) })
.unwrap_or(0), .unwrap_or(0),
cb, cb,
@ -117,7 +117,7 @@ pub fn waku_relay_publish_message(
handle_response(code, &result) handle_response(code, &result)
} }
pub fn waku_relay_subscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> { pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string(); let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic) let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic") .expect("CString should build properly from pubsub topic")
@ -129,6 +129,7 @@ pub fn waku_relay_subscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> {
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe( let out = waku_sys::waku_relay_subscribe(
ctx,
pubsub_topic_ptr, pubsub_topic_ptr,
cb, cb,
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
@ -142,7 +143,7 @@ pub fn waku_relay_subscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> {
handle_no_response(code, &error) handle_no_response(code, &error)
} }
pub fn waku_relay_unsubscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> { pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string(); let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic) let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic") .expect("CString should build properly from pubsub topic")
@ -154,6 +155,7 @@ pub fn waku_relay_unsubscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> {
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe( let out = waku_sys::waku_relay_subscribe(
ctx,
pubsub_topic_ptr, pubsub_topic_ptr,
cb, cb,
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,

View File

@ -1,9 +1,10 @@
use crate::general::Result; use crate::general::Result;
use core::str::FromStr; use core::str::FromStr;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use std::ffi::CStr; use std::{slice, str};
use waku_sys::WakuCallBack; use waku_sys::WakuCallBack;
use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK}; use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK};
pub fn decode<T: DeserializeOwned>(input: &str) -> Result<T> { pub fn decode<T: DeserializeOwned>(input: &str) -> Result<T> {
serde_json::from_str(input) serde_json::from_str(input)
.map_err(|err| format!("could not deserialize waku response: {}", err)) .map_err(|err| format!("could not deserialize waku response: {}", err))
@ -12,6 +13,7 @@ pub fn decode<T: DeserializeOwned>(input: &str) -> Result<T> {
unsafe extern "C" fn trampoline<F>( unsafe extern "C" fn trampoline<F>(
_ret_code: ::std::os::raw::c_int, _ret_code: ::std::os::raw::c_int,
data: *const ::std::os::raw::c_char, data: *const ::std::os::raw::c_char,
data_len: usize,
user_data: *mut ::std::os::raw::c_void, user_data: *mut ::std::os::raw::c_void,
) where ) where
F: FnMut(&str), F: FnMut(&str),
@ -21,14 +23,7 @@ unsafe extern "C" fn trampoline<F>(
let response = if data.is_null() { let response = if data.is_null() {
"" ""
} else { } else {
unsafe { CStr::from_ptr(data) } str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len))
.to_str()
.map_err(|err| {
format!(
"could not retrieve response from pointer returned by waku: {}",
err
)
})
.expect("could not retrieve response") .expect("could not retrieve response")
}; };

View File

@ -3,15 +3,12 @@ use multiaddr::Multiaddr;
use rand::thread_rng; use rand::thread_rng;
use secp256k1::SecretKey; use secp256k1::SecretKey;
use serial_test::serial; use serial_test::serial;
use std::net::IpAddr;
use std::str::FromStr;
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use std::{collections::HashSet, str::from_utf8}; use std::{collections::HashSet, str::from_utf8};
use tokio::sync::mpsc::{self, Sender}; use tokio::sync::mpsc::{self, Sender};
use tokio::time; use tokio::time;
use waku_bindings::{ use waku_bindings::{
waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Encoding, Event, waku_new, waku_set_event_callback, Encoding, Event, Key, MessageId, WakuContentTopic,
GossipSubParams, Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel,
WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic, WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic,
}; };
@ -25,7 +22,7 @@ const NODES: &[&str] = &[
]; ];
fn try_publish_relay_messages( fn try_publish_relay_messages(
node: &WakuNodeHandle<Running>, node: &WakuNodeHandle,
msg: &WakuMessage, msg: &WakuMessage,
) -> Result<HashSet<MessageId>, String> { ) -> Result<HashSet<MessageId>, String> {
Ok(HashSet::from( Ok(HashSet::from(
@ -33,24 +30,6 @@ fn try_publish_relay_messages(
)) ))
} }
fn try_publish_lightpush_messages(
node: &WakuNodeHandle<Running>,
msg: &WakuMessage,
) -> Result<HashSet<MessageId>, String> {
let peer_id = node
.peers()
.unwrap()
.iter()
.map(|peer| peer.peer_id())
.find(|id| id.as_str() != node.peer_id().unwrap().as_str())
.unwrap()
.clone();
Ok(HashSet::from([
node.lightpush_publish(msg, None, peer_id, None)?
]))
}
#[derive(Debug)] #[derive(Debug)]
struct Response { struct Response {
id: MessageId, id: MessageId,
@ -63,19 +42,7 @@ fn set_callback(tx: Sender<Response>, sk: SecretKey, ssk: Key<Aes256Gcm>) {
let id = message.message_id(); let id = message.message_id();
let message = message.waku_message(); let message = message.waku_message();
let payload = if let Ok(message) = message let payload = message.payload().to_vec();
.try_decode_asymmetric(&sk)
.map_err(|e| println!("{e}"))
{
message.data().to_vec()
} else if let Ok(message) = message
.try_decode_symmetric(&ssk)
.map_err(|e| println!("{e}"))
{
message.data().to_vec()
} else {
message.payload().to_vec()
};
futures::executor::block_on(tx.send(Response { futures::executor::block_on(tx.send(Response {
id: id.to_string(), id: id.to_string(),
@ -87,7 +54,7 @@ fn set_callback(tx: Sender<Response>, sk: SecretKey, ssk: Key<Aes256Gcm>) {
} }
async fn test_echo_messages( async fn test_echo_messages(
node: &WakuNodeHandle<Running>, node: &WakuNodeHandle,
content: &'static str, content: &'static str,
content_topic: WakuContentTopic, content_topic: WakuContentTopic,
sk: SecretKey, sk: SecretKey,
@ -112,8 +79,6 @@ async fn test_echo_messages(
let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages"); let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages");
ids.extend(try_publish_lightpush_messages(node, &message).expect("send lightpush messages"));
while let Some(res) = rx.recv().await { while let Some(res) = rx.recv().await {
if ids.take(&res.id).is_some() { if ids.take(&res.id).is_some() {
let msg = from_utf8(&res.payload).expect("should be valid message"); let msg = from_utf8(&res.payload).expect("should be valid message");
@ -126,98 +91,28 @@ async fn test_echo_messages(
} }
} }
#[ignore]
#[tokio::test]
#[serial]
async fn discv5_echo() -> Result<(), String> {
let config = WakuNodeConfig {
host: IpAddr::from_str("0.0.0.0").ok(),
log_level: Some(WakuLogLevel::Error),
discv5: Some(true),
discv5_udp_port: Some(9000),
discv5_bootstrap_nodes: Vec::new(),
..Default::default()
};
let node = waku_new(Some(config))?;
let node = node.start()?;
println!("Node peer id: {}", node.peer_id()?);
for node_address in NODES {
let address: Multiaddr = node_address.parse().unwrap();
let peer_id = node.add_peer(&address, ProtocolId::Relay)?;
node.connect_peer_with_id(&peer_id, None)?;
}
assert!(node.peers()?.len() >= NODES.len());
assert!(node.peer_count()? >= NODES.len());
assert!(node.relay_enough_peers(None)?);
let sk = SecretKey::new(&mut thread_rng());
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// Subscribe to default channel.
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node.relay_subscribe(&content_filter)?;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
let topics = node.relay_topics()?;
let default_topic = waku_default_pubsub_topic();
assert!(topics.len() == 1);
let topic: WakuPubSubTopic = topics[0].parse().unwrap();
assert!(topic == default_topic);
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));
tokio::pin!(sleep);
// Send and receive messages. Waits until all messages received.
let got_all = tokio::select! {
_ = sleep => false,
_ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true,
};
assert!(got_all);
for node_data in node.peers()? {
if node_data.peer_id() != &node.peer_id()? {
node.disconnect_peer_with_id(node_data.peer_id())?;
}
}
node.stop()?;
Ok(())
}
#[ignore] #[ignore]
#[tokio::test] #[tokio::test]
#[serial] #[serial]
async fn default_echo() -> Result<(), String> { async fn default_echo() -> Result<(), String> {
let config = WakuNodeConfig { let config = WakuNodeConfig {
log_level: Some(WakuLogLevel::Error),
..Default::default() ..Default::default()
}; };
let node = waku_new(Some(config))?; let node = waku_new(Some(config))?;
let node = node.start()?;
println!("Node peer id: {}", node.peer_id()?); node.start()?;
for node_address in NODES { for node_address in NODES {
let address: Multiaddr = node_address.parse().unwrap(); let address: Multiaddr = node_address.parse().unwrap();
let peer_id = node.add_peer(&address, ProtocolId::Relay)?; node.connect(&address, None)?;
node.connect_peer_with_id(&peer_id, None)?;
} }
assert!(node.peers()?.len() >= NODES.len());
assert!(node.peer_count()? >= NODES.len());
assert!(node.relay_enough_peers(None)?);
let sk = SecretKey::new(&mut thread_rng()); let sk = SecretKey::new(&mut thread_rng());
let ssk = Aes256Gcm::generate_key(&mut thread_rng()); let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// subscribe to default channel // subscribe to default channel
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node.relay_subscribe(&content_filter)?; node.relay_subscribe(&content_filter)?;
let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto);
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));
@ -231,87 +126,6 @@ async fn default_echo() -> Result<(), String> {
assert!(got_all); assert!(got_all);
for node_data in node.peers()? {
if node_data.peer_id() != &node.peer_id()? {
node.disconnect_peer_with_id(node_data.peer_id())?;
}
}
node.stop()?;
Ok(())
}
#[test]
#[serial]
fn gossipsub_config() -> Result<(), String> {
let params = GossipSubParams {
d: Some(6),
dlo: Some(3),
dhi: Some(12),
dscore: Some(10),
dout: Some(8),
history_length: Some(500),
history_gossip: Some(3),
dlazy: Some(12),
gossip_factor: Some(0.25),
gossip_retransmission: Some(4),
heartbeat_initial_delay_ms: Some(500),
heartbeat_interval_seconds: Some(60),
slow_heartbeat_warning: Some(0.5),
fanout_ttl_seconds: Some(60),
prune_peers: Some(3),
prune_backoff_seconds: Some(900),
unsubscribe_backoff_seconds: Some(60),
connectors: Some(3),
max_pending_connections: Some(50),
connection_timeout_seconds: Some(15),
direct_connect_ticks: Some(5),
direct_connect_initial_delay_seconds: Some(5),
opportunistic_graft_ticks: Some(8),
opportunistic_graft_peers: Some(2),
graft_flood_threshold_seconds: Some(120),
max_ihave_length: Some(32),
max_ihave_messages: Some(8),
iwant_followup_time_seconds: Some(120),
seen_messages_ttl_seconds: Some(120),
};
let config = WakuNodeConfig {
gossipsub_params: params.into(),
log_level: Some(WakuLogLevel::Error),
..Default::default()
};
let node = waku_new(Some(config))?;
let node = node.start()?;
node.stop()?;
Ok(())
}
#[test]
#[serial]
fn loglevel_error() -> Result<(), String> {
let config = WakuNodeConfig {
log_level: Some(WakuLogLevel::Error),
..Default::default()
};
let node = waku_new(Some(config))?;
let node = node.start()?;
node.stop()?;
Ok(())
}
#[test]
#[serial]
fn loglevel_info() -> Result<(), String> {
let config = WakuNodeConfig {
log_level: Some(WakuLogLevel::Info),
..Default::default()
};
let node = waku_new(Some(config))?;
let node = node.start()?;
node.stop()?; node.stop()?;
Ok(()) Ok(())
} }
@ -323,9 +137,9 @@ fn node_restart() {
for _ in 0..3 { for _ in 0..3 {
let node = waku_new(config.clone().into()).expect("default config should be valid"); let node = waku_new(config.clone().into()).expect("default config should be valid");
let node = node.start().expect("node should start with valid config");
assert!(node.peer_id().is_ok()); node.start().expect("node should start with valid config");
node.stop().expect("node should stop"); node.stop().expect("node should stop");
} }
} }