mirror of
https://github.com/logos-messaging/logos-messaging-rust-bindings.git
synced 2026-01-02 14:03:12 +00:00
apply better option approach in store queries
This commit is contained in:
parent
3cbb259b80
commit
80ea725aeb
@ -138,7 +138,7 @@ impl App<Running> {
|
||||
let one_day_in_secs = 60 * 60 * 24;
|
||||
let time_start = (Duration::from_secs(Utc::now().timestamp() as u64)
|
||||
- Duration::from_secs(one_day_in_secs))
|
||||
.as_nanos() as usize;
|
||||
.as_nanos() as u64;
|
||||
|
||||
let include_data = true;
|
||||
|
||||
@ -147,6 +147,7 @@ impl App<Running> {
|
||||
STORE_NODE,
|
||||
include_data,
|
||||
Some(time_start),
|
||||
None,
|
||||
None).await.unwrap();
|
||||
let messages:Vec<_> = messages
|
||||
.iter()
|
||||
|
||||
@ -16,8 +16,7 @@ pub use multiaddr::Multiaddr;
|
||||
pub use secp256k1::{PublicKey, SecretKey};
|
||||
use std::marker::PhantomData;
|
||||
use std::time::Duration;
|
||||
use store::StoreWakuMessageResponse;
|
||||
use uuid::Uuid;
|
||||
use store::{StoreQueryRequest, StoreWakuMessageResponse};
|
||||
// internal
|
||||
use crate::general::contenttopic::{Encoding, WakuContentTopic};
|
||||
use crate::general::libwaku_response::LibwakuResponse;
|
||||
@ -177,31 +176,26 @@ impl WakuNodeHandle<Running> {
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
peer_addr: &str,
|
||||
include_data: bool, // is true, resp contains payload, etc. Only msg_hashes otherwise
|
||||
time_start: Option<usize>, // unix time nanoseconds
|
||||
time_end: Option<usize>, // unix time nanoseconds
|
||||
time_start: Option<u64>, // unix time nanoseconds
|
||||
time_end: Option<u64>, // unix time nanoseconds
|
||||
timeout_millis: Option<i32>,
|
||||
) -> Result<Vec<StoreWakuMessageResponse>> {
|
||||
let mut cursor: Option<MessageHash> = None;
|
||||
|
||||
let mut messages: Vec<StoreWakuMessageResponse> = Vec::new();
|
||||
|
||||
loop {
|
||||
let request_id = Uuid::new_v4();
|
||||
let response = store::waku_store_query(
|
||||
&self.ctx,
|
||||
request_id.to_string(),
|
||||
include_data,
|
||||
pubsub_topic.clone(),
|
||||
content_topics.clone(),
|
||||
time_start,
|
||||
time_end,
|
||||
None, // message_hashes
|
||||
cursor, // pagination_cursor
|
||||
true, // pagination_forward
|
||||
Some(25), // pagination_limit,
|
||||
peer_addr,
|
||||
None, // timeout_millis
|
||||
)
|
||||
.await?;
|
||||
let query = StoreQueryRequest::new()
|
||||
.with_pubsub_topic(pubsub_topic.clone())
|
||||
.with_content_topics(content_topics.clone())
|
||||
.with_include_data(include_data)
|
||||
.with_time_start(time_start)
|
||||
.with_time_end(time_end)
|
||||
.with_pagination_cursor(cursor)
|
||||
.with_pagination_forward(true);
|
||||
|
||||
let response =
|
||||
store::waku_store_query(&self.ctx, query, peer_addr, timeout_millis).await?;
|
||||
|
||||
messages.extend(response.messages);
|
||||
|
||||
|
||||
@ -2,8 +2,10 @@
|
||||
|
||||
// std
|
||||
use std::ffi::CString;
|
||||
use uuid::Uuid;
|
||||
// internal
|
||||
use crate::general::libwaku_response::{handle_response, LibwakuResponse};
|
||||
use crate::general::time::get_now_in_nanosecs;
|
||||
use crate::general::waku_decode::WakuDecode;
|
||||
use crate::general::{
|
||||
contenttopic::WakuContentTopic, messagehash::MessageHash, pubsubtopic::PubsubTopic, Result,
|
||||
@ -24,7 +26,7 @@ pub struct PagingOptions {
|
||||
|
||||
/// Criteria used to retrieve historical messages
|
||||
#[derive(Clone, Serialize, Debug)]
|
||||
struct StoreQueryRequest {
|
||||
pub struct StoreQueryRequest {
|
||||
/// if true, the store-response will include the full message content. If false,
|
||||
/// the store-response will only include a list of message hashes.
|
||||
request_id: String,
|
||||
@ -33,9 +35,9 @@ struct StoreQueryRequest {
|
||||
pubsub_topic: Option<PubsubTopic>,
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
time_start: Option<usize>,
|
||||
time_start: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
time_end: Option<usize>,
|
||||
time_end: Option<u64>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
message_hashes: Option<Vec<MessageHash>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
@ -45,6 +47,63 @@ struct StoreQueryRequest {
|
||||
pagination_limit: Option<u64>,
|
||||
}
|
||||
|
||||
impl StoreQueryRequest {
|
||||
pub fn new() -> Self {
|
||||
StoreQueryRequest {
|
||||
request_id: Uuid::new_v4().to_string(),
|
||||
include_data: true,
|
||||
pubsub_topic: None,
|
||||
content_topics: Vec::new(),
|
||||
time_start: Some(get_now_in_nanosecs()),
|
||||
time_end: Some(get_now_in_nanosecs()),
|
||||
message_hashes: None,
|
||||
pagination_cursor: None,
|
||||
pagination_forward: true,
|
||||
pagination_limit: Some(25),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_include_data(mut self, include_data: bool) -> Self {
|
||||
self.include_data = include_data;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_pubsub_topic(mut self, pubsub_topic: Option<PubsubTopic>) -> Self {
|
||||
self.pubsub_topic = pubsub_topic;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_content_topics(mut self, content_topics: Vec<WakuContentTopic>) -> Self {
|
||||
self.content_topics = content_topics;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_time_start(mut self, time_start: Option<u64>) -> Self {
|
||||
self.time_start = time_start;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_time_end(mut self, time_end: Option<u64>) -> Self {
|
||||
self.time_end = time_end;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_message_hashes(mut self, message_hashes: Vec<MessageHash>) -> Self {
|
||||
self.message_hashes = Some(message_hashes);
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_pagination_cursor(mut self, pagination_cursor: Option<MessageHash>) -> Self {
|
||||
self.pagination_cursor = pagination_cursor;
|
||||
self
|
||||
}
|
||||
|
||||
pub fn with_pagination_forward(mut self, pagination_forward: bool) -> Self {
|
||||
self.pagination_forward = pagination_forward;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StoreWakuMessageResponse {
|
||||
@ -78,32 +137,10 @@ impl WakuDecode for StoreResponse {
|
||||
|
||||
pub async fn waku_store_query(
|
||||
ctx: &WakuNodeContext,
|
||||
request_id: String,
|
||||
include_data: bool,
|
||||
pubsub_topic: Option<PubsubTopic>,
|
||||
content_topics: Vec<WakuContentTopic>,
|
||||
time_start: Option<usize>,
|
||||
time_end: Option<usize>,
|
||||
message_hashes: Option<Vec<MessageHash>>,
|
||||
pagination_cursor: Option<MessageHash>, // Message hash (key) from where to start query (exclusive)
|
||||
pagination_forward: bool,
|
||||
pagination_limit: Option<u64>,
|
||||
query: StoreQueryRequest,
|
||||
peer_addr: &str,
|
||||
timeout_millis: Option<i32>,
|
||||
) -> Result<StoreResponse> {
|
||||
let query = StoreQueryRequest {
|
||||
request_id,
|
||||
include_data,
|
||||
pubsub_topic,
|
||||
content_topics,
|
||||
time_start,
|
||||
time_end,
|
||||
message_hashes,
|
||||
pagination_cursor,
|
||||
pagination_forward,
|
||||
pagination_limit,
|
||||
};
|
||||
|
||||
let json_query = CString::new(
|
||||
serde_json::to_string(&query).expect("StoreQuery should always be able to be serialized"),
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user