From 863d3aee9c2521fb29c379cfa1ee01fd2586b4f1 Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande Date: Sun, 5 Jan 2025 22:38:49 +0100 Subject: [PATCH] Some code simplification in filter --- examples/tic-tac-toe-gui/src/main.rs | 20 ++-- waku-bindings/src/general/libwaku_response.rs | 2 +- waku-bindings/src/node/filter.rs | 103 ++++-------------- waku-bindings/src/node/lightpush.rs | 1 - 4 files changed, 37 insertions(+), 89 deletions(-) diff --git a/examples/tic-tac-toe-gui/src/main.rs b/examples/tic-tac-toe-gui/src/main.rs index 7009741..9c96683 100644 --- a/examples/tic-tac-toe-gui/src/main.rs +++ b/examples/tic-tac-toe-gui/src/main.rs @@ -89,14 +89,14 @@ impl TicTacToeApp { let waku = self.waku.start().await.expect("waku should start"); // Subscribe to desired topic using the relay protocol - waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe"); + // waku.relay_subscribe(&self.game_topic).await.expect("waku should subscribe"); // Example filter subscription. This is needed in edge nodes (resource-restricted devices) // Nodes usually use either relay or lightpush/filter protocols - // let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); - // let content_topics = vec![ctopic]; - // waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe"); + let ctopic = WakuContentTopic::new("waku", "2", "tictactoegame", Encoding::Proto); + let content_topics = vec![ctopic]; + waku.filter_subscribe(&self.game_topic, content_topics).await.expect("waku should subscribe"); // End filter example ---------------------------------------- @@ -137,14 +137,18 @@ impl TicTacToeApp { false, ); - if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await { - dbg!(format!("message hash published: {}", msg_hash)); - } + // if let Ok(msg_hash) = self.waku.relay_publish_message(&message, &self.game_topic, None).await { + // dbg!(format!("message hash published: {}", msg_hash)); + // } // Example lightpush publish message. This is needed in edge nodes (resource-restricted devices) // Nodes usually use either relay or lightpush/filter protocols // - // self.waku.lightpush_publish_message(&message, &self.game_topic); + let msg_hash_ret = self.waku.lightpush_publish_message(&message, &self.game_topic).await; + match msg_hash_ret { + Ok(msg_hash) => println!("Published message hash {:?}", msg_hash.to_string()), + Err(error) => println!("Failed to publish with lightpush: {}", error) + } // End example lightpush publish message } diff --git a/waku-bindings/src/general/libwaku_response.rs b/waku-bindings/src/general/libwaku_response.rs index d73953e..cd66bd8 100644 --- a/waku-bindings/src/general/libwaku_response.rs +++ b/waku-bindings/src/general/libwaku_response.rs @@ -30,7 +30,7 @@ impl TryFrom<(u32, &str)> for LibwakuResponse { } } -/// Used in cases where the FFI call doesn't return additional infomation in the +/// Used in cases where the FFI call doesn't return additional information in the /// callback. Instead, it returns RET_OK, RET_ERR, etc. pub fn handle_no_response(code: i32, result: LibwakuResponse) -> Result<()> { if result == LibwakuResponse::Undefined && code as u32 == RET_OK { diff --git a/waku-bindings/src/node/filter.rs b/waku-bindings/src/node/filter.rs index 0440bba..51b8267 100644 --- a/waku-bindings/src/node/filter.rs +++ b/waku-bindings/src/node/filter.rs @@ -2,16 +2,12 @@ // std use std::ffi::CString; -// crates -use libc::*; -use std::sync::Arc; -use tokio::sync::Notify; // internal use crate::general::contenttopic::WakuContentTopic; use crate::general::libwaku_response::{handle_no_response, LibwakuResponse}; use crate::general::pubsubtopic::PubsubTopic; use crate::general::Result; -use crate::macros::get_trampoline; +use crate::handle_ffi_call; use crate::node::context::WakuNodeContext; pub async fn waku_filter_subscribe( @@ -19,38 +15,19 @@ pub async fn waku_filter_subscribe( pubsub_topic: &PubsubTopic, content_topics: Vec, ) -> Result<()> { - let content_topics = WakuContentTopic::join_content_topics(content_topics); - let pubsub_topic = CString::new(String::from(pubsub_topic)) .expect("CString should build properly from pubsub topic"); - let pubsub_topic_ptr = pubsub_topic.as_ptr(); - + let content_topics = WakuContentTopic::join_content_topics(content_topics); let content_topics = CString::new(content_topics).expect("CString should build properly from content topic"); - let content_topics_ptr = content_topics.as_ptr(); - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_filter_subscribe( - ctx.get_ptr(), - pubsub_topic_ptr, - content_topics_ptr, - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!( + waku_sys::waku_filter_subscribe, + handle_no_response, + ctx.get_ptr(), + pubsub_topic.as_ptr(), + content_topics.as_ptr() + ) } pub async fn waku_filter_unsubscribe( @@ -58,57 +35,25 @@ pub async fn waku_filter_unsubscribe( pubsub_topic: &PubsubTopic, content_topics: Vec, // comma-separated list of content topics ) -> Result<()> { - let content_topics_topics = WakuContentTopic::join_content_topics(content_topics); - let pubsub_topic = CString::new(String::from(pubsub_topic)) .expect("CString should build properly from pubsub topic"); - let pubsub_topic_ptr = pubsub_topic.as_ptr(); + let content_topics = WakuContentTopic::join_content_topics(content_topics); + let content_topics = + CString::new(content_topics).expect("CString should build properly from content topic"); - let content_topics_topics = CString::new(content_topics_topics) - .expect("CString should build properly from content topic"); - let content_topics_topics_ptr = content_topics_topics.as_ptr(); - - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_filter_unsubscribe( - ctx.get_ptr(), - pubsub_topic_ptr, - content_topics_topics_ptr, - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!( + waku_sys::waku_filter_unsubscribe, + handle_no_response, + ctx.get_ptr(), + pubsub_topic.as_ptr(), + content_topics.as_ptr() + ) } pub async fn waku_filter_unsubscribe_all(ctx: &WakuNodeContext) -> Result<()> { - let mut result = LibwakuResponse::default(); - let notify = Arc::new(Notify::new()); - let notify_clone = notify.clone(); - let result_cb = |r: LibwakuResponse| { - result = r; - notify_clone.notify_one(); // Notify that the value has been updated - }; - let code = unsafe { - let mut closure = result_cb; - let cb = get_trampoline(&closure); - waku_sys::waku_filter_unsubscribe_all( - ctx.get_ptr(), - cb, - &mut closure as *mut _ as *mut c_void, - ) - }; - - notify.notified().await; // Wait until a result is received - handle_no_response(code, result) + handle_ffi_call!( + waku_sys::waku_filter_unsubscribe_all, + handle_no_response, + ctx.get_ptr() + ) } diff --git a/waku-bindings/src/node/lightpush.rs b/waku-bindings/src/node/lightpush.rs index 060a152..24a5536 100644 --- a/waku-bindings/src/node/lightpush.rs +++ b/waku-bindings/src/node/lightpush.rs @@ -2,7 +2,6 @@ // std use std::ffi::CString; -// crates // internal use crate::general::libwaku_response::{handle_response, LibwakuResponse}; use crate::general::{messagehash::MessageHash, Result, WakuMessage};