diff --git a/examples/tic-tac-toe-gui/src/main.rs b/examples/tic-tac-toe-gui/src/main.rs index 56d4ff1..7f130c5 100644 --- a/examples/tic-tac-toe-gui/src/main.rs +++ b/examples/tic-tac-toe-gui/src/main.rs @@ -95,7 +95,7 @@ impl TicTacToeApp { }; // Establish a closure that handles the incoming messages - self.waku.ctx.waku_set_event_callback(my_closure); + // self.waku.ctx.waku_set_event_callback(my_closure); // Subscribe to desired topic self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe"); diff --git a/waku-bindings/src/node/events.rs b/waku-bindings/src/node/events.rs index c7e8d33..6737314 100644 --- a/waku-bindings/src/node/events.rs +++ b/waku-bindings/src/node/events.rs @@ -10,17 +10,18 @@ use std::ffi::c_void; use serde::{Deserialize, Serialize}; // internal use crate::general::WakuMessage; -use std::{slice, str}; +use std::str; -use crate::utils::LibwakuResponse; +use crate::utils::{get_trampoline, LibwakuResponse}; use crate::MessageHash; -use std::ops::Deref; -use std::sync::Mutex; -// crates -use once_cell::sync::Lazy; + +use std::sync::{Arc, Mutex}; + +use crate::node::Observer; pub struct WakuNodeContext { pub obj_ptr: *mut c_void, + msg_observers: Arc>>>, // List of observers } /// Waku event @@ -46,41 +47,29 @@ pub struct WakuMessageEvent { pub waku_message: WakuMessage, } -#[allow(clippy::type_complexity)] -static CALLBACK: Lazy>> = - Lazy::new(|| Mutex::new(Box::new(|_| {}))); - -/// Register global callback -fn set_callback(f: F) { - *CALLBACK.lock().unwrap() = Box::new(f); -} - -unsafe extern "C" fn callback( - ret_code: ::std::os::raw::c_int, - data: *const ::std::os::raw::c_char, - data_len: usize, - user_data: *mut ::std::os::raw::c_void, -) { - let response = if data.is_null() { - "" - } else { - str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len)) - .expect("could not retrieve response") - }; - - let result = LibwakuResponse::try_from((ret_code as u32, response)) - .expect("invalid response obtained from libwaku"); - - (CALLBACK - .deref() - .lock() - .expect("Access to the shared callback") - .as_mut())(result); -} - impl WakuNodeContext { + pub fn new( + obj_ptr: *mut c_void + ) -> Self { + Self { + obj_ptr: obj_ptr, + msg_observers: Arc::new(Mutex::new(Vec::new())), + } + } - fn waku_event_callback(response: LibwakuResponse) { + pub fn add_msg_observer(&mut self, observer: Arc) { + let mut observers = self.msg_observers.lock().expect("Failed to lock observers"); + observers.push(observer); + } + + pub fn notify_observers(&self, msg: &WakuMessage) { + let observers = self.msg_observers.lock().expect("Failed to lock observers"); + for observer in observers.iter() { + observer.on_message_received(msg); + } + } + + fn event_callback(response: LibwakuResponse) { if let LibwakuResponse::Success(v) = response { let event: Event = serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); @@ -88,77 +77,21 @@ impl WakuNodeContext { // let mut game_state = self.game_state.lock().unwrap(); match event { Event::WakuMessage(evt) => { - // println!("WakuMessage event received: {:?}", evt.waku_message); - let message = evt.waku_message; - let payload = message.payload.to_vec().clone(); - match from_utf8(&payload) { - Ok(msg) => { - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - println!("Message Received: {}", msg); - println!("::::::::::::::::::::::::::::::::::::::::::::::::::::"); - - // Send the message to the main thread - if let Ok(mut tx) = tx_clone.try_lock() { - // Lock succeeded, proceed to send the message - if tx.blocking_send(msg.to_string()).is_err() { - eprintln!("Failed to send message to async task"); - } else { - eprintln!("Sent!!!!"); - } - } else { - eprintln!("Failed to acquire lock on tx_clone"); - } - - // Deserialize the JSON into the GameState struct - // Lock the game_state and update it - // match serde_json::from_str::(msg) { - // Ok(parsed_value) => { - // // Handle the parsed value here - // // self.game_state = parsed_value; - // println!("Parsed correctly"); - // } - // Err(e) => { - // eprintln!("Failed to parse JSON: {}", e); - // // Handle the error as needed, such as retrying, defaulting, etc. - // } - // } - // *game_state = serde_json::from_str(msg).expect("Failed to deserialize JSON"); - - // let tx_inner = tx_cloned.clone(); - // let msg_inner = msg.to_string(); - // tokio::spawn(async move { - // println!("do nothing"); - // if tx_inner.send(msg_inner.to_string()).await.is_err() { - // eprintln!("Failed to send message"); - // } - // }); - } - Err(e) => { - eprintln!("Failed to decode payload as UTF-8: {}", e); - // Handle the error as needed, or just log and skip - } - } - } + println!("WakuMessage event received: {:?}", evt.waku_message); + }, Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err), - _ => panic!("event case not expected"), }; } } /// Register callback to act as event handler and receive application events, /// which are used to react to asynchronous events in Waku - pub fn waku_set_event_callback( - &self, - mut closure: F, - ) { - set_callback(closure); + pub fn waku_set_event_callback(&self) { + let mut closure = WakuNodeContext::event_callback; unsafe { - waku_sys::waku_set_event_callback( - self.obj_ptr, - Some(callback), - callback as *mut c_void, - ); - } + let cb = get_trampoline(&closure); + waku_sys::waku_set_event_callback(self.obj_ptr, cb, &mut closure as *mut _ as *mut c_void) + }; } } diff --git a/waku-bindings/src/node/management.rs b/waku-bindings/src/node/management.rs index 2926b4e..2530a25 100644 --- a/waku-bindings/src/node/management.rs +++ b/waku-bindings/src/node/management.rs @@ -43,7 +43,11 @@ pub fn waku_new(config: Option) -> Result { match result { LibwakuResponse::MissingCallback => panic!("callback is required"), LibwakuResponse::Failure(v) => Err(v), - _ => Ok(WakuNodeContext { obj_ptr }), + _ => { + let ctx = WakuNodeContext::new(obj_ptr); + ctx.waku_set_event_callback(); + Ok(ctx) + }, } } diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index a3386c0..0bcae89 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -5,6 +5,7 @@ mod events; mod management; mod peers; mod relay; +mod observer; // std pub use aes_gcm::Key; @@ -23,6 +24,8 @@ use crate::WakuContentTopic; use crate::Encoding; use std::time::SystemTime; +pub use observer::Observer; + /// Marker trait to disallow undesired waku node states in the handle pub trait WakuNodeState {}