Blob handling sync fixes (#4888)
* Fix groupBlobs * Fix getShortMap * Fix blob handling in sync * lint * Add some blob-related logging
This commit is contained in:
parent
ae46be7020
commit
23adf15e5a
|
@ -8,6 +8,7 @@
|
|||
{.push raises: [].}
|
||||
|
||||
import
|
||||
chronicles,
|
||||
std/[options, tables],
|
||||
stew/bitops2,
|
||||
../spec/forks
|
||||
|
@ -303,6 +304,7 @@ proc addBlobless*(
|
|||
if quarantine.blobless.lenu64 >= MaxBlobless:
|
||||
return false
|
||||
|
||||
debug "block quarantine: Adding blobless", blck = shortLog(signedBlock)
|
||||
quarantine.blobless[signedBlock.root] = signedBlock
|
||||
quarantine.missing.del(signedBlock.root)
|
||||
true
|
||||
|
|
|
@ -1360,6 +1360,7 @@ proc handleMissingBlobs(node: BeaconNode) =
|
|||
|
||||
# give blobs a chance to arrive over gossip
|
||||
if blobless.message.slot == wallSlot and delay < waitDur:
|
||||
debug "Not handling missing blobs as early in slot"
|
||||
continue
|
||||
|
||||
if not node.blobQuarantine[].hasBlobs(blobless):
|
||||
|
@ -1376,6 +1377,7 @@ proc handleMissingBlobs(node: BeaconNode) =
|
|||
blobless.root)
|
||||
)
|
||||
node.quarantine[].removeBlobless(blobless)
|
||||
debug "Requesting detected missing blobs", blobs = shortLog(fetches)
|
||||
node.requestManager.fetchMissingBlobs(fetches)
|
||||
|
||||
proc handleMissingBlocks(node: BeaconNode) =
|
||||
|
|
|
@ -233,22 +233,33 @@ proc remainingSlots(man: SyncManager): uint64 =
|
|||
else:
|
||||
0'u64
|
||||
|
||||
func groupBlobs*[T](req: SyncRequest[T], blobs: seq[ref BlobSidecar]):
|
||||
Result[seq[BlobSidecars], string] =
|
||||
var grouped = newSeq[BlobSidecars](req.count)
|
||||
var rawCur = 0
|
||||
func groupBlobs*[T](req: SyncRequest[T],
|
||||
blocks: seq[ref ForkedSignedBeaconBlock],
|
||||
blobs: seq[ref BlobSidecar]):
|
||||
Result[seq[BlobSidecars], string] =
|
||||
var grouped = newSeq[BlobSidecars](len(blocks))
|
||||
var blobCursor = 0
|
||||
var i = 0
|
||||
for blck in blocks:
|
||||
let slot = blck[].slot
|
||||
if blobCursor == len(blobs):
|
||||
# reached end of blobs, have more blobless blocks
|
||||
break
|
||||
for blob in blobs[blobCursor..len(blobs)-1]:
|
||||
if blob.slot < slot:
|
||||
return Result[seq[BlobSidecars], string].err "invalid blob sequence"
|
||||
if blob.slot==slot:
|
||||
grouped[i].add(blob)
|
||||
blobCursor = blobCursor + 1
|
||||
i = i + 1
|
||||
|
||||
for groupedCur in 0 ..< len(grouped):
|
||||
grouped[groupedCur] = newSeq[ref BlobSidecar](0)
|
||||
let slot = req.slot + groupedCur.uint64
|
||||
while rawCur < len(blobs) and blobs[rawCur].slot == slot:
|
||||
grouped[groupedCur].add(blobs[rawCur])
|
||||
inc(rawCur)
|
||||
|
||||
if rawCur != len(blobs):
|
||||
result.err "invalid blob sequence"
|
||||
else:
|
||||
result.ok grouped
|
||||
if blobCursor != len(blobs):
|
||||
# we reached end of blocks without consuming all blobs so either
|
||||
# the peer we got too few blocks in the paired request, or the
|
||||
# peer is sending us spurious blobs.
|
||||
Result[seq[BlobSidecars], string].err "invalid block or blob sequence"
|
||||
else:
|
||||
Result[seq[BlobSidecars], string].ok grouped
|
||||
|
||||
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
||||
logScope:
|
||||
|
@ -422,19 +433,20 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
|||
debug "Failed to receive blobs on request", request = req
|
||||
return
|
||||
let blobData = blobs.get().asSeq()
|
||||
let slots = mapIt(blobData, it[].slot)
|
||||
let blobSmap = getShortMap(req, blobData)
|
||||
debug "Received blobs on request", blobs_count = len(blobData),
|
||||
blobs_map = blobSmap, request = req
|
||||
|
||||
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
||||
if not(checkResponse(req, uniqueSlots)):
|
||||
peer.updateScore(PeerScoreBadResponse)
|
||||
warn "Received blobs sequence is not in requested range",
|
||||
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
|
||||
request = req
|
||||
return
|
||||
let groupedBlobs = groupBlobs(req, blobData)
|
||||
if len(blobData) > 0:
|
||||
let slots = mapIt(blobData, it[].slot)
|
||||
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
||||
if not(checkResponse(req, uniqueSlots)):
|
||||
peer.updateScore(PeerScoreBadResponse)
|
||||
warn "Received blobs sequence is not in requested range",
|
||||
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
|
||||
request = req
|
||||
return
|
||||
let groupedBlobs = groupBlobs(req, blockData, blobData)
|
||||
if groupedBlobs.isErr():
|
||||
warn "Received blobs sequence is invalid",
|
||||
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
|
||||
|
|
|
@ -116,14 +116,17 @@ proc getShortMap*[T](req: SyncRequest[T],
|
|||
var res = newStringOfCap(req.count * MAX_BLOBS_PER_BLOCK)
|
||||
var cur : uint64 = 0
|
||||
for slot in req.slot..<req.slot+req.count:
|
||||
if cur >= lenu64(data):
|
||||
res.add('|')
|
||||
continue
|
||||
if slot == data[cur].slot:
|
||||
for k in cur..<cur+MAX_BLOBS_PER_BLOCK:
|
||||
inc(cur)
|
||||
if slot == data[k].slot:
|
||||
res.add('x')
|
||||
else:
|
||||
if k >= lenu64(data) or slot != data[k].slot:
|
||||
res.add('|')
|
||||
break
|
||||
else:
|
||||
inc(cur)
|
||||
res.add('x')
|
||||
else:
|
||||
res.add('|')
|
||||
res
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
# beacon_chain
|
||||
# Copyright (c) 2020-2022 Status Research & Development GmbH
|
||||
# Copyright (c) 2020-2023 Status Research & Development GmbH
|
||||
# Licensed and distributed under either of
|
||||
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||
|
@ -1064,9 +1064,11 @@ suite "SyncManager test suite":
|
|||
checkResponse(r21, @[slots[3]]) == false
|
||||
|
||||
test "[SyncManager] groupBlobs() test":
|
||||
let blobs = createBlobs(@[Slot(11), Slot(11), Slot(12), Slot(14)])
|
||||
let req = SyncRequest[SomeTPeer](slot: Slot(10), count: 6'u64)
|
||||
let groupedRes = groupBlobs(req, blobs)
|
||||
var blobs = createBlobs(@[Slot(11), Slot(11), Slot(12), Slot(14)])
|
||||
var blocks = createChain(Slot(10), Slot(15))
|
||||
|
||||
let req = SyncRequest[SomeTPeer](slot: Slot(10))
|
||||
let groupedRes = groupBlobs(req, blocks, blobs)
|
||||
|
||||
check:
|
||||
groupedRes.isOk()
|
||||
|
@ -1092,6 +1094,29 @@ suite "SyncManager test suite":
|
|||
# slot 15
|
||||
len(grouped[5]) == 0
|
||||
|
||||
# Add block with a gap from previous block.
|
||||
let block17 = new (ref ForkedSignedBeaconBlock)
|
||||
block17[].phase0Data.message.slot = Slot(17)
|
||||
blocks.add(block17)
|
||||
let groupedRes2 = groupBlobs(req, blocks, blobs)
|
||||
|
||||
check:
|
||||
groupedRes2.isOk()
|
||||
let grouped2 = groupedRes2.get()
|
||||
check:
|
||||
len(grouped2) == 7
|
||||
len(grouped2[6]) == 0 # slot 17
|
||||
|
||||
let blob18 = new (ref BlobSidecar)
|
||||
blob18[].slot = Slot(18)
|
||||
blobs.add(blob18)
|
||||
let groupedRes3 = groupBlobs(req, blocks, blobs)
|
||||
|
||||
check:
|
||||
groupedRes3.isErr()
|
||||
|
||||
|
||||
|
||||
test "[SyncQueue#Forward] getRewindPoint() test":
|
||||
let aq = newAsyncQueue[BlockEntry]()
|
||||
block:
|
||||
|
|
Loading…
Reference in New Issue