diff --git a/Cargo.lock b/Cargo.lock index cc547ac..5f37f20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -977,6 +977,7 @@ name = "waku" version = "0.1.0" dependencies = [ "aes-gcm", + "base64", "hex", "libsecp256k1", "multiaddr", diff --git a/waku/Cargo.toml b/waku/Cargo.toml index d9eed35..fb3da29 100644 --- a/waku/Cargo.toml +++ b/waku/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" [dependencies] aes-gcm = { version = "0.10", features = ["aes"] } +base64 = "0.13" hex = "0.4" libsecp256k1 = "0.7" multiaddr = "0.14" diff --git a/waku/src/decrypt.rs b/waku/src/decrypt.rs new file mode 100644 index 0000000..d0190cd --- /dev/null +++ b/waku/src/decrypt.rs @@ -0,0 +1,65 @@ +//! Symmetric and asymmetric waku messages [decrypting](https://rfc.vac.dev/spec/36/#decrypting-messages) methods + +// std +use std::ffi::{CStr, CString}; +// crates +use aes_gcm::{Aes256Gcm, Key}; +use libsecp256k1::SecretKey; +// internal +use crate::general::{DecodedPayload, JsonResponse, Result, WakuMessage}; + +/// Decrypt a message using a symmetric key +/// +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_decode_symmetricchar-messagejson-char-symmetrickey) +pub fn waku_decode_symmetric( + message: &WakuMessage, + symmetric_key: &Key, +) -> Result { + let symk = hex::encode(symmetric_key.as_slice()); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_decode_symmetric( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(symk) + .expect("CString should build properly from hex encoded symmetric key") + .into_raw(), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + let response: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + response.into() +} + +/// Decrypt a message using a symmetric key +/// +/// As per the [specification](extern char* waku_decode_asymmetric(char* messageJson, char* privateKey)) +pub fn waku_decode_asymmetric( + message: &WakuMessage, + asymmetric_key: &SecretKey, +) -> Result { + let sk = hex::encode(asymmetric_key.serialize()); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_decode_asymmetric( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(sk) + .expect("CString should build properly from hex encoded symmetric key") + .into_raw(), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + let response: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + response.into() +} diff --git a/waku/src/events/mod.rs b/waku/src/events/mod.rs index 535e3e3..a9182ce 100644 --- a/waku/src/events/mod.rs +++ b/waku/src/events/mod.rs @@ -1,20 +1,38 @@ +//! Waku message [event](https://rfc.vac.dev/spec/36/#events) related items +//! +//! Asynchronous events require a callback to be registered. +//! An example of an asynchronous event that might be emitted is receiving a message. +//! When an event is emitted, this callback will be triggered receiving a [`Signal`] + // std use std::ffi::{c_char, CStr}; use std::ops::Deref; -use std::sync::RwLock; +use std::sync::Mutex; // crates use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; // internal use crate::general::{WakuMessage, WakuPubSubTopic}; +/// Event signal #[derive(Serialize, Deserialize)] pub struct Signal { + /// Type of signal being emitted. Currently, only message is available #[serde(alias = "type")] _type: String, + /// Format depends on the type of signal event: Event, } +impl Signal { + pub fn event(&self) -> &Event { + &self.event + } +} + +/// Waku event +/// For now just WakuMessage is supported +#[non_exhaustive] #[derive(Serialize, Deserialize)] #[serde(tag = "untagged", rename_all = "camelCase")] pub enum Event { @@ -49,12 +67,12 @@ impl WakuMessageEvent { /// Shared callback slot. Callbacks are registered here so they can be accessed by the extern "C" #[allow(clippy::type_complexity)] -static CALLBACK: Lazy>> = - Lazy::new(|| RwLock::new(Box::new(|_| {}))); +static CALLBACK: Lazy>> = + Lazy::new(|| Mutex::new(Box::new(|_| {}))); /// Register global callback fn set_callback(f: F) { - *CALLBACK.write().unwrap() = Box::new(f); + *CALLBACK.lock().unwrap() = Box::new(f); } /// Wrapper callback, it transformst the `*const c_char` into a [`Signal`] @@ -66,7 +84,7 @@ extern "C" fn callback(data: *const c_char) { let data: Signal = serde_json::from_str(raw_response).expect("Parsing signal to succeed"); (CALLBACK .deref() - .write() + .lock() .expect("Access to the shared callback") .as_mut())(data) } diff --git a/waku/src/general/mod.rs b/waku/src/general/mod.rs index 81b8d57..80b08fb 100644 --- a/waku/src/general/mod.rs +++ b/waku/src/general/mod.rs @@ -1,14 +1,21 @@ +//! Waku [general](https://rfc.vac.dev/spec/36/#general) types + // std use std::fmt::{Display, Formatter}; use std::str::FromStr; // crates +use aes_gcm::{Aes256Gcm, Key}; +use libsecp256k1::{PublicKey, SecretKey, Signature}; use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer}; use sscanf::{scanf, RegexRepresentation}; // internal +use crate::decrypt::{waku_decode_asymmetric, waku_decode_symmetric}; +/// Waku message version pub type WakuMessageVersion = usize; /// Base58 encoded peer id pub type PeerId = String; +/// Waku message id, hex encoded sha256 digest of the message pub type MessageId = String; /// JsonResponse wrapper. @@ -34,12 +41,14 @@ impl From> for Result { } } -/// JsonMessage, Waku message in JSON format. +// TODO: Properly type and deserialize payload form base64 encoded string +/// Waku message in JSON format. /// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type) #[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct WakuMessage { - payload: Box<[u8]>, + #[serde(with = "base64_serde")] + payload: Vec, /// The content topic to be set on the message content_topic: WakuContentTopic, /// The Waku Message version number @@ -48,16 +57,87 @@ pub struct WakuMessage { timestamp: usize, } +impl WakuMessage { + pub fn new>( + payload: PAYLOAD, + content_topic: WakuContentTopic, + version: WakuMessageVersion, + timestamp: usize, + ) -> Self { + let payload = payload.as_ref().to_vec(); + Self { + payload, + content_topic, + version, + timestamp, + } + } + + pub fn payload(&self) -> &[u8] { + &self.payload + } + + pub fn content_topic(&self) -> &WakuContentTopic { + &self.content_topic + } + + pub fn version(&self) -> WakuMessageVersion { + self.version + } + + pub fn timestamp(&self) -> usize { + self.timestamp + } + + /// Try decode the message with an expected symmetric key + /// + /// wrapper around [`crate::decrypt::waku_decode_symmetric`] + pub fn try_decode_symmetric(&self, symmetric_key: &Key) -> Result { + waku_decode_symmetric(self, symmetric_key) + } + + /// Try decode the message with an expected asymmetric key + /// + /// wrapper around [`crate::decrypt::waku_decode_asymmetric`] + pub fn try_decode_asymmentric(&self, asymmetric_key: &SecretKey) -> Result { + waku_decode_asymmetric(self, asymmetric_key) + } +} + /// A payload once decoded, used when a received Waku Message is encrypted +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] pub struct DecodedPayload { /// Public key that signed the message (optional), hex encoded with 0x prefix - public_key: Option, + #[serde(deserialize_with = "deserialize_optional_pk")] + public_key: Option, /// Message signature (optional), hex encoded with 0x prefix - signature: Option, + #[serde(deserialize_with = "deserialize_optional_signature")] + signature: Option, /// Decrypted message payload base64 encoded - data: String, + #[serde(with = "base64_serde")] + data: Vec, /// Padding base64 encoded - padding: String, + #[serde(with = "base64_serde")] + padding: Vec, +} + +impl DecodedPayload { + pub fn public_key(&self) -> Option<&PublicKey> { + self.public_key.as_ref() + } + + pub fn signature(&self) -> Option<&Signature> { + self.signature.as_ref() + } + + pub fn data(&self) -> &[u8] { + &self.data + } + + pub fn padding(&self) -> &[u8] { + &self.padding + } } /// The content topic of a Waku message @@ -69,6 +149,16 @@ pub struct ContentFilter { content_topic: WakuContentTopic, } +impl ContentFilter { + pub fn new(content_topic: WakuContentTopic) -> Self { + Self { content_topic } + } + + pub fn content_topic(&self) -> &WakuContentTopic { + &self.content_topic + } +} + /// The criteria to create subscription to a light node in JSON Format /// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) #[derive(Clone, Serialize, Deserialize)] @@ -80,22 +170,32 @@ pub struct FilterSubscription { pubsub_topic: Option, } +impl FilterSubscription { + pub fn content_filters(&self) -> &[ContentFilter] { + &self.content_filters + } + + pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> { + self.pubsub_topic.as_ref() + } +} + /// Criteria used to retrieve historical messages #[derive(Clone, Serialize)] #[serde(rename_all = "camelCase")] pub struct StoreQuery { /// The pubsub topic on which messages are published - pubsub_topic: Option, + pub pubsub_topic: Option, /// Array of [`ContentFilter`] to query for historical messages - content_filters: Vec, + pub content_filters: Vec, /// The inclusive lower bound on the timestamp of queried messages. /// This field holds the Unix epoch time in nanoseconds - start_time: Option, + pub start_time: Option, /// The inclusive upper bound on the timestamp of queried messages. /// This field holds the Unix epoch time in nanoseconds - end_time: Option, + pub end_time: Option, /// Paging information in [`PagingOptions`] format - paging_options: Option, + pub paging_options: Option, } /// The response received after doing a query to a store node @@ -108,33 +208,45 @@ pub struct StoreResponse { paging_options: Option, } +impl StoreResponse { + pub fn messages(&self) -> &[WakuMessage] { + &self.messages + } + + pub fn paging_options(&self) -> Option<&PagingOptions> { + self.paging_options.as_ref() + } +} + /// Paging information #[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PagingOptions { /// Number of messages to retrieve per page - page_size: usize, + pub page_size: usize, /// Message Index from which to perform pagination. /// If not included and forward is set to `true`, paging will be performed from the beginning of the list. /// If not included and forward is set to `false`, paging will be performed from the end of the list - cursor: Option, + pub cursor: Option, /// `true` if paging forward, `false` if paging backward - forward: bool, + pub forward: bool, } +/// Pagination index type #[derive(Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct MessageIndex { - /// Hash of the message at this [`MessageIndex`] - digest: String, + /// Hash of the message at this [``MessageIndex`] + pub digest: String, /// UNIX timestamp in nanoseconds at which the message at this [`MessageIndex`] was received - receiver_time: usize, + pub receiver_time: usize, /// UNIX timestamp in nanoseconds at which the message is generated by its sender - sender_time: usize, + pub sender_time: usize, /// The pubsub topic of the message at this [`MessageIndex`] - pubsub_topic: WakuPubSubTopic, + pub pubsub_topic: WakuPubSubTopic, } +/// WakuMessage encoding scheme #[derive(Copy, Clone)] pub enum Encoding { Proto, @@ -170,12 +282,13 @@ impl RegexRepresentation for Encoding { const REGEX: &'static str = r"\w"; } +/// A waku content topic `/{application_name}/{version}/{content_topic_name}/{encdoing}` #[derive(Clone)] pub struct WakuContentTopic { - application_name: String, - version: usize, - content_topic_name: String, - encoding: Encoding, + pub application_name: String, + pub version: usize, + pub content_topic_name: String, + pub encoding: Encoding, } impl FromStr for WakuContentTopic { @@ -233,10 +346,20 @@ impl<'de> Deserialize<'de> for WakuContentTopic { } } +/// A waku pubsub topic in the form of `/waku/v2/{topic_name}/{encoding}` #[derive(Clone)] pub struct WakuPubSubTopic { - topic_name: String, - encoding: Encoding, + pub topic_name: String, + pub encoding: Encoding, +} + +impl WakuPubSubTopic { + pub fn new(topic_name: String, encoding: Encoding) -> Self { + Self { + topic_name, + encoding, + } + } } impl FromStr for WakuPubSubTopic { @@ -285,3 +408,53 @@ impl<'de> Deserialize<'de> for WakuPubSubTopic { .map_err(D::Error::custom) } } + +mod base64_serde { + use serde::de::Error; + use serde::{Deserialize, Deserializer, Serialize, Serializer}; + + pub fn serialize(value: &[u8], serializer: S) -> std::result::Result + where + S: Serializer, + { + base64::encode(value).serialize(serializer) + } + + pub fn deserialize<'de, D>(deserializer: D) -> std::result::Result, D::Error> + where + D: Deserializer<'de>, + { + let base64_str: String = String::deserialize(deserializer)?; + base64::decode(base64_str).map_err(D::Error::custom) + } +} + +pub fn deserialize_optional_pk<'de, D>( + deserializer: D, +) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + let base64_str: Option = Option::::deserialize(deserializer)?; + base64_str + .map(|base64_str| { + let raw_bytes = base64::decode(base64_str).map_err(D::Error::custom)?; + PublicKey::parse_slice(&raw_bytes, None).map_err(D::Error::custom) + }) + .transpose() +} + +pub fn deserialize_optional_signature<'de, D>( + deserializer: D, +) -> std::result::Result, D::Error> +where + D: Deserializer<'de>, +{ + let base64_str: Option = Option::::deserialize(deserializer)?; + base64_str + .map(|base64_str| { + let raw_bytes = base64::decode(base64_str).map_err(D::Error::custom)?; + Signature::parse_der(&raw_bytes).map_err(D::Error::custom) + }) + .transpose() +} diff --git a/waku/src/lib.rs b/waku/src/lib.rs index 3494379..2679160 100644 --- a/waku/src/lib.rs +++ b/waku/src/lib.rs @@ -1,7 +1,24 @@ +//! # Waku +//! +//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/) +mod decrypt; mod events; mod general; mod node; +pub use node::{ + waku_create_content_topic, waku_create_pubsub_topic, waku_dafault_pubsub_topic, waku_new, + Initialized, Protocol, Running, WakuNodeConfig, WakuNodeHandle, WakuPeerData, WakuPeers, +}; + +pub use general::{ + ContentFilter, DecodedPayload, Encoding, FilterSubscription, MessageId, MessageIndex, + PagingOptions, PeerId, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage, + WakuMessageVersion, WakuPubSubTopic, +}; + +pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent}; + #[cfg(test)] mod tests { use std::ffi::CStr; diff --git a/waku/src/node/config.rs b/waku/src/node/config.rs index 4f8db6c..44d9705 100644 --- a/waku/src/node/config.rs +++ b/waku/src/node/config.rs @@ -1,3 +1,5 @@ +//! Waku node [configuration](https://rfc.vac.dev/spec/36/#jsonconfig-type) related items + // std // crates use libsecp256k1::SecretKey; diff --git a/waku/src/node/filter.rs b/waku/src/node/filter.rs new file mode 100644 index 0000000..c2d3bc4 --- /dev/null +++ b/waku/src/node/filter.rs @@ -0,0 +1,72 @@ +//! Waku [filter](https://rfc.vac.dev/spec/36/#waku-filter) protocol related methods + +// std +use std::ffi::{CStr, CString}; +use std::time::Duration; +// crates + +// internal +use crate::general::Result; +use crate::general::{FilterSubscription, JsonResponse, PeerId}; + +/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) +pub fn waku_filter_subscribe( + filter_subscription: &FilterSubscription, + peer_id: PeerId, + timeout: Duration, +) -> Result<()> { + let result = unsafe { + CStr::from_ptr(waku_sys::waku_filter_subscribe( + CString::new( + serde_json::to_string(filter_subscription) + .expect("FilterSubscription should always be able to be serialized"), + ) + .expect("CString should build properly from the serialized filter subscription") + .into_raw(), + CString::new(peer_id) + .expect("CString should build properly from peer id") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + + let response: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + + Result::from(response).map(|_| ()) +} + +/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) +pub fn waku_filter_unsubscribe( + filter_subscription: &FilterSubscription, + timeout: Duration, +) -> Result<()> { + let result = unsafe { + CStr::from_ptr(waku_sys::waku_filter_unsubscribe( + CString::new( + serde_json::to_string(filter_subscription) + .expect("FilterSubscription should always be able to be serialized"), + ) + .expect("CString should build properly from the serialized filter subscription") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + + let response: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + + Result::from(response).map(|_| ()) +} diff --git a/waku/src/node/lightpush.rs b/waku/src/node/lightpush.rs new file mode 100644 index 0000000..34ad952 --- /dev/null +++ b/waku/src/node/lightpush.rs @@ -0,0 +1,148 @@ +//! Waku [lightpush](https://rfc.vac.dev/spec/36/#waku-lightpush) protocol related methods + +// std +use std::ffi::{CStr, CString}; +use std::time::Duration; +// crates +use aes_gcm::{Aes256Gcm, Key}; +use libsecp256k1::{PublicKey, SecretKey}; +// internal +use crate::general::{JsonResponse, MessageId, PeerId, Result, WakuMessage, WakuPubSubTopic}; +use crate::node::waku_dafault_pubsub_topic; + +/// Publish a message using Waku Lightpush +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publishchar-messagejson-char-topic-char-peerid-int-timeoutms) +pub fn waku_lightpush_publish( + message: &WakuMessage, + pubsub_topic: WakuPubSubTopic, + peer_id: PeerId, + timeout: Duration, +) -> Result { + let result = unsafe { + CStr::from_ptr(waku_sys::waku_lightpush_publish( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(pubsub_topic.to_string()) + .expect("CString should build properly from pubsub topic") + .into_raw(), + CString::new(peer_id) + .expect("CString should build properly from peer id") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + + let response: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + + response.into() +} + +/// Optionally sign, encrypt using asymmetric encryption and publish a message using Waku Lightpush +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publish_enc_asymmetricchar-messagejson-char-pubsubtopic-char-peerid-char-publickey-char-optionalsigningkey-int-timeoutms) +pub fn waku_lightpush_publish_encrypt_asymmetric( + message: &WakuMessage, + pubsub_topic: Option, + peer_id: PeerId, + public_key: &PublicKey, + signing_key: Option<&SecretKey>, + timeout: Duration, +) -> Result { + let pk = hex::encode(public_key.serialize()); + let sk = signing_key + .map(|signing_key| hex::encode(signing_key.serialize())) + .unwrap_or_else(String::new); + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_lightpush_publish_enc_asymmetric( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + CString::new(peer_id) + .expect("CString should build properly from peer id") + .into_raw(), + CString::new(pk) + .expect("CString should build properly from hex encoded public key") + .into_raw(), + CString::new(sk) + .expect("CString should build properly from hex encoded signing key") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + .to_str() + .expect("Response should always succeed to load to a &str") + }; + let message_id: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + message_id.into() +} + +/// Optionally sign, encrypt using symmetric encryption and publish a message using Waku Lightpush +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publish_enc_symmetricchar-messagejson-char-pubsubtopic-char-peerid-char-symmetrickey-char-optionalsigningkey-int-timeoutms) +pub fn waku_lightpush_publish_encrypt_symmetric( + message: &WakuMessage, + pubsub_topic: Option, + peer_id: PeerId, + symmetric_key: &Key, + signing_key: Option<&SecretKey>, + timeout: Duration, +) -> Result { + let symk = hex::encode(symmetric_key.as_slice()); + let sk = signing_key + .map(|signing_key| hex::encode(signing_key.serialize())) + .unwrap_or_else(String::new); + let pubsub_topic = pubsub_topic + .unwrap_or_else(waku_dafault_pubsub_topic) + .to_string(); + let result = unsafe { + CStr::from_ptr(waku_sys::waku_lightpush_publish_enc_symmetric( + CString::new( + serde_json::to_string(&message) + .expect("WakuMessages should always be able to success serializing"), + ) + .expect("CString should build properly from the serialized waku message") + .into_raw(), + CString::new(pubsub_topic) + .expect("CString should build properly from pubsub topic") + .into_raw(), + CString::new(peer_id) + .expect("CString should build properly from peer id") + .into_raw(), + CString::new(symk) + .expect("CString should build properly from hex encoded symmetric key") + .into_raw(), + CString::new(sk) + .expect("CString should build properly from hex encoded signing key") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + .to_str() + .expect("Response should always succeed to load to a &str") + }; + let message_id: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + message_id.into() +} diff --git a/waku/src/node/management.rs b/waku/src/node/management.rs index cd55c56..cd79754 100644 --- a/waku/src/node/management.rs +++ b/waku/src/node/management.rs @@ -1,3 +1,5 @@ +//! Node lifcycle [mangement](https://rfc.vac.dev/spec/36/#node-management) related methods + // std use multiaddr::Multiaddr; use std::ffi::{CStr, CString}; diff --git a/waku/src/node/mod.rs b/waku/src/node/mod.rs index e59d928..217e1f7 100644 --- a/waku/src/node/mod.rs +++ b/waku/src/node/mod.rs @@ -1,7 +1,12 @@ +//! Waku node implementation + mod config; +mod filter; +mod lightpush; mod management; mod peers; mod relay; +mod store; // std use aes_gcm::{Aes256Gcm, Key}; @@ -12,7 +17,10 @@ use std::sync::Mutex; use std::time::Duration; // crates // internal -use crate::general::{MessageId, PeerId, Result, WakuMessage, WakuPubSubTopic}; +use crate::general::{ + FilterSubscription, MessageId, PeerId, Result, StoreQuery, StoreResponse, WakuMessage, + WakuPubSubTopic, +}; pub use config::WakuNodeConfig; pub use peers::{Protocol, WakuPeerData, WakuPeers}; @@ -33,6 +41,11 @@ pub struct Running; impl WakuNodeState for Initialized {} impl WakuNodeState for Running {} +/// Handle to the underliying waku node +/// Safe to sendt to/through threads. +/// Only a waku node can be running at a time. +/// Referenes (`&`) to the handle can call queries and perform operations in a thread safe way. +/// Only an owned version of the handle can `start` or `stop` the node. pub struct WakuNodeHandle(PhantomData); /// We do not have any inner state, so the handle should be safe to be send among threads. @@ -159,7 +172,7 @@ impl WakuNodeHandle { message: &WakuMessage, pubsub_topic: Option, public_key: &PublicKey, - signing_key: &SecretKey, + signing_key: Option<&SecretKey>, timeout: Duration, ) -> Result { relay::waku_relay_publish_encrypt_asymmetric( @@ -179,7 +192,7 @@ impl WakuNodeHandle { message: &WakuMessage, pubsub_topic: Option, symmetric_key: &Key, - signing_key: &SecretKey, + signing_key: Option<&SecretKey>, timeout: Duration, ) -> Result { relay::waku_relay_publish_encrypt_symmetric( @@ -211,6 +224,92 @@ impl WakuNodeHandle { pub fn relay_unsubscribe(&self, pubsub_topic: Option) -> Result<()> { relay::waku_relay_unsubscribe(pubsub_topic) } + + /// Retrieves historical messages on specific content topics + /// + /// wrapper around [`store::waku_store_query`] + pub fn store_query( + query: &StoreQuery, + peer_id: PeerId, + timeout: Duration, + ) -> Result { + store::waku_store_query(query, peer_id, timeout) + } + + /// Publish a message using Waku Lightpush + /// + /// wrapper around [`lightpush::waku_lightpush_publish`] + pub fn lightpush_publish( + message: &WakuMessage, + pubsub_topic: WakuPubSubTopic, + peer_id: PeerId, + timeout: Duration, + ) -> Result { + lightpush::waku_lightpush_publish(message, pubsub_topic, peer_id, timeout) + } + + /// Optionally sign, encrypt using asymmetric encryption and publish a message using Waku Lightpush + /// + /// wrapper around [`lightpush::waku_lightpush_publish_encrypt_asymmetric`] + pub fn lightpush_publish_encrypt_asymmetric( + message: &WakuMessage, + pubsub_topic: Option, + peer_id: PeerId, + public_key: &PublicKey, + signing_key: Option<&SecretKey>, + timeout: Duration, + ) -> Result { + lightpush::waku_lightpush_publish_encrypt_asymmetric( + message, + pubsub_topic, + peer_id, + public_key, + signing_key, + timeout, + ) + } + + /// Optionally sign, encrypt using symmetric encryption and publish a message using Waku Lightpush + /// + /// wrapper around [`lightpush::waku_lightpush_publish_encrypt_symmetric`] + pub fn lightpush_publish_encrypt_symmetric( + message: &WakuMessage, + pubsub_topic: Option, + peer_id: PeerId, + symmetric_key: &Key, + signing_key: Option<&SecretKey>, + timeout: Duration, + ) -> Result { + lightpush::waku_lightpush_publish_encrypt_symmetric( + message, + pubsub_topic, + peer_id, + symmetric_key, + signing_key, + timeout, + ) + } + + /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) + /// + /// wrapper around [`filter::waku_filter_subscribe`] + pub fn filter_subscribe( + filter_subscription: &FilterSubscription, + peer_id: PeerId, + timeout: Duration, + ) -> Result<()> { + filter::waku_filter_subscribe(filter_subscription, peer_id, timeout) + } + + /// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) + /// + /// wrapper around [`filter::waku_filter_unsubscribe`] + pub fn filter_unsubscribe( + filter_subscription: &FilterSubscription, + timeout: Duration, + ) -> Result<()> { + filter::waku_filter_unsubscribe(filter_subscription, timeout) + } } /// Spawn a new Waku node with the givent configuration (default configuration if `None` provided) diff --git a/waku/src/node/peers.rs b/waku/src/node/peers.rs index 4b1a933..094c1ef 100644 --- a/waku/src/node/peers.rs +++ b/waku/src/node/peers.rs @@ -1,3 +1,5 @@ +//! Waku [peer handling and connection](https://rfc.vac.dev/spec/36/#connecting-to-peers) methods + // std use std::ffi::{CStr, CString}; use std::time::Duration; @@ -111,6 +113,12 @@ pub fn waku_peer_count() -> Result { result.into() } +/// Waku peer supported protocol +/// +/// Examples: +/// `"/ipfs/id/1.0.0"` +/// `"/vac/waku/relay/2.0.0"` +/// `"/ipfs/ping/1.0.0"` pub type Protocol = String; /// Peer data from known/connected waku nodes @@ -129,6 +137,24 @@ pub struct WakuPeerData { connected: bool, } +impl WakuPeerData { + pub fn peer_id(&self) -> &PeerId { + &self.peer_id + } + + pub fn protocols(&self) -> &[Protocol] { + &self.protocols + } + + pub fn addresses(&self) -> &[Multiaddr] { + &self.addresses + } + + pub fn connected(&self) -> bool { + self.connected + } +} + /// List of [`WakuPeerData`], return value from [`waku_peers`] funtion pub type WakuPeers = Vec; diff --git a/waku/src/node/relay.rs b/waku/src/node/relay.rs index 4bc01bb..b4abd76 100644 --- a/waku/src/node/relay.rs +++ b/waku/src/node/relay.rs @@ -1,3 +1,5 @@ +//! Waku [relay](https://rfc.vac.dev/spec/36/#waku-relay) protocol related methods + // std use std::ffi::{CStr, CString}; use std::time::Duration; @@ -107,11 +109,13 @@ pub fn waku_relay_publish_encrypt_asymmetric( message: &WakuMessage, pubsub_topic: Option, public_key: &PublicKey, - signing_key: &SecretKey, + signing_key: Option<&SecretKey>, timeout: Duration, ) -> Result { let pk = hex::encode(public_key.serialize()); - let sk = hex::encode(signing_key.serialize()); + let sk = signing_key + .map(|signing_key| hex::encode(signing_key.serialize())) + .unwrap_or_else(String::new); let pubsub_topic = pubsub_topic .unwrap_or_else(waku_dafault_pubsub_topic) .to_string(); @@ -151,11 +155,13 @@ pub fn waku_relay_publish_encrypt_symmetric( message: &WakuMessage, pubsub_topic: Option, symmetric_key: &Key, - signing_key: &SecretKey, + signing_key: Option<&SecretKey>, timeout: Duration, ) -> Result { let symk = hex::encode(symmetric_key.as_slice()); - let sk = hex::encode(signing_key.serialize()); + let sk = signing_key + .map(|signing_key| hex::encode(signing_key.serialize())) + .unwrap_or_else(String::new); let pubsub_topic = pubsub_topic .unwrap_or_else(waku_dafault_pubsub_topic) .to_string(); diff --git a/waku/src/node/store.rs b/waku/src/node/store.rs new file mode 100644 index 0000000..cc39a7d --- /dev/null +++ b/waku/src/node/store.rs @@ -0,0 +1,42 @@ +//! Waku [store](https://rfc.vac.dev/spec/36/#waku-store) handling methods + +// std +use std::ffi::{CStr, CString}; +use std::time::Duration; +// crates +// internal +use crate::general::{JsonResponse, PeerId, Result, StoreQuery, StoreResponse}; + +/// Retrieves historical messages on specific content topics. This method may be called with [`PagingOptions`](`crate::general::PagingOptions`), +/// to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](`crate::general::PagingOptions`), +/// the node must return messages on a per-page basis and include [`PagingOptions`](`crate::general::PagingOptions`) in the response. +/// These [`PagingOptions`](`crate::general::PagingOptions`) must contain a cursor pointing to the Index from which a new page can be requested +pub fn waku_store_query( + query: &StoreQuery, + peer_id: PeerId, + timeout: Duration, +) -> Result { + let result = unsafe { + CStr::from_ptr(waku_sys::waku_store_query( + CString::new( + serde_json::to_string(query) + .expect("StoreQuery should always be able to be serialized"), + ) + .expect("CString should build properly from the serialized filter subscription") + .into_raw(), + CString::new(peer_id) + .expect("CString should build properly from peer id") + .into_raw(), + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + )) + } + .to_str() + .expect("Response should always succeed to load to a &str"); + + let response: JsonResponse = + serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize"); + response.into() +}