diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim
index ccd223c9e..fc0787543 100644
--- a/apps/networkmonitor/networkmonitor.nim
+++ b/apps/networkmonitor/networkmonitor.nim
@@ -16,6 +16,7 @@ import
   eth/p2p/discoveryv5/enr,
   libp2p/crypto/crypto,
   libp2p/nameresolving/dnsresolver,
+  libp2p/protocols/ping,
   metrics,
   metrics/chronos_httpserver,
   presto/[route, server, client]
@@ -33,6 +34,13 @@ import
 logScope:
   topics = "networkmonitor"
 
+# seconds since the last successful connection before a peer is pinged again
+const ReconnectTime = 60
+# number of failed pings after which a peer is no longer dialed
+const MaxConnectionRetries = 10
+# each new ping sample contributes 1/AvgPingWindow to the running average
+const AvgPingWindow = 10.0
+
 proc setDiscoveredPeersCapabilities(
   routingTableNodes: seq[Node]) =
   for capability in @[Relay, Store, Filter, Lightpush]:
@@ -47,12 +55,14 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
                               restClient: RestClientRef,
                               allPeers: CustomPeersTableRef) {.async.} =
 
-  let currentTime = $getTime()
+  let currentTime = getTime().toUnix()
 
   # Protocols and agent string and its count
   var allProtocols: Table[string, int]
   var allAgentStrings: Table[string, int]
 
+  var newPeers = 0
+
   # iterate all newly discovered nodes
   for discNode in discoveredNodes:
     let typedRecord = discNode.record.toTypedRecord()
@@ -65,15 +75,25 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
       warn "could not get secp256k1 key", typedRecord=typedRecord.get()
       continue
 
+    let peerRes = toRemotePeerInfo(discNode.record)
+
+    let peerInfo = peerRes.valueOr():
+      warn "error converting record to remote peer info", record=discNode.record
+      continue
+
     # create new entry if new peerId found
-    let peerId = secp256k1.get().toHex()
+    let peerId = $peerInfo.peerId
     let customPeerInfo = CustomPeerInfo(peerId: peerId)
     if not allPeers.hasKey(peerId):
       allPeers[peerId] = customPeerInfo
+      newPeers += 1
+    else:
+      info "already seen", peerId=peerId
 
     allPeers[peerId].lastTimeDiscovered = currentTime
     allPeers[peerId].enr = discNode.record.toURI()
     allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
+    allPeers[peerId].discovered += 1
 
     if not typedRecord.get().ip.isSome():
       warn "ip field is not set", record=typedRecord.get()
@@ -82,43 +102,77 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
     let ip = $typedRecord.get().ip.get().join(".")
     allPeers[peerId].ip = ip
 
-    let peer = toRemotePeerInfo(discNode.record)
-    if not peer.isOk():
-      warn "error converting record to remote peer info", record=discNode.record
-      continue
+    # try to ping the peer
+    if getTime().toUnix() >= allPeers[peerId].lastTimeConnected + ReconnectTime and allPeers[peerId].retries < MaxConnectionRetries:
+      if allPeers[peerId].retries > 0:
+        warn "trying to dial failed peer again", peerId=peerId, retry=allPeers[peerId].retries
 
-    # try to connect to the peer
-    # TODO: check last connection time and if not > x, skip connecting
-    let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
-    if timedOut:
-      warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
-      # TODO: Add other staates
-      allPeers[peerId].connError = "timedout"
-      continue
+      var pingDelay:chronos.Duration
 
-    # after connection, get supported protocols
-    let lp2pPeerStore = node.switch.peerStore
-    let nodeProtocols = lp2pPeerStore[ProtoBook][peer.get().peerId]
-    allPeers[peerId].supportedProtocols = nodeProtocols
-    allPeers[peerId].lastTimeConnected = currentTime
+      proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
+        ## Dials the peer with the ping protocol and stores the measured delay
+        ## in `pingDelay`; on failure the message is recorded in `connError`
+        try:
+          let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
+          try:
+            pingDelay = await node.libp2pPing.ping(conn)
+          finally:
+            # release the ping stream whether the ping succeeded or not
+            await conn.close()
+          return ok()
 
-    # after connection, get user-agent
-    let nodeUserAgent = lp2pPeerStore[AgentBook][peer.get().peerId]
-    allPeers[peerId].userAgent = nodeUserAgent
+        except CatchableError:
+          var msg = getCurrentExceptionMsg()
+          if msg == "Future operation cancelled!":
+            msg = "timedout"
+          warn "failed to ping the peer", peer=peerInfo, err=msg
 
-    # store avaiable protocols in the network
-    for protocol in nodeProtocols:
-      if not allProtocols.hasKey(protocol):
-        allProtocols[protocol] = 0
-      allProtocols[protocol] += 1
+          allPeers[peerId].connError = msg
+          return err("could not ping peer: " & msg)
 
-    # store available user-agents in the network
-    if not allAgentStrings.hasKey(nodeUserAgent):
-      allAgentStrings[nodeUserAgent] = 0
-    allAgentStrings[nodeUserAgent] += 1
+      let timedOut = not await ping().withTimeout(timeout)
+      # need this check for pingDelay == 0 because there may be a conn error before timeout
+      if timedOut or pingDelay == 0.millis:
+        allPeers[peerId].retries += 1
+        continue
 
-    debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
+      info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
+      peer_ping.observe(pingDelay.millis)
+      if allPeers[peerId].avgPingDuration == 0.millis:
+        allPeers[peerId].avgPingDuration = pingDelay
+
+      # average on nanoseconds: converting through whole milliseconds
+      # truncated the fraction on every update and lost precision
+      let prevAvgNanos = float64(allPeers[peerId].avgPingDuration.nanos)
+      let weightedNanos = prevAvgNanos * (AvgPingWindow - 1.0) + float64(pingDelay.nanos)
+      allPeers[peerId].avgPingDuration = int64(weightedNanos / AvgPingWindow).nanos
+      allPeers[peerId].lastPingDuration = pingDelay
+
+      # after connection, get supported protocols
+      let lp2pPeerStore = node.switch.peerStore
+      let nodeProtocols = lp2pPeerStore[ProtoBook][peerInfo.peerId]
+      allPeers[peerId].supportedProtocols = nodeProtocols
+      allPeers[peerId].lastTimeConnected = currentTime
+
+      # after connection, get user-agent
+      let nodeUserAgent = lp2pPeerStore[AgentBook][peerInfo.peerId]
+      allPeers[peerId].userAgent = nodeUserAgent
+
+      # store available protocols in the network
+      for protocol in nodeProtocols:
+        if not allProtocols.hasKey(protocol):
+          allProtocols[protocol] = 0
+        allProtocols[protocol] += 1
+
+      # store available user-agents in the network
+      if not allAgentStrings.hasKey(nodeUserAgent):
+        allAgentStrings[nodeUserAgent] = 0
+      allAgentStrings[nodeUserAgent] += 1
+
+      debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
+
+  info "number of newly discovered peers", amount=newPeers
 
   # inform the total connections that we did in this round
   let nOfOkConnections = allProtocols.len()
   info "number of successful connections", amount=nOfOkConnections
@@ -412,6 +466,7 @@ when isMainModule:
   let (node, discv5) = nodeRes.get()
 
   waitFor node.mountRelay()
+  waitFor node.mountLibp2pPing()
 
   # Subscribe the node to the default pubsubtopic, to count messages
   subscribeAndHandleMessages(node, DefaultPubsubTopic, msgPerContentTopic)
diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim
index b29f5b823..769c2264f 100644
--- a/apps/networkmonitor/networkmonitor_config.nim
+++ b/apps/networkmonitor/networkmonitor_config.nim
@@ -11,7 +11,7 @@ type
   NetworkMonitorConf* = object
     logLevel* {.
       desc: "Sets the log level",
-      defaultValue: LogLevel.DEBUG,
+      defaultValue: LogLevel.INFO,
       name: "log-level",
       abbr: "l" .}: LogLevel
 
diff --git a/apps/networkmonitor/networkmonitor_metrics.nim b/apps/networkmonitor/networkmonitor_metrics.nim
index a136ab109..e5905ba46 100644
--- a/apps/networkmonitor/networkmonitor_metrics.nim
+++ b/apps/networkmonitor/networkmonitor_metrics.nim
@@ -37,10 +37,15 @@ declarePublicGauge peer_user_agents,
     "Number of peers with each user agent",
     labels = ["user_agent"]
 
+declarePublicHistogram peer_ping,
+    "Histogram tracking ping durations for discovered peers",
+    buckets = [100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0, Inf]
+
 type
   CustomPeerInfo* = object
     # populated after discovery
-    lastTimeDiscovered*: string
+    lastTimeDiscovered*: int64
+    discovered*: int64
     peerId*: string
     enr*: string
     ip*: string
@@ -49,9 +54,12 @@ type
     city*: string
 
     # only after ok connection
-    lastTimeConnected*: string
+    lastTimeConnected*: int64
+    retries*: int64
     supportedProtocols*: seq[string]
     userAgent*: string
+    lastPingDuration*: Duration
+    avgPingDuration*: Duration
 
     # only after a ok/nok connection
     connError*: string