Add check for late attestation sent messages in logtrace tool.

This commit is contained in:
cheatfate 2020-07-01 10:06:05 +03:00 committed by zah
parent 26b7202af3
commit 76c12d493e
2 changed files with 135 additions and 2 deletions

View File

@ -5,6 +5,7 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms. # at your option. This file may not be copied, modified, or distributed except according to those terms.
import confutils, json, times, streams, os, strutils, options, chronicles import confutils, json, times, streams, os, strutils, options, chronicles
import json_serialization
const const
LogTraceName* = "Beacon-Chain LogTrace Tool" LogTraceName* = "Beacon-Chain LogTrace Tool"
@ -21,7 +22,7 @@ const
type type
StartUpCommand* {.pure.} = enum StartUpCommand* {.pure.} = enum
pubsub pubsub, attest
LogTraceConf* = object LogTraceConf* = object
logFiles* {. logFiles* {.
@ -37,16 +38,73 @@ type
case cmd* {.command.}: StartUpCommand case cmd* {.command.}: StartUpCommand
of pubsub: of pubsub:
discard discard
of attest:
discard
GossipDirection* = enum GossipDirection* = enum
None, Incoming, Outgoing None, Incoming, Outgoing
LogMessage* = object of RootObj
level* {.serializedFieldName: "lvl" .}: string
timestamp* {.serializedFieldName: "ts" .}: DateTime
msg*: string
topics*: string
tid*: int
SlotStartMessage* = object of LogMessage
beaconTime*: uint64
finalizedEpoch*: uint64
finalizedRoot*: string
finalizedSlot*: uint64
headEpoch*: uint64
headRoot*: string
headSlot*: uint64
lastSlot*: uint64
peers*: uint64
scheduledSlot*: uint64
AttestationDataObject* = object
slot*: uint64
index*: uint64
beaconBlockRoot* {.serializedFieldName: "beacon_block_root".}: string
sourceEpoch* {.serializedFieldName: "source_epoch".}: uint64
sourceRoot* {.serializedFieldName: "source_root".}: string
targetEpoch* {.serializedFieldName: "target_epoch".}: uint64
targetRoot* {.serializedFieldName: "target_root".}: string
AttestationObject* = object
aggregationBits* {.serializedFieldName: "aggregation_bits".}: string
data*: AttestationDataObject
signature*: string
AttestationSentMessage* = object of LogMessage
attestation*: AttestationObject
indexInCommittee*: uint64
validator*: string
GossipMessage* = object GossipMessage* = object
kind*: GossipDirection kind*: GossipDirection
id*: string id*: string
datetime*: DateTime datetime*: DateTime
processed*: bool processed*: bool
SaMessageType* = enum
AttestationSent, SlotStart
SlotAttMessage* = object
case kind*: SaMessageType
of SaMessageType.AttestationSent:
asmsg*: AttestationSentMessage
of SaMessageType.SlotStart:
ssmsg*: SlotStartMessage
proc readValue*(reader: var JsonReader, value: var DateTime) =
let s = reader.readValue(string)
try:
value = parse(s, "YYYY-MM-dd HH:mm:sszzz")
except CatchableError:
raiseUnexpectedValue(reader, "Invalid date time")
proc init(t: typedesc[GossipMessage], kind: GossipDirection, id, proc init(t: typedesc[GossipMessage], kind: GossipDirection, id,
datestr: string): GossipMessage = datestr: string): GossipMessage =
result = GossipMessage(kind: kind, id: id, result = GossipMessage(kind: kind, id: id,
@ -70,6 +128,41 @@ proc readLogFile(file: string): seq[JsonNode] =
finally: finally:
stream.close() stream.close()
proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] =
var res = newSeq[SlotAttMessage]()
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not(stream.atEnd()):
line = stream.readLine()
let m = Json.decode(line, LogMessage, forwardCompatible = true)
if m.msg == "Attestation sent":
let am = Json.decode(line, AttestationSentMessage,
forwardCompatible = true)
let m = SlotAttMessage(kind: SaMessageType.AttestationSent,
asmsg: am)
res.add(m)
elif m.msg == "Slot start":
let sm = Json.decode(line, SlotStartMessage,
forwardCompatible = true)
let m = SlotAttMessage(kind: SaMessageType.SlotStart,
ssmsg: sm)
res.add(m)
inc(counter)
if counter mod 10_000 == 0:
info "Processing file", file = file, lines_processed = counter,
lines_filtered = len(res)
result = res
except SerializationError as exc:
error "Serialization error while reading data from file", file = file,
errorMsg = exc.formatMsg(line)
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()
proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] = proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
# Because of times.DateTime object we forced to turn off [ProveInit] warnings # Because of times.DateTime object we forced to turn off [ProveInit] warnings
# You can remove this pragmas when Nim compiler or times.nim will be fixed. # You can remove this pragmas when Nim compiler or times.nim will be fixed.
@ -160,6 +253,44 @@ proc runPubsub(logConf: LogTraceConf, logFiles: seq[string]) =
else: else:
warn "Number of missing messages found", count = $misses warn "Number of missing messages found", count = $misses
proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
info "Check for late `attestation sent` messages"
if len(logFiles) < 1:
error "Number of log files are not enough to process pubsub messages",
logs_count = len(logFiles)
quit(1)
let minDuration = initDuration(seconds = 4)
var slotMessagesCount = 0
var attsMessagesCount = 0
var lateAttsMessagesCount = 0
for item in logFiles:
info "Processing log file", logFile = item
let data = readLogFileForAttsMessages(item)
var currentSlot: Option[SlotStartMessage]
for item in data:
if item.kind == SaMessageType.SlotStart:
currentSlot = some(item.ssmsg)
inc(slotMessagesCount)
elif item.kind == SaMessageType.AttestationSent:
if currentSlot.isSome():
let attestationTime = currentSlot.get().timestamp -
item.asmsg.timestamp
if attestationTime > minDuration:
warn "Found an attestation that was sent later than necessary",
lateness = $attestationTime, slot = currentSlot.get(),
attestation = item.asmsg
inc(lateAttsMessagesCount)
inc(attsMessagesCount)
else:
warn "`Attestation sent` message appears before `Start slot` message",
attestation = item.asmsg
info "Check finished", attestation_sent_messages = attsMessagesCount,
slot_messages = slotMessagesCount,
late_attestation_messages = lateAttsMessagesCount
proc run(conf: LogTraceConf) = proc run(conf: LogTraceConf) =
var logFiles: seq[string] var logFiles: seq[string]
@ -179,6 +310,8 @@ proc run(conf: LogTraceConf) =
if conf.cmd == StartUpCommand.pubsub: if conf.cmd == StartUpCommand.pubsub:
runPubsub(conf, logFiles) runPubsub(conf, logFiles)
elif conf.cmd == StartUpCommand.attest:
runAttSend(conf, logFiles)
when isMainModule: when isMainModule:
echo LogTraceHeader echo LogTraceHeader

@ -1 +1 @@
Subproject commit 463a0cf1b79f5e586b537c0ce7518e2d1ee85984 Subproject commit 1cf51931f1037a2c44fa0912386273c01a0e0e42