From 886b458ff55d960761906a8b907e5ce192b5ea61 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Fri, 29 Jan 2021 10:42:41 +0200 Subject: [PATCH] Added waku metrics (#356) --- waku/v2/README.md | 2 +- waku/v2/node/wakunode2.nim | 21 +++++++++++++++++--- waku/v2/protocol/waku_filter/waku_filter.nim | 8 ++++++++ waku/v2/protocol/waku_relay.nim | 2 -- waku/v2/protocol/waku_store/waku_store.nim | 17 ++++++++++++++++ waku/v2/protocol/waku_swap/waku_swap.nim | 5 +++++ 6 files changed, 49 insertions(+), 6 deletions(-) diff --git a/waku/v2/README.md b/waku/v2/README.md index 2e5112aa8..0dd17f978 100644 --- a/waku/v2/README.md +++ b/waku/v2/README.md @@ -87,7 +87,7 @@ prometheus with this config, e.g.: ```bash cd ./metrics/prometheus -prometheus +prometheus --config.file=prometheus.yml ``` A Grafana dashboard containing the example dashboard for each simulation node diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 725e8d0ed..7899bead7 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -1,6 +1,6 @@ import std/[options, tables, strutils, sequtils], - chronos, chronicles, stew/shims/net as stewNet, + chronos, chronicles, metrics, stew/shims/net as stewNet, # TODO: Why do we need eth keys? eth/keys, libp2p/multiaddress, @@ -18,6 +18,10 @@ import ./message_store/message_store, ../utils/requests +declarePublicCounter waku_node_messages, "number of messages received", ["type"] +declarePublicGauge waku_node_filters, "number of content filter subscriptions" +declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"] + logScope: topics = "wakunode" @@ -181,6 +185,8 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa id = await node.wakuFilter.subscribe(request) node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler) + waku_node_filters.set(node.filters.len.int64) + proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} = ## Unsubscribes a handler from a PubSub topic. ## @@ -213,6 +219,8 @@ proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = await node.wakuFilter.unsubscribe(request) node.filters.removeContentFilters(request.contentFilters) + waku_node_filters.set(node.filters.len.int64) + proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) {.async, gcsafe.} = ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a @@ -263,6 +271,7 @@ proc mountFilter*(node: WakuNode) = info "push received" for message in msg.messages: node.filters.notify(message, requestId) + waku_node_messages.inc(labelValues = ["filter"]) node.wakuFilter = WakuFilter.init(node.switch, node.rng, filterHandler) node.switch.mount(node.wakuFilter) @@ -317,6 +326,7 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela if msg.isOk(): node.filters.notify(msg.value(), "") await node.subscriptions.notify(topic, msg.value()) + waku_node_messages.inc(labelValues = ["relay"]) await node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler) @@ -433,8 +443,11 @@ when isMainModule: {.gcsafe.}: # TODO: libp2p_pubsub_peers is not public, so we need to make this either # public in libp2p or do our own peer counting after all. - let - totalMessages = total_messages.value + var + totalMessages = 0.float64 + + for key in waku_node_messages.metrics.keys(): + totalMessages = totalMessages + waku_node_messages.value(key) info "Node metrics", totalMessages discard setTimer(Moment.fromNow(2.seconds), logMetrics) @@ -467,10 +480,12 @@ when isMainModule: let dbRes = SqliteDatabase.init(conf.dbpath) if dbRes.isErr: warn "failed to init database", err = dbRes.error + waku_node_errors.inc(labelValues = ["init_db_failure"]) let res = WakuMessageStore.init(dbRes.value) if res.isErr: warn "failed to init WakuMessageStore", err = res.error + waku_node_errors.inc(labelValues = ["init_store_failure"]) else: store = res.value diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index 82524e1b4..24200cfd9 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -20,6 +20,10 @@ import export waku_filter_types +declarePublicGauge waku_filter_peers, "number of filter peers" +declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" +declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] + logScope: topics = "wakufilter" @@ -162,6 +166,7 @@ method init*(wf: WakuFilter) = var res = FilterRPC.init(message) if res.isErr: error "failed to decode rpc" + waku_filter_errors.inc(labelValues = ["decode_rpc_failure"]) return info "filter message received" @@ -174,6 +179,8 @@ method init*(wf: WakuFilter) = wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) else: wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId) + + waku_filter_subscribers.set(wf.subscribers.len.int64) wf.handler = handle wf.codec = WakuFilterCodec @@ -188,6 +195,7 @@ proc init*(T: type WakuFilter, switch: Switch, rng: ref BrHmacDrbgContext, handl # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY proc setPeer*(wf: WakuFilter, peer: PeerInfo) = wf.peers.add(FilterPeer(peerInfo: peer)) + waku_filter_peers.inc() proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## Returns a Filter for the specific protocol diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index 8fe4bddad..ac7382eb3 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -9,8 +9,6 @@ import libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection -declarePublicGauge total_messages, "number of messages received" - logScope: topics = "wakurelay" diff --git a/waku/v2/protocol/waku_store/waku_store.nim b/waku/v2/protocol/waku_store/waku_store.nim index 8b896e24b..1d8824361 100644 --- a/waku/v2/protocol/waku_store/waku_store.nim +++ b/waku/v2/protocol/waku_store/waku_store.nim @@ -19,6 +19,10 @@ import export waku_store_types +declarePublicGauge waku_store_messages, "number of historical messages" +declarePublicGauge waku_store_peers, "number of store peers" +declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"] + logScope: topics = "wakustore" @@ -303,6 +307,7 @@ method init*(ws: WakuStore) = var res = HistoryRPC.init(message) if res.isErr: error "failed to decode rpc" + waku_store_errors.inc(labelValues = ["decode_rpc_failure"]) return info "received query" @@ -336,6 +341,9 @@ method init*(ws: WakuStore) = let res = ws.store.getAll(onData) if res.isErr: warn "failed to load messages from store", err = res.error + waku_store_errors.inc(labelValues = ["store_load_failure"]) + + waku_store_messages.set(ws.messages.len.int64, labelValues = ["stored"]) proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, store: MessageStore = nil, wakuSwap: WakuSwap = nil): T = @@ -349,6 +357,7 @@ proc init*(T: type WakuStore, switch: Switch, rng: ref BrHmacDrbgContext, # @TODO THIS SHOULD PROBABLY BE AN ADD FUNCTION AND APPEND THE PEER TO AN ARRAY proc setPeer*(ws: WakuStore, peer: PeerInfo) = ws.peers.add(HistoryPeer(peerInfo: peer)) + waku_store_peers.inc() proc subscription*(proto: WakuStore): MessageNotificationSubscription = ## The filter function returns the pubsub filter for the node. @@ -358,12 +367,14 @@ proc subscription*(proto: WakuStore): MessageNotificationSubscription = proc handle(topic: string, msg: WakuMessage) {.async.} = let index = msg.computeIndex() proto.messages.add(IndexedWakuMessage(msg: msg, index: index)) + waku_store_messages.inc(labelValues = ["stored"]) if proto.store.isNil: return let res = proto.store.put(index, msg) if res.isErr: warn "failed to store messages", err = res.error + waku_store_errors.inc(labelValues = ["store_failure"]) MessageNotificationSubscription.init(@[], handle) @@ -386,8 +397,11 @@ proc query*(w: WakuStore, query: HistoryQuery, handler: QueryHandlerFunc) {.asyn if response.isErr: error "failed to decode response" + waku_store_errors.inc(labelValues = ["decode_rpc_failure"]) return + waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) + handler(response.value.response) # NOTE: Experimental, maybe incorporate as part of query call @@ -410,6 +424,7 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand if response.isErr: error "failed to decode response" + waku_store_errors.inc(labelValues = ["decode_rpc_failure"]) return # NOTE Perform accounting operation @@ -418,4 +433,6 @@ proc queryWithAccounting*(ws: WakuStore, query: HistoryQuery, handler: QueryHand let messages = response.value.response.messages ws.wakuSwap.debit(peerId, messages.len) + waku_store_messages.set(response.value.response.messages.len.int64, labelValues = ["retrieved"]) + handler(response.value.response) diff --git a/waku/v2/protocol/waku_swap/waku_swap.nim b/waku/v2/protocol/waku_swap/waku_swap.nim index 41cf9c0b1..edb3dc524 100644 --- a/waku/v2/protocol/waku_swap/waku_swap.nim +++ b/waku/v2/protocol/waku_swap/waku_swap.nim @@ -35,6 +35,9 @@ import export waku_swap_types +declarePublicGauge waku_swap_peers, "number of swap peers" +declarePublicGauge waku_swap_errors, "number of swap protocol errors", ["type"] + logScope: topics = "wakuswap" @@ -115,6 +118,7 @@ proc init*(wakuSwap: WakuSwap) = var res = Cheque.init(message) if res.isErr: error "failed to decode rpc" + waku_swap_errors.inc(labelValues = ["decode_rpc_failure"]) return info "received cheque", value=res.value @@ -171,6 +175,7 @@ proc init*(T: type WakuSwap, switch: Switch, rng: ref BrHmacDrbgContext): T = proc setPeer*(ws: WakuSwap, peer: PeerInfo) = ws.peers.add(SwapPeer(peerInfo: peer)) + waku_swap_peers.inc() # TODO End to end communication