diff --git a/beacon_chain/beacon_clock.nim b/beacon_chain/beacon_clock.nim index 047051288..00759145b 100644 --- a/beacon_chain/beacon_clock.nim +++ b/beacon_chain/beacon_clock.nim @@ -33,7 +33,7 @@ type BeaconTime* = distinct Duration ## Nanoseconds from beacon genesis time - GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].} + GetTimeFn* = proc(): Time {.gcsafe, raises: [Defect].} proc init*(T: type BeaconClock, genesis_time: uint64): T = # ~290 billion years into the future diff --git a/beacon_chain/beacon_node_common.nim b/beacon_chain/beacon_node_common.nim index 91e8a3439..7c7dc4e62 100644 --- a/beacon_chain/beacon_node_common.nim +++ b/beacon_chain/beacon_node_common.nim @@ -45,11 +45,9 @@ type attestationPool*: ref AttestationPool exitPool*: ref ExitPool eth1Monitor*: Eth1Monitor - beaconClock*: BeaconClock rpcServer*: RpcServer restServer*: RestServerRef vcProcess*: Process - forkDigest*: ForkDigest requestManager*: RequestManager syncManager*: SyncManager[Peer, PeerID] topicBeaconBlocks*: string @@ -73,6 +71,9 @@ template findIt*(s: openArray, predicate: untyped): int = break res +template beaconClock*(node: BeaconNode): BeaconClock = + node.dag.beaconClock + proc currentSlot*(node: BeaconNode): Slot = node.beaconClock.now.slotOrZero diff --git a/beacon_chain/consensus_object_pools/block_clearance.nim b/beacon_chain/consensus_object_pools/block_clearance.nim index e9995437b..9a47b7ddf 100644 --- a/beacon_chain/consensus_object_pools/block_clearance.nim +++ b/beacon_chain/consensus_object_pools/block_clearance.nim @@ -43,25 +43,25 @@ template asSigVerified(x: phase0.SignedBeaconBlock): phase0.SigVerifiedSignedBeaconBlock = ## This converts a signed beacon block to a sig verified beacon clock. ## This assumes that their bytes representation is the same. - isomorphicCast[phase0.SigVerifiedSignedBeaconBlock](signedBlock) + isomorphicCast[phase0.SigVerifiedSignedBeaconBlock](x) template asSigVerified(x: altair.SignedBeaconBlock): altair.SigVerifiedSignedBeaconBlock = ## This converts a signed beacon block to a sig verified beacon clock. ## This assumes that their bytes representation is the same. - isomorphicCast[altair.SigVerifiedSignedBeaconBlock](signedBlock) + isomorphicCast[altair.SigVerifiedSignedBeaconBlock](x) template asTrusted(x: phase0.SignedBeaconBlock or phase0.SigVerifiedBeaconBlock): phase0.TrustedSignedBeaconBlock = ## This converts a sigverified beacon block to a trusted beacon clock. ## This assumes that their bytes representation is the same. - isomorphicCast[phase0.TrustedSignedBeaconBlock](signedBlock) + isomorphicCast[phase0.TrustedSignedBeaconBlock](x) template asTrusted(x: altair.SignedBeaconBlock or altair.SigVerifiedBeaconBlock): altair.TrustedSignedBeaconBlock = ## This converts a sigverified beacon block to a trusted beacon clock. ## This assumes that their bytes representation is the same. - isomorphicCast[altair.TrustedSignedBeaconBlock](signedBlock) + isomorphicCast[altair.TrustedSignedBeaconBlock](x) func batchVerify(quarantine: QuarantineRef, sigs: openArray[SignatureSet]): bool = var secureRandomBytes: array[32, byte] @@ -213,7 +213,7 @@ proc checkStateTransition( return (ValidationResult.Reject, Invalid) return (ValidationResult.Accept, default(BlockError)) -proc advanceClearanceState*(dag: ChainDagRef) = +proc advanceClearanceState*(dag: ChainDAGRef) = # When the chain is synced, the most likely block to be produced is the block # right after head - we can exploit this assumption and advance the state # to that slot before the block arrives, thus allowing us to do the expensive diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index c9382c85b..92140b75a 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -16,7 +16,7 @@ import # Internals ../spec/[crypto, digest, signatures_batch, forkedbeaconstate_helpers], ../spec/datatypes/[phase0, altair], - ../beacon_chain_db, ../extras + ".."/[beacon_chain_db, beacon_clock, extras] export sets, tables @@ -114,6 +114,8 @@ type db*: BeaconChainDB ##\ ## ColdDB - Stores the canonical chain + beaconClock*: BeaconClock + # ----------------------------------- # ChainDAGRef - DAG of candidate chains @@ -165,6 +167,12 @@ type ## block - we limit the number of held EpochRefs to put a cap on ## memory usage + forkDigests*: ForkDigestsRef + ## Cached copy of the fork digests associated with the current + ## database. We use a ref type to facilitate sharing this small + ## value with other components which don't have access to the + ## full ChainDAG. + altairTransitionSlot*: Slot ##\ ## Slot at which to upgrade from phase 0 to Altair forks @@ -236,7 +244,7 @@ type blck: altair.TrustedSignedBeaconBlock, epochRef: EpochRef) {.gcsafe, raises: [Defect].} -template head*(dag: ChainDagRef): BlockRef = dag.headState.blck +template head*(dag: ChainDAGRef): BlockRef = dag.headState.blck template epoch*(e: EpochRef): Epoch = e.key.epoch diff --git a/beacon_chain/consensus_object_pools/blockchain_dag.nim b/beacon_chain/consensus_object_pools/blockchain_dag.nim index 7cdcfe8c2..e78732fea 100644 --- a/beacon_chain/consensus_object_pools/blockchain_dag.nim +++ b/beacon_chain/consensus_object_pools/blockchain_dag.nim @@ -410,6 +410,11 @@ proc init*(T: type ChainDAGRef, tail: tailRef, genesis: genesisRef, db: db, + beaconClock: BeaconClock.init( + getStateField(tmpState.data, genesis_time)), + forkDigests: newClone ForkDigests.init( + preset, + getStateField(tmpState.data, genesis_validators_root)), heads: @[headRef], headState: tmpState[], epochRefState: tmpState[], @@ -540,6 +545,12 @@ func stateCheckpoint*(bs: BlockSlot): BlockSlot = bs = bs.parentOrSlot bs +proc forkDigestAtSlot*(dag: ChainDAGRef, slot: Slot): ForkDigest = + if slot < dag.altairTransitionSlot: + dag.forkDigests.phase0 + else: + dag.forkDigests.altair + proc getState(dag: ChainDAGRef, state: var StateData, bs: BlockSlot): bool = ## Load a state from the database given a block and a slot - this will first ## lookup the state root in the state root table then load the corresponding @@ -657,6 +668,20 @@ proc get*(dag: ChainDAGRef, blck: BlockRef): BlockData = BlockData(data: data.get(), refs: blck) +proc getForkedBlock*(dag: ChainDAGRef, blck: BlockRef): ForkedTrustedSignedBeaconBlock = + # TODO implement this properly + let phase0Block = dag.db.getBlock(blck.root) + if phase0Block.isOk: + return ForkedTrustedSignedBeaconBlock(kind: BeaconBlockFork.Phase0, + phase0Block: phase0Block.get) + + let altairBlock = dag.db.getAltairBlock(blck.root) + if altairBlock.isOk: + return ForkedTrustedSignedBeaconBlock(kind: BeaconBlockFork.Altair, + altairBlock: altairBlock.get) + + raiseAssert "BlockRef without backing data, database corrupt?" + proc get*(dag: ChainDAGRef, root: Eth2Digest): Option[BlockData] = ## Retrieve a resolved block reference and its associated body, if available let refs = dag.getRef(root) diff --git a/beacon_chain/gossip_processing/block_processor.nim b/beacon_chain/gossip_processing/block_processor.nim index c0fde9b0e..9c86bd226 100644 --- a/beacon_chain/gossip_processing/block_processor.nim +++ b/beacon_chain/gossip_processing/block_processor.nim @@ -52,10 +52,6 @@ type dumpDirInvalid: string dumpDirIncoming: string - # Clock - # ---------------------------------------------------------------- - getWallTime: GetWallTimeFn - # Producers # ---------------------------------------------------------------- blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager" @@ -64,6 +60,7 @@ type # ---------------------------------------------------------------- consensusManager: ref ConsensusManager ## Blockchain DAG, AttestationPool and Quarantine + getTime: GetTimeFn # Initialization # ------------------------------------------------------------------------------ @@ -72,17 +69,17 @@ proc new*(T: type BlockProcessor, dumpEnabled: bool, dumpDirInvalid, dumpDirIncoming: string, consensusManager: ref ConsensusManager, - getWallTime: GetWallTimeFn): ref BlockProcessor = + getTime: GetTimeFn): ref BlockProcessor = (ref BlockProcessor)( dumpEnabled: dumpEnabled, dumpDirInvalid: dumpDirInvalid, dumpDirIncoming: dumpDirIncoming, - - getWallTime: getWallTime, - blocksQueue: newAsyncQueue[BlockEntry](), consensusManager: consensusManager, - ) + getTime: getTime) + +proc getCurrentBeaconTime*(self: BlockProcessor): BeaconTime = + self.consensusManager.dag.beaconClock.toBeaconTime(self.getTime()) # Sync callbacks # ------------------------------------------------------------------------------ @@ -175,7 +172,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) = blockRoot = shortLog(entry.blck.root) let - wallTime = self.getWallTime() + wallTime = self.getCurrentBeaconTime() (afterGenesis, wallSlot) = wallTime.toSlot() if not afterGenesis: diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 9427cc49f..59ee57122 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -60,7 +60,6 @@ type Eth2Processor* = object doppelGangerDetectionEnabled*: bool - getWallTime*: GetWallTimeFn # Local sources of truth for validation # ---------------------------------------------------------------- @@ -86,6 +85,9 @@ type # ---------------------------------------------------------------- quarantine*: QuarantineRef + # Application-provided current time provider (to facilitate testing) + getTime*: GetTimeFn + # Initialization # ------------------------------------------------------------------------------ @@ -98,18 +100,18 @@ proc new*(T: type Eth2Processor, validatorPool: ref ValidatorPool, quarantine: QuarantineRef, rng: ref BrHmacDrbgContext, - getWallTime: GetWallTimeFn): ref Eth2Processor = + getTime: GetTimeFn): ref Eth2Processor = (ref Eth2Processor)( doppelGangerDetectionEnabled: doppelGangerDetectionEnabled, doppelgangerDetection: DoppelgangerProtection( - nodeLaunchSlot: getWallTime().slotOrZero), - getWallTime: getWallTime, + nodeLaunchSlot: dag.beaconClock.now.slotOrZero), blockProcessor: blockProcessor, dag: dag, attestationPool: attestationPool, exitPool: exitPool, validatorPool: validatorPool, quarantine: quarantine, + getTime: getTime, batchCrypto: BatchCrypto.new( rng = rng, # Only run eager attestation signature verification if we're not @@ -117,6 +119,9 @@ proc new*(T: type Eth2Processor, eager = proc(): bool = not blockProcessor[].hasBlocks()) ) +proc getCurrentBeaconTime*(self: Eth2Processor|ref Eth2Processor): BeaconTime = + self.dag.beaconClock.toBeaconTime(self.getTime()) + # Gossip Management # ----------------------------------------------------------------------------------- @@ -128,7 +133,7 @@ proc blockValidator*( blockRoot = shortLog(signedBlock.root) let - wallTime = self.getWallTime() + wallTime = self.getCurrentBeaconTime() (afterGenesis, wallSlot) = wallTime.toSlot() if not afterGenesis: @@ -167,7 +172,7 @@ proc blockValidator*( # propagation of seemingly good blocks trace "Block validated" self.blockProcessor[].addBlock( - signedBlock, validationDur = self.getWallTime() - wallTime) + signedBlock, validationDur = self.getCurrentBeaconTime() - wallTime) ValidationResult.Accept @@ -207,7 +212,7 @@ proc attestationValidator*( attestation = shortLog(attestation) subnet_id - let wallTime = self.getWallTime() + let wallTime = self.getCurrentBeaconTime() var (afterGenesis, wallSlot) = wallTime.toSlot() if not afterGenesis: @@ -228,7 +233,7 @@ proc attestationValidator*( return v.error[0] # Due to async validation the wallSlot here might have changed - (afterGenesis, wallSlot) = self.getWallTime().toSlot() + (afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot() beacon_attestations_received.inc() beacon_attestation_delay.observe(delay.toFloatSeconds()) @@ -250,7 +255,7 @@ proc aggregateValidator*( aggregate = shortLog(signedAggregateAndProof.message.aggregate) signature = shortLog(signedAggregateAndProof.signature) - let wallTime = self.getWallTime() + let wallTime = self.getCurrentBeaconTime() var (afterGenesis, wallSlot) = wallTime.toSlot() if not afterGenesis: @@ -276,7 +281,7 @@ proc aggregateValidator*( return v.error[0] # Due to async validation the wallSlot here might have changed - (afterGenesis, wallSlot) = self.getWallTime().toSlot() + (afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot() beacon_aggregates_received.inc() beacon_aggregate_delay.observe(delay.toFloatSeconds()) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 8b6490dc7..6984b0c7e 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -36,10 +36,12 @@ import version, conf, ssz/ssz_serialization, beacon_clock], ../spec/datatypes/base, - ../spec/[digest, network], + ../spec/[digest, network, helpers, forkedbeaconstate_helpers], ../validators/keystore_management, ./eth2_discovery, ./peer_pool, ./libp2p_json_serialization +from ../spec/datatypes/altair import nil + when chronicles.enabledLogLevel == LogLevel.TRACE: import std/sequtils @@ -75,7 +77,7 @@ type peerPool*: PeerPool[Peer, PeerID] protocolStates*: seq[RootRef] libp2pTransportLoops*: seq[Future[void]] - metadata*: MetaData + metadata*: altair.MetaData connectTimeout*: chronos.Duration seenThreshold*: chronos.Duration connQueue: AsyncQueue[PeerAddr] @@ -83,6 +85,7 @@ type connWorkers: seq[Future[void]] connTable: HashSet[PeerID] forkId: ENRForkID + forkDigests*: ForkDigestsRef rng*: ref BrHmacDrbgContext peers*: Table[PeerID, Peer] validTopics: HashSet[string] @@ -119,7 +122,7 @@ type Disconnecting, Disconnected - UntypedResponse = ref object + UntypedResponse* = ref object peer*: Peer stream*: Connection writtenChunks*: int @@ -194,6 +197,7 @@ type ReadResponseTimeout ZeroSizePrefix SizePrefixOverflow + InvalidContextBytes Eth2NetworkingError = object case kind*: Eth2NetworkingErrorKind @@ -208,6 +212,11 @@ type NetRes*[T] = Result[T, Eth2NetworkingError] ## This is type returned from all network requests +func phase0metadata*(node: Eth2Node): MetaData = + MetaData( + seq_number: node.metadata.seq_number, + attnets: node.metadata.attnets) + const clientId* = "Nimbus beacon node " & fullVersionStr nodeMetadataFilename = "node-metadata.json" @@ -243,7 +252,7 @@ const SeenTableTimeReconnect = 1.minutes ## Minimal time between disconnection and reconnection attempt -template neterr(kindParam: Eth2NetworkingErrorKind): auto = +template neterr*(kindParam: Eth2NetworkingErrorKind): auto = err(type(result), Eth2NetworkingError(kind: kindParam)) # Metrics for tracking attestation and beacon block loss @@ -487,14 +496,19 @@ proc getRequestProtoName(fn: NimNode): NimNode = proc writeChunk*(conn: Connection, responseCode: Option[ResponseCode], - payload: Bytes): Future[void] = + payload: Bytes, + contextBytes: openarray[byte] = []): Future[void] = var output = memoryOutput() try: if responseCode.isSome: output.write byte(responseCode.get) + if contextBytes.len > 0: + output.write contextBytes + output.write toBytes(payload.lenu64, Leb128).toOpenArray() + framingFormatCompress(output, payload) except IOError as exc: raiseAssert exc.msg # memoryOutput shouldn't raise @@ -545,7 +559,7 @@ proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes): Future[v inc response.writtenChunks response.stream.writeChunk(some Success, payload) -proc sendResponseChunkObj(response: UntypedResponse, val: auto): Future[void] = +proc sendResponseChunk*(response: UntypedResponse, val: auto): Future[void] = inc response.writtenChunks response.stream.writeChunk(some Success, SSZ.encode(val)) @@ -593,12 +607,14 @@ proc init*[MsgType](T: type SingleChunkResponse[MsgType], peer: Peer, conn: Connection): T = T(UntypedResponse(peer: peer, stream: conn)) -template write*[M](r: MultipleChunksResponse[M], val: auto): untyped = - sendResponseChunkObj(UntypedResponse(r), val) +template write*[M](r: MultipleChunksResponse[M], val: M): untyped = + mixin sendResponseChunk + sendResponseChunk(UntypedResponse(r), val) -template send*[M](r: SingleChunkResponse[M], val: auto): untyped = +template send*[M](r: SingleChunkResponse[M], val: M): untyped = + mixin sendResponseChunk doAssert UntypedResponse(r).writtenChunks == 0 - sendResponseChunkObj(UntypedResponse(r), val) + sendResponseChunk(UntypedResponse(r), val) proc performProtocolHandshakes*(peer: Peer, incoming: bool) {.async.} = # Loop down serially because it's easier to reason about the connection state @@ -723,6 +739,9 @@ proc handleIncomingStream(network: Eth2Node, of UnexpectedEOF, PotentiallyExpectedEOF: (InvalidRequest, errorMsgLit "Incomplete request") + of InvalidContextBytes: + (ServerError, errorMsgLit "Unrecognized context bytes") + of InvalidSnappyBytes: (InvalidRequest, errorMsgLit "Failed to decompress snappy payload") @@ -960,10 +979,11 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} = # when no peers are in the routing table. Don't run it in continuous loop. await sleepAsync(1.seconds) -proc getPersistentNetMetadata*(config: BeaconNodeConf): MetaData {.raises: [Defect, IOError, SerializationError].} = +proc getPersistentNetMetadata*(config: BeaconNodeConf): altair.MetaData + {.raises: [Defect, IOError, SerializationError].} = let metadataPath = config.dataDir / nodeMetadataFilename if not fileExists(metadataPath): - result = MetaData() + result = altair.MetaData() for i in 0 ..< ATTESTATION_SUBNET_COUNT: # TODO: # Persistent (stability) subnets should be stored with their expiration @@ -971,7 +991,7 @@ proc getPersistentNetMetadata*(config: BeaconNodeConf): MetaData {.raises: [Defe result.attnets[i] = false Json.saveFile(metadataPath, result) else: - result = Json.loadFile(metadataPath, MetaData) + result = Json.loadFile(metadataPath, altair.MetaData) proc resolvePeer(peer: Peer) = # Resolve task which performs searching of peer's public key and recovery of @@ -1105,7 +1125,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} = peer = peerId, peer_state = peer.connectionState peer.connectionState = Disconnected -proc new*(T: type Eth2Node, config: BeaconNodeConf, enrForkId: ENRForkID, +proc new*(T: type Eth2Node, config: BeaconNodeConf, + enrForkId: ENRForkID, forkDigests: ForkDigestsRef, switch: Switch, pubsub: GossipSub, ip: Option[ValidIpAddress], tcpPort, udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool, rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} = @@ -1132,6 +1153,7 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, enrForkId: ENRForkID, # the previous netkey. metadata: metadata, forkId: enrForkId, + forkDigests: forkDigests, discovery: Eth2DiscoveryProtocol.new( config, ip, tcpPort, udpPort, privKey, {"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(metadata.attnets)}, @@ -1518,29 +1540,24 @@ func gossipId(data: openArray[byte], topic: string, valid: bool): seq[byte] = return messageDigest.data[0..19].toSeq() -func isAltairTopic(topic: string): bool = - let splitted = topic.split('/') - if splitted.len < 3: - false - #TODO i'm not sure how to get the altair fork digest - else: - splitted[1] == "ALTAIRFORKDIGEST" +func isAltairTopic(topic: string, altairPrefix: string): bool = + const prefixLen = "/eth2/".len -func getAltairTopic(m: messages.Message): string = + if topic.len <= altairPrefix.len + prefixLen: + false + else: + for ind, ch in altairPrefix: + if ch != topic[ind + prefixLen]: return false + true + +func getAltairTopic(m: messages.Message, altairPrefix: string): string = + # TODO Return a lent string here to avoid the string copy let topic = if m.topicIDs.len > 0: m.topicIDs[0] else: "" - if isAltairTopic(topic): + if isAltairTopic(topic, altairPrefix): topic else: "" -func msgIdProvider(m: messages.Message): seq[byte] = - let topic = getAltairTopic(m) - try: - let decoded = snappy.decode(m.data, GOSSIP_MAX_SIZE) - gossipId(decoded, topic, true) - except CatchableError: - gossipId(m.data, topic, false) - proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey, address: MultiAddress, rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} = @@ -1564,12 +1581,24 @@ proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey, proc createEth2Node*(rng: ref BrHmacDrbgContext, config: BeaconNodeConf, netKeys: NetKeyPair, - enrForkId: ENRForkID): Eth2Node {.raises: [Defect, CatchableError].} = + runtimePreset: RuntimePreset, + forkDigests: ForkDigestsRef, + genesisValidatorsRoot: Eth2Digest): Eth2Node + {.raises: [Defect, CatchableError].} = var + enrForkId = getENRForkID( + # TODO altair-transition + # This function should gain an extra argument specifying + # whether the client head state is already past the Altair + # migration point. + runtimePreset.GENESIS_FORK_VERSION, + genesisValidatorsRoot) + (extIp, extTcpPort, extUdpPort) = try: setupAddress( config.nat, config.listenAddress, config.tcpPort, config.udpPort, clientId) except CatchableError as exc: raise exc except Exception as exc: raiseAssert exc.msg + hostAddress = tcpEndPoint(config.listenAddress, config.tcpPort) announcedAddresses = if extIp.isNone() or extTcpPort.isNone(): @[] else: @[tcpEndPoint(extIp.get(), extTcpPort.get())] @@ -1583,6 +1612,14 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, # are running behind a NAT). var switch = newBeaconSwitch(config, netKeys.seckey, hostAddress, rng) + func msgIdProvider(m: messages.Message): seq[byte] = + let topic = getAltairTopic(m, forkDigests.altairTopicPrefix) + try: + let decoded = snappy.decode(m.data, GOSSIP_MAX_SIZE) + gossipId(decoded, topic, true) + except CatchableError: + gossipId(m.data, topic, false) + let params = GossipSubParams( explicit: true, @@ -1638,7 +1675,9 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext, except Exception as exc: raiseAssert exc.msg # TODO fix libp2p switch.mount(pubsub) - Eth2Node.new(config, enrForkId, switch, pubsub, + Eth2Node.new(config, enrForkId, + forkDigests, + switch, pubsub, extIp, extTcpPort, extUdpPort, netKeys.seckey.asEthKey, discovery = config.discv5Enabled, diff --git a/beacon_chain/networking/libp2p_streams_backend.nim b/beacon_chain/networking/libp2p_streams_backend.nim index 644d62c48..40644ad20 100644 --- a/beacon_chain/networking/libp2p_streams_backend.nim +++ b/beacon_chain/networking/libp2p_streams_backend.nim @@ -93,8 +93,8 @@ proc uncompressFramedStream*(conn: Connection, return ok output -proc readChunkPayload(conn: Connection, peer: Peer, - MsgType: type): Future[NetRes[MsgType]] {.async.} = +proc readChunkPayload*(conn: Connection, peer: Peer, + MsgType: type): Future[NetRes[MsgType]] {.async.} = let sm = now(chronos.Moment) let size = try: await conn.readVarint() @@ -126,6 +126,8 @@ proc readChunkPayload(conn: Connection, peer: Peer, proc readResponseChunk(conn: Connection, peer: Peer, MsgType: typedesc): Future[NetRes[MsgType]] {.async.} = + mixin readChunkPayload + try: var responseCodeByte: byte try: diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index 70e7d44e6..a55eeeeca 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -245,8 +245,6 @@ proc init*(T: type BeaconNode, chainDagFlags = if config.verifyFinalization: {verifyFinalization} else: {} dag = ChainDAGRef.init(runtimePreset, db, chainDagFlags) - beaconClock = - BeaconClock.init(getStateField(dag.headState.data, genesis_time)) quarantine = QuarantineRef.init(rng) databaseGenesisValidatorsRoot = getStateField(dag.headState.data, genesis_validators_root) @@ -264,7 +262,7 @@ proc init*(T: type BeaconNode, if config.weakSubjectivityCheckpoint.isSome: let - currentSlot = beaconClock.now.slotOrZero + currentSlot = dag.beaconClock.now.slotOrZero isCheckpointStale = not is_within_weak_subjectivity_period( currentSlot, dag.headState.data, @@ -314,12 +312,12 @@ proc init*(T: type BeaconNode, netKeys = getPersistentNetKeys(rng[], config) nickname = if config.nodeName == "auto": shortForm(netKeys) else: config.nodeName - enrForkId = getENRForkID( - getStateField(dag.headState.data, fork), + network = createEth2Node( + rng, config, netKeys, runtimePreset, dag.forkDigests, getStateField(dag.headState.data, genesis_validators_root)) - topicBeaconBlocks = getBeaconBlocksTopic(enrForkId.fork_digest) - topicAggregateAndProofs = getAggregateAndProofsTopic(enrForkId.fork_digest) - network = createEth2Node(rng, config, netKeys, enrForkId) + # TODO altair-transition + topicBeaconBlocks = getBeaconBlocksTopic(dag.forkDigests.phase0) + topicAggregateAndProofs = getAggregateAndProofsTopic(dag.forkDigests.phase0) attestationPool = newClone(AttestationPool.init(dag, quarantine)) exitPool = newClone(ExitPool.init(dag, quarantine)) @@ -347,15 +345,11 @@ proc init*(T: type BeaconNode, ) blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, - consensusManager, - proc(): BeaconTime = beaconClock.now()) + consensusManager, getTime) processor = Eth2Processor.new( config.doppelgangerDetection, - blockProcessor, - dag, attestationPool, exitPool, validatorPool, - quarantine, - rng, - proc(): BeaconTime = beaconClock.now()) + blockProcessor, dag, attestationPool, exitPool, validatorPool, + quarantine, rng, getTime) var node = BeaconNode( nickname: nickname, @@ -371,10 +365,8 @@ proc init*(T: type BeaconNode, attachedValidators: validatorPool, exitPool: exitPool, eth1Monitor: eth1Monitor, - beaconClock: beaconClock, rpcServer: rpcServer, restServer: restServer, - forkDigest: enrForkId.fork_digest, topicBeaconBlocks: topicBeaconBlocks, topicAggregateAndProofs: topicAggregateAndProofs, processor: processor, @@ -386,16 +378,17 @@ proc init*(T: type BeaconNode, # set topic validation routine network.setValidTopics( block: + # TODO altair-transition var topics = @[ topicBeaconBlocks, - getAttesterSlashingsTopic(enrForkId.fork_digest), - getProposerSlashingsTopic(enrForkId.fork_digest), - getVoluntaryExitsTopic(enrForkId.fork_digest), - getAggregateAndProofsTopic(enrForkId.fork_digest) + getAttesterSlashingsTopic(network.forkDigests.phase0), + getProposerSlashingsTopic(network.forkDigests.phase0), + getVoluntaryExitsTopic(network.forkDigests.phase0), + getAggregateAndProofsTopic(network.forkDigests.phase0) ] for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: - topics &= getAttestationTopic(enrForkId.fork_digest, SubnetId(subnet_id)) + topics &= getAttestationTopic(network.forkDigests.phase0, SubnetId(subnet_id)) topics) if node.config.inProcessValidators: @@ -411,7 +404,8 @@ proc init*(T: type BeaconNode, # This merely configures the BeaconSync # The traffic will be started when we join the network. - network.initBeaconSync(dag, enrForkId.fork_digest) + # TODO altair-transition + network.initBeaconSync(dag, network.forkDigests.phase0, getTime) node.updateValidatorMetrics() @@ -635,7 +629,7 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} = proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] = let - wallEpoch = node.beaconClock.now().slotOrZero().epoch + wallEpoch = node.dag.beaconClock.now.slotOrZero.epoch validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys())) template mergeAggregateSubnets(epoch: Epoch) = @@ -674,7 +668,7 @@ proc subscribeAttestationSubnetHandlers(node: BeaconNode) {. ss.subnet_id = SubnetId(i) ss.expiration = FAR_FUTURE_EPOCH else: - let wallEpoch = node.beaconClock.now().slotOrZero().epoch + let wallEpoch = node.dag.beaconClock.now.slotOrZero.epoch # TODO make length dynamic when validator-client-based validators join and leave # In normal mode, there's one subnet subscription per validator, changing @@ -758,11 +752,12 @@ proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} = aggregateTopicParams.validateParameters().tryGet() basicParams.validateParameters.tryGet() + # TODO altair-transition node.network.subscribe(node.topicBeaconBlocks, blocksTopicParams, enableTopicMetrics = true) - node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest), basicParams) - node.network.subscribe(getProposerSlashingsTopic(node.forkDigest), basicParams) - node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest), basicParams) - node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest), aggregateTopicParams, enableTopicMetrics = true) + node.network.subscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0), basicParams) + node.network.subscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0), basicParams) + node.network.subscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0), basicParams) + node.network.subscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0), aggregateTopicParams, enableTopicMetrics = true) node.subscribeAttestationSubnetHandlers() func getTopicSubscriptionEnabled(node: BeaconNode): bool = @@ -772,15 +767,16 @@ proc removeMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError]. node.attestationSubnets.enabled = false doAssert not node.getTopicSubscriptionEnabled() - node.network.unsubscribe(getBeaconBlocksTopic(node.forkDigest)) - node.network.unsubscribe(getVoluntaryExitsTopic(node.forkDigest)) - node.network.unsubscribe(getProposerSlashingsTopic(node.forkDigest)) - node.network.unsubscribe(getAttesterSlashingsTopic(node.forkDigest)) - node.network.unsubscribe(getAggregateAndProofsTopic(node.forkDigest)) + # TODO altair-transition + node.network.unsubscribe(getBeaconBlocksTopic(node.dag.forkDigests.phase0)) + node.network.unsubscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0)) + node.network.unsubscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0)) + node.network.unsubscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0)) + node.network.unsubscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0)) for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT: node.network.unsubscribe( - getAttestationTopic(node.forkDigest, SubnetId(subnet_id))) + getAttestationTopic(node.dag.forkDigests.phase0, SubnetId(subnet_id))) proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) = # When another client's already running, this is very likely to detect @@ -932,7 +928,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = node.attestationSubnets.proposingSlots, node.attestationSubnets.lastCalculatedEpoch, slot) nextActionWaitTime = saturate(fromNow( - node.beaconClock, min(nextAttestationSlot, nextProposalSlot))) + node.dag.beaconClock, min(nextAttestationSlot, nextProposalSlot))) info "Slot end", slot = shortLog(slot), @@ -959,7 +955,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} = # state in anticipation of receiving the next block - we do it after logging # slot end since the nextActionWaitTime can be short let - advanceCutoff = node.beaconClock.fromNow( + advanceCutoff = node.dag.beaconClock.fromNow( slot.toBeaconTime(chronos.seconds(int(SECONDS_PER_SLOT - 1)))) if advanceCutoff.inFuture: # We wait until there's only a second left before the next slot begins, then @@ -1037,7 +1033,7 @@ proc runSlotLoop(node: BeaconNode, startTime: BeaconTime) {.async.} = await sleepAsync(timeToNextSlot) let - wallTime = node.beaconClock.now() + wallTime = node.dag.beaconClock.now() wallSlot = wallTime.slotOrZero() # Always > GENESIS! if wallSlot < nextSlot: @@ -1086,7 +1082,7 @@ proc runSlotLoop(node: BeaconNode, startTime: BeaconTime) {.async.} = curSlot = wallSlot nextSlot = wallSlot + 1 - timeToNextSlot = saturate(node.beaconClock.fromNow(nextSlot)) + timeToNextSlot = saturate(node.dag.beaconClock.fromNow(nextSlot)) proc handleMissingBlocks(node: BeaconNode) = let missingBlocks = node.quarantine.checkMissing() @@ -1118,7 +1114,7 @@ proc startSyncManager(node: BeaconNode) = node.dag.head.slot proc getLocalWallSlot(): Slot = - node.beaconClock.now().slotOrZero + node.dag.beaconClock.now.slotOrZero func getFirstSlotAtFinalizedEpoch(): Slot = node.dag.finalizedHead.slot @@ -1180,17 +1176,19 @@ proc installMessageValidators(node: BeaconNode) = # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation # These validators stay around the whole time, regardless of which specific # subnets are subscribed to during any given epoch. + + # TODO altair-transition for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64: closureScope: let subnet_id = SubnetId(it) node.network.addAsyncValidator( - getAttestationTopic(node.forkDigest, subnet_id), + getAttestationTopic(node.dag.forkDigests.phase0, subnet_id), # This proc needs to be within closureScope; don't lift out of loop. proc(attestation: Attestation): Future[ValidationResult] = node.processor.attestationValidator(attestation, subnet_id)) node.network.addAsyncValidator( - getAggregateAndProofsTopic(node.forkDigest), + getAggregateAndProofsTopic(node.dag.forkDigests.phase0), proc(signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] = node.processor.aggregateValidator(signedAggregateAndProof)) @@ -1200,17 +1198,17 @@ proc installMessageValidators(node: BeaconNode) = node.processor[].blockValidator(signedBlock)) node.network.addValidator( - getAttesterSlashingsTopic(node.forkDigest), + getAttesterSlashingsTopic(node.dag.forkDigests.phase0), proc (attesterSlashing: AttesterSlashing): ValidationResult = node.processor[].attesterSlashingValidator(attesterSlashing)) node.network.addValidator( - getProposerSlashingsTopic(node.forkDigest), + getProposerSlashingsTopic(node.dag.forkDigests.phase0), proc (proposerSlashing: ProposerSlashing): ValidationResult = node.processor[].proposerSlashingValidator(proposerSlashing)) node.network.addValidator( - getVoluntaryExitsTopic(node.forkDigest), + getVoluntaryExitsTopic(node.dag.forkDigests.phase0), proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult = node.processor[].voluntaryExitValidator(signedVoluntaryExit)) @@ -1246,7 +1244,7 @@ proc run*(node: BeaconNode) {.raises: [Defect, CatchableError].} = node.installMessageValidators() - let startTime = node.beaconClock.now() + let startTime = node.dag.beaconClock.now() asyncSpawn runSlotLoop(node, startTime) asyncSpawn runOnSecondLoop(node) asyncSpawn runQueueProcessingLoop(node.blockProcessor) @@ -1310,14 +1308,14 @@ proc start(node: BeaconNode) {.raises: [Defect, CatchableError].} = let head = node.dag.head finalizedHead = node.dag.finalizedHead - genesisTime = node.beaconClock.fromNow(toBeaconTime(Slot 0)) + genesisTime = node.dag.beaconClock.fromNow(toBeaconTime(Slot 0)) notice "Starting beacon node", version = fullVersionStr, enr = node.network.announcedENR.toURI, peerId = $node.network.switch.peerInfo.peerId, timeSinceFinalization = - node.beaconClock.now() - finalizedHead.slot.toBeaconTime(), + node.dag.beaconClock.now() - finalizedHead.slot.toBeaconTime(), head = shortLog(head), finalizedHead = shortLog(finalizedHead), SLOTS_PER_EPOCH, @@ -1756,7 +1754,8 @@ proc doCreateTestnet(config: BeaconNodeConf, rng: var BrHmacDrbgContext) {.raise some(config.bootstrapPort), some(config.bootstrapPort), [toFieldPair("eth2", SSZ.encode(getENRForkID( - initialState[].fork, initialState[].genesis_validators_root))), + initialState[].fork.current_version, + initialState[].genesis_validators_root))), toFieldPair("attnets", SSZ.encode(netMetadata.attnets))]) writeFile(bootstrapFile, bootstrapEnr.tryGet().toURI) diff --git a/beacon_chain/rpc/beacon_api.nim b/beacon_chain/rpc/beacon_api.nim index a48afa2b2..4b30be683 100644 --- a/beacon_chain/rpc/beacon_api.nim +++ b/beacon_chain/rpc/beacon_api.nim @@ -404,14 +404,18 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {. "Beacon node is currently syncing, try again later.") let head = node.dag.head if head.slot >= blck.message.slot: - node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck) + # TODO altair-transition + let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0) + node.network.broadcast(blocksTopic, blck) # The block failed validation, but was successfully broadcast anyway. # It was not integrated into the beacon node's database. return 202 else: let res = await proposeSignedBlock(node, head, AttachedValidator(), blck) if res == head: - node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck) + # TODO altair-transition + let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0) + node.network.broadcast(blocksTopic, blck) # The block failed validation, but was successfully broadcast anyway. # It was not integrated into the beacon node''s database. return 202 diff --git a/beacon_chain/rpc/beacon_rest_api.nim b/beacon_chain/rpc/beacon_rest_api.nim index 0d0d8051f..ea8145306 100644 --- a/beacon_chain/rpc/beacon_rest_api.nim +++ b/beacon_chain/rpc/beacon_rest_api.nim @@ -626,12 +626,16 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) = return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError) if head.slot >= blck.message.slot: - node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck) + # TODO altair-transition + let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0) + node.network.broadcast(blocksTopic, blck) return RestApiResponse.jsonError(Http202, BlockValidationError) else: let res = await proposeSignedBlock(node, head, AttachedValidator(), blck) if res == head: - node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck) + # TODO altair-transition + let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0) + node.network.broadcast(blocksTopic, blck) return RestApiResponse.jsonError(Http202, BlockValidationError) else: return RestApiResponse.jsonError(Http200, BlockValidationSuccess) diff --git a/beacon_chain/rpc/validator_rest_api.nim b/beacon_chain/rpc/validator_rest_api.nim index 685299efd..5a2ac7461 100644 --- a/beacon_chain/rpc/validator_rest_api.nim +++ b/beacon_chain/rpc/validator_rest_api.nim @@ -302,7 +302,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) = dres.get() for item in proofs: - let wallTime = node.processor.getWallTime() + let wallTime = node.processor.getCurrentBeaconTime() let res = await node.attestationPool.validateAggregate( node.processor.batchCrypto, item, wallTime ) diff --git a/beacon_chain/spec/datatypes/altair.nim b/beacon_chain/spec/datatypes/altair.nim index 7cbcbd3d7..e24ad623f 100644 --- a/beacon_chain/spec/datatypes/altair.nim +++ b/beacon_chain/spec/datatypes/altair.nim @@ -353,6 +353,12 @@ type # [New in Altair] sync_aggregate*: SyncAggregate + # https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#metadata + MetaData* = object + seq_number*: uint64 + attnets*: BitArray[ATTESTATION_SUBNET_COUNT] + syncnets*: BitArray[SYNC_COMMITTEE_SUBNET_COUNT] + TrustedBeaconBlockBody* = object ## A full verified block randao_reveal*: TrustedSig diff --git a/beacon_chain/spec/datatypes/base.nim b/beacon_chain/spec/datatypes/base.nim index 063e23563..d1f3546c0 100644 --- a/beacon_chain/spec/datatypes/base.nim +++ b/beacon_chain/spec/datatypes/base.nim @@ -777,6 +777,9 @@ func low*(v: ForkDigest | Version): int = 0 func high*(v: ForkDigest | Version): int = len(v) - 1 func `[]`*(v: ForkDigest | Version, idx: int): byte = array[4, byte](v)[idx] +template bytes*(v: ForkDigest): array[4, byte] = + distinctBase(v) + func shortLog*(s: Slot): uint64 = s - GENESIS_SLOT diff --git a/beacon_chain/spec/forkedbeaconstate_helpers.nim b/beacon_chain/spec/forkedbeaconstate_helpers.nim index 7aeb0acc5..0d4ef4d8b 100644 --- a/beacon_chain/spec/forkedbeaconstate_helpers.nim +++ b/beacon_chain/spec/forkedbeaconstate_helpers.nim @@ -26,6 +26,31 @@ type of forkPhase0: hbsPhase0*: phase0.HashedBeaconState of forkAltair: hbsAltair*: altair.HashedBeaconState + BeaconBlockFork* {.pure.} = enum + Phase0 + Altair + + ForkedSignedBeaconBlock* = object + case kind*: BeaconBlockFork + of BeaconBlockFork.Phase0: + phase0Block*: phase0.SignedBeaconBlock + of BeaconBlockFork.Altair: + altairBlock*: altair.SignedBeaconBlock + + ForkedTrustedSignedBeaconBlock* = object + case kind*: BeaconBlockFork + of BeaconBlockFork.Phase0: + phase0Block*: phase0.TrustedSignedBeaconBlock + of BeaconBlockFork.Altair: + altairBlock*: altair.TrustedSignedBeaconBlock + + ForkDigests* = object + phase0*: ForkDigest + altair*: ForkDigest + altairTopicPrefix*: string # Used by isAltairTopic + + ForkDigestsRef* = ref ForkDigests + # State-related functionality based on ForkedHashedBeaconState instead of BeaconState # Dispatch functions @@ -177,3 +202,37 @@ func get_previous_epoch*(stateData: ForkedHashedBeaconState): Epoch = GENESIS_EPOCH else: current_epoch - 1 + +func init*(T: type ForkDigests, + runtimePreset: RuntimePreset, + genesisValidatorsRoot: Eth2Digest): T = + let altairForkDigest = compute_fork_digest( + runtimePreset.ALTAIR_FORK_VERSION, + genesisValidatorsRoot) + + T(phase0: compute_fork_digest( + runtimePreset.GENESIS_FORK_VERSION, + genesisValidatorsRoot), + altair: altairForkDigest, + altairTopicPrefix: $altairForkDigest) + +template asSigned*(x: phase0.TrustedSignedBeaconBlock or phase0.SigVerifiedBeaconBlock): + phase0.SignedBeaconBlock = + static: # TODO See isomorphicCast + doAssert sizeof(x) == sizeof(phase0.SignedBeaconBlock) + + cast[ptr phase0.SignedBeaconBlock](x.unsafeAddr)[] + +template asSigned*(x: altair.TrustedSignedBeaconBlock or altair.SigVerifiedBeaconBlock): + altair.SignedBeaconBlock = + static: # TODO See isomorphicCast + doAssert sizeof(x) == sizeof(altair.SignedBeaconBlock) + + cast[ptr altair.SignedBeaconBlock](x.unsafeAddr)[] + +template asSigned*(x: ForkedTrustedSignedBeaconBlock): ForkedSignedBeaconBlock = + static: # TODO See isomorphicCast + doAssert sizeof(x) == sizeof(ForkedSignedBeaconBlock) + + cast[ptr ForkedSignedBeaconBlock](x.unsafeAddr)[] + diff --git a/beacon_chain/spec/network.nim b/beacon_chain/spec/network.nim index 9037a63ec..062e4ef77 100644 --- a/beacon_chain/spec/network.nim +++ b/beacon_chain/spec/network.nim @@ -73,13 +73,14 @@ func getAttestationTopic*(forkDigest: ForkDigest, subnet_id: SubnetId): ## For subscribing and unsubscribing to/from a subnet. eth2Prefix(forkDigest) & "beacon_attestation_" & $uint64(subnet_id) & "/ssz" -func getENRForkID*(fork: Fork, genesis_validators_root: Eth2Digest): ENRForkID = +func getENRForkID*(fork_version: Version, + genesis_validators_root: Eth2Digest): ENRForkID = let - current_fork_version = fork.current_version + current_fork_version = fork_version fork_digest = compute_fork_digest( current_fork_version, genesis_validators_root) ENRForkID( fork_digest: fork_digest, next_fork_version: current_fork_version, - next_fork_epoch: FAR_FUTURE_EPOCH) + next_fork_epoch: FAR_FUTURE_EPOCH) # TODO altair-transition diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index efa3e20f0..277a34e0e 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -11,10 +11,12 @@ import options, tables, sets, macros, chronicles, chronos, stew/ranges/bitranges, libp2p/switch, ../spec/[crypto, datatypes, digest, forkedbeaconstate_helpers, network], - ../beacon_node_types, + ../beacon_node_types, ../beacon_clock, ../networking/eth2_network, ../consensus_object_pools/blockchain_dag +from ../spec/datatypes/altair import nil + logScope: topics = "sync" @@ -49,6 +51,7 @@ type BeaconSyncNetworkState* = ref object dag*: ChainDAGRef forkDigest*: ForkDigest + getTime*: GetTimeFn BeaconSyncPeerState* = ref object statusLastTime*: chronos.Moment @@ -60,6 +63,49 @@ type BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS] + AltairSignedBeaconBlock* = altair.SignedBeaconBlock + +proc readChunkPayload*(conn: Connection, peer: Peer, + MsgType: type ForkedSignedBeaconBlock): Future[NetRes[ForkedSignedBeaconBlock]] {.async.} = + var contextBytes: ForkDigest + try: + await conn.readExactly(addr contextBytes, sizeof contextBytes) + except CatchableError as e: + return neterr UnexpectedEOF + + if contextBytes == peer.network.forkDigests.phase0: + let res = await readChunkPayload(conn, peer, SignedBeaconBlock) + if res.isOk: + return ok ForkedSignedBeaconBlock( + kind: BeaconBlockFork.Phase0, + phase0Block: res.get) + else: + return err(res.error) + elif contextBytes == peer.network.forkDigests.altair: + let res = await readChunkPayload(conn, peer, AltairSignedBeaconBlock) + if res.isOk: + return ok ForkedSignedBeaconBlock( + kind: BeaconBlockFork.Altair, + altairBlock: res.get) + else: + return err(res.error) + else: + return neterr InvalidContextBytes + +proc sendResponseChunk*(response: UntypedResponse, + val: ForkedSignedBeaconBlock): Future[void] = + inc response.writtenChunks + + case val.kind + of BeaconBlockFork.Phase0: + response.stream.writeChunk(some ResponseCode.Success, + SSZ.encode(val.phase0Block), + response.peer.network.forkDigests.phase0.bytes) + of BeaconBlockFork.Altair: + response.stream.writeChunk(some ResponseCode.Success, + SSZ.encode(val.altairBlock), + response.peer.network.forkDigests.altair.bytes) + func shortLog*(s: StatusMsg): auto = ( forkDigest: s.forkDigest, @@ -81,9 +127,11 @@ proc getCurrentStatus*(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} = let dag = state.dag headBlock = dag.head + wallTime = state.getTime() + wallTimeSlot = dag.beaconClock.toBeaconTime(wallTime).slotOrZero StatusMsg( - forkDigest: state.forkDigest, + forkDigest: state.dag.forkDigestAtSlot(wallTimeSlot), finalizedRoot: getStateField(dag.headState.data, finalized_checkpoint).root, finalizedEpoch: @@ -99,6 +147,7 @@ proc handleStatus(peer: Peer, proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) {.gcsafe.} {.pop.} # TODO fix p2p macro for raises + p2pProtocol BeaconSync(version = 1, networkState = BeaconSyncNetworkState, peerState = BeaconSyncPeerState): @@ -145,6 +194,10 @@ p2pProtocol BeaconSync(version = 1, proc getMetaData(peer: Peer): MetaData {.libp2pProtocol("metadata", 1).} = + return peer.network.phase0Metadata + + proc getMetadata_v2(peer: Peer): altair.MetaData + {.libp2pProtocol("metadata", 2).} = return peer.network.metadata proc beaconBlocksByRange( @@ -177,7 +230,8 @@ p2pProtocol BeaconSync(version = 1, doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only" trace "wrote response block", slot = blocks[i].slot, roor = shortLog(blocks[i].root) - await response.write(dag.get(blocks[i]).data) + let blk = dag.get(blocks[i]).data + await response.write(blk.asSigned) debug "Block range request done", peer, startSlot, count, reqStep, found = count - startIndex @@ -205,7 +259,77 @@ p2pProtocol BeaconSync(version = 1, for i in 0.. 0'u64 and reqStep > 0'u64: + var blocks: array[MAX_REQUEST_BLOCKS, BlockRef] + let + dag = peer.networkState.dag + # Limit number of blocks in response + count = int min(reqCount, blocks.lenu64) + + let + endIndex = count - 1 + startIndex = + dag.getBlockRange(startSlot, reqStep, + blocks.toOpenArray(0, endIndex)) + peer.updateRequestQuota( + blockByRangeLookupCost + + max(0, endIndex - startIndex + 1).float * blockResponseCost) + peer.awaitNonNegativeRequestQuota() + + for i in startIndex..endIndex: + doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only" + trace "wrote response block", + slot = blocks[i].slot, roor = shortLog(blocks[i].root) + let blk = dag.getForkedBlock(blocks[i]) + await response.write(blk.asSigned) + + debug "Block range request done", + peer, startSlot, count, reqStep, found = count - startIndex + else: + raise newException(InvalidInputsError, "Empty range requested") + + proc beaconBlocksByRoot_v2( + peer: Peer, + # Please note that the SSZ list here ensures that the + # spec constant MAX_REQUEST_BLOCKS is enforced: + blockRoots: BlockRootsList, + response: MultipleChunksResponse[ForkedSignedBeaconBlock]) + {.async, libp2pProtocol("beacon_blocks_by_root", 2).} = + + if blockRoots.len == 0: + raise newException(InvalidInputsError, "No blocks requested") + + let + dag = peer.networkState.dag + count = blockRoots.len + + peer.updateRequestQuota(count.float * blockByRootLookupCost) + peer.awaitNonNegativeRequestQuota() + + var found = 0 + for i in 0..