Main test and extended fixes (#10)

* Pipe protocol id

* Fix peer id connect

* Fix WakuPubSubTopic parsing

* Use optional timeout on publish messages

* More test cases

* Update vendor

* Use connect with peer_id

* Fix signal -> event -> message deserialization

* Actively wait for result to arrive the test node

* Clippy happy

* Clippy happy on tests

* Cleaning and adjusting types

* Updated vendor

* Fix keys dance

* Fix lightpush

* Add disconnect test

* Ignore node test for CI

* Add gcc on gh actions

* Reverse installing gcc

* Bring back gcc just for ubuntu and windows

* Removed rust-crypto unused dependency

* Clippy happy
This commit is contained in:
Daniel Sanchez 2022-10-17 19:30:07 +02:00 committed by GitHub
parent 94e643a250
commit c82f4ebe4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 378 additions and 206 deletions

145
Cargo.lock generated
View File

@ -104,15 +104,6 @@ version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4"
dependencies = [
"generic-array",
]
[[package]]
name = "bs58"
version = "0.4.0"
@ -125,6 +116,12 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "cc"
version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]]
name = "cexpr"
version = "0.6.0"
@ -223,12 +220,6 @@ dependencies = [
"libc",
]
[[package]]
name = "crunchy"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -240,16 +231,6 @@ dependencies = [
"typenum",
]
[[package]]
name = "crypto-mac"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b584a330336237c1eecd3e94266efb216c56ed91225d634cb2991c5f3fd1aeab"
dependencies = [
"generic-array",
"subtle",
]
[[package]]
name = "ctr"
version = "0.9.2"
@ -265,15 +246,6 @@ version = "2.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ee2393c4a91429dffb4bedf19f4d6abf27d8a732c8ce4980305d782e5426d57"
[[package]]
name = "digest"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066"
dependencies = [
"generic-array",
]
[[package]]
name = "either"
version = "1.8.0"
@ -360,27 +332,6 @@ version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hmac"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "126888268dcc288495a26bf004b38c5fdbb31682f992c84ceb046a1f0fe38840"
dependencies = [
"crypto-mac",
"digest",
]
[[package]]
name = "hmac-drbg"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17ea0a1394df5b6574da6e0c1ade9e78868c9fb0a4e5ef4428e32da4676b85b1"
dependencies = [
"digest",
"generic-array",
"hmac",
]
[[package]]
name = "humantime"
version = "2.1.0"
@ -450,54 +401,6 @@ dependencies = [
"winapi",
]
[[package]]
name = "libsecp256k1"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95b09eff1b35ed3b33b877ced3a691fc7a481919c7e29c53c906226fcf55e2a1"
dependencies = [
"arrayref",
"base64",
"digest",
"hmac-drbg",
"libsecp256k1-core",
"libsecp256k1-gen-ecmult",
"libsecp256k1-gen-genmult",
"rand",
"serde",
"sha2",
"typenum",
]
[[package]]
name = "libsecp256k1-core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5be9b9bb642d8522a44d533eab56c16c738301965504753b03ad1de3425d5451"
dependencies = [
"crunchy",
"digest",
"subtle",
]
[[package]]
name = "libsecp256k1-gen-ecmult"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3038c808c55c87e8a172643a7d87187fc6c4174468159cb3090659d55bcb4809"
dependencies = [
"libsecp256k1-core",
]
[[package]]
name = "libsecp256k1-gen-genmult"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3db8d6ba2cec9eacc40e6e8ccc98931840301f1006e95647ceb2dd5c3aa06f7c"
dependencies = [
"libsecp256k1-core",
]
[[package]]
name = "log"
version = "0.4.17"
@ -732,6 +635,26 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
[[package]]
name = "secp256k1"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7649a0b3ffb32636e60c7ce0d70511eda9c52c658cd0634e194d5a19943aeff"
dependencies = [
"rand",
"secp256k1-sys",
"serde",
]
[[package]]
name = "secp256k1-sys"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83080e2c2fc1006e625be82e5d1eb6a43b7fd9578b617fcc55814daf286bba4b"
dependencies = [
"cc",
]
[[package]]
name = "serde"
version = "1.0.145"
@ -763,19 +686,6 @@ dependencies = [
"serde",
]
[[package]]
name = "sha2"
version = "0.9.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
dependencies = [
"block-buffer",
"cfg-if",
"cpufeatures",
"digest",
"opaque-debug",
]
[[package]]
name = "shlex"
version = "1.1.0"
@ -979,9 +889,10 @@ dependencies = [
"aes-gcm",
"base64",
"hex",
"libsecp256k1",
"multiaddr",
"once_cell",
"rand",
"secp256k1",
"serde",
"serde_json",
"sscanf",

@ -1 +1 @@
Subproject commit 2881d0cd5eb7b9083c9fa31d21cc066ef2ae1337
Subproject commit 3a7f2608b3613e40c055e8f9a6afb9337c28c580

View File

@ -9,9 +9,10 @@ edition = "2021"
aes-gcm = { version = "0.10", features = ["aes"] }
base64 = "0.13"
hex = "0.4"
libsecp256k1 = "0.7"
secp256k1 = { version = "0.24", features = ["rand", "recovery", "serde"] }
multiaddr = "0.14"
once_cell = "1.15"
rand = "0.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sscanf = "0.3"

View File

@ -4,7 +4,7 @@
use std::ffi::{CStr, CString};
// crates
use aes_gcm::{Aes256Gcm, Key};
use libsecp256k1::SecretKey;
use secp256k1::SecretKey;
// internal
use crate::general::{DecodedPayload, JsonResponse, Result, WakuMessage};
@ -32,7 +32,7 @@ pub fn waku_decode_symmetric(
.to_str()
.expect("Response should always succeed to load to a &str");
let response: JsonResponse<DecodedPayload> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
serde_json::from_str(result).map_err(|e| format!("{e}"))?;
response.into()
}
@ -43,7 +43,7 @@ pub fn waku_decode_asymmetric(
message: &WakuMessage,
asymmetric_key: &SecretKey,
) -> Result<DecodedPayload> {
let sk = hex::encode(asymmetric_key.serialize());
let sk = hex::encode(asymmetric_key.secret_bytes());
let result = unsafe {
CStr::from_ptr(waku_sys::waku_decode_asymmetric(
CString::new(
@ -60,6 +60,6 @@ pub fn waku_decode_asymmetric(
.to_str()
.expect("Response should always succeed to load to a &str");
let response: JsonResponse<DecodedPayload> =
serde_json::from_str(result).expect("JsonResponse should always succeed to deserialize");
serde_json::from_str(result).map_err(|e| format!("{e}"))?;
response.into()
}

View File

@ -13,6 +13,7 @@ use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
// internal
use crate::general::{WakuMessage, WakuPubSubTopic};
use crate::MessageId;
/// Event signal
#[derive(Serialize, Deserialize)]
@ -34,7 +35,7 @@ impl Signal {
/// For now just WakuMessage is supported
#[non_exhaustive]
#[derive(Serialize, Deserialize)]
#[serde(tag = "untagged", rename_all = "camelCase")]
#[serde(untagged, rename_all = "camelCase")]
pub enum Event {
WakuMessage(WakuMessageEvent),
}
@ -46,7 +47,7 @@ pub struct WakuMessageEvent {
/// The pubsub topic on which the message was received
pubsub_topic: WakuPubSubTopic,
/// The message id
message_id: String,
message_id: MessageId,
/// The message in [`WakuMessage`] format
waku_message: WakuMessage,
}
@ -93,16 +94,29 @@ extern "C" fn callback(data: *const c_char) {
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) {
set_callback(f);
unsafe { waku_sys::waku_set_event_callback(&mut callback as *mut _ as *mut std::ffi::c_void) };
unsafe { waku_sys::waku_set_event_callback(callback as *mut std::ffi::c_void) };
}
#[cfg(test)]
mod tests {
use crate::events::waku_set_event_callback;
use crate::{Event, Signal};
// TODO: how to actually send a signal and check if the callback is run?
#[test]
fn set_event_callback() {
waku_set_event_callback(|_signal| {});
}
#[test]
fn deserialize_signal() {
let s = "{\"type\":\"message\",\"event\":{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}}";
let _: Signal = serde_json::from_str(s).unwrap();
}
#[test]
fn deserialize_event() {
let e = "{\"messageId\":\"0x26ff3d7fbc950ea2158ce62fd76fd745eee0323c9eac23d0713843b0f04ea27c\",\"pubsubTopic\":\"/waku/2/default-waku/proto\",\"wakuMessage\":{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}}";
let _: Event = serde_json::from_str(e).unwrap();
}
}

View File

@ -5,7 +5,7 @@ use std::fmt::{Display, Formatter};
use std::str::FromStr;
// crates
use aes_gcm::{Aes256Gcm, Key};
use libsecp256k1::{PublicKey, SecretKey, Signature};
use secp256k1::{ecdsa::Signature, PublicKey, SecretKey};
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use sscanf::{scanf, RegexRepresentation};
// internal
@ -18,6 +18,35 @@ pub type PeerId = String;
/// Waku message id, hex encoded sha256 digest of the message
pub type MessageId = String;
/// Protocol identifiers
#[non_exhaustive]
pub enum ProtocolId {
Store,
Lightpush,
Filter,
Relay,
}
impl ProtocolId {
pub fn as_string_with_version(&self, version: &str) -> String {
format!("{}/{}", self, version)
}
}
impl Display for ProtocolId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let tag = match self {
ProtocolId::Store => "/vac/waku/store",
ProtocolId::Lightpush => "/vac/waku/lightpush",
ProtocolId::Filter => "/vac/waku/filter",
ProtocolId::Relay => "/vac/waku/relay",
#[allow(unreachable_patterns)]
_ => unreachable!(),
};
write!(f, "{}", tag)
}
}
/// JsonResponse wrapper.
/// `go-waku` ffi returns this type as a `char *` as per the [specification](https://rfc.vac.dev/spec/36/#jsonresponse-type)
/// This is internal, as it is better to use rust plain `Result` type.
@ -51,10 +80,15 @@ pub struct WakuMessage {
payload: Vec<u8>,
/// The content topic to be set on the message
content_topic: WakuContentTopic,
// TODO: check if missing default should be 0
/// The Waku Message version number
#[serde(default)]
version: WakuMessageVersion,
/// Unix timestamp in nanoseconds
timestamp: usize,
// TODO: implement RLN fields
#[serde(flatten)]
_extras: serde_json::Value,
}
impl WakuMessage {
@ -65,11 +99,13 @@ impl WakuMessage {
timestamp: usize,
) -> Self {
let payload = payload.as_ref().to_vec();
Self {
payload,
content_topic,
version,
timestamp,
_extras: Default::default(),
}
}
@ -99,7 +135,7 @@ impl WakuMessage {
/// Try decode the message with an expected asymmetric key
///
/// wrapper around [`crate::decrypt::waku_decode_asymmetric`]
pub fn try_decode_asymmentric(&self, asymmetric_key: &SecretKey) -> Result<DecodedPayload> {
pub fn try_decode_asymmetric(&self, asymmetric_key: &SecretKey) -> Result<DecodedPayload> {
waku_decode_asymmetric(self, asymmetric_key)
}
}
@ -109,10 +145,10 @@ impl WakuMessage {
#[serde(rename_all = "camelCase")]
pub struct DecodedPayload {
/// Public key that signed the message (optional), hex encoded with 0x prefix
#[serde(deserialize_with = "deserialize_optional_pk")]
#[serde(deserialize_with = "deserialize_optional_pk", default)]
public_key: Option<PublicKey>,
/// Message signature (optional), hex encoded with 0x prefix
#[serde(deserialize_with = "deserialize_optional_signature")]
#[serde(deserialize_with = "deserialize_optional_signature", default)]
signature: Option<Signature>,
/// Decrypted message payload base64 encoded
#[serde(with = "base64_serde")]
@ -296,7 +332,7 @@ impl FromStr for WakuContentTopic {
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((application_name, version, content_topic_name, encoding)) =
scanf!(s, "/{}/{}/{}/{}", String, usize, String, Encoding)
scanf!(s, "/{}/{}/{}/{:/.+?/}", String, usize, String, Encoding)
{
Ok(WakuContentTopic {
application_name,
@ -366,7 +402,7 @@ impl FromStr for WakuPubSubTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((topic_name, encoding)) = scanf!(s, "/waku/v2/{}/{}", String, Encoding) {
if let Ok((topic_name, encoding)) = scanf!(s, "/waku/2/{}/{:/.+?/}", String, Encoding) {
Ok(WakuPubSubTopic {
topic_name,
encoding,
@ -439,7 +475,7 @@ where
base64_str
.map(|base64_str| {
let raw_bytes = base64::decode(base64_str).map_err(D::Error::custom)?;
PublicKey::parse_slice(&raw_bytes, None).map_err(D::Error::custom)
PublicKey::from_slice(&raw_bytes).map_err(D::Error::custom)
})
.transpose()
}
@ -450,11 +486,34 @@ pub fn deserialize_optional_signature<'de, D>(
where
D: Deserializer<'de>,
{
let base64_str: Option<String> = Option::<String>::deserialize(deserializer)?;
base64_str
.map(|base64_str| {
let raw_bytes = base64::decode(base64_str).map_err(D::Error::custom)?;
Signature::parse_der(&raw_bytes).map_err(D::Error::custom)
let hex_str: Option<String> = Option::<String>::deserialize(deserializer)?;
hex_str
.map(|hex_str| {
let raw_bytes = hex::decode(hex_str.strip_prefix("0x").unwrap_or(&hex_str))
.map_err(D::Error::custom)?;
if ![64, 65].contains(&raw_bytes.len()) {
return Err(D::Error::custom(
"Invalid signature, only 64 or 65 bytes len are supported",
));
}
Signature::from_compact(&raw_bytes[..64]).map_err(D::Error::custom)
})
.transpose()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::WakuPubSubTopic;
#[test]
fn parse_waku_topic() {
let s = "/waku/2/default-waku/proto";
let _: WakuPubSubTopic = s.parse().unwrap();
}
#[test]
fn deserialize_waku_message() {
let message = "{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660}";
let _: WakuMessage = serde_json::from_str(message).unwrap();
}
}

View File

@ -8,12 +8,13 @@ mod node;
pub use node::{
waku_create_content_topic, waku_create_pubsub_topic, waku_dafault_pubsub_topic, waku_new,
Initialized, Protocol, Running, WakuNodeConfig, WakuNodeHandle, WakuPeerData, WakuPeers,
waku_store_query, Initialized, Protocol, Running, WakuNodeConfig, WakuNodeHandle, WakuPeerData,
WakuPeers,
};
pub use general::{
ContentFilter, DecodedPayload, Encoding, FilterSubscription, MessageId, MessageIndex,
PagingOptions, PeerId, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage,
PagingOptions, PeerId, ProtocolId, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage,
WakuMessageVersion, WakuPubSubTopic,
};

View File

@ -2,8 +2,8 @@
// std
// crates
use libsecp256k1::SecretKey;
use multiaddr::Multiaddr;
use secp256k1::SecretKey;
use serde::{Deserialize, Serialize};
// internal
@ -12,28 +12,28 @@ use serde::{Deserialize, Serialize};
#[serde(rename_all = "camelCase")]
pub struct WakuNodeConfig {
/// Listening IP address. Default `0.0.0.0`
host: Option<std::net::IpAddr>,
pub host: Option<std::net::IpAddr>,
/// Libp2p TCP listening port. Default `60000`. Use `0` for **random**
port: Option<usize>,
pub port: Option<usize>,
/// External address to advertise to other nodes. Can be ip4, ip6 or dns4, dns6.
/// If null, the multiaddress(es) generated from the ip and port specified in the config (or default ones) will be used.
/// Default: null
advertise_addr: Option<Multiaddr>,
pub advertise_addr: Option<Multiaddr>,
/// Secp256k1 private key in Hex format (`0x123...abc`). Default random
#[serde(with = "secret_key_serde")]
node_key: Option<SecretKey>,
pub node_key: Option<SecretKey>,
/// Interval in seconds for pinging peers to keep the connection alive. Default `20`
keep_alive_interval: Option<usize>,
pub keep_alive_interval: Option<usize>,
/// Enable relay protocol. Default `true`
relay: Option<bool>,
pub relay: Option<bool>,
/// The minimum number of peers required on a topic to allow broadcasting a message. Default `0`
min_peers_to_publish: Option<usize>,
pub min_peers_to_publish: Option<usize>,
/// Enable filter protocol. Default `false`
filter: Option<bool>,
pub filter: Option<bool>,
}
mod secret_key_serde {
use libsecp256k1::SecretKey;
use secp256k1::SecretKey;
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
@ -41,7 +41,7 @@ mod secret_key_serde {
where
S: Serializer,
{
let as_string: Option<String> = key.as_ref().map(|key| hex::encode(key.serialize()));
let as_string: Option<String> = key.as_ref().map(|key| hex::encode(key.secret_bytes()));
as_string.serialize(serializer)
}
@ -55,7 +55,7 @@ mod secret_key_serde {
Some(s) => {
let key_bytes = hex::decode(s).map_err(|e| D::Error::custom(format!("{e}")))?;
Ok(Some(
SecretKey::parse_slice(&key_bytes)
SecretKey::from_slice(&key_bytes)
.map_err(|e| D::Error::custom(format!("{e}")))?,
))
}

View File

@ -5,7 +5,7 @@ use std::ffi::{CStr, CString};
use std::time::Duration;
// crates
use aes_gcm::{Aes256Gcm, Key};
use libsecp256k1::{PublicKey, SecretKey};
use secp256k1::{PublicKey, SecretKey};
// internal
use crate::general::{JsonResponse, MessageId, PeerId, Result, WakuMessage, WakuPubSubTopic};
use crate::node::waku_dafault_pubsub_topic;
@ -14,10 +14,13 @@ use crate::node::waku_dafault_pubsub_topic;
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publishchar-messagejson-char-topic-char-peerid-int-timeoutms)
pub fn waku_lightpush_publish(
message: &WakuMessage,
pubsub_topic: WakuPubSubTopic,
pubsub_topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
.to_string();
let result = unsafe {
CStr::from_ptr(waku_sys::waku_lightpush_publish(
CString::new(
@ -26,16 +29,20 @@ pub fn waku_lightpush_publish(
)
.expect("CString should build properly from the serialized waku message")
.into_raw(),
CString::new(pubsub_topic.to_string())
CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw(),
CString::new(peer_id)
.expect("CString should build properly from peer id")
.into_raw(),
timeout
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
))
}
.to_str()
@ -55,11 +62,11 @@ pub fn waku_lightpush_publish_encrypt_asymmetric(
peer_id: PeerId,
public_key: &PublicKey,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
let pk = hex::encode(public_key.serialize());
let pk = hex::encode(public_key.serialize_uncompressed());
let sk = signing_key
.map(|signing_key| hex::encode(signing_key.serialize()))
.map(|signing_key| hex::encode(signing_key.secret_bytes()))
.unwrap_or_else(String::new);
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
@ -84,10 +91,14 @@ pub fn waku_lightpush_publish_encrypt_asymmetric(
CString::new(sk)
.expect("CString should build properly from hex encoded signing key")
.into_raw(),
timeout
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
))
.to_str()
.expect("Response should always succeed to load to a &str")
@ -105,11 +116,11 @@ pub fn waku_lightpush_publish_encrypt_symmetric(
peer_id: PeerId,
symmetric_key: &Key<Aes256Gcm>,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
let symk = hex::encode(symmetric_key.as_slice());
let sk = signing_key
.map(|signing_key| hex::encode(signing_key.serialize()))
.map(|signing_key| hex::encode(signing_key.secret_bytes()))
.unwrap_or_else(String::new);
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
@ -134,10 +145,14 @@ pub fn waku_lightpush_publish_encrypt_symmetric(
CString::new(sk)
.expect("CString should build properly from hex encoded signing key")
.into_raw(),
timeout
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
))
.to_str()
.expect("Response should always succeed to load to a &str")

View File

@ -10,21 +10,22 @@ mod store;
// std
use aes_gcm::{Aes256Gcm, Key};
use libsecp256k1::{PublicKey, SecretKey};
use multiaddr::Multiaddr;
use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData;
use std::sync::Mutex;
use std::time::Duration;
// crates
// internal
use crate::general::{
FilterSubscription, MessageId, PeerId, Result, StoreQuery, StoreResponse, WakuMessage,
WakuPubSubTopic,
FilterSubscription, MessageId, PeerId, ProtocolId, Result, StoreQuery, StoreResponse,
WakuMessage, WakuPubSubTopic,
};
pub use config::WakuNodeConfig;
pub use peers::{Protocol, WakuPeerData, WakuPeers};
pub use relay::{waku_create_content_topic, waku_create_pubsub_topic, waku_dafault_pubsub_topic};
pub use store::waku_store_query;
/// Shared flag to check if a waku node is already running in the current process
static WAKU_NODE_INITIALIZED: Mutex<bool> = Mutex::new(false);
@ -73,7 +74,7 @@ impl<State: WakuNodeState> WakuNodeHandle<State> {
/// Add a node multiaddress and protocol to the waku nodes peerstore
///
/// wrapper around [`peers::waku_add_peers`]
pub fn add_peer(&self, address: Multiaddr, protocol_id: usize) -> Result<PeerId> {
pub fn add_peer(&self, address: &Multiaddr, protocol_id: ProtocolId) -> Result<PeerId> {
peers::waku_add_peers(address, protocol_id)
}
}
@ -118,7 +119,7 @@ impl WakuNodeHandle<Running> {
/// wrapper around [`peers::waku_connect_peer_with_address`]
pub fn connect_peer_with_address(
&self,
address: Multiaddr,
address: &Multiaddr,
timeout: Option<Duration>,
) -> Result<()> {
peers::waku_connect_peer_with_address(address, timeout)
@ -134,7 +135,7 @@ impl WakuNodeHandle<Running> {
/// Disconnect a peer using its peerID
///
/// wrapper around [`peers::waku_disconnect_peer_with_id`]
pub fn disconnect_peer_with_id(&self, peer_id: PeerId) -> Result<()> {
pub fn disconnect_peer_with_id(&self, peer_id: &PeerId) -> Result<()> {
peers::waku_disconnect_peer_with_id(peer_id)
}
@ -159,7 +160,7 @@ impl WakuNodeHandle<Running> {
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
relay::waku_relay_publish_message(message, pubsub_topic, timeout)
}
@ -173,7 +174,7 @@ impl WakuNodeHandle<Running> {
pubsub_topic: Option<WakuPubSubTopic>,
public_key: &PublicKey,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
relay::waku_relay_publish_encrypt_asymmetric(
message,
@ -193,7 +194,7 @@ impl WakuNodeHandle<Running> {
pubsub_topic: Option<WakuPubSubTopic>,
symmetric_key: &Key<Aes256Gcm>,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
relay::waku_relay_publish_encrypt_symmetric(
message,
@ -229,6 +230,7 @@ impl WakuNodeHandle<Running> {
///
/// wrapper around [`store::waku_store_query`]
pub fn store_query(
&self,
query: &StoreQuery,
peer_id: PeerId,
timeout: Duration,
@ -240,10 +242,11 @@ impl WakuNodeHandle<Running> {
///
/// wrapper around [`lightpush::waku_lightpush_publish`]
pub fn lightpush_publish(
&self,
message: &WakuMessage,
pubsub_topic: WakuPubSubTopic,
pubsub_topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
lightpush::waku_lightpush_publish(message, pubsub_topic, peer_id, timeout)
}
@ -252,12 +255,13 @@ impl WakuNodeHandle<Running> {
///
/// wrapper around [`lightpush::waku_lightpush_publish_encrypt_asymmetric`]
pub fn lightpush_publish_encrypt_asymmetric(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
public_key: &PublicKey,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
lightpush::waku_lightpush_publish_encrypt_asymmetric(
message,
@ -273,12 +277,13 @@ impl WakuNodeHandle<Running> {
///
/// wrapper around [`lightpush::waku_lightpush_publish_encrypt_symmetric`]
pub fn lightpush_publish_encrypt_symmetric(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
symmetric_key: &Key<Aes256Gcm>,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
lightpush::waku_lightpush_publish_encrypt_symmetric(
message,
@ -294,6 +299,7 @@ impl WakuNodeHandle<Running> {
///
/// wrapper around [`filter::waku_filter_subscribe`]
pub fn filter_subscribe(
&self,
filter_subscription: &FilterSubscription,
peer_id: PeerId,
timeout: Duration,
@ -305,6 +311,7 @@ impl WakuNodeHandle<Running> {
///
/// wrapper around [`filter::waku_filter_unsubscribe`]
pub fn filter_unsubscribe(
&self,
filter_subscription: &FilterSubscription,
timeout: Duration,
) -> Result<()> {

View File

@ -7,11 +7,11 @@ use std::time::Duration;
use multiaddr::Multiaddr;
use serde::Deserialize;
// internal
use crate::general::{JsonResponse, PeerId, Result};
use crate::general::{JsonResponse, PeerId, ProtocolId, Result};
/// Add a node multiaddress and protocol to the waku nodes peerstore.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_add_peerchar-address-char-protocolid)
pub fn waku_add_peers(address: Multiaddr, protocol_id: usize) -> Result<PeerId> {
pub fn waku_add_peers(address: &Multiaddr, protocol_id: ProtocolId) -> Result<PeerId> {
let response = unsafe {
CStr::from_ptr(waku_sys::waku_add_peer(
CString::new(address.to_string())
@ -36,7 +36,10 @@ pub fn waku_add_peers(address: Multiaddr, protocol_id: usize) -> Result<PeerId>
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub fn waku_connect_peer_with_address(address: Multiaddr, timeout: Option<Duration>) -> Result<()> {
pub fn waku_connect_peer_with_address(
address: &Multiaddr,
timeout: Option<Duration>,
) -> Result<()> {
let response = unsafe {
CStr::from_ptr(waku_sys::waku_connect(
CString::new(address.to_string())
@ -63,7 +66,7 @@ pub fn waku_connect_peer_with_address(address: Multiaddr, timeout: Option<Durati
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peeridchar-peerid-int-timeoutms)
pub fn waku_connect_peer_with_id(peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
let response = unsafe {
CStr::from_ptr(waku_sys::waku_connect(
CStr::from_ptr(waku_sys::waku_connect_peerid(
CString::new(peer_id)
.expect("CString should build properly from peer id")
.into_raw(),
@ -83,10 +86,10 @@ pub fn waku_connect_peer_with_id(peer_id: PeerId, timeout: Option<Duration>) ->
/// Disconnect a peer using its peer id
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_disconnect_peerchar-peerid)
pub fn waku_disconnect_peer_with_id(peer_id: PeerId) -> Result<()> {
pub fn waku_disconnect_peer_with_id(peer_id: &PeerId) -> Result<()> {
let response = unsafe {
CStr::from_ptr(waku_sys::waku_disconnect(
CString::new(peer_id)
CString::new(peer_id.as_bytes())
.expect("CString should build properly from peer id")
.into_raw(),
))
@ -189,6 +192,6 @@ mod tests {
],
"connected": true
}"#;
let data: WakuPeerData = serde_json::from_str(json_str).unwrap();
let _: WakuPeerData = serde_json::from_str(json_str).unwrap();
}
}

View File

@ -5,7 +5,7 @@ use std::ffi::{CStr, CString};
use std::time::Duration;
// crates
use aes_gcm::{Aes256Gcm, Key};
use libsecp256k1::{PublicKey, SecretKey};
use secp256k1::{PublicKey, SecretKey};
// internal
use crate::general::{
Encoding, JsonResponse, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic,
@ -74,7 +74,7 @@ pub fn waku_dafault_pubsub_topic() -> WakuPubSubTopic {
pub fn waku_relay_publish_message(
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
@ -91,9 +91,13 @@ pub fn waku_relay_publish_message(
.expect("CString should build properly from pubsub topic")
.into_raw(),
timeout
.map(|duration| {
duration
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
))
}
.to_str()
@ -110,11 +114,11 @@ pub fn waku_relay_publish_encrypt_asymmetric(
pubsub_topic: Option<WakuPubSubTopic>,
public_key: &PublicKey,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
let pk = hex::encode(public_key.serialize());
let pk = hex::encode(public_key.serialize_uncompressed());
let sk = signing_key
.map(|signing_key| hex::encode(signing_key.serialize()))
.map(|signing_key| hex::encode(signing_key.secret_bytes()))
.unwrap_or_else(String::new);
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
@ -136,10 +140,14 @@ pub fn waku_relay_publish_encrypt_asymmetric(
CString::new(sk)
.expect("CString should build properly from hex encoded signing key")
.into_raw(),
timeout
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
))
.to_str()
.expect("Response should always succeed to load to a &str")
@ -156,11 +164,11 @@ pub fn waku_relay_publish_encrypt_symmetric(
pubsub_topic: Option<WakuPubSubTopic>,
symmetric_key: &Key<Aes256Gcm>,
signing_key: Option<&SecretKey>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<MessageId> {
let symk = hex::encode(symmetric_key.as_slice());
let sk = signing_key
.map(|signing_key| hex::encode(signing_key.serialize()))
.map(|signing_key| hex::encode(signing_key.secret_bytes()))
.unwrap_or_else(String::new);
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_dafault_pubsub_topic)
@ -182,10 +190,14 @@ pub fn waku_relay_publish_encrypt_symmetric(
CString::new(sk)
.expect("CString should build properly from hex encoded signing key")
.into_raw(),
timeout
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
))
.to_str()
.expect("Response should always succeed to load to a &str")

149
waku/tests/node.rs Normal file
View File

@ -0,0 +1,149 @@
use aes_gcm::{Aes256Gcm, KeyInit};
use multiaddr::Multiaddr;
use rand::thread_rng;
use secp256k1::{PublicKey, Secp256k1, SecretKey};
use std::net::IpAddr;
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use waku::{
waku_new, waku_set_event_callback, Encoding, Event, ProtocolId, WakuContentTopic, WakuMessage,
WakuNodeConfig,
};
const NODES: &[&str] = &[
"/dns4/node-01.ac-cn-hongkong-c.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAkvWiyFsgRhuJEb9JfjYxEkoHLgnUQmr1N5mKWnYjxYRVm",
"/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ",
"/dns4/node-01.gc-us-central1-a.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS"
];
#[ignore]
#[test]
pub fn main() -> Result<(), String> {
let config = WakuNodeConfig {
host: IpAddr::from_str("0.0.0.0").ok(),
port: None,
advertise_addr: None,
node_key: None,
keep_alive_interval: None,
relay: None,
min_peers_to_publish: None,
filter: None,
};
let node = waku_new(Some(config))?;
let node = node.start()?;
println!("Node peer id: {}", node.peer_id()?);
for node_address in NODES {
let address: Multiaddr = node_address.parse().unwrap();
let peer_id = node.add_peer(&address, ProtocolId::Relay)?;
node.connect_peer_with_id(peer_id, None)?;
}
assert!(node.peers()?.len() >= NODES.len());
assert!(node.peer_count()? >= NODES.len());
assert!(node.relay_enough_peers(None)?);
let sk = SecretKey::new(&mut thread_rng());
let pk = PublicKey::from_secret_key(&Secp256k1::new(), &sk);
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
let content = "Hi from 🦀!";
let content_callback = content.clone();
waku_set_event_callback(move |signal| match signal.event() {
Event::WakuMessage(message) => {
println!("Message with id [{}] received", message.message_id());
let message = message.waku_message();
let payload = if let Ok(message) = message
.try_decode_asymmetric(&sk)
.map_err(|e| println!("{e}"))
{
println!("Asymmetryc message");
message.data().to_vec()
} else if let Ok(message) = message
.try_decode_symmetric(&ssk)
.map_err(|e| println!("{e}"))
{
println!("Symmetryc message");
message.data().to_vec()
} else {
println!("Unencoded message");
message.payload().to_vec()
};
let message_content: String =
String::from_utf8(payload).expect("Message should be able to be read");
println!("Message content: {message_content}");
assert_eq!(message_content, content_callback);
}
_ => {
println!("Wtf is this event?");
}
});
// subscribe to default channel
node.relay_subscribe(None)?;
let content_topic = WakuContentTopic {
application_name: "toychat".to_string(),
version: 2,
content_topic_name: "huilong".to_string(),
encoding: Encoding::Proto,
};
let message = WakuMessage::new(
content,
content_topic,
1,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap(),
);
node.relay_publish_message(&message, None, None)?;
node.relay_publish_encrypt_asymmetric(&message, None, &pk, None, None)?;
node.relay_publish_encrypt_symmetric(&message, None, &ssk, None, None)?;
node.relay_publish_encrypt_asymmetric(&message, None, &pk, Some(&sk), None)?;
node.relay_publish_encrypt_symmetric(&message, None, &ssk, Some(&sk), None)?;
let peer_id = node
.peers()
.unwrap()
.iter()
.map(|peer| peer.peer_id())
.filter(|id| id.as_str() != node.peer_id().unwrap().as_str())
.next()
.unwrap()
.clone();
node.lightpush_publish(&message, None, peer_id.clone(), None)?;
node.lightpush_publish_encrypt_asymmetric(&message, None, peer_id.clone(), &pk, None, None)?;
node.lightpush_publish_encrypt_asymmetric(
&message,
None,
peer_id.clone(),
&pk,
Some(&sk),
None,
)?;
node.lightpush_publish_encrypt_symmetric(&message, None, peer_id.clone(), &ssk, None, None)?;
node.lightpush_publish_encrypt_symmetric(
&message,
None,
peer_id.clone(),
&ssk,
Some(&sk),
None,
)?;
for node_data in node.peers()? {
if node_data.peer_id() != &node.peer_id()? {
node.disconnect_peer_with_id(node_data.peer_id())?;
}
}
std::thread::sleep(Duration::from_secs(2));
node.stop()?;
Ok(())
}