nimbus-eth2/ncli/logtrace.nim
Bugfix: The VC was producing invalid sync committee contributions (#4343)

The sync committee duties are no longer updated on every slot, but the
sync committee aggregator selection proofs used to be generated during
the duties update. As a result, the client was using stale selection
proofs (they must be regenerated at each slot).

The fix moves the selection proof generation logic to a separate
function that is properly executed on each slot.

Other changes:

* The logtrace tool has been enhanced with a framework for adding new,
  simpler log aggregation and analysis algorithms.
  The default CI testnet simulation now ensures that the blocks in the
  network have reasonable sync committee participation.
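
The essence of the fix, as a rough sketch (names here are illustrative,
not the actual validator client code):

    # Previously the proofs were computed once, inside the duties update:
    #   duty.selectionProof = makeSelectionProof(duty, dutiesSlot)
    # Now a dedicated per-slot step recomputes them, since a selection
    # proof signs the slot and therefore expires with it:
    proc updateSelectionProofs(slot: Slot) =
      for duty in syncCommitteeDuties:
        duty.selectionProof = makeSelectionProof(duty, slot)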

# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import confutils, json, times, streams, os, strutils, options, chronicles,
       tables, sequtils
import json_serialization

const
  LogTraceName = "Beacon-Chain LogTrace Tool"
  LogTraceMajor: int = 0
  LogTraceMinor: int = 0
  LogTracePatch: int = 4
  LogTraceVersion = $LogTraceMajor & "." & $LogTraceMinor & "." &
                    $LogTracePatch
  LogTraceCopyright = "Copyright(C) 2021" &
                      " Status Research & Development GmbH"
  LogTraceHeader = LogTraceName & ", Version " & LogTraceVersion &
                   " [" & hostOS & ": " & hostCPU & "]\r\n" &
                   LogTraceCopyright & "\r\n"

type
  StartUpCommand {.pure.} = enum
    pubsub, asl, asr, aggasr, scmsr, csr, lat, traceAll, localSimChecks
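
  # Subcommand summary (see the dispatch in `run` below):
  #   pubsub         - cross-check gossip message ids between node logs
  #   asl            - flag attestations sent too long after slot start
  #   asr            - verify each sent attestation was received by peers
  #   aggasr         - same check for aggregated attestations
  #   scmsr          - same check for sync committee messages
  #   csr            - same check for sync committee contributions
  #   lat            - detect gaps between "onSecond task completed" entries
  #   traceAll       - run all of the above
  #   localSimChecks - aggasr plus the LogVisitor-based checks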

  LogTraceConf = object
    logFiles {.
      desc: "Specifies one or more log files",
      abbr: "f",
      name: "log-file" .}: seq[string]

    simDir {.
      desc: "Specifies path to eth2_network_simulation directory",
      defaultValue: "",
      name: "sim-dir" .}: string

    netDir {.
      desc: "Specifies path to network build directory",
      defaultValue: "",
      name: "net-dir" .}: string

    logDir {.
      desc: "Specifies a directory containing log files",
      defaultValue: "",
      name: "log-dir" .}: string

    ignoreSerializationErrors {.
      desc: "Ignore serialization errors while parsing log files",
      defaultValue: true,
      name: "ignore-errors" .}: bool

    dumpSerializationErrors {.
      desc: "Dump full serialization errors while parsing log files",
      defaultValue: false,
      name: "dump-errors" .}: bool

    nodes {.
      desc: "Specifies the node names whose logs will be used",
      name: "nodes" .}: seq[string]

    allowedLag {.
      desc: "Allowed latency lag multiplier",
      defaultValue: 2.0,
      name: "lag" .}: float

    constPreset {.
      desc: "The const preset being used",
      defaultValue: "mainnet",
      name: "const-preset" .}: string

    case cmd {.command.}: StartUpCommand
    of pubsub:
      discard
    of asl:
      discard
    of asr:
      discard
    of aggasr:
      discard
    of scmsr:
      discard
    of csr:
      discard
    of lat:
      discard
    of traceAll:
      discard
    of localSimChecks:
      discard
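
  # Example invocation (the flags are generated by confutils from the
  # fields above; the paths shown are illustrative):
  #   logtrace aggasr --net-dir=local_testnet_data --const-preset=minimal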

  IssuesGroup = ref object
    name: string
    fatalIssues: seq[string]
    warnings: seq[string]

  FileReport = object
    categories: seq[IssuesGroup]

  LogVisitor = object
    visitLine: proc(msg, fullLine: string) {.gcsafe, raises: [Defect].}
    produceReport: proc(): FileReport {.gcsafe, raises: [Defect].}

  LogVisitorFactory = proc(): LogVisitor {.gcsafe, raises: [Defect].}

  LogTracer = object
    enabledVisitors: seq[LogVisitorFactory]
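
  # To add a new analysis, implement a `LogVisitorFactory` that returns a
  # `LogVisitor` closing over its own state, then register it in `run`
  # below. A minimal sketch (the name `myChecker` is just an example):
  #
  #   proc myChecker(): LogVisitorFactory =
  #     return proc (): LogVisitor =
  #       var count = 0
  #       LogVisitor(
  #         visitLine: proc (msg, line: string) =
  #           if msg == "Some interesting message":
  #             inc count
  #         ,
  #         produceReport: proc (): FileReport =
  #           noIssues())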

  GossipDirection = enum
    None, Incoming, Outgoing

  NodeDirectory = object
    name: string
    path: string
    logs: seq[string]

  LogMessage = object of RootObj
    level {.serializedFieldName: "lvl" .}: string
    timestamp {.serializedFieldName: "ts" .}: DateTime
    msg: string
    topics: string
    tid: int

  SlotStartMessage = object of LogMessage
    beaconTime: uint64
    finalizedEpoch: uint64
    finalizedRoot: string
    finalizedSlot: uint64
    headEpoch: uint64
    headRoot: string
    headSlot: uint64
    lastSlot: uint64
    peers: uint64
    scheduledSlot: uint64

  AttestationDataObject = object
    slot: uint64
    index: uint64
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    sourceEpoch {.serializedFieldName: "source_epoch".}: uint64
    sourceRoot {.serializedFieldName: "source_root".}: string
    targetEpoch {.serializedFieldName: "target_epoch".}: uint64
    targetRoot {.serializedFieldName: "target_root".}: string

  AttestationObject = object
    aggregationBits {.serializedFieldName: "aggregation_bits".}: string
    data: AttestationDataObject
    signature: string

  AttestationSentMessage = object of LogMessage
    attestation: AttestationObject

  AttestationReceivedMessage = object of LogMessage
    attestation: AttestationObject
    head: string
    wallSlot: uint64
    pcs: string

  AggregatedAttestationSentMessage = object of LogMessage
    attestation: AttestationObject
    signature: string

  AggregatedAttestationReceivedMessage = object of LogMessage
    aggregate: AttestationObject
    wallSlot: uint64
    signature: string

  BlockSentMessage = object
    # The message structure is as follows:
    #[
      {
        "lvl": "NOT",
        "ts": "2022-11-21 23:02:37.032+02:00",
        "msg": "Block sent",
        "topics": "beacval",
        "blockRoot": "7a0836e4",
        "blck": {
          "slot": 15,
          "proposer_index": 96,
          "parent_root": "487372dc",
          "state_root": "06699625",
          "eth1data": {
            "deposit_root": "6c3ff67871b79b7aecc7a125e7ec9ff857879a1c83e50513be113103acf8ca3f",
            "deposit_count": 1024,
            "block_hash": "4242424242424242424242424242424242424242424242424242424242424242"
          },
          "graffiti": "Nimbus/v22.10.1-eb6615-stateofus",
          "proposer_slashings_len": 0,
          "attester_slashings_len": 0,
          "attestations_len": 4,
          "deposits_len": 0,
          "voluntary_exits_len": 0,
          "sync_committee_participants": 32,
          "block_number": 0,
          "fee_recipient": ""
        },
        "signature": "b544f144",
        "delay": "32ms3us"
      }
    ]#
    # So far, logtrace needs only a single property of the block object.
    # Feel free to add additional fields to be parsed as necessary.
    blck: BlockShortLog

  BlockShortLog = object
    sync_committee_participants: int
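    # To capture more of the block, add fields here; for example,
    # `proposer_index: int` would be filled from the "proposer_index"
    # property of the sample message above.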

  SyncCommitteeMessageObject = object
    slot: uint64
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    validatorIndex {.serializedFieldName: "validator_index".}: uint64
    signature: string

  ContributionObject = object
    slot: uint64
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    subcommittee_index: uint64
    aggregationBits {.serializedFieldName: "aggregation_bits".}: string

  ContributionSentObject = object
    contribution: ContributionObject
    aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
    signature: string

  SCMSentMessage = object of LogMessage
    message: SyncCommitteeMessageObject

  SCMReceivedMessage = object of LogMessage
    wallSlot: uint64
    syncCommitteeMsg: SyncCommitteeMessageObject
    subcommitteeIdx: uint64

  ContributionSentMessage = object of LogMessage
    contribution: ContributionSentObject

  ContributionReceivedMessage = object of LogMessage
    contribution: ContributionObject
    wallSlot: uint64
    aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
    signature: string
    selectionProof {.serializedFieldName: "selection_proof".}: string

  GossipMessage = object
    kind: GossipDirection
    id: string
    datetime: DateTime
    processed: bool

  SMessageType {.pure.} = enum
    AttestationSent, SCMSent, SlotStart

  SlotMessage = object
    case kind: SMessageType
    of SMessageType.AttestationSent:
      asmsg: AttestationSentMessage
    of SMessageType.SCMSent:
      scmsmsg: SCMSentMessage
    of SMessageType.SlotStart:
      ssmsg: SlotStartMessage

  # SlotMessage = object
  #   case kind: SMessageType
  #   of SMessageType.SCMSent:
  #     scmsmsg: SCMSentMessage
  #   of SMessageType.SlotStart:
  #     ssmsg: SlotStartMessage

  SRANode = object
    directory: NodeDirectory
    sends: seq[AttestationSentMessage]
    recvs: TableRef[string, AttestationReceivedMessage]
    aggSends: seq[AggregatedAttestationSentMessage]
    aggRecvs: TableRef[string, AggregatedAttestationReceivedMessage]

  SRSCNode = object
    directory: NodeDirectory
    sends: seq[SCMSentMessage]
    recvs: TableRef[string, SCMReceivedMessage]
    contributionSends: seq[ContributionSentMessage]
    contributionRecvs: TableRef[string, ContributionReceivedMessage]

template noIssues: FileReport =
  FileReport()

template hasIssues(issuesCategories: varargs[IssuesGroup]): FileReport =
  FileReport(categories: @issuesCategories)

proc copyEntriesTo(src: FileReport, dst: var FileReport) =
  for c in src.categories:
    dst.categories.add c

func isEmpty(r: FileReport): bool =
  r.categories.len == 0

proc printCategory(severityLevel: string, issues: openArray[string]) =
  if issues.len > 0:
    echo ""
    echo severityLevel, ":"
    for issue in issues:
      echo "* ", issue

proc print(r: FileReport) =
  for category in r.categories:
    echo "### ", category.name
    printCategory "Fatal Issues", category.fatalIssues
    printCategory "Warnings", category.warnings
    echo ""

template fatal(issuesGroup: IssuesGroup, msg: string) =
  issuesGroup.fatalIssues.add msg

template warning(issuesGroup: IssuesGroup, msg: string) =
  issuesGroup.warnings.add msg

proc new(T: type IssuesGroup, name: string): T =
  T(name: name)
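
# Nimbus log timestamps look like "2022-11-21 23:02:37.032+02:00" (see the
# sample message above), which is the layout the format string below parses.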
proc readValue(reader: var JsonReader, value: var DateTime) =
  let s = reader.readValue(string)
  try:
    value = parse(s, "YYYY-MM-dd HH:mm:ss'.'fffzzz", utc())
  except CatchableError:
    raiseUnexpectedValue(reader, "Invalid date time")

proc init(t: typedesc[GossipMessage], kind: GossipDirection, id,
          datestr: string): GossipMessage =
  GossipMessage(
    kind: kind,
    id: id,
    datetime: parse(datestr, "YYYY-MM-dd HH:mm:ss'.'fffzzz")
  )

func `$`(msg: GossipMessage): string =
  msg.id

proc readLogFile(file: string): seq[JsonNode] =
  var res = newSeq[JsonNode]()
  var stream = newFileStream(file)
  try:
    while not(stream.atEnd()):
      var line = stream.readLine()
      let node = parseJson(line)
      res.add(node)
    result = res
  except CatchableError as exc:
    warn "Error reading JSON data from file", file = file,
         errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForAttsMessages(file: string,
                                ignoreErrors = true,
                                dumpErrors = false): seq[SlotMessage] =
  var res = newSeq[SlotMessage]()
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      line = stream.readLine()
      inc(counter)
      var m: LogMessage
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Attestation sent":
        let am = Json.decode(line, AttestationSentMessage,
                             allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.AttestationSent,
                            asmsg: am)
        res.add(m)
      elif m.msg == "Slot start":
        let sm = Json.decode(line, SlotStartMessage,
                             allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.SlotStart,
                            ssmsg: sm)
        res.add(m)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             lines_filtered = len(res)
    result = res
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForASRMessages(file: string, srnode: var SRANode,
                               ignoreErrors = true, dumpErrors = false) =
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Attestation sent":
        let sm = Json.decode(line, AttestationSentMessage,
                             allowUnknownFields = true)
        srnode.sends.add(sm)
      elif m.msg == "Attestation received":
        let rm = Json.decode(line, AttestationReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm)
      elif m.msg == "Aggregate received":
        let rm = Json.decode(line, AggregatedAttestationReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.aggRecvs.hasKeyOrPut(rm.signature, rm)
      elif m.msg == "Aggregated attestation sent":
        let sm = Json.decode(line, AggregatedAttestationSentMessage,
                             allowUnknownFields = true)
        srnode.aggSends.add(sm)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             sends_filtered = len(srnode.sends),
             recvs_filtered = len(srnode.recvs)
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForSCMSendMessages(file: string,
                                   ignoreErrors = true,
                                   dumpErrors = false): seq[SlotMessage] =
  var res = newSeq[SlotMessage]()
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      line = stream.readLine()
      inc(counter)
      var m: LogMessage
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Sync committee message sent":
        let scmm = Json.decode(line, SCMSentMessage,
                               allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.SCMSent,
                            scmsmsg: scmm)
        res.add(m)
      elif m.msg == "Slot start":
        let sm = Json.decode(line, SlotStartMessage,
                             allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.SlotStart,
                            ssmsg: sm)
        res.add(m)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             lines_filtered = len(res)
    result = res
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForSCMSRMessages(file: string, srnode: var SRSCNode,
                                 ignoreErrors = true, dumpErrors = false) =
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Sync committee message sent":
        let sm = Json.decode(line, SCMSentMessage,
                             allowUnknownFields = true)
        srnode.sends.add(sm)
      elif m.msg == "Sync committee message received":
        let rm = Json.decode(line, SCMReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.recvs.hasKeyOrPut(rm.syncCommitteeMsg.signature, rm)
      elif m.msg == "Contribution received":
        let rm = Json.decode(line, ContributionReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.contributionRecvs.hasKeyOrPut(rm.signature, rm)
      elif m.msg == "Contribution sent":
        let sm = Json.decode(line, ContributionSentMessage,
                             allowUnknownFields = true)
        srnode.contributionSends.add(sm)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             sends_filtered = len(srnode.sends),
             recvs_filtered = len(srnode.recvs)
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()
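
# Makes a single pass over a log file, extracting only the "msg" field of
# each line and feeding it (plus the raw line) to every enabled visitor,
# then merges the visitors' findings into one report for the file.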
proc processFile(tracer: LogTracer, file: string): FileReport =
  var stream = newFileStream(file)
  let visitors = mapIt(tracer.enabledVisitors, it())

  try:
    while not (stream.atEnd()):
      let line = stream.readLine()
      var reader = JsonReader[DefaultFlavor].init(memoryInput(line))
      for fieldName in reader.readObjectFields:
        if fieldName == "msg":
          let msg = reader.readValue(string)
          for visitor in visitors:
            visitor.visitLine(msg, line)
          break
        else:
          reader.skipSingleJsValue()
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

  for visitor in visitors:
    let report = visitor.produceReport()
    if not report.isEmpty:
      report.copyEntriesTo result
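
# Counts every distinct message ending in "failed validation" and reports
# each one as a fatal issue together with the number of occurrences.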
proc failedValidationsChecker: LogVisitorFactory =
  return proc (): LogVisitor =
    var failedValidations = initCountTable[string]()

    LogVisitor(
      visitLine: proc (msg, line: string) =
        if msg.endsWith("failed validation"):
          failedValidations.inc msg
      ,
      produceReport: proc (): FileReport =
        if failedValidations.len > 0:
          let issues = IssuesGroup.new "Failed Validations"
          for msg, count in failedValidations:
            issues.fatal(msg & " " & $count & " times")
          return hasIssues(issues)
        else:
          return noIssues()
    )
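
# Tracks the sync committee participation reported in "Block sent" messages;
# the check fails when average participation drops below the expected level
# (450 for mainnet, 20 for the minimal preset).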
proc syncAggregateChecker(constPreset: string): LogVisitorFactory =
  return proc (): LogVisitor =
    var totalBlocks = 0
    var minSyncAggregate = 512
    var syncAggregatesCombinedSize = 0
    let minExpectedAggregateSize = if constPreset == "mainnet":
      450
    else:
      20

    LogVisitor(
      visitLine: proc (msgLabel, line: string) =
        if msgLabel == "Block sent":
          let msg = try:
            Json.decode(line, BlockSentMessage, allowUnknownFields = true)
          except SerializationError as err:
            echo "Failure to parse a 'Block sent' message:"
            echo err.formatMsg("<msg>")
            quit 1
          let syncAggregateSize = msg.blck.sync_committee_participants
          if syncAggregateSize != -1:
            inc totalBlocks
            syncAggregatesCombinedSize += syncAggregateSize
            if minSyncAggregate > syncAggregateSize:
              minSyncAggregate = syncAggregateSize
      ,
      produceReport: proc (): FileReport =
        # Guard against files without any "Block sent" messages, which
        # would otherwise cause a division by zero below.
        if totalBlocks == 0:
          return noIssues()
        let avgSyncAggregateSize = syncAggregatesCombinedSize div totalBlocks
        if avgSyncAggregateSize < minExpectedAggregateSize:
          let issues = IssuesGroup.new "SyncAggregate Stats"
          issues.fatal("Minimal sync aggregate size: " & $minSyncAggregate)
          issues.fatal("Average sync aggregate size: " & $avgSyncAggregateSize)
          return hasIssues(issues)
        else:
          return noIssues()
    )

proc readLogFileForSecondMessages(file: string, ignoreErrors = true,
                                  dumpErrors = false): seq[LogMessage] =
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not (stream.atEnd()):
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "onSecond task completed":
        result.add(m)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             seconds_filtered = len(result)
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
  # The times.DateTime object forces us to disable the [ProveInit] warning.
  # These pragmas can be removed once the Nim compiler or times.nim is fixed.
  {.push warning[ProveInit]: off.}
  result = newSeq[GossipMessage]()
  {.pop.}
  for node in log:
    if ("msg" in node) and ("message_id" in node) and ("ts" in node):
      let message = node["msg"].getStr()
      if message == "Incoming pubsub message received":
        let msg = GossipMessage.init(Incoming, node["message_id"].getStr(),
                                     node["ts"].getStr())
        result.add(msg)
      elif message == "Outgoing pubsub message has been sent":
        let msg = GossipMessage.init(Outgoing, node["message_id"].getStr(),
                                     node["ts"].getStr())
        result.add(msg)

iterator simDirectoryLogFiles(simdir: string): string =
  let absPath = absolutePath(simdir)
  let dataPath = absPath & DirSep & "data"
  if not dirExists(dataPath):
    error "Invalid `eth2_network_simulation` data directory structure",
          path = dataPath
    quit(1)
  var index = 0
  while true:
    let path = dataPath & DirSep & "node-" & $index & DirSep
    let simplePath = path & "beacon_node.log"
    let bootPath = path & "bootstrap_node.log"
    if fileExists(simplePath):
      yield simplePath
    elif fileExists(bootPath):
      yield bootPath
    else:
      break
    inc(index)

proc getDirectoryLogFiles(builddir: string,
                          filter: seq[string]): seq[NodeDirectory] =
  var res = newSeq[NodeDirectory]()
  let absPath = absolutePath(builddir)
  let dataPath = absPath & DirSep & "data"
  if not dirExists(dataPath):
    error "Invalid `network` data directory structure",
          path = dataPath
    quit(1)

  for dirPath in walkDirs(dataPath & DirSep & "*"):
    let name = extractFilename(dirPath)
    if (len(filter) == 0) or (name in filter):
      var nodeDir = NodeDirectory(name: extractFilename(dirPath),
                                  path: dirPath)
      for filePath in walkFiles(dirPath & DirSep & "*.log"):
        nodeDir.logs.add(extractFilename(filePath))
      if len(nodeDir.logs) > 0:
        res.add(nodeDir)
  return res

proc getLogFiles(builddir: string,
                 filter: seq[string]): seq[NodeDirectory] =
  var res = newSeq[NodeDirectory]()
  let dataPath = absolutePath(builddir)
  if not dirExists(dataPath):
    error "Logs directory does not exist", path = dataPath
    quit(1)
  for filePath in walkFiles(dataPath & DirSep & "*.*"):
    let name = extractFilename(filePath)
    if (len(filter) == 0) or (name in filter):
      let nodeDir = NodeDirectory(name: extractFilename(filePath),
                                  path: dataPath,
                                  logs: @[extractFilename(filePath)])
      res.add(nodeDir)
  return res

func getMessage(logs: seq[GossipMessage],
                msg: GossipMessage): Option[GossipMessage] =
  {.push warning[ProveInit]: off.}
  result = none[GossipMessage]()
  {.pop.}
  for i in 0 ..< len(logs):
    if logs[i].kind == Incoming and logs[i].id == msg.id:
      {.push warning[ProveInit]: off.}
      result = some(logs[i])
      {.pop.}

proc runPubsub(logConf: LogTraceConf, logFiles: seq[string]) =
  var logs = newSeq[tuple[name: string, data: seq[GossipMessage]]]()
  if len(logFiles) < 2:
    error "Number of log files insufficient to process pubsub messages",
          logs_count = len(logFiles)
    quit(1)
  for item in logFiles:
    let data = filterGossipMessages(readLogFile(item))
    logs.add((name: item, data: data))
    info "Loaded log file", logfile = item, lines_count = len(data)

  {.push warning[ProveInit]: off.}
  var checks = newSeq[Option[GossipMessage]](len(logs))
  {.pop.}
  var misses = 0
  for i in 0 ..< len(logs):
    info "Processing log file", logfile = logs[i].name
    for k in 0 ..< len(logs[i].data):
      let item = logs[i].data[k]
      if item.kind == Outgoing:
        info "Searching message", message_id = $item, logfile = logs[i].name
        checks[i] = some(item)
        for z in 1 ..< len(logs):
          let index = (i + z) mod len(logs)
          checks[index] = getMessage(logs[index].data, item)
        for z in 1 ..< len(checks):
          let index = (i + z) mod len(logs)
          if not(checks[index].isSome()):
            warn "Message not found in log", logfile = logs[index].name,
                 message_id = $item
            inc(misses)
  if misses == 0:
    info "No missing messages found"
  else:
    warn "Number of missing messages found", count = $misses
proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
  info "Check for late `attestation sent` messages"
  if len(logFiles) < 1:
    error "Number of log files insufficient to check attestation messages",
          logs_count = len(logFiles)
    quit(1)

  let minDuration = initDuration(seconds = 4)
  var slotMessagesCount = 0
  var attsMessagesCount = 0
  var lateAttsMessagesCount = 0

  for item in logFiles:
    info "Processing log file", logFile = item
    let data = readLogFileForAttsMessages(item,
                                          logConf.ignoreSerializationErrors,
                                          logConf.dumpSerializationErrors)
    var currentSlot: Option[SlotStartMessage]
    for item in data:
      if item.kind == SMessageType.SlotStart:
        currentSlot = some(item.ssmsg)
        inc(slotMessagesCount)
      elif item.kind == SMessageType.AttestationSent:
        if currentSlot.isSome():
          # Lateness is the time from the start of the current slot to the
          # moment the attestation was sent.
          let attestationTime = item.asmsg.timestamp -
                                currentSlot.get().timestamp
          if attestationTime > minDuration:
            warn "Found an attestation that was sent later than necessary",
                 lateness = $attestationTime, slot = currentSlot.get(),
                 attestation = item.asmsg
            inc(lateAttsMessagesCount)
          inc(attsMessagesCount)
        else:
          warn "`Attestation sent` message appears before `Slot start` message",
               attestation = item.asmsg

  info "Check finished", attestation_sent_messages = attsMessagesCount,
       slot_messages = slotMessagesCount,
       late_attestation_messages = lateAttsMessagesCount

func toSimple(s: seq[string]): string =
  "[" & s.mapIt("'" & it & "'").join(", ") & "]"
proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for attestation sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  var srnodes = newSeq[SRANode]()

  for node in nodes:
    var srnode = SRANode(
      directory: node,
      sends: newSeq[AttestationSentMessage](),
      recvs: newTable[string, AttestationReceivedMessage](),
      aggSends: newSeq[AggregatedAttestationSentMessage](),
      aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForASRMessages(path, srnode,
                                logConf.ignoreSerializationErrors,
                                logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].sends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.attestation.signature notin srnodes[k].recvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Attestation was not received", sender = srnodes[i].directory.name,
             signature = item.attestation.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].sends)

proc runAggAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for aggregate attestation sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  var srnodes = newSeq[SRANode]()

  for node in nodes:
    var srnode = SRANode(
      directory: node,
      sends: newSeq[AttestationSentMessage](),
      recvs: newTable[string, AttestationReceivedMessage](),
      aggSends: newSeq[AggregatedAttestationSentMessage](),
      aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForASRMessages(path, srnode,
                                logConf.ignoreSerializationErrors,
                                logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].aggSends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.signature notin srnodes[k].aggRecvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Aggregate attestation was not received",
             sender = srnodes[i].directory.name,
             signature = item.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].aggSends)

proc runSCMSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for Sync Committee Message sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  var srnodes = newSeq[SRSCNode]()

  for node in nodes:
    var srnode = SRSCNode(
      directory: node,
      sends: newSeq[SCMSentMessage](),
      recvs: newTable[string, SCMReceivedMessage](),
      contributionSends: newSeq[ContributionSentMessage](),
      contributionRecvs: newTable[string, ContributionReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForSCMSRMessages(path, srnode,
                                  logConf.ignoreSerializationErrors,
                                  logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].sends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.message.signature notin srnodes[k].recvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Sync committee message was not received",
             sender = srnodes[i].directory.name,
             signature = item.message.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].sends)

proc runContributionSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for contribution sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  var srnodes = newSeq[SRSCNode]()

  for node in nodes:
    var srnode = SRSCNode(
      directory: node,
      sends: newSeq[SCMSentMessage](),
      recvs: newTable[string, SCMReceivedMessage](),
      contributionSends: newSeq[ContributionSentMessage](),
      contributionRecvs: newTable[string, ContributionReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForSCMSRMessages(path, srnode,
                                  logConf.ignoreSerializationErrors,
                                  logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].contributionSends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.contribution.signature notin srnodes[k].contributionRecvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Contribution was not received",
             sender = srnodes[i].directory.name,
             signature = item.contribution.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].contributionSends)
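
# "onSecond task completed" entries should appear roughly once per second;
# a gap exceeding `--lag` times one second suggests a blocked event loop,
# so such gaps are reported along with min/max/average statistics.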
proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string],
                     nodes: seq[NodeDirectory]) =
  info "Check for async responsiveness"
  if len(nodes) == 0 and len(logFiles) == 0:
    error "Number of log files insufficient", nodes_count = len(nodes)
    quit(1)

  let allowedTime = int64(float(initDuration(seconds = 1).inMilliseconds()) *
                          logConf.allowedLag)

  for logFile in logFiles:
    info "Processing log file", logfile = logFile
    let msgs = readLogFileForSecondMessages(logFile,
                                            logConf.ignoreSerializationErrors,
                                            logConf.dumpSerializationErrors)
    if len(msgs) == 0:
      # Skip files without any matching messages to avoid the division by
      # zero in the average computation below.
      warn "No `onSecond task completed` messages found", logfile = logFile
      continue
    var lastSecond: Option[LogMessage]
    var minEntry: Option[LogMessage]
    var maxEntry: Option[LogMessage]
    var minTime: times.Duration = initDuration(days = 1)
    var maxTime: times.Duration
    var sumMilliseconds: int64

    for item in msgs:
      if lastSecond.isNone():
        lastSecond = some(item)
      else:
        let time = item.timestamp - lastSecond.get().timestamp
        let start_time = lastSecond.get().timestamp
        let finish_time = item.timestamp
        if time.inMilliseconds() > allowedTime:
          info "Found time lag ",
               start_time = start_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
               finish_time = finish_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
               lag_time = time
        if time < minTime:
          minTime = time
          minEntry = some(item)
        if time > maxTime:
          maxTime = time
          maxEntry = some(item)
        sumMilliseconds += time.inMilliseconds()
        lastSecond = some(item)

    let avgTime = initDuration(milliseconds = sumMilliseconds div len(msgs))
    info "Latency statistics", min_time = minTime, max_time = maxTime,
         avg_time = avgTime, seconds_count = len(msgs)

proc run(conf: LogTraceConf) =
  var logFiles: seq[string]
  var logNodes: seq[NodeDirectory]

  if len(conf.logFiles) > 0:
    for item in conf.logFiles:
      let absPath = absolutePath(item)
      if fileExists(absPath):
        logFiles.add(absPath)

  if len(conf.simDir) > 0:
    for item in simDirectoryLogFiles(conf.simDir):
      logFiles.add(item)
    logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes)

  if len(conf.netDir) > 0:
    logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes)

  if len(conf.logDir) > 0:
    logNodes = getLogFiles(conf.logDir, conf.nodes)

  if len(logFiles) == 0 and len(logNodes) == 0:
    error "Log file sources not specified or not enough log files found"
    quit(1)

  case conf.cmd
  of StartUpCommand.pubsub:
    runPubsub(conf, logFiles)
  of StartUpCommand.asl:
    runAttSend(conf, logFiles)
  of StartUpCommand.asr:
    runAttSendReceive(conf, logNodes)
  of StartUpCommand.aggasr:
    runAggAttSendReceive(conf, logNodes)
  of StartUpCommand.scmsr:
    runSCMSendReceive(conf, logNodes)
  of StartUpCommand.csr:
    runContributionSendReceive(conf, logNodes)
  of StartUpCommand.lat:
    runLatencyCheck(conf, logFiles, logNodes)
  of StartUpCommand.traceAll:
    runContributionSendReceive(conf, logNodes)
    runSCMSendReceive(conf, logNodes)
    runAggAttSendReceive(conf, logNodes)
    runAttSendReceive(conf, logNodes)
    runLatencyCheck(conf, logFiles, logNodes)
    runPubsub(conf, logFiles)
    runAttSend(conf, logFiles)
  of StartUpCommand.localSimChecks:
    runAggAttSendReceive(conf, logNodes)

    # TODO All analysis types can be converted to the more efficient
    # LogVisitor style, so they can be enabled together here.
    # See this discussion for some potential caveats:
    # https://github.com/status-im/nimbus-eth2/pull/3583#pullrequestreview-941934055
    var tracer = LogTracer()
    tracer.enabledVisitors.add failedValidationsChecker()
    tracer.enabledVisitors.add syncAggregateChecker(conf.constPreset)

    var issuesDetected = false
    for node in logNodes:
      for logFile in node.logs:
        let report = tracer.processFile(node.path / logFile)
        if not report.isEmpty:
          if not issuesDetected:
            issuesDetected = true
            echo "# Logtrace Report"
            echo ""
          echo "## ", logFile
          echo ""
          print report

    quit ord(issuesDetected)

when isMainModule:
  echo LogTraceHeader
  var conf = LogTraceConf.load(version = LogTraceVersion)
  run(conf)