2020-11-06 15:24:24 +00:00
|
|
|
import unittest, strformat, strformat, random, oids, sequtils
|
2020-05-18 13:49:49 +00:00
|
|
|
import chronos, nimcrypto/utils, chronicles, stew/byteutils
|
2020-04-21 01:24:42 +00:00
|
|
|
import ../libp2p/[errors,
|
2020-06-19 17:29:43 +00:00
|
|
|
stream/connection,
|
2019-10-29 18:51:48 +00:00
|
|
|
stream/bufferstream,
|
|
|
|
transports/tcptransport,
|
|
|
|
transports/transport,
|
2019-10-03 19:30:22 +00:00
|
|
|
multiaddress,
|
2019-10-29 18:51:48 +00:00
|
|
|
muxers/mplex/mplex,
|
|
|
|
muxers/mplex/coder,
|
2020-03-11 15:12:08 +00:00
|
|
|
muxers/mplex/lpchannel,
|
|
|
|
vbuffer,
|
|
|
|
varint]
|
2019-09-03 20:40:51 +00:00
|
|
|
|
2020-05-08 20:10:06 +00:00
|
|
|
import ./helpers
|
2019-10-29 18:51:48 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
{.used.}
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2019-09-03 20:40:51 +00:00
|
|
|
suite "Mplex":
|
2020-04-21 01:24:42 +00:00
|
|
|
teardown:
|
2020-09-21 17:48:19 +00:00
|
|
|
checkTrackers()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
suite "channel encoding":
|
|
|
|
asyncTest "encode header with channel id 0":
|
2019-09-07 06:20:03 +00:00
|
|
|
proc encHandler(msg: seq[byte]) {.async.} =
|
|
|
|
check msg == fromHex("000873747265616d2031")
|
2019-10-29 18:51:48 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
let conn = newBufferStream(encHandler)
|
2020-05-18 17:05:34 +00:00
|
|
|
await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
|
2020-05-18 13:49:49 +00:00
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "encode header with channel id other than 0":
|
2019-09-07 06:20:03 +00:00
|
|
|
proc encHandler(msg: seq[byte]) {.async.} =
|
|
|
|
check msg == fromHex("88010873747265616d2031")
|
2019-10-29 18:51:48 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
let conn = newBufferStream(encHandler)
|
2020-05-18 17:05:34 +00:00
|
|
|
await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
|
2020-05-18 13:49:49 +00:00
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "encode header and body with channel id 0":
|
2019-09-07 06:20:03 +00:00
|
|
|
proc encHandler(msg: seq[byte]) {.async.} =
|
|
|
|
check msg == fromHex("020873747265616d2031")
|
2019-10-29 18:51:48 +00:00
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
let conn = newBufferStream(encHandler)
|
2020-05-18 17:05:34 +00:00
|
|
|
await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
|
2020-05-18 13:49:49 +00:00
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "encode header and body with channel id other than 0":
|
2019-09-07 06:20:03 +00:00
|
|
|
proc encHandler(msg: seq[byte]) {.async.} =
|
|
|
|
check msg == fromHex("8a010873747265616d2031")
|
|
|
|
|
2020-08-15 19:50:31 +00:00
|
|
|
let conn = newBufferStream(encHandler)
|
2020-05-18 17:05:34 +00:00
|
|
|
await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
|
2019-09-07 06:20:03 +00:00
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "decode header with channel id 0":
|
2019-12-06 02:16:18 +00:00
|
|
|
let stream = newBufferStream()
|
2020-06-19 17:29:43 +00:00
|
|
|
let conn = stream
|
2020-09-21 17:48:19 +00:00
|
|
|
await stream.pushData(fromHex("000873747265616d2031"))
|
2019-09-08 06:32:41 +00:00
|
|
|
let msg = await conn.readMsg()
|
2019-09-03 20:40:51 +00:00
|
|
|
|
2020-02-12 14:43:42 +00:00
|
|
|
check msg.id == 0
|
|
|
|
check msg.msgType == MessageType.New
|
2020-05-18 13:49:49 +00:00
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "decode header and body with channel id 0":
|
2019-12-06 02:16:18 +00:00
|
|
|
let stream = newBufferStream()
|
2020-06-19 17:29:43 +00:00
|
|
|
let conn = stream
|
2020-09-21 17:48:19 +00:00
|
|
|
await stream.pushData(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
|
2019-09-08 06:32:41 +00:00
|
|
|
let msg = await conn.readMsg()
|
2019-09-07 06:20:03 +00:00
|
|
|
|
2020-02-12 14:43:42 +00:00
|
|
|
check msg.id == 0
|
|
|
|
check msg.msgType == MessageType.MsgOut
|
2020-05-18 17:05:34 +00:00
|
|
|
check string.fromBytes(msg.data) == "hello from channel 0!!"
|
2020-05-18 13:49:49 +00:00
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "decode header and body with channel id other than 0":
|
2019-12-06 02:16:18 +00:00
|
|
|
let stream = newBufferStream()
|
2020-06-19 17:29:43 +00:00
|
|
|
let conn = stream
|
2020-09-21 17:48:19 +00:00
|
|
|
await stream.pushData(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
|
2019-09-08 06:32:41 +00:00
|
|
|
let msg = await conn.readMsg()
|
2019-09-07 06:20:03 +00:00
|
|
|
|
2020-02-12 14:43:42 +00:00
|
|
|
check msg.id == 17
|
|
|
|
check msg.msgType == MessageType.MsgOut
|
2020-05-18 17:05:34 +00:00
|
|
|
check string.fromBytes(msg.data) == "hello from channel 0!!"
|
2020-05-18 13:49:49 +00:00
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
suite "channel half-closed":
|
|
|
|
asyncTest "(local close) - should close for write":
|
2020-05-18 13:49:49 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
2020-06-19 17:29:43 +00:00
|
|
|
conn = newBufferStream(writeHandler)
|
2020-07-17 18:44:41 +00:00
|
|
|
chann = LPChannel.init(1, conn, true)
|
2020-11-13 03:44:02 +00:00
|
|
|
|
2020-05-18 17:05:34 +00:00
|
|
|
await chann.close()
|
2020-11-13 03:44:02 +00:00
|
|
|
expect LPStreamClosedError:
|
2020-05-18 13:49:49 +00:00
|
|
|
await chann.write("Hello")
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await chann.reset()
|
|
|
|
await conn.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "(local close) - should allow reads until remote closes":
|
2020-08-12 05:23:49 +00:00
|
|
|
let
|
|
|
|
conn = newBufferStream(
|
|
|
|
proc (data: seq[byte]) {.gcsafe, async.} =
|
|
|
|
discard,
|
|
|
|
)
|
|
|
|
chann = LPChannel.init(1, conn, true)
|
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.pushData(("Hello!").toBytes)
|
2020-08-12 05:23:49 +00:00
|
|
|
|
|
|
|
var data = newSeq[byte](6)
|
|
|
|
await chann.close() # closing channel
|
|
|
|
# should be able to read on local clsoe
|
|
|
|
await chann.readExactly(addr data[0], 3)
|
|
|
|
# closing remote end
|
2020-09-21 17:48:19 +00:00
|
|
|
let closeFut = chann.pushEof()
|
2020-08-12 05:23:49 +00:00
|
|
|
# should still allow reading until buffer EOF
|
|
|
|
await chann.readExactly(addr data[3], 3)
|
2020-11-13 03:44:02 +00:00
|
|
|
|
|
|
|
expect LPStreamEOFError:
|
2020-08-12 05:23:49 +00:00
|
|
|
# this should fail now
|
|
|
|
await chann.readExactly(addr data[0], 3)
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await chann.close()
|
|
|
|
await conn.close()
|
|
|
|
await closeFut
|
2020-08-12 05:23:49 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "(remote close) - channel should close for reading by remote":
|
2020-05-18 13:49:49 +00:00
|
|
|
let
|
2020-06-19 17:29:43 +00:00
|
|
|
conn = newBufferStream(
|
2020-05-18 13:49:49 +00:00
|
|
|
proc (data: seq[byte]) {.gcsafe, async.} =
|
2020-08-10 22:17:11 +00:00
|
|
|
discard,
|
2020-06-19 17:29:43 +00:00
|
|
|
)
|
2020-07-17 18:44:41 +00:00
|
|
|
chann = LPChannel.init(1, conn, true)
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.pushData(("Hello!").toBytes)
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-05-18 17:05:34 +00:00
|
|
|
var data = newSeq[byte](6)
|
2020-08-10 22:17:11 +00:00
|
|
|
await chann.readExactly(addr data[0], 3)
|
2020-09-21 17:48:19 +00:00
|
|
|
let closeFut = chann.pushEof() # closing channel
|
2020-08-10 22:17:11 +00:00
|
|
|
let readFut = chann.readExactly(addr data[3], 3)
|
|
|
|
await all(closeFut, readFut)
|
2020-11-13 03:44:02 +00:00
|
|
|
|
|
|
|
expect LPStreamEOFError:
|
2020-08-10 22:17:11 +00:00
|
|
|
await chann.readExactly(addr data[0], 6) # this should fail now
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await chann.close()
|
|
|
|
await conn.close()
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "(remote close) - channel should allow writing on remote close":
|
2020-08-10 22:17:11 +00:00
|
|
|
let
|
|
|
|
testData = "Hello!".toBytes
|
|
|
|
conn = newBufferStream(
|
|
|
|
proc (data: seq[byte]) {.gcsafe, async.} =
|
|
|
|
discard
|
|
|
|
)
|
|
|
|
chann = LPChannel.init(1, conn, true)
|
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.pushEof() # closing channel
|
2020-08-10 22:17:11 +00:00
|
|
|
try:
|
|
|
|
await chann.writeLp(testData)
|
|
|
|
finally:
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.reset() # there's nobody reading the EOF!
|
2020-08-10 22:17:11 +00:00
|
|
|
await conn.close()
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "should not allow pushing data to channel when remote end closed":
|
2020-05-18 13:49:49 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
2020-06-19 17:29:43 +00:00
|
|
|
conn = newBufferStream(writeHandler)
|
2020-07-17 18:44:41 +00:00
|
|
|
chann = LPChannel.init(1, conn, true)
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.pushEof()
|
|
|
|
var buf: array[1, byte]
|
|
|
|
check: (await chann.readOnce(addr buf[0], 1)) == 0 # EOF marker read
|
2020-11-13 03:44:02 +00:00
|
|
|
|
|
|
|
expect LPStreamEOFError:
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.pushData(@[byte(1)])
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await chann.close()
|
|
|
|
await conn.close()
|
|
|
|
|
|
|
|
suite "channel reset":
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "channel should fail reading":
|
2020-05-18 13:49:49 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
2020-06-19 17:29:43 +00:00
|
|
|
conn = newBufferStream(writeHandler)
|
2020-07-17 18:44:41 +00:00
|
|
|
chann = LPChannel.init(1, conn, true)
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-05-18 17:05:34 +00:00
|
|
|
await chann.reset()
|
|
|
|
var data = newSeq[byte](1)
|
2020-11-13 03:44:02 +00:00
|
|
|
expect LPStreamEOFError:
|
2020-05-18 13:49:49 +00:00
|
|
|
await chann.readExactly(addr data[0], 1)
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await conn.close()
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "should complete read":
|
2020-09-21 17:48:19 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
|
|
|
conn = newBufferStream(writeHandler)
|
|
|
|
chann = LPChannel.init(1, conn, true)
|
|
|
|
|
|
|
|
var data = newSeq[byte](1)
|
|
|
|
let fut = chann.readExactly(addr data[0], 1)
|
2020-11-13 03:44:02 +00:00
|
|
|
|
2020-09-21 17:48:19 +00:00
|
|
|
await chann.reset()
|
2020-11-13 03:44:02 +00:00
|
|
|
expect LPStreamEOFError:
|
2020-09-21 17:48:19 +00:00
|
|
|
await fut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await conn.close()
|
2020-09-21 17:48:19 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "should complete pushData":
|
2020-09-21 17:48:19 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
|
|
|
conn = newBufferStream(writeHandler)
|
|
|
|
chann = LPChannel.init(1, conn, true)
|
|
|
|
|
|
|
|
await chann.pushData(@[0'u8])
|
|
|
|
let fut = chann.pushData(@[0'u8])
|
|
|
|
await chann.reset()
|
2020-11-13 03:44:02 +00:00
|
|
|
check await fut.withTimeout(100.millis)
|
2020-09-21 17:48:19 +00:00
|
|
|
await conn.close()
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "should complete both read and push":
|
2020-09-21 17:48:19 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
|
|
|
conn = newBufferStream(writeHandler)
|
|
|
|
chann = LPChannel.init(1, conn, true)
|
|
|
|
|
|
|
|
var data = newSeq[byte](1)
|
|
|
|
let rfut = chann.readExactly(addr data[0], 1)
|
|
|
|
let wfut = chann.pushData(@[0'u8])
|
|
|
|
let wfut2 = chann.pushData(@[0'u8])
|
|
|
|
await chann.reset()
|
2020-11-13 03:44:02 +00:00
|
|
|
check await allFutures(rfut, wfut, wfut2).withTimeout(100.millis)
|
2020-09-21 17:48:19 +00:00
|
|
|
await conn.close()
|
|
|
|
|
2020-11-13 18:30:14 +00:00
|
|
|
asyncTest "should complete both read and push after cancel":
|
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
|
|
|
conn = newBufferStream(writeHandler)
|
|
|
|
chann = LPChannel.init(1, conn, true)
|
|
|
|
|
|
|
|
var data = newSeq[byte](1)
|
|
|
|
let rfut = chann.readExactly(addr data[0], 1)
|
|
|
|
rfut.cancel()
|
|
|
|
|
|
|
|
let wfut = chann.pushData(@[0'u8])
|
|
|
|
let wfut2 = chann.pushData(@[0'u8])
|
|
|
|
await chann.reset()
|
|
|
|
check await allFutures(rfut, wfut, wfut2).withTimeout(100.millis)
|
|
|
|
await conn.close()
|
|
|
|
|
|
|
|
asyncTest "should complete both read and push after reset":
|
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
|
|
|
conn = newBufferStream(writeHandler)
|
|
|
|
chann = LPChannel.init(1, conn, true)
|
|
|
|
|
|
|
|
var data = newSeq[byte](1)
|
|
|
|
let rfut = chann.readExactly(addr data[0], 1)
|
|
|
|
let fut2 = sleepAsync(1.millis) or rfut
|
|
|
|
|
|
|
|
await sleepAsync(5.millis)
|
|
|
|
|
|
|
|
let wfut = chann.pushData(@[0'u8])
|
|
|
|
let wfut2 = chann.pushData(@[0'u8])
|
|
|
|
await chann.reset()
|
|
|
|
check await allFutures(rfut, wfut, wfut2).withTimeout(100.millis)
|
|
|
|
await conn.close()
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "channel should fail writing":
|
2020-05-18 13:49:49 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
2020-06-19 17:29:43 +00:00
|
|
|
conn = newBufferStream(writeHandler)
|
2020-07-17 18:44:41 +00:00
|
|
|
chann = LPChannel.init(1, conn, true)
|
2020-05-18 17:05:34 +00:00
|
|
|
await chann.reset()
|
2020-11-13 03:44:02 +00:00
|
|
|
|
|
|
|
expect LPStreamClosedError:
|
2020-05-18 17:05:34 +00:00
|
|
|
await chann.write(("Hello!").toBytes)
|
2020-05-18 13:49:49 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await conn.close()
|
2020-02-12 14:43:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "channel should reset on timeout":
|
2020-08-10 22:17:11 +00:00
|
|
|
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
|
|
|
|
let
|
|
|
|
conn = newBufferStream(writeHandler)
|
|
|
|
chann = LPChannel.init(
|
|
|
|
1, conn, true, timeout = 100.millis)
|
2020-07-17 18:44:41 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
check await chann.join().withTimeout(1.minutes)
|
2020-08-10 22:17:11 +00:00
|
|
|
await conn.close()
|
2020-07-17 18:44:41 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
suite "mplex e2e":
|
|
|
|
asyncTest "read/write receiver":
|
2020-05-31 14:22:49 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 13:49:49 +00:00
|
|
|
var done = newFuture[void]()
|
2019-09-25 22:57:27 +00:00
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexListen = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
2020-05-08 20:58:23 +00:00
|
|
|
let msg = await stream.readLp(1024)
|
2020-05-18 17:05:34 +00:00
|
|
|
check string.fromBytes(msg) == "HELLO"
|
2019-09-25 22:57:27 +00:00
|
|
|
await stream.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
done.complete()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
await mplexListen.handle()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexListen.close()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport1: TcpTransport = TcpTransport.init()
|
2020-05-18 13:49:49 +00:00
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
2020-02-12 14:43:42 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
2019-09-25 22:57:27 +00:00
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexDial = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
let mplexDialFut = mplexDial.handle()
|
2020-02-11 17:30:36 +00:00
|
|
|
let stream = await mplexDial.newStream()
|
2020-05-18 13:49:49 +00:00
|
|
|
await stream.writeLp("HELLO")
|
2020-06-19 17:29:43 +00:00
|
|
|
check LPChannel(stream).isOpen # not lazy
|
2020-04-21 01:24:42 +00:00
|
|
|
await stream.close()
|
2020-05-18 13:49:49 +00:00
|
|
|
|
|
|
|
await done.wait(1.seconds)
|
2020-04-21 01:24:42 +00:00
|
|
|
await conn.close()
|
2020-06-29 15:15:31 +00:00
|
|
|
await mplexDialFut.wait(1.seconds)
|
2020-06-02 23:53:38 +00:00
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await listenFut
|
2020-02-11 17:30:36 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "read/write receiver lazy":
|
2020-05-31 14:22:49 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2020-02-11 17:30:36 +00:00
|
|
|
|
2020-05-18 13:49:49 +00:00
|
|
|
var done = newFuture[void]()
|
2020-02-11 17:30:36 +00:00
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexListen = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
2020-05-08 20:58:23 +00:00
|
|
|
let msg = await stream.readLp(1024)
|
2020-05-18 17:05:34 +00:00
|
|
|
check string.fromBytes(msg) == "HELLO"
|
2020-02-11 17:30:36 +00:00
|
|
|
await stream.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
done.complete()
|
2020-02-11 17:30:36 +00:00
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
await mplexListen.handle()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexListen.close()
|
2020-02-11 17:30:36 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport1: TcpTransport = TcpTransport.init()
|
2020-03-27 23:37:00 +00:00
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
2020-02-11 17:30:36 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
2020-02-11 17:30:36 +00:00
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexDial = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
let stream = await mplexDial.newStream(lazy = true)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
2020-06-19 17:29:43 +00:00
|
|
|
check not LPChannel(stream).isOpen # assert lazy
|
2020-05-18 13:49:49 +00:00
|
|
|
await stream.writeLp("HELLO")
|
2020-06-19 17:29:43 +00:00
|
|
|
check LPChannel(stream).isOpen # assert lazy
|
2020-05-18 13:49:49 +00:00
|
|
|
await stream.close()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 13:49:49 +00:00
|
|
|
await done.wait(1.seconds)
|
2020-04-21 01:24:42 +00:00
|
|
|
await conn.close()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexDialFut
|
2020-06-02 23:53:38 +00:00
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
2020-03-27 23:37:00 +00:00
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "write fragmented":
|
2020-02-07 06:41:24 +00:00
|
|
|
let
|
2020-05-31 14:22:49 +00:00
|
|
|
ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2020-02-09 03:46:52 +00:00
|
|
|
listenJob = newFuture[void]()
|
2020-02-06 06:24:11 +00:00
|
|
|
|
2020-04-03 15:26:46 +00:00
|
|
|
var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
|
|
|
|
for _ in 0..<MaxMsgSize:
|
|
|
|
bigseq.add(uint8(rand(uint('A')..uint('z'))))
|
|
|
|
|
2020-02-06 06:24:11 +00:00
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
2020-08-10 22:17:11 +00:00
|
|
|
try:
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
let msg = await stream.readLp(MaxMsgSize)
|
|
|
|
check msg == bigseq
|
|
|
|
trace "Bigseq check passed!"
|
|
|
|
await stream.close()
|
|
|
|
listenJob.complete()
|
|
|
|
|
|
|
|
await mplexListen.handle()
|
|
|
|
await sleepAsync(1.seconds) # give chronos some slack to process things
|
|
|
|
await mplexListen.close()
|
|
|
|
except CancelledError as exc:
|
|
|
|
raise exc
|
|
|
|
except CatchableError as exc:
|
|
|
|
check false
|
2020-02-06 06:24:11 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport1: TcpTransport = TcpTransport.init()
|
2020-03-27 23:37:00 +00:00
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
2020-02-06 06:24:11 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
2020-02-06 06:24:11 +00:00
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexDial = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
let mplexDialFut = mplexDial.handle()
|
2020-02-06 06:24:11 +00:00
|
|
|
let stream = await mplexDial.newStream()
|
2020-03-27 14:25:52 +00:00
|
|
|
|
2020-04-03 15:26:46 +00:00
|
|
|
await stream.writeLp(bigseq)
|
2020-11-13 03:44:02 +00:00
|
|
|
await listenJob.wait(10.seconds)
|
2020-03-27 14:25:52 +00:00
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
await stream.close()
|
|
|
|
await conn.close()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexDialFut
|
2020-06-02 23:53:38 +00:00
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
2020-03-27 23:37:00 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await listenFut
|
2020-02-06 06:24:11 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "read/write initiator":
|
2020-05-31 14:22:49 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
let done = newFuture[void]()
|
2019-09-25 22:57:27 +00:00
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexListen = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
2019-09-25 22:57:27 +00:00
|
|
|
await stream.writeLp("Hello from stream!")
|
|
|
|
await stream.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
done.complete()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
|
|
|
await mplexListen.handle()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexListen.close()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport1: TcpTransport = TcpTransport.init()
|
2020-03-27 23:37:00 +00:00
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
2019-09-25 22:57:27 +00:00
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexDial = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
let mplexDialFut = mplexDial.handle()
|
2019-09-25 22:57:27 +00:00
|
|
|
let stream = await mplexDial.newStream("DIALER")
|
2020-05-18 17:05:34 +00:00
|
|
|
let msg = string.fromBytes(await stream.readLp(1024))
|
2020-05-18 13:49:49 +00:00
|
|
|
await stream.close()
|
2019-09-25 22:57:27 +00:00
|
|
|
check msg == "Hello from stream!"
|
2020-04-21 01:24:42 +00:00
|
|
|
|
2020-05-18 13:49:49 +00:00
|
|
|
await done.wait(1.seconds)
|
2020-04-21 01:24:42 +00:00
|
|
|
await conn.close()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexDialFut
|
2020-06-02 23:53:38 +00:00
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
2020-03-27 23:37:00 +00:00
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "multiple streams":
|
2020-05-31 14:22:49 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
let done = newFuture[void]()
|
2019-09-25 22:57:27 +00:00
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
2020-05-18 13:49:49 +00:00
|
|
|
var count = 1
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexListen = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
2020-05-08 20:58:23 +00:00
|
|
|
let msg = await stream.readLp(1024)
|
2020-05-18 13:49:49 +00:00
|
|
|
check string.fromBytes(msg) == &"stream {count}!"
|
2019-09-25 22:57:27 +00:00
|
|
|
count.inc
|
|
|
|
await stream.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
if count == 10:
|
|
|
|
done.complete()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
|
|
|
await mplexListen.handle()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexListen.close()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport1 = TcpTransport.init()
|
2020-03-27 23:37:00 +00:00
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
2019-09-25 22:57:27 +00:00
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexDial = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
# TODO: Reenable once half-closed is working properly
|
2020-05-20 00:14:15 +00:00
|
|
|
let mplexDialFut = mplexDial.handle()
|
2020-03-27 23:37:00 +00:00
|
|
|
for i in 1..10:
|
2019-09-25 22:57:27 +00:00
|
|
|
let stream = await mplexDial.newStream()
|
|
|
|
await stream.writeLp(&"stream {i}!")
|
|
|
|
await stream.close()
|
|
|
|
|
2020-05-18 13:49:49 +00:00
|
|
|
await done.wait(10.seconds)
|
2020-04-21 01:24:42 +00:00
|
|
|
await conn.close()
|
2020-05-20 00:14:15 +00:00
|
|
|
await mplexDialFut
|
2020-06-02 23:53:38 +00:00
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
2020-03-27 23:37:00 +00:00
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "multiple read/write streams":
|
2020-05-31 14:22:49 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
let done = newFuture[void]()
|
2019-09-25 22:57:27 +00:00
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
2020-05-18 13:49:49 +00:00
|
|
|
var count = 1
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexListen = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
2020-05-08 20:58:23 +00:00
|
|
|
let msg = await stream.readLp(1024)
|
2020-05-18 17:05:34 +00:00
|
|
|
check string.fromBytes(msg) == &"stream {count} from dialer!"
|
2019-09-25 22:57:27 +00:00
|
|
|
await stream.writeLp(&"stream {count} from listener!")
|
|
|
|
count.inc
|
|
|
|
await stream.close()
|
2020-04-21 01:24:42 +00:00
|
|
|
if count == 10:
|
|
|
|
done.complete()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
await mplexListen.handle()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexListen.close()
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport1: TcpTransport = TcpTransport.init()
|
2020-05-18 13:49:49 +00:00
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
2019-09-25 22:57:27 +00:00
|
|
|
|
2020-05-18 19:04:05 +00:00
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
2019-09-25 22:57:27 +00:00
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
2020-07-17 18:44:41 +00:00
|
|
|
let mplexDial = Mplex.init(conn)
|
2020-05-18 13:49:49 +00:00
|
|
|
let mplexDialFut = mplexDial.handle()
|
2019-09-25 22:57:27 +00:00
|
|
|
for i in 1..10:
|
|
|
|
let stream = await mplexDial.newStream("dialer stream")
|
|
|
|
await stream.writeLp(&"stream {i} from dialer!")
|
2020-05-08 20:58:23 +00:00
|
|
|
let msg = await stream.readLp(1024)
|
2020-05-18 17:05:34 +00:00
|
|
|
check string.fromBytes(msg) == &"stream {i} from listener!"
|
2019-09-25 22:57:27 +00:00
|
|
|
await stream.close()
|
|
|
|
|
2020-04-21 01:24:42 +00:00
|
|
|
await done.wait(5.seconds)
|
2019-09-25 22:57:27 +00:00
|
|
|
await conn.close()
|
2020-05-18 13:49:49 +00:00
|
|
|
await mplexDialFut
|
2020-09-21 17:48:19 +00:00
|
|
|
await mplexDial.close()
|
2020-06-02 23:53:38 +00:00
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
2020-05-18 13:49:49 +00:00
|
|
|
await listenFut
|
2020-05-15 04:02:05 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "channel closes listener with EOF":
|
2020-11-06 15:24:24 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
|
|
|
|
|
|
var listenStreams: seq[Connection]
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
listenStreams.add(stream)
|
|
|
|
try:
|
|
|
|
discard await stream.readLp(1024)
|
|
|
|
except LPStreamEOFError:
|
|
|
|
await stream.close()
|
|
|
|
return
|
|
|
|
|
|
|
|
check false
|
|
|
|
|
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
|
|
|
|
|
|
|
let transport1 = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var dialStreams: seq[Connection]
|
|
|
|
for i in 0..9:
|
|
|
|
dialStreams.add((await mplexDial.newStream()))
|
|
|
|
|
|
|
|
for i, s in dialStreams:
|
|
|
|
await s.closeWithEOF()
|
|
|
|
check listenStreams[i].closed
|
|
|
|
check s.closed
|
|
|
|
|
|
|
|
checkTracker(LPChannelTrackerName)
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "channel closes dialer with EOF":
|
2020-11-06 15:24:24 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
|
|
|
|
|
|
var listenStreams: seq[Connection]
|
|
|
|
var count = 0
|
|
|
|
var done = newFuture[void]()
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
listenStreams.add(stream)
|
|
|
|
count.inc()
|
|
|
|
if count == 10:
|
|
|
|
done.complete()
|
|
|
|
|
|
|
|
await stream.join()
|
|
|
|
|
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
|
|
|
|
|
|
|
let transport1 = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var dialStreams: seq[Connection]
|
|
|
|
for i in 0..9:
|
|
|
|
dialStreams.add((await mplexDial.newStream()))
|
|
|
|
|
|
|
|
proc dialReadLoop() {.async.} =
|
|
|
|
for s in dialStreams:
|
|
|
|
try:
|
|
|
|
discard await s.readLp(1024)
|
|
|
|
check false
|
|
|
|
except LPStreamEOFError:
|
|
|
|
await s.close()
|
|
|
|
continue
|
|
|
|
|
|
|
|
check false
|
|
|
|
|
|
|
|
await done
|
|
|
|
let readLoop = dialReadLoop()
|
|
|
|
for s in listenStreams:
|
|
|
|
await s.closeWithEOF()
|
|
|
|
check s.closed
|
|
|
|
|
|
|
|
await readLoop
|
|
|
|
await allFuturesThrowing(
|
|
|
|
(dialStreams & listenStreams)
|
2020-11-06 18:20:59 +00:00
|
|
|
.mapIt( it.join() ))
|
2020-11-06 15:24:24 +00:00
|
|
|
|
|
|
|
checkTracker(LPChannelTrackerName)
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "dialing mplex closes both ends":
|
2020-11-06 15:24:24 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
|
|
|
|
|
|
var listenStreams: seq[Connection]
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
listenStreams.add(stream)
|
|
|
|
await stream.join()
|
|
|
|
|
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
|
|
|
|
|
|
|
let transport1 = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var dialStreams: seq[Connection]
|
|
|
|
for i in 0..9:
|
|
|
|
dialStreams.add((await mplexDial.newStream()))
|
|
|
|
|
|
|
|
await mplexDial.close()
|
|
|
|
await allFuturesThrowing(
|
|
|
|
(dialStreams & listenStreams)
|
2020-11-06 18:20:59 +00:00
|
|
|
.mapIt( it.join() ))
|
2020-11-06 15:24:24 +00:00
|
|
|
|
|
|
|
checkTracker(LPChannelTrackerName)
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "listening mplex closes both ends":
|
2020-11-06 15:24:24 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
|
|
|
|
|
|
var mplexListen: Mplex
|
|
|
|
var listenStreams: seq[Connection]
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
listenStreams.add(stream)
|
|
|
|
await stream.join()
|
|
|
|
|
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
|
|
|
|
|
|
|
let transport1 = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var dialStreams: seq[Connection]
|
|
|
|
for i in 0..9:
|
|
|
|
dialStreams.add((await mplexDial.newStream()))
|
|
|
|
|
|
|
|
await mplexListen.close()
|
|
|
|
await allFuturesThrowing(
|
|
|
|
(dialStreams & listenStreams)
|
2020-11-06 18:20:59 +00:00
|
|
|
.mapIt( it.join() ))
|
2020-11-06 15:24:24 +00:00
|
|
|
|
|
|
|
checkTracker(LPChannelTrackerName)
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "canceling mplex handler closes both ends":
|
2020-11-06 15:24:24 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
|
|
|
|
|
|
var mplexHandle: Future[void]
|
|
|
|
var listenStreams: seq[Connection]
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
listenStreams.add(stream)
|
|
|
|
await stream.join()
|
|
|
|
|
|
|
|
mplexHandle = mplexListen.handle()
|
|
|
|
await mplexHandle
|
|
|
|
await mplexListen.close()
|
|
|
|
|
|
|
|
let transport1 = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var dialStreams: seq[Connection]
|
|
|
|
for i in 0..9:
|
|
|
|
dialStreams.add((await mplexDial.newStream()))
|
|
|
|
|
|
|
|
mplexHandle.cancel()
|
|
|
|
await allFuturesThrowing(
|
|
|
|
(dialStreams & listenStreams)
|
2020-11-06 18:20:59 +00:00
|
|
|
.mapIt( it.join() ))
|
2020-11-06 15:24:24 +00:00
|
|
|
|
|
|
|
checkTracker(LPChannelTrackerName)
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "closing dialing connection should close both ends":
|
2020-11-06 15:24:24 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
|
|
|
|
|
|
var listenStreams: seq[Connection]
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
listenStreams.add(stream)
|
|
|
|
await stream.join()
|
|
|
|
|
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
|
|
|
|
|
|
|
let transport1 = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var dialStreams: seq[Connection]
|
|
|
|
for i in 0..9:
|
|
|
|
dialStreams.add((await mplexDial.newStream()))
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await allFuturesThrowing(
|
|
|
|
(dialStreams & listenStreams)
|
2020-11-06 18:20:59 +00:00
|
|
|
.mapIt( it.join() ))
|
2020-11-06 15:24:24 +00:00
|
|
|
|
|
|
|
checkTracker(LPChannelTrackerName)
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "canceling listening connection should close both ends":
|
2020-11-06 15:24:24 +00:00
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
|
|
|
|
|
|
|
var listenConn: Connection
|
|
|
|
var listenStreams: seq[Connection]
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
listenConn = conn
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
listenStreams.add(stream)
|
|
|
|
await stream.join()
|
|
|
|
|
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
|
|
|
|
|
|
|
let transport1 = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var dialStreams: seq[Connection]
|
|
|
|
for i in 0..9:
|
|
|
|
dialStreams.add((await mplexDial.newStream()))
|
|
|
|
|
|
|
|
await listenConn.close()
|
|
|
|
await allFuturesThrowing(
|
|
|
|
(dialStreams & listenStreams)
|
2020-11-06 18:20:59 +00:00
|
|
|
.mapIt( it.join() ))
|
2020-11-06 15:24:24 +00:00
|
|
|
|
|
|
|
checkTracker(LPChannelTrackerName)
|
|
|
|
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
suite "jitter":
|
|
|
|
asyncTest "channel should be able to handle erratic read/writes":
|
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2020-03-11 15:12:08 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
var complete = newFuture[void]()
|
|
|
|
const MsgSize = 1024
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
try:
|
|
|
|
let msg = await stream.readLp(MsgSize)
|
|
|
|
check msg.len == MsgSize
|
|
|
|
except CatchableError as e:
|
|
|
|
echo e.msg
|
|
|
|
await stream.close()
|
|
|
|
complete.complete()
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped of
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 06:19:13 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
let transport1: TcpTransport = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
|
|
|
|
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
|
|
|
|
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
let stream = await mplexDial.newStream()
|
|
|
|
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
|
|
|
|
for _ in 0..<MsgSize: # write one less than max size
|
|
|
|
bigseq.add(uint8(rand(uint('A')..uint('z'))))
|
|
|
|
|
|
|
|
## create length prefixed libp2p frame
|
|
|
|
var buf = initVBuffer()
|
|
|
|
buf.writeSeq(bigseq)
|
|
|
|
buf.finish()
|
|
|
|
|
|
|
|
## create mplex header
|
|
|
|
var mplexBuf = initVBuffer()
|
|
|
|
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
|
|
|
|
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
|
|
|
|
|
|
|
|
await conn.write(mplexBuf.buffer)
|
|
|
|
proc writer() {.async.} =
|
|
|
|
var sent = 0
|
|
|
|
randomize()
|
|
|
|
let total = buf.buffer.len
|
|
|
|
const min = 20
|
|
|
|
const max = 50
|
|
|
|
while sent < total:
|
|
|
|
var size = rand(min..max)
|
|
|
|
size = if size > buf.buffer.len: buf.buffer.len else: size
|
|
|
|
var send = buf.buffer[0..<size]
|
|
|
|
await conn.write(send)
|
|
|
|
sent += size
|
|
|
|
buf.buffer = buf.buffer[size..^1]
|
|
|
|
|
|
|
|
await writer()
|
|
|
|
await complete.wait(1.seconds)
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await stream.close()
|
|
|
|
await conn.close()
|
2020-03-27 23:37:00 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await mplexDialFut
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
asyncTest "channel should handle 1 byte read/write":
|
|
|
|
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
var complete = newFuture[void]()
|
|
|
|
const MsgSize = 512
|
|
|
|
proc connHandler(conn: Connection) {.async, gcsafe.} =
|
|
|
|
let mplexListen = Mplex.init(conn)
|
|
|
|
mplexListen.streamHandler = proc(stream: Connection)
|
|
|
|
{.async, gcsafe.} =
|
|
|
|
let msg = await stream.readLp(MsgSize)
|
|
|
|
check msg.len == MsgSize
|
|
|
|
await stream.close()
|
|
|
|
complete.complete()
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await mplexListen.handle()
|
|
|
|
await mplexListen.close()
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
let transport1: TcpTransport = TcpTransport.init()
|
|
|
|
let listenFut = await transport1.listen(ma, connHandler)
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
let transport2: TcpTransport = TcpTransport.init()
|
|
|
|
let conn = await transport2.dial(transport1.ma)
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
let mplexDial = Mplex.init(conn)
|
|
|
|
let stream = await mplexDial.newStream()
|
|
|
|
let mplexDialFut = mplexDial.handle()
|
|
|
|
var bigseq = newSeqOfCap[uint8](MsgSize + 1)
|
|
|
|
for _ in 0..<MsgSize: # write one less than max size
|
|
|
|
bigseq.add(uint8(rand(uint('A')..uint('z'))))
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
## create length prefixed libp2p frame
|
|
|
|
var buf = initVBuffer()
|
|
|
|
buf.writeSeq(bigseq)
|
|
|
|
buf.finish()
|
2020-03-11 22:23:39 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
## create mplex header
|
|
|
|
var mplexBuf = initVBuffer()
|
|
|
|
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
|
|
|
|
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
|
2020-03-11 15:12:08 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await conn.write(mplexBuf.buffer)
|
|
|
|
proc writer() {.async.} =
|
|
|
|
for i in buf.buffer:
|
|
|
|
await conn.write(@[i])
|
2020-03-11 15:12:08 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await writer()
|
2020-03-27 23:37:00 +00:00
|
|
|
|
2020-11-13 03:44:02 +00:00
|
|
|
await complete.wait(5.seconds)
|
|
|
|
await stream.close()
|
|
|
|
await conn.close()
|
|
|
|
await mplexDialFut
|
|
|
|
await allFuturesThrowing(
|
|
|
|
transport1.close(),
|
|
|
|
transport2.close())
|
|
|
|
await listenFut
|