2023-06-07 13:12:49 +02:00
|
|
|
{.push raises: [].}
|
2020-09-21 19:48:19 +02:00
|
|
|
|
2022-06-16 10:08:52 +02:00
|
|
|
import chronos
|
2024-02-09 11:51:27 +01:00
|
|
|
import macros
|
2023-03-31 00:16:39 +02:00
|
|
|
import algorithm
|
2020-05-08 22:10:06 +02:00
|
|
|
|
|
|
|
import ../libp2p/transports/tcptransport
|
|
|
|
import ../libp2p/stream/bufferstream
|
2020-07-07 13:14:11 +02:00
|
|
|
import ../libp2p/crypto/crypto
|
2020-06-19 11:29:43 -06:00
|
|
|
import ../libp2p/stream/lpstream
|
2021-07-13 13:53:08 +02:00
|
|
|
import ../libp2p/stream/chronosstream
|
2020-11-04 21:52:54 -06:00
|
|
|
import ../libp2p/muxers/mplex/lpchannel
|
|
|
|
import ../libp2p/protocols/secure/secure
|
2023-06-13 17:58:41 +02:00
|
|
|
import ../libp2p/switch
|
|
|
|
import ../libp2p/nameresolving/[nameresolver, mockresolver]
|
2020-05-08 22:10:06 +02:00
|
|
|
|
2024-03-01 18:06:26 +01:00
|
|
|
import "."/[asyncunit, errorhelpers]
|
|
|
|
export asyncunit, errorhelpers, mockresolver
|
2022-01-05 16:27:33 +01:00
|
|
|
|
2020-05-08 22:10:06 +02:00
|
|
|
const
  # Tracker names declared locally rather than taken from an imported module.
  # NOTE(review): presumably these mirror the names chronos registers for its
  # own transports - confirm against the chronos version in use.
  StreamTransportTrackerName = "stream.transport"
  StreamServerTrackerName = "stream.server"
  DgramTransportTrackerName = "datagram.transport"

  # Every tracker visited by `checkTrackers`, in the order they are checked.
  trackerNames = [
    LPStreamTrackerName, ConnectionTrackerName, LPChannelTrackerName,
    SecureConnTrackerName, BufferStreamTrackerName, TcpTransportTrackerName,
    StreamTransportTrackerName, StreamServerTrackerName,
    DgramTransportTrackerName, ChronosStreamTrackerName,
  ]
|
|
|
|
|
2020-11-06 09:24:24 -06:00
|
|
|
template checkTracker*(name: string) =
  ## Fails the running test when `isCounterLeaked(name)` is true, after
  ## recording a checkpoint with the opened/closed counts of that tracker.
  if isCounterLeaked(name):
    let counter = getTrackerCounter(name)
    checkpoint(
      "Opened " & name & ": " & $counter.opened & "\n" &
      "Closed " & name & ": " & $counter.closed
    )
    fail()
|
|
|
|
|
2020-09-21 19:48:19 +02:00
|
|
|
template checkTrackers*() =
  ## Runs `checkTracker` on every entry of `trackerNames`, then forces a full
  ## garbage collection, ignoring anything the collection raises.
  for trackerName in trackerNames:
    checkTracker(trackerName)

  # Also test the GC is not fooling with us.
  # The bare `except` below is intentional; silence the compiler warning for
  # it on compilers that know about that warning.
  when defined(nimHasWarnBareExcept):
    {.push warning[BareExcept]: off.}
  try:
    GC_fullCollect()
  except:
    discard
  when defined(nimHasWarnBareExcept):
    {.pop.}
|
2020-09-21 19:48:19 +02:00
|
|
|
|
2020-07-07 13:14:11 +02:00
|
|
|
type
  RngWrap = object
    # Holder for the lazily created random number generator; `rng` stays nil
    # until `getRng` is called for the first time.
    rng: ref HmacDrbgContext

# Module-level generator state, read and filled in by `getRng`.
var rngVar: RngWrap
|
|
|
|
|
2022-06-16 10:08:52 +02:00
|
|
|
proc getRng(): ref HmacDrbgContext =
  ## Returns the generator stored in `rngVar`, creating it with `newRng()` on
  ## the first call.
  # TODO `rngVar` should really be a threadvar, but declaring it as one causes
  # random and spurious compile failures on mac. Accessing the global is not
  # gcsafe; for the purpose of the tests that is ok as long as only a single
  # thread is used.
  {.gcsafe.}:
    if isNil(rngVar.rng):
      rngVar.rng = newRng()
    return rngVar.rng
|
|
|
|
|
2022-06-16 10:08:52 +02:00
|
|
|
template rng*(): ref HmacDrbgContext =
  ## Exported accessor that expands to a call of the private `getRng`.
  getRng()
|
refactor bufferstream to use a queue (#346)
This change modifies how the backpressure algorithm in bufferstream
works - in particular, instead of working byte-by-byte, it will now work
seq-by-seq.
When data arrives, it usually does so in packets - in the current
bufferstream, the packet is read then split into bytes which are fed one
by one to the bufferstream. On the reading side, the bytes are popped off
the bufferstream, again byte by byte, to satisfy `readOnce` requests -
this introduces a lot of synchronization traffic because the checks for
full buffer and for async event handling must be done for every byte.
In this PR, a queue of length 1 is used instead - this means there will
at most exist one "packet" in `pushTo`, one in the queue and one in the
slush buffer that is used to store incomplete reads.
* avoid byte-by-byte copy to buffer, with synchronization in-between
* reuse AsyncQueue synchronization logic instead of rolling own
* avoid writeHandler callback - implement `write` method instead
* simplify EOF signalling by only setting EOF flag in queue reader (and
reset)
* remove BufferStream pipes (unused)
* fixes drainBuffer deadlock when drain is called from within read loop
and thus blocks draining
* fix lpchannel init order
2020-09-10 08:19:13 +02:00
|
|
|
|
|
|
|
type
  # Callback invoked with each message written to a `TestBufferStream`.
  WriteHandler* = proc(data: seq[byte]): Future[void] {.
    async: (raises: [CancelledError, LPStreamError])
  .}

  # `BufferStream` subtype whose `write` hands the message to `writeHandler`.
  TestBufferStream* = ref object of BufferStream
    writeHandler*: WriteHandler
|
|
|
|
|
2024-03-05 08:06:27 +01:00
|
|
|
method write*(
    s: TestBufferStream, msg: seq[byte]
): Future[void] {.
    async: (raises: [CancelledError, LPStreamError], raw: true)
.} =
  ## Passes `msg` to the stream's `writeHandler` and returns the future that
  ## the handler produces.
  s.writeHandler(msg)
|
|
|
|
|
2023-04-03 11:05:01 +02:00
|
|
|
method getWrapped*(s: TestBufferStream): Connection =
  ## A `TestBufferStream` has no wrapped connection: always returns nil.
  nil
|
|
|
|
|
2021-06-07 09:32:08 +02:00
|
|
|
proc new*(T: typedesc[TestBufferStream], writeHandler: WriteHandler): T =
  ## Creates a stream of type `T` with the given `writeHandler` and runs
  ## `initStream` on it before returning it.
  result = T(writeHandler: writeHandler)
  result.initStream()
|
|
|
|
|
2022-07-04 15:19:21 +02:00
|
|
|
proc bridgedConnections*(): (Connection, Connection) =
  ## Returns two `TestBufferStream`s wired back to back: data written to the
  ## first is pushed into the second and vice versa. The first has direction
  ## `Out`, the second `In`.

  proc makeEndpoint(direction: Direction): TestBufferStream =
    # Set the direction before `initStream`, as the stream is initialised
    # only after its direction is known.
    result = TestBufferStream()
    result.dir = direction
    result.initStream()

  let
    connA = makeEndpoint(Direction.Out)
    connB = makeEndpoint(Direction.In)

  # Each side's write handler feeds the opposite side's read buffer.
  connA.writeHandler = proc(
      data: seq[byte]
  ) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
    connB.pushData(data)

  connB.writeHandler = proc(
      data: seq[byte]
  ) {.async: (raises: [CancelledError, LPStreamError], raw: true).} =
    connA.pushData(data)

  return (connA, connB)
|
|
|
|
|
2024-02-09 11:51:27 +01:00
|
|
|
macro checkUntilCustomTimeout*(timeout: Duration, code: untyped): untyped =
  ## Periodically checks a given condition until it is true or a timeout occurs.
  ##
  ## `timeout`: Duration - The maximum duration to wait for the condition to
  ## be true.
  ## `code`: untyped - A condition expression, or a block with one condition
  ## per line, that should eventually evaluate to true. When several
  ## conditions are given, all of them must hold at the same time.
  ##
  ## The conditions are re-evaluated after every 1 millisecond sleep. Once the
  ## timeout has elapsed, a checkpoint is recorded and `check` is run on the
  ## conditions one last time, so a condition that is still false is reported
  ## as a test failure.
  ##
  ## The expansion contains an `await`, so the macro must be used inside an
  ## async context such as `asyncTest`.
  ##
  ## Examples:
  ## ```nim
  ## # Example 1:
  ## asyncTest "checkUntilCustomTimeout should pass if the condition is true":
  ##   let a = 2
  ##   let b = 2
  ##   checkUntilCustomTimeout(2.seconds):
  ##     a == b
  ##
  ## # Example 2: Multiple conditions
  ## asyncTest "checkUntilCustomTimeout should pass if the conditions are true":
  ##   let a = 2
  ##   let b = 2
  ##   checkUntilCustomTimeout(5.seconds):
  ##     a == b
  ##     a == 2
  ##     b == 2
  ## ```

  # Helper proc that folds the statements of a block into one boolean
  # expression: `s0 and s1 and ... and sN`. Anything that is not a non-empty
  # statement list is returned untouched.
  proc buildAndExpr(n: NimNode): NimNode =
    if n.kind == nnkStmtList and n.len > 0:
      var combinedExpr = n[0] # Start with the first expression
      for i in 1 ..< n.len:
        # Combine the accumulated expression with the next one using 'and'
        combinedExpr = newCall("and", combinedExpr, n[i])
      return combinedExpr
    else:
      return n

  # Build the combined expression
  let combinedBoolExpr = buildAndExpr(code)

  result = quote:
    proc checkExpiringInternal(): Future[void] {.gensym, async.} =
      let start = Moment.now()
      while true:
        if Moment.now() > (start + `timeout`):
          checkpoint("[TIMEOUT] Timeout was reached and the conditions were not true. Check if the code is working as " &
            "expected or consider increasing the timeout param.")
          # Final evaluation through `check`, so a failure is reported with
          # the offending condition.
          check `code`
          return
        else:
          if `combinedBoolExpr`:
            return
          else:
            await sleepAsync(1.millis)

    await checkExpiringInternal()
|
|
|
|
|
|
|
|
macro checkUntilTimeout*(code: untyped): untyped =
  ## Shorthand for `checkUntilCustomTimeout` with the timeout fixed at
  ## 10 seconds; `code` is passed on unchanged.
  ##
  ## Examples:
  ## ```nim
  ## # Example 1: single condition
  ## asyncTest "checkUntilTimeout should pass if the condition is true":
  ##   let a = 2
  ##   let b = 2
  ##   checkUntilTimeout:
  ##     a == b
  ##
  ## # Example 2: several conditions, one per line
  ## asyncTest "checkUntilTimeout should pass if the conditions are true":
  ##   let a = 2
  ##   let b = 2
  ##   checkUntilTimeout:
  ##     a == b
  ##     a == 2
  ##     b == 2
  ## ```
  let expansion = quote:
    checkUntilCustomTimeout(10.seconds, `code`)
  expansion
|
2023-03-31 00:16:39 +02:00
|
|
|
|
|
|
|
proc unorderedCompare*[T](a, b: seq[T]): bool =
  ## Returns true when `a` and `b` hold the same elements with the same
  ## multiplicities, regardless of order.
  # Fast paths: identical sequences match, different lengths never do.
  if a == b:
    return true
  if a.len != b.len:
    return false

  # Same length but different order: compare sorted copies.
  sorted(a) == sorted(b)
|
2023-06-13 17:58:41 +02:00
|
|
|
|
|
|
|
proc default*(T: typedesc[MockResolver]): T =
  ## Creates a `MockResolver` whose `ipResponses` table already answers for
  ## `localhost` with `127.0.0.1` and `::1`.
  # NOTE(review): the boolean in the key presumably selects IPv6 - confirm
  # against `MockResolver`.
  result = MockResolver.new()
  result.ipResponses[("localhost", false)] = @["127.0.0.1"]
  result.ipResponses[("localhost", true)] = @["::1"]
|