2020-11-24 04:34:32 +00:00
## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
2021-06-09 14:37:08 +00:00
{. push raises : [ Defect ] . }
2020-08-31 03:32:41 +00:00
import
2022-06-28 23:59:38 +00:00
std / [ tables , times , sequtils , options , math ] ,
2022-08-01 16:21:11 +00:00
stew / results ,
2021-07-16 22:28:35 +00:00
chronicles ,
chronos ,
2022-07-25 11:01:37 +00:00
bearssl ,
2020-09-28 21:44:14 +00:00
libp2p / crypto / crypto ,
2020-08-31 03:32:41 +00:00
libp2p / protocols / protocol ,
libp2p / protobuf / minprotobuf ,
libp2p / stream / connection ,
2022-07-25 11:01:37 +00:00
metrics
import
2021-03-25 08:37:11 +00:00
.. / .. / node / storage / message / message_store ,
2022-07-25 11:01:37 +00:00
.. / .. / node / storage / message / waku_store_queue ,
2021-07-16 22:28:35 +00:00
.. / .. / node / peer_manager / peer_manager ,
2022-02-17 15:00:15 +00:00
.. / .. / utils / time ,
2022-07-25 11:01:37 +00:00
.. / .. / utils / pagination ,
.. / .. / utils / requests ,
.. / waku_message ,
2021-07-16 22:28:35 +00:00
.. / waku_swap / waku_swap ,
2022-07-25 11:01:37 +00:00
. / rpc ,
. / rpc_codec
2020-11-24 04:34:32 +00:00
2020-08-27 02:44:09 +00:00
2021-03-04 07:19:21 +00:00
# Gauges exported by the store protocol.
# Message counts, split by the label declared below (the procs in this module
# set it for stored and for retrieved messages).
declarePublicGauge(waku_store_messages, " number of historical messages ", [" type "])
# Incremented each time a store peer is registered through `setPeer`.
declarePublicGauge(waku_store_peers, " number of store peers ")
# Protocol failures, labelled with the failure kind.
declarePublicGauge(waku_store_errors, " number of store protocol errors ", [" type "])
# Incremented for every successfully decoded incoming query.
declarePublicGauge(waku_store_queries, " number of store queries received ")

# Log topic attached to every log statement emitted from this module.
logScope:
  topics = " wakustore "
2022-08-01 16:21:11 +00:00
const
  # Protocol identifier the store handler is mounted under and dialed with.
  WakuStoreCodec* = " /vac/waku/store/2.0.0-beta4 "
  # Pubsub topic used by `resume` when fetching and saving history.
  DefaultTopic* = " /waku/2/default-waku/proto "

  # Constants required for pagination -----------------------------------------
  MaxPageSize* = StoreMaxPageSize
  # TODO: DefaultPageSize may be changed; its current value is arbitrary.
  # A recommended default number of waku messages per page.
  DefaultPageSize* = 20'u64
  MaxTimeVariance* = StoreMaxTimeVariance

  # Upper bound on the size of a single RPC read from a connection: a full
  # page of maximum-size messages plus a 64kB safety buffer for protocol
  # overhead.
  MaxRpcSize = StoreMaxPageSize * MaxWakuMessageSize + 64 * 1024
2022-07-25 11:01:37 +00:00
2020-08-27 02:44:09 +00:00
2021-02-09 08:31:38 +00:00
# Error types: values used for the label of the `waku_store_errors` gauge.
const dialFailure = " dial_failure "                   # dialing the remote peer failed
const decodeRpcFailure = " decode_rpc_failure "        # a received RPC could not be decoded
const peerNotFoundFailure = " peer_not_found_failure " # no store peer could be selected
2021-02-09 08:31:38 +00:00
2022-06-13 17:59:53 +00:00
type
  WakuStoreResult*[T] = Result[T, string]
    ## Result alias used by the store procs; the error side is a plain message.

  WakuStore* = ref object of LPProtocol
    ## State of the store protocol instance.
    peerManager*: PeerManager
      ## Used to register, select and dial store peers.
    rng*: ref BrHmacDrbgContext
      ## Random source passed to `generateRequestId` for outgoing queries.
    messages*: StoreQueueRef
      ## In-memory message store.
    store*: MessageStore
      ## SQLite DB handle; may be nil when no persistent storage is attached.
    wakuSwap*: WakuSwap
      ## Swap protocol used for accounting; may be nil.
    persistMessages*: bool
      ## When false, `handleMessage` ignores incoming messages.
    # TODO: WakuMessageStore currently also holds isSqliteOnly; keep it in a
    # single place.
    isSqliteOnly: bool
      ## If true, don't use the in-memory store and answer history queries
      ## from the SQLite DB.
2022-02-17 10:00:45 +00:00
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
  ## Answers a history query with a single page of matching messages.
  ##
  ## Every query criterion is optional: a field left at its zero value
  ## (empty filter list, empty pubsub topic, default `Index`, `Timestamp(0)`)
  ## is passed to the backend as `none`.
  ##
  ## The query is served from the SQLite store when `isSqliteOnly` is set and
  ## a store handle is present, otherwise from the in-memory queue.
  ##
  ## Returns the page together with the updated paging info and
  ## `HistoryResponseError.NONE`. Any backend error is currently reported as
  ## `HistoryResponseError.INVALID_CURSOR` with an empty page.
  info " Finding messages matching received query ", query = query

  let
    qContentTopics =
      if query.contentFilters.len == 0: none(seq[ContentTopic])
      else: some(query.contentFilters.mapIt(it.contentTopic))
    # An unset pubsub topic is the empty string (the zero value), in line
    # with the other zero-value checks here.
    qPubSubTopic =
      if query.pubsubTopic == "": none(string)
      else: some(query.pubsubTopic)
    qCursor =
      if query.pagingInfo.cursor == Index(): none(Index)
      else: some(query.pagingInfo.cursor)
    qStartTime =
      if query.startTime == Timestamp(0): none(Timestamp)
      else: some(query.startTime)
    qEndTime =
      if query.endTime == Timestamp(0): none(Timestamp)
      else: some(query.endTime)
    qMaxPageSize = query.pagingInfo.pageSize
    qAscendingOrder = query.pagingInfo.direction == PagingDirection.FORWARD

  # `init` accepts isSqliteOnly = true together with a nil store. Guard the
  # handle, as `init` and `handleMessage` do, so a remote query cannot
  # trigger a nil dereference.
  let useSqlite = w.isSqliteOnly and not w.store.isNil
  if w.isSqliteOnly and w.store.isNil:
    warn "SQLite-only store has no database handle; querying the in-memory queue instead"

  let queryRes =
    if useSqlite:
      w.store.getMessagesByHistoryQuery(
        contentTopic = qContentTopics,
        pubsubTopic = qPubSubTopic,
        cursor = qCursor,
        startTime = qStartTime,
        endTime = qEndTime,
        maxPageSize = qMaxPageSize,
        ascendingOrder = qAscendingOrder
      )
    else:
      w.messages.getMessagesByHistoryQuery(
        contentTopic = qContentTopics,
        pubsubTopic = qPubSubTopic,
        cursor = qCursor,
        startTime = qStartTime,
        endTime = qEndTime,
        maxPageSize = qMaxPageSize,
        ascendingOrder = qAscendingOrder
      )

  # Build response
  # TODO: Distinguish error causes instead of mapping all to INVALID_CURSOR
  if queryRes.isErr():
    return HistoryResponse(
      messages: @[],
      pagingInfo: PagingInfo(),
      error: HistoryResponseError.INVALID_CURSOR
    )

  let (messages, updatedPagingInfo) = queryRes.get()

  HistoryResponse(
    messages: messages,
    pagingInfo: updatedPagingInfo.get(PagingInfo()),
    error: HistoryResponseError.NONE
  )
2020-11-09 04:48:09 +00:00
2022-08-01 16:21:11 +00:00
proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
  ## Wires up the store protocol on `ws`: installs the request handler and
  ## codec, creates the in-memory queue with the given `capacity`, and, when a
  ## persistent store is attached and the node is not SQLite-only, loads all
  ## persisted messages into the queue.

  proc onRequest(conn: Connection, proto: string) {.async.} =
    # Serves one history query read from `conn` and writes the response back.
    let buf = await conn.readLp(MaxRpcSize.int)
    let decoded = HistoryRPC.init(buf)
    if decoded.isErr:
      error " failed to decode rpc "
      waku_store_errors.inc(labelValues = [decodeRpcFailure])
      return

    # TODO Print more info here
    info " received query ", rpc = decoded.value
    waku_store_queries.inc()

    let request = decoded.value
    let response = ws.findMessages(request.query)

    # TODO Do accounting here, response is HistoryResponse
    # How do we get node or swap context?
    if ws.wakuSwap.isNil:
      info " handle store swap is nil "
    else:
      info " handle store swap test ", text = ws.wakuSwap.text
      # NOTE Perform accounting operation: credit the requesting peer with
      # the number of messages being returned.
      ws.wakuSwap.credit(conn.peerId, response.messages.len)

    info " sending response ", messages = response.messages.len

    let reply = HistoryRPC(requestId: request.requestId, response: response)
    await conn.writeLp(reply.encode().buffer)

  ws.handler = onRequest
  ws.codec = WakuStoreCodec
  ws.messages = StoreQueueRef.new(capacity)

  # Nothing to load without a persistent store.
  if ws.store.isNil:
    return

  if ws.isSqliteOnly:
    info " SQLite-only store initialized. Messages are *not* loaded into memory. "
    return

  # Load all messages from sqliteStore into queueStore
  info " attempting to load messages from persistent storage "

  let loaded = ws.store.getAllMessages()
  if loaded.isErr():
    warn " failed to load messages from the persistent store ", err = loaded.error()
  else:
    for (receiverTime, msg, pubsubTopic) in loaded.value:
      let index = Index.compute(msg, receiverTime, pubsubTopic)
      # Insert failures are deliberately ignored here.
      discard ws.messages.put(index, msg, pubsubTopic)
    info " successfully loaded messages from the persistent store "

  debug " the number of messages in the memory ", messageNum = ws.messages.len
  waku_store_messages.set(ws.messages.len.int64, labelValues = [" stored "])
2020-11-16 08:38:52 +00:00
2021-02-09 08:31:38 +00:00
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
           store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
           capacity = StoreDefaultCapacity, isSqliteOnly = false): T =
  ## Creates a `WakuStore` from the given components and initialises it
  ## (handler, codec, in-memory queue of `capacity`, optional load from the
  ## persistent store) before returning it.
  debug " init "
  let ws = WakuStore(
    peerManager: peerManager,
    rng: rng,
    store: store,
    wakuSwap: wakuSwap,
    persistMessages: persistMessages,
    isSqliteOnly: isSqliteOnly
  )
  ws.init(capacity)
  ws
2020-08-27 02:44:09 +00:00
2020-09-24 02:16:25 +00:00
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
2021-10-06 12:29:08 +00:00
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
  ## Registers `peer` with the peer manager under the store codec and bumps
  ## the store-peers gauge.
  addPeer(ws.peerManager, peer, WakuStoreCodec)
  inc(waku_store_peers)
2020-09-24 02:16:25 +00:00
2021-07-13 07:18:51 +00:00
proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
  ## Stores a newly received message.
  ##
  ## Does nothing when `persistMessages` is off. Otherwise the message is
  ## indexed with the current time, added to the in-memory queue (unless the
  ## store is SQLite-only) and then written to the persistent store when one
  ## is attached. A failed in-memory insert skips the persistent write.
  if not w.persistMessages:
    # Store is mounted but new messages should not be stored
    return

  let
    receivedAt = getNanosecondTime(getTime().toUnixFloat())
    index = Index.compute(msg, receivedTime = receivedAt, pubsubTopic = topic)

  # add message to in-memory store
  if not w.isSqliteOnly:
    # Handle WakuMessage according to store protocol
    trace " handle message in WakuStore ", topic = topic, msg = msg

    let entry = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)
    let addRes = w.messages.add(entry)
    if addRes.isErr:
      trace " Attempt to add message to store failed ", msg = msg, index = index, err = addRes.error()
      waku_store_errors.inc(labelValues = [$(addRes.error())])
      return # Do not attempt to store in persistent DB

    waku_store_messages.set(w.messages.len.int64, labelValues = [" stored "])

  # Persist only when a database handle is attached.
  if not w.store.isNil:
    let putRes = w.store.put(index, msg, topic)
    if putRes.isErr:
      trace " failed to store messages ", err = putRes.error
      waku_store_errors.inc(labelValues = [" store_failure "])
2020-09-24 02:16:25 +00:00
2022-08-01 16:21:11 +00:00
# Callback invoked by the query procs with the decoded history response.
# TODO: Remove after converting the query method into a non-callback method
type
  QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
2020-09-24 02:16:25 +00:00
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
  ## Sends `query` to a store peer selected by the peer manager and passes
  ## the decoded response to `handler`.
  ##
  ## On failure (no peer, dial failure, undecodable response) the error is
  ## logged, the error gauge is bumped and `handler` is not called.
  # @TODO We need to be more strategic about which peers we dial. Right now we
  # just set one on the service. Ideally, depending on the query and our set
  # of peers, we take a subset of ideal peers.
  # This will require us to check for various factors such as:
  #  - which topics they track
  #  - latency?
  #  - default store peer?
  let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
  if peerOpt.isNone():
    error " no suitable remote peers "
    waku_store_errors.inc(labelValues = [peerNotFoundFailure])
    return

  let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
  if connOpt.isNone():
    # @TODO more sophisticated error handling here
    error " failed to connect to remote peer "
    waku_store_errors.inc(labelValues = [dialFailure])
    return

  let conn = connOpt.get()
  let request = HistoryRPC(requestId: generateRequestId(w.rng), query: query)
  await conn.writeLP(request.encode().buffer)

  let buf = await conn.readLp(MaxRpcSize.int)
  let decoded = HistoryRPC.init(buf)
  if decoded.isErr:
    error " failed to decode response "
    waku_store_errors.inc(labelValues = [decodeRpcFailure])
    return

  let history = decoded.value.response
  waku_store_messages.set(history.messages.len.int64, labelValues = [" retrieved "])
  handler(history)
2020-11-16 09:55:49 +00:00
2022-08-01 12:09:41 +00:00
## 21/WAKU2-FAULT-TOLERANT-STORE
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
  ## Sends `query` to the given `peer` and passes the decoded response to
  ## `handler`.
  ##
  ## Returns the number of messages in the response, or an error string when
  ## the peer cannot be dialed or the response cannot be decoded.
  # TODO dialPeer adds the peer to the list of known peers; this causes no
  # issue but might be unnecessary
  let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
  if connOpt.isNone():
    error " failed to connect to remote peer "
    waku_store_errors.inc(labelValues = [dialFailure])
    return err(" failed to connect to remote peer ")

  let conn = connOpt.get()
  let request = HistoryRPC(requestId: generateRequestId(w.rng), query: query)
  await conn.writeLP(request.encode().buffer)
  debug " query is sent ", query = query

  let buf = await conn.readLp(MaxRpcSize.int)
  let decoded = HistoryRPC.init(buf)
  debug " response is received "

  if decoded.isErr:
    error " failed to decode response "
    waku_store_errors.inc(labelValues = [decodeRpcFailure])
    return err(" failed to decode response ")

  let history = decoded.value.response
  waku_store_messages.set(history.messages.len.int64, labelValues = [" retrieved "])
  handler(history)
  return ok(history.messages.len.uint64)
2021-05-19 19:28:09 +00:00
2022-08-01 12:09:41 +00:00
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
  ## Thin wrapper around `queryFrom` that follows pagination.
  ##
  ## Repeatedly sends the query to `peer`, advancing the cursor with the one
  ## returned in each response, until a response reports a page size of zero.
  ## Returns every message collected along the way, or an error string as
  ## soon as one page request fails.
  debug " queryFromWithPaging is called "

  var
    collected: seq[WakuMessage]
    pagedQuery = query # working copy; its cursor is advanced page by page
  debug " query is ", q = pagedQuery

  var morePages = true

  proc onPage(response: HistoryResponse) {.gcsafe.} =
    # keep the messages of this page
    collected.add(response.messages)
    # a zero page size marks the last page
    morePages = response.pagingInfo.pageSize != 0
    debug " hasNextPage ", hasNextPage = morePages
    # continue from where this page ended
    pagedQuery.pagingInfo.cursor = response.pagingInfo.cursor
    debug " next paging info ", pagingInfo = pagedQuery.pagingInfo

  # fetch the history in pages
  while morePages:
    let pageRes = await w.queryFrom(pagedQuery, onPage, peer)
    if pageRes.isErr:
      return err(" failed to resolve the query ")
    debug " hasNextPage ", hasNextPage = morePages

  return ok(collected)
2022-08-01 12:09:41 +00:00
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
  ## Sends the (paged) query to every peer in `candidateList`, in order, and
  ## waits for all of them to finish.
  ##
  ## The messages of all successful responses are merged and deduplicated.
  ## Returns an error when no messages at all were retrieved, which includes
  ## the case where every peer failed.
  var pending = newSeqOfCap[Future[WakuStoreResult[seq[WakuMessage]]]](candidateList.len)
  for peer in candidateList:
    pending.add(w.queryFromWithPaging(query, peer))

  # all(), which returns a Future[seq[T]], has been deprecated
  await allFutures(pending)

  var merged: seq[WakuMessage]
  for fut in pending:
    # completed() is only a sanity check: allFutures() has already awaited
    # every future in the list.
    if fut.completed() and fut.read().isOk():
      merged.add(fut.read().value)

  if merged.len == 0:
    debug " failed to resolve the query "
    return err(" failed to resolve the query ")

  return ok(merged.deduplicate())
2021-05-19 19:28:09 +00:00
2022-08-01 12:09:41 +00:00
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
  ## Retrieves the history of waku messages published on the default pubsub
  ## topic since the last time this store node was online.
  ##
  ## The offline window runs from the timestamp of the most recent message in
  ## the in-memory queue (or 0 when the queue has none) to the current time.
  ## Both ends are widened by a 20 second offset to account for clock
  ## asynchrony between nodes.
  ##
  ## `peerList`: peers to query. The history is fetched from all of them and
  ## consolidated into one deduplicated list. Such candidates should be found
  ## through a discovery method (to be developed). When no list is passed, a
  ## peer is selected through the peer manager instead.
  ## `pageSize`: page size requested from the remote peers.
  ##
  ## Retrieved messages not already in the queue are written to the message
  ## DB (when attached) and added to the in-memory queue.
  ##
  ## Returns the number of messages actually added to the in-memory queue,
  ## or an error string when the history could not be fetched.

  var currentTime = getNanosecondTime(epochTime())
  debug " resume ", currentEpochTime = currentTime

  let lastSeenItem = ws.messages.last()
  var lastSeenTime =
    if lastSeenItem.isOk(): lastSeenItem.get().msg.timestamp
    else: Timestamp(0)

  # adjust the time window with an offset of 20 seconds
  let offset: Timestamp = getNanosecondTime(20)
  currentTime = currentTime + offset
  lastSeenTime = max(lastSeenTime - offset, 0)
  debug " the offline time window is ", lastSeenTime = lastSeenTime, currentTime = currentTime

  let
    pinfo = PagingInfo(direction: PagingDirection.FORWARD, pageSize: pageSize)
    rpc = HistoryQuery(pubsubTopic: DefaultTopic, startTime: lastSeenTime, endTime: currentTime, pagingInfo: pinfo)

  var dismissed: uint = 0
  var added: uint = 0

  proc save(msgList: seq[WakuMessage]) =
    # Persists and enqueues the fetched messages, updating the `dismissed`
    # and `added` counters of the enclosing proc.
    debug " save proc is called "

    for msg in msgList:
      let index = Index.compute(
        msg,
        receivedTime = getNanosecondTime(getTime().toUnixFloat()),
        pubsubTopic = DefaultTopic
      )

      # check for duplicate messages
      # TODO Should take pubsub topic into account if we are going to support
      # topics rather than the DefaultTopic
      if ws.messages.contains(index):
        dismissed = dismissed + 1
        continue

      # store the new message
      let indexedWakuMsg = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)

      # store in db if exists
      if not ws.store.isNil:
        let res = ws.store.put(index, msg, DefaultTopic)
        if res.isErr:
          trace " failed to store messages ", err = res.error
          waku_store_errors.inc(labelValues = [" store_failure "])
          continue

      # Only count the message once the in-memory insert has succeeded. A
      # failed insert is reported the same way `handleMessage` reports it
      # instead of being silently discarded.
      let addRes = ws.messages.add(indexedWakuMsg)
      if addRes.isErr:
        trace " Attempt to add message to store failed ", msg = msg, index = index, err = addRes.error()
        waku_store_errors.inc(labelValues = [$(addRes.error())])
        continue

      added = added + 1

    waku_store_messages.set(ws.messages.len.int64, labelValues = [" stored "])

    debug " number of duplicate messages found in resume ", dismissed = dismissed
    debug " number of messages added via resume ", added = added

  if peerList.isSome:
    debug " trying the candidate list to fetch the history "
    let successResult = await ws.queryLoop(rpc, peerList.get())
    if successResult.isErr:
      debug " failed to resume the history from the list of candidates "
      return err(" failed to resume the history from the list of candidates ")
    debug " resume is done successfully "
    save(successResult.value)
    return ok(added)
  else:
    debug " no candidate list is provided, selecting a random peer "
    # if no peerList is set then query from one of the peers stored in the
    # peer manager
    let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
    if peerOpt.isNone():
      warn " no suitable remote peers "
      waku_store_errors.inc(labelValues = [peerNotFoundFailure])
      return err(" no suitable remote peers ")

    debug " a peer is selected from peer manager "
    let remotePeerInfo = peerOpt.get()
    let successResult = await ws.queryFromWithPaging(rpc, remotePeerInfo)
    if successResult.isErr:
      debug " failed to resume the history "
      return err(" failed to resume the history ")
    debug " resume is done successfully "
    save(successResult.value)
    return ok(added)
2021-05-13 21:21:46 +00:00
2022-08-01 12:09:41 +00:00
2020-11-16 09:55:49 +00:00
# NOTE: Experimental, maybe incorporate as part of query call
2020-11-24 04:53:42 +00:00
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
  ## Same as `query`, but additionally debits the answering peer through the
  ## swap protocol for the number of messages it returned.
  ##
  ## On failure (no peer, dial failure, undecodable response) the error is
  ## logged, the error gauge is bumped and `handler` is not called.
  ## When the swap protocol is not mounted, accounting is skipped and the
  ## response is still delivered to `handler`.
  # @TODO We need to be more strategic about which peers we dial. Right now we
  # just set one on the service. Ideally, depending on the query and our set
  # of peers, we take a subset of ideal peers.
  # This will require us to check for various factors such as:
  #  - which topics they track
  #  - latency?
  #  - default store peer?
  let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
  if peerOpt.isNone():
    error " no suitable remote peers "
    waku_store_errors.inc(labelValues = [peerNotFoundFailure])
    return

  let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
  if connOpt.isNone():
    # @TODO more sophisticated error handling here
    error " failed to connect to remote peer "
    waku_store_errors.inc(labelValues = [dialFailure])
    return

  let conn = connOpt.get()
  await conn.writeLP(HistoryRPC(requestId: generateRequestId(ws.rng),
                                query: query).encode().buffer)

  let message = await conn.readLp(MaxRpcSize.int)
  let response = HistoryRPC.init(message)
  if response.isErr:
    error " failed to decode response "
    waku_store_errors.inc(labelValues = [decodeRpcFailure])
    return

  let history = response.value.response

  # NOTE Perform accounting operation.
  # The swap protocol may not be mounted (`wakuSwap` defaults to nil in
  # `init`), so guard the handle as the request handler does.
  if ws.wakuSwap.isNil:
    warn "swap protocol is not mounted; skipping accounting"
  else:
    let remotePeerInfo = peerOpt.get()
    ws.wakuSwap.debit(remotePeerInfo.peerId, history.messages.len)

  waku_store_messages.set(history.messages.len.int64, labelValues = [" retrieved "])
  handler(history)