Increase store and store queue logging (#887)

This commit is contained in:
Hanno Cornelius 2022-03-11 07:26:15 +02:00 committed by GitHub
parent 0d2a8b6ddf
commit ea72a65ef8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 53 additions and 1 deletions

View File

@ -271,6 +271,8 @@ proc encode*(rpc: HistoryRPC): ProtoBuffer =
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
## Query history to return a single page of messages matching the query
info "Finding messages matching received query", query=query
## Extract query criteria
## All query criteria are optional
@ -284,22 +286,30 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
qEndTime = if query.endTime != Timestamp(0): some(query.endTime)
else: none(Timestamp)
trace "Combined query criteria into single predicate", contentTopics=qContentTopics, pubsubTopic=qPubSubTopic, startTime=qStartTime, endTime=qEndTime
## Compose filter predicate for message from query criteria
proc matchesQuery(indMsg: IndexedWakuMessage): bool =
trace "Matching indexed message against predicate", msg=indMsg
if qPubSubTopic.isSome():
# filter on pubsub topic
if indMsg.pubsubTopic != qPubSubTopic.get():
trace "Failed to match pubsub topic", criteria=qPubSubTopic.get(), actual=indMsg.pubsubTopic
return false
if qStartTime.isSome() and qEndTime.isSome():
# temporal filtering
# select only messages whose sender generated timestamps fall bw the queried start time and end time
if indMsg.msg.timestamp > qEndTime.get() or indMsg.msg.timestamp < qStartTime.get():
trace "Failed to match temporal filter", criteriaStart=qStartTime.get(), criteriaEnd=qEndTime.get(), actual=indMsg.msg.timestamp
return false
if qContentTopics.isSome():
# filter on content
if indMsg.msg.contentTopic notin qContentTopics.get():
trace "Failed to match content topic", criteria=qContentTopics.get(), actual=indMsg.msg.contentTopic
return false
return true
@ -310,6 +320,7 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
# Build response
historyRes = HistoryResponse(messages: wakuMsgList, pagingInfo: updatedPagingInfo, error: error)
trace "Successfully populated a history response", response=historyRes
return historyRes
proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
@ -323,7 +334,7 @@ proc init*(ws: WakuStore, capacity = DefaultStoreCapacity) =
return
# TODO Print more info here
info "received query"
info "received query", rpc=res.value
waku_store_queries.inc()
let value = res.value

View File

@ -7,6 +7,7 @@ import
std/[algorithm, options],
# external imports
bearssl,
chronicles,
libp2p/protocols/protocol,
stew/[results, sorted_set],
# internal imports
@ -118,6 +119,9 @@ type
# StoreQueue helpers #
######################
logScope:
topics = "wakustorequeue"
proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
startCursor: Index):
SortedSetResult[Index, IndexedWakuMessage] =
@ -125,6 +129,8 @@ proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
## TODO: can probably improve performance here with a binary/tree search
var nextItem = w.first
trace "Fast forwarding to start cursor", startCursor=startCursor, firstItem=nextItem
## Fast forward until we reach the startCursor
while nextItem.isOk:
@ -134,6 +140,7 @@ proc ffdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
# Not yet at cursor. Continue advancing
nextItem = w.next
trace "Continuing ffd to start cursor", nextItem=nextItem
return nextItem
@ -145,6 +152,8 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
var prevItem = w.last
trace "Rewinding to start cursor", startCursor=startCursor, lastItem=prevItem
## Rewind until we reach the startCursor
while prevItem.isOk:
@ -154,6 +163,7 @@ proc rwdToCursor(w: SortedSetWalkRef[Index, IndexedWakuMessage],
# Not yet at cursor. Continue rewinding.
prevItem = w.prev
trace "Continuing rewind to start cursor", prevItem=prevItem
return prevItem
@ -167,6 +177,8 @@ proc fwdPage(storeQueue: StoreQueueRef,
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
trace "Retrieving fwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
@ -184,6 +196,7 @@ proc fwdPage(storeQueue: StoreQueueRef,
let cursorEntry = w.ffdToCursor(startCursor.get())
if cursorEntry.isErr:
# Quick exit here if start cursor not found
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.FORWARD)
outError = HistoryResponseError.INVALID_CURSOR
@ -196,13 +209,19 @@ proc fwdPage(storeQueue: StoreQueueRef,
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.first
trace "Starting fwd page query", currentEntry=currentEntry
## This loop walks forward over the queue:
## 1. from the given cursor (or first entry, if not provided)
## 2. adds entries matching the predicate function to output page
## 3. until either the end of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk and numberOfItems < maxPageSize:
trace "Continuing fwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if pred(currentEntry.value.data):
trace "Current item matches predicate. Adding to output."
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
@ -214,6 +233,8 @@ proc fwdPage(storeQueue: StoreQueueRef,
direction: PagingDirection.FORWARD)
outError = HistoryResponseError.NONE
trace "Successfully retrieved fwd page", len=outSeq.len, pagingInfo=outPagingInfo
return (outSeq, outPagingInfo, outError)
@ -227,6 +248,8 @@ proc bwdPage(storeQueue: StoreQueueRef,
## Page size must not exceed `maxPageSize`
## Each entry must match the `pred`
trace "Retrieving bwd page from store queue", len=storeQueue.items.len, maxPageSize=maxPageSize, startCursor=startCursor
var
outSeq: seq[WakuMessage]
outPagingInfo: PagingInfo
@ -244,6 +267,7 @@ proc bwdPage(storeQueue: StoreQueueRef,
let cursorEntry = w.rwdToCursor(startCursor.get())
if cursorEntry.isErr:
# Quick exit here if start cursor not found
trace "Could not find starting cursor. Returning empty result.", startCursor=startCursor
outSeq = @[]
outPagingInfo = PagingInfo(pageSize: 0, cursor: startCursor.get(), direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.INVALID_CURSOR
@ -255,6 +279,8 @@ proc bwdPage(storeQueue: StoreQueueRef,
# Start from the back of the queue
lastValidCursor = Index() # No valid (only empty) last cursor
currentEntry = w.last
trace "Starting bwd page query", currentEntry=currentEntry
## This loop walks backward over the queue:
## 1. from the given cursor (or last entry, if not provided)
@ -262,7 +288,11 @@ proc bwdPage(storeQueue: StoreQueueRef,
## 3. until either the beginning of the queue or maxPageSize is reached
var numberOfItems = 0.uint
while currentEntry.isOk and numberOfItems < maxPageSize:
trace "Continuing bwd page query", currentEntry=currentEntry, numberOfItems=numberOfItems
if pred(currentEntry.value.data):
trace "Current item matches predicate. Adding to output."
lastValidCursor = currentEntry.value.key
outSeq.add(currentEntry.value.data.msg)
numberOfItems += 1
@ -274,6 +304,8 @@ proc bwdPage(storeQueue: StoreQueueRef,
direction: PagingDirection.BACKWARD)
outError = HistoryResponseError.NONE
trace "Successfully retrieved bwd page", len=outSeq.len, pagingInfo=outPagingInfo
return (outSeq.reversed(), # Even if paging backwards, each page should be in forward order
outPagingInfo,
outError)
@ -338,11 +370,15 @@ proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[
## If we're at capacity, we will be removing,
## the oldest (first) item
trace "Adding item to store queue", msg=msg
# TODO the below delete block can be removed if we convert to circular buffer
if storeQueue.items.len >= storeQueue.capacity:
var
w = SortedSetWalkRef[Index, IndexedWakuMessage].init(storeQueue.items)
toDelete = w.first
trace "Store queue at capacity. Deleting oldest item.", oldest=toDelete
discard storeQueue.items.delete(toDelete.value.key)
w.destroy # better to destroy walker after a delete operation
@ -350,10 +386,13 @@ proc add*(storeQueue: StoreQueueRef, msg: IndexedWakuMessage): StoreQueueResult[
if res.isErr:
# This indicates the index already exists in the storeQueue.
# TODO: could return error result and log in metrics
trace "Could not add item to store queue. Index already exists.", index=msg.index
return err("duplicate")
else:
res.value.data = msg
trace "Successfully added item to store queue.", msg=msg
return ok()
proc getPage*(storeQueue: StoreQueueRef,
@ -363,6 +402,8 @@ proc getPage*(storeQueue: StoreQueueRef,
## Get a single page of history matching the predicate and
## adhering to the pagingInfo parameters
trace "getting page from store queue", len=storeQueue.items.len, pagingInfo=pagingInfo
let
cursorOpt = if pagingInfo.cursor == Index(): none(Index) ## TODO: pagingInfo.cursor should be an Option. We shouldn't rely on empty initialisation to determine if set or not!
else: some(pagingInfo.cursor)