diff --git a/.update.timestamp b/.update.timestamp new file mode 100644 index 000000000..5d18bc455 --- /dev/null +++ b/.update.timestamp @@ -0,0 +1 @@ +1612265639 \ No newline at end of file diff --git a/docs/api/v2/node.md b/docs/api/v2/node.md index c3ab6c9d1..d07154fad 100644 --- a/docs/api/v2/node.md +++ b/docs/api/v2/node.md @@ -27,7 +27,7 @@ proc start*(node: WakuNode) {.async.} = ## ## Status: Implemented. -proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} = +proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## this topic. TopicHandler is a method that takes a topic and some data. ## @@ -40,12 +40,12 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa ## ## Status: Implemented. -proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} = +proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = ## Unsubscribes a handler from a PubSub topic. ## ## Status: Implemented. -proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} = +proc unsubscribeAll*(node: WakuNode, topic: Topic) = ## Unsubscribes all handlers registered on a specific PubSub topic. ## ## Status: Implemented. diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 328fad69f..2ad659073 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -23,7 +23,7 @@ proc runBackground() {.async.} = Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort) await node.start() - await node.mountRelay(rlnRelayEnabled = conf.rlnrelay) + node.mountRelay(rlnRelayEnabled = conf.rlnrelay) # Subscribe to a topic let topic = cast[Topic]("foobar") @@ -31,7 +31,7 @@ proc runBackground() {.async.} = let message = WakuMessage.init(data).value let payload = cast[string](message.payload) info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic - await node.subscribe(topic, handler) + node.subscribe(topic, handler) # Publish to a topic let payload = cast[seq[byte]]("hello world") diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index b69462656..49e6af63e 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -176,9 +176,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = await node.start() if conf.filternode != "": - await node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay) + node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay) else: - await node.mountRelay(@[], rlnRelayEnabled = conf.rlnrelay) + node.mountRelay(@[], rlnRelayEnabled = conf.rlnrelay) var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true) @@ -251,7 +251,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = trace "Invalid encoded WakuMessage", error = decoded.error let topic = cast[Topic](DefaultTopic) - await node.subscribe(topic, handler) + node.subscribe(topic, handler) await chat.readWriteLoop() runForever() diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 699bf396e..f46ccf231 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -42,7 +42,7 @@ procSuite "Waku v2 JSON-RPC API": asyncTest "Debug API: get node info": waitFor node.start() - waitFor node.mountRelay() + node.mountRelay() # RPC server setup let @@ -68,7 +68,7 @@ procSuite "Waku v2 JSON-RPC API": asyncTest "Relay API: publish and subscribe/unsubscribe": waitFor node.start() - waitFor node.mountRelay() + node.mountRelay() # RPC server setup let @@ -128,13 +128,13 @@ procSuite "Waku v2 JSON-RPC API": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + node1.mountRelay(@[pubSubTopic]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + node2.mountRelay(@[pubSubTopic]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + node3.mountRelay(@[pubSubTopic]) await node1.connectToNodes(@[node2.peerInfo]) await node3.connectToNodes(@[node2.peerInfo]) @@ -188,7 +188,7 @@ procSuite "Waku v2 JSON-RPC API": asyncTest "Store API: retrieve historical messages": waitFor node.start() - waitFor node.mountRelay(@[defaultTopic]) + node.mountRelay(@[defaultTopic]) # RPC server setup let @@ -249,7 +249,7 @@ procSuite "Waku v2 JSON-RPC API": asyncTest "Filter API: subscribe/unsubscribe": waitFor node.start() - waitFor node.mountRelay() + node.mountRelay() node.mountFilter() @@ -434,13 +434,13 @@ procSuite "Waku v2 JSON-RPC API": topicCache = newTable[string, seq[WakuMessage]]() await node1.start() - await node1.mountRelay(@[pubSubTopic]) + node1.mountRelay(@[pubSubTopic]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + node2.mountRelay(@[pubSubTopic]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + node3.mountRelay(@[pubSubTopic]) await node1.connectToNodes(@[node2.peerInfo]) await node3.connectToNodes(@[node2.peerInfo]) @@ -524,13 +524,13 @@ procSuite "Waku v2 JSON-RPC API": topicCache = newTable[string, seq[WakuMessage]]() await node1.start() - await node1.mountRelay(@[pubSubTopic]) + node1.mountRelay(@[pubSubTopic]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + node2.mountRelay(@[pubSubTopic]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + node3.mountRelay(@[pubSubTopic]) await node1.connectToNodes(@[node2.peerInfo]) await node3.connectToNodes(@[node2.peerInfo]) diff --git a/tests/v2/test_rpc_waku.nim b/tests/v2/test_rpc_waku.nim index 307a8369c..9183c8667 100644 --- a/tests/v2/test_rpc_waku.nim +++ b/tests/v2/test_rpc_waku.nim @@ -27,7 +27,7 @@ suite "Waku v2 Remote Procedure Calls": waitFor node.start() - waitFor node.mountRelay(@["waku"]) + node.mountRelay(@["waku"]) # RPC server setup let diff --git a/tests/v2/test_waku.nim b/tests/v2/test_waku.nim index c60c1e4c7..9583f969e 100644 --- a/tests/v2/test_waku.nim +++ b/tests/v2/test_waku.nim @@ -85,7 +85,7 @@ procSuite "FloodSub": ) for node in nodes: - await node.mountRelay() + node.mountRelay() await subscribeNodes(nodes) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 75a22502f..afee342b4 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -49,10 +49,10 @@ procSuite "WakuNode": await node.start() - await node.mountRelay() + node.mountRelay() # Subscribe our node to the pubSubTopic where all chat data go onto. - await node.subscribe(pubSubTopic, relayHandler) + node.subscribe(pubSubTopic, relayHandler) # Subscribe a contentFilter to trigger a specific application handler when # WakuMessages with that content are received @@ -101,14 +101,14 @@ procSuite "WakuNode": await allFutures([node1.start(), node2.start()]) - await node1.mountRelay() - await node2.mountRelay() + node1.mountRelay() + node2.mountRelay() node1.mountFilter() node2.mountFilter() # Subscribe our node to the pubSubTopic where all chat data go onto. - await node1.subscribe(pubSubTopic, relayHandler) + node1.subscribe(pubSubTopic, relayHandler) # Subscribe a contentFilter to trigger a specific application handler when # WakuMessages with that content are received node1.wakuFilter.setPeer(node2.peerInfo) @@ -221,13 +221,13 @@ procSuite "WakuNode": message = WakuMessage(payload: payload, contentTopic: contentTopic) await node1.start() - await node1.mountRelay(@[pubSubTopic]) + node1.mountRelay(@[pubSubTopic]) await node2.start() - await node2.mountRelay(@[pubSubTopic]) + node2.mountRelay(@[pubSubTopic]) await node3.start() - await node3.mountRelay(@[pubSubTopic]) + node3.mountRelay(@[pubSubTopic]) await node1.connectToNodes(@[node2.peerInfo]) await node3.connectToNodes(@[node2.peerInfo]) @@ -243,7 +243,7 @@ procSuite "WakuNode": val.payload == payload completionFut.complete(true) - await node3.subscribe(pubSubTopic, relayHandler) + node3.subscribe(pubSubTopic, relayHandler) await sleepAsync(2000.millis) await node1.publish(pubSubTopic, message) diff --git a/waku/common/wakubridge.nim b/waku/common/wakubridge.nim index e0a123365..c0af3eb2f 100644 --- a/waku/common/wakubridge.nim +++ b/waku/common/wakubridge.nim @@ -84,7 +84,7 @@ proc startWakuV2(config: WakuNodeConf): Future[WakuNode] {.async.} = mountFilter(node) if config.relay: - waitFor mountRelay(node, config.topics.split(" ")) + mountRelay(node, config.topics.split(" ")) if config.staticnodesv2.len > 0: waitFor connectToNodes(node, config.staticnodesv2) diff --git a/waku/v2/node/jsonrpc/relay_api.nim b/waku/v2/node/jsonrpc/relay_api.nim index 6cabe77cb..a1a6cfec4 100644 --- a/waku/v2/node/jsonrpc/relay_api.nim +++ b/waku/v2/node/jsonrpc/relay_api.nim @@ -70,45 +70,26 @@ proc installRelayApiHandlers*(node: WakuNode, rpcsrv: RpcServer, topicCache: Top rpcsrv.rpc("post_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: ## Subscribes a node to a list of PubSub topics debug "post_waku_v2_relay_v1_subscriptions" - - var failedTopics: seq[string] # Subscribe to all requested topics for topic in topics: - if not(await node.subscribe(topic, topicHandler).withTimeout(futTimeout)): - # If any topic fails to subscribe, add to list of failedTopics - failedTopics.add(topic) - else: - # Create message cache for this topic - debug "MessageCache for topic", topic=topic - topicCache[topic] = @[] - - if (failedTopics.len() == 0): - # Successfully subscribed to all requested topics - return true - else: - # Failed to subscribe to one or more topics - raise newException(ValueError, "Failed to subscribe to topics " & repr(failedTopics)) + node.subscribe(topic, topicHandler) + # Create message cache for this topic + debug "MessageCache for topic", topic=topic + topicCache[topic] = @[] + + # Successfully subscribed to all requested topics + return true rpcsrv.rpc("delete_waku_v2_relay_v1_subscriptions") do(topics: seq[string]) -> bool: ## Unsubscribes a node from a list of PubSub topics debug "delete_waku_v2_relay_v1_subscriptions" - - var failedTopics: seq[string] # Unsubscribe all handlers from requested topics for topic in topics: - if not(await node.unsubscribeAll(topic).withTimeout(futTimeout)): - # If any topic fails to unsubscribe, add to list of failedTopics - failedTopics.add(topic) - else: - # Remove message cache for topic - topicCache.del(topic) + node.unsubscribeAll(topic) + # Remove message cache for topic + topicCache.del(topic) - if (failedTopics.len() == 0): - # Successfully unsubscribed from all requested topics - return true - else: - # Failed to unsubscribe from one or more topics - raise newException(ValueError, "Failed to unsubscribe from topics " & repr(failedTopics)) - + # Successfully unsubscribed from all requested topics + return true diff --git a/waku/v2/node/rpc/wakurpc.nim b/waku/v2/node/rpc/wakurpc.nim index 79b7a7310..cb16fb583 100644 --- a/waku/v2/node/rpc/wakurpc.nim +++ b/waku/v2/node/rpc/wakurpc.nim @@ -53,8 +53,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = warn "waku_subscribe decode error", msg=msg info "waku_subscribe raw data string", str=cast[string](data) - # XXX: Can we make this context async to use await? - discard node.subscribe(topic, handler) + node.subscribe(topic, handler) return true #if not result: # raise newException(ValueError, "Message could not be posted") diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 7899bead7..3498c27e1 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -157,7 +157,7 @@ proc stop*(node: WakuNode) {.async.} = await node.switch.stop() -proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} = +proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## this topic. TopicHandler is a method that takes a topic and some data. ## @@ -166,7 +166,7 @@ proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} = info "subscribe", topic=topic let wakuRelay = node.wakuRelay - await wakuRelay.subscribe(topic, handler) + wakuRelay.subscribe(topic, handler) proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHandler) {.async, gcsafe.} = ## Registers for messages that match a specific filter. Triggers the handler whenever a message is received. @@ -187,23 +187,23 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa waku_node_filters.set(node.filters.len.int64) -proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} = +proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = ## Unsubscribes a handler from a PubSub topic. ## ## Status: Implemented. info "unsubscribe", topic=topic let wakuRelay = node.wakuRelay - await wakuRelay.unsubscribe(@[(topic, handler)]) + wakuRelay.unsubscribe(@[(topic, handler)]) -proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} = +proc unsubscribeAll*(node: WakuNode, topic: Topic) = ## Unsubscribes all handlers registered on a specific PubSub topic. ## ## Status: Implemented. info "unsubscribeAll", topic=topic let wakuRelay = node.wakuRelay - await wakuRelay.unsubscribeAll(topic) + wakuRelay.unsubscribeAll(topic) proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = @@ -299,7 +299,7 @@ proc mountStore*(node: WakuNode, store: MessageStore = nil) = node.switch.mount(node.wakuStore) node.subscriptions.subscribe(WakuStoreCodec, node.wakuStore.subscription()) -proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled: bool = false) {.async, gcsafe.} = +proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRelayEnabled: bool = false) {.gcsafe.} = # TODO add the RLN registration let wakuRelay = WakuRelay.init( switch = node.switch, @@ -328,15 +328,13 @@ proc mountRelay*(node: WakuNode, topics: seq[string] = newSeq[string](), rlnRela await node.subscriptions.notify(topic, msg.value()) waku_node_messages.inc(labelValues = ["relay"]) - await node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler) + node.wakuRelay.subscribe("/waku/2/default-waku/proto", relayHandler) for topic in topics: proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = debug "Hit handler", topic=topic, data=data - # XXX: Is using discard here fine? Not sure if we want init to be async? - # Can also move this to the start proc, possibly wiser? - discard node.subscribe(topic, handler) + node.subscribe(topic, handler) ## Helpers proc dialPeer*(n: WakuNode, address: string) {.async.} = @@ -495,7 +493,7 @@ when isMainModule: mountFilter(node) if conf.relay: - waitFor mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay) + mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay) if conf.staticnodes.len > 0: waitFor connectToNodes(node, conf.staticnodes) diff --git a/waku/v2/protocol/waku_relay.nim b/waku/v2/protocol/waku_relay.nim index ac7382eb3..46e0ee4e6 100644 --- a/waku/v2/protocol/waku_relay.nim +++ b/waku/v2/protocol/waku_relay.nim @@ -47,10 +47,10 @@ method initPubSub*(w: WakuRelay) = method subscribe*(w: WakuRelay, pubSubTopic: string, - handler: TopicHandler) {.async.} = + handler: TopicHandler) = debug "subscribe", pubSubTopic=pubSubTopic - await procCall GossipSub(w).subscribe(pubSubTopic, handler) + procCall GossipSub(w).subscribe(pubSubTopic, handler) method publish*(w: WakuRelay, pubSubTopic: string, @@ -61,16 +61,16 @@ method publish*(w: WakuRelay, return await procCall GossipSub(w).publish(pubSubTopic, message) method unsubscribe*(w: WakuRelay, - topics: seq[TopicPair]) {.async.} = + topics: seq[TopicPair]) = debug "unsubscribe" - await procCall GossipSub(w).unsubscribe(topics) + procCall GossipSub(w).unsubscribe(topics) method unsubscribeAll*(w: WakuRelay, - pubSubTopic: string) {.async.} = + pubSubTopic: string) = debug "unsubscribeAll" - await procCall GossipSub(w).unsubscribeAll(pubSubTopic) + procCall GossipSub(w).unsubscribeAll(pubSubTopic) # GossipSub specific methods -------------------------------------------------- method start*(w: WakuRelay) {.async.} =