nwaku/tools/networkmonitor/networkmonitor.nim

258 lines
8.8 KiB
Nim
Raw Normal View History

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables,strutils,times,sequtils,httpclient],
chronicles,
chronicles/topics_registry,
chronos,
confutils,
eth/keys,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
metrics,
metrics/chronos_httpserver,
presto/[route, server],
stew/shims/net
import
../../waku/v2/node/discv5/waku_discv5,
../../waku/v2/node/peer_manager/peer_manager,
../../waku/v2/node/waku_node,
../../waku/v2/utils/wakuenr,
../../waku/v2/utils/peers,
./networkmonitor_metrics,
./networkmonitor_config,
./networkmonitor_utils
logScope:
topics = "networkmonitor"
proc setDiscoveredPeersCapabilities(
routingTableNodes: seq[Node]) =
for capability in @[Relay, Store, Filter, Lightpush]:
let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability))
info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability
peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])
# TODO: Split in discover, connect, populate ips
proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
node: WakuNode,
timeout: chronos.Duration,
client: HttpClient,
allPeers: CustomPeersTableRef) {.async.} =
let currentTime = $getTime()
# Protocols and agent string and its count
var allProtocols: Table[string, int]
var allAgentStrings: Table[string, int]
# iterate all newly discovered nodes
for discNode in discoveredNodes:
let typedRecord = discNode.record.toTypedRecord()
if not typedRecord.isOk():
warn "could not convert record to typed record", record=discNode.record
continue
let secp256k1 = typedRecord.get().secp256k1
if not secp256k1.isSome():
warn "could not get secp256k1 key", typedRecord=typedRecord.get()
continue
# create new entry if new peerId found
let peerId = secp256k1.get().toHex()
let customPeerInfo = CustomPeerInfo(peerId: peerId)
if not allPeers.hasKey(peerId):
allPeers[peerId] = customPeerInfo
allPeers[peerId].lastTimeDiscovered = currentTime
allPeers[peerId].enr = discNode.record.toURI()
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
if not typedRecord.get().ip.isSome():
warn "ip field is not set", record=typedRecord.get()
continue
let ip = $typedRecord.get().ip.get().join(".")
allPeers[peerId].ip = ip
# get more info the peers from its ip address
let location = await ipToLocation(ip, client)
if not location.isOk():
warn "could not get location", ip=ip
continue
allPeers[peerId].country = location.get().country
allPeers[peerId].city = location.get().city
let peer = toRemotePeerInfo(discNode.record)
if not peer.isOk():
warn "error converting record to remote peer info", record=discNode.record
continue
# try to connect to the peer
let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
if timedOut:
warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
continue
# after connection, get supported protocols
let lp2pPeerStore = node.switch.peerStore
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.get().peerId]
allPeers[peerId].supportedProtocols = nodeProtocols
allPeers[peerId].lastTimeConnected = currentTime
# after connection, get user-agent
let nodeUserAgent = lp2pPeerStore[AgentBook][peer.get().peerId]
allPeers[peerId].userAgent = nodeUserAgent
# store avaiable protocols in the network
for protocol in nodeProtocols:
if not allProtocols.hasKey(protocol):
allProtocols[protocol] = 0
allProtocols[protocol] += 1
# store available user-agents in the network
if not allAgentStrings.hasKey(nodeUserAgent):
allAgentStrings[nodeUserAgent] = 0
allAgentStrings[nodeUserAgent] += 1
debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
# inform the total connections that we did in this round
let nOfOkConnections = allProtocols.len()
info "number of successful connections", amount=nOfOkConnections
# update count on each protocol
for protocol in allProtocols.keys():
let countOfProtocols = allProtocols[protocol]
peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
info "supported protocols in the network", protocol=protocol, count=countOfProtocols
# update count on each user-agent
for userAgent in allAgentStrings.keys():
let countOfUserAgent = allAgentStrings[userAgent]
peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent
# TODO: Split in discovery, connections, and ip2location
# crawls the network discovering peers and trying to connect to them
# metrics are processed and exposed
proc crawlNetwork(node: WakuNode,
conf: NetworkMonitorConf,
allPeersRef: CustomPeersTableRef) {.async.} =
let crawlInterval = conf.refreshInterval * 1000 * 60
let client = newHttpClient()
while true:
# discover new random nodes
let discoveredNodes = await node.wakuDiscv5.protocol.queryRandom()
# nodes are nested into bucket, flat it
let flatNodes = node.wakuDiscv5.protocol.routingTable.buckets.mapIt(it.nodes).flatten()
# populate metrics related to capabilities as advertised by the ENR (see waku field)
setDiscoveredPeersCapabilities(flatNodes)
# tries to connect to all newly discovered nodes
# and populates metrics related to peers we could connect
# note random discovered nodes can be already known
await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, client, allPeersRef)
let totalNodes = flatNodes.len
let seenNodes = flatNodes.countIt(it.seen)
info "discovered nodes: ", total=totalNodes, seen=seenNodes
# Notes:
# we dont run ipMajorityLoop
# we dont run revalidateLoop
await sleepAsync(crawlInterval)
proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] =
let
# some hardcoded parameters
rng = keys.newRng()
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
nodeTcpPort = Port(60000)
nodeUdpPort = Port(9000)
flags = initWakuFlags(lightpush = false, filter = false, store = false, relay = true)
try:
let
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
node = WakuNode.new(nodeKey, bindIp, nodeTcpPort)
# mount discv5
node.wakuDiscv5 = WakuDiscoveryV5.new(
some(extIp), some(nodeTcpPort), some(nodeUdpPort),
bindIp, nodeUdpPort, conf.bootstrapNodes, false,
keys.PrivateKey(nodeKey.skkey), flags, [], node.rng)
node.wakuDiscv5.protocol.open()
return ok(node)
except:
error("could not start node")
proc startRestApiServer(conf: NetworkMonitorConf,
allPeersRef: CustomPeersTableRef): Result[void, string] =
try:
let serverAddress = initTAddress(conf.metricsRestAddress & ":" & $conf.metricsRestPort)
proc validate(pattern: string, value: string): int =
if pattern.startsWith("{") and pattern.endsWith("}"): 0
else: 1
var router = RestRouter.init(validate)
router.installHandler(allPeersRef)
var sres = RestServerRef.new(router, serverAddress)
let restServer = sres.get()
restServer.start()
except:
error("could not start rest api server")
ok()
when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
{.pop.}
let confRes = NetworkMonitorConf.loadConfig()
if confRes.isErr():
error "could not load cli variables", err=confRes.error
quit(1)
let conf = confRes.get()
info "cli flags", conf=conf
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
# list of peers that we have discovered/connected
var allPeersRef = CustomPeersTableRef()
# start metrics server
if conf.metricsServer:
let res = startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
if res.isErr:
error "could not start metrics server", err=res.error
quit(1)
# start rest server for custom metrics
let res = startRestApiServer(conf, allPeersRef)
if res.isErr:
error "could not start rest api server", err=res.error
# start waku node
let node = initAndStartNode(conf)
if node.isErr:
error "could not start node"
quit 1
# spawn the routine that crawls the network
# TODO: split into 3 routines (discovery, connections, ip2location)
asyncSpawn crawlNetwork(node.get(), conf, allPeersRef)
runForever()