* unrevert rest of https://github.com/status-im/nimbus-eth2/pull/5765

* rm stray e2store docs changes

* reduce diff

* fix indent

---------

Co-authored-by: Jacek Sieka <jacek@status.im>
This commit is contained in:
tersec 2024-02-09 08:35:41 +00:00 committed by GitHub
parent dca444bea7
commit 642774e596
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 176 additions and 296 deletions

View File

@ -352,7 +352,7 @@ proc initFullNode(
blobQuarantine, getBeaconTime) blobQuarantine, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool): blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] = Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
# The design with a callback for block verification is unusual compared # The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach # to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise # taken in the sync/request managers - this is an architectural compromise
@ -361,27 +361,23 @@ proc initFullNode(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
maybeFinalized: bool): maybeFinalized: bool):
Future[Result[void, VerifierError]] = Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
withBlck(signedBlock): withBlck(signedBlock):
when typeof(forkyBlck).kind >= ConsensusFork.Deneb: when consensusFork >= ConsensusFork.Deneb:
if not blobQuarantine[].hasBlobs(forkyBlck): if not blobQuarantine[].hasBlobs(forkyBlck):
# We don't have all the blobs for this block, so we have # We don't have all the blobs for this block, so we have
# to put it in blobless quarantine. # to put it in blobless quarantine.
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
Future.completed( err(VerifierError.UnviableFork)
Result[void, VerifierError].err(VerifierError.UnviableFork),
"rmanBlockVerifier")
else: else:
Future.completed( err(VerifierError.MissingParent)
Result[void, VerifierError].err(VerifierError.MissingParent),
"rmanBlockVerifier")
else: else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
blockProcessor[].addBlock(MsgSource.gossip, signedBlock, await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs), Opt.some(blobs),
maybeFinalized = maybeFinalized) maybeFinalized = maybeFinalized)
else: else:
blockProcessor[].addBlock(MsgSource.gossip, signedBlock, await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(BlobSidecars), Opt.none(BlobSidecars),
maybeFinalized = maybeFinalized) maybeFinalized = maybeFinalized)

View File

@ -39,7 +39,7 @@ const
type type
BlockVerifierFn* = BlockVerifierFn* =
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [].} Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
InhibitFn* = proc: bool {.gcsafe, raises:[].} InhibitFn* = proc: bool {.gcsafe, raises:[].}
RequestManager* = object RequestManager* = object
@ -49,8 +49,8 @@ type
quarantine: ref Quarantine quarantine: ref Quarantine
blobQuarantine: ref BlobQuarantine blobQuarantine: ref BlobQuarantine
blockVerifier: BlockVerifierFn blockVerifier: BlockVerifierFn
blockLoopFuture: Future[void] blockLoopFuture: Future[void].Raising([CancelledError])
blobLoopFuture: Future[void] blobLoopFuture: Future[void].Raising([CancelledError])
func shortLog*(x: seq[Eth2Digest]): string = func shortLog*(x: seq[Eth2Digest]): string =
"[" & x.mapIt(shortLog(it)).join(", ") & "]" "[" & x.mapIt(shortLog(it)).join(", ") & "]"
@ -104,7 +104,7 @@ proc checkResponse(idList: seq[BlobIdentifier],
return false return false
true true
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.} = proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
var peer: Peer var peer: Peer
try: try:
peer = await rman.network.peerPool.acquire() peer = await rman.network.peerPool.acquire()
@ -171,19 +171,13 @@ proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async.}
peer = peer, blocks = shortLog(items), err = blocks.error() peer = peer, blocks = shortLog(items), err = blocks.error()
peer.updateScore(PeerScoreNoValues) peer.updateScore(PeerScoreNoValues)
except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching blocks by root", exc = exc.msg,
items = shortLog(items), peer = peer, peer_score = peer.getScore()
raise exc
finally: finally:
if not(isNil(peer)): if not(isNil(peer)):
rman.network.peerPool.release(peer) rman.network.peerPool.release(peer)
proc fetchBlobsFromNetwork(self: RequestManager, proc fetchBlobsFromNetwork(self: RequestManager,
idList: seq[BlobIdentifier]) {.async.} = idList: seq[BlobIdentifier])
{.async: (raises: [CancelledError]).} =
var peer: Peer var peer: Peer
try: try:
@ -191,7 +185,7 @@ proc fetchBlobsFromNetwork(self: RequestManager,
debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList), debug "Requesting blobs by root", peer = peer, blobs = shortLog(idList),
peer_score = peer.getScore() peer_score = peer.getScore()
let blobs = (await blobSidecarsByRoot(peer, BlobIdentifierList idList)) let blobs = await blobSidecarsByRoot(peer, BlobIdentifierList idList)
if blobs.isOk: if blobs.isOk:
let ublobs = blobs.get() let ublobs = blobs.get()
@ -219,18 +213,11 @@ proc fetchBlobsFromNetwork(self: RequestManager,
peer = peer, blobs = shortLog(idList), err = blobs.error() peer = peer, blobs = shortLog(idList), err = blobs.error()
peer.updateScore(PeerScoreNoValues) peer.updateScore(PeerScoreNoValues)
except CancelledError as exc:
raise exc
except CatchableError as exc:
peer.updateScore(PeerScoreNoValues)
debug "Error while fetching blobs by root", exc = exc.msg,
idList = shortLog(idList), peer = peer, peer_score = peer.getScore()
raise exc
finally: finally:
if not(isNil(peer)): if not(isNil(peer)):
self.network.peerPool.release(peer) self.network.peerPool.release(peer)
proc requestManagerBlockLoop(rman: RequestManager) {.async.} = proc requestManagerBlockLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
while true: while true:
# TODO This polling could be replaced with an AsyncEvent that is fired # TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do # from the quarantine when there's work to do
@ -245,33 +232,19 @@ proc requestManagerBlockLoop(rman: RequestManager) {.async.} =
continue continue
debug "Requesting detected missing blocks", blocks = shortLog(blocks) debug "Requesting detected missing blocks", blocks = shortLog(blocks)
try: let start = SyncMoment.now(0)
let start = SyncMoment.now(0)
var workers: array[PARALLEL_REQUESTS, Future[void]] var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
for i in 0 ..< PARALLEL_REQUESTS: for i in 0 ..< PARALLEL_REQUESTS:
workers[i] = rman.requestBlocksByRoot(blocks) workers[i] = rman.requestBlocksByRoot(blocks)
await allFutures(workers) await allFutures(workers)
let finish = SyncMoment.now(uint64(len(blocks))) let finish = SyncMoment.now(uint64(len(blocks)))
var succeed = 0
for worker in workers:
if worker.completed():
inc(succeed)
debug "Request manager block tick", blocks = shortLog(blocks),
succeed = succeed,
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)
except CancelledError:
break
except CatchableError as exc:
warn "Unexpected error in request manager block loop", exc = exc.msg
debug "Request manager block tick", blocks = shortLog(blocks),
sync_speed = speed(start, finish)
proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] = proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
let let
@ -308,11 +281,10 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
rman.quarantine[].removeBlobless(blobless) rman.quarantine[].removeBlobless(blobless)
fetches fetches
proc requestManagerBlobLoop(rman: RequestManager) {.async: (raises: [CancelledError]).} =
proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
while true: while true:
# TODO This polling could be replaced with an AsyncEvent that is fired # TODO This polling could be replaced with an AsyncEvent that is fired
# from the quarantine when there's work to do # from the quarantine when there's work to do
await sleepAsync(POLL_INTERVAL) await sleepAsync(POLL_INTERVAL)
if rman.inhibit(): if rman.inhibit():
continue continue
@ -320,30 +292,17 @@ proc requestManagerBlobLoop(rman: RequestManager) {.async.} =
let fetches = rman.getMissingBlobs() let fetches = rman.getMissingBlobs()
if fetches.len > 0: if fetches.len > 0:
debug "Requesting detected missing blobs", blobs = shortLog(fetches) debug "Requesting detected missing blobs", blobs = shortLog(fetches)
try: let start = SyncMoment.now(0)
let start = SyncMoment.now(0) var workers: array[PARALLEL_REQUESTS, Future[void].Raising([CancelledError])]
var workers: array[PARALLEL_REQUESTS, Future[void]] for i in 0 ..< PARALLEL_REQUESTS:
for i in 0 ..< PARALLEL_REQUESTS: workers[i] = rman.fetchBlobsFromNetwork(fetches)
workers[i] = rman.fetchBlobsFromNetwork(fetches)
await allFutures(workers) await allFutures(workers)
let finish = SyncMoment.now(uint64(len(fetches))) let finish = SyncMoment.now(uint64(len(fetches)))
var succeed = 0 debug "Request manager blob tick",
for worker in workers: blobs_count = len(fetches),
if worker.finished() and not(worker.failed()): sync_speed = speed(start, finish)
inc(succeed)
debug "Request manager blob tick",
blobs_count = len(fetches),
succeed = succeed,
failed = (len(workers) - succeed),
sync_speed = speed(start, finish)
except CancelledError:
break
except CatchableError as exc:
warn "Unexpected error in request manager blob loop", exc = exc.msg
proc start*(rman: var RequestManager) = proc start*(rman: var RequestManager) =
## Start Request Manager's loops. ## Start Request Manager's loops.

View File

@ -43,7 +43,7 @@ type
NoMonitor NoMonitor
SyncWorker*[A, B] = object SyncWorker*[A, B] = object
future: Future[void] future: Future[void].Raising([CancelledError])
status: SyncWorkerStatus status: SyncWorkerStatus
SyncManager*[A, B] = ref object SyncManager*[A, B] = ref object
@ -158,8 +158,9 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
res.initQueue() res.initQueue()
res res
proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, proc getBlocks[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BeaconBlocksRes] {.async.} = req: SyncRequest): Future[BeaconBlocksRes] {.
async: (raises: [CancelledError], raw: true).} =
mixin getScore, `==` mixin getScore, `==`
logScope: logScope:
@ -171,21 +172,8 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
doAssert(not(req.isEmpty()), "Request must not be empty!") doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting blocks from peer", request = req debug "Requesting blocks from peer", request = req
try:
let res = await beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
if res.isErr(): beaconBlocksByRange_v2(peer, req.slot, req.count, 1'u64)
debug "Error, while reading getBlocks response", request = req,
error = $res.error()
return
return res
except CancelledError:
debug "Interrupt, while waiting getBlocks response", request = req
return
except CatchableError as exc:
debug "Error, while waiting getBlocks response", request = req,
errName = exc.name, errMsg = exc.msg
return
proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool = proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
let wallEpoch = man.getLocalWallSlot().epoch let wallEpoch = man.getLocalWallSlot().epoch
@ -194,8 +182,8 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)
proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest req: SyncRequest): Future[BlobSidecarsRes]
): Future[BlobSidecarsRes] {.async.} = {.async: (raises: [CancelledError], raw: true).} =
mixin getScore, `==` mixin getScore, `==`
logScope: logScope:
@ -207,21 +195,7 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
doAssert(not(req.isEmpty()), "Request must not be empty!") doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting blobs sidecars from peer", request = req debug "Requesting blobs sidecars from peer", request = req
try: blobSidecarsByRange(peer, req.slot, req.count)
let res = await blobSidecarsByRange(peer, req.slot, req.count)
if res.isErr():
debug "Error, while reading blobSidecarsByRange response", request = req,
error = $res.error()
return
return res
except CancelledError:
debug "Interrupt, while waiting blobSidecarsByRange response", request = req
return
except CatchableError as exc:
debug "Error, while waiting blobSidecarsByRange response", request = req,
errName = exc.name, errMsg = exc.msg
return
proc remainingSlots(man: SyncManager): uint64 = proc remainingSlots(man: SyncManager): uint64 =
let let
@ -282,7 +256,8 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
ok() ok()
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
{.async: (raises: [CancelledError]).} =
logScope: logScope:
peer_score = peer.getScore() peer_score = peer.getScore()
peer_speed = peer.netKbps() peer_speed = peer.netKbps()
@ -322,17 +297,11 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
trace "Updating peer's status information", wall_clock_slot = wallSlot, trace "Updating peer's status information", wall_clock_slot = wallSlot,
remote_head_slot = peerSlot, local_head_slot = headSlot remote_head_slot = peerSlot, local_head_slot = headSlot
try: if not(await peer.updateStatus()):
let res = await peer.updateStatus() peer.updateScore(PeerScoreNoStatus)
if not(res): debug "Failed to get remote peer's status, exiting",
peer.updateScore(PeerScoreNoStatus) peer_head_slot = peerSlot
debug "Failed to get remote peer's status, exiting",
peer_head_slot = peerSlot
return
except CatchableError as exc:
debug "Unexpected exception while updating peer's status",
peer_head_slot = peerSlot, errName = exc.name, errMsg = exc.msg
return return
let newPeerSlot = peer.getHeadSlot() let newPeerSlot = peer.getHeadSlot()
@ -419,110 +388,103 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
man.workers[index].status = SyncWorkerStatus.Downloading man.workers[index].status = SyncWorkerStatus.Downloading
try: let blocks = await man.getBlocks(peer, req)
let blocks = await man.getBlocks(peer, req) if blocks.isErr():
if blocks.isErr(): peer.updateScore(PeerScoreNoValues)
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blocks on request", request = req
return
let blockData = blocks.get().asSeq()
let blockSmap = getShortMap(req, blockData)
debug "Received blocks on request", blocks_count = len(blockData),
blocks_map = blockSmap, request = req
let slots = mapIt(blockData, it[].slot)
if not(checkResponse(req, slots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blocks sequence is not in requested range",
blocks_count = len(blockData), blocks_map = blockSmap,
request = req
return
func combine(acc: seq[Slot], cur: Slot): seq[Slot] =
var copy = acc
if copy[copy.len-1] != cur:
copy.add(cur)
copy
let blobData =
if man.shouldGetBlobs(req.slot.epoch):
let blobs = await man.getBlobSidecars(peer, req)
if blobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blobs on request", request = req
return
let blobData = blobs.get().asSeq()
let blobSmap = getShortMap(req, blobData)
debug "Received blobs on request", blobs_count = len(blobData),
blobs_map = blobSmap, request = req
if len(blobData) > 0:
let slots = mapIt(blobData, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is not in requested range",
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
request = req
return
let groupedBlobs = groupBlobs(req, blockData, blobData)
if groupedBlobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
info "Received blobs sequence is inconsistent",
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
return
if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is invalid",
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData),
request = req,
msg = checkRes.error
return
Opt.some(groupedBlobs.get())
else:
Opt.none(seq[BlobSidecars])
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
req.contains(man.getSafeSlot()):
# The sync protocol does not distinguish between:
# - All requested slots are empty
# - Peer does not have data available about requested range
#
# However, we include the `backfill` slot in backward sync requests.
# If we receive an empty response to a request covering that slot,
# we know that the response is incomplete and can descore.
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Response does not include known-to-exist block", request = req
return
# Scoring will happen in `syncUpdate`.
man.workers[index].status = SyncWorkerStatus.Queueing
let
peerFinalized = peer.getFinalizedEpoch().start_slot()
lastSlot = req.slot + req.count
# The peer claims the block is finalized - our own block processing will
# verify this point down the line
# TODO descore peers that lie
maybeFinalized = lastSlot < peerFinalized
await man.queue.push(req, blockData, blobData, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
except CatchableError as exc:
man.queue.push(req) man.queue.push(req)
debug "Unexpected exception while receiving blocks", request = req, debug "Failed to receive blocks on request", request = req
errName = exc.name, errMsg = exc.msg return
let blockData = blocks.get().asSeq()
let blockSmap = getShortMap(req, blockData)
debug "Received blocks on request", blocks_count = len(blockData),
blocks_map = blockSmap, request = req
let slots = mapIt(blockData, it[].slot)
if not(checkResponse(req, slots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blocks sequence is not in requested range",
blocks_count = len(blockData), blocks_map = blockSmap,
request = req
return return
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} = func combine(acc: seq[Slot], cur: Slot): seq[Slot] =
var copy = acc
if copy[copy.len-1] != cur:
copy.add(cur)
copy
let blobData =
if man.shouldGetBlobs(req.slot.epoch):
let blobs = await man.getBlobSidecars(peer, req)
if blobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blobs on request", request = req
return
let blobData = blobs.get().asSeq()
let blobSmap = getShortMap(req, blobData)
debug "Received blobs on request", blobs_count = len(blobData),
blobs_map = blobSmap, request = req
if len(blobData) > 0:
let slots = mapIt(blobData, it[].signed_block_header.message.slot)
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is not in requested range",
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
request = req
return
let groupedBlobs = groupBlobs(req, blockData, blobData)
if groupedBlobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
info "Received blobs sequence is inconsistent",
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
return
if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
peer.updateScore(PeerScoreBadResponse)
man.queue.push(req)
warn "Received blobs sequence is invalid",
blobs_count = len(blobData),
blobs_map = getShortMap(req, blobData),
request = req,
msg = checkRes.error
return
Opt.some(groupedBlobs.get())
else:
Opt.none(seq[BlobSidecars])
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
req.contains(man.getSafeSlot()):
# The sync protocol does not distinguish between:
# - All requested slots are empty
# - Peer does not have data available about requested range
#
# However, we include the `backfill` slot in backward sync requests.
# If we receive an empty response to a request covering that slot,
# we know that the response is incomplete and can descore.
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Response does not include known-to-exist block", request = req
return
# Scoring will happen in `syncUpdate`.
man.workers[index].status = SyncWorkerStatus.Queueing
let
peerFinalized = peer.getFinalizedEpoch().start_slot()
lastSlot = req.slot + req.count
# The peer claims the block is finalized - our own block processing will
# verify this point down the line
# TODO descore peers that lie
maybeFinalized = lastSlot < peerFinalized
await man.queue.push(req, blockData, blobData, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [CancelledError]).} =
mixin getKey, getScore, getHeadSlot mixin getKey, getScore, getHeadSlot
logScope: logScope:
@ -533,30 +495,21 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async.} =
debug "Starting syncing worker" debug "Starting syncing worker"
while true: var peer: A = nil
var peer: A = nil
let doBreak = try:
try: while true:
man.workers[index].status = SyncWorkerStatus.Sleeping man.workers[index].status = SyncWorkerStatus.Sleeping
# This event is going to be set until we are not in sync with network # This event is going to be set until we are not in sync with network
await man.notInSyncEvent.wait() await man.notInSyncEvent.wait()
man.workers[index].status = SyncWorkerStatus.WaitingPeer man.workers[index].status = SyncWorkerStatus.WaitingPeer
peer = await man.pool.acquire() peer = await man.pool.acquire()
await man.syncStep(index, peer) await man.syncStep(index, peer)
man.pool.release(peer) man.pool.release(peer)
false peer = nil
except CancelledError: finally:
if not(isNil(peer)): if not(isNil(peer)):
man.pool.release(peer) man.pool.release(peer)
true
except CatchableError as exc:
debug "Unexpected exception in sync worker",
peer = peer, peer_score = peer.getScore(),
peer_speed = peer.netKbps(),
errName = exc.name, errMsg = exc.msg
true
if doBreak:
break
debug "Sync worker stopped" debug "Sync worker stopped"
@ -593,34 +546,10 @@ proc getWorkersStats[A, B](man: SyncManager[A, B]): tuple[map: string,
map[i] = ch map[i] = ch
(map, sleeping, waiting, pending) (map, sleeping, waiting, pending)
proc guardTask[A, B](man: SyncManager[A, B]) {.async.} = proc startWorkers[A, B](man: SyncManager[A, B]) =
logScope:
index = index
sync_ident = man.ident
direction = man.direction
topics = "syncman"
var pending: array[SyncWorkersCount, Future[void]]
# Starting all the synchronization workers. # Starting all the synchronization workers.
for i in 0 ..< len(man.workers): for i in 0 ..< len(man.workers):
let future = syncWorker[A, B](man, i) man.workers[i].future = syncWorker[A, B](man, i)
man.workers[i].future = future
pending[i] = future
# Wait for synchronization worker's failure and replace it with new one.
while true:
let failFuture = await one(pending)
let index = pending.find(failFuture)
if failFuture.failed():
warn "Synchronization worker stopped working unexpectedly with an error",
errName = failFuture.error.name, errMsg = failFuture.error.msg
else:
warn "Synchronization worker stopped working unexpectedly without error"
let future = syncWorker[A, B](man, index)
man.workers[index].future = future
pending[index] = future
proc toTimeLeftString*(d: Duration): string = proc toTimeLeftString*(d: Duration): string =
if d == InfiniteDuration: if d == InfiniteDuration:
@ -648,11 +577,9 @@ proc toTimeLeftString*(d: Duration): string =
res = res & "00m" res = res & "00m"
res res
proc syncClose[A, B](man: SyncManager[A, B], guardTaskFut: Future[void], proc syncClose[A, B](man: SyncManager[A, B],
speedTaskFut: Future[void]) {.async.} = speedTaskFut: Future[void]) {.async.} =
var pending: seq[FutureBase] var pending: seq[FutureBase]
if not(guardTaskFut.finished()):
pending.add(guardTaskFut.cancelAndWait())
if not(speedTaskFut.finished()): if not(speedTaskFut.finished()):
pending.add(speedTaskFut.cancelAndWait()) pending.add(speedTaskFut.cancelAndWait())
for worker in man.workers: for worker in man.workers:
@ -669,11 +596,11 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
mixin getKey, getScore mixin getKey, getScore
var pauseTime = 0 var pauseTime = 0
var guardTaskFut = man.guardTask() man.startWorkers()
debug "Synchronization loop started" debug "Synchronization loop started"
proc averageSpeedTask() {.async.} = proc averageSpeedTask() {.async: (raises: [CancelledError]).} =
while true: while true:
# Reset sync speeds between each loss-of-sync event # Reset sync speeds between each loss-of-sync event
man.avgSyncSpeed = 0 man.avgSyncSpeed = 0
@ -703,7 +630,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
stamp = newStamp stamp = newStamp
var averageSpeedTaskFut = averageSpeedTask() let averageSpeedTaskFut = averageSpeedTask()
while true: while true:
let wallSlot = man.getLocalWallSlot() let wallSlot = man.getLocalWallSlot()
@ -788,7 +715,7 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
of SyncQueueKind.Forward: of SyncQueueKind.Forward:
if man.inProgress: if man.inProgress:
if SyncManagerFlag.NoMonitor in man.flags: if SyncManagerFlag.NoMonitor in man.flags:
await man.syncClose(guardTaskFut, averageSpeedTaskFut) await man.syncClose(averageSpeedTaskFut)
man.inProgress = false man.inProgress = false
debug "Forward synchronization process finished, exiting", debug "Forward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,
@ -809,10 +736,8 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
of SyncQueueKind.Backward: of SyncQueueKind.Backward:
# Backward syncing is going to be executed only once, so we exit loop # Backward syncing is going to be executed only once, so we exit loop
# and stop all pending tasks which belongs to this instance (sync # and stop all pending tasks which belongs to this instance (sync
# workers, guard task and speed calculation task). # workers, speed calculation task).
# We first need to cancel and wait for guard task, because otherwise await man.syncClose(averageSpeedTaskFut)
# it will be able to restore cancelled workers.
await man.syncClose(guardTaskFut, averageSpeedTaskFut)
man.inProgress = false man.inProgress = false
debug "Backward synchronization process finished, exiting", debug "Backward synchronization process finished, exiting",
wall_head_slot = wallSlot, local_head_slot = headSlot, wall_head_slot = wallSlot, local_head_slot = headSlot,

View File

@ -27,7 +27,7 @@ type
ProcessingCallback* = proc() {.gcsafe, raises: [].} ProcessingCallback* = proc() {.gcsafe, raises: [].}
BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool): blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [].} Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
SyncQueueKind* {.pure.} = enum SyncQueueKind* {.pure.} = enum
Forward, Backward Forward, Backward
@ -50,7 +50,7 @@ type
item*: T item*: T
SyncWaiter* = ref object SyncWaiter* = ref object
future: Future[void] future: Future[void].Raising([CancelledError])
reset: bool reset: bool
RewindPoint = object RewindPoint = object
@ -311,9 +311,9 @@ proc wakeupWaiters[T](sq: SyncQueue[T], reset = false) =
if not(item.future.finished()): if not(item.future.finished()):
item.future.complete() item.future.complete()
proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} = proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async: (raises: [CancelledError]).} =
## Create new waiter and wait for completion from `wakeupWaiters()`. ## Create new waiter and wait for completion from `wakeupWaiters()`.
var waitfut = newFuture[void]("SyncQueue.waitForChanges") let waitfut = Future[void].Raising([CancelledError]).init("SyncQueue.waitForChanges")
let waititem = SyncWaiter(future: waitfut) let waititem = SyncWaiter(future: waitfut)
sq.waiters.add(waititem) sq.waiters.add(waititem)
try: try:
@ -322,7 +322,7 @@ proc waitForChanges[T](sq: SyncQueue[T]): Future[bool] {.async.} =
finally: finally:
sq.waiters.delete(sq.waiters.find(waititem)) sq.waiters.delete(sq.waiters.find(waititem))
proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async.} = proc wakeupAndWaitWaiters[T](sq: SyncQueue[T]) {.async: (raises: [CancelledError]).} =
## This procedure will perform wakeupWaiters(true) and blocks until last ## This procedure will perform wakeupWaiters(true) and blocks until last
## waiter will be awakened. ## waiter will be awakened.
var waitChanges = sq.waitForChanges() var waitChanges = sq.waitForChanges()
@ -333,7 +333,7 @@ proc clearAndWakeup*[T](sq: SyncQueue[T]) =
sq.pending.clear() sq.pending.clear()
sq.wakeupWaiters(true) sq.wakeupWaiters(true)
proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async.} = proc resetWait*[T](sq: SyncQueue[T], toSlot: Option[Slot]) {.async: (raises: [CancelledError]).} =
## Perform reset of all the blocked waiters in SyncQueue. ## Perform reset of all the blocked waiters in SyncQueue.
## ##
## We adding one more waiter to the waiters sequence and ## We adding one more waiter to the waiters sequence and
@ -610,7 +610,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock], data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[BlobSidecars]], blobs: Opt[seq[BlobSidecars]],
maybeFinalized: bool = false, maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil) {.async.} = processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} =
logScope: logScope:
sync_ident = sq.ident sync_ident = sq.ident
topics = "syncman" topics = "syncman"

View File

@ -50,8 +50,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
# the BlockProcessor and this test # the BlockProcessor and this test
proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars],
maybeFinalized: bool): maybeFinalized: bool):
Future[Result[void, VerifierError]] = Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
let fut = newFuture[Result[void, VerifierError]]() let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init()
try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut))
except CatchableError as exc: raiseAssert exc.msg except CatchableError as exc: raiseAssert exc.msg
return fut return fut