From 49a12e619d1bc0688b5ef8b6ee6606a3ab8089b0 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Mon, 21 Sep 2020 19:48:19 +0200 Subject: [PATCH] channel close race and deadlock fixes (#368) * channel close race and deadlock fixes * remove send lock, write chunks in one go * push some of half-closed implementation to BufferStream * fix some hangs where LPChannel readers and writers would not always wake up * simplify lazy channels * fix close happening more than once in some orderings * reenable connection tracking tests * close channels first on mplex close such that consumers can read bytes A notable difference is that BufferedStream is no longer considered EOF until someone has actually read the EOF marker. * docs, simplification --- libp2p/muxers/mplex/coder.nim | 34 +++-- libp2p/muxers/mplex/lpchannel.nim | 212 +++++++++++----------------- libp2p/muxers/mplex/mplex.nim | 19 +-- libp2p/stream/bufferstream.nim | 51 ++++--- libp2p/stream/chronosstream.nim | 17 ++- libp2p/stream/connection.nim | 12 +- libp2p/stream/lpstream.nim | 19 ++- tests/helpers.nim | 10 +- tests/pubsub/testfloodsub.nim | 4 +- tests/pubsub/testgossipinternal.nim | 4 +- tests/pubsub/testgossipsub.nim | 4 +- tests/testbufferstream.nim | 44 +++--- tests/testconnmngr.nim | 4 +- tests/testidentify.nim | 4 +- tests/testmplex.nim | 86 ++++++++--- tests/testmultistream.nim | 4 +- tests/testnoise.nim | 4 +- tests/testswitch.nim | 22 +-- tests/testtransport.nim | 4 +- 19 files changed, 287 insertions(+), 271 deletions(-) diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index 4c82c7d06..4efee0910 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -51,33 +51,47 @@ proc readMsg*(conn: Connection): Future[Msg] {.async, gcsafe.} = if msgType.int > ord(MessageType.ResetOut): raise newInvalidMplexMsgType() - result = (header shr 3, MessageType(msgType), data) + return (header shr 3, MessageType(msgType), data) proc writeMsg*(conn: Connection, id: uint64, msgType: MessageType, data: seq[byte] = @[]) {.async, gcsafe.} = - trace "sending data over mplex", conn, - id, - msgType, - data = data.len + if conn.closed: + return # No point in trying to write to an already-closed connection + var left = data.len offset = 0 + buf = initVBuffer() + + # Split message into length-prefixed chunks while left > 0 or data.len == 0: let chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left - ## write length prefixed - var buf = initVBuffer() + buf.writePBVarint(id shl 3 or ord(msgType).uint64) buf.writeSeq(data.toOpenArray(offset, offset + chunkSize - 1)) - buf.finish() left = left - chunkSize offset = offset + chunkSize - await conn.write(buf.buffer) if data.len == 0: - return + break + + trace "writing mplex message", + conn, id, msgType, data = data.len, encoded = buf.buffer.len + + try: + # Write all chunks in a single write to avoid async races where a close + # message gets written before some of the chunks + await conn.write(buf.buffer) + trace "wrote mplex", conn, id, msgType + except CatchableError as exc: + # If the write to the underlying connection failed it should be closed so + # that the other channels are notified as well + trace "failed write", conn, id, msg = exc.msg + await conn.close() + raise exc proc writeMsg*(conn: Connection, id: uint64, diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index 8b3729359..1a4a26b32 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -27,19 +27,9 @@ logScope: ## | Read | Yes (until EOF) | No ## | Write | No | Yes ## - -# TODO: this is one place where we need to use -# a proper state machine, but I've opted out of -# it for now for two reasons: -# -# 1) we don't have that many states to manage -# 2) I'm not sure if adding the state machine -# would have simplified or complicated the code -# -# But now that this is in place, we should perhaps -# reconsider reworking it again, this time with a -# more formal approach. -# +## Channels are considered fully closed when both outgoing and incoming +## directions are closed and when the reader of the channel has read the +## EOF marker type LPChannel* = ref object of BufferStream @@ -47,154 +37,103 @@ type name*: string # name of the channel (for debugging) conn*: Connection # wrapped connection used to for writing initiator*: bool # initiated remotely or locally flag - isLazy*: bool # is channel lazy - isOpen*: bool # has channel been opened (only used with isLazy) - isReset*: bool # channel was reset, pushTo should drop data - pushing*: bool + isOpen*: bool # has channel been opened closedLocal*: bool # has channel been closed locally msgCode*: MessageType # cached in/out message code closeCode*: MessageType # cached in/out close code resetCode*: MessageType # cached in/out reset code - writeLock: AsyncLock proc open*(s: LPChannel) {.async, gcsafe.} -template withWriteLock(lock: AsyncLock, body: untyped): untyped = - try: - await lock.acquire() - body - finally: - if not(isNil(lock)) and lock.locked: - lock.release() - func shortLog*(s: LPChannel): auto = if s.isNil: "LPChannel(nil)" elif s.conn.peerInfo.isNil: $s.oid - elif s.name != $s.oid: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}" + elif s.name != $s.oid and s.name.len > 0: + &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}:{s.name}" else: &"{shortLog(s.conn.peerInfo.peerId)}:{s.oid}" chronicles.formatIt(LPChannel): shortLog(it) -proc closeMessage(s: LPChannel) {.async.} = - ## send close message - withWriteLock(s.writeLock): - trace "sending close message", s - - await s.conn.writeMsg(s.id, s.closeCode) # write close - -proc resetMessage(s: LPChannel) {.async.} = - ## send reset message - this will not raise - try: - withWriteLock(s.writeLock): - trace "sending reset message", s - await s.conn.writeMsg(s.id, s.resetCode) # write reset - except CancelledError: - # This procedure is called from one place and never awaited, so there no - # need to re-raise CancelledError. - debug "Unexpected cancellation while resetting channel", s - except LPStreamEOFError as exc: - trace "muxed connection EOF", s, msg = exc.msg - except LPStreamClosedError as exc: - trace "muxed connection closed", s, msg = exc.msg - except LPStreamIncompleteError as exc: - trace "incomplete message", s, msg = exc.msg - except CatchableError as exc: - debug "Unhandled exception leak", s, msg = exc.msg - proc open*(s: LPChannel) {.async, gcsafe.} = + trace "Opening channel", s, conn = s.conn await s.conn.writeMsg(s.id, MessageType.New, s.name) - trace "Opened channel", s s.isOpen = true -proc closeRemote*(s: LPChannel) {.async.} = - trace "Closing remote", s - try: - # close parent bufferstream to prevent further reads - await procCall BufferStream(s).close() - except CancelledError as exc: - raise exc - except CatchableError as exc: - trace "exception closing remote channel", s, msg = exc.msg - - trace "Closed remote", s - method closed*(s: LPChannel): bool = - ## this emulates half-closed behavior - ## when closed locally writing is - ## disabled - see the table in the - ## header of the file s.closedLocal -method pushTo*(s: LPChannel, data: seq[byte]) {.async.} = - if s.isReset: - raise newLPStreamClosedError() # Terminate mplex loop - - try: - s.pushing = true - await procCall BufferStream(s).pushTo(data) - finally: - s.pushing = false - -method reset*(s: LPChannel) {.base, async, gcsafe.} = +proc closeUnderlying(s: LPChannel): Future[void] {.async.} = + ## Channels may be closed for reading and writing in any order - we'll close + ## the underlying bufferstream when both directions are closed if s.closedLocal and s.isEof: - trace "channel already closed or reset", s + await procCall BufferStream(s).close() + +proc reset*(s: LPChannel) {.async, gcsafe.} = + if s.isClosed: + trace "Already closed", s return trace "Resetting channel", s, len = s.len - # First, make sure any new calls to `readOnce` and `pushTo` will fail - there - # may already be such calls in the event queue - s.isEof = true - s.isReset = true - - s.readBuf = StreamSeq() - + # First, make sure any new calls to `readOnce` and `pushData` etc will fail - + # there may already be such calls in the event queue however s.closedLocal = true + s.isEof = true + s.readBuf = StreamSeq() + s.pushedEof = true - asyncSpawn s.resetMessage() + for i in 0.. 0: - discard s.readQueue.popFirstNoWait() - trace "Channel reset", s method close*(s: LPChannel) {.async, gcsafe.} = + ## Close channel for writing - a message will be sent to the other peer + ## informing them that the channel is closed and that we're waiting for + ## their acknowledgement. if s.closedLocal: trace "Already closed", s return - - trace "Closing channel", s, len = s.len - - proc closeInternal() {.async.} = - try: - await s.closeMessage().wait(2.minutes) - if s.atEof: # already closed by remote close parent buffer immediately - await procCall BufferStream(s).close() - except CancelledError: - debug "Unexpected cancellation while closing channel", s - await s.reset() - # This is top-level procedure which will work as separate task, so it - # do not need to propogate CancelledError. - except LPStreamClosedError, LPStreamEOFError: - trace "Connection already closed", s - except CatchableError as exc: # Shouldn't happen? - warn "Exception closing channel", s, msg = exc.msg - await s.reset() - - trace "Closed channel", s - s.closedLocal = true - # All the errors are handled inside `closeInternal()` procedure. - asyncSpawn closeInternal() + + trace "Closing channel", s, conn = s.conn, len = s.len + + if s.isOpen: + try: + await s.conn.writeMsg(s.id, s.closeCode) # write close + except CancelledError as exc: + raise exc + except CatchableError as exc: + # It's harmless that close message cannot be sent - the connection is + # likely down already + trace "Cannot send close message", s, id = s.id + + await s.closeUnderlying() # maybe already eofed + + trace "Closed channel", s, len = s.len method initStream*(s: LPChannel) = if s.objName.len == 0: @@ -206,21 +145,33 @@ method initStream*(s: LPChannel) = procCall BufferStream(s).initStream() - s.writeLock = newAsyncLock() +method readOnce*(s: LPChannel, + pbytes: pointer, + nbytes: int): + Future[int] {.async.} = + try: + let bytes = await procCall BufferStream(s).readOnce(pbytes, nbytes) + trace "readOnce", s, bytes + if bytes == 0: + await s.closeUnderlying() + return bytes + except CatchableError as exc: + await s.closeUnderlying() + raise exc method write*(s: LPChannel, msg: seq[byte]): Future[void] {.async.} = if s.closedLocal: raise newLPStreamClosedError() + doAssert msg.len > 0 try: - if s.isLazy and not(s.isOpen): + if not s.isOpen: await s.open() # writes should happen in sequence - trace "write msg", len = msg.len + trace "write msg", s, conn = s.conn, len = msg.len - withWriteLock(s.writeLock): - await s.conn.writeMsg(s.id, s.msgCode, msg) + await s.conn.writeMsg(s.id, s.msgCode, msg) s.activity = true except CatchableError as exc: trace "exception in lpchannel write handler", s, msg = exc.msg @@ -233,7 +184,6 @@ proc init*( conn: Connection, initiator: bool, name: string = "", - lazy: bool = false, timeout: Duration = DefaultChanTimeout): LPChannel = let chann = L( @@ -241,8 +191,8 @@ proc init*( name: name, conn: conn, initiator: initiator, - isLazy: lazy, timeout: timeout, + isOpen: if initiator: false else: true, msgCode: if initiator: MessageType.MsgOut else: MessageType.MsgIn, closeCode: if initiator: MessageType.CloseOut else: MessageType.CloseIn, resetCode: if initiator: MessageType.ResetOut else: MessageType.ResetIn, @@ -253,6 +203,6 @@ proc init*( when chronicles.enabledLogLevel == LogLevel.TRACE: chann.name = if chann.name.len > 0: chann.name else: $chann.oid - trace "Created new lpchannel", chann + trace "Created new lpchannel", chann, id, initiator return chann diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index 850834b3c..a85642067 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -76,7 +76,6 @@ proc newStreamInternal*(m: Mplex, initiator: bool = true, chanId: uint64 = 0, name: string = "", - lazy: bool = false, timeout: Duration): LPChannel {.gcsafe.} = ## create new channel/stream @@ -93,7 +92,6 @@ proc newStreamInternal*(m: Mplex, m.connection, initiator, name, - lazy = lazy, timeout = timeout) result.peerInfo = m.connection.peerInfo @@ -176,11 +174,11 @@ method handle*(m: Mplex) {.async, gcsafe.} = raise newLPStreamLimitError() trace "pushing data to channel", m, channel, len = data.len - await channel.pushTo(data) + await channel.pushData(data) trace "pushed data to channel", m, channel, len = data.len of MessageType.CloseIn, MessageType.CloseOut: - await channel.closeRemote() + await channel.pushEof() of MessageType.ResetIn, MessageType.ResetOut: await channel.reset() except CancelledError: @@ -208,8 +206,7 @@ proc init*(M: type Mplex, method newStream*(m: Mplex, name: string = "", lazy: bool = false): Future[Connection] {.async, gcsafe.} = - let channel = m.newStreamInternal( - lazy = lazy, timeout = m.inChannTimeout) + let channel = m.newStreamInternal(timeout = m.inChannTimeout) if not lazy: await channel.open() @@ -224,15 +221,21 @@ method close*(m: Mplex) {.async, gcsafe.} = trace "Closing mplex", m - let channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values) + var channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values) for chann in channs: - await chann.reset() + await chann.close() await m.connection.close() # TODO while we're resetting, new channels may be created that will not be # closed properly + + channs = toSeq(m.channels[false].values) & toSeq(m.channels[true].values) + + for chann in channs: + await chann.reset() + m.channels[false].clear() m.channels[true].clear() diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index d763042a7..a7ab8de14 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -79,7 +79,10 @@ proc setupBufferStreamTracker(): BufferStreamTracker = type BufferStream* = ref object of Connection readQueue*: AsyncQueue[seq[byte]] # read queue for managing backpressure - readBuf*: StreamSeq # overflow buffer for readOnce + readBuf*: StreamSeq # overflow buffer for readOnce + pushing*: int # number of ongoing push operations + + pushedEof*: bool func shortLog*(s: BufferStream): auto = if s.isNil: "BufferStream(nil)" @@ -106,14 +109,13 @@ proc newBufferStream*(timeout: Duration = DefaultConnectionTimeout): BufferStrea result.timeout = timeout result.initStream() -method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} = +method pushData*(s: BufferStream, data: seq[byte]) {.base, async.} = ## Write bytes to internal read buffer, use this to fill up the ## buffer with data. ## ## `pushTo` will block if the queue is full, thus maintaining backpressure. ## - - if s.isClosed: + if s.isClosed or s.pushedEof: raise newLPStreamEOFError() if data.len == 0: @@ -121,15 +123,32 @@ method pushTo*(s: BufferStream, data: seq[byte]) {.base, async.} = # We will block here if there is already data queued, until it has been # processed - trace "Pushing readQueue", s, len = data.len - await s.readQueue.addLast(data) + inc s.pushing + try: + trace "Pushing data", s, data = data.len + await s.readQueue.addLast(data) + finally: + dec s.pushing + +method pushEof*(s: BufferStream) {.base, async.} = + if s.pushedEof: + return + s.pushedEof = true + + # We will block here if there is already data queued, until it has been + # processed + inc s.pushing + try: + trace "Pushing EOF", s + await s.readQueue.addLast(@[]) + finally: + dec s.pushing method readOnce*(s: BufferStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = doAssert(nbytes > 0, "nbytes must be positive integer") - if s.isEof and s.readBuf.len() == 0: raise newLPStreamEOFError() @@ -144,7 +163,7 @@ method readOnce*(s: BufferStream, trace "popping readQueue", s, rbytes, nbytes let buf = await s.readQueue.popFirst() - if buf.len == 0: + if buf.len == 0 or s.isEof: # Another task might have set EOF! # No more data will arrive on read queue s.isEof = true else: @@ -165,18 +184,14 @@ method readOnce*(s: BufferStream, return rbytes -method close*(s: BufferStream) {.async, gcsafe.} = +method closeImpl*(s: BufferStream): Future[void] = ## close the stream and clear the buffer - if s.isClosed: - trace "Already closed", s - return + trace "Closing BufferStream", s, len = s.len - trace "Closing BufferStream", s - - # Push empty block to signal close, but don't block - asyncSpawn s.readQueue.addLast(@[]) - - await procCall Connection(s).close() # noraises, nocancels + if not s.pushedEof: # Potentially wake up reader + asyncSpawn s.pushEof() inc getBufferStreamTracker().closed trace "Closed BufferStream", s + + procCall Connection(s).closeImpl() # noraises, nocancels diff --git a/libp2p/stream/chronosstream.nim b/libp2p/stream/chronosstream.nim index b6bb99d73..7250c13d7 100644 --- a/libp2p/stream/chronosstream.nim +++ b/libp2p/stream/chronosstream.nim @@ -90,16 +90,15 @@ method closed*(s: ChronosStream): bool {.inline.} = method atEof*(s: ChronosStream): bool {.inline.} = s.client.atEof() -method close*(s: ChronosStream) {.async.} = +method closeImpl*(s: ChronosStream) {.async.} = try: - if not s.isClosed: - trace "shutting down chronos stream", address = $s.client.remoteAddress(), - s - if not s.client.closed(): - await s.client.closeWait() - - await procCall Connection(s).close() + trace "shutting down chronos stream", address = $s.client.remoteAddress(), + s + if not s.client.closed(): + await s.client.closeWait() except CancelledError as exc: raise exc except CatchableError as exc: - trace "error closing chronosstream", exc = exc.msg, s + trace "error closing chronosstream", s, msg = exc.msg + + await procCall Connection(s).closeImpl() diff --git a/libp2p/stream/connection.nim b/libp2p/stream/connection.nim index 1d353c295..f55e07e60 100644 --- a/libp2p/stream/connection.nim +++ b/libp2p/stream/connection.nim @@ -89,14 +89,16 @@ method initStream*(s: Connection) = inc getConnectionTracker().opened -method close*(s: Connection) {.async.} = - ## cleanup timers +method closeImpl*(s: Connection): Future[void] = + # Cleanup timeout timer + trace "Closing connection", s if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished: s.timerTaskFut.cancel() - if not s.isClosed: - await procCall LPStream(s).close() - inc getConnectionTracker().closed + inc getConnectionTracker().closed + trace "Closed connection" + + procCall LPStream(s).closeImpl() func hash*(p: Connection): Hash = cast[pointer](p).hash diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index f1f197d9b..24446ce29 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -210,15 +210,22 @@ proc write*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] {.deprecate proc write*(s: LPStream, msg: string): Future[void] = s.write(msg.toBytes()) -# TODO: split `close` into `close` and `dispose/destroy` -method close*(s: LPStream) {.base, async.} = # {.raises [Defect].} +method closeImpl*(s: LPStream): Future[void] {.async, base.} = + ## Implementation of close - called only once + trace "Closing stream", s, objName = s.objName + s.closeEvent.fire() + libp2p_open_streams.dec(labelValues = [s.objName]) + trace "Closed stream", s, objName = s.objName + +method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].} ## close the stream - this may block, but will not raise exceptions ## if s.isClosed: trace "Already closed", s return + s.isClosed = true # Set flag before performing virtual close - s.isClosed = true - s.closeEvent.fire() - libp2p_open_streams.dec(labelValues = [s.objName]) - trace "Closed stream", s, objName = s.objName + # An separate implementation method is used so that even when derived types + # override `closeImpl`, it is called only once - anyone overriding `close` + # itself must implement this - once-only check as well, with their own field + await closeImpl(s) diff --git a/tests/helpers.nim b/tests/helpers.nim index d28fbcd42..7820d9498 100644 --- a/tests/helpers.nim +++ b/tests/helpers.nim @@ -1,3 +1,5 @@ +import std/unittest + import chronos, bearssl import ../libp2p/transports/tcptransport @@ -10,7 +12,7 @@ const StreamServerTrackerName = "stream.server" trackerNames = [ - # ConnectionTrackerName, + ConnectionTrackerName, BufferStreamTrackerName, TcpTransportTrackerName, StreamTransportTrackerName, @@ -25,6 +27,12 @@ iterator testTrackers*(extras: openArray[string] = []): TrackerBase = let t = getTracker(name) if not isNil(t): yield t +template checkTrackers*() = + for tracker in testTrackers(): + if tracker.isLeaked(): + checkpoint tracker.dump() + fail() + type RngWrap = object rng: ref BrHmacDrbgContext diff --git a/tests/pubsub/testfloodsub.nim b/tests/pubsub/testfloodsub.nim index fc6a89a4d..7299b47c9 100644 --- a/tests/pubsub/testfloodsub.nim +++ b/tests/pubsub/testfloodsub.nim @@ -36,9 +36,7 @@ proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = suite "FloodSub": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "FloodSub basic publish/subscribe A -> B": proc runTests() {.async.} = diff --git a/tests/pubsub/testgossipinternal.nim b/tests/pubsub/testgossipinternal.nim index 4899ee16a..1e3ab264c 100644 --- a/tests/pubsub/testgossipinternal.nim +++ b/tests/pubsub/testgossipinternal.nim @@ -28,9 +28,7 @@ proc randomPeerInfo(): PeerInfo = suite "GossipSub internal": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "topic params": proc testRun(): Future[bool] {.async.} = diff --git a/tests/pubsub/testgossipsub.nim b/tests/pubsub/testgossipsub.nim index 9f56b2f58..febe20fbb 100644 --- a/tests/pubsub/testgossipsub.nim +++ b/tests/pubsub/testgossipsub.nim @@ -73,9 +73,7 @@ template tryPublish(call: untyped, require: int, wait: Duration = 1.seconds, tim suite "GossipSub": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "GossipSub validation should succeed": proc runTests() {.async.} = diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index 944fe3730..9eb9a6055 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -11,26 +11,26 @@ suite "BufferStream": check getTracker("libp2p.bufferstream").isLeaked() == false test "push data to buffer": - proc testPushTo(): Future[bool] {.async.} = + proc testpushData(): Future[bool] {.async.} = let buff = newBufferStream() check buff.len == 0 var data = "12345" - await buff.pushTo(data.toBytes()) + await buff.pushData(data.toBytes()) check buff.len == 5 result = true await buff.close() check: - waitFor(testPushTo()) == true + waitFor(testpushData()) == true test "push and wait": - proc testPushTo(): Future[bool] {.async.} = + proc testpushData(): Future[bool] {.async.} = let buff = newBufferStream() check buff.len == 0 - let fut0 = buff.pushTo("1234".toBytes()) - let fut1 = buff.pushTo("5".toBytes()) + let fut0 = buff.pushData("1234".toBytes()) + let fut1 = buff.pushData("5".toBytes()) check buff.len == 4 # the second write should not be visible yet var data: array[1, byte] @@ -46,14 +46,14 @@ suite "BufferStream": await buff.close() check: - waitFor(testPushTo()) == true + waitFor(testpushData()) == true test "read with size": proc testRead(): Future[bool] {.async.} = let buff = newBufferStream() check buff.len == 0 - await buff.pushTo("12345".toBytes()) + await buff.pushData("12345".toBytes()) var data: array[3, byte] await buff.readExactly(addr data[0], data.len) check ['1', '2', '3'] == string.fromBytes(data) @@ -70,7 +70,7 @@ suite "BufferStream": let buff = newBufferStream() check buff.len == 0 - await buff.pushTo("12345".toBytes()) + await buff.pushData("12345".toBytes()) check buff.len == 5 var data: array[2, byte] await buff.readExactly(addr data[0], data.len) @@ -88,7 +88,7 @@ suite "BufferStream": let buff = newBufferStream() check buff.len == 0 - await buff.pushTo("123".toBytes()) + await buff.pushData("123".toBytes()) var data: array[5, byte] var readFut = buff.readExactly(addr data[0], data.len) await buff.close() @@ -108,7 +108,7 @@ suite "BufferStream": var data: array[3, byte] let readFut = buff.readOnce(addr data[0], data.len) - await buff.pushTo("123".toBytes()) + await buff.pushData("123".toBytes()) check buff.len == 3 check (await readFut) == 3 @@ -126,9 +126,9 @@ suite "BufferStream": let buff = newBufferStream() check buff.len == 0 - let w1 = buff.pushTo("Msg 1".toBytes()) - let w2 = buff.pushTo("Msg 2".toBytes()) - let w3 = buff.pushTo("Msg 3".toBytes()) + let w1 = buff.pushData("Msg 1".toBytes()) + let w2 = buff.pushData("Msg 2".toBytes()) + let w3 = buff.pushData("Msg 3".toBytes()) var data: array[5, byte] await buff.readExactly(addr data[0], data.len) @@ -143,9 +143,9 @@ suite "BufferStream": for f in [w1, w2, w3]: await f - let w4 = buff.pushTo("Msg 4".toBytes()) - let w5 = buff.pushTo("Msg 5".toBytes()) - let w6 = buff.pushTo("Msg 6".toBytes()) + let w4 = buff.pushData("Msg 4".toBytes()) + let w5 = buff.pushData("Msg 5".toBytes()) + let w6 = buff.pushData("Msg 6".toBytes()) await buff.close() @@ -173,7 +173,7 @@ suite "BufferStream": var writes: seq[Future[void]] var str: string for i in 0..<10: - writes.add buff.pushTo("123".toBytes()) + writes.add buff.pushData("123".toBytes()) str &= "123" await buff.close() # all data should still be read after close @@ -201,8 +201,8 @@ suite "BufferStream": proc closeTest(): Future[bool] {.async.} = var stream = newBufferStream() var - fut = stream.pushTo(toBytes("hello")) - fut2 = stream.pushTo(toBytes("again")) + fut = stream.pushData(toBytes("hello")) + fut2 = stream.pushData(toBytes("again")) await stream.close() try: await wait(fut, 100.milliseconds) @@ -219,13 +219,13 @@ suite "BufferStream": test "no push after close": proc closeTest(): Future[bool] {.async.} = var stream = newBufferStream() - await stream.pushTo("123".toBytes()) + await stream.pushData("123".toBytes()) var data: array[3, byte] await stream.readExactly(addr data[0], data.len) await stream.close() try: - await stream.pushTo("123".toBytes()) + await stream.pushData("123".toBytes()) except LPStreamClosedError: result = true diff --git a/tests/testconnmngr.nim b/tests/testconnmngr.nim index 568bc9187..f3e9648e9 100644 --- a/tests/testconnmngr.nim +++ b/tests/testconnmngr.nim @@ -21,9 +21,7 @@ method newStream*( suite "Connection Manager": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "add and retrive a connection": let connMngr = ConnManager.init() diff --git a/tests/testidentify.nim b/tests/testidentify.nim index 87c7aefee..6e63b5b49 100644 --- a/tests/testidentify.nim +++ b/tests/testidentify.nim @@ -15,9 +15,7 @@ when defined(nimHasUsed): {.used.} suite "Identify": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "handle identify message": proc testHandle(): Future[bool] {.async.} = diff --git a/tests/testmplex.nim b/tests/testmplex.nim index 5b33e646f..20abc52aa 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -18,9 +18,7 @@ import ./helpers suite "Mplex": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "encode header with channel id 0": proc testEncodeHeader() {.async.} = @@ -70,7 +68,7 @@ suite "Mplex": proc testDecodeHeader() {.async.} = let stream = newBufferStream() let conn = stream - await stream.pushTo(fromHex("000873747265616d2031")) + await stream.pushData(fromHex("000873747265616d2031")) let msg = await conn.readMsg() check msg.id == 0 @@ -83,7 +81,7 @@ suite "Mplex": proc testDecodeHeader() {.async.} = let stream = newBufferStream() let conn = stream - await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) + await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 0 @@ -97,7 +95,7 @@ suite "Mplex": proc testDecodeHeader() {.async.} = let stream = newBufferStream() let conn = stream - await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) + await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) let msg = await conn.readMsg() check msg.id == 17 @@ -134,14 +132,14 @@ suite "Mplex": ) chann = LPChannel.init(1, conn, true) - await chann.pushTo(("Hello!").toBytes) + await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.close() # closing channel # should be able to read on local clsoe await chann.readExactly(addr data[0], 3) # closing remote end - let closeFut = chann.closeRemote() + let closeFut = chann.pushEof() # should still allow reading until buffer EOF await chann.readExactly(addr data[3], 3) try: @@ -166,11 +164,11 @@ suite "Mplex": ) chann = LPChannel.init(1, conn, true) - await chann.pushTo(("Hello!").toBytes) + await chann.pushData(("Hello!").toBytes) var data = newSeq[byte](6) await chann.readExactly(addr data[0], 3) - let closeFut = chann.closeRemote() # closing channel + let closeFut = chann.pushEof() # closing channel let readFut = chann.readExactly(addr data[3], 3) await all(closeFut, readFut) try: @@ -184,7 +182,7 @@ suite "Mplex": check: waitFor(testClosedForRead()) == true - test "half closed (remote close) - channel should allow writting on remote close": + test "half closed (remote close) - channel should allow writing on remote close": proc testClosedForRead(): Future[bool] {.async.} = let testData = "Hello!".toBytes @@ -194,12 +192,12 @@ suite "Mplex": ) chann = LPChannel.init(1, conn, true) - await chann.closeRemote() # closing channel + await chann.pushEof() # closing channel try: await chann.writeLp(testData) return true finally: - await chann.close() + await chann.reset() # there's nobody reading the EOF! await conn.close() check: @@ -211,9 +209,11 @@ suite "Mplex": let conn = newBufferStream(writeHandler) chann = LPChannel.init(1, conn, true) - await chann.closeRemote() + await chann.pushEof() + var buf: array[1, byte] + check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read try: - await chann.pushTo(@[byte(1)]) + await chann.pushData(@[byte(1)]) except LPStreamEOFError: result = true finally: @@ -234,7 +234,6 @@ suite "Mplex": var data = newSeq[byte](1) try: await chann.readExactly(addr data[0], 1) - check data.len == 1 except LPStreamEOFError: result = true finally: @@ -243,6 +242,60 @@ suite "Mplex": check: waitFor(testResetRead()) == true + test "reset - should complete read": + proc testResetRead(): Future[bool] {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newBufferStream(writeHandler) + chann = LPChannel.init(1, conn, true) + + var data = newSeq[byte](1) + let fut = chann.readExactly(addr data[0], 1) + await chann.reset() + try: + await fut + except LPStreamEOFError: + result = true + finally: + await conn.close() + + check: + waitFor(testResetRead()) == true + + test "reset - should complete pushData": + proc testResetRead(): Future[bool] {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newBufferStream(writeHandler) + chann = LPChannel.init(1, conn, true) + + await chann.pushData(@[0'u8]) + let fut = chann.pushData(@[0'u8]) + await chann.reset() + result = await fut.withTimeout(100.millis) + await conn.close() + + check: + waitFor(testResetRead()) == true + + test "reset - should complete both read and push": + proc testResetRead(): Future[bool] {.async.} = + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + let + conn = newBufferStream(writeHandler) + chann = LPChannel.init(1, conn, true) + + var data = newSeq[byte](1) + let rfut = chann.readExactly(addr data[0], 1) + let wfut = chann.pushData(@[0'u8]) + let wfut2 = chann.pushData(@[0'u8]) + await chann.reset() + result = await allFutures(rfut, wfut, wfut2).withTimeout(100.millis) + await conn.close() + + check: + waitFor(testResetRead()) == true + test "reset - channel should fail writing": proc testResetWrite(): Future[bool] {.async.} = proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard @@ -533,6 +586,7 @@ suite "Mplex": await done.wait(5.seconds) await conn.close() await mplexDialFut + await mplexDial.close() await allFuturesThrowing( transport1.close(), transport2.close()) diff --git a/tests/testmultistream.nim b/tests/testmultistream.nim index 4e1c241bd..2743256d3 100644 --- a/tests/testmultistream.nim +++ b/tests/testmultistream.nim @@ -168,9 +168,7 @@ proc newTestNaStream(na: NaHandler): TestNaStream = suite "Multistream select": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "test select custom proto": proc testSelect(): Future[bool] {.async.} = diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 8fbd5608b..155c174a0 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -68,9 +68,7 @@ proc createSwitch(ma: MultiAddress; outgoing: bool): (Switch, PeerInfo) = suite "Noise": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "e2e: handle write + noise": proc testListenerDialer(): Future[bool] {.async.} = diff --git a/tests/testswitch.nim b/tests/testswitch.nim index fd1645807..c75175e7f 100644 --- a/tests/testswitch.nim +++ b/tests/testswitch.nim @@ -15,7 +15,6 @@ import ../libp2p/[errors, crypto/crypto, protocols/protocol, muxers/muxer, - muxers/mplex/mplex, stream/lpstream] import ./helpers @@ -27,9 +26,7 @@ type suite "Switch": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "e2e use switch dial proto string": proc testSwitch() {.async, gcsafe.} = @@ -112,23 +109,6 @@ suite "Switch": check "Hello!" == msg await conn.close() - await sleepAsync(2.seconds) # wait a little for cleanup to happen - var bufferTracker = getTracker(BufferStreamTrackerName) - # echo bufferTracker.dump() - - # plus 4 for the pubsub streams - check (BufferStreamTracker(bufferTracker).opened == - (BufferStreamTracker(bufferTracker).closed)) - - var connTracker = getTracker(ConnectionTrackerName) - # echo connTracker.dump() - - # plus 8 is for the secured connection and the socket - # and the pubsub streams that won't clean up until - # `disconnect()` or `stop()` - check (ConnectionTracker(connTracker).opened == - (ConnectionTracker(connTracker).closed + 4.uint64)) - await allFuturesThrowing( done.wait(5.seconds), switch1.stop(), diff --git a/tests/testtransport.nim b/tests/testtransport.nim index df5ee4b69..fb31bbbe7 100644 --- a/tests/testtransport.nim +++ b/tests/testtransport.nim @@ -11,9 +11,7 @@ import ./helpers suite "TCP transport": teardown: - for tracker in testTrackers(): - # echo tracker.dump() - check tracker.isLeaked() == false + checkTrackers() test "test listener: handle write": proc testListener(): Future[bool] {.async, gcsafe.} =