# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
import confutils, json, times, streams, os, strutils, options, chronicles,
       tables, sequtils
import json_serialization

const
  LogTraceName = "Beacon-Chain LogTrace Tool"
  # Semantic version components of the tool.
  LogTraceMajor: int = 0
  LogTraceMinor: int = 0
  LogTracePatch: int = 4
  # Dotted version string, e.g. "0.0.4".
  LogTraceVersion = $LogTraceMajor & "." & $LogTraceMinor & "." & $LogTracePatch
  LogTraceCopyright = "Copyright(C) 2021 Status Research & Development GmbH"
  # Banner printed once at startup.
  LogTraceHeader = LogTraceName & ", Version " & LogTraceVersion &
                   " [" & hostOS & ": " & hostCPU & "]\r\n" &
                   LogTraceCopyright & "\r\n"

type
  # Subcommand selector: pubsub message propagation check (pubsub),
  # attestation-sent lateness (asl), attestation send/receive (asr),
  # aggregate attestation send/receive (aggasr), latency check (lat).
  StartUpCommand {.pure.} = enum
    pubsub, asl, asr, aggasr, lat

  # Command-line options; parsed by confutils via the field pragmas.
  LogTraceConf = object
    logFiles {.
      desc: "Specifies one or more log files",
      abbr: "f",
      name: "log-file" }: seq[string]

    simDir {.
      desc: "Specifies path to eth2_network_simulation directory",
      name: "sim-dir",
      defaultValue: "" }: string

    netDir {.
      desc: "Specifies path to network build directory",
      name: "net-dir",
      defaultValue: "" }: string

    logDir {.
      desc: "Specifies path with bunch of logs",
      name: "log-dir",
      defaultValue: "" }: string

    ignoreSerializationErrors {.
      desc: "Ignore serialization errors while parsing log files",
      name: "ignore-errors",
      defaultValue: true }: bool

    dumpSerializationErrors {.
      desc: "Dump full serialization errors while parsing log files",
      name: "dump-errors",
      defaultValue: false }: bool

    nodes {.
      desc: "Specifies node names which logs will be used",
      name: "nodes" }: seq[string]

    allowedLag {.
      desc: "Allowed latency lag multiplier",
      name: "lag",
      defaultValue: 2.0 }: float

    # Selected subcommand; none of the commands take extra options.
    case cmd {.command.}: StartUpCommand
    of pubsub:
      discard
    of asl:
      discard
    of asr:
      discard
    of aggasr:
      discard
    of lat:
      discard

  # Direction of a pubsub gossip message relative to the logging node.
  GossipDirection = enum
    None, Incoming, Outgoing

  # One node's log location: display name, directory path and the log
  # file names found inside it.
  NodeDirectory = object
    name: string
    path: string
    logs: seq[string]

  # Fields common to every JSON log line; base type for the concrete
  # message payloads below.
  LogMessage = object of RootObj
    level {.serializedFieldName: "lvl" .}: string
    timestamp {.serializedFieldName: "ts" .}: DateTime
    msg: string
    topics: string
    tid: int

  # Payload of a "Slot start" log entry.
  SlotStartMessage = object of LogMessage
    beaconTime: uint64
    finalizedEpoch: uint64
    finalizedRoot: string
    finalizedSlot: uint64
    headEpoch: uint64
    headRoot: string
    headSlot: uint64
    lastSlot: uint64
    peers: uint64
    scheduledSlot: uint64

  # AttestationData as it appears serialized in the logs.
  AttestationDataObject = object
    slot: uint64
    index: uint64
    beaconBlockRoot {.serializedFieldName: "beacon_block_root".}: string
    sourceEpoch {.serializedFieldName: "source_epoch".}: uint64
    sourceRoot {.serializedFieldName: "source_root".}: string
    targetEpoch {.serializedFieldName: "target_epoch".}: uint64
    targetRoot {.serializedFieldName: "target_root".}: string

  # Attestation as it appears serialized in the logs.
  AttestationObject = object
    aggregationBits {.serializedFieldName: "aggregation_bits".}: string
    data: AttestationDataObject
    signature: string

  # Payload of an "Attestation sent" log entry.
  AttestationSentMessage = object of LogMessage
    attestation: AttestationObject
    indexInCommittee: uint64
    validator: string

  # Payload of an "Attestation received" log entry.
  AttestationReceivedMessage = object of LogMessage
    attestation: AttestationObject
    head: string
    wallSlot: uint64
    pcs: string

  # Payload of an "Aggregated attestation sent" log entry.
  AggregatedAttestationSentMessage = object of LogMessage
    attestation: AttestationObject
    validator: string
    signature: string
    aggregationSlot: uint64

  # Payload of an "Aggregate received" log entry.
  AggregatedAttestationReceivedMessage = object of LogMessage
    aggregate: AttestationObject
    wallSlot: uint64
    signature: string

  # A pubsub log entry reduced to direction, message id and timestamp.
  GossipMessage = object
    kind: GossipDirection
    id: string
    datetime: DateTime
    processed: bool

  # Discriminator for SlotAttMessage.
  SaMessageType {.pure.} = enum
    AttestationSent, SlotStart

  # Either an "Attestation sent" or a "Slot start" entry (object variant).
  SlotAttMessage = object
    case kind: SaMessageType
    of SaMessageType.AttestationSent:
      asmsg: AttestationSentMessage
    of SaMessageType.SlotStart:
      ssmsg: SlotStartMessage

  # Per-node accumulator of sent/received attestations and aggregates;
  # the receive tables are keyed by attestation signature.
  SRANode = object
    directory: NodeDirectory
    sends: seq[AttestationSentMessage]
    recvs: TableRef[string, AttestationReceivedMessage]
    aggSends: seq[AggregatedAttestationSentMessage]
    aggRecvs: TableRef[string, AggregatedAttestationReceivedMessage]

proc readValue(reader: var JsonReader, value: var DateTime) =
  ## Deserializes a `DateTime` from a JSON string using the log timestamp
  ## layout; reports an unexpected-value error on malformed input.
  let raw = reader.readValue(string)
  try:
    value = parse(raw, "YYYY-MM-dd HH:mm:ss'.'fffzzz", utc())
  except CatchableError:
    raiseUnexpectedValue(reader, "Invalid date time")

proc init(t: typedesc[GossipMessage], kind: GossipDirection, id,
          datestr: string): GossipMessage =
  ## Builds a `GossipMessage` from its direction, message id and the raw
  ## timestamp string as it appears in the logs.
  result = GossipMessage(kind: kind, id: id)
  result.datetime = parse(datestr, "YYYY-MM-dd HH:mm:ss'.'fffzzz")

func `$`(msg: GossipMessage): string =
  ## A gossip message is rendered as its identifier.
  result = msg.id

proc readLogFile(file: string): seq[JsonNode] =
  ## Reads `file` line by line, parsing each line as a JSON object.
  ## Returns the parsed nodes. On a read or parse failure a warning is
  ## logged and the (partial) result collected so far is discarded;
  ## returns an empty sequence when the file cannot be opened.
  var res = newSeq[JsonNode]()
  let stream = newFileStream(file)
  if isNil(stream):
    # BUG FIX: `newFileStream` returns nil (it does not raise) when the
    # file cannot be opened; the original dereferenced it unconditionally
    # in `stream.atEnd()` and `stream.close()`.
    warn "Unable to open log file", file = file
    return res
  try:
    while not(stream.atEnd()):
      let line = stream.readLine()
      res.add(parseJson(line))
    result = res
  except CatchableError as exc:
    warn "Error reading JSON data from file", file = file,
         errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForAttsMessages(file: string,
                                ignoreErrors = true,
                                dumpErrors = false): seq[SlotAttMessage] =
  ## Extracts "Attestation sent" and "Slot start" entries from `file`,
  ## in log order. Lines that fail to deserialize are skipped when
  ## `ignoreErrors` is true, otherwise re-raised; `dumpErrors` includes
  ## the full serialization error text in the diagnostic output.
  var res = newSeq[SlotAttMessage]()
  var stream = newFileStream(file)
  if isNil(stream):
    # BUG FIX: `newFileStream` returns nil when the file cannot be
    # opened; the original dereferenced it unconditionally.
    warn "Unable to open log file", file = file
    return res
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      line = stream.readLine()
      inc(counter)
      # First decode only the common fields to inspect `msg` cheaply.
      var m: LogMessage
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                 line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                 line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Attestation sent":
        let am = Json.decode(line, AttestationSentMessage,
                             allowUnknownFields = true)
        res.add(SlotAttMessage(kind: SaMessageType.AttestationSent,
                               asmsg: am))
      elif m.msg == "Slot start":
        let sm = Json.decode(line, SlotStartMessage,
                             allowUnknownFields = true)
        res.add(SlotAttMessage(kind: SaMessageType.SlotStart,
                               ssmsg: sm))

      # Periodic progress report for large files.
      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
                                lines_processed = counter,
                                lines_filtered = len(res)
    result = res

  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForASRMessages(file: string, srnode: var SRANode,
                               ignoreErrors = true, dumpErrors = false) =
  ## Accumulates attestation/aggregate send and receive entries from
  ## `file` into `srnode`. Receives are keyed by signature and only the
  ## first receipt of a given signature is kept (`hasKeyOrPut`). Lines
  ## that fail to deserialize are skipped when `ignoreErrors` is true,
  ## otherwise re-raised.
  var stream = newFileStream(file)
  if isNil(stream):
    # BUG FIX: `newFileStream` returns nil when the file cannot be
    # opened; the original dereferenced it unconditionally.
    warn "Unable to open log file", file = file
    return
  var line: string
  var counter = 0
  try:
    while not(stream.atEnd()):
      # First decode only the common fields to inspect `msg` cheaply.
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                 line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                 line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue

      if m.msg == "Attestation sent":
        let sm = Json.decode(line, AttestationSentMessage,
                             allowUnknownFields = true)
        srnode.sends.add(sm)
      elif m.msg == "Attestation received":
        let rm = Json.decode(line, AttestationReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.recvs.hasKeyOrPut(rm.attestation.signature, rm)
      elif m.msg == "Aggregate received":
        let rm = Json.decode(line, AggregatedAttestationReceivedMessage,
                             allowUnknownFields = true)
        discard srnode.aggRecvs.hasKeyOrPut(rm.signature, rm)
      elif m.msg == "Aggregated attestation sent":
        let sm = Json.decode(line, AggregatedAttestationSentMessage,
                             allowUnknownFields = true)
        srnode.aggSends.add(sm)

      # Periodic progress report for large files.
      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
                                lines_processed = counter,
                                sends_filtered = len(srnode.sends),
                                recvs_filtered = len(srnode.recvs)

  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc readLogFileForSecondMessages(file: string, ignoreErrors = true,
                                  dumpErrors = false): seq[LogMessage] =
  ## Collects "onSecond task completed" entries from `file`, in log
  ## order. Lines that fail to deserialize are skipped when
  ## `ignoreErrors` is true, otherwise re-raised. Returns an empty
  ## sequence when the file cannot be opened.
  var stream = newFileStream(file)
  if isNil(stream):
    # BUG FIX: `newFileStream` returns nil when the file cannot be
    # opened; the original dereferenced it unconditionally.
    warn "Unable to open log file", file = file
    return
  var line: string
  var counter = 0
  try:
    while not (stream.atEnd()):
      var m: LogMessage
      line = stream.readLine()
      inc(counter)
      try:
        m = Json.decode(line, LogMessage, allowUnknownFields = true)
      except SerializationError as exc:
        if dumpErrors:
          error "Serialization error while reading file, ignoring", file = file,
                 line_number = counter, errorMsg = exc.formatMsg(line)
        else:
          error "Serialization error while reading file, ignoring", file = file,
                 line_number = counter
        if not(ignoreErrors):
          raise exc
        else:
          continue
      if m.msg == "onSecond task completed":
        result.add(m)

      # Periodic progress report for large files.
      if counter mod 10_000 == 0:
        info "Processing file", file = extractFilename(file),
                                lines_processed = counter,
                                seconds_filtered = len(result)
  except CatchableError as exc:
    warn "Error reading data from file", file = file, errorMsg = exc.msg
  finally:
    stream.close()

proc filterGossipMessages(log: seq[JsonNode]): seq[GossipMessage] =
  ## Selects pubsub send/receive entries from parsed log nodes and
  ## converts them into `GossipMessage` values, preserving log order.
  # Because of times.DateTime object we forced to turn off [ProveInit] warnings
  # You can remove this pragmas when Nim compiler or times.nim will be fixed.
  {.push warning[ProveInit]: off.}
  result = newSeq[GossipMessage]()
  {.pop.}
  for node in log:
    # Entries lacking any of the required fields are not gossip records.
    if ("msg" notin node) or ("message_id" notin node) or ("ts" notin node):
      continue
    let direction =
      case node["msg"].getStr()
      of "Incoming pubsub message received": Incoming
      of "Outgoing pubsub message has been sent": Outgoing
      else: None
    if direction != None:
      result.add(GossipMessage.init(direction, node["message_id"].getStr(),
                                    node["ts"].getStr()))

iterator simDirectoryLogFiles(simdir: string): string =
  ## Yields the log file path of each consecutively-numbered node
  ## directory under `<simdir>/data`, preferring `beacon_node.log` over
  ## `bootstrap_node.log`, and stops at the first node with neither.
  ## Exits the process when the data directory is missing.
  let dataPath = absolutePath(simdir) & DirSep & "data"
  if not dirExists(dataPath):
    error "Invalid `eth2_network_simulation` data directory structure",
          path = dataPath
    quit(1)
  var index = 0
  while true:
    let prefix = dataPath & DirSep & "node-" & $index & DirSep
    let beaconLog = prefix & "beacon_node.log"
    let bootstrapLog = prefix & "bootstrap_node.log"
    if fileExists(beaconLog):
      yield beaconLog
    elif fileExists(bootstrapLog):
      yield bootstrapLog
    else:
      break
    inc(index)

proc getDirectoryLogFiles(builddir: string,
                          filter: seq[string]): seq[NodeDirectory] =
  ## Collects per-node log file listings from `<builddir>/data`. When
  ## `filter` is non-empty only node names present in it are kept; nodes
  ## without any `*.log` files are omitted. Exits the process when the
  ## data directory is missing.
  var res = newSeq[NodeDirectory]()
  let dataPath = absolutePath(builddir) & DirSep & "data"
  if not dirExists(dataPath):
    error "Invalid `network` data directory structure",
          path = dataPath
    quit(1)

  for dirPath in walkDirs(dataPath & DirSep & "*"):
    let name = extractFilename(dirPath)
    if (len(filter) > 0) and (name notin filter):
      continue
    var nodeDir = NodeDirectory(name: name, path: dirPath)
    for filePath in walkFiles(dirPath & DirSep & "*.log"):
      nodeDir.logs.add(extractFilename(filePath))
    if len(nodeDir.logs) > 0:
      res.add(nodeDir)
  res

proc getLogFiles(builddir: string,
                 filter: seq[string]): seq[NodeDirectory] =
  ## Builds one `NodeDirectory` per file matching `*.*` directly inside
  ## `builddir`, optionally restricted to names listed in `filter`.
  ## Exits the process when the directory is missing.
  var res = newSeq[NodeDirectory]()
  let dataPath = absolutePath(builddir)
  if not dirExists(dataPath):
    error "Logs directory did not exist", path = dataPath
    quit(1)
  for filePath in walkFiles(dataPath & DirSep & "*.*"):
    let name = extractFilename(filePath)
    if (len(filter) == 0) or (name in filter):
      res.add(NodeDirectory(name: name, path: dataPath, logs: @[name]))
  res

func getMessage(logs: seq[GossipMessage],
                msg: GossipMessage): Option[GossipMessage] =
  ## Returns the last Incoming entry in `logs` whose id matches `msg`,
  ## or `none` when absent.
  ##
  ## PERF FIX: the original scanned the whole sequence and kept
  ## overwriting `result` without ever breaking. Scanning from the end
  ## and returning at the first hit yields the same element (the last
  ## match in forward order) without always walking every entry.
  {.push warning[ProveInit]: off.}
  result = none[GossipMessage]()
  {.pop.}
  for i in countdown(len(logs) - 1, 0):
    if logs[i].kind == Incoming and logs[i].id == msg.id:
      {.push warning[ProveInit]: off.}
      result = some(logs[i])
      {.pop.}
      return

proc runPubsub(logConf: LogTraceConf, logFiles: seq[string]) =
  ## Cross-checks pubsub gossip propagation: every Outgoing message found
  ## in one log must appear as an Incoming message in each of the other
  ## logs; missing occurrences are counted and reported.
  var logs = newSeq[tuple[name: string, data: seq[GossipMessage]]]()

  if len(logFiles) < 2:
    error "Number of log files insufficient to process pubsub messages",
          logs_count = len(logFiles)
    quit(1)

  # Load and filter every log up front so lookups below stay in memory.
  for item in logFiles:
    let data = filterGossipMessages(readLogFile(item))
    logs.add((name: item, data: data))
    info "Loaded log file", logfile = item, lines_count = len(data)

  {.push warning[ProveInit]: off.}
  var checks = newSeq[Option[GossipMessage]](len(logs))
  {.pop.}

  var misses = 0
  for i in 0 ..< len(logs):
    info "Processing log file", logfile = logs[i].name
    for k in 0 ..< len(logs[i].data):
      let item = logs[i].data[k]
      if item.kind == Outgoing:
        info "Searching message", message_id = $item, logfile = logs[i].name
        # `checks` is indexed in step with `logs`: slot i holds the
        # outgoing message itself, every other slot holds the matching
        # incoming message from that log (or none). The `(i + z) mod
        # len(logs)` walk visits each other log exactly once.
        checks[i] = some(item)
        for z in 1 ..< len(logs):
          let index = (i + z) mod len(logs)
          checks[index] = getMessage(logs[index].data, item)

        # Any slot still empty means that log never received the message.
        for z in 1 ..< len(checks):
          let index = (i + z) mod len(logs)
          if not(checks[index].isSome()):
            warn "Message not found in log", logfile = logs[index].name,
                                             message_id = $item
            inc(misses)

  if misses == 0:
    info "No missing messages found"
  else:
    warn "Number of missing messages found", count = $misses

proc runAttSend(logConf: LogTraceConf, logFiles: seq[string]) =
  ## Scans log files for attestations that were sent more than
  ## `minDuration` after the start of their slot and reports them.
  info "Check for late `attestation sent` messages"
  if len(logFiles) < 1:
    error "Number of log files insufficient to process pubsub messages",
          logs_count = len(logFiles)
    quit(1)

  # An attestation is expected within 1/3 of a slot after slot start;
  # 4 seconds here.
  let minDuration = initDuration(seconds = 4)
  var slotMessagesCount = 0
  var attsMessagesCount = 0
  var lateAttsMessagesCount = 0

  for item in logFiles:
    info "Processing log file", logFile = item
    let data = readLogFileForAttsMessages(item,
                                          logConf.ignoreSerializationErrors,
                                          logConf.dumpSerializationErrors)

    var currentSlot: Option[SlotStartMessage]
    for entry in data:
      if entry.kind == SaMessageType.SlotStart:
        currentSlot = some(entry.ssmsg)
        inc(slotMessagesCount)
      elif entry.kind == SaMessageType.AttestationSent:
        if currentSlot.isSome():
          # BUG FIX: the original computed `slotStart - attestation`,
          # which is negative for any attestation logged after its slot
          # start, so the `> minDuration` test never fired and late
          # attestations were never reported. Lateness is attestation
          # time minus slot-start time.
          let attestationTime = entry.asmsg.timestamp -
                                currentSlot.get().timestamp
          if attestationTime > minDuration:
            warn "Found an attestation that was sent later than necessary",
                 lateness = $attestationTime, slot = currentSlot.get(),
                 attestation = entry.asmsg
            inc(lateAttsMessagesCount)
          inc(attsMessagesCount)
        else:
          warn "`Attestation sent` message appears before `Start slot` message",
               attestation = entry.asmsg
  info "Check finished", attestation_sent_messages = attsMessagesCount,
                         slot_messages = slotMessagesCount,
                         late_attestation_messages = lateAttsMessagesCount

func toSimple(s: seq[string]): string =
  ## Formats a sequence of strings for log output, e.g. "['a', 'b']".
  var quoted = newSeqOfCap[string](len(s))
  for item in s:
    quoted.add("'" & item & "'")
  "[" & quoted.join(", ") & "]"

proc runAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  ## Verifies that every attestation sent by each node was received by
  ## all other nodes (matched by signature) and reports per-sender
  ## broadcast statistics.
  info "Check for attestation sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)
  var srnodes = newSeq[SRANode]()

  # Parse every node's logs into per-node send/receive accumulators.
  for node in nodes:
    var srnode = SRANode(
      directory: node,
      sends: newSeq[AttestationSentMessage](),
      recvs: newTable[string, AttestationReceivedMessage](),
      aggSends: newSeq[AggregatedAttestationSentMessage](),
      aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForASRMessages(path, srnode,
                                logConf.ignoreSerializationErrors,
                                logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  # NOTE: a second `len(nodes) < 2` check that appeared here in the
  # original was redundant — it was already enforced above.

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].sends:
      # Walk the ring of all other nodes, collecting those that never
      # logged receipt of this attestation's signature.
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.attestation.signature notin srnodes[k].recvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Attestation was not received", sender = srnodes[i].directory.name,
             signature = item.attestation.signature,
             receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
         successful_broadcasts = success, failed_broadcasts = failed,
         total_broadcasts = len(srnodes[i].sends)

proc runAggAttSendReceive(logConf: LogTraceConf, nodes: seq[NodeDirectory]) =
  ## Verifies that every aggregated attestation sent by each node was
  ## received by all other nodes (matched by signature) and reports
  ## per-sender broadcast statistics.
  info "Check for aggregate attestation sent/received messages"
  if len(nodes) < 2:
    error "Number of nodes' log files insufficient", nodes_count = len(nodes)
    quit(1)
  var srnodes = newSeq[SRANode]()

  # Parse every node's logs into per-node send/receive accumulators.
  for node in nodes:
    var srnode = SRANode(
      directory: node,
      sends: newSeq[AttestationSentMessage](),
      recvs: newTable[string, AttestationReceivedMessage](),
      aggSends: newSeq[AggregatedAttestationSentMessage](),
      aggRecvs: newTable[string, AggregatedAttestationReceivedMessage]()
    )
    info "Processing node", node = node.name
    for logfile in node.logs:
      let path = node.path & DirSep & logfile
      info "Processing node's logfile", node = node.name, logfile = path
      readLogFileForASRMessages(path, srnode,
                                logConf.ignoreSerializationErrors,
                                logConf.dumpSerializationErrors)
    srnodes.add(srnode)

  # NOTE: a second `len(nodes) < 2` check that appeared here in the
  # original was redundant — it was already enforced above.

  for i in 0 ..< len(srnodes):
    var success = 0
    var failed = 0
    for item in srnodes[i].aggSends:
      # Walk the ring of all other nodes, collecting those that never
      # logged receipt of this aggregate's signature.
      var k = (i + 1) mod len(srnodes)
      var misses = newSeq[string]()
      while k != i:
        if item.signature notin srnodes[k].aggRecvs:
          misses.add(srnodes[k].directory.name)
        k = (k + 1) mod len(srnodes)

      if len(misses) == 0:
        inc(success)
      else:
        inc(failed)
        info "Aggregate attestation was not received",
           sender = srnodes[i].directory.name,
           signature = item.signature,
           receivers = misses.toSimple(), send_stamp = item.timestamp

    info "Statistics for sender node", sender = srnodes[i].directory.name,
       successful_broadcasts = success, failed_broadcasts = failed,
       total_broadcasts = len(srnodes[i].aggSends)

proc runLatencyCheck(logConf: LogTraceConf, logFiles: seq[string],
                     nodes: seq[NodeDirectory]) =
  ## Measures the gap between consecutive "onSecond task completed"
  ## entries in each log file, reporting every gap larger than the
  ## allowed lag and min/max/average statistics per file.
  info "Check for async responsiveness"
  if len(nodes) == 0 and len(logFiles) == 0:
    error "Number of log files insufficient", nodes_count = len(nodes)
    quit(1)

  # The onSecond loop should tick once per second; anything beyond
  # `allowedLag` * 1s counts as a lag.
  let allowedTime = int64(float(initDuration(seconds = 1).inMilliseconds()) *
                          logConf.allowedLag)

  for logFile in logFiles:
    info "Processing log file", logfile = logFile
    let msgs = readLogFileForSecondMessages(logFile,
                                            logConf.ignoreSerializationErrors,
                                            logConf.dumpSerializationErrors)
    if len(msgs) == 0:
      # BUG FIX: the original divided by `len(msgs)` unconditionally,
      # raising a division-by-zero defect for files with no
      # "onSecond task completed" entries.
      warn "No onSecond messages found in file", logfile = logFile
      continue

    var lastSecond: Option[LogMessage]
    var minEntry: Option[LogMessage]
    var maxEntry: Option[LogMessage]
    var minTime: times.Duration = initDuration(days = 1)
    var maxTime: times.Duration
    var sumMilliseconds: int64

    for item in msgs:
      if lastSecond.isNone():
        lastSecond = some(item)
      else:
        let time = item.timestamp - lastSecond.get().timestamp
        let start_time = lastSecond.get().timestamp
        let finish_time = item.timestamp
        if time.inMilliseconds() > allowedTime:
          info "Found time lag ",
               start_time = start_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
               finish_time = finish_time.format("yyyy-MM-dd HH:mm:ss'.'fff"),
               lag_time = time
        if time < minTime:
          minTime = time
          minEntry = some(item)
        if time > maxTime:
          maxTime = time
          maxEntry = some(item)
        sumMilliseconds += time.inMilliseconds()
        lastSecond = some(item)
    let avgTime = initDuration(milliseconds = sumMilliseconds div len(msgs))
    info "Latency statistics", min_time = minTime, max_time = maxTime,
                               avg_time = avgTime, seconds_count = len(msgs)

proc run(conf: LogTraceConf) =
  ## Resolves log file sources from the configuration and dispatches to
  ## the selected subcommand.
  var logFiles: seq[string]
  var logNodes: seq[NodeDirectory]

  # Explicitly listed log files, kept only if they exist.
  for item in conf.logFiles:
    let absPath = absolutePath(item)
    if fileExists(absPath):
      logFiles.add(absPath)

  if len(conf.simDir) > 0:
    for item in simDirectoryLogFiles(conf.simDir):
      logFiles.add(item)
    logNodes = getDirectoryLogFiles(conf.simDir, conf.nodes)

  if len(conf.netDir) > 0:
    logNodes = getDirectoryLogFiles(conf.netDir, conf.nodes)

  if len(conf.logDir) > 0:
    logNodes = getLogFiles(conf.logDir, conf.nodes)

  if len(logFiles) == 0 and len(logNodes) == 0:
    error "Log file sources not specified or not enough log files found"
    quit(1)

  case conf.cmd
  of StartUpCommand.pubsub:
    runPubsub(conf, logFiles)
  of StartUpCommand.asl:
    runAttSend(conf, logFiles)
  of StartUpCommand.asr:
    runAttSendReceive(conf, logNodes)
  of StartUpCommand.aggasr:
    runAggAttSendReceive(conf, logNodes)
  of StartUpCommand.lat:
    runLatencyCheck(conf, logFiles, logNodes)

when isMainModule:
  # Print the banner, parse command-line options and dispatch.
  echo LogTraceHeader
  run(LogTraceConf.load(version = LogTraceVersion))