Added waku metrics (#356)

This commit is contained in:
Hanno Cornelius 2021-01-29 10:42:41 +02:00 committed by GitHub
parent ed979a7d60
commit 886b458ff5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 6 deletions

View File

@ -87,7 +87,7 @@ prometheus with this config, e.g.:
```bash
cd ./metrics/prometheus
prometheus
prometheus --config.file=prometheus.yml
```
A Grafana dashboard containing the example dashboard for each simulation node

View File

@ -1,6 +1,6 @@
import
std/[options, tables, strutils, sequtils],
chronos, chronicles, stew/shims/net as stewNet,
chronos, chronicles, metrics, stew/shims/net as stewNet,
# TODO: Why do we need eth keys?
eth/keys,
libp2p/multiaddress,
@ -18,6 +18,10 @@ import
./message_store/message_store,
../utils/requests
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
logScope:
topics = "wakunode"
@ -181,6 +185,8 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
id = await node.wakuFilter.subscribe(request)
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
waku_node_filters.set(node.filters.len.int64)
proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
## Unsubscribes a handler from a PubSub topic.
##
@ -213,6 +219,8 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
await node.wakuFilter.unsubscribe(request)
node.filters.removeContentFilters(request.contentFilters)
waku_node_filters.set(node.filters.len.int64)
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} =
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
@ -263,6 +271,7 @@ proc mountFilter*(node: WakuNode) =
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId)
waku_node_messages.inc(labelValues = ["filter"])
node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler)
node.switch.mount(node.wakuFilter)
@ -317,6 +326,7 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela
if msg.isOk():
node.filters.notify(msg.value(), "")
await node.subscriptions.notify(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"])
await node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler)
@ -433,8 +443,11 @@ when isMainModule:
{.gcsafe.}:
# TODO: libp2p_pubsub_peers is not public, so we need to make this either
# public in libp2p or do our own peer counting after all.
let
totalMessages = total_messages.value
var
totalMessages = 0.float64
for key in waku_node_messages.metrics.keys():
totalMessages = totalMessages + waku_node_messages.value(key)
info "Node metrics", totalMessages
discard setTimer(Moment.fromNow(2.seconds), logMetrics)
@ -467,10 +480,12 @@ when isMainModule:
let dbRes = SqliteDatabase.init(conf.dbpath)
if dbRes.isErr:
warn "failed to init database", err = dbRes.error
waku_node_errors.inc(labelValues = ["init_db_failure"])
let res = WakuMessageStore.init(dbRes.value)
if res.isErr:
warn "failed to init WakuMessageStore", err = res.error
waku_node_errors.inc(labelValues = ["init_store_failure"])
else:
store = res.value

View File

@ -20,6 +20,10 @@ import
export waku_filter_types
declarePublicGauge waku_filter_peers, "number of filter peers"
declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers"
declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"]
logScope:
topics = "wakufilter"
@ -162,6 +166,7 @@ method init*(wf: WakuFilter) =
var res = FilterRPC.init(message)
if res.isErr:
error "failed to decode rpc"
waku_filter_errors.inc(labelValues = ["decode_rpc_failure"])
return
info "filter message received"
@ -174,6 +179,8 @@ method init*(wf: WakuFilter) =
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request))
else:
wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId)
waku_filter_subscribers.set(wf.subscribers.len.int64)
wf.handler = handle
wf.codec = WakuFilterCodec
@ -188,6 +195,7 @@ proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handl
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(wf: WakuFilter, peer: PeerInfo) =
wf.peers.add(FilterPeer(peerInfo: peer))
waku_filter_peers.inc()
proc subscription*(proto: WakuFilter): MessageNotificationSubscription =
## Returns a Filter for the specific protocol

View File

@ -9,8 +9,6 @@ import
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection
declarePublicGauge total_messages, "number of messages received"
logScope:
topics = "wakurelay"

View File

@ -19,6 +19,10 @@ import
export waku_store_types
declarePublicGauge waku_store_messages, "number of historical messages"
declarePublicGauge waku_store_peers, "number of store peers"
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
logScope:
topics = "wakustore"
@ -303,6 +307,7 @@ method init*(ws: WakuStore) =
var res = HistoryRPC.init(message)
if res.isErr:
error "failed to decode rpc"
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
return
info "received query"
@ -336,6 +341,9 @@ method init*(ws: WakuStore) =
let res = ws.store.getAll(onData)
if res.isErr:
warn "failed to load messages from store", err = res.error
waku_store_errors.inc(labelValues = ["store_load_failure"])
waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"])
proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext,
store: MessageStore = nil, wakuSwap: WakuSwap = nil): T =
@ -349,6 +357,7 @@ proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext,
# @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY
proc setPeer*(ws: WakuStore, peer: PeerInfo) =
ws.peers.add(HistoryPeer(peerInfo: peer))
waku_store_peers.inc()
proc subscription*(proto: WakuStore): MessageNotificationSubscription =
## The filter function returns the pubsub filter for the node.
@ -358,12 +367,14 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription =
proc handle(topic: string, msg: WakuMessage) {.async.} =
let index = msg.computeIndex()
proto.messages.add(IndexedWakuMessage(msg: msg, index: index))
waku_store_messages.inc(labelValues = ["stored"])
if proto.store.isNil:
return
let res = proto.store.put(index, msg)
if res.isErr:
warn "failed to store messages", err = res.error
waku_store_errors.inc(labelValues = ["store_failure"])
MessageNotificationSubscription.init(@[], handle)
@ -386,8 +397,11 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
return
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)
# NOTE: Experimental, maybe incorporate as part of query call
@ -410,6 +424,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
if response.isErr:
error "failed to decode response"
waku_store_errors.inc(labelValues = ["decode_rpc_failure"])
return
# NOTE Perform accounting operation
@ -418,4 +433,6 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand
let messages = response.value.response.messages
ws.wakuSwap.debit(peerId, messages.len)
waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"])
handler(response.value.response)

View File

@ -35,6 +35,9 @@ import
export waku_swap_types
declarePublicGauge waku_swap_peers, "number of swap peers"
declarePublicGauge waku_swap_errors, "number of swap protocol errors", ["type"]
logScope:
topics = "wakuswap"
@ -115,6 +118,7 @@ proc init*(wakuSwap: WakuSwap) =
var res = Cheque.init(message)
if res.isErr:
error "failed to decode rpc"
waku_swap_errors.inc(labelValues = ["decode_rpc_failure"])
return
info "received cheque", value=res.value
@ -171,6 +175,7 @@ proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T =
proc setPeer*(ws: WakuSwap, peer: PeerInfo) =
ws.peers.add(SwapPeer(peerInfo: peer))
waku_swap_peers.inc()
# TODO End to end communication