nim-libp2p/tests/testswitch.nim

258 lines
7.5 KiB
Nim
Raw Normal View History

{.used.}
2020-01-07 08:06:27 +00:00
import unittest, tables
import chronos
2020-05-23 19:26:25 +00:00
import stew/byteutils
import nimcrypto/sysrand
import ../libp2p/[errors,
switch,
2019-09-30 15:48:40 +00:00
multistream,
2020-05-23 19:26:25 +00:00
standard_setup,
stream/bufferstream,
protocols/identify,
2019-09-30 15:48:40 +00:00
connection,
transports/transport,
transports/tcptransport,
multiaddress,
2019-09-30 15:48:40 +00:00
peerinfo,
crypto/crypto,
protocols/protocol,
2019-09-30 15:48:40 +00:00
muxers/muxer,
muxers/mplex/mplex,
2019-09-30 15:48:40 +00:00
muxers/mplex/types,
protocols/secure/secio,
protocols/secure/secure,
stream/lpstream]
import ./helpers
# Protocol identifier mounted on the listening switch and requested by the
# dialing switch in the tests of this file.
const TestCodec = "/test/proto/1.0.0"
2019-08-31 17:58:49 +00:00
type
  # Minimal protocol used by the tests; each test assigns its own `codec`
  # and `handler` before mounting it on a switch.
  TestProto = ref object of LPProtocol
2019-12-23 18:44:51 +00:00
suite "Switch":
  # End-to-end tests: two standard switches are started, one dials the
  # other over `TestCodec`, and the resource trackers are checked for leaks.

  teardown:
    # Runs after every test: no tracker may report a leaked resource.
    for tracker in testTrackers():
      # echo tracker.dump()
      check tracker.isLeaked() == false

  test "e2e use switch dial proto string":
    proc testSwitch() {.async, gcsafe.} =
      # Completed by the handler after it has closed its connection.
      let done = newFuture[void]()

      proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
        # Expects "Hello!" from the dialer and answers with "Hello!".
        try:
          let msg = string.fromBytes(await conn.readLp(1024))
          check "Hello!" == msg
          await conn.writeLp("Hello!")
        finally:
          await conn.close()
          done.complete()

      let testProto = new TestProto
      testProto.codec = TestCodec
      testProto.handler = handle

      let switch1 = newStandardSwitch()
      switch1.mount(testProto)

      let switch2 = newStandardSwitch()

      var awaiters: seq[Future[void]]
      awaiters.add(await switch1.start())
      awaiters.add(await switch2.start())

      let conn = await switch2.dial(switch1.peerInfo, TestCodec)
      await conn.writeLp("Hello!")
      let msg = string.fromBytes(await conn.readLp(1024))
      check "Hello!" == msg
      await conn.close()

      await all(
        # Bounded wait on the handler; when everything went fine `done`
        # is already complete and the timeout never triggers.
        done.wait(5.seconds),
        switch1.stop(),
        switch2.stop(),
      )

      # this needs to go at end
      await all(awaiters)

    waitFor(testSwitch())

  test "e2e should not leak bufferstreams and connections on channel close":
    proc testSwitch() {.async, gcsafe.} =
      # Completed by the handler after it has closed its connection.
      let done = newFuture[void]()

      proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
        # Expects "Hello!" from the dialer and answers with "Hello!".
        try:
          let msg = string.fromBytes(await conn.readLp(1024))
          check "Hello!" == msg
          await conn.writeLp("Hello!")
        finally:
          await conn.close()
          done.complete()

      let testProto = new TestProto
      testProto.codec = TestCodec
      testProto.handler = handle

      let switch1 = newStandardSwitch()
      switch1.mount(testProto)

      let switch2 = newStandardSwitch()

      var awaiters: seq[Future[void]]
      awaiters.add(await switch1.start())
      awaiters.add(await switch2.start())

      let conn = await switch2.dial(switch1.peerInfo, TestCodec)
      await conn.writeLp("Hello!")
      let msg = string.fromBytes(await conn.readLp(1024))
      check "Hello!" == msg
      await conn.close()

      await sleepAsync(2.seconds) # wait a little for cleanup to happen

      let bufferTracker = getTracker(BufferStreamTrackerName)
      # echo bufferTracker.dump()

      # plus 4 for the pubsub streams
      check (BufferStreamTracker(bufferTracker).opened ==
        (BufferStreamTracker(bufferTracker).closed + 4.uint64))

      let connTracker = getTracker(ConnectionTrackerName)
      # echo connTracker.dump()

      # plus 8 is for the secured connection and the socket
      # and the pubsub streams that won't clean up until
      # `disconnect()` or `stop()`
      check (ConnectionTracker(connTracker).opened ==
        (ConnectionTracker(connTracker).closed + 8.uint64))

      await all(
        # Bounded wait on the handler; when everything went fine `done`
        # is already complete and the timeout never triggers.
        done.wait(5.seconds),
        switch1.stop(),
        switch2.stop(),
      )

      # this needs to go at end
      await all(awaiters)

    waitFor(testSwitch())

  test "e2e use connect then dial":
    proc testSwitch(): Future[bool] {.async, gcsafe.} =
      var awaiters: seq[Future[void]]

      proc handle(conn: Connection, proto: string) {.async, gcsafe.} =
        # Expects "Hello!" from the dialer and answers with "Hello!".
        # The reply is sent from the `try` body (not from `finally`) so
        # that a failing write can never prevent the connection from
        # being closed.
        try:
          let msg = string.fromBytes(await conn.readLp(1024))
          check "Hello!" == msg
          await conn.writeLp("Hello!")
        finally:
          await conn.close()

      let testProto = new TestProto
      testProto.codec = TestCodec
      testProto.handler = handle

      let switch1 = newStandardSwitch()
      switch1.mount(testProto)

      let switch2 = newStandardSwitch()
      awaiters.add(await switch1.start())
      awaiters.add(await switch2.start())

      # Establish the connection first, then open the protocol stream.
      await switch2.connect(switch1.peerInfo)
      check switch1.peerInfo.id in switch2.connections

      let conn = await switch2.dial(switch1.peerInfo, TestCodec)

      try:
        await conn.writeLp("Hello!")
        let msg = string.fromBytes(await conn.readLp(1024))
        check "Hello!" == msg
        result = true
      except LPStreamError:
        # A stream failure is reported through the boolean result.
        result = false

      await all(
        conn.close(),
        switch1.stop(),
        switch2.stop()
      )
      await all(awaiters)

    check:
      waitFor(testSwitch()) == true

  test "e2e should not leak on peer disconnect":
    proc testSwitch() {.async, gcsafe.} =
      var awaiters: seq[Future[void]]

      let switch1 = newStandardSwitch()
      let switch2 = newStandardSwitch()
      awaiters.add(await switch1.start())
      awaiters.add(await switch2.start())

      await switch2.connect(switch1.peerInfo)
      await sleepAsync(100.millis)
      await switch2.disconnect(switch1.peerInfo)

      # Give both sides time to tear everything down before inspecting
      # the trackers.
      await sleepAsync(2.seconds)

      let bufferTracker = getTracker(BufferStreamTrackerName)
      # echo bufferTracker.dump()
      check bufferTracker.isLeaked() == false

      let connTracker = getTracker(ConnectionTrackerName)
      # echo connTracker.dump()
      check connTracker.isLeaked() == false

      await all(
        switch1.stop(),
        switch2.stop()
      )
      await all(awaiters)

    waitFor(testSwitch())
# test "e2e: handle read + secio fragmented":
# proc testListenerDialer(): Future[bool] {.async.} =
# let
# server: MultiAddress = Multiaddress.init("/ip4/0.0.0.0/tcp/0")
# serverInfo = PeerInfo.init(PrivateKey.random(ECDSA), [server])
# serverNoise = newSecio(serverInfo.privateKey)
# readTask = newFuture[void]()
# var hugePayload = newSeq[byte](0x1200000)
# check randomBytes(hugePayload) == hugePayload.len
# trace "Sending huge payload", size = hugePayload.len
# proc connHandler(conn: Connection) {.async, gcsafe.} =
# let sconn = await serverNoise.secure(conn)
# defer:
# await sconn.close()
# let msg = await sconn.read(0x1200000)
# check msg == hugePayload
# readTask.complete()
# let
# transport1: TcpTransport = TcpTransport.init()
# asyncCheck await transport1.listen(server, connHandler)
# let
# transport2: TcpTransport = TcpTransport.init()
# clientInfo = PeerInfo.init(PrivateKey.random(ECDSA), [transport1.ma])
# clientNoise = newSecio(clientInfo.privateKey)
# conn = await transport2.dial(transport1.ma)
# sconn = await clientNoise.secure(conn)
# await sconn.write(hugePayload)
# await readTask
# await sconn.close()
# await conn.close()
# await transport2.close()
# await transport1.close()
# result = true
# check:
# waitFor(testListenerDialer()) == true