diff --git a/libp2p/multistream.nim b/libp2p/multistream.nim index cd239d340..fd1260db1 100644 --- a/libp2p/multistream.nim +++ b/libp2p/multistream.nim @@ -16,7 +16,7 @@ import connection, logScope: topic = "Multistream" -const +const MsgSize* = 64*1024 Codec* = "/multistream/1.0.0" @@ -83,13 +83,13 @@ proc select*(m: MultisteamSelect, proc select*(m: MultisteamSelect, conn: Connection, - proto: string): Future[bool] {.async.} = - if proto.len > 0: - result = (await m.select(conn, @[proto])) == proto - else: + proto: string): Future[bool] {.async.} = + if proto.len > 0: + result = (await m.select(conn, @[proto])) == proto + else: result = (await m.select(conn, @[])) == Codec -proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = +proc select*(m: MultisteamSelect, conn: Connection): Future[bool] = m.select(conn, "") proc list*(m: MultisteamSelect, @@ -112,10 +112,9 @@ proc handle*(m: MultisteamSelect, conn: Connection) {.async, gcsafe.} = trace "handle: starting multistream handling" try: while not conn.closed: - await sleepAsync(1.millis) var ms = cast[string]((await conn.readLp())) ms.removeSuffix("\n") - + trace "handle: got request for ", ms if ms.len() <= 0: trace "handle: invalid proto" @@ -158,9 +157,9 @@ proc addHandler*[T: LPProtocol](m: MultisteamSelect, protocol: T, matcher: Matcher = nil) = ## register a protocol - # TODO: This is a bug in chronicles, + # TODO: This is a bug in chronicles, # it break if I uncoment this line. - # Which is almost the same as the + # Which is almost the same as the # one on the next override of addHandler # # trace "registering protocol", codec = codec diff --git a/libp2p/muxers/mplex/mplex.nim b/libp2p/muxers/mplex/mplex.nim index b6f5ee7c0..c77f2783f 100644 --- a/libp2p/muxers/mplex/mplex.nim +++ b/libp2p/muxers/mplex/mplex.nim @@ -125,7 +125,8 @@ method handle*(m: Mplex) {.async, gcsafe.} = trace "exception occurred", exception = exc.msg finally: trace "stopping mplex main loop" - await m.connection.close() + if not m.connection.closed(): + await m.connection.close() proc newMplex*(conn: Connection, maxChanns: uint = MaxChannels): Mplex = @@ -136,7 +137,8 @@ proc newMplex*(conn: Connection, result.local = initTable[uint, LPChannel]() let m = result - conn.closeEvent.wait().addCallback do (udata: pointer): + conn.closeEvent.wait() + .addCallback do (udata: pointer): trace "connection closed, cleaning up mplex" asyncCheck m.close() diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index 3aba26dab..b4f87f75f 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -7,27 +7,27 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -## This module implements an asynchronous buffer stream +## This module implements an asynchronous buffer stream ## which emulates physical async IO. ## -## The stream is based on the standard library's `Deque`, -## which is itself based on a ring buffer. +## The stream is based on the standard library's `Deque`, +## which is itself based on a ring buffer. ## -## It works by exposing a regular LPStream interface and -## a method ``pushTo`` to push data to the internal read -## buffer; as well as a handler that can be registrered -## that gets triggered on every write to the stream. This -## allows using the buffered stream as a sort of proxy, -## which can be consumed as a regular LPStream but allows -## injecting data for reads and intercepting writes. -## -## Another notable feature is that the stream is fully -## ordered and asynchronous. Reads are queued up in order +## It works by exposing a regular LPStream interface and +## a method ``pushTo`` to push data to the internal read +## buffer; as well as a handler that can be registrered +## that gets triggered on every write to the stream. This +## allows using the buffered stream as a sort of proxy, +## which can be consumed as a regular LPStream but allows +## injecting data for reads and intercepting writes. +## +## Another notable feature is that the stream is fully +## ordered and asynchronous. Reads are queued up in order ## and are suspended when not enough data available. This -## allows preserving backpressure while maintaining full -## asynchrony. Both writting to the internal buffer with -## ``pushTo`` as well as reading with ``read*` methods, -## will suspend until either the amount of elements in the +## allows preserving backpressure while maintaining full +## asynchrony. Both writting to the internal buffer with +## ``pushTo`` as well as reading with ``read*` methods, +## will suspend until either the amount of elements in the ## buffer goes below ``maxSize`` or more data becomes available. import deques, math @@ -59,7 +59,7 @@ proc newNotWritableError*(): ref Exception {.inline.} = result = newException(NotWritableError, "stream is not writable") proc requestReadBytes(s: BufferStream): Future[void] = - ## create a future that will complete when more + ## create a future that will complete when more ## data becomes available in the read buffer result = newFuture[void]() s.readReqs.addLast(result) @@ -95,10 +95,10 @@ proc shrink(s: BufferStream, fromFirst = 0, fromLast = 0) = proc len*(s: BufferStream): int = s.readBuf.len proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} = - ## Write bytes to internal read buffer, use this to fill up the + ## Write bytes to internal read buffer, use this to fill up the ## buffer with data. ## - ## This method is async and will wait until all data has been + ## This method is async and will wait until all data has been ## written to the internal buffer; this is done so that backpressure ## is preserved. ## @@ -123,7 +123,7 @@ proc pushTo*(s: BufferStream, data: seq[byte]) {.async.} = if index >= data.len: break - # if we couldn't transfer all the data to the + # if we couldn't transfer all the data to the # internal buf wait on a read event await s.dataReadEvent.wait() s.lock.release() @@ -159,8 +159,8 @@ method readExactly*(s: BufferStream, copyMem(pbytes, addr buff[0], nbytes) -method readLine*(s: BufferStream, - limit = 0, +method readLine*(s: BufferStream, + limit = 0, sep = "\r\n"): Future[string] {.async.} = ## Read one line from read-only stream ``rstream``, where ``"line"`` is a @@ -213,9 +213,9 @@ method readOnce*(s: BufferStream, result = len method readUntil*(s: BufferStream, - pbytes: pointer, + pbytes: pointer, nbytes: int, - sep: seq[byte]): + sep: seq[byte]): Future[int] {.async.} = ## Read data from the read-only stream ``rstream`` until separator ``sep`` is ## found. @@ -275,8 +275,8 @@ method write*(s: BufferStream, if not isNil(s.writeHandler): result = s.writeHandler(buf) -method write*(s: BufferStream, - msg: string, +method write*(s: BufferStream, + msg: string, msglen = -1): Future[void] = ## Write string ``sbytes`` of length ``msglen`` to writer stream ``wstream``. ## @@ -291,8 +291,8 @@ method write*(s: BufferStream, if not isNil(s.writeHandler): result = s.writeHandler(cast[seq[byte]](buf)) -method write*(s: BufferStream, - msg: seq[byte], +method write*(s: BufferStream, + msg: seq[byte], msglen = -1): Future[void] = ## Write sequence of bytes ``sbytes`` of length ``msglen`` to writer ## stream ``wstream``. @@ -310,11 +310,11 @@ method write*(s: BufferStream, proc pipe*(s: BufferStream, target: BufferStream): BufferStream = - ## pipe the write end of this stream to + ## pipe the write end of this stream to ## be the source of the target stream ## ## Note that this only works with the LPStream - ## interface methods `read*` and `write` are + ## interface methods `read*` and `write` are ## piped. ## if s.isPiped: @@ -326,15 +326,15 @@ proc pipe*(s: BufferStream, if not isNil(oldHandler): await oldHandler(data) - # if we're piping to self, - # then add the data to the + # if we're piping to self, + # then add the data to the # buffer directly and fire # the read event if s == target: for b in data: s.readBuf.addLast(b) - # notify main loop of available + # notify main loop of available # data s.dataReadEvent.fire() else: @@ -350,7 +350,8 @@ proc `|`*(s: BufferStream, target: BufferStream): BufferStream = method close*(s: BufferStream) {.async.} = ## close the stream and clear the buffer for r in s.readReqs: - r.cancel() + if not(isNil(r)) and not(r.finished()): + r.cancel() s.dataReadEvent.fire() s.readBuf.clear() s.closeEvent.fire()