refactor: remove unneeded functions and change some data types

This commit is contained in:
Richard Ramos 2024-02-08 17:16:22 -04:00
parent 9d73230c8a
commit 8755d9a7c8
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
8 changed files with 64 additions and 1095 deletions

View File

@ -5,55 +5,17 @@ use std::borrow::Cow;
use std::fmt::{Display, Formatter};
use std::str::FromStr;
// crates
use aes_gcm::{Aes256Gcm, Key};
use base64::Engine;
use secp256k1::{ecdsa::Signature, PublicKey, SecretKey};
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
use serde_aux::prelude::*;
use sscanf::{scanf, RegexRepresentation};
// internal
use crate::decrypt::{waku_decode_asymmetric, waku_decode_symmetric};
use crate::encrypt::{waku_encode_asymmetric, waku_encode_symmetric};
/// Waku message version
pub type WakuMessageVersion = usize;
/// Base58 encoded peer id
pub type PeerId = String;
/// Waku message id, hex encoded sha256 digest of the message
pub type MessageId = String;
/// Waku pubsub topic
pub type WakuPubSubTopic = String;
/// Protocol identifiers
#[non_exhaustive]
#[derive(Debug, Clone, Copy)]
pub enum ProtocolId {
Store,
Lightpush,
Filter,
Relay,
}
impl ProtocolId {
pub fn as_string_with_version(&self, version: &str) -> String {
format!("{self}/{version}")
}
}
impl Display for ProtocolId {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let tag = match self {
ProtocolId::Store => "/vac/waku/store",
ProtocolId::Lightpush => "/vac/waku/lightpush",
ProtocolId::Filter => "/vac/waku/filter",
ProtocolId::Relay => "/vac/waku/relay",
#[allow(unreachable_patterns)]
_ => unreachable!(),
};
write!(f, "{tag}")
}
}
/// Waku response, just a `Result` with an `String` error.
pub type Result<T> = std::result::Result<T, String>;
@ -129,277 +91,6 @@ impl WakuMessage {
pub fn ephemeral(&self) -> bool {
self.ephemeral
}
/// Optionally sign and encrypt a message using symmetric encryption
pub fn encode_symmetric(
&self,
symmetric_key: &Key<Aes256Gcm>,
signing_key: Option<&SecretKey>,
) -> Result<WakuMessage> {
waku_encode_symmetric(self, symmetric_key, signing_key)
}
/// Try decode the message with an expected symmetric key
///
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_decode_symmetricchar-messagejson-char-symmetrickey)
pub fn try_decode_symmetric(&self, symmetric_key: &Key<Aes256Gcm>) -> Result<DecodedPayload> {
waku_decode_symmetric(self, symmetric_key)
}
/// Optionally sign and encrypt a message using asymmetric encryption
pub fn encode_asymmetric(
&self,
public_key: &PublicKey,
signing_key: Option<&SecretKey>,
) -> Result<WakuMessage> {
waku_encode_asymmetric(self, public_key, signing_key)
}
/// Try decode the message with an expected asymmetric key
///
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_decode_asymmetricchar-messagejson-char-privatekey)
pub fn try_decode_asymmetric(&self, asymmetric_key: &SecretKey) -> Result<DecodedPayload> {
waku_decode_asymmetric(self, asymmetric_key)
}
}
/// A payload once decoded, used when a received Waku Message is encrypted
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DecodedPayload {
/// Public key that signed the message (optional), hex encoded with 0x prefix
#[serde(deserialize_with = "deserialize_optional_pk", default)]
public_key: Option<PublicKey>,
/// Message signature (optional), hex encoded with 0x prefix
#[serde(deserialize_with = "deserialize_optional_signature", default)]
signature: Option<Signature>,
/// Decrypted message payload base64 encoded
#[serde(with = "base64_serde")]
data: Vec<u8>,
/// Padding base64 encoded
#[serde(with = "base64_serde")]
padding: Vec<u8>,
}
impl DecodedPayload {
pub fn public_key(&self) -> Option<&PublicKey> {
self.public_key.as_ref()
}
pub fn signature(&self) -> Option<&Signature> {
self.signature.as_ref()
}
pub fn data(&self) -> &[u8] {
&self.data
}
pub fn padding(&self) -> &[u8] {
&self.padding
}
}
/// The content topic of a Waku message
/// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LegacyContentFilter {
/// The content topic of a Waku message
content_topic: WakuContentTopic,
}
impl LegacyContentFilter {
pub fn new(content_topic: WakuContentTopic) -> Self {
Self { content_topic }
}
pub fn content_topic(&self) -> &WakuContentTopic {
&self.content_topic
}
}
/// The criteria to create subscription to a light node in JSON Format
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LegacyFilterSubscription {
/// Array of [`ContentFilter`] being subscribed to / unsubscribed from
content_filters: Vec<ContentFilter>,
/// Optional pubsub topic
pubsub_topic: Option<WakuPubSubTopic>,
}
impl LegacyFilterSubscription {
pub fn new(content_filters: Vec<ContentFilter>, pubsub_topic: Option<WakuPubSubTopic>) -> Self {
Self {
content_filters,
pubsub_topic,
}
}
pub fn content_filters(&self) -> &[ContentFilter] {
&self.content_filters
}
pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> {
self.pubsub_topic.as_ref()
}
}
/// The criteria to create subscription to a filter full node matching a content filter.
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ContentFilter {
/// optional if using autosharding, mandatory if using static or named sharding.
pubsub_topic: Option<WakuPubSubTopic>,
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
}
impl ContentFilter {
pub fn new(
pubsub_topic: Option<WakuPubSubTopic>,
content_topics: Vec<WakuContentTopic>,
) -> Self {
Self {
content_topics,
pubsub_topic,
}
}
pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}
pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> {
self.pubsub_topic.as_ref()
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionDetail {
#[serde(rename = "peerID")]
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
}
impl FilterSubscriptionDetail {
pub fn new(
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
) -> Self {
Self {
peer_id,
content_topics,
pubsub_topic,
}
}
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}
pub fn pubsub_topic(&self) -> &WakuPubSubTopic {
&self.pubsub_topic
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionResult {
subscriptions: Vec<FilterSubscriptionDetail>,
error: Option<String>,
}
impl FilterSubscriptionResult {
pub fn new(subscriptions: Vec<FilterSubscriptionDetail>, error: Option<String>) -> Self {
Self {
subscriptions,
error,
}
}
pub fn subscriptions(&self) -> &[FilterSubscriptionDetail] {
&self.subscriptions
}
pub fn error(&self) -> &Option<String> {
&self.error
}
}
/// Criteria used to retrieve historical messages
#[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StoreQuery {
/// The pubsub topic on which messages are published
pub pubsub_topic: Option<WakuPubSubTopic>,
/// Array of [`WakuContentTopic`] to query for historical messages
pub content_topics: Vec<WakuContentTopic>,
/// The inclusive lower bound on the timestamp of queried messages.
/// This field holds the Unix epoch time in nanoseconds
pub start_time: Option<usize>,
/// The inclusive upper bound on the timestamp of queried messages.
/// This field holds the Unix epoch time in nanoseconds
pub end_time: Option<usize>,
/// Paging information in [`PagingOptions`] format
pub paging_options: Option<PagingOptions>,
}
/// The response received after doing a query to a store node
#[derive(Clone, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StoreResponse {
/// Array of retrieved historical messages in [`WakuMessage`] format
#[serde(default)]
pub messages: Vec<WakuMessage>,
/// Paging information in [`PagingOptions`] format from which to resume further historical queries
pub paging_options: Option<PagingOptions>,
}
impl StoreResponse {
pub fn messages(&self) -> &[WakuMessage] {
&self.messages
}
pub fn paging_options(&self) -> Option<&PagingOptions> {
self.paging_options.as_ref()
}
}
/// Paging information
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PagingOptions {
/// Number of messages to retrieve per page
pub page_size: usize,
/// Message Index from which to perform pagination.
/// If not included and forward is set to `true`, paging will be performed from the beginning of the list.
/// If not included and forward is set to `false`, paging will be performed from the end of the list
pub cursor: Option<MessageIndex>,
/// `true` if paging forward, `false` if paging backward
pub forward: bool,
}
/// Pagination index type
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MessageIndex {
/// Hash of the message at this [``MessageIndex`]
pub digest: String,
/// UNIX timestamp in nanoseconds at which the message at this [`MessageIndex`] was received
pub receiver_time: usize,
/// UNIX timestamp in nanoseconds at which the message is generated by its sender
pub sender_time: usize,
/// The pubsub topic of the message at this [`MessageIndex`]
pub pubsub_topic: WakuPubSubTopic,
}
/// WakuMessage encoding scheme
@ -544,44 +235,6 @@ mod base64_serde {
}
}
pub fn deserialize_optional_pk<'de, D>(
deserializer: D,
) -> std::result::Result<Option<PublicKey>, D::Error>
where
D: Deserializer<'de>,
{
let base64_str: Option<String> = Option::<String>::deserialize(deserializer)?;
base64_str
.map(|base64_str| {
let raw_bytes = base64::engine::general_purpose::STANDARD
.decode(base64_str)
.map_err(D::Error::custom)?;
PublicKey::from_slice(&raw_bytes).map_err(D::Error::custom)
})
.transpose()
}
pub fn deserialize_optional_signature<'de, D>(
deserializer: D,
) -> std::result::Result<Option<Signature>, D::Error>
where
D: Deserializer<'de>,
{
let hex_str: Option<String> = Option::<String>::deserialize(deserializer)?;
hex_str
.map(|hex_str| {
let raw_bytes = hex::decode(hex_str.strip_prefix("0x").unwrap_or(&hex_str))
.map_err(D::Error::custom)?;
if ![64, 65].contains(&raw_bytes.len()) {
return Err(D::Error::custom(
"Invalid signature, only 64 or 65 bytes len are supported",
));
}
Signature::from_compact(&raw_bytes[..64]).map_err(D::Error::custom)
})
.transpose()
}
#[cfg(test)]
mod tests {
use super::*;
@ -599,38 +252,4 @@ mod tests {
let message = "{\"payload\":\"SGkgZnJvbSDwn6aAIQ==\",\"contentTopic\":\"/toychat/2/huilong/proto\",\"timestamp\":1665580926660,\"ephemeral\":true,\"meta\":\"SGkgZnJvbSDwn6aAIQ==\"}";
let _: WakuMessage = serde_json::from_str(message).unwrap();
}
#[test]
fn encode_decode() {
let content_topic = WakuContentTopic::new("hello", "2", "world", Encoding::Proto);
let message = WakuMessage::new(
"hello",
content_topic,
1,
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis()
.try_into()
.unwrap(),
Vec::new(),
false,
);
let secp = Secp256k1::new();
let signing_key = SecretKey::new(&mut rand::thread_rng());
let encrypt_key = SecretKey::new(&mut rand::thread_rng());
let public_key = PublicKey::from_secret_key(&secp, &encrypt_key);
let encoded_message = message
.encode_asymmetric(&public_key, Some(&signing_key))
.expect("could not encode");
let decoded_message = encoded_message
.try_decode_asymmetric(&encrypt_key)
.expect("could not decode");
assert!(message.payload() != encoded_message.payload());
assert!(encoded_message.version() == 1);
assert!(message.payload() == decoded_message.data());
}
}

View File

@ -1,25 +1,18 @@
//! # Waku
//!
//! Implementation on top of [`waku-bindings`](https://rfc.vac.dev/spec/36/)
mod decrypt;
mod encrypt;
mod events;
mod general;
mod node;
mod utils;
pub use node::{
waku_create_content_topic, waku_default_pubsub_topic, waku_discv5_update_bootnodes,
waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo, GossipSubParams, Initialized, Key, Multiaddr,
Protocol, PublicKey, Running, SecretKey, WakuLogLevel, WakuNodeConfig, WakuNodeHandle,
WakuPeerData, WakuPeers, WebsocketParams,
waku_create_content_topic, waku_default_pubsub_topic, waku_new, Initialized, Key, Multiaddr,
PublicKey, Running, SecretKey, WakuNodeConfig, WakuNodeHandle,
};
pub use general::{
ContentFilter, DecodedPayload, Encoding, FilterSubscriptionDetail, FilterSubscriptionResult,
LegacyContentFilter, LegacyFilterSubscription, MessageId, MessageIndex, PagingOptions, PeerId,
ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage,
WakuMessageVersion, WakuPubSubTopic,
MessageId, Result, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
};
pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent};

View File

@ -1,7 +1,5 @@
//! Waku node [configuration](https://rfc.vac.dev/spec/36/#jsonconfig-type) related items
use std::fmt::{Display, Formatter};
use std::str::FromStr;
// std
// crates
use crate::WakuPubSubTopic;
@ -26,254 +24,24 @@ pub struct WakuNodeConfig {
/// Default: null
pub advertise_addr: Option<Multiaddr>,
/// Secp256k1 private key in Hex format (`0x123...abc`). Default random
#[serde(with = "secret_key_serde")]
#[serde(with = "secret_key_serde", rename = "key")]
pub node_key: Option<SecretKey>,
/// Interval in seconds for pinging peers to keep the connection alive. Default `20`
#[default(Some(20))]
pub keep_alive_interval: Option<usize>,
/// Enable relay protocol. Default `true`
#[default(Some(true))]
pub relay: Option<bool>,
/// Enable store protocol to persist message history
#[default(Some(false))]
pub store: Option<bool>,
/// Url connection string. Accepts SQLite and PostgreSQL connection strings
#[default(Some("sqlite3://store.db".to_string()))]
pub database_url: Option<String>,
/// Max number of messages to store in the databas
#[default(Some(1000))]
pub store_retention_max_messages: Option<usize>,
/// Max number of seconds that a message will be persisted in the database, default 1 day
#[default(Some(86400))]
pub store_retention_max_seconds: Option<usize>,
pub relay_topics: Vec<WakuPubSubTopic>,
/// The minimum number of peers required on a topic to allow broadcasting a message. Default `0`
#[default(Some(0))]
pub min_peers_to_publish: Option<usize>,
/// Enable filter protocol. Default `false`
#[default(Some(false))]
#[serde(rename = "legacyFilter")]
pub filter: Option<bool>,
/// Set the log level. Default `INFO`. Allowed values "DEBUG", "INFO", "WARN", "ERROR", "DPANIC", "PANIC", "FATAL"
#[default(Some(WakuLogLevel::Info))]
pub log_level: Option<WakuLogLevel>,
/// Enable DiscoveryV5. Default `false`
#[default(Some(false))]
#[serde(rename = "discV5")]
pub discv5: Option<bool>,
/// Array of bootstrap nodes ENR.
#[serde(rename = "discV5BootstrapNodes", default)]
pub discv5_bootstrap_nodes: Vec<String>,
/// UDP port for DiscoveryV5. Default `9000`.
#[default(Some(9000))]
#[serde(rename = "discV5UDPPort")]
pub discv5_udp_port: Option<u16>,
/// Array of DNS discovery URLs
#[serde(rename = "dnsDiscoveryURLs", default)]
pub dns_discovery_urls: Vec<String>,
/// Use custom nameserver. Default `` (uses the OS nameserver)
#[default(Some("".to_string()))]
#[serde(rename = "dnsDiscoveryNameServer")]
pub dns_discovery_nameserver: Option<String>,
/// Gossipsub custom configuration.
pub gossipsub_params: Option<GossipSubParams>,
/// The domain name resolving to the node's public IPv4 address.
#[serde(rename = "dns4DomainName")]
pub dns4_domain_name: Option<String>,
/// Custom websocket support parameters
#[serde(rename = "websockets")]
pub websocket_params: Option<WebsocketParams>,
}
#[derive(Clone, SmartDefault, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GossipSubParams {
/// Sets the optimal degree for a GossipSub topic mesh. For example, if D == 6,
/// each peer will want to have about six peers in their mesh for each topic they're subscribed to.
/// `d` should be set somewhere between `dlo` and `dhi`.
#[serde(rename = "d")]
pub d: Option<i32>,
/// Sets the lower bound on the number of peers we keep in a GossipSub topic mesh.
/// If we have fewer than dlo peers, we will attempt to graft some more into the mesh at
/// the next heartbeat.
#[serde(rename = "d_low")]
pub dlo: Option<i32>,
/// Sets the upper bound on the number of peers we keep in a GossipSub topic mesh.
/// If we have more than dhi peers, we will select some to prune from the mesh at the next heartbeat.
#[serde(rename = "d_high")]
pub dhi: Option<i32>,
/// `dscore` affects how peers are selected when pruning a mesh due to over subscription.
/// At least dscore of the retained peers will be high-scoring, while the remainder are
/// chosen randomly.
#[serde(rename = "d_score")]
pub dscore: Option<i32>,
/// Sets the quota for the number of outbound connections to maintain in a topic mesh.
/// When the mesh is pruned due to over subscription, we make sure that we have outbound connections
/// to at least dout of the survivor peers. This prevents sybil attackers from overwhelming
/// our mesh with incoming connections.
///
/// dout must be set below Dlo, and must not exceed D / 2.
#[serde(rename = "d_out")]
pub dout: Option<i32>,
/// Controls the size of the message cache used for gossip.
/// The message cache will remember messages for history_length heartbeats.
pub history_length: Option<i32>,
/// Controls how many cached message ids we will advertise in
/// IHAVE gossip messages. When asked for our seen message IDs, we will return
/// only those from the most recent history_gossip heartbeats. The slack between
/// history_gossip and history_length allows us to avoid advertising messages
/// that will be expired by the time they're requested.
///
/// history_gossip must be less than or equal to history_length to
/// avoid a runtime panic.
pub history_gossip: Option<i32>,
/// dlazy affects how many peers we will emit gossip to at each heartbeat.
/// We will send gossip to at least dlazy peers outside our mesh. The actual
/// number may be more, depending on gossip_factor and how many peers we're
/// connected to.
pub dlazy: Option<i32>,
/// `gossip_factor` affects how many peers we will emit gossip to at each heartbeat.
/// We will send gossip to gossip_factor * (total number of non-mesh peers), or
/// Dlazy, whichever is greater.
pub gossip_factor: Option<f64>,
/// Controls how many times we will allow a peer to request
/// the same message id through IWANT gossip before we start ignoring them. This is designed
/// to prevent peers from spamming us with requests and wasting our resources.
pub gossip_retransmission: Option<i32>,
/// Short delay before the heartbeat timer begins
/// after the router is initialized.
pub heartbeat_initial_delay_ms: Option<i32>,
/// Controls the time between heartbeats.
pub heartbeat_interval_seconds: Option<i32>,
/// Duration threshold for heartbeat processing before emitting
/// a warning; this would be indicative of an overloaded peer.
pub slow_heartbeat_warning: Option<f64>,
/// Controls how long we keep track of the fanout state. If it's been
/// fanout_ttl_seconds since we've published to a topic that we're not subscribed to,
/// we'll delete the fanout map for that topic.
pub fanout_ttl_seconds: Option<i32>,
/// Controls the number of peers to include in prune Peer eXchange.
/// When we prune a peer that's eligible for PX (has a good score, etc), we will try to
/// send them signed peer records for up to prune_peers other peers that we
/// know of.
pub prune_peers: Option<i32>,
/// Controls the backoff time for pruned peers. This is how long
/// a peer must wait before attempting to graft into our mesh again after being pruned.
/// When pruning a peer, we send them our value of PruneBackoff so they know
/// the minimum time to wait. Peers running older versions may not send a backoff time,
/// so if we receive a prune message without one, we will wait at least PruneBackoff
/// before attempting to re-graft.
pub prune_backoff_seconds: Option<i32>,
/// Controls the backoff time to use when unsuscribing
/// from a topic. A peer should not resubscribe to this topic before this
/// duration.
pub unsubscribe_backoff_seconds: Option<i32>,
/// Controls the number of active connection attempts for peers obtained through PX.
pub connectors: Option<i32>,
/// Sets the maximum number of pending connections for peers attempted through px.
pub max_pending_connections: Option<i32>,
/// Controls the timeout for connection attempts.
pub connection_timeout_seconds: Option<i32>,
/// Number of heartbeat ticks for attempting to reconnect direct peers
/// that are not currently connected.
pub direct_connect_ticks: Option<u64>,
/// Initial delay before opening connections to direct peers
pub direct_connect_initial_delay_seconds: Option<i32>,
/// Number of heartbeat ticks for attempting to improve the mesh
/// with opportunistic grafting. Every opportunistic_graft_ticks we will attempt to select some
/// high-scoring mesh peers to replace lower-scoring ones, if the median score of our mesh peers falls
/// below a threshold (see https://godoc.org/github.com/libp2p/go-libp2p-pubsub#PeerScoreThresholds).
pub opportunistic_graft_ticks: Option<u64>,
/// Number of peers to opportunistically graft.
pub opportunistic_graft_peers: Option<i32>,
/// If a GRAFT comes before graft_flood_threshold_seconds has elapsed since the last PRUNE,
/// then there is an extra score penalty applied to the peer through P7.
pub graft_flood_threshold_seconds: Option<i32>,
/// Maximum number of messages to include in an IHAVE message.
/// Also controls the maximum number of IHAVE ids we will accept and request with IWANT from a
/// peer within a heartbeat, to protect from IHAVE floods. You should adjust this value from the
/// default if your system is pushing more than 5000 messages in history_gossip heartbeats;
/// with the defaults this is 1666 messages/s.
#[serde(rename = "maxIHaveLength")]
pub max_ihave_length: Option<i32>,
/// Maximum number of IHAVE messages to accept from a peer within a heartbeat.
#[serde(rename = "maxIHaveMessages")]
pub max_ihave_messages: Option<i32>,
/// Time to wait for a message requested through IWANT following an IHAVE advertisement.
/// If the message is not received within this window, a broken promise is declared and
/// the router may apply bahavioural penalties.
#[serde(rename = "iwantFollowupTimeSeconds")]
pub iwant_followup_time_seconds: Option<i32>,
// Time until a previously seen message ID can be forgotten about.
#[serde(rename = "seenMessagesTTLSeconds")]
pub seen_messages_ttl_seconds: Option<i32>,
}
#[derive(Clone, SmartDefault, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WebsocketParams {
/// Indicates if websockets support will be enabled
#[default(Some(false))]
pub enabled: Option<bool>,
/// Listening address for websocket connections. Default `0.0.0.0`
#[default(Some(std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0))))]
pub host: Option<std::net::IpAddr>,
/// TCP listening port for websocket connection. Use `0` for **random**. Default `60001`, if secure websockets support is enabled, the default is `6443“`
pub port: Option<usize>,
/// Enable secure websockets support
#[default(Some(false))]
pub secure: Option<bool>,
/// Secure websocket certificate path. Mandatory if secure websockets support is enabled.
pub cert_path: Option<String>,
/// Secure websocket key path. Mandatory if secure websockets support is enabled.
pub key_path: Option<String>,
}
#[derive(Clone, Default, Serialize, Deserialize, Debug)]
pub enum WakuLogLevel {
#[default]
Info,
Debug,
Warn,
Error,
DPanic,
Panic,
Fatal,
}
impl FromStr for WakuLogLevel {
type Err = std::io::Error;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"info" => Ok(Self::Info),
"debug" => Ok(Self::Debug),
"warn" => Ok(Self::Warn),
"error" => Ok(Self::Error),
"dpanic" => Ok(Self::DPanic),
"panic" => Ok(Self::Panic),
"fatal" => Ok(Self::Fatal),
_ => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Unrecognized waku log level: {s}. Allowed values \"DEBUG\", \"INFO\", \"WARN\", \"ERROR\", \"DPANIC\", \"PANIC\", \"FATAL\""),
)),
}
}
}
impl Display for WakuLogLevel {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let tag = match self {
WakuLogLevel::Info => "INFO",
WakuLogLevel::Debug => "DEBUG",
WakuLogLevel::Warn => "WARN",
WakuLogLevel::Error => "ERROR",
WakuLogLevel::DPanic => "DPANIC",
WakuLogLevel::Panic => "PANIC",
WakuLogLevel::Fatal => "FATAL",
};
write!(f, "{tag}")
}
// /// Enable store protocol to persist message history
// #[default(Some(false))]
// pub store: Option<bool>,
// /// Url connection string. Accepts SQLite and PostgreSQL connection strings
// #[default(Some("sqlite3://store.db".to_string()))]
// pub database_url: Option<String>,
// /// Max number of messages to store in the databas
// #[default(Some(1000))]
// pub store_retention_max_messages: Option<usize>,
// /// Max number of seconds that a message will be persisted in the database, default 1 day
// #[default(Some(86400))]
// pub store_retention_max_seconds: Option<usize>,
}
mod secret_key_serde {

View File

@ -1,13 +1,12 @@
//! Node lifcycle [mangement](https://rfc.vac.dev/spec/36/#node-management) related methods
// std
use multiaddr::Multiaddr;
use std::ffi::CString;
// crates
use libc::*;
// internal
use super::config::WakuNodeConfig;
use crate::general::{PeerId, Result};
use crate::general::Result;
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Instantiates a Waku node
@ -65,34 +64,6 @@ pub fn waku_stop() -> Result<()> {
handle_no_response(code, &error)
}
/// If the execution is successful, the result is the peer ID as a string (base58 encoded)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn waku_peer_id() -> Result<PeerId> {
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_peerid(cb, &mut closure as *mut _ as *mut c_void)
};
handle_response(code, &result)
}
/// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub fn waku_listen_addresses() -> Result<Vec<Multiaddr>> {
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_listen_addresses(cb, &mut closure as *mut _ as *mut c_void)
};
handle_json_response(code, &result)
}
#[cfg(test)]
mod test {
use super::waku_new;

View File

@ -1,14 +1,9 @@
//! Waku node implementation
mod config;
mod discovery;
mod filter;
mod legacyfilter;
mod lightpush;
mod management;
mod peers;
mod relay;
mod store;
// std
pub use aes_gcm::{Aes256Gcm, Key};
@ -20,14 +15,9 @@ use std::time::Duration;
// crates
// internal
use crate::general::{
ContentFilter, FilterSubscriptionResult, LegacyFilterSubscription, MessageId, PeerId,
ProtocolId, Result, StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic,
};
use crate::general::{MessageId, Result, WakuMessage, WakuPubSubTopic};
pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams};
pub use discovery::{waku_discv5_update_bootnodes, waku_dns_discovery, DnsInfo};
pub use peers::{Protocol, WakuPeerData, WakuPeers};
pub use config::WakuNodeConfig;
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};
/// Shared flag to check if a waku node is already running in the current process
@ -60,23 +50,23 @@ unsafe impl<State: WakuNodeState> Send for WakuNodeHandle<State> {}
unsafe impl<State: WakuNodeState> Sync for WakuNodeHandle<State> {}
impl<State: WakuNodeState> WakuNodeHandle<State> {
/// If the execution is successful, the result is the peer ID as a string (base58 encoded)
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
pub fn peer_id(&self) -> Result<PeerId> {
management::waku_peer_id()
}
// /// If the execution is successful, the result is the peer ID as a string (base58 encoded)
// /// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_stop)
// pub fn peer_id(&self) -> Result<PeerId> {
// management::waku_peer_id()
// }
/// Get the multiaddresses the Waku node is listening to
/// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
management::waku_listen_addresses()
}
// /// Get the multiaddresses the Waku node is listening to
// /// as per [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_listen_addresses)
// pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
// management::waku_listen_addresses()
// }
/// Add a node multiaddress and protocol to the waku nodes peerstore.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_add_peerchar-address-char-protocolid)
pub fn add_peer(&self, address: &Multiaddr, protocol_id: ProtocolId) -> Result<PeerId> {
peers::waku_add_peers(address, protocol_id)
}
// /// Add a node multiaddress and protocol to the waku nodes peerstore.
// /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_add_peerchar-address-char-protocolid)
// pub fn add_peer(&self, address: &Multiaddr, protocol_id: ProtocolId) -> Result<PeerId> {
// peers::waku_add_peers(address, protocol_id)
// }
}
fn stop_node() -> Result<()> {
@ -113,39 +103,8 @@ impl WakuNodeHandle<Running> {
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub fn connect_peer_with_address(
&self,
address: &Multiaddr,
timeout: Option<Duration>,
) -> Result<()> {
peers::waku_connect_peer_with_address(address, timeout)
}
/// Dial peer using a peer id
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
/// The peer must be already known.
/// It must have been added before with [`WakuNodeHandle::add_peer`] or previously dialed with [`WakuNodeHandle::connect_peer_with_address`]
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peeridchar-peerid-int-timeoutms)
pub fn connect_peer_with_id(&self, peer_id: &PeerId, timeout: Option<Duration>) -> Result<()> {
peers::waku_connect_peer_with_id(peer_id, timeout)
}
/// Disconnect a peer using its peer id
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_disconnect_peerchar-peerid)
pub fn disconnect_peer_with_id(&self, peer_id: &PeerId) -> Result<()> {
peers::waku_disconnect_peer_with_id(peer_id)
}
/// Get number of connected peers
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peer_count)
pub fn peer_count(&self) -> Result<usize> {
peers::waku_peer_count()
}
/// Retrieve the list of peers known by the Waku node
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peers)
pub fn peers(&self) -> Result<WakuPeers> {
peers::waku_peers()
pub fn connect(&self, address: &Multiaddr, timeout: Option<Duration>) -> Result<()> {
peers::waku_connect(address, timeout)
}
/// Publish a message using Waku Relay.
@ -154,129 +113,20 @@ impl WakuNodeHandle<Running> {
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
pubsub_topic: &WakuPubSubTopic,
timeout: Option<Duration>,
) -> Result<MessageId> {
relay::waku_relay_publish_message(message, pubsub_topic, timeout)
}
/// Determine if there are enough peers to publish a message on a given pubsub topic
pub fn relay_enough_peers(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool> {
relay::waku_enough_peers(pubsub_topic)
}
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_subscribe(content_filter)
pub fn relay_subscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
relay::waku_relay_subscribe(pubsub_topic)
}
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_unsubscribe(content_filter)
}
/// Returns the list of pubsub topics the node is subscribed to in Waku Relay
pub fn relay_topics(&self) -> Result<Vec<String>> {
relay::waku_relay_topics()
}
/// Retrieves historical messages on specific content topics. This method may be called with [`PagingOptions`](`crate::general::PagingOptions`),
/// to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](`crate::general::PagingOptions`),
/// the node must return messages on a per-page basis and include [`PagingOptions`](`crate::general::PagingOptions`) in the response.
/// These [`PagingOptions`](`crate::general::PagingOptions`) must contain a cursor pointing to the Index from which a new page can be requested
pub fn store_query(
&self,
query: &StoreQuery,
peer_id: &PeerId,
timeout: Option<Duration>,
) -> Result<StoreResponse> {
store::waku_store_query(query, peer_id, timeout)
}
/// Retrieves locally stored historical messages on specific content topics. This method may be called with [`PagingOptions`](`crate::general::PagingOptions`),
/// to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](`crate::general::PagingOptions`),
/// the node must return messages on a per-page basis and include [`PagingOptions`](`crate::general::PagingOptions`) in the response.
/// These [`PagingOptions`](`crate::general::PagingOptions`) must contain a cursor pointing to the Index from which a new page can be requested
pub fn local_store_query(&self, query: &StoreQuery) -> Result<StoreResponse> {
store::waku_local_store_query(query)
}
/// Publish a message using Waku Lightpush.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publishchar-messagejson-char-topic-char-peerid-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub fn lightpush_publish(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
timeout: Option<Duration>,
) -> Result<MessageId> {
lightpush::waku_lightpush_publish(message, pubsub_topic, peer_id, timeout)
}
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
#[deprecated]
pub fn legacy_filter_subscribe(
&self,
filter_subscription: &LegacyFilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
legacyfilter::waku_legacy_filter_subscribe(filter_subscription, peer_id, timeout)
}
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
#[deprecated]
pub fn legacy_filter_unsubscribe(
&self,
filter_subscription: &LegacyFilterSubscription,
timeout: Duration,
) -> Result<()> {
legacyfilter::waku_legacy_filter_unsubscribe(filter_subscription, timeout)
}
/// Creates a subscription to a filter full node matching a content filter.
/// Returns the PeerId on which the filter subscription was created
pub fn filter_subscribe(
&self,
content_filter: &ContentFilter,
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<FilterSubscriptionResult> {
filter::waku_filter_subscribe(content_filter, peer_id, timeout)
}
/// Used to know if a service node has an active subscription for this client
pub fn filter_ping(&self, peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
filter::waku_filter_ping(peer_id, timeout)
}
/// Sends a requests to a service node to stop pushing messages matching this filter to this client.
/// It might be used to modify an existing subscription by providing a subset of the original filter
/// criteria
pub fn filter_unsubscribe(
&self,
content_filter: &ContentFilter,
peer_id: PeerId,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe(content_filter, peer_id, timeout)
}
/// Sends a requests to a service node (or all service nodes) to stop pushing messages
pub fn filter_unsubscribe_all(
&self,
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe_all(peer_id, timeout)
}
/// Update the bootnodes used by DiscoveryV5 by passing a list of ENRs
pub fn discv5_update_bootnodes(bootnodes: Vec<String>) -> Result<()> {
discovery::waku_discv5_update_bootnodes(bootnodes)
pub fn relay_unsubscribe(&self, pubsub_topic: &WakuPubSubTopic) -> Result<()> {
relay::waku_relay_unsubscribe(pubsub_topic)
}
}

View File

@ -6,51 +6,16 @@ use std::time::Duration;
// crates
use libc::*;
use multiaddr::Multiaddr;
use serde::Deserialize;
// internal
use crate::general::{PeerId, ProtocolId, Result};
use crate::general::Result;
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Add a node multiaddress and protocol to the waku nodes peerstore.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_add_peerchar-address-char-protocolid)
pub fn waku_add_peers(address: &Multiaddr, protocol_id: ProtocolId) -> Result<PeerId> {
let address_ptr = CString::new(address.to_string())
.expect("CString should build properly from the address")
.into_raw();
let protocol_id_ptr = CString::new(protocol_id.to_string())
.expect("CString should build properly from the protocol id")
.into_raw();
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_add_peer(
address_ptr,
protocol_id_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(address_ptr));
drop(CString::from_raw(protocol_id_ptr));
out
};
handle_response(code, &result)
}
/// Dial peer using a multiaddress
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
/// If the function execution takes longer than `timeout` value, the execution will be canceled and an error returned.
/// Use 0 for no timeout
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peerchar-address-int-timeoutms)
pub fn waku_connect_peer_with_address(
address: &Multiaddr,
timeout: Option<Duration>,
) -> Result<()> {
pub fn waku_connect(address: &Multiaddr, timeout: Option<Duration>) -> Result<()> {
let address_ptr = CString::new(address.to_string())
.expect("CString should build properly from multiaddress")
.into_raw();
@ -76,152 +41,3 @@ pub fn waku_connect_peer_with_address(
handle_no_response(code, &error)
}
/// Dial peer using a peer id
/// If `timeout` as milliseconds doesn't fit into a `i32` it is clamped to [`i32::MAX`]
/// The peer must be already known.
/// It must have been added before with [`waku_add_peers`] or previously dialed with [`waku_connect_peer_with_address`]
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_connect_peeridchar-peerid-int-timeoutms)
pub fn waku_connect_peer_with_id(peer_id: &PeerId, timeout: Option<Duration>) -> Result<()> {
let peer_id_ptr = CString::new(peer_id.as_bytes())
.expect("CString should build properly from peer id")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_connect_peerid(
peer_id_ptr,
timeout
.map(|duration| duration.as_millis().try_into().unwrap_or(i32::MAX))
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(peer_id_ptr));
out
};
handle_no_response(code, &error)
}
/// Disconnect a peer using its peer id
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_disconnect_peerchar-peerid)
pub fn waku_disconnect_peer_with_id(peer_id: &PeerId) -> Result<()> {
let peer_id_ptr = CString::new(peer_id.as_bytes())
.expect("CString should build properly from peer id")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_disconnect(peer_id_ptr, cb, &mut closure as *mut _ as *mut c_void);
drop(CString::from_raw(peer_id_ptr));
out
};
handle_no_response(code, &error)
}
/// Get number of connected peers
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peer_count)
pub fn waku_peer_count() -> Result<usize> {
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_peer_cnt(cb, &mut closure as *mut _ as *mut c_void)
};
handle_response(code, &result)
}
/// Waku peer supported protocol
///
/// Examples:
/// `"/ipfs/id/1.0.0"`
/// `"/vac/waku/relay/2.0.0"`
/// `"/ipfs/ping/1.0.0"`
pub type Protocol = String;
/// Peer data from known/connected waku nodes
#[derive(Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct WakuPeerData {
/// Waku peer id
#[serde(alias = "peerID")]
peer_id: PeerId,
/// Supported node protocols
protocols: Vec<Protocol>,
/// Node available addresses
#[serde(alias = "addrs")]
addresses: Vec<Multiaddr>,
/// Already connected flag
connected: bool,
}
impl WakuPeerData {
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub fn protocols(&self) -> &[Protocol] {
&self.protocols
}
pub fn addresses(&self) -> &[Multiaddr] {
&self.addresses
}
pub fn connected(&self) -> bool {
self.connected
}
}
/// List of [`WakuPeerData`]
pub type WakuPeers = Vec<WakuPeerData>;
/// Retrieve the list of peers known by the Waku node
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_peers)
pub fn waku_peers() -> Result<WakuPeers> {
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_peers(cb, &mut closure as *mut _ as *mut c_void)
};
handle_json_response(code, &result)
}
#[cfg(test)]
mod tests {
use crate::node::peers::WakuPeerData;
#[test]
fn deserialize_waku_peer_data() {
let json_str = r#"{
"peerID": "16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47RedcBafeDCBA",
"protocols": [
"/ipfs/id/1.0.0",
"/vac/waku/relay/2.0.0",
"/ipfs/ping/1.0.0"
],
"addrs": [
"/ip4/1.2.3.4/tcp/30303"
],
"connected": true
}"#;
let _: WakuPeerData = serde_json::from_str(json_str).unwrap();
}
}

View File

@ -6,9 +6,7 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{
ContentFilter, Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic};
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
@ -71,30 +69,14 @@ pub fn waku_default_pubsub_topic() -> WakuPubSubTopic {
handle_response(code, &result).expect("&str from result should always be extracted")
}
/// Get the list of subscribed pubsub topics in Waku Relay.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_topics)
pub fn waku_relay_topics() -> Result<Vec<String>> {
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_relay_topics(cb, &mut closure as *mut _ as *mut c_void)
};
handle_json_response(code, &result)
}
/// Publish a message using Waku Relay
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
pub fn waku_relay_publish_message(
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
pubsub_topic: &WakuPubSubTopic,
timeout: Option<Duration>,
) -> Result<MessageId> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_default_pubsub_topic)
.to_string();
let pubsub_topic = pubsub_topic.to_string();
let message_ptr = CString::new(
serde_json::to_string(&message)
@ -135,21 +117,18 @@ pub fn waku_relay_publish_message(
handle_response(code, &result)
}
pub fn waku_enough_peers(pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_default_pubsub_topic)
.to_string();
pub fn waku_relay_subscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw();
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_enough_peers(
let out = waku_sys::waku_relay_subscribe(
pubsub_topic_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
@ -160,54 +139,27 @@ pub fn waku_enough_peers(pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool>
out
};
handle_response(code, &result)
handle_no_response(code, &error)
}
pub fn waku_relay_subscribe(content_filter: &ContentFilter) -> Result<()> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("ContentFilter should always be able to be serialized")
.into_raw();
pub fn waku_relay_unsubscribe(pubsub_topic: &WakuPubSubTopic) -> Result<()> {
let pubsub_topic = pubsub_topic.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
content_filter_ptr,
pubsub_topic_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(content_filter_ptr));
out
};
handle_no_response(code, &error)
}
pub fn waku_relay_unsubscribe(content_filter: &ContentFilter) -> Result<()> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("ContentFilter should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
content_filter_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(content_filter_ptr));
drop(CString::from_raw(pubsub_topic_ptr));
out
};

View File

@ -43,7 +43,7 @@ where
}
pub fn handle_no_response(code: i32, error: &str) -> Result<()> {
match code {
match code as u32 {
RET_OK => Ok(()),
RET_ERR => Err(format!("waku error: {}", error)),
RET_MISSING_CALLBACK => Err("missing callback".to_string()),
@ -52,7 +52,7 @@ pub fn handle_no_response(code: i32, error: &str) -> Result<()> {
}
pub fn handle_json_response<F: DeserializeOwned>(code: i32, result: &str) -> Result<F> {
match code {
match code as u32 {
RET_OK => decode(result),
RET_ERR => Err(format!("waku error: {}", result)),
RET_MISSING_CALLBACK => Err("missing callback".to_string()),
@ -61,7 +61,7 @@ pub fn handle_json_response<F: DeserializeOwned>(code: i32, result: &str) -> Res
}
pub fn handle_response<F: FromStr>(code: i32, result: &str) -> Result<F> {
match code {
match code as u32 {
RET_OK => result
.parse()
.map_err(|_| format!("could not parse value: {}", result)),