# beacon_chain # Copyright (c) 2018-2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. import confutils, json, times, streams, os, strutils, options, chronicles, tables, sequtils import json_serialization const LogTraceName = "Beacon-Chain LogTrace Tool" LogTraceMajor: int = 0 LogTraceMinor: int = 0 LogTracePatch: int = 4 LogTraceVersion = $LogTraceMajor & "." & $LogTraceMinor & "." & $LogTracePatch LogTraceCopyright = "Copyright(C) 2021" & " Status Research & Development GmbH" LogTraceHeader = LogTraceName & ", Version " & LogTraceVersion & " [" & hostOS & ": " & hostCPU & "]\r\n" & LogTraceCopyright & "\r\n" type StartUpCommand {.pure.} = enum pubsub, asl, asr, aggasr, scmsr, csr, lat, traceAll LogTraceConf = object logFiles {. desc: "Specifies one or more log files", abbr: "f", name: "log-file" }: seq[string] simDir {. desc: "Specifies path to eth2_network_simulation directory", name: "sim-dir", defaultValue: "" }: string netDir {. desc: "Specifies path to network build directory", name: "net-dir", defaultValue: "" }: string logDir {. desc: "Specifies path with bunch of logs", name: "log-dir", defaultValue: "" }: string ignoreSerializationErrors {. desc: "Ignore serialization errors while parsing log files", name: "ignore-errors", defaultValue: true }: bool dumpSerializationErrors {. desc: "Dump full serialization errors while parsing log files", name: "dump-errors", defaultValue: false }: bool nodes {. desc: "Specifies node names which logs will be used", name: "nodes" }: seq[string] allowedLag {. desc: "Allowed latency lag multiplier", name: "lag", defaultValue: 2.0 }: float case cmd {.command.}: StartUpCommand of pubsub: discard of asl: discard of asr: discard of aggasr: discard of scmsr: discard of csr: discard of lat: discard of traceAll: discard GossipDirection = enum None, Incoming, Outgoing NodeDirectory = object name: string path: string logs: seq[string] LogMessage = object of RootObj level {.serializedFieldName: "lvl" .}: string timestamp {.serializedFieldName: "ts" .}: DateTime msg: string topics: string tid: int SlotStartMessage = object of LogMessage beaconTime: uint64 finalizedEpoch: uint64 finalizedRoot: string finalizedSlot: uint64 headEpoch: uint64 headRoot: string headSlot: uint64 lastSlot: uint64 peers: uint64 scheduledSlot: uint64 AttestationDataObject = object slot: uint64 index: uint64 beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string sourceEpoch {.serializedFieldName: "source_epoch".}: uint64 sourceRoot {.serializedFieldName: "source_root".}: string targetEpoch {.serializedFieldName: "target_epoch".}: uint64 targetRoot {.serializedFieldName: "target_root".}: string AttestationObject = object aggregationBits {.serializedFieldName: "aggregation_bits".}: string data: AttestationDataObject signature: string AttestationSentMessage = object of LogMessage attestation: AttestationObject indexInCommittee: uint64 validator: string AttestationReceivedMessage = object of LogMessage attestation: AttestationObject head: string wallSlot: uint64 pcs: string AggregatedAttestationSentMessage = object of LogMessage attestation: AttestationObject validator: string signature: string aggregationSlot: uint64 AggregatedAttestationReceivedMessage = object of LogMessage aggregate: AttestationObject wallSlot: uint64 signature: string SyncCommitteeMessageObject = object slot: uint64 beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string validatorIndex {.serializedFieldName: "validator_index".}: uint64 signature: string ContributionObject = object slot: uint64 beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string subnetId: uint64 aggregationBits {.serializedFieldName: "aggregation_bits".}: string ContributionMessageObject = object aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64 contribution: ContributionObject selectionProof {.serializedFieldName: "selection_proof".}: string ContributionSentObject = object message: ContributionMessageObject signature: string SCMSentMessage = object of LogMessage message: SyncCommitteeMessageObject validator: string SCMReceivedMessage = object of LogMessage wallSlot: uint64 syncCommitteeMsg: SyncCommitteeMessageObject subcommitteeIdx: uint64 ContributionSentMessage = object of LogMessage contribution: ContributionSentObject ContributionReceivedMessage = object of LogMessage contribution: ContributionObject wallSlot: uint64 aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64 signature: string selectionProof {.serializedFieldName: "selection_proof".}: string GossipMessage = object kind: GossipDirection id: string datetime: DateTime processed: bool SMessageType {.pure.} = enum AttestationSent, SCMSent, SlotStart SlotMessage = object case kind: SMessageType of SMessageType.AttestationSent: asmsg: AttestationSentMessage of SMessageType.SCMSent: scmsmsg: SCMSentMessage of SMessageType.SlotStart: ssmsg: SlotStartMessage # SlotMessage = object # case kind: SMessageType # of SMessageType.SCMSent: # scmsmsg: SCMSentMessage # of SMessageType.SlotStart: # ssmsg: SlotStartMessage SRANode = object directory: NodeDirectory sends: seq[AttestationSentMessage] recvs: TableRef[string, AttestationReceivedMessage] aggSends: seq[AggregatedAttestationSentMessage] aggRecvs: TableRef[string, AggregatedAttestationReceivedMessage] SRSCNode = object directory: NodeDirectory sends: seq[SCMSentMessage] recvs: TableRef[string, SCMReceivedMessage] contributionSends: seq[ContributionSentMessage] contributionRecvs: TableRef[string, ContributionReceivedMessage] proc readValue(reader: var JsonReader, value: var DateTime) = let s = reader.readValue(string) try: value = parse(s, "YYYY-MM-dd HH:mm:ss'.'fffzzz", utc()) except CatchableError: raiseUnexpectedValue(reader, "Invalid date time") proc init(t: typedesc[GossipMessage], kind: GossipDirection, id, datestr: string): GossipMessage = GossipMessage( kind: kind, id: id, datetime: parse(datestr, "YYYY-MM-dd HH:mm:ss'.'fffzzz") ) func `$`(msg: GossipMessage): string = msg.id proc readLogFile(file: string): seq[JsonNode] = var res = newSeq[JsonNode]() var stream = newFileStream(file) try: while not(stream.atEnd()): var line = stream.readLine() let node = parseJson(line) res.add(node) result = res except CatchableError as exc: warn "Error reading JSON data from file", file = file, errorMsg = exc.msg finally: stream.close() proc readLogFileForAttsMessages(file: string, ignoreErrors = true, dumpErrors = false): seq[SlotMessage] = var res = newSeq[SlotMessage]() var stream = newFileStream(file) var line: string var counter = 0 try: while not(stream.atEnd()): line = stream.readLine() inc(counter) var m: LogMessage try: m = Json.decode(line, LogMessage, allowUnknownFields = true) except SerializationError as exc: if dumpErrors: error "Serialization error while reading file, ignoring", file = file, line_number = counter, errorMsg = exc.formatMsg(line) else: error "Serialization error while reading file, ignoring", file = file, line_number = counter if not(ignoreErrors): raise exc else: continue if m.msg == "Attestation sent": let am = Json.decode(line, AttestationSentMessage, allowUnknownFields = true) let m = SlotMessage(kind: SMessageType.AttestationSent, asmsg: am) res.add(m) elif m.msg == "Slot start": let sm = Json.decode(line, SlotStartMessage, allowUnknownFields = true) let m = SlotMessage(kind: SMessageType.SlotStart, ssmsg: sm) res.add(m) if counter mod 10_000 == 0: info "Processing file", file = extractFilename(file), lines_processed = counter, lines_filtered = len(res) result = res except CatchableError as exc: warn "Error reading data from file", file = file, errorMsg = exc.msg finally: stream.close() proc readLogFileForASRMessages(file: string, srnode: var SRANode, ignoreErrors = true, dumpErrors = false) = var stream = newFileStream(file) var line: string var counter = 0 try: while not(stream.atEnd()): var m: LogMessage line = stream.readLine() inc(counter) try: m = Json.decode(line, LogMessage, allowUnknownFields = true) except SerializationError as exc: if dumpErrors: error "Serialization error while reading file, ignoring", file = file, line_number = counter, errorMsg = exc.formatMsg(line) else: error "Serialization error while reading file, ignoring", file = file, line_number = counter if not(ignoreErrors): raise exc else: continue if m.msg == "Attestation sent": let sm = Json.decode(line, AttestationSentMessage, allowUnknownFields = true) srnode.sends.add(sm) elif m.msg == "Attestation received": let rm = Json.decode(line, AttestationReceivedMessage, allowUnknownFields = true) discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm) elif m.msg == "Aggregate received": let rm = Json.decode(line, AggregatedAttestationReceivedMessage, allowUnknownFields = true) discard srnode.aggRecvs.hasKeyOrPut(rm.signature, rm) elif m.msg == "Aggregated attestation sent": let sm = Json.decode(line, AggregatedAttestationSentMessage, allowUnknownFields = true) srnode.aggSends.add(sm) if counter mod 10_000 == 0: info "Processing file", file = extractFilename(file), lines_processed = counter, sends_filtered = len(srnode.sends), recvs_filtered = len(srnode.recvs) except CatchableError as exc: warn "Error reading data from file", file = file, errorMsg = exc.msg finally: stream.close() proc readLogFileForSCMSendMessages(file: string, ignoreErrors = true, dumpErrors = false): seq[SlotMessage] = var res = newSeq[SlotMessage]() var stream = newFileStream(file) var line: string var counter = 0 try: while not(stream.atEnd()): line = stream.readLine() inc(counter) var m: LogMessage try: m = Json.decode(line, LogMessage, allowUnknownFields = true) except SerializationError as exc: if dumpErrors: error "Serialization error while reading file, ignoring", file = file, line_number = counter, errorMsg = exc.formatMsg(line) else: error "Serialization error while reading file, ignoring", file = file, line_number = counter if not(ignoreErrors): raise exc else: continue if m.msg == "Sync committee message sent": let scmm = Json.decode(line, SCMSentMessage, allowUnknownFields = true) let m = SlotMessage(kind: SMessageType.SCMSent, scmsmsg: scmm) res.add(m) elif m.msg == "Slot start": let sm = Json.decode(line, SlotStartMessage, allowUnknownFields = true) let m = SlotMessage(kind: SMessageType.SlotStart, ssmsg: sm) res.add(m) if counter mod 10_000 == 0: info "Processing file", file = extractFilename(file), lines_processed = counter, lines_filtered = len(res) result = res except CatchableError as exc: warn "Error reading data from file", file = file, errorMsg = exc.msg finally: stream.close() proc readLogFileForSCMSRMessages(file: string, srnode: var SRSCNode, ignoreErrors = true, dumpErrors = false) = var stream = newFileStream(file) var line: string var counter = 0 try: while not(stream.atEnd()): var m: LogMessage line = stream.readLine() inc(counter) try: m = Json.decode(line, LogMessage, allowUnknownFields = true) except SerializationError as exc: if dumpErrors: error "Serialization error while reading file, ignoring", file = file, line_number = counter, errorMsg = exc.formatMsg(line) else: error "Serialization error while reading file, ignoring", file = file, line_number = counter if not(ignoreErrors): raise exc else: continue if m.msg == "Sync committee message sent": let sm = Json.decode(line, SCMSentMessage, allowUnknownFields = true) srnode.sends.add(sm) elif m.msg == "Sync committee message received": let rm = Json.decode(line, SCMReceivedMessage, allowUnknownFields = true) discard srnode.recvs.hasKeyOrPut(rm.syncCommitteeMsg.signature, rm) elif m.msg == "Contribution received": let rm = Json.decode(line, ContributionReceivedMessage, allowUnknownFields = true) discard srnode.contributionRecvs.hasKeyOrPut(rm.signature, rm) elif m.msg == "Contribution sent": let sm = Json.decode(line, ContributionSentMessage, allowUnknownFields = true) srnode.contributionSends.add(sm) if counter mod 10_000 == 0: info "Processing file", file = extractFilename(file), lines_processed = counter, sends_filtered = len(srnode.sends), recvs_filtered = len(srnode.recvs) except CatchableError as exc: warn "Error reading data from file", file = file, errorMsg = exc.msg finally: stream.close() proc readLogFileForSecondMessages(file: string, ignoreErrors = true, dumpErrors = false): seq[LogMessage] = var stream = newFileStream(file) var line: string var counter = 0 try: while not (stream.atEnd()): var m: LogMessage line = stream.readLine() inc(counter) try: m = Json.decode(line, LogMessage, allowUnknownFields = true) except SerializationError as exc: if dumpErrors: error "Serialization error while reading file, ignoring", file = file, line_number = counter, errorMsg = exc.formatMsg(line) else: error "Serialization error while reading file, ignoring", file = file, line_number = counter if not(ignoreErrors): raise exc else: continue if m.msg == "onSecond task completed": result.add(m) if counter mod 10_000 == 0: info "Processing file", file = extractFilename(file), lines_processed = counter, seconds_filtered = len(result) except CatchableError as exc: warn "Error reading data from file", file = file, errorMsg = exc.msg finally: stream.close() proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] = # Because of times.DateTime object we forced to turn off [ProveInit] warnings # You can remove this pragmas when Nim compiler or times.nim will be fixed. {.push warning[ProveInit]: off.} result = newSeq[GossipMessage]() {.pop.} for node in log: if ("msg" in node) and ("message_id" in node) and ("ts" in node): let message = node["msg"].getStr() if message == "Incoming pubsub message received": let msg = GossipMessage.init(Incoming, node["message_id"].getStr(), node["ts"].getStr()) result.add(msg) elif message == "Outgoing pubsub message has been sent": let msg = GossipMessage.init(Outgoing, node["message_id"].getStr(), node["ts"].getStr()) result.add(msg) iterator simDirectoryLogFiles(simdir: string): string = let absPath = absolutePath(simdir) let dataPath = absPath & DirSep & "data" if not dirExists(dataPath): error "Invalid `eth2_network_simulation` data directory structure", path = dataPath quit(1) var index = 0 while true: let path = dataPath & DirSep & "node-" & $index & DirSep let simplePath = path & "beacon_node.log" let bootPath = path & "bootstrap_node.log" if fileExists(simplePath): yield simplePath elif fileExists(bootPath): yield bootPath else: break inc(index) proc getDirectoryLogFiles(builddir: string, filter: seq[string]): seq[NodeDirectory] = var res = newSeq[NodeDirectory]() let absPath = absolutePath(builddir) let dataPath = absPath & DirSep & "data" if not dirExists(dataPath): error "Invalid `network` data directory structure", path = dataPath quit(1) for dirPath in walkDirs(dataPath & DirSep & "*"): let name = extractFilename(dirPath) if (len(filter) == 0) or (name in filter): var nodeDir = NodeDirectory(name: extractFilename(dirPath), path: dirPath) for filePath in walkFiles(dirPath & DirSep & "*.log"): nodeDir.logs.add(extractFilename(filePath)) if len(nodeDir.logs) > 0: res.add(nodeDir) return res proc getLogFiles(builddir: string, filter: seq[string]): seq[NodeDirectory] = var res = newSeq[NodeDirectory]() let dataPath = absolutePath(builddir) if not dirExists(dataPath): error "Logs directory did not exist", path = dataPath quit(1) for filePath in walkFiles(dataPath & DirSep & "*.*"): let name = extractFilename(filePath) if (len(filter) == 0) or (name in filter): let nodeDir = NodeDirectory(name: extractFilename(filePath), path: dataPath, logs: @[extractFilename(filePath)]) res.add(nodeDir) return res func getMessage(logs: seq[GossipMessage], msg: GossipMessage): Option[GossipMessage] = {.push warning[ProveInit]: off.} result = none[GossipMessage]() {.pop.} for i in 0 ..< len(logs): if logs[i].kind == Incoming and logs[i].id == msg.id: {.push warning[ProveInit]: off.} result = some(logs[i]) {.pop.} proc runPubsub(logConf: LogTraceConf, logFiles: seq[string]) = var logs = newSeq[tuple[name: string, data: seq[GossipMessage]]]() if len(logFiles) < 2: error "Number of log files insufficient to process pubsub messages", logs_count = len(logFiles) quit(1) for item in logFiles: let data = filterGossipMessages(readLogFile(item)) logs.add((name: item, data: data)) info "Loaded log file", logfile = item, lines_count = len(data) {.push warning[ProveInit]: off.} var checks = newSeq[Option[GossipMessage]](len(logs)) {.pop.} var misses = 0 for i in 0 ..< len(logs): info "Processing log file", logfile = logs[i].name for k in 0 ..< len(logs[i].data): let item = logs[i].data[k] if item.kind == Outgoing: info "Searching message", message_id = $item, logfile = logs[i].name checks[i] = some(item) for z in 1 ..< len(logs): let index = (i + z) mod len(logs) checks[index] = getMessage(logs[index].data, item) for z in 1 ..< len(checks): let index = (i + z) mod len(logs) if not(checks[index].isSome()): warn "Message not found in log", logfile = logs[index].name, message_id = $item inc(misses) if misses == 0: info "No missing messages found" else: warn "Number of missing messages found", count = $misses proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) = info "Check for late `attestation sent` messages" if len(logFiles) < 1: error "Number of log files insufficient to process pubsub messages", logs_count = len(logFiles) quit(1) let minDuration = initDuration(seconds = 4) var slotMessagesCount = 0 var attsMessagesCount = 0 var lateAttsMessagesCount = 0 for item in logFiles: info "Processing log file", logFile = item let data = readLogFileForAttsMessages(item, logConf.ignoreSerializationErrors, logConf.dumpSerializationErrors) var currentSlot: Option[SlotStartMessage] for item in data: if item.kind == SMessageType.SlotStart: currentSlot = some(item.ssmsg) inc(slotMessagesCount) elif item.kind == SMessageType.AttestationSent: if currentSlot.isSome(): let attestationTime = currentSlot.get().timestamp - item.asmsg.timestamp if attestationTime > minDuration: warn "Found an attestation that was sent later than necessary", lateness = $attestationTime, slot = currentSlot.get(), attestation = item.asmsg inc(lateAttsMessagesCount) inc(attsMessagesCount) else: warn "`Attestation sent` message appears before `Start slot` message", attestation = item.asmsg info "Check finished", attestation_sent_messages = attsMessagesCount, slot_messages = slotMessagesCount, late_attestation_messages = lateAttsMessagesCount func toSimple(s: seq[string]): string = "[" & s.mapIt("'" & it & "'").join(", ") & "]" proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) = info "Check for attestation sent/received messages" if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) var srnodes = newSeq[SRANode]() for node in nodes: var srnode = SRANode( directory: node, sends: newSeq[AttestationSentMessage](), recvs: newTable[string, AttestationReceivedMessage](), aggSends: newSeq[AggregatedAttestationSentMessage](), aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]() ) info "Processing node", node = node.name for logfile in node.logs: let path = node.path & DirSep & logfile info "Processing node's logfile", node = node.name, logfile = path readLogFileForASRMessages(path, srnode, logConf.ignoreSerializationErrors, logConf.dumpSerializationErrors) srnodes.add(srnode) if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) for i in 0 ..< len(srnodes): var success = 0 var failed = 0 for item in srnodes[i].sends: var k = (i + 1) mod len(srnodes) var misses = newSeq[string]() while k != i: if item.attestation.signature notin srnodes[k].recvs: misses.add(srnodes[k].directory.name) k = (k + 1) mod len(srnodes) if len(misses) == 0: inc(success) else: inc(failed) info "Attestation was not received", sender = srnodes[i].directory.name, signature = item.attestation.signature, receivers = misses.toSimple(), send_stamp = item.timestamp info "Statistics for sender node", sender = srnodes[i].directory.name, successful_broadcasts = success, failed_broadcasts = failed, total_broadcasts = len(srnodes[i].sends) proc runAggAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) = info "Check for aggregate attestation sent/received messages" if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) var srnodes = newSeq[SRANode]() for node in nodes: var srnode = SRANode( directory: node, sends: newSeq[AttestationSentMessage](), recvs: newTable[string, AttestationReceivedMessage](), aggSends: newSeq[AggregatedAttestationSentMessage](), aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]() ) info "Processing node", node = node.name for logfile in node.logs: let path = node.path & DirSep & logfile info "Processing node's logfile", node = node.name, logfile = path readLogFileForASRMessages(path, srnode, logConf.ignoreSerializationErrors, logConf.dumpSerializationErrors) srnodes.add(srnode) if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) for i in 0 ..< len(srnodes): var success = 0 var failed = 0 for item in srnodes[i].aggSends: var k = (i + 1) mod len(srnodes) var misses = newSeq[string]() while k != i: if item.signature notin srnodes[k].aggRecvs: misses.add(srnodes[k].directory.name) k = (k + 1) mod len(srnodes) if len(misses) == 0: inc(success) else: inc(failed) info "Aggregate attestation was not received", sender = srnodes[i].directory.name, signature = item.signature, receivers = misses.toSimple(), send_stamp = item.timestamp info "Statistics for sender node", sender = srnodes[i].directory.name, successful_broadcasts = success, failed_broadcasts = failed, total_broadcasts = len(srnodes[i].aggSends) proc runSCMSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) = info "Check for Sync Committee Message sent/received messages" if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) var srnodes = newSeq[SRSCNode]() for node in nodes: var srnode = SRSCNode( directory: node, sends: newSeq[SCMSentMessage](), recvs: newTable[string, SCMReceivedMessage](), contributionSends: newSeq[ContributionSentMessage](), contributionRecvs: newTable[string, ContributionReceivedMessage]() ) info "Processing node", node = node.name for logfile in node.logs: let path = node.path & DirSep & logfile info "Processing node's logfile", node = node.name, logfile = path readLogFileForSCMSRMessages(path, srnode, logConf.ignoreSerializationErrors, logConf.dumpSerializationErrors) srnodes.add(srnode) if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) for i in 0 ..< len(srnodes): var success = 0 var failed = 0 for item in srnodes[i].sends: var k = (i + 1) mod len(srnodes) var misses = newSeq[string]() while k != i: if item.message.signature notin srnodes[k].recvs: misses.add(srnodes[k].directory.name) k = (k + 1) mod len(srnodes) if len(misses) == 0: inc(success) else: inc(failed) info "Sync committee message was not received", sender = srnodes[i].directory.name, signature = item.message.signature, receivers = misses.toSimple(), send_stamp = item.timestamp info "Statistics for sender node", sender = srnodes[i].directory.name, successful_broadcasts = success, failed_broadcasts = failed, total_broadcasts = len(srnodes[i].sends) proc runContributionSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) = info "Check for contribution sent/received messages" if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) var srnodes = newSeq[SRSCNode]() for node in nodes: var srnode = SRSCNode( directory: node, sends: newSeq[SCMSentMessage](), recvs: newTable[string, SCMReceivedMessage](), contributionSends: newSeq[ContributionSentMessage](), contributionRecvs: newTable[string, ContributionReceivedMessage]() ) info "Processing node", node = node.name for logfile in node.logs: let path = node.path & DirSep & logfile info "Processing node's logfile", node = node.name, logfile = path readLogFileForSCMSRMessages(path, srnode, logConf.ignoreSerializationErrors, logConf.dumpSerializationErrors) srnodes.add(srnode) if len(nodes) < 2: error "Number of nodes' log files insufficient", nodes_count = len(nodes) quit(1) for i in 0 ..< len(srnodes): var success = 0 var failed = 0 for item in srnodes[i].contributionSends: var k = (i + 1) mod len(srnodes) var misses = newSeq[string]() while k != i: if item.contribution.signature notin srnodes[k].contributionRecvs: misses.add(srnodes[k].directory.name) k = (k + 1) mod len(srnodes) if len(misses) == 0: inc(success) else: inc(failed) info "Contribution was not received", sender = srnodes[i].directory.name, signature = item.contribution.signature, receivers = misses.toSimple(), send_stamp = item.timestamp info "Statistics for sender node", sender = srnodes[i].directory.name, successful_broadcasts = success, failed_broadcasts = failed, total_broadcasts = len(srnodes[i].contributionSends) proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string], nodes: seq[NodeDirectory]) = info "Check for async responsiveness" if len(nodes) == 0 and len(logFiles) == 0: error "Number of log files insufficient", nodes_count = len(nodes) quit(1) let allowedTime = int64(float(initDuration(seconds = 1).inMilliseconds()) * logConf.allowedLag) for logFile in logFiles: info "Processing log file", logfile = logFile let msgs = readLogFileForSecondMessages(logFile, logConf.ignoreSerializationErrors, logConf.dumpSerializationErrors) var lastSecond: Option[LogMessage] var minEntry: Option[LogMessage] var maxEntry: Option[LogMessage] var minTime: times.Duration = initDuration(days = 1) var maxTime: times.Duration var sumMilliseconds: int64 for item in msgs: if lastSecond.isNone(): lastSecond = some(item) else: let time = item.timestamp - lastSecond.get().timestamp let start_time = lastSecond.get().timestamp let finish_time = item.timestamp if time.inMilliseconds() > allowedTime: info "Found time lag ", start_time = start_time.format("yyyy-MM-dd HH:mm:ss'.'fff"), finish_time = finish_time.format("yyyy-MM-dd HH:mm:ss'.'fff"), lag_time = time if time < minTime: minTime = time minEntry = some(item) if time > maxTime: maxTime = time maxEntry = some(item) sumMilliseconds += time.inMilliseconds() lastSecond = some(item) let avgTime = initDuration(milliseconds = sumMilliseconds div len(msgs)) info "Latency statistics", min_time = minTime, max_time = maxTime, avg_time = avgTime, seconds_count = len(msgs) proc run(conf: LogTraceConf) = var logFiles: seq[string] var logNodes: seq[NodeDirectory] if len(conf.logFiles) > 0: for item in conf.logFiles: let absPath = absolutePath(item) if fileExists(absPath): logFiles.add(absPath) if len(conf.simDir) > 0: for item in simDirectoryLogFiles(conf.simDir): logFiles.add(item) logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes) if len(conf.netDir) > 0: logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes) if len(conf.logDir) > 0: logNodes = getLogFiles(conf.logDir, conf.nodes) if len(logFiles) == 0 and len(logNodes) == 0: error "Log file sources not specified or not enough log files found" quit(1) case conf.cmd of StartUpCommand.pubsub: runPubsub(conf, logFiles) of StartUpCommand.asl: runAttSend(conf, logFiles) of StartUpCommand.asr: runAttSendReceive(conf, logNodes) of StartUpCommand.aggasr: runAggAttSendReceive(conf, logNodes) of StartUpCommand.scmsr: runSCMSendReceive(conf, logNodes) of StartUpCommand.csr: runContributionSendReceive(conf, logNodes) of StartUpCommand.lat: runLatencyCheck(conf, logFiles, logNodes) of StartUpCommand.traceAll: runContributionSendReceive(conf, logNodes) runSCMSendReceive(conf, logNodes) runAggAttSendReceive(conf, logNodes) runAttSendReceive(conf, logNodes) runLatencyCheck(conf, logFiles, logNodes) runPubsub(conf, logFiles) runAttSend(conf, logFiles) when isMainModule: echo LogTraceHeader var conf = LogTraceConf.load(version = LogTraceVersion) run(conf)