track block/blob matching/quarantines using both indices and commitments (#5621)
This commit is contained in:
parent
6a9d522705
commit
2fc43c9ba7
|
@ -16,7 +16,8 @@ const
|
||||||
|
|
||||||
type
|
type
|
||||||
BlobQuarantine* = object
|
BlobQuarantine* = object
|
||||||
blobs*: OrderedTable[(Eth2Digest, BlobIndex), ref BlobSidecar]
|
blobs*:
|
||||||
|
OrderedTable[(Eth2Digest, BlobIndex, KzgCommitment), ref BlobSidecar]
|
||||||
BlobFetchRecord* = object
|
BlobFetchRecord* = object
|
||||||
block_root*: Eth2Digest
|
block_root*: Eth2Digest
|
||||||
indices*: seq[BlobIndex]
|
indices*: seq[BlobIndex]
|
||||||
|
@ -32,22 +33,19 @@ func put*(quarantine: var BlobQuarantine, blobSidecar: ref BlobSidecar) =
|
||||||
# FIFO if full. For example, sync manager and request manager can race to
|
# FIFO if full. For example, sync manager and request manager can race to
|
||||||
# put blobs in at the same time, so one gets blob insert -> block resolve
|
# put blobs in at the same time, so one gets blob insert -> block resolve
|
||||||
# -> blob insert sequence, which leaves garbage blobs.
|
# -> blob insert sequence, which leaves garbage blobs.
|
||||||
var oldest_blob_key: (Eth2Digest, BlobIndex)
|
#
|
||||||
|
# This also therefore automatically garbage-collects otherwise valid garbage
|
||||||
|
# blobs which are correctly signed, point to either correct block roots or a
|
||||||
|
# block root which isn't ever seen, and then are for any reason simply never
|
||||||
|
# used.
|
||||||
|
var oldest_blob_key: (Eth2Digest, BlobIndex, KzgCommitment)
|
||||||
for k in quarantine.blobs.keys:
|
for k in quarantine.blobs.keys:
|
||||||
oldest_blob_key = k
|
oldest_blob_key = k
|
||||||
break
|
break
|
||||||
quarantine.blobs.del oldest_blob_key
|
quarantine.blobs.del oldest_blob_key
|
||||||
let block_root = hash_tree_root(blobSidecar.signed_block_header.message)
|
let block_root = hash_tree_root(blobSidecar.signed_block_header.message)
|
||||||
discard quarantine.blobs.hasKeyOrPut(
|
discard quarantine.blobs.hasKeyOrPut(
|
||||||
(block_root, blobSidecar.index), blobSidecar)
|
(block_root, blobSidecar.index, blobSidecar.kzg_commitment), blobSidecar)
|
||||||
|
|
||||||
func blobIndices*(quarantine: BlobQuarantine, digest: Eth2Digest):
|
|
||||||
seq[BlobIndex] =
|
|
||||||
var r: seq[BlobIndex] = @[]
|
|
||||||
for i in 0..<MAX_BLOBS_PER_BLOCK:
|
|
||||||
if quarantine.blobs.hasKey((digest, i)):
|
|
||||||
r.add(i)
|
|
||||||
r
|
|
||||||
|
|
||||||
func hasBlob*(
|
func hasBlob*(
|
||||||
quarantine: BlobQuarantine,
|
quarantine: BlobQuarantine,
|
||||||
|
@ -62,22 +60,20 @@ func hasBlob*(
|
||||||
return true
|
return true
|
||||||
false
|
false
|
||||||
|
|
||||||
func popBlobs*(quarantine: var BlobQuarantine, digest: Eth2Digest):
|
func popBlobs*(
|
||||||
seq[ref BlobSidecar] =
|
quarantine: var BlobQuarantine, digest: Eth2Digest,
|
||||||
|
blck: deneb.SignedBeaconBlock): seq[ref BlobSidecar] =
|
||||||
var r: seq[ref BlobSidecar] = @[]
|
var r: seq[ref BlobSidecar] = @[]
|
||||||
for i in 0..<MAX_BLOBS_PER_BLOCK:
|
for idx, kzg_commitment in blck.message.body.blob_kzg_commitments:
|
||||||
var b: ref BlobSidecar
|
var b: ref BlobSidecar
|
||||||
if quarantine.blobs.pop((digest, i), b):
|
if quarantine.blobs.pop((digest, BlobIndex idx, kzg_commitment), b):
|
||||||
r.add(b)
|
r.add(b)
|
||||||
r
|
r
|
||||||
|
|
||||||
func hasBlobs*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock):
|
func hasBlobs*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock):
|
||||||
bool =
|
bool =
|
||||||
let idxs = quarantine.blobIndices(blck.root)
|
for idx, kzg_commitment in blck.message.body.blob_kzg_commitments:
|
||||||
if len(blck.message.body.blob_kzg_commitments) != len(idxs):
|
if (blck.root, BlobIndex idx, kzg_commitment) notin quarantine.blobs:
|
||||||
return false
|
|
||||||
for i in 0..<len(idxs):
|
|
||||||
if idxs[i] != uint64(i):
|
|
||||||
return false
|
return false
|
||||||
true
|
true
|
||||||
|
|
||||||
|
@ -86,6 +82,7 @@ func blobFetchRecord*(quarantine: BlobQuarantine, blck: deneb.SignedBeaconBlock)
|
||||||
var indices: seq[BlobIndex]
|
var indices: seq[BlobIndex]
|
||||||
for i in 0..<len(blck.message.body.blob_kzg_commitments):
|
for i in 0..<len(blck.message.body.blob_kzg_commitments):
|
||||||
let idx = BlobIndex(i)
|
let idx = BlobIndex(i)
|
||||||
if not quarantine.blobs.hasKey((blck.root, idx)):
|
if not quarantine.blobs.hasKey(
|
||||||
|
(blck.root, idx, blck.message.body.blob_kzg_commitments[i])):
|
||||||
indices.add(idx)
|
indices.add(idx)
|
||||||
BlobFetchRecord(block_root: blck.root, indices: indices)
|
BlobFetchRecord(block_root: blck.root, indices: indices)
|
||||||
|
|
|
@ -746,7 +746,8 @@ proc storeBlock(
|
||||||
error = res.error()
|
error = res.error()
|
||||||
continue
|
continue
|
||||||
if self.blobQuarantine[].hasBlobs(forkyBlck):
|
if self.blobQuarantine[].hasBlobs(forkyBlck):
|
||||||
let blobs = self.blobQuarantine[].popBlobs(forkyBlck.root)
|
let blobs = self.blobQuarantine[].popBlobs(
|
||||||
|
forkyBlck.root, forkyBlck)
|
||||||
self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs))
|
self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs))
|
||||||
else:
|
else:
|
||||||
if not self.consensusManager.quarantine[].addBlobless(
|
if not self.consensusManager.quarantine[].addBlobless(
|
||||||
|
|
|
@ -242,7 +242,7 @@ proc processSignedBeaconBlock*(
|
||||||
let blobs =
|
let blobs =
|
||||||
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
|
when typeof(signedBlock).kind >= ConsensusFork.Deneb:
|
||||||
if self.blobQuarantine[].hasBlobs(signedBlock):
|
if self.blobQuarantine[].hasBlobs(signedBlock):
|
||||||
Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root))
|
Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root, signedBlock))
|
||||||
else:
|
else:
|
||||||
if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
if not self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
||||||
signedBlock):
|
signedBlock):
|
||||||
|
@ -310,7 +310,7 @@ proc processBlobSidecar*(
|
||||||
self.blockProcessor[].enqueueBlock(
|
self.blockProcessor[].enqueueBlock(
|
||||||
MsgSource.gossip,
|
MsgSource.gossip,
|
||||||
ForkedSignedBeaconBlock.init(blobless),
|
ForkedSignedBeaconBlock.init(blobless),
|
||||||
Opt.some(self.blobQuarantine[].popBlobs(block_root)))
|
Opt.some(self.blobQuarantine[].popBlobs(block_root, blobless)))
|
||||||
else:
|
else:
|
||||||
discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
discard self.quarantine[].addBlobless(self.dag.finalizedHead.slot,
|
||||||
blobless)
|
blobless)
|
||||||
|
|
|
@ -408,7 +408,7 @@ proc initFullNode(
|
||||||
Result[void, VerifierError].err(VerifierError.MissingParent),
|
Result[void, VerifierError].err(VerifierError.MissingParent),
|
||||||
"rmanBlockVerifier")
|
"rmanBlockVerifier")
|
||||||
else:
|
else:
|
||||||
let blobs = blobQuarantine[].popBlobs(forkyBlck.root)
|
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
|
||||||
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||||
Opt.some(blobs),
|
Opt.some(blobs),
|
||||||
maybeFinalized = maybeFinalized)
|
maybeFinalized = maybeFinalized)
|
||||||
|
|
|
@ -504,8 +504,6 @@ proc makeBeaconBlock*(
|
||||||
else:
|
else:
|
||||||
{.error: "Unsupported fork".}
|
{.error: "Unsupported fork".}
|
||||||
|
|
||||||
# workaround for https://github.com/nim-lang/Nim/issues/20900 rather than have
|
|
||||||
# these be default arguments
|
|
||||||
proc makeBeaconBlock*(
|
proc makeBeaconBlock*(
|
||||||
cfg: RuntimeConfig, state: var ForkedHashedBeaconState,
|
cfg: RuntimeConfig, state: var ForkedHashedBeaconState,
|
||||||
proposer_index: ValidatorIndex, randao_reveal: ValidatorSig,
|
proposer_index: ValidatorIndex, randao_reveal: ValidatorSig,
|
||||||
|
|
|
@ -293,7 +293,6 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
|
||||||
if len(missing.indices) == 0:
|
if len(missing.indices) == 0:
|
||||||
warn "quarantine missing blobs, but missing indices is empty",
|
warn "quarantine missing blobs, but missing indices is empty",
|
||||||
blk=blobless.root,
|
blk=blobless.root,
|
||||||
indices=rman.blobQuarantine[].blobIndices(blobless.root),
|
|
||||||
commitments=len(blobless.message.body.blob_kzg_commitments)
|
commitments=len(blobless.message.body.blob_kzg_commitments)
|
||||||
for idx in missing.indices:
|
for idx in missing.indices:
|
||||||
let id = BlobIdentifier(block_root: blobless.root, index: idx)
|
let id = BlobIdentifier(block_root: blobless.root, index: idx)
|
||||||
|
@ -303,7 +302,6 @@ proc getMissingBlobs(rman: RequestManager): seq[BlobIdentifier] =
|
||||||
# this is a programming error should it occur.
|
# this is a programming error should it occur.
|
||||||
warn "missing blob handler found blobless block with all blobs",
|
warn "missing blob handler found blobless block with all blobs",
|
||||||
blk=blobless.root,
|
blk=blobless.root,
|
||||||
indices=rman.blobQuarantine[].blobIndices(blobless.root),
|
|
||||||
commitments=len(blobless.message.body.blob_kzg_commitments)
|
commitments=len(blobless.message.body.blob_kzg_commitments)
|
||||||
discard rman.blockVerifier(ForkedSignedBeaconBlock.init(blobless),
|
discard rman.blockVerifier(ForkedSignedBeaconBlock.init(blobless),
|
||||||
false)
|
false)
|
||||||
|
|
|
@ -560,8 +560,6 @@ proc makeBeaconBlockForHeadAndSlot*(
|
||||||
else:
|
else:
|
||||||
err(blck.error)
|
err(blck.error)
|
||||||
|
|
||||||
# workaround for https://github.com/nim-lang/Nim/issues/20900 to avoid default
|
|
||||||
# parameters
|
|
||||||
proc makeBeaconBlockForHeadAndSlot*(
|
proc makeBeaconBlockForHeadAndSlot*(
|
||||||
PayloadType: type ForkyExecutionPayloadForSigning, node: BeaconNode, randao_reveal: ValidatorSig,
|
PayloadType: type ForkyExecutionPayloadForSigning, node: BeaconNode, randao_reveal: ValidatorSig,
|
||||||
validator_index: ValidatorIndex, graffiti: GraffitiBytes, head: BlockRef,
|
validator_index: ValidatorIndex, graffiti: GraffitiBytes, head: BlockRef,
|
||||||
|
|
Loading…
Reference in New Issue