Syncv2 (#2723)
* bump libp2p * altair sync v2 Use V2 sync requests after the altair fork has happened, according to the wall clock * Fix the behavior of the v1 req/resp calls after Altair Co-authored-by: Zahary Karadjov <zahary@gmail.com>
This commit is contained in:
parent
11541ff61b
commit
2d6a661ac6
|
@ -11,12 +11,16 @@ import
|
|||
std/math,
|
||||
stew/results,
|
||||
chronicles, chronos, metrics,
|
||||
../spec/[crypto, datatypes/phase0, digest],
|
||||
../spec/[
|
||||
crypto, datatypes/phase0, datatypes/altair, digest,
|
||||
forkedbeaconstate_helpers],
|
||||
../consensus_object_pools/[block_clearance, blockchain_dag, attestation_pool],
|
||||
./consensus_manager,
|
||||
".."/[beacon_clock, beacon_node_types],
|
||||
../ssz/sszdump
|
||||
|
||||
export sszdump
|
||||
|
||||
# Block Processor
|
||||
# ------------------------------------------------------------------------------
|
||||
# The block processor moves blocks from "Incoming" to "Consensus verified"
|
||||
|
@ -26,7 +30,7 @@ declareHistogram beacon_store_block_duration_seconds,
|
|||
|
||||
type
|
||||
BlockEntry* = object
|
||||
blck*: phase0.SignedBeaconBlock
|
||||
blck*: ForkedSignedBeaconBlock
|
||||
resfut*: Future[Result[void, BlockError]]
|
||||
queueTick*: Moment # Moment when block was enqueued
|
||||
validationDur*: Duration # Time it took to perform gossip validation
|
||||
|
@ -103,7 +107,7 @@ proc hasBlocks*(self: BlockProcessor): bool =
|
|||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc addBlock*(
|
||||
self: var BlockProcessor, blck: phase0.SignedBeaconBlock,
|
||||
self: var BlockProcessor, blck: ForkedSignedBeaconBlock,
|
||||
resfut: Future[Result[void, BlockError]] = nil,
|
||||
validationDur = Duration()) =
|
||||
## Enqueue a Gossip-validated block for consensus verification
|
||||
|
@ -128,7 +132,8 @@ proc addBlock*(
|
|||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc dumpBlock*[T](
|
||||
self: BlockProcessor, signedBlock: phase0.SignedBeaconBlock,
|
||||
self: BlockProcessor,
|
||||
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock,
|
||||
res: Result[T, (ValidationResult, BlockError)]) =
|
||||
if self.dumpEnabled and res.isErr:
|
||||
case res.error[1]
|
||||
|
@ -142,15 +147,16 @@ proc dumpBlock*[T](
|
|||
discard
|
||||
|
||||
proc storeBlock(
|
||||
self: var BlockProcessor, signedBlock: phase0.SignedBeaconBlock,
|
||||
self: var BlockProcessor,
|
||||
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock,
|
||||
wallSlot: Slot): Result[void, BlockError] =
|
||||
let
|
||||
attestationPool = self.consensusManager.attestationPool
|
||||
|
||||
type Trusted = typeof signedBlock.asTrusted()
|
||||
let blck = self.consensusManager.dag.addRawBlock(
|
||||
self.consensusManager.quarantine, signedBlock) do (
|
||||
blckRef: BlockRef, trustedBlock: phase0.TrustedSignedBeaconBlock,
|
||||
epochRef: EpochRef):
|
||||
blckRef: BlockRef, trustedBlock: Trusted, epochRef: EpochRef):
|
||||
# Callback add to fork choice if valid
|
||||
attestationPool[].addForkChoice(
|
||||
epochRef, blckRef, trustedBlock.message, wallSlot)
|
||||
|
@ -181,7 +187,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
|
|||
|
||||
let
|
||||
startTick = Moment.now()
|
||||
res = self.storeBlock(entry.blck, wallSlot)
|
||||
res = withBlck(entry.blck): self.storeBlock(blck, wallSlot)
|
||||
storeBlockTick = Moment.now()
|
||||
|
||||
if res.isOk():
|
||||
|
@ -198,7 +204,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
|
|||
|
||||
debug "Block processed",
|
||||
localHeadSlot = self.consensusManager.dag.head.slot,
|
||||
blockSlot = entry.blck.message.slot,
|
||||
blockSlot = entry.blck.slot,
|
||||
validationDur = entry.validationDur,
|
||||
queueDur, storeBlockDur, updateHeadDur
|
||||
|
||||
|
|
|
@ -11,14 +11,14 @@ import
|
|||
std/tables,
|
||||
stew/results,
|
||||
chronicles, chronos, metrics,
|
||||
../spec/[crypto, digest],
|
||||
../spec/datatypes/base,
|
||||
../spec/[crypto, digest, forkedbeaconstate_helpers],
|
||||
../spec/datatypes/[altair, phase0],
|
||||
../consensus_object_pools/[block_clearance, blockchain_dag, exit_pool, attestation_pool],
|
||||
./gossip_validation, ./block_processor,
|
||||
./batch_validation,
|
||||
../validators/validator_pool,
|
||||
../beacon_node_types,
|
||||
../beacon_clock, ../ssz/sszdump
|
||||
../beacon_clock
|
||||
|
||||
# Metrics for tracking attestation and beacon block loss
|
||||
declareCounter beacon_attestations_received,
|
||||
|
@ -127,7 +127,7 @@ proc getCurrentBeaconTime*(self: Eth2Processor|ref Eth2Processor): BeaconTime =
|
|||
|
||||
proc blockValidator*(
|
||||
self: var Eth2Processor,
|
||||
signedBlock: SignedBeaconBlock): ValidationResult =
|
||||
signedBlock: phase0.SignedBeaconBlock | altair.SignedBeaconBlock): ValidationResult =
|
||||
logScope:
|
||||
signedBlock = shortLog(signedBlock.message)
|
||||
blockRoot = shortLog(signedBlock.root)
|
||||
|
@ -172,7 +172,8 @@ proc blockValidator*(
|
|||
# propagation of seemingly good blocks
|
||||
trace "Block validated"
|
||||
self.blockProcessor[].addBlock(
|
||||
signedBlock, validationDur = self.getCurrentBeaconTime() - wallTime)
|
||||
ForkedSignedBeaconBlock.init(signedBlock),
|
||||
validationDur = self.getCurrentBeaconTime() - wallTime)
|
||||
|
||||
ValidationResult.Accept
|
||||
|
||||
|
|
|
@ -39,9 +39,9 @@ import
|
|||
./rpc/[beacon_api, config_api, debug_api, event_api, nimbus_api, node_api,
|
||||
validator_api],
|
||||
./spec/[
|
||||
datatypes, digest, crypto, forkedbeaconstate_helpers, beaconstate,
|
||||
eth2_apis/beacon_rpc_client, helpers, network, presets, weak_subjectivity,
|
||||
signatures],
|
||||
datatypes/phase0, datatypes/altair, digest, crypto,
|
||||
forkedbeaconstate_helpers, beaconstate, eth2_apis/beacon_rpc_client,
|
||||
helpers, network, presets, weak_subjectivity, signatures],
|
||||
./consensus_object_pools/[
|
||||
blockchain_dag, block_quarantine, block_clearance, block_pools_types,
|
||||
attestation_pool, exit_pool, spec_cache],
|
||||
|
@ -117,13 +117,13 @@ proc init*(T: type BeaconNode,
|
|||
db = BeaconChainDB.new(config.databaseDir, inMemory = false)
|
||||
|
||||
var
|
||||
genesisState, checkpointState: ref BeaconState
|
||||
checkpointBlock: TrustedSignedBeaconBlock
|
||||
genesisState, checkpointState: ref phase0.BeaconState
|
||||
checkpointBlock: phase0.TrustedSignedBeaconBlock
|
||||
|
||||
if config.finalizedCheckpointState.isSome:
|
||||
let checkpointStatePath = config.finalizedCheckpointState.get.string
|
||||
checkpointState = try:
|
||||
newClone(SSZ.loadFile(checkpointStatePath, BeaconState))
|
||||
newClone(SSZ.loadFile(checkpointStatePath, phase0.BeaconState))
|
||||
except SerializationError as err:
|
||||
fatal "Checkpoint state deserialization failed",
|
||||
err = formatMsg(err, checkpointStatePath)
|
||||
|
@ -140,7 +140,7 @@ proc init*(T: type BeaconNode,
|
|||
let checkpointBlockPath = config.finalizedCheckpointBlock.get.string
|
||||
try:
|
||||
# TODO Perform sanity checks like signature and slot verification at least
|
||||
checkpointBlock = SSZ.loadFile(checkpointBlockPath, TrustedSignedBeaconBlock)
|
||||
checkpointBlock = SSZ.loadFile(checkpointBlockPath, phase0.TrustedSignedBeaconBlock)
|
||||
except SerializationError as err:
|
||||
fatal "Invalid checkpoint block", err = err.formatMsg(checkpointBlockPath)
|
||||
quit 1
|
||||
|
@ -156,8 +156,8 @@ proc init*(T: type BeaconNode,
|
|||
var eth1Monitor: Eth1Monitor
|
||||
if not ChainDAGRef.isInitialized(db):
|
||||
var
|
||||
tailState: ref BeaconState
|
||||
tailBlock: TrustedSignedBeaconBlock
|
||||
tailState: ref phase0.BeaconState
|
||||
tailBlock: phase0.TrustedSignedBeaconBlock
|
||||
|
||||
if genesisStateContents.len == 0 and checkpointState == nil:
|
||||
when hasGenesisDetection:
|
||||
|
@ -216,7 +216,7 @@ proc init*(T: type BeaconNode,
|
|||
quit 1
|
||||
else:
|
||||
try:
|
||||
genesisState = newClone(SSZ.decode(genesisStateContents, BeaconState))
|
||||
genesisState = newClone(SSZ.decode(genesisStateContents, phase0.BeaconState))
|
||||
except CatchableError as err:
|
||||
raiseAssert "Invalid baked-in state: " & err.msg
|
||||
|
||||
|
@ -1118,7 +1118,7 @@ proc installMessageValidators(node: BeaconNode) =
|
|||
|
||||
node.network.addValidator(
|
||||
getBeaconBlocksTopic(node.dag.forkDigests.phase0),
|
||||
proc (signedBlock: SignedBeaconBlock): ValidationResult =
|
||||
proc (signedBlock: phase0.SignedBeaconBlock): ValidationResult =
|
||||
node.processor[].blockValidator(signedBlock))
|
||||
|
||||
node.network.addValidator(
|
||||
|
|
|
@ -18,7 +18,7 @@ import
|
|||
../validators/validator_duties,
|
||||
../gossip_processing/gossip_validation,
|
||||
../consensus_object_pools/blockchain_dag,
|
||||
../spec/[crypto, datatypes, digest, forkedbeaconstate_helpers, network],
|
||||
../spec/[crypto, datatypes/phase0, digest, forkedbeaconstate_helpers, network],
|
||||
../spec/eth2_apis/callsigs_types,
|
||||
../ssz/merkleization,
|
||||
./rpc_utils, ./eth2_json_rpc_serialization
|
||||
|
@ -399,7 +399,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
|
|||
|
||||
result.canonical = bd.refs.isAncestorOf(node.dag.head)
|
||||
|
||||
rpcServer.rpc("post_v1_beacon_blocks") do (blck: SignedBeaconBlock) -> int:
|
||||
rpcServer.rpc("post_v1_beacon_blocks") do (blck: phase0.SignedBeaconBlock) -> int:
|
||||
if not(node.syncManager.inProgress):
|
||||
raise newException(CatchableError,
|
||||
"Beacon node is currently syncing, try again later.")
|
||||
|
@ -427,7 +427,7 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
|
|||
return 200
|
||||
|
||||
rpcServer.rpc("get_v1_beacon_blocks_blockId") do (
|
||||
blockId: string) -> TrustedSignedBeaconBlock:
|
||||
blockId: string) -> phase0.TrustedSignedBeaconBlock:
|
||||
# TODO detect Altair and fail: /v1/ APIs don't support Altair
|
||||
return node.getBlockDataFromBlockId(blockId).data.phase0Block
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ import
|
|||
../gossip_processing/gossip_validation,
|
||||
../validators/validator_duties,
|
||||
../spec/[crypto, digest, forkedbeaconstate_helpers, network],
|
||||
../spec/datatypes/base,
|
||||
../spec/datatypes/phase0,
|
||||
../ssz/merkleization,
|
||||
./eth2_json_rest_serialization, ./rest_utils
|
||||
|
||||
|
@ -595,7 +595,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
|||
block:
|
||||
if contentBody.isNone():
|
||||
return RestApiResponse.jsonError(Http400, EmptyRequestBodyError)
|
||||
let dres = decodeBody(SignedBeaconBlock, contentBody.get())
|
||||
let dres = decodeBody(phase0.SignedBeaconBlock, contentBody.get())
|
||||
if dres.isErr():
|
||||
return RestApiResponse.jsonError(Http400, InvalidBlockObjectError,
|
||||
$dres.error())
|
||||
|
@ -615,7 +615,8 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
|||
node.network.broadcast(blocksTopic, blck)
|
||||
return RestApiResponse.jsonError(Http202, BlockValidationError)
|
||||
else:
|
||||
let res = await proposeSignedBlock(node, head, AttachedValidator(), blck)
|
||||
let res = await proposeSignedBlock(
|
||||
node, head, AttachedValidator(), blck)
|
||||
if res == head:
|
||||
# TODO altair-transition, but not for immediate testnet-priority
|
||||
let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0)
|
||||
|
@ -636,7 +637,7 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
|
|||
if res.isErr():
|
||||
return RestApiResponse.jsonError(Http404, BlockNotFoundError)
|
||||
res.get()
|
||||
static: doAssert bdata.data.phase0Block is TrustedSignedBeaconBlock
|
||||
static: doAssert bdata.data.phase0Block is phase0.TrustedSignedBeaconBlock
|
||||
return RestApiResponse.jsonResponse(bdata.data.phase0Block)
|
||||
|
||||
# https://ethereum.github.io/eth2.0-APIs/#/Beacon/getBlockRoot
|
||||
|
@ -950,7 +951,7 @@ proc getStateFork*(state_id: StateIdent): RestResponse[DataRestFork] {.
|
|||
meth: MethodGet.}
|
||||
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/getStateFork
|
||||
|
||||
proc publishBlock*(body: SignedBeaconBlock): RestPlainResponse {.
|
||||
proc publishBlock*(body: phase0.SignedBeaconBlock): RestPlainResponse {.
|
||||
rest, endpoint: "/eth/v1/beacon/blocks",
|
||||
meth: MethodPost.}
|
||||
## https://ethereum.github.io/eth2.0-APIs/#/Beacon/publishBlock
|
||||
|
|
|
@ -13,7 +13,7 @@ import
|
|||
chronicles,
|
||||
../version, ../beacon_node_common,
|
||||
../networking/[eth2_network, peer_pool],
|
||||
../spec/datatypes/base,
|
||||
../spec/datatypes/phase0,
|
||||
../spec/[digest, presets],
|
||||
./rpc_utils, ./eth2_json_rpc_serialization
|
||||
|
||||
|
@ -25,7 +25,7 @@ type
|
|||
proc installDebugApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
|
||||
raises: [Exception].} = # TODO fix json-rpc
|
||||
rpcServer.rpc("get_v1_debug_beacon_states_stateId") do (
|
||||
stateId: string) -> BeaconState:
|
||||
stateId: string) -> phase0.BeaconState:
|
||||
withStateForStateId(stateId):
|
||||
return stateData.data.hbsPhase0.data
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ import
|
|||
nimcrypto/utils as ncrutils,
|
||||
../beacon_node_common, ../networking/eth2_network,
|
||||
../consensus_object_pools/[blockchain_dag, exit_pool],
|
||||
../spec/[crypto, digest, datatypes, eth2_apis/callsigs_types],
|
||||
../spec/[crypto, digest, datatypes/phase0, eth2_apis/callsigs_types],
|
||||
../ssz/merkleization,
|
||||
rest_utils
|
||||
export json_serialization
|
||||
|
@ -151,7 +151,7 @@ type
|
|||
DataRestFork* = DataEnclosedObject[Fork]
|
||||
DataRestProposerDuties* = DataRootEnclosedObject[seq[RestProposerDuty]]
|
||||
DataRestAttesterDuties* = DataRootEnclosedObject[seq[RestAttesterDuty]]
|
||||
DataRestBeaconBlock* = DataEnclosedObject[BeaconBlock]
|
||||
DataRestBeaconBlock* = DataEnclosedObject[phase0.BeaconBlock]
|
||||
DataRestAttestationData* = DataEnclosedObject[AttestationData]
|
||||
DataRestAttestation* = DataEnclosedObject[Attestation]
|
||||
DataRestSyncInfo* = DataEnclosedObject[RestSyncInfo]
|
||||
|
@ -160,7 +160,7 @@ type
|
|||
DataRestVersion* = DataEnclosedObject[RestVersion]
|
||||
DataRestConfig* = DataEnclosedObject[RestConfig]
|
||||
|
||||
EncodeTypes* = SignedBeaconBlock
|
||||
EncodeTypes* = phase0.SignedBeaconBlock
|
||||
EncodeArrays* = seq[ValidatorIndex] | seq[Attestation] |
|
||||
seq[SignedAggregateAndProof] | seq[RestCommitteeSubscription]
|
||||
|
||||
|
@ -581,7 +581,7 @@ proc decodeBody*[T](t: typedesc[T],
|
|||
return err("Unexpected deserialization error")
|
||||
ok(data)
|
||||
|
||||
RestJson.useCustomSerialization(BeaconState.justification_bits):
|
||||
RestJson.useCustomSerialization(phase0.BeaconState.justification_bits):
|
||||
read:
|
||||
let s = reader.readValue(string)
|
||||
if s.len != 4:
|
||||
|
|
|
@ -18,7 +18,7 @@ import
|
|||
|
||||
# Local modules
|
||||
../spec/[crypto, digest, forkedbeaconstate_helpers, helpers, network, signatures],
|
||||
../spec/datatypes/base,
|
||||
../spec/datatypes/phase0,
|
||||
../spec/eth2_apis/callsigs_types,
|
||||
../consensus_object_pools/[blockchain_dag, spec_cache, attestation_pool], ../ssz/merkleization,
|
||||
../beacon_node_common, ../beacon_node_types,
|
||||
|
@ -35,7 +35,7 @@ type
|
|||
proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
|
||||
raises: [Exception].} = # TODO fix json-rpc
|
||||
rpcServer.rpc("get_v1_validator_block") do (
|
||||
slot: Slot, graffiti: GraffitiBytes, randao_reveal: ValidatorSig) -> BeaconBlock:
|
||||
slot: Slot, graffiti: GraffitiBytes, randao_reveal: ValidatorSig) -> phase0.BeaconBlock:
|
||||
debug "get_v1_validator_block", slot = slot
|
||||
let head = node.doChecksAndGetCurrentHead(slot)
|
||||
let proposer = node.dag.getProposer(head, slot)
|
||||
|
@ -47,7 +47,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
|
|||
raise newException(CatchableError, "could not retrieve block for slot: " & $slot)
|
||||
return message.get()
|
||||
|
||||
rpcServer.rpc("post_v1_validator_block") do (body: SignedBeaconBlock) -> bool:
|
||||
rpcServer.rpc("post_v1_validator_block") do (body: phase0.SignedBeaconBlock) -> bool:
|
||||
debug "post_v1_validator_block",
|
||||
slot = body.message.slot,
|
||||
prop_idx = body.message.proposer_index
|
||||
|
@ -56,8 +56,7 @@ proc installValidatorApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
|
|||
if head.slot >= body.message.slot:
|
||||
raise newException(CatchableError,
|
||||
"Proposal is for a past slot: " & $body.message.slot)
|
||||
if head == await proposeSignedBlock(
|
||||
node, head, AttachedValidator(), body):
|
||||
if head == await proposeSignedBlock(node, head, AttachedValidator(), body):
|
||||
raise newException(CatchableError, "Could not propose block")
|
||||
return true
|
||||
|
||||
|
|
|
@ -28,10 +28,19 @@ proc dump*(dir: string, v: AttestationData, validator: ValidatorPubKey) =
|
|||
proc dump*(dir: string, v: phase0.TrustedSignedBeaconBlock) =
|
||||
logErrors:
|
||||
SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v)
|
||||
|
||||
proc dump*(dir: string, v: altair.TrustedSignedBeaconBlock) =
|
||||
logErrors:
|
||||
SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v)
|
||||
|
||||
proc dump*(dir: string, v: phase0.SignedBeaconBlock) =
|
||||
logErrors:
|
||||
SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v)
|
||||
|
||||
proc dump*(dir: string, v: altair.SignedBeaconBlock) =
|
||||
logErrors:
|
||||
SSZ.saveFile(dir / &"block-{v.message.slot}-{shortLog(v.root)}.ssz", v)
|
||||
|
||||
proc dump*(dir: string, v: SomeHashedBeaconState, blck: BlockRef) =
|
||||
mixin saveFile
|
||||
logErrors:
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
|
||||
import options, sequtils, strutils
|
||||
import chronos, chronicles
|
||||
import ../spec/[datatypes, digest],
|
||||
import ../spec/[datatypes/phase0, datatypes/altair, digest, forkedbeaconstate_helpers],
|
||||
../networking/eth2_network,
|
||||
../beacon_node_types,
|
||||
../ssz/merkleization,
|
||||
|
@ -49,7 +49,7 @@ proc init*(T: type RequestManager, network: Eth2Node,
|
|||
)
|
||||
|
||||
proc checkResponse(roots: openArray[Eth2Digest],
|
||||
blocks: openArray[SignedBeaconBlock]): bool =
|
||||
blocks: openArray[ForkedSignedBeaconBlock]): bool =
|
||||
## This procedure checks peer's response.
|
||||
var checks = @roots
|
||||
if len(blocks) > len(roots):
|
||||
|
@ -63,7 +63,7 @@ proc checkResponse(roots: openArray[Eth2Digest],
|
|||
return true
|
||||
|
||||
proc validate(rman: RequestManager,
|
||||
b: SignedBeaconBlock): Future[Result[void, BlockError]] =
|
||||
b: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] =
|
||||
let resfut = newFuture[Result[void, BlockError]]("request.manager.validate")
|
||||
rman.blockProcessor[].addBlock(b, resfut)
|
||||
resfut
|
||||
|
@ -76,7 +76,12 @@ proc fetchAncestorBlocksFromNetwork(rman: RequestManager,
|
|||
debug "Requesting blocks by root", peer = peer, blocks = shortLog(items),
|
||||
peer_score = peer.getScore()
|
||||
|
||||
let blocks = await peer.beaconBlocksByRoot(BlockRootsList items)
|
||||
let blocks = if peer.useSyncV2():
|
||||
await peer.beaconBlocksByRoot_v2(BlockRootsList items)
|
||||
else:
|
||||
(await peer.beaconBlocksByRoot(BlockRootsList items)).map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto:
|
||||
blcks.mapIt(ForkedSignedBeaconBlock.init(it))
|
||||
|
||||
if blocks.isOk:
|
||||
let ublocks = blocks.get()
|
||||
if checkResponse(items, ublocks):
|
||||
|
|
|
@ -10,12 +10,12 @@
|
|||
import chronicles
|
||||
import options, deques, heapqueue, tables, strutils, sequtils, math, algorithm
|
||||
import stew/results, chronos, chronicles
|
||||
import ../spec/[datatypes, digest, helpers, eth2_apis/callsigs_types],
|
||||
import ../spec/[datatypes/phase0, datatypes/altair, digest, helpers, eth2_apis/callsigs_types, forkedbeaconstate_helpers],
|
||||
../networking/[peer_pool, eth2_network]
|
||||
|
||||
import ../gossip_processing/block_processor
|
||||
import ../consensus_object_pools/block_pools_types
|
||||
export datatypes, digest, chronos, chronicles, results, block_pools_types,
|
||||
export phase0, altair, digest, chronos, chronicles, results, block_pools_types,
|
||||
helpers
|
||||
|
||||
logScope:
|
||||
|
@ -70,7 +70,7 @@ type
|
|||
|
||||
SyncResult*[T] = object
|
||||
request*: SyncRequest[T]
|
||||
data*: seq[SignedBeaconBlock]
|
||||
data*: seq[ForkedSignedBeaconBlock]
|
||||
|
||||
SyncWaiter*[T] = object
|
||||
future: Future[bool]
|
||||
|
@ -141,16 +141,16 @@ type
|
|||
stamp*: chronos.Moment
|
||||
|
||||
SyncManagerError* = object of CatchableError
|
||||
BeaconBlocksRes* = NetRes[seq[SignedBeaconBlock]]
|
||||
BeaconBlocksRes* = NetRes[seq[ForkedSignedBeaconBlock]]
|
||||
|
||||
proc validate*[T](sq: SyncQueue[T],
|
||||
blk: SignedBeaconBlock): Future[Result[void, BlockError]] =
|
||||
blk: ForkedSignedBeaconBlock): Future[Result[void, BlockError]] =
|
||||
let resfut = newFuture[Result[void, BlockError]]("sync.manager.validate")
|
||||
sq.blockProcessor[].addBlock(blk, resfut)
|
||||
resfut
|
||||
|
||||
proc getShortMap*[T](req: SyncRequest[T],
|
||||
data: openArray[SignedBeaconBlock]): string =
|
||||
data: openArray[ForkedSignedBeaconBlock]): string =
|
||||
## Returns all slot numbers in ``data`` as placement map.
|
||||
var res = newStringOfCap(req.count)
|
||||
var slider = req.slot
|
||||
|
@ -158,11 +158,11 @@ proc getShortMap*[T](req: SyncRequest[T],
|
|||
for i in 0 ..< req.count:
|
||||
if last < len(data):
|
||||
for k in last ..< len(data):
|
||||
if slider == data[k].message.slot:
|
||||
if slider == data[k].slot:
|
||||
res.add('x')
|
||||
last = k + 1
|
||||
break
|
||||
elif slider < data[k].message.slot:
|
||||
elif slider < data[k].slot:
|
||||
res.add('.')
|
||||
break
|
||||
else:
|
||||
|
@ -178,7 +178,7 @@ proc cmp*[T](a, b: SyncRequest[T]): int =
|
|||
result = cmp(uint64(a.slot), uint64(b.slot))
|
||||
|
||||
proc checkResponse*[T](req: SyncRequest[T],
|
||||
data: openArray[SignedBeaconBlock]): bool =
|
||||
data: openArray[ForkedSignedBeaconBlock]): bool =
|
||||
if len(data) == 0:
|
||||
# Impossible to verify empty response.
|
||||
return true
|
||||
|
@ -193,9 +193,9 @@ proc checkResponse*[T](req: SyncRequest[T],
|
|||
var dindex = 0
|
||||
|
||||
while (rindex < req.count) and (dindex < len(data)):
|
||||
if slot < data[dindex].message.slot:
|
||||
if slot < data[dindex].slot:
|
||||
discard
|
||||
elif slot == data[dindex].message.slot:
|
||||
elif slot == data[dindex].slot:
|
||||
inc(dindex)
|
||||
else:
|
||||
return false
|
||||
|
@ -208,7 +208,7 @@ proc checkResponse*[T](req: SyncRequest[T],
|
|||
return false
|
||||
|
||||
proc getFullMap*[T](req: SyncRequest[T],
|
||||
data: openArray[SignedBeaconBlock]): string =
|
||||
data: openArray[ForkedSignedBeaconBlock]): string =
|
||||
# Returns all slot numbers in ``data`` as comma-delimeted string.
|
||||
result = mapIt(data, $it.message.slot).join(", ")
|
||||
|
||||
|
@ -411,7 +411,7 @@ proc hasEndGap*[T](sr: SyncResult[T]): bool {.inline.} =
|
|||
let lastslot = sr.request.slot + sr.request.count - 1'u64
|
||||
if len(sr.data) == 0:
|
||||
return true
|
||||
if sr.data[^1].message.slot != lastslot:
|
||||
if sr.data[^1].slot != lastslot:
|
||||
return true
|
||||
return false
|
||||
|
||||
|
@ -422,7 +422,7 @@ proc getLastNonEmptySlot*[T](sr: SyncResult[T]): Slot {.inline.} =
|
|||
# If response has only empty slots we going to use original request slot
|
||||
sr.request.slot
|
||||
else:
|
||||
sr.data[^1].message.slot
|
||||
sr.data[^1].slot
|
||||
|
||||
proc toDebtsQueue[T](sq: SyncQueue[T], sr: SyncRequest[T]) =
|
||||
sq.debtsQueue.push(sr)
|
||||
|
@ -512,7 +512,7 @@ proc getRewindPoint*[T](sq: SyncQueue[T], failSlot: Slot,
|
|||
compute_start_slot_at_epoch(rewindEpoch)
|
||||
|
||||
proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
||||
data: seq[SignedBeaconBlock]) {.async, gcsafe.} =
|
||||
data: seq[ForkedSignedBeaconBlock]) {.async, gcsafe.} =
|
||||
## Push successfull result to queue ``sq``.
|
||||
mixin updateScore
|
||||
|
||||
|
@ -559,10 +559,10 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T],
|
|||
if len(item.data) > 0:
|
||||
for blk in item.data:
|
||||
trace "Pushing block", block_root = blk.root,
|
||||
block_slot = blk.message.slot
|
||||
block_slot = blk.slot
|
||||
res = await sq.validate(blk)
|
||||
if not(res.isOk):
|
||||
failSlot = some(blk.message.slot)
|
||||
failSlot = some(blk.slot)
|
||||
break
|
||||
else:
|
||||
res = Result[void, BlockError].ok()
|
||||
|
@ -757,20 +757,36 @@ proc getBlocks*[A, B](man: SyncManager[A, B], peer: A,
|
|||
slot = req.slot, slot_count = req.count, step = req.step,
|
||||
peer_score = peer.getScore(), peer_speed = peer.netKbps(),
|
||||
topics = "syncman"
|
||||
var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step)
|
||||
if workFut.failed():
|
||||
debug "Error, while waiting getBlocks response", peer = peer,
|
||||
slot = req.slot, slot_count = req.count, step = req.step,
|
||||
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
|
||||
topics = "syncman"
|
||||
if peer.useSyncV2():
|
||||
var workFut = awaitne beaconBlocksByRange_v2(peer, req.slot, req.count, req.step)
|
||||
if workFut.failed():
|
||||
debug "Error, while waiting getBlocks response", peer = peer,
|
||||
slot = req.slot, slot_count = req.count, step = req.step,
|
||||
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
|
||||
topics = "syncman"
|
||||
else:
|
||||
let res = workFut.read()
|
||||
if res.isErr:
|
||||
debug "Error, while reading getBlocks response",
|
||||
peer = peer, slot = req.slot, count = req.count,
|
||||
step = req.step, peer_speed = peer.netKbps(),
|
||||
topics = "syncman", error = $res.error()
|
||||
result = res
|
||||
else:
|
||||
let res = workFut.read()
|
||||
if res.isErr:
|
||||
debug "Error, while reading getBlocks response",
|
||||
peer = peer, slot = req.slot, count = req.count,
|
||||
step = req.step, peer_speed = peer.netKbps(),
|
||||
topics = "syncman", error = $res.error()
|
||||
result = res
|
||||
var workFut = awaitne beaconBlocksByRange(peer, req.slot, req.count, req.step)
|
||||
if workFut.failed():
|
||||
debug "Error, while waiting getBlocks response", peer = peer,
|
||||
slot = req.slot, slot_count = req.count, step = req.step,
|
||||
errMsg = workFut.readError().msg, peer_speed = peer.netKbps(),
|
||||
topics = "syncman"
|
||||
else:
|
||||
let res = workFut.read()
|
||||
if res.isErr:
|
||||
debug "Error, while reading getBlocks response",
|
||||
peer = peer, slot = req.slot, count = req.count,
|
||||
step = req.step, peer_speed = peer.netKbps(),
|
||||
topics = "syncman", error = $res.error()
|
||||
result = res.map() do (blcks: seq[phase0.SignedBeaconBlock]) -> auto: blcks.mapIt(ForkedSignedBeaconBlock.init(it))
|
||||
|
||||
template headAge(): uint64 =
|
||||
wallSlot - headSlot
|
||||
|
|
|
@ -230,8 +230,17 @@ p2pProtocol BeaconSync(version = 1,
|
|||
trace "wrote response block",
|
||||
slot = blocks[i].slot, roor = shortLog(blocks[i].root)
|
||||
let blk = dag.get(blocks[i]).data
|
||||
# TODO Altair
|
||||
await response.write(blk.phase0Block.asSigned)
|
||||
case blk.kind
|
||||
of BeaconBlockFork.Phase0:
|
||||
await response.write(blk.phase0Block.asSigned)
|
||||
of BeaconBlockFork.Altair:
|
||||
# Skipping all subsequent blocks should be OK because the spec says:
|
||||
# "Clients MAY limit the number of blocks in the response."
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyrange
|
||||
#
|
||||
# Also, our response would be indistinguishable from a node
|
||||
# that have been synced exactly to the altair transition slot.
|
||||
break
|
||||
|
||||
debug "Block range request done",
|
||||
peer, startSlot, count, reqStep, found = count - startIndex
|
||||
|
@ -260,9 +269,18 @@ p2pProtocol BeaconSync(version = 1,
|
|||
let blockRef = dag.getRef(blockRoots[i])
|
||||
if not isNil(blockRef):
|
||||
let blk = dag.get(blockRef).data
|
||||
# TODO Altair
|
||||
await response.write(blk.phase0Block.asSigned)
|
||||
inc found
|
||||
case blk.kind
|
||||
of BeaconBlockFork.Phase0:
|
||||
await response.write(blk.phase0Block.asSigned)
|
||||
inc found
|
||||
of BeaconBlockFork.Altair:
|
||||
# Skipping this block should be fine because the spec says:
|
||||
# "Clients MAY limit the number of blocks in the response."
|
||||
# https://github.com/ethereum/eth2.0-specs/blob/dev/specs/phase0/p2p-interface.md#beaconblocksbyroot
|
||||
#
|
||||
# Also, our response would be indistinguishable from a node
|
||||
# that have been synced exactly to the altair transition slot.
|
||||
continue
|
||||
|
||||
peer.updateRequestQuota(found.float * blockResponseCost)
|
||||
|
||||
|
@ -343,6 +361,16 @@ p2pProtocol BeaconSync(version = 1,
|
|||
{.async, libp2pProtocol("goodbye", 1).} =
|
||||
debug "Received Goodbye message", reason = disconnectReasonName(reason), peer
|
||||
|
||||
proc useSyncV2*(state: BeaconSyncNetworkState): bool =
|
||||
let
|
||||
wallTime = state.getTime()
|
||||
wallTimeSlot = state.dag.beaconClock.toBeaconTime(wallTime).slotOrZero
|
||||
|
||||
wallTimeSlot.epoch >= state.dag.cfg.ALTAIR_FORK_EPOCH
|
||||
|
||||
proc useSyncV2*(peer: Peer): bool =
|
||||
peer.networkState(BeaconSync).useSyncV2()
|
||||
|
||||
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) =
|
||||
debug "Peer status", peer, statusMsg
|
||||
peer.state(BeaconSync).statusMsg = statusMsg
|
||||
|
|
|
@ -21,8 +21,8 @@ import
|
|||
|
||||
# Local modules
|
||||
../spec/[
|
||||
datatypes, digest, crypto, forkedbeaconstate_helpers, helpers, network,
|
||||
signatures, state_transition],
|
||||
datatypes/phase0, datatypes/altair, digest, crypto,
|
||||
forkedbeaconstate_helpers, helpers, network, signatures, state_transition],
|
||||
../conf, ../beacon_clock,
|
||||
../consensus_object_pools/[
|
||||
spec_cache, blockchain_dag, block_clearance,
|
||||
|
@ -298,7 +298,7 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
|
|||
validator_index: ValidatorIndex,
|
||||
graffiti: GraffitiBytes,
|
||||
head: BlockRef,
|
||||
slot: Slot): Future[Option[BeaconBlock]] {.async.} =
|
||||
slot: Slot): Future[Option[phase0.BeaconBlock]] {.async.} =
|
||||
# Advance state to the slot that we're proposing for
|
||||
|
||||
let
|
||||
|
@ -312,9 +312,9 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
|
|||
|
||||
if eth1Proposal.hasMissingDeposits:
|
||||
error "Eth1 deposits not available. Skipping block proposal", slot
|
||||
return none(BeaconBlock)
|
||||
return none(phase0.BeaconBlock)
|
||||
|
||||
func restore(v: var HashedBeaconState) =
|
||||
func restore(v: var phase0.HashedBeaconState) =
|
||||
# TODO address this ugly workaround - there should probably be a
|
||||
# `state_transition` that takes a `StateData` instead and updates
|
||||
# the block as well
|
||||
|
@ -342,10 +342,10 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
|
|||
proc proposeSignedBlock*(node: BeaconNode,
|
||||
head: BlockRef,
|
||||
validator: AttachedValidator,
|
||||
newBlock: SignedBeaconBlock):
|
||||
newBlock: phase0.SignedBeaconBlock):
|
||||
Future[BlockRef] {.async.} =
|
||||
let newBlockRef = node.dag.addRawBlock(node.quarantine, newBlock) do (
|
||||
blckRef: BlockRef, trustedBlock: TrustedSignedBeaconBlock,
|
||||
blckRef: BlockRef, trustedBlock: phase0.TrustedSignedBeaconBlock,
|
||||
epochRef: EpochRef):
|
||||
# Callback add to fork choice if signed block valid (and becomes trusted)
|
||||
node.attestationPool[].addForkChoice(
|
||||
|
@ -401,7 +401,7 @@ proc proposeBlock(node: BeaconNode,
|
|||
return head # already logged elsewhere!
|
||||
|
||||
var
|
||||
newBlock = SignedBeaconBlock(
|
||||
newBlock = phase0.SignedBeaconBlock(
|
||||
message: message.get()
|
||||
)
|
||||
|
||||
|
|
|
@ -4,7 +4,9 @@ import std/strutils
|
|||
import unittest2
|
||||
import chronos
|
||||
import ../beacon_chain/gossip_processing/block_processor,
|
||||
../beacon_chain/sync/sync_manager
|
||||
../beacon_chain/sync/sync_manager,
|
||||
../beacon_chain/spec/datatypes/phase0,
|
||||
../beacon_chain/spec/forkedbeaconstate_helpers
|
||||
|
||||
type
|
||||
SomeTPeer = ref object
|
||||
|
@ -26,13 +28,13 @@ proc newBlockProcessor(): ref BlockProcessor =
|
|||
)
|
||||
|
||||
suite "SyncManager test suite":
|
||||
proc createChain(start, finish: Slot): seq[SignedBeaconBlock] =
|
||||
proc createChain(start, finish: Slot): seq[ForkedSignedBeaconBlock] =
|
||||
doAssert(start <= finish)
|
||||
let count = int(finish - start + 1'u64)
|
||||
result = newSeq[SignedBeaconBlock](count)
|
||||
result = newSeq[ForkedSignedBeaconBlock](count)
|
||||
var curslot = start
|
||||
for item in result.mitems():
|
||||
item.message.slot = curslot
|
||||
item.phase0Block.message.slot = curslot
|
||||
curslot = curslot + 1'u64
|
||||
|
||||
test "[SyncQueue] Start and finish slots equal":
|
||||
|
@ -220,7 +222,7 @@ suite "SyncManager test suite":
|
|||
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
|
||||
while true:
|
||||
let sblock = await aq.popFirst()
|
||||
if sblock.blck.message.slot == Slot(counter):
|
||||
if sblock.blck.slot == Slot(counter):
|
||||
inc(counter)
|
||||
sblock.done()
|
||||
else:
|
||||
|
@ -269,7 +271,7 @@ suite "SyncManager test suite":
|
|||
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
|
||||
while true:
|
||||
let sblock = await aq.popFirst()
|
||||
if sblock.blck.message.slot == Slot(counter):
|
||||
if sblock.blck.slot == Slot(counter):
|
||||
inc(counter)
|
||||
sblock.done()
|
||||
else:
|
||||
|
@ -324,7 +326,7 @@ suite "SyncManager test suite":
|
|||
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
|
||||
while true:
|
||||
let sblock = await aq.popFirst()
|
||||
if sblock.blck.message.slot == Slot(counter):
|
||||
if sblock.blck.slot == Slot(counter):
|
||||
inc(counter)
|
||||
sblock.done()
|
||||
else:
|
||||
|
@ -395,7 +397,7 @@ suite "SyncManager test suite":
|
|||
|
||||
test "[SyncQueue] hasEndGap() test":
|
||||
let chain1 = createChain(Slot(1), Slot(1))
|
||||
let chain2 = newSeq[SignedBeaconBlock]()
|
||||
let chain2 = newSeq[ForkedSignedBeaconBlock]()
|
||||
|
||||
for counter in countdown(32'u64, 2'u64):
|
||||
let req = SyncRequest[SomeTPeer](slot: Slot(1), count: counter,
|
||||
|
@ -412,7 +414,7 @@ suite "SyncManager test suite":
|
|||
|
||||
test "[SyncQueue] getLastNonEmptySlot() test":
|
||||
let chain1 = createChain(Slot(10), Slot(10))
|
||||
let chain2 = newSeq[SignedBeaconBlock]()
|
||||
let chain2 = newSeq[ForkedSignedBeaconBlock]()
|
||||
|
||||
for counter in countdown(32'u64, 2'u64):
|
||||
let req = SyncRequest[SomeTPeer](slot: Slot(10), count: counter,
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 3da656687be63ccbf5d659af55d159130d325038
|
||||
Subproject commit ac7f2033a0d5e51d5f5a70b5485de79f90c20b39
|
Loading…
Reference in New Issue