Implement tracing of lags in the logs. (#1465)
This commit is contained in:
parent
84a501d1ff
commit
38bf8ccbec
|
@ -12,7 +12,7 @@ const
|
||||||
LogTraceName* = "Beacon-Chain LogTrace Tool"
|
LogTraceName* = "Beacon-Chain LogTrace Tool"
|
||||||
LogTraceMajor*: int = 0
|
LogTraceMajor*: int = 0
|
||||||
LogTraceMinor*: int = 0
|
LogTraceMinor*: int = 0
|
||||||
LogTracePatch*: int = 1
|
LogTracePatch*: int = 4
|
||||||
LogTraceVersion* = $LogTraceMajor & "." & $LogTraceMinor & "." &
|
LogTraceVersion* = $LogTraceMajor & "." & $LogTraceMinor & "." &
|
||||||
$LogTracePatch
|
$LogTracePatch
|
||||||
LogTraceCopyright* = "Copyright(C) 2020" &
|
LogTraceCopyright* = "Copyright(C) 2020" &
|
||||||
|
@ -23,7 +23,7 @@ const
|
||||||
|
|
||||||
type
|
type
|
||||||
StartUpCommand* {.pure.} = enum
|
StartUpCommand* {.pure.} = enum
|
||||||
pubsub, asl, asr
|
pubsub, asl, asr, lat
|
||||||
|
|
||||||
LogTraceConf* = object
|
LogTraceConf* = object
|
||||||
logFiles* {.
|
logFiles* {.
|
||||||
|
@ -60,6 +60,11 @@ type
|
||||||
desc: "Specifies node names which logs will be used",
|
desc: "Specifies node names which logs will be used",
|
||||||
name: "nodes" }: seq[string]
|
name: "nodes" }: seq[string]
|
||||||
|
|
||||||
|
allowedLag* {.
|
||||||
|
desc: "Allowed latency lag multiplier",
|
||||||
|
name: "lag",
|
||||||
|
defaultValue: 2.0 }: float
|
||||||
|
|
||||||
case cmd* {.command.}: StartUpCommand
|
case cmd* {.command.}: StartUpCommand
|
||||||
of pubsub:
|
of pubsub:
|
||||||
discard
|
discard
|
||||||
|
@ -67,6 +72,8 @@ type
|
||||||
discard
|
discard
|
||||||
of asr:
|
of asr:
|
||||||
discard
|
discard
|
||||||
|
of lat:
|
||||||
|
discard
|
||||||
|
|
||||||
GossipDirection* = enum
|
GossipDirection* = enum
|
||||||
None, Incoming, Outgoing
|
None, Incoming, Outgoing
|
||||||
|
@ -154,14 +161,17 @@ type
|
||||||
proc readValue*(reader: var JsonReader, value: var DateTime) =
|
proc readValue*(reader: var JsonReader, value: var DateTime) =
|
||||||
let s = reader.readValue(string)
|
let s = reader.readValue(string)
|
||||||
try:
|
try:
|
||||||
value = parse(s, "YYYY-MM-dd HH:mm:sszzz")
|
value = parse(s, "YYYY-MM-dd HH:mm:ss'.'fffzzz", utc())
|
||||||
except CatchableError:
|
except CatchableError:
|
||||||
raiseUnexpectedValue(reader, "Invalid date time")
|
raiseUnexpectedValue(reader, "Invalid date time")
|
||||||
|
|
||||||
proc init(t: typedesc[GossipMessage], kind: GossipDirection, id,
|
proc init(t: typedesc[GossipMessage], kind: GossipDirection, id,
|
||||||
datestr: string): GossipMessage =
|
datestr: string): GossipMessage =
|
||||||
result = GossipMessage(kind: kind, id: id,
|
GossipMessage(
|
||||||
datetime: parse(datestr, "YYYY-MM-dd HH:mm:sszzz"))
|
kind: kind,
|
||||||
|
id: id,
|
||||||
|
datetime: parse(datestr, "YYYY-MM-dd HH:mm:ss'.'fffzzz")
|
||||||
|
)
|
||||||
|
|
||||||
proc `$`*(msg: GossipMessage): string =
|
proc `$`*(msg: GossipMessage): string =
|
||||||
result = msg.id
|
result = msg.id
|
||||||
|
@ -275,6 +285,41 @@ proc readLogFileForASRMessages(file: string, srnode: var SRANode,
|
||||||
finally:
|
finally:
|
||||||
stream.close()
|
stream.close()
|
||||||
|
|
||||||
|
proc readLogFileForSecondMessages(file: string, ignoreErrors = true,
                                  dumpErrors = false): seq[LogMessage] =
  ## Reads `file` line by line, decodes every line as a JSON `LogMessage` and
  ## returns those whose `msg` field equals "onSecond task completed".
  ##
  ## * `ignoreErrors` - when `true`, a line that fails to decode is logged and
  ##   skipped. When `false`, the decoding error is re-raised; it is then
  ##   caught by the outer handler below, so reading of this file stops and
  ##   the records collected so far are returned.
  ## * `dumpErrors` - when `true`, the formatted decoding error is included in
  ##   the log record.
  ##
  ## A file that cannot be opened produces a warning and an empty result.
  let stream = newFileStream(file)
  if isNil(stream):
    # `newFileStream(filename)` returns nil instead of raising when the file
    # cannot be opened; touching the stream would crash with a nil access.
    warn "Error reading data from file", file = file,
         errorMsg = "unable to open file for reading"
    return

  var counter = 0
  try:
    while not(stream.atEnd()):
      let line = stream.readLine()
      inc(counter)

      var m: LogMessage
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "onSecond task completed":
        result.add(m)

      # Periodic progress report; lines that failed to decode skip it.
      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             seconds_filtered = len(result)
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()
|
||||||
|
|
||||||
proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
|
proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
|
||||||
# Because of times.DateTime object we forced to turn off [ProveInit] warnings
|
# Because of times.DateTime object we forced to turn off [ProveInit] warnings
|
||||||
# You can remove this pragmas when Nim compiler or times.nim will be fixed.
|
# You can remove this pragmas when Nim compiler or times.nim will be fixed.
|
||||||
|
@ -416,7 +461,9 @@ proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
|
||||||
|
|
||||||
for item in logFiles:
|
for item in logFiles:
|
||||||
info "Processing log file", logFile = item
|
info "Processing log file", logFile = item
|
||||||
let data = readLogFileForAttsMessages(item)
|
let data = readLogFileForAttsMessages(item,
|
||||||
|
logConf.ignoreSerializationErrors,
|
||||||
|
logConf.dumpSerializationErrors)
|
||||||
|
|
||||||
var currentSlot: Option[SlotStartMessage]
|
var currentSlot: Option[SlotStartMessage]
|
||||||
for item in data:
|
for item in data:
|
||||||
|
@ -460,7 +507,9 @@ proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
|
||||||
for logfile in node.logs:
|
for logfile in node.logs:
|
||||||
let path = node.path & DirSep & logfile
|
let path = node.path & DirSep & logfile
|
||||||
info "Processing node's logfile", node = node.name, logfile = path
|
info "Processing node's logfile", node = node.name, logfile = path
|
||||||
readLogFileForASRMessages(path, srnode)
|
readLogFileForASRMessages(path, srnode,
|
||||||
|
logConf.ignoreSerializationErrors,
|
||||||
|
logConf.dumpSerializationErrors)
|
||||||
srnodes.add(srnode)
|
srnodes.add(srnode)
|
||||||
|
|
||||||
if len(nodes) < 2:
|
if len(nodes) < 2:
|
||||||
|
@ -490,6 +539,52 @@ proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
|
||||||
sucessfull_broadcasts = success, failed_broadcasts = failed,
|
sucessfull_broadcasts = success, failed_broadcasts = failed,
|
||||||
total_broadcasts = len(srnodes[i].sends)
|
total_broadcasts = len(srnodes[i].sends)
|
||||||
|
|
||||||
|
proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string],
                     nodes: seq[NodeDirectory]) =
  ## For every file in `logFiles`, collects the "onSecond task completed"
  ## records and measures the time between each pair of consecutive records.
  ##
  ## A gap longer than one second multiplied by `logConf.allowedLag` is
  ## reported as a time lag. After a file is processed, the minimum, maximum
  ## and average gap are logged.
  ##
  ## Files with fewer than two matching records contain no gap to measure;
  ## they are reported with a warning and skipped.
  ##
  ## NOTE(review): `nodes` only takes part in the "nothing to process" check;
  ## log files that belong to node directories are not scanned here.
  info "Check for async responsiveness"
  if len(nodes) == 0 and len(logFiles) == 0:
    error "Number of log files are not enough", nodes_count = len(nodes)
    quit(1)

  # Longest acceptable gap between two consecutive records, in milliseconds.
  let allowedTime = int64(float(initDuration(seconds = 1).inMilliseconds()) *
                          logConf.allowedLag)

  for logFile in logFiles:
    info "Processing log file", logfile = logFile
    let msgs = readLogFileForSecondMessages(logFile,
                                            logConf.ignoreSerializationErrors,
                                            logConf.dumpSerializationErrors)
    if len(msgs) < 2:
      # No interval can be formed; computing an average would divide by zero.
      warn "Not enough onSecond records to measure latency",
           logfile = logFile, seconds_count = len(msgs)
      continue

    # Seed the extremes with the first gap so they always hold measured data.
    let firstGap = msgs[1].timestamp - msgs[0].timestamp
    var minTime: times.Duration = firstGap
    var maxTime: times.Duration = firstGap
    var sumMilliseconds: int64

    for i in 1 ..< len(msgs):
      let prevStamp = msgs[i - 1].timestamp
      let currStamp = msgs[i].timestamp
      let gap = currStamp - prevStamp
      if gap.inMilliseconds() > allowedTime:
        info "Found time lag ",
             start_time = prevStamp.format("yyyy-MM-dd HH:mm:ss'.'fff"),
             finish_time = currStamp.format("yyyy-MM-dd HH:mm:ss'.'fff"),
             lag_time = gap
      if gap < minTime:
        minTime = gap
      if gap > maxTime:
        maxTime = gap
      sumMilliseconds += gap.inMilliseconds()

    # N records produce N - 1 gaps, so that is the divisor for the average.
    let avgTime = initDuration(
      milliseconds = sumMilliseconds div int64(len(msgs) - 1))
    info "Latency statistics", min_time = minTime, max_time = maxTime,
         avg_time = avgTime, seconds_count = len(msgs)
|
||||||
|
|
||||||
proc run(conf: LogTraceConf) =
|
proc run(conf: LogTraceConf) =
|
||||||
var logFiles: seq[string]
|
var logFiles: seq[string]
|
||||||
var logNodes: seq[NodeDirectory]
|
var logNodes: seq[NodeDirectory]
|
||||||
|
@ -515,12 +610,15 @@ proc run(conf: LogTraceConf) =
|
||||||
error "Log file sources not specified or not enough log files found"
|
error "Log file sources not specified or not enough log files found"
|
||||||
quit(1)
|
quit(1)
|
||||||
|
|
||||||
if conf.cmd == StartUpCommand.pubsub:
|
case conf.cmd
|
||||||
|
of StartUpCommand.pubsub:
|
||||||
runPubsub(conf, logFiles)
|
runPubsub(conf, logFiles)
|
||||||
elif conf.cmd == StartUpCommand.asl:
|
of StartUpCommand.asl:
|
||||||
runAttSend(conf, logFiles)
|
runAttSend(conf, logFiles)
|
||||||
elif conf.cmd == StartUpCommand.asr:
|
of StartUpCommand.asr:
|
||||||
runAttSendReceive(conf, logNodes)
|
runAttSendReceive(conf, logNodes)
|
||||||
|
of StartUpCommand.lat:
|
||||||
|
runLatencyCheck(conf, logFiles, logNodes)
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
echo LogTraceHeader
|
echo LogTraceHeader
|
||||||
|
|
Loading…
Reference in New Issue