mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-01-25 22:11:06 +00:00
Update sync to use post-decoupling RPC (#4701)
* Update sync to use post-decoupling RPCs blob_sidecars_by_range returns a flat list of sidecars, which must then be grouped per-slot. * Add test for groupBlobs * createBlobs: convert proc to func
This commit is contained in:
parent
2c0762013e
commit
90640cce05
@ -486,6 +486,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
|
||||
## SyncManager test suite
|
||||
```diff
|
||||
+ Process all unviable blocks OK
|
||||
+ [SyncManager] groupBlobs() test OK
|
||||
+ [SyncQueue#Backward] Async unordered push test OK
|
||||
+ [SyncQueue#Backward] Async unordered push with rewind test OK
|
||||
+ [SyncQueue#Backward] Good response with missing values towards end OK
|
||||
@ -509,7 +510,7 @@ OK: 4/4 Fail: 0/4 Skip: 0/4
|
||||
+ [SyncQueue] getLastNonEmptySlot() test OK
|
||||
+ [SyncQueue] hasEndGap() test OK
|
||||
```
|
||||
OK: 23/23 Fail: 0/23 Skip: 0/23
|
||||
OK: 24/24 Fail: 0/24 Skip: 0/24
|
||||
## Type helpers
|
||||
```diff
|
||||
+ BeaconBlockType OK
|
||||
@ -625,4 +626,4 @@ OK: 2/2 Fail: 0/2 Skip: 0/2
|
||||
OK: 9/9 Fail: 0/9 Skip: 0/9
|
||||
|
||||
---TOTAL---
|
||||
OK: 346/351 Fail: 0/351 Skip: 5/351
|
||||
OK: 347/352 Fail: 0/352 Skip: 5/352
|
||||
|
@ -46,7 +46,7 @@ const
|
||||
## Number of slots from wall time that we start processing every payload
|
||||
|
||||
type
|
||||
BlobSidecars* = List[Blob, Limit MAX_BLOBS_PER_BLOCK]
|
||||
BlobSidecars* = seq[ref BlobSidecar]
|
||||
BlockEntry = object
|
||||
blck*: ForkedSignedBeaconBlock
|
||||
blobs*: BlobSidecars
|
||||
|
@ -337,7 +337,7 @@ proc initFullNode(
|
||||
maybeFinalized = maybeFinalized)
|
||||
resfut
|
||||
blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
|
||||
blobs: eip4844.BlobsSidecar,
|
||||
blobs: BlobSidecars,
|
||||
maybeFinalized: bool):
|
||||
Future[Result[void, VerifierError]] =
|
||||
# The design with a callback for block verification is unusual compared
|
||||
@ -346,7 +346,7 @@ proc initFullNode(
|
||||
# that should probably be reimagined more holistically in the future.
|
||||
let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
|
||||
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
|
||||
BlobSidecars @[], resfut, maybeFinalized = maybeFinalized)
|
||||
blobs, resfut, maybeFinalized = maybeFinalized)
|
||||
resfut
|
||||
processor = Eth2Processor.new(
|
||||
config.doppelgangerDetection,
|
||||
|
@ -14,7 +14,9 @@ import
|
||||
../spec/forks,
|
||||
../networking/eth2_network,
|
||||
../consensus_object_pools/block_quarantine,
|
||||
"."/sync_protocol, "."/sync_manager
|
||||
"."/sync_protocol, "."/sync_manager,
|
||||
../gossip_processing/block_processor
|
||||
|
||||
from ../beacon_clock import GetBeaconTimeFn
|
||||
export block_quarantine, sync_manager
|
||||
|
||||
@ -33,7 +35,7 @@ type
|
||||
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
|
||||
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
||||
BlockBlobsVerifier* =
|
||||
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar,
|
||||
proc(signedBlock: ForkedSignedBeaconBlock, blobs: BlobSidecars,
|
||||
maybeFinalized: bool):
|
||||
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
||||
|
||||
|
@ -14,6 +14,7 @@ import
|
||||
../spec/eth2_apis/rest_types,
|
||||
../spec/[helpers, forks, network],
|
||||
../networking/[peer_pool, peer_scores, eth2_network],
|
||||
../gossip_processing/block_processor,
|
||||
../beacon_clock,
|
||||
"."/[sync_protocol, sync_queue]
|
||||
|
||||
@ -77,7 +78,7 @@ type
|
||||
slots*: uint64
|
||||
|
||||
BeaconBlocksRes = NetRes[List[ref ForkedSignedBeaconBlock, MAX_REQUEST_BLOCKS]]
|
||||
BlobsSidecarRes = NetRes[List[ref BlobsSidecar, MAX_REQUEST_BLOBS_SIDECARS]]
|
||||
BlobSidecarsRes = NetRes[List[ref BlobSidecar, Limit(MAX_REQUEST_BLOBS_SIDECARS * MAX_BLOBS_PER_BLOCK)]]
|
||||
|
||||
proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
|
||||
SyncMoment(stamp: now(chronos.Moment), slots: slots)
|
||||
@ -193,8 +194,8 @@ proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
|
||||
(wallEpoch < MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS or
|
||||
e >= wallEpoch - MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS)
|
||||
|
||||
proc getBlobsSidecars*[A, B](man: SyncManager[A, B], peer: A,
|
||||
req: SyncRequest): Future[BlobsSidecarRes] {.async.} =
|
||||
proc getBlobSidecars*[A, B](man: SyncManager[A, B], peer: A,
|
||||
req: SyncRequest): Future[BlobSidecarsRes] {.async.} =
|
||||
mixin getScore, `==`
|
||||
|
||||
logScope:
|
||||
@ -207,18 +208,18 @@ proc getBlobsSidecars*[A, B](man: SyncManager[A, B], peer: A,
|
||||
doAssert(not(req.isEmpty()), "Request must not be empty!")
|
||||
debug "Requesting blobs sidecars from peer", request = req
|
||||
try:
|
||||
let res = await blobsSidecarsByRange(peer, req.slot, req.count)
|
||||
let res = await blobSidecarsByRange(peer, req.slot, req.count)
|
||||
|
||||
if res.isErr():
|
||||
debug "Error, while reading blobsSidecarsByRange response", request = req,
|
||||
debug "Error, while reading blobSidecarsByRange response", request = req,
|
||||
error = $res.error()
|
||||
return
|
||||
return res
|
||||
except CancelledError:
|
||||
debug "Interrupt, while waiting blobsSidecarsByRange response", request = req
|
||||
debug "Interrupt, while waiting blobSidecarsByRange response", request = req
|
||||
return
|
||||
except CatchableError as exc:
|
||||
debug "Error, while waiting blobsSidecarsByRange response", request = req,
|
||||
debug "Error, while waiting blobSidecarsByRange response", request = req,
|
||||
errName = exc.name, errMsg = exc.msg
|
||||
return
|
||||
|
||||
@ -237,6 +238,23 @@ proc remainingSlots(man: SyncManager): uint64 =
|
||||
else:
|
||||
0'u64
|
||||
|
||||
func groupBlobs*[T](req: SyncRequest[T], blobs: seq[ref BlobSidecar]):
|
||||
Result[seq[BlobSidecars], string] =
|
||||
var grouped = newSeq[BlobSidecars](req.count)
|
||||
var rawCur = 0
|
||||
|
||||
for groupedCur in 0 ..< len(grouped):
|
||||
grouped[groupedCur] = newSeq[ref BlobSidecar](0)
|
||||
let slot = req.slot + groupedCur.uint64
|
||||
while rawCur < len(blobs) and blobs[rawCur].slot == slot:
|
||||
grouped[groupedCur].add(blobs[rawCur])
|
||||
inc(rawCur)
|
||||
|
||||
if rawCur != len(blobs):
|
||||
result.err "invalid blob sequence"
|
||||
else:
|
||||
result.ok grouped
|
||||
|
||||
proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
||||
logScope:
|
||||
peer_score = peer.getScore()
|
||||
@ -394,30 +412,43 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
||||
request = req
|
||||
return
|
||||
|
||||
func combine(acc: seq[Slot], cur: Slot): seq[Slot] =
|
||||
var copy = acc
|
||||
if copy[copy.len-1] != cur:
|
||||
copy.add(cur)
|
||||
copy
|
||||
|
||||
let blobData =
|
||||
if man.shouldGetBlobs(req.slot.epoch):
|
||||
let blobs = await man.getBlobsSidecars(peer, req)
|
||||
let blobs = await man.getBlobSidecars(peer, req)
|
||||
if blobs.isErr():
|
||||
peer.updateScore(PeerScoreNoValues)
|
||||
man.queue.push(req)
|
||||
debug "Failed to receive blobs on request", request = req
|
||||
return
|
||||
let blobData = blobs.get().asSeq()
|
||||
let slots = mapIt(blobData, it[].beacon_block_slot)
|
||||
if not(checkResponse(req, slots)):
|
||||
let slots = mapIt(blobData, it[].slot)
|
||||
let blobSmap = getShortMap(req, blobData)
|
||||
debug "Received blobs on request", blobs_count = len(blobData),
|
||||
blobs_map = blobSmap, request = req
|
||||
|
||||
let uniqueSlots = foldl(slots, combine(a, b), @[slots[0]])
|
||||
if not(checkResponse(req, uniqueSlots)):
|
||||
peer.updateScore(PeerScoreBadResponse)
|
||||
warn "Received blobs sequence is not in requested range",
|
||||
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
|
||||
request = req
|
||||
return
|
||||
Opt.some(blobData)
|
||||
let groupedBlobs = groupBlobs(req, blobData)
|
||||
if groupedBlobs.isErr():
|
||||
warn "Received blobs sequence is invalid",
|
||||
blobs_map = getShortMap(req, blobData), request = req, msg=groupedBlobs.error()
|
||||
return
|
||||
Opt.some(groupedBlobs.get())
|
||||
else:
|
||||
Opt.none(seq[ref BlobsSidecar])
|
||||
Opt.none(seq[BlobSidecars])
|
||||
|
||||
if blobData.isSome:
|
||||
let blobSmap = getShortMap(req, blobData.get(@[]))
|
||||
debug "Received blobs on request", blobs_count = len(blobData.get(@[])),
|
||||
blobs_map = blobSmap, request = req
|
||||
let blobs = blobData.get()
|
||||
if len(blobs) != len(blockData):
|
||||
info "block and blobs have different lengths", blobs=len(blobs), blocks=len(blockData)
|
||||
@ -425,7 +456,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
|
||||
man.queue.push(req)
|
||||
return
|
||||
for i, blk in blockData:
|
||||
if blk[].slot != blobs[i].beacon_block_slot:
|
||||
if len(blobs[i]) > 0 and blk[].slot != blobs[i][0].slot:
|
||||
peer.updateScore(PeerScoreNoValues)
|
||||
man.queue.push(req)
|
||||
debug "block and blobs data have inconsistent slots"
|
||||
|
@ -120,24 +120,6 @@ proc readChunkPayload*(
|
||||
else:
|
||||
return neterr InvalidContextBytes
|
||||
|
||||
proc readChunkPayload*(
|
||||
conn: Connection, peer: Peer, MsgType: type (ref BlobsSidecar)):
|
||||
Future[NetRes[MsgType]] {.async.} =
|
||||
var contextBytes: ForkDigest
|
||||
try:
|
||||
await conn.readExactly(addr contextBytes, sizeof contextBytes)
|
||||
except CatchableError:
|
||||
return neterr UnexpectedEOF
|
||||
|
||||
if contextBytes == peer.network.forkDigests.eip4844:
|
||||
let res = await readChunkPayload(conn, peer, BlobsSidecar)
|
||||
if res.isOk:
|
||||
return ok newClone(res.get)
|
||||
else:
|
||||
return err(res.error)
|
||||
else:
|
||||
return neterr InvalidContextBytes
|
||||
|
||||
proc readChunkPayload*(
|
||||
conn: Connection, peer: Peer, MsgType: type (ref BlobSidecar)):
|
||||
Future[NetRes[MsgType]] {.async.} =
|
||||
@ -447,77 +429,6 @@ p2pProtocol BeaconSync(version = 1,
|
||||
peer, roots = blockRoots.len, count, found
|
||||
|
||||
|
||||
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.0/specs/eip4844/p2p-interface.md#blobssidecarsbyrange-v1
|
||||
proc blobsSidecarsByRange(
|
||||
peer: Peer,
|
||||
startSlot: Slot,
|
||||
reqCount: uint64,
|
||||
response: MultipleChunksResponse[
|
||||
ref BlobsSidecar, MAX_REQUEST_BLOBS_SIDECARS])
|
||||
{.async, libp2pProtocol("blobs_sidecars_by_range", 1).} =
|
||||
# TODO This code is more complicated than it needs to be, since the type
|
||||
# of the multiple chunks response is not actually used in this server
|
||||
# implementation (it's used to derive the signature of the client
|
||||
# function, not in the code below!)
|
||||
# TODO although you can't tell from this function definition, a magic
|
||||
# client call that returns `seq[ref BlobsSidecar]` will
|
||||
# will be generated by the libp2p macro - we guarantee that seq items
|
||||
# are `not-nil` in the implementation
|
||||
|
||||
trace "got blobs range request", peer, startSlot, count = reqCount
|
||||
if reqCount == 0:
|
||||
raise newException(InvalidInputsError, "Empty range requested")
|
||||
|
||||
let
|
||||
dag = peer.networkState.dag
|
||||
epochBoundary =
|
||||
if MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS >= dag.head.slot.epoch:
|
||||
GENESIS_EPOCH
|
||||
else:
|
||||
dag.head.slot.epoch - MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS
|
||||
|
||||
if startSlot.epoch < epochBoundary:
|
||||
raise newException(ResourceUnavailableError, BlobsOutOfRange)
|
||||
|
||||
var blockIds: array[MAX_REQUEST_BLOBS_SIDECARS, BlockId]
|
||||
let
|
||||
# Limit number of blocks in response
|
||||
count = int min(reqCount, blockIds.lenu64)
|
||||
endIndex = count - 1
|
||||
startIndex =
|
||||
dag.getBlockRange(startSlot, 1, blockIds.toOpenArray(0, endIndex))
|
||||
|
||||
var
|
||||
found = 0
|
||||
bytes: seq[byte]
|
||||
|
||||
for i in startIndex..endIndex:
|
||||
if dag.db.getBlobsSidecarSZ(blockIds[i].root, bytes):
|
||||
# In general, there is not much intermediate time between post-merge
|
||||
# blocks all being optimistic and none of them being optimistic. The
|
||||
# EL catches up, tells the CL the head is verified, and that's it.
|
||||
if blockIds[i].slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and
|
||||
dag.is_optimistic(dag.head.root):
|
||||
continue
|
||||
|
||||
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
|
||||
warn "Cannot read blobs sidecar size, database corrupt?",
|
||||
bytes = bytes.len(), blck = shortLog(blockIds[i])
|
||||
continue
|
||||
|
||||
# TODO extract from libp2pProtocol
|
||||
peer.awaitQuota(blockResponseCost, "blobs_sidecars_by_range/1")
|
||||
peer.network.awaitQuota(blockResponseCost, "blobs_sidecars_by_range/1")
|
||||
|
||||
await response.writeBytesSZ(
|
||||
uncompressedLen, bytes,
|
||||
peer.networkState.forkDigestAtEpoch(blockIds[i].slot.epoch).data)
|
||||
|
||||
inc found
|
||||
|
||||
debug "BlobsSidecar range request done",
|
||||
peer, startSlot, count
|
||||
|
||||
# https://github.com/ethereum/consensus-specs/blob/v1.3.0-rc.3/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1
|
||||
proc blobSidecarsByRoot(
|
||||
peer: Peer,
|
||||
|
@ -29,7 +29,7 @@ type
|
||||
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
|
||||
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
||||
BlockBlobsVerifier* =
|
||||
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar,
|
||||
proc(signedBlock: ForkedSignedBeaconBlock, blobs: BlobSidecars,
|
||||
maybeFinalized: bool):
|
||||
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
|
||||
|
||||
@ -46,7 +46,7 @@ type
|
||||
SyncResult*[T] = object
|
||||
request*: SyncRequest[T]
|
||||
data*: seq[ref ForkedSignedBeaconBlock]
|
||||
blobs*: Opt[seq[ref BlobsSidecar]]
|
||||
blobs*: Opt[seq[BlobSidecars]]
|
||||
|
||||
GapItem*[T] = object
|
||||
start*: Slot
|
||||
@ -116,24 +116,21 @@ proc getShortMap*[T](req: SyncRequest[T],
|
||||
res
|
||||
|
||||
proc getShortMap*[T](req: SyncRequest[T],
|
||||
data: openArray[ref BlobsSidecar]): string =
|
||||
data: openArray[ref BlobSidecar]): string =
|
||||
## Returns all slot numbers in ``data`` as placement map.
|
||||
var res = newStringOfCap(req.count)
|
||||
var slider = req.slot
|
||||
var last = 0
|
||||
for i in 0 ..< req.count:
|
||||
if last < len(data):
|
||||
for k in last ..< len(data):
|
||||
if slider == data[k].beacon_block_slot:
|
||||
var res = newStringOfCap(req.count * MAX_BLOBS_PER_BLOCK)
|
||||
var cur : uint64 = 0
|
||||
for slot in req.slot..<req.slot+req.count:
|
||||
if slot == data[cur].slot:
|
||||
for k in cur..<cur+MAX_BLOBS_PER_BLOCK:
|
||||
inc(cur)
|
||||
if slot == data[k].slot:
|
||||
res.add('x')
|
||||
last = k + 1
|
||||
break
|
||||
elif slider < data[k].beacon_block_slot:
|
||||
res.add('.')
|
||||
else:
|
||||
res.add('|')
|
||||
break
|
||||
else:
|
||||
res.add('.')
|
||||
slider = slider + 1
|
||||
res.add('|')
|
||||
res
|
||||
|
||||
proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
|
||||
@ -607,7 +604,7 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
|
||||
|
||||
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
data: seq[ref ForkedSignedBeaconBlock],
|
||||
blobs: Opt[seq[ref BlobsSidecar]],
|
||||
blobs: Opt[seq[BlobSidecars]],
|
||||
maybeFinalized: bool = false,
|
||||
processingCb: ProcessingCallback = nil) {.async.} =
|
||||
logScope:
|
||||
@ -689,7 +686,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
if reqres.get().blobs.isNone():
|
||||
res = await sq.blockVerifier(blk[], maybeFinalized)
|
||||
else:
|
||||
res = await sq.blockBlobsVerifier(blk[], reqres.get().blobs.get()[i][], maybeFinalized)
|
||||
res = await sq.blockBlobsVerifier(blk[], reqres.get().blobs.get()[i], maybeFinalized)
|
||||
inc(i)
|
||||
|
||||
if res.isOk():
|
||||
|
@ -70,6 +70,13 @@ suite "SyncManager test suite":
|
||||
curslot = curslot + 1'u64
|
||||
res
|
||||
|
||||
func createBlobs(slots: seq[Slot]): seq[ref BlobSidecar] =
|
||||
var res = newSeq[ref BlobSidecar](len(slots))
|
||||
for (i, item) in res.mpairs():
|
||||
item = new BlobSidecar
|
||||
item[].slot = slots[i]
|
||||
res
|
||||
|
||||
proc getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot,
|
||||
request: SyncRequest[SomeTPeer]): seq[ref ForkedSignedBeaconBlock] =
|
||||
let
|
||||
@ -326,7 +333,7 @@ suite "SyncManager test suite":
|
||||
if request.isEmpty():
|
||||
break
|
||||
await queue.push(request, getSlice(chain, start, request),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await validatorFut.cancelAndWait()
|
||||
|
||||
waitFor runSmokeTest()
|
||||
@ -401,7 +408,7 @@ suite "SyncManager test suite":
|
||||
var r13 = queue.pop(finishSlot, p3)
|
||||
|
||||
var f13 = queue.push(r13, chain.getSlice(startSlot, r13),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check:
|
||||
f13.finished == false
|
||||
@ -410,7 +417,7 @@ suite "SyncManager test suite":
|
||||
of SyncQueueKind.Backward: counter == int(finishSlot)
|
||||
|
||||
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check:
|
||||
case kkind
|
||||
@ -420,7 +427,7 @@ suite "SyncManager test suite":
|
||||
f13.finished == false
|
||||
|
||||
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await allFutures(f11, f12, f13)
|
||||
check:
|
||||
f12.finished == true and f12.failed == false
|
||||
@ -523,7 +530,7 @@ suite "SyncManager test suite":
|
||||
check response[0][].slot >= getFowardSafeSlotCb()
|
||||
else:
|
||||
check response[^1][].slot <= getBackwardSafeSlotCb()
|
||||
await queue.push(request, response, Opt.none(seq[ref BlobsSidecar]))
|
||||
await queue.push(request, response, Opt.none(seq[BlobSidecars]))
|
||||
await validatorFut.cancelAndWait()
|
||||
|
||||
waitFor runTest()
|
||||
@ -606,7 +613,7 @@ suite "SyncManager test suite":
|
||||
|
||||
# Handle request 1. Should be re-enqueued as it simulates `Invalid`.
|
||||
let response1 = getSlice(chain, start, request1)
|
||||
await queue.push(request1, response1, Opt.none(seq[ref BlobsSidecar]))
|
||||
await queue.push(request1, response1, Opt.none(seq[BlobSidecars]))
|
||||
check debtLen(queue) == request2.count + request1.count
|
||||
|
||||
# Request 1 should be discarded as it is no longer relevant.
|
||||
@ -618,7 +625,7 @@ suite "SyncManager test suite":
|
||||
|
||||
# Handle request 3. Should be re-enqueued as it simulates `Invalid`.
|
||||
let response3 = getSlice(chain, start, request3)
|
||||
await queue.push(request3, response3, Opt.none(seq[ref BlobsSidecar]))
|
||||
await queue.push(request3, response3, Opt.none(seq[BlobSidecars]))
|
||||
check debtLen(queue) == request3.count
|
||||
|
||||
# Request 2 should be re-issued.
|
||||
@ -632,7 +639,7 @@ suite "SyncManager test suite":
|
||||
|
||||
# Handle request 4. Should be re-enqueued as it simulates `Invalid`.
|
||||
let response4 = getSlice(chain, start, request4)
|
||||
await queue.push(request4, response4, Opt.none(seq[ref BlobsSidecar]))
|
||||
await queue.push(request4, response4, Opt.none(seq[BlobSidecars]))
|
||||
check debtLen(queue) == request4.count
|
||||
|
||||
# Advance `safeSlot` out of band.
|
||||
@ -749,14 +756,14 @@ suite "SyncManager test suite":
|
||||
var r14 = queue.pop(finishSlot, p4)
|
||||
|
||||
var f14 = queue.push(r14, chain.getSlice(startSlot, r14),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check:
|
||||
f14.finished == false
|
||||
counter == int(startSlot)
|
||||
|
||||
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check:
|
||||
counter == int(startSlot)
|
||||
@ -764,7 +771,7 @@ suite "SyncManager test suite":
|
||||
f14.finished == false
|
||||
|
||||
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await allFutures(f11, f12)
|
||||
check:
|
||||
counter == int(startSlot + chunkSize + chunkSize)
|
||||
@ -776,7 +783,7 @@ suite "SyncManager test suite":
|
||||
withBlck(missingSlice[0][]):
|
||||
blck.message.proposer_index = 0xDEADBEAF'u64
|
||||
var f13 = queue.push(r13, missingSlice,
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await allFutures(f13, f14)
|
||||
check:
|
||||
f11.finished == true and f11.failed == false
|
||||
@ -798,17 +805,17 @@ suite "SyncManager test suite":
|
||||
check r18.isEmpty() == true
|
||||
|
||||
var f17 = queue.push(r17, chain.getSlice(startSlot, r17),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check f17.finished == false
|
||||
|
||||
var f16 = queue.push(r16, chain.getSlice(startSlot, r16),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check f16.finished == false
|
||||
|
||||
var f15 = queue.push(r15, chain.getSlice(startSlot, r15),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await allFutures(f15, f16, f17)
|
||||
check:
|
||||
f15.finished == true and f15.failed == false
|
||||
@ -855,7 +862,7 @@ suite "SyncManager test suite":
|
||||
|
||||
# Push a single request that will fail with all blocks being unviable
|
||||
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
discard await f11.withTimeout(100.milliseconds)
|
||||
|
||||
check:
|
||||
@ -921,14 +928,14 @@ suite "SyncManager test suite":
|
||||
var r14 = queue.pop(finishSlot, p4)
|
||||
|
||||
var f14 = queue.push(r14, chain.getSlice(startSlot, r14),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check:
|
||||
f14.finished == false
|
||||
counter == int(finishSlot)
|
||||
|
||||
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check:
|
||||
counter == int(finishSlot)
|
||||
@ -936,7 +943,7 @@ suite "SyncManager test suite":
|
||||
f14.finished == false
|
||||
|
||||
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await allFutures(f11, f12)
|
||||
check:
|
||||
counter == int(finishSlot - chunkSize - chunkSize)
|
||||
@ -947,7 +954,7 @@ suite "SyncManager test suite":
|
||||
var missingSlice = chain.getSlice(startSlot, r13)
|
||||
withBlck(missingSlice[0][]):
|
||||
blck.message.proposer_index = 0xDEADBEAF'u64
|
||||
var f13 = queue.push(r13, missingSlice, Opt.none(seq[ref BlobsSidecar]))
|
||||
var f13 = queue.push(r13, missingSlice, Opt.none(seq[BlobSidecars]))
|
||||
await allFutures(f13, f14)
|
||||
check:
|
||||
f11.finished == true and f11.failed == false
|
||||
@ -965,12 +972,12 @@ suite "SyncManager test suite":
|
||||
check r17.isEmpty() == true
|
||||
|
||||
var f16 = queue.push(r16, chain.getSlice(startSlot, r16),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await sleepAsync(100.milliseconds)
|
||||
check f16.finished == false
|
||||
|
||||
var f15 = queue.push(r15, chain.getSlice(startSlot, r15),
|
||||
Opt.none(seq[ref BlobsSidecar]))
|
||||
Opt.none(seq[BlobSidecars]))
|
||||
await allFutures(f15, f16)
|
||||
check:
|
||||
f15.finished == true and f15.failed == false
|
||||
@ -1055,6 +1062,35 @@ suite "SyncManager test suite":
|
||||
checkResponse(r21, @[slots[2], slots[3]]) == false
|
||||
checkResponse(r21, @[slots[3]]) == false
|
||||
|
||||
test "[SyncManager] groupBlobs() test":
|
||||
let blobs = createBlobs(@[Slot(11), Slot(11), Slot(12), Slot(14)])
|
||||
let req = SyncRequest[SomeTPeer](slot: Slot(10), count: 6'u64)
|
||||
let groupedRes = groupBlobs(req, blobs)
|
||||
|
||||
check:
|
||||
groupedRes.isOk()
|
||||
|
||||
let grouped = groupedRes.get()
|
||||
|
||||
check:
|
||||
len(grouped) == 6
|
||||
# slot 10
|
||||
len(grouped[0]) == 0
|
||||
# slot 11
|
||||
len(grouped[1]) == 2
|
||||
grouped[1][0].slot == Slot(11)
|
||||
grouped[1][1].slot == Slot(11)
|
||||
# slot 12
|
||||
len(grouped[2]) == 1
|
||||
grouped[2][0].slot == Slot(12)
|
||||
# slot 13
|
||||
len(grouped[3]) == 0
|
||||
# slot 14
|
||||
len(grouped[4]) == 1
|
||||
grouped[4][0].slot == Slot(14)
|
||||
# slot 15
|
||||
len(grouped[5]) == 0
|
||||
|
||||
test "[SyncQueue#Forward] getRewindPoint() test":
|
||||
let aq = newAsyncQueue[BlockEntry]()
|
||||
block:
|
||||
|
Loading…
x
Reference in New Issue
Block a user