From 2d6a661ac6ae394effe205e56695801ae8d50289 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Thu, 15 Jul 2021 21:01:07 +0200 Subject: [PATCH] Syncv2 (#2723) * bump libp2p * altair sync v2 Use V2 sync requests after the altair fork has happened, according to the wall clock * Fix the behavior of the v1 req/resp calls after Altair Co-authored-by: Zahary Karadjov --- .../gossip_processing/block_processor.nim | 24 +++--- .../gossip_processing/eth2_processor.nim | 11 +-- beacon_chain/nimbus_beacon_node.nim | 22 +++--- beacon_chain/rpc/beacon_api.nim | 6 +- beacon_chain/rpc/beacon_rest_api.nim | 11 +-- beacon_chain/rpc/debug_api.nim | 4 +- .../rpc/eth2_json_rest_serialization.nim | 8 +- beacon_chain/rpc/validator_api.nim | 9 +-- beacon_chain/ssz/sszdump.nim | 9 +++ beacon_chain/sync/request_manager.nim | 13 +++- beacon_chain/sync/sync_manager.nim | 76 +++++++++++-------- beacon_chain/sync/sync_protocol.nim | 38 ++++++++-- beacon_chain/validators/validator_duties.nim | 16 ++-- tests/test_sync_manager.nim | 20 ++--- vendor/nim-libp2p | 2 +- 15 files changed, 168 insertions(+), 101 deletions(-) diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index c9d5a36e4..07d6e9a29 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -11,12 +11,16 @@ import std/math, stew/results, chronicles, chronos, metrics, - ../spec/[crypto, datatypes/phase0, digest], + ../spec/[ + crypto, datatypes/phase0, datatypes/altair, digest, + forkedbeaconstate_helpers], ../consensus_object_pools/[block_clearance, blockchain_dag, attestation_pool], ./consensus_manager, ".."/[beacon_clock, beacon_node_types], ../ssz/sszdump +export sszdump + # Block Processor # ------------------------------------------------------------------------------ # The block processor moves blocks from "Incoming" to "Consensus verified" @@ -26,7 +30,7 @@ declareHistogram beacon_store_block_duration_seconds, type BlockEntry* = object - blck*: phase0.SignedBeaconBlock + blck*: ForkedSignedBeaconBlock resfut*: Future[Result[void, BlockError]] queueTick*: Moment # Moment when block was enqueued validationDur*: Duration # Time it took to perform gossip validation @@ -103,7 +107,7 @@ proc hasBlocks*(self: BlockProcessor): bool = # ------------------------------------------------------------------------------ proc addBlock*( - self: var BlockProcessor, blck: phase0.SignedBeaconBlock, + self: var BlockProcessor, blck: ForkedSignedBeaconBlock, resfut: Future[Result[void, BlockError]] = nil, validationDur = Duration()) = ## Enqueue a Gossip-validated block for consensus verification @@ -128,7 +132,8 @@ proc addBlock*( # ------------------------------------------------------------------------------ proc dumpBlock*[T]( - self: BlockProcessor, signedBlock: phase0.SignedBeaconBlock, + self: BlockProcessor, + signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock, res: Result[T, (ValidationResult, BlockError)]) = if self.dumpEnabled and res.isErr: case res.error[1] @@ -142,15 +147,16 @@ proc dumpBlock*[T]( discard proc storeBlock( - self: var BlockProcessor, signedBlock: phase0.SignedBeaconBlock, + self: var BlockProcessor, + signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock, wallSlot: Slot): Result[void, BlockError] = let attestationPool = self.consensusManager.attestationPool + type Trusted = typeof signedBlock.asTrusted() let blck = self.consensusManager.dag.addRawBlock( self.consensusManager.quarantine, signedBlock) do ( - blckRef: BlockRef, trustedBlock: phase0.TrustedSignedBeaconBlock, - epochRef: EpochRef): + blckRef: BlockRef, trustedBlock: Trusted, epochRef: EpochRef): # Callback add to fork choice if valid attestationPool[].addForkChoice( epochRef, blckRef, trustedBlock.message, wallSlot) @@ -181,7 +187,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) = let startTick = Moment.now() - res = self.storeBlock(entry.blck, wallSlot) + res = withBlck(entry.blck): self.storeBlock(blck, wallSlot) storeBlockTick = Moment.now() if res.isOk(): @@ -198,7 +204,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) = debug "Block processed", localHeadSlot = self.consensusManager.dag.head.slot, - blockSlot = entry.blck.message.slot, + blockSlot = entry.blck.slot, validationDur = entry.validationDur, queueDur, storeBlockDur, updateHeadDur diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 59ee57122..fdeaed2c3 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -11,14 +11,14 @@ import std/tables, stew/results, chronicles, chronos, metrics, - ../spec/[crypto, digest], - ../spec/datatypes/base, + ../spec/[crypto, digest, forkedbeaconstate_helpers], + ../spec/datatypes/[altair, phase0], ../consensus_object_pools/[block_clearance, blockchain_dag, exit_pool, attestation_pool], ./gossip_validation, ./block_processor, ./batch_validation, ../validators/validator_pool, ../beacon_node_types, - ../beacon_clock, ../ssz/sszdump + ../beacon_clock # Metrics for tracking attestation and beacon block loss declareCounter beacon_attestations_received, @@ -127,7 +127,7 @@ proc getCurrentBeaconTime*(self: Eth2Processor|ref Eth2Processor): BeaconTime = proc blockValidator*( self: var Eth2Processor, - signedBlock: SignedBeaconBlock): ValidationResult = + signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock): ValidationResult = logScope: signedBlock = shortLog(signedBlock.message) blockRoot = shortLog(signedBlock.root) @@ -172,7 +172,8 @@ proc blockValidator*( # propagation of seemingly good blocks trace "Block validated" self.blockProcessor[].addBlock( - signedBlock, validationDur = self.getCurrentBeaconTime() - wallTime) + ForkedSignedBeaconBlock.init(signedBlock), + validationDur = self.getCurrentBeaconTime() - wallTime) ValidationResult.Accept diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 681ff1a95..5f753cf70 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -39,9 +39,9 @@ import ./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api, validator_api], ./spec/[ - datatypes, digest, crypto, forkedbeaconstate_helpers, beaconstate, - eth2_apis/beacon_rpc_client, helpers, network, presets, weak_subjectivity, - signatures], + datatypes/phase0, datatypes/altair, digest, crypto, + forkedbeaconstate_helpers, beaconstate, eth2_apis/beacon_rpc_client, + helpers, network, presets, weak_subjectivity, signatures], ./consensus_object_pools/[ blockchain_dag, block_quarantine, block_clearance, block_pools_types, attestation_pool, exit_pool, spec_cache], @@ -117,13 +117,13 @@ proc init*(T: type BeaconNode, db = BeaconChainDB.new(config.databaseDir, inMemory = false) var - genesisState, checkpointState: ref BeaconState - checkpointBlock: TrustedSignedBeaconBlock + genesisState, checkpointState: ref phase0.BeaconState + checkpointBlock: phase0.TrustedSignedBeaconBlock if config.finalizedCheckpointState.isSome: let checkpointStatePath = config.finalizedCheckpointState.get.string checkpointState = try: - newClone(SSZ.loadFile(checkpointStatePath, BeaconState)) + newClone(SSZ.loadFile(checkpointStatePath, phase0.BeaconState)) except SerializationError as err: fatal "Checkpoint state deserialization failed", err = formatMsg(err, checkpointStatePath) @@ -140,7 +140,7 @@ proc init*(T: type BeaconNode, let checkpointBlockPath = config.finalizedCheckpointBlock.get.string try: # TODO Perform sanity checks like signature and slot verification at least - checkpointBlock = SSZ.loadFile(checkpointBlockPath, TrustedSignedBeaconBlock) + checkpointBlock = SSZ.loadFile(checkpointBlockPath, phase0.TrustedSignedBeaconBlock) except SerializationError as err: fatal "Invalid checkpoint block", err = err.formatMsg(checkpointBlockPath) quit 1 @@ -156,8 +156,8 @@ proc init*(T: type BeaconNode, var eth1Monitor: Eth1Monitor if not ChainDAGRef.isInitialized(db): var - tailState: ref BeaconState - tailBlock: TrustedSignedBeaconBlock + tailState: ref phase0.BeaconState + tailBlock: phase0.TrustedSignedBeaconBlock if genesisStateContents.len == 0 and checkpointState == nil: when hasGenesisDetection: @@ -216,7 +216,7 @@ proc init*(T: type BeaconNode, quit 1 else: try: - genesisState = newClone(SSZ.decode(genesisStateContents, BeaconState)) + genesisState = newClone(SSZ.decode(genesisStateContents, phase0.BeaconState)) except CatchableError as err: raiseAssert "Invalid baked-in state: " & err.msg @@ -1118,7 +1118,7 @@ proc installMessageValidators(node: BeaconNode) = node.network.addValidator( getBeaconBlocksTopic(node.dag.forkDigests.phase0), - proc (signedBlock: SignedBeaconBlock): ValidationResult = + proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult = node.processor[].blockValidator(signedBlock)) node.network.addValidator( diff --git a/beacon_chain/rpc/beacon_api.nim b/beacon_chain/rpc/beacon_api.nim index 81a1d066c..cdb28e2b6 100644 --- a/beacon_chain/rpc/beacon_api.nim +++ b/beacon_chain/rpc/beacon_api.nim @@ -18,7 +18,7 @@ import ../validators/validator_duties, ../gossip_processing/gossip_validation, ../consensus_object_pools/blockchain_dag, - ../spec/[crypto, datatypes, digest, forkedbeaconstate_helpers, network], + ../spec/[crypto, datatypes/phase0, digest, forkedbeaconstate_helpers, network], ../spec/eth2_apis/callsigs_types, ../ssz/merkleization, ./rpc_utils, ./eth2_json_rpc_serialization @@ -399,7 +399,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. result.canonical = bd.refs.isAncestorOf(node.dag.head) - rpcServer.rpc("post_v1_beacon_blocks") do (blck: SignedBeaconBlock) -> int: + rpcServer.rpc("post_v1_beacon_blocks") do (blck: phase0.SignedBeaconBlock) -> int: if not(node.syncManager.inProgress): raise newException(CatchableError, "Beacon node is currently syncing, try again later.") @@ -427,7 +427,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. return 200 rpcServer.rpc("get_v1_beacon_blocks_blockId") do ( - blockId: string) -> TrustedSignedBeaconBlock: + blockId: string) -> phase0.TrustedSignedBeaconBlock: # TODO detect Altair and fail: /v1/ APIs don't support Altair return node.getBlockDataFromBlockId(blockId).data.phase0Block diff --git a/beacon_chain/rpc/beacon_rest_api.nim b/beacon_chain/rpc/beacon_rest_api.nim index 778d8e8dd..53c5fb250 100644 --- a/beacon_chain/rpc/beacon_rest_api.nim +++ b/beacon_chain/rpc/beacon_rest_api.nim @@ -13,7 +13,7 @@ import ../gossip_processing/gossip_validation, ../validators/validator_duties, ../spec/[crypto, digest, forkedbeaconstate_helpers, network], - ../spec/datatypes/base, + ../spec/datatypes/phase0, ../ssz/merkleization, ./eth2_json_rest_serialization, ./rest_utils @@ -595,7 +595,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = block: if contentBody.isNone(): return RestApiResponse.jsonError(Http400, EmptyRequestBodyError) - let dres = decodeBody(SignedBeaconBlock, contentBody.get()) + let dres = decodeBody(phase0.SignedBeaconBlock, contentBody.get()) if dres.isErr(): return RestApiResponse.jsonError(Http400, InvalidBlockObjectError, $dres.error()) @@ -615,7 +615,8 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = node.network.broadcast(blocksTopic, blck) return RestApiResponse.jsonError(Http202, BlockValidationError) else: - let res = await proposeSignedBlock(node, head, AttachedValidator(), blck) + let res = await proposeSignedBlock( + node, head, AttachedValidator(), blck) if res == head: # TODO altair-transition, but not for immediate testnet-priority let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0) @@ -636,7 +637,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = if res.isErr(): return RestApiResponse.jsonError(Http404, BlockNotFoundError) res.get() - static: doAssert bdata.data.phase0Block is TrustedSignedBeaconBlock + static: doAssert bdata.data.phase0Block is phase0.TrustedSignedBeaconBlock return RestApiResponse.jsonResponse(bdata.data.phase0Block) # https://ethereum.github.io/eth2.0-APIs/#/Beacon/getBlockRoot @@ -950,7 +951,7 @@ proc getStateFork*(state_id: StateIdent): RestResponse[DataRestFork] {. meth: MethodGet.} ## https://ethereum.github.io/eth2.0-APIs/#/Beacon/getStateFork -proc publishBlock*(body: SignedBeaconBlock): RestPlainResponse {. +proc publishBlock*(body: phase0.SignedBeaconBlock): RestPlainResponse {. rest, endpoint: "/eth/v1/beacon/blocks", meth: MethodPost.} ## https://ethereum.github.io/eth2.0-APIs/#/Beacon/publishBlock diff --git a/beacon_chain/rpc/debug_api.nim b/beacon_chain/rpc/debug_api.nim index 3361f1009..00426d911 100644 --- a/beacon_chain/rpc/debug_api.nim +++ b/beacon_chain/rpc/debug_api.nim @@ -13,7 +13,7 @@ import chronicles, ../version, ../beacon_node_common, ../networking/[eth2_network, peer_pool], - ../spec/datatypes/base, + ../spec/datatypes/phase0, ../spec/[digest, presets], ./rpc_utils, ./eth2_json_rpc_serialization @@ -25,7 +25,7 @@ type proc installDebugApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. raises: [Exception].} = # TODO fix json-rpc rpcServer.rpc("get_v1_debug_beacon_states_stateId") do ( - stateId: string) -> BeaconState: + stateId: string) -> phase0.BeaconState: withStateForStateId(stateId): return stateData.data.hbsPhase0.data diff --git a/beacon_chain/rpc/eth2_json_rest_serialization.nim b/beacon_chain/rpc/eth2_json_rest_serialization.nim index 525587cc7..07658a5dc 100644 --- a/beacon_chain/rpc/eth2_json_rest_serialization.nim +++ b/beacon_chain/rpc/eth2_json_rest_serialization.nim @@ -7,7 +7,7 @@ import nimcrypto/utils as ncrutils, ../beacon_node_common, ../networking/eth2_network, ../consensus_object_pools/[blockchain_dag, exit_pool], - ../spec/[crypto, digest, datatypes, eth2_apis/callsigs_types], + ../spec/[crypto, digest, datatypes/phase0, eth2_apis/callsigs_types], ../ssz/merkleization, rest_utils export json_serialization @@ -151,7 +151,7 @@ type DataRestFork* = DataEnclosedObject[Fork] DataRestProposerDuties* = DataRootEnclosedObject[seq[RestProposerDuty]] DataRestAttesterDuties* = DataRootEnclosedObject[seq[RestAttesterDuty]] - DataRestBeaconBlock* = DataEnclosedObject[BeaconBlock] + DataRestBeaconBlock* = DataEnclosedObject[phase0.BeaconBlock] DataRestAttestationData* = DataEnclosedObject[AttestationData] DataRestAttestation* = DataEnclosedObject[Attestation] DataRestSyncInfo* = DataEnclosedObject[RestSyncInfo] @@ -160,7 +160,7 @@ type DataRestVersion* = DataEnclosedObject[RestVersion] DataRestConfig* = DataEnclosedObject[RestConfig] - EncodeTypes* = SignedBeaconBlock + EncodeTypes* = phase0.SignedBeaconBlock EncodeArrays* = seq[ValidatorIndex] | seq[Attestation] | seq[SignedAggregateAndProof] | seq[RestCommitteeSubscription] @@ -581,7 +581,7 @@ proc decodeBody*[T](t: typedesc[T], return err("Unexpected deserialization error") ok(data) -RestJson.useCustomSerialization(BeaconState.justification_bits): +RestJson.useCustomSerialization(phase0.BeaconState.justification_bits): read: let s = reader.readValue(string) if s.len != 4: diff --git a/beacon_chain/rpc/validator_api.nim b/beacon_chain/rpc/validator_api.nim index 3df6f6d0a..af854e38a 100644 --- a/beacon_chain/rpc/validator_api.nim +++ b/beacon_chain/rpc/validator_api.nim @@ -18,7 +18,7 @@ import # Local modules ../spec/[crypto, digest, forkedbeaconstate_helpers, helpers, network, signatures], - ../spec/datatypes/base, + ../spec/datatypes/phase0, ../spec/eth2_apis/callsigs_types, ../consensus_object_pools/[blockchain_dag, spec_cache, attestation_pool], ../ssz/merkleization, ../beacon_node_common, ../beacon_node_types, @@ -35,7 +35,7 @@ type proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. raises: [Exception].} = # TODO fix json-rpc rpcServer.rpc("get_v1_validator_block") do ( - slot: Slot, graffiti: GraffitiBytes, randao_reveal: ValidatorSig) -> BeaconBlock: + slot: Slot, graffiti: GraffitiBytes, randao_reveal: ValidatorSig) -> phase0.BeaconBlock: debug "get_v1_validator_block", slot = slot let head = node.doChecksAndGetCurrentHead(slot) let proposer = node.dag.getProposer(head, slot) @@ -47,7 +47,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. raise newException(CatchableError, "could not retrieve block for slot: " & $slot) return message.get() - rpcServer.rpc("post_v1_validator_block") do (body: SignedBeaconBlock) -> bool: + rpcServer.rpc("post_v1_validator_block") do (body: phase0.SignedBeaconBlock) -> bool: debug "post_v1_validator_block", slot = body.message.slot, prop_idx = body.message.proposer_index @@ -56,8 +56,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. if head.slot >= body.message.slot: raise newException(CatchableError, "Proposal is for a past slot: " & $body.message.slot) - if head == await proposeSignedBlock( - node, head, AttachedValidator(), body): + if head == await proposeSignedBlock(node, head, AttachedValidator(), body): raise newException(CatchableError, "Could not propose block") return true diff --git a/beacon_chain/ssz/sszdump.nim b/beacon_chain/ssz/sszdump.nim index d261c811f..18227516e 100644 --- a/beacon_chain/ssz/sszdump.nim +++ b/beacon_chain/ssz/sszdump.nim @@ -28,10 +28,19 @@ proc dump*(dir: string, v: AttestationData, validator: ValidatorPubKey) = proc dump*(dir: string, v: phase0.TrustedSignedBeaconBlock) = logErrors: SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v) + +proc dump*(dir: string, v: altair.TrustedSignedBeaconBlock) = + logErrors: + SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v) + proc dump*(dir: string, v: phase0.SignedBeaconBlock) = logErrors: SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v) +proc dump*(dir: string, v: altair.SignedBeaconBlock) = + logErrors: + SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v) + proc dump*(dir: string, v: SomeHashedBeaconState, blck: BlockRef) = mixin saveFile logErrors: diff --git a/beacon_chain/sync/request_manager.nim b/beacon_chain/sync/request_manager.nim index 7e392ac2e..3365c3345 100644 --- a/beacon_chain/sync/request_manager.nim +++ b/beacon_chain/sync/request_manager.nim @@ -9,7 +9,7 @@ import options, sequtils, strutils import chronos, chronicles -import ../spec/[datatypes, digest], +import ../spec/[datatypes/phase0, datatypes/altair, digest, forkedbeaconstate_helpers], ../networking/eth2_network, ../beacon_node_types, ../ssz/merkleization, @@ -49,7 +49,7 @@ proc init*(T: type RequestManager, network: Eth2Node, ) proc checkResponse(roots: openArray[Eth2Digest], - blocks: openArray[SignedBeaconBlock]): bool = + blocks: openArray[ForkedSignedBeaconBlock]): bool = ## This procedure checks peer's response. var checks = @roots if len(blocks) > len(roots): @@ -63,7 +63,7 @@ proc checkResponse(roots: openArray[Eth2Digest], return true proc validate(rman: RequestManager, - b: SignedBeaconBlock): Future[Result[void, BlockError]] = + b: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] = let resfut = newFuture[Result[void, BlockError]]("request.manager.validate") rman.blockProcessor[].addBlock(b, resfut) resfut @@ -76,7 +76,12 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager, debug "Requesting blocks by root", peer = peer, blocks = shortLog(items), peer_score = peer.getScore() - let blocks = await peer.beaconBlocksByRoot(BlockRootsList items) + let blocks = if peer.useSyncV2(): + await peer.beaconBlocksByRoot_v2(BlockRootsList items) + else: + (await peer.beaconBlocksByRoot(BlockRootsList items)).map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: + blcks.mapIt(ForkedSignedBeaconBlock.init(it)) + if blocks.isOk: let ublocks = blocks.get() if checkResponse(items, ublocks): diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index c8c5a477c..80337795c 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -10,12 +10,12 @@ import chronicles import options, deques, heapqueue, tables, strutils, sequtils, math, algorithm import stew/results, chronos, chronicles -import ../spec/[datatypes, digest, helpers, eth2_apis/callsigs_types], +import ../spec/[datatypes/phase0, datatypes/altair, digest, helpers, eth2_apis/callsigs_types, forkedbeaconstate_helpers], ../networking/[peer_pool, eth2_network] import ../gossip_processing/block_processor import ../consensus_object_pools/block_pools_types -export datatypes, digest, chronos, chronicles, results, block_pools_types, +export phase0, altair, digest, chronos, chronicles, results, block_pools_types, helpers logScope: @@ -70,7 +70,7 @@ type SyncResult*[T] = object request*: SyncRequest[T] - data*: seq[SignedBeaconBlock] + data*: seq[ForkedSignedBeaconBlock] SyncWaiter*[T] = object future: Future[bool] @@ -141,16 +141,16 @@ type stamp*: chronos.Moment SyncManagerError* = object of CatchableError - BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]] + BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]] proc validate*[T](sq: SyncQueue[T], - blk: SignedBeaconBlock): Future[Result[void, BlockError]] = + blk: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] = let resfut = newFuture[Result[void, BlockError]]("sync.manager.validate") sq.blockProcessor[].addBlock(blk, resfut) resfut proc getShortMap*[T](req: SyncRequest[T], - data: openArray[SignedBeaconBlock]): string = + data: openArray[ForkedSignedBeaconBlock]): string = ## Returns all slot numbers in ``data`` as placement map. var res = newStringOfCap(req.count) var slider = req.slot @@ -158,11 +158,11 @@ proc getShortMap*[T](req: SyncRequest[T], for i in 0 ..< req.count: if last < len(data): for k in last ..< len(data): - if slider == data[k].message.slot: + if slider == data[k].slot: res.add('x') last = k + 1 break - elif slider < data[k].message.slot: + elif slider < data[k].slot: res.add('.') break else: @@ -178,7 +178,7 @@ proc cmp*[T](a, b: SyncRequest[T]): int = result = cmp(uint64(a.slot), uint64(b.slot)) proc checkResponse*[T](req: SyncRequest[T], - data: openArray[SignedBeaconBlock]): bool = + data: openArray[ForkedSignedBeaconBlock]): bool = if len(data) == 0: # Impossible to verify empty response. return true @@ -193,9 +193,9 @@ proc checkResponse*[T](req: SyncRequest[T], var dindex = 0 while (rindex < req.count) and (dindex < len(data)): - if slot < data[dindex].message.slot: + if slot < data[dindex].slot: discard - elif slot == data[dindex].message.slot: + elif slot == data[dindex].slot: inc(dindex) else: return false @@ -208,7 +208,7 @@ proc checkResponse*[T](req: SyncRequest[T], return false proc getFullMap*[T](req: SyncRequest[T], - data: openArray[SignedBeaconBlock]): string = + data: openArray[ForkedSignedBeaconBlock]): string = # Returns all slot numbers in ``data`` as comma-delimeted string. result = mapIt(data, $it.message.slot).join(", ") @@ -411,7 +411,7 @@ proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} = let lastslot = sr.request.slot + sr.request.count - 1'u64 if len(sr.data) == 0: return true - if sr.data[^1].message.slot != lastslot: + if sr.data[^1].slot != lastslot: return true return false @@ -422,7 +422,7 @@ proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} = # If response has only empty slots we going to use original request slot sr.request.slot else: - sr.data[^1].message.slot + sr.data[^1].slot proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) = sq.debtsQueue.push(sr) @@ -512,7 +512,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot, compute_start_slot_at_epoch(rewindEpoch) proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], - data: seq[SignedBeaconBlock]) {.async, gcsafe.} = + data: seq[ForkedSignedBeaconBlock]) {.async, gcsafe.} = ## Push successfull result to queue ``sq``. mixin updateScore @@ -559,10 +559,10 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], if len(item.data) > 0: for blk in item.data: trace "Pushing block", block_root = blk.root, - block_slot = blk.message.slot + block_slot = blk.slot res = await sq.validate(blk) if not(res.isOk): - failSlot = some(blk.message.slot) + failSlot = some(blk.slot) break else: res = Result[void, BlockError].ok() @@ -757,20 +757,36 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A, slot = req.slot, slot_count = req.count, step = req.step, peer_score = peer.getScore(), peer_speed = peer.netKbps(), topics = "syncman" - var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step) - if workFut.failed(): - debug "Error, while waiting getBlocks response", peer = peer, - slot = req.slot, slot_count = req.count, step = req.step, - errMsg = workFut.readError().msg, peer_speed = peer.netKbps(), - topics = "syncman" + if peer.useSyncV2(): + var workFut = awaitne beaconBlocksByRange_v2(peer, req.slot, req.count, req.step) + if workFut.failed(): + debug "Error, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + errMsg = workFut.readError().msg, peer_speed = peer.netKbps(), + topics = "syncman" + else: + let res = workFut.read() + if res.isErr: + debug "Error, while reading getBlocks response", + peer = peer, slot = req.slot, count = req.count, + step = req.step, peer_speed = peer.netKbps(), + topics = "syncman", error = $res.error() + result = res else: - let res = workFut.read() - if res.isErr: - debug "Error, while reading getBlocks response", - peer = peer, slot = req.slot, count = req.count, - step = req.step, peer_speed = peer.netKbps(), - topics = "syncman", error = $res.error() - result = res + var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step) + if workFut.failed(): + debug "Error, while waiting getBlocks response", peer = peer, + slot = req.slot, slot_count = req.count, step = req.step, + errMsg = workFut.readError().msg, peer_speed = peer.netKbps(), + topics = "syncman" + else: + let res = workFut.read() + if res.isErr: + debug "Error, while reading getBlocks response", + peer = peer, slot = req.slot, count = req.count, + step = req.step, peer_speed = peer.netKbps(), + topics = "syncman", error = $res.error() + result = res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: blcks.mapIt(ForkedSignedBeaconBlock.init(it)) template headAge(): uint64 = wallSlot - headSlot diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index ba14fc3ab..1a2dfaeec 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -230,8 +230,17 @@ p2pProtocol BeaconSync(version = 1, trace "wrote response block", slot = blocks[i].slot, roor = shortLog(blocks[i].root) let blk = dag.get(blocks[i]).data - # TODO Altair - await response.write(blk.phase0Block.asSigned) + case blk.kind + of BeaconBlockFork.Phase0: + await response.write(blk.phase0Block.asSigned) + of BeaconBlockFork.Altair: + # Skipping all subsequent blocks should be OK because the spec says: + # "Clients MAY limit the number of blocks in the response." + # https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange + # + # Also, our response would be indistinguishable from a node + # that have been synced exactly to the altair transition slot. + break debug "Block range request done", peer, startSlot, count, reqStep, found = count - startIndex @@ -260,9 +269,18 @@ p2pProtocol BeaconSync(version = 1, let blockRef = dag.getRef(blockRoots[i]) if not isNil(blockRef): let blk = dag.get(blockRef).data - # TODO Altair - await response.write(blk.phase0Block.asSigned) - inc found + case blk.kind + of BeaconBlockFork.Phase0: + await response.write(blk.phase0Block.asSigned) + inc found + of BeaconBlockFork.Altair: + # Skipping this block should be fine because the spec says: + # "Clients MAY limit the number of blocks in the response." + # https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyroot + # + # Also, our response would be indistinguishable from a node + # that have been synced exactly to the altair transition slot. + continue peer.updateRequestQuota(found.float * blockResponseCost) @@ -343,6 +361,16 @@ p2pProtocol BeaconSync(version = 1, {.async, libp2pProtocol("goodbye", 1).} = debug "Received Goodbye message", reason = disconnectReasonName(reason), peer +proc useSyncV2*(state: BeaconSyncNetworkState): bool = + let + wallTime = state.getTime() + wallTimeSlot = state.dag.beaconClock.toBeaconTime(wallTime).slotOrZero + + wallTimeSlot.epoch >= state.dag.cfg.ALTAIR_FORK_EPOCH + +proc useSyncV2*(peer: Peer): bool = + peer.networkState(BeaconSync).useSyncV2() + proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) = debug "Peer status", peer, statusMsg peer.state(BeaconSync).statusMsg = statusMsg diff --git a/beacon_chain/validators/validator_duties.nim b/beacon_chain/validators/validator_duties.nim index 5ea63b720..5dd8d8392 100644 --- a/beacon_chain/validators/validator_duties.nim +++ b/beacon_chain/validators/validator_duties.nim @@ -21,8 +21,8 @@ import # Local modules ../spec/[ - datatypes, digest, crypto, forkedbeaconstate_helpers, helpers, network, - signatures, state_transition], + datatypes/phase0, datatypes/altair, digest, crypto, + forkedbeaconstate_helpers, helpers, network, signatures, state_transition], ../conf, ../beacon_clock, ../consensus_object_pools/[ spec_cache, blockchain_dag, block_clearance, @@ -298,7 +298,7 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, validator_index: ValidatorIndex, graffiti: GraffitiBytes, head: BlockRef, - slot: Slot): Future[Option[BeaconBlock]] {.async.} = + slot: Slot): Future[Option[phase0.BeaconBlock]] {.async.} = # Advance state to the slot that we're proposing for let @@ -312,9 +312,9 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, if eth1Proposal.hasMissingDeposits: error "Eth1 deposits not available. Skipping block proposal", slot - return none(BeaconBlock) + return none(phase0.BeaconBlock) - func restore(v: var HashedBeaconState) = + func restore(v: var phase0.HashedBeaconState) = # TODO address this ugly workaround - there should probably be a # `state_transition` that takes a `StateData` instead and updates # the block as well @@ -342,10 +342,10 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode, proc proposeSignedBlock*(node: BeaconNode, head: BlockRef, validator: AttachedValidator, - newBlock: SignedBeaconBlock): + newBlock: phase0.SignedBeaconBlock): Future[BlockRef] {.async.} = let newBlockRef = node.dag.addRawBlock(node.quarantine, newBlock) do ( - blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock, + blckRef: BlockRef, trustedBlock: phase0.TrustedSignedBeaconBlock, epochRef: EpochRef): # Callback add to fork choice if signed block valid (and becomes trusted) node.attestationPool[].addForkChoice( @@ -401,7 +401,7 @@ proc proposeBlock(node: BeaconNode, return head # already logged elsewhere! var - newBlock = SignedBeaconBlock( + newBlock = phase0.SignedBeaconBlock( message: message.get() ) diff --git a/tests/test_sync_manager.nim b/tests/test_sync_manager.nim index 3678c10ed..66c190f15 100644 --- a/tests/test_sync_manager.nim +++ b/tests/test_sync_manager.nim @@ -4,7 +4,9 @@ import std/strutils import unittest2 import chronos import ../beacon_chain/gossip_processing/block_processor, - ../beacon_chain/sync/sync_manager + ../beacon_chain/sync/sync_manager, + ../beacon_chain/spec/datatypes/phase0, + ../beacon_chain/spec/forkedbeaconstate_helpers type SomeTPeer = ref object @@ -26,13 +28,13 @@ proc newBlockProcessor(): ref BlockProcessor = ) suite "SyncManager test suite": - proc createChain(start, finish: Slot): seq[SignedBeaconBlock] = + proc createChain(start, finish: Slot): seq[ForkedSignedBeaconBlock] = doAssert(start <= finish) let count = int(finish - start + 1'u64) - result = newSeq[SignedBeaconBlock](count) + result = newSeq[ForkedSignedBeaconBlock](count) var curslot = start for item in result.mitems(): - item.message.slot = curslot + item.phase0Block.message.slot = curslot curslot = curslot + 1'u64 test "[SyncQueue] Start and finish slots equal": @@ -220,7 +222,7 @@ suite "SyncManager test suite": proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} = while true: let sblock = await aq.popFirst() - if sblock.blck.message.slot == Slot(counter): + if sblock.blck.slot == Slot(counter): inc(counter) sblock.done() else: @@ -269,7 +271,7 @@ suite "SyncManager test suite": proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} = while true: let sblock = await aq.popFirst() - if sblock.blck.message.slot == Slot(counter): + if sblock.blck.slot == Slot(counter): inc(counter) sblock.done() else: @@ -324,7 +326,7 @@ suite "SyncManager test suite": proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} = while true: let sblock = await aq.popFirst() - if sblock.blck.message.slot == Slot(counter): + if sblock.blck.slot == Slot(counter): inc(counter) sblock.done() else: @@ -395,7 +397,7 @@ suite "SyncManager test suite": test "[SyncQueue] hasEndGap() test": let chain1 = createChain(Slot(1), Slot(1)) - let chain2 = newSeq[SignedBeaconBlock]() + let chain2 = newSeq[ForkedSignedBeaconBlock]() for counter in countdown(32'u64, 2'u64): let req = SyncRequest[SomeTPeer](slot: Slot(1), count: counter, @@ -412,7 +414,7 @@ suite "SyncManager test suite": test "[SyncQueue] getLastNonEmptySlot() test": let chain1 = createChain(Slot(10), Slot(10)) - let chain2 = newSeq[SignedBeaconBlock]() + let chain2 = newSeq[ForkedSignedBeaconBlock]() for counter in countdown(32'u64, 2'u64): let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter, diff --git a/vendor/nim-libp2p b/vendor/nim-libp2p index 3da656687..ac7f2033a 160000 --- a/vendor/nim-libp2p +++ b/vendor/nim-libp2p @@ -1 +1 @@ -Subproject commit 3da656687be63ccbf5d659af55d159130d325038 +Subproject commit ac7f2033a0d5e51d5f5a70b5485de79f90c20b39