chore(waku-store): improve logging and query traceability

This commit is contained in:
Lorenzo Delgado 2022-09-02 10:14:58 +02:00 committed by GitHub
parent 982fb08c77
commit 4f3a3da084
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 25 additions and 31 deletions

View File

@ -77,10 +77,7 @@ type
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
## Query history to return a single page of messages matching the query
info "Finding messages matching received query", query=query
## Extract query criteria
## All query criteria are optional
# Extract query criteria. All query criteria are optional
let
qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic))
else: none(seq[ContentTopic])
@ -117,8 +114,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
ascendingOrder = qAscendingOrder
)
# Build response
# TODO: Handle errors
# Build response
# TODO: Improve error reporting
if queryRes.isErr():
return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR)
@ -133,35 +130,34 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
proc handler(conn: Connection, proto: string) {.async.} =
var message = await conn.readLp(MaxRpcSize.int)
var res = HistoryRPC.init(message)
if res.isErr:
error "failed to decode rpc"
let buf = await conn.readLp(MaxRpcSize.int)
let resReq = HistoryRPC.init(buf)
if resReq.isErr():
error "failed to decode rpc", peerId=conn.peerId
waku_store_errors.inc(labelValues = [decodeRpcFailure])
return
# TODO Print more info here
info "received query", rpc=res.value
let req = resReq.value
info "received history query", peerId=conn.peerId, requestId=req.requestId, query=req.query
waku_store_queries.inc()
let value = res.value
let response = ws.findMessages(res.value.query)
let resp = ws.findMessages(req.query)
# TODO Do accounting here, response is HistoryResponse
# How do we get node or swap context?
if not ws.wakuSwap.isNil:
info "handle store swap test", text=ws.wakuSwap.text
# NOTE Perform accounting operation
info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text
# Perform accounting operation
# TODO: Do accounting here, response is HistoryResponse. How do we get node or swap context?
let peerId = conn.peerId
let messages = response.messages
let messages = resp.messages
ws.wakuSwap.credit(peerId, messages.len)
else:
info "handle store swap is nil"
info "sending response", messages=response.messages.len
info "sending history response", peerId=conn.peerId, requestId=req.requestId, messages=resp.messages.len
await conn.writeLp(HistoryRPC(requestId: value.requestId,
response: response).encode().buffer)
let rpc = HistoryRPC(requestId: req.requestId, response: resp)
await conn.writeLp(rpc.encode().buffer)
ws.handler = handler
ws.codec = WakuStoreCodec
@ -193,12 +189,11 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
capacity = StoreDefaultCapacity, isSqliteOnly = false): T =
debug "init"
var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly)
output.init(capacity)
return output
let ws = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly)
ws.init(capacity)
return ws
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
# TODO: This should probably be an add function and append the peer to an array
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc()
@ -215,13 +210,12 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
pubsubTopic = topic
)
# add message to in-memory store
# Add message to in-memory store
if not w.isSqliteOnly:
# Handle WakuMessage according to store protocol
trace "handle message in WakuStore", topic=topic, msg=msg
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr():
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())])