diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 9792c24ca..c193088c2 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -318,7 +318,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = info "Hit filter handler" await node.subscribe( - FilterRequest(contentFilters: @[ContentFilter(topics: @[DefaultContentTopic])], topic: DefaultTopic, subscribe: true), + FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[DefaultContentTopic])], pubSubTopic: DefaultTopic, subscribe: true), filterHandler ) diff --git a/tests/v2/test_jsonrpc_waku.nim b/tests/v2/test_jsonrpc_waku.nim index 39f0aee38..49fa9acb5 100644 --- a/tests/v2/test_jsonrpc_waku.nim +++ b/tests/v2/test_jsonrpc_waku.nim @@ -291,8 +291,8 @@ procSuite "Waku v2 JSON-RPC API": # Light node has not yet subscribed to any filters node.filters.len() == 0 - let contentFilters = @[ContentFilter(topics: @[defaultContentTopic, ContentTopic("2")]), - ContentFilter(topics: @[ContentTopic("3"), ContentTopic("4")])] + let contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic, ContentTopic("2")]), + ContentFilter(contentTopics: @[ContentTopic("3"), ContentTopic("4")])] var response = await client.post_waku_v2_filter_v1_subscription(contentFilters = contentFilters, topic = some(defaultTopic)) check: @@ -330,7 +330,7 @@ procSuite "Waku v2 JSON-RPC API": # First ensure subscription exists - let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(topics: @[defaultContentTopic])], topic = some(defaultTopic)) + let sub = await client.post_waku_v2_filter_v1_subscription(contentFilters = @[ContentFilter(contentTopics: @[defaultContentTopic])], topic = some(defaultTopic)) check: sub diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index 29579cef4..40309c347 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -39,7 +39,7 @@ procSuite "Waku Filter": let proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo) @@ -88,7 +88,7 @@ procSuite "Waku Filter": let proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo) @@ -118,7 +118,7 @@ procSuite "Waku Filter": responseCompletionFuture = newFuture[bool]() let - rpcU = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: false) + rpcU = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: false) await proto.unsubscribe(rpcU) @@ -145,7 +145,7 @@ procSuite "Waku Filter": let proto = WakuFilter.init(PeerManager.new(dialSwitch), crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) + rpc = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], pubSubTopic: defaultTopic, subscribe: true) dialSwitch.mount(proto) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index ceacd5166..42ee4ec1f 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -28,7 +28,7 @@ procSuite "WakuNode": Port(60000)) pubSubTopic = "chat" contentTopic = ContentTopic("/waku/2/default-content/proto") - filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true) + filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -80,7 +80,7 @@ procSuite "WakuNode": Port(60002)) pubSubTopic = "chat" contentTopic = ContentTopic("/waku/2/default-content/proto") - filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true) + filterRequest = FilterRequest(pubSubTopic: pubSubTopic, contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -147,8 +147,8 @@ procSuite "WakuNode": otherPayload = @[byte 9] defaultMessage = WakuMessage(payload: defaultPayload, contentTopic: defaultContentTopic) otherMessage = WakuMessage(payload: otherPayload, contentTopic: otherContentTopic) - defaultFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[defaultContentTopic])], subscribe: true) - otherFR = FilterRequest(contentFilters: @[ContentFilter(topics: @[otherContentTopic])], subscribe: true) + defaultFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[defaultContentTopic])], subscribe: true) + otherFR = FilterRequest(contentFilters: @[ContentFilter(contentTopics: @[otherContentTopic])], subscribe: true) await node1.start() node1.mountRelay() @@ -272,7 +272,7 @@ procSuite "WakuNode": msg == message completionFut.complete(true) - await node1.subscribe(FilterRequest(topic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true), handler) + await node1.subscribe(FilterRequest(pubSubTopic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(contentTopics: @[contentTopic])], subscribe: true), handler) await sleepAsync(2000.millis) diff --git a/waku/v2/node/jsonrpc/filter_api.nim b/waku/v2/node/jsonrpc/filter_api.nim index 53af00645..415c97902 100644 --- a/waku/v2/node/jsonrpc/filter_api.nim +++ b/waku/v2/node/jsonrpc/filter_api.nim @@ -58,12 +58,12 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: # Construct a filter request # @TODO use default PubSub topic if undefined - let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true) + let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: true) else: FilterRequest(contentFilters: contentFilters, subscribe: true) if (await node.subscribe(fReq, filterHandler).withTimeout(futTimeout)): # Successfully subscribed to all content filters - for cTopic in concat(contentFilters.mapIt(it.topics)): + for cTopic in concat(contentFilters.mapIt(it.contentTopics)): # Create message cache for each subscribed content topic messageCache[cTopic] = @[] @@ -78,12 +78,12 @@ proc installFilterApiHandlers*(node: WakuNode, rpcsrv: RpcServer, messageCache: # Construct a filter request # @TODO consider using default PubSub topic if undefined - let fReq = if topic.isSome: FilterRequest(topic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false) + let fReq = if topic.isSome: FilterRequest(pubSubTopic: topic.get, contentFilters: contentFilters, subscribe: false) else: FilterRequest(contentFilters: contentFilters, subscribe: false) if (await node.unsubscribe(fReq).withTimeout(futTimeout)): # Successfully unsubscribed from all content filters - for cTopic in concat(contentFilters.mapIt(it.topics)): + for cTopic in concat(contentFilters.mapIt(it.contentTopics)): # Remove message cache for each unsubscribed content topic messageCache.del(cTopic) diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index 11cd0a094..7b0cd3bfc 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -81,7 +81,7 @@ proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilte # Flatten all unsubscribe topics into single seq var unsubscribeTopics: seq[ContentTopic] for cf in contentFilters: - unsubscribeTopics = unsubscribeTopics.concat(cf.topics) + unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics) debug "unsubscribing", unsubscribeTopics=unsubscribeTopics @@ -90,10 +90,10 @@ proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilte # Iterate filter entries to remove matching content topics for cf in f.contentFilters.mitems: # Iterate content filters in filter entry - cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) + cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) # make sure we delete the content filter # if no more topics are left - f.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0) + f.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0) if f.contentFilters.len == 0: rIdToRemove.add(rId) @@ -186,7 +186,6 @@ proc subscribe(node: WakuNode, topic: Topic, handler: Option[TopicHandler]) = let msg = WakuMessage.init(data) if msg.isOk(): - node.filters.notify(msg.value(), "") # Trigger filter handlers on a light node await node.subscriptions.notify(topic, msg.value()) # Trigger subscription handlers on a store/filter node waku_node_messages.inc(labelValues = ["relay"]) @@ -329,7 +328,7 @@ proc mountFilter*(node: WakuNode) = proc filterHandler(requestId: string, msg: MessagePush) {.gcsafe.} = info "push received" for message in msg.messages: - node.filters.notify(message, requestId) + node.filters.notify(message, requestId) # Trigger filter handlers on a light node waku_node_messages.inc(labelValues = ["filter"]) node.wakuFilter = WakuFilter.init(node.peerManager, node.rng, filterHandler) diff --git a/waku/v2/protocol/waku_filter/waku_filter.nim b/waku/v2/protocol/waku_filter/waku_filter.nim index d872dc782..780b31367 100644 --- a/waku/v2/protocol/waku_filter/waku_filter.nim +++ b/waku/v2/protocol/waku_filter/waku_filter.nim @@ -48,8 +48,8 @@ proc notify*(filters: Filters, msg: WakuMessage, requestId: string = "") = # TODO: In case of no topics we should either trigger here for all messages, # or we should not allow such filter to exist in the first place. for contentFilter in filter.contentFilters: - if contentFilter.topics.len > 0: - if msg.contentTopic in contentFilter.topics: + if contentFilter.contentTopics.len > 0: + if msg.contentTopic in contentFilter.contentTopics: filter.handler(msg) break @@ -57,7 +57,7 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest # Flatten all unsubscribe topics into single seq var unsubscribeTopics: seq[ContentTopic] for cf in request.contentFilters: - unsubscribeTopics = unsubscribeTopics.concat(cf.topics) + unsubscribeTopics = unsubscribeTopics.concat(cf.contentTopics) debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics @@ -67,11 +67,11 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest # Iterate through subscriber entries matching peer ID to remove matching content topics for cf in subscriber.filter.contentFilters.mitems: # Iterate content filters in filter entry - cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) + cf.contentTopics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) # make sure we delete the content filter # if no more topics are left - subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0) + subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.contentTopics.len > 0) # make sure we delete the subscriber # if no more content filters left @@ -83,15 +83,15 @@ proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest proc encode*(filter: ContentFilter): ProtoBuffer = result = initProtoBuffer() - for topic in filter.topics: - result.write(1, topic) + for contentTopic in filter.contentTopics: + result.write(1, contentTopic) proc encode*(rpc: FilterRequest): ProtoBuffer = result = initProtoBuffer() result.write(1, uint64(rpc.subscribe)) - result.write(2, rpc.topic) + result.write(2, rpc.pubSubTopic) for filter in rpc.contentFilters: result.write(3, filter.encode()) @@ -99,20 +99,20 @@ proc encode*(rpc: FilterRequest): ProtoBuffer = proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) - var topics: seq[ContentTopic] - discard ? pb.getRepeatedField(1, topics) + var contentTopics: seq[ContentTopic] + discard ? pb.getRepeatedField(1, contentTopics) - ok(ContentFilter(topics: topics)) + ok(ContentFilter(contentTopics: contentTopics)) proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = - var rpc = FilterRequest(contentFilters: @[], topic: "") + var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "") let pb = initProtoBuffer(buffer) var subflag: uint64 if ? pb.getField(1, subflag): rpc.subscribe = bool(subflag) - discard ? pb.getField(2, rpc.topic) + discard ? pb.getField(2, rpc.pubSubTopic) var buffs: seq[seq[byte]] discard ? pb.getRepeatedField(3, buffs) @@ -205,12 +205,16 @@ proc subscription*(proto: WakuFilter): MessageNotificationSubscription = ## Returns a Filter for the specific protocol ## This filter can then be used to send messages to subscribers that match conditions. proc handle(topic: string, msg: WakuMessage) {.async.} = + trace "handle WakuFilter subscription", topic=topic, msg=msg + for subscriber in proto.subscribers: - if subscriber.filter.topic != topic: + if subscriber.filter.pubSubTopic != "" and subscriber.filter.pubSubTopic != topic: + trace "Subscriber's filter pubsubTopic does not match message topic", filter=subscriber.filter.pubSubTopic, topic=topic continue for filter in subscriber.filter.contentFilters: - if msg.contentTopic in filter.topics: + if msg.contentTopic in filter.contentTopics: + trace "Found matching contentTopic", filter=filter, msg=msg let push = FilterRPC(requestId: subscriber.requestId, push: MessagePush(messages: @[msg])) let connOpt = await proto.peerManager.dialPeer(subscriber.peer, WakuFilterCodec) diff --git a/waku/v2/protocol/waku_filter/waku_filter_types.nim b/waku/v2/protocol/waku_filter/waku_filter_types.nim index 5e737ce91..92c957581 100644 --- a/waku/v2/protocol/waku_filter/waku_filter_types.nim +++ b/waku/v2/protocol/waku_filter/waku_filter_types.nim @@ -10,7 +10,7 @@ export waku_message type ContentFilter* = object - topics*: seq[ContentTopic] + contentTopics*: seq[ContentTopic] ContentFilterHandler* = proc(msg: WakuMessage) {.gcsafe, closure.} @@ -23,7 +23,7 @@ type FilterRequest* = object contentFilters*: seq[ContentFilter] - topic*: string + pubSubTopic*: string subscribe*: bool MessagePush* = object