* Refactor node -> management

* Pipe peers methods to waku node handle

* Added relay topic types

* Added content topic, update pubsub topic

* Relay create content/pubsub topic

* Impl Display for topics

* Added symmetric and asymmetric publish

* Implement relay subscriptions methods

* Impl Serialize/Deserialize for Content/Pubsub topic

* Missing serde de::Error import

* Fix enconding typo

* Derive clone for general types

* Plumb relay methods to node

* Add docs to node methods

* Methods should be thread-safe

* Missing thread-safe methods

* Implement send + sync for the node handle

* Stylish space
This commit is contained in:
Daniel Sanchez 2022-10-06 15:51:00 +02:00 committed by GitHub
parent fb0805ce40
commit aefe45ad65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 706 additions and 36 deletions

152
Cargo.lock generated
View File

@ -2,6 +2,41 @@
# It is not intended for manual editing.
version = 3
[[package]]
name = "aead"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c192eb8f11fc081b0fe4259ba5af04217d4e0faddd02417310a927911abd7c8"
dependencies = [
"crypto-common",
"generic-array",
]
[[package]]
name = "aes"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfe0133578c0986e1fe3dfcd4af1cc5b2dd6c3dbf534d69916ce16a2701d40ba"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "aes-gcm"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "82e1366e0c69c9f927b1fa5ce2c7bf9eafc8f9268c0b9800729e8b267612447c"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"ghash",
"subtle",
]
[[package]]
name = "aho-corasick"
version = "0.7.19"
@ -105,6 +140,16 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "cipher"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clang-sys"
version = "1.4.0"
@ -140,6 +185,26 @@ dependencies = [
"os_str_bytes",
]
[[package]]
name = "const_format"
version = "0.2.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "939dc9e2eb9077e0679d2ce32de1ded8531779360b003b4a972a7a39ec263495"
dependencies = [
"const_format_proc_macros",
]
[[package]]
name = "const_format_proc_macros"
version = "0.2.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef196d5d972878a48da7decb7686eded338b4858fbabeed513d63a7c98b2b82d"
dependencies = [
"proc-macro2",
"quote",
"unicode-xid",
]
[[package]]
name = "core2"
version = "0.4.0"
@ -164,6 +229,17 @@ version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "crypto-common"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"rand_core",
"typenum",
]
[[package]]
name = "crypto-mac"
version = "0.8.0"
@ -174,6 +250,15 @@ dependencies = [
"subtle",
]
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
"cipher",
]
[[package]]
name = "data-encoding"
version = "2.3.2"
@ -238,6 +323,16 @@ dependencies = [
"wasi",
]
[[package]]
name = "ghash"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40"
dependencies = [
"opaque-debug",
"polyval",
]
[[package]]
name = "glob"
version = "0.3.0"
@ -312,6 +407,15 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "itoa"
version = "1.0.3"
@ -498,6 +602,18 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "polyval"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ef234e08c11dfcb2e56f79fd70f6f2eb7f025c0ce2333e82f4f0518ecad30c6"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "ppv-lite86"
version = "0.2.16"
@ -666,6 +782,30 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43b2853a4d09f215c24cc5489c992ce46052d359b5109343cbafbf26bc62f8a3"
[[package]]
name = "sscanf"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5ecdd7ea17bcadebf81d656db919f58f96c1d194d748cf0839a44a220123eedd"
dependencies = [
"const_format",
"lazy_static",
"regex",
"sscanf_macro",
]
[[package]]
name = "sscanf_macro"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe2309d255caf220c1ff9f380d89420a1377de1cabc1d57e0b308e53b0406bed"
dependencies = [
"proc-macro2",
"quote",
"regex-syntax",
"syn",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
@ -799,6 +939,16 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "universal-hash"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d3160b73c9a19f7e2939a2fdad446c57c1bbbbf4d919d3213ff1267a580d8b5"
dependencies = [
"crypto-common",
"subtle",
]
[[package]]
name = "unsigned-varint"
version = "0.7.1"
@ -826,12 +976,14 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
name = "waku"
version = "0.1.0"
dependencies = [
"aes-gcm",
"hex",
"libsecp256k1",
"multiaddr",
"once_cell",
"serde",
"serde_json",
"sscanf",
"waku-sys",
]

View File

@ -6,10 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
aes-gcm = { version = "0.10", features = ["aes"] }
hex = "0.4"
libsecp256k1 = "0.7"
multiaddr = "0.14"
once_cell = "1.15"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sscanf = "0.3"
waku-sys = { path = "../waku-sys" }

View File

@ -6,7 +6,7 @@ use std::sync::RwLock;
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
// internal
use crate::general::{PubsubTopic, WakuMessage};
use crate::general::{WakuMessage, WakuPubSubTopic};
#[derive(Serialize, Deserialize)]
pub struct Signal {
@ -26,7 +26,7 @@ pub enum Event {
#[serde(rename_all = "camelCase")]
pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received
pubsub_topic: PubsubTopic,
pubsub_topic: WakuPubSubTopic,
/// The message id
message_id: String,
/// The message in [`WakuMessage`] format
@ -34,7 +34,7 @@ pub struct WakuMessageEvent {
}
impl WakuMessageEvent {
pub fn pubsub_topic(&self) -> &PubsubTopic {
pub fn pubsub_topic(&self) -> &WakuPubSubTopic {
&self.pubsub_topic
}

View File

@ -1,13 +1,15 @@
// std
use std::fmt::{Display, Formatter};
use std::str::FromStr;
// crates
use serde::{Deserialize, Serialize};
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use sscanf::{scanf, RegexRepresentation};
// internal
pub type PubsubTopic = String;
pub type ContentTopic = String;
pub type WakuMessageVersion = usize;
/// Base58 encoded peer id
pub type PeerId = String;
pub type MessageId = String;
/// JsonResponse wrapper.
/// `go-waku` ffi returns this type as a `char *` as per the [specification](https://rfc.vac.dev/spec/36/#jsonresponse-type)
@ -34,12 +36,12 @@ impl<T> From<JsonResponse<T>> for Result<T> {
/// JsonMessage, Waku message in JSON format.
/// as per the [specification](https://rfc.vac.dev/spec/36/#jsonmessage-type)
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WakuMessage {
payload: Box<[u8]>,
/// The content topic to be set on the message
content_topic: ContentTopic,
content_topic: WakuContentTopic,
/// The Waku Message version number
version: WakuMessageVersion,
/// Unix timestamp in nanoseconds
@ -60,30 +62,30 @@ pub struct DecodedPayload {
/// The content topic of a Waku message
/// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type)
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ContentFilter {
/// The content topic of a Waku message
content_topic: ContentTopic,
content_topic: WakuContentTopic,
}
/// The criteria to create subscription to a light node in JSON Format
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscription {
/// Array of [`ContentFilter`] being subscribed to / unsubscribed from
content_filters: Vec<ContentFilter>,
/// Optional pubsub topic
pubsub_topic: Option<PubsubTopic>,
pubsub_topic: Option<WakuPubSubTopic>,
}
/// Criteria used to retrieve historical messages
#[derive(Serialize)]
#[derive(Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct StoreQuery {
/// The pubsub topic on which messages are published
pubsub_topic: Option<PubsubTopic>,
pubsub_topic: Option<WakuPubSubTopic>,
/// Array of [`ContentFilter`] to query for historical messages
content_filters: Vec<ContentFilter>,
/// The inclusive lower bound on the timestamp of queried messages.
@ -97,7 +99,7 @@ pub struct StoreQuery {
}
/// The response received after doing a query to a store node
#[derive(Deserialize)]
#[derive(Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StoreResponse {
/// Array of retrieved historical messages in [`WakuMessage`] format
@ -107,7 +109,7 @@ pub struct StoreResponse {
}
/// Paging information
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PagingOptions {
/// Number of messages to retrieve per page
@ -120,7 +122,7 @@ pub struct PagingOptions {
forward: bool,
}
#[derive(Serialize, Deserialize)]
#[derive(Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MessageIndex {
/// Hash of the message at this [`MessageIndex`]
@ -130,5 +132,156 @@ pub struct MessageIndex {
/// UNIX timestamp in nanoseconds at which the message is generated by its sender
sender_time: usize,
/// The pubsub topic of the message at this [`MessageIndex`]
pubsub_topic: PubsubTopic,
pubsub_topic: WakuPubSubTopic,
}
#[derive(Copy, Clone)]
pub enum Encoding {
Proto,
Rlp,
Rfc26,
}
impl Display for Encoding {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let s = match self {
Encoding::Proto => "proto",
Encoding::Rlp => "rlp",
Encoding::Rfc26 => "rfc26",
};
f.write_str(s)
}
}
impl FromStr for Encoding {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"proto" => Ok(Self::Proto),
"rlp" => Ok(Self::Rlp),
"rfc26" => Ok(Self::Rfc26),
encoding => Err(format!("Unrecognized encoding: {}", encoding)),
}
}
}
impl RegexRepresentation for Encoding {
const REGEX: &'static str = r"\w";
}
#[derive(Clone)]
pub struct WakuContentTopic {
application_name: String,
version: usize,
content_topic_name: String,
encoding: Encoding,
}
impl FromStr for WakuContentTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((application_name, version, content_topic_name, encoding)) =
scanf!(s, "/{}/{}/{}/{}", String, usize, String, Encoding)
{
Ok(WakuContentTopic {
application_name,
version,
content_topic_name,
encoding,
})
} else {
Err(
format!(
"Wrong pub-sub topic format. Should be `/{{application-name}}/{{version-of-the-application}}/{{content-topic-name}}/{{encoding}}`. Got: {}",
s
)
)
}
}
}
impl Display for WakuContentTopic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"/{}/{}/{}/{}",
self.application_name, self.version, self.content_topic_name, self.encoding
)
}
}
impl Serialize for WakuContentTopic {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.to_string().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for WakuContentTopic {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let as_string: String = String::deserialize(deserializer)?;
as_string
.parse::<WakuContentTopic>()
.map_err(D::Error::custom)
}
}
#[derive(Clone)]
pub struct WakuPubSubTopic {
topic_name: String,
encoding: Encoding,
}
impl FromStr for WakuPubSubTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((topic_name, encoding)) = scanf!(s, "/waku/v2/{}/{}", String, Encoding) {
Ok(WakuPubSubTopic {
topic_name,
encoding,
})
} else {
Err(
format!(
"Wrong pub-sub topic format. Should be `/waku/2/{{topic-name}}/{{encoding}}`. Got: {}",
s
)
)
}
}
}
impl Display for WakuPubSubTopic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "/waku/2/{}/{}", self.topic_name, self.encoding)
}
}
impl Serialize for WakuPubSubTopic {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.to_string().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for WakuPubSubTopic {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let as_string: String = String::deserialize(deserializer)?;
as_string
.parse::<WakuPubSubTopic>()
.map_err(D::Error::custom)
}
}

View File

@ -4,7 +4,7 @@ use std::ffi::{CStr, CString};
// crates
// internal
use super::config::WakuNodeConfig;
use crate::general::{JsonResponse, Result};
use crate::general::{JsonResponse, PeerId, Result};
/// Instantiates a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
@ -52,7 +52,7 @@ pub fn waku_stop() -> Result<bool> {
/// If the execution is successful, the result is the peer ID as a string (base58 encoded)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn waku_peer_id() -> Result<String> {
pub fn waku_peer_id() -> Result<PeerId> {
let response = unsafe { CStr::from_ptr(waku_sys::waku_peerid()) }
.to_str()
.expect("Response should always succeed to load to a &str");
@ -65,7 +65,7 @@ pub fn waku_peer_id() -> Result<String> {
/// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub fn waku_listen_addressses() -> Result<Vec<Multiaddr>> {
pub fn waku_listen_addresses() -> Result<Vec<Multiaddr>> {
let response = unsafe { CStr::from_ptr(waku_sys::waku_listen_addresses()) }
.to_str()
.expect("Response should always succeed to load to a &str");
@ -79,7 +79,7 @@ pub fn waku_listen_addressses() -> Result<Vec<Multiaddr>> {
#[cfg(test)]
mod test {
use super::waku_new;
use crate::node::management::{waku_listen_addressses, waku_peer_id, waku_start, waku_stop};
use crate::node::management::{waku_listen_addresses, waku_peer_id, waku_start, waku_stop};
#[test]
fn waku_flow() {
@ -91,7 +91,7 @@ mod test {
assert!(!id.is_empty());
// test addresses, since we cannot start different instances of the node
let addresses = waku_listen_addressses().unwrap();
let addresses = waku_listen_addresses().unwrap();
dbg!(&addresses);
assert!(!addresses.is_empty());

View File

@ -1,18 +1,22 @@
mod config;
mod management;
mod peers;
mod relay;
// std
use aes_gcm::{Aes256Gcm, Key};
use libsecp256k1::{PublicKey, SecretKey};
use multiaddr::Multiaddr;
use std::marker::PhantomData;
use std::sync::Mutex;
use std::time::Duration;
// crates
// internal
use crate::general::{PeerId, Result};
use crate::general::{MessageId, PeerId, Result, WakuMessage, WakuPubSubTopic};
pub use config::WakuNodeConfig;
pub use peers::{Protocol, WakuPeerData, WakuPeers};
pub use relay::{waku_create_content_topic, waku_create_pubsub_topic, waku_dafault_pubsub_topic};
/// Shared flag to check if a waku node is already running in the current process
static WAKU_NODE_INITIALIZED: Mutex<bool> = Mutex::new(false);
@ -31,19 +35,36 @@ impl WakuNodeState for Running {}
pub struct WakuNodeHandle<State: WakuNodeState>(PhantomData<State>);
/// We do not have any inner state, so the handle should be safe to be send among threads.
unsafe impl<State: WakuNodeState> Send for WakuNodeHandle<State> {}
/// References to the handle are safe to share, as they do not mutate the handle itself and
/// operations are performed by the bindings backend, which is supposed to be thread safe.
unsafe impl<State: WakuNodeState> Sync for WakuNodeHandle<State> {}
impl<State: WakuNodeState> WakuNodeHandle<State> {
pub fn peer_id(&self) -> Result<String> {
/// If the execution is successful, the result is the peer ID as a string (base58 encoded)
///
/// wrapper around [`management::waku_peer_id`]
pub fn peer_id(&self) -> Result<PeerId> {
management::waku_peer_id()
}
/// Get the multiaddresses the Waku node is listening to
///
/// wrapper around [`management::waku_listen_addresses`]
pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
management::waku_listen_addressses()
management::waku_listen_addresses()
}
pub fn add_peer(&mut self, address: Multiaddr, protocol_id: usize) -> Result<PeerId> {
/// Add a node multiaddress and protocol to the waku nodes peerstore
///
/// wrapper around [`peers::waku_add_peers`]
pub fn add_peer(&self, address: Multiaddr, protocol_id: usize) -> Result<PeerId> {
peers::waku_add_peers(address, protocol_id)
}
}
fn stop_node() -> Result<()> {
let mut node_initialized = WAKU_NODE_INITIALIZED
.lock()
@ -53,49 +74,147 @@ fn stop_node() -> Result<()> {
}
impl WakuNodeHandle<Initialized> {
/// Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation
///
/// wrapper around [`management::waku_start`]
pub fn start(self) -> Result<WakuNodeHandle<Running>> {
management::waku_start().map(|_| WakuNodeHandle(Default::default()))
}
/// Stops a Waku node
///
/// internally uses [`management::waku_stop`]
pub fn stop(self) -> Result<()> {
stop_node()
}
}
impl WakuNodeHandle<Running> {
/// Stops a Waku node
///
/// internally uses [`management::waku_stop`]
pub fn stop(self) -> Result<()> {
stop_node()
}
/// Dial peer using a multiaddress
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout
///
/// wrapper around [`peers::waku_connect_peer_with_address`]
pub fn connect_peer_with_address(
&mut self,
&self,
address: Multiaddr,
timeout: Option<Duration>,
) -> Result<()> {
peers::waku_connect_peer_with_address(address, timeout)
}
pub fn connect_peer_with_id(
&mut self,
peer_id: PeerId,
timeout: Option<Duration>,
) -> Result<()> {
/// Dial peer using its peer ID
///
/// wrapper around [`peers::waku_connect_peer_with_id`]
pub fn connect_peer_with_id(&self, peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
peers::waku_connect_peer_with_id(peer_id, timeout)
}
pub fn disconnect_peer_with_id(&mut self, peer_id: PeerId) -> Result<()> {
/// Disconnect a peer using its peerID
///
/// wrapper around [`peers::waku_disconnect_peer_with_id`]
pub fn disconnect_peer_with_id(&self, peer_id: PeerId) -> Result<()> {
peers::waku_disconnect_peer_with_id(peer_id)
}
/// Get number of connected peers
///
/// wrapper around [`peers::waku_peer_count`]
pub fn peer_count(&self) -> Result<usize> {
peers::waku_peer_count()
}
/// Retrieve the list of peers known by the Waku node
///
/// wrapper around [`peers::waku_peers`]
pub fn peers(&self) -> Result<WakuPeers> {
peers::waku_peers()
}
/// Publish a message using Waku Relay
///
/// wrapper around [`relay::waku_relay_publish_message`]
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
timeout: Duration,
) -> Result<MessageId> {
relay::waku_relay_publish_message(message, pubsub_topic, timeout)
}
/// Optionally sign, encrypt using asymmetric encryption and publish a message using Waku Relay
///
/// wrapper around [`relay::waku_relay_publish_encrypt_asymmetric`]
pub fn relay_publish_encrypt_asymmetric(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
public_key: &PublicKey,
signing_key: &SecretKey,
timeout: Duration,
) -> Result<MessageId> {
relay::waku_relay_publish_encrypt_asymmetric(
message,
pubsub_topic,
public_key,
signing_key,
timeout,
)
}
/// Optionally sign, encrypt using symmetric encryption and publish a message using Waku Relay
///
/// wrapper around [`relay::waku_relay_publish_encrypt_symmetric`]
pub fn relay_publish_encrypt_symmetric(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
symmetric_key: &Key<Aes256Gcm>,
signing_key: &SecretKey,
timeout: Duration,
) -> Result<MessageId> {
relay::waku_relay_publish_encrypt_symmetric(
message,
pubsub_topic,
symmetric_key,
signing_key,
timeout,
)
}
/// Determine if there are enough peers to publish a message on a given pubsub topic
///
/// wrapper around [`relay::waku_enough_peers`]
pub fn relay_enough_peers(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool> {
relay::waku_enough_peers(pubsub_topic)
}
/// Subscribe to a Waku Relay pubsub topic to receive messages
///
/// wrapper around [`relay::waku_relay_subscribe`]
pub fn relay_subscribe(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
relay::waku_relay_subscribe(pubsub_topic)
}
/// Closes the pubsub subscription to a pubsub topic. No more messages will be received from this pubsub topic
///
/// wrapper around [`relay::waku_relay_unsubscribe`]
pub fn relay_unsubscribe(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
relay::waku_relay_unsubscribe(pubsub_topic)
}
}
/// Spawn a new Waku node with the givent configuration (default configuration if `None` provided)
/// Internally uses [`management::waku_new`]
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
let mut node_initialized = WAKU_NODE_INITIALIZED
.lock()

View File

@ -1,5 +1,5 @@
// std
use std::ffi::{c_char, CStr, CString};
use std::ffi::{CStr, CString};
use std::time::Duration;
// crates
use multiaddr::Multiaddr;
@ -57,7 +57,7 @@ pub fn waku_connect_peer_with_address(address: Multiaddr, timeout: Option<Durati
/// Dial peer using a peer id
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
/// The peer must be already known.
/// It must have been added before with [`waku_add_peer`] or previously dialed with [`waku_connect_peer_with_address`]
/// It must have been added before with [`waku_add_peers`] or previously dialed with [`waku_connect_peer_with_address`]
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peeridchar-peerid-int-timeoutms)
pub fn waku_connect_peer_with_id(peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
let response = unsafe {

244
waku/src/node/relay.rs Normal file
View File

@ -0,0 +1,244 @@
// std
use std::ffi::{CStr, CString};
use std::time::Duration;
// crates
use aes_gcm::{Aes256Gcm, Key};
use libsecp256k1::{PublicKey, SecretKey};
// internal
use crate::general::{
Encoding, JsonResponse, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_content_topicchar-applicationname-unsigned-int-applicationversion-char-contenttopicname-char-encoding)
pub fn waku_create_content_topic(
application_name: &str,
application_version: usize,
content_topic_name: &str,
encoding: Encoding,
) -> WakuContentTopic {
unsafe {
CStr::from_ptr(waku_sys::waku_content_topic(
CString::new(application_name)
.expect("Application name should always transform to CString")
.into_raw(),
application_version
.try_into()
.expect("Version should fit within an u32"),
CString::new(content_topic_name)
.expect("Conmtent topic should always transform to CString")
.into_raw(),
CString::new(encoding.to_string())
.expect("Encoding should always transform to CString")
.into_raw(),
))
}
.to_str()
.expect("&str from result should always be extracted")
.parse()
.expect("Content topic data should be always parseable")
}
/// Create a pubsub topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding)
pub fn waku_create_pubsub_topic(topic_name: &str, encoding: Encoding) -> WakuPubSubTopic {
unsafe {
CStr::from_ptr(waku_sys::waku_pubsub_topic(
CString::new(topic_name)
.expect("Topic name should always transform to CString")
.into_raw(),
CString::new(encoding.to_string())
.expect("Encoding should always transform to CString")
.into_raw(),
))
}
.to_str()
.expect("&str from result should always be extracted")
.parse()
.expect("Pubsub topic data should be always parseable")
}
/// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/)
pub fn waku_dafault_pubsub_topic() -> WakuPubSubTopic {
unsafe { CStr::from_ptr(waku_sys::waku_default_pubsub_topic()) }
.to_str()
.expect("&str from result should always be extracted")
.parse()
.expect("Default pubsub topic should always be parseable")
}
/// Publish a message using Waku Relay
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
pub fn waku_relay_publish_message(
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
timeout: Duration,
) -> Result<MessageId> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
.to_string();
let result = unsafe {
CStr::from_ptr(waku_sys::waku_relay_publish(
CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw(),
CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw(),
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
))
}
.to_str()
.expect("&str from result should always be extracted");
let message_id: JsonResponse<MessageId> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
message_id.into()
}
/// Optionally sign, encrypt using asymmetric encryption and publish a message using Waku Relay
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publish_enc_asymmetricchar-messagejson-char-pubsubtopic-char-publickey-char-optionalsigningkey-int-timeoutms)
pub fn waku_relay_publish_encrypt_asymmetric(
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
public_key: &PublicKey,
signing_key: &SecretKey,
timeout: Duration,
) -> Result<MessageId> {
let pk = hex::encode(public_key.serialize());
let sk = hex::encode(signing_key.serialize());
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
.to_string();
let result = unsafe {
CStr::from_ptr(waku_sys::waku_relay_publish_enc_asymmetric(
CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw(),
CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw(),
CString::new(pk)
.expect("CString should build properly from hex encoded public key")
.into_raw(),
CString::new(sk)
.expect("CString should build properly from hex encoded signing key")
.into_raw(),
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
))
.to_str()
.expect("Response should always succeed to load to a &str")
};
let message_id: JsonResponse<MessageId> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
message_id.into()
}
/// Optionally sign, encrypt using symmetric encryption and publish a message using Waku Relay
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publish_enc_symmetricchar-messagejson-char-pubsubtopic-char-symmetrickey-char-optionalsigningkey-int-timeoutms)
pub fn waku_relay_publish_encrypt_symmetric(
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
symmetric_key: &Key<Aes256Gcm>,
signing_key: &SecretKey,
timeout: Duration,
) -> Result<MessageId> {
let symk = hex::encode(symmetric_key.as_slice());
let sk = hex::encode(signing_key.serialize());
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
.to_string();
let result = unsafe {
CStr::from_ptr(waku_sys::waku_relay_publish_enc_symmetric(
CString::new(
serde_json::to_string(&message)
.expect("WakuMessages should always be able to success serializing"),
)
.expect("CString should build properly from the serialized waku message")
.into_raw(),
CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw(),
CString::new(symk)
.expect("CString should build properly from hex encoded symmetric key")
.into_raw(),
CString::new(sk)
.expect("CString should build properly from hex encoded signing key")
.into_raw(),
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
))
.to_str()
.expect("Response should always succeed to load to a &str")
};
let message_id: JsonResponse<MessageId> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
message_id.into()
}
pub fn waku_enough_peers(pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
.to_string();
let result = unsafe {
CStr::from_ptr(waku_sys::waku_relay_enough_peers(
CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw(),
))
}
.to_str()
.expect("Response should always succeed to load to a &str");
let enough_peers: JsonResponse<bool> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
enough_peers.into()
}
pub fn waku_relay_subscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
.to_string();
let result = unsafe {
CStr::from_ptr(waku_sys::waku_relay_subscribe(
CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw(),
))
}
.to_str()
.expect("Response should always succeed to load to a &str");
let enough_peers: JsonResponse<bool> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
Result::from(enough_peers).map(|_| ())
}
pub fn waku_relay_unsubscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
.to_string();
let result = unsafe {
CStr::from_ptr(waku_sys::waku_relay_unsubscribe(
CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw(),
))
}
.to_str()
.expect("Response should always succeed to load to a &str");
let enough_peers: JsonResponse<bool> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
Result::from(enough_peers).map(|_| ())
}