diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 6dc78cfed..93c557d9e 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -330,8 +330,8 @@ proc initFullNode( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, rng, taskpool, consensusManager, node.validatorMonitor, blobQuarantine, getBeaconTime) - blockVerifier = - proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): + blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, + blobs: BlobSidecars, maybeFinalized: bool): Future[Result[void, VerifierError]] = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach @@ -339,22 +339,10 @@ proc initFullNode( # that should probably be reimagined more holistically in the future. let resfut = newFuture[Result[void, VerifierError]]("blockVerifier") blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - BlobSidecars @[], + blobs, resfut, maybeFinalized = maybeFinalized) resfut - blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock, - blobs: BlobSidecars, - maybeFinalized: bool): - Future[Result[void, VerifierError]] = - # The design with a callback for block verification is unusual compared - # to the rest of the application, but fits with the general approach - # taken in the sync/request managers - this is an architectural compromise - # that should probably be reimagined more holistically in the future. - let resfut = newFuture[Result[void, VerifierError]]("blockVerifier") - blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - blobs, resfut, maybeFinalized = maybeFinalized) - resfut processor = Eth2Processor.new( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, @@ -363,11 +351,11 @@ proc initFullNode( syncManager = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, - getFrontfillSlot, dag.tail.slot, blockVerifier, blockBlobsVerifier) + getFrontfillSlot, dag.tail.slot, blockVerifier) backfiller = newSyncManager[Peer, PeerId]( node.network.peerPool, dag.cfg.DENEB_FORK_EPOCH, SyncQueueKind.Backward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, - getFrontfillSlot, dag.backfill.slot, blockVerifier, blockBlobsVerifier, + getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0) router = (ref MessageRouter)( processor: processor, diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 10ee8ae02..69d8d3d5b 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -32,13 +32,9 @@ const type BlockVerifier* = - proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].} - BlockBlobsVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, blobs: BlobSidecars, maybeFinalized: bool): Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].} - RequestManager* = object network*: Eth2Node inpQueue*: AsyncQueue[FetchRecord] @@ -95,7 +91,9 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager, gotUnviableBlock = false for b in ublocks: - let ver = await rman.blockVerifier(b[], false) + let ver = await rman.blockVerifier(b[], BlobSidecars @[], false) + # TODO: + # blob handling for Deneb blocks if ver.isErr(): case ver.error() of VerifierError.MissingParent: diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index a56cae0dc..5a1ab724d 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -64,7 +64,6 @@ type queue: SyncQueue[A] syncFut: Future[void] blockVerifier: BlockVerifier - blockBlobsVerifier: BlockBlobsVerifier inProgress*: bool insSyncSpeed*: float avgSyncSpeed*: float @@ -99,8 +98,7 @@ proc initQueue[A, B](man: SyncManager[A, B]) = man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(), man.getLastSlot(), man.chunkSize, man.getSafeSlot, man.blockVerifier, - man.blockBlobsVerifier, 1, - man.ident) + 1, man.ident) of SyncQueueKind.Backward: let firstSlot = man.getFirstSlot() @@ -113,8 +111,7 @@ proc initQueue[A, B](man: SyncManager[A, B]) = Slot(firstSlot - 1'u64) man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot, man.chunkSize, man.getSafeSlot, - man.blockVerifier, man.blockBlobsVerifier, 1, - man.ident) + man.blockVerifier, 1, man.ident) proc newSyncManager*[A, B](pool: PeerPool[A, B], denebEpoch: Epoch, @@ -126,7 +123,6 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], getFrontfillSlotCb: GetSlotCallback, progressPivot: Slot, blockVerifier: BlockVerifier, - blockBlobsVerifier: BlockBlobsVerifier, maxHeadAge = uint64(SLOTS_PER_EPOCH * 1), chunkSize = uint64(SLOTS_PER_EPOCH), flags: set[SyncManagerFlag] = {}, @@ -150,7 +146,6 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B], maxHeadAge: maxHeadAge, chunkSize: chunkSize, blockVerifier: blockVerifier, - blockBlobsVerifier: blockBlobsVerifier, notInSyncEvent: newAsyncEvent(), direction: direction, ident: ident, diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 1a7e64846..d4880b41b 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -25,12 +25,8 @@ logScope: type GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].} ProcessingCallback* = proc() {.gcsafe, raises: [Defect].} - BlockVerifier* = - proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): - Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].} - BlockBlobsVerifier* = - proc(signedBlock: ForkedSignedBeaconBlock, blobs: BlobSidecars, - maybeFinalized: bool): + BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, + blobs: BlobSidecars, maybeFinalized: bool): Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].} SyncQueueKind* {.pure.} = enum @@ -79,7 +75,6 @@ type readyQueue: HeapQueue[SyncResult[T]] rewind: Option[RewindPoint] blockVerifier: BlockVerifier - blockBlobsVerifier: BlockBlobsVerifier ident*: string chronicles.formatIt SyncQueueKind: toLowerAscii($it) @@ -198,7 +193,6 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], start, final: Slot, chunkSize: uint64, getSafeSlotCb: GetSlotCallback, blockVerifier: BlockVerifier, - blockBlobsVerifier: BlockBlobsVerifier, syncQueueSize: int = -1, ident: string = "main"): SyncQueue[T] = ## Create new synchronization queue with parameters @@ -270,7 +264,6 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T], inpSlot: start, outSlot: start, blockVerifier: blockVerifier, - blockBlobsVerifier: blockBlobsVerifier, ident: ident ) @@ -684,9 +677,9 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], var i=0 for blk in sq.blocks(item): if reqres.get().blobs.isNone(): - res = await sq.blockVerifier(blk[], maybeFinalized) + res = await sq.blockVerifier(blk[], BlobSidecars @[], maybeFinalized) else: - res = await sq.blockBlobsVerifier(blk[], reqres.get().blobs.get()[i], maybeFinalized) + res = await sq.blockVerifier(blk[], reqres.get().blobs.get()[i], maybeFinalized) inc(i) if res.isOk(): diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 12ccafeff..72a9b3df0 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -49,7 +49,8 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = # in the async queue, similar to how BlockProcessor does it - as far as # testing goes, this is risky because it might introduce differences between # the BlockProcessor and this test - proc verify(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): + proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: BlobSidecars, + maybeFinalized: bool): Future[Result[void, VerifierError]] = let fut = newFuture[Result[void, VerifierError]]() try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut)) @@ -94,7 +95,7 @@ suite "SyncManager test suite": var queue = SyncQueue.init(SomeTPeer, kind, Slot(0), Slot(0), 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), nil) + collector(aq)) check: len(queue) == 1 pendingLen(queue) == 0 @@ -190,7 +191,7 @@ suite "SyncManager test suite": var queue = SyncQueue.init(SomeTPeer, kind, item[0], item[1], item[2], getStaticSlotCb(item[0]), - collector(aq), nil) + collector(aq)) check: len(queue) == item[4] pendingLen(queue) == item[5] @@ -214,11 +215,11 @@ suite "SyncManager test suite": of SyncQueueKind.Forward: SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(1), 1'u64, - getStaticSlotCb(Slot(0)), collector(aq), nil) + getStaticSlotCb(Slot(0)), collector(aq)) of SyncQueueKind.Backward: SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, Slot(1), Slot(0), 1'u64, - getStaticSlotCb(Slot(1)), collector(aq), nil) + getStaticSlotCb(Slot(1)), collector(aq)) let p1 = SomeTPeer() let p2 = SomeTPeer() @@ -312,11 +313,11 @@ suite "SyncManager test suite": of SyncQueueKind.Forward: SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, start, finish, chunkSize, - getStaticSlotCb(start), collector(aq), nil) + getStaticSlotCb(start), collector(aq)) of SyncQueueKind.Backward: SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, finish, start, chunkSize, - getStaticSlotCb(finish), collector(aq), nil) + getStaticSlotCb(finish), collector(aq)) chain = createChain(start, finish) validatorFut = case kkind @@ -383,12 +384,12 @@ suite "SyncManager test suite": of SyncQueueKind.Forward: SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, startSlot, finishSlot, chunkSize, - getStaticSlotCb(startSlot), collector(aq), nil, + getStaticSlotCb(startSlot), collector(aq), queueSize) of SyncQueueKind.Backward: SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, finishSlot, startSlot, chunkSize, - getStaticSlotCb(finishSlot), collector(aq), nil, + getStaticSlotCb(finishSlot), collector(aq), queueSize) validatorFut = case kkind @@ -491,11 +492,11 @@ suite "SyncManager test suite": of SyncQueueKind.Forward: SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, start, finish, chunkSize, - getFowardSafeSlotCb, collector(aq), nil) + getFowardSafeSlotCb, collector(aq)) of SyncQueueKind.Backward: SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, finish, start, chunkSize, - getBackwardSafeSlotCb, collector(aq), nil) + getBackwardSafeSlotCb, collector(aq)) chain = createChain(start, finish) validatorFut = case kkind @@ -584,11 +585,11 @@ suite "SyncManager test suite": of SyncQueueKind.Forward: SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, start, finish, chunkSize, - getFowardSafeSlotCb, collector(aq), nil) + getFowardSafeSlotCb, collector(aq)) of SyncQueueKind.Backward: SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, finish, start, chunkSize, - getBackwardSafeSlotCb, collector(aq), nil) + getBackwardSafeSlotCb, collector(aq)) chain = createChain(start, finish) validatorFut = failingValidator(aq) @@ -735,7 +736,7 @@ suite "SyncManager test suite": chain = createChain(startSlot, finishSlot) queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, startSlot, finishSlot, chunkSize, - getStaticSlotCb(startSlot), collector(aq), nil, + getStaticSlotCb(startSlot), collector(aq), queueSize) validatorFut = forwardValidator(aq) @@ -850,7 +851,7 @@ suite "SyncManager test suite": chain = createChain(startSlot, finishSlot) queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, startSlot, finishSlot, chunkSize, - getStaticSlotCb(startSlot), collector(aq), nil, + getStaticSlotCb(startSlot), collector(aq), queueSize) validatorFut = forwardValidator(aq) @@ -909,7 +910,7 @@ suite "SyncManager test suite": chain = createChain(startSlot, finishSlot) queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, finishSlot, startSlot, chunkSize, - getSafeSlot, collector(aq), nil, queueSize) + getSafeSlot, collector(aq), queueSize) validatorFut = backwardValidator(aq) let @@ -1097,7 +1098,7 @@ suite "SyncManager test suite": var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), nil, 2) + collector(aq), 2) let finalizedSlot = start_slot(Epoch(0'u64)) let startSlot = start_slot(Epoch(0'u64)) + 1'u64 let finishSlot = start_slot(Epoch(2'u64)) @@ -1109,7 +1110,7 @@ suite "SyncManager test suite": var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), nil, 2) + collector(aq), 2) let finalizedSlot = start_slot(Epoch(1'u64)) let startSlot = start_slot(Epoch(1'u64)) + 1'u64 let finishSlot = start_slot(Epoch(3'u64)) @@ -1121,7 +1122,7 @@ suite "SyncManager test suite": var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), nil, 2) + collector(aq), 2) let finalizedSlot = start_slot(Epoch(0'u64)) let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64) let failEpoch = epoch(failSlot) @@ -1139,7 +1140,7 @@ suite "SyncManager test suite": var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward, Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64), 1'u64, getStaticSlotCb(Slot(0)), - collector(aq), nil, 2) + collector(aq), 2) let finalizedSlot = start_slot(Epoch(1'u64)) let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64) let failEpoch = epoch(failSlot) @@ -1158,7 +1159,7 @@ suite "SyncManager test suite": let getSafeSlot = getStaticSlotCb(Slot(1024)) var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward, Slot(1024), Slot(0), - 1'u64, getSafeSlot, collector(aq), nil, 2) + 1'u64, getSafeSlot, collector(aq), 2) let safeSlot = getSafeSlot() for i in countdown(1023, 0): check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot