nim-libp2p/tests/testyamux.nim

413 lines
13 KiB
Nim

{.used.}
# Nim-Libp2p
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import sugar
import chronos
import ../libp2p/[stream/connection, muxers/yamux/yamux], ./helpers
proc newBlockerFut(): Future[void] {.async: (raises: [], raw: true).} =
newFuture[void]()
suite "Yamux":
teardown:
checkTrackers()
template mSetup(
ws: int = YamuxDefaultWindowSize,
inTo: Duration = 5.minutes,
outTo: Duration = 5.minutes,
) {.inject.} =
#TODO in a template to avoid threadvar
let
(conna {.inject.}, connb {.inject.}) = bridgedConnections()
yamuxa {.inject.} =
Yamux.new(conna, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
yamuxb {.inject.} =
Yamux.new(connb, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
(handlera, handlerb) = (yamuxa.handle(), yamuxb.handle())
defer:
await allFutures(
conna.close(), connb.close(), yamuxa.close(), yamuxb.close(), handlera, handlerb
)
suite "Simple Reading/Writing yamux messages":
asyncTest "Roundtrip of small messages":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
check (await streamA.readLp(100)) == fromHex("5678")
await streamA.close()
asyncTest "Continuing read after close":
mSetup()
let
readerBlocker = newBlockerFut()
handlerBlocker = newBlockerFut()
var numberOfRead = 0
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
try:
var buffer: array[25600, byte]
while (await conn.readOnce(addr buffer[0], 25600)) > 0:
numberOfRead.inc()
except CancelledError, LPStreamError:
return
finally:
await conn.close()
handlerBlocker.complete()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
await streamA.close()
readerBlocker.complete()
await handlerBlocker
check:
numberOfRead == 10
suite "Window exhaustion":
asyncTest "Basic exhaustion blocking":
mSetup()
let readerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
try:
var buffer: array[160000, byte]
discard await conn.readOnce(addr buffer[0], 160000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
let secondWriter = streamA.write(newSeq[byte](20))
await sleepAsync(10.milliseconds)
check:
not secondWriter.finished()
readerBlocker.complete()
await wait(secondWriter, 1.seconds)
await streamA.close()
asyncTest "Exhaustion doesn't block other channels":
mSetup()
let readerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
try:
var buffer: array[160000, byte]
discard await conn.readOnce(addr buffer[0], 160000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
let secondWriter = streamA.write(newSeq[byte](20))
await sleepAsync(10.milliseconds)
# Now that the secondWriter is stuck, create a second stream
# and exchange some data
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamB = await yamuxa.newStream()
await streamB.writeLp(fromHex("1234"))
check (await streamB.readLp(100)) == fromHex("5678")
check:
not secondWriter.finished()
readerBlocker.complete()
await wait(secondWriter, 1.seconds)
await streamA.close()
await streamB.close()
asyncTest "Can set custom window size":
mSetup()
let writerBlocker = newBlockerFut()
var numberOfRead = 0
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
YamuxChannel(conn).setMaxRecvWindow(20)
try:
var buffer: array[256000, byte]
while (await conn.readOnce(addr buffer[0], 256000)) > 0:
numberOfRead.inc()
writerBlocker.complete()
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
# Need to exhaust initial window first
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
await streamA.write(newSeq[byte](142))
await streamA.close()
await writerBlocker
# 1 for initial exhaustion + (142 / 20) = 9
check numberOfRead == 9
asyncTest "Saturate until reset":
mSetup()
let writerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await writerBlocker
try:
var buffer: array[256, byte]
check:
(await conn.readOnce(addr buffer[0], 256)) == 0
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.write(newSeq[byte](256000))
let wrFut = collect(newSeq):
for _ in 0 .. 3:
streamA.write(newSeq[byte](100000))
for i in 0 .. 3:
expect(LPStreamEOFError):
await wrFut[i]
writerBlocker.complete()
await streamA.close()
asyncTest "Increase window size":
mSetup(512000)
let readerBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await readerBlocker
try:
var buffer: array[260000, byte]
discard await conn.readOnce(addr buffer[0], 260000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await wait(streamA.write(newSeq[byte](512000)), 1.seconds) # shouldn't block
let secondWriter = streamA.write(newSeq[byte](10000))
await sleepAsync(10.milliseconds)
check:
not secondWriter.finished()
readerBlocker.complete()
await wait(secondWriter, 1.seconds)
await streamA.close()
asyncTest "Reduce window size":
mSetup(64000)
let readerBlocker1 = newBlockerFut()
let readerBlocker2 = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
await readerBlocker1
var buffer: array[256000, byte]
# For the first roundtrip, the send window size is assumed to be 256k
discard await conn.readOnce(addr buffer[0], 256000)
await readerBlocker2
discard await conn.readOnce(addr buffer[0], 40000)
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
let secondWriter = streamA.write(newSeq[byte](64000))
await sleepAsync(10.milliseconds)
check:
not secondWriter.finished()
readerBlocker1.complete()
await wait(secondWriter, 1.seconds)
let thirdWriter = streamA.write(newSeq[byte](10))
await sleepAsync(10.milliseconds)
check:
not thirdWriter.finished()
readerBlocker2.complete()
await wait(thirdWriter, 1.seconds)
await streamA.close()
suite "Timeout testing":
asyncTest "Check if InTimeout close both streams correctly":
mSetup(inTo = 1.seconds)
let blocker = newBlockerFut()
let connBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()
except CancelledError, LPStreamError:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
check (await streamA.readLp(100)) == fromHex("5678")
# wait for the timeout to happens, the sleep duration is set to 4 seconds
# as the timeout could be a bit long to trigger
await sleepAsync(4.seconds)
blocker.complete()
check streamA.isClosed
await connBlocker
asyncTest "Check if OutTimeout close both streams correctly":
mSetup(outTo = 1.seconds)
let blocker = newBlockerFut()
let connBlocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
await conn.writeLp(fromHex("5678"))
await blocker
check conn.isClosed
connBlocker.complete()
except CancelledError, LPStreamError:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
check (await streamA.readLp(100)) == fromHex("5678")
# wait for the timeout to happens, the sleep duration is set to 4 seconds
# as the timeout could be a bit long to trigger
await sleepAsync(4.seconds)
blocker.complete()
check streamA.isClosed
await connBlocker
suite "Exception testing":
asyncTest "Local & Remote close":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
except CancelledError, LPStreamError:
return
finally:
await conn.close()
expect LPStreamClosedError:
await conn.writeLp(fromHex("102030"))
try:
check (await conn.readLp(100)) == fromHex("5678")
except CancelledError, LPStreamError:
return
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
expect LPStreamRemoteClosedError:
discard await streamA.readLp(100)
await streamA.writeLp(fromHex("5678"))
await streamA.close()
asyncTest "Local & Remote reset":
mSetup()
let blocker = newBlockerFut()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
await blocker
try:
expect LPStreamResetError:
discard await conn.readLp(100)
expect LPStreamResetError:
await conn.writeLp(fromHex("1234"))
except CancelledError, LPStreamError:
return
finally:
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await yamuxa.close()
expect LPStreamClosedError:
await streamA.writeLp(fromHex("1234"))
expect LPStreamClosedError:
discard await streamA.readLp(100)
blocker.complete()
await streamA.close()
asyncTest "Peer must be able to read from stream after closing it for writing":
mSetup()
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
try:
check (await conn.readLp(100)) == fromHex("1234")
except CancelledError, LPStreamError:
return
try:
await conn.writeLp(fromHex("5678"))
except CancelledError, LPStreamError:
return
await conn.close()
let streamA = await yamuxa.newStream()
check streamA == yamuxa.getStreams()[0]
await streamA.writeLp(fromHex("1234"))
await streamA.close()
check (await streamA.readLp(100)) == fromHex("5678")