diff --git a/codexdht/private/eth/p2p/discoveryv5/node.nim b/codexdht/private/eth/p2p/discoveryv5/node.nim index 7b62dff..1bfacf8 100644 --- a/codexdht/private/eth/p2p/discoveryv5/node.nim +++ b/codexdht/private/eth/p2p/discoveryv5/node.nim @@ -20,6 +20,9 @@ import export stint +const + avgSmoothingFactor = 0.9 + type NodeId* = UInt256 @@ -27,6 +30,12 @@ type ip*: ValidIpAddress port*: Port + Stats* = object + rttMin*: float #millisec + rttAvg*: float #millisec + bwAvg*: float #bps + bwMax*: float #bps + Node* = ref object id*: NodeId pubkey*: PublicKey @@ -35,6 +44,7 @@ type seen*: bool ## Indicates if there was at least one successful ## request-response with this node, or if the nde was verified ## through the underlying transport mechanisms. + stats*: Stats # traffic measurements and statistics func toNodeId*(pid: PeerId): NodeId = ## Convert public key to a node identifier. @@ -182,3 +192,21 @@ func shortLog*(address: Address): string = $address chronicles.formatIt(Address): shortLog(it) + +# collecting performance metrics +func registerRtt*(n: Node, rtt: Duration) = + ## register an RTT measurement + let rttMs = rtt.nanoseconds.float / 1e6 + n.stats.rttMin = + if n.stats.rttMin == 0: rttMs + else: min(n.stats.rttMin, rttMs) + n.stats.rttAvg = + if n.stats.rttAvg == 0: rttMs + else: avgSmoothingFactor * n.stats.rttAvg + (1.0 - avgSmoothingFactor) * rttMs + +func registerBw*(n: Node, bw: float) = + ## register a bandwidth measurement + n.stats.bwMax = max(n.stats.bwMax, bw) + n.stats.bwAvg = + if n.stats.bwAvg == 0: bw + else: avgSmoothingFactor * n.stats.bwAvg + (1.0 - avgSmoothingFactor) * bw diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index f70435a..0ac76d9 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -126,6 +126,7 @@ const RevalidateMax = 10000 ## Revalidation of a peer is done between min and max milliseconds. 
## value in milliseconds IpMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port + DebugPrintInterval = 5.minutes ## Interval to print neighborhood with stats ## majority and updating this when SPR auto update is set. InitialLookups = 1 ## Amount of lookups done when populating the routing table ResponseTimeout* = 1.seconds ## timeout for the response of a request-response @@ -167,6 +168,7 @@ type refreshLoop: Future[void] revalidateLoop: Future[void] ipMajorityLoop: Future[void] + debugPrintLoop: Future[void] lastLookup: chronos.Moment bootstrapRecords*: seq[SignedPeerRecord] ipVote: IpVote @@ -499,15 +501,31 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): ## on that, more replies will be awaited. ## If one reply is lost here (timed out), others are ignored too. ## Same counts for out of order receival. + let startTime = Moment.now() var op = await d.waitMessage(fromNode, reqId) if op.isSome: if op.get.kind == MessageKind.nodes: var res = op.get.nodes.sprs - let total = op.get.nodes.total + let + total = op.get.nodes.total + firstTime = Moment.now() + rtt = firstTime - startTime + # trace "nodes RTT:", rtt, node = fromNode + fromNode.registerRtt(rtt) for i in 1 ..< total: op = await d.waitMessage(fromNode, reqId) if op.isSome and op.get.kind == MessageKind.nodes: res.add(op.get.nodes.sprs) + # Estimate bandwidth based on UDP packet train received, assuming these were + # released fast and spaced in time by bandwidth bottleneck. This is just a rough + # packet-pair based estimate, far from being perfect. + # TODO: get message size from lower layer for better bandwidth estimate + # TODO: get better reception timestamp from lower layers + let + deltaT = Moment.now() - firstTime + bwBps = 500.0 * 8.0 / (deltaT.nanoseconds.float / i.float / 1e9) + # trace "bw estimate:", deltaT = deltaT, i, bw_mbps = bwBps / 1e6, node = fromNode + fromNode.registerBw(bwBps) else: # No error on this as we received some nodes. 
break @@ -526,7 +544,11 @@ proc ping*(d: Protocol, toNode: Node): ## Returns the received pong message or an error. let msg = PingMessage(sprSeq: d.localNode.record.seqNum) + startTime = Moment.now() resp = await d.waitResponse(toNode, msg) + rtt = Moment.now() - startTime + # trace "ping RTT:", rtt, node = toNode + toNode.registerRtt(rtt) if resp.isSome(): if resp.get().kind == pong: @@ -586,7 +608,11 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): ## Returns the received talkresp message or an error. let msg = TalkReqMessage(protocol: protocol, request: request) + startTime = Moment.now() resp = await d.waitResponse(toNode, msg) + rtt = Moment.now() - startTime + # trace "talk RTT:", rtt, node = toNode + toNode.registerRtt(rtt) if resp.isSome(): if resp.get().kind == talkResp: @@ -927,6 +953,7 @@ proc revalidateNode*(d: Protocol, n: Node) {.async.} = discard d.addNode(nodes[][0]) # Get IP and port from pong message and add it to the ip votes + trace "pong rx", n, myip = res.ip, myport = res.port let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port)) d.ipVote.insert(n.id, a) @@ -1012,6 +1039,18 @@ proc ipMajorityLoop(d: Protocol) {.async.} = trace "ipMajorityLoop canceled" trace "ipMajorityLoop exited!" 
+proc debugPrintLoop(d: Protocol) {.async.} = + ## Loop which prints the neighborhood with stats + while true: + await sleepAsync(DebugPrintInterval) + for b in d.routingTable.buckets: + debug "bucket", depth = b.getDepth, + len = b.nodes.len, standby = b.replacementLen + for n in b.nodes: + debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int + # bandwidth estimates are based on limited information, so not logging it yet to avoid confusion + # trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3) + func init*( T: type DiscoveryConfig, tableIpLimit: uint, @@ -1149,6 +1188,7 @@ proc start*(d: Protocol) {.async.} = d.refreshLoop = refreshLoop(d) d.revalidateLoop = revalidateLoop(d) d.ipMajorityLoop = ipMajorityLoop(d) + d.debugPrintLoop = debugPrintLoop(d) await d.providers.start() diff --git a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim index f4a1995..dcfa75c 100644 --- a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -182,6 +182,8 @@ proc midpoint(k: KBucket): NodeId = proc len(k: KBucket): int = k.nodes.len +proc replacementLen*(k: KBucket): int = k.replacementCache.len + proc tail(k: KBucket): Node = k.nodes[high(k.nodes)] proc ipLimitInc(r: var RoutingTable, b: KBucket, n: Node): bool = @@ -283,6 +285,9 @@ proc computeSharedPrefixBits(nodes: openArray[NodeId]): int = # Reaching this would mean that all node ids are equal. 
doAssert(false, "Unable to calculate number of shared prefix bits") +proc getDepth*(b: KBucket) : int = + computeSharedPrefixBits(@[b.istart, b.iend]) + proc init*(T: type RoutingTable, localNode: Node, bitsPerHop = DefaultBitsPerHop, ipLimits = DefaultTableIpLimits, rng: ref HmacDrbgContext, distanceCalculator = XorDistanceCalculator): T = diff --git a/codexdht/private/eth/p2p/discoveryv5/transport.nim b/codexdht/private/eth/p2p/discoveryv5/transport.nim index 2861611..2caa73e 100644 --- a/codexdht/private/eth/p2p/discoveryv5/transport.nim +++ b/codexdht/private/eth/p2p/discoveryv5/transport.nim @@ -38,7 +38,7 @@ type client: Client bindAddress: Address ## UDP binding address transp: DatagramTransport - pendingRequests: Table[AESGCMNonce, PendingRequest] + pendingRequests: Table[AESGCMNonce, (PendingRequest, Moment)] keyexchangeInProgress: HashSet[NodeId] pendingRequestsByNode: Table[NodeId, seq[seq[byte]]] codec*: Codec @@ -87,7 +87,7 @@ proc sendMessage*(t: Transport, toId: NodeId, toAddr: Address, message: seq[byte proc registerRequest(t: Transport, n: Node, message: seq[byte], nonce: AESGCMNonce) = let request = PendingRequest(node: n, message: message) - if not t.pendingRequests.hasKeyOrPut(nonce, request): + if not t.pendingRequests.hasKeyOrPut(nonce, (request, Moment.now())): sleepAsync(responseTimeout).addCallback() do(data: pointer): t.pendingRequests.del(nonce) @@ -184,9 +184,16 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = of Flag.Whoareyou: trace "Received whoareyou packet", myport = t.bindAddress.port, address = a - var pr: PendingRequest - if t.pendingRequests.take(packet.whoareyou.requestNonce, pr): - let toNode = pr.node + var + prt: (PendingRequest, Moment) + if t.pendingRequests.take(packet.whoareyou.requestNonce, prt): + let + pr = prt[0] + startTime = prt[1] + toNode = pr.node + rtt = Moment.now() - startTime + # trace "whoareyou RTT:", rtt, node = toNode + toNode.registerRtt(rtt) # This is a node we previously contacted 
and thus must have an address. doAssert(toNode.address.isSome()) let address = toNode.address.get()