diff --git a/waku-bindings/src/events/mod.rs b/waku-bindings/src/events/mod.rs index 4a0ed0a..2222b51 100644 --- a/waku-bindings/src/events/mod.rs +++ b/waku-bindings/src/events/mod.rs @@ -67,15 +67,6 @@ impl WakuMessageEvent { } } -/// Shared callback slot. Callbacks are registered here so they can be accessed by the extern "C" -#[allow(clippy::type_complexity)] -static CALLBACK: Lazy>> = - Lazy::new(|| Mutex::new(Box::new(|_| {}))); - -/// Register global callback -fn set_callback(f: F) { - *CALLBACK.lock().unwrap() = Box::new(f); -} /// Wrapper callback, it transformst the `*const c_char` into a [`Signal`] /// and executes the [`CALLBACK`] funtion with it @@ -94,7 +85,7 @@ extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c /// Register callback to act as event handler and receive application signals, /// which are used to react to asynchronous events in Waku pub fn waku_set_event_callback(f: F) { - set_callback(f); + // TODO: unsafe { waku_sys::waku_set_event_callback(Some(callback)) }; } diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index bb0e699..d15bb66 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -239,8 +239,7 @@ mod base64_serde { mod tests { use super::*; use crate::WakuPubSubTopic; - use secp256k1::{rand, Secp256k1}; - use std::time::SystemTime; + #[test] fn parse_waku_topic() { let s = "/waku/2/default-waku/proto"; diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index bfd4383..f9666fb 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -7,12 +7,12 @@ mod node; mod utils; pub use node::{ - waku_create_content_topic, waku_default_pubsub_topic, waku_new, Initialized, Key, Multiaddr, - PublicKey, Running, SecretKey, WakuNodeConfig, WakuNodeHandle, + waku_create_content_topic, waku_default_pubsub_topic, waku_new, Key, Multiaddr, PublicKey, + SecretKey, WakuNodeConfig, WakuNodeHandle, }; pub use general::{ - MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic, + Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic, }; pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent}; diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index 1db52c8..d25cf4b 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -3,7 +3,7 @@ // std use std::ffi::CString; // crates -use libc::*; +use libc::c_void; // internal use super::config::WakuNodeConfig; use crate::general::Result; @@ -11,7 +11,7 @@ use crate::utils::{get_trampoline, handle_json_response, handle_no_response, han /// Instantiates a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) -pub fn waku_new(config: Option) -> Result<()> { +pub fn waku_new(config: Option) -> Result<*mut c_void> { let config = config.unwrap_or_default(); let config_ptr = CString::new( @@ -23,7 +23,7 @@ pub fn waku_new(config: Option) -> Result<()> { let mut error: String = Default::default(); let error_cb = |v: &str| error = v.to_string(); - let code = unsafe { + let node_ptr = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_new(config_ptr, cb, &mut closure as *mut _ as *mut c_void); @@ -33,18 +33,18 @@ pub fn waku_new(config: Option) -> Result<()> { out }; - handle_no_response(code, &error) + Ok(node_ptr) } /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) -pub fn waku_start() -> Result<()> { +pub fn waku_start(ctx: *mut c_void) -> Result<()> { let mut error: String = Default::default(); let error_cb = |v: &str| error = v.to_string(); let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); - waku_sys::waku_start(cb, &mut closure as *mut _ as *mut c_void) + waku_sys::waku_start(ctx, cb, &mut closure as *mut _ as *mut c_void) }; handle_no_response(code, &error) @@ -52,13 +52,13 @@ pub fn waku_start() -> Result<()> { /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) -pub fn waku_stop() -> Result<()> { +pub fn waku_stop(ctx: *mut c_void) -> Result<()> { let mut error: String = Default::default(); let error_cb = |v: &str| error = v.to_string(); let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); - waku_sys::waku_stop(cb, &mut closure as *mut _ as *mut c_void) + waku_sys::waku_stop(ctx, cb, &mut closure as *mut _ as *mut c_void) }; handle_no_response(code, &error) @@ -67,28 +67,14 @@ pub fn waku_stop() -> Result<()> { #[cfg(test)] mod test { use super::waku_new; - use crate::node::management::{waku_listen_addresses, waku_peer_id, waku_start, waku_stop}; - use crate::node::peers::waku_peer_count; + use crate::node::management::{waku_start, waku_stop}; use serial_test::serial; #[test] #[serial] fn waku_flow() { - waku_new(None).unwrap(); - waku_start().unwrap(); - // test peer id call, since we cannot start different instances of the node - let id = waku_peer_id().unwrap(); - dbg!(&id); - assert!(!id.is_empty()); - - let peer_cnt = waku_peer_count().unwrap(); - dbg!(peer_cnt); - - // test addresses, since we cannot start different instances of the node - let addresses = waku_listen_addresses().unwrap(); - dbg!(&addresses); - assert!(!addresses.is_empty()); - - waku_stop().unwrap(); + let node = waku_new(None).unwrap(); + waku_start(node).unwrap(); + waku_stop(node).unwrap(); } } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 02c2997..e4f8c51 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -9,10 +9,9 @@ mod relay; pub use aes_gcm::{Aes256Gcm, Key}; pub use multiaddr::Multiaddr; pub use secp256k1::{PublicKey, SecretKey}; -use std::marker::PhantomData; -use std::sync::Mutex; use std::time::Duration; // crates +use libc::c_void; // internal use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic}; @@ -20,82 +19,22 @@ use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic}; pub use config::WakuNodeConfig; pub use relay::{waku_create_content_topic, waku_default_pubsub_topic}; -/// Shared flag to check if a waku node is already running in the current process -static WAKU_NODE_INITIALIZED: Mutex = Mutex::new(false); - -/// Marker trait to disallow undesired waku node states in the handle -pub trait WakuNodeState {} - -/// Waku node initialized state -pub struct Initialized; - -/// Waku node running state -pub struct Running; - -impl WakuNodeState for Initialized {} -impl WakuNodeState for Running {} - /// Handle to the underliying waku node -/// Safe to sendt to/through threads. -/// Only a waku node can be running at a time. -/// Referenes (`&`) to the handle can call queries and perform operations in a thread safe way. -/// Only an owned version of the handle can `start` or `stop` the node. -pub struct WakuNodeHandle(PhantomData); - -/// We do not have any inner state, so the handle should be safe to be send among threads. -unsafe impl Send for WakuNodeHandle {} - -/// References to the handle are safe to share, as they do not mutate the handle itself and -/// operations are performed by the bindings backend, which is supposed to be thread safe. -unsafe impl Sync for WakuNodeHandle {} - -impl WakuNodeHandle { - // /// If the execution is successful, the result is the peer ID as a string (base58 encoded) - // /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) - // pub fn peer_id(&self) -> Result { - // management::waku_peer_id() - // } - - // /// Get the multiaddresses the Waku node is listening to - // /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) - // pub fn listen_addresses(&self) -> Result> { - // management::waku_listen_addresses() - // } - - // /// Add a node multiaddress and protocol to the waku node’s peerstore. - // /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_add_peerchar-address-char-protocolid) - // pub fn add_peer(&self, address: &Multiaddr, protocol_id: ProtocolId) -> Result { - // peers::waku_add_peers(address, protocol_id) - // } +pub struct WakuNodeHandle { + ctx: *mut c_void, } -fn stop_node() -> Result<()> { - let mut node_initialized = WAKU_NODE_INITIALIZED - .lock() - .expect("Access to the mutex at some point"); - *node_initialized = false; - management::waku_stop().map(|_| ()) -} - -impl WakuNodeHandle { +impl WakuNodeHandle { /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_start) - pub fn start(self) -> Result> { - management::waku_start().map(|_| WakuNodeHandle(Default::default())) + pub fn start(self) -> Result<()> { + management::waku_start(self.ctx) } /// Stops a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) pub fn stop(self) -> Result<()> { - stop_node() - } -} - -impl WakuNodeHandle { - /// Stops a Waku node - /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) - pub fn stop(self) -> Result<()> { - stop_node() + management::waku_stop(self.ctx) } /// Dial peer using a multiaddress @@ -104,7 +43,7 @@ impl WakuNodeHandle { /// Use 0 for no timeout /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) pub fn connect(&self, address: &Multiaddr, timeout: Option) -> Result<()> { - peers::waku_connect(address, timeout) + peers::waku_connect(self.ctx, address, timeout) } /// Publish a message using Waku Relay. @@ -116,45 +55,24 @@ impl WakuNodeHandle { pubsub_topic: &WakuPubSubTopic, timeout: Option, ) -> Result { - relay::waku_relay_publish_message(message, pubsub_topic, timeout) + relay::waku_relay_publish_message(self.ctx, message, pubsub_topic, timeout) } /// Subscribe to WakuRelay to receive messages matching a content filter. pub fn relay_subscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> { - relay::waku_relay_subscribe(pubsub_topic) + relay::waku_relay_subscribe(self.ctx, pubsub_topic) } /// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> { - relay::waku_relay_unsubscribe(pubsub_topic) + relay::waku_relay_unsubscribe(self.ctx, pubsub_topic) } } /// Spawn a new Waku node with the given configuration (default configuration if `None` provided) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) -pub fn waku_new(config: Option) -> Result> { - let mut node_initialized = WAKU_NODE_INITIALIZED - .lock() - .expect("Access to the mutex at some point"); - if *node_initialized { - return Err("Waku node is already initialized".into()); - } - *node_initialized = true; - management::waku_new(config).map(|_| WakuNodeHandle(Default::default())) -} - -#[cfg(test)] -mod tests { - use super::waku_new; - use serial_test::serial; - - #[test] - #[serial] - fn exclusive_running() { - let handle1 = waku_new(None).unwrap(); - let handle2 = waku_new(None); - assert!(handle2.is_err()); - let stop_handle = handle1.start().unwrap(); - stop_handle.stop().unwrap(); - } +pub fn waku_new(config: Option) -> Result { + Ok(WakuNodeHandle { + ctx: management::waku_new(config)?, + }) } diff --git a/waku-bindings/src/node/peers.rs b/waku-bindings/src/node/peers.rs index b9a7477..b37e03c 100644 --- a/waku-bindings/src/node/peers.rs +++ b/waku-bindings/src/node/peers.rs @@ -15,7 +15,11 @@ use crate::utils::{get_trampoline, handle_json_response, handle_no_response, han /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. /// Use 0 for no timeout /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) -pub fn waku_connect(address: &Multiaddr, timeout: Option) -> Result<()> { +pub fn waku_connect( + ctx: *mut c_void, + address: &Multiaddr, + timeout: Option, +) -> Result<()> { let address_ptr = CString::new(address.to_string()) .expect("CString should build properly from multiaddress") .into_raw(); @@ -26,9 +30,10 @@ pub fn waku_connect(address: &Multiaddr, timeout: Option) -> Result<() let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_connect( + ctx, address_ptr, timeout - .map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX)) + .map(|duration| duration.as_millis().try_into().unwrap_or(u32::MAX)) .unwrap_or(0), cb, &mut closure as *mut _ as *mut c_void, diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index 343c3a6..d3c0a3f 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -12,17 +12,15 @@ use crate::utils::{get_trampoline, handle_json_response, handle_no_response, han /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) pub fn waku_create_content_topic( + ctx: *mut c_void, application_name: &str, - application_version: &str, + application_version: u32, content_topic_name: &str, encoding: Encoding, ) -> WakuContentTopic { let application_name_ptr = CString::new(application_name) .expect("Application name should always transform to CString") .into_raw(); - let application_version_ptr = CString::new(application_version) - .expect("Application version should always transform to CString") - .into_raw(); let content_topic_name_ptr = CString::new(content_topic_name) .expect("Content topic should always transform to CString") .into_raw(); @@ -36,8 +34,9 @@ pub fn waku_create_content_topic( let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_content_topic( + ctx, application_name_ptr, - application_version_ptr, + application_version, content_topic_name_ptr, encoding_ptr, cb, @@ -45,7 +44,6 @@ pub fn waku_create_content_topic( ); drop(CString::from_raw(application_name_ptr)); - drop(CString::from_raw(application_version_ptr)); drop(CString::from_raw(content_topic_name_ptr)); drop(CString::from_raw(encoding_ptr)); @@ -57,13 +55,13 @@ pub fn waku_create_content_topic( } /// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/) -pub fn waku_default_pubsub_topic() -> WakuPubSubTopic { +pub fn waku_default_pubsub_topic(ctx: *mut c_void) -> WakuPubSubTopic { let mut result: String = Default::default(); let result_cb = |v: &str| result = v.to_string(); let code = unsafe { let mut closure = result_cb; let cb = get_trampoline(&closure); - waku_sys::waku_default_pubsub_topic(cb, &mut closure as *mut _ as *mut c_void) + waku_sys::waku_default_pubsub_topic(ctx, cb, &mut closure as *mut _ as *mut c_void) }; handle_response(code, &result).expect("&str from result should always be extracted") @@ -72,6 +70,7 @@ pub fn waku_default_pubsub_topic() -> WakuPubSubTopic { /// Publish a message using Waku Relay /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) pub fn waku_relay_publish_message( + ctx: *mut c_void, message: &WakuMessage, pubsub_topic: &WakuPubSubTopic, timeout: Option, @@ -94,6 +93,7 @@ pub fn waku_relay_publish_message( let mut closure = result_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_publish( + ctx, message_ptr, pubsub_topic_ptr, timeout @@ -101,7 +101,7 @@ pub fn waku_relay_publish_message( duration .as_millis() .try_into() - .expect("Duration as milliseconds should fit in a i32") + .expect("Duration as milliseconds should fit in a u32") }) .unwrap_or(0), cb, @@ -117,7 +117,7 @@ pub fn waku_relay_publish_message( handle_response(code, &result) } -pub fn waku_relay_subscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> { +pub fn waku_relay_subscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> { let pubsub_topic = pubsub_topic.to_string(); let pubsub_topic_ptr = CString::new(pubsub_topic) .expect("CString should build properly from pubsub topic") @@ -129,6 +129,7 @@ pub fn waku_relay_subscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> { let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( + ctx, pubsub_topic_ptr, cb, &mut closure as *mut _ as *mut c_void, @@ -142,7 +143,7 @@ pub fn waku_relay_subscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> { handle_no_response(code, &error) } -pub fn waku_relay_unsubscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> { +pub fn waku_relay_unsubscribe(ctx: *mut c_void, pubsub_topic: &WakuPubSubTopic) -> Result<()> { let pubsub_topic = pubsub_topic.to_string(); let pubsub_topic_ptr = CString::new(pubsub_topic) .expect("CString should build properly from pubsub topic") @@ -154,6 +155,7 @@ pub fn waku_relay_unsubscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> { let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( + ctx, pubsub_topic_ptr, cb, &mut closure as *mut _ as *mut c_void, diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index 0c2ba1c..bc2df6a 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -1,9 +1,10 @@ use crate::general::Result; use core::str::FromStr; use serde::de::DeserializeOwned; -use std::ffi::CStr; +use std::{slice, str}; use waku_sys::WakuCallBack; use waku_sys::{RET_ERR, RET_MISSING_CALLBACK, RET_OK}; + pub fn decode(input: &str) -> Result { serde_json::from_str(input) .map_err(|err| format!("could not deserialize waku response: {}", err)) @@ -12,6 +13,7 @@ pub fn decode(input: &str) -> Result { unsafe extern "C" fn trampoline( _ret_code: ::std::os::raw::c_int, data: *const ::std::os::raw::c_char, + data_len: usize, user_data: *mut ::std::os::raw::c_void, ) where F: FnMut(&str), @@ -21,14 +23,7 @@ unsafe extern "C" fn trampoline( let response = if data.is_null() { "" } else { - unsafe { CStr::from_ptr(data) } - .to_str() - .map_err(|err| { - format!( - "could not retrieve response from pointer returned by waku: {}", - err - ) - }) + str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len)) .expect("could not retrieve response") }; diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 8a69e69..d4b3eb2 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -3,15 +3,12 @@ use multiaddr::Multiaddr; use rand::thread_rng; use secp256k1::SecretKey; use serial_test::serial; -use std::net::IpAddr; -use std::str::FromStr; use std::time::{Duration, SystemTime}; use std::{collections::HashSet, str::from_utf8}; use tokio::sync::mpsc::{self, Sender}; use tokio::time; use waku_bindings::{ - waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Encoding, Event, - GossipSubParams, Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel, + waku_new, waku_set_event_callback, Encoding, Event, Key, MessageId, WakuContentTopic, WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic, }; @@ -25,7 +22,7 @@ const NODES: &[&str] = &[ ]; fn try_publish_relay_messages( - node: &WakuNodeHandle, + node: &WakuNodeHandle, msg: &WakuMessage, ) -> Result, String> { Ok(HashSet::from( @@ -33,24 +30,6 @@ fn try_publish_relay_messages( )) } -fn try_publish_lightpush_messages( - node: &WakuNodeHandle, - msg: &WakuMessage, -) -> Result, String> { - let peer_id = node - .peers() - .unwrap() - .iter() - .map(|peer| peer.peer_id()) - .find(|id| id.as_str() != node.peer_id().unwrap().as_str()) - .unwrap() - .clone(); - - Ok(HashSet::from([ - node.lightpush_publish(msg, None, peer_id, None)? - ])) -} - #[derive(Debug)] struct Response { id: MessageId, @@ -63,19 +42,7 @@ fn set_callback(tx: Sender, sk: SecretKey, ssk: Key) { let id = message.message_id(); let message = message.waku_message(); - let payload = if let Ok(message) = message - .try_decode_asymmetric(&sk) - .map_err(|e| println!("{e}")) - { - message.data().to_vec() - } else if let Ok(message) = message - .try_decode_symmetric(&ssk) - .map_err(|e| println!("{e}")) - { - message.data().to_vec() - } else { - message.payload().to_vec() - }; + let payload = message.payload().to_vec(); futures::executor::block_on(tx.send(Response { id: id.to_string(), @@ -87,7 +54,7 @@ fn set_callback(tx: Sender, sk: SecretKey, ssk: Key) { } async fn test_echo_messages( - node: &WakuNodeHandle, + node: &WakuNodeHandle, content: &'static str, content_topic: WakuContentTopic, sk: SecretKey, @@ -112,8 +79,6 @@ async fn test_echo_messages( let mut ids = try_publish_relay_messages(node, &message).expect("send relay messages"); - ids.extend(try_publish_lightpush_messages(node, &message).expect("send lightpush messages")); - while let Some(res) = rx.recv().await { if ids.take(&res.id).is_some() { let msg = from_utf8(&res.payload).expect("should be valid message"); @@ -126,98 +91,28 @@ async fn test_echo_messages( } } -#[ignore] -#[tokio::test] -#[serial] -async fn discv5_echo() -> Result<(), String> { - let config = WakuNodeConfig { - host: IpAddr::from_str("0.0.0.0").ok(), - log_level: Some(WakuLogLevel::Error), - discv5: Some(true), - discv5_udp_port: Some(9000), - discv5_bootstrap_nodes: Vec::new(), - ..Default::default() - }; - - let node = waku_new(Some(config))?; - let node = node.start()?; - println!("Node peer id: {}", node.peer_id()?); - - for node_address in NODES { - let address: Multiaddr = node_address.parse().unwrap(); - let peer_id = node.add_peer(&address, ProtocolId::Relay)?; - node.connect_peer_with_id(&peer_id, None)?; - } - - assert!(node.peers()?.len() >= NODES.len()); - assert!(node.peer_count()? >= NODES.len()); - - assert!(node.relay_enough_peers(None)?); - let sk = SecretKey::new(&mut thread_rng()); - let ssk = Aes256Gcm::generate_key(&mut thread_rng()); - - // Subscribe to default channel. - let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]); - node.relay_subscribe(&content_filter)?; - let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); - - let topics = node.relay_topics()?; - let default_topic = waku_default_pubsub_topic(); - assert!(topics.len() == 1); - let topic: WakuPubSubTopic = topics[0].parse().unwrap(); - - assert!(topic == default_topic); - - let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); - tokio::pin!(sleep); - - // Send and receive messages. Waits until all messages received. - let got_all = tokio::select! { - _ = sleep => false, - _ = test_echo_messages(&node, ECHO_MESSAGE, content_topic, sk, ssk) => true, - }; - - assert!(got_all); - - for node_data in node.peers()? { - if node_data.peer_id() != &node.peer_id()? { - node.disconnect_peer_with_id(node_data.peer_id())?; - } - } - - node.stop()?; - Ok(()) -} - #[ignore] #[tokio::test] #[serial] async fn default_echo() -> Result<(), String> { let config = WakuNodeConfig { - log_level: Some(WakuLogLevel::Error), ..Default::default() }; let node = waku_new(Some(config))?; - let node = node.start()?; - println!("Node peer id: {}", node.peer_id()?); + + node.start()?; for node_address in NODES { let address: Multiaddr = node_address.parse().unwrap(); - let peer_id = node.add_peer(&address, ProtocolId::Relay)?; - node.connect_peer_with_id(&peer_id, None)?; + node.connect(&address, None)?; } - - assert!(node.peers()?.len() >= NODES.len()); - assert!(node.peer_count()? >= NODES.len()); - - assert!(node.relay_enough_peers(None)?); let sk = SecretKey::new(&mut thread_rng()); let ssk = Aes256Gcm::generate_key(&mut thread_rng()); // subscribe to default channel - let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]); node.relay_subscribe(&content_filter)?; + let content_topic = WakuContentTopic::new("toychat", "2", "huilong", Encoding::Proto); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); @@ -231,87 +126,6 @@ async fn default_echo() -> Result<(), String> { assert!(got_all); - for node_data in node.peers()? { - if node_data.peer_id() != &node.peer_id()? { - node.disconnect_peer_with_id(node_data.peer_id())?; - } - } - - node.stop()?; - Ok(()) -} - -#[test] -#[serial] -fn gossipsub_config() -> Result<(), String> { - let params = GossipSubParams { - d: Some(6), - dlo: Some(3), - dhi: Some(12), - dscore: Some(10), - dout: Some(8), - history_length: Some(500), - history_gossip: Some(3), - dlazy: Some(12), - gossip_factor: Some(0.25), - gossip_retransmission: Some(4), - heartbeat_initial_delay_ms: Some(500), - heartbeat_interval_seconds: Some(60), - slow_heartbeat_warning: Some(0.5), - fanout_ttl_seconds: Some(60), - prune_peers: Some(3), - prune_backoff_seconds: Some(900), - unsubscribe_backoff_seconds: Some(60), - connectors: Some(3), - max_pending_connections: Some(50), - connection_timeout_seconds: Some(15), - direct_connect_ticks: Some(5), - direct_connect_initial_delay_seconds: Some(5), - opportunistic_graft_ticks: Some(8), - opportunistic_graft_peers: Some(2), - graft_flood_threshold_seconds: Some(120), - max_ihave_length: Some(32), - max_ihave_messages: Some(8), - iwant_followup_time_seconds: Some(120), - seen_messages_ttl_seconds: Some(120), - }; - - let config = WakuNodeConfig { - gossipsub_params: params.into(), - log_level: Some(WakuLogLevel::Error), - ..Default::default() - }; - - let node = waku_new(Some(config))?; - let node = node.start()?; - node.stop()?; - Ok(()) -} - -#[test] -#[serial] -fn loglevel_error() -> Result<(), String> { - let config = WakuNodeConfig { - log_level: Some(WakuLogLevel::Error), - ..Default::default() - }; - - let node = waku_new(Some(config))?; - let node = node.start()?; - node.stop()?; - Ok(()) -} - -#[test] -#[serial] -fn loglevel_info() -> Result<(), String> { - let config = WakuNodeConfig { - log_level: Some(WakuLogLevel::Info), - ..Default::default() - }; - - let node = waku_new(Some(config))?; - let node = node.start()?; node.stop()?; Ok(()) } @@ -323,9 +137,9 @@ fn node_restart() { for _ in 0..3 { let node = waku_new(config.clone().into()).expect("default config should be valid"); - let node = node.start().expect("node should start with valid config"); - assert!(node.peer_id().is_ok()); + node.start().expect("node should start with valid config"); + node.stop().expect("node should stop"); } }