EIP4844 sync (#4581)

* EIP4844 Sync

* Pass eip4844 fork epoch rather than cfg to syncmanager

* Fix sync

* Update test

* map->mapIt
This commit is contained in:
henridf 2023-02-11 21:48:35 +01:00 committed by GitHub
parent db91710ee2
commit 59e41dc65d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 288 additions and 118 deletions

View File

@ -180,7 +180,7 @@ proc storeBackfillBlock(
# writing the block in case of blob error.
let blobsOk =
when typeof(signedBlock).toFork() >= ConsensusFork.EIP4844:
blobs.isSome() and not
blobs.isNone or
validate_blobs_sidecar(signedBlock.message.slot,
signedBlock.root,
signedBlock.message

View File

@ -800,7 +800,8 @@ func chunkMaxSize[T](): uint32 =
# compiler error on (T: type) syntax...
when T is ForkySignedBeaconBlock:
when T is phase0.SignedBeaconBlock or T is altair.SignedBeaconBlock or
T is bellatrix.SignedBeaconBlock or T is capella.SignedBeaconBlock:
T is bellatrix.SignedBeaconBlock or T is capella.SignedBeaconBlock or
T is eip4844.SignedBeaconBlock:
MAX_CHUNK_SIZE_BELLATRIX
else:
{.fatal: "what's the chunk size here?".}

View File

@ -330,10 +330,13 @@ proc initFullNode(
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(eip4844.BlobsSidecar), resfut,
Opt.none(eip4844.BlobsSidecar),
resfut,
maybeFinalized = maybeFinalized)
resfut
blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
blockBlobsVerifier = proc(signedBlock: ForkedSignedBeaconBlock,
blobs: eip4844.BlobsSidecar,
maybeFinalized: bool):
Future[Result[void, VerifierError]] =
# The design with a callback for block verification is unusual compared
# to the rest of the application, but fits with the general approach
@ -341,7 +344,7 @@ proc initFullNode(
# that should probably be reimagined more holistically in the future.
let resfut = newFuture[Result[void, VerifierError]]("blockVerifier")
blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs), resfut)
Opt.some(blobs), resfut, maybeFinalized = maybeFinalized)
resfut
processor = Eth2Processor.new(
config.doppelgangerDetection,
@ -349,13 +352,14 @@ proc initFullNode(
validatorChangePool, node.attachedValidators, syncCommitteeMsgPool,
lightClientPool, quarantine, rng, getBeaconTime, taskpool)
syncManager = newSyncManager[Peer, PeerId](
node.network.peerPool, SyncQueueKind.Forward, getLocalHeadSlot,
node.network.peerPool, dag.cfg.EIP4844_FORK_EPOCH, SyncQueueKind.Forward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
getFrontfillSlot, dag.tail.slot, blockVerifier)
getFrontfillSlot, dag.tail.slot, blockVerifier, blockBlobsVerifier)
backfiller = newSyncManager[Peer, PeerId](
node.network.peerPool, SyncQueueKind.Backward, getLocalHeadSlot,
node.network.peerPool, dag.cfg.EIP4844_FORK_EPOCH, SyncQueueKind.Backward, getLocalHeadSlot,
getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot,
getFrontfillSlot, dag.backfill.slot, blockVerifier, maxHeadAge = 0)
getFrontfillSlot, dag.backfill.slot, blockVerifier, blockBlobsVerifier,
maxHeadAge = 0)
router = (ref MessageRouter)(
processor: processor,
network: node.network)

View File

@ -33,7 +33,8 @@ type
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
BlockBlobsVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar):
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar,
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
RequestManager* = object
@ -173,7 +174,8 @@ proc fetchAncestorBlocksAndBlobsFromNetwork(rman: RequestManager,
debug "Requesting blocks and sidecars by root",
peer = peer, blocks = shortLog(items), peer_score = peer.getScore()
let blocks = (await beaconBlockAndBlobsSidecarByRoot_v1(peer, BlockRootsList items))
let blocks = await beaconBlockAndBlobsSidecarByRoot_v1(peer,
BlockRootsList items)
if blocks.isOk:
let ublocks = blocks.get()
@ -183,7 +185,9 @@ proc fetchAncestorBlocksAndBlobsFromNetwork(rman: RequestManager,
gotUnviableBlock = false
for b in ublocks:
let ver = await rman.blockBlobsVerifier(ForkedSignedBeaconBlock.init(b[].beacon_block), b[].blobs_sidecar)
let ver = await rman.blockBlobsVerifier(
ForkedSignedBeaconBlock.init(b[].beacon_block),
b[].blobs_sidecar, false)
if ver.isErr():
case ver.error()
of VerifierError.MissingParent:

View File

@ -47,6 +47,7 @@ type
SyncManager*[A, B] = ref object
pool: PeerPool[A, B]
EIP4844_FORK_EPOCH: Epoch
responseTimeout: chronos.Duration
maxHeadAge: uint64
getLocalHeadSlot: GetSlotCallback
@ -62,6 +63,7 @@ type
queue: SyncQueue[A]
syncFut: Future[void]
blockVerifier: BlockVerifier
blockBlobsVerifier: BlockBlobsVerifier
inProgress*: bool
insSyncSpeed*: float
avgSyncSpeed*: float
@ -75,6 +77,7 @@ type
slots*: uint64
BeaconBlocksRes = NetRes[List[ref ForkedSignedBeaconBlock, MAX_REQUEST_BLOCKS]]
BlobsSidecarRes = NetRes[List[ref BlobsSidecar, MAX_REQUEST_BLOBS_SIDECARS]]
proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} =
SyncMoment(stamp: now(chronos.Moment), slots: slots)
@ -94,7 +97,8 @@ proc initQueue[A, B](man: SyncManager[A, B]) =
of SyncQueueKind.Forward:
man.queue = SyncQueue.init(A, man.direction, man.getFirstSlot(),
man.getLastSlot(), man.chunkSize,
man.getSafeSlot, man.blockVerifier, 1,
man.getSafeSlot, man.blockVerifier,
man.blockBlobsVerifier, 1,
man.ident)
of SyncQueueKind.Backward:
let
@ -108,10 +112,11 @@ proc initQueue[A, B](man: SyncManager[A, B]) =
Slot(firstSlot - 1'u64)
man.queue = SyncQueue.init(A, man.direction, startSlot, lastSlot,
man.chunkSize, man.getSafeSlot,
man.blockVerifier, 1,
man.blockVerifier, man.blockBlobsVerifier, 1,
man.ident)
proc newSyncManager*[A, B](pool: PeerPool[A, B],
eip4844Epoch: Epoch,
direction: SyncQueueKind,
getLocalHeadSlotCb: GetSlotCallback,
getLocalWallSlotCb: GetSlotCallback,
@ -120,6 +125,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
getFrontfillSlotCb: GetSlotCallback,
progressPivot: Slot,
blockVerifier: BlockVerifier,
blockBlobsVerifier: BlockBlobsVerifier,
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
chunkSize = uint64(SLOTS_PER_EPOCH),
flags: set[SyncManagerFlag] = {},
@ -133,6 +139,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
var res = SyncManager[A, B](
pool: pool,
EIP4844_FORK_EPOCH: eip4844Epoch,
getLocalHeadSlot: getLocalHeadSlotCb,
getLocalWallSlot: getLocalWallSlotCb,
getSafeSlot: getSafeSlot,
@ -142,6 +149,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
maxHeadAge: maxHeadAge,
chunkSize: chunkSize,
blockVerifier: blockVerifier,
blockBlobsVerifier: blockBlobsVerifier,
notInSyncEvent: newAsyncEvent(),
direction: direction,
ident: ident,
@ -179,6 +187,41 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
errName = exc.name, errMsg = exc.msg
return
proc shouldGetBlobs[A, B](man: SyncManager[A, B], e: Epoch): bool =
let wallEpoch = man.getLocalWallSlot().epoch
e >= man.EIP4844_FORK_EPOCH and
(wallEpoch < MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS or
e >= wallEpoch - MIN_EPOCHS_FOR_BLOBS_SIDECARS_REQUESTS)
proc getBlobsSidecars*[A, B](man: SyncManager[A, B], peer: A,
req: SyncRequest): Future[BlobsSidecarRes] {.async.} =
mixin getScore, `==`
logScope:
peer_score = peer.getScore()
peer_speed = peer.netKbps()
sync_ident = man.ident
direction = man.direction
topics = "syncman"
doAssert(not(req.isEmpty()), "Request must not be empty!")
debug "Requesting blobs sidecars from peer", request = req
try:
let res = await blobsSidecarsByRange(peer, req.slot, req.count)
if res.isErr():
debug "Error, while reading blobsSidecarsByRange response", request = req,
error = $res.error()
return
return res
except CancelledError:
debug "Interrupt, while waiting blobsSidecarsByRange response", request = req
return
except CatchableError as exc:
debug "Error, while waiting blobsSidecarsByRange response", request = req,
errName = exc.name, errMsg = exc.msg
return
proc remainingSlots(man: SyncManager): uint64 =
let
first = man.getFirstSlot()
@ -333,50 +376,87 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} =
try:
let blocks = await man.getBlocks(peer, req)
if blocks.isOk():
let data = blocks.get().asSeq()
let smap = getShortMap(req, data)
debug "Received blocks on request", blocks_count = len(data),
blocks_map = smap, request = req
if not(checkResponse(req, data)):
peer.updateScore(PeerScoreBadResponse)
warn "Received blocks sequence is not in requested range",
blocks_count = len(data), blocks_map = smap,
request = req
return
if len(data) == 0 and man.direction == SyncQueueKind.Backward and
req.contains(man.getSafeSlot()):
# The sync protocol does not distinguish between:
# - All requested slots are empty
# - Peer does not have data available about requested range
#
# However, we include the `backfill` slot in backward sync requests.
# If we receive an empty response to a request covering that slot,
# we know that the response is incomplete and can descore.
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Response does not include known-to-exist block", request = req
return
# Scoring will happen in `syncUpdate`.
man.workers[index].status = SyncWorkerStatus.Queueing
let
peerFinalized = peer.getFinalizedEpoch().start_slot()
lastSlot = req.slot + req.count
# The peer claims the block is finalized - our own block processing will
# verify this point down the line
# TODO descore peers that lie
maybeFinalized = lastSlot < peerFinalized
await man.queue.push(req, data, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
else:
if blocks.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blocks on request", request = req
return
let blockData = blocks.get().asSeq()
let blockSmap = getShortMap(req, blockData)
debug "Received blocks on request", blocks_count = len(blockData),
blocks_map = blockSmap, request = req
let slots = mapIt(blockData, it[].slot)
if not(checkResponse(req, slots)):
peer.updateScore(PeerScoreBadResponse)
warn "Received blocks sequence is not in requested range",
blocks_count = len(blockData), blocks_map = blockSmap,
request = req
return
let blobData =
if man.shouldGetBlobs(req.slot.epoch):
let blobs = await man.getBlobsSidecars(peer, req)
if blobs.isErr():
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Failed to receive blobs on request", request = req
return
let blobData = blobs.get().asSeq()
let slots = mapIt(blobData, it[].beacon_block_slot)
if not(checkResponse(req, slots)):
peer.updateScore(PeerScoreBadResponse)
warn "Received blobs sequence is not in requested range",
blobs_count = len(blobData), blobs_map = getShortMap(req, blobData),
request = req
return
Opt.some(blobData)
else:
Opt.none(seq[ref BlobsSidecar])
if blobData.isSome:
let blobSmap = getShortMap(req, blobData.get(@[]))
debug "Received blobs on request", blobs_count = len(blobData.get(@[])),
blobs_map = blobSmap, request = req
let blobs = blobData.get()
if len(blobs) != len(blockData):
info "block and blobs have different lengths", blobs=len(blobs), blocks=len(blockData)
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
return
for i, blk in blockData:
if blk[].slot != blobs[i].beacon_block_slot:
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "block and blobs data have inconsistent slots"
return
if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and
req.contains(man.getSafeSlot()):
# The sync protocol does not distinguish between:
# - All requested slots are empty
# - Peer does not have data available about requested range
#
# However, we include the `backfill` slot in backward sync requests.
# If we receive an empty response to a request covering that slot,
# we know that the response is incomplete and can descore.
peer.updateScore(PeerScoreNoValues)
man.queue.push(req)
debug "Response does not include known-to-exist block", request = req
return
# Scoring will happen in `syncUpdate`.
man.workers[index].status = SyncWorkerStatus.Queueing
let
peerFinalized = peer.getFinalizedEpoch().start_slot()
lastSlot = req.slot + req.count
# The peer claims the block is finalized - our own block processing will
# verify this point down the line
# TODO descore peers that lie
maybeFinalized = lastSlot < peerFinalized
await man.queue.push(req, blockData, blobData, maybeFinalized, proc() =
man.workers[index].status = SyncWorkerStatus.Processing)
except CatchableError as exc:
debug "Unexpected exception while receiving blocks", request = req,

View File

@ -23,7 +23,7 @@ logScope:
const
MAX_REQUEST_BLOCKS* = 1024
MAX_REQUEST_BLOBS_SIDECARS = 128
MAX_REQUEST_BLOBS_SIDECARS* = 128
blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs)
@ -110,6 +110,12 @@ proc readChunkPayload*(
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
else:
return err(res.error)
elif contextBytes == peer.network.forkDigests.eip4844:
let res = await readChunkPayload(conn, peer, eip4844.SignedBeaconBlock)
if res.isOk:
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
else:
return err(res.error)
else:
return neterr InvalidContextBytes
@ -131,6 +137,24 @@ proc readChunkPayload*(
else:
return neterr InvalidContextBytes
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref BlobsSidecar)):
Future[NetRes[MsgType]] {.async.} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CatchableError:
return neterr UnexpectedEOF
if contextBytes == peer.network.forkDigests.eip4844:
let res = await readChunkPayload(conn, peer, BlobsSidecar)
if res.isOk:
return ok newClone(res.get)
else:
return err(res.error)
else:
return neterr InvalidContextBytes
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type SomeForkedLightClientObject):
Future[NetRes[MsgType]] {.async.} =
@ -509,7 +533,7 @@ p2pProtocol BeaconSync(version = 1,
startSlot: Slot,
reqCount: uint64,
response: MultipleChunksResponse[
BlobsSidecar, MAX_REQUEST_BLOBS_SIDECARS])
ref BlobsSidecar, MAX_REQUEST_BLOBS_SIDECARS])
{.async, libp2pProtocol("blobs_sidecars_by_range", 1).} =
# TODO This code is more complicated than it needs to be, since the type
# of the multiple chunks response is not actually used in this server

View File

@ -28,6 +28,10 @@ type
BlockVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
BlockBlobsVerifier* =
proc(signedBlock: ForkedSignedBeaconBlock, blobs: eip4844.BlobsSidecar,
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.gcsafe, raises: [Defect].}
SyncQueueKind* {.pure.} = enum
Forward, Backward
@ -42,6 +46,7 @@ type
SyncResult*[T] = object
request*: SyncRequest[T]
data*: seq[ref ForkedSignedBeaconBlock]
blobs*: Opt[seq[ref BlobsSidecar]]
GapItem*[T] = object
start*: Slot
@ -74,6 +79,7 @@ type
readyQueue: HeapQueue[SyncResult[T]]
rewind: Option[RewindPoint]
blockVerifier: BlockVerifier
blockBlobsVerifier: BlockBlobsVerifier
ident*: string
chronicles.formatIt SyncQueueKind: toLowerAscii($it)
@ -109,6 +115,27 @@ proc getShortMap*[T](req: SyncRequest[T],
slider = slider + 1
res
proc getShortMap*[T](req: SyncRequest[T],
data: openArray[ref BlobsSidecar]): string =
## Returns all slot numbers in ``data`` as placement map.
var res = newStringOfCap(req.count)
var slider = req.slot
var last = 0
for i in 0 ..< req.count:
if last < len(data):
for k in last ..< len(data):
if slider == data[k].beacon_block_slot:
res.add('x')
last = k + 1
break
elif slider < data[k].beacon_block_slot:
res.add('.')
break
else:
res.add('.')
slider = slider + 1
res
proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} =
slot >= req.slot and slot < req.slot + req.count
@ -116,7 +143,7 @@ proc cmp*[T](a, b: SyncRequest[T]): int =
cmp(uint64(a.slot), uint64(b.slot))
proc checkResponse*[T](req: SyncRequest[T],
data: openArray[ref ForkedSignedBeaconBlock]): bool =
data: openArray[Slot]): bool =
if len(data) == 0:
# Impossible to verify empty response.
return true
@ -131,9 +158,9 @@ proc checkResponse*[T](req: SyncRequest[T],
var dindex = 0
while (rindex < req.count) and (dindex < len(data)):
if slot < data[dindex][].slot:
if slot < data[dindex]:
discard
elif slot == data[dindex][].slot:
elif slot == data[dindex]:
inc(dindex)
else:
return false
@ -174,6 +201,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
start, final: Slot, chunkSize: uint64,
getSafeSlotCb: GetSlotCallback,
blockVerifier: BlockVerifier,
blockBlobsVerifier: BlockBlobsVerifier,
syncQueueSize: int = -1,
ident: string = "main"): SyncQueue[T] =
## Create new synchronization queue with parameters
@ -245,6 +273,7 @@ proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
inpSlot: start,
outSlot: start,
blockVerifier: blockVerifier,
blockBlobsVerifier: blockBlobsVerifier,
ident: ident
)
@ -578,6 +607,7 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 =
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
data: seq[ref ForkedSignedBeaconBlock],
blobs: Opt[seq[ref BlobsSidecar]],
maybeFinalized: bool = false,
processingCb: ProcessingCallback = nil) {.async.} =
logScope:
@ -605,7 +635,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# SyncQueue reset happens. We are exiting to wake up sync-worker.
return
else:
let syncres = SyncResult[T](request: sr, data: data)
let syncres = SyncResult[T](request: sr, data: data, blobs: blobs)
sq.readyQueue.push(syncres)
break
@ -654,8 +684,14 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
# Nim versions, remove workaround and move `res` into for loop
res: Result[void, VerifierError]
var i=0
for blk in sq.blocks(item):
res = await sq.blockVerifier(blk[], maybeFinalized)
if reqres.get().blobs.isNone():
res = await sq.blockVerifier(blk[], maybeFinalized)
else:
res = await sq.blockBlobsVerifier(blk[], reqres.get().blobs.get()[i][], maybeFinalized)
inc(i)
if res.isOk():
goodBlock = some(blk[].slot)
else:

View File

@ -7,7 +7,7 @@
{.used.}
import std/strutils
import std/[strutils, sequtils]
import unittest2
import chronos
import ../beacon_chain/gossip_processing/block_processor,
@ -57,6 +57,7 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier =
return fut
return verify
suite "SyncManager test suite":
proc createChain(start, finish: Slot): seq[ref ForkedSignedBeaconBlock] =
doAssert(start <= finish)
@ -85,7 +86,8 @@ suite "SyncManager test suite":
var queue = SyncQueue.init(SomeTPeer, kind,
Slot(0), Slot(0), 1'u64,
getStaticSlotCb(Slot(0)), collector(aq))
getStaticSlotCb(Slot(0)),
collector(aq), nil)
check:
len(queue) == 1
pendingLen(queue) == 0
@ -180,7 +182,8 @@ suite "SyncManager test suite":
let aq = newAsyncQueue[BlockEntry]()
var queue = SyncQueue.init(SomeTPeer, kind,
item[0], item[1], item[2],
getStaticSlotCb(item[0]), collector(aq))
getStaticSlotCb(item[0]),
collector(aq), nil)
check:
len(queue) == item[4]
pendingLen(queue) == item[5]
@ -204,11 +207,11 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(1), 1'u64,
getStaticSlotCb(Slot(0)), collector(aq))
getStaticSlotCb(Slot(0)), collector(aq), nil)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
Slot(1), Slot(0), 1'u64,
getStaticSlotCb(Slot(1)), collector(aq))
getStaticSlotCb(Slot(1)), collector(aq), nil)
let p1 = SomeTPeer()
let p2 = SomeTPeer()
@ -302,11 +305,11 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
start, finish, chunkSize,
getStaticSlotCb(start), collector(aq))
getStaticSlotCb(start), collector(aq), nil)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finish, start, chunkSize,
getStaticSlotCb(finish), collector(aq))
getStaticSlotCb(finish), collector(aq), nil)
chain = createChain(start, finish)
validatorFut =
case kkind
@ -322,7 +325,8 @@ suite "SyncManager test suite":
var request = queue.pop(finish, p1)
if request.isEmpty():
break
await queue.push(request, getSlice(chain, start, request))
await queue.push(request, getSlice(chain, start, request),
Opt.none(seq[ref BlobsSidecar]))
await validatorFut.cancelAndWait()
waitFor runSmokeTest()
@ -372,12 +376,12 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getStaticSlotCb(startSlot), collector(aq),
getStaticSlotCb(startSlot), collector(aq), nil,
queueSize)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finishSlot, startSlot, chunkSize,
getStaticSlotCb(finishSlot), collector(aq),
getStaticSlotCb(finishSlot), collector(aq), nil,
queueSize)
validatorFut =
case kkind
@ -396,7 +400,8 @@ suite "SyncManager test suite":
var r12 = queue.pop(finishSlot, p2)
var r13 = queue.pop(finishSlot, p3)
var f13 = queue.push(r13, chain.getSlice(startSlot, r13))
var f13 = queue.push(r13, chain.getSlice(startSlot, r13),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check:
f13.finished == false
@ -404,7 +409,8 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward: counter == int(startSlot)
of SyncQueueKind.Backward: counter == int(finishSlot)
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check:
case kkind
@ -413,7 +419,8 @@ suite "SyncManager test suite":
f11.finished == true and f11.failed == false
f13.finished == false
var f12 = queue.push(r12, chain.getSlice(startSlot, r12))
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
Opt.none(seq[ref BlobsSidecar]))
await allFutures(f11, f12, f13)
check:
f12.finished == true and f12.failed == false
@ -477,11 +484,11 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
start, finish, chunkSize,
getFowardSafeSlotCb, collector(aq))
getFowardSafeSlotCb, collector(aq), nil)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finish, start, chunkSize,
getBackwardSafeSlotCb, collector(aq))
getBackwardSafeSlotCb, collector(aq), nil)
chain = createChain(start, finish)
validatorFut =
case kkind
@ -516,7 +523,7 @@ suite "SyncManager test suite":
check response[0][].slot >= getFowardSafeSlotCb()
else:
check response[^1][].slot <= getBackwardSafeSlotCb()
await queue.push(request, response)
await queue.push(request, response, Opt.none(seq[ref BlobsSidecar]))
await validatorFut.cancelAndWait()
waitFor runTest()
@ -570,11 +577,11 @@ suite "SyncManager test suite":
of SyncQueueKind.Forward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
start, finish, chunkSize,
getFowardSafeSlotCb, collector(aq))
getFowardSafeSlotCb, collector(aq), nil)
of SyncQueueKind.Backward:
SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finish, start, chunkSize,
getBackwardSafeSlotCb, collector(aq))
getBackwardSafeSlotCb, collector(aq), nil)
chain = createChain(start, finish)
validatorFut = failingValidator(aq)
@ -599,7 +606,7 @@ suite "SyncManager test suite":
# Handle request 1. Should be re-enqueued as it simulates `Invalid`.
let response1 = getSlice(chain, start, request1)
await queue.push(request1, response1)
await queue.push(request1, response1, Opt.none(seq[ref BlobsSidecar]))
check debtLen(queue) == request2.count + request1.count
# Request 1 should be discarded as it is no longer relevant.
@ -611,7 +618,7 @@ suite "SyncManager test suite":
# Handle request 3. Should be re-enqueued as it simulates `Invalid`.
let response3 = getSlice(chain, start, request3)
await queue.push(request3, response3)
await queue.push(request3, response3, Opt.none(seq[ref BlobsSidecar]))
check debtLen(queue) == request3.count
# Request 2 should be re-issued.
@ -625,7 +632,7 @@ suite "SyncManager test suite":
# Handle request 4. Should be re-enqueued as it simulates `Invalid`.
let response4 = getSlice(chain, start, request4)
await queue.push(request4, response4)
await queue.push(request4, response4, Opt.none(seq[ref BlobsSidecar]))
check debtLen(queue) == request4.count
# Advance `safeSlot` out of band.
@ -721,7 +728,7 @@ suite "SyncManager test suite":
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getStaticSlotCb(startSlot), collector(aq),
getStaticSlotCb(startSlot), collector(aq), nil,
queueSize)
validatorFut = forwardValidator(aq)
@ -741,20 +748,23 @@ suite "SyncManager test suite":
var r13 = queue.pop(finishSlot, p3)
var r14 = queue.pop(finishSlot, p4)
var f14 = queue.push(r14, chain.getSlice(startSlot, r14))
var f14 = queue.push(r14, chain.getSlice(startSlot, r14),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check:
f14.finished == false
counter == int(startSlot)
var f12 = queue.push(r12, chain.getSlice(startSlot, r12))
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check:
counter == int(startSlot)
f12.finished == false
f14.finished == false
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[ref BlobsSidecar]))
await allFutures(f11, f12)
check:
counter == int(startSlot + chunkSize + chunkSize)
@ -765,7 +775,8 @@ suite "SyncManager test suite":
var missingSlice = chain.getSlice(startSlot, r13)
withBlck(missingSlice[0][]):
blck.message.proposer_index = 0xDEADBEAF'u64
var f13 = queue.push(r13, missingSlice)
var f13 = queue.push(r13, missingSlice,
Opt.none(seq[ref BlobsSidecar]))
await allFutures(f13, f14)
check:
f11.finished == true and f11.failed == false
@ -786,15 +797,18 @@ suite "SyncManager test suite":
check r18.isEmpty() == true
var f17 = queue.push(r17, chain.getSlice(startSlot, r17))
var f17 = queue.push(r17, chain.getSlice(startSlot, r17),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check f17.finished == false
var f16 = queue.push(r16, chain.getSlice(startSlot, r16))
var f16 = queue.push(r16, chain.getSlice(startSlot, r16),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check f16.finished == false
var f15 = queue.push(r15, chain.getSlice(startSlot, r15))
var f15 = queue.push(r15, chain.getSlice(startSlot, r15),
Opt.none(seq[ref BlobsSidecar]))
await allFutures(f15, f16, f17)
check:
f15.finished == true and f15.failed == false
@ -829,7 +843,7 @@ suite "SyncManager test suite":
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
startSlot, finishSlot, chunkSize,
getStaticSlotCb(startSlot), collector(aq),
getStaticSlotCb(startSlot), collector(aq), nil,
queueSize)
validatorFut = forwardValidator(aq)
@ -840,7 +854,8 @@ suite "SyncManager test suite":
var r11 = queue.pop(finishSlot, p1)
# Push a single request that will fail with all blocks being unviable
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[ref BlobsSidecar]))
discard await f11.withTimeout(100.milliseconds)
check:
@ -887,7 +902,7 @@ suite "SyncManager test suite":
chain = createChain(startSlot, finishSlot)
queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
finishSlot, startSlot, chunkSize,
getSafeSlot, collector(aq), queueSize)
getSafeSlot, collector(aq), nil, queueSize)
validatorFut = backwardValidator(aq)
let
@ -905,20 +920,23 @@ suite "SyncManager test suite":
var r13 = queue.pop(finishSlot, p3)
var r14 = queue.pop(finishSlot, p4)
var f14 = queue.push(r14, chain.getSlice(startSlot, r14))
var f14 = queue.push(r14, chain.getSlice(startSlot, r14),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check:
f14.finished == false
counter == int(finishSlot)
var f12 = queue.push(r12, chain.getSlice(startSlot, r12))
var f12 = queue.push(r12, chain.getSlice(startSlot, r12),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check:
counter == int(finishSlot)
f12.finished == false
f14.finished == false
var f11 = queue.push(r11, chain.getSlice(startSlot, r11))
var f11 = queue.push(r11, chain.getSlice(startSlot, r11),
Opt.none(seq[ref BlobsSidecar]))
await allFutures(f11, f12)
check:
counter == int(finishSlot - chunkSize - chunkSize)
@ -929,7 +947,7 @@ suite "SyncManager test suite":
var missingSlice = chain.getSlice(startSlot, r13)
withBlck(missingSlice[0][]):
blck.message.proposer_index = 0xDEADBEAF'u64
var f13 = queue.push(r13, missingSlice)
var f13 = queue.push(r13, missingSlice, Opt.none(seq[ref BlobsSidecar]))
await allFutures(f13, f14)
check:
f11.finished == true and f11.failed == false
@ -946,11 +964,13 @@ suite "SyncManager test suite":
check r17.isEmpty() == true
var f16 = queue.push(r16, chain.getSlice(startSlot, r16))
var f16 = queue.push(r16, chain.getSlice(startSlot, r16),
Opt.none(seq[ref BlobsSidecar]))
await sleepAsync(100.milliseconds)
check f16.finished == false
var f15 = queue.push(r15, chain.getSlice(startSlot, r15))
var f15 = queue.push(r15, chain.getSlice(startSlot, r15),
Opt.none(seq[ref BlobsSidecar]))
await allFutures(f15, f16)
check:
f15.finished == true and f15.failed == false
@ -1014,25 +1034,26 @@ suite "SyncManager test suite":
let chain = createChain(Slot(10), Slot(20))
let r1 = SyncRequest[SomeTPeer](slot: Slot(11), count: 1'u64)
let r21 = SyncRequest[SomeTPeer](slot: Slot(11), count: 2'u64)
let slots = mapIt(chain, it[].slot)
check:
checkResponse(r1, @[chain[1]]) == true
checkResponse(r1, @[slots[1]]) == true
checkResponse(r1, @[]) == true
checkResponse(r1, @[chain[1], chain[1]]) == false
checkResponse(r1, @[chain[0]]) == false
checkResponse(r1, @[chain[2]]) == false
checkResponse(r1, @[slots[1], slots[1]]) == false
checkResponse(r1, @[slots[0]]) == false
checkResponse(r1, @[slots[2]]) == false
checkResponse(r21, @[chain[1]]) == true
checkResponse(r21, @[slots[1]]) == true
checkResponse(r21, @[]) == true
checkResponse(r21, @[chain[1], chain[2]]) == true
checkResponse(r21, @[chain[2]]) == true
checkResponse(r21, @[chain[1], chain[2], chain[3]]) == false
checkResponse(r21, @[chain[0], chain[1]]) == false
checkResponse(r21, @[chain[0]]) == false
checkResponse(r21, @[chain[2], chain[1]]) == false
checkResponse(r21, @[chain[2], chain[1]]) == false
checkResponse(r21, @[chain[2], chain[3]]) == false
checkResponse(r21, @[chain[3]]) == false
checkResponse(r21, @[slots[1], slots[2]]) == true
checkResponse(r21, @[slots[2]]) == true
checkResponse(r21, @[slots[1], slots[2], slots[3]]) == false
checkResponse(r21, @[slots[0], slots[1]]) == false
checkResponse(r21, @[slots[0]]) == false
checkResponse(r21, @[slots[2], slots[1]]) == false
checkResponse(r21, @[slots[2], slots[1]]) == false
checkResponse(r21, @[slots[2], slots[3]]) == false
checkResponse(r21, @[slots[3]]) == false
test "[SyncQueue#Forward] getRewindPoint() test":
let aq = newAsyncQueue[BlockEntry]()
@ -1040,7 +1061,7 @@ suite "SyncManager test suite":
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getStaticSlotCb(Slot(0)),
collector(aq), 2)
collector(aq), nil, 2)
let finalizedSlot = start_slot(Epoch(0'u64))
let startSlot = start_slot(Epoch(0'u64)) + 1'u64
let finishSlot = start_slot(Epoch(2'u64))
@ -1052,7 +1073,7 @@ suite "SyncManager test suite":
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getStaticSlotCb(Slot(0)),
collector(aq), 2)
collector(aq), nil, 2)
let finalizedSlot = start_slot(Epoch(1'u64))
let startSlot = start_slot(Epoch(1'u64)) + 1'u64
let finishSlot = start_slot(Epoch(3'u64))
@ -1064,7 +1085,7 @@ suite "SyncManager test suite":
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getStaticSlotCb(Slot(0)),
collector(aq), 2)
collector(aq), nil, 2)
let finalizedSlot = start_slot(Epoch(0'u64))
let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64)
let failEpoch = epoch(failSlot)
@ -1082,7 +1103,7 @@ suite "SyncManager test suite":
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Forward,
Slot(0), Slot(0xFFFF_FFFF_FFFF_FFFFF'u64),
1'u64, getStaticSlotCb(Slot(0)),
collector(aq), 2)
collector(aq), nil, 2)
let finalizedSlot = start_slot(Epoch(1'u64))
let failSlot = Slot(0xFFFF_FFFF_FFFF_FFFFF'u64)
let failEpoch = epoch(failSlot)
@ -1101,7 +1122,7 @@ suite "SyncManager test suite":
let getSafeSlot = getStaticSlotCb(Slot(1024))
var queue = SyncQueue.init(SomeTPeer, SyncQueueKind.Backward,
Slot(1024), Slot(0),
1'u64, getSafeSlot, collector(aq), 2)
1'u64, getSafeSlot, collector(aq), nil, 2)
let safeSlot = getSafeSlot()
for i in countdown(1023, 0):
check queue.getRewindPoint(Slot(i), safeSlot) == safeSlot