From 7917e05d9d9cdef974c45d3fd7ab9f649bf06834 Mon Sep 17 00:00:00 2001 From: Alvaro Revuelta Date: Thu, 10 Nov 2022 10:29:34 +0100 Subject: [PATCH] chore(networkmonitor): tool to discover and provide metrics on peers (#1290) * chore(networkmonitor): initial prototype * chore(networkmonitor): add cli, metrics and PoC * feat(utils): add supportsCapability function + tests * feat(utils): add supportedCapabilites function * chore(networkmonitor): add metrics with enr/ip/capabilities * chore(networkmonitor): refactor + tests * chore(networkmonitor): add discovered timestamp * chore(networkmonitor): add metrics on connected nodes * chore(networkmonitor): new flags + utils file + readme * chore(networkmonitor): add user-agent metrics * chore(networkmonitor): connect only to randomly discovered peers * chore(networkmonitor): get location of peer using ip * chore(networkmonitor): expose peer metrics with simple rest server * chore(networkmonitor): update README * chore(networkmonitor): fix wakunode2 to waku_node * chore(networkmonitor): fix import order * chore(networkmonitor): fix comments + refactor + pushraises * chore(networkmonitor): refactor + handle exceptions * chore(networkmonitor): fix makefile after rebase * chore(networkmonitor): address review comments 1 * chore(networkmonitor): add nim.cfg --- Makefile | 3 + tests/v2/test_enr_utils.nim | 96 ++++++- tools/README.md | 59 +++- tools/networkmonitor/networkmonitor.nim | 258 ++++++++++++++++++ .../networkmonitor/networkmonitor_config.nim | 86 ++++++ .../networkmonitor/networkmonitor_metrics.nim | 73 +++++ tools/networkmonitor/networkmonitor_utils.nim | 53 ++++ tools/networkmonitor/nim.cfg | 3 + waku.nimble | 4 + waku/v2/utils/wakuenr.nim | 30 +- 10 files changed, 658 insertions(+), 7 deletions(-) create mode 100644 tools/networkmonitor/networkmonitor.nim create mode 100644 tools/networkmonitor/networkmonitor_config.nim create mode 100644 tools/networkmonitor/networkmonitor_metrics.nim create mode 100644 tools/networkmonitor/networkmonitor_utils.nim create mode 100644 tools/networkmonitor/nim.cfg diff --git a/Makefile b/Makefile index 49a378f63..9c3b84ce4 100644 --- a/Makefile +++ b/Makefile @@ -191,6 +191,9 @@ wakucanary: | build deps echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim wakucanary $(NIM_PARAMS) waku.nims +networkmonitor: | build deps + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim networkmonitor $(NIM_PARAMS) waku.nims ## Waku docs diff --git a/tests/v2/test_enr_utils.nim b/tests/v2/test_enr_utils.nim index 6e7a045d8..618fe0aba 100644 --- a/tests/v2/test_enr_utils.nim +++ b/tests/v2/test_enr_utils.nim @@ -1,10 +1,10 @@ {.used.} import - testutils/unittests, - std/options, - stew/byteutils, + std/[options,sequtils], chronos, + stew/byteutils, + testutils/unittests, ../../waku/v2/utils/wakuenr, ../test_helpers @@ -125,3 +125,93 @@ procSuite "ENR utils": for knownMultiaddr in knownMultiaddrs: check decodedAddrs.contains(knownMultiaddr) + + asyncTest "Supports specific capabilities encoded in the ENR": + let + enrIp = ValidIpAddress.init("127.0.0.1") + enrTcpPort, enrUdpPort = Port(60000) + enrKey = wakuenr.crypto.PrivateKey.random(Secp256k1, rng[])[] + multiaddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/442/ws")[]] + + # TODO: Refactor initEnr, provide enums as inputs initEnr(capabilites=[Store,Filter]) + # TODO: safer than a util function and directly using the bits + # test all flag combinations 2^4 = 16 (b0000-b1111) + records = toSeq(0b0000_0000'u8..0b0000_1111'u8) + .mapIt(initEnr(enrKey, + some(enrIp), + some(enrTcpPort), + some(enrUdpPort), + some(uint8(it)), + multiaddrs)) + + # same order:  lightpush | filter| store | relay + expectedCapabilities = @[[false, false, false, false], + [false, false, false, true], + [false, false, true, false], + [false, false, true, true], + [false, true, false, false], + [false, true, false, true], + [false, true, true, false], + [false, true, true, true], + [true, false, false, false], + [true, false, false, true], + [true, false, true, false], + [true, false, true, true], + [true, true, false, false], + [true, true, false, true], + [true, true, true, false], + [true, true, true, true]] + + for i, record in records: + for j, capability in @[Lightpush, Filter, Store, Relay]: + check expectedCapabilities[i][j] == record.supportsCapability(capability) + + asyncTest "Get all supported capabilities encoded in the ENR": + let + enrIp = ValidIpAddress.init("127.0.0.1") + enrTcpPort, enrUdpPort = Port(60000) + enrKey = wakuenr.crypto.PrivateKey.random(Secp256k1, rng[])[] + multiaddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/442/ws")[]] + + records = @[0b0000_0000'u8, + 0b0000_1111'u8, + 0b0000_1001'u8, + 0b0000_1110'u8, + 0b0000_1000'u8,] + .mapIt(initEnr(enrKey, + some(enrIp), + some(enrTcpPort), + some(enrUdpPort), + some(uint8(it)), + multiaddrs)) + + # expected capabilities, ordered LSB to MSB + expectedCapabilities: seq[seq[Capabilities]] = @[ + #[0b0000_0000]# @[], + #[0b0000_1111]# @[Relay, Store, Filter, Lightpush], + #[0b0000_1001]# @[Relay, Lightpush], + #[0b0000_1110]# @[Store, Filter, Lightpush], + #[0b0000_1000]# @[Lightpush]] + + for i, actualExpetedTuple in zip(records, expectedCapabilities): + check actualExpetedTuple[0].getCapabilities() == actualExpetedTuple[1] + + asyncTest "Get supported capabilities of a non waku node": + + # non waku enr, i.e. Ethereum one + let nonWakuEnr = "enr:-KG4QOtcP9X1FbIMOe17QNMKqDxCpm14jcX5tiOE4_TyMrFqbmhPZHK_ZPG2G"& + "xb1GE2xdtodOfx9-cgvNtxnRyHEmC0ghGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQDE8KdiXNl"& + "Y3AyNTZrMaEDhpehBDbZjM_L9ek699Y7vhUJ-eAdMyQW_Fil522Y0fODdGNwgiMog3VkcIIjKA" + + var nonWakuEnrRecord: Record + + check: + nonWakuEnrRecord.fromURI(nonWakuEnr) + + # check that it doesn't support any capability and it doesnt't break + check: + nonWakuEnrRecord.getCapabilities() == [] + nonWakuEnrRecord.supportsCapability(Relay) == false + nonWakuEnrRecord.supportsCapability(Store) == false + nonWakuEnrRecord.supportsCapability(Filter) == false + nonWakuEnrRecord.supportsCapability(Lightpush) == false \ No newline at end of file diff --git a/tools/README.md b/tools/README.md index 7756e4113..1d8b0e4b8 100644 --- a/tools/README.md +++ b/tools/README.md @@ -1,4 +1,4 @@ -## waku canary tool +## waku canary tool Attempts to dial a peer and asserts it supports a given set of protocols. @@ -45,4 +45,59 @@ Note that a domain name can also be used. $ ./build/wakucanary --address=/dns4/node-01.do-ams3.status.test.statusim.net/tcp/30303/p2p/16Uiu2HAkukebeXjTQ9QDBeNDWuGfbaSg79wkkhK4vPocLgR6QFDf --protocol=store --protocol=filter $ echo $? 0 -``` \ No newline at end of file +``` + +## networkmonitor + +Monitoring tool to run in an existing `waku` network with the following features: +* Keeps discovering new peers using `discv5` +* Tracks advertised capabilities of each node as per stored in the ENR `waku` field +* Attempts to connect to all nodes, tracking which protocols each node supports +* Presents grafana-ready metrics showing the state of the network in terms of locations, ips, number discovered peers, number of peers we could connect to, user-agent that each peer contains, etc. +* Metrics are exposed through prometheus metrics but also with a custom rest api, presenting detailed information about each peer. + +### Usage + +```console +./build/networkmonitor --help +Usage: + +networkmonitor [OPTIONS]... + +The following options are available: + + -l, --log-level Sets the log level [=LogLevel.DEBUG]. + -t, --timeout Timeout to consider that the connection failed [=chronos.seconds(10)]. + -b, --bootstrap-node Bootstrap ENR node. Argument may be repeated. [=@[""]]. + -r, --refresh-interval How often new peers are discovered and connected to (in minutes) [=10]. + --metrics-server Enable the metrics server: true|false [=true]. + --metrics-server-address Listening address of the metrics server. [=ValidIpAddress.init("127.0.0.1")]. + --metrics-server-port Listening HTTP port of the metrics server. [=8008]. + --metrics-rest-address Listening address of the metrics rest server. [=127.0.0.1]. + --metrics-rest-port Listening HTTP port of the metrics rest server. [=8009]. +``` + +### Example + +Connect to the network through a given bootstrap node, with default parameters. Once its running, metrics will be live at `localhost:8008/metrics` + +```console +./build/networkmonitor --log-level=INFO --b="enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP" +``` + + +### metrics + +The following metrics are available. See `http://localhost:8008/metrics` + +* peer_type_as_per_enr: Number of peers supporting each capability according the the ENR (Relay, Store, Lightpush, Filter) +* peer_type_as_per_protocol: Number of peers supporting each protocol, after a successful connection) +* peer_user_agents: List of useragents found in the network and their count + +Other relevant metrics reused from `nim-eth`: +* routing_table_nodes: Inherited from nim-eth, number of nodes in the routing table +* discovery_message_requests_outgoing_total: Inherited from nim-eth, number of outging discovery requests, useful to know if the node is actiely looking for new peers + +The following metrics are exposed via a custom rest api. See `http://localhost:8009/allpeersinfo` + +* json list of all peers with extra information such as ip, locatio, supported protocols and last connection time. diff --git a/tools/networkmonitor/networkmonitor.nim b/tools/networkmonitor/networkmonitor.nim new file mode 100644 index 000000000..4f241896e --- /dev/null +++ b/tools/networkmonitor/networkmonitor.nim @@ -0,0 +1,258 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[tables,strutils,times,sequtils,httpclient], + chronicles, + chronicles/topics_registry, + chronos, + confutils, + eth/keys, + eth/p2p/discoveryv5/enr, + libp2p/crypto/crypto, + metrics, + metrics/chronos_httpserver, + presto/[route, server], + stew/shims/net + +import + ../../waku/v2/node/discv5/waku_discv5, + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/node/waku_node, + ../../waku/v2/utils/wakuenr, + ../../waku/v2/utils/peers, + ./networkmonitor_metrics, + ./networkmonitor_config, + ./networkmonitor_utils + +logScope: + topics = "networkmonitor" + +proc setDiscoveredPeersCapabilities( + routingTableNodes: seq[Node]) = + for capability in @[Relay, Store, Filter, Lightpush]: + let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability)) + info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability + peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability]) + +# TODO: Split in discover, connect, populate ips +proc setConnectedPeersMetrics(discoveredNodes: seq[Node], + node: WakuNode, + timeout: chronos.Duration, + client: HttpClient, + allPeers: CustomPeersTableRef) {.async.} = + + let currentTime = $getTime() + + # Protocols and agent string and its count + var allProtocols: Table[string, int] + var allAgentStrings: Table[string, int] + + # iterate all newly discovered nodes + for discNode in discoveredNodes: + let typedRecord = discNode.record.toTypedRecord() + if not typedRecord.isOk(): + warn "could not convert record to typed record", record=discNode.record + continue + + let secp256k1 = typedRecord.get().secp256k1 + if not secp256k1.isSome(): + warn "could not get secp256k1 key", typedRecord=typedRecord.get() + continue + + # create new entry if new peerId found + let peerId = secp256k1.get().toHex() + let customPeerInfo = CustomPeerInfo(peerId: peerId) + if not allPeers.hasKey(peerId): + allPeers[peerId] = customPeerInfo + + allPeers[peerId].lastTimeDiscovered = currentTime + allPeers[peerId].enr = discNode.record.toURI() + allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it) + + if not typedRecord.get().ip.isSome(): + warn "ip field is not set", record=typedRecord.get() + continue + + let ip = $typedRecord.get().ip.get().join(".") + allPeers[peerId].ip = ip + + # get more info the peers from its ip address + let location = await ipToLocation(ip, client) + if not location.isOk(): + warn "could not get location", ip=ip + continue + + allPeers[peerId].country = location.get().country + allPeers[peerId].city = location.get().city + + let peer = toRemotePeerInfo(discNode.record) + if not peer.isOk(): + warn "error converting record to remote peer info", record=discNode.record + continue + + # try to connect to the peer + let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout) + if timedOut: + warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get() + continue + + # after connection, get supported protocols + let lp2pPeerStore = node.switch.peerStore + let nodeProtocols = lp2pPeerStore[ProtoBook][peer.get().peerId] + allPeers[peerId].supportedProtocols = nodeProtocols + allPeers[peerId].lastTimeConnected = currentTime + + # after connection, get user-agent + let nodeUserAgent = lp2pPeerStore[AgentBook][peer.get().peerId] + allPeers[peerId].userAgent = nodeUserAgent + + # store avaiable protocols in the network + for protocol in nodeProtocols: + if not allProtocols.hasKey(protocol): + allProtocols[protocol] = 0 + allProtocols[protocol] += 1 + + # store available user-agents in the network + if not allAgentStrings.hasKey(nodeUserAgent): + allAgentStrings[nodeUserAgent] = 0 + allAgentStrings[nodeUserAgent] += 1 + + debug "connected to peer", peer=allPeers[customPeerInfo.peerId] + + # inform the total connections that we did in this round + let nOfOkConnections = allProtocols.len() + info "number of successful connections", amount=nOfOkConnections + + # update count on each protocol + for protocol in allProtocols.keys(): + let countOfProtocols = allProtocols[protocol] + peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol]) + info "supported protocols in the network", protocol=protocol, count=countOfProtocols + + # update count on each user-agent + for userAgent in allAgentStrings.keys(): + let countOfUserAgent = allAgentStrings[userAgent] + peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent]) + info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent + + +# TODO: Split in discovery, connections, and ip2location +# crawls the network discovering peers and trying to connect to them +# metrics are processed and exposed +proc crawlNetwork(node: WakuNode, + conf: NetworkMonitorConf, + allPeersRef: CustomPeersTableRef) {.async.} = + + let crawlInterval = conf.refreshInterval * 1000 * 60 + let client = newHttpClient() + while true: + # discover new random nodes + let discoveredNodes = await node.wakuDiscv5.protocol.queryRandom() + + # nodes are nested into bucket, flat it + let flatNodes = node.wakuDiscv5.protocol.routingTable.buckets.mapIt(it.nodes).flatten() + + # populate metrics related to capabilities as advertised by the ENR (see waku field) + setDiscoveredPeersCapabilities(flatNodes) + + # tries to connect to all newly discovered nodes + # and populates metrics related to peers we could connect + # note random discovered nodes can be already known + await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, client, allPeersRef) + + let totalNodes = flatNodes.len + let seenNodes = flatNodes.countIt(it.seen) + + info "discovered nodes: ", total=totalNodes, seen=seenNodes + + # Notes: + # we dont run ipMajorityLoop + # we dont run revalidateLoop + + await sleepAsync(crawlInterval) + +proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] = + let + # some hardcoded parameters + rng = keys.newRng() + nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + nodeTcpPort = Port(60000) + nodeUdpPort = Port(9000) + flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true) + + try: + let + bindIp = ValidIpAddress.init("0.0.0.0") + extIp = ValidIpAddress.init("127.0.0.1") + node = WakuNode.new(nodeKey, bindIp, nodeTcpPort) + + # mount discv5 + node.wakuDiscv5 = WakuDiscoveryV5.new( + some(extIp), some(nodeTcpPort), some(nodeUdpPort), + bindIp, nodeUdpPort, conf.bootstrapNodes, false, + keys.PrivateKey(nodeKey.skkey), flags, [], node.rng) + + node.wakuDiscv5.protocol.open() + return ok(node) + except: + error("could not start node") + +proc startRestApiServer(conf: NetworkMonitorConf, + allPeersRef: CustomPeersTableRef): Result[void, string] = + try: + let serverAddress = initTAddress(conf.metricsRestAddress & ":" & $conf.metricsRestPort) + proc validate(pattern: string, value: string): int = + if pattern.startsWith("{") and pattern.endsWith("}"): 0 + else: 1 + var router = RestRouter.init(validate) + router.installHandler(allPeersRef) + var sres = RestServerRef.new(router, serverAddress) + let restServer = sres.get() + restServer.start() + except: + error("could not start rest api server") + ok() + +when isMainModule: + # known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError + {.pop.} + let confRes = NetworkMonitorConf.loadConfig() + if confRes.isErr(): + error "could not load cli variables", err=confRes.error + quit(1) + + let conf = confRes.get() + info "cli flags", conf=conf + + if conf.logLevel != LogLevel.NONE: + setLogLevel(conf.logLevel) + + # list of peers that we have discovered/connected + var allPeersRef = CustomPeersTableRef() + + # start metrics server + if conf.metricsServer: + let res = startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort)) + if res.isErr: + error "could not start metrics server", err=res.error + quit(1) + + # start rest server for custom metrics + let res = startRestApiServer(conf, allPeersRef) + if res.isErr: + error "could not start rest api server", err=res.error + + # start waku node + let node = initAndStartNode(conf) + if node.isErr: + error "could not start node" + quit 1 + + # spawn the routine that crawls the network + # TODO: split into 3 routines (discovery, connections, ip2location) + asyncSpawn crawlNetwork(node.get(), conf, allPeersRef) + + runForever() \ No newline at end of file diff --git a/tools/networkmonitor/networkmonitor_config.nim b/tools/networkmonitor/networkmonitor_config.nim new file mode 100644 index 000000000..000c5b56f --- /dev/null +++ b/tools/networkmonitor/networkmonitor_config.nim @@ -0,0 +1,86 @@ +import + std/strutils, + chronicles, + chronicles/topics_registry, + chronos, + confutils, + stew/results, + stew/shims/net + +type + NetworkMonitorConf* = object + logLevel* {. + desc: "Sets the log level", + defaultValue: LogLevel.DEBUG, + name: "log-level", + abbr: "l" .}: LogLevel + + timeout* {. + desc: "Timeout to consider that the connection failed", + defaultValue: chronos.seconds(10), + name: "timeout", + abbr: "t" }: chronos.Duration + + bootstrapNodes* {. + desc: "Bootstrap ENR node. Argument may be repeated.", + defaultValue: @[""], + name: "bootstrap-node", + abbr: "b" }: seq[string] + + refreshInterval* {. + desc: "How often new peers are discovered and connected to (in minutes)", + defaultValue: 10, + name: "refresh-interval", + abbr: "r" }: int + + ## Prometheus metrics config + metricsServer* {. + desc: "Enable the metrics server: true|false" + defaultValue: true + name: "metrics-server" }: bool + + metricsServerAddress* {. + desc: "Listening address of the metrics server." + defaultValue: ValidIpAddress.init("127.0.0.1") + name: "metrics-server-address" }: ValidIpAddress + + metricsServerPort* {. + desc: "Listening HTTP port of the metrics server." + defaultValue: 8008 + name: "metrics-server-port" }: uint16 + + ## Custom metrics rest server + metricsRestAddress* {. + desc: "Listening address of the metrics rest server.", + defaultValue: "127.0.0.1", + name: "metrics-rest-address" }: string + metricsRestPort* {. + desc: "Listening HTTP port of the metrics rest server.", + defaultValue: 8009, + name: "metrics-rest-port" }: uint16 + + +proc parseCmdArg*(T: type ValidIpAddress, p: string): T = + try: + result = ValidIpAddress.init(p) + except CatchableError as e: + raise newException(ConfigurationError, "Invalid IP address") + +proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type chronos.Duration, p: string): T = + try: + result = chronos.seconds(parseInt(p)) + except CatchableError as e: + raise newException(ConfigurationError, "Invalid duration value") + +proc completeCmdArg*(T: type chronos.Duration, val: string): seq[string] = + return @[] + +proc loadConfig*(T: type NetworkMonitorConf): Result[T, string] = + try: + let conf = NetworkMonitorConf.load() + ok(conf) + except CatchableError: + err(getCurrentExceptionMsg()) \ No newline at end of file diff --git a/tools/networkmonitor/networkmonitor_metrics.nim b/tools/networkmonitor/networkmonitor_metrics.nim new file mode 100644 index 000000000..14f9ae84a --- /dev/null +++ b/tools/networkmonitor/networkmonitor_metrics.nim @@ -0,0 +1,73 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[json,tables,sequtils], + chronicles, + chronicles/topics_registry, + chronos, + json_serialization, + metrics, + metrics/chronos_httpserver, + presto/route, + presto/server, + stew/results, + stew/shims/net + +logScope: + topics = "networkmonitor_metrics" + +# On top of our custom metrics, the following are reused from nim-eth +#routing_table_nodes{state=""} +#routing_table_nodes{state="seen"} +#discovery_message_requests_outgoing_total{response=""} +#discovery_message_requests_outgoing_total{response="no_response"} + +declarePublicGauge peer_type_as_per_enr, + "Number of peers supporting each capability according the the ENR", + labels = ["capability"] + +declarePublicGauge peer_type_as_per_protocol, + "Number of peers supporting each protocol, after a successful connection) ", + labels = ["protocols"] + +declarePublicGauge peer_user_agents, + "Number of peers with each user agent", + labels = ["user_agent"] + +type + CustomPeerInfo* = object + # populated after discovery + lastTimeDiscovered*: string + peerId*: string + enr*: string + ip*: string + enrCapabilities*: seq[string] + country*: string + city*: string + + # only after ok connection + lastTimeConnected*: string + supportedProtocols*: seq[string] + userAgent*: string + + CustomPeersTableRef* = TableRef[string, CustomPeerInfo] + +# GET /allpeersinfo +proc installHandler*(router: var RestRouter, allPeers: CustomPeersTableRef) = + router.api(MethodGet, "/allpeersinfo") do () -> RestApiResponse: + let values = toSeq(allPeers.values()) + return RestApiResponse.response(values.toJson(), contentType="application/json") + +proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port): Result[void, string] = + info "Starting metrics HTTP server", serverIp, serverPort + + try: + startMetricsHttpServer($serverIp, serverPort) + except Exception as e: + error("Failed to start metrics HTTP server", serverIp=serverIp, serverPort=serverPort, msg=e.msg) + + info "Metrics HTTP server started", serverIp, serverPort + ok() \ No newline at end of file diff --git a/tools/networkmonitor/networkmonitor_utils.nim b/tools/networkmonitor/networkmonitor_utils.nim new file mode 100644 index 000000000..fca1ef12a --- /dev/null +++ b/tools/networkmonitor/networkmonitor_utils.nim @@ -0,0 +1,53 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + std/[json,httpclient], + chronicles, + chronicles/topics_registry, + chronos, + stew/results + +type + NodeLocation = object + country*: string + city*: string + lat*: string + long*: string + isp*: string + +proc flatten*[T](a: seq[seq[T]]): seq[T] = + var aFlat = newSeq[T](0) + for subseq in a: + aFlat &= subseq + return aFlat + +# using an external api retrieves some data associated with the ip +# TODO: use a cache +# TODO: use nim-presto's HTTP asynchronous client +proc ipToLocation*(ip: string, + client: Httpclient): + Future[Result[NodeLocation, string]] {.async.} = + # naive mechanism to avoid hitting the rate limit + # IP-API endpoints are now limited to 45 HTTP requests per minute + await sleepAsync(1400) + try: + let content = client.getContent("http://ip-api.com/json/" & ip) + let jsonContent = parseJson(content) + + if $jsonContent["status"].getStr() != "success": + error "query failed", result=jsonContent + return err("query failed: " & $jsonContent) + + return ok(NodeLocation( + country: jsonContent["country"].getStr(), + city: jsonContent["city"].getStr(), + lat: jsonContent["lat"].getStr(), + long: jsonContent["lon"].getStr(), + isp: jsonContent["isp"].getStr() + )) + except: + error "failed to get the location for IP", ip=ip, error=getCurrentExceptionMsg() + return err("failed to get the location for IP '" & ip & "':" & getCurrentExceptionMsg()) \ No newline at end of file diff --git a/tools/networkmonitor/nim.cfg b/tools/networkmonitor/nim.cfg new file mode 100644 index 000000000..8d66f42d3 --- /dev/null +++ b/tools/networkmonitor/nim.cfg @@ -0,0 +1,3 @@ +-d:chronicles_line_numbers +-d:chronicles_runtime_filtering:on +-d:discv5_protocol_id:d5waku \ No newline at end of file diff --git a/waku.nimble b/waku.nimble index 122c90d66..e70f0e18a 100644 --- a/waku.nimble +++ b/waku.nimble @@ -108,3 +108,7 @@ task chat2bridge, "Build chat2bridge": task wakucanary, "Build waku-canary tool": let name = "wakucanary" buildBinary name, "tools/wakucanary/", "-d:chronicles_log_level=TRACE" + +task networkmonitor, "Build network monitor tool": + let name = "networkmonitor" + buildBinary name, "tools/networkmonitor/", "-d:chronicles_log_level=TRACE" diff --git a/waku/v2/utils/wakuenr.nim b/waku/v2/utils/wakuenr.nim index 31185c22e..403043caf 100644 --- a/waku/v2/utils/wakuenr.nim +++ b/waku/v2/utils/wakuenr.nim @@ -11,7 +11,8 @@ import libp2p/[multiaddress, multicodec], libp2p/crypto/crypto, stew/[endians2, results], - stew/shims/net + stew/shims/net, + std/bitops export enr, crypto, multiaddress, net @@ -23,7 +24,15 @@ type ## 8-bit flag field to indicate Waku capabilities. ## Only the 4 LSBs are currently defined according ## to RFC31 (https://rfc.vac.dev/spec/31/). - WakuEnrBitfield* = uint8 + WakuEnrBitfield* = uint8 + + ## See: https://rfc.vac.dev/spec/31/#waku2-enr-key + ## each enum numbers maps to a bit (where 0 is the LSB) + Capabilities* = enum + Relay = 0, + Store = 1, + Filter = 2, + Lightpush = 3, func toFieldPair(multiaddrs: seq[MultiAddress]): FieldPair = ## Converts a seq of multiaddrs to a `multiaddrs` ENR @@ -90,6 +99,14 @@ func initWakuFlags*(lightpush, filter, store, relay: bool): WakuEnrBitfield = if store: v.setBit(1) if relay: v.setBit(0) + # TODO: With the changes in this PR, this can be refactored? Using the enum? + # Perhaps refactor to: + # WaKuEnr.initEnr(..., capabilities=[Store, Lightpush]) + # WaKuEnr.initEnr(..., capabilities=[Store, Lightpush, Relay, Filter]) + + # Safer also since we dont inject WakuEnrBitfield, and we let this package + # handle the bits according to the capabilities + return v.WakuEnrBitfield func toMultiAddresses*(multiaddrsField: seq[byte]): seq[MultiAddress] = @@ -151,3 +168,12 @@ func initEnr*(privateKey: crypto.PrivateKey, wakuEnrFields).expect("Record within size limits") return enr + +proc supportsCapability*(r: Record, capability: Capabilities): bool = + let enrCapabilities = r.get(WAKU_ENR_FIELD, seq[byte]) + if enrCapabilities.isOk(): + return testBit(enrCapabilities.get()[0], capability.ord) + return false + +proc getCapabilities*(r: Record): seq[Capabilities] = + return toSeq(Capabilities.low..Capabilities.high).filterIt(r.supportsCapability(it)) \ No newline at end of file