From 533e39ef94a74982b65111f74cd19c0a2f352fff Mon Sep 17 00:00:00 2001 From: Tanguy Date: Mon, 4 Jul 2022 15:19:21 +0200 Subject: [PATCH] Yamux (#704) Co-authored-by: Ludovic Chenut --- libp2p/builders.nim | 22 +- libp2p/muxers/yamux/yamux.nim | 468 +++++++++++++++++++++++ libp2p/protocols/relay.nim | 1 + tests/commoninterop.nim | 596 ++++++++++++++++++++++++++++++ tests/helpers.nim | 16 + tests/testinterop.nim | 677 +++------------------------------- tests/testyamux.nim | 154 ++++++++ 7 files changed, 1301 insertions(+), 633 deletions(-) create mode 100644 libp2p/muxers/yamux/yamux.nim create mode 100644 tests/commoninterop.nim create mode 100644 tests/testyamux.nim diff --git a/libp2p/builders.nim b/libp2p/builders.nim index 873bc43..a4f8945 100644 --- a/libp2p/builders.nim +++ b/libp2p/builders.nim @@ -22,7 +22,7 @@ import options, tables, chronos, chronicles, switch, peerid, peerinfo, stream/connection, multiaddress, crypto/crypto, transports/[transport, tcptransport], - muxers/[muxer, mplex/mplex], + muxers/[muxer, mplex/mplex, yamux/yamux], protocols/[identify, secure/secure, secure/noise, relay], connmanager, upgrademngrs/muxedupgrade, nameresolving/nameresolver, @@ -38,15 +38,15 @@ type Noise, Secio {.deprecated.} - MplexOpts = object - enable: bool + MuxerBuilder = object + codec: string newMuxer: MuxerConstructor SwitchBuilder* = ref object privKey: Option[PrivateKey] addresses: seq[MultiAddress] secureManagers: seq[SecureProtocol] - mplexOpts: MplexOpts + muxers: seq[MuxerBuilder] transports: seq[TransportProvider] rng: ref HmacDrbgContext maxConnections: int @@ -119,11 +119,13 @@ proc withMplex*( outTimeout, maxChannCount) - b.mplexOpts = MplexOpts( - enable: true, - newMuxer: newMuxer, - ) + b.muxers.add(MuxerBuilder(codec: MplexCodec, newMuxer: newMuxer)) + b +proc withYamux*(b: SwitchBuilder): SwitchBuilder = + proc newMuxer(conn: Connection): Muxer = Yamux.new(conn) + + b.muxers.add(MuxerBuilder(codec: YamuxCodec, newMuxer: newMuxer)) b proc withNoise*(b: SwitchBuilder): SwitchBuilder {.public.} = @@ -213,8 +215,8 @@ proc build*(b: SwitchBuilder): Switch let muxers = block: var muxers: Table[string, MuxerProvider] - if b.mplexOpts.enable: - muxers[MplexCodec] = MuxerProvider.new(b.mplexOpts.newMuxer, MplexCodec) + for m in b.muxers: + muxers[m.codec] = MuxerProvider.new(m.newMuxer, m.codec) muxers let diff --git a/libp2p/muxers/yamux/yamux.nim b/libp2p/muxers/yamux/yamux.nim new file mode 100644 index 0000000..601ad33 --- /dev/null +++ b/libp2p/muxers/yamux/yamux.nim @@ -0,0 +1,468 @@ +# Nim-LibP2P +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +{.push raises: [Defect].} + +import sequtils, std/[tables] +import chronos, chronicles, stew/[endians2, byteutils, objects] +import ../muxer, + ../../stream/connection + +export muxer + +logScope: + topics = "libp2p yamux" + +const + YamuxCodec* = "/yamux/1.0.0" + YamuxVersion = 0.uint8 + DefaultWindowSize = 256000 + +type + YamuxError* = object of CatchableError + + MsgType = enum + Data = 0x0 + WindowUpdate = 0x1 + Ping = 0x2 + GoAway = 0x3 + + MsgFlags {.size: 2.} = enum + Syn + Ack + Fin + Rst + + GoAwayStatus = enum + NormalTermination = 0x0, + ProtocolError = 0x1, + InternalError = 0x2, + + YamuxHeader = object + version: uint8 + msgType: MsgType + flags: set[MsgFlags] + streamId: uint32 + length: uint32 + +proc readHeader(conn: LPStream): Future[YamuxHeader] {.async, gcsafe.} = + var buffer: array[12, byte] + await conn.readExactly(addr buffer[0], 12) + + result.version = buffer[0] + let flags = fromBytesBE(uint16, buffer[2..3]) + if not result.msgType.checkedEnumAssign(buffer[1]) or flags notin 0'u16..15'u16: + raise newException(YamuxError, "Wrong header") + result.flags = cast[set[MsgFlags]](flags) + result.streamId = fromBytesBE(uint32, buffer[4..7]) + result.length = fromBytesBE(uint32, buffer[8..11]) + return result + +proc `$`(header: YamuxHeader): string = + result = "{" & $header.msgType & ", " + result &= "{" & header.flags.foldl(if a != "": a & ", " & $b else: $b, "") & "}, " + result &= "streamId: " & $header.streamId & ", " + result &= "length: " & $header.length & "}" + +proc encode(header: YamuxHeader): array[12, byte] = + result[0] = header.version + result[1] = uint8(header.msgType) + result[2..3] = toBytesBE(cast[uint16](header.flags)) + result[4..7] = toBytesBE(header.streamId) + result[8..11] = toBytesBE(header.length) + +proc write(conn: LPStream, header: YamuxHeader): Future[void] {.gcsafe.} = + trace "write directly on stream", h = $header + var buffer = header.encode() + return conn.write(@buffer) + +proc ping(T: type[YamuxHeader], flag: MsgFlags, pingData: uint32): T = + T( + version: YamuxVersion, + msgType: MsgType.Ping, + flags: {flag}, + length: pingData + ) + +proc goAway(T: type[YamuxHeader], status: GoAwayStatus): T = + T( + version: YamuxVersion, + msgType: MsgType.GoAway, + length: uint32(status) + ) + +proc data( + T: type[YamuxHeader], + streamId: uint32, + length: uint32 = 0, + flags: set[MsgFlags] = {}, + ): T = + T( + version: YamuxVersion, + msgType: MsgType.Data, + length: length, + flags: flags, + streamId: streamId + ) + +proc windowUpdate( + T: type[YamuxHeader], + streamId: uint32, + delta: uint32, + flags: set[MsgFlags] = {}, + ): T = + T( + version: YamuxVersion, + msgType: MsgType.WindowUpdate, + length: delta, + flags: flags, + streamId: streamId + ) + +type + ToSend = tuple + data: seq[byte] + sent: int + fut: Future[void] + YamuxChannel* = ref object of Connection + id: uint32 + recvWindow: int + sendWindow: int + maxRecvWindow: int + conn: Connection + isSrc: bool + opened: bool + isSending: bool + sendQueue: seq[ToSend] + recvQueue: seq[byte] + isReset: bool + closedRemotely: Future[void] + closedLocally: bool + receivedData: AsyncEvent + returnedEof: bool + +proc `$`(channel: YamuxChannel): string = + result = if channel.conn.dir == Out: "=> " else: "<= " + result &= $channel.id + var s: seq[string] = @[] + if channel.closedRemotely.done(): + s.add("ClosedRemotely") + if channel.closedLocally: + s.add("ClosedLocally") + if channel.isReset: + s.add("Reset") + if s.len > 0: + result &= " {" & s.foldl(if a != "": a & ", " & b else: b, "") & "}" + +proc sendQueueBytes(channel: YamuxChannel, limit: bool = false): int = + for (elem, sent, _) in channel.sendQueue: + result.inc(min(elem.len - sent, if limit: channel.maxRecvWindow div 3 else: elem.len - sent)) + +proc actuallyClose(channel: YamuxChannel) {.async.} = + if channel.closedLocally and channel.sendQueue.len == 0 and + channel.closedRemotely.done(): + await procCall Connection(channel).closeImpl() + +proc remoteClosed(channel: YamuxChannel) {.async.} = + if not channel.closedRemotely.done(): + channel.closedRemotely.complete() + await channel.actuallyClose() + +method closeImpl*(channel: YamuxChannel) {.async, gcsafe.} = + if not channel.closedLocally: + channel.closedLocally = true + + if channel.isReset == false and channel.sendQueue.len == 0: + await channel.conn.write(YamuxHeader.data(channel.id, 0, {Fin})) + await channel.actuallyClose() + +proc reset(channel: YamuxChannel, isLocal: bool = false) {.async.} = + if not channel.isReset: + trace "Reset channel" + channel.isReset = true + for (d, s, fut) in channel.sendQueue: + fut.fail(newLPStreamEOFError()) + channel.sendQueue = @[] + channel.recvQueue = @[] + channel.sendWindow = 0 + if not channel.closedLocally: + if isLocal: + try: await channel.conn.write(YamuxHeader.data(channel.id, 0, {Rst})) + except LPStreamEOFError as exc: discard + await channel.close() + if not channel.closedRemotely.done(): + await channel.remoteClosed() + channel.receivedData.fire() + if not isLocal: + # If we reset locally, we want to flush up to a maximum of recvWindow + # bytes. We use the recvWindow in the proc cleanupChann. + channel.recvWindow = 0 + +proc updateRecvWindow(channel: YamuxChannel) {.async.} = + let inWindow = channel.recvWindow + channel.recvQueue.len + if inWindow > channel.maxRecvWindow div 2: + return + + let delta = channel.maxRecvWindow - inWindow + channel.recvWindow.inc(delta) + await channel.conn.write(YamuxHeader.windowUpdate( + channel.id, + delta.uint32 + )) + trace "increasing the recvWindow", delta + +method readOnce*( + channel: YamuxChannel, + pbytes: pointer, + nbytes: int): + Future[int] {.async.} = + + if channel.returnedEof: raise newLPStreamEOFError() + if channel.recvQueue.len == 0: + channel.receivedData.clear() + await channel.closedRemotely or channel.receivedData.wait() + if channel.closedRemotely.done() and channel.recvQueue.len == 0: + channel.returnedEof = true + return 0 + + let toRead = min(channel.recvQueue.len, nbytes) + + var p = cast[ptr UncheckedArray[byte]](pbytes) + toOpenArray(p, 0, nbytes - 1)[0.. channel.maxRecvWindow: + await channel.reset(true) + break + + let + bytesAvailable = channel.sendQueueBytes() + toSend = min(channel.sendWindow, bytesAvailable) + var + sendBuffer = newSeqUninitialized[byte](toSend + 12) + header = YamuxHeader.data(channel.id, toSend.uint32) + inBuffer = 0 + + if toSend >= bytesAvailable and channel.closedLocally: + trace "last buffer we'll sent on this channel", toSend, bytesAvailable + header.flags.incl({Fin}) + + sendBuffer[0..<12] = header.encode() + + var futures: seq[Future[void]] + while inBuffer < toSend: + let (data, sent, fut) = channel.sendQueue[0] + let bufferToSend = min(data.len - sent, toSend - inBuffer) + sendBuffer.toOpenArray(12, 12 + toSend - 1)[inBuffer..<(inBuffer+bufferToSend)] = + channel.sendQueue[0].data.toOpenArray(sent, sent + bufferToSend - 1) + channel.sendQueue[0].sent.inc(bufferToSend) + if channel.sendQueue[0].sent >= data.len: + futures.add(fut) + channel.sendQueue.delete(0) + inBuffer.inc(bufferToSend) + + trace "build send buffer", h = $header, msg=string.fromBytes(sendBuffer[12..^1]) + channel.sendWindow.dec(toSend) + try: await channel.conn.write(sendBuffer) + except LPStreamEOFError as exc: + for fut in futures.items(): + fut.fail(exc) + await channel.reset() + break + for fut in futures.items(): + fut.complete() + channel.activity = true + +method write*(channel: YamuxChannel, msg: seq[byte]): Future[void] = + result = newFuture[void]("Yamux Send") + if channel.closedLocally or channel.isReset: + result.fail(newLPStreamEOFError()) + return result + if msg.len == 0: + result.complete() + return result + channel.sendQueue.add((msg, 0, result)) + asyncSpawn channel.trySend() + +proc open*(channel: YamuxChannel) {.async, gcsafe.} = + if channel.opened: + trace "Try to open channel twice" + return + channel.opened = true + await channel.conn.write(YamuxHeader.data(channel.id, 0, {if channel.isSrc: Syn else: Ack})) + +type + Yamux* = ref object of Muxer + channels: Table[uint32, YamuxChannel] + flushed: Table[uint32, int] + currentId: uint32 + isClosed: bool + +proc cleanupChann(m: Yamux, channel: YamuxChannel) {.async.} = + await channel.join() + m.channels.del(channel.id) + if channel.isReset and channel.recvWindow > 0: + m.flushed[channel.id] = channel.recvWindow + +proc createStream(m: Yamux, id: uint32, isSrc: bool): YamuxChannel = + result = YamuxChannel( + id: id, + maxRecvWindow: DefaultWindowSize, + recvWindow: DefaultWindowSize, + sendWindow: DefaultWindowSize, + isSrc: isSrc, + conn: m.connection, + receivedData: newAsyncEvent(), + closedRemotely: newFuture[void]() + ) + result.initStream() + result.peerId = m.connection.peerId + result.observedAddr = m.connection.observedAddr + result.transportDir = m.connection.transportDir + when defined(libp2p_agents_metrics): + result.shortAgent = m.connection.shortAgent + m.channels[id] = result + asyncSpawn m.cleanupChann(result) + trace "created channel", id, pid=m.connection.peerId + +method close*(m: Yamux) {.async.} = + if m.isClosed == true: + trace "Already closed" + return + m.isClosed = true + + trace "Closing yamux" + for channel in m.channels.values: + await channel.reset() + await m.connection.write(YamuxHeader.goAway(NormalTermination)) + await m.connection.close() + trace "Closed yamux" + +proc handleStream(m: Yamux, channel: YamuxChannel) {.async.} = + ## call the muxer stream handler for this channel + ## + try: + await m.streamHandler(channel) + trace "finished handling stream" + doAssert(channel.isClosed, "connection not closed by handler!") + except CatchableError as exc: + trace "Exception in yamux stream handler", msg = exc.msg + await channel.reset() + +method handle*(m: Yamux) {.async, gcsafe.} = + trace "Starting yamux handler", pid=m.connection.peerId + try: + while not m.connection.atEof: + trace "waiting for header" + let header = await m.connection.readHeader() + trace "got message", h = $header + + case header.msgType: + of Ping: + if MsgFlags.Syn in header.flags: + await m.connection.write(YamuxHeader.ping(MsgFlags.Ack, header.length)) + of GoAway: + var status: GoAwayStatus + if status.checkedEnumAssign(header.length): trace "Received go away", status + else: trace "Received unexpected error go away" + break + of Data, WindowUpdate: + if MsgFlags.Syn in header.flags: + if header.streamId in m.channels: + debug "Trying to create an existing channel, skipping", id=header.streamId + else: + if header.streamId in m.flushed: + m.flushed.del(header.streamId) + if header.streamId mod 2 == m.currentId mod 2: + raise newException(YamuxError, "Peer used our reserved stream id") + let newStream = m.createStream(header.streamId, false) + await newStream.open() + asyncSpawn m.handleStream(newStream) + elif header.streamId notin m.channels: + if header.streamId notin m.flushed: + raise newException(YamuxError, "Unknown stream ID: " & $header.streamId) + elif header.msgType == Data: + # Flush the data + m.flushed[header.streamId].dec(int(header.length)) + if m.flushed[header.streamId] < 0: + raise newException(YamuxError, "Peer exhausted the recvWindow after reset") + var buffer = newSeqUninitialized[byte](header.length) + await m.connection.readExactly(addr buffer[0], int(header.length)) + continue + + let channel = m.channels[header.streamId] + + if header.msgType == WindowUpdate: + channel.sendWindow += int(header.length) + await channel.trySend() + else: + if header.length.int > channel.recvWindow.int: + # check before allocating the buffer + raise newException(YamuxError, "Peer exhausted the recvWindow") + + if header.length > 0: + var buffer = newSeqUninitialized[byte](header.length) + await m.connection.readExactly(addr buffer[0], int(header.length)) + trace "Msg Rcv", msg=string.fromBytes(buffer) + await channel.gotDataFromRemote(buffer) + + if MsgFlags.Fin in header.flags: + trace "remote closed channel" + await channel.remoteClosed() + if MsgFlags.Rst in header.flags: + trace "remote reset channel" + await channel.reset() + except LPStreamEOFError as exc: + trace "Stream EOF", msg = exc.msg + except YamuxError as exc: + trace "Closing yamux connection", error=exc.msg + await m.connection.write(YamuxHeader.goAway(ProtocolError)) + finally: + await m.close() + trace "Stopped yamux handler" + +method newStream*( + m: Yamux, + name: string = "", + lazy: bool = false): Future[Connection] {.async, gcsafe.} = + + let stream = m.createStream(m.currentId, true) + m.currentId += 2 + if not lazy: + await stream.open() + return stream + +proc new*(T: type[Yamux], conn: Connection): T = + T( + connection: conn, + currentId: if conn.dir == Out: 1 else: 2 + ) diff --git a/libp2p/protocols/relay.nim b/libp2p/protocols/relay.nim index 0a08f98..88c5ac9 100644 --- a/libp2p/protocols/relay.nim +++ b/libp2p/protocols/relay.nim @@ -466,6 +466,7 @@ proc dial*(self: RelayTransport, ma: MultiAddress): Future[Connection] {.async, trace "Dial", relayPeerId, relayAddrs, dstPeerId let conn = await self.relay.switch.dial(relayPeerId, @[ relayAddrs ], RelayCodec) + conn.dir = Direction.Out result = await self.relay.dialPeer(conn, dstPeerId, @[]) method dial*( diff --git a/tests/commoninterop.nim b/tests/commoninterop.nim new file mode 100644 index 0000000..652cab2 --- /dev/null +++ b/tests/commoninterop.nim @@ -0,0 +1,596 @@ +import options, tables +import chronos, chronicles, stew/byteutils +import helpers +import ../libp2p +import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto, protocols/relay ] + +type + SwitchCreator = proc( + isRelay: bool = false, + ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + prov: TransportProvider = proc(upgr: Upgrade): Transport = TcpTransport.new({}, upgr)): + Switch {.gcsafe, raises: [Defect, LPError].} + DaemonPeerInfo = daemonapi.PeerInfo + +proc writeLp(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} = + ## write lenght prefixed + var buf = initVBuffer() + buf.writeSeq(msg) + buf.finish() + result = s.write(buf.buffer) + +proc readLp(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} = + ## read length prefixed msg + var + size: uint + length: int + res: VarintResult[void] + result = newSeq[byte](10) + + for i in 0.. 0.uint: + await s.readExactly(addr result[0], int(size)) + +proc testPubSubDaemonPublish( + gossip: bool = false, + count: int = 1, + swCreator: SwitchCreator) {.async.} = + var pubsubData = "TEST MESSAGE" + var testTopic = "test-topic" + var msgData = pubsubData.toBytes() + + var flags = {PSFloodSub} + if gossip: + flags = {PSGossipSub} + + let daemonNode = await newDaemonApi(flags) + let daemonPeer = await daemonNode.identity() + let nativeNode = swCreator() + + let pubsub = if gossip: + GossipSub.init( + switch = nativeNode).PubSub + else: + FloodSub.init( + switch = nativeNode).PubSub + + nativeNode.mount(pubsub) + + await nativeNode.start() + await pubsub.start() + let nativePeer = nativeNode.peerInfo + + var finished = false + var times = 0 + proc nativeHandler(topic: string, data: seq[byte]) {.async.} = + let smsg = string.fromBytes(data) + check smsg == pubsubData + times.inc() + if times >= count and not finished: + finished = true + + await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) + + await sleepAsync(1.seconds) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + + proc pubsubHandler(api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.async.} = + result = true # don't cancel subscription + + asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) + pubsub.subscribe(testTopic, nativeHandler) + await sleepAsync(5.seconds) + + proc publisher() {.async.} = + while not finished: + await daemonNode.pubsubPublish(testTopic, msgData) + await sleepAsync(500.millis) + + await wait(publisher(), 5.minutes) # should be plenty of time + + await nativeNode.stop() + await pubsub.stop() + await daemonNode.close() + +proc testPubSubNodePublish( + gossip: bool = false, + count: int = 1, + swCreator: SwitchCreator) {.async.} = + var pubsubData = "TEST MESSAGE" + var testTopic = "test-topic" + var msgData = pubsubData.toBytes() + + var flags = {PSFloodSub} + if gossip: + flags = {PSGossipSub} + + let daemonNode = await newDaemonApi(flags) + let daemonPeer = await daemonNode.identity() + let nativeNode = swCreator() + + let pubsub = if gossip: + GossipSub.init( + switch = nativeNode).PubSub + else: + FloodSub.init( + switch = nativeNode).PubSub + + nativeNode.mount(pubsub) + + await nativeNode.start() + await pubsub.start() + let nativePeer = nativeNode.peerInfo + + await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) + + await sleepAsync(1.seconds) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + + var times = 0 + var finished = false + proc pubsubHandler(api: DaemonAPI, + ticket: PubsubTicket, + message: PubSubMessage): Future[bool] {.async.} = + let smsg = string.fromBytes(message.data) + check smsg == pubsubData + times.inc() + if times >= count and not finished: + finished = true + result = true # don't cancel subscription + + discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) + proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard + pubsub.subscribe(testTopic, nativeHandler) + await sleepAsync(5.seconds) + + proc publisher() {.async.} = + while not finished: + discard await pubsub.publish(testTopic, msgData) + await sleepAsync(500.millis) + + await wait(publisher(), 5.minutes) # should be plenty of time + + check finished + await nativeNode.stop() + await pubsub.stop() + await daemonNode.close() + +proc commonInteropTests*(name: string, swCreator: SwitchCreator) = + suite "Interop using " & name: + # TODO: chronos transports are leaking, + # but those are tracked for both the daemon + # and libp2p, so not sure which one it is, + # need to investigate more + # teardown: + # checkTrackers() + + # TODO: this test is failing sometimes on windows + # For some reason we receive EOF before test 4 sometimes + + asyncTest "native -> daemon multiple reads and writes": + var protos = @["/test-stream"] + + let nativeNode = swCreator() + + await nativeNode.start() + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() + + var testFuture = newFuture[void]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + check string.fromBytes(await stream.transp.readLp()) == "test 1" + discard await stream.transp.writeLp("test 2") + check string.fromBytes(await stream.transp.readLp()) == "test 3" + discard await stream.transp.writeLp("test 4") + testFuture.complete() + + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) + await conn.writeLp("test 1") + check "test 2" == string.fromBytes((await conn.readLp(1024))) + + await conn.writeLp("test 3") + check "test 4" == string.fromBytes((await conn.readLp(1024))) + + await wait(testFuture, 10.secs) + + await nativeNode.stop() + await daemonNode.close() + + await sleepAsync(1.seconds) + + asyncTest "native -> daemon connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + # We are preparing expect string, which should be prefixed with varint + # length and do not have `\r\n` suffix, because we going to use + # readLine(). + var buffer = initVBuffer() + buffer.writeSeq(test & "\r\n") + buffer.finish() + var expect = newString(len(buffer) - 2) + copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) + + let nativeNode = swCreator() + + await nativeNode.start() + + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() + + var testFuture = newFuture[string]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + # We should perform `readLp()` instead of `readLine()`. `readLine()` + # here reads actually length prefixed string. + var line = await stream.transp.readLine() + check line == expect + testFuture.complete(line) + await stream.close() + + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) + await conn.writeLp(test & "\r\n") + check expect == (await wait(testFuture, 10.secs)) + + await conn.close() + await nativeNode.stop() + await daemonNode.close() + + asyncTest "daemon -> native connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + + var testFuture = newFuture[string]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + var line = string.fromBytes(await conn.readLp(1024)) + check line == test + testFuture.complete(line) + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let nativeNode = swCreator() + + nativeNode.mount(proto) + + await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + discard await stream.transp.writeLp(test) + + check test == (await wait(testFuture, 10.secs)) + + await stream.close() + await nativeNode.stop() + await daemonNode.close() + await sleepAsync(1.seconds) + + asyncTest "native -> daemon websocket connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + + var testFuture = newFuture[string]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + var line = string.fromBytes(await conn.readLp(1024)) + check line == test + testFuture.complete(line) + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() + + let nativeNode = swCreator( + ma = wsAddress, + prov = proc (upgr: Upgrade): Transport = WsTransport.new(upgr) + ) + + nativeNode.mount(proto) + + await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + discard await stream.transp.writeLp(test) + + check test == (await wait(testFuture, 10.secs)) + + await stream.close() + await nativeNode.stop() + await daemonNode.close() + await sleepAsync(1.seconds) + + asyncTest "daemon -> native websocket connection": + var protos = @["/test-stream"] + var test = "TEST STRING" + # We are preparing expect string, which should be prefixed with varint + # length and do not have `\r\n` suffix, because we going to use + # readLine(). + var buffer = initVBuffer() + buffer.writeSeq(test & "\r\n") + buffer.finish() + var expect = newString(len(buffer) - 2) + copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) + + let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() + let nativeNode = SwitchBuilder + .new() + .withAddress(wsAddress) + .withRng(crypto.newRng()) + .withMplex() + .withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr)) + .withNoise() + .build() + + await nativeNode.start() + + let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) + let daemonPeer = await daemonNode.identity() + + var testFuture = newFuture[string]("test.future") + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + # We should perform `readLp()` instead of `readLine()`. `readLine()` + # here reads actually length prefixed string. + var line = await stream.transp.readLine() + check line == expect + testFuture.complete(line) + await stream.close() + + await daemonNode.addHandler(protos, daemonHandler) + let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) + await conn.writeLp(test & "\r\n") + check expect == (await wait(testFuture, 10.secs)) + + await conn.close() + await nativeNode.stop() + await daemonNode.close() + + asyncTest "daemon -> multiple reads and writes": + var protos = @["/test-stream"] + + var testFuture = newFuture[void]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + check "test 1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test 2".toBytes()) + + check "test 3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("test 4".toBytes()) + + testFuture.complete() + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let nativeNode = swCreator() + + nativeNode.mount(proto) + + await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + + asyncDiscard stream.transp.writeLp("test 1") + check "test 2" == string.fromBytes(await stream.transp.readLp()) + + asyncDiscard stream.transp.writeLp("test 3") + check "test 4" == string.fromBytes(await stream.transp.readLp()) + + await wait(testFuture, 10.secs) + + await stream.close() + await nativeNode.stop() + await daemonNode.close() + + asyncTest "read write multiple": + var protos = @["/test-stream"] + var test = "TEST STRING" + + var count = 0 + var testFuture = newFuture[int]("test.future") + proc nativeHandler(conn: Connection, proto: string) {.async.} = + while count < 10: + var line = string.fromBytes(await conn.readLp(1024)) + check line == test + await conn.writeLp(test.toBytes()) + count.inc() + + testFuture.complete(count) + await conn.close() + + # custom proto + var proto = new LPProtocol + proto.handler = nativeHandler + proto.codec = protos[0] # codec + + let nativeNode = swCreator() + + nativeNode.mount(proto) + + await nativeNode.start() + let nativePeer = nativeNode.peerInfo + + let daemonNode = await newDaemonApi() + await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) + var stream = await daemonNode.openStream(nativePeer.peerId, protos) + + var count2 = 0 + while count2 < 10: + discard await stream.transp.writeLp(test) + let line = await stream.transp.readLp() + check test == string.fromBytes(line) + inc(count2) + + check 10 == (await wait(testFuture, 1.minutes)) + await stream.close() + await nativeNode.stop() + await daemonNode.close() + + asyncTest "floodsub: daemon publish one": + await testPubSubDaemonPublish(swCreator = swCreator) + + asyncTest "floodsub: daemon publish many": + await testPubSubDaemonPublish(count = 10, swCreator = swCreator) + + asyncTest "gossipsub: daemon publish one": + await testPubSubDaemonPublish(gossip = true, swCreator = swCreator) + + asyncTest "gossipsub: daemon publish many": + await testPubSubDaemonPublish(gossip = true, count = 10, swCreator = swCreator) + + asyncTest "floodsub: node publish one": + await testPubSubNodePublish(swCreator = swCreator) + + asyncTest "floodsub: node publish many": + await testPubSubNodePublish(count = 10, swCreator = swCreator) + + asyncTest "gossipsub: node publish one": + await testPubSubNodePublish(gossip = true, swCreator = swCreator) + + asyncTest "gossipsub: node publish many": + await testPubSubNodePublish(gossip = true, count = 10, swCreator = swCreator) + +proc relayInteropTests*(name: string, relayCreator: SwitchCreator) = + suite "Interop relay using " & name: + asyncTest "NativeSrc -> NativeRelay -> DaemonDst": + let closeBlocker = newFuture[void]() + # TODO: This Future blocks the daemonHandler after sending the last message. + # It exists because there's a strange behavior where stream.close sends + # a Rst instead of Fin. We should investigate this at some point. + proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = + check "line1" == string.fromBytes(await stream.transp.readLp()) + discard await stream.transp.writeLp("line2") + check "line3" == string.fromBytes(await stream.transp.readLp()) + discard await stream.transp.writeLp("line4") + await closeBlocker + await stream.close() + let + maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + src = relayCreator(false, maSrc) + rel = relayCreator(true, maRel) + + await src.start() + await rel.start() + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() + let maStr = $rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & $daemonPeer.peer + let maddr = MultiAddress.init(maStr).tryGet() + await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await rel.connect(daemonPeer.peer, daemonPeer.addresses) + + await daemonNode.addHandler(@[ "/testCustom" ], daemonHandler) + + let conn = await src.dial(daemonPeer.peer, @[ maddr ], @[ "/testCustom" ]) + + await conn.writeLp("line1") + check string.fromBytes(await conn.readLp(1024)) == "line2" + + await conn.writeLp("line3") + check string.fromBytes(await conn.readLp(1024)) == "line4" + + closeBlocker.complete() + await allFutures(src.stop(), rel.stop()) + await daemonNode.close() + + asyncTest "DaemonSrc -> NativeRelay -> NativeDst": + proc customHandler(conn: Connection, proto: string) {.async.} = + check "line1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("line2") + check "line3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("line4") + await conn.close() + let + protos = @[ "/customProto", RelayCodec ] + var + customProto = new LPProtocol + customProto.handler = customHandler + customProto.codec = protos[0] + let + maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + rel = relayCreator(true, maRel) + dst = relayCreator(false, maDst) + + dst.mount(customProto) + await rel.start() + await dst.start() + let daemonNode = await newDaemonApi() + let daemonPeer = await daemonNode.identity() + let maStr = $rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & $dst.peerInfo.peerId + let maddr = MultiAddress.init(maStr).tryGet() + await daemonNode.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) + await rel.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) + await daemonNode.connect(dst.peerInfo.peerId, @[ maddr ]) + var stream = await daemonNode.openStream(dst.peerInfo.peerId, protos) + + discard await stream.transp.writeLp("line1") + check string.fromBytes(await stream.transp.readLp()) == "line2" + discard await stream.transp.writeLp("line3") + check string.fromBytes(await stream.transp.readLp()) == "line4" + + await allFutures(dst.stop(), rel.stop()) + await daemonNode.close() + + asyncTest "NativeSrc -> DaemonRelay -> NativeDst": + proc customHandler(conn: Connection, proto: string) {.async.} = + check "line1" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("line2") + check "line3" == string.fromBytes(await conn.readLp(1024)) + await conn.writeLp("line4") + await conn.close() + let + protos = @[ "/customProto", RelayCodec ] + var + customProto = new LPProtocol + customProto.handler = customHandler + customProto.codec = protos[0] + let + maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() + src = relayCreator(false, maSrc) + dst = relayCreator(false, maDst) + + dst.mount(customProto) + await src.start() + await dst.start() + let daemonNode = await newDaemonApi({RelayHop}) + let daemonPeer = await daemonNode.identity() + let maStr = $daemonPeer.addresses[0] & "/p2p/" & $daemonPeer.peer & "/p2p-circuit/p2p/" & $dst.peerInfo.peerId + let maddr = MultiAddress.init(maStr).tryGet() + await src.connect(daemonPeer.peer, daemonPeer.addresses) + await daemonNode.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) + let conn = await src.dial(dst.peerInfo.peerId, @[ maddr ], protos[0]) + + await conn.writeLp("line1") + check string.fromBytes(await conn.readLp(1024)) == "line2" + + await conn.writeLp("line3") + check string.fromBytes(await conn.readLp(1024)) == "line4" + + await allFutures(src.stop(), dst.stop()) + await daemonNode.close() diff --git a/tests/helpers.nim b/tests/helpers.nim index 82bfa12..7be83da 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -87,6 +87,22 @@ proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T = testBufferStream.initStream() testBufferStream +proc bridgedConnections*: (Connection, Connection) = + let + connA = TestBufferStream() + connB = TestBufferStream() + connA.dir = Direction.Out + connB.dir = Direction.In + connA.initStream() + connB.initStream() + connA.writeHandler = proc(data: seq[byte]) {.async.} = + await connB.pushData(data) + + connB.writeHandler = proc(data: seq[byte]) {.async.} = + await connA.pushData(data) + return (connA, connB) + + proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect].} ): Future[bool] {.async, gcsafe.} = {.gcsafe.}: let start = Moment.now() diff --git a/tests/testinterop.nim b/tests/testinterop.nim index 9903650..9f5e5be 100644 --- a/tests/testinterop.nim +++ b/tests/testinterop.nim @@ -1,624 +1,55 @@ -import options, tables, stublogger -import chronos, chronicles, stew/byteutils -import helpers +import helpers, commoninterop import ../libp2p -import ../libp2p/[daemon/daemonapi, varint, transports/wstransport, crypto/crypto, protocols/relay ] - -type - DaemonPeerInfo = daemonapi.PeerInfo - -proc writeLp*(s: StreamTransport, msg: string | seq[byte]): Future[int] {.gcsafe.} = - ## write lenght prefixed - var buf = initVBuffer() - buf.writeSeq(msg) - buf.finish() - result = s.write(buf.buffer) - -proc readLp*(s: StreamTransport): Future[seq[byte]] {.async, gcsafe.} = - ## read length prefixed msg - var - size: uint - length: int - res: VarintResult[void] - result = newSeq[byte](10) - - for i in 0.. 0.uint: - await s.readExactly(addr result[0], int(size)) - -proc testPubSubDaemonPublish(gossip: bool = false, count: int = 1) {.async.} = - var pubsubData = "TEST MESSAGE" - var testTopic = "test-topic" - var msgData = pubsubData.toBytes() - - var flags = {PSFloodSub} - if gossip: - flags = {PSGossipSub} - - let daemonNode = await newDaemonApi(flags) - let daemonPeer = await daemonNode.identity() - let nativeNode = newStandardSwitch(outTimeout = 5.minutes) - - let pubsub = if gossip: - GossipSub.init( - switch = nativeNode).PubSub - else: - FloodSub.init( - switch = nativeNode).PubSub - - nativeNode.mount(pubsub) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - var finished = false - var times = 0 - proc nativeHandler(topic: string, data: seq[byte]) {.async.} = - let smsg = string.fromBytes(data) - check smsg == pubsubData - times.inc() - if times >= count and not finished: - finished = true - - await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) - - await sleepAsync(1.seconds) - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - - proc pubsubHandler(api: DaemonAPI, - ticket: PubsubTicket, - message: PubSubMessage): Future[bool] {.async.} = - result = true # don't cancel subscription - - asyncDiscard daemonNode.pubsubSubscribe(testTopic, pubsubHandler) - pubsub.subscribe(testTopic, nativeHandler) - await sleepAsync(5.seconds) - - proc publisher() {.async.} = - while not finished: - await daemonNode.pubsubPublish(testTopic, msgData) - await sleepAsync(500.millis) - - await wait(publisher(), 5.minutes) # should be plenty of time - - await nativeNode.stop() - await daemonNode.close() - -proc testPubSubNodePublish(gossip: bool = false, count: int = 1) {.async.} = - var pubsubData = "TEST MESSAGE" - var testTopic = "test-topic" - var msgData = pubsubData.toBytes() - - var flags = {PSFloodSub} - if gossip: - flags = {PSGossipSub} - - let daemonNode = await newDaemonApi(flags) - let daemonPeer = await daemonNode.identity() - let nativeNode = newStandardSwitch(outTimeout = 5.minutes) - - let pubsub = if gossip: - GossipSub.init( - switch = nativeNode).PubSub - else: - FloodSub.init( - switch = nativeNode).PubSub - - nativeNode.mount(pubsub) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - await nativeNode.connect(daemonPeer.peer, daemonPeer.addresses) - - await sleepAsync(1.seconds) - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - - var times = 0 - var finished = false - proc pubsubHandler(api: DaemonAPI, - ticket: PubsubTicket, - message: PubSubMessage): Future[bool] {.async.} = - let smsg = string.fromBytes(message.data) - check smsg == pubsubData - times.inc() - if times >= count and not finished: - finished = true - result = true # don't cancel subscription - - discard await daemonNode.pubsubSubscribe(testTopic, pubsubHandler) - proc nativeHandler(topic: string, data: seq[byte]) {.async.} = discard - pubsub.subscribe(testTopic, nativeHandler) - await sleepAsync(5.seconds) - - proc publisher() {.async.} = - while not finished: - discard await pubsub.publish(testTopic, msgData) - await sleepAsync(500.millis) - - await wait(publisher(), 5.minutes) # should be plenty of time - - check finished - await nativeNode.stop() - await daemonNode.close() - -suite "Interop": - # TODO: chronos transports are leaking, - # but those are tracked for both the daemon - # and libp2p, so not sure which one it is, - # need to investigate more - # teardown: - # checkTrackers() - - # TODO: this test is failing sometimes on windows - # For some reason we receive EOF before test 4 sometimes - asyncTest "native -> daemon multiple reads and writes": - var protos = @["/test-stream"] - - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], - outTimeout = 5.minutes) - - await nativeNode.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - - var testFuture = newFuture[void]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = - check string.fromBytes(await stream.transp.readLp()) == "test 1" - discard await stream.transp.writeLp("test 2") - check string.fromBytes(await stream.transp.readLp()) == "test 3" - discard await stream.transp.writeLp("test 4") - testFuture.complete() - - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) - await conn.writeLp("test 1") - check "test 2" == string.fromBytes((await conn.readLp(1024))) - - await conn.writeLp("test 3") - check "test 4" == string.fromBytes((await conn.readLp(1024))) - - await wait(testFuture, 10.secs) - - await nativeNode.stop() - await daemonNode.close() - - await sleepAsync(1.seconds) - - asyncTest "native -> daemon connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - # We are preparing expect string, which should be prefixed with varint - # length and do not have `\r\n` suffix, because we going to use - # readLine(). - var buffer = initVBuffer() - buffer.writeSeq(test & "\r\n") - buffer.finish() - var expect = newString(len(buffer) - 2) - copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) - - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], - outTimeout = 5.minutes) - - await nativeNode.start() - - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - - var testFuture = newFuture[string]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = - # We should perform `readLp()` instead of `readLine()`. `readLine()` - # here reads actually length prefixed string. - var line = await stream.transp.readLine() - check line == expect - testFuture.complete(line) - await stream.close() - - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) - await conn.writeLp(test & "\r\n") - check expect == (await wait(testFuture, 10.secs)) - - await conn.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "daemon -> native connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - - var testFuture = newFuture[string]("test.future") - proc nativeHandler(conn: Connection, proto: string) {.async.} = - var line = string.fromBytes(await conn.readLp(1024)) - check line == test - testFuture.complete(line) - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - discard await stream.transp.writeLp(test) - - check test == (await wait(testFuture, 10.secs)) - - await stream.close() - await nativeNode.stop() - await daemonNode.close() - await sleepAsync(1.seconds) - - asyncTest "native -> daemon websocket connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - - var testFuture = newFuture[string]("test.future") - proc nativeHandler(conn: Connection, proto: string) {.async.} = - var line = string.fromBytes(await conn.readLp(1024)) - check line == test - testFuture.complete(line) - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() - - let nativeNode = SwitchBuilder - .new() - .withAddress(wsAddress) - .withRng(crypto.newRng()) - .withMplex() - .withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr)) - .withNoise() - .build() - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - discard await stream.transp.writeLp(test) - - check test == (await wait(testFuture, 10.secs)) - - await stream.close() - await nativeNode.stop() - await daemonNode.close() - await sleepAsync(1.seconds) - - asyncTest "daemon -> native websocket connection": - var protos = @["/test-stream"] - var test = "TEST STRING" - # We are preparing expect string, which should be prefixed with varint - # length and do not have `\r\n` suffix, because we going to use - # readLine(). - var buffer = initVBuffer() - buffer.writeSeq(test & "\r\n") - buffer.finish() - var expect = newString(len(buffer) - 2) - copyMem(addr expect[0], addr buffer.buffer[0], len(expect)) - - let wsAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0/ws").tryGet() - let nativeNode = SwitchBuilder - .new() - .withAddress(wsAddress) - .withRng(crypto.newRng()) - .withMplex() - .withTransport(proc (upgr: Upgrade): Transport = WsTransport.new(upgr)) - .withNoise() - .build() - - await nativeNode.start() - - let daemonNode = await newDaemonApi(hostAddresses = @[wsAddress]) - let daemonPeer = await daemonNode.identity() - - var testFuture = newFuture[string]("test.future") - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = - # We should perform `readLp()` instead of `readLine()`. `readLine()` - # here reads actually length prefixed string. - var line = await stream.transp.readLine() - check line == expect - testFuture.complete(line) - await stream.close() - - await daemonNode.addHandler(protos, daemonHandler) - let conn = await nativeNode.dial(daemonPeer.peer, daemonPeer.addresses, protos[0]) - await conn.writeLp(test & "\r\n") - check expect == (await wait(testFuture, 10.secs)) - - await conn.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "daemon -> multiple reads and writes": - var protos = @["/test-stream"] - - var testFuture = newFuture[void]("test.future") - proc nativeHandler(conn: Connection, proto: string) {.async.} = - check "test 1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test 2".toBytes()) - - check "test 3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("test 4".toBytes()) - - testFuture.complete() - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - - asyncDiscard stream.transp.writeLp("test 1") - check "test 2" == string.fromBytes(await stream.transp.readLp()) - - asyncDiscard stream.transp.writeLp("test 3") - check "test 4" == string.fromBytes(await stream.transp.readLp()) - - await wait(testFuture, 10.secs) - - await stream.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "read write multiple": - var protos = @["/test-stream"] - var test = "TEST STRING" - - var count = 0 - var testFuture = newFuture[int]("test.future") - proc nativeHandler(conn: Connection, proto: string) {.async.} = - while count < 10: - var line = string.fromBytes(await conn.readLp(1024)) - check line == test - await conn.writeLp(test.toBytes()) - count.inc() - - testFuture.complete(count) - await conn.close() - - # custom proto - var proto = new LPProtocol - proto.handler = nativeHandler - proto.codec = protos[0] # codec - - let nativeNode = newStandardSwitch( - secureManagers = [SecureProtocol.Noise], outTimeout = 5.minutes) - - nativeNode.mount(proto) - - await nativeNode.start() - let nativePeer = nativeNode.peerInfo - - let daemonNode = await newDaemonApi() - await daemonNode.connect(nativePeer.peerId, nativePeer.addrs) - var stream = await daemonNode.openStream(nativePeer.peerId, protos) - - var count2 = 0 - while count2 < 10: - discard await stream.transp.writeLp(test) - let line = await stream.transp.readLp() - check test == string.fromBytes(line) - inc(count2) - - check 10 == (await wait(testFuture, 1.minutes)) - await stream.close() - await nativeNode.stop() - await daemonNode.close() - - asyncTest "floodsub: daemon publish one": - await testPubSubDaemonPublish() - - asyncTest "floodsub: daemon publish many": - await testPubSubDaemonPublish(count = 10) - - asyncTest "gossipsub: daemon publish one": - await testPubSubDaemonPublish(gossip = true) - - asyncTest "gossipsub: daemon publish many": - await testPubSubDaemonPublish(gossip = true, count = 10) - - asyncTest "floodsub: node publish one": - await testPubSubNodePublish() - - asyncTest "floodsub: node publish many": - await testPubSubNodePublish(count = 10) - - asyncTest "gossipsub: node publish one": - await testPubSubNodePublish(gossip = true) - - asyncTest "gossipsub: node publish many": - await testPubSubNodePublish(gossip = true, count = 10) - - asyncTest "NativeSrc -> NativeRelay -> DaemonDst": - proc daemonHandler(api: DaemonAPI, stream: P2PStream) {.async.} = - check "line1" == string.fromBytes(await stream.transp.readLp()) - discard await stream.transp.writeLp("line2") - check "line3" == string.fromBytes(await stream.transp.readLp()) - discard await stream.transp.writeLp("line4") - await stream.close() - let - maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - src = SwitchBuilder.new() - .withRng(crypto.newRng()) - .withAddresses(@[ maSrc ]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRelayTransport(false) - .build() - rel = SwitchBuilder.new() - .withRng(crypto.newRng()) - .withAddresses(@[ maRel ]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRelayTransport(true) - .build() - - await src.start() - await rel.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - let maStr = $rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & $daemonPeer.peer - let maddr = MultiAddress.init(maStr).tryGet() - await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await rel.connect(daemonPeer.peer, daemonPeer.addresses) - - await daemonNode.addHandler(@[ "/testCustom" ], daemonHandler) - - let conn = await src.dial(daemonPeer.peer, @[ maddr ], @[ "/testCustom" ]) - - await conn.writeLp("line1") - check string.fromBytes(await conn.readLp(1024)) == "line2" - - await conn.writeLp("line3") - check string.fromBytes(await conn.readLp(1024)) == "line4" - - await allFutures(src.stop(), rel.stop()) - await daemonNode.close() - - asyncTest "DaemonSrc -> NativeRelay -> NativeDst": - proc customHandler(conn: Connection, proto: string) {.async.} = - check "line1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line2") - check "line3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line4") - await conn.close() - let - protos = @[ "/customProto", RelayCodec ] - var - customProto = new LPProtocol - customProto.handler = customHandler - customProto.codec = protos[0] - let - maRel = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - rel = SwitchBuilder.new() - .withRng(crypto.newRng()) - .withAddresses(@[ maRel ]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRelayTransport(true) - .build() - dst = SwitchBuilder.new() - .withRng(crypto.newRng()) - .withAddresses(@[ maDst ]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRelayTransport(false) - .build() - - dst.mount(customProto) - await rel.start() - await dst.start() - let daemonNode = await newDaemonApi() - let daemonPeer = await daemonNode.identity() - let maStr = $rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit/p2p/" & $dst.peerInfo.peerId - let maddr = MultiAddress.init(maStr).tryGet() - await daemonNode.connect(rel.peerInfo.peerId, rel.peerInfo.addrs) - await rel.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) - await daemonNode.connect(dst.peerInfo.peerId, @[ maddr ]) - var stream = await daemonNode.openStream(dst.peerInfo.peerId, protos) - - discard await stream.transp.writeLp("line1") - check string.fromBytes(await stream.transp.readLp()) == "line2" - discard await stream.transp.writeLp("line3") - check string.fromBytes(await stream.transp.readLp()) == "line4" - - await allFutures(dst.stop(), rel.stop()) - await daemonNode.close() - - asyncTest "NativeSrc -> DaemonRelay -> NativeDst": - proc customHandler(conn: Connection, proto: string) {.async.} = - check "line1" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line2") - check "line3" == string.fromBytes(await conn.readLp(1024)) - await conn.writeLp("line4") - await conn.close() - let - protos = @[ "/customProto", RelayCodec ] - var - customProto = new LPProtocol - customProto.handler = customHandler - customProto.codec = protos[0] - let - maSrc = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - maDst = MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet() - src = SwitchBuilder.new() - .withRng(crypto.newRng()) - .withAddresses(@[ maSrc ]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRelayTransport(false) - .build() - dst = SwitchBuilder.new() - .withRng(crypto.newRng()) - .withAddresses(@[ maDst ]) - .withTcpTransport() - .withMplex() - .withNoise() - .withRelayTransport(false) - .build() - - dst.mount(customProto) - await src.start() - await dst.start() - let daemonNode = await newDaemonApi({RelayHop}) - let daemonPeer = await daemonNode.identity() - let maStr = $daemonPeer.addresses[0] & "/p2p/" & $daemonPeer.peer & "/p2p-circuit/p2p/" & $dst.peerInfo.peerId - let maddr = MultiAddress.init(maStr).tryGet() - await src.connect(daemonPeer.peer, daemonPeer.addresses) - await daemonNode.connect(dst.peerInfo.peerId, dst.peerInfo.addrs) - let conn = await src.dial(dst.peerInfo.peerId, @[ maddr ], protos[0]) - - await conn.writeLp("line1") - check string.fromBytes(await conn.readLp(1024)) == "line2" - - await conn.writeLp("line3") - check string.fromBytes(await conn.readLp(1024)) == "line4" - - await allFutures(src.stop(), dst.stop()) - await daemonNode.close() +import ../libp2p/crypto/crypto + +proc switchMplexCreator( + isRelay: bool = false, + ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + prov: TransportProvider = proc(upgr: Upgrade): Transport = TcpTransport.new({}, upgr)): + Switch {.raises: [Defect, LPError].} = + + SwitchBuilder.new() + .withSignedPeerRecord(false) + .withMaxConnections(MaxConnections) + .withRng(crypto.newRng()) + .withAddresses(@[ ma ]) + .withMaxIn(-1) + .withMaxOut(-1) + .withTransport(prov) + .withMplex() + .withMaxConnsPerPeer(MaxConnectionsPerPeer) + .withPeerStore(capacity=1000) + .withNoise() + .withRelayTransport(isRelay) + .withNameResolver(nil) + .build() + +proc switchYamuxCreator( + isRelay: bool = false, + ma: MultiAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), + prov: TransportProvider = proc(upgr: Upgrade): Transport = TcpTransport.new({}, upgr)): + Switch {.raises: [Defect, LPError].} = + + SwitchBuilder.new() + .withSignedPeerRecord(false) + .withMaxConnections(MaxConnections) + .withRng(crypto.newRng()) + .withAddresses(@[ ma ]) + .withMaxIn(-1) + .withMaxOut(-1) + .withTransport(prov) + .withYamux() + .withMaxConnsPerPeer(MaxConnectionsPerPeer) + .withPeerStore(capacity=1000) + .withNoise() + .withRelayTransport(isRelay) + .withNameResolver(nil) + .build() + + +suite "Tests interop": + commonInteropTests("mplex", switchMplexCreator) + relayInteropTests("mplex", switchMplexCreator) + + commonInteropTests("yamux", switchYamuxCreator) + relayInteropTests("yamux", switchYamuxCreator) diff --git a/tests/testyamux.nim b/tests/testyamux.nim new file mode 100644 index 0000000..1d74555 --- /dev/null +++ b/tests/testyamux.nim @@ -0,0 +1,154 @@ +import sugar +import chronos +import + ../libp2p/[ + stream/connection, + muxers/yamux/yamux + ], + ./helpers + +suite "Yamux": + teardown: + checkTrackers() + + template mSetup {.inject.} = + #TODO in a template to avoid threadvar + let + (conna {.inject.}, connb {.inject.}) = bridgedConnections() + (yamuxa {.inject.}, yamuxb {.inject.}) = (Yamux.new(conna), Yamux.new(connb)) + (handlera, handlerb) = (yamuxa.handle(), yamuxb.handle()) + + defer: + await allFutures( + conna.close(), connb.close(), + yamuxa.close(), yamuxb.close(), + handlera, handlerb) + + suite "Basic": + asyncTest "Simple test": + mSetup() + + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + check (await conn.readLp(100)) == fromHex("1234") + await conn.writeLp(fromHex("5678")) + await conn.close() + + let streamA = await yamuxa.newStream() + await streamA.writeLp(fromHex("1234")) + check (await streamA.readLp(100)) == fromHex("5678") + await streamA.close() + + asyncTest "Continuing read after close": + mSetup() + let + readerBlocker = newFuture[void]() + handlerBlocker = newFuture[void]() + var numberOfRead = 0 + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + await readerBlocker + var buffer: array[25600, byte] + while (await conn.readOnce(addr buffer[0], 25600)) > 0: + numberOfRead.inc() + await conn.close() + handlerBlocker.complete() + + let streamA = await yamuxa.newStream() + await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block + await streamA.close() + readerBlocker.complete() + await handlerBlocker + check: numberOfRead == 10 + + suite "Window exhaustion": + asyncTest "Basic exhaustion blocking": + mSetup() + let readerBlocker = newFuture[void]() + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + await readerBlocker + var buffer: array[160000, byte] + discard await conn.readOnce(addr buffer[0], 160000) + await conn.close() + let streamA = await yamuxa.newStream() + await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block + + let secondWriter = streamA.write(newSeq[byte](20)) + await sleepAsync(10.milliseconds) + check: not secondWriter.finished() + + readerBlocker.complete() + await wait(secondWriter, 1.seconds) + + await streamA.close() + + asyncTest "Exhaustion doesn't block other channels": + mSetup() + let readerBlocker = newFuture[void]() + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + await readerBlocker + var buffer: array[160000, byte] + discard await conn.readOnce(addr buffer[0], 160000) + await conn.close() + let streamA = await yamuxa.newStream() + await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block + + let secondWriter = streamA.write(newSeq[byte](20)) + await sleepAsync(10.milliseconds) + + # Now that the secondWriter is stuck, create a second stream + # and exchange some data + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + check (await conn.readLp(100)) == fromHex("1234") + await conn.writeLp(fromHex("5678")) + await conn.close() + + let streamB = await yamuxa.newStream() + await streamB.writeLp(fromHex("1234")) + check (await streamB.readLp(100)) == fromHex("5678") + check: not secondWriter.finished() + readerBlocker.complete() + + await wait(secondWriter, 1.seconds) + await streamA.close() + await streamB.close() + + asyncTest "Can set custom window size": + mSetup() + + let writerBlocker = newFuture[void]() + var numberOfRead = 0 + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + YamuxChannel(conn).setMaxRecvWindow(20) + var buffer: array[256000, byte] + while (await conn.readOnce(addr buffer[0], 256000)) > 0: + numberOfRead.inc() + writerBlocker.complete() + await conn.close() + let streamA = await yamuxa.newStream() + # Need to exhaust initial window first + await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block + await streamA.write(newSeq[byte](142)) + await streamA.close() + + await writerBlocker + + # 1 for initial exhaustion + (142 / 20) = 9 + check numberOfRead == 9 + + asyncTest "Saturate until reset": + mSetup() + let writerBlocker = newFuture[void]() + yamuxb.streamHandler = proc(conn: Connection) {.async.} = + await writerBlocker + var buffer: array[256, byte] + check: (await conn.readOnce(addr buffer[0], 256)) == 0 + await conn.close() + + let streamA = await yamuxa.newStream() + await streamA.write(newSeq[byte](256000)) + let wrFut = collect(newSeq): + for _ in 0..3: + streamA.write(newSeq[byte](100000)) + for i in 0..3: + expect(LPStreamEOFError): await wrFut[i] + writerBlocker.complete() + await streamA.close()