diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index dfcbf51..7ab0e2c 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -217,15 +217,16 @@ impl ContentFilter { /// The criteria to create subscription to a light node in JSON Format /// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) #[derive(Clone, Serialize, Deserialize, Debug)] +#[deprecated] #[serde(rename_all = "camelCase")] -pub struct FilterSubscription { +pub struct LegacyFilterSubscription { /// Array of [`ContentFilter`] being subscribed to / unsubscribed from content_filters: Vec, /// Optional pubsub topic pubsub_topic: Option, } -impl FilterSubscription { +impl LegacyFilterSubscription { pub fn new(content_filters: Vec, pubsub_topic: Option) -> Self { Self { content_filters, @@ -242,6 +243,37 @@ impl FilterSubscription { } } +/// The criteria to create subscription to a filter full node matching a content filter. +/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) +#[derive(Clone, Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct FilterSubscription { + /// mandatory, at least one required, with a max of 10 + content_topics: Vec, + /// optional if using autosharding, mandatory if using static or named sharding. + pubsub_topic: Option, +} + +impl FilterSubscription { + pub fn new( + content_topics: Vec, + pubsub_topic: Option, + ) -> Self { + Self { + content_topics, + pubsub_topic, + } + } + + pub fn content_topics(&self) -> &[WakuContentTopic] { + &self.content_topics + } + + pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> { + self.pubsub_topic.as_ref() + } +} + /// Criteria used to retrieve historical messages #[derive(Clone, Serialize, Debug)] #[serde(rename_all = "camelCase")] diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index 4debe07..be7f049 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -16,9 +16,9 @@ pub use node::{ }; pub use general::{ - ContentFilter, DecodedPayload, Encoding, FilterSubscription, MessageId, MessageIndex, - PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, - WakuMessage, WakuMessageVersion, WakuPubSubTopic, + ContentFilter, DecodedPayload, Encoding, FilterSubscription, LegacyFilterSubscription, + MessageId, MessageIndex, PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, + WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic, }; pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent}; diff --git a/waku-bindings/src/node/filter.rs b/waku-bindings/src/node/filter.rs index 8b18122..40cb21c 100644 --- a/waku-bindings/src/node/filter.rs +++ b/waku-bindings/src/node/filter.rs @@ -8,11 +8,88 @@ use libc::*; // internal use crate::general::Result; use crate::general::{FilterSubscription, PeerId}; -use crate::utils::{get_trampoline, handle_no_response}; +use crate::utils::{get_trampoline, handle_response, handle_no_response}; /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) pub fn waku_filter_subscribe( + filter_subscription: &FilterSubscription, + peer_id: Option, + timeout: Duration, +) -> Result { + let filter_subscription_ptr = CString::new( + serde_json::to_string(filter_subscription) + .expect("FilterSubscription should always succeed to serialize"), + ) + .expect("FilterSubscription should always be able to be serialized") + .into_raw(); + let peer_id_ptr = match peer_id { + None => CString::new(""), + Some(t) => CString::new(t), + } + .expect("CString should build properly from peer id") + .into_raw(); + + let mut response: String = Default::default(); + let response_cb = |v: &str| response = v.to_string(); + let code = unsafe { + let mut closure = response_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_filter_subscribe( + filter_subscription_ptr, + peer_id_ptr, + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(peer_id_ptr)); + + out + }; + + TODO: extract the peerID from here? + handle_response(code, &response) +} + +/// Used to know if a service node has an active subscription for this client +/// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol +pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> { + let peer_id_ptr = CString::new(peer_id) + .expect("PeerId should always be able to be serialized") + .into_raw(); + + let mut error: String = Default::default(); + let error_cb = |v: &str| error = v.to_string(); + let code = unsafe { + let mut closure = error_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_filter_ping( + peer_id_ptr, + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(peer_id_ptr)); + + out + }; + + handle_no_response(code, &error) +} + +/// Sends a requests to a service node to stop pushing messages matching this filter to this client. +/// It might be used to modify an existing subscription by providing a subset of the original filter +/// criteria +pub fn waku_filter_unsubscribe( filter_subscription: &FilterSubscription, peer_id: PeerId, timeout: Duration, @@ -21,7 +98,7 @@ pub fn waku_filter_subscribe( serde_json::to_string(filter_subscription) .expect("FilterSubscription should always succeed to serialize"), ) - .expect("FilterSubscription should always be able to be serialized") + .expect("CString should build properly from the serialized filter subscription") .into_raw(); let peer_id_ptr = CString::new(peer_id) .expect("PeerId should always be able to be serialized") @@ -32,7 +109,7 @@ pub fn waku_filter_subscribe( let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); - let out = waku_sys::waku_legacy_filter_subscribe( + let out = waku_sys::waku_filter_unsubscribe( filter_subscription_ptr, peer_id_ptr, timeout @@ -52,17 +129,15 @@ pub fn waku_filter_subscribe( handle_no_response(code, &error) } -/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) -/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) -pub fn waku_filter_unsubscribe( - filter_subscription: &FilterSubscription, - timeout: Duration, -) -> Result<()> { - let filter_subscription_ptr = CString::new( - serde_json::to_string(filter_subscription) - .expect("FilterSubscription should always succeed to serialize"), - ) - .expect("CString should build properly from the serialized filter subscription") +/// Sends a requests to a service node (or all service nodes) to stop pushing messages +/// peerID should contain the ID of a peer this client is subscribed to, or can be None +/// to stop all active subscriptions +pub fn waku_filter_unsubscribe_all(peer_id: Option, timeout: Duration) -> Result<()> { + let peer_id_ptr = match peer_id { + None => CString::new(""), + Some(t) => CString::new(t), + } + .expect("CString should build properly from peer id") .into_raw(); let mut error: String = Default::default(); @@ -70,8 +145,8 @@ pub fn waku_filter_unsubscribe( let code = unsafe { let mut closure = error_cb; let cb = get_trampoline(&closure); - let out = waku_sys::waku_legacy_filter_unsubscribe( - filter_subscription_ptr, + let out = waku_sys::waku_filter_unsubscribe_all( + peer_id_ptr, timeout .as_millis() .try_into() @@ -80,7 +155,7 @@ pub fn waku_filter_unsubscribe( &mut closure as *mut _ as *mut c_void, ); - drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(peer_id_ptr)); out }; diff --git a/waku-bindings/src/node/legacyfilter.rs b/waku-bindings/src/node/legacyfilter.rs new file mode 100644 index 0000000..4803daf --- /dev/null +++ b/waku-bindings/src/node/legacyfilter.rs @@ -0,0 +1,91 @@ +//! Waku [filter](https://rfc.vac.dev/spec/36/#waku-filter) protocol related methods + +// std +use std::ffi::CString; +use std::time::Duration; +// crates +use libc::*; +// internal +use crate::general::Result; +use crate::general::{LegacyFilterSubscription, PeerId}; +use crate::utils::{get_trampoline, handle_no_response}; + +/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_legacy_filter_subscribechar-filterjson-char-peerid-int-timeoutms) +#[deprecated] +pub fn waku_legacy_filter_subscribe( + filter_subscription: &LegacyFilterSubscription, + peer_id: PeerId, + timeout: Duration, +) -> Result<()> { + let filter_subscription_ptr = CString::new( + serde_json::to_string(filter_subscription) + .expect("FilterSubscription should always succeed to serialize"), + ) + .expect("FilterSubscription should always be able to be serialized") + .into_raw(); + let peer_id_ptr = CString::new(peer_id) + .expect("PeerId should always be able to be serialized") + .into_raw(); + + let mut error: String = Default::default(); + let error_cb = |v: &str| error = v.to_string(); + let code = unsafe { + let mut closure = error_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_legacy_filter_subscribe( + filter_subscription_ptr, + peer_id_ptr, + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(filter_subscription_ptr)); + drop(CString::from_raw(peer_id_ptr)); + + out + }; + + handle_no_response(code, &error) +} + +/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) +/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) +#[deprecated] +pub fn waku_legacy_filter_unsubscribe( + filter_subscription: &LegacyFilterSubscription, + timeout: Duration, +) -> Result<()> { + let filter_subscription_ptr = CString::new( + serde_json::to_string(filter_subscription) + .expect("FilterSubscription should always succeed to serialize"), + ) + .expect("CString should build properly from the serialized filter subscription") + .into_raw(); + + let mut error: String = Default::default(); + let error_cb = |v: &str| error = v.to_string(); + let code = unsafe { + let mut closure = error_cb; + let cb = get_trampoline(&closure); + let out = waku_sys::waku_legacy_filter_unsubscribe( + filter_subscription_ptr, + timeout + .as_millis() + .try_into() + .expect("Duration as milliseconds should fit in a i32"), + cb, + &mut closure as *mut _ as *mut c_void, + ); + + drop(CString::from_raw(filter_subscription_ptr)); + + out + }; + + handle_no_response(code, &error) +} diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index 2597273..22f4a3f 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -3,6 +3,7 @@ mod config; mod discovery; mod filter; +mod legacyfilter; mod lightpush; mod management; mod peers; @@ -20,8 +21,8 @@ use std::time::Duration; // internal use crate::general::{ - FilterSubscription, MessageId, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, - WakuMessage, WakuPubSubTopic, + FilterSubscription, LegacyFilterSubscription, MessageId, PeerId, ProtocolId, Result, + StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic, }; pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams}; @@ -214,23 +215,62 @@ impl WakuNodeHandle { /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) + #[deprecated] + pub fn legacy_filter_subscribe( + &self, + filter_subscription: &LegacyFilterSubscription, + peer_id: PeerId, + timeout: Duration, + ) -> Result<()> { + legacyfilter::waku_legacy_filter_subscribe(filter_subscription, peer_id, timeout) + } + + /// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) + /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) + #[deprecated] + pub fn legacy_filter_unsubscribe( + &self, + filter_subscription: &LegacyFilterSubscription, + timeout: Duration, + ) -> Result<()> { + legacyfilter::waku_legacy_filter_unsubscribe(filter_subscription, timeout) + } + + /// Creates a subscription to a filter full node matching a content filter. + /// Returns the PeerId on which the filter subscription was created pub fn filter_subscribe( + &self, + filter_subscription: &FilterSubscription, + peer_id: Option, + timeout: Duration, + ) -> Result { + filter::waku_filter_subscribe(filter_subscription, peer_id, timeout) + } + + /// Used to know if a service node has an active subscription for this client + pub fn filter_ping(&self, peer_id: PeerId, timeout: Duration) -> Result<()> { + filter::waku_filter_ping(peer_id, timeout) + } + + /// Sends a requests to a service node to stop pushing messages matching this filter to this client. + /// It might be used to modify an existing subscription by providing a subset of the original filter + /// criteria + pub fn filter_unsubscribe( &self, filter_subscription: &FilterSubscription, peer_id: PeerId, timeout: Duration, ) -> Result<()> { - filter::waku_filter_subscribe(filter_subscription, peer_id, timeout) + filter::waku_filter_unsubscribe(filter_subscription, peer_id, timeout) } - /// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) - /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) - pub fn filter_unsubscribe( + /// Sends a requests to a service node (or all service nodes) to stop pushing messages + pub fn filter_unsubscribe_all( &self, - filter_subscription: &FilterSubscription, + peer_id: Option, timeout: Duration, ) -> Result<()> { - filter::waku_filter_unsubscribe(filter_subscription, timeout) + filter::waku_filter_unsubscribe_all(peer_id, timeout) } /// Update the bootnodes used by DiscoveryV5 by passing a list of ENRs