RequestManager: add support for fetching Blobs (#4844)

* RequestManager: add support for fetching Blobs

* Review feedback

* Lint

* Change peekSortedBlobless -> peekBlobless
This commit is contained in:
henridf 2023-04-28 14:57:35 +02:00 committed by GitHub
parent 1027b98ea0
commit ef0b95dfbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 207 additions and 31 deletions

View File

@ -19,8 +19,8 @@ import
./networking/eth2_network,
./eth1/eth1_monitor,
./consensus_object_pools/[
blockchain_dag, block_quarantine, consensus_manager, exit_pool,
attestation_pool, sync_committee_msg_pool],
blockchain_dag, blob_quarantine, block_quarantine, consensus_manager,
exit_pool, attestation_pool, sync_committee_msg_pool],
./spec/datatypes/[base, altair],
./spec/eth2_apis/dynamic_fee_recipients,
./sync/[sync_manager, request_manager],
@ -64,6 +64,7 @@ type
lightClient*: LightClient
dag*: ChainDAGRef
quarantine*: ref Quarantine
blobQuarantine*: ref BlobQuarantine
attestationPool*: ref AttestationPool
syncCommitteeMsgPool*: ref SyncCommitteeMsgPool
lightClientPool*: ref LightClientPool

View File

@ -8,7 +8,7 @@
{.push raises: [].}
import
std/tables,
std/[sequtils, strutils, tables],
../spec/datatypes/deneb
@ -19,7 +19,15 @@ const
type
BlobQuarantine* = object
## Container for blob sidecars, looked up by block root and blob index.
blobs*: Table[(Eth2Digest, BlobIndex), ref BlobSidecar]
## Sidecars keyed by a (block root, blob index) pair.
BlobFetchRecord* = object
## A block root together with the blob indices to fetch for that block.
block_root*: Eth2Digest
## Root of the block the indices belong to.
indices*: seq[BlobIndex]
## Blob indices of that block; `blobFetchRecord` fills this with the
## indices that have no entry in a `BlobQuarantine`.
func shortLog*(x: seq[BlobIndex]): string =
  ## Renders the indices, each stringified with `$`, separated by ", " and
  ## enclosed in angle brackets.
  result = "<"
  for pos, blobIdx in x:
    if pos > 0:
      result.add ", "
    result.add $blobIdx
  result.add ">"
func shortLog*(x: seq[BlobFetchRecord]): string =
  ## Formats fetch records for logging: every record becomes its short block
  ## root immediately followed by its short index list; the records are
  ## joined with ", " and wrapped in square brackets.
  var rendered = newSeqOfCap[string](x.len)
  for rec in x:
    rendered.add shortLog(rec.block_root) & shortLog(rec.indices)
  "[" & rendered.join(", ") & "]"
func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) =
if quarantine.blobs.lenu64 > MaxBlobs:
@ -30,7 +38,7 @@ func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) =
func blobIndices*(quarantine: BlobQuarantine, digest: Eth2Digest):
seq[BlobIndex] =
var r: seq[BlobIndex] = @[]
for i in 0..MAX_BLOBS_PER_BLOCK-1:
for i in 0..<MAX_BLOBS_PER_BLOCK:
if quarantine.blobs.hasKey((digest, i)):
r.add(i)
r
@ -41,7 +49,7 @@ func hasBlob*(quarantine: BlobQuarantine, blobSidecar: BlobSidecar) : bool =
func popBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
seq[ref BlobSidecar] =
var r: seq[ref BlobSidecar] = @[]
for i in 0..MAX_BLOBS_PER_BLOCK-1:
for i in 0..<MAX_BLOBS_PER_BLOCK:
var b: ref BlobSidecar
if quarantine.blobs.pop((digest, i), b):
r.add(b)
@ -50,13 +58,13 @@ func popBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
func peekBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
seq[ref BlobSidecar] =
var r: seq[ref BlobSidecar] = @[]
for i in 0..MAX_BLOBS_PER_BLOCK-1:
for i in 0..<MAX_BLOBS_PER_BLOCK:
quarantine.blobs.withValue((digest, i), value):
r.add(value[])
r
func removeBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest) =
for i in 0..MAX_BLOBS_PER_BLOCK-1:
for i in 0..<MAX_BLOBS_PER_BLOCK:
quarantine.blobs.del((digest, i))
func hasBlobs*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock):
@ -68,3 +76,12 @@ func hasBlobs*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock):
if idxs[i] != uint64(i):
return false
true
func blobFetchRecord*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock):
    BlobFetchRecord =
  ## Builds the fetch record for `blck`: one candidate index per KZG
  ## commitment in the block body, keeping only those indices for which
  ## `quarantine` holds no sidecar under `(blck.root, index)`.
  result = BlobFetchRecord(block_root: blck.root)
  let commitmentCount = len(blck.message.body.blob_kzg_commitments)
  for pos in 0 ..< commitmentCount:
    let blobIdx = BlobIndex(pos)
    if (blck.root, blobIdx) notin quarantine.blobs:
      result.indices.add blobIdx

View File

@ -8,7 +8,7 @@
{.push raises: [].}
import
std/[options, tables],
std/[algorithm, options, tables],
stew/bitops2,
../spec/forks
@ -314,3 +314,7 @@ func popBlobless*(quarantine: var Quarantine, root: Eth2Digest):
return some(blck)
else:
return none(deneb.SignedBeaconBlock)
iterator peekBlobless*(quarantine: var Quarantine): deneb.SignedBeaconBlock =
  ## Yields each block stored in `quarantine.blobless`; entries are only
  ## read, nothing is removed here.
  for _, blck in quarantine.blobless.mpairs():
    yield blck

View File

@ -403,6 +403,7 @@ proc initFullNode(
dag.setReorgCb(onChainReorg)
node.dag = dag
node.blobQuarantine = blobQuarantine
node.quarantine = quarantine
node.attestationPool = attestationPool
node.syncCommitteeMsgPool = syncCommitteeMsgPool
@ -414,6 +415,8 @@ proc initFullNode(
node.requestManager = RequestManager.init(node.network,
dag.cfg.DENEB_FORK_EPOCH,
getBeaconTime,
quarantine,
blobQuarantine,
rmanBlockVerifier)
node.syncManager = syncManager
node.backfiller = backfiller
@ -1345,6 +1348,36 @@ proc onSlotStart(node: BeaconNode, wallTime: BeaconTime,
return false
proc handleMissingBlobs(node: BeaconNode) =
  ## Walks the blobless blocks held in the block quarantine and hands the
  ## request manager a fetch record for every block whose blobs are not all
  ## present in the blob quarantine.
  ##
  ## Blocks of the current wall slot are skipped while less than
  ## `BLOB_GOSSIP_WAIT_TIME_NS` has passed since the start of that slot.
  let
    wallTime = node.beaconClock.now()
    wallSlot = wallTime.slotOrZero()
    delay = wallTime - wallSlot.start_beacon_time()
    waitDur = TimeDiff(nanoseconds: BLOB_GOSSIP_WAIT_TIME_NS)

  var fetches: seq[BlobFetchRecord]
  for blobless in node.quarantine[].peekBlobless():

    # give blobs a chance to arrive over gossip
    if blobless.message.slot == wallSlot and delay < waitDur:
      continue

    if not node.blobQuarantine[].hasBlobs(blobless):
      let missing = node.blobQuarantine[].blobFetchRecord(blobless)
      # Unary `not` binds tighter than `==`, so `not len(x) == 0` is the
      # bitwise complement of the length compared with zero - false for
      # every real length. Test the length directly instead.
      doAssert len(missing.indices) > 0
      fetches.add(missing)
    else:
      # this is a programming error should it occur.
      warn "missing blob handler found blobless block with all blobs"
      node.blockProcessor[].addBlock(
        MsgSource.gossip,
        ForkedSignedBeaconBlock.init(blobless),
        node.blobQuarantine[].popBlobs(
          blobless.root)
      )
      # NOTE(review): this removes from the quarantine while `peekBlobless`
      # is still iterating it - confirm the underlying container allows that.
      node.quarantine[].removeBlobless(blobless)

  node.requestManager.fetchMissingBlobs(fetches)
proc handleMissingBlocks(node: BeaconNode) =
let missingBlocks = node.quarantine[].checkMissing()
if missingBlocks.len > 0:
@ -1355,6 +1388,7 @@ proc onSecond(node: BeaconNode, time: Moment) =
## This procedure will be called once per second.
if not(node.syncManager.inProgress):
node.handleMissingBlocks()
node.handleMissingBlobs()
# Nim GC metrics (for the main thread)
updateThreadMetrics()
@ -1364,9 +1398,6 @@ proc onSecond(node: BeaconNode, time: Moment) =
notice "Shutting down after having reached the target synced epoch"
bnStatus = BeaconNodeStatus.Stopping
# TODO
# onSecond timer to handle missing blobs, similar to above for blocks
proc runOnSecondLoop(node: BeaconNode) {.async.} =
const
sleepTime = chronos.seconds(1)

View File

@ -17,6 +17,7 @@
import
chronicles,
std/[sequtils, strutils],
stew/[bitops2, byteutils],
json_serialization,
ssz_serialization/[merkleization, proofs],
@ -566,6 +567,9 @@ func shortLog*(v: ExecutionPayload): auto =
num_withdrawals: len(v.withdrawals)
)
func shortLog*(x: seq[BlobIdentifier]): string =
  ## Formats blob identifiers for logging as `root/index` entries, joined
  ## with ", " and wrapped in square brackets.
  var entries = newSeqOfCap[string](x.len)
  for ident in x:
    entries.add shortLog(ident.block_root) & "/" & $ident.index
  "[" & entries.join(", ") & "]"
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.5/specs/deneb/light-client/sync-protocol.md#modified-get_lc_execution_root
func get_lc_execution_root*(
header: LightClientHeader, cfg: RuntimeConfig): Eth2Digest =

View File

@ -10,10 +10,11 @@
import std/[sequtils, strutils]
import chronos, chronicles
import
../spec/datatypes/[phase0],
../spec/forks,
../spec/datatypes/[phase0, deneb],
../spec/[forks, network],
../networking/eth2_network,
../consensus_object_pools/block_quarantine,
../consensus_object_pools/blob_quarantine,
"."/sync_protocol, "."/sync_manager,
../gossip_processing/block_processor
@ -30,16 +31,23 @@ const
PARALLEL_REQUESTS* = 2
## Number of peers we use to resolve our request.
BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
## How long to wait for blobs to arrive over gossip before fetching.
type
BlockVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
RequestManager* = object
network*: Eth2Node
inpQueue*: AsyncQueue[FetchRecord]
inpBlockQueue*: AsyncQueue[FetchRecord]
inpBlobQueue: AsyncQueue[BlobIdentifier]
getBeaconTime: GetBeaconTimeFn
quarantine: ref Quarantine
blobQuarantine: ref BlobQuarantine
blockVerifier: BlockVerifier
loopFuture: Future[void]
blockLoopFuture: Future[void]
blobLoopFuture: Future[void]
func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]"
@ -50,11 +58,16 @@ func shortLog*(x: seq[FetchRecord]): string =
proc init*(T: type RequestManager, network: Eth2Node,
denebEpoch: Epoch,
getBeaconTime: GetBeaconTimeFn,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
blockVerifier: BlockVerifier): RequestManager =
RequestManager(
network: network,
inpQueue: newAsyncQueue[FetchRecord](),
inpBlockQueue: newAsyncQueue[FetchRecord](),
inpBlobQueue: newAsyncQueue[BlobIdentifier](),
getBeaconTime: getBeaconTime,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
blockVerifier: blockVerifier,
)
@ -70,7 +83,22 @@ proc checkResponse(roots: openArray[Eth2Digest],
return false
else:
checks.del(res)
return true
true
proc checkResponse(idList: seq[BlobIdentifier],
                   blobs: openArray[ref BlobSidecar]): bool =
  ## Validates a blob response against the identifiers that were requested.
  ##
  ## Returns false when the response holds more sidecars than were asked for,
  ## or when any sidecar's `(block_root, index)` matches none of `idList`.
  ## Repeated sidecars are not rejected as long as the count limit holds.
  if blobs.len > idList.len:
    return false

  for sidecar in blobs:
    let requested = idList.anyIt(
      it.block_root == sidecar.block_root and it.index == sidecar.index)
    if not requested:
      return false

  true
proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
items: seq[Eth2Digest]) {.async.} =
@ -144,19 +172,60 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
if not(isNil(peer)):
rman.network.peerPool.release(peer)
proc fetchBlobsFromNetwork(self: RequestManager,
                           idList: seq[BlobIdentifier]) {.async.} =
  ## Requests the sidecars named in `idList` from one peer taken from the
  ## peer pool.
  ##
  ## A response that fails `checkResponse` lowers the peer's score and is
  ## dropped. Otherwise every sidecar is put into the blob quarantine, and for
  ## each run of sidecars sharing a block root the matching blobless block (if
  ## any) is popped from the block quarantine and passed to the block
  ## verifier. A non-Ok response is ignored without further action.
  ##
  ## The peer is released back to the pool on every exit path.
  var peer: Peer

  try:
    peer = await self.network.peerPool.acquire()
    debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList),
                                      peer_score = peer.getScore()

    let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList))

    if blobs.isOk:
      let ublobs = blobs.get()
      if not checkResponse(idList, ublobs.asSeq()):
        peer.updateScore(PeerScoreBadResponse)
        return

      for b in ublobs:
        self.blobQuarantine[].put(b)

      # Only react when the block root changes, so each block is looked up
      # once per consecutive run of its sidecars.
      var curRoot: Eth2Digest
      for b in ublobs:
        if b.block_root != curRoot:
          curRoot = b.block_root
          if (let o = self.quarantine[].popBlobless(curRoot); o.isSome):
            let blck = o.unsafeGet()
            discard await self.blockVerifier(
              ForkedSignedBeaconBlock.init(blck), false)
            # TODO:
            # If appropriate, return a VerifierError.InvalidBlob from verification,
            # check for it here, and penalize the peer accordingly.

  except CancelledError as exc:
    raise exc
  except CatchableError as exc:
    # `peer` is still nil when `acquire()` itself failed; scoring or logging
    # it would dereference nil, so guard exactly like the `finally` branch.
    if not(isNil(peer)):
      peer.updateScore(PeerScoreNoValues)
      debug "Error while fetching blobs", exc = exc.msg,
            idList = shortLog(idList), peer = peer, peer_score = peer.getScore()
    else:
      debug "Error while fetching blobs", exc = exc.msg,
            idList = shortLog(idList)
    raise exc
  finally:
    if not(isNil(peer)):
      self.network.peerPool.release(peer)
proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
var rootList = newSeq[Eth2Digest]()
var workers = newSeq[Future[void]](PARALLEL_REQUESTS)
while true:
try:
rootList.setLen(0)
let req = await rman.inpQueue.popFirst()
let req = await rman.inpBlockQueue.popFirst()
rootList.add(req.root)
var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.inpQueue))
var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.inpBlockQueue))
while count > 0:
rootList.add(rman.inpQueue.popFirstNoWait().root)
rootList.add(rman.inpBlockQueue.popFirstNoWait().root)
dec(count)
let start = SyncMoment.now(0)
@ -174,28 +243,78 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
if worker.finished() and not(worker.failed()):
inc(succeed)
debug "Request manager tick", blocks_count = len(rootList),
succeed = succeed,
failed = (len(workers) - succeed),
queue_size = len(rman.inpQueue),
sync_speed = speed(start, finish)
debug "Request manager block tick", blocks_count = len(rootList),
succeed = succeed,
failed = (len(workers) - succeed),
queue_size = len(rman.inpBlockQueue),
sync_speed = speed(start, finish)
except CatchableError as exc:
debug "Got a problem in request manager", exc = exc.msg
proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
  ## Endless worker loop for blob requests.
  ##
  ## Waits for one identifier on the blob input queue, drains further queued
  ## identifiers up to `MAX_REQUEST_BLOB_SIDECARS` in total, then starts
  ## `PARALLEL_REQUESTS` concurrent `fetchBlobsFromNetwork` calls with that
  ## same list and waits for all of them before logging a summary.
  ##
  ## Cancellation is re-raised so that cancelling the loop future ends the
  ## loop; any other catchable error is logged and the loop continues.
  var idList = newSeq[BlobIdentifier]()
  var workers = newSeq[Future[void]](PARALLEL_REQUESTS)
  while true:
    try:
      idList.setLen(0)
      let id = await rman.inpBlobQueue.popFirst()
      idList.add(id)

      # One identifier is already in the batch, hence the `- 1`.
      var count = min(MAX_REQUEST_BLOB_SIDECARS - 1, lenu64(rman.inpBlobQueue))
      while count > 0:
        idList.add(rman.inpBlobQueue.popFirstNoWait())
        dec(count)

      for i in 0 ..< PARALLEL_REQUESTS:
        workers[i] = rman.fetchBlobsFromNetwork(idList)

      await allFutures(workers)

      var succeed = 0
      for worker in workers:
        if worker.finished() and not(worker.failed()):
          inc(succeed)

      debug "Request manager blob tick", blobs_count = len(idList),
                                         succeed = succeed,
                                         failed = (len(workers) - succeed),
                                         queue_size = len(rman.inpBlobQueue)

    except CancelledError as exc:
      # Without this branch the generic handler below would swallow the
      # cancellation and the loop could never be stopped.
      raise exc
    except CatchableError as exc:
      debug "Got a problem in request manager", exc = exc.msg
proc start*(rman: var RequestManager) =
## Start Request Manager's loop.
rman.loopFuture = rman.requestManagerLoop()
## Start Request Manager's loops.
rman.blockLoopFuture = rman.requestManagerBlockLoop()
rman.blobLoopFuture = rman.requestManagerBlobLoop()
proc stop*(rman: RequestManager) =
## Stop Request Manager's loop.
if not(isNil(rman.loopFuture)):
rman.loopFuture.cancel()
if not(isNil(rman.blockLoopFuture)):
rman.blockLoopFuture.cancel()
if not(isNil(rman.blobLoopFuture)):
rman.blobLoopFuture.cancel()
proc fetchAncestorBlocks*(rman: RequestManager, roots: seq[FetchRecord]) =
## Enqueue the list of missing block roots ``roots`` for download by
## Request Manager ``rman``.
for item in roots:
try:
rman.inpQueue.addLastNoWait(item)
rman.inpBlockQueue.addLastNoWait(item)
except AsyncQueueFullError: raiseAssert "unbounded queue"
proc fetchMissingBlobs*(rman: RequestManager,
                        recs: seq[BlobFetchRecord]) =
  ## Queues one `BlobIdentifier` per (block root, index) pair contained in
  ## `recs` on the request manager's blob input queue, in record order.
  for rec in recs:
    for blobIdx in rec.indices:
      let ident = BlobIdentifier(block_root: rec.block_root, index: blobIdx)
      try:
        rman.inpBlobQueue.addLastNoWait(ident)
      except AsyncQueueFullError:
        raiseAssert "unbounded queue"