# Eth2 networking layer built on top of nim-libp2p: request/response
# streams (SSZ, optionally snappy-framed), gossip pub/sub helpers, peer
# bookkeeping and the discovery loop.

import
  # Std lib
  typetraits, strutils, os, random, algorithm,
  options as stdOptions, net as stdNet,

  # Status libs
  stew/[varints, base58, bitseqs], stew/shims/[macros, tables], stint,
  faststreams/output_stream, snappy/framing,
  json_serialization, json_serialization/std/[net, options],
  chronos, chronicles, metrics,
  # TODO: create simpler to use libp2p modules that use re-exports
  libp2p/[switch, standard_setup, peerinfo, peer, connection,
          multiaddress, multicodec, crypto/crypto, crypto/secp,
          protocols/identify, protocols/protocol],
  libp2p/protocols/secure/[secure, secio],
  libp2p/protocols/pubsub/[pubsub, floodsub, rpc/messages],
  libp2p/transports/[transport, tcptransport],
  libp2p/stream/lpstream,
  eth/[keys, async_utils], eth/p2p/[enode, p2p_protocol_dsl],
  eth/net/nat, eth/p2p/discoveryv5/[enr, node],
  # Beacon node modules
  version, conf, eth2_discovery, libp2p_json_serialization, conf, ssz,
  peer_pool, spec/[datatypes, network]

import
  eth/p2p/discoveryv5/protocol as discv5_protocol

export
  version, multiaddress, peer_pool, peerinfo, p2pProtocol,
  libp2p_json_serialization, ssz, peer

logScope:
  topics = "networking"

type
  KeyPair* = crypto.KeyPair
  PublicKey* = crypto.PublicKey
  PrivateKey* = crypto.PrivateKey

  Bytes = seq[byte]

  # TODO Is this really needed?
  Eth2Node* = ref object of RootObj
    switch*: Switch
    discovery*: Eth2DiscoveryProtocol
    wantedPeers*: int
    peerPool*: PeerPool[Peer, PeerID]
    protocolStates*: seq[RootRef]
    libp2pTransportLoops*: seq[Future[void]]
    metadata*: Eth2Metadata

  EthereumNode = Eth2Node # needed for the definitions in p2p_backends_helpers

  Eth2MetaData* = object
    seq_number*: uint64
    attnets*: BitArray[ATTESTATION_SUBNET_COUNT]

  ENRForkID* = object
    fork_digest*: ForkDigest
    next_fork_version*: Version
    next_fork_epoch*: Epoch

  Peer* = ref object
    network*: Eth2Node
    info*: PeerInfo
    wasDialed*: bool
    discoveryId*: Eth2DiscoveryId
    connectionState*: ConnectionState
    protocolStates*: seq[RootRef]
    maxInactivityAllowed*: Duration
    score*: int
    supportsSnappy: bool

  ConnectionState* = enum
    None,
    Connecting,
    Connected,
    Disconnecting,
    Disconnected

  UntypedResponder = object
    peer*: Peer
    stream*: Connection

  Responder*[MsgType] = distinct UntypedResponder

  MessageInfo* = object
    name*: string

    # Private fields:
    libp2pCodecName: string
    protocolMounter*: MounterProc

  ProtocolInfoObj* = object
    name*: string
    messages*: seq[MessageInfo]
    index*: int # the position of the protocol in the
                # ordered list of supported protocols

    # Private fields:
    peerStateInitializer*: PeerStateInitializer
    networkStateInitializer*: NetworkStateInitializer
    handshake*: HandshakeStep
    disconnectHandler*: DisconnectionHandler

  ProtocolInfo* = ptr ProtocolInfoObj

  # The numeric value of each member is the byte written in front of a
  # response chunk (see `encodeErrorMsg` / `sendResponseChunkBytes`).
  ResponseCode* = enum
    Success
    InvalidRequest
    ServerError

  PeerStateInitializer* = proc(peer: Peer): RootRef {.gcsafe.}
  NetworkStateInitializer* = proc(network: EthereumNode): RootRef {.gcsafe.}
  HandshakeStep* = proc(peer: Peer, conn: Connection): Future[void] {.gcsafe.}
  DisconnectionHandler* = proc(peer: Peer): Future[void] {.gcsafe.}
  ThunkProc* = LPProtoHandler
  MounterProc* = proc(network: Eth2Node) {.gcsafe.}
  MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.}

  DisconnectionReason* = enum
    ClientShutDown
    IrrelevantNetwork
    FaultOrError

  PeerDisconnected* = object of CatchableError
    reason*: DisconnectionReason

  TransmissionError* = object of CatchableError

const
  clientId* = "Nimbus beacon node v" & fullVersionStr
  networkKeyFilename = "privkey.protobuf"
  nodeMetadataFilename = "node-metadata.json"

  TCP = net.Protocol.IPPROTO_TCP
  HandshakeTimeout = FaultOrError

  # Spec constants
  # https://github.com/ethereum/eth2.0-specs/blob/dev/specs/networking/p2p-interface.md#eth-20-network-interaction-domains
  MAX_CHUNK_SIZE* = 1 * 1024 * 1024 # bytes
  GOSSIP_MAX_SIZE* = 1 * 1024 * 1024 # bytes
  TTFB_TIMEOUT* = 5.seconds
  RESP_TIMEOUT* = 10.seconds

  readTimeoutErrorMsg = "Exceeded read timeout for a request"

# Metrics for tracking attestation and beacon block loss
declareCounter gossip_messages_sent,
  "Number of gossip messages sent by this peer"

declareCounter gossip_messages_received,
  "Number of gossip messages received by this peer"

declarePublicGauge libp2p_successful_dials,
  "Number of successfully dialed peers"

declarePublicGauge libp2p_peers,
  "Number of active libp2p peers"

# Custom pragma attached to message procs; `getRequestProtoName` reads its
# two arguments to build the libp2p codec name.
template libp2pProtocol*(name: string, version: int) {.pragma.}

# A peer is printed as the id of its `PeerInfo`.
template `$`*(peer: Peer): string = id(peer.info)
chronicles.formatIt(Peer): $it

# The libp2p `PeerID` of `peer`.
template remote*(peer: Peer): untyped =
  peer.info.peerId

# Dials `peer` through the node's switch for the given protocol id.
template openStream(node: Eth2Node, peer: Peer, protocolId: string): untyped =
  dial(node.switch, peer.info, protocolId)

func peerId(conn: Connection): PeerID =
  ## The `PeerID` stored in the connection's `peerInfo`.
  # TODO: Can this be `nil`?
  conn.peerInfo.peerId

# Forward declaration; the implementation is further below.
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer {.gcsafe.}

proc getPeer*(node: Eth2Node, peerInfo: PeerInfo): Peer {.gcsafe.} =
  ## Looks `peerInfo.peerId` up in the node's peer pool. When the pool has
  ## no entry, a fresh `Peer` is created and returned (it is NOT added to
  ## the pool here).
  let peerId = peerInfo.peerId
  result = node.peerPool.getOrDefault(peerId)
  if result == nil:
    # TODO: We should register this peer in the pool!
    result = Peer.init(node, peerInfo)

proc peerFromStream(network: Eth2Node, conn: Connection): Peer {.gcsafe.} =
  ## Resolves the `Peer` on the other end of `conn`.
  # TODO: Can this be `nil`?
  return network.getPeer(conn.peerInfo)

proc getKey*(peer: Peer): PeerID {.inline.} =
  ## The peer's `PeerID`.
  result = peer.info.peerId

proc getFuture*(peer: Peer): Future[void] {.inline.} =
  ## The life-time future of the peer's `PeerInfo`.
  result = peer.info.lifeFuture()

proc `<`*(a, b: Peer): bool =
  ## Peers are ordered by their `score`.
  result = `<`(a.score, b.score)

proc getScore*(a: Peer): int =
  ## The peer's current `score`.
  result = a.score

proc disconnect*(peer: Peer, reason: DisconnectionReason,
                 notifyOtherPeer = false) {.async.} =
  ## Disconnects `peer` through the switch, releases it from the peer pool
  ## and closes its `PeerInfo`. Does nothing when the peer is already
  ## `Disconnecting` or `Disconnected`.
  ##
  ## `reason` and `notifyOtherPeer` are currently not used by the body.
  # TODO: How should we notify the other peer?
  if peer.connectionState notin {Disconnecting, Disconnected}:
    peer.connectionState = Disconnecting
    await peer.network.switch.disconnect(peer.info)
    peer.connectionState = Disconnected
    peer.network.peerPool.release(peer)
    peer.info.close()

proc safeClose(conn: Connection) {.async.} =
  ## Closes `conn` unless it is already closed.
  if not conn.closed:
    await close(conn)

# Forward declaration; the implementation is further below.
proc handleIncomingPeer*(peer: Peer)

include eth/p2p/p2p_backends_helpers
include eth/p2p/p2p_tracing

proc getRequestProtoName(fn: NimNode): NimNode =
  ## Builds the literal `"/eth2/beacon_chain/req/<name>/<version>/"` from the
  ## `libp2pProtocol` pragma attached to the proc definition `fn`.
  ## Returns an empty string literal when the pragma is absent.
  # `getCustomPragmaVal` doesn't work yet on regular nnkProcDef nodes
  # (TODO: file as an issue)

  let pragmas = fn.pragma
  if pragmas.kind == nnkPragma and pragmas.len > 0:
    for pragma in pragmas:
      if pragma.len > 0 and $pragma[0] == "libp2pProtocol":
        let protoName = $(pragma[1])
        let protoVer = $(pragma[2].intVal)
        return newLit("/eth2/beacon_chain/req/" & protoName & "/" & protoVer & "/")

  return newLit("")

# Raises `PeerDisconnected` carrying the message `msg` and the reason `r`.
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
  var e = newException(PeerDisconnected, msg)
  e.reason = r
  raise e

proc disconnectAndRaise(peer: Peer,
                        reason: DisconnectionReason,
                        msg: string) {.async.} =
  ## Disconnects `peer` and then raises `PeerDisconnected`.
  let r = reason
  await peer.disconnect(r)
  raisePeerDisconnected(msg, r)

# Forward declaration: `readMsgBytes` and `readChunk` call each other.
proc readChunk(conn: Connection,
               MsgType: type,
               withResponseCode: bool,
               deadline: Future[void]): Future[Option[MsgType]] {.gcsafe.}

proc readSizePrefix(conn: Connection,
                    deadline: Future[void]): Future[int] {.async.} =
  ## Reads a varint size prefix from `conn`, one byte at a time.
  ##
  ## Returns the decoded size, or `-1` when a byte does not arrive before
  ## `deadline` completes, when the varint overflows, or when the size
  ## exceeds `MAX_CHUNK_SIZE`.
  trace "about to read msg size prefix"
  var parser: VarintParser[uint64, ProtoBuf]
  while true:
    var nextByte: byte
    var readNextByte = conn.readExactly(addr nextByte, 1)
    await readNextByte or deadline
    if not readNextByte.finished:
      trace "size prefix byte not received in time"
      return -1
    case parser.feedByte(nextByte)
    of Done:
      let res = parser.getResult
      if res > uint64(MAX_CHUNK_SIZE):
        trace "size prefix outside of range", res
        return -1
      else:
        trace "got size prefix", res
        return int(res)
    of Overflow:
      trace "size prefix overflow"
      return -1
    of Incomplete:
      continue

proc readMsgBytes(conn: Connection,
                  withResponseCode: bool,
                  deadline: Future[void]): Future[Bytes] {.async.} =
  ## Reads one chunk from `conn` and returns its payload bytes.
  ##
  ## When `withResponseCode` is set, a one-byte response code is read first;
  ## for `InvalidRequest`/`ServerError` the error message that follows is
  ## read and logged, and no payload is returned.
  ##
  ## Every failure path (deadline reached, end of stream, invalid code,
  ## bad or zero size prefix, incomplete transport read) yields an empty
  ## sequence, so callers cannot tell these cases apart.
  trace "about to read message bytes", withResponseCode

  try:
    if withResponseCode:
      var responseCode: byte
      trace "about to read response code"
      var readResponseCode = conn.readExactly(addr responseCode, 1)
      try:
        await readResponseCode or deadline
      except LPStreamEOFError:
        trace "end of stream received"
        return

      if not readResponseCode.finished:
        trace "response code not received in time"
        return

      if responseCode > ResponseCode.high.byte:
        trace "invalid response code", responseCode
        return

      logScope:
        responseCode = ResponseCode(responseCode)
      trace "got response code"

      case ResponseCode(responseCode)
      of InvalidRequest, ServerError:
        let responseErrMsg = await conn.readChunk(string, false, deadline)
        debug "P2P request resulted in error", responseErrMsg
        return

      of Success:
        # The response is OK, the execution continues below
        discard

    var sizePrefix = await conn.readSizePrefix(deadline)
    trace "got msg size prefix", sizePrefix

    if sizePrefix == -1:
      debug "Failed to read an incoming message size prefix", peer = conn.peerId
      return

    if sizePrefix == 0:
      debug "Received SSZ with zero size", peer = conn.peerId
      return

    trace "about to read msg bytes", len = sizePrefix
    var msgBytes = newSeq[byte](sizePrefix)
    var readBody = conn.readExactly(addr msgBytes[0], sizePrefix)
    await readBody or deadline
    if not readBody.finished:
      trace "msg bytes not received in time"
      return

    trace "got message bytes", len = sizePrefix
    return msgBytes

  except TransportIncompleteError:
    return @[]

proc readChunk(conn: Connection,
               MsgType: type,
               withResponseCode: bool,
               deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
  ## Reads one chunk and SSZ-decodes it as `MsgType`.
  ## Yields `none` when no bytes were read or when decoding fails.
  var msgBytes = await conn.readMsgBytes(withResponseCode, deadline)
  try:
    if msgBytes.len > 0:
      return some SSZ.decode(msgBytes, MsgType)
  except SerializationError as err:
    debug "Failed to decode a network message",
          msgBytes, errMsg = err.formatMsg("")
    return

proc readResponse(
       conn: Connection,
       MsgType: type,
       deadline: Future[void]): Future[Option[MsgType]] {.gcsafe, async.} =
  ## Reads a complete response. For a `seq` response type, chunks are read
  ## until one cannot be obtained and the collected elements are returned
  ## (`none` when there are no elements). Otherwise a single chunk is read.

  when MsgType is seq:
    type E = ElemType(MsgType)
    var results: MsgType
    while true:
      let nextRes = await conn.readChunk(E, true, deadline)
      if nextRes.isNone: break
      results.add nextRes.get
    if results.len > 0:
      return some(results)
  else:
    return await conn.readChunk(MsgType, true, deadline)

proc encodeErrorMsg(responseCode: ResponseCode, errMsg: string): Bytes =
  ## Serializes an error response: the response code byte, the varint
  ## length of `errMsg`, then `errMsg` written as an SSZ value.
  var s = memoryOutput()
  s.append byte(responseCode)
  s.appendVarint errMsg.len
  s.appendValue SSZ, errMsg
  s.getOutput

proc sendErrorResponse(peer: Peer,
                       conn: Connection,
                       err: ref SerializationError,
                       msgName: string,
                       msgBytes: Bytes) {.async.} =
  ## Reports a request that could not be decoded: replies with
  ## `InvalidRequest` and closes `conn`.
  debug "Received an invalid request",
        peer, msgName, msgBytes, errMsg = err.formatMsg("")

  let responseBytes = encodeErrorMsg(InvalidRequest, err.formatMsg("msg"))
  await conn.write(responseBytes)
  await conn.close()

proc sendErrorResponse(peer: Peer,
                       conn: Connection,
                       responseCode: ResponseCode,
                       errMsg: string) {.async.} =
  ## Replies with `responseCode` and `errMsg`, then closes `conn`.
  debug "Error processing request", peer, responseCode, errMsg

  # Encode the code the caller asked for. The wire code used to be
  # hard-coded to `ServerError`, which made the logged `responseCode`
  # disagree with what was actually sent for any other value.
  let responseBytes = encodeErrorMsg(responseCode, errMsg)
  await conn.write(responseBytes)
  await conn.close()

proc sendNotificationMsg(peer: Peer, protocolId: string, requestBytes: Bytes) {.async} =
  ## Opens a stream to `peer` and writes a single size-prefixed message
  ## without waiting for a reply. The stream is always closed afterwards.
  ##
  ## Raises `TransmissionError` when the stream cannot be opened within
  ## `RESP_TIMEOUT`.
  var
    deadline = sleepAsync RESP_TIMEOUT
    # The encoding suffix completes the codec name.
    protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz")
    streamFut = peer.network.openStream(peer, protocolId)

  await streamFut or deadline

  if not streamFut.finished:
    streamFut.cancel()
    raise newException(TransmissionError, "Failed to open LibP2P stream")

  let stream = streamFut.read
  try:
    var s = memoryOutput()
    s.appendVarint requestBytes.len.uint64
    if peer.supportsSnappy:
      framing_format_compress(s, requestBytes)
    else:
      s.append requestBytes
    let bytes = s.getOutput
    await stream.write(bytes)
  finally:
    await safeClose(stream)

# TODO There is too much duplication in the responder functions, but
# I hope to reduce this when I increase the reliance on output streams.
proc sendResponseChunkBytes(responder: UntypedResponder, payload: Bytes) {.async.} =
  ## Writes one `Success` chunk: code byte, varint length, raw `payload`.
  var s = memoryOutput()
  s.append byte(Success)
  s.appendVarint payload.len.uint64
  s.append payload
  let bytes = s.getOutput
  await responder.stream.write(bytes)

proc sendResponseChunkObj(responder: UntypedResponder, val: auto) {.async.} =
  ## Writes one `Success` chunk holding the size-prefixed SSZ form of `val`.
  var s = memoryOutput()
  s.append byte(Success)
  s.appendValue SSZ, sizePrefixed(val)
  let bytes = s.getOutput
  await responder.stream.write(bytes)

proc sendResponseChunks[T](responder: UntypedResponder, chunks: seq[T]) {.async.} =
  ## Writes one `Success` chunk per element of `chunks`, all in one write.
  var s = memoryOutput()
  for chunk in chunks:
    s.append byte(Success)
    s.appendValue SSZ, sizePrefixed(chunk)

  let bytes = s.getOutput
  await responder.stream.write(bytes)

proc makeEth2Request(peer: Peer, protocolId: string, requestBytes: Bytes,
                     ResponseMsg: type,
                     timeout: Duration): Future[Option[ResponseMsg]] {.gcsafe, async.} =
  ## Sends a request to `peer` and reads the response as `ResponseMsg`.
  ##
  ## A single deadline of `timeout` covers opening the stream and reading
  ## the response. Yields `none` when the stream cannot be opened in time
  ## or when no response could be read. The stream is always closed.
  var
    deadline = sleepAsync timeout
    # The encoding suffix completes the codec name.
    protocolId = protocolId & (if peer.supportsSnappy: "ssz_snappy" else: "ssz")
    streamFut = peer.network.openStream(peer, protocolId)

  await streamFut or deadline

  if not streamFut.finished:
    streamFut.cancel()
    return none(ResponseMsg)

  let stream = streamFut.read
  try:
    # Send the request
    var s = memoryOutput()
    s.appendVarint requestBytes.len.uint64
    if peer.supportsSnappy:
      framing_format_compress(s, requestBytes)
    else:
      s.append requestBytes
    let bytes = s.getOutput
    await stream.write(bytes)

    # Read the response
    return await stream.readResponse(ResponseMsg, deadline)
  finally:
    await safeClose(stream)

proc init*[MsgType](T: type Responder[MsgType],
                    peer: Peer, conn: Connection): T =
  ## Wraps `peer` and `conn` into a typed responder.
  T(UntypedResponder(peer: peer, stream: conn))

# Writes `val` through the responder. For sequence-typed responses, a
# single element is sent as one chunk and a whole sequence as many chunks;
# any other value type is rejected at compile time. Non-sequence responses
# are forwarded to `send`.
template write*[M](r: var Responder[M], val: auto): auto =
  mixin send
  type Msg = M
  type MsgRec = RecType(Msg)
  when MsgRec is seq|openarray:
    type E = ElemType(MsgRec)
    when val is E:
      sendResponseChunkObj(UntypedResponder(r), val)
    elif val is MsgRec:
      sendResponseChunks(UntypedResponder(r), val)
    else:
      {.fatal: "Unepected message type".}
  else:
    send(r, val)

proc performProtocolHandshakes*(peer: Peer) {.async.} =
  ## Starts the handshake of every registered protocol that defines one
  ## (passing a `nil` connection) and waits for all of them to finish.
  var subProtocolsHandshakes = newSeqOfCap[Future[void]](allProtocols.len)
  for protocol in allProtocols:
    if protocol.handshake != nil:
      subProtocolsHandshakes.add((protocol.handshake)(peer, nil))

  await all(subProtocolsHandshakes)

# Initializing a connection currently means running the handshakes.
template initializeConnection*(peer: Peer): auto =
  performProtocolHandshakes(peer)

proc initProtocol(name: string,
                  peerInit: PeerStateInitializer,
                  networkInit: NetworkStateInitializer): ProtocolInfoObj =
  ## Creates the descriptor of a protocol with no messages registered yet.
  result.name = name
  result.messages = @[]
  result.peerStateInitializer = peerInit
  result.networkStateInitializer = networkInit

proc registerProtocol(protocol: ProtocolInfo) =
  ## Inserts `protocol` into `gProtocols` at its sorted position and
  ## renumbers the `index` field of every registered protocol.
  # TODO: This can be done at compile-time in the future
  let pos = lowerBound(gProtocols, protocol)
  gProtocols.insert(protocol, pos)
  for i in 0 ..< gProtocols.len:
    gProtocols[i].index = i

proc setEventHandlers(p: ProtocolInfo,
                      handshake: HandshakeStep,
                      disconnectHandler: DisconnectionHandler) =
  ## Stores the handshake and disconnection handlers on the protocol.
  p.handshake = handshake
  p.disconnectHandler = disconnectHandler

proc implementSendProcBody(sendProc: SendProc) =
  ## Generates the body of a send proc. Requests call `makeEth2Request`,
  ## responses call `sendResponseChunkBytes`, and every other message kind
  ## calls `sendNotificationMsg`.
  let
    msg = sendProc.msg
    UntypedResponder = bindSym "UntypedResponder"

  proc sendCallGenerator(peer, bytes: NimNode): NimNode =
    if msg.kind != msgResponse:
      let msgProto = getRequestProtoName(msg.procDef)
      case msg.kind
      of msgRequest:
        let
          timeout = msg.timeoutParam[0]
          ResponseRecord = msg.response.recName
        quote:
          makeEth2Request(`peer`, `msgProto`, `bytes`,
                          `ResponseRecord`, `timeout`)
      else:
        quote: sendNotificationMsg(`peer`, `msgProto`, `bytes`)
    else:
      quote: sendResponseChunkBytes(`UntypedResponder`(`peer`), `bytes`)

  sendProc.useStandardBody(nil, nil, sendCallGenerator)

proc handleIncomingStream(network: Eth2Node, conn: Connection, useSnappy: bool,
                          MsgType, Format: distinct type) {.async, gcsafe.} =
  ## Serves one incoming request stream: reads the request bytes,
  ## optionally un-frames snappy, decodes them with `Format` and invokes
  ## the user handler. Failures are reported to the remote side through
  ## `sendErrorResponse`. `conn` is always closed on exit.
  mixin callUserHandler, RecType
  const msgName = typetraits.name(MsgType)

  # Uncomment this to enable tracing on all incoming requests
  # You can include `msgNameLit` in the condition to select
  # more specific requests:
  # when chronicles.runtimeFilteringEnabled:
  #   setLogLevel(LogLevel.TRACE)
  #   defer: setLogLevel(LogLevel.DEBUG)
  #   trace "incoming " & `msgNameLit` & " conn"

  let peer = peerFromStream(network, conn)

  handleIncomingPeer(peer)

  try:
    let deadline = sleepAsync RESP_TIMEOUT
    var msgBytes = await readMsgBytes(conn, false, deadline)

    # `readMsgBytes` returns an empty sequence for every failure, so all of
    # them are reported with the read-timeout message.
    if msgBytes.len == 0:
      await sendErrorResponse(peer, conn, ServerError, readTimeoutErrorMsg)
      return

    if useSnappy:
      msgBytes = framingFormatUncompress(msgBytes)

    type MsgRec = RecType(MsgType)
    var msg: MsgRec
    try:
      msg = decode(Format, msgBytes, MsgRec)
    except SerializationError as err:
      await sendErrorResponse(peer, conn, err, msgName, msgBytes)
      return
    except Exception as err:
      # TODO. This is temporary code that should be removed after interop.
      # It can be enabled only in certain diagnostic builds where it should
      # re-raise the exception.
      debug "Crash during serialization", inputBytes = toHex(msgBytes), msgName
      await sendErrorResponse(peer, conn, ServerError, err.msg)
      raise err

    try:
      logReceivedMsg(peer, MsgType(msg))
      await callUserHandler(peer, conn, msg)
    except CatchableError as err:
      await sendErrorResponse(peer, conn, ServerError, err.msg)

  finally:
    await safeClose(conn)

proc handleOutgoingPeer*(peer: Peer): Future[void] {.async.} =
  ## Adds a dialed peer to the peer pool. On success a callback is attached
  ## to the peer's life-time future and the `libp2p_peers` gauge is updated.
  let network = peer.network

  proc onPeerClosed(udata: pointer) {.gcsafe.} =
    debug "Peer (outgoing) lost", peer = $peer.info
    libp2p_peers.set int64(len(network.peerPool))

  let res = await network.peerPool.addOutgoingPeer(peer)
  if res:
    debug "Peer (outgoing) has been added to PeerPool", peer = $peer.info
    peer.getFuture().addCallback(onPeerClosed)
    libp2p_peers.set int64(len(network.peerPool))

proc handleIncomingPeer*(peer: Peer) =
  ## Adds a peer that connected to us to the peer pool without waiting.
  ## On success a callback is attached to the peer's life-time future and
  ## the `libp2p_peers` gauge is updated.
  let network = peer.network

  proc onPeerClosed(udata: pointer) {.gcsafe.} =
    debug "Peer (incoming) lost", peer = $peer.info
    libp2p_peers.set int64(len(network.peerPool))

  let res = network.peerPool.addIncomingPeerNoWait(peer)
  if res:
    debug "Peer (incoming) has been added to PeerPool", peer = $peer.info
    peer.getFuture().addCallback(onPeerClosed)
    libp2p_peers.set int64(len(network.peerPool))

proc toPeerInfo*(r: enr.TypedRecord): PeerInfo =
  ## Converts a typed ENR record into a libp2p `PeerInfo`.
  ##
  ## Returns `nil` when the record has no secp256k1 key, when the key
  ## cannot be parsed, or when no TCP address can be derived from it.
  if r.secp256k1.isSome:
    var pubKey = keys.PublicKey.fromRaw(r.secp256k1.get)
    if pubKey.isErr:
      return # TODO

    let peerId = PeerID.init crypto.PublicKey(
      scheme: Secp256k1, skkey: secp.SkPublicKey(pubKey[]))
    var addresses = newSeq[MultiAddress]()

    if r.ip.isSome and r.tcp.isSome:
      let ip = IpAddress(family: IpAddressFamily.IPv4,
                         address_v4: r.ip.get)
      addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)

    if r.ip6.isSome:
      let ip = IpAddress(family: IpAddressFamily.IPv6,
                         address_v6: r.ip6.get)
      # Prefer the IPv6-specific port and fall back to the generic one.
      if r.tcp6.isSome:
        addresses.add MultiAddress.init(ip, TCP, Port r.tcp6.get)
      elif r.tcp.isSome:
        addresses.add MultiAddress.init(ip, TCP, Port r.tcp.get)
      else:
        discard

    if addresses.len > 0:
      return PeerInfo.init(peerId, addresses)

proc toPeerInfo(r: Option[enr.TypedRecord]): PeerInfo =
  ## Converts the record when present; returns `nil` otherwise.
  if r.isSome:
    return r.get.toPeerInfo

proc dialPeer*(node: Eth2Node, peerInfo: PeerInfo) {.async.} =
  ## Connects to the peer, runs the protocol handshakes, bumps the
  ## `libp2p_successful_dials` gauge and registers the peer as outgoing.
  logScope:
    peer = $peerInfo

  debug "Connecting to peer"
  await node.switch.connect(peerInfo)
  var peer = node.getPeer(peerInfo)
  peer.wasDialed = true

  debug "Initializing connection"
  await initializeConnection(peer)

  inc libp2p_successful_dials
  debug "Network handshakes completed"

  await handleOutgoingPeer(peer)

proc runDiscoveryLoop*(node: Eth2Node) {.async.} =
  ## Runs forever: once per second, while the pool holds fewer than
  ## `wantedPeers` peers, asks discovery for random nodes and dials those
  ## that the switch is not connected to yet. Failures are only logged.
  debug "Starting discovery loop"

  while true:
    let currentPeerCount = node.peerPool.len
    if currentPeerCount < node.wantedPeers:
      try:
        let discoveredPeers =
          node.discovery.randomNodes(node.wantedPeers - currentPeerCount)
        for peer in discoveredPeers:
          try:
            let peerInfo = peer.record.toTypedRecord.toPeerInfo
            if peerInfo != nil:
              if peerInfo.id notin node.switch.connections:
                debug "Discovered new peer", peer = $peer
                # TODO do this in parallel
                await node.dialPeer(peerInfo)
              else:
                peerInfo.close()
          except CatchableError as err:
            debug "Failed to connect to peer", peer = $peer, err = err.msg
      except CatchableError as err:
        debug "Failure in discovery", err = err.msg

    await sleepAsync seconds(1)

proc getPersistentNetMetadata*(conf: BeaconNodeConf): Eth2Metadata =
  ## Loads the node metadata from `node-metadata.json` in the data
  ## directory. When the file does not exist, default metadata (all
  ## attestation subnets enabled) is created, saved and returned.
  let metadataPath = conf.dataDir / nodeMetadataFilename
  if not fileExists(metadataPath):
    # Make sure the target directory exists before writing, mirroring
    # `getPersistentNetKeys`.
    createDir conf.dataDir.string
    result = Eth2Metadata()
    for i in 0 ..< ATTESTATION_SUBNET_COUNT:
      # TODO: For now, we indicate that we participate in all subnets
      result.attnets[i] = true
    Json.saveFile(metadataPath, result)
  else:
    result = Json.loadFile(metadataPath, Eth2Metadata)

proc init*(T: type Eth2Node, conf: BeaconNodeConf, enrForkId: ENRForkID,
           switch: Switch, ip: Option[IpAddress], tcpPort, udpPort: Port,
           privKey: keys.PrivateKey): T =
  ## Creates a node around `switch`: sets up the peer pool, the persistent
  ## metadata and the discovery protocol, then initializes the network
  ## state of every registered protocol and mounts its message handlers.
  new result
  result.switch = switch
  result.wantedPeers = conf.maxPeers
  result.peerPool = newPeerPool[Peer, PeerID](maxPeers = conf.maxPeers)
  result.metadata = getPersistentNetMetadata(conf)
  result.discovery = Eth2DiscoveryProtocol.new(
    conf, ip, tcpPort, udpPort, privKey.toRaw,
    {"eth2": SSZ.encode(enrForkId),
     "attnets": SSZ.encode(result.metadata.attnets)})

  newSeq result.protocolStates, allProtocols.len
  for proto in allProtocols:
    if proto.networkStateInitializer != nil:
      result.protocolStates[proto.index] = proto.networkStateInitializer(result)

    for msg in proto.messages:
      if msg.protocolMounter != nil:
        msg.protocolMounter result

# The public key derived from the discovery private key.
template publicKey*(node: Eth2Node): keys.PublicKey =
  node.discovery.privKey.toPublicKey.tryGet()

# Hands a known peer over to the discovery protocol.
template addKnownPeer*(node: Eth2Node, peer: ENode|enr.Record) =
  node.discovery.addNode peer

proc start*(node: Eth2Node) {.async.} =
  ## Opens and starts discovery, starts the switch and launches the
  ## discovery loop in the background.
  node.discovery.open()
  node.discovery.start()
  node.libp2pTransportLoops = await node.switch.start()
  traceAsyncErrors node.runDiscoveryLoop()

proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
  ## Creates a `Peer` in the `Connected` state and initializes the
  ## per-peer state of every registered protocol.
  new result
  result.info = info
  result.network = network
  result.connectionState = Connected
  result.maxInactivityAllowed = 15.minutes # TODO: Read this from the config
  newSeq result.protocolStates, allProtocols.len
  for i in 0 ..< allProtocols.len:
    let proto = allProtocols[i]
    if proto.peerStateInitializer != nil:
      result.protocolStates[i] = proto.peerStateInitializer(result)

proc registerMsg(protocol: ProtocolInfo,
                 name: string,
                 mounter: MounterProc,
                 libp2pCodecName: string) =
  ## Appends a message descriptor to the protocol's message list.
  protocol.messages.add MessageInfo(name: name,
                                    protocolMounter: mounter,
                                    libp2pCodecName: libp2pCodecName)

proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend =
  ## Builds the backend used by the `p2pProtocol` DSL to generate the
  ## libp2p-specific code (thunks, mounters and send procs) for a protocol.
  var
    Format = ident "SSZ"
    Responder = bindSym "Responder"
    Connection = bindSym "Connection"
    Peer = bindSym "Peer"
    Eth2Node = bindSym "Eth2Node"
    registerMsg = bindSym "registerMsg"
    initProtocol = bindSym "initProtocol"
    msgVar = ident "msg"
    networkVar = ident "network"
    callUserHandler = ident "callUserHandler"

  p.useRequestIds = false
  p.useSingleRecordInlining = true

  new result

  result.PeerType = Peer
  result.NetworkType = Eth2Node
  result.registerProtocol = bindSym "registerProtocol"
  result.setEventHandlers = bindSym "setEventHandlers"
  result.SerializationFormat = Format
  result.ResponderType = Responder

  result.afterProtocolInit = proc (p: P2PProtocol) =
    p.onPeerConnected.params.add newIdentDefs(streamVar, Connection)

  result.implementMsg = proc (msg: p2p_protocol_dsl.Message) =
    let
      protocol = msg.protocol
      msgName = $msg.ident
      msgNameLit = newLit msgName
      MsgRecName = msg.recName
      MsgStrongRecName = msg.strongRecName
      codecNameLit = getRequestProtoName(msg.procDef)

    if msg.procDef.body.kind != nnkEmpty and msg.kind == msgRequest:
      # Request procs need an extra param - the stream where the response
      # should be written:
      msg.userHandler.params.insert(2, newIdentDefs(streamVar, Connection))
      msg.initResponderCall.add streamVar

    #
    # Implement the Thunk:
    #
    # The protocol handlers in nim-libp2p receive only a `Connection`
    # parameter and there is no way to access the wider context (such
    # as the current `Switch`). In our handlers, we may need to list all
    # peers in the current network, so we must keep a reference to the
    # network object in the closure environment of the installed handlers.
    #
    # For this reason, we define a `protocol mounter` proc that will
    # initialize the network object by creating handlers bound to the
    # specific network.
    #
    let
      protocolMounterName = ident(msgName & "_mounter")
      userHandlerCall = msg.genUserHandlerCall(msgVar, [peerVar, streamVar])

    var mounter: NimNode
    if msg.userHandler != nil:
      protocol.outRecvProcs.add quote do:
        template `callUserHandler`(`peerVar`: `Peer`,
                                   `streamVar`: `Connection`,
                                   `msgVar`: `MsgRecName`): untyped =
          `userHandlerCall`

        proc `protocolMounterName`(`networkVar`: `Eth2Node`) =
          proc sszThunk(`streamVar`: `Connection`,
                        proto: string): Future[void] {.gcsafe.} =
            return handleIncomingStream(`networkVar`, `streamVar`, false,
                                        `MsgStrongRecName`, `Format`)

          mount `networkVar`.switch,
                LPProtocol(codec: `codecNameLit` & "ssz",
                           handler: sszThunk)

          proc snappyThunk(`streamVar`: `Connection`,
                           proto: string): Future[void] {.gcsafe.} =
            return handleIncomingStream(`networkVar`, `streamVar`, true,
                                        `MsgStrongRecName`, `Format`)

          mount `networkVar`.switch,
                LPProtocol(codec: `codecNameLit` & "ssz_snappy",
                           handler: snappyThunk)

      mounter = protocolMounterName
    else:
      mounter = newNilLit()

    #
    # Implement Senders and Handshake
    #
    if msg.kind == msgHandshake:
      macros.error "Handshake messages are not supported in LibP2P protocols"
    else:
      var sendProc = msg.createSendProc()
      implementSendProcBody sendProc

    protocol.outProcRegistrations.add(
      newCall(registerMsg,
              protocol.protocolInfoVar,
              msgNameLit,
              mounter,
              codecNameLit))

  result.implementProtocolInit = proc (p: P2PProtocol): NimNode =
    return newCall(initProtocol, newLit(p.name), p.peerInit, p.netInit)

proc setupNat(conf: BeaconNodeConf): tuple[ip: Option[IpAddress],
                                           tcpPort: Port,
                                           udpPort: Port] {.gcsafe.} =
  ## Determines the externally visible IP and ports from `conf.nat`.
  ##
  ## Accepted values are `any`, `none`, `upnp`, `pmp` (matched
  ## case-insensitively) and `extip:<address>`. Any other value logs an
  ## error and terminates the process.
  # defaults
  result.tcpPort = conf.tcpPort
  result.udpPort = conf.udpPort

  var nat: NatStrategy
  case conf.nat.toLowerAscii:
    of "any":
      nat = NatAny
    of "none":
      nat = NatNone
    of "upnp":
      nat = NatUpnp
    of "pmp":
      nat = NatPmp
    else:
      if conf.nat.startsWith("extip:") and isIpAddress(conf.nat[6..^1]):
        # any required port redirection is assumed to be done by hand
        result.ip = some(parseIpAddress(conf.nat[6..^1]))
        nat = NatNone
      else:
        error "not a valid NAT mechanism, nor a valid IP address", value = conf.nat
        quit(QuitFailure)

  if nat != NatNone:
    result.ip = getExternalIP(nat)
    if result.ip.isSome:
      # TODO redirectPorts is considered a gcsafety violation
      # because it obtains the address of a non-gcsafe proc?
      let extPorts = ({.gcsafe.}:
        redirectPorts(tcpPort = result.tcpPort,
                      udpPort = result.udpPort,
                      description = clientId))
      if extPorts.isSome:
        (result.tcpPort, result.udpPort) = extPorts.get()

func asLibp2pKey*(key: keys.PublicKey): PublicKey =
  ## Wraps an eth `keys.PublicKey` into a libp2p secp256k1 public key.
  PublicKey(scheme: Secp256k1, skkey: secp.SkPublicKey(key))

func asEthKey*(key: PrivateKey): keys.PrivateKey =
  ## Converts a libp2p private key into an eth `keys.PrivateKey`.
  keys.PrivateKey(key.skkey)

proc initAddress*(T: type MultiAddress, str: string): T =
  ## Parses `str` as a bootstrap node multi-address.
  ## Raises `MultiAddressError` unless it matches `IPFS` and partially
  ## matches `multiaddress.TCP`.
  let address = MultiAddress.init(str)
  if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
    result = address
  else:
    raise newException(MultiAddressError,
                       "Invalid bootstrap node multi-address")

# Builds a TCP multi-address from an IP address and a port.
template tcpEndPoint(address, port): auto =
  MultiAddress.init(address, Protocol.IPPROTO_TCP, port)

proc getPersistentNetKeys*(conf: BeaconNodeConf): KeyPair =
  ## Loads the network private key from `privkey.protobuf` in the data
  ## directory. When the file does not exist, a random secp256k1 key is
  ## generated and stored there first.
  let privKeyPath = conf.dataDir / networkKeyFilename
  var privKey: PrivateKey
  if not fileExists(privKeyPath):
    createDir conf.dataDir.string
    privKey = PrivateKey.random(Secp256k1)
    writeFile(privKeyPath, privKey.getBytes())
  else:
    let keyBytes = readFile(privKeyPath)
    privKey = PrivateKey.init(keyBytes.toOpenArrayByte(0, keyBytes.high))

  KeyPair(seckey: privKey, pubkey: privKey.getKey())

proc createEth2Node*(conf: BeaconNodeConf, enrForkId: ENRForkID): Future[Eth2Node] {.async, gcsafe.} =
  ## Sets up NAT, loads the persistent network keys, creates the libp2p
  ## switch and wraps everything into a new `Eth2Node`.
  var
    (extIp, extTcpPort, extUdpPort) = setupNat(conf)
    hostAddress = tcpEndPoint(conf.libp2pAddress, conf.tcpPort)
    announcedAddresses = if extIp.isNone(): @[]
                         else: @[tcpEndPoint(extIp.get(), extTcpPort)]

  info "Initializing networking", hostAddress,
                                  announcedAddresses

  let keys = conf.getPersistentNetKeys
  # TODO nim-libp2p still doesn't have support for announcing addresses
  # that are different from the host address (this is relevant when we
  # are running behind a NAT).
  var switch = newStandardSwitch(some keys.seckey, hostAddress,
                                 triggerSelf = true, gossip = true)
  result = Eth2Node.init(conf, enrForkId, switch,
                         extIp, extTcpPort, extUdpPort,
                         keys.seckey.asEthKey)

proc getPersistenBootstrapAddr*(conf: BeaconNodeConf,
                                ip: IpAddress, port: Port): enr.Record =
  ## Builds an ENR for this node from its persistent key; `port` is passed
  ## for both port arguments of `enr.Record.init`.
  let pair = getPersistentNetKeys(conf)
  return enr.Record.init(1'u64, # sequence number
                         pair.seckey.asEthKey,
                         some(ip), port, port, @[])

proc announcedENR*(node: Eth2Node): enr.Record =
  ## The ENR of the local discovery node.
  doAssert node.discovery != nil, "The Eth2Node must be initialized"
  node.discovery.localNode.record

proc shortForm*(id: KeyPair): string =
  ## The `PeerID` derived from the public key, as a string.
  $PeerID.init(id.pubkey)

proc toPeerInfo(enode: ENode): PeerInfo =
  ## Converts an `ENode` into a libp2p `PeerInfo` with a single address.
  let
    peerId = PeerID.init enode.pubkey.asLibp2pKey
    addresses = @[MultiAddress.init enode.toMultiAddressStr]
  return PeerInfo.init(peerId, addresses)

proc connectToNetwork*(node: Eth2Node) {.async.} =
  ## Starts the node. The bootstrap connectivity check defined below is
  ## currently disabled (see the TODO at the end).
  await node.start()

  proc checkIfConnectedToBootstrapNode {.async.} =
    await sleepAsync(30.seconds)
    if node.discovery.bootstrapRecords.len > 0 and libp2p_successful_dials.value == 0:
      fatal "Failed to connect to any bootstrap node. Quitting",
        bootstrapEnrs = node.discovery.bootstrapRecords
      quit 1

  # TODO: The initial sync forces this to time out.
  #       Revisit when the new Sync manager is integrated.
  # traceAsyncErrors checkIfConnectedToBootstrapNode()

func peersCount*(node: Eth2Node): int =
  ## Number of peers currently held by the peer pool.
  len(node.peerPool)

proc subscribe*[MsgType](node: Eth2Node,
                         topic: string,
                         msgHandler: proc(msg: MsgType) {.gcsafe.},
                         msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) {.async, gcsafe.} =
  ## Registers `msgValidator` as the validator of `topic` and subscribes
  ## `msgHandler` to it. Incoming payloads are SSZ-decoded as `MsgType`
  ## before being passed on.
  template execMsgHandler(peerExpr, gossipBytes, gossipTopic) =
    inc gossip_messages_received
    trace "Incoming pubsub message received",
      peer = peerExpr, len = gossipBytes.len, topic = gossipTopic,
      message_id = `$`(sha256.digest(gossipBytes))
    msgHandler SSZ.decode(gossipBytes, MsgType)

  # All message types which are subscribed to should be validated; putting
  # this in subscribe(...) ensures that the default approach is correct.
  template execMsgValidator(gossipBytes, gossipTopic): bool =
    trace "Incoming pubsub message received for validation",
      len = gossipBytes.len, topic = gossipTopic,
      message_id = `$`(sha256.digest(gossipBytes))
    msgValidator SSZ.decode(gossipBytes, MsgType)

  # Validate messages as soon as subscribed
  let incomingMsgValidator = proc(topic: string,
                                  message: messages.Message):
                                  Future[bool] {.async, gcsafe.} =
    return execMsgValidator(message.data, topic)

  node.switch.addValidator(topic, incomingMsgValidator)

  let incomingMsgHandler = proc(topic: string,
                                data: seq[byte]) {.async, gcsafe.} =
    execMsgHandler "unknown", data, topic

  await node.switch.subscribe(topic, incomingMsgHandler)

proc traceMessage(fut: FutureBase, digest: MDigest[256]) =
  ## Logs the message id once `fut` completes without failure.
  fut.addCallback do (arg: pointer):
    if not(fut.failed):
      trace "Outgoing pubsub message has been sent", message_id = `$`(digest)

proc broadcast*(node: Eth2Node, topic: string, msg: auto) =
  ## SSZ-encodes `msg` and publishes it on `topic`. The publish future is
  ## not awaited; its errors are reported through `traceAsyncErrors`.
  inc gossip_messages_sent
  let broadcastBytes = SSZ.encode(msg)
  var fut = node.switch.publish(topic, broadcastBytes)
  traceMessage(fut, sha256.digest(broadcastBytes))
  traceAsyncErrors(fut)

# TODO:
# At the moment, this is just a compatibility shim for the existing RLPx functionality.
# The filtering is not implemented properly yet.
iterator randomPeers*(node: Eth2Node, maxPeers: int, Protocol: type): Peer =
  ## Yields up to `maxPeers` peers in random order. `Protocol` is unused.
  # NOTE(review): `Eth2Node` as declared in this file has no `peers` field
  # (only `peerPool`). Unless one of the included files provides
  # `peers(node)`, this generic iterator will presumably fail to compile
  # once it is instantiated - confirm and port it to the peer pool.
  var peers = newSeq[Peer]()
  for _, peer in pairs(node.peers): peers.add peer
  shuffle peers
  if peers.len > maxPeers: peers.setLen(maxPeers)
  for p in peers: yield p