From f36d708e769467aca64b3915f8397cd3cd3dd797 Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Fri, 3 Feb 2023 10:00:26 +0200 Subject: [PATCH] fix: improve filter test race condition (#1529) * fix: improve filter test race condition * fix: missed one sleep --- tests/v2/test_waku_filter.nim | 582 +++++++++++++++++----------------- 1 file changed, 291 insertions(+), 291 deletions(-) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index df48584b2..1629b7299 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -1,291 +1,291 @@ -{.used.} - -import - std/[options, tables], - testutils/unittests, - chronicles, - chronos, - libp2p/crypto/crypto -import - ../../waku/v2/node/peer_manager/peer_manager, - ../../waku/v2/protocol/waku_message, - ../../waku/v2/protocol/waku_filter, - ../../waku/v2/protocol/waku_filter/client, - ./utils, - ./testlib/common, - ./testlib/switch - - -proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilter] {.async.} = - let - peerManager = PeerManager.new(switch) - rng = crypto.newRng() - proto = WakuFilter.new(peerManager, rng, timeout) - - await proto.start() - switch.mount(proto) - - return proto - -proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClient] {.async.} = - let - peerManager = PeerManager.new(switch) - rng = crypto.newRng() - proto = WakuFilterClient.new(peerManager, rng) - - await proto.start() - switch.mount(proto) - - return proto - - -# TODO: Extend test coverage -suite "Waku Filter": - asyncTest "should forward messages to client after subscribed": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - let pushHandlerFuture = newFuture[(string, WakuMessage)]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = - pushHandlerFuture.complete((pubsubTopic, message)) - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic=contentTopic) - - ## When - require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() - - # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc - await sleepAsync(5.milliseconds) - - await server.handleMessage(pubsubTopic, msg) - - require await pushHandlerFuture.withTimeout(5.seconds) - - ## Then - let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() - check: - pushedMsgPubsubTopic == pubsubTopic - pushedMsg == msg - - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) - - asyncTest "should not forward messages to client after unsuscribed": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = - pushHandlerFuture.complete() - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic=contentTopic) - - ## When - require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() - - # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc - await sleepAsync(5.milliseconds) - - await server.handleMessage(pubsubTopic, msg) - - require await pushHandlerFuture.withTimeout(1.seconds) - - # Reset to test unsubscribe - pushHandlerFuture = newFuture[void]() - - require (await client.unsubscribe(pubsubTopic, contentTopic, peer=serverAddr)).isOk() - - # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc - await sleepAsync(5.milliseconds) - - await server.handleMessage(pubsubTopic, msg) - - ## Then - let handlerWasCalledAfterUnsubscription = await pushHandlerFuture.withTimeout(1.seconds) - check: - not handlerWasCalledAfterUnsubscription - - ## Cleanup - await allFutures(clientSwitch.stop(), serverSwitch.stop()) - - asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = - pushHandlerFuture.complete() - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic=contentTopic) - - ## When - require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() - - # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc - await sleepAsync(5.milliseconds) - - await server.handleMessage(DefaultPubsubTopic, msg) - - # Push handler should be called - require await pushHandlerFuture.withTimeout(1.seconds) - - # Stop client node to test timeout unsubscription - await clientSwitch.stop() - - await sleepAsync(5.milliseconds) - - # First failure should not remove the subscription - await server.handleMessage(DefaultPubsubTopic, msg) - let - subscriptionsBeforeTimeout = server.subscriptions.len() - failedPeersBeforeTimeout = server.failedPeers.len() - - # Wait for the configured peer connection timeout to elapse (200ms) - await sleepAsync(200.milliseconds) - - # Second failure should remove the subscription - await server.handleMessage(DefaultPubsubTopic, msg) - let - subscriptionsAfterTimeout = server.subscriptions.len() - failedPeersAfterTimeout = server.failedPeers.len() - - ## Then - check: - subscriptionsBeforeTimeout == 1 - failedPeersBeforeTimeout == 1 - subscriptionsAfterTimeout == 0 - failedPeersAfterTimeout == 0 - - ## Cleanup - await serverSwitch.stop() - - asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses": - ## Setup - let - serverSwitch = newTestSwitch() - clientSwitch = newTestSwitch() - - await allFutures(serverSwitch.start(), clientSwitch.start()) - - let - server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds) - client = await newTestWakuFilterClient(clientSwitch) - - ## Given - let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - - var pushHandlerFuture = newFuture[void]() - proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = - pushHandlerFuture.complete() - - let - pubsubTopic = DefaultPubsubTopic - contentTopic = "test-content-topic" - msg = fakeWakuMessage(contentTopic=contentTopic) - - ## When - require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() - - # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc - await sleepAsync(5.milliseconds) - - await server.handleMessage(DefaultPubsubTopic, msg) - - # Push handler should be called - require await pushHandlerFuture.withTimeout(1.seconds) - - let - subscriptionsBeforeFailure = server.subscriptions.len() - failedPeersBeforeFailure = server.failedPeers.len() - - # Stop switch to test unsubscribe - await clientSwitch.stop() - - await sleepAsync(5.milliseconds) - - # First failure should add to failure list - await server.handleMessage(DefaultPubsubTopic, msg) - - pushHandlerFuture = newFuture[void]() - - let - subscriptionsAfterFailure = server.subscriptions.len() - failedPeersAfterFailure = server.failedPeers.len() - - await sleepAsync(100.milliseconds) - - # Start switch with same key as before - let clientSwitch2 = newTestSwitch( - some(clientSwitch.peerInfo.privateKey), - some(clientSwitch.peerInfo.addrs[0]) - ) - await clientSwitch2.start() - await client.start() - clientSwitch2.mount(client) - - # If push succeeds after failure, the peer should removed from failed peers list - await server.handleMessage(DefaultPubsubTopic, msg) - let handlerShouldHaveBeenCalled = await pushHandlerFuture.withTimeout(1.seconds) - - let - subscriptionsAfterSuccessfulConnection = server.subscriptions.len() - failedPeersAfterSuccessfulConnection = server.failedPeers.len() - - ## Then - check: - handlerShouldHaveBeenCalled - - check: - subscriptionsBeforeFailure == 1 - subscriptionsAfterFailure == 1 - subscriptionsAfterSuccessfulConnection == 1 - - check: - failedPeersBeforeFailure == 0 - failedPeersAfterFailure == 1 - failedPeersAfterSuccessfulConnection == 0 - - ## Cleanup - await allFutures(clientSwitch2.stop(), serverSwitch.stop()) +{.used.} + +import + std/[options, tables], + testutils/unittests, + chronicles, + chronos, + libp2p/crypto/crypto +import + ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_filter, + ../../waku/v2/protocol/waku_filter/client, + ./utils, + ./testlib/common, + ./testlib/switch + + +proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilter] {.async.} = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuFilter.new(peerManager, rng, timeout) + + await proto.start() + switch.mount(proto) + + return proto + +proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClient] {.async.} = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuFilterClient.new(peerManager, rng) + + await proto.start() + switch.mount(proto) + + return proto + + +# TODO: Extend test coverage +suite "Waku Filter": + asyncTest "should forward messages to client after subscribed": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = await newTestWakuFilterNode(serverSwitch) + client = await newTestWakuFilterClient(clientSwitch) + + ## Given + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() + + let pushHandlerFuture = newFuture[(string, WakuMessage)]() + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete((pubsubTopic, message)) + + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) + + ## When + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + + # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc + await sleepAsync(500.milliseconds) + + await server.handleMessage(pubsubTopic, msg) + + require await pushHandlerFuture.withTimeout(3.seconds) + + ## Then + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() + check: + pushedMsgPubsubTopic == pubsubTopic + pushedMsg == msg + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "should not forward messages to client after unsuscribed": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = await newTestWakuFilterNode(serverSwitch) + client = await newTestWakuFilterClient(clientSwitch) + + ## Given + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() + + var pushHandlerFuture = newFuture[void]() + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete() + + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) + + ## When + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + + # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc + await sleepAsync(500.milliseconds) + + await server.handleMessage(pubsubTopic, msg) + + require await pushHandlerFuture.withTimeout(1.seconds) + + # Reset to test unsubscribe + pushHandlerFuture = newFuture[void]() + + require (await client.unsubscribe(pubsubTopic, contentTopic, peer=serverAddr)).isOk() + + # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc + await sleepAsync(500.milliseconds) + + await server.handleMessage(pubsubTopic, msg) + + ## Then + let handlerWasCalledAfterUnsubscription = await pushHandlerFuture.withTimeout(1.seconds) + check: + not handlerWasCalledAfterUnsubscription + + ## Cleanup + await allFutures(clientSwitch.stop(), serverSwitch.stop()) + + asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds) + client = await newTestWakuFilterClient(clientSwitch) + + ## Given + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() + + var pushHandlerFuture = newFuture[void]() + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete() + + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) + + ## When + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + + # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc + await sleepAsync(500.milliseconds) + + await server.handleMessage(DefaultPubsubTopic, msg) + + # Push handler should be called + require await pushHandlerFuture.withTimeout(1.seconds) + + # Stop client node to test timeout unsubscription + await clientSwitch.stop() + + await sleepAsync(500.milliseconds) + + # First failure should not remove the subscription + await server.handleMessage(DefaultPubsubTopic, msg) + let + subscriptionsBeforeTimeout = server.subscriptions.len() + failedPeersBeforeTimeout = server.failedPeers.len() + + # Wait for the configured peer connection timeout to elapse (200ms) + await sleepAsync(200.milliseconds) + + # Second failure should remove the subscription + await server.handleMessage(DefaultPubsubTopic, msg) + let + subscriptionsAfterTimeout = server.subscriptions.len() + failedPeersAfterTimeout = server.failedPeers.len() + + ## Then + check: + subscriptionsBeforeTimeout == 1 + failedPeersBeforeTimeout == 1 + subscriptionsAfterTimeout == 0 + failedPeersAfterTimeout == 0 + + ## Cleanup + await serverSwitch.stop() + + asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses": + ## Setup + let + serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() + + await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds) + client = await newTestWakuFilterClient(clientSwitch) + + ## Given + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() + + var pushHandlerFuture = newFuture[void]() + proc pushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete() + + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) + + ## When + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + + # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc + await sleepAsync(500.milliseconds) + + await server.handleMessage(DefaultPubsubTopic, msg) + + # Push handler should be called + require await pushHandlerFuture.withTimeout(1.seconds) + + let + subscriptionsBeforeFailure = server.subscriptions.len() + failedPeersBeforeFailure = server.failedPeers.len() + + # Stop switch to test unsubscribe + await clientSwitch.stop() + + await sleepAsync(500.milliseconds) + + # First failure should add to failure list + await server.handleMessage(DefaultPubsubTopic, msg) + + pushHandlerFuture = newFuture[void]() + + let + subscriptionsAfterFailure = server.subscriptions.len() + failedPeersAfterFailure = server.failedPeers.len() + + await sleepAsync(100.milliseconds) + + # Start switch with same key as before + let clientSwitch2 = newTestSwitch( + some(clientSwitch.peerInfo.privateKey), + some(clientSwitch.peerInfo.addrs[0]) + ) + await clientSwitch2.start() + await client.start() + clientSwitch2.mount(client) + + # If push succeeds after failure, the peer should removed from failed peers list + await server.handleMessage(DefaultPubsubTopic, msg) + let handlerShouldHaveBeenCalled = await pushHandlerFuture.withTimeout(1.seconds) + + let + subscriptionsAfterSuccessfulConnection = server.subscriptions.len() + failedPeersAfterSuccessfulConnection = server.failedPeers.len() + + ## Then + check: + handlerShouldHaveBeenCalled + + check: + subscriptionsBeforeFailure == 1 + subscriptionsAfterFailure == 1 + subscriptionsAfterSuccessfulConnection == 1 + + check: + failedPeersBeforeFailure == 0 + failedPeersAfterFailure == 1 + failedPeersAfterSuccessfulConnection == 0 + + ## Cleanup + await allFutures(clientSwitch2.stop(), serverSwitch.stop())