add multiple read/write streams test

This commit is contained in:
Dmitriy Ryajov 2019-09-08 00:32:41 -06:00
parent 0c416e757e
commit 997745b7e7
1 changed files with 65 additions and 16 deletions

View File

@ -1,4 +1,4 @@
import unittest, sequtils, sugar, strformat import unittest, sequtils, sugar, strformat, options
import chronos, nimcrypto/utils import chronos, nimcrypto/utils
import ../libp2p/connection, import ../libp2p/connection,
../libp2p/stream/lpstream, ../libp2p/stream/lpstream,
@ -10,7 +10,8 @@ import ../libp2p/connection,
../libp2p/muxers/mplex/mplex, ../libp2p/muxers/mplex/mplex,
../libp2p/muxers/mplex/coder, ../libp2p/muxers/mplex/coder,
../libp2p/muxers/mplex/types, ../libp2p/muxers/mplex/types,
../libp2p/muxers/mplex/channel ../libp2p/muxers/mplex/channel,
../libp2p/helpers/debug
suite "Mplex": suite "Mplex":
test "encode header with channel id 0": test "encode header with channel id 0":
@ -74,11 +75,12 @@ suite "Mplex":
let stream = newBufferStream(encHandler) let stream = newBufferStream(encHandler)
let conn = newConnection(stream) let conn = newConnection(stream)
await stream.pushTo(fromHex("000873747265616d2031")) await stream.pushTo(fromHex("000873747265616d2031"))
let (id, msgType, data) = await conn.readMsg() let msg = await conn.readMsg()
check id == 0 if msg.isSome:
check msgType == MessageType.New check msg.get().id == 0
result = true check msg.get().msgType == MessageType.New
result = true
check: check:
waitFor(testDecodeHeader()) == true waitFor(testDecodeHeader()) == true
@ -89,12 +91,13 @@ suite "Mplex":
let stream = newBufferStream(encHandler) let stream = newBufferStream(encHandler)
let conn = newConnection(stream) let conn = newConnection(stream)
await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121")) await stream.pushTo(fromHex("021668656C6C6F2066726F6D206368616E6E656C20302121"))
let (id, msgType, data) = await conn.readMsg() let msg = await conn.readMsg()
check id == 0 if msg.isSome:
check msgType == MessageType.MsgOut check msg.get().id == 0
check cast[string](data) == "hello from channel 0!!" check msg.get().msgType == MessageType.MsgOut
result = true check cast[string](msg.get().data) == "hello from channel 0!!"
result = true
check: check:
waitFor(testDecodeHeader()) == true waitFor(testDecodeHeader()) == true
@ -105,12 +108,13 @@ suite "Mplex":
let stream = newBufferStream(encHandler) let stream = newBufferStream(encHandler)
let conn = newConnection(stream) let conn = newConnection(stream)
await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121")) await stream.pushTo(fromHex("8a011668656C6C6F2066726F6D206368616E6E656C20302121"))
let (id, msgType, data) = await conn.readMsg() let msg = await conn.readMsg()
check id == 17 if msg.isSome:
check msgType == MessageType.MsgOut check msg.get().id == 17
check cast[string](data) == "hello from channel 0!!" check msg.get().msgType == MessageType.MsgOut
result = true check cast[string](msg.get().data) == "hello from channel 0!!"
result = true
check: check:
waitFor(testDecodeHeader()) == true waitFor(testDecodeHeader()) == true
@ -209,6 +213,51 @@ suite "Mplex":
check: check:
waitFor(testNewStream()) == true waitFor(testNewStream()) == true
test "e2e - multiple read/write streams":
proc testNewStream(): Future[bool] {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/127.0.0.1/tcp/53383")
var count = 1
var listenFut: Future[void]
proc connHandler(conn: Connection) {.async, gcsafe.} =
proc handleMplexListen(stream: Connection) {.async, gcsafe.} =
let msg = await stream.readLp()
check cast[string](msg) == &"stream {count} from dialer!"
await stream.writeLp(&"stream {count} from listener!")
count.inc
await stream.close()
let mplexListen = newMplex(conn)
mplexListen.streamHandler = handleMplexListen
listenFut = mplexListen.handle()
listenFut.addCallback(proc(udata: pointer) {.gcsafe.}
= debug "completed listener")
let transport1: TcpTransport = newTransport(TcpTransport)
await transport1.listen(ma, connHandler)
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(ma)
let mplexDial = newMplex(conn)
let dialFut = mplexDial.handle()
dialFut.addCallback(proc(udata: pointer = nil) {.gcsafe.}
= debug "completed dialer")
for i in 1..10:
let stream = await mplexDial.newStream("dialer stream")
await stream.writeLp(&"stream {i} from dialer!")
let msg = await stream.readLp()
check cast[string](msg) == &"stream {i} from listener!"
await stream.close()
await conn.close()
listenFut.complete()
dialFut.complete()
result = true
check:
waitFor(testNewStream()) == true
test "half closed - channel should close for write": test "half closed - channel should close for write":
proc testClosedForWrite(): Future[void] {.async.} = proc testClosedForWrite(): Future[void] {.async.} =
proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard proc writeHandler(data: seq[byte]) {.async, gcsafe.} = discard