From 0f8a3a5ae8b73057739cca934ba5037a187a7fe5 Mon Sep 17 00:00:00 2001
From: Jacek Sieka
Date: Fri, 18 Dec 2020 22:01:24 +0100
Subject: [PATCH] checkpoint database at end of each slot (#2195)

* checkpoint database at end of each slot

To avoid spending time synchronizing with the file system during
processing, the manual checkpointing mode turns off fsync while a slot
is being processed and instead checkpoints the database when the slot
has ended. From an sqlite perspective, in WAL mode this guarantees
database consistency but may lead to data loss, which is fine: anything
missing from the beacon chain database can be recovered on the next
startup.

* log sync status and delay in slot start message

* bump
---
 beacon_chain/beacon_chain_db.nim    | 13 +++----
 beacon_chain/nimbus_beacon_node.nim | 57 ++++++++++++++++++-----------
 vendor/nim-eth                      |  2 +-
 3 files changed, 43 insertions(+), 29 deletions(-)

diff --git a/beacon_chain/beacon_chain_db.nim b/beacon_chain/beacon_chain_db.nim
index 248c534ac..5f17a8bcb 100644
--- a/beacon_chain/beacon_chain_db.nim
+++ b/beacon_chain/beacon_chain_db.nim
@@ -21,8 +21,6 @@ type
     keyspace: int
 
   DepositsSeq = DbSeq[DepositData]
-  ImmutableValidatorDataSeq = seq[ImmutableValidatorData]
-  ValidatorKeyToIndexMap = Table[ValidatorPubKey, ValidatorIndex]
 
   DepositsMerkleizer* = SszMerkleizer[depositContractLimit]
 
@@ -46,6 +44,7 @@ type
     backend: KvStoreRef
     preset: RuntimePreset
     genesisDeposits*: DepositsSeq
+    checkpoint*: proc() {.gcsafe.}
 
   Keyspaces* = enum
     defaultKeyspace = "kvstore"
@@ -218,8 +217,8 @@ proc init*(T: type BeaconChainDB,
     let s = secureCreatePath(dir)
     doAssert s.isOk # TODO(zah) Handle this in a better way
 
-    let sqliteStore = SqStoreRef.init(dir, "nbc", Keyspaces).expect(
-      "working database")
+    let sqliteStore = SqStoreRef.init(
+      dir, "nbc", Keyspaces, manualCheckpoint = true).expect("working database")
 
     # Remove the deposits table we used before we switched
     # to storing only deposit contract checkpoints
@@ -230,12 +229,12 @@ proc init*(T: type BeaconChainDB,
       validatorKeyToIndex = initTable[ValidatorPubKey, ValidatorIndex]()
       genesisDepositsSeq =
        DbSeq[DepositData].init(sqliteStore, "genesis_deposits")
-    let isPyrmont =
-      not pyrmontMetadata.incompatible and preset == pyrmontMetadata.runtimePreset
 
     T(backend: kvStore sqliteStore,
      preset: preset,
-      genesisDeposits: genesisDepositsSeq)
+      genesisDeposits: genesisDepositsSeq,
+      checkpoint: proc() = sqliteStore.checkpoint()
+    )
 
 proc snappyEncode(inp: openArray[byte]): seq[byte] =
   try:
diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim
index 58d87750b..c417d4dca 100644
--- a/beacon_chain/nimbus_beacon_node.nim
+++ b/beacon_chain/nimbus_beacon_node.nim
@@ -542,6 +542,34 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
   if slot.isEpoch and node.getTopicSubscriptionEnabled:
     await node.cycleAttestationSubnets(slot)
 
+proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot): Future[void] =
+  # Things we do when slot processing has ended and we're about to wait for the
+  # next slot
+
+  when declared(GC_fullCollect):
+    # The slots in the beacon node work as frames in a game: we want to make
+    # sure that we're ready for the next one and don't get stuck in lengthy
+    # garbage collection tasks when time is of the essence in the middle of a
+    # slot - while this does not guarantee that we'll never collect during a
+    # slot, it makes sure that all the scratch space we used during slot
+    # tasks (logging, temporary buffers etc) gets recycled for the next slot
+    # that is likely to need similar amounts of memory.
+    GC_fullCollect()
+
+  # Checkpoint the database to clear the WAL file and make sure changes in
+  # the database are synced with the filesystem.
+  node.db.checkpoint()
+
+  info "Slot end",
+    slot = shortLog(slot),
+    nextSlot = shortLog(nextSlot),
+    head = shortLog(node.chainDag.head),
+    headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
+    finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
+    finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
+
+  node.updateGossipStatus(slot)
+
 proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
   ## Called at the beginning of a slot - usually every slot, but sometimes might
   ## skip a few in case we're running late.
@@ -559,15 +587,20 @@
   node.processor[].blockReceivedDuringSlot.complete()
   node.processor[].blockReceivedDuringSlot = newFuture[void]()
 
+  let delay = beaconTime - scheduledSlot.toBeaconTime()
+
   info "Slot start",
     lastSlot = shortLog(lastSlot),
     scheduledSlot = shortLog(scheduledSlot),
-    beaconTime = shortLog(beaconTime),
+    delay,
     peers = len(node.network.peerPool),
     head = shortLog(node.chainDag.head),
     headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
     finalized = shortLog(node.chainDag.finalizedHead.blck),
-    finalizedEpoch = shortLog(finalizedEpoch)
+    finalizedEpoch = shortLog(finalizedEpoch),
+    sync =
+      if node.syncManager.inProgress: node.syncManager.syncStatus
+      else: "synced"
 
   # Check before any re-scheduling of onSlotStart()
   checkIfShouldStopAtEpoch(scheduledSlot, node.config.stopAtEpoch)
@@ -597,7 +630,7 @@
     slot = wallSlot.slot # afterGenesis == true!
     nextSlot = slot + 1
 
-  defer: await node.updateGossipStatus(slot)
+  defer: await onSlotEnd(node, slot, nextSlot)
 
   beacon_slot.set slot.int64
   beacon_current_epoch.set slot.epoch.int64
@@ -649,24 +682,6 @@
   let
     nextSlotStart = saturate(node.beaconClock.fromNow(nextSlot))
 
-  info "Slot end",
-    slot = shortLog(slot),
-    nextSlot = shortLog(nextSlot),
-    head = shortLog(node.chainDag.head),
-    headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
-    finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
-    finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
-
-  when declared(GC_fullCollect):
-    # The slots in the beacon node work as frames in a game: we want to make
-    # sure that we're ready for the next one and don't get stuck in lengthy
-    # garbage collection tasks when time is of essence in the middle of a slot -
-    # while this does not guarantee that we'll never collect during a slot, it
-    # makes sure that all the scratch space we used during slot tasks (logging,
-    # temporary buffers etc) gets recycled for the next slot that is likely to
-    # need similar amounts of memory.
-    GC_fullCollect()
-
   addTimer(nextSlotStart) do (p: pointer):
     asyncCheck node.onSlotStart(slot, nextSlot)
 
diff --git a/vendor/nim-eth b/vendor/nim-eth
index b4c1391be..0f48ccecc 160000
--- a/vendor/nim-eth
+++ b/vendor/nim-eth
@@ -1 +1 @@
-Subproject commit b4c1391be912c5b716d1ccc134ba405deea0c62f
+Subproject commit 0f48ccecc0386df2f965cf4694ca63c8836b2d67
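
The checkpointing scheme in this patch maps onto ordinary sqlite WAL-mode
pragmas. The sketch below illustrates the same pattern outside the nimbus
codebase, using Nim's standard db_sqlite wrapper rather than nim-eth's
SqStoreRef; the file name, table, and proc names are hypothetical, and
"synchronous = NORMAL" is an assumption about how "turning off fsync during
processing" is expressed in pragmas (in WAL mode it defers the fsync to
checkpoint time), not a description of nim-eth's actual implementation.

import db_sqlite  # moved to db_connector/db_sqlite in Nim 2.x

let db = open("example.db", "", "", "")  # illustrative path

# WAL mode: commits append to a separate -wal file instead of rewriting the
# main database file in place. The set form of this pragma returns a result
# row, so read it with getValue rather than exec.
discard db.getValue(sql"PRAGMA journal_mode = WAL")

# Disable sqlite's automatic checkpointing so that we decide when the WAL is
# folded back into the main database file (also returns a row).
discard db.getValue(sql"PRAGMA wal_autocheckpoint = 0")

# In WAL mode, synchronous = NORMAL skips the fsync on every commit; a crash
# can lose the most recent transactions but cannot corrupt the database, and
# anything lost can be recovered from the network on the next startup.
db.exec(sql"PRAGMA synchronous = NORMAL")

db.exec(sql"CREATE TABLE IF NOT EXISTS kv (k INTEGER PRIMARY KEY, v TEXT)")

proc processSlot(db: DbConn, slot: int) =
  # Fsync-free writes while slot processing is in progress
  db.exec(sql"INSERT OR REPLACE INTO kv (k, v) VALUES (?, ?)",
          slot, "slot data")

proc checkpointSlot(db: DbConn) =
  # One checkpoint per slot: copy the WAL into the main database file,
  # truncate the WAL and sync with the filesystem in a single step.
  discard db.getValue(sql"PRAGMA wal_checkpoint(TRUNCATE)")

for slot in 0 ..< 3:
  db.processSlot(slot)
  db.checkpointSlot()

db.close()

Checkpointing once per slot, right after GC_fullCollect in onSlotEnd, bounds
both the filesystem sync cost and the WAL growth to one slot's worth of
writes, while the latency-sensitive middle of the slot never blocks on fsync.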