feat: add filterv2 functions

This commit is contained in:
Richard Ramos 2023-10-01 10:09:37 -04:00
parent dc641645b9
commit d0b35f4aaf
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
5 changed files with 268 additions and 30 deletions

View File

@ -217,15 +217,16 @@ impl ContentFilter {
/// The criteria to create subscription to a light node in JSON Format /// The criteria to create subscription to a light node in JSON Format
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type) /// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
#[deprecated]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct FilterSubscription { pub struct LegacyFilterSubscription {
/// Array of [`ContentFilter`] being subscribed to / unsubscribed from /// Array of [`ContentFilter`] being subscribed to / unsubscribed from
content_filters: Vec<ContentFilter>, content_filters: Vec<ContentFilter>,
/// Optional pubsub topic /// Optional pubsub topic
pubsub_topic: Option<WakuPubSubTopic>, pubsub_topic: Option<WakuPubSubTopic>,
} }
impl FilterSubscription { impl LegacyFilterSubscription {
pub fn new(content_filters: Vec<ContentFilter>, pubsub_topic: Option<WakuPubSubTopic>) -> Self { pub fn new(content_filters: Vec<ContentFilter>, pubsub_topic: Option<WakuPubSubTopic>) -> Self {
Self { Self {
content_filters, content_filters,
@ -242,6 +243,37 @@ impl FilterSubscription {
} }
} }
/// The criteria to create subscription to a filter full node matching a content filter.
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscription {
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
/// optional if using autosharding, mandatory if using static or named sharding.
pubsub_topic: Option<WakuPubSubTopic>,
}
impl FilterSubscription {
pub fn new(
content_topics: Vec<WakuContentTopic>,
pubsub_topic: Option<WakuPubSubTopic>,
) -> Self {
Self {
content_topics,
pubsub_topic,
}
}
pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}
pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> {
self.pubsub_topic.as_ref()
}
}
/// Criteria used to retrieve historical messages /// Criteria used to retrieve historical messages
#[derive(Clone, Serialize, Debug)] #[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]

View File

@ -16,9 +16,9 @@ pub use node::{
}; };
pub use general::{ pub use general::{
ContentFilter, DecodedPayload, Encoding, FilterSubscription, MessageId, MessageIndex, ContentFilter, DecodedPayload, Encoding, FilterSubscription, LegacyFilterSubscription,
PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, MessageId, MessageIndex, PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse,
WakuMessage, WakuMessageVersion, WakuPubSubTopic, WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
}; };
pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent}; pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent};

View File

@ -8,11 +8,88 @@ use libc::*;
// internal // internal
use crate::general::Result; use crate::general::Result;
use crate::general::{FilterSubscription, PeerId}; use crate::general::{FilterSubscription, PeerId};
use crate::utils::{get_trampoline, handle_no_response}; use crate::utils::{get_trampoline, handle_response, handle_no_response};
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
pub fn waku_filter_subscribe( pub fn waku_filter_subscribe(
filter_subscription: &FilterSubscription,
peer_id: Option<PeerId>,
timeout: Duration,
) -> Result<PeerId> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("FilterSubscription should always be able to be serialized")
.into_raw();
let peer_id_ptr = match peer_id {
None => CString::new(""),
Some(t) => CString::new(t),
}
.expect("CString should build properly from peer id")
.into_raw();
let mut response: String = Default::default();
let response_cb = |v: &str| response = v.to_string();
let code = unsafe {
let mut closure = response_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_subscribe(
filter_subscription_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));
out
};
TODO: extract the peerID from here?
handle_response(code, &response)
}
/// Used to know if a service node has an active subscription for this client
/// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol
pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> {
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_ping(
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(peer_id_ptr));
out
};
handle_no_response(code, &error)
}
/// Sends a requests to a service node to stop pushing messages matching this filter to this client.
/// It might be used to modify an existing subscription by providing a subset of the original filter
/// criteria
pub fn waku_filter_unsubscribe(
filter_subscription: &FilterSubscription, filter_subscription: &FilterSubscription,
peer_id: PeerId, peer_id: PeerId,
timeout: Duration, timeout: Duration,
@ -21,7 +98,7 @@ pub fn waku_filter_subscribe(
serde_json::to_string(filter_subscription) serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"), .expect("FilterSubscription should always succeed to serialize"),
) )
.expect("FilterSubscription should always be able to be serialized") .expect("CString should build properly from the serialized filter subscription")
.into_raw(); .into_raw();
let peer_id_ptr = CString::new(peer_id) let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized") .expect("PeerId should always be able to be serialized")
@ -32,7 +109,7 @@ pub fn waku_filter_subscribe(
let code = unsafe { let code = unsafe {
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_subscribe( let out = waku_sys::waku_filter_unsubscribe(
filter_subscription_ptr, filter_subscription_ptr,
peer_id_ptr, peer_id_ptr,
timeout timeout
@ -52,17 +129,15 @@ pub fn waku_filter_subscribe(
handle_no_response(code, &error) handle_no_response(code, &error)
} }
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// Sends a requests to a service node (or all service nodes) to stop pushing messages
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) /// peerID should contain the ID of a peer this client is subscribed to, or can be None
pub fn waku_filter_unsubscribe( /// to stop all active subscriptions
filter_subscription: &FilterSubscription, pub fn waku_filter_unsubscribe_all(peer_id: Option<PeerId>, timeout: Duration) -> Result<()> {
timeout: Duration, let peer_id_ptr = match peer_id {
) -> Result<()> { None => CString::new(""),
let filter_subscription_ptr = CString::new( Some(t) => CString::new(t),
serde_json::to_string(filter_subscription) }
.expect("FilterSubscription should always succeed to serialize"), .expect("CString should build properly from peer id")
)
.expect("CString should build properly from the serialized filter subscription")
.into_raw(); .into_raw();
let mut error: String = Default::default(); let mut error: String = Default::default();
@ -70,8 +145,8 @@ pub fn waku_filter_unsubscribe(
let code = unsafe { let code = unsafe {
let mut closure = error_cb; let mut closure = error_cb;
let cb = get_trampoline(&closure); let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_unsubscribe( let out = waku_sys::waku_filter_unsubscribe_all(
filter_subscription_ptr, peer_id_ptr,
timeout timeout
.as_millis() .as_millis()
.try_into() .try_into()
@ -80,7 +155,7 @@ pub fn waku_filter_unsubscribe(
&mut closure as *mut _ as *mut c_void, &mut closure as *mut _ as *mut c_void,
); );
drop(CString::from_raw(filter_subscription_ptr)); drop(CString::from_raw(peer_id_ptr));
out out
}; };

View File

@ -0,0 +1,91 @@
//! Waku [filter](https://rfc.vac.dev/spec/36/#waku-filter) protocol related methods
// std
use std::ffi::CString;
use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::Result;
use crate::general::{LegacyFilterSubscription, PeerId};
use crate::utils::{get_trampoline, handle_no_response};
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_legacy_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
#[deprecated]
pub fn waku_legacy_filter_subscribe(
filter_subscription: &LegacyFilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("FilterSubscription should always be able to be serialized")
.into_raw();
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_subscribe(
filter_subscription_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));
out
};
handle_no_response(code, &error)
}
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
#[deprecated]
pub fn waku_legacy_filter_unsubscribe(
filter_subscription: &LegacyFilterSubscription,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("CString should build properly from the serialized filter subscription")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_unsubscribe(
filter_subscription_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
out
};
handle_no_response(code, &error)
}

View File

@ -3,6 +3,7 @@
mod config; mod config;
mod discovery; mod discovery;
mod filter; mod filter;
mod legacyfilter;
mod lightpush; mod lightpush;
mod management; mod management;
mod peers; mod peers;
@ -20,8 +21,8 @@ use std::time::Duration;
// internal // internal
use crate::general::{ use crate::general::{
FilterSubscription, MessageId, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, FilterSubscription, LegacyFilterSubscription, MessageId, PeerId, ProtocolId, Result,
WakuMessage, WakuPubSubTopic, StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic,
}; };
pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams}; pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams};
@ -214,23 +215,62 @@ impl WakuNodeHandle<Running> {
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms) /// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
#[deprecated]
pub fn legacy_filter_subscribe(
&self,
filter_subscription: &LegacyFilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
legacyfilter::waku_legacy_filter_subscribe(filter_subscription, peer_id, timeout)
}
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
#[deprecated]
pub fn legacy_filter_unsubscribe(
&self,
filter_subscription: &LegacyFilterSubscription,
timeout: Duration,
) -> Result<()> {
legacyfilter::waku_legacy_filter_unsubscribe(filter_subscription, timeout)
}
/// Creates a subscription to a filter full node matching a content filter.
/// Returns the PeerId on which the filter subscription was created
pub fn filter_subscribe( pub fn filter_subscribe(
&self,
filter_subscription: &FilterSubscription,
peer_id: Option<PeerId>,
timeout: Duration,
) -> Result<PeerId> {
filter::waku_filter_subscribe(filter_subscription, peer_id, timeout)
}
/// Used to know if a service node has an active subscription for this client
pub fn filter_ping(&self, peer_id: PeerId, timeout: Duration) -> Result<()> {
filter::waku_filter_ping(peer_id, timeout)
}
/// Sends a requests to a service node to stop pushing messages matching this filter to this client.
/// It might be used to modify an existing subscription by providing a subset of the original filter
/// criteria
pub fn filter_unsubscribe(
&self, &self,
filter_subscription: &FilterSubscription, filter_subscription: &FilterSubscription,
peer_id: PeerId, peer_id: PeerId,
timeout: Duration, timeout: Duration,
) -> Result<()> { ) -> Result<()> {
filter::waku_filter_subscribe(filter_subscription, peer_id, timeout) filter::waku_filter_unsubscribe(filter_subscription, peer_id, timeout)
} }
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`) /// Sends a requests to a service node (or all service nodes) to stop pushing messages
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms) pub fn filter_unsubscribe_all(
pub fn filter_unsubscribe(
&self, &self,
filter_subscription: &FilterSubscription, peer_id: Option<PeerId>,
timeout: Duration, timeout: Duration,
) -> Result<()> { ) -> Result<()> {
filter::waku_filter_unsubscribe(filter_subscription, timeout) filter::waku_filter_unsubscribe_all(peer_id, timeout)
} }
/// Update the bootnodes used by DiscoveryV5 by passing a list of ENRs /// Update the bootnodes used by DiscoveryV5 by passing a list of ENRs