nim-libp2p/tests/testrelayv2.nim

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

512 lines
18 KiB
Nim
Raw Permalink Normal View History

2022-08-01 12:31:22 +00:00
{.used.}
# Nim-Libp2p
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
2022-08-01 12:31:22 +00:00
import bearssl, chronos, options
import ../libp2p
import
../libp2p/[
protocols/connectivity/relay/relay,
protocols/connectivity/relay/messages,
protocols/connectivity/relay/utils,
protocols/connectivity/relay/client,
]
2022-08-01 12:31:22 +00:00
import ./helpers
import std/times
import stew/byteutils
proc createSwitch(r: Relay = nil, useYamux: bool = false): Switch =
var builder = SwitchBuilder
.new()
2022-08-01 12:31:22 +00:00
.withRng(newRng())
.withAddresses(@[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()])
.withTcpTransport()
if useYamux:
builder = builder.withYamux()
else:
builder = builder.withMplex()
if r != nil:
builder = builder.withCircuitRelay(r)
return builder.withNoise().build()
2022-08-01 12:31:22 +00:00
suite "Circuit Relay V2":
suite "Reservation":
asyncTeardown:
await allFutures(src1.stop(), src2.stop(), rel.stop())
checkTrackers()
var
ttl {.threadvar.}: int
ldur {.threadvar.}: uint32
ldata {.threadvar.}: uint64
cl1 {.threadvar.}: RelayClient
cl2 {.threadvar.}: RelayClient
rv2 {.threadvar.}: Relay
src1 {.threadvar.}: Switch
src2 {.threadvar.}: Switch
rel {.threadvar.}: Switch
rsvp {.threadvar.}: Rsvp
range {.threadvar.}: HSlice[times.DateTime, times.DateTime]
asyncSetup:
2023-02-15 10:18:42 +00:00
ttl = 3
2022-08-01 12:31:22 +00:00
ldur = 60
ldata = 2048
cl1 = RelayClient.new()
cl2 = RelayClient.new()
rv2 = Relay.new(
reservationTTL = initDuration(seconds = ttl),
limitDuration = ldur,
limitData = ldata,
maxCircuit = 1,
)
src1 = createSwitch(cl1)
src2 = createSwitch(cl2)
rel = createSwitch(rv2)
await src1.start()
await src2.start()
await rel.start()
await src1.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await src2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl - 3).seconds .. now().utc + (ttl + 3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata
asyncTest "Too many reservations":
let conn =
await cl2.switch.dial(rel.peerInfo.peerId, rel.peerInfo.addrs, RelayV2HopCodec)
let pb = encode(HopMessage(msgType: HopMessageType.Reserve))
await conn.writeLp(pb.buffer)
let msg = HopMessage.decode(await conn.readLp(RelayMsgSize)).get()
check:
msg.msgType == HopMessageType.Status
msg.status == Opt.some(StatusV2.ReservationRefused)
2022-08-01 12:31:22 +00:00
asyncTest "Too many reservations + Reconnect":
expect(ReservationError):
discard await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
await rel.disconnect(src1.peerInfo.peerId)
rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl - 3).seconds .. now().utc + (ttl + 3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata
asyncTest "Reservation ttl expires":
await sleepAsync(chronos.timer.seconds(ttl + 1))
rsvp = await cl1.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl - 3).seconds .. now().utc + (ttl + 3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata
asyncTest "Reservation over relay":
let
rv2add = Relay.new()
addrs =
@[
MultiAddress
.init(
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
)
.get()
]
2022-08-01 12:31:22 +00:00
rv2add.setup(src2)
await rv2add.start()
src2.mount(rv2add)
rv2.maxCircuit.inc()
rsvp = await cl2.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
range = now().utc + (ttl - 3).seconds .. now().utc + (ttl + 3).seconds
check:
rsvp.expire.int64.fromUnix.utc in range
rsvp.limitDuration == ldur
rsvp.limitData == ldata
expect(ReservationError):
discard await cl1.reserve(src2.peerInfo.peerId, addrs)
for (useYamux, muxName) in [(false, "Mplex"), (true, "Yamux")]:
suite "Circuit Relay V2 Connection using " & muxName:
asyncTeardown:
checkTrackers()
var
customProtoCodec {.threadvar.}: string
proto {.threadvar.}: LPProtocol
ttl {.threadvar.}: int
ldur {.threadvar.}: uint32
ldata {.threadvar.}: uint64
srcCl {.threadvar.}: RelayClient
dstCl {.threadvar.}: RelayClient
rv2 {.threadvar.}: Relay
src {.threadvar.}: Switch
dst {.threadvar.}: Switch
rel {.threadvar.}: Switch
rsvp {.threadvar.}: Rsvp
conn {.threadvar.}: Connection
asyncSetup:
customProtoCodec = "/test"
proto = new LPProtocol
proto.codec = customProtoCodec
ttl = 60
ldur = 120
ldata = 16384
srcCl = RelayClient.new()
dstCl = RelayClient.new()
src = createSwitch(srcCl, useYamux)
dst = createSwitch(dstCl, useYamux)
rel = createSwitch(nil, useYamux)
asyncTest "Connection succeed":
proto.handler = proc(conn: Connection, proto: string) {.async.} =
check:
"test1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test2")
check:
"test3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test4")
await conn.close()
rv2 = Relay.new(
reservationTTL = initDuration(seconds = ttl),
limitDuration = ldur,
limitData = ldata,
)
rv2.setup(rel)
rel.mount(rv2)
dst.mount(proto)
await rel.start()
await src.start()
await dst.start()
let addrs = MultiAddress
.init(
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
)
.get()
2022-08-01 12:31:22 +00:00
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
2022-08-01 12:31:22 +00:00
rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
2022-10-20 10:22:28 +00:00
conn = await src.dial(dst.peerInfo.peerId, @[addrs], customProtoCodec)
await conn.writeLp("test1")
check:
"test2" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test3")
check:
"test4" == string.fromBytes(await conn.readLp(1024))
await allFutures(conn.close())
await allFutures(src.stop(), dst.stop(), rel.stop())
asyncTest "Connection duration exceeded":
ldur = 3
proto.handler = proc(conn: Connection, proto: string) {.async.} =
check "wanna sleep?" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("yeah!")
check "go!" == string.fromBytes(await conn.readLp(1024))
await sleepAsync(chronos.timer.seconds(ldur + 1))
await conn.writeLp("that was a cool power nap")
await conn.close()
rv2 = Relay.new(
reservationTTL = initDuration(seconds = ttl),
limitDuration = ldur,
limitData = ldata,
)
rv2.setup(rel)
rel.mount(rv2)
dst.mount(proto)
await rel.start()
await src.start()
await dst.start()
let addrs = MultiAddress
.init(
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
)
.get()
2022-08-01 12:31:22 +00:00
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
2022-08-01 12:31:22 +00:00
rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
conn = await src.dial(dst.peerInfo.peerId, @[addrs], customProtoCodec)
await conn.writeLp("wanna sleep?")
check:
"yeah!" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("go!")
expect(LPStreamEOFError):
discard await conn.readLp(1024)
await allFutures(conn.close())
await allFutures(src.stop(), dst.stop(), rel.stop())
asyncTest "Connection data exceeded":
ldata = 1000
proto.handler = proc(conn: Connection, proto: string) {.async.} =
check "count me the better story you know" ==
string.fromBytes(await conn.readLp(1024))
await conn.writeLp("do you expect a lorem ipsum or...?")
check "surprise me!" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp(
"""Call me Ishmael. Some years ago--never mind how long
precisely--having little or no money in my purse, and nothing
particular to interest me on shore, I thought I would sail about a
little and see the watery part of the world. It is a way I have of
driving off the spleen and regulating the circulation. Whenever I
find myself growing grim about the mouth; whenever it is a damp,
drizzly November in my soul; whenever I find myself involuntarily
pausing before coffin warehouses, and bringing up the rear of every
funeral I meet; and especially whenever my hypos get such an upper
hand of me, that it requires a strong moral principle to prevent me
from deliberately stepping into the street, and methodically knocking
people's hats off--then, I account it high time to get to sea as soon
as I can. This is my substitute for pistol and ball. With a
philosophical flourish Cato throws himself upon his sword; I quietly
take to the ship."""
)
rv2 = Relay.new(
reservationTTL = initDuration(seconds = ttl),
limitDuration = ldur,
limitData = ldata,
)
rv2.setup(rel)
rel.mount(rv2)
dst.mount(proto)
await rel.start()
await src.start()
await dst.start()
let addrs = MultiAddress
.init(
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
)
.get()
2022-08-01 12:31:22 +00:00
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
2022-08-01 12:31:22 +00:00
rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
conn = await src.dial(dst.peerInfo.peerId, @[addrs], customProtoCodec)
await conn.writeLp("count me the better story you know")
check:
"do you expect a lorem ipsum or...?" ==
string.fromBytes(await conn.readLp(1024))
await conn.writeLp("surprise me!")
expect(LPStreamEOFError):
discard await conn.readLp(1024)
await allFutures(conn.close())
await allFutures(src.stop(), dst.stop(), rel.stop())
asyncTest "Reservation ttl expire during connection":
ttl = 3
proto.handler = proc(conn: Connection, proto: string) {.async.} =
check:
"test1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test2")
check:
"test3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test4")
await conn.close()
rv2 = Relay.new(
reservationTTL = initDuration(seconds = ttl),
limitDuration = ldur,
limitData = ldata,
)
rv2.setup(rel)
rel.mount(rv2)
dst.mount(proto)
await rel.start()
await src.start()
await dst.start()
let addrs = MultiAddress
.init(
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId & "/p2p-circuit"
)
.get()
2022-10-20 10:22:28 +00:00
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await dst.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
2022-10-20 10:22:28 +00:00
rsvp = await dstCl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
conn = await src.dial(dst.peerInfo.peerId, @[addrs], customProtoCodec)
await conn.writeLp("test1")
check:
"test2" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("test3")
check:
"test4" == string.fromBytes(await conn.readLp(1024))
await src.disconnect(rel.peerInfo.peerId)
await sleepAsync(chronos.timer.seconds(ttl + 1))
expect(DialFailedError):
await conn.close()
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
conn = await src.dial(dst.peerInfo.peerId, @[addrs], customProtoCodec)
await allFutures(conn.close())
await allFutures(src.stop(), dst.stop(), rel.stop())
asyncTest "Connection over relay":
# src => rel => rel2 => dst
# rel2 reserve rel
# dst reserve rel2
# src try to connect with dst
proto.handler = proc(conn: Connection, proto: string) {.async.} =
raise newException(CatchableError, "Should not be here")
let
rel2Cl = RelayClient.new(canHop = true)
rel2 = createSwitch(rel2Cl, useYamux)
rv2 = Relay.new()
rv2.setup(rel)
rel.mount(rv2)
dst.mount(proto)
await rel.start()
await rel2.start()
await src.start()
await dst.start()
let addrs =
@[
MultiAddress
.init(
$rel.peerInfo.addrs[0] & "/p2p/" & $rel.peerInfo.peerId &
"/p2p-circuit/p2p/" & $rel2.peerInfo.peerId & "/p2p/" &
$rel2.peerInfo.peerId & "/p2p-circuit"
)
.get()
]
2022-10-20 10:22:28 +00:00
await src.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await rel2.connect(rel.peerInfo.peerId, rel.peerInfo.addrs)
await dst.connect(rel2.peerInfo.peerId, rel2.peerInfo.addrs)
rsvp = await rel2Cl.reserve(rel.peerInfo.peerId, rel.peerInfo.addrs)
let rsvp2 = await dstCl.reserve(rel2.peerInfo.peerId, rel2.peerInfo.addrs)
expect(DialFailedError):
conn = await src.dial(dst.peerInfo.peerId, addrs, customProtoCodec)
if not conn.isNil():
await allFutures(conn.close())
await allFutures(src.stop(), dst.stop(), rel.stop(), rel2.stop())
asyncTest "Connection using ClientRelay":
var
protoABC = new LPProtocol
protoBCA = new LPProtocol
protoCAB = new LPProtocol
protoABC.codec = "/abctest"
protoABC.handler = proc(conn: Connection, proto: string) {.async.} =
check:
"testABC1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("testABC2")
check:
"testABC3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("testABC4")
await conn.close()
protoBCA.codec = "/bcatest"
protoBCA.handler = proc(conn: Connection, proto: string) {.async.} =
check:
"testBCA1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("testBCA2")
check:
"testBCA3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("testBCA4")
await conn.close()
protoCAB.codec = "/cabtest"
protoCAB.handler = proc(conn: Connection, proto: string) {.async.} =
check:
"testCAB1" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("testCAB2")
check:
"testCAB3" == string.fromBytes(await conn.readLp(1024))
await conn.writeLp("testCAB4")
await conn.close()
let
clientA = RelayClient.new(canHop = true)
clientB = RelayClient.new(canHop = true)
clientC = RelayClient.new(canHop = true)
switchA = createSwitch(clientA, useYamux)
switchB = createSwitch(clientB, useYamux)
switchC = createSwitch(clientC, useYamux)
switchA.mount(protoBCA)
switchB.mount(protoCAB)
switchC.mount(protoABC)
await switchA.start()
await switchB.start()
await switchC.start()
let
addrsABC = MultiAddress
.init(
$switchB.peerInfo.addrs[0] & "/p2p/" & $switchB.peerInfo.peerId &
"/p2p-circuit"
)
.get()
addrsBCA = MultiAddress
.init(
$switchC.peerInfo.addrs[0] & "/p2p/" & $switchC.peerInfo.peerId &
"/p2p-circuit"
)
.get()
addrsCAB = MultiAddress
.init(
$switchA.peerInfo.addrs[0] & "/p2p/" & $switchA.peerInfo.peerId &
"/p2p-circuit"
)
.get()
await switchA.connect(switchB.peerInfo.peerId, switchB.peerInfo.addrs)
await switchB.connect(switchC.peerInfo.peerId, switchC.peerInfo.addrs)
await switchC.connect(switchA.peerInfo.peerId, switchA.peerInfo.addrs)
let rsvpABC =
await clientA.reserve(switchC.peerInfo.peerId, switchC.peerInfo.addrs)
let rsvpBCA =
await clientB.reserve(switchA.peerInfo.peerId, switchA.peerInfo.addrs)
let rsvpCAB =
await clientC.reserve(switchB.peerInfo.peerId, switchB.peerInfo.addrs)
let connABC =
await switchA.dial(switchC.peerInfo.peerId, @[addrsABC], "/abctest")
let connBCA =
await switchB.dial(switchA.peerInfo.peerId, @[addrsBCA], "/bcatest")
let connCAB =
await switchC.dial(switchB.peerInfo.peerId, @[addrsCAB], "/cabtest")
await connABC.writeLp("testABC1")
await connBCA.writeLp("testBCA1")
await connCAB.writeLp("testCAB1")
check:
"testABC2" == string.fromBytes(await connABC.readLp(1024))
"testBCA2" == string.fromBytes(await connBCA.readLp(1024))
"testCAB2" == string.fromBytes(await connCAB.readLp(1024))
await connABC.writeLp("testABC3")
await connBCA.writeLp("testBCA3")
await connCAB.writeLp("testCAB3")
check:
"testABC4" == string.fromBytes(await connABC.readLp(1024))
"testBCA4" == string.fromBytes(await connBCA.readLp(1024))
"testCAB4" == string.fromBytes(await connCAB.readLp(1024))
await allFutures(connABC.close(), connBCA.close(), connCAB.close())
await allFutures(switchA.stop(), switchB.stop(), switchC.stop())