refactor of msg callback

This commit is contained in:
Ivan Folgueira Bande 2024-11-18 10:14:03 +01:00
parent fc7189a2b5
commit 0bd6af6358
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
4 changed files with 35 additions and 59 deletions

View File

@ -95,7 +95,7 @@ impl TicTacToeApp {
};
// Establish a closure that handles the incoming messages
// self.waku.ctx.waku_set_event_callback(my_closure);
self.waku.ctx.waku_set_event_callback(my_closure);
// Subscribe to desired topic
self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe");
@ -319,14 +319,13 @@ async fn main() -> eframe::Result<()> {
// Discovery
dns_discovery: Some(true),
dns_discovery_url: Some("enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJS7UVSNDYZG4@boot.prod.status.nodes.status.im"),
discv5_discovery: Some(true),
discv5_udp_port: Some(9000),
discv5_enr_auto_update: Some(false),
// discv5_discovery: Some(true),
// discv5_udp_port: Some(9001),
// discv5_enr_auto_update: Some(false),
..Default::default()
}))
.expect("should instantiate");
// Initialize Waku
let game_state = GameState {
board: [[None; 3]; 3],
@ -353,7 +352,7 @@ async fn main() -> eframe::Result<()> {
}
}
else {
eprintln!("Failed to parse JSON: ");
eprintln!("Failed to parse JSON");
}
}
});

View File

@ -17,11 +17,9 @@ use crate::MessageHash;
use std::sync::{Arc, Mutex};
use crate::node::Observer;
pub struct WakuNodeContext {
pub obj_ptr: *mut c_void,
msg_observers: Arc<Mutex<Vec<Arc<dyn Observer + Send + Sync>>>>, // List of observers
msg_observer: Arc<Mutex<Box<dyn FnMut(LibwakuResponse) + Send + Sync>>>,
}
/// Waku event
@ -48,50 +46,37 @@ pub struct WakuMessageEvent {
}
impl WakuNodeContext {
pub fn new(
obj_ptr: *mut c_void
) -> Self {
pub fn new(obj_ptr: *mut c_void) -> Self {
Self {
obj_ptr: obj_ptr,
msg_observers: Arc::new(Mutex::new(Vec::new())),
}
}
pub fn add_msg_observer(&mut self, observer: Arc<dyn Observer + Send + Sync>) {
let mut observers = self.msg_observers.lock().expect("Failed to lock observers");
observers.push(observer);
}
pub fn notify_observers(&self, msg: &WakuMessage) {
let observers = self.msg_observers.lock().expect("Failed to lock observers");
for observer in observers.iter() {
observer.on_message_received(msg);
}
}
fn event_callback(response: LibwakuResponse) {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
// let mut game_state = self.game_state.lock().unwrap();
match event {
Event::WakuMessage(evt) => {
println!("WakuMessage event received: {:?}", evt.waku_message);
},
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
};
msg_observer: Arc::new(Mutex::new(Box::new(|_response| {
println!("msg observer not set")
}))),
}
}
/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback(&self) {
let mut closure = WakuNodeContext::event_callback;
unsafe {
let cb = get_trampoline(&closure);
waku_sys::waku_set_event_callback(self.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
};
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
&self,
mut closure: F,
) -> Result<(), String> {
if let Ok(mut boxed_closure) = self.msg_observer.lock() {
*boxed_closure = Box::new(closure);
unsafe {
let cb = get_trampoline(&(*boxed_closure));
waku_sys::waku_set_event_callback(
self.obj_ptr,
cb,
&mut (*boxed_closure) as *mut _ as *mut c_void,
)
};
Ok(())
} else {
Err(format!(
"Failed to acquire lock in waku_set_event_callback!"
))
}
}
}

View File

@ -8,10 +8,10 @@ use multiaddr::Multiaddr;
// internal
use super::config::WakuNodeConfig;
use crate::general::Result;
use crate::utils::LibwakuResponse;
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
use crate::utils::WakuDecode;
use crate::node::events::WakuNodeContext;
use crate::utils::LibwakuResponse;
use crate::utils::WakuDecode;
use crate::utils::{get_trampoline, handle_json_response, handle_no_response, handle_response};
/// Instantiates a Waku node
/// as per the [specification](https://rfc.vac.dev/spec/36/#extern-char-waku_newchar-jsonconfig)
@ -43,11 +43,7 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
match result {
LibwakuResponse::MissingCallback => panic!("callback is required"),
LibwakuResponse::Failure(v) => Err(v),
_ => {
let ctx = WakuNodeContext::new(obj_ptr);
ctx.waku_set_event_callback();
Ok(ctx)
},
_ => Ok(WakuNodeContext::new(obj_ptr)),
}
}

View File

@ -5,7 +5,6 @@ mod events;
mod management;
mod peers;
mod relay;
mod observer;
// std
pub use aes_gcm::Key;
@ -20,12 +19,10 @@ pub use config::WakuNodeConfig;
pub use events::{Event, WakuMessageEvent, WakuNodeContext};
pub use relay::waku_create_content_topic;
use crate::WakuContentTopic;
use crate::Encoding;
use crate::WakuContentTopic;
use std::time::SystemTime;
pub use observer::Observer;
/// Marker trait to disallow undesired waku node states in the handle
pub trait WakuNodeState {}
@ -136,5 +133,4 @@ impl WakuNodeHandle {
pub fn relay_unsubscribe(&self, pubsub_topic: &String) -> Result<()> {
relay::waku_relay_unsubscribe(&self.ctx, pubsub_topic)
}
}