From eca93509b4be27dbd1578a71508a48ad9f7c8f9e Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Sat, 10 Nov 2018 02:18:00 +0200 Subject: [PATCH] Support for creating JSON dumps of all P2P network traffic Enable by compiling with -d:p2pdump. A chronicles log file named p2p_messages.json will be created in the working directory. This file will be consumed by the upcoming Chronicles Tail GUI (more details will be provided on the wiki of this repo). Other changes: * Removes the use of package_visible_types (only partially so far) * Simplifies the new Snappy code a little bit --- eth_p2p.nim | 4 +- eth_p2p.nimble | 4 +- eth_p2p/p2p_tracing.nim | 86 ++++++++ eth_p2p/private/types.nim | 274 +++++++++++++----------- eth_p2p/rlpx.nim | 167 ++++++++------- eth_p2p/rlpx_protocols/eth_protocol.nim | 4 +- 6 files changed, 327 insertions(+), 212 deletions(-) create mode 100644 eth_p2p/p2p_tracing.nim diff --git a/eth_p2p.nim b/eth_p2p.nim index b816b46..caba69a 100644 --- a/eth_p2p.nim +++ b/eth_p2p.nim @@ -15,10 +15,8 @@ import eth_p2p/[kademlia, discovery, enode, peer_pool, rlpx], eth_p2p/private/types -types.forwardPublicTypes - export - rlpx, enode, kademlia + types, rlpx, enode, kademlia proc addCapability*(n: var EthereumNode, p: ProtocolInfo) = assert n.connectionState == ConnectionState.None diff --git a/eth_p2p.nimble b/eth_p2p.nimble index 301abb5..9cfb8d1 100644 --- a/eth_p2p.nimble +++ b/eth_p2p.nimble @@ -18,8 +18,10 @@ requires "nim > 0.18.0", "chronicles", "asyncdispatch2", "eth_common", + "snappy", "package_visible_types", - "snappy" + "serialization", + "json_serialization" proc runTest(name: string, defs = "", lang = "c") = exec "nim " & lang & " " & defs & " -d:testing --experimental:ForLoopMacros -r tests/" & name diff --git a/eth_p2p/p2p_tracing.nim b/eth_p2p/p2p_tracing.nim new file mode 100644 index 0000000..efd564f --- /dev/null +++ b/eth_p2p/p2p_tracing.nim @@ -0,0 +1,86 @@ +import + macros, + chronicles, serialization, serialization/streams, json_serialization/writer, + private/types + +export + # XXX: Nim visibility rules get in the way here. + # It would be nice if the users of this module don't have to + # import json_serializer, but this won't work at the moment, + # because the `encode` call inside `logMsgEvent` has its symbols + # mixed in from the module where `logMsgEvent` is called + # (instead of from this module, which will be more logical). + init, writeValue, getOutput + # TODO: File this as an issue + +const tracingEnabled* = defined(p2pdump) + +when tracingEnabled: + logStream p2pMessages[json[file(p2p_messages.json,truncate)]] + + proc logMsgEventImpl(eventName: static[string], + peer: Peer, + protocol: ProtocolInfo, + msgId: int, + json: string) = + # this is kept as a separate proc to reduce the code bloat + p2pMessages.log LogLevel.NONE, eventName, port = int(peer.network.address.tcpPort), + peer = $peer.remote, + protocol = protocol.name, + msgId, data = JsonString(json) + + proc logMsgEvent[Msg](eventName: static[string], peer: Peer, msg: Msg) = + mixin msgProtocol, protocolInfo, msgId + + logMsgEventImpl(eventName, peer, + Msg.msgProtocol.protocolInfo, + Msg.msgId, + StringJsonWriter.encode(msg)) + + proc logSentMsgFields*(peer: NimNode, + protocolInfo: NimNode, + msgId: int, + fields: openarray[NimNode]): NimNode = + ## This generates the tracing code inserted in the message sending procs + ## `fields` contains all the params that were serialized in the message + var tracer = ident("tracer") + + result = quote do: + var `tracer` = init StringJsonWriter + beginRecord(`tracer`) + + for f in fields: + result.add newCall(bindSym"writeField", tracer, newLit($f), f) + + result.add quote do: + endRecord(`tracer`) + logMsgEventImpl("outgoing_msg", `peer`, `protocolInfo`, `msgId`, getOutput(`tracer`)) + + template logSentMsg*(peer: Peer, msg: auto) = + logMsgEvent("outgoing_msg", peer, msg) + + template logReceivedMsg*(peer: Peer, msg: auto) = + logMsgEvent("incoming_msg", peer, msg) + + template logConnectedPeer*(peer: Peer) = + p2pMessages.log LogLevel.NONE, "peer_connected", + port = int(peer.network.address.tcpPort), + peer = $peer.remote + + template logAcceptedPeer*(peer: Peer) = + p2pMessages.log LogLevel.NONE, "peer_accepted", + port = int(peer.network.address.tcpPort), + peer = $peer.remote + + template logDisconnectedPeer*(peer: Peer) = + p2pMessages.log LogLevel.NONE, "peer_disconnected", + port = int(peer.network.address.tcpPort), + peer = $peer.remote + +else: + template logSentMsg*(peer: Peer, msg: auto) = discard + template logReceivedMsg*(peer: Peer, msg: auto) = discard + template logConnectedPeer*(peer: Peer) = discard + template logAcceptedPeer*(peer: Peer) = discard + template logDisconnectedPeer*(peer: Peer) = discard + diff --git a/eth_p2p/private/types.nim b/eth_p2p/private/types.nim index 3452b25..fa193ef 100644 --- a/eth_p2p/private/types.nim +++ b/eth_p2p/private/types.nim @@ -4,151 +4,167 @@ import rlp, asyncdispatch2, eth_common/eth_types, eth_keys, ../enode, ../kademlia, ../discovery, ../options, ../rlpxcrypt -const useSnappy* = defined(useSnappy) +const + useSnappy* = defined(useSnappy) -packageTypes: - type - EthereumNode* = ref object - networkId*: uint - chain*: AbstractChainDB - clientId*: string - connectionState*: ConnectionState - keys*: KeyPair - address*: Address - rlpxCapabilities: seq[Capability] - rlpxProtocols: seq[ProtocolInfo] - listeningServer: StreamServer - protocolStates: seq[RootRef] - discovery: DiscoveryProtocol - peerPool*: PeerPool - when useSnappy: - protocolVersion: uint +type + EthereumNode* = ref object + networkId*: uint + chain*: AbstractChainDB + clientId*: string + connectionState*: ConnectionState + keys*: KeyPair + address*: Address + peerPool*: PeerPool - Peer* = ref object - transport: StreamTransport - dispatcher: Dispatcher - lastReqId*: int - network*: EthereumNode - secretsState: SecretState - connectionState: ConnectionState - remote*: Node - protocolStates: seq[RootRef] - outstandingRequests: seq[Deque[OutstandingRequest]] - awaitedMessages: seq[FutureBase] - when useSnappy: - snappyEnabled: bool + # Private fields: + rlpxCapabilities*: seq[Capability] + rlpxProtocols*: seq[ProtocolInfo] + listeningServer*: StreamServer + protocolStates*: seq[RootRef] + discovery*: DiscoveryProtocol + when useSnappy: + protocolVersion*: uint - OutstandingRequest = object - id: int - future: FutureBase - timeoutAt: uint64 + Peer* = ref object + remote*: Node + network*: EthereumNode - PeerPool* = ref object - network: EthereumNode - keyPair: KeyPair - networkId: uint - minPeers: int - clientId: string - discovery: DiscoveryProtocol - lastLookupTime: float - connectedNodes: Table[Node, Peer] - connectingNodes: HashSet[Node] - running: bool - listenPort*: Port - observers: Table[int, PeerObserver] + # Private fields: + transport*: StreamTransport + dispatcher*: Dispatcher + lastReqId*: int + secretsState*: SecretState + connectionState*: ConnectionState + protocolStates*: seq[RootRef] + outstandingRequests*: seq[Deque[OutstandingRequest]] + awaitedMessages*: seq[FutureBase] + when useSnappy: + snappyEnabled*: bool - MessageInfo* = object - id*: int - name*: string - thunk*: MessageHandler - printer*: MessageContentPrinter - requestResolver: RequestResolver - nextMsgResolver: NextMsgResolver + PeerPool* = ref object + # Private fields: + network*: EthereumNode + keyPair*: KeyPair + networkId*: uint + minPeers*: int + clientId*: string + discovery*: DiscoveryProtocol + lastLookupTime*: float + connectedNodes*: Table[Node, Peer] + connectingNodes*: HashSet[Node] + running*: bool + listenPort*: Port + observers*: Table[int, PeerObserver] - CapabilityName* = array[3, char] + PeerObserver* = object + onPeerConnected*: proc(p: Peer) + onPeerDisconnected*: proc(p: Peer) - Capability* = object - name*: CapabilityName - version*: int + Capability* = object + name*: string + version*: int - ProtocolInfo* = ref object - name*: CapabilityName - version*: int - messages*: seq[MessageInfo] - index: int # the position of the protocol in the - # ordered list of supported protocols - peerStateInitializer: PeerStateInitializer - networkStateInitializer: NetworkStateInitializer - handshake: HandshakeStep - disconnectHandler: DisconnectionHandler + UnsupportedProtocol* = object of Exception + # This is raised when you attempt to send a message from a particular + # protocol to a peer that doesn't support the protocol. - Dispatcher = ref object - # The dispatcher stores the mapping of negotiated message IDs between - # two connected peers. The dispatcher objects are shared between - # connections running with the same set of supported protocols. - # - # `protocolOffsets` will hold one slot of each locally supported - # protocol. If the other peer also supports the protocol, the stored - # offset indicates the numeric value of the first message of the protocol - # (for this particular connection). If the other peer doesn't support the - # particular protocol, the stored offset is -1. - # - # `messages` holds a mapping from valid message IDs to their handler procs. - # - protocolOffsets: seq[int] - messages: seq[ptr MessageInfo] - activeProtocols: seq[ProtocolInfo] + MalformedMessageError* = object of Exception - PeerObserver* = object - onPeerConnected*: proc(p: Peer) - onPeerDisconnected*: proc(p: Peer) + PeerDisconnected* = object of Exception + reason*: DisconnectionReason - MessageHandlerDecorator = proc(msgId: int, n: NimNode): NimNode - MessageHandler = proc(x: Peer, msgId: int, data: Rlp): Future[void] - MessageContentPrinter = proc(msg: pointer): string - RequestResolver = proc(msg: pointer, future: FutureBase) - NextMsgResolver = proc(msgData: Rlp, future: FutureBase) - PeerStateInitializer = proc(peer: Peer): RootRef - NetworkStateInitializer = proc(network: EthereumNode): RootRef - HandshakeStep = proc(peer: Peer): Future[void] - DisconnectionHandler = proc(peer: Peer, - reason: DisconnectionReason): Future[void] {.gcsafe.} + UselessPeerError* = object of Exception - RlpxMessageKind* = enum - rlpxNotification, - rlpxRequest, - rlpxResponse + ## + ## Quasy-private types. Use at your own risk. + ## - ConnectionState* = enum - None, - Connecting, - Connected, - Disconnecting, - Disconnected + ProtocolInfo* = ref object + name*: string + version*: int + messages*: seq[MessageInfo] + index*: int # the position of the protocol in the + # ordered list of supported protocols - DisconnectionReason* = enum - DisconnectRequested, - TcpError, - BreachOfProtocol, - UselessPeer, - TooManyPeers, - AlreadyConnected, - IncompatibleProtocolVersion, - NullNodeIdentityReceived, - ClientQuitting, - UnexpectedIdentity, - SelfConnection, - MessageTimeout, - SubprotocolReason = 0x10 + # Private fields: + peerStateInitializer*: PeerStateInitializer + networkStateInitializer*: NetworkStateInitializer + handshake*: HandshakeStep + disconnectHandler*: DisconnectionHandler - UnsupportedProtocol* = object of Exception - # This is raised when you attempt to send a message from a particular - # protocol to a peer that doesn't support the protocol. + MessageInfo* = object + id*: int + name*: string - MalformedMessageError* = object of Exception + # Private fields: + thunk*: MessageHandler + printer*: MessageContentPrinter + requestResolver*: RequestResolver + nextMsgResolver*: NextMsgResolver - PeerDisconnected* = object of Exception - reason*: DisconnectionReason + Dispatcher* = ref object # private + # The dispatcher stores the mapping of negotiated message IDs between + # two connected peers. The dispatcher objects are shared between + # connections running with the same set of supported protocols. + # + # `protocolOffsets` will hold one slot of each locally supported + # protocol. If the other peer also supports the protocol, the stored + # offset indicates the numeric value of the first message of the protocol + # (for this particular connection). If the other peer doesn't support the + # particular protocol, the stored offset is -1. + # + # `messages` holds a mapping from valid message IDs to their handler procs. + # + protocolOffsets*: seq[int] + messages*: seq[ptr MessageInfo] + activeProtocols*: seq[ProtocolInfo] - UselessPeerError* = object of Exception + ## + ## Private types: + ## + + OutstandingRequest* = object + id*: int + future*: FutureBase + timeoutAt*: uint64 + + # Private types: + MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode + MessageHandler* = proc(x: Peer, msgId: int, data: Rlp): Future[void] + MessageContentPrinter* = proc(msg: pointer): string + RequestResolver* = proc(msg: pointer, future: FutureBase) + NextMsgResolver* = proc(msgData: Rlp, future: FutureBase) + PeerStateInitializer* = proc(peer: Peer): RootRef + NetworkStateInitializer* = proc(network: EthereumNode): RootRef + HandshakeStep* = proc(peer: Peer): Future[void] + DisconnectionHandler* = proc(peer: Peer, + reason: DisconnectionReason): Future[void] {.gcsafe.} + + RlpxMessageKind* = enum + rlpxNotification, + rlpxRequest, + rlpxResponse + + ConnectionState* = enum + None, + Connecting, + Connected, + Disconnecting, + Disconnected + + DisconnectionReason* = enum + DisconnectRequested, + TcpError, + BreachOfProtocol, + UselessPeer, + TooManyPeers, + AlreadyConnected, + IncompatibleProtocolVersion, + NullNodeIdentityReceived, + ClientQuitting, + UnexpectedIdentity, + SelfConnection, + MessageTimeout, + SubprotocolReason = 0x10 diff --git a/eth_p2p/rlpx.nim b/eth_p2p/rlpx.nim index bcc0674..247d376 100644 --- a/eth_p2p/rlpx.nim +++ b/eth_p2p/rlpx.nim @@ -1,12 +1,14 @@ import macros, tables, algorithm, deques, hashes, options, typetraits, chronicles, nimcrypto, asyncdispatch2, rlp, eth_common, eth_keys, - private/types, kademlia, auth, rlpxcrypt, enode + private/types, kademlia, auth, rlpxcrypt, enode, p2p_tracing when useSnappy: import snappy - const - devp2pSnappyVersion* = 5 + const devp2pSnappyVersion* = 5 + +const + tracingEnabled = defined(p2pdump) logScope: topics = "rlpx" @@ -16,6 +18,15 @@ const defaultReqTimeout = 10000 maxMsgSize = 1024 * 1024 +when tracingEnabled: + import + eth_common/eth_types_json_serialization + + export + # XXX: This is a work-around for a Nim issue. + # See a more detailed comment in p2p_tracing.nim + init, writeValue, getOutput + var gProtocols: seq[ProtocolInfo] gDispatchers = initSet[Dispatcher]() @@ -32,7 +43,7 @@ proc newFuture[T](location: var Future[T]) = proc `$`*(p: Peer): string {.inline.} = $p.remote -proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.} +proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.} template raisePeerDisconnected(msg: string, r: DisconnectionReason) = var e = newException(PeerDisconnected, msg) @@ -137,9 +148,7 @@ proc newProtocol(name: string, version: int, peerInit: PeerStateInitializer, networkInit: NetworkStateInitializer): ProtocolInfo = new result - result.name[0] = name[0] - result.name[1] = name[1] - result.name[2] = name[2] + result.name = name result.version = version result.messages = @[] result.peerStateInitializer = peerInit @@ -205,12 +214,12 @@ proc registerMsg(protocol: var ProtocolInfo, nextMsgResolver: NextMsgResolver) = if protocol.messages.len <= id: protocol.messages.setLen(id + 1) - protocol.messages[id] = MessageInfo.init(id = id, - name = name, - thunk = thunk, - printer = printer, - requestResolver = requestResolver, - nextMsgResolver = nextMsgResolver) + protocol.messages[id] = MessageInfo(id: id, + name: name, + thunk: thunk, + printer: printer, + requestResolver: requestResolver, + nextMsgResolver: nextMsgResolver) proc registerProtocol(protocol: ProtocolInfo) = # TODO: This can be done at compile-time in the future @@ -225,16 +234,20 @@ proc registerProtocol(protocol: ProtocolInfo) = # Message composition and encryption # -proc protocolOffset(peer: Peer, Protocol: type): int = +template protocolOffset(peer: Peer, Protocol: type): int = peer.dispatcher.protocolOffsets[Protocol.protocolInfo.index] -proc perPeerMsgId(peer: Peer, proto: type, msgId: int): int {.inline.} = +proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int {.inline.} = result = msgId if not peer.dispatcher.isNil: - result += peer.protocolOffset(proto) + result += peer.dispatcher.protocolOffsets[proto.index] -proc perPeerMsgId*(peer: Peer, MsgType: type): int {.inline.} = - peer.perPeerMsgId(MsgType.msgProtocol, MsgType.msgId) +proc supports*(peer: Peer, Protocol: type): bool {.inline.} = + ## Checks whether a Peer supports a particular protocol + peer.protocolOffset(Protocol) != -1 + +template perPeerMsgId(peer: Peer, MsgType: type): int = + perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId) proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, rlpOut: var RlpWriter) = @@ -268,8 +281,6 @@ template compressMsg(peer: Peer, data: Bytes): Bytes = data proc sendMsg*(peer: Peer, data: Bytes) {.async.} = - trace "sending msg", peer, msg = getMsgName(peer, rlpFromBytes(data).read(int)) - var cipherText = encryptMsg(peer.compressMsg(data), peer.secretsState) try: @@ -279,6 +290,8 @@ proc sendMsg*(peer: Peer, data: Bytes) {.async.} = raise proc send*[Msg](peer: Peer, msg: Msg): Future[void] = + logSentMsg(peer, msg) + var rlpWriter = initRlpWriter() rlpWriter.append perPeerMsgId(peer, Msg) rlpWriter.appendRecordType(msg, Msg.rlpFieldsCount > 1) @@ -292,9 +305,9 @@ proc registerRequest*(peer: Peer, result = peer.lastReqId let timeoutAt = fastEpochTime() + uint64(timeout) - let req = OutstandingRequest.init(id = result, - future = responseFuture, - timeoutAt = timeoutAt) + let req = OutstandingRequest(id: result, + future: responseFuture, + timeoutAt: timeoutAt) peer.outstandingRequests[responseMsgId].addLast req assert(not peer.dispatcher.isNil) @@ -395,7 +408,6 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = await peer.disconnectAndRaise(BreachOfProtocol, "Cannot decrypt RLPx frame header") - trace "waiting for message bytes", peer, msgSize if msgSize > maxMsgSize: await peer.disconnectAndRaise(BreachOfProtocol, "RLPx message exceeds maximum size") @@ -427,7 +439,7 @@ proc recvMsg*(peer: Peer): Future[tuple[msgId: int, msgData: Rlp]] {.async.} = decryptedBytes = snappy.uncompress(decryptedBytes) if decryptedBytes.len == 0: await peer.disconnectAndRaise(BreachOfProtocol, - "Snappy uncompress encountered malformed data") + "Snappy uncompress encountered malformed data") var rlp = rlpFromBytes(decryptedBytes.toRange) try: @@ -461,14 +473,17 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} = if nextMsgId == wantedId: try: - return checkedRlpRead(peer, nextMsgData, MsgType) + result = checkedRlpRead(peer, nextMsgData, MsgType) + logReceivedMsg(peer, result) + return except RlpError: await peer.disconnectAndRaise(BreachOfProtocol, "Invalid RLPx message body") elif nextMsgId == 1: # p2p.disconnect - raisePeerDisconnected("Unexpected disconnect", - DisconnectionReason nextMsgData.listElem(0).toInt(uint32)) + let reason = DisconnectionReason nextMsgData.listElem(0).toInt(uint32) + await peer.disconnect(reason, notifyOtherPeer = false) + raisePeerDisconnected("Unexpected disconnect", reason) else: warn "Dropped RLPX message", msg = peer.dispatcher.messages[nextMsgId].name @@ -489,13 +504,11 @@ proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = proc dispatchMessages*(peer: Peer) {.async.} = while true: var (msgId, msgData) = await peer.recvMsg() - trace "received msg ", peer, msg = getMsgName(peer, msgId) - # rpl = msgData.inspect if msgId == 1: # p2p.disconnect await peer.transport.closeWait() - debug "remote peer disconnected", peer, - reason = msgData.listElem(0).toInt(uint32).DisconnectionReason + let reason = msgData.listElem(0).toInt(uint32).DisconnectionReason + await peer.disconnect(reason, notifyOtherPeer = false) break try: @@ -532,10 +545,6 @@ proc chooseFieldType(n: NimNode): NimNode = proc getState(peer: Peer, proto: ProtocolInfo): RootRef = peer.protocolStates[proto.index] -proc supports*(peer: Peer, Protocol: type): bool {.inline.} = - ## Checks whether a Peer supports a particular protocol - peer.protocolOffset(Protocol) != -1 - template state*(peer: Peer, Protocol: type): untyped = ## Returns the state object of a particular protocol for a ## particular connection. @@ -651,6 +660,7 @@ macro rlpxProtocolImpl(name: static[string], getState = bindSym "getState" getNetworkState = bindSym "getNetworkState" perPeerMsgId = bindSym "perPeerMsgId" + perPeerMsgIdImpl = bindSym "perPeerMsgIdImpl" linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture" # By convention, all Ethereum protocol names must be abbreviated to 3 letters @@ -727,6 +737,7 @@ macro rlpxProtocolImpl(name: static[string], reqTimeout: NimNode rlpWriter = ident"writer" appendParams = newNimNode(nnkStmtList) + paramsToWrite = newSeq[NimNode](0) reqId = ident"reqId" perPeerMsgIdVar = ident"perPeerMsgId" @@ -786,7 +797,7 @@ macro rlpxProtocolImpl(name: static[string], appendParams.add quote do: newFuture `resultIdent` let `reqId` = `registerRequestCall` - `append`(`rlpWriter`, `reqId`) + paramsToWrite.add reqId else: appendParams.add quote do: newFuture `resultIdent` @@ -800,7 +811,7 @@ macro rlpxProtocolImpl(name: static[string], addr(`receivedMsg`), `reqIdVal`) if hasReqIds: - appendParams.add newCall(append, rlpWriter, reqId) + paramsToWrite.add reqId if n.body.kind != nnkEmpty: # implement the receiving thunk proc that deserialzed the @@ -828,7 +839,7 @@ macro rlpxProtocolImpl(name: static[string], # This is a fragment of the sending proc that # serializes each of the passed parameters: - appendParams.add newCall(append, rlpWriter, param) + paramsToWrite.add param # Each message has a corresponding record type. # Here, we create its fields one by one: @@ -852,6 +863,9 @@ macro rlpxProtocolImpl(name: static[string], if paramCount > 1: readParamsPrelude.add newCall(enterList, receivedRlp) + when tracingEnabled: + readParams.add newCall(bindSym"logReceivedMsg", msgSender, receivedMsg) + let thunkName = ident(msgName & "_thunk") var thunkProc = quote do: proc `thunkName`(`msgSender`: `Peer`, _: int, data: Rlp) = @@ -924,9 +938,9 @@ macro rlpxProtocolImpl(name: static[string], quote: return `sendCall` let `perPeerMsgIdValue` = if isSubprotocol: - newCall(perPeerMsgId, msgRecipient, protoNameIdent, perProtocolMsgId) + newCall(perPeerMsgIdImpl, msgRecipient, protocol, newLit(msgId)) else: - perProtocolMsgId + newLit(msgId) if paramCount > 1: # In case there are more than 1 parameter, @@ -935,18 +949,25 @@ macro rlpxProtocolImpl(name: static[string], newCall(startList, rlpWriter, newLit(paramCount)), appendParams) + for p in paramsToWrite: + appendParams.add newCall(append, rlpWriter, p) + # Make the send proc public msgSendProc.name = newTree(nnkPostfix, ident("*"), msgSendProc.name) + let initWriter = quote do: + var `rlpWriter` = `initRlpWriter`() + const `perProtocolMsgId` = `msgId` + let `perPeerMsgIdVar` = `perPeerMsgIdValue` + `append`(`rlpWriter`, `perPeerMsgIdVar`) + + when tracingEnabled: + appendParams.add logSentMsgFields(msgRecipient, protocol, msgId, paramsToWrite) + # let paramCountNode = newLit(paramCount) msgSendProc.body = quote do: - var `rlpWriter` = `initRlpWriter`() - let `perProtocolMsgId` = `msgId` - let `perPeerMsgIdVar` = `perPeerMsgIdValue` - - `append`(`rlpWriter`, `perPeerMsgIdVar`) + `initWriter` `appendParams` - `finalizeRequest` `senderEpilogue` @@ -1055,7 +1076,9 @@ macro rlpxProtocolImpl(name: static[string], result.add newCall(bindSym("registerProtocol"), protocol) when isMainModule: echo repr(result) - # echo repr(result) + + when defined(debugRlpxProtocol) or defined(debugMacros): + echo repr(result) macro rlpxProtocol*(protocolOptions: untyped, body: untyped): untyped = let protoName = $(protocolOptions[0]) @@ -1101,16 +1124,17 @@ proc callDisconnectHandlers(peer: Peer, reason: DisconnectionReason): Future[voi return all(futures) -proc disconnect*(peer: Peer, reason: DisconnectionReason) {.async.} = +proc disconnect*(peer: Peer, reason: DisconnectionReason, notifyOtherPeer = true) {.async.} = if peer.connectionState notin {Disconnecting, Disconnected}: peer.connectionState = Disconnecting try: # TODO: investigate the failure here - if not peer.transport.closed and false: + if false and notifyOtherPeer and not peer.transport.closed: await peer.sendDisconnectMsg(reason) finally: if not peer.dispatcher.isNil: await callDisconnectHandlers(peer, reason) + logDisconnectedPeer peer peer.connectionState = Disconnected removePeer(peer.network, peer) @@ -1184,22 +1208,10 @@ proc initSecretState(hs: var Handshake, authMsg, ackMsg: openarray[byte], initSecretState(secrets, p.secretsState) burnMem(secrets) -template baseProtocolVersion(node: EthereumNode): untyped = +template checkSnappySupport(node: EthereumNode, handshake: Handshake, peer: Peer) = when useSnappy: - node.protocolVersion - else: - devp2pVersion - -template baseProtocolVersion(node: EthereumNode, peer: Peer): untyped = - when useSnappy: - if peer.snappyEnabled: node.protocolVersion - else: devp2pVersion - else: - devp2pVersion - -template checkPeerProtocolVersion(peer: Peer, handshake: Handshake) = - when useSnappy: - peer.snappyEnabled = handshake.version >= devp2pSnappyVersion.uint + peer.snappyEnabled = node.protocolVersion >= devp2pSnappyVersion.uint and + handshake.version >= devp2pSnappyVersion.uint template getVersion(handshake: Handshake): uint = when useSnappy: @@ -1207,6 +1219,12 @@ template getVersion(handshake: Handshake): uint = else: devp2pVersion +template baseProtocolVersion(node: EthereumNode): untyped = + when useSnappy: + node.protocolVersion + else: + devp2pVersion + template baseProtocolVersion(peer: Peer): uint = when useSnappy: if peer.snappyEnabled: devp2pSnappyVersion @@ -1214,11 +1232,6 @@ template baseProtocolVersion(peer: Peer): uint = else: devp2pVersion -template checkPeerNeedCompression(peer: Peer, node: EthereumNode) = - when useSnappy: - peer.snappyEnabled = peer.snappyEnabled and - node.protocolVersion >= devp2pSnappyVersion.uint - proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = new result result.network = node @@ -1228,7 +1241,7 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = var ok = false try: result.transport = await connect(ta) - var handshake = newHandshake({Initiator, EIP8}, int(node.baseProtocolVersion())) + var handshake = newHandshake({Initiator, EIP8}, int(node.baseProtocolVersion)) handshake.host = node.keys var authMsg: array[AuthMessageMaxEIP8, byte] @@ -1250,11 +1263,12 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = ret = handshake.decodeAckMessage(ackMsg) check ret - result.checkPeerProtocolVersion(handshake) + node.checkSnappySupport(handshake, result) initSecretState(handshake, ^authMsg, ackMsg, result) # if handshake.remoteHPubkey != remote.node.pubKey: # raise newException(Exception, "Remote pubkey is wrong") + logConnectedPeer result asyncCheck result.hello(handshake.getVersion(), node.clientId, node.rlpxCapabilities, @@ -1267,7 +1281,6 @@ proc rlpxConnect*(node: EthereumNode, remote: Node): Future[Peer] {.async.} = warn "Remote nodeId is not its public key" # XXX: Do we care? await postHelloSteps(result, response) - result.checkPeerNeedCompression(node) ok = true except PeerDisconnected as e: if e.reason != TooManyPeers: @@ -1313,8 +1326,8 @@ proc rlpxAccept*(node: EthereumNode, ret = handshake.decodeAuthMessage(authMsg) check ret - result.checkPeerProtocolVersion(handshake) - handshake.version = uint8(node.baseProtocolVersion(result)) + node.checkSnappySupport(handshake, result) + handshake.version = uint8(result.baseProtocolVersion) var ackMsg: array[AckMessageMaxEIP8, byte] var ackMsgLen: int @@ -1325,7 +1338,8 @@ proc rlpxAccept*(node: EthereumNode, let listenPort = transport.localAddress().port - await result.hello(result.baseProtocolVersion(), node.clientId, + logAcceptedPeer result + await result.hello(result.baseProtocolVersion, node.clientId, node.rlpxCapabilities, listenPort.uint, node.keys.pubkey.getRaw()) @@ -1339,7 +1353,6 @@ proc rlpxAccept*(node: EthereumNode, result.remote = newNode(initEnode(handshake.remoteHPubkey, address)) await postHelloSteps(result, response) - result.checkPeerNeedCompression(node) except: error "Exception in rlpxAccept", err = getCurrentExceptionMsg(), diff --git a/eth_p2p/rlpx_protocols/eth_protocol.nim b/eth_p2p/rlpx_protocols/eth_protocol.nim index 486f22d..b749a34 100644 --- a/eth_p2p/rlpx_protocols/eth_protocol.nim +++ b/eth_p2p/rlpx_protocols/eth_protocol.nim @@ -21,8 +21,8 @@ type number: uint NewBlockAnnounce* = object - header: BlockHeader - body {.rlpInline.}: BlockBody + header*: BlockHeader + body* {.rlpInline.}: BlockBody PeerState = ref object initialized*: bool