From 76bb2cde5c4c1dd4949abfdb00ae5da9de93cb75 Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Wed, 29 May 2019 11:16:59 +0300 Subject: [PATCH] More shared code extracted out of RLPx --- eth/p2p/p2p_backends_helpers.nim | 46 +++++++++++++- eth/p2p/p2p_protocol_dsl.nim | 102 +++++++++++++++++++++++++++++-- eth/p2p/p2p_tracing.nim | 22 +------ eth/p2p/private/p2p_types.nim | 4 +- eth/p2p/rlpx.nim | 58 +----------------- 5 files changed, 148 insertions(+), 84 deletions(-) diff --git a/eth/p2p/p2p_backends_helpers.nim b/eth/p2p/p2p_backends_helpers.nim index 448e255..2d01a88 100644 --- a/eth/p2p/p2p_backends_helpers.nim +++ b/eth/p2p/p2p_backends_helpers.nim @@ -23,6 +23,48 @@ template networkState*(connection: Peer, Protocol: type): untyped = proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard -proc initFuture[T](loc: var Future[T]) = - loc = newFuture[T]() +proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} = + var f = Future[Option[MsgType]](future) + if not f.finished: + if msg != nil: + f.complete some(cast[ptr MsgType](msg)[]) + else: + f.complete none(MsgType) + else: + # This future was already resolved, but let's do some sanity checks + # here. The only reasonable explanation is that the request should + # have timed out. + if msg != nil: + if f.read.isSome: + doAssert false, "trying to resolve a request twice" + else: + doAssert false, "trying to resolve a timed out request with a value" + else: + try: + if not f.read.isSome: + doAssert false, "a request timed out twice" + # This can except when the future still completes with an error. + # E.g. the `sendMsg` fails because of an already closed transport or a + # broken pipe + except TransportOsError: + # E.g. broken pipe + trace "TransportOsError during request", err = getCurrentExceptionMsg() + except TransportError: + trace "Transport got closed during request" + except: + debug "Exception in requestResolver()", + exc = getCurrentException().name, + err = getCurrentExceptionMsg() + raise + +proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) = + sendFut.addCallback() do (arg: pointer): + if not sendFut.error.isNil: + resFut.fail(sendFut.error) + +proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} = + result = "" + # TODO: uncommenting the line below increases the compile-time + # tremendously (for reasons not yet known) + # result = $(cast[ptr MsgType](msg)[]) diff --git a/eth/p2p/p2p_protocol_dsl.nim b/eth/p2p/p2p_protocol_dsl.nim index aeafe97..eb4e891 100644 --- a/eth/p2p/p2p_protocol_dsl.nim +++ b/eth/p2p/p2p_protocol_dsl.nim @@ -1,6 +1,6 @@ import macros, - std_shims/macros_shim, chronos/timer + std_shims/macros_shim, chronos type MessageKind* = enum @@ -18,6 +18,10 @@ type recIdent*: NimNode recBody*: NimNode userHandler*: NimNode + protocol*: P2PProtocol + + sendProcPeerParam*: NimNode + sendProcMsgParams*: seq[NimNode] Request* = ref object queries*: seq[Message] @@ -69,12 +73,18 @@ type # Bound symbols to the back-end run-time types and procs PeerType*: NimNode NetworkType*: NimNode + SerializationFormat*: NimNode registerProtocol*: NimNode setEventHandlers*: NimNode BackendFactory* = proc (p: P2PProtocol): Backend + P2PBackendError* = object of CatchableError + InvalidMsgError* = object of P2PBackendError + + ProtocolInfoBase* = object + const defaultReqTimeout = 10.seconds @@ -296,24 +306,108 @@ proc createSendProc*(msg: Message, procType = nnkProcDef): NimNode = # TODO: file an issue: # macros.newProc and macros.params doesn't work with nnkMacroDef - let pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe") - else: newEmptyNode() + # createSendProc must be called only once + assert msg.sendProcPeerParam == nil + + let + pragmas = if procType == nnkProcDef: newTree(nnkPragma, ident"gcsafe") + else: newEmptyNode() result = newNimNode(procType).add( msg.identWithExportMarker, ## name newEmptyNode(), newEmptyNode(), - msg.procDef.params.copy, ## params + copy msg.procDef.params, ## params pragmas, newEmptyNode(), newStmtList()) ## body + for param, paramType in result.typedParams(): + if msg.sendProcPeerParam == nil: + msg.sendProcPeerParam = param + else: + msg.sendProcMsgParams.add param + if msg.kind in {msgHandshake, msgRequest}: result[3].add msg.timeoutParam result[3][0] = if procType == nnkMacroDef: ident "untyped" else: newTree(nnkBracketExpr, ident("Future"), msg.recIdent) +const tracingEnabled = defined(p2pdump) + +when tracingEnabled: + proc logSentMsgFields(peer: NimNode, + protocolInfo: NimNode, + msgName: string, + fields: openarray[NimNode]): NimNode = + ## This generates the tracing code inserted in the message sending procs + ## `fields` contains all the params that were serialized in the message + var tracer = ident("tracer") + + result = quote do: + var `tracer` = init StringJsonWriter + beginRecord(`tracer`) + + for f in fields: + result.add newCall(bindSym"writeField", tracer, newLit($f), f) + + result.add quote do: + endRecord(`tracer`) + logMsgEventImpl("outgoing_msg", `peer`, + `protocolInfo`, `msgName`, getOutput(`tracer`)) + +proc initFuture*[T](loc: var Future[T]) = + loc = newFuture[T]() + +proc createSendProcBody*(msg: Message, + preludeGenerator: proc(stream: NimNode): NimNode, + sendCallGenerator: proc (peer, bytes: NimNode): NimNode): NimNode = + let + outputStream = ident "outputStream" + msgBytes = ident "msgBytes" + writer = ident "writer" + writeField = ident "writeField" + resultIdent = ident "result" + + initFuture = bindSym "initFuture" + recipient = msg.sendProcPeerParam + msgRecName = msg.recIdent + Format = msg.protocol.backend.SerializationFormat + + prelude = if preludeGenerator.isNil: newStmtList() + else: preludeGenerator(outputStream) + appendParams = newStmtList() + + initResultFuture = if msg.kind != msgRequest: newStmtList() + else: newCall(initFuture, resultIdent) + + sendCall = sendCallGenerator(recipient, msgBytes) + + tracing = when tracingEnabled: logSentMsgFields(recipient, + newLit(msg.protocol.name), + $msg.ident, + msg.sendProcMsgParams) + else: newStmtList() + + + for param in msg.sendProcMsgParams: + appendParams.add newCall(writeField, writer, newLit($param), param) + + result = quote do: + mixin init, WriterType, beginRecord, endRecord, getOutput + + `initResultFuture` + var `outputStream` = init OutputStream + `prelude` + var writer = init(WriterType(`Format`), `outputStream`) + var recordStartMemo = beginRecord(writer, `msgRecName`) + `appendParams` + `tracing` + endRecord(writer, recordStartMemo) + let `msgBytes` = getOutput(`outputStream`) + `sendCall` + proc appendAllParams*(node: NimNode, procDef: NimNode, skipFirst = 0): NimNode = result = node for p, _ in procDef.typedParams(skip = skipFirst): diff --git a/eth/p2p/p2p_tracing.nim b/eth/p2p/p2p_tracing.nim index 325de30..5fe1a26 100644 --- a/eth/p2p/p2p_tracing.nim +++ b/eth/p2p/p2p_tracing.nim @@ -62,26 +62,6 @@ when tracingEnabled: Msg.type.name, StringJsonWriter.encode(msg)) - proc logSentMsgFields(peer: NimNode, - protocolInfo: NimNode, - msgName: string, - fields: openarray[NimNode]): NimNode = - ## This generates the tracing code inserted in the message sending procs - ## `fields` contains all the params that were serialized in the message - var tracer = ident("tracer") - - result = quote do: - var `tracer` = init StringJsonWriter - beginRecord(`tracer`) - - for f in fields: - result.add newCall(bindSym"writeField", tracer, newLit($f), f) - - result.add quote do: - endRecord(`tracer`) - logMsgEventImpl("outgoing_msg", `peer`, - `protocolInfo`, `msgName`, getOutput(`tracer`)) - template logSentMsg(peer: Peer, msg: auto) = logMsgEvent("outgoing_msg", peer, msg) @@ -105,7 +85,7 @@ when tracingEnabled: else: template initTracing(baseProtocol: ProtocolInfo, - userProtocols: seq[ProtocolInfo])= discard + userProtocols: seq[ProtocolInfo])= discard template logSentMsg(peer: Peer, msg: auto) = discard template logReceivedMsg(peer: Peer, msg: auto) = discard template logConnectedPeer(peer: Peer) = discard diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index 252a2f9..90a6d44 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -101,7 +101,7 @@ type name*: string # Private fields: - thunk*: MessageHandler + thunk*: ThunkProc printer*: MessageContentPrinter requestResolver*: RequestResolver nextMsgResolver*: NextMsgResolver @@ -134,7 +134,7 @@ type # Private types: MessageHandlerDecorator* = proc(msgId: int, n: NimNode): NimNode - MessageHandler* = proc(x: Peer, msgId: int, data: Rlp): Future[void] {.gcsafe.} + ThunkProc* = proc(x: Peer, msgId: int, data: Rlp): Future[void] {.gcsafe.} MessageContentPrinter* = proc(msg: pointer): string {.gcsafe.} RequestResolver* = proc(msg: pointer, future: FutureBase) {.gcsafe.} NextMsgResolver* = proc(msgData: Rlp, future: FutureBase) {.gcsafe.} diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index 2f3a63e..e201cfb 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -53,6 +53,8 @@ proc disconnectAndRaise(peer: Peer, await peer.disconnect(r) raisePeerDisconnected(msg, r) +include p2p_backends_helpers + # Dispatcher # @@ -166,53 +168,13 @@ proc cmp*(lhs, rhs: ProtocolInfo): int = return int16(lhs.name[i]) - int16(rhs.name[i]) return 0 -proc messagePrinter[MsgType](msg: pointer): string {.gcsafe.} = - result = "" - # TODO: uncommenting the line below increases the compile-time - # tremendously (for reasons not yet known) - # result = $(cast[ptr MsgType](msg)[]) - proc nextMsgResolver[MsgType](msgData: Rlp, future: FutureBase) {.gcsafe.} = var reader = msgData Future[MsgType](future).complete reader.readRecordType(MsgType, MsgType.rlpFieldsCount > 1) -proc requestResolver[MsgType](msg: pointer, future: FutureBase) {.gcsafe.} = - var f = Future[Option[MsgType]](future) - if not f.finished: - if msg != nil: - f.complete some(cast[ptr MsgType](msg)[]) - else: - f.complete none(MsgType) - else: - # This future was already resolved, but let's do some sanity checks - # here. The only reasonable explanation is that the request should - # have timed out. - if msg != nil: - if f.read.isSome: - doAssert false, "trying to resolve a request twice" - else: - doAssert false, "trying to resolve a timed out request with a value" - else: - try: - if not f.read.isSome: - doAssert false, "a request timed out twice" - # This can except when the future still completes with an error. - # E.g. the `sendMsg` fails because of an already closed transport or a - # broken pipe - except TransportOsError: - # E.g. broken pipe - trace "TransportOsError during request", err = getCurrentExceptionMsg() - except TransportError: - trace "Transport got closed during request" - except: - debug "Exception in requestResolver()", - exc = getCurrentException().name, - err = getCurrentExceptionMsg() - raise - proc registerMsg(protocol: ProtocolInfo, id: int, name: string, - thunk: MessageHandler, + thunk: ThunkProc, printer: MessageContentPrinter, requestResolver: RequestResolver, nextMsgResolver: NextMsgResolver) = @@ -257,12 +219,6 @@ proc supports*(peer: Peer, Protocol: type): bool {.inline.} = template perPeerMsgId(peer: Peer, MsgType: type): int = perPeerMsgIdImpl(peer, MsgType.msgProtocol.protocolInfo, MsgType.msgId) -proc writeMsgId(p: ProtocolInfo, msgId: int, peer: Peer, - rlpOut: var RlpWriter) = - let baseMsgId = peer.dispatcher.protocolOffsets[p.index] - doAssert baseMsgId != -1 - rlpOut.append(baseMsgId + msgId) - proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] = template invalidIdError: untyped = raise newException(UnsupportedMessageError, @@ -275,11 +231,6 @@ proc invokeThunk*(peer: Peer, msgId: int, msgData: var Rlp): Future[void] = return thunk(peer, msgId, msgData) -proc linkSendFailureToReqFuture[S, R](sendFut: Future[S], resFut: Future[R]) = - sendFut.addCallback() do (arg: pointer): - if not sendFut.error.isNil: - resFut.fail(sendFut.error) - template compressMsg(peer: Peer, data: Bytes): Bytes = when useSnappy: if peer.snappyEnabled: @@ -499,8 +450,6 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} = warn "Dropped RLPX message", msg = peer.dispatcher.messages[nextMsgId].name -include p2p_backends_helpers - proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = ## This procs awaits a specific RLPx message. ## Any messages received while waiting will be dispatched to their @@ -619,7 +568,6 @@ proc p2pProtocolBackendImpl*(p: P2PProtocol): Backend = nextMsg = bindSym "nextMsg" initProtocol = bindSym"initProtocol" registerMsg = bindSym "registerMsg" - writeMsgId = bindSym "writeMsgId" perPeerMsgId = bindSym "perPeerMsgId" perPeerMsgIdImpl = bindSym "perPeerMsgIdImpl" linkSendFailureToReqFuture = bindSym "linkSendFailureToReqFuture"