nim-libp2p/tests/testmultistream.nim

301 lines
9.8 KiB
Nim
Raw Normal View History

2020-04-17 15:54:03 -06:00
import unittest, strutils, sequtils
import chronos, stew/byteutils
import crypto/crypto,
streams/[stream, pushable, connection, utils, lenprefixed],
transports/[transport, tcptransport],
protocols/protocol,
multistream,
multiaddress,
peerinfo,
peer
2019-08-29 23:16:55 -06:00
when defined(nimHasUsed): {.used.}
2020-04-17 15:54:03 -06:00
const
CodecString = "/multistream/1.0.0"
TestProtoString = "/test/proto/1.0.0"
TestString = "HELLO"
CodecBytes = @[19.byte, 47.byte, 109.byte,
117.byte, 108.byte, 116.byte,
105.byte, 115.byte, 116.byte,
114.byte, 101.byte, 97.byte,
109.byte, 47.byte, 49.byte,
46.byte, 48.byte, 46.byte,
48.byte, 10.byte]
TestProtoBytes = @[18.byte, 47.byte, 116.byte,
101.byte, 115.byte, 116.byte,
47.byte, 112.byte, 114.byte,
111.byte, 116.byte, 111.byte,
47.byte, 49.byte, 46.byte, 48.byte,
46.byte, 48.byte, 10.byte]
2019-08-29 23:16:55 -06:00
suite "Multistream select":
test "test select custom proto":
2020-04-19 18:10:58 -06:00
proc test() {.async.} =
2020-04-18 15:41:57 -06:00
let pushable = BytePushable.init()
2020-04-17 15:54:03 -06:00
pushable.sinkImpl = proc(s: Stream[seq[byte]]): Sink[seq[byte]] {.gcsafe.} =
return proc(i: Source[seq[byte]]) {.async, gcsafe.} =
check: (await i()) == CodecBytes
check: (await i()) == TestProtoBytes
await pushable.push(CodecBytes)
await pushable.push(TestProtoBytes)
let conn = Connection.init(pushable)
var ms = MultistreamSelect.init()
2020-04-19 18:10:58 -06:00
check: (await ms.select(conn, @[TestProtoString])) == TestProtoString
await pushable.close()
2019-08-29 23:16:55 -06:00
2020-04-19 18:10:58 -06:00
waitFor(test())
2019-08-29 23:16:55 -06:00
test "test handle custom proto":
2020-04-19 18:10:58 -06:00
proc test() {.async.} =
2020-04-17 15:54:03 -06:00
var ms = MultistreamSelect.init()
2020-04-18 15:41:57 -06:00
let pushable = BytePushable.init()
2020-04-17 15:54:03 -06:00
pushable.sinkImpl = proc(s: Stream[seq[byte]]): Sink[seq[byte]] {.gcsafe.} =
return proc(i: Source[seq[byte]]) {.async.} =
check: (await i()) == CodecBytes
check: (await i()) == TestProtoBytes
await pushable.close()
let conn = Connection.init(pushable)
2019-08-29 23:16:55 -06:00
var protocol: LPProtocol = new LPProtocol
2020-04-17 15:54:03 -06:00
proc testHandler(conn: Connection, proto: string):
2019-08-30 17:45:57 -06:00
Future[void] {.async, gcsafe.} =
2020-04-17 15:54:03 -06:00
check: proto == TestProtoString
2019-09-06 01:13:03 -06:00
await conn.close()
2019-08-29 23:16:55 -06:00
2019-08-30 17:45:57 -06:00
protocol.handler = testHandler
2020-04-17 15:54:03 -06:00
ms.addHandler(TestProtoString, protocol)
var handlerFut = ms.handle(conn)
await pushable.push(CodecBytes)
await pushable.push(TestProtoBytes)
await pushable.close()
await handlerFut
2019-08-29 23:16:55 -06:00
2020-04-19 18:10:58 -06:00
waitFor(test())
2019-08-29 23:16:55 -06:00
test "test handle `ls`":
2020-04-19 18:10:58 -06:00
proc test() {.async.} =
2020-04-17 15:54:03 -06:00
var ms = MultistreamSelect.init()
2020-04-18 15:41:57 -06:00
let pushable = BytePushable.init()
2020-04-17 15:54:03 -06:00
pushable.sinkImpl = proc(s: Stream[seq[byte]]): Sink[seq[byte]] {.gcsafe.} =
return proc(i: Source[seq[byte]]) {.async.} =
check: (await i()) == CodecBytes
check: (await i()) == ("\x26/test/proto1/1.0.0\n" &
"/test/proto2/1.0.0\n").toBytes()
2019-08-29 23:16:55 -06:00
2020-04-17 15:54:03 -06:00
let conn = Connection.init(pushable)
var protocol: LPProtocol = new LPProtocol
2020-04-17 15:54:03 -06:00
protocol.handler = proc(conn: Connection, proto: string):
Future[void] {.async, gcsafe.} = discard
2019-08-30 17:45:57 -06:00
ms.addHandler("/test/proto1/1.0.0", protocol)
ms.addHandler("/test/proto2/1.0.0", protocol)
2020-04-17 15:54:03 -06:00
var handlerFut = ms.handle(conn)
await pushable.push(CodecBytes) # handshake
await pushable.push("\3ls\n".toBytes())
await pushable.close()
await handlerFut
2019-08-29 23:16:55 -06:00
2020-04-19 18:10:58 -06:00
waitFor(test())
2019-08-29 23:16:55 -06:00
test "test handle `na`":
2020-04-19 18:10:58 -06:00
proc test() {.async.} =
2020-04-17 15:54:03 -06:00
var ms = MultistreamSelect.init()
2020-04-18 15:41:57 -06:00
let pushable = BytePushable.init()
2020-04-17 15:54:03 -06:00
pushable.sinkImpl = proc(s: Stream[seq[byte]]): Sink[seq[byte]] {.gcsafe.} =
return proc(i: Source[seq[byte]]) {.async.} =
check: (await i()) == "\3na\n".toBytes()
2020-04-19 18:10:58 -06:00
await pushable.close()
2019-08-29 23:16:55 -06:00
var protocol: LPProtocol = new LPProtocol
2020-04-17 15:54:03 -06:00
proc testHandler(conn: Connection, proto: string):
Future[void] {.async, gcsafe.} = discard
2019-08-30 17:45:57 -06:00
protocol.handler = testHandler
2020-04-17 15:54:03 -06:00
ms.addHandler(TestProtoString, protocol)
let conn = Connection.init(pushable)
2020-04-19 18:10:58 -06:00
asyncCheck ms.handle(conn) # asyncCheck is fine here, because `handle` doesn't exit on `na`
2019-08-29 23:16:55 -06:00
2020-04-17 15:54:03 -06:00
await pushable.push("/invalid/proto".toBytes())
2020-04-19 18:10:58 -06:00
waitFor(test())
2019-08-29 23:16:55 -06:00
test "e2e - handle":
2020-04-19 18:10:58 -06:00
proc test() {.async.} =
2019-10-04 14:10:01 -06:00
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
2019-08-30 17:45:57 -06:00
var protocol: LPProtocol = new LPProtocol
2020-04-17 15:54:03 -06:00
proc testHandler(conn: Connection, proto: string):
Future[void] {.async, gcsafe.} =
2020-04-18 15:41:57 -06:00
var pushable = BytePushable.init()
2020-04-17 15:54:03 -06:00
var lp = LenPrefixed.init()
var sink = pipe(pushable, lp.encoder, conn)
2019-08-29 23:16:55 -06:00
2020-04-17 15:54:03 -06:00
check: proto == TestProtoString
await pushable.push(TestString.toBytes())
2020-04-19 18:10:58 -06:00
await pushable.close()
2019-08-29 23:16:55 -06:00
2020-04-17 15:54:03 -06:00
await sink
protocol.handler = testHandler
2020-04-17 15:54:03 -06:00
var msListen = MultistreamSelect.init()
msListen.addHandler(TestProtoString, protocol)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn)
let transport1: TcpTransport = newTransport(TcpTransport)
2020-04-17 15:54:03 -06:00
let transportFut = await transport1.listen(ma, connHandler)
2020-04-17 15:54:03 -06:00
let msDial = MultistreamSelect.init()
let transport2: TcpTransport = newTransport(TcpTransport)
2019-10-04 14:10:01 -06:00
let conn = await transport2.dial(transport1.ma)
2020-04-19 18:10:58 -06:00
check:
await msDial.select(conn, TestProtoString)
2020-04-17 15:54:03 -06:00
var lp = LenPrefixed.init()
var source = pipe(conn, lp.decoder)
2020-04-17 15:54:03 -06:00
let hello = string.fromBytes(await source())
2020-04-19 18:10:58 -06:00
check: hello == TestString
await conn.close()
2020-04-17 15:54:03 -06:00
await transport1.close()
await transportFut
2020-04-19 18:10:58 -06:00
waitFor(test())
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
test "e2e - ls":
proc test() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
var msListen = MultistreamSelect.init()
var protocol: LPProtocol = new LPProtocol
protocol.handler = proc(conn: Connection,
proto: string) {.async, gcsafe.} =
check: false
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async.} = discard
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
protocol.handler = testHandler
msListen.addHandler("/test/proto1/1.0.0", protocol)
msListen.addHandler("/test/proto2/1.0.0", protocol)
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
let transport1 = newTransport(TcpTransport)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn)
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
var transportFut = await transport1.listen(ma, connHandler)
let msDial = MultistreamSelect.init()
let transport2 = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
check:
(await msDial.list(conn)) == @["/test/proto1/1.0.0", "/test/proto2/1.0.0"]
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
await conn.close()
await transport1.close()
await transportFut
waitFor(test())
test "e2e - select one from a list with unsupported protos":
proc test() {.async.} =
let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
let lp = LenPrefixed.init()
var protocol: LPProtocol = new LPProtocol
proc testHandler(conn: Connection,
proto: string):
Future[void] {.async, gcsafe.} =
check:
proto == "/test/proto/1.0.0"
var pushable = BytePushable.init()
var sink = pipe(pushable,
lp.encoder(),
conn)
await pushable.push(TestString.toBytes())
await pushable.close()
await sink
protocol.handler = testHandler
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
var msListen = MultistreamSelect.init()
msListen.addHandler("/test/proto/1.0.0", protocol)
proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
await msListen.handle(conn)
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
let transport1: TcpTransport = newTransport(TcpTransport)
var transportFut = await transport1.listen(ma, connHandler)
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
let msDial = MultistreamSelect.init()
let transport2: TcpTransport = newTransport(TcpTransport)
let conn = await transport2.dial(transport1.ma)
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
check:
(await msDial.select(conn,
@["/test/proto/1.0.0",
"/test/no/proto/1.0.0"])) == "/test/proto/1.0.0"
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
var source = pipe(conn, lp.decoder())
check:
(await source()) == TestString.toBytes()
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
await conn.close()
await transport1.close()
await transportFut
2020-04-17 15:54:03 -06:00
2020-04-19 18:10:58 -06:00
waitFor(test())
2020-04-17 15:54:03 -06:00
# test "e2e - select one with both valid":
# proc endToEnd(): Future[bool] {.async.} =
# let ma: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
# var protocol: LPProtocol = new LPProtocol
# proc testHandler(conn: Connection,
# proto: string):
# Future[void] {.async, gcsafe.} =
# await conn.writeLp(&"Hello from {proto}!")
# await conn.close()
# protocol.handler = testHandler
# let msListen = newMultistream()
# msListen.addHandler("/test/proto1/1.0.0", protocol)
# msListen.addHandler("/test/proto2/1.0.0", protocol)
# proc connHandler(conn: Connection): Future[void] {.async, gcsafe.} =
# await msListen.handle(conn)
# let transport1: TcpTransport = newTransport(TcpTransport)
# asyncCheck transport1.listen(ma, connHandler)
# let msDial = newMultistream()
# let transport2: TcpTransport = newTransport(TcpTransport)
# let conn = await transport2.dial(transport1.ma)
# check (await msDial.select(conn, @["/test/proto2/1.0.0", "/test/proto1/1.0.0"])) == "/test/proto2/1.0.0"
# result = cast[string](await conn.readLp()) == "Hello from /test/proto2/1.0.0!"
# await conn.close()
# check:
# waitFor(endToEnd()) == true