introduce ForkedBlobSidecar for EIP-7688 Electra period before PeerDAS

On `ELECTRA_FORK_EPOCH`, PeerDAS is not yet activated, hence the current
mechanism based on `BlobSidecar` is still in use. With EIP-7688, the
generalized indices of `BeaconBlockBody` get reindexed, changing the
length of the inclusion proof within the `BlobSidecar`. Because network
Req/Resp operations allow responses across fork boundaries, this creates
the need for a `ForkedBlobSidecar` in that layer, same as already done
for `ForkedSignedBeaconBock` for similar reasons.

Note: This PR is only needed if PeerDAS is adopted _after_ EIP-7688.
If PeerDAS is adopted _before_ EIP-7688, a similar PR may be needed for
forked columns. Coincidental `Forked` jank can only be fully avoided if
both features activate at the same epoch, actual changes to blobs aside.
Delaying EIP-7688 for sole purpose of epoch alignemnt is not worth it.
This commit is contained in:
Etan Kissling 2024-07-25 18:51:22 +02:00
parent 8c621b9ae6
commit d028baea2a
No known key found for this signature in database
GPG Key ID: B21DA824C5A3D03D
19 changed files with 548 additions and 310 deletions

View File

@ -55,7 +55,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
+ sanity check Deneb states [Preset: mainnet] OK
+ sanity check Deneb states, reusing buffers [Preset: mainnet] OK
+ sanity check Electra blocks [Preset: mainnet] OK
+ sanity check blobs [Preset: mainnet] OK
+ sanity check blobs (Deneb) [Preset: mainnet] OK
+ sanity check genesis roundtrip [Preset: mainnet] OK
+ sanity check phase 0 blocks [Preset: mainnet] OK
+ sanity check phase 0 getState rollback [Preset: mainnet] OK

View File

@ -114,7 +114,7 @@ type
keyValues: KvStoreRef # Random stuff using DbKeyKind - suitable for small values mainly!
blocks: array[ConsensusFork, KvStoreRef] # BlockRoot -> TrustedSignedBeaconBlock
blobs: KvStoreRef # (BlockRoot -> BlobSidecar)
blobs: array[BlobFork, KvStoreRef] # (BlockRoot -> BlobSidecar)
stateRoots: KvStoreRef # (Slot, BlockRoot) -> StateRoot
@ -559,9 +559,9 @@ proc new*(T: type BeaconChainDB,
sealedPeriods: "lc_sealed_periods")).expectDb()
static: doAssert LightClientDataFork.high == LightClientDataFork.Electra
var blobs : KvStoreRef
var blobs: array[BlobFork, KvStoreRef]
if cfg.DENEB_FORK_EPOCH != FAR_FUTURE_EPOCH:
blobs = kvStore db.openKvStore("deneb_blobs").expectDb()
blobs[BlobFork.Deneb] = kvStore db.openKvStore("deneb_blobs").expectDb()
# Versions prior to 1.4.0 (altair) stored validators in `immutable_validators`
# which stores validator keys in compressed format - this is
@ -765,8 +765,9 @@ proc close*(db: BeaconChainDB) =
if db.db == nil: return
# Close things roughly in reverse order
if not isNil(db.blobs):
discard db.blobs.close()
for blobFork in BlobFork:
if not isNil(db.blobs[blobFork]):
discard db.blobs[blobFork].close()
db.lcData.close()
db.finalizedBlocks.close()
discard db.summaries.close()
@ -812,16 +813,19 @@ proc putBlock*(
db.blocks[type(value).kind].putSZSSZ(value.root.data, value)
db.putBeaconBlockSummary(value.root, value.message.toBeaconBlockSummary())
proc putBlobSidecar*(
db: BeaconChainDB,
value: BlobSidecar) =
proc putBlobSidecar*[T: ForkyBlobSidecar](
db: BeaconChainDB, value: T) =
let block_root = hash_tree_root(value.signed_block_header.message)
db.blobs.putSZSSZ(blobkey(block_root, value.index), value)
db.blobs[T.kind].putSZSSZ(blobkey(block_root, value.index), value)
proc delBlobSidecar*(
db: BeaconChainDB,
root: Eth2Digest, index: BlobIndex): bool =
db.blobs.del(blobkey(root, index)).expectDb()
var res = false
for blobFork in BlobFork:
if db.blobs[blobFork].del(blobkey(root, index)).expectDb():
res = true
res
proc updateImmutableValidators*(
db: BeaconChainDB, validators: openArray[Validator]) =
@ -1070,16 +1074,18 @@ proc getBlockSSZ*(
withConsensusFork(fork):
getBlockSSZ(db, key, data, consensusFork.TrustedSignedBeaconBlock)
proc getBlobSidecarSZ*(db: BeaconChainDB, root: Eth2Digest, index: BlobIndex,
data: var seq[byte]): bool =
proc getBlobSidecarSZ*[T: ForkyBlobSidecar](
db: BeaconChainDB, root: Eth2Digest, index: BlobIndex,
data: var seq[byte]): bool =
let dataPtr = addr data # Short-lived
func decode(data: openArray[byte]) =
assign(dataPtr[], data)
db.blobs.get(blobkey(root, index), decode).expectDb()
db.blobs[T.kind].get(blobkey(root, index), decode).expectDb()
proc getBlobSidecar*(db: BeaconChainDB, root: Eth2Digest, index: BlobIndex,
value: var BlobSidecar): bool =
db.blobs.getSZSSZ(blobkey(root, index), value) == GetResult.found
proc getBlobSidecar*[T: ForkyBlobSidecar](
db: BeaconChainDB, root: Eth2Digest, index: BlobIndex,
value: var T): bool =
db.blobs[T.kind].getSZSSZ(blobkey(root, index), value) == GetResult.found
proc getBlockSZ*(
db: BeaconChainDB, key: Eth2Digest, data: var seq[byte],

View File

@ -21,8 +21,8 @@ const
type
BlobQuarantine* = object
blobs*:
OrderedTable[(Eth2Digest, BlobIndex, KzgCommitment), ref BlobSidecar]
blobs*: OrderedTable[
(Eth2Digest, BlobIndex, KzgCommitment), ForkedBlobSidecar]
onBlobSidecarCallback*: OnBlobSidecarCallback
BlobFetchRecord* = object
@ -38,7 +38,7 @@ func shortLog*(x: seq[BlobIndex]): string =
func shortLog*(x: seq[BlobFetchRecord]): string =
"[" & x.mapIt(shortLog(it.block_root) & shortLog(it.indices)).join(", ") & "]"
func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) =
func put*(quarantine: var BlobQuarantine, blobSidecar: ForkedBlobSidecar) =
if quarantine.blobs.lenu64 >= MaxBlobs:
# FIFO if full. For example, sync manager and request manager can race to
# put blobs in at the same time, so one gets blob insert -> block resolve
@ -53,43 +53,61 @@ func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) =
oldest_blob_key = k
break
quarantine.blobs.del oldest_blob_key
let block_root = hash_tree_root(blobSidecar.signed_block_header.message)
discard quarantine.blobs.hasKeyOrPut(
(block_root, blobSidecar.index, blobSidecar.kzg_commitment), blobSidecar)
withForkyBlob(blobSidecar):
let block_root = hash_tree_root(forkyBlob[].signed_block_header.message)
discard quarantine.blobs.hasKeyOrPut(
(block_root, forkyBlob[].index, forkyBlob[].kzg_commitment), blobSidecar)
func put*(quarantine: var BlobQuarantine, blobSidecar: ref ForkyBlobSidecar) =
quarantine.put(ForkedBlobSidecar.init(blobSidecar))
func hasBlob*(
quarantine: BlobQuarantine,
slot: Slot,
proposer_index: uint64,
index: BlobIndex): bool =
for blob_sidecar in quarantine.blobs.values:
template block_header: untyped = blob_sidecar.signed_block_header.message
if block_header.slot == slot and
block_header.proposer_index == proposer_index and
blob_sidecar.index == index:
return true
for blobSidecar in quarantine.blobs.values:
withForkyBlob(blobSidecar):
template block_header: untyped = forkyBlob[].signed_block_header.message
if block_header.slot == slot and
block_header.proposer_index == proposer_index and
forkyBlob[].index == index:
return true
false
func popBlobs*(
quarantine: var BlobQuarantine, digest: Eth2Digest,
blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock):
seq[ref BlobSidecar] =
var r: seq[ref BlobSidecar] = @[]
blck:
deneb.SignedBeaconBlock |
electra.SignedBeaconBlock): auto =
const blobFork = blobForkAtConsensusFork(typeof(blck).kind).expect("Blobs OK")
type ResultType = blobFork.BlobSidecars
var r: ResultType = @[]
for idx, kzg_commitment in blck.message.body.blob_kzg_commitments:
var b: ref BlobSidecar
var b: ForkedBlobSidecar
if quarantine.blobs.pop((digest, BlobIndex idx, kzg_commitment), b):
r.add(b)
# It was already verified that the blob is linked to `blck`.
# Therefore, we can assume that `BlobFork` is correct.
doAssert b.kind == blobFork,
"Must verify blob inclusion proof before `BlobQuarantine.put`"
r.add(b.forky(blobFork))
r
func hasBlobs*(quarantine: BlobQuarantine,
blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): bool =
func hasBlobs*(
quarantine: BlobQuarantine,
blck:
deneb.SignedBeaconBlock |
electra.SignedBeaconBlock): bool =
for idx, kzg_commitment in blck.message.body.blob_kzg_commitments:
if (blck.root, BlobIndex idx, kzg_commitment) notin quarantine.blobs:
return false
true
func blobFetchRecord*(quarantine: BlobQuarantine,
blck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock): BlobFetchRecord =
func blobFetchRecord*(
quarantine: BlobQuarantine,
blck:
deneb.SignedBeaconBlock |
electra.SignedBeaconBlock): BlobFetchRecord =
var indices: seq[BlobIndex]
for i in 0..<len(blck.message.body.blob_kzg_commitments):
let idx = BlobIndex(i)

View File

@ -56,7 +56,7 @@ const
type
BlockEntry = object
blck*: ForkedSignedBeaconBlock
blobs*: Opt[BlobSidecars]
blobs*: Opt[ForkedBlobSidecars]
maybeFinalized*: bool
## The block source claims the block has been finalized already
resfut*: Future[Result[void, VerifierError]].Raising([CancelledError])
@ -173,7 +173,12 @@ from ../consensus_object_pools/block_clearance import
proc storeBackfillBlock(
self: var BlockProcessor,
signedBlock: ForkySignedBeaconBlock,
blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
blobsOpt: Opt[ForkyBlobSidecars]
): Result[void, VerifierError] =
const
consensusFork = typeof(signedBlock).kind
blobFork = blobForkAtConsensusFork(consensusFork).get(BlobFork.Deneb)
static: doAssert typeof(blobsOpt).T is blobFork.BlobSidecars
# The block is certainly not missing any more
self.consensusManager.quarantine[].missing.del(signedBlock.root)
@ -181,7 +186,7 @@ proc storeBackfillBlock(
# Establish blob viability before calling addbackfillBlock to avoid
# writing the block in case of blob error.
var blobsOk = true
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
when consensusFork >= ConsensusFork.Deneb:
if blobsOpt.isSome:
let blobs = blobsOpt.get()
let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
@ -220,7 +225,7 @@ proc storeBackfillBlock(
return res
# Only store blobs after successfully establishing block viability.
let blobs = blobsOpt.valueOr: BlobSidecars @[]
let blobs = blobsOpt.valueOr: blobFork.BlobSidecars() @[]
for b in blobs:
self.consensusManager.dag.db.putBlobSidecar(b[])
@ -381,17 +386,42 @@ proc checkBloblessSignature(
return err("checkBloblessSignature: Invalid proposer signature")
ok()
template withForkyBlckAndBlobs(
blck: ForkedSignedBeaconBlock,
blobs: Opt[ForkedBlobSidecars],
body: untyped): untyped =
withBlck(blck):
when consensusFork >= ConsensusFork.Deneb:
const blobFork = blobForkAtConsensusFork(consensusFork).expect("Blobs OK")
let forkyBlobs {.inject, used.} =
if blobs.isSome:
# Nim 2.0.8: `forks.BlobSidecars(blobFork)` does not work here:
# > type mismatch: got 'BlobFork' for 'blobFork`gensym15'
# but expected 'BlobSidecars'
var fBlobs: deneb.BlobSidecars
for blob in blobs.get:
doAssert blob.kind == blobFork,
"Must verify blob inclusion proof before `enqueueBlock`"
fBlobs.add blob.forky(blobFork)
Opt.some fBlobs
else:
Opt.none deneb.BlobSidecars
else:
doAssert blobs.isNone, "Blobs are not supported before Deneb"
let forkyBlobs {.inject, used.} = Opt.none deneb.BlobSidecars
body
proc enqueueBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars],
blobs: Opt[ForkedBlobSidecars],
resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil,
maybeFinalized = false,
validationDur = Duration()) =
withBlck(blck):
withForkyBlckAndBlobs(blck, blobs):
if forkyBlck.message.slot <= self.consensusManager.dag.finalizedHead.slot:
# let backfill blocks skip the queue - these are always "fast" to process
# because there are no state rewinds to deal with
let res = self.storeBackfillBlock(forkyBlck, blobs)
let res = self.storeBackfillBlock(forkyBlck, forkyBlobs)
resfut.complete(res)
return
@ -409,14 +439,20 @@ proc enqueueBlock*(
proc storeBlock(
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
signedBlock: ForkySignedBeaconBlock,
blobsOpt: Opt[BlobSidecars],
blobsOpt: Opt[ForkyBlobSidecars],
maybeFinalized = false,
queueTick: Moment = Moment.now(), validationDur = Duration()):
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async: (raises: [CancelledError]).} =
queueTick: Moment = Moment.now(),
validationDur = Duration()
): Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.
async: (raises: [CancelledError]).} =
## storeBlock is the main entry point for unvalidated blocks - all untrusted
## blocks, regardless of origin, pass through here. When storing a block,
## we will add it to the dag and pass it to all block consumers that need
## to know about it, such as the fork choice and the monitoring
const
consensusFork = typeof(signedBlock).kind
blobFork = blobForkAtConsensusFork(consensusFork).get(BlobFork.Deneb)
static: doAssert typeof(blobsOpt).T is blobFork.BlobSidecars
let
attestationPool = self.consensusManager.attestationPool
@ -497,16 +533,18 @@ proc storeBlock(
let blobs =
withBlck(parentBlck.get()):
when consensusFork >= ConsensusFork.Deneb:
var blob_sidecars: BlobSidecars
const blobFork =
blobForkAtConsensusFork(consensusFork).expect("Blobs OK")
var blob_sidecars: ForkedBlobSidecars
for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len:
let blob = BlobSidecar.new()
let blob = blobFork.BlobSidecar.new()
if not dag.db.getBlobSidecar(parent_root, i.BlobIndex, blob[]):
blobsOk = false # Pruned, or inconsistent DB
break
blob_sidecars.add blob
blob_sidecars.add ForkedBlobSidecar.init(blob)
Opt.some blob_sidecars
else:
Opt.none BlobSidecars
Opt.none ForkedBlobSidecars
if blobsOk:
debug "Loaded parent block from storage", parent_root
self[].enqueueBlock(
@ -772,11 +810,11 @@ proc storeBlock(
withBlck(quarantined):
when typeof(forkyBlck).kind < ConsensusFork.Deneb:
self[].enqueueBlock(
MsgSource.gossip, quarantined, Opt.none(BlobSidecars))
MsgSource.gossip, quarantined, Opt.none(ForkedBlobSidecars))
else:
if len(forkyBlck.message.body.blob_kzg_commitments) == 0:
self[].enqueueBlock(
MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]))
MsgSource.gossip, quarantined, Opt.some(ForkedBlobSidecars @[]))
else:
if (let res = checkBloblessSignature(self[], forkyBlck); res.isErr):
warn "Failed to verify signature of unorphaned blobless block",
@ -784,8 +822,9 @@ proc storeBlock(
error = res.error()
continue
if self.blobQuarantine[].hasBlobs(forkyBlck):
let blobs = self.blobQuarantine[].popBlobs(
forkyBlck.root, forkyBlck)
let blobs = self.blobQuarantine[]
.popBlobs(forkyBlck.root, forkyBlck)
.mapIt(ForkedBlobSidecar.init(it))
self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs))
else:
discard self.consensusManager.quarantine[].addBlobless(
@ -798,8 +837,10 @@ proc storeBlock(
proc addBlock*(
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized = false,
validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
blobs: Opt[ForkedBlobSidecars], maybeFinalized = false,
validationDur = Duration()
): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true).} =
## Enqueue a Gossip-validated block for consensus verification
# Backpressure:
# There is no backpressure here - producers must wait for `resfut` to
@ -829,9 +870,9 @@ proc processBlock(
error "Processing block before genesis, clock turned back?"
quit 1
let res = withBlck(entry.blck):
let res = withForkyBlckAndBlobs(entry.blck, entry.blobs):
await self.storeBlock(
entry.src, wallTime, forkyBlck, entry.blobs, entry.maybeFinalized,
entry.src, wallTime, forkyBlck, forkyBlobs, entry.maybeFinalized,
entry.queueTick, entry.validationDur)
if res.isErr and res.error[1] == ProcessingStatus.notCompleted:

View File

@ -8,7 +8,7 @@
{.push raises: [].}
import
std/tables,
std/[sequtils, tables],
chronicles, chronos, metrics,
taskpools,
../spec/[helpers, forks],
@ -244,13 +244,15 @@ proc processSignedBeaconBlock*(
let blobs =
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
if self.blobQuarantine[].hasBlobs(signedBlock):
Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root, signedBlock))
Opt.some(self.blobQuarantine[]
.popBlobs(signedBlock.root, signedBlock)
.mapIt(ForkedBlobSidecar.init(it)))
else:
discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
signedBlock)
return v
else:
Opt.none(BlobSidecars)
Opt.none(ForkedBlobSidecars)
self.blockProcessor[].enqueueBlock(
src, ForkedSignedBeaconBlock.init(signedBlock),
@ -308,7 +310,9 @@ proc processBlobSidecar*(
if self.blobQuarantine[].hasBlobs(forkyBlck):
self.blockProcessor[].enqueueBlock(
MsgSource.gossip, blobless,
Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck)))
Opt.some(self.blobQuarantine[]
.popBlobs(block_root, forkyBlck)
.mapIt(ForkedBlobSidecar.init(it))))
else:
discard self.quarantine[].addBlobless(
self.dag.finalizedHead.slot, forkyBlck)

View File

@ -2691,7 +2691,7 @@ proc broadcastBeaconBlock*(
node.broadcast(topic, blck)
proc broadcastBlobSidecar*(
node: Eth2Node, subnet_id: BlobId, blob: deneb.BlobSidecar):
node: Eth2Node, subnet_id: BlobId, blob: ForkyBlobSidecar):
Future[SendResult] {.async: (raises: [CancelledError], raw: true).} =
let
contextEpoch = blob.signed_block_header.message.slot.epoch

View File

@ -8,7 +8,7 @@
{.push raises: [].}
import
std/[os, random, terminal, times],
std/[os, random, sequtils, terminal, times],
chronos, chronicles,
metrics, metrics/chronos_httpserver,
stew/[byteutils, io2],
@ -403,18 +403,22 @@ proc initFullNode(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
rng, taskpool, consensusManager, node.validatorMonitor,
blobQuarantine, getBeaconTime)
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
blockVerifier = proc(
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[ForkedBlobSidecars],
maybeFinalized: bool
): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true).} =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
# taken in the sync/request managers - this is an architectural compromise
# that should probably be reimagined more holistically in the future.
blockProcessor[].addBlock(
MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized)
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
rmanBlockVerifier = proc(
signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool
): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError]).} =
withBlck(signedBlock):
when consensusFork >= ConsensusFork.Deneb:
if not blobQuarantine[].hasBlobs(forkyBlck):
@ -425,24 +429,27 @@ proc initFullNode(
else:
err(VerifierError.MissingParent)
else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs),
maybeFinalized = maybeFinalized)
let blobs = blobQuarantine[]
.popBlobs(forkyBlck.root, forkyBlck)
.mapIt(ForkedBlobSidecar.init(newClone(it)))
await blockProcessor[].addBlock(
MsgSource.gossip, signedBlock, Opt.some(blobs),
maybeFinalized = maybeFinalized)
else:
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(BlobSidecars),
maybeFinalized = maybeFinalized)
await blockProcessor[].addBlock(
MsgSource.gossip, signedBlock, Opt.none(ForkedBlobSidecars),
maybeFinalized = maybeFinalized)
rmanBlockLoader = proc(
blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] =
dag.getForkedBlock(blockRoot)
rmanBlobLoader = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] =
var blob_sidecar = BlobSidecar.new()
if dag.db.getBlobSidecar(blobId.block_root, blobId.index, blob_sidecar[]):
Opt.some blob_sidecar
else:
Opt.none(ref BlobSidecar)
blobId: BlobIdentifier): Opt[ForkedBlobSidecar] =
withAll(BlobFork):
var blob_sidecar = blobFork.BlobSidecar.new()
if dag.db.getBlobSidecar(
blobId.block_root, blobId.index, blob_sidecar[]):
return Opt.some ForkedBlobSidecar.init(blob_sidecar)
Opt.none(ForkedBlobSidecar)
processor = Eth2Processor.new(
config.doppelgangerDetection,
@ -1913,15 +1920,24 @@ proc installMessageValidators(node: BeaconNode) =
MsgSource.gossip, msg)))
when consensusFork >= ConsensusFork.Deneb:
const blobFork =
blobForkAtConsensusFork(consensusFork).expect("Blobs OK")
# blob_sidecar_{subnet_id}
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blob_sidecar_subnet_id
for it in BlobId:
closureScope: # Needed for inner `proc`; don't lift it out of loop.
let subnet_id = it
let
contextFork = consensusFork
subnet_id = it
node.network.addValidator(
getBlobSidecarTopic(digest, subnet_id), proc (
blobSidecar: deneb.BlobSidecar
blobSidecar: blobFork.BlobSidecar
): ValidationResult =
if contextFork != node.dag.cfg.consensusForkAtEpoch(
blobSidecar.signed_block_header.message.slot.epoch):
return ValidationResult.Reject
toValidationResult(
node.processor[].processBlobSidecar(
MsgSource.gossip, blobSidecar, subnet_id)))

View File

@ -1518,29 +1518,32 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
return RestApiResponse.jsonError(Http406, ContentNotAcceptableError)
res.get()
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/types/deneb/blob_sidecar.yaml#L2-L28
let data = newClone(default(List[BlobSidecar, Limit MAX_BLOBS_PER_BLOCK]))
consensusFork = node.dag.cfg.consensusForkAtEpoch(bid.slot.epoch)
if indices.isErr:
return RestApiResponse.jsonError(Http400,
InvalidSidecarIndexValueError)
withBlobFork(blobForkAtConsensusFork(consensusFork).get(BlobFork.Deneb)):
# https://github.com/ethereum/beacon-APIs/blob/v2.4.2/types/deneb/blob_sidecar.yaml#L2-L28
let data = newClone(
default(List[blobFork.BlobSidecar, Limit MAX_BLOBS_PER_BLOCK]))
let indexFilter = indices.get.toHashSet
if indices.isErr:
return RestApiResponse.jsonError(Http400,
InvalidSidecarIndexValueError)
for blobIndex in 0'u64 ..< MAX_BLOBS_PER_BLOCK:
if indexFilter.len > 0 and blobIndex notin indexFilter:
continue
let indexFilter = indices.get.toHashSet
var blobSidecar = new BlobSidecar
for blobIndex in 0'u64 ..< MAX_BLOBS_PER_BLOCK:
if indexFilter.len > 0 and blobIndex notin indexFilter:
continue
if node.dag.db.getBlobSidecar(bid.root, blobIndex, blobSidecar[]):
discard data[].add blobSidecar[]
var blobSidecar = new blobFork.BlobSidecar
if contentType == sszMediaType:
RestApiResponse.sszResponse(
data[], headers = [("eth-consensus-version",
node.dag.cfg.consensusForkAtEpoch(bid.slot.epoch).toString())])
elif contentType == jsonMediaType:
RestApiResponse.jsonResponse(data)
else:
RestApiResponse.jsonError(Http500, InvalidAcceptError)
if node.dag.db.getBlobSidecar(bid.root, blobIndex, blobSidecar[]):
discard data[].add blobSidecar[]
if contentType == sszMediaType:
RestApiResponse.sszResponse(data[], headers = [
("eth-consensus-version", consensusFork.toString())])
elif contentType == jsonMediaType:
RestApiResponse.jsonResponse(data)
else:
RestApiResponse.jsonError(Http500, InvalidAcceptError)

View File

@ -47,7 +47,6 @@ RestJson.useDefaultSerializationFor(
AttestationData,
BLSToExecutionChange,
BeaconBlockHeader,
BlobSidecar,
BlobSidecarInfoObject,
BlobsBundle,
Checkpoint,
@ -228,6 +227,7 @@ RestJson.useDefaultSerializationFor(
deneb.BeaconBlock,
deneb.BeaconBlockBody,
deneb.BeaconState,
deneb.BlobSidecar,
deneb.BlockContents,
deneb.ExecutionPayload,
deneb.ExecutionPayloadHeader,

View File

@ -274,6 +274,22 @@ type
ForkyMsgTrustedSignedBeaconBlock |
ForkyTrustedSignedBeaconBlock
BlobFork* {.pure.} = enum
Deneb
ForkyBlobSidecar* =
deneb.BlobSidecar
ForkyBlobSidecars* =
deneb.BlobSidecars
ForkedBlobSidecar* = object
case kind*: BlobFork
of BlobFork.Deneb:
denebData*: ref deneb.BlobSidecar
ForkedBlobSidecars* = seq[ForkedBlobSidecar]
EpochInfoFork* {.pure.} = enum
Phase0
Altair
@ -815,6 +831,75 @@ static:
for fork in ConsensusFork:
doAssert ConsensusFork.init(fork.toString()).expect("init defined") == fork
template kind*(x: typedesc[deneb.BlobSidecar]): BlobFork =
BlobFork.Deneb
template kzg_commitment_inclusion_proof_gindex*(
kind: static BlobFork, index: BlobIndex): GeneralizedIndex =
when kind == BlobFork.Deneb:
deneb.kzg_commitment_inclusion_proof_gindex(index)
else:
{.error: "kzg_commitment_inclusion_proof_gindex does not support " & $kind.}
template BlobSidecar*(kind: static BlobFork): auto =
when kind == BlobFork.Deneb:
typedesc[deneb.BlobSidecar]
else:
{.error: "BlobSidecar does not support " & $kind.}
template BlobSidecars*(kind: static BlobFork): auto =
when kind == BlobFork.Deneb:
typedesc[deneb.BlobSidecars]
else:
{.error: "BlobSidecars does not support " & $kind.}
template withAll*(x: typedesc[BlobFork], body: untyped): untyped =
static: doAssert BlobFork.high == BlobFork.Deneb
block:
const blobFork {.inject, used.} = BlobFork.Deneb
body
template withBlobFork*(x: BlobFork, body: untyped): untyped =
case x
of BlobFork.Deneb:
const blobFork {.inject, used.} = BlobFork.Deneb
body
template withForkyBlob*(x: ForkedBlobSidecar, body: untyped): untyped =
case x.kind
of BlobFork.Deneb:
const blobFork {.inject, used.} = BlobFork.Deneb
template forkyBlob: untyped {.inject, used.} = x.denebData
body
func init*(
x: typedesc[ForkedBlobSidecar],
forkyData: ref ForkyBlobSidecar): ForkedBlobSidecar =
const kind = typeof(forkyData[]).kind
when kind == BlobFork.Deneb:
ForkedBlobSidecar(kind: kind, denebData: forkyData)
else:
{.error: "ForkedBlobSidecar.init does not support " & $kind.}
template forky*(x: ForkedBlobSidecar, kind: static BlobFork): untyped =
when kind == BlobFork.Deneb:
x.denebData
else:
{.error: "ForkedBlobSidecar.forky does not support " & $kind.}
func shortLog*[T: ForkedBlobSidecar](x: T): auto =
type ResultType = object
case kind: BlobFork
of BlobFork.Deneb:
denebData: typeof(x.denebData.shortLog())
let xKind = x.kind # https://github.com/nim-lang/Nim/issues/23762
case xKind
of BlobFork.Deneb:
ResultType(kind: xKind, denebData: x.denebData.shortLog())
chronicles.formatIt ForkedBlobSidecar: it.shortLog
template init*(T: type ForkedEpochInfo, info: phase0.EpochInfo): T =
T(kind: EpochInfoFork.Phase0, phase0Data: info)
template init*(T: type ForkedEpochInfo, info: altair.EpochInfo): T =
@ -1323,6 +1408,13 @@ func forkVersion*(cfg: RuntimeConfig, consensusFork: ConsensusFork): Version =
of ConsensusFork.Deneb: cfg.DENEB_FORK_VERSION
of ConsensusFork.Electra: cfg.ELECTRA_FORK_VERSION
func blobForkAtConsensusFork*(consensusFork: ConsensusFork): Opt[BlobFork] =
static: doAssert BlobFork.high == BlobFork.Deneb
if consensusFork >= ConsensusFork.Deneb:
Opt.some BlobFork.Deneb
else:
Opt.none BlobFork
func lcDataForkAtConsensusFork*(
consensusFork: ConsensusFork): LightClientDataFork =
static: doAssert LightClientDataFork.high == LightClientDataFork.Electra

View File

@ -11,7 +11,7 @@
import
# Status libraries
stew/[byteutils, endians2, objects],
stew/[bitops2, byteutils, endians2, objects],
chronicles,
eth/common/[eth_types, eth_types_rlp],
eth/rlp, eth/trie/[db, hexary],
@ -223,12 +223,13 @@ func has_flag*(flags: ParticipationFlags, flag_index: TimelyFlag): bool =
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/deneb/p2p-interface.md#check_blob_sidecar_inclusion_proof
func verify_blob_sidecar_inclusion_proof*(
blob_sidecar: BlobSidecar): Result[void, string] =
let gindex = kzg_commitment_inclusion_proof_gindex(blob_sidecar.index)
blob_sidecar: ForkyBlobSidecar): Result[void, string] =
let gindex = withBlobFork(typeof(blob_sidecar).kind):
blobFork.kzg_commitment_inclusion_proof_gindex(blob_sidecar.index)
if not is_valid_merkle_branch(
hash_tree_root(blob_sidecar.kzg_commitment),
blob_sidecar.kzg_commitment_inclusion_proof,
KZG_COMMITMENT_INCLUSION_PROOF_DEPTH,
log2trunc(gindex),
get_subtree_index(gindex),
blob_sidecar.signed_block_header.message.body_root):
return err("BlobSidecar: inclusion proof not valid")
@ -237,23 +238,28 @@ func verify_blob_sidecar_inclusion_proof*(
func create_blob_sidecars*(
forkyBlck: deneb.SignedBeaconBlock | electra.SignedBeaconBlock,
kzg_proofs: KzgProofs,
blobs: Blobs): seq[BlobSidecar] =
blobs: Blobs): auto =
const
consensusFork = typeof(forkyBlck).kind
blobFork = blobForkAtConsensusFork(consensusFork).expect("Blobs OK")
type ResultType = seq[blobFork.BlobSidecar]
template kzg_commitments: untyped =
forkyBlck.message.body.blob_kzg_commitments
doAssert kzg_proofs.len == blobs.len
doAssert kzg_proofs.len == kzg_commitments.len
var res = newSeqOfCap[BlobSidecar](blobs.len)
var res: ResultType = newSeqOfCap[blobFork.BlobSidecar](blobs.len)
let signedBlockHeader = forkyBlck.toSignedBeaconBlockHeader()
for i in 0 ..< blobs.lenu64:
var sidecar = BlobSidecar(
var sidecar = blobFork.BlobSidecar(
index: i,
blob: blobs[i],
kzg_commitment: kzg_commitments[i],
kzg_proof: kzg_proofs[i],
signed_block_header: signedBlockHeader)
forkyBlck.message.body.build_proof(
kzg_commitment_inclusion_proof_gindex(i),
blobFork.kzg_commitment_inclusion_proof_gindex(i),
sidecar.kzg_commitment_inclusion_proof).expect("Valid gindex")
res.add(sidecar)
res

View File

@ -47,7 +47,7 @@ type
): Opt[ForkedTrustedSignedBeaconBlock] {.gcsafe, raises: [].}
BlobLoaderFn* = proc(
blobId: BlobIdentifier): Opt[ref BlobSidecar] {.gcsafe, raises: [].}
blobId: BlobIdentifier): Opt[ForkedBlobSidecar] {.gcsafe, raises: [].}
InhibitFn* = proc: bool {.gcsafe, raises: [].}
@ -102,21 +102,23 @@ proc checkResponse(roots: openArray[Eth2Digest],
checks.del(res)
true
proc checkResponse(idList: seq[BlobIdentifier],
blobs: openArray[ref BlobSidecar]): bool =
proc checkResponse(
idList: seq[BlobIdentifier],
blobs: openArray[ForkedBlobSidecar]): bool =
if len(blobs) > len(idList):
return false
for blob in blobs:
let block_root = hash_tree_root(blob.signed_block_header.message)
var found = false
for id in idList:
if id.block_root == block_root and id.index == blob.index:
found = true
break
if not found:
return false
blob[].verify_blob_sidecar_inclusion_proof().isOkOr:
return false
withForkyBlob(blob):
let block_root = hash_tree_root(forkyBlob[].signed_block_header.message)
var found = false
for id in idList:
if id.block_root == block_root and id.index == forkyBlob[].index:
found = true
break
if not found:
return false
forkyBlob[].verify_blob_sidecar_inclusion_proof().isOkOr:
return false
true
proc requestBlocksByRoot(rman: RequestManager, items: seq[Eth2Digest]) {.async: (raises: [CancelledError]).} =
@ -214,7 +216,8 @@ proc fetchBlobsFromNetwork(self: RequestManager,
self.blobQuarantine[].put(b)
var curRoot: Eth2Digest
for b in ublobs:
let block_root = hash_tree_root(b.signed_block_header.message)
let block_root = withForkyBlob(b):
hash_tree_root(forkyBlob[].signed_block_header.message)
if block_root != curRoot:
curRoot = block_root
if (let o = self.quarantine[].popBlobless(curRoot); o.isSome):

View File

@ -88,7 +88,8 @@ type
BeaconBlocksRes =
NetRes[List[ref ForkedSignedBeaconBlock, Limit MAX_REQUEST_BLOCKS]]
BlobSidecarsRes = NetRes[List[ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)]]
BlobSidecarsRes =
NetRes[List[ForkedBlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)]]
proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slots: slots)
@ -225,12 +226,12 @@ proc remainingSlots(man: SyncManager): uint64 =
else:
0'u64
func groupBlobs*[T](req: SyncRequest[T],
blocks: seq[ref ForkedSignedBeaconBlock],
blobs: seq[ref BlobSidecar]):
Result[seq[BlobSidecars], string] =
func groupBlobs*[T](
req: SyncRequest[T],
blocks: seq[ref ForkedSignedBeaconBlock],
blobs: seq[ForkedBlobSidecar]): Result[seq[ForkedBlobSidecars], string] =
var
grouped = newSeq[BlobSidecars](len(blocks))
grouped = newSeq[ForkedBlobSidecars](len(blocks))
blob_cursor = 0
for block_idx, blck in blocks:
withBlck(blck[]):
@ -241,17 +242,23 @@ func groupBlobs*[T](req: SyncRequest[T],
# Clients MUST include all blob sidecars of each block from which they include blob sidecars.
# The following blob sidecars, where they exist, MUST be sent in consecutive (slot, index) order.
# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobsidecarsbyrange-v1
const expectedBlobFork =
blobForkAtConsensusFork(consensusFork).expect("Blobs OK")
let header = forkyBlck.toSignedBeaconBlockHeader()
for blob_idx, kzg_commitment in kzgs:
if blob_cursor >= blobs.len:
return err("BlobSidecar: response too short")
let blob_sidecar = blobs[blob_cursor]
if blob_sidecar.index != BlobIndex blob_idx:
return err("BlobSidecar: unexpected index")
if blob_sidecar.kzg_commitment != kzg_commitment:
return err("BlobSidecar: unexpected kzg_commitment")
if blob_sidecar.signed_block_header != header:
return err("BlobSidecar: unexpected signed_block_header")
withForkyBlob(blob_sidecar):
when blobFork != expectedBlobFork:
return err("BlobSidecar: unexpected data fork")
else:
if forkyBlob[].index != BlobIndex blob_idx:
return err("BlobSidecar: unexpected index")
if forkyBlob[].kzg_commitment != kzg_commitment:
return err("BlobSidecar: unexpected kzg_commitment")
if forkyBlob[].signed_block_header != header:
return err("BlobSidecar: unexpected signed_block_header")
grouped[block_idx].add(blob_sidecar)
inc blob_cursor
@ -259,14 +266,15 @@ func groupBlobs*[T](req: SyncRequest[T],
# we reached end of blocks without consuming all blobs so either
# the peer we got too few blocks in the paired request, or the
# peer is sending us spurious blobs.
Result[seq[BlobSidecars], string].err "invalid block or blob sequence"
Result[seq[ForkedBlobSidecars], string].err "invalid block or blob sequence"
else:
Result[seq[BlobSidecars], string].ok grouped
Result[seq[ForkedBlobSidecars], string].ok grouped
func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
func checkBlobs(blobs: seq[ForkedBlobSidecars]): Result[void, string] =
for blob_sidecars in blobs:
for blob_sidecar in blob_sidecars:
? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
withForkyBlob(blob_sidecar):
? forkyBlob[].verify_blob_sidecar_inclusion_proof()
ok()
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
@ -456,7 +464,8 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
blobs_map = blobSmap, request = req
if len(blobData) > 0:
let slots = mapIt(blobData, it[].signed_block_header.message.slot)
let slots = mapIt(blobData, it.withForkyBlob(
forkyBlob[].signed_block_header.message.slot))
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
if not(checkResponse(req, uniqueSlots)):
peer.updateScore(PeerScoreBadResponse)
@ -483,7 +492,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A)
return
Opt.some(groupedBlobs.get())
else:
Opt.none(seq[BlobSidecars])
Opt.none(seq[ForkedBlobSidecars])
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
req.contains(man.getSafeSlot()):

View File

@ -59,26 +59,29 @@ proc readChunkPayload*(
return err(res.error)
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref BlobSidecar)):
conn: Connection, peer: Peer, MsgType: type (ForkedBlobSidecar)):
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CatchableError:
return neterr UnexpectedEOF
let contextFork =
peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr:
let
contextFork =
peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr:
return neterr InvalidContextBytes
blobFork = blobForkAtConsensusFork(contextFork).valueOr:
return neterr InvalidContextBytes
withConsensusFork(contextFork):
when consensusFork >= ConsensusFork.Deneb:
let res = await readChunkPayload(conn, peer, BlobSidecar)
if res.isOk:
return ok newClone(res.get)
else:
return err(res.error)
withBlobFork(blobFork):
let res = await readChunkPayload(conn, peer, blobFork.BlobSidecar)
if res.isOk:
if contextFork != peer.network.cfg.consensusForkAtEpoch(
res.get.signed_block_header.message.slot.epoch):
return neterr InvalidContextBytes
return ok ForkedBlobSidecar.init(newClone(res.get))
else:
return neterr InvalidContextBytes
return err(res.error)
{.pop.} # TODO fix p2p macro for raises
@ -220,7 +223,7 @@ p2pProtocol BeaconSync(version = 1,
peer: Peer,
blobIds: BlobIdentifierList,
response: MultipleChunksResponse[
ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)])
ForkedBlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)])
{.async, libp2pProtocol("blob_sidecars_by_root", 1).} =
# TODO Semantically, this request should return a non-ref, but doing so
# runs into extreme inefficiency due to the compiler introducing
@ -231,7 +234,7 @@ p2pProtocol BeaconSync(version = 1,
# implementation (it's used to derive the signature of the client
# function, not in the code below!)
# TODO although you can't tell from this function definition, a magic
# client call that returns `seq[ref BlobSidecar]` will
# client call that returns `seq[ForkedBlobSidecar]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
trace "got blobs range request", peer, len = blobIds.len
@ -247,10 +250,17 @@ p2pProtocol BeaconSync(version = 1,
bytes: seq[byte]
for i in 0..<count:
let blockRef = dag.getBlockRef(blobIds[i].block_root).valueOr:
continue
let index = blobIds[i].index
if dag.db.getBlobSidecarSZ(blockRef.bid.root, index, bytes):
let
blockRef = dag.getBlockRef(blobIds[i].block_root).valueOr:
continue
consensusFork = dag.cfg.consensusForkAtEpoch(blockRef.bid.slot.epoch)
blobFork = blobForkAtConsensusFork(consensusFork).valueOr:
continue # Pre-Deneb
index = blobIds[i].index
ok = withBlobFork(blobFork):
getBlobSidecarSZ[blobFork.BlobSidecar](
dag.db, blockRef.bid.root, index, bytes)
if ok:
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read blob size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blockRef), blobindex = index
@ -273,14 +283,14 @@ p2pProtocol BeaconSync(version = 1,
startSlot: Slot,
reqCount: uint64,
response: MultipleChunksResponse[
ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)])
ForkedBlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)])
{.async, libp2pProtocol("blob_sidecars_by_range", 1).} =
# TODO This code is more complicated than it needs to be, since the type
# of the multiple chunks response is not actually used in this server
# implementation (it's used to derive the signature of the client
# function, not in the code below!)
# TODO although you can't tell from this function definition, a magic
# client call that returns `seq[ref BlobSidecar]` will
# client call that returns `seq[ForkedBlobSidecar]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
@ -311,8 +321,15 @@ p2pProtocol BeaconSync(version = 1,
bytes: seq[byte]
for i in startIndex..endIndex:
let
consensusFork = dag.cfg.consensusForkAtEpoch(blockIds[i].slot.epoch)
blobFork = blobForkAtConsensusFork(consensusFork).valueOr:
continue # Pre-Deneb
for j in 0..<MAX_BLOBS_PER_BLOCK:
if dag.db.getBlobSidecarSZ(blockIds[i].root, BlobIndex(j), bytes):
let ok = withBlobFork(blobFork):
getBlobSidecarSZ[blobFork.BlobSidecar](
dag.db, blockIds[i].root, BlobIndex(j), bytes)
if ok:
# In general, there is not much intermediate time between post-merge
# blocks all being optimistic and none of them being optimistic. The
# EL catches up, tells the CL the head is verified, and that's it.

View File

@ -26,9 +26,11 @@ type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [].}
GetBoolCallback* = proc(): bool {.gcsafe, raises: [].}
ProcessingCallback* = proc() {.gcsafe, raises: [].}
BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[BlobSidecars], maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
BlockVerifier* = proc(
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[ForkedBlobSidecars],
maybeFinalized: bool
): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).}
SyncQueueKind* {.pure.} = enum
Forward, Backward
@ -43,7 +45,7 @@ type
SyncResult*[T] = object
request*: SyncRequest[T]
data*: seq[ref ForkedSignedBeaconBlock]
blobs*: Opt[seq[BlobSidecars]]
blobs*: Opt[seq[ForkedBlobSidecars]]
GapItem*[T] = object
start*: Slot
@ -90,8 +92,8 @@ chronicles.expandIt SyncRequest:
peer = shortLog(it.item)
direction = toLowerAscii($it.kind)
proc getShortMap*[T](req: SyncRequest[T],
data: openArray[ref ForkedSignedBeaconBlock]): string =
proc getShortMap*[T](
req: SyncRequest[T], data: openArray[ref ForkedSignedBeaconBlock]): string =
## Returns all slot numbers in ``data`` as placement map.
var res = newStringOfCap(req.count)
var slider = req.slot
@ -111,8 +113,8 @@ proc getShortMap*[T](req: SyncRequest[T],
slider = slider + 1
res
proc getShortMap*[T](req: SyncRequest[T],
data: openArray[ref BlobSidecar]): string =
proc getShortMap*[T](
req: SyncRequest[T], data: openArray[ForkedBlobSidecar]): string =
## Returns all slot numbers in ``data`` as placement map.
var res = newStringOfCap(req.count * MAX_BLOBS_PER_BLOCK)
var cur : uint64 = 0
@ -120,9 +122,11 @@ proc getShortMap*[T](req: SyncRequest[T],
if cur >= lenu64(data):
res.add('|')
continue
if slot == data[cur].signed_block_header.message.slot:
let blobSlot = withForkyBlob(data[cur]):
forkyBlob[].signed_block_header.message.slot
if slot == blobSlot:
for k in cur..<cur+MAX_BLOBS_PER_BLOCK:
if k >= lenu64(data) or slot != data[k].signed_block_header.message.slot:
if k >= lenu64(data) or slot != blobSlot:
res.add('|')
break
else:
@ -541,14 +545,16 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
# This belongs inside the blocks iterator below, but can't be there due to
# https://github.com/nim-lang/Nim/issues/21242
func getOpt(blobs: Opt[seq[BlobSidecars]], i: int): Opt[BlobSidecars] =
func getOpt(
blobs: Opt[seq[ForkedBlobSidecars]], i: int): Opt[ForkedBlobSidecars] =
if blobs.isSome:
Opt.some(blobs.get()[i])
else:
Opt.none(BlobSidecars)
Opt.none(ForkedBlobSidecars)
iterator blocks[T](sq: SyncQueue[T],
sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) =
iterator blocks[T](
sq: SyncQueue[T],
sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[ForkedBlobSidecars]) =
case sq.kind
of SyncQueueKind.Forward:
for i in countup(0, len(sr.data) - 1):
@ -607,11 +613,13 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
# Entire request is still relevant.
0
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[BlobSidecars]],
maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} =
proc push*[T](
sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[ForkedBlobSidecars]],
maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil
) {.async: (raises: [CancelledError]).} =
logScope:
sync_ident = sq.ident
topics = "syncman"

View File

@ -84,11 +84,16 @@ template getCurrentBeaconTime(router: MessageRouter): BeaconTime =
type RouteBlockResult = Result[Opt[BlockRef], string]
proc routeSignedBeaconBlock*(
router: ref MessageRouter, blck: ForkySignedBeaconBlock,
blobsOpt: Opt[seq[BlobSidecar]], checkValidator: bool):
blobsOpt: Opt[seq[ForkyBlobSidecar]], checkValidator: bool):
Future[RouteBlockResult] {.async: (raises: [CancelledError]).} =
## Validate and broadcast beacon block, then add it to the block database
## Returns the new Head when block is added successfully to dag, none when
## block passes validation but is not added, and error otherwise
const
consensusFork = typeof(blck).kind
blobFork = blobForkAtConsensusFork(consensusFork).get(BlobFork.Deneb)
static: doAssert typeof(blobsOpt).T is seq[blobFork.BlobSidecar]
let wallTime = router[].getCurrentBeaconTime()
block:
@ -152,7 +157,7 @@ proc routeSignedBeaconBlock*(
blockRoot = shortLog(blck.root), blck = shortLog(blck.message),
signature = shortLog(blck.signature), error = res.error()
var blobRefs = Opt.none(BlobSidecars)
var blobRefs = Opt.none(ForkedBlobSidecars)
if blobsOpt.isSome():
let blobs = blobsOpt.get()
var workers = newSeq[Future[SendResult]](blobs.len)
@ -168,7 +173,7 @@ proc routeSignedBeaconBlock*(
blob = shortLog(blobs[i]), error = res.error[]
else:
notice "Blob sent", blob = shortLog(blobs[i])
blobRefs = Opt.some(blobs.mapIt(newClone(it)))
blobRefs = Opt.some(blobs.mapIt(ForkedBlobSidecar.init(newClone(it))))
let added = await router[].blockProcessor[].addBlock(
MsgSource.api, ForkedSignedBeaconBlock.init(blck), blobRefs)

View File

@ -829,105 +829,109 @@ suite "Beacon chain DB" & preset():
check:
hash_tree_root(state2[]) == root
test "sanity check blobs" & preset():
const
blockHeader0 = SignedBeaconBlockHeader(
message: BeaconBlockHeader(slot: Slot(0)))
blockHeader1 = SignedBeaconBlockHeader(
message: BeaconBlockHeader(slot: Slot(1)))
withAll(BlobFork):
test "sanity check blobs (" & $blobFork & ")" & preset():
const
blockHeader0 = SignedBeaconBlockHeader(
message: BeaconBlockHeader(slot: Slot(0)))
blockHeader1 = SignedBeaconBlockHeader(
message: BeaconBlockHeader(slot: Slot(1)))
let
blockRoot0 = hash_tree_root(blockHeader0.message)
blockRoot1 = hash_tree_root(blockHeader1.message)
let
blockRoot0 = hash_tree_root(blockHeader0.message)
blockRoot1 = hash_tree_root(blockHeader1.message)
# Ensure minimal-difference pairs on both block root and blob index to
# verify that blobkey uses both
blobSidecar0 = BlobSidecar(signed_block_header: blockHeader0, index: 3)
blobSidecar1 = BlobSidecar(signed_block_header: blockHeader0, index: 2)
blobSidecar2 = BlobSidecar(signed_block_header: blockHeader1, index: 2)
# Ensure minimal-difference pairs on both block root and blob index to
# verify that blobkey uses both
blobSidecar0 = blobFork.BlobSidecar(
signed_block_header: blockHeader0, index: 3)
blobSidecar1 = blobFork.BlobSidecar(
signed_block_header: blockHeader0, index: 2)
blobSidecar2 = blobFork.BlobSidecar(
signed_block_header: blockHeader1, index: 2)
db = makeTestDB(SLOTS_PER_EPOCH)
db = makeTestDB(SLOTS_PER_EPOCH)
var
buf: seq[byte]
blobSidecar: BlobSidecar
var
buf: seq[byte]
blobSidecar: blobFork.BlobSidecar
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
not db.getBlobSidecarSZ(blockRoot0, 3, buf)
not db.getBlobSidecarSZ(blockRoot0, 2, buf)
not db.getBlobSidecarSZ(blockRoot1, 2, buf)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 3, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 2, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot1, 2, buf)
db.putBlobSidecar(blobSidecar0)
db.putBlobSidecar(blobSidecar0)
check:
db.getBlobSidecar(blockRoot0, 3, blobSidecar)
blobSidecar == blobSidecar0
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
db.getBlobSidecarSZ(blockRoot0, 3, buf)
not db.getBlobSidecarSZ(blockRoot0, 2, buf)
not db.getBlobSidecarSZ(blockRoot1, 2, buf)
check:
db.getBlobSidecar(blockRoot0, 3, blobSidecar)
blobSidecar == blobSidecar0
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 3, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 2, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot1, 2, buf)
db.putBlobSidecar(blobSidecar1)
db.putBlobSidecar(blobSidecar1)
check:
db.getBlobSidecar(blockRoot0, 3, blobSidecar)
blobSidecar == blobSidecar0
db.getBlobSidecar(blockRoot0, 2, blobSidecar)
blobSidecar == blobSidecar1
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
db.getBlobSidecarSZ(blockRoot0, 3, buf)
db.getBlobSidecarSZ(blockRoot0, 2, buf)
not db.getBlobSidecarSZ(blockRoot1, 2, buf)
check:
db.getBlobSidecar(blockRoot0, 3, blobSidecar)
blobSidecar == blobSidecar0
db.getBlobSidecar(blockRoot0, 2, blobSidecar)
blobSidecar == blobSidecar1
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 3, buf)
getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 2, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot1, 2, buf)
check db.delBlobSidecar(blockRoot0, 3)
check db.delBlobSidecar(blockRoot0, 3)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
db.getBlobSidecar(blockRoot0, 2, blobSidecar)
blobSidecar == blobSidecar1
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
not db.getBlobSidecarSZ(blockRoot0, 3, buf)
db.getBlobSidecarSZ(blockRoot0, 2, buf)
not db.getBlobSidecarSZ(blockRoot1, 2, buf)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
db.getBlobSidecar(blockRoot0, 2, blobSidecar)
blobSidecar == blobSidecar1
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 3, buf)
getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 2, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot1, 2, buf)
db.putBlobSidecar(blobSidecar2)
db.putBlobSidecar(blobSidecar2)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
db.getBlobSidecar(blockRoot0, 2, blobSidecar)
blobSidecar == blobSidecar1
db.getBlobSidecar(blockRoot1, 2, blobSidecar)
blobSidecar == blobSidecar2
not db.getBlobSidecarSZ(blockRoot0, 3, buf)
db.getBlobSidecarSZ(blockRoot0, 2, buf)
db.getBlobSidecarSZ(blockRoot1, 2, buf)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
db.getBlobSidecar(blockRoot0, 2, blobSidecar)
blobSidecar == blobSidecar1
db.getBlobSidecar(blockRoot1, 2, blobSidecar)
blobSidecar == blobSidecar2
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 3, buf)
getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 2, buf)
getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot1, 2, buf)
check db.delBlobSidecar(blockRoot0, 2)
check db.delBlobSidecar(blockRoot0, 2)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
db.getBlobSidecar(blockRoot1, 2, blobSidecar)
blobSidecar == blobSidecar2
not db.getBlobSidecarSZ(blockRoot0, 3, buf)
not db.getBlobSidecarSZ(blockRoot0, 2, buf)
db.getBlobSidecarSZ(blockRoot1, 2, buf)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
db.getBlobSidecar(blockRoot1, 2, blobSidecar)
blobSidecar == blobSidecar2
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 3, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 2, buf)
getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot1, 2, buf)
check db.delBlobSidecar(blockRoot1, 2)
check db.delBlobSidecar(blockRoot1, 2)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
not db.getBlobSidecarSZ(blockRoot0, 3, buf)
not db.getBlobSidecarSZ(blockRoot0, 2, buf)
not db.getBlobSidecarSZ(blockRoot1, 2, buf)
check:
not db.getBlobSidecar(blockRoot0, 3, blobSidecar)
not db.getBlobSidecar(blockRoot0, 2, blobSidecar)
not db.getBlobSidecar(blockRoot1, 2, blobSidecar)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 3, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot0, 2, buf)
not getBlobSidecarSZ[blobFork.BlobSidecar](db, blockRoot1, 2, buf)
db.close()
db.close()
suite "FinalizedBlocks" & preset():
test "Basic ops" & preset():
@ -956,4 +960,4 @@ suite "FinalizedBlocks" & preset():
check: k in [Slot 0, Slot 5]
items += 1
check: items == 2
check: items == 2

View File

@ -64,7 +64,7 @@ suite "Block processor" & preset():
let
missing = await processor[].addBlock(
MsgSource.gossip, ForkedSignedBeaconBlock.init(b2),
Opt.none(BlobSidecars))
Opt.none(ForkedBlobSidecars))
check: missing.error == VerifierError.MissingParent
@ -76,7 +76,7 @@ suite "Block processor" & preset():
let
status = await processor[].addBlock(
MsgSource.gossip, ForkedSignedBeaconBlock.init(b1),
Opt.none(BlobSidecars))
Opt.none(ForkedBlobSidecars))
b1Get = dag.getBlockRef(b1.root)
check:

View File

@ -49,9 +49,12 @@ func collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
# in the async queue, similar to how BlockProcessor does it - as far as
# testing goes, this is risky because it might introduce differences between
# the BlockProcessor and this test
proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars],
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} =
proc verify(
signedBlock: ForkedSignedBeaconBlock,
blobs: Opt[ForkedBlobSidecars],
maybeFinalized: bool
): Future[Result[void, VerifierError]] {.
async: (raises: [CancelledError], raw: true).} =
let fut = Future[Result[void, VerifierError]].Raising([CancelledError]).init()
try: queue.addLastNoWait(BlockEntry(blck: signedBlock, resfut: fut))
except CatchableError as exc: raiseAssert exc.msg
@ -73,8 +76,8 @@ suite "SyncManager test suite":
func createBlobs(
blocks: var seq[ref ForkedSignedBeaconBlock], slots: seq[Slot]
): seq[ref BlobSidecar] =
var res = newSeq[ref BlobSidecar](len(slots))
): seq[ForkedBlobSidecar] =
var res = newSeq[ForkedBlobSidecar](len(slots))
for blck in blocks:
withBlck(blck[]):
when consensusFork >= ConsensusFork.Deneb:
@ -94,7 +97,7 @@ suite "SyncManager test suite":
var sidecarIdx = 0
for i, slot in slots:
if slot == forkyBlck.message.slot:
res[i] = newClone sidecars[sidecarIdx]
res[i] = ForkedBlobSidecar.init(newClone sidecars[sidecarIdx])
inc sidecarIdx
res
@ -354,7 +357,7 @@ suite "SyncManager test suite":
if request.isEmpty():
break
await queue.push(request, getSlice(chain, start, request),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await validatorFut.cancelAndWait()
waitFor runSmokeTest()
@ -429,7 +432,7 @@ suite "SyncManager test suite":
var r13 = queue.pop(finishSlot, p3)
var f13 = queue.push(r13, chain.getSlice(startSlot, r13),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check:
f13.finished == false
@ -438,7 +441,7 @@ suite "SyncManager test suite":
of SyncQueueKind.Backward: counter == int(finishSlot)
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check:
case kkind
@ -448,7 +451,7 @@ suite "SyncManager test suite":
f13.finished == false
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await allFutures(f11, f12, f13)
check:
f12.finished == true and f12.failed == false
@ -551,7 +554,7 @@ suite "SyncManager test suite":
check response[0][].slot >= getFowardSafeSlotCb()
else:
check response[^1][].slot <= getBackwardSafeSlotCb()
await queue.push(request, response, Opt.none(seq[BlobSidecars]))
await queue.push(request, response, Opt.none(seq[ForkedBlobSidecars]))
await validatorFut.cancelAndWait()
waitFor runTest()
@ -634,7 +637,7 @@ suite "SyncManager test suite":
# Handle request 1. Should be re-enqueued as it simulates `Invalid`.
let response1 = getSlice(chain, start, request1)
await queue.push(request1, response1, Opt.none(seq[BlobSidecars]))
await queue.push(request1, response1, Opt.none(seq[ForkedBlobSidecars]))
check debtLen(queue) == request2.count + request1.count
# Request 1 should be discarded as it is no longer relevant.
@ -646,7 +649,7 @@ suite "SyncManager test suite":
# Handle request 3. Should be re-enqueued as it simulates `Invalid`.
let response3 = getSlice(chain, start, request3)
await queue.push(request3, response3, Opt.none(seq[BlobSidecars]))
await queue.push(request3, response3, Opt.none(seq[ForkedBlobSidecars]))
check debtLen(queue) == request3.count
# Request 2 should be re-issued.
@ -660,7 +663,7 @@ suite "SyncManager test suite":
# Handle request 4. Should be re-enqueued as it simulates `Invalid`.
let response4 = getSlice(chain, start, request4)
await queue.push(request4, response4, Opt.none(seq[BlobSidecars]))
await queue.push(request4, response4, Opt.none(seq[ForkedBlobSidecars]))
check debtLen(queue) == request4.count
# Advance `safeSlot` out of band.
@ -777,14 +780,14 @@ suite "SyncManager test suite":
var r14 = queue.pop(finishSlot, p4)
var f14 = queue.push(r14, chain.getSlice(startSlot, r14),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check:
f14.finished == false
counter == int(startSlot)
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check:
counter == int(startSlot)
@ -792,7 +795,7 @@ suite "SyncManager test suite":
f14.finished == false
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await allFutures(f11, f12)
check:
counter == int(startSlot + chunkSize + chunkSize)
@ -804,7 +807,7 @@ suite "SyncManager test suite":
withBlck(missingSlice[0][]):
forkyBlck.message.proposer_index = 0xDEADBEAF'u64
var f13 = queue.push(r13, missingSlice,
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await allFutures(f13, f14)
check:
f11.finished == true and f11.failed == false
@ -826,17 +829,17 @@ suite "SyncManager test suite":
check r18.isEmpty() == true
var f17 = queue.push(r17, chain.getSlice(startSlot, r17),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check f17.finished == false
var f16 = queue.push(r16, chain.getSlice(startSlot, r16),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check f16.finished == false
var f15 = queue.push(r15, chain.getSlice(startSlot, r15),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await allFutures(f15, f16, f17)
check:
f15.finished == true and f15.failed == false
@ -883,7 +886,7 @@ suite "SyncManager test suite":
# Push a single request that will fail with all blocks being unviable
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
discard await f11.withTimeout(1.seconds)
check:
@ -949,14 +952,14 @@ suite "SyncManager test suite":
var r14 = queue.pop(finishSlot, p4)
var f14 = queue.push(r14, chain.getSlice(startSlot, r14),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check:
f14.finished == false
counter == int(finishSlot)
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check:
counter == int(finishSlot)
@ -964,7 +967,7 @@ suite "SyncManager test suite":
f14.finished == false
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await allFutures(f11, f12)
check:
counter == int(finishSlot - chunkSize - chunkSize)
@ -975,7 +978,7 @@ suite "SyncManager test suite":
var missingSlice = chain.getSlice(startSlot, r13)
withBlck(missingSlice[0][]):
forkyBlck.message.proposer_index = 0xDEADBEAF'u64
var f13 = queue.push(r13, missingSlice, Opt.none(seq[BlobSidecars]))
var f13 = queue.push(r13, missingSlice, Opt.none(seq[ForkedBlobSidecars]))
await allFutures(f13, f14)
check:
f11.finished == true and f11.failed == false
@ -993,12 +996,12 @@ suite "SyncManager test suite":
check r17.isEmpty() == true
var f16 = queue.push(r16, chain.getSlice(startSlot, r16),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await sleepAsync(100.milliseconds)
check f16.finished == false
var f15 = queue.push(r15, chain.getSlice(startSlot, r15),
Opt.none(seq[BlobSidecars]))
Opt.none(seq[ForkedBlobSidecars]))
await allFutures(f15, f16)
check:
f15.finished == true and f15.failed == false
@ -1101,16 +1104,20 @@ suite "SyncManager test suite":
len(grouped[0]) == 0
# slot 11
len(grouped[1]) == 2
grouped[1][0].signed_block_header.message.slot == Slot(11)
grouped[1][1].signed_block_header.message.slot == Slot(11)
withForkyBlob(grouped[1][0]):
forkyBlob[].signed_block_header.message.slot == Slot(11)
withForkyBlob(grouped[1][1]):
forkyBlob[].signed_block_header.message.slot == Slot(11)
# slot 12
len(grouped[2]) == 1
grouped[2][0].signed_block_header.message.slot == Slot(12)
withForkyBlob(grouped[2][0]):
forkyBlob[].signed_block_header.message.slot == Slot(12)
# slot 13
len(grouped[3]) == 0
# slot 14
len(grouped[4]) == 1
grouped[4][0].signed_block_header.message.slot == Slot(14)
withForkyBlob(grouped[4][0]):
forkyBlob[].signed_block_header.message.slot == Slot(14)
# slot 15
len(grouped[5]) == 0
@ -1127,16 +1134,15 @@ suite "SyncManager test suite":
len(grouped2) == 7
len(grouped2[6]) == 0 # slot 17
let blob18 = new (ref BlobSidecar)
blob18[].signed_block_header.message.slot = Slot(18)
let blob18 = ForkedBlobSidecar.init(new (ref deneb.BlobSidecar))
withForkyBlob(blob18):
forkyBlob[].signed_block_header.message.slot = Slot(18)
blobs.add(blob18)
let groupedRes3 = groupBlobs(req, blocks, blobs)
check:
groupedRes3.isErr()
test "[SyncQueue#Forward] getRewindPoint() test":
let aq = newAsyncQueue[BlockEntry]()
block: