From 24f4a087ded6b644154d97e150c3382f1445a34e Mon Sep 17 00:00:00 2001 From: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> Date: Tue, 10 Nov 2020 09:13:16 +0200 Subject: [PATCH] Feature/unsubscribe from content filter (#255) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Unsubscribe from a content topic Co-authored-by: Oskar Thorén --- examples/v2/chat2.nim | 2 +- tests/v2/test_waku_filter.nim | 71 +++++++++++++++++++++++++++++++- tests/v2/test_wakunode.nim | 6 +-- waku/node/v2/rpc/wakurpc.nim | 2 +- waku/node/v2/waku_types.nim | 1 + waku/node/v2/wakunode2.nim | 49 +++++++++++++++++++--- waku/protocol/v2/waku_filter.nim | 61 +++++++++++++++++++++++---- 7 files changed, 171 insertions(+), 21 deletions(-) diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index 0c29749e5..ab45ec8e4 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -225,7 +225,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = info "Hit filter handler" await node.subscribe( - FilterRequest(contentFilters: @[ContentFilter(topics: @[DefaultContentTopic])], topic: DefaultTopic), + FilterRequest(contentFilters: @[ContentFilter(topics: @[DefaultContentTopic])], topic: DefaultTopic, subscribe: true), filterHandler ) diff --git a/tests/v2/test_waku_filter.nim b/tests/v2/test_waku_filter.nim index d0ea0b1b0..1212fb59e 100644 --- a/tests/v2/test_waku_filter.nim +++ b/tests/v2/test_waku_filter.nim @@ -17,6 +17,7 @@ import procSuite "Waku Filter": asyncTest "handle filter": + const defaultTopic = "/waku/2/default-waku/proto" let key = PrivateKey.random(ECDSA, rng[]).get() @@ -39,7 +40,7 @@ procSuite "Waku Filter": let proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) - rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: "topic") + rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) dialSwitch.mount(proto) proto.setPeer(listenSwitch.peerInfo) @@ -59,7 +60,73 @@ procSuite "Waku Filter": await sleepAsync(2.seconds) - await subscriptions.notify("topic", post) + await subscriptions.notify(defaultTopic, post) check: (await responseRequestIdFuture) == id + + asyncTest "Can subscribe and unsubscribe from content filter": + const defaultTopic = "/waku/2/default-waku/proto" + + let + key = PrivateKey.random(ECDSA, rng[]).get() + peer = PeerInfo.init(key) + contentTopic = ContentTopic(1) + post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic) + + var dialSwitch = newStandardSwitch() + discard await dialSwitch.start() + + var listenSwitch = newStandardSwitch(some(key)) + discard await listenSwitch.start() + + var responseCompletionFuture = newFuture[bool]() + proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + check: + msg.messages.len() == 1 + msg.messages[0] == post + responseCompletionFuture.complete(true) + + let + proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) + rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true) + + dialSwitch.mount(proto) + proto.setPeer(listenSwitch.peerInfo) + + proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} = + discard + + let + proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle) + subscription = proto2.subscription() + + var subscriptions = newTable[string, MessageNotificationSubscription]() + subscriptions["test"] = subscription + listenSwitch.mount(proto2) + + let id = await proto.subscribe(rpc) + + await sleepAsync(2.seconds) + + await subscriptions.notify(defaultTopic, post) + + check: + # Check that subscription works as expected + (await responseCompletionFuture.withTimeout(3.seconds)) == true + + # Reset to test unsubscribe + responseCompletionFuture = newFuture[bool]() + + let + rpcU = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: false) + + await proto.unsubscribe(rpcU) + + await sleepAsync(2.seconds) + + await subscriptions.notify(defaultTopic, post) + + check: + # Check that unsubscribe works as expected + (await responseCompletionFuture.withTimeout(5.seconds)) == false diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index 4c032afbb..8e6dfd431 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -20,7 +20,7 @@ procSuite "WakuNode": Port(60000)) pubSubTopic = "chat" contentTopic = ContentTopic(1) - filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) + filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -72,7 +72,7 @@ procSuite "WakuNode": Port(60002)) pubSubTopic = "chat" contentTopic = ContentTopic(1) - filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) + filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true) message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) @@ -186,7 +186,7 @@ procSuite "WakuNode": msg == message completionFut.complete(true) - await node1.subscribe(FilterRequest(topic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(topics: @[contentTopic])]), handler) + await node1.subscribe(FilterRequest(topic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true), handler) await sleepAsync(2000.millis) diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 3d155b09e..2e453c6ac 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -85,7 +85,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = contentTopics.add(ContentTopic(topic)) filters.add(ContentFilter(topics: contentTopics)) - await node.subscribe(FilterRequest(topic: topic, contentFilters: filters), handler) + await node.subscribe(FilterRequest(topic: topic, contentFilters: filters, subscribe: true), handler) return true rpcsrv.rpc("waku_info") do() -> string: diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index ec4270150..ac1870997 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -85,6 +85,7 @@ type FilterRequest* = object contentFilters*: seq[ContentFilter] topic*: string + subscribe*: bool MessagePush* = object messages*: seq[WakuMessage] diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 5ab1ec7c1..579f36800 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -1,5 +1,5 @@ import - std/[options, tables, strutils], + std/[options, tables, strutils, sequtils], chronos, chronicles, stew/shims/net as stewNet, # TODO: Why do we need eth keys? eth/keys, @@ -47,6 +47,34 @@ proc initAddress(T: type MultiAddress, str: string): T = raise newException(ValueError, "Invalid bootstrap node multi-address") +proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} = + # Flatten all unsubscribe topics into single seq + var unsubscribeTopics: seq[ContentTopic] + for cf in contentFilters: + unsubscribeTopics = unsubscribeTopics.concat(cf.topics) + + debug "unsubscribing", unsubscribeTopics=unsubscribeTopics + + var rIdToRemove: seq[string] = @[] + for rId, f in filters.mpairs: + # Iterate filter entries to remove matching content topics + for cf in f.contentFilters.mitems: + # Iterate content filters in filter entry + cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) + # make sure we delete the content filter + # if no more topics are left + f.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0) + + if f.contentFilters.len == 0: + rIdToRemove.add(rId) + + # make sure we delete the filter entry + # if no more content filters left + for rId in rIdToRemove: + filters.del(rId) + + debug "filters modified", filters=filters + template tcpEndPoint(address, port): auto = MultiAddress.init(address, tcpProtocol, port) @@ -124,6 +152,10 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa ## FilterHandler is a method that takes a MessagePush. ## ## Status: Implemented. + + # Sanity check for well-formed subscribe FilterRequest + doAssert(request.subscribe, "invalid subscribe request") + info "subscribe content", filter=request var id = generateRequestId(node.rng) @@ -151,13 +183,18 @@ proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} = await wakuRelay.unsubscribeAll(topic) -proc unsubscribe*(w: WakuNode, contentFilter: waku_types.ContentFilter) = - echo "NYI" +proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} = ## Unsubscribe from a content filter. ## - ## Status: Not yet implemented. - ## TODO Implement. - + ## Status: Implemented. + + # Sanity check for well-formed unsubscribe FilterRequest + doAssert(request.subscribe == false, "invalid unsubscribe request") + + info "unsubscribe content", filter=request + + await node.wakuFilter.unsubscribe(request) + node.filters.removeContentFilters(request.contentFilters) proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a diff --git a/waku/protocol/v2/waku_filter.nim b/waku/protocol/v2/waku_filter.nim index ae3dd3ea9..89e0bfac6 100644 --- a/waku/protocol/v2/waku_filter.nim +++ b/waku/protocol/v2/waku_filter.nim @@ -1,5 +1,5 @@ import - std/tables, + std/[tables, sequtils], bearssl, chronos, chronicles, metrics, stew/results, libp2p/protocols/pubsub/pubsubpeer, @@ -23,6 +23,33 @@ logScope: const WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" +proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) = + # Flatten all unsubscribe topics into single seq + var unsubscribeTopics: seq[ContentTopic] + for cf in request.contentFilters: + unsubscribeTopics = unsubscribeTopics.concat(cf.topics) + + debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics + + for subscriber in subscribers.mitems: + if subscriber.peer.peerId != peerId: continue + + # Iterate through subscriber entries matching peer ID to remove matching content topics + for cf in subscriber.filter.contentFilters.mitems: + # Iterate content filters in filter entry + cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics) + + # make sure we delete the content filter + # if no more topics are left + subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0) + + # make sure we delete the subscriber + # if no more content filters left + subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0) + + debug "subscribers modified", subscribers=subscribers + # @TODO: metrics? + proc encode*(filter: ContentFilter): ProtoBuffer = result = initProtoBuffer() @@ -31,12 +58,14 @@ proc encode*(filter: ContentFilter): ProtoBuffer = proc encode*(rpc: FilterRequest): ProtoBuffer = result = initProtoBuffer() - - for filter in rpc.contentFilters: - result.write(1, filter.encode()) + + result.write(1, uint64(rpc.subscribe)) result.write(2, rpc.topic) + for filter in rpc.contentFilters: + result.write(3, filter.encode()) + proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = let pb = initProtoBuffer(buffer) @@ -49,14 +78,18 @@ proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] = var rpc = FilterRequest(contentFilters: @[], topic: "") let pb = initProtoBuffer(buffer) + var subflag: uint64 + if ? pb.getField(1, subflag): + rpc.subscribe = bool(subflag) + + discard ? pb.getField(2, rpc.topic) + var buffs: seq[seq[byte]] - discard ? pb.getRepeatedField(1, buffs) + discard ? pb.getRepeatedField(3, buffs) for buf in buffs: rpc.contentFilters.add(? ContentFilter.init(buf)) - discard ? pb.getField(2, rpc.topic) - ok(rpc) proc encode*(push: MessagePush): ProtoBuffer = @@ -116,7 +149,10 @@ method init*(wf: WakuFilter) = if value.push != MessagePush(): wf.pushHandler(value.requestId, value.push) if value.request != FilterRequest(): - wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) + if value.request.subscribe: + wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) + else: + wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId) wf.handler = handle wf.codec = WakuFilterCodec @@ -157,3 +193,12 @@ proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[string] {.async, let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer) result = id + +proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} = + # @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC. + let id = generateRequestId(wf.rng) + if wf.peers.len >= 1: + let peer = wf.peers[0].peerInfo + # @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC. + let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) + await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)