Remove async blob queue from Request Manager (#5198)

* Remove async blob queue from Request Manager

* Review feedback

* Review feedback
This commit is contained in:
henridf 2023-08-01 13:39:14 -07:00 committed by GitHub
parent 32e8bfe911
commit 1da82a5dcd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 86 deletions

View File

@ -1366,48 +1366,7 @@ proc onSlotStart(node: BeaconNode, wallTime: BeaconTime,
return false
proc handleMissingBlobs(node: BeaconNode) =
let
wallTime = node.beaconClock.now()
wallSlot = wallTime.slotOrZero()
delay = wallTime - wallSlot.start_beacon_time()
waitDur = TimeDiff(nanoseconds: BLOB_GOSSIP_WAIT_TIME_NS)
var fetches: seq[BlobFetchRecord]
for blobless in node.quarantine[].peekBlobless():
# give blobs a chance to arrive over gossip
if blobless.message.slot == wallSlot and delay < waitDur:
debug "Not handling missing blobs as early in slot"
continue
if not node.blobQuarantine[].hasBlobs(blobless):
let missing = node.blobQuarantine[].blobFetchRecord(blobless)
if len(missing.indices) == 0:
warn "quarantine missing blobs, but missing indices is empty",
blk=blobless.root,
indices=node.blobQuarantine[].blobIndices(blobless.root),
kzgs=len(blobless.message.body.blob_kzg_commitments)
fetches.add(missing)
else:
# this is a programming error should it occur.
warn "missing blob handler found blobless block with all blobs"
node.blockProcessor[].addBlock(
MsgSource.gossip,
ForkedSignedBeaconBlock.init(blobless),
Opt.some(node.blobQuarantine[].popBlobs(
blobless.root))
)
node.quarantine[].removeBlobless(blobless)
if fetches.len > 0:
debug "Requesting detected missing blobs", blobs = shortLog(fetches)
node.requestManager.fetchMissingBlobs(fetches)
proc onSecond(node: BeaconNode, time: Moment) =
## This procedure will be called once per second.
if not(node.syncManager.inProgress):
node.handleMissingBlobs()
# Nim GC metrics (for the main thread)
updateThreadMetrics()

View File

@ -34,7 +34,7 @@ const
BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
## How long to wait for blobs to arrive over gossip before fetching.
POLL_FREQUENCY = 1.seconds
POLL_INTERVAL = 1.seconds
type
BlockVerifierFn* =
@ -44,7 +44,6 @@ type
RequestManager* = object
network*: Eth2Node
inpBlobQueue: AsyncQueue[BlobIdentifier]
getBeaconTime: GetBeaconTimeFn
inhibit: InhibitFn
quarantine: ref Quarantine
@ -68,8 +67,6 @@ proc init*(T: type RequestManager, network: Eth2Node,
blockVerifier: BlockVerifierFn): RequestManager =
RequestManager(
network: network,
# TODO remove this queue and poll the quarantine directly
inpBlobQueue: newAsyncQueue[BlobIdentifier](),
getBeaconTime: getBeaconTime,
inhibit: inhibit,
quarantine: quarantine,
@ -235,7 +232,7 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
await sleepAsync(POLL_FREQUENCY)
await sleepAsync(POLL_INTERVAL)
if rman.inhibit():
continue
@ -268,46 +265,85 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)
except CancelledError as exc:
except CancelledError:
break
except CatchableError as exc:
warn "Unexpected error in request manager block loop", exc = exc.msg
proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
let
wallTime = rman.getBeaconTime()
wallSlot = wallTime.slotOrZero()
delay = wallTime - wallSlot.start_beacon_time()
waitDur = TimeDiff(nanoseconds: BLOB_GOSSIP_WAIT_TIME_NS)
var fetches: seq[BlobIdentifier]
for blobless in rman.quarantine[].peekBlobless():
# give blobs a chance to arrive over gossip
if blobless.message.slot == wallSlot and delay < waitDur:
debug "Not handling missing blobs early in slot"
continue
if not rman.blobQuarantine[].hasBlobs(blobless):
let missing = rman.blobQuarantine[].blobFetchRecord(blobless)
if len(missing.indices) == 0:
warn "quarantine missing blobs, but missing indices is empty",
blk=blobless.root,
indices=rman.blobQuarantine[].blobIndices(blobless.root),
kzgs=len(blobless.message.body.blob_kzg_commitments)
for idx in missing.indices:
let id = BlobIdentifier(block_root: blobless.root, index: idx)
if id notin fetches:
fetches.add(id)
else:
# this is a programming error should it occur.
warn "missing blob handler found blobless block with all blobs",
blk=blobless.root,
indices=rman.blobQuarantine[].blobIndices(blobless.root),
kzgs=len(blobless.message.body.blob_kzg_commitments)
discard rman.blockVerifier(ForkedSignedBeaconBlock.init(blobless),
false)
rman.quarantine[].removeBlobless(blobless)
fetches
proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
var idList = newSeq[BlobIdentifier]()
var workers = newSeq[Future[void]](PARALLEL_REQUESTS)
while true:
try:
idList.setLen(0)
idList.add(await rman.inpBlobQueue.popFirst())
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
await sleepAsync(POLL_INTERVAL)
if rman.inhibit():
continue
while len(rman.inpBlobQueue) > 0 and
lenu64(idList) < MAX_REQUEST_BLOB_SIDECARS:
let id = rman.inpBlobQueue.popFirstNoWait()
if id notin idList:
idList.add(id)
let fetches = rman.getMissingBlobs()
if fetches.len > 0:
debug "Requesting detected missing blobs", blobs = shortLog(fetches)
try:
let start = SyncMoment.now(0)
var workers: array[PARALLEL_REQUESTS, Future[void]]
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchBlobsFromNetwork(fetches)
let start = SyncMoment.now(0)
await allFutures(workers)
let finish = SyncMoment.now(uint64(len(fetches)))
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchBlobsFromNetwork(idList)
var succeed = 0
for worker in workers:
if worker.finished() and not(worker.failed()):
inc(succeed)
await allFutures(workers)
debug "Request manager blob tick",
blobs_count = len(fetches),
succeed = succeed,
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)
var succeed = 0
for worker in workers:
if worker.finished() and not(worker.failed()):
inc(succeed)
debug "Request manager blob tick", blobs_count = len(idList),
succeed = succeed,
failed = (len(workers) - succeed),
queue_size = len(rman.inpBlobQueue)
except CancelledError as exc:
break
except CatchableError as exc:
warn "Unexpected error in request manager blob loop", exc = exc.msg
except CancelledError as exc:
break
except CatchableError as exc:
warn "Unexpected error in request manager blob loop", exc = exc.msg
proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
@ -321,14 +357,3 @@ proc stop*(rman: RequestManager) =
if not(isNil(rman.blobLoopFuture)):
rman.blobLoopFuture.cancel()
proc fetchMissingBlobs*(rman: RequestManager,
recs: seq[BlobFetchRecord]) =
var idList: seq[BlobIdentifier]
for r in recs:
for idx in r.indices:
idList.add(BlobIdentifier(block_root: r.block_root, index: idx))
for id in idList:
try:
rman.inpBlobQueue.addLastNoWait(id)
except AsyncQueueFullError: raiseAssert "unbounded queue"