diff --git a/examples/toy-chat/src/main.rs b/examples/toy-chat/src/main.rs index 1d0e200..f9eb2df 100644 --- a/examples/toy-chat/src/main.rs +++ b/examples/toy-chat/src/main.rs @@ -87,7 +87,7 @@ impl App { self.waku.set_event_callback(move|response| { if let LibwakuResponse::Success(v) = response { let event: WakuEvent = - serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + serde_json::from_str(v.unwrap().as_str()).expect("failed parsing event in set_event_callback"); match event { WakuEvent::WakuMessage(evt) => { @@ -98,7 +98,11 @@ impl App { match ::decode(evt.waku_message.payload()) { Ok(chat_message) => { - shared_messages.write().unwrap().push(chat_message); + // Add the new message to the front + { + let mut messages_lock = shared_messages.write().unwrap(); + messages_lock.insert(0, chat_message); // Insert at the front (index 0) + } } Err(e) => { let mut out = std::io::stderr(); @@ -130,10 +134,8 @@ impl App { impl App { fn retrieve_history(&mut self) { - let history = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE); - let history = history.unwrap(); - - let messages = history.messages + let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).unwrap(); + let messages:Vec<_> = messages .iter() .map(|store_resp_msg| { ::decode(store_resp_msg.message.payload()) @@ -141,7 +143,7 @@ impl App { }) .collect(); - if history.messages.len() > 0 { + if messages.len() > 0 { *self.messages.write().unwrap() = messages; } } @@ -150,9 +152,6 @@ impl App { &mut self, terminal: &mut Terminal, ) -> std::result::Result<(), Box> { - - self.retrieve_history(); - loop { terminal.draw(|f| ui(f, self))?; @@ -229,7 +228,9 @@ fn main() -> std::result::Result<(), Box> { let backend = CrosstermBackend::new(stdout); let mut terminal = Terminal::new(backend)?; + app.retrieve_history(); let res = app.run_main_loop(&mut terminal); + app.stop_app(); // restore terminal disable_raw_mode()?; diff --git a/waku-bindings/src/general/messagehash.rs b/waku-bindings/src/general/messagehash.rs index 1a9ad58..ffdd5a9 100644 --- a/waku-bindings/src/general/messagehash.rs +++ b/waku-bindings/src/general/messagehash.rs @@ -1,10 +1,50 @@ -pub struct MessageHash { - pub data: [u8; 32], -} +use crate::utils::WakuDecode; +use hex::FromHex; +use serde::{Deserialize, Deserializer, Serialize}; +use std::convert::TryInto; +use std::str::FromStr; -impl MessageHash { - // Create a new hash with default (zeroed) data - pub fn new() -> Self { - MessageHash { data: [0u8; 32] } +/// Waku message hash, hex encoded sha256 digest of the message +#[derive(Debug, Serialize, Clone)] +pub struct MessageHash([u8; 32]); + +impl FromStr for MessageHash { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + let s = s.strip_prefix("0x").unwrap_or(s); + // Decode the hexadecimal string to a Vec + // We expect a string format like: d38220de82fbcf2df865b680692fce98c36600fdd1d954b8a71e916dc4222b8e + let bytes = Vec::from_hex(s).map_err(|e| format!("Hex decode error MessageHash: {}", e))?; + + // Ensure the length is exactly 32 bytes + let res = bytes + .try_into() + .map_err(|_| "Hex string must represent exactly 32 bytes".to_string())?; + + Ok(MessageHash(res)) + } +} + +impl<'de> Deserialize<'de> for MessageHash { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + // Deserialize the input as a vector of u8 + let vec: Vec = Deserialize::deserialize(deserializer)?; + + // Ensure the vector has exactly 32 elements + let array: [u8; 32] = vec + .try_into() + .map_err(|_| serde::de::Error::custom("Expected an array of length 32"))?; + + Ok(MessageHash(array)) + } +} + +impl WakuDecode for MessageHash { + fn decode(input: &str) -> Result { + serde_json::from_str(input).expect("could not parse MessageHash") } } diff --git a/waku-bindings/src/general/mod.rs b/waku-bindings/src/general/mod.rs index 4cb8524..c63db6b 100644 --- a/waku-bindings/src/general/mod.rs +++ b/waku-bindings/src/general/mod.rs @@ -11,8 +11,6 @@ use serde_aux::prelude::*; /// Waku message version pub type WakuMessageVersion = usize; -/// Waku message hash, hex encoded sha256 digest of the message -pub type MessageHash = String; pub type Result = std::result::Result; diff --git a/waku-bindings/src/lib.rs b/waku-bindings/src/lib.rs index fcce68a..03d2e45 100644 --- a/waku-bindings/src/lib.rs +++ b/waku-bindings/src/lib.rs @@ -15,9 +15,9 @@ pub use utils::LibwakuResponse; use rln; pub use node::{ - waku_create_content_topic, waku_new, WakuEvent, Initialized, Key, Multiaddr, PublicKey, RLNConfig, - Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, + waku_create_content_topic, waku_new, Initialized, Key, Multiaddr, PublicKey, RLNConfig, + Running, SecretKey, WakuEvent, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle, }; pub use general::contenttopic::{Encoding, WakuContentTopic}; -pub use general::{MessageHash, Result, WakuMessage, WakuMessageVersion}; +pub use general::{messagehash::MessageHash, Result, WakuMessage, WakuMessageVersion}; diff --git a/waku-bindings/src/node/lightpush.rs b/waku-bindings/src/node/lightpush.rs index 925c01c..3b538eb 100644 --- a/waku-bindings/src/node/lightpush.rs +++ b/waku-bindings/src/node/lightpush.rs @@ -5,7 +5,7 @@ use std::ffi::CString; // crates use libc::*; // internal -use crate::general::{MessageHash, Result, WakuMessage}; +use crate::general::{messagehash::MessageHash, Result, WakuMessage}; use crate::node::context::WakuNodeContext; use crate::utils::{get_trampoline, handle_response, LibwakuResponse}; diff --git a/waku-bindings/src/node/mod.rs b/waku-bindings/src/node/mod.rs index e52c28d..3c022e9 100644 --- a/waku-bindings/src/node/mod.rs +++ b/waku-bindings/src/node/mod.rs @@ -17,11 +17,11 @@ pub use multiaddr::Multiaddr; pub use secp256k1::{PublicKey, SecretKey}; use std::marker::PhantomData; use std::time::Duration; -use store::StoreResponse; +use store::StoreWakuMessageResponse; // internal use crate::general::contenttopic::{Encoding, WakuContentTopic}; pub use crate::general::pubsubtopic::PubsubTopic; -use crate::general::{MessageHash, Result, WakuMessage}; +use crate::general::{messagehash::MessageHash, Result, WakuMessage}; use crate::utils::LibwakuResponse; use crate::node::context::WakuNodeContext; @@ -187,25 +187,43 @@ impl WakuNodeHandle { pubsub_topic: Option, content_topics: Vec, peer_addr: &str, - ) -> Result { - store::waku_store_query( - &self.ctx, - "hard-coded-req-id".to_string(), - true, // include_data - pubsub_topic, - content_topics, - Some( - (Duration::from_secs(Utc::now().timestamp() as u64) - - Duration::from_secs(60 * 60 * 24)) - .as_nanos() as usize, - ), // time_start - None, // end_time - None, // message_hashes - None, // pagination_cursor - true, // pagination_forward - Some(25), // pagination_limit, - peer_addr, - None, // timeout_millis - ) + ) -> Result> { + let one_day_in_secs = 60 * 60 * 24; + let time_start = (Duration::from_secs(Utc::now().timestamp() as u64) + - Duration::from_secs(one_day_in_secs)) + .as_nanos() as usize; + + let mut cursor: Option = None; + + let mut messages: Vec = Vec::new(); + + loop { + let response = store::waku_store_query( + &self.ctx, + "hard-coded-req-id".to_string(), + true, // include_data + pubsub_topic.clone(), + content_topics.clone(), + Some(time_start), // time_start + None, // end_time + None, // message_hashes + cursor, // pagination_cursor + true, // pagination_forward + Some(25), // pagination_limit, + peer_addr, + None, // timeout_millis + )?; + + messages.extend(response.messages); + + if !response.pagination_cursor.is_some() { + break; + } + cursor = response.pagination_cursor; + } + + messages.reverse(); + + return Ok(messages); } } diff --git a/waku-bindings/src/node/store.rs b/waku-bindings/src/node/store.rs index a507c0a..d436cb0 100644 --- a/waku-bindings/src/node/store.rs +++ b/waku-bindings/src/node/store.rs @@ -6,7 +6,7 @@ use std::ffi::CString; use libc::*; // internal use crate::general::{ - contenttopic::WakuContentTopic, pubsubtopic::PubsubTopic, MessageHash, Result, + contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result, WakuStoreRespMessage, }; use crate::node::context::WakuNodeContext; @@ -48,7 +48,7 @@ struct StoreQueryRequest { #[derive(Clone, Deserialize, Debug)] #[serde(rename_all = "camelCase")] pub struct StoreWakuMessageResponse { - pub message_hash: [u8; 32], + pub message_hash: MessageHash, pub message: WakuStoreRespMessage, pub pubsub_topic: String, } @@ -66,14 +66,13 @@ pub struct StoreResponse { pub messages: Vec, /// Paging information in [`PagingOptions`] format from which to resume further historical queries #[serde(skip_serializing_if = "Option::is_none")] - pub pagination_cursor: Option<[u8; 32]>, + pub pagination_cursor: Option, } // Implement WakuDecode for Vec impl WakuDecode for StoreResponse { fn decode(input: &str) -> Result { - let ret: StoreResponse = serde_json::from_str(input).expect("could not parse store resp"); - Ok(ret) + Ok(serde_json::from_str(input).expect("could not parse store resp")) } } diff --git a/waku-bindings/src/utils.rs b/waku-bindings/src/utils.rs index 760f774..d4ec81a 100644 --- a/waku-bindings/src/utils.rs +++ b/waku-bindings/src/utils.rs @@ -99,12 +99,15 @@ pub fn handle_json_response(code: i32, result: LibwakuResponse) - } } -pub fn handle_response(code: i32, result: LibwakuResponse) -> Result { +pub fn handle_response(code: i32, result: LibwakuResponse) -> Result +where + ::Err: std::fmt::Debug, +{ match result { LibwakuResponse::Success(v) => v .unwrap_or_default() .parse() - .map_err(|_| "could not parse value".into()), + .map_err(|e| format!("could not parse value: {:?}", e)), LibwakuResponse::Failure(v) => Err(v), LibwakuResponse::MissingCallback => panic!("callback is required"), LibwakuResponse::Undefined => panic!( diff --git a/waku-bindings/tests/node.rs b/waku-bindings/tests/node.rs index 369ef3e..4c9fd81 100644 --- a/waku-bindings/tests/node.rs +++ b/waku-bindings/tests/node.rs @@ -10,7 +10,7 @@ use tokio::time; use tokio::time::sleep; use waku_bindings::node::PubsubTopic; use waku_bindings::{ - waku_new, Encoding, WakuEvent, Initialized, MessageHash, WakuContentTopic, WakuMessage, + waku_new, Encoding, Initialized, MessageHash, WakuContentTopic, WakuEvent, WakuMessage, WakuNodeConfig, WakuNodeHandle, }; use waku_bindings::{LibwakuResponse, Running}; @@ -45,8 +45,8 @@ async fn test_echo_messages( let rx_waku_message_cloned = rx_waku_message.clone(); let closure = move |response| { if let LibwakuResponse::Success(v) = response { - let event: WakuEvent = - serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed"); + let event: WakuEvent = serde_json::from_str(v.unwrap().as_str()) + .expect("Parsing event to succeed test_echo_messages"); match event { WakuEvent::WakuMessage(evt) => { diff --git a/waku-sys/build.rs b/waku-sys/build.rs index b41bba1..046f9ed 100644 --- a/waku-sys/build.rs +++ b/waku-sys/build.rs @@ -68,12 +68,6 @@ fn generate_bindgen_code(project_dir: &Path) { println!("cargo:rustc-link-lib=stdc++"); - println!( - "cargo:rustc-link-search={}", - vendor_path.join("vendor/negentropy/cpp").display() - ); - println!("cargo:rustc-link-lib=static=negentropy"); - println!("cargo:rustc-link-lib=ssl"); println!("cargo:rustc-link-lib=crypto");