mirror of https://github.com/waku-org/nwaku.git
chore(networkmonitor): add metric listing content topics + messages (#1335)
* chore(networkmonitor): add metric listing content topics + messages * chore(networkmonitor): drop topics after x amount
This commit is contained in:
parent
07833ce313
commit
cff8fb1502
|
@ -53,8 +53,8 @@ Monitoring tool to run in an existing `waku` network with the following features
|
||||||
* Keeps discovering new peers using `discv5`
|
* Keeps discovering new peers using `discv5`
|
||||||
* Tracks advertised capabilities of each node as per stored in the ENR `waku` field
|
* Tracks advertised capabilities of each node as per stored in the ENR `waku` field
|
||||||
* Attempts to connect to all nodes, tracking which protocols each node supports
|
* Attempts to connect to all nodes, tracking which protocols each node supports
|
||||||
* Presents grafana-ready metrics showing the state of the network in terms of locations, ips, number discovered peers, number of peers we could connect to, user-agent that each peer contains, etc.
|
* Presents grafana-ready metrics showing the state of the network in terms of locations, ips, number of discovered peers, number of peers we could connect to, the user-agent reported by each peer, content topics and the number of messages received on each one.
|
||||||
* Metrics are exposed through prometheus metrics but also with a custom rest api, presenting detailed information about each peer.
|
* Metrics are exposed through Prometheus and also through a custom REST API, which presents detailed information about each peer.
|
||||||
|
|
||||||
### Usage
|
### Usage
|
||||||
|
|
||||||
|
@ -79,14 +79,19 @@ The following options are available:
|
||||||
|
|
||||||
### Example
|
### Example
|
||||||
|
|
||||||
Connect to the network through a given bootstrap node, with default parameters. Once its running, metrics will be live at `localhost:8008/metrics`
|
Connect to the network through a given bootstrap node, with default parameters. See metrics section for the data that it exposes.
|
||||||
|
|
||||||
```console
|
```console
|
||||||
./build/networkmonitor --log-level=INFO --b="enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"
|
./build/networkmonitor --log-level=INFO --b="enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### Metrics
|
||||||
|
|
||||||
### metrics
|
Metrics are divided into two categories:
|
||||||
|
* Prometheus metrics, exposed as e.g. gauges.
|
||||||
|
* Custom metrics, used for unconstrained labels such as peer information or content topics. These metrics are not exposed through Prometheus because their labels are unconstrained: a new datapoint is generated for each distinct value, which can grow to the point where the backend can no longer handle it.
|
||||||
|
|
||||||
|
#### Prometheus Metrics
|
||||||
|
|
||||||
The following metrics are available. See `http://localhost:8008/metrics`
|
The following metrics are available. See `http://localhost:8008/metrics`
|
||||||
|
|
||||||
|
@ -98,6 +103,8 @@ Other relevant metrics reused from `nim-eth`:
|
||||||
* routing_table_nodes: Inherited from nim-eth, number of nodes in the routing table
|
* routing_table_nodes: Inherited from nim-eth, number of nodes in the routing table
|
||||||
* discovery_message_requests_outgoing_total: Inherited from nim-eth, number of outgoing discovery requests, useful to know if the node is actively looking for new peers
|
* discovery_message_requests_outgoing_total: Inherited from nim-eth, number of outgoing discovery requests, useful to know if the node is actively looking for new peers
|
||||||
|
|
||||||
The following metrics are exposed via a custom rest api. See `http://localhost:8009/allpeersinfo`
|
#### Custom Metrics
|
||||||
|
|
||||||
* json list of all peers with extra information such as ip, locatio, supported protocols and last connection time.
|
The following endpoints are available:
|
||||||
|
* `http://localhost:8009/allpeersinfo`: json list of all peers with extra information such as ip, location, supported protocols and last connection time.
|
||||||
|
* `http://localhost:8009/contenttopics`: content topics and the number of messages received on each one.
|
|
@ -22,6 +22,7 @@ import
|
||||||
../../waku/v2/node/peer_manager/peer_manager,
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
../../waku/v2/node/waku_node,
|
../../waku/v2/node/waku_node,
|
||||||
../../waku/v2/utils/wakuenr,
|
../../waku/v2/utils/wakuenr,
|
||||||
|
../../waku/v2/protocol/waku_message,
|
||||||
../../waku/v2/utils/peers,
|
../../waku/v2/utils/peers,
|
||||||
./networkmonitor_metrics,
|
./networkmonitor_metrics,
|
||||||
./networkmonitor_config,
|
./networkmonitor_config,
|
||||||
|
@ -94,6 +95,7 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# try to connect to the peer
|
# try to connect to the peer
|
||||||
|
# TODO: check last connection time and if not > x, skip connecting
|
||||||
let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
|
let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
|
||||||
if timedOut:
|
if timedOut:
|
||||||
warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
|
warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
|
||||||
|
@ -146,7 +148,7 @@ proc crawlNetwork(node: WakuNode,
|
||||||
conf: NetworkMonitorConf,
|
conf: NetworkMonitorConf,
|
||||||
allPeersRef: CustomPeersTableRef) {.async.} =
|
allPeersRef: CustomPeersTableRef) {.async.} =
|
||||||
|
|
||||||
let crawlInterval = conf.refreshInterval * 1000 * 60
|
let crawlInterval = conf.refreshInterval * 1000
|
||||||
let client = newHttpClient()
|
let client = newHttpClient()
|
||||||
while true:
|
while true:
|
||||||
# discover new random nodes
|
# discover new random nodes
|
||||||
|
@ -201,14 +203,16 @@ proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] =
|
||||||
error("could not start node")
|
error("could not start node")
|
||||||
|
|
||||||
proc startRestApiServer(conf: NetworkMonitorConf,
|
proc startRestApiServer(conf: NetworkMonitorConf,
|
||||||
allPeersRef: CustomPeersTableRef): Result[void, string] =
|
allPeersInfo: CustomPeersTableRef,
|
||||||
|
numMessagesPerContentTopic: ContentTopicMessageTableRef
|
||||||
|
): Result[void, string] =
|
||||||
try:
|
try:
|
||||||
let serverAddress = initTAddress(conf.metricsRestAddress & ":" & $conf.metricsRestPort)
|
let serverAddress = initTAddress(conf.metricsRestAddress & ":" & $conf.metricsRestPort)
|
||||||
proc validate(pattern: string, value: string): int =
|
proc validate(pattern: string, value: string): int =
|
||||||
if pattern.startsWith("{") and pattern.endsWith("}"): 0
|
if pattern.startsWith("{") and pattern.endsWith("}"): 0
|
||||||
else: 1
|
else: 1
|
||||||
var router = RestRouter.init(validate)
|
var router = RestRouter.init(validate)
|
||||||
router.installHandler(allPeersRef)
|
router.installHandler(allPeersInfo, numMessagesPerContentTopic)
|
||||||
var sres = RestServerRef.new(router, serverAddress)
|
var sres = RestServerRef.new(router, serverAddress)
|
||||||
let restServer = sres.get()
|
let restServer = sres.get()
|
||||||
restServer.start()
|
restServer.start()
|
||||||
|
@ -216,6 +220,36 @@ proc startRestApiServer(conf: NetworkMonitorConf,
|
||||||
error("could not start rest api server")
|
error("could not start rest api server")
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
|
# handles rx of messages over a topic (see subscribe)
|
||||||
|
# counts the number of messages per content topic
|
||||||
|
proc subscribeAndHandleMessages(node: WakuNode,
                                pubsubTopic: PubsubTopic,
                                msgPerContentTopic: ContentTopicMessageTableRef) =
  ## Subscribes `node` to `pubsubTopic` and counts the received messages per
  ## content topic.
  ##
  ## * `node`: node on which `subscribe` is called.
  ## * `pubsubTopic`: pubsub topic to subscribe to.
  ## * `msgPerContentTopic`: table updated in place; maps each content topic
  ##   to the number of messages received on it.

  # Maximum number of content topics tracked at the same time.
  const tableSize = 100

  # handle function
  proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
    let messageRes = WakuMessage.decode(data)
    if messageRes.isErr():
      warn "could not decode message", data=data, pubsubTopic=pubsubTopic
      # There is no message to read from an error result, so stop here
      # instead of falling through to get().
      return

    let message = messageRes.get()
    trace "rx message", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic

    # TODO: Will overflow at some point
    # +1 if content topic existed. No eviction is needed in this case, since
    # the table does not grow.
    if msgPerContentTopic.hasKey(message.contentTopic):
      msgPerContentTopic[message.contentTopic] += 1
      return

    # New content topic: if we reached the table limit size, first remove the
    # content topic with the least messages to make room for it.
    if msgPerContentTopic.len > (tableSize - 1):
      let minIndex = toSeq(msgPerContentTopic.values()).minIndex()
      msgPerContentTopic.del(toSeq(msgPerContentTopic.keys())[minIndex])

    # init to 1 otherwise
    msgPerContentTopic[message.contentTopic] = 1

  node.subscribe(pubsubTopic, handler)
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||||
{.pop.}
|
{.pop.}
|
||||||
|
@ -231,28 +265,38 @@ when isMainModule:
|
||||||
setLogLevel(conf.logLevel)
|
setLogLevel(conf.logLevel)
|
||||||
|
|
||||||
# list of peers that we have discovered/connected
|
# list of peers that we have discovered/connected
|
||||||
var allPeersRef = CustomPeersTableRef()
|
var allPeersInfo = CustomPeersTableRef()
|
||||||
|
|
||||||
|
# content topic and the number of messages that were received
|
||||||
|
var msgPerContentTopic = ContentTopicMessageTableRef()
|
||||||
|
|
||||||
# start metrics server
|
# start metrics server
|
||||||
if conf.metricsServer:
|
if conf.metricsServer:
|
||||||
let res = startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
|
let res = startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
|
||||||
if res.isErr:
|
if res.isErr():
|
||||||
error "could not start metrics server", err=res.error
|
error "could not start metrics server", err=res.error
|
||||||
quit(1)
|
quit(1)
|
||||||
|
|
||||||
# start rest server for custom metrics
|
# start rest server for custom metrics
|
||||||
let res = startRestApiServer(conf, allPeersRef)
|
let res = startRestApiServer(conf, allPeersInfo, msgPerContentTopic)
|
||||||
if res.isErr:
|
if res.isErr():
|
||||||
error "could not start rest api server", err=res.error
|
error "could not start rest api server", err=res.error
|
||||||
|
|
||||||
# start waku node
|
# start waku node
|
||||||
let node = initAndStartNode(conf)
|
let nodeRes = initAndStartNode(conf)
|
||||||
if node.isErr:
|
if nodeRes.isErr():
|
||||||
error "could not start node"
|
error "could not start node"
|
||||||
quit 1
|
quit 1
|
||||||
|
|
||||||
|
let node = nodeRes.get()
|
||||||
|
|
||||||
|
waitFor node.mountRelay()
|
||||||
|
|
||||||
|
# Subscribe the node to the default pubsubtopic, to count messages
|
||||||
|
subscribeAndHandleMessages(node, DefaultPubsubTopic, msgPerContentTopic)
|
||||||
|
|
||||||
# spawn the routine that crawls the network
|
# spawn the routine that crawls the network
|
||||||
# TODO: split into 3 routines (discovery, connections, ip2location)
|
# TODO: split into 3 routines (discovery, connections, ip2location)
|
||||||
asyncSpawn crawlNetwork(node.get(), conf, allPeersRef)
|
asyncSpawn crawlNetwork(node, conf, allPeersInfo)
|
||||||
|
|
||||||
runForever()
|
runForever()
|
|
@ -28,8 +28,8 @@ type
|
||||||
abbr: "b" }: seq[string]
|
abbr: "b" }: seq[string]
|
||||||
|
|
||||||
refreshInterval* {.
|
refreshInterval* {.
|
||||||
desc: "How often new peers are discovered and connected to (in minutes)",
|
desc: "How often new peers are discovered and connected to (in seconds)",
|
||||||
defaultValue: 10,
|
defaultValue: 5,
|
||||||
name: "refresh-interval",
|
name: "refresh-interval",
|
||||||
abbr: "r" }: int
|
abbr: "r" }: int
|
||||||
|
|
||||||
|
|
|
@ -53,13 +53,21 @@ type
|
||||||
supportedProtocols*: seq[string]
|
supportedProtocols*: seq[string]
|
||||||
userAgent*: string
|
userAgent*: string
|
||||||
|
|
||||||
|
# Stores information about all discovered/connected peers
|
||||||
CustomPeersTableRef* = TableRef[string, CustomPeerInfo]
|
CustomPeersTableRef* = TableRef[string, CustomPeerInfo]
|
||||||
|
|
||||||
# GET /allpeersinfo
|
# stores the content topic and the count of rx messages
|
||||||
proc installHandler*(router: var RestRouter, allPeers: CustomPeersTableRef) =
|
ContentTopicMessageTableRef* = TableRef[string, int]
|
||||||
|
|
||||||
|
proc installHandler*(router: var RestRouter,
|
||||||
|
allPeers: CustomPeersTableRef,
|
||||||
|
numMessagesPerContentTopic: ContentTopicMessageTableRef) =
|
||||||
router.api(MethodGet, "/allpeersinfo") do () -> RestApiResponse:
|
router.api(MethodGet, "/allpeersinfo") do () -> RestApiResponse:
|
||||||
let values = toSeq(allPeers.values())
|
let values = toSeq(allPeers.values())
|
||||||
return RestApiResponse.response(values.toJson(), contentType="application/json")
|
return RestApiResponse.response(values.toJson(), contentType="application/json")
|
||||||
|
router.api(MethodGet, "/contenttopics") do () -> RestApiResponse:
|
||||||
|
# TODO: toJson() includes the hash
|
||||||
|
return RestApiResponse.response($(%numMessagesPerContentTopic), contentType="application/json")
|
||||||
|
|
||||||
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port): Result[void, string] =
|
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port): Result[void, string] =
|
||||||
info "Starting metrics HTTP server", serverIp, serverPort
|
info "Starting metrics HTTP server", serverIp, serverPort
|
||||||
|
|
Loading…
Reference in New Issue