From 505fa65855f9c920c439ec5a0c8b4106e4a784f0 Mon Sep 17 00:00:00 2001 From: LNSD Date: Fri, 2 Sep 2022 08:44:11 +0000 Subject: [PATCH] deploy: 4f3a3da084f3150ac643f062f9ac0fb7459e4354 --- .../vendor/libbacktrace-upstream/libtool | 2 +- waku/v2/protocol/waku_store/protocol.nim | 56 +++++++++---------- 2 files changed, 26 insertions(+), 32 deletions(-) diff --git a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool index 8ebd2fd07..93e696cc5 100755 --- a/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool +++ b/vendor/nim-libbacktrace/vendor/libbacktrace-upstream/libtool @@ -2,7 +2,7 @@ # libtool - Provide generalized library-building support services. # Generated automatically by config.status (libbacktrace) version-unused -# Libtool was configured on host fv-az220-178: +# Libtool was configured on host fv-az186-467: # NOTE: Changes made to this file will be lost: look at ltmain.sh. # # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, diff --git a/waku/v2/protocol/waku_store/protocol.nim b/waku/v2/protocol/waku_store/protocol.nim index f406eda55..ad308967e 100644 --- a/waku/v2/protocol/waku_store/protocol.nim +++ b/waku/v2/protocol/waku_store/protocol.nim @@ -77,10 +77,7 @@ type proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} = ## Query history to return a single page of messages matching the query - info "Finding messages matching received query", query=query - - ## Extract query criteria - ## All query criteria are optional + # Extract query criteria. All query criteria are optional let qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic)) else: none(seq[ContentTopic]) @@ -117,8 +114,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} ascendingOrder = qAscendingOrder ) -# Build response - # TODO: Handle errors + # Build response + # TODO: Improve error reporting if queryRes.isErr(): return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR) @@ -133,35 +130,34 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = proc handler(conn: Connection, proto: string) {.async.} = - var message = await conn.readLp(MaxRpcSize.int) - var res = HistoryRPC.init(message) - if res.isErr: - error "failed to decode rpc" + let buf = await conn.readLp(MaxRpcSize.int) + + let resReq = HistoryRPC.init(buf) + if resReq.isErr(): + error "failed to decode rpc", peerId=conn.peerId waku_store_errors.inc(labelValues = [decodeRpcFailure]) return - # TODO Print more info here - info "received query", rpc=res.value + let req = resReq.value + + info "received history query", peerId=conn.peerId, requestId=req.requestId, query=req.query waku_store_queries.inc() - let value = res.value - let response = ws.findMessages(res.value.query) + let resp = ws.findMessages(req.query) - # TODO Do accounting here, response is HistoryResponse - # How do we get node or swap context? if not ws.wakuSwap.isNil: - info "handle store swap test", text=ws.wakuSwap.text - # NOTE Perform accounting operation + info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text + + # Perform accounting operation + # TODO: Do accounting here, response is HistoryResponse. How do we get node or swap context? let peerId = conn.peerId - let messages = response.messages + let messages = resp.messages ws.wakuSwap.credit(peerId, messages.len) - else: - info "handle store swap is nil" - info "sending response", messages=response.messages.len + info "sending history response", peerId=conn.peerId, requestId=req.requestId, messages=resp.messages.len - await conn.writeLp(HistoryRPC(requestId: value.requestId, - response: response).encode().buffer) + let rpc = HistoryRPC(requestId: req.requestId, response: resp) + await conn.writeLp(rpc.encode().buffer) ws.handler = handler ws.codec = WakuStoreCodec @@ -193,12 +189,11 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, capacity = StoreDefaultCapacity, isSqliteOnly = false): T = - debug "init" - var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly) - output.init(capacity) - return output + let ws = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly) + ws.init(capacity) + return ws -# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY +# TODO: This should probably be an add function and append the peer to an array proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = ws.peerManager.addPeer(peer, WakuStoreCodec) waku_store_peers.inc() @@ -215,13 +210,12 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} = pubsubTopic = topic ) - # add message to in-memory store + # Add message to in-memory store if not w.isSqliteOnly: # Handle WakuMessage according to store protocol trace "handle message in WakuStore", topic=topic, msg=msg let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) - if addRes.isErr(): trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() waku_store_errors.inc(labelValues = [$(addRes.error())])