diff --git a/ncli/ncli_common.nim b/ncli/ncli_common.nim
index 725188c83..c407c534b 100644
--- a/ncli/ncli_common.nim
+++ b/ncli/ncli_common.nim
@@ -43,6 +43,7 @@ type
 const
   epochInfoFileNameDigitsCount = 8
   epochFileNameExtension* = ".epoch"
+  epochNumberRegexStr = r"\d{" & $epochInfoFileNameDigitsCount & r"}\"
 
 proc copyParticipationFlags*(auxiliaryState: var AuxiliaryState,
                              forkedState: ForkedHashedBeaconState) =
@@ -52,10 +53,10 @@ proc copyParticipationFlags*(auxiliaryState: var AuxiliaryState,
     flags.currentEpochParticipation = state.data.current_epoch_participation
     flags.previousEpochParticipation = state.data.previous_epoch_participation
 
-proc getEpochRange*(dir: string):
+proc getUnaggregatedFilesEpochRange*(dir: string):
     tuple[firstEpoch, lastEpoch: Epoch] =
   const epochInfoFileNameRegexStr =
-    r"\d{" & $epochInfoFileNameDigitsCount & r"}\" & epochFileNameExtension
+    epochNumberRegexStr & epochFileNameExtension
   var pattern {.global.}: Regex
   once: pattern = re(epochInfoFileNameRegexStr)
   var smallestEpochFileName =
@@ -73,6 +74,23 @@ proc getEpochRange*(dir: string):
   result.lastEpoch = parseUInt(
     largestEpochFileName[0 ..< epochInfoFileNameDigitsCount]).Epoch
 
+proc getUnaggregatedFilesLastEpoch*(dir: string): Epoch =
+  dir.getUnaggregatedFilesEpochRange.lastEpoch
+
+proc getAggregatedFilesLastEpoch*(dir: string): Epoch =
+  const epochInfoFileNameRegexStr =
+    epochNumberRegexStr & "_" & epochNumberRegexStr & epochFileNameExtension
+  var pattern {.global.}: Regex
+  once: pattern = re(epochInfoFileNameRegexStr)
+  var largestEpochInFileName = 0'u
+  for (_, fn) in walkDir(dir.string, relative = true):
+    if fn.match(pattern):
+      let fileLastEpoch = parseUInt(
+        fn[epochInfoFileNameDigitsCount + 1 .. 2 * epochInfoFileNameDigitsCount])
+      if fileLastEpoch > largestEpochInFileName:
+        largestEpochInFileName = fileLastEpoch
+  return largestEpochInFileName.Epoch
+
 proc epochAsString*(epoch: Epoch): string =
   let strEpoch = $epoch
   '0'.repeat(epochInfoFileNameDigitsCount - strEpoch.len) & strEpoch
@@ -80,6 +98,11 @@ proc epochAsString*(epoch: Epoch): string =
 proc getFilePathForEpoch*(epoch: Epoch, dir: string): string =
   dir / epochAsString(epoch) & epochFileNameExtension
 
+proc getFilePathForEpochs*(startEpoch, endEpoch: Epoch, dir: string): string =
+  let fileName = epochAsString(startEpoch) & "_" &
+                 epochAsString(endEpoch) & epochFileNameExtension
+  dir / fileName
+
 func getBlockRange*(dag: ChainDAGRef, start, ends: Slot): seq[BlockRef] =
   # Range of block in reverse order
   doAssert start < ends
@@ -433,9 +456,6 @@ proc collectBlockRewardsAndPenalties*(
     auxiliaryState.pubkeyToIndex.clear
   rewardsAndPenalties.collectFromSyncAggregate(forkedState, forkedBlock, cache)
 
-proc getStartEpoch*(outDir: string): Epoch =
-  outDir.getEpochRange.lastEpoch + 1
-
 func serializeToCsv*(rp: RewardsAndPenalties,
                      avgInclusionDelay = none(float)): string =
   for name, value in fieldPairs(rp):
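A note on the naming scheme these helpers share: epochs are rendered as fixed-width, zero-padded numbers, so lexicographic file ordering matches numeric epoch ordering, and an aggregated file is named <first>_<last>.epoch, which is why getAggregatedFilesLastEpoch can slice the last epoch straight out of the file name. A minimal standalone sketch of the convention, with the constant and helper inlined and the names (digits, epochAsStr, fn) purely illustrative:

import std/strutils

const digits = 8  # mirrors epochInfoFileNameDigitsCount

proc epochAsStr(epoch: uint64): string =
  # zero-pad to a fixed width so "00000225" sorts after "00000001"
  let s = $epoch
  '0'.repeat(digits - s.len) & s

let fn = epochAsStr(1) & "_" & epochAsStr(225) & ".epoch"
doAssert fn == "00000001_00000225.epoch"
# the last covered epoch sits at characters [digits + 1 .. 2 * digits]
doAssert parseUInt(fn[digits + 1 .. 2 * digits]) == 225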
diff --git a/ncli/ncli_db.nim b/ncli/ncli_db.nim
index 45b4aa403..6e89daa1d 100644
--- a/ncli/ncli_db.nim
+++ b/ncli/ncli_db.nim
@@ -11,7 +11,7 @@ import
     ssz_codec],
   ../beacon_chain/sszdump,
   ../research/simutils,
-  ./e2store, ./ncli_common
+  ./e2store, ./ncli_common, ./validator_db_aggregator
 
 when defined(posix):
   import system/ansi_c
@@ -180,6 +180,21 @@ type
       abbr: "e"
       desc: "The last for which to record statistics." &
             "By default the last epoch in the input database".}: Option[uint]
+    resolution {.
+      defaultValue: 225,
+      name: "resolution"
+      abbr: "r"
+      desc: "How many epochs to aggregate into a single compacted file" .}: uint
+    writeAggregatedFiles {.
+      name: "aggregated"
+      defaultValue: true
+      abbr: "a"
+      desc: "Whether to write aggregated files for ranges of epochs at the given resolution" .}: bool
+    writeUnaggregatedFiles {.
+      name: "unaggregated"
+      defaultValue: true
+      abbr: "u"
+      desc: "Whether to write an unaggregated file for each epoch" .}: bool
 
 var shouldShutDown = false
 
@@ -897,11 +912,24 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
   outDb.createValidatorsView
 
   let
+    unaggregatedFilesOutputDir = conf.outDir / "unaggregated"
+    aggregatedFilesOutputDir = conf.outDir / "aggregated"
     startEpoch =
       if conf.startEpoch.isSome:
        Epoch(conf.startEpoch.get)
      else:
-        getStartEpoch(conf.outDir)
+        let unaggregatedFilesNextEpoch = getUnaggregatedFilesLastEpoch(
+          unaggregatedFilesOutputDir) + 1
+        let aggregatedFilesNextEpoch = getAggregatedFilesLastEpoch(
+          aggregatedFilesOutputDir) + 1
+        if conf.writeUnaggregatedFiles and conf.writeAggregatedFiles:
+          min(unaggregatedFilesNextEpoch, aggregatedFilesNextEpoch)
+        elif conf.writeUnaggregatedFiles:
+          unaggregatedFilesNextEpoch
+        elif conf.writeAggregatedFiles:
+          aggregatedFilesNextEpoch
+        else:
+          min(unaggregatedFilesNextEpoch, aggregatedFilesNextEpoch)
     endEpoch =
       if conf.endEpoch.isSome:
        Epoch(conf.endEpoch.get)
@@ -921,6 +949,12 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
     endSlot = endEpoch.start_slot + SLOTS_PER_EPOCH
     blockRefs = dag.getBlockRange(startSlot, endSlot)
 
+  if not unaggregatedFilesOutputDir.dirExists:
+    unaggregatedFilesOutputDir.createDir
+
+  if not aggregatedFilesOutputDir.dirExists:
+    aggregatedFilesOutputDir.createDir
+
   let tmpState = newClone(dag.headState)
   var cache = StateCache()
   let slot = if startSlot > 0: startSlot - 1 else: 0.Slot
@@ -943,6 +977,9 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
   var auxiliaryState: AuxiliaryState
   auxiliaryState.copyParticipationFlags(tmpState[].data)
 
+  var aggregator = ValidatorDbAggregator.init(
+    aggregatedFilesOutputDir, conf.resolution, endEpoch)
+
   proc processEpoch() =
     let epoch = getStateField(tmpState[].data, slot).epoch
     info "Processing epoch ...", epoch = epoch
@@ -968,13 +1005,22 @@ proc cmdValidatorDb(conf: DbConf, cfg: RuntimeConfig) =
           some(validator.is_previous_epoch_attester.get().delay.uint64)
         else:
           none(uint64)
-      csvLines.add rp.serializeToCsv
-
-    let fileName = getFilePathForEpoch(epoch, conf.outDir)
-    var res = io2.removeFile(fileName)
-    doAssert res.isOk
-    res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
-    doAssert res.isOk
+      if conf.writeUnaggregatedFiles:
+        csvLines.add rp.serializeToCsv
+
+      if conf.writeAggregatedFiles:
+        aggregator.addValidatorData(index, rp)
+
+    if conf.writeUnaggregatedFiles:
+      let fileName = getFilePathForEpoch(epoch, unaggregatedFilesOutputDir)
+      var res = io2.removeFile(fileName)
+      doAssert res.isOk
+      res = io2.writeFile(fileName, snappy.encode(csvLines.toBytes))
+      doAssert res.isOk
+
+    if conf.writeAggregatedFiles:
+      aggregator.advanceEpochs(epoch, shouldShutDown)
 
     if shouldShutDown:
       quit QuitSuccess
     collectBalances(previousEpochBalances, tmpState[].data)
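The startEpoch fallback above resumes from whichever enabled output kind has fallen furthest behind, so re-running the command never leaves a gap in either directory. The rule, distilled into a hypothetical standalone helper (a plain uint64 alias stands in for the real distinct Epoch type; names are illustrative, not part of the patch):

type Epoch = uint64

proc defaultStartEpoch(unaggLast, aggLast: Epoch,
                       writeUnagg, writeAgg: bool): Epoch =
  # resume right after the last epoch already on disk; with both output
  # kinds enabled, the smaller "next" epoch wins so the lagging directory
  # catches up before new ground is covered
  let nextUnagg = unaggLast + 1
  let nextAgg = aggLast + 1
  if writeUnagg and not writeAgg: nextUnagg
  elif writeAgg and not writeUnagg: nextAgg
  else: min(nextUnagg, nextAgg)

doAssert defaultStartEpoch(100, 50, true, true) == 51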
" & "By default use the same directory as the input one"}: InputDir -var shutDown = false + ValidatorDbAggregator* {.requiresInit.} = object + outputDir: string + resolution: uint + endEpoch: Epoch + epochsAggregated: uint + aggregatedRewardsAndPenalties: seq[RewardsAndPenalties] + participationEpochsCount: seq[uint] + inclusionDelaysCount: seq[uint] + +proc init*(T: type ValidatorDbAggregator, outputDir: string, + resolution: uint, endEpoch: Epoch): T = + const initialCapacity = 1 shl 16 + ValidatorDbAggregator( + outputDir: outputDir, + resolution: resolution, + endEpoch: endEpoch, + epochsAggregated: 0, + aggregatedRewardsAndPenalties: + newSeqOfCap[RewardsAndPenalties](initialCapacity), + participationEpochsCount: newSeqOfCap[uint](initialCapacity), + inclusionDelaysCount: newSeqOfCap[uint](initialCapacity)) + +var shouldShutDown = false proc determineStartAndEndEpochs(config: AggregatorConf): tuple[startEpoch, endEpoch: Epoch] = if config.startEpoch.isNone or config.endEpoch.isNone: - (result.startEpoch, result.endEpoch) = getEpochRange(config.inputDir.string) + (result.startEpoch, result.endEpoch) = getUnaggregatedFilesEpochRange( + config.inputDir.string) if config.startEpoch.isSome: result.startEpoch = config.startEpoch.get.Epoch if config.endEpoch.isSome: @@ -122,10 +145,52 @@ proc average(rp: var RewardsAndPenalties, doAssert inclusionDelaysCount == 0 averageInclusionDelay = none(float) -proc getFilePathForEpochs(startEpoch, endEpoch: Epoch, dir: string): string = - let fileName = epochAsString(startEpoch) & "_" & - epochAsString(endEpoch) & epochFileNameExtension - dir / fileName + +proc addValidatorData*(aggregator: var ValidatorDbAggregator, + index: int, rp: RewardsAndPenalties) = + if index >= aggregator.participationEpochsCount.len: + aggregator.aggregatedRewardsAndPenalties.add rp + aggregator.participationEpochsCount.add 1 + if rp.inclusion_delay.isSome: + aggregator.inclusionDelaysCount.add 1 + else: + aggregator.inclusionDelaysCount.add 0 + else: + aggregator.aggregatedRewardsAndPenalties[index] += rp + inc aggregator.participationEpochsCount[index] + if rp.inclusionDelay.isSome: + inc aggregator.inclusionDelaysCount[index] + +proc advanceEpochs*(aggregator: var ValidatorDbAggregator, epoch: Epoch, + shouldShutDown: bool) = + inc aggregator.epochsAggregated + + if aggregator.epochsAggregated != aggregator.resolution and + aggregator.endEpoch != epoch and not shouldShutDown: + return + + var csvLines = newStringOfCap(1000000) + for i in 0 ..< aggregator.participationEpochsCount.len: + var averageInclusionDelay: Option[float] + average(aggregator.aggregatedRewardsAndPenalties[i], averageInclusionDelay, + aggregator.participationEpochsCount[i], + aggregator.inclusionDelaysCount[i]) + csvLines &= serializeToCsv( + aggregator.aggregatedRewardsAndPenalties[i], averageInclusionDelay) + + let fileName = getFilePathForEpochs( + epoch - aggregator.epochsAggregated + 1, epoch, aggregator.outputDir) + info "Writing file ...", fileName = fileName + + var result = io2.removeFile(fileName) + doAssert result.isOk + result = io2.writeFile(fileName, snappy.encode(csvLines.toBytes)) + doAssert result.isOk + + aggregator.participationEpochsCount.setLen(0) + aggregator.aggregatedRewardsAndPenalties.setLen(0) + aggregator.inclusionDelaysCount.setLen(0) + aggregator.epochsAggregated = 0 proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint, inputDir, outputDir: string) = @@ -137,10 +202,7 @@ proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint, info "Aggregating 
epochs ...", startEpoch = startEpoch, endEpoch = endEpoch, inputDir = inputDir, outputDir = outputDir - var rewardsAndPenalties: seq[RewardsAndPenalties] - var participationEpochsCount: seq[uint] - var inclusionDelaysCount: seq[uint] - var epochsAggregated = 0'u + var aggregator = ValidatorDbAggregator.init(outputDir, resolution, endEpoch) for epoch in startEpoch .. endEpoch: let filePath = getFilePathForEpoch(epoch, inputDir) @@ -155,59 +217,24 @@ proc aggregateEpochs(startEpoch, endEpoch: Epoch, resolution: uint, var csvParser: CsvParser csvParser.open(dataStream, filePath) - var validatorsCount = 0'u + var validatorsCount = 0 while csvParser.readRow: inc validatorsCount let rp = parseRow(csvParser.row) + aggregator.addValidatorData(validatorsCount - 1, rp) - if validatorsCount > participationEpochsCount.len.uint: - rewardsAndPenalties.add rp - participationEpochsCount.add 1 - if rp.inclusionDelay.isSome: - inclusionDelaysCount.add 1 - else: - inclusionDelaysCount.add 0 - else: - rewardsAndPenalties[validatorsCount - 1] += rp - inc participationEpochsCount[validatorsCount - 1] - if rp.inclusionDelay.isSome: - inc inclusionDelaysCount[validatorsCount - 1] + aggregator.advanceEpochs(epoch, shouldShutDown) - inc epochsAggregated - - if epochsAggregated == resolution or epoch == endEpoch or shutDown: - var csvLines: string - for i in 0 ..< participationEpochsCount.len: - var averageInclusionDelay: Option[float] - average(rewardsAndPenalties[i], averageInclusionDelay, - participationEpochsCount[i], inclusionDelaysCount[i]) - csvLines &= serializeToCsv( - rewardsAndPenalties[i], averageInclusionDelay) - - let fileName = getFilePathForEpochs( - epoch - epochsAggregated + 1, epoch, outputDir) - info "Writing file ...", fileName = fileName - - var result = io2.removeFile(fileName) - doAssert result.isOk - result = io2.writeFile(fileName, snappy.encode(csvLines.toBytes)) - doAssert result.isOk - - if shutDown: - quit QuitSuccess - - participationEpochsCount.setLen(0) - rewardsAndPenalties.setLen(0) - inclusionDelaysCount.setLen(0) - epochsAggregated = 0 + if shouldShutDown: + quit QuitSuccess proc controlCHook {.noconv.} = notice "Shutting down after having received SIGINT." - shutDown = true + shouldShutDown = true proc exitOnSigterm(signal: cint) {.noconv.} = notice "Shutting down after having received SIGTERM." 
@@ -216,60 +215,55 @@
     "def get_max_reward_var(component):\n",
     "    return \"max_\" + component + \"_reward\"\n",
     "\n",
+    "max_reward_vars = [get_max_reward_var(reward_type) for reward_type in rewards]\n",
+    "outcome_vars = [get_outcome_var(reward_type) for reward_type in rewards]\n",
+    "\n",
     "def sum_max_values(t):\n",
-    "    return sum(getattr(t, get_max_reward_var(reward)) for reward in rewards)\n",
+    "    return sum(getattr(t, max_reward) for max_reward in max_reward_vars)\n",
     "\n",
     "def sum_actual_values(t):\n",
-    "    return sum(getattr(t, get_outcome_var(reward)) for reward in rewards)\n",
+    "    return sum(getattr(t, outcome) for outcome in outcome_vars)\n",
+    "\n",
+    "def compute_losses_median(data):\n",
+    "    max_values = data[max_reward_vars].sum(axis = 1)\n",
+    "    actual_values = data[outcome_vars].sum(axis = 1)\n",
+    "    losses = max_values - actual_values\n",
+    "    return losses.median(axis = 0)\n",
     "\n",
     "total_losses_per_epoch_point = {}\n",
-    "validators_per_epoch_point = {}\n",
     "average_losses_per_epoch_point = {}\n",
+    "validators_sets_queries = {}\n",
+    "medians = {}\n",
     "\n",
-    "def compute_total_losses(epoch_point, epochs = None):\n",
+    "for set_name, set_values in validators_sets.items():\n",
+    "    total_losses_per_epoch_point[set_name] = {}\n",
+    "    average_losses_per_epoch_point[set_name] = {}\n",
+    "    validators_sets_queries[set_name] = []\n",
+    "\n",
+    "file_names = [file_name for file_name in os.listdir(files_dir)\n",
+    "              if isEpochInfoFile(file_name)]\n",
+    "file_names.sort()\n",
+    "adjust_constraints(file_names)\n",
+    "\n",
+    "previous_validators_count = 0\n",
+    "for file_name in file_names:\n",
+    "    data = read_csv(Path(files_dir + file_name))\n",
+    "    file_first_epoch, file_last_epoch = get_first_and_last_epoch(file_name)\n",
+    "    file_epoch_range = file_last_epoch - file_first_epoch + 1\n",
+    "    epoch_point = file_first_epoch // resolution\n",
+    "    validators_count = len(data.index)\n",
     "    for set_name, validators in validators_sets.items():\n",
-    "        if not set_name in total_losses_per_epoch_point:\n",
-    "            total_losses_per_epoch_point[set_name] = {}\n",
-    "            validators_per_epoch_point[set_name] = {}\n",
-    "        if not epoch_point in total_losses_per_epoch_point[set_name]:\n",
-    "            total_losses_per_epoch_point[set_name][epoch_point] = 0\n",
-    "            validators_per_epoch_point[set_name][epoch_point] = 0\n",
-    "        for validator_index in validators:\n",
-    "            validator_info = data.iloc[validator_index]\n",
-    "            validator_losses = \\\n",
-    "                sum_max_values(validator_info) - sum_actual_values(validator_info)\n",
-    "            total_losses_per_epoch_point[set_name][epoch_point] += \\\n",
-    "                validator_losses if epochs == None else validator_losses * epochs\n",
-    "            validators_per_epoch_point[set_name][epoch_point] += \\\n",
-    "                1 if epochs == None else epochs\n",
-    "\n",
-    "def compute_average_losses():\n",
-    "    for set_name in validators_sets:\n",
-    "        if not set_name in average_losses_per_epoch_point:\n",
-    "            average_losses_per_epoch_point[set_name] = {}\n",
-    "        for epoch_point, total_losses in total_losses_per_epoch_point[set_name].items():\n",
-    "            average_losses_per_epoch_point[set_name][epoch_point] = \\\n",
-    "                total_losses / validators_per_epoch_point[set_name][epoch_point]\n",
-    "\n",
-    "if use_compacted_files:\n",
-    "    file_names = [file_name for file_name in os.listdir(files_dir)\n",
-    "                  if isEpochInfoFile(file_name)]\n",
-    "    file_names.sort()\n",
-    "    adjust_constraints(file_names)\n",
-    "\n",
-    "    for file_name in file_names:\n",
-    "        data = read_csv(Path(files_dir + file_name))\n",
-    "        file_first_epoch, file_last_epoch = get_first_and_last_epoch(file_name)\n",
-    "        file_epochs_range = file_last_epoch - file_first_epoch + 1\n",
-    "        epoch_point = file_first_epoch // resolution\n",
-    "        compute_total_losses(epoch_point, file_epochs_range)\n",
-    "else:\n",
-    "    for epoch in range(start_epoch, end_epoch + 1):\n",
-    "        data = read_csv(Path(files_dir + \"{:08}.epoch\".format(epoch)))\n",
-    "        epoch_point = epoch // resolution\n",
-    "        compute_total_losses(epoch_point)\n",
-    "\n",
-    "compute_average_losses()"
+    "        for i in range(previous_validators_count, validators_count):\n",
+    "            if i in validators:\n",
+    "                validators_sets_queries[set_name].append(i)\n",
+    "        sums = data.iloc[validators_sets_queries[set_name]].sum(axis = 0)\n",
+    "        difference = sum_max_values(sums) - sum_actual_values(sums)\n",
+    "        set_validators_count = len(validators_sets_queries[set_name])\n",
+    "        average_losses_per_epoch_point[set_name][epoch_point] = \\\n",
+    "            difference / set_validators_count if set_validators_count > 0 else 0\n",
+    "        total_losses_per_epoch_point[set_name][epoch_point] = difference * file_epoch_range\n",
+    "    medians[epoch_point] = compute_losses_median(data)\n",
+    "    previous_validators_count = validators_count\n"
    ]
   },
   {
\\\n", - " sum_max_values(validator_info) - sum_actual_values(validator_info)\n", - " total_losses_per_epoch_point[set_name][epoch_point] += \\\n", - " validator_losses if epochs == None else validator_losses * epochs\n", - " validators_per_epoch_point[set_name][epoch_point] += \\\n", - " 1 if epochs == None else epochs\n", - "\n", - "def compute_average_losses():\n", - " for set_name in validators_sets:\n", - " if not set_name in average_losses_per_epoch_point:\n", - " average_losses_per_epoch_point[set_name] = {}\n", - " for epoch_point, total_losses in total_losses_per_epoch_point[set_name].items():\n", - " average_losses_per_epoch_point[set_name][epoch_point] = \\\n", - " total_losses / validators_per_epoch_point[set_name][epoch_point]\n", - "\n", - "if use_compacted_files:\n", - " file_names = [file_name for file_name in os.listdir(files_dir)\n", - " if isEpochInfoFile(file_name)]\n", - " file_names.sort()\n", - " adjust_constraints(file_names)\n", - "\n", - " for file_name in file_names:\n", - " data = read_csv(Path(files_dir + file_name))\n", - " file_first_epoch, file_last_epoch = get_first_and_last_epoch(file_name)\n", - " file_epochs_range = file_last_epoch - file_first_epoch + 1\n", - " epoch_point = file_first_epoch // resolution\n", - " compute_total_losses(epoch_point, file_epochs_range)\n", - "else:\n", - " for epoch in range(start_epoch, end_epoch + 1):\n", - " data = read_csv(Path(files_dir + \"{:08}.epoch\".format(epoch)))\n", - " epoch_point = epoch // resolution\n", - " compute_total_losses(epoch_point)\n", - "\n", - "compute_average_losses()" + " for i in range(previous_validators_count, validators_count):\n", + " if i in validators:\n", + " validators_sets_queries[set_name].append(i)\n", + " sums = data.iloc[validators_sets_queries[set_name]].sum(axis = 0)\n", + " difference = sum_max_values(sums) - sum_actual_values(sums)\n", + " set_validators_count = len(validators_sets_queries[set_name])\n", + " average_losses_per_epoch_point[set_name][epoch_point] = \\\n", + " difference / set_validators_count if set_validators_count > 0 else 0\n", + " total_losses_per_epoch_point[set_name][epoch_point] = difference * file_epoch_range\n", + " medians[epoch_point] = compute_losses_median(data)\n", + " previous_validators_count = validators_count\n" ] }, { @@ -294,14 +288,20 @@ "plt.xlabel(\"Epoch\")\n", "plt.ylabel(\"Gwei\")\n", "\n", - "for name, value in average_losses_per_epoch_point.items():\n", - " epochs = np.array([ep * resolution + resolution // 2 for ep in value.keys()])\n", - " values = np.array(list(value.values()))\n", + "num_samples = (end_epoch - start_epoch + 1) // resolution * 100\n", + "\n", + "def plot(set_name, set_values):\n", + " epochs = np.array([ep * resolution + resolution // 2 for ep in set_values.keys()])\n", + " values = np.array(list(set_values.values()))\n", " spline = make_interp_spline(epochs, values)\n", - " num_samples = (end_epoch - start_epoch + 1) // resolution * 100\n", " x = np.linspace(epochs.min(), epochs.max(), num_samples)\n", " y = spline(x)\n", - " plt.plot(x, y, label=name)\n", + " plt.plot(x, y, label=set_name)\n", + "\n", + "for name, value in average_losses_per_epoch_point.items():\n", + " plot(name, value)\n", + "\n", + "plot(\"median\", medians)\n", "\n", "plt.legend(loc=\"best\")" ]