bolster `BlobSidecar` syncing on incomplete responses (#5766)
Avoid marking blocks invalid when the corresponding `blobSidecarsByRange` request returns an incomplete or incorrect response while syncing. The blocks themselves may still be valid in that scenario.
This commit is contained in:
parent 0b5ddd8a0e
commit 62ee92a094
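Background for the change: the Deneb p2p spec requires that a `blobSidecarsByRange` response contain, for each returned block, exactly as many sidecars as that block lists `blob_kzg_commitments`, sent in consecutive (slot, index) order. A minimal sketch of that expected shape, using simplified stand-in types rather than the client's (illustrative only):

# Sketch: compute the (slot, index) sequence a well-formed response must follow.
# `commitments` stands in for len(block.message.body.blob_kzg_commitments).
func expectedOrder(blocks: seq[tuple[slot: uint64, commitments: int]]):
    seq[tuple[slot: uint64, index: int]] =
  for blck in blocks:
    for i in 0 ..< blck.commitments:
      result.add((slot: blck.slot, index: i))

doAssert expectedOrder(
    @[(slot: 11'u64, commitments: 2), (slot: 12'u64, commitments: 0),
      (slot: 14'u64, commitments: 1)]) ==
  @[(slot: 11'u64, index: 0), (slot: 11'u64, index: 1), (slot: 14'u64, index: 0)]

A response that deviates from this order, or is shorter or longer than expected, is now treated as a bad response from the peer rather than as evidence that the blocks are invalid.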
@@ -179,17 +179,11 @@ func check_attestation_subnet(

   ok()

-# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/deneb/p2p-interface.md#verify_blob_sidecar_inclusion_proof
-func verify_blob_sidecar_inclusion_proof(
+func check_blob_sidecar_inclusion_proof(
     blob_sidecar: deneb.BlobSidecar): Result[void, ValidationError] =
-  let gindex = kzg_commitment_inclusion_proof_gindex(blob_sidecar.index)
-  if not is_valid_merkle_branch(
-      hash_tree_root(blob_sidecar.kzg_commitment),
-      blob_sidecar.kzg_commitment_inclusion_proof,
-      KZG_COMMITMENT_INCLUSION_PROOF_DEPTH,
-      get_subtree_index(gindex),
-      blob_sidecar.signed_block_header.message.body_root):
-    return errReject("BlobSidecar: inclusion proof not valid")
+  let res = blob_sidecar.verify_blob_sidecar_inclusion_proof()
+  if res.isErr:
+    return errReject(res.error)

   ok()

@@ -361,7 +355,7 @@ proc validateBlobSidecar*(
   # [REJECT] The sidecar's inclusion proof is valid as verified by
   # `verify_blob_sidecar_inclusion_proof(blob_sidecar)`.
   block:
-    let v = verify_blob_sidecar_inclusion_proof(blob_sidecar)
+    let v = check_blob_sidecar_inclusion_proof(blob_sidecar)
     if v.isErr:
       return dag.checkedReject(v.error)

@@ -211,6 +211,19 @@ func has_flag*(flags: ParticipationFlags, flag_index: TimelyFlag): bool =
   let flag = ParticipationFlags(1'u8 shl ord(flag_index))
   (flags and flag) == flag

+# https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/deneb/p2p-interface.md#verify_blob_sidecar_inclusion_proof
+func verify_blob_sidecar_inclusion_proof*(
+    blob_sidecar: BlobSidecar): Result[void, string] =
+  let gindex = kzg_commitment_inclusion_proof_gindex(blob_sidecar.index)
+  if not is_valid_merkle_branch(
+      hash_tree_root(blob_sidecar.kzg_commitment),
+      blob_sidecar.kzg_commitment_inclusion_proof,
+      KZG_COMMITMENT_INCLUSION_PROOF_DEPTH,
+      get_subtree_index(gindex),
+      blob_sidecar.signed_block_header.message.body_root):
+    return err("BlobSidecar: inclusion proof not valid")
+  ok()
+
 func create_blob_sidecars*(
     forkyBlck: deneb.SignedBeaconBlock,
     kzg_proofs: KzgProofs,
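Moving the inclusion-proof check into the spec module leaves gossip validation with a thin `errReject` wrapper and lets non-gossip code, such as the sync manager below, reuse the same check. A hypothetical call site (the variable name and logging are illustrative, not part of the commit):

# assumes a `blob_sidecar: BlobSidecar` received from some peer
let proofCheck = blob_sidecar.verify_blob_sidecar_inclusion_proof()
if proofCheck.isErr:
  echo "dropping sidecar: ", proofCheck.error  # proof fails to link commitment to body_root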
@@ -193,8 +193,9 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
     (wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or
      e >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS)

-proc getBlobSidecars*[A, B](man: SyncManager[A, B], peer: A,
-                            req: SyncRequest): Future[BlobSidecarsRes] {.async.} =
+proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A,
+                           req: SyncRequest
+                          ): Future[BlobSidecarsRes] {.async.} =
   mixin getScore, `==`

   logScope:
@@ -241,23 +242,33 @@ func groupBlobs*[T](req: SyncRequest[T],
                     blocks: seq[ref ForkedSignedBeaconBlock],
                     blobs: seq[ref BlobSidecar]):
                     Result[seq[BlobSidecars], string] =
-  var grouped = newSeq[BlobSidecars](len(blocks))
-  var blobCursor = 0
-  var i = 0
-  for blck in blocks:
-    let slot = blck[].slot
-    if blobCursor == len(blobs):
-      # reached end of blobs, have more blobless blocks
-      break
-    for blob in blobs[blobCursor..len(blobs)-1]:
-      if blob.signed_block_header.message.slot < slot:
-        return Result[seq[BlobSidecars], string].err "invalid blob sequence"
-      if blob.signed_block_header.message.slot == slot:
-        grouped[i].add(blob)
-        blobCursor = blobCursor + 1
-    i = i + 1
+  var
+    grouped = newSeq[BlobSidecars](len(blocks))
+    blob_cursor = 0
+  for block_idx, blck in blocks:
+    withBlck(blck[]):
+      when consensusFork >= ConsensusFork.Deneb:
+        template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
+        if kzgs.len == 0:
+          continue
+        # Clients MUST include all blob sidecars of each block from which they include blob sidecars.
+        # The following blob sidecars, where they exist, MUST be sent in consecutive (slot, index) order.
+        # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blobsidecarsbyrange-v1
+        let header = forkyBlck.toSignedBeaconBlockHeader()
+        for blob_idx, kzg_commitment in kzgs:
+          if blob_cursor >= blobs.len:
+            return err("BlobSidecar: response too short")
+          let blob_sidecar = blobs[blob_cursor]
+          if blob_sidecar.index != BlobIndex blob_idx:
+            return err("BlobSidecar: unexpected index")
+          if blob_sidecar.kzg_commitment != kzg_commitment:
+            return err("BlobSidecar: unexpected kzg_commitment")
+          if blob_sidecar.signed_block_header != header:
+            return err("BlobSidecar: unexpected signed_block_header")
+          grouped[block_idx].add(blob_sidecar)
+          inc blob_cursor

-  if blobCursor != len(blobs):
+  if blob_cursor != len(blobs):
     # we reached end of blocks without consuming all blobs, so either
     # the peer sent us too few blocks in the paired request, or the
     # peer is sending us spurious blobs.
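The rewritten `groupBlobs` walks a single cursor over the blob response while iterating each block's commitments, so any shortfall, reordering, or mismatch is reported as a string error instead of silently mis-grouping. A self-contained model of the cursor walk, with simplified stand-in types (the real code compares full headers and KZG commitments on forked block types):

import results

type MiniBlob = tuple[slot: uint64, index: int]

func groupMini(blocks: seq[tuple[slot: uint64, commitments: int]],
               blobs: seq[MiniBlob]): Result[seq[seq[MiniBlob]], string] =
  var grouped = newSeq[seq[MiniBlob]](blocks.len)
  var cursor = 0
  for block_idx, blck in blocks:
    for blob_idx in 0 ..< blck.commitments:
      if cursor >= blobs.len:
        return err("response too short")        # fewer sidecars than commitments
      if blobs[cursor].index != blob_idx or blobs[cursor].slot != blck.slot:
        return err("unexpected sidecar")        # out of (slot, index) order
      grouped[block_idx].add blobs[cursor]
      inc cursor
  if cursor != blobs.len:
    return err("spurious trailing sidecars")    # more sidecars than commitments
  ok(grouped)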
@@ -265,6 +276,12 @@ func groupBlobs*[T](req: SyncRequest[T],
   else:
     Result[seq[BlobSidecars], string].ok grouped

+func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] =
+  for blob_sidecars in blobs:
+    for blob_sidecar in blob_sidecars:
+      ? blob_sidecar[].verify_blob_sidecar_inclusion_proof()
+  ok()
+
 proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
   logScope:
     peer_score = peer.getScore()
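`checkBlobs` leans on the `?` operator from the results library, which early-returns the error branch of a `Result` to the caller; a minimal standalone illustration (hypothetical helper names):

import results

func mustBeEven(x: int): Result[void, string] =
  if x mod 2 != 0:
    return err("odd value: " & $x)
  ok()

func allEven(xs: seq[int]): Result[void, string] =
  for x in xs:
    ? mustBeEven(x)  # on err(...), returns it from allEven immediately
  ok()

doAssert allEven(@[2, 4, 6]).isOk
doAssert allEven(@[2, 3]).error == "odd value: 3"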
@@ -454,30 +471,24 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
           return
         let groupedBlobs = groupBlobs(req, blockData, blobData)
         if groupedBlobs.isErr():
+          peer.updateScore(PeerScoreNoValues)
+          man.queue.push(req)
+          info "Received blobs sequence is inconsistent",
+            blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
+          return
+        if (let checkRes = groupedBlobs.get.checkBlobs(); checkRes.isErr):
           peer.updateScore(PeerScoreBadResponse)
           man.queue.push(req)
           warn "Received blobs sequence is invalid",
-            blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
+            blobs_count = len(blobData),
+            blobs_map = getShortMap(req, blobData),
+            request = req,
+            msg = checkRes.error
           return
         Opt.some(groupedBlobs.get())
       else:
         Opt.none(seq[BlobSidecars])

-    if blobData.isSome:
-      let blobs = blobData.get()
-      if len(blobs) != len(blockData):
-        peer.updateScore(PeerScoreNoValues)
-        man.queue.push(req)
-        info "block and blobs have different lengths", blobs=len(blobs), blocks=len(blockData)
-        return
-      for i, blk in blockData:
-        if len(blobs[i]) > 0 and blk[].slot !=
-            blobs[i][0].signed_block_header.message.slot:
-          peer.updateScore(PeerScoreNoValues)
-          man.queue.push(req)
-          debug "block and blobs data have inconsistent slots"
-          return
-
     if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
         req.contains(man.getSafeSlot()):
       # The sync protocol does not distinguish between:
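Both failure paths above re-queue the request so another peer can serve it; only the penalty and log level differ, and in neither case are the received blocks themselves marked invalid. A schematic of the split (the score values here are invented for the sketch; only the constant names match the client's):

# Sketch of the penalty policy; not the client's actual scoring code.
const
  PeerScoreNoValues = -10      # inconsistent/incomplete answer: retry elsewhere
  PeerScoreBadResponse = -100  # provably invalid data, e.g. bad inclusion proof

func blobResponsePenalty(inconsistent, invalidProof: bool): int =
  if invalidProof: PeerScoreBadResponse   # logged at warn level
  elif inconsistent: PeerScoreNoValues    # logged at info level
  else: 0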
@@ -12,7 +12,6 @@ import unittest2
 import chronos
 import ../beacon_chain/gossip_processing/block_processor,
        ../beacon_chain/sync/sync_manager,
-       ../beacon_chain/spec/datatypes/phase0,
        ../beacon_chain/spec/forks

 type
@@ -66,16 +65,36 @@ suite "SyncManager test suite":
     var res = newSeq[ref ForkedSignedBeaconBlock](count)
     var curslot = start
     for item in res.mitems():
-      item = new ForkedSignedBeaconBlock
-      item[].phase0Data.message.slot = curslot
+      item = newClone ForkedSignedBeaconBlock(kind: ConsensusFork.Deneb)
+      item[].denebData.message.slot = curslot
       curslot = curslot + 1'u64
     res

-  func createBlobs(slots: seq[Slot]): seq[ref BlobSidecar] =
+  func createBlobs(
+      blocks: var seq[ref ForkedSignedBeaconBlock], slots: seq[Slot]
+  ): seq[ref BlobSidecar] =
     var res = newSeq[ref BlobSidecar](len(slots))
-    for (i, item) in res.mpairs():
-      item = new BlobSidecar
-      item[].signed_block_header.message.slot = slots[i]
+    for blck in blocks:
+      withBlck(blck[]):
+        when consensusFork >= ConsensusFork.Deneb:
+          template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments
+          for i, slot in slots:
+            if slot == forkyBlck.message.slot:
+              doAssert kzgs.add default(KzgCommitment)
+          if kzgs.len > 0:
+            forkyBlck.root = hash_tree_root(forkyBlck.message)
+            var
+              kzg_proofs: KzgProofs
+              blobs: Blobs
+            for _ in kzgs:
+              doAssert kzg_proofs.add default(KzgProof)
+              doAssert blobs.add default(Blob)
+            let sidecars = forkyBlck.create_blob_sidecars(kzg_proofs, blobs)
+            var sidecarIdx = 0
+            for i, slot in slots:
+              if slot == forkyBlck.message.slot:
+                res[i] = newClone sidecars[sidecarIdx]
+                inc sidecarIdx
     res

   proc getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot,
@@ -1064,8 +1083,8 @@ suite "SyncManager test suite":
       checkResponse(r21, @[slots[3]]) == false

   test "[SyncManager] groupBlobs() test":
-    var blobs = createBlobs(@[Slot(11), Slot(11), Slot(12), Slot(14)])
     var blocks = createChain(Slot(10), Slot(15))
+    var blobs = createBlobs(blocks, @[Slot(11), Slot(11), Slot(12), Slot(14)])

     let req = SyncRequest[SomeTPeer](slot: Slot(10))
     let groupedRes = groupBlobs(req, blocks, blobs)
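With blocks spanning slots 10–15 and blob slots [11, 11, 12, 14], the helper now attaches commitments to exactly those blocks, so the grouping comes out as lengths [0, 2, 1, 0, 1, 0]. The same walk, traced with the `groupMini` sketch from earlier (simplified stand-in types, illustrative only):

import std/sequtils

let groups = groupMini(
  @[(slot: 10'u64, commitments: 0), (slot: 11'u64, commitments: 2),
    (slot: 12'u64, commitments: 1), (slot: 13'u64, commitments: 0),
    (slot: 14'u64, commitments: 1), (slot: 15'u64, commitments: 0)],
  @[(slot: 11'u64, index: 0), (slot: 11'u64, index: 1),
    (slot: 12'u64, index: 0), (slot: 14'u64, index: 0)]).expect("well-formed")
doAssert groups.mapIt(it.len) == @[0, 2, 1, 0, 1, 0]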
@@ -1095,8 +1114,8 @@ suite "SyncManager test suite":
       len(grouped[5]) == 0

     # Add block with a gap from previous block.
-    let block17 = new (ref ForkedSignedBeaconBlock)
-    block17[].phase0Data.message.slot = Slot(17)
+    let block17 = newClone ForkedSignedBeaconBlock(kind: ConsensusFork.Deneb)
+    block17[].denebData.message.slot = Slot(17)
     blocks.add(block17)
     let groupedRes2 = groupBlobs(req, blocks, blobs)