nimbus-eth2/ncli/logtrace.nim
tersec 61825f4979
add aggregated attestation tracing to logtrace and enable it in Jenkins (#2766)
* add aggregated attestation tracing to logtrace and enable it in Jenkins CI

* use a slightly less cryptic acronym than aasr

* mostly, nimbus and the eth2 spec use aggregate attestation, not aggregated attestation
2021-08-06 21:25:30 +00:00

696 lines
24 KiB
Nim

# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import confutils, json, times, streams, os, strutils, options, chronicles,
tables, sequtils
import json_serialization
const
LogTraceName = "Beacon-Chain LogTrace Tool"
LogTraceMajor: int = 0
LogTraceMinor: int = 0
LogTracePatch: int = 4
LogTraceVersion = $LogTraceMajor & "." & $LogTraceMinor & "." &
$LogTracePatch
LogTraceCopyright = "Copyright(C) 2021" &
" Status Research & Development GmbH"
LogTraceHeader = LogTraceName & ", Version " & LogTraceVersion &
" [" & hostOS & ": " & hostCPU & "]\r\n" &
LogTraceCopyright & "\r\n"
type
StartUpCommand {.pure.} = enum
pubsub, asl, asr, aggasr, lat
LogTraceConf = object
logFiles {.
desc: "Specifies one or more log files",
abbr: "f",
name: "log-file" }: seq[string]
simDir {.
desc: "Specifies path to eth2_network_simulation directory",
name: "sim-dir",
defaultValue: "" }: string
netDir {.
desc: "Specifies path to network build directory",
name: "net-dir",
defaultValue: "" }: string
logDir {.
desc: "Specifies path with bunch of logs",
name: "log-dir",
defaultValue: "" }: string
ignoreSerializationErrors {.
desc: "Ignore serialization errors while parsing log files",
name: "ignore-errors",
defaultValue: true }: bool
dumpSerializationErrors {.
desc: "Dump full serialization errors while parsing log files",
name: "dump-errors",
defaultValue: false }: bool
nodes {.
desc: "Specifies node names which logs will be used",
name: "nodes" }: seq[string]
allowedLag {.
desc: "Allowed latency lag multiplier",
name: "lag",
defaultValue: 2.0 }: float
case cmd {.command.}: StartUpCommand
of pubsub:
discard
of asl:
discard
of asr:
discard
of aggasr:
discard
of lat:
discard
GossipDirection = enum
None, Incoming, Outgoing
NodeDirectory = object
name: string
path: string
logs: seq[string]
LogMessage = object of RootObj
level {.serializedFieldName: "lvl" .}: string
timestamp {.serializedFieldName: "ts" .}: DateTime
msg: string
topics: string
tid: int
SlotStartMessage = object of LogMessage
beaconTime: uint64
finalizedEpoch: uint64
finalizedRoot: string
finalizedSlot: uint64
headEpoch: uint64
headRoot: string
headSlot: uint64
lastSlot: uint64
peers: uint64
scheduledSlot: uint64
AttestationDataObject = object
slot: uint64
index: uint64
beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
sourceEpoch {.serializedFieldName: "source_epoch".}: uint64
sourceRoot {.serializedFieldName: "source_root".}: string
targetEpoch {.serializedFieldName: "target_epoch".}: uint64
targetRoot {.serializedFieldName: "target_root".}: string
AttestationObject = object
aggregationBits {.serializedFieldName: "aggregation_bits".}: string
data: AttestationDataObject
signature: string
AttestationSentMessage = object of LogMessage
attestation: AttestationObject
indexInCommittee: uint64
validator: string
AttestationReceivedMessage = object of LogMessage
attestation: AttestationObject
head: string
wallSlot: uint64
pcs: string
AggregatedAttestationSentMessage = object of LogMessage
attestation: AttestationObject
validator: string
signature: string
aggregationSlot: uint64
AggregatedAttestationReceivedMessage = object of LogMessage
aggregate: AttestationObject
wallSlot: uint64
signature: string
GossipMessage = object
kind: GossipDirection
id: string
datetime: DateTime
processed: bool
SaMessageType {.pure.} = enum
AttestationSent, SlotStart
SlotAttMessage = object
case kind: SaMessageType
of SaMessageType.AttestationSent:
asmsg: AttestationSentMessage
of SaMessageType.SlotStart:
ssmsg: SlotStartMessage
SRANode = object
directory: NodeDirectory
sends: seq[AttestationSentMessage]
recvs: TableRef[string, AttestationReceivedMessage]
aggSends: seq[AggregatedAttestationSentMessage]
aggRecvs: TableRef[string, AggregatedAttestationReceivedMessage]
proc readValue*(reader: var JsonReader, value: var DateTime) =
let s = reader.readValue(string)
try:
value = parse(s, "YYYY-MM-dd HH:mm:ss'.'fffzzz", utc())
except CatchableError:
raiseUnexpectedValue(reader, "Invalid date time")
proc init(t: typedesc[GossipMessage], kind: GossipDirection, id,
datestr: string): GossipMessage =
GossipMessage(
kind: kind,
id: id,
datetime: parse(datestr, "YYYY-MM-dd HH:mm:ss'.'fffzzz")
)
proc `$`*(msg: GossipMessage): string =
result = msg.id
proc readLogFile(file: string): seq[JsonNode] =
var res = newSeq[JsonNode]()
var stream = newFileStream(file)
try:
while not(stream.atEnd()):
var line = stream.readLine()
let node = parseJson(line)
res.add(node)
result = res
except CatchableError as exc:
warn "Error reading JSON data from file", file = file,
errorMsg = exc.msg
finally:
stream.close()
proc readLogFileForAttsMessages(file: string,
ignoreErrors = true,
dumpErrors = false): seq[SlotAttMessage] =
var res = newSeq[SlotAttMessage]()
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not(stream.atEnd()):
line = stream.readLine()
inc(counter)
var m: LogMessage
try:
m = Json.decode(line, LogMessage, allowUnknownFields = true)
except SerializationError as exc:
if dumpErrors:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter, errorMsg = exc.formatMsg(line)
else:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter
if not(ignoreErrors):
raise exc
else:
continue
if m.msg == "Attestation sent":
let am = Json.decode(line, AttestationSentMessage,
allowUnknownFields = true)
let m = SlotAttMessage(kind: SaMessageType.AttestationSent,
asmsg: am)
res.add(m)
elif m.msg == "Slot start":
let sm = Json.decode(line, SlotStartMessage,
allowUnknownFields = true)
let m = SlotAttMessage(kind: SaMessageType.SlotStart,
ssmsg: sm)
res.add(m)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
lines_filtered = len(res)
result = res
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()
proc readLogFileForASRMessages(file: string, srnode: var SRANode,
ignoreErrors = true, dumpErrors = false) =
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not(stream.atEnd()):
var m: LogMessage
line = stream.readLine()
inc(counter)
try:
m = Json.decode(line, LogMessage, allowUnknownFields = true)
except SerializationError as exc:
if dumpErrors:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter, errorMsg = exc.formatMsg(line)
else:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter
if not(ignoreErrors):
raise exc
else:
continue
if m.msg == "Attestation sent":
let sm = Json.decode(line, AttestationSentMessage,
allowUnknownFields = true)
srnode.sends.add(sm)
elif m.msg == "Attestation received":
let rm = Json.decode(line, AttestationReceivedMessage,
allowUnknownFields = true)
discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm)
elif m.msg == "Aggregate received":
let rm = Json.decode(line, AggregatedAttestationReceivedMessage,
allowUnknownFields = true)
discard srnode.aggRecvs.hasKeyOrPut(rm.signature, rm)
elif m.msg == "Aggregated attestation sent":
let sm = Json.decode(line, AggregatedAttestationSentMessage,
allowUnknownFields = true)
srnode.aggSends.add(sm)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
sends_filtered = len(srnode.sends),
recvs_filtered = len(srnode.recvs)
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()
proc readLogFileForSecondMessages(file: string, ignoreErrors = true,
dumpErrors = false): seq[LogMessage] =
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not (stream.atEnd()):
var m: LogMessage
line = stream.readLine()
inc(counter)
try:
m = Json.decode(line, LogMessage, allowUnknownFields = true)
except SerializationError as exc:
if dumpErrors:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter, errorMsg = exc.formatMsg(line)
else:
error "Serialization error while reading file, ignoring", file = file,
line_number = counter
if not(ignoreErrors):
raise exc
else:
continue
if m.msg == "onSecond task completed":
result.add(m)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
seconds_filtered = len(result)
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()
proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
# Because of times.DateTime object we forced to turn off [ProveInit] warnings
# You can remove this pragmas when Nim compiler or times.nim will be fixed.
{.push warning[ProveInit]: off.}
result = newSeq[GossipMessage]()
{.pop.}
for node in log:
if ("msg" in node) and ("message_id" in node) and ("ts" in node):
let message = node["msg"].getStr()
if message == "Incoming pubsub message received":
let msg = GossipMessage.init(Incoming, node["message_id"].getStr(),
node["ts"].getStr())
result.add(msg)
elif message == "Outgoing pubsub message has been sent":
let msg = GossipMessage.init(Outgoing, node["message_id"].getStr(),
node["ts"].getStr())
result.add(msg)
iterator simDirectoryLogFiles*(simdir: string): string =
let absPath = absolutePath(simdir)
let dataPath = absPath & DirSep & "data"
if not dirExists(dataPath):
error "Invalid `eth2_network_simulation` data directory structure",
path = dataPath
quit(1)
var index = 0
while true:
let path = dataPath & DirSep & "node-" & $index & DirSep
let simplePath = path & "beacon_node.log"
let bootPath = path & "bootstrap_node.log"
if fileExists(simplePath):
yield simplePath
elif fileExists(bootPath):
yield bootPath
else:
break
inc(index)
proc getDirectoryLogFiles*(builddir: string,
filter: seq[string]): seq[NodeDirectory] =
var res = newSeq[NodeDirectory]()
let absPath = absolutePath(builddir)
let dataPath = absPath & DirSep & "data"
if not dirExists(dataPath):
error "Invalid `network` data directory structure",
path = dataPath
quit(1)
for dirPath in walkDirs(dataPath & DirSep & "*"):
let name = extractFilename(dirPath)
if (len(filter) == 0) or (name in filter):
var nodeDir = NodeDirectory(name: extractFilename(dirPath),
path: dirPath)
for filePath in walkFiles(dirPath & DirSep & "*.log"):
nodeDir.logs.add(extractFilename(filePath))
if len(nodeDir.logs) > 0:
res.add(nodeDir)
return res
proc getLogFiles*(builddir: string,
filter: seq[string]): seq[NodeDirectory] =
var res = newSeq[NodeDirectory]()
let dataPath = absolutePath(builddir)
if not dirExists(dataPath):
error "Logs directory did not exist", path = dataPath
quit(1)
for filePath in walkFiles(dataPath & DirSep & "*.*"):
let name = extractFilename(filePath)
if (len(filter) == 0) or (name in filter):
let nodeDir = NodeDirectory(name: extractFilename(filePath),
path: dataPath,
logs: @[extractFilename(filePath)])
res.add(nodeDir)
return res
proc getMessage(logs: seq[GossipMessage],
msg: GossipMessage): Option[GossipMessage] =
{.push warning[ProveInit]: off.}
result = none[GossipMessage]()
{.pop.}
for i in 0 ..< len(logs):
if logs[i].kind == Incoming and logs[i].id == msg.id:
{.push warning[ProveInit]: off.}
result = some(logs[i])
{.pop.}
proc runPubsub(logConf: LogTraceConf, logFiles: seq[string]) =
var logs = newSeq[tuple[name: string, data: seq[GossipMessage]]]()
if len(logFiles) < 2:
error "Number of log files insufficient to process pubsub messages",
logs_count = len(logFiles)
quit(1)
for item in logFiles:
let data = filterGossipMessages(readLogFile(item))
logs.add((name: item, data: data))
info "Loaded log file", logfile = item, lines_count = len(data)
{.push warning[ProveInit]: off.}
var checks = newSeq[Option[GossipMessage]](len(logs))
{.pop.}
var misses = 0
for i in 0 ..< len(logs):
info "Processing log file", logfile = logs[i].name
for k in 0 ..< len(logs[i].data):
let item = logs[i].data[k]
if item.kind == Outgoing:
info "Searching message", message_id = $item, logfile = logs[i].name
checks[i] = some(item)
for z in 1 ..< len(logs):
let index = (i + z) mod len(logs)
checks[index] = getMessage(logs[index].data, item)
for z in 1 ..< len(checks):
let index = (i + z) mod len(logs)
if not(checks[index].isSome()):
warn "Message not found in log", logfile = logs[index].name,
message_id = $item
inc(misses)
if misses == 0:
info "No missing messages found"
else:
warn "Number of missing messages found", count = $misses
proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
info "Check for late `attestation sent` messages"
if len(logFiles) < 1:
error "Number of log files insufficient to process pubsub messages",
logs_count = len(logFiles)
quit(1)
let minDuration = initDuration(seconds = 4)
var slotMessagesCount = 0
var attsMessagesCount = 0
var lateAttsMessagesCount = 0
for item in logFiles:
info "Processing log file", logFile = item
let data = readLogFileForAttsMessages(item,
logConf.ignoreSerializationErrors,
logConf.dumpSerializationErrors)
var currentSlot: Option[SlotStartMessage]
for item in data:
if item.kind == SaMessageType.SlotStart:
currentSlot = some(item.ssmsg)
inc(slotMessagesCount)
elif item.kind == SaMessageType.AttestationSent:
if currentSlot.isSome():
let attestationTime = currentSlot.get().timestamp -
item.asmsg.timestamp
if attestationTime > minDuration:
warn "Found an attestation that was sent later than necessary",
lateness = $attestationTime, slot = currentSlot.get(),
attestation = item.asmsg
inc(lateAttsMessagesCount)
inc(attsMessagesCount)
else:
warn "`Attestation sent` message appears before `Start slot` message",
attestation = item.asmsg
info "Check finished", attestation_sent_messages = attsMessagesCount,
slot_messages = slotMessagesCount,
late_attestation_messages = lateAttsMessagesCount
proc toSimple*(s: seq[string]): string =
result = "[" & s.mapIt("'" & it & "'").join(", ") & "]"
proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
info "Check for attestation sent/received messages"
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
var srnodes = newSeq[SRANode]()
for node in nodes:
var srnode = SRANode(
directory: node,
sends: newSeq[AttestationSentMessage](),
recvs: newTable[string, AttestationReceivedMessage](),
aggSends: newSeq[AggregatedAttestationSentMessage](),
aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
)
info "Processing node", node = node.name
for logfile in node.logs:
let path = node.path & DirSep & logfile
info "Processing node's logfile", node = node.name, logfile = path
readLogFileForASRMessages(path, srnode,
logConf.ignoreSerializationErrors,
logConf.dumpSerializationErrors)
srnodes.add(srnode)
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
for i in 0 ..< len(srnodes):
var success = 0
var failed = 0
for item in srnodes[i].sends:
var k = (i + 1) mod len(srnodes)
var misses = newSeq[string]()
while k != i:
if item.attestation.signature notin srnodes[k].recvs:
misses.add(srnodes[k].directory.name)
k = (k + 1) mod len(srnodes)
if len(misses) == 0:
inc(success)
else:
inc(failed)
info "Attestation was not received", sender = srnodes[i].directory.name,
signature = item.attestation.signature,
receivers = misses.toSimple(), send_stamp = item.timestamp
info "Statistics for sender node", sender = srnodes[i].directory.name,
successful_broadcasts = success, failed_broadcasts = failed,
total_broadcasts = len(srnodes[i].sends)
proc runAggAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
info "Check for aggregate attestation sent/received messages"
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
var srnodes = newSeq[SRANode]()
for node in nodes:
var srnode = SRANode(
directory: node,
sends: newSeq[AttestationSentMessage](),
recvs: newTable[string, AttestationReceivedMessage](),
aggSends: newSeq[AggregatedAttestationSentMessage](),
aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
)
info "Processing node", node = node.name
for logfile in node.logs:
let path = node.path & DirSep & logfile
info "Processing node's logfile", node = node.name, logfile = path
readLogFileForASRMessages(path, srnode,
logConf.ignoreSerializationErrors,
logConf.dumpSerializationErrors)
srnodes.add(srnode)
if len(nodes) < 2:
error "Number of nodes' log files insufficient", nodes_count = len(nodes)
quit(1)
for i in 0 ..< len(srnodes):
var success = 0
var failed = 0
for item in srnodes[i].aggSends:
var k = (i + 1) mod len(srnodes)
var misses = newSeq[string]()
while k != i:
if item.signature notin srnodes[k].aggRecvs:
misses.add(srnodes[k].directory.name)
k = (k + 1) mod len(srnodes)
if len(misses) == 0:
inc(success)
else:
inc(failed)
info "Aggregate attestation was not received",
sender = srnodes[i].directory.name,
signature = item.signature,
receivers = misses.toSimple(), send_stamp = item.timestamp
info "Statistics for sender node", sender = srnodes[i].directory.name,
successful_broadcasts = success, failed_broadcasts = failed,
total_broadcasts = len(srnodes[i].aggSends)
proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string],
nodes: seq[NodeDirectory]) =
info "Check for async responsiveness"
if len(nodes) == 0 and len(logFiles) == 0:
error "Number of log files insufficient", nodes_count = len(nodes)
quit(1)
let allowedTime = int64(float(initDuration(seconds = 1).inMilliseconds()) *
logConf.allowedLag)
for logFile in logFiles:
info "Processing log file", logfile = logFile
let msgs = readLogFileForSecondMessages(logFile,
logConf.ignoreSerializationErrors,
logConf.dumpSerializationErrors)
var lastSecond: Option[LogMessage]
var minEntry: Option[LogMessage]
var maxEntry: Option[LogMessage]
var minTime: times.Duration = initDuration(days = 1)
var maxTime: times.Duration
var sumMilliseconds: int64
for item in msgs:
if lastSecond.isNone():
lastSecond = some(item)
else:
let time = item.timestamp - lastSecond.get().timestamp
let start_time = lastSecond.get().timestamp
let finish_time = item.timestamp
if time.inMilliseconds() > allowedTime:
info "Found time lag ",
start_time = start_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
finish_time = finish_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
lag_time = time
if time < minTime:
minTime = time
minEntry = some(item)
if time > maxTime:
maxTime = time
maxEntry = some(item)
sumMilliseconds += time.inMilliseconds()
lastSecond = some(item)
let avgTime = initDuration(milliseconds = sumMilliseconds div len(msgs))
info "Latency statistics", min_time = minTime, max_time = maxTime,
avg_time = avgTime, seconds_count = len(msgs)
proc run(conf: LogTraceConf) =
var logFiles: seq[string]
var logNodes: seq[NodeDirectory]
if len(conf.logFiles) > 0:
for item in conf.logFiles:
let absPath = absolutePath(item)
if fileExists(absPath):
logFiles.add(absPath)
if len(conf.simDir) > 0:
for item in simDirectoryLogFiles(conf.simDir):
logFiles.add(item)
logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes)
if len(conf.netDir) > 0:
logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes)
if len(conf.logDir) > 0:
logNodes = getLogFiles(conf.logDir, conf.nodes)
if len(logFiles) == 0 and len(logNodes) == 0:
error "Log file sources not specified or not enough log files found"
quit(1)
case conf.cmd
of StartUpCommand.pubsub:
runPubsub(conf, logFiles)
of StartUpCommand.asl:
runAttSend(conf, logFiles)
of StartUpCommand.asr:
runAttSendReceive(conf, logNodes)
of StartUpCommand.aggasr:
runAggAttSendReceive(conf, logNodes)
of StartUpCommand.lat:
runLatencyCheck(conf, logFiles, logNodes)
when isMainModule:
echo LogTraceHeader
var conf = LogTraceConf.load(version = LogTraceVersion)
run(conf)