nwaku/apps/liteprotocoltester/statistics.nim

203 lines
6.4 KiB
Nim
Raw Normal View History

{.push raises: [].}
import
std/[sets, tables, strutils, sequtils, options, strformat],
chronos/timer as chtimer,
chronicles,
results
import ./tester_message
type
StatHelper = object
prevIndex: uint32
prevArrivedAt: Moment
lostIndices: HashSet[uint32]
seenIndices: HashSet[uint32]
maxIndex: uint32
Statistics* = object
allMessageCount*: uint32
receivedMessages*: uint32
misorderCount*: uint32
lateCount*: uint32
duplicateCount*: uint32
minLatency*: Duration
maxLatency*: Duration
cummulativeLatency: Duration
helper: StatHelper
PerPeerStatistics* = Table[string, Statistics]
func `$`*(a: Duration): string {.inline.} =
## Original stringify implementation from chronos/timer.nim is not capable of printing 0ns
## Returns string representation of Duration ``a`` as nanoseconds value.
if a.isZero:
return "0ns"
return chtimer.`$`(a)
proc init*(T: type Statistics, expectedMessageCount: int = 1000): T =
result.helper.prevIndex = 0
result.helper.maxIndex = 0
result.helper.seenIndices.init(expectedMessageCount)
result.minLatency = nanos(0)
result.maxLatency = nanos(0)
result.cummulativeLatency = nanos(0)
return result
proc addMessage*(self: var Statistics, msg: ProtocolTesterMessage) =
if self.allMessageCount == 0:
self.allMessageCount = msg.count
elif self.allMessageCount != msg.count:
warn "Message count mismatch at message",
index = msg.index, expected = self.allMessageCount, got = msg.count
if not self.helper.seenIndices.contains(msg.index):
self.helper.seenIndices.incl(msg.index)
else:
inc(self.duplicateCount)
warn "Duplicate message", index = msg.index
## just do not count into stats
return
## detect misorder arrival and possible lost messages
if self.helper.prevIndex + 1 < msg.index:
inc(self.misorderCount)
warn "Misordered message arrival",
index = msg.index, expected = self.helper.prevIndex + 1
## collect possible lost message indicies
for idx in self.helper.prevIndex + 1 ..< msg.index:
self.helper.lostIndices.incl(idx)
elif self.helper.prevIndex > msg.index:
inc(self.lateCount)
warn "Late message arrival", index = msg.index, expected = self.helper.prevIndex + 1
else:
## may remove late arrival
self.helper.lostIndices.excl(msg.index)
## calculate latency
let currentArrivedAt = Moment.now()
let delaySincePrevArrived: Duration = currentArrivedAt - self.helper.prevArrivedAt
let expectedDelay: Duration = nanos(msg.sincePrev)
var latency: Duration
# if we have any latency...
if expectedDelay > delaySincePrevArrived:
latency = delaySincePrevArrived - expectedDelay
if self.minLatency.isZero or (latency < self.minLatency and latency > nanos(0)):
self.minLatency = latency
if latency > self.maxLatency:
self.maxLatency = latency
self.cummulativeLatency += latency
else:
warn "Negative latency detected",
index = msg.index, expected = expectedDelay, actual = delaySincePrevArrived
self.helper.maxIndex = max(self.helper.maxIndex, msg.index)
self.helper.prevIndex = msg.index
self.helper.prevArrivedAt = currentArrivedAt
inc(self.receivedMessages)
proc addMessage*(
self: var PerPeerStatistics, peerId: string, msg: ProtocolTesterMessage
) =
if not self.contains(peerId):
self[peerId] = Statistics.init()
discard catch:
self[peerId].addMessage(msg)
proc lossCount*(self: Statistics): uint32 =
self.helper.maxIndex - self.receivedMessages
proc averageLatency*(self: Statistics): Duration =
if self.receivedMessages == 0:
return nanos(0)
return self.cummulativeLatency div self.receivedMessages
proc echoStat*(self: Statistics) =
let printable = catch:
"""*------------------------------------------------------------------------------------------*
| Expected | Received | Target | Loss | Misorder | Late | Duplicate |
|{self.helper.maxIndex:>11} |{self.receivedMessages:>11} |{self.allMessageCount:>11} |{self.lossCount():>11} |{self.misorderCount:>11} |{self.lateCount:>11} |{self.duplicateCount:>11} |
*------------------------------------------------------------------------------------------*
| Latency stat: |
| avg latency: {$self.averageLatency():<73}|
| min latency: {$self.maxLatency:<73}|
| max latency: {$self.minLatency:<73}|
*------------------------------------------------------------------------------------------*""".fmt()
if printable.isErr():
echo "Error while printing statistics: " & printable.error().msg
else:
echo printable.get()
proc jsonStat*(self: Statistics): string =
let json = catch:
"""{{"expected":{self.helper.maxIndex},
"received": {self.receivedMessages},
"target": {self.allMessageCount},
"loss": {self.lossCount()},
"misorder": {self.misorderCount},
"late": {self.lateCount},
"duplicate": {self.duplicateCount},
"latency":
{{"avg": "{self.averageLatency()}",
"min": "{self.minLatency}",
"max": "{self.maxLatency}"
}}
}}""".fmt()
if json.isErr:
return "{\"result:\": \"" & json.error.msg & "\"}"
return json.get()
proc echoStats*(self: var PerPeerStatistics) =
for peerId, stats in self.pairs:
let peerLine = catch:
"Receiver statistics from peer {peerId}".fmt()
if peerLine.isErr:
echo "Error while printing statistics"
else:
echo peerLine.get()
stats.echoStat()
proc jsonStats*(self: PerPeerStatistics): string =
try:
#!fmt: off
var json = "{\"statistics\": ["
var first = true
for peerId, stats in self.pairs:
if first:
first = false
else:
json.add(", ")
json.add("{{\"sender\": \"{peerId}\", \"stat\":".fmt())
json.add(stats.jsonStat())
json.add("}")
json.add("]}")
return json
#!fmt: on
except CatchableError:
return
"{\"result:\": \"Error while generating json stats: " & getCurrentExceptionMsg() &
"\"}"
proc checkIfAllMessagesReceived*(self: PerPeerStatistics): bool =
# if there are no peers have sent messages, assume we just have started.
if self.len == 0:
return false
for stat in self.values:
if (stat.allMessageCount == 0 and stat.receivedMessages == 0) or
stat.receivedMessages < stat.allMessageCount:
return false
return true