mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-25 13:26:17 +00:00
feat(networkmonitor): add ping latencies, optimize reconnections (#2068)
This commit is contained in:
parent
45fe2d3bee
commit
ed47354528
@ -16,6 +16,7 @@ import
|
|||||||
eth/p2p/discoveryv5/enr,
|
eth/p2p/discoveryv5/enr,
|
||||||
libp2p/crypto/crypto,
|
libp2p/crypto/crypto,
|
||||||
libp2p/nameresolving/dnsresolver,
|
libp2p/nameresolving/dnsresolver,
|
||||||
|
libp2p/protocols/ping,
|
||||||
metrics,
|
metrics,
|
||||||
metrics/chronos_httpserver,
|
metrics/chronos_httpserver,
|
||||||
presto/[route, server, client]
|
presto/[route, server, client]
|
||||||
@ -33,6 +34,10 @@ import
|
|||||||
logScope:
|
logScope:
|
||||||
topics = "networkmonitor"
|
topics = "networkmonitor"
|
||||||
|
|
||||||
|
const ReconnectTime = 60
|
||||||
|
const MaxConnectionRetries = 10
|
||||||
|
const AvgPingWindow = 10.0
|
||||||
|
|
||||||
proc setDiscoveredPeersCapabilities(
|
proc setDiscoveredPeersCapabilities(
|
||||||
routingTableNodes: seq[Node]) =
|
routingTableNodes: seq[Node]) =
|
||||||
for capability in @[Relay, Store, Filter, Lightpush]:
|
for capability in @[Relay, Store, Filter, Lightpush]:
|
||||||
@ -47,12 +52,14 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
|||||||
restClient: RestClientRef,
|
restClient: RestClientRef,
|
||||||
allPeers: CustomPeersTableRef) {.async.} =
|
allPeers: CustomPeersTableRef) {.async.} =
|
||||||
|
|
||||||
let currentTime = $getTime()
|
let currentTime = getTime().toUnix()
|
||||||
|
|
||||||
# Protocols and agent string and its count
|
# Protocols and agent string and its count
|
||||||
var allProtocols: Table[string, int]
|
var allProtocols: Table[string, int]
|
||||||
var allAgentStrings: Table[string, int]
|
var allAgentStrings: Table[string, int]
|
||||||
|
|
||||||
|
var newPeers = 0
|
||||||
|
|
||||||
# iterate all newly discovered nodes
|
# iterate all newly discovered nodes
|
||||||
for discNode in discoveredNodes:
|
for discNode in discoveredNodes:
|
||||||
let typedRecord = discNode.record.toTypedRecord()
|
let typedRecord = discNode.record.toTypedRecord()
|
||||||
@ -65,15 +72,25 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
|||||||
warn "could not get secp256k1 key", typedRecord=typedRecord.get()
|
warn "could not get secp256k1 key", typedRecord=typedRecord.get()
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
let peerRes = toRemotePeerInfo(discNode.record)
|
||||||
|
|
||||||
|
let peerInfo = peerRes.valueOr():
|
||||||
|
warn "error converting record to remote peer info", record=discNode.record
|
||||||
|
continue
|
||||||
|
|
||||||
# create new entry if new peerId found
|
# create new entry if new peerId found
|
||||||
let peerId = secp256k1.get().toHex()
|
let peerId = $peerInfo.peerId
|
||||||
let customPeerInfo = CustomPeerInfo(peerId: peerId)
|
let customPeerInfo = CustomPeerInfo(peerId: peerId)
|
||||||
if not allPeers.hasKey(peerId):
|
if not allPeers.hasKey(peerId):
|
||||||
allPeers[peerId] = customPeerInfo
|
allPeers[peerId] = customPeerInfo
|
||||||
|
newPeers += 1
|
||||||
|
else:
|
||||||
|
info "already seen", peerId=peerId
|
||||||
|
|
||||||
allPeers[peerId].lastTimeDiscovered = currentTime
|
allPeers[peerId].lastTimeDiscovered = currentTime
|
||||||
allPeers[peerId].enr = discNode.record.toURI()
|
allPeers[peerId].enr = discNode.record.toURI()
|
||||||
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
|
allPeers[peerId].enrCapabilities = discNode.record.getCapabilities().mapIt($it)
|
||||||
|
allPeers[peerId].discovered += 1
|
||||||
|
|
||||||
if not typedRecord.get().ip.isSome():
|
if not typedRecord.get().ip.isSome():
|
||||||
warn "ip field is not set", record=typedRecord.get()
|
warn "ip field is not set", record=typedRecord.get()
|
||||||
@ -82,28 +99,52 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
|||||||
let ip = $typedRecord.get().ip.get().join(".")
|
let ip = $typedRecord.get().ip.get().join(".")
|
||||||
allPeers[peerId].ip = ip
|
allPeers[peerId].ip = ip
|
||||||
|
|
||||||
let peer = toRemotePeerInfo(discNode.record)
|
# try to ping the peer
|
||||||
if not peer.isOk():
|
if getTime().toUnix() >= allPeers[peerId].lastTimeConnected + ReconnectTime and allPeers[peerId].retries < MaxConnectionRetries:
|
||||||
warn "error converting record to remote peer info", record=discNode.record
|
if allPeers[peerId].retries > 0:
|
||||||
|
warn "trying to dial failed peer again", peerId=peerId, retry=allPeers[peerId].retries
|
||||||
|
|
||||||
|
var pingDelay:chronos.Duration
|
||||||
|
|
||||||
|
proc ping(): Future[Result[void, string]] {.async, gcsafe.} =
|
||||||
|
try:
|
||||||
|
let conn = await node.switch.dial(peerInfo.peerId, peerInfo.addrs, PingCodec)
|
||||||
|
pingDelay = await node.libp2pPing.ping(conn)
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
except CatchableError:
|
||||||
|
var msg = getCurrentExceptionMsg()
|
||||||
|
if msg == "Future operation cancelled!":
|
||||||
|
msg = "timedout"
|
||||||
|
warn "failed to ping the peer", peer=peerInfo, err=msg
|
||||||
|
|
||||||
|
allPeers[peerId].connError = msg
|
||||||
|
return err("could not ping peer: " & msg)
|
||||||
|
|
||||||
|
let timedOut = not await ping().withTimeout(timeout)
|
||||||
|
# need this check for pingDelay == 0 because there may be a conn error before timeout
|
||||||
|
if timedOut or pingDelay == 0.millis:
|
||||||
|
allPeers[peerId].retries += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# try to connect to the peer
|
info "successfully pinged peer", peer=peerInfo, duration=pingDelay.millis
|
||||||
# TODO: check last connection time and if not > x, skip connecting
|
peer_ping.observe(pingDelay.millis)
|
||||||
let timedOut = not await node.connectToNodes(@[peer.get()]).withTimeout(timeout)
|
|
||||||
if timedOut:
|
if allPeers[peerId].avgPingDuration == 0.millis:
|
||||||
warn "could not connect to peer, timedout", timeout=timeout, peer=peer.get()
|
allPeers[peerId].avgPingDuration = pingDelay
|
||||||
# TODO: Add other states
|
|
||||||
allPeers[peerId].connError = "timedout"
|
# TODO: check why the calculation ends up losing precision
|
||||||
continue
|
allPeers[peerId].avgPingDuration = int64((float64(allPeers[peerId].avgPingDuration.millis) * (AvgPingWindow - 1.0) + float64(pingDelay.millis)) / AvgPingWindow).millis
|
||||||
|
allPeers[peerId].lastPingDuration = pingDelay
|
||||||
|
|
||||||
# after connection, get supported protocols
|
# after connection, get supported protocols
|
||||||
let lp2pPeerStore = node.switch.peerStore
|
let lp2pPeerStore = node.switch.peerStore
|
||||||
let nodeProtocols = lp2pPeerStore[ProtoBook][peer.get().peerId]
|
let nodeProtocols = lp2pPeerStore[ProtoBook][peerInfo.peerId]
|
||||||
allPeers[peerId].supportedProtocols = nodeProtocols
|
allPeers[peerId].supportedProtocols = nodeProtocols
|
||||||
allPeers[peerId].lastTimeConnected = currentTime
|
allPeers[peerId].lastTimeConnected = currentTime
|
||||||
|
|
||||||
# after connection, get user-agent
|
# after connection, get user-agent
|
||||||
let nodeUserAgent = lp2pPeerStore[AgentBook][peer.get().peerId]
|
let nodeUserAgent = lp2pPeerStore[AgentBook][peerInfo.peerId]
|
||||||
allPeers[peerId].userAgent = nodeUserAgent
|
allPeers[peerId].userAgent = nodeUserAgent
|
||||||
|
|
||||||
# store available protocols in the network
|
# store available protocols in the network
|
||||||
@ -119,6 +160,7 @@ proc setConnectedPeersMetrics(discoveredNodes: seq[Node],
|
|||||||
|
|
||||||
debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
|
debug "connected to peer", peer=allPeers[customPeerInfo.peerId]
|
||||||
|
|
||||||
|
info "number of newly discovered peers", amount=newPeers
|
||||||
# inform the total connections that we did in this round
|
# inform the total connections that we did in this round
|
||||||
let nOfOkConnections = allProtocols.len()
|
let nOfOkConnections = allProtocols.len()
|
||||||
info "number of successful connections", amount=nOfOkConnections
|
info "number of successful connections", amount=nOfOkConnections
|
||||||
@ -412,6 +454,7 @@ when isMainModule:
|
|||||||
let (node, discv5) = nodeRes.get()
|
let (node, discv5) = nodeRes.get()
|
||||||
|
|
||||||
waitFor node.mountRelay()
|
waitFor node.mountRelay()
|
||||||
|
waitFor node.mountLibp2pPing()
|
||||||
|
|
||||||
# Subscribe the node to the default pubsubtopic, to count messages
|
# Subscribe the node to the default pubsubtopic, to count messages
|
||||||
subscribeAndHandleMessages(node, DefaultPubsubTopic, msgPerContentTopic)
|
subscribeAndHandleMessages(node, DefaultPubsubTopic, msgPerContentTopic)
|
||||||
|
@ -11,7 +11,7 @@ type
|
|||||||
NetworkMonitorConf* = object
|
NetworkMonitorConf* = object
|
||||||
logLevel* {.
|
logLevel* {.
|
||||||
desc: "Sets the log level",
|
desc: "Sets the log level",
|
||||||
defaultValue: LogLevel.DEBUG,
|
defaultValue: LogLevel.INFO,
|
||||||
name: "log-level",
|
name: "log-level",
|
||||||
abbr: "l" .}: LogLevel
|
abbr: "l" .}: LogLevel
|
||||||
|
|
||||||
|
@ -37,10 +37,15 @@ declarePublicGauge peer_user_agents,
|
|||||||
"Number of peers with each user agent",
|
"Number of peers with each user agent",
|
||||||
labels = ["user_agent"]
|
labels = ["user_agent"]
|
||||||
|
|
||||||
|
declarePublicHistogram peer_ping,
|
||||||
|
"Histogram tracking ping durations for discovered peers",
|
||||||
|
buckets = [100.0, 200.0, 300.0, 400.0, 500.0, 600.0, 700.0, 800.0, 900.0, 1000.0, 2000.0, Inf]
|
||||||
|
|
||||||
type
|
type
|
||||||
CustomPeerInfo* = object
|
CustomPeerInfo* = object
|
||||||
# populated after discovery
|
# populated after discovery
|
||||||
lastTimeDiscovered*: string
|
lastTimeDiscovered*: int64
|
||||||
|
discovered*: int64
|
||||||
peerId*: string
|
peerId*: string
|
||||||
enr*: string
|
enr*: string
|
||||||
ip*: string
|
ip*: string
|
||||||
@ -49,9 +54,12 @@ type
|
|||||||
city*: string
|
city*: string
|
||||||
|
|
||||||
# only after ok connection
|
# only after ok connection
|
||||||
lastTimeConnected*: string
|
lastTimeConnected*: int64
|
||||||
|
retries*: int64
|
||||||
supportedProtocols*: seq[string]
|
supportedProtocols*: seq[string]
|
||||||
userAgent*: string
|
userAgent*: string
|
||||||
|
lastPingDuration*: Duration
|
||||||
|
avgPingDuration*: Duration
|
||||||
|
|
||||||
# only after a ok/nok connection
|
# only after a ok/nok connection
|
||||||
connError*: string
|
connError*: string
|
||||||
|
Loading…
x
Reference in New Issue
Block a user