From 381630f1854818be634a98e92a65dc317bf780a0 Mon Sep 17 00:00:00 2001 From: Eugene Kabanov Date: Wed, 4 Mar 2020 21:45:14 +0200 Subject: [PATCH] Fix and refactoring of some procedures which are able to return nil as result (#97) * Fix do not return nil as result. * Fix mplex test to properly raise. --- libp2p/muxers/mplex/lpchannel.nim | 39 ++++++++++++++++++--------- libp2p/protocols/pubsub/gossipsub.nim | 2 +- libp2p/protocols/secure/secure.nim | 7 ++--- libp2p/stream/bufferstream.nim | 24 ++++++++++++----- libp2p/stream/lpstream.nim | 7 +++-- libp2p/switch.nim | 21 ++++++++++----- tests/testmplex.nim | 3 ++- 7 files changed, 69 insertions(+), 34 deletions(-) diff --git a/libp2p/muxers/mplex/lpchannel.nim b/libp2p/muxers/mplex/lpchannel.nim index dfd83a614..4d52ed469 100644 --- a/libp2p/muxers/mplex/lpchannel.nim +++ b/libp2p/muxers/mplex/lpchannel.nim @@ -96,7 +96,10 @@ method closed*(s: LPChannel): bool = proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] = if s.closedRemote or s.isReset: - raise newLPStreamEOFError() + var retFuture = newFuture[void]("LPChannel.pushTo") + retFuture.fail(newLPStreamEOFError()) + return retFuture + trace "pushing data to channel", data = data.toHex(), id = s.id, initiator = s.initiator @@ -105,7 +108,9 @@ proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] = method read*(s: LPChannel, n = -1): Future[seq[byte]] = if s.closed or s.isReset: - raise newLPStreamEOFError() + var retFuture = newFuture[seq[byte]]("LPChannel.read") + retFuture.fail(newLPStreamEOFError()) + return retFuture result = procCall read(BufferStream(s), n) @@ -114,7 +119,10 @@ method readExactly*(s: LPChannel, nbytes: int): Future[void] = if s.closed or s.isReset: - raise newLPStreamEOFError() + var retFuture = newFuture[void]("LPChannel.readExactly") + retFuture.fail(newLPStreamEOFError()) + return retFuture + result = procCall readExactly(BufferStream(s), pbytes, nbytes) method readLine*(s: LPChannel, @@ -122,7 +130,10 @@ method readLine*(s: LPChannel, sep = "\r\n"): Future[string] = if s.closed or s.isReset: - raise newLPStreamEOFError() + var retFuture = newFuture[string]("LPChannel.readLine") + retFuture.fail(newLPStreamEOFError()) + return retFuture + result = procCall readLine(BufferStream(s), limit, sep) method readOnce*(s: LPChannel, @@ -130,7 +141,10 @@ method readOnce*(s: LPChannel, nbytes: int): Future[int] = if s.closed or s.isReset: - raise newLPStreamEOFError() + var retFuture = newFuture[int]("LPChannel.readOnce") + retFuture.fail(newLPStreamEOFError()) + return retFuture + result = procCall readOnce(BufferStream(s), pbytes, nbytes) method readUntil*(s: LPChannel, @@ -138,7 +152,10 @@ method readUntil*(s: LPChannel, sep: seq[byte]): Future[int] = if s.closed or s.isReset: - raise newLPStreamEOFError() + var retFuture = newFuture[int]("LPChannel.readUntil") + retFuture.fail(newLPStreamEOFError()) + return retFuture + result = procCall readOnce(BufferStream(s), pbytes, nbytes) template writePrefix: untyped = @@ -147,16 +164,14 @@ template writePrefix: untyped = if s.closedLocal or s.isReset: raise newLPStreamEOFError() -method write*(s: LPChannel, - pbytes: pointer, - nbytes: int): Future[void] {.async.} = +method write*(s: LPChannel, pbytes: pointer, nbytes: int) {.async.} = writePrefix() - result = procCall write(BufferStream(s), pbytes, nbytes) + await procCall write(BufferStream(s), pbytes, nbytes) method write*(s: LPChannel, msg: string, msglen = -1) {.async.} = writePrefix() - result = procCall write(BufferStream(s), msg, msglen) + await procCall write(BufferStream(s), msg, msglen) method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} = writePrefix() - result = procCall write(BufferStream(s), msg, msglen) + await procCall write(BufferStream(s), msg, msglen) diff --git a/libp2p/protocols/pubsub/gossipsub.nim b/libp2p/protocols/pubsub/gossipsub.nim index a63668347..d930dd112 100644 --- a/libp2p/protocols/pubsub/gossipsub.nim +++ b/libp2p/protocols/pubsub/gossipsub.nim @@ -59,7 +59,7 @@ proc addInterval(every: Duration, cb: CallbackFunc, udata: pointer = nil): Future[void] = ## Arrange the callback ``cb`` to be called on every ``Duration`` window - var retFuture = newFuture[void]("chronos.addInterval(Duration)") + var retFuture = newFuture[void]("gossipsub.addInterval(Duration)") proc interval(arg: pointer = nil) {.gcsafe.} proc scheduleNext() = if not retFuture.finished(): diff --git a/libp2p/protocols/secure/secure.nim b/libp2p/protocols/secure/secure.nim index 65e014307..9f47a8233 100644 --- a/libp2p/protocols/secure/secure.nim +++ b/libp2p/protocols/secure/secure.nim @@ -14,7 +14,8 @@ import ../protocol, type Secure* = ref object of LPProtocol # base type for secure managers -method secure*(p: Secure, conn: Connection): Future[Connection] - {.base, async, gcsafe.} = +method secure*(p: Secure, conn: Connection): Future[Connection] {.base.} = ## default implementation matches plaintext - result = conn + var retFuture = newFuture[Connection]("secure.secure") + retFuture.complete(conn) + return retFuture diff --git a/libp2p/stream/bufferstream.nim b/libp2p/stream/bufferstream.nim index d35a6ac03..d56a969a8 100644 --- a/libp2p/stream/bufferstream.nim +++ b/libp2p/stream/bufferstream.nim @@ -271,10 +271,14 @@ method write*(s: BufferStream, ## ## Return number of bytes actually consumed (discarded). ## + if isNil(s.writeHandler): + var retFuture = newFuture[void]("BufferStream.write(pointer)") + retFuture.fail(newNotWritableError()) + return retFuture + var buf: seq[byte] = newSeq[byte](nbytes) copyMem(addr buf[0], pbytes, nbytes) - if not isNil(s.writeHandler): - result = s.writeHandler(buf) + result = s.writeHandler(buf) method write*(s: BufferStream, msg: string, @@ -287,10 +291,14 @@ method write*(s: BufferStream, ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to ## stream. ## + if isNil(s.writeHandler): + var retFuture = newFuture[void]("BufferStream.write(string)") + retFuture.fail(newNotWritableError()) + return retFuture + var buf = "" shallowCopy(buf, if msglen > 0: msg[0.. len(sbytes)`` only ``len(sbytes)`` bytes will be written to ## stream. ## + if isNil(s.writeHandler): + var retFuture = newFuture[void]("BufferStream.write(seq)") + retFuture.fail(newNotWritableError()) + return retFuture + var buf: seq[byte] shallowCopy(buf, if msglen > 0: msg[0..