More spec compliant blocksByRange requests
* Eliminate possibilities for range errors and overflows * Handle more properly invalid requests for furute slots * Eliminate the confusing surrounding the MAX_REQUEST_BLOCKS constant Addresses https://github.com/status-im/nim-beacon-chain/issues/1366
This commit is contained in:
parent
7763df95a4
commit
30a8ec410d
|
@ -394,7 +394,7 @@ func getRef*(dag: ChainDAGRef, root: Eth2Digest): BlockRef =
|
|||
dag.blocks.getOrDefault(root, nil)
|
||||
|
||||
func getBlockRange*(
|
||||
dag: ChainDAGRef, startSlot: Slot, skipStep: Natural,
|
||||
dag: ChainDAGRef, startSlot: Slot, skipStep: uint64,
|
||||
output: var openArray[BlockRef]): Natural =
|
||||
## This function populates an `output` buffer of blocks
|
||||
## with a slots ranging from `startSlot` up to, but not including,
|
||||
|
@ -407,17 +407,21 @@ func getBlockRange*(
|
|||
## at this index.
|
||||
##
|
||||
## If there were no blocks in the range, `output.len` will be returned.
|
||||
let count = output.len
|
||||
let requestedCount = output.lenu64
|
||||
trace "getBlockRange entered",
|
||||
head = shortLog(dag.head.root), count, startSlot, skipStep
|
||||
head = shortLog(dag.head.root), requestedCount, startSlot, skipStep
|
||||
|
||||
let
|
||||
skipStep = max(1, skipStep) # Treat 0 step as 1
|
||||
endSlot = startSlot + uint64(count * skipStep)
|
||||
headSlot = dag.head.slot
|
||||
runway = if headSlot > startSlot: uint64(headSlot - startSlot)
|
||||
else: return output.len # Identical to returning an empty set of block as indicated above
|
||||
skipStep = max(skipStep, 1) # Treat 0 step as 1
|
||||
count = min(1'u64 + (runway div skipStep), requestedCount)
|
||||
endSlot = startSlot + count * skipStep
|
||||
|
||||
var
|
||||
b = dag.head.atSlot(endSlot)
|
||||
o = count
|
||||
o = output.len
|
||||
for i in 0..<count:
|
||||
for j in 0..<skipStep:
|
||||
b = b.parent
|
||||
|
@ -425,10 +429,6 @@ func getBlockRange*(
|
|||
dec o
|
||||
output[o] = b.blck
|
||||
|
||||
# Make sure the given input is cleared, just in case
|
||||
for i in 0..<o:
|
||||
output[i] = nil
|
||||
|
||||
o # Return the index of the first non-nil item in the output
|
||||
|
||||
func getBlockBySlot*(dag: ChainDAGRef, slot: Slot): BlockRef =
|
||||
|
|
|
@ -7,8 +7,8 @@ logScope:
|
|||
topics = "requman"
|
||||
|
||||
const
|
||||
MAX_REQUEST_BLOCKS* = 4 # Specification's value is 1024.
|
||||
## Maximum number of blocks, which can be requested by beaconBlocksByRoot.
|
||||
SYNC_MAX_REQUESTED_BLOCKS* = 4 # Spec allows up to MAX_REQUEST_BLOCKS.
|
||||
## Maximum number of blocks which will be requested in each `beaconBlocksByRoot` invocation.
|
||||
PARALLEL_REQUESTS* = 2
|
||||
## Number of peers we using to resolve our request.
|
||||
|
||||
|
@ -88,7 +88,7 @@ proc requestManagerLoop(rman: RequestManager) {.async.} =
|
|||
let req = await rman.queue.popFirst()
|
||||
rootList.add(req.root)
|
||||
|
||||
var count = min(MAX_REQUEST_BLOCKS - 1, len(rman.queue))
|
||||
var count = min(SYNC_MAX_REQUESTED_BLOCKS - 1, len(rman.queue))
|
||||
while count > 0:
|
||||
rootList.add(rman.queue.popFirstNoWait().root)
|
||||
dec(count)
|
||||
|
|
|
@ -9,10 +9,7 @@ logScope:
|
|||
topics = "sync"
|
||||
|
||||
const
|
||||
MAX_REQUESTED_BLOCKS = SLOTS_PER_EPOCH * 4
|
||||
# A boundary on the number of blocks we'll allow in any single block
|
||||
# request - typically clients will ask for an epoch or so at a time, but we
|
||||
# allow a little bit more in case they want to stream blocks faster
|
||||
MAX_REQUEST_BLOCKS = 1024
|
||||
|
||||
type
|
||||
StatusMsg* = object
|
||||
|
@ -46,7 +43,7 @@ type
|
|||
blockRoot: Eth2Digest
|
||||
slot: Slot
|
||||
|
||||
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUESTED_BLOCKS]
|
||||
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
|
||||
|
||||
proc shortLog*(s: StatusMsg): auto =
|
||||
(
|
||||
|
@ -135,23 +132,24 @@ p2pProtocol BeaconSync(version = 1,
|
|||
proc beaconBlocksByRange(
|
||||
peer: Peer,
|
||||
startSlot: Slot,
|
||||
count: uint64,
|
||||
step: uint64,
|
||||
reqCount: uint64,
|
||||
reqStep: uint64,
|
||||
response: MultipleChunksResponse[SignedBeaconBlock])
|
||||
{.async, libp2pProtocol("beacon_blocks_by_range", 1).} =
|
||||
trace "got range request", peer, startSlot, count, step
|
||||
|
||||
if count > 0'u64:
|
||||
var blocks: array[MAX_REQUESTED_BLOCKS, BlockRef]
|
||||
trace "got range request", peer, startSlot,
|
||||
count = reqCount, step = reqStep
|
||||
if reqCount > 0'u64:
|
||||
var blocks: array[MAX_REQUEST_BLOCKS, BlockRef]
|
||||
let
|
||||
chainDag = peer.networkState.chainDag
|
||||
# Limit number of blocks in response
|
||||
count = min(count.Natural, blocks.len)
|
||||
count = int min(reqCount, blocks.lenu64)
|
||||
|
||||
let
|
||||
endIndex = count - 1
|
||||
startIndex =
|
||||
chainDag.getBlockRange(startSlot, step, blocks.toOpenArray(0, endIndex))
|
||||
chainDag.getBlockRange(startSlot, reqStep,
|
||||
blocks.toOpenArray(0, endIndex))
|
||||
|
||||
for b in blocks[startIndex..endIndex]:
|
||||
doAssert not b.isNil, "getBlockRange should return non-nil blocks only"
|
||||
|
@ -159,7 +157,7 @@ p2pProtocol BeaconSync(version = 1,
|
|||
await response.write(chainDag.get(b).data)
|
||||
|
||||
debug "Block range request done",
|
||||
peer, startSlot, count, step, found = count - startIndex
|
||||
peer, startSlot, count, reqStep, found = count - startIndex
|
||||
|
||||
proc beaconBlocksByRoot(
|
||||
peer: Peer,
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
|
||||
## Generated at line 87
|
||||
## Generated at line 84
|
||||
type
|
||||
BeaconSync* = object
|
||||
template State*(PROTO: type BeaconSync): type =
|
||||
|
@ -45,8 +45,8 @@ template RecType*(MSG: type getMetadataObj): untyped =
|
|||
type
|
||||
beaconBlocksByRangeObj* = object
|
||||
startSlot*: Slot
|
||||
count*: uint64
|
||||
step*: uint64
|
||||
reqCount*: uint64
|
||||
reqStep*: uint64
|
||||
|
||||
template beaconBlocksByRange*(PROTO: type BeaconSync): type =
|
||||
beaconBlocksByRangeObj
|
||||
|
@ -110,7 +110,8 @@ proc getMetadata*(peer: Peer; timeout: Duration = milliseconds(10000'i64)): Futu
|
|||
makeEth2Request(peer, "/eth2/beacon_chain/req/metadata/1/", msgBytes,
|
||||
Eth2Metadata, timeout)
|
||||
|
||||
proc beaconBlocksByRange*(peer: Peer; startSlot: Slot; count: uint64; step: uint64;
|
||||
proc beaconBlocksByRange*(peer: Peer; startSlot: Slot; reqCount: uint64;
|
||||
reqStep: uint64;
|
||||
timeout: Duration = milliseconds(10000'i64)): Future[
|
||||
NetRes[seq[SignedBeaconBlock]]] {.gcsafe, libp2pProtocol(
|
||||
"beacon_blocks_by_range", 1).} =
|
||||
|
@ -118,8 +119,8 @@ proc beaconBlocksByRange*(peer: Peer; startSlot: Slot; count: uint64; step: uint
|
|||
var writer = init(WriterType(SSZ), outputStream)
|
||||
var recordWriterCtx = beginRecord(writer, beaconBlocksByRangeObj)
|
||||
writeField(writer, recordWriterCtx, "startSlot", startSlot)
|
||||
writeField(writer, recordWriterCtx, "count", count)
|
||||
writeField(writer, recordWriterCtx, "step", step)
|
||||
writeField(writer, recordWriterCtx, "reqCount", reqCount)
|
||||
writeField(writer, recordWriterCtx, "reqStep", reqStep)
|
||||
endRecord(writer, recordWriterCtx)
|
||||
let msgBytes = getOutput(outputStream)
|
||||
makeEth2Request(peer, "/eth2/beacon_chain/req/beacon_blocks_by_range/1/",
|
||||
|
@ -187,8 +188,8 @@ proc getMetadataUserHandler(peer: Peer): Eth2Metadata {.
|
|||
|
||||
return peer.network.metadata
|
||||
|
||||
proc beaconBlocksByRangeUserHandler(peer: Peer; startSlot: Slot; count: uint64;
|
||||
step: uint64; response: MultipleChunksResponse[
|
||||
proc beaconBlocksByRangeUserHandler(peer: Peer; startSlot: Slot; reqCount: uint64;
|
||||
reqStep: uint64; response: MultipleChunksResponse[
|
||||
SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_range", 1), gcsafe.} =
|
||||
type
|
||||
CurrentProtocol = BeaconSync
|
||||
|
@ -199,21 +200,21 @@ proc beaconBlocksByRangeUserHandler(peer: Peer; startSlot: Slot; count: uint64;
|
|||
cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network,
|
||||
BeaconSyncProtocol))
|
||||
|
||||
trace "got range request", peer, startSlot, count, step
|
||||
if count > 0'u64:
|
||||
var blocks: array[MAX_REQUESTED_BLOCKS, BlockRef]
|
||||
trace "got range request", peer, startSlot, count = reqCount, step = reqStep
|
||||
if reqCount > 0'u64:
|
||||
var blocks: array[MAX_REQUEST_BLOCKS, BlockRef]
|
||||
let
|
||||
chainDag = peer.networkState.chainDag
|
||||
count = min(count.Natural, blocks.len)
|
||||
count = int min(reqCount, blocks.lenu64)
|
||||
let
|
||||
endIndex = count - 1
|
||||
startIndex = chainDag.getBlockRange(startSlot, step,
|
||||
startIndex = chainDag.getBlockRange(startSlot, reqStep,
|
||||
blocks.toOpenArray(0, endIndex))
|
||||
for b in blocks[startIndex .. endIndex]:
|
||||
doAssert not b.isNil, "getBlockRange should return non-nil blocks only"
|
||||
trace "wrote response block", slot = b.slot, roor = shortLog(b.root)
|
||||
await response.write(chainDag.get(b).data)
|
||||
debug "Block range request done", peer, startSlot, count, step,
|
||||
debug "Block range request done", peer, startSlot, count, reqStep,
|
||||
found = count - startIndex
|
||||
|
||||
proc beaconBlocksByRootUserHandler(peer: Peer; blockRoots: BlockRootsList; response: MultipleChunksResponse[
|
||||
|
@ -287,7 +288,8 @@ proc getMetadataMounter(network: Eth2Node) =
|
|||
template callUserHandler(MSG: type beaconBlocksByRangeObj; peer: Peer;
|
||||
stream: Connection; msg: beaconBlocksByRangeObj): untyped =
|
||||
var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream)
|
||||
beaconBlocksByRangeUserHandler(peer, msg.startSlot, msg.count, msg.step, response)
|
||||
beaconBlocksByRangeUserHandler(peer, msg.startSlot, msg.reqCount, msg.reqStep,
|
||||
response)
|
||||
|
||||
proc beaconBlocksByRangeMounter(network: Eth2Node) =
|
||||
proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} =
|
||||
|
|
|
@ -148,6 +148,8 @@ suiteReport "Block pool processing" & preset():
|
|||
|
||||
var blocks: array[3, BlockRef]
|
||||
|
||||
|
||||
|
||||
check:
|
||||
dag.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 0)) == 0
|
||||
blocks[0..<1] == [dag.tail]
|
||||
|
@ -159,7 +161,7 @@ suiteReport "Block pool processing" & preset():
|
|||
blocks[0..<2] == [dag.tail, b2Add[]]
|
||||
|
||||
dag.getBlockRange(Slot(0), 3, blocks.toOpenArray(0, 1)) == 1
|
||||
blocks[0..<2] == [nil, dag.tail] # block 3 is missing!
|
||||
blocks[1..<2] == [dag.tail] # block 3 is missing!
|
||||
|
||||
dag.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, 1)) == 0
|
||||
blocks[0..<2] == [b2Add[], b4Add[]] # block 3 is missing!
|
||||
|
@ -172,7 +174,7 @@ suiteReport "Block pool processing" & preset():
|
|||
|
||||
# No blocks in sight either due to gaps
|
||||
dag.getBlockRange(Slot(3), 2, blocks.toOpenArray(0, 1)) == 2
|
||||
blocks[0..<2] == [BlockRef nil, nil] # block 3 is missing!
|
||||
blocks[2..<2].len == 0
|
||||
|
||||
timedTest "Reverse order block add & get" & preset():
|
||||
let missing = dag.addRawBlock(quarantine, b2, nil)
|
||||
|
|
Loading…
Reference in New Issue