diff --git a/beacon_chain/beacon_node.nim b/beacon_chain/beacon_node.nim index 34f23eb00..8a4d2cd5a 100644 --- a/beacon_chain/beacon_node.nim +++ b/beacon_chain/beacon_node.nim @@ -85,7 +85,7 @@ type requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId] - branchDiscovery*: ref BranchDiscovery + branchDiscovery*: ref BranchDiscovery[Peer, PeerId] genesisSnapshotContent*: string processor*: ref Eth2Processor blockProcessor*: ref BlockProcessor diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 0471ddc73..993f2627f 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -445,8 +445,8 @@ proc initFullNode( blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) - branchDiscovery = BranchDiscovery.new( - node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, + branchDiscovery = BranchDiscovery[Peer, PeerId].new( + node.network.peerPool, getFirstSlotAtFinalizedEpoch, isBlockKnown, branchDiscoveryBlockVerifier) fallbackSyncer = proc(peer: Peer) = branchDiscovery.transferOwnership(peer) diff --git a/beacon_chain/sync/branch_discovery.nim b/beacon_chain/sync/branch_discovery.nim index 54d205c48..7d67fbd14 100644 --- a/beacon_chain/sync/branch_discovery.nim +++ b/beacon_chain/sync/branch_discovery.nim @@ -62,23 +62,23 @@ type blobs: Opt[BlobSidecars] ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} - BranchDiscovery* = object - network: Eth2Node + BranchDiscovery*[A, B] = object + pool: PeerPool[A, B] getFinalizedSlot: GetSlotCallback isBlockKnown: IsBlockKnownCallback blockVerifier: BlockVerifierCallback isActive: AsyncEvent loopFuture: Future[void].Raising([]) - peerQueue: Deque[Peer] + peerQueue: Deque[A] -proc new*( - T: type BranchDiscovery, - network: Eth2Node, +proc new*[A, B]( + T: type BranchDiscovery[A, B], + pool: PeerPool[A, B], getFinalizedSlot: GetSlotCallback, isBlockKnown: IsBlockKnownCallback, - blockVerifier: BlockVerifierCallback): ref BranchDiscovery = - let self = (ref BranchDiscovery)( - network: network, + blockVerifier: BlockVerifierCallback): ref BranchDiscovery[A, B] = + let self = (ref BranchDiscovery[A, B])( + pool: pool, getFinalizedSlot: getFinalizedSlot, isBlockKnown: isBlockKnown, blockVerifier: blockVerifier, @@ -86,8 +86,9 @@ proc new*( self[].isActive.fire() self -proc discoverBranch( - self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} = +proc discoverBranch[A, B]( + self: BranchDiscovery[A, B], + peer: A) {.async: (raises: [CancelledError]).} = logScope: peer peer_score = peer.getScore() @@ -263,11 +264,11 @@ proc loop(self: ref BranchDiscovery) {.async: (raises: []).} = self[].peerQueue.popFirst() else: try: - self[].network.peerPool.acquireNoWait() + self[].pool.acquireNoWait() except PeerPoolError as exc: debug "Failed to acquire peer", exc = exc.msg continue - defer: self[].network.peerPool.release(peer) + defer: self[].pool.release(peer) await self[].discoverBranch(peer) except CancelledError: @@ -284,7 +285,7 @@ func state*(self: ref BranchDiscovery): BranchDiscoveryState = proc clearPeerQueue(self: ref BranchDiscovery) = while self[].peerQueue.len > 0: let peer = self[].peerQueue.popLast() - self[].network.peerPool.release(peer) + self[].pool.release(peer) proc start*(self: ref BranchDiscovery) = doAssert self[].loopFuture == nil @@ -309,13 +310,13 @@ proc resume*(self: ref BranchDiscovery) = self[].isActive.fire() beacon_sync_branchdiscovery_state.set(self.state.ord().int64) -proc transferOwnership*(self: ref BranchDiscovery, peer: Peer) = +proc transferOwnership*[A, B](self: ref BranchDiscovery[A, B], peer: A) = const maxPeersInQueue = 10 if self.state != BranchDiscoveryState.Active or self[].peerQueue.len >= maxPeersInQueue or peer.getHeadSlot() <= self[].getFinalizedSlot() or self[].isBlockKnown(peer.getHeadRoot()): - self[].network.peerPool.release(peer) + self[].pool.release(peer) return debug "Peer transferred to branch discovery",