Merge pull request #102 from codex-storage/measure-rtt-bw

Measure rtt, estimate bw, and log every 5 minutes
This commit is contained in:
Csaba Kiraly 2024-10-14 14:19:35 +02:00 committed by GitHub
commit c1d2ea410d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 86 additions and 6 deletions

View File

@ -20,6 +20,9 @@ import
export stint
const
avgSmoothingFactor = 0.9
type
NodeId* = UInt256
@ -27,6 +30,12 @@ type
ip*: ValidIpAddress
port*: Port
Stats* = object
rttMin*: float #millisec
rttAvg*: float #millisec
bwAvg*: float #bps
bwMax*: float #bps
Node* = ref object
id*: NodeId
pubkey*: PublicKey
@ -35,6 +44,7 @@ type
seen*: bool ## Indicates if there was at least one successful
## request-response with this node, or if the nde was verified
## through the underlying transport mechanisms.
stats*: Stats # traffic measurements and statistics
func toNodeId*(pid: PeerId): NodeId =
## Convert public key to a node identifier.
@ -182,3 +192,21 @@ func shortLog*(address: Address): string =
$address
chronicles.formatIt(Address): shortLog(it)
# collecting performane metrics
func registerRtt*(n: Node, rtt: Duration) =
## register an RTT measurement
let rttMs = rtt.nanoseconds.float / 1e6
n.stats.rttMin =
if n.stats.rttMin == 0: rttMs
else: min(n.stats.rttMin, rttMs)
n.stats.rttAvg =
if n.stats.rttAvg == 0: rttMs
else: avgSmoothingFactor * n.stats.rttAvg + (1.0 - avgSmoothingFactor) * rttMs
func registerBw*(n: Node, bw: float) =
## register an bandwidth measurement
n.stats.bwMax = max(n.stats.bwMax, bw)
n.stats.bwAvg =
if n.stats.bwAvg == 0: bw
else: avgSmoothingFactor * n.stats.bwAvg + (1.0 - avgSmoothingFactor) * bw

View File

@ -126,6 +126,7 @@ const
RevalidateMax = 10000 ## Revalidation of a peer is done between min and max milliseconds.
## value in milliseconds
IpMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port
DebugPrintInterval = 5.minutes ## Interval to print neighborhood with stats
## majority and updating this when SPR auto update is set.
InitialLookups = 1 ## Amount of lookups done when populating the routing table
ResponseTimeout* = 1.seconds ## timeout for the response of a request-response
@ -167,6 +168,7 @@ type
refreshLoop: Future[void]
revalidateLoop: Future[void]
ipMajorityLoop: Future[void]
debugPrintLoop: Future[void]
lastLookup: chronos.Moment
bootstrapRecords*: seq[SignedPeerRecord]
ipVote: IpVote
@ -499,15 +501,31 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
## on that, more replies will be awaited.
## If one reply is lost here (timed out), others are ignored too.
## Same counts for out of order receival.
let startTime = Moment.now()
var op = await d.waitMessage(fromNode, reqId)
if op.isSome:
if op.get.kind == MessageKind.nodes:
var res = op.get.nodes.sprs
let total = op.get.nodes.total
let
total = op.get.nodes.total
firstTime = Moment.now()
rtt = firstTime - startTime
# trace "nodes RTT:", rtt, node = fromNode
fromNode.registerRtt(rtt)
for i in 1 ..< total:
op = await d.waitMessage(fromNode, reqId)
if op.isSome and op.get.kind == MessageKind.nodes:
res.add(op.get.nodes.sprs)
# Estimate bandwidth based on UDP packet train received, assuming these were
# released fast and spaced in time by bandwidth bottleneck. This is just a rough
# packet-pair based estimate, far from being perfect.
# TODO: get message size from lower layer for better bandwidth estimate
# TODO: get better reception timestamp from lower layers
let
deltaT = Moment.now() - firstTime
bwBps = 500.0 * 8.0 / (deltaT.nanoseconds.float / i.float / 1e9)
# trace "bw estimate:", deltaT = deltaT, i, bw_mbps = bwBps / 1e6, node = fromNode
fromNode.registerBw(bwBps)
else:
# No error on this as we received some nodes.
break
@ -526,7 +544,11 @@ proc ping*(d: Protocol, toNode: Node):
## Returns the received pong message or an error.
let
msg = PingMessage(sprSeq: d.localNode.record.seqNum)
startTime = Moment.now()
resp = await d.waitResponse(toNode, msg)
rtt = Moment.now() - startTime
# trace "ping RTT:", rtt, node = toNode
toNode.registerRtt(rtt)
if resp.isSome():
if resp.get().kind == pong:
@ -586,7 +608,11 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
## Returns the received talkresp message or an error.
let
msg = TalkReqMessage(protocol: protocol, request: request)
startTime = Moment.now()
resp = await d.waitResponse(toNode, msg)
rtt = Moment.now() - startTime
# trace "talk RTT:", rtt, node = toNode
toNode.registerRtt(rtt)
if resp.isSome():
if resp.get().kind == talkResp:
@ -927,6 +953,7 @@ proc revalidateNode*(d: Protocol, n: Node) {.async.} =
discard d.addNode(nodes[][0])
# Get IP and port from pong message and add it to the ip votes
trace "pong rx", n, myip = res.ip, myport = res.port
let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port))
d.ipVote.insert(n.id, a)
@ -1012,6 +1039,18 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
trace "ipMajorityLoop canceled"
trace "ipMajorityLoop exited!"
proc debugPrintLoop(d: Protocol) {.async.} =
## Loop which prints the neighborhood with stats
while true:
await sleepAsync(DebugPrintInterval)
for b in d.routingTable.buckets:
debug "bucket", depth = b.getDepth,
len = b.nodes.len, standby = b.replacementLen
for n in b.nodes:
debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int
# bandwidth estimates are based on limited information, so not logging it yet to avoid confusion
# trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3)
func init*(
T: type DiscoveryConfig,
tableIpLimit: uint,
@ -1149,6 +1188,7 @@ proc start*(d: Protocol) {.async.} =
d.refreshLoop = refreshLoop(d)
d.revalidateLoop = revalidateLoop(d)
d.ipMajorityLoop = ipMajorityLoop(d)
d.debugPrintLoop = debugPrintLoop(d)
await d.providers.start()

View File

@ -182,6 +182,8 @@ proc midpoint(k: KBucket): NodeId =
proc len(k: KBucket): int = k.nodes.len
proc replacementLen*(k: KBucket): int = k.replacementCache.len
proc tail(k: KBucket): Node = k.nodes[high(k.nodes)]
proc ipLimitInc(r: var RoutingTable, b: KBucket, n: Node): bool =
@ -283,6 +285,9 @@ proc computeSharedPrefixBits(nodes: openArray[NodeId]): int =
# Reaching this would mean that all node ids are equal.
doAssert(false, "Unable to calculate number of shared prefix bits")
proc getDepth*(b: KBucket) : int =
computeSharedPrefixBits(@[b.istart, b.iend])
proc init*(T: type RoutingTable, localNode: Node, bitsPerHop = DefaultBitsPerHop,
ipLimits = DefaultTableIpLimits, rng: ref HmacDrbgContext,
distanceCalculator = XorDistanceCalculator): T =

View File

@ -38,7 +38,7 @@ type
client: Client
bindAddress: Address ## UDP binding address
transp: DatagramTransport
pendingRequests: Table[AESGCMNonce, PendingRequest]
pendingRequests: Table[AESGCMNonce, (PendingRequest, Moment)]
keyexchangeInProgress: HashSet[NodeId]
pendingRequestsByNode: Table[NodeId, seq[seq[byte]]]
codec*: Codec
@ -87,7 +87,7 @@ proc sendMessage*(t: Transport, toId: NodeId, toAddr: Address, message: seq[byte
proc registerRequest(t: Transport, n: Node, message: seq[byte],
nonce: AESGCMNonce) =
let request = PendingRequest(node: n, message: message)
if not t.pendingRequests.hasKeyOrPut(nonce, request):
if not t.pendingRequests.hasKeyOrPut(nonce, (request, Moment.now())):
sleepAsync(responseTimeout).addCallback() do(data: pointer):
t.pendingRequests.del(nonce)
@ -184,9 +184,16 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
of Flag.Whoareyou:
trace "Received whoareyou packet", myport = t.bindAddress.port, address = a
var pr: PendingRequest
if t.pendingRequests.take(packet.whoareyou.requestNonce, pr):
let toNode = pr.node
var
prt: (PendingRequest, Moment)
if t.pendingRequests.take(packet.whoareyou.requestNonce, prt):
let
pr = prt[0]
startTime = prt[1]
toNode = pr.node
rtt = Moment.now() - startTime
# trace "whoareyou RTT:", rtt, node = toNode
toNode.registerRtt(rtt)
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()