diff --git a/tests/test_waku_connect.nim b/tests/test_waku_connect.nim index b096a47e2..1e8cce148 100644 --- a/tests/test_waku_connect.nim +++ b/tests/test_waku_connect.nim @@ -15,7 +15,7 @@ import const safeTTL = 5'u32 waitInterval = messageInterval + 150.milliseconds - conditionTimeoutMs = 3000 + conditionTimeoutMs = 3000.milliseconds proc resetMessageQueues(nodes: varargs[EthereumNode]) = for node in nodes: @@ -23,7 +23,8 @@ proc resetMessageQueues(nodes: varargs[EthereumNode]) = # check on a condition until true or return a future containing false # if timeout expires first -proc eventually(timeout: int, condition: proc(): bool {.gcsafe.}): Future[bool] = +proc eventually(timeout: Duration, condition: proc(): bool {.gcsafe.}): + Future[bool] = let wrappedCondition = proc(): Future[bool] {.async.} = let f = newFuture[bool]() while not condition(): @@ -76,7 +77,8 @@ suite "Waku connections": proc handler1(msg: ReceivedMessage) = var count {.global.}: int - check msg.decoded.payload == payloads[0] or msg.decoded.payload == payloads[1] + check msg.decoded.payload == payloads[0] or + msg.decoded.payload == payloads[1] count += 1 if count == 2: futures[0].complete(1) proc handler2(msg: ReceivedMessage) = @@ -84,7 +86,8 @@ suite "Waku connections": futures[1].complete(1) proc handler3(msg: ReceivedMessage) = var count {.global.}: int - check msg.decoded.payload == payloads[2] or msg.decoded.payload == payloads[3] + check msg.decoded.payload == payloads[2] or + msg.decoded.payload == payloads[3] count += 1 if count == 2: futures[2].complete(1) proc handler4(msg: ReceivedMessage) = @@ -93,19 +96,17 @@ suite "Waku connections": # Filters # filter for encrypted asym - filters.add(node1.subscribeFilter(initFilter(privateKey = some(encryptKeyPair.seckey), - topics = @[topic]), handler1)) + filters.add(node1.subscribeFilter(initFilter( + privateKey = some(encryptKeyPair.seckey), topics = @[topic]), handler1)) # filter for encrypted asym + signed filters.add(node1.subscribeFilter(initFilter(some(signKeyPair.pubkey), - privateKey = some(encryptKeyPair.seckey), - topics = @[topic]), handler2)) + privateKey = some(encryptKeyPair.seckey), topics = @[topic]), handler2)) # filter for encrypted sym filters.add(node1.subscribeFilter(initFilter(symKey = some(symKey), - topics = @[topic]), handler3)) + topics = @[topic]), handler3)) # filter for encrypted sym + signed filters.add(node1.subscribeFilter(initFilter(some(signKeyPair.pubkey), - symKey = some(symKey), - topics = @[topic]), handler4)) + symKey = some(symKey), topics = @[topic]), handler4)) # Messages check: # encrypted asym @@ -182,10 +183,10 @@ suite "Waku connections": check msg.decoded.payload == payload futures[1].complete(1) - var filter1 = node1.subscribeFilter(initFilter(topics = @[topic], powReq = 0), - handler1) - var filter2 = node1.subscribeFilter(initFilter(topics = @[topic], - powReq = 1_000_000), handler2) + var filter1 = node1.subscribeFilter( + initFilter(topics = @[topic], powReq = 0), handler1) + var filter2 = node1.subscribeFilter( + initFilter(topics = @[topic], powReq = 1_000_000), handler2) check: node2.postMessage(ttl = safeTTL, topic = topic, payload = payload) == true @@ -246,7 +247,8 @@ suite "Waku connections": proc handler(msg: ReceivedMessage) = check msg.decoded.payload == payload f.complete(1) - var filter = node1.subscribeFilter(initFilter(topics = filterTopics), handler) + var filter = node1.subscribeFilter( + initFilter(topics = filterTopics), handler) await node1.setBloomFilter(node1.filtersToBloom()) check: @@ -314,7 +316,7 @@ suite "Waku connections": # to uint32 in postMessage() let lowerTTL = 2'u32 # Lower TTL as we need to wait for messages to expire for i in countdown(10, 1): - check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload) == true + check node2.postMessage(ttl = lowerTTL, topic = topic, payload = payload) check node2.protocolState(Waku).queue.items.len == 10 await sleepAsync(waitInterval) @@ -338,6 +340,10 @@ suite "Waku connections": var filter = node1.subscribeFilter(initFilter(topics = @[topic], allowP2P = true), handler) + # Need to be sure that node1 is added in the peerpool of node2 as + # postMessage with target will iterate over the peers + require await eventually(conditionTimeoutMs, + proc(): bool = node2.peerPool.len == 1) check: node1.setPeerTrusted(toNodeId(node2.keys.pubkey)) == true node2.postMessage(ttl = 10, topic = topic, @@ -472,9 +478,11 @@ suite "Waku connections": peer.state(Waku).bloom == bloom peer.state(Waku).topics == some(topics) - let hasBloomNodeConnectedCondition = proc(): bool = wakuBloomNode.peerPool.len == 1 + let hasBloomNodeConnectedCondition = proc(): bool = + wakuBloomNode.peerPool.len == 1 # wait for the peer to be connected on the other side - let hasBloomNodeConnected = await eventually(conditionTimeoutMs, hasBloomNodeConnectedCondition) + let hasBloomNodeConnected = + await eventually(conditionTimeoutMs, hasBloomNodeConnectedCondition) # check bloom filter is updated check: hasBloomNodeConnected @@ -487,9 +495,11 @@ suite "Waku connections": let bloomFilterUpdatedCondition = proc(): bool = for peer in wakuNode.peerPool.peers: - return peer.state(Waku).bloom == bloom and peer.state(Waku).topics == none(seq[Topic]) + return peer.state(Waku).bloom == bloom and + peer.state(Waku).topics == none(seq[Topic]) - let bloomFilterUpdated = await eventually(conditionTimeoutMs, bloomFilterUpdatedCondition) + let bloomFilterUpdated = + await eventually(conditionTimeoutMs, bloomFilterUpdatedCondition) # check bloom filter is updated check: bloomFilterUpdated @@ -516,9 +526,8 @@ suite "Waku connections": wakuNode.postMessage(ttl = safeTTL, topic = wrongTopic, payload = payload) wakuNode.protocolState(Waku).queue.items.len == 3 - let response = await eventually(conditionTimeoutMs, proc (): bool = wakuTopicNode.protocolState(Waku).queue.items.len == 2) - check: - response + await eventually(conditionTimeoutMs, + proc (): bool = wakuTopicNode.protocolState(Waku).queue.items.len == 2) asyncTest "Waku topic-interest versus bloom filter": var