nim-libp2p-experimental/tests/testmplex.nim

import unittest, sequtils, sugar, strformat, options, random
import chronos, nimcrypto/utils, chronicles
import ../libp2p/[errors,
connection,
stream/lpstream,
stream/bufferstream,
transports/tcptransport,
transports/transport,
protocols/identify,
multiaddress,
muxers/mplex/mplex,
muxers/mplex/coder,
muxers/mplex/types,
muxers/mplex/lpchannel,
vbuffer,
varint]

when defined(nimHasUsed): {.used.}

const
StreamTransportTrackerName = "stream.transport"
StreamServerTrackerName = "stream.server"
suite "Mplex":
teardown:
let
trackers = [
getTracker(BufferStreamTrackerName),
getTracker(AsyncStreamWriterTrackerName),
getTracker(TcpTransportTrackerName),
getTracker(AsyncStreamReaderTrackerName),
getTracker(StreamTransportTrackerName),
getTracker(StreamServerTrackerName)
]
for tracker in trackers:
if not isNil(tracker):
# echo tracker.dump()
check tracker.isLeaked() == false
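
  # The encode/decode tests below exercise the mplex wire format directly.
  # Each frame starts with a uvarint header of (channel id shl 3) or message
  # type, followed by a uvarint payload length and the payload bytes.
  # For example, "000873747265616d2031" is header 0x00 (channel 0, New),
  # length 0x08, then the 8 bytes of "stream 1"; for channel 17 the header
  # value is (17 shl 3) or 0 = 136, varint-encoded as 0x88 0x01.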
test "encode header with channel id 0":
proc testEncodeHeader(): Future[bool] {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("000873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(0, MessageType.New, cast[seq[byte]]("stream 1"))
result = true
await stream.close()
check:
waitFor(testEncodeHeader()) == true
test "encode header with channel id other than 0":
proc testEncodeHeader(): Future[bool] {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("88010873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(17, MessageType.New, cast[seq[byte]]("stream 1"))
result = true
await stream.close()
check:
waitFor(testEncodeHeader()) == true
test "encode header and body with channel id 0":
proc testEncodeHeaderBody(): Future[bool] {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("020873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(0, MessageType.MsgOut, cast[seq[byte]]("stream 1"))
result = true
await stream.close()
check:
waitFor(testEncodeHeaderBody()) == true
test "encode header and body with channel id other than 0":
proc testEncodeHeaderBody(): Future[bool] {.async.} =
proc encHandler(msg: seq[byte]) {.async.} =
check msg == fromHex("8a010873747265616d2031")
let stream = newBufferStream(encHandler)
let conn = newConnection(stream)
await conn.writeMsg(17, MessageType.MsgOut, cast[seq[byte]]("stream 1"))
await conn.close()
result = true
await stream.close()
check:
waitFor(testEncodeHeaderBody()) == true
test "decode header with channel id 0":
proc testDecodeHeader(): Future[bool] {.async.} =
let stream = newBufferStream()
let conn = newConnection(stream)
await stream.pushTo(fromHex("000873747265616d2031"))
let msg = await conn.readMsg()
check msg.id == 0
check msg.msgType == MessageType.New
result = true
await stream.close()
check:
waitFor(testDecodeHeader()) == true
test "decode header and body with channel id 0":
proc testDecodeHeader(): Future[bool] {.async.} =
let stream = newBufferStream()
let conn = newConnection(stream)
await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
let msg = await conn.readMsg()
check msg.id == 0
check msg.msgType == MessageType.MsgOut
check cast[string](msg.data) == "hello from channel 0!!"
result = true
await stream.close()
check:
waitFor(testDecodeHeader()) == true
test "decode header and body with channel id other than 0":
proc testDecodeHeader(): Future[bool] {.async.} =
let stream = newBufferStream()
let conn = newConnection(stream)
await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
let msg = await conn.readMsg()
check msg.id == 17
check msg.msgType == MessageType.MsgOut
check cast[string](msg.data) == "hello from channel 0!!"
result = true
await stream.close()
check:
waitFor(testDecodeHeader()) == true
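
  # The e2e tests wire two TCP transports together: the listening side wraps
  # each accepted connection in an mplex muxer whose streamHandler runs once
  # per remotely opened stream, while the dialing side opens streams with
  # newStream() and drives them directly.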
test "e2e - read/write receiver":
proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var
done = newFuture[void]()
done2 = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check cast[string](msg) == "Hello from stream!"
await stream.close()
done.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
await mplexListen.handle()
await conn.close()
done2.complete()
let transport1: TcpTransport = newTransport(TcpTransport)
let lfut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream()
let openState = cast[LPChannel](stream.stream).isOpen
await stream.writeLp("Hello from stream!")
await conn.close()
check openState # not lazy
result = true
await done.wait(5000.millis)
await done2.wait(5000.millis)
await stream.close()
await conn.close()
await transport2.close()
await transport1.close()
await lfut
check:
waitFor(testNewStream()) == true
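
  # A lazy stream defers sending its NewStream frame until the first write,
  # so immediately after newStream("", true) the underlying LPChannel should
  # not yet report isOpen - which is what the next test asserts.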
test "e2e - read/write receiver lazy":
proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var
done = newFuture[void]()
done2 = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check cast[string](msg) == "Hello from stream!"
await stream.close()
done.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
await mplexListen.handle()
done2.complete()
let transport1: TcpTransport = newTransport(TcpTransport)
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream("", true)
let openState = cast[LPChannel](stream.stream).isOpen
await stream.writeLp("Hello from stream!")
await conn.close()
check not openState # assert lazy
result = true
await done.wait(5000.millis)
await done2.wait(5000.millis)
await conn.close()
await stream.close()
await mplexDial.close()
await transport2.close()
await transport1.close()
await listenFut
check:
waitFor(testNewStream()) == true
test "e2e - write fragmented":
proc testNewStream(): Future[bool] {.async.} =
let
ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
listenJob = newFuture[void]()
var bigseq = newSeqOfCap[uint8](MaxMsgSize * 2)
for _ in 0..<MaxMsgSize:
bigseq.add(uint8(rand(uint('A')..uint('z'))))
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
defer:
await stream.close()
let msg = await stream.readLp()
check msg == bigseq
trace "Bigseq check passed!"
listenJob.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
discard mplexListen.handle()
let transport1: TcpTransport = newTransport(TcpTransport)
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream()
await stream.writeLp(bigseq)
try:
await listenJob.wait(millis(5000))
except AsyncTimeoutError:
check false
result = true
await stream.close()
await mplexDial.close()
await conn.close()
await transport2.close()
await transport1.close()
await listenFut
check:
waitFor(testNewStream()) == true
test "e2e - read/write initiator":
proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
await stream.writeLp("Hello from stream!")
await stream.close()
done.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
await mplexListen.handle()
let transport1: TcpTransport = newTransport(TcpTransport)
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let dialFut = mplexDial.handle()
let stream = await mplexDial.newStream("DIALER")
let msg = cast[string](await stream.readLp())
check msg == "Hello from stream!"
# await dialFut
result = true
await done.wait(5000.millis)
await stream.close()
await conn.close()
await mplexDial.close()
await transport2.close()
await transport1.close()
await listenFut
check:
waitFor(testNewStream()) == true
test "e2e - multiple streams":
proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let done = newFuture[void]()
var count = 1
var listenConn: Connection
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check cast[string](msg) == &"stream {count}!"
count.inc
await stream.close()
if count == 10:
done.complete()
listenConn = conn
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
await mplexListen.handle()
let transport1 = newTransport(TcpTransport)
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
for i in 1..10:
let stream = await mplexDial.newStream()
await stream.writeLp(&"stream {i}!")
await stream.close()
await done.wait(5000.millis)
await conn.close()
await transport2.close()
await mplexDial.close()
await listenConn.close()
await transport1.close()
await listenFut
result = true
check:
waitFor(testNewStream()) == true
test "e2e - multiple read/write streams":
proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var count = 1
var listenConn: Connection
let done = newFuture[void]()
proc connHandler(conn: Connection) {.async, gcsafe.} =
listenConn = conn
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check cast[string](msg) == &"stream {count} from dialer!"
await stream.writeLp(&"stream {count} from listener!")
count.inc
await stream.close()
if count == 10:
done.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
await mplexListen.handle()
let transport1: TcpTransport = newTransport(TcpTransport)
let transportFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let dialFut = mplexDial.handle()
dialFut.addCallback(proc(udata: pointer = nil) {.gcsafe.}
= trace "completed dialer")
for i in 1..10:
let stream = await mplexDial.newStream("dialer stream")
await stream.writeLp(&"stream {i} from dialer!")
let msg = await stream.readLp()
check cast[string](msg) == &"stream {i} from listener!"
await stream.close()
await done.wait(5.seconds)
await conn.close()
await listenConn.close()
await allFuturesThrowing(dialFut)
await mplexDial.close()
await transport2.close()
await transport1.close()
await transportFut
result = true
check:
waitFor(testNewStream()) == true
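
  # Half-close semantics: close() shuts down only the local write side of a
  # channel, while closedByRemote() marks the read side EOF. Writing after a
  # local close, or reading past the buffered data after a remote close, is
  # expected to raise LPStreamEOFError.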
test "half closed - channel should close for write":
proc testClosedForWrite(): Future[void] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
buff = newBufferStream(writeHandler)
conn = newConnection(buff)
chann = newChannel(1, conn, true)
try:
await chann.close()
await chann.write("Hello")
finally:
await chann.cleanUp()
await conn.close()
expect LPStreamEOFError:
waitFor(testClosedForWrite())
test "half closed - channel should close for read by remote":
proc testClosedForRead(): Future[void] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
buff = newBufferStream(writeHandler)
conn = newConnection(buff)
chann = newChannel(1, conn, true)
try:
await chann.pushTo(cast[seq[byte]]("Hello!"))
await chann.closedByRemote()
discard await chann.read() # this should work, since there is data in the buffer
discard await chann.read() # this should throw
finally:
await chann.cleanUp()
await conn.close()
expect LPStreamEOFError:
waitFor(testClosedForRead())
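
  # The jitter tests bypass the muxer on the sending side: they hand-encode a
  # single mplex frame (header plus length-prefixed payload), then dribble it
  # into the raw connection in uneven chunks so that the receiving muxer has
  # to reassemble the message across arbitrary write boundaries.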
test "jitter - channel should be able to handle erratic read/writes":
proc test(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var complete = newFuture[void]()
const MsgSize = 1024
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check msg.len == MsgSize
await stream.close()
complete.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
discard mplexListen.handle()
let transport1: TcpTransport = newTransport(TcpTransport)
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream()
      var bigseq = newSeqOfCap[uint8](MsgSize)
      for _ in 0..<MsgSize: # fill the message with random printable bytes
bigseq.add(uint8(rand(uint('A')..uint('z'))))
      ## create length-prefixed libp2p frame
var buf = initVBuffer()
buf.writeSeq(bigseq)
buf.finish()
      ## create mplex header: varint((channel id shl 3) or message type)
      var mplexBuf = initVBuffer()
      mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
      mplexBuf.writePBVarint(buf.buffer.len.uint) # the size is always sent
await conn.write(mplexBuf.buffer)
proc writer() {.async.} =
var sent = 0
randomize()
let total = buf.buffer.len
const min = 20
const max = 50
while sent < total:
var size = rand(min..max)
size = if size > buf.buffer.len: buf.buffer.len else: size
var send = buf.buffer[0..<size]
await conn.write(send)
sent += size
buf.buffer = buf.buffer[size..^1]
await writer()
await stream.close()
await conn.close()
await complete
await transport2.close()
await transport1.close()
await listenFut
result = true
check:
waitFor(test()) == true
test "jitter - channel should handle 1 byte read/write":
proc test(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
var complete = newFuture[void]()
const MsgSize = 512
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check msg.len == MsgSize
await stream.close()
complete.complete()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
discard mplexListen.handle()
let transport1: TcpTransport = newTransport(TcpTransport)
let listenFut = await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
let mplexDial = newMplex(conn)
let stream = await mplexDial.newStream()
      var bigseq = newSeqOfCap[uint8](MsgSize)
      for _ in 0..<MsgSize: # fill the message with random printable bytes
bigseq.add(uint8(rand(uint('A')..uint('z'))))
      ## create length-prefixed libp2p frame
var buf = initVBuffer()
buf.writeSeq(bigseq)
buf.finish()
      ## create mplex header: varint((channel id shl 3) or message type)
      var mplexBuf = initVBuffer()
      mplexBuf.writePBVarint((1.uint shl 3) or ord(MessageType.MsgOut).uint)
      mplexBuf.writePBVarint(buf.buffer.len.uint) # the size is always sent
await conn.write(mplexBuf.buffer)
proc writer() {.async.} =
for i in buf.buffer:
await conn.write(@[i])
await writer()
await stream.close()
await conn.close()
await complete
await transport2.close()
await transport1.close()
await listenFut
result = true
check:
waitFor(test()) == true
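
  # reset() aborts the channel in both directions, so any subsequent read or
  # write is expected to fail with LPStreamEOFError; likewise, data may not
  # be pushed into a channel whose remote end has already closed.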
test "reset - channel should fail reading":
proc testResetRead(): Future[void] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
buff = newBufferStream(writeHandler)
conn = newConnection(buff)
chann = newChannel(1, conn, true)
try:
await chann.reset()
var data = await chann.read()
doAssert(len(data) == 1)
finally:
await chann.cleanUp()
await conn.close()
expect LPStreamEOFError:
waitFor(testResetRead())
test "reset - channel should fail writing":
proc testResetWrite(): Future[void] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
buff = newBufferStream(writeHandler)
conn = newConnection(buff)
chann = newChannel(1, conn, true)
try:
await chann.reset()
await chann.write(cast[seq[byte]]("Hello!"))
finally:
await chann.cleanUp()
await conn.close()
expect LPStreamEOFError:
waitFor(testResetWrite())
test "should not allow pushing data to channel when remote end closed":
proc testResetWrite(): Future[void] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard
let
buff = newBufferStream(writeHandler)
conn = newConnection(buff)
chann = newChannel(1, conn, true)
try:
await chann.closedByRemote()
await chann.pushTo(@[byte(1)])
finally:
await chann.cleanUp()
await conn.close()
expect LPStreamEOFError:
waitFor(testResetWrite())