diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 2e103e5c4..2dc631929 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -502,8 +502,8 @@ proc getRequestProtoName(fn: NimNode): NimNode = proc writeChunk*(conn: Connection, responseCode: Option[ResponseCode], - payload: Bytes, - contextBytes: openarray[byte] = []): Future[void] = + payload: openArray[byte], + contextBytes: openArray[byte] = []): Future[void] = var output = memoryOutput() try: @@ -558,13 +558,14 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {. finally: await stream.close() -proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes): Future[void] = +proc sendResponseChunkBytes( + response: UntypedResponse, payload: openArray[byte], + contextBytes: openArray[byte] = []): Future[void] = inc response.writtenChunks - response.stream.writeChunk(some Success, payload) + response.stream.writeChunk(some Success, payload, contextBytes) proc sendResponseChunk*(response: UntypedResponse, val: auto): Future[void] = - inc response.writtenChunks - response.stream.writeChunk(some Success, SSZ.encode(val)) + sendResponseChunkBytes(response, SSZ.encode(val)) template sendUserHandlerResultAsChunkImpl*(stream: Connection, handlerResultFut: Future): untyped = @@ -617,6 +618,11 @@ template write*[M](r: MultipleChunksResponse[M], val: M): untyped = mixin sendResponseChunk sendResponseChunk(UntypedResponse(r), val) +template writeRawBytes*[M]( + r: MultipleChunksResponse[M], bytes: openArray[byte], + contextBytes: openArray[byte]): untyped = + sendResponseChunkBytes(UntypedResponse(r), bytes, contextBytes) + template send*[M](r: SingleChunkResponse[M], val: M): untyped = mixin sendResponseChunk doAssert UntypedResponse(r).writtenChunks == 0 diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 176b29f57..d5c9534f5 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -53,13 +53,13 @@ proc init*(T: type RequestManager, network: Eth2Node, ) proc checkResponse(roots: openArray[Eth2Digest], - blocks: openArray[ForkedSignedBeaconBlock]): bool = + blocks: openArray[ref ForkedSignedBeaconBlock]): bool = ## This procedure checks peer's response. var checks = @roots if len(blocks) > len(roots): return false for blk in blocks: - let res = checks.find(blk.root) + let res = checks.find(blk[].root) if res == -1: return false else: @@ -75,10 +75,11 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager, peer_score = peer.getScore() let blocks = if peer.useSyncV2(): - await peer.beaconBlocksByRoot_v2(BlockRootsList items) + await beaconBlocksByRoot_v2(peer, BlockRootsList items) else: - (await peer.beaconBlocksByRoot(BlockRootsList items)).map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: - blcks.mapIt(ForkedSignedBeaconBlock.init(it)) + (await beaconBlocksByRoot(peer, BlockRootsList items)).map( + proc(blcks: seq[phase0.SignedBeaconBlock]): auto = + blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it)))) if blocks.isOk: let ublocks = blocks.get() @@ -88,7 +89,7 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager, gotUnviableBlock = false for b in ublocks: - let ver = await rman.blockVerifier(b) + let ver = await rman.blockVerifier(b[]) if ver.isErr(): case ver.error() of BlockError.MissingParent: diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 59719a9fd..4498f1027 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -73,7 +73,7 @@ type slots*: uint64 SyncManagerError* = object of CatchableError - BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]] + BeaconBlocksRes* = NetRes[seq[ref ForkedSignedBeaconBlock]] proc now*(sm: typedesc[SyncMoment], slots: uint64): SyncMoment {.inline.} = SyncMoment(stamp: now(chronos.Moment), slots: slots) @@ -157,22 +157,15 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, slot = req.slot, slot_count = req.count, step = req.step, peer_score = peer.getScore(), peer_speed = peer.netKbps(), direction = man.direction, topics = "syncman" - if peer.useSyncV2(): + try: let res = - try: + if peer.useSyncV2(): await beaconBlocksByRange_v2(peer, req.slot, req.count, req.step) - except CancelledError: - debug "Interrupt, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" - return - except CatchableError as exc: - debug "Error, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, - errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(), - direction = man.direction, topics = "syncman" - return + else: + (await beaconBlocksByRange(peer, req.slot, req.count, req.step)).map( + proc(blcks: seq[phase0.SignedBeaconBlock]): auto = + blcks.mapIt(newClone(ForkedSignedBeaconBlock.init(it)))) + if res.isErr(): debug "Error, while reading getBlocks response", peer = peer, slot = req.slot, count = req.count, @@ -181,33 +174,18 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, error = $res.error() return return res - else: - let res = - try: - await beaconBlocksByRange(peer, req.slot, req.count, req.step) - except CancelledError: - debug "Interrupt, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, - peer_speed = peer.netKbps(), direction = man.direction, - topics = "syncman" - return - except CatchableError as exc: - debug "Error, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, - errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(), - direction = man.direction, topics = "syncman" - return - if res.isErr(): - debug "Error, while reading getBlocks response", - peer = peer, slot = req.slot, count = req.count, - step = req.step, peer_speed = peer.netKbps(), - direction = man.direction, error = $res.error(), - topics = "syncman" - return - let forked = - res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: - blcks.mapIt(ForkedSignedBeaconBlock.init(it)) - return forked + except CancelledError: + debug "Interrupt, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + peer_speed = peer.netKbps(), direction = man.direction, + topics = "syncman" + return + except CatchableError as exc: + debug "Error, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + errName = exc.name, errMsg = exc.msg, peer_speed = peer.netKbps(), + direction = man.direction, topics = "syncman" + return proc remainingSlots(man: SyncManager): uint64 = if man.direction == SyncQueueKind.Forward: diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index 1df33d1dd..c26d608cb 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -59,8 +59,9 @@ type BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS] -proc readChunkPayload*(conn: Connection, peer: Peer, - MsgType: type ForkedSignedBeaconBlock): Future[NetRes[ForkedSignedBeaconBlock]] {.async.} = +proc readChunkPayload*( + conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)): + Future[NetRes[MsgType]] {.async.} = var contextBytes: ForkDigest try: await conn.readExactly(addr contextBytes, sizeof contextBytes) @@ -70,42 +71,24 @@ proc readChunkPayload*(conn: Connection, peer: Peer, if contextBytes == peer.network.forkDigests.phase0: let res = await readChunkPayload(conn, peer, phase0.SignedBeaconBlock) if res.isOk: - return ok ForkedSignedBeaconBlock.init(res.get) + return ok newClone(ForkedSignedBeaconBlock.init(res.get)) else: return err(res.error) elif contextBytes == peer.network.forkDigests.altair: let res = await readChunkPayload(conn, peer, altair.SignedBeaconBlock) if res.isOk: - return ok ForkedSignedBeaconBlock.init(res.get) + return ok newClone(ForkedSignedBeaconBlock.init(res.get)) else: return err(res.error) elif contextBytes == peer.network.forkDigests.bellatrix: let res = await readChunkPayload(conn, peer, bellatrix.SignedBeaconBlock) if res.isOk: - return ok ForkedSignedBeaconBlock.init(res.get) + return ok newClone(ForkedSignedBeaconBlock.init(res.get)) else: return err(res.error) else: return neterr InvalidContextBytes -proc sendResponseChunk*(response: UntypedResponse, - val: ForkedSignedBeaconBlock): Future[void] = - inc response.writtenChunks - - case val.kind - of BeaconBlockFork.Phase0: - response.stream.writeChunk(some ResponseCode.Success, - SSZ.encode(val.phase0Data), - response.peer.network.forkDigests.phase0.data) - of BeaconBlockFork.Altair: - response.stream.writeChunk(some ResponseCode.Success, - SSZ.encode(val.altairData), - response.peer.network.forkDigests.altair.data) - of BeaconBlockFork.Bellatrix: - response.stream.writeChunk(some ResponseCode.Success, - SSZ.encode(val.bellatrixData), - response.peer.network.forkDigests.bellatrix.data) - func shortLog*(s: StatusMsg): auto = ( forkDigest: s.forkDigest, @@ -153,7 +136,9 @@ proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg): if status.finalizedEpoch <= dag.finalizedHead.slot.epoch: let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot()) - if status.finalizedRoot != blockId.bid.root and blockId.bid.root != Eth2Digest(): + if status.finalizedRoot != blockId.bid.root and + blockId.bid.root != Eth2Digest() and + status.finalizedRoot != Eth2Digest(): return err("peer following different finality") ok() @@ -224,47 +209,73 @@ p2pProtocol BeaconSync(version = 1, reqStep: uint64, response: MultipleChunksResponse[phase0.SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_range", 1).} = + # TODO Semantically, this request should return a non-ref, but doing so + # runs into extreme inefficiency due to the compiler introducing + # hidden copies - in future nim versions with move support, this should + # be revisited + # TODO This code is more complicated than it needs to be, since the type + # of the multiple chunks response is not actually used in this server + # implementation (it's used to derive the signature of the client + # function, not in the code below!) + # TODO although you can't tell from this function definition, a magic + # client call that returns `seq[ref SignedBeaconBlock]` will + # will be generated by the libp2p macro - we guarantee that seq items + # are `not-nil` in the implementation trace "got range request", peer, startSlot, count = reqCount, step = reqStep - if reqCount > 0'u64 and reqStep > 0'u64: - var blocks: array[MAX_REQUEST_BLOCKS, BlockId] - let - dag = peer.networkState.dag - # Limit number of blocks in response - count = int min(reqCount, blocks.lenu64) - - let - endIndex = count - 1 - startIndex = - dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex)) - peer.updateRequestQuota( - blockByRangeLookupCost + - max(0, endIndex - startIndex + 1).float * blockResponseCost) - peer.awaitNonNegativeRequestQuota() - - for i in startIndex..endIndex: - trace "wrote response block", - slot = blocks[i].slot, roor = shortLog(blocks[i].root) - let blk = dag.getForkedBlock(blocks[i]) - if blk.isSome(): - let blck = blk.get() - case blck.kind - of BeaconBlockFork.Phase0: - await response.write(blck.phase0Data.asSigned) - else: - # Skipping all subsequent blocks should be OK because the spec says: - # "Clients MAY limit the number of blocks in the response." - # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange - # - # Also, our response would be indistinguishable from a node - # that have been synced exactly to the altair transition slot. - break - - debug "Block range request done", - peer, startSlot, count, reqStep, found = count - startIndex - else: + if reqCount == 0'u64 or reqStep == 0'u64: raise newException(InvalidInputsError, "Empty range requested") + let + dag = peer.networkState.dag + + if startSlot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH: + # "Clients MAY limit the number of blocks in the response." + # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange + debug "Block range v1 request for post-altair range", + peer, startSlot, reqCount, reqStep + return + + var blocks: array[MAX_REQUEST_BLOCKS, BlockId] + + let + # Limit number of blocks in response + count = int min(reqCount, blocks.lenu64) + endIndex = count - 1 + startIndex = + dag.getBlockRange(startSlot, reqStep, blocks.toOpenArray(0, endIndex)) + + peer.updateRequestQuota(blockByRangeLookupCost) + peer.awaitNonNegativeRequestQuota() + + var + found = 0 + bytes: seq[byte] + + for i in startIndex..endIndex: + if blocks[i].slot.epoch >= dag.cfg.ALTAIR_FORK_EPOCH: + # Skipping all subsequent blocks should be OK because the spec says: + # "Clients MAY limit the number of blocks in the response." + # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange + # + # Also, our response would be indistinguishable from a node + # that have been synced exactly to the altair transition slot. + break + + if dag.getBlockSSZ(blocks[i], bytes): + trace "writing response block", + slot = blocks[i].slot, roor = shortLog(blocks[i].root) + + peer.updateRequestQuota(blockResponseCost) + peer.awaitNonNegativeRequestQuota() + + await response.writeRawBytes(bytes, []) # phase0 bytes + + inc found + + debug "Block range request done", + peer, startSlot, count, reqStep, found + proc beaconBlocksByRoot( peer: Peer, # Please note that the SSZ list here ensures that the @@ -272,6 +283,19 @@ p2pProtocol BeaconSync(version = 1, blockRoots: BlockRootsList, response: MultipleChunksResponse[phase0.SignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_root", 1).} = + # TODO Semantically, this request should return a non-ref, but doing so + # runs into extreme inefficiency due to the compiler introducing + # hidden copies - in future nim versions with move support, this should + # be revisited + # TODO This code is more complicated than it needs to be, since the type + # of the multiple chunks response is not actually used in this server + # implementation (it's used to derive the signature of the client + # function, not in the code below!) + # TODO although you can't tell from this function definition, a magic + # client call that returns `seq[ref SignedBeaconBlock]` will + # will be generated by the libp2p macro - we guarantee that seq items + # are `not-nil` in the implementation + if blockRoots.len == 0: raise newException(InvalidInputsError, "No blocks requested") @@ -279,28 +303,33 @@ p2pProtocol BeaconSync(version = 1, dag = peer.networkState.dag count = blockRoots.len + var + found = 0 + bytes: seq[byte] + peer.updateRequestQuota(count.float * blockByRootLookupCost) peer.awaitNonNegativeRequestQuota() - var found = 0 for i in 0..= dag.cfg.ALTAIR_FORK_EPOCH: + # Skipping this block should be fine because the spec says: + # "Clients MAY limit the number of blocks in the response." + # https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyroot + # + # Also, our response would be indistinguishable from a node + # that have been synced exactly to the altair transition slot. + continue + + if dag.getBlockSSZ(blockRef.bid, bytes): + peer.updateRequestQuota(blockResponseCost) + peer.awaitNonNegativeRequestQuota() + + await response.writeRawBytes(bytes, []) # phase0 bytes + inc found debug "Block root request done", peer, roots = blockRoots.len, count, found @@ -310,47 +339,75 @@ p2pProtocol BeaconSync(version = 1, startSlot: Slot, reqCount: uint64, reqStep: uint64, - response: MultipleChunksResponse[ForkedSignedBeaconBlock]) + response: MultipleChunksResponse[ref ForkedSignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_range", 2).} = + # TODO Semantically, this request should return a non-ref, but doing so + # runs into extreme inefficiency due to the compiler introducing + # hidden copies - in future nim versions with move support, this should + # be revisited + # TODO This code is more complicated than it needs to be, since the type + # of the multiple chunks response is not actually used in this server + # implementation (it's used to derive the signature of the client + # function, not in the code below!) + # TODO although you can't tell from this function definition, a magic + # client call that returns `seq[ref ForkedSignedBeaconBlock]` will + # will be generated by the libp2p macro - we guarantee that seq items + # are `not-nil` in the implementation + trace "got range request", peer, startSlot, count = reqCount, step = reqStep - if reqCount > 0'u64 and reqStep > 0'u64: - var blocks: array[MAX_REQUEST_BLOCKS, BlockId] - let - dag = peer.networkState.dag - # Limit number of blocks in response - count = int min(reqCount, blocks.lenu64) - - let - endIndex = count - 1 - startIndex = - dag.getBlockRange(startSlot, reqStep, - blocks.toOpenArray(0, endIndex)) - peer.updateRequestQuota( - blockByRangeLookupCost + - max(0, endIndex - startIndex + 1).float * blockResponseCost) - peer.awaitNonNegativeRequestQuota() - - for i in startIndex..endIndex: - let - blck = dag.getForkedBlock(blocks[i]).valueOr: - continue - - await response.write(blck.asSigned) - - debug "Block range request done", - peer, startSlot, count, reqStep, found = count - startIndex - else: + if reqCount == 0 or reqStep == 0: raise newException(InvalidInputsError, "Empty range requested") + var blocks: array[MAX_REQUEST_BLOCKS, BlockId] + let + dag = peer.networkState.dag + # Limit number of blocks in response + count = int min(reqCount, blocks.lenu64) + endIndex = count - 1 + startIndex = + dag.getBlockRange(startSlot, reqStep, + blocks.toOpenArray(0, endIndex)) + + peer.updateRequestQuota(blockByRangeLookupCost) + peer.awaitNonNegativeRequestQuota() + + var + found = 0 + bytes: seq[byte] + + for i in startIndex..endIndex: + if dag.getBlockSSZ(blocks[i], bytes): + peer.updateRequestQuota(blockResponseCost) + peer.awaitNonNegativeRequestQuota() + + await response.writeRawBytes( + bytes, dag.forkDigestAtEpoch(blocks[i].slot.epoch).data) + + inc found + + debug "Block range request done", + peer, startSlot, count, reqStep + proc beaconBlocksByRoot_v2( peer: Peer, # Please note that the SSZ list here ensures that the # spec constant MAX_REQUEST_BLOCKS is enforced: blockRoots: BlockRootsList, - response: MultipleChunksResponse[ForkedSignedBeaconBlock]) + response: MultipleChunksResponse[ref ForkedSignedBeaconBlock]) {.async, libp2pProtocol("beacon_blocks_by_root", 2).} = - + # TODO Semantically, this request should return a non-ref, but doing so + # runs into extreme inefficiency due to the compiler introducing + # hidden copies - in future nim versions with move support, this should + # be revisited + # TODO This code is more complicated than it needs to be, since the type + # of the multiple chunks response is not actually used in this server + # implementation (it's used to derive the signature of the client + # function, not in the code below!) + # TODO although you can't tell from this function definition, a magic + # client call that returns `seq[ref ForkedSignedBeaconBlock]` will + # will be generated by the libp2p macro - we guarantee that seq items + # are `not-nil` in the implementation if blockRoots.len == 0: raise newException(InvalidInputsError, "No blocks requested") @@ -361,15 +418,23 @@ p2pProtocol BeaconSync(version = 1, peer.updateRequestQuota(count.float * blockByRootLookupCost) peer.awaitNonNegativeRequestQuota() - var found = 0 - for i in 0.. 0) and (sr.slot + sr.count - 1'u64 != sq.outSlot) proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], - data: seq[ForkedSignedBeaconBlock], + data: seq[ref ForkedSignedBeaconBlock], processingCb: ProcessingCallback = nil) {.async.} = ## Push successful result to queue ``sq``. mixin updateScore @@ -579,13 +579,13 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], res: Result[void, BlockError] for blk in sq.blocks(item): - res = await sq.blockVerifier(blk) + res = await sq.blockVerifier(blk[]) if res.isOk(): hasOkBlock = true else: case res.error() of BlockError.MissingParent: - missingParentSlot = some(blk.slot) + missingParentSlot = some(blk[].slot) break of BlockError.Duplicate: # Keep going, happens naturally @@ -595,7 +595,7 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], # quarantine if unviableBlock.isNone: # Remember the first unviable block, so we can log it - unviableBlock = some((blk.root, blk.slot)) + unviableBlock = some((blk[].root, blk[].slot)) of BlockError.Invalid: hasInvalidBlock = true diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 4357e6d31..6ea3fae4b 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -41,22 +41,26 @@ proc collector(queue: AsyncQueue[BlockEntry]): BlockVerifier = return verify suite "SyncManager test suite": - proc createChain(start, finish: Slot): seq[ForkedSignedBeaconBlock] = + proc createChain(start, finish: Slot): seq[ref ForkedSignedBeaconBlock] = doAssert(start <= finish) let count = int(finish - start + 1'u64) - var res = newSeq[ForkedSignedBeaconBlock](count) + var res = newSeq[ref ForkedSignedBeaconBlock](count) var curslot = start for item in res.mitems(): - item.phase0Data.message.slot = curslot + item = new ForkedSignedBeaconBlock + item[].phase0Data.message.slot = curslot curslot = curslot + 1'u64 res - proc getSlice(chain: openarray[ForkedSignedBeaconBlock], startSlot: Slot, - request: SyncRequest[SomeTPeer]): seq[ForkedSignedBeaconBlock] = + proc getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot, + request: SyncRequest[SomeTPeer]): seq[ref ForkedSignedBeaconBlock] = let startIndex = int(request.slot - startSlot) finishIndex = int(request.slot - startSlot) + int(request.count) - 1 - @chain[startIndex..finishIndex] + var res = newSeq[ref ForkedSignedBeaconBlock](1 + finishIndex - startIndex) + for i in 0..