2020-11-24 04:34:32 +00:00
## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
2021-06-09 14:37:08 +00:00
{. push raises : [ Defect ] . }
2020-08-31 03:32:41 +00:00
import
2022-10-03 15:36:17 +00:00
std / [ tables , times , sequtils , options , algorithm ] ,
2022-08-01 16:21:11 +00:00
stew / results ,
2021-07-16 22:28:35 +00:00
chronicles ,
chronos ,
2022-09-07 15:31:27 +00:00
bearssl / rand ,
2020-09-28 21:44:14 +00:00
libp2p / crypto / crypto ,
2020-08-31 03:32:41 +00:00
libp2p / protocols / protocol ,
libp2p / protobuf / minprotobuf ,
libp2p / stream / connection ,
2022-07-25 11:01:37 +00:00
metrics
import
2022-09-16 10:55:22 +00:00
.. / .. / node / storage / message / message_retention_policy ,
2022-07-25 11:01:37 +00:00
.. / .. / node / storage / message / waku_store_queue ,
2021-07-16 22:28:35 +00:00
.. / .. / node / peer_manager / peer_manager ,
2022-02-17 15:00:15 +00:00
.. / .. / utils / time ,
2022-07-25 11:01:37 +00:00
.. / .. / utils / requests ,
.. / waku_message ,
2021-07-16 22:28:35 +00:00
.. / waku_swap / waku_swap ,
2022-07-25 11:01:37 +00:00
. / rpc ,
2022-09-28 11:36:05 +00:00
. / rpc_codec ,
. / pagination ,
. / message_store
2020-11-24 04:34:32 +00:00
2020-08-27 02:44:09 +00:00
2021-03-04 07:19:21 +00:00
# Metrics exported by the store protocol.
# Message counts; this file sets it with the label values " stored " and " retrieved ".
declarePublicGauge waku_store_messages , " number of historical messages " , [ " type " ]
2021-01-29 08:42:41 +00:00
# Incremented each time a store peer is registered through setPeer.
declarePublicGauge waku_store_peers , " number of store peers "
# Error counter; the label value is one of the error-type constants declared in this file.
declarePublicGauge waku_store_errors , " number of store protocol errors " , [ " type " ]
2022-01-06 11:23:25 +00:00
# Incremented by the protocol handler for every successfully decoded history query.
declarePublicGauge waku_store_queries , " number of store queries received "
2022-09-14 12:40:11 +00:00
# Wall-clock durations observed in handleMessage (insert) and findMessages (query).
declarePublicHistogram waku_store_insert_duration_seconds , " message insertion duration "
declarePublicHistogram waku_store_query_duration_seconds , " history query duration "
2021-01-29 08:42:41 +00:00
2020-09-16 04:23:10 +00:00
# Log topic attached to the log statements of this module.
logScope :
topics = " wakustore "
2022-09-22 09:17:38 +00:00
2022-08-01 16:21:11 +00:00
const
# Protocol identifier assigned to the handler's `codec` and used when dialing/selecting peers.
WakuStoreCodec * = " /vac/waku/store/2.0.0-beta4 "
# Default pubsub topic used by `resume` when the caller does not pass one.
DefaultTopic * = " /waku/2/default-waku/proto "
2022-07-25 11:01:37 +00:00
2022-09-22 09:17:38 +00:00
# Half-width of the window around the current time in which a non-zero
# message timestamp is accepted by isValidMessage.
MaxMessageTimestampVariance * = Timestamp ( 20 . seconds . nanoseconds ) # 20 seconds maximum allowable sender timestamp "drift"
2022-07-25 11:01:37 +00:00
2020-08-27 02:44:09 +00:00
2021-02-09 08:31:38 +00:00
# Error types (metric label values)
const
2022-09-26 09:50:15 +00:00
# Message rejected by isValidMessage (timestamp outside the accepted window).
invalidMessage = " invalid_message "
2022-09-13 16:06:23 +00:00
# The store's `put` returned an error in handleMessage.
insertFailure = " insert_failure "
2022-09-20 09:39:52 +00:00
# The retention policy's `execute` returned an error.
retPolicyFailure = " retpolicy_failure "
2021-02-09 08:31:38 +00:00
# dialPeer returned no connection. Also returned to callers as the error string.
dialFailure = " dial_failure "
# A HistoryRPC payload (request or response) could not be decoded.
decodeRpcFailure = " decode_rpc_failure "
2022-08-01 16:21:11 +00:00
# The peer manager had no peer for the store codec.
peerNotFoundFailure = " peer_not_found_failure "
2021-02-09 08:31:38 +00:00
2022-06-13 17:59:53 +00:00
type
2022-08-01 12:09:41 +00:00
# Result alias used throughout this module: a value of type T or an error string.
WakuStoreResult * [ T ] = Result [ T , string ]
2022-06-13 17:59:53 +00:00
# State of the store protocol; extends LPProtocol (handler and codec are set in initProtocolHandler).
WakuStore * = ref object of LPProtocol
# Used to register, select and dial store peers.
peerManager * : PeerManager
2022-09-07 15:31:27 +00:00
# Random source passed to generateRequestId when building outgoing queries.
rng * : ref rand . HmacDrbgContext
2022-09-20 09:39:52 +00:00
# Message backend; may be nil, in which case messages are not stored and
# incoming queries are answered with SERVICE_UNAVAILABLE.
store * : MessageStore
2022-06-13 17:59:53 +00:00
# Optional swap accounting; nil when not provided to `init`.
wakuSwap * : WakuSwap
2022-09-16 10:55:22 +00:00
# Optional retention policy run by executeMessageRetentionPolicy.
retentionPolicy : Option [ MessageRetentionPolicy ]
2022-06-13 17:59:53 +00:00
2022-09-21 09:32:59 +00:00
proc executeMessageRetentionPolicy*(w: WakuStore) =
  ## Runs the configured retention policy against the message store.
  ##
  ## Does nothing when no policy is configured or no store is attached.
  ## A failed run increments `waku_store_errors` (label `retPolicyFailure`)
  ## and is logged at debug level; the error is not propagated.
  if w.retentionPolicy.isNone() or w.store.isNil():
    return

  let outcome = w.retentionPolicy.get().execute(w.store)
  if outcome.isOk():
    return

  waku_store_errors.inc(labelValues = [retPolicyFailure])
  debug " failed execution of retention policy ", error = outcome.error
2022-09-20 09:39:52 +00:00
proc reportStoredMessagesMetric*(w: WakuStore) =
  ## Publishes the number of messages held by the store on the
  ## `waku_store_messages` gauge.
  ##
  ## Silently does nothing when there is no store or the count cannot be
  ## obtained.
  if not w.store.isNil():
    let countRes = w.store.getMessagesCount()
    if countRes.isOk():
      waku_store_messages.set(countRes.value, labelValues = [" stored "])
2022-02-17 10:00:45 +00:00
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
  ## Query history to return a single page of messages matching the query.
  ##
  ## All query criteria are optional: empty or zero-valued fields are treated
  ## as "not set". A zero page size falls back to `DefaultPageSize`; any other
  ## value is capped at `MaxPageSize`.
  ##
  ## One row more than the page size is requested from the store so that the
  ## existence of a following page can be detected. When such a look-ahead row
  ## comes back, the response carries a `pagingInfo` whose cursor identifies
  ## the last message *included* in this page; otherwise `pagingInfo` is left
  ## at its default value.
  ##
  ## A store failure is reported as `HistoryResponseError.INVALID_CURSOR`.

  # Extract query criteria. All query criteria are optional
  let
    qContentTopics =
      if query.contentFilters.len != 0: some(query.contentFilters.mapIt(it.contentTopic))
      else: none(seq[ContentTopic])
    qPubSubTopic =
      if query.pubsubTopic != "": some(query.pubsubTopic)
      else: none(string)
    qCursor =
      if query.pagingInfo.cursor != PagingIndex(): some(query.pagingInfo.cursor)
      else: none(PagingIndex)
    qStartTime =
      if query.startTime != Timestamp(0): some(query.startTime)
      else: none(Timestamp)
    qEndTime =
      if query.endTime != Timestamp(0): some(query.endTime)
      else: none(Timestamp)
    qMaxPageSize =
      if query.pagingInfo.pageSize <= 0: DefaultPageSize
      else: min(query.pagingInfo.pageSize, MaxPageSize)
    qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD

  let queryStartTime = getTime().toUnixFloat()

  # Ask for one extra row: its presence tells us another page exists.
  let queryRes = w.store.getMessagesByHistoryQuery(
    contentTopic = qContentTopics,
    pubsubTopic = qPubSubTopic,
    cursor = qCursor,
    startTime = qStartTime,
    endTime = qEndTime,
    maxPageSize = qMaxPageSize + 1,
    ascendingOrder = qAscendingOrder
  )

  let queryDuration = getTime().toUnixFloat() - queryStartTime
  waku_store_query_duration_seconds.observe(queryDuration)

  # Build response
  # TODO: Improve error reporting
  if queryRes.isErr():
    return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR)

  let rows = queryRes.get()

  if rows.len <= 0:
    return HistoryResponse(messages: @[], error: HistoryResponseError.NONE)

  # More rows than the page size means the look-ahead row was returned.
  let hasMorePages = rows.len > int(qMaxPageSize)

  # The look-ahead row (last one) is never part of the returned page.
  var messages =
    if hasMorePages: rows[0..^2].mapIt(it[1])
    else: rows.mapIt(it[1])
  var pagingInfo = none(PagingInfo)

  # The retrieved messages list should always be in chronological order
  if not qAscendingOrder:
    messages.reverse()

  if hasMorePages:
    # Build the cursor from the last row that is part of this page, i.e.
    # rows[^2]. rows[^1] is the look-ahead row: it is not delivered in this
    # response, so using it as the cursor would make the follow-up query
    # resume past a message the client never received.
    # hasMorePages guarantees rows.len >= 2, so rows[^2] is always valid.
    let (pubsubTopic, message, digest, storeTimestamp) = rows[^2]

    # TODO: Improve coherence of MessageDigest type
    var messageDigest: array[32, byte]
    for i in 0..<min(digest.len, 32):
      messageDigest[i] = digest[i]

    let pagingIndex = PagingIndex(
      pubsubTopic: pubsubTopic,
      senderTime: message.timestamp,
      receiverTime: storeTimestamp,
      digest: MessageDigest(data: messageDigest)
    )

    pagingInfo = some(PagingInfo(
      pageSize: uint64(messages.len),
      cursor: pagingIndex,
      direction: if qAscendingOrder: PagingDirection.FORWARD
                 else: PagingDirection.BACKWARD
    ))

  HistoryResponse(
    messages: messages,
    pagingInfo: pagingInfo.get(PagingInfo()),
    error: HistoryResponseError.NONE
  )
2020-11-09 04:48:09 +00:00
2022-09-20 09:39:52 +00:00
proc initProtocolHandler*(ws: WakuStore) =
  ## Installs the request handler and the codec on `ws`.
  ##
  ## The handler reads one length-prefixed HistoryRPC from the connection,
  ## answers the contained query from the local store (or with
  ## SERVICE_UNAVAILABLE when no store is attached), credits the requesting
  ## peer through wakuSwap when it is set, and writes the response back.

  proc handler(conn: Connection, proto: string) {.async.} =
    let payload = await conn.readLp(MaxRpcSize.int)

    let decoded = HistoryRPC.init(payload)
    if decoded.isErr():
      error " failed to decode rpc ", peerId = conn.peerId
      waku_store_errors.inc(labelValues = [decodeRpcFailure])
      return

    let rpcReq = decoded.value
    info " received history query ", peerId = conn.peerId, requestId = rpcReq.requestId, query = rpcReq.query
    waku_store_queries.inc()

    # TODO: Improve error reporting
    let historyResp =
      if ws.store.isNil(): HistoryResponse(error: HistoryResponseError.SERVICE_UNAVAILABLE)
      else: ws.findMessages(rpcReq.query)

    if not ws.wakuSwap.isNil():
      info " handle store swap ", peerId = conn.peerId, requestId = rpcReq.requestId, text = ws.wakuSwap.text

      # Perform accounting operation
      # TODO: Do accounting here, response is HistoryResponse. How do we get node or swap context?
      ws.wakuSwap.credit(conn.peerId, historyResp.messages.len)

    info " sending history response ", peerId = conn.peerId, requestId = rpcReq.requestId, messages = historyResp.messages.len

    let rpcResp = HistoryRPC(requestId: rpcReq.requestId, response: historyResp)
    await conn.writeLp(rpcResp.encode().buffer)

  ws.codec = WakuStoreCodec
  ws.handler = handler
2020-11-16 08:38:52 +00:00
2022-09-16 10:55:22 +00:00
proc init*(T: type WakuStore,
           peerManager: PeerManager,
           rng: ref rand.HmacDrbgContext,
           store: MessageStore,
           wakuSwap: WakuSwap = nil,
           retentionPolicy = none(MessageRetentionPolicy)): T =
  ## Creates a WakuStore backed by the given message `store` and installs its
  ## protocol handler.
  ##
  ## `wakuSwap` and `retentionPolicy` are optional.
  result = WakuStore(
    peerManager: peerManager,
    rng: rng,
    store: store,
    wakuSwap: wakuSwap,
    retentionPolicy: retentionPolicy
  )
  result.initProtocolHandler()
proc init*(T: type WakuStore,
           peerManager: PeerManager,
           rng: ref rand.HmacDrbgContext,
           wakuSwap: WakuSwap = nil,
           retentionPolicy = none(MessageRetentionPolicy)): T =
  ## Convenience overload: creates a WakuStore backed by a freshly created
  ## `StoreQueueRef` and delegates to the overload that takes a store.
  WakuStore.init(peerManager, rng, StoreQueueRef.new(), wakuSwap, retentionPolicy)
2020-09-24 02:16:25 +00:00
2022-08-02 11:14:13 +00:00
2022-09-26 09:50:15 +00:00
proc isValidMessage(msg: WakuMessage): bool =
  ## Tells whether `msg` carries an acceptable timestamp.
  ##
  ## A zero timestamp is always accepted. Any other timestamp must lie within
  ## `MaxMessageTimestampVariance` of the current time, bounds included.
  if msg.timestamp != 0:
    let
      currentTime = getNanosecondTime(getTime().toUnixFloat())
      earliest = currentTime - MaxMessageTimestampVariance
      latest = currentTime + MaxMessageTimestampVariance
    msg.timestamp >= earliest and msg.timestamp <= latest
  else:
    true
2022-09-21 09:32:59 +00:00
proc handleMessage*(w: WakuStore, pubsubTopic: string, msg: WakuMessage) =
  ## Persists `msg`, received on `pubsubTopic`, into the message store.
  ##
  ## The message is skipped when no store is attached, when it is flagged as
  ## ephemeral, or when its timestamp is rejected by `isValidMessage` (the
  ## latter is counted in `waku_store_errors`). Insert failures are logged and
  ## counted; the insert duration is only observed for successful inserts.

  # Nothing to do without a store; ephemeral messages are never persisted.
  if w.store.isNil() or msg.ephemeral:
    return

  if not isValidMessage(msg):
    waku_store_errors.inc(labelValues = [invalidMessage])
    return

  let startedAt = getTime().toUnixFloat()

  let messageDigest = computeDigest(msg)
  # Use the message's own timestamp when it has one, the current time otherwise.
  let receivedTime =
    if msg.timestamp > 0: msg.timestamp
    else: getNanosecondTime(getTime().toUnixFloat())

  trace " handling message ", pubsubTopic = pubsubTopic, contentTopic = msg.contentTopic, timestamp = msg.timestamp, digest = messageDigest

  let putRes = w.store.put(pubsubTopic, msg, messageDigest, receivedTime)
  if putRes.isErr():
    debug " failed to insert message into the store ", err = putRes.error
    waku_store_errors.inc(labelValues = [insertFailure])
    return

  waku_store_insert_duration_seconds.observe(getTime().toUnixFloat() - startedAt)
2020-09-24 02:16:25 +00:00
2022-08-01 16:21:11 +00:00
2022-09-20 09:39:52 +00:00
## CLIENT
# TODO: This should probably be an add function and append the peer to an array
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
  ## Registers `peer` with the peer manager under the store codec and bumps
  ## the `waku_store_peers` gauge.
  addPeer(ws.peerManager, peer, WakuStoreCodec)
  waku_store_peers.inc()
2022-08-02 11:14:13 +00:00
proc query(w: WakuStore, req: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
  ## Sends `req` to `peer` over the store codec and returns the decoded
  ## history response.
  ##
  ## Returns `dialFailure` when the peer cannot be dialed and
  ## `decodeRpcFailure` when the reply cannot be decoded; both are also
  ## counted in `waku_store_errors`. On success the number of received
  ## messages is published on `waku_store_messages`.
  let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
  if connOpt.isNone():
    waku_store_errors.inc(labelValues = [dialFailure])
    return err(dialFailure)

  let conn = connOpt.get()

  let requestRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req)
  await conn.writeLP(requestRpc.encode().buffer)

  let rawReply = await conn.readLp(MaxRpcSize.int)
  let decoded = HistoryRPC.init(rawReply)
  if decoded.isErr():
    error " failed to decode response "
    waku_store_errors.inc(labelValues = [decodeRpcFailure])
    return err(decodeRpcFailure)

  let historyResp = decoded.value.response
  waku_store_messages.set(historyResp.messages.len.int64, labelValues = [" retrieved "])
  return ok(historyResp)
proc query*(w: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
  ## Sends `req` to a store peer chosen by the peer manager.
  ##
  ## Returns `peerNotFoundFailure` (also counted in `waku_store_errors`) when
  ## the peer manager has no peer for the store codec.

  # TODO: We need to be more strategic about which peers we dial. Right now we just set one on the service.
  # Ideally depending on the query and our set of peers we take a subset of ideal peers.
  # This will require us to check for various factors such as:
  #  - which topics they track
  #  - latency?
  #  - default store peer?
  let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
  if peerOpt.isSome():
    return await w.query(req, peerOpt.get())

  error " no suitable remote peers "
  waku_store_errors.inc(labelValues = [peerNotFoundFailure])
  return err(peerNotFoundFailure)
2020-09-24 02:16:25 +00:00
2022-08-01 12:09:41 +00:00
## 21/WAKU2-FAULT-TOLERANT-STORE
2022-09-20 09:39:52 +00:00
# Margin used by `resume`: subtracted from the query start time and added to the query end time.
const StoreResumeTimeWindowOffset : Timestamp = getNanosecondTime ( 20 ) ## Adjust the time window with an offset of 20 seconds
2022-08-02 11:14:13 +00:00
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
  ## Thin wrapper around `query` that sends the query to the given peer and
  ## keeps following the returned paging cursor until the peer reports the
  ## last page (a response whose `pagingInfo.pageSize` is zero).
  ##
  ## Returns every fetched message; if any page request fails, its error
  ## string is returned instead.
  var
    pagedReq = query  # local copy, its cursor is advanced page by page
    collected: seq[WakuMessage] = @[]
    lastPage = false

  # Fetch the history in pages
  while not lastPage:
    let pageRes = await w.query(pagedReq, peer)
    if pageRes.isErr():
      return err(pageRes.error)

    let page = pageRes.get()
    collected.add(page.messages)

    if page.pagingInfo.pageSize == 0:
      # No paging info in the response: this was the last page
      lastPage = true
    else:
      # Continue from where this page ended
      pagedReq.pagingInfo.cursor = page.pagingInfo.cursor

  return ok(collected)
2022-08-02 11:14:13 +00:00
proc queryLoop(w: WakuStore, req: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
  ## Sends the (paged) query to every peer of the candidate list and waits for
  ## all of them to finish.
  ##
  ## The messages of the successful queries are consolidated, in candidate
  ## order, into one deduplicated list. Failed queries contribute nothing.
  ## If no message at all has been retrieved, an error is returned.
  let pending = candidateList.mapIt(w.queryFromWithPaging(req, it))

  await allFutures(pending)

  var merged: seq[WakuMessage] = @[]
  for fut in pending:
    # All futures have finished after allFutures(); completed() is kept as a sanity check.
    if fut.completed() and fut.read().isOk():
      merged.add(fut.read().value)

  let uniqueMessages = merged.deduplicate()
  if uniqueMessages.len == 0:
    return err(" failed to resolve the query ")

  return ok(uniqueMessages)
proc resume*(w: WakuStore,
             peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]),
             pageSize: uint64 = DefaultPageSize,
             pubsubTopic = DefaultTopic): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
  ## Fetches the messages published on `pubsubTopic` while this node was
  ## offline and saves them in the local store.
  ##
  ## The queried time window starts at the newest timestamp known to the
  ## store (zero when the store reports none) and ends at the current time;
  ## both ends are widened by `StoreResumeTimeWindowOffset` (20 seconds) to
  ## account for clock asynchrony between nodes.
  ##
  ## `peerList` is the list of peers to fetch the history from: all of them
  ## are queried and the results are consolidated into one deduplicated list.
  ## Such candidates should be found through a discovery method (to be
  ## developed). When no list is passed, a single peer selected by the peer
  ## manager is queried instead.
  ##
  ## Returns the number of messages successfully added to the store, or an
  ## error string. Messages that fail to be inserted are skipped.

  # If store has not been provided, don't even try
  if w.store.isNil():
    return err(" store not provided (nil) ")

  # NOTE: Original implementation is based on the message's sender timestamp. At the moment
  #       of writing, the sqlite store implementation returns the last message's receiver
  #       timestamp.
  #  lastSeenTime = lastSeenItem.get().msg.timestamp
  let lastSeenTime = w.store.getNewestMessageTimestamp().get(Timestamp(0))
  let now = getNanosecondTime(getTime().toUnixFloat())

  debug " resuming with offline time window ", lastSeenTime = lastSeenTime, currentTime = now

  let req = HistoryQuery(
    pubsubTopic: pubsubTopic,
    startTime: max(lastSeenTime - StoreResumeTimeWindowOffset, 0),
    endTime: now + StoreResumeTimeWindowOffset,
    pagingInfo: PagingInfo(
      direction: PagingDirection.FORWARD,
      pageSize: pageSize
    )
  )

  var fetched: WakuStoreResult[seq[WakuMessage]]
  if peerList.isNone():
    debug " no candidate list is provided, selecting a random peer "
    # if no peerList is set then query from one of the peers stored in the peer manager
    let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
    if peerOpt.isNone():
      warn " no suitable remote peers "
      waku_store_errors.inc(labelValues = [peerNotFoundFailure])
      return err(" no suitable remote peers ")

    debug " a peer is selected from peer manager "
    fetched = await w.queryFromWithPaging(req, peerOpt.get())
  else:
    debug " trying the candidate list to fetch the history "
    fetched = await w.queryLoop(req, peerList.get())

  if fetched.isErr():
    debug " failed to resume the history "
    return err(" failed to resume the history ")

  # Save the retrieved messages in the store
  var added: uint = 0
  for msg in fetched.get():
    if w.store.put(pubsubTopic, msg).isOk():
      added.inc()

  return ok(added)
2021-02-11 08:58:25 +00:00
2022-08-02 11:14:13 +00:00
## EXPERIMENTAL
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, req: HistoryQuery): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} =
  ## Same as `query`, but additionally debits the queried peer through
  ## wakuSwap for the number of messages received.
  ##
  ## NOTE(review): `ws.wakuSwap` is used without a nil check, so this assumes
  ## the swap protocol is mounted - confirm at the call sites.
  let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
  if peerOpt.isNone():
    error " no suitable remote peers "
    waku_store_errors.inc(labelValues = [peerNotFoundFailure])
    return err(peerNotFoundFailure)

  let remotePeer = peerOpt.get()

  let queryRes = await ws.query(req, remotePeer)
  if queryRes.isErr():
    return err(queryRes.error)

  let historyResp = queryRes.get()

  # Perform accounting operation. Assumes wakuSwap protocol is mounted
  ws.wakuSwap.debit(remotePeer.peerId, historyResp.messages.len)

  return ok(historyResp)