Basic metrics print node metrics

This commit is contained in:
Oskar Thoren 2020-05-27 12:00:50 +08:00
parent d68f6aedd2
commit 57c9ee1138
No known key found for this signature in database
GPG Key ID: B2ECCFD3BC2EF77E
2 changed files with 29 additions and 22 deletions

View File

@ -1,7 +1,7 @@
import
confutils, config, strutils, chronos, json_rpc/rpcserver, metrics,
chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird.
eth/[keys, p2p, async_utils], eth/common/utils, eth/net/nat,
eth/[keys, p2p, async_utils], eth/net/nat,
eth/p2p/[discovery, enode, peer_pool, bootnodes, whispernodes],
# TODO remove me
../v1/rpc/[wakusim, key_storage],
@ -196,6 +196,21 @@ proc run(config: WakuNodeConf) =
# Optionally direct connect with a set of nodes
if config.staticnodes.len > 0: connectToNodes(wakuProto, config.staticnodes)
### XXX: Fixup
if config.logMetrics:
proc logMetrics(udata: pointer) {.closure, gcsafe.} =
{.gcsafe.}:
let
connectedPeers = connected_peers.value
totalMessages = total_messages.value
# NOTE: Just message volume for now, no valid/invalid envelopes
info "Node metrics", connectedPeers, totalMessages
addTimer(Moment.fromNow(2.seconds), logMetrics)
addTimer(Moment.fromNow(2.seconds), logMetrics)
runForever()
# Here directchat uses rwLoop for protocol
# What if we dial here? How dial?
# Feels very ghetto, this hookup
@ -273,27 +288,7 @@ proc run(config: WakuNodeConf) =
# port = config.metricsServerPort + config.portsShift
# info "Starting metrics HTTP server", address, port
# metrics.startHttpServer($address, Port(port))
#
# if config.logMetrics:
# proc logMetrics(udata: pointer) {.closure, gcsafe.} =
# {.gcsafe.}:
# let
# connectedPeers = connected_peers.value
# validEnvelopes = waku_protocol.valid_envelopes.value
# invalidEnvelopes = waku_protocol.dropped_expired_envelopes.value +
# waku_protocol.dropped_from_future_envelopes.value +
# waku_protocol.dropped_low_pow_envelopes.value +
# waku_protocol.dropped_too_large_envelopes.value +
# waku_protocol.dropped_bloom_filter_mismatch_envelopes.value +
# waku_protocol.dropped_topic_mismatch_envelopes.value +
# waku_protocol.dropped_benign_duplicate_envelopes.value +
# waku_protocol.dropped_duplicate_envelopes.value
#
# info "Node metrics", connectedPeers, validEnvelopes, invalidEnvelopes
# addTimer(Moment.fromNow(2.seconds), logMetrics)
# addTimer(Moment.fromNow(2.seconds), logMetrics)
#
runForever()
when isMainModule:
let conf = WakuNodeConf.load()

View File

@ -11,6 +11,11 @@ import libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/rpc/[messages],
libp2p/connection
import metrics
declarePublicGauge connected_peers, "number of peers in the pool" # XXX
declarePublicGauge total_messages, "number of messages received"
logScope:
topic = "WakuSub"
@ -33,6 +38,9 @@ method init(w: WakuSub) =
##
debug "Incoming WakuSub connection"
# XXX: Increment connectedPeers counter, unclear if this is the right place tho
# Where is the disconnect event?
connected_peers.inc()
await w.handleConn(conn, proto)
# XXX: Handler hijack FloodSub here?
@ -62,6 +70,7 @@ method subscribeTopic*(w: WakuSub,
debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId
procCall FloodSub(w).subscribeTopic(topic, subscribe, peerId)
# TODO: Decrement connected peers here
method handleDisconnect*(w: WakuSub, peer: PubSubPeer) {.async.} =
debug "handleDisconnect (NYI)"
@ -69,6 +78,9 @@ method rpcHandler*(w: WakuSub,
peer: PubSubPeer,
rpcMsgs: seq[RPCMsg]) {.async.} =
debug "rpcHandler"
# XXX: Right place?
total_messages.inc()
await procCall FloodSub(w).rpcHandler(peer, rpcMsgs)
method publish*(w: WakuSub,