diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 950d2eed0..671e8e9b2 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -271,6 +271,8 @@ proc encode*(rpc: HistoryRPC): ProtoBuffer = proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} = ## Query history to return a single page of messages matching the query + + info "Finding messages matching received query", query=query ## Extract query criteria ## All query criteria are optional @@ -284,22 +286,30 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} qEndTime = if query.endTime != Timestamp(0): some(query.endTime) else: none(Timestamp) + trace "Combined query criteria into single predicate", contentTopics=qContentTopics, pubsubTopic=qPubSubTopic, startTime=qStartTime, endTime=qEndTime + ## Compose filter predicate for message from query criteria proc matchesQuery(indMsg: IndexedWakuMessage): bool = + trace "Matching indexed message against predicate", msg=indMsg + if qPubSubTopic.isSome(): # filter on pubsub topic if indMsg.pubsubTopic != qPubSubTopic.get(): + trace "Failed to match pubsub topic", criteria=qPubSubTopic.get(), actual=indMsg.pubsubTopic return false if qStartTime.isSome() and qEndTime.isSome(): # temporal filtering # select only messages whose sender generated timestamps fall bw the queried start time and end time + if indMsg.msg.timestamp > qEndTime.get() or indMsg.msg.timestamp < qStartTime.get(): + trace "Failed to match temporal filter", criteriaStart=qStartTime.get(), criteriaEnd=qEndTime.get(), actual=indMsg.msg.timestamp return false if qContentTopics.isSome(): # filter on content if indMsg.msg.contentTopic notin qContentTopics.get(): + trace "Failed to match content topic", criteria=qContentTopics.get(), actual=indMsg.msg.contentTopic return false return true @@ -310,6 +320,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} # Build response historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error) + trace "Successfully populated a history response", response=historyRes return historyRes proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = @@ -323,7 +334,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) = return # TODO Print more info here - info "received query" + info "received query", rpc=res.value waku_store_queries.inc() let value = res.value diff --git a/waku/v2/protocol/waku_store/waku_store_types.nim b/waku/v2/protocol/waku_store/waku_store_types.nim index 43aa52765..d07f52c55 100644 --- a/waku/v2/protocol/waku_store/waku_store_types.nim +++ b/waku/v2/protocol/waku_store/waku_store_types.nim @@ -7,6 +7,7 @@ import std/[algorithm, options], # external imports bearssl, + chronicles, libp2p/protocols/protocol, stew/[results, sorted_set], # internal imports @@ -118,6 +119,9 @@ type # StoreQueue helpers # ###################### +logScope: + topics = "wakustorequeue" + proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], startCursor: Index): SortedSetResult[Index, IndexedWakuMessage] = @@ -125,6 +129,8 @@ proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], ## TODO: can probably improve performance here with a binary/tree search var nextItem = w.first + + trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem ## Fast forward until we reach the startCursor while nextItem.isOk: @@ -134,6 +140,7 @@ proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], # Not yet at cursor. Continue advancing nextItem = w.next + trace "Continuing ffd to start cursor", nextItem=nextItem return nextItem @@ -145,6 +152,8 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], var prevItem = w.last + trace "Rewinding to start cursor", startCursor=startCursor, lastItem=prevItem + ## Rewind until we reach the startCursor while prevItem.isOk: @@ -154,6 +163,7 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage], # Not yet at cursor. Continue rewinding. prevItem = w.prev + trace "Continuing rewind to start cursor", prevItem=prevItem return prevItem @@ -167,6 +177,8 @@ proc fwdPage(storeQueue: StoreQueueRef, ## Page size must not exceed `maxPageSize` ## Each entry must match the `pred` + trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor + var outSeq: seq[WakuMessage] outPagingInfo: PagingInfo @@ -184,6 +196,7 @@ proc fwdPage(storeQueue: StoreQueueRef, let cursorEntry = w.ffdToCursor(startCursor.get()) if cursorEntry.isErr: # Quick exit here if start cursor not found + trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor outSeq = @[] outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD) outError = HistoryResponseError.INVALID_CURSOR @@ -196,13 +209,19 @@ proc fwdPage(storeQueue: StoreQueueRef, lastValidCursor = Index() # No valid (only empty) last cursor currentEntry = w.first + trace "Starting fwd page query", currentEntry=currentEntry + ## This loop walks forward over the queue: ## 1. from the given cursor (or first entry, if not provided) ## 2. adds entries matching the predicate function to output page ## 3. until either the end of the queue or maxPageSize is reached var numberOfItems = 0.uint while currentEntry.isOk and numberOfItems < maxPageSize: + + trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems + if pred(currentEntry.value.data): + trace "Current item matches predicate. Adding to output." lastValidCursor = currentEntry.value.key outSeq.add(currentEntry.value.data.msg) numberOfItems += 1 @@ -214,6 +233,8 @@ proc fwdPage(storeQueue: StoreQueueRef, direction: PagingDirection.FORWARD) outError = HistoryResponseError.NONE + + trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo return (outSeq, outPagingInfo, outError) @@ -227,6 +248,8 @@ proc bwdPage(storeQueue: StoreQueueRef, ## Page size must not exceed `maxPageSize` ## Each entry must match the `pred` + trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor + var outSeq: seq[WakuMessage] outPagingInfo: PagingInfo @@ -244,6 +267,7 @@ proc bwdPage(storeQueue: StoreQueueRef, let cursorEntry = w.rwdToCursor(startCursor.get()) if cursorEntry.isErr: # Quick exit here if start cursor not found + trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor outSeq = @[] outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD) outError = HistoryResponseError.INVALID_CURSOR @@ -255,6 +279,8 @@ proc bwdPage(storeQueue: StoreQueueRef, # Start from the back of the queue lastValidCursor = Index() # No valid (only empty) last cursor currentEntry = w.last + + trace "Starting bwd page query", currentEntry=currentEntry ## This loop walks backward over the queue: ## 1. from the given cursor (or last entry, if not provided) @@ -262,7 +288,11 @@ proc bwdPage(storeQueue: StoreQueueRef, ## 3. until either the beginning of the queue or maxPageSize is reached var numberOfItems = 0.uint while currentEntry.isOk and numberOfItems < maxPageSize: + + trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems + if pred(currentEntry.value.data): + trace "Current item matches predicate. Adding to output." lastValidCursor = currentEntry.value.key outSeq.add(currentEntry.value.data.msg) numberOfItems += 1 @@ -274,6 +304,8 @@ proc bwdPage(storeQueue: StoreQueueRef, direction: PagingDirection.BACKWARD) outError = HistoryResponseError.NONE + trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo + return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order outPagingInfo, outError) @@ -338,11 +370,15 @@ proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[ ## If we're at capacity, we will be removing, ## the oldest (first) item + trace "Adding item to store queue", msg=msg + # TODO the below delete block can be removed if we convert to circular buffer if storeQueue.items.len >= storeQueue.capacity: var w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items) toDelete = w.first + + trace "Store queue at capacity. Deleting oldest item.", oldest=toDelete discard storeQueue.items.delete(toDelete.value.key) w.destroy # better to destroy walker after a delete operation @@ -350,10 +386,13 @@ proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[ if res.isErr: # This indicates the index already exists in the storeQueue. # TODO: could return error result and log in metrics + + trace "Could not add item to store queue. Index already exists.", index=msg.index return err("duplicate") else: res.value.data = msg + trace "Successfully added item to store queue.", msg=msg return ok() proc getPage*(storeQueue: StoreQueueRef, @@ -363,6 +402,8 @@ proc getPage*(storeQueue: StoreQueueRef, ## Get a single page of history matching the predicate and ## adhering to the pagingInfo parameters + trace "getting page from store queue", len=storeQueue.items.len, pagingInfo=pagingInfo + let cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not! else: some(pagingInfo.cursor)