proper use of messagehash and store

This commit is contained in:
Ivan Folgueira Bande 2024-12-15 21:29:44 +01:00
parent 3f72fc7a2d
commit 345dcbed39
No known key found for this signature in database
GPG Key ID: 3C117481F89E24A7
10 changed files with 114 additions and 61 deletions

View File

@ -87,7 +87,7 @@ impl App<Initialized> {
self.waku.set_event_callback(move|response| {
if let LibwakuResponse::Success(v) = response {
let event: WakuEvent =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
serde_json::from_str(v.unwrap().as_str()).expect("failed parsing event in set_event_callback");
match event {
WakuEvent::WakuMessage(evt) => {
@ -98,7 +98,11 @@ impl App<Initialized> {
match <Chat2Message as Message>::decode(evt.waku_message.payload()) {
Ok(chat_message) => {
shared_messages.write().unwrap().push(chat_message);
// Add the new message to the front
{
let mut messages_lock = shared_messages.write().unwrap();
messages_lock.insert(0, chat_message); // Insert at the front (index 0)
}
}
Err(e) => {
let mut out = std::io::stderr();
@ -130,10 +134,8 @@ impl App<Initialized> {
impl App<Running> {
fn retrieve_history(&mut self) {
let history = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE);
let history = history.unwrap();
let messages = history.messages
let messages = self.waku.store_query(None, vec![TOY_CHAT_CONTENT_TOPIC.clone()], STORE_NODE).unwrap();
let messages:Vec<_> = messages
.iter()
.map(|store_resp_msg| {
<Chat2Message as Message>::decode(store_resp_msg.message.payload())
@ -141,7 +143,7 @@ impl App<Running> {
})
.collect();
if history.messages.len() > 0 {
if messages.len() > 0 {
*self.messages.write().unwrap() = messages;
}
}
@ -150,9 +152,6 @@ impl App<Running> {
&mut self,
terminal: &mut Terminal<B>,
) -> std::result::Result<(), Box<dyn Error>> {
self.retrieve_history();
loop {
terminal.draw(|f| ui(f, self))?;
@ -229,7 +228,9 @@ fn main() -> std::result::Result<(), Box<dyn Error>> {
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
app.retrieve_history();
let res = app.run_main_loop(&mut terminal);
app.stop_app();
// restore terminal
disable_raw_mode()?;

View File

@ -1,10 +1,50 @@
pub struct MessageHash {
pub data: [u8; 32],
}
use crate::utils::WakuDecode;
use hex::FromHex;
use serde::{Deserialize, Deserializer, Serialize};
use std::convert::TryInto;
use std::str::FromStr;
impl MessageHash {
// Create a new hash with default (zeroed) data
pub fn new() -> Self {
MessageHash { data: [0u8; 32] }
/// Waku message hash, hex encoded sha256 digest of the message
#[derive(Debug, Serialize, Clone)]
pub struct MessageHash([u8; 32]);
impl FromStr for MessageHash {
type Err = String;
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let s = s.strip_prefix("0x").unwrap_or(s);
// Decode the hexadecimal string to a Vec<u8>
// We expect a string format like: d38220de82fbcf2df865b680692fce98c36600fdd1d954b8a71e916dc4222b8e
let bytes = Vec::from_hex(s).map_err(|e| format!("Hex decode error MessageHash: {}", e))?;
// Ensure the length is exactly 32 bytes
let res = bytes
.try_into()
.map_err(|_| "Hex string must represent exactly 32 bytes".to_string())?;
Ok(MessageHash(res))
}
}
impl<'de> Deserialize<'de> for MessageHash {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// Deserialize the input as a vector of u8
let vec: Vec<u8> = Deserialize::deserialize(deserializer)?;
// Ensure the vector has exactly 32 elements
let array: [u8; 32] = vec
.try_into()
.map_err(|_| serde::de::Error::custom("Expected an array of length 32"))?;
Ok(MessageHash(array))
}
}
impl WakuDecode for MessageHash {
fn decode(input: &str) -> Result<Self, String> {
serde_json::from_str(input).expect("could not parse MessageHash")
}
}

View File

@ -11,8 +11,6 @@ use serde_aux::prelude::*;
/// Waku message version
pub type WakuMessageVersion = usize;
/// Waku message hash, hex encoded sha256 digest of the message
pub type MessageHash = String;
pub type Result<T> = std::result::Result<T, String>;

View File

@ -15,9 +15,9 @@ pub use utils::LibwakuResponse;
use rln;
pub use node::{
waku_create_content_topic, waku_new, WakuEvent, Initialized, Key, Multiaddr, PublicKey, RLNConfig,
Running, SecretKey, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
waku_create_content_topic, waku_new, Initialized, Key, Multiaddr, PublicKey, RLNConfig,
Running, SecretKey, WakuEvent, WakuMessageEvent, WakuNodeConfig, WakuNodeHandle,
};
pub use general::contenttopic::{Encoding, WakuContentTopic};
pub use general::{MessageHash, Result, WakuMessage, WakuMessageVersion};
pub use general::{messagehash::MessageHash, Result, WakuMessage, WakuMessageVersion};

View File

@ -5,7 +5,7 @@ use std::ffi::CString;
// crates
use libc::*;
// internal
use crate::general::{MessageHash, Result, WakuMessage};
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
use crate::node::context::WakuNodeContext;
use crate::utils::{get_trampoline, handle_response, LibwakuResponse};

View File

@ -17,11 +17,11 @@ pub use multiaddr::Multiaddr;
pub use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData;
use std::time::Duration;
use store::StoreResponse;
use store::StoreWakuMessageResponse;
// internal
use crate::general::contenttopic::{Encoding, WakuContentTopic};
pub use crate::general::pubsubtopic::PubsubTopic;
use crate::general::{MessageHash, Result, WakuMessage};
use crate::general::{messagehash::MessageHash, Result, WakuMessage};
use crate::utils::LibwakuResponse;
use crate::node::context::WakuNodeContext;
@ -187,25 +187,43 @@ impl WakuNodeHandle<Running> {
pubsub_topic: Option<PubsubTopic>,
content_topics: Vec<WakuContentTopic>,
peer_addr: &str,
) -> Result<StoreResponse> {
store::waku_store_query(
&self.ctx,
"hard-coded-req-id".to_string(),
true, // include_data
pubsub_topic,
content_topics,
Some(
(Duration::from_secs(Utc::now().timestamp() as u64)
- Duration::from_secs(60 * 60 * 24))
.as_nanos() as usize,
), // time_start
None, // end_time
None, // message_hashes
None, // pagination_cursor
true, // pagination_forward
Some(25), // pagination_limit,
peer_addr,
None, // timeout_millis
)
) -> Result<Vec<StoreWakuMessageResponse>> {
let one_day_in_secs = 60 * 60 * 24;
let time_start = (Duration::from_secs(Utc::now().timestamp() as u64)
- Duration::from_secs(one_day_in_secs))
.as_nanos() as usize;
let mut cursor: Option<MessageHash> = None;
let mut messages: Vec<StoreWakuMessageResponse> = Vec::new();
loop {
let response = store::waku_store_query(
&self.ctx,
"hard-coded-req-id".to_string(),
true, // include_data
pubsub_topic.clone(),
content_topics.clone(),
Some(time_start), // time_start
None, // end_time
None, // message_hashes
cursor, // pagination_cursor
true, // pagination_forward
Some(25), // pagination_limit,
peer_addr,
None, // timeout_millis
)?;
messages.extend(response.messages);
if !response.pagination_cursor.is_some() {
break;
}
cursor = response.pagination_cursor;
}
messages.reverse();
return Ok(messages);
}
}

View File

@ -6,7 +6,7 @@ use std::ffi::CString;
use libc::*;
// internal
use crate::general::{
contenttopic::WakuContentTopic, pubsubtopic::PubsubTopic, MessageHash, Result,
contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result,
WakuStoreRespMessage,
};
use crate::node::context::WakuNodeContext;
@ -48,7 +48,7 @@ struct StoreQueryRequest {
#[derive(Clone, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct StoreWakuMessageResponse {
pub message_hash: [u8; 32],
pub message_hash: MessageHash,
pub message: WakuStoreRespMessage,
pub pubsub_topic: String,
}
@ -66,14 +66,13 @@ pub struct StoreResponse {
pub messages: Vec<StoreWakuMessageResponse>,
/// Paging information in [`PagingOptions`] format from which to resume further historical queries
#[serde(skip_serializing_if = "Option::is_none")]
pub pagination_cursor: Option<[u8; 32]>,
pub pagination_cursor: Option<MessageHash>,
}
// Implement WakuDecode for Vec<Multiaddr>
impl WakuDecode for StoreResponse {
fn decode(input: &str) -> Result<Self> {
let ret: StoreResponse = serde_json::from_str(input).expect("could not parse store resp");
Ok(ret)
Ok(serde_json::from_str(input).expect("could not parse store resp"))
}
}

View File

@ -99,12 +99,15 @@ pub fn handle_json_response<F: WakuDecode>(code: i32, result: LibwakuResponse) -
}
}
pub fn handle_response<F: FromStr>(code: i32, result: LibwakuResponse) -> Result<F> {
pub fn handle_response<F: FromStr>(code: i32, result: LibwakuResponse) -> Result<F>
where
<F as FromStr>::Err: std::fmt::Debug,
{
match result {
LibwakuResponse::Success(v) => v
.unwrap_or_default()
.parse()
.map_err(|_| "could not parse value".into()),
.map_err(|e| format!("could not parse value: {:?}", e)),
LibwakuResponse::Failure(v) => Err(v),
LibwakuResponse::MissingCallback => panic!("callback is required"),
LibwakuResponse::Undefined => panic!(

View File

@ -10,7 +10,7 @@ use tokio::time;
use tokio::time::sleep;
use waku_bindings::node::PubsubTopic;
use waku_bindings::{
waku_new, Encoding, WakuEvent, Initialized, MessageHash, WakuContentTopic, WakuMessage,
waku_new, Encoding, Initialized, MessageHash, WakuContentTopic, WakuEvent, WakuMessage,
WakuNodeConfig, WakuNodeHandle,
};
use waku_bindings::{LibwakuResponse, Running};
@ -45,8 +45,8 @@ async fn test_echo_messages(
let rx_waku_message_cloned = rx_waku_message.clone();
let closure = move |response| {
if let LibwakuResponse::Success(v) = response {
let event: WakuEvent =
serde_json::from_str(v.unwrap().as_str()).expect("Parsing event to succeed");
let event: WakuEvent = serde_json::from_str(v.unwrap().as_str())
.expect("Parsing event to succeed test_echo_messages");
match event {
WakuEvent::WakuMessage(evt) => {

View File

@ -68,12 +68,6 @@ fn generate_bindgen_code(project_dir: &Path) {
println!("cargo:rustc-link-lib=stdc++");
println!(
"cargo:rustc-link-search={}",
vendor_path.join("vendor/negentropy/cpp").display()
);
println!("cargo:rustc-link-lib=static=negentropy");
println!("cargo:rustc-link-lib=ssl");
println!("cargo:rustc-link-lib=crypto");