Fix and refactoring of some procedures which are able to return nil as result (#97)

* Fix do not return nil as result.

* Fix mplex test to properly raise.
This commit is contained in:
Eugene Kabanov 2020-03-04 21:45:14 +02:00 committed by GitHub
parent ffc3b04222
commit 381630f185
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 69 additions and 34 deletions

View File

@ -96,7 +96,10 @@ method closed*(s: LPChannel): bool =
proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] = proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] =
if s.closedRemote or s.isReset: if s.closedRemote or s.isReset:
raise newLPStreamEOFError() var retFuture = newFuture[void]("LPChannel.pushTo")
retFuture.fail(newLPStreamEOFError())
return retFuture
trace "pushing data to channel", data = data.toHex(), trace "pushing data to channel", data = data.toHex(),
id = s.id, id = s.id,
initiator = s.initiator initiator = s.initiator
@ -105,7 +108,9 @@ proc pushTo*(s: LPChannel, data: seq[byte]): Future[void] =
method read*(s: LPChannel, n = -1): Future[seq[byte]] = method read*(s: LPChannel, n = -1): Future[seq[byte]] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamEOFError() var retFuture = newFuture[seq[byte]]("LPChannel.read")
retFuture.fail(newLPStreamEOFError())
return retFuture
result = procCall read(BufferStream(s), n) result = procCall read(BufferStream(s), n)
@ -114,7 +119,10 @@ method readExactly*(s: LPChannel,
nbytes: int): nbytes: int):
Future[void] = Future[void] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamEOFError() var retFuture = newFuture[void]("LPChannel.readExactly")
retFuture.fail(newLPStreamEOFError())
return retFuture
result = procCall readExactly(BufferStream(s), pbytes, nbytes) result = procCall readExactly(BufferStream(s), pbytes, nbytes)
method readLine*(s: LPChannel, method readLine*(s: LPChannel,
@ -122,7 +130,10 @@ method readLine*(s: LPChannel,
sep = "\r\n"): sep = "\r\n"):
Future[string] = Future[string] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamEOFError() var retFuture = newFuture[string]("LPChannel.readLine")
retFuture.fail(newLPStreamEOFError())
return retFuture
result = procCall readLine(BufferStream(s), limit, sep) result = procCall readLine(BufferStream(s), limit, sep)
method readOnce*(s: LPChannel, method readOnce*(s: LPChannel,
@ -130,7 +141,10 @@ method readOnce*(s: LPChannel,
nbytes: int): nbytes: int):
Future[int] = Future[int] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamEOFError() var retFuture = newFuture[int]("LPChannel.readOnce")
retFuture.fail(newLPStreamEOFError())
return retFuture
result = procCall readOnce(BufferStream(s), pbytes, nbytes) result = procCall readOnce(BufferStream(s), pbytes, nbytes)
method readUntil*(s: LPChannel, method readUntil*(s: LPChannel,
@ -138,7 +152,10 @@ method readUntil*(s: LPChannel,
sep: seq[byte]): sep: seq[byte]):
Future[int] = Future[int] =
if s.closed or s.isReset: if s.closed or s.isReset:
raise newLPStreamEOFError() var retFuture = newFuture[int]("LPChannel.readUntil")
retFuture.fail(newLPStreamEOFError())
return retFuture
result = procCall readOnce(BufferStream(s), pbytes, nbytes) result = procCall readOnce(BufferStream(s), pbytes, nbytes)
template writePrefix: untyped = template writePrefix: untyped =
@ -147,16 +164,14 @@ template writePrefix: untyped =
if s.closedLocal or s.isReset: if s.closedLocal or s.isReset:
raise newLPStreamEOFError() raise newLPStreamEOFError()
method write*(s: LPChannel, method write*(s: LPChannel, pbytes: pointer, nbytes: int) {.async.} =
pbytes: pointer,
nbytes: int): Future[void] {.async.} =
writePrefix() writePrefix()
result = procCall write(BufferStream(s), pbytes, nbytes) await procCall write(BufferStream(s), pbytes, nbytes)
method write*(s: LPChannel, msg: string, msglen = -1) {.async.} = method write*(s: LPChannel, msg: string, msglen = -1) {.async.} =
writePrefix() writePrefix()
result = procCall write(BufferStream(s), msg, msglen) await procCall write(BufferStream(s), msg, msglen)
method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} = method write*(s: LPChannel, msg: seq[byte], msglen = -1) {.async.} =
writePrefix() writePrefix()
result = procCall write(BufferStream(s), msg, msglen) await procCall write(BufferStream(s), msg, msglen)

View File

@ -59,7 +59,7 @@ proc addInterval(every: Duration, cb: CallbackFunc,
udata: pointer = nil): Future[void] = udata: pointer = nil): Future[void] =
## Arrange the callback ``cb`` to be called on every ``Duration`` window ## Arrange the callback ``cb`` to be called on every ``Duration`` window
var retFuture = newFuture[void]("chronos.addInterval(Duration)") var retFuture = newFuture[void]("gossipsub.addInterval(Duration)")
proc interval(arg: pointer = nil) {.gcsafe.} proc interval(arg: pointer = nil) {.gcsafe.}
proc scheduleNext() = proc scheduleNext() =
if not retFuture.finished(): if not retFuture.finished():

View File

@ -14,7 +14,8 @@ import ../protocol,
type type
Secure* = ref object of LPProtocol # base type for secure managers Secure* = ref object of LPProtocol # base type for secure managers
method secure*(p: Secure, conn: Connection): Future[Connection] method secure*(p: Secure, conn: Connection): Future[Connection] {.base.} =
{.base, async, gcsafe.} =
## default implementation matches plaintext ## default implementation matches plaintext
result = conn var retFuture = newFuture[Connection]("secure.secure")
retFuture.complete(conn)
return retFuture

View File

@ -271,10 +271,14 @@ method write*(s: BufferStream,
## ##
## Return number of bytes actually consumed (discarded). ## Return number of bytes actually consumed (discarded).
## ##
if isNil(s.writeHandler):
var retFuture = newFuture[void]("BufferStream.write(pointer)")
retFuture.fail(newNotWritableError())
return retFuture
var buf: seq[byte] = newSeq[byte](nbytes) var buf: seq[byte] = newSeq[byte](nbytes)
copyMem(addr buf[0], pbytes, nbytes) copyMem(addr buf[0], pbytes, nbytes)
if not isNil(s.writeHandler): result = s.writeHandler(buf)
result = s.writeHandler(buf)
method write*(s: BufferStream, method write*(s: BufferStream,
msg: string, msg: string,
@ -287,10 +291,14 @@ method write*(s: BufferStream,
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream. ## stream.
## ##
if isNil(s.writeHandler):
var retFuture = newFuture[void]("BufferStream.write(string)")
retFuture.fail(newNotWritableError())
return retFuture
var buf = "" var buf = ""
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg) shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
if not isNil(s.writeHandler): result = s.writeHandler(cast[seq[byte]](buf))
result = s.writeHandler(cast[seq[byte]](buf))
method write*(s: BufferStream, method write*(s: BufferStream,
msg: seq[byte], msg: seq[byte],
@ -304,10 +312,14 @@ method write*(s: BufferStream,
## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to ## If ``msglen > len(sbytes)`` only ``len(sbytes)`` bytes will be written to
## stream. ## stream.
## ##
if isNil(s.writeHandler):
var retFuture = newFuture[void]("BufferStream.write(seq)")
retFuture.fail(newNotWritableError())
return retFuture
var buf: seq[byte] var buf: seq[byte]
shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg) shallowCopy(buf, if msglen > 0: msg[0..<msglen] else: msg)
if not isNil(s.writeHandler): result = s.writeHandler(buf)
result = s.writeHandler(buf)
proc pipe*(s: BufferStream, proc pipe*(s: BufferStream,
target: BufferStream): BufferStream = target: BufferStream): BufferStream =

View File

@ -51,12 +51,11 @@ proc newLPStreamEOFError*(): ref Exception {.inline.} =
method closed*(s: LPStream): bool {.base, inline.} = method closed*(s: LPStream): bool {.base, inline.} =
s.isClosed s.isClosed
method read*(s: LPStream, n = -1): Future[seq[byte]] method read*(s: LPStream, n = -1): Future[seq[byte]] {.base, async.} =
{.base, async.} =
doAssert(false, "not implemented!") doAssert(false, "not implemented!")
method readExactly*(s: LPStream, pbytes: pointer, nbytes: int): Future[void] method readExactly*(s: LPStream, pbytes: pointer,
{.base, async.} = nbytes: int): Future[void] {.base, async.} =
doAssert(false, "not implemented!") doAssert(false, "not implemented!")
method readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] method readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string]

View File

@ -283,7 +283,7 @@ proc start*(s: Switch): Future[seq[Future[void]]] {.async, gcsafe.} =
proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} = proc handle(conn: Connection): Future[void] {.async, closure, gcsafe.} =
try: try:
await s.upgradeIncoming(conn) # perform upgrade on incoming connection await s.upgradeIncoming(conn) # perform upgrade on incoming connection
except CatchableError as exc: except CatchableError as exc:
trace "exception occurred in Switch.start", exc = exc.msg trace "exception occurred in Switch.start", exc = exc.msg
finally: finally:
@ -325,24 +325,31 @@ proc subscribeToPeer(s: Switch, peerInfo: PeerInfo) {.async, gcsafe.} =
warn "unable to initiate pubsub", exc = exc.msg warn "unable to initiate pubsub", exc = exc.msg
s.dialedPubSubPeers.excl(peerInfo.id) s.dialedPubSubPeers.excl(peerInfo.id)
proc subscribe*(s: Switch, topic: string, handler: TopicHandler): Future[void] {.gcsafe.} = proc subscribe*(s: Switch, topic: string,
handler: TopicHandler): Future[void] =
## subscribe to a pubsub topic ## subscribe to a pubsub topic
if s.pubSub.isNone: if s.pubSub.isNone:
raise newNoPubSubException() var retFuture = newFuture[void]("Switch.subscribe")
retFuture.fail(newNoPubSubException())
return retFuture
result = s.pubSub.get().subscribe(topic, handler) result = s.pubSub.get().subscribe(topic, handler)
proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] {.gcsafe.} = proc unsubscribe*(s: Switch, topics: seq[TopicPair]): Future[void] =
## unsubscribe from topics ## unsubscribe from topics
if s.pubSub.isNone: if s.pubSub.isNone:
raise newNoPubSubException() var retFuture = newFuture[void]("Switch.unsubscribe")
retFuture.fail(newNoPubSubException())
return retFuture
result = s.pubSub.get().unsubscribe(topics) result = s.pubSub.get().unsubscribe(topics)
proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] {.gcsafe.} = proc publish*(s: Switch, topic: string, data: seq[byte]): Future[void] =
# pubslish to pubsub topic # pubslish to pubsub topic
if s.pubSub.isNone: if s.pubSub.isNone:
raise newNoPubSubException() var retFuture = newFuture[void]("Switch.publish")
retFuture.fail(newNoPubSubException())
return retFuture
result = s.pubSub.get().publish(topic, data) result = s.pubSub.get().publish(topic, data)

View File

@ -381,7 +381,8 @@ suite "Mplex":
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true) let chann = newChannel(1, newConnection(newBufferStream(writeHandler)), true)
await chann.reset() await chann.reset()
asyncDiscard chann.read() var data = await chann.read()
doAssert(len(data) == 1)
expect LPStreamEOFError: expect LPStreamEOFError:
waitFor(testResetRead()) waitFor(testResetRead())