diff --git a/Cargo.lock b/Cargo.lock index cf0a16b..cc547ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,41 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "aead" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c192eb8f11fc081b0fe4259ba5af04217d4e0faddd02417310a927911abd7c8" +dependencies = [ + "crypto-common", + "generic-array", +] + +[[package]] +name = "aes" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfe0133578c0986e1fe3dfcd4af1cc5b2dd6c3dbf534d69916ce16a2701d40ba" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "aes-gcm" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82e1366e0c69c9f927b1fa5ce2c7bf9eafc8f9268c0b9800729e8b267612447c" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "aho-corasick" version = "0.7.19" @@ -105,6 +140,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cipher" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clang-sys" version = "1.4.0" @@ -140,6 +185,26 @@ dependencies = [ "os_str_bytes", ] +[[package]] +name = "const_format" +version = "0.2.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "939dc9e2eb9077e0679d2ce32de1ded8531779360b003b4a972a7a39ec263495" +dependencies = [ + "const_format_proc_macros", +] + +[[package]] +name = "const_format_proc_macros" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef196d5d972878a48da7decb7686eded338b4858fbabeed513d63a7c98b2b82d" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + [[package]] name = "core2" version = "0.4.0" @@ -164,6 +229,17 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "rand_core", + "typenum", +] + [[package]] name = "crypto-mac" version = "0.8.0" @@ -174,6 +250,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + [[package]] name = "data-encoding" version = "2.3.2" @@ -238,6 +323,16 @@ dependencies = [ "wasi", ] +[[package]] +name = "ghash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "glob" version = "0.3.0" @@ -312,6 +407,15 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "itoa" version = "1.0.3" @@ -498,6 +602,18 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "polyval" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef234e08c11dfcb2e56f79fd70f6f2eb7f025c0ce2333e82f4f0518ecad30c6" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "ppv-lite86" version = "0.2.16" @@ -666,6 +782,30 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3" +[[package]] +name = "sscanf" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ecdd7ea17bcadebf81d656db919f58f96c1d194d748cf0839a44a220123eedd" +dependencies = [ + "const_format", + "lazy_static", + "regex", + "sscanf_macro", +] + +[[package]] +name = "sscanf_macro" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe2309d255caf220c1ff9f380d89420a1377de1cabc1d57e0b308e53b0406bed" +dependencies = [ + "proc-macro2", + "quote", + "regex-syntax", + "syn", +] + [[package]] name = "static_assertions" version = "1.1.0" @@ -799,6 +939,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "universal-hash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d3160b73c9a19f7e2939a2fdad446c57c1bbbbf4d919d3213ff1267a580d8b5" +dependencies = [ + "crypto-common", + "subtle", +] + [[package]] name = "unsigned-varint" version = "0.7.1" @@ -826,12 +976,14 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" name = "waku" version = "0.1.0" dependencies = [ + "aes-gcm", "hex", "libsecp256k1", "multiaddr", "once_cell", "serde", "serde_json", + "sscanf", "waku-sys", ] diff --git a/waku/Cargo.toml b/waku/Cargo.toml index 0c23ead..d9eed35 100644 --- a/waku/Cargo.toml +++ b/waku/Cargo.toml @@ -6,10 +6,12 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +aes-gcm = { version = "0.10", features = ["aes"] } hex = "0.4" libsecp256k1 = "0.7" multiaddr = "0.14" once_cell = "1.15" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +sscanf = "0.3" waku-sys = { path = "../waku-sys" } diff --git a/waku/src/events/mod.rs b/waku/src/events/mod.rs index 39ffdd2..535e3e3 100644 --- a/waku/src/events/mod.rs +++ b/waku/src/events/mod.rs @@ -6,7 +6,7 @@ use std::sync::RwLock; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; // internal -use crate::general::{PubsubTopic, WakuMessage}; +use crate::general::{WakuMessage, WakuPubSubTopic}; #[derive(Serialize, Deserialize)] pub struct Signal { @@ -26,7 +26,7 @@ pub enum Event { #[serde(rename_all = "camelCase")] pub struct WakuMessageEvent { /// The pubsub topic on which the message was received - pubsub_topic: PubsubTopic, + pubsub_topic: WakuPubSubTopic, /// The message id message_id: String, /// The message in [`WakuMessage`] format @@ -34,7 +34,7 @@ pub struct WakuMessageEvent { } impl WakuMessageEvent { - pub fn pubsub_topic(&self) -> &PubsubTopic { + pub fn pubsub_topic(&self) -> &WakuPubSubTopic { &self.pubsub_topic } diff --git a/waku/src/general/mod.rs b/waku/src/general/mod.rs index d6de08b..81b8d57 100644 --- a/waku/src/general/mod.rs +++ b/waku/src/general/mod.rs @@ -1,13 +1,15 @@ // std +use std::fmt::{Display, Formatter}; +use std::str::FromStr; // crates -use serde::{Deserialize, Serialize}; +use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; +use sscanf::{scanf, RegexRepresentation}; // internal -pub type PubsubTopic = String; -pub type ContentTopic = String; pub type WakuMessageVersion = usize; /// Base58 encoded peer id pub type PeerId = String; +pub type MessageId = String; /// JsonResponse wrapper. /// `go-waku` ffi returns this type as a `char *` as per the [specification](https://rfc.vac.dev/spec/36/#jsonresponse-type) @@ -34,12 +36,12 @@ impl From> for Result { /// JsonMessage, Waku message in JSON format. /// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type) -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct WakuMessage { payload: Box<[u8]>, /// The content topic to be set on the message - content_topic: ContentTopic, + content_topic: WakuContentTopic, /// The Waku Message version number version: WakuMessageVersion, /// Unix timestamp in nanoseconds @@ -60,30 +62,30 @@ pub struct DecodedPayload { /// The content topic of a Waku message /// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type) -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ContentFilter { /// The content topic of a Waku message - content_topic: ContentTopic, + content_topic: WakuContentTopic, } /// The criteria to create subscription to a light node in JSON Format /// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct FilterSubscription { /// Array of [`ContentFilter`] being subscribed to / unsubscribed from content_filters: Vec, /// Optional pubsub topic - pubsub_topic: Option, + pubsub_topic: Option, } /// Criteria used to retrieve historical messages -#[derive(Serialize)] +#[derive(Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct StoreQuery { /// The pubsub topic on which messages are published - pubsub_topic: Option, + pubsub_topic: Option, /// Array of [`ContentFilter`] to query for historical messages content_filters: Vec, /// The inclusive lower bound on the timestamp of queried messages. @@ -97,7 +99,7 @@ pub struct StoreQuery { } /// The response received after doing a query to a store node -#[derive(Deserialize)] +#[derive(Clone, Deserialize)] #[serde(rename_all = "camelCase")] pub struct StoreResponse { /// Array of retrieved historical messages in [`WakuMessage`] format @@ -107,7 +109,7 @@ pub struct StoreResponse { } /// Paging information -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PagingOptions { /// Number of messages to retrieve per page @@ -120,7 +122,7 @@ pub struct PagingOptions { forward: bool, } -#[derive(Serialize, Deserialize)] +#[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct MessageIndex { /// Hash of the message at this [`MessageIndex`] @@ -130,5 +132,156 @@ pub struct MessageIndex { /// UNIX timestamp in nanoseconds at which the message is generated by its sender sender_time: usize, /// The pubsub topic of the message at this [`MessageIndex`] - pubsub_topic: PubsubTopic, + pubsub_topic: WakuPubSubTopic, +} + +#[derive(Copy, Clone)] +pub enum Encoding { + Proto, + Rlp, + Rfc26, +} + +impl Display for Encoding { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let s = match self { + Encoding::Proto => "proto", + Encoding::Rlp => "rlp", + Encoding::Rfc26 => "rfc26", + }; + f.write_str(s) + } +} + +impl FromStr for Encoding { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + match s.to_lowercase().as_str() { + "proto" => Ok(Self::Proto), + "rlp" => Ok(Self::Rlp), + "rfc26" => Ok(Self::Rfc26), + encoding => Err(format!("Unrecognized encoding: {}", encoding)), + } + } +} + +impl RegexRepresentation for Encoding { + const REGEX: &'static str = r"\w"; +} + +#[derive(Clone)] +pub struct WakuContentTopic { + application_name: String, + version: usize, + content_topic_name: String, + encoding: Encoding, +} + +impl FromStr for WakuContentTopic { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + if let Ok((application_name, version, content_topic_name, encoding)) = + scanf!(s, "/{}/{}/{}/{}", String, usize, String, Encoding) + { + Ok(WakuContentTopic { + application_name, + version, + content_topic_name, + encoding, + }) + } else { + Err( + format!( + "Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {}", + s + ) + ) + } + } +} + +impl Display for WakuContentTopic { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "/{}/{}/{}/{}", + self.application_name, self.version, self.content_topic_name, self.encoding + ) + } +} + +impl Serialize for WakuContentTopic { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + self.to_string().serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for WakuContentTopic { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let as_string: String = String::deserialize(deserializer)?; + as_string + .parse::() + .map_err(D::Error::custom) + } +} + +#[derive(Clone)] +pub struct WakuPubSubTopic { + topic_name: String, + encoding: Encoding, +} + +impl FromStr for WakuPubSubTopic { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + if let Ok((topic_name, encoding)) = scanf!(s, "/waku/v2/{}/{}", String, Encoding) { + Ok(WakuPubSubTopic { + topic_name, + encoding, + }) + } else { + Err( + format!( + "Wrong pub-sub topic format. Should be `/waku/2/{{topic-name}}/{{encoding}}`. Got: {}", + s + ) + ) + } + } +} + +impl Display for WakuPubSubTopic { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "/waku/2/{}/{}", self.topic_name, self.encoding) + } +} + +impl Serialize for WakuPubSubTopic { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + self.to_string().serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for WakuPubSubTopic { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + let as_string: String = String::deserialize(deserializer)?; + as_string + .parse::() + .map_err(D::Error::custom) + } } diff --git a/waku/src/node/management.rs b/waku/src/node/management.rs index 9c7f36e..cd55c56 100644 --- a/waku/src/node/management.rs +++ b/waku/src/node/management.rs @@ -4,7 +4,7 @@ use std::ffi::{CStr, CString}; // crates // internal use super::config::WakuNodeConfig; -use crate::general::{JsonResponse, Result}; +use crate::general::{JsonResponse, PeerId, Result}; /// Instantiates a Waku node /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig) @@ -52,7 +52,7 @@ pub fn waku_stop() -> Result { /// If the execution is successful, the result is the peer ID as a string (base58 encoded) /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop) -pub fn waku_peer_id() -> Result { +pub fn waku_peer_id() -> Result { let response = unsafe { CStr::from_ptr(waku_sys::waku_peerid()) } .to_str() .expect("Response should always succeed to load to a &str"); @@ -65,7 +65,7 @@ pub fn waku_peer_id() -> Result { /// Get the multiaddresses the Waku node is listening to /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses) -pub fn waku_listen_addressses() -> Result> { +pub fn waku_listen_addresses() -> Result> { let response = unsafe { CStr::from_ptr(waku_sys::waku_listen_addresses()) } .to_str() .expect("Response should always succeed to load to a &str"); @@ -79,7 +79,7 @@ pub fn waku_listen_addressses() -> Result> { #[cfg(test)] mod test { use super::waku_new; - use crate::node::management::{waku_listen_addressses, waku_peer_id, waku_start, waku_stop}; + use crate::node::management::{waku_listen_addresses, waku_peer_id, waku_start, waku_stop}; #[test] fn waku_flow() { @@ -91,7 +91,7 @@ mod test { assert!(!id.is_empty()); // test addresses, since we cannot start different instances of the node - let addresses = waku_listen_addressses().unwrap(); + let addresses = waku_listen_addresses().unwrap(); dbg!(&addresses); assert!(!addresses.is_empty()); diff --git a/waku/src/node/mod.rs b/waku/src/node/mod.rs index d8821a5..e59d928 100644 --- a/waku/src/node/mod.rs +++ b/waku/src/node/mod.rs @@ -1,18 +1,22 @@ mod config; mod management; mod peers; +mod relay; // std +use aes_gcm::{Aes256Gcm, Key}; +use libsecp256k1::{PublicKey, SecretKey}; use multiaddr::Multiaddr; use std::marker::PhantomData; use std::sync::Mutex; use std::time::Duration; // crates // internal -use crate::general::{PeerId, Result}; +use crate::general::{MessageId, PeerId, Result, WakuMessage, WakuPubSubTopic}; pub use config::WakuNodeConfig; pub use peers::{Protocol, WakuPeerData, WakuPeers}; +pub use relay::{waku_create_content_topic, waku_create_pubsub_topic, waku_dafault_pubsub_topic}; /// Shared flag to check if a waku node is already running in the current process static WAKU_NODE_INITIALIZED: Mutex = Mutex::new(false); @@ -31,19 +35,36 @@ impl WakuNodeState for Running {} pub struct WakuNodeHandle(PhantomData); +/// We do not have any inner state, so the handle should be safe to be send among threads. +unsafe impl Send for WakuNodeHandle {} + +/// References to the handle are safe to share, as they do not mutate the handle itself and +/// operations are performed by the bindings backend, which is supposed to be thread safe. +unsafe impl Sync for WakuNodeHandle {} + impl WakuNodeHandle { - pub fn peer_id(&self) -> Result { + /// If the execution is successful, the result is the peer ID as a string (base58 encoded) + /// + /// wrapper around [`management::waku_peer_id`] + pub fn peer_id(&self) -> Result { management::waku_peer_id() } + /// Get the multiaddresses the Waku node is listening to + /// + /// wrapper around [`management::waku_listen_addresses`] pub fn listen_addresses(&self) -> Result> { - management::waku_listen_addressses() + management::waku_listen_addresses() } - pub fn add_peer(&mut self, address: Multiaddr, protocol_id: usize) -> Result { + /// Add a node multiaddress and protocol to the waku node’s peerstore + /// + /// wrapper around [`peers::waku_add_peers`] + pub fn add_peer(&self, address: Multiaddr, protocol_id: usize) -> Result { peers::waku_add_peers(address, protocol_id) } } + fn stop_node() -> Result<()> { let mut node_initialized = WAKU_NODE_INITIALIZED .lock() @@ -53,49 +74,147 @@ fn stop_node() -> Result<()> { } impl WakuNodeHandle { + /// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation + /// + /// wrapper around [`management::waku_start`] pub fn start(self) -> Result> { management::waku_start().map(|_| WakuNodeHandle(Default::default())) } + /// Stops a Waku node + /// + /// internally uses [`management::waku_stop`] pub fn stop(self) -> Result<()> { stop_node() } } impl WakuNodeHandle { + /// Stops a Waku node + /// + /// internally uses [`management::waku_stop`] pub fn stop(self) -> Result<()> { stop_node() } + /// Dial peer using a multiaddress + /// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`] + /// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned. + /// Use 0 for no timeout + /// + /// wrapper around [`peers::waku_connect_peer_with_address`] pub fn connect_peer_with_address( - &mut self, + &self, address: Multiaddr, timeout: Option, ) -> Result<()> { peers::waku_connect_peer_with_address(address, timeout) } - pub fn connect_peer_with_id( - &mut self, - peer_id: PeerId, - timeout: Option, - ) -> Result<()> { + /// Dial peer using its peer ID + /// + /// wrapper around [`peers::waku_connect_peer_with_id`] + pub fn connect_peer_with_id(&self, peer_id: PeerId, timeout: Option) -> Result<()> { peers::waku_connect_peer_with_id(peer_id, timeout) } - pub fn disconnect_peer_with_id(&mut self, peer_id: PeerId) -> Result<()> { + /// Disconnect a peer using its peerID + /// + /// wrapper around [`peers::waku_disconnect_peer_with_id`] + pub fn disconnect_peer_with_id(&self, peer_id: PeerId) -> Result<()> { peers::waku_disconnect_peer_with_id(peer_id) } + /// Get number of connected peers + /// + /// wrapper around [`peers::waku_peer_count`] pub fn peer_count(&self) -> Result { peers::waku_peer_count() } + /// Retrieve the list of peers known by the Waku node + /// + /// wrapper around [`peers::waku_peers`] pub fn peers(&self) -> Result { peers::waku_peers() } + + /// Publish a message using Waku Relay + /// + /// wrapper around [`relay::waku_relay_publish_message`] + pub fn relay_publish_message( + &self, + message: &WakuMessage, + pubsub_topic: Option, + timeout: Duration, + ) -> Result { + relay::waku_relay_publish_message(message, pubsub_topic, timeout) + } + + /// Optionally sign, encrypt using asymmetric encryption and publish a message using Waku Relay + /// + /// wrapper around [`relay::waku_relay_publish_encrypt_asymmetric`] + pub fn relay_publish_encrypt_asymmetric( + &self, + message: &WakuMessage, + pubsub_topic: Option, + public_key: &PublicKey, + signing_key: &SecretKey, + timeout: Duration, + ) -> Result { + relay::waku_relay_publish_encrypt_asymmetric( + message, + pubsub_topic, + public_key, + signing_key, + timeout, + ) + } + + /// Optionally sign, encrypt using symmetric encryption and publish a message using Waku Relay + /// + /// wrapper around [`relay::waku_relay_publish_encrypt_symmetric`] + pub fn relay_publish_encrypt_symmetric( + &self, + message: &WakuMessage, + pubsub_topic: Option, + symmetric_key: &Key, + signing_key: &SecretKey, + timeout: Duration, + ) -> Result { + relay::waku_relay_publish_encrypt_symmetric( + message, + pubsub_topic, + symmetric_key, + signing_key, + timeout, + ) + } + + /// Determine if there are enough peers to publish a message on a given pubsub topic + /// + /// wrapper around [`relay::waku_enough_peers`] + pub fn relay_enough_peers(&self, pubsub_topic: Option) -> Result { + relay::waku_enough_peers(pubsub_topic) + } + + /// Subscribe to a Waku Relay pubsub topic to receive messages + /// + /// wrapper around [`relay::waku_relay_subscribe`] + pub fn relay_subscribe(&self, pubsub_topic: Option) -> Result<()> { + relay::waku_relay_subscribe(pubsub_topic) + } + + /// Closes the pubsub subscription to a pubsub topic. No more messages will be received from this pubsub topic + /// + /// wrapper around [`relay::waku_relay_unsubscribe`] + pub fn relay_unsubscribe(&self, pubsub_topic: Option) -> Result<()> { + relay::waku_relay_unsubscribe(pubsub_topic) + } } +/// Spawn a new Waku node with the givent configuration (default configuration if `None` provided) +/// Internally uses [`management::waku_new`] pub fn waku_new(config: Option) -> Result> { let mut node_initialized = WAKU_NODE_INITIALIZED .lock() diff --git a/waku/src/node/peers.rs b/waku/src/node/peers.rs index 59188b4..4b1a933 100644 --- a/waku/src/node/peers.rs +++ b/waku/src/node/peers.rs @@ -1,5 +1,5 @@ // std -use std::ffi::{c_char, CStr, CString}; +use std::ffi::{CStr, CString}; use std::time::Duration; // crates use multiaddr::Multiaddr; @@ -57,7 +57,7 @@ pub fn waku_connect_peer_with_address(address: Multiaddr, timeout: Option) -> Result<()> { let response = unsafe { diff --git a/waku/src/node/relay.rs b/waku/src/node/relay.rs new file mode 100644 index 0000000..4bc01bb --- /dev/null +++ b/waku/src/node/relay.rs @@ -0,0 +1,244 @@ +// std +use std::ffi::{CStr, CString}; +use std::time::Duration; +// crates +use aes_gcm::{Aes256Gcm, Key}; +use libsecp256k1::{PublicKey, SecretKey}; +// internal +use crate::general::{ + Encoding, JsonResponse, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic, +}; + +/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding) +pub fn waku_create_content_topic( + application_name: &str, + application_version: usize, + content_topic_name: &str, + encoding: Encoding, +) -> WakuContentTopic { + unsafe { + CStr::from_ptr(waku_sys::waku_content_topic( + CString::new(application_name) + .expect("Application name should always transform to CString") + .into_raw(), + application_version + .try_into() + .expect("Version should fit within an u32"), + CString::new(content_topic_name) + .expect("Conmtent topic should always transform to CString") + .into_raw(), + CString::new(encoding.to_string()) + .expect("Encoding should always transform to CString") + .into_raw(), + )) + } + .to_str() + .expect("&str from result should always be extracted") + .parse() + .expect("Content topic data should be always parseable") +} + +/// Create a pubsub topic according to [RFC 23](https://rfc.vac.dev/spec/23/) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding) +pub fn waku_create_pubsub_topic(topic_name: &str, encoding: Encoding) -> WakuPubSubTopic { + unsafe { + CStr::from_ptr(waku_sys::waku_pubsub_topic( + CString::new(topic_name) + .expect("Topic name should always transform to CString") + .into_raw(), + CString::new(encoding.to_string()) + .expect("Encoding should always transform to CString") + .into_raw(), + )) + } + .to_str() + .expect("&str from result should always be extracted") + .parse() + .expect("Pubsub topic data should be always parseable") +} + +/// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/) +pub fn waku_dafault_pubsub_topic() -> WakuPubSubTopic { + unsafe { CStr::from_ptr(waku_sys::waku_default_pubsub_topic()) } + .to_str() + .expect("&str from result should always be extracted") + .parse() + .expect("Default pubsub topic should always be parseable") +} + +/// Publish a message using Waku Relay +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) +pub fn waku_relay_publish_message( + message: &WakuMessage, + pubsub_topic: Option, + timeout: Duration, +) -> Result { + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_relay_publish( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + } + .to_str() + .expect("&str from result should always be extracted"); + let message_id: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + message_id.into() +} + +/// Optionally sign, encrypt using asymmetric encryption and publish a message using Waku Relay +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publish_enc_asymmetricchar-messagejson-char-pubsubtopic-char-publickey-char-optionalsigningkey-int-timeoutms) +pub fn waku_relay_publish_encrypt_asymmetric( + message: &WakuMessage, + pubsub_topic: Option, + public_key: &PublicKey, + signing_key: &SecretKey, + timeout: Duration, +) -> Result { + let pk = hex::encode(public_key.serialize()); + let sk = hex::encode(signing_key.serialize()); + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_relay_publish_enc_asymmetric( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + CString::new(pk) + .expect("CString should build properly from hex encoded public key") + .into_raw(), + CString::new(sk) + .expect("CString should build properly from hex encoded signing key") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + .to_str() + .expect("Response should always succeed to load to a &str") + }; + let message_id: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + message_id.into() +} + +/// Optionally sign, encrypt using symmetric encryption and publish a message using Waku Relay +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publish_enc_symmetricchar-messagejson-char-pubsubtopic-char-symmetrickey-char-optionalsigningkey-int-timeoutms) +pub fn waku_relay_publish_encrypt_symmetric( + message: &WakuMessage, + pubsub_topic: Option, + symmetric_key: &Key, + signing_key: &SecretKey, + timeout: Duration, +) -> Result { + let symk = hex::encode(symmetric_key.as_slice()); + let sk = hex::encode(signing_key.serialize()); + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_relay_publish_enc_symmetric( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + CString::new(symk) + .expect("CString should build properly from hex encoded symmetric key") + .into_raw(), + CString::new(sk) + .expect("CString should build properly from hex encoded signing key") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + .to_str() + .expect("Response should always succeed to load to a &str") + }; + let message_id: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + message_id.into() +} + +pub fn waku_enough_peers(pubsub_topic: Option) -> Result { + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_relay_enough_peers( + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + let enough_peers: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + enough_peers.into() +} + +pub fn waku_relay_subscribe(pubsub_topic: Option) -> Result<()> { + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_relay_subscribe( + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + let enough_peers: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + Result::from(enough_peers).map(|_| ()) +} + +pub fn waku_relay_unsubscribe(pubsub_topic: Option) -> Result<()> { + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_relay_unsubscribe( + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + let enough_peers: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + Result::from(enough_peers).map(|_| ()) +}