diff --git a/libp2p/stream/lpstream.nim b/libp2p/stream/lpstream.nim index cfb452437..86188be48 100644 --- a/libp2p/stream/lpstream.nim +++ b/libp2p/stream/lpstream.nim @@ -75,17 +75,6 @@ method readOnce*(s: LPStream, {.base, async.} = doAssert(false, "not implemented!") -proc read*(s: LPStream, nbytes: int): Future[seq[byte]] {.async, deprecated: "readExactly".} = - # This function is deprecated - it was broken and used inappropriately as - # `readExacltly` in tests and code - tests still need refactoring to remove - # any calls - # `read` without nbytes was also incorrectly implemented - it worked more - # like `readOnce` in that it would not wait for stream to close, in - # BufferStream in particular - both tests and implementation were broken - var ret = newSeq[byte](nbytes) - await readExactly(s, addr ret[0], ret.len) - return ret - proc readLine*(s: LPStream, limit = 0, sep = "\r\n"): Future[string] {.async, deprecated: "todo".} = # TODO replace with something that exploits buffering better var lim = if limit <= 0: -1 else: limit diff --git a/tests/testbufferstream.nim b/tests/testbufferstream.nim index d5ca1d6b6..e8d6e3821 100644 --- a/tests/testbufferstream.nim +++ b/tests/testbufferstream.nim @@ -52,29 +52,9 @@ suite "BufferStream": check buff.len == 0 await buff.pushTo(cast[seq[byte]](@"12345")) - let data = cast[string](await buff.read(3)) - check ['1', '2', '3'] == data - - result = true - - await buff.close() - - check: - waitFor(testRead()) == true - - test "read and wait": - proc testRead(): Future[bool] {.async.} = - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard - let buff = newBufferStream(writeHandler, 10) - check buff.len == 0 - - await buff.pushTo(cast[seq[byte]](@"123")) - check buff.len == 3 - let readFut = buff.read(5) - await buff.pushTo(cast[seq[byte]](@"45")) - check buff.len == 2 - - check cast[string](await readFut) == ['1', '2', '3', '4', '5'] + var data = newSeq[byte](3) + await buff.readExactly(addr data[0], 3) + check ['1', '2', '3'] == cast[string](data) result = true @@ -213,17 +193,28 @@ suite "BufferStream": await buff.pushTo(cast[seq[byte]]("Msg 2")) await buff.pushTo(cast[seq[byte]]("Msg 3")) - check cast[string](await buff.read(5)) == "Msg 1" - check cast[string](await buff.read(5)) == "Msg 2" - check cast[string](await buff.read(5)) == "Msg 3" + var data = newSeq[byte](5) + await buff.readExactly(addr data[0], 5) + check cast[string](data) == "Msg 1" + + await buff.readExactly(addr data[0], 5) + check cast[string](data) == "Msg 2" + + await buff.readExactly(addr data[0], 5) + check cast[string](data) == "Msg 3" await buff.pushTo(cast[seq[byte]]("Msg 4")) await buff.pushTo(cast[seq[byte]]("Msg 5")) await buff.pushTo(cast[seq[byte]]("Msg 6")) - check cast[string](await buff.read(5)) == "Msg 4" - check cast[string](await buff.read(5)) == "Msg 5" - check cast[string](await buff.read(5)) == "Msg 6" + await buff.readExactly(addr data[0], 5) + check cast[string](data) == "Msg 4" + + await buff.readExactly(addr data[0], 5) + check cast[string](data) == "Msg 5" + + await buff.readExactly(addr data[0], 5) + check cast[string](data) == "Msg 6" result = true @@ -329,7 +320,10 @@ suite "BufferStream": buf1 = buf1.pipe(buf1) - proc reader(): Future[seq[byte]] = buf1.read(6) + proc reader(): Future[seq[byte]] {.async.} = + result = newSeq[byte](6) + await buf1.readExactly(addr result[0], 6) + proc writer(): Future[void] = buf1.write(cast[seq[byte]]("Hello!")) var writerFut = writer() @@ -402,7 +396,10 @@ suite "BufferStream": buf1 = buf1 | buf1 - proc reader(): Future[seq[byte]] = buf1.read(6) + proc reader(): Future[seq[byte]] {.async.} = + result = newSeq[byte](6) + await buf1.readExactly(addr result[0], 6) + proc writer(): Future[void] = buf1.write(cast[seq[byte]]("Hello!")) var writerFut = writer() @@ -423,15 +420,14 @@ suite "BufferStream": # piping to self test "pipe deadlock": proc pipeTest(): Future[bool] {.async.} = - var buf1 = newBufferStream(size = 5) buf1 = buf1 | buf1 var count = 30000 proc reader() {.async.} = - while count > 0: - discard await buf1.read(7) + var data = newSeq[byte](7) + await buf1.readExactly(addr data[0], 7) proc writer() {.async.} = while count > 0: @@ -470,4 +466,3 @@ suite "BufferStream": check: waitFor(closeTest()) == true - diff --git a/tests/testmplex.nim b/tests/testmplex.nim index c5c8f5154..a457363fb 100644 --- a/tests/testmplex.nim +++ b/tests/testmplex.nim @@ -447,7 +447,9 @@ suite "Mplex": test "half closed - channel should close for read by remote": proc testClosedForRead(): Future[void] {.async.} = - proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard + proc writeHandler(data: seq[byte]) {.async, gcsafe.} = + discard + let buff = newBufferStream(writeHandler) conn = newConnection(buff) @@ -456,8 +458,9 @@ suite "Mplex": try: await chann.pushTo(cast[seq[byte]]("Hello!")) await chann.closedByRemote() - discard await chann.read(6) # this should work, since there is data in the buffer - discard await chann.read(6) # this should throw + var data = newSeq[byte](6) + await chann.readExactly(addr data[0], 6) # this should work, since there is data in the buffer + await chann.readExactly(addr data[0], 6) # this should throw finally: await chann.cleanUp() await conn.close() @@ -602,7 +605,8 @@ suite "Mplex": try: await chann.reset() - var data = await chann.read(1) + var data = newSeq[byte](1) + await chann.readExactly(addr data[0], 1) doAssert(len(data) == 1) finally: await chann.cleanUp() diff --git a/tests/testnoise.nim b/tests/testnoise.nim index 28c8f50a4..1adba74e3 100644 --- a/tests/testnoise.nim +++ b/tests/testnoise.nim @@ -98,7 +98,8 @@ suite "Noise": conn = await transport2.dial(transport1.ma) sconn = await clientNoise.secure(conn, true) - msg = await sconn.read(6) + var msg = newSeq[byte](6) + await sconn.readExactly(addr msg[0], 6) await sconn.close() await conn.close() @@ -123,7 +124,8 @@ suite "Noise": defer: await sconn.close() await conn.close() - let msg = await sconn.read(6) + var msg = newSeq[byte](6) + await sconn.readExactly(addr msg[0], 6) check cast[string](msg) == "Hello!" readTask.complete() diff --git a/tests/testtransport.nim b/tests/testtransport.nim index 4a538720d..72376a712 100644 --- a/tests/testtransport.nim +++ b/tests/testtransport.nim @@ -45,7 +45,8 @@ suite "TCP transport": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let handlerWait = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = - let msg = await conn.read(6) + var msg = newSeq[byte](6) + await conn.readExactly(addr msg[0], 6) check cast[string](msg) == "Hello!" await conn.close() handlerWait.complete() @@ -84,7 +85,8 @@ suite "TCP transport": let ma: MultiAddress = MultiAddress.init(server.sock.getLocalAddress()) let transport: TcpTransport = newTransport(TcpTransport) let conn = await transport.dial(ma) - let msg = await conn.read(6) + var msg = newSeq[byte](6) + await conn.readExactly(addr msg[0], 6) result = cast[string](msg) == "Hello!" await handlerWait.wait(5000.millis) # when no issues will not wait that long! @@ -148,7 +150,8 @@ suite "TCP transport": let transport2: TcpTransport = newTransport(TcpTransport) let conn = await transport2.dial(transport1.ma) - let msg = await conn.read(6) + var msg = newSeq[byte](6) + await conn.readExactly(addr msg[0], 6) await handlerWait.wait(5000.millis) # when no issues will not wait that long! @@ -166,7 +169,8 @@ suite "TCP transport": let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0") let handlerWait = newFuture[void]() proc connHandler(conn: Connection) {.async, gcsafe.} = - let msg = await conn.read(6) + var msg = newSeq[byte](6) + await conn.readExactly(addr msg[0], 6) check cast[string](msg) == "Hello!" await conn.close() handlerWait.complete()