nim-libp2p/tests/testmplex.nim

602 lines
19 KiB
Nim
Raw Normal View History

import unittest, strformat, strformat, random
import chronos, nimcrypto/utils, chronicles, stew/byteutils
import ../libp2p/[errors,
connection,
stream/lpstream,
stream/bufferstream,
transports/tcptransport,
transports/transport,
2019-10-03 13:30:22 -06:00
multiaddress,
muxers/mplex/mplex,
muxers/mplex/coder,
2019-10-03 13:30:22 -06:00
muxers/mplex/types,
2020-03-11 09:12:08 -06:00
muxers/mplex/lpchannel,
vbuffer,
varint]
2019-09-03 14:40:51 -06:00
import ./helpers
when defined(nimHasUsed): {.used.}
2019-09-03 14:40:51 -06:00
suite "Mplex":
teardown:
for tracker in testTrackers():
# echo tracker.dump()
check tracker.isLeaked() == false
test "encode header with channel id 0":
proc testEncodeHeader() {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("000873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(0, MessageType.New, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeader())
test "encode header with channel id other than 0":
proc testEncodeHeader() {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("88010873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(17, MessageType.New, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeader())
2019-09-03 14:40:51 -06:00
test "encode header and body with channel id 0":
proc testEncodeHeaderBody() {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("020873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(0, MessageType.MsgOut, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeaderBody())
test "encode header and body with channel id other than 0":
proc testEncodeHeaderBody() {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("8a010873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(17, MessageType.MsgOut, ("stream 1").toBytes)
await conn.close()
waitFor(testEncodeHeaderBody())
test "decode header with channel id 0":
proc testDecodeHeader() {.async.} =
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
let stream = newBufferStream()
let conn = newConnection(stream)
await stream.pushTo(fromHex("000873747265616d2031"))
2019-09-08 00:32:41 -06:00
let msg = await conn.readMsg()
2019-09-03 14:40:51 -06:00
2020-02-12 09:43:42 -05:00
check msg.id == 0
check msg.msgType == MessageType.New
await conn.close()
waitFor(testDecodeHeader())
test "decode header and body with channel id 0":
proc testDecodeHeader() {.async.} =
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
let stream = newBufferStream()
let conn = newConnection(stream)
await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
2019-09-08 00:32:41 -06:00
let msg = await conn.readMsg()
2020-02-12 09:43:42 -05:00
check msg.id == 0
check msg.msgType == MessageType.MsgOut
check string.fromBytes(msg.data) == "hello from channel 0!!"
await conn.close()
waitFor(testDecodeHeader())
test "decode header and body with channel id other than 0":
proc testDecodeHeader() {.async.} =
PubSub (Gossip & Flood) Implementation (#36) This adds gossipsub and floodsub, as well as basic interop testing with the go libp2p daemon. * add close event * wip: gossipsub * splitting rpc message * making message handling more consistent * initial gossipsub implementation * feat: nim 1.0 cleanup * wip: gossipsub protobuf * adding encoding/decoding of gossipsub messages * add disconnect handler * add proper gossipsub msg handling * misc: cleanup for nim 1.0 * splitting floodsub and gossipsub tests * feat: add mesh rebalansing * test pubsub * add mesh rebalansing tests * testing mesh maintenance * finishing mcache implementatin * wip: commenting out broken tests * wip: don't run heartbeat for now * switchout debug for trace logging * testing gossip peer selection algorithm * test stream piping * more work around message amplification * get the peerid from message * use timed cache as backing store * allow setting timeout in constructor * several changes to improve performance * more through testing of msg amplification * prevent gc issues * allow piping to self and prevent deadlocks * improove floodsub * allow running hook on cache eviction * prevent race conditions * prevent race conditions and improove tests * use hashes as cache keys * removing useless file * don't create a new seq * re-enable pubsub tests * fix imports * reduce number of runs to speed up tests * break out control message processing * normalize sleeps between steps * implement proper transport filtering * initial interop testing * clean up floodsub publish logic * allow dialing without a protocol * adding multiple reads/writes * use protobuf varint in mplex * don't loose conn's peerInfo * initial interop pubsub tests * don't duplicate connections/peers * bring back interop tests * wip: interop * re-enable interop and daemon tests * add multiple read write tests from handlers * don't cleanup channel prematurely * use correct channel to send/receive msgs * adjust tests with latest changes * include interop tests * remove temp logging output * fix ci * use correct public key serialization * additional tests for pubsub interop
2019-12-05 20:16:18 -06:00
let stream = newBufferStream()
let conn = newConnection(stream)
await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
2019-09-08 00:32:41 -06:00
let msg = await conn.readMsg()
2020-02-12 09:43:42 -05:00
check msg.id == 17
check msg.msgType == MessageType.MsgOut
check string.fromBytes(msg.data) == "hello from channel 0!!"
await conn.close()
waitFor(testDecodeHeader())
test "half closed - channel should close for write":
proc testClosedForWrite(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newConnection(newBufferStream(writeHandler))
chann = newChannel(1, conn, true)
await chann.close()
try:
await chann.write("Hello")
except LPStreamClosedError:
result = true
finally:
await chann.reset()
await conn.close()
check:
waitFor(testClosedForWrite()) == true
test "half closed - channel should close for read by remote":
proc testClosedForRead(): Future[bool] {.async.} =
let
conn = newConnection(newBufferStream(
proc (data: seq[byte]) {.gcsafe, async.} =
result = nil
))
chann = newChannel(1, conn, true)
await chann.pushTo(("Hello!").toBytes)
let closeFut = chann.closeRemote()
var data = newSeq[byte](6)
await chann.readExactly(addr data[0], 6) # this should work, since there is data in the buffer
try:
await chann.readExactly(addr data[0], 6) # this should throw
await closeFut
except LPStreamEOFError:
result = true
finally:
await chann.close()
await conn.close()
check:
waitFor(testClosedForRead()) == true
test "should not allow pushing data to channel when remote end closed":
proc testResetWrite(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newConnection(newBufferStream(writeHandler))
chann = newChannel(1, conn, true)
await chann.closeRemote()
try:
await chann.pushTo(@[byte(1)])
except LPStreamEOFError:
result = true
finally:
await chann.close()
await conn.close()
check:
waitFor(testResetWrite()) == true
test "reset - channel should fail reading":
proc testResetRead(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newConnection(newBufferStream(writeHandler))
chann = newChannel(1, conn, true)
await chann.reset()
var data = newSeq[byte](1)
try:
await chann.readExactly(addr data[0], 1)
check data.len == 1
except LPStreamEOFError:
result = true
finally:
await conn.close()
check:
waitFor(testResetRead()) == true
test "reset - channel should fail writing":
proc testResetWrite(): Future[bool] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
conn = newConnection(newBufferStream(writeHandler))
chann = newChannel(1, conn, true)
await chann.reset()
try:
await chann.write(("Hello!").toBytes)
except LPStreamClosedError:
result = true
finally:
await conn.close()
check:
waitFor(testResetWrite()) == true
2020-02-12 09:43:42 -05:00
2019-09-25 16:57:27 -06:00
test "e2e - read/write receiver":
proc testNewStream() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
2019-09-25 16:57:27 -06:00
var done = newFuture[void]()
2019-09-25 16:57:27 -06:00
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
2019-09-25 16:57:27 -06:00
await stream.close()
done.complete()
2019-09-25 16:57:27 -06:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 16:57:27 -06:00
let transport1: TcpTransport = TcpTransport.init()
let listenFut = await transport1.listen(ma, connHandler)
2020-02-12 09:43:42 -05:00
let transport2: TcpTransport = TcpTransport.init()
2019-09-25 16:57:27 -06:00
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
await stream.writeLp("HELLO")
check LPChannel(stream.stream).isOpen # not lazy
await stream.close()
await done.wait(1.seconds)
await conn.close()
await mplexDialFut
await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut
waitFor(testNewStream())
test "e2e - read/write receiver lazy":
proc testNewStream() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
var done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == "HELLO"
await stream.close()
done.complete()
await mplexListen.handle()
await mplexListen.close()
let transport1: TcpTransport = TcpTransport.init()
2020-03-27 17:37:00 -06:00
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream(lazy = true)
let mplexDialFut = mplexDial.handle()
2020-05-14 16:16:58 -06:00
check not LPChannel(stream.stream).isOpen # assert lazy
await stream.writeLp("HELLO")
2020-05-14 16:16:58 -06:00
check LPChannel(stream.stream).isOpen # assert lazy
await stream.close()
2019-09-25 16:57:27 -06:00
await done.wait(1.seconds)
await conn.close()
await mplexDialFut
await allFuturesThrowing(
transport1.close(),
transport2.close())
2020-03-27 17:37:00 -06:00
await listenFut
waitFor(testNewStream())
2019-09-25 16:57:27 -06:00
test "e2e - write fragmented":
proc testNewStream() {.async.} =
let
ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
listenJob = newFuture[void]()
var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
for _ in 0..<MaxMsgSize:
bigseq.add(uint8(rand(uint('A')..uint('z'))))
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(MaxMsgSize)
check msg == bigseq
trace "Bigseq check passed!"
await stream.close()
listenJob.complete()
await mplexListen.handle()
await mplexListen.close()
let transport1: TcpTransport = TcpTransport.init()
2020-03-27 17:37:00 -06:00
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = TcpTransport.init()
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDialFut = mplexDial.handle()
let stream = await mplexDial.newStream()
await stream.writeLp(bigseq)
2020-02-11 14:48:52 +09:00
try:
await listenJob.wait(10.seconds)
2020-02-11 14:48:52 +09:00
except AsyncTimeoutError:
check false
await stream.close()
await conn.close()
await mplexDialFut
await allFuturesThrowing(
transport1.close(),
transport2.close())
2020-03-27 17:37:00 -06:00
await listenFut
waitFor(testNewStream())
2019-09-25 16:57:27 -06:00
test "e2e - read/write initiator":
proc testNewStream() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
2019-09-25 16:57:27 -06:00
let done = newFuture[void]()
2019-09-25 16:57:27 -06:00
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2019-09-25 16:57:27 -06:00
await stream.writeLp("Hello from stream!")
await stream.close()
done.complete()
2019-09-25 16:57:27 -06:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 16:57:27 -06:00
let transport1: TcpTransport = TcpTransport.init()
2020-03-27 17:37:00 -06:00
let listenFut = await transport1.listen(ma, connHandler)
2019-09-25 16:57:27 -06:00
let transport2: TcpTransport = TcpTransport.init()
2019-09-25 16:57:27 -06:00
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDialFut = mplexDial.handle()
2019-09-25 16:57:27 -06:00
let stream = await mplexDial.newStream("DIALER")
let msg = string.fromBytes(await stream.readLp(1024))
await stream.close()
2019-09-25 16:57:27 -06:00
check msg == "Hello from stream!"
await done.wait(1.seconds)
await conn.close()
await mplexDialFut
await allFuturesThrowing(
transport1.close(),
transport2.close())
2020-03-27 17:37:00 -06:00
await listenFut
waitFor(testNewStream())
2019-09-25 16:57:27 -06:00
test "e2e - multiple streams":
proc testNewStream() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
2019-09-25 16:57:27 -06:00
let done = newFuture[void]()
2019-09-25 16:57:27 -06:00
proc connHandler(conn: Connection) {.async, gcsafe.} =
var count = 1
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == &"stream {count}!"
2019-09-25 16:57:27 -06:00
count.inc
await stream.close()
if count == 10:
done.complete()
2019-09-25 16:57:27 -06:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 16:57:27 -06:00
let transport1 = TcpTransport.init()
2020-03-27 17:37:00 -06:00
let listenFut = await transport1.listen(ma, connHandler)
2019-09-25 16:57:27 -06:00
let transport2: TcpTransport = TcpTransport.init()
2019-09-25 16:57:27 -06:00
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
# TODO: Reenable once half-closed is working properly
let mplexDialFut = mplexDial.handle()
2020-03-27 17:37:00 -06:00
for i in 1..10:
2019-09-25 16:57:27 -06:00
let stream = await mplexDial.newStream()
await stream.writeLp(&"stream {i}!")
await stream.close()
await done.wait(10.seconds)
await conn.close()
await mplexDialFut
await allFuturesThrowing(
transport1.close(),
transport2.close())
2020-03-27 17:37:00 -06:00
await listenFut
waitFor(testNewStream())
2019-09-25 16:57:27 -06:00
test "e2e - multiple read/write streams":
proc testNewStream() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
2019-09-25 16:57:27 -06:00
let done = newFuture[void]()
2019-09-25 16:57:27 -06:00
proc connHandler(conn: Connection) {.async, gcsafe.} =
var count = 1
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == &"stream {count} from dialer!"
2019-09-25 16:57:27 -06:00
await stream.writeLp(&"stream {count} from listener!")
count.inc
await stream.close()
if count == 10:
done.complete()
2019-09-25 16:57:27 -06:00
await mplexListen.handle()
await mplexListen.close()
2019-09-25 16:57:27 -06:00
let transport1: TcpTransport = TcpTransport.init()
let listenFut = await transport1.listen(ma, connHandler)
2019-09-25 16:57:27 -06:00
let transport2: TcpTransport = TcpTransport.init()
2019-09-25 16:57:27 -06:00
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDialFut = mplexDial.handle()
2019-09-25 16:57:27 -06:00
for i in 1..10:
let stream = await mplexDial.newStream("dialer stream")
await stream.writeLp(&"stream {i} from dialer!")
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(1024)
check string.fromBytes(msg) == &"stream {i} from listener!"
2019-09-25 16:57:27 -06:00
await stream.close()
await done.wait(5.seconds)
2019-09-25 16:57:27 -06:00
await conn.close()
await mplexDialFut
await allFuturesThrowing(
transport1.close(),
transport2.close())
await listenFut
waitFor(testNewStream())
2019-09-04 00:40:11 -06:00
2020-03-11 09:12:08 -06:00
test "jitter - channel should be able to handle erratic read/writes":
proc test() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
2020-03-11 09:12:08 -06:00
var complete = newFuture[void]()
2020-03-11 16:23:39 -06:00
const MsgSize = 1024
2020-03-11 09:12:08 -06:00
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(MsgSize)
2020-03-11 09:12:08 -06:00
check msg.len == MsgSize
await stream.close()
complete.complete()
await mplexListen.handle()
await mplexListen.close()
2020-03-11 09:12:08 -06:00
let transport1: TcpTransport = TcpTransport.init()
2020-03-27 17:37:00 -06:00
let listenFut = await transport1.listen(ma, connHandler)
2020-03-11 09:12:08 -06:00
let transport2: TcpTransport = TcpTransport.init()
2020-03-11 09:12:08 -06:00
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let mplexDialFut = mplexDial.handle()
2020-03-11 09:12:08 -06:00
let stream = await mplexDial.newStream()
var bigseq = newSeqOfCap[uint8](MaxMsgSize + 1)
for _ in 0..<MsgSize: # write one less than max size
bigseq.add(uint8(rand(uint('A')..uint('z'))))
## create lenght prefixed libp2p frame
var buf = initVBuffer()
buf.writeSeq(bigseq)
buf.finish()
## create mplex header
var mplexBuf = initVBuffer()
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
2020-03-11 09:36:45 -06:00
await conn.write(mplexBuf.buffer)
2020-03-11 09:12:08 -06:00
proc writer() {.async.} =
var sent = 0
randomize()
let total = buf.buffer.len
2020-03-11 16:23:39 -06:00
const min = 20
const max = 50
2020-03-11 09:12:08 -06:00
while sent < total:
2020-03-11 16:23:39 -06:00
var size = rand(min..max)
size = if size > buf.buffer.len: buf.buffer.len else: size
var send = buf.buffer[0..<size]
2020-03-11 09:12:08 -06:00
await conn.write(send)
2020-03-11 16:23:39 -06:00
sent += size
buf.buffer = buf.buffer[size..^1]
await writer()
await stream.close()
await conn.close()
await complete.wait(1.seconds)
await mplexDialFut
2020-03-11 16:23:39 -06:00
await allFuturesThrowing(
transport1.close(),
transport2.close())
2020-03-27 17:37:00 -06:00
await listenFut
waitFor(test())
2020-03-11 16:23:39 -06:00
test "jitter - channel should handle 1 byte read/write":
proc test() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0").tryGet()
2020-03-11 16:23:39 -06:00
var complete = newFuture[void]()
const MsgSize = 512
proc connHandler(conn: Connection) {.async, gcsafe.} =
let mplexListen = newMplex(conn)
mplexListen.streamHandler = proc(stream: Connection)
{.async, gcsafe.} =
2020-05-08 22:58:23 +02:00
let msg = await stream.readLp(MsgSize)
2020-03-11 16:23:39 -06:00
check msg.len == MsgSize
await stream.close()
complete.complete()
await mplexListen.handle()
await mplexListen.close()
2020-03-11 16:23:39 -06:00
let transport1: TcpTransport = TcpTransport.init()
2020-03-27 17:37:00 -06:00
let listenFut = await transport1.listen(ma, connHandler)
2020-03-11 16:23:39 -06:00
let transport2: TcpTransport = TcpTransport.init()
2020-03-11 16:23:39 -06:00
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream()
let mplexDialFut = mplexDial.handle()
var bigseq = newSeqOfCap[uint8](MsgSize + 1)
2020-03-11 16:23:39 -06:00
for _ in 0..<MsgSize: # write one less than max size
bigseq.add(uint8(rand(uint('A')..uint('z'))))
## create lenght prefixed libp2p frame
var buf = initVBuffer()
buf.writeSeq(bigseq)
buf.finish()
## create mplex header
var mplexBuf = initVBuffer()
mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
mplexBuf.writePBVarint(buf.buffer.len.uint) # size should be always sent
await conn.write(mplexBuf.buffer)
proc writer() {.async.} =
for i in buf.buffer:
await conn.write(@[i])
2020-03-11 09:12:08 -06:00
await writer()
await complete.wait(5.seconds)
2020-03-11 09:12:08 -06:00
await stream.close()
await conn.close()
await mplexDialFut
await allFuturesThrowing(
transport1.close(),
transport2.close())
2020-03-27 17:37:00 -06:00
await listenFut
waitFor(test())