diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 04181ab83..96945882b 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -445,12 +445,17 @@ proc initFullNode( blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) + branchDiscovery = BranchDiscovery.new( + node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, + branchDiscoveryBlockVerifier) + fallbackSyncer = proc(peer: Peer) = + branchDiscovery.transferOwnership(peer) syncManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, - getFrontfillSlot, dag.tail.slot, blockVerifier) + getFrontfillSlot, dag.tail.slot, blockVerifier, fallbackSyncer) backfiller = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, @@ -458,9 +463,6 @@ proc initFullNode( getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0) - branchDiscovery = BranchDiscovery.new( - node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, - branchDiscoveryBlockVerifier) router = (ref MessageRouter)( processor: processor, network: node.network) diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim index 4a6545bf4..8e3d1ca3e 100644 --- a/beacon_chain/sync/branch_discovery.nim +++ b/beacon_chain/sync/branch_discovery.nim @@ -29,7 +29,7 @@ # with lower validator support while the canonical chain was not visible. import - std/algorithm, + std/[algorithm, deques], chronos, chronicles, metrics, results, ../spec/[forks, network], ../consensus_object_pools/block_pools_types, @@ -69,6 +69,7 @@ type blockVerifier: BlockVerifierCallback isActive: AsyncEvent loopFuture: Future[void].Raising([]) + peerQueue: Deque[Peer] proc new*( T: type BranchDiscovery, @@ -245,11 +246,14 @@ proc loop(self: ref BranchDiscovery) {.async: (raises: []).} = await sleepAsync(RESP_TIMEOUT_DUR) let peer = - try: - self[].network.peerPool.acquireNoWait() - except PeerPoolError as exc: - debug "Failed to acquire peer", exc = exc.msg - continue + if self[].peerQueue.len > 0: + self[].peerQueue.popFirst() + else: + try: + self[].network.peerPool.acquireNoWait() + except PeerPoolError as exc: + debug "Failed to acquire peer", exc = exc.msg + continue defer: self[].network.peerPool.release(peer) await self[].discoverBranch(peer) @@ -264,6 +268,11 @@ func state*(self: ref BranchDiscovery): BranchDiscoveryState = else: BranchDiscoveryState.Active +proc clearPeerQueue(self: ref BranchDiscovery) = + while self[].peerQueue.len > 0: + let peer = self[].peerQueue.popLast() + self[].network.peerPool.release(peer) + proc start*(self: ref BranchDiscovery) = doAssert self[].loopFuture == nil info "Starting discovery of new branches" @@ -276,11 +285,23 @@ proc stop*(self: ref BranchDiscovery) {.async: (raises: []).} = await self[].loopFuture.cancelAndWait() self[].loopFuture = nil beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + self.clearPeerQueue() proc suspend*(self: ref BranchDiscovery) = self[].isActive.clear() beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + self.clearPeerQueue() proc resume*(self: ref BranchDiscovery) = self[].isActive.fire() beacon_sync_branchdiscovery_state.set(self.state.ord().int64) + +proc transferOwnership*(self: ref BranchDiscovery, peer: Peer) = + const maxPeersInQueue = 10 + if self.state != BranchDiscoveryState.Active or + self[].peerQueue.len >= maxPeersInQueue: + self[].network.peerPool.release(peer) + return + debug "Peer transferred to branch discovery", + peer, peer_score = peer.getScore() + self[].peerQueue.addLast(peer) diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 1f13549eb..745d4af3e 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -35,6 +35,8 @@ const ## Time time it takes for the peer's status information to expire. type + PeerSyncer*[T] = proc(peer: T) {.gcsafe, raises: [].} + SyncWorkerStatus* {.pure.} = enum Sleeping, WaitingPeer, UpdatingStatus, Requesting, Downloading, Queueing, Processing @@ -65,6 +67,7 @@ type queue: SyncQueue[A] syncFut: Future[void] blockVerifier: BlockVerifier + fallbackSyncer: PeerSyncer[A] inProgress*: bool insSyncSpeed*: float avgSyncSpeed*: float @@ -126,6 +129,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], getFrontfillSlotCb: GetSlotCallback, progressPivot: Slot, blockVerifier: BlockVerifier, + fallbackSyncer: PeerSyncer[A] = nil, maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), chunkSize = uint64(SLOTS_PER_EPOCH), flags: set[SyncManagerFlag] = {}, @@ -150,6 +154,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], maxHeadAge: maxHeadAge, chunkSize: chunkSize, blockVerifier: blockVerifier, + fallbackSyncer: fallbackSyncer, notInSyncEvent: newAsyncEvent(), direction: direction, ident: ident, @@ -520,7 +525,13 @@ proc syncWorker[A, B](man: SyncManager[A, B], index: int) {.async: (raises: [Can man.workers[index].status = SyncWorkerStatus.WaitingPeer peer = await man.pool.acquire() await man.syncStep(index, peer) - man.pool.release(peer) + if man.workers[index].status < SyncWorkerStatus.Downloading and + man.fallbackSyncer != nil: + # The peer was not useful for us, hand it over to the fallback syncer. + # It is the responsibility of the fallback syncer to release the peer + man.fallbackSyncer(peer) + else: + man.pool.release(peer) peer = nil finally: if not(isNil(peer)):