mirror of https://github.com/waku-org/nwaku.git
203 lines
6.4 KiB
Nim
203 lines
6.4 KiB
Nim
{.push raises: [].}
|
|
|
|
import
|
|
std/[sets, tables, strutils, sequtils, options, strformat],
|
|
chronos/timer as chtimer,
|
|
chronicles,
|
|
results
|
|
|
|
import ./tester_message
|
|
|
|
type
|
|
StatHelper = object
|
|
prevIndex: uint32
|
|
prevArrivedAt: Moment
|
|
lostIndices: HashSet[uint32]
|
|
seenIndices: HashSet[uint32]
|
|
maxIndex: uint32
|
|
|
|
Statistics* = object
|
|
allMessageCount*: uint32
|
|
receivedMessages*: uint32
|
|
misorderCount*: uint32
|
|
lateCount*: uint32
|
|
duplicateCount*: uint32
|
|
minLatency*: Duration
|
|
maxLatency*: Duration
|
|
cummulativeLatency: Duration
|
|
helper: StatHelper
|
|
|
|
PerPeerStatistics* = Table[string, Statistics]
|
|
|
|
func `$`*(a: Duration): string {.inline.} =
|
|
## Original stringify implementation from chronos/timer.nim is not capable of printing 0ns
|
|
## Returns string representation of Duration ``a`` as nanoseconds value.
|
|
|
|
if a.isZero:
|
|
return "0ns"
|
|
|
|
return chtimer.`$`(a)
|
|
|
|
proc init*(T: type Statistics, expectedMessageCount: int = 1000): T =
|
|
result.helper.prevIndex = 0
|
|
result.helper.maxIndex = 0
|
|
result.helper.seenIndices.init(expectedMessageCount)
|
|
result.minLatency = nanos(0)
|
|
result.maxLatency = nanos(0)
|
|
result.cummulativeLatency = nanos(0)
|
|
return result
|
|
|
|
proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
|
|
if self.allMessageCount == 0:
|
|
self.allMessageCount = msg.count
|
|
elif self.allMessageCount != msg.count:
|
|
warn "Message count mismatch at message",
|
|
index = msg.index, expected = self.allMessageCount, got = msg.count
|
|
|
|
if not self.helper.seenIndices.contains(msg.index):
|
|
self.helper.seenIndices.incl(msg.index)
|
|
else:
|
|
inc(self.duplicateCount)
|
|
warn "Duplicate message", index = msg.index
|
|
## just do not count into stats
|
|
return
|
|
|
|
## detect misorder arrival and possible lost messages
|
|
if self.helper.prevIndex + 1 < msg.index:
|
|
inc(self.misorderCount)
|
|
warn "Misordered message arrival",
|
|
index = msg.index, expected = self.helper.prevIndex + 1
|
|
|
|
## collect possible lost message indicies
|
|
for idx in self.helper.prevIndex + 1 ..< msg.index:
|
|
self.helper.lostIndices.incl(idx)
|
|
elif self.helper.prevIndex > msg.index:
|
|
inc(self.lateCount)
|
|
warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1
|
|
else:
|
|
## may remove late arrival
|
|
self.helper.lostIndices.excl(msg.index)
|
|
|
|
## calculate latency
|
|
let currentArrivedAt = Moment.now()
|
|
|
|
let delaySincePrevArrived: Duration = currentArrivedAt - self.helper.prevArrivedAt
|
|
|
|
let expectedDelay: Duration = nanos(msg.sincePrev)
|
|
|
|
var latency: Duration
|
|
|
|
# if we have any latency...
|
|
if expectedDelay > delaySincePrevArrived:
|
|
latency = delaySincePrevArrived - expectedDelay
|
|
if self.minLatency.isZero or (latency < self.minLatency and latency > nanos(0)):
|
|
self.minLatency = latency
|
|
if latency > self.maxLatency:
|
|
self.maxLatency = latency
|
|
self.cummulativeLatency += latency
|
|
else:
|
|
warn "Negative latency detected",
|
|
index = msg.index, expected = expectedDelay, actual = delaySincePrevArrived
|
|
|
|
self.helper.maxIndex = max(self.helper.maxIndex, msg.index)
|
|
self.helper.prevIndex = msg.index
|
|
self.helper.prevArrivedAt = currentArrivedAt
|
|
inc(self.receivedMessages)
|
|
|
|
proc addMessage*(
|
|
self: var PerPeerStatistics, peerId: string, msg: ProtocolTesterMessage
|
|
) =
|
|
if not self.contains(peerId):
|
|
self[peerId] = Statistics.init()
|
|
|
|
discard catch:
|
|
self[peerId].addMessage(msg)
|
|
|
|
proc lossCount*(self: Statistics): uint32 =
|
|
self.helper.maxIndex - self.receivedMessages
|
|
|
|
proc averageLatency*(self: Statistics): Duration =
|
|
if self.receivedMessages == 0:
|
|
return nanos(0)
|
|
return self.cummulativeLatency div self.receivedMessages
|
|
|
|
proc echoStat*(self: Statistics) =
|
|
let printable = catch:
|
|
"""*------------------------------------------------------------------------------------------*
|
|
| Expected | Received | Target | Loss | Misorder | Late | Duplicate |
|
|
|{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} |
|
|
*------------------------------------------------------------------------------------------*
|
|
| Latency stat: |
|
|
| avg latency: {$self.averageLatency():<73}|
|
|
| min latency: {$self.maxLatency:<73}|
|
|
| max latency: {$self.minLatency:<73}|
|
|
*------------------------------------------------------------------------------------------*""".fmt()
|
|
|
|
if printable.isErr():
|
|
echo "Error while printing statistics: " & printable.error().msg
|
|
else:
|
|
echo printable.get()
|
|
|
|
proc jsonStat*(self: Statistics): string =
|
|
let json = catch:
|
|
"""{{"expected":{self.helper.maxIndex},
|
|
"received": {self.receivedMessages},
|
|
"target": {self.allMessageCount},
|
|
"loss": {self.lossCount()},
|
|
"misorder": {self.misorderCount},
|
|
"late": {self.lateCount},
|
|
"duplicate": {self.duplicateCount},
|
|
"latency":
|
|
{{"avg": "{self.averageLatency()}",
|
|
"min": "{self.minLatency}",
|
|
"max": "{self.maxLatency}"
|
|
}}
|
|
}}""".fmt()
|
|
if json.isErr:
|
|
return "{\"result:\": \"" & json.error.msg & "\"}"
|
|
|
|
return json.get()
|
|
|
|
proc echoStats*(self: var PerPeerStatistics) =
|
|
for peerId, stats in self.pairs:
|
|
let peerLine = catch:
|
|
"Receiver statistics from peer {peerId}".fmt()
|
|
if peerLine.isErr:
|
|
echo "Error while printing statistics"
|
|
else:
|
|
echo peerLine.get()
|
|
stats.echoStat()
|
|
|
|
proc jsonStats*(self: PerPeerStatistics): string =
|
|
try:
|
|
#!fmt: off
|
|
var json = "{\"statistics\": ["
|
|
var first = true
|
|
for peerId, stats in self.pairs:
|
|
if first:
|
|
first = false
|
|
else:
|
|
json.add(", ")
|
|
json.add("{{\"sender\": \"{peerId}\", \"stat\":".fmt())
|
|
json.add(stats.jsonStat())
|
|
json.add("}")
|
|
json.add("]}")
|
|
return json
|
|
#!fmt: on
|
|
except CatchableError:
|
|
return
|
|
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
|
|
"\"}"
|
|
|
|
proc checkIfAllMessagesReceived*(self: PerPeerStatistics): bool =
|
|
# if there are no peers have sent messages, assume we just have started.
|
|
if self.len == 0:
|
|
return false
|
|
|
|
for stat in self.values:
|
|
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
|
|
stat.receivedMessages < stat.allMessageCount:
|
|
return false
|
|
|
|
return true
|