Fetch by-root request directly from quarantine (#5167)

When the request manager is busy fetching blocks, the queue might get
filled with multiple entries of the same root - since there is no
deduplication, requests containing the same root multiple times will be
sent out.

Also, because items can potentially sit in the queue for a long time,
a request might be stale by the time the manager is done with the
previous request.

This PR removes the queue and fetches the blocks to download directly
from the quarantine, which solves both problems (the quarantine already
de-duplicates and is clean of stale information).

Removing the queue for blobs is left for a future PR.

Co-authored-by: tersec <tersec@users.noreply.github.com>
This commit is contained in:
Jacek Sieka 2023-07-11 18:22:02 +02:00 committed by GitHub
parent 174c33e5fa
commit ca1775f725
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 73 additions and 57 deletions

View File

@ -77,7 +77,7 @@ type
func init*(T: type Quarantine): T =
T()
func checkMissing*(quarantine: var Quarantine): seq[FetchRecord] =
func checkMissing*(quarantine: var Quarantine, max: int): seq[FetchRecord] =
## Return a list of blocks that we should try to resolve from other client -
## to be called periodically but not too often (once per slot?)
var done: seq[Eth2Digest]
@ -85,16 +85,17 @@ func checkMissing*(quarantine: var Quarantine): seq[FetchRecord] =
for k, v in quarantine.missing.mpairs():
if v.tries > 8:
done.add(k)
else:
inc v.tries
for k in done:
quarantine.missing.del(k)
# simple (simplistic?) exponential backoff for retries..
for k, v in quarantine.missing:
for k, v in quarantine.missing.mpairs:
v.tries += 1
if countOnes(v.tries.uint64) == 1:
result.add(FetchRecord(root: k))
if result.len >= max:
break
# TODO stew/sequtils2
template anyIt(s, pred: untyped): bool =

View File

@ -369,6 +369,8 @@ proc initFullNode(
resfut,
maybeFinalized = maybeFinalized)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor, node.validatorMonitor, dag, attestationPool,
@ -386,6 +388,10 @@ proc initFullNode(
router = (ref MessageRouter)(
processor: processor,
network: node.network)
requestManager = RequestManager.init(
node.network, dag.cfg.DENEB_FORK_EPOCH, getBeaconTime,
(proc(): bool = syncManager.inProgress),
quarantine, blobQuarantine, rmanBlockVerifier)
if node.config.lightClientDataServe:
proc scheduleSendingLightClientUpdates(slot: Slot) =
@ -417,12 +423,7 @@ proc initFullNode(
node.processor = processor
node.blockProcessor = blockProcessor
node.consensusManager = consensusManager
node.requestManager = RequestManager.init(node.network,
dag.cfg.DENEB_FORK_EPOCH,
getBeaconTime,
quarantine,
blobQuarantine,
rmanBlockVerifier)
node.requestManager = requestManager
node.syncManager = syncManager
node.backfiller = backfiller
node.router = router
@ -1393,16 +1394,9 @@ proc handleMissingBlobs(node: BeaconNode) =
debug "Requesting detected missing blobs", blobs = shortLog(fetches)
node.requestManager.fetchMissingBlobs(fetches)
proc handleMissingBlocks(node: BeaconNode) =
let missingBlocks = node.quarantine[].checkMissing()
if missingBlocks.len > 0:
debug "Requesting detected missing blocks", blocks = shortLog(missingBlocks)
node.requestManager.fetchAncestorBlocks(missingBlocks)
proc onSecond(node: BeaconNode, time: Moment) =
## This procedure will be called once per second.
if not(node.syncManager.inProgress):
node.handleMissingBlocks()
node.handleMissingBlobs()
# Nim GC metrics (for the main thread)

View File

@ -34,18 +34,23 @@ const
BLOB_GOSSIP_WAIT_TIME_NS* = 2 * 1_000_000_000
## How long to wait for blobs to arrive over gossip before fetching.
POLL_FREQUENCY = 1.seconds
type
BlockVerifier* =
BlockVerifierFn* =
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
Future[Result[void, VerifierError]] {.gcsafe, raises: [].}
InhibitFn* = proc: bool {.gcsafe, raises:[].}
RequestManager* = object
network*: Eth2Node
inpBlockQueue*: AsyncQueue[FetchRecord]
inpBlobQueue: AsyncQueue[BlobIdentifier]
getBeaconTime: GetBeaconTimeFn
inhibit: InhibitFn
quarantine: ref Quarantine
blobQuarantine: ref BlobQuarantine
blockVerifier: BlockVerifier
blockVerifier: BlockVerifierFn
blockLoopFuture: Future[void]
blobLoopFuture: Future[void]
@ -58,14 +63,16 @@ func shortLog*(x: seq[FetchRecord]): string =
proc init*(T: type RequestManager, network: Eth2Node,
denebEpoch: Epoch,
getBeaconTime: GetBeaconTimeFn,
inhibit: InhibitFn,
quarantine: ref Quarantine,
blobQuarantine: ref BlobQuarantine,
blockVerifier: BlockVerifier): RequestManager =
blockVerifier: BlockVerifierFn): RequestManager =
RequestManager(
network: network,
inpBlockQueue: newAsyncQueue[FetchRecord](),
# TODO remove this queue and poll the quarantine directly
inpBlobQueue: newAsyncQueue[BlobIdentifier](),
getBeaconTime: getBeaconTime,
inhibit: inhibit,
quarantine: quarantine,
blobQuarantine: blobQuarantine,
blockVerifier: blockVerifier,
@ -100,8 +107,7 @@ proc checkResponse(idList: seq[BlobIdentifier],
return false
true
proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
items: seq[Eth2Digest]) {.async.} =
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} =
var peer: Peer
try:
peer = await rman.network.peerPool.acquire()
@ -154,21 +160,25 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
peer.updateScore(PeerScoreUnviableFork)
elif gotGoodBlock:
debug "Request manager got good block",
peer = peer, blocks = shortLog(items)
peer = peer, blocks = shortLog(items), ublocks = len(ublocks)
# We reward peer only if it returns something.
peer.updateScore(PeerScoreGoodValues)
else:
debug "Mismatching response to blocks by root",
peer = peer, blocks = shortLog(items), ublocks = len(ublocks)
peer.updateScore(PeerScoreBadResponse)
else:
debug "Blocks by root request failed",
peer = peer, blocks = shortLog(items), err = blocks.error()
peer.updateScore(PeerScoreNoValues)
except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching ancestor blocks", exc = exc.msg,
debug "Error while fetching blocks by root", exc = exc.msg,
items = shortLog(items), peer = peer, peer_score = peer.getScore()
raise exc
finally:
@ -189,6 +199,8 @@ proc fetchBlobsFromNetwork(self: RequestManager,
if blobs.isOk:
let ublobs = blobs.get()
if not checkResponse(idList, ublobs.asSeq()):
debug "Mismatched response to blobs by root",
peer = peer, blobs = shortLog(idList), ublobs = len(ublobs)
peer.updateScore(PeerScoreBadResponse)
return
@ -204,56 +216,64 @@ proc fetchBlobsFromNetwork(self: RequestManager,
# TODO:
# If appropriate, return a VerifierError.InvalidBlob from verification,
# check for it here, and penalize the peer accordingly.
else:
debug "Blobs by root request failed",
peer = peer, blobs = shortLog(idList), err = blobs.error()
peer.updateScore(PeerScoreNoValues)
except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching blobs", exc = exc.msg,
debug "Error while fetching blobs by root", exc = exc.msg,
idList = shortLog(idList), peer = peer, peer_score = peer.getScore()
raise exc
finally:
if not(isNil(peer)):
self.network.peerPool.release(peer)
proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
var rootList = newSeq[Eth2Digest]()
var workers = newSeq[Future[void]](PARALLEL_REQUESTS)
while true:
# TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do
await sleepAsync(POLL_FREQUENCY)
if rman.inhibit():
continue
let blocks = mapIt(rman.quarantine[].checkMissing(
SYNC_MAX_REQUESTED_BLOCKS), it.root)
if blocks.len == 0:
continue
debug "Requesting detected missing blocks", blocks = shortLog(blocks)
try:
rootList.setLen(0)
let req = await rman.inpBlockQueue.popFirst()
rootList.add(req.root)
var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.inpBlockQueue))
while count > 0:
rootList.add(rman.inpBlockQueue.popFirstNoWait().root)
dec(count)
let start = SyncMoment.now(0)
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.fetchAncestorBlocksFromNetwork(rootList)
var workers: array[PARALLEL_REQUESTS, Future[void]]
for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.requestBlocksByRoot(blocks)
# We do not care about individual worker failures here - they are only
# counted for the log entry below
await allFutures(workers)
let finish = SyncMoment.now(uint64(len(rootList)))
let finish = SyncMoment.now(uint64(len(blocks)))
var succeed = 0
for worker in workers:
if worker.finished() and not(worker.failed()):
if worker.completed():
inc(succeed)
debug "Request manager block tick", blocks_count = len(rootList),
debug "Request manager block tick", blocks = shortLog(blocks),
succeed = succeed,
failed = (len(workers) - succeed),
queue_size = len(rman.inpBlockQueue),
sync_speed = speed(start, finish)
except CancelledError as exc:
break
except CatchableError as exc:
debug "Got a problem in request manager", exc = exc.msg
warn "Unexpected error in request manager block loop", exc = exc.msg
proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
var idList = newSeq[BlobIdentifier]()
@ -261,13 +281,13 @@ proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
while true:
try:
idList.setLen(0)
let id = await rman.inpBlobQueue.popFirst()
idList.add(id)
idList.add(await rman.inpBlobQueue.popFirst())
var count = min(MAX_REQUEST_BLOB_SIDECARS - 1, lenu64(rman.inpBlobQueue))
while count > 0:
idList.add(rman.inpBlobQueue.popFirstNoWait())
dec(count)
while len(rman.inpBlobQueue) > 0 and
lenu64(idList) < MAX_REQUEST_BLOB_SIDECARS:
let id = rman.inpBlobQueue.popFirstNoWait()
if id notin idList:
idList.add(id)
let start = SyncMoment.now(0)
@ -286,8 +306,10 @@ proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
failed = (len(workers) - succeed),
queue_size = len(rman.inpBlobQueue)
except CancelledError as exc:
break
except CatchableError as exc:
debug "Got a problem in request manager", exc = exc.msg
warn "Unexpected error in request manager blob loop", exc = exc.msg
proc start*(rman: var RequestManager) =
## Start Request Manager's loops.
@ -309,7 +331,6 @@ proc fetchAncestorBlocks*(rman: RequestManager, roots: seq[FetchRecord]) =
rman.inpBlockQueue.addLastNoWait(item)
except AsyncQueueFullError: raiseAssert "unbounded queue"
proc fetchMissingBlobs*(rman: RequestManager,
recs: seq[BlobFetchRecord]) =
var idList: seq[BlobIdentifier]

View File

@ -68,7 +68,7 @@ suite "Block processor" & preset():
check:
not dag.containsForkBlock(b2.root) # Unresolved, shouldn't show up
FetchRecord(root: b1.root) in quarantine[].checkMissing()
FetchRecord(root: b1.root) in quarantine[].checkMissing(32)
let
status = await processor.storeBlock(

View File

@ -42,11 +42,11 @@ suite "Block quarantine":
quarantine.addMissing(b1.root)
check:
FetchRecord(root: b1.root) in quarantine.checkMissing()
FetchRecord(root: b1.root) in quarantine.checkMissing(32)
quarantine.addOrphan(Slot 0, b1).isOk
FetchRecord(root: b1.root) notin quarantine.checkMissing()
FetchRecord(root: b1.root) notin quarantine.checkMissing(32)
quarantine.addOrphan(Slot 0, b2).isOk
quarantine.addOrphan(Slot 0, b3).isOk