refactor(waku-store): inherit from message_store in store_queue

This commit is contained in:
Lorenzo Delgado 2022-08-01 14:09:41 +02:00 committed by Lorenzo Delgado
parent 613812023e
commit 4f93510fc9
3 changed files with 181 additions and 89 deletions

View File

@ -6,19 +6,34 @@
import
std/options,
stew/results,
chronos
import
../../../protocol/waku_message,
../../../protocol/waku_store/rpc,
../../../utils/time,
../../../utils/pagination
type
DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
const
StoreDefaultCapacity* = 25_000
StoreMaxOverflow* = 1.3
StoreDefaultRetentionTime* = chronos.days(30).seconds
StoreMaxPageSize* = 100.uint64
StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
type
MessageStoreResult*[T] = Result[T, string]
MessageStorePage* = (seq[WakuMessage], Option[PagingInfo])
MessageStoreRow* = (Timestamp, WakuMessage, string)
MessageStore* = ref object of RootObj
# TODO: Deprecate the following type
type DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].}
# TODO: Remove after resolving nwaku #1026. Move it back to waku_store_queue.nim
type
@ -33,8 +48,29 @@ type
# MessageStore interface
method getMostRecentMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
method getOldestMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard
method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard
# TODO: Deprecate the following methods after #1026
method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard
method getPage*(db: MessageStore, pred: QueryFilterMatcher, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard
method getPage*(db: MessageStore, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard
# TODO: Move to sqlite store
method getAllMessages(db: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard
method getMessagesByHistoryQuery*(
db: MessageStore,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] {.base.} = discard

View File

@ -21,23 +21,10 @@ logScope:
topics = "message_store.storequeue"
const
MaxPageSize = uint64(100) # Maximum number of waku messages in each page
MaxTimeVariance = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
type
QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.}
QueryResult* = Result[uint64, string]
MessagesResult* = Result[seq[WakuMessage], string]
type
StoreQueueResult*[T] = Result[T, cstring]
StoreQueueRef* = ref object
StoreQueueRef* = ref object of MessageStore
## Bounded repository for indexed messages
##
## The store queue will keep messages up to its
@ -199,7 +186,7 @@ proc bwdPage(storeQueue: StoreQueueRef,
lastValidCursor = startCursor.get()
let cursorEntry = w.rwdToCursor(startCursor.get())
if cursorEntry.isErr:
if cursorEntry.isErr():
# Quick exit here if start cursor not found
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
outSeq = @[]
@ -222,10 +209,9 @@ proc bwdPage(storeQueue: StoreQueueRef,
## 2. adds entries matching the predicate function to output page
## 3. until either the beginning of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk and numberOfItems < maxPageSize:
while currentEntry.isOk() and numberOfItems < maxPageSize:
trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if pred(currentEntry.value.data):
trace "Current item matches predicate. Adding to output."
lastValidCursor = currentEntry.value.key
@ -247,72 +233,101 @@ proc bwdPage(storeQueue: StoreQueueRef,
#### API
proc new*(T: type StoreQueueRef, capacity: int = StoreDefaultCapacity): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
return StoreQueueRef(items: items, capacity: capacity)
proc contains*(store: StoreQueueRef, index: Index): bool =
## Return `true` if the store queue already contains the `index`, `false` otherwise.
store.items.eq(index).isOk()
proc len*(store: StoreQueueRef): int {.noSideEffect.} =
store.items.len
proc `$`*(store: StoreQueueRef): string =
$store.items
## --- SortedSet accessors ---
iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
## Forward iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.first
while res.isOk:
res = w.first()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.next
w.destroy
res = w.next()
w.destroy()
iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) =
## Backwards iterator over the entire store queue
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.last
while res.isOk:
res = w.last()
while res.isOk():
yield (res.value.key, res.value.data)
res = w.prev
w.destroy
res = w.prev()
proc first*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
w.destroy()
proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.first
w.destroy
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items)
res = w.first()
w.destroy()
if res.isOk:
return ok(res.value.data)
else:
if res.isErr():
return err("Not found")
proc last*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] =
return ok(res.value.data)
proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] =
var
w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items)
res = w.last
w.destroy
res = w.last()
w.destroy()
if res.isOk:
return ok(res.value.data)
else:
if res.isErr():
return err("Not found")
return ok(res.value.data)
## --- Queue API ---
proc new*(T: type StoreQueueRef, capacity: int): T =
var items = SortedSet[Index, IndexedWakuMessage].init()
method getMostRecentMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] =
let message = ?store.last()
ok(message.index.receiverTime)
return StoreQueueRef(items: items, capacity: capacity)
method getOldestMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] =
let message = ?store.first()
ok(message.index.receiverTime)
proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] =
proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] =
## Add a message to the queue
## If we're at capacity, we will be removing,
## the oldest (first) item
# Ensure that messages don't "jump" to the front of the queue with future timestamps
if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance:
return err("future_sender_timestamp")
trace "Adding item to store queue", msg=msg
##
## If we're at capacity, we will be removing the oldest (first) item
trace "adding item to store queue", msg=msg
# TODO the below delete block can be removed if we convert to circular buffer
if storeQueue.items.len >= storeQueue.capacity:
# Ensure that messages don't "jump" to the front of the queue with future timestamps
if msg.index.senderTime - msg.index.receiverTime > StoreMaxTimeVariance:
return err("future_sender_timestamp")
if store.contains(msg.index):
trace "could not add item to store queue. Index already exists", index=msg.index
return err("duplicate")
# TODO: the below delete block can be removed if we convert to circular buffer
if store.items.len >= store.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items)
firstItem = w.first
if cmp(msg.index, firstItem.value.key) < 0:
@ -320,22 +335,18 @@ proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[
w.destroy # Clean up walker
return err("too_old")
discard storeQueue.items.delete(firstItem.value.key)
discard store.items.delete(firstItem.value.key)
w.destroy # better to destroy walker after a delete operation
let res = storeQueue.items.insert(msg.index)
if res.isErr:
# This indicates the index already exists in the storeQueue.
# TODO: could return error result and log in metrics
store.items.insert(msg.index).value.data = msg
trace "Could not add item to store queue. Index already exists.", index=msg.index
return err("duplicate")
else:
res.value.data = msg
trace "Successfully added item to store queue.", msg=msg
return ok()
method put*(store: StoreQueueRef, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] =
let message = IndexedWakuMessage(msg: message, index: cursor, pubsubTopic: pubsubTopic)
store.add(message)
proc getPage*(storeQueue: StoreQueueRef,
pred: QueryFilterMatcher,
pagingInfo: PagingInfo):
@ -348,8 +359,8 @@ proc getPage*(storeQueue: StoreQueueRef,
let
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
else: some(pagingInfo.cursor)
maxPageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos
else: pagingInfo.pageSize
maxPageSize = if pagingInfo.pageSize <= 0: StoreMaxPageSize
else: min(pagingInfo.pageSize, StoreMaxPageSize)
case pagingInfo.direction
of PagingDirection.FORWARD:
@ -367,15 +378,55 @@ proc getPage*(storeQueue: StoreQueueRef,
return getPage(storeQueue, predicate, pagingInfo)
proc contains*(storeQueue: StoreQueueRef, index: Index): bool =
## Return `true` if the store queue already contains the `index`,
## `false` otherwise
let res = storeQueue.items.eq(index)
return res.isOk()
method getMessagesByHistoryQuery*(
store: StoreQueueRef,
contentTopic = none(seq[ContentTopic]),
pubsubTopic = none(string),
cursor = none(Index),
startTime = none(Timestamp),
endTime = none(Timestamp),
maxPageSize = StoreMaxPageSize,
ascendingOrder = true
): MessageStoreResult[MessageStorePage] =
proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} =
storeQueue.items.len
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
trace "Matching indexed message against predicate", msg=indMsg
proc `$`*(storeQueue: StoreQueueRef): string =
$(storeQueue.items)
if pubsubTopic.isSome():
# filter by pubsub topic
if indMsg.pubsubTopic != pubsubTopic.get():
trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic
return false
if startTime.isSome() and endTime.isSome():
# temporal filtering: select only messages whose sender generated timestamps fall
# between the queried start time and end time
if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get():
trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp
return false
if contentTopic.isSome():
# filter by content topic
if indMsg.msg.contentTopic notin contentTopic.get():
trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic
return false
return true
let queryPagingInfo = PagingInfo(
pageSize: maxPageSize,
cursor: cursor.get(Index()),
direction: if ascendingOrder: PagingDirection.FORWARD
else: PagingDirection.BACKWARD
)
let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo)
if error == HistoryResponseError.INVALID_CURSOR:
return err("invalid cursor")
if messages.len == 0:
return ok((messages, none(PagingInfo)))
ok((messages, some(pagingInfo)))

View File

@ -38,14 +38,14 @@ logScope:
const
# Constants required for pagination -------------------------------------------
MaxPageSize* = uint64(100) # Maximum number of waku messages in each page
MaxPageSize* = StoreMaxPageSize
# TODO the DefaultPageSize can be changed, its current value is random
DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page
MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
MaxRpcSize* = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead
MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future
MaxTimeVariance* = StoreMaxTimeVariance
DefaultTopic* = "/waku/2/default-waku/proto"
@ -60,6 +60,8 @@ const
decodeRpcFailure = "decode_rpc_failure"
type
WakuStoreResult*[T] = Result[T, string]
WakuStore* = ref object of LPProtocol
peerManager*: PeerManager
rng*: ref BrHmacDrbgContext
@ -275,7 +277,10 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[QueryResult] {.async, gcsafe.} =
## 21/WAKU2-FAULT-TOLERANT-STORE
proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
## sends the query to the given peer
## returns the number of retrieved messages if no error occurs, otherwise returns the error string
# TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary
@ -304,7 +309,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe
handler(response.value.response)
return ok(response.value.response.messages.len.uint64)
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[MessagesResult] {.async, gcsafe.} =
proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## a thin wrapper for queryFrom
## sends the query to the given peer
## when the query has a valid pagingInfo, it retrieves the historical messages in pages
@ -336,17 +341,17 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf
return ok(messageList)
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} =
proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} =
## loops through the candidateList in order and sends the query to each
## once all responses have been received, the retrieved messages are consolidated into one deduplicated list
## if no messages have been retrieved, the returned future will resolve into a MessagesResult result holding an empty seq.
var futureList: seq[Future[MessagesResult]]
var futureList: seq[Future[WakuStoreResult[seq[WakuMessage]]]]
for peer in candidateList.items:
futureList.add(w.queryFromWithPaging(query, peer))
await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated
let messagesList = futureList
.map(proc (fut: Future[MessagesResult]): seq[WakuMessage] =
.map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] =
if fut.completed() and fut.read().isOk(): # completed() just as a sanity check. These futures have been awaited before using allFutures()
fut.read().value
else:
@ -360,7 +365,7 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerI
debug "failed to resolve the query"
return err("failed to resolve the query")
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} =
proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[WakuStoreResult[uint64]] {.async, gcsafe.} =
## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
## messages are stored in the store node's messages field and in the message db
## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
@ -457,6 +462,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem
save(successResult.value)
return ok(added)
# NOTE: Experimental, maybe incorporate as part of query call
proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} =
# @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
@ -503,7 +509,6 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
handler(response.value.response)
# TODO: Remove the following deprecated method
proc computeIndex*(msg: WakuMessage,
receivedTime = getNanosecondTime(getTime().toUnixFloat()),