deploy: 4f3a3da084f3150ac643f062f9ac0fb7459e4354

This commit is contained in:
LNSD 2022-09-02 08:44:11 +00:00
parent 94ac63a6bc
commit 505fa65855
2 changed files with 26 additions and 32 deletions

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services. # libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused # Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az220-178: # Libtool was configured on host fv-az186-467:
# NOTE: Changes made to this file will be lost: look at ltmain.sh. # NOTE: Changes made to this file will be lost: look at ltmain.sh.
# #
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -77,10 +77,7 @@ type
proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} = proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.} =
## Query history to return a single page of messages matching the query ## Query history to return a single page of messages matching the query
info "Finding messages matching received query", query=query # Extract query criteria. All query criteria are optional
## Extract query criteria
## All query criteria are optional
let let
qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic)) qContentTopics = if (query.contentFilters.len != 0): some(query.contentFilters.mapIt(it.contentTopic))
else: none(seq[ContentTopic]) else: none(seq[ContentTopic])
@ -117,8 +114,8 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
ascendingOrder = qAscendingOrder ascendingOrder = qAscendingOrder
) )
# Build response # Build response
# TODO: Handle errors # TODO: Improve error reporting
if queryRes.isErr(): if queryRes.isErr():
return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR) return HistoryResponse(messages: @[], pagingInfo: PagingInfo(), error: HistoryResponseError.INVALID_CURSOR)
@ -133,35 +130,34 @@ proc findMessages(w: WakuStore, query: HistoryQuery): HistoryResponse {.gcsafe.}
proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) = proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
proc handler(conn: Connection, proto: string) {.async.} = proc handler(conn: Connection, proto: string) {.async.} =
var message = await conn.readLp(MaxRpcSize.int) let buf = await conn.readLp(MaxRpcSize.int)
var res = HistoryRPC.init(message)
if res.isErr: let resReq = HistoryRPC.init(buf)
error "failed to decode rpc" if resReq.isErr():
error "failed to decode rpc", peerId=conn.peerId
waku_store_errors.inc(labelValues = [decodeRpcFailure]) waku_store_errors.inc(labelValues = [decodeRpcFailure])
return return
# TODO Print more info here let req = resReq.value
info "received query", rpc=res.value
info "received history query", peerId=conn.peerId, requestId=req.requestId, query=req.query
waku_store_queries.inc() waku_store_queries.inc()
let value = res.value let resp = ws.findMessages(req.query)
let response = ws.findMessages(res.value.query)
# TODO Do accounting here, response is HistoryResponse
# How do we get node or swap context?
if not ws.wakuSwap.isNil: if not ws.wakuSwap.isNil:
info "handle store swap test", text=ws.wakuSwap.text info "handle store swap", peerId=conn.peerId, requestId=req.requestId, text=ws.wakuSwap.text
# NOTE Perform accounting operation
# Perform accounting operation
# TODO: Do accounting here, response is HistoryResponse. How do we get node or swap context?
let peerId = conn.peerId let peerId = conn.peerId
let messages = response.messages let messages = resp.messages
ws.wakuSwap.credit(peerId, messages.len) ws.wakuSwap.credit(peerId, messages.len)
else:
info "handle store swap is nil"
info "sending response", messages=response.messages.len info "sending history response", peerId=conn.peerId, requestId=req.requestId, messages=resp.messages.len
await conn.writeLp(HistoryRPC(requestId: value.requestId, let rpc = HistoryRPC(requestId: req.requestId, response: resp)
response: response).encode().buffer) await conn.writeLp(rpc.encode().buffer)
ws.handler = handler ws.handler = handler
ws.codec = WakuStoreCodec ws.codec = WakuStoreCodec
@ -193,12 +189,11 @@ proc init*(ws: WakuStore, capacity = StoreDefaultCapacity) =
proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext, proc init*(T: type WakuStore, peerManager: PeerManager, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true, store: MessageStore = nil, wakuSwap: WakuSwap = nil, persistMessages = true,
capacity = StoreDefaultCapacity, isSqliteOnly = false): T = capacity = StoreDefaultCapacity, isSqliteOnly = false): T =
debug "init" let ws = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly)
var output = WakuStore(rng: rng, peerManager: peerManager, store: store, wakuSwap: wakuSwap, persistMessages: persistMessages, isSqliteOnly: isSqliteOnly) ws.init(capacity)
output.init(capacity) return ws
return output
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY # TODO: This should probably be an add function and append the peer to an array
proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) = proc setPeer*(ws: WakuStore, peer: RemotePeerInfo) =
ws.peerManager.addPeer(peer, WakuStoreCodec) ws.peerManager.addPeer(peer, WakuStoreCodec)
waku_store_peers.inc() waku_store_peers.inc()
@ -215,13 +210,12 @@ proc handleMessage*(w: WakuStore, topic: string, msg: WakuMessage) {.async.} =
pubsubTopic = topic pubsubTopic = topic
) )
# add message to in-memory store # Add message to in-memory store
if not w.isSqliteOnly: if not w.isSqliteOnly:
# Handle WakuMessage according to store protocol # Handle WakuMessage according to store protocol
trace "handle message in WakuStore", topic=topic, msg=msg trace "handle message in WakuStore", topic=topic, msg=msg
let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic)) let addRes = w.messages.add(IndexedWakuMessage(msg: msg, index: index, pubsubTopic: topic))
if addRes.isErr(): if addRes.isErr():
trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error() trace "Attempt to add message to store failed", msg=msg, index=index, err=addRes.error()
waku_store_errors.inc(labelValues = [$(addRes.error())]) waku_store_errors.inc(labelValues = [$(addRes.error())])