Fix sync for blocks older than MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS (#4977)
When doing sync for blocks older than MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, we skip the blobs by range request, but we then pass en empty blob sequence to validation, which then fails. To fix this: Use an Option[Blobsidecars] to allow expressing the distinction between "empty blob sequence" and "blobs unavailable". Use the latter for "old" blocks, and don't attempt to run blob validation.
This commit is contained in:
parent
4b23c4292d
commit
1cf777c64b
|
@ -60,7 +60,7 @@ type
|
||||||
BlobSidecars* = seq[ref BlobSidecar]
|
BlobSidecars* = seq[ref BlobSidecar]
|
||||||
BlockEntry = object
|
BlockEntry = object
|
||||||
blck*: ForkedSignedBeaconBlock
|
blck*: ForkedSignedBeaconBlock
|
||||||
blobs*: BlobSidecars
|
blobs*: Opt[BlobSidecars]
|
||||||
maybeFinalized*: bool
|
maybeFinalized*: bool
|
||||||
## The block source claims the block has been finalized already
|
## The block source claims the block has been finalized already
|
||||||
resfut*: Future[Result[void, VerifierError]]
|
resfut*: Future[Result[void, VerifierError]]
|
||||||
|
@ -126,7 +126,7 @@ type
|
||||||
|
|
||||||
proc addBlock*(
|
proc addBlock*(
|
||||||
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
||||||
blobs: BlobSidecars,
|
blobs: Opt[BlobSidecars],
|
||||||
resfut: Future[Result[void, VerifierError]] = nil,
|
resfut: Future[Result[void, VerifierError]] = nil,
|
||||||
maybeFinalized = false,
|
maybeFinalized = false,
|
||||||
validationDur = Duration())
|
validationDur = Duration())
|
||||||
|
@ -189,31 +189,30 @@ from ../consensus_object_pools/block_clearance import
|
||||||
proc storeBackfillBlock(
|
proc storeBackfillBlock(
|
||||||
self: var BlockProcessor,
|
self: var BlockProcessor,
|
||||||
signedBlock: ForkySignedBeaconBlock,
|
signedBlock: ForkySignedBeaconBlock,
|
||||||
blobs: BlobSidecars): Result[void, VerifierError] =
|
blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] =
|
||||||
|
|
||||||
# The block is certainly not missing any more
|
# The block is certainly not missing any more
|
||||||
self.consensusManager.quarantine[].missing.del(signedBlock.root)
|
self.consensusManager.quarantine[].missing.del(signedBlock.root)
|
||||||
|
|
||||||
# Establish blob viability before calling addbackfillBlock to avoid
|
# Establish blob viability before calling addbackfillBlock to avoid
|
||||||
# writing the block in case of blob error.
|
# writing the block in case of blob error.
|
||||||
let blobsOk =
|
var blobsOk = true
|
||||||
when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
|
when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
|
||||||
let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
|
if blobsOpt.isSome:
|
||||||
if blobs.len > 0 or kzgCommits.len > 0:
|
let blobs = blobsOpt.get()
|
||||||
let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
|
let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
|
||||||
blobs.mapIt(it.kzg_proof))
|
if blobs.len > 0 or kzgCommits.len > 0:
|
||||||
if r.isErr():
|
let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
|
||||||
debug "backfill blob validation failed",
|
blobs.mapIt(it.kzg_proof))
|
||||||
blockRoot = shortLog(signedBlock.root),
|
if r.isErr():
|
||||||
blobs = shortLog(blobs),
|
debug "backfill blob validation failed",
|
||||||
blck = shortLog(signedBlock.message),
|
blockRoot = shortLog(signedBlock.root),
|
||||||
signature = shortLog(signedBlock.signature),
|
blobs = shortLog(blobs),
|
||||||
msg = r.error()
|
blck = shortLog(signedBlock.message),
|
||||||
r.isOk()
|
signature = shortLog(signedBlock.signature),
|
||||||
else:
|
msg = r.error()
|
||||||
true
|
blobsOk = r.isOk()
|
||||||
else:
|
|
||||||
true
|
|
||||||
if not blobsOk:
|
if not blobsOk:
|
||||||
return err(VerifierError.Invalid)
|
return err(VerifierError.Invalid)
|
||||||
|
|
||||||
|
@ -236,6 +235,7 @@ proc storeBackfillBlock(
|
||||||
return res
|
return res
|
||||||
|
|
||||||
# Only store blobs after successfully establishing block viability.
|
# Only store blobs after successfully establishing block viability.
|
||||||
|
let blobs = blobsOpt.valueOr: BlobSidecars @[]
|
||||||
for b in blobs:
|
for b in blobs:
|
||||||
self.consensusManager.dag.db.putBlobSidecar(b[])
|
self.consensusManager.dag.db.putBlobSidecar(b[])
|
||||||
|
|
||||||
|
@ -398,7 +398,7 @@ proc checkBloblessSignature(self: BlockProcessor,
|
||||||
proc storeBlock*(
|
proc storeBlock*(
|
||||||
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
|
self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime,
|
||||||
signedBlock: ForkySignedBeaconBlock,
|
signedBlock: ForkySignedBeaconBlock,
|
||||||
blobs: BlobSidecars,
|
blobsOpt: Opt[BlobSidecars],
|
||||||
maybeFinalized = false,
|
maybeFinalized = false,
|
||||||
queueTick: Moment = Moment.now(), validationDur = Duration()):
|
queueTick: Moment = Moment.now(), validationDur = Duration()):
|
||||||
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
|
Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async.} =
|
||||||
|
@ -465,18 +465,20 @@ proc storeBlock*(
|
||||||
# Establish blob viability before calling addHeadBlock to avoid
|
# Establish blob viability before calling addHeadBlock to avoid
|
||||||
# writing the block in case of blob error.
|
# writing the block in case of blob error.
|
||||||
when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
|
when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
|
||||||
let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
|
if blobsOpt.isSome:
|
||||||
if blobs.len > 0 or kzgCommits.len > 0:
|
let blobs = blobsOpt.get()
|
||||||
let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
|
let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq
|
||||||
blobs.mapIt(it.kzg_proof))
|
if blobs.len > 0 or kzgCommits.len > 0:
|
||||||
if r.isErr():
|
let r = validate_blobs(kzgCommits, blobs.mapIt(it.blob),
|
||||||
debug "blob validation failed",
|
blobs.mapIt(it.kzg_proof))
|
||||||
blockRoot = shortLog(signedBlock.root),
|
if r.isErr():
|
||||||
blobs = shortLog(blobs),
|
debug "blob validation failed",
|
||||||
blck = shortLog(signedBlock.message),
|
blockRoot = shortLog(signedBlock.root),
|
||||||
signature = shortLog(signedBlock.signature),
|
blobs = shortLog(blobs),
|
||||||
msg = r.error()
|
blck = shortLog(signedBlock.message),
|
||||||
return err((VerifierError.Invalid, ProcessingStatus.completed))
|
signature = shortLog(signedBlock.signature),
|
||||||
|
msg = r.error()
|
||||||
|
return err((VerifierError.Invalid, ProcessingStatus.completed))
|
||||||
|
|
||||||
type Trusted = typeof signedBlock.asTrusted()
|
type Trusted = typeof signedBlock.asTrusted()
|
||||||
let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do (
|
let blck = dag.addHeadBlock(self.verifier, signedBlock, payloadValid) do (
|
||||||
|
@ -536,6 +538,7 @@ proc storeBlock*(
|
||||||
self[].lastPayload = signedBlock.message.slot
|
self[].lastPayload = signedBlock.message.slot
|
||||||
|
|
||||||
# write blobs now that block has been written.
|
# write blobs now that block has been written.
|
||||||
|
let blobs = blobsOpt.valueOr: BlobSidecars @[]
|
||||||
for b in blobs:
|
for b in blobs:
|
||||||
self.consensusManager.dag.db.putBlobSidecar(b[])
|
self.consensusManager.dag.db.putBlobSidecar(b[])
|
||||||
|
|
||||||
|
@ -669,10 +672,10 @@ proc storeBlock*(
|
||||||
# Process the blocks that had the newly accepted block as parent
|
# Process the blocks that had the newly accepted block as parent
|
||||||
withBlck(quarantined):
|
withBlck(quarantined):
|
||||||
when typeof(blck).toFork() < ConsensusFork.Deneb:
|
when typeof(blck).toFork() < ConsensusFork.Deneb:
|
||||||
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
|
self[].addBlock(MsgSource.gossip, quarantined, Opt.none(BlobSidecars))
|
||||||
else:
|
else:
|
||||||
if len(blck.message.body.blob_kzg_commitments) == 0:
|
if len(blck.message.body.blob_kzg_commitments) == 0:
|
||||||
self[].addBlock(MsgSource.gossip, quarantined, BlobSidecars @[])
|
self[].addBlock(MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]))
|
||||||
else:
|
else:
|
||||||
if (let res = checkBloblessSignature(self[], blck); res.isErr):
|
if (let res = checkBloblessSignature(self[], blck); res.isErr):
|
||||||
warn "Failed to verify signature of unorphaned blobless block",
|
warn "Failed to verify signature of unorphaned blobless block",
|
||||||
|
@ -681,7 +684,7 @@ proc storeBlock*(
|
||||||
continue
|
continue
|
||||||
if self.blobQuarantine[].hasBlobs(blck):
|
if self.blobQuarantine[].hasBlobs(blck):
|
||||||
let blobs = self.blobQuarantine[].popBlobs(blck.root)
|
let blobs = self.blobQuarantine[].popBlobs(blck.root)
|
||||||
self[].addBlock(MsgSource.gossip, quarantined, blobs)
|
self[].addBlock(MsgSource.gossip, quarantined, Opt.some(blobs))
|
||||||
else:
|
else:
|
||||||
if not self.consensusManager.quarantine[].addBlobless(
|
if not self.consensusManager.quarantine[].addBlobless(
|
||||||
dag.finalizedHead.slot, blck):
|
dag.finalizedHead.slot, blck):
|
||||||
|
@ -696,7 +699,7 @@ proc storeBlock*(
|
||||||
|
|
||||||
proc addBlock*(
|
proc addBlock*(
|
||||||
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock,
|
||||||
blobs: BlobSidecars,
|
blobs: Opt[BlobSidecars],
|
||||||
resfut: Future[Result[void, VerifierError]] = nil,
|
resfut: Future[Result[void, VerifierError]] = nil,
|
||||||
maybeFinalized = false,
|
maybeFinalized = false,
|
||||||
validationDur = Duration()) =
|
validationDur = Duration()) =
|
||||||
|
|
|
@ -238,10 +238,10 @@ proc processSignedBeaconBlock*(
|
||||||
# propagation of seemingly good blocks
|
# propagation of seemingly good blocks
|
||||||
trace "Block validated"
|
trace "Block validated"
|
||||||
|
|
||||||
var blobs: BlobSidecars
|
var blobs = Opt.none(BlobSidecars)
|
||||||
when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
|
when typeof(signedBlock).toFork() >= ConsensusFork.Deneb:
|
||||||
if self.blobQuarantine[].hasBlobs(signedBlock):
|
if self.blobQuarantine[].hasBlobs(signedBlock):
|
||||||
blobs = self.blobQuarantine[].popBlobs(signedBlock.root)
|
blobs = Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root))
|
||||||
else:
|
else:
|
||||||
if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
||||||
signedBlock):
|
signedBlock):
|
||||||
|
@ -314,8 +314,8 @@ proc processSignedBlobSidecar*(
|
||||||
self.blockProcessor[].addBlock(
|
self.blockProcessor[].addBlock(
|
||||||
MsgSource.gossip,
|
MsgSource.gossip,
|
||||||
ForkedSignedBeaconBlock.init(blobless),
|
ForkedSignedBeaconBlock.init(blobless),
|
||||||
self.blobQuarantine[].popBlobs(
|
Opt.some(self.blobQuarantine[].popBlobs(
|
||||||
signedBlobSidecar.message.block_root)
|
signedBlobSidecar.message.block_root))
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
||||||
|
|
|
@ -328,7 +328,7 @@ proc initFullNode(
|
||||||
rng, taskpool, consensusManager, node.validatorMonitor,
|
rng, taskpool, consensusManager, node.validatorMonitor,
|
||||||
blobQuarantine, getBeaconTime)
|
blobQuarantine, getBeaconTime)
|
||||||
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
||||||
blobs: BlobSidecars, maybeFinalized: bool):
|
blobs: Opt[BlobSidecars], maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] =
|
Future[Result[void, VerifierError]] =
|
||||||
# The design with a callback for block verification is unusual compared
|
# The design with a callback for block verification is unusual compared
|
||||||
# to the rest of the application, but fits with the general approach
|
# to the rest of the application, but fits with the general approach
|
||||||
|
@ -340,7 +340,7 @@ proc initFullNode(
|
||||||
resfut,
|
resfut,
|
||||||
maybeFinalized = maybeFinalized)
|
maybeFinalized = maybeFinalized)
|
||||||
resfut
|
resfut
|
||||||
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
||||||
maybeFinalized: bool):
|
maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] =
|
Future[Result[void, VerifierError]] =
|
||||||
let resfut = newFuture[Result[void, VerifierError]]("rmanBlockVerifier")
|
let resfut = newFuture[Result[void, VerifierError]]("rmanBlockVerifier")
|
||||||
|
@ -355,12 +355,12 @@ proc initFullNode(
|
||||||
return
|
return
|
||||||
let blobs = blobQuarantine[].popBlobs(blck.root)
|
let blobs = blobQuarantine[].popBlobs(blck.root)
|
||||||
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||||
blobs,
|
Opt.some(blobs),
|
||||||
resfut,
|
resfut,
|
||||||
maybeFinalized = maybeFinalized)
|
maybeFinalized = maybeFinalized)
|
||||||
else:
|
else:
|
||||||
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||||
BlobSidecars @[],
|
Opt.none(BlobSidecars),
|
||||||
resfut,
|
resfut,
|
||||||
maybeFinalized = maybeFinalized)
|
maybeFinalized = maybeFinalized)
|
||||||
resfut
|
resfut
|
||||||
|
@ -1375,8 +1375,8 @@ proc handleMissingBlobs(node: BeaconNode) =
|
||||||
node.blockProcessor[].addBlock(
|
node.blockProcessor[].addBlock(
|
||||||
MsgSource.gossip,
|
MsgSource.gossip,
|
||||||
ForkedSignedBeaconBlock.init(blobless),
|
ForkedSignedBeaconBlock.init(blobless),
|
||||||
node.blobQuarantine[].popBlobs(
|
Opt.some(node.blobQuarantine[].popBlobs(
|
||||||
blobless.root)
|
blobless.root))
|
||||||
)
|
)
|
||||||
node.quarantine[].removeBlobless(blobless)
|
node.quarantine[].removeBlobless(blobless)
|
||||||
if fetches.len > 0:
|
if fetches.len > 0:
|
||||||
|
|
|
@ -26,7 +26,7 @@ type
|
||||||
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
|
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
|
||||||
ProcessingCallback* = proc() {.gcsafe, raises: [Defect].}
|
ProcessingCallback* = proc() {.gcsafe, raises: [Defect].}
|
||||||
BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock,
|
BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock,
|
||||||
blobs: BlobSidecars, maybeFinalized: bool):
|
blobs: Opt[BlobSidecars], maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
||||||
|
|
||||||
SyncQueueKind* {.pure.} = enum
|
SyncQueueKind* {.pure.} = enum
|
||||||
|
@ -680,9 +680,13 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||||
var i=0
|
var i=0
|
||||||
for blk in sq.blocks(item):
|
for blk in sq.blocks(item):
|
||||||
if reqres.get().blobs.isNone():
|
if reqres.get().blobs.isNone():
|
||||||
res = await sq.blockVerifier(blk[], BlobSidecars @[], maybeFinalized)
|
res = await sq.blockVerifier(blk[],
|
||||||
|
Opt.none(BlobSidecars),
|
||||||
|
maybeFinalized)
|
||||||
else:
|
else:
|
||||||
res = await sq.blockVerifier(blk[], reqres.get().blobs.get()[i], maybeFinalized)
|
res = await sq.blockVerifier(blk[],
|
||||||
|
Opt.some(reqres.get().blobs.get()[i]),
|
||||||
|
maybeFinalized)
|
||||||
inc(i)
|
inc(i)
|
||||||
|
|
||||||
if res.isOk():
|
if res.isOk():
|
||||||
|
|
|
@ -121,7 +121,7 @@ proc routeSignedBeaconBlock*(
|
||||||
signature = shortLog(blck.signature), error = res.error()
|
signature = shortLog(blck.signature), error = res.error()
|
||||||
|
|
||||||
let newBlockRef = await router[].blockProcessor.storeBlock(
|
let newBlockRef = await router[].blockProcessor.storeBlock(
|
||||||
MsgSource.api, sendTime, blck, BlobSidecars @[])
|
MsgSource.api, sendTime, blck, Opt.none(BlobSidecars))
|
||||||
|
|
||||||
# The boolean we return tells the caller whether the block was integrated
|
# The boolean we return tells the caller whether the block was integrated
|
||||||
# into the chain
|
# into the chain
|
||||||
|
|
|
@ -61,7 +61,7 @@ suite "Block processor" & preset():
|
||||||
|
|
||||||
asyncTest "Reverse order block add & get" & preset():
|
asyncTest "Reverse order block add & get" & preset():
|
||||||
let missing = await processor.storeBlock(
|
let missing = await processor.storeBlock(
|
||||||
MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, BlobSidecars @[])
|
MsgSource.gossip, b2.message.slot.start_beacon_time(), b2, Opt.none(BlobSidecars))
|
||||||
check: missing.error[0] == VerifierError.MissingParent
|
check: missing.error[0] == VerifierError.MissingParent
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
@ -71,7 +71,7 @@ suite "Block processor" & preset():
|
||||||
|
|
||||||
let
|
let
|
||||||
status = await processor.storeBlock(
|
status = await processor.storeBlock(
|
||||||
MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, BlobSidecars @[])
|
MsgSource.gossip, b2.message.slot.start_beacon_time(), b1, Opt.none(BlobSidecars))
|
||||||
b1Get = dag.getBlockRef(b1.root)
|
b1Get = dag.getBlockRef(b1.root)
|
||||||
|
|
||||||
check:
|
check:
|
||||||
|
|
|
@ -49,7 +49,7 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
|
||||||
# in the async queue, similar to how BlockProcessor does it - as far as
|
# in the async queue, similar to how BlockProcessor does it - as far as
|
||||||
# testing goes, this is risky because it might introduce differences between
|
# testing goes, this is risky because it might introduce differences between
|
||||||
# the BlockProcessor and this test
|
# the BlockProcessor and this test
|
||||||
proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: BlobSidecars,
|
proc verify(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars],
|
||||||
maybeFinalized: bool):
|
maybeFinalized: bool):
|
||||||
Future[Result[void, VerifierError]] =
|
Future[Result[void, VerifierError]] =
|
||||||
let fut = newFuture[Result[void, VerifierError]]()
|
let fut = newFuture[Result[void, VerifierError]]()
|
||||||
|
|
Loading…
Reference in New Issue