diff --git a/libp2p/connection.nim b/libp2p/connection.nim index e2a2de6..c6d4026 100644 --- a/libp2p/connection.nim +++ b/libp2p/connection.nim @@ -28,22 +28,26 @@ type proc newInvalidVarintException*(): ref InvalidVarintException = newException(InvalidVarintException, "unable to prase varint") -proc newConnection*(stream: LPStream): Connection = +proc init*[T: Connection](self: var T, stream: LPStream) = ## create a new Connection for the specified async reader/writer - new result - result.stream = stream - result.closeEvent = newAsyncEvent() + new self + self.stream = stream + self.closeEvent = newAsyncEvent() # bind stream's close event to connection's close # to ensure correct close propagation - let this = result - if not isNil(result.stream.closeEvent): - result.stream.closeEvent.wait(). + let this = self + if not isNil(self.stream.closeEvent): + self.stream.closeEvent.wait(). addCallback do (udata: pointer): if not this.closed: trace "closing this connection because wrapped stream closed" asyncCheck this.close() +proc newConnection*(stream: LPStream): Connection = + ## create a new Connection for the specified async reader/writer + result.init(stream) + method read*(s: Connection, n = -1): Future[seq[byte]] {.gcsafe.} = s.stream.read(n) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 8ca0542..35af8ef 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -26,6 +26,8 @@ type name*: string conn*: Connection initiator*: bool + isLazy*: bool + isOpen*: bool isReset*: bool closedLocal*: bool closedRemote*: bool @@ -39,7 +41,8 @@ proc newChannel*(id: uint, conn: Connection, initiator: bool, name: string = "", - size: int = DefaultChannelSize): LPChannel = + size: int = DefaultChannelSize, + lazy: bool = false): LPChannel = new result result.id = id result.name = name @@ -49,6 +52,7 @@ proc newChannel*(id: uint, result.closeCode = if initiator: MessageType.CloseOut else: MessageType.CloseIn result.resetCode = if initiator: MessageType.ResetOut else: MessageType.ResetIn result.asyncLock = newAsyncLock() + result.isLazy = lazy let chan = result proc writeHandler(data: seq[byte]): Future[void] {.async.} = @@ -76,6 +80,7 @@ proc cleanUp*(s: LPChannel): Future[void] = result = procCall close(BufferStream(s)) proc open*(s: LPChannel): Future[void] = + s.isOpen = true s.conn.writeMsg(s.id, MessageType.New, s.name) method close*(s: LPChannel) {.async, gcsafe.} = @@ -142,19 +147,22 @@ method readUntil*(s: LPChannel, raise newLPStreamEOFError() result = procCall readOnce(BufferStream(s), pbytes, nbytes) -method write*(s: LPChannel, - pbytes: pointer, - nbytes: int): Future[void] = +template writePrefix: untyped = + if s.isLazy and not s.isOpen: + await s.open() if s.closedLocal or s.isReset: raise newLPStreamEOFError() + +method write*(s: LPChannel, + pbytes: pointer, + nbytes: int): Future[void] {.async.} = + writePrefix() result = procCall write(BufferStream(s), pbytes, nbytes) method write*(s: LPChannel, msg: string, msglen = -1) {.async.} = - if s.closedLocal or s.isReset: - raise newLPStreamEOFError() + writePrefix() result = procCall write(BufferStream(s), msg, msglen) method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} = - if s.closedLocal or s.isReset: - raise newLPStreamEOFError() + writePrefix() result = procCall write(BufferStream(s), msg, msglen) diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index c77f278..d7d24c4 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -41,12 +41,13 @@ proc getChannelList(m: Mplex, initiator: bool): var Table[uint, LPChannel] = proc newStreamInternal*(m: Mplex, initiator: bool = true, chanId: uint = 0, - name: string = ""): + name: string = "", + lazy: bool = false): Future[LPChannel] {.async, gcsafe.} = ## create new channel/stream let id = if initiator: m.currentId.inc(); m.currentId else: chanId trace "creating new channel", channelId = id, initiator = initiator - result = newChannel(id, m.connection, initiator, name) + result = newChannel(id, m.connection, initiator, name, lazy = lazy) m.getChannelList(initiator)[id] = result proc cleanupChann(m: Mplex, chann: LPChannel, initiator: bool) {.async, inline.} = @@ -141,11 +142,11 @@ proc newMplex*(conn: Connection, .addCallback do (udata: pointer): trace "connection closed, cleaning up mplex" asyncCheck m.close() - -method newStream*(m: Mplex, name: string = ""): Future[Connection] {.async, gcsafe.} = - let channel = await m.newStreamInternal() - # TODO: open the channel (this should be lazy) - await channel.open() + +method newStream*(m: Mplex, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = + let channel = await m.newStreamInternal(lazy = lazy) + if not lazy: + await channel.open() result = newConnection(channel) result.peerInfo = m.connection.peerInfo diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 331f559..1bdf7d5 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -32,7 +32,7 @@ type muxerHandler*: MuxerHandler # triggered every time there is a new muxed connection created # muxer interface -method newStream*(m: Muxer, name: string = ""): Future[Connection] {.base, async, gcsafe.} = discard +method newStream*(m: Muxer, name: string = "", lazy: bool = false): Future[Connection] {.base, async, gcsafe.} = discard method close*(m: Muxer) {.base, async, gcsafe.} = discard method handle*(m: Muxer): Future[void] {.base, async, gcsafe.} = discard diff --git a/tests/testmplex.nim b/tests/testmplex.nim index d0c0fd4..4153321 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -116,7 +116,7 @@ suite "Mplex": check: waitFor(testDecodeHeader()) == true - + test "e2e - read/write receiver": proc testNewStream(): Future[bool] {.async.} = let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") @@ -138,9 +138,42 @@ suite "Mplex": let conn = await transport2.dial(transport1.ma) let mplexDial = newMplex(conn) - let stream = await mplexDial.newStream() + let stream = await mplexDial.newStream() + let openState = cast[LPChannel](stream.stream).isOpen await stream.writeLp("Hello from stream!") await conn.close() + check openState # not lazy + result = true + + check: + waitFor(testNewStream()) == true + + test "e2e - read/write receiver lazy": + proc testNewStream(): Future[bool] {.async.} = + let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") + + proc connHandler(conn: Connection) {.async, gcsafe.} = + proc handleMplexListen(stream: Connection) {.async, gcsafe.} = + let msg = await stream.readLp() + check cast[string](msg) == "Hello from stream!" + await stream.close() + + let mplexListen = newMplex(conn) + mplexListen.streamHandler = handleMplexListen + discard mplexListen.handle() + + let transport1: TcpTransport = newTransport(TcpTransport) + discard await transport1.listen(ma, connHandler) + + let transport2: TcpTransport = newTransport(TcpTransport) + let conn = await transport2.dial(transport1.ma) + + let mplexDial = newMplex(conn) + let stream = await mplexDial.newStream("", true) + let openState = cast[LPChannel](stream.stream).isOpen + await stream.writeLp("Hello from stream!") + await conn.close() + check not openState # assert lazy result = true check: