nimbus-eth2/ncli/validator_db_aggregator.nim
Zahary Karadjov 47f1f7ff1a More efficient reward data persistence; Address review comments
The new format is based on compressed CSV files in two channels:

* Detailed per-epoch data
* Aggregated "daily" summaries

Using append-only CSV files significantly speeds up epoch
processing during data generation. Compression results in
smaller storage requirements overall. The aggregated files
have a very minor cost in both CPU and storage, but they
make report generation nearly interactive.

Other changes:

- Implemented support for graceful shutdowns to avoid corrupting
  the saved files.

- Fixed a memory leak caused by missing `StateCache` cleanup on each
  iteration.

- Addressed review comments

- Moved the rewards and penalties calculation code into a separate module

Required invasive changes to existing modules:

- The `data` field of the `KeyedBlockRef` type is made public to be used
  by the validator rewards monitor's Chain DAG update procedure.

- The `getForkedBlock` procedure from the `blockchain_dag.nim` module
  is made public to be used by the validator rewards monitor's Chain DAG
  update procedure.
2022-01-18 01:56:56 +02:00

237 lines
8.7 KiB
Nim

import
std/[os, strutils, streams, parsecsv],
stew/[io2, byteutils], chronicles, confutils, snappy,
../beacon_chain/spec/datatypes/base,
./ncli_common
when defined(posix):
import system/ansi_c
type
  AggregatorConf = object
    ## Command-line options of the aggregator. Each field carries the
    ## option's long name (`name`), short name (`abbr`), help text (`desc`)
    ## and, where present, its default (`defaultValue`) as pragmas.

    # First epoch to aggregate. When absent, `determineStartAndEndEpochs`
    # takes it from `getEpochRange` applied to the input directory.
    startEpoch {.
      name: "start-epoch"
      abbr: "s"
      desc: "The first epoch which to be aggregated. " &
            "By default use the first epoch for which has a file" .}: Option[uint64]
    # Last epoch to aggregate (inclusive). When absent, it is likewise taken
    # from `getEpochRange`.
    endEpoch {.
      name: "end-epoch"
      abbr: "e"
      desc: "The last epoch which to be aggregated. " &
            "By default use the last epoch for which has a file" .}: Option[uint64]
    # Number of consecutive epochs folded into one output file.
    # NOTE(review): 225 epochs is presumably one day of mainnet epochs
    # (the "daily" summaries) - confirm.
    resolution {.
      defaultValue: 225,
      name: "resolution"
      abbr: "r"
      desc: "How many epochs to be aggregated in a single file" .}: uint
    # Directory the per-epoch files are read from.
    inputDir {.
      name: "input-dir"
      abbr: "i"
      desc: "The directory with the epoch info files" .}: InputDir
    # Directory the aggregated files are written to. The empty default makes
    # `main` fall back to `inputDir`.
    outputDir {.
      defaultValue: ""
      name: "output-dir"
      abbr: "o"
      desc: "The directory where aggregated file to be written. " &
            "By default use the same directory as the input one"}: InputDir
# Set to `true` by the SIGINT and SIGTERM handlers. `aggregateEpochs` checks
# it after every processed epoch so that it can write out the partially
# aggregated data and exit instead of being interrupted mid-write.
var shutDown = false
proc determineStartAndEndEpochs(config: AggregatorConf):
    tuple[startEpoch, endEpoch: Epoch] =
  ## Resolves the inclusive epoch range that should be aggregated.
  ##
  ## Bounds given on the command line take precedence. If at least one bound
  ## is missing, both are first looked up with `getEpochRange` on the input
  ## directory and the explicitly given one (if any) then overrides its half.
  ##
  ## Logs a fatal error and quits with `QuitFailure` when the resulting start
  ## epoch is greater than the end epoch.
  let
    startGiven = config.startEpoch.isSome
    endGiven = config.endEpoch.isSome

  if not (startGiven and endGiven):
    let (detectedStart, detectedEnd) = getEpochRange(config.inputDir.string)
    result.startEpoch = detectedStart
    result.endEpoch = detectedEnd

  if startGiven:
    result.startEpoch = Epoch(config.startEpoch.get)
  if endGiven:
    result.endEpoch = Epoch(config.endEpoch.get)

  if result.endEpoch < result.startEpoch:
    fatal "Start epoch cannot be bigger than the end epoch.",
          startEpoch = result.startEpoch, endEpoch = result.endEpoch
    quit QuitFailure
proc checkIntegrity(startEpoch, endEpoch: Epoch, dir: string) =
  ## Verifies that `dir` holds a file for every epoch in
  ## `startEpoch .. endEpoch` (both inclusive).
  ##
  ## On the first missing file a fatal error is logged and the process quits
  ## with `QuitFailure`.
  for epoch in startEpoch .. endEpoch:
    let expectedPath = getFilePathForEpoch(epoch, dir)
    if fileExists(expectedPath):
      continue
    fatal "File for epoch does not exist.",
          epoch = epoch, filePath = expectedPath
    quit QuitFailure
proc parseRow(csvRow: CsvRow): RewardsAndPenalties =
  ## Converts one CSV row into a `RewardsAndPenalties` value.
  ##
  ## Columns 0..13 are mandatory and are parsed as signed (`*_outcome`) or
  ## unsigned (everything else) integers. Column 14 holds the inclusion delay
  ## and may be empty, in which case `inclusion_delay` is left unset.
  ##
  ## The parse procs raise `ValueError` on a malformed number; a row with
  ## fewer than 15 columns triggers an index error.
  template signedAt(idx: int): untyped = parseBiggestInt(csvRow[idx])
  template unsignedAt(idx: int): untyped = parseBiggestUInt(csvRow[idx])

  result.source_outcome = signedAt(0)
  result.max_source_reward = unsignedAt(1)
  result.target_outcome = signedAt(2)
  result.max_target_reward = unsignedAt(3)
  result.head_outcome = signedAt(4)
  result.max_head_reward = unsignedAt(5)
  result.inclusion_delay_outcome = signedAt(6)
  result.max_inclusion_delay_reward = unsignedAt(7)
  result.sync_committee_outcome = signedAt(8)
  result.max_sync_committee_reward = unsignedAt(9)
  result.proposer_outcome = signedAt(10)
  result.inactivity_penalty = unsignedAt(11)
  result.slashing_outcome = signedAt(12)
  result.deposits = unsignedAt(13)

  # The inclusion delay column is optional: an empty cell means "no value".
  let delayCell = csvRow[14]
  if delayCell.len > 0:
    result.inclusion_delay = some(parseBiggestUInt(delayCell))
proc `+=`(lhs: var RewardsAndPenalties, rhs: RewardsAndPenalties) =
  ## Adds every numeric field of `rhs` to the matching field of `lhs`.
  ##
  ## `inclusion_delay` is optional on both sides: when both are set they are
  ## summed, when only `rhs` has one it is copied into `lhs`, and otherwise
  ## `lhs` keeps whatever it had.
  template accumulate(field: untyped) =
    lhs.field += rhs.field

  accumulate(source_outcome)
  accumulate(max_source_reward)
  accumulate(target_outcome)
  accumulate(max_target_reward)
  accumulate(head_outcome)
  accumulate(max_head_reward)
  accumulate(inclusion_delay_outcome)
  accumulate(max_inclusion_delay_reward)
  accumulate(sync_committee_outcome)
  accumulate(max_sync_committee_reward)
  accumulate(proposer_outcome)
  accumulate(inactivity_penalty)
  accumulate(slashing_outcome)
  accumulate(deposits)

  if rhs.inclusion_delay.isSome:
    if lhs.inclusion_delay.isSome:
      lhs.inclusion_delay.get += rhs.inclusion_delay.get
    else:
      lhs.inclusion_delay = some(rhs.inclusion_delay.get)
proc average(rp: var RewardsAndPenalties,
             averageInclusionDelay: var Option[float],
             epochsCount: uint, inclusionDelaysCount: uint64) =
  ## Turns the sums accumulated in `rp` into per-epoch averages, in place.
  ##
  ## Every outcome, maximum reward and penalty field is integer-divided by
  ## `epochsCount`; `deposits` is left as the accumulated total.
  ##
  ## `averageInclusionDelay` receives the summed inclusion delay divided by
  ## `inclusionDelaysCount` as a float, or `none` when `rp` has no inclusion
  ## delay. The two must agree: a set delay requires a non-zero count and an
  ## unset delay requires a zero count, otherwise an assertion fails.
  let signedCount = epochsCount.int64

  template meanOfSigned(field: untyped) =
    rp.field = rp.field div signedCount

  template meanOfUnsigned(field: untyped) =
    rp.field = rp.field div epochsCount

  meanOfSigned(source_outcome)
  meanOfUnsigned(max_source_reward)
  meanOfSigned(target_outcome)
  meanOfUnsigned(max_target_reward)
  meanOfSigned(head_outcome)
  meanOfUnsigned(max_head_reward)
  meanOfSigned(inclusion_delay_outcome)
  meanOfUnsigned(max_inclusion_delay_reward)
  meanOfSigned(sync_committee_outcome)
  meanOfUnsigned(max_sync_committee_reward)
  meanOfSigned(proposer_outcome)
  meanOfUnsigned(inactivity_penalty)
  meanOfSigned(slashing_outcome)

  averageInclusionDelay =
    if rp.inclusion_delay.isSome:
      doAssert inclusionDelaysCount != 0
      some(rp.inclusion_delay.get.float / inclusionDelaysCount.float)
    else:
      doAssert inclusionDelaysCount == 0
      none(float)
proc getFilePathForEpochs(startEpoch, endEpoch: Epoch, dir: string): string =
  ## Returns the path inside `dir` of the aggregated file covering
  ## `startEpoch` to `endEpoch`. The file name has the form
  ## `<start>_<end><epochFileNameExtension>`, with both epochs rendered by
  ## `epochAsString`.
  let
    firstPart = epochAsString(startEpoch)
    lastPart = epochAsString(endEpoch)
  joinPath(dir, firstPart & "_" & lastPart & epochFileNameExtension)
proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint,
                     inputDir, outputDir: string) =
  ## Averages the per-epoch files of `startEpoch .. endEpoch` (inclusive)
  ## found in `inputDir` and writes one Snappy-compressed CSV file into
  ## `outputDir` for every batch of `resolution` consecutive epochs.
  ##
  ## Rows are matched to validators by their position in the file. A row
  ## whose position has not been seen yet in the current batch starts a new
  ## accumulator, so each validator is averaged only over the epochs in
  ## which it actually appears.
  ##
  ## A batch is flushed when it reaches `resolution` epochs, when `endEpoch`
  ## has been processed, or when a shutdown was requested. In the last case
  ## the partial batch is written and the process quits with `QuitSuccess`.
  ##
  ## An invalid range or any I/O failure is logged as fatal and terminates
  ## the process with `QuitFailure`.
  if startEpoch > endEpoch:
    fatal "Start epoch cannot be larger than the end one.",
          startEpoch = startEpoch, endEpoch = endEpoch
    quit QuitFailure

  info "Aggregating epochs ...", startEpoch = startEpoch, endEpoch = endEpoch,
       inputDir = inputDir, outputDir = outputDir

  # Per-validator accumulators for the batch currently being built. All three
  # sequences are indexed by the validator's row position.
  var rewardsAndPenalties: seq[RewardsAndPenalties]
  var participationEpochsCount: seq[uint]
  var inclusionDelaysCount: seq[uint]
  var epochsAggregated = 0'u

  for epoch in startEpoch .. endEpoch:
    let filePath = getFilePathForEpoch(epoch, inputDir)
    info "Processing file ...", file = filePath

    let data = io2.readAllBytes(filePath)
    if data.isErr:
      # Report the path and the OS error instead of a bare assertion failure.
      fatal "Could not read the epoch file.",
            file = filePath, err = ioErrorMsg(data.error)
      quit QuitFailure

    let dataStream = newStringStream(
      string.fromBytes(snappy.decode(
        data.get.toOpenArray(0, data.get.len - 1))))

    var csvParser: CsvParser
    csvParser.open(dataStream, filePath)

    var validatorsCount = 0'u
    while csvParser.readRow:
      inc validatorsCount
      let rp = parseRow(csvParser.row)

      if validatorsCount > participationEpochsCount.len.uint:
        # First time this row position shows up in the current batch.
        rewardsAndPenalties.add rp
        participationEpochsCount.add 1
        if rp.inclusionDelay.isSome:
          inclusionDelaysCount.add 1
        else:
          inclusionDelaysCount.add 0
      else:
        rewardsAndPenalties[validatorsCount - 1] += rp
        inc participationEpochsCount[validatorsCount - 1]
        if rp.inclusionDelay.isSome:
          inc inclusionDelaysCount[validatorsCount - 1]

    # Closing the parser also closes its input stream; without this, one
    # parser and one stream would be leaked for every processed epoch.
    csvParser.close()

    inc epochsAggregated

    if epochsAggregated == resolution or epoch == endEpoch or shutDown:
      var csvLines: string
      for i in 0 ..< participationEpochsCount.len:
        var averageInclusionDelay: Option[float]
        average(rewardsAndPenalties[i], averageInclusionDelay,
                participationEpochsCount[i], inclusionDelaysCount[i])
        csvLines &= serializeToCsv(
          rewardsAndPenalties[i], averageInclusionDelay)

      # The batch covers the last `epochsAggregated` epochs ending at `epoch`.
      let fileName = getFilePathForEpochs(
        epoch - epochsAggregated + 1, epoch, outputDir)
      info "Writing file ...", fileName = fileName

      # Drop any output left over from a previous run before writing anew.
      let removeOutcome = io2.removeFile(fileName)
      if removeOutcome.isErr:
        fatal "Could not remove the old aggregated file.",
              fileName = fileName, err = ioErrorMsg(removeOutcome.error)
        quit QuitFailure

      let writeOutcome = io2.writeFile(
        fileName, snappy.encode(csvLines.toBytes))
      if writeOutcome.isErr:
        fatal "Could not write the aggregated file.",
              fileName = fileName, err = ioErrorMsg(writeOutcome.error)
        quit QuitFailure

      # The partial batch is safely on disk, so stopping here loses nothing.
      if shutDown:
        quit QuitSuccess

      participationEpochsCount.setLen(0)
      rewardsAndPenalties.setLen(0)
      inclusionDelaysCount.setLen(0)
      epochsAggregated = 0
proc controlCHook {.noconv.} =
  ## Ctrl+C handler installed by `main` through `setControlCHook`.
  ## Logs a notice and raises the `shutDown` flag; the actual exit happens
  ## in `aggregateEpochs` once the current batch has been written.
  notice "Shutting down after having received SIGINT."
  shutDown = true
proc exitOnSigterm(signal: cint) {.noconv.} =
  ## SIGTERM handler registered by `main` with `c_signal` on POSIX systems.
  ## Logs a notice and raises the `shutDown` flag; `signal` is not used.
  notice "Shutting down after having received SIGTERM."
  shutDown = true
proc main =
  ## Entry point of the aggregator.
  ##
  ## Installs the shutdown handlers, loads the `AggregatorConf` options,
  ## resolves the epoch range, verifies that every epoch in the range has an
  ## input file and finally runs the aggregation. The output directory falls
  ## back to the input directory when none was given.
  setControlCHook(controlCHook)
  when defined(posix):
    c_signal(SIGTERM, exitOnSigterm)

  let config = load AggregatorConf
  let (startEpoch, endEpoch) = config.determineStartAndEndEpochs

  # An end epoch of 0 is treated as "no files found" only when it was
  # auto-detected. An explicit `--end-epoch=0` is a legitimate request to
  # aggregate epoch 0 and must not be rejected with a misleading message;
  # `checkIntegrity` below still verifies that the file really exists.
  # NOTE(review): presumably `getEpochRange` yields an end epoch of 0 when the
  # directory holds no epoch files - confirm in `ncli_common`.
  if endEpoch == 0 and config.endEpoch.isNone:
    fatal "Not found epoch info files in the directory.",
          inputDir = config.inputDir
    quit QuitFailure

  checkIntegrity(startEpoch, endEpoch, config.inputDir.string)

  let outputDir =
    if config.outputDir.string.len > 0:
      config.outputDir
    else:
      config.inputDir

  aggregateEpochs(startEpoch, endEpoch, config.resolution,
                  config.inputDir.string, outputDir.string)
# Run the aggregator only when this file is compiled as the program itself,
# not when it is imported as a module.
when isMainModule:
  main()