From f3eec2a202e48341bc921da4bf251b36fe927b5f Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 7 Oct 2024 21:33:45 +0200 Subject: [PATCH 1/7] node: add RTT and bandwidth measurement holders Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/node.nim | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/codexdht/private/eth/p2p/discoveryv5/node.nim b/codexdht/private/eth/p2p/discoveryv5/node.nim index 7b62dff..1bfacf8 100644 --- a/codexdht/private/eth/p2p/discoveryv5/node.nim +++ b/codexdht/private/eth/p2p/discoveryv5/node.nim @@ -20,6 +20,9 @@ import export stint +const + avgSmoothingFactor = 0.9 + type NodeId* = UInt256 @@ -27,6 +30,12 @@ type ip*: ValidIpAddress port*: Port + Stats* = object + rttMin*: float #millisec + rttAvg*: float #millisec + bwAvg*: float #bps + bwMax*: float #bps + Node* = ref object id*: NodeId pubkey*: PublicKey @@ -35,6 +44,7 @@ type seen*: bool ## Indicates if there was at least one successful ## request-response with this node, or if the nde was verified ## through the underlying transport mechanisms. + stats*: Stats # traffic measurements and statistics func toNodeId*(pid: PeerId): NodeId = ## Convert public key to a node identifier. 
@@ -182,3 +192,21 @@ func shortLog*(address: Address): string = $address chronicles.formatIt(Address): shortLog(it) + +# collecting performance metrics +func registerRtt*(n: Node, rtt: Duration) = + ## register an RTT measurement + let rttMs = rtt.nanoseconds.float / 1e6 + n.stats.rttMin = + if n.stats.rttMin == 0: rttMs + else: min(n.stats.rttMin, rttMs) + n.stats.rttAvg = + if n.stats.rttAvg == 0: rttMs + else: avgSmoothingFactor * n.stats.rttAvg + (1.0 - avgSmoothingFactor) * rttMs + +func registerBw*(n: Node, bw: float) = + ## register a bandwidth measurement + n.stats.bwMax = max(n.stats.bwMax, bw) + n.stats.bwAvg = + if n.stats.bwAvg == 0: bw + else: avgSmoothingFactor * n.stats.bwAvg + (1.0 - avgSmoothingFactor) * bw From 0b69de242fc80cb9c1095dbd399b0c1057e9a3ef Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 8 Oct 2024 10:40:33 +0200 Subject: [PATCH 2/7] add rtt measurement Signed-off-by: Csaba Kiraly --- .../private/eth/p2p/discoveryv5/protocol.nim | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 9deefbb..c6386e9 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -499,11 +499,17 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): ## on that, more replies will be awaited. ## If one reply is lost here (timed out), others are ignored too. ## Same counts for out of order receival.
+ let startTime = Moment.now() var op = await d.waitMessage(fromNode, reqId) if op.isSome: if op.get.kind == MessageKind.nodes: var res = op.get.nodes.sprs - let total = op.get.nodes.total + let + total = op.get.nodes.total + firstTime = Moment.now() + rtt = firstTime - startTime + # trace "nodes RTT:", rtt, node = fromNode + fromNode.registerRtt(rtt) for i in 1 ..< total: op = await d.waitMessage(fromNode, reqId) if op.isSome and op.get.kind == MessageKind.nodes: @@ -526,7 +532,11 @@ proc ping*(d: Protocol, toNode: Node): ## Returns the received pong message or an error. let msg = PingMessage(sprSeq: d.localNode.record.seqNum) + startTime = Moment.now() resp = await d.waitResponse(toNode, msg) + rtt = Moment.now() - startTime + # trace "ping RTT:", rtt, node = toNode + toNode.registerRtt(rtt) if resp.isSome(): if resp.get().kind == pong: @@ -586,7 +596,11 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): ## Returns the received talkresp message or an error. let msg = TalkReqMessage(protocol: protocol, request: request) + startTime = Moment.now() resp = await d.waitResponse(toNode, msg) + rtt = Moment.now() - startTime + # trace "talk RTT:", rtt, node = toNode + toNode.registerRtt(rtt) if resp.isSome(): if resp.get().kind == talkResp: From ec4f0d4a84b497574b4e39b2cb62075a5783fc2a Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 8 Oct 2024 10:43:06 +0200 Subject: [PATCH 3/7] add transport level RTT measurement Signed-off-by: Csaba Kiraly --- .../private/eth/p2p/discoveryv5/transport.nim | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/transport.nim b/codexdht/private/eth/p2p/discoveryv5/transport.nim index 6049504..36df7a7 100644 --- a/codexdht/private/eth/p2p/discoveryv5/transport.nim +++ b/codexdht/private/eth/p2p/discoveryv5/transport.nim @@ -25,7 +25,7 @@ type client: Client bindAddress: Address ## UDP binding address transp: DatagramTransport - 
pendingRequests: Table[AESGCMNonce, PendingRequest] + pendingRequests: Table[AESGCMNonce, (PendingRequest, Moment)] keyexchangeInProgress: HashSet[NodeId] pendingRequestsByNode: Table[NodeId, seq[seq[byte]]] codec*: Codec @@ -70,7 +70,7 @@ proc sendMessage*(t: Transport, toId: NodeId, toAddr: Address, message: seq[byte proc registerRequest(t: Transport, n: Node, message: seq[byte], nonce: AESGCMNonce) = let request = PendingRequest(node: n, message: message) - if not t.pendingRequests.hasKeyOrPut(nonce, request): + if not t.pendingRequests.hasKeyOrPut(nonce, (request, Moment.now())): sleepAsync(responseTimeout).addCallback() do(data: pointer): t.pendingRequests.del(nonce) @@ -165,9 +165,16 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = of Flag.Whoareyou: trace "Received whoareyou packet", myport = t.bindAddress.port, address = a - var pr: PendingRequest - if t.pendingRequests.take(packet.whoareyou.requestNonce, pr): - let toNode = pr.node + var + prt: (PendingRequest, Moment) + if t.pendingRequests.take(packet.whoareyou.requestNonce, prt): + let + pr = prt[0] + startTime = prt[1] + toNode = pr.node + rtt = Moment.now() - startTime + # trace "whoareyou RTT:", rtt, node = toNode + toNode.registerRtt(rtt) # This is a node we previously contacted and thus must have an address. 
doAssert(toNode.address.isSome()) let address = toNode.address.get() From 0825d887ea0cfc70c919d0c6b944ba4c7dbc6021 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 8 Oct 2024 10:47:05 +0200 Subject: [PATCH 4/7] add bandwidth estimate Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index c6386e9..1efecb7 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -514,6 +514,18 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): op = await d.waitMessage(fromNode, reqId) if op.isSome and op.get.kind == MessageKind.nodes: res.add(op.get.nodes.sprs) + # Estimate bandwidth based on UDP packet train received, assuming these were + # released fast and spaced in time by bandwidth bottleneck. This is just a rough + # packet-pair based estimate, far from being perfect. + # TODO: get message size from lower layer for better bandwidth estimate + # TODO: get better reception timestamp from lower layers + let + deltaT = Moment.now() - firstTime + bwBps = 500.0 * 8.0 / (deltaT.nanoseconds.float / i.float / 1e9) + debug "bw estimate:", deltaT = deltaT, i, + bw_mbps = bwBps / 1e6, + node = fromNode + fromNode.registerBw(bwBps) else: # No error on this as we received some nodes. 
break @@ -941,6 +953,7 @@ proc revalidateNode*(d: Protocol, n: Node) {.async.} = discard d.addNode(nodes[][0]) # Get IP and port from pong message and add it to the ip votes + trace "pong rx", n, myip = res.ip, myport = res.port let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port)) d.ipVote.insert(n.id, a) From 706cb50041ef79d7aee7f8495281c6abaa440c1c Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 8 Oct 2024 10:25:42 +0200 Subject: [PATCH 5/7] add debugPrintLoop to print neighborhood info Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 14 ++++++++++++++ .../private/eth/p2p/discoveryv5/routing_table.nim | 5 +++++ 2 files changed, 19 insertions(+) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 1efecb7..5220243 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -126,6 +126,7 @@ const RevalidateMax = 10000 ## Revalidation of a peer is done between min and max milliseconds. ## value in milliseconds IpMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port + DebugPrintInterval = 5.minutes ## Interval to print neighborhood with stats ## majority and updating this when SPR auto update is set. InitialLookups = 1 ## Amount of lookups done when populating the routing table ResponseTimeout* = 1.seconds ## timeout for the response of a request-response @@ -167,6 +168,7 @@ type refreshLoop: Future[void] revalidateLoop: Future[void] ipMajorityLoop: Future[void] + debugPrintLoop: Future[void] lastLookup: chronos.Moment bootstrapRecords*: seq[SignedPeerRecord] ipVote: IpVote @@ -1039,6 +1041,17 @@ proc ipMajorityLoop(d: Protocol) {.async.} = trace "ipMajorityLoop canceled" trace "ipMajorityLoop exited!" 
+proc debugPrintLoop(d: Protocol) {.async.} = + ## Loop which prints the neighborhood with stats + while true: + await sleepAsync(DebugPrintInterval) + for b in d.routingTable.buckets: + debug "bucket", depth = b.getDepth, + len = b.nodes.len, standby = b.replacementLen + for n in b.nodes: + debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int, + bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3) + func init*( T: type DiscoveryConfig, tableIpLimit: uint, @@ -1176,6 +1189,7 @@ proc start*(d: Protocol) {.async.} = d.refreshLoop = refreshLoop(d) d.revalidateLoop = revalidateLoop(d) d.ipMajorityLoop = ipMajorityLoop(d) + d.debugPrintLoop = debugPrintLoop(d) await d.providers.start() diff --git a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim index a890863..25f6049 100644 --- a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -180,6 +180,8 @@ proc midpoint(k: KBucket): NodeId = proc len(k: KBucket): int = k.nodes.len +proc replacementLen*(k: KBucket): int = k.replacementCache.len + proc tail(k: KBucket): Node = k.nodes[high(k.nodes)] proc ipLimitInc(r: var RoutingTable, b: KBucket, n: Node): bool = @@ -281,6 +283,9 @@ proc computeSharedPrefixBits(nodes: openArray[NodeId]): int = # Reaching this would mean that all node ids are equal. 
doAssert(false, "Unable to calculate number of shared prefix bits") +proc getDepth*(b: KBucket) : int = + computeSharedPrefixBits(@[b.istart, b.iend]) + proc init*(T: type RoutingTable, localNode: Node, bitsPerHop = DefaultBitsPerHop, ipLimits = DefaultTableIpLimits, rng: ref HmacDrbgContext, distanceCalculator = XorDistanceCalculator): T = From 7057663f8180a3aad183a248345c532a064c0736 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 14 Oct 2024 11:19:36 +0200 Subject: [PATCH 6/7] fixup: remove excessive debug Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 5220243..5eb803a 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -524,9 +524,7 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): let deltaT = Moment.now() - firstTime bwBps = 500.0 * 8.0 / (deltaT.nanoseconds.float / i.float / 1e9) - debug "bw estimate:", deltaT = deltaT, i, - bw_mbps = bwBps / 1e6, - node = fromNode + # trace "bw estimate:", deltaT = deltaT, i, bw_mbps = bwBps / 1e6, node = fromNode fromNode.registerBw(bwBps) else: # No error on this as we received some nodes. 
From 8b1660464d4edb56459edd08350e1150f9a08956 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 14 Oct 2024 13:57:52 +0200 Subject: [PATCH 7/7] don't log bandwidth estimates Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 5eb803a..75bef86 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -1047,8 +1047,9 @@ proc debugPrintLoop(d: Protocol) {.async.} = debug "bucket", depth = b.getDepth, len = b.nodes.len, standby = b.replacementLen for n in b.nodes: - debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int, - bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3) + debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int + # bandwidth estimates are based on limited information, so not logging it yet to avoid confusion + # trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3) func init*( T: type DiscoveryConfig,