Implement log tracing of attestation send and receive messages. (#1361)

* Implement log tracing of attestation send and receive messages.

* Cosmetic changes.
This commit is contained in:
Eugene Kabanov 2020-07-28 20:06:03 +03:00 committed by GitHub
parent b801dd65b9
commit ac12af16bf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 156 additions and 8 deletions

View File

@ -4,7 +4,8 @@
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import confutils, json, times, streams, os, strutils, options, chronicles
import confutils, json, times, streams, os, strutils, options, chronicles,
tables, sequtils
import json_serialization
const
@ -22,7 +23,7 @@ const
type
StartUpCommand* {.pure.} = enum
pubsub, attest
pubsub, asl, asr
LogTraceConf* = object
logFiles* {.
@ -35,15 +36,31 @@ type
name: "sim-dir",
defaultValue: "" }: string
netDir* {.
desc: "Specifies path to network build directory",
name: "net-dir",
defaultValue: "" }: string
nodes* {.
desc: "Specifies node names which logs will be used",
name: "nodes" }: seq[string]
case cmd* {.command.}: StartUpCommand
of pubsub:
discard
of attest:
of asl:
discard
of asr:
discard
GossipDirection* = enum
None, Incoming, Outgoing
NodeDirectory* = object
name*: string
path*: string
logs*: seq[string]
LogMessage* = object of RootObj
level* {.serializedFieldName: "lvl" .}: string
timestamp* {.serializedFieldName: "ts" .}: DateTime
@ -82,15 +99,24 @@ type
indexInCommittee*: uint64
validator*: string
AttestationReceivedMessage* = object of LogMessage
attestation*: AttestationObject
head*: string
wallSlot*: uint64
pcs*: string
GossipMessage* = object
kind*: GossipDirection
id*: string
datetime*: DateTime
processed*: bool
SaMessageType* = enum
SaMessageType* {.pure.} = enum
AttestationSent, SlotStart
SRMessageType* {.pure.} = enum
AttestationSent, AttestationReceived
SlotAttMessage* = object
case kind*: SaMessageType
of SaMessageType.AttestationSent:
@ -98,6 +124,18 @@ type
of SaMessageType.SlotStart:
ssmsg*: SlotStartMessage
SRAttMessage* = object
case kind*: SRMessageType
of SRMessageType.AttestationSent:
asmsg*: AttestationSentMessage
of SRMessageType.AttestationReceived:
armsg*: AttestationReceivedMessage
SRANode* = object
directory*: NodeDirectory
sends*: seq[AttestationSentMessage]
recvs*: TableRef[string, AttestationReceivedMessage]
proc readValue*(reader: var JsonReader, value: var DateTime) =
let s = reader.readValue(string)
try:
@ -151,8 +189,9 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] =
res.add(m)
inc(counter)
if counter mod 10_000 == 0:
info "Processing file", file = file, lines_processed = counter,
lines_filtered = len(res)
info "Processing file", file = extractFilename(file),
lines_processed = counter,
lines_filtered = len(res)
result = res
except SerializationError as exc:
@ -163,6 +202,37 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] =
finally:
stream.close()
proc readLogFileForASRMessages(file: string,
srnode: var SRANode) =
var stream = newFileStream(file)
var line: string
var counter = 0
try:
while not(stream.atEnd()):
line = stream.readLine()
let m = Json.decode(line, LogMessage, forwardCompatible = true)
if m.msg == "Attestation sent":
let sm = Json.decode(line, AttestationSentMessage,
forwardCompatible = true)
srnode.sends.add(sm)
elif m.msg == "Attestation received":
let rm = Json.decode(line, AttestationReceivedMessage,
forwardCompatible = true)
discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm)
inc(counter)
if counter mod 10_000 == 0:
info "Processing file", file = extractFilename(file),
lines_processed = counter,
sends_filtered = len(srnode.sends),
recvs_filtered = len(srnode.recvs)
except SerializationError as exc:
error "Serialization error while reading data from file", file = file,
errorMsg = exc.formatMsg(line)
except CatchableError as exc:
warn "Error reading data from file", file = file, errorMsg = exc.msg
finally:
stream.close()
proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
# Because of times.DateTime object we forced to turn off [ProveInit] warnings
# You can remove this pragmas when Nim compiler or times.nim will be fixed.
@ -201,6 +271,27 @@ iterator simDirectoryLogFiles*(simdir: string): string =
break
inc(index)
proc getDirectoryLogFiles*(builddir: string,
filter: seq[string]): seq[NodeDirectory] =
var res = newSeq[NodeDirectory]()
let absPath = absolutePath(builddir)
let dataPath = absPath & DirSep & "data"
if not dirExists(dataPath):
error "Invalid `network` data directory structure",
path = dataPath
quit(1)
for dirPath in walkDirs(dataPath & DirSep & "*"):
let name = extractFilename(dirPath)
if (len(filter) == 0) or (name in filter):
var nodeDir = NodeDirectory(name: extractFilename(dirPath),
path: dirPath)
for filePath in walkFiles(dirPath & DirSep & "*.log"):
nodeDir.logs.add(extractFilename(filePath))
if len(nodeDir.logs) > 0:
res.add(nodeDir)
return res
proc getMessage(logs: seq[GossipMessage],
msg: GossipMessage): Option[GossipMessage] =
{.push warning[ProveInit]: off.}
@ -291,8 +382,59 @@ proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
slot_messages = slotMessagesCount,
late_attestation_messages = lateAttsMessagesCount
proc toSimple*(s: seq[string]): string =
result = "[" & s.mapIt("'" & it & "'").join(", ") & "]"
proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
info "Check for attestations send/receive messages"
if len(nodes) < 2:
error "Number of nodes' log files are not enough", nodes_count = len(nodes)
quit(1)
var srnodes = newSeq[SRANode]()
for node in nodes:
var srnode = SRANode(
directory: node,
sends: newSeq[AttestationSentMessage](),
recvs: newTable[string, AttestationReceivedMessage]()
)
info "Processing node", node = node.name
for logfile in node.logs:
let path = node.path & DirSep & logfile
info "Processing node's logfile", node = node.name, logfile = path
readLogFileForASRMessages(path, srnode)
srnodes.add(srnode)
if len(nodes) < 2:
error "Number of nodes' log files are not enough", nodes_count = len(nodes)
quit(1)
for i in 0 ..< len(srnodes):
var success = 0
var failed = 0
for item in srnodes[i].sends:
var k = (i + 1) mod len(srnodes)
var misses = newSeq[string]()
while k != i:
if item.attestation.signature notin srnodes[k].recvs:
misses.add(srnodes[k].directory.name)
k = (k + 1) mod len(srnodes)
if len(misses) == 0:
inc(success)
else:
inc(failed)
info "Attestation was not received", sender = srnodes[i].directory.name,
signature = item.attestation.signature,
receivers = misses.toSimple(), send_stamp = item.timestamp
info "Statistics for sender node", sender = srnodes[i].directory.name,
sucessfull_broadcasts = success, failed_broadcasts = failed,
total_broadcasts = len(srnodes[i].sends)
proc run(conf: LogTraceConf) =
var logFiles: seq[string]
var logNodes: seq[NodeDirectory]
if len(conf.logFiles) > 0:
for item in conf.logFiles:
@ -303,15 +445,21 @@ proc run(conf: LogTraceConf) =
if len(conf.simDir) > 0:
for item in simDirectoryLogFiles(conf.simDir):
logFiles.add(item)
logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes)
if len(logFiles) == 0:
if len(conf.netDir) > 0:
logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes)
if len(logFiles) == 0 and len(logNodes) == 0:
error "Log file sources not specified or not enough log files found"
quit(1)
if conf.cmd == StartUpCommand.pubsub:
runPubsub(conf, logFiles)
elif conf.cmd == StartUpCommand.attest:
elif conf.cmd == StartUpCommand.asl:
runAttSend(conf, logFiles)
elif conf.cmd == StartUpCommand.asr:
runAttSendReceive(conf, logNodes)
when isMainModule:
echo LogTraceHeader