Wire up blob fetching with request manager (#4527)

* Wire up blob fetching and request manager

* Update beacon_chain/sync/request_manager.nim

Co-authored-by: tersec <tersec@users.noreply.github.com>

* Review feedback: remove redundant RequestManager field.

* Fix syntax

* Improve blob fetching logic

* fix previous commit

* rman.isBlobsTime(): Remove spurious parameter

* Review feedback

* expressionify an if

---------

Co-authored-by: tersec <tersec@users.noreply.github.com>
henridf 2023-02-01 00:25:08 +01:00 committed by GitHub
parent cdf69b9360
commit d7316ac863
3 changed files with 139 additions and 6 deletions

beacon_chain/networking/eth2_network.nim

@@ -204,8 +204,8 @@ type
   Eth2NetworkingError = object
     case kind*: Eth2NetworkingErrorKind
     of ReceivedErrorResponse:
-      responseCode: ResponseCode
-      errorMsg: string
+      responseCode*: ResponseCode
+      errorMsg*: string
     else:
       discard
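
These export markers matter beyond this module: the request manager (third file below) inspects err.responseCode on a received error response, which is only possible once the fields in the ReceivedErrorResponse branch are public. As a rough illustration of the pattern, a hypothetical sketch (not nimbus-eth2 code):

type
  ToyErrKind = enum NoError, GotErrorResponse
  ToyNetError = object
    case kind*: ToyErrKind          # discriminator is exported
    of GotErrorResponse:
      responseCode*: int            # exported: callers in other modules can branch on it
      errorMsg*: string
    else:
      discard

func describe(e: ToyNetError): string =
  # Branch fields exist only while kind has the matching value; reading
  # them from the wrong branch raises a FieldDefect at runtime.
  case e.kind
  of GotErrorResponse: "error " & $e.responseCode & ": " & e.errorMsg
  of NoError: "ok"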

beacon_chain/nimbus_beacon_node.nim

@@ -331,6 +331,16 @@ proc initFullNode(
         blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
                                   Opt.none(eip4844.BlobsSidecar), resfut)
         resfut
+    blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
+        Future[Result[void, VerifierError]] =
+      # The design with a callback for block verification is unusual compared
+      # to the rest of the application, but fits with the general approach
+      # taken in the sync/request managers - this is an architectural compromise
+      # that should probably be reimagined more holistically in the future.
+      let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
+      blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
+                                Opt.some(blobs), resfut)
+      resfut
     processor = Eth2Processor.new(
       config.doppelgangerDetection,
       blockProcessor, node.validatorMonitor, dag, attestationPool,
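
The callback shape described in the comment above (allocate a Future, hand it to the asynchronous block processor, and return it so sync code can await the verdict) can be sketched in isolation roughly as follows. This is a hedged sketch: toyAddBlock stands in for blockProcessor[].addBlock and is not the real API.

import chronos, stew/results

type ToyVerifierError = enum
  MissingParent, Duplicate, UnviableFork, Invalid

# Stand-in for the block processor: "verifies" asynchronously, then
# completes the future the caller handed in.
proc toyAddBlock(resfut: Future[Result[void, ToyVerifierError]]) {.async.} =
  await sleepAsync(10.milliseconds)
  resfut.complete(Result[void, ToyVerifierError].ok())

# Same shape as blockBlobsVerifier: create the future, enqueue the work,
# return the future immediately so the caller can await it.
proc toyVerify(): Future[Result[void, ToyVerifierError]] =
  let resfut = newFuture[Result[void, ToyVerifierError]]("toyVerify")
  asyncSpawn toyAddBlock(resfut)
  resfut

when isMainModule:
  echo waitFor(toyVerify()).isOk()   # true once toyAddBlock completes it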
@@ -377,7 +387,8 @@ proc initFullNode(
   node.processor = processor
   node.blockProcessor = blockProcessor
   node.consensusManager = consensusManager
-  node.requestManager = RequestManager.init(node.network, blockVerifier)
+  node.requestManager = RequestManager.init(node.network, dag.cfg, getBeaconTime,
+                                            blockVerifier, blockBlobsVerifier)
   node.syncManager = syncManager
   node.backfiller = backfiller
   node.router = router

beacon_chain/sync/request_manager.nim

@@ -15,6 +15,7 @@ import
   ../networking/eth2_network,
   ../consensus_object_pools/block_quarantine,
   "."/sync_protocol, "."/sync_manager
+from ../beacon_clock import GetBeaconTimeFn
 export block_quarantine, sync_manager

 logScope:
@@ -31,11 +32,17 @@
   BlockVerifier* =
     proc(signedBlock: ForkedSignedBeaconBlock):
       Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
+  BlockBlobsVerifier* =
+    proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
+      Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}

   RequestManager* = object
     network*: Eth2Node
     inpQueue*: AsyncQueue[FetchRecord]
+    cfg: RuntimeConfig
+    getBeaconTime: GetBeaconTimeFn
     blockVerifier: BlockVerifier
+    blockBlobsVerifier: BlockBlobsVerifier
     loopFuture: Future[void]

 func shortLog*(x: seq[Eth2Digest]): string =
@@ -45,13 +52,33 @@ func shortLog*(x: seq[FetchRecord]): string =
   "[" & x.mapIt(shortLog(it.root)).join(", ") & "]"

 proc init*(T: type RequestManager, network: Eth2Node,
-           blockVerifier: BlockVerifier): RequestManager =
+           cfg: RuntimeConfig,
+           getBeaconTime: GetBeaconTimeFn,
+           blockVerifier: BlockVerifier,
+           blockBlobsVerifier: BlockBlobsVerifier): RequestManager =
   RequestManager(
     network: network,
     inpQueue: newAsyncQueue[FetchRecord](),
-    blockVerifier: blockVerifier
+    cfg: cfg,
+    getBeaconTime: getBeaconTime,
+    blockVerifier: blockVerifier,
+    blockBlobsVerifier: blockBlobsVerifier,
   )

+proc checkResponse(roots: openArray[Eth2Digest],
+                   blocks: openArray[ref SignedBeaconBlockAndBlobsSidecar]): bool =
+  ## This procedure checks peer's response.
+  var checks = @roots
+  if len(blocks) > len(roots):
+    return false
+  for item in blocks:
+    let res = checks.find(item[].beacon_block.root)
+    if res == -1:
+      return false
+    else:
+      checks.del(res)
+  true
+
 proc checkResponse(roots: openArray[Eth2Digest],
                    blocks: openArray[ref ForkedSignedBeaconBlock]): bool =
   ## This procedure checks peer's response.
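
The new checkResponse overload above is a multiset check: every returned block-and-blobs pair must consume one not-yet-matched requested root, so a response may be a subset of the request but never a superset, and a duplicate is accepted only if the same root was requested twice. A self-contained toy version of the same algorithm, with strings standing in for Eth2Digest roots (hypothetical, for illustration):

proc toyCheckResponse(roots: openArray[string],
                      returned: openArray[string]): bool =
  if returned.len > roots.len:
    return false                 # more items than requested: never valid
  var checks = @roots            # roots not yet matched by a returned item
  for item in returned:
    var idx = -1
    for i, root in checks:
      if root == item:
        idx = i
        break
    if idx == -1:
      return false               # item we did not ask for, or asked for fewer times
    checks.del(idx)              # consume the match: a duplicate needs two requests
  true

when isMainModule:
  assert toyCheckResponse(["a", "b", "c"], ["b", "a"])   # subset, any order
  assert not toyCheckResponse(["a", "b"], ["a", "a"])    # unrequested duplicate
  assert not toyCheckResponse(["a"], ["a", "b"])         # superset rejected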
@@ -138,6 +165,92 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
     if not(isNil(peer)):
       rman.network.peerPool.release(peer)

+proc fetchAncestorBlocksAndBlobsFromNetwork(rman: RequestManager,
+                                            items: seq[Eth2Digest]) {.async.} =
+  var peer: Peer
+  try:
+    peer = await rman.network.peerPool.acquire()
+    debug "Requesting blocks by root", peer = peer, blocks = shortLog(items),
+                                       peer_score = peer.getScore()
+
+    let blocks = (await beaconBlockAndBlobsSidecarByRoot_v1(peer, BlockRootsList items))
+
+    if blocks.isOk:
+      let ublocks = blocks.get()
+      if checkResponse(items, ublocks.asSeq()):
+        var
+          gotGoodBlock = false
+          gotUnviableBlock = false
+
+        for b in ublocks:
+          let ver = await rman.blockBlobsVerifier(ForkedSignedBeaconBlock.init(b[].beacon_block), b[].blobs_sidecar)
+          if ver.isErr():
+            case ver.error()
+            of VerifierError.MissingParent:
+              # Ignoring because the order of the blocks that
+              # we requested may be different from the order in which we need
+              # these blocks to apply.
+              discard
+            of VerifierError.Duplicate:
+              # Ignoring because these errors could occur due to the
+              # concurrent/parallel requests we made.
+              discard
+            of VerifierError.UnviableFork:
+              # If they're working a different fork, we'll want to descore them
+              # but also process the other blocks (in case we can register the
+              # other blocks as unviable)
+              gotUnviableBlock = true
+            of VerifierError.Invalid:
+              # We stop processing blocks because peer is either sending us
+              # junk or working a different fork
+              notice "Received invalid block",
+                peer = peer, blocks = shortLog(items),
+                peer_score = peer.getScore()
+              peer.updateScore(PeerScoreBadValues)
+              return # Stop processing this junk...
+          else:
+            gotGoodBlock = true
+
+        if gotUnviableBlock:
+          notice "Received blocks from an unviable fork",
+            peer = peer, blocks = shortLog(items),
+            peer_score = peer.getScore()
+          peer.updateScore(PeerScoreUnviableFork)
+        elif gotGoodBlock:
+          # We reward peer only if it returns something.
+          peer.updateScore(PeerScoreGoodValues)
+      else:
+        let err = blocks.error()
+        case err.kind
+        of ReceivedErrorResponse:
+          if err.responseCode == ResourceUnavailable:
+            if not(isNil(peer)):
+              rman.network.peerPool.release(peer)
+            await rman.fetchAncestorBlocksFromNetwork(items)
+            return
+          else:
+            peer.updateScore(PeerScoreBadResponse)
+        else:
+          discard
+    else:
+      peer.updateScore(PeerScoreNoValues)
+  except CancelledError as exc:
+    raise exc
+  except CatchableError as exc:
+    peer.updateScore(PeerScoreNoValues)
+    debug "Error while fetching ancestor blocks", exc = exc.msg,
+          items = shortLog(items), peer = peer, peer_score = peer.getScore()
+    raise exc
+  finally:
+    if not(isNil(peer)):
+      rman.network.peerPool.release(peer)
+
+proc isBlobsTime(rman: RequestManager): bool =
+  rman.getBeaconTime().slotOrZero.epoch >= rman.cfg.EIP4844_FORK_EPOCH
+
 proc requestManagerLoop(rman: RequestManager) {.async.} =
   var rootList = newSeq[Eth2Digest]()
   var workers = newSeq[Future[void]](PARALLEL_REQUESTS)
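
isBlobsTime gates on the wall clock, not on any property of the requested block: once the current epoch reaches EIP4844_FORK_EPOCH, the loop below asks for blobs. A hypothetical standalone sketch of the same gate, with plain uint64 standing in for Slot/Epoch and the usual 32 slots per epoch:

const SLOTS_PER_EPOCH = 32'u64

func epochOf(slot: uint64): uint64 =
  slot div SLOTS_PER_EPOCH

func toyIsBlobsTime(wallSlot, forkEpoch: uint64): bool =
  # Same comparison as rman.isBlobsTime: current wall-clock epoch vs fork epoch.
  epochOf(wallSlot) >= forkEpoch

when isMainModule:
  # Fork at epoch 2 (first slot 64): slot 63 still fetches blocks only,
  # slot 64 switches to blocks-and-blobs requests.
  assert not toyIsBlobsTime(63, 2)
  assert toyIsBlobsTime(64, 2)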
@@ -154,8 +267,17 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
       let start = SyncMoment.now(0)

+      # As soon as EIP4844_FORK_EPOCH comes around in wall time, we
+      # switch to requesting blocks and blobs. In the vicinity of the
+      # transition, that means that we *may* request blobs for a
+      # pre-eip4844 block. In that case, we get ResourceUnavailable from the
+      # peer and fall back to requesting blocks only.
+      let getBlobs = rman.isBlobsTime()
       for i in 0 ..< PARALLEL_REQUESTS:
-        workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList)
+        workers[i] = if getBlobs:
+          rman.fetchAncestorBlocksAndBlobsFromNetwork(rootList)
+        else:
+          rman.fetchAncestorBlocksFromNetwork(rootList)

       # We do not care about the results of the individual workers here.
       await allFutures(workers)
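
The "expressionify an if" bullet in the commit message refers to the assignment above: in Nim, if/else is an expression when both branches yield the same type, so the chosen Future can be assigned to workers[i] directly instead of assigning inside each branch. A tiny hypothetical illustration:

proc fetchFast(): string = "fast path"
proc fetchSlow(): string = "slow path"

when isMainModule:
  let useFast = false
  # Both branches yield a string, so the if/else itself is the value.
  let choice = if useFast:
    fetchFast()
  else:
    fetchSlow()
  echo choice   # "slow path"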