feat: add filterv2 functions (#75)

This commit is contained in:
richΛrd 2023-11-07 15:50:35 -04:00 committed by GitHub
parent dc641645b9
commit dc32f22f4a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 447 additions and 221 deletions

View File

@ -20,7 +20,7 @@ jobs:
run: git submodule update --init --recursive
- uses: actions/setup-go@v3 # we need go to build go-waku
with:
go-version: '1.19'
go-version: '1.20'
- uses: actions-rs/toolchain@v1
with:
profile: minimal

View File

@ -29,7 +29,7 @@ jobs:
run: git submodule update --init --recursive
- uses: actions/setup-go@v3 # we need go to build go-waku
with:
go-version: '1.19'
go-version: '1.20'
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@ -61,7 +61,7 @@ jobs:
run: git submodule update --init --recursive
- uses: actions/setup-go@v3 # we need go to build go-waku
with:
go-version: '1.19'
go-version: '1.20'
- uses: actions-rs/toolchain@v1
with:
profile: minimal
@ -88,7 +88,7 @@ jobs:
run: git submodule update --init --recursive
- uses: actions/setup-go@v3 # we need go to build go-waku
with:
go-version: '1.19'
go-version: '1.20'
- uses: actions-rs/toolchain@v1
with:
profile: minimal

4
Cargo.lock generated
View File

@ -1674,7 +1674,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waku-bindings"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"aes-gcm",
"base64 0.21.0",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "waku-sys"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"bindgen",
]

5
examples/Cargo.lock generated
View File

@ -1497,12 +1497,13 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waku-bindings"
version = "0.1.1"
version = "0.5.0"
dependencies = [
"aes-gcm",
"base64 0.21.0",
"enr",
"hex",
"libc",
"multiaddr",
"once_cell",
"rand",
@ -1517,7 +1518,7 @@ dependencies = [
[[package]]
name = "waku-sys"
version = "0.1.0"
version = "0.5.0"
dependencies = [
"bindgen",
]

View File

@ -23,8 +23,8 @@ use tui::{
};
use unicode_width::UnicodeWidthStr;
use waku_bindings::{
waku_new, waku_set_event_callback, ContentFilter, Multiaddr, PagingOptions, ProtocolId,
Running, StoreQuery, WakuMessage, WakuNodeHandle,
waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Multiaddr,
PagingOptions, ProtocolId, Running, StoreQuery, WakuMessage, WakuNodeHandle,
};
enum InputMode {
@ -76,7 +76,7 @@ fn retrieve_history(
let result = node_handle.store_query(
&StoreQuery {
pubsub_topic: None,
content_filters: vec![ContentFilter::new(TOY_CHAT_CONTENT_TOPIC.clone())],
content_topics: vec![TOY_CHAT_CONTENT_TOPIC.clone()],
start_time: Some(
(Duration::from_secs(Utc::now().timestamp() as u64)
- Duration::from_secs(60 * 60 * 24))
@ -110,7 +110,9 @@ fn setup_node_handle() -> std::result::Result<WakuNodeHandle<Running>, Box<dyn E
let peerid = node_handle.add_peer(&address, ProtocolId::Relay)?;
node_handle.connect_peer_with_id(&peerid, None)?;
}
node_handle.relay_subscribe(None)?;
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node_handle.relay_subscribe(&content_filter)?;
Ok(node_handle)
}
@ -207,10 +209,11 @@ fn run_app<B: Backend>(
meta,
false,
);
if let Err(e) =
app.node_handle
.relay_publish_message(&waku_message, None, None)
{
if let Err(e) = app.node_handle.relay_publish_message(
&waku_message,
Some(waku_default_pubsub_topic()),
None,
) {
let mut out = std::io::stderr();
write!(out, "{e:?}").unwrap();
}

View File

@ -1,6 +1,6 @@
[package]
name = "waku-bindings"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"
@ -26,7 +26,7 @@ serde_json = "1.0"
sscanf = "0.4"
smart-default = "0.6"
url = "2.3"
waku-sys = { version = "0.4.0", path = "../waku-sys" }
waku-sys = { version = "0.5.0", path = "../waku-sys" }
libc = "0.2"
[dev-dependencies]

View File

@ -5,7 +5,7 @@
//! When an event is emitted, this callback will be triggered receiving a [`Signal`]
// std
use std::ffi::{c_char, c_void, CStr};
use std::ffi::{c_char, c_int, c_void, CStr};
use std::ops::Deref;
use std::sync::Mutex;
// crates
@ -79,7 +79,7 @@ fn set_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) {
/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`]
/// and executes the [`CALLBACK`] funtion with it
extern "C" fn callback(data: *const c_char, _user_data: *mut c_void) {
extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c_void) {
let raw_response = unsafe { CStr::from_ptr(data) }
.to_str()
.expect("Not null ptr");

View File

@ -20,6 +20,8 @@ pub type WakuMessageVersion = usize;
pub type PeerId = String;
/// Waku message id, hex encoded sha256 digest of the message
pub type MessageId = String;
/// Waku pubsub topic
pub type WakuPubSubTopic = String;
/// Protocol identifiers
#[non_exhaustive]
@ -199,12 +201,12 @@ impl DecodedPayload {
/// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ContentFilter {
pub struct LegacyContentFilter {
/// The content topic of a Waku message
content_topic: WakuContentTopic,
}
impl ContentFilter {
impl LegacyContentFilter {
pub fn new(content_topic: WakuContentTopic) -> Self {
Self { content_topic }
}
@ -218,14 +220,14 @@ impl ContentFilter {
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscription {
pub struct LegacyFilterSubscription {
/// Array of [`ContentFilter`] being subscribed to / unsubscribed from
content_filters: Vec<ContentFilter>,
/// Optional pubsub topic
pubsub_topic: Option<WakuPubSubTopic>,
}
impl FilterSubscription {
impl LegacyFilterSubscription {
pub fn new(content_filters: Vec<ContentFilter>, pubsub_topic: Option<WakuPubSubTopic>) -> Self {
Self {
content_filters,
@ -242,14 +244,103 @@ impl FilterSubscription {
}
}
/// The criteria to create subscription to a filter full node matching a content filter.
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ContentFilter {
/// optional if using autosharding, mandatory if using static or named sharding.
pubsub_topic: Option<WakuPubSubTopic>,
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
}
impl ContentFilter {
pub fn new(
pubsub_topic: Option<WakuPubSubTopic>,
content_topics: Vec<WakuContentTopic>,
) -> Self {
Self {
content_topics,
pubsub_topic,
}
}
pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}
pub fn pubsub_topic(&self) -> Option<&WakuPubSubTopic> {
self.pubsub_topic.as_ref()
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionDetail {
#[serde(rename = "peerID")]
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
}
impl FilterSubscriptionDetail {
pub fn new(
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
) -> Self {
Self {
peer_id,
content_topics,
pubsub_topic,
}
}
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}
pub fn pubsub_topic(&self) -> &WakuPubSubTopic {
&self.pubsub_topic
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionResult {
subscriptions: Vec<FilterSubscriptionDetail>,
error: Option<String>,
}
impl FilterSubscriptionResult {
pub fn new(subscriptions: Vec<FilterSubscriptionDetail>, error: Option<String>) -> Self {
Self {
subscriptions,
error,
}
}
pub fn subscriptions(&self) -> &[FilterSubscriptionDetail] {
&self.subscriptions
}
pub fn error(&self) -> &Option<String> {
&self.error
}
}
/// Criteria used to retrieve historical messages
#[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StoreQuery {
/// The pubsub topic on which messages are published
pub pubsub_topic: Option<WakuPubSubTopic>,
/// Array of [`ContentFilter`] to query for historical messages
pub content_filters: Vec<ContentFilter>,
/// Array of [`WakuContentTopic`] to query for historical messages
pub content_topics: Vec<WakuContentTopic>,
/// The inclusive lower bound on the timestamp of queried messages.
/// This field holds the Unix epoch time in nanoseconds
pub start_time: Option<usize>,
@ -426,75 +517,6 @@ impl<'de> Deserialize<'de> for WakuContentTopic {
}
}
/// A waku pubsub topic in the form of `/waku/v2/{topic_name}/{encoding}`
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WakuPubSubTopic {
pub topic_name: Cow<'static, str>,
pub encoding: Encoding,
}
impl WakuPubSubTopic {
pub const fn new(topic_name: &'static str, encoding: Encoding) -> Self {
Self {
topic_name: Cow::Borrowed(topic_name),
encoding,
}
}
pub fn with_topic_name(topic_name: String, encoding: Encoding) -> Self {
Self {
topic_name: Cow::Owned(topic_name),
encoding,
}
}
}
impl FromStr for WakuPubSubTopic {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
if let Ok((topic_name, encoding)) = scanf!(s, "/waku/2/{}/{:/.+?/}", String, Encoding) {
Ok(WakuPubSubTopic {
topic_name: Cow::Owned(topic_name),
encoding,
})
} else {
Err(
format!(
"Wrong pub-sub topic format. Should be `/waku/2/{{topic-name}}/{{encoding}}`. Got: {s}"
)
)
}
}
}
impl Display for WakuPubSubTopic {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "/waku/2/{}/{}", self.topic_name, self.encoding)
}
}
impl Serialize for WakuPubSubTopic {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
self.to_string().serialize(serializer)
}
}
impl<'de> Deserialize<'de> for WakuPubSubTopic {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let as_string: String = String::deserialize(deserializer)?;
as_string
.parse::<WakuPubSubTopic>()
.map_err(D::Error::custom)
}
}
mod base64_serde {
use base64::Engine;
use serde::de::Error;

View File

@ -9,16 +9,17 @@ mod node;
mod utils;
pub use node::{
waku_create_content_topic, waku_create_pubsub_topic, waku_default_pubsub_topic,
waku_discv5_update_bootnodes, waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo,
GossipSubParams, Initialized, Key, Multiaddr, Protocol, PublicKey, Running, SecretKey,
WakuLogLevel, WakuNodeConfig, WakuNodeHandle, WakuPeerData, WakuPeers, WebsocketParams,
waku_create_content_topic, waku_default_pubsub_topic, waku_discv5_update_bootnodes,
waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo, GossipSubParams, Initialized, Key, Multiaddr,
Protocol, PublicKey, Running, SecretKey, WakuLogLevel, WakuNodeConfig, WakuNodeHandle,
WakuPeerData, WakuPeers, WebsocketParams,
};
pub use general::{
ContentFilter, DecodedPayload, Encoding, FilterSubscription, MessageId, MessageIndex,
PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic,
WakuMessage, WakuMessageVersion, WakuPubSubTopic,
ContentFilter, DecodedPayload, Encoding, FilterSubscriptionDetail, FilterSubscriptionResult,
LegacyContentFilter, LegacyFilterSubscription, MessageId, MessageIndex, PagingOptions, PeerId,
ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage,
WakuMessageVersion, WakuPubSubTopic,
};
pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent};

View File

@ -7,21 +7,105 @@ use std::time::Duration;
use libc::*;
// internal
use crate::general::Result;
use crate::general::{FilterSubscription, PeerId};
use crate::utils::{get_trampoline, handle_no_response};
use crate::general::{ContentFilter, FilterSubscriptionResult, PeerId};
use crate::utils::{get_trampoline, handle_json_response, handle_no_response};
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
pub fn waku_filter_subscribe(
filter_subscription: &FilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
content_filter: &ContentFilter,
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<FilterSubscriptionResult> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("FilterSubscription should always be able to be serialized")
.expect("ContentFilter should always be able to be serialized")
.into_raw();
let peer_id_ptr = match peer_id {
None => CString::new(""),
Some(t) => CString::new(t),
}
.expect("CString should build properly from peer id")
.into_raw();
let mut response: String = Default::default();
let response_cb = |v: &str| response = v.to_string();
let code = unsafe {
let mut closure = response_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_subscribe(
content_filter_ptr,
peer_id_ptr,
timeout
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(content_filter_ptr));
drop(CString::from_raw(peer_id_ptr));
out
};
handle_json_response(code, &response)
}
/// Used to know if a service node has an active subscription for this client
/// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol
pub fn waku_filter_ping(peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_ping(
peer_id_ptr,
timeout
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(peer_id_ptr));
out
};
handle_no_response(code, &error)
}
/// Sends a requests to a service node to stop pushing messages matching this filter to this client.
/// It might be used to modify an existing subscription by providing a subset of the original filter
/// criteria
pub fn waku_filter_unsubscribe(
content_filter: &ContentFilter,
peer_id: PeerId,
timeout: Option<Duration>,
) -> Result<()> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("CString should build properly from the serialized filter subscription")
.into_raw();
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
@ -32,18 +116,22 @@ pub fn waku_filter_subscribe(
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_subscribe(
filter_subscription_ptr,
let out = waku_sys::waku_filter_unsubscribe(
content_filter_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(content_filter_ptr));
drop(CString::from_raw(peer_id_ptr));
out
@ -52,17 +140,18 @@ pub fn waku_filter_subscribe(
handle_no_response(code, &error)
}
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
pub fn waku_filter_unsubscribe(
filter_subscription: &FilterSubscription,
timeout: Duration,
/// Sends a requests to a service node (or all service nodes) to stop pushing messages
/// peerID should contain the ID of a peer this client is subscribed to, or can be None
/// to stop all active subscriptions
pub fn waku_filter_unsubscribe_all(
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("CString should build properly from the serialized filter subscription")
let peer_id_ptr = match peer_id {
None => CString::new(""),
Some(t) => CString::new(t),
}
.expect("CString should build properly from peer id")
.into_raw();
let mut error: String = Default::default();
@ -70,17 +159,21 @@ pub fn waku_filter_unsubscribe(
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_unsubscribe(
filter_subscription_ptr,
let out = waku_sys::waku_filter_unsubscribe_all(
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));
out
};

View File

@ -0,0 +1,89 @@
//! Waku [filter](https://rfc.vac.dev/spec/36/#waku-filter) protocol related methods
// std
use std::ffi::CString;
use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::Result;
use crate::general::{LegacyFilterSubscription, PeerId};
use crate::utils::{get_trampoline, handle_no_response};
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_legacy_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
pub fn waku_legacy_filter_subscribe(
filter_subscription: &LegacyFilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("FilterSubscription should always be able to be serialized")
.into_raw();
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_subscribe(
filter_subscription_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(peer_id_ptr));
out
};
handle_no_response(code, &error)
}
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
pub fn waku_legacy_filter_unsubscribe(
filter_subscription: &LegacyFilterSubscription,
timeout: Duration,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
)
.expect("CString should build properly from the serialized filter subscription")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_legacy_filter_unsubscribe(
filter_subscription_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
out
};
handle_no_response(code, &error)
}

View File

@ -3,6 +3,7 @@
mod config;
mod discovery;
mod filter;
mod legacyfilter;
mod lightpush;
mod management;
mod peers;
@ -20,14 +21,14 @@ use std::time::Duration;
// internal
use crate::general::{
FilterSubscription, MessageId, PeerId, ProtocolId, Result, StoreQuery, StoreResponse,
WakuMessage, WakuPubSubTopic,
ContentFilter, FilterSubscriptionResult, LegacyFilterSubscription, MessageId, PeerId,
ProtocolId, Result, StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic,
};
pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams};
pub use discovery::{waku_discv5_update_bootnodes, waku_dns_discovery, DnsInfo};
pub use peers::{Protocol, WakuPeerData, WakuPeers};
pub use relay::{waku_create_content_topic, waku_create_pubsub_topic, waku_default_pubsub_topic};
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};
pub use store::{waku_local_store_query, waku_store_query};
/// Shared flag to check if a waku node is already running in the current process
@ -148,8 +149,9 @@ impl WakuNodeHandle<Running> {
peers::waku_peers()
}
/// Publish a message using Waku Relay
/// Publish a message using Waku Relay.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub fn relay_publish_message(
&self,
message: &WakuMessage,
@ -164,14 +166,14 @@ impl WakuNodeHandle<Running> {
relay::waku_enough_peers(pubsub_topic)
}
/// Subscribe to a Waku Relay pubsub topic to receive messages
pub fn relay_subscribe(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
relay::waku_relay_subscribe(pubsub_topic)
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_subscribe(content_filter)
}
/// Closes the pubsub subscription to a pubsub topic. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
relay::waku_relay_unsubscribe(pubsub_topic)
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_unsubscribe(content_filter)
}
/// Returns the list of pubsub topics the node is subscribed to in Waku Relay
@ -200,8 +202,9 @@ impl WakuNodeHandle<Running> {
store::waku_local_store_query(query)
}
/// Publish a message using Waku Lightpush
/// Publish a message using Waku Lightpush.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publishchar-messagejson-char-topic-char-peerid-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub fn lightpush_publish(
&self,
message: &WakuMessage,
@ -214,23 +217,62 @@ impl WakuNodeHandle<Running> {
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
pub fn filter_subscribe(
#[deprecated]
pub fn legacy_filter_subscribe(
&self,
filter_subscription: &FilterSubscription,
filter_subscription: &LegacyFilterSubscription,
peer_id: PeerId,
timeout: Duration,
) -> Result<()> {
filter::waku_filter_subscribe(filter_subscription, peer_id, timeout)
legacyfilter::waku_legacy_filter_subscribe(filter_subscription, peer_id, timeout)
}
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
pub fn filter_unsubscribe(
#[deprecated]
pub fn legacy_filter_unsubscribe(
&self,
filter_subscription: &FilterSubscription,
filter_subscription: &LegacyFilterSubscription,
timeout: Duration,
) -> Result<()> {
filter::waku_filter_unsubscribe(filter_subscription, timeout)
legacyfilter::waku_legacy_filter_unsubscribe(filter_subscription, timeout)
}
/// Creates a subscription to a filter full node matching a content filter.
/// Returns the PeerId on which the filter subscription was created
pub fn filter_subscribe(
&self,
content_filter: &ContentFilter,
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<FilterSubscriptionResult> {
filter::waku_filter_subscribe(content_filter, peer_id, timeout)
}
/// Used to know if a service node has an active subscription for this client
pub fn filter_ping(&self, peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
filter::waku_filter_ping(peer_id, timeout)
}
/// Sends a requests to a service node to stop pushing messages matching this filter to this client.
/// It might be used to modify an existing subscription by providing a subset of the original filter
/// criteria
pub fn filter_unsubscribe(
&self,
content_filter: &ContentFilter,
peer_id: PeerId,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe(content_filter, peer_id, timeout)
}
/// Sends a requests to a service node (or all service nodes) to stop pushing messages
pub fn filter_unsubscribe_all(
&self,
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe_all(peer_id, timeout)
}
/// Update the bootnodes used by DiscoveryV5 by passing a list of ENRs

View File

@ -6,7 +6,9 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic};
use crate::general::{
ContentFilter, Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
@ -55,37 +57,6 @@ pub fn waku_create_content_topic(
.expect("&str from result should always be extracted")
}
/// Create a pubsub topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding)
pub fn waku_create_pubsub_topic(topic_name: &str, encoding: Encoding) -> WakuPubSubTopic {
let topic_name_ptr = CString::new(topic_name)
.expect("Topic name should always transform to CString")
.into_raw();
let encoding_ptr = CString::new(encoding.to_string())
.expect("Encoding should always transform to CString")
.into_raw();
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_pubsub_topic(
topic_name_ptr,
encoding_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(topic_name_ptr));
drop(CString::from_raw(encoding_ptr));
out
};
handle_response(code, &result).expect("&str from result should always be extracted")
}
/// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/)
pub fn waku_default_pubsub_topic() -> WakuPubSubTopic {
let mut result: String = Default::default();
@ -191,27 +162,25 @@ pub fn waku_enough_peers(pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool>
handle_response(code, &result)
}
pub fn waku_relay_subscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_default_pubsub_topic)
.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw();
pub fn waku_relay_subscribe(content_filter: &ContentFilter) -> Result<()> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("ContentFilter should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
pubsub_topic_ptr,
content_filter_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_filter_ptr));
out
};
@ -219,27 +188,25 @@ pub fn waku_relay_subscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()>
handle_no_response(code, &error)
}
pub fn waku_relay_unsubscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_default_pubsub_topic)
.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw();
pub fn waku_relay_unsubscribe(content_filter: &ContentFilter) -> Result<()> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("ContentFilter should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
pubsub_topic_ptr,
content_filter_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_filter_ptr));
out
};

View File

@ -10,21 +10,27 @@ pub fn decode<T: DeserializeOwned>(input: &str) -> Result<T> {
}
unsafe extern "C" fn trampoline<F>(
_ret_code: ::std::os::raw::c_int,
data: *const ::std::os::raw::c_char,
user_data: *mut ::std::os::raw::c_void,
) where
F: FnMut(&str),
{
let user_data = &mut *(user_data as *mut F);
let response = unsafe { CStr::from_ptr(data) }
.to_str()
.map_err(|err| {
format!(
"could not retrieve response from pointer returned by waku: {}",
err
)
})
.expect("could not retrieve response");
let response = if data.is_null() {
""
} else {
unsafe { CStr::from_ptr(data) }
.to_str()
.map_err(|err| {
format!(
"could not retrieve response from pointer returned by waku: {}",
err
)
})
.expect("could not retrieve response")
};
user_data(response);
}

View File

@ -10,9 +10,9 @@ use std::{collections::HashSet, str::from_utf8};
use tokio::sync::mpsc::{self, Sender};
use tokio::time;
use waku_bindings::{
waku_default_pubsub_topic, waku_new, waku_set_event_callback, Encoding, Event, GossipSubParams,
Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel, WakuMessage,
WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic,
waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Encoding, Event,
GossipSubParams, Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel,
WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic,
};
const ECHO_TIMEOUT: u64 = 10;
@ -157,7 +157,8 @@ async fn discv5_echo() -> Result<(), String> {
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// Subscribe to default channel.
node.relay_subscribe(None)?;
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node.relay_subscribe(&content_filter)?;
let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto);
let topics = node.relay_topics()?;
@ -215,7 +216,8 @@ async fn default_echo() -> Result<(), String> {
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// subscribe to default channel
node.relay_subscribe(None)?;
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node.relay_subscribe(&content_filter)?;
let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto);
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));

View File

@ -1,6 +1,6 @@
[package]
name = "waku-sys"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"

@ -1 +1 @@
Subproject commit b3bd45f01f1211cb18fb44ced5277758ab38eee7
Subproject commit 02f2800b046094f73d1011081daef7d897126687