From 38bf8ccbecf6e6d6102d6f35e789a57c152dd314 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Fri, 7 Aug 2020 19:22:58 +0300 Subject: [PATCH] Implement tracing of lags in the logs. (#1465) --- beacon_chain/logtrace.nim | 118 ++++++++++++++++++++++++++++++++++---- 1 file changed, 108 insertions(+), 10 deletions(-) diff --git a/beacon_chain/logtrace.nim b/beacon_chain/logtrace.nim index 5a94a000d..664e98321 100644 --- a/beacon_chain/logtrace.nim +++ b/beacon_chain/logtrace.nim @@ -12,7 +12,7 @@ const LogTraceName* = "Beacon-Chain LogTrace Tool" LogTraceMajor*: int = 0 LogTraceMinor*: int = 0 - LogTracePatch*: int = 1 + LogTracePatch*: int = 4 LogTraceVersion* = $LogTraceMajor & "." & $LogTraceMinor & "." & $LogTracePatch LogTraceCopyright* = "Copyright(C) 2020" & @@ -23,7 +23,7 @@ const type StartUpCommand* {.pure.} = enum - pubsub, asl, asr + pubsub, asl, asr, lat LogTraceConf* = object logFiles* {. @@ -60,6 +60,11 @@ type desc: "Specifies node names which logs will be used", name: "nodes" }: seq[string] + allowedLag* {. + desc: "Allowed latency lag multiplier", + name: "lag", + defaultValue: 2.0 }: float + case cmd* {.command.}: StartUpCommand of pubsub: discard @@ -67,6 +72,8 @@ type discard of asr: discard + of lat: + discard GossipDirection* = enum None, Incoming, Outgoing @@ -154,14 +161,17 @@ type proc readValue*(reader: var JsonReader, value: var DateTime) = let s = reader.readValue(string) try: - value = parse(s, "YYYY-MM-dd HH:mm:sszzz") + value = parse(s, "YYYY-MM-dd HH:mm:ss'.'fffzzz", utc()) except CatchableError: raiseUnexpectedValue(reader, "Invalid date time") proc init(t: typedesc[GossipMessage], kind: GossipDirection, id, datestr: string): GossipMessage = - result = GossipMessage(kind: kind, id: id, - datetime: parse(datestr, "YYYY-MM-dd HH:mm:sszzz")) + GossipMessage( + kind: kind, + id: id, + datetime: parse(datestr, "YYYY-MM-dd HH:mm:ss'.'fffzzz") + ) proc `$`*(msg: GossipMessage): string = result = msg.id @@ -275,6 +285,41 @@ proc readLogFileForASRMessages(file: string, srnode: var SRANode, finally: stream.close() +proc readLogFileForSecondMessages(file: string, ignoreErrors = true, + dumpErrors = false): seq[LogMessage] = + var stream = newFileStream(file) + var line: string + var counter = 0 + try: + while not (stream.atEnd()): + var m: LogMessage + line = stream.readLine() + inc(counter) + try: + m = Json.decode(line, LogMessage, allowUnknownFields = true) + except SerializationError as exc: + if dumpErrors: + error "Serialization error while reading file, ignoring", file = file, + line_number = counter, errorMsg = exc.formatMsg(line) + else: + error "Serialization error while reading file, ignoring", file = file, + line_number = counter + if not(ignoreErrors): + raise exc + else: + continue + if m.msg == "onSecond task completed": + result.add(m) + + if counter mod 10_000 == 0: + info "Processing file", file = extractFilename(file), + lines_processed = counter, + seconds_filtered = len(result) + except CatchableError as exc: + warn "Error reading data from file", file = file, errorMsg = exc.msg + finally: + stream.close() + proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] = # Because of times.DateTime object we forced to turn off [ProveInit] warnings # You can remove this pragmas when Nim compiler or times.nim will be fixed. @@ -416,7 +461,9 @@ proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) = for item in logFiles: info "Processing log file", logFile = item - let data = readLogFileForAttsMessages(item) + let data = readLogFileForAttsMessages(item, + logConf.ignoreSerializationErrors, + logConf.dumpSerializationErrors) var currentSlot: Option[SlotStartMessage] for item in data: @@ -460,7 +507,9 @@ proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) = for logfile in node.logs: let path = node.path & DirSep & logfile info "Processing node's logfile", node = node.name, logfile = path - readLogFileForASRMessages(path, srnode) + readLogFileForASRMessages(path, srnode, + logConf.ignoreSerializationErrors, + logConf.dumpSerializationErrors) srnodes.add(srnode) if len(nodes) < 2: @@ -490,6 +539,52 @@ proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) = sucessfull_broadcasts = success, failed_broadcasts = failed, total_broadcasts = len(srnodes[i].sends) +proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string], + nodes: seq[NodeDirectory]) = + info "Check for async responsiveness" + if len(nodes) == 0 and len(logFiles) == 0: + error "Number of log files are not enough", nodes_count = len(nodes) + quit(1) + + let allowedTime = int64(float(initDuration(seconds = 1).inMilliseconds()) * + logConf.allowedLag) + + for logFile in logFiles: + info "Processing log file", logfile = logFile + let msgs = readLogFileForSecondMessages(logFile, + logConf.ignoreSerializationErrors, + logConf.dumpSerializationErrors) + var lastSecond: Option[LogMessage] + var minEntry: Option[LogMessage] + var maxEntry: Option[LogMessage] + var minTime: times.Duration = initDuration(days = 1) + var maxTime: times.Duration + var sumMilliseconds: int64 + + for item in msgs: + if lastSecond.isNone(): + lastSecond = some(item) + else: + let time = item.timestamp - lastSecond.get().timestamp + let start_time = lastSecond.get().timestamp + let finish_time = item.timestamp + if time.inMilliseconds() > allowedTime: + info "Found time lag ", + start_time = start_time.format("yyyy-MM-dd HH:mm:ss'.'fff"), + finish_time = finish_time.format("yyyy-MM-dd HH:mm:ss'.'fff"), + lag_time = time + if time < minTime: + minTime = time + minEntry = some(item) + if time > maxTime: + maxTime = time + maxEntry = some(item) + sumMilliseconds += time.inMilliseconds() + lastSecond = some(item) + let avgTime = initDuration(milliseconds = sumMilliseconds div len(msgs)) + info "Latency statistics", min_time = minTime, max_time = maxTime, + avg_time = avgTime, seconds_count = len(msgs) + proc run(conf: LogTraceConf) = var logFiles: seq[string] var logNodes: seq[NodeDirectory] @@ -515,12 +610,15 @@ proc run(conf: LogTraceConf) = error "Log file sources not specified or not enough log files found" quit(1) - if conf.cmd == StartUpCommand.pubsub: + case conf.cmd + of StartUpCommand.pubsub: runPubsub(conf, logFiles) - elif conf.cmd == StartUpCommand.asl: + of StartUpCommand.asl: runAttSend(conf, logFiles) - elif conf.cmd == StartUpCommand.asr: + of StartUpCommand.asr: runAttSendReceive(conf, logNodes) + of StartUpCommand.lat: + runLatencyCheck(conf, logFiles, logNodes) when isMainModule: echo LogTraceHeader