Peers handling (#7)
* Refactor node -> management * Implemented peers * Pipe peers methods to waku node handle
This commit is contained in:
parent
98350f4e1f
commit
fb0805ce40
|
@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
|
||||||
pub type PubsubTopic = String;
|
pub type PubsubTopic = String;
|
||||||
pub type ContentTopic = String;
|
pub type ContentTopic = String;
|
||||||
pub type WakuMessageVersion = usize;
|
pub type WakuMessageVersion = usize;
|
||||||
|
/// Base58 encoded peer id
|
||||||
|
pub type PeerId = String;
|
||||||
|
|
||||||
/// JsonResponse wrapper.
|
/// JsonResponse wrapper.
|
||||||
/// `go-waku` ffi returns this type as a `char *` as per the [specification](https://rfc.vac.dev/spec/36/#jsonresponse-type)
|
/// `go-waku` ffi returns this type as a `char *` as per the [specification](https://rfc.vac.dev/spec/36/#jsonresponse-type)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
mod events;
|
mod events;
|
||||||
mod general;
|
mod general;
|
||||||
mod node_management;
|
mod node;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
|
|
|
@ -79,9 +79,7 @@ pub fn waku_listen_addressses() -> Result<Vec<Multiaddr>> {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::waku_new;
|
use super::waku_new;
|
||||||
use crate::node_management::node::{
|
use crate::node::management::{waku_listen_addressses, waku_peer_id, waku_start, waku_stop};
|
||||||
waku_listen_addressses, waku_peer_id, waku_start, waku_stop,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn waku_flow() {
|
fn waku_flow() {
|
|
@ -1,15 +1,18 @@
|
||||||
mod config;
|
mod config;
|
||||||
mod node;
|
mod management;
|
||||||
|
mod peers;
|
||||||
|
|
||||||
// std
|
// std
|
||||||
use multiaddr::Multiaddr;
|
use multiaddr::Multiaddr;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
use std::time::Duration;
|
||||||
// crates
|
// crates
|
||||||
// internal
|
// internal
|
||||||
use crate::general::Result;
|
use crate::general::{PeerId, Result};
|
||||||
|
|
||||||
pub use config::WakuNodeConfig;
|
pub use config::WakuNodeConfig;
|
||||||
|
pub use peers::{Protocol, WakuPeerData, WakuPeers};
|
||||||
|
|
||||||
/// Shared flag to check if a waku node is already running in the current process
|
/// Shared flag to check if a waku node is already running in the current process
|
||||||
static WAKU_NODE_INITIALIZED: Mutex<bool> = Mutex::new(false);
|
static WAKU_NODE_INITIALIZED: Mutex<bool> = Mutex::new(false);
|
||||||
|
@ -30,11 +33,15 @@ pub struct WakuNodeHandle<State: WakuNodeState>(PhantomData<State>);
|
||||||
|
|
||||||
impl<State: WakuNodeState> WakuNodeHandle<State> {
|
impl<State: WakuNodeState> WakuNodeHandle<State> {
|
||||||
pub fn peer_id(&self) -> Result<String> {
|
pub fn peer_id(&self) -> Result<String> {
|
||||||
node::waku_peer_id()
|
management::waku_peer_id()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
|
pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
|
||||||
node::waku_listen_addressses()
|
management::waku_listen_addressses()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_peer(&mut self, address: Multiaddr, protocol_id: usize) -> Result<PeerId> {
|
||||||
|
peers::waku_add_peers(address, protocol_id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fn stop_node() -> Result<()> {
|
fn stop_node() -> Result<()> {
|
||||||
|
@ -42,12 +49,12 @@ fn stop_node() -> Result<()> {
|
||||||
.lock()
|
.lock()
|
||||||
.expect("Access to the mutex at some point");
|
.expect("Access to the mutex at some point");
|
||||||
*node_initialized = false;
|
*node_initialized = false;
|
||||||
node::waku_stop().map(|_| ())
|
management::waku_stop().map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WakuNodeHandle<Initialized> {
|
impl WakuNodeHandle<Initialized> {
|
||||||
pub fn start(self) -> Result<WakuNodeHandle<Running>> {
|
pub fn start(self) -> Result<WakuNodeHandle<Running>> {
|
||||||
node::waku_start().map(|_| WakuNodeHandle(Default::default()))
|
management::waku_start().map(|_| WakuNodeHandle(Default::default()))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn stop(self) -> Result<()> {
|
pub fn stop(self) -> Result<()> {
|
||||||
|
@ -59,6 +66,34 @@ impl WakuNodeHandle<Running> {
|
||||||
pub fn stop(self) -> Result<()> {
|
pub fn stop(self) -> Result<()> {
|
||||||
stop_node()
|
stop_node()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn connect_peer_with_address(
|
||||||
|
&mut self,
|
||||||
|
address: Multiaddr,
|
||||||
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<()> {
|
||||||
|
peers::waku_connect_peer_with_address(address, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn connect_peer_with_id(
|
||||||
|
&mut self,
|
||||||
|
peer_id: PeerId,
|
||||||
|
timeout: Option<Duration>,
|
||||||
|
) -> Result<()> {
|
||||||
|
peers::waku_connect_peer_with_id(peer_id, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn disconnect_peer_with_id(&mut self, peer_id: PeerId) -> Result<()> {
|
||||||
|
peers::waku_disconnect_peer_with_id(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn peer_count(&self) -> Result<usize> {
|
||||||
|
peers::waku_peer_count()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn peers(&self) -> Result<WakuPeers> {
|
||||||
|
peers::waku_peers()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
|
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
|
||||||
|
@ -69,7 +104,7 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initial
|
||||||
return Err("Waku node is already initialized".into());
|
return Err("Waku node is already initialized".into());
|
||||||
}
|
}
|
||||||
*node_initialized = true;
|
*node_initialized = true;
|
||||||
node::waku_new(config).map(|_| WakuNodeHandle(Default::default()))
|
management::waku_new(config).map(|_| WakuNodeHandle(Default::default()))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
|
@ -0,0 +1,168 @@
|
||||||
|
// std
|
||||||
|
use std::ffi::{c_char, CStr, CString};
|
||||||
|
use std::time::Duration;
|
||||||
|
// crates
|
||||||
|
use multiaddr::Multiaddr;
|
||||||
|
use serde::Deserialize;
|
||||||
|
// internal
|
||||||
|
use crate::general::{JsonResponse, PeerId, Result};
|
||||||
|
|
||||||
|
/// Add a node multiaddress and protocol to the waku node’s peerstore.
|
||||||
|
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_add_peerchar-address-char-protocolid)
|
||||||
|
pub fn waku_add_peers(address: Multiaddr, protocol_id: usize) -> Result<PeerId> {
|
||||||
|
let response = unsafe {
|
||||||
|
CStr::from_ptr(waku_sys::waku_add_peer(
|
||||||
|
CString::new(address.to_string())
|
||||||
|
.expect("CString should build properly from the address")
|
||||||
|
.into_raw(),
|
||||||
|
CString::new(protocol_id.to_string())
|
||||||
|
.expect("CString should build properly from the protocol id")
|
||||||
|
.into_raw(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
.to_str()
|
||||||
|
.expect("&str should build properly from the returning response");
|
||||||
|
|
||||||
|
let result: JsonResponse<PeerId> =
|
||||||
|
serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize");
|
||||||
|
|
||||||
|
result.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dial peer using a multiaddress
|
||||||
|
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
|
||||||
|
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
|
||||||
|
/// Use 0 for no timeout
|
||||||
|
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
|
||||||
|
pub fn waku_connect_peer_with_address(address: Multiaddr, timeout: Option<Duration>) -> Result<()> {
|
||||||
|
let response = unsafe {
|
||||||
|
CStr::from_ptr(waku_sys::waku_connect(
|
||||||
|
CString::new(address.to_string())
|
||||||
|
.expect("CString should build properly from multiaddress")
|
||||||
|
.into_raw(),
|
||||||
|
timeout
|
||||||
|
.map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX))
|
||||||
|
.unwrap_or(0),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
.to_str()
|
||||||
|
.expect("&str should build properly from the returning response");
|
||||||
|
|
||||||
|
let result: JsonResponse<bool> =
|
||||||
|
serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize");
|
||||||
|
|
||||||
|
Result::from(result).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Dial peer using a peer id
|
||||||
|
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
|
||||||
|
/// The peer must be already known.
|
||||||
|
/// It must have been added before with [`waku_add_peer`] or previously dialed with [`waku_connect_peer_with_address`]
|
||||||
|
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peeridchar-peerid-int-timeoutms)
|
||||||
|
pub fn waku_connect_peer_with_id(peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
|
||||||
|
let response = unsafe {
|
||||||
|
CStr::from_ptr(waku_sys::waku_connect(
|
||||||
|
CString::new(peer_id)
|
||||||
|
.expect("CString should build properly from peer id")
|
||||||
|
.into_raw(),
|
||||||
|
timeout
|
||||||
|
.map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX))
|
||||||
|
.unwrap_or(0),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
.to_str()
|
||||||
|
.expect("&str should build properly from the returning response");
|
||||||
|
|
||||||
|
let result: JsonResponse<bool> =
|
||||||
|
serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize");
|
||||||
|
|
||||||
|
Result::from(result).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Disconnect a peer using its peer id
|
||||||
|
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_disconnect_peerchar-peerid)
|
||||||
|
pub fn waku_disconnect_peer_with_id(peer_id: PeerId) -> Result<()> {
|
||||||
|
let response = unsafe {
|
||||||
|
CStr::from_ptr(waku_sys::waku_disconnect(
|
||||||
|
CString::new(peer_id)
|
||||||
|
.expect("CString should build properly from peer id")
|
||||||
|
.into_raw(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
.to_str()
|
||||||
|
.expect("&str should build properly from the returning response");
|
||||||
|
|
||||||
|
let result: JsonResponse<bool> =
|
||||||
|
serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize");
|
||||||
|
|
||||||
|
Result::from(result).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get number of connected peers
|
||||||
|
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peer_count)
|
||||||
|
pub fn waku_peer_count() -> Result<usize> {
|
||||||
|
let response = unsafe { CStr::from_ptr(waku_sys::waku_peer_cnt()) }
|
||||||
|
.to_str()
|
||||||
|
.expect("&str should build properly from the returning response");
|
||||||
|
|
||||||
|
let result: JsonResponse<usize> =
|
||||||
|
serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize");
|
||||||
|
|
||||||
|
result.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type Protocol = String;
|
||||||
|
|
||||||
|
/// Peer data from known/connected waku nodes
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct WakuPeerData {
|
||||||
|
/// Waku peer id
|
||||||
|
#[serde(alias = "peerID")]
|
||||||
|
peer_id: PeerId,
|
||||||
|
/// Supported node protocols
|
||||||
|
protocols: Vec<Protocol>,
|
||||||
|
/// Node available addresses
|
||||||
|
#[serde(alias = "addrs")]
|
||||||
|
addresses: Vec<Multiaddr>,
|
||||||
|
/// Already connected flag
|
||||||
|
connected: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// List of [`WakuPeerData`], return value from [`waku_peers`] funtion
|
||||||
|
pub type WakuPeers = Vec<WakuPeerData>;
|
||||||
|
|
||||||
|
/// Retrieve the list of peers known by the Waku node
|
||||||
|
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peers)
|
||||||
|
pub fn waku_peers() -> Result<WakuPeers> {
|
||||||
|
let response = unsafe { CStr::from_ptr(waku_sys::waku_peers()) }
|
||||||
|
.to_str()
|
||||||
|
.expect("&str should build properly from the returning response");
|
||||||
|
|
||||||
|
let result: JsonResponse<WakuPeers> =
|
||||||
|
serde_json::from_str(response).expect("JsonResponse should always succeed to deserialize");
|
||||||
|
|
||||||
|
result.into()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use crate::node::peers::WakuPeerData;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn deserialize_waku_peer_data() {
|
||||||
|
let json_str = r#"{
|
||||||
|
"peerID": "16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47RedcBafeDCBA",
|
||||||
|
"protocols": [
|
||||||
|
"/ipfs/id/1.0.0",
|
||||||
|
"/vac/waku/relay/2.0.0",
|
||||||
|
"/ipfs/ping/1.0.0"
|
||||||
|
],
|
||||||
|
"addrs": [
|
||||||
|
"/ip4/1.2.3.4/tcp/30303"
|
||||||
|
],
|
||||||
|
"connected": true
|
||||||
|
}"#;
|
||||||
|
let data: WakuPeerData = serde_json::from_str(json_str).unwrap();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue