mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-28 23:10:54 +00:00
chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version (#2080)
* chore(networkmonitor): refactor setConnectedPeersMetrics, make it partially concurrent, add version * add more metrics, refactor how most metrics are calculated * rework metrics table fillup * reset connErr to make sure we honour successful reconnection
This commit is contained in:
parent
aaf10e0871
commit
c5aa97046b
@ -35,15 +35,71 @@ logScope:
|
||||
topics = "networkmonitor"
|
||||
|
||||
const ReconnectTime = 60
|
||||
const MaxConnectionRetries = 10
|
||||
const MaxConnectionRetries = 5
|
||||
const ResetRetriesAfter = 1200
|
||||
const AvgPingWindow = 10.0
|
||||
|
||||
const git_version* {.strdefine.} = "n/a"
|
||||
|
||||
proc setDiscoveredPeersCapabilities(
|
||||
routingTableNodes: seq[Node]) =
|
||||
for capability in @[Relay, Store, Filter, Lightpush]:
|
||||
let nOfNodesWithCapability = routingTableNodes.countIt(it.record.supportsCapability(capability))
|
||||
info "capabilities as per ENR waku flag", capability=capability, amount=nOfNodesWithCapability
|
||||
peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])
|
||||
networkmonitor_peer_type_as_per_enr.set(int64(nOfNodesWithCapability), labelValues = [$capability])
|
||||
|
||||
proc analyzePeer(
    customPeerInfo: CustomPeerInfoRef,
    peerInfo: RemotePeerInfo,
    node: WakuNode,
    timeout: chronos.Duration
  ): Future[Result[string, string]] {.async.} =
  ## Pings `peerInfo` through `node` and records the outcome on
  ## `customPeerInfo`.
  ##
  ## On success `connError` is cleared, the ping is observed in the
  ## `networkmonitor_peer_ping` histogram, `avgPingDuration` and
  ## `lastPingDuration` are updated, and `ok(customPeerInfo.peerId)` is
  ## returned.
  ##
  ## On failure (dial/ping error, or no answer within `timeout`) `retries` is
  ## incremented and `err(customPeerInfo.connError)` is returned.
  var pingDelay: chronos.Duration

  proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
    ## Dials the peer on the ping protocol and stores the measured round trip
    ## in the enclosing `pingDelay`. Failures are written to
    ## `customPeerInfo.connError`.
    try:
      let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
      try:
        pingDelay = await node.libp2pPing.ping(conn)
      finally:
        # The dialed stream is owned by the caller: close it on every path
        # (success, error, cancellation) so probes do not leak streams.
        await conn.close()
      return ok()

    except CatchableError:
      var msg = getCurrentExceptionMsg()
      # This is the message seen when `withTimeout` cancels the ping future.
      if msg == "Future operation cancelled!":
        msg = "timedout"
      warn "failed to ping the peer", peer=peerInfo, err=msg

      customPeerInfo.connError = msg
      return err("could not ping peer: " & msg)

  let timedOut = not await ping().withTimeout(timeout)
  if timedOut:
    # Record the timeout explicitly so the returned error is never empty or
    # stale, regardless of when the cancelled ping's own handler gets to run.
    customPeerInfo.connError = "timedout"

  # need this check for pingDelay == 0 because there may be a conn error before timeout
  if timedOut or pingDelay == 0.millis:
    customPeerInfo.retries += 1
    return err(customPeerInfo.connError)

  # reset the error so a successful reconnection is honoured
  customPeerInfo.connError = ""
  info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
  networkmonitor_peer_ping.observe(pingDelay.millis)

  # First successful sample seeds the average.
  if customPeerInfo.avgPingDuration == 0.millis:
    customPeerInfo.avgPingDuration = pingDelay

  # TODO: check why the calculation ends up losing precision
  # NOTE(review): presumably because both operands go through `.millis`
  # (whole milliseconds) before averaging - confirm.
  let
    prevAvgMs = float64(customPeerInfo.avgPingDuration.millis)
    sampleMs = float64(pingDelay.millis)
  customPeerInfo.avgPingDuration =
    int64((prevAvgMs * (AvgPingWindow - 1.0) + sampleMs) / AvgPingWindow).millis
  customPeerInfo.lastPingDuration = pingDelay

  return ok(customPeerInfo.peerId)
|
||||
|
||||
proc shouldReconnect(customPeerInfo: CustomPeerInfoRef): bool =
  ## Tells whether `customPeerInfo` is due for another dial attempt.
  ##
  ## The peer qualifies when the current unix time is at least
  ## `lastTimeConnected + ReconnectTime` and its `retries` counter is below
  ## `MaxConnectionRetries`.
  ##
  ## Side effect: when the counter is exhausted but the current unix time has
  ## reached `lastTimeConnected + ResetRetriesAfter`, `retries` is reset to 0
  ## (and logged), which makes the peer eligible again.
  let intervalElapsed =
    getTime().toUnix() >= customPeerInfo.lastTimeConnected + ReconnectTime

  if customPeerInfo.retries >= MaxConnectionRetries:
    # Retry budget used up: stay away until the reset window has passed.
    if getTime().toUnix() < customPeerInfo.lastTimeConnected + ResetRetriesAfter:
      return false

    customPeerInfo.retries = 0
    info "resetting retries counter", peerId=customPeerInfo.peerId

  return intervalElapsed
|
||||
|
||||
# TODO: Split in discover, connect
|
||||
proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
||||
@ -54,11 +110,10 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
||||
|
||||
let currentTime = getTime().toUnix()
|
||||
|
||||
# Protocols and agent string and its count
|
||||
var allProtocols: Table[string, int]
|
||||
var allAgentStrings: Table[string, int]
|
||||
|
||||
var newPeers = 0
|
||||
var successfulConnections = 0
|
||||
|
||||
var analyzeFuts: seq[Future[Result[string, string]]]
|
||||
|
||||
# iterate all newly discovered nodes
|
||||
for discNode in discoveredNodes:
|
||||
@ -80,103 +135,104 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
||||
|
||||
# create new entry if new peerId found
|
||||
let peerId = $peerInfo.peerId
|
||||
let customPeerInfo = CustomPeerInfo(peerId: peerId)
|
||||
|
||||
if not allPeers.hasKey(peerId):
|
||||
allPeers[peerId] = customPeerInfo
|
||||
allPeers[peerId] = CustomPeerInfoRef(peerId: peerId)
|
||||
newPeers += 1
|
||||
else:
|
||||
info "already seen", peerId=peerId
|
||||
|
||||
allPeers[peerId].lastTimeDiscovered = currentTime
|
||||
allPeers[peerId].enr = discNode.record.toURI()
|
||||
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
|
||||
allPeers[peerId].discovered += 1
|
||||
let customPeerInfo = allPeers[peerId]
|
||||
|
||||
customPeerInfo.lastTimeDiscovered = currentTime
|
||||
customPeerInfo.enr = discNode.record.toURI()
|
||||
customPeerInfo.enrCapabilities = discNode.record.getCapabilities().mapIt($it)
|
||||
customPeerInfo.discovered += 1
|
||||
|
||||
if not typedRecord.get().ip.isSome():
|
||||
warn "ip field is not set", record=typedRecord.get()
|
||||
continue
|
||||
|
||||
let ip = $typedRecord.get().ip.get().join(".")
|
||||
allPeers[peerId].ip = ip
|
||||
customPeerInfo.ip = ip
|
||||
|
||||
# try to ping the peer
|
||||
if getTime().toUnix() >= allPeers[peerId].lastTimeConnected + ReconnectTime and allPeers[peerId].retries < MaxConnectionRetries:
|
||||
if allPeers[peerId].retries > 0:
|
||||
warn "trying to dial failed peer again", peerId=peerId, retry=allPeers[peerId].retries
|
||||
if shouldReconnect(customPeerInfo):
|
||||
if customPeerInfo.retries > 0:
|
||||
warn "trying to dial failed peer again", peerId=peerId, retry=customPeerInfo.retries
|
||||
analyzeFuts.add(analyzePeer(customPeerInfo, peerInfo, node, timeout))
|
||||
|
||||
var pingDelay:chronos.Duration
|
||||
# Wait for all connection attempts to finish
|
||||
let analyzedPeers = await allFinished(analyzeFuts)
|
||||
|
||||
proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
|
||||
try:
|
||||
let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
|
||||
pingDelay = await node.libp2pPing.ping(conn)
|
||||
return ok()
|
||||
|
||||
except CatchableError:
|
||||
var msg = getCurrentExceptionMsg()
|
||||
if msg == "Future operation cancelled!":
|
||||
msg = "timedout"
|
||||
warn "failed to ping the peer", peer=peerInfo, err=msg
|
||||
|
||||
allPeers[peerId].connError = msg
|
||||
return err("could not ping peer: " & msg)
|
||||
|
||||
let timedOut = not await ping().withTimeout(timeout)
|
||||
# need this check for pingDelat == 0 because there may be a conn error before timeout
|
||||
if timedOut or pingDelay == 0.millis:
|
||||
allPeers[peerId].retries += 1
|
||||
for peerIdFut in analyzedPeers:
|
||||
let peerIdRes = await peerIdFut
|
||||
let peerIdStr = peerIdRes.valueOr():
|
||||
continue
|
||||
|
||||
info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
|
||||
peer_ping.observe(pingDelay.millis)
|
||||
successfulConnections += 1
|
||||
let peerId = PeerId.init(peerIdStr).valueOr():
|
||||
warn "failed to parse peerId", peerId=peerIdStr
|
||||
continue
|
||||
var customPeerInfo = allPeers[peerIdStr]
|
||||
|
||||
if allPeers[peerId].avgPingDuration == 0.millis:
|
||||
allPeers[peerId].avgPingDuration = pingDelay
|
||||
|
||||
# TODO: check why the calculation ends up losing precision
|
||||
allPeers[peerId].avgPingDuration = int64((float64(allPeers[peerId].avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis
|
||||
allPeers[peerId].lastPingDuration = pingDelay
|
||||
debug "connected to peer", peer=customPeerInfo[]
|
||||
|
||||
# after connection, get supported protocols
|
||||
let lp2pPeerStore = node.switch.peerStore
|
||||
let nodeProtocols = lp2pPeerStore[ProtoBook][peerInfo.peerId]
|
||||
allPeers[peerId].supportedProtocols = nodeProtocols
|
||||
allPeers[peerId].lastTimeConnected = currentTime
|
||||
let nodeProtocols = lp2pPeerStore[ProtoBook][peerId]
|
||||
customPeerInfo.supportedProtocols = nodeProtocols
|
||||
customPeerInfo.lastTimeConnected = currentTime
|
||||
|
||||
# after connection, get user-agent
|
||||
let nodeUserAgent = lp2pPeerStore[AgentBook][peerInfo.peerId]
|
||||
allPeers[peerId].userAgent = nodeUserAgent
|
||||
|
||||
# store avaiable protocols in the network
|
||||
for protocol in nodeProtocols:
|
||||
if not allProtocols.hasKey(protocol):
|
||||
allProtocols[protocol] = 0
|
||||
allProtocols[protocol] += 1
|
||||
|
||||
# store available user-agents in the network
|
||||
if not allAgentStrings.hasKey(nodeUserAgent):
|
||||
allAgentStrings[nodeUserAgent] = 0
|
||||
allAgentStrings[nodeUserAgent] += 1
|
||||
|
||||
debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
|
||||
let nodeUserAgent = lp2pPeerStore[AgentBook][peerId]
|
||||
customPeerInfo.userAgent = nodeUserAgent
|
||||
|
||||
info "number of newly discovered peers", amount=newPeers
|
||||
# inform the total connections that we did in this round
|
||||
let nOfOkConnections = allProtocols.len()
|
||||
info "number of successful connections", amount=nOfOkConnections
|
||||
info "number of successful connections", amount=successfulConnections
|
||||
|
||||
proc updateMetrics(allPeersRef: CustomPeersTableRef) {.gcsafe.} =
|
||||
var allProtocols: Table[string, int]
|
||||
var allAgentStrings: Table[string, int]
|
||||
var countries: Table[string, int]
|
||||
var connectedPeers = 0
|
||||
var failedPeers = 0
|
||||
|
||||
for peerInfo in allPeersRef.values:
|
||||
if peerInfo.connError == "":
|
||||
for protocol in peerInfo.supportedProtocols:
|
||||
allProtocols[protocol] = allProtocols.mgetOrPut(protocol, 0) + 1
|
||||
|
||||
# store available user-agents in the network
|
||||
allAgentStrings[peerInfo.userAgent] = allAgentStrings.mgetOrPut(peerInfo.userAgent, 0) + 1
|
||||
|
||||
if peerInfo.country != "":
|
||||
countries[peerInfo.country] = countries.mgetOrPut(peerInfo.country, 0) + 1
|
||||
|
||||
connectedPeers += 1
|
||||
else:
|
||||
failedPeers += 1
|
||||
|
||||
networkmonitor_peer_count.set(int64(connectedPeers), labelValues = ["true"])
|
||||
networkmonitor_peer_count.set(int64(failedPeers), labelValues = ["false"])
|
||||
# update count on each protocol
|
||||
for protocol in allProtocols.keys():
|
||||
let countOfProtocols = allProtocols[protocol]
|
||||
peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
|
||||
let countOfProtocols = allProtocols.mgetOrPut(protocol, 0)
|
||||
networkmonitor_peer_type_as_per_protocol.set(int64(countOfProtocols), labelValues = [protocol])
|
||||
info "supported protocols in the network", protocol=protocol, count=countOfProtocols
|
||||
|
||||
# update count on each user-agent
|
||||
for userAgent in allAgentStrings.keys():
|
||||
let countOfUserAgent = allAgentStrings[userAgent]
|
||||
peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
|
||||
let countOfUserAgent = allAgentStrings.mgetOrPut(userAgent, 0)
|
||||
networkmonitor_peer_user_agents.set(int64(countOfUserAgent), labelValues = [userAgent])
|
||||
info "user agents participating in the network", userAgent=userAgent, count=countOfUserAgent
|
||||
|
||||
for country in countries.keys():
|
||||
let peerCount = countries.mgetOrPut(country, 0)
|
||||
networkmonitor_peer_country_count.set(int64(peerCount), labelValues = [country])
|
||||
info "number of peers per country", country=country, count=peerCount
|
||||
|
||||
proc populateInfoFromIp(allPeersRef: CustomPeersTableRef,
|
||||
restClient: RestClientRef) {.async.} =
|
||||
for peer in allPeersRef.keys():
|
||||
@ -209,6 +265,7 @@ proc crawlNetwork(node: WakuNode,
|
||||
|
||||
let crawlInterval = conf.refreshInterval * 1000
|
||||
while true:
|
||||
let startTime = Moment.now()
|
||||
# discover new random nodes
|
||||
let discoveredNodes = await wakuDiscv5.protocol.queryRandom()
|
||||
|
||||
@ -223,6 +280,8 @@ proc crawlNetwork(node: WakuNode,
|
||||
# note random discovered nodes can be already known
|
||||
await setConnectedPeersMetrics(discoveredNodes, node, conf.timeout, restClient, allPeersRef)
|
||||
|
||||
updateMetrics(allPeersRef)
|
||||
|
||||
# populate info from ip addresses
|
||||
await populateInfoFromIp(allPeersRef, restClient)
|
||||
|
||||
@ -234,8 +293,12 @@ proc crawlNetwork(node: WakuNode,
|
||||
# Notes:
|
||||
# we dont run ipMajorityLoop
|
||||
# we dont run revalidateLoop
|
||||
let endTime = Moment.now()
|
||||
let elapsed = (endTime - startTime).nanos
|
||||
|
||||
await sleepAsync(crawlInterval.millis)
|
||||
info "crawl duration", time=elapsed.millis
|
||||
|
||||
await sleepAsync(crawlInterval.millis - elapsed.millis)
|
||||
|
||||
proc retrieveDynamicBootstrapNodes(dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[ValidIpAddress]): Result[seq[RemotePeerInfo], string] =
|
||||
if dnsDiscovery and dnsDiscoveryUrl != "":
|
||||
|
@ -64,7 +64,6 @@ type
|
||||
defaultValue: 8009,
|
||||
name: "metrics-rest-port" }: uint16
|
||||
|
||||
|
||||
proc parseCmdArg*(T: type ValidIpAddress, p: string): T =
|
||||
try:
|
||||
result = ValidIpAddress.init(p)
|
||||
@ -85,7 +84,7 @@ proc completeCmdArg*(T: type chronos.Duration, val: string): seq[string] =
|
||||
|
||||
proc loadConfig*(T: type NetworkMonitorConf): Result[T, string] =
|
||||
try:
|
||||
let conf = NetworkMonitorConf.load()
|
||||
let conf = NetworkMonitorConf.load(version=git_version)
|
||||
ok(conf)
|
||||
except CatchableError:
|
||||
err(getCurrentExceptionMsg())
|
||||
|
@ -25,22 +25,30 @@ logScope:
|
||||
#discovery_message_requests_outgoing_total{response=""}
|
||||
#discovery_message_requests_outgoing_total{response="no_response"}
|
||||
|
||||
declarePublicGauge peer_type_as_per_enr,
|
||||
declarePublicGauge networkmonitor_peer_type_as_per_enr,
|
||||
"Number of peers supporting each capability according the the ENR",
|
||||
labels = ["capability"]
|
||||
|
||||
declarePublicGauge peer_type_as_per_protocol,
|
||||
declarePublicGauge networkmonitor_peer_type_as_per_protocol,
|
||||
"Number of peers supporting each protocol, after a successful connection) ",
|
||||
labels = ["protocols"]
|
||||
|
||||
declarePublicGauge peer_user_agents,
|
||||
declarePublicGauge networkmonitor_peer_user_agents,
|
||||
"Number of peers with each user agent",
|
||||
labels = ["user_agent"]
|
||||
|
||||
declarePublicHistogram peer_ping,
|
||||
declarePublicHistogram networkmonitor_peer_ping,
|
||||
"Histogram tracking ping durations for discovered peers",
|
||||
buckets = [100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0, Inf]
|
||||
|
||||
declarePublicGauge networkmonitor_peer_count,
|
||||
"Number of discovered peers",
|
||||
labels = ["connected"]
|
||||
|
||||
declarePublicGauge networkmonitor_peer_country_count,
|
||||
"Number of peers per country",
|
||||
labels = ["country"]
|
||||
|
||||
type
|
||||
CustomPeerInfo* = object
|
||||
# populated after discovery
|
||||
@ -64,8 +72,10 @@ type
|
||||
# only after a ok/nok connection
|
||||
connError*: string
|
||||
|
||||
CustomPeerInfoRef* = ref CustomPeerInfo
|
||||
|
||||
# Stores information about all discovered/connected peers
|
||||
CustomPeersTableRef* = TableRef[string, CustomPeerInfo]
|
||||
CustomPeersTableRef* = TableRef[string, CustomPeerInfoRef]
|
||||
|
||||
# stores the content topic and the count of rx messages
|
||||
ContentTopicMessageTableRef* = TableRef[string, int]
|
||||
|
Loading…
x
Reference in New Issue
Block a user