mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
deploy: 3f9807b08353b5d69e6ea01938971311f42337a5
This commit is contained in:
parent
79a16115a4
commit
1906a6acfc
3
Makefile
3
Makefile
@ -191,6 +191,9 @@ wakucanary: | build deps
|
|||||||
echo -e $(BUILD_MSG) "build/$@" && \
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
$(ENV_SCRIPT) nim wakucanary $(NIM_PARAMS) waku.nims
|
$(ENV_SCRIPT) nim wakucanary $(NIM_PARAMS) waku.nims
|
||||||
|
|
||||||
|
networkmonitor: | build deps
|
||||||
|
echo -e $(BUILD_MSG) "build/$@" && \
|
||||||
|
$(ENV_SCRIPT) nim networkmonitor $(NIM_PARAMS) waku.nims
|
||||||
|
|
||||||
## Waku docs
|
## Waku docs
|
||||||
|
|
||||||
|
|||||||
@ -1,10 +1,10 @@
|
|||||||
{.used.}
|
{.used.}
|
||||||
|
|
||||||
import
|
import
|
||||||
testutils/unittests,
|
std/[options,sequtils],
|
||||||
std/options,
|
|
||||||
stew/byteutils,
|
|
||||||
chronos,
|
chronos,
|
||||||
|
stew/byteutils,
|
||||||
|
testutils/unittests,
|
||||||
../../waku/v2/utils/wakuenr,
|
../../waku/v2/utils/wakuenr,
|
||||||
../test_helpers
|
../test_helpers
|
||||||
|
|
||||||
@ -125,3 +125,93 @@ procSuite "ENR utils":
|
|||||||
|
|
||||||
for knownMultiaddr in knownMultiaddrs:
|
for knownMultiaddr in knownMultiaddrs:
|
||||||
check decodedAddrs.contains(knownMultiaddr)
|
check decodedAddrs.contains(knownMultiaddr)
|
||||||
|
|
||||||
|
asyncTest "Supports specific capabilities encoded in the ENR":
|
||||||
|
let
|
||||||
|
enrIp = ValidIpAddress.init("127.0.0.1")
|
||||||
|
enrTcpPort, enrUdpPort = Port(60000)
|
||||||
|
enrKey = wakuenr.crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
multiaddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/442/ws")[]]
|
||||||
|
|
||||||
|
# TODO: Refactor initEnr, provide enums as inputs initEnr(capabilites=[Store,Filter])
|
||||||
|
# TODO: safer than a util function and directly using the bits
|
||||||
|
# test all flag combinations 2^4 = 16 (b0000-b1111)
|
||||||
|
records = toSeq(0b0000_0000'u8..0b0000_1111'u8)
|
||||||
|
.mapIt(initEnr(enrKey,
|
||||||
|
some(enrIp),
|
||||||
|
some(enrTcpPort),
|
||||||
|
some(enrUdpPort),
|
||||||
|
some(uint8(it)),
|
||||||
|
multiaddrs))
|
||||||
|
|
||||||
|
# same order: lightpush | filter| store | relay
|
||||||
|
expectedCapabilities = @[[false, false, false, false],
|
||||||
|
[false, false, false, true],
|
||||||
|
[false, false, true, false],
|
||||||
|
[false, false, true, true],
|
||||||
|
[false, true, false, false],
|
||||||
|
[false, true, false, true],
|
||||||
|
[false, true, true, false],
|
||||||
|
[false, true, true, true],
|
||||||
|
[true, false, false, false],
|
||||||
|
[true, false, false, true],
|
||||||
|
[true, false, true, false],
|
||||||
|
[true, false, true, true],
|
||||||
|
[true, true, false, false],
|
||||||
|
[true, true, false, true],
|
||||||
|
[true, true, true, false],
|
||||||
|
[true, true, true, true]]
|
||||||
|
|
||||||
|
for i, record in records:
|
||||||
|
for j, capability in @[Lightpush, Filter, Store, Relay]:
|
||||||
|
check expectedCapabilities[i][j] == record.supportsCapability(capability)
|
||||||
|
|
||||||
|
asyncTest "Get all supported capabilities encoded in the ENR":
|
||||||
|
let
|
||||||
|
enrIp = ValidIpAddress.init("127.0.0.1")
|
||||||
|
enrTcpPort, enrUdpPort = Port(60000)
|
||||||
|
enrKey = wakuenr.crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
multiaddrs = @[MultiAddress.init("/ip4/127.0.0.1/tcp/442/ws")[]]
|
||||||
|
|
||||||
|
records = @[0b0000_0000'u8,
|
||||||
|
0b0000_1111'u8,
|
||||||
|
0b0000_1001'u8,
|
||||||
|
0b0000_1110'u8,
|
||||||
|
0b0000_1000'u8,]
|
||||||
|
.mapIt(initEnr(enrKey,
|
||||||
|
some(enrIp),
|
||||||
|
some(enrTcpPort),
|
||||||
|
some(enrUdpPort),
|
||||||
|
some(uint8(it)),
|
||||||
|
multiaddrs))
|
||||||
|
|
||||||
|
# expected capabilities, ordered LSB to MSB
|
||||||
|
expectedCapabilities: seq[seq[Capabilities]] = @[
|
||||||
|
#[0b0000_0000]# @[],
|
||||||
|
#[0b0000_1111]# @[Relay, Store, Filter, Lightpush],
|
||||||
|
#[0b0000_1001]# @[Relay, Lightpush],
|
||||||
|
#[0b0000_1110]# @[Store, Filter, Lightpush],
|
||||||
|
#[0b0000_1000]# @[Lightpush]]
|
||||||
|
|
||||||
|
for i, actualExpetedTuple in zip(records, expectedCapabilities):
|
||||||
|
check actualExpetedTuple[0].getCapabilities() == actualExpetedTuple[1]
|
||||||
|
|
||||||
|
asyncTest "Get supported capabilities of a non waku node":
|
||||||
|
|
||||||
|
# non waku enr, i.e. Ethereum one
|
||||||
|
let nonWakuEnr = "enr:-KG4QOtcP9X1FbIMOe17QNMKqDxCpm14jcX5tiOE4_TyMrFqbmhPZHK_ZPG2G"&
|
||||||
|
"xb1GE2xdtodOfx9-cgvNtxnRyHEmC0ghGV0aDKQ9aX9QgAAAAD__________4JpZIJ2NIJpcIQDE8KdiXNl"&
|
||||||
|
"Y3AyNTZrMaEDhpehBDbZjM_L9ek699Y7vhUJ-eAdMyQW_Fil522Y0fODdGNwgiMog3VkcIIjKA"
|
||||||
|
|
||||||
|
var nonWakuEnrRecord: Record
|
||||||
|
|
||||||
|
check:
|
||||||
|
nonWakuEnrRecord.fromURI(nonWakuEnr)
|
||||||
|
|
||||||
|
# check that it doesn't support any capability and it doesnt't break
|
||||||
|
check:
|
||||||
|
nonWakuEnrRecord.getCapabilities() == []
|
||||||
|
nonWakuEnrRecord.supportsCapability(Relay) == false
|
||||||
|
nonWakuEnrRecord.supportsCapability(Store) == false
|
||||||
|
nonWakuEnrRecord.supportsCapability(Filter) == false
|
||||||
|
nonWakuEnrRecord.supportsCapability(Lightpush) == false
|
||||||
@ -1,4 +1,4 @@
|
|||||||
## waku canary tool
|
## waku canary tool
|
||||||
|
|
||||||
Attempts to dial a peer and asserts it supports a given set of protocols.
|
Attempts to dial a peer and asserts it supports a given set of protocols.
|
||||||
|
|
||||||
@ -45,4 +45,59 @@ Note that a domain name can also be used.
|
|||||||
$ ./build/wakucanary --address=/dns4/node-01.do-ams3.status.test.statusim.net/tcp/30303/p2p/16Uiu2HAkukebeXjTQ9QDBeNDWuGfbaSg79wkkhK4vPocLgR6QFDf --protocol=store --protocol=filter
|
$ ./build/wakucanary --address=/dns4/node-01.do-ams3.status.test.statusim.net/tcp/30303/p2p/16Uiu2HAkukebeXjTQ9QDBeNDWuGfbaSg79wkkhK4vPocLgR6QFDf --protocol=store --protocol=filter
|
||||||
$ echo $?
|
$ echo $?
|
||||||
0
|
0
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## networkmonitor
|
||||||
|
|
||||||
|
Monitoring tool to run in an existing `waku` network with the following features:
|
||||||
|
* Keeps discovering new peers using `discv5`
|
||||||
|
* Tracks advertised capabilities of each node as per stored in the ENR `waku` field
|
||||||
|
* Attempts to connect to all nodes, tracking which protocols each node supports
|
||||||
|
* Presents grafana-ready metrics showing the state of the network in terms of locations, ips, number discovered peers, number of peers we could connect to, user-agent that each peer contains, etc.
|
||||||
|
* Metrics are exposed through prometheus metrics but also with a custom rest api, presenting detailed information about each peer.
|
||||||
|
|
||||||
|
### Usage
|
||||||
|
|
||||||
|
```console
|
||||||
|
./build/networkmonitor --help
|
||||||
|
Usage:
|
||||||
|
|
||||||
|
networkmonitor [OPTIONS]...
|
||||||
|
|
||||||
|
The following options are available:
|
||||||
|
|
||||||
|
-l, --log-level Sets the log level [=LogLevel.DEBUG].
|
||||||
|
-t, --timeout Timeout to consider that the connection failed [=chronos.seconds(10)].
|
||||||
|
-b, --bootstrap-node Bootstrap ENR node. Argument may be repeated. [=@[""]].
|
||||||
|
-r, --refresh-interval How often new peers are discovered and connected to (in minutes) [=10].
|
||||||
|
--metrics-server Enable the metrics server: true|false [=true].
|
||||||
|
--metrics-server-address Listening address of the metrics server. [=ValidIpAddress.init("127.0.0.1")].
|
||||||
|
--metrics-server-port Listening HTTP port of the metrics server. [=8008].
|
||||||
|
--metrics-rest-address Listening address of the metrics rest server. [=127.0.0.1].
|
||||||
|
--metrics-rest-port Listening HTTP port of the metrics rest server. [=8009].
|
||||||
|
```
|
||||||
|
|
||||||
|
### Example
|
||||||
|
|
||||||
|
Connect to the network through a given bootstrap node, with default parameters. Once its running, metrics will be live at `localhost:8008/metrics`
|
||||||
|
|
||||||
|
```console
|
||||||
|
./build/networkmonitor --log-level=INFO --b="enr:-Nm4QOdTOKZJKTUUZ4O_W932CXIET-M9NamewDnL78P5u9DOGnZlK0JFZ4k0inkfe6iY-0JAaJVovZXc575VV3njeiABgmlkgnY0gmlwhAjS3ueKbXVsdGlhZGRyc7g6ADg2MW5vZGUtMDEuYWMtY24taG9uZ2tvbmctYy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGH0DeA4lzZWNwMjU2azGhAo0C-VvfgHiXrxZi3umDiooXMGY9FvYj5_d1Q4EeS7eyg3RjcIJ2X4N1ZHCCIyiFd2FrdTIP"
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
### metrics
|
||||||
|
|
||||||
|
The following metrics are available. See `http://localhost:8008/metrics`
|
||||||
|
|
||||||
|
* peer_type_as_per_enr: Number of peers supporting each capability according the the ENR (Relay, Store, Lightpush, Filter)
|
||||||
|
* peer_type_as_per_protocol: Number of peers supporting each protocol, after a successful connection)
|
||||||
|
* peer_user_agents: List of useragents found in the network and their count
|
||||||
|
|
||||||
|
Other relevant metrics reused from `nim-eth`:
|
||||||
|
* routing_table_nodes: Inherited from nim-eth, number of nodes in the routing table
|
||||||
|
* discovery_message_requests_outgoing_total: Inherited from nim-eth, number of outging discovery requests, useful to know if the node is actiely looking for new peers
|
||||||
|
|
||||||
|
The following metrics are exposed via a custom rest api. See `http://localhost:8009/allpeersinfo`
|
||||||
|
|
||||||
|
* json list of all peers with extra information such as ip, locatio, supported protocols and last connection time.
|
||||||
|
|||||||
258
tools/networkmonitor/networkmonitor.nim
Normal file
258
tools/networkmonitor/networkmonitor.nim
Normal file
@ -0,0 +1,258 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[tables,strutils,times,sequtils,httpclient],
|
||||||
|
chronicles,
|
||||||
|
chronicles/topics_registry,
|
||||||
|
chronos,
|
||||||
|
confutils,
|
||||||
|
eth/keys,
|
||||||
|
eth/p2p/discoveryv5/enr,
|
||||||
|
libp2p/crypto/crypto,
|
||||||
|
metrics,
|
||||||
|
metrics/chronos_httpserver,
|
||||||
|
presto/[route, server],
|
||||||
|
stew/shims/net
|
||||||
|
|
||||||
|
import
|
||||||
|
../../waku/v2/node/discv5/waku_discv5,
|
||||||
|
../../waku/v2/node/peer_manager/peer_manager,
|
||||||
|
../../waku/v2/node/waku_node,
|
||||||
|
../../waku/v2/utils/wakuenr,
|
||||||
|
../../waku/v2/utils/peers,
|
||||||
|
./networkmonitor_metrics,
|
||||||
|
./networkmonitor_config,
|
||||||
|
./networkmonitor_utils
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "networkmonitor"
|
||||||
|
|
||||||
|
proc setDiscoveredPeersCapabilities(
|
||||||
|
routingTableNodes: seq[Node]) =
|
||||||
|
for capability in @[Relay, Store, Filter, Lightpush]:
|
||||||
|
let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability))
|
||||||
|
info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability
|
||||||
|
peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])
|
||||||
|
|
||||||
|
# TODO: Split in discover, connect, populate ips
|
||||||
|
proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
||||||
|
node: WakuNode,
|
||||||
|
timeout: chronos.Duration,
|
||||||
|
client: HttpClient,
|
||||||
|
allPeers: CustomPeersTableRef) {.async.} =
|
||||||
|
|
||||||
|
let currentTime = $getTime()
|
||||||
|
|
||||||
|
# Protocols and agent string and its count
|
||||||
|
var allProtocols: Table[string, int]
|
||||||
|
var allAgentStrings: Table[string, int]
|
||||||
|
|
||||||
|
# iterate all newly discovered nodes
|
||||||
|
for discNode in discoveredNodes:
|
||||||
|
let typedRecord = discNode.record.toTypedRecord()
|
||||||
|
if not typedRecord.isOk():
|
||||||
|
warn "could not convert record to typed record", record=discNode.record
|
||||||
|
continue
|
||||||
|
|
||||||
|
let secp256k1 = typedRecord.get().secp256k1
|
||||||
|
if not secp256k1.isSome():
|
||||||
|
warn "could not get secp256k1 key", typedRecord=typedRecord.get()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# create new entry if new peerId found
|
||||||
|
let peerId = secp256k1.get().toHex()
|
||||||
|
let customPeerInfo = CustomPeerInfo(peerId: peerId)
|
||||||
|
if not allPeers.hasKey(peerId):
|
||||||
|
allPeers[peerId] = customPeerInfo
|
||||||
|
|
||||||
|
allPeers[peerId].lastTimeDiscovered = currentTime
|
||||||
|
allPeers[peerId].enr = discNode.record.toURI()
|
||||||
|
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
|
||||||
|
|
||||||
|
if not typedRecord.get().ip.isSome():
|
||||||
|
warn "ip field is not set", record=typedRecord.get()
|
||||||
|
continue
|
||||||
|
|
||||||
|
let ip = $typedRecord.get().ip.get().join(".")
|
||||||
|
allPeers[peerId].ip = ip
|
||||||
|
|
||||||
|
# get more info the peers from its ip address
|
||||||
|
let location = await ipToLocation(ip, client)
|
||||||
|
if not location.isOk():
|
||||||
|
warn "could not get location", ip=ip
|
||||||
|
continue
|
||||||
|
|
||||||
|
allPeers[peerId].country = location.get().country
|
||||||
|
allPeers[peerId].city = location.get().city
|
||||||
|
|
||||||
|
let peer = toRemotePeerInfo(discNode.record)
|
||||||
|
if not peer.isOk():
|
||||||
|
warn "error converting record to remote peer info", record=discNode.record
|
||||||
|
continue
|
||||||
|
|
||||||
|
# try to connect to the peer
|
||||||
|
let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
|
||||||
|
if timedOut:
|
||||||
|
warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
|
||||||
|
continue
|
||||||
|
|
||||||
|
# after connection, get supported protocols
|
||||||
|
let lp2pPeerStore = node.switch.peerStore
|
||||||
|
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.get().peerId]
|
||||||
|
allPeers[peerId].supportedProtocols = nodeProtocols
|
||||||
|
allPeers[peerId].lastTimeConnected = currentTime
|
||||||
|
|
||||||
|
# after connection, get user-agent
|
||||||
|
let nodeUserAgent = lp2pPeerStore[AgentBook][peer.get().peerId]
|
||||||
|
allPeers[peerId].userAgent = nodeUserAgent
|
||||||
|
|
||||||
|
# store avaiable protocols in the network
|
||||||
|
for protocol in nodeProtocols:
|
||||||
|
if not allProtocols.hasKey(protocol):
|
||||||
|
allProtocols[protocol] = 0
|
||||||
|
allProtocols[protocol] += 1
|
||||||
|
|
||||||
|
# store available user-agents in the network
|
||||||
|
if not allAgentStrings.hasKey(nodeUserAgent):
|
||||||
|
allAgentStrings[nodeUserAgent] = 0
|
||||||
|
allAgentStrings[nodeUserAgent] += 1
|
||||||
|
|
||||||
|
debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
|
||||||
|
|
||||||
|
# inform the total connections that we did in this round
|
||||||
|
let nOfOkConnections = allProtocols.len()
|
||||||
|
info "number of successful connections", amount=nOfOkConnections
|
||||||
|
|
||||||
|
# update count on each protocol
|
||||||
|
for protocol in allProtocols.keys():
|
||||||
|
let countOfProtocols = allProtocols[protocol]
|
||||||
|
peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
|
||||||
|
info "supported protocols in the network", protocol=protocol, count=countOfProtocols
|
||||||
|
|
||||||
|
# update count on each user-agent
|
||||||
|
for userAgent in allAgentStrings.keys():
|
||||||
|
let countOfUserAgent = allAgentStrings[userAgent]
|
||||||
|
peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
|
||||||
|
info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: Split in discovery, connections, and ip2location
|
||||||
|
# crawls the network discovering peers and trying to connect to them
|
||||||
|
# metrics are processed and exposed
|
||||||
|
proc crawlNetwork(node: WakuNode,
|
||||||
|
conf: NetworkMonitorConf,
|
||||||
|
allPeersRef: CustomPeersTableRef) {.async.} =
|
||||||
|
|
||||||
|
let crawlInterval = conf.refreshInterval * 1000 * 60
|
||||||
|
let client = newHttpClient()
|
||||||
|
while true:
|
||||||
|
# discover new random nodes
|
||||||
|
let discoveredNodes = await node.wakuDiscv5.protocol.queryRandom()
|
||||||
|
|
||||||
|
# nodes are nested into bucket, flat it
|
||||||
|
let flatNodes = node.wakuDiscv5.protocol.routingTable.buckets.mapIt(it.nodes).flatten()
|
||||||
|
|
||||||
|
# populate metrics related to capabilities as advertised by the ENR (see waku field)
|
||||||
|
setDiscoveredPeersCapabilities(flatNodes)
|
||||||
|
|
||||||
|
# tries to connect to all newly discovered nodes
|
||||||
|
# and populates metrics related to peers we could connect
|
||||||
|
# note random discovered nodes can be already known
|
||||||
|
await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, client, allPeersRef)
|
||||||
|
|
||||||
|
let totalNodes = flatNodes.len
|
||||||
|
let seenNodes = flatNodes.countIt(it.seen)
|
||||||
|
|
||||||
|
info "discovered nodes: ", total=totalNodes, seen=seenNodes
|
||||||
|
|
||||||
|
# Notes:
|
||||||
|
# we dont run ipMajorityLoop
|
||||||
|
# we dont run revalidateLoop
|
||||||
|
|
||||||
|
await sleepAsync(crawlInterval)
|
||||||
|
|
||||||
|
proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] =
|
||||||
|
let
|
||||||
|
# some hardcoded parameters
|
||||||
|
rng = keys.newRng()
|
||||||
|
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
|
||||||
|
nodeTcpPort = Port(60000)
|
||||||
|
nodeUdpPort = Port(9000)
|
||||||
|
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)
|
||||||
|
|
||||||
|
try:
|
||||||
|
let
|
||||||
|
bindIp = ValidIpAddress.init("0.0.0.0")
|
||||||
|
extIp = ValidIpAddress.init("127.0.0.1")
|
||||||
|
node = WakuNode.new(nodeKey, bindIp, nodeTcpPort)
|
||||||
|
|
||||||
|
# mount discv5
|
||||||
|
node.wakuDiscv5 = WakuDiscoveryV5.new(
|
||||||
|
some(extIp), some(nodeTcpPort), some(nodeUdpPort),
|
||||||
|
bindIp, nodeUdpPort, conf.bootstrapNodes, false,
|
||||||
|
keys.PrivateKey(nodeKey.skkey), flags, [], node.rng)
|
||||||
|
|
||||||
|
node.wakuDiscv5.protocol.open()
|
||||||
|
return ok(node)
|
||||||
|
except:
|
||||||
|
error("could not start node")
|
||||||
|
|
||||||
|
proc startRestApiServer(conf: NetworkMonitorConf,
|
||||||
|
allPeersRef: CustomPeersTableRef): Result[void, string] =
|
||||||
|
try:
|
||||||
|
let serverAddress = initTAddress(conf.metricsRestAddress & ":" & $conf.metricsRestPort)
|
||||||
|
proc validate(pattern: string, value: string): int =
|
||||||
|
if pattern.startsWith("{") and pattern.endsWith("}"): 0
|
||||||
|
else: 1
|
||||||
|
var router = RestRouter.init(validate)
|
||||||
|
router.installHandler(allPeersRef)
|
||||||
|
var sres = RestServerRef.new(router, serverAddress)
|
||||||
|
let restServer = sres.get()
|
||||||
|
restServer.start()
|
||||||
|
except:
|
||||||
|
error("could not start rest api server")
|
||||||
|
ok()
|
||||||
|
|
||||||
|
when isMainModule:
|
||||||
|
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
|
||||||
|
{.pop.}
|
||||||
|
let confRes = NetworkMonitorConf.loadConfig()
|
||||||
|
if confRes.isErr():
|
||||||
|
error "could not load cli variables", err=confRes.error
|
||||||
|
quit(1)
|
||||||
|
|
||||||
|
let conf = confRes.get()
|
||||||
|
info "cli flags", conf=conf
|
||||||
|
|
||||||
|
if conf.logLevel != LogLevel.NONE:
|
||||||
|
setLogLevel(conf.logLevel)
|
||||||
|
|
||||||
|
# list of peers that we have discovered/connected
|
||||||
|
var allPeersRef = CustomPeersTableRef()
|
||||||
|
|
||||||
|
# start metrics server
|
||||||
|
if conf.metricsServer:
|
||||||
|
let res = startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
|
||||||
|
if res.isErr:
|
||||||
|
error "could not start metrics server", err=res.error
|
||||||
|
quit(1)
|
||||||
|
|
||||||
|
# start rest server for custom metrics
|
||||||
|
let res = startRestApiServer(conf, allPeersRef)
|
||||||
|
if res.isErr:
|
||||||
|
error "could not start rest api server", err=res.error
|
||||||
|
|
||||||
|
# start waku node
|
||||||
|
let node = initAndStartNode(conf)
|
||||||
|
if node.isErr:
|
||||||
|
error "could not start node"
|
||||||
|
quit 1
|
||||||
|
|
||||||
|
# spawn the routine that crawls the network
|
||||||
|
# TODO: split into 3 routines (discovery, connections, ip2location)
|
||||||
|
asyncSpawn crawlNetwork(node.get(), conf, allPeersRef)
|
||||||
|
|
||||||
|
runForever()
|
||||||
86
tools/networkmonitor/networkmonitor_config.nim
Normal file
86
tools/networkmonitor/networkmonitor_config.nim
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
import
|
||||||
|
std/strutils,
|
||||||
|
chronicles,
|
||||||
|
chronicles/topics_registry,
|
||||||
|
chronos,
|
||||||
|
confutils,
|
||||||
|
stew/results,
|
||||||
|
stew/shims/net
|
||||||
|
|
||||||
|
type
|
||||||
|
NetworkMonitorConf* = object
|
||||||
|
logLevel* {.
|
||||||
|
desc: "Sets the log level",
|
||||||
|
defaultValue: LogLevel.DEBUG,
|
||||||
|
name: "log-level",
|
||||||
|
abbr: "l" .}: LogLevel
|
||||||
|
|
||||||
|
timeout* {.
|
||||||
|
desc: "Timeout to consider that the connection failed",
|
||||||
|
defaultValue: chronos.seconds(10),
|
||||||
|
name: "timeout",
|
||||||
|
abbr: "t" }: chronos.Duration
|
||||||
|
|
||||||
|
bootstrapNodes* {.
|
||||||
|
desc: "Bootstrap ENR node. Argument may be repeated.",
|
||||||
|
defaultValue: @[""],
|
||||||
|
name: "bootstrap-node",
|
||||||
|
abbr: "b" }: seq[string]
|
||||||
|
|
||||||
|
refreshInterval* {.
|
||||||
|
desc: "How often new peers are discovered and connected to (in minutes)",
|
||||||
|
defaultValue: 10,
|
||||||
|
name: "refresh-interval",
|
||||||
|
abbr: "r" }: int
|
||||||
|
|
||||||
|
## Prometheus metrics config
|
||||||
|
metricsServer* {.
|
||||||
|
desc: "Enable the metrics server: true|false"
|
||||||
|
defaultValue: true
|
||||||
|
name: "metrics-server" }: bool
|
||||||
|
|
||||||
|
metricsServerAddress* {.
|
||||||
|
desc: "Listening address of the metrics server."
|
||||||
|
defaultValue: ValidIpAddress.init("127.0.0.1")
|
||||||
|
name: "metrics-server-address" }: ValidIpAddress
|
||||||
|
|
||||||
|
metricsServerPort* {.
|
||||||
|
desc: "Listening HTTP port of the metrics server."
|
||||||
|
defaultValue: 8008
|
||||||
|
name: "metrics-server-port" }: uint16
|
||||||
|
|
||||||
|
## Custom metrics rest server
|
||||||
|
metricsRestAddress* {.
|
||||||
|
desc: "Listening address of the metrics rest server.",
|
||||||
|
defaultValue: "127.0.0.1",
|
||||||
|
name: "metrics-rest-address" }: string
|
||||||
|
metricsRestPort* {.
|
||||||
|
desc: "Listening HTTP port of the metrics rest server.",
|
||||||
|
defaultValue: 8009,
|
||||||
|
name: "metrics-rest-port" }: uint16
|
||||||
|
|
||||||
|
|
||||||
|
proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
|
||||||
|
try:
|
||||||
|
result = ValidIpAddress.init(p)
|
||||||
|
except CatchableError as e:
|
||||||
|
raise newException(ConfigurationError, "Invalid IP address")
|
||||||
|
|
||||||
|
proc completeCmdArg*(T: type ValidIpAddress, val: string): seq[string] =
|
||||||
|
return @[]
|
||||||
|
|
||||||
|
proc parseCmdArg*(T: type chronos.Duration, p: string): T =
|
||||||
|
try:
|
||||||
|
result = chronos.seconds(parseInt(p))
|
||||||
|
except CatchableError as e:
|
||||||
|
raise newException(ConfigurationError, "Invalid duration value")
|
||||||
|
|
||||||
|
proc completeCmdArg*(T: type chronos.Duration, val: string): seq[string] =
|
||||||
|
return @[]
|
||||||
|
|
||||||
|
proc loadConfig*(T: type NetworkMonitorConf): Result[T, string] =
|
||||||
|
try:
|
||||||
|
let conf = NetworkMonitorConf.load()
|
||||||
|
ok(conf)
|
||||||
|
except CatchableError:
|
||||||
|
err(getCurrentExceptionMsg())
|
||||||
73
tools/networkmonitor/networkmonitor_metrics.nim
Normal file
73
tools/networkmonitor/networkmonitor_metrics.nim
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[json,tables,sequtils],
|
||||||
|
chronicles,
|
||||||
|
chronicles/topics_registry,
|
||||||
|
chronos,
|
||||||
|
json_serialization,
|
||||||
|
metrics,
|
||||||
|
metrics/chronos_httpserver,
|
||||||
|
presto/route,
|
||||||
|
presto/server,
|
||||||
|
stew/results,
|
||||||
|
stew/shims/net
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "networkmonitor_metrics"
|
||||||
|
|
||||||
|
# On top of our custom metrics, the following are reused from nim-eth
|
||||||
|
#routing_table_nodes{state=""}
|
||||||
|
#routing_table_nodes{state="seen"}
|
||||||
|
#discovery_message_requests_outgoing_total{response=""}
|
||||||
|
#discovery_message_requests_outgoing_total{response="no_response"}
|
||||||
|
|
||||||
|
declarePublicGauge peer_type_as_per_enr,
|
||||||
|
"Number of peers supporting each capability according the the ENR",
|
||||||
|
labels = ["capability"]
|
||||||
|
|
||||||
|
declarePublicGauge peer_type_as_per_protocol,
|
||||||
|
"Number of peers supporting each protocol, after a successful connection) ",
|
||||||
|
labels = ["protocols"]
|
||||||
|
|
||||||
|
declarePublicGauge peer_user_agents,
|
||||||
|
"Number of peers with each user agent",
|
||||||
|
labels = ["user_agent"]
|
||||||
|
|
||||||
|
type
|
||||||
|
CustomPeerInfo* = object
|
||||||
|
# populated after discovery
|
||||||
|
lastTimeDiscovered*: string
|
||||||
|
peerId*: string
|
||||||
|
enr*: string
|
||||||
|
ip*: string
|
||||||
|
enrCapabilities*: seq[string]
|
||||||
|
country*: string
|
||||||
|
city*: string
|
||||||
|
|
||||||
|
# only after ok connection
|
||||||
|
lastTimeConnected*: string
|
||||||
|
supportedProtocols*: seq[string]
|
||||||
|
userAgent*: string
|
||||||
|
|
||||||
|
CustomPeersTableRef* = TableRef[string, CustomPeerInfo]
|
||||||
|
|
||||||
|
# GET /allpeersinfo
|
||||||
|
proc installHandler*(router: var RestRouter, allPeers: CustomPeersTableRef) =
|
||||||
|
router.api(MethodGet, "/allpeersinfo") do () -> RestApiResponse:
|
||||||
|
let values = toSeq(allPeers.values())
|
||||||
|
return RestApiResponse.response(values.toJson(), contentType="application/json")
|
||||||
|
|
||||||
|
proc startMetricsServer*(serverIp: ValidIpAddress, serverPort: Port): Result[void, string] =
|
||||||
|
info "Starting metrics HTTP server", serverIp, serverPort
|
||||||
|
|
||||||
|
try:
|
||||||
|
startMetricsHttpServer($serverIp, serverPort)
|
||||||
|
except Exception as e:
|
||||||
|
error("Failed to start metrics HTTP server", serverIp=serverIp, serverPort=serverPort, msg=e.msg)
|
||||||
|
|
||||||
|
info "Metrics HTTP server started", serverIp, serverPort
|
||||||
|
ok()
|
||||||
53
tools/networkmonitor/networkmonitor_utils.nim
Normal file
53
tools/networkmonitor/networkmonitor_utils.nim
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
std/[json,httpclient],
|
||||||
|
chronicles,
|
||||||
|
chronicles/topics_registry,
|
||||||
|
chronos,
|
||||||
|
stew/results
|
||||||
|
|
||||||
|
type
|
||||||
|
NodeLocation = object
|
||||||
|
country*: string
|
||||||
|
city*: string
|
||||||
|
lat*: string
|
||||||
|
long*: string
|
||||||
|
isp*: string
|
||||||
|
|
||||||
|
proc flatten*[T](a: seq[seq[T]]): seq[T] =
|
||||||
|
var aFlat = newSeq[T](0)
|
||||||
|
for subseq in a:
|
||||||
|
aFlat &= subseq
|
||||||
|
return aFlat
|
||||||
|
|
||||||
|
# using an external api retrieves some data associated with the ip
|
||||||
|
# TODO: use a cache
|
||||||
|
# TODO: use nim-presto's HTTP asynchronous client
|
||||||
|
proc ipToLocation*(ip: string,
|
||||||
|
client: Httpclient):
|
||||||
|
Future[Result[NodeLocation, string]] {.async.} =
|
||||||
|
# naive mechanism to avoid hitting the rate limit
|
||||||
|
# IP-API endpoints are now limited to 45 HTTP requests per minute
|
||||||
|
await sleepAsync(1400)
|
||||||
|
try:
|
||||||
|
let content = client.getContent("http://ip-api.com/json/" & ip)
|
||||||
|
let jsonContent = parseJson(content)
|
||||||
|
|
||||||
|
if $jsonContent["status"].getStr() != "success":
|
||||||
|
error "query failed", result=jsonContent
|
||||||
|
return err("query failed: " & $jsonContent)
|
||||||
|
|
||||||
|
return ok(NodeLocation(
|
||||||
|
country: jsonContent["country"].getStr(),
|
||||||
|
city: jsonContent["city"].getStr(),
|
||||||
|
lat: jsonContent["lat"].getStr(),
|
||||||
|
long: jsonContent["lon"].getStr(),
|
||||||
|
isp: jsonContent["isp"].getStr()
|
||||||
|
))
|
||||||
|
except:
|
||||||
|
error "failed to get the location for IP", ip=ip, error=getCurrentExceptionMsg()
|
||||||
|
return err("failed to get the location for IP '" & ip & "':" & getCurrentExceptionMsg())
|
||||||
3
tools/networkmonitor/nim.cfg
Normal file
3
tools/networkmonitor/nim.cfg
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
-d:chronicles_line_numbers
|
||||||
|
-d:chronicles_runtime_filtering:on
|
||||||
|
-d:discv5_protocol_id:d5waku
|
||||||
@ -108,3 +108,7 @@ task chat2bridge, "Build chat2bridge":
|
|||||||
task wakucanary, "Build waku-canary tool":
|
task wakucanary, "Build waku-canary tool":
|
||||||
let name = "wakucanary"
|
let name = "wakucanary"
|
||||||
buildBinary name, "tools/wakucanary/", "-d:chronicles_log_level=TRACE"
|
buildBinary name, "tools/wakucanary/", "-d:chronicles_log_level=TRACE"
|
||||||
|
|
||||||
|
task networkmonitor, "Build network monitor tool":
|
||||||
|
let name = "networkmonitor"
|
||||||
|
buildBinary name, "tools/networkmonitor/", "-d:chronicles_log_level=TRACE"
|
||||||
|
|||||||
@ -11,7 +11,8 @@ import
|
|||||||
libp2p/[multiaddress, multicodec],
|
libp2p/[multiaddress, multicodec],
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
stew/[endians2, results],
|
stew/[endians2, results],
|
||||||
stew/shims/net
|
stew/shims/net,
|
||||||
|
std/bitops
|
||||||
|
|
||||||
export enr, crypto, multiaddress, net
|
export enr, crypto, multiaddress, net
|
||||||
|
|
||||||
@ -23,7 +24,15 @@ type
|
|||||||
## 8-bit flag field to indicate Waku capabilities.
|
## 8-bit flag field to indicate Waku capabilities.
|
||||||
## Only the 4 LSBs are currently defined according
|
## Only the 4 LSBs are currently defined according
|
||||||
## to RFC31 (https://rfc.vac.dev/spec/31/).
|
## to RFC31 (https://rfc.vac.dev/spec/31/).
|
||||||
WakuEnrBitfield* = uint8
|
WakuEnrBitfield* = uint8
|
||||||
|
|
||||||
|
## See: https://rfc.vac.dev/spec/31/#waku2-enr-key
|
||||||
|
## each enum numbers maps to a bit (where 0 is the LSB)
|
||||||
|
Capabilities* = enum
|
||||||
|
Relay = 0,
|
||||||
|
Store = 1,
|
||||||
|
Filter = 2,
|
||||||
|
Lightpush = 3,
|
||||||
|
|
||||||
func toFieldPair(multiaddrs: seq[MultiAddress]): FieldPair =
|
func toFieldPair(multiaddrs: seq[MultiAddress]): FieldPair =
|
||||||
## Converts a seq of multiaddrs to a `multiaddrs` ENR
|
## Converts a seq of multiaddrs to a `multiaddrs` ENR
|
||||||
@ -90,6 +99,14 @@ func initWakuFlags*(lightpush, filter, store, relay: bool): WakuEnrBitfield =
|
|||||||
if store: v.setBit(1)
|
if store: v.setBit(1)
|
||||||
if relay: v.setBit(0)
|
if relay: v.setBit(0)
|
||||||
|
|
||||||
|
# TODO: With the changes in this PR, this can be refactored? Using the enum?
|
||||||
|
# Perhaps refactor to:
|
||||||
|
# WaKuEnr.initEnr(..., capabilities=[Store, Lightpush])
|
||||||
|
# WaKuEnr.initEnr(..., capabilities=[Store, Lightpush, Relay, Filter])
|
||||||
|
|
||||||
|
# Safer also since we dont inject WakuEnrBitfield, and we let this package
|
||||||
|
# handle the bits according to the capabilities
|
||||||
|
|
||||||
return v.WakuEnrBitfield
|
return v.WakuEnrBitfield
|
||||||
|
|
||||||
func toMultiAddresses*(multiaddrsField: seq[byte]): seq[MultiAddress] =
|
func toMultiAddresses*(multiaddrsField: seq[byte]): seq[MultiAddress] =
|
||||||
@ -151,3 +168,12 @@ func initEnr*(privateKey: crypto.PrivateKey,
|
|||||||
wakuEnrFields).expect("Record within size limits")
|
wakuEnrFields).expect("Record within size limits")
|
||||||
|
|
||||||
return enr
|
return enr
|
||||||
|
|
||||||
|
proc supportsCapability*(r: Record, capability: Capabilities): bool =
|
||||||
|
let enrCapabilities = r.get(WAKU_ENR_FIELD, seq[byte])
|
||||||
|
if enrCapabilities.isOk():
|
||||||
|
return testBit(enrCapabilities.get()[0], capability.ord)
|
||||||
|
return false
|
||||||
|
|
||||||
|
proc getCapabilities*(r: Record): seq[Capabilities] =
|
||||||
|
return toSeq(Capabilities.low..Capabilities.high).filterIt(r.supportsCapability(it))
|
||||||
Loading…
x
Reference in New Issue
Block a user