diff --git a/apps/chat2/chat2.nim b/apps/chat2/chat2.nim index 2d0764392..f0c4b433b 100644 --- a/apps/chat2/chat2.nim +++ b/apps/chat2/chat2.nim @@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) = # Attempt lightpush asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message) else: - asyncSpawn c.node.publish(DefaultPubsubTopic, message) + asyncSpawn c.node.publish(some(DefaultPubsubTopic), message) # TODO This should read or be subscribe handler subscribe proc readAndPrint(c: Chat) {.async.} = @@ -490,8 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = if msg.contentTopic == chat.contentTopic: chat.printReceivedMessage(msg) - let topic = DefaultPubsubTopic - node.subscribe(topic, handler) + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler)) if conf.rlnRelay: info "WakuRLNRelay is enabled" diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 700ef5a43..af90dbbfc 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -95,7 +95,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} = chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"]) - await cmb.nodev2.publish(DefaultPubsubTopic, msg) + await cmb.nodev2.publish(some(DefaultPubsubTopic), msg) proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} = if cmb.seen.containsOrAdd(msg.payload.hash()): @@ -204,7 +204,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} = trace "Bridging message from Chat2 to Matterbridge", msg=msg cmb.toMatterbridge(msg) - cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler) + cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) proc stop*(cmb: Chat2MatterBridge) {.async.} = info "Stopping Chat2MatterBridge" @@ -229,8 +229,8 @@ when isMainModule: # Install enabled API handlers: if conf.relay: - let topicCache = relay_api.MessageCache.init(capacity=30) - installRelayApiHandlers(node, rpcServer, topicCache) + let cache = MessageCache[string].init(capacity=30) + installRelayApiHandlers(node, rpcServer, cache) if conf.filter: let messageCache = filter_api.MessageCache.init(capacity=30) diff --git a/apps/networkmonitor/networkmonitor.nim b/apps/networkmonitor/networkmonitor.nim index fc0787543..3af9c5cee 100644 --- a/apps/networkmonitor/networkmonitor.nim +++ b/apps/networkmonitor/networkmonitor.nim @@ -402,7 +402,7 @@ proc subscribeAndHandleMessages(node: WakuNode, else: msgPerContentTopic[msg.contentTopic] = 1 - node.subscribe(pubsubTopic, handler) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) when isMainModule: # known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 3105991bd..3b458cc91 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -28,6 +28,21 @@ import ../../waku/node/peer_manager, ../../waku/node/peer_manager/peer_store/waku_peer_storage, ../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations, + ../../waku/waku_api/message_cache, + ../../waku/waku_api/cache_handlers, + ../../waku/waku_api/rest/server, + ../../waku/waku_api/rest/debug/handlers as rest_debug_api, + ../../waku/waku_api/rest/relay/handlers as rest_relay_api, + ../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api, + ../../waku/waku_api/rest/filter/handlers as rest_filter_api, + ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api, + ../../waku/waku_api/rest/store/handlers as rest_store_api, + ../../waku/waku_api/rest/health/handlers as rest_health_api, + ../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api, + ../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api, + ../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api, + ../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api, + ../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api, ../../waku/waku_archive, ../../waku/waku_dnsdisc, ../../waku/waku_enr, @@ -41,22 +56,6 @@ import ./wakunode2_validator_signed, ./internal_config, ./external_config -import - ../../waku/waku_api/message_cache, - ../../waku/waku_api/rest/server, - ../../waku/waku_api/rest/debug/handlers as rest_debug_api, - ../../waku/waku_api/rest/relay/handlers as rest_relay_api, - ../../waku/waku_api/rest/relay/topic_cache, - ../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api, - ../../waku/waku_api/rest/filter/handlers as rest_filter_api, - ../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api, - ../../waku/waku_api/rest/store/handlers as rest_store_api, - ../../waku/waku_api/rest/health/handlers as rest_health_api, - ../../waku/waku_api/jsonrpc/admin/handlers as rpc_admin_api, - ../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api, - ../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api, - ../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api, - ../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api logScope: topics = "wakunode app" @@ -576,8 +575,20 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo ## Relay REST API if conf.relay: - let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity) - installRelayApiHandlers(server.router, app.node, relayCache) + let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity) + + let handler = messageCacheHandler(cache) + let autoHandler = autoMessageCacheHandler(cache) + + for pubsubTopic in conf.pubsubTopics: + cache.subscribe(pubsubTopic) + app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) + + for contentTopic in conf.contentTopics: + cache.subscribe(contentTopic) + app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler)) + + installRelayApiHandlers(server.router, app.node, cache) ## Filter REST API if conf.filter: @@ -610,8 +621,20 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod installDebugApiHandlers(app.node, server) if conf.relay: - let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30) - installRelayApiHandlers(app.node, server, relayMessageCache) + let cache = MessageCache[string].init(capacity=30) + + let handler = messageCacheHandler(cache) + let autoHandler = autoMessageCacheHandler(cache) + + for pubsubTopic in conf.pubsubTopics: + cache.subscribe(pubsubTopic) + app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) + + for contentTopic in conf.contentTopics: + cache.subscribe(contentTopic) + app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler)) + + installRelayApiHandlers(app.node, server, cache) if conf.filternode != "": let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30) diff --git a/examples/publisher.nim b/examples/publisher.nim index 1c2e9d145..da18d4e7e 100644 --- a/examples/publisher.nim +++ b/examples/publisher.nim @@ -99,7 +99,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = contentTopic: contentTopic, # content topic to publish to ephemeral: true, # tell store nodes to not store it timestamp: now()) # current timestamp - await node.publish(pubSubTopic, message) + await node.publish(some(pubSubTopic), message) notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic await sleepAsync(5000) diff --git a/examples/subscriber.nim b/examples/subscriber.nim index 73e89b237..946b4809b 100644 --- a/examples/subscriber.nim +++ b/examples/subscriber.nim @@ -94,7 +94,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = pubsubTopic=pubsubTopic, contentTopic=msg.contentTopic, timestamp=msg.timestamp - node.subscribe(pubSubTopic, handler) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler)) when isMainModule: let rng = crypto.newRng() diff --git a/tests/test_peer_exchange.nim b/tests/test_peer_exchange.nim index 95cb75ee4..a5b52f84f 100644 --- a/tests/test_peer_exchange.nim +++ b/tests/test_peer_exchange.nim @@ -54,9 +54,9 @@ procSuite "Peer Exchange": peerExchangeHandler = handlePeerExchange emptyHandler = ignorePeerExchange - await node1.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler)) - await node2.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler)) - await node3.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(peerExchangeHandler)) + await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler)) + await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler)) + await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler)) # Ensure that node1 prunes all peers after the first connection node1.wakuRelay.parameters.dHigh = 1 diff --git a/tests/test_waku_discv5.nim b/tests/test_waku_discv5.nim index a99380d99..dd9e4fd04 100644 --- a/tests/test_waku_discv5.nim +++ b/tests/test_waku_discv5.nim @@ -415,9 +415,9 @@ procSuite "Waku Discovery v5": asyncSpawn node.subscriptionsListener(queue) ## Then - queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1)) - queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2)) - queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3)) + queue.emit((kind: PubsubSub, topic: shard1)) + queue.emit((kind: PubsubSub, topic: shard2)) + queue.emit((kind: PubsubSub, topic: shard3)) await sleepAsync(1.seconds) @@ -426,9 +426,9 @@ procSuite "Waku Discovery v5": node.protocol.localNode.record.containsShard(shard2) == true node.protocol.localNode.record.containsShard(shard3) == true - queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1)) - queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2)) - queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3)) + queue.emit((kind: PubsubSub, topic: shard1)) + queue.emit((kind: PubsubSub, topic: shard2)) + queue.emit((kind: PubsubSub, topic: shard3)) await sleepAsync(1.seconds) @@ -437,9 +437,9 @@ procSuite "Waku Discovery v5": node.protocol.localNode.record.containsShard(shard2) == true node.protocol.localNode.record.containsShard(shard3) == true - queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard1)) - queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard2)) - queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard3)) + queue.emit((kind: PubsubUnsub, topic: shard1)) + queue.emit((kind: PubsubUnsub, topic: shard2)) + queue.emit((kind: PubsubUnsub, topic: shard3)) await sleepAsync(1.seconds) diff --git a/tests/test_wakunode.nim b/tests/test_wakunode.nim index 5efeb8d3c..fd6f21808 100644 --- a/tests/test_wakunode.nim +++ b/tests/test_wakunode.nim @@ -67,10 +67,10 @@ suite "WakuNode": msg.payload == payload completionFut.complete(true) - node2.subscribe(pubSubTopic, relayHandler) + node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(2000.millis) - await node1.publish(pubSubTopic, message) + await node1.publish(some(pubSubTopic), message) await sleepAsync(2000.millis) check: diff --git a/tests/test_wakunode_lightpush.nim b/tests/test_wakunode_lightpush.nim index 7a15175c1..7208c587b 100644 --- a/tests/test_wakunode_lightpush.nim +++ b/tests/test_wakunode_lightpush.nim @@ -49,7 +49,7 @@ suite "WakuNode - Lightpush": topic == DefaultPubsubTopic msg == message completionFutRelay.complete(true) - destNode.subscribe(DefaultPubsubTopic, relayHandler) + destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) # Wait for subscription to take effect await sleepAsync(100.millis) diff --git a/tests/waku_relay/test_waku_relay.nim b/tests/waku_relay/test_waku_relay.nim index 188ddf607..62e094bd1 100644 --- a/tests/waku_relay/test_waku_relay.nim +++ b/tests/waku_relay/test_waku_relay.nim @@ -46,8 +46,8 @@ suite "Waku Relay": networkB = "test-network2" ## when - nodeA.subscribe(networkA, noopRawHandler()) - nodeA.subscribe(networkB, noopRawHandler()) + discard nodeA.subscribe(networkA, noopRawHandler()) + discard nodeA.subscribe(networkB, noopRawHandler()) ## Then check: @@ -73,9 +73,9 @@ suite "Waku Relay": networkB = "test-network2" networkC = "test-network3" - nodeA.subscribe(networkA, noopRawHandler()) - nodeA.subscribe(networkB, noopRawHandler()) - nodeA.subscribe(networkC, noopRawHandler()) + discard nodeA.subscribe(networkA, noopRawHandler()) + discard nodeA.subscribe(networkB, noopRawHandler()) + discard nodeA.subscribe(networkC, noopRawHandler()) let topics = toSeq(nodeA.subscribedTopics) require: @@ -85,7 +85,7 @@ suite "Waku Relay": topics.contains(networkC) ## When - nodeA.unsubscribe(networkA) + nodeA.unsubscribeAll(networkA) ## Then check: @@ -129,14 +129,14 @@ suite "Waku Relay": proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = srcSubsFut.complete((topic, message)) - srcNode.subscribe(networkTopic, srcSubsHandler) + discard srcNode.subscribe(networkTopic, srcSubsHandler) # Subscription let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]() proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = dstSubsFut.complete((topic, message)) - dstNode.subscribe(networkTopic, dstSubsHandler) + discard dstNode.subscribe(networkTopic, dstSubsHandler) await sleepAsync(500.millis) @@ -196,7 +196,7 @@ suite "Waku Relay": proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = dstSubsFut.complete((topic, message)) - dstNode.subscribe(networkTopic, dstSubsHandler) + discard dstNode.subscribe(networkTopic, dstSubsHandler) await sleepAsync(500.millis) diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index a59551457..db18fc537 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -92,10 +92,10 @@ suite "WakuNode - Relay": msg.payload == payload completionFut.complete(true) - node3.subscribe(pubSubTopic, relayHandler) + node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(500.millis) - await node1.publish(pubSubTopic, message) + await node1.publish(some(pubSubTopic), message) ## Then check: @@ -173,14 +173,14 @@ suite "WakuNode - Relay": # relay handler is called completionFut.complete(true) - node3.subscribe(pubSubTopic, relayHandler) + node3.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(500.millis) - await node1.publish(pubSubTopic, message1) + await node1.publish(some(pubSubTopic), message1) await sleepAsync(500.millis) # message2 never gets relayed because of the validator - await node1.publish(pubSubTopic, message2) + await node1.publish(some(pubSubTopic), message2) await sleepAsync(500.millis) check: @@ -207,7 +207,7 @@ suite "WakuNode - Relay": connOk == true # Node 1 subscribes to topic - nodes[1].subscribe(DefaultPubsubTopic) + nodes[1].subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) await sleepAsync(500.millis) # Node 0 publishes 5 messages not compliant with WakuMessage (aka random bytes) @@ -254,10 +254,10 @@ suite "WakuNode - Relay": msg.payload == payload completionFut.complete(true) - node1.subscribe(pubSubTopic, relayHandler) + node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(500.millis) - await node2.publish(pubSubTopic, message) + await node2.publish(some(pubSubTopic), message) await sleepAsync(500.millis) @@ -295,10 +295,10 @@ suite "WakuNode - Relay": msg.payload == payload completionFut.complete(true) - node1.subscribe(pubSubTopic, relayHandler) + node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(500.millis) - await node2.publish(pubSubTopic, message) + await node2.publish(some(pubSubTopic), message) await sleepAsync(500.millis) @@ -340,10 +340,10 @@ suite "WakuNode - Relay": msg.payload == payload completionFut.complete(true) - node1.subscribe(pubSubTopic, relayHandler) + node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(500.millis) - await node2.publish(pubSubTopic, message) + await node2.publish(some(pubSubTopic), message) await sleepAsync(500.millis) check: @@ -380,10 +380,10 @@ suite "WakuNode - Relay": msg.payload == payload completionFut.complete(true) - node1.subscribe(pubSubTopic, relayHandler) + node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(500.millis) - await node2.publish(pubSubTopic, message) + await node2.publish(some(pubSubTopic), message) await sleepAsync(500.millis) check: @@ -420,10 +420,10 @@ suite "WakuNode - Relay": msg.payload == payload completionFut.complete(true) - node1.subscribe(pubSubTopic, relayHandler) + node1.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler)) await sleepAsync(500.millis) - await node2.publish(pubSubTopic, message) + await node2.publish(some(pubSubTopic), message) await sleepAsync(500.millis) @@ -440,7 +440,7 @@ suite "WakuNode - Relay": # subscribe all nodes to a topic let topic = "topic" - for node in nodes: node.wakuRelay.subscribe(topic, nil) + for node in nodes: discard node.wakuRelay.subscribe(topic, nil) await sleepAsync(500.millis) # connect nodes in full mesh @@ -482,3 +482,48 @@ suite "WakuNode - Relay": # Stop all nodes await allFutures(nodes.mapIt(it.stop())) + + asyncTest "Unsubscribe keep the subscription if other content topics also use the shard": + ## Setup + let + nodeKey = generateSecp256k1Key() + node = newTestWakuNode(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(0)) + + await node.start() + await node.mountRelay() + + ## Given + let + shard = "/waku/2/rs/1/1" + contentTopicA = DefaultContentTopic + contentTopicB = ContentTopic("/waku/2/default-content1/proto") + contentTopicC = ContentTopic("/waku/2/default-content2/proto") + handler: WakuRelayHandler = + proc( + pubsubTopic: PubsubTopic, + message: WakuMessage + ): Future[void] {.gcsafe, raises: [Defect].} = + discard pubsubTopic + discard message + + assert shard == getShard(contentTopicA).expect("Valid Topic"), "topic must use the same shard" + assert shard == getShard(contentTopicB).expect("Valid Topic"), "topic must use the same shard" + assert shard == getShard(contentTopicC).expect("Valid Topic"), "topic must use the same shard" + + ## When + node.subscribe((kind: ContentSub, topic: contentTopicA), some(handler)) + node.subscribe((kind: ContentSub, topic: contentTopicB), some(handler)) + node.subscribe((kind: ContentSub, topic: contentTopicC), some(handler)) + + ## Then + node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)) + check node.wakuRelay.isSubscribed(shard) + + node.unsubscribe((kind: ContentUnsub, topic: contentTopicA)) + check node.wakuRelay.isSubscribed(shard) + + node.unsubscribe((kind: ContentUnsub, topic: contentTopicC)) + check not node.wakuRelay.isSubscribed(shard) + + ## Cleanup + await node.stop() \ No newline at end of file diff --git a/tests/waku_rln_relay/test_wakunode_rln_relay.nim b/tests/waku_rln_relay/test_wakunode_rln_relay.nim index 4e9ffa3a0..dac93782b 100644 --- a/tests/waku_rln_relay/test_wakunode_rln_relay.nim +++ b/tests/waku_rln_relay/test_wakunode_rln_relay.nim @@ -77,7 +77,7 @@ procSuite "WakuNode - RLN relay": completionFut.complete(true) # mount the relay handler - node3.subscribe(DefaultPubsubTopic, relayHandler) + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) await sleepAsync(2000.millis) # prepare the message payload @@ -91,7 +91,7 @@ procSuite "WakuNode - RLN relay": ## node1 publishes a message with a rate limit proof, the message is then relayed to node2 which in turn ## verifies the rate limit proof of the message and relays the message to node3 ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc - await node1.publish(DefaultPubsubTopic, message) + await node1.publish(some(DefaultPubsubTopic), message) await sleepAsync(2000.millis) @@ -141,8 +141,8 @@ procSuite "WakuNode - RLN relay": rxMessagesTopic2 = rxMessagesTopic2 + 1 # mount the relay handlers - nodes[2].subscribe(pubsubTopics[0], relayHandler) - nodes[2].subscribe(pubsubTopics[1], relayHandler) + nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[0]), some(relayHandler)) + nodes[2].subscribe((kind: PubsubSub, topic: pubsubTopics[1]), some(relayHandler)) await sleepAsync(1000.millis) # generate some messages with rln proofs first. generating @@ -165,8 +165,8 @@ procSuite "WakuNode - RLN relay": # publish 3 messages from node[0] (last 2 are spam, window is 10 secs) # publish 3 messages from node[1] (last 2 are spam, window is 10 secs) - for msg in messages1: await nodes[0].publish(pubsubTopics[0], msg) - for msg in messages2: await nodes[1].publish(pubsubTopics[1], msg) + for msg in messages1: await nodes[0].publish(some(pubsubTopics[0]), msg) + for msg in messages2: await nodes[1].publish(some(pubsubTopics[1]), msg) # wait for gossip to propagate await sleepAsync(5000.millis) @@ -237,7 +237,7 @@ procSuite "WakuNode - RLN relay": completionFut.complete(true) # mount the relay handler - node3.subscribe(DefaultPubsubTopic, relayHandler) + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) await sleepAsync(2000.millis) # prepare the message payload @@ -266,7 +266,7 @@ procSuite "WakuNode - RLN relay": ## attempts to verify the rate limit proof and fails hence does not relay the message to node3, thus the relayHandler of node3 ## never gets called ## verification at node2 occurs inside a topic validator which is installed as part of the waku-rln-relay mount proc - await node1.publish(DefaultPubsubTopic, message) + await node1.publish(some(DefaultPubsubTopic), message) await sleepAsync(2000.millis) check: @@ -369,7 +369,7 @@ procSuite "WakuNode - RLN relay": # mount the relay handler for node3 - node3.subscribe(DefaultPubsubTopic, relayHandler) + node3.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) await sleepAsync(2000.millis) ## node1 publishes and relays 4 messages to node2 @@ -378,10 +378,10 @@ procSuite "WakuNode - RLN relay": ## node2 should detect either of wm1 or wm2 as spam and not relay it ## node2 should relay wm3 to node3 ## node2 should not relay wm4 because it has no valid rln proof - await node1.publish(DefaultPubsubTopic, wm1) - await node1.publish(DefaultPubsubTopic, wm2) - await node1.publish(DefaultPubsubTopic, wm3) - await node1.publish(DefaultPubsubTopic, wm4) + await node1.publish(some(DefaultPubsubTopic), wm1) + await node1.publish(some(DefaultPubsubTopic), wm2) + await node1.publish(some(DefaultPubsubTopic), wm3) + await node1.publish(some(DefaultPubsubTopic), wm4) await sleepAsync(2000.millis) let @@ -471,14 +471,14 @@ procSuite "WakuNode - RLN relay": completionFut3.complete(true) # mount the relay handler for node2 - node2.subscribe(DefaultPubsubTopic, relayHandler) + node2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler)) await sleepAsync(2000.millis) - await node1.publish(DefaultPubsubTopic, wm1) + await node1.publish(some(DefaultPubsubTopic), wm1) await sleepAsync(10.seconds) - await node1.publish(DefaultPubsubTopic, wm2) + await node1.publish(some(DefaultPubsubTopic), wm2) await sleepAsync(10.seconds) - await node1.publish(DefaultPubsubTopic, wm3) + await node1.publish(some(DefaultPubsubTopic), wm3) let res1 = await completionFut1.withTimeout(10.seconds) diff --git a/tests/wakunode2/test_validators.nim b/tests/wakunode2/test_validators.nim index d28273358..c1c05a0e8 100644 --- a/tests/wakunode2/test_validators.nim +++ b/tests/wakunode2/test_validators.nim @@ -61,7 +61,7 @@ suite "WakuNode2 - Validators": msgReceived += 1 # Subscribe all nodes to the same topic/handler - for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler) + for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler) await sleepAsync(500.millis) # Each node publishes 10 signed messages @@ -74,7 +74,7 @@ suite "WakuNode2 - Validators": # Include signature msg.meta = secretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63] - await nodes[i].publish(spamProtectedTopic, msg) + await nodes[i].publish(some(spamProtectedTopic), msg) # Wait for gossip await sleepAsync(2.seconds) @@ -133,7 +133,7 @@ suite "WakuNode2 - Validators": await sleepAsync(500.millis) # Subscribe all nodes to the same topic/handler - for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler) + for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler) await sleepAsync(500.millis) # Each node sends 5 messages, signed but with a non-whitelisted key (total = 25) @@ -146,7 +146,7 @@ suite "WakuNode2 - Validators": # Sign the message with a wrong key msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63] - await nodes[i].publish(spamProtectedTopic, msg) + await nodes[i].publish(some(spamProtectedTopic), msg) # Each node sends 5 messages that are not signed (total = 25) for i in 0..<5: @@ -154,7 +154,7 @@ suite "WakuNode2 - Validators": let unsignedMessage = WakuMessage( payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, version: 2, timestamp: now(), ephemeral: true) - await nodes[i].publish(spamProtectedTopic, unsignedMessage) + await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) # Each node sends 5 messages that dont contain timestamp (total = 25) for i in 0..<5: @@ -162,7 +162,7 @@ suite "WakuNode2 - Validators": let unsignedMessage = WakuMessage( payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, version: 2, timestamp: 0, ephemeral: true) - await nodes[i].publish(spamProtectedTopic, unsignedMessage) + await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) # Each node sends 5 messages way BEFORE than the current timestmap (total = 25) for i in 0..<5: @@ -171,7 +171,7 @@ suite "WakuNode2 - Validators": let unsignedMessage = WakuMessage( payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, version: 2, timestamp: beforeTimestamp, ephemeral: true) - await nodes[i].publish(spamProtectedTopic, unsignedMessage) + await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) # Each node sends 5 messages way LATER than the current timestmap (total = 25) for i in 0..<5: @@ -180,7 +180,7 @@ suite "WakuNode2 - Validators": let unsignedMessage = WakuMessage( payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, version: 2, timestamp: afterTimestamp, ephemeral: true) - await nodes[i].publish(spamProtectedTopic, unsignedMessage) + await nodes[i].publish(some(spamProtectedTopic), unsignedMessage) # Wait for gossip await sleepAsync(4.seconds) @@ -227,7 +227,7 @@ suite "WakuNode2 - Validators": msgReceived += 1 # Subscribe all nodes to the same topic/handler - for node in nodes: node.wakuRelay.subscribe(spamProtectedTopic, handler) + for node in nodes: discard node.wakuRelay.subscribe(spamProtectedTopic, handler) await sleepAsync(500.millis) # Add signed message validator to all nodes. They will only route signed messages @@ -255,7 +255,7 @@ suite "WakuNode2 - Validators": let unsignedMessage = WakuMessage( payload: urandom(1*(10^3)), contentTopic: spamProtectedTopic, version: 2, timestamp: now(), ephemeral: true) - await nodes[0].publish(spamProtectedTopic, unsignedMessage) + await nodes[0].publish(some(spamProtectedTopic), unsignedMessage) # nodes[0] spams 50 wrongly signed messages (nodes[0] just knows of nodes[1]) for j in 0..<50: @@ -264,7 +264,7 @@ suite "WakuNode2 - Validators": version: 2, timestamp: now(), ephemeral: true) # Sign the message with a wrong key msg.meta = wrongSecretKey.sign(SkMessage(spamProtectedTopic.msgHash(msg))).toRaw()[0..63] - await nodes[0].publish(spamProtectedTopic, msg) + await nodes[0].publish(some(spamProtectedTopic), msg) # Wait for gossip await sleepAsync(2.seconds) diff --git a/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim b/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim index f54d7399c..e99e2edaa 100644 --- a/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim +++ b/tests/wakunode_jsonrpc/test_jsonrpc_relay.nim @@ -22,10 +22,6 @@ import ../testlib/wakucore, ../testlib/wakunode -proc newTestMessageCache(): relay_api.MessageCache = - relay_api.MessageCache.init(capacity=30) - - suite "Waku v2 JSON-RPC API - Relay": asyncTest "subscribe and unsubscribe from topics": @@ -33,7 +29,7 @@ suite "Waku v2 JSON-RPC API - Relay": let node = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) await node.start() - await node.mountRelay(topics = @[DefaultPubsubTopic]) + await node.mountRelay(@[]) # JSON-RPC server let @@ -41,7 +37,8 @@ suite "Waku v2 JSON-RPC API - Relay": ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) - installRelayApiHandlers(node, server, newTestMessageCache()) + let cache = MessageCache[string].init(capacity=30) + installRelayApiHandlers(node, server, cache) server.start() # JSON-RPC client @@ -67,16 +64,14 @@ suite "Waku v2 JSON-RPC API - Relay": subResp == true check: # Node is now subscribed to default + new topics - subTopics.len == 1 + newTopics.len - DefaultPubsubTopic in subTopics + subTopics.len == newTopics.len newTopics.allIt(it in subTopics) check: unsubResp == true check: # Node is now unsubscribed from new topics - unsubTopics.len == 1 - DefaultPubsubTopic in unsubTopics + unsubTopics.len == 0 newTopics.allIt(it notin unsubTopics) await server.stop() @@ -110,14 +105,14 @@ suite "Waku v2 JSON-RPC API - Relay": await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()]) - # RPC server (source node) let rpcPort = Port(8548) ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) - installRelayApiHandlers(srcNode, server, newTestMessageCache()) + let cache = MessageCache[string].init(capacity=30) + installRelayApiHandlers(srcNode, server, cache) server.start() # JSON-RPC client @@ -131,7 +126,7 @@ suite "Waku v2 JSON-RPC API - Relay": proc dstHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = dstHandlerFut.complete((topic, msg)) - dstNode.subscribe(pubSubTopic, dstHandler) + dstNode.subscribe((kind: PubsubSub, topic: pubsubTopic), some(dstHandler)) ## When let rpcMessage = WakuMessageRPC( @@ -162,7 +157,7 @@ suite "Waku v2 JSON-RPC API - Relay": await server.closeWait() await allFutures(srcNode.stop(), dstNode.stop()) - asyncTest "get latest messages received from topics cache": + asyncTest "get latest messages received from pubsub topics cache": ## Setup let pubSubTopic = "test-jsonrpc-pubsub-topic" @@ -176,24 +171,26 @@ suite "Waku v2 JSON-RPC API - Relay": await allFutures(srcNode.start(), dstNode.start()) await srcNode.mountRelay(@[pubSubTopic]) - await dstNode.mountRelay(@[pubSubTopic]) + await dstNode.mountRelay(@[]) await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()]) - # RPC server (destination node) let rpcPort = Port(8549) ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) server = newRpcHttpServer([ta]) - installRelayApiHandlers(dstNode, server, newTestMessageCache()) + let cache = MessageCache[string].init(capacity=30) + installRelayApiHandlers(dstNode, server, cache) server.start() # JSON-RPC client let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort, false) + discard await client.post_waku_v2_relay_v1_subscriptions(@[pubSubTopic]) + ## Given let messages = @[ fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic), @@ -204,7 +201,7 @@ suite "Waku v2 JSON-RPC API - Relay": ## When for msg in messages: - await srcNode.publish(pubSubTopic, msg) + await srcNode.publish(some(pubSubTopic), msg) await sleepAsync(200.millis) @@ -222,3 +219,66 @@ suite "Waku v2 JSON-RPC API - Relay": await server.stop() await server.closeWait() await allFutures(srcNode.stop(), dstNode.stop()) + + asyncTest "get latest messages received from content topics cache": + ## Setup + let contentTopic = DefaultContentTopic + + # Relay nodes setup + let + srcNode = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) + dstNode = newTestWakuNode(generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)) + + await allFutures(srcNode.start(), dstNode.start()) + + let shard = getShard(contentTopic).expect("Valid Shard") + + await srcNode.mountRelay(@[shard]) + await dstNode.mountRelay(@[]) + + await srcNode.connectToNodes(@[dstNode.peerInfo.toRemotePeerInfo()]) + + # RPC server (destination node) + let + rpcPort = Port(8550) + ta = initTAddress(ValidIpAddress.init("0.0.0.0"), rpcPort) + server = newRpcHttpServer([ta]) + + let cache = MessageCache[string].init(capacity=30) + installRelayApiHandlers(dstNode, server, cache) + server.start() + + # JSON-RPC client + let client = newRpcHttpClient() + await client.connect("127.0.0.1", rpcPort, false) + + discard await client.post_waku_v2_relay_v1_auto_subscriptions(@[contentTopic]) + + ## Given + let messages = @[ + fakeWakuMessage(payload= @[byte 70], contentTopic=contentTopic), + fakeWakuMessage(payload= @[byte 71], contentTopic=contentTopic), + fakeWakuMessage(payload= @[byte 72], contentTopic=contentTopic), + fakeWakuMessage(payload= @[byte 73], contentTopic=contentTopic) + ] + + ## When + for msg in messages: + await srcNode.publish(none(PubsubTopic), msg) + + await sleepAsync(200.millis) + + let dstMessages = await client.get_waku_v2_relay_v1_auto_messages(contentTopic) + + ## Then + check: + dstMessages.len == 4 + dstMessages[2].payload == base64.encode(messages[2].payload) + dstMessages[2].contentTopic.get() == messages[2].contentTopic + dstMessages[2].timestamp.get() == messages[2].timestamp + dstMessages[2].version.get() == messages[2].version + + ## Cleanup + await server.stop() + await server.closeWait() + await allFutures(srcNode.stop(), dstNode.stop()) \ No newline at end of file diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index a2b4ea1d3..6170e9ce8 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -23,7 +23,6 @@ import ../../waku/waku_relay, ../../waku/waku_filter_v2/subscriptions, ../../waku/waku_filter_v2/common, - ../../waku/waku_api/rest/relay/topic_cache, ../../waku/waku_api/rest/relay/handlers as relay_api, ../../waku/waku_api/rest/relay/client as relay_api_client, ../testlib/wakucore, @@ -74,7 +73,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = testSetup.messageCache = filter_api.MessageCache.init() installFilterRestApiHandlers(testSetup.restServer.router, testSetup.subscriberNode, testSetup.messageCache) - let topicCache = TopicCache.init() + let topicCache = MessageCache[string].init() installRelayApiHandlers(testSetup.restServerForService.router, testSetup.serviceNode, topicCache) testSetup.restServer.start() @@ -244,7 +243,7 @@ suite "Waku v2 Rest API - Filter V2": subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId restFilterTest.messageCache.subscribe(DefaultPubsubTopic) - restFilterTest.serviceNode.subscribe(DefaultPubsubTopic) + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) # When var requestBody = FilterSubscribeRequest(requestId: "1234", diff --git a/tests/wakunode_rest/test_rest_lightpush.nim b/tests/wakunode_rest/test_rest_lightpush.nim index a05228beb..a41c86c5a 100644 --- a/tests/wakunode_rest/test_rest_lightpush.nim +++ b/tests/wakunode_rest/test_rest_lightpush.nim @@ -94,8 +94,8 @@ suite "Waku v2 Rest API - lightpush": # Given let restLightPushTest = await RestLightPushTest.init() - restLightPushTest.consumerNode.subscribe(DefaultPubsubTopic) - restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) + restLightPushTest.consumerNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + restLightPushTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 @@ -120,7 +120,7 @@ suite "Waku v2 Rest API - lightpush": # Given let restLightPushTest = await RestLightPushTest.init() - restLightPushTest.serviceNode.subscribe(DefaultPubsubTopic) + restLightPushTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) require: toSeq(restLightPushTest.serviceNode.wakuRelay.subscribedTopics).len == 1 diff --git a/tests/wakunode_rest/test_rest_relay.nim b/tests/wakunode_rest/test_rest_relay.nim index 2d571b39d..be444c3e9 100644 --- a/tests/wakunode_rest/test_rest_relay.nim +++ b/tests/wakunode_rest/test_rest_relay.nim @@ -11,13 +11,13 @@ import ../../waku/common/base64, ../../waku/waku_core, ../../waku/waku_node, + ../../waku/waku_api/message_cache, ../../waku/waku_api/rest/server, ../../waku/waku_api/rest/client, ../../waku/waku_api/rest/responses, ../../waku/waku_api/rest/relay/types, ../../waku/waku_api/rest/relay/handlers as relay_api, ../../waku/waku_api/rest/relay/client as relay_api_client, - ../../waku/waku_api/rest/relay/topic_cache, ../../waku/waku_relay, ../../../waku/waku_rln_relay, ../testlib/wakucore, @@ -34,7 +34,7 @@ proc testWakuNode(): WakuNode = suite "Waku v2 Rest API - Relay": - asyncTest "Subscribe a node to an array of topics - POST /relay/v1/subscriptions": + asyncTest "Subscribe a node to an array of pubsub topics - POST /relay/v1/subscriptions": # Given let node = testWakuNode() await node.start() @@ -44,9 +44,9 @@ suite "Waku v2 Rest API - Relay": let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let topicCache = TopicCache.init() + let cache = MessageCache[string].init() - installRelayPostSubscriptionsV1Handler(restServer.router, node, topicCache) + installRelayApiHandlers(restServer.router, node, cache) restServer.start() let pubSubTopics = @[ @@ -67,35 +67,39 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - topicCache.isSubscribed("pubsub-topic-1") - topicCache.isSubscribed("pubsub-topic-2") - topicCache.isSubscribed("pubsub-topic-3") + cache.isSubscribed("pubsub-topic-1") + cache.isSubscribed("pubsub-topic-2") + cache.isSubscribed("pubsub-topic-3") check: - # Node should be subscribed to default + new topics toSeq(node.wakuRelay.subscribedTopics).len == pubSubTopics.len await restServer.stop() await restServer.closeWait() await node.stop() - asyncTest "Unsubscribe a node from an array of topics - DELETE /relay/v1/subscriptions": + asyncTest "Unsubscribe a node from an array of pubsub topics - DELETE /relay/v1/subscriptions": # Given let node = testWakuNode() await node.start() - await node.mountRelay() + await node.mountRelay(@[ + "pubsub-topic-1", + "pubsub-topic-2", + "pubsub-topic-3", + "pubsub-topic-x", + ]) let restPort = Port(58012) let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let topicCache = TopicCache.init() - topicCache.subscribe("pubsub-topic-1") - topicCache.subscribe("pubsub-topic-2") - topicCache.subscribe("pubsub-topic-3") - topicCache.subscribe("pubsub-topic-x") + let cache = MessageCache[string].init() + cache.subscribe("pubsub-topic-1") + cache.subscribe("pubsub-topic-2") + cache.subscribe("pubsub-topic-3") + cache.subscribe("pubsub-topic-x") - installRelayDeleteSubscriptionsV1Handler(restServer.router, node, topicCache) + installRelayApiHandlers(restServer.router, node, cache) restServer.start() let pubSubTopics = @[ @@ -117,17 +121,22 @@ suite "Waku v2 Rest API - Relay": response.data == "OK" check: - not topicCache.isSubscribed("pubsub-topic-1") - not topicCache.isSubscribed("pubsub-topic-2") - not topicCache.isSubscribed("pubsub-topic-3") - topicCache.isSubscribed("pubsub-topic-x") + not cache.isSubscribed("pubsub-topic-1") + not node.wakuRelay.isSubscribed("pubsub-topic-1") + not cache.isSubscribed("pubsub-topic-2") + not node.wakuRelay.isSubscribed("pubsub-topic-2") + not cache.isSubscribed("pubsub-topic-3") + not node.wakuRelay.isSubscribed("pubsub-topic-3") + cache.isSubscribed("pubsub-topic-x") + node.wakuRelay.isSubscribed("pubsub-topic-x") + not cache.isSubscribed("pubsub-topic-y") + not node.wakuRelay.isSubscribed("pubsub-topic-y") await restServer.stop() await restServer.closeWait() await node.stop() - - asyncTest "Get the latest messages for topic - GET /relay/v1/messages/{topic}": + asyncTest "Get the latest messages for a pubsub topic - GET /relay/v1/messages/{topic}": # Given let node = testWakuNode() await node.start() @@ -144,13 +153,13 @@ suite "Waku v2 Rest API - Relay": fakeWakuMessage(contentTopic = "content-topic-x", payload = toBytes("TEST-1")), ] - let topicCache = TopicCache.init() + let cache = MessageCache[string].init() - topicCache.subscribe(pubSubTopic) + cache.subscribe(pubSubTopic) for msg in messages: - topicCache.addMessage(pubSubTopic, msg) + cache.addMessage(pubSubTopic, msg) - installRelayGetMessagesV1Handler(restServer.router, node, topicCache) + installRelayApiHandlers(restServer.router, node, cache) restServer.start() # When @@ -164,20 +173,20 @@ suite "Waku v2 Rest API - Relay": response.data.len == 3 response.data.all do (msg: RelayWakuMessage) -> bool: msg.payload == base64.encode("TEST-1") and - msg.contentTopic.get().string == "content-topic-x" and + msg.contentTopic.get() == "content-topic-x" and msg.version.get() == 2 and msg.timestamp.get() != Timestamp(0) check: - topicCache.isSubscribed(pubSubTopic) - topicCache.getMessages(pubSubTopic).tryGet().len == 0 + cache.isSubscribed(pubSubTopic) + cache.getMessages(pubSubTopic).tryGet().len == 0 await restServer.stop() await restServer.closeWait() await node.stop() - asyncTest "Post a message to topic - POST /relay/v1/messages/{topic}": + asyncTest "Post a message to a pubsub topic - POST /relay/v1/messages/{topic}": ## "Relay API: publish and subscribe/unsubscribe": # Given let node = testWakuNode() @@ -192,26 +201,18 @@ suite "Waku v2 Rest API - Relay": let restAddress = ValidIpAddress.init("0.0.0.0") let restServer = RestServerRef.init(restAddress, restPort).tryGet() - let topicCache = TopicCache.init() + let cache = MessageCache[string].init() - installRelayApiHandlers(restServer.router, node, topicCache) + installRelayApiHandlers(restServer.router, node, cache) restServer.start() let client = newRestHttpClient(initTAddress(restAddress, restPort)) - node.subscribe(DefaultPubsubTopic) + node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) require: toSeq(node.wakuRelay.subscribedTopics).len == 1 - # When - let newTopics = @[ - PubSubTopic("pubsub-topic-1"), - PubSubTopic("pubsub-topic-2"), - PubSubTopic("pubsub-topic-3") - ] - discard await client.relayPostSubscriptionsV1(newTopics) - let response = await client.relayPostMessagesV1(DefaultPubsubTopic, RelayWakuMessage( payload: base64.encode("TEST-PAYLOAD"), contentTopic: some(DefaultContentTopic), @@ -224,8 +225,193 @@ suite "Waku v2 Rest API - Relay": $response.contentType == $MIMETYPE_TEXT response.data == "OK" - # TODO: Check for the message to be published to the topic + await restServer.stop() + await restServer.closeWait() + await node.stop() + + # Autosharding API + + asyncTest "Subscribe a node to an array of content topics - POST /relay/v1/auto/subscriptions": + # Given + let node = testWakuNode() + await node.start() + await node.mountRelay() + + let restPort = Port(58011) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + let cache = MessageCache[string].init() + + installRelayApiHandlers(restServer.router, node, cache) + restServer.start() + + let contentTopics = @[ + ContentTopic("/waku/2/default-content1/proto"), + ContentTopic("/waku/2/default-content2/proto"), + ContentTopic("/waku/2/default-content3/proto") + ] + + let shards = contentTopics.mapIt(getShard(it).expect("Valid Shard")).deduplicate() + + # When + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + let requestBody = RelayPostSubscriptionsRequest(contentTopics) + let response = await client.relayPostAutoSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + cache.isSubscribed(contentTopics[0]) + cache.isSubscribed(contentTopics[1]) + cache.isSubscribed(contentTopics[2]) + + check: + # Node should be subscribed to all shards + toSeq(node.wakuRelay.subscribedTopics).len == shards.len await restServer.stop() await restServer.closeWait() await node.stop() + + asyncTest "Unsubscribe a node from an array of content topics - DELETE /relay/v1/auto/subscriptions": + # Given + let node = testWakuNode() + await node.start() + await node.mountRelay() + + let restPort = Port(58012) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + let contentTopics = @[ + ContentTopic("/waku/2/default-content1/proto"), + ContentTopic("/waku/2/default-content2/proto"), + ContentTopic("/waku/2/default-content3/proto"), + ContentTopic("/waku/2/default-contentX/proto") + ] + + let cache = MessageCache[string].init() + cache.subscribe(contentTopics[0]) + cache.subscribe(contentTopics[1]) + cache.subscribe(contentTopics[2]) + cache.subscribe("/waku/2/default-contentY/proto") + + installRelayApiHandlers(restServer.router, node, cache) + restServer.start() + + # When + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + let requestBody = RelayDeleteSubscriptionsRequest(contentTopics) + let response = await client.relayDeleteAutoSubscriptionsV1(requestBody) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + check: + not cache.isSubscribed(contentTopics[1]) + not cache.isSubscribed(contentTopics[2]) + not cache.isSubscribed(contentTopics[3]) + cache.isSubscribed("/waku/2/default-contentY/proto") + + await restServer.stop() + await restServer.closeWait() + await node.stop() + + asyncTest "Get the latest messages for a content topic - GET /relay/v1/auto/messages/{topic}": + # Given + let node = testWakuNode() + await node.start() + await node.mountRelay() + + let restPort = Port(58013) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + let contentTopic = DefaultContentTopic + let messages = @[ + fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")), + fakeWakuMessage(contentTopic = DefaultContentTopic, payload = toBytes("TEST-1")), + ] + + let cache = MessageCache[string].init() + + cache.subscribe(contentTopic) + for msg in messages: + cache.addMessage(contentTopic, msg) + + installRelayApiHandlers(restServer.router, node, cache) + restServer.start() + + # When + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + let response = await client.relayGetAutoMessagesV1(contentTopic) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.len == 3 + response.data.all do (msg: RelayWakuMessage) -> bool: + msg.payload == base64.encode("TEST-1") and + msg.contentTopic.get() == DefaultContentTopic and + msg.version.get() == 2 and + msg.timestamp.get() != Timestamp(0) + + check: + cache.isSubscribed(contentTopic) + cache.getMessages(contentTopic).tryGet().len == 0 # The cache is cleared when getMessage is called + + await restServer.stop() + await restServer.closeWait() + await node.stop() + + asyncTest "Post a message to a content topic - POST /relay/v1/auto/messages/{topic}": + ## "Relay API: publish and subscribe/unsubscribe": + # Given + let node = testWakuNode() + await node.start() + await node.mountRelay() + await node.mountRlnRelay(WakuRlnConfig(rlnRelayDynamic: false, + rlnRelayCredIndex: some(1.uint), + rlnRelayTreePath: genTempPath("rln_tree", "wakunode_1"))) + + # RPC server setup + let restPort = Port(58014) + let restAddress = ValidIpAddress.init("0.0.0.0") + let restServer = RestServerRef.init(restAddress, restPort).tryGet() + + let cache = MessageCache[string].init() + installRelayApiHandlers(restServer.router, node, cache) + restServer.start() + + let client = newRestHttpClient(initTAddress(restAddress, restPort)) + + node.subscribe((kind: ContentSub, topic: DefaultContentTopic)) + require: + toSeq(node.wakuRelay.subscribedTopics).len == 1 + + # When + let response = await client.relayPostAutoMessagesV1(DefaultContentTopic, RelayWakuMessage( + payload: base64.encode("TEST-PAYLOAD"), + contentTopic: some(DefaultContentTopic), + timestamp: some(int64(2022)) + )) + + # Then + check: + response.status == 200 + $response.contentType == $MIMETYPE_TEXT + response.data == "OK" + + await restServer.stop() + await restServer.closeWait() + await node.stop() \ No newline at end of file diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index a3a8f764b..bd7f1eb5b 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -25,6 +25,7 @@ import libp2p/transports/wstransport import ../waku_core, + ../waku_core/topics/sharding, ../waku_relay, ../waku_archive, ../waku_store, @@ -101,6 +102,7 @@ type announcedAddresses* : seq[MultiAddress] started*: bool # Indicates that node has started listening topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] + contentTopicHandlers: Table[ContentTopic, TopicHandler] proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService = ## AutonatService request other peers to dial us back @@ -220,62 +222,104 @@ proc registerRelayDefaultHandler(node: WakuNode, topic: PubsubTopic) = await filterHandler(topic, msg) await archiveHandler(topic, msg) - node.wakuRelay.subscribe(topic, defaultHandler) + discard node.wakuRelay.subscribe(topic, defaultHandler) - -proc subscribe*(node: WakuNode, topic: PubsubTopic) = +proc subscribe*(node: WakuNode, subscription: SubscriptionEvent, handler = none(WakuRelayHandler)) = + ## Subscribes to a PubSub or Content topic. Triggers handler when receiving messages on + ## this topic. WakuRelayHandler is a method that takes a topic and a Waku message. + if node.wakuRelay.isNil(): error "Invalid API call to `subscribe`. WakuRelay not mounted." return - debug "subscribe", pubsubTopic= topic + let (pubsubTopic, contentTopicOp) = + case subscription.kind: + of ContentSub: + let shard = getShard((subscription.topic)).valueOr: + error "Autosharding error", error=error + return - node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic)) - node.registerRelayDefaultHandler(topic) - -proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) = - ## Subscribes to a PubSub topic. Triggers handler when receiving messages on - ## this topic. TopicHandler is a method that takes a topic and some data. - if node.wakuRelay.isNil(): - error "Invalid API call to `subscribe`. WakuRelay not mounted." + (shard, some(subscription.topic)) + of PubsubSub: (subscription.topic, none(ContentTopic)) + else: return + + if contentTopicOp.isSome() and node.contentTopicHandlers.hasKey(contentTopicOp.get()): + error "Invalid API call to `subscribe`. Was already subscribed" return - debug "subscribe", pubsubTopic= topic + debug "subscribe", pubsubTopic=pubsubTopic - node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic)) - node.registerRelayDefaultHandler(topic) - node.wakuRelay.subscribe(topic, handler) + node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) + node.registerRelayDefaultHandler(pubsubTopic) -proc unsubscribe*(node: WakuNode, topic: PubsubTopic) = - ## Unsubscribes from a specific PubSub topic. + if handler.isSome(): + let wrappedHandler = node.wakuRelay.subscribe(pubsubTopic, handler.get()) + if contentTopicOp.isSome(): + node.contentTopicHandlers[contentTopicOp.get()] = wrappedHandler + +proc unsubscribe*(node: WakuNode, subscription: SubscriptionEvent) = + ## Unsubscribes from a specific PubSub or Content topic. + if node.wakuRelay.isNil(): error "Invalid API call to `unsubscribe`. WakuRelay not mounted." return - info "unsubscribe", pubsubTopic=topic + let (pubsubTopic, contentTopicOp) = + case subscription.kind: + of ContentUnsub: + let shard = getShard((subscription.topic)).valueOr: + error "Autosharding error", error=error + return - node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: topic)) - node.wakuRelay.unsubscribe(topic) + (shard, some(subscription.topic)) + of PubsubUnsub: (subscription.topic, none(ContentTopic)) + else: return + if not node.wakuRelay.isSubscribed(pubsubTopic): + error "Invalid API call to `unsubscribe`. Was not subscribed" + return -proc publish*(node: WakuNode, topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} = - ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a - ## `contentTopic` field for light node functionality. This field may be also - ## be omitted. + if contentTopicOp.isSome(): + # Remove this handler only + var handler: TopicHandler + if node.contentTopicHandlers.pop(contentTopicOp.get(), handler): + debug "unsubscribe", contentTopic=contentTopicOp.get() + node.wakuRelay.unsubscribe(pubsubTopic, handler) + + if contentTopicOp.isNone() or node.wakuRelay.topics.getOrDefault(pubsubTopic).len == 1: + # Remove all handlers + debug "unsubscribe", pubsubTopic=pubsubTopic + node.wakuRelay.unsubscribeAll(pubsubTopic) + node.topicSubscriptionQueue.emit((kind: PubsubUnsub, topic: pubsubTopic)) + +proc publish*( + node: WakuNode, + pubsubTopicOp: Option[PubsubTopic], + message: WakuMessage + ) {.async, gcsafe.} = + ## Publish a `WakuMessage`. Pubsub topic contains; none, a named or static shard. + ## `WakuMessage` should contain a `contentTopic` field for light node functionality. + ## It is also used to determine the shard. if node.wakuRelay.isNil(): error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." # TODO: Improve error handling return - discard await node.wakuRelay.publish(topic, message) + let pubsubTopic = pubsubTopicOp.valueOr: + getShard(message.contentTopic).valueOr: + error "Autosharding error", error=error + return + + #TODO instead of discard return error when 0 peers received the message + discard await node.wakuRelay.publish(pubsubTopic, message) trace "waku.relay published", - peerId=node.peerId, - pubsubTopic=topic, - hash=topic.digest(message).to0xHex(), - publishTime=getNowInNanosecondTime() + peerId=node.peerId, + pubsubTopic=pubsubTopic, + hash=pubsubTopic.digest(message).to0xHex(), + publishTime=getNowInNanosecondTime() proc startRelay*(node: WakuNode) {.async.} = ## Setup and start relay protocol @@ -303,7 +347,7 @@ proc startRelay*(node: WakuNode) {.async.} = info "relay started successfully" proc mountRelay*(node: WakuNode, - topics: seq[string] = @[], + pubsubTopics: seq[string] = @[], peerExchangeHandler = none(RoutingRecordsHandler)) {.async, gcsafe.} = if not node.wakuRelay.isNil(): error "wakuRelay already mounted, skipping" @@ -332,8 +376,8 @@ proc mountRelay*(node: WakuNode, info "relay mounted successfully" # Subscribe to topics - for topic in topics: - node.subscribe(topic) + for pubsubTopic in pubsubTopics: + node.subscribe((kind: PubsubSub, topic: pubsubTopic)) ## Waku filter diff --git a/waku/waku_api/cache_handlers.nim b/waku/waku_api/cache_handlers.nim new file mode 100644 index 000000000..c37850576 --- /dev/null +++ b/waku/waku_api/cache_handlers.nim @@ -0,0 +1,23 @@ +when (NimMajor, NimMinor) < (1, 4): + {.push raises: [Defect].} +else: + {.push raises: [].} + +import + chronos, + chronicles +import + ../waku_relay, + ../waku_core, + ./message_cache + +##### Message handler + +proc messageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = + return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = + cache.addMessage(PubSubTopic(pubsubTopic), msg) + +proc autoMessageCacheHandler*(cache: MessageCache[string]): WakuRelayHandler = + return proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = + if cache.isSubscribed(msg.contentTopic): + cache.addMessage(msg.contentTopic, msg) \ No newline at end of file diff --git a/waku/waku_api/jsonrpc/relay/callsigs.nim b/waku/waku_api/jsonrpc/relay/callsigs.nim index 332e31740..679eaf2f4 100644 --- a/waku/waku_api/jsonrpc/relay/callsigs.nim +++ b/waku/waku_api/jsonrpc/relay/callsigs.nim @@ -4,6 +4,11 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[PubsubTopic]): bool proc post_waku_v2_relay_v1_message(topic: PubsubTopic, message: WakuMessageRPC): bool proc get_waku_v2_relay_v1_messages(topic: PubsubTopic): seq[WakuMessageRPC] +proc post_waku_v2_relay_v1_auto_subscriptions(topics: seq[ContentTopic]): bool +proc delete_waku_v2_relay_v1_auto_subscriptions(topics: seq[ContentTopic]): bool +proc post_waku_v2_relay_v1_auto_message(message: WakuMessageRPC): bool +proc get_waku_v2_relay_v1_auto_messages(topic: ContentTopic): seq[WakuMessageRPC] + # Support for the Relay Private API has been deprecated. # This API existed for compatibility with the Waku v1 spec and encryption scheme. diff --git a/waku/waku_api/jsonrpc/relay/handlers.nim b/waku/waku_api/jsonrpc/relay/handlers.nim index 213b410c5..c52369abf 100644 --- a/waku/waku_api/jsonrpc/relay/handlers.nim +++ b/waku/waku_api/jsonrpc/relay/handlers.nim @@ -17,6 +17,7 @@ import ../../../waku_rln_relay/rln/wrappers, ../../../waku_node, ../../message_cache, + ../../cache_handlers, ../message from std/times import getTime @@ -29,54 +30,51 @@ logScope: const futTimeout* = 5.seconds # Max time to wait for futures -type - MessageCache* = message_cache.MessageCache[PubsubTopic] - - ## Waku Relay JSON-RPC API -proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache) = - if node.wakuRelay.isNil(): - debug "waku relay protocol is nil. skipping json rpc api handlers installation" - return - - let topicHandler = proc(topic: PubsubTopic, message: WakuMessage) {.async.} = - cache.addMessage(topic, message) - - # The node may already be subscribed to some topics when Relay API handlers - # are installed - for topic in node.wakuRelay.subscribedTopics: - node.subscribe(topic, topicHandler) - cache.subscribe(topic) - - - server.rpc("post_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool: +proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageCache[string]) = + server.rpc("post_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool: + if pubsubTopics.len == 0: + raise newException(ValueError, "No pubsub topic provided") + ## Subscribes a node to a list of PubSub topics debug "post_waku_v2_relay_v1_subscriptions" # Subscribe to all requested topics - let newTopics = topics.filterIt(not cache.isSubscribed(it)) + let newTopics = pubsubTopics.filterIt(not cache.isSubscribed(it)) - for topic in newTopics: - cache.subscribe(topic) - node.subscribe(topic, topicHandler) + for pubsubTopic in newTopics: + if pubsubTopic == "": + raise newException(ValueError, "Empty pubsub topic") + + cache.subscribe(pubsubTopic) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache))) return true - server.rpc("delete_waku_v2_relay_v1_subscriptions") do (topics: seq[PubsubTopic]) -> bool: + server.rpc("delete_waku_v2_relay_v1_subscriptions") do (pubsubTopics: seq[PubsubTopic]) -> bool: + if pubsubTopics.len == 0: + raise newException(ValueError, "No pubsub topic provided") + ## Unsubscribes a node from a list of PubSub topics debug "delete_waku_v2_relay_v1_subscriptions" # Unsubscribe all handlers from requested topics - let subscribedTopics = topics.filterIt(cache.isSubscribed(it)) + let subscribedTopics = pubsubTopics.filterIt(cache.isSubscribed(it)) - for topic in subscribedTopics: - node.unsubscribe(topic) - cache.unsubscribe(topic) + for pubsubTopic in subscribedTopics: + if pubsubTopic == "": + raise newException(ValueError, "Empty pubsub topic") + + cache.unsubscribe(pubsubTopic) + node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)) return true server.rpc("post_waku_v2_relay_v1_message") do (pubsubTopic: PubsubTopic, msg: WakuMessageRPC) -> bool: + if pubsubTopic == "": + raise newException(ValueError, "Empty pubsub topic") + ## Publishes a WakuMessage to a PubSub topic debug "post_waku_v2_relay_v1_message", pubsubTopic=pubsubTopic @@ -84,10 +82,12 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC if payloadRes.isErr(): raise newException(ValueError, "invalid payload format: " & payloadRes.error) + if msg.contentTopic.isNone(): + raise newException(ValueError, "message has no content topic") + var message = WakuMessage( payload: payloadRes.value, - # TODO: Fail if the message doesn't have a content topic - contentTopic: msg.contentTopic.get(DefaultContentTopic), + contentTopic: msg.contentTopic.get(), version: msg.version.get(0'u32), timestamp: msg.timestamp.get(Timestamp(0)), ephemeral: msg.ephemeral.get(false) @@ -96,7 +96,8 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC # ensure the node is subscribed to the pubsubTopic. otherwise it risks publishing # to a topic with no connected peers if pubsubTopic notin node.wakuRelay.subscribedTopics(): - raise newException(ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic) + raise newException( + ValueError, "Failed to publish: Node not subscribed to pubsubTopic: " & pubsubTopic) # if RLN is mounted, append the proof to the message if not node.wakuRlnRelay.isNil(): @@ -112,34 +113,127 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC elif result == MessageValidationResult.Spam: raise newException(ValueError, "Failed to publish: limit exceeded, try again later") elif result == MessageValidationResult.Valid: - debug "RLN proof validated successfully", pubSubTopic=pubSubTopic + debug "RLN proof validated successfully", pubSubTopic=pubsubTopic else: raise newException(ValueError, "Failed to publish: unknown RLN proof validation result") # if we reach here its either a non-RLN message or a RLN message with a valid proof - debug "Publishing message", pubSubTopic=pubSubTopic - let publishFut = node.publish(pubsubTopic, message) + debug "Publishing message", pubSubTopic=pubsubTopic, rln=defined(rln) + let publishFut = node.publish(some(pubsubTopic), message) if not await publishFut.withTimeout(futTimeout): raise newException(ValueError, "Failed to publish: timed out") return true - server.rpc("get_waku_v2_relay_v1_messages") do (topic: PubsubTopic) -> seq[WakuMessageRPC]: + server.rpc("get_waku_v2_relay_v1_messages") do (pubsubTopic: PubsubTopic) -> seq[WakuMessageRPC]: + if pubsubTopic == "": + raise newException(ValueError, "Empty pubsub topic") + ## Returns all WakuMessages received on a PubSub topic since the ## last time this method was called - debug "get_waku_v2_relay_v1_messages", topic=topic + debug "get_waku_v2_relay_v1_messages", topic=pubsubTopic - if not cache.isSubscribed(topic): - raise newException(ValueError, "Not subscribed to topic: " & topic) - - let msgRes = cache.getMessages(topic, clear=true) + let msgRes = cache.getMessages(pubsubTopic, clear=true) if msgRes.isErr(): - raise newException(ValueError, "Not subscribed to topic: " & topic) + raise newException(ValueError, "Not subscribed to pubsub topic: " & pubsubTopic) return msgRes.value.map(toWakuMessageRPC) + # Autosharding API -## Waku Relay Private JSON-RPC API (Whisper/Waku v1 compatibility) -## Support for the Relay Private API has been deprecated. -## This API existed for compatibility with the Waku v1/Whisper spec and encryption schemes. -## It is recommended to use the Relay API instead. + server.rpc("post_waku_v2_relay_v1_auto_subscriptions") do (contentTopics: seq[ContentTopic]) -> bool: + if contentTopics.len == 0: + raise newException(ValueError, "No content topic provided") + + ## Subscribes a node to a list of Content topics + debug "post_waku_v2_relay_v1_auto_subscriptions" + + let newTopics = contentTopics.filterIt(not cache.isSubscribed(it)) + + # Subscribe to all requested topics + for contentTopic in newTopics: + if contentTopic == "": + raise newException(ValueError, "Empty content topic") + + cache.subscribe(contentTopic) + node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache))) + + return true + + server.rpc("delete_waku_v2_relay_v1_auto_subscriptions") do (contentTopics: seq[ContentTopic]) -> bool: + if contentTopics.len == 0: + raise newException(ValueError, "No content topic provided") + + ## Unsubscribes a node from a list of Content topics + debug "delete_waku_v2_relay_v1_auto_subscriptions" + + let subscribedTopics = contentTopics.filterIt(cache.isSubscribed(it)) + + # Unsubscribe all handlers from requested topics + for contentTopic in subscribedTopics: + if contentTopic == "": + raise newException(ValueError, "Empty content topic") + + cache.unsubscribe(contentTopic) + node.unsubscribe((kind: ContentUnsub, topic: contentTopic)) + + return true + + server.rpc("post_waku_v2_relay_v1_auto_message") do (msg: WakuMessageRPC) -> bool: + ## Publishes a WakuMessage to a Content topic + debug "post_waku_v2_relay_v1_auto_message" + + let payloadRes = base64.decode(msg.payload) + if payloadRes.isErr(): + raise newException(ValueError, "invalid payload format: " & payloadRes.error) + + if msg.contentTopic.isNone(): + raise newException(ValueError, "message has no content topic") + + var message = WakuMessage( + payload: payloadRes.value, + contentTopic: msg.contentTopic.get(), + version: msg.version.get(0'u32), + timestamp: msg.timestamp.get(Timestamp(0)), + ephemeral: msg.ephemeral.get(false) + ) + + # if RLN is mounted, append the proof to the message + if not node.wakuRlnRelay.isNil(): + # append the proof to the message + let success = node.wakuRlnRelay.appendRLNProof(message, + float64(getTime().toUnix())) + if not success: + raise newException(ValueError, "Failed to publish: error appending RLN proof to message") + # validate the message before sending it + let result = node.wakuRlnRelay.validateMessage(message) + if result == MessageValidationResult.Invalid: + raise newException(ValueError, "Failed to publish: invalid RLN proof") + elif result == MessageValidationResult.Spam: + raise newException(ValueError, "Failed to publish: limit exceeded, try again later") + elif result == MessageValidationResult.Valid: + debug "RLN proof validated successfully", contentTopic=message.contentTopic + else: + raise newException(ValueError, "Failed to publish: unknown RLN proof validation result") + + # if we reach here its either a non-RLN message or a RLN message with a valid proof + debug "Publishing message", contentTopic=message.contentTopic, rln=defined(rln) + let publishFut = node.publish(none(PubsubTopic), message) + if not await publishFut.withTimeout(futTimeout): + raise newException(ValueError, "Failed to publish: timed out") + + return true + + server.rpc("get_waku_v2_relay_v1_auto_messages") do (contentTopic: ContentTopic) -> seq[WakuMessageRPC]: + if contentTopic == "": + raise newException(ValueError, "Empty content topic") + + ## Returns all WakuMessages received on a Content topic since the + ## last time this method was called + debug "get_waku_v2_relay_v1_auto_messages", topic=contentTopic + + let msgRes = cache.getMessages(contentTopic, clear=true) + if msgRes.isErr(): + raise newException(ValueError, "Not subscribed to content topic: " & contentTopic) + + return msgRes.value.map(toWakuMessageRPC) \ No newline at end of file diff --git a/waku/waku_api/rest/relay/client.nim b/waku/waku_api/rest/relay/client.nim index a0be015ae..46bdb6464 100644 --- a/waku/waku_api/rest/relay/client.nim +++ b/waku/waku_api/rest/relay/client.nim @@ -46,10 +46,11 @@ proc decodeBytes*(t: typedesc[string], value: openarray[byte], # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) proc relayPostSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodPost.} +proc relayPostAutoSubscriptionsV1*(body: RelayPostSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/subscriptions", meth: HttpMethod.MethodPost.} # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) proc relayDeleteSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/subscriptions", meth: HttpMethod.MethodDelete.} - +proc relayDeleteAutoSubscriptionsV1*(body: RelayDeleteSubscriptionsRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/subscriptions", meth: HttpMethod.MethodDelete.} proc decodeBytes*(t: typedesc[RelayGetMessagesResponse], data: openArray[byte], contentType: Opt[ContentTypeData]): RestResult[RelayGetMessagesResponse] = if MediaType.init($contentType) != MIMETYPE_JSON: @@ -70,6 +71,8 @@ proc encodeBytes*(value: RelayPostMessagesRequest, # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) proc relayGetMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodGet.} +proc relayGetAutoMessagesV1*(topic: string): RestResponse[RelayGetMessagesResponse] {.rest, endpoint: "/relay/v1/auto/messages/{topic}", meth: HttpMethod.MethodGet.} # TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto) proc relayPostMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/messages/{topic}", meth: HttpMethod.MethodPost.} +proc relayPostAutoMessagesV1*(topic: string, body: RelayPostMessagesRequest): RestResponse[string] {.rest, endpoint: "/relay/v1/auto/messages/{topic}", meth: HttpMethod.MethodPost.} diff --git a/waku/waku_api/rest/relay/handlers.nim b/waku/waku_api/rest/relay/handlers.nim index 5b4d4f97f..6e74db8be 100644 --- a/waku/waku_api/rest/relay/handlers.nim +++ b/waku/waku_api/rest/relay/handlers.nim @@ -16,10 +16,12 @@ import ../../../waku_relay/protocol, ../../../waku_rln_relay, ../../../waku_rln_relay/rln/wrappers, + ../../../node/waku_node, + ../../message_cache, + ../../cache_handlers, ../serdes, ../responses, - ./types, - ./topic_cache + ./types from std/times import getTime from std/times import toUnix @@ -40,9 +42,11 @@ const futTimeout* = 5.seconds # Max time to wait for futures #### Request handlers const ROUTE_RELAY_SUBSCRIPTIONSV1* = "/relay/v1/subscriptions" +const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}" +const ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1* = "/relay/v1/auto/subscriptions" +const ROUTE_RELAY_AUTO_MESSAGESV1* = "/relay/v1/auto/messages/{topic}" -proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) = - +proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: MessageCache[string]) = router.api(MethodPost, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: # ## Subscribes a node to a list of PubSub topics # debug "post_waku_v2_relay_v1_subscriptions" @@ -65,14 +69,12 @@ proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuN # Only subscribe to topics for which we have no subscribed topic handlers yet let newTopics = req.filterIt(not cache.isSubscribed(it)) - for topic in newTopics: - cache.subscribe(topic) - node.subscribe(topic, cache.messageHandler()) + for pubsubTopic in newTopics: + cache.subscribe(pubsubTopic) + node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(messageCacheHandler(cache))) return RestApiResponse.ok() - -proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) = router.api(MethodDelete, ROUTE_RELAY_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: # ## Subscribes a node to a list of PubSub topics # debug "delete_waku_v2_relay_v1_subscriptions" @@ -93,17 +95,13 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak let req: RelayDeleteSubscriptionsRequest = reqResult.get() # Unsubscribe all handlers from requested topics - for topic in req: - node.unsubscribe(topic) - cache.unsubscribe(topic) + for pubsubTopic in req: + node.unsubscribe((kind: PubsubUnsub, topic: pubsubTopic)) + cache.unsubscribe(pubsubTopic) # Successfully unsubscribed from all requested topics return RestApiResponse.ok() - -const ROUTE_RELAY_MESSAGESV1* = "/relay/v1/messages/{topic}" - -proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, cache: TopicCache) = router.api(MethodGet, ROUTE_RELAY_MESSAGESV1) do (topic: string) -> RestApiResponse: # ## Returns all WakuMessages received on a PubSub topic since the # ## last time this method was called @@ -127,9 +125,7 @@ proc installRelayGetMessagesV1Handler*(router: var RestRouter, node: WakuNode, c return resp.get() -proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) = router.api(MethodPost, ROUTE_RELAY_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse: - if topic.isErr(): return RestApiResponse.badRequest() let pubSubTopic = topic.get() @@ -178,16 +174,139 @@ proc installRelayPostMessagesV1Handler*(router: var RestRouter, node: WakuNode) return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result") # if we reach here its either a non-RLN message or a RLN message with a valid proof - debug "Publishing message", pubSubTopic=pubSubTopic - if not (waitFor node.publish(pubSubTopic, resMessage.value).withTimeout(futTimeout)): + debug "Publishing message", pubSubTopic=pubSubTopic, rln=defined(rln) + if not (waitFor node.publish(some(pubSubTopic), resMessage.value).withTimeout(futTimeout)): error "Failed to publish message to topic", pubSubTopic=pubSubTopic return RestApiResponse.internalServerError("Failed to publish: timedout") return RestApiResponse.ok() + # Autosharding API -proc installRelayApiHandlers*(router: var RestRouter, node: WakuNode, cache: TopicCache) = - installRelayPostSubscriptionsV1Handler(router, node, cache) - installRelayDeleteSubscriptionsV1Handler(router, node, cache) - installRelayGetMessagesV1Handler(router, node, cache) - installRelayPostMessagesV1Handler(router, node) + router.api(MethodPost, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + # ## Subscribes a node to a list of content topics + # debug "post_waku_v2_relay_v1_auto_subscriptions" + + # Check the request body + if contentBody.isNone(): + return RestApiResponse.badRequest() + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return RestApiResponse.badRequest() + + let reqBodyData = contentBody.get().data + let reqResult = decodeFromJsonBytes(RelayPostSubscriptionsRequest, reqBodyData) + if reqResult.isErr(): + return RestApiResponse.badRequest() + + let req: RelayPostSubscriptionsRequest = reqResult.get() + + # Only subscribe to topics for which we have no subscribed topic handlers yet + let newTopics = req.filterIt(not cache.isSubscribed(it)) + + for contentTopic in newTopics: + cache.subscribe(contentTopic) + node.subscribe((kind: ContentSub, topic: contentTopic), some(autoMessageCacheHandler(cache))) + + return RestApiResponse.ok() + + router.api(MethodDelete, ROUTE_RELAY_AUTO_SUBSCRIPTIONSV1) do (contentBody: Option[ContentBody]) -> RestApiResponse: + # ## Subscribes a node to a list of content topics + # debug "delete_waku_v2_relay_v1_auto_subscriptions" + + # Check the request body + if contentBody.isNone(): + return RestApiResponse.badRequest() + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return RestApiResponse.badRequest() + + let reqBodyData = contentBody.get().data + let reqResult = decodeFromJsonBytes(RelayDeleteSubscriptionsRequest, reqBodyData) + if reqResult.isErr(): + return RestApiResponse.badRequest() + + let req: RelayDeleteSubscriptionsRequest = reqResult.get() + + # Unsubscribe all handlers from requested topics + for contentTopic in req: + cache.unsubscribe(contentTopic) + node.unsubscribe((kind: ContentUnsub, topic: contentTopic)) + + # Successfully unsubscribed from all requested topics + return RestApiResponse.ok() + + router.api(MethodGet, ROUTE_RELAY_AUTO_MESSAGESV1) do (topic: string) -> RestApiResponse: + # ## Returns all WakuMessages received on a content topic since the + # ## last time this method was called + # ## TODO: ability to specify a return message limit + # debug "get_waku_v2_relay_v1_auto_messages", topic=topic + + if topic.isErr(): + return RestApiResponse.badRequest() + let contentTopic = topic.get() + + let messages = cache.getMessages(contentTopic, clear=true) + if messages.isErr(): + debug "Not subscribed to topic", topic=contentTopic + return RestApiResponse.notFound() + + let data = RelayGetMessagesResponse(messages.get().map(toRelayWakuMessage)) + let resp = RestApiResponse.jsonResponse(data, status=Http200) + if resp.isErr(): + debug "An error ocurred while building the json respose", error=resp.error + return RestApiResponse.internalServerError() + + return resp.get() + + router.api(MethodPost, ROUTE_RELAY_AUTO_MESSAGESV1) do (topic: string, contentBody: Option[ContentBody]) -> RestApiResponse: + # Check the request body + if contentBody.isNone(): + return RestApiResponse.badRequest() + + let reqBodyContentType = MediaType.init($contentBody.get().contentType) + if reqBodyContentType != MIMETYPE_JSON: + return RestApiResponse.badRequest() + + let reqBodyData = contentBody.get().data + let reqResult = decodeFromJsonBytes(RelayPostMessagesRequest, reqBodyData) + if reqResult.isErr(): + return RestApiResponse.badRequest() + + if reqResult.value.contentTopic.isNone(): + return RestApiResponse.badRequest() + + let resMessage = reqResult.value.toWakuMessage(version = 0) + if resMessage.isErr(): + return RestApiResponse.badRequest() + + var message = resMessage.get() + + # if RLN is mounted, append the proof to the message + if not node.wakuRlnRelay.isNil(): + # append the proof to the message + let success = node.wakuRlnRelay.appendRLNProof(message, + float64(getTime().toUnix())) + if not success: + return RestApiResponse.internalServerError("Failed to publish: error appending RLN proof to message") + + # validate the message before sending it + let result = node.wakuRlnRelay.validateMessage(message) + if result == MessageValidationResult.Invalid: + return RestApiResponse.internalServerError("Failed to publish: invalid RLN proof") + elif result == MessageValidationResult.Spam: + return RestApiResponse.badRequest("Failed to publish: limit exceeded, try again later") + elif result == MessageValidationResult.Valid: + debug "RLN proof validated successfully", contentTopic=message.contentTopic + else: + return RestApiResponse.internalServerError("Failed to publish: unknown RLN proof validation result") + + # if we reach here its either a non-RLN message or a RLN message with a valid proof + debug "Publishing message", contentTopic=message.contentTopic, rln=defined(rln) + if not (waitFor node.publish(none(PubSubTopic), message).withTimeout(futTimeout)): + error "Failed to publish message to topic", contentTopic=message.contentTopic + return RestApiResponse.internalServerError("Failed to publish: timedout") + + return RestApiResponse.ok() \ No newline at end of file diff --git a/waku/waku_api/rest/relay/topic_cache.nim b/waku/waku_api/rest/relay/topic_cache.nim deleted file mode 100644 index 301165087..000000000 --- a/waku/waku_api/rest/relay/topic_cache.nim +++ /dev/null @@ -1,33 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import - chronos, - chronicles -import - ../../../waku_relay, - ../../../waku_core, - ../../message_cache - -export message_cache - - -##### TopicCache - -type TopicCacheResult*[T] = MessageCacheResult[T] - -type TopicCache* = MessageCache[PubSubTopic] - - -##### Message handler - -type TopicCacheMessageHandler* = WakuRelayHandler - -proc messageHandler*(cache: TopicCache): TopicCacheMessageHandler = - - let handler = proc(pubsubTopic: string, msg: WakuMessage): Future[void] {.async, closure.} = - cache.addMessage(PubSubTopic(pubsubTopic), msg) - - handler diff --git a/waku/waku_api/rest/relay/types.nim b/waku/waku_api/rest/relay/types.nim index 1d3c84bcf..f11ea40c5 100644 --- a/waku/waku_api/rest/relay/types.nim +++ b/waku/waku_api/rest/relay/types.nim @@ -29,8 +29,8 @@ type RelayPostMessagesRequest* = RelayWakuMessage type - RelayPostSubscriptionsRequest* = seq[PubSubTopic] - RelayDeleteSubscriptionsRequest* = seq[PubSubTopic] + RelayPostSubscriptionsRequest* = seq[string] + RelayDeleteSubscriptionsRequest* = seq[string] #### Type conversion diff --git a/waku/waku_core/topics.nim b/waku/waku_core/topics.nim index 5c44aaf78..f4915999a 100644 --- a/waku/waku_core/topics.nim +++ b/waku/waku_core/topics.nim @@ -10,9 +10,5 @@ export type SubscriptionKind* = enum ContentSub, ContentUnsub, PubsubSub, PubsubUnsub - SubscriptionEvent* = object - case kind*: SubscriptionKind - of PubsubSub: pubsubSub*: string - of ContentSub: contentSub*: string - of PubsubUnsub: pubsubUnsub*: string - of ContentUnsub: contentUnsub*: string \ No newline at end of file + SubscriptionEvent* = tuple[kind: SubscriptionKind, topic: string] + \ No newline at end of file diff --git a/waku/waku_discv5.nim b/waku/waku_discv5.nim index efcaa0214..c02371dbb 100644 --- a/waku/waku_discv5.nim +++ b/waku/waku_discv5.nim @@ -307,7 +307,10 @@ proc stop*(wd: WakuDiscoveryV5): Future[void] {.async.} = debug "Successfully stopped discovery v5 service" -proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent]) {.async.} = +proc subscriptionsListener*( + wd: WakuDiscoveryV5, + topicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent] + ) {.async.} = ## Listen for pubsub topics subscriptions changes let key = topicSubscriptionQueue.register() @@ -317,8 +320,8 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv # Since we don't know the events we will receive we have to anticipate. - let subs = events.filterIt(it.kind == SubscriptionKind.PubsubSub).mapIt(it.pubsubSub) - let unsubs = events.filterIt(it.kind == SubscriptionKind.PubsubUnsub).mapIt(it.pubsubUnsub) + let subs = events.filterIt(it.kind == PubsubSub).mapIt(it.topic) + let unsubs = events.filterIt(it.kind == PubsubUnsub).mapIt(it.topic) if subs.len == 0 and unsubs.len == 0: continue diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 7b54e6148..15562983c 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -214,20 +214,21 @@ proc generateOrderedValidator*(w: WakuRelay): auto {.gcsafe.} = return ValidationResult.Accept return wrappedValidator -proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler) = +proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandler): TopicHandler = debug "subscribe", pubsubTopic=pubsubTopic # we need to wrap the handler since gossipsub doesnt understand WakuMessage - let wrappedHandler = proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} = - let decMsg = WakuMessage.decode(data) - if decMsg.isErr(): - # fine if triggerSelf enabled, since validators are bypassed - error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error - let fut = newFuture[void]() - fut.complete() - return fut - else: - return handler(pubsubTopic, decMsg.get()) + let wrappedHandler = + proc(pubsubTopic: string, data: seq[byte]): Future[void] {.gcsafe, raises: [].} = + let decMsg = WakuMessage.decode(data) + if decMsg.isErr(): + # fine if triggerSelf enabled, since validators are bypassed + error "failed to decode WakuMessage, validator passed a wrong message", error = decMsg.error + let fut = newFuture[void]() + fut.complete() + return fut + else: + return handler(pubsubTopic, decMsg.get()) # add the ordered validator to the topic if not w.validatorInserted.hasKey(pubSubTopic): @@ -240,12 +241,23 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle # subscribe to the topic with our wrapped handler procCall GossipSub(w).subscribe(pubsubTopic, wrappedHandler) -proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = - debug "unsubscribe", pubsubTopic=pubsubTopic + return wrappedHandler + +proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = + ## Unsubscribe all handlers on this pubsub topic + + debug "unsubscribe all", pubsubTopic=pubsubTopic procCall GossipSub(w).unsubscribeAll(pubsubTopic) w.validatorInserted.del(pubsubTopic) +proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: TopicHandler) = + ## Unsubscribe this handler on this pubsub topic + + debug "unsubscribe", pubsubTopic=pubsubTopic + + procCall GossipSub(w).unsubscribe(pubsubTopic, handler) + proc publish*(w: WakuRelay, pubsubTopic: PubsubTopic, message: WakuMessage): Future[int] {.async.} = trace "publish", pubsubTopic=pubsubTopic let data = message.encode().buffer