From 2325692f55b5b91f415779650e62ef1b01c4af3a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 10 Aug 2020 16:17:11 -0600 Subject: [PATCH] Fix half closed (#324) * don't call `close` in `remoteClose` * make sure timeout are properly propagted * fix tests * adding remote close write test --- libp2p/muxers/mplex/lpchannel.nim | 7 +-- libp2p/protocols/secure/secure.nim | 6 ++- libp2p/stream/bufferstream.nim | 4 +- libp2p/stream/connection.nim | 13 +++-- tests/testmplex.nim | 85 ++++++++++++++++++++---------- 5 files changed, 77 insertions(+), 38 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 51d2613..27e653e 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -138,12 +138,9 @@ proc closeRemote*(s: LPChannel) {.async.} = trace "got EOF, closing channel" try: await s.drainBuffer() - s.isEof = true # set EOF immediately to prevent further reads - await s.close() # close local end - - # call to avoid leaks - await procCall BufferStream(s).close() # close parent bufferstream + # close parent bufferstream to prevent further reads + await procCall BufferStream(s).close() trace "channel closed on EOF" except CancelledError as exc: diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index ab92d9e..ad8d707 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -28,11 +28,13 @@ type proc init*[T: SecureConn](C: type T, conn: Connection, peerInfo: PeerInfo, - observedAddr: Multiaddress): T = + observedAddr: Multiaddress, + timeout: Duration = DefaultConnectionTimeout): T = result = C(stream: conn, peerInfo: peerInfo, observedAddr: observedAddr, - closeEvent: conn.closeEvent) + closeEvent: conn.closeEvent, + timeout: timeout) result.initStream() method initStream*(s: SecureConn) = diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 6f00580..00547d2 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -143,8 +143,10 @@ proc initBufferStream*(s: BufferStream, trace "created bufferstream", oid = $s.oid proc newBufferStream*(handler: WriteHandler = nil, - size: int = DefaultBufferSize): BufferStream = + size: int = DefaultBufferSize, + timeout: Duration = DefaultConnectionTimeout): BufferStream = new result + result.timeout = timeout result.initBufferStream(handler, size) proc popFirst*(s: BufferStream): byte = diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 55461fd..a5925d7 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -7,7 +7,7 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import hashes +import hashes, oids import chronicles, chronos, metrics import lpstream, ../multiaddress, @@ -20,7 +20,7 @@ logScope: const ConnectionTrackerName* = "libp2p.connection" - DefaultConnectionTimeout* = 1.minutes + DefaultConnectionTimeout* = 5.minutes type TimeoutHandler* = proc(): Future[void] {.gcsafe.} @@ -73,8 +73,15 @@ method initStream*(s: Connection) = procCall LPStream(s).initStream() s.closeEvent = newAsyncEvent() + if isNil(s.timeoutHandler): + s.timeoutHandler = proc() {.async.} = + await s.close() + + trace "timeout", timeout = $s.timeout.millis doAssert(isNil(s.timerTaskFut)) - s.timerTaskFut = s.timeoutMonitor() + # doAssert(s.timeout > 0.millis) + if s.timeout > 0.millis: + s.timerTaskFut = s.timeoutMonitor() inc getConnectionTracker().opened diff --git a/tests/testmplex.nim b/tests/testmplex.nim index fbcf843..6119dc3 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -135,18 +135,20 @@ suite "Mplex": let conn = newBufferStream( proc (data: seq[byte]) {.gcsafe, async.} = - discard + discard, + timeout = 5.minutes ) chann = LPChannel.init(1, conn, true) await chann.pushTo(("Hello!").toBytes) - let closeFut = chann.closeRemote() var data = newSeq[byte](6) - await chann.readExactly(addr data[0], 6) # this should work, since there is data in the buffer + await chann.readExactly(addr data[0], 3) + let closeFut = chann.closeRemote() # closing channel + let readFut = chann.readExactly(addr data[3], 3) + await all(closeFut, readFut) try: - await chann.readExactly(addr data[0], 6) # this should throw - await closeFut + await chann.readExactly(addr data[0], 6) # this should fail now except LPStreamEOFError: result = true finally: @@ -156,6 +158,29 @@ suite "Mplex": check: waitFor(testClosedForRead()) == true + test "half closed - channel should allow writting on remote close": + proc testClosedForRead(): Future[bool] {.async.} = + let + testData = "Hello!".toBytes + conn = newBufferStream( + proc (data: seq[byte]) {.gcsafe, async.} = + discard + , timeout = 5.minutes + ) + chann = LPChannel.init(1, conn, true) + + var data = newSeq[byte](6) + await chann.closeRemote() # closing channel + try: + await chann.writeLp(testData) + return true + finally: + await chann.close() + await conn.close() + + check: + waitFor(testClosedForRead()) == true + test "should not allow pushing data to channel when remote end closed": proc testResetWrite(): Future[bool] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard @@ -211,20 +236,20 @@ suite "Mplex": check: waitFor(testResetWrite()) == true - test "reset - channel should reset on timeout": - proc testResetWrite(): Future[bool] {.async.} = - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let - conn = newBufferStream(writeHandler) - chann = LPChannel.init( - 1, conn, true, timeout = 100.millis) + test "reset - channel should reset on timeout": + proc testResetWrite(): Future[bool] {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newBufferStream(writeHandler) + chann = LPChannel.init( + 1, conn, true, timeout = 100.millis) - await chann.closeEvent.wait() - await conn.close() - result = true + await chann.closeEvent.wait() + await conn.close() + result = true - check: - waitFor(testResetWrite()) + check: + waitFor(testResetWrite()) test "e2e - read/write receiver": proc testNewStream() {.async.} = @@ -318,17 +343,23 @@ suite "Mplex": bigseq.add(uint8(rand(uint('A')..uint('z')))) proc connHandler(conn: Connection) {.async, gcsafe.} = - let mplexListen = Mplex.init(conn) - mplexListen.streamHandler = proc(stream: Connection) - {.async, gcsafe.} = - let msg = await stream.readLp(MaxMsgSize) - check msg == bigseq - trace "Bigseq check passed!" - await stream.close() - listenJob.complete() + try: + let mplexListen = Mplex.init(conn) + mplexListen.streamHandler = proc(stream: Connection) + {.async, gcsafe.} = + let msg = await stream.readLp(MaxMsgSize) + check msg == bigseq + trace "Bigseq check passed!" + await stream.close() + listenJob.complete() - await mplexListen.handle() - await mplexListen.close() + await mplexListen.handle() + await sleepAsync(1.seconds) # give chronos some slack to process things + await mplexListen.close() + except CancelledError as exc: + raise exc + except CatchableError as exc: + check false let transport1: TcpTransport = TcpTransport.init() let listenFut = await transport1.listen(ma, connHandler)