diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 34faecb63..4b0d8ff1a 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -326,9 +326,7 @@ proc writeAndPrint(c: Chat) {.async.} = if not c.node.wakuFilter.isNil(): echo "unsubscribing from content filters..." - await c.node.unsubscribe( - FilterRequest(contentFilters: @[ContentFilter(contentTopic: c.contentTopic)], pubSubTopic: DefaultTopic, subscribe: false) - ) + await c.node.unsubscribe(pubsubTopic=DefaultTopic, contentTopics=c.contentTopic) echo "quitting..." @@ -499,18 +497,16 @@ proc processInput(rfd: AsyncFD) {.async.} = if conf.filternode != "": await node.mountFilter() + await node.mountFilterClient() - node.wakuFilter.setPeer(parseRemotePeerInfo(conf.filternode)) + node.setFilterPeer(parseRemotePeerInfo(conf.filternode)) - proc filterHandler(msg: WakuMessage) {.gcsafe.} = + proc filterHandler(pubsubTopic: string, msg: WakuMessage) {.gcsafe.} = trace "Hit filter handler", contentTopic=msg.contentTopic chat.printReceivedMessage(msg) - await node.subscribe( - FilterRequest(contentFilters: @[ContentFilter(contentTopic: chat.contentTopic)], pubSubTopic: DefaultTopic, subscribe: true), - filterHandler - ) + await node.subscribe(pubsubTopic=DefaultTopic, contentTopics=chat.contentTopic, filterHandler) # Subscribe to a topic, if relay is mounted if conf.relay: diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index 66064448d..5eda893fa 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -418,17 +418,18 @@ proc setupProtocols(node: WakuNode, conf: WakuNodeConf, return err("failed to set node waku lightpush peer: " & getCurrentExceptionMsg()) # Filter setup. NOTE Must be mounted after relay - if (conf.filternode != "") or (conf.filter): + if conf.filter: try: await mountFilter(node, filterTimeout = chronos.seconds(conf.filterTimeout)) except: return err("failed to mount waku filter protocol: " & getCurrentExceptionMsg()) - if conf.filternode != "": - try: - setFilterPeer(node, conf.filternode) - except: - return err("failed to set node waku filter peer: " & getCurrentExceptionMsg()) + if conf.filternode != "": + try: + await mountFilterClient(node) + setFilterPeer(node, conf.filternode) + except: + return err("failed to set node waku filter peer: " & getCurrentExceptionMsg()) # waku peer exchange setup if (conf.peerExchangeNode != "") or (conf.peerExchange): diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 1c7691cf9..e69f459da 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -5,7 +5,7 @@ import chronicles, testutils/unittests, stew/shims/net as stewNet, json_rpc/[rpcserver, rpcclient], - eth/[keys, rlp], eth/common/eth_types, + eth/keys, eth/common/eth_types, libp2p/[builders, switch, multiaddress], libp2p/protobuf/minprotobuf, libp2p/stream/[bufferstream, connection], @@ -27,6 +27,7 @@ import ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_swap/waku_swap, ../../waku/v2/protocol/waku_filter, + ../../waku/v2/protocol/waku_filter/client, ../../waku/v2/utils/peers, ../../waku/v2/utils/time, ./testlib/common @@ -51,7 +52,7 @@ procSuite "Waku v2 JSON-RPC API": # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8546) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) @@ -78,7 +79,7 @@ procSuite "Waku v2 JSON-RPC API": # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8547) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) @@ -150,7 +151,7 @@ procSuite "Waku v2 JSON-RPC API": # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8548) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) @@ -164,7 +165,7 @@ procSuite "Waku v2 JSON-RPC API": # First see if we can retrieve messages published on the default topic (node is already subscribed) await node2.publish(DefaultPubsubTopic, message1) - await sleepAsync(2000.millis) + await sleepAsync(100.millis) var messages = await client.get_waku_v2_relay_v1_messages(DefaultPubsubTopic) @@ -182,7 +183,7 @@ procSuite "Waku v2 JSON-RPC API": var response = await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) - await sleepAsync(2000.millis) + await sleepAsync(100.millis) check: # Node is now subscribed to pubSubTopic @@ -191,7 +192,7 @@ procSuite "Waku v2 JSON-RPC API": # Now publish a message on node1 and see if we receive it on node3 await node1.publish(pubSubTopic, message2) - await sleepAsync(2000.millis) + await sleepAsync(100.millis) messages = await client.get_waku_v2_relay_v1_messages(pubSubTopic) @@ -219,7 +220,7 @@ procSuite "Waku v2 JSON-RPC API": # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8549) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) @@ -273,19 +274,26 @@ procSuite "Waku v2 JSON-RPC API": await node.stop() asyncTest "Filter API: subscribe/unsubscribe": - await node.start() + let + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.new(nodeKey1, bindIp, Port(60390)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.new(nodeKey2, bindIp, Port(60392)) - await node.mountRelay() + await allFutures(node1.start(), node2.start()) - await node.mountFilter() + await node1.mountFilter() + await node2.mountFilterClient() + + node2.setFilterPeer(node1.peerInfo.toRemotePeerInfo()) # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8550) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) - installFilterApiHandlers(node, server, newTable[ContentTopic, seq[WakuMessage]]()) + installFilterApiHandlers(node2, server, newTable[ContentTopic, seq[WakuMessage]]()) server.start() let client = newRpcHttpClient() @@ -293,109 +301,31 @@ procSuite "Waku v2 JSON-RPC API": check: # Light node has not yet subscribed to any filters - node.filters.len() == 0 + node2.wakuFilterClient.getSubscriptionsCount() == 0 - let contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic), - ContentFilter(contentTopic: ContentTopic("2")), - ContentFilter(contentTopic: ContentTopic("3")), - ContentFilter(contentTopic: ContentTopic("4")), - ] - var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic)) - + let contentFilters = @[ + ContentFilter(contentTopic: DefaultContentTopic), + ContentFilter(contentTopic: ContentTopic("2")), + ContentFilter(contentTopic: ContentTopic("3")), + ContentFilter(contentTopic: ContentTopic("4")), + ] + var response = await client.post_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) check: - # Light node has successfully subscribed to a single filter - node.filters.len() == 1 response == true + # Light node has successfully subscribed to 4 content topics + node2.wakuFilterClient.getSubscriptionsCount() == 4 - response = await client.delete_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(DefaultPubsubTopic)) - + response = await client.delete_waku_v2_filter_v1_subscription(contentFilters=contentFilters, topic=some(DefaultPubsubTopic)) check: + response == true # Light node has successfully unsubscribed from all filters - node.filters.len() == 0 - response == true + node2.wakuFilterClient.getSubscriptionsCount() == 0 + ## Cleanup await server.stop() await server.closeWait() - await node.stop() - - asyncTest "Filter API: get latest messages": - await node.start() - - # RPC server setup - let - rpcPort = Port(8545) - ta = initTAddress(bindIp, rpcPort) - server = newRpcHttpServer([ta]) - - installFilterApiHandlers(node, server, newTable[ContentTopic, seq[WakuMessage]]()) - server.start() - - await node.mountFilter() - - let client = newRpcHttpClient() - await client.connect("127.0.0.1", rpcPort, false) - - # First ensure subscription exists - - let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopic: DefaultContentTopic)], topic = some(DefaultPubsubTopic)) - check: - sub - - # Now prime the node with some messages before tests - var - msgList = @[WakuMessage(payload: @[byte 0], contentTopic: ContentTopic("2")), - WakuMessage(payload: @[byte 1], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 2], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 3], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 4], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 5], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 6], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 7], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 8], contentTopic: DefaultContentTopic), - WakuMessage(payload: @[byte 9], contentTopic: ContentTopic("2"))] - - let - filters = node.filters - requestId = toSeq(Table(filters).keys)[0] - - for wakuMsg in msgList: - filters.notify(wakuMsg, requestId) - - var response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic) - check: - response.len() == 8 - response.allIt(it.contentTopic == DefaultContentTopic) - - # No new messages - response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic) - - check: - response.len() == 0 - - # Now ensure that no more than the preset max messages can be cached - - let maxSize = filter_api.maxCache - - for x in 1..(maxSize + 1): - # Try to cache 1 more than maximum allowed - filters.notify(WakuMessage(payload: @[byte x], contentTopic: DefaultContentTopic), requestId) - - await sleepAsync(2000.millis) - - response = await client.get_waku_v2_filter_v1_messages(DefaultContentTopic) - check: - # Max messages has not been exceeded - response.len == maxSize - response.allIt(it.contentTopic == DefaultContentTopic) - # Check that oldest item has been removed - response[0].payload == @[byte 2] - response[maxSize - 1].payload == @[byte (maxSize + 1)] - - await server.stop() - await server.closeWait() - - await node.stop() + await allFutures(node1.stop(), node2.stop()) asyncTest "Admin API: connect to ad-hoc peers": # Create a couple of nodes @@ -417,7 +347,7 @@ procSuite "Waku v2 JSON-RPC API": # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8551) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) @@ -458,10 +388,10 @@ procSuite "Waku v2 JSON-RPC API": node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60220)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60222)) - peerInfo2 = node2.switch.peerInfo + peerInfo2 = node2.peerInfo nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60224)) - peerInfo3 = node3.switch.peerInfo + peerInfo3 = node3.peerInfo await allFutures([node1.start(), node2.start(), node3.start()]) @@ -475,7 +405,7 @@ procSuite "Waku v2 JSON-RPC API": # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8552) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) @@ -510,7 +440,7 @@ procSuite "Waku v2 JSON-RPC API": # RPC server setup let - rpcPort = Port(8545) + rpcPort = Port(8553) ta = initTAddress(bindIp, rpcPort) server = newRpcHttpServer([ta]) @@ -521,6 +451,7 @@ procSuite "Waku v2 JSON-RPC API": await client.connect("127.0.0.1", rpcPort, false) await node.mountFilter() + await node.mountFilterClient() await node.mountSwap() let store = StoreQueueRef.new() await node.mountStore(store=store) @@ -539,13 +470,13 @@ procSuite "Waku v2 JSON-RPC API": storeKey = crypto.PrivateKey.random(ECDSA, rng[]).get() storePeer = PeerInfo.new(storeKey, @[locationAddr]) - node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) - node.setStorePeer(storePeer.toRemotePeerInfo()) + node.setFilterPeer(filterPeer.toRemotePeerInfo()) let response = await client.get_waku_v2_admin_v1_peers() + ## Then check: response.len == 3 # Check filter peer @@ -555,6 +486,7 @@ procSuite "Waku v2 JSON-RPC API": # Check store peer (response.filterIt(it.protocol == WakuStoreCodec)[0]).multiaddr == constructMultiaddrStr(storePeer) + ## Cleanup await server.stop() await server.closeWait() @@ -588,10 +520,10 @@ procSuite "Waku v2 JSON-RPC API": # Setup two servers so we can see both sides of encrypted communication let - rpcPort1 = Port(8545) + rpcPort1 = Port(8554) ta1 = initTAddress(bindIp, rpcPort1) server1 = newRpcHttpServer([ta1]) - rpcPort3 = Port(8546) + rpcPort3 = Port(8555) ta3 = initTAddress(bindIp, rpcPort3) server3 = newRpcHttpServer([ta3]) @@ -616,7 +548,7 @@ procSuite "Waku v2 JSON-RPC API": let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) - await sleepAsync(2000.millis) + await sleepAsync(100.millis) check: # node3 is now subscribed to pubSubTopic @@ -627,7 +559,7 @@ procSuite "Waku v2 JSON-RPC API": check: posted - await sleepAsync(2000.millis) + await sleepAsync(100.millis) # Let's see if we can receive, and decrypt, this message on node3 var messages = await client3.get_waku_v2_private_v1_asymmetric_messages(pubSubTopic, privateKey = (%keypair.seckey).getStr()) @@ -679,10 +611,10 @@ procSuite "Waku v2 JSON-RPC API": # Setup two servers so we can see both sides of encrypted communication let - rpcPort1 = Port(8545) + rpcPort1 = Port(8556) ta1 = initTAddress(bindIp, rpcPort1) server1 = newRpcHttpServer([ta1]) - rpcPort3 = Port(8546) + rpcPort3 = Port(8557) ta3 = initTAddress(bindIp, rpcPort3) server3 = newRpcHttpServer([ta3]) @@ -707,7 +639,7 @@ procSuite "Waku v2 JSON-RPC API": let sub = await client3.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) - await sleepAsync(2000.millis) + await sleepAsync(100.millis) check: # node3 is now subscribed to pubSubTopic @@ -718,7 +650,7 @@ procSuite "Waku v2 JSON-RPC API": check: posted - await sleepAsync(2000.millis) + await sleepAsync(100.millis) # Let's see if we can receive, and decrypt, this message on node3 var messages = await client3.get_waku_v2_private_v1_symmetric_messages(pubSubTopic, symkey = (%symkey).getStr()) diff --git a/tests/v2/test_peer_manager.nim b/tests/v2/test_peer_manager.nim index 97abaa399..ccb4e32f1 100644 --- a/tests/v2/test_peer_manager.nim +++ b/tests/v2/test_peer_manager.nim @@ -19,7 +19,6 @@ import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/node/storage/peer/waku_peer_storage, ../../waku/v2/node/waku_node, - ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_relay, ../../waku/v2/protocol/waku_store, ../../waku/v2/protocol/waku_filter, @@ -30,11 +29,9 @@ procSuite "Peer Manager": asyncTest "Peer dialing works": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60800)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60802)) peerInfo2 = node2.switch.peerInfo await allFutures([node1.start(), node2.start()]) @@ -63,11 +60,9 @@ procSuite "Peer Manager": asyncTest "Dialing fails gracefully": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60810)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60812)) peerInfo2 = node2.switch.peerInfo await node1.start() @@ -88,8 +83,7 @@ procSuite "Peer Manager": asyncTest "Adding, selecting and filtering peers work": let nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60820)) # Create filter peer filterLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet() filterKey = crypto.PrivateKey.random(ECDSA, rng[]).get() @@ -105,14 +99,14 @@ procSuite "Peer Manager": await node.start() - await node.mountFilter() + await node.mountFilterClient() await node.mountSwap() node.mountStoreClient() - node.wakuFilter.setPeer(filterPeer.toRemotePeerInfo()) node.wakuSwap.setPeer(swapPeer.toRemotePeerInfo()) node.setStorePeer(storePeer.toRemotePeerInfo()) + node.setFilterPeer(filterPeer.toRemotePeerInfo()) # Check peers were successfully added to peer manager check: @@ -133,11 +127,9 @@ procSuite "Peer Manager": asyncTest "Peer manager keeps track of connections": let nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60830)) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60832)) peerInfo2 = node2.switch.peerInfo await node1.start() @@ -178,11 +170,9 @@ procSuite "Peer Manager": database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000), peerStorage = storage) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60840), peerStorage = storage) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60842)) peerInfo2 = node2.switch.peerInfo await node1.start() @@ -201,8 +191,7 @@ procSuite "Peer Manager": # Simulate restart by initialising a new node using the same storage let nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), - Port(60004), peerStorage = storage) + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60844), peerStorage = storage) await node3.start() check: @@ -226,11 +215,9 @@ procSuite "Peer Manager": database = SqliteDatabase.new(":memory:")[] storage = WakuPeerStorage.new(database)[] nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000), peerStorage = storage) + node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60850), peerStorage = storage) nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) + node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60852)) peerInfo2 = node2.switch.peerInfo betaCodec = "/vac/waku/relay/2.0.0-beta2" stableCodec = "/vac/waku/relay/2.0.0" @@ -254,8 +241,7 @@ procSuite "Peer Manager": # Simulate restart by initialising a new node using the same storage let nodeKey3 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), - Port(60004), peerStorage = storage) + node3 = WakuNode.new(nodeKey3, ValidIpAddress.init("0.0.0.0"), Port(60854), peerStorage = storage) await node3.mountRelay() node3.wakuRelay.codec = stableCodec diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 935071879..f1d8f1705 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -5,193 +5,171 @@ import testutils/unittests, chronos, chronicles, - libp2p/crypto/crypto, - libp2p/multistream + libp2p/crypto/crypto import ../../waku/v2/node/peer_manager/peer_manager, ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_filter, - ../test_helpers, + ../../waku/v2/protocol/waku_filter/client, ./utils, ./testlib/common, ./testlib/switch -const dummyHandler = proc(requestId: string, msg: MessagePush) {.async, gcsafe, closure.} = discard +proc newTestWakuFilterNode(switch: Switch, timeout: Duration = 2.hours): Future[WakuFilter] {.async.} = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuFilter.new(peerManager, rng, timeout) + + await proto.start() + switch.mount(proto) + + return proto + +proc newTestWakuFilterClient(switch: Switch): Future[WakuFilterClient] {.async.} = + let + peerManager = PeerManager.new(switch) + rng = crypto.newRng() + proto = WakuFilterClient.new(peerManager, rng) + + await proto.start() + switch.mount(proto) + + return proto # TODO: Extend test coverage -procSuite "Waku Filter": - +suite "Waku Filter": asyncTest "should forward messages to client after subscribed": ## Setup - let rng = crypto.newRng() let - clientSwitch = newTestSwitch() serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() await allFutures(serverSwitch.start(), clientSwitch.start()) + let + server = await newTestWakuFilterNode(serverSwitch) + client = await newTestWakuFilterClient(clientSwitch) + ## Given - # Server - let - serverPeerManager = PeerManager.new(serverSwitch) - serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler) - await serverProto.start() - serverSwitch.mount(serverProto) + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() + + let pushHandlerFuture = newFuture[(string, WakuMessage)]() + proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete((pubsubTopic, message)) - # Client - let handlerFuture = newFuture[(string, MessagePush)]() - proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = - handlerFuture.complete((requestId, push)) - - let - clientPeerManager = PeerManager.new(clientSwitch) - clientProto = WakuFilter.init(clientPeerManager, rng, handler) - await clientProto.start() - clientSwitch.mount(clientProto) - - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) ## When - let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) - require resSubscription.isOk() + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc await sleepAsync(5.milliseconds) - let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) - await serverProto.handleMessage(DefaultPubsubTopic, message) + await server.handleMessage(pubsubTopic, msg) + + require await pushHandlerFuture.withTimeout(5.seconds) ## Then - let subscriptionRequestId = resSubscription.get() - let (requestId, push) = await handlerFuture - + let (pushedMsgPubsubTopic, pushedMsg) = pushHandlerFuture.read() check: - requestId == subscriptionRequestId - push.messages == @[message] + pushedMsgPubsubTopic == pubsubTopic + pushedMsg == msg ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) asyncTest "should not forward messages to client after unsuscribed": ## Setup - let rng = crypto.newRng() let - clientSwitch = newTestSwitch() serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() await allFutures(serverSwitch.start(), clientSwitch.start()) + + let + server = await newTestWakuFilterNode(serverSwitch) + client = await newTestWakuFilterClient(clientSwitch) ## Given - # Server - let - serverPeerManager = PeerManager.new(serverSwitch) - serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler) - await serverProto.start() - serverSwitch.mount(serverProto) + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - # Client - var handlerFuture = newFuture[void]() - proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = - handlerFuture.complete() + var pushHandlerFuture = newFuture[void]() + proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete() - let - clientPeerManager = PeerManager.new(clientSwitch) - clientProto = WakuFilter.init(clientPeerManager, rng, handler) - await clientProto.start() - clientSwitch.mount(clientProto) + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) - - ## Given - let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) - - let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) - require resSubscription.isOk() + ## When + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + # WARN: Sleep necessary to avoid a race condition between the subscription and the handle message proc await sleepAsync(5.milliseconds) - await serverProto.handleMessage(DefaultPubsubTopic, message) - let handlerWasCalledAfterSubscription = await handlerFuture.withTimeout(1.seconds) - require handlerWasCalledAfterSubscription + await server.handleMessage(pubsubTopic, msg) + + require await pushHandlerFuture.withTimeout(1.seconds) # Reset to test unsubscribe - handlerFuture = newFuture[void]() + pushHandlerFuture = newFuture[void]() - let resUnsubscription = await clientProto.unsubscribe(DefaultPubsubTopic, @[DefaultContentTopic]) - require resUnsubscription.isOk() + require (await client.unsubscribe(pubsubTopic, contentTopic, peer=serverAddr)).isOk() + # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc await sleepAsync(5.milliseconds) - await serverProto.handleMessage(DefaultPubsubTopic, message) + await server.handleMessage(pubsubTopic, msg) ## Then - let handlerWasCalledAfterUnsubscription = await handlerFuture.withTimeout(1.seconds) + let handlerWasCalledAfterUnsubscription = await pushHandlerFuture.withTimeout(1.seconds) check: not handlerWasCalledAfterUnsubscription ## Cleanup await allFutures(clientSwitch.stop(), serverSwitch.stop()) - - asyncTest "subscription should fail if no filter peer is provided": - ## Setup - let clientSwitch = newTestSwitch() - await clientSwitch.start() - - ## Given - let clientProto = WakuFilter.init(PeerManager.new(clientSwitch), crypto.newRng(), dummyHandler) - await clientProto.start() - clientSwitch.mount(clientProto) - - ## When - let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) - - ## Then - check: - resSubscription.isErr() - resSubscription.error() == "peer_not_found_failure" asyncTest "peer subscription should be dropped if connection fails for second time after the timeout has elapsed": ## Setup - let rng = crypto.newRng() let - clientSwitch = newTestSwitch() serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() await allFutures(serverSwitch.start(), clientSwitch.start()) + let + server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds) + client = await newTestWakuFilterClient(clientSwitch) + ## Given - # Server - let - serverPeerManager = PeerManager.new(serverSwitch) - serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=1.seconds) - await serverProto.start() - serverSwitch.mount(serverProto) + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - # Client - var handlerFuture = newFuture[void]() - proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = - handlerFuture.complete() + var pushHandlerFuture = newFuture[void]() + proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete() - let - clientPeerManager = PeerManager.new(clientSwitch) - clientProto = WakuFilter.init(clientPeerManager, rng, handler) - await clientProto.start() - clientSwitch.mount(clientProto) - - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) ## When - let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) - check resSubscription.isOk() + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc await sleepAsync(5.milliseconds) - let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) - - await serverProto.handleMessage(DefaultPubsubTopic, message) - let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds) - require handlerShouldHaveBeenCalled + await server.handleMessage(DefaultPubsubTopic, msg) + + # Push handler should be called + require await pushHandlerFuture.withTimeout(1.seconds) # Stop client node to test timeout unsubscription await clientSwitch.stop() @@ -199,19 +177,19 @@ procSuite "Waku Filter": await sleepAsync(5.milliseconds) # First failure should not remove the subscription - await serverProto.handleMessage(DefaultPubsubTopic, message) + await server.handleMessage(DefaultPubsubTopic, msg) let - subscriptionsBeforeTimeout = serverProto.subscriptions.len() - failedPeersBeforeTimeout = serverProto.failedPeers.len() + subscriptionsBeforeTimeout = server.subscriptions.len() + failedPeersBeforeTimeout = server.failedPeers.len() - # Wait for peer connection failure timeout to elapse - await sleepAsync(1.seconds) + # Wait for the configured peer connection timeout to elapse (200ms) + await sleepAsync(200.milliseconds) - #Second failure should remove the subscription - await serverProto.handleMessage(DefaultPubsubTopic, message) + # Second failure should remove the subscription + await server.handleMessage(DefaultPubsubTopic, msg) let - subscriptionsAfterTimeout = serverProto.subscriptions.len() - failedPeersAfterTimeout = serverProto.failedPeers.len() + subscriptionsAfterTimeout = server.subscriptions.len() + failedPeersAfterTimeout = server.failedPeers.len() ## Then check: @@ -222,55 +200,45 @@ procSuite "Waku Filter": ## Cleanup await serverSwitch.stop() - + asyncTest "peer subscription should not be dropped if connection recovers before timeout elapses": ## Setup let - clientKey = PrivateKey.random(ECDSA, rng[]).get() - clientAddress = MultiAddress.init("/ip4/127.0.0.1/tcp/65000").get() - - let rng = crypto.newRng() - let - clientSwitch = newTestSwitch(some(clientKey), some(clientAddress)) serverSwitch = newTestSwitch() + clientSwitch = newTestSwitch() await allFutures(serverSwitch.start(), clientSwitch.start()) + let + server = await newTestWakuFilterNode(serverSwitch, timeout=200.milliseconds) + client = await newTestWakuFilterClient(clientSwitch) + ## Given - # Server - let - serverPeerManager = PeerManager.new(serverSwitch) - serverProto = WakuFilter.init(serverPeerManager, rng, dummyHandler, timeout=2.seconds) - await serverProto.start() - serverSwitch.mount(serverProto) + let serverAddr = serverSwitch.peerInfo.toRemotePeerInfo() - # Client - var handlerFuture = newFuture[void]() - proc handler(requestId: string, push: MessagePush) {.async, gcsafe, closure.} = - handlerFuture.complete() + var pushHandlerFuture = newFuture[void]() + proc pushHandler(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} = + pushHandlerFuture.complete() - let - clientPeerManager = PeerManager.new(clientSwitch) - clientProto = WakuFilter.init(clientPeerManager, rng, handler) - await clientProto.start() - clientSwitch.mount(clientProto) - - clientProto.setPeer(serverSwitch.peerInfo.toRemotePeerInfo()) + let + pubsubTopic = DefaultPubsubTopic + contentTopic = "test-content-topic" + msg = fakeWakuMessage(contentTopic=contentTopic) ## When - let message = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: DefaultContentTopic) - - let resSubscription = await clientProto.subscribe(DefaultPubsubTopic, @[DefaultContentTopic]) - check resSubscription.isOk() + require (await client.subscribe(pubsubTopic, contentTopic, pushHandler, peer=serverAddr)).isOk() + # WARN: Sleep necessary to avoid a race condition between the unsubscription and the handle message proc await sleepAsync(5.milliseconds) - await serverProto.handleMessage(DefaultPubsubTopic, message) - handlerFuture = newFuture[void]() - + await server.handleMessage(DefaultPubsubTopic, msg) + + # Push handler should be called + require await pushHandlerFuture.withTimeout(1.seconds) + let - subscriptionsBeforeFailure = serverProto.subscriptions.len() - failedPeersBeforeFailure = serverProto.failedPeers.len() + subscriptionsBeforeFailure = server.subscriptions.len() + failedPeersBeforeFailure = server.failedPeers.len() # Stop switch to test unsubscribe await clientSwitch.stop() @@ -278,29 +246,33 @@ procSuite "Waku Filter": await sleepAsync(5.milliseconds) # First failure should add to failure list - await serverProto.handleMessage(DefaultPubsubTopic, message) - handlerFuture = newFuture[void]() + await server.handleMessage(DefaultPubsubTopic, msg) + + pushHandlerFuture = newFuture[void]() let - subscriptionsAfterFailure = serverProto.subscriptions.len() - failedPeersAfterFailure = serverProto.failedPeers.len() + subscriptionsAfterFailure = server.subscriptions.len() + failedPeersAfterFailure = server.failedPeers.len() + + await sleepAsync(100.milliseconds) - await sleepAsync(250.milliseconds) - # Start switch with same key as before - var clientSwitch2 = newTestSwitch(some(clientKey), some(clientAddress)) + let clientSwitch2 = newTestSwitch( + some(clientSwitch.peerInfo.privateKey), + some(clientSwitch.peerInfo.addrs[0]) + ) await clientSwitch2.start() - await clientProto.start() - clientSwitch2.mount(clientProto) + await client.start() + clientSwitch2.mount(client) # If push succeeds after failure, the peer should removed from failed peers list - await serverProto.handleMessage(DefaultPubsubTopic, message) - let handlerShouldHaveBeenCalled = await handlerFuture.withTimeout(1.seconds) - - let - subscriptionsAfterSuccessfulConnection = serverProto.subscriptions.len() - failedPeersAfterSuccessfulConnection = serverProto.failedPeers.len() + await server.handleMessage(DefaultPubsubTopic, msg) + let handlerShouldHaveBeenCalled = await pushHandlerFuture.withTimeout(1.seconds) + let + subscriptionsAfterSuccessfulConnection = server.subscriptions.len() + failedPeersAfterSuccessfulConnection = server.failedPeers.len() + ## Then check: handlerShouldHaveBeenCalled @@ -314,6 +286,6 @@ procSuite "Waku Filter": failedPeersBeforeFailure == 0 failedPeersAfterFailure == 1 failedPeersAfterSuccessfulConnection == 0 - + ## Cleanup await allFutures(clientSwitch2.stop(), serverSwitch.stop()) diff --git a/tests/v2/test_wakunode_filter.nim b/tests/v2/test_wakunode_filter.nim index cfee2ba46..08702e5f0 100644 --- a/tests/v2/test_wakunode_filter.nim +++ b/tests/v2/test_wakunode_filter.nim @@ -6,292 +6,60 @@ import testutils/unittests, chronicles, chronos, - libp2p/crypto/crypto, - libp2p/peerid, - libp2p/multiaddress, - libp2p/switch, - libp2p/protocols/pubsub/rpc/messages, - libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/gossipsub, - eth/keys + libp2p/crypto/crypto import - ../../waku/v2/protocol/[waku_relay, waku_message], - ../../waku/v2/protocol/waku_filter, ../../waku/v2/node/peer_manager/peer_manager, + ../../waku/v2/node/waku_node, + ../../waku/v2/protocol/waku_message, + ../../waku/v2/protocol/waku_filter, ../../waku/v2/utils/peers, - ../../waku/v2/node/waku_node + ./testlib/common -procSuite "WakuNode - Filter": - let rng = keys.newRng() +suite "WakuNode - Filter": - asyncTest "Message published with content filter is retrievable": + asyncTest "subscriber should receive the message handled by the publisher": + ## Setup + let rng = crypto.newRng() let - nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - node = WakuNode.new(nodeKey, ValidIpAddress.init("0.0.0.0"), - Port(60000)) - pubSubTopic = "chat" - contentTopic = ContentTopic("/waku/2/default-content/proto") - filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true) - message = WakuMessage(payload: "hello world".toBytes(), - contentTopic: contentTopic) + serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60110)) + clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60111)) - # This could/should become a more fixed handler (at least default) that - # would be enforced on WakuNode level. - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - check: - topic == "chat" - node.filters.notify(msg.value(), topic) + await allFutures(server.start(), client.start()) - var completionFut = newFuture[bool]() + await server.mountFilter() + await client.mountFilterClient() - # This would be the actual application handler - proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} = - let message = string.fromBytes(msg.payload) - check: - message == "hello world" - completionFut.complete(true) + ## Given + let serverPeerInfo = server.peerInfo.toRemotePeerInfo() - await node.start() - await node.mountRelay() - - # Subscribe our node to the pubSubTopic where all chat data go onto. - node.subscribe(pubSubTopic, relayHandler) - - # Subscribe a contentFilter to trigger a specific application handler when - # WakuMessages with that content are received - await node.subscribe(filterRequest, contentHandler) - - await sleepAsync(2000.millis) - - await node.publish(pubSubTopic, message) - - check: - (await completionFut.withTimeout(5.seconds)) == true - - await node.stop() - - asyncTest "Content filtered publishing over network": let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - let - pubSubTopic = "chat" - contentTopic = ContentTopic("/waku/2/default-content/proto") - filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true) - message = WakuMessage(payload: "hello world".toBytes(), - contentTopic: contentTopic) + pubSubTopic = DefaultPubsubTopic + contentTopic = DefaultContentTopic + message = fakeWakuMessage(contentTopic=contentTopic) - var completionFut = newFuture[bool]() + var filterPushHandlerFut = newFuture[(PubsubTopic, WakuMessage)]() + proc filterPushHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} = + filterPushHandlerFut.complete((pubsubTopic, msg)) - # This could/should become a more fixed handler (at least default) that - # would be enforced on WakuNode level. - proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - let msg = WakuMessage.init(data) - if msg.isOk(): - check: - topic == "chat" - node1.filters.notify(msg.value(), topic) + ## When + await client.filterSubscribe(pubsubTopic, contentTopic, filterPushHandler, peer=serverPeerInfo) - # This would be the actual application handler - proc contentHandler(msg: WakuMessage) {.gcsafe, closure.} = - let message = string.fromBytes(msg.payload) - check: - message == "hello world" - completionFut.complete(true) + # Wait for subscription to take effect + await sleepAsync(100.millis) - await allFutures([node1.start(), node2.start()]) + await server.filterHandleMessage(pubSubTopic, message) - await node1.mountRelay() - await node2.mountRelay() - - await node1.mountFilter() - await node2.mountFilter() - - # Subscribe our node to the pubSubTopic where all chat data go onto. - node1.subscribe(pubSubTopic, relayHandler) - # Subscribe a contentFilter to trigger a specific application handler when - # WakuMessages with that content are received - node1.wakuFilter.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - await node1.subscribe(filterRequest, contentHandler) - await sleepAsync(2000.millis) - - # Connect peers by dialing from node2 to node1 - discard await node2.switch.dial(node1.switch.peerInfo.peerId, node1.switch.peerInfo.addrs, WakuRelayCodec) - - # We need to sleep to allow the subscription to go through - info "Going to sleep to allow subscribe to go through" - await sleepAsync(2000.millis) - - info "Waking up and publishing" - await node2.publish(pubSubTopic, message) + require await filterPushHandlerFut.withTimeout(5.seconds) + ## Then + check filterPushHandlerFut.completed() + let (filterPubsubTopic, filterMessage) = filterPushHandlerFut.read() check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() - - asyncTest "Can receive filtered messages published on both default and other topics": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - defaultTopic = "/waku/2/default-waku/proto" - otherTopic = "/non/waku/formatted" - defaultContentTopic = "defaultCT" - otherContentTopic = "otherCT" - defaultPayload = @[byte 1] - otherPayload = @[byte 9] - defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic) - otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic) - defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: defaultContentTopic)], subscribe: true) - otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopic: otherContentTopic)], subscribe: true) + filterPubsubTopic == pubsubTopic + filterMessage == message - await node1.start() - await node1.mountRelay() - await node1.mountFilter() - - await node2.start() - await node2.mountRelay() - await node2.mountFilter() - node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) - - var defaultComplete = newFuture[bool]() - var otherComplete = newFuture[bool]() - - # Subscribe nodes 1 and 2 to otherTopic - proc emptyHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # Do not notify filters or subscriptions here. This should be default behaviour for all topics - discard - - node1.subscribe(otherTopic, emptyHandler) - node2.subscribe(otherTopic, emptyHandler) - - await sleepAsync(2000.millis) - - proc defaultHandler(msg: WakuMessage) {.gcsafe, closure.} = - check: - msg.payload == defaultPayload - msg.contentTopic == defaultContentTopic - defaultComplete.complete(true) - - proc otherHandler(msg: WakuMessage) {.gcsafe, closure.} = - check: - msg.payload == otherPayload - msg.contentTopic == otherContentTopic - otherComplete.complete(true) - - # Subscribe a contentFilter to trigger a specific application handler when - # WakuMessages with that content are received - await node2.subscribe(defaultFR, defaultHandler) - - await sleepAsync(2000.millis) - - # Let's check that content filtering works on the default topic - await node1.publish(defaultTopic, defaultMessage) - - check: - (await defaultComplete.withTimeout(5.seconds)) == true - - # Now check that content filtering works on other topics - await node2.subscribe(otherFR, otherHandler) - - await sleepAsync(2000.millis) - - await node1.publish(otherTopic,otherMessage) - - check: - (await otherComplete.withTimeout(5.seconds)) == true - - await node1.stop() - await node2.stop() - - asyncTest "Filter protocol works on node without relay capability": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), Port(60002)) - defaultTopic = "/waku/2/default-waku/proto" - contentTopic = "defaultCT" - payload = @[byte 1] - message = WakuMessage(payload: payload, contentTopic: contentTopic) - filterRequest = FilterRequest(contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true) - - await node1.start() - await node1.mountRelay() - await node1.mountFilter() - - await node2.start() - await node2.mountFilter() - node2.wakuFilter.setPeer(node1.switch.peerInfo.toRemotePeerInfo()) - - check: - node1.wakuRelay.isNil == false # Node1 is a full node - node2.wakuRelay.isNil == true # Node 2 is a light node - - var completeFut = newFuture[bool]() - - proc filterHandler(msg: WakuMessage) {.gcsafe, closure.} = - check: - msg.payload == payload - msg.contentTopic == contentTopic - completeFut.complete(true) - - # Subscribe a contentFilter to trigger a specific application handler when - # WakuMessages with that content are received - await node2.subscribe(filterRequest, filterHandler) - - await sleepAsync(2000.millis) - - # Let's check that content filtering works on the default topic - await node1.publish(defaultTopic, message) - - check: - (await completeFut.withTimeout(5.seconds)) == true - - await node1.stop() - await node2.stop() - - asyncTest "Filter protocol returns expected message": - let - nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node1 = WakuNode.new(nodeKey1, ValidIpAddress.init("0.0.0.0"), - Port(60000)) - nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] - node2 = WakuNode.new(nodeKey2, ValidIpAddress.init("0.0.0.0"), - Port(60002)) - contentTopic = ContentTopic("/waku/2/default-content/proto") - message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - - var completionFut = newFuture[bool]() - - await node1.start() - await node1.mountFilter() - await node2.start() - await node2.mountFilter() - - node1.wakuFilter.setPeer(node2.switch.peerInfo.toRemotePeerInfo()) - - proc handler(msg: WakuMessage) {.gcsafe, closure.} = - check: - msg == message - completionFut.complete(true) - - await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopic: contentTopic)], subscribe: true), handler) - - await sleepAsync(2000.millis) - - await node2.wakuFilter.handleMessage("/waku/2/default-waku/proto", message) - - await sleepAsync(2000.millis) - - check: - (await completionFut.withTimeout(5.seconds)) == true - await node1.stop() - await node2.stop() + ## Cleanup + await allFutures(client.stop(), server.stop()) diff --git a/tests/v2/test_wakunode_store.nim b/tests/v2/test_wakunode_store.nim index 45107feea..eb182f200 100644 --- a/tests/v2/test_wakunode_store.nim +++ b/tests/v2/test_wakunode_store.nim @@ -39,9 +39,9 @@ procSuite "WakuNode - Store": ## Setup let serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60432)) clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60430)) await allFutures(client.start(), server.start()) await server.mountStore(store=newTestMessageStore()) @@ -73,34 +73,31 @@ procSuite "WakuNode - Store": ## Setup let filterSourceKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - filterSource = WakuNode.new(filterSourceKey, ValidIpAddress.init("0.0.0.0"), Port(60004)) + filterSource = WakuNode.new(filterSourceKey, ValidIpAddress.init("0.0.0.0"), Port(60404)) serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60402)) clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60400)) await allFutures(client.start(), server.start(), filterSource.start()) await filterSource.mountFilter() await server.mountStore(store=newTestMessageStore()) - await server.mountFilter() - await client.mountStore() + await server.mountFilterClient() client.mountStoreClient() - server.wakuFilter.setPeer(filterSource.peerInfo.toRemotePeerInfo()) - ## Given let message = fakeWakuMessage() - let serverPeer = server.peerInfo.toRemotePeerInfo() - ## Then - let filterFut = newFuture[bool]() - proc filterReqHandler(msg: WakuMessage) {.gcsafe, closure.} = - check: - msg == message - filterFut.complete(true) + let + serverPeer = server.peerInfo.toRemotePeerInfo() + filterSourcePeer = filterSource.peerInfo.toRemotePeerInfo() - let filterReq = FilterRequest(pubSubTopic: DefaultPubsubTopic, contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], subscribe: true) - await server.subscribe(filterReq, filterReqHandler) + ## Then + let filterFut = newFuture[(PubsubTopic, WakuMessage)]() + proc filterHandler(pubsubTopic: PubsubTopic, msg: WakuMessage) {.gcsafe, closure.} = + filterFut.complete((pubsubTopic, msg)) + + await server.filterSubscribe(DefaultPubsubTopic, DefaultContentTopic, filterHandler, peer=filterSourcePeer) await sleepAsync(100.millis) @@ -108,7 +105,7 @@ procSuite "WakuNode - Store": await filterSource.wakuFilter.handleMessage(DefaultPubsubTopic, message) # Wait for the server filter to receive the push message - require (await filterFut.withTimeout(5.seconds)) + require await filterFut.withTimeout(5.seconds) let res = await client.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), peer=serverPeer) @@ -120,6 +117,11 @@ procSuite "WakuNode - Store": response.messages.len == 1 response.messages[0] == message + let (handledPubsubTopic, handledMsg) = filterFut.read() + check: + handledPubsubTopic == DefaultPubsubTopic + handledMsg == message + ## Cleanup await allFutures(client.stop(), server.stop(), filterSource.stop()) @@ -128,9 +130,9 @@ procSuite "WakuNode - Store": ## Setup let serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60412)) clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60410)) await allFutures(client.start(), server.start()) @@ -161,9 +163,9 @@ procSuite "WakuNode - Store": ## Setup let serverKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60002)) + server = WakuNode.new(serverKey, ValidIpAddress.init("0.0.0.0"), Port(60422)) clientKey = crypto.PrivateKey.random(Secp256k1, rng[])[] - client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + client = WakuNode.new(clientKey, ValidIpAddress.init("0.0.0.0"), Port(60420)) await allFutures(server.start(), client.start()) await server.mountStore(store=StoreQueueRef.new()) diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 6f33121e7..1208c6a76 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -1,95 +1,100 @@ {.push raises: [Defect].} import - std/[tables,sequtils], + std/[tables, sequtils], chronicles, json_rpc/rpcserver import ../../protocol/waku_message, ../../protocol/waku_filter, + ../../protocol/waku_filter/client, ../waku_node, ./jsonrpc_types export jsonrpc_types logScope: - topics = "filter api" + topics = "wakunode.rpc.filter" + +const DefaultPubsubTopic: PubsubTopic = "/waku/2/default-waku/proto" const futTimeout* = 5.seconds # Max time to wait for futures -const maxCache* = 30 # Max number of messages cached per topic @TODO make this configurable +const maxCache* = 30 # Max number of messages cached per topic TODO: make this configurable + proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: MessageCache) = - - proc filterHandler(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} = - # Add message to current cache - trace "WakuMessage received", msg=msg - - # Make a copy of msgs for this topic to modify - var msgs = messageCache.getOrDefault(msg.contentTopic, @[]) - - if msgs.len >= maxCache: - # Message cache on this topic exceeds maximum. Delete oldest. - # @TODO this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. - msgs.delete(0,0) - msgs.add(msg) - - # Replace indexed entry with copy - # @TODO max number of content topics could be limited in node - messageCache[msg.contentTopic] = msgs - ## Filter API version 1 definitions - - rpcsrv.rpc("get_waku_v2_filter_v1_messages") do(contentTopic: ContentTopic) -> seq[WakuMessage]: + + rpcsrv.rpc("get_waku_v2_filter_v1_messages") do (contentTopic: ContentTopic) -> seq[WakuMessage]: ## Returns all WakuMessages received on a content topic since the ## last time this method was called - ## @TODO ability to specify a return message limit + ## TODO: ability to specify a return message limit debug "get_waku_v2_filter_v1_messages", contentTopic=contentTopic - if messageCache.hasKey(contentTopic): - let msgs = messageCache[contentTopic] - # Clear cache before next call - messageCache[contentTopic] = @[] - return msgs - else: - # Not subscribed to this content topic + if not messageCache.hasKey(contentTopic): raise newException(ValueError, "Not subscribed to content topic: " & $contentTopic) + + let msgs = messageCache[contentTopic] + # Clear cache before next call + messageCache[contentTopic] = @[] + return msgs - rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: + + rpcsrv.rpc("post_waku_v2_filter_v1_subscription") do (contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: ## Subscribes a node to a list of content filters debug "post_waku_v2_filter_v1_subscription" - # Construct a filter request - # @TODO use default PubSub topic if undefined - let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true) + let + pubsubTopic: string = topic.get(DefaultPubsubTopic) + contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) + + let pushHandler:FilterPushHandler = proc(pubsubTopic: string, msg: WakuMessage) {.gcsafe, closure.} = + # Add message to current cache + trace "WakuMessage received", msg=msg + + # Make a copy of msgs for this topic to modify + var msgs = messageCache.getOrDefault(msg.contentTopic, @[]) + + if msgs.len >= maxCache: + # Message cache on this topic exceeds maximum. Delete oldest. + # TODO: this may become a bottle neck if called as the norm rather than exception when adding messages. Performance profile needed. + msgs.delete(0,0) + + msgs.add(msg) + + # Replace indexed entry with copy + # TODO: max number of content topics could be limited in node + messageCache[msg.contentTopic] = msgs + + let subFut = node.subscribe(pubsubTopic, contentTopics, pushHandler) + + if not await subFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to subscribe to contentFilters") + + # Successfully subscribed to all content filters + for cTopic in contentTopics: + # Create message cache for each subscribed content topic + messageCache[cTopic] = @[] - if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)): - # Successfully subscribed to all content filters - - for cTopic in contentFilters.mapIt(it.contentTopic): - # Create message cache for each subscribed content topic - messageCache[cTopic] = @[] - - return true - else: - # Failed to subscribe to one or more content filters - raise newException(ValueError, "Failed to subscribe to contentFilters " & repr(fReq)) + return true + rpcsrv.rpc("delete_waku_v2_filter_v1_subscription") do(contentFilters: seq[ContentFilter], topic: Option[string]) -> bool: ## Unsubscribes a node from a list of content filters debug "delete_waku_v2_filter_v1_subscription" - # Construct a filter request - # @TODO consider using default PubSub topic if undefined - let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false) + let + pubsubTopic: string = topic.get(DefaultPubsubTopic) + contentTopics: seq[ContentTopic] = contentFilters.mapIt(it.contentTopic) - if (await node.unsubscribe(fReq).withTimeout(futTimeout)): - # Successfully unsubscribed from all content filters + let unsubFut = node.unsubscribe(pubsubTopic, contentTopics) - for cTopic in contentFilters.mapIt(it.contentTopic): - # Remove message cache for each unsubscribed content topic - messageCache.del(cTopic) + if not await unsubFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to unsubscribe from contentFilters") + + # Successfully unsubscribed from all content filters + for cTopic in contentTopics: + # Remove message cache for each unsubscribed content topic + messageCache.del(cTopic) - return true - else: - # Failed to unsubscribe from one or more content filters - raise newException(ValueError, "Failed to unsubscribe from contentFilters " & repr(fReq)) + return true diff --git a/waku/v2/node/waku_metrics.nim b/waku/v2/node/waku_metrics.nim index 02f17338c..45a389ff6 100644 --- a/waku/v2/node/waku_metrics.nim +++ b/waku/v2/node/waku_metrics.nim @@ -1,14 +1,13 @@ {.push raises: [Defect].} import - stew/results, stew/shims/net, chronicles, chronos, metrics, metrics/chronos_httpserver import - ../protocol/waku_filter, + ../protocol/waku_filter/protocol_metrics as filter_metrics, ../protocol/waku_store/protocol_metrics as store_metrics, ../protocol/waku_lightpush/protocol_metrics as lightpush_metrics, ../protocol/waku_swap/waku_swap, diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index c3fdee770..35eed81c4 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -21,11 +21,13 @@ import ../protocol/waku_store/client, ../protocol/waku_swap/waku_swap, ../protocol/waku_filter, + ../protocol/waku_filter/client, ../protocol/waku_lightpush, ../protocol/waku_lightpush/client, ../protocol/waku_rln_relay/waku_rln_relay_types, ../protocol/waku_peer_exchange, - ../utils/[peers, requests, wakuenr], + ../utils/peers, + ../utils/wakuenr, ./peer_manager/peer_manager, ./storage/message/waku_store_queue, ./storage/message/message_retention_policy, @@ -37,24 +39,26 @@ import declarePublicGauge waku_version, "Waku version info (in git describe format)", ["version"] declarePublicCounter waku_node_messages, "number of messages received", ["type"] -declarePublicGauge waku_node_filters, "number of content filter subscriptions" declarePublicGauge waku_node_errors, "number of wakunode errors", ["type"] declarePublicGauge waku_lightpush_peers, "number of lightpush peers" +declarePublicGauge waku_filter_peers, "number of filter peers" declarePublicGauge waku_store_peers, "number of store peers" declarePublicGauge waku_px_peers, "number of peers (in the node's peerManager) supporting the peer exchange protocol" - logScope: topics = "wakunode" + # Git version in git describe format (defined compile time) const git_version* {.strdefine.} = "n/a" # Default clientId const clientId* = "Nimbus Waku v2 node" -# Default topic -const defaultTopic* = "/waku/2/default-waku/proto" +# TODO: Unify pubusub topic type and default value +type PubsubTopic* = string + +const defaultTopic*: PubsubTopic = "/waku/2/default-waku/proto" # Default Waku Filter Timeout const WakuFilterTimeout: Duration = 1.days @@ -63,7 +67,6 @@ const WakuFilterTimeout: Duration = 1.days # key and crypto modules different type # XXX: Weird type, should probably be using pubsub PubsubTopic object name? - PubsubTopic* = string Message* = seq[byte] WakuInfo* = object @@ -80,6 +83,7 @@ type wakuStore*: WakuStore wakuStoreClient*: WakuStoreClient wakuFilter*: WakuFilter + wakuFilterClient*: WakuFilterClient wakuSwap*: WakuSwap wakuRlnRelay*: WakuRLNRelay wakuLightPush*: WakuLightPush @@ -87,7 +91,6 @@ type wakuPeerExchange*: WakuPeerExchange enr*: enr.Record libp2pPing*: Ping - filters*: Filters rng*: ref rand.HmacDrbgContext wakuDiscv5*: WakuDiscoveryV5 announcedAddresses* : seq[MultiAddress] @@ -217,7 +220,6 @@ proc new*(T: type WakuNode, switch: switch, rng: rng, enr: enr, - filters: Filters.init(), announcedAddresses: announcedAddresses ) @@ -410,80 +412,123 @@ proc mountRelay*(node: WakuNode, ## Waku filter proc mountFilter*(node: WakuNode, filterTimeout: Duration = WakuFilterTimeout) {.async, raises: [Defect, LPError]} = - info "mounting filter" - proc filterHandler(requestId: string, msg: MessagePush) {.async, gcsafe.} = - - info "push received" - for message in msg.messages: - node.filters.notify(message, requestId) # Trigger filter handlers on a light node + info "mounting filter protocol" + node.wakuFilter = WakuFilter.new(node.peerManager, node.rng, filterTimeout) - if not node.wakuStore.isNil and (requestId in node.filters): - let pubSubTopic = node.filters[requestId].pubSubTopic - node.wakuStore.handleMessage(pubSubTopic, message) - - waku_node_messages.inc(labelValues = ["filter"]) - - node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler, filterTimeout) if node.started: - # Node has started already. Let's start filter too. await node.wakuFilter.start() node.switch.mount(node.wakuFilter, protocolMatcher(WakuFilterCodec)) -proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError].} = +proc filterHandleMessage*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage) {.async.}= if node.wakuFilter.isNil(): - error "could not set peer, waku filter is nil" + error "cannot handle filter message", error="waku filter is nil" return - info "Set filter peer", peer=peer + await node.wakuFilter.handleMessage(pubsubTopic, message) + + +proc mountFilterClient*(node: WakuNode) {.async, raises: [Defect, LPError].} = + info "mounting filter client" + + node.wakuFilterClient = WakuFilterClient.new(node.peerManager, node.rng) + if node.started: + # Node has started already. Let's start filter too. + await node.wakuFilterClient.start() + + node.switch.mount(node.wakuFilterClient, protocolMatcher(WakuFilterCodec)) + +proc filterSubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], + handler: FilterPushHandler, peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = + ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. + if node.wakuFilterClient.isNil(): + error "cannot register filter subscription to topic", error="waku filter client is nil" + return + + let remotePeer = when peer is string: parseRemotePeerInfo(peer) + else: peer + + info "registering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer + + # Add handler wrapper to store the message when pushed, when relay is disabled and filter enabled + # TODO: Move this logic to wakunode2 app + let handlerWrapper: FilterPushHandler = proc(pubsubTopic: string, message: WakuMessage) {.raises: [Exception].} = + if node.wakuRelay.isNil() and not node.wakuStore.isNil(): + node.wakuStore.handleMessage(pubSubTopic, message) + + handler(pubsubTopic, message) + + let subRes = await node.wakuFilterClient.subscribe(pubsubTopic, contentTopics, handlerWrapper, peer=remotePeer) + if subRes.isOk(): + info "subscribed to topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + else: + error "failed filter subscription", error=subRes.error + waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) + +proc filterUnsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], + peer: RemotePeerInfo|string) {.async, gcsafe, raises: [Defect, ValueError].} = + ## Unsubscribe from a content filter. + if node.wakuFilterClient.isNil(): + error "cannot unregister filter subscription to content", error="waku filter client is nil" + return + + let remotePeer = when peer is string: parseRemotePeerInfo(peer) + else: peer + + info "deregistering filter subscription to content", pubsubTopic=pubsubTopic, contentTopics=contentTopics, peer=remotePeer + + let unsubRes = await node.wakuFilterClient.unsubscribe(pubsubTopic, contentTopics, peer=remotePeer) + if unsubRes.isOk(): + info "unsubscribed from topic", pubsubTopic=pubsubTopic, contentTopics=contentTopics + else: + error "failed filter unsubscription", error=unsubRes.error + waku_node_errors.inc(labelValues = ["unsubscribe_filter_failure"]) + + +# TODO: Move to application module (e.g., wakunode2.nim) +proc setFilterPeer*(node: WakuNode, peer: RemotePeerInfo|string) {.raises: [Defect, ValueError, LPError], + deprecated: "Use the explicit destination peer procedures".} = + if node.wakuFilterClient.isNil(): + error "could not set peer, waku filter client is nil" + return + + info "seting filter client peer", peer=peer let remotePeer = when peer is string: parseRemotePeerInfo(peer) else: peer - node.wakuFilter.setPeer(remotePeer) + node.peerManager.addPeer(remotePeer, WakuFilterCodec) -proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = + waku_filter_peers.inc() + +# TODO: Move to application module (e.g., wakunode2.nim) +proc subscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic], handler: FilterPushHandler) {.async, gcsafe, + deprecated: "Use the explicit destination peer procedure. Use 'node.filterSubscribe()' instead.".} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - ## FilterHandler is a method that takes a MessagePush. + if node.wakuFilterClient.isNil(): + error "cannot register filter subscription to topic", error="waku filter client is nil" + return - # Sanity check for well-formed subscribe FilterRequest - doAssert(request.subscribe, "invalid subscribe request") + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + if peerOpt.isNone(): + error "cannot register filter subscription to topic", error="no suitable remote peers" + return - info "subscribe content", filter=request + await node.filterSubscribe(pubsubTopic, contentTopics, handler, peer=peerOpt.get()) - var id = generateRequestId(node.rng) - - if not node.wakuFilter.isNil(): - let - pubsubTopic = request.pubsubTopic - contentTopics = request.contentFilters.mapIt(it.contentTopic) - - let resSubscription = await node.wakuFilter.subscribe(pubsubTopic, contentTopics) - if resSubscription.isOk(): - id = resSubscription.get() - else: - # Failed to subscribe - error "remote subscription to filter failed", filter = request - waku_node_errors.inc(labelValues = ["subscribe_filter_failure"]) - - # Register handler for filter, whether remote subscription succeeded or not - node.filters.addContentFilters(id, request.pubSubTopic, request.contentFilters, handler) - waku_node_filters.set(node.filters.len.int64) - -proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = +# TODO: Move to application module (e.g., wakunode2.nim) +proc unsubscribe*(node: WakuNode, pubsubTopic: PubsubTopic, contentTopics: ContentTopic|seq[ContentTopic]) {.async, gcsafe, + deprecated: "Use the explicit destination peer procedure. Use 'node.filterUnsusbscribe()' instead.".} = ## Unsubscribe from a content filter. + if node.wakuFilterClient.isNil(): + error "cannot unregister filter subscription to content", error="waku filter client is nil" + return - # Sanity check for well-formed unsubscribe FilterRequest - doAssert(request.subscribe == false, "invalid unsubscribe request") - - info "unsubscribe content", filter=request - - let - pubsubTopic = request.pubsubTopic - contentTopics = request.contentFilters.mapIt(it.contentTopic) - discard await node.wakuFilter.unsubscribe(pubsubTopic, contentTopics) - node.filters.removeContentFilters(request.contentFilters) + let peerOpt = node.peerManager.selectPeer(WakuFilterCodec) + if peerOpt.isNone(): + error "cannot register filter subscription to topic", error="no suitable remote peers" + return - waku_node_filters.set(node.filters.len.int64) + await node.filterUnsubscribe(pubsubTopic, contentTopics, peer=peerOpt.get()) ## Waku swap @@ -503,11 +548,6 @@ proc mountSwap*(node: WakuNode, swapConfig: SwapConfig = SwapConfig.init()) {.as ## Waku store -proc mountStoreClient*(node: WakuNode, store: MessageStore = nil) = - info "mounting store client" - - node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng, store) - const MessageStoreDefaultRetentionPolicyInterval* = 30.minutes proc executeMessageRetentionPolicy*(node: WakuNode) = @@ -557,6 +597,12 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil, retentionPolicy=none node.switch.mount(node.wakuStore, protocolMatcher(WakuStoreCodec)) + +proc mountStoreClient*(node: WakuNode, store: MessageStore = nil) = + info "mounting store client" + + node.wakuStoreClient = WakuStoreClient.new(node.peerManager, node.rng, store) + proc query*(node: WakuNode, query: HistoryQuery, peer: RemotePeerInfo): Future[WakuStoreResult[HistoryResponse]] {.async, gcsafe.} = ## Queries known nodes for historical messages if node.wakuStoreClient.isNil(): @@ -626,12 +672,6 @@ proc resume*(node: WakuNode, peerList: Option[seq[RemotePeerInfo]] = none(seq[Re ## Waku lightpush -proc mountLightPushClient*(node: WakuNode) = - info "mounting light push client" - - node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) - - proc mountLightPush*(node: WakuNode) {.async.} = info "mounting light push" @@ -654,6 +694,12 @@ proc mountLightPush*(node: WakuNode) {.async.} = node.switch.mount(node.wakuLightPush, protocolMatcher(WakuLightPushCodec)) + +proc mountLightPushClient*(node: WakuNode) = + info "mounting light push client" + + node.wakuLightpushClient = WakuLightPushClient.new(node.peerManager, node.rng) + proc lightpushPublish*(node: WakuNode, pubsubTopic: PubsubTopic, message: WakuMessage, peer: RemotePeerInfo): Future[WakuLightPushResult[void]] {.async, gcsafe.} = ## Pushes a `WakuMessage` to a node which relays it further on PubSub topic. ## Returns whether relaying was successful or not. diff --git a/waku/v2/protocol/waku_filter.nim b/waku/v2/protocol/waku_filter.nim index a4f6d728e..81685e09b 100644 --- a/waku/v2/protocol/waku_filter.nim +++ b/waku/v2/protocol/waku_filter.nim @@ -3,11 +3,9 @@ import ./waku_filter/rpc, ./waku_filter/rpc_codec, - ./waku_filter/protocol, - ./waku_filter/client + ./waku_filter/protocol export rpc, rpc_codec, - protocol, - client + protocol diff --git a/waku/v2/protocol/waku_filter/client.nim b/waku/v2/protocol/waku_filter/client.nim index 33580d9b5..9f8040b87 100644 --- a/waku/v2/protocol/waku_filter/client.nim +++ b/waku/v2/protocol/waku_filter/client.nim @@ -1,69 +1,207 @@ {.push raises: [Defect].} import - std/[tables, sequtils], - chronicles + std/[options, tables, sequtils], + stew/results, + chronicles, + chronos, + metrics, + bearssl/rand, + libp2p/protocols/protocol as libp2p_protocol import ../waku_message, - ./rpc - -type - ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure, raises: [Defect].} - - Filter* = object - pubSubTopic*: string - contentFilters*: seq[ContentFilter] - handler*: ContentFilterHandler - - Filters* = Table[string, Filter] + ../../node/peer_manager/peer_manager, + ../../utils/requests, + ./rpc, + ./rpc_codec, + ./protocol, + ./protocol_metrics -proc init*(T: type Filters): T = - initTable[string, Filter]() +logScope: + topics = "wakufilter.client" -proc addContentFilters*(filters: var Filters, requestId: string, pubsubTopic: string, contentFilters: seq[ContentFilter], handler: ContentFilterHandler) {.gcsafe.}= - filters[requestId] = Filter( - pubSubTopic: pubsubTopic, - contentFilters: contentFilters, - handler: handler + +const Defaultstring = "/waku/2/default-waku/proto" + + +### Client, filter subscripton manager + +type FilterPushHandler* = proc(pubsubTopic: string, message: WakuMessage) {.gcsafe, closure.} + + +## Subscription manager + +type SubscriptionManager = object + subscriptions: TableRef[(string, ContentTopic), FilterPushHandler] + +proc init(T: type SubscriptionManager): T = + SubscriptionManager(subscriptions: newTable[(string, ContentTopic), FilterPushHandler]()) + +proc clear(m: var SubscriptionManager) = + m.subscriptions.clear() + +proc registerSubscription(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic, handler: FilterPushHandler) = + try: + m.subscriptions[(pubsubTopic, contentTopic)]= handler + except: + error "failed to register filter subscription", error=getCurrentExceptionMsg() + +proc removeSubscription(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic) = + m.subscriptions.del((pubsubTopic, contentTopic)) + +proc notifySubscriptionHandler(m: SubscriptionManager, pubsubTopic: string, contentTopic: ContentTopic, message: WakuMessage) = + if not m.subscriptions.hasKey((pubsubTopic, contentTopic)): + return + + try: + let handler = m.subscriptions[(pubsubTopic, contentTopic)] + handler(pubsubTopic, message) + except: + discard + +proc getSubscriptionsCount(m: SubscriptionManager): int = + m.subscriptions.len() + + +## Client + +type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} + +type WakuFilterClient* = ref object of LPProtocol + rng: ref rand.HmacDrbgContext + peerManager: PeerManager + subManager: SubscriptionManager + + +proc handleMessagePush(wf: WakuFilterClient, peerId: PeerId, requestId: string, rpc: MessagePush) = + for msg in rpc.messages: + let + pubsubTopic = Defaultstring # TODO: Extend the filter push rpc to provide the pubsub topic. This is a limitation + contentTopic = msg.contentTopic + + wf.subManager.notifySubscriptionHandler(pubsubTopic, contentTopic, msg) + + +proc initProtocolHandler(wf: WakuFilterClient) = + proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let buffer = await conn.readLp(MaxRpcSize.int) + + let decodeReqRes = FilterRPC.init(buffer) + if decodeReqRes.isErr(): + waku_filter_errors.inc(labelValues = [decodeRpcFailure]) + return + + let rpc = decodeReqRes.get() + trace "filter message received" + + if rpc.push == MessagePush(): + waku_filter_errors.inc(labelValues = [emptyMessagePushFailure]) + # TODO: Manage the empty push message error. Perform any action? + return + + waku_filter_messages.inc(labelValues = ["MessagePush"]) + + let + peerId = conn.peerId + requestId = rpc.requestId + push = rpc.push + + info "received filter message push", peerId=conn.peerId, requestId=requestId + wf.handleMessagePush(peerId, requestId, push) + + wf.handler = handle + wf.codec = WakuFilterCodec + +proc new*(T: type WakuFilterClient, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext): T = + + let wf = WakuFilterClient( + peerManager: peerManager, + rng: rng, + subManager: SubscriptionManager.init() + ) + wf.initProtocolHandler() + wf + + +proc sendFilterRpc(wf: WakuFilterClient, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= + let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) + if connOpt.isNone(): + return err(dialFailure) + let connection = connOpt.get() + + await connection.writeLP(rpc.encode().buffer) + return ok() + +proc sendFilterRequestRpc(wf: WakuFilterClient, + pubsubTopic: string, + contentTopics: seq[ContentTopic], + subscribe: bool, + peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = + + let requestId = generateRequestId(wf.rng) + let contentFilters = contentTopics.mapIt(ContentFilter(contentTopic: it)) + + let rpc = FilterRpc( + requestId: requestId, + request: FilterRequest( + subscribe: subscribe, + pubSubTopic: pubsubTopic, + contentFilters: contentFilters + ) ) -proc removeContentFilters*(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} = - # Flatten all unsubscribe topics into single seq - let unsubscribeTopics = contentFilters.mapIt(it.contentTopic) - - debug "unsubscribing", unsubscribeTopics=unsubscribeTopics + let sendRes = await wf.sendFilterRpc(rpc, peer) + if sendRes.isErr(): + waku_filter_errors.inc(labelValues = [sendRes.error]) + return err(sendRes.error) + + return ok() - var rIdToRemove: seq[string] = @[] - for rId, f in filters.mpairs: - # Iterate filter entries to remove matching content topics - - # make sure we delete the content filter - # if no more topics are left - f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopic notin unsubscribeTopics) - if f.contentFilters.len == 0: - rIdToRemove.add(rId) +proc subscribe*(wf: WakuFilterClient, + pubsubTopic: string, + contentTopic: ContentTopic|seq[ContentTopic], + handler: FilterPushHandler, + peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = + var topics: seq[ContentTopic] + when contentTopic is seq[ContentTopic]: + topics = contentTopic + else: + topics = @[contentTopic] - # make sure we delete the filter entry - # if no more content filters left - for rId in rIdToRemove: - filters.del(rId) - - debug "filters modified", filters=filters + let sendRes = await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe=true, peer=peer) + if sendRes.isErr(): + return err(sendRes.error) -proc notify*(filters: Filters, msg: WakuMessage, requestId: string) = - for key, filter in filters.pairs: - # We do this because the key for the filter is set to the requestId received from the filter protocol. - # This means we do not need to check the content filter explicitly as all MessagePushs already contain - # the requestId of the coresponding filter. - if requestId != "" and requestId == key: - filter.handler(msg) - continue + for topic in topics: + wf.subManager.registerSubscription(pubsubTopic, topic, handler) - # TODO: In case of no topics we should either trigger here for all messages, - # or we should not allow such filter to exist in the first place. - for contentFilter in filter.contentFilters: - if msg.contentTopic == contentFilter.contentTopic: - filter.handler(msg) - break + return ok() + +proc unsubscribe*(wf: WakuFilterClient, + pubsubTopic: string, + contentTopic: ContentTopic|seq[ContentTopic], + peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async.} = + var topics: seq[ContentTopic] + when contentTopic is seq[ContentTopic]: + topics = contentTopic + else: + topics = @[contentTopic] + + let sendRes = await wf.sendFilterRequestRpc(pubsubTopic, topics, subscribe=false, peer=peer) + if sendRes.isErr(): + return err(sendRes.error) + + for topic in topics: + wf.subManager.removeSubscription(pubsubTopic, topic) + + return ok() + +proc clearSubscriptions*(wf: WakuFilterClient) = + wf.subManager.clear() + +proc getSubscriptionsCount*(wf: WakuFilterClient): int = + wf.subManager.getSubscriptionsCount() \ No newline at end of file diff --git a/waku/v2/protocol/waku_filter/protocol.nim b/waku/v2/protocol/waku_filter/protocol.nim index da09a1472..ebccb4759 100644 --- a/waku/v2/protocol/waku_filter/protocol.nim +++ b/waku/v2/protocol/waku_filter/protocol.nim @@ -10,37 +10,26 @@ import import ../waku_message, ../../node/peer_manager/peer_manager, - ../../utils/requests, ./rpc, - ./rpc_codec + ./rpc_codec, + ./protocol_metrics -declarePublicGauge waku_filter_peers, "number of filter peers" -declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" -declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] -declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"] - logScope: topics = "wakufilter" const - # We add a 64kB safety buffer for protocol overhead. - # 10x-multiplier also for safety: currently we never - # push more than 1 message at a time. - MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 - WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" + WakuFilterTimeout: Duration = 2.hours -# Error types (metric label values) -const - dialFailure = "dial_failure" - decodeRpcFailure = "decode_rpc_failure" - peerNotFoundFailure = "peer_not_found_failure" +type WakuFilterResult*[T] = Result[T, string] +## Subscription manager + type Subscription = object requestId: string peer: PeerID @@ -68,108 +57,86 @@ proc removeSubscription(subscriptions: var seq[Subscription], peer: PeerId, unsu subscriptions.keepItIf(it.contentTopics.len > 0) +## Protocol + type MessagePushHandler* = proc(requestId: string, msg: MessagePush): Future[void] {.gcsafe, closure.} - WakuFilterResult*[T] = Result[T, string] - WakuFilter* = ref object of LPProtocol rng*: ref rand.HmacDrbgContext peerManager*: PeerManager - pushHandler*: MessagePushHandler subscriptions*: seq[Subscription] failedPeers*: Table[string, chronos.Moment] timeout*: chronos.Duration -proc init(wf: WakuFilter) = +proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) = + let + requestId = rpc.requestId + subscribe = rpc.request.subscribe + pubsubTopic = rpc.request.pubsubTopic + contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic) - proc handle(conn: Connection, proto: string) {.async, gcsafe, closure.} = - let message = await conn.readLp(MaxRpcSize.int) + if subscribe: + info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics + wf.subscriptions.addSubscription(peerId, requestId, pubsubTopic, contentTopics) + else: + info "removed filter subscritpiton", peerId=peerId, contentTopics=contentTopics + wf.subscriptions.removeSubscription(peerId, contentTopics) - let res = FilterRPC.init(message) - if res.isErr(): + waku_filter_subscribers.set(wf.subscriptions.len.int64) + + +proc initProtocolHandler(wf: WakuFilter) = + proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} = + let buffer = await conn.readLp(MaxRpcSize.int) + + let decodeRpcRes = FilterRPC.init(buffer) + if decodeRpcRes.isErr(): waku_filter_errors.inc(labelValues = [decodeRpcFailure]) return trace "filter message received" - let rpc = res.get() + let rpc = decodeRpcRes.get() ## Filter request - # We are receiving a subscription/unsubscription request - if rpc.request != FilterRequest(): - waku_filter_messages.inc(labelValues = ["FilterRequest"]) + # Subscription/unsubscription request + if rpc.request == FilterRequest(): + waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure]) + # TODO: Manage the empty filter request message error. Perform any action? + return - let - requestId = rpc.requestId - subscribe = rpc.request.subscribe - pubsubTopic = rpc.request.pubsubTopic - contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic) - - if subscribe: - info "added filter subscritpiton", peerId=conn.peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics - wf.subscriptions.addSubscription(conn.peerId, requestId, pubsubTopic, contentTopics) - else: - info "removed filter subscritpiton", peerId=conn.peerId, contentTopics=contentTopics - wf.subscriptions.removeSubscription(conn.peerId, contentTopics) - - waku_filter_subscribers.set(wf.subscriptions.len.int64) - + waku_filter_messages.inc(labelValues = ["FilterRequest"]) + wf.handleFilterRequest(conn.peerId, rpc) - ## Push message - # We are receiving a messages from the peer that we subscribed to - if rpc.push != MessagePush(): - waku_filter_messages.inc(labelValues = ["MessagePush"]) - - let - requestId = rpc.requestId - push = rpc.push - - info "received filter message push", peerId=conn.peerId - await wf.pushHandler(requestId, push) - - - wf.handler = handle + wf.handler = handler wf.codec = WakuFilterCodec +proc new*(T: type WakuFilter, + peerManager: PeerManager, + rng: ref rand.HmacDrbgContext, + timeout: Duration = WakuFilterTimeout): T = + let wf = WakuFilter(rng: rng, + peerManager: peerManager, + timeout: timeout) + wf.initProtocolHandler() + return wf + proc init*(T: type WakuFilter, peerManager: PeerManager, rng: ref rand.HmacDrbgContext, - handler: MessagePushHandler, - timeout: Duration = WakuFilterTimeout): T = - let wf = WakuFilter(rng: rng, - peerManager: peerManager, - pushHandler: handler, - timeout: timeout) - wf.init() - return wf + timeout: Duration = WakuFilterTimeout): T {. + deprecated: "WakuFilter.new()' instead".} = + WakuFilter.new(peerManager, rng, timeout) -proc setPeer*(wf: WakuFilter, peer: RemotePeerInfo) = - wf.peerManager.addPeer(peer, WakuFilterCodec) - waku_filter_peers.inc() - - -proc sendFilterRpcToPeer(wf: WakuFilter, rpc: FilterRPC, peer: PeerId): Future[WakuFilterResult[void]] {.async, gcsafe.}= +proc sendFilterRpc(wf: WakuFilter, rpc: FilterRPC, peer: PeerId|RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) if connOpt.isNone(): return err(dialFailure) - let connection = connOpt.get() await connection.writeLP(rpc.encode().buffer) - - return ok() - -proc sendFilterRpcToRemotePeer(wf: WakuFilter, rpc: FilterRPC, peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.}= - let connOpt = await wf.peerManager.dialPeer(peer, WakuFilterCodec) - if connOpt.isNone(): - return err(dialFailure) - - let connection = connOpt.get() - - await connection.writeLP(rpc.encode().buffer) - return ok() @@ -199,6 +166,9 @@ proc handleClientError(wf: WakuFilter, subs: seq[Subscription]) {.raises: [Defec proc handleMessage*(wf: WakuFilter, pubsubTopic: string, msg: WakuMessage) {.async.} = + + trace "handling message", pubsubTopic, contentTopic=msg.contentTopic, subscriptions=wf.subscriptions.len + if wf.subscriptions.len <= 0: return @@ -218,70 +188,14 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: string, msg: WakuMessage) {.asy push: MessagePush(messages: @[msg]) ) - let res = await wf.sendFilterRpcToPeer(rpc, sub.peer) + let res = await wf.sendFilterRpc(rpc, sub.peer) if res.isErr(): waku_filter_errors.inc(labelValues = [res.error()]) failedSubscriptions.add(sub) continue - + connectedSubscriptions.add(sub) wf.removePeerFromFailedPeersTable(connectedSubscriptions) wf.handleClientError(failedSubscriptions) - - -### Send subscription/unsubscription - -proc subscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[string]] {.async, gcsafe.} = - let id = generateRequestId(wf.rng) - let rpc = FilterRPC( - requestId: id, - request: FilterRequest( - subscribe: true, - pubSubTopic: pubsubTopic, - contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it)) - ) - ) - - let res = await wf.sendFilterRpcToRemotePeer(rpc, peer) - if res.isErr(): - waku_filter_errors.inc(labelValues = [res.error()]) - return err(res.error()) - - return ok(id) - -proc subscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[string]] {.async, gcsafe.} = - let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - waku_filter_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - return await wf.subscribe(pubsubTopic, contentTopics, peerOpt.get()) - - -proc unsubscribe(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic], peer: RemotePeerInfo): Future[WakuFilterResult[void]] {.async, gcsafe.} = - let id = generateRequestId(wf.rng) - let rpc = FilterRPC( - requestId: id, - request: FilterRequest( - subscribe: false, - pubSubTopic: pubsubTopic, - contentFilters: contentTopics.mapIt(ContentFilter(contentTopic: it)) - ) - ) - - let res = await wf.sendFilterRpcToRemotePeer(rpc, peer) - if res.isErr(): - waku_filter_errors.inc(labelValues = [res.error()]) - return err(res.error()) - - return ok() - -proc unsubscribe*(wf: WakuFilter, pubsubTopic: string, contentTopics: seq[ContentTopic]): Future[WakuFilterResult[void]] {.async, gcsafe.} = - let peerOpt = wf.peerManager.selectPeer(WakuFilterCodec) - if peerOpt.isNone(): - waku_filter_errors.inc(labelValues = [peerNotFoundFailure]) - return err(peerNotFoundFailure) - - return await wf.unsubscribe(pubsubTopic, contentTopics, peerOpt.get()) \ No newline at end of file diff --git a/waku/v2/protocol/waku_filter/protocol_metrics.nim b/waku/v2/protocol/waku_filter/protocol_metrics.nim new file mode 100644 index 000000000..f9dd7ad15 --- /dev/null +++ b/waku/v2/protocol/waku_filter/protocol_metrics.nim @@ -0,0 +1,19 @@ +{.push raises: [Defect].} + +import metrics + + +declarePublicGauge waku_filter_subscribers, "number of light node filter subscribers" +declarePublicGauge waku_filter_errors, "number of filter protocol errors", ["type"] +declarePublicGauge waku_filter_messages, "number of filter messages received", ["type"] +declarePublicGauge waku_node_filters, "number of content filter subscriptions" + + +# Error types (metric label values) +const + dialFailure* = "dial_failure" + decodeRpcFailure* = "decode_rpc_failure" + peerNotFoundFailure* = "peer_not_found_failure" + emptyMessagePushFailure* = "empty_message_push_failure" + emptyFilterRequestFailure* = "empty_filter_request_failure" + diff --git a/waku/v2/protocol/waku_filter/rpc_codec.nim b/waku/v2/protocol/waku_filter/rpc_codec.nim index 5fed99c89..d1e610d7d 100644 --- a/waku/v2/protocol/waku_filter/rpc_codec.nim +++ b/waku/v2/protocol/waku_filter/rpc_codec.nim @@ -9,6 +9,11 @@ import ./rpc +# Multiply by 10 for safety. Currently we never push more than 1 message at a time +# We add a 64kB safety buffer for protocol overhead. +const MaxRpcSize* = 10 * MaxWakuMessageSize + 64 * 1024 + + proc encode*(filter: ContentFilter): ProtoBuffer = var output = initProtoBuffer() output.write3(1, filter.contentTopic)