diff --git a/waku/v2/node/storage/message/message_store.nim b/waku/v2/node/storage/message/message_store.nim index b87e8408c..dc0771054 100644 --- a/waku/v2/node/storage/message/message_store.nim +++ b/waku/v2/node/storage/message/message_store.nim @@ -6,19 +6,34 @@ import std/options, stew/results, + chronos +import ../../../protocol/waku_message, ../../../protocol/waku_store/rpc, ../../../utils/time, ../../../utils/pagination -type - DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].} +const + StoreDefaultCapacity* = 25_000 + StoreMaxOverflow* = 1.3 + StoreDefaultRetentionTime* = chronos.days(30).seconds + StoreMaxPageSize* = 100.uint64 + StoreMaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future + +type MessageStoreResult*[T] = Result[T, string] + + MessageStorePage* = (seq[WakuMessage], Option[PagingInfo]) + + MessageStoreRow* = (Timestamp, WakuMessage, string) MessageStore* = ref object of RootObj +# TODO: Deprecate the following type +type DataProc* = proc(receiverTimestamp: Timestamp, msg: WakuMessage, pubsubTopic: string) {.closure, raises: [Defect].} + # TODO: Remove after resolving nwaku #1026. Move it back to waku_store_queue.nim type @@ -33,8 +48,29 @@ type # MessageStore interface +method getMostRecentMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard + +method getOldestMessageTimestamp*(db: MessageStore): MessageStoreResult[Timestamp] {.base.} = discard + method put*(db: MessageStore, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] {.base.} = discard + + +# TODO: Deprecate the following methods after after #1026 method getAll*(db: MessageStore, onData: DataProc): MessageStoreResult[bool] {.base.} = discard method getPage*(db: MessageStore, pred: QueryFilterMatcher, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard method getPage*(db: MessageStore, pagingInfo: PagingInfo): MessageStoreResult[(seq[WakuMessage], PagingInfo, HistoryResponseError)] {.base.} = discard + +# TODO: Move to sqlite store +method getAllMessages(db: MessageStore): MessageStoreResult[seq[MessageStoreRow]] {.base.} = discard + +method getMessagesByHistoryQuery*( + db: MessageStore, + contentTopic = none(seq[ContentTopic]), + pubsubTopic = none(string), + cursor = none(Index), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = StoreMaxPageSize, + ascendingOrder = true +): MessageStoreResult[MessageStorePage] {.base.} = discard diff --git a/waku/v2/node/storage/message/waku_store_queue.nim b/waku/v2/node/storage/message/waku_store_queue.nim index 545655733..85d814cdb 100644 --- a/waku/v2/node/storage/message/waku_store_queue.nim +++ b/waku/v2/node/storage/message/waku_store_queue.nim @@ -21,23 +21,10 @@ logScope: topics = "message_store.storequeue" -const - MaxPageSize = uint64(100) # Maximum number of waku messages in each page - - MaxTimeVariance = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future - +type QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} type - QueryHandlerFunc* = proc(response: HistoryResponse) {.gcsafe, closure.} - - QueryResult* = Result[uint64, string] - - MessagesResult* = Result[seq[WakuMessage], string] - -type - StoreQueueResult*[T] = Result[T, cstring] - - StoreQueueRef* = ref object + StoreQueueRef* = ref object of MessageStore ## Bounded repository for indexed messages ## ## The store queue will keep messages up to its @@ -199,7 +186,7 @@ proc bwdPage(storeQueue: StoreQueueRef, lastValidCursor = startCursor.get() let cursorEntry = w.rwdToCursor(startCursor.get()) - if cursorEntry.isErr: + if cursorEntry.isErr(): # Quick exit here if start cursor not found trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor outSeq = @[] @@ -222,10 +209,9 @@ proc bwdPage(storeQueue: StoreQueueRef, ## 2. adds entries matching the predicate function to output page ## 3. until either the beginning of the queue or maxPageSize is reached var numberOfItems = 0.uint - while currentEntry.isOk and numberOfItems < maxPageSize: - + while currentEntry.isOk() and numberOfItems < maxPageSize: trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems - + if pred(currentEntry.value.data): trace "Current item matches predicate. Adding to output." lastValidCursor = currentEntry.value.key @@ -247,72 +233,101 @@ proc bwdPage(storeQueue: StoreQueueRef, #### API + +proc new*(T: type StoreQueueRef, capacity: int = StoreDefaultCapacity): T = + var items = SortedSet[Index, IndexedWakuMessage].init() + return StoreQueueRef(items: items, capacity: capacity) + + +proc contains*(store: StoreQueueRef, index: Index): bool = + ## Return `true` if the store queue already contains the `index`, `false` otherwise. + store.items.eq(index).isOk() + +proc len*(store: StoreQueueRef): int {.noSideEffect.} = + store.items.len + +proc `$`*(store: StoreQueueRef): string = + $store.items + + ## --- SortedSet accessors --- iterator fwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = ## Forward iterator over the entire store queue var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - res = w.first - while res.isOk: + res = w.first() + + while res.isOk(): yield (res.value.key, res.value.data) - res = w.next - w.destroy + res = w.next() + + w.destroy() iterator bwdIterator*(storeQueue: StoreQueueRef): (Index, IndexedWakuMessage) = ## Backwards iterator over the entire store queue var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - res = w.last - while res.isOk: + res = w.last() + + while res.isOk(): yield (res.value.key, res.value.data) - res = w.prev - w.destroy + res = w.prev() -proc first*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] = + w.destroy() + +proc first*(store: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] = var - w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - res = w.first - w.destroy + w = SortedSetWalkRef[Index,IndexedWakuMessage].init(store.items) + res = w.first() + w.destroy() - if res.isOk: - return ok(res.value.data) - else: + if res.isErr(): return err("Not found") -proc last*(storeQueue: StoreQueueRef): StoreQueueResult[IndexedWakuMessage] = + return ok(res.value.data) + +proc last*(storeQueue: StoreQueueRef): MessageStoreResult[IndexedWakuMessage] = var w = SortedSetWalkRef[Index,IndexedWakuMessage].init(storeQueue.items) - res = w.last - w.destroy + res = w.last() + w.destroy() - if res.isOk: - return ok(res.value.data) - else: + if res.isErr(): return err("Not found") + return ok(res.value.data) + ## --- Queue API --- -proc new*(T: type StoreQueueRef, capacity: int): T = - var items = SortedSet[Index, IndexedWakuMessage].init() +method getMostRecentMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] = + let message = ?store.last() + ok(message.index.receiverTime) - return StoreQueueRef(items: items, capacity: capacity) +method getOldestMessageTimestamp*(store: StoreQueueRef): MessageStoreResult[Timestamp] = + let message = ?store.first() + ok(message.index.receiverTime) -proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[void] = + +proc add*(store: StoreQueueRef, msg: IndexedWakuMessage): MessageStoreResult[void] = ## Add a message to the queue - ## If we're at capacity, we will be removing, - ## the oldest (first) item - - # Ensure that messages don't "jump" to the front of the queue with future timestamps - if msg.index.senderTime - msg.index.receiverTime > MaxTimeVariance: - return err("future_sender_timestamp") - - trace "Adding item to store queue", msg=msg + ## + ## If we're at capacity, we will be removing, the oldest (first) item + trace "adding item to store queue", msg=msg - # TODO the below delete block can be removed if we convert to circular buffer - if storeQueue.items.len >= storeQueue.capacity: + # Ensure that messages don't "jump" to the front of the queue with future timestamps + if msg.index.senderTime - msg.index.receiverTime > StoreMaxTimeVariance: + return err("future_sender_timestamp") + + if store.contains(msg.index): + trace "could not add item to store queue. Index already exists", index=msg.index + return err("duplicate") + + + # TODO: the below delete block can be removed if we convert to circular buffer + if store.items.len >= store.capacity: var - w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items) + w = SortedSetWalkRef[Index, IndexedWakuMessage].init(store.items) firstItem = w.first if cmp(msg.index, firstItem.value.key) < 0: @@ -320,22 +335,18 @@ proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[ w.destroy # Clean up walker return err("too_old") - discard storeQueue.items.delete(firstItem.value.key) + discard store.items.delete(firstItem.value.key) w.destroy # better to destroy walker after a delete operation - let res = storeQueue.items.insert(msg.index) - if res.isErr: - # This indicates the index already exists in the storeQueue. - # TODO: could return error result and log in metrics + store.items.insert(msg.index).value.data = msg - trace "Could not add item to store queue. Index already exists.", index=msg.index - return err("duplicate") - else: - res.value.data = msg - - trace "Successfully added item to store queue.", msg=msg return ok() +method put*(store: StoreQueueRef, cursor: Index, message: WakuMessage, pubsubTopic: string): MessageStoreResult[void] = + let message = IndexedWakuMessage(msg: message, index: cursor, pubsubTopic: pubsubTopic) + store.add(message) + + proc getPage*(storeQueue: StoreQueueRef, pred: QueryFilterMatcher, pagingInfo: PagingInfo): @@ -348,8 +359,8 @@ proc getPage*(storeQueue: StoreQueueRef, let cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! else: some(pagingInfo.cursor) - maxPageSize = if pagingInfo.pageSize == 0 or pagingInfo.pageSize > MaxPageSize: MaxPageSize # Used default MaxPageSize for invalid pagingInfos - else: pagingInfo.pageSize + maxPageSize = if pagingInfo.pageSize <= 0: StoreMaxPageSize + else: min(pagingInfo.pageSize, StoreMaxPageSize) case pagingInfo.direction of PagingDirection.FORWARD: @@ -367,15 +378,55 @@ proc getPage*(storeQueue: StoreQueueRef, return getPage(storeQueue, predicate, pagingInfo) -proc contains*(storeQueue: StoreQueueRef, index: Index): bool = - ## Return `true` if the store queue already contains the `index`, - ## `false` otherwise - let res = storeQueue.items.eq(index) - return res.isOk() +method getMessagesByHistoryQuery*( + store: StoreQueueRef, + contentTopic = none(seq[ContentTopic]), + pubsubTopic = none(string), + cursor = none(Index), + startTime = none(Timestamp), + endTime = none(Timestamp), + maxPageSize = StoreMaxPageSize, + ascendingOrder = true +): MessageStoreResult[MessageStorePage] = -proc len*(storeQueue: StoreQueueRef): int {.noSideEffect.} = - storeQueue.items.len + proc matchesQuery(indMsg: IndexedWakuMessage): bool = + trace "Matching indexed message against predicate", msg=indMsg -proc `$`*(storeQueue: StoreQueueRef): string = - $(storeQueue.items) + if pubsubTopic.isSome(): + # filter by pubsub topic + if indMsg.pubsubTopic != pubsubTopic.get(): + trace "Failed to match pubsub topic", criteria=pubsubTopic.get(), actual=indMsg.pubsubTopic + return false + + if startTime.isSome() and endTime.isSome(): + # temporal filtering: select only messages whose sender generated timestamps fall + # between the queried start time and end time + if indMsg.msg.timestamp > endTime.get() or indMsg.msg.timestamp < startTime.get(): + trace "Failed to match temporal filter", criteriaStart=startTime.get(), criteriaEnd=endTime.get(), actual=indMsg.msg.timestamp + return false + + if contentTopic.isSome(): + # filter by content topic + if indMsg.msg.contentTopic notin contentTopic.get(): + trace "Failed to match content topic", criteria=contentTopic.get(), actual=indMsg.msg.contentTopic + return false + + return true + + + let queryPagingInfo = PagingInfo( + pageSize: maxPageSize, + cursor: cursor.get(Index()), + direction: if ascendingOrder: PagingDirection.FORWARD + else: PagingDirection.BACKWARD + ) + let (messages, pagingInfo, error) = store.getPage(matchesQuery, queryPagingInfo) + + if error == HistoryResponseError.INVALID_CURSOR: + return err("invalid cursor") + + if messages.len == 0: + return ok((messages, none(PagingInfo))) + + ok((messages, some(pagingInfo))) diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index 032ff3835..0a519c5d7 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -38,14 +38,14 @@ logScope: const # Constants required for pagination ------------------------------------------- - MaxPageSize* = uint64(100) # Maximum number of waku messages in each page + MaxPageSize* = StoreMaxPageSize # TODO the DefaultPageSize can be changed, it's current value is random DefaultPageSize* = uint64(20) # A recommended default number of waku messages per page - MaxRpcSize* = MaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead + MaxRpcSize* = StoreMaxPageSize * MaxWakuMessageSize + 64*1024 # We add a 64kB safety buffer for protocol overhead - MaxTimeVariance* = getNanoSecondTime(20) # 20 seconds maximum allowable sender timestamp "drift" into the future + MaxTimeVariance* = StoreMaxTimeVariance DefaultTopic* = "/waku/2/default-waku/proto" @@ -60,6 +60,8 @@ const decodeRpcFailure = "decode_rpc_failure" type + WakuStoreResult*[T] = Result[T, string] + WakuStore* = ref object of LPProtocol peerManager*: PeerManager rng*: ref BrHmacDrbgContext @@ -275,7 +277,10 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) handler(response.value.response) -proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[QueryResult] {.async, gcsafe.} = + +## 21/WAKU2-FAULT-TOLERANT-STORE + +proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, peer: RemotePeerInfo): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = ## sends the query to the given peer ## returns the number of retrieved messages if no error occurs, otherwise returns the error string # TODO dialPeer add it to the list of known peers, while it does not cause any issue but might be unnecessary @@ -304,7 +309,7 @@ proc queryFrom*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc, pe handler(response.value.response) return ok(response.value.response.messages.len.uint64) -proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[MessagesResult] {.async, gcsafe.} = +proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = ## a thin wrapper for queryFrom ## sends the query to the given peer ## when the query has a valid pagingInfo, it retrieves the historical messages in pages @@ -336,17 +341,17 @@ proc queryFromWithPaging*(w: WakuStore, query: HistoryQuery, peer: RemotePeerInf return ok(messageList) -proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[MessagesResult] {.async, gcsafe.} = +proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerInfo]): Future[WakuStoreResult[seq[WakuMessage]]] {.async, gcsafe.} = ## loops through the candidateList in order and sends the query to each ## once all responses have been received, the retrieved messages are consolidated into one deduplicated list ## if no messages have been retrieved, the returned future will resolve into a MessagesResult result holding an empty seq. - var futureList: seq[Future[MessagesResult]] + var futureList: seq[Future[WakuStoreResult[seq[WakuMessage]]]] for peer in candidateList.items: futureList.add(w.queryFromWithPaging(query, peer)) await allFutures(futureList) # all(), which returns a Future[seq[T]], has been deprecated let messagesList = futureList - .map(proc (fut: Future[MessagesResult]): seq[WakuMessage] = + .map(proc (fut: Future[WakuStoreResult[seq[WakuMessage]]]): seq[WakuMessage] = if fut.completed() and fut.read().isOk(): # completed() just as a sanity check. These futures have been awaited before using allFutures() fut.read().value else: @@ -360,7 +365,7 @@ proc queryLoop(w: WakuStore, query: HistoryQuery, candidateList: seq[RemotePeerI debug "failed to resolve the query" return err("failed to resolve the query") -proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[QueryResult] {.async, gcsafe.} = +proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[RemotePeerInfo]), pageSize: uint64 = DefaultPageSize): Future[WakuStoreResult[uint64]] {.async, gcsafe.} = ## resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online ## messages are stored in the store node's messages field and in the message db ## the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message @@ -457,6 +462,7 @@ proc resume*(ws: WakuStore, peerList: Option[seq[RemotePeerInfo]] = none(seq[Rem save(successResult.value) return ok(added) + # NOTE: Experimental, maybe incorporate as part of query call proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.async, gcsafe.} = # @TODO We need to be more stratigic about which peers we dial. Right now we just set one on the service. @@ -503,7 +509,6 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand handler(response.value.response) - # TODO: Remove the following deprecated method proc computeIndex*(msg: WakuMessage, receivedTime = getNanosecondTime(getTime().toUnixFloat()),