From fb0805ce409f3ab9ecaab30c0d8f56fbc81d11ca Mon Sep 17 00:00:00 2001 From: Daniel Sanchez Date: Thu, 6 Oct 2022 15:28:25 +0200 Subject: [PATCH] Peers handling (#7) * Refactor node -> management * Implemented peers * Pipe peers methods to waku node handle --- waku/src/general/mod.rs | 2 + waku/src/lib.rs | 2 +- waku/src/{node_management => node}/config.rs | 0 .../node.rs => node/management.rs} | 4 +- waku/src/{node_management => node}/mod.rs | 49 ++++- waku/src/node/peers.rs | 168 ++++++++++++++++++ 6 files changed, 214 insertions(+), 11 deletions(-) rename waku/src/{node_management => node}/config.rs (100%) rename waku/src/{node_management/node.rs => node/management.rs} (96%) rename waku/src/{node_management => node}/mod.rs (59%) create mode 100644 waku/src/node/peers.rs diff --git a/waku/src/general/mod.rs b/waku/src/general/mod.rs index 43e1b27..d6de08b 100644 --- a/waku/src/general/mod.rs +++ b/waku/src/general/mod.rs @@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize}; pub type PubsubTopic = String; pub type ContentTopic = String; pub type WakuMessageVersion = usize; +/// Base58 encoded peer id +pub type PeerId = String; /// JsonResponse wrapper. /// `go-waku` ffi returns this type as a `char *` as per the [specification](https://rfc.vac.dev/spec/36/#jsonresponse-type) diff --git a/waku/src/lib.rs b/waku/src/lib.rs index 95aa506..3494379 100644 --- a/waku/src/lib.rs +++ b/waku/src/lib.rs @@ -1,6 +1,6 @@ mod events; mod general; -mod node_management; +mod node; #[cfg(test)] mod tests { diff --git a/waku/src/node_management/config.rs b/waku/src/node/config.rs similarity index 100% rename from waku/src/node_management/config.rs rename to waku/src/node/config.rs diff --git a/waku/src/node_management/node.rs b/waku/src/node/management.rs similarity index 96% rename from waku/src/node_management/node.rs rename to waku/src/node/management.rs index 5f02897..9c7f36e 100644 --- a/waku/src/node_management/node.rs +++ b/waku/src/node/management.rs @@ -79,9 +79,7 @@ pub fn waku_listen_addressses() -> Result> { #[cfg(test)] mod test { use super::waku_new; - use crate::node_management::node::{ - waku_listen_addressses, waku_peer_id, waku_start, waku_stop, - }; + use crate::node::management::{waku_listen_addressses, waku_peer_id, waku_start, waku_stop}; #[test] fn waku_flow() { diff --git a/waku/src/node_management/mod.rs b/waku/src/node/mod.rs similarity index 59% rename from waku/src/node_management/mod.rs rename to waku/src/node/mod.rs index c19d941..d8821a5 100644 --- a/waku/src/node_management/mod.rs +++ b/waku/src/node/mod.rs @@ -1,15 +1,18 @@ mod config; -mod node; +mod management; +mod peers; // std use multiaddr::Multiaddr; use std::marker::PhantomData; use std::sync::Mutex; +use std::time::Duration; // crates // internal -use crate::general::Result; +use crate::general::{PeerId, Result}; pub use config::WakuNodeConfig; +pub use peers::{Protocol, WakuPeerData, WakuPeers}; /// Shared flag to check if a waku node is already running in the current process static WAKU_NODE_INITIALIZED: Mutex = Mutex::new(false); @@ -30,11 +33,15 @@ pub struct WakuNodeHandle(PhantomData); impl WakuNodeHandle { pub fn peer_id(&self) -> Result { - node::waku_peer_id() + management::waku_peer_id() } pub fn listen_addresses(&self) -> Result> { - node::waku_listen_addressses() + management::waku_listen_addressses() + } + + pub fn add_peer(&mut self, address: Multiaddr, protocol_id: usize) -> Result { + peers::waku_add_peers(address, protocol_id) } } fn stop_node() -> Result<()> { @@ -42,12 +49,12 @@ fn stop_node() -> Result<()> { .lock() .expect("Access to the mutex at some point"); *node_initialized = false; - node::waku_stop().map(|_| ()) + management::waku_stop().map(|_| ()) } impl WakuNodeHandle { pub fn start(self) -> Result> { - node::waku_start().map(|_| WakuNodeHandle(Default::default())) + management::waku_start().map(|_| WakuNodeHandle(Default::default())) } pub fn stop(self) -> Result<()> { @@ -59,6 +66,34 @@ impl WakuNodeHandle { pub fn stop(self) -> Result<()> { stop_node() } + + pub fn connect_peer_with_address( + &mut self, + address: Multiaddr, + timeout: Option, + ) -> Result<()> { + peers::waku_connect_peer_with_address(address, timeout) + } + + pub fn connect_peer_with_id( + &mut self, + peer_id: PeerId, + timeout: Option, + ) -> Result<()> { + peers::waku_connect_peer_with_id(peer_id, timeout) + } + + pub fn disconnect_peer_with_id(&mut self, peer_id: PeerId) -> Result<()> { + peers::waku_disconnect_peer_with_id(peer_id) + } + + pub fn peer_count(&self) -> Result { + peers::waku_peer_count() + } + + pub fn peers(&self) -> Result { + peers::waku_peers() + } } pub fn waku_new(config: Option) -> Result> { @@ -69,7 +104,7 @@ pub fn waku_new(config: Option) -> Result Result { + let response = unsafe { + CStr::from_ptr(waku_sys::waku_add_peer( + CString::new(address.to_string()) + .expect("CString should build properly from the address") + .into_raw(), + CString::new(protocol_id.to_string()) + .expect("CString should build properly from the protocol id") + .into_raw(), + )) + } + .to_str() + .expect("&str should build properly from the returning response"); + + let result: JsonResponse = + serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize"); + + result.into() +} + +/// Dial peer using a multiaddress +/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`] +/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. +/// Use 0 for no timeout +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms) +pub fn waku_connect_peer_with_address(address: Multiaddr, timeout: Option) -> Result<()> { + let response = unsafe { + CStr::from_ptr(waku_sys::waku_connect( + CString::new(address.to_string()) + .expect("CString should build properly from multiaddress") + .into_raw(), + timeout + .map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX)) + .unwrap_or(0), + )) + } + .to_str() + .expect("&str should build properly from the returning response"); + + let result: JsonResponse = + serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize"); + + Result::from(result).map(|_| ()) +} + +/// Dial peer using a peer id +/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`] +/// The peer must be already known. +/// It must have been added before with [`waku_add_peer`] or previously dialed with [`waku_connect_peer_with_address`] +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peeridchar-peerid-int-timeoutms) +pub fn waku_connect_peer_with_id(peer_id: PeerId, timeout: Option) -> Result<()> { + let response = unsafe { + CStr::from_ptr(waku_sys::waku_connect( + CString::new(peer_id) + .expect("CString should build properly from peer id") + .into_raw(), + timeout + .map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX)) + .unwrap_or(0), + )) + } + .to_str() + .expect("&str should build properly from the returning response"); + + let result: JsonResponse = + serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize"); + + Result::from(result).map(|_| ()) +} + +/// Disconnect a peer using its peer id +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_disconnect_peerchar-peerid) +pub fn waku_disconnect_peer_with_id(peer_id: PeerId) -> Result<()> { + let response = unsafe { + CStr::from_ptr(waku_sys::waku_disconnect( + CString::new(peer_id) + .expect("CString should build properly from peer id") + .into_raw(), + )) + } + .to_str() + .expect("&str should build properly from the returning response"); + + let result: JsonResponse = + serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize"); + + Result::from(result).map(|_| ()) +} + +/// Get number of connected peers +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peer_count) +pub fn waku_peer_count() -> Result { + let response = unsafe { CStr::from_ptr(waku_sys::waku_peer_cnt()) } + .to_str() + .expect("&str should build properly from the returning response"); + + let result: JsonResponse = + serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize"); + + result.into() +} + +pub type Protocol = String; + +/// Peer data from known/connected waku nodes +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WakuPeerData { + /// Waku peer id + #[serde(alias = "peerID")] + peer_id: PeerId, + /// Supported node protocols + protocols: Vec, + /// Node available addresses + #[serde(alias = "addrs")] + addresses: Vec, + /// Already connected flag + connected: bool, +} + +/// List of [`WakuPeerData`], return value from [`waku_peers`] funtion +pub type WakuPeers = Vec; + +/// Retrieve the list of peers known by the Waku node +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peers) +pub fn waku_peers() -> Result { + let response = unsafe { CStr::from_ptr(waku_sys::waku_peers()) } + .to_str() + .expect("&str should build properly from the returning response"); + + let result: JsonResponse = + serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize"); + + result.into() +} + +#[cfg(test)] +mod tests { + use crate::node::peers::WakuPeerData; + + #[test] + fn deserialize_waku_peer_data() { + let json_str = r#"{ + "peerID": "16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47RedcBafeDCBA", + "protocols": [ + "/ipfs/id/1.0.0", + "/vac/waku/relay/2.0.0", + "/ipfs/ping/1.0.0" + ], + "addrs": [ + "/ip4/1.2.3.4/tcp/30303" + ], + "connected": true + }"#; + let data: WakuPeerData = serde_json::from_str(json_str).unwrap(); + } +}