2020-11-24 12:34:32 +08:00
## Waku Store protocol for historical messaging support.
## See spec for more details:
## https://github.com/vacp2p/specs/blob/master/specs/waku/v2/waku-store.md
2021-06-09 16:37:08 +02:00
{. push raises : [ Defect ] . }
2021-07-16 15:28:35 -07:00
# Group by std, external then internal imports
2020-08-31 05:32:41 +02:00
import
2021-07-16 15:28:35 -07:00
# std imports
2022-01-17 19:37:06 +01:00
std / [ tables , times , sequtils , algorithm , options , math ] ,
2021-07-16 15:28:35 -07:00
# external imports
2020-09-28 23:44:14 +02:00
bearssl ,
2021-07-16 15:28:35 -07:00
chronicles ,
chronos ,
2020-09-28 23:44:14 +02:00
libp2p / crypto / crypto ,
2020-08-31 05:32:41 +02:00
libp2p / protocols / protocol ,
libp2p / protobuf / minprotobuf ,
libp2p / stream / connection ,
2021-07-16 15:28:35 -07:00
metrics ,
stew / [ results , byteutils ] ,
# internal imports
2021-03-25 10:37:11 +02:00
.. / .. / node / storage / message / message_store ,
2021-07-16 15:28:35 -07:00
.. / .. / node / peer_manager / peer_manager ,
2021-02-09 10:31:38 +02:00
.. / .. / utils / requests ,
2021-07-16 15:28:35 -07:00
.. / waku_swap / waku_swap ,
. / waku_store_types
2020-11-24 12:34:32 +08:00
2021-07-16 15:28:35 -07:00
# export all modules whose types are used in public functions/types
export
options ,
chronos ,
bearssl ,
minprotobuf ,
peer_manager ,
waku_store_types
2020-08-27 04:44:09 +02:00
2021-03-04 09:19:21 +02:00
# Metrics exposed by the store protocol
declarePublicGauge waku_store_messages, "number of historical messages", ["type"]
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
declarePublicGauge waku_store_queries, "number of store queries received"

logScope:
  topics = "wakustore"

const
  # libp2p protocol id spoken by this store implementation
  WakuStoreCodec* = "/vac/waku/store/2.0.0-beta3"
  DefaultStoreCapacity* = 50000 # Default maximum of 50k messages stored

# Error types (metric label values)
const
  dialFailure = "dial_failure"
  decodeRpcFailure = "decode_rpc_failure"
# TODO Move serialization function to separate file, too noisy
# TODO Move pagination to separate file, self-contained logic

proc computeIndex*(msg: WakuMessage, receivedTime = getTime().toUnixFloat()): Index =
  ## Takes a WakuMessage with received timestamp and returns its Index.
  ## Received timestamp will default to system time if not provided.
  # Digest is SHA-256 over content topic bytes followed by the payload
  var hasher: sha256
  hasher.init()
  hasher.update(msg.contentTopic.toBytes())
  hasher.update(msg.payload)
  let msgDigest = hasher.finish()
  hasher.clear()

  # Ensure the receiver timestamp has (only) millisecond resolution
  let rxTime = receivedTime.round(3)

  Index(digest: msgDigest, receiverTime: rxTime, senderTime: msg.timestamp)
proc encode*(index: Index): ProtoBuffer =
  ## Serialises an Index into a ProtoBuffer and returns it.
  result = initProtoBuffer()
  result.write(1, index.digest.data) # field 1: digest bytes
  result.write(2, index.receiverTime) # field 2: receiver timestamp
  result.write(3, index.senderTime) # field 3: sender timestamp
proc encode*(pinfo: PagingInfo): ProtoBuffer =
  ## Serialises a PagingInfo into a ProtoBuffer and returns it.
  result = initProtoBuffer()
  result.write(1, pinfo.pageSize) # field 1: requested page size
  result.write(2, pinfo.cursor.encode()) # field 2: nested Index cursor
  result.write(3, uint32(ord(pinfo.direction))) # field 3: paging direction as uint32
proc init*(T: type Index, buffer: seq[byte]): ProtoResult[T] =
  ## Creates and returns an Index object out of a protobuf-encoded buffer.
  var index = Index()
  let pb = initProtoBuffer(buffer)

  # field 1: digest bytes
  var data: seq[byte]
  discard ?pb.getField(1, data)

  # create digest from data
  # Robustness fix: the buffer comes from the network, so bound the copy to
  # the digest size — an over-long field must not index out of bounds.
  index.digest = MDigest[256]()
  for count, b in data:
    if count >= index.digest.data.len:
      break
    index.digest.data[count] = b

  # field 2: receiver-side timestamp
  var receiverTime: float64
  discard ?pb.getField(2, receiverTime)
  index.receiverTime = receiverTime

  # field 3: sender-side timestamp
  var senderTime: float64
  discard ?pb.getField(3, senderTime)
  index.senderTime = senderTime

  return ok(index)
proc init*(T: type PagingInfo, buffer: seq[byte]): ProtoResult[T] =
  ## Deserialises a PagingInfo object from a protobuf-encoded buffer.
  let pb = initProtoBuffer(buffer)
  var pagingInfo = PagingInfo()

  # field 1: page size
  var sizeValue: uint64
  discard ?pb.getField(1, sizeValue)
  pagingInfo.pageSize = sizeValue

  # field 2: nested Index cursor
  var cursorBuffer: seq[byte]
  discard ?pb.getField(2, cursorBuffer)
  pagingInfo.cursor = ?Index.init(cursorBuffer)

  # field 3: paging direction
  var dirValue: uint32
  discard ?pb.getField(3, dirValue)
  pagingInfo.direction = PagingDirection(dirValue)

  ok(pagingInfo)
proc init*(T: type HistoryContentFilter, buffer: seq[byte]): ProtoResult[T] =
  ## Deserialises a HistoryContentFilter from a protobuf-encoded buffer.
  let pb = initProtoBuffer(buffer)
  # ContentTopic corresponds to the contentTopic field of a waku message
  # (not to be confused with the pubsub topic)
  var topic: ContentTopic
  discard ?pb.getField(1, topic)
  ok(HistoryContentFilter(contentTopic: topic))
proc init*(T: type HistoryQuery, buffer: seq[byte]): ProtoResult[T] =
  ## Deserialises a HistoryQuery from a protobuf-encoded buffer.
  let pb = initProtoBuffer(buffer)
  var query = HistoryQuery()

  # field 2: pubsub topic restriction
  discard ?pb.getField(2, query.pubsubTopic)

  # field 3: repeated content filters
  var filterBuffers: seq[seq[byte]]
  discard ?pb.getRepeatedField(3, filterBuffers)
  for filterBuffer in filterBuffers:
    query.contentFilters.add(?HistoryContentFilter.init(filterBuffer))

  # field 4: nested paging info
  var pagingInfoBuffer: seq[byte]
  discard ?pb.getField(4, pagingInfoBuffer)
  query.pagingInfo = ?PagingInfo.init(pagingInfoBuffer)

  # fields 5 & 6: temporal query window
  discard ?pb.getField(5, query.startTime)
  discard ?pb.getField(6, query.endTime)

  return ok(query)
proc init*(T: type HistoryResponse, buffer: seq[byte]): ProtoResult[T] =
  ## Deserialises a HistoryResponse from a protobuf-encoded buffer.
  let pb = initProtoBuffer(buffer)
  var response = HistoryResponse()

  # field 2: repeated waku messages
  var messageBuffers: seq[seq[byte]]
  discard ?pb.getRepeatedField(2, messageBuffers)
  for messageBuffer in messageBuffers:
    response.messages.add(?WakuMessage.init(messageBuffer))

  # field 3: paging info for the next request
  var pagingInfoBuffer: seq[byte]
  discard ?pb.getField(3, pagingInfoBuffer)
  response.pagingInfo = ?PagingInfo.init(pagingInfoBuffer)

  # field 4: response error code
  var errorCode: uint32
  discard ?pb.getField(4, errorCode)
  response.error = HistoryResponseError(errorCode)

  return ok(response)
proc init*(T: type HistoryRPC, buffer: seq[byte]): ProtoResult[T] =
  ## Deserialises a HistoryRPC (request id + query + response) from a buffer.
  let pb = initProtoBuffer(buffer)
  var rpc = HistoryRPC()

  # field 1: request id
  discard ?pb.getField(1, rpc.requestId)

  # field 2: nested HistoryQuery
  var queryBuffer: seq[byte]
  discard ?pb.getField(2, queryBuffer)
  rpc.query = ?HistoryQuery.init(queryBuffer)

  # field 3: nested HistoryResponse
  var responseBuffer: seq[byte]
  discard ?pb.getField(3, responseBuffer)
  rpc.response = ?HistoryResponse.init(responseBuffer)

  return ok(rpc)
proc encode*(filter: HistoryContentFilter): ProtoBuffer =
  ## Serialises a HistoryContentFilter into a ProtoBuffer.
  result = initProtoBuffer()
  result.write(1, filter.contentTopic)
proc encode*(query: HistoryQuery): ProtoBuffer =
  ## Serialises a HistoryQuery into a ProtoBuffer.
  result = initProtoBuffer()
  result.write(2, query.pubsubTopic) # field 2: pubsub topic
  for filter in query.contentFilters:
    result.write(3, filter.encode()) # field 3: repeated content filters
  result.write(4, query.pagingInfo.encode()) # field 4: nested paging info
  result.write(5, query.startTime) # fields 5 & 6: temporal window
  result.write(6, query.endTime)
proc encode*(response: HistoryResponse): ProtoBuffer =
  ## Serialises a HistoryResponse into a ProtoBuffer.
  result = initProtoBuffer()
  for msg in response.messages:
    result.write(2, msg.encode()) # field 2: repeated messages
  result.write(3, response.pagingInfo.encode()) # field 3: next-page info
  result.write(4, uint32(ord(response.error))) # field 4: error code
proc encode*(rpc: HistoryRPC): ProtoBuffer =
  ## Serialises a HistoryRPC into a ProtoBuffer.
  result = initProtoBuffer()
  result.write(1, rpc.requestId)
  result.write(2, rpc.query.encode())
  result.write(3, rpc.response.encode())
proc indexComparison*(x, y: Index): int =
  ## Total order over Index values.
  ## Returns 0 if they are equal, -1 if x < y, 1 if x > y.
  # Sender timestamp has priority; the digest acts as a tie-breaker.
  let byTime = system.cmp(x.senderTime, y.senderTime)
  if byTime != 0:
    return byTime
  system.cmp(x.digest.data, y.digest.data)
proc indexedWakuMessageComparison*(x, y: IndexedWakuMessage): int =
  ## Orders indexed messages by their Index (see indexComparison).
  ## Returns 0 if they are equal, -1 if x < y, 1 if x > y.
  indexComparison(x.index, y.index)
proc findIndex*(msgList: seq[IndexedWakuMessage], index: Index): Option[int] =
  ## Returns the position in msgList of the message whose index value matches
  ## the given index, or none(int) when there is no match.
  for pos in 0 ..< msgList.len:
    if msgList[pos].index == index:
      return some(pos)
  none(int)
proc paginate*(msgList: seq[IndexedWakuMessage], pinfo: PagingInfo): (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError) =
  ## takes a message list, and performs paging based on pinfo
  ## the message list must be sorted
  ## returns the page i.e, a sequence of IndexedWakuMessage and the new paging info to be used for the next paging request
  var
    cursor = pinfo.cursor
    pageSize = pinfo.pageSize
    dir = pinfo.direction
    output: (seq[IndexedWakuMessage], PagingInfo, HistoryResponseError)

  if msgList.len == 0: # no pagination is needed for an empty list
    output = (msgList, PagingInfo(pageSize: 0, cursor: pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
    return output

  ## Adjust pageSize:
  ## - pageSize should not exceed maximum
  ## - pageSize being zero indicates "no pagination", but we still limit
  ##   responses to no more than a page of MaxPageSize messages
  if (pageSize == uint64(0)) or (pageSize > MaxPageSize):
    pageSize = MaxPageSize

  let total = uint64(msgList.len)

  # set the cursor of the initial paging request
  var isInitialQuery = false
  var cursorIndex: uint64
  if cursor == Index(): # an empty cursor means it is an initial query
    isInitialQuery = true
    case dir
    of PagingDirection.FORWARD:
      cursorIndex = 0
      cursor = msgList[cursorIndex].index # set the cursor to the beginning of the list
    of PagingDirection.BACKWARD:
      cursorIndex = total - 1
      cursor = msgList[cursorIndex].index # set the cursor to the end of the list
  else:
    # non-initial query: the cursor must point at a message we hold
    var cursorIndexOption = msgList.findIndex(cursor)
    if cursorIndexOption.isNone: # the cursor is not valid
      output = (@[], PagingInfo(pageSize: 0, cursor: pinfo.cursor, direction: pinfo.direction), HistoryResponseError.INVALID_CURSOR)
      return output
    cursorIndex = uint64(cursorIndexOption.get())

  case dir
  of PagingDirection.FORWARD: # forward pagination
    # set the index of the first message in the page
    # exclude the message pointing by the cursor
    var startIndex = cursorIndex + 1
    # for the initial query, include the message pointing by the cursor
    if isInitialQuery:
      startIndex = cursorIndex
    # adjust the pageSize based on the total remaining messages
    pageSize = min(pageSize, total - startIndex)
    if (pageSize == 0):
      # nothing left after the cursor: return an empty page, echo the request cursor
      output = (@[], PagingInfo(pageSize: pageSize, cursor: pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
      return output
    # set the index of the last message in the page
    var endIndex = startIndex + pageSize - 1
    # retrieve the messages
    var retMessages: seq[IndexedWakuMessage]
    for i in startIndex..endIndex:
      retMessages.add(msgList[i])
    # the returned cursor points at the last message of this page
    output = (retMessages, PagingInfo(pageSize: pageSize, cursor: msgList[endIndex].index, direction: pinfo.direction), HistoryResponseError.NONE)
    return output
  of PagingDirection.BACKWARD:
    # set the index of the last message in the page
    # exclude the message pointing by the cursor
    # NOTE(review): when cursorIndex == 0 this subtraction wraps around
    # (uint64); pageSize then becomes 0 below because endIndex + 1 wraps back
    # to 0, so an empty page is returned — correct, but subtle.
    var endIndex = cursorIndex - 1
    # for the initial query, include the message pointing by the cursor
    if isInitialQuery:
      endIndex = cursorIndex
    # adjust the pageSize based on the total remaining messages
    pageSize = min(pageSize, endIndex + 1)
    if (pageSize == 0):
      output = (@[], PagingInfo(pageSize: pageSize, cursor: pinfo.cursor, direction: pinfo.direction), HistoryResponseError.NONE)
      return output
    # set the index of the first message in the page
    var startIndex = endIndex - pageSize + 1
    # retrieve the messages
    var retMessages: seq[IndexedWakuMessage]
    for i in startIndex..endIndex:
      retMessages.add(msgList[i])
    # the returned cursor points at the first (oldest) message of this page
    output = (retMessages, PagingInfo(pageSize: pageSize, cursor: msgList[startIndex].index, direction: pinfo.direction), HistoryResponseError.NONE)
    return output
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse =
  ## Filters the in-memory message history against `query` (pubsub topic,
  ## content topics, temporal window — all optional), sorts the matches and
  ## returns one paginated HistoryResponse.
  ## Extract query criteria
  ## All query criteria are optional
  let
    qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic))
                     else: none(seq[ContentTopic])
    qPubSubTopic = if (query.pubsubTopic != ""): some(query.pubsubTopic)
                   else: none(string)
    qStartTime = if query.startTime != float64(0): some(query.startTime)
                 else: none(float64)
    qEndTime = if query.endTime != float64(0): some(query.endTime)
               else: none(float64)

  ## Compose filter predicate for message from query criteria
  # closure capturing the q* options above
  proc matchesQuery(indMsg: IndexedWakuMessage): bool =
    if qPubSubTopic.isSome():
      # filter on pubsub topic
      if indMsg.pubsubTopic != qPubSubTopic.get():
        return false
    if qStartTime.isSome() and qEndTime.isSome():
      # temporal filtering
      # select only messages whose sender generated timestamps fall bw the queried start time and end time
      if indMsg.msg.timestamp > qEndTime.get() or indMsg.msg.timestamp < qStartTime.get():
        return false
    if qContentTopics.isSome():
      # filter on content
      if indMsg.msg.contentTopic notin qContentTopics.get():
        return false
    return true

  ## Filter history using predicate and sort on indexedWakuMessageComparison
  ## TODO: since MaxPageSize is likely much smaller than w.messages.len,
  ## we could optimise here by only filtering a portion of w.messages,
  ## and repeat until we have populated a full page.
  ## TODO: we can gain a lot by rather sorting on insert. Perhaps use a nim-stew
  ## sorted set?
  let filteredMsgs = w.messages.filterIt(it.matchesQuery)
                               .sorted(indexedWakuMessageComparison)

  ## Paginate the filtered messages
  let (indexedWakuMsgList, updatedPagingInfo, error) = paginate(filteredMsgs, query.pagingInfo)

  ## Extract and return response
  let
    wakuMsgList = indexedWakuMsgList.mapIt(it.msg)
    historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)

  return historyRes
proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
  ## Initialises this store instance: installs the protocol handler, sets the
  ## codec and the in-memory message queue, and — when a persistent
  ## MessageStore is attached — reloads up to `capacity` stored messages.

  # per-connection handler for incoming store queries
  proc handler(conn: Connection, proto: string) {.async.} =
    var message = await conn.readLp(MaxRpcSize.int)
    var res = HistoryRPC.init(message)
    if res.isErr:
      error "failed to decode rpc"
      waku_store_errors.inc(labelValues = [decodeRpcFailure])
      return

    # TODO Print more info here
    info "received query"
    waku_store_queries.inc()

    let value = res.value
    let response = ws.findMessages(res.value.query)

    # TODO Do accounting here, response is HistoryResponse
    # How do we get node or swap context?
    if not ws.wakuSwap.isNil:
      info "handle store swap test", text = ws.wakuSwap.text
      # NOTE Perform accounting operation
      let peerId = conn.peerId
      let messages = response.messages
      ws.wakuSwap.credit(peerId, messages.len)
    else:
      info "handle store swap is nil"

    info "sending response", messages = response.messages.len
    await conn.writeLp(HistoryRPC(requestId: value.requestId,
        response: response).encode().buffer)

  ws.handler = handler
  ws.codec = WakuStoreCodec
  ws.messages = initQueue(capacity)

  # nothing more to do without a persistent backend
  if ws.store.isNil:
    return

  # callback invoked for each row loaded from persistent storage
  proc onData(receiverTime: float64, msg: WakuMessage, pubsubTopic: string) =
    # TODO index should not be recalculated
    ws.messages.add(IndexedWakuMessage(msg: msg, index: msg.computeIndex(receiverTime), pubsubTopic: pubsubTopic))

  info "attempting to load messages from persistent storage"
  let res = ws.store.getAll(onData, some(capacity))
  if res.isErr:
    # load failure is non-fatal: continue with an empty in-memory queue
    warn "failed to load messages from store", err = res.error
    waku_store_errors.inc(labelValues = ["store_load_failure"])
  else:
    info "successfully loaded from store"

  debug "the number of messages in the memory", messageNum = ws.messages.len
  waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
           store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
           capacity = DefaultStoreCapacity): T =
  ## Constructs a WakuStore protocol instance and initialises it
  ## (handler, codec, in-memory queue, optional reload from `store`).
  debug "init"
  var ws = WakuStore(rng: rng, peerManager: peerManager, store: store,
                     wakuSwap: wakuSwap, persistMessages: persistMessages)
  ws.init(capacity)
  return ws
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
  ## Registers `peer` with the peer manager as a store peer and bumps the
  ## store-peer count metric.
  ws.peerManager.addPeer(peer, WakuStoreCodec)
  waku_store_peers.inc()
proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
  ## Stores a newly received message in the in-memory queue and, when a
  ## persistent store is attached, in the database as well.
  if (not w.persistMessages):
    # Store is mounted but new messages should not be stored
    return

  # Handle WakuMessage according to store protocol
  trace "handle message in WakuStore", topic = topic, msg = msg
  let index = msg.computeIndex()
  w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
  waku_store_messages.set(w.messages.len.int64, labelValues = ["stored"])
  if w.store.isNil:
    return

  let res = w.store.put(index, msg, topic)
  if res.isErr:
    # persistence failure is non-fatal: the message stays in memory
    trace "failed to store messages", err = res.error
    waku_store_errors.inc(labelValues = ["store_failure"])
proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
  ## Selects a store peer from the peer manager, sends it `query` and invokes
  ## `handler` with the decoded HistoryResponse. Failures are reported via
  ## logging and the waku_store_errors metric only (no value is returned).
  # @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
  # Ideally depending on the query and our set of peers we take a subset of ideal peers.
  # This will require us to check for various factors such as:
  #  - which topics they track
  #  - latency?
  #  - default store peer?

  let peerOpt = w.peerManager.selectPeer(WakuStoreCodec)
  if peerOpt.isNone():
    error "no suitable remote peers"
    waku_store_errors.inc(labelValues = [dialFailure])
    return

  let connOpt = await w.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
  if connOpt.isNone():
    # @TODO more sophisticated error handling here
    error "failed to connect to remote peer"
    waku_store_errors.inc(labelValues = [dialFailure])
    return

  await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
      query: query).encode().buffer)

  var message = await connOpt.get().readLp(MaxRpcSize.int)
  let response = HistoryRPC.init(message)

  if response.isErr:
    error "failed to decode response"
    waku_store_errors.inc(labelValues = [decodeRpcFailure])
    return

  waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
  handler(response.value.response)
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[QueryResult] {.async, gcsafe.} =
  ## sends the query to the given peer
  ## returns the number of retrieved messages if no error occurs, otherwise returns the error string
  # TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary
  let connOpt = await w.peerManager.dialPeer(peer, WakuStoreCodec)
  if connOpt.isNone():
    error "failed to connect to remote peer"
    waku_store_errors.inc(labelValues = [dialFailure])
    return err("failed to connect to remote peer")

  await connOpt.get().writeLP(HistoryRPC(requestId: generateRequestId(w.rng),
      query: query).encode().buffer)
  debug "query is sent", query = query

  var message = await connOpt.get().readLp(MaxRpcSize.int)
  let response = HistoryRPC.init(message)

  debug "response is received"

  if response.isErr:
    error "failed to decode response"
    waku_store_errors.inc(labelValues = [decodeRpcFailure])
    return err("failed to decode response")

  waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
  # handler is invoked before returning so callers may rely on its side effects
  handler(response.value.response)
  return ok(response.value.response.messages.len.uint64)
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[MessagesResult] {.async, gcsafe.} =
  ## a thin wrapper for queryFrom
  ## sends the query to the given peer
  ## when the query has a valid pagingInfo, it retrieves the historical messages in pages
  ## returns all the fetched messages if no error occurs, otherwise returns an error string
  debug "queryFromWithPaging is called"
  var messageList: seq[WakuMessage]
  # make a copy of the query
  var q = query
  debug "query is", q = q
  var hasNextPage = true
  # closure: accumulates messages and advances the paging cursor after each page
  proc handler(response: HistoryResponse) {.gcsafe.} =
    # store messages
    for m in response.messages.items: messageList.add(m)
    # check whether it is the last page
    # (a response pageSize of 0 signals there are no more pages)
    hasNextPage = (response.pagingInfo.pageSize != 0)
    debug "hasNextPage", hasNextPage = hasNextPage
    # update paging cursor
    q.pagingInfo.cursor = response.pagingInfo.cursor
    debug "next paging info", pagingInfo = q.pagingInfo
  # fetch the history in pages
  while (hasNextPage):
    let successResult = await w.queryFrom(q, handler, peer)
    if not successResult.isOk: return err("failed to resolve the query")
    debug "hasNextPage", hasNextPage = hasNextPage
  return ok(messageList)
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
  ## loops through the candidateList in order and sends the query to each until one of the query gets resolved successfully
  ## returns the retrieved messages, or error if all the requests fail
  for peer in candidateList.items:
    let successResult = await w.queryFromWithPaging(query, peer)
    # first successful peer wins; remaining candidates are not queried
    if successResult.isOk: return ok(successResult.value)

  debug "failed to resolve the query"
  return err("failed to resolve the query")
proc findLastSeen*(list: seq[IndexedWakuMessage]): float =
  ## Returns the largest sender timestamp found in `list`
  ## (0 when the list is empty).
  result = float64(0)
  for item in list.items:
    result = max(result, item.msg.timestamp)
proc isDuplicate(message: WakuMessage, list: seq[WakuMessage]): bool =
  ## Returns true iff `message` already occurs in `list`.
  # kept as a separate proc so the comparison criteria can be adjusted later,
  # e.g. to exclude the timestamp or include the pubsub topic
  message in list
proc resume * ( ws : WakuStore , peerList : Option [ seq [ RemotePeerInfo ] ] = none ( seq [ RemotePeerInfo ] ) , pageSize : uint64 = DefaultPageSize ) : Future [ QueryResult ] {. async , gcsafe . } =
2021-05-13 14:21:46 -07:00
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
## an offset of 20 second is added to the time window to count for nodes asynchrony
2021-05-19 12:28:09 -07:00
## peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
2021-05-26 12:33:22 -07:00
## if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from.
## The history gets fetched successfully if the dialed peer has been online during the queried time window.
2021-05-19 12:28:09 -07:00
## the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
2021-05-26 12:33:22 -07:00
2021-05-13 14:21:46 -07:00
var currentTime = epochTime ( )
2021-11-03 11:59:51 +01:00
var lastSeenTime : float = findLastSeen ( ws . messages . allItems ( ) )
2021-05-13 14:21:46 -07:00
debug " resume " , currentEpochTime = currentTime
2021-05-19 12:28:09 -07:00
2021-05-13 14:21:46 -07:00
# adjust the time window with an offset of 20 seconds
let offset : float64 = 200000
currentTime = currentTime + offset
lastSeenTime = max ( lastSeenTime - offset , 0 )
2021-05-26 12:33:22 -07:00
debug " the offline time window is " , lastSeenTime = lastSeenTime , currentTime = currentTime
2021-05-13 14:21:46 -07:00
2021-07-02 11:37:58 -07:00
let
pinfo = PagingInfo ( direction : PagingDirection . FORWARD , pageSize : pageSize )
rpc = HistoryQuery ( pubsubTopic : DefaultTopic , startTime : lastSeenTime , endTime : currentTime , pagingInfo : pinfo )
2021-06-21 20:30:12 -07:00
var dismissed : uint = 0
var added : uint = 0
2021-07-14 19:58:46 +02:00
proc save(msgList: seq[WakuMessage]) =
  ## Merges `msgList` into the store, skipping messages that are already
  ## known. Updates the enclosing `dismissed`/`added` counters captured
  ## from the surrounding scope.
  debug "save proc is called"

  # Exclude the index from the comparison criteria: only the WakuMessage
  # payloads are compared when checking for duplicates.
  let knownMessages = ws.messages.mapIt(it.msg)

  for msg in msgList:
    # Drop messages we have already seen.
    # TODO Should take pubsub topic into account if we are going to support topics rather than the DefaultTopic
    if isDuplicate(msg, knownMessages):
      inc(dismissed)
      continue

    # New message: compute its index and wrap it for in-memory storage.
    let
      index = msg.computeIndex()
      indexed = IndexedWakuMessage(msg: msg, index: index, pubsubTopic: DefaultTopic)

    # Persist to the database when one is configured; on failure, skip the
    # in-memory add as well so memory and db stay consistent.
    if not ws.store.isNil:
      let res = ws.store.put(index, msg, DefaultTopic)
      if res.isErr:
        trace "failed to store messages", err = res.error
        waku_store_errors.inc(labelValues = ["store_failure"])
        continue

    ws.messages.add(indexed)
    inc(added)

  waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])

  debug "number of duplicate messages found in resume", dismissed = dismissed
  debug "number of messages added via resume", added = added
2021-05-19 12:28:09 -07:00
if peerList . isSome :
2021-05-26 12:33:22 -07:00
debug " trying the candidate list to fetch the history "
2021-07-02 11:37:58 -07:00
let successResult = await ws . queryLoop ( rpc , peerList . get ( ) )
2021-05-19 12:28:09 -07:00
if successResult . isErr :
debug " failed to resume the history from the list of candidates "
return err ( " failed to resume the history from the list of candidates " )
2021-05-26 12:33:22 -07:00
debug " resume is done successfully "
2021-07-02 11:37:58 -07:00
save ( successResult . value )
return ok ( added )
2021-05-19 12:28:09 -07:00
else :
2021-05-26 12:33:22 -07:00
debug " no candidate list is provided, selecting a random peer "
2021-05-19 12:28:09 -07:00
# if no peerList is set then query from one of the peers stored in the peer manager
let peerOpt = ws . peerManager . selectPeer ( WakuStoreCodec )
if peerOpt . isNone ( ) :
error " no suitable remote peers "
waku_store_errors . inc ( labelValues = [ dialFailure ] )
return err ( " no suitable remote peers " )
2021-05-26 12:33:22 -07:00
debug " a peer is selected from peer manager "
2021-10-06 14:29:08 +02:00
let remotePeerInfo = peerOpt . get ( )
let successResult = await ws . queryFromWithPaging ( rpc , remotePeerInfo )
2021-05-19 12:28:09 -07:00
if successResult . isErr :
debug " failed to resume the history "
return err ( " failed to resume the history " )
2021-05-26 12:33:22 -07:00
debug " resume is done successfully "
2021-07-02 11:37:58 -07:00
save ( successResult . value )
2021-06-21 20:30:12 -07:00
return ok ( added )
2021-05-13 14:21:46 -07:00
2020-11-16 17:55:49 +08:00
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
  ## Sends `query` to a store peer selected from the peer manager, debits
  ## the peer via waku swap accounting for the messages received, and
  ## invokes `handler` with the decoded history response.
  ## NOTE(review): assumes the wakuSwap protocol is mounted on `ws`;
  ## a nil `ws.wakuSwap` here would crash — confirm at the mount site.
  # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service.
  # Ideally depending on the query and our set of peers we take a subset of ideal peers.
  # This will require us to check for various factors such as:
  #  - which topics they track
  #  - latency?
  #  - default store peer?

  # Pick a remote peer that speaks the store protocol.
  let peerOpt = ws.peerManager.selectPeer(WakuStoreCodec)
  if peerOpt.isNone():
    error "no suitable remote peers"
    waku_store_errors.inc(labelValues = [dialFailure])
    return

  # Dial the selected peer; bail out (with a metric bump) on failure.
  let connOpt = await ws.peerManager.dialPeer(peerOpt.get(), WakuStoreCodec)
  if connOpt.isNone():
    # @TODO more sophisticated error handling here
    error "failed to connect to remote peer"
    waku_store_errors.inc(labelValues = [dialFailure])
    return

  let conn = connOpt.get()

  # Send the query as a length-prefixed HistoryRPC and read the reply.
  let rpc = HistoryRPC(requestId: generateRequestId(ws.rng), query: query)
  await conn.writeLP(rpc.encode().buffer)

  let buffer = await conn.readLp(MaxRpcSize.int)
  let decoded = HistoryRPC.init(buffer)
  if decoded.isErr:
    error "failed to decode response"
    waku_store_errors.inc(labelValues = [decodeRpcFailure])
    return

  # NOTE Perform accounting operation
  # Assumes wakuSwap protocol is mounted
  let
    remotePeerInfo = peerOpt.get()
    historyResponse = decoded.value.response
  ws.wakuSwap.debit(remotePeerInfo.peerId, historyResponse.messages.len)

  waku_store_messages.set(historyResponse.messages.len.int64, labelValues = ["retrieved"])

  handler(historyResponse)