nimbus-eth2/beacon_chain/sync/request_manager.nim
Jacek Sieka c7abc97545
harden and speed up block sync (#3358)
* harden and speed up block sync

The `GetBlockBy*` server implementation currently reads SSZ bytes from the
database, deserializes them into a Nim object, then serializes them right
back to SSZ - here, we eliminate the deserialization/serialization round
trip and send the bytes straight to the network. Unfortunately, the snappy
recoding must still be done because of differences in framing.

Also, the quota system makes one giant request for quota right before
sending all blocks - this means that a 1024-block request will be
"paused" for a long time, then all blocks will be sent at once, causing a
spike in database reads that can make the requesting client time out
before any block is sent (a sketch of the chunked alternative follows
below).

Finally, on the reading side we make several copies of blocks as they
travel through various queues - this was not noticeable before, but it
becomes a problem in two cases: bellatrix blocks are up to 10mb (instead
of 30-40kb), and when backfilling, we process a lot more of them a lot
faster.

* fix status comparisons for nodes syncing from genesis (#3327 was a bit
too hard)
* don't hit the database at all for post-altair slots in GetBlock v1
requests
2022-02-07 19:20:10 +02:00
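
Below is a minimal sketch of the chunked-quota idea described in the commit
message. All names here (`sendBlocksChunked`, `payQuota`, `send`, the chunk
size of 16) are invented for illustration and are not the actual nimbus-eth2
API; the point is only that quota is paid per small chunk, so the first
blocks go out while quota for later chunks is still accumulating:

    import chronos

    proc sendBlocksChunked(
        blockBytes: seq[seq[byte]],
        payQuota: proc(cost: int): Future[void] {.gcsafe.},
        send: proc(bytes: seq[byte]): Future[void] {.gcsafe.}) {.async.} =
      ## Hypothetical server-side send loop: pay for and send a small chunk
      ## of pre-encoded blocks at a time instead of requesting quota for the
      ## whole response up front.
      const chunkSize = 16
      var i = 0
      while i < blockBytes.len:
        let hi = min(i + chunkSize, blockBytes.len)
        await payQuota(hi - i)        # small per-chunk quota request
        for j in i ..< hi:
          await send(blockBytes[j])   # client sees data flowing early
        i = hi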


# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.

{.push raises: [Defect].}

import std/[sequtils, strutils]
import chronos, chronicles
import
  ../spec/datatypes/[phase0],
  ../spec/forks,
  ../networking/eth2_network,
  ../consensus_object_pools/block_quarantine,
  "."/sync_protocol, "."/sync_manager

export sync_manager
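
# The request manager tracks block roots that we have seen referenced on the
# network (e.g. as missing parents) but whose blocks we do not yet have, and
# requests them from peers in the pool via `beaconBlocksByRoot`.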
logScope:
  topics = "requman"
const
  SYNC_MAX_REQUESTED_BLOCKS* = 32 # Spec allows up to MAX_REQUEST_BLOCKS.
    ## Maximum number of blocks which will be requested in each
    ## `beaconBlocksByRoot` invocation.
  PARALLEL_REQUESTS* = 2
    ## Number of peers we use to resolve our requests.
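
# Note: `requestManagerLoop` hands the same root list to each of the
# PARALLEL_REQUESTS workers, so the same block may arrive more than once;
# `BlockError.Duplicate` is therefore expected and ignored during verification.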
type
  BlockVerifier* =
    proc(signedBlock: ForkedSignedBeaconBlock):
      Future[Result[void, BlockError]] {.gcsafe, raises: [Defect].}

  RequestManager* = object
    network*: Eth2Node
    inpQueue*: AsyncQueue[FetchRecord]
    blockVerifier: BlockVerifier
    loopFuture: Future[void]
func shortLog*(x: seq[Eth2Digest]): string =
  "[" & x.mapIt(shortLog(it)).join(", ") & "]"

func shortLog*(x: seq[FetchRecord]): string =
  "[" & x.mapIt(shortLog(it.root)).join(", ") & "]"
proc init*(T: type RequestManager, network: Eth2Node,
           blockVerifier: BlockVerifier): RequestManager =
  RequestManager(
    network: network,
    inpQueue: newAsyncQueue[FetchRecord](),
    blockVerifier: blockVerifier
  )
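
# Usage sketch - `node` stands in for a fully initialized `Eth2Node` and
# `verifier` for a callback that hands blocks to the block processor:
#
#   var rman = RequestManager.init(node, verifier)
#   rman.start()
#   # ... later, when missing block roots are discovered:
#   rman.fetchAncestorBlocks(@[FetchRecord(root: missingRoot)])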
proc checkResponse(roots: openArray[Eth2Digest],
                   blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
  ## Check that the peer's response contains only blocks that we requested,
  ## and that each requested root is answered at most once.
  var checks = @roots
  if len(blocks) > len(roots):
    return false
  for blk in blocks:
    let res = checks.find(blk[].root)
    if res == -1:
      return false
    else:
      checks.del(res)
  return true
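
# Example: for requested roots [A, B] and response [A, A], the first A consumes
# the matching entry in `checks`, so the second A is not found and the whole
# response is rejected - each requested root may be answered at most once.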
proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
                                    items: seq[Eth2Digest]) {.async.} =
  var peer: Peer
  try:
    peer = await rman.network.peerPool.acquire()
    debug "Requesting blocks by root", peer = peer, blocks = shortLog(items),
          peer_score = peer.getScore()

    let blocks = if peer.useSyncV2():
      await beaconBlocksByRoot_v2(peer, BlockRootsList items)
    else:
      (await beaconBlocksByRoot(peer, BlockRootsList items)).map(
        proc(blcks: seq[phase0.SignedBeaconBlock]): auto =
          blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it))))

    if blocks.isOk:
      let ublocks = blocks.get()
      if checkResponse(items, ublocks):
        var
          gotGoodBlock = false
          gotUnviableBlock = false

        for b in ublocks:
          let ver = await rman.blockVerifier(b[])
          if ver.isErr():
            case ver.error()
            of BlockError.MissingParent:
              # Ignoring because the order of the blocks that
              # we requested may be different from the order in which we need
              # these blocks to apply.
              discard
            of BlockError.Duplicate:
              # Ignoring because these errors could occur due to the
              # concurrent/parallel requests we made.
              discard
            of BlockError.UnviableFork:
              # If they're working a different fork, we'll want to descore them
              # but also process the other blocks (in case we can register the
              # other blocks as unviable)
              gotUnviableBlock = true
            of BlockError.Invalid:
              # We stop processing blocks because peer is either sending us
              # junk or working a different fork
              warn "Received invalid block",
                peer = peer, blocks = shortLog(items),
                peer_score = peer.getScore()
              peer.updateScore(PeerScoreBadBlocks)

              return # Stop processing this junk...
          else:
            gotGoodBlock = true

        if gotUnviableBlock:
          notice "Received blocks from an unviable fork",
            peer = peer, blocks = shortLog(items),
            peer_score = peer.getScore()
          peer.updateScore(PeerScoreUnviableFork)
        elif gotGoodBlock:
          # We reward peer only if it returns something.
          peer.updateScore(PeerScoreGoodBlocks)
      else:
        peer.updateScore(PeerScoreBadResponse)
    else:
      peer.updateScore(PeerScoreNoBlocks)
  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    peer.updateScore(PeerScoreNoBlocks)
    debug "Error while fetching ancestor blocks", exc = exc.msg,
          items = shortLog(items), peer = peer, peer_score = peer.getScore()
    raise exc
  finally:
    if not(isNil(peer)):
      rman.network.peerPool.release(peer)
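
# Scoring recap for `fetchAncestorBlocksFromNetwork`: good blocks reward the
# peer; an unviable fork, a malformed response, an empty response or a network
# error all count against it; a single invalid block both descores the peer
# and aborts processing of the remaining blocks.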
proc requestManagerLoop(rman: RequestManager) {.async.} =
  var rootList = newSeq[Eth2Digest]()
  var workers = newSeq[Future[void]](PARALLEL_REQUESTS)

  while true:
    try:
      rootList.setLen(0)
      let req = await rman.inpQueue.popFirst()
      rootList.add(req.root)

      var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.inpQueue))
      while count > 0:
        rootList.add(rman.inpQueue.popFirstNoWait().root)
        dec(count)

      let start = SyncMoment.now(0)

      for i in 0 ..< PARALLEL_REQUESTS:
        workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList)

      # We do not care about individual worker failures here - they are
      # already logged and scored inside the workers, and only counted
      # below for the debug message.
      await allFutures(workers)

      let finish = SyncMoment.now(uint64(len(rootList)))

      var succeed = 0
      for worker in workers:
        if worker.finished() and not(worker.failed()):
          inc(succeed)

      debug "Request manager tick", blocks_count = len(rootList),
            succeed = succeed,
            failed = (len(workers) - succeed),
            queue_size = len(rman.inpQueue),
            sync_speed = speed(start, finish)

    except CatchableError as exc:
      debug "Got a problem in request manager", exc = exc.msg
proc start*(rman: var RequestManager) =
  ## Start Request Manager's loop.
  rman.loopFuture = rman.requestManagerLoop()

proc stop*(rman: RequestManager) =
  ## Stop Request Manager's loop.
  if not(isNil(rman.loopFuture)):
    rman.loopFuture.cancel()
proc fetchAncestorBlocks*(rman: RequestManager, roots: seq[FetchRecord]) =
  ## Enqueue the missing block roots in ``roots`` for download by
  ## Request Manager ``rman``.
  for item in roots:
    try:
      rman.inpQueue.addLastNoWait(item)
    except AsyncQueueFullError: raiseAssert "unbounded queue"