Remove sleepAsync from tests/testswitch (#792)
This commit is contained in:
parent
dc13ff81d3
commit
4bce8f38c9
|
@ -105,16 +105,15 @@ proc bridgedConnections*: (Connection, Connection) =
|
|||
return (connA, connB)
|
||||
|
||||
|
||||
proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect].} ): Future[bool] {.async, gcsafe.} =
|
||||
{.gcsafe.}:
|
||||
let start = Moment.now()
|
||||
while true:
|
||||
if Moment.now() > (start + chronos.seconds(5)):
|
||||
return false
|
||||
elif cond():
|
||||
return true
|
||||
else:
|
||||
await sleepAsync(1.millis)
|
||||
proc checkExpiringInternal(cond: proc(): bool {.raises: [Defect], gcsafe.} ): Future[bool] {.async, gcsafe.} =
|
||||
let start = Moment.now()
|
||||
while true:
|
||||
if Moment.now() > (start + chronos.seconds(5)):
|
||||
return false
|
||||
elif cond():
|
||||
return true
|
||||
else:
|
||||
await sleepAsync(1.millis)
|
||||
|
||||
template checkExpiring*(code: untyped): untyped =
|
||||
checkExpiringInternal(proc(): bool = code)
|
||||
check await checkExpiringInternal(proc(): bool = code)
|
||||
|
|
|
@ -351,7 +351,7 @@ suite "FloodSub":
|
|||
check (await smallNode[0].publish("foo", smallMessage1)) > 0
|
||||
check (await bigNode[0].publish("foo", smallMessage2)) > 0
|
||||
|
||||
check (await checkExpiring(messageReceived == 2)) == true
|
||||
checkExpiring: messageReceived == 2
|
||||
|
||||
check (await smallNode[0].publish("foo", bigMessage)) > 0
|
||||
check (await bigNode[0].publish("foo", bigMessage)) > 0
|
||||
|
|
|
@ -316,11 +316,10 @@ suite "GossipSub":
|
|||
let gossip1 = GossipSub(nodes[0])
|
||||
let gossip2 = GossipSub(nodes[1])
|
||||
|
||||
check await checkExpiring(
|
||||
checkExpiring:
|
||||
"foobar" in gossip2.topics and
|
||||
"foobar" in gossip1.gossipsub and
|
||||
gossip1.gossipsub.hasPeerId("foobar", gossip2.peerInfo.peerId)
|
||||
)
|
||||
|
||||
await allFuturesThrowing(
|
||||
nodes[0].switch.stop(),
|
||||
|
@ -463,7 +462,7 @@ suite "GossipSub":
|
|||
nodes[0].unsubscribe("foobar", handler)
|
||||
|
||||
let gsNode = GossipSub(nodes[1])
|
||||
check await checkExpiring(gsNode.mesh.getOrDefault("foobar").len == 0)
|
||||
checkExpiring: gsNode.mesh.getOrDefault("foobar").len == 0
|
||||
|
||||
nodes[0].subscribe("foobar", handler)
|
||||
|
||||
|
@ -582,7 +581,7 @@ suite "GossipSub":
|
|||
gossip1.seen = TimedCache[MessageId].init()
|
||||
gossip3.seen = TimedCache[MessageId].init()
|
||||
let msgId = toSeq(gossip2.validationSeen.keys)[0]
|
||||
check await checkExpiring(try: gossip2.validationSeen[msgId].len > 0 except: false)
|
||||
checkExpiring(try: gossip2.validationSeen[msgId].len > 0 except: false)
|
||||
result = ValidationResult.Accept
|
||||
bFinished.complete()
|
||||
|
||||
|
|
|
@ -202,8 +202,8 @@ suite "Identify":
|
|||
|
||||
await identifyPush2.push(switch2.peerInfo, conn)
|
||||
|
||||
check await checkExpiring(switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols)
|
||||
check await checkExpiring(switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs)
|
||||
checkExpiring: switch1.peerStore[ProtoBook][switch2.peerInfo.peerId] == switch2.peerInfo.protocols
|
||||
checkExpiring: switch1.peerStore[AddressBook][switch2.peerInfo.peerId] == switch2.peerInfo.addrs
|
||||
|
||||
await closeAll()
|
||||
|
||||
|
|
|
@ -816,7 +816,7 @@ suite "Mplex":
|
|||
for i in 0..9:
|
||||
dialStreams.add((await mplexDial.newStream()))
|
||||
|
||||
check await checkExpiring(listenStreams.len == 10 and dialStreams.len == 10)
|
||||
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
|
||||
|
||||
await mplexListen.close()
|
||||
await allFuturesThrowing(
|
||||
|
@ -862,7 +862,7 @@ suite "Mplex":
|
|||
for i in 0..9:
|
||||
dialStreams.add((await mplexDial.newStream()))
|
||||
|
||||
check await checkExpiring(listenStreams.len == 10 and dialStreams.len == 10)
|
||||
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
|
||||
|
||||
mplexHandle.cancel()
|
||||
await allFuturesThrowing(
|
||||
|
@ -905,7 +905,7 @@ suite "Mplex":
|
|||
for i in 0..9:
|
||||
dialStreams.add((await mplexDial.newStream()))
|
||||
|
||||
check await checkExpiring(listenStreams.len == 10 and dialStreams.len == 10)
|
||||
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
|
||||
|
||||
await conn.close()
|
||||
await allFuturesThrowing(
|
||||
|
@ -951,7 +951,7 @@ suite "Mplex":
|
|||
for i in 0..9:
|
||||
dialStreams.add((await mplexDial.newStream()))
|
||||
|
||||
check await checkExpiring(listenStreams.len == 10 and dialStreams.len == 10)
|
||||
checkExpiring: listenStreams.len == 10 and dialStreams.len == 10
|
||||
|
||||
await listenConn.closeWithEOF()
|
||||
await allFuturesThrowing(
|
||||
|
|
|
@ -247,14 +247,12 @@ suite "Switch":
|
|||
await switch2.disconnect(switch1.peerInfo.peerId)
|
||||
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
check await(checkExpiring((not switch1.isConnected(switch2.peerInfo.peerId))))
|
||||
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
||||
await sleepAsync(1.seconds)
|
||||
|
||||
check:
|
||||
checkExpiring:
|
||||
startCounts ==
|
||||
@[
|
||||
switch1.connManager.inSema.count, switch1.connManager.outSema.count,
|
||||
|
@ -302,7 +300,7 @@ suite "Switch":
|
|||
await switch2.disconnect(switch1.peerInfo.peerId)
|
||||
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
check await(checkExpiring((not switch1.isConnected(switch2.peerInfo.peerId))))
|
||||
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
@ -354,7 +352,7 @@ suite "Switch":
|
|||
await switch2.disconnect(switch1.peerInfo.peerId)
|
||||
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
check await(checkExpiring((not switch1.isConnected(switch2.peerInfo.peerId))))
|
||||
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
@ -405,7 +403,7 @@ suite "Switch":
|
|||
await switch2.disconnect(switch1.peerInfo.peerId)
|
||||
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
check await(checkExpiring((not switch1.isConnected(switch2.peerInfo.peerId))))
|
||||
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
@ -456,7 +454,7 @@ suite "Switch":
|
|||
await switch2.disconnect(switch1.peerInfo.peerId)
|
||||
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
check await(checkExpiring((not switch1.isConnected(switch2.peerInfo.peerId))))
|
||||
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
@ -520,8 +518,8 @@ suite "Switch":
|
|||
|
||||
check not switch2.isConnected(switch1.peerInfo.peerId)
|
||||
check not switch3.isConnected(switch1.peerInfo.peerId)
|
||||
check await(checkExpiring((not switch1.isConnected(switch2.peerInfo.peerId))))
|
||||
check await(checkExpiring((not switch1.isConnected(switch3.peerInfo.peerId))))
|
||||
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
checkExpiring: not switch1.isConnected(switch3.peerInfo.peerId)
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
@ -554,7 +552,6 @@ suite "Switch":
|
|||
await switches[0].disconnect(peerInfo.peerId) # trigger disconnect
|
||||
of ConnEventKind.Disconnected:
|
||||
check not switches[0].isConnected(peerInfo.peerId)
|
||||
await sleepAsync(1.millis)
|
||||
done.complete()
|
||||
|
||||
switches.add(newStandardSwitch(
|
||||
|
@ -571,8 +568,6 @@ suite "Switch":
|
|||
await onConnect
|
||||
|
||||
await done
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
||||
await allFuturesThrowing(
|
||||
switches.mapIt( it.stop() ))
|
||||
|
@ -625,42 +620,6 @@ suite "Switch":
|
|||
await allFuturesThrowing(
|
||||
switches.mapIt( it.stop() ))
|
||||
|
||||
# TODO: we should be able to test cancellation
|
||||
# for most of the steps in the upgrade flow -
|
||||
# this is just a basic test for dials
|
||||
asyncTest "e2e canceling dial should not leak":
|
||||
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
|
||||
|
||||
let transport = TcpTransport.new(upgrade = Upgrade())
|
||||
await transport.start(ma)
|
||||
|
||||
proc acceptHandler() {.async, gcsafe.} =
|
||||
try:
|
||||
let conn = await transport.accept()
|
||||
discard await conn.readLp(100)
|
||||
await conn.close()
|
||||
except CatchableError:
|
||||
discard
|
||||
|
||||
let handlerWait = acceptHandler()
|
||||
let switch = newStandardSwitch(secureManagers = [SecureProtocol.Noise])
|
||||
|
||||
await switch.start()
|
||||
|
||||
var peerId = PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).get()
|
||||
let connectFut = switch.connect(peerId, transport.addrs)
|
||||
await sleepAsync(500.millis)
|
||||
connectFut.cancel()
|
||||
await handlerWait
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
checkTracker(ChronosStreamTrackerName)
|
||||
|
||||
await allFuturesThrowing(
|
||||
transport.stop(),
|
||||
switch.stop())
|
||||
|
||||
asyncTest "e2e closing remote conn should not leak":
|
||||
let ma = @[MultiAddress.init("/ip4/0.0.0.0/tcp/0").tryGet()]
|
||||
|
||||
|
@ -716,7 +675,7 @@ suite "Switch":
|
|||
|
||||
await allFuturesThrowing(readers)
|
||||
await switch2.stop() #Otherwise this leaks
|
||||
check await checkExpiring(not switch1.isConnected(switch2.peerInfo.peerId))
|
||||
checkExpiring: not switch1.isConnected(switch2.peerInfo.peerId)
|
||||
|
||||
checkTracker(LPChannelTrackerName)
|
||||
checkTracker(SecureConnTrackerName)
|
||||
|
|
Loading…
Reference in New Issue