diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 699c53d10..62d9ec177 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -30,7 +30,7 @@ procSuite "Waku Filter": await listenSwitch.start() var responseRequestIdFuture = newFuture[string]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = check: msg.messages.len() == 1 msg.messages[0] == post @@ -43,7 +43,7 @@ procSuite "Waku Filter": dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) @@ -75,7 +75,7 @@ procSuite "Waku Filter": await listenSwitch.start() var responseCompletionFuture = newFuture[bool]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = check: msg.messages.len() == 1 msg.messages[0] == post @@ -88,7 +88,7 @@ procSuite "Waku Filter": dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle) @@ -131,7 +131,7 @@ procSuite "Waku Filter": await dialSwitch.start() var responseRequestIdFuture = newFuture[string]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard let @@ -161,7 +161,7 @@ procSuite "Waku Filter": await listenSwitch.start() var responseCompletionFuture = newFuture[bool]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = check: msg.messages.len() == 1 msg.messages[0] == post @@ -174,7 +174,7 @@ procSuite "Waku Filter": dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 1.seconds) @@ -225,7 +225,7 @@ procSuite "Waku Filter": await listenSwitch.start() var responseCompletionFuture = newFuture[bool]() - proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc handle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = check: msg.messages.len() == 1 msg.messages[0] == post @@ -238,7 +238,7 @@ procSuite "Waku Filter": dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo.toRemotePeerInfo()) - proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + proc emptyHandle(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard let proto2 = WakuFilter.init(PeerManager.new(listenSwitch), crypto.newRng(), emptyHandle, 2.seconds) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 1e4cb66c5..559284567 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -349,6 +349,65 @@ procSuite "WakuNode": await node1.stop() await node2.stop() + asyncTest "Store protocol returns expected message when relay is disabled and filter enabled": + # See nwaku issue #937: 'Store: ability to decouple store from relay' + + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) + + pubSubTopic = "/waku/2/default-waku/proto" + contentTopic = ContentTopic("/waku/2/default-content/proto") + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) + + filterComplFut = newFuture[bool]() + storeComplFut = newFuture[bool]() + + await node1.start() + node1.mountStore(persistMessages = true) + node1.mountFilter() + + await node2.start() + node2.mountStore(persistMessages = true) + node2.mountFilter() + + node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) + node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) + + proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} = + check: + msg == message + filterComplFut.complete(true) + + await node2.subscribe(FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), filterReqHandler) + + await sleepAsync(2000.millis) + + # Send filter push message to node2 + await node1.wakuFilter.handleMessage(pubSubTopic, message) + + await sleepAsync(2000.millis) + + # Wait for the node2 filter to receive the push message + check: + (await filterComplFut.withTimeout(5.seconds)) == true + + proc node1StoreQueryRespHandler(response: HistoryResponse) {.gcsafe, closure.} = + check: + response.messages.len == 1 + response.messages[0] == message + storeComplFut.complete(true) + + await node1.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: contentTopic)]), node1StoreQueryRespHandler) + + check: + (await storeComplFut.withTimeout(5.seconds)) == true + + await node1.stop() + await node2.stop() + asyncTest "Messages are correctly relayed": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -476,7 +535,7 @@ procSuite "WakuNode": dnsAddrPeer = parseRemotePeerInfo("/dnsaddr/localhost/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") dns4Peer = parseRemotePeerInfo("/dns4/localhost/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") dns6Peer = parseRemotePeerInfo("/dns6/localhost/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") - + check: # /dns $(dnsPeer.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" @@ -494,7 +553,7 @@ procSuite "WakuNode": $(dns6Peer.peerId) == "16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc" $(dns6Peer.addrs[0][0].tryGet()) == "/dns6/localhost" $(dns6Peer.addrs[0][1].tryGet()) == "/tcp/60002" - + # Now test some common corner cases expect LPError: # gibberish @@ -511,18 +570,18 @@ procSuite "WakuNode": expect LPError: # invalid IP address discard parseRemotePeerInfo("/ip4/127.0.0.0.1/tcp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") - + expect LPError: # no PeerID discard parseRemotePeerInfo("/ip4/127.0.0.1/tcp/60002") - + expect ValueError: # unsupported transport discard parseRemotePeerInfo("/ip4/127.0.0.1/udp/60002/p2p/16Uuu2HBmAcHvhLqQKwSSbX6BG5JLWUDRcaLVrehUVqpw7fz1hbYc") - + asyncTest "resolve and connect to dns multiaddrs": let resolver = MockResolver.new() - + resolver.ipResponses[("localhost", false)] = @["127.0.0.1"] let @@ -530,15 +589,15 @@ procSuite "WakuNode": node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000), nameResolver = resolver) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - + # Construct DNS multiaddr for node2 let node2PeerId = $(node2.switch.peerInfo.peerId) node2Dns4Addr = "/dns4/localhost/tcp/60002/p2p/" & node2PeerId - + node1.mountRelay() node2.mountRelay() - + await allFutures([node1.start(), node2.start()]) await node1.connectToNodes(@[node2Dns4Addr]) @@ -549,10 +608,10 @@ procSuite "WakuNode": await allFutures([node1.stop(), node2.stop()]) asyncTest "filtering relayed messages using topic validators": - ## test scenario: + ## test scenario: ## node1 and node3 set node2 as their relay node - ## node3 publishes two messages with two different contentTopics but on the same pubsub topic - ## node1 is also subscribed to the same pubsub topic + ## node3 publishes two messages with two different contentTopics but on the same pubsub topic + ## node1 is also subscribed to the same pubsub topic ## node2 sets a validator for the same pubsub topic ## only one of the messages gets delivered to node1 because the validator only validates one of the content topics @@ -596,7 +655,7 @@ procSuite "WakuNode": ## the validator that only allows messages with contentTopic1 to be relayed check: topic == pubSubTopic - let msg = WakuMessage.init(message.data) + let msg = WakuMessage.init(message.data) if msg.isOk(): # only relay messages with contentTopic1 if msg.value().contentTopic == contentTopic1: @@ -606,7 +665,7 @@ procSuite "WakuNode": result = ValidationResult.Reject completionFutValidatorRej.complete(true) - # set a topic validator for pubSubTopic + # set a topic validator for pubSubTopic let pb = PubSub(node2.wakuRelay) pb.addValidator(pubSubTopic, validator) @@ -622,14 +681,14 @@ procSuite "WakuNode": val.contentTopic == contentTopic1 # relay handler is called completionFut.complete(true) - + node3.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) await node1.publish(pubSubTopic, message1) await sleepAsync(2000.millis) - + # message2 never gets relayed because of the validator await node1.publish(pubSubTopic, message2) await sleepAsync(2000.millis) @@ -641,14 +700,14 @@ procSuite "WakuNode": # check that validator is called for message2 (await completionFutValidatorRej.withTimeout(10.seconds)) == true - + await node1.stop() await node2.stop() await node3.stop() - + when defined(rln): asyncTest "testing rln-relay with valid proof": - + let # publisher node nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -665,25 +724,25 @@ procSuite "WakuNode": # set up three nodes # node1 - node1.mountRelay(@[rlnRelayPubSubTopic]) + node1.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt1, memKeyPairOpt1, memIndexOpt1) = rlnRelaySetUp(1) # set up rln relay inputs # mount rlnrelay in off-chain mode waitFor node1.mountRlnRelay(groupOpt = groupOpt1, - memKeyPairOpt = memKeyPairOpt1, - memIndexOpt= memIndexOpt1, - onchainMode = false, + memKeyPairOpt = memKeyPairOpt1, + memIndexOpt= memIndexOpt1, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) - await node1.start() + await node1.start() # node 2 node2.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt2, memKeyPairOpt2, memIndexOpt2) = rlnRelaySetUp(2) # set up rln relay inputs # mount rlnrelay in off-chain mode - waitFor node2.mountRlnRelay(groupOpt = groupOpt2, - memKeyPairOpt = memKeyPairOpt2, - memIndexOpt= memIndexOpt2, - onchainMode = false, + waitFor node2.mountRlnRelay(groupOpt = groupOpt2, + memKeyPairOpt = memKeyPairOpt2, + memIndexOpt= memIndexOpt2, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) await node2.start() @@ -692,10 +751,10 @@ procSuite "WakuNode": node3.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt3, memKeyPairOpt3, memIndexOpt3) = rlnRelaySetUp(3) # set up rln relay inputs # mount rlnrelay in off-chain mode - waitFor node3.mountRlnRelay(groupOpt = groupOpt3, - memKeyPairOpt = memKeyPairOpt3, - memIndexOpt= memIndexOpt3, - onchainMode = false, + waitFor node3.mountRlnRelay(groupOpt = groupOpt3, + memKeyPairOpt = memKeyPairOpt3, + memIndexOpt= memIndexOpt3, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) await node3.start() @@ -703,7 +762,7 @@ procSuite "WakuNode": # connect them together await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - + var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = let msg = WakuMessage.init(data) @@ -721,14 +780,14 @@ procSuite "WakuNode": let payload = "Hello".toBytes() # prepare the epoch - let epoch = getCurrentEpoch() + let epoch = getCurrentEpoch() - var message = WakuMessage(payload: @payload, + var message = WakuMessage(payload: @payload, contentTopic: contentTopic) doAssert(node1.wakuRlnRelay.appendRLNProof(message, epochTime())) - ## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn + ## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn ## verifies the rate limit proof of the message and relays the message to node3 ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc await node1.publish(rlnRelayPubSubTopic, message) @@ -737,7 +796,7 @@ procSuite "WakuNode": check: (await completionFut.withTimeout(10.seconds)) == true - + await node1.stop() await node2.stop() await node3.stop() @@ -758,25 +817,25 @@ procSuite "WakuNode": # set up three nodes # node1 - node1.mountRelay(@[rlnRelayPubSubTopic]) + node1.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt1, memKeyPairOpt1, memIndexOpt1) = rlnRelaySetUp(1) # set up rln relay inputs # mount rlnrelay in off-chain mode waitFor node1.mountRlnRelay(groupOpt = groupOpt1, - memKeyPairOpt = memKeyPairOpt1, - memIndexOpt= memIndexOpt1, - onchainMode = false, + memKeyPairOpt = memKeyPairOpt1, + memIndexOpt= memIndexOpt1, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) - await node1.start() + await node1.start() # node 2 node2.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt2, memKeyPairOpt2, memIndexOpt2) = rlnRelaySetUp(2) # set up rln relay inputs # mount rlnrelay in off-chain mode - waitFor node2.mountRlnRelay(groupOpt = groupOpt2, - memKeyPairOpt = memKeyPairOpt2, - memIndexOpt= memIndexOpt2, - onchainMode = false, + waitFor node2.mountRlnRelay(groupOpt = groupOpt2, + memKeyPairOpt = memKeyPairOpt2, + memIndexOpt= memIndexOpt2, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) await node2.start() @@ -785,10 +844,10 @@ procSuite "WakuNode": node3.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt3, memKeyPairOpt3, memIndexOpt3) = rlnRelaySetUp(3) # set up rln relay inputs # mount rlnrelay in off-chain mode - waitFor node3.mountRlnRelay(groupOpt = groupOpt3, - memKeyPairOpt = memKeyPairOpt3, - memIndexOpt= memIndexOpt3, - onchainMode = false, + waitFor node3.mountRlnRelay(groupOpt = groupOpt3, + memKeyPairOpt = memKeyPairOpt3, + memIndexOpt= memIndexOpt3, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) await node3.start() @@ -796,7 +855,7 @@ procSuite "WakuNode": # connect them together await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - + # define a custom relay handler var completionFut = newFuture[bool]() proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -818,22 +877,22 @@ procSuite "WakuNode": let epoch = getCurrentEpoch() # prepare the proof - let + let contentTopicBytes = contentTopic.toBytes input = concat(payload, contentTopicBytes) - rateLimitProofRes = node1.wakuRlnRelay.rlnInstance.proofGen(data = input, - memKeys = node1.wakuRlnRelay.membershipKeyPair, - memIndex = MembershipIndex(4), + rateLimitProofRes = node1.wakuRlnRelay.rlnInstance.proofGen(data = input, + memKeys = node1.wakuRlnRelay.membershipKeyPair, + memIndex = MembershipIndex(4), epoch = epoch) doAssert(rateLimitProofRes.isOk()) - let rateLimitProof = rateLimitProofRes.value + let rateLimitProof = rateLimitProofRes.value - let message = WakuMessage(payload: @payload, - contentTopic: contentTopic, + let message = WakuMessage(payload: @payload, + contentTopic: contentTopic, proof: rateLimitProof) - ## node1 publishes a message with an invalid rln proof, the message is then relayed to node2 which in turn + ## node1 publishes a message with an invalid rln proof, the message is then relayed to node2 which in turn ## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3 ## never gets called ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc @@ -843,13 +902,13 @@ procSuite "WakuNode": check: # the relayHandler of node3 never gets called (await completionFut.withTimeout(10.seconds)) == false - + await node1.stop() await node2.stop() await node3.stop() asyncTest "testing rln-relay double-signaling detection": - + let # publisher node nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -866,25 +925,25 @@ procSuite "WakuNode": # set up three nodes # node1 - node1.mountRelay(@[rlnRelayPubSubTopic]) + node1.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt1, memKeyPairOpt1, memIndexOpt1) = rlnRelaySetUp(1) # set up rln relay inputs # mount rlnrelay in off-chain mode waitFor node1.mountRlnRelay(groupOpt = groupOpt1, - memKeyPairOpt = memKeyPairOpt1, - memIndexOpt= memIndexOpt1, - onchainMode = false, + memKeyPairOpt = memKeyPairOpt1, + memIndexOpt= memIndexOpt1, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) - await node1.start() + await node1.start() # node 2 node2.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt2, memKeyPairOpt2, memIndexOpt2) = rlnRelaySetUp(2) # set up rln relay inputs # mount rlnrelay in off-chain mode - waitFor node2.mountRlnRelay(groupOpt = groupOpt2, - memKeyPairOpt = memKeyPairOpt2, - memIndexOpt= memIndexOpt2, - onchainMode = false, + waitFor node2.mountRlnRelay(groupOpt = groupOpt2, + memKeyPairOpt = memKeyPairOpt2, + memIndexOpt= memIndexOpt2, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) await node2.start() @@ -893,10 +952,10 @@ procSuite "WakuNode": node3.mountRelay(@[rlnRelayPubSubTopic]) let (groupOpt3, memKeyPairOpt3, memIndexOpt3) = rlnRelaySetUp(3) # set up rln relay inputs # mount rlnrelay in off-chain mode - waitFor node3.mountRlnRelay(groupOpt = groupOpt3, - memKeyPairOpt = memKeyPairOpt3, - memIndexOpt= memIndexOpt3, - onchainMode = false, + waitFor node3.mountRlnRelay(groupOpt = groupOpt3, + memKeyPairOpt = memKeyPairOpt3, + memIndexOpt= memIndexOpt3, + onchainMode = false, pubsubTopic = rlnRelayPubSubTopic, contentTopic = contentTopic) await node3.start() @@ -905,16 +964,16 @@ procSuite "WakuNode": await node1.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) await node3.connectToNodes(@[node2.switch.peerInfo.toRemotePeerInfo()]) - # get the current epoch time + # get the current epoch time let time = epochTime() # create some messages with rate limit proofs - var + var wm1 = WakuMessage(payload: "message 1".toBytes(), contentTopic: contentTopic) proofAdded1 = node3.wakuRlnRelay.appendRLNProof(wm1, time) # another message in the same epoch as wm1, it will break the messaging rate limit wm2 = WakuMessage(payload: "message 2".toBytes(), contentTopic: contentTopic) proofAdded2 = node3.wakuRlnRelay.appendRLNProof(wm2, time) - # wm3 points to the next epoch + # wm3 points to the next epoch wm3 = WakuMessage(payload: "message 3".toBytes(), contentTopic: contentTopic) proofAdded3 = node3.wakuRlnRelay.appendRLNProof(wm3, time+EPOCH_UNIT_SECONDS) wm4 = WakuMessage(payload: "message 4".toBytes(), contentTopic: contentTopic) @@ -944,7 +1003,7 @@ procSuite "WakuNode": completionFut3.complete(true) if wm == wm4: completionFut4.complete(true) - + # mount the relay handler for node3 node3.subscribe(rlnRelayPubSubTopic, relayHandler) @@ -962,7 +1021,7 @@ procSuite "WakuNode": await node1.publish(rlnRelayPubSubTopic, wm4) await sleepAsync(2000.millis) - let + let res1 = await completionFut1.withTimeout(10.seconds) res2 = await completionFut2.withTimeout(10.seconds) @@ -970,11 +1029,11 @@ procSuite "WakuNode": (res1 and res2) == false # either of the wm1 and wm2 is found as spam hence not relayed (await completionFut3.withTimeout(10.seconds)) == true (await completionFut4.withTimeout(10.seconds)) == false - + await node1.stop() await node2.stop() await node3.stop() - + asyncTest "Relay protocol is started correctly": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -1002,13 +1061,13 @@ procSuite "WakuNode": check: # Relay has not yet started as node has not yet started GossipSub(node2.wakuRelay).heartbeatFut.isNil - + await node2.start() check: # Relay started on node start GossipSub(node2.wakuRelay).heartbeatFut.isNil == false - + await allFutures([node1.stop(), node2.stop()]) asyncTest "Lightpush message return success": @@ -1126,10 +1185,10 @@ procSuite "WakuNode": msg2 = WakuMessage(payload: "hello world2".toBytes(), contentTopic: contentTopic, timestamp: 2) # setup sqlite database for node1 - let + let database = SqliteDatabase.init("", inMemory = true)[] store = WakuMessageStore.init(database)[] - + var completionFut = newFuture[bool]() @@ -1144,14 +1203,14 @@ procSuite "WakuNode": await sleepAsync(2000.millis) node1.wakuStore.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - + # populate db with msg1 to be a duplicate let index1 = computeIndex(msg1) let output1 = store.put(index1, msg1, DefaultTopic) check output1.isOk discard node1.wakuStore.messages.add(IndexedWakuMessage(msg: msg1, index: index1, pubsubTopic: DefaultTopic)) - + # now run the resume proc await node1.resume() @@ -1167,7 +1226,7 @@ procSuite "WakuNode": check: # if the duplicates are discarded properly, then the total number of messages after resume should be 2 # check no duplicates is in the messages field - node1.wakuStore.messages.len == 2 + node1.wakuStore.messages.len == 2 # check no duplicates is in the db responseCount == 2 @@ -1186,14 +1245,14 @@ procSuite "WakuNode": nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60013)) - + check: # Sanity check, to verify config was applied node1.switch.connManager.inSema.size == maxConnections # Node with connection limit set to 1 await node1.start() - node1.mountRelay() + node1.mountRelay() # Remote node 1 await node2.start() @@ -1214,7 +1273,7 @@ procSuite "WakuNode": await allFutures([node1.stop(), node2.stop(), node3.stop()]) - + asyncTest "Messages are relayed between two websocket nodes": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -1249,7 +1308,7 @@ procSuite "WakuNode": node1.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) - + await node2.publish(pubSubTopic, message) await sleepAsync(2000.millis) @@ -1294,7 +1353,7 @@ procSuite "WakuNode": node1.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) - + await node2.publish(pubSubTopic, message) await sleepAsync(2000.millis) @@ -1342,7 +1401,7 @@ procSuite "WakuNode": node1.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) - + await node2.publish(pubSubTopic, message) await sleepAsync(2000.millis) @@ -1386,7 +1445,7 @@ procSuite "WakuNode": node1.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) - + await node2.publish(pubSubTopic, message) await sleepAsync(2000.millis) @@ -1399,7 +1458,7 @@ procSuite "WakuNode": asyncTest "Messages fails with wrong key path": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - + expect IOError: # gibberish discard WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), @@ -1439,7 +1498,7 @@ procSuite "WakuNode": node1.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) - + await node2.publish(pubSubTopic, message) await sleepAsync(2000.millis) @@ -1448,7 +1507,7 @@ procSuite "WakuNode": (await completionFut.withTimeout(5.seconds)) == true await node1.stop() await node2.stop() - + asyncTest "Peer info updates with correct announced addresses": let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] @@ -1460,7 +1519,7 @@ procSuite "WakuNode": nodeKey, bindIp, bindPort, extIp, extPort) - + let bindEndpoint = MultiAddress.init(bindIp, tcpProtocol, bindPort) announcedEndpoint = MultiAddress.init(extIp.get(), tcpProtocol, extPort.get()) @@ -1469,10 +1528,10 @@ procSuite "WakuNode": # Check that underlying peer info contains only bindIp before starting node.switch.peerInfo.addrs.len == 1 node.switch.peerInfo.addrs.contains(bindEndpoint) - + node.announcedAddresses.len == 1 node.announcedAddresses.contains(announcedEndpoint) - + await node.start() check: @@ -1497,7 +1556,7 @@ procSuite "WakuNode": bindIp, bindPort, extIp, extPort, dns4DomainName = some(domainName)) - + check: node.announcedAddresses.len == 1 node.announcedAddresses.contains(expectedDns4Addr) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index e61591d6e..4508e3cfb 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -324,7 +324,7 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) # Register handler for filter, whether remote subscription succeeded or not - node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler) + node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler, pubSubTopic: request.pubSubTopic) waku_node_filters.set(node.filters.len.int64) proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = @@ -457,12 +457,16 @@ proc info*(node: WakuNode): WakuInfo = proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.raises: [Defect, KeyError, LPError]} = info "mounting filter" proc filterHandler(requestId: string, msg: MessagePush) - {.gcsafe, raises: [Defect, KeyError].} = + {.async, gcsafe, raises: [Defect, KeyError].} = info "push received" for message in msg.messages: node.filters.notify(message, requestId) # Trigger filter handlers on a light node - + + if not node.wakuStore.isNil and (requestId in node.filters): + let pubSubTopic = node.filters[requestId].pubSubTopic + await node.wakuStore.handleMessage(pubSubTopic, message) + waku_node_messages.inc(labelValues = ["filter"]) node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index c266cc39b..1458715ed 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -179,7 +179,7 @@ method init*(wf: WakuFilter) = let value = res.value if value.push != MessagePush(): waku_filter_messages.inc(labelValues = ["MessagePush"]) - wf.pushHandler(value.requestId, value.push) + await wf.pushHandler(value.requestId, value.push) if value.request != FilterRequest(): waku_filter_messages.inc(labelValues = ["FilterRequest"]) if value.request.subscribe: diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index e63c165dd..f4d45b197 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -15,6 +15,8 @@ const MaxRpcSize* = 10 * MaxWakuMessageSize + 64*1024 type + PubSubTopic* = string + ContentFilter* = object contentTopic*: ContentTopic @@ -22,6 +24,7 @@ type Filter* = object contentFilters*: seq[ContentFilter] + pubSubTopic*: PubSubTopic handler*: ContentFilterHandler # @TODO MAYBE MORE INFO? @@ -29,7 +32,7 @@ type FilterRequest* = object contentFilters*: seq[ContentFilter] - pubSubTopic*: string + pubSubTopic*: PubSubTopic subscribe*: bool MessagePush* = object @@ -45,7 +48,7 @@ type requestId*: string filter*: FilterRequest # @TODO MAKE THIS A SEQUENCE AGAIN? - MessagePushHandler* = proc(requestId: string, msg: MessagePush) {.gcsafe, closure.} + MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} WakuFilter* = ref object of LPProtocol rng*: ref BrHmacDrbgContext