From 3efec171a68f0b85858afe7db83d1f6339a1bcbf Mon Sep 17 00:00:00 2001 From: Zahary Karadjov Date: Mon, 11 Mar 2019 11:22:06 +0200 Subject: [PATCH] Make the APIs compatible with libp2p Lib2P2 handles RPC requests and responses with separate streams while DEV2P2 is relying on tagged messages transmitted over a single stream. To cover both models through the same application code, we introduce a new `response` variable in the request handlers. The user is supposed to issue a call to `response.send` in order to reply to the request. Please note that the `response.send` signature is strongly typed and depends on the current message. --- eth.nimble | 3 +- eth/p2p/p2p_backends_helpers.nim | 66 +++++++++++ eth/p2p/p2p_tracing.nim | 50 ++++---- eth/p2p/private/p2p_types.nim | 6 + eth/p2p/rlpx.nim | 146 +++++++++--------------- eth/p2p/rlpx_protocols/eth_protocol.nim | 8 +- eth/p2p/rlpx_protocols/les_protocol.nim | 18 +-- tests/p2p/tserver.nim | 12 +- 8 files changed, 175 insertions(+), 134 deletions(-) create mode 100644 eth/p2p/p2p_backends_helpers.nim diff --git a/eth.nimble b/eth.nimble index 609ce34..40a53da 100644 --- a/eth.nimble +++ b/eth.nimble @@ -13,7 +13,8 @@ requires "nim >= 0.19.0", "rocksdb", "package_visible_types", "chronos", - "chronicles" + "chronicles", + "std_shims" import strutils import oswalkdir, ospaths # In newer nim these are merged to os diff --git a/eth/p2p/p2p_backends_helpers.nim b/eth/p2p/p2p_backends_helpers.nim new file mode 100644 index 0000000..a4d5f2f --- /dev/null +++ b/eth/p2p/p2p_backends_helpers.nim @@ -0,0 +1,66 @@ +proc getState(peer: Peer, proto: ProtocolInfo): RootRef = + peer.protocolStates[proto.index] + +template state*(peer: Peer, Protocol: type): untyped = + ## Returns the state object of a particular protocol for a + ## particular connection. + mixin State + bind getState + cast[Protocol.State](getState(peer, Protocol.protocolInfo)) + +proc getNetworkState(node: EthereumNode, proto: ProtocolInfo): RootRef = + node.protocolStates[proto.index] + +template protocolState*(node: EthereumNode, Protocol: type): untyped = + mixin NetworkState + bind getNetworkState + cast[Protocol.NetworkState](getNetworkState(node, Protocol.protocolInfo)) + +template networkState*(connection: Peer, Protocol: type): untyped = + ## Returns the network state object of a particular protocol for a + ## particular connection. + protocolState(connection.network, Protocol) + +proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard + +proc createPeerState[ProtocolState](peer: Peer): RootRef = + var res = new ProtocolState + mixin initProtocolState + initProtocolState(res, peer) + return cast[RootRef](res) + +proc createNetworkState[NetworkState](network: EthereumNode): RootRef {.gcsafe.} = + var res = new NetworkState + mixin initProtocolState + initProtocolState(res, network) + return cast[RootRef](res) + +proc chooseFieldType(n: NimNode): NimNode = + ## Examines the parameter types used in the message signature + ## and selects the corresponding field type for use in the + ## message object type (i.e. `p2p.hello`). + ## + ## For now, only openarray types are remapped to sequences. + result = n + if n.kind == nnkBracketExpr and eqIdent(n[0], "openarray"): + result = n.copyNimTree + result[0] = ident("seq") + +proc popTimeoutParam(n: NimNode): NimNode = + var lastParam = n.params[^1] + if eqIdent(lastParam[0], "timeout"): + if lastParam[2].kind == nnkEmpty: + macros.error "You must specify a default value for the `timeout` parameter", lastParam + result = lastParam + n.params.del(n.params.len - 1) + +proc verifyStateType(t: NimNode): NimNode = + result = t[1] + if result.kind == nnkSym and $result == "nil": + return nil + if result.kind != nnkBracketExpr or $result[0] != "ref": + macros.error($result & " must be a ref type") + +proc newFuture[T](location: var Future[T]) = + location = newFuture[T]() + diff --git a/eth/p2p/p2p_tracing.nim b/eth/p2p/p2p_tracing.nim index f480dbb..325de30 100644 --- a/eth/p2p/p2p_tracing.nim +++ b/eth/p2p/p2p_tracing.nim @@ -1,11 +1,8 @@ -import - private/p2p_types - -const tracingEnabled* = defined(p2pdump) +const tracingEnabled = defined(p2pdump) when tracingEnabled: import - macros, + macros, typetraits, serialization, json_serialization/writer, chronicles, chronicles_tail/configuration @@ -25,8 +22,8 @@ when tracingEnabled: template logRecord(eventName: static[string], args: varargs[untyped]) = p2pMessages.log LogLevel.NONE, eventName, topics = "p2pdump", args - proc initTracing*(baseProtocol: ProtocolInfo, - userProtocols: seq[ProtocolInfo]) = + proc initTracing(baseProtocol: ProtocolInfo, + userProtocols: seq[ProtocolInfo]) = once: var w = init StringJsonWriter @@ -48,26 +45,27 @@ when tracingEnabled: proc logMsgEventImpl(eventName: static[string], peer: Peer, protocol: ProtocolInfo, - msgId: int, + msgName: string, json: string) = # this is kept as a separate proc to reduce the code bloat logRecord eventName, port = int(peer.network.address.tcpPort), peer = $peer.remote, protocol = protocol.name, - msgId, data = JsonString(json) + msg = msgName, + data = JsonString(json) proc logMsgEvent[Msg](eventName: static[string], peer: Peer, msg: Msg) = mixin msgProtocol, protocolInfo, msgId logMsgEventImpl(eventName, peer, Msg.msgProtocol.protocolInfo, - Msg.msgId, + Msg.type.name, StringJsonWriter.encode(msg)) - proc logSentMsgFields*(peer: NimNode, - protocolInfo: NimNode, - msgId: int, - fields: openarray[NimNode]): NimNode = + proc logSentMsgFields(peer: NimNode, + protocolInfo: NimNode, + msgName: string, + fields: openarray[NimNode]): NimNode = ## This generates the tracing code inserted in the message sending procs ## `fields` contains all the params that were serialized in the message var tracer = ident("tracer") @@ -82,35 +80,35 @@ when tracingEnabled: result.add quote do: endRecord(`tracer`) logMsgEventImpl("outgoing_msg", `peer`, - `protocolInfo`, `msgId`, getOutput(`tracer`)) + `protocolInfo`, `msgName`, getOutput(`tracer`)) - template logSentMsg*(peer: Peer, msg: auto) = + template logSentMsg(peer: Peer, msg: auto) = logMsgEvent("outgoing_msg", peer, msg) - template logReceivedMsg*(peer: Peer, msg: auto) = + template logReceivedMsg(peer: Peer, msg: auto) = logMsgEvent("incoming_msg", peer, msg) - template logConnectedPeer*(p: Peer) = + template logConnectedPeer(p: Peer) = logRecord "peer_connected", port = int(p.network.address.tcpPort), peer = $p.remote - template logAcceptedPeer*(p: Peer) = + template logAcceptedPeer(p: Peer) = logRecord "peer_accepted", port = int(p.network.address.tcpPort), peer = $p.remote - template logDisconnectedPeer*(p: Peer) = + template logDisconnectedPeer(p: Peer) = logRecord "peer_disconnected", port = int(p.network.address.tcpPort), peer = $p.remote else: - template initTracing*(baseProtocol: ProtocolInfo, + template initTracing(baseProtocol: ProtocolInfo, userProtocols: seq[ProtocolInfo])= discard - template logSentMsg*(peer: Peer, msg: auto) = discard - template logReceivedMsg*(peer: Peer, msg: auto) = discard - template logConnectedPeer*(peer: Peer) = discard - template logAcceptedPeer*(peer: Peer) = discard - template logDisconnectedPeer*(peer: Peer) = discard + template logSentMsg(peer: Peer, msg: auto) = discard + template logReceivedMsg(peer: Peer, msg: auto) = discard + template logConnectedPeer(peer: Peer) = discard + template logAcceptedPeer(peer: Peer) = discard + template logDisconnectedPeer(peer: Peer) = discard diff --git a/eth/p2p/private/p2p_types.nim b/eth/p2p/private/p2p_types.nim index 830f5bd..c46c0d8 100644 --- a/eth/p2p/private/p2p_types.nim +++ b/eth/p2p/private/p2p_types.nim @@ -170,3 +170,9 @@ type MessageTimeout, SubprotocolReason = 0x10 + ResponseWithId*[MsgType] = object + peer*: Peer + id*: int + + Response*[MsgType] = distinct Peer + diff --git a/eth/p2p/rlpx.nim b/eth/p2p/rlpx.nim index 959cb91..00babeb 100644 --- a/eth/p2p/rlpx.nim +++ b/eth/p2p/rlpx.nim @@ -1,15 +1,12 @@ import macros, tables, algorithm, deques, hashes, options, typetraits, - chronicles, nimcrypto, chronos, eth/[rlp, common, keys], - private/p2p_types, kademlia, auth, rlpxcrypt, enode, p2p_tracing + std_shims/macros_shim, chronicles, nimcrypto, chronos, eth/[rlp, common, keys], + private/p2p_types, kademlia, auth, rlpxcrypt, enode when useSnappy: import snappy const devp2pSnappyVersion* = 5 -const - tracingEnabled = defined(p2pdump) - logScope: topics = "rlpx" @@ -18,6 +15,8 @@ const defaultReqTimeout = 10000 maxMsgSize = 1024 * 1024 +include p2p_tracing + when tracingEnabled: import eth/common/eth_types_json_serialization @@ -36,9 +35,6 @@ var template allProtocols*: auto = {.gcsafe.}: gProtocols template devp2pInfo: auto = {.gcsafe.}: gDevp2pInfo -proc newFuture[T](location: var Future[T]) = - location = newFuture[T]() - proc `$`*(p: Peer): string {.inline.} = $p.remote @@ -240,6 +236,10 @@ proc perPeerMsgIdImpl(peer: Peer, proto: ProtocolInfo, msgId: int): int {.inline if not peer.dispatcher.isNil: result += peer.dispatcher.protocolOffsets[proto.index] +template getPeer(peer: Peer): auto = peer +template getPeer(response: Response): auto = Peer(response) +template getPeer(response: ResponseWithId): auto = response.peer + proc supports*(peer: Peer, Protocol: type): bool {.inline.} = ## Checks whether a Peer supports a particular protocol peer.protocolOffset(Protocol) != -1 @@ -490,6 +490,8 @@ proc waitSingleMsg(peer: Peer, MsgType: type): Future[MsgType] {.async.} = warn "Dropped RLPX message", msg = peer.dispatcher.messages[nextMsgId].name +include p2p_backends_helpers + proc nextMsg*(peer: Peer, MsgType: type): Future[MsgType] = ## This procs awaits a specific RLPx message. ## Any messages received while waiting will be dispatched to their @@ -525,77 +527,6 @@ proc dispatchMessages*(peer: Peer) {.async.} = (msgInfo.nextMsgResolver)(msgData, peer.awaitedMessages[msgId]) peer.awaitedMessages[msgId] = nil -iterator typedParams(n: NimNode, skip = 0): (NimNode, NimNode) = - for i in (1 + skip) ..< n.params.len: - let paramNodes = n.params[i] - let paramType = paramNodes[^2] - - for j in 0 ..< paramNodes.len - 2: - yield (paramNodes[j], paramType) - -proc chooseFieldType(n: NimNode): NimNode = - ## Examines the parameter types used in the message signature - ## and selects the corresponding field type for use in the - ## message object type (i.e. `p2p.hello`). - ## - ## For now, only openarray types are remapped to sequences. - result = n - if n.kind == nnkBracketExpr and eqIdent(n[0], "openarray"): - result = n.copyNimTree - result[0] = ident("seq") - -proc getState(peer: Peer, proto: ProtocolInfo): RootRef = - peer.protocolStates[proto.index] - -template state*(peer: Peer, Protocol: type): untyped = - ## Returns the state object of a particular protocol for a - ## particular connection. - mixin State - bind getState - cast[Protocol.State](getState(peer, Protocol.protocolInfo)) - -proc getNetworkState(node: EthereumNode, proto: ProtocolInfo): RootRef = - node.protocolStates[proto.index] - -template protocolState*(node: EthereumNode, Protocol: type): untyped = - mixin NetworkState - bind getNetworkState - cast[Protocol.NetworkState](getNetworkState(node, Protocol.protocolInfo)) - -template networkState*(connection: Peer, Protocol: type): untyped = - ## Returns the network state object of a particular protocol for a - ## particular connection. - protocolState(connection.network, Protocol) - -proc initProtocolState*[T](state: T, x: Peer|EthereumNode) {.gcsafe.} = discard - -proc createPeerState[ProtocolState](peer: Peer): RootRef = - var res = new ProtocolState - mixin initProtocolState - initProtocolState(res, peer) - return cast[RootRef](res) - -proc createNetworkState[NetworkState](network: EthereumNode): RootRef {.gcsafe.} = - var res = new NetworkState - mixin initProtocolState - initProtocolState(res, network) - return cast[RootRef](res) - -proc popTimeoutParam(n: NimNode): NimNode = - var lastParam = n.params[^1] - if eqIdent(lastParam[0], "timeout"): - if lastParam[2].kind == nnkEmpty: - macros.error "You must specify a default value for the `timeout` parameter", lastParam - result = lastParam - n.params.del(n.params.len - 1) - -proc verifyStateType(t: NimNode): NimNode = - result = t[1] - if result.kind == nnkSym and $result == "nil": - return nil - if result.kind != nnkBracketExpr or $result[0] != "ref": - macros.error($result & " must be a ref type") - macro p2pProtocolImpl(name: static[string], version: static[uint], body: untyped, @@ -632,6 +563,7 @@ macro p2pProtocolImpl(name: static[string], protoNameIdent = ident(protoName) resultIdent = ident "result" perProtocolMsgId = ident"perProtocolMsgId" + response = ident"response" currentProtocolSym = ident"CurrentProtocol" protocol = ident(protoName & "Protocol") isSubprotocol = version > 0'u @@ -674,7 +606,10 @@ macro p2pProtocolImpl(name: static[string], template applyDecorator(p: NimNode, decorator: NimNode) = if decorator.kind != nnkNilLit: p.addPragma decorator - proc augmentUserHandler(userHandlerProc: NimNode, msgId = -1, msgKind = rlpxNotification) = + proc augmentUserHandler(userHandlerProc: NimNode, + msgId = -1, + msgKind = rlpxNotification, + extraDefinitions: NimNode = nil) = ## Turns a regular proc definition into an async proc and adds ## the helpers for accessing the peer and network protocol states. case msgKind @@ -696,6 +631,9 @@ macro p2pProtocolImpl(name: static[string], userHandlerDefinitions.add quote do: type `currentProtocolSym` = `protoNameIdent` + if extraDefinitions != nil: + userHandlerDefinitions.add extraDefinitions + if msgId >= 0: userHandlerDefinitions.add quote do: const `perProtocolMsgId` = `msgId` @@ -729,7 +667,7 @@ macro p2pProtocolImpl(name: static[string], responseMsgId = -1, responseRecord: NimNode = nil): NimNode = if n[0].kind == nnkPostfix: - macros.error("rlpxProcotol procs are public by default. " & + macros.error("p2pProcotol procs are public by default. " & "Please remove the postfix `*`.", n) let @@ -742,6 +680,7 @@ macro p2pProtocolImpl(name: static[string], # variables used in the sending procs msgRecipient = ident"msgRecipient" + sendTo = ident"sendTo" reqTimeout: NimNode rlpWriter = ident"writer" appendParams = newNimNode(nnkStmtList) @@ -819,14 +758,25 @@ macro p2pProtocolImpl(name: static[string], addr(`receivedMsg`), `reqIdVal`) if hasReqIds: - paramsToWrite.add reqId + paramsToWrite.add newDotExpr(sendTo, ident"id") if n.body.kind != nnkEmpty: # implement the receiving thunk proc that deserialzed the # message parameters and calls the user proc: userHandlerProc = n.copyNimTree userHandlerProc.name = genSym(nskProc, msgName) - augmentUserHandler userHandlerProc, msgId, msgKind + + var extraDefs: NimNode + if msgKind == rlpxRequest: + let peer = userHandlerProc.params[1][0] + if hasReqIds: + extraDefs = quote do: + let `response` = ResponseWithId[`responseRecord`](peer: `peer`, id: `reqId`) + else: + extraDefs = quote do: + let `response` = Response[`responseRecord`](`peer`) + + augmentUserHandler userHandlerProc, msgId, msgKind, extraDefs # This is the call to the user supplied handled. Here we add only the # initial peer param, while the rest of the params will be added later. @@ -909,8 +859,11 @@ macro p2pProtocolImpl(name: static[string], template msgProtocol*(T: type `msgRecord`): type = `protoNameIdent` var msgSendProc = n + let msgSendProcName = n.name + outSendProcs.add msgSendProc + # TODO: check that the first param has the correct type - msgSendProc.params[1][0] = msgRecipient + msgSendProc.params[1][0] = sendTo msgSendProc.addPragma ident"gcsafe" # Add a timeout parameter for all request procs @@ -918,8 +871,20 @@ macro p2pProtocolImpl(name: static[string], of rlpxRequest: msgSendProc.params.add reqTimeout of rlpxResponse: - if useRequestIds: - msgSendProc.params.insert 2, newIdentDefs(reqId, ident"int") + # A response proc must be called with a response object that originates + # from a certain request. Here we change the Peer parameter at position + # 1 to the correct strongly-typed ResponseType. The incoming procs still + # gets the normal Peer paramter. + let + ResponseTypeHead = if useRequestIds: bindSym"ResponseWithId" + else: bindSym"Response" + ResponseType = newTree(nnkBracketExpr, ResponseTypeHead, msgRecord) + + msgSendProc.params[1][1] = ResponseType + + outSendProcs.add quote do: + template send*(r: `ResponseType`, args: varargs[untyped]): auto = + `msgSendProcName`(r, args) else: discard # We change the return type of the sending proc to a Future. @@ -946,7 +911,7 @@ macro p2pProtocolImpl(name: static[string], # `sendMsg` call. quote: return `sendCall` - let `perPeerMsgIdValue` = if isSubprotocol: + let perPeerMsgIdValue = if isSubprotocol: newCall(perPeerMsgIdImpl, msgRecipient, protocol, newLit(msgId)) else: newLit(msgId) @@ -975,6 +940,7 @@ macro p2pProtocolImpl(name: static[string], # let paramCountNode = newLit(paramCount) msgSendProc.body = quote do: + let `msgRecipient` = getPeer(`sendTo`) `initWriter` `appendParams` `finalizeRequest` @@ -983,8 +949,6 @@ macro p2pProtocolImpl(name: static[string], if msgKind == rlpxRequest: msgSendProc.applyDecorator outgoingRequestDecorator - outSendProcs.add msgSendProc - outProcRegistrations.add( newCall(bindSym("registerMsg"), protocol, @@ -1052,7 +1016,7 @@ macro p2pProtocolImpl(name: static[string], elif eqIdent(n[0], "onPeerDisconnected"): disconnectHandler = liftEventHandler(n[1], "PeerDisconnect") else: - macros.error(repr(n) & " is not a recognized call in RLPx protocol definitions", n) + macros.error(repr(n) & " is not a recognized call in P2P protocol definitions", n) of nnkProcDef: discard addMsgHandler(nextId, n) inc nextId @@ -1061,7 +1025,7 @@ macro p2pProtocolImpl(name: static[string], discard else: - macros.error("illegal syntax in a RLPx protocol definition", n) + macros.error("illegal syntax in a P2P protocol definition", n) let peerInit = if peerState == nil: newNilLit() else: newTree(nnkBracketExpr, createPeerState, peerState) diff --git a/eth/p2p/rlpx_protocols/eth_protocol.nim b/eth/p2p/rlpx_protocols/eth_protocol.nim index cd3fb59..59dbb3f 100644 --- a/eth/p2p/rlpx_protocols/eth_protocol.nim +++ b/eth/p2p/rlpx_protocols/eth_protocol.nim @@ -80,7 +80,7 @@ p2pProtocol eth(version = protocolVersion, await peer.disconnect(BreachOfProtocol) return - await peer.blockHeaders(peer.network.chain.getBlockHeaders(request)) + await response.send(peer.network.chain.getBlockHeaders(request)) proc blockHeaders(p: Peer, headers: openarray[BlockHeader]) @@ -90,7 +90,7 @@ p2pProtocol eth(version = protocolVersion, await peer.disconnect(BreachOfProtocol) return - await peer.blockBodies(peer.network.chain.getBlockBodies(hashes)) + await response.send(peer.network.chain.getBlockBodies(hashes)) proc blockBodies(peer: Peer, blocks: openarray[BlockBody]) @@ -101,13 +101,13 @@ p2pProtocol eth(version = protocolVersion, requestResponse: proc getNodeData(peer: Peer, hashes: openarray[KeccakHash]) = - await peer.nodeData(peer.network.chain.getStorageNodes(hashes)) + await response.send(peer.network.chain.getStorageNodes(hashes)) proc nodeData(peer: Peer, data: openarray[Blob]) requestResponse: proc getReceipts(peer: Peer, hashes: openarray[KeccakHash]) = - await peer.receipts(peer.network.chain.getReceipts(hashes)) + await response.send(peer.network.chain.getReceipts(hashes)) proc receipts(peer: Peer, receipts: openarray[Receipt]) diff --git a/eth/p2p/rlpx_protocols/les_protocol.nim b/eth/p2p/rlpx_protocols/les_protocol.nim index c42687d..03b823f 100644 --- a/eth/p2p/rlpx_protocols/les_protocol.nim +++ b/eth/p2p/rlpx_protocols/les_protocol.nim @@ -287,7 +287,7 @@ p2pProtocol les(version = lesVersion, costQuantity(req.maxResults.int, max = maxHeadersFetch).} = let headers = peer.network.chain.getBlockHeaders(req) - await peer.blockHeaders(reqId, updateBV(), headers) + await response.send(updateBV(), headers) proc blockHeaders( peer: Peer, @@ -304,7 +304,7 @@ p2pProtocol les(version = lesVersion, costQuantity(blocks.len, max = maxBodiesFetch), gcsafe.} = let blocks = peer.network.chain.getBlockBodies(blocks) - await peer.blockBodies(reqId, updateBV(), blocks) + await response.send(updateBV(), blocks) proc blockBodies( peer: Peer, @@ -318,7 +318,7 @@ p2pProtocol les(version = lesVersion, {.costQuantity(hashes.len, max = maxReceiptsFetch).} = let receipts = peer.network.chain.getReceipts(hashes) - await peer.receipts(reqId, updateBV(), receipts) + await response.send(updateBV(), receipts) proc receipts( peer: Peer, @@ -332,7 +332,7 @@ p2pProtocol les(version = lesVersion, costQuantity(proofs.len, max = maxProofsFetch).} = let proofs = peer.network.chain.getProofs(proofs) - await peer.proofs(reqId, updateBV(), proofs) + await response.send(updateBV(), proofs) proc proofs( peer: Peer, @@ -346,7 +346,7 @@ p2pProtocol les(version = lesVersion, costQuantity(reqs.len, max = maxCodeFetch).} = let results = peer.network.chain.getContractCodes(reqs) - await peer.contractCodes(reqId, updateBV(), results) + await response.send(updateBV(), results) proc contractCodes( peer: Peer, @@ -362,7 +362,7 @@ p2pProtocol les(version = lesVersion, costQuantity(reqs.len, max = maxHeaderProofsFetch).} = let proofs = peer.network.chain.getHeaderProofs(reqs) - await peer.headerProofs(reqId, updateBV(), proofs) + await response.send(updateBV(), proofs) proc headerProofs( peer: Peer, @@ -377,7 +377,7 @@ p2pProtocol les(version = lesVersion, var nodes, auxData: seq[Blob] peer.network.chain.getHelperTrieProofs(reqs, nodes, auxData) - await peer.helperTrieProofs(reqId, updateBV(), nodes, auxData) + await response.send(updateBV(), nodes, auxData) proc helperTrieProofs( peer: Peer, @@ -409,7 +409,7 @@ p2pProtocol les(version = lesVersion, results.add s - await peer.txStatus(reqId, updateBV(), results) + await response.send(updateBV(), results) proc getTxStatus( peer: Peer, @@ -421,7 +421,7 @@ p2pProtocol les(version = lesVersion, var results: seq[TransactionStatusMsg] for t in transactions: results.add chain.getTransactionStatus(t.rlpHash) - await peer.txStatus(reqId, updateBV(), results) + await response.send(updateBV(), results) proc txStatus( peer: Peer, diff --git a/tests/p2p/tserver.nim b/tests/p2p/tserver.nim index 9428b4e..195cf7a 100644 --- a/tests/p2p/tserver.nim +++ b/tests/p2p/tserver.nim @@ -42,7 +42,7 @@ p2pProtocol abc(version = 1, requestResponse: proc abcReq(p: Peer, n: int) = echo "got req ", n - await p.abcRes(reqId, &"response to #{n}") + await response.send(&"response to #{n}") proc abcRes(p: Peer, data: string) = echo "got response ", data @@ -89,6 +89,12 @@ template asyncTest(name, body: untyped) = proc scenario {.async.} = body waitFor scenario() +template sendResponseWithId(peer: Peer, proto, msg: untyped, reqId: int, data: varargs[untyped]): auto = + msg(ResponseWithId[proto.msg](peer: peer, id: reqId), data) + +template sendResponse(peer: Peer, proto, msg: untyped, data: varargs[untyped]): auto = + msg(Response[proto.msg](peer), data) + asyncTest "network with 3 peers using custom protocols": const useCompression = defined(useSnappy) let localKeys = newKeyPair() @@ -101,7 +107,7 @@ asyncTest "network with 3 peers using custom protocols": m.expect(abc.abcReq) do (peer: Peer, data: Rlp): let reqId = data.readReqId() - await peer.abcRes(reqId, "mock response") + await sendResponseWithId(peer, abc, abcRes, reqId, "mock response") await sleepAsync(100) let r = await peer.abcReq(1) assert r.get.data == "response to #1" @@ -116,7 +122,7 @@ asyncTest "network with 3 peers using custom protocols": m.expect(xyz.xyzReq) do (peer: Peer): echo "got xyz req" - await peer.xyzRes("mock peer data") + await sendResponse(peer, xyz, xyzRes, "mock peer data") when useCompression: m.useCompression = useCompression