nwaku/apps/networkmonitor/networkmonitor.nim

391 lines
14 KiB
Nim
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}
import
std/[tables,strutils,times,sequtils],
stew/results,
stew/shims/net,
chronicles,
chronicles/topics_registry,
chronos,
chronos/timer as ctime,
confutils,
eth/keys,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto,
libp2p/nameresolving/dnsresolver,
metrics,
metrics/chronos_httpserver,
presto/[route, server, client]
import
../../waku/v2/waku_core,
../../waku/v2/node/peer_manager,
../../waku/v2/waku_node,
../../waku/v2/waku_enr,
../../waku/v2/waku_discv5,
../../waku/v2/waku_dnsdisc,
./networkmonitor_metrics,
./networkmonitor_config,
./networkmonitor_utils
logScope:
topics = "networkmonitor"
proc setDiscoveredPeersCapabilities(
routingTableNodes: seq[Node]) =
for capability in @[Relay, Store, Filter, Lightpush]:
let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability))
info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability
peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])
# TODO: Split in discover, connect
proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
node: WakuNode,
timeout: chronos.Duration,
restClient: RestClientRef,
allPeers: CustomPeersTableRef) {.async.} =
let currentTime = $getTime()
# Protocols and agent string and its count
var allProtocols: Table[string, int]
var allAgentStrings: Table[string, int]
# iterate all newly discovered nodes
for discNode in discoveredNodes:
let typedRecord = discNode.record.toTypedRecord()
if not typedRecord.isOk():
warn "could not convert record to typed record", record=discNode.record
continue
let secp256k1 = typedRecord.get().secp256k1
if not secp256k1.isSome():
warn "could not get secp256k1 key", typedRecord=typedRecord.get()
continue
# create new entry if new peerId found
let peerId = secp256k1.get().toHex()
let customPeerInfo = CustomPeerInfo(peerId: peerId)
if not allPeers.hasKey(peerId):
allPeers[peerId] = customPeerInfo
allPeers[peerId].lastTimeDiscovered = currentTime
allPeers[peerId].enr = discNode.record.toURI()
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
if not typedRecord.get().ip.isSome():
warn "ip field is not set", record=typedRecord.get()
continue
let ip = $typedRecord.get().ip.get().join(".")
allPeers[peerId].ip = ip
let peer = toRemotePeerInfo(discNode.record)
if not peer.isOk():
warn "error converting record to remote peer info", record=discNode.record
continue
# try to connect to the peer
# TODO: check last connection time and if not > x, skip connecting
let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
if timedOut:
warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
# TODO: Add other staates
allPeers[peerId].connError = "timedout"
continue
# after connection, get supported protocols
let lp2pPeerStore = node.switch.peerStore
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.get().peerId]
allPeers[peerId].supportedProtocols = nodeProtocols
allPeers[peerId].lastTimeConnected = currentTime
# after connection, get user-agent
let nodeUserAgent = lp2pPeerStore[AgentBook][peer.get().peerId]
allPeers[peerId].userAgent = nodeUserAgent
# store avaiable protocols in the network
for protocol in nodeProtocols:
if not allProtocols.hasKey(protocol):
allProtocols[protocol] = 0
allProtocols[protocol] += 1
# store available user-agents in the network
if not allAgentStrings.hasKey(nodeUserAgent):
allAgentStrings[nodeUserAgent] = 0
allAgentStrings[nodeUserAgent] += 1
debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
# inform the total connections that we did in this round
let nOfOkConnections = allProtocols.len()
info "number of successful connections", amount=nOfOkConnections
# update count on each protocol
for protocol in allProtocols.keys():
let countOfProtocols = allProtocols[protocol]
peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
info "supported protocols in the network", protocol=protocol, count=countOfProtocols
# update count on each user-agent
for userAgent in allAgentStrings.keys():
let countOfUserAgent = allAgentStrings[userAgent]
peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent
proc populateInfoFromIp(allPeersRef: CustomPeersTableRef,
restClient: RestClientRef) {.async.} =
for peer in allPeersRef.keys():
if allPeersRef[peer].country != "" and allPeersRef[peer].city != "":
continue
# TODO: Update also if last update > x
if allPeersRef[peer].ip == "":
continue
# get more info the peers from its ip address
var location: NodeLocation
try:
# IP-API endpoints are now limited to 45 HTTP requests per minute
await sleepAsync(1400.millis)
let response = await restClient.ipToLocation(allPeersRef[peer].ip)
location = response.data
except CatchableError:
warn "could not get location", ip=allPeersRef[peer].ip
continue
allPeersRef[peer].country = location.country
allPeersRef[peer].city = location.city
# TODO: Split in discovery, connections, and ip2location
# crawls the network discovering peers and trying to connect to them
# metrics are processed and exposed
proc crawlNetwork(node: WakuNode,
restClient: RestClientRef,
conf: NetworkMonitorConf,
allPeersRef: CustomPeersTableRef) {.async.} =
let crawlInterval = conf.refreshInterval * 1000
while true:
# discover new random nodes
let discoveredNodes = await node.wakuDiscv5.protocol.queryRandom()
# nodes are nested into bucket, flat it
let flatNodes = node.wakuDiscv5.protocol.routingTable.buckets.mapIt(it.nodes).flatten()
# populate metrics related to capabilities as advertised by the ENR (see waku field)
setDiscoveredPeersCapabilities(flatNodes)
# tries to connect to all newly discovered nodes
# and populates metrics related to peers we could connect
# note random discovered nodes can be already known
await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, restClient, allPeersRef)
# populate info from ip addresses
await populateInfoFromIp(allPeersRef, restClient)
let totalNodes = flatNodes.len
let seenNodes = flatNodes.countIt(it.seen)
info "discovered nodes: ", total=totalNodes, seen=seenNodes
# Notes:
# we dont run ipMajorityLoop
# we dont run revalidateLoop
await sleepAsync(crawlInterval.millis)
proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): Result[seq[RemotePeerInfo], string] =
if dnsDiscovery and dnsDiscoveryUrl != "":
# DNS discovery
debug "Discovering nodes using Waku DNS discovery", url=dnsDiscoveryUrl
var nameServers: seq[TransportAddress]
for ip in dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain=domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer
var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
if wakuDnsDiscovery.isOk():
return wakuDnsDiscovery.get().findPeers()
.mapErr(proc (e: cstring): string = $e)
else:
warn "Failed to init Waku DNS discovery"
debug "No method for retrieving dynamic bootstrap nodes specified."
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
proc getBootstrapFromDiscDns(conf: NetworkMonitorConf): Result[seq[enr.Record], string] =
try:
let dnsNameServers = @[ValidIpAddress.init("1.1.1.1"), ValidIpAddress.init("1.0.0.1")]
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(true, conf.dnsDiscoveryUrl, dnsNameServers)
if not dynamicBootstrapNodesRes.isOk():
error("failed discovering peers from DNS")
let dynamicBootstrapNodes = dynamicBootstrapNodesRes.get()
# select dynamic bootstrap nodes that have an ENR containing a udp port.
# Discv5 only supports UDP https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md)
var discv5BootstrapEnrs: seq[enr.Record]
for n in dynamicBootstrapNodes:
if n.enr.isSome():
let
enr = n.enr.get()
tenrRes = enr.toTypedRecord()
if tenrRes.isOk() and (tenrRes.get().udp.isSome() or tenrRes.get().udp6.isSome()):
discv5BootstrapEnrs.add(enr)
return ok(discv5BootstrapEnrs)
except CatchableError:
error("failed discovering peers from DNS")
proc initAndStartNode(conf: NetworkMonitorConf): Result[WakuNode, string] =
let
# some hardcoded parameters
rng = keys.newRng()
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
nodeTcpPort = Port(60000)
nodeUdpPort = Port(9000)
flags = CapabilitiesBitfield.init(lightpush = false, filter = false, store = false, relay = true)
try:
let
bindIp = ValidIpAddress.init("0.0.0.0")
extIp = ValidIpAddress.init("127.0.0.1")
var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
? builder.withNetworkConfigurationDetails(bindIp, nodeTcpPort)
let node = ? builder.build()
var discv5BootstrapEnrsRes = getBootstrapFromDiscDns(conf)
if not discv5BootstrapEnrsRes.isOk():
error("failed discovering peers from DNS")
var discv5BootstrapEnrs = discv5BootstrapEnrsRes.get()
# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in conf.bootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)
# mount discv5
node.wakuDiscv5 = WakuDiscoveryV5.new(
some(extIp), some(nodeTcpPort), some(nodeUdpPort),
bindIp, nodeUdpPort, discv5BootstrapEnrs, false,
keys.PrivateKey(nodeKey.skkey), flags, @[], node.rng)
node.wakuDiscv5.protocol.open()
return ok(node)
except CatchableError:
error("could not start node")
proc startRestApiServer(conf: NetworkMonitorConf,
allPeersInfo: CustomPeersTableRef,
numMessagesPerContentTopic: ContentTopicMessageTableRef
): Result[void, string] =
try:
let serverAddress = initTAddress(conf.metricsRestAddress & ":" & $conf.metricsRestPort)
proc validate(pattern: string, value: string): int =
if pattern.startsWith("{") and pattern.endsWith("}"): 0
else: 1
var router = RestRouter.init(validate)
router.installHandler(allPeersInfo, numMessagesPerContentTopic)
var sres = RestServerRef.new(router, serverAddress)
let restServer = sres.get()
restServer.start()
except CatchableError:
error("could not start rest api server")
ok()
# handles rx of messages over a topic (see subscribe)
# counts the number of messages per content topic
proc subscribeAndHandleMessages(node: WakuNode,
pubsubTopic: PubsubTopic,
msgPerContentTopic: ContentTopicMessageTableRef) =
# handle function
proc handler(pubsubTopic: PubsubTopic, data: seq[byte]) {.async, gcsafe.} =
let messageRes = WakuMessage.decode(data)
if messageRes.isErr():
warn "could not decode message", data=data, pubsubTopic=pubsubTopic
let message = messageRes.get()
trace "rx message", pubsubTopic=pubsubTopic, contentTopic=message.contentTopic
# If we reach a table limit size, remove c topics with the least messages.
let tableSize = 100
if msgPerContentTopic.len > (tableSize - 1):
let minIndex = toSeq(msgPerContentTopic.values()).minIndex()
msgPerContentTopic.del(toSeq(msgPerContentTopic.keys())[minIndex])
# TODO: Will overflow at some point
# +1 if content topic existed, init to 1 otherwise
if msgPerContentTopic.hasKey(message.contentTopic):
msgPerContentTopic[message.contentTopic] += 1
else:
msgPerContentTopic[message.contentTopic] = 1
node.subscribe(pubsubTopic, handler)
when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
{.pop.}
let confRes = NetworkMonitorConf.loadConfig()
if confRes.isErr():
error "could not load cli variables", err=confRes.error
quit(1)
let conf = confRes.get()
info "cli flags", conf=conf
if conf.logLevel != LogLevel.NONE:
setLogLevel(conf.logLevel)
# list of peers that we have discovered/connected
var allPeersInfo = CustomPeersTableRef()
# content topic and the number of messages that were received
var msgPerContentTopic = ContentTopicMessageTableRef()
# start metrics server
if conf.metricsServer:
let res = startMetricsServer(conf.metricsServerAddress, Port(conf.metricsServerPort))
if res.isErr():
error "could not start metrics server", err=res.error
quit(1)
# start rest server for custom metrics
let res = startRestApiServer(conf, allPeersInfo, msgPerContentTopic)
if res.isErr():
error "could not start rest api server", err=res.error
quit(1)
# create a rest client
let clientRest = RestClientRef.new(url="http://ip-api.com",
connectTimeout=ctime.seconds(2))
if clientRest.isErr():
error "could not start rest api client", err=res.error
quit(1)
let restClient = clientRest.get()
# start waku node
let nodeRes = initAndStartNode(conf)
if nodeRes.isErr():
error "could not start node"
quit 1
let node = nodeRes.get()
waitFor node.mountRelay()
# Subscribe the node to the default pubsubtopic, to count messages
subscribeAndHandleMessages(node, DefaultPubsubTopic, msgPerContentTopic)
# spawn the routine that crawls the network
# TODO: split into 3 routines (discovery, connections, ip2location)
asyncSpawn crawlNetwork(node, restClient, conf, allPeersInfo)
runForever()