From 30a8ec410d5458372d43787981f6c0d358247463 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Thu, 6 Aug 2020 02:22:12 +0300 Subject: [PATCH] More spec compliant blocksByRange requests * Eliminate possibilities for range errors and overflows * Handle more properly invalid requests for furute slots * Eliminate the confusing surrounding the MAX_REQUEST_BLOCKS constant Addresses https://github.com/status-im/nim-beacon-chain/issues/1366 --- beacon_chain/block_pools/chain_dag.nim | 20 ++++++------ beacon_chain/request_manager.nim | 6 ++-- beacon_chain/sync_protocol.nim | 26 ++++++++-------- beacon_chain/sync_protocol.nim.generated.nim | 32 +++++++++++--------- tests/test_block_pool.nim | 6 ++-- 5 files changed, 46 insertions(+), 44 deletions(-) diff --git a/beacon_chain/block_pools/chain_dag.nim b/beacon_chain/block_pools/chain_dag.nim index 9a541a2dd..d296ab6d6 100644 --- a/beacon_chain/block_pools/chain_dag.nim +++ b/beacon_chain/block_pools/chain_dag.nim @@ -394,7 +394,7 @@ func getRef*(dag: ChainDAGRef, root: Eth2Digest): BlockRef = dag.blocks.getOrDefault(root, nil) func getBlockRange*( - dag: ChainDAGRef, startSlot: Slot, skipStep: Natural, + dag: ChainDAGRef, startSlot: Slot, skipStep: uint64, output: var openArray[BlockRef]): Natural = ## This function populates an `output` buffer of blocks ## with a slots ranging from `startSlot` up to, but not including, @@ -407,17 +407,21 @@ func getBlockRange*( ## at this index. ## ## If there were no blocks in the range, `output.len` will be returned. - let count = output.len + let requestedCount = output.lenu64 trace "getBlockRange entered", - head = shortLog(dag.head.root), count, startSlot, skipStep + head = shortLog(dag.head.root), requestedCount, startSlot, skipStep let - skipStep = max(1, skipStep) # Treat 0 step as 1 - endSlot = startSlot + uint64(count * skipStep) + headSlot = dag.head.slot + runway = if headSlot > startSlot: uint64(headSlot - startSlot) + else: return output.len # Identical to returning an empty set of block as indicated above + skipStep = max(skipStep, 1) # Treat 0 step as 1 + count = min(1'u64 + (runway div skipStep), requestedCount) + endSlot = startSlot + count * skipStep var b = dag.head.atSlot(endSlot) - o = count + o = output.len for i in 0.. 0: rootList.add(rman.queue.popFirstNoWait().root) dec(count) diff --git a/beacon_chain/sync_protocol.nim b/beacon_chain/sync_protocol.nim index d62c65591..081ba5846 100644 --- a/beacon_chain/sync_protocol.nim +++ b/beacon_chain/sync_protocol.nim @@ -9,10 +9,7 @@ logScope: topics = "sync" const - MAX_REQUESTED_BLOCKS = SLOTS_PER_EPOCH * 4 - # A boundary on the number of blocks we'll allow in any single block - # request - typically clients will ask for an epoch or so at a time, but we - # allow a little bit more in case they want to stream blocks faster + MAX_REQUEST_BLOCKS = 1024 type StatusMsg* = object @@ -46,7 +43,7 @@ type blockRoot: Eth2Digest slot: Slot - BlockRootsList* = List[Eth2Digest, Limit MAX_REQUESTED_BLOCKS] + BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS] proc shortLog*(s: StatusMsg): auto = ( @@ -135,23 +132,24 @@ p2pProtocol BeaconSync(version = 1, proc beaconBlocksByRange( peer: Peer, startSlot: Slot, - count: uint64, - step: uint64, + reqCount: uint64, + reqStep: uint64, response: MultipleChunksResponse[SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_range", 1).} = - trace "got range request", peer, startSlot, count, step - - if count > 0'u64: - var blocks: array[MAX_REQUESTED_BLOCKS, BlockRef] + trace "got range request", peer, startSlot, + count = reqCount, step = reqStep + if reqCount > 0'u64: + var blocks: array[MAX_REQUEST_BLOCKS, BlockRef] let chainDag = peer.networkState.chainDag # Limit number of blocks in response - count = min(count.Natural, blocks.len) + count = int min(reqCount, blocks.lenu64) let endIndex = count - 1 startIndex = - chainDag.getBlockRange(startSlot, step, blocks.toOpenArray(0, endIndex)) + chainDag.getBlockRange(startSlot, reqStep, + blocks.toOpenArray(0, endIndex)) for b in blocks[startIndex..endIndex]: doAssert not b.isNil, "getBlockRange should return non-nil blocks only" @@ -159,7 +157,7 @@ p2pProtocol BeaconSync(version = 1, await response.write(chainDag.get(b).data) debug "Block range request done", - peer, startSlot, count, step, found = count - startIndex + peer, startSlot, count, reqStep, found = count - startIndex proc beaconBlocksByRoot( peer: Peer, diff --git a/beacon_chain/sync_protocol.nim.generated.nim b/beacon_chain/sync_protocol.nim.generated.nim index 29be47474..819cec8dc 100644 --- a/beacon_chain/sync_protocol.nim.generated.nim +++ b/beacon_chain/sync_protocol.nim.generated.nim @@ -1,5 +1,5 @@ -## Generated at line 87 +## Generated at line 84 type BeaconSync* = object template State*(PROTO: type BeaconSync): type = @@ -45,8 +45,8 @@ template RecType*(MSG: type getMetadataObj): untyped = type beaconBlocksByRangeObj* = object startSlot*: Slot - count*: uint64 - step*: uint64 + reqCount*: uint64 + reqStep*: uint64 template beaconBlocksByRange*(PROTO: type BeaconSync): type = beaconBlocksByRangeObj @@ -110,7 +110,8 @@ proc getMetadata*(peer: Peer; timeout: Duration = milliseconds(10000'i64)): Futu makeEth2Request(peer, "/eth2/beacon_chain/req/metadata/1/", msgBytes, Eth2Metadata, timeout) -proc beaconBlocksByRange*(peer: Peer; startSlot: Slot; count: uint64; step: uint64; +proc beaconBlocksByRange*(peer: Peer; startSlot: Slot; reqCount: uint64; + reqStep: uint64; timeout: Duration = milliseconds(10000'i64)): Future[ NetRes[seq[SignedBeaconBlock]]] {.gcsafe, libp2pProtocol( "beacon_blocks_by_range", 1).} = @@ -118,8 +119,8 @@ proc beaconBlocksByRange*(peer: Peer; startSlot: Slot; count: uint64; step: uint var writer = init(WriterType(SSZ), outputStream) var recordWriterCtx = beginRecord(writer, beaconBlocksByRangeObj) writeField(writer, recordWriterCtx, "startSlot", startSlot) - writeField(writer, recordWriterCtx, "count", count) - writeField(writer, recordWriterCtx, "step", step) + writeField(writer, recordWriterCtx, "reqCount", reqCount) + writeField(writer, recordWriterCtx, "reqStep", reqStep) endRecord(writer, recordWriterCtx) let msgBytes = getOutput(outputStream) makeEth2Request(peer, "/eth2/beacon_chain/req/beacon_blocks_by_range/1/", @@ -187,8 +188,8 @@ proc getMetadataUserHandler(peer: Peer): Eth2Metadata {. return peer.network.metadata -proc beaconBlocksByRangeUserHandler(peer: Peer; startSlot: Slot; count: uint64; - step: uint64; response: MultipleChunksResponse[ +proc beaconBlocksByRangeUserHandler(peer: Peer; startSlot: Slot; reqCount: uint64; + reqStep: uint64; response: MultipleChunksResponse[ SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_range", 1), gcsafe.} = type CurrentProtocol = BeaconSync @@ -199,21 +200,21 @@ proc beaconBlocksByRangeUserHandler(peer: Peer; startSlot: Slot; count: uint64; cast[ref[BeaconSyncNetworkState:ObjectType]](getNetworkState(peer.network, BeaconSyncProtocol)) - trace "got range request", peer, startSlot, count, step - if count > 0'u64: - var blocks: array[MAX_REQUESTED_BLOCKS, BlockRef] + trace "got range request", peer, startSlot, count = reqCount, step = reqStep + if reqCount > 0'u64: + var blocks: array[MAX_REQUEST_BLOCKS, BlockRef] let chainDag = peer.networkState.chainDag - count = min(count.Natural, blocks.len) + count = int min(reqCount, blocks.lenu64) let endIndex = count - 1 - startIndex = chainDag.getBlockRange(startSlot, step, + startIndex = chainDag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex)) for b in blocks[startIndex .. endIndex]: doAssert not b.isNil, "getBlockRange should return non-nil blocks only" trace "wrote response block", slot = b.slot, roor = shortLog(b.root) await response.write(chainDag.get(b).data) - debug "Block range request done", peer, startSlot, count, step, + debug "Block range request done", peer, startSlot, count, reqStep, found = count - startIndex proc beaconBlocksByRootUserHandler(peer: Peer; blockRoots: BlockRootsList; response: MultipleChunksResponse[ @@ -287,7 +288,8 @@ proc getMetadataMounter(network: Eth2Node) = template callUserHandler(MSG: type beaconBlocksByRangeObj; peer: Peer; stream: Connection; msg: beaconBlocksByRangeObj): untyped = var response = init(MultipleChunksResponse[SignedBeaconBlock], peer, stream) - beaconBlocksByRangeUserHandler(peer, msg.startSlot, msg.count, msg.step, response) + beaconBlocksByRangeUserHandler(peer, msg.startSlot, msg.reqCount, msg.reqStep, + response) proc beaconBlocksByRangeMounter(network: Eth2Node) = proc snappyThunk(stream: Connection; protocol: string): Future[void] {.gcsafe.} = diff --git a/tests/test_block_pool.nim b/tests/test_block_pool.nim index 3ae100cc4..a6e157e68 100644 --- a/tests/test_block_pool.nim +++ b/tests/test_block_pool.nim @@ -148,6 +148,8 @@ suiteReport "Block pool processing" & preset(): var blocks: array[3, BlockRef] + + check: dag.getBlockRange(Slot(0), 1, blocks.toOpenArray(0, 0)) == 0 blocks[0..<1] == [dag.tail] @@ -159,7 +161,7 @@ suiteReport "Block pool processing" & preset(): blocks[0..<2] == [dag.tail, b2Add[]] dag.getBlockRange(Slot(0), 3, blocks.toOpenArray(0, 1)) == 1 - blocks[0..<2] == [nil, dag.tail] # block 3 is missing! + blocks[1..<2] == [dag.tail] # block 3 is missing! dag.getBlockRange(Slot(2), 2, blocks.toOpenArray(0, 1)) == 0 blocks[0..<2] == [b2Add[], b4Add[]] # block 3 is missing! @@ -172,7 +174,7 @@ suiteReport "Block pool processing" & preset(): # No blocks in sight either due to gaps dag.getBlockRange(Slot(3), 2, blocks.toOpenArray(0, 1)) == 2 - blocks[0..<2] == [BlockRef nil, nil] # block 3 is missing! + blocks[2..<2].len == 0 timedTest "Reverse order block add & get" & preset(): let missing = dag.addRawBlock(quarantine, b2, nil)