diff --git a/libp2p/muxers/mplex/coder.nim b/libp2p/muxers/mplex/coder.nim index a100cb4..090b07f 100644 --- a/libp2p/muxers/mplex/coder.nim +++ b/libp2p/muxers/mplex/coder.nim @@ -50,35 +50,24 @@ proc writeMsg*(conn: Connection, trace "sending data over mplex", id, msgType, data = data.len - try: - var - left = data.len - offset = 0 - while left > 0 or data.len == 0: - let - chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left - chunk = if chunkSize > 0 : data[offset..(offset + chunkSize - 1)] else: data - ## write lenght prefixed - var buf = initVBuffer() - buf.writePBVarint(id shl 3 or ord(msgType).uint64) - buf.writePBVarint(chunkSize.uint64) # size should be always sent - buf.finish() - left = left - chunkSize - offset = offset + chunkSize - await conn.write(buf.buffer & chunk) + var + left = data.len + offset = 0 + while left > 0 or data.len == 0: + let + chunkSize = if left > MaxMsgSize: MaxMsgSize - 64 else: left + chunk = if chunkSize > 0 : data[offset..(offset + chunkSize - 1)] else: data + ## write lenght prefixed + var buf = initVBuffer() + buf.writePBVarint(id shl 3 or ord(msgType).uint64) + buf.writePBVarint(chunkSize.uint64) # size should be always sent + buf.finish() + left = left - chunkSize + offset = offset + chunkSize + await conn.write(buf.buffer & chunk) - if data.len == 0: - return - except LPStreamEOFError: - trace "Ignoring EOF while writing" - except CancelledError as exc: - raise exc - except CatchableError as exc: - # TODO these exceptions are ignored since it's likely that if writes are - # are failing, the underlying connection is already closed - this needs - # more cleanup though - debug "Could not write to connection", error = exc.name - trace "Could not write to connection - verbose", msg = exc.msg + if data.len == 0: + return proc writeMsg*(conn: Connection, id: uint64, diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index a5a00a4..0133bc1 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -63,6 +63,8 @@ template withEOFExceptions(body: untyped): untyped = except LPStreamIncompleteError as exc: trace "incomplete message", exc = exc.msg +method reset*(s: LPChannel) {.base, async, gcsafe.} + proc newChannel*(id: uint64, conn: Connection, initiator: bool, @@ -81,17 +83,26 @@ proc newChannel*(id: uint64, let chan = result proc writeHandler(data: seq[byte]): Future[void] {.async, gcsafe.} = - if chan.isLazy and not(chan.isOpen): - await chan.open() + try: + if chan.isLazy and not(chan.isOpen): + await chan.open() - # writes should happen in sequence - trace "sending data", data = data.shortLog, - id = chan.id, - initiator = chan.initiator, - name = chan.name, - oid = chan.oid + # writes should happen in sequence + trace "sending data", data = data.shortLog, + id = chan.id, + initiator = chan.initiator, + name = chan.name, + oid = chan.oid - await conn.writeMsg(chan.id, chan.msgCode, data) # write header + try: + await conn.writeMsg(chan.id, + chan.msgCode, + data).wait(2.minutes) # write header + except AsyncTimeoutError: + trace "timeout writting channel, resetting" + asyncCheck chan.reset() + except CatchableError as exc: + trace "unable to write in bufferstream handler", exc = exc.msg result.initBufferStream(writeHandler, size) when chronicles.enabledLogLevel == LogLevel.TRACE: @@ -106,9 +117,9 @@ proc closeMessage(s: LPChannel) {.async.} = withEOFExceptions: withWriteLock(s.writeLock): trace "sending close message", id = s.id, - initiator = s.initiator, - name = s.name, - oid = s.oid + initiator = s.initiator, + name = s.name, + oid = s.oid await s.conn.writeMsg(s.id, s.closeCode) # write close @@ -116,9 +127,9 @@ proc resetMessage(s: LPChannel) {.async.} = withEOFExceptions: withWriteLock(s.writeLock): trace "sending reset message", id = s.id, - initiator = s.initiator, - name = s.name, - oid = s.oid + initiator = s.initiator, + name = s.name, + oid = s.oid await s.conn.writeMsg(s.id, s.resetCode) # write reset @@ -129,8 +140,8 @@ proc open*(s: LPChannel) {.async, gcsafe.} = withEOFExceptions: await s.conn.writeMsg(s.id, MessageType.New, s.name) trace "oppened channel", oid = s.oid, - name = s.name, - initiator = s.initiator + name = s.name, + initiator = s.initiator s.isOpen = true proc closeRemote*(s: LPChannel) {.async.} = @@ -144,9 +155,9 @@ proc closeRemote*(s: LPChannel) {.async.} = await s.dataReadEvent.wait() s.dataReadEvent.clear() - # TODO: Not sure if this needs to be set here or bfore consuming - # the buffer + await s.close() # close local end s.isEof = true # set EOF immediately to prevent further reads + # call to avoid leacks await procCall BufferStream(s).close() # close parent bufferstream trace "channel closed on EOF", id = s.id, @@ -161,33 +172,49 @@ method closed*(s: LPChannel): bool = ## header of the file s.closedLocal -method close*(s: LPChannel) {.async, gcsafe.} = - if s.closedLocal: - return - - trace "closing local lpchannel", id = s.id, - initiator = s.initiator, - name = s.name, - oid = s.oid - # TODO: we should install a timer that on expire - # will make sure the channel did close by the remote - # so the hald-closed flow completed, if it didn't - # we should send a `reset` and move on. - await s.closeMessage() - s.closedLocal = true - if s.atEof: # already closed by remote close parent buffer imediately - await procCall BufferStream(s).close() - - trace "lpchannel closed local", id = s.id, - initiator = s.initiator, - name = s.name, - oid = s.oid - -method reset*(s: LPChannel) {.base, async.} = +method reset*(s: LPChannel) {.base, async, gcsafe.} = # we asyncCheck here because the other end # might be dead already - reset is always # optimistic asyncCheck s.resetMessage() + # # because of the async check above, + # # give the message time to depart + # await sleepAsync(100.millis) await procCall BufferStream(s).close() s.isEof = true s.closedLocal = true + +method close*(s: LPChannel) {.async, gcsafe.} = + if s.closedLocal: + trace "channel already closed", id = s.id, + initiator = s.initiator, + name = s.name, + oid = s.oid + return + + proc closeRemote() {.async.} = + try: + trace "closing local lpchannel", id = s.id, + initiator = s.initiator, + name = s.name, + oid = s.oid + # TODO: we should install a timer that on expire + # will make sure the channel did close by the remote + # so the hald-closed flow completed, if it didn't + # we should send a `reset` and move on. + await s.closeMessage().wait(2.minutes) + s.closedLocal = true + if s.atEof: # already closed by remote close parent buffer imediately + await procCall BufferStream(s).close() + except AsyncTimeoutError: + trace "close timeoud, reset channel" + asyncCheck s.reset() # reset on timeout + except CatchableError as exc: + trace "exception closing channel" + + trace "lpchannel closed local", id = s.id, + initiator = s.initiator, + name = s.name, + oid = s.oid + + asyncCheck closeRemote() diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index adf2fe3..5b0a092 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -26,6 +26,7 @@ type Mplex* = ref object of Muxer remote: Table[uint64, LPChannel] local: Table[uint64, LPChannel] + conns: seq[Connection] handlerFuts: seq[Future[void]] currentId*: uint64 maxChannels*: uint64 @@ -93,17 +94,25 @@ method handle*(m: Mplex) {.async, gcsafe.} = oid = m.oid if not isNil(m.streamHandler): let stream = newConnection(channel) + m.conns.add(stream) stream.peerInfo = m.connection.peerInfo var fut = newFuture[void]() proc handler() {.async.} = - tryAndWarn "mplex channel handler": - await m.streamHandler(stream) + try: + try: + await m.streamHandler(stream) + trace "streamhandler ended", oid = stream.oid + finally: + if not(stream.closed): + await stream.close() + except CatchableError as exc: + trace "exception in stream handler", exc = exc.msg + finally: + m.conns.keepItIf(it != stream) + m.handlerFuts.keepItIf(it != fut) fut = handler() - m.handlerFuts.add(fut) - fut.addCallback do(udata: pointer): - m.handlerFuts.keepItIf(it != fut) of MessageType.MsgIn, MessageType.MsgOut: trace "pushing data to channel", id = id, @@ -191,16 +200,16 @@ method close*(m: Mplex) {.async, gcsafe.} = return trace "closing mplex muxer", oid = m.oid + await all( + toSeq(m.remote.values).mapIt(it.reset()) & + toSeq(m.local.values).mapIt(it.reset())) - checkFutures( - await allFinished( - toSeq(m.remote.values).mapIt(it.reset()) & - toSeq(m.local.values).mapIt(it.reset()))) - - checkFutures(await allFinished(m.handlerFuts)) + await all(m.conns.mapIt(it.close())) # dispose of channe's connections + await all(m.handlerFuts) await m.connection.close() m.remote.clear() m.local.clear() + m.conns = @[] m.handlerFuts = @[] m.isClosed = true diff --git a/libp2p/muxers/muxer.nim b/libp2p/muxers/muxer.nim index 4cba140..41a2eaa 100644 --- a/libp2p/muxers/muxer.nim +++ b/libp2p/muxers/muxer.nim @@ -46,22 +46,23 @@ proc newMuxerProvider*(creator: MuxerConstructor, codec: string): MuxerProvider method init(c: MuxerProvider) = proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let - muxer = c.newMuxer(conn) + try: + let + muxer = c.newMuxer(conn) - if not isNil(c.streamHandler): - muxer.streamHandler = c.streamHandler + if not isNil(c.streamHandler): + muxer.streamHandler = c.streamHandler - var futs = newSeq[Future[void]]() + var futs = newSeq[Future[void]]() + futs &= muxer.handle() - futs &= muxer.handle() + # finally await both the futures + if not isNil(c.muxerHandler): + futs &= c.muxerHandler(muxer) - # finally await both the futures - if not isNil(c.muxerHandler): - futs &= c.muxerHandler(muxer) - - # log and re-raise on errors - futs = await allFinished(futs) - checkFutures(futs) + # log and re-raise on errors + await all(futs) + except CatchableError as exc: + trace "exception in muxer handler", exc = exc.msg c.handler = handler