Some code simplification in filter

This commit is contained in:
Ivan Folgueira Bande 2025-01-05 22:38:49 +01:00
parent d758d7273f
commit 863d3aee9c
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
4 changed files with 37 additions and 89 deletions

View File

@ -89,14 +89,14 @@ impl TicTacToeApp<Initialized> {
let waku = self.waku.start().await.expect("waku should start");
// Subscribe to desired topic using the relay protocol
waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe");
// waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe");
// Example filter subscription. This is needed in edge nodes (resource-restricted devices)
// Nodes usually use either relay or lightpush/filter protocols
// let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
// let content_topics = vec![ctopic];
// waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe");
let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto);
let content_topics = vec![ctopic];
waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe");
// End filter example ----------------------------------------
@ -137,14 +137,18 @@ impl TicTacToeApp<Running> {
false,
);
if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await {
dbg!(format!("message hash published: {}", msg_hash));
}
// if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await {
// dbg!(format!("message hash published: {}", msg_hash));
// }
// Example lightpush publish message. This is needed in edge nodes (resource-restricted devices)
// Nodes usually use either relay or lightpush/filter protocols
//
// self.waku.lightpush_publish_message(&message, &self.game_topic);
let msg_hash_ret = self.waku.lightpush_publish_message(&message, &self.game_topic).await;
match msg_hash_ret {
Ok(msg_hash) => println!("Published message hash {:?}", msg_hash.to_string()),
Err(error) => println!("Failed to publish with lightpush: {}", error)
}
// End example lightpush publish message
}

View File

@ -30,7 +30,7 @@ impl TryFrom<(u32, &str)> for LibwakuResponse {
}
}
/// Used in cases where the FFI call doesn't return additional infomation in the
/// Used in cases where the FFI call doesn't return additional information in the
/// callback. Instead, it returns RET_OK, RET_ERR, etc.
pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> {
if result == LibwakuResponse::Undefined && code as u32 == RET_OK {

View File

@ -2,16 +2,12 @@
// std
use std::ffi::CString;
// crates
use libc::*;
use std::sync::Arc;
use tokio::sync::Notify;
// internal
use crate::general::contenttopic::WakuContentTopic;
use crate::general::libwaku_response::{handle_no_response, LibwakuResponse};
use crate::general::pubsubtopic::PubsubTopic;
use crate::general::Result;
use crate::macros::get_trampoline;
use crate::handle_ffi_call;
use crate::node::context::WakuNodeContext;
pub async fn waku_filter_subscribe(
@ -19,38 +15,19 @@ pub async fn waku_filter_subscribe(
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>,
) -> Result<()> {
let content_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic");
let pubsub_topic_ptr = pubsub_topic.as_ptr();
let content_topics = WakuContentTopic::join_content_topics(content_topics);
let content_topics =
CString::new(content_topics).expect("CString should build properly from content topic");
let content_topics_ptr = content_topics.as_ptr();
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_filter_subscribe(
ctx.get_ptr(),
pubsub_topic_ptr,
content_topics_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
)
};
notify.notified().await; // Wait until a result is received
handle_no_response(code, result)
handle_ffi_call!(
waku_sys::waku_filter_subscribe,
handle_no_response,
ctx.get_ptr(),
pubsub_topic.as_ptr(),
content_topics.as_ptr()
)
}
pub async fn waku_filter_unsubscribe(
@ -58,57 +35,25 @@ pub async fn waku_filter_unsubscribe(
pubsub_topic: &PubsubTopic,
content_topics: Vec<WakuContentTopic>, // comma-separated list of content topics
) -> Result<()> {
let content_topics_topics = WakuContentTopic::join_content_topics(content_topics);
let pubsub_topic = CString::new(String::from(pubsub_topic))
.expect("CString should build properly from pubsub topic");
let pubsub_topic_ptr = pubsub_topic.as_ptr();
let content_topics = WakuContentTopic::join_content_topics(content_topics);
let content_topics =
CString::new(content_topics).expect("CString should build properly from content topic");
let content_topics_topics = CString::new(content_topics_topics)
.expect("CString should build properly from content topic");
let content_topics_topics_ptr = content_topics_topics.as_ptr();
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_filter_unsubscribe(
ctx.get_ptr(),
pubsub_topic_ptr,
content_topics_topics_ptr,
cb,
&mut closure as *mut _ as *mut c_void,
)
};
notify.notified().await; // Wait until a result is received
handle_no_response(code, result)
handle_ffi_call!(
waku_sys::waku_filter_unsubscribe,
handle_no_response,
ctx.get_ptr(),
pubsub_topic.as_ptr(),
content_topics.as_ptr()
)
}
pub async fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> {
let mut result = LibwakuResponse::default();
let notify = Arc::new(Notify::new());
let notify_clone = notify.clone();
let result_cb = |r: LibwakuResponse| {
result = r;
notify_clone.notify_one(); // Notify that the value has been updated
};
let code = unsafe {
let mut closure = result_cb;
let cb = get_trampoline(&closure);
waku_sys::waku_filter_unsubscribe_all(
ctx.get_ptr(),
cb,
&mut closure as *mut _ as *mut c_void,
)
};
notify.notified().await; // Wait until a result is received
handle_no_response(code, result)
handle_ffi_call!(
waku_sys::waku_filter_unsubscribe_all,
handle_no_response,
ctx.get_ptr()
)
}

View File

@ -2,7 +2,6 @@
// std
use std::ffi::CString;
// crates
// internal
use crate::general::libwaku_response::{handle_response, LibwakuResponse};
use crate::general::{messagehash::MessageHash, Result, WakuMessage};