380 lines
12 KiB
Nim
380 lines
12 KiB
Nim
{.used.}
|
|
|
|
# Nim-Libp2p
|
|
# Copyright (c) 2023-2024 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
|
|
# at your option.
|
|
# This file may not be copied, modified, or distributed except according to
|
|
# those terms.
|
|
|
|
import sugar
|
|
import chronos
|
|
import
|
|
../libp2p/[
|
|
stream/connection,
|
|
muxers/yamux/yamux
|
|
],
|
|
./helpers
|
|
|
|
proc newBlockerFut(): Future[void] {.async: (raises: [], raw: true).} =
|
|
newFuture[void]()
|
|
|
|
suite "Yamux":
|
|
teardown:
|
|
checkTrackers()
|
|
|
|
template mSetup(ws: int = YamuxDefaultWindowSize,
|
|
inTo: Duration = 5.minutes,
|
|
outTo: Duration = 5.minutes) {.inject.} =
|
|
#TODO in a template to avoid threadvar
|
|
let
|
|
(conna {.inject.}, connb {.inject.}) = bridgedConnections()
|
|
yamuxa {.inject.} = Yamux.new(conna, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
|
|
yamuxb {.inject.} = Yamux.new(connb, windowSize = ws, inTimeout = inTo, outTimeout = outTo)
|
|
(handlera, handlerb) = (yamuxa.handle(), yamuxb.handle())
|
|
|
|
defer:
|
|
await allFutures(
|
|
conna.close(), connb.close(),
|
|
yamuxa.close(), yamuxb.close(),
|
|
handlera, handlerb)
|
|
|
|
suite "Simple Reading/Writing yamux messages":
|
|
asyncTest "Roundtrip of small messages":
|
|
mSetup()
|
|
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
try:
|
|
check (await conn.readLp(100)) == fromHex("1234")
|
|
await conn.writeLp(fromHex("5678"))
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await streamA.writeLp(fromHex("1234"))
|
|
check (await streamA.readLp(100)) == fromHex("5678")
|
|
await streamA.close()
|
|
|
|
asyncTest "Continuing read after close":
|
|
mSetup()
|
|
let
|
|
readerBlocker = newBlockerFut()
|
|
handlerBlocker = newBlockerFut()
|
|
var numberOfRead = 0
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
await readerBlocker
|
|
try:
|
|
var buffer: array[25600, byte]
|
|
while (await conn.readOnce(addr buffer[0], 25600)) > 0:
|
|
numberOfRead.inc()
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
handlerBlocker.complete()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
|
|
await streamA.close()
|
|
readerBlocker.complete()
|
|
await handlerBlocker
|
|
check: numberOfRead == 10
|
|
|
|
suite "Window exhaustion":
|
|
asyncTest "Basic exhaustion blocking":
|
|
mSetup()
|
|
let readerBlocker = newBlockerFut()
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
await readerBlocker
|
|
try:
|
|
var buffer: array[160000, byte]
|
|
discard await conn.readOnce(addr buffer[0], 160000)
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
|
|
|
|
let secondWriter = streamA.write(newSeq[byte](20))
|
|
await sleepAsync(10.milliseconds)
|
|
check: not secondWriter.finished()
|
|
|
|
readerBlocker.complete()
|
|
await wait(secondWriter, 1.seconds)
|
|
|
|
await streamA.close()
|
|
|
|
asyncTest "Exhaustion doesn't block other channels":
|
|
mSetup()
|
|
let readerBlocker = newBlockerFut()
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
await readerBlocker
|
|
try:
|
|
var buffer: array[160000, byte]
|
|
discard await conn.readOnce(addr buffer[0], 160000)
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
|
|
|
|
let secondWriter = streamA.write(newSeq[byte](20))
|
|
await sleepAsync(10.milliseconds)
|
|
|
|
# Now that the secondWriter is stuck, create a second stream
|
|
# and exchange some data
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
try:
|
|
check (await conn.readLp(100)) == fromHex("1234")
|
|
await conn.writeLp(fromHex("5678"))
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamB = await yamuxa.newStream()
|
|
await streamB.writeLp(fromHex("1234"))
|
|
check (await streamB.readLp(100)) == fromHex("5678")
|
|
check: not secondWriter.finished()
|
|
readerBlocker.complete()
|
|
|
|
await wait(secondWriter, 1.seconds)
|
|
await streamA.close()
|
|
await streamB.close()
|
|
|
|
asyncTest "Can set custom window size":
|
|
mSetup()
|
|
|
|
let writerBlocker = newBlockerFut()
|
|
var numberOfRead = 0
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
YamuxChannel(conn).setMaxRecvWindow(20)
|
|
try:
|
|
var buffer: array[256000, byte]
|
|
while (await conn.readOnce(addr buffer[0], 256000)) > 0:
|
|
numberOfRead.inc()
|
|
writerBlocker.complete()
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
# Need to exhaust initial window first
|
|
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
|
|
await streamA.write(newSeq[byte](142))
|
|
await streamA.close()
|
|
|
|
await writerBlocker
|
|
|
|
# 1 for initial exhaustion + (142 / 20) = 9
|
|
check numberOfRead == 9
|
|
|
|
asyncTest "Saturate until reset":
|
|
mSetup()
|
|
let writerBlocker = newBlockerFut()
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
await writerBlocker
|
|
try:
|
|
var buffer: array[256, byte]
|
|
check: (await conn.readOnce(addr buffer[0], 256)) == 0
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await streamA.write(newSeq[byte](256000))
|
|
let wrFut = collect(newSeq):
|
|
for _ in 0..3:
|
|
streamA.write(newSeq[byte](100000))
|
|
for i in 0..3:
|
|
expect(LPStreamEOFError): await wrFut[i]
|
|
writerBlocker.complete()
|
|
await streamA.close()
|
|
|
|
asyncTest "Increase window size":
|
|
mSetup(512000)
|
|
let readerBlocker = newBlockerFut()
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
await readerBlocker
|
|
try:
|
|
var buffer: array[260000, byte]
|
|
discard await conn.readOnce(addr buffer[0], 260000)
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await wait(streamA.write(newSeq[byte](512000)), 1.seconds) # shouldn't block
|
|
|
|
let secondWriter = streamA.write(newSeq[byte](10000))
|
|
await sleepAsync(10.milliseconds)
|
|
check: not secondWriter.finished()
|
|
|
|
readerBlocker.complete()
|
|
await wait(secondWriter, 1.seconds)
|
|
|
|
await streamA.close()
|
|
|
|
asyncTest "Reduce window size":
|
|
mSetup(64000)
|
|
let readerBlocker1 = newBlockerFut()
|
|
let readerBlocker2 = newBlockerFut()
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
try:
|
|
await readerBlocker1
|
|
var buffer: array[256000, byte]
|
|
# For the first roundtrip, the send window size is assumed to be 256k
|
|
discard await conn.readOnce(addr buffer[0], 256000)
|
|
await readerBlocker2
|
|
discard await conn.readOnce(addr buffer[0], 40000)
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await wait(streamA.write(newSeq[byte](256000)), 1.seconds) # shouldn't block
|
|
|
|
let secondWriter = streamA.write(newSeq[byte](64000))
|
|
await sleepAsync(10.milliseconds)
|
|
check: not secondWriter.finished()
|
|
|
|
readerBlocker1.complete()
|
|
await wait(secondWriter, 1.seconds)
|
|
|
|
let thirdWriter = streamA.write(newSeq[byte](10))
|
|
await sleepAsync(10.milliseconds)
|
|
check: not thirdWriter.finished()
|
|
|
|
readerBlocker2.complete()
|
|
await wait(thirdWriter, 1.seconds)
|
|
await streamA.close()
|
|
|
|
suite "Timeout testing":
|
|
asyncTest "Check if InTimeout close both streams correctly":
|
|
mSetup(inTo = 1.seconds)
|
|
let blocker = newBlockerFut()
|
|
let connBlocker = newBlockerFut()
|
|
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
try:
|
|
check (await conn.readLp(100)) == fromHex("1234")
|
|
await conn.writeLp(fromHex("5678"))
|
|
await blocker
|
|
check conn.isClosed
|
|
connBlocker.complete()
|
|
except CancelledError, LPStreamError:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
await streamA.writeLp(fromHex("1234"))
|
|
check (await streamA.readLp(100)) == fromHex("5678")
|
|
# wait for the timeout to happens, the sleep duration is set to 4 seconds
|
|
# as the timeout could be a bit long to trigger
|
|
await sleepAsync(4.seconds)
|
|
blocker.complete()
|
|
check streamA.isClosed
|
|
await connBlocker
|
|
|
|
asyncTest "Check if OutTimeout close both streams correctly":
|
|
mSetup(outTo = 1.seconds)
|
|
let blocker = newBlockerFut()
|
|
let connBlocker = newBlockerFut()
|
|
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
try:
|
|
check (await conn.readLp(100)) == fromHex("1234")
|
|
await conn.writeLp(fromHex("5678"))
|
|
await blocker
|
|
check conn.isClosed
|
|
connBlocker.complete()
|
|
except CancelledError, LPStreamError:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
await streamA.writeLp(fromHex("1234"))
|
|
check (await streamA.readLp(100)) == fromHex("5678")
|
|
# wait for the timeout to happens, the sleep duration is set to 4 seconds
|
|
# as the timeout could be a bit long to trigger
|
|
await sleepAsync(4.seconds)
|
|
blocker.complete()
|
|
check streamA.isClosed
|
|
await connBlocker
|
|
|
|
suite "Exception testing":
|
|
asyncTest "Local & Remote close":
|
|
mSetup()
|
|
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
try:
|
|
check (await conn.readLp(100)) == fromHex("1234")
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
expect LPStreamClosedError: await conn.writeLp(fromHex("102030"))
|
|
try:
|
|
check (await conn.readLp(100)) == fromHex("5678")
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await streamA.writeLp(fromHex("1234"))
|
|
expect LPStreamRemoteClosedError: discard await streamA.readLp(100)
|
|
await streamA.writeLp(fromHex("5678"))
|
|
await streamA.close()
|
|
|
|
asyncTest "Local & Remote reset":
|
|
mSetup()
|
|
let blocker = newBlockerFut()
|
|
|
|
yamuxb.streamHandler = proc(conn: Connection) {.async: (raises: []).} =
|
|
await blocker
|
|
try:
|
|
expect LPStreamResetError: discard await conn.readLp(100)
|
|
expect LPStreamResetError: await conn.writeLp(fromHex("1234"))
|
|
except CancelledError, LPStreamError:
|
|
return
|
|
finally:
|
|
await conn.close()
|
|
|
|
let streamA = await yamuxa.newStream()
|
|
check streamA == yamuxa.getStreams()[0]
|
|
|
|
await yamuxa.close()
|
|
expect LPStreamClosedError: await streamA.writeLp(fromHex("1234"))
|
|
expect LPStreamClosedError: discard await streamA.readLp(100)
|
|
blocker.complete()
|
|
await streamA.close()
|