Implement the new Altair req/resp protocols (#2676)

* Implement the new Altair req/resp protocols

Also fixes the altair message-id computation by providing the correct
forkdigest prefix in `isAltairTopic`.

Co-authored-by: Tanguy Cizain <tanguycizain@gmail.com>
This commit is contained in:
zah 2021-07-07 12:09:47 +03:00 committed by GitHub
parent 17da9aaa09
commit eb2dc5cbbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 419 additions and 134 deletions

View File

@ -33,7 +33,7 @@ type
BeaconTime* = distinct Duration ## Nanoseconds from beacon genesis time
GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}
GetTimeFn* = proc(): Time {.gcsafe, raises: [Defect].}
proc init*(T: type BeaconClock, genesis_time: uint64): T =
# ~290 billion years into the future

View File

@ -45,11 +45,9 @@ type
attestationPool*: ref AttestationPool
exitPool*: ref ExitPool
eth1Monitor*: Eth1Monitor
beaconClock*: BeaconClock
rpcServer*: RpcServer
restServer*: RestServerRef
vcProcess*: Process
forkDigest*: ForkDigest
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerID]
topicBeaconBlocks*: string
@ -73,6 +71,9 @@ template findIt*(s: openArray, predicate: untyped): int =
break
res
template beaconClock*(node: BeaconNode): BeaconClock =
node.dag.beaconClock
proc currentSlot*(node: BeaconNode): Slot =
node.beaconClock.now.slotOrZero

View File

@ -43,25 +43,25 @@ template asSigVerified(x: phase0.SignedBeaconBlock):
phase0.SigVerifiedSignedBeaconBlock =
## This converts a signed beacon block to a sig verified beacon clock.
## This assumes that their bytes representation is the same.
isomorphicCast[phase0.SigVerifiedSignedBeaconBlock](signedBlock)
isomorphicCast[phase0.SigVerifiedSignedBeaconBlock](x)
template asSigVerified(x: altair.SignedBeaconBlock):
altair.SigVerifiedSignedBeaconBlock =
## This converts a signed beacon block to a sig verified beacon clock.
## This assumes that their bytes representation is the same.
isomorphicCast[altair.SigVerifiedSignedBeaconBlock](signedBlock)
isomorphicCast[altair.SigVerifiedSignedBeaconBlock](x)
template asTrusted(x: phase0.SignedBeaconBlock or phase0.SigVerifiedBeaconBlock):
phase0.TrustedSignedBeaconBlock =
## This converts a sigverified beacon block to a trusted beacon clock.
## This assumes that their bytes representation is the same.
isomorphicCast[phase0.TrustedSignedBeaconBlock](signedBlock)
isomorphicCast[phase0.TrustedSignedBeaconBlock](x)
template asTrusted(x: altair.SignedBeaconBlock or altair.SigVerifiedBeaconBlock):
altair.TrustedSignedBeaconBlock =
## This converts a sigverified beacon block to a trusted beacon clock.
## This assumes that their bytes representation is the same.
isomorphicCast[altair.TrustedSignedBeaconBlock](signedBlock)
isomorphicCast[altair.TrustedSignedBeaconBlock](x)
func batchVerify(quarantine: QuarantineRef, sigs: openArray[SignatureSet]): bool =
var secureRandomBytes: array[32, byte]
@ -213,7 +213,7 @@ proc checkStateTransition(
return (ValidationResult.Reject, Invalid)
return (ValidationResult.Accept, default(BlockError))
proc advanceClearanceState*(dag: ChainDagRef) =
proc advanceClearanceState*(dag: ChainDAGRef) =
# When the chain is synced, the most likely block to be produced is the block
# right after head - we can exploit this assumption and advance the state
# to that slot before the block arrives, thus allowing us to do the expensive

View File

@ -16,7 +16,7 @@ import
# Internals
../spec/[crypto, digest, signatures_batch, forkedbeaconstate_helpers],
../spec/datatypes/[phase0, altair],
../beacon_chain_db, ../extras
".."/[beacon_chain_db, beacon_clock, extras]
export sets, tables
@ -114,6 +114,8 @@ type
db*: BeaconChainDB ##\
## ColdDB - Stores the canonical chain
beaconClock*: BeaconClock
# -----------------------------------
# ChainDAGRef - DAG of candidate chains
@ -165,6 +167,12 @@ type
## block - we limit the number of held EpochRefs to put a cap on
## memory usage
forkDigests*: ForkDigestsRef
## Cached copy of the fork digests associated with the current
## database. We use a ref type to facilitate sharing this small
## value with other components which don't have access to the
## full ChainDAG.
altairTransitionSlot*: Slot ##\
## Slot at which to upgrade from phase 0 to Altair forks
@ -236,7 +244,7 @@ type
blck: altair.TrustedSignedBeaconBlock,
epochRef: EpochRef) {.gcsafe, raises: [Defect].}
template head*(dag: ChainDagRef): BlockRef = dag.headState.blck
template head*(dag: ChainDAGRef): BlockRef = dag.headState.blck
template epoch*(e: EpochRef): Epoch = e.key.epoch

View File

@ -410,6 +410,11 @@ proc init*(T: type ChainDAGRef,
tail: tailRef,
genesis: genesisRef,
db: db,
beaconClock: BeaconClock.init(
getStateField(tmpState.data, genesis_time)),
forkDigests: newClone ForkDigests.init(
preset,
getStateField(tmpState.data, genesis_validators_root)),
heads: @[headRef],
headState: tmpState[],
epochRefState: tmpState[],
@ -540,6 +545,12 @@ func stateCheckpoint*(bs: BlockSlot): BlockSlot =
bs = bs.parentOrSlot
bs
proc forkDigestAtSlot*(dag: ChainDAGRef, slot: Slot): ForkDigest =
if slot < dag.altairTransitionSlot:
dag.forkDigests.phase0
else:
dag.forkDigests.altair
proc getState(dag: ChainDAGRef, state: var StateData, bs: BlockSlot): bool =
## Load a state from the database given a block and a slot - this will first
## lookup the state root in the state root table then load the corresponding
@ -657,6 +668,20 @@ proc get*(dag: ChainDAGRef, blck: BlockRef): BlockData =
BlockData(data: data.get(), refs: blck)
proc getForkedBlock*(dag: ChainDAGRef, blck: BlockRef): ForkedTrustedSignedBeaconBlock =
# TODO implement this properly
let phase0Block = dag.db.getBlock(blck.root)
if phase0Block.isOk:
return ForkedTrustedSignedBeaconBlock(kind: BeaconBlockFork.Phase0,
phase0Block: phase0Block.get)
let altairBlock = dag.db.getAltairBlock(blck.root)
if altairBlock.isOk:
return ForkedTrustedSignedBeaconBlock(kind: BeaconBlockFork.Altair,
altairBlock: altairBlock.get)
raiseAssert "BlockRef without backing data, database corrupt?"
proc get*(dag: ChainDAGRef, root: Eth2Digest): Option[BlockData] =
## Retrieve a resolved block reference and its associated body, if available
let refs = dag.getRef(root)

View File

@ -52,10 +52,6 @@ type
dumpDirInvalid: string
dumpDirIncoming: string
# Clock
# ----------------------------------------------------------------
getWallTime: GetWallTimeFn
# Producers
# ----------------------------------------------------------------
blocksQueue*: AsyncQueue[BlockEntry] # Exported for "test_sync_manager"
@ -64,6 +60,7 @@ type
# ----------------------------------------------------------------
consensusManager: ref ConsensusManager
## Blockchain DAG, AttestationPool and Quarantine
getTime: GetTimeFn
# Initialization
# ------------------------------------------------------------------------------
@ -72,17 +69,17 @@ proc new*(T: type BlockProcessor,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
consensusManager: ref ConsensusManager,
getWallTime: GetWallTimeFn): ref BlockProcessor =
getTime: GetTimeFn): ref BlockProcessor =
(ref BlockProcessor)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
dumpDirIncoming: dumpDirIncoming,
getWallTime: getWallTime,
blocksQueue: newAsyncQueue[BlockEntry](),
consensusManager: consensusManager,
)
getTime: getTime)
proc getCurrentBeaconTime*(self: BlockProcessor): BeaconTime =
self.consensusManager.dag.beaconClock.toBeaconTime(self.getTime())
# Sync callbacks
# ------------------------------------------------------------------------------
@ -175,7 +172,7 @@ proc processBlock(self: var BlockProcessor, entry: BlockEntry) =
blockRoot = shortLog(entry.blck.root)
let
wallTime = self.getWallTime()
wallTime = self.getCurrentBeaconTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:

View File

@ -60,7 +60,6 @@ type
Eth2Processor* = object
doppelGangerDetectionEnabled*: bool
getWallTime*: GetWallTimeFn
# Local sources of truth for validation
# ----------------------------------------------------------------
@ -86,6 +85,9 @@ type
# ----------------------------------------------------------------
quarantine*: QuarantineRef
# Application-provided current time provider (to facilitate testing)
getTime*: GetTimeFn
# Initialization
# ------------------------------------------------------------------------------
@ -98,18 +100,18 @@ proc new*(T: type Eth2Processor,
validatorPool: ref ValidatorPool,
quarantine: QuarantineRef,
rng: ref BrHmacDrbgContext,
getWallTime: GetWallTimeFn): ref Eth2Processor =
getTime: GetTimeFn): ref Eth2Processor =
(ref Eth2Processor)(
doppelGangerDetectionEnabled: doppelGangerDetectionEnabled,
doppelgangerDetection: DoppelgangerProtection(
nodeLaunchSlot: getWallTime().slotOrZero),
getWallTime: getWallTime,
nodeLaunchSlot: dag.beaconClock.now.slotOrZero),
blockProcessor: blockProcessor,
dag: dag,
attestationPool: attestationPool,
exitPool: exitPool,
validatorPool: validatorPool,
quarantine: quarantine,
getTime: getTime,
batchCrypto: BatchCrypto.new(
rng = rng,
# Only run eager attestation signature verification if we're not
@ -117,6 +119,9 @@ proc new*(T: type Eth2Processor,
eager = proc(): bool = not blockProcessor[].hasBlocks())
)
proc getCurrentBeaconTime*(self: Eth2Processor|ref Eth2Processor): BeaconTime =
self.dag.beaconClock.toBeaconTime(self.getTime())
# Gossip Management
# -----------------------------------------------------------------------------------
@ -128,7 +133,7 @@ proc blockValidator*(
blockRoot = shortLog(signedBlock.root)
let
wallTime = self.getWallTime()
wallTime = self.getCurrentBeaconTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
@ -167,7 +172,7 @@ proc blockValidator*(
# propagation of seemingly good blocks
trace "Block validated"
self.blockProcessor[].addBlock(
signedBlock, validationDur = self.getWallTime() - wallTime)
signedBlock, validationDur = self.getCurrentBeaconTime() - wallTime)
ValidationResult.Accept
@ -207,7 +212,7 @@ proc attestationValidator*(
attestation = shortLog(attestation)
subnet_id
let wallTime = self.getWallTime()
let wallTime = self.getCurrentBeaconTime()
var (afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
@ -228,7 +233,7 @@ proc attestationValidator*(
return v.error[0]
# Due to async validation the wallSlot here might have changed
(afterGenesis, wallSlot) = self.getWallTime().toSlot()
(afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot()
beacon_attestations_received.inc()
beacon_attestation_delay.observe(delay.toFloatSeconds())
@ -250,7 +255,7 @@ proc aggregateValidator*(
aggregate = shortLog(signedAggregateAndProof.message.aggregate)
signature = shortLog(signedAggregateAndProof.signature)
let wallTime = self.getWallTime()
let wallTime = self.getCurrentBeaconTime()
var (afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
@ -276,7 +281,7 @@ proc aggregateValidator*(
return v.error[0]
# Due to async validation the wallSlot here might have changed
(afterGenesis, wallSlot) = self.getWallTime().toSlot()
(afterGenesis, wallSlot) = self.getCurrentBeaconTime().toSlot()
beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(delay.toFloatSeconds())

View File

@ -36,10 +36,12 @@ import
version, conf,
ssz/ssz_serialization, beacon_clock],
../spec/datatypes/base,
../spec/[digest, network],
../spec/[digest, network, helpers, forkedbeaconstate_helpers],
../validators/keystore_management,
./eth2_discovery, ./peer_pool, ./libp2p_json_serialization
from ../spec/datatypes/altair import nil
when chronicles.enabledLogLevel == LogLevel.TRACE:
import std/sequtils
@ -75,7 +77,7 @@ type
peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef]
libp2pTransportLoops*: seq[Future[void]]
metadata*: MetaData
metadata*: altair.MetaData
connectTimeout*: chronos.Duration
seenThreshold*: chronos.Duration
connQueue: AsyncQueue[PeerAddr]
@ -83,6 +85,7 @@ type
connWorkers: seq[Future[void]]
connTable: HashSet[PeerID]
forkId: ENRForkID
forkDigests*: ForkDigestsRef
rng*: ref BrHmacDrbgContext
peers*: Table[PeerID, Peer]
validTopics: HashSet[string]
@ -119,7 +122,7 @@ type
Disconnecting,
Disconnected
UntypedResponse = ref object
UntypedResponse* = ref object
peer*: Peer
stream*: Connection
writtenChunks*: int
@ -194,6 +197,7 @@ type
ReadResponseTimeout
ZeroSizePrefix
SizePrefixOverflow
InvalidContextBytes
Eth2NetworkingError = object
case kind*: Eth2NetworkingErrorKind
@ -208,6 +212,11 @@ type
NetRes*[T] = Result[T, Eth2NetworkingError]
## This is type returned from all network requests
func phase0metadata*(node: Eth2Node): MetaData =
MetaData(
seq_number: node.metadata.seq_number,
attnets: node.metadata.attnets)
const
clientId* = "Nimbus beacon node " & fullVersionStr
nodeMetadataFilename = "node-metadata.json"
@ -243,7 +252,7 @@ const
SeenTableTimeReconnect = 1.minutes
## Minimal time between disconnection and reconnection attempt
template neterr(kindParam: Eth2NetworkingErrorKind): auto =
template neterr*(kindParam: Eth2NetworkingErrorKind): auto =
err(type(result), Eth2NetworkingError(kind: kindParam))
# Metrics for tracking attestation and beacon block loss
@ -487,14 +496,19 @@ proc getRequestProtoName(fn: NimNode): NimNode =
proc writeChunk*(conn: Connection,
responseCode: Option[ResponseCode],
payload: Bytes): Future[void] =
payload: Bytes,
contextBytes: openarray[byte] = []): Future[void] =
var output = memoryOutput()
try:
if responseCode.isSome:
output.write byte(responseCode.get)
if contextBytes.len > 0:
output.write contextBytes
output.write toBytes(payload.lenu64, Leb128).toOpenArray()
framingFormatCompress(output, payload)
except IOError as exc:
raiseAssert exc.msg # memoryOutput shouldn't raise
@ -545,7 +559,7 @@ proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes): Future[v
inc response.writtenChunks
response.stream.writeChunk(some Success, payload)
proc sendResponseChunkObj(response: UntypedResponse, val: auto): Future[void] =
proc sendResponseChunk*(response: UntypedResponse, val: auto): Future[void] =
inc response.writtenChunks
response.stream.writeChunk(some Success, SSZ.encode(val))
@ -593,12 +607,14 @@ proc init*[MsgType](T: type SingleChunkResponse[MsgType],
peer: Peer, conn: Connection): T =
T(UntypedResponse(peer: peer, stream: conn))
template write*[M](r: MultipleChunksResponse[M], val: auto): untyped =
sendResponseChunkObj(UntypedResponse(r), val)
template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
mixin sendResponseChunk
sendResponseChunk(UntypedResponse(r), val)
template send*[M](r: SingleChunkResponse[M], val: auto): untyped =
template send*[M](r: SingleChunkResponse[M], val: M): untyped =
mixin sendResponseChunk
doAssert UntypedResponse(r).writtenChunks == 0
sendResponseChunkObj(UntypedResponse(r), val)
sendResponseChunk(UntypedResponse(r), val)
proc performProtocolHandshakes*(peer: Peer, incoming: bool) {.async.} =
# Loop down serially because it's easier to reason about the connection state
@ -723,6 +739,9 @@ proc handleIncomingStream(network: Eth2Node,
of UnexpectedEOF, PotentiallyExpectedEOF:
(InvalidRequest, errorMsgLit "Incomplete request")
of InvalidContextBytes:
(ServerError, errorMsgLit "Unrecognized context bytes")
of InvalidSnappyBytes:
(InvalidRequest, errorMsgLit "Failed to decompress snappy payload")
@ -960,10 +979,11 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
# when no peers are in the routing table. Don't run it in continuous loop.
await sleepAsync(1.seconds)
proc getPersistentNetMetadata*(config: BeaconNodeConf): MetaData {.raises: [Defect, IOError, SerializationError].} =
proc getPersistentNetMetadata*(config: BeaconNodeConf): altair.MetaData
{.raises: [Defect, IOError, SerializationError].} =
let metadataPath = config.dataDir / nodeMetadataFilename
if not fileExists(metadataPath):
result = MetaData()
result = altair.MetaData()
for i in 0 ..< ATTESTATION_SUBNET_COUNT:
# TODO:
# Persistent (stability) subnets should be stored with their expiration
@ -971,7 +991,7 @@ proc getPersistentNetMetadata*(config: BeaconNodeConf): MetaData {.raises: [Defe
result.attnets[i] = false
Json.saveFile(metadataPath, result)
else:
result = Json.loadFile(metadataPath, MetaData)
result = Json.loadFile(metadataPath, altair.MetaData)
proc resolvePeer(peer: Peer) =
# Resolve task which performs searching of peer's public key and recovery of
@ -1105,7 +1125,8 @@ proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
peer = peerId, peer_state = peer.connectionState
peer.connectionState = Disconnected
proc new*(T: type Eth2Node, config: BeaconNodeConf, enrForkId: ENRForkID,
proc new*(T: type Eth2Node, config: BeaconNodeConf,
enrForkId: ENRForkID, forkDigests: ForkDigestsRef,
switch: Switch, pubsub: GossipSub, ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool,
rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} =
@ -1132,6 +1153,7 @@ proc new*(T: type Eth2Node, config: BeaconNodeConf, enrForkId: ENRForkID,
# the previous netkey.
metadata: metadata,
forkId: enrForkId,
forkDigests: forkDigests,
discovery: Eth2DiscoveryProtocol.new(
config, ip, tcpPort, udpPort, privKey,
{"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(metadata.attnets)},
@ -1518,29 +1540,24 @@ func gossipId(data: openArray[byte], topic: string, valid: bool): seq[byte] =
return messageDigest.data[0..19].toSeq()
func isAltairTopic(topic: string): bool =
let splitted = topic.split('/')
if splitted.len < 3:
false
#TODO i'm not sure how to get the altair fork digest
else:
splitted[1] == "ALTAIRFORKDIGEST"
func isAltairTopic(topic: string, altairPrefix: string): bool =
const prefixLen = "/eth2/".len
func getAltairTopic(m: messages.Message): string =
if topic.len <= altairPrefix.len + prefixLen:
false
else:
for ind, ch in altairPrefix:
if ch != topic[ind + prefixLen]: return false
true
func getAltairTopic(m: messages.Message, altairPrefix: string): string =
# TODO Return a lent string here to avoid the string copy
let topic = if m.topicIDs.len > 0: m.topicIDs[0] else: ""
if isAltairTopic(topic):
if isAltairTopic(topic, altairPrefix):
topic
else:
""
func msgIdProvider(m: messages.Message): seq[byte] =
let topic = getAltairTopic(m)
try:
let decoded = snappy.decode(m.data, GOSSIP_MAX_SIZE)
gossipId(decoded, topic, true)
except CatchableError:
gossipId(m.data, topic, false)
proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey,
address: MultiAddress,
rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} =
@ -1564,12 +1581,24 @@ proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey,
proc createEth2Node*(rng: ref BrHmacDrbgContext,
config: BeaconNodeConf,
netKeys: NetKeyPair,
enrForkId: ENRForkID): Eth2Node {.raises: [Defect, CatchableError].} =
runtimePreset: RuntimePreset,
forkDigests: ForkDigestsRef,
genesisValidatorsRoot: Eth2Digest): Eth2Node
{.raises: [Defect, CatchableError].} =
var
enrForkId = getENRForkID(
# TODO altair-transition
# This function should gain an extra argument specifying
# whether the client head state is already past the Altair
# migration point.
runtimePreset.GENESIS_FORK_VERSION,
genesisValidatorsRoot)
(extIp, extTcpPort, extUdpPort) = try: setupAddress(
config.nat, config.listenAddress, config.tcpPort, config.udpPort, clientId)
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg
hostAddress = tcpEndPoint(config.listenAddress, config.tcpPort)
announcedAddresses = if extIp.isNone() or extTcpPort.isNone(): @[]
else: @[tcpEndPoint(extIp.get(), extTcpPort.get())]
@ -1583,6 +1612,14 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
# are running behind a NAT).
var switch = newBeaconSwitch(config, netKeys.seckey, hostAddress, rng)
func msgIdProvider(m: messages.Message): seq[byte] =
let topic = getAltairTopic(m, forkDigests.altairTopicPrefix)
try:
let decoded = snappy.decode(m.data, GOSSIP_MAX_SIZE)
gossipId(decoded, topic, true)
except CatchableError:
gossipId(m.data, topic, false)
let
params = GossipSubParams(
explicit: true,
@ -1638,7 +1675,9 @@ proc createEth2Node*(rng: ref BrHmacDrbgContext,
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
switch.mount(pubsub)
Eth2Node.new(config, enrForkId, switch, pubsub,
Eth2Node.new(config, enrForkId,
forkDigests,
switch, pubsub,
extIp, extTcpPort, extUdpPort,
netKeys.seckey.asEthKey,
discovery = config.discv5Enabled,

View File

@ -93,8 +93,8 @@ proc uncompressFramedStream*(conn: Connection,
return ok output
proc readChunkPayload(conn: Connection, peer: Peer,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
proc readChunkPayload*(conn: Connection, peer: Peer,
MsgType: type): Future[NetRes[MsgType]] {.async.} =
let sm = now(chronos.Moment)
let size =
try: await conn.readVarint()
@ -126,6 +126,8 @@ proc readChunkPayload(conn: Connection, peer: Peer,
proc readResponseChunk(conn: Connection, peer: Peer,
MsgType: typedesc): Future[NetRes[MsgType]] {.async.} =
mixin readChunkPayload
try:
var responseCodeByte: byte
try:

View File

@ -245,8 +245,6 @@ proc init*(T: type BeaconNode,
chainDagFlags = if config.verifyFinalization: {verifyFinalization}
else: {}
dag = ChainDAGRef.init(runtimePreset, db, chainDagFlags)
beaconClock =
BeaconClock.init(getStateField(dag.headState.data, genesis_time))
quarantine = QuarantineRef.init(rng)
databaseGenesisValidatorsRoot =
getStateField(dag.headState.data, genesis_validators_root)
@ -264,7 +262,7 @@ proc init*(T: type BeaconNode,
if config.weakSubjectivityCheckpoint.isSome:
let
currentSlot = beaconClock.now.slotOrZero
currentSlot = dag.beaconClock.now.slotOrZero
isCheckpointStale = not is_within_weak_subjectivity_period(
currentSlot,
dag.headState.data,
@ -314,12 +312,12 @@ proc init*(T: type BeaconNode,
netKeys = getPersistentNetKeys(rng[], config)
nickname = if config.nodeName == "auto": shortForm(netKeys)
else: config.nodeName
enrForkId = getENRForkID(
getStateField(dag.headState.data, fork),
network = createEth2Node(
rng, config, netKeys, runtimePreset, dag.forkDigests,
getStateField(dag.headState.data, genesis_validators_root))
topicBeaconBlocks = getBeaconBlocksTopic(enrForkId.fork_digest)
topicAggregateAndProofs = getAggregateAndProofsTopic(enrForkId.fork_digest)
network = createEth2Node(rng, config, netKeys, enrForkId)
# TODO altair-transition
topicBeaconBlocks = getBeaconBlocksTopic(dag.forkDigests.phase0)
topicAggregateAndProofs = getAggregateAndProofsTopic(dag.forkDigests.phase0)
attestationPool = newClone(AttestationPool.init(dag, quarantine))
exitPool = newClone(ExitPool.init(dag, quarantine))
@ -347,15 +345,11 @@ proc init*(T: type BeaconNode,
)
blockProcessor = BlockProcessor.new(
config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming,
consensusManager,
proc(): BeaconTime = beaconClock.now())
consensusManager, getTime)
processor = Eth2Processor.new(
config.doppelgangerDetection,
blockProcessor,
dag, attestationPool, exitPool, validatorPool,
quarantine,
rng,
proc(): BeaconTime = beaconClock.now())
blockProcessor, dag, attestationPool, exitPool, validatorPool,
quarantine, rng, getTime)
var node = BeaconNode(
nickname: nickname,
@ -371,10 +365,8 @@ proc init*(T: type BeaconNode,
attachedValidators: validatorPool,
exitPool: exitPool,
eth1Monitor: eth1Monitor,
beaconClock: beaconClock,
rpcServer: rpcServer,
restServer: restServer,
forkDigest: enrForkId.fork_digest,
topicBeaconBlocks: topicBeaconBlocks,
topicAggregateAndProofs: topicAggregateAndProofs,
processor: processor,
@ -386,16 +378,17 @@ proc init*(T: type BeaconNode,
# set topic validation routine
network.setValidTopics(
block:
# TODO altair-transition
var
topics = @[
topicBeaconBlocks,
getAttesterSlashingsTopic(enrForkId.fork_digest),
getProposerSlashingsTopic(enrForkId.fork_digest),
getVoluntaryExitsTopic(enrForkId.fork_digest),
getAggregateAndProofsTopic(enrForkId.fork_digest)
getAttesterSlashingsTopic(network.forkDigests.phase0),
getProposerSlashingsTopic(network.forkDigests.phase0),
getVoluntaryExitsTopic(network.forkDigests.phase0),
getAggregateAndProofsTopic(network.forkDigests.phase0)
]
for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
topics &= getAttestationTopic(enrForkId.fork_digest, SubnetId(subnet_id))
topics &= getAttestationTopic(network.forkDigests.phase0, SubnetId(subnet_id))
topics)
if node.config.inProcessValidators:
@ -411,7 +404,8 @@ proc init*(T: type BeaconNode,
# This merely configures the BeaconSync
# The traffic will be started when we join the network.
network.initBeaconSync(dag, enrForkId.fork_digest)
# TODO altair-transition
network.initBeaconSync(dag, network.forkDigests.phase0, getTime)
node.updateValidatorMetrics()
@ -635,7 +629,7 @@ proc cycleAttestationSubnets(node: BeaconNode, wallSlot: Slot) {.async.} =
proc getInitialAggregateSubnets(node: BeaconNode): Table[SubnetId, Slot] =
let
wallEpoch = node.beaconClock.now().slotOrZero().epoch
wallEpoch = node.dag.beaconClock.now.slotOrZero.epoch
validatorIndices = toIntSet(toSeq(node.getAttachedValidators().keys()))
template mergeAggregateSubnets(epoch: Epoch) =
@ -674,7 +668,7 @@ proc subscribeAttestationSubnetHandlers(node: BeaconNode) {.
ss.subnet_id = SubnetId(i)
ss.expiration = FAR_FUTURE_EPOCH
else:
let wallEpoch = node.beaconClock.now().slotOrZero().epoch
let wallEpoch = node.dag.beaconClock.now.slotOrZero.epoch
# TODO make length dynamic when validator-client-based validators join and leave
# In normal mode, there's one subnet subscription per validator, changing
@ -758,11 +752,12 @@ proc addMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].} =
aggregateTopicParams.validateParameters().tryGet()
basicParams.validateParameters.tryGet()
# TODO altair-transition
node.network.subscribe(node.topicBeaconBlocks, blocksTopicParams, enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(node.forkDigest), basicParams)
node.network.subscribe(getProposerSlashingsTopic(node.forkDigest), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(node.forkDigest), basicParams)
node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest), aggregateTopicParams, enableTopicMetrics = true)
node.network.subscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0), basicParams)
node.network.subscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0), basicParams)
node.network.subscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0), basicParams)
node.network.subscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0), aggregateTopicParams, enableTopicMetrics = true)
node.subscribeAttestationSubnetHandlers()
func getTopicSubscriptionEnabled(node: BeaconNode): bool =
@ -772,15 +767,16 @@ proc removeMessageHandlers(node: BeaconNode) {.raises: [Defect, CatchableError].
node.attestationSubnets.enabled = false
doAssert not node.getTopicSubscriptionEnabled()
node.network.unsubscribe(getBeaconBlocksTopic(node.forkDigest))
node.network.unsubscribe(getVoluntaryExitsTopic(node.forkDigest))
node.network.unsubscribe(getProposerSlashingsTopic(node.forkDigest))
node.network.unsubscribe(getAttesterSlashingsTopic(node.forkDigest))
node.network.unsubscribe(getAggregateAndProofsTopic(node.forkDigest))
# TODO altair-transition
node.network.unsubscribe(getBeaconBlocksTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getVoluntaryExitsTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getProposerSlashingsTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getAttesterSlashingsTopic(node.dag.forkDigests.phase0))
node.network.unsubscribe(getAggregateAndProofsTopic(node.dag.forkDigests.phase0))
for subnet_id in 0'u64 ..< ATTESTATION_SUBNET_COUNT:
node.network.unsubscribe(
getAttestationTopic(node.forkDigest, SubnetId(subnet_id)))
getAttestationTopic(node.dag.forkDigests.phase0, SubnetId(subnet_id)))
proc setupDoppelgangerDetection(node: BeaconNode, slot: Slot) =
# When another client's already running, this is very likely to detect
@ -932,7 +928,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
node.attestationSubnets.proposingSlots,
node.attestationSubnets.lastCalculatedEpoch, slot)
nextActionWaitTime = saturate(fromNow(
node.beaconClock, min(nextAttestationSlot, nextProposalSlot)))
node.dag.beaconClock, min(nextAttestationSlot, nextProposalSlot)))
info "Slot end",
slot = shortLog(slot),
@ -959,7 +955,7 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
# state in anticipation of receiving the next block - we do it after logging
# slot end since the nextActionWaitTime can be short
let
advanceCutoff = node.beaconClock.fromNow(
advanceCutoff = node.dag.beaconClock.fromNow(
slot.toBeaconTime(chronos.seconds(int(SECONDS_PER_SLOT - 1))))
if advanceCutoff.inFuture:
# We wait until there's only a second left before the next slot begins, then
@ -1037,7 +1033,7 @@ proc runSlotLoop(node: BeaconNode, startTime: BeaconTime) {.async.} =
await sleepAsync(timeToNextSlot)
let
wallTime = node.beaconClock.now()
wallTime = node.dag.beaconClock.now()
wallSlot = wallTime.slotOrZero() # Always > GENESIS!
if wallSlot < nextSlot:
@ -1086,7 +1082,7 @@ proc runSlotLoop(node: BeaconNode, startTime: BeaconTime) {.async.} =
curSlot = wallSlot
nextSlot = wallSlot + 1
timeToNextSlot = saturate(node.beaconClock.fromNow(nextSlot))
timeToNextSlot = saturate(node.dag.beaconClock.fromNow(nextSlot))
proc handleMissingBlocks(node: BeaconNode) =
let missingBlocks = node.quarantine.checkMissing()
@ -1118,7 +1114,7 @@ proc startSyncManager(node: BeaconNode) =
node.dag.head.slot
proc getLocalWallSlot(): Slot =
node.beaconClock.now().slotOrZero
node.dag.beaconClock.now.slotOrZero
func getFirstSlotAtFinalizedEpoch(): Slot =
node.dag.finalizedHead.slot
@ -1180,17 +1176,19 @@ proc installMessageValidators(node: BeaconNode) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# These validators stay around the whole time, regardless of which specific
# subnets are subscribed to during any given epoch.
# TODO altair-transition
for it in 0'u64 ..< ATTESTATION_SUBNET_COUNT.uint64:
closureScope:
let subnet_id = SubnetId(it)
node.network.addAsyncValidator(
getAttestationTopic(node.forkDigest, subnet_id),
getAttestationTopic(node.dag.forkDigests.phase0, subnet_id),
# This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): Future[ValidationResult] =
node.processor.attestationValidator(attestation, subnet_id))
node.network.addAsyncValidator(
getAggregateAndProofsTopic(node.forkDigest),
getAggregateAndProofsTopic(node.dag.forkDigests.phase0),
proc(signedAggregateAndProof: SignedAggregateAndProof): Future[ValidationResult] =
node.processor.aggregateValidator(signedAggregateAndProof))
@ -1200,17 +1198,17 @@ proc installMessageValidators(node: BeaconNode) =
node.processor[].blockValidator(signedBlock))
node.network.addValidator(
getAttesterSlashingsTopic(node.forkDigest),
getAttesterSlashingsTopic(node.dag.forkDigests.phase0),
proc (attesterSlashing: AttesterSlashing): ValidationResult =
node.processor[].attesterSlashingValidator(attesterSlashing))
node.network.addValidator(
getProposerSlashingsTopic(node.forkDigest),
getProposerSlashingsTopic(node.dag.forkDigests.phase0),
proc (proposerSlashing: ProposerSlashing): ValidationResult =
node.processor[].proposerSlashingValidator(proposerSlashing))
node.network.addValidator(
getVoluntaryExitsTopic(node.forkDigest),
getVoluntaryExitsTopic(node.dag.forkDigests.phase0),
proc (signedVoluntaryExit: SignedVoluntaryExit): ValidationResult =
node.processor[].voluntaryExitValidator(signedVoluntaryExit))
@ -1246,7 +1244,7 @@ proc run*(node: BeaconNode) {.raises: [Defect, CatchableError].} =
node.installMessageValidators()
let startTime = node.beaconClock.now()
let startTime = node.dag.beaconClock.now()
asyncSpawn runSlotLoop(node, startTime)
asyncSpawn runOnSecondLoop(node)
asyncSpawn runQueueProcessingLoop(node.blockProcessor)
@ -1310,14 +1308,14 @@ proc start(node: BeaconNode) {.raises: [Defect, CatchableError].} =
let
head = node.dag.head
finalizedHead = node.dag.finalizedHead
genesisTime = node.beaconClock.fromNow(toBeaconTime(Slot 0))
genesisTime = node.dag.beaconClock.fromNow(toBeaconTime(Slot 0))
notice "Starting beacon node",
version = fullVersionStr,
enr = node.network.announcedENR.toURI,
peerId = $node.network.switch.peerInfo.peerId,
timeSinceFinalization =
node.beaconClock.now() - finalizedHead.slot.toBeaconTime(),
node.dag.beaconClock.now() - finalizedHead.slot.toBeaconTime(),
head = shortLog(head),
finalizedHead = shortLog(finalizedHead),
SLOTS_PER_EPOCH,
@ -1756,7 +1754,8 @@ proc doCreateTestnet(config: BeaconNodeConf, rng: var BrHmacDrbgContext) {.raise
some(config.bootstrapPort),
some(config.bootstrapPort),
[toFieldPair("eth2", SSZ.encode(getENRForkID(
initialState[].fork, initialState[].genesis_validators_root))),
initialState[].fork.current_version,
initialState[].genesis_validators_root))),
toFieldPair("attnets", SSZ.encode(netMetadata.attnets))])
writeFile(bootstrapFile, bootstrapEnr.tryGet().toURI)

View File

@ -404,14 +404,18 @@ proc installBeaconApiHandlers*(rpcServer: RpcServer, node: BeaconNode) {.
"Beacon node is currently syncing, try again later.")
let head = node.dag.head
if head.slot >= blck.message.slot:
node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck)
# TODO altair-transition
let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0)
node.network.broadcast(blocksTopic, blck)
# The block failed validation, but was successfully broadcast anyway.
# It was not integrated into the beacon node's database.
return 202
else:
let res = await proposeSignedBlock(node, head, AttachedValidator(), blck)
if res == head:
node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck)
# TODO altair-transition
let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0)
node.network.broadcast(blocksTopic, blck)
# The block failed validation, but was successfully broadcast anyway.
# It was not integrated into the beacon node''s database.
return 202

View File

@ -626,12 +626,16 @@ proc installBeaconApiHandlers*(router: var RestRouter, node: BeaconNode) =
return RestApiResponse.jsonError(Http503, BeaconNodeInSyncError)
if head.slot >= blck.message.slot:
node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck)
# TODO altair-transition
let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0)
node.network.broadcast(blocksTopic, blck)
return RestApiResponse.jsonError(Http202, BlockValidationError)
else:
let res = await proposeSignedBlock(node, head, AttachedValidator(), blck)
if res == head:
node.network.broadcast(getBeaconBlocksTopic(node.forkDigest), blck)
# TODO altair-transition
let blocksTopic = getBeaconBlocksTopic(node.dag.forkDigests.phase0)
node.network.broadcast(blocksTopic, blck)
return RestApiResponse.jsonError(Http202, BlockValidationError)
else:
return RestApiResponse.jsonError(Http200, BlockValidationSuccess)

View File

@ -302,7 +302,7 @@ proc installValidatorApiHandlers*(router: var RestRouter, node: BeaconNode) =
dres.get()
for item in proofs:
let wallTime = node.processor.getWallTime()
let wallTime = node.processor.getCurrentBeaconTime()
let res = await node.attestationPool.validateAggregate(
node.processor.batchCrypto, item, wallTime
)

View File

@ -353,6 +353,12 @@ type
# [New in Altair]
sync_aggregate*: SyncAggregate
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.7/specs/altair/p2p-interface.md#metadata
MetaData* = object
seq_number*: uint64
attnets*: BitArray[ATTESTATION_SUBNET_COUNT]
syncnets*: BitArray[SYNC_COMMITTEE_SUBNET_COUNT]
TrustedBeaconBlockBody* = object
## A full verified block
randao_reveal*: TrustedSig

View File

@ -777,6 +777,9 @@ func low*(v: ForkDigest | Version): int = 0
func high*(v: ForkDigest | Version): int = len(v) - 1
func `[]`*(v: ForkDigest | Version, idx: int): byte = array[4, byte](v)[idx]
template bytes*(v: ForkDigest): array[4, byte] =
distinctBase(v)
func shortLog*(s: Slot): uint64 =
s - GENESIS_SLOT

View File

@ -26,6 +26,31 @@ type
of forkPhase0: hbsPhase0*: phase0.HashedBeaconState
of forkAltair: hbsAltair*: altair.HashedBeaconState
BeaconBlockFork* {.pure.} = enum
Phase0
Altair
ForkedSignedBeaconBlock* = object
case kind*: BeaconBlockFork
of BeaconBlockFork.Phase0:
phase0Block*: phase0.SignedBeaconBlock
of BeaconBlockFork.Altair:
altairBlock*: altair.SignedBeaconBlock
ForkedTrustedSignedBeaconBlock* = object
case kind*: BeaconBlockFork
of BeaconBlockFork.Phase0:
phase0Block*: phase0.TrustedSignedBeaconBlock
of BeaconBlockFork.Altair:
altairBlock*: altair.TrustedSignedBeaconBlock
ForkDigests* = object
phase0*: ForkDigest
altair*: ForkDigest
altairTopicPrefix*: string # Used by isAltairTopic
ForkDigestsRef* = ref ForkDigests
# State-related functionality based on ForkedHashedBeaconState instead of BeaconState
# Dispatch functions
@ -177,3 +202,37 @@ func get_previous_epoch*(stateData: ForkedHashedBeaconState): Epoch =
GENESIS_EPOCH
else:
current_epoch - 1
func init*(T: type ForkDigests,
runtimePreset: RuntimePreset,
genesisValidatorsRoot: Eth2Digest): T =
let altairForkDigest = compute_fork_digest(
runtimePreset.ALTAIR_FORK_VERSION,
genesisValidatorsRoot)
T(phase0: compute_fork_digest(
runtimePreset.GENESIS_FORK_VERSION,
genesisValidatorsRoot),
altair: altairForkDigest,
altairTopicPrefix: $altairForkDigest)
template asSigned*(x: phase0.TrustedSignedBeaconBlock or phase0.SigVerifiedBeaconBlock):
phase0.SignedBeaconBlock =
static: # TODO See isomorphicCast
doAssert sizeof(x) == sizeof(phase0.SignedBeaconBlock)
cast[ptr phase0.SignedBeaconBlock](x.unsafeAddr)[]
template asSigned*(x: altair.TrustedSignedBeaconBlock or altair.SigVerifiedBeaconBlock):
altair.SignedBeaconBlock =
static: # TODO See isomorphicCast
doAssert sizeof(x) == sizeof(altair.SignedBeaconBlock)
cast[ptr altair.SignedBeaconBlock](x.unsafeAddr)[]
template asSigned*(x: ForkedTrustedSignedBeaconBlock): ForkedSignedBeaconBlock =
static: # TODO See isomorphicCast
doAssert sizeof(x) == sizeof(ForkedSignedBeaconBlock)
cast[ptr ForkedSignedBeaconBlock](x.unsafeAddr)[]

View File

@ -73,13 +73,14 @@ func getAttestationTopic*(forkDigest: ForkDigest, subnet_id: SubnetId):
## For subscribing and unsubscribing to/from a subnet.
eth2Prefix(forkDigest) & "beacon_attestation_" & $uint64(subnet_id) & "/ssz"
func getENRForkID*(fork: Fork, genesis_validators_root: Eth2Digest): ENRForkID =
func getENRForkID*(fork_version: Version,
genesis_validators_root: Eth2Digest): ENRForkID =
let
current_fork_version = fork.current_version
current_fork_version = fork_version
fork_digest = compute_fork_digest(
current_fork_version, genesis_validators_root)
ENRForkID(
fork_digest: fork_digest,
next_fork_version: current_fork_version,
next_fork_epoch: FAR_FUTURE_EPOCH)
next_fork_epoch: FAR_FUTURE_EPOCH) # TODO altair-transition

View File

@ -11,10 +11,12 @@ import
options, tables, sets, macros,
chronicles, chronos, stew/ranges/bitranges, libp2p/switch,
../spec/[crypto, datatypes, digest, forkedbeaconstate_helpers, network],
../beacon_node_types,
../beacon_node_types, ../beacon_clock,
../networking/eth2_network,
../consensus_object_pools/blockchain_dag
from ../spec/datatypes/altair import nil
logScope:
topics = "sync"
@ -49,6 +51,7 @@ type
BeaconSyncNetworkState* = ref object
dag*: ChainDAGRef
forkDigest*: ForkDigest
getTime*: GetTimeFn
BeaconSyncPeerState* = ref object
statusLastTime*: chronos.Moment
@ -60,6 +63,49 @@ type
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
AltairSignedBeaconBlock* = altair.SignedBeaconBlock
proc readChunkPayload*(conn: Connection, peer: Peer,
MsgType: type ForkedSignedBeaconBlock): Future[NetRes[ForkedSignedBeaconBlock]] {.async.} =
var contextBytes: ForkDigest
try:
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CatchableError as e:
return neterr UnexpectedEOF
if contextBytes == peer.network.forkDigests.phase0:
let res = await readChunkPayload(conn, peer, SignedBeaconBlock)
if res.isOk:
return ok ForkedSignedBeaconBlock(
kind: BeaconBlockFork.Phase0,
phase0Block: res.get)
else:
return err(res.error)
elif contextBytes == peer.network.forkDigests.altair:
let res = await readChunkPayload(conn, peer, AltairSignedBeaconBlock)
if res.isOk:
return ok ForkedSignedBeaconBlock(
kind: BeaconBlockFork.Altair,
altairBlock: res.get)
else:
return err(res.error)
else:
return neterr InvalidContextBytes
proc sendResponseChunk*(response: UntypedResponse,
val: ForkedSignedBeaconBlock): Future[void] =
inc response.writtenChunks
case val.kind
of BeaconBlockFork.Phase0:
response.stream.writeChunk(some ResponseCode.Success,
SSZ.encode(val.phase0Block),
response.peer.network.forkDigests.phase0.bytes)
of BeaconBlockFork.Altair:
response.stream.writeChunk(some ResponseCode.Success,
SSZ.encode(val.altairBlock),
response.peer.network.forkDigests.altair.bytes)
func shortLog*(s: StatusMsg): auto =
(
forkDigest: s.forkDigest,
@ -81,9 +127,11 @@ proc getCurrentStatus*(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} =
let
dag = state.dag
headBlock = dag.head
wallTime = state.getTime()
wallTimeSlot = dag.beaconClock.toBeaconTime(wallTime).slotOrZero
StatusMsg(
forkDigest: state.forkDigest,
forkDigest: state.dag.forkDigestAtSlot(wallTimeSlot),
finalizedRoot:
getStateField(dag.headState.data, finalized_checkpoint).root,
finalizedEpoch:
@ -99,6 +147,7 @@ proc handleStatus(peer: Peer,
proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) {.gcsafe.}
{.pop.} # TODO fix p2p macro for raises
p2pProtocol BeaconSync(version = 1,
networkState = BeaconSyncNetworkState,
peerState = BeaconSyncPeerState):
@ -145,6 +194,10 @@ p2pProtocol BeaconSync(version = 1,
proc getMetaData(peer: Peer): MetaData
{.libp2pProtocol("metadata", 1).} =
return peer.network.phase0Metadata
proc getMetadata_v2(peer: Peer): altair.MetaData
{.libp2pProtocol("metadata", 2).} =
return peer.network.metadata
proc beaconBlocksByRange(
@ -177,7 +230,8 @@ p2pProtocol BeaconSync(version = 1,
doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only"
trace "wrote response block",
slot = blocks[i].slot, roor = shortLog(blocks[i].root)
await response.write(dag.get(blocks[i]).data)
let blk = dag.get(blocks[i]).data
await response.write(blk.asSigned)
debug "Block range request done",
peer, startSlot, count, reqStep, found = count - startIndex
@ -205,7 +259,77 @@ p2pProtocol BeaconSync(version = 1,
for i in 0..<count:
let blockRef = dag.getRef(blockRoots[i])
if not isNil(blockRef):
await response.write(dag.get(blockRef).data)
let blk = dag.get(blockRef).data
await response.write(blk.asSigned)
inc found
peer.updateRequestQuota(found.float * blockResponseCost)
debug "Block root request done",
peer, roots = blockRoots.len, count, found
proc beaconBlocksByRange_v2(
peer: Peer,
startSlot: Slot,
reqCount: uint64,
reqStep: uint64,
response: MultipleChunksResponse[ForkedSignedBeaconBlock])
{.async, libp2pProtocol("beacon_blocks_by_range", 2).} =
trace "got range request", peer, startSlot,
count = reqCount, step = reqStep
if reqCount > 0'u64 and reqStep > 0'u64:
var blocks: array[MAX_REQUEST_BLOCKS, BlockRef]
let
dag = peer.networkState.dag
# Limit number of blocks in response
count = int min(reqCount, blocks.lenu64)
let
endIndex = count - 1
startIndex =
dag.getBlockRange(startSlot, reqStep,
blocks.toOpenArray(0, endIndex))
peer.updateRequestQuota(
blockByRangeLookupCost +
max(0, endIndex - startIndex + 1).float * blockResponseCost)
peer.awaitNonNegativeRequestQuota()
for i in startIndex..endIndex:
doAssert not blocks[i].isNil, "getBlockRange should return non-nil blocks only"
trace "wrote response block",
slot = blocks[i].slot, roor = shortLog(blocks[i].root)
let blk = dag.getForkedBlock(blocks[i])
await response.write(blk.asSigned)
debug "Block range request done",
peer, startSlot, count, reqStep, found = count - startIndex
else:
raise newException(InvalidInputsError, "Empty range requested")
proc beaconBlocksByRoot_v2(
peer: Peer,
# Please note that the SSZ list here ensures that the
# spec constant MAX_REQUEST_BLOCKS is enforced:
blockRoots: BlockRootsList,
response: MultipleChunksResponse[ForkedSignedBeaconBlock])
{.async, libp2pProtocol("beacon_blocks_by_root", 2).} =
if blockRoots.len == 0:
raise newException(InvalidInputsError, "No blocks requested")
let
dag = peer.networkState.dag
count = blockRoots.len
peer.updateRequestQuota(count.float * blockByRootLookupCost)
peer.awaitNonNegativeRequestQuota()
var found = 0
for i in 0..<count:
let blockRef = dag.getRef(blockRoots[i])
if not isNil(blockRef):
let blk = dag.getForkedBlock(blockRef)
await response.write(blk.asSigned)
inc found
peer.updateRequestQuota(found.float * blockResponseCost)
@ -259,7 +383,9 @@ proc handleStatus(peer: Peer,
await peer.handlePeer()
proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef,
forkDigest: ForkDigest) =
forkDigest: ForkDigest,
getTime: GetTimeFn) =
var networkState = network.protocolState(BeaconSync)
networkState.dag = dag
networkState.forkDigest = forkDigest
networkState.getTime = getTime

View File

@ -177,7 +177,9 @@ proc sendAttestation*(
return case ok
of ValidationResult.Accept:
node.network.broadcast(
getAttestationTopic(node.forkDigest, subnet_id), attestation)
# TODO altair-transition
getAttestationTopic(node.dag.forkDigests.phase0, subnet_id),
attestation)
beacon_attestations_sent.inc()
true
else:
@ -187,15 +189,19 @@ proc sendAttestation*(
false
proc sendVoluntaryExit*(node: BeaconNode, exit: SignedVoluntaryExit) =
node.network.broadcast(getVoluntaryExitsTopic(node.forkDigest), exit)
# TODO altair-transition
let exitsTopic = getVoluntaryExitsTopic(node.dag.forkDigests.phase0)
node.network.broadcast(exitsTopic, exit)
proc sendAttesterSlashing*(node: BeaconNode, slashing: AttesterSlashing) =
node.network.broadcast(getAttesterSlashingsTopic(node.forkDigest),
slashing)
# TODO altair-transition
let attesterSlashingsTopic = getAttesterSlashingsTopic(node.dag.forkDigests.phase0)
node.network.broadcast(attesterSlashingsTopic, slashing)
proc sendProposerSlashing*(node: BeaconNode, slashing: ProposerSlashing) =
node.network.broadcast(getProposerSlashingsTopic(node.forkDigest),
slashing)
# TODO altair-transition
let proposerSlashingsTopic = getProposerSlashingsTopic(node.dag.forkDigests.phase0)
node.network.broadcast(proposerSlashingsTopic, slashing)
proc sendAttestation*(node: BeaconNode, attestation: Attestation): Future[bool] =
# For the validator API, which doesn't supply the subnet id.

View File

@ -133,7 +133,7 @@ To mitigate blocking networking and timeshare between Io and compute, blocks are
This in turn calls:
- `storeBlock(Eth2Processor, SignedBeaconBlock, Slot)`
- `addRawBlock(ChainDagRef, QuarantineRef, SignedBeaconBlock, forkChoiceCallback)`
- `addRawBlock(ChainDAGRef, QuarantineRef, SignedBeaconBlock, forkChoiceCallback)`
- trigger sending attestation if relevant
### Steady state (synced to head)