nimbus-eth2/beacon_chain/networking/eth2_network.nim

1895 lines
68 KiB
Nim
Raw Normal View History

# beacon_chain
# Copyright (c) 2018-2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
2020-03-22 21:55:01 +00:00
# Std lib
2021-06-25 00:07:46 +00:00
std/[typetraits, sequtils, os, algorithm, math, sets, strutils],
2020-08-02 19:27:36 +00:00
std/options as stdOptions,
2020-03-22 21:55:01 +00:00
# Status libs
stew/[leb128, base58, endians2, results, byteutils, io2], bearssl,
2020-06-05 15:08:50 +00:00
stew/shims/net as stewNet,
2020-05-20 17:05:22 +00:00
stew/shims/[macros, tables],
faststreams/[inputs, outputs, buffers], snappy, snappy/framing,
2020-03-22 21:55:01 +00:00
json_serialization, json_serialization/std/[net, options],
chronos, chronicles, metrics,
libp2p/[switch, peerinfo, multicodec,
multiaddress, crypto/crypto, crypto/secp,
2021-04-05 20:21:27 +00:00
protocols/identify, protocols/protocol,
builders],
libp2p/muxers/muxer, libp2p/muxers/mplex/mplex,
libp2p/transports/[transport, tcptransport],
libp2p/protocols/secure/[secure, noise],
libp2p/protocols/pubsub/[pubsub, gossipsub, rpc/message, rpc/messages],
2020-05-20 04:57:39 +00:00
libp2p/transports/tcptransport,
libp2p/stream/connection,
libp2p/utils/semaphore,
eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl,
eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2],
".."/[
version, conf,
ssz/ssz_serialization, beacon_clock],
../spec/datatypes/[phase0, altair],
../spec/[digest, network, helpers, forks],
../validators/keystore_management,
./eth2_discovery, ./peer_pool, ./libp2p_json_serialization
2020-03-22 21:55:01 +00:00
when chronicles.enabledLogLevel == LogLevel.TRACE:
import std/sequtils
2020-03-22 21:55:01 +00:00
export
version, multiaddress, peer_pool, peerinfo, p2pProtocol, connection,
libp2p_json_serialization, ssz_serialization, results, eth2_discovery
2020-03-22 21:55:01 +00:00
logScope:
topics = "networking"
type
NetKeyPair* = crypto.KeyPair
PublicKey* = crypto.PublicKey
PrivateKey* = crypto.PrivateKey
2020-03-22 21:55:01 +00:00
Bytes = seq[byte]
ErrorMsg = List[byte, 256]
2020-03-22 21:55:01 +00:00
# TODO: This is here only to eradicate a compiler
# warning about unused import (rpc/messages).
GossipMsg = messages.Message
SeenItem* = object
2020-08-02 19:27:36 +00:00
peerId*: PeerID
stamp*: chronos.Moment
2020-03-22 21:55:01 +00:00
Eth2Node* = ref object of RootObj
switch*: Switch
2020-11-29 19:07:20 +00:00
pubsub*: GossipSub
2020-03-22 21:55:01 +00:00
discovery*: Eth2DiscoveryProtocol
2020-08-24 11:52:06 +00:00
discoveryEnabled*: bool
2020-03-22 21:55:01 +00:00
wantedPeers*: int
peerPool*: PeerPool[Peer, PeerID]
protocolStates*: seq[RootRef]
libp2pTransportLoops*: seq[Future[void]]
metadata*: altair.MetaData
connectTimeout*: chronos.Duration
seenThreshold*: chronos.Duration
2020-08-02 19:27:36 +00:00
connQueue: AsyncQueue[PeerAddr]
seenTable: Table[PeerID, SeenItem]
connWorkers: seq[Future[void]]
2020-08-02 19:27:36 +00:00
connTable: HashSet[PeerID]
forkId*: ENRForkID
forkDigests*: ForkDigestsRef
rng*: ref BrHmacDrbgContext
peers*: Table[PeerID, Peer]
validTopics: HashSet[string]
2020-03-22 21:55:01 +00:00
EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers
2020-07-25 06:12:23 +00:00
AverageThroughput* = object
count*: uint64
average*: float
2020-03-22 21:55:01 +00:00
Peer* = ref object
network*: Eth2Node
info*: PeerInfo
discoveryId*: Eth2DiscoveryId
connectionState*: ConnectionState
protocolStates*: seq[RootRef]
2020-07-25 06:12:23 +00:00
netThroughput: AverageThroughput
2020-03-22 21:55:01 +00:00
score*: int
requestQuota*: float
lastReqTime*: Moment
connections*: int
enr*: Option[enr.Record]
direction*: PeerType
disconnectedFut: Future[void]
2020-03-22 21:55:01 +00:00
2020-08-02 19:27:36 +00:00
PeerAddr* = object
peerId*: PeerID
addrs*: seq[MultiAddress]
2020-03-22 21:55:01 +00:00
ConnectionState* = enum
None,
Connecting,
Connected,
Disconnecting,
Disconnected
UntypedResponse* = ref object
2020-03-22 21:55:01 +00:00
peer*: Peer
2020-03-22 23:23:21 +00:00
stream*: Connection
writtenChunks*: int
2020-03-22 21:55:01 +00:00
SingleChunkResponse*[MsgType] = distinct UntypedResponse
## Protocol requests using this type will produce request-making
## client-side procs that return `NetRes[MsgType]`
MultipleChunksResponse*[MsgType] = distinct UntypedResponse
## Protocol requests using this type will produce request-making
## client-side procs that return `NetRes[seq[MsgType]]`.
## In the future, such procs will return an `InputStream[NetRes[MsgType]]`.
2020-03-22 21:55:01 +00:00
MessageInfo* = object
name*: string
# Private fields:
libp2pCodecName: string
protocolMounter*: MounterProc
ProtocolInfoObj* = object
name*: string
messages*: seq[MessageInfo]
index*: int # the position of the protocol in the
# ordered list of supported protocols
# Private fields:
peerStateInitializer*: PeerStateInitializer
networkStateInitializer*: NetworkStateInitializer
onPeerConnected*: OnPeerConnectedHandler
onPeerDisconnected*: OnPeerDisconnectedHandler
2020-03-22 21:55:01 +00:00
ProtocolInfo* = ptr ProtocolInfoObj
ResponseCode* = enum
Success
InvalidRequest
ServerError
PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [Defect].}
NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe, raises: [Defect].}
OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.gcsafe.}
OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.gcsafe, raises: [Defect].}
2020-03-22 21:55:01 +00:00
ThunkProc* = LPProtoHandler
MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [Defect, CatchableError].}
MessageContentPrinter* = proc(msg: pointer): string {.gcsafe, raises: [Defect].}
2020-03-22 21:55:01 +00:00
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#goodbye
2020-03-22 21:55:01 +00:00
DisconnectionReason* = enum
# might see other values on the wire!
ClientShutDown = 1
IrrelevantNetwork = 2
FaultOrError = 3
# Clients MAY use reason codes above 128 to indicate alternative,
# erroneous request-specific responses.
PeerScoreLow = 237 # 79 * 3
2020-03-22 21:55:01 +00:00
PeerDisconnected* = object of CatchableError
reason*: DisconnectionReason
TransmissionError* = object of CatchableError
Eth2NetworkingErrorKind* = enum
BrokenConnection
ReceivedErrorResponse
UnexpectedEOF
PotentiallyExpectedEOF
InvalidResponseCode
InvalidSnappyBytes
InvalidSszBytes
StreamOpenTimeout
ReadResponseTimeout
ZeroSizePrefix
SizePrefixOverflow
InvalidContextBytes
Eth2NetworkingError = object
case kind*: Eth2NetworkingErrorKind
of ReceivedErrorResponse:
responseCode: ResponseCode
Implement split preset/config support (#2710) * Implement split preset/config support This is the initial bulk refactor to introduce runtime config values in a number of places, somewhat replacing the existing mechanism of loading network metadata. It still needs more work, this is the initial refactor that introduces runtime configuration in some of the places that need it. The PR changes the way presets and constants work, to match the spec. In particular, a "preset" now refers to the compile-time configuration while a "cfg" or "RuntimeConfig" is the dynamic part. A single binary can support either mainnet or minimal, but not both. Support for other presets has been removed completely (can be readded, in case there's need). There's a number of outstanding tasks: * `SECONDS_PER_SLOT` still needs fixing * loading custom runtime configs needs redoing * checking constants against YAML file * yeerongpilly support `build/nimbus_beacon_node --network=yeerongpilly --discv5:no --log-level=DEBUG` * load fork epoch from config * fix fork digest sent in status * nicer error string for request failures * fix tools * one more * fixup * fixup * fixup * use "standard" network definition folder in local testnet Files are loaded from their standard locations, including genesis etc, to conform to the format used in the `eth2-networks` repo. * fix launch scripts, allow unknown config values * fix base config of rest test * cleanups * bundle mainnet config using common loader * fix spec links and names * only include supported preset in binary * drop yeerongpilly, add altair-devnet-0, support boot_enr.yaml
2021-07-12 13:01:38 +00:00
errorMsg: string
else:
discard
InvalidInputsError* = object of CatchableError
NetRes*[T] = Result[T, Eth2NetworkingError]
## This is type returned from all network requests
func phase0metadata*(node: Eth2Node): phase0.MetaData =
phase0.MetaData(
seq_number: node.metadata.seq_number,
attnets: node.metadata.attnets)
const
clientId* = "Nimbus beacon node " & fullVersionStr
nodeMetadataFilename = "node-metadata.json"
NewPeerScore* = 200
## Score which will be assigned to new connected Peer
PeerScoreLowLimit* = 0
## Score after which peer will be kicked
PeerScoreHighLimit* = 1000
## Max value of peer's score
PeerScoreInvalidRequest* = -500
## This peer is sending malformed or nonsensical data
ConcurrentConnections = 10
## Maximum number of active concurrent connection requests.
SeenTableTimeTimeout =
when not defined(local_testnet): 5.minutes else: 10.seconds
## Seen period of time for timeout connections
SeenTableTimeDeadPeer =
when not defined(local_testnet): 5.minutes else: 10.seconds
## Period of time for dead peers.
SeenTableTimeIrrelevantNetwork = 24.hours
## Period of time for `IrrelevantNetwork` error reason.
SeenTableTimeClientShutDown = 10.minutes
## Period of time for `ClientShutDown` error reason.
SeenTableTimeFaultOrError = 10.minutes
## Period of time for `FaultOnError` error reason.
SeenTablePenaltyError = 60.minutes
## Period of time for peers which score below or equal to zero.
SeenTableTimeReconnect = 1.minutes
## Minimal time between disconnection and reconnection attempt
template neterr*(kindParam: Eth2NetworkingErrorKind): auto =
err(type(result), Eth2NetworkingError(kind: kindParam))
# Metrics for tracking attestation and beacon block loss
declareCounter nbc_gossip_messages_sent,
"Number of gossip messages sent by this peer"
declareCounter nbc_gossip_messages_received,
"Number of gossip messages received by this peer"
declareCounter nbc_successful_dials,
2020-03-22 23:23:21 +00:00
"Number of successfully dialed peers"
2020-03-22 21:55:01 +00:00
declareCounter nbc_failed_dials,
"Number of dialing attempts that failed"
declareCounter nbc_timeout_dials,
"Number of dialing attempts that exceeded timeout"
declareGauge nbc_peers,
2020-03-22 23:23:21 +00:00
"Number of active libp2p peers"
2020-03-22 21:55:01 +00:00
declareCounter nbc_successful_discoveries,
"Number of successfull discoveries"
declareCounter nbc_failed_discoveries,
"Number of failed discoveries"
const delayBuckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0]
declareHistogram nbc_resolve_time,
"Time(s) used while resolving peer information",
buckets = delayBuckets
const
snappy_implementation {.strdefine.} = "libp2p"
const useNativeSnappy = when snappy_implementation == "native": true
elif snappy_implementation == "libp2p": false
else: {.fatal: "Please set snappy_implementation to either 'libp2p' or 'native'".}
const
libp2p_pki_schemes {.strdefine.} = ""
when libp2p_pki_schemes != "secp256k1":
{.fatal: "Incorrect building process, please use -d:\"libp2p_pki_schemes=secp256k1\"".}
const
NetworkInsecureKeyPassword = "INSECUREPASSWORD"
2020-03-22 21:55:01 +00:00
template libp2pProtocol*(name: string, version: int) {.pragma.}
func shortLog*(peer: Peer): string = shortLog(peer.info.peerId)
chronicles.formatIt(Peer): shortLog(it)
chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet())
2020-03-22 21:55:01 +00:00
template remote*(peer: Peer): untyped =
peer.info.peerId
proc openStream(node: Eth2Node,
peer: Peer,
protocolId: string): Future[Connection] {.async.} =
# When dialling here, we do not provide addresses - all new connection
# attempts are handled via `connect` which also takes into account
# reconnection timeouts
let
protocolId = protocolId & "ssz_snappy"
conn = await dial(
node.switch, peer.info.peerId, protocolId)
# libp2p may replace peerinfo ref sometimes, so make sure we have a recent
# one
if conn.peerInfo != nil:
peer.info = conn.peerInfo
return conn
2020-03-22 21:55:01 +00:00
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}
func peerId*(node: Eth2Node): PeerID =
node.switch.peerInfo.peerId
func enrRecord*(node: Eth2Node): Record =
node.discovery.localNode.record
proc getPeer*(node: Eth2Node, peerId: PeerID): Peer =
node.peers.withValue(peerId, peer) do:
return peer[]
do:
let peer = Peer.init(node, PeerInfo.init(peerId))
return node.peers.mgetOrPut(peerId, peer)
2020-03-22 21:55:01 +00:00
proc peerFromStream(network: Eth2Node, conn: Connection): Peer =
result = network.getPeer(conn.peerInfo.peerId)
result.info = conn.peerInfo
2020-03-22 21:55:01 +00:00
proc getKey*(peer: Peer): PeerID {.inline.} =
peer.info.peerId
2020-03-22 21:55:01 +00:00
proc getFuture*(peer: Peer): Future[void] {.inline.} =
if isNil(peer.disconnectedFut):
peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut")
peer.disconnectedFut
2020-03-22 21:55:01 +00:00
2020-04-20 14:59:18 +00:00
proc getScore*(a: Peer): int =
2020-07-25 06:12:23 +00:00
## Returns current score value for peer ``peer``.
a.score
2020-04-20 14:59:18 +00:00
proc updateScore*(peer: Peer, score: int) {.inline.} =
## Update peer's ``peer`` score with value ``score``.
peer.score = peer.score + score
if peer.score > PeerScoreHighLimit:
peer.score = PeerScoreHighLimit
proc calcThroughput(dur: Duration, value: uint64): float =
2020-07-25 06:12:23 +00:00
let secs = float(chronos.seconds(1).nanoseconds)
if isZero(dur):
0.0
else:
float(value) * (secs / float(dur.nanoseconds))
proc updateNetThroughput*(peer: Peer, dur: Duration,
bytesCount: uint64) {.inline.} =
## Update peer's ``peer`` network throughput.
let bytesPerSecond = calcThroughput(dur, bytesCount)
let a = peer.netThroughput.average
let n = peer.netThroughput.count
peer.netThroughput.average = a + (bytesPerSecond - a) / float(n + 1)
inc(peer.netThroughput.count)
proc netBps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Bps for peer ``peer``.
round((peer.netThroughput.average * 10_000) / 10_000)
proc netKbps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Kbps for peer ``peer``.
round(((peer.netThroughput.average / 1024) * 10_000) / 10_000)
proc netMbps*(peer: Peer): float {.inline.} =
## Returns current network throughput average value in Mbps for peer ``peer``.
round(((peer.netThroughput.average / (1024 * 1024)) * 10_000) / 10_000)
proc `<`*(a, b: Peer): bool =
## Comparison function, which first checks peer's scores, and if the peers'
## score is equal it compares peers' network throughput.
if a.score < b.score:
true
elif a.score == b.score:
(a.netThroughput.average < b.netThroughput.average)
else:
false
const
maxRequestQuota = 1000000.0
fullReplenishTime = 5.seconds
replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float)
proc updateRequestQuota*(peer: Peer, reqCost: float) =
let
currentTime = now(chronos.Moment)
nanosSinceLastReq = nanoseconds(currentTime - peer.lastReqTime)
replenishedQuota = peer.requestQuota + nanosSinceLastReq.float * replenishRate
peer.lastReqTime = currentTime
peer.requestQuota = min(replenishedQuota, maxRequestQuota) - reqCost
template awaitNonNegativeRequestQuota*(peer: Peer) =
let quota = peer.requestQuota
if quota < 0:
await sleepAsync(nanoseconds(int((-quota) / replenishRate)))
func allowedOpsPerSecondCost*(n: int): float =
(replenishRate * 1000000000'f / n.float)
2020-08-02 19:27:36 +00:00
proc isSeen*(network: ETh2Node, peerId: PeerID): bool =
## Returns ``true`` if ``peerId`` present in SeenTable and time period is not
## yet expired.
let currentTime = now(chronos.Moment)
2020-08-02 19:27:36 +00:00
if peerId notin network.seenTable:
false
else:
let item = try: network.seenTable[peerId]
except KeyError: raiseAssert "checked with notin"
if currentTime >= item.stamp:
# Peer is in SeenTable, but the time period has expired.
network.seenTable.del(peerId)
false
else:
true
2020-08-02 19:27:36 +00:00
proc addSeen*(network: ETh2Node, peerId: PeerID,
period: chronos.Duration) =
## Adds peer with PeerID ``peerId`` to SeenTable and timeout ``period``.
2020-08-02 19:27:36 +00:00
let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period)
withValue(network.seenTable, peerId, entry) do:
if entry.stamp < item.stamp:
entry.stamp = item.stamp
do:
network.seenTable[peerId] = item
2020-03-22 21:55:01 +00:00
proc disconnect*(peer: Peer, reason: DisconnectionReason,
notifyOtherPeer = false) {.async.} =
# TODO(zah): How should we notify the other peer?
try:
if peer.connectionState notin {Disconnecting, Disconnected}:
peer.connectionState = Disconnecting
# We adding peer in SeenTable before actual disconnect to avoid races.
let seenTime = case reason
of ClientShutDown:
SeenTableTimeClientShutDown
of IrrelevantNetwork:
SeenTableTimeIrrelevantNetwork
of FaultOrError:
SeenTableTimeFaultOrError
of PeerScoreLow:
SeenTablePenaltyError
peer.network.addSeen(peer.info.peerId, seenTime)
await peer.network.switch.disconnect(peer.info.peerId)
except CatchableError:
# We do not care about exceptions in disconnection procedure.
trace "Exception while disconnecting peer", peer = peer.info.peerId,
reason = reason
2020-03-22 21:55:01 +00:00
include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing
proc getRequestProtoName(fn: NimNode): NimNode =
# `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
# (TODO: file as an issue)
let pragmas = fn.pragma
if pragmas.kind == nnkPragma and pragmas.len > 0:
for pragma in pragmas:
try:
if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
let protoName = $(pragma[1])
let protoVer = $(pragma[2].intVal)
return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/")
except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454
2020-03-22 21:55:01 +00:00
return newLit("")
proc writeChunk*(conn: Connection,
responseCode: Option[ResponseCode],
payload: Bytes,
contextBytes: openarray[byte] = []): Future[void] =
var output = memoryOutput()
try:
if responseCode.isSome:
output.write byte(responseCode.get)
if contextBytes.len > 0:
output.write contextBytes
output.write toBytes(payload.lenu64, Leb128).toOpenArray()
framingFormatCompress(output, payload)
except IOError as exc:
raiseAssert exc.msg # memoryOutput shouldn't raise
try:
conn.write(output.getOutput)
except Exception as exc: # TODO fix libp2p
raiseAssert exc.msg
2020-03-22 21:55:01 +00:00
template errorMsgLit(x: static string): ErrorMsg =
const val = ErrorMsg toBytes(x)
val
func formatErrorMsg(msg: ErrorMsg): string =
# ErrorMsg "usually" contains a human-readable string - we'll try to parse it
# as ASCII and return hex if that fails
for c in msg:
if c < 32 or c > 127:
return byteutils.toHex(asSeq(msg))
string.fromBytes(asSeq(msg))
2020-03-22 21:55:01 +00:00
proc sendErrorResponse(peer: Peer,
2020-03-22 23:23:21 +00:00
conn: Connection,
2020-03-22 21:55:01 +00:00
responseCode: ResponseCode,
errMsg: ErrorMsg): Future[void] =
debug "Error processing request",
peer, responseCode, errMsg = formatErrorMsg(errMsg)
conn.writeChunk(some responseCode, SSZ.encode(errMsg))
2020-03-22 21:55:01 +00:00
proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async.} =
var
deadline = sleepAsync RESP_TIMEOUT
streamFut = peer.network.openStream(peer, protocolId)
2020-03-22 21:55:01 +00:00
await streamFut or deadline
2020-03-22 21:55:01 +00:00
if not streamFut.finished:
await streamFut.cancelAndWait()
2020-03-22 21:55:01 +00:00
raise newException(TransmissionError, "Failed to open LibP2P stream")
let stream = streamFut.read
try:
await stream.writeChunk(none ResponseCode, requestBytes)
finally:
await stream.close()
2020-03-22 21:55:01 +00:00
proc sendResponseChunkBytes(response: UntypedResponse, payload: Bytes): Future[void] =
inc response.writtenChunks
response.stream.writeChunk(some Success, payload)
proc sendResponseChunk*(response: UntypedResponse, val: auto): Future[void] =
inc response.writtenChunks
response.stream.writeChunk(some Success, SSZ.encode(val))
2020-03-22 21:55:01 +00:00
template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResultFut: Future): untyped =
let handlerRes = await handlerResultFut
writeChunk(stream, some Success, SSZ.encode(handlerRes))
2020-03-22 21:55:01 +00:00
template sendUserHandlerResultAsChunkImpl*(stream: Connection,
handlerResult: auto): untyped =
writeChunk(stream, some Success, SSZ.encode(handlerResult))
2020-03-22 21:55:01 +00:00
when useNativeSnappy:
include faststreams_backend
else:
include libp2p_streams_backend
2020-03-22 21:55:01 +00:00
proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
ResponseMsg: type,
timeout: Duration): Future[NetRes[ResponseMsg]]
{.async.} =
var deadline = sleepAsync timeout
let stream = awaitWithTimeout(peer.network.openStream(peer, protocolId),
deadline): return neterr StreamOpenTimeout
try:
# Send the request
await stream.writeChunk(none ResponseCode, requestBytes)
# Half-close the stream to mark the end of the request - if this is not
# done, the other peer might never send us the response.
await stream.close()
# Read the response
return
await readResponse(when useNativeSnappy: libp2pInput(stream) else: stream,
peer, ResponseMsg, timeout)
finally:
await stream.closeWithEOF()
2020-03-22 21:55:01 +00:00
proc init*[MsgType](T: type MultipleChunksResponse[MsgType],
peer: Peer, conn: Connection): T =
T(UntypedResponse(peer: peer, stream: conn))
proc init*[MsgType](T: type SingleChunkResponse[MsgType],
peer: Peer, conn: Connection): T =
T(UntypedResponse(peer: peer, stream: conn))
template write*[M](r: MultipleChunksResponse[M], val: M): untyped =
mixin sendResponseChunk
sendResponseChunk(UntypedResponse(r), val)
template send*[M](r: SingleChunkResponse[M], val: M): untyped =
mixin sendResponseChunk
doAssert UntypedResponse(r).writtenChunks == 0
sendResponseChunk(UntypedResponse(r), val)
2020-03-22 21:55:01 +00:00
proc performProtocolHandshakes*(peer: Peer, incoming: bool) {.async.} =
# Loop down serially because it's easier to reason about the connection state
# when there are fewer async races, specially during setup
2020-03-22 21:55:01 +00:00
for protocol in allProtocols:
if protocol.onPeerConnected != nil:
await protocol.onPeerConnected(peer, incoming)
2020-03-22 21:55:01 +00:00
proc initProtocol(name: string,
peerInit: PeerStateInitializer,
networkInit: NetworkStateInitializer): ProtocolInfoObj =
ProtocolInfoObj(
name: name,
messages: @[],
peerStateInitializer: peerInit,
networkStateInitializer: networkInit)
2020-03-22 21:55:01 +00:00
proc registerProtocol(protocol: ProtocolInfo) =
# TODO: This can be done at compile-time in the future
let pos = lowerBound(gProtocols, protocol)
gProtocols.insert(protocol, pos)
for i in 0 ..< gProtocols.len:
gProtocols[i].index = i
proc setEventHandlers(p: ProtocolInfo,
onPeerConnected: OnPeerConnectedHandler,
onPeerDisconnected: OnPeerDisconnectedHandler) =
p.onPeerConnected = onPeerConnected
p.onPeerDisconnected = onPeerDisconnected
2020-03-22 21:55:01 +00:00
proc implementSendProcBody(sendProc: SendProc) =
2020-03-22 21:55:01 +00:00
let
msg = sendProc.msg
UntypedResponse = bindSym "UntypedResponse"
2020-03-22 21:55:01 +00:00
proc sendCallGenerator(peer, bytes: NimNode): NimNode =
if msg.kind != msgResponse:
let msgProto = getRequestProtoName(msg.procDef)
case msg.kind
of msgRequest:
let ResponseRecord = msg.response.recName
2020-03-22 21:55:01 +00:00
quote:
makeEth2Request(`peer`, `msgProto`, `bytes`,
`ResponseRecord`, `timeoutVar`)
2020-03-22 21:55:01 +00:00
else:
quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
else:
quote: sendResponseChunkBytes(`UntypedResponse`(`peer`), `bytes`)
2020-03-22 21:55:01 +00:00
sendProc.useStandardBody(nil, nil, sendCallGenerator)
proc handleIncomingStream(network: Eth2Node,
conn: Connection,
MsgType: type) {.async.} =
2020-03-22 21:55:01 +00:00
mixin callUserHandler, RecType
type MsgRec = RecType(MsgType)
const msgName {.used.} = typetraits.name(MsgType)
2020-03-22 21:55:01 +00:00
## Uncomment this to enable tracing on all incoming requests
## You can include `msgNameLit` in the condition to select
## more specific requests:
# when chronicles.runtimeFilteringEnabled:
# setLogLevel(LogLevel.TRACE)
# defer: setLogLevel(LogLevel.DEBUG)
2020-03-22 23:23:21 +00:00
# trace "incoming " & `msgNameLit` & " conn"
2020-03-22 21:55:01 +00:00
let peer = peerFromStream(network, conn)
2020-03-22 21:55:01 +00:00
try:
case peer.connectionState
of Disconnecting, Disconnected, None:
# We got incoming stream request while disconnected or disconnecting.
debug "Got incoming request from disconnected peer", peer = peer,
message = msgName
await conn.closeWithEOF()
return
of Connecting:
# We got incoming stream request while handshake is not yet finished,
# TODO: We could check it here.
debug "Got incoming request from peer while in handshake", peer = peer,
msgName
of Connected:
# We got incoming stream from peer with proper connection state.
debug "Got incoming request from peer", peer = peer, msgName
template returnInvalidRequest(msg: ErrorMsg) =
peer.updateScore(PeerScoreInvalidRequest)
await sendErrorResponse(peer, conn, InvalidRequest, msg)
return
template returnInvalidRequest(msg: string) =
returnInvalidRequest(ErrorMsg msg.toBytes)
let s = when useNativeSnappy:
let fs = libp2pInput(conn)
if fs.timeoutToNextByte(TTFB_TIMEOUT):
returnInvalidRequest(errorMsgLit "Request first byte not sent in time")
fs
else:
# TODO(zah) The TTFB timeout is not implemented in LibP2P streams back-end
conn
let deadline = sleepAsync RESP_TIMEOUT
let msg = if sizeof(MsgRec) > 0:
try:
awaitWithTimeout(readChunkPayload(s, peer, MsgRec), deadline):
returnInvalidRequest(errorMsgLit "Request full data not sent in time")
except SerializationError as err:
returnInvalidRequest err.formatMsg("msg")
except SnappyError as err:
returnInvalidRequest err.msg
else:
NetRes[MsgRec].ok default(MsgRec)
if msg.isErr:
let (responseCode, errMsg) = case msg.error.kind
of UnexpectedEOF, PotentiallyExpectedEOF:
(InvalidRequest, errorMsgLit "Incomplete request")
of InvalidContextBytes:
(ServerError, errorMsgLit "Unrecognized context bytes")
of InvalidSnappyBytes:
(InvalidRequest, errorMsgLit "Failed to decompress snappy payload")
of InvalidSszBytes:
(InvalidRequest, errorMsgLit "Failed to decode SSZ payload")
of ZeroSizePrefix:
(InvalidRequest, errorMsgLit "The request chunk cannot have a size of zero")
of SizePrefixOverflow:
(InvalidRequest, errorMsgLit "The chunk size exceed the maximum allowed")
of InvalidResponseCode, ReceivedErrorResponse,
StreamOpenTimeout, ReadResponseTimeout:
# These shouldn't be possible in a request, because
# there are no response codes being read, no stream
# openings and no reading of responses:
(ServerError, errorMsgLit "Internal server error")
of BrokenConnection:
return
await sendErrorResponse(peer, conn, responseCode, errMsg)
return
try:
logReceivedMsg(peer, MsgType(msg.get))
await callUserHandler(MsgType, peer, conn, msg.get)
except InvalidInputsError as err:
returnInvalidRequest err.msg
await sendErrorResponse(peer, conn, ServerError,
ErrorMsg err.msg.toBytes)
except CatchableError as err:
await sendErrorResponse(peer, conn, ServerError,
ErrorMsg err.msg.toBytes)
except CatchableError as err:
debug "Error processing an incoming request", err = err.msg, msgName
finally:
await conn.closeWithEOF()
discard network.peerPool.checkPeerScore(peer)
2020-03-22 21:55:01 +00:00
proc toPeerAddr*(r: enr.TypedRecord,
proto: IpTransportProtocol): Result[PeerAddr, cstring] {.
raises: [Defect].} =
2020-08-02 19:27:36 +00:00
if not r.secp256k1.isSome:
return err("enr: no secp256k1 key in record")
let
pubKey = ? keys.PublicKey.fromRaw(r.secp256k1.get)
peerId = ? PeerID.init(crypto.PublicKey(
scheme: Secp256k1, skkey: secp.SkPublicKey(pubKey)))
var addrs = newSeq[MultiAddress]()
case proto
of tcpProtocol:
if r.ip.isSome and r.tcp.isSome:
let ip = ipv4(r.ip.get)
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
if r.ip6.isSome:
let ip = ipv6(r.ip6.get)
if r.tcp6.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get)
elif r.tcp.isSome:
addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get)
else:
discard
of udpProtocol:
if r.ip.isSome and r.udp.isSome:
let ip = ipv4(r.ip.get)
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get)
if r.ip6.isSome:
let ip = ipv6(r.ip6.get)
if r.udp6.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp6.get)
elif r.udp.isSome:
addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get)
else:
discard
2020-03-22 21:55:01 +00:00
2020-08-02 19:27:36 +00:00
if addrs.len == 0:
return err("enr: no addresses in record")
2020-03-22 21:55:01 +00:00
2020-08-02 19:27:36 +00:00
ok(PeerAddr(peerId: peerId, addrs: addrs))
2020-03-22 21:55:01 +00:00
proc checkPeer(node: Eth2Node, peerAddr: PeerAddr): bool =
2020-08-02 19:27:36 +00:00
logScope: peer = peerAddr.peerId
let peerId = peerAddr.peerId
if node.peerPool.hasPeer(peerId):
trace "Already connected"
false
else:
if node.isSeen(peerId):
trace "Recently connected"
false
else:
true
2020-03-22 21:55:01 +00:00
proc dialPeer*(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async.} =
## Establish connection with remote peer identified by address ``peerAddr``.
logScope:
peer = peerAddr.peerId
index = index
if not(node.checkPeer(peerAddr)):
return
debug "Connecting to discovered peer"
var deadline = sleepAsync(node.connectTimeout)
var workfut = node.switch.connect(peerAddr.peerId, peerAddr.addrs)
try:
# `or` operation will only raise exception of `workfut`, because `deadline`
# could not raise exception.
await workfut or deadline
if workfut.finished():
if not deadline.finished():
deadline.cancel()
inc nbc_successful_dials
else:
debug "Connection to remote peer timed out"
inc nbc_timeout_dials
node.addSeen(peerAddr.peerId, SeenTableTimeTimeout)
await cancelAndWait(workfut)
except CatchableError as exc:
debug "Connection to remote peer failed", msg = exc.msg
inc nbc_failed_dials
node.addSeen(peerAddr.peerId, SeenTableTimeDeadPeer)
proc connectWorker(node: Eth2Node, index: int) {.async.} =
debug "Connection worker started", index = index
while true:
# This loop will never produce HIGH CPU usage because it will wait
# and block until it not obtains new peer from the queue ``connQueue``.
let remotePeerAddr = await node.connQueue.popFirst()
# Previous worker dial might have hit the maximum peers.
# TODO: could clear the whole connTable and connQueue here also, best
# would be to have this event based coming from peer pool or libp2p.
if node.switch.connManager.outSema.count > 0:
await node.dialPeer(remotePeerAddr, index)
# Peer was added to `connTable` before adding it to `connQueue`, so we
# excluding peer here after processing.
node.connTable.excl(remotePeerAddr.peerId)
proc toPeerAddr(node: Node): Result[PeerAddr, cstring] {.raises: [Defect].} =
let nodeRecord = ? node.record.toTypedRecord()
let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol)
ok(peerAddr)
2020-03-22 21:55:01 +00:00
proc queryRandom*(d: Eth2DiscoveryProtocol, forkId: ENRForkID,
attnets: BitArray[ATTESTATION_SUBNET_COUNT]):
Future[seq[PeerAddr]] {.async, raises: [Defect].} =
## Perform a discovery query for a random target matching the eth2 field
## (forkId) and matching at least one of the attestation subnets.
let nodes = await d.queryRandom()
let sszForkId = SSZ.encode(forkId)
var filtered: seq[PeerAddr]
for n in nodes:
if n.record.contains((enrForkIdField, sszForkId)):
let res = n.record.tryGet(enrAttestationSubnetsField, seq[byte])
if res.isSome():
let attnetsNode =
try:
SSZ.decode(res.get(), BitArray[ATTESTATION_SUBNET_COUNT])
except SszError as e:
debug "Could not decode attestation subnet bitfield of peer",
peer = n.record.toURI(), exception = e.name, msg = e.msg
continue
for i in 0..<attnetsNode.bytes.len:
if (attnets.bytes[i] and attnetsNode.bytes[i]) > 0:
# we have at least one subnet match
let peerAddr = n.toPeerAddr()
if peerAddr.isOk():
filtered.add(peerAddr.get())
break
return filtered
2020-03-22 21:55:01 +00:00
proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
debug "Starting discovery loop"
2020-03-22 21:55:01 +00:00
while true:
if node.switch.connManager.outSema.count > 0:
let forkId = (enrForkIdField, SSZ.encode(node.forkId))
var discoveredNodes = await node.discovery.queryRandom(forkId)
var newPeers = 0
for discNode in discoveredNodes:
let res = discNode.toPeerAddr()
if res.isOk():
let peerAddr = res.get()
# Waiting for an empty space in PeerPool.
while true:
if node.peerPool.lenSpace({PeerType.Outgoing}) == 0:
await node.peerPool.waitForEmptySpace(PeerType.Outgoing)
else:
break
# Check if peer present in SeenTable or PeerPool.
if node.checkPeer(peerAddr):
if peerAddr.peerId notin node.connTable:
# We adding to pending connections table here, but going
# to remove it only in `connectWorker`.
node.connTable.incl(peerAddr.peerId)
await node.connQueue.addLast(peerAddr)
inc(newPeers)
else:
debug "Failed to decode discovery's node address",
node = discnode, errMsg = res.error
debug "Discovery tick", wanted_peers = node.wantedPeers,
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool),
discovered_nodes = len(discoveredNodes),
new_peers = newPeers
if newPeers == 0:
let currentPeers = node.peerPool.lenCurrent()
if currentPeers <= node.wantedPeers shr 2: # 25%
warn "Peer count low, no new peers discovered",
discovered_nodes = len(discoveredNodes), new_peers = newPeers,
current_peers = currentPeers, wanted_peers = node.wantedPeers
# Discovery `queryRandom` can have a synchronous fast path for example
# when no peers are in the routing table. Don't run it in continuous loop.
await sleepAsync(1.seconds)
2020-03-22 21:55:01 +00:00
proc getPersistentNetMetadata*(config: BeaconNodeConf): altair.MetaData
{.raises: [Defect, IOError, SerializationError].} =
let metadataPath = config.dataDir / nodeMetadataFilename
if not fileExists(metadataPath):
var res: altair.MetaData
for i in 0 ..< ATTESTATION_SUBNET_COUNT:
# TODO:
# Persistent (stability) subnets should be stored with their expiration
# epochs. For now, indicate that we participate in no persistent subnets.
res.attnets[i] = false
Json.saveFile(metadataPath, res)
res
else:
Json.loadFile(metadataPath, altair.MetaData)
proc resolvePeer(peer: Peer) =
# Resolve task which performs searching of peer's public key and recovery of
# ENR using discovery5. We only resolve ENR for peers we know about to avoid
# querying the network - as of now, the ENR is not needed, except for
# debuggging
logScope: peer = peer.info.peerId
let startTime = now(chronos.Moment)
let nodeId =
block:
var key: PublicKey
# `secp256k1` keys are always stored inside PeerID.
discard peer.info.peerId.extractPublicKey(key)
keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId()
debug "Peer's ENR recovery task started", node_id = $nodeId
# This is "fast-path" for peers which was dialed. In this case discovery
# already has most recent ENR information about this peer.
let gnode = peer.network.discovery.getNode(nodeId)
if gnode.isSome():
peer.enr = some(gnode.get().record)
inc(nbc_successful_discoveries)
let delay = now(chronos.Moment) - startTime
nbc_resolve_time.observe(delay.toFloatSeconds())
debug "Peer's ENR recovered", delay
proc handlePeer*(peer: Peer) {.async.} =
let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction)
case res:
of PeerStatus.LowScoreError, PeerStatus.NoSpaceError:
# Peer has low score or we do not have enough space in PeerPool,
# we are going to disconnect it gracefully.
# Peer' state will be updated in connection event.
debug "Peer has low score or there no space in PeerPool",
peer = peer, reason = res
await peer.disconnect(FaultOrError)
of PeerStatus.DeadPeerError:
# Peer's lifetime future is finished, so its already dead,
# we do not need to perform gracefull disconect.
# Peer's state will be updated in connection event.
discard
of PeerStatus.DuplicateError:
# Peer is already present in PeerPool, we can't perform disconnect,
# because in such case we could kill both connections (connection
# which is present in PeerPool and new one).
# This is possible bug, because we could enter here only if number
# of `peer.connections == 1`, it means that Peer's lifetime is not
# tracked properly and we still not received `Disconnected` event.
debug "Peer is already present in PeerPool", peer = peer
of PeerStatus.Success:
# Peer was added to PeerPool.
peer.score = NewPeerScore
peer.connectionState = Connected
# We spawn task which will obtain ENR for this peer.
resolvePeer(peer)
debug "Peer successfully connected", peer = peer,
connections = peer.connections
proc onConnEvent(node: Eth2Node, peerId: PeerID, event: ConnEvent) {.async.} =
let peer = node.getPeer(peerId)
case event.kind
of ConnEventKind.Connected:
inc peer.connections
debug "Peer connection upgraded", peer = $peerId,
connections = peer.connections
if peer.connections == 1:
# Libp2p may connect multiple times to the same peer - using different
2020-08-08 20:52:02 +00:00
# transports for both incoming and outgoing. For now, we'll count our
# "fist" encounter with the peer as the true connection, leaving the
# other connections be - libp2p limits the number of concurrent
# connections to the same peer, and only one of these connections will be
# active. Nonetheless, this quirk will cause a number of odd behaviours:
# * For peer limits, we might miscount the incoming vs outgoing quota
# * Protocol handshakes are wonky: we'll not necessarily use the newly
# connected transport - instead we'll just pick a random one!
case peer.connectionState
of Disconnecting:
# We got connection with peer which we currently disconnecting.
# Normally this does not happen, but if a peer is being disconnected
# while a concurrent (incoming for example) connection attempt happens,
# we might end up here
debug "Got connection attempt from peer that we are disconnecting",
peer = peerId
await node.switch.disconnect(peerId)
return
of None:
# We have established a connection with the new peer.
peer.connectionState = Connecting
of Disconnected:
# We have established a connection with the peer that we have seen
# before - reusing the existing peer object is fine
peer.connectionState = Connecting
peer.score = 0 # Will be set to NewPeerScore after handshake
of Connecting, Connected:
# This means that we got notification event from peer which we already
# connected or connecting right now. If this situation will happened,
# it means bug on `nim-libp2p` side.
warn "Got connection attempt from peer which we already connected",
peer = peerId
await peer.disconnect(FaultOrError)
return
# Store connection direction inside Peer object.
if event.incoming:
peer.direction = PeerType.Incoming
else:
peer.direction = PeerType.Outgoing
2020-08-08 20:52:02 +00:00
await performProtocolHandshakes(peer, event.incoming)
of ConnEventKind.Disconnected:
dec peer.connections
debug "Lost connection to peer", peer = peerId,
connections = peer.connections
if peer.connections == 0:
debug "Peer disconnected", peer = $peerId, connections = peer.connections
# Whatever caused disconnection, avoid connection spamming
node.addSeen(peerId, SeenTableTimeReconnect)
let fut = peer.disconnectedFut
if not(isNil(fut)):
fut.complete()
peer.disconnectedFut = nil
else:
# TODO (cheatfate): This could be removed when bug will be fixed inside
# `nim-libp2p`.
debug "Got new event while peer is already disconnected",
peer = peerId, peer_state = peer.connectionState
peer.connectionState = Disconnected
proc new*(T: type Eth2Node, config: BeaconNodeConf,
enrForkId: ENRForkID, forkDigests: ForkDigestsRef,
switch: Switch, pubsub: GossipSub, ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool,
rng: ref BrHmacDrbgContext): T {.raises: [Defect, CatchableError].} =
let
metadata = getPersistentNetMetadata(config)
when not defined(local_testnet):
let
connectTimeout = 1.minutes
seenThreshold = 5.minutes
else:
let
connectTimeout = 10.seconds
seenThreshold = 10.seconds
let node = T(
switch: switch,
pubsub: pubsub,
wantedPeers: config.maxPeers,
peerPool: newPeerPool[Peer, PeerID](maxPeers = config.maxPeers),
# Its important here to create AsyncQueue with limited size, otherwise
# it could produce HIGH cpu usage.
connQueue: newAsyncQueue[PeerAddr](ConcurrentConnections),
# TODO: The persistent net metadata should only be used in the case of reusing
# the previous netkey.
metadata: metadata,
forkId: enrForkId,
forkDigests: forkDigests,
discovery: Eth2DiscoveryProtocol.new(
config, ip, tcpPort, udpPort, privKey,
{
enrForkIdField: SSZ.encode(enrForkId),
enrAttestationSubnetsField: SSZ.encode(metadata.attnets)
},
rng),
discoveryEnabled: discovery,
rng: rng,
connectTimeout: connectTimeout,
seenThreshold: seenThreshold,
)
newSeq node.protocolStates, allProtocols.len
2020-03-22 21:55:01 +00:00
for proto in allProtocols:
if proto.networkStateInitializer != nil:
node.protocolStates[proto.index] = proto.networkStateInitializer(node)
2020-03-22 21:55:01 +00:00
for msg in proto.messages:
if msg.protocolMounter != nil:
msg.protocolMounter node
2020-03-22 21:55:01 +00:00
proc peerHook(peerInfo: PeerInfo, event: ConnEvent): Future[void] {.gcsafe.} =
onConnEvent(node, peerInfo.peerId, event)
try:
switch.addConnEventHandler(peerHook, ConnEventKind.Connected)
switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected)
except Exception as exc: # TODO fix libp2p, shouldn't happen
raiseAssert exc.msg
node
2020-03-22 21:55:01 +00:00
template publicKey*(node: Eth2Node): keys.PublicKey =
node.discovery.privKey.toPublicKey
2020-03-22 21:55:01 +00:00
proc startListening*(node: Eth2Node) {.async.} =
2020-08-24 11:52:06 +00:00
if node.discoveryEnabled:
try:
node.discovery.open()
except CatchableError:
fatal "Failed to start discovery service. UDP port may be already in use"
quit 1
try:
node.libp2pTransportLoops = await node.switch.start()
except CatchableError:
fatal "Failed to start LibP2P transport. TCP port may be already in use"
quit 1
2020-08-08 20:52:02 +00:00
await node.pubsub.start()
proc start*(node: Eth2Node) {.async.} =
proc onPeerCountChanged() =
trace "Number of peers has been changed",
space = node.peerPool.shortLogSpace(),
acquired = node.peerPool.shortLogAcquired(),
available = node.peerPool.shortLogAvailable(),
current = node.peerPool.shortLogCurrent(),
length = len(node.peerPool)
nbc_peers.set int64(len(node.peerPool))
node.peerPool.setPeerCounter(onPeerCountChanged)
for i in 0 ..< ConcurrentConnections:
node.connWorkers.add connectWorker(node, i)
2020-08-24 11:52:06 +00:00
if node.discoveryEnabled:
node.discovery.start()
traceAsyncErrors node.runDiscoveryLoop()
else:
notice "Discovery disabled; trying bootstrap nodes",
2020-08-24 11:52:06 +00:00
nodes = node.discovery.bootstrapRecords.len
for enr in node.discovery.bootstrapRecords:
let tr = enr.toTypedRecord()
if tr.isOk():
let pa = tr.get().toPeerAddr(tcpProtocol)
2020-08-24 11:52:06 +00:00
if pa.isOk():
await node.connQueue.addLast(pa.get())
2020-03-22 21:55:01 +00:00
proc stop*(node: Eth2Node) {.async.} =
2020-05-28 01:14:01 +00:00
# Ignore errors in futures, since we're shutting down (but log them on the
# TRACE level, if a timeout is reached).
let
waitedFutures = if node.discoveryEnabled:
@[node.discovery.closeWait(), node.switch.stop()]
else:
@[node.switch.stop()]
2020-05-28 01:14:01 +00:00
timeout = 5.seconds
completed = await withTimeout(allFutures(waitedFutures), timeout)
if not completed:
trace "Eth2Node.stop(): timeout reached", timeout,
futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg)
2020-03-22 21:55:01 +00:00
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
let res = Peer(
info: info,
network: network,
connectionState: ConnectionState.None,
lastReqTime: now(chronos.Moment),
protocolStates: newSeq[RootRef](len(allProtocols))
)
for i in 0 ..< len(allProtocols):
2020-03-22 21:55:01 +00:00
let proto = allProtocols[i]
if not(isNil(proto.peerStateInitializer)):
res.protocolStates[i] = proto.peerStateInitializer(res)
res
2020-03-22 21:55:01 +00:00
proc registerMsg(protocol: ProtocolInfo,
name: string,
mounter: MounterProc,
2020-03-22 23:23:21 +00:00
libp2pCodecName: string) =
2020-03-22 21:55:01 +00:00
protocol.messages.add MessageInfo(name: name,
protocolMounter: mounter,
2020-03-22 23:23:21 +00:00
libp2pCodecName: libp2pCodecName)
2020-03-22 21:55:01 +00:00
proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
var
Format = ident "SSZ"
Bool = bindSym "bool"
2020-03-22 23:23:21 +00:00
Connection = bindSym "Connection"
2020-03-22 21:55:01 +00:00
Peer = bindSym "Peer"
Eth2Node = bindSym "Eth2Node"
registerMsg = bindSym "registerMsg"
initProtocol = bindSym "initProtocol"
msgVar = ident "msg"
networkVar = ident "network"
callUserHandler = ident "callUserHandler"
MSG = ident "MSG"
2020-03-22 21:55:01 +00:00
p.useRequestIds = false
p.useSingleRecordInlining = true
new result
result.PeerType = Peer
result.NetworkType = Eth2Node
result.registerProtocol = bindSym "registerProtocol"
result.setEventHandlers = bindSym "setEventHandlers"
result.SerializationFormat = Format
result.RequestResultsWrapper = ident "NetRes"
2020-03-22 21:55:01 +00:00
result.implementMsg = proc (msg: p2p_protocol_dsl.Message) =
if msg.kind == msgResponse:
return
2020-03-22 21:55:01 +00:00
let
protocol = msg.protocol
msgName = $msg.ident
msgNameLit = newLit msgName
MsgRecName = msg.recName
MsgStrongRecName = msg.strongRecName
codecNameLit = getRequestProtoName(msg.procDef)
protocolMounterName = ident(msgName & "Mounter")
2020-03-22 21:55:01 +00:00
##
## Implement the Thunk:
##
2020-03-22 23:23:21 +00:00
## The protocol handlers in nim-libp2p receive only a `Connection`
2020-03-22 21:55:01 +00:00
## parameter and there is no way to access the wider context (such
## as the current `Switch`). In our handlers, we may need to list all
## peers in the current network, so we must keep a reference to the
## network object in the closure environment of the installed handlers.
##
## For this reason, we define a `protocol mounter` proc that will
## initialize the network object by creating handlers bound to the
## specific network.
##
var userHandlerCall = newTree(nnkDiscardStmt)
2020-03-22 21:55:01 +00:00
if msg.userHandler != nil:
var OutputParamType = if msg.kind == msgRequest: msg.outputParamType
else: nil
if OutputParamType == nil:
userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar])
if msg.kind == msgRequest:
userHandlerCall = newCall(ident"sendUserHandlerResultAsChunkImpl",
streamVar,
userHandlerCall)
else:
if OutputParamType.kind == nnkVarTy:
OutputParamType = OutputParamType[0]
let isChunkStream = eqIdent(OutputParamType[0], "MultipleChunksResponse")
msg.response.recName = if isChunkStream:
newTree(nnkBracketExpr, ident"seq", OutputParamType[1])
else:
OutputParamType[1]
let responseVar = ident("response")
userHandlerCall = newStmtList(
newVarStmt(responseVar,
newCall(ident"init", OutputParamType,
peerVar, streamVar)),
msg.genUserHandlerCall(msgVar, [peerVar], outputParam = responseVar))
protocol.outRecvProcs.add quote do:
template `callUserHandler`(`MSG`: type `MsgStrongRecName`,
`peerVar`: `Peer`,
`streamVar`: `Connection`,
`msgVar`: `MsgRecName`): untyped =
`userHandlerCall`
proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
proc snappyThunk(`streamVar`: `Connection`,
`protocolVar`: string): Future[void] {.gcsafe.} =
return handleIncomingStream(`networkVar`, `streamVar`,
`MsgStrongRecName`)
mount `networkVar`.switch,
LPProtocol(codecs: @[`codecNameLit` & "ssz_snappy"],
handler: snappyThunk)
2020-03-22 21:55:01 +00:00
##
## Implement Senders and Handshake
##
if msg.kind == msgHandshake:
macros.error "Handshake messages are not supported in LibP2P protocols"
else:
var sendProc = msg.createSendProc()
implementSendProcBody sendProc
protocol.outProcRegistrations.add(
newCall(registerMsg,
protocol.protocolInfoVar,
msgNameLit,
protocolMounterName,
2020-03-22 23:23:21 +00:00
codecNameLit))
2020-03-22 21:55:01 +00:00
result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)
func asLibp2pKey*(key: keys.PublicKey): PublicKey =
2020-04-17 13:29:49 +00:00
PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))
func asEthKey*(key: PrivateKey): keys.PrivateKey =
2020-04-17 13:29:49 +00:00
keys.PrivateKey(key.skkey)
proc initAddress*(T: type MultiAddress, str: string): T =
let address = MultiAddress.init(str)
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
result = address
else:
raise newException(MultiAddressError,
"Invalid bootstrap node multi-address")
template tcpEndPoint(address, port): auto =
2020-06-05 15:08:50 +00:00
MultiAddress.init(address, tcpProtocol, port)
proc getPersistentNetKeys*(rng: var BrHmacDrbgContext,
config: BeaconNodeConf): NetKeyPair =
case config.cmd
2020-11-24 23:23:28 +00:00
of noCommand, record:
if config.netKeyFile == "random":
let res = PrivateKey.random(Secp256k1, rng)
if res.isErr():
fatal "Could not generate random network key file"
quit QuitFailure
let
privKey = res.get()
pubKey = privKey.getKey().expect("working public key from random")
pres = PeerID.init(pubKey)
if pres.isErr():
fatal "Could not obtain PeerID from network key"
quit QuitFailure
info "Generating new networking key", network_public_key = pubKey,
network_peer_id = $pres.get()
NetKeyPair(seckey: privKey, pubkey: pubKey)
else:
let keyPath =
if isAbsolute(config.netKeyFile):
config.netKeyFile
else:
config.dataDir / config.netKeyFile
if fileAccessible(keyPath, {AccessFlags.Find}):
info "Network key storage is present, unlocking", key_path = keyPath
# Insecure password used only for automated testing.
let insecurePassword =
if config.netKeyInsecurePassword:
some(NetworkInsecureKeyPassword)
else:
none[string]()
let res = loadNetKeystore(keyPath, insecurePassword)
if res.isNone():
fatal "Could not load network key file"
quit QuitFailure
let
privKey = res.get()
pubKey = privKey.getKey().expect("working public key from file")
info "Network key storage was successfully unlocked",
key_path = keyPath, network_public_key = pubKey
NetKeyPair(seckey: privKey, pubkey: pubKey)
else:
info "Network key storage is missing, creating a new one",
key_path = keyPath
let rres = PrivateKey.random(Secp256k1, rng)
if rres.isErr():
fatal "Could not generate random network key file"
quit QuitFailure
let
privKey = rres.get()
pubKey = privKey.getKey().expect("working public key from random")
# Insecure password used only for automated testing.
let insecurePassword =
if config.netKeyInsecurePassword:
some(NetworkInsecureKeyPassword)
else:
none[string]()
let sres = saveNetKeystore(rng, keyPath, privKey, insecurePassword)
if sres.isErr():
fatal "Could not create network key file", key_path = keyPath
quit QuitFailure
info "New network key storage was created", key_path = keyPath,
network_public_key = pubKey
NetKeyPair(seckey: privKey, pubkey: pubKey)
of createTestnet:
if config.netKeyFile == "random":
fatal "Could not create testnet using `random` network key"
quit QuitFailure
let keyPath =
if isAbsolute(config.netKeyFile):
config.netKeyFile
2020-05-18 08:11:21 +00:00
else:
config.dataDir / config.netKeyFile
let rres = PrivateKey.random(Secp256k1, rng)
if rres.isErr():
fatal "Could not generate random network key file"
quit QuitFailure
let
privKey = rres.get()
pubKey = privKey.getKey().expect("working public key from random")
# Insecure password used only for automated testing.
let insecurePassword =
if config.netKeyInsecurePassword:
some(NetworkInsecureKeyPassword)
else:
none[string]()
let sres = saveNetKeystore(rng, keyPath, privKey, insecurePassword)
if sres.isErr():
fatal "Could not create network key file", key_path = keyPath
quit QuitFailure
info "New network key storage was created", key_path = keyPath,
network_public_key = pubKey
NetKeyPair(seckey: privKey, pubkey: pubKey)
else:
let res = PrivateKey.random(Secp256k1, rng)
if res.isErr():
fatal "Could not generate random network key file"
quit QuitFailure
let
privKey = res.get()
pubKey = privKey.getKey().expect("working public key from random")
NetKeyPair(seckey: privKey, pubkey: pubKey)
2021-06-25 00:07:46 +00:00
func gossipId(data: openArray[byte], topic: string, valid: bool): seq[byte] =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#topics-and-messages
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/p2p-interface.md#topics-and-messages
const
MESSAGE_DOMAIN_INVALID_SNAPPY = [0x00'u8, 0x00, 0x00, 0x00]
MESSAGE_DOMAIN_VALID_SNAPPY = [0x01'u8, 0x00, 0x00, 0x00]
let messageDigest = withEth2Hash:
h.update(
if valid: MESSAGE_DOMAIN_VALID_SNAPPY else: MESSAGE_DOMAIN_INVALID_SNAPPY)
2021-06-25 00:07:46 +00:00
if topic.len > 0: #altair topic
h.update topic.len.uint64.toBytesLE
h.update topic
h.update data
2021-06-25 00:07:46 +00:00
return messageDigest.data[0..19].toSeq()
func isAltairTopic(topic: string, altairPrefix: string): bool =
const prefixLen = "/eth2/".len
if topic.len <= altairPrefix.len + prefixLen:
2021-06-25 00:07:46 +00:00
false
else:
for ind, ch in altairPrefix:
if ch != topic[ind + prefixLen]: return false
true
2021-06-25 00:07:46 +00:00
func getAltairTopic(m: messages.Message, altairPrefix: string): string =
# TODO Return a lent string here to avoid the string copy
2021-06-25 00:07:46 +00:00
let topic = if m.topicIDs.len > 0: m.topicIDs[0] else: ""
if isAltairTopic(topic, altairPrefix):
2021-06-25 00:07:46 +00:00
topic
else:
""
proc newBeaconSwitch*(config: BeaconNodeConf, seckey: PrivateKey,
address: MultiAddress,
rng: ref BrHmacDrbgContext): Switch {.raises: [Defect, CatchableError].} =
try:
2021-04-05 20:21:27 +00:00
SwitchBuilder
2021-04-06 17:23:48 +00:00
.new()
2021-04-05 20:21:27 +00:00
.withPrivateKey(seckey)
.withAddress(address)
.withRng(rng)
.withNoise()
2021-04-13 20:08:33 +00:00
.withMplex(5.minutes, 5.minutes)
.withMaxConnections(config.maxPeers)
2021-04-05 20:21:27 +00:00
.withAgentVersion(config.agentString)
2021-04-13 20:08:33 +00:00
.withTcpTransport({ServerFlags.ReuseAddr})
2021-04-05 20:21:27 +00:00
.build()
except CatchableError as exc: raise exc
except Exception as exc: # TODO fix libp2p
if exc is Defect: raise (ref Defect)exc
raiseAssert exc.msg
proc createEth2Node*(rng: ref BrHmacDrbgContext,
config: BeaconNodeConf,
netKeys: NetKeyPair,
Implement split preset/config support (#2710) * Implement split preset/config support This is the initial bulk refactor to introduce runtime config values in a number of places, somewhat replacing the existing mechanism of loading network metadata. It still needs more work, this is the initial refactor that introduces runtime configuration in some of the places that need it. The PR changes the way presets and constants work, to match the spec. In particular, a "preset" now refers to the compile-time configuration while a "cfg" or "RuntimeConfig" is the dynamic part. A single binary can support either mainnet or minimal, but not both. Support for other presets has been removed completely (can be readded, in case there's need). There's a number of outstanding tasks: * `SECONDS_PER_SLOT` still needs fixing * loading custom runtime configs needs redoing * checking constants against YAML file * yeerongpilly support `build/nimbus_beacon_node --network=yeerongpilly --discv5:no --log-level=DEBUG` * load fork epoch from config * fix fork digest sent in status * nicer error string for request failures * fix tools * one more * fixup * fixup * fixup * use "standard" network definition folder in local testnet Files are loaded from their standard locations, including genesis etc, to conform to the format used in the `eth2-networks` repo. * fix launch scripts, allow unknown config values * fix base config of rest test * cleanups * bundle mainnet config using common loader * fix spec links and names * only include supported preset in binary * drop yeerongpilly, add altair-devnet-0, support boot_enr.yaml
2021-07-12 13:01:38 +00:00
cfg: RuntimeConfig,
forkDigests: ForkDigestsRef,
wallEpoch: Epoch,
genesisValidatorsRoot: Eth2Digest): Eth2Node
{.raises: [Defect, CatchableError].} =
let
enrForkId = getENRForkID(cfg, wallEpoch, genesisValidatorsRoot)
(extIp, extTcpPort, extUdpPort) = try: setupAddress(
config.nat, config.listenAddress, config.tcpPort, config.udpPort, clientId)
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg
hostAddress = tcpEndPoint(config.listenAddress, config.tcpPort)
announcedAddresses = if extIp.isNone() or extTcpPort.isNone(): @[]
else: @[tcpEndPoint(extIp.get(), extTcpPort.get())]
debug "Initializing networking", hostAddress,
network_public_key = netKeys.pubkey,
announcedAddresses
# TODO nim-libp2p still doesn't have support for announcing addresses
# that are different from the host address (this is relevant when we
# are running behind a NAT).
var switch = newBeaconSwitch(config, netKeys.seckey, hostAddress, rng)
2020-08-08 20:52:02 +00:00
func msgIdProvider(m: messages.Message): seq[byte] =
let topic = getAltairTopic(m, forkDigests.altairTopicPrefix)
try:
let decoded = snappy.decode(m.data, GOSSIP_MAX_SIZE)
gossipId(decoded, topic, true)
except CatchableError:
gossipId(m.data, topic, false)
let
params = GossipSubParams(
explicit: true,
pruneBackoff: 1.minutes,
floodPublish: true,
gossipFactor: 0.05,
d: 8,
dLow: 6,
dHigh: 12,
dScore: 6,
dOut: 6 div 2, # less than dlow and no more than dlow/2
dLazy: 6,
heartbeatInterval: 700.milliseconds,
historyLength: 6,
historyGossip: 3,
fanoutTTL: 60.seconds,
seenTTL: 385.seconds,
gossipThreshold: -4000,
publishThreshold: -8000,
graylistThreshold: -16000, # also disconnect threshold
opportunisticGraftThreshold: 0,
decayInterval: 12.seconds,
decayToZero: 0.01,
retainScore: 385.seconds,
appSpecificWeight: 0.0,
ipColocationFactorWeight: -53.75,
ipColocationFactorThreshold: 3.0,
behaviourPenaltyWeight: -15.9,
behaviourPenaltyDecay: 0.986,
disconnectBadPeers: true,
directPeers:
block:
var res = initTable[PeerId, seq[MultiAddress]]()
if config.directPeers.len > 0:
for s in config.directPeers:
let
maddress = MultiAddress.init(s).tryGet()
mpeerId = maddress[multiCodec("p2p")].tryGet()
peerId = PeerID.init(mpeerId.protoAddress().tryGet()).tryGet()
res.mgetOrPut(peerId, @[]).add(maddress)
info "Adding priviledged direct peer", peerId, address = maddress
res
)
pubsub = try: GossipSub.init(
switch = switch,
msgIdProvider = msgIdProvider,
triggerSelf = true,
sign = false,
verifySignature = false,
anonymize = true,
2020-11-29 19:07:20 +00:00
parameters = params)
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
2020-08-08 20:52:02 +00:00
switch.mount(pubsub)
Eth2Node.new(config, enrForkId,
forkDigests,
switch, pubsub,
extIp, extTcpPort, extUdpPort,
netKeys.seckey.asEthKey,
discovery = config.discv5Enabled,
rng = rng)
proc announcedENR*(node: Eth2Node): enr.Record =
doAssert node.discovery != nil, "The Eth2Node must be initialized"
node.discovery.localNode.record
proc shortForm*(id: NetKeyPair): string =
$PeerID.init(id.pubkey)
proc subscribe*(
node: Eth2Node, topic: string, topicParams: TopicParams,
enableTopicMetrics: bool = false) {.raises: [Defect, CatchableError].} =
proc dummyMsgHandler(topic: string, data: seq[byte]): Future[void] =
# Avoid closure environment with `{.async.}`
var res = newFuture[void]("eth2_network.dummyMsgHandler")
res.complete()
res
let
topicName = topic & "_snappy"
if enableTopicMetrics:
node.pubsub.knownTopics.incl(topicName)
node.pubsub.topicParams[topicName] = topicParams
try:
node.pubsub.subscribe(topicName, dummyMsgHandler)
except CatchableError as exc: raise exc # TODO fix libp2p
except Exception as exc: raiseAssert exc.msg
proc setValidTopics*(node: Eth2Node, topics: openArray[string]) =
let topicsSnappy = topics.mapIt(it & "_snappy")
node.validTopics = topicsSnappy.toHashSet()
# there is a window of time where we got the switch open, we need this lazy update for now
node.pubsub.subscriptionValidator =
proc(topic: string): bool {.gcsafe, raises: [Defect].} =
topic in node.validTopics
proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] =
let res = newFuture[ValidationResult]("eth2_network.execValidator")
res.complete(v)
res
proc addValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
ValidationResult {.gcsafe, raises: [Defect].} ) =
# Validate messages as soon as subscribed
proc execValidator(
topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message",
len = message.data.len, topic
let res =
try:
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len > 0:
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
msgValidator(decoded)
else:
debug "Empty gossip data after decompression",
topic, len = message.data.len
ValidationResult.Ignore
except CatchableError as err:
debug "Gossip validation error",
msg = err.msg, topic, len = message.data.len
ValidationResult.Ignore
return newValidationResultFuture(res)
try:
node.pubsub.addValidator(topic & "_snappy", execValidator)
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
proc addAsyncValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType):
Future[ValidationResult] {.gcsafe, raises: [Defect].} ) =
proc execValidator(
topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [Defect].} =
inc nbc_gossip_messages_received
trace "Validating incoming gossip message",
len = message.data.len, topic
let res =
try:
var decompressed = snappy.decode(message.data, GOSSIP_MAX_SIZE)
if decompressed.len > 0:
let decoded = SSZ.decode(decompressed, MsgType)
decompressed = newSeq[byte](0) # release memory before validating
return msgValidator(decoded) # Reuses future from msgValidator
else:
debug "Empty gossip data after decompression",
topic, len = message.data.len
ValidationResult.Ignore
except CatchableError as err:
debug "Gossip validation error",
msg = err.msg, topic, len = message.data.len
ValidationResult.Ignore
return newValidationResultFuture(res)
try:
node.pubsub.addValidator(topic & "_snappy", execValidator)
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
proc unsubscribe*(node: Eth2Node, topic: string) {.raises: [Defect, CatchableError].} =
try:
node.pubsub.unsubscribeAll(topic & "_snappy")
except CatchableError as exc: raise exc
except Exception as exc: raiseAssert exc.msg # TODO fix libp2p
proc traceMessage(fut: FutureBase, msgId: seq[byte]) =
fut.addCallback do (arg: pointer):
if not(fut.failed):
trace "Outgoing pubsub message sent", msgId = byteutils.toHex(msgId)
elif fut.error != nil:
debug "Gossip message not sent",
msgId = byteutils.toHex(msgId), err = fut.error.msg
else:
debug "Unexpected future state for gossip",
msgId = byteutils.toHex(msgId), state = fut.state
proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
try:
let
uncompressed = SSZ.encode(msg)
compressed = try: snappy.encode(uncompressed)
except InputTooLarge:
raiseAssert "More than 4gb? not likely.."
# This is only for messages we create. A message this large amounts to an
# internal logic error.
doAssert uncompressed.len <= GOSSIP_MAX_SIZE
inc nbc_gossip_messages_sent
var futSnappy = try: node.pubsub.publish(topic & "_snappy", compressed)
except Exception as exc:
raiseAssert exc.msg # TODO fix libp2p
2021-06-25 00:07:46 +00:00
traceMessage(futSnappy, gossipId(uncompressed, topic & "_snappy", true))
except IOError as exc:
raiseAssert exc.msg # TODO in-memory compression shouldn't fail
proc subscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT],
forkDigest: ForkDigest)
{.raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
for subnet_id, enabled in subnets:
if enabled:
node.subscribe(getAttestationTopic(
forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now
proc unsubscribeAttestationSubnets*(node: Eth2Node, subnets: BitArray[ATTESTATION_SUBNET_COUNT],
forkDigest: ForkDigest)
{.raises: [Defect, CatchableError].} =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestations-and-aggregation
# nimbus won't score attestation subnets for now, we just rely on block and aggregate which are more stabe and reliable
for subnet_id, enabled in subnets:
if enabled:
node.unsubscribe(getAttestationTopic(forkDigest, SubnetId(subnet_id)))
proc updateStabilitySubnetMetadata*(
node: Eth2Node, attnets: BitArray[ATTESTATION_SUBNET_COUNT]) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#metadata
node.metadata.seq_number += 1
node.metadata.attnets = attnets
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#attestation-subnet-bitfield
let res = node.discovery.updateRecord({
enrAttestationSubnetsField: SSZ.encode(node.metadata.attnets)
})
if res.isErr():
# This should not occur in this scenario as the private key would always
# be the correct one and the ENR will not increase in size.
warn "Failed to update the ENR attnets field", error = res.error
else:
debug "Stability subnets changed; updated ENR attnets", attnets
proc updateSyncnetsMetadata*(
node: Eth2Node, syncnets: BitArray[altair.SYNC_COMMITTEE_SUBNET_COUNT]) =
# https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/validator.md#sync-committee-subnet-stability
node.metadata.seq_number += 1
node.metadata.syncnets = syncnets
let res = node.discovery.updateRecord({
enrSyncSubnetsField: SSZ.encode(node.metadata.syncnets)
})
if res.isErr():
# This should not occur in this scenario as the private key would always
# be the correct one and the ENR will not increase in size.
warn "Failed to update the ENR syncnets field", error = res.error
else:
debug "Sync committees changed; updated ENR syncnets", syncnets
proc updateForkId*(node: Eth2Node, value: ENRForkID) =
node.forkId = value
let res = node.discovery.updateRecord({enrForkIdField: SSZ.encode value})
if res.isErr():
# This should not occur in this scenario as the private key would always
# be the correct one and the ENR will not increase in size.
warn "Failed to update the ENR fork id", value, error = res.error
else:
debug "ENR fork id changed", value
# https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/validator.md#phase-0-attestation-subnet-stability
func getStabilitySubnetLength*(node: Eth2Node): uint64 =
EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION +
node.rng[].rand(EPOCHS_PER_RANDOM_SUBNET_SUBSCRIPTION.int).uint64
func getRandomSubnetId*(node: Eth2Node): SubnetId =
node.rng[].rand(ATTESTATION_SUBNET_COUNT - 1).SubnetId