This reverts commit ac12af16bf
.
This commit is contained in:
parent
a07dab300c
commit
1220cc05ad
|
@ -4,8 +4,7 @@
|
||||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
import confutils, json, times, streams, os, strutils, options, chronicles,
|
import confutils, json, times, streams, os, strutils, options, chronicles
|
||||||
tables, sequtils
|
|
||||||
import json_serialization
|
import json_serialization
|
||||||
|
|
||||||
const
|
const
|
||||||
|
@ -23,7 +22,7 @@ const
|
||||||
|
|
||||||
type
|
type
|
||||||
StartUpCommand* {.pure.} = enum
|
StartUpCommand* {.pure.} = enum
|
||||||
pubsub, asl, asr
|
pubsub, attest
|
||||||
|
|
||||||
LogTraceConf* = object
|
LogTraceConf* = object
|
||||||
logFiles* {.
|
logFiles* {.
|
||||||
|
@ -36,31 +35,15 @@ type
|
||||||
name: "sim-dir",
|
name: "sim-dir",
|
||||||
defaultValue: "" }: string
|
defaultValue: "" }: string
|
||||||
|
|
||||||
netDir* {.
|
|
||||||
desc: "Specifies path to network build directory",
|
|
||||||
name: "net-dir",
|
|
||||||
defaultValue: "" }: string
|
|
||||||
|
|
||||||
nodes* {.
|
|
||||||
desc: "Specifies node names which logs will be used",
|
|
||||||
name: "nodes" }: seq[string]
|
|
||||||
|
|
||||||
case cmd* {.command.}: StartUpCommand
|
case cmd* {.command.}: StartUpCommand
|
||||||
of pubsub:
|
of pubsub:
|
||||||
discard
|
discard
|
||||||
of asl:
|
of attest:
|
||||||
discard
|
|
||||||
of asr:
|
|
||||||
discard
|
discard
|
||||||
|
|
||||||
GossipDirection* = enum
|
GossipDirection* = enum
|
||||||
None, Incoming, Outgoing
|
None, Incoming, Outgoing
|
||||||
|
|
||||||
NodeDirectory* = object
|
|
||||||
name*: string
|
|
||||||
path*: string
|
|
||||||
logs*: seq[string]
|
|
||||||
|
|
||||||
LogMessage* = object of RootObj
|
LogMessage* = object of RootObj
|
||||||
level* {.serializedFieldName: "lvl" .}: string
|
level* {.serializedFieldName: "lvl" .}: string
|
||||||
timestamp* {.serializedFieldName: "ts" .}: DateTime
|
timestamp* {.serializedFieldName: "ts" .}: DateTime
|
||||||
|
@ -99,24 +82,15 @@ type
|
||||||
indexInCommittee*: uint64
|
indexInCommittee*: uint64
|
||||||
validator*: string
|
validator*: string
|
||||||
|
|
||||||
AttestationReceivedMessage* = object of LogMessage
|
|
||||||
attestation*: AttestationObject
|
|
||||||
head*: string
|
|
||||||
wallSlot*: uint64
|
|
||||||
pcs*: string
|
|
||||||
|
|
||||||
GossipMessage* = object
|
GossipMessage* = object
|
||||||
kind*: GossipDirection
|
kind*: GossipDirection
|
||||||
id*: string
|
id*: string
|
||||||
datetime*: DateTime
|
datetime*: DateTime
|
||||||
processed*: bool
|
processed*: bool
|
||||||
|
|
||||||
SaMessageType* {.pure.} = enum
|
SaMessageType* = enum
|
||||||
AttestationSent, SlotStart
|
AttestationSent, SlotStart
|
||||||
|
|
||||||
SRMessageType* {.pure.} = enum
|
|
||||||
AttestationSent, AttestationReceived
|
|
||||||
|
|
||||||
SlotAttMessage* = object
|
SlotAttMessage* = object
|
||||||
case kind*: SaMessageType
|
case kind*: SaMessageType
|
||||||
of SaMessageType.AttestationSent:
|
of SaMessageType.AttestationSent:
|
||||||
|
@ -124,18 +98,6 @@ type
|
||||||
of SaMessageType.SlotStart:
|
of SaMessageType.SlotStart:
|
||||||
ssmsg*: SlotStartMessage
|
ssmsg*: SlotStartMessage
|
||||||
|
|
||||||
SRAttMessage* = object
|
|
||||||
case kind*: SRMessageType
|
|
||||||
of SRMessageType.AttestationSent:
|
|
||||||
asmsg*: AttestationSentMessage
|
|
||||||
of SRMessageType.AttestationReceived:
|
|
||||||
armsg*: AttestationReceivedMessage
|
|
||||||
|
|
||||||
SRANode* = object
|
|
||||||
directory*: NodeDirectory
|
|
||||||
sends*: seq[AttestationSentMessage]
|
|
||||||
recvs*: TableRef[string, AttestationReceivedMessage]
|
|
||||||
|
|
||||||
proc readValue*(reader: var JsonReader, value: var DateTime) =
|
proc readValue*(reader: var JsonReader, value: var DateTime) =
|
||||||
let s = reader.readValue(string)
|
let s = reader.readValue(string)
|
||||||
try:
|
try:
|
||||||
|
@ -189,9 +151,8 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] =
|
||||||
res.add(m)
|
res.add(m)
|
||||||
inc(counter)
|
inc(counter)
|
||||||
if counter mod 10_000 == 0:
|
if counter mod 10_000 == 0:
|
||||||
info "Processing file", file = extractFilename(file),
|
info "Processing file", file = file, lines_processed = counter,
|
||||||
lines_processed = counter,
|
lines_filtered = len(res)
|
||||||
lines_filtered = len(res)
|
|
||||||
result = res
|
result = res
|
||||||
|
|
||||||
except SerializationError as exc:
|
except SerializationError as exc:
|
||||||
|
@ -202,37 +163,6 @@ proc readLogFileForAttsMessages(file: string): seq[SlotAttMessage] =
|
||||||
finally:
|
finally:
|
||||||
stream.close()
|
stream.close()
|
||||||
|
|
||||||
proc readLogFileForASRMessages(file: string,
|
|
||||||
srnode: var SRANode) =
|
|
||||||
var stream = newFileStream(file)
|
|
||||||
var line: string
|
|
||||||
var counter = 0
|
|
||||||
try:
|
|
||||||
while not(stream.atEnd()):
|
|
||||||
line = stream.readLine()
|
|
||||||
let m = Json.decode(line, LogMessage, forwardCompatible = true)
|
|
||||||
if m.msg == "Attestation sent":
|
|
||||||
let sm = Json.decode(line, AttestationSentMessage,
|
|
||||||
forwardCompatible = true)
|
|
||||||
srnode.sends.add(sm)
|
|
||||||
elif m.msg == "Attestation received":
|
|
||||||
let rm = Json.decode(line, AttestationReceivedMessage,
|
|
||||||
forwardCompatible = true)
|
|
||||||
discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm)
|
|
||||||
inc(counter)
|
|
||||||
if counter mod 10_000 == 0:
|
|
||||||
info "Processing file", file = extractFilename(file),
|
|
||||||
lines_processed = counter,
|
|
||||||
sends_filtered = len(srnode.sends),
|
|
||||||
recvs_filtered = len(srnode.recvs)
|
|
||||||
except SerializationError as exc:
|
|
||||||
error "Serialization error while reading data from file", file = file,
|
|
||||||
errorMsg = exc.formatMsg(line)
|
|
||||||
except CatchableError as exc:
|
|
||||||
warn "Error reading data from file", file = file, errorMsg = exc.msg
|
|
||||||
finally:
|
|
||||||
stream.close()
|
|
||||||
|
|
||||||
proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
|
proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
|
||||||
# Because of times.DateTime object we forced to turn off [ProveInit] warnings
|
# Because of times.DateTime object we forced to turn off [ProveInit] warnings
|
||||||
# You can remove this pragmas when Nim compiler or times.nim will be fixed.
|
# You can remove this pragmas when Nim compiler or times.nim will be fixed.
|
||||||
|
@ -271,27 +201,6 @@ iterator simDirectoryLogFiles*(simdir: string): string =
|
||||||
break
|
break
|
||||||
inc(index)
|
inc(index)
|
||||||
|
|
||||||
proc getDirectoryLogFiles*(builddir: string,
|
|
||||||
filter: seq[string]): seq[NodeDirectory] =
|
|
||||||
var res = newSeq[NodeDirectory]()
|
|
||||||
let absPath = absolutePath(builddir)
|
|
||||||
let dataPath = absPath & DirSep & "data"
|
|
||||||
if not dirExists(dataPath):
|
|
||||||
error "Invalid `network` data directory structure",
|
|
||||||
path = dataPath
|
|
||||||
quit(1)
|
|
||||||
|
|
||||||
for dirPath in walkDirs(dataPath & DirSep & "*"):
|
|
||||||
let name = extractFilename(dirPath)
|
|
||||||
if (len(filter) == 0) or (name in filter):
|
|
||||||
var nodeDir = NodeDirectory(name: extractFilename(dirPath),
|
|
||||||
path: dirPath)
|
|
||||||
for filePath in walkFiles(dirPath & DirSep & "*.log"):
|
|
||||||
nodeDir.logs.add(extractFilename(filePath))
|
|
||||||
if len(nodeDir.logs) > 0:
|
|
||||||
res.add(nodeDir)
|
|
||||||
return res
|
|
||||||
|
|
||||||
proc getMessage(logs: seq[GossipMessage],
|
proc getMessage(logs: seq[GossipMessage],
|
||||||
msg: GossipMessage): Option[GossipMessage] =
|
msg: GossipMessage): Option[GossipMessage] =
|
||||||
{.push warning[ProveInit]: off.}
|
{.push warning[ProveInit]: off.}
|
||||||
|
@ -382,59 +291,8 @@ proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
|
||||||
slot_messages = slotMessagesCount,
|
slot_messages = slotMessagesCount,
|
||||||
late_attestation_messages = lateAttsMessagesCount
|
late_attestation_messages = lateAttsMessagesCount
|
||||||
|
|
||||||
proc toSimple*(s: seq[string]): string =
|
|
||||||
result = "[" & s.mapIt("'" & it & "'").join(", ") & "]"
|
|
||||||
|
|
||||||
proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
|
|
||||||
info "Check for attestations send/receive messages"
|
|
||||||
if len(nodes) < 2:
|
|
||||||
error "Number of nodes' log files are not enough", nodes_count = len(nodes)
|
|
||||||
quit(1)
|
|
||||||
var srnodes = newSeq[SRANode]()
|
|
||||||
|
|
||||||
for node in nodes:
|
|
||||||
var srnode = SRANode(
|
|
||||||
directory: node,
|
|
||||||
sends: newSeq[AttestationSentMessage](),
|
|
||||||
recvs: newTable[string, AttestationReceivedMessage]()
|
|
||||||
)
|
|
||||||
info "Processing node", node = node.name
|
|
||||||
for logfile in node.logs:
|
|
||||||
let path = node.path & DirSep & logfile
|
|
||||||
info "Processing node's logfile", node = node.name, logfile = path
|
|
||||||
readLogFileForASRMessages(path, srnode)
|
|
||||||
srnodes.add(srnode)
|
|
||||||
|
|
||||||
if len(nodes) < 2:
|
|
||||||
error "Number of nodes' log files are not enough", nodes_count = len(nodes)
|
|
||||||
quit(1)
|
|
||||||
|
|
||||||
for i in 0 ..< len(srnodes):
|
|
||||||
var success = 0
|
|
||||||
var failed = 0
|
|
||||||
for item in srnodes[i].sends:
|
|
||||||
var k = (i + 1) mod len(srnodes)
|
|
||||||
var misses = newSeq[string]()
|
|
||||||
while k != i:
|
|
||||||
if item.attestation.signature notin srnodes[k].recvs:
|
|
||||||
misses.add(srnodes[k].directory.name)
|
|
||||||
k = (k + 1) mod len(srnodes)
|
|
||||||
|
|
||||||
if len(misses) == 0:
|
|
||||||
inc(success)
|
|
||||||
else:
|
|
||||||
inc(failed)
|
|
||||||
info "Attestation was not received", sender = srnodes[i].directory.name,
|
|
||||||
signature = item.attestation.signature,
|
|
||||||
receivers = misses.toSimple(), send_stamp = item.timestamp
|
|
||||||
|
|
||||||
info "Statistics for sender node", sender = srnodes[i].directory.name,
|
|
||||||
sucessfull_broadcasts = success, failed_broadcasts = failed,
|
|
||||||
total_broadcasts = len(srnodes[i].sends)
|
|
||||||
|
|
||||||
proc run(conf: LogTraceConf) =
|
proc run(conf: LogTraceConf) =
|
||||||
var logFiles: seq[string]
|
var logFiles: seq[string]
|
||||||
var logNodes: seq[NodeDirectory]
|
|
||||||
|
|
||||||
if len(conf.logFiles) > 0:
|
if len(conf.logFiles) > 0:
|
||||||
for item in conf.logFiles:
|
for item in conf.logFiles:
|
||||||
|
@ -445,21 +303,15 @@ proc run(conf: LogTraceConf) =
|
||||||
if len(conf.simDir) > 0:
|
if len(conf.simDir) > 0:
|
||||||
for item in simDirectoryLogFiles(conf.simDir):
|
for item in simDirectoryLogFiles(conf.simDir):
|
||||||
logFiles.add(item)
|
logFiles.add(item)
|
||||||
logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes)
|
|
||||||
|
|
||||||
if len(conf.netDir) > 0:
|
if len(logFiles) == 0:
|
||||||
logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes)
|
|
||||||
|
|
||||||
if len(logFiles) == 0 and len(logNodes) == 0:
|
|
||||||
error "Log file sources not specified or not enough log files found"
|
error "Log file sources not specified or not enough log files found"
|
||||||
quit(1)
|
quit(1)
|
||||||
|
|
||||||
if conf.cmd == StartUpCommand.pubsub:
|
if conf.cmd == StartUpCommand.pubsub:
|
||||||
runPubsub(conf, logFiles)
|
runPubsub(conf, logFiles)
|
||||||
elif conf.cmd == StartUpCommand.asl:
|
elif conf.cmd == StartUpCommand.attest:
|
||||||
runAttSend(conf, logFiles)
|
runAttSend(conf, logFiles)
|
||||||
elif conf.cmd == StartUpCommand.asr:
|
|
||||||
runAttSendReceive(conf, logNodes)
|
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
echo LogTraceHeader
|
echo LogTraceHeader
|
||||||
|
|
Loading…
Reference in New Issue