# beacon_chain # Copyright (c) 2018-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). # at your option. This file may not be copied, modified, or distributed except according to those terms. {.push raises: [].} import # Std lib std/[typetraits, os, sequtils, strutils, algorithm, math, tables, macrocache], # Status libs results, stew/[leb128, endians2, byteutils, io2, bitops2], stew/shims/macros, snappy, json_serialization, json_serialization/std/[net, sets, options], chronos, chronos/ratelimit, chronicles, metrics, libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto, crypto/secp, builders], libp2p/protocols/pubsub/[ pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer], libp2p/stream/connection, eth/[keys, async_utils], eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], ".."/[version, conf, beacon_clock, conf_light_client], ../spec/datatypes/[phase0, altair, bellatrix], ../spec/[eth2_ssz_serialization, network, helpers, forks], ../validators/keystore_management, "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores] export tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol, connection, libp2p_json_serialization, eth2_ssz_serialization, results, eth2_discovery, peer_pool, peer_scores logScope: topics = "networking" type NetKeyPair* = crypto.KeyPair PublicKey* = crypto.PublicKey PrivateKey* = crypto.PrivateKey ErrorMsg = List[byte, 256] SendResult* = Result[void, cstring] DirectPeers = Table[PeerId, seq[MultiAddress]] # TODO: This is here only to eradicate a compiler # warning about unused import (rpc/messages). GossipMsg = messages.Message SeenItem* = object peerId*: PeerId stamp*: chronos.Moment Eth2Node* = ref object of RootObj switch*: Switch pubsub*: GossipSub discovery*: Eth2DiscoveryProtocol discoveryEnabled*: bool wantedPeers*: int hardMaxPeers*: int peerPool*: PeerPool[Peer, PeerId] protocols: seq[ProtocolInfo] ## Protocols managed by the DSL and mounted on the switch protocolStates*: seq[RootRef] metadata*: altair.MetaData connectTimeout*: chronos.Duration seenThreshold*: chronos.Duration connQueue: AsyncQueue[PeerAddr] seenTable: Table[PeerId, SeenItem] connWorkers: seq[Future[void].Raising([CancelledError])] connTable: HashSet[PeerId] forkId*: ENRForkID discoveryForkId*: ENRForkID forkDigests*: ref ForkDigests rng*: ref HmacDrbgContext peers*: Table[PeerId, Peer] directPeers*: DirectPeers validTopics: HashSet[string] peerPingerHeartbeatFut: Future[void].Raising([CancelledError]) peerTrimmerHeartbeatFut: Future[void].Raising([CancelledError]) cfg: RuntimeConfig getBeaconTime: GetBeaconTimeFn quota: TokenBucket ## Global quota mainly for high-bandwidth stuff AverageThroughput* = object count*: uint64 average*: float Peer* = ref object network*: Eth2Node peerId*: PeerId discoveryId*: Eth2DiscoveryId connectionState*: ConnectionState protocolStates*: seq[RootRef] netThroughput: AverageThroughput score*: int quota*: TokenBucket lastReqTime*: Moment connections*: int enr*: Opt[enr.Record] metadata*: Opt[altair.MetaData] failedMetadataRequests: int lastMetadataTime*: Moment direction*: PeerType disconnectedFut: Future[void] statistics*: SyncResponseStats PeerAddr* = object peerId*: PeerId addrs*: seq[MultiAddress] ConnectionState* = enum None, Connecting, Connected, Disconnecting, Disconnected UntypedResponse* = ref object peer*: Peer stream*: Connection writtenChunks*: int SingleChunkResponse*[MsgType] = distinct UntypedResponse ## Protocol requests using this type will produce request-making ## client-side procs that return `NetRes[MsgType]` MultipleChunksResponse*[MsgType; maxLen: static Limit] = distinct UntypedResponse ## Protocol requests using this type will produce request-making ## client-side procs that return `NetRes[List[MsgType, maxLen]]`. ## In the future, such procs will return an `InputStream[NetRes[MsgType]]`. MessageInfo* = object name*: string # Private fields: libp2pCodecName: string protocolMounter*: MounterProc ProtocolInfoObj* = object name*: string messages*: seq[MessageInfo] index*: int # the position of the protocol in the # ordered list of supported protocols # Private fields: peerStateInitializer*: PeerStateInitializer networkStateInitializer*: NetworkStateInitializer onPeerConnected*: OnPeerConnectedHandler onPeerDisconnected*: OnPeerDisconnectedHandler ProtocolInfo* = ptr ProtocolInfoObj ResponseCode* = enum Success InvalidRequest ServerError ResourceUnavailable PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [].} NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe, raises: [].} OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.async: (raises: [CancelledError]).} OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.async: (raises: [CancelledError]).} ThunkProc* = LPProtoHandler MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [].} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe, raises: [].} # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#goodbye DisconnectionReason* = enum # might see other values on the wire! ClientShutDown = 1 IrrelevantNetwork = 2 FaultOrError = 3 # Clients MAY use reason codes above 128 to indicate alternative, # erroneous request-specific responses. PeerScoreLow = 237 # 79 * 3 TransmissionError* = object of CatchableError Eth2NetworkingErrorKind* = enum # Potentially benign errors (network conditions) BrokenConnection ReceivedErrorResponse UnexpectedEOF PotentiallyExpectedEOF StreamOpenTimeout ReadResponseTimeout # Errors for which we descore heavily (protocol violations) InvalidResponseCode InvalidSnappyBytes InvalidSszBytes InvalidSizePrefix ZeroSizePrefix SizePrefixOverflow InvalidContextBytes ResponseChunkOverflow UnknownError Eth2NetworkingError = object case kind*: Eth2NetworkingErrorKind of ReceivedErrorResponse: responseCode*: ResponseCode errorMsg*: string else: discard InvalidInputsError* = object of CatchableError ResourceUnavailableError* = object of CatchableError NetRes*[T] = Result[T, Eth2NetworkingError] ## This is type returned from all network requests const clientId* = "Nimbus beacon node " & fullVersionStr requestPrefix = "/eth2/beacon_chain/req/" requestSuffix = "/ssz_snappy" ConcurrentConnections = 20 ## Maximum number of active concurrent connection requests. SeenTableTimeTimeout = when not defined(local_testnet): 5.minutes else: 10.seconds ## Seen period of time for timeout connections SeenTableTimeDeadPeer = when not defined(local_testnet): 5.minutes else: 10.seconds ## Period of time for dead peers. SeenTableTimeIrrelevantNetwork = 24.hours ## Period of time for `IrrelevantNetwork` error reason. SeenTableTimeClientShutDown = 10.minutes ## Period of time for `ClientShutDown` error reason. SeenTableTimeFaultOrError = 10.minutes ## Period of time for `FaultOnError` error reason. SeenTablePenaltyError = 60.minutes ## Period of time for peers which score below or equal to zero. SeenTableTimeReconnect = 1.minutes ## Minimal time between disconnection and reconnection attempt ProtocolViolations = {InvalidResponseCode..Eth2NetworkingErrorKind.high()} template neterr*(kindParam: Eth2NetworkingErrorKind): auto = err(type(result), Eth2NetworkingError(kind: kindParam)) # Metrics for tracking attestation and beacon block loss declareCounter nbc_gossip_messages_sent, "Number of gossip messages sent by this peer" declareCounter nbc_gossip_messages_received, "Number of gossip messages received by this peer" declareCounter nbc_gossip_failed_snappy, "Number of gossip messages that failed snappy decompression" declareCounter nbc_gossip_failed_ssz, "Number of gossip messages that failed SSZ parsing" declareCounter nbc_successful_dials, "Number of successfully dialed peers" declareCounter nbc_failed_dials, "Number of dialing attempts that failed" declareCounter nbc_timeout_dials, "Number of dialing attempts that exceeded timeout" declareGauge nbc_peers, "Number of active libp2p peers" declareCounter nbc_successful_discoveries, "Number of successful discoveries" declareCounter nbc_failed_discoveries, "Number of failed discoveries" declareCounter nbc_cycling_kicked_peers, "Number of peers kicked for peer cycling" declareGauge nbc_gossipsub_low_fanout, "numbers of topics with low fanout" declareGauge nbc_gossipsub_good_fanout, "numbers of topics with good fanout" declareGauge nbc_gossipsub_healthy_fanout, "numbers of topics with dHigh fanout" declareHistogram nbc_resolve_time, "Time(s) used while resolving peer information", buckets = [1.0, 5.0, 10.0, 20.0, 40.0, 60.0] declareCounter nbc_reqresp_messages_sent, "Number of Req/Resp messages sent", labels = ["protocol"] declareCounter nbc_reqresp_messages_received, "Number of Req/Resp messages received", labels = ["protocol"] declareCounter nbc_reqresp_messages_failed, "Number of Req/Resp messages that failed decoding", labels = ["protocol"] declareCounter nbc_reqresp_messages_throttled, "Number of Req/Resp messages that were throttled", labels = ["protocol"] const libp2p_pki_schemes {.strdefine.} = "" when libp2p_pki_schemes != "secp256k1": {.fatal: "Incorrect building process, please use -d:\"libp2p_pki_schemes=secp256k1\"".} const NetworkInsecureKeyPassword = "INSECUREPASSWORD" template libp2pProtocol*(name: string, version: int) {.pragma.} func shortLog*(peer: Peer): string = shortLog(peer.peerId) chronicles.formatIt(Peer): shortLog(it) chronicles.formatIt(PublicKey): byteutils.toHex(it.getBytes().tryGet()) func shortProtocolId(protocolId: string): string = let start = if protocolId.startsWith(requestPrefix): requestPrefix.len else: 0 ends = if protocolId.endsWith(requestSuffix): protocolId.high - requestSuffix.len else: protocolId.high protocolId[start..ends] proc openStream(node: Eth2Node, peer: Peer, protocolId: string): Future[NetRes[Connection]] {.async: (raises: [CancelledError]).} = # When dialing here, we do not provide addresses - all new connection # attempts are handled via `connect` which also takes into account # reconnection timeouts try: ok await dial(node.switch, peer.peerId, protocolId) except LPError as exc: debug "Dialing failed", exc = exc.msg neterr BrokenConnection except CancelledError as exc: raise exc except CatchableError as exc: # TODO remove once libp2p supports `raises` debug "Unexpected error when opening stream", exc = exc.msg neterr UnknownError proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.} proc getState*(peer: Peer, proto: ProtocolInfo): RootRef = doAssert peer.protocolStates[proto.index] != nil, $proto.index peer.protocolStates[proto.index] template state*(peer: Peer, Protocol: type): untyped = ## Returns the state object of a particular protocol for a ## particular connection. mixin State bind getState type S = Protocol.State S(getState(peer, Protocol.protocolInfo)) proc getNetworkState*(node: Eth2Node, proto: ProtocolInfo): RootRef = doAssert node.protocolStates[proto.index] != nil, $proto.index node.protocolStates[proto.index] template protocolState*(node: Eth2Node, Protocol: type): untyped = mixin NetworkState bind getNetworkState type S = Protocol.NetworkState S(getNetworkState(node, Protocol.protocolInfo)) proc initProtocolState*[T](state: T, x: Peer|Eth2Node) {.gcsafe, raises: [].} = discard template networkState*(connection: Peer, Protocol: type): untyped = ## Returns the network state object of a particular protocol for a ## particular connection. protocolState(connection.network, Protocol) func peerId*(node: Eth2Node): PeerId = node.switch.peerInfo.peerId func nodeId*(node: Eth2Node): NodeId = # `secp256k1` keys are always stored inside PeerId. toNodeId(keys.PublicKey(node.switch.peerInfo.publicKey.skkey)) func enrRecord*(node: Eth2Node): Record = node.discovery.localNode.record proc getPeer(node: Eth2Node, peerId: PeerId): Peer = node.peers.withValue(peerId, peer) do: return peer[] do: let peer = Peer.init(node, peerId) return node.peers.mgetOrPut(peerId, peer) proc peerFromStream(network: Eth2Node, conn: Connection): Peer = result = network.getPeer(conn.peerId) result.peerId = conn.peerId func getKey*(peer: Peer): PeerId {.inline.} = peer.peerId proc getFuture(peer: Peer): Future[void] {.inline.} = if isNil(peer.disconnectedFut): peer.disconnectedFut = newFuture[void]("Peer.disconnectedFut") peer.disconnectedFut func getScore*(a: Peer): int = ## Returns current score value for peer ``peer``. a.score func updateScore*(peer: Peer, score: int) {.inline.} = ## Update peer's ``peer`` score with value ``score``. peer.score = peer.score + score if peer.score > PeerScoreHighLimit: peer.score = PeerScoreHighLimit func updateStats*(peer: Peer, index: SyncResponseKind, value: uint64) {.inline.} = ## Update peer's ``peer`` specific ``index`` statistics with value ``value``. peer.statistics.update(index, value) func getStats*(peer: Peer, index: SyncResponseKind): uint64 {.inline.} = ## Returns current statistics value for peer ``peer`` and index ``index``. peer.statistics.get(index) func calcThroughput(dur: Duration, value: uint64): float = let secs = float(chronos.seconds(1).nanoseconds) if isZero(dur): 0.0 else: float(value) * (secs / float(dur.nanoseconds)) func updateNetThroughput(peer: Peer, dur: Duration, bytesCount: uint64) {.inline.} = ## Update peer's ``peer`` network throughput. let bytesPerSecond = calcThroughput(dur, bytesCount) let a = peer.netThroughput.average let n = peer.netThroughput.count peer.netThroughput.average = a + (bytesPerSecond - a) / float(n + 1) inc(peer.netThroughput.count) func netKbps*(peer: Peer): float {.inline.} = ## Returns current network throughput average value in Kbps for peer ``peer``. round(((peer.netThroughput.average / 1024) * 10_000) / 10_000) func `<`(a, b: Peer): bool = ## Comparison function, which first checks peer's scores, and if the peers' ## score is equal it compares peers' network throughput. if a.score < b.score: true elif a.score == b.score: (a.netThroughput.average < b.netThroughput.average) else: false const maxRequestQuota = 1000000 maxGlobalQuota = 2 * maxRequestQuota ## Roughly, this means we allow 2 peers to sync from us at a time fullReplenishTime = 5.seconds template awaitQuota*(peerParam: Peer, costParam: float, protocolIdParam: string) = let peer = peerParam cost = int(costParam) if not peer.quota.tryConsume(cost.int): let protocolId = protocolIdParam debug "Awaiting peer quota", peer, cost, protocolId nbc_reqresp_messages_throttled.inc(1, [protocolId]) await peer.quota.consume(cost.int) template awaitQuota*(networkParam: Eth2Node, costParam: float, protocolIdParam: string) = let network = networkParam cost = int(costParam) if not network.quota.tryConsume(cost.int): let protocolId = protocolIdParam debug "Awaiting network quota", peer, cost, protocolId nbc_reqresp_messages_throttled.inc(1, [protocolId]) await network.quota.consume(cost.int) func allowedOpsPerSecondCost*(n: int): float = const replenishRate = (maxRequestQuota / fullReplenishTime.nanoseconds.float) (replenishRate * 1000000000'f / n.float) const libp2pRequestCost = allowedOpsPerSecondCost(8) ## Maximum number of libp2p requests per peer per second proc isSeen(network: Eth2Node, peerId: PeerId): bool = ## Returns ``true`` if ``peerId`` present in SeenTable and time period is not ## yet expired. let currentTime = now(chronos.Moment) if peerId notin network.seenTable: false else: let item = try: network.seenTable[peerId] except KeyError: raiseAssert "checked with notin" if currentTime >= item.stamp: # Peer is in SeenTable, but the time period has expired. network.seenTable.del(peerId) false else: true proc addSeen(network: Eth2Node, peerId: PeerId, period: chronos.Duration) = ## Adds peer with PeerId ``peerId`` to SeenTable and timeout ``period``. let item = SeenItem(peerId: peerId, stamp: now(chronos.Moment) + period) withValue(network.seenTable, peerId, entry) do: if entry.stamp < item.stamp: entry.stamp = item.stamp do: network.seenTable[peerId] = item proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false) {.async: (raises: [CancelledError]).} = # Per the specification, we MAY send a disconnect reason to the other peer but # we currently don't - the fact that we're disconnecting is obvious and the # reason already known (wrong network is known from status message) or doesn't # greatly matter for the listening side (since it can't be trusted anyway) try: if peer.connectionState notin {Disconnecting, Disconnected}: peer.connectionState = Disconnecting # We adding peer in SeenTable before actual disconnect to avoid races. let seenTime = case reason of ClientShutDown: SeenTableTimeClientShutDown of IrrelevantNetwork: SeenTableTimeIrrelevantNetwork of FaultOrError: SeenTableTimeFaultOrError of PeerScoreLow: SeenTablePenaltyError peer.network.addSeen(peer.peerId, seenTime) await peer.network.switch.disconnect(peer.peerId) except CancelledError as exc: raise exc except CatchableError as exc: # switch.disconnect shouldn't raise warn "Unexpected error while disconnecting peer", peer = peer.peerId, reason = reason, exc = exc.msg proc releasePeer(peer: Peer) = ## Checks for peer's score and disconnects peer if score is less than ## `PeerScoreLowLimit`. if peer.connectionState notin {ConnectionState.Disconnecting, ConnectionState.Disconnected}: if peer.score < PeerScoreLowLimit: debug "Peer was disconnected due to low score", peer = peer, peer_score = peer.score, score_low_limit = PeerScoreLowLimit, score_high_limit = PeerScoreHighLimit asyncSpawn(peer.disconnect(PeerScoreLow)) proc getRequestProtoName(fn: NimNode): NimNode = # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes # (TODO: file as an issue) let pragmas = fn.pragma if pragmas.kind == nnkPragma and pragmas.len > 0: for pragma in pragmas: try: if pragma.len > 0 and $pragma[0] == "libp2pProtocol": let protoName = $(pragma[1]) let protoVer = $(pragma[2].intVal) return newLit(requestPrefix & protoName & "/" & protoVer & requestSuffix) except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454 return newLit("") proc add(s: var seq[byte], pos: var int, bytes: openArray[byte]) = s[pos.. 127: return byteutils.toHex(asSeq(msg)) string.fromBytes(asSeq(msg)) proc sendErrorResponse(peer: Peer, conn: Connection, responseCode: ResponseCode, errMsg: ErrorMsg): Future[void] = debug "Error processing request", peer, responseCode, errMsg = formatErrorMsg(errMsg) conn.writeChunk(Opt.some responseCode, SSZ.encode(errMsg)) proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: seq[byte]) {.async: (raises: [CancelledError]).} = # Notifications are sent as a best effort, ie errors are not reported back # to the caller let deadline = sleepAsync RESP_TIMEOUT_DUR streamRes = awaitWithTimeout(peer.network.openStream(peer, protocolId), deadline): debug "Timeout while opening stream for notification", peer, protocolId return let stream = streamRes.valueOr: debug "Could not open stream for notification", peer, protocolId, error = streamRes.error return try: await stream.writeChunk(Opt.none ResponseCode, requestBytes) except CancelledError as exc: raise exc except CatchableError as exc: debug "Error while writing notification", peer, protocolId, exc = exc.msg finally: try: await noCancel stream.close() except CatchableError as exc: debug "Unexpected error while closing notification stream", peer, protocolId, exc = exc.msg proc sendResponseChunkBytesSZ( response: UntypedResponse, uncompressedLen: uint64, payloadSZ: openArray[byte], contextBytes: openArray[byte] = []): Future[void] = inc response.writtenChunks response.stream.writeChunkSZ( Opt.some ResponseCode.Success, uncompressedLen, payloadSZ, contextBytes) proc sendResponseChunkBytes( response: UntypedResponse, payload: openArray[byte], contextBytes: openArray[byte] = []): Future[void] = inc response.writtenChunks response.stream.writeChunk(Opt.some ResponseCode.Success, payload, contextBytes) proc sendResponseChunk( response: UntypedResponse, val: auto, contextBytes: openArray[byte] = []): Future[void] = sendResponseChunkBytes(response, SSZ.encode(val), contextBytes) template sendUserHandlerResultAsChunkImpl*(stream: Connection, handlerResultFut: Future): untyped = let handlerRes = await handlerResultFut writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerRes)) template sendUserHandlerResultAsChunkImpl*(stream: Connection, handlerResult: auto): untyped = writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerResult)) proc uncompressFramedStream(conn: Connection, expectedSize: int): Future[Result[seq[byte], string]] {.async: (raises: [CancelledError]).} = var header: array[framingHeader.len, byte] try: await conn.readExactly(addr header[0], header.len) except LPStreamEOFError, LPStreamIncompleteError: return err "Unexpected EOF before snappy header" except CancelledError as exc: raise exc except CatchableError as exc: return err "Unexpected error reading header: " & exc.msg if header != framingHeader: return err "Incorrect snappy header" static: doAssert maxCompressedFrameDataLen >= maxUncompressedFrameDataLen.uint64 var frameData = newSeqUninitialized[byte](maxCompressedFrameDataLen + 4) output = newSeqUninitialized[byte](expectedSize) written = 0 while written < expectedSize: var frameHeader: array[4, byte] try: await conn.readExactly(addr frameHeader[0], frameHeader.len) except LPStreamEOFError, LPStreamIncompleteError: return err "Snappy frame header missing" except CancelledError as exc: raise exc except CatchableError as exc: return err "Unexpected error reading frame header: " & exc.msg let (id, dataLen) = decodeFrameHeader(frameHeader) if dataLen > frameData.len: # In theory, compressed frames could be bigger and still result in a # valid, small snappy frame, but this would mean they are not getting # compressed correctly return err "Snappy frame too big" if dataLen > 0: try: await conn.readExactly(addr frameData[0], dataLen) except LPStreamEOFError, LPStreamIncompleteError: return err "Incomplete snappy frame" except CancelledError as exc: raise exc except CatchableError as exc: return err "Unexpected error reading frame data: " & exc.msg if id == chunkCompressed: if dataLen < 6: # At least CRC + 2 bytes of frame data return err "Compressed snappy frame too small" let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3) uncompressed = snappy.uncompress( frameData.toOpenArray(4, dataLen - 1), output.toOpenArray(written, output.high)).valueOr: return err "Failed to decompress content" if maskedCrc( output.toOpenArray(written, written + uncompressed-1)) != crc: return err "Snappy content CRC checksum failed" written += uncompressed elif id == chunkUncompressed: if dataLen < 5: # At least one byte of data return err "Uncompressed snappy frame too small" let uncompressed = dataLen - 4 if uncompressed > maxUncompressedFrameDataLen.int: return err "Snappy frame size too large" if uncompressed > output.len - written: return err "Too much data" let crc = uint32.fromBytesLE frameData.toOpenArray(0, 3) if maskedCrc(frameData.toOpenArray(4, dataLen - 1)) != crc: return err "Snappy content CRC checksum failed" output[written.. maxSize: return neterr SizePrefixOverflow if size == 0: return neterr ZeroSizePrefix # The `size.int` conversion is safe because `size` is bounded to `MAX_CHUNK_SIZE` let dataRes = await conn.uncompressFramedStream(size.int) data = dataRes.valueOr: debug "Snappy decompression/read failed", msg = $dataRes.error, conn return neterr InvalidSnappyBytes # `10` is the maximum size of variable integer on wire, so error could # not be significant. peer.updateNetThroughput(now(chronos.Moment) - sm, uint64(10 + size)) try: ok SSZ.decode(data, MsgType) except SerializationError: neterr InvalidSszBytes proc readResponseChunk( conn: Connection, peer: Peer, MsgType: typedesc): Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = mixin readChunkPayload var responseCodeByte: byte try: await conn.readExactly(addr responseCodeByte, 1) except LPStreamEOFError, LPStreamIncompleteError: return neterr PotentiallyExpectedEOF except CancelledError as exc: raise exc except CatchableError as exc: warn "Unexpected error", exc = exc.msg return neterr UnknownError static: assert ResponseCode.low.ord == 0 if responseCodeByte > ResponseCode.high.byte: return neterr InvalidResponseCode let responseCode = ResponseCode responseCodeByte case responseCode: of InvalidRequest, ServerError, ResourceUnavailable: let errorMsg = ? await readChunkPayload(conn, peer, ErrorMsg) errorMsgStr = toPrettyString(errorMsg.asSeq) debug "Error response from peer", responseCode, errMsg = errorMsgStr return err Eth2NetworkingError(kind: ReceivedErrorResponse, responseCode: responseCode, errorMsg: errorMsgStr) of Success: discard return await readChunkPayload(conn, peer, MsgType) proc readResponse(conn: Connection, peer: Peer, MsgType: type, timeout: Duration): Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} = when MsgType is List: type E = MsgType.T var results: MsgType while true: # Because we interleave networking with response processing, it may # happen that reading all chunks takes longer than a strict dealine # timeout would allow, so we allow each chunk a new timeout instead. # The problem is exacerbated by the large number of round-trips to the # poll loop that each future along the way causes. trace "reading chunk", conn let nextFut = conn.readResponseChunk(peer, E) if not await nextFut.withTimeout(timeout): return neterr(ReadResponseTimeout) let nextRes = await nextFut if nextRes.isErr: if nextRes.error.kind == PotentiallyExpectedEOF: trace "EOF chunk", conn, err = nextRes.error return ok results trace "Error chunk", conn, err = nextRes.error return err nextRes.error else: trace "Got chunk", conn if not results.add nextRes.value: return neterr(ResponseChunkOverflow) else: let nextFut = conn.readResponseChunk(peer, MsgType) if not await nextFut.withTimeout(timeout): return neterr(ReadResponseTimeout) return await nextFut # Guaranteed to complete without waiting proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: seq[byte], ResponseMsg: type, timeout: Duration): Future[NetRes[ResponseMsg]] {.async: (raises: [CancelledError]).} = let deadline = sleepAsync timeout streamRes = awaitWithTimeout(peer.network.openStream(peer, protocolId), deadline): return neterr StreamOpenTimeout stream = ?streamRes try: # Send the request # Some clients don't want a length sent for empty requests # So don't send anything on empty requests if requestBytes.len > 0: await stream.writeChunk(Opt.none ResponseCode, requestBytes) # Half-close the stream to mark the end of the request - if this is not # done, the other peer might never send us the response. await stream.close() nbc_reqresp_messages_sent.inc(1, [shortProtocolId(protocolId)]) # Read the response let res = await readResponse(stream, peer, ResponseMsg, timeout) if res.isErr(): if res.error().kind in ProtocolViolations: peer.updateScore(PeerScoreInvalidRequest) else: peer.updateScore(PeerScorePoorRequest) res except CancelledError as exc: raise exc except CatchableError: peer.updateScore(PeerScorePoorRequest) neterr BrokenConnection finally: try: await noCancel stream.closeWithEOF() except CatchableError as exc: debug "Unexpected error while closing stream", peer, protocolId, exc = exc.msg proc init*(T: type MultipleChunksResponse, peer: Peer, conn: Connection): T = T(UntypedResponse(peer: peer, stream: conn)) proc init*[MsgType](T: type SingleChunkResponse[MsgType], peer: Peer, conn: Connection): T = T(UntypedResponse(peer: peer, stream: conn)) template write*[M; maxLen: static Limit]( r: MultipleChunksResponse[M, maxLen], val: M, contextBytes: openArray[byte] = []): untyped = mixin sendResponseChunk sendResponseChunk(UntypedResponse(r), val, contextBytes) template writeSSZ*[M; maxLen: static Limit]( r: MultipleChunksResponse[M, maxLen], val: auto, contextBytes: openArray[byte] = []): untyped = mixin sendResponseChunk sendResponseChunk(UntypedResponse(r), val, contextBytes) template writeBytesSZ*( r: MultipleChunksResponse, uncompressedLen: uint64, bytes: openArray[byte], contextBytes: openArray[byte]): untyped = sendResponseChunkBytesSZ(UntypedResponse(r), uncompressedLen, bytes, contextBytes) template send*[M]( r: SingleChunkResponse[M], val: M, contextBytes: openArray[byte] = []): untyped = mixin sendResponseChunk doAssert UntypedResponse(r).writtenChunks == 0 sendResponseChunk(UntypedResponse(r), val, contextBytes) template sendSSZ*[M]( r: SingleChunkResponse[M], val: auto, contextBytes: openArray[byte] = []): untyped = mixin sendResponseChunk doAssert UntypedResponse(r).writtenChunks == 0 sendResponseChunk(UntypedResponse(r), val, contextBytes) proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async: (raises: [CancelledError]).} = # Loop down serially because it's easier to reason about the connection state # when there are fewer async races, specially during setup for protocol in peer.network.protocols: if protocol.onPeerConnected != nil: await protocol.onPeerConnected(peer, incoming) proc initProtocol(name: string, peerInit: PeerStateInitializer, networkInit: NetworkStateInitializer, index: int): ProtocolInfoObj = ProtocolInfoObj( name: name, messages: @[], index: index, peerStateInitializer: peerInit, networkStateInitializer: networkInit) proc setEventHandlers(p: ProtocolInfo, onPeerConnected: OnPeerConnectedHandler, onPeerDisconnected: OnPeerDisconnectedHandler) = p.onPeerConnected = onPeerConnected p.onPeerDisconnected = onPeerDisconnected proc implementSendProcBody(sendProc: SendProc) = let msg = sendProc.msg UntypedResponse = bindSym "UntypedResponse" proc sendCallGenerator(peer, bytes: NimNode): NimNode = if msg.kind != msgResponse: let msgProto = getRequestProtoName(msg.procDef) case msg.kind of msgRequest: let ResponseRecord = msg.response.recName quote: makeEth2Request(`peer`, `msgProto`, `bytes`, `ResponseRecord`, `timeoutVar`) else: quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`) else: quote: sendResponseChunkBytes(`UntypedResponse`(`peer`), `bytes`) sendProc.useStandardBody(nil, nil, sendCallGenerator) proc handleIncomingStream(network: Eth2Node, conn: Connection, protocolId: string, MsgType: type) {.async: (raises: [CancelledError]).} = mixin callUserHandler, RecType type MsgRec = RecType(MsgType) const msgName {.used.} = typetraits.name(MsgType) ## Uncomment this to enable tracing on all incoming requests ## You can include `msgNameLit` in the condition to select ## more specific requests: # when chronicles.runtimeFilteringEnabled: # setLogLevel(LogLevel.TRACE) # defer: setLogLevel(LogLevel.DEBUG) # trace "incoming " & `msgNameLit` & " conn" let peer = peerFromStream(network, conn) try: case peer.connectionState of Disconnecting, Disconnected, None: # We got incoming stream request while disconnected or disconnecting. debug "Got incoming request from disconnected peer", peer = peer, message = msgName return of Connecting: # We got incoming stream request while handshake is not yet finished, # TODO: We could check it here. debug "Got incoming request from peer while in handshake", peer = peer, msgName of Connected: # We got incoming stream from peer with proper connection state. debug "Got incoming request from peer", peer = peer, msgName template returnInvalidRequest(msg: ErrorMsg) = peer.updateScore(PeerScoreInvalidRequest) await sendErrorResponse(peer, conn, InvalidRequest, msg) return template returnInvalidRequest(msg: string) = returnInvalidRequest(ErrorMsg msg.toBytes) template returnResourceUnavailable(msg: ErrorMsg) = await sendErrorResponse(peer, conn, ResourceUnavailable, msg) return template returnResourceUnavailable(msg: string) = returnResourceUnavailable(ErrorMsg msg.toBytes) nbc_reqresp_messages_received.inc(1, [shortProtocolId(protocolId)]) const isEmptyMsg = when MsgRec is object: # We need nested `when` statements here, because Nim doesn't properly # apply boolean short-circuit logic at compile time and this causes # `totalSerializedFields` to be applied to non-object types that it # doesn't know how to support. when totalSerializedFields(MsgRec) == 0: true else: false else: false let msg = try: when isEmptyMsg: NetRes[MsgRec].ok default(MsgRec) else: # TODO(zah) The TTFB timeout is not implemented in LibP2P streams # back-end let deadline = sleepAsync RESP_TIMEOUT_DUR awaitWithTimeout( readChunkPayload(conn, peer, MsgRec), deadline): # Timeout, e.g., cancellation due to fulfillment by different peer. # Treat this similarly to `UnexpectedEOF`, `PotentiallyExpectedEOF`. nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) await sendErrorResponse( peer, conn, InvalidRequest, errorMsgLit "Request full data not sent in time") return finally: # The request quota is shared between all requests - it represents the # cost to perform a service on behalf of a client and is incurred # regardless if the request succeeds or fails - we don't count waiting # for this quota against timeouts so as not to prematurely disconnect # clients that are on the edge - nonetheless, the client will count it. # When a client exceeds their quota, they will be slowed down without # notification - as long as they don't make parallel requests (which is # limited by libp2p), this will naturally adapt them to the available # quota. # Note that the `msg` will be stored in memory while we wait for the # quota to be available. The amount of such messages in memory is # bounded by the libp2p limit of parallel streams # This quota also applies to invalid requests thanks to the use of # `finally`. awaitQuota(peer, libp2pRequestCost, shortProtocolId(protocolId)) if msg.isErr: if msg.error.kind in ProtocolViolations: peer.updateScore(PeerScoreInvalidRequest) else: peer.updateScore(PeerScorePoorRequest) nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) let (responseCode, errMsg) = case msg.error.kind of UnexpectedEOF, PotentiallyExpectedEOF: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) (InvalidRequest, errorMsgLit "Incomplete request") of InvalidContextBytes: (ServerError, errorMsgLit "Unrecognized context bytes") of InvalidSnappyBytes: (InvalidRequest, errorMsgLit "Failed to decompress snappy payload") of InvalidSszBytes: (InvalidRequest, errorMsgLit "Failed to decode SSZ payload") of InvalidSizePrefix: (InvalidRequest, errorMsgLit "Invalid chunk size prefix") of ZeroSizePrefix: (InvalidRequest, errorMsgLit "The request chunk cannot have a size of zero") of SizePrefixOverflow: (InvalidRequest, errorMsgLit "The chunk size exceed the maximum allowed") of InvalidResponseCode, ReceivedErrorResponse, StreamOpenTimeout, ReadResponseTimeout: # These shouldn't be possible in a request, because # there are no response codes being read, no stream # openings and no reading of responses: (ServerError, errorMsgLit "Internal server error") of BrokenConnection: return of ResponseChunkOverflow: (InvalidRequest, errorMsgLit "Too many chunks in response") of UnknownError: (InvalidRequest, errorMsgLit "Unknown error while processing request") await sendErrorResponse(peer, conn, responseCode, errMsg) return try: # logReceivedMsg(peer, MsgType(msg.get)) await callUserHandler(MsgType, peer, conn, msg.get) except InvalidInputsError as exc: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) returnInvalidRequest exc.msg except ResourceUnavailableError as exc: returnResourceUnavailable exc.msg except CatchableError as exc: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) await sendErrorResponse(peer, conn, ServerError, ErrorMsg exc.msg.toBytes) except CatchableError as exc: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) debug "Error processing an incoming request", exc = exc.msg, msgName finally: try: await noCancel conn.closeWithEOF() except CatchableError as exc: debug "Unexpected error while closing incoming connection", exc = exc.msg releasePeer(peer) proc toPeerAddr*(r: enr.TypedRecord, proto: IpTransportProtocol): Result[PeerAddr, cstring] = if not r.secp256k1.isSome: return err("enr: no secp256k1 key in record") let pubKey = ? keys.PublicKey.fromRaw(r.secp256k1.get) peerId = ? PeerId.init(crypto.PublicKey( scheme: Secp256k1, skkey: secp.SkPublicKey(pubKey))) var addrs = newSeq[MultiAddress]() case proto of tcpProtocol: if r.ip.isSome and r.tcp.isSome: let ip = ipv4(r.ip.get) addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) if r.ip6.isSome: let ip = ipv6(r.ip6.get) if r.tcp6.isSome: addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp6.get) elif r.tcp.isSome: addrs.add MultiAddress.init(ip, tcpProtocol, Port r.tcp.get) else: discard of udpProtocol: if r.ip.isSome and r.udp.isSome: let ip = ipv4(r.ip.get) addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get) if r.ip6.isSome: let ip = ipv6(r.ip6.get) if r.udp6.isSome: addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp6.get) elif r.udp.isSome: addrs.add MultiAddress.init(ip, udpProtocol, Port r.udp.get) else: discard if addrs.len == 0: return err("enr: no addresses in record") ok(PeerAddr(peerId: peerId, addrs: addrs)) proc checkPeer(node: Eth2Node, peerAddr: PeerAddr): bool = logScope: peer = peerAddr.peerId let peerId = peerAddr.peerId if node.peerPool.hasPeer(peerId): trace "Already connected" false else: if node.isSeen(peerId): trace "Recently connected" false else: true proc dialPeer(node: Eth2Node, peerAddr: PeerAddr, index = 0) {.async: (raises: [CancelledError]).} = ## Establish connection with remote peer identified by address ``peerAddr``. logScope: peer = peerAddr.peerId index = index if not(node.checkPeer(peerAddr)): return debug "Connecting to discovered peer" var deadline = sleepAsync(node.connectTimeout) var workfut = node.switch.connect( peerAddr.peerId, peerAddr.addrs, forceDial = true ) try: # `or` operation will only raise exception of `workfut`, because `deadline` # could not raise exception. await workfut or deadline if workfut.finished(): if not deadline.finished(): deadline.cancelSoon() inc nbc_successful_dials else: debug "Connection to remote peer timed out" inc nbc_timeout_dials node.addSeen(peerAddr.peerId, SeenTableTimeTimeout) await cancelAndWait(workfut) except CatchableError as exc: debug "Connection to remote peer failed", msg = exc.msg inc nbc_failed_dials node.addSeen(peerAddr.peerId, SeenTableTimeDeadPeer) proc connectWorker(node: Eth2Node, index: int) {.async: (raises: [CancelledError]).} = debug "Connection worker started", index = index while true: # This loop will never produce HIGH CPU usage because it will wait # and block until it not obtains new peer from the queue ``connQueue``. let remotePeerAddr = await node.connQueue.popFirst() # Previous worker dial might have hit the maximum peers. # TODO: could clear the whole connTable and connQueue here also, best # would be to have this event based coming from peer pool or libp2p. if node.peerPool.len < node.hardMaxPeers: await node.dialPeer(remotePeerAddr, index) # Peer was added to `connTable` before adding it to `connQueue`, so we # excluding peer here after processing. node.connTable.excl(remotePeerAddr.peerId) proc toPeerAddr(node: Node): Result[PeerAddr, cstring] = let nodeRecord = ? node.record.toTypedRecord() let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol) ok(peerAddr) proc trimConnections(node: Eth2Node, count: int) = # Kill `count` peers, scoring them to remove the least useful ones var scores = initOrderedTable[PeerId, int]() # Take into account the stabilitySubnets # During sync, only this will be used to score peers # since gossipsub is not running yet # # A peer subscribed to all stabilitySubnets will # have 640 points var peersInGracePeriod = 0 for peer in node.peers.values: if peer.connectionState != Connected: continue # Metadata pinger is used as grace period if peer.metadata.isNone: peersInGracePeriod.inc() continue let stabilitySubnets = peer.metadata.get().attnets stabilitySubnetsCount = stabilitySubnets.countOnes() thisPeersScore = 10 * stabilitySubnetsCount scores[peer.peerId] = thisPeersScore # Safegard: if we have too many peers in the grace # period, don't kick anyone. Otherwise, they will be # preferred over long-standing peers if peersInGracePeriod > scores.len div 2: return # Split a 1000 points for each topic's peers # + 5 000 points for each subbed topic # This gives priority to peers in topics with few peers # For instance, a topic with `dHigh` peers will give 80 points to each peer # Whereas a topic with `dLow` peers will give 250 points to each peer # # Then, use the average of all topics per peers, to avoid giving too much # point to big peers var gossipScores = initTable[PeerId, tuple[sum: int, count: int]]() for topic, _ in node.pubsub.gossipsub: let peersInMesh = node.pubsub.mesh.peers(topic) peersSubbed = node.pubsub.gossipsub.peers(topic) scorePerMeshPeer = 5_000 div max(peersInMesh, 1) scorePerSubbedPeer = 1_000 div max(peersSubbed, 1) for peer in node.pubsub.gossipsub.getOrDefault(topic): if peer.peerId notin scores: continue let currentVal = gossipScores.getOrDefault(peer.peerId) gossipScores[peer.peerId] = ( currentVal.sum + scorePerSubbedPeer, currentVal.count + 1 ) # Avoid global topics (>75% of peers), which would greatly reduce # the average score for small peers if peersSubbed > scores.len div 4 * 3: continue for peer in node.pubsub.mesh.getOrDefault(topic): if peer.peerId notin scores: continue let currentVal = gossipScores.getOrDefault(peer.peerId) gossipScores[peer.peerId] = ( currentVal.sum + scorePerMeshPeer, currentVal.count + 1 ) for peerId, gScore in gossipScores: scores[peerId] = scores.getOrDefault(peerId) + (gScore.sum div gScore.count) proc sortPerScore(a, b: (PeerId, int)): int = system.cmp(a[1], b[1]) scores.sort(sortPerScore) var toKick = count for peerId in scores.keys: if peerId in node.directPeers: continue debug "kicking peer", peerId, score=scores[peerId] asyncSpawn node.getPeer(peerId).disconnect(PeerScoreLow) dec toKick inc(nbc_cycling_kicked_peers) if toKick <= 0: return proc getLowSubnets(node: Eth2Node, epoch: Epoch): (AttnetBits, SyncnetBits) = # Returns the subnets required to have a healthy mesh # The subnets are computed, to, in order: # - Have 0 subnet with < `dLow` peers from topic subscription # - Have 0 subscribed subnet below `dLow` # - Have 0 subscribed subnet below `dOut` outgoing peers # - Have 0 subnet with < `dHigh` peers from topic subscription nbc_gossipsub_low_fanout.set(0) nbc_gossipsub_good_fanout.set(0) nbc_gossipsub_healthy_fanout.set(0) template findLowSubnets(topicNameGenerator: untyped, SubnetIdType: type, totalSubnets: static int): auto = var lowOutgoingSubnets: BitArray[totalSubnets] notHighOutgoingSubnets: BitArray[totalSubnets] belowDSubnets: BitArray[totalSubnets] belowDOutSubnets: BitArray[totalSubnets] for subNetId in 0 ..< totalSubnets: let topic = topicNameGenerator(node.forkId.fork_digest, SubnetIdType(subNetId)) if node.pubsub.gossipsub.peers(topic) < node.pubsub.parameters.dLow: lowOutgoingSubnets.setBit(subNetId) if node.pubsub.gossipsub.peers(topic) < node.pubsub.parameters.dHigh: notHighOutgoingSubnets.setBit(subNetId) # Not subscribed if topic notin node.pubsub.mesh: continue if node.pubsub.mesh.peers(topic) < node.pubsub.parameters.dLow: belowDSubnets.setBit(subNetId) let outPeers = node.pubsub.mesh.getOrDefault(topic).countIt(it.outbound) if outPeers < node.pubsub.parameters.dOut: belowDOutSubnets.setBit(subNetId) nbc_gossipsub_low_fanout.inc(int64(lowOutgoingSubnets.countOnes())) nbc_gossipsub_good_fanout.inc(int64( notHighOutgoingSubnets.countOnes() - lowOutgoingSubnets.countOnes() )) nbc_gossipsub_healthy_fanout.inc(int64( totalSubnets - notHighOutgoingSubnets.countOnes())) if lowOutgoingSubnets.countOnes() > 0: lowOutgoingSubnets elif belowDSubnets.countOnes() > 0: belowDSubnets elif belowDOutSubnets.countOnes() > 0: belowDOutSubnets else: notHighOutgoingSubnets return ( findLowSubnets(getAttestationTopic, SubnetId, ATTESTATION_SUBNET_COUNT.int), # We start looking one epoch before the transition in order to allow # some time for the gossip meshes to get healthy: if epoch + 1 >= node.cfg.ALTAIR_FORK_EPOCH: findLowSubnets(getSyncCommitteeTopic, SyncSubcommitteeIndex, SYNC_COMMITTEE_SUBNET_COUNT) else: default(SyncnetBits) ) proc runDiscoveryLoop(node: Eth2Node) {.async.} = debug "Starting discovery loop" while true: let currentEpoch = node.getBeaconTime().slotOrZero.epoch (wantedAttnets, wantedSyncnets) = node.getLowSubnets(currentEpoch) wantedAttnetsCount = wantedAttnets.countOnes() wantedSyncnetsCount = wantedSyncnets.countOnes() outgoingPeers = node.peerPool.lenCurrent({PeerType.Outgoing}) targetOutgoingPeers = max(node.wantedPeers div 10, 3) if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0 or outgoingPeers < targetOutgoingPeers: let minScore = if wantedAttnetsCount > 0 or wantedSyncnetsCount > 0: 1 else: 0 discoveredNodes = await node.discovery.queryRandom( node.discoveryForkId, wantedAttnets, wantedSyncnets, minScore) let newPeers = block: var np = newSeq[PeerAddr]() for discNode in discoveredNodes: let res = discNode.toPeerAddr() if res.isErr(): debug "Failed to decode discovery's node address", node = discNode, errMsg = res.error continue let peerAddr = res.get() if node.checkPeer(peerAddr) and peerAddr.peerId notin node.connTable: np.add(peerAddr) np let roomCurrent = node.hardMaxPeers - len(node.peerPool) peersToKick = min(newPeers.len - roomCurrent, node.hardMaxPeers div 5) if peersToKick > 0 and newPeers.len > 0: node.trimConnections(peersToKick) for peerAddr in newPeers: # We adding to pending connections table here, but going # to remove it only in `connectWorker`. node.connTable.incl(peerAddr.peerId) await node.connQueue.addLast(peerAddr) debug "Discovery tick", wanted_peers = node.wantedPeers, current_peers = len(node.peerPool), discovered_nodes = len(discoveredNodes), new_peers = len(newPeers) if len(newPeers) == 0: let currentPeers = len(node.peerPool) if currentPeers <= node.wantedPeers shr 2: # 25% warn "Peer count low, no new peers discovered", discovered_nodes = len(discoveredNodes), new_peers = newPeers, current_peers = currentPeers, wanted_peers = node.wantedPeers # Discovery `queryRandom` can have a synchronous fast path for example # when no peers are in the routing table. Don't run it in continuous loop. # # Also, give some time to dial the discovered nodes and update stats etc await sleepAsync(5.seconds) proc resolvePeer(peer: Peer) = # Resolve task which performs searching of peer's public key and recovery of # ENR using discovery5. We only resolve ENR for peers we know about to avoid # querying the network - as of now, the ENR is not needed, except for # debuggging logScope: peer = peer.peerId let startTime = now(chronos.Moment) let nodeId = block: var key: PublicKey # `secp256k1` keys are always stored inside PeerId. discard peer.peerId.extractPublicKey(key) keys.PublicKey.fromRaw(key.skkey.getBytes()).get().toNodeId() debug "Peer's ENR recovery task started", node_id = $nodeId # This is "fast-path" for peers which was dialed. In this case discovery # already has most recent ENR information about this peer. let gnode = peer.network.discovery.getNode(nodeId) if gnode.isSome(): peer.enr = Opt.some(gnode.get().record) inc(nbc_successful_discoveries) let delay = now(chronos.Moment) - startTime nbc_resolve_time.observe(delay.toFloatSeconds()) debug "Peer's ENR recovered", delay proc handlePeer*(peer: Peer) {.async: (raises: [CancelledError]).} = let res = peer.network.peerPool.addPeerNoWait(peer, peer.direction) case res: of PeerStatus.LowScoreError, PeerStatus.NoSpaceError: # Peer has low score or we do not have enough space in PeerPool, # we are going to disconnect it gracefully. # Peer' state will be updated in connection event. debug "Peer has low score or there no space in PeerPool", peer = peer, reason = res await peer.disconnect(FaultOrError) of PeerStatus.DeadPeerError: # Peer's lifetime future is finished, so its already dead, # we do not need to perform gracefull disconect. # Peer's state will be updated in connection event. discard of PeerStatus.DuplicateError: # Peer is already present in PeerPool, we can't perform disconnect, # because in such case we could kill both connections (connection # which is present in PeerPool and new one). # This is possible bug, because we could enter here only if number # of `peer.connections == 1`, it means that Peer's lifetime is not # tracked properly and we still not received `Disconnected` event. debug "Peer is already present in PeerPool", peer = peer of PeerStatus.Success: # Peer was added to PeerPool. peer.score = NewPeerScore peer.connectionState = Connected # We spawn task which will obtain ENR for this peer. resolvePeer(peer) debug "Peer successfully connected", peer = peer, connections = peer.connections proc onConnEvent( node: Eth2Node, peerId: PeerId, event: ConnEvent) {. async: (raises: [CancelledError]).} = let peer = node.getPeer(peerId) case event.kind of ConnEventKind.Connected: inc peer.connections debug "Peer connection upgraded", peer = $peerId, connections = peer.connections if peer.connections == 1: # Libp2p may connect multiple times to the same peer - using different # transports for both incoming and outgoing. For now, we'll count our # "fist" encounter with the peer as the true connection, leaving the # other connections be - libp2p limits the number of concurrent # connections to the same peer, and only one of these connections will be # active. Nonetheless, this quirk will cause a number of odd behaviours: # * For peer limits, we might miscount the incoming vs outgoing quota # * Protocol handshakes are wonky: we'll not necessarily use the newly # connected transport - instead we'll just pick a random one! case peer.connectionState of Disconnecting: # We got connection with peer which we currently disconnecting. # Normally this does not happen, but if a peer is being disconnected # while a concurrent (incoming for example) connection attempt happens, # we might end up here debug "Got connection attempt from peer that we are disconnecting", peer = peerId try: await node.switch.disconnect(peerId) except CancelledError as exc: raise exc except CatchableError as exc: debug "Unexpected error while disconnecting peer", exc = exc.msg return of None: # We have established a connection with the new peer. peer.connectionState = Connecting of Disconnected: # We have established a connection with the peer that we have seen # before - reusing the existing peer object is fine peer.connectionState = Connecting peer.score = 0 # Will be set to NewPeerScore after handshake of Connecting, Connected: # This means that we got notification event from peer which we already # connected or connecting right now. If this situation will happened, # it means bug on `nim-libp2p` side. warn "Got connection attempt from peer which we already connected", peer = peerId await peer.disconnect(FaultOrError) return # Store connection direction inside Peer object. if event.incoming: peer.direction = PeerType.Incoming else: peer.direction = PeerType.Outgoing await performProtocolHandshakes(peer, event.incoming) of ConnEventKind.Disconnected: dec peer.connections debug "Lost connection to peer", peer = peerId, connections = peer.connections if peer.connections == 0: debug "Peer disconnected", peer = $peerId, connections = peer.connections # Whatever caused disconnection, avoid connection spamming node.addSeen(peerId, SeenTableTimeReconnect) let fut = peer.disconnectedFut if not(isNil(fut)): fut.complete() peer.disconnectedFut = nil else: # TODO (cheatfate): This could be removed when bug will be fixed inside # `nim-libp2p`. debug "Got new event while peer is already disconnected", peer = peerId, peer_state = peer.connectionState peer.connectionState = Disconnected proc new(T: type Eth2Node, config: BeaconNodeConf | LightClientConf, runtimeCfg: RuntimeConfig, enrForkId: ENRForkID, discoveryForkId: ENRForkID, forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn, switch: Switch, pubsub: GossipSub, ip: Option[IpAddress], tcpPort, udpPort: Option[Port], privKey: keys.PrivateKey, discovery: bool, directPeers: DirectPeers, rng: ref HmacDrbgContext): T {.raises: [CatchableError].} = when not defined(local_testnet): let connectTimeout = chronos.minutes(1) seenThreshold = chronos.minutes(5) else: let connectTimeout = chronos.seconds(10) seenThreshold = chronos.seconds(10) type MetaData = altair.MetaData # Weird bug without this.. # Versions up to v22.3.0 would write an empty `MetaData` to #`data-dir/node-metadata.json` which would then be reloaded on startup - don't # write a file with this name or downgrades will break! const metadata = MetaData() let node = T( switch: switch, pubsub: pubsub, wantedPeers: config.maxPeers, hardMaxPeers: config.hardMaxPeers.get(config.maxPeers * 3 div 2), #*1.5 cfg: runtimeCfg, peerPool: newPeerPool[Peer, PeerId](), # Its important here to create AsyncQueue with limited size, otherwise # it could produce HIGH cpu usage. connQueue: newAsyncQueue[PeerAddr](ConcurrentConnections), metadata: metadata, forkId: enrForkId, discoveryForkId: discoveryForkId, forkDigests: forkDigests, getBeaconTime: getBeaconTime, discovery: Eth2DiscoveryProtocol.new( config, ip, tcpPort, udpPort, privKey, { enrForkIdField: SSZ.encode(enrForkId), enrAttestationSubnetsField: SSZ.encode(metadata.attnets) }, rng), discoveryEnabled: discovery, rng: rng, connectTimeout: connectTimeout, seenThreshold: seenThreshold, directPeers: directPeers, quota: TokenBucket.new(maxGlobalQuota, fullReplenishTime) ) proc peerHook(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe.} = onConnEvent(node, peerId, event) switch.addConnEventHandler(peerHook, ConnEventKind.Connected) switch.addConnEventHandler(peerHook, ConnEventKind.Disconnected) proc scoreCheck(peer: Peer): bool = peer.score >= PeerScoreLowLimit proc onDeletePeer(peer: Peer) = peer.releasePeer() node.peerPool.setScoreCheck(scoreCheck) node.peerPool.setOnDeletePeer(onDeletePeer) node proc registerProtocol*(node: Eth2Node, Proto: type, state: Proto.NetworkState) = # This convoluted registration process is a leftover from the shared p2p macro # and should be refactored let proto = Proto.protocolInfo() node.protocols.add(proto) node.protocolStates.setLen(max(proto.index + 1, node.protocolStates.len)) node.protocolStates[proto.index] = state for msg in proto.messages: if msg.protocolMounter != nil: msg.protocolMounter node proc startListening*(node: Eth2Node) {.async.} = if node.discoveryEnabled: try: node.discovery.open() except CatchableError as exc: fatal "Failed to start discovery service. UDP port may be already in use", exc = exc.msg quit 1 try: await node.switch.start() except CatchableError as exc: fatal "Failed to start LibP2P transport. TCP port may be already in use", exc = exc.msg quit 1 proc peerPingerHeartbeat(node: Eth2Node): Future[void] {.async: (raises: [CancelledError]).} proc peerTrimmerHeartbeat(node: Eth2Node): Future[void] {.async: (raises: [CancelledError]).} proc start*(node: Eth2Node) {.async: (raises: [CancelledError]).} = proc onPeerCountChanged() = trace "Number of peers has been changed", length = len(node.peerPool) nbc_peers.set int64(len(node.peerPool)) node.peerPool.setPeerCounter(onPeerCountChanged) for i in 0 ..< ConcurrentConnections: node.connWorkers.add connectWorker(node, i) if node.discoveryEnabled: node.discovery.start() traceAsyncErrors node.runDiscoveryLoop() else: notice "Discovery disabled; trying bootstrap nodes", nodes = node.discovery.bootstrapRecords.len for enr in node.discovery.bootstrapRecords: let tr = enr.toTypedRecord() if tr.isOk(): let pa = tr.get().toPeerAddr(tcpProtocol) if pa.isOk(): await node.connQueue.addLast(pa.get()) node.peerPingerHeartbeatFut = node.peerPingerHeartbeat() node.peerTrimmerHeartbeatFut = node.peerTrimmerHeartbeat() proc stop*(node: Eth2Node) {.async: (raises: [CancelledError]).} = # Ignore errors in futures, since we're shutting down (but log them on the # TRACE level, if a timeout is reached). var waitedFutures = @[ node.switch.stop(), node.peerPingerHeartbeat.cancelAndWait(), node.peerTrimmerHeartbeatFut.cancelAndWait(), ] if node.discoveryEnabled: waitedFutures &= node.discovery.closeWait() let timeout = 5.seconds completed = await withTimeout(allFutures(waitedFutures), timeout) if not completed: trace "Eth2Node.stop(): timeout reached", timeout, futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer = let res = Peer( peerId: peerId, network: network, connectionState: ConnectionState.None, lastReqTime: now(chronos.Moment), lastMetadataTime: now(chronos.Moment), quota: TokenBucket.new(maxRequestQuota.int, fullReplenishTime) ) res.protocolStates.setLen(network.protocolStates.len()) for proto in network.protocols: if not(isNil(proto.peerStateInitializer)): res.protocolStates[proto.index] = proto.peerStateInitializer(res) res proc registerMsg(protocol: ProtocolInfo, name: string, mounter: MounterProc, libp2pCodecName: string) = protocol.messages.add MessageInfo(name: name, protocolMounter: mounter, libp2pCodecName: libp2pCodecName) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = var Format = ident "SSZ" Connection = bindSym "Connection" Peer = bindSym "Peer" Eth2Node = bindSym "Eth2Node" registerMsg = bindSym "registerMsg" initProtocol = bindSym "initProtocol" msgVar = ident "msg" networkVar = ident "network" callUserHandler = ident "callUserHandler" MSG = ident "MSG" new result result.PeerType = Peer result.NetworkType = Eth2Node result.setEventHandlers = bindSym "setEventHandlers" result.SerializationFormat = Format result.RequestResultsWrapper = ident "NetRes" result.implementMsg = proc (msg: eth2_protocol_dsl.Message) = if msg.kind == msgResponse: return let protocol = msg.protocol msgName = $msg.ident msgNameLit = newLit msgName MsgRecName = msg.recName MsgStrongRecName = msg.strongRecName codecNameLit = getRequestProtoName(msg.procDef) protocolMounterName = ident(msgName & "Mounter") ## ## Implement the Thunk: ## ## The protocol handlers in nim-libp2p receive only a `Connection` ## parameter and there is no way to access the wider context (such ## as the current `Switch`). In our handlers, we may need to list all ## peers in the current network, so we must keep a reference to the ## network object in the closure environment of the installed handlers. ## ## For this reason, we define a `protocol mounter` proc that will ## initialize the network object by creating handlers bound to the ## specific network. ## var userHandlerCall = newTree(nnkDiscardStmt) if msg.userHandler != nil: var OutputParamType = if msg.kind == msgRequest: msg.outputParamType else: nil if OutputParamType == nil: userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar]) if msg.kind == msgRequest: userHandlerCall = newCall(ident"sendUserHandlerResultAsChunkImpl", streamVar, userHandlerCall) else: if OutputParamType.kind == nnkVarTy: OutputParamType = OutputParamType[0] let isChunkStream = eqIdent(OutputParamType[0], "MultipleChunksResponse") msg.response.recName = if isChunkStream: newTree(nnkBracketExpr, ident"List", OutputParamType[1], OutputParamType[2]) else: OutputParamType[1] let responseVar = ident("response") userHandlerCall = newStmtList( newVarStmt(responseVar, newCall(ident"init", OutputParamType, peerVar, streamVar)), msg.genUserHandlerCall(msgVar, [peerVar], outputParam = responseVar)) protocol.outRecvProcs.add quote do: template `callUserHandler`(`MSG`: type `MsgStrongRecName`, `peerVar`: `Peer`, `streamVar`: `Connection`, `msgVar`: `MsgRecName`): untyped = `userHandlerCall` proc `protocolMounterName`(`networkVar`: `Eth2Node`) {.raises: [].} = proc snappyThunk(`streamVar`: `Connection`, `protocolVar`: string): Future[void] {.gcsafe.} = return handleIncomingStream(`networkVar`, `streamVar`, `protocolVar`, `MsgStrongRecName`) try: mount `networkVar`.switch, LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk) except LPError as exc: # Failure here indicates that the mounting was done incorrectly which # would be a programming error raiseAssert exc.msg ## ## Implement Senders and Handshake ## if msg.kind == msgHandshake: macros.error "Handshake messages are not supported in LibP2P protocols" else: var sendProc = msg.createSendProc() implementSendProcBody sendProc protocol.outProcRegistrations.add( newCall(registerMsg, protocol.protocolInfoVar, msgNameLit, protocolMounterName, codecNameLit)) result.implementProtocolInit = proc (p: P2PProtocol): NimNode = # This `macrocache` counter gives each protocol its own integer index which # is later used to index per-protocol, per-instace data kept in the peer and # network - the counter is global across all modules / protocols of the # application let id = CacheCounter"eth2_network_protocol_id" tmp = id.value id.inc(1) newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit, newLit(tmp)) #Must import here because of cyclicity import ./peer_protocol export peer_protocol proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async: (raises: [CancelledError]).} = trace "updating peer metadata", peerId let peer = node.getPeer(peerId) newMetadataRes = await peer.getMetadata_v2() newMetadata = newMetadataRes.valueOr: debug "Failed to retrieve metadata from peer!", peerId, error = newMetadataRes.error peer.failedMetadataRequests.inc() return peer.metadata = Opt.some(newMetadata) peer.failedMetadataRequests = 0 peer.lastMetadataTime = Moment.now() const # For Phase0, metadata change every +27 hours MetadataRequestFrequency = 30.minutes MetadataRequestMaxFailures = 3 proc peerPingerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} = while true: let heartbeatStart_m = Moment.now() var updateFutures: seq[Future[void]] for peer in node.peers.values: if peer.connectionState != Connected: continue if peer.metadata.isNone or heartbeatStart_m - peer.lastMetadataTime > MetadataRequestFrequency: updateFutures.add(node.updatePeerMetadata(peer.peerId)) await allFutures(updateFutures) reset(updateFutures) for peer in node.peers.values: if peer.connectionState != Connected: continue if peer.failedMetadataRequests > MetadataRequestMaxFailures: debug "no metadata from peer, kicking it", peer updateFutures.add(peer.disconnect(PeerScoreLow)) await allFutures(updateFutures) await sleepAsync(5.seconds) proc peerTrimmerHeartbeat(node: Eth2Node) {.async: (raises: [CancelledError]).} = # Disconnect peers in excess of the (soft) max peer count while true: # Only count Connected peers (to avoid counting Disconnecting ones) let connectedPeers = node.peers.values.countIt( it.connectionState == Connected) excessPeers = connectedPeers - node.wantedPeers if excessPeers > 0: # Let chronos take back control every trimming node.trimConnections(1) await sleepAsync(1.seconds div max(1, excessPeers)) func asEthKey*(key: PrivateKey): keys.PrivateKey = keys.PrivateKey(key.skkey) template tcpEndPoint(address, port): auto = MultiAddress.init(address, tcpProtocol, port) func initNetKeys(privKey: PrivateKey): NetKeyPair = let pubKey = privKey.getPublicKey().expect("working public key from random") NetKeyPair(seckey: privKey, pubkey: pubKey) proc getRandomNetKeys*(rng: var HmacDrbgContext): NetKeyPair = let privKey = PrivateKey.random(Secp256k1, rng).valueOr: fatal "Could not generate random network key file" quit QuitFailure initNetKeys(privKey) proc getPersistentNetKeys*( rng: var HmacDrbgContext, dataDir, netKeyFile: string, netKeyInsecurePassword: bool, allowLoadExisting: bool): NetKeyPair = if netKeyFile == "random": let keys = rng.getRandomNetKeys() pres = PeerId.init(keys.pubkey).valueOr: fatal "Could not obtain PeerId from network key", error quit QuitFailure info "Generating new networking key", network_public_key = keys.pubkey, network_peer_id = $pres keys else: let # Insecure password used only for automated testing. insecurePassword = if netKeyInsecurePassword: Opt.some(NetworkInsecureKeyPassword) else: Opt.none(string) keyPath = if isAbsolute(netKeyFile): netKeyFile else: dataDir / netKeyFile logScope: key_path = keyPath if fileAccessible(keyPath, {AccessFlags.Find}) and allowLoadExisting: info "Network key storage is present, unlocking" let privKey = loadNetKeystore(keyPath, insecurePassword).valueOr: fatal "Could not load network key file" quit QuitFailure keys = initNetKeys(privKey) info "Network key storage was successfully unlocked", network_public_key = keys.pubkey keys else: if allowLoadExisting: info "Network key storage is missing, creating a new one", key_path = keyPath let keys = rng.getRandomNetKeys() sres = saveNetKeystore(rng, keyPath, keys.seckey, insecurePassword) if sres.isErr(): fatal "Could not create network key file" quit QuitFailure info "New network key storage was created", network_public_key = keys.pubkey keys proc getPersistentNetKeys*( rng: var HmacDrbgContext, config: BeaconNodeConf): NetKeyPair = case config.cmd of BNStartUpCmd.noCommand, BNStartUpCmd.record: rng.getPersistentNetKeys( string(config.dataDir), config.netKeyFile, config.netKeyInsecurePassword, allowLoadExisting = true) else: rng.getRandomNetKeys() func gossipId( data: openArray[byte], phase0Prefix, topic: string): seq[byte] = # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#topics-and-messages # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.6/specs/altair/p2p-interface.md#topics-and-messages const MESSAGE_DOMAIN_VALID_SNAPPY = [0x01'u8, 0x00, 0x00, 0x00] let messageDigest = withEth2Hash: h.update(MESSAGE_DOMAIN_VALID_SNAPPY) if not topic.startsWith(phase0Prefix): # everything >= altair h.update topic.len.uint64.toBytesLE h.update topic h.update data messageDigest.data[0..19] proc newBeaconSwitch(config: BeaconNodeConf | LightClientConf, seckey: PrivateKey, address: MultiAddress, rng: ref HmacDrbgContext): Switch {.raises: [CatchableError].} = var sb = if config.enableYamux: SwitchBuilder.new().withYamux() else: SwitchBuilder.new() # Order of multiplexers matters, the first will be default sb .withPrivateKey(seckey) .withAddress(address) .withRng(rng) .withNoise() .withMplex(chronos.minutes(5), chronos.minutes(5)) .withMaxConnections(config.maxPeers) .withAgentVersion(config.agentString) .withTcpTransport({ServerFlags.ReuseAddr}) .build() proc createEth2Node*(rng: ref HmacDrbgContext, config: BeaconNodeConf | LightClientConf, netKeys: NetKeyPair, cfg: RuntimeConfig, forkDigests: ref ForkDigests, getBeaconTime: GetBeaconTimeFn, genesis_validators_root: Eth2Digest): Eth2Node {.raises: [CatchableError].} = let enrForkId = getENRForkID( cfg, getBeaconTime().slotOrZero.epoch, genesis_validators_root) discoveryForkId = getDiscoveryForkID( cfg, getBeaconTime().slotOrZero.epoch, genesis_validators_root) (extIp, extTcpPort, extUdpPort) = try: setupAddress( config.nat, config.listenAddress, config.tcpPort, config.udpPort, clientId) except CatchableError as exc: raise exc directPeers = block: var res: DirectPeers for s in config.directPeers: let (peerId, address) = if s.startsWith("enr:"): let typedEnr = parseBootstrapAddress(s).get().toTypedRecord().get() peerAddress = toPeerAddr(typedEnr, tcpProtocol).get() (peerAddress.peerId, peerAddress.addrs[0]) elif s.startsWith("/"): parseFullAddress(s).tryGet() else: fatal "direct peers address should start with / (multiaddress) or enr:", conf=s quit 1 res.mgetOrPut(peerId, @[]).add(address) info "Adding privileged direct peer", peerId, address res hostAddress = tcpEndPoint(config.listenAddress, config.tcpPort) announcedAddresses = if extIp.isNone() or extTcpPort.isNone(): @[] else: @[tcpEndPoint(extIp.get(), extTcpPort.get())] debug "Initializing networking", hostAddress, network_public_key = netKeys.pubkey, announcedAddresses # TODO nim-libp2p still doesn't have support for announcing addresses # that are different from the host address (this is relevant when we # are running behind a NAT). var switch = newBeaconSwitch(config, netKeys.seckey, hostAddress, rng) let phase0Prefix = "/eth2/" & $forkDigests.phase0 func msgIdProvider(m: messages.Message): Result[seq[byte], ValidationResult] = template topic: untyped = if m.topicIds.len > 0: m.topicIds[0] else: "" try: # This doesn't have to be a tight bound, just enough to avoid denial of # service attacks. let decoded = snappy.decode(m.data, static(GOSSIP_MAX_SIZE.uint32)) ok(gossipId(decoded, phase0Prefix, topic)) except CatchableError: err(ValidationResult.Reject) let params = GossipSubParams( explicit: true, pruneBackoff: chronos.minutes(1), unsubscribeBackoff: chronos.seconds(10), floodPublish: true, gossipFactor: 0.05, d: 8, dLow: 6, dHigh: 12, dScore: 6, dOut: 6 div 2, # less than dlow and no more than dlow/2 dLazy: 6, heartbeatInterval: chronos.milliseconds(700), historyLength: 6, historyGossip: 3, fanoutTTL: chronos.seconds(60), seenTTL: chronos.seconds(385), gossipThreshold: -4000, publishThreshold: -8000, graylistThreshold: -16000, # also disconnect threshold opportunisticGraftThreshold: 0, decayInterval: chronos.seconds(12), decayToZero: 0.01, retainScore: chronos.seconds(385), appSpecificWeight: 0.0, ipColocationFactorWeight: -53.75, ipColocationFactorThreshold: 3.0, behaviourPenaltyWeight: -15.9, behaviourPenaltyDecay: 0.986, disconnectBadPeers: true, directPeers: directPeers, bandwidthEstimatebps: config.bandwidthEstimate.get(100_000_000) ) pubsub = GossipSub.init( switch = switch, msgIdProvider = msgIdProvider, # We process messages in the validator, so we don't need data callbacks triggerSelf = false, sign = false, verifySignature = false, anonymize = true, maxMessageSize = static(GOSSIP_MAX_SIZE.int), parameters = params) switch.mount(pubsub) let node = Eth2Node.new( config, cfg, enrForkId, discoveryForkId, forkDigests, getBeaconTime, switch, pubsub, extIp, extTcpPort, extUdpPort, netKeys.seckey.asEthKey, discovery = config.discv5Enabled, directPeers, rng = rng) node.pubsub.subscriptionValidator = proc(topic: string): bool {.gcsafe, raises: [].} = topic in node.validTopics node func announcedENR*(node: Eth2Node): enr.Record = doAssert node.discovery != nil, "The Eth2Node must be initialized" node.discovery.localNode.record func shortForm*(id: NetKeyPair): string = $PeerId.init(id.pubkey) proc subscribe*( node: Eth2Node, topic: string, topicParams: TopicParams, enableTopicMetrics: bool = false) = if enableTopicMetrics: node.pubsub.knownTopics.incl(topic) node.pubsub.topicParams[topic] = topicParams # Passing in `nil` because we do all message processing in the validator node.pubsub.subscribe(topic, nil) proc newValidationResultFuture(v: ValidationResult): Future[ValidationResult] {.async: (raises: [CancelledError], raw: true).} = let res = newFuture[ValidationResult]("eth2_network.execValidator") res.complete(v) res proc addValidator*[MsgType](node: Eth2Node, topic: string, msgValidator: proc(msg: MsgType): ValidationResult {.gcsafe, raises: [].} ) = # Message validators run when subscriptions are enabled - they validate the # data and return an indication of whether the message should be broadcast # or not - validation is `async` but implemented without the macro because # this is a performance hotspot. proc execValidator(topic: string, message: GossipMsg): Future[ValidationResult] {.raises: [].} = inc nbc_gossip_messages_received trace "Validating incoming gossip message", len = message.data.len, topic var decompressed = snappy.decode(message.data, gossipMaxSize(MsgType)) let res = if decompressed.len > 0: try: let decoded = SSZ.decode(decompressed, MsgType) decompressed = newSeq[byte](0) # release memory before validating msgValidator(decoded) # doesn't raise! except SerializationError as e: inc nbc_gossip_failed_ssz debug "Error decoding gossip", topic, len = message.data.len, decompressed = decompressed.len, error = e.msg ValidationResult.Reject else: # snappy returns empty seq on failed decompression inc nbc_gossip_failed_snappy debug "Error decompressing gossip", topic, len = message.data.len ValidationResult.Reject newValidationResultFuture(res) node.validTopics.incl topic # Only allow subscription to validated topics node.pubsub.addValidator(topic, execValidator) proc addAsyncValidator*[MsgType](node: Eth2Node, topic: string, msgValidator: proc(msg: MsgType): Future[ValidationResult] {.async: (raises: [CancelledError]).} ) = proc execValidator(topic: string, message: GossipMsg): Future[ValidationResult] {.async: (raw: true).} = inc nbc_gossip_messages_received trace "Validating incoming gossip message", len = message.data.len, topic var decompressed = snappy.decode(message.data, gossipMaxSize(MsgType)) if decompressed.len > 0: try: let decoded = SSZ.decode(decompressed, MsgType) decompressed = newSeq[byte](0) # release memory before validating msgValidator(decoded) # doesn't raise! except SerializationError as e: inc nbc_gossip_failed_ssz debug "Error decoding gossip", topic, len = message.data.len, decompressed = decompressed.len, error = e.msg newValidationResultFuture(ValidationResult.Reject) else: # snappy returns empty seq on failed decompression inc nbc_gossip_failed_snappy debug "Error decompressing gossip", topic, len = message.data.len newValidationResultFuture(ValidationResult.Reject) node.validTopics.incl topic # Only allow subscription to validated topics node.pubsub.addValidator(topic, execValidator) proc unsubscribe*(node: Eth2Node, topic: string) = node.pubsub.unsubscribeAll(topic) proc gossipEncode(msg: auto): seq[byte] = let uncompressed = SSZ.encode(msg) # This function only for messages we create. A message this large amounts to # an internal logic error. doAssert uncompressed.lenu64 <= GOSSIP_MAX_SIZE snappy.encode(uncompressed) proc broadcast(node: Eth2Node, topic: string, msg: seq[byte]): Future[SendResult] {.async: (raises: [CancelledError]).} = let peers = try: await node.pubsub.publish(topic, msg) except CancelledError as exc: raise exc except CatchableError as exc: debug "Unexpected error during broadcast", exc = exc.msg return err("Broadcast failed") # TODO remove workaround for sync committee BN/VC log spam if peers > 0 or find(topic, "sync_committee_") != -1: inc nbc_gossip_messages_sent ok() else: # Increments libp2p_gossipsub_failed_publish metric err("No peers on libp2p topic") proc broadcast(node: Eth2Node, topic: string, msg: auto): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = # Avoid {.async.} copies of message while broadcasting broadcast(node, topic, gossipEncode(msg)) proc subscribeAttestationSubnets*( node: Eth2Node, subnets: AttnetBits, forkDigest: ForkDigest) = # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#attestations-and-aggregation # Nimbus won't score attestation subnets for now, we just rely on block and # aggregate which are more stable and reliable for subnet_id, enabled in subnets: if enabled: node.subscribe(getAttestationTopic( forkDigest, SubnetId(subnet_id)), TopicParams.init()) # don't score attestation subnets for now proc unsubscribeAttestationSubnets*( node: Eth2Node, subnets: AttnetBits, forkDigest: ForkDigest) = # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#attestations-and-aggregation # Nimbus won't score attestation subnets for now; we just rely on block and # aggregate which are more stable and reliable for subnet_id, enabled in subnets: if enabled: node.unsubscribe(getAttestationTopic(forkDigest, SubnetId(subnet_id))) proc updateStabilitySubnetMetadata*(node: Eth2Node, attnets: AttnetBits) = # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#metadata if node.metadata.attnets == attnets: return node.metadata.seq_number += 1 node.metadata.attnets = attnets # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#attestation-subnet-subscription # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.4/specs/phase0/p2p-interface.md#attestation-subnet-bitfield let res = node.discovery.updateRecord({ enrAttestationSubnetsField: SSZ.encode(node.metadata.attnets) }) if res.isErr(): # This should not occur in this scenario as the private key would always # be the correct one and the ENR will not increase in size. warn "Failed to update the ENR attnets field", error = res.error else: debug "Stability subnets changed; updated ENR attnets", attnets proc updateSyncnetsMetadata*(node: Eth2Node, syncnets: SyncnetBits) = # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.6/specs/altair/validator.md#sync-committee-subnet-stability if node.metadata.syncnets == syncnets: return node.metadata.seq_number += 1 node.metadata.syncnets = syncnets let res = node.discovery.updateRecord({ enrSyncSubnetsField: SSZ.encode(node.metadata.syncnets) }) if res.isErr(): # This should not occur in this scenario as the private key would always # be the correct one and the ENR will not increase in size. warn "Failed to update the ENR syncnets field", error = res.error else: debug "Sync committees changed; updated ENR syncnets", syncnets proc updateForkId(node: Eth2Node, value: ENRForkID) = node.forkId = value let res = node.discovery.updateRecord({enrForkIdField: SSZ.encode value}) if res.isErr(): # This should not occur in this scenario as the private key would always # be the correct one and the ENR will not increase in size. warn "Failed to update the ENR fork id", value, error = res.error else: debug "ENR fork id changed", value proc updateForkId*(node: Eth2Node, epoch: Epoch, genesis_validators_root: Eth2Digest) = node.updateForkId(getENRForkID(node.cfg, epoch, genesis_validators_root)) node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesis_validators_root) func forkDigestAtEpoch*(node: Eth2Node, epoch: Epoch): ForkDigest = node.forkDigests[].atEpoch(epoch, node.cfg) proc getWallEpoch(node: Eth2Node): Epoch = node.getBeaconTime().slotOrZero.epoch proc broadcastAttestation*( node: Eth2Node, subnet_id: SubnetId, attestation: Attestation): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = # Regardless of the contents of the attestation, # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/altair/p2p-interface.md#transitioning-the-gossip # implies that pre-fork, messages using post-fork digests might be # ignored, whilst post-fork, there is effectively a seen_ttl-based # timer unsubscription point that means no new pre-fork-forkdigest # should be sent. let forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch) topic = getAttestationTopic(forkPrefix, subnet_id) node.broadcast(topic, attestation) proc broadcastVoluntaryExit*( node: Eth2Node, exit: SignedVoluntaryExit): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getVoluntaryExitsTopic(node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, exit) proc broadcastAttesterSlashing*( node: Eth2Node, slashing: AttesterSlashing): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getAttesterSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, slashing) proc broadcastProposerSlashing*( node: Eth2Node, slashing: ProposerSlashing): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getProposerSlashingsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, slashing) proc broadcastBlsToExecutionChange*( node: Eth2Node, bls_to_execution_change: SignedBLSToExecutionChange): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBlsToExecutionChangeTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, bls_to_execution_change) proc broadcastAggregateAndProof*( node: Eth2Node, proof: SignedAggregateAndProof): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getAggregateAndProofsTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, proof) proc broadcastBeaconBlock*( node: Eth2Node, blck: phase0.SignedBeaconBlock): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.phase0) node.broadcast(topic, blck) proc broadcastBeaconBlock*( node: Eth2Node, blck: altair.SignedBeaconBlock): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.altair) node.broadcast(topic, blck) proc broadcastBeaconBlock*( node: Eth2Node, blck: bellatrix.SignedBeaconBlock): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.bellatrix) node.broadcast(topic, blck) proc broadcastBeaconBlock*( node: Eth2Node, blck: capella.SignedBeaconBlock): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.capella) node.broadcast(topic, blck) proc broadcastBeaconBlock*( node: Eth2Node, blck: deneb.SignedBeaconBlock): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getBeaconBlocksTopic(node.forkDigests.deneb) node.broadcast(topic, blck) proc broadcastBlobSidecar*( node: Eth2Node, subnet_id: BlobId, blob: deneb.BlobSidecar): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let forkPrefix = node.forkDigestAtEpoch(node.getWallEpoch) topic = getBlobSidecarTopic(forkPrefix, subnet_id) node.broadcast(topic, blob) proc broadcastSyncCommitteeMessage*( node: Eth2Node, msg: SyncCommitteeMessage, subcommitteeIdx: SyncSubcommitteeIndex): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getSyncCommitteeTopic( node.forkDigestAtEpoch(node.getWallEpoch), subcommitteeIdx) node.broadcast(topic, msg) proc broadcastSignedContributionAndProof*( node: Eth2Node, msg: SignedContributionAndProof): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getSyncCommitteeContributionAndProofTopic( node.forkDigestAtEpoch(node.getWallEpoch)) node.broadcast(topic, msg) proc broadcastLightClientFinalityUpdate*( node: Eth2Node, msg: ForkyLightClientFinalityUpdate): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getLightClientFinalityUpdateTopic( node.forkDigestAtEpoch(msg.contextEpoch)) node.broadcast(topic, msg) proc broadcastLightClientOptimisticUpdate*( node: Eth2Node, msg: ForkyLightClientOptimisticUpdate): Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = let topic = getLightClientOptimisticUpdateTopic( node.forkDigestAtEpoch(msg.contextEpoch)) node.broadcast(topic, msg)