From dc32f22f4a496a4db485b1ecf7580c9e275a388b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Tue, 7 Nov 2023 15:50:35 -0400 Subject: [PATCH] feat: add filterv2 functions (#75) --- .github/workflows/codecov.yml | 2 +- .github/workflows/main.yml | 6 +- Cargo.lock | 4 +- examples/Cargo.lock | 5 +- examples/toy-chat/src/main.rs | 19 +-- waku-bindings/Cargo.toml | 4 +- waku-bindings/src/events/mod.rs | 4 +- waku-bindings/src/general/mod.rs | 172 ++++++++++++++----------- waku-bindings/src/lib.rs | 15 ++- waku-bindings/src/node/filter.rs | 157 +++++++++++++++++----- waku-bindings/src/node/legacyfilter.rs | 89 +++++++++++++ waku-bindings/src/node/mod.rs | 76 ++++++++--- waku-bindings/src/node/relay.rs | 75 +++-------- waku-bindings/src/utils.rs | 24 ++-- waku-bindings/tests/node.rs | 12 +- waku-sys/Cargo.toml | 2 +- waku-sys/vendor | 2 +- 17 files changed, 447 insertions(+), 221 deletions(-) create mode 100644 waku-bindings/src/node/legacyfilter.rs diff --git a/.github/workflows/codecov.yml b/.github/workflows/codecov.yml index 665d811..0e8238f 100644 --- a/.github/workflows/codecov.yml +++ b/.github/workflows/codecov.yml @@ -20,7 +20,7 @@ jobs: run: git submodule update --init --recursive - uses: actions/setup-go@v3 # we need go to build go-waku with: - go-version: '1.19' + go-version: '1.20' - uses: actions-rs/toolchain@v1 with: profile: minimal diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 1bd790d..9e48daa 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -29,7 +29,7 @@ jobs: run: git submodule update --init --recursive - uses: actions/setup-go@v3 # we need go to build go-waku with: - go-version: '1.19' + go-version: '1.20' - uses: actions-rs/toolchain@v1 with: profile: minimal @@ -61,7 +61,7 @@ jobs: run: git submodule update --init --recursive - uses: actions/setup-go@v3 # we need go to build go-waku with: - go-version: '1.19' + go-version: '1.20' - uses: actions-rs/toolchain@v1 with: profile: minimal @@ -88,7 +88,7 @@ jobs: run: git submodule update --init --recursive - uses: actions/setup-go@v3 # we need go to build go-waku with: - go-version: '1.19' + go-version: '1.20' - uses: actions-rs/toolchain@v1 with: profile: minimal diff --git a/Cargo.lock b/Cargo.lock index 3a6fb9f..c94c5fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1674,7 +1674,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "waku-bindings" -version = "0.4.0" +version = "0.5.0" dependencies = [ "aes-gcm", "base64 0.21.0", @@ -1698,7 +1698,7 @@ dependencies = [ [[package]] name = "waku-sys" -version = "0.4.0" +version = "0.5.0" dependencies = [ "bindgen", ] diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 7b78dab..f287384 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -1497,12 +1497,13 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "waku-bindings" -version = "0.1.1" +version = "0.5.0" dependencies = [ "aes-gcm", "base64 0.21.0", "enr", "hex", + "libc", "multiaddr", "once_cell", "rand", @@ -1517,7 +1518,7 @@ dependencies = [ [[package]] name = "waku-sys" -version = "0.1.0" +version = "0.5.0" dependencies = [ "bindgen", ] diff --git a/examples/toy-chat/src/main.rs b/examples/toy-chat/src/main.rs index 2e5f3fe..5e21c9f 100644 --- a/examples/toy-chat/src/main.rs +++ b/examples/toy-chat/src/main.rs @@ -23,8 +23,8 @@ use tui::{ }; use unicode_width::UnicodeWidthStr; use waku_bindings::{ - waku_new, waku_set_event_callback, ContentFilter, Multiaddr, PagingOptions, ProtocolId, - Running, StoreQuery, WakuMessage, WakuNodeHandle, + waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Multiaddr, + PagingOptions, ProtocolId, Running, StoreQuery, WakuMessage, WakuNodeHandle, }; enum InputMode { @@ -76,7 +76,7 @@ fn retrieve_history( let result = node_handle.store_query( &StoreQuery { pubsub_topic: None, - content_filters: vec![ContentFilter::new(TOY_CHAT_CONTENT_TOPIC.clone())], + content_topics: vec![TOY_CHAT_CONTENT_TOPIC.clone()], start_time: Some( (Duration::from_secs(Utc::now().timestamp() as u64) - Duration::from_secs(60 * 60 * 24)) @@ -110,7 +110,9 @@ fn setup_node_handle() -> std::result::Result, Box( meta, false, ); - if let Err(e) = - app.node_handle - .relay_publish_message(&waku_message, None, None) - { + if let Err(e) = app.node_handle.relay_publish_message( + &waku_message, + Some(waku_default_pubsub_topic()), + None, + ) { let mut out = std::io::stderr(); write!(out, "{e:?}").unwrap(); } diff --git a/waku-bindings/Cargo.toml b/waku-bindings/Cargo.toml index 1ac48c7..a045192 100644 --- a/waku-bindings/Cargo.toml +++ b/waku-bindings/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "waku-bindings" -version = "0.4.0" +version = "0.5.0" edition = "2021" authors = [ "Daniel Sanchez Quiros " @@ -26,7 +26,7 @@ serde_json = "1.0" sscanf = "0.4" smart-default = "0.6" url = "2.3" -waku-sys = { version = "0.4.0", path = "../waku-sys" } +waku-sys = { version = "0.5.0", path = "../waku-sys" } libc = "0.2" [dev-dependencies] diff --git a/waku-bindings/src/events/mod.rs b/waku-bindings/src/events/mod.rs index 445fc49..4a0ed0a 100644 --- a/waku-bindings/src/events/mod.rs +++ b/waku-bindings/src/events/mod.rs @@ -5,7 +5,7 @@ //! When an event is emitted, this callback will be triggered receiving a [`Signal`] // std -use std::ffi::{c_char, c_void, CStr}; +use std::ffi::{c_char, c_int, c_void, CStr}; use std::ops::Deref; use std::sync::Mutex; // crates @@ -79,7 +79,7 @@ fn set_callback(f: F) { /// Wrapper callback, it transformst the `*const c_char` into a [`Signal`] /// and executes the [`CALLBACK`] funtion with it -extern "C" fn callback(data: *const c_char, _user_data: *mut c_void) { +extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c_void) { let raw_response = unsafe { CStr::from_ptr(data) } .to_str() .expect("Not null ptr"); diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index dfcbf51..4214fd1 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -20,6 +20,8 @@ pub type WakuMessageVersion = usize; pub type PeerId = String; /// Waku message id, hex encoded sha256 digest of the message pub type MessageId = String; +/// Waku pubsub topic +pub type WakuPubSubTopic = String; /// Protocol identifiers #[non_exhaustive] @@ -199,12 +201,12 @@ impl DecodedPayload { /// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type) #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct ContentFilter { +pub struct LegacyContentFilter { /// The content topic of a Waku message content_topic: WakuContentTopic, } -impl ContentFilter { +impl LegacyContentFilter { pub fn new(content_topic: WakuContentTopic) -> Self { Self { content_topic } } @@ -218,14 +220,14 @@ impl ContentFilter { /// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) #[derive(Clone, Serialize, Deserialize, Debug)] #[serde(rename_all = "camelCase")] -pub struct FilterSubscription { +pub struct LegacyFilterSubscription { /// Array of [`ContentFilter`] being subscribed to / unsubscribed from content_filters: Vec, /// Optional pubsub topic pubsub_topic: Option, } -impl FilterSubscription { +impl LegacyFilterSubscription { pub fn new(content_filters: Vec, pubsub_topic: Option) -> Self { Self { content_filters, @@ -242,14 +244,103 @@ impl FilterSubscription { } } +/// The criteria to create subscription to a filter full node matching a content filter. +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ContentFilter { + /// optional if using autosharding, mandatory if using static or named sharding. + pubsub_topic: Option, + /// mandatory, at least one required, with a max of 10 + content_topics: Vec, +} + +impl ContentFilter { + pub fn new( + pubsub_topic: Option, + content_topics: Vec, + ) -> Self { + Self { + content_topics, + pubsub_topic, + } + } + + pub fn content_topics(&self) -> &[WakuContentTopic] { + &self.content_topics + } + + pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> { + self.pubsub_topic.as_ref() + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct FilterSubscriptionDetail { + #[serde(rename = "peerID")] + peer_id: PeerId, + content_topics: Vec, + pubsub_topic: WakuPubSubTopic, +} + +impl FilterSubscriptionDetail { + pub fn new( + peer_id: PeerId, + content_topics: Vec, + pubsub_topic: WakuPubSubTopic, + ) -> Self { + Self { + peer_id, + content_topics, + pubsub_topic, + } + } + + pub fn peer_id(&self) -> &PeerId { + &self.peer_id + } + + pub fn content_topics(&self) -> &[WakuContentTopic] { + &self.content_topics + } + + pub fn pubsub_topic(&self) -> &WakuPubSubTopic { + &self.pubsub_topic + } +} + +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct FilterSubscriptionResult { + subscriptions: Vec, + error: Option, +} + +impl FilterSubscriptionResult { + pub fn new(subscriptions: Vec, error: Option) -> Self { + Self { + subscriptions, + error, + } + } + + pub fn subscriptions(&self) -> &[FilterSubscriptionDetail] { + &self.subscriptions + } + + pub fn error(&self) -> &Option { + &self.error + } +} + /// Criteria used to retrieve historical messages #[derive(Clone, Serialize, Debug)] #[serde(rename_all = "camelCase")] pub struct StoreQuery { /// The pubsub topic on which messages are published pub pubsub_topic: Option, - /// Array of [`ContentFilter`] to query for historical messages - pub content_filters: Vec, + /// Array of [`WakuContentTopic`] to query for historical messages + pub content_topics: Vec, /// The inclusive lower bound on the timestamp of queried messages. /// This field holds the Unix epoch time in nanoseconds pub start_time: Option, @@ -426,75 +517,6 @@ impl<'de> Deserialize<'de> for WakuContentTopic { } } -/// A waku pubsub topic in the form of `/waku/v2/{topic_name}/{encoding}` -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct WakuPubSubTopic { - pub topic_name: Cow<'static, str>, - pub encoding: Encoding, -} - -impl WakuPubSubTopic { - pub const fn new(topic_name: &'static str, encoding: Encoding) -> Self { - Self { - topic_name: Cow::Borrowed(topic_name), - encoding, - } - } - - pub fn with_topic_name(topic_name: String, encoding: Encoding) -> Self { - Self { - topic_name: Cow::Owned(topic_name), - encoding, - } - } -} - -impl FromStr for WakuPubSubTopic { - type Err = String; - - fn from_str(s: &str) -> std::result::Result { - if let Ok((topic_name, encoding)) = scanf!(s, "/waku/2/{}/{:/.+?/}", String, Encoding) { - Ok(WakuPubSubTopic { - topic_name: Cow::Owned(topic_name), - encoding, - }) - } else { - Err( - format!( - "Wrong pub-sub topic format. Should be `/waku/2/{{topic-name}}/{{encoding}}`. Got: {s}" - ) - ) - } - } -} - -impl Display for WakuPubSubTopic { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "/waku/2/{}/{}", self.topic_name, self.encoding) - } -} - -impl Serialize for WakuPubSubTopic { - fn serialize(&self, serializer: S) -> std::result::Result - where - S: Serializer, - { - self.to_string().serialize(serializer) - } -} - -impl<'de> Deserialize<'de> for WakuPubSubTopic { - fn deserialize(deserializer: D) -> std::result::Result - where - D: Deserializer<'de>, - { - let as_string: String = String::deserialize(deserializer)?; - as_string - .parse::() - .map_err(D::Error::custom) - } -} - mod base64_serde { use base64::Engine; use serde::de::Error; diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index 4debe07..bd1291b 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -9,16 +9,17 @@ mod node; mod utils; pub use node::{ - waku_create_content_topic, waku_create_pubsub_topic, waku_default_pubsub_topic, - waku_discv5_update_bootnodes, waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo, - GossipSubParams, Initialized, Key, Multiaddr, Protocol, PublicKey, Running, SecretKey, - WakuLogLevel, WakuNodeConfig, WakuNodeHandle, WakuPeerData, WakuPeers, WebsocketParams, + waku_create_content_topic, waku_default_pubsub_topic, waku_discv5_update_bootnodes, + waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo, GossipSubParams, Initialized, Key, Multiaddr, + Protocol, PublicKey, Running, SecretKey, WakuLogLevel, WakuNodeConfig, WakuNodeHandle, + WakuPeerData, WakuPeers, WebsocketParams, }; pub use general::{ - ContentFilter, DecodedPayload, Encoding, FilterSubscription, MessageId, MessageIndex, - PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, - WakuMessage, WakuMessageVersion, WakuPubSubTopic, + ContentFilter, DecodedPayload, Encoding, FilterSubscriptionDetail, FilterSubscriptionResult, + LegacyContentFilter, LegacyFilterSubscription, MessageId, MessageIndex, PagingOptions, PeerId, + ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage, + WakuMessageVersion, WakuPubSubTopic, }; pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent}; diff --git a/waku-bindings/src/node/filter.rs b/waku-bindings/src/node/filter.rs index 8b18122..70336f2 100644 --- a/waku-bindings/src/node/filter.rs +++ b/waku-bindings/src/node/filter.rs @@ -7,21 +7,105 @@ use std::time::Duration; use libc::*; // internal use crate::general::Result; -use crate::general::{FilterSubscription, PeerId}; -use crate::utils::{get_trampoline, handle_no_response}; +use crate::general::{ContentFilter, FilterSubscriptionResult, PeerId}; +use crate::utils::{get_trampoline, handle_json_response, handle_no_response}; /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) pub fn waku_filter_subscribe( - filter_subscription: &FilterSubscription, - peer_id: PeerId, - timeout: Duration, -) -> Result<()> { - let filter_subscription_ptr = CString::new( - serde_json::to_string(filter_subscription) - .expect("FilterSubscription should always succeed to serialize"), + content_filter: &ContentFilter, + peer_id: Option, + timeout: Option, +) -> Result { + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), ) - .expect("FilterSubscription should always be able to be serialized") + .expect("ContentFilter should always be able to be serialized") + .into_raw(); + let peer_id_ptr = match peer_id { + None => CString::new(""), + Some(t) => CString::new(t), + } + .expect("CString should build properly from peer id") + .into_raw(); + + let mut response: String = Default::default(); + let response_cb = |v: &str| response = v.to_string(); + let code = unsafe { + let mut closure = response_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_filter_subscribe( + content_filter_ptr, + peer_id_ptr, + timeout + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(content_filter_ptr)); + drop(CString::from_raw(peer_id_ptr)); + + out + }; + + handle_json_response(code, &response) +} + +/// Used to know if a service node has an active subscription for this client +/// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol +pub fn waku_filter_ping(peer_id: PeerId, timeout: Option) -> Result<()> { + let peer_id_ptr = CString::new(peer_id) + .expect("PeerId should always be able to be serialized") + .into_raw(); + + let mut error: String = Default::default(); + let error_cb = |v: &str| error = v.to_string(); + let code = unsafe { + let mut closure = error_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_filter_ping( + peer_id_ptr, + timeout + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(peer_id_ptr)); + + out + }; + + handle_no_response(code, &error) +} + +/// Sends a requests to a service node to stop pushing messages matching this filter to this client. +/// It might be used to modify an existing subscription by providing a subset of the original filter +/// criteria +pub fn waku_filter_unsubscribe( + content_filter: &ContentFilter, + peer_id: PeerId, + timeout: Option, +) -> Result<()> { + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), + ) + .expect("CString should build properly from the serialized filter subscription") .into_raw(); let peer_id_ptr = CString::new(peer_id) .expect("PeerId should always be able to be serialized") @@ -32,18 +116,22 @@ pub fn waku_filter_subscribe( let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); - let out = waku_sys::waku_legacy_filter_subscribe( - filter_subscription_ptr, + let out = waku_sys::waku_filter_unsubscribe( + content_filter_ptr, peer_id_ptr, timeout - .as_millis() - .try_into() - .expect("Duration as milliseconds should fit in a i32"), + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(content_filter_ptr)); drop(CString::from_raw(peer_id_ptr)); out @@ -52,17 +140,18 @@ pub fn waku_filter_subscribe( handle_no_response(code, &error) } -/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) -/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) -pub fn waku_filter_unsubscribe( - filter_subscription: &FilterSubscription, - timeout: Duration, +/// Sends a requests to a service node (or all service nodes) to stop pushing messages +/// peerID should contain the ID of a peer this client is subscribed to, or can be None +/// to stop all active subscriptions +pub fn waku_filter_unsubscribe_all( + peer_id: Option, + timeout: Option, ) -> Result<()> { - let filter_subscription_ptr = CString::new( - serde_json::to_string(filter_subscription) - .expect("FilterSubscription should always succeed to serialize"), - ) - .expect("CString should build properly from the serialized filter subscription") + let peer_id_ptr = match peer_id { + None => CString::new(""), + Some(t) => CString::new(t), + } + .expect("CString should build properly from peer id") .into_raw(); let mut error: String = Default::default(); @@ -70,17 +159,21 @@ pub fn waku_filter_unsubscribe( let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); - let out = waku_sys::waku_legacy_filter_unsubscribe( - filter_subscription_ptr, + let out = waku_sys::waku_filter_unsubscribe_all( + peer_id_ptr, timeout - .as_millis() - .try_into() - .expect("Duration as milliseconds should fit in a i32"), + .map(|timeout| { + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32") + }) + .unwrap_or(0), cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(peer_id_ptr)); out }; diff --git a/waku-bindings/src/node/legacyfilter.rs b/waku-bindings/src/node/legacyfilter.rs new file mode 100644 index 0000000..35ef177 --- /dev/null +++ b/waku-bindings/src/node/legacyfilter.rs @@ -0,0 +1,89 @@ +//! Waku [filter](https://rfc.vac.dev/spec/36/#waku-filter) protocol related methods + +// std +use std::ffi::CString; +use std::time::Duration; +// crates +use libc::*; +// internal +use crate::general::Result; +use crate::general::{LegacyFilterSubscription, PeerId}; +use crate::utils::{get_trampoline, handle_no_response}; + +/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_legacy_filter_subscribechar-filterjson-char-peerid-int-timeoutms) +pub fn waku_legacy_filter_subscribe( + filter_subscription: &LegacyFilterSubscription, + peer_id: PeerId, + timeout: Duration, +) -> Result<()> { + let filter_subscription_ptr = CString::new( + serde_json::to_string(filter_subscription) + .expect("FilterSubscription should always succeed to serialize"), + ) + .expect("FilterSubscription should always be able to be serialized") + .into_raw(); + let peer_id_ptr = CString::new(peer_id) + .expect("PeerId should always be able to be serialized") + .into_raw(); + + let mut error: String = Default::default(); + let error_cb = |v: &str| error = v.to_string(); + let code = unsafe { + let mut closure = error_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_legacy_filter_subscribe( + filter_subscription_ptr, + peer_id_ptr, + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(peer_id_ptr)); + + out + }; + + handle_no_response(code, &error) +} + +/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) +pub fn waku_legacy_filter_unsubscribe( + filter_subscription: &LegacyFilterSubscription, + timeout: Duration, +) -> Result<()> { + let filter_subscription_ptr = CString::new( + serde_json::to_string(filter_subscription) + .expect("FilterSubscription should always succeed to serialize"), + ) + .expect("CString should build properly from the serialized filter subscription") + .into_raw(); + + let mut error: String = Default::default(); + let error_cb = |v: &str| error = v.to_string(); + let code = unsafe { + let mut closure = error_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_legacy_filter_unsubscribe( + filter_subscription_ptr, + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(filter_subscription_ptr)); + + out + }; + + handle_no_response(code, &error) +} diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 2597273..8339741 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -3,6 +3,7 @@ mod config; mod discovery; mod filter; +mod legacyfilter; mod lightpush; mod management; mod peers; @@ -20,14 +21,14 @@ use std::time::Duration; // internal use crate::general::{ - FilterSubscription, MessageId, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, - WakuMessage, WakuPubSubTopic, + ContentFilter, FilterSubscriptionResult, LegacyFilterSubscription, MessageId, PeerId, + ProtocolId, Result, StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic, }; pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams}; pub use discovery::{waku_discv5_update_bootnodes, waku_dns_discovery, DnsInfo}; pub use peers::{Protocol, WakuPeerData, WakuPeers}; -pub use relay::{waku_create_content_topic, waku_create_pubsub_topic, waku_default_pubsub_topic}; +pub use relay::{waku_create_content_topic, waku_default_pubsub_topic}; pub use store::{waku_local_store_query, waku_store_query}; /// Shared flag to check if a waku node is already running in the current process @@ -148,8 +149,9 @@ impl WakuNodeHandle { peers::waku_peers() } - /// Publish a message using Waku Relay + /// Publish a message using Waku Relay. /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms) + /// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic. pub fn relay_publish_message( &self, message: &WakuMessage, @@ -164,14 +166,14 @@ impl WakuNodeHandle { relay::waku_enough_peers(pubsub_topic) } - /// Subscribe to a Waku Relay pubsub topic to receive messages - pub fn relay_subscribe(&self, pubsub_topic: Option) -> Result<()> { - relay::waku_relay_subscribe(pubsub_topic) + /// Subscribe to WakuRelay to receive messages matching a content filter. + pub fn relay_subscribe(&self, content_filter: &ContentFilter) -> Result<()> { + relay::waku_relay_subscribe(content_filter) } - /// Closes the pubsub subscription to a pubsub topic. No more messages will be received from this pubsub topic - pub fn relay_unsubscribe(&self, pubsub_topic: Option) -> Result<()> { - relay::waku_relay_unsubscribe(pubsub_topic) + /// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic + pub fn relay_unsubscribe(&self, content_filter: &ContentFilter) -> Result<()> { + relay::waku_relay_unsubscribe(content_filter) } /// Returns the list of pubsub topics the node is subscribed to in Waku Relay @@ -200,8 +202,9 @@ impl WakuNodeHandle { store::waku_local_store_query(query) } - /// Publish a message using Waku Lightpush + /// Publish a message using Waku Lightpush. /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publishchar-messagejson-char-topic-char-peerid-int-timeoutms) + /// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic. pub fn lightpush_publish( &self, message: &WakuMessage, @@ -214,23 +217,62 @@ impl WakuNodeHandle { /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) - pub fn filter_subscribe( + #[deprecated] + pub fn legacy_filter_subscribe( &self, - filter_subscription: &FilterSubscription, + filter_subscription: &LegacyFilterSubscription, peer_id: PeerId, timeout: Duration, ) -> Result<()> { - filter::waku_filter_subscribe(filter_subscription, peer_id, timeout) + legacyfilter::waku_legacy_filter_subscribe(filter_subscription, peer_id, timeout) } /// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) - pub fn filter_unsubscribe( + #[deprecated] + pub fn legacy_filter_unsubscribe( &self, - filter_subscription: &FilterSubscription, + filter_subscription: &LegacyFilterSubscription, timeout: Duration, ) -> Result<()> { - filter::waku_filter_unsubscribe(filter_subscription, timeout) + legacyfilter::waku_legacy_filter_unsubscribe(filter_subscription, timeout) + } + + /// Creates a subscription to a filter full node matching a content filter. + /// Returns the PeerId on which the filter subscription was created + pub fn filter_subscribe( + &self, + content_filter: &ContentFilter, + peer_id: Option, + timeout: Option, + ) -> Result { + filter::waku_filter_subscribe(content_filter, peer_id, timeout) + } + + /// Used to know if a service node has an active subscription for this client + pub fn filter_ping(&self, peer_id: PeerId, timeout: Option) -> Result<()> { + filter::waku_filter_ping(peer_id, timeout) + } + + /// Sends a requests to a service node to stop pushing messages matching this filter to this client. + /// It might be used to modify an existing subscription by providing a subset of the original filter + /// criteria + pub fn filter_unsubscribe( + &self, + content_filter: &ContentFilter, + peer_id: PeerId, + timeout: Option, + ) -> Result<()> { + filter::waku_filter_unsubscribe(content_filter, peer_id, timeout) + } + + /// Sends a requests to a service node (or all service nodes) to stop pushing messages + pub fn filter_unsubscribe_all( + &self, + peer_id: Option, + timeout: Option, + ) -> Result<()> { + filter::waku_filter_unsubscribe_all(peer_id, timeout) } /// Update the bootnodes used by DiscoveryV5 by passing a list of ENRs diff --git a/waku-bindings/src/node/relay.rs b/waku-bindings/src/node/relay.rs index bce3483..dd5415c 100644 --- a/waku-bindings/src/node/relay.rs +++ b/waku-bindings/src/node/relay.rs @@ -6,7 +6,9 @@ use std::time::Duration; // crates use libc::*; // internal -use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic}; +use crate::general::{ + ContentFilter, Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic, +}; use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response}; /// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/) @@ -55,37 +57,6 @@ pub fn waku_create_content_topic( .expect("&str from result should always be extracted") } -/// Create a pubsub topic according to [RFC 23](https://rfc.vac.dev/spec/23/) -/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding) -pub fn waku_create_pubsub_topic(topic_name: &str, encoding: Encoding) -> WakuPubSubTopic { - let topic_name_ptr = CString::new(topic_name) - .expect("Topic name should always transform to CString") - .into_raw(); - let encoding_ptr = CString::new(encoding.to_string()) - .expect("Encoding should always transform to CString") - .into_raw(); - - let mut result: String = Default::default(); - let result_cb = |v: &str| result = v.to_string(); - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - let out = waku_sys::waku_pubsub_topic( - topic_name_ptr, - encoding_ptr, - cb, - &mut closure as *mut _ as *mut c_void, - ); - - drop(CString::from_raw(topic_name_ptr)); - drop(CString::from_raw(encoding_ptr)); - - out - }; - - handle_response(code, &result).expect("&str from result should always be extracted") -} - /// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/) pub fn waku_default_pubsub_topic() -> WakuPubSubTopic { let mut result: String = Default::default(); @@ -191,27 +162,25 @@ pub fn waku_enough_peers(pubsub_topic: Option) -> Result handle_response(code, &result) } -pub fn waku_relay_subscribe(pubsub_topic: Option) -> Result<()> { - let pubsub_topic = pubsub_topic - .unwrap_or_else(waku_default_pubsub_topic) - .to_string(); - - let pubsub_topic_ptr = CString::new(pubsub_topic) - .expect("CString should build properly from pubsub topic") - .into_raw(); - +pub fn waku_relay_subscribe(content_filter: &ContentFilter) -> Result<()> { + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), + ) + .expect("ContentFilter should always be able to be serialized") + .into_raw(); let mut error: String = Default::default(); let error_cb = |v: &str| error = v.to_string(); let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( - pubsub_topic_ptr, + content_filter_ptr, cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); + drop(CString::from_raw(content_filter_ptr)); out }; @@ -219,27 +188,25 @@ pub fn waku_relay_subscribe(pubsub_topic: Option) -> Result<()> handle_no_response(code, &error) } -pub fn waku_relay_unsubscribe(pubsub_topic: Option) -> Result<()> { - let pubsub_topic = pubsub_topic - .unwrap_or_else(waku_default_pubsub_topic) - .to_string(); - - let pubsub_topic_ptr = CString::new(pubsub_topic) - .expect("CString should build properly from pubsub topic") - .into_raw(); - +pub fn waku_relay_unsubscribe(content_filter: &ContentFilter) -> Result<()> { + let content_filter_ptr = CString::new( + serde_json::to_string(content_filter) + .expect("ContentFilter should always succeed to serialize"), + ) + .expect("ContentFilter should always be able to be serialized") + .into_raw(); let mut error: String = Default::default(); let error_cb = |v: &str| error = v.to_string(); let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); let out = waku_sys::waku_relay_subscribe( - pubsub_topic_ptr, + content_filter_ptr, cb, &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(pubsub_topic_ptr)); + drop(CString::from_raw(content_filter_ptr)); out }; diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index 07be7a4..e945e1e 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -10,21 +10,27 @@ pub fn decode(input: &str) -> Result { } unsafe extern "C" fn trampoline( + _ret_code: ::std::os::raw::c_int, data: *const ::std::os::raw::c_char, user_data: *mut ::std::os::raw::c_void, ) where F: FnMut(&str), { let user_data = &mut *(user_data as *mut F); - let response = unsafe { CStr::from_ptr(data) } - .to_str() - .map_err(|err| { - format!( - "could not retrieve response from pointer returned by waku: {}", - err - ) - }) - .expect("could not retrieve response"); + + let response = if data.is_null() { + "" + } else { + unsafe { CStr::from_ptr(data) } + .to_str() + .map_err(|err| { + format!( + "could not retrieve response from pointer returned by waku: {}", + err + ) + }) + .expect("could not retrieve response") + }; user_data(response); } diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index e565c6d..eeb1001 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -10,9 +10,9 @@ use std::{collections::HashSet, str::from_utf8}; use tokio::sync::mpsc::{self, Sender}; use tokio::time; use waku_bindings::{ - waku_default_pubsub_topic, waku_new, waku_set_event_callback, Encoding, Event, GossipSubParams, - Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel, WakuMessage, - WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic, + waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Encoding, Event, + GossipSubParams, Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel, + WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic, }; const ECHO_TIMEOUT: u64 = 10; @@ -157,7 +157,8 @@ async fn discv5_echo() -> Result<(), String> { let ssk = Aes256Gcm::generate_key(&mut thread_rng()); // Subscribe to default channel. - node.relay_subscribe(None)?; + let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]); + node.relay_subscribe(&content_filter)?; let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto); let topics = node.relay_topics()?; @@ -215,7 +216,8 @@ async fn default_echo() -> Result<(), String> { let ssk = Aes256Gcm::generate_key(&mut thread_rng()); // subscribe to default channel - node.relay_subscribe(None)?; + let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]); + node.relay_subscribe(&content_filter)?; let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto); let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT)); diff --git a/waku-sys/Cargo.toml b/waku-sys/Cargo.toml index c4fb72a..c5c6ae1 100644 --- a/waku-sys/Cargo.toml +++ b/waku-sys/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "waku-sys" -version = "0.4.0" +version = "0.5.0" edition = "2021" authors = [ "Daniel Sanchez Quiros " diff --git a/waku-sys/vendor b/waku-sys/vendor index b3bd45f..02f2800 160000 --- a/waku-sys/vendor +++ b/waku-sys/vendor @@ -1 +1 @@ -Subproject commit b3bd45f01f1211cb18fb44ced5277758ab38eee7 +Subproject commit 02f2800b046094f73d1011081daef7d897126687