Mirror of https://github.com/status-im/nimbus-eth2.git, synced 2025-01-10 06:16:25 +00:00
Commit d07113767d

Since sync committee duties are no longer updated on every slot, and the sync
committee aggregator selection proofs were previously generated during the
duties update, the client ended up using stale selection proofs (they must be
generated for each slot). The fix moves the selection proof generation logic
into a separate function that is properly executed on each slot.

Other changes:

* The logtrace tool has been enhanced with a framework that simplifies adding
  new log aggregation and analysis algorithms.
* The default CI testnet simulation now ensures that the blocks in the network
  have reasonable sync committee participation.
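The pattern behind the bug can be sketched in a few lines of Nim (hypothetical names, not the actual Nimbus code): a slot-dependent value such as a selection proof must be recomputed in the per-slot handler rather than cached by a less frequently executed duties update.

    import std/tables

    type SelectionProof = string

    proc computeSelectionProof(validator: int, slot: uint64): SelectionProof =
      # Stand-in for the real BLS signature over the slot.
      "proof-" & $validator & "-" & $slot

    var proofs: Table[int, SelectionProof]

    proc onDutiesUpdate(validators: seq[int], slot: uint64) =
      # Buggy pattern: proofs cached here go stale once duties are
      # no longer refreshed on every slot.
      for v in validators:
        proofs[v] = computeSelectionProof(v, slot)

    proc onSlotStart(validators: seq[int], slot: uint64) =
      # Fixed pattern: regenerate the slot-dependent proof on every slot.
      for v in validators:
        proofs[v] = computeSelectionProof(v, slot)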
# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

import confutils, json, times, streams, os, strutils, options, chronicles,
       tables, sequtils
import json_serialization

const
  LogTraceName = "Beacon-Chain LogTrace Tool"
  LogTraceMajor: int = 0
  LogTraceMinor: int = 0
  LogTracePatch: int = 4
  LogTraceVersion = $LogTraceMajor & "." & $LogTraceMinor & "." &
                    $LogTracePatch
  LogTraceCopyright = "Copyright(C) 2021" &
                      " Status Research & Development GmbH"
  LogTraceHeader = LogTraceName & ", Version " & LogTraceVersion &
                   " [" & hostOS & ": " & hostCPU & "]\r\n" &
                   LogTraceCopyright & "\r\n"

type
  StartUpCommand {.pure.} = enum
    pubsub, asl, asr, aggasr, scmsr, csr, lat, traceAll, localSimChecks

  LogTraceConf = object
    logFiles {.
      desc: "Specifies one or more log files",
      abbr: "f",
      name: "log-file" .}: seq[string]

    simDir {.
      desc: "Specifies path to eth2_network_simulation directory",
      defaultValue: "",
      name: "sim-dir" .}: string

    netDir {.
      desc: "Specifies path to network build directory",
      defaultValue: "",
      name: "net-dir" .}: string

    logDir {.
      desc: "Specifies a path to a directory with log files",
      defaultValue: "",
      name: "log-dir" .}: string

    ignoreSerializationErrors {.
      desc: "Ignore serialization errors while parsing log files",
      defaultValue: true,
      name: "ignore-errors" .}: bool

    dumpSerializationErrors {.
      desc: "Dump full serialization errors while parsing log files",
      defaultValue: false,
      name: "dump-errors" .}: bool

    nodes {.
      desc: "Specifies node names whose logs will be used",
      name: "nodes" .}: seq[string]

    allowedLag {.
      desc: "Allowed latency lag multiplier",
      defaultValue: 2.0,
      name: "lag" .}: float

    constPreset {.
      desc: "The const preset being used",
      defaultValue: "mainnet",
      name: "const-preset" .}: string

    case cmd {.command.}: StartUpCommand
    of pubsub:
      discard
    of asl:
      discard
    of asr:
      discard
    of aggasr:
      discard
    of scmsr:
      discard
    of csr:
      discard
    of lat:
      discard
    of traceAll:
      discard
    of localSimChecks:
      discard

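  # Note: the following types form a small framework for log aggregation and
  # analysis algorithms. Each analysis is a `LogVisitor` that inspects every
  # log line once and produces a `FileReport`, so several checks can share a
  # single pass over a file (see `processFile` below).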
  IssuesGroup = ref object
    name: string
    fatalIssues: seq[string]
    warnings: seq[string]

  FileReport = object
    categories: seq[IssuesGroup]

  LogVisitor = object
    visitLine: proc(msg, fullLine: string) {.gcsafe, raises: [Defect].}
    produceReport: proc(): FileReport {.gcsafe, raises: [Defect].}

  LogVisitorFactory = proc(): LogVisitor {.gcsafe, raises: [Defect].}

  LogTracer = object
    enabledVisitors: seq[LogVisitorFactory]

  GossipDirection = enum
    None, Incoming, Outgoing

  NodeDirectory = object
    name: string
    path: string
    logs: seq[string]

  LogMessage = object of RootObj
    level {.serializedFieldName: "lvl" .}: string
    timestamp {.serializedFieldName: "ts" .}: DateTime
    msg: string
    topics: string
    tid: int

  SlotStartMessage = object of LogMessage
    beaconTime: uint64
    finalizedEpoch: uint64
    finalizedRoot: string
    finalizedSlot: uint64
    headEpoch: uint64
    headRoot: string
    headSlot: uint64
    lastSlot: uint64
    peers: uint64
    scheduledSlot: uint64

  AttestationDataObject = object
    slot: uint64
    index: uint64
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    sourceEpoch {.serializedFieldName: "source_epoch".}: uint64
    sourceRoot {.serializedFieldName: "source_root".}: string
    targetEpoch {.serializedFieldName: "target_epoch".}: uint64
    targetRoot {.serializedFieldName: "target_root".}: string

  AttestationObject = object
    aggregationBits {.serializedFieldName: "aggregation_bits".}: string
    data: AttestationDataObject
    signature: string

  AttestationSentMessage = object of LogMessage
    attestation: AttestationObject

  AttestationReceivedMessage = object of LogMessage
    attestation: AttestationObject
    head: string
    wallSlot: uint64
    pcs: string

  AggregatedAttestationSentMessage = object of LogMessage
    attestation: AttestationObject
    signature: string

  AggregatedAttestationReceivedMessage = object of LogMessage
    aggregate: AttestationObject
    wallSlot: uint64
    signature: string

  BlockSentMessage = object
    # The message structure is as follows:
    #[
      {
        "lvl": "NOT",
        "ts": "2022-11-21 23:02:37.032+02:00",
        "msg": "Block sent",
        "topics": "beacval",
        "blockRoot": "7a0836e4",
        "blck": {
          "slot": 15,
          "proposer_index": 96,
          "parent_root": "487372dc",
          "state_root": "06699625",
          "eth1data": {
            "deposit_root": "6c3ff67871b79b7aecc7a125e7ec9ff857879a1c83e50513be113103acf8ca3f",
            "deposit_count": 1024,
            "block_hash": "4242424242424242424242424242424242424242424242424242424242424242"
          },
          "graffiti": "Nimbus/v22.10.1-eb6615-stateofus",
          "proposer_slashings_len": 0,
          "attester_slashings_len": 0,
          "attestations_len": 4,
          "deposits_len": 0,
          "voluntary_exits_len": 0,
          "sync_committee_participants": 32,
          "block_number": 0,
          "fee_recipient": ""
        },
        "signature": "b544f144",
        "delay": "32ms3us"
      }
    ]#
    # So far, logtrace needs only a single property of the block object.
    # Feel free to add additional fields to be parsed as necessary.
    blck: BlockShortLog

  BlockShortLog = object
    sync_committee_participants: int

  SyncCommitteeMessageObject = object
    slot: uint64
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    validatorIndex {.serializedFieldName: "validator_index".}: uint64
    signature: string

  ContributionObject = object
    slot: uint64
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    subcommittee_index: uint64
    aggregationBits {.serializedFieldName: "aggregation_bits".}: string

  ContributionSentObject = object
    contribution: ContributionObject
    aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
    signature: string

  SCMSentMessage = object of LogMessage
    message: SyncCommitteeMessageObject

  SCMReceivedMessage = object of LogMessage
    wallSlot: uint64
    syncCommitteeMsg: SyncCommitteeMessageObject
    subcommitteeIdx: uint64

  ContributionSentMessage = object of LogMessage
    contribution: ContributionSentObject

  ContributionReceivedMessage = object of LogMessage
    contribution: ContributionObject
    wallSlot: uint64
    aggregatorIndex {.serializedFieldName: "aggregator_index".}: uint64
    signature: string
    selectionProof {.serializedFieldName: "selection_proof".}: string

  GossipMessage = object
    kind: GossipDirection
    id: string
    datetime: DateTime
    processed: bool

  SMessageType {.pure.} = enum
    AttestationSent, SCMSent, SlotStart

  SlotMessage = object
    case kind: SMessageType
    of SMessageType.AttestationSent:
      asmsg: AttestationSentMessage
    of SMessageType.SCMSent:
      scmsmsg: SCMSentMessage
    of SMessageType.SlotStart:
      ssmsg: SlotStartMessage

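  # Per-node accumulators for the send/receive analyses: sent messages are
  # matched against the other nodes' received messages by signature.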
  SRANode = object
    directory: NodeDirectory
    sends: seq[AttestationSentMessage]
    recvs: TableRef[string, AttestationReceivedMessage]
    aggSends: seq[AggregatedAttestationSentMessage]
    aggRecvs: TableRef[string, AggregatedAttestationReceivedMessage]

  SRSCNode = object
    directory: NodeDirectory
    sends: seq[SCMSentMessage]
    recvs: TableRef[string, SCMReceivedMessage]
    contributionSends: seq[ContributionSentMessage]
    contributionRecvs: TableRef[string, ContributionReceivedMessage]

template noIssues: FileReport =
  FileReport()

template hasIssues(issuesCategories: varargs[IssuesGroup]): FileReport =
  FileReport(categories: @issuesCategories)

proc copyEntriesTo(src: FileReport, dst: var FileReport) =
  for c in src.categories:
    dst.categories.add c

func isEmpty(r: FileReport): bool =
  r.categories.len == 0

proc printCategory(severityLevel: string, issues: openArray[string]) =
  if issues.len > 0:
    echo ""
    echo severityLevel, ":"
    for issue in issues:
      echo "* ", issue

proc print(r: FileReport) =
  for category in r.categories:
    echo "### ", category.name
    printCategory "Fatal Issues", category.fatalIssues
    printCategory "Warnings", category.warnings
    echo ""

template fatal(issuesGroup: IssuesGroup, msg: string) =
  issuesGroup.fatalIssues.add msg

template warning(issuesGroup: IssuesGroup, msg: string) =
  issuesGroup.warnings.add msg

proc new(T: type IssuesGroup, name: string): T =
  T(name: name)

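# Chronicles timestamps such as "2022-11-21 23:02:37.032+02:00" are parsed
# into a DateTime so that time differences between log entries can be
# computed.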
proc readValue(reader: var JsonReader, value: var DateTime) =
  let s = reader.readValue(string)
  try:
    value = parse(s, "YYYY-MM-dd HH:mm:ss'.'fffzzz", utc())
  except CatchableError:
    raiseUnexpectedValue(reader, "Invalid date time")

proc init(t: typedesc[GossipMessage], kind: GossipDirection, id,
          datestr: string): GossipMessage =
  GossipMessage(
    kind: kind,
    id: id,
    datetime: parse(datestr, "YYYY-MM-dd HH:mm:ss'.'fffzzz")
  )

func `$`(msg: GossipMessage): string =
  msg.id

proc readLogFile(file: string): seq[JsonNode] =
  var res = newSeq[JsonNode]()
  var stream = newFileStream(file)
  try:
    while not(stream.atEnd()):
      var line = stream.readLine()
      let node = parseJson(line)
      res.add(node)
    result = res
  except CatchableError as exc:
    warn "Error reading JSON data from file", file = file,
         errorMsg = exc.msg
  finally:
    stream.close()

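# Note: the specialized readers below decode each line twice: first as a
# generic LogMessage to inspect the "msg" field, then as the concrete
# message type when the field matches.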
proc readLogFileForAttsMessages(file: string,
                                ignoreErrors = true,
                                dumpErrors = false): seq[SlotMessage] =
  var res = newSeq[SlotMessage]()
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      line = stream.readLine()
      inc(counter)
      var m: LogMessage
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Attestation sent":
        let am = Json.decode(line, AttestationSentMessage,
                             allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.AttestationSent,
                            asmsg: am)
        res.add(m)
      elif m.msg == "Slot start":
        let sm = Json.decode(line, SlotStartMessage,
                             allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.SlotStart,
                            ssmsg: sm)
        res.add(m)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             lines_filtered = len(res)
    result = res

  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForASRMessages(file: string, srnode: var SRANode,
                               ignoreErrors = true, dumpErrors = false) =
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Attestation sent":
        let sm = Json.decode(line, AttestationSentMessage,
                             allowUnknownFields = true)
        srnode.sends.add(sm)
      elif m.msg == "Attestation received":
        let rm = Json.decode(line, AttestationReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm)
      elif m.msg == "Aggregate received":
        let rm = Json.decode(line, AggregatedAttestationReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.aggRecvs.hasKeyOrPut(rm.signature, rm)
      elif m.msg == "Aggregated attestation sent":
        let sm = Json.decode(line, AggregatedAttestationSentMessage,
                             allowUnknownFields = true)
        srnode.aggSends.add(sm)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             sends_filtered = len(srnode.sends),
             recvs_filtered = len(srnode.recvs)

  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForSCMSendMessages(file: string,
                                   ignoreErrors = true,
                                   dumpErrors = false): seq[SlotMessage] =
  var res = newSeq[SlotMessage]()
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      line = stream.readLine()
      inc(counter)
      var m: LogMessage
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Sync committee message sent":
        let scmm = Json.decode(line, SCMSentMessage,
                               allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.SCMSent,
                            scmsmsg: scmm)
        res.add(m)
      elif m.msg == "Slot start":
        let sm = Json.decode(line, SlotStartMessage,
                             allowUnknownFields = true)
        let m = SlotMessage(kind: SMessageType.SlotStart,
                            ssmsg: sm)
        res.add(m)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             lines_filtered = len(res)
    result = res

  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForSCMSRMessages(file: string, srnode: var SRSCNode,
                                 ignoreErrors = true, dumpErrors = false) =
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Sync committee message sent":
        let sm = Json.decode(line, SCMSentMessage,
                             allowUnknownFields = true)
        srnode.sends.add(sm)
      elif m.msg == "Sync committee message received":
        let rm = Json.decode(line, SCMReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.recvs.hasKeyOrPut(rm.syncCommitteeMsg.signature, rm)
      elif m.msg == "Contribution received":
        let rm = Json.decode(line, ContributionReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.contributionRecvs.hasKeyOrPut(rm.signature, rm)
      elif m.msg == "Contribution sent":
        let sm = Json.decode(line, ContributionSentMessage,
                             allowUnknownFields = true)
        srnode.contributionSends.add(sm)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             sends_filtered = len(srnode.sends),
             recvs_filtered = len(srnode.recvs)

  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

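# Single-pass driver for the LogVisitor framework: the "msg" field of each
# log line is extracted once and fed to every enabled visitor, and the
# visitors' reports are then merged into one FileReport.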
proc processFile(tracer: LogTracer, file: string): FileReport =
  var stream = newFileStream(file)
  let visitors = mapIt(tracer.enabledVisitors, it())

  try:
    while not(stream.atEnd()):
      let line = stream.readLine()
      var reader = JsonReader[DefaultFlavor].init(memoryInput(line))
      for fieldName in reader.readObjectFields:
        if fieldName == "msg":
          let msg = reader.readValue(string)
          for visitor in visitors:
            visitor.visitLine(msg, line)
          break
        else:
          reader.skipSingleJsValue()
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

  for visitor in visitors:
    let report = visitor.produceReport()
    if not report.isEmpty:
      report.copyEntriesTo result

proc failedValidationsChecker: LogVisitorFactory =
  return proc (): LogVisitor =
    var failedValidations = initCountTable[string]()

    LogVisitor(
      visitLine: proc (msg, line: string) =
        if msg.endsWith("failed validation"):
          failedValidations.inc msg
      ,
      produceReport: proc (): FileReport =
        if failedValidations.len > 0:
          let issues = IssuesGroup.new "Failed Validations"
          for msg, count in failedValidations:
            issues.fatal(msg & " " & $count & " times")

          return hasIssues(issues)
        else:
          return noIssues()
    )

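# Checks that blocks have reasonable sync committee participation; this is
# what the default CI testnet simulation relies on (see `localSimChecks`).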
proc syncAggregateChecker(constPreset: string): LogVisitorFactory =
  return proc (): LogVisitor =
    var totalBlocks = 0
    var minSyncAggregate = 512
    var syncAggregatesCombinedSize = 0

    let minExpectedAggregateSize = if constPreset == "mainnet":
      450
    else:
      20

    LogVisitor(
      visitLine: proc (msgLabel, line: string) =
        if msgLabel == "Block sent":
          let msg = try:
            Json.decode(line, BlockSentMessage, allowUnknownFields = true)
          except SerializationError as err:
            echo "Failure to parse a 'Block sent' message:"
            echo err.formatMsg("<msg>")
            quit 1

          let syncAggregateSize = msg.blck.sync_committee_participants
          if syncAggregateSize != -1:
            inc totalBlocks
            syncAggregatesCombinedSize += syncAggregateSize
            if minSyncAggregate > syncAggregateSize:
              minSyncAggregate = syncAggregateSize
      ,
      produceReport: proc (): FileReport =
        if totalBlocks == 0:
          # Guard against dividing by zero on logs without "Block sent" lines.
          return noIssues()
        let avgSyncAggregateSize = syncAggregatesCombinedSize div totalBlocks
        if avgSyncAggregateSize < minExpectedAggregateSize:
          let issues = IssuesGroup.new "SyncAggregate Stats"

          issues.fatal("Minimal sync aggregate size: " & $minSyncAggregate)
          issues.fatal("Average sync aggregate size: " & $avgSyncAggregateSize)

          return hasIssues(issues)
        else:
          return noIssues()
    )

proc readLogFileForSecondMessages(file: string, ignoreErrors = true,
                                  dumpErrors = false): seq[LogMessage] =
  var stream = newFileStream(file)
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue
      if m.msg == "onSecond task completed":
        result.add(m)

      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
             lines_processed = counter,
             seconds_filtered = len(result)
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
  # Because of the times.DateTime object, we are forced to turn off the
  # [ProveInit] warning. These pragmas can be removed once the Nim compiler
  # or times.nim is fixed.
  {.push warning[ProveInit]: off.}
  result = newSeq[GossipMessage]()
  {.pop.}
  for node in log:
    if ("msg" in node) and ("message_id" in node) and ("ts" in node):
      let message = node["msg"].getStr()
      if message == "Incoming pubsub message received":
        let msg = GossipMessage.init(Incoming, node["message_id"].getStr(),
                                     node["ts"].getStr())
        result.add(msg)
      elif message == "Outgoing pubsub message has been sent":
        let msg = GossipMessage.init(Outgoing, node["message_id"].getStr(),
                                     node["ts"].getStr())
        result.add(msg)

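# A simulation data directory is expected to contain node-0, node-1, ...
# subdirectories, each holding a beacon_node.log or bootstrap_node.log.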
iterator simDirectoryLogFiles(simdir: string): string =
  let absPath = absolutePath(simdir)
  let dataPath = absPath & DirSep & "data"
  if not dirExists(dataPath):
    error "Invalid `eth2_network_simulation` data directory structure",
          path = dataPath
    quit(1)
  var index = 0
  while true:
    let path = dataPath & DirSep & "node-" & $index & DirSep
    let simplePath = path & "beacon_node.log"
    let bootPath = path & "bootstrap_node.log"
    if fileExists(simplePath):
      yield simplePath
    elif fileExists(bootPath):
      yield bootPath
    else:
      break
    inc(index)

proc getDirectoryLogFiles(builddir: string,
                          filter: seq[string]): seq[NodeDirectory] =
  var res = newSeq[NodeDirectory]()
  let absPath = absolutePath(builddir)
  let dataPath = absPath & DirSep & "data"
  if not dirExists(dataPath):
    error "Invalid `network` data directory structure",
          path = dataPath
    quit(1)

  for dirPath in walkDirs(dataPath & DirSep & "*"):
    let name = extractFilename(dirPath)
    if (len(filter) == 0) or (name in filter):
      var nodeDir = NodeDirectory(name: extractFilename(dirPath),
                                  path: dirPath)
      for filePath in walkFiles(dirPath & DirSep & "*.log"):
        nodeDir.logs.add(extractFilename(filePath))
      if len(nodeDir.logs) > 0:
        res.add(nodeDir)
  return res

proc getLogFiles(builddir: string,
                 filter: seq[string]): seq[NodeDirectory] =
  var res = newSeq[NodeDirectory]()
  let dataPath = absolutePath(builddir)
  if not dirExists(dataPath):
    error "Logs directory does not exist", path = dataPath
    quit(1)
  for filePath in walkFiles(dataPath & DirSep & "*.*"):
    let name = extractFilename(filePath)
    if (len(filter) == 0) or (name in filter):
      let nodeDir = NodeDirectory(name: extractFilename(filePath),
                                  path: dataPath,
                                  logs: @[extractFilename(filePath)])
      res.add(nodeDir)
  return res

func getMessage(logs: seq[GossipMessage],
                msg: GossipMessage): Option[GossipMessage] =
  {.push warning[ProveInit]: off.}
  result = none[GossipMessage]()
  {.pop.}
  for i in 0 ..< len(logs):
    if logs[i].kind == Incoming and logs[i].id == msg.id:
      {.push warning[ProveInit]: off.}
      result = some(logs[i])
      {.pop.}

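# Cross-references every outgoing pubsub message against all other logs to
# find messages that were sent but never received by the other nodes.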
proc runPubsub(logConf: LogTraceConf, logFiles: seq[string]) =
  var logs = newSeq[tuple[name: string, data: seq[GossipMessage]]]()

  if len(logFiles) < 2:
    error "Number of log files insufficient to process pubsub messages",
          logs_count = len(logFiles)
    quit(1)

  for item in logFiles:
    let data = filterGossipMessages(readLogFile(item))
    logs.add((name: item, data: data))
    info "Loaded log file", logfile = item, lines_count = len(data)

  {.push warning[ProveInit]: off.}
  var checks = newSeq[Option[GossipMessage]](len(logs))
  {.pop.}

  var misses = 0
  for i in 0 ..< len(logs):
    info "Processing log file", logfile = logs[i].name
    for k in 0 ..< len(logs[i].data):
      let item = logs[i].data[k]
      if item.kind == Outgoing:
        info "Searching message", message_id = $item, logfile = logs[i].name
        checks[i] = some(item)
        for z in 1 ..< len(logs):
          let index = (i + z) mod len(logs)
          checks[index] = getMessage(logs[index].data, item)

        for z in 1 ..< len(checks):
          let index = (i + z) mod len(logs)
          if not(checks[index].isSome()):
            warn "Message not found in log", logfile = logs[index].name,
                 message_id = $item
            inc(misses)

  if misses == 0:
    info "No missing messages found"
  else:
    warn "Number of missing messages found", count = $misses

proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
  info "Check for late `attestation sent` messages"
  if len(logFiles) < 1:
    error "Number of log files insufficient to process pubsub messages",
          logs_count = len(logFiles)
    quit(1)

  let minDuration = initDuration(seconds = 4)
  var slotMessagesCount = 0
  var attsMessagesCount = 0
  var lateAttsMessagesCount = 0

  for item in logFiles:
    info "Processing log file", logFile = item
    let data = readLogFileForAttsMessages(item,
                                          logConf.ignoreSerializationErrors,
                                          logConf.dumpSerializationErrors)

    var currentSlot: Option[SlotStartMessage]
    for item in data:
      if item.kind == SMessageType.SlotStart:
        currentSlot = some(item.ssmsg)
        inc(slotMessagesCount)
      elif item.kind == SMessageType.AttestationSent:
        if currentSlot.isSome():
          let attestationTime = currentSlot.get().timestamp -
                                item.asmsg.timestamp
          if attestationTime > minDuration:
            warn "Found an attestation that was sent later than necessary",
                 lateness = $attestationTime, slot = currentSlot.get(),
                 attestation = item.asmsg
            inc(lateAttsMessagesCount)
          inc(attsMessagesCount)
        else:
          warn "`Attestation sent` message appears before `Slot start` message",
               attestation = item.asmsg
  info "Check finished", attestation_sent_messages = attsMessagesCount,
       slot_messages = slotMessagesCount,
       late_attestation_messages = lateAttsMessagesCount

func toSimple(s: seq[string]): string =
  "[" & s.mapIt("'" & it & "'").join(", ") & "]"

proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for attestation sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)
  var srnodes = newSeq[SRANode]()

  for node in nodes:
    var srnode = SRANode(
      directory: node,
      sends: newSeq[AttestationSentMessage](),
      recvs: newTable[string, AttestationReceivedMessage](),
      aggSends: newSeq[AggregatedAttestationSentMessage](),
      aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForASRMessages(path, srnode,
                                logConf.ignoreSerializationErrors,
                                logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].sends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.attestation.signature notin srnodes[k].recvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Attestation was not received", sender = srnodes[i].directory.name,
             signature = item.attestation.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].sends)

proc runAggAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for aggregate attestation sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)
  var srnodes = newSeq[SRANode]()

  for node in nodes:
    var srnode = SRANode(
      directory: node,
      sends: newSeq[AttestationSentMessage](),
      recvs: newTable[string, AttestationReceivedMessage](),
      aggSends: newSeq[AggregatedAttestationSentMessage](),
      aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForASRMessages(path, srnode,
                                logConf.ignoreSerializationErrors,
                                logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].aggSends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.signature notin srnodes[k].aggRecvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Aggregate attestation was not received",
             sender = srnodes[i].directory.name,
             signature = item.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].aggSends)

proc runSCMSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for Sync Committee Message sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)
  var srnodes = newSeq[SRSCNode]()

  for node in nodes:
    var srnode = SRSCNode(
      directory: node,
      sends: newSeq[SCMSentMessage](),
      recvs: newTable[string, SCMReceivedMessage](),
      contributionSends: newSeq[ContributionSentMessage](),
      contributionRecvs: newTable[string, ContributionReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForSCMSRMessages(path, srnode,
                                  logConf.ignoreSerializationErrors,
                                  logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].sends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.message.signature notin srnodes[k].recvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Sync committee message was not received",
             sender = srnodes[i].directory.name,
             signature = item.message.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].sends)

proc runContributionSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  info "Check for contribution sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)
  var srnodes = newSeq[SRSCNode]()

  for node in nodes:
    var srnode = SRSCNode(
      directory: node,
      sends: newSeq[SCMSentMessage](),
      recvs: newTable[string, SCMReceivedMessage](),
      contributionSends: newSeq[ContributionSentMessage](),
      contributionRecvs: newTable[string, ContributionReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForSCMSRMessages(path, srnode,
                                  logConf.ignoreSerializationErrors,
                                  logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].contributionSends:
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.contribution.signature notin srnodes[k].contributionRecvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Contribution was not received",
             sender = srnodes[i].directory.name,
             signature = item.contribution.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].contributionSends)

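# Measures the spacing between consecutive "onSecond task completed" entries;
# gaps longer than one second times the `--lag` multiplier suggest an
# unresponsive event loop.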
proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string],
                     nodes: seq[NodeDirectory]) =
  info "Check for async responsiveness"
  if len(nodes) == 0 and len(logFiles) == 0:
    error "Number of log files insufficient", nodes_count = len(nodes)
    quit(1)

  let allowedTime = int64(float(initDuration(seconds = 1).inMilliseconds()) *
                          logConf.allowedLag)

  for logFile in logFiles:
    info "Processing log file", logfile = logFile
    let msgs = readLogFileForSecondMessages(logFile,
                                            logConf.ignoreSerializationErrors,
                                            logConf.dumpSerializationErrors)
    if len(msgs) == 0:
      # Guard against dividing by zero for logs without onSecond entries.
      warn "No onSecond messages found", logfile = logFile
      continue
    var lastSecond: Option[LogMessage]
    var minEntry: Option[LogMessage]
    var maxEntry: Option[LogMessage]
    var minTime: times.Duration = initDuration(days = 1)
    var maxTime: times.Duration
    var sumMilliseconds: int64

    for item in msgs:
      if lastSecond.isNone():
        lastSecond = some(item)
      else:
        let time = item.timestamp - lastSecond.get().timestamp
        let start_time = lastSecond.get().timestamp
        let finish_time = item.timestamp
        if time.inMilliseconds() > allowedTime:
          info "Found time lag ",
               start_time = start_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
               finish_time = finish_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
               lag_time = time
        if time < minTime:
          minTime = time
          minEntry = some(item)
        if time > maxTime:
          maxTime = time
          maxEntry = some(item)
        sumMilliseconds += time.inMilliseconds()
        lastSecond = some(item)
    let avgTime = initDuration(milliseconds = sumMilliseconds div len(msgs))
    info "Latency statistics", min_time = minTime, max_time = maxTime,
         avg_time = avgTime, seconds_count = len(msgs)

proc run(conf: LogTraceConf) =
  var logFiles: seq[string]
  var logNodes: seq[NodeDirectory]

  if len(conf.logFiles) > 0:
    for item in conf.logFiles:
      let absPath = absolutePath(item)
      if fileExists(absPath):
        logFiles.add(absPath)

  if len(conf.simDir) > 0:
    for item in simDirectoryLogFiles(conf.simDir):
      logFiles.add(item)
    logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes)

  if len(conf.netDir) > 0:
    logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes)

  if len(conf.logDir) > 0:
    logNodes = getLogFiles(conf.logDir, conf.nodes)

  if len(logFiles) == 0 and len(logNodes) == 0:
    error "Log file sources not specified or not enough log files found"
    quit(1)

  case conf.cmd
  of StartUpCommand.pubsub:
    runPubsub(conf, logFiles)
  of StartUpCommand.asl:
    runAttSend(conf, logFiles)
  of StartUpCommand.asr:
    runAttSendReceive(conf, logNodes)
  of StartUpCommand.aggasr:
    runAggAttSendReceive(conf, logNodes)
  of StartUpCommand.scmsr:
    runSCMSendReceive(conf, logNodes)
  of StartUpCommand.csr:
    runContributionSendReceive(conf, logNodes)
  of StartUpCommand.lat:
    runLatencyCheck(conf, logFiles, logNodes)
  of StartUpCommand.traceAll:
    runContributionSendReceive(conf, logNodes)
    runSCMSendReceive(conf, logNodes)
    runAggAttSendReceive(conf, logNodes)
    runAttSendReceive(conf, logNodes)
    runLatencyCheck(conf, logFiles, logNodes)
    runPubsub(conf, logFiles)
    runAttSend(conf, logFiles)
  of StartUpCommand.localSimChecks:
    runAggAttSendReceive(conf, logNodes)

    # TODO All analysis types can be converted to the more efficient
    # LogVisitor style, so they can be enabled together here.
    # See the discussion below for some potential caveats:
    # https://github.com/status-im/nimbus-eth2/pull/3583#pullrequestreview-941934055

    var tracer = LogTracer()
    tracer.enabledVisitors.add failedValidationsChecker()
    tracer.enabledVisitors.add syncAggregateChecker(conf.constPreset)

    var issuesDetected = false

    for node in logNodes:
      for logFile in node.logs:
        let report = tracer.processFile(node.path / logFile)
        if not report.isEmpty:
          if not issuesDetected:
            issuesDetected = true
            echo "# Logtrace Report"
            echo ""
          echo "## ", logFile
          echo ""
          print report

    quit ord(issuesDetected)

when isMainModule:
  echo LogTraceHeader
  var conf = LogTraceConf.load(version = LogTraceVersion)
  run(conf)
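
# Illustrative invocations (assuming the confutils-generated CLI; flag names
# match the `name:` pragmas above):
#   logtrace aggasr --net-dir=build/data
#   logtrace lat --log-file=beacon_node.log --lag=2.0
#   logtrace localSimChecks --net-dir=build/data --const-preset=minimal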