Streamline the validator reward analysis

Notable improvements:

* A separate aggregation pass is no longer required.

* The user can opt to produce only aggregated data
  (resulting in a much smaller data set).

* A large portion of the number crunching in Jupyter is now done in C
  through the rich DataFrames API.

* Added support for comparisons against the "median" validator
  performance in the network (a sketch of this computation follows the list).
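For context, the new default is to write both per-epoch files (in an "unaggregated" subdirectory of the output directory) and compacted files covering a range of epochs (in "aggregated", named <firstEpoch>_<lastEpoch>.epoch with zero-padded epoch numbers, 225 epochs per file unless --resolution says otherwise); the --unaggregated and --aggregated switches turn either side off. The median comparison in the notebook then reduces to a vectorised pandas computation over each compacted file. Below is a minimal sketch of that idea only, not the notebook's exact code; the column names derived here are assumptions modelled on the get_max_reward_var/get_outcome_var helpers and may not match the real CSV header.

    import pandas as pd

    rewards = ["source", "target", "head", "inclusion_delay", "sync_committee"]
    max_reward_cols = [f"max_{r}_reward" for r in rewards]  # assumed column names
    outcome_cols = [f"{r}_outcome" for r in rewards]        # assumed column names

    def losses_and_median(data: pd.DataFrame):
        # Per-validator losses (missed rewards) and their network-wide median
        # for the epoch range covered by one aggregated file.
        max_values = data[max_reward_cols].sum(axis=1)      # best case per validator
        actual_values = data[outcome_cols].sum(axis=1)      # what was actually earned
        losses = max_values - actual_values                 # one entry per validator row
        return losses, losses.median(axis=0)                # median across all validators

Because the column-wise sums and the median run inside pandas/NumPy rather than in Python loops, the heavy lifting happens in compiled C code, which is what the third bullet above refers to.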
Zahary Karadjov 2022-01-31 14:06:16 +02:00 committed by zah
parent 0c814f49ee
commit ac16eb4691
4 changed files with 213 additions and 120 deletions

View File

@@ -43,6 +43,7 @@ type
const
epochInfoFileNameDigitsCount = 8
epochFileNameExtension* = ".epoch"
epochNumberRegexStr = r"\d{" & $epochInfoFileNameDigitsCount & r"}\"
proc copyParticipationFlags*(auxiliaryState: var AuxiliaryState,
forkedState: ForkedHashedBeaconState) =
@@ -52,10 +53,10 @@ proc copyParticipationFlags*(auxiliaryState: var AuxiliaryState,
flags.currentEpochParticipation = state.data.current_epoch_participation
flags.previousEpochParticipation = state.data.previous_epoch_participation
proc getEpochRange*(dir: string):
proc getUnaggregatedFilesEpochRange*(dir: string):
tuple[firstEpoch, lastEpoch: Epoch] =
const epochInfoFileNameRegexStr =
r"\d{" & $epochInfoFileNameDigitsCount & r"}\" & epochFileNameExtension
epochNumberRegexStr & epochFileNameExtension
var pattern {.global.}: Regex
once: pattern = re(epochInfoFileNameRegexStr)
var smallestEpochFileName =
@@ -73,6 +74,23 @@ proc getEpochRange*(dir: string):
result.lastEpoch = parseUInt(
largestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
proc getUnaggregatedFilesLastEpoch*(dir: string): Epoch =
dir.getUnaggregatedFilesEpochRange.lastEpoch
proc getAggregatedFilesLastEpoch*(dir: string): Epoch =
const epochInfoFileNameRegexStr =
epochNumberRegexStr & "_" & epochNumberRegexStr & epochFileNameExtension
var pattern {.global.}: Regex
once: pattern = re(epochInfoFileNameRegexStr)
var largestEpochInFileName = 0'u
for (_, fn) in walkDir(dir.string, relative = true):
if fn.match(pattern):
let fileLastEpoch = parseUint(
fn[epochInfoFileNameDigitsCount + 1 .. 2 * epochInfoFileNameDigitsCount])
if fileLastEpoch > largestEpochInFileName:
largestEpochInFileName = fileLastEpoch
return largestEpochInFileName.Epoch
proc epochAsString*(epoch: Epoch): string =
let strEpoch = $epoch
'0'.repeat(epochInfoFileNameDigitsCount - strEpoch.len) & strEpoch
@@ -80,6 +98,11 @@ proc epochAsString*(epoch: Epoch): string =
proc getFilePathForEpoch*(epoch: Epoch, dir: string): string =
dir / epochAsString(epoch) & epochFileNameExtension
proc getFilePathForEpochs*(startEpoch, endEpoch: Epoch, dir: string): string =
let fileName = epochAsString(startEpoch) & "_" &
epochAsString(endEpoch) & epochFileNameExtension
dir / fileName
func getBlockRange*(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] =
# Range of block in reverse order
doAssert start < ends
@@ -433,9 +456,6 @@ proc collectBlockRewardsAndPenalties*(
auxiliaryState.pubkeyToIndex.clear
rewardsAndPenalties.collectFromSyncAggregate(forkedState, forkedBlock, cache)
proc getStartEpoch*(outDir: string): Epoch =
outDir.getEpochRange.lastEpoch + 1
func serializeToCsv*(rp: RewardsAndPenalties,
avgInclusionDelay = none(float)): string =
for name, value in fieldPairs(rp):

View File

@@ -11,7 +11,7 @@ import
ssz_codec],
../beacon_chain/sszdump,
../research/simutils,
./e2store, ./ncli_common
./e2store, ./ncli_common, ./validator_db_aggregator
when defined(posix):
import system/ansi_c
@@ -180,6 +180,21 @@ type
abbr: "e"
desc: "The last for which to record statistics." &
"By default the last epoch in the input database".}: Option[uint]
resolution {.
defaultValue: 225,
name: "resolution"
abbr: "r"
desc: "How many epochs to be aggregated in a single compacted file" .}: uint
writeAggregatedFiles {.
name: "aggregated"
defaultValue: true
abbr: "a"
desc: "Whether to write aggregated files for a range of epochs with a given resolution" .}: bool
writeUnaggregatedFiles {.
name: "unaggregated"
defaultValue: true
abbr: "u"
desc: "Whether to write unaggregated file for each epoch" .}: bool
var shouldShutDown = false
@@ -897,11 +912,24 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
outDb.createValidatorsView
let
unaggregatedFilesOutputDir = conf.outDir / "unaggregated"
aggregatedFilesOutputDir = conf.outDir / "aggregated"
startEpoch =
if conf.startEpoch.isSome:
Epoch(conf.startEpoch.get)
else:
getStartEpoch(conf.outDir)
let unaggregatedFilesNextEpoch = getUnaggregatedFilesLastEpoch(
unaggregatedFilesOutputDir) + 1
let aggregatedFilesNextEpoch = getAggregatedFilesLastEpoch(
aggregatedFilesOutputDir) + 1
if conf.writeUnaggregatedFiles and conf.writeAggregatedFiles:
min(unaggregatedFilesNextEpoch, aggregatedFilesNextEpoch)
elif conf.writeUnaggregatedFiles:
unaggregatedFilesNextEpoch
elif conf.writeAggregatedFiles:
aggregatedFilesNextEpoch
else:
min(unaggregatedFilesNextEpoch, aggregatedFilesNextEpoch)
endEpoch =
if conf.endEpoch.isSome:
Epoch(conf.endEpoch.get)
@@ -921,6 +949,12 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
endSlot = endEpoch.start_slot + SLOTS_PER_EPOCH
blockRefs = dag.getBlockRange(startSlot, endSlot)
if not unaggregatedFilesOutputDir.dirExists:
unaggregatedFilesOutputDir.createDir
if not aggregatedFilesOutputDir.dirExists:
aggregatedFilesOutputDir.createDir
let tmpState = newClone(dag.headState)
var cache = StateCache()
let slot = if startSlot > 0: startSlot - 1 else: 0.Slot
@@ -943,6 +977,9 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
var auxiliaryState: AuxiliaryState
auxiliaryState.copyParticipationFlags(tmpState[].data)
var aggregator = ValidatorDbAggregator.init(
aggregatedFilesOutputDir, conf.resolution, endEpoch)
proc processEpoch() =
let epoch = getStateField(tmpState[].data, slot).epoch
info "Processing epoch ...", epoch = epoch
@@ -968,13 +1005,22 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
some(validator.is_previous_epoch_attester.get().delay.uint64)
else:
none(uint64)
csvLines.add rp.serializeToCsv
let fileName = getFilePathForEpoch(epoch, conf.outDir)
var res = io2.removeFile(fileName)
doAssert res.isOk
res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
doAssert res.isOk
if conf.writeUnaggregatedFiles:
csvLines.add rp.serializeToCsv
if conf.writeAggregatedFiles:
aggregator.addValidatorData(index, rp)
if conf.writeUnaggregatedFiles:
let fileName = getFilePathForEpoch(epoch, unaggregatedFilesOutputDir)
var res = io2.removeFile(fileName)
doAssert res.isOk
res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
doAssert res.isOk
if conf.writeAggregatedFiles:
aggregator.advanceEpochs(epoch, shouldShutDown)
if shouldShutDown: quit QuitSuccess
collectBalances(previousEpochBalances, tmpState[].data)

View File

@@ -35,12 +35,35 @@ type
desc: "The directory where aggregated file to be written. " &
"By default use the same directory as the input one"}: InputDir
var shutDown = false
ValidatorDbAggregator* {.requiresInit.} = object
outputDir: string
resolution: uint
endEpoch: Epoch
epochsAggregated: uint
aggregatedRewardsAndPenalties: seq[RewardsAndPenalties]
participationEpochsCount: seq[uint]
inclusionDelaysCount: seq[uint]
proc init*(T: type ValidatorDbAggregator, outputDir: string,
resolution: uint, endEpoch: Epoch): T =
const initialCapacity = 1 shl 16
ValidatorDbAggregator(
outputDir: outputDir,
resolution: resolution,
endEpoch: endEpoch,
epochsAggregated: 0,
aggregatedRewardsAndPenalties:
newSeqOfCap[RewardsAndPenalties](initialCapacity),
participationEpochsCount: newSeqOfCap[uint](initialCapacity),
inclusionDelaysCount: newSeqOfCap[uint](initialCapacity))
var shouldShutDown = false
proc determineStartAndEndEpochs(config: AggregatorConf):
tuple[startEpoch, endEpoch: Epoch] =
if config.startEpoch.isNone or config.endEpoch.isNone:
(result.startEpoch, result.endEpoch) = getEpochRange(config.inputDir.string)
(result.startEpoch, result.endEpoch) = getUnaggregatedFilesEpochRange(
config.inputDir.string)
if config.startEpoch.isSome:
result.startEpoch = config.startEpoch.get.Epoch
if config.endEpoch.isSome:
@@ -122,10 +145,52 @@ proc average(rp: var RewardsAndPenalties,
doAssert inclusionDelaysCount == 0
averageInclusionDelay = none(float)
proc getFilePathForEpochs(startEpoch, endEpoch: Epoch, dir: string): string =
let fileName = epochAsString(startEpoch) & "_" &
epochAsString(endEpoch) & epochFileNameExtension
dir / fileName
proc addValidatorData*(aggregator: var ValidatorDbAggregator,
index: int, rp: RewardsAndPenalties) =
if index >= aggregator.participationEpochsCount.len:
aggregator.aggregatedRewardsAndPenalties.add rp
aggregator.participationEpochsCount.add 1
if rp.inclusion_delay.isSome:
aggregator.inclusionDelaysCount.add 1
else:
aggregator.inclusionDelaysCount.add 0
else:
aggregator.aggregatedRewardsAndPenalties[index] += rp
inc aggregator.participationEpochsCount[index]
if rp.inclusionDelay.isSome:
inc aggregator.inclusionDelaysCount[index]
proc advanceEpochs*(aggregator: var ValidatorDbAggregator, epoch: Epoch,
shouldShutDown: bool) =
inc aggregator.epochsAggregated
if aggregator.epochsAggregated != aggregator.resolution and
aggregator.endEpoch != epoch and not shouldShutDown:
return
var csvLines = newStringOfCap(1000000)
for i in 0 ..< aggregator.participationEpochsCount.len:
var averageInclusionDelay: Option[float]
average(aggregator.aggregatedRewardsAndPenalties[i], averageInclusionDelay,
aggregator.participationEpochsCount[i],
aggregator.inclusionDelaysCount[i])
csvLines &= serializeToCsv(
aggregator.aggregatedRewardsAndPenalties[i], averageInclusionDelay)
let fileName = getFilePathForEpochs(
epoch - aggregator.epochsAggregated + 1, epoch, aggregator.outputDir)
info "Writing file ...", fileName = fileName
var result = io2.removeFile(fileName)
doAssert result.isOk
result = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
doAssert result.isOk
aggregator.participationEpochsCount.setLen(0)
aggregator.aggregatedRewardsAndPenalties.setLen(0)
aggregator.inclusionDelaysCount.setLen(0)
aggregator.epochsAggregated = 0
proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint,
inputDir, outputDir: string) =
@@ -137,10 +202,7 @@ proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint,
info "Aggregating epochs ...", startEpoch = startEpoch, endEpoch = endEpoch,
inputDir = inputDir, outputDir = outputDir
var rewardsAndPenalties: seq[RewardsAndPenalties]
var participationEpochsCount: seq[uint]
var inclusionDelaysCount: seq[uint]
var epochsAggregated = 0'u
var aggregator = ValidatorDbAggregator.init(outputDir, resolution, endEpoch)
for epoch in startEpoch .. endEpoch:
let filePath = getFilePathForEpoch(epoch, inputDir)
@@ -155,59 +217,24 @@ proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint,
var csvParser: CsvParser
csvParser.open(dataStream, filePath)
var validatorsCount = 0'u
var validatorsCount = 0
while csvParser.readRow:
inc validatorsCount
let rp = parseRow(csvParser.row)
aggregator.addValidatorData(validatorsCount - 1, rp)
if validatorsCount > participationEpochsCount.len.uint:
rewardsAndPenalties.add rp
participationEpochsCount.add 1
if rp.inclusionDelay.isSome:
inclusionDelaysCount.add 1
else:
inclusionDelaysCount.add 0
else:
rewardsAndPenalties[validatorsCount - 1] += rp
inc participationEpochsCount[validatorsCount - 1]
if rp.inclusionDelay.isSome:
inc inclusionDelaysCount[validatorsCount - 1]
aggregator.advanceEpochs(epoch, shouldShutDown)
inc epochsAggregated
if epochsAggregated == resolution or epoch == endEpoch or shutDown:
var csvLines: string
for i in 0 ..< participationEpochsCount.len:
var averageInclusionDelay: Option[float]
average(rewardsAndPenalties[i], averageInclusionDelay,
participationEpochsCount[i], inclusionDelaysCount[i])
csvLines &= serializeToCsv(
rewardsAndPenalties[i], averageInclusionDelay)
let fileName = getFilePathForEpochs(
epoch - epochsAggregated + 1, epoch, outputDir)
info "Writing file ...", fileName = fileName
var result = io2.removeFile(fileName)
doAssert result.isOk
result = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
doAssert result.isOk
if shutDown:
quit QuitSuccess
participationEpochsCount.setLen(0)
rewardsAndPenalties.setLen(0)
inclusionDelaysCount.setLen(0)
epochsAggregated = 0
if shouldShutDown:
quit QuitSuccess
proc controlCHook {.noconv.} =
notice "Shutting down after having received SIGINT."
shutDown = true
shouldShutDown = true
proc exitOnSigterm(signal: cint) {.noconv.} =
notice "Shutting down after having received SIGTERM."
shutDown = true
shouldShutDown = true
proc main =
setControlCHook(controlCHook)

View File

@@ -138,11 +138,9 @@
"metadata": {},
"outputs": [],
"source": [
"start_epoch = 10000\n",
"end_epoch = 20000\n",
"resolution = 225\n",
"start_epoch = 1\n",
"end_epoch = 94275\n",
"files_dir = \"../build/data/mainnetCompactedValidatorDb/\"\n",
"use_compacted_files = True\n",
"rewards = [SOURCE, TARGET, HEAD, INCLUSION_DELAY, SYNC_COMMITTEE]\n",
"validators_sets = {\n",
" \"set1\": list(range(10)),\n",
@@ -201,6 +199,7 @@
"def adjust_constraints(sorted_file_names):\n",
" first_start_epoch, first_end_epoch = get_first_and_last_epoch(sorted_file_names[0])\n",
" _, last_end_epoch = get_first_and_last_epoch(sorted_file_names[-1])\n",
" global start_epoch, end_epoch, resolution\n",
" start_epoch = first_start_epoch\n",
" end_epoch = last_end_epoch\n",
" resolution = first_end_epoch - first_start_epoch + 1\n",
@@ -216,60 +215,55 @@
"def get_max_reward_var(component):\n",
" return \"max_\" + component + \"_reward\"\n",
"\n",
"max_reward_vars = [get_max_reward_var(reward_type) for reward_type in rewards]\n",
"outcome_vars = [get_outcome_var(reward_type) for reward_type in rewards]\n",
"\n",
"def sum_max_values(t):\n",
" return sum(getattr(t, get_max_reward_var(reward)) for reward in rewards)\n",
" return sum(getattr(t, max_reward) for max_reward in max_reward_vars)\n",
"\n",
"def sum_actual_values(t):\n",
" return sum(getattr(t, get_outcome_var(reward)) for reward in rewards)\n",
" return sum(getattr(t, outcome) for outcome in outcome_vars)\n",
"\n",
"def compute_losses_median(data):\n",
" max_values = data[max_reward_vars].sum(axis = 1)\n",
" actual_values = data[outcome_vars].sum(axis = 1)\n",
" losses = max_values - actual_values\n",
" return losses.median(axis = 0)\n",
"\n",
"total_losses_per_epoch_point = {}\n",
"validators_per_epoch_point = {}\n",
"average_losses_per_epoch_point = {}\n",
"validators_sets_queries = {}\n",
"medians = {}\n",
"\n",
"def compute_total_losses(epoch_point, epochs = None):\n",
"for set_name, set_values in validators_sets.items():\n",
" total_losses_per_epoch_point[set_name] = {}\n",
" average_losses_per_epoch_point[set_name] = {}\n",
" validators_sets_queries[set_name] = []\n",
"\n",
"file_names = [file_name for file_name in os.listdir(files_dir)\n",
" if isEpochInfoFile(file_name)]\n",
"file_names.sort()\n",
"adjust_constraints(file_names)\n",
"\n",
"previous_validators_count = 0\n",
"for file_name in file_names:\n",
" data = read_csv(Path(files_dir + file_name))\n",
" file_first_epoch, file_last_epoch = get_first_and_last_epoch(file_name)\n",
" file_epoch_range = file_last_epoch - file_first_epoch + 1\n",
" epoch_point = file_first_epoch // resolution\n",
" validators_count = len(data.index)\n",
" for set_name, validators in validators_sets.items():\n",
" if not set_name in total_losses_per_epoch_point:\n",
" total_losses_per_epoch_point[set_name] = {}\n",
" validators_per_epoch_point[set_name] = {}\n",
" if not epoch_point in total_losses_per_epoch_point[set_name]:\n",
" total_losses_per_epoch_point[set_name][epoch_point] = 0\n",
" validators_per_epoch_point[set_name][epoch_point] = 0\n",
" for validator_index in validators:\n",
" validator_info = data.iloc[validator_index]\n",
" validator_losses = \\\n",
" sum_max_values(validator_info) - sum_actual_values(validator_info)\n",
" total_losses_per_epoch_point[set_name][epoch_point] += \\\n",
" validator_losses if epochs == None else validator_losses * epochs\n",
" validators_per_epoch_point[set_name][epoch_point] += \\\n",
" 1 if epochs == None else epochs\n",
"\n",
"def compute_average_losses():\n",
" for set_name in validators_sets:\n",
" if not set_name in average_losses_per_epoch_point:\n",
" average_losses_per_epoch_point[set_name] = {}\n",
" for epoch_point, total_losses in total_losses_per_epoch_point[set_name].items():\n",
" average_losses_per_epoch_point[set_name][epoch_point] = \\\n",
" total_losses / validators_per_epoch_point[set_name][epoch_point]\n",
"\n",
"if use_compacted_files:\n",
" file_names = [file_name for file_name in os.listdir(files_dir)\n",
" if isEpochInfoFile(file_name)]\n",
" file_names.sort()\n",
" adjust_constraints(file_names)\n",
"\n",
" for file_name in file_names:\n",
" data = read_csv(Path(files_dir + file_name))\n",
" file_first_epoch, file_last_epoch = get_first_and_last_epoch(file_name)\n",
" file_epochs_range = file_last_epoch - file_first_epoch + 1\n",
" epoch_point = file_first_epoch // resolution\n",
" compute_total_losses(epoch_point, file_epochs_range)\n",
"else:\n",
" for epoch in range(start_epoch, end_epoch + 1):\n",
" data = read_csv(Path(files_dir + \"{:08}.epoch\".format(epoch)))\n",
" epoch_point = epoch // resolution\n",
" compute_total_losses(epoch_point)\n",
"\n",
"compute_average_losses()"
" for i in range(previous_validators_count, validators_count):\n",
" if i in validators:\n",
" validators_sets_queries[set_name].append(i)\n",
" sums = data.iloc[validators_sets_queries[set_name]].sum(axis = 0)\n",
" difference = sum_max_values(sums) - sum_actual_values(sums)\n",
" set_validators_count = len(validators_sets_queries[set_name])\n",
" average_losses_per_epoch_point[set_name][epoch_point] = \\\n",
" difference / set_validators_count if set_validators_count > 0 else 0\n",
" total_losses_per_epoch_point[set_name][epoch_point] = difference * file_epoch_range\n",
" medians[epoch_point] = compute_losses_median(data)\n",
" previous_validators_count = validators_count\n"
]
},
{
@@ -294,14 +288,20 @@
"plt.xlabel(\"Epoch\")\n",
"plt.ylabel(\"Gwei\")\n",
"\n",
"for name, value in average_losses_per_epoch_point.items():\n",
" epochs = np.array([ep * resolution + resolution // 2 for ep in value.keys()])\n",
" values = np.array(list(value.values()))\n",
"num_samples = (end_epoch - start_epoch + 1) // resolution * 100\n",
"\n",
"def plot(set_name, set_values):\n",
" epochs = np.array([ep * resolution + resolution // 2 for ep in set_values.keys()])\n",
" values = np.array(list(set_values.values()))\n",
" spline = make_interp_spline(epochs, values)\n",
" num_samples = (end_epoch - start_epoch + 1) // resolution * 100\n",
" x = np.linspace(epochs.min(), epochs.max(), num_samples)\n",
" y = spline(x)\n",
" plt.plot(x, y, label=name)\n",
" plt.plot(x, y, label=set_name)\n",
"\n",
"for name, value in average_losses_per_epoch_point.items():\n",
" plot(name, value)\n",
"\n",
"plot(\"median\", medians)\n",
"\n",
"plt.legend(loc=\"best\")"
]