Implement most of the v0.11.1 networking changes

This commit is contained in:
Zahary Karadjov 2020-04-15 05:41:22 +03:00 committed by zah
parent 8d639d9bd7
commit 672f690bf6
13 changed files with 208 additions and 77 deletions

View File

@ -54,7 +54,6 @@ type
BeaconNode = ref object BeaconNode = ref object
nickname: string nickname: string
network: Eth2Node network: Eth2Node
forkVersion: array[4, byte]
netKeys: KeyPair netKeys: KeyPair
requestManager: RequestManager requestManager: RequestManager
db: BeaconChainDB db: BeaconChainDB
@ -65,6 +64,9 @@ type
mainchainMonitor: MainchainMonitor mainchainMonitor: MainchainMonitor
beaconClock: BeaconClock beaconClock: BeaconClock
rpcServer: RpcServer rpcServer: RpcServer
forkDigest: ForkDigest
topicBeaconBlocks: string
topicAggregateAndProofs: string
proc onBeaconBlock*(node: BeaconNode, signedBlock: SignedBeaconBlock) {.gcsafe.} proc onBeaconBlock*(node: BeaconNode, signedBlock: SignedBeaconBlock) {.gcsafe.}
proc updateHead(node: BeaconNode): BlockRef proc updateHead(node: BeaconNode): BlockRef
@ -130,6 +132,16 @@ proc getStateFromSnapshot(conf: BeaconNodeConf, state: var BeaconState): bool =
result = true result = true
proc enrForkIdFromState(state: BeaconState): ENRForkID =
let
forkVer = state.fork.current_version
forkDigest = compute_fork_digest(forkVer, state.genesis_validators_root)
ENRForkID(
fork_digest: forkDigest,
next_fork_version: forkVer,
next_fork_epoch: FAR_FUTURE_EPOCH)
proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} = proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async.} =
let let
netKeys = getPersistentNetKeys(conf) netKeys = getPersistentNetKeys(conf)
@ -186,17 +198,20 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
# monitor # monitor
mainchainMonitor.start() mainchainMonitor.start()
let network = await createEth2Node(conf)
let rpcServer = if conf.rpcEnabled: let rpcServer = if conf.rpcEnabled:
RpcServer.init(conf.rpcAddress, conf.rpcPort) RpcServer.init(conf.rpcAddress, conf.rpcPort)
else: else:
nil nil
let
enrForkId = enrForkIdFromState(blockPool.headState.data.data)
topicBeaconBlocks = getBeaconBlocksTopic(enrForkId.forkDigest)
topicAggregateAndProofs = getAggregateAndProofsTopic(enrForkId.forkDigest)
network = await createEth2Node(conf, enrForkId)
var res = BeaconNode( var res = BeaconNode(
nickname: nickname, nickname: nickname,
network: network, network: network,
forkVersion: blockPool.headState.data.data.fork.current_version,
netKeys: netKeys, netKeys: netKeys,
requestManager: RequestManager.init(network), requestManager: RequestManager.init(network),
db: db, db: db,
@ -207,12 +222,14 @@ proc init*(T: type BeaconNode, conf: BeaconNodeConf): Future[BeaconNode] {.async
mainchainMonitor: mainchainMonitor, mainchainMonitor: mainchainMonitor,
beaconClock: BeaconClock.init(blockPool.headState.data.data), beaconClock: BeaconClock.init(blockPool.headState.data.data),
rpcServer: rpcServer, rpcServer: rpcServer,
forkDigest: enrForkId.forkDigest,
topicBeaconBlocks: topicBeaconBlocks,
topicAggregateAndProofs: topicAggregateAndProofs,
) )
# TODO sync is called when a remote peer is connected - is that the right # TODO sync is called when a remote peer is connected - is that the right
# time to do so? # time to do so?
let sync = network.protocolState(BeaconSync) network.initBeaconSync(blockPool, enrForkId.forkDigest,
sync.init(blockPool, res.forkVersion,
proc(signedBlock: SignedBeaconBlock) = proc(signedBlock: SignedBeaconBlock) =
if signedBlock.message.slot mod SLOTS_PER_EPOCH == 0: if signedBlock.message.slot mod SLOTS_PER_EPOCH == 0:
# TODO this is a hack to make sure that lmd ghost is run regularly # TODO this is a hack to make sure that lmd ghost is run regularly
@ -337,7 +354,7 @@ proc sendAttestation(node: BeaconNode,
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#broadcast-attestation # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#broadcast-attestation
node.network.broadcast( node.network.broadcast(
getAttestationTopic(attestationData.index), attestation) getAttestationTopic(node.forkDigest, attestationData.index), attestation)
if node.config.dumpEnabled: if node.config.dumpEnabled:
SSZ.saveFile( SSZ.saveFile(
@ -431,7 +448,7 @@ proc proposeBlock(node: BeaconNode,
shortLog(newBlockRef.root) & "-" & shortLog(root()) & ".ssz", shortLog(newBlockRef.root) & "-" & shortLog(root()) & ".ssz",
state()) state())
node.network.broadcast(topicBeaconBlocks, newBlock) node.network.broadcast(node.topicBeaconBlocks, newBlock)
beacon_blocks_proposed.inc() beacon_blocks_proposed.inc()
@ -627,7 +644,7 @@ proc broadcastAggregatedAttestations(
# going to happen once per slot, but this is the best way to get at # going to happen once per slot, but this is the best way to get at
# the validator index and private key pair. TODO verify it only has # the validator index and private key pair. TODO verify it only has
# one isSome() with test. # one isSome() with test.
let option_aggregateandproof = let aggregateAndProof =
aggregate_attestations(node.attestationPool, state, aggregate_attestations(node.attestationPool, state,
committee_index.CommitteeIndex, committee_index.CommitteeIndex,
# TODO https://github.com/status-im/nim-beacon-chain/issues/545 # TODO https://github.com/status-im/nim-beacon-chain/issues/545
@ -636,9 +653,14 @@ proc broadcastAggregatedAttestations(
trailing_distance) trailing_distance)
# Don't broadcast when, e.g., this node isn't an aggregator # Don't broadcast when, e.g., this node isn't an aggregator
if option_aggregateandproof.isSome: if aggregateAndProof.isSome:
node.network.broadcast( var signedAP = SignedAggregateAndProof(
topicAggregateAndProof, option_aggregateandproof.get) message: aggregateAndProof.get,
# TODO Make the signing async here
signature: validator.signAggregateAndProof(aggregateAndProof.get,
state.fork,
state.genesis_validators_root))
node.network.broadcast(node.topicAggregateAndProofs, signedAP)
proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} = proc onSlotStart(node: BeaconNode, lastSlot, scheduledSlot: Slot) {.gcsafe, async.} =
## Called at the beginning of a slot - usually every slot, but sometimes might ## Called at the beginning of a slot - usually every slot, but sometimes might
@ -991,7 +1013,7 @@ proc run*(node: BeaconNode) =
node.rpcServer.installRpcHandlers(node) node.rpcServer.installRpcHandlers(node)
node.rpcServer.start() node.rpcServer.start()
waitFor node.network.subscribe(topicBeaconBlocks) do (signedBlock: SignedBeaconBlock): waitFor node.network.subscribe(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock):
onBeaconBlock(node, signedBlock) onBeaconBlock(node, signedBlock)
do (signedBlock: SignedBeaconBlock) -> bool: do (signedBlock: SignedBeaconBlock) -> bool:
let (afterGenesis, slot) = node.beaconClock.now.toSlot() let (afterGenesis, slot) = node.beaconClock.now.toSlot()
@ -1010,7 +1032,7 @@ proc run*(node: BeaconNode) =
closureScope: closureScope:
let ci = it let ci = it
attestationSubscriptions.add(node.network.subscribe( attestationSubscriptions.add(node.network.subscribe(
getAttestationTopic(ci), attestationHandler, getAttestationTopic(node.forkDigest, ci), attestationHandler,
proc(attestation: Attestation): bool = proc(attestation: Attestation): bool =
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/p2p-interface.md#attestation-subnets # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/p2p-interface.md#attestation-subnets
let (afterGenesis, slot) = node.beaconClock.now().toSlot() let (afterGenesis, slot) = node.beaconClock.now().toSlot()
@ -1272,7 +1294,8 @@ when isMainModule:
networkKeys.seckey.asEthKey, networkKeys.seckey.asEthKey,
some(config.bootstrapAddress), some(config.bootstrapAddress),
config.bootstrapPort, config.bootstrapPort,
config.bootstrapPort) config.bootstrapPort,
[toFieldPair("eth2", SSZ.encode(enrForkIdFromState initialState))])
writeFile(bootstrapFile, bootstrapEnr.toURI) writeFile(bootstrapFile, bootstrapEnr.toURI)
echo "Wrote ", bootstrapFile echo "Wrote ", bootstrapFile

View File

@ -3,7 +3,7 @@
# {.push raises: [Defect].} # {.push raises: [Defect].}
import import
os, net, strutils, strformat, parseutils, os, net, sequtils, strutils, strformat, parseutils,
chronicles, stew/[results, objects], eth/keys, eth/trie/db, eth/p2p/enode, chronicles, stew/[results, objects], eth/keys, eth/trie/db, eth/p2p/enode,
eth/p2p/discoveryv5/[enr, protocol, discovery_db, types], eth/p2p/discoveryv5/[enr, protocol, discovery_db, types],
libp2p/[multiaddress, peer], libp2p/[multiaddress, peer],
@ -164,7 +164,8 @@ proc loadBootstrapFile*(bootstrapFile: string,
proc new*(T: type Eth2DiscoveryProtocol, proc new*(T: type Eth2DiscoveryProtocol,
conf: BeaconNodeConf, conf: BeaconNodeConf,
ip: Option[IpAddress], tcpPort, udpPort: Port, ip: Option[IpAddress], tcpPort, udpPort: Port,
rawPrivKeyBytes: openarray[byte]): T = rawPrivKeyBytes: openarray[byte],
enrFields: openarray[(string, seq[byte])]): T =
# TODO # TODO
# Implement more configuration options: # Implement more configuration options:
# * for setting up a specific key # * for setting up a specific key
@ -184,4 +185,6 @@ proc new*(T: type Eth2DiscoveryProtocol,
if fileExists(persistentBootstrapFile): if fileExists(persistentBootstrapFile):
loadBootstrapFile(persistentBootstrapFile, bootNodes, bootEnrs, ourPubKey) loadBootstrapFile(persistentBootstrapFile, bootNodes, bootEnrs, ourPubKey)
newProtocol(pk, db, ip, tcpPort, udpPort, bootEnrs) let enrFieldPairs = mapIt(enrFields, toFieldPair(it[0], it[1]))
newProtocol(pk, db, ip, tcpPort, udpPort, enrFieldPairs, bootEnrs)

View File

@ -4,8 +4,8 @@ import
options as stdOptions, net as stdNet, options as stdOptions, net as stdNet,
# Status libs # Status libs
stew/[varints, base58], stew/shims/[macros, tables], stint, stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint,
faststreams/output_stream, faststreams/output_stream, snappy/framing,
json_serialization, json_serialization/std/[net, options], json_serialization, json_serialization/std/[net, options],
chronos, chronicles, metrics, chronos, chronicles, metrics,
# TODO: create simpler to use libp2p modules that use re-exports # TODO: create simpler to use libp2p modules that use re-exports
@ -21,7 +21,7 @@ import
# Beacon node modules # Beacon node modules
version, conf, eth2_discovery, libp2p_json_serialization, conf, ssz, version, conf, eth2_discovery, libp2p_json_serialization, conf, ssz,
peer_pool peer_pool, spec/[datatypes, network]
import import
eth/p2p/discoveryv5/protocol as discv5_protocol eth/p2p/discoveryv5/protocol as discv5_protocol
@ -48,9 +48,19 @@ type
peerPool*: PeerPool[Peer, PeerID] peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef] protocolStates*: seq[RootRef]
libp2pTransportLoops*: seq[Future[void]] libp2pTransportLoops*: seq[Future[void]]
metadata*: Eth2Metadata
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
Eth2MetaData* = object
seq_number*: uint64
attnets*: BitArray[ATTESTATION_SUBNET_COUNT]
ENRForkID* = object
fork_digest*: ForkDigest
next_fork_version*: Version
next_fork_epoch*: Epoch
Peer* = ref object Peer* = ref object
network*: Eth2Node network*: Eth2Node
info*: PeerInfo info*: PeerInfo
@ -60,6 +70,7 @@ type
protocolStates*: seq[RootRef] protocolStates*: seq[RootRef]
maxInactivityAllowed*: Duration maxInactivityAllowed*: Duration
score*: int score*: int
supportsSnappy: bool
ConnectionState* = enum ConnectionState* = enum
None, None,
@ -121,13 +132,14 @@ type
const const
clientId* = "Nimbus beacon node v" & fullVersionStr clientId* = "Nimbus beacon node v" & fullVersionStr
networkKeyFilename = "privkey.protobuf" networkKeyFilename = "privkey.protobuf"
nodeMetadataFilename = "node-metadata.json"
TCP = net.Protocol.IPPROTO_TCP TCP = net.Protocol.IPPROTO_TCP
HandshakeTimeout = FaultOrError HandshakeTimeout = FaultOrError
# Spec constants # Spec constants
# https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains # https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains
REQ_RESP_MAX_SIZE* = 1 * 1024 * 1024 # bytes MAX_CHUNK_SIZE* = 1 * 1024 * 1024 # bytes
GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
TTFB_TIMEOUT* = 5.seconds TTFB_TIMEOUT* = 5.seconds
RESP_TIMEOUT* = 10.seconds RESP_TIMEOUT* = 10.seconds
@ -213,7 +225,7 @@ proc getRequestProtoName(fn: NimNode): NimNode =
if pragma.len > 0 and $pragma[0] == "libp2pProtocol": if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
let protoName = $(pragma[1]) let protoName = $(pragma[1])
let protoVer = $(pragma[2].intVal) let protoVer = $(pragma[2].intVal)
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/ssz") return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/")
return newLit("") return newLit("")
@ -248,7 +260,7 @@ proc readSizePrefix(conn: Connection,
case parser.feedByte(nextByte) case parser.feedByte(nextByte)
of Done: of Done:
let res = parser.getResult let res = parser.getResult
if res > uint64(REQ_RESP_MAX_SIZE): if res > uint64(MAX_CHUNK_SIZE):
trace "size prefix outside of range", res trace "size prefix outside of range", res
return -1 return -1
else: else:
@ -382,20 +394,25 @@ proc sendErrorResponse(peer: Peer,
await conn.close() await conn.close()
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} = proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
var deadline = sleepAsync RESP_TIMEOUT var
var streamFut = peer.network.openStream(peer, protocolId) deadline = sleepAsync RESP_TIMEOUT
protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz")
streamFut = peer.network.openStream(peer, protocolId)
await streamFut or deadline await streamFut or deadline
if not streamFut.finished: if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but streamFut.cancel()
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
raise newException(TransmissionError, "Failed to open LibP2P stream") raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read let stream = streamFut.read
try: try:
var s = memoryOutput() var s = memoryOutput()
s.appendVarint requestBytes.len.uint64 s.appendVarint requestBytes.len.uint64
s.append requestBytes if peer.supportsSnappy:
framing_format_compress(s, requestBytes)
else:
s.append requestBytes
let bytes = s.getOutput let bytes = s.getOutput
await stream.write(bytes) await stream.write(bytes)
finally: finally:
@ -430,15 +447,15 @@ proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type, ResponseMsg: type,
timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} = timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
var deadline = sleepAsync timeout var
deadline = sleepAsync timeout
protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz")
streamFut = peer.network.openStream(peer, protocolId)
# Open a new LibP2P stream
var streamFut = peer.network.openStream(peer, protocolId)
await streamFut or deadline await streamFut or deadline
if not streamFut.finished: if not streamFut.finished:
# TODO: we are returning here because the deadline passed, but streamFut.cancel()
# the stream can still be opened eventually a bit later. Who is
# going to close it then?
return none(ResponseMsg) return none(ResponseMsg)
let stream = streamFut.read let stream = streamFut.read
@ -446,7 +463,10 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
# Send the request # Send the request
var s = memoryOutput() var s = memoryOutput()
s.appendVarint requestBytes.len.uint64 s.appendVarint requestBytes.len.uint64
s.append requestBytes if peer.supportsSnappy:
framing_format_compress(s, requestBytes)
else:
s.append requestBytes
let bytes = s.getOutput let bytes = s.getOutput
await stream.write(bytes) await stream.write(bytes)
@ -678,15 +698,28 @@ proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
await sleepAsync seconds(1) await sleepAsync seconds(1)
proc init*(T: type Eth2Node, conf: BeaconNodeConf, proc getPersistentNetMetadata*(conf: BeaconNodeConf): Eth2Metadata =
let metadataPath = conf.dataDir / nodeMetadataFilename
if not fileExists(metadataPath):
result = Eth2Metadata()
for i in 0 ..< ATTESTATION_SUBNET_COUNT:
# TODO: For now, we indicate that we participate in all subnets
result.attnets[i] = true
Json.saveFile(metadataPath, result)
else:
result = Json.loadFile(metadataPath, Eth2Metadata)
proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
switch: Switch, ip: Option[IpAddress], tcpPort, udpPort: Port, switch: Switch, ip: Option[IpAddress], tcpPort, udpPort: Port,
privKey: keys.PrivateKey): T = privKey: keys.PrivateKey): T =
new result new result
result.switch = switch result.switch = switch
result.discovery = Eth2DiscoveryProtocol.new(
conf, ip, tcpPort, udpPort, privKey.toRaw)
result.wantedPeers = conf.maxPeers result.wantedPeers = conf.maxPeers
result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers) result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
result.metadata = getPersistentNetMetadata(conf)
result.discovery = Eth2DiscoveryProtocol.new(
conf, ip, tcpPort, udpPort, privKey.toRaw,
{"eth2": SSZ.encode(enrForkId), "attnets": SSZ.encode(result.metadata.attnets)})
newSeq result.protocolStates, allProtocols.len newSeq result.protocolStates, allProtocols.len
for proto in allProtocols: for proto in allProtocols:
@ -897,7 +930,7 @@ proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair =
KeyPair(seckey: privKey, pubkey: privKey.getKey()) KeyPair(seckey: privKey, pubkey: privKey.getKey())
proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async, gcsafe.} = proc createEth2Node*(conf: BeaconNodeConf, enrForkId: ENRForkID): Future[Eth2Node] {.async, gcsafe.} =
var var
(extIp, extTcpPort, extUdpPort) = setupNat(conf) (extIp, extTcpPort, extUdpPort) = setupNat(conf)
hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort) hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort)
@ -913,7 +946,8 @@ proc createEth2Node*(conf: BeaconNodeConf): Future[Eth2Node] {.async, gcsafe.} =
# are running behind a NAT). # are running behind a NAT).
var switch = newStandardSwitch(some keys.seckey, hostAddress, var switch = newStandardSwitch(some keys.seckey, hostAddress,
triggerSelf = true, gossip = true) triggerSelf = true, gossip = true)
result = Eth2Node.init(conf, switch, extIp, extTcpPort, extUdpPort, result = Eth2Node.init(conf, enrForkId, switch,
extIp, extTcpPort, extUdpPort,
keys.seckey.asEthKey) keys.seckey.asEthKey)
proc getPersistenBootstrapAddr*(conf: BeaconNodeConf, proc getPersistenBootstrapAddr*(conf: BeaconNodeConf,
@ -921,7 +955,7 @@ proc getPersistenBootstrapAddr*(conf: BeaconNodeConf,
let pair = getPersistentNetKeys(conf) let pair = getPersistentNetKeys(conf)
return enr.Record.init(1'u64, # sequence number return enr.Record.init(1'u64, # sequence number
pair.seckey.asEthKey, pair.seckey.asEthKey,
some(ip), port, port) some(ip), port, port, @[])
proc announcedENR*(node: Eth2Node): enr.Record = proc announcedENR*(node: Eth2Node): enr.Record =
doAssert node.discovery != nil, "The Eth2Node must be initialized" doAssert node.discovery != nil, "The Eth2Node must be initialized"

View File

@ -184,16 +184,18 @@ proc run(conf: InspectorConf) {.async.} =
if conf.decode: if conf.decode:
try: try:
if ticket.topic.startsWith(topicBeaconBlocks): if ticket.topic.endsWith(topicBeaconBlocksSuffix):
info "SignedBeaconBlock", msg = SSZ.decode(message.data, SignedBeaconBlock) info "SignedBeaconBlock", msg = SSZ.decode(message.data, SignedBeaconBlock)
elif ticket.topic.endsWith(topicAttestationSuffix): elif ticket.topic.endsWith(topicAttestationSuffix):
info "Attestation", msg = SSZ.decode(message.data, Attestation) info "Attestation", msg = SSZ.decode(message.data, Attestation)
elif ticket.topic.startsWith(topicVoluntaryExits): elif ticket.topic.endssWith(topicVoluntaryExitsSuffix):
info "SignedVoluntaryExit", msg = SSZ.decode(message.data, SignedVoluntaryExit) info "SignedVoluntaryExit", msg = SSZ.decode(message.data, SignedVoluntaryExit)
elif ticket.topic.startsWith(topicProposerSlashings): elif ticket.topic.endsWith(topicProposerSlashingsSuffix):
info "ProposerSlashing", msg = SSZ.decode(message.data, ProposerSlashing) info "ProposerSlashing", msg = SSZ.decode(message.data, ProposerSlashing)
elif ticket.topic.startsWith(topicAttesterSlashings): elif ticket.topic.endsWith(topicAttesterSlashingsSuffix):
info "AttesterSlashing", msg = SSZ.decode(message.data, AttesterSlashing) info "AttesterSlashing", msg = SSZ.decode(message.data, AttesterSlashing)
elif ticket.topic.endsWith(topicAggregateAndProofsSuffix):
info "AggregateAndProof", msg = SSZ.decode(message.data, AggregateAndProof)
except CatchableError as exc: except CatchableError as exc:
info "Unable to decode message", msg = exc.msg info "Unable to decode message", msg = exc.msg

View File

@ -141,10 +141,12 @@ type
data*: AttestationData data*: AttestationData
signature*: ValidatorSig signature*: ValidatorSig
Version* = array[4, byte] # TODO Maybe make this distinct
ForkDigest* = array[4, byte] # TODO Maybe make this distinct
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#forkdata # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#forkdata
ForkData* = object ForkData* = object
# TODO: Spec introduced an alias for Version = array[4, byte] current_version*: Version
current_version*: array[4, byte]
genesis_validators_root*: Eth2Digest genesis_validators_root*: Eth2Digest
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#checkpoint # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#checkpoint
@ -328,8 +330,8 @@ type
Fork* = object Fork* = object
# TODO: Spec introduced an alias for Version = array[4, byte] # TODO: Spec introduced an alias for Version = array[4, byte]
# and a default parameter to compute_domain # and a default parameter to compute_domain
previous_version*: array[4, byte] previous_version*: Version
current_version*: array[4, byte] current_version*: Version
epoch*: Epoch ##\ epoch*: Epoch ##\
## Epoch of latest fork ## Epoch of latest fork

View File

@ -131,6 +131,16 @@ func compute_fork_data_root(current_version: array[4, byte],
genesis_validators_root: genesis_validators_root genesis_validators_root: genesis_validators_root
)) ))
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#compute_fork_digest
func compute_fork_digest*(current_version: array[4, byte],
genesis_validators_root: Eth2Digest): array[4, byte] =
# Return the 4-byte fork digest for the ``current_version`` and
# ``genesis_validators_root``.
# This is a digest primarily used for domain separation on the p2p layer.
# 4-bytes suffices for practical separation of forks/chains.
result[0..3] =
compute_fork_data_root(current_version, genesis_validators_root).data[0..3]
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#compute_domain # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/beacon-chain.md#compute_domain
func compute_domain*( func compute_domain*(
domain_type: DomainType, domain_type: DomainType,

View File

@ -5,15 +5,18 @@
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms. # at your option. This file may not be copied, modified, or distributed except according to those terms.
import strformat import
strformat,
stew/byteutils,
datatypes
const const
topicBeaconBlocks* = "/eth2/beacon_block/ssz" topicBeaconBlocksSuffix* = "beacon_block/ssz"
topicAttestationSuffix* = "_beacon_attestation/ssz" topicAttestationsSuffix* = "_beacon_attestation/ssz"
topicVoluntaryExits* = "/eth2/voluntary_exit/ssz" topicVoluntaryExitsSuffix* = "voluntary_exit/ssz"
topicProposerSlashings* = "/eth2/proposer_slashing/ssz" topicProposerSlashingsSuffix* = "proposer_slashing/ssz"
topicAttesterSlashings* = "/eth2/attester_slashing/ssz" topicAttesterSlashingsSuffix* = "attester_slashing/ssz"
topicAggregateAndProof* = "/eth2/beacon_aggregate_and_proof/ssz" topicAggregateAndProofsSuffix* = "beacon_aggregate_and_proof/ssz"
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/p2p-interface.md#configuration # https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/p2p-interface.md#configuration
ATTESTATION_SUBNET_COUNT* = 64 ATTESTATION_SUBNET_COUNT* = 64
@ -23,7 +26,23 @@ const
# This is not part of the spec yet! # This is not part of the spec yet!
defaultEth2RpcPort* = 9090 defaultEth2RpcPort* = 9090
func getAttestationTopic*(committeeIndex: uint64): string = func getBeaconBlocksTopic*(forkDigest: ForkDigest): string =
&"/eth2/{toHex forkDigest}/{topicBeaconBlocksSuffix}"
func getVoluntaryExistsTopic*(forkDigest: ForkDigest): string =
&"/eth2/{toHex forkDigest}/{topicVoluntaryExitsSuffix}"
func getProposerSlashingsTopic*(forkDigest: ForkDigest): string =
&"/eth2/{toHex forkDigest}/{topicProposerSlashingsSuffix}"
func getAttesterSlashingsTopic*(forkDigest: ForkDigest): string =
&"/eth2/{toHex forkDigest}/{topicAttesterSlashingsSuffix}"
func getAggregateAndProofsTopic*(forkDigest: ForkDigest): string =
&"/eth2/{toHex forkDigest}/{topicAggregateAndProofsSuffix}"
func getAttestationTopic*(forkDigest: ForkDigest, committeeIndex: uint64): string =
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#broadcast-attestation # https://github.com/ethereum/eth2.0-specs/blob/v0.11.0/specs/phase0/validator.md#broadcast-attestation
let topicIndex = committeeIndex mod ATTESTATION_SUBNET_COUNT let topicIndex = committeeIndex mod ATTESTATION_SUBNET_COUNT
&"/eth2/committee_index{topicIndex}{topicAttestationSuffix}" &"/eth2/{toHex forkDigest}/committee_index{topicIndex}{topicAttestationsSuffix}"

View File

@ -517,6 +517,19 @@ func get_block_signature*(
blsSign(privKey, signing_root.data) blsSign(privKey, signing_root.data)
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#broadcast-aggregate
func get_aggregate_and_proof_signature*(fork: Fork, genesis_validators_root: Eth2Digest,
aggregate_and_proof: AggregateAndProof,
privKey: ValidatorPrivKey): ValidatorSig =
let
aggregate = aggregate_and_proof.aggregate
domain = get_domain(fork, DOMAIN_AGGREGATE_AND_PROOF,
compute_epoch_at_slot(aggregate.data.slot),
genesis_validators_root)
signing_root = compute_signing_root(aggregate_and_proof, domain)
return blsSign(privKey, signing_root.data)
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#aggregate-signature # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#aggregate-signature
func get_attestation_signature*( func get_attestation_signature*(
fork: Fork, genesis_validators_root: Eth2Digest, attestation: AttestationData, fork: Fork, genesis_validators_root: Eth2Digest, attestation: AttestationData,

View File

@ -20,9 +20,10 @@ type
index: uint32 index: uint32
BeaconBlockCallback* = proc(signedBlock: SignedBeaconBlock) {.gcsafe.} BeaconBlockCallback* = proc(signedBlock: SignedBeaconBlock) {.gcsafe.}
BeaconSyncNetworkState* = ref object BeaconSyncNetworkState* = ref object
blockPool*: BlockPool blockPool*: BlockPool
forkVersion*: array[4, byte] forkDigest*: ForkDigest
onBeaconBlock*: BeaconBlockCallback onBeaconBlock*: BeaconBlockCallback
BeaconSyncPeerState* = ref object BeaconSyncPeerState* = ref object
@ -35,13 +36,6 @@ type
const const
MAX_REQUESTED_BLOCKS = 20'u64 MAX_REQUESTED_BLOCKS = 20'u64
func init*(
v: BeaconSyncNetworkState, blockPool: BlockPool,
forkVersion: array[4, byte], onBeaconBlock: BeaconBlockCallback) =
v.blockPool = blockPool
v.forkVersion = forkVersion
v.onBeaconBlock = onBeaconBlock
proc importBlocks(state: BeaconSyncNetworkState, proc importBlocks(state: BeaconSyncNetworkState,
blocks: openarray[SignedBeaconBlock]) {.gcsafe.} = blocks: openarray[SignedBeaconBlock]) {.gcsafe.} =
for blk in blocks: for blk in blocks:
@ -50,7 +44,7 @@ proc importBlocks(state: BeaconSyncNetworkState,
type type
StatusMsg = object StatusMsg = object
forkVersion*: array[4, byte] forkDigest*: ForkDigest
finalizedRoot*: Eth2Digest finalizedRoot*: Eth2Digest
finalizedEpoch*: Epoch finalizedEpoch*: Epoch
headRoot*: Eth2Digest headRoot*: Eth2Digest
@ -66,7 +60,7 @@ proc getCurrentStatus(state: BeaconSyncNetworkState): StatusMsg {.gcsafe.} =
finalizedEpoch = finalizedHead.slot.compute_epoch_at_slot() finalizedEpoch = finalizedHead.slot.compute_epoch_at_slot()
StatusMsg( StatusMsg(
fork_version: state.forkVersion, forkDigest: state.forkDigest,
finalizedRoot: finalizedHead.blck.root, finalizedRoot: finalizedHead.blck.root,
finalizedEpoch: finalizedEpoch, finalizedEpoch: finalizedEpoch,
headRoot: headRoot, headRoot: headRoot,
@ -111,6 +105,18 @@ p2pProtocol BeaconSync(version = 1,
proc goodbye(peer: Peer, reason: DisconnectionReason) {.libp2pProtocol("goodbye", 1).} proc goodbye(peer: Peer, reason: DisconnectionReason) {.libp2pProtocol("goodbye", 1).}
requestResponse:
proc ping(peer: Peer, value: uint64) {.libp2pProtocol("ping", 1).} =
await response.write(peer.network.metadata.seq_number)
proc pingResp(peer: Peer, value: uint64)
requestResponse:
proc getMetadata(peer: Peer) {.libp2pProtocol("metadata", 1).} =
await response.write(peer.network.metadata)
proc metadataReps(peer: Peer, metadata: Eth2Metadata)
requestResponse: requestResponse:
proc beaconBlocksByRange( proc beaconBlocksByRange(
peer: Peer, peer: Peer,
@ -153,9 +159,9 @@ proc handleInitialStatus(peer: Peer,
state: BeaconSyncNetworkState, state: BeaconSyncNetworkState,
ourStatus: StatusMsg, ourStatus: StatusMsg,
theirStatus: StatusMsg) {.async, gcsafe.} = theirStatus: StatusMsg) {.async, gcsafe.} =
if theirStatus.forkVersion != state.forkVersion: if theirStatus.forkDigest != state.forkDigest:
notice "Irrelevant peer", notice "Irrelevant peer", peer,
peer, theirFork = theirStatus.forkVersion, ourFork = state.forkVersion theirFork = theirStatus.forkDigest, ourFork = state.forkDigest
await peer.disconnect(IrrelevantNetwork) await peer.disconnect(IrrelevantNetwork)
return return
@ -224,3 +230,12 @@ proc handleInitialStatus(peer: Peer,
except CatchableError as e: except CatchableError as e:
warn "Failed to sync with peer", peer, err = e.msg warn "Failed to sync with peer", peer, err = e.msg
proc initBeaconSync*(network: Eth2Node,
blockPool: BlockPool,
forkDigest: ForkDigest,
onBeaconBlock: BeaconBlockCallback) =
var networkState = network.protocolState(BeaconSync)
networkState.blockPool = blockPool
networkState.forkDigest = forkDigest
networkState.onBeaconBlock = onBeaconBlock

View File

@ -57,6 +57,16 @@ proc signAttestation*(v: AttachedValidator,
error "Unimplemented" error "Unimplemented"
quit 1 quit 1
proc signAggregateAndProof*(v: AttachedValidator,
aggregate_and_proof: AggregateAndProof,
fork: Fork, genesis_validators_root: Eth2Digest): ValidatorSig =
if v.kind == inProcess:
result = get_aggregate_and_proof_signature(
fork, genesis_validators_root, aggregate_and_proof, v.privKey)
else:
error "Out of process signAggregateAndProof not implemented"
quit 1
# https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#randao-reveal # https://github.com/ethereum/eth2.0-specs/blob/v0.11.1/specs/phase0/validator.md#randao-reveal
func genRandaoReveal*(k: ValidatorPrivKey, fork: Fork, func genRandaoReveal*(k: ValidatorPrivKey, fork: Fork,
genesis_validators_root: Eth2Digest, slot: Slot): ValidatorSig = genesis_validators_root: Eth2Digest, slot: Slot): ValidatorSig =

View File

@ -12,7 +12,7 @@ mkdir -p "$VALIDATORS_DIR"
cd "$GIT_ROOT" cd "$GIT_ROOT"
CUSTOM_NIMFLAGS="${NIMFLAGS} -d:chronicles_sinks:textlines,json[file]" CUSTOM_NIMFLAGS="${NIMFLAGS} -d:useSysAsserts -d:chronicles_sinks:textlines,json[file]"
# Run with "SLOTS_PER_EPOCH=8 ./start.sh" to change these # Run with "SLOTS_PER_EPOCH=8 ./start.sh" to change these
DEFS="" DEFS=""

View File

@ -19,7 +19,7 @@ asyncTest "connect two nodes":
var n1PersistentAddress = c1.getPersistenBootstrapAddr( var n1PersistentAddress = c1.getPersistenBootstrapAddr(
parseIpAddress("127.0.0.1"), Port c1.tcpPort) parseIpAddress("127.0.0.1"), Port c1.tcpPort)
var n1 = await createEth2Node(c1) var n1 = await createEth2Node(c1, ENRForkID())
echo "Node 1 persistent address: ", n1PersistentAddress echo "Node 1 persistent address: ", n1PersistentAddress
@ -33,7 +33,7 @@ asyncTest "connect two nodes":
c2.dataDir = OutDir(tempDir / "node-2") c2.dataDir = OutDir(tempDir / "node-2")
c2.tcpPort = 50001 c2.tcpPort = 50001
c2.nat = "none" c2.nat = "none"
var n2 = await createEth2Node(c2) var n2 = await createEth2Node(c2, ENRForkID())
await n2.connectToNetwork(@[n1PersistentAddress]) await n2.connectToNetwork(@[n1PersistentAddress])

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit 8f3bf360540f7aaeba21b0a4296ae70af6734c21 Subproject commit bac62483016f7e66fcf2bfbf83372e9e4dfbc24f