trying new approach to set callback. not completed yet

This commit is contained in:
Ivan Folgueira Bande 2024-11-16 19:29:02 +07:00
parent ae71d06e79
commit fc7189a2b5
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
4 changed files with 44 additions and 104 deletions

View File

@ -95,7 +95,7 @@ impl TicTacToeApp {
};
// Establish a closure that handles the incoming messages
self.waku.ctx.waku_set_event_callback(my_closure);
// self.waku.ctx.waku_set_event_callback(my_closure);
// Subscribe to desired topic
self.waku.relay_subscribe(&self.game_topic.to_string()).expect("waku should subscribe");

View File

@ -10,17 +10,18 @@ use std::ffi::c_void;
use serde::{Deserialize, Serialize};
// internal
use crate::general::WakuMessage;
use std::{slice, str};
use std::str;
use crate::utils::LibwakuResponse;
use crate::utils::{get_trampoline, LibwakuResponse};
use crate::MessageHash;
use std::ops::Deref;
use std::sync::Mutex;
// crates
use once_cell::sync::Lazy;
use std::sync::{Arc, Mutex};
use crate::node::Observer;
pub struct WakuNodeContext {
pub obj_ptr: *mut c_void,
msg_observers: Arc<Mutex<Vec<Arc<dyn Observer + Send + Sync>>>>, // List of observers
}
/// Waku event
@ -46,41 +47,29 @@ pub struct WakuMessageEvent {
pub waku_message: WakuMessage,
}
#[allow(clippy::type_complexity)]
static CALLBACK: Lazy<Mutex<Box<dyn FnMut(LibwakuResponse) + Send + Sync>>> =
Lazy::new(|| Mutex::new(Box::new(|_| {})));
/// Register global callback
fn set_callback<F: FnMut(LibwakuResponse) + Send + Sync + 'static>(f: F) {
*CALLBACK.lock().unwrap() = Box::new(f);
}
unsafe extern "C" fn callback(
ret_code: ::std::os::raw::c_int,
data: *const ::std::os::raw::c_char,
data_len: usize,
user_data: *mut ::std::os::raw::c_void,
) {
let response = if data.is_null() {
""
} else {
str::from_utf8(slice::from_raw_parts(data as *mut u8, data_len))
.expect("could not retrieve response")
};
let result = LibwakuResponse::try_from((ret_code as u32, response))
.expect("invalid response obtained from libwaku");
(CALLBACK
.deref()
.lock()
.expect("Access to the shared callback")
.as_mut())(result);
}
impl WakuNodeContext {
pub fn new(
obj_ptr: *mut c_void
) -> Self {
Self {
obj_ptr: obj_ptr,
msg_observers: Arc::new(Mutex::new(Vec::new())),
}
}
fn waku_event_callback(response: LibwakuResponse) {
pub fn add_msg_observer(&mut self, observer: Arc<dyn Observer + Send + Sync>) {
let mut observers = self.msg_observers.lock().expect("Failed to lock observers");
observers.push(observer);
}
pub fn notify_observers(&self, msg: &WakuMessage) {
let observers = self.msg_observers.lock().expect("Failed to lock observers");
for observer in observers.iter() {
observer.on_message_received(msg);
}
}
fn event_callback(response: LibwakuResponse) {
if let LibwakuResponse::Success(v) = response {
let event: Event =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
@ -88,77 +77,21 @@ impl WakuNodeContext {
// let mut game_state = self.game_state.lock().unwrap();
match event {
Event::WakuMessage(evt) => {
// println!("WakuMessage event received: {:?}", evt.waku_message);
let message = evt.waku_message;
let payload = message.payload.to_vec().clone();
match from_utf8(&payload) {
Ok(msg) => {
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
println!("Message Received: {}", msg);
println!("::::::::::::::::::::::::::::::::::::::::::::::::::::");
// Send the message to the main thread
if let Ok(mut tx) = tx_clone.try_lock() {
// Lock succeeded, proceed to send the message
if tx.blocking_send(msg.to_string()).is_err() {
eprintln!("Failed to send message to async task");
} else {
eprintln!("Sent!!!!");
}
} else {
eprintln!("Failed to acquire lock on tx_clone");
}
// Deserialize the JSON into the GameState struct
// Lock the game_state and update it
// match serde_json::from_str::<GameState>(msg) {
// Ok(parsed_value) => {
// // Handle the parsed value here
// // self.game_state = parsed_value;
// println!("Parsed correctly");
// }
// Err(e) => {
// eprintln!("Failed to parse JSON: {}", e);
// // Handle the error as needed, such as retrying, defaulting, etc.
// }
// }
// *game_state = serde_json::from_str(msg).expect("Failed to deserialize JSON");
// let tx_inner = tx_cloned.clone();
// let msg_inner = msg.to_string();
// tokio::spawn(async move {
// println!("do nothing");
// if tx_inner.send(msg_inner.to_string()).await.is_err() {
// eprintln!("Failed to send message");
// }
// });
}
Err(e) => {
eprintln!("Failed to decode payload as UTF-8: {}", e);
// Handle the error as needed, or just log and skip
}
}
}
println!("WakuMessage event received: {:?}", evt.waku_message);
},
Event::Unrecognized(err) => panic!("Unrecognized waku event: {:?}", err),
_ => panic!("event case not expected"),
};
}
}
/// Register callback to act as event handler and receive application events,
/// which are used to react to asynchronous events in Waku
pub fn waku_set_event_callback<F: FnMut(LibwakuResponse) + 'static + Sync + Send>(
&self,
mut closure: F,
) {
set_callback(closure);
pub fn waku_set_event_callback(&self) {
let mut closure = WakuNodeContext::event_callback;
unsafe {
waku_sys::waku_set_event_callback(
self.obj_ptr,
Some(callback),
callback as *mut c_void,
);
}
let cb = get_trampoline(&closure);
waku_sys::waku_set_event_callback(self.obj_ptr, cb, &mut closure as *mut _ as *mut c_void)
};
}
}

View File

@ -43,7 +43,11 @@ pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeContext> {
match result {
LibwakuResponse::MissingCallback => panic!("callback is required"),
LibwakuResponse::Failure(v) => Err(v),
_ => Ok(WakuNodeContext { obj_ptr }),
_ => {
let ctx = WakuNodeContext::new(obj_ptr);
ctx.waku_set_event_callback();
Ok(ctx)
},
}
}

View File

@ -5,6 +5,7 @@ mod events;
mod management;
mod peers;
mod relay;
mod observer;
// std
pub use aes_gcm::Key;
@ -23,6 +24,8 @@ use crate::WakuContentTopic;
use crate::Encoding;
use std::time::SystemTime;
pub use observer::Observer;
/// Marker trait to disallow undesired waku node states in the handle
pub trait WakuNodeState {}