2023-05-18 10:24:17 +02:00
|
|
|
{.used.}
|
|
|
|
|
|
|
|
# Nim-Libp2p
|
|
|
|
# Copyright (c) 2023 Status Research & Development GmbH
|
|
|
|
# Licensed under either of
|
|
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
|
|
# at your option.
|
|
|
|
# This file may not be copied, modified, or distributed except according to
|
|
|
|
# those terms.
|
|
|
|
|
|
|
|
import strformat, random, sequtils
|
2020-05-18 07:49:49 -06:00
|
|
|
import chronos, nimcrypto/utils, chronicles, stew/byteutils
|
2020-04-21 10:24:42 +09:00
|
|
|
import ../libp2p/[errors,
|
2020-06-19 11:29:43 -06:00
|
|
|
stream/connection,
|
2019-10-29 20:51:48 +02:00
|
|
|
stream/bufferstream,
|
|
|
|
transports/tcptransport,
|
|
|
|
transports/transport,
|
2019-10-03 13:30:22 -06:00
|
|
|
multiaddress,
|
2019-10-29 20:51:48 +02:00
|
|
|
muxers/mplex/mplex,
|
|
|
|
muxers/mplex/coder,
|
2020-03-11 09:12:08 -06:00
|
|
|
muxers/mplex/lpchannel,
|
2021-03-18 09:20:36 -06:00
|
|
|
upgrademngrs/upgrade,
|
2020-03-11 09:12:08 -06:00
|
|
|
vbuffer,
|
|
|
|
varint]
|
2019-09-03 14:40:51 -06:00
|
|
|
|
2020-05-08 22:10:06 +02:00
|
|
|
import ./helpers
|
2019-10-29 20:51:48 +02:00
|
|
|
|
2019-09-03 14:40:51 -06:00
|
|
|
suite "Mplex":
|
2020-04-21 10:24:42 +09:00
|
|
|
teardown:
  # Runs after every test of the suite. checkTrackers is not defined in this
  # file (presumably provided by ./helpers and used to detect leaked
  # resources -- confirm against the helper module).
  checkTrackers()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
suite "channel encoding":
|
|
|
|
asyncTest "encode header with channel id 0":
  # Expected wire bytes: header 0x00 (channel id 0, MessageType.New),
  # length prefix 0x08, then the ASCII bytes of "stream 1".
  var handlerCalled = false

  proc encHandler(msg: seq[byte]) {.async.} =
    handlerCalled = true
    check msg == fromHex("000873747265616d2031")

  let conn = TestBufferStream.new(encHandler)
  await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)

  # The byte comparison lives inside the callback; make sure it actually ran,
  # otherwise the test would pass without checking anything.
  check handlerCalled
  await conn.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "encode header with channel id other than 0":
  # Expected wire bytes: two-byte varint header 0x88 0x01 (channel id 17,
  # MessageType.New), length prefix 0x08, then the ASCII bytes of "stream 1".
  var handlerCalled = false

  proc encHandler(msg: seq[byte]) {.async.} =
    handlerCalled = true
    check msg == fromHex("88010873747265616d2031")

  let conn = TestBufferStream.new(encHandler)
  await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)

  # The byte comparison lives inside the callback; make sure it actually ran,
  # otherwise the test would pass without checking anything.
  check handlerCalled
  await conn.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "encode header and body with channel id 0":
  # Expected wire bytes: header 0x02 (channel id 0, MessageType.MsgOut),
  # length prefix 0x08, then the ASCII bytes of "stream 1".
  var handlerCalled = false

  proc encHandler(msg: seq[byte]) {.async.} =
    handlerCalled = true
    check msg == fromHex("020873747265616d2031")

  let conn = TestBufferStream.new(encHandler)
  await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)

  # The byte comparison lives inside the callback; make sure it actually ran,
  # otherwise the test would pass without checking anything.
  check handlerCalled
  await conn.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "encode header and body with channel id other than 0":
  # Expected wire bytes: two-byte varint header 0x8a 0x01 (channel id 17,
  # MessageType.MsgOut), length prefix 0x08, then the ASCII bytes of
  # "stream 1".
  var handlerCalled = false

  proc encHandler(msg: seq[byte]) {.async.} =
    handlerCalled = true
    check msg == fromHex("8a010873747265616d2031")

  let conn = TestBufferStream.new(encHandler)
  await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)

  # The byte comparison lives inside the callback; make sure it actually ran,
  # otherwise the test would pass without checking anything.
  check handlerCalled
  await conn.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "decode header with channel id 0":
  # Push a pre-encoded frame (header 0x00, length 0x08, "stream 1") into a
  # buffer stream and decode it again with readMsg.
  let input = BufferStream.new()
  await input.pushData(fromHex("000873747265616d2031"))

  let decoded = await input.readMsg()
  check:
    decoded.id == 0
    decoded.msgType == MessageType.New

  await input.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "decode header and body with channel id 0":
  # Frame layout: header 0x02, length 0x16 (22 bytes), then the payload text.
  let input = BufferStream.new()
  await input.pushData(
    fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))

  let decoded = await input.readMsg()
  check:
    decoded.id == 0
    decoded.msgType == MessageType.MsgOut
    string.fromBytes(decoded.data) == "hello from channel 0!!"

  await input.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "decode header and body with channel id other than 0":
  # Frame layout: varint header 0x8a 0x01 (decoded below as channel id 17,
  # MsgOut), length 0x16 (22 bytes), then the payload text.
  let input = BufferStream.new()
  await input.pushData(
    fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))

  let decoded = await input.readMsg()
  check:
    decoded.id == 17
    decoded.msgType == MessageType.MsgOut
    string.fromBytes(decoded.data) == "hello from channel 0!!"

  await input.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
suite "channel half-closed":
|
|
|
|
asyncTest "(local close) - should close for write":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # After a local close, any further write is expected to raise.
  await chann.close()
  expect LPStreamClosedError:
    await chann.write("Hello")

  # Tear down the channel and the underlying stream.
  await chann.reset()
  await conn.close()
|
2020-04-21 10:24:42 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "(local close) - should allow reads until remote closes":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.gcsafe, async.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # Queue six bytes of incoming data before closing anything.
  await chann.pushData(("Hello!").toBytes)

  var data = newSeq[byte](6)

  # Close the local side of the channel ...
  await chann.close()
  # ... reading must still be possible after a local close.
  await chann.readExactly(addr data[0], 3)

  # Now the remote end closes as well (EOF is pushed into the channel).
  let closeFut = chann.pushEof()
  # Buffered data remains readable until the EOF is reached.
  await chann.readExactly(addr data[3], 3)

  # Nothing is left to read, so this read is expected to fail.
  expect LPStreamRemoteClosedError:
    await chann.readExactly(addr data[0], 3)

  await chann.close()
  await conn.close()
  await closeFut
|
2020-08-11 23:23:49 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "(remote close) - channel should close for reading by remote":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.gcsafe, async.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  await chann.pushData(("Hello!").toBytes)

  var data = newSeq[byte](6)
  # Consume the first half of the buffered payload.
  await chann.readExactly(addr data[0], 3)

  # Remote closes the channel while the second half is being read; wait for
  # both operations to finish (allFutures does not re-raise their errors).
  let closeFut = chann.pushEof()
  let readFut = chann.readExactly(addr data[3], 3)
  await allFutures(closeFut, readFut)

  # Any read after the remote close is expected to fail.
  expect LPStreamRemoteClosedError:
    await chann.readExactly(addr data[0], 6)

  await chann.close()
  await conn.close()
|
2020-05-18 07:49:49 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "(remote close) - channel should allow writing on remote close":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.gcsafe, async.} = discard

  let testData = "Hello!".toBytes
  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # Remote side closes the channel.
  await chann.pushEof()

  # Writing must still succeed; cleanup runs whether or not it does.
  try:
    await chann.writeLp(testData)
  finally:
    # Nobody reads the pushed EOF, so the channel is reset instead of closed.
    await chann.reset()
    await conn.close()
|
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "should not allow pushing data to channel when remote end closed":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  await chann.pushEof()

  # Reading the EOF marker yields zero bytes.
  var buf: array[1, byte]
  let bytesRead = await chann.readOnce(addr buf[0], 1)
  check bytesRead == 0

  # Once EOF has been consumed, pushing more data is expected to raise.
  expect LPStreamClosedError:
    await chann.pushData(@[byte(1)])

  await chann.close()
  await conn.close()
|
|
|
|
|
|
|
|
suite "channel reset":
|
2020-05-18 07:49:49 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "channel should fail reading":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # A read issued after the channel was reset is expected to raise.
  await chann.reset()

  var data = newSeq[byte](1)
  expect LPStreamClosedError:
    await chann.readExactly(addr data[0], 1)

  await conn.close()
|
2020-05-18 07:49:49 -06:00
|
|
|
|
2020-11-17 15:59:25 +01:00
|
|
|
asyncTest "reset should complete read":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # Start a read while no data is available, then reset the channel.
  var data = newSeq[byte](1)
  let pendingRead = chann.readExactly(addr data[0], 1)

  await chann.reset()

  # The pending read is expected to finish with a closed-stream error.
  expect LPStreamClosedError:
    await pendingRead

  await conn.close()
|
2020-09-21 19:48:19 +02:00
|
|
|
|
2020-11-17 15:59:25 +01:00
|
|
|
asyncTest "reset should complete pushData":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # Six sequential single-byte pushes with nobody reading; they must not
  # hang once the channel is reset.
  proc pushes() {.async.} =
    for _ in 0 ..< 6:
      await chann.pushData(@[0'u8])

  let pendingPushes = pushes()
  await chann.reset()

  # All pushes have to settle within the timeout.
  check await allFutures(pendingPushes).withTimeout(100.millis)
  await conn.close()
|
|
|
|
|
2020-11-17 15:59:25 +01:00
|
|
|
asyncTest "reset should complete both read and push":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # One read and one push are started (read first) before the reset.
  var data = newSeq[byte](1)
  let pending = [
    chann.readExactly(addr data[0], 1),
    chann.pushData(@[0'u8]),
  ]

  await chann.reset()

  # Both operations have to settle within the timeout.
  check await allFutures(pending).withTimeout(100.millis)
  await conn.close()
|
|
|
|
|
2020-11-17 15:59:25 +01:00
|
|
|
asyncTest "reset should complete both read and pushes":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # The read is started before the reset ...
  var data = newSeq[byte](1)
  let read = chann.readExactly(addr data[0], 1)

  # Eleven sequential single-byte pushes.
  proc pushes() {.async.} =
    for _ in 0 ..< 11:
      await chann.pushData(@[0'u8])

  await chann.reset()

  # ... while the pushes are only started here, after the reset. Both the
  # read and the pushes have to settle within the timeout.
  check await allFutures(read, pushes()).withTimeout(100.millis)
  await conn.close()
|
|
|
|
|
|
|
|
asyncTest "reset should complete both read and push with cancel":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # Start a read and request its cancellation right away.
  var data = newSeq[byte](1)
  let readFut = chann.readExactly(addr data[0], 1)
  readFut.cancel()

  # The reset is started without awaiting it directly.
  let resetFut = chann.reset()

  # The cancelled read and the reset have to settle within the timeout.
  check await allFutures(readFut, resetFut).withTimeout(100.millis)
  await conn.close()
|
|
|
|
|
|
|
|
asyncTest "should complete both read and push after reset":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # A pending read, plus a future that completes as soon as either a 1 ms
  # timer or that read finishes.
  var data = newSeq[byte](1)
  let readFut = chann.readExactly(addr data[0], 1)
  let readOrTimer = sleepAsync(1.millis) or readFut

  # Let the 1 ms timer elapse before pushing.
  await sleepAsync(5.millis)

  let firstPush = chann.pushData(@[0'u8])
  let secondPush = chann.pushData(@[0'u8])
  await chann.reset()

  # Every outstanding operation has to settle within the timeout.
  check await allFutures(
    readFut, readOrTimer, firstPush, secondPush).withTimeout(100.millis)
  await conn.close()
|
|
|
|
|
2020-11-23 09:07:11 -06:00
|
|
|
asyncTest "reset should complete ongoing push without reader":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # The first push is awaited; the second one is left pending since nothing
  # ever reads from the channel.
  await chann.pushData(@[0'u8])
  let pendingPush = chann.pushData(@[0'u8])

  await chann.reset()

  # The pending push has to settle within the timeout.
  check await allFutures(pendingPush).withTimeout(100.millis)
  await conn.close()
|
|
|
|
|
|
|
|
asyncTest "reset should complete ongoing read without a push":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # A read is started although no data is ever pushed.
  var data = newSeq[byte](1)
  let pendingRead = chann.readExactly(addr data[0], 1)

  await chann.reset()

  # The pending read has to settle within the timeout.
  check await allFutures(pendingRead).withTimeout(100.millis)
  await conn.close()
|
|
|
|
|
|
|
|
asyncTest "reset should allow all reads and pushes to complete":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  var data = newSeq[byte](1)

  # Three sequential single-byte pushes.
  proc writer() {.async.} =
    for _ in 0 ..< 3:
      await chann.pushData(@[0'u8])

  # Three sequential single-byte reads into the same buffer slot.
  proc reader() {.async.} =
    for _ in 0 ..< 3:
      await chann.readExactly(addr data[0], 1)

  # Writer is started before the reader.
  let rw = @[writer(), reader()]

  await chann.close()

  # Guarded by a timeout because, per the original note, this reset is the
  # call that "would hang".
  let resetFut = chann.reset()
  check await resetFut.withTimeout(100.millis)

  # Writer and reader must both have finished shortly afterwards.
  check await allFuturesThrowing(allFinished(rw)).withTimeout(100.millis)

  await conn.close()
|
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "channel should fail writing":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true)

  # A write issued after the channel was reset is expected to raise.
  await chann.reset()
  expect LPStreamClosedError:
    await chann.write(("Hello!").toBytes)

  await conn.close()
|
2020-02-12 09:43:42 -05:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "channel should reset on timeout":
  # Outgoing bytes are irrelevant here, so the write callback is a no-op.
  proc noopWrite(data: seq[byte]) {.async, gcsafe.} = discard

  # The channel is created with a 100 ms timeout and then left idle.
  let conn = TestBufferStream.new(noopWrite)
  let chann = LPChannel.init(1, conn, true, timeout = 100.millis)

  # join() has to complete on its own well within the generous upper bound.
  check await chann.join().withTimeout(1.minutes)

  await conn.close()
|
2020-07-17 12:44:41 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
suite "mplex e2e":
|
|
|
|
asyncTest "read/write receiver":
  # Listen on an OS-assigned TCP port.
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]

  let listener: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let listenFut = listener.start(listenAddrs)

  # Listener side: accept one connection, wrap it in mplex and expect a
  # single length-prefixed "HELLO" on every incoming stream.
  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await listener.accept()
    let mplexListen = Mplex.new(accepted)
    mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      let msg = await stream.readLp(1024)
      check string.fromBytes(msg) == "HELLO"
      await stream.close()

    await mplexListen.handle()
    await mplexListen.close()

  let acceptFut = acceptHandler()

  # Dialer side: connect, open a (non-lazy) stream and send the greeting.
  let dialer: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await dialer.dial(listener.addrs[0])

  let mplexDial = Mplex.new(dialConn)
  let mplexDialFut = mplexDial.handle()
  let stream = await mplexDial.newStream()
  await stream.writeLp("HELLO")
  check LPChannel(stream).isOpen # not lazy
  await stream.close()

  # Shut everything down; both handler futures get one second to finish.
  await dialConn.close()
  await acceptFut.wait(1.seconds)
  await mplexDialFut.wait(1.seconds)
  await allFuturesThrowing(
    listener.stop(),
    dialer.stop())
  await listenFut
|
2020-02-12 02:30:36 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "read/write receiver lazy":
  # Listen on an OS-assigned TCP port.
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]

  let listener: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let listenFut = listener.start(listenAddrs)

  # Listener side: accept one connection, wrap it in mplex and expect a
  # single length-prefixed "HELLO" on every incoming stream.
  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await listener.accept()
    let mplexListen = Mplex.new(accepted)
    mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      let msg = await stream.readLp(1024)
      check string.fromBytes(msg) == "HELLO"
      await stream.close()

    await mplexListen.handle()
    await mplexListen.close()

  let acceptFut = acceptHandler()

  # Dialer side: the stream is requested lazily, before handle() is started.
  let dialer: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await dialer.dial(listener.addrs[0])

  let mplexDial = Mplex.new(dialConn)
  let stream = await mplexDial.newStream(lazy = true)
  let mplexDialFut = mplexDial.handle()

  check not LPChannel(stream).isOpen # lazy: not open before the first write
  await stream.writeLp("HELLO")
  check LPChannel(stream).isOpen # the first write opened the channel
  await stream.close()

  # Shut everything down; the accept handler gets one second to finish.
  await dialConn.close()
  await acceptFut.wait(1.seconds)
  await mplexDialFut
  await allFuturesThrowing(
    listener.stop(),
    dialer.stop())
  await listenFut
|
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "write fragmented":
  # Sends a MaxMsgSize-byte payload through mplex and checks that the
  # listener receives it unchanged as one length-prefixed message.
  let
    ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
    # Completed by the listener's stream handler once the payload was checked.
    listenJob = newFuture[void]()

  # Payload: MaxMsgSize random bytes in the range 'A'..'z'.
  var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
  for _ in 0..<MaxMsgSize:
    bigseq.add(uint8(rand(uint('A')..uint('z'))))

  let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let listenFut = transport1.start(ma)

  proc acceptHandler() {.async, gcsafe.} =
    try:
      let conn = await transport1.accept()
      let mplexListen = Mplex.new(conn)
      mplexListen.streamHandler = proc(stream: Connection)
        {.async, gcsafe.} =
          let msg = await stream.readLp(MaxMsgSize)
          check msg == bigseq
          trace "Bigseq check passed!"
          await stream.close()
          listenJob.complete()

      await mplexListen.handle()
      await sleepAsync(1.seconds) # give chronos some slack to process things
      await mplexListen.close()
    except CancelledError as exc:
      # Cancellation is not a test failure; let it propagate.
      raise exc
    except CatchableError as exc:
      # Record why the listener failed instead of silently discarding the
      # exception, then fail the test.
      checkpoint "acceptHandler failed: " & exc.msg
      check false

  let acceptFut = acceptHandler()
  let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let conn = await transport2.dial(transport1.addrs[0])

  let mplexDial = Mplex.new(conn)
  let mplexDialFut = mplexDial.handle()
  let stream = await mplexDial.newStream()

  await stream.writeLp(bigseq)
  # Wait (up to ten seconds) for the listener to verify the payload.
  await listenJob.wait(10.seconds)

  await stream.close()
  await conn.close()
  await acceptFut
  await mplexDialFut
  await allFuturesThrowing(
    transport1.stop(),
    transport2.stop())

  await listenFut
|
2020-02-06 15:24:11 +09:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "read/write initiator":
  # Listen on an OS-assigned TCP port.
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]

  let listener: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let listenFut = listener.start(listenAddrs)

  # Listener side: greet every incoming stream, then close it.
  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await listener.accept()
    let mplexListen = Mplex.new(accepted)
    mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      await stream.writeLp("Hello from stream!")
      await stream.close()

    await mplexListen.handle()
    await mplexListen.close()

  # Here the dial happens first; the accept handler is started afterwards.
  let dialer: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await dialer.dial(listener.addrs[0])

  let acceptFut = acceptHandler()
  let mplexDial = Mplex.new(dialConn)
  let mplexDialFut = mplexDial.handle()

  # Dialer opens a named stream and reads the listener's greeting from it.
  let stream = await mplexDial.newStream("DIALER")
  let greeting = string.fromBytes(await stream.readLp(1024))
  await stream.close()
  check greeting == "Hello from stream!"

  # Shut everything down; the accept handler gets one second to finish.
  await dialConn.close()
  await acceptFut.wait(1.seconds)
  await mplexDialFut
  await allFuturesThrowing(
    listener.stop(),
    dialer.stop())
  await listenFut
|
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "multiple streams":
  # Listen on an OS-assigned TCP port.
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]

  let listener = TcpTransport.new(upgrade = Upgrade())
  let listenFut = listener.start(listenAddrs)

  # Completed by the listener once the tenth stream has been handled.
  let done = newFuture[void]()

  proc acceptHandler() {.async, gcsafe.} =
    # Number of the stream expected next; streams are assumed to arrive in
    # the order in which the dialer opens them.
    var count = 1
    let accepted = await listener.accept()
    let mplexListen = Mplex.new(accepted)
    mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      let msg = await stream.readLp(1024)
      check string.fromBytes(msg) == &"stream {count}!"
      count.inc
      if count == 11:
        done.complete()

      await stream.close()

    await mplexListen.handle()
    await mplexListen.close()

  let dialer: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await dialer.dial(listener.addrs[0])

  let acceptFut = acceptHandler()
  let mplexDial = Mplex.new(dialConn)
  # TODO: Reenable once half-closed is working properly
  let mplexDialFut = mplexDial.handle()

  # Open ten streams one after another, each carrying its own number.
  for i in 1..10:
    let stream = await mplexDial.newStream()
    await stream.writeLp(&"stream {i}!")
    await stream.close()

  # Wait (up to ten seconds) until the listener has seen all ten streams.
  await done.wait(10.seconds)

  await dialConn.close()
  await acceptFut.wait(1.seconds)
  await mplexDialFut
  await allFuturesThrowing(
    listener.stop(),
    dialer.stop())
  await listenFut
|
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "multiple read/write streams":
  # Listen on an OS-assigned TCP port.
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]

  let listener: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let listenFut = listener.start(listenAddrs)

  # Completed by the listener once the tenth stream has been answered.
  let done = newFuture[void]()

  proc acceptHandler() {.async, gcsafe.} =
    # Number of the stream expected next.
    var count = 1
    let accepted = await listener.accept()
    let mplexListen = Mplex.new(accepted)
    mplexListen.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      # Read the dialer's numbered message and answer with the same number.
      let msg = await stream.readLp(1024)
      check string.fromBytes(msg) == &"stream {count} from dialer!"
      await stream.writeLp(&"stream {count} from listener!")
      count.inc
      if count == 11:
        done.complete()

      await stream.close()

    await mplexListen.handle()
    await mplexListen.close()

  let dialer: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await dialer.dial(listener.addrs[0])

  let acceptFut = acceptHandler()
  let mplexDial = Mplex.new(dialConn)
  let mplexDialFut = mplexDial.handle()

  # Ten request/response round trips, each on a fresh named stream.
  for i in 1..10:
    let stream = await mplexDial.newStream("dialer stream")
    await stream.writeLp(&"stream {i} from dialer!")
    let reply = await stream.readLp(1024)
    check string.fromBytes(reply) == &"stream {i} from listener!"
    await stream.close()

  # Wait (up to five seconds) until the listener has answered all streams.
  await done.wait(5.seconds)

  await dialConn.close()
  await acceptFut.wait(1.seconds)
  await mplexDialFut
  await mplexDial.close()
  await allFuturesThrowing(
    listener.stop(),
    dialer.stop())
  await listenFut
|
2020-05-14 22:02:05 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "channel closes listener with EOF":
  # The dialer closes each of its ten streams; every listener-side
  # stream is expected to observe EOF on read and end up closed.
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
  let serverTransport = TcpTransport.new(upgrade = Upgrade())

  # Listener-side streams, appended in the order the handler is invoked.
  var listenStreams: seq[Connection]

  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await serverTransport.accept()
    let listenMux = Mplex.new(accepted)

    listenMux.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      listenStreams.add(stream)
      var sawEof = false
      try:
        discard await stream.readLp(1024)
      except LPStreamEOFError:
        sawEof = true

      if sawEof:
        await stream.close()
      else:
        # The read returned data although only EOF was expected.
        check false

    await listenMux.handle()
    await listenMux.close()

  await serverTransport.start(listenAddrs)
  let acceptFut = acceptHandler()

  let clientTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await clientTransport.dial(serverTransport.addrs[0])
  let dialMux = Mplex.new(dialConn)
  let dialMuxFut = dialMux.handle()

  var dialStreams: seq[Connection]
  for _ in 0 .. 9:
    let opened = await dialMux.newStream()
    dialStreams.add(opened)

  check unorderedCompare(dialStreams, dialMux.getStreams())

  # Indexing listenStreams by the dialer's index relies on both sides
  # having registered their streams in the same order.
  for idx in 0 ..< dialStreams.len:
    let dialed = dialStreams[idx]
    await dialed.closeWithEOF()
    check listenStreams[idx].closed
    check dialed.closed

  # NOTE(review): presumably asserts that no LPChannel is still tracked;
  # confirm against the helper's definition.
  checkTracker(LPChannelTrackerName)

  await dialConn.close()
  await dialMuxFut
  await allFuturesThrowing(serverTransport.stop(), clientTransport.stop())
  await acceptFut
|
2020-11-06 09:24:24 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "channel closes dialer with EOF":
  # The listener closes each of its ten streams; every dialer-side read
  # is expected to end with LPStreamEOFError.
  let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
  let transport1 = TcpTransport.new(upgrade = Upgrade())

  # Completed once the listener has seen all ten incoming streams.
  let done = newFuture[void]()
  var listenStreams: seq[Connection]

  proc acceptHandler() {.async, gcsafe.} =
    let conn = await transport1.accept()
    let mplexListen = Mplex.new(conn)
    mplexListen.streamHandler = proc(stream: Connection)
      {.async, gcsafe.} =
      listenStreams.add(stream)
      # One stream is added per handler call, so the seq length is the
      # count of streams seen so far; no separate counter is needed.
      if listenStreams.len == 10:
        done.complete()

      await stream.join()

    await mplexListen.handle()
    await mplexListen.close()

  await transport1.start(ma)
  let acceptFut = acceptHandler()

  let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
  let conn = await transport2.dial(transport1.addrs[0])

  let mplexDial = Mplex.new(conn)
  let mplexDialFut = mplexDial.handle()
  let dialStreams = toSeq(0..9).mapIt(await mplexDial.newStream())

  check:
    unorderedCompare(dialStreams, mplexDial.getStreams())

  proc dialReadLoop() {.async.} =
    for s in dialStreams:
      try:
        discard await s.readLp(1024)
        # Reaching this line means data arrived where EOF was expected.
        check false
      except LPStreamEOFError:
        await s.close()

  # Bounded wait: if a stream never reaches the listener the test fails
  # with a timeout instead of hanging the whole run.
  await done.wait(5.seconds)
  let readLoop = dialReadLoop()
  for s in listenStreams:
    await s.closeWithEOF()
    check s.closed

  await readLoop
  await allFuturesThrowing(
    (dialStreams & listenStreams)
      .mapIt( it.join() ))

  # NOTE(review): presumably asserts that no LPChannel is still tracked;
  # confirm against the helper's definition.
  checkTracker(LPChannelTrackerName)

  await conn.close()
  await mplexDialFut
  await allFuturesThrowing(
    transport1.stop(),
    transport2.stop())
  await acceptFut
|
2020-11-06 09:24:24 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "dialing mplex closes both ends":
  # Closing the dialer's Mplex instance must let every stream, on both
  # sides of the connection, finish its join().
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
  let serverTransport = TcpTransport.new(upgrade = Upgrade())

  var listenStreams: seq[Connection]

  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await serverTransport.accept()
    let listenMux = Mplex.new(accepted)

    # Each incoming stream is recorded and then parked until it ends.
    listenMux.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      listenStreams.add(stream)
      await stream.join()

    await listenMux.handle()
    await listenMux.close()

  await serverTransport.start(listenAddrs)
  let acceptFut = acceptHandler()

  let clientTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await clientTransport.dial(serverTransport.addrs[0])
  let dialMux = Mplex.new(dialConn)
  let dialMuxFut = dialMux.handle()

  var dialStreams: seq[Connection]
  for _ in 0 .. 9:
    let opened = await dialMux.newStream()
    dialStreams.add(opened)

  check unorderedCompare(dialStreams, dialMux.getStreams())

  await dialMux.close()
  let everyStream = dialStreams & listenStreams
  await allFuturesThrowing(everyStream.mapIt(it.join()))

  # NOTE(review): presumably asserts that no LPChannel is still tracked;
  # confirm against the helper's definition.
  checkTracker(LPChannelTrackerName)

  await dialConn.close()
  await dialMuxFut
  await allFuturesThrowing(serverTransport.stop(), clientTransport.stop())
  await acceptFut
|
2020-11-06 09:24:24 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "listening mplex closes both ends":
  # Closing the listener's Mplex instance must let every stream, on
  # both sides of the connection, finish its join().
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
  let serverTransport = TcpTransport.new(upgrade = Upgrade())

  # Declared outside acceptHandler so the test body can close it.
  var listenMux: Mplex
  var listenStreams: seq[Connection]

  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await serverTransport.accept()
    listenMux = Mplex.new(accepted)

    # Each incoming stream is recorded and then parked until it ends.
    listenMux.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      listenStreams.add(stream)
      await stream.join()

    await listenMux.handle()
    await listenMux.close()

  await serverTransport.start(listenAddrs)
  let acceptFut = acceptHandler()

  let clientTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await clientTransport.dial(serverTransport.addrs[0])
  let dialMux = Mplex.new(dialConn)
  let dialMuxFut = dialMux.handle()

  var dialStreams: seq[Connection]
  for _ in 0 .. 9:
    let opened = await dialMux.newStream()
    dialStreams.add(opened)

  check unorderedCompare(dialStreams, dialMux.getStreams())

  # Both sides must know all ten streams before the listener is closed.
  checkExpiring: listenStreams.len == 10 and dialStreams.len == 10

  await listenMux.close()
  let everyStream = dialStreams & listenStreams
  await allFuturesThrowing(everyStream.mapIt(it.join()))

  # NOTE(review): presumably asserts that no LPChannel is still tracked;
  # confirm against the helper's definition.
  checkTracker(LPChannelTrackerName)

  await dialConn.close()
  await dialMuxFut
  await allFuturesThrowing(serverTransport.stop(), clientTransport.stop())
  await acceptFut
|
2020-11-06 09:24:24 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "canceling mplex handler closes both ends":
  # Cancelling the listener's handle() future must let every stream, on
  # both sides of the connection, finish its join().
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
  let serverTransport = TcpTransport.new(upgrade = Upgrade())

  # The listener's handle() future, exposed so the test can cancel it.
  var listenHandleFut: Future[void]
  var listenStreams: seq[Connection]

  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await serverTransport.accept()
    let listenMux = Mplex.new(accepted)

    # Each incoming stream is recorded and then parked until it ends.
    listenMux.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      listenStreams.add(stream)
      await stream.join()

    listenHandleFut = listenMux.handle()
    await listenHandleFut
    await listenMux.close()

  await serverTransport.start(listenAddrs)
  # This future is never awaited in this test.
  # NOTE(review): presumably because cancelling the handle future makes
  # acceptHandler end with a cancellation; confirm.
  let acceptFut = acceptHandler()

  let clientTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await clientTransport.dial(serverTransport.addrs[0])
  let dialMux = Mplex.new(dialConn)
  let dialMuxFut = dialMux.handle()

  var dialStreams: seq[Connection]
  for _ in 0 .. 9:
    let opened = await dialMux.newStream()
    dialStreams.add(opened)

  check unorderedCompare(dialStreams, dialMux.getStreams())

  # Both sides must know all ten streams before the handler is cancelled.
  checkExpiring: listenStreams.len == 10 and dialStreams.len == 10

  listenHandleFut.cancel()
  let everyStream = dialStreams & listenStreams
  await allFuturesThrowing(everyStream.mapIt(it.join()))

  # NOTE(review): presumably asserts that no LPChannel is still tracked;
  # confirm against the helper's definition.
  checkTracker(LPChannelTrackerName)

  await dialConn.close()
  await dialMuxFut
  await allFuturesThrowing(serverTransport.stop(), clientTransport.stop())
|
2020-11-06 09:24:24 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "closing dialing connection should close both ends":
  # Closing the dialer's underlying connection must let every stream,
  # on both sides, finish its join().
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
  let serverTransport = TcpTransport.new(upgrade = Upgrade())

  var listenStreams: seq[Connection]

  proc acceptHandler() {.async, gcsafe.} =
    let accepted = await serverTransport.accept()
    let listenMux = Mplex.new(accepted)

    # Each incoming stream is recorded and then parked until it ends.
    listenMux.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      listenStreams.add(stream)
      await stream.join()

    await listenMux.handle()
    await listenMux.close()

  await serverTransport.start(listenAddrs)
  let acceptFut = acceptHandler()

  let clientTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await clientTransport.dial(serverTransport.addrs[0])
  let dialMux = Mplex.new(dialConn)
  let dialMuxFut = dialMux.handle()

  var dialStreams: seq[Connection]
  for _ in 0 .. 9:
    let opened = await dialMux.newStream()
    dialStreams.add(opened)

  check unorderedCompare(dialStreams, dialMux.getStreams())

  # Both sides must know all ten streams before the connection is closed.
  checkExpiring: listenStreams.len == 10 and dialStreams.len == 10

  await dialConn.close()
  let everyStream = dialStreams & listenStreams
  await allFuturesThrowing(everyStream.mapIt(it.join()))

  # NOTE(review): presumably asserts that no LPChannel is still tracked;
  # confirm against the helper's definition.
  checkTracker(LPChannelTrackerName)

  # The connection was already closed above; this second call is kept
  # exactly as in the original sequence.
  await dialConn.closeWithEOF()
  await dialMuxFut
  await allFuturesThrowing(serverTransport.stop(), clientTransport.stop())
  await acceptFut
|
2020-11-06 09:24:24 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
asyncTest "canceling listening connection should close both ends":
  # Closing the listener's underlying connection must let every stream,
  # on both sides, finish its join().
  let listenAddrs = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
  let serverTransport = TcpTransport.new(upgrade = Upgrade())

  # The accepted connection, exposed so the test body can close it.
  var listenConn: Connection
  var listenStreams: seq[Connection]

  proc acceptHandler() {.async, gcsafe.} =
    listenConn = await serverTransport.accept()
    let listenMux = Mplex.new(listenConn)

    # Each incoming stream is recorded and then parked until it ends.
    listenMux.streamHandler = proc(stream: Connection) {.async, gcsafe.} =
      listenStreams.add(stream)
      await stream.join()

    await listenMux.handle()
    await listenMux.close()

  await serverTransport.start(listenAddrs)
  let acceptFut = acceptHandler()

  let clientTransport = TcpTransport.new(upgrade = Upgrade())
  let dialConn = await clientTransport.dial(serverTransport.addrs[0])
  let dialMux = Mplex.new(dialConn)
  let dialMuxFut = dialMux.handle()

  var dialStreams: seq[Connection]
  for _ in 0 .. 9:
    let opened = await dialMux.newStream()
    dialStreams.add(opened)

  check unorderedCompare(dialStreams, dialMux.getStreams())

  # Both sides must know all ten streams before the connection is closed.
  checkExpiring: listenStreams.len == 10 and dialStreams.len == 10

  await listenConn.closeWithEOF()
  let everyStream = dialStreams & listenStreams
  await allFuturesThrowing(everyStream.mapIt(it.join()))

  # NOTE(review): presumably asserts that no LPChannel is still tracked;
  # confirm against the helper's definition.
  checkTracker(LPChannelTrackerName)

  await dialConn.close()
  await dialMuxFut
  await allFuturesThrowing(serverTransport.stop(), clientTransport.stop())
  await acceptFut
|
2020-11-06 09:24:24 -06:00
|
|
|
|
2020-11-12 21:44:02 -06:00
|
|
|
suite "jitter":
  # Both tests hand-build one mplex frame (header + length-prefixed
  # payload) and write it straight to the raw connection in small
  # pieces, so the listener has to reassemble it from partial reads.

  asyncTest "channel should be able to handle erratic read/writes":
    let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]

    let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
    let listenFut = transport1.start(ma)

    # Completed by the listener's stream handler once it is done.
    let complete = newFuture[void]()
    const MsgSize = 1024
    proc acceptHandler() {.async, gcsafe.} =
      let conn = await transport1.accept()
      let mplexListen = Mplex.new(conn)
      mplexListen.streamHandler = proc(stream: Connection)
        {.async, gcsafe.} =
        try:
          let msg = await stream.readLp(MsgSize)
          check msg.len == MsgSize
        except CatchableError as e:
          # A failed read must fail the test, not merely be printed;
          # otherwise `complete` still fires and the test passes.
          echo e.msg
          check false
        await stream.close()
        complete.complete()

      await mplexListen.handle()
      await mplexListen.close()

    let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
    let conn = await transport2.dial(transport1.addrs[0])

    let acceptFut = acceptHandler()
    let mplexDial = Mplex.new(conn)
    let mplexDialFut = mplexDial.handle()
    let stream = await mplexDial.newStream()

    # Payload of exactly MsgSize random bytes in the range 'A'..'z'.
    var bigseq = newSeqOfCap[uint8](MsgSize)
    for _ in 0..<MsgSize:
      bigseq.add(uint8(rand(uint('A')..uint('z'))))

    ## create length prefixed libp2p frame
    var buf = initVBuffer()
    buf.writeSeq(bigseq)
    buf.finish()

    ## create mplex header
    var mplexBuf = initVBuffer()
    mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
    mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent

    await conn.write(mplexBuf.buffer)
    proc writer() {.async.} =
      # Sends the frame in randomly sized chunks of 20..50 bytes; the
      # last chunk is clipped to whatever is left.
      const
        MinChunk = 20
        MaxChunk = 50
      randomize()
      let payload = buf.buffer
      var offset = 0
      while offset < payload.len:
        let size = min(rand(MinChunk .. MaxChunk), payload.len - offset)
        await conn.write(payload[offset ..< offset + size])
        offset += size

    await writer()
    await complete.wait(1.seconds)
    await stream.close()
    await conn.close()
    await acceptFut
    await mplexDialFut

    await allFuturesThrowing(
      transport1.stop(),
      transport2.stop())
    await listenFut

  asyncTest "channel should handle 1 byte read/write":
    let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]

    let transport1: TcpTransport = TcpTransport.new(upgrade = Upgrade())
    let listenFut = transport1.start(ma)

    # Completed by the listener's stream handler once it is done.
    let complete = newFuture[void]()
    const MsgSize = 512
    proc acceptHandler() {.async, gcsafe.} =
      let conn = await transport1.accept()
      let mplexListen = Mplex.new(conn)
      mplexListen.streamHandler = proc(stream: Connection)
        {.async, gcsafe.} =
        let msg = await stream.readLp(MsgSize)
        check msg.len == MsgSize
        await stream.close()
        complete.complete()

      await mplexListen.handle()
      await mplexListen.close()

    let transport2: TcpTransport = TcpTransport.new(upgrade = Upgrade())
    let conn = await transport2.dial(transport1.addrs[0])

    let acceptFut = acceptHandler()
    let mplexDial = Mplex.new(conn)
    let stream = await mplexDial.newStream()
    let mplexDialFut = mplexDial.handle()

    # Payload of exactly MsgSize random bytes in the range 'A'..'z'.
    var bigseq = newSeqOfCap[uint8](MsgSize)
    for _ in 0..<MsgSize:
      bigseq.add(uint8(rand(uint('A')..uint('z'))))

    ## create length prefixed libp2p frame
    var buf = initVBuffer()
    buf.writeSeq(bigseq)
    buf.finish()

    ## create mplex header
    var mplexBuf = initVBuffer()
    mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
    mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent

    await conn.write(mplexBuf.buffer)
    proc writer() {.async.} =
      # Sends the frame one byte per write call.
      for b in buf.buffer:
        await conn.write(@[b])

    await writer()

    await complete.wait(5.seconds)
    await stream.close()
    await conn.close()
    await acceptFut
    await mplexDialFut
    await allFuturesThrowing(
      transport1.stop(),
      transport2.stop())
    await listenFut
|