generic branch discovery version that supports mocking peers

This commit is contained in:
Etan Kissling 2024-03-27 16:00:36 +01:00
parent f8be7c326e
commit 02a69be4e2
No known key found for this signature in database
GPG Key ID: B21DA824C5A3D03D
3 changed files with 20 additions and 19 deletions

View File

@ -85,7 +85,7 @@ type
requestManager*: RequestManager requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerId] syncManager*: SyncManager[Peer, PeerId]
backfiller*: SyncManager[Peer, PeerId] backfiller*: SyncManager[Peer, PeerId]
branchDiscovery*: ref BranchDiscovery branchDiscovery*: ref BranchDiscovery[Peer, PeerId]
genesisSnapshotContent*: string genesisSnapshotContent*: string
processor*: ref Eth2Processor processor*: ref Eth2Processor
blockProcessor*: ref BlockProcessor blockProcessor*: ref BlockProcessor

View File

@ -445,8 +445,8 @@ proc initFullNode(
blockProcessor, node.validatorMonitor, dag, attestationPool, blockProcessor, node.validatorMonitor, dag, attestationPool,
validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool)
branchDiscovery = BranchDiscovery.new( branchDiscovery = BranchDiscovery[Peer, PeerId].new(
node.network, getFirstSlotAtFinalizedEpoch, isBlockKnown, node.network.peerPool, getFirstSlotAtFinalizedEpoch, isBlockKnown,
branchDiscoveryBlockVerifier) branchDiscoveryBlockVerifier)
fallbackSyncer = proc(peer: Peer) = fallbackSyncer = proc(peer: Peer) =
branchDiscovery.transferOwnership(peer) branchDiscovery.transferOwnership(peer)

View File

@ -62,23 +62,23 @@ type
blobs: Opt[BlobSidecars] blobs: Opt[BlobSidecars]
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} ): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
BranchDiscovery* = object BranchDiscovery*[A, B] = object
network: Eth2Node pool: PeerPool[A, B]
getFinalizedSlot: GetSlotCallback getFinalizedSlot: GetSlotCallback
isBlockKnown: IsBlockKnownCallback isBlockKnown: IsBlockKnownCallback
blockVerifier: BlockVerifierCallback blockVerifier: BlockVerifierCallback
isActive: AsyncEvent isActive: AsyncEvent
loopFuture: Future[void].Raising([]) loopFuture: Future[void].Raising([])
peerQueue: Deque[Peer] peerQueue: Deque[A]
proc new*( proc new*[A, B](
T: type BranchDiscovery, T: type BranchDiscovery[A, B],
network: Eth2Node, pool: PeerPool[A, B],
getFinalizedSlot: GetSlotCallback, getFinalizedSlot: GetSlotCallback,
isBlockKnown: IsBlockKnownCallback, isBlockKnown: IsBlockKnownCallback,
blockVerifier: BlockVerifierCallback): ref BranchDiscovery = blockVerifier: BlockVerifierCallback): ref BranchDiscovery[A, B] =
let self = (ref BranchDiscovery)( let self = (ref BranchDiscovery[A, B])(
network: network, pool: pool,
getFinalizedSlot: getFinalizedSlot, getFinalizedSlot: getFinalizedSlot,
isBlockKnown: isBlockKnown, isBlockKnown: isBlockKnown,
blockVerifier: blockVerifier, blockVerifier: blockVerifier,
@ -86,8 +86,9 @@ proc new*(
self[].isActive.fire() self[].isActive.fire()
self self
proc discoverBranch( proc discoverBranch[A, B](
self: BranchDiscovery, peer: Peer) {.async: (raises: [CancelledError]).} = self: BranchDiscovery[A, B],
peer: A) {.async: (raises: [CancelledError]).} =
logScope: logScope:
peer peer
peer_score = peer.getScore() peer_score = peer.getScore()
@ -263,11 +264,11 @@ proc loop(self: ref BranchDiscovery) {.async: (raises: []).} =
self[].peerQueue.popFirst() self[].peerQueue.popFirst()
else: else:
try: try:
self[].network.peerPool.acquireNoWait() self[].pool.acquireNoWait()
except PeerPoolError as exc: except PeerPoolError as exc:
debug "Failed to acquire peer", exc = exc.msg debug "Failed to acquire peer", exc = exc.msg
continue continue
defer: self[].network.peerPool.release(peer) defer: self[].pool.release(peer)
await self[].discoverBranch(peer) await self[].discoverBranch(peer)
except CancelledError: except CancelledError:
@ -284,7 +285,7 @@ func state*(self: ref BranchDiscovery): BranchDiscoveryState =
proc clearPeerQueue(self: ref BranchDiscovery) = proc clearPeerQueue(self: ref BranchDiscovery) =
while self[].peerQueue.len > 0: while self[].peerQueue.len > 0:
let peer = self[].peerQueue.popLast() let peer = self[].peerQueue.popLast()
self[].network.peerPool.release(peer) self[].pool.release(peer)
proc start*(self: ref BranchDiscovery) = proc start*(self: ref BranchDiscovery) =
doAssert self[].loopFuture == nil doAssert self[].loopFuture == nil
@ -309,13 +310,13 @@ proc resume*(self: ref BranchDiscovery) =
self[].isActive.fire() self[].isActive.fire()
beacon_sync_branchdiscovery_state.set(self.state.ord().int64) beacon_sync_branchdiscovery_state.set(self.state.ord().int64)
proc transferOwnership*(self: ref BranchDiscovery, peer: Peer) = proc transferOwnership*[A, B](self: ref BranchDiscovery[A, B], peer: A) =
const maxPeersInQueue = 10 const maxPeersInQueue = 10
if self.state != BranchDiscoveryState.Active or if self.state != BranchDiscoveryState.Active or
self[].peerQueue.len >= maxPeersInQueue or self[].peerQueue.len >= maxPeersInQueue or
peer.getHeadSlot() <= self[].getFinalizedSlot() or peer.getHeadSlot() <= self[].getFinalizedSlot() or
self[].isBlockKnown(peer.getHeadRoot()): self[].isBlockKnown(peer.getHeadRoot()):
self[].network.peerPool.release(peer) self[].pool.release(peer)
return return
debug "Peer transferred to branch discovery", debug "Peer transferred to branch discovery",