From 8c20cc04fc70b15aadf323d1a093529979e8dce7 Mon Sep 17 00:00:00 2001 From: oskarth Date: Thu, 29 Apr 2021 05:15:46 +0000 Subject: [PATCH] deploy: ffb5099f9e7df614e7496cef0923aba52de2ba48 --- tests/v2/test_jsonrpc_waku.nim | 2 +- waku/v2/node/jsonrpc/jsonrpc_callsigs.nim | 2 +- waku/v2/node/jsonrpc/store_api.nim | 5 ++- waku/v2/node/scripts/rpc_query.nim | 11 +++-- waku/v2/node/wakunode2.nim | 49 +++++++++++++++++++---- 5 files changed, 55 insertions(+), 14 deletions(-) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index deb52654d..7c7b4a82e 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -259,7 +259,7 @@ procSuite "Waku v2 JSON-RPC API": let client = newRpcHttpClient() await client.connect("127.0.0.1", rpcPort) - let response = await client.get_waku_v2_store_v1_messages(@[HistoryContentFilter(contentTopic: defaultContentTopic)], some(StorePagingOptions())) + let response = await client.get_waku_v2_store_v1_messages(defaultTopic, @[HistoryContentFilter(contentTopic: defaultContentTopic)], some(StorePagingOptions())) check: response.messages.len() == 8 response.pagingOptions.isNone diff --git a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim index d3ff77b80..ed8e4c474 100644 --- a/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim +++ b/waku/v2/node/jsonrpc/jsonrpc_callsigs.nim @@ -16,7 +16,7 @@ proc delete_waku_v2_relay_v1_subscriptions(topics: seq[string]): bool # Store API -proc get_waku_v2_store_v1_messages(contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]): StoreResponse +proc get_waku_v2_store_v1_messages(pubsubTopic: string, contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]): StoreResponse # Filter API diff --git a/waku/v2/node/jsonrpc/store_api.nim b/waku/v2/node/jsonrpc/store_api.nim index 61dbffe17..6a9791b0f 100644 --- a/waku/v2/node/jsonrpc/store_api.nim +++ b/waku/v2/node/jsonrpc/store_api.nim @@ -17,7 +17,7 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = ## Store API version 1 definitions - rpcsrv.rpc("get_waku_v2_store_v1_messages") do(contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: + rpcsrv.rpc("get_waku_v2_store_v1_messages") do(pubsubTopic: string, contentFilters: seq[HistoryContentFilter], pagingOptions: Option[StorePagingOptions]) -> StoreResponse: ## Returns history for a list of content topics with optional paging debug "get_waku_v2_store_v1_messages" @@ -27,7 +27,8 @@ proc installStoreApiHandlers*(node: WakuNode, rpcsrv: RpcServer) = debug "get_waku_v2_store_v1_messages response" responseFut.complete(response.toStoreResponse()) - let historyQuery = HistoryQuery(contentFilters: contentFilters, + let historyQuery = HistoryQuery(pubsubTopic: pubsubTopic, + contentFilters: contentFilters, pagingInfo: if pagingOptions.isSome: pagingOptions.get.toPagingInfo() else: PagingInfo()) await node.query(historyQuery, queryFuncHandler) diff --git a/waku/v2/node/scripts/rpc_query.nim b/waku/v2/node/scripts/rpc_query.nim index 40ddb1a25..a425f6b0d 100644 --- a/waku/v2/node/scripts/rpc_query.nim +++ b/waku/v2/node/scripts/rpc_query.nim @@ -24,13 +24,18 @@ if paramCount() < 1: let rpcPort = Port(parseInt(paramStr(1))) -echo "Please enter your topic:" +echo "Please enter your pubsub topic:" +let raw_pubsub = readLine(stdin) +let pubsubTopic = fmt"{raw_pubsub}" +echo "PubSubTopic is:", pubsubTopic + +echo "Please enter your content topic:" let raw_input = readLine(stdin) let input = fmt"{raw_input}" -echo "Input is:", input +echo "Content topic is:", input var node = newRpcHttpClient() waitfor node.connect("localhost", rpcPort) -var res = waitfor node.get_waku_v2_store_v1_messages(@[HistoryContentFilter(contentTopic: ContentTopic(input))], none(StorePagingOptions)) +var res = waitfor node.get_waku_v2_store_v1_messages(pubsubTopic, @[HistoryContentFilter(contentTopic: ContentTopic(input))], none(StorePagingOptions)) echo "Waku query response: ", res diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index a9c9ce2bd..4677b192b 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -180,6 +180,11 @@ proc stop*(node: WakuNode) {.async.} = node.started = false proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = + if node.wakuRelay.isNil: + error "Invalid API call to `subscribe`. WakuRelay not mounted." + # @TODO improved error handling + return + info "subscribe", topic=topic proc defaultHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = @@ -242,6 +247,11 @@ proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) = ## Unsubscribes a handler from a PubSub topic. ## ## Status: Implemented. + if node.wakuRelay.isNil: + error "Invalid API call to `unsubscribe`. WakuRelay not mounted." + # @TODO improved error handling + return + info "unsubscribe", topic=topic let wakuRelay = node.wakuRelay @@ -251,6 +261,12 @@ proc unsubscribeAll*(node: WakuNode, topic: Topic) = ## Unsubscribes all handlers registered on a specific PubSub topic. ## ## Status: Implemented. + + if node.wakuRelay.isNil: + error "Invalid API call to `unsubscribeAll`. WakuRelay not mounted." + # @TODO improved error handling + return + info "unsubscribeAll", topic=topic let wakuRelay = node.wakuRelay @@ -280,6 +296,11 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage, rlnRelayEnabl ## ## Status: Implemented. ## When rlnRelayEnabled is true, a zkp will be generated and attached to the message (it is an experimental feature) + + if node.wakuRelay.isNil: + error "Invalid API call to `publish`. WakuRelay not mounted. Try `lightpush` instead." + # @TODO improved error handling + return let wakuRelay = node.wakuRelay debug "publish", topic=topic, contentTopic=message.contentTopic @@ -346,6 +367,20 @@ proc mountFilter*(node: WakuNode) = node.filters.notify(message, requestId) # Trigger filter handlers on a light node waku_node_messages.inc(labelValues = ["filter"]) + if node.wakuRelay.isNil: + debug "light node: mounting relay without starting" + ## WakuFilter currently requires WakuRelay to be mounted in order to work. + ## This is to allow protocol stream negotation with full nodes to succeed. + ## Here we mount relay on the switch only, but do not subscribe to any pubsub + ## topics. We also never start the relay protocol. + ## @TODO: remove WakuRelay dependency + node.switch.mount(WakuRelay.init( + switch = node.switch, + triggerSelf = true, + sign = false, + verifySignature = false + )) + node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) node.switch.mount(node.wakuFilter) node.subscriptions.subscribe(WakuFilterCodec, node.wakuFilter.subscription()) @@ -654,13 +689,6 @@ when isMainModule: if conf.storenode != "": setStorePeer(node, conf.storenode) - # Filter setup - if (conf.filternode != "") or (conf.filter): - mountFilter(node) - - if conf.filternode != "": - setFilterPeer(node, conf.filternode) - # Relay setup if conf.relay: # True by default mountRelay(node, conf.topics.split(" "), rlnRelayEnabled = conf.rlnrelay) @@ -671,6 +699,13 @@ when isMainModule: # NOTE Must be mounted after relay if conf.lightpush: mountLightPush(node) + + # Filter setup. NOTE Must be mounted after relay + if (conf.filternode != "") or (conf.filter): + mountFilter(node) + + if conf.filternode != "": + setFilterPeer(node, conf.filternode) if conf.rpc: startRpc(node, conf.rpcAddress, Port(conf.rpcPort + conf.portsShift), conf)