checkpoint database at end of each slot (#2195)

* checkpoint database at end of each slot

To avoid spending time synchronizing with the file system in the middle of
slot processing, the manual checkpointing mode turns off fsync while a slot
is being processed and instead checkpoints the database once the slot has
ended.

From an SQLite perspective, in WAL mode this still guarantees database
consistency, but a crash may lose the most recently written transactions -
which is fine: anything missing from the beacon chain database can be
recovered on the next startup (see the SQLite sketch after this list).

* log sync status and delay in slot start message

* bump
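
For context, manual checkpointing builds on standard SQLite WAL behaviour.
Below is a minimal, hypothetical sketch of the pragmas involved - it is not
the actual kvstore_sqlite3 implementation, and the Nim binding names
(sqlite3_open, sqlite3_exec, SQLITE_OK) are assumed to follow the C API as
exposed by nim-sqlite3-abi:

import sqlite3_abi

proc execPragma(db: ptr sqlite3, sql: cstring) =
  # Run a statement without a result callback; abort loudly on failure.
  doAssert sqlite3_exec(db, sql, nil, nil, nil) == SQLITE_OK

proc openManualCheckpointDb(path: cstring): ptr sqlite3 =
  var db: ptr sqlite3
  doAssert sqlite3_open(path, addr db) == SQLITE_OK
  # WAL mode: writes land in a separate log that is folded into the main
  # database file only when a checkpoint runs.
  execPragma(db, "PRAGMA journal_mode=WAL;")
  # synchronous=OFF skips fsync on each transaction - a crash may lose the
  # most recent writes, but the database itself stays consistent.
  execPragma(db, "PRAGMA synchronous=OFF;")
  # Disable automatic checkpoints; the caller checkpoints once per slot.
  execPragma(db, "PRAGMA wal_autocheckpoint=0;")
  db

proc checkpoint(db: ptr sqlite3) =
  # At slot end: fold the WAL into the main database file and truncate it.
  execPragma(db, "PRAGMA wal_checkpoint(TRUNCATE);")

After a crash, anything written since the last completed checkpoint may be
missing, which is acceptable here: the node simply re-syncs those blocks on
the next startup.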
Jacek Sieka 2020-12-18 22:01:24 +01:00 committed by GitHub
parent 452042b17b
commit 0f8a3a5ae8
3 changed files with 43 additions and 29 deletions


@@ -21,8 +21,6 @@ type
     keyspace: int

   DepositsSeq = DbSeq[DepositData]
-  ImmutableValidatorDataSeq = seq[ImmutableValidatorData]
-  ValidatorKeyToIndexMap = Table[ValidatorPubKey, ValidatorIndex]

   DepositsMerkleizer* = SszMerkleizer[depositContractLimit]
@@ -46,6 +44,7 @@ type
     backend: KvStoreRef
     preset: RuntimePreset
     genesisDeposits*: DepositsSeq
+    checkpoint*: proc() {.gcsafe.}

   Keyspaces* = enum
     defaultKeyspace = "kvstore"
@@ -218,8 +217,8 @@ proc init*(T: type BeaconChainDB,
   let s = secureCreatePath(dir)
   doAssert s.isOk # TODO(zah) Handle this in a better way
-  let sqliteStore = SqStoreRef.init(dir, "nbc", Keyspaces).expect(
-    "working database")
+  let sqliteStore = SqStoreRef.init(
+    dir, "nbc", Keyspaces, manualCheckpoint = true).expect("working database")

   # Remove the deposits table we used before we switched
   # to storing only deposit contract checkpoints
@@ -230,12 +229,12 @@ proc init*(T: type BeaconChainDB,
     validatorKeyToIndex = initTable[ValidatorPubKey, ValidatorIndex]()
     genesisDepositsSeq = DbSeq[DepositData].init(sqliteStore, "genesis_deposits")

-  let isPyrmont =
-    not pyrmontMetadata.incompatible and preset == pyrmontMetadata.runtimePreset

   T(backend: kvStore sqliteStore,
     preset: preset,
-    genesisDeposits: genesisDepositsSeq)
+    genesisDeposits: genesisDepositsSeq,
+    checkpoint: proc() = sqliteStore.checkpoint()
+  )

 proc snappyEncode(inp: openArray[byte]): seq[byte] =
   try:


@@ -542,6 +542,34 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
   if slot.isEpoch and node.getTopicSubscriptionEnabled:
     await node.cycleAttestationSubnets(slot)

+proc onSlotEnd(node: BeaconNode, slot, nextSlot: Slot): Future[void] =
+  # Things we do when slot processing has ended and we're about to wait for
+  # the next slot
+  when declared(GC_fullCollect):
+    # The slots in the beacon node work as frames in a game: we want to make
+    # sure that we're ready for the next one and don't get stuck in lengthy
+    # garbage collection tasks when time is of the essence in the middle of a
+    # slot - while this does not guarantee that we'll never collect during a
+    # slot, it makes sure that all the scratch space we used during slot tasks
+    # (logging, temporary buffers etc) gets recycled for the next slot that is
+    # likely to need similar amounts of memory.
+    GC_fullCollect()
+
+  # Checkpoint the database to clear the WAL file and make sure changes in
+  # the database are synced with the filesystem.
+  node.db.checkpoint()
+
+  info "Slot end",
+    slot = shortLog(slot),
+    nextSlot = shortLog(nextSlot),
+    head = shortLog(node.chainDag.head),
+    headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
+    finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
+    finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
+
+  node.updateGossipStatus(slot)
+
 proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
   ## Called at the beginning of a slot - usually every slot, but sometimes might
   ## skip a few in case we're running late.
@@ -559,15 +587,20 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
   node.processor[].blockReceivedDuringSlot.complete()
   node.processor[].blockReceivedDuringSlot = newFuture[void]()

+  let delay = beaconTime - scheduledSlot.toBeaconTime()
   info "Slot start",
     lastSlot = shortLog(lastSlot),
     scheduledSlot = shortLog(scheduledSlot),
-    beaconTime = shortLog(beaconTime),
+    delay,
     peers = len(node.network.peerPool),
     head = shortLog(node.chainDag.head),
     headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
     finalized = shortLog(node.chainDag.finalizedHead.blck),
-    finalizedEpoch = shortLog(finalizedEpoch)
+    finalizedEpoch = shortLog(finalizedEpoch),
+    sync =
+      if node.syncManager.inProgress: node.syncManager.syncStatus
+      else: "synced"

   # Check before any re-scheduling of onSlotStart()
   checkIfShouldStopAtEpoch(scheduledSlot, node.config.stopAtEpoch)
@@ -597,7 +630,7 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
     slot = wallSlot.slot # afterGenesis == true!
     nextSlot = slot + 1

-  defer: await node.updateGossipStatus(slot)
+  defer: await onSlotEnd(node, slot, nextSlot)

   beacon_slot.set slot.int64
   beacon_current_epoch.set slot.epoch.int64
@@ -649,24 +682,6 @@ proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.async.} =
   let
     nextSlotStart = saturate(node.beaconClock.fromNow(nextSlot))

-  info "Slot end",
-    slot = shortLog(slot),
-    nextSlot = shortLog(nextSlot),
-    head = shortLog(node.chainDag.head),
-    headEpoch = shortLog(node.chainDag.head.slot.compute_epoch_at_slot()),
-    finalizedHead = shortLog(node.chainDag.finalizedHead.blck),
-    finalizedEpoch = shortLog(node.chainDag.finalizedHead.blck.slot.compute_epoch_at_slot())
-
-  when declared(GC_fullCollect):
-    # The slots in the beacon node work as frames in a game: we want to make
-    # sure that we're ready for the next one and don't get stuck in lengthy
-    # garbage collection tasks when time is of essence in the middle of a slot -
-    # while this does not guarantee that we'll never collect during a slot, it
-    # makes sure that all the scratch space we used during slot tasks (logging,
-    # temporary buffers etc) gets recycled for the next slot that is likely to
-    # need similar amounts of memory.
-    GC_fullCollect()
-
   addTimer(nextSlotStart) do (p: pointer):
     asyncCheck node.onSlotStart(slot, nextSlot)
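
A note on the defer wiring above: registering onSlotEnd with defer at the top
of onSlotStart ensures the end-of-slot work (GC, database checkpoint, logging)
runs on every exit path from the slot handler, including early returns and
exceptions. A toy, standalone sketch of the pattern follows - simplified
signatures, not the node code, assuming chronos (the async framework the
original diff uses):

import chronos

proc onSlotEnd(slot: uint64) {.async.} =
  # Checkpointing, GC and logging would go here.
  echo "slot end: ", slot

proc onSlotStart(slot: uint64) {.async.} =
  # Registered up front so it also runs if slot handling exits early.
  defer: await onSlotEnd(slot)
  echo "processing slot: ", slot

when isMainModule:
  waitFor onSlotStart(42)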

vendor/nim-eth (vendored)

@@ -1 +1 @@
-Subproject commit b4c1391be912c5b716d1ccc134ba405deea0c62f
+Subproject commit 0f48ccecc0386df2f965cf4694ca63c8836b2d67