# nim-eth
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
#   * MIT license (license terms in the root directory or at
#     https://opensource.org/licenses/MIT).
#   * Apache v2 license (license terms in the root directory or at
#     https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.

## This module implements the `RLPx` Transport Protocol defined at
## `RLPx <https://github.com/ethereum/devp2p/blob/master/rlpx.md>`_
## in its EIP-8 version.
##
## This module implements version 5 of the p2p protocol as defined by EIP-706 -
## earlier versions are not supported.
##
## Both the message ID and the request/response ID are now unsigned. This goes
## along with the RLPx specs (see above) and the sub-protocol specs at
## `sub-proto <https://github.com/ethereum/devp2p/blob/master/caps/eth.md>`_ plus the
## fact that RLP is defined for non-negative integers smaller than 2^64 only at
## `Yellow Paper <https://ethereum.github.io/yellowpaper/paper.pdf>`_,
## Appx B, clauses (195) ff and (199).
##

{.push raises: [].}

import
  std/[algorithm, deques, options, os, sequtils, strutils, typetraits],
  stew/byteutils,
  stew/shims/macros,
  chronicles,
  chronos,
  metrics,
  snappy,
  ../rlp,
  ./private/p2p_types,
  ./[kademlia, auth, rlpxcrypt, enode, p2p_protocol_dsl]

const
  devp2pSnappyVersion* = 5
    ## EIP-706 version of devp2p, with snappy compression - no support offered
    ## for earlier versions
  maxMsgSize = 1024 * 1024 * 16
    ## The maximum message size is normally limited by the 24-bit length field in
    ## the message header but in the case of snappy, we need to protect against
    ## decompression bombs:
    ## https://eips.ethereum.org/EIPS/eip-706#avoiding-dos-attacks
  connectionTimeout = 10.seconds

  msgIdHello = byte 0
  msgIdDisconnect = byte 1
  msgIdPing = byte 2
  msgIdPong = byte 3

# TODO: chronicles re-export here is added for the error
# "undeclared identifier: 'activeChroniclesStream'", when the code using p2p
# does not import chronicles. Need to resolve this properly.
export options, p2pProtocol, rlp, chronicles, metrics

declarePublicGauge rlpx_connected_peers, "Number of connected peers in the pool"

declarePublicCounter rlpx_connect_success, "Number of successful rlpx connects"

declarePublicCounter rlpx_connect_failure,
  "Number of rlpx connects that failed", labels = ["reason"]

declarePublicCounter rlpx_accept_success, "Number of successful rlpx accepted peers"

declarePublicCounter rlpx_accept_failure,
  "Number of rlpx accept attempts that failed", labels = ["reason"]

logScope:
  topics = "eth p2p rlpx"

type
  ResponderWithId*[MsgType] = object
    peer*: Peer
    reqId*: uint64

  ResponderWithoutId*[MsgType] = distinct Peer

  # We need these two types in rlpx/devp2p as no parameters or single parameters
  # are not getting encoded in an rlp list.
  # TODO: we could generalize this in the protocol dsl but it would need an
  #       `alwaysList` flag as not every protocol expects lists in these cases.
  EmptyList = object

  DisconnectionReasonList = object
    value: DisconnectionReason

proc read(
    rlp: var Rlp, T: type DisconnectionReasonList
): T {.gcsafe, raises: [RlpError].} =
  ## Rlp mixin: `DisconnectionReasonList` parser
  if rlp.isList:
    # Be strict here: The expression `rlp.read(DisconnectionReasonList)`
    # accepts lists with at least one item. The array expression wants
    # exactly one item.
    if rlp.rawData.len < 3:
      # avoids looping through all items when parsing for an overlarge array
      return DisconnectionReasonList(value: rlp.read(array[1, DisconnectionReason])[0])

  # Also accepted: a single byte reason code. It is typically used
  # by variants of the reference implementation `Geth`
  elif rlp.blobLen <= 1:
    return DisconnectionReasonList(value: rlp.read(DisconnectionReason))

  # Also accepted: a blob containing a list (aka object) with the reason code.
  # It is used by `bor`, a `geth` fork
  elif rlp.blobLen < 4:
    var subList = rlp.toBytes.rlpFromBytes
    if subList.isList:
      # Ditto, see above.
      return
        DisconnectionReasonList(value: subList.read(array[1, DisconnectionReason])[0])

  raise newException(RlpTypeMismatch, "Single entry list expected")
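
# Illustrative sketch (not part of the module): the accepted encodings decode
# to the same reason. `TooManyPeers` has wire value 0x04, and `0xc1, 0x04` is
# the RLP encoding of the list `[0x04]`:
#
#   var asList = rlpFromBytes(@[0xc1'u8, 0x04]) # canonical single-item list
#   var asByte = rlpFromBytes(@[0x04'u8]) # bare reason byte (geth variants)
#   doAssert asList.read(DisconnectionReasonList).value ==
#     asByte.read(DisconnectionReasonList).value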

include p2p_tracing

when tracingEnabled:
  import eth/common/eth_types_json_serialization

  export
    # XXX: This is a work-around for a Nim issue.
    # See a more detailed comment in p2p_tracing.nim
    init,
    writeValue,
    getOutput

proc init*[MsgName](T: type ResponderWithId[MsgName], peer: Peer, reqId: uint64): T =
  T(peer: peer, reqId: reqId)

proc init*[MsgName](T: type ResponderWithoutId[MsgName], peer: Peer): T =
  T(peer)

chronicles.formatIt(Peer):
  $(it.remote)

chronicles.formatIt(Opt[uint64]):
  (if it.isSome(): $it.value else: "-1")

include p2p_backends_helpers

proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} =
  var f = Future[Option[MsgType]](future)
  if not f.finished:
    if msg != nil:
      f.complete some(cast[ptr MsgType](msg)[])
    else:
      f.complete none(MsgType)

proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) =
  sendFut.addCallback do(arg: pointer):
    # Avoiding potentially double future completions
    if not resFut.finished:
      if sendFut.failed:
        resFut.fail(sendFut.error)

proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} =
  result = ""
  # TODO: uncommenting the line below increases the compile-time
  # tremendously (for reasons not yet known)
  # result = $(cast[ptr MsgType](msg)[])

proc disconnect*(
  peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false
) {.async: (raises: []).}

# TODO Rework the disconnect-and-raise flow to not do both raising
# and disconnection - this results in convoluted control flow and redundant
# disconnect calls
template raisePeerDisconnected(msg: string, r: DisconnectionReason) =
  var e = newException(PeerDisconnected, msg)
  e.reason = r
  raise e

proc disconnectAndRaise(
    peer: Peer, reason: DisconnectionReason, msg: string
) {.async: (raises: [PeerDisconnected]).} =
  if reason == BreachOfProtocol:
    warn "TODO Raising protocol breach",
      remote = peer.remote, clientId = peer.clientId, msg

  await peer.disconnect(reason)
  raisePeerDisconnected(msg, reason)

proc handshakeImpl[T](
    peer: Peer,
    sendFut: Future[void],
    responseFut: auto, # Future[T].Raising([CancelledError, EthP2PError]),
    timeout: Duration,
): Future[T] {.async: (raises: [CancelledError, EthP2PError]).} =
  sendFut.addCallback do(arg: pointer) {.gcsafe.}:
    if sendFut.failed:
      debug "Handshake message not delivered", peer

  doAssert timeout.milliseconds > 0

  try:
    let res = await responseFut.wait(timeout)
    return res
  except AsyncTimeoutError:
    # TODO: We really shouldn't disconnect and raise everywhere - it obscures
    #       which error occurred where.
    #       Also, incoming and outgoing disconnect errors should be separated,
    #       probably by separating the actual disconnect call to begin with.
    await disconnectAndRaise(peer, TcpError, T.name() & " was not received in time.")
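
# Illustrative sketch (hypothetical values, not generated verbatim by the
# DSL): `handshakeImpl` couples an outgoing send with an awaited response
# under a single timeout, roughly as in the p2p hello exchange:
#
#   let response = await handshakeImpl[DevP2P.hello](
#     peer, sendHelloFut, peer.nextMsg(DevP2P.hello), 10.seconds)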

# Dispatcher
#

proc describeProtocols(d: Dispatcher): string =
  d.activeProtocols.mapIt($it.capability).join(",")

proc numProtocols(d: Dispatcher): int =
  d.activeProtocols.len

proc getDispatcher(
    node: EthereumNode, otherPeerCapabilities: openArray[Capability]
): Opt[Dispatcher] =
  let dispatcher = Dispatcher()
  newSeq(dispatcher.protocolOffsets, protocolCount())
  dispatcher.protocolOffsets.fill Opt.none(uint64)

  var nextUserMsgId = 0x10u64

  for localProtocol in node.protocols:
    let idx = localProtocol.index
    block findMatchingProtocol:
      for remoteCapability in otherPeerCapabilities:
        if localProtocol.capability == remoteCapability:
          dispatcher.protocolOffsets[idx] = Opt.some(nextUserMsgId)
          nextUserMsgId += localProtocol.messages.len.uint64
          break findMatchingProtocol

  template copyTo(src, dest; index: int) =
    for i in 0 ..< src.len:
      dest[index + i] = src[i]

  dispatcher.messages = newSeq[MessageInfo](nextUserMsgId)
  devp2pInfo.messages.copyTo(dispatcher.messages, 0)

  for localProtocol in node.protocols:
    let idx = localProtocol.index
    if dispatcher.protocolOffsets[idx].isSome:
      dispatcher.activeProtocols.add localProtocol
      localProtocol.messages.copyTo(
        dispatcher.messages, dispatcher.protocolOffsets[idx].value.int
      )

  if dispatcher.numProtocols == 0:
    Opt.none(Dispatcher)
  else:
    Opt.some(dispatcher)
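
# Illustrative arithmetic (hypothetical capabilities): user-level message IDs
# start at 0x10, and each matched protocol claims a contiguous block. If the
# first matched protocol defines 17 messages, it occupies wire IDs 0x10..0x20,
# the next matched protocol starts at 0x21, and that protocol's local message
# 0x00 travels on the wire as 0x21.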

proc getMsgName*(peer: Peer, msgId: uint64): string =
  if not peer.dispatcher.isNil and msgId < peer.dispatcher.messages.len.uint64 and
      not peer.dispatcher.messages[msgId].isNil:
    return peer.dispatcher.messages[msgId].name
  else:
    return
      case msgId
      of msgIdHello: "hello"
      of msgIdDisconnect: "disconnect"
      of msgIdPing: "ping"
      of msgIdPong: "pong"
      else: $msgId

# Protocol info objects
#

proc initProtocol(
    name: string,
    version: uint64,
    peerInit: PeerStateInitializer,
    networkInit: NetworkStateInitializer,
): ProtocolInfo =
  ProtocolInfo(
    capability: Capability(name: name, version: version),
    messages: @[],
    peerStateInitializer: peerInit,
    networkStateInitializer: networkInit,
  )

proc setEventHandlers(
    p: ProtocolInfo,
    onPeerConnected: OnPeerConnectedHandler,
    onPeerDisconnected: OnPeerDisconnectedHandler,
) =
  p.onPeerConnected = onPeerConnected
  p.onPeerDisconnected = onPeerDisconnected

proc cmp*(lhs, rhs: ProtocolInfo): int =
  let c = cmp(lhs.capability.name, rhs.capability.name)
  if c == 0:
    # Highest version first!
    -cmp(lhs.capability.version, rhs.capability.version)
  else:
    c

proc nextMsgResolver[MsgType](
    msgData: Rlp, future: FutureBase
) {.gcsafe, raises: [RlpError].} =
  var reader = msgData
  Future[MsgType](future).complete reader.readRecordType(
    MsgType, MsgType.rlpFieldsCount > 1
  )

proc failResolver[MsgType](reason: DisconnectionReason, future: FutureBase) =
  Future[MsgType](future).fail(
    (ref PeerDisconnected)(msg: "Peer disconnected during handshake", reason: reason),
    warn = false,
  )

proc registerMsg(
    protocol: ProtocolInfo,
    msgId: uint64,
    name: string,
    thunk: ThunkProc,
    printer: MessageContentPrinter,
    requestResolver: RequestResolver,
    nextMsgResolver: NextMsgResolver,
    failResolver: FailResolver,
) =
  if protocol.messages.len.uint64 <= msgId:
    protocol.messages.setLen(msgId + 1)
  protocol.messages[msgId] = MessageInfo(
    id: msgId,
    name: name,
    thunk: thunk,
    printer: printer,
    requestResolver: requestResolver,
    nextMsgResolver: nextMsgResolver,
    failResolver: failResolver,
  )

# Message composition and encryption
#

proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: uint64): uint64 =
  result = msgId
  if not peer.dispatcher.isNil:
    result += peer.dispatcher.protocolOffsets[proto.index].value

template getPeer(peer: Peer): auto =
  peer

template getPeer(responder: ResponderWithId): auto =
  responder.peer

template getPeer(responder: ResponderWithoutId): auto =
  Peer(responder)

proc supports*(peer: Peer, proto: ProtocolInfo): bool =
  peer.dispatcher.protocolOffsets[proto.index].isSome

proc supports*(peer: Peer, Protocol: type): bool =
  ## Checks whether a Peer supports a particular protocol
  peer.supports(Protocol.protocolInfo)

template perPeerMsgId(peer: Peer, MsgType: type): uint64 =
  perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId)

proc invokeThunk*(
    peer: Peer, msgId: uint64, msgData: Rlp
): Future[void] {.async: (raises: [CancelledError, EthP2PError]).} =
  template invalidIdError(): untyped =
    raise newException(
      UnsupportedMessageError,
      "RLPx message with an invalid id " & $msgId & " on a connection supporting " &
        peer.dispatcher.describeProtocols,
    )

  if msgId >= peer.dispatcher.messages.len.uint64 or
      peer.dispatcher.messages[msgId].isNil:
    invalidIdError()
  let msgInfo = peer.dispatcher.messages[msgId]

  doAssert peer.dispatcher.messages.len == peer.awaitedMessages.len,
    "Should have been set up in peer constructor"

  # Check if the peer is "expecting" this message as part of a handshake
  if peer.awaitedMessages[msgId] != nil:
    let awaited = move(peer.awaitedMessages[msgId])
    peer.awaitedMessages[msgId] = nil

    try:
      msgInfo.nextMsgResolver(msgData, awaited)
    except rlp.RlpError:
      await peer.disconnectAndRaise(
        BreachOfProtocol, "Could not decode rlp for " & $msgId
      )
  else:
    await msgInfo.thunk(peer, msgData)

template compressMsg(peer: Peer, data: seq[byte]): seq[byte] =
  if peer.snappyEnabled:
    snappy.encode(data)
  else:
    data
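
# Illustrative framing note: only the payload is compressed - the RLP-encoded
# message ID prefix travels uncompressed. E.g. a ping (wire ID 0x02, payload
# `0xc0`) would be framed roughly as `rlp.encodeInt(2).data & snappy.encode(@[0xc0'u8])`
# when snappy support has been negotiated.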

proc recvMsg(
    peer: Peer
): Future[tuple[msgId: uint64, msgRlp: Rlp]] {.
    async: (raises: [CancelledError, EthP2PError])
.} =
  var msgBody: seq[byte]
  try:
    msgBody = await peer.transport.recvMsg()

    trace "Received message",
      remote = peer.remote,
      clientId = peer.clientId,
      data = toHex(msgBody.toOpenArray(0, min(255, msgBody.high)))

    # TODO we _really_ need an rlp decoder that doesn't require this many
    #      copies of each message...
    var tmp = rlpFromBytes(msgBody)
    let msgId = tmp.read(uint64)

    if peer.snappyEnabled and tmp.hasData():
      let decoded =
        snappy.decode(msgBody.toOpenArray(tmp.position, msgBody.high), maxMsgSize)
      if decoded.len == 0:
        if msgId == 0x01 and msgBody.len > 1 and msgBody.len < 16 and
            msgBody[1] == 0xc1:
          # Nethermind sends its TooManyPeers uncompressed but we want to be nice!
          # https://github.com/NethermindEth/nethermind/issues/7726
          debug "Trying to decode disconnect uncompressed",
            remote = peer.remote, clientId = peer.clientId, data = toHex(msgBody)
        else:
          await peer.disconnectAndRaise(
            BreachOfProtocol, "Could not decompress snappy data"
          )
      else:
        trace "Decoded message",
          remote = peer.remote,
          clientId = peer.clientId,
          decoded = toHex(decoded.toOpenArray(0, min(255, decoded.high)))
        tmp = rlpFromBytes(decoded)

    return (msgId, tmp)
  except TransportError as exc:
    await peer.disconnectAndRaise(TcpError, exc.msg)
  except RlpxTransportError as exc:
    await peer.disconnectAndRaise(BreachOfProtocol, exc.msg)
  except RlpError as exc:
    # TODO remove this warning before using in production
    warn "TODO: RLP decoding failed for msgId",
      remote = peer.remote,
      clientId = peer.clientId,
      err = exc.msg,
      rawData = toHex(msgBody)

    await peer.disconnectAndRaise(BreachOfProtocol, "Could not decode msgId")

proc encodeMsg(msg: auto): seq[byte] =
  var rlpWriter = initRlpWriter()
  rlpWriter.appendRecordType(msg, typeof(msg).rlpFieldsCount > 1)
  rlpWriter.finish

proc sendMsg(
    peer: Peer, msgId: uint64, payload: seq[byte]
): Future[void] {.async: (raises: [CancelledError, EthP2PError]).} =
  try:
    let
      msgIdBytes = rlp.encodeInt(msgId)
      payloadBytes = peer.compressMsg(payload)

    var msg = newSeqOfCap[byte](msgIdBytes.data.len + payloadBytes.len)
    msg.add msgIdBytes.data()
    msg.add payloadBytes

    trace "Sending message",
      remote = peer.remote,
      clientId = peer.clientId,
      msgId,
      data = toHex(msg.toOpenArray(0, min(255, msg.high))),
      payload = toHex(payload.toOpenArray(0, min(255, payload.high)))

    await peer.transport.sendMsg(msg)
  except TransportError as exc:
    await peer.disconnectAndRaise(TcpError, exc.msg)
  except RlpxTransportError as exc:
    await peer.disconnectAndRaise(BreachOfProtocol, exc.msg)

proc send*[Msg](
    peer: Peer, msg: Msg
): Future[void] {.async: (raises: [CancelledError, EthP2PError], raw: true).} =
  logSentMsg(peer, msg)

  peer.sendMsg perPeerMsgId(peer, Msg), encodeMsg(msg)

proc registerRequest(
    peer: Peer, timeout: Duration, responseFuture: FutureBase, responseMsgId: uint64
): uint64 =
  result =
    if peer.lastReqId.isNone:
      0u64
    else:
      peer.lastReqId.value + 1u64
  peer.lastReqId = Opt.some(result)

  let timeoutAt = Moment.fromNow(timeout)
  let req = OutstandingRequest(id: result, future: responseFuture)
  peer.outstandingRequests[responseMsgId].addLast req

  doAssert(not peer.dispatcher.isNil)
  let requestResolver = peer.dispatcher.messages[responseMsgId].requestResolver
  proc timeoutExpired(udata: pointer) {.gcsafe.} =
    requestResolver(nil, responseFuture)

  discard setTimer(timeoutAt, timeoutExpired, nil)
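
# Illustrative timeline (hypothetical request types): request IDs come from a
# single per-peer counter shared by all message types, and the timer resolves
# an unanswered future with `nil`, which surfaces as `none(MsgType)`:
#
#   reqId 0 -> request A (times out -> future completes with none)
#   reqId 1 -> request B (response carrying reqId 1 resolves the future)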

proc resolveResponseFuture(peer: Peer, msgId: uint64, msg: pointer) =
  ## This function is a split off from the previously combined version with
  ## the same name using optional request ID arguments. This here is the
  ## version without a request ID (there is the other part below).
  ##
  ## Optional arguments for macro helpers seem easier to handle with
  ## polymorphic functions (than an `Opt[]` prototype argument).
  let msgInfo = peer.dispatcher.messages[msgId]
  logScope:
    msg = msgInfo.name
    msgContents = msgInfo.printer(msg)
    receivedReqId = -1
    remotePeer = peer.remote

  template outstandingReqs(): auto =
    peer.outstandingRequests[msgId]

  block: # no request ID
    # XXX: This is a response from an ETH-like protocol that doesn't feature
    # request IDs. Handling the response is quite tricky here because this may
    # be a late response to an already timed out request or a valid response
    # from a more recent one.
    #
    # We can increase the robustness by recording enough features of the
    # request so we can recognize the matching response, but this is not very
    # easy to do because our peers are allowed to send partial responses.
    #
    # A more generally robust approach is to maintain a set of the wanted
    # data items and then to periodically look for items that have been
    # requested a long time ago, but are still missing. New requests can be
    # issued for such items, potentially from another random peer.
    var expiredRequests = 0
    for req in outstandingReqs:
      if not req.future.finished:
        break
      inc expiredRequests
    outstandingReqs.shrink(fromFirst = expiredRequests)
    if outstandingReqs.len > 0:
      let oldestReq = outstandingReqs.popFirst
      msgInfo.requestResolver(msg, oldestReq.future)
    else:
      trace "late or dup RLPx reply ignored", msgId

proc resolveResponseFuture(peer: Peer, msgId: uint64, msg: pointer, reqId: uint64) =
  ## Variant of `resolveResponseFuture()` for a request ID argument.
  let msgInfo = peer.dispatcher.messages[msgId]
  logScope:
    msg = msgInfo.name
    msgContents = msgInfo.printer(msg)
    receivedReqId = reqId
    remotePeer = peer.remote

  template outstandingReqs(): auto =
    peer.outstandingRequests[msgId]

  block: # have request ID
    # TODO: This is not completely sound because we are still using a global
    # `reqId` sequence (the problem is that we might get a response ID that
    # matches a request ID for a different type of request). To make the code
    # correct, we can use a separate sequence per response type, but we have
    # to first verify that the other Ethereum clients are supporting this
    # correctly (because then, we'll be reusing the same reqIds for different
    # types of requests). Alternatively, we can assign a separate interval in
    # the `reqId` space for each type of response.
    if peer.lastReqId.isNone or reqId > peer.lastReqId.value:
      debug "RLPx response without matching request", msgId, reqId
      return

    var idx = 0
    while idx < outstandingReqs.len:
      template req(): auto =
        outstandingReqs()[idx]

      if req.future.finished:
        # Here we'll remove the expired request by swapping
        # it with the last one in the deque (if necessary):
        if idx != outstandingReqs.len - 1:
          req = outstandingReqs.popLast
          continue
        else:
          outstandingReqs.shrink(fromLast = 1)
          # This was the last item, so we don't have any
          # more work to do:
          return

      if req.id == reqId:
        msgInfo.requestResolver msg, req.future
        # Here we'll remove the found request by swapping
        # it with the last one in the deque (if necessary):
        if idx != outstandingReqs.len - 1:
          req = outstandingReqs.popLast
        else:
          outstandingReqs.shrink(fromLast = 1)
        return

      inc idx

    trace "late or dup RLPx reply ignored"

proc checkedRlpRead(
    peer: Peer, r: var Rlp, MsgType: type
): auto {.raises: [RlpError].} =
  when defined(release):
    return r.read(MsgType)
  else:
    try:
      return r.read(MsgType)
    except rlp.RlpError as e:
      debug "Failed rlp.read",
        peer = peer, dataType = MsgType.name, err = e.msg, errName = e.name
        #, rlpData = r.inspect -- don't use (might crash)

      raise e

proc nextMsg*(
    peer: Peer, MsgType: type
): Future[MsgType] {.async: (raises: [CancelledError, EthP2PError], raw: true).} =
  ## This proc awaits a specific RLPx message.
  ## Any messages received while waiting will be dispatched to their
  ## respective handlers. The designated message handler will also run
  ## to completion before the future returned by `nextMsg` is resolved.
  let wantedId = peer.perPeerMsgId(MsgType)
  let f = peer.awaitedMessages[wantedId]
  if not f.isNil:
    return Future[MsgType].Raising([CancelledError, EthP2PError])(f)

  initFuture result
  peer.awaitedMessages[wantedId] = result
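
# Illustrative usage: awaiting one specific message, e.g. the status exchange
# of a hypothetical subprotocol `abc`, while other traffic keeps being
# dispatched to its regular handlers:
#
#   let theirStatus = await peer.nextMsg(abc.status)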

proc dispatchMessages*(peer: Peer) {.async: (raises: []).} =
  try:
    while peer.connectionState notin {Disconnecting, Disconnected}:
      var (msgId, msgData) = await peer.recvMsg()
      await peer.invokeThunk(msgId, msgData)
  except EthP2PError:
    # TODO Is this needed? Most such exceptions are raised with an accompanying
    #      disconnect already .. ClientQuitting isn't a great error but as good
    #      as any since it will have no effect if the disconnect already happened
    await peer.disconnect(ClientQuitting)
  except CancelledError:
    await peer.disconnect(ClientQuitting)

proc p2pProtocolBackendImpl*(protocol: P2PProtocol): Backend =
  let
    resultIdent = ident "result"
    Peer = bindSym "Peer"
    EthereumNode = bindSym "EthereumNode"
    initRlpWriter = bindSym "initRlpWriter"
    append = bindSym("append", brForceOpen)
    read = bindSym("read", brForceOpen)
    checkedRlpRead = bindSym "checkedRlpRead"
    startList = bindSym "startList"
    tryEnterList = bindSym "tryEnterList"
    finish = bindSym "finish"
    messagePrinter = bindSym "messagePrinter"
    nextMsgResolver = bindSym "nextMsgResolver"
    failResolver = bindSym "failResolver"
    registerRequest = bindSym "registerRequest"
    requestResolver = bindSym "requestResolver"
    resolveResponseFuture = bindSym "resolveResponseFuture"
    sendMsg = bindSym "sendMsg"
    nextMsg = bindSym "nextMsg"
    initProtocol = bindSym "initProtocol"
    registerMsg = bindSym "registerMsg"
    perPeerMsgId = bindSym "perPeerMsgId"
    perPeerMsgIdImpl = bindSym "perPeerMsgIdImpl"
    linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"
    handshakeImpl = bindSym "handshakeImpl"
    ResponderWithId = bindSym "ResponderWithId"
    ResponderWithoutId = bindSym "ResponderWithoutId"
    isSubprotocol = protocol.rlpxName != "p2p"

  if protocol.rlpxName.len == 0:
    protocol.rlpxName = protocol.name
  # By convention, all Ethereum protocol names have at least 3 characters.
  doAssert protocol.rlpxName.len >= 3

  new result

  result.registerProtocol = bindSym "registerProtocol"
  result.setEventHandlers = bindSym "setEventHandlers"
  result.PeerType = Peer
  result.NetworkType = EthereumNode
  result.ResponderType =
    if protocol.useRequestIds:
      ResponderWithId
    else:
      ResponderWithoutId

  result.implementMsg = proc(msg: Message) =
    var
      msgIdValue = msg.id
      msgIdent = msg.ident
      msgName = $msgIdent
      msgRecName = msg.recName
      responseMsgId =
        if msg.response.isNil:
          Opt.none(uint64)
        else:
          Opt.some(msg.response.id)
      hasReqId = msg.hasReqId
      protocol = msg.protocol

      # variables used in the sending procs
      peerOrResponder = ident"peerOrResponder"
      rlpWriter = ident"writer"
      perPeerMsgIdVar = ident"perPeerMsgId"

      # variables used in the receiving procs
      receivedRlp = ident"rlp"
      receivedMsg = ident"msg"

    var
      readParams = newNimNode(nnkStmtList)
      paramsToWrite = newSeq[NimNode](0)
      appendParams = newNimNode(nnkStmtList)

    if hasReqId:
      # Messages using request Ids
      readParams.add quote do:
        let `reqIdVar` = `read`(`receivedRlp`, uint64)

    case msg.kind
    of msgRequest:
      doAssert responseMsgId.isSome

      let reqToResponseOffset = responseMsgId.value - msgIdValue
      let responseMsgId = quote:
        `perPeerMsgIdVar` + `reqToResponseOffset`

      # Each request is registered so we can resolve it when the response
      # arrives. There are two types of protocols: newer protocols use
      # explicit `reqId` sent over the wire, while old versions of the ETH wire
      # protocol assume response order matches requests.
      let registerRequestCall =
        newCall(registerRequest, peerVar, timeoutVar, resultIdent, responseMsgId)
      if hasReqId:
        appendParams.add quote do:
          initFuture `resultIdent`
          let `reqIdVar` = `registerRequestCall`

        paramsToWrite.add reqIdVar
      else:
        appendParams.add quote do:
          initFuture `resultIdent`
          discard `registerRequestCall`
    of msgResponse:
      if hasReqId:
        paramsToWrite.add newDotExpr(peerOrResponder, reqIdVar)
    of msgHandshake, msgNotification:
      discard

    for param, paramType in msg.procDef.typedParams(skip = 1):
      # This is a fragment of the sending proc that
      # serializes each of the passed parameters:
      paramsToWrite.add param

      # The received RLP data is deserialized to a local variable of
      # the message-specific type. This is done field by field here:
      readParams.add quote do:
        `receivedMsg`.`param` = `checkedRlpRead`(`peerVar`, `receivedRlp`, `paramType`)

    let
      paramCount = paramsToWrite.len
      readParamsPrelude =
        if paramCount > 1:
          newCall(tryEnterList, receivedRlp)
        else:
          newStmtList()

    when tracingEnabled:
      readParams.add newCall(bindSym"logReceivedMsg", peerVar, receivedMsg)

    let callResolvedResponseFuture =
      if msg.kind != msgResponse:
        newStmtList()
      elif hasReqId:
        newCall(
          resolveResponseFuture,
          peerVar,
          newCall(perPeerMsgId, peerVar, msgRecName),
          newCall("addr", receivedMsg),
          reqIdVar,
        )
      else:
        newCall(
          resolveResponseFuture,
          peerVar,
          newCall(perPeerMsgId, peerVar, msgRecName),
          newCall("addr", receivedMsg),
        )

    var userHandlerParams = @[peerVar]
    if hasReqId:
      userHandlerParams.add reqIdVar

    let
      awaitUserHandler = msg.genAwaitUserHandler(receivedMsg, userHandlerParams)
      thunkName = ident(msgName & "Thunk")

    msg.defineThunk quote do:
      proc `thunkName`(
          `peerVar`: `Peer`, data: Rlp
      ) {.async: (raises: [CancelledError, EthP2PError]).} =
        var `receivedRlp` = data
        var `receivedMsg`: `msgRecName`
        try:
          `readParamsPrelude`
          `readParams`
          `awaitUserHandler`
          `callResolvedResponseFuture`
        except rlp.RlpError as exc:
          # TODO this is a pre-release warning - we should turn this into an
          #      actual BreachOfProtocol disconnect down the line
          warn "TODO: RLP decoding failed for incoming message",
            msg = name(`msgRecName`),
            remote = `peerVar`.remote,
            clientId = `peerVar`.clientId,
            err = exc.msg

          await `peerVar`.disconnectAndRaise(
            BreachOfProtocol, "Invalid RLP in parameter list for " & $(`msgRecName`)
          )

    var sendProc = msg.createSendProc(isRawSender = (msg.kind == msgHandshake))
    sendProc.def.params[1][0] = peerOrResponder

    let
      msgBytes = ident"msgBytes"
      finalizeRequest = quote:
        let `msgBytes` = `finish`(`rlpWriter`)

      perPeerMsgIdValue =
        if isSubprotocol:
          newCall(perPeerMsgIdImpl, peerVar, protocol.protocolInfo, newLit(msgIdValue))
        else:
          newLit(msgIdValue)

    var sendCall = newCall(sendMsg, peerVar, perPeerMsgIdVar, msgBytes)

    let senderEpilogue =
      if msg.kind == msgRequest:
        # In RLPx requests, the returned future was allocated here and passed
        # to `registerRequest`. It's already assigned to the result variable
        # of the proc, so we just wait for the sending operation to complete
        # and we return in a normal way. (the waiting is done, so we can catch
        # any possible errors).
        quote:
          `linkSendFailureToReqFuture`(`sendCall`, `resultIdent`)
      else:
        # In normal RLPx messages, we are returning the future returned by the
        # `sendMsg` call.
        quote:
          return `sendCall`

    if paramCount > 1:
      # In case there are more than 1 parameter,
      # the params must be wrapped in a list:
      appendParams =
        newStmtList(newCall(startList, rlpWriter, newLit(paramCount)), appendParams)

    for param in paramsToWrite:
      appendParams.add newCall(append, rlpWriter, param)

    let initWriter = quote:
      var `rlpWriter` = `initRlpWriter`()
      let `perPeerMsgIdVar` = `perPeerMsgIdValue`

    when tracingEnabled:
      appendParams.add logSentMsgFields(peerVar, protocol, msgId, paramsToWrite)

    # let paramCountNode = newLit(paramCount)
    sendProc.setBody quote do:
      let `peerVar` = getPeer(`peerOrResponder`)
      `initWriter`
      `appendParams`
      `finalizeRequest`
      `senderEpilogue`

    if msg.kind == msgHandshake:
      discard msg.createHandshakeTemplate(sendProc.def.name, handshakeImpl, nextMsg)

    protocol.outProcRegistrations.add(
      newCall(
        registerMsg,
        protocolVar,
        newLit(msgIdValue),
        newLit(msgName),
        thunkName,
        newTree(nnkBracketExpr, messagePrinter, msgRecName),
        newTree(nnkBracketExpr, requestResolver, msgRecName),
        newTree(nnkBracketExpr, nextMsgResolver, msgRecName),
        newTree(nnkBracketExpr, failResolver, msgRecName),
      )
    )

  result.implementProtocolInit = proc(protocol: P2PProtocol): NimNode =
    return newCall(
      initProtocol,
      newLit(protocol.rlpxName),
      newLit(protocol.version),
      protocol.peerInit,
      protocol.netInit,
    )

p2pProtocol DevP2P(version = devp2pSnappyVersion, rlpxName = "p2p"):
  proc hello(
      peer: Peer,
      version: uint64,
      clientId: string,
      capabilities: seq[Capability],
      listenPort: uint,
      nodeId: array[RawPublicKeySize, byte],
  ) =
    # The first hello message gets processed during the initial handshake - this
    # version is used for any subsequent messages

    # TODO investigate and turn warning into protocol breach
    warn "TODO Multiple hello messages received",
      remote = peer.remote, clientId = clientId
    # await peer.disconnectAndRaise(BreachOfProtocol, "Multiple hello messages")

  proc sendDisconnectMsg(peer: Peer, reason: DisconnectionReasonList) =
    ## Notify the other peer that we're about to disconnect them for the given
    ## reason
    if reason.value == BreachOfProtocol:
      # TODO This is a temporary log message at warning level to aid in
      #      debugging in pre-release versions - it should be removed before
      #      release
      # TODO Nethermind sends BreachOfProtocol on network id mismatch:
      #      https://github.com/NethermindEth/nethermind/issues/7727
      if not peer.clientId.startsWith("Nethermind"):
        warn "TODO Peer sent BreachOfProtocol error!",
          remote = peer.remote, clientId = peer.clientId
    else:
      trace "disconnect message received", reason = reason.value, peer

    await peer.disconnect(reason.value, false)

  # Adding an empty RLP list as the spec defines.
  # The parity client specifically checks if there is rlp data.
  proc ping(peer: Peer, emptyList: EmptyList) =
    discard peer.pong(EmptyList())

  proc pong(peer: Peer, emptyList: EmptyList) =
    discard
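
# Illustrative sketch (hypothetical protocol, not part of this module): user
# protocols are declared with the same DSL. Each proc body becomes the receive
# handler, and a same-named send proc is generated, as with `ping` and `pong`
# above:
#
#   p2pProtocol Echo(version = 1, rlpxName = "ech"):
#     proc echoReq(peer: Peer, data: string) =
#       discard peer.echoReply(data)
#
#     proc echoReply(peer: Peer, data: string) =
#       discard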

proc removePeer(network: EthereumNode, peer: Peer) =
  # It is necessary to check if peer.remote still exists. The connection might
  # have been dropped already from the peers side.
  # E.g. when receiving a p2p.disconnect message from a peer, a race will happen
  # between which side disconnects first.
  if network.peerPool != nil and not peer.remote.isNil and
      peer.remote in network.peerPool.connectedNodes:
    network.peerPool.connectedNodes.del(peer.remote)
    rlpx_connected_peers.dec()

    # Note: we need to do this check as disconnect (and thus removePeer)
    # currently can get called before the dispatcher is initialized.
    if not peer.dispatcher.isNil:
      for observer in network.peerPool.observers.values:
        if not observer.onPeerDisconnected.isNil:
          if observer.protocol.isNil or peer.supports(observer.protocol):
            observer.onPeerDisconnected(peer)

proc callDisconnectHandlers(
    peer: Peer, reason: DisconnectionReason
): Future[void] {.async: (raises: []).} =
  let futures = peer.dispatcher.activeProtocols
    .filterIt(it.onPeerDisconnected != nil)
    .mapIt(it.onPeerDisconnected(peer, reason))

  await noCancel allFutures(futures)

proc disconnect*(
    peer: Peer, reason: DisconnectionReason, notifyOtherPeer = false
) {.async: (raises: []).} =
  if reason == BreachOfProtocol:
    # TODO remove warning after all protocol breaches have been investigated
    # TODO https://github.com/NethermindEth/nethermind/issues/7727
    if not peer.clientId.startsWith("Nethermind"):
      warn "TODO disconnecting peer because of protocol breach",
        remote = peer.remote, clientId = peer.clientId
  if peer.connectionState notin {Disconnecting, Disconnected}:
    if peer.connectionState == Connected:
      # Only log peers that successfully completed the full connection setup -
      # the others should have been logged already
      debug "Peer disconnected", remote = peer.remote, clientId = peer.clientId, reason

    peer.connectionState = Disconnecting
    # Do this first so sub-protocols have time to clean up and stop sending
    # before this node closes transport to remote peer
    if not peer.dispatcher.isNil:
      # Notify all pending handshake handlers that a disconnection happened
      for msgId, fut in peer.awaitedMessages.mpairs:
        if fut != nil:
          var tmp = fut
          fut = nil
          peer.dispatcher.messages[msgId].failResolver(reason, tmp)

      for msgId, reqs in peer.outstandingRequests.mpairs():
        while reqs.len > 0:
          let req = reqs.popFirst()
          # Same as when they timeout
          peer.dispatcher.messages[msgId].requestResolver(nil, req.future)

      # In case of `CatchableError` in any of the handlers, this will be logged.
      # Other handlers will still execute.
      # In case of `Defect` in any of the handlers, program will quit.
      await callDisconnectHandlers(peer, reason)

    if notifyOtherPeer and not peer.transport.closed:
      proc waitAndClose(
          transport: RlpxTransport, time: Duration
      ) {.async: (raises: []).} =
        await noCancel sleepAsync(time)
        await noCancel peer.transport.closeWait()

      try:
        await peer.sendDisconnectMsg(DisconnectionReasonList(value: reason))
      except CatchableError as e:
        trace "Failed to deliver disconnect message",
          peer, err = e.msg, errName = e.name

      # Give the peer a chance to disconnect
      asyncSpawn peer.transport.waitAndClose(2.seconds)
    elif not peer.transport.closed:
      peer.transport.close()

    logDisconnectedPeer peer
    peer.connectionState = Disconnected
    removePeer(peer.network, peer)
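
# Illustrative usage: politely dropping a peer when e.g. the pool is full,
# giving the remote a chance to read the reason before the transport closes:
#
#   await peer.disconnect(TooManyPeers, notifyOtherPeer = true)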

proc initPeerState*(
    peer: Peer, capabilities: openArray[Capability]
) {.raises: [UselessPeerError].} =
  peer.dispatcher = getDispatcher(peer.network, capabilities).valueOr:
    raise (ref UselessPeerError)(
      msg: "No capabilities in common: " & capabilities.mapIt($it).join(",")
    )

  # The dispatcher has determined our message ID sequence.
  # For each message ID, we allocate a potential slot for
  # tracking responses to requests.
  # (yes, some of the slots won't be used).
  peer.outstandingRequests.newSeq(peer.dispatcher.messages.len)
  for d in mitems(peer.outstandingRequests):
    d = initDeque[OutstandingRequest]()

  # Similarly, we need a bit of book-keeping data to keep track
  # of the potentially concurrent calls to `nextMsg`.
  peer.awaitedMessages.newSeq(peer.dispatcher.messages.len)
  peer.lastReqId = Opt.some(0u64)
  peer.initProtocolStates peer.dispatcher.activeProtocols

proc postHelloSteps(
    peer: Peer, h: DevP2P.hello
) {.async: (raises: [CancelledError, EthP2PError]).} =
  peer.clientId = h.clientId
  initPeerState(peer, h.capabilities)

  # Please note that the ordering of operations here is important!
  #
  # We must first start all handshake procedures and give them a
  # chance to send any initial packages they might require over
  # the network and to yield on their `nextMsg` waits.
  #
  let handshakes = peer.dispatcher.activeProtocols
    .filterIt(it.onPeerConnected != nil)
    .mapIt(it.onPeerConnected(peer))

  # The `dispatchMessages` loop must be started after this.
  # Otherwise, we risk that some of the handshake packets sent by
  # the other peer may arrive too early and be processed before
  # the handshake code got a chance to wait for them.
  #
  let messageProcessingLoop = peer.dispatchMessages()

  # The handshake may involve multiple async steps, so we wait
  # here for all of them to finish.
  #
  await allFutures(handshakes)

  for handshake in handshakes:
    if not handshake.completed():
      await handshake # raises correct error without actually waiting

  # This is needed as a peer might have already disconnected. In this case
  # we need to raise so that rlpxConnect/rlpxAccept fails.
  # Disconnect is done only to run the disconnect handlers. TODO: improve this
  # also TODO: Should we discern the type of error?
  if messageProcessingLoop.finished:
    await peer.disconnectAndRaise(
      ClientQuitting, "messageProcessingLoop ended while connecting"
    )
  peer.connectionState = Connected

template setSnappySupport(peer: Peer, hello: DevP2P.hello) =
  peer.snappyEnabled = hello.version >= devp2pSnappyVersion.uint64

type RlpxError* = enum
  TransportConnectError
  RlpxHandshakeTransportError
  RlpxHandshakeError
  ProtocolError
  P2PHandshakeError
  P2PTransportError
  InvalidIdentityError
  UselessRlpxPeerError
  PeerDisconnectedError
  TooManyPeersError

proc helloHandshake(
    node: EthereumNode, peer: Peer
): Future[DevP2P.hello] {.async: (raises: [CancelledError, EthP2PError]).} =
  ## Negotiate common capabilities using the p2p `hello` message
  # https://github.com/ethereum/devp2p/blob/5713591d0366da78a913a811c7502d9ca91d29a8/rlpx.md#hello-0x00
  await peer.send(
    DevP2P.hello(
      version: devp2pSnappyVersion,
      clientId: node.clientId,
      capabilities: node.capabilities,
      listenPort: 0, # obsolete
      nodeId: node.keys.pubkey.toRaw(),
    )
  )

  # The first message received must be a hello or a disconnect
  var (msgId, msgData) = await peer.recvMsg()
  try:
    case msgId
    of msgIdHello:
      # Implementations must ignore any additional list elements in Hello
      # because they may be used by a future version.
      let response = msgData.read(DevP2P.hello)
      trace "Received Hello", version = response.version, id = response.clientId

      if response.nodeId != peer.transport.pubkey.toRaw:
        await peer.disconnectAndRaise(
          BreachOfProtocol, "nodeId in hello does not match RLPx transport identity"
        )

      return response
    of msgIdDisconnect: # Disconnection requested by peer
      # TODO distinguish their reason from ours
      let reason = msgData.read(DisconnectionReasonList).value
      await peer.disconnectAndRaise(
        reason, "Peer disconnecting during hello: " & $reason
      )
    else:
      # No other messages may be sent until a Hello is received.
      await peer.disconnectAndRaise(BreachOfProtocol, "Expected hello, got " & $msgId)
  except RlpError:
    await peer.disconnectAndRaise(BreachOfProtocol, "Could not decode hello RLP")

proc rlpxConnect*(
    node: EthereumNode, remote: Node
): Future[Result[Peer, RlpxError]] {.async: (raises: [CancelledError]).} =
  # TODO move logging elsewhere - the aim is to have exactly _one_ debug log per
  #      connection attempt (success or failure) to not spam the logs
  initTracing(devp2pInfo, node.protocols)
  logScope:
    remote
  trace "Connecting to peer"

  let
    peer = Peer(remote: remote, network: node)
    deadline = sleepAsync(connectionTimeout)

  var error = true

  defer:
    deadline.cancelSoon() # Harmless if finished

    if error: # TODO: Not sure if I like this much
      if peer.transport != nil:
        peer.transport.close()

  peer.transport =
    try:
      let ta = initTAddress(remote.node.address.ip, remote.node.address.tcpPort)
      await RlpxTransport.connect(node.rng, node.keys, ta, remote.node.pubkey).wait(
        deadline
      )
    except AsyncTimeoutError:
      debug "Connect timeout"
      return err(TransportConnectError)
    except RlpxTransportError as exc:
      debug "Connect RlpxTransport error", err = exc.msg
      return err(ProtocolError)
    except TransportError as exc:
      debug "Connect transport error", err = exc.msg
      return err(TransportConnectError)

  logConnectedPeer peer

  # RLPx p2p capability handshake: After the initial handshake, both sides of
  # the connection must send either Hello or a Disconnect message.
  let response =
    try:
      await node.helloHandshake(peer).wait(deadline)
    except AsyncTimeoutError:
      debug "Connect handshake timeout"
      return err(P2PHandshakeError)
    except PeerDisconnected as exc:
      debug "Connect handshake disconnection", err = exc.msg, reason = exc.reason
      case exc.reason
      of TooManyPeers:
        return err(TooManyPeersError)
      else:
        return err(PeerDisconnectedError)
    except UselessPeerError as exc:
      debug "Useless peer during handshake", err = exc.msg
      return err(UselessRlpxPeerError)
    except EthP2PError as exc:
      debug "Connect handshake error", err = exc.msg
      return err(PeerDisconnectedError)

  if response.version < devp2pSnappyVersion:
    await peer.disconnect(IncompatibleProtocolVersion, notifyOtherPeer = true)
    debug "Peer using obsolete devp2p version",
      version = response.version, clientId = response.clientId
    return err(UselessRlpxPeerError)

  peer.setSnappySupport(response)

  logScope:
    clientId = response.clientId

  trace "DevP2P handshake completed"

  try:
    await postHelloSteps(peer, response)
  except PeerDisconnected as exc:
    debug "Disconnect finishing hello",
      remote, clientId = response.clientId, err = exc.msg, reason = exc.reason
    case exc.reason
    of TooManyPeers:
      return err(TooManyPeersError)
    else:
      return err(PeerDisconnectedError)
  except UselessPeerError as exc:
    debug "Useless peer finishing hello", err = exc.msg
    return err(UselessRlpxPeerError)
  except EthP2PError as exc:
    debug "P2P error finishing hello", err = exc.msg
    return err(ProtocolError)

  debug "Peer connected", capabilities = response.capabilities

  error = false

  return ok(peer)

# TODO: rework rlpxAccept similar to rlpxConnect.
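
# Illustrative caller (hypothetical `remoteNode`): connection failures are
# surfaced as `Result` values rather than exceptions:
#
#   let res = await node.rlpxConnect(remoteNode)
#   if res.isErr:
#     debug "Connect failed", reason = res.error
#   else:
#     let peer = res.get()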

proc rlpxAccept*(
    node: EthereumNode, stream: StreamTransport
): Future[Peer] {.async: (raises: [CancelledError, EthP2PError]).} =
  # TODO move logging elsewhere - the aim is to have exactly _one_ debug log per
  #      connection attempt (success or failure) to not spam the logs
  initTracing(devp2pInfo, node.protocols)

  let
    peer = Peer(network: node)
    deadline = sleepAsync(connectionTimeout)

  var error = true
  defer:
    deadline.cancelSoon()

    if error:
      stream.close()

  let remoteAddress =
    try:
      stream.remoteAddress()
    except TransportError as exc:
      debug "Could not get remote address", err = exc.msg
      return nil

  trace "Incoming connection", remoteAddress = $remoteAddress

  peer.transport =
    try:
      await RlpxTransport.accept(node.rng, node.keys, stream).wait(deadline)
    except AsyncTimeoutError:
      debug "Accept timeout", remoteAddress = $remoteAddress
      rlpx_accept_failure.inc(labelValues = ["timeout"])
      return nil
    except RlpxTransportError as exc:
      debug "Accept RlpxTransport error", remoteAddress = $remoteAddress, err = exc.msg
      rlpx_accept_failure.inc(labelValues = [$BreachOfProtocol])
      return nil
    except TransportError as exc:
      debug "Accept transport error", remoteAddress = $remoteAddress, err = exc.msg
      rlpx_accept_failure.inc(labelValues = [$TcpError])
      return nil

  let
    # The ports in this address are not necessarily the ports that the peer is
    # actually listening on, so we cannot use this information to connect to
    # the peer in the future!
    ip =
      try:
        remoteAddress.address
      except ValueError:
        raiseAssert "only tcp sockets supported"
    address = Address(ip: ip, tcpPort: remoteAddress.port, udpPort: remoteAddress.port)

  peer.remote = newNode(ENode(pubkey: peer.transport.pubkey, address: address))

  logAcceptedPeer peer
  logScope:
    remote = peer.remote

  let response =
    try:
      await node.helloHandshake(peer).wait(deadline)
    except AsyncTimeoutError:
      debug "Accept handshake timeout"
      rlpx_accept_failure.inc(labelValues = ["timeout"])
      return nil
    except PeerDisconnected as exc:
      debug "Accept handshake disconnection", err = exc.msg, reason = exc.reason
      rlpx_accept_failure.inc(labelValues = [$exc.reason])
      return nil
    except EthP2PError as exc:
      debug "Accept handshake error", err = exc.msg
      rlpx_accept_failure.inc(labelValues = ["error"])
      return nil

  if response.version < devp2pSnappyVersion:
    await peer.disconnect(IncompatibleProtocolVersion, notifyOtherPeer = true)
    debug "Peer using obsolete devp2p version",
      version = response.version, clientId = response.clientId
    rlpx_accept_failure.inc(labelValues = [$IncompatibleProtocolVersion])
    return nil

  peer.setSnappySupport(response)

  logScope:
    clientId = response.clientId

  trace "DevP2P handshake completed", response

  # In case there is an outgoing connection started with this peer we give
  # precedence to that one and we disconnect here with `AlreadyConnected`
  if peer.remote in node.peerPool.connectedNodes or
      peer.remote in node.peerPool.connectingNodes:
    trace "Duplicate connection in rlpxAccept"
    rlpx_accept_failure.inc(labelValues = [$AlreadyConnected])
    return nil

  node.peerPool.connectingNodes.incl(peer.remote)

  try:
    await postHelloSteps(peer, response)
  except PeerDisconnected as exc:
    debug "Disconnect while accepting", reason = exc.reason, err = exc.msg
    rlpx_accept_failure.inc(labelValues = [$exc.reason])
    return nil
  except UselessPeerError as exc:
    debug "Useless peer while accepting", err = exc.msg
    rlpx_accept_failure.inc(labelValues = [$UselessPeer])
    return nil
  except EthP2PError as exc:
    trace "P2P error during accept", err = exc.msg
    rlpx_accept_failure.inc(labelValues = [$exc.name])
    return nil
accepted", capabilities = response.capabilities error = false rlpx_accept_success.inc() return peer