From b98f46c04dfe9295393117bb1f40dfba4ac4f97f Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Sat, 13 Jan 2024 10:54:24 +0100 Subject: [PATCH] Avoid global in p2p macro (fixes #4578) (#5719) * Avoid global in p2p macro (fixes #4578) * copy p2p macro to this repo and start de-crufting it * make protocol registration dynamic, removing light client hacks et al * split out light client protocol into its own file * cleanups * Option -> Opt * remove more cruft * further split beacon_sync this allows the light client to respond to peer metadata messages without exposing the block sync protocol * better protocol init * "constant" protocol index * avoid casts * copyright * move some discovery code to discovery * avoid extraneous data copy when sending chunks * remove redundant forkdigest field * document how to connect to a specific peer --- beacon_chain/networking/eth2_discovery.nim | 90 +- beacon_chain/networking/eth2_network.nim | 386 +++---- beacon_chain/networking/eth2_protocol_dsl.nim | 961 ++++++++++++++++++ beacon_chain/networking/peer_protocol.nim | 247 +++++ beacon_chain/nimbus_beacon_node.nim | 14 +- beacon_chain/nimbus_light_client.nim | 5 +- beacon_chain/spec/datatypes/base.nim | 4 +- .../spec/eth2_apis/rest_builder_calls.nim | 6 +- .../spec/eth2_apis/rest_config_calls.nim | 4 +- .../spec/eth2_apis/rest_event_calls.nim | 4 +- .../spec/eth2_apis/rest_keymanager_calls.nim | 4 +- .../eth2_apis/rest_light_client_calls.nim | 2 +- .../spec/eth2_apis/rest_nimbus_calls.nim | 2 +- .../eth2_apis/rest_remote_signer_calls.nim | 6 +- .../spec/eth2_apis/rest_validator_calls.nim | 4 +- beacon_chain/sync/light_client_manager.nim | 2 +- beacon_chain/sync/light_client_protocol.nim | 192 ++++ beacon_chain/sync/sync_manager.nim | 4 +- beacon_chain/sync/sync_protocol.nim | 405 +------- .../validators/keystore_management.nim | 12 +- docs/the_nimbus_book/src/developers.md | 5 + 21 files changed, 1673 insertions(+), 686 deletions(-) create mode 100644 beacon_chain/networking/eth2_protocol_dsl.nim create mode 100644 beacon_chain/networking/peer_protocol.nim create mode 100644 beacon_chain/sync/light_client_protocol.nim diff --git a/beacon_chain/networking/eth2_discovery.nim b/beacon_chain/networking/eth2_discovery.nim index e5c46dd49..e917fdfeb 100644 --- a/beacon_chain/networking/eth2_discovery.nim +++ b/beacon_chain/networking/eth2_discovery.nim @@ -8,21 +8,24 @@ {.push raises: [].} import - chronicles, stew/shims/net, stew/results, - eth/p2p/discoveryv5/[enr, protocol, node], + std/[algorithm, sequtils], + chronos, chronicles, stew/results, + eth/p2p/discoveryv5/[enr, protocol, node, random2], + ../spec/datatypes/altair, + ../spec/eth2_ssz_serialization, ".."/[conf, conf_light_client] from std/os import splitFile from std/strutils import cmpIgnoreCase, split, startsWith, strip, toLowerAscii -export protocol +export protocol, node type Eth2DiscoveryProtocol* = protocol.Protocol Eth2DiscoveryId* = NodeId export - Eth2DiscoveryProtocol, open, start, close, closeWait, queryRandom, + Eth2DiscoveryProtocol, open, start, close, closeWait, updateRecord, results func parseBootstrapAddress*(address: string): @@ -100,3 +103,82 @@ proc new*(T: type Eth2DiscoveryProtocol, newProtocol(pk, enrIp, enrTcpPort, enrUdpPort, enrFields, bootstrapEnrs, bindPort = config.udpPort, bindIp = config.listenAddress, enrAutoUpdate = config.enrAutoUpdate, rng = rng) + +func isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool = + if discoveryForkId.fork_digest == peerForkId.fork_digest: + if discoveryForkId.next_fork_version < peerForkId.next_fork_version: + # Peer knows about a fork and we don't + true + elif discoveryForkId.next_fork_version == peerForkId.next_fork_version: + # We should have the same next_fork_epoch + discoveryForkId.next_fork_epoch == peerForkId.next_fork_epoch + + else: + # Our next fork version is bigger than the peer's one + false + else: + # Wrong fork digest + false + +proc queryRandom*( + d: Eth2DiscoveryProtocol, + forkId: ENRForkID, + wantedAttnets: AttnetBits, + wantedSyncnets: SyncnetBits, + minScore: int): Future[seq[Node]] {.async.} = + ## Perform a discovery query for a random target + ## (forkId) and matching at least one of the attestation subnets. + + let nodes = await d.queryRandom() + + var filtered: seq[(int, Node)] + for n in nodes: + var score: int = 0 + + let + eth2FieldBytes = n.record.get(enrForkIdField, seq[byte]).valueOr: + continue + peerForkId = + try: + SSZ.decode(eth2FieldBytes, ENRForkID) + except SszError as e: + debug "Could not decode the eth2 field of peer", + peer = n.record.toURI(), exception = e.name, msg = e.msg + continue + + if not forkId.isCompatibleForkId(peerForkId): + continue + + let attnetsBytes = n.record.get(enrAttestationSubnetsField, seq[byte]) + if attnetsBytes.isOk(): + let attnetsNode = + try: + SSZ.decode(attnetsBytes.get(), AttnetBits) + except SszError as e: + debug "Could not decode the attnets ERN bitfield of peer", + peer = n.record.toURI(), exception = e.name, msg = e.msg + continue + + for i in 0..= minScore: + filtered.add((score, n)) + + d.rng[].shuffle(filtered) + return filtered.sortedByIt(-it[0]).mapIt(it[1]) diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index 71035ee25..c5af1e7df 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -15,7 +15,7 @@ import stew/[leb128, endians2, results, byteutils, io2, bitops2], stew/shims/net as stewNet, stew/shims/[macros], - faststreams/[inputs, outputs, buffers], snappy, snappy/faststreams, + snappy, json_serialization, json_serialization/std/[net, sets, options], chronos, chronos/ratelimit, chronicles, metrics, libp2p/[switch, peerinfo, multiaddress, multicodec, crypto/crypto, @@ -23,13 +23,13 @@ import libp2p/protocols/pubsub/[ pubsub, gossipsub, rpc/message, rpc/messages, peertable, pubsubpeer], libp2p/stream/connection, - eth/[keys, async_utils], eth/p2p/p2p_protocol_dsl, + eth/[keys, async_utils], eth/net/nat, eth/p2p/discoveryv5/[enr, node, random2], ".."/[version, conf, beacon_clock, conf_light_client], ../spec/datatypes/[phase0, altair, bellatrix], ../spec/[eth2_ssz_serialization, network, helpers, forks], ../validators/keystore_management, - "."/[eth2_discovery, libp2p_json_serialization, peer_pool, peer_scores] + "."/[eth2_discovery, eth2_protocol_dsl, libp2p_json_serialization, peer_pool, peer_scores] export tables, chronos, ratelimit, version, multiaddress, peerinfo, p2pProtocol, @@ -44,7 +44,6 @@ type PublicKey* = crypto.PublicKey PrivateKey* = crypto.PrivateKey - Bytes = seq[byte] ErrorMsg = List[byte, 256] SendResult* = Result[void, cstring] @@ -66,6 +65,8 @@ type wantedPeers*: int hardMaxPeers*: int peerPool*: PeerPool[Peer, PeerId] + protocols: seq[ProtocolInfo] + ## Protocols managed by the DSL and mounted on the switch protocolStates*: seq[RootRef] metadata*: altair.MetaData connectTimeout*: chronos.Duration @@ -88,8 +89,6 @@ type quota: TokenBucket ## Global quota mainly for high-bandwidth stuff - EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers - AverageThroughput* = object count*: uint64 average*: float @@ -105,8 +104,8 @@ type quota*: TokenBucket lastReqTime*: Moment connections*: int - enr*: Option[enr.Record] - metadata*: Option[altair.MetaData] + enr*: Opt[enr.Record] + metadata*: Opt[altair.MetaData] failedMetadataRequests: int lastMetadataTime*: Moment direction*: PeerType @@ -144,7 +143,6 @@ type # Private fields: libp2pCodecName: string protocolMounter*: MounterProc - isRequired, isLightClientRequest: bool ProtocolInfoObj* = object name*: string @@ -167,11 +165,11 @@ type ResourceUnavailable PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe, raises: [].} - NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe, raises: [].} + NetworkStateInitializer* = proc(network: Eth2Node): RootRef {.gcsafe, raises: [].} OnPeerConnectedHandler* = proc(peer: Peer, incoming: bool): Future[void] {.gcsafe, raises: [].} OnPeerDisconnectedHandler* = proc(peer: Peer): Future[void] {.gcsafe, raises: [].} ThunkProc* = LPProtoHandler - MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [CatchableError].} + MounterProc* = proc(network: Eth2Node) {.gcsafe, raises: [].} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe, raises: [].} # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/phase0/p2p-interface.md#goodbye @@ -324,9 +322,7 @@ when libp2p_pki_schemes != "secp256k1": const NetworkInsecureKeyPassword = "INSECUREPASSWORD" -template libp2pProtocol*(name: string, version: int, - isRequired = false, - isLightClientRequest = false) {.pragma.} +template libp2pProtocol*(name: string, version: int) {.pragma.} func shortLog*(peer: Peer): string = shortLog(peer.peerId) chronicles.formatIt(Peer): shortLog(it) @@ -354,6 +350,37 @@ proc openStream(node: Eth2Node, proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer {.gcsafe.} +proc getState*(peer: Peer, proto: ProtocolInfo): RootRef = + doAssert peer.protocolStates[proto.index] != nil, $proto.index + peer.protocolStates[proto.index] + +template state*(peer: Peer, Protocol: type): untyped = + ## Returns the state object of a particular protocol for a + ## particular connection. + mixin State + bind getState + type S = Protocol.State + S(getState(peer, Protocol.protocolInfo)) + +proc getNetworkState*(node: Eth2Node, proto: ProtocolInfo): RootRef = + doAssert node.protocolStates[proto.index] != nil, $proto.index + node.protocolStates[proto.index] + +template protocolState*(node: Eth2Node, Protocol: type): untyped = + mixin NetworkState + bind getNetworkState + type S = Protocol.NetworkState + S(getNetworkState(node, Protocol.protocolInfo)) + +proc initProtocolState*[T](state: T, x: Peer|Eth2Node) + {.gcsafe, raises: [].} = + discard + +template networkState*(connection: Peer, Protocol: type): untyped = + ## Returns the network state object of a particular protocol for a + ## particular connection. + protocolState(connection.network, Protocol) + func peerId*(node: Eth2Node): PeerId = node.switch.peerInfo.peerId @@ -528,9 +555,6 @@ proc releasePeer*(peer: Peer) = score_high_limit = PeerScoreHighLimit asyncSpawn(peer.disconnect(PeerScoreLow)) -include eth/p2p/p2p_backends_helpers -include eth/p2p/p2p_tracing - proc getRequestProtoName(fn: NimNode): NimNode = # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes # (TODO: file as an issue) @@ -547,110 +571,53 @@ proc getRequestProtoName(fn: NimNode): NimNode = return newLit("") -proc isRequiredProto(fn: NimNode): NimNode = - # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes - # (TODO: file as an issue) - - let pragmas = fn.pragma - if pragmas.kind == nnkPragma and pragmas.len > 0: - for pragma in pragmas: - try: - if pragma.len > 0 and $pragma[0] == "libp2pProtocol": - if pragma.len <= 3: - return newLit(false) - for i in 3 ..< pragma.len: - let param = pragma[i] - case param.kind - of nnkExprEqExpr: - if $param[0] == "isRequired": - if $param[1] == "true": - return newLit(true) - if $param[1] == "false": - return newLit(false) - raiseAssert "Unexpected value: " & $param - if $param[0] != "isLightClientRequest": - raiseAssert "Unexpected param: " & $param - of nnkIdent: - if i == 3: - return newLit(param.boolVal) - else: raiseAssert "Unexpected kind: " & param.kind.repr - return newLit(false) - except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454 - - return newLit(false) - -proc isLightClientRequestProto(fn: NimNode): NimNode = - # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes - # (TODO: file as an issue) - - let pragmas = fn.pragma - if pragmas.kind == nnkPragma and pragmas.len > 0: - for pragma in pragmas: - try: - if pragma.len > 0 and $pragma[0] == "libp2pProtocol": - if pragma.len <= 3: - return newLit(false) - for i in 3 ..< pragma.len: - let param = pragma[i] - case param.kind - of nnkExprEqExpr: - if $param[0] == "isLightClientRequest": - if $param[1] == "true": - return newLit(true) - if $param[1] == "false": - return newLit(false) - raiseAssert "Unexpected value: " & $param - if $param[0] != "isRequired": - raiseAssert "Unexpected param: " & $param - of nnkIdent: - if i == 4: - return newLit(param.boolVal) - else: raiseAssert "Unexpected kind: " & param.kind.repr - return newLit(false) - except Exception as exc: raiseAssert exc.msg # TODO https://github.com/nim-lang/Nim/issues/17454 - - return newLit(false) +proc add(s: var seq[byte], pos: var int, bytes: openArray[byte]) = + s[pos.. 0: - output.write contextBytes + var + data = newSeqUninitialized[byte]( + ord(responseCode.isSome) + contextBytes.len + uncompressedLenBytes.len + + payloadSZ.len) + pos = 0 - output.write toBytes(uncompressedLen, Leb128).toOpenArray() - output.write payloadSZ - except IOError as exc: - raiseAssert exc.msg # memoryOutput shouldn't raise - - conn.write(output.getOutput) + if responseCode.isSome: + data.add(pos, [byte responseCode.get]) + data.add(pos, contextBytes) + data.add(pos, uncompressedLenBytes.toOpenArray()) + data.add(pos, payloadSZ) + conn.write(data) proc writeChunk(conn: Connection, - responseCode: Option[ResponseCode], + responseCode: Opt[ResponseCode], payload: openArray[byte], contextBytes: openArray[byte] = []): Future[void] = - var output = memoryOutput() + let + uncompressedLenBytes = toBytes(payload.lenu64, Leb128) + var + data = newSeqUninitialized[byte]( + ord(responseCode.isSome) + contextBytes.len + uncompressedLenBytes.len + + snappy.maxCompressedLenFramed(payload.len).int) + pos = 0 - try: - if responseCode.isSome: - output.write byte(responseCode.get) + if responseCode.isSome: + data.add(pos, [byte responseCode.get]) + data.add(pos, contextBytes) + data.add(pos, uncompressedLenBytes.toOpenArray()) + let + pre = pos + written = snappy.compressFramed(payload, data.toOpenArray(pos, data.high)) + .expect("compression shouldn't fail with correctly preallocated buffer") + data.setLen(pre + written) - if contextBytes.len > 0: - output.write contextBytes - - output.write toBytes(payload.lenu64, Leb128).toOpenArray() - - compressFramed(payload, output) - except IOError as exc: - raiseAssert exc.msg # memoryOutput shouldn't raise - conn.write(output.getOutput) + conn.write(data) template errorMsgLit(x: static string): ErrorMsg = const val = ErrorMsg toBytes(x) @@ -671,9 +638,9 @@ proc sendErrorResponse(peer: Peer, errMsg: ErrorMsg): Future[void] = debug "Error processing request", peer, responseCode, errMsg = formatErrorMsg(errMsg) - conn.writeChunk(some responseCode, SSZ.encode(errMsg)) + conn.writeChunk(Opt.some responseCode, SSZ.encode(errMsg)) -proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async.} = +proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: seq[byte]) {.async.} = var deadline = sleepAsync RESP_TIMEOUT_DUR streamFut = peer.network.openStream(peer, protocolId) @@ -686,7 +653,7 @@ proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {. let stream = streamFut.read try: - await stream.writeChunk(none ResponseCode, requestBytes) + await stream.writeChunk(Opt.none ResponseCode, requestBytes) finally: await stream.close() @@ -696,13 +663,13 @@ proc sendResponseChunkBytesSZ( contextBytes: openArray[byte] = []): Future[void] = inc response.writtenChunks response.stream.writeChunkSZ( - some ResponseCode.Success, uncompressedLen, payloadSZ, contextBytes) + Opt.some ResponseCode.Success, uncompressedLen, payloadSZ, contextBytes) proc sendResponseChunkBytes( response: UntypedResponse, payload: openArray[byte], contextBytes: openArray[byte] = []): Future[void] = inc response.writtenChunks - response.stream.writeChunk(some ResponseCode.Success, payload, contextBytes) + response.stream.writeChunk(Opt.some ResponseCode.Success, payload, contextBytes) proc sendResponseChunk( response: UntypedResponse, val: auto, @@ -712,11 +679,11 @@ proc sendResponseChunk( template sendUserHandlerResultAsChunkImpl*(stream: Connection, handlerResultFut: Future): untyped = let handlerRes = await handlerResultFut - writeChunk(stream, some ResponseCode.Success, SSZ.encode(handlerRes)) + writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerRes)) template sendUserHandlerResultAsChunkImpl*(stream: Connection, handlerResult: auto): untyped = - writeChunk(stream, some ResponseCode.Success, SSZ.encode(handlerResult)) + writeChunk(stream, Opt.some ResponseCode.Success, SSZ.encode(handlerResult)) proc uncompressFramedStream(conn: Connection, expectedSize: int): Future[Result[seq[byte], cstring]] @@ -948,7 +915,7 @@ proc readResponse(conn: Connection, peer: Peer, return neterr(ReadResponseTimeout) return nextFut.read() -proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, +proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: seq[byte], ResponseMsg: type, timeout: Duration): Future[NetRes[ResponseMsg]] {.async.} = @@ -960,7 +927,7 @@ proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes, # Some clients don't want a length sent for empty requests # So don't send anything on empty requests if requestBytes.len > 0: - await stream.writeChunk(none ResponseCode, requestBytes) + await stream.writeChunk(Opt.none ResponseCode, requestBytes) # Half-close the stream to mark the end of the request - if this is not # done, the other peer might never send us the response. await stream.close() @@ -1023,26 +990,21 @@ template sendSSZ*[M]( proc performProtocolHandshakes(peer: Peer, incoming: bool) {.async.} = # Loop down serially because it's easier to reason about the connection state # when there are fewer async races, specially during setup - for protocol in allProtocols: + for protocol in peer.network.protocols: if protocol.onPeerConnected != nil: await protocol.onPeerConnected(peer, incoming) proc initProtocol(name: string, peerInit: PeerStateInitializer, - networkInit: NetworkStateInitializer): ProtocolInfoObj = + networkInit: NetworkStateInitializer, + index: int): ProtocolInfoObj = ProtocolInfoObj( name: name, messages: @[], + index: index, peerStateInitializer: peerInit, networkStateInitializer: networkInit) -proc registerProtocol(protocol: ProtocolInfo) = - # TODO: This can be done at compile-time in the future - let pos = lowerBound(gProtocols, protocol) - gProtocols.insert(protocol, pos) - for i in 0 ..< gProtocols.len: - gProtocols[i].index = i - proc setEventHandlers(p: ProtocolInfo, onPeerConnected: OnPeerConnectedHandler, onPeerDisconnected: OnPeerDisconnectedHandler) = @@ -1155,9 +1117,6 @@ proc handleIncomingStream(network: Eth2Node, nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) returnInvalidRequest err.formatMsg("msg") - except SnappyError as err: - nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) - returnInvalidRequest err.msg finally: # The request quota is shared between all requests - it represents the # cost to perform a service on behalf of a client and is incurred @@ -1226,7 +1185,7 @@ proc handleIncomingStream(network: Eth2Node, return try: - logReceivedMsg(peer, MsgType(msg.get)) + # logReceivedMsg(peer, MsgType(msg.get)) await callUserHandler(MsgType, peer, conn, msg.get) except InvalidInputsError as err: nbc_reqresp_messages_failed.inc(1, [shortProtocolId(protocolId)]) @@ -1360,85 +1319,6 @@ proc toPeerAddr(node: Node): Result[PeerAddr, cstring] = let peerAddr = ? nodeRecord.toPeerAddr(tcpProtocol) ok(peerAddr) -func isCompatibleForkId*(discoveryForkId: ENRForkID, peerForkId: ENRForkID): bool = - if discoveryForkId.fork_digest == peerForkId.fork_digest: - if discoveryForkId.next_fork_version < peerForkId.next_fork_version: - # Peer knows about a fork and we don't - true - elif discoveryForkId.next_fork_version == peerForkId.next_fork_version: - # We should have the same next_fork_epoch - discoveryForkId.next_fork_epoch == peerForkId.next_fork_epoch - - else: - # Our next fork version is bigger than the peer's one - false - else: - # Wrong fork digest - false - -proc queryRandom*( - d: Eth2DiscoveryProtocol, - forkId: ENRForkID, - wantedAttnets: AttnetBits, - wantedSyncnets: SyncnetBits, - minScore: int): Future[seq[Node]] {.async.} = - ## Perform a discovery query for a random target - ## (forkId) and matching at least one of the attestation subnets. - - let nodes = await d.queryRandom() - - var filtered: seq[(int, Node)] - for n in nodes: - var score: int = 0 - - let eth2FieldBytes = n.record.tryGet(enrForkIdField, seq[byte]) - if eth2FieldBytes.isNone(): - continue - let peerForkId = - try: - SSZ.decode(eth2FieldBytes.get(), ENRForkID) - except SszError as e: - debug "Could not decode the eth2 field of peer", - peer = n.record.toURI(), exception = e.name, msg = e.msg - continue - - if not forkId.isCompatibleForkId(peerForkId): - continue - - let attnetsBytes = n.record.tryGet(enrAttestationSubnetsField, seq[byte]) - if attnetsBytes.isSome(): - let attnetsNode = - try: - SSZ.decode(attnetsBytes.get(), AttnetBits) - except SszError as e: - debug "Could not decode the attnets ERN bitfield of peer", - peer = n.record.toURI(), exception = e.name, msg = e.msg - continue - - for i in 0..= minScore: - filtered.add((score, n)) - - d.rng[].shuffle(filtered) - return filtered.sortedByIt(-it[0]).mapIt(it[1]) - proc trimConnections(node: Eth2Node, count: int) = # Kill `count` peers, scoring them to remove the least useful ones @@ -1688,7 +1568,7 @@ proc resolvePeer(peer: Peer) = # already has most recent ENR information about this peer. let gnode = peer.network.discovery.getNode(nodeId) if gnode.isSome(): - peer.enr = some(gnode.get().record) + peer.enr = Opt.some(gnode.get().record) inc(nbc_successful_discoveries) let delay = now(chronos.Moment) - startTime nbc_resolve_time.observe(delay.toFloatSeconds()) @@ -1854,21 +1734,6 @@ proc new(T: type Eth2Node, quota: TokenBucket.new(maxGlobalQuota, fullReplenishTime) ) - newSeq node.protocolStates, allProtocols.len - for proto in allProtocols: - if proto.networkStateInitializer != nil: - node.protocolStates[proto.index] = proto.networkStateInitializer(node) - - for msg in proto.messages: - when config is BeaconNodeConf: - if msg.isLightClientRequest and not config.lightClientDataServe: - continue - elif config is LightClientConf: - if not msg.isRequired: - continue - if msg.protocolMounter != nil: - msg.protocolMounter node - proc peerHook(peerId: PeerId, event: ConnEvent): Future[void] {.gcsafe.} = onConnEvent(node, peerId, event) @@ -1886,6 +1751,18 @@ proc new(T: type Eth2Node, node +proc registerProtocol*(node: Eth2Node, Proto: type, state: Proto.NetworkState) = + # This convoluted registration process is a leftover from the shared p2p macro + # and should be refactored + let proto = Proto.protocolInfo() + node.protocols.add(proto) + node.protocolStates.setLen(max(proto.index + 1, node.protocolStates.len)) + node.protocolStates[proto.index] = state + + for msg in proto.messages: + if msg.protocolMounter != nil: + msg.protocolMounter node + proc startListening*(node: Eth2Node) {.async.} = if node.discoveryEnabled: try: @@ -1958,30 +1835,25 @@ proc init(T: type Peer, network: Eth2Node, peerId: PeerId): Peer = connectionState: ConnectionState.None, lastReqTime: now(chronos.Moment), lastMetadataTime: now(chronos.Moment), - protocolStates: newSeq[RootRef](len(allProtocols)), quota: TokenBucket.new(maxRequestQuota.int, fullReplenishTime) ) - for i in 0 ..< len(allProtocols): - let proto = allProtocols[i] + res.protocolStates.setLen(network.protocolStates.len()) + for proto in network.protocols: if not(isNil(proto.peerStateInitializer)): - res.protocolStates[i] = proto.peerStateInitializer(res) + res.protocolStates[proto.index] = proto.peerStateInitializer(res) res proc registerMsg(protocol: ProtocolInfo, name: string, mounter: MounterProc, - libp2pCodecName: string, - isRequired, isLightClientRequest: bool) = + libp2pCodecName: string) = protocol.messages.add MessageInfo(name: name, protocolMounter: mounter, - libp2pCodecName: libp2pCodecName, - isRequired: isRequired, - isLightClientRequest: isLightClientRequest) + libp2pCodecName: libp2pCodecName) proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = var Format = ident "SSZ" - Bool = bindSym "bool" Connection = bindSym "Connection" Peer = bindSym "Peer" Eth2Node = bindSym "Eth2Node" @@ -1992,19 +1864,15 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = callUserHandler = ident "callUserHandler" MSG = ident "MSG" - p.useRequestIds = false - p.useSingleRecordInlining = true - new result result.PeerType = Peer result.NetworkType = Eth2Node - result.registerProtocol = bindSym "registerProtocol" result.setEventHandlers = bindSym "setEventHandlers" result.SerializationFormat = Format result.RequestResultsWrapper = ident "NetRes" - result.implementMsg = proc (msg: p2p_protocol_dsl.Message) = + result.implementMsg = proc (msg: eth2_protocol_dsl.Message) = if msg.kind == msgResponse: return @@ -2015,8 +1883,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = MsgRecName = msg.recName MsgStrongRecName = msg.strongRecName codecNameLit = getRequestProtoName(msg.procDef) - isRequiredLit = isRequiredProto(msg.procDef) - isLightClientRequestLit = isLightClientRequestProto(msg.procDef) protocolMounterName = ident(msgName & "Mounter") ## @@ -2068,15 +1934,19 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = `msgVar`: `MsgRecName`): untyped = `userHandlerCall` - proc `protocolMounterName`(`networkVar`: `Eth2Node`) = + proc `protocolMounterName`(`networkVar`: `Eth2Node`) {.raises: [].} = proc snappyThunk(`streamVar`: `Connection`, `protocolVar`: string): Future[void] {.gcsafe.} = return handleIncomingStream(`networkVar`, `streamVar`, `protocolVar`, `MsgStrongRecName`) - mount `networkVar`.switch, - LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk) - + try: + mount `networkVar`.switch, + LPProtocol(codecs: @[`codecNameLit`], handler: snappyThunk) + except LPError as exc: + # Failure here indicates that the mounting was done incorrectly which + # would be a programming error + raiseAssert exc.msg ## ## Implement Senders and Handshake ## @@ -2091,16 +1961,17 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = protocol.protocolInfoVar, msgNameLit, protocolMounterName, - codecNameLit, - isRequiredLit, - isLightClientRequestLit)) + codecNameLit)) result.implementProtocolInit = proc (p: P2PProtocol): NimNode = - return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit) + var id {.global.}: int + let tmp = id + id += 1 + return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit, newLit(tmp)) #Must import here because of cyclicity -import ../sync/sync_protocol -export sync_protocol +import ./peer_protocol +export peer_protocol proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} = trace "updating peer metadata", peerId @@ -2116,7 +1987,7 @@ proc updatePeerMetadata(node: Eth2Node, peerId: PeerId) {.async.} = peer.failedMetadataRequests.inc() return - peer.metadata = some(newMetadata) + peer.metadata = Opt.some(newMetadata) peer.failedMetadataRequests = 0 peer.lastMetadataTime = Moment.now() @@ -2201,9 +2072,9 @@ proc getPersistentNetKeys*( # Insecure password used only for automated testing. insecurePassword = if netKeyInsecurePassword: - some(NetworkInsecureKeyPassword) + Opt.some(NetworkInsecureKeyPassword) else: - none[string]() + Opt.none(string) keyPath = if isAbsolute(netKeyFile): @@ -2286,7 +2157,6 @@ proc newBeaconSwitch(config: BeaconNodeConf | LightClientConf, .withTcpTransport({ServerFlags.ReuseAddr}) .build() - proc createEth2Node*(rng: ref HmacDrbgContext, config: BeaconNodeConf | LightClientConf, netKeys: NetKeyPair, @@ -2601,7 +2471,7 @@ proc updateForkId*(node: Eth2Node, epoch: Epoch, genesis_validators_root: Eth2Di node.updateForkId(getENRForkID(node.cfg, epoch, genesis_validators_root)) node.discoveryForkId = getDiscoveryForkID(node.cfg, epoch, genesis_validators_root) -func forkDigestAtEpoch(node: Eth2Node, epoch: Epoch): ForkDigest = +func forkDigestAtEpoch*(node: Eth2Node, epoch: Epoch): ForkDigest = node.forkDigests[].atEpoch(epoch, node.cfg) proc getWallEpoch(node: Eth2Node): Epoch = diff --git a/beacon_chain/networking/eth2_protocol_dsl.nim b/beacon_chain/networking/eth2_protocol_dsl.nim new file mode 100644 index 000000000..33e9526a2 --- /dev/null +++ b/beacon_chain/networking/eth2_protocol_dsl.nim @@ -0,0 +1,961 @@ +# beacon_chain +# Copyright (c) 2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + std/[sequtils], + results, + stew/shims/macros, chronos, faststreams/outputs + +export chronos, results + +type + MessageKind* = enum + msgHandshake + msgNotification + msgRequest + msgResponse + + Message* = ref object + id*: int + ident*: NimNode + kind*: MessageKind + procDef*: NimNode + timeoutParam*: NimNode + recName*: NimNode + strongRecName*: NimNode + recBody*: NimNode + protocol*: P2PProtocol + response*: Message + userHandler*: NimNode + initResponderCall*: NimNode + outputParamDef*: NimNode + + Request* = ref object + queries*: seq[Message] + response*: Message + + SendProc* = object + ## A `SendProc` is a proc used to send a single P2P message. + ## If it's a Request, then the return type will be a Future + ## of the respective Response type. All send procs also have + ## an automatically inserted `timeout` parameter. + + msg*: Message + ## The message being implemented + + def*: NimNode + ## The definition of the proc + + peerParam*: NimNode + ## Cached ident for the peer param + + msgParams*: seq[NimNode] + ## Cached param ident for all values that must be written + ## on the wire. The automatically inserted `timeout` is not + ## included. + + timeoutParam*: NimNode + ## Cached ident for the timeout parameter + + extraDefs*: NimNode + ## The response procs have extra templates that must become + ## part of the generated code + + P2PProtocol* = ref object + # Settings + name*: string + version*: int + timeouts*: int64 + outgoingRequestDecorator*: NimNode + incomingRequestDecorator*: NimNode + incomingRequestThunkDecorator*: NimNode + incomingResponseDecorator*: NimNode + incomingResponseThunkDecorator*: NimNode + PeerStateType*: NimNode + NetworkStateType*: NimNode + backend*: Backend + + # Cached properties + nameIdent*: NimNode + protocolInfoVar*: NimNode + + # All messages + messages*: seq[Message] + + # Messages by type: + handshake*: Message + notifications*: seq[Message] + requests*: seq[Request] + + # Output procs + outSendProcs*: NimNode + outRecvProcs*: NimNode + outProcRegistrations*: NimNode + + # Event handlers + onPeerConnected*: NimNode + onPeerDisconnected*: NimNode + + Backend* = ref object + # Code generators + implementMsg*: proc (msg: Message) + implementProtocolInit*: proc (protocol: P2PProtocol): NimNode + + afterProtocolInit*: proc (protocol: P2PProtocol) + + # Bound symbols to the back-end run-time types and procs + PeerType*: NimNode + NetworkType*: NimNode + SerializationFormat*: NimNode + ResponderType*: NimNode + RequestResultsWrapper*: NimNode + + registerProtocol*: NimNode + setEventHandlers*: NimNode + + BackendFactory* = proc (p: P2PProtocol): Backend + + P2PBackendError* = object of CatchableError + InvalidMsgError* = object of P2PBackendError + +const + defaultReqTimeout = 10.seconds + tracingEnabled = defined(p2pdump) + +let + # Variable names affecting the public interface of the library: + peerVar* {.compileTime.} = ident "peer" + responseVar* {.compileTime.} = ident "response" + streamVar* {.compileTime.} = ident "stream" + protocolVar* {.compileTime.} = ident "protocol" + deadlineVar* {.compileTime.} = ident "deadline" + timeoutVar* {.compileTime.} = ident "timeout" + currentProtocolSym* {.compileTime.} = ident "CurrentProtocol" + resultIdent* {.compileTime.} = ident "result" + + # Locally used symbols: + Opt {.compileTime.} = ident "Opt" + Future {.compileTime.} = ident "Future" + Void {.compileTime.} = ident "void" + writeField {.compileTime.} = ident "writeField" + + PROTO {.compileTime.} = ident "PROTO" + MSG {.compileTime.} = ident "MSG" + +template Fut(T): auto = newTree(nnkBracketExpr, Future, T) + +proc initFuture*[T](loc: var Future[T]) = + loc = newFuture[T]() + +template applyDecorator(p: NimNode, decorator: NimNode) = + if decorator.kind != nnkNilLit: + p.pragma.insert(0, decorator) + +when tracingEnabled: + proc logSentMsgFields(peer: NimNode, + protocolInfo: NimNode, + msgName: string, + fields: openArray[NimNode]): NimNode = + ## This generates the tracing code inserted in the message sending procs + ## `fields` contains all the params that were serialized in the message + let + tracer = ident "tracer" + tracerStream = ident "tracerStream" + logMsgEventImpl = ident "logMsgEventImpl" + + result = quote do: + var `tracerStream` = memoryOutput() + var `tracer` = JsonWriter.init(`tracerStream`) + beginRecord(`tracer`) + + for f in fields: + result.add newCall(writeField, tracer, newLit($f), f) + + result.add quote do: + endRecord(`tracer`) + `logMsgEventImpl`("outgoing_msg", `peer`, + `protocolInfo`, `msgName`, + getOutput(`tracerStream`, string)) + +proc createPeerState[Peer, ProtocolState](peer: Peer): RootRef = + var res = new ProtocolState + mixin initProtocolState + initProtocolState(res, peer) + return cast[RootRef](res) + +proc expectBlockWithProcs*(n: NimNode): seq[NimNode] = + template helperName: auto = $n[0] + + if n.len != 2 or n[1].kind != nnkStmtList: + error(helperName & " expects a block", n) + + for p in n[1]: + if p.kind == nnkProcDef: + result.add p + elif p.kind == nnkCommentStmt: + continue + else: + error(helperName & " expects a proc definition.", p) + +proc nameOrNil*(procDef: NimNode): NimNode = + if procDef != nil: + procDef.name + else: + newNilLit() + +proc isOutputParamName(paramName: NimNode): bool = + eqIdent(paramName, "output") or eqIdent(paramName, "response") + +proc isOutputParam(param: NimNode): bool = + param.len > 0 and param[0].skipPragma.isOutputParamName + +proc getOutputParam(procDef: NimNode): NimNode = + let params = procDef.params + for i in countdown(params.len - 1, 1): + let param = params[i] + if isOutputParam(param): + return param + +proc outputParam*(msg: Message): NimNode = + case msg.kind + of msgRequest: + outputParam(msg.response) + of msgResponse: + msg.outputParamDef + else: + raiseAssert "Only requests (and the attached responses) can have output parameters" + +proc outputParamIdent*(msg: Message): NimNode = + let outputParam = msg.outputParam + if outputParam != nil: + return outputParam[0].skipPragma + +proc outputParamType*(msg: Message): NimNode = + let outputParam = msg.outputParam + if outputParam != nil: + return outputParam[1] + +proc refreshParam(n: NimNode): NimNode = + result = copyNimTree(n) + if n.kind == nnkIdentDefs: + for i in 0.. 1: + result = quote do: + mixin init, writerType, beginRecord, endRecord + + var `writer` = init(WriterType(`Format`), `outputStream`) + var `recordWriterCtx` = beginRecord(`writer`, `RecordType`) + `appendParams` + endRecord(`writer`, `recordWriterCtx`) + else: + let param = params[0] + + result = quote do: + var `writer` = init(WriterType(`Format`), `outputStream`) + writeValue(`writer`, `param`) + +proc useStandardBody*(sendProc: SendProc, + preSerializationStep: proc(stream: NimNode): NimNode, + postSerializationStep: proc(stream: NimNode): NimNode, + sendCallGenerator: proc (peer, bytes: NimNode): NimNode) = + let + msg = sendProc.msg + msgBytes = ident "msgBytes" + recipient = sendProc.peerParam + sendCall = sendCallGenerator(recipient, msgBytes) + + if sendProc.msgParams.len == 0: + sendProc.setBody quote do: + var `msgBytes`: seq[byte] + `sendCall` + return + + let + outputStream = ident "outputStream" + + msgRecName = msg.recName + Format = msg.protocol.backend.SerializationFormat + + preSerialization = if preSerializationStep.isNil: newStmtList() + else: preSerializationStep(outputStream) + + serialization = writeParamsAsRecord(sendProc.msgParams, + outputStream, Format, msgRecName) + + postSerialization = if postSerializationStep.isNil: newStmtList() + else: postSerializationStep(outputStream) + + tracing = when not tracingEnabled: + newStmtList() + else: + logSentMsgFields(recipient, + msg.protocol.protocolInfoVar, + $msg.ident, + sendProc.msgParams) + + sendProc.setBody quote do: + mixin init, WriterType, beginRecord, endRecord, getOutput + + var `outputStream` = memoryOutput() + `preSerialization` + `serialization` + `postSerialization` + `tracing` + let `msgBytes` = getOutput(`outputStream`) + `sendCall` + +proc correctSerializerProcParams(params: NimNode) = + # A serializer proc is just like a send proc, but: + # 1. it has a void return type + params[0] = ident "void" + # 2. The peer params is replaced with OutputStream + params[1] = newIdentDefs(streamVar, bindSym "OutputStream") + # 3. The timeout param is removed + params.del(params.len - 1) + +proc createSerializer*(msg: Message, procType = nnkProcDef): NimNode = + var serializer = msg.createSendProc(procType, nameSuffix = "Serializer") + correctSerializerProcParams serializer.def.params + + serializer.setBody writeParamsAsRecord( + serializer.msgParams, + streamVar, + msg.protocol.backend.SerializationFormat, + msg.recName) + + return serializer.def + +proc defineThunk*(msg: Message, thunk: NimNode) = + let protocol = msg.protocol + + case msg.kind + of msgRequest: thunk.applyDecorator protocol.incomingRequestThunkDecorator + of msgResponse: thunk.applyDecorator protocol.incomingResponseThunkDecorator + else: discard + + protocol.outRecvProcs.add thunk + +proc genUserHandlerCall*(msg: Message, receivedMsg: NimNode, + leadingParams: openArray[NimNode], + outputParam: NimNode = nil): NimNode = + if msg.userHandler == nil: + return newStmtList() + + result = newCall(msg.userHandler.name, leadingParams) + + if msg.needsSingleParamInlining: + result.add receivedMsg + else: + var params = toSeq(msg.procDef.typedInputParams(skip = 1)) + for p in params: + result.add newDotExpr(receivedMsg, p[0]) + + if outputParam != nil: + result.add outputParam + +proc genAwaitUserHandler*(msg: Message, receivedMsg: NimNode, + leadingParams: openArray[NimNode], + outputParam: NimNode = nil): NimNode = + result = msg.genUserHandlerCall(receivedMsg, leadingParams, outputParam) + if result.len > 0: result = newCall("await", result) + +proc appendAllInputParams*(node: NimNode, procDef: NimNode): NimNode = + result = node + for p, _ in procDef.typedInputParams(): + result.add p + +proc paramNames*(procDef: NimNode, skipFirst = 0): seq[NimNode] = + result = newSeq[NimNode]() + for name, _ in procDef.typedParams(skip = skipFirst): + result.add name + +proc netInit*(p: P2PProtocol): NimNode = + newNilLit() + # if p.NetworkStateType == nil: + # newNilLit() + # else: + # newTree(nnkBracketExpr, bindSym"createNetworkState", + # p.backend.NetworkType, + # p.NetworkStateType) + +proc createHandshakeTemplate*(msg: Message, + rawSendProc, handshakeImpl, + nextMsg: NimNode): SendProc = + let + handshakeExchanger = msg.createSendProc(procType = nnkTemplateDef) + forwardCall = newCall(rawSendProc).appendAllInputParams(handshakeExchanger.def) + peerValue = forwardCall[1] + msgRecName = msg.recName + + forwardCall[1] = peerVar + forwardCall.del(forwardCall.len - 1) + + let peerVar = genSym(nskLet ,"peer") + handshakeExchanger.setBody quote do: + let `peerVar` = `peerValue` + let sendingFuture = `forwardCall` + `handshakeImpl`(`peerVar`, + sendingFuture, + `nextMsg`(`peerVar`, `msgRecName`), + `timeoutVar`) + + return handshakeExchanger + +proc peerInit*(p: P2PProtocol): NimNode = + if p.PeerStateType == nil: + newNilLit() + else: + newTree(nnkBracketExpr, bindSym"createPeerState", + p.backend.PeerType, + p.PeerStateType) + +proc processProtocolBody*(p: P2PProtocol, protocolBody: NimNode) = + ## This procs handles all DSL statements valid inside a p2pProtocol. + ## + ## It will populate the protocol's fields such as: + ## * handshake + ## * requests + ## * notifications + ## * onPeerConnected + ## * onPeerDisconnected + ## + ## All messages will have properly computed numeric IDs + ## + for n in protocolBody: + case n.kind + of {nnkCall, nnkCommand}: + if eqIdent(n[0], "requestResponse"): + # `requestResponse` can be given a block of 2 or more procs. + # The last one is considered to be a response message, while + # all preceding ones are requests triggering the response. + # The system makes sure to automatically insert a hidden `reqId` + # parameter used to discriminate the individual messages. + let procs = expectBlockWithProcs(n) + if procs.len < 2: + error "requestResponse expects a block with at least two proc definitions" + + var queries = newSeq[Message]() + let responseMsg = p.newMsg(msgResponse, procs[^1]) + + for i in 0 .. procs.len - 2: + queries.add p.newMsg(msgRequest, procs[i], response = responseMsg) + + p.requests.add Request(queries: queries, response: responseMsg) + + elif eqIdent(n[0], "handshake"): + let procs = expectBlockWithProcs(n) + if procs.len != 1: + error "handshake expects a block with a single proc definition", n + + if p.handshake != nil: + error "The handshake for the protocol is already defined", n + + p.handshake = p.newMsg(msgHandshake, procs[0]) + + elif eqIdent(n[0], "onPeerConnected"): + p.onPeerConnected = p.eventHandlerToProc(n[1], "PeerConnected") + + elif eqIdent(n[0], "onPeerDisconnected"): + p.onPeerDisconnected = p.eventHandlerToProc(n[1], "PeerDisconnected") + + else: + error(repr(n) & " is not a recognized call in P2P protocol definitions", n) + + of nnkProcDef, nnkIteratorDef: + p.addMsg(n) + + of nnkCommentStmt: + discard + + else: + error "Illegal syntax in a P2P protocol definition", n + +proc genTypeSection*(p: P2PProtocol): NimNode = + var + protocolName = p.nameIdent + peerState = p.PeerStateType + networkState= p.NetworkStateType + + result = newStmtList() + result.add quote do: + # Create a type acting as a pseudo-object representing the protocol + # (e.g. p2p) + type `protocolName`* = object + + if peerState != nil: + result.add quote do: + template State*(`PROTO`: type `protocolName`): type = `peerState` + + if networkState != nil: + result.add quote do: + template NetworkState*(`PROTO`: type `protocolName`): type = `networkState` + + for msg in p.messages: + if msg.procDef == nil: + continue + + let + msgName = msg.ident + msgRecName = msg.recName + msgStrongRecName = msg.strongRecName + msgRecBody = msg.recBody + + result.add quote do: + # This is a type featuring a single field for each message param: + type `msgStrongRecName`* = `msgRecBody` + + # Add a helper template for accessing the message type: + # e.g. p2p.hello: + template `msgName`*(`PROTO`: type `protocolName`): type = `msgRecName` + + # Add a helper template for obtaining the message Id for + # a particular message type: + template msgProtocol*(`MSG`: type `msgStrongRecName`): type = `protocolName` + template RecType*(`MSG`: type `msgStrongRecName`): untyped = `msgRecName` + +proc genCode*(p: P2PProtocol): NimNode = + for msg in p.messages: + p.backend.implementMsg msg + + result = newStmtList() + result.add p.genTypeSection() + + let + protocolInfoVar = p.protocolInfoVar + protocolInfoVarObj = ident($protocolInfoVar & "Obj") + protocolName = p.nameIdent + protocolInit = p.backend.implementProtocolInit(p) + + result.add quote do: + # One global variable per protocol holds the protocol run-time data + var `protocolInfoVarObj` = `protocolInit` + let `protocolInfoVar` = addr `protocolInfoVarObj` + + # The protocol run-time data is available as a pseudo-field + # (e.g. `p2p.protocolInfo`) + template protocolInfo*(`PROTO`: type `protocolName`): auto = `protocolInfoVar` + + result.add p.outSendProcs, + p.outRecvProcs, + p.outProcRegistrations + + if p.onPeerConnected != nil: result.add p.onPeerConnected + if p.onPeerDisconnected != nil: result.add p.onPeerDisconnected + + result.add newCall(p.backend.setEventHandlers, + protocolInfoVar, + nameOrNil p.onPeerConnected, + nameOrNil p.onPeerDisconnected) + +macro emitForSingleBackend( + name: static[string], + version: static[int], + backend: static[BackendFactory], + body: untyped, + # TODO Nim can't handle a proper duration parameter here + timeouts: static[int64] = defaultReqTimeout.milliseconds, + outgoingRequestDecorator: untyped = nil, + incomingRequestDecorator: untyped = nil, + incomingRequestThunkDecorator: untyped = nil, + incomingResponseDecorator: untyped = nil, + incomingResponseThunkDecorator: untyped = nil, + peerState = type(nil), + networkState = type(nil)): untyped = + + var p = P2PProtocol.init( + backend, + name, version, body, timeouts, + outgoingRequestDecorator, + incomingRequestDecorator, + incomingRequestThunkDecorator, + incomingResponseDecorator, + incomingResponseThunkDecorator, + peerState.getType, networkState.getType) + + result = p.genCode() + try: + result.storeMacroResult true + except IOError: + # IO error so the generated nim code might not be stored, don't sweat it. + discard + +macro emitForAllBackends(backendSyms: typed, options: untyped, body: untyped): untyped = + let name = $(options[0]) + + var backends = newSeq[NimNode]() + if backendSyms.kind == nnkSym: + backends.add backendSyms + else: + for backend in backendSyms: + backends.add backend + + result = newStmtList() + + for backend in backends: + let call = copy options + call[0] = bindSym"emitForSingleBackend" + call.add newTree(nnkExprEqExpr, ident("name"), newLit(name)) + call.add newTree(nnkExprEqExpr, ident("backend"), backend) + call.add newTree(nnkExprEqExpr, ident("body"), body) + result.add call + +template p2pProtocol*(options: untyped, body: untyped) {.dirty.} = + bind emitForAllBackends + emitForAllBackends(p2pProtocolBackendImpl, options, body) + diff --git a/beacon_chain/networking/peer_protocol.nim b/beacon_chain/networking/peer_protocol.nim new file mode 100644 index 000000000..47725065c --- /dev/null +++ b/beacon_chain/networking/peer_protocol.nim @@ -0,0 +1,247 @@ +# beacon_chain +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + chronicles, + ../spec/network, + ".."/[beacon_clock], + ../networking/eth2_network, + ../consensus_object_pools/blockchain_dag, + ../rpc/rest_constants + +logScope: + topics = "peer_proto" + +type + StatusMsg* = object + forkDigest*: ForkDigest + finalizedRoot*: Eth2Digest + finalizedEpoch*: Epoch + headRoot*: Eth2Digest + headSlot*: Slot + + PeerSyncNetworkState* {.final.} = ref object of RootObj + dag: ChainDAGRef + cfg: RuntimeConfig + forkDigests: ref ForkDigests + genesisBlockRoot: Eth2Digest + getBeaconTime: GetBeaconTimeFn + + PeerSyncPeerState* {.final.} = ref object of RootObj + statusLastTime: chronos.Moment + statusMsg: StatusMsg + +func shortLog*(s: StatusMsg): auto = + ( + forkDigest: s.forkDigest, + finalizedRoot: shortLog(s.finalizedRoot), + finalizedEpoch: shortLog(s.finalizedEpoch), + headRoot: shortLog(s.headRoot), + headSlot: shortLog(s.headSlot) + ) +chronicles.formatIt(StatusMsg): shortLog(it) + +func disconnectReasonName(reason: uint64): string = + # haha, nim doesn't support uint64 in `case`! + if reason == uint64(ClientShutDown): "Client shutdown" + elif reason == uint64(IrrelevantNetwork): "Irrelevant network" + elif reason == uint64(FaultOrError): "Fault or error" + else: "Disconnected (" & $reason & ")" + +func forkDigestAtEpoch(state: PeerSyncNetworkState, + epoch: Epoch): ForkDigest = + state.forkDigests[].atEpoch(epoch, state.cfg) + +proc getCurrentStatus(state: PeerSyncNetworkState): StatusMsg = + let + dag = state.dag + wallSlot = state.getBeaconTime().slotOrZero + + if dag != nil: + StatusMsg( + forkDigest: state.forkDigestAtEpoch(wallSlot.epoch), + finalizedRoot: dag.finalizedHead.blck.root, + finalizedEpoch: dag.finalizedHead.slot.epoch, + headRoot: dag.head.root, + headSlot: dag.head.slot) + else: + StatusMsg( + forkDigest: state.forkDigestAtEpoch(wallSlot.epoch), + finalizedRoot: state.genesisBlockRoot, + finalizedEpoch: GENESIS_EPOCH, + headRoot: state.genesisBlockRoot, + headSlot: GENESIS_SLOT) + +proc checkStatusMsg(state: PeerSyncNetworkState, status: StatusMsg): + Result[void, cstring] = + let + dag = state.dag + wallSlot = (state.getBeaconTime() + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero + + if status.finalizedEpoch > status.headSlot.epoch: + # Can be equal during genesis or checkpoint start + return err("finalized epoch newer than head") + + if status.headSlot > wallSlot: + return err("head more recent than wall clock") + + if state.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest: + return err("fork digests differ") + + if dag != nil: + if status.finalizedEpoch <= dag.finalizedHead.slot.epoch: + let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot()) + if blockId.isSome and + (not status.finalizedRoot.isZero) and + status.finalizedRoot != blockId.get().bid.root: + return err("peer following different finality") + else: + if status.finalizedEpoch == GENESIS_EPOCH: + if status.finalizedRoot != state.genesisBlockRoot: + return err("peer following different finality") + + ok() + +proc handleStatus(peer: Peer, + state: PeerSyncNetworkState, + theirStatus: StatusMsg): Future[bool] {.gcsafe.} + +{.pop.} # TODO fix p2p macro for raises + +p2pProtocol PeerSync(version = 1, + networkState = PeerSyncNetworkState, + peerState = PeerSyncPeerState): + + onPeerConnected do (peer: Peer, incoming: bool) {.async.}: + debug "Peer connected", + peer, peerId = shortLog(peer.peerId), incoming + # Per the eth2 protocol, whoever dials must send a status message when + # connected for the first time, but because of how libp2p works, there may + # be a race between incoming and outgoing connections and disconnects that + # makes the incoming flag unreliable / obsolete by the time we get to + # this point - instead of making assumptions, we'll just send a status + # message redundantly. + # TODO(zah) + # the spec does not prohibit sending the extra status message on + # incoming connections, but it should not be necessary - this would + # need a dedicated flow in libp2p that resolves the race conditions - + # this needs more thinking around the ordering of events and the + # given incoming flag + let + ourStatus = peer.networkState.getCurrentStatus() + theirStatus = await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR) + + if theirStatus.isOk: + discard await peer.handleStatus(peer.networkState, theirStatus.get()) + else: + debug "Status response not received in time", + peer, errorKind = theirStatus.error.kind + await peer.disconnect(FaultOrError) + + proc status(peer: Peer, + theirStatus: StatusMsg, + response: SingleChunkResponse[StatusMsg]) + {.async, libp2pProtocol("status", 1).} = + let ourStatus = peer.networkState.getCurrentStatus() + trace "Sending status message", peer = peer, status = ourStatus + await response.send(ourStatus) + discard await peer.handleStatus(peer.networkState, theirStatus) + + proc ping(peer: Peer, value: uint64): uint64 + {.libp2pProtocol("ping", 1).} = + return peer.network.metadata.seq_number + + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2 + proc getMetaData(peer: Peer): uint64 + {.libp2pProtocol("metadata", 1).} = + raise newException(InvalidInputsError, "GetMetaData v1 unsupported") + + proc getMetadata_v2(peer: Peer): altair.MetaData + {.libp2pProtocol("metadata", 2).} = + return peer.network.metadata + + proc goodbye(peer: Peer, + reason: uint64) + {.async, libp2pProtocol("goodbye", 1).} = + debug "Received Goodbye message", reason = disconnectReasonName(reason), peer + +proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) = + debug "Peer status", peer, statusMsg + peer.state(PeerSync).statusMsg = statusMsg + peer.state(PeerSync).statusLastTime = Moment.now() + +proc handleStatus(peer: Peer, + state: PeerSyncNetworkState, + theirStatus: StatusMsg): Future[bool] {.async.} = + let + res = checkStatusMsg(state, theirStatus) + + return if res.isErr(): + debug "Irrelevant peer", peer, theirStatus, err = res.error() + await peer.disconnect(IrrelevantNetwork) + false + else: + peer.setStatusMsg(theirStatus) + + if peer.connectionState == Connecting: + # As soon as we get here it means that we passed handshake succesfully. So + # we can add this peer to PeerPool. + await peer.handlePeer() + true + +proc updateStatus*(peer: Peer): Future[bool] {.async.} = + ## Request `status` of remote peer ``peer``. + let + nstate = peer.networkState(PeerSync) + ourStatus = getCurrentStatus(nstate) + + let theirFut = awaitne peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR) + if theirFut.failed(): + return false + else: + let theirStatus = theirFut.read() + if theirStatus.isOk: + return await peer.handleStatus(nstate, theirStatus.get()) + else: + return false + +proc getHeadSlot*(peer: Peer): Slot = + ## Returns head slot for specific peer ``peer``. + peer.state(PeerSync).statusMsg.headSlot + +proc getFinalizedEpoch*(peer: Peer): Epoch = + ## Returns head slot for specific peer ``peer``. + peer.state(PeerSync).statusMsg.finalizedEpoch + +proc getStatusLastTime*(peer: Peer): chronos.Moment = + ## Returns head slot for specific peer ``peer``. + peer.state(PeerSync).statusLastTime + +proc init*(T: type PeerSync.NetworkState, + dag: ChainDAGRef, getBeaconTime: GetBeaconTimeFn): T = + T( + dag: dag, + cfg: dag.cfg, + forkDigests: dag.forkDigests, + genesisBlockRoot: dag.genesisBlockRoot, + getBeaconTime: getBeaconTime, + ) + +proc init*(T: type PeerSync.NetworkState, + cfg: RuntimeConfig, + forkDigests: ref ForkDigests, + genesisBlockRoot: Eth2Digest, + getBeaconTime: GetBeaconTimeFn): T = + T( + dag: nil, + cfg: cfg, + forkDigests: forkDigests, + genesisBlockRoot: genesisBlockRoot, + getBeaconTime: getBeaconTime, + ) diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index ab92366ec..1a21f9507 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -19,6 +19,7 @@ import ./rpc/[rest_api, state_ttl_cache], ./spec/datatypes/[altair, bellatrix, phase0], ./spec/[deposit_snapshots, engine_authentication, weak_subjectivity], + ./sync/[sync_protocol, light_client_protocol], ./validators/[keystore_management, beacon_validators], "."/[ beacon_node, beacon_node_light_client, deposits, @@ -516,7 +517,18 @@ proc initFullNode( # Here, we also set the correct ENR should we be in all subnets mode! node.network.updateStabilitySubnetMetadata(stabilitySubnets) - node.network.initBeaconSync(dag, getBeaconTime) + node.network.registerProtocol( + PeerSync, PeerSync.NetworkState.init( + node.dag, + node.beaconClock.getBeaconTimeFn(), + )) + + node.network.registerProtocol( + BeaconSync, BeaconSync.NetworkState.init(node.dag)) + + if node.dag.lcDataStore.serve: + node.network.registerProtocol( + LightClientSync, LightClientSync.NetworkState.init(node.dag)) node.updateValidatorMetrics() diff --git a/beacon_chain/nimbus_light_client.nim b/beacon_chain/nimbus_light_client.nim index 2ebfc8e0a..8cc6aafc7 100644 --- a/beacon_chain/nimbus_light_client.nim +++ b/beacon_chain/nimbus_light_client.nim @@ -135,7 +135,10 @@ programMain: elManager.start(syncChain = false) info "Listening to incoming network requests" - network.initBeaconSync(cfg, forkDigests, genesisBlockRoot, getBeaconTime) + network.registerProtocol( + PeerSync, PeerSync.NetworkState.init( + cfg, forkDigests, genesisBlockRoot, getBeaconTime)) + withAll(ConsensusFork): let forkDigest = forkDigests[].atConsensusFork(consensusFork) network.addValidator( diff --git a/beacon_chain/spec/datatypes/base.nim b/beacon_chain/spec/datatypes/base.nim index f8667b64d..dd3df5215 100644 --- a/beacon_chain/spec/datatypes/base.nim +++ b/beacon_chain/spec/datatypes/base.nim @@ -62,7 +62,7 @@ import std/[macros, hashes, sets, strutils, tables, typetraits], - stew/[assign2, byteutils, results], + stew/[assign2, byteutils, endians2, results], chronicles, json_serialization, ssz_serialization/types as sszTypes, @@ -70,7 +70,7 @@ import ".."/[beacon_time, crypto, digest, presets] export - tables, results, json_serialization, sszTypes, beacon_time, crypto, + tables, results, endians2, json_serialization, sszTypes, beacon_time, crypto, digest, presets const SPEC_VERSION* = "1.4.0-beta.5" diff --git a/beacon_chain/spec/eth2_apis/rest_builder_calls.nim b/beacon_chain/spec/eth2_apis/rest_builder_calls.nim index 2f6580f1f..4fd20fa2d 100644 --- a/beacon_chain/spec/eth2_apis/rest_builder_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_builder_calls.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -7,11 +7,11 @@ {.push raises: [].} import - chronos, presto/client, chronicles, + presto/client, chronicles, ".."/[helpers, forks, eth2_ssz_serialization], "."/[rest_types, rest_common, eth2_rest_serialization] -export chronos, client, rest_types, eth2_rest_serialization +export client, rest_types, eth2_rest_serialization proc getNextWithdrawals*(state_id: StateIdent ): RestResponse[GetNextWithdrawalsResponse] {. diff --git a/beacon_chain/spec/eth2_apis/rest_config_calls.nim b/beacon_chain/spec/eth2_apis/rest_config_calls.nim index 884c8b0be..a077fe3ef 100644 --- a/beacon_chain/spec/eth2_apis/rest_config_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_config_calls.nim @@ -7,10 +7,10 @@ {.push raises: [].} import - chronos, presto/client, + presto/client, "."/[rest_types, eth2_rest_serialization] -export chronos, client, rest_types, eth2_rest_serialization +export client, rest_types, eth2_rest_serialization proc getForkSchedulePlain*(): RestPlainResponse {. rest, endpoint: "/eth/v1/config/fork_schedule", meth: MethodGet.} diff --git a/beacon_chain/spec/eth2_apis/rest_event_calls.nim b/beacon_chain/spec/eth2_apis/rest_event_calls.nim index e086ee855..9fcadae07 100644 --- a/beacon_chain/spec/eth2_apis/rest_event_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_event_calls.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -7,7 +7,7 @@ {.push raises: [].} import - chronos, presto/client, + presto/client, "."/[rest_types, eth2_rest_serialization] proc subscribeEventStream*(topics: set[EventTopic]): RestHttpResponseRef {. diff --git a/beacon_chain/spec/eth2_apis/rest_keymanager_calls.nim b/beacon_chain/spec/eth2_apis/rest_keymanager_calls.nim index f6b6b79fb..b3910ded1 100644 --- a/beacon_chain/spec/eth2_apis/rest_keymanager_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_keymanager_calls.nim @@ -7,13 +7,13 @@ {.push raises: [].} import - chronos, presto/client, chronicles, + presto/client, chronicles, ".."/".."/validators/slashing_protection_common, ".."/datatypes/[phase0, altair], ".."/[helpers, forks, keystore, eth2_ssz_serialization], "."/[rest_types, rest_common, rest_keymanager_types, eth2_rest_serialization] -export chronos, client, rest_types, eth2_rest_serialization, +export client, rest_types, eth2_rest_serialization, rest_keymanager_types UUID.serializesAsBaseIn RestJson diff --git a/beacon_chain/spec/eth2_apis/rest_light_client_calls.nim b/beacon_chain/spec/eth2_apis/rest_light_client_calls.nim index 415dd7b9d..01f1aa010 100644 --- a/beacon_chain/spec/eth2_apis/rest_light_client_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_light_client_calls.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2023 Status Research & Development GmbH +# Copyright (c) 2023-2024 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim b/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim index 95cadafef..9e36d18a2 100644 --- a/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_nimbus_calls.nim @@ -7,7 +7,7 @@ {.push raises: [].} import - chronos, chronicles, presto/client, + chronicles, presto/client, "."/[rest_types, eth2_rest_serialization, rest_common] proc getValidatorsActivity*(epoch: Epoch, diff --git a/beacon_chain/spec/eth2_apis/rest_remote_signer_calls.nim b/beacon_chain/spec/eth2_apis/rest_remote_signer_calls.nim index 76349f943..3e43aa587 100644 --- a/beacon_chain/spec/eth2_apis/rest_remote_signer_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_remote_signer_calls.nim @@ -8,13 +8,11 @@ import chronicles, metrics, - chronos, chronos/apps/http/httpclient, presto, presto/client, - serialization, json_serialization, - json_serialization/std/[net, sets], + chronos, presto/client, stew/[results, base10, byteutils], "."/[rest_types, eth2_rest_serialization] -export chronos, httpclient, client, rest_types, eth2_rest_serialization, results +export chronos, client, rest_types, eth2_rest_serialization, results type Web3SignerErrorKind* {.pure.} = enum diff --git a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim index 4ee50e881..402500da1 100644 --- a/beacon_chain/spec/eth2_apis/rest_validator_calls.nim +++ b/beacon_chain/spec/eth2_apis/rest_validator_calls.nim @@ -7,10 +7,10 @@ {.push raises: [].} import - chronos, presto/client, + presto/client, "."/[rest_types, eth2_rest_serialization] -export chronos, client, rest_types, eth2_rest_serialization +export client, rest_types, eth2_rest_serialization proc getAttesterDutiesPlain*( epoch: Epoch, diff --git a/beacon_chain/sync/light_client_manager.nim b/beacon_chain/sync/light_client_manager.nim index 141a4891d..c9cfd717b 100644 --- a/beacon_chain/sync/light_client_manager.nim +++ b/beacon_chain/sync/light_client_manager.nim @@ -12,7 +12,7 @@ import ../spec/network, ../networking/eth2_network, ../beacon_clock, - "."/[light_client_sync_helpers, sync_protocol, sync_manager] + "."/[light_client_sync_helpers, light_client_protocol, sync_manager] export sync_manager logScope: diff --git a/beacon_chain/sync/light_client_protocol.nim b/beacon_chain/sync/light_client_protocol.nim new file mode 100644 index 000000000..d4aa4a47f --- /dev/null +++ b/beacon_chain/sync/light_client_protocol.nim @@ -0,0 +1,192 @@ +# beacon_chain +# Copyright (c) 2018-2024 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [].} + +import + chronicles, chronos, snappy, snappy/codec, + ../spec/[helpers, forks, network], + ../networking/eth2_network, + ../consensus_object_pools/blockchain_dag, + ../rpc/rest_constants + +logScope: + topics = "lc_proto" + +const + lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1) + ## Only one bootstrap per peer should ever be needed - no need to allow more + lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000) + ## Updates are tiny - we can allow lots of them + lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100) + lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100) + +type + LightClientNetworkState* {.final.} = ref object of RootObj + dag*: ChainDAGRef + +proc readChunkPayload*( + conn: Connection, peer: Peer, MsgType: type SomeForkedLightClientObject): + Future[NetRes[MsgType]] {.async.} = + var contextBytes: ForkDigest + try: + await conn.readExactly(addr contextBytes, sizeof contextBytes) + except CatchableError: + return neterr UnexpectedEOF + let contextFork = + peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr: + return neterr InvalidContextBytes + + withLcDataFork(lcDataForkAtConsensusFork(contextFork)): + when lcDataFork > LightClientDataFork.None: + let res = await eth2_network.readChunkPayload( + conn, peer, MsgType.Forky(lcDataFork)) + if res.isOk: + if contextFork != + peer.network.cfg.consensusForkAtEpoch(res.get.contextEpoch): + return neterr InvalidContextBytes + return ok MsgType.init(res.get) + else: + return err(res.error) + else: + return neterr InvalidContextBytes + +{.pop.} + +func forkDigestAtEpoch(state: LightClientNetworkState, + epoch: Epoch): ForkDigest = + state.dag.forkDigests[].atEpoch(epoch, state.dag.cfg) + +p2pProtocol LightClientSync(version = 1, + networkState = LightClientNetworkState): + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap + proc lightClientBootstrap( + peer: Peer, + blockRoot: Eth2Digest, + response: SingleChunkResponse[ForkedLightClientBootstrap]) + {.async, libp2pProtocol("light_client_bootstrap", 1).} = + trace "Received LC bootstrap request", peer, blockRoot + let dag = peer.networkState.dag + doAssert dag.lcDataStore.serve + + let bootstrap = dag.getLightClientBootstrap(blockRoot) + withForkyBootstrap(bootstrap): + when lcDataFork > LightClientDataFork.None: + let + contextEpoch = forkyBootstrap.contextEpoch + contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data + + # TODO extract from libp2pProtocol + peer.awaitQuota( + lightClientBootstrapResponseCost, + "light_client_bootstrap/1") + await response.sendSSZ(forkyBootstrap, contextBytes) + else: + raise newException(ResourceUnavailableError, LCBootstrapUnavailable) + + debug "LC bootstrap request done", peer, blockRoot + + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange + proc lightClientUpdatesByRange( + peer: Peer, + startPeriod: SyncCommitteePeriod, + reqCount: uint64, + response: MultipleChunksResponse[ + ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]) + {.async, libp2pProtocol("light_client_updates_by_range", 1).} = + trace "Received LC updates by range request", peer, startPeriod, reqCount + let dag = peer.networkState.dag + doAssert dag.lcDataStore.serve + + let + headPeriod = dag.head.slot.sync_committee_period + # Limit number of updates in response + maxSupportedCount = + if startPeriod > headPeriod: + 0'u64 + else: + min(headPeriod + 1 - startPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES) + count = min(reqCount, maxSupportedCount) + onePastPeriod = startPeriod + count + + var found = 0 + for period in startPeriod.. LightClientDataFork.None: + let + contextEpoch = forkyUpdate.contextEpoch + contextBytes = + peer.networkState.forkDigestAtEpoch(contextEpoch).data + + # TODO extract from libp2pProtocol + peer.awaitQuota( + lightClientUpdateResponseCost, + "light_client_updates_by_range/1") + await response.writeSSZ(forkyUpdate, contextBytes) + inc found + else: + discard + + debug "LC updates by range request done", peer, startPeriod, count, found + + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate + proc lightClientFinalityUpdate( + peer: Peer, + response: SingleChunkResponse[ForkedLightClientFinalityUpdate]) + {.async, libp2pProtocol("light_client_finality_update", 1).} = + trace "Received LC finality update request", peer + let dag = peer.networkState.dag + doAssert dag.lcDataStore.serve + + let finality_update = dag.getLightClientFinalityUpdate() + withForkyFinalityUpdate(finality_update): + when lcDataFork > LightClientDataFork.None: + let + contextEpoch = forkyFinalityUpdate.contextEpoch + contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data + + # TODO extract from libp2pProtocol + peer.awaitQuota( + lightClientFinalityUpdateResponseCost, + "light_client_finality_update/1") + await response.sendSSZ(forkyFinalityUpdate, contextBytes) + else: + raise newException(ResourceUnavailableError, LCFinUpdateUnavailable) + + debug "LC finality update request done", peer + + # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate + proc lightClientOptimisticUpdate( + peer: Peer, + response: SingleChunkResponse[ForkedLightClientOptimisticUpdate]) + {.async, libp2pProtocol("light_client_optimistic_update", 1).} = + trace "Received LC optimistic update request", peer + let dag = peer.networkState.dag + doAssert dag.lcDataStore.serve + + let optimistic_update = dag.getLightClientOptimisticUpdate() + withForkyOptimisticUpdate(optimistic_update): + when lcDataFork > LightClientDataFork.None: + let + contextEpoch = forkyOptimisticUpdate.contextEpoch + contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data + + # TODO extract from libp2pProtocol + peer.awaitQuota( + lightClientOptimisticUpdateResponseCost, + "light_client_optimistic_update/1") + await response.sendSSZ(forkyOptimisticUpdate, contextBytes) + else: + raise newException(ResourceUnavailableError, LCOptUpdateUnavailable) + + debug "LC optimistic update request done", peer + +proc init*(T: type LightClientSync.NetworkState, dag: ChainDAGRef): T = + T( + dag: dag, + ) diff --git a/beacon_chain/sync/sync_manager.nim b/beacon_chain/sync/sync_manager.nim index 001dc3f59..39f656bf5 100644 --- a/beacon_chain/sync/sync_manager.nim +++ b/beacon_chain/sync/sync_manager.nim @@ -7,7 +7,7 @@ {.push raises: [].} -import std/[heapqueue, tables, strutils, sequtils, algorithm] +import std/[strutils, sequtils, algorithm] import stew/[results, base10], chronos, chronicles import ../spec/datatypes/[phase0, altair], @@ -287,7 +287,7 @@ proc syncStep[A, B](man: SyncManager[A, B], index: int, peer: A) {.async.} = remote_head_slot = peerSlot, local_head_slot = headSlot let - peerStatusAge = Moment.now() - peer.state(BeaconSync).statusLastTime + peerStatusAge = Moment.now() - peer.getStatusLastTime() needsUpdate = # Latest status we got is old peerStatusAge >= StatusExpirationTime or diff --git a/beacon_chain/sync/sync_protocol.nim b/beacon_chain/sync/sync_protocol.nim index e88d234ab..c1b13f30f 100644 --- a/beacon_chain/sync/sync_protocol.nim +++ b/beacon_chain/sync/sync_protocol.nim @@ -8,9 +8,7 @@ {.push raises: [].} import - std/[tables, sets, macros], chronicles, chronos, snappy, snappy/codec, - libp2p/switch, ../spec/datatypes/[phase0, altair, bellatrix, capella, deneb], ../spec/[helpers, forks, network], ".."/[beacon_clock], @@ -19,47 +17,16 @@ import ../rpc/rest_constants logScope: - topics = "sync" + topics = "sync_proto" const blockResponseCost = allowedOpsPerSecondCost(64) # Allow syncing ~64 blocks/sec (minus request costs) - lightClientBootstrapResponseCost = allowedOpsPerSecondCost(1) - ## Only one bootstrap per peer should ever be needed - no need to allow more - lightClientUpdateResponseCost = allowedOpsPerSecondCost(1000) - ## Updates are tiny - we can allow lots of them - lightClientFinalityUpdateResponseCost = allowedOpsPerSecondCost(100) - lightClientOptimisticUpdateResponseCost = allowedOpsPerSecondCost(100) - type - StatusMsg* = object - forkDigest*: ForkDigest - finalizedRoot*: Eth2Digest - finalizedEpoch*: Epoch - headRoot*: Eth2Digest - headSlot*: Slot - - ValidatorSetDeltaFlags {.pure.} = enum - Activation = 0 - Exit = 1 - - ValidatorChangeLogEntry* = object - case kind*: ValidatorSetDeltaFlags - of Activation: - pubkey: ValidatorPubKey - else: - index: uint32 - - BeaconSyncNetworkState = ref object + BeaconSyncNetworkState* {.final.} = ref object of RootObj dag: ChainDAGRef cfg: RuntimeConfig - forkDigests: ref ForkDigests genesisBlockRoot: Eth2Digest - getBeaconTime: GetBeaconTimeFn - - BeaconSyncPeerState* = ref object - statusLastTime*: chronos.Moment - statusMsg*: StatusMsg BlockRootSlot* = object blockRoot: Eth2Digest @@ -133,163 +100,10 @@ proc readChunkPayload*( else: return neterr InvalidContextBytes -proc readChunkPayload*( - conn: Connection, peer: Peer, MsgType: type SomeForkedLightClientObject): - Future[NetRes[MsgType]] {.async.} = - var contextBytes: ForkDigest - try: - await conn.readExactly(addr contextBytes, sizeof contextBytes) - except CatchableError: - return neterr UnexpectedEOF - let contextFork = - peer.network.forkDigests[].consensusForkForDigest(contextBytes).valueOr: - return neterr InvalidContextBytes - - withLcDataFork(lcDataForkAtConsensusFork(contextFork)): - when lcDataFork > LightClientDataFork.None: - let res = await eth2_network.readChunkPayload( - conn, peer, MsgType.Forky(lcDataFork)) - if res.isOk: - if contextFork != - peer.network.cfg.consensusForkAtEpoch(res.get.contextEpoch): - return neterr InvalidContextBytes - return ok MsgType.init(res.get) - else: - return err(res.error) - else: - return neterr InvalidContextBytes - -func shortLog*(s: StatusMsg): auto = - ( - forkDigest: s.forkDigest, - finalizedRoot: shortLog(s.finalizedRoot), - finalizedEpoch: shortLog(s.finalizedEpoch), - headRoot: shortLog(s.headRoot), - headSlot: shortLog(s.headSlot) - ) -chronicles.formatIt(StatusMsg): shortLog(it) - -func disconnectReasonName(reason: uint64): string = - # haha, nim doesn't support uint64 in `case`! - if reason == uint64(ClientShutDown): "Client shutdown" - elif reason == uint64(IrrelevantNetwork): "Irrelevant network" - elif reason == uint64(FaultOrError): "Fault or error" - else: "Disconnected (" & $reason & ")" - -func forkDigestAtEpoch(state: BeaconSyncNetworkState, - epoch: Epoch): ForkDigest = - state.forkDigests[].atEpoch(epoch, state.cfg) - -proc getCurrentStatus(state: BeaconSyncNetworkState): StatusMsg = - let - dag = state.dag - wallSlot = state.getBeaconTime().slotOrZero - - if dag != nil: - StatusMsg( - forkDigest: state.forkDigestAtEpoch(wallSlot.epoch), - finalizedRoot: dag.finalizedHead.blck.root, - finalizedEpoch: dag.finalizedHead.slot.epoch, - headRoot: dag.head.root, - headSlot: dag.head.slot) - else: - StatusMsg( - forkDigest: state.forkDigestAtEpoch(wallSlot.epoch), - finalizedRoot: state.genesisBlockRoot, - finalizedEpoch: GENESIS_EPOCH, - headRoot: state.genesisBlockRoot, - headSlot: GENESIS_SLOT) - -proc checkStatusMsg(state: BeaconSyncNetworkState, status: StatusMsg): - Result[void, cstring] = - let - dag = state.dag - wallSlot = (state.getBeaconTime() + MAXIMUM_GOSSIP_CLOCK_DISPARITY).slotOrZero - - if status.finalizedEpoch > status.headSlot.epoch: - # Can be equal during genesis or checkpoint start - return err("finalized epoch newer than head") - - if status.headSlot > wallSlot: - return err("head more recent than wall clock") - - if state.forkDigestAtEpoch(wallSlot.epoch) != status.forkDigest: - return err("fork digests differ") - - if dag != nil: - if status.finalizedEpoch <= dag.finalizedHead.slot.epoch: - let blockId = dag.getBlockIdAtSlot(status.finalizedEpoch.start_slot()) - if blockId.isSome and - (not status.finalizedRoot.isZero) and - status.finalizedRoot != blockId.get().bid.root: - return err("peer following different finality") - else: - if status.finalizedEpoch == GENESIS_EPOCH: - if status.finalizedRoot != state.genesisBlockRoot: - return err("peer following different finality") - - ok() - -proc handleStatus(peer: Peer, - state: BeaconSyncNetworkState, - theirStatus: StatusMsg): Future[bool] {.gcsafe.} - -proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) {.gcsafe.} - {.pop.} # TODO fix p2p macro for raises p2pProtocol BeaconSync(version = 1, - networkState = BeaconSyncNetworkState, - peerState = BeaconSyncPeerState): - - onPeerConnected do (peer: Peer, incoming: bool) {.async.}: - debug "Peer connected", - peer, peerId = shortLog(peer.peerId), incoming - # Per the eth2 protocol, whoever dials must send a status message when - # connected for the first time, but because of how libp2p works, there may - # be a race between incoming and outgoing connections and disconnects that - # makes the incoming flag unreliable / obsolete by the time we get to - # this point - instead of making assumptions, we'll just send a status - # message redundantly. - # TODO(zah) - # the spec does not prohibit sending the extra status message on - # incoming connections, but it should not be necessary - this would - # need a dedicated flow in libp2p that resolves the race conditions - - # this needs more thinking around the ordering of events and the - # given incoming flag - let - ourStatus = peer.networkState.getCurrentStatus() - theirStatus = await peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR) - - if theirStatus.isOk: - discard await peer.handleStatus(peer.networkState, theirStatus.get()) - else: - debug "Status response not received in time", - peer, errorKind = theirStatus.error.kind - await peer.disconnect(FaultOrError) - - proc status(peer: Peer, - theirStatus: StatusMsg, - response: SingleChunkResponse[StatusMsg]) - {.async, libp2pProtocol("status", 1, isRequired = true).} = - let ourStatus = peer.networkState.getCurrentStatus() - trace "Sending status message", peer = peer, status = ourStatus - await response.send(ourStatus) - discard await peer.handleStatus(peer.networkState, theirStatus) - - proc ping(peer: Peer, value: uint64): uint64 - {.libp2pProtocol("ping", 1, isRequired = true).} = - return peer.network.metadata.seq_number - - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/p2p-interface.md#transitioning-from-v1-to-v2 - proc getMetaData(peer: Peer): uint64 - {.libp2pProtocol("metadata", 1, isRequired = true).} = - raise newException(InvalidInputsError, "GetMetaData v1 unsupported") - - proc getMetadata_v2(peer: Peer): altair.MetaData - {.libp2pProtocol("metadata", 2, isRequired = true).} = - return peer.network.metadata - + networkState = BeaconSyncNetworkState): proc beaconBlocksByRange_v2( peer: Peer, startSlot: Slot, @@ -352,7 +166,7 @@ p2pProtocol BeaconSync(version = 1, await response.writeBytesSZ( uncompressedLen, bytes, - peer.networkState.forkDigestAtEpoch(blocks[i].slot.epoch).data) + peer.network.forkDigestAtEpoch(blocks[i].slot.epoch).data) inc found @@ -414,14 +228,13 @@ p2pProtocol BeaconSync(version = 1, await response.writeBytesSZ( uncompressedLen, bytes, - peer.networkState.forkDigestAtEpoch(blockRef.slot.epoch).data) + peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data) inc found debug "Block root request done", peer, roots = blockRoots.len, count, found - # https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1 proc blobSidecarsByRoot( peer: Peer, @@ -468,7 +281,7 @@ p2pProtocol BeaconSync(version = 1, await response.writeBytesSZ( uncompressedLen, bytes, - peer.networkState.forkDigestAtEpoch(blockRef.slot.epoch).data) + peer.network.forkDigestAtEpoch(blockRef.slot.epoch).data) inc found debug "Blob root request done", @@ -538,7 +351,7 @@ p2pProtocol BeaconSync(version = 1, await response.writeBytesSZ( uncompressedLen, bytes, - peer.networkState.forkDigestAtEpoch(blockIds[i].slot.epoch).data) + peer.network.forkDigestAtEpoch(blockIds[i].slot.epoch).data) inc found else: break @@ -546,203 +359,7 @@ p2pProtocol BeaconSync(version = 1, debug "BlobSidecar range request done", peer, startSlot, count = reqCount, found - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientbootstrap - proc lightClientBootstrap( - peer: Peer, - blockRoot: Eth2Digest, - response: SingleChunkResponse[ForkedLightClientBootstrap]) - {.async, libp2pProtocol("light_client_bootstrap", 1, - isLightClientRequest = true).} = - trace "Received LC bootstrap request", peer, blockRoot - let dag = peer.networkState.dag - doAssert dag.lcDataStore.serve - - let bootstrap = dag.getLightClientBootstrap(blockRoot) - withForkyBootstrap(bootstrap): - when lcDataFork > LightClientDataFork.None: - let - contextEpoch = forkyBootstrap.contextEpoch - contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data - - # TODO extract from libp2pProtocol - peer.awaitQuota( - lightClientBootstrapResponseCost, - "light_client_bootstrap/1") - await response.sendSSZ(forkyBootstrap, contextBytes) - else: - raise newException(ResourceUnavailableError, LCBootstrapUnavailable) - - debug "LC bootstrap request done", peer, blockRoot - - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#lightclientupdatesbyrange - proc lightClientUpdatesByRange( - peer: Peer, - startPeriod: SyncCommitteePeriod, - reqCount: uint64, - response: MultipleChunksResponse[ - ForkedLightClientUpdate, MAX_REQUEST_LIGHT_CLIENT_UPDATES]) - {.async, libp2pProtocol("light_client_updates_by_range", 1, - isLightClientRequest = true).} = - trace "Received LC updates by range request", peer, startPeriod, reqCount - let dag = peer.networkState.dag - doAssert dag.lcDataStore.serve - - let - headPeriod = dag.head.slot.sync_committee_period - # Limit number of updates in response - maxSupportedCount = - if startPeriod > headPeriod: - 0'u64 - else: - min(headPeriod + 1 - startPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES) - count = min(reqCount, maxSupportedCount) - onePastPeriod = startPeriod + count - - var found = 0 - for period in startPeriod.. LightClientDataFork.None: - let - contextEpoch = forkyUpdate.contextEpoch - contextBytes = - peer.networkState.forkDigestAtEpoch(contextEpoch).data - - # TODO extract from libp2pProtocol - peer.awaitQuota( - lightClientUpdateResponseCost, - "light_client_updates_by_range/1") - await response.writeSSZ(forkyUpdate, contextBytes) - inc found - else: - discard - - debug "LC updates by range request done", peer, startPeriod, count, found - - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientfinalityupdate - proc lightClientFinalityUpdate( - peer: Peer, - response: SingleChunkResponse[ForkedLightClientFinalityUpdate]) - {.async, libp2pProtocol("light_client_finality_update", 1, - isLightClientRequest = true).} = - trace "Received LC finality update request", peer - let dag = peer.networkState.dag - doAssert dag.lcDataStore.serve - - let finality_update = dag.getLightClientFinalityUpdate() - withForkyFinalityUpdate(finality_update): - when lcDataFork > LightClientDataFork.None: - let - contextEpoch = forkyFinalityUpdate.contextEpoch - contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data - - # TODO extract from libp2pProtocol - peer.awaitQuota( - lightClientFinalityUpdateResponseCost, - "light_client_finality_update/1") - await response.sendSSZ(forkyFinalityUpdate, contextBytes) - else: - raise newException(ResourceUnavailableError, LCFinUpdateUnavailable) - - debug "LC finality update request done", peer - - # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/altair/light-client/p2p-interface.md#getlightclientoptimisticupdate - proc lightClientOptimisticUpdate( - peer: Peer, - response: SingleChunkResponse[ForkedLightClientOptimisticUpdate]) - {.async, libp2pProtocol("light_client_optimistic_update", 1, - isLightClientRequest = true).} = - trace "Received LC optimistic update request", peer - let dag = peer.networkState.dag - doAssert dag.lcDataStore.serve - - let optimistic_update = dag.getLightClientOptimisticUpdate() - withForkyOptimisticUpdate(optimistic_update): - when lcDataFork > LightClientDataFork.None: - let - contextEpoch = forkyOptimisticUpdate.contextEpoch - contextBytes = peer.networkState.forkDigestAtEpoch(contextEpoch).data - - # TODO extract from libp2pProtocol - peer.awaitQuota( - lightClientOptimisticUpdateResponseCost, - "light_client_optimistic_update/1") - await response.sendSSZ(forkyOptimisticUpdate, contextBytes) - else: - raise newException(ResourceUnavailableError, LCOptUpdateUnavailable) - - debug "LC optimistic update request done", peer - - proc goodbye(peer: Peer, - reason: uint64) - {.async, libp2pProtocol("goodbye", 1, isRequired = true).} = - debug "Received Goodbye message", reason = disconnectReasonName(reason), peer - -proc setStatusMsg(peer: Peer, statusMsg: StatusMsg) = - debug "Peer status", peer, statusMsg - peer.state(BeaconSync).statusMsg = statusMsg - peer.state(BeaconSync).statusLastTime = Moment.now() - -proc handleStatus(peer: Peer, - state: BeaconSyncNetworkState, - theirStatus: StatusMsg): Future[bool] {.async, gcsafe.} = - let - res = checkStatusMsg(state, theirStatus) - - return if res.isErr(): - debug "Irrelevant peer", peer, theirStatus, err = res.error() - await peer.disconnect(IrrelevantNetwork) - false - else: - peer.setStatusMsg(theirStatus) - - if peer.connectionState == Connecting: - # As soon as we get here it means that we passed handshake succesfully. So - # we can add this peer to PeerPool. - await peer.handlePeer() - true - -proc updateStatus*(peer: Peer): Future[bool] {.async.} = - ## Request `status` of remote peer ``peer``. - let - nstate = peer.networkState(BeaconSync) - ourStatus = getCurrentStatus(nstate) - - let theirFut = awaitne peer.status(ourStatus, timeout = RESP_TIMEOUT_DUR) - if theirFut.failed(): - return false - else: - let theirStatus = theirFut.read() - if theirStatus.isOk: - return await peer.handleStatus(nstate, theirStatus.get()) - else: - return false - -proc getHeadSlot*(peer: Peer): Slot = - ## Returns head slot for specific peer ``peer``. - peer.state(BeaconSync).statusMsg.headSlot - -proc getFinalizedEpoch*(peer: Peer): Epoch = - ## Returns head slot for specific peer ``peer``. - peer.state(BeaconSync).statusMsg.finalizedEpoch - -proc initBeaconSync*(network: Eth2Node, dag: ChainDAGRef, - getBeaconTime: GetBeaconTimeFn) = - var networkState = network.protocolState(BeaconSync) - networkState.dag = dag - networkState.cfg = dag.cfg - networkState.forkDigests = dag.forkDigests - networkState.genesisBlockRoot = dag.genesisBlockRoot - networkState.getBeaconTime = getBeaconTime - -proc initBeaconSync*(network: Eth2Node, - cfg: RuntimeConfig, - forkDigests: ref ForkDigests, - genesisBlockRoot: Eth2Digest, - getBeaconTime: GetBeaconTimeFn) = - var networkState = network.protocolState(BeaconSync) - networkState.dag = nil - networkState.cfg = cfg - networkState.forkDigests = forkDigests - networkState.genesisBlockRoot = genesisBlockRoot - networkState.getBeaconTime = getBeaconTime +proc init*(T: type BeaconSync.NetworkState, dag: ChainDAGRef): T = + T( + dag: dag, + ) diff --git a/beacon_chain/validators/keystore_management.nim b/beacon_chain/validators/keystore_management.nim index 721bbbc46..0bf413a0e 100644 --- a/beacon_chain/validators/keystore_management.nim +++ b/beacon_chain/validators/keystore_management.nim @@ -940,7 +940,7 @@ func mapErrTo*[T, E](r: Result[T, E], v: static KeystoreGenerationErrorKind): KeystoreGenerationError(kind: v, error: $e)) proc loadNetKeystore*(keystorePath: string, - insecurePwd: Option[string]): Opt[lcrypto.PrivateKey] = + insecurePwd: Opt[string]): Opt[lcrypto.PrivateKey] = if not(checkSensitiveFilePermissions(keystorePath)): error "Network keystorage file has insecure permissions", @@ -984,7 +984,7 @@ proc loadNetKeystore*(keystorePath: string, return proc saveNetKeystore*(rng: var HmacDrbgContext, keystorePath: string, - netKey: lcrypto.PrivateKey, insecurePwd: Option[string] + netKey: lcrypto.PrivateKey, insecurePwd: Opt[string] ): Result[void, KeystoreGenerationError] = let password = if insecurePwd.isSome(): @@ -2092,7 +2092,7 @@ proc loadWallet*(fileName: string): Result[Wallet, string] = err "Error accessing wallet file \"" & fileName & "\": " & err.msg proc findWallet*(config: BeaconNodeConf, - name: WalletName): Result[Option[WalletPathPair], string] = + name: WalletName): Result[Opt[WalletPathPair], string] = var walletFiles = newSeq[string]() try: for kind, walletFile in walkDir(config.walletsDir): @@ -2100,7 +2100,7 @@ proc findWallet*(config: BeaconNodeConf, let walletId = splitFile(walletFile).name if cmpIgnoreCase(walletId, name.string) == 0: let wallet = ? loadWallet(walletFile) - return ok some WalletPathPair(wallet: wallet, path: walletFile) + return ok Opt.some WalletPathPair(wallet: wallet, path: walletFile) walletFiles.add walletFile except OSError as err: return err("Error accessing the wallets directory \"" & @@ -2110,9 +2110,9 @@ proc findWallet*(config: BeaconNodeConf, let wallet = ? loadWallet(walletFile) if cmpIgnoreCase(wallet.name.string, name.string) == 0 or cmpIgnoreCase(wallet.uuid.string, name.string) == 0: - return ok some WalletPathPair(wallet: wallet, path: walletFile) + return ok Opt.some WalletPathPair(wallet: wallet, path: walletFile) - return ok none(WalletPathPair) + return ok Opt.none(WalletPathPair) type # This is not particularly well-standardized yet. diff --git a/docs/the_nimbus_book/src/developers.md b/docs/the_nimbus_book/src/developers.md index 586c62fc2..0c9cace1d 100644 --- a/docs/the_nimbus_book/src/developers.md +++ b/docs/the_nimbus_book/src/developers.md @@ -251,3 +251,8 @@ For example, to run the block simulator for 384 slots, with 20,000 validators, a build/block_sim --slots=384 --validators=20000 --attesterRatio=0.66 ``` +## Sync from a specific peer + +```sh +build/nimbus_beacon_node --discv5:off --tcp-port=9876 --direct-peer="/ip4/127.0.0.1/tcp/9000/p2p/$(curl -s -X 'GET' 'http://localhost:5052/eth/v1/node/identity' -H 'accept: application/json' | jq -r .data.peer_id)" +```