speed up trusted node backfill (#3371)

With these changes, we can backfill about 400-500 slots/sec, which means
a full backfill of mainnet takes about 2-3h.

However, the CPU is not saturated — neither in the server nor in the
client — meaning that somewhere there's an artificial inefficiency in the
communication: 16 parallel downloads *should* saturate the CPU.

One plausible cause would be "too many async event loop iterations" per
block request, which would introduce multiple "sleep-like" delays along
the way.

I can push the speed up to 800 slots/sec by increasing parallel
downloads even further, but going after the root cause of the slowness
would be better.

* avoid some unnecessary block copies
* double parallel requests
This commit is contained in:
Jacek Sieka 2022-02-12 12:09:59 +01:00 committed by GitHub
parent 40fe8f5336
commit 1f89b7f7b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 23 deletions

View File

@ -160,7 +160,7 @@ proc getBlockV2Plain*(block_id: BlockIdent): RestPlainResponse {.
proc getBlockV2*(client: RestClientRef, block_id: BlockIdent, proc getBlockV2*(client: RestClientRef, block_id: BlockIdent,
cfg: RuntimeConfig, cfg: RuntimeConfig,
restAccept = ""): Future[Option[ForkedSignedBeaconBlock]] {. restAccept = ""): Future[Option[ref ForkedSignedBeaconBlock]] {.
async.} = async.} =
# Return the asked-for block, or None in case 404 is returned from the server. # Return the asked-for block, or None in case 404 is returned from the server.
# Raises on other errors # Raises on other errors
@ -181,17 +181,17 @@ proc getBlockV2*(client: RestClientRef, block_id: BlockIdent,
resp.contentType) resp.contentType)
if res.isErr(): if res.isErr():
raise newException(RestError, $res.error()) raise newException(RestError, $res.error())
res.get() newClone(res.get())
some blck some blck
of "application/octet-stream": of "application/octet-stream":
try: try:
some readSszForkedSignedBeaconBlock(cfg, resp.data) some newClone(readSszForkedSignedBeaconBlock(cfg, resp.data))
except CatchableError as exc: except CatchableError as exc:
raise newException(RestError, exc.msg) raise newException(RestError, exc.msg)
else: else:
raise newException(RestError, "Unsupported content-type") raise newException(RestError, "Unsupported content-type")
of 404: of 404:
none(ForkedSignedBeaconBlock) none(ref ForkedSignedBeaconBlock)
of 400, 500: of 400, 500:
let error = let error =

View File

@ -96,7 +96,7 @@ proc doTrustedNodeSync*(
var client = RestClientRef.new(restUrl).get() var client = RestClientRef.new(restUrl).get()
proc downloadBlock(slot: Slot): proc downloadBlock(slot: Slot):
Future[Option[ForkedSignedBeaconBlock]] {.async.} = Future[Option[ref ForkedSignedBeaconBlock]] {.async.} =
# Download block at given slot, retrying a few times, # Download block at given slot, retrying a few times,
var lastError: ref CatchableError var lastError: ref CatchableError
for i in 0..<3: for i in 0..<3:
@ -175,7 +175,7 @@ proc doTrustedNodeSync*(
# what slot we'll get - to find it, we'll keep walking backwards for a # what slot we'll get - to find it, we'll keep walking backwards for a
# reasonable number of tries # reasonable number of tries
var var
checkpointBlock: ForkedSignedBeaconBlock checkpointBlock: ref ForkedSignedBeaconBlock
id = BlockIdent.decodeString(blockId).valueOr: id = BlockIdent.decodeString(blockId).valueOr:
error "Cannot decode checkpoint block id, must be a slot, hash, 'finalized' or 'head'", error "Cannot decode checkpoint block id, must be a slot, hash, 'finalized' or 'head'",
blockId blockId
@ -207,7 +207,7 @@ proc doTrustedNodeSync*(
checkpointBlock = blck.get() checkpointBlock = blck.get()
let checkpointSlot = getForkedBlockField(checkpointBlock, slot) let checkpointSlot = getForkedBlockField(checkpointBlock[], slot)
if checkpointSlot > headSlot: if checkpointSlot > headSlot:
# When the checkpoint is newer than the head, we run into trouble: the # When the checkpoint is newer than the head, we run into trouble: the
# current backfill in ChainDAG does not support filling in arbitrary gaps. # current backfill in ChainDAG does not support filling in arbitrary gaps.
@ -215,7 +215,7 @@ proc doTrustedNodeSync*(
# backfiller would re-download the entire backfill history. # backfiller would re-download the entire backfill history.
# For now, we'll abort and let the user choose what to do. # For now, we'll abort and let the user choose what to do.
error "Checkpoint block is newer than head slot - start with a new database or use a checkpoint no more recent than the head", error "Checkpoint block is newer than head slot - start with a new database or use a checkpoint no more recent than the head",
checkpointSlot, checkpointRoot = shortLog(checkpointBlock.root), headSlot checkpointSlot, checkpointRoot = shortLog(checkpointBlock[].root), headSlot
quit 1 quit 1
if checkpointSlot.is_epoch(): if checkpointSlot.is_epoch():
@ -236,11 +236,11 @@ proc doTrustedNodeSync*(
quit 1 quit 1
checkpointBlock checkpointBlock
let checkpointSlot = getForkedBlockField(checkpointBlock, slot) let checkpointSlot = getForkedBlockField(checkpointBlock[], slot)
if checkpointBlock.root in dbCache.summaries: if checkpointBlock[].root in dbCache.summaries:
notice "Checkpoint block is already known, skipping checkpoint state download" notice "Checkpoint block is already known, skipping checkpoint state download"
withBlck(checkpointBlock): withBlck(checkpointBlock[]):
dbCache.updateSlots(blck.root, blck.message.slot) dbCache.updateSlots(blck.root, blck.message.slot)
else: else:
@ -261,10 +261,10 @@ proc doTrustedNodeSync*(
withState(state[]): withState(state[]):
let latest_block_root = state.latest_block_root let latest_block_root = state.latest_block_root
if latest_block_root != checkpointBlock.root: if latest_block_root != checkpointBlock[].root:
error "Checkpoint state does not match checkpoint block, server error?", error "Checkpoint state does not match checkpoint block, server error?",
blockRoot = shortLog(checkpointBlock.root), blockRoot = shortLog(checkpointBlock[].root),
blck = shortLog(checkpointBlock), blck = shortLog(checkpointBlock[]),
stateBlockRoot = shortLog(latest_block_root) stateBlockRoot = shortLog(latest_block_root)
quit 1 quit 1
@ -272,7 +272,7 @@ proc doTrustedNodeSync*(
stateRoot = shortLog(state.root) stateRoot = shortLog(state.root)
db.putState(state) db.putState(state)
withBlck(checkpointBlock): withBlck(checkpointBlock[]):
info "Writing checkpoint block", info "Writing checkpoint block",
blockRoot = shortLog(blck.root), blockRoot = shortLog(blck.root),
blck = shortLog(blck.message) blck = shortLog(blck.message)
@ -310,8 +310,9 @@ proc doTrustedNodeSync*(
stamp = SyncMoment.now(0) stamp = SyncMoment.now(0)
# Download several blocks in parallel but process them serially # Download several blocks in parallel but process them serially
var gets: array[8, Future[Option[ForkedSignedBeaconBlock]]] var gets: array[16, Future[Option[ref ForkedSignedBeaconBlock]]]
proc processBlock(fut: Future[Option[ForkedSignedBeaconBlock]], slot: Slot) {.async.} = proc processBlock(
fut: Future[Option[ref ForkedSignedBeaconBlock]], slot: Slot) {.async.} =
processed += 1 processed += 1
var blck = await fut var blck = await fut
if blck.isNone(): if blck.isNone():
@ -319,7 +320,7 @@ proc doTrustedNodeSync*(
return return
let data = blck.get() let data = blck.get()
withBlck(data): withBlck(data[]):
debug "Processing", debug "Processing",
blck = shortLog(blck.message), blck = shortLog(blck.message),
blockRoot = shortLog(blck.root) blockRoot = shortLog(blck.root)
@ -364,17 +365,17 @@ proc doTrustedNodeSync*(
syncCount += 1 syncCount += 1
let let
remaining = blck.message.slot.int.float remaining = blck.message.slot.int
slotsPerSec = speed(stamp, newStamp) slotsPerSec = speed(stamp, newStamp)
avgSyncSpeed = avgSyncSpeed + (slotsPerSec - avgSyncSpeed) / float(syncCount) avgSyncSpeed = avgSyncSpeed + (slotsPerSec - avgSyncSpeed) / float(syncCount)
info "Backfilling", info "Backfilling",
timeleft = toTimeLeftString( timeleft = toTimeLeftString(
if avgSyncSpeed >= 0.001: if avgSyncSpeed >= 0.001:
Duration.fromFloatSeconds(remaining / avgSyncSpeed) Duration.fromFloatSeconds(remaining.float / avgSyncSpeed)
else: InfiniteDuration), else: InfiniteDuration),
avgSyncSpeed, slotsPerSecond = avgSyncSpeed,
remaining remainingSlots = remaining
stamp = newStamp stamp = newStamp
# Download blocks backwards from the checkpoint slot, skipping the ones we # Download blocks backwards from the checkpoint slot, skipping the ones we
@ -398,7 +399,7 @@ proc doTrustedNodeSync*(
missingSlots missingSlots
notice "Done, your beacon node is ready to serve you! Don't forget to check that you're on the canoncial chain by comparing the checkpoint root with other online sources. See https://nimbus.guide/trusted-node-sync.html for more information.", notice "Done, your beacon node is ready to serve you! Don't forget to check that you're on the canoncial chain by comparing the checkpoint root with other online sources. See https://nimbus.guide/trusted-node-sync.html for more information.",
checkpointRoot = checkpointBlock.root checkpointRoot = checkpointBlock[].root
when isMainModule: when isMainModule:
import std/[os] import std/[os]