Improve node metrics (#831)

* Improve node metrics

* Add support for empty content topics as a separate label
This commit is contained in:
Hanno Cornelius 2022-01-26 12:02:57 +01:00 committed by GitHub
parent d45e3c8f7a
commit 4421b8de00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 23 deletions

View File

@ -474,8 +474,8 @@ procSuite "Waku v2 JSON-RPC API":
node3.mountRelay()
# Dial nodes 2 and 3 from node1
await node1.dialPeer(constructMultiaddrStr(peerInfo2))
await node1.dialPeer(constructMultiaddrStr(peerInfo3))
await node1.connectToNodes(@[constructMultiaddrStr(peerInfo2)])
await node1.connectToNodes(@[constructMultiaddrStr(peerInfo3)])
# RPC server setup
let

View File

@ -41,9 +41,10 @@ when defined(rln):
web3,
../protocol/waku_rln_relay/[rln, waku_rln_relay_utils]
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
declarePublicCounter waku_node_messages, "number of messages received", ["type", "contentTopic"]
declarePublicGauge waku_node_filters, "number of content filter subscriptions"
declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"]
declarePublicCounter waku_node_conns_initiated, "number of connections initiated by this node", ["source"]
logScope:
topics = "wakunode"
@ -251,7 +252,10 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) =
if (not node.wakuStore.isNil):
await node.wakuStore.handleMessage(topic, msg.value())
waku_node_messages.inc(labelValues = ["relay"])
# Increase message counter
let ctLabel = if msg.value().contentTopic.len > 0: msg.value().contentTopic
else: "none"
waku_node_messages.inc(labelValues = ["relay", ctLabel])
let wakuRelay = node.wakuRelay
@ -433,7 +437,11 @@ proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {
info "push received"
for message in msg.messages:
node.filters.notify(message, requestId) # Trigger filter handlers on a light node
waku_node_messages.inc(labelValues = ["filter"])
# Increase message counter
let ctLabel = if message.contentTopic.len > 0: message.contentTopic
else: "none"
waku_node_messages.inc(labelValues = ["filter", ctLabel])
node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout)
node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec))
@ -721,15 +729,20 @@ proc startKeepalive*(node: WakuNode) =
asyncSpawn node.keepaliveLoop(defaultKeepalive)
## Helpers
proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "dialPeer", address = address
# XXX: This turns ipfs into p2p, not quite sure why
let remotePeer = parseRemotePeerInfo(address)
proc connectToNode(n: WakuNode, remotePeer: RemotePeerInfo, source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "Connecting to node", remotePeer = remotePeer, source = source
info "Dialing peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
# NOTE This is dialing on WakuRelay protocol specifically
discard await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)
info "Post peerManager dial"
info "Attempting dial", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
let connOpt = await n.peerManager.dialPeer(remotePeer, WakuRelayCodec)
if connOpt.isSome():
info "Successfully connected to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
waku_node_conns_initiated.inc(labelValues = [source])
else:
error "Failed to connect to peer", wireAddr = remotePeer.addrs[0], peerId = remotePeer.peerId
waku_node_errors.inc(labelValues = ["conn_init_failure"])
proc setStorePeer*(n: WakuNode, address: string) {.raises: [Defect, ValueError, LPError].} =
info "Set store peer", address = address
@ -752,11 +765,12 @@ proc setLightPushPeer*(n: WakuNode, address: string) {.raises: [Defect, ValueErr
n.wakuLightPush.setPeer(remotePeer)
proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
proc connectToNodes*(n: WakuNode, nodes: seq[string], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "connectToNodes", len = nodes.len
for nodeId in nodes:
info "connectToNodes", node = nodeId
# XXX: This seems...brittle
await dialPeer(n, nodeId)
await connectToNode(n, parseRemotePeerInfo(nodeId), source)
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
@ -766,10 +780,12 @@ proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
# later.
await sleepAsync(5.seconds)
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo]) {.async.} =
proc connectToNodes*(n: WakuNode, nodes: seq[RemotePeerInfo], source = "api") {.async.} =
## `source` indicates source of node addrs (static config, api call, discovery, etc)
info "connectToNodes", len = nodes.len
for remotePeerInfo in nodes:
info "connectToNodes", peer = remotePeerInfo
discard await n.peerManager.dialPeer(remotePeerInfo, WakuRelayCodec)
await connectToNode(n, remotePeerInfo, source)
# The issue seems to be around peers not being fully connected when
# trying to subscribe. So what we do is sleep to guarantee nodes are
@ -804,7 +820,7 @@ proc runDiscv5Loop(node: WakuNode) {.async.} =
if newPeers.len > 0:
debug "Connecting to newly discovered peers", count=newPeers.len()
await connectToNodes(node, newPeers)
await connectToNodes(node, newPeers, "discv5")
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
@ -1113,7 +1129,7 @@ when isMainModule:
# Connect to configured static nodes
if conf.staticnodes.len > 0:
waitFor connectToNodes(node, conf.staticnodes)
waitFor connectToNodes(node, conf.staticnodes, "static")
# Connect to discovered nodes
if conf.dnsDiscovery and conf.dnsDiscoveryUrl != "":
@ -1136,7 +1152,7 @@ when isMainModule:
let discoveredPeers = wakuDnsDiscovery.get().findPeers()
if discoveredPeers.isOk:
info "Connecting to discovered peers"
waitFor connectToNodes(node, discoveredPeers.get())
waitFor connectToNodes(node, discoveredPeers.get(), "dnsdisc")
else:
warn "Failed to init Waku DNS discovery"