feat: waku_filter_subscribe

This commit is contained in:
Richard Ramos 2023-10-20 15:50:28 -04:00
parent d0b35f4aaf
commit c04eb6b7ee
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
15 changed files with 215 additions and 160 deletions

4
Cargo.lock generated
View File

@ -1674,7 +1674,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waku-bindings"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"aes-gcm",
"base64 0.21.0",
@ -1698,7 +1698,7 @@ dependencies = [
[[package]]
name = "waku-sys"
version = "0.4.0"
version = "0.5.0"
dependencies = [
"bindgen",
]

5
examples/Cargo.lock generated
View File

@ -1497,12 +1497,13 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waku-bindings"
version = "0.1.1"
version = "0.4.0"
dependencies = [
"aes-gcm",
"base64 0.21.0",
"enr",
"hex",
"libc",
"multiaddr",
"once_cell",
"rand",
@ -1517,7 +1518,7 @@ dependencies = [
[[package]]
name = "waku-sys"
version = "0.1.0"
version = "0.3.0"
dependencies = [
"bindgen",
]

View File

@ -23,8 +23,8 @@ use tui::{
};
use unicode_width::UnicodeWidthStr;
use waku_bindings::{
waku_new, waku_set_event_callback, ContentFilter, Multiaddr, PagingOptions, ProtocolId,
Running, StoreQuery, WakuMessage, WakuNodeHandle,
waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Multiaddr,
PagingOptions, ProtocolId, Running, StoreQuery, WakuMessage, WakuNodeHandle,
};
enum InputMode {
@ -76,7 +76,7 @@ fn retrieve_history(
let result = node_handle.store_query(
&StoreQuery {
pubsub_topic: None,
content_filters: vec![ContentFilter::new(TOY_CHAT_CONTENT_TOPIC.clone())],
content_topics: vec![TOY_CHAT_CONTENT_TOPIC.clone()],
start_time: Some(
(Duration::from_secs(Utc::now().timestamp() as u64)
- Duration::from_secs(60 * 60 * 24))
@ -110,7 +110,9 @@ fn setup_node_handle() -> std::result::Result<WakuNodeHandle<Running>, Box<dyn E
let peerid = node_handle.add_peer(&address, ProtocolId::Relay)?;
node_handle.connect_peer_with_id(&peerid, None)?;
}
node_handle.relay_subscribe(None)?;
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node_handle.relay_subscribe(&content_filter)?;
Ok(node_handle)
}
@ -207,10 +209,11 @@ fn run_app<B: Backend>(
meta,
false,
);
if let Err(e) =
app.node_handle
.relay_publish_message(&waku_message, None, None)
{
if let Err(e) = app.node_handle.relay_publish_message(
&waku_message,
Some(waku_default_pubsub_topic()),
None,
) {
let mut out = std::io::stderr();
write!(out, "{e:?}").unwrap();
}

View File

@ -1,6 +1,6 @@
[package]
name = "waku-bindings"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"
@ -26,7 +26,7 @@ serde_json = "1.0"
sscanf = "0.4"
smart-default = "0.6"
url = "2.3"
waku-sys = { version = "0.4.0", path = "../waku-sys" }
waku-sys = { version = "0.5.0", path = "../waku-sys" }
libc = "0.2"
[dev-dependencies]

View File

@ -5,7 +5,7 @@
//! When an event is emitted, this callback will be triggered receiving a [`Signal`]
// std
use std::ffi::{c_char, c_void, CStr};
use std::ffi::{c_char, c_int, c_void, CStr};
use std::ops::Deref;
use std::sync::Mutex;
// crates
@ -79,7 +79,7 @@ fn set_callback<F: FnMut(Signal) + Send + Sync + 'static>(f: F) {
/// Wrapper callback, it transformst the `*const c_char` into a [`Signal`]
/// and executes the [`CALLBACK`] funtion with it
extern "C" fn callback(data: *const c_char, _user_data: *mut c_void) {
extern "C" fn callback(_ret_code: c_int, data: *const c_char, _user_data: *mut c_void) {
let raw_response = unsafe { CStr::from_ptr(data) }
.to_str()
.expect("Not null ptr");

View File

@ -199,12 +199,12 @@ impl DecodedPayload {
/// as per the [specification](https://rfc.vac.dev/spec/36/#contentfilter-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ContentFilter {
pub struct LegacyContentFilter {
/// The content topic of a Waku message
content_topic: WakuContentTopic,
}
impl ContentFilter {
impl LegacyContentFilter {
pub fn new(content_topic: WakuContentTopic) -> Self {
Self { content_topic }
}
@ -217,7 +217,6 @@ impl ContentFilter {
/// The criteria to create subscription to a light node in JSON Format
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[deprecated]
#[serde(rename_all = "camelCase")]
pub struct LegacyFilterSubscription {
/// Array of [`ContentFilter`] being subscribed to / unsubscribed from
@ -244,20 +243,19 @@ impl LegacyFilterSubscription {
}
/// The criteria to create subscription to a filter full node matching a content filter.
/// as per the [specification](https://rfc.vac.dev/spec/36/#filtersubscription-type)
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscription {
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
pub struct ContentFilter {
/// optional if using autosharding, mandatory if using static or named sharding.
pubsub_topic: Option<WakuPubSubTopic>,
/// mandatory, at least one required, with a max of 10
content_topics: Vec<WakuContentTopic>,
}
impl FilterSubscription {
impl ContentFilter {
pub fn new(
content_topics: Vec<WakuContentTopic>,
pubsub_topic: Option<WakuPubSubTopic>,
content_topics: Vec<WakuContentTopic>,
) -> Self {
Self {
content_topics,
@ -274,14 +272,73 @@ impl FilterSubscription {
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionDetail {
#[serde(rename = "peerID")]
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
}
impl FilterSubscriptionDetail {
pub fn new(
peer_id: PeerId,
content_topics: Vec<WakuContentTopic>,
pubsub_topic: WakuPubSubTopic,
) -> Self {
Self {
peer_id,
content_topics,
pubsub_topic,
}
}
pub fn peer_id(&self) -> &PeerId {
&self.peer_id
}
pub fn content_topics(&self) -> &[WakuContentTopic] {
&self.content_topics
}
pub fn pubsub_topic(&self) -> &WakuPubSubTopic {
&self.pubsub_topic
}
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct FilterSubscriptionResult {
subscriptions: Vec<FilterSubscriptionDetail>,
error: Option<String>,
}
impl FilterSubscriptionResult {
pub fn new(subscriptions: Vec<FilterSubscriptionDetail>, error: Option<String>) -> Self {
Self {
subscriptions,
error,
}
}
pub fn subscriptions(&self) -> &[FilterSubscriptionDetail] {
&self.subscriptions
}
pub fn error(&self) -> &Option<String> {
&self.error
}
}
/// Criteria used to retrieve historical messages
#[derive(Clone, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StoreQuery {
/// The pubsub topic on which messages are published
pub pubsub_topic: Option<WakuPubSubTopic>,
/// Array of [`ContentFilter`] to query for historical messages
pub content_filters: Vec<ContentFilter>,
/// Array of [`WakuContentTopic`] to query for historical messages
pub content_topics: Vec<WakuContentTopic>,
/// The inclusive lower bound on the timestamp of queried messages.
/// This field holds the Unix epoch time in nanoseconds
pub start_time: Option<usize>,

View File

@ -9,16 +9,17 @@ mod node;
mod utils;
pub use node::{
waku_create_content_topic, waku_create_pubsub_topic, waku_default_pubsub_topic,
waku_discv5_update_bootnodes, waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo,
GossipSubParams, Initialized, Key, Multiaddr, Protocol, PublicKey, Running, SecretKey,
WakuLogLevel, WakuNodeConfig, WakuNodeHandle, WakuPeerData, WakuPeers, WebsocketParams,
waku_create_content_topic, waku_default_pubsub_topic, waku_discv5_update_bootnodes,
waku_dns_discovery, waku_new, Aes256Gcm, DnsInfo, GossipSubParams, Initialized, Key, Multiaddr,
Protocol, PublicKey, Running, SecretKey, WakuLogLevel, WakuNodeConfig, WakuNodeHandle,
WakuPeerData, WakuPeers, WebsocketParams,
};
pub use general::{
ContentFilter, DecodedPayload, Encoding, FilterSubscription, LegacyFilterSubscription,
MessageId, MessageIndex, PagingOptions, PeerId, ProtocolId, Result, StoreQuery, StoreResponse,
WakuContentTopic, WakuMessage, WakuMessageVersion, WakuPubSubTopic,
ContentFilter, DecodedPayload, Encoding, FilterSubscriptionDetail, FilterSubscriptionResult,
LegacyContentFilter, LegacyFilterSubscription, MessageId, MessageIndex, PagingOptions, PeerId,
ProtocolId, Result, StoreQuery, StoreResponse, WakuContentTopic, WakuMessage,
WakuMessageVersion, WakuPubSubTopic,
};
pub use events::{waku_set_event_callback, Event, Signal, WakuMessageEvent};

View File

@ -7,21 +7,21 @@ use std::time::Duration;
use libc::*;
// internal
use crate::general::Result;
use crate::general::{FilterSubscription, PeerId};
use crate::utils::{get_trampoline, handle_response, handle_no_response};
use crate::general::{ContentFilter, FilterSubscriptionResult, PeerId};
use crate::utils::{get_trampoline, handle_json_response, handle_no_response};
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
pub fn waku_filter_subscribe(
filter_subscription: &FilterSubscription,
content_filter: &ContentFilter,
peer_id: Option<PeerId>,
timeout: Duration,
) -> Result<PeerId> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
timeout: Option<Duration>,
) -> Result<FilterSubscriptionResult> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("FilterSubscription should always be able to be serialized")
.expect("ContentFilter should always be able to be serialized")
.into_raw();
let peer_id_ptr = match peer_id {
None => CString::new(""),
@ -36,29 +36,32 @@ pub fn waku_filter_subscribe(
let mut closure = response_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_subscribe(
filter_subscription_ptr,
content_filter_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(content_filter_ptr));
drop(CString::from_raw(peer_id_ptr));
out
};
TODO: extract the peerID from here?
handle_response(code, &response)
handle_json_response(code, &response)
}
/// Used to know if a service node has an active subscription for this client
/// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol
pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> {
pub fn waku_filter_ping(peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
let peer_id_ptr = CString::new(peer_id)
.expect("PeerId should always be able to be serialized")
.into_raw();
@ -71,9 +74,13 @@ pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> {
let out = waku_sys::waku_filter_ping(
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
@ -90,13 +97,13 @@ pub fn waku_filter_ping(peer_id: PeerId, timeout: Duration) -> Result<()> {
/// It might be used to modify an existing subscription by providing a subset of the original filter
/// criteria
pub fn waku_filter_unsubscribe(
filter_subscription: &FilterSubscription,
content_filter: &ContentFilter,
peer_id: PeerId,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<()> {
let filter_subscription_ptr = CString::new(
serde_json::to_string(filter_subscription)
.expect("FilterSubscription should always succeed to serialize"),
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("CString should build properly from the serialized filter subscription")
.into_raw();
@ -110,17 +117,21 @@ pub fn waku_filter_unsubscribe(
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_filter_unsubscribe(
filter_subscription_ptr,
content_filter_ptr,
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(filter_subscription_ptr));
drop(CString::from_raw(content_filter_ptr));
drop(CString::from_raw(peer_id_ptr));
out
@ -132,7 +143,10 @@ pub fn waku_filter_unsubscribe(
/// Sends a requests to a service node (or all service nodes) to stop pushing messages
/// peerID should contain the ID of a peer this client is subscribed to, or can be None
/// to stop all active subscriptions
pub fn waku_filter_unsubscribe_all(peer_id: Option<PeerId>, timeout: Duration) -> Result<()> {
pub fn waku_filter_unsubscribe_all(
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<()> {
let peer_id_ptr = match peer_id {
None => CString::new(""),
Some(t) => CString::new(t),
@ -148,9 +162,13 @@ pub fn waku_filter_unsubscribe_all(peer_id: Option<PeerId>, timeout: Duration) -
let out = waku_sys::waku_filter_unsubscribe_all(
peer_id_ptr,
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32"),
.map(|timeout| {
timeout
.as_millis()
.try_into()
.expect("Duration as milliseconds should fit in a i32")
})
.unwrap_or(0),
cb,
&mut closure as *mut _ as *mut c_void,
);

View File

@ -12,7 +12,6 @@ use crate::utils::{get_trampoline, handle_no_response};
/// Creates a subscription in a lightnode for messages that matches a content filter and optionally a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_legacy_filter_subscribechar-filterjson-char-peerid-int-timeoutms)
#[deprecated]
pub fn waku_legacy_filter_subscribe(
filter_subscription: &LegacyFilterSubscription,
peer_id: PeerId,
@ -55,7 +54,6 @@ pub fn waku_legacy_filter_subscribe(
/// Removes subscriptions in a light node matching a content filter and, optionally, a [`WakuPubSubTopic`](`crate::general::WakuPubSubTopic`)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_filter_unsubscribechar-filterjson-int-timeoutms)
#[deprecated]
pub fn waku_legacy_filter_unsubscribe(
filter_subscription: &LegacyFilterSubscription,
timeout: Duration,

View File

@ -21,14 +21,14 @@ use std::time::Duration;
// internal
use crate::general::{
FilterSubscription, LegacyFilterSubscription, MessageId, PeerId, ProtocolId, Result,
StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic,
ContentFilter, FilterSubscriptionResult, LegacyFilterSubscription, MessageId, PeerId,
ProtocolId, Result, StoreQuery, StoreResponse, WakuMessage, WakuPubSubTopic,
};
pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams};
pub use discovery::{waku_discv5_update_bootnodes, waku_dns_discovery, DnsInfo};
pub use peers::{Protocol, WakuPeerData, WakuPeers};
pub use relay::{waku_create_content_topic, waku_create_pubsub_topic, waku_default_pubsub_topic};
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};
pub use store::{waku_local_store_query, waku_store_query};
/// Shared flag to check if a waku node is already running in the current process
@ -149,8 +149,9 @@ impl WakuNodeHandle<Running> {
peers::waku_peers()
}
/// Publish a message using Waku Relay
/// Publish a message using Waku Relay.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_relay_publishchar-messagejson-char-pubsubtopic-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub fn relay_publish_message(
&self,
message: &WakuMessage,
@ -165,14 +166,14 @@ impl WakuNodeHandle<Running> {
relay::waku_enough_peers(pubsub_topic)
}
/// Subscribe to a Waku Relay pubsub topic to receive messages
pub fn relay_subscribe(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
relay::waku_relay_subscribe(pubsub_topic)
/// Subscribe to WakuRelay to receive messages matching a content filter.
pub fn relay_subscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_subscribe(content_filter)
}
/// Closes the pubsub subscription to a pubsub topic. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
relay::waku_relay_unsubscribe(pubsub_topic)
/// Closes the pubsub subscription to stop receiving messages matching a content filter. No more messages will be received from this pubsub topic
pub fn relay_unsubscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_unsubscribe(content_filter)
}
/// Returns the list of pubsub topics the node is subscribed to in Waku Relay
@ -201,8 +202,9 @@ impl WakuNodeHandle<Running> {
store::waku_local_store_query(query)
}
/// Publish a message using Waku Lightpush
/// Publish a message using Waku Lightpush.
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_lightpush_publishchar-messagejson-char-topic-char-peerid-int-timeoutms)
/// The pubsub_topic parameter is optional and if not specified it will be derived from the contentTopic.
pub fn lightpush_publish(
&self,
message: &WakuMessage,
@ -240,15 +242,15 @@ impl WakuNodeHandle<Running> {
/// Returns the PeerId on which the filter subscription was created
pub fn filter_subscribe(
&self,
filter_subscription: &FilterSubscription,
content_filter: &ContentFilter,
peer_id: Option<PeerId>,
timeout: Duration,
) -> Result<PeerId> {
filter::waku_filter_subscribe(filter_subscription, peer_id, timeout)
timeout: Option<Duration>,
) -> Result<FilterSubscriptionResult> {
filter::waku_filter_subscribe(content_filter, peer_id, timeout)
}
/// Used to know if a service node has an active subscription for this client
pub fn filter_ping(&self, peer_id: PeerId, timeout: Duration) -> Result<()> {
pub fn filter_ping(&self, peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
filter::waku_filter_ping(peer_id, timeout)
}
@ -257,18 +259,18 @@ impl WakuNodeHandle<Running> {
/// criteria
pub fn filter_unsubscribe(
&self,
filter_subscription: &FilterSubscription,
content_filter: &ContentFilter,
peer_id: PeerId,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe(filter_subscription, peer_id, timeout)
filter::waku_filter_unsubscribe(content_filter, peer_id, timeout)
}
/// Sends a requests to a service node (or all service nodes) to stop pushing messages
pub fn filter_unsubscribe_all(
&self,
peer_id: Option<PeerId>,
timeout: Duration,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe_all(peer_id, timeout)
}

View File

@ -6,7 +6,9 @@ use std::time::Duration;
// crates
use libc::*;
// internal
use crate::general::{Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic};
use crate::general::{
ContentFilter, Encoding, MessageId, Result, WakuContentTopic, WakuMessage, WakuPubSubTopic,
};
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Create a content topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
@ -55,37 +57,6 @@ pub fn waku_create_content_topic(
.expect("&str from result should always be extracted")
}
/// Create a pubsub topic according to [RFC 23](https://rfc.vac.dev/spec/23/)
/// As per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_pubsub_topicchar-name-char-encoding)
pub fn waku_create_pubsub_topic(topic_name: &str, encoding: Encoding) -> WakuPubSubTopic {
let topic_name_ptr = CString::new(topic_name)
.expect("Topic name should always transform to CString")
.into_raw();
let encoding_ptr = CString::new(encoding.to_string())
.expect("Encoding should always transform to CString")
.into_raw();
let mut result: String = Default::default();
let result_cb = |v: &str| result = v.to_string();
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_pubsub_topic(
topic_name_ptr,
encoding_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(topic_name_ptr));
drop(CString::from_raw(encoding_ptr));
out
};
handle_response(code, &result).expect("&str from result should always be extracted")
}
/// Default pubsub topic used for exchanging waku messages defined in [RFC 10](https://rfc.vac.dev/spec/10/)
pub fn waku_default_pubsub_topic() -> WakuPubSubTopic {
let mut result: String = Default::default();
@ -191,27 +162,25 @@ pub fn waku_enough_peers(pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool>
handle_response(code, &result)
}
pub fn waku_relay_subscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_default_pubsub_topic)
.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw();
pub fn waku_relay_subscribe(content_filter: &ContentFilter) -> Result<()> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("ContentFilter should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
pubsub_topic_ptr,
content_filter_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_filter_ptr));
out
};
@ -219,27 +188,25 @@ pub fn waku_relay_subscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()>
handle_no_response(code, &error)
}
pub fn waku_relay_unsubscribe(pubsub_topic: Option<WakuPubSubTopic>) -> Result<()> {
let pubsub_topic = pubsub_topic
.unwrap_or_else(waku_default_pubsub_topic)
.to_string();
let pubsub_topic_ptr = CString::new(pubsub_topic)
.expect("CString should build properly from pubsub topic")
.into_raw();
pub fn waku_relay_unsubscribe(content_filter: &ContentFilter) -> Result<()> {
let content_filter_ptr = CString::new(
serde_json::to_string(content_filter)
.expect("ContentFilter should always succeed to serialize"),
)
.expect("ContentFilter should always be able to be serialized")
.into_raw();
let mut error: String = Default::default();
let error_cb = |v: &str| error = v.to_string();
let code = unsafe {
let mut closure = error_cb;
let cb = get_trampoline(&closure);
let out = waku_sys::waku_relay_subscribe(
pubsub_topic_ptr,
content_filter_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
);
drop(CString::from_raw(pubsub_topic_ptr));
drop(CString::from_raw(content_filter_ptr));
out
};

View File

@ -10,21 +10,27 @@ pub fn decode<T: DeserializeOwned>(input: &str) -> Result<T> {
}
unsafe extern "C" fn trampoline<F>(
_ret_code: ::std::os::raw::c_int,
data: *const ::std::os::raw::c_char,
user_data: *mut ::std::os::raw::c_void,
) where
F: FnMut(&str),
{
let user_data = &mut *(user_data as *mut F);
let response = unsafe { CStr::from_ptr(data) }
.to_str()
.map_err(|err| {
format!(
"could not retrieve response from pointer returned by waku: {}",
err
)
})
.expect("could not retrieve response");
let response = if data.is_null() {
""
} else {
unsafe { CStr::from_ptr(data) }
.to_str()
.map_err(|err| {
format!(
"could not retrieve response from pointer returned by waku: {}",
err
)
})
.expect("could not retrieve response")
};
user_data(response);
}

View File

@ -10,9 +10,9 @@ use std::{collections::HashSet, str::from_utf8};
use tokio::sync::mpsc::{self, Sender};
use tokio::time;
use waku_bindings::{
waku_default_pubsub_topic, waku_new, waku_set_event_callback, Encoding, Event, GossipSubParams,
Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel, WakuMessage,
WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic,
waku_default_pubsub_topic, waku_new, waku_set_event_callback, ContentFilter, Encoding, Event,
GossipSubParams, Key, MessageId, ProtocolId, Running, WakuContentTopic, WakuLogLevel,
WakuMessage, WakuNodeConfig, WakuNodeHandle, WakuPubSubTopic,
};
const ECHO_TIMEOUT: u64 = 10;
@ -157,7 +157,8 @@ async fn discv5_echo() -> Result<(), String> {
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// Subscribe to default channel.
node.relay_subscribe(None)?;
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node.relay_subscribe(&content_filter)?;
let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto);
let topics = node.relay_topics()?;
@ -215,7 +216,8 @@ async fn default_echo() -> Result<(), String> {
let ssk = Aes256Gcm::generate_key(&mut thread_rng());
// subscribe to default channel
node.relay_subscribe(None)?;
let content_filter = ContentFilter::new(Some(waku_default_pubsub_topic()), vec![]);
node.relay_subscribe(&content_filter)?;
let content_topic = WakuContentTopic::new("toychat", 2, "huilong", Encoding::Proto);
let sleep = time::sleep(Duration::from_secs(ECHO_TIMEOUT));

View File

@ -1,6 +1,6 @@
[package]
name = "waku-sys"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
authors = [
"Daniel Sanchez Quiros <danielsq@status.im>"

@ -1 +1 @@
Subproject commit b3bd45f01f1211cb18fb44ced5277758ab38eee7
Subproject commit 02f2800b046094f73d1011081daef7d897126687