Feature/unsubscribe from content filter (#255)

Unsubscribe from a content topic

Co-authored-by: Oskar Thorén <ot@oskarthoren.com>
This commit is contained in:
Hanno Cornelius 2020-11-10 09:13:16 +02:00 committed by GitHub
parent ea5f9993a7
commit 24f4a087de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 171 additions and 21 deletions

View File

@ -225,7 +225,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
info "Hit filter handler" info "Hit filter handler"
await node.subscribe( await node.subscribe(
FilterRequest(contentFilters: @[ContentFilter(topics: @[DefaultContentTopic])], topic: DefaultTopic), FilterRequest(contentFilters: @[ContentFilter(topics: @[DefaultContentTopic])], topic: DefaultTopic, subscribe: true),
filterHandler filterHandler
) )

View File

@ -17,6 +17,7 @@ import
procSuite "Waku Filter": procSuite "Waku Filter":
asyncTest "handle filter": asyncTest "handle filter":
const defaultTopic = "/waku/2/default-waku/proto"
let let
key = PrivateKey.random(ECDSA, rng[]).get() key = PrivateKey.random(ECDSA, rng[]).get()
@ -39,7 +40,7 @@ procSuite "Waku Filter":
let let
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle) proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: "topic") rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
dialSwitch.mount(proto) dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo) proto.setPeer(listenSwitch.peerInfo)
@ -59,7 +60,73 @@ procSuite "Waku Filter":
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
await subscriptions.notify("topic", post) await subscriptions.notify(defaultTopic, post)
check: check:
(await responseRequestIdFuture) == id (await responseRequestIdFuture) == id
asyncTest "Can subscribe and unsubscribe from content filter":
const defaultTopic = "/waku/2/default-waku/proto"
let
key = PrivateKey.random(ECDSA, rng[]).get()
peer = PeerInfo.init(key)
contentTopic = ContentTopic(1)
post = WakuMessage(payload: @[byte 1, 2, 3], contentTopic: contentTopic)
var dialSwitch = newStandardSwitch()
discard await dialSwitch.start()
var listenSwitch = newStandardSwitch(some(key))
discard await listenSwitch.start()
var responseCompletionFuture = newFuture[bool]()
proc handle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
check:
msg.messages.len() == 1
msg.messages[0] == post
responseCompletionFuture.complete(true)
let
proto = WakuFilter.init(dialSwitch, crypto.newRng(), handle)
rpc = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: true)
dialSwitch.mount(proto)
proto.setPeer(listenSwitch.peerInfo)
proc emptyHandle(requestId: string, msg: MessagePush) {.gcsafe, closure.} =
discard
let
proto2 = WakuFilter.init(listenSwitch, crypto.newRng(), emptyHandle)
subscription = proto2.subscription()
var subscriptions = newTable[string, MessageNotificationSubscription]()
subscriptions["test"] = subscription
listenSwitch.mount(proto2)
let id = await proto.subscribe(rpc)
await sleepAsync(2.seconds)
await subscriptions.notify(defaultTopic, post)
check:
# Check that subscription works as expected
(await responseCompletionFuture.withTimeout(3.seconds)) == true
# Reset to test unsubscribe
responseCompletionFuture = newFuture[bool]()
let
rpcU = FilterRequest(contentFilters: @[ContentFilter(topics: @[contentTopic])], topic: defaultTopic, subscribe: false)
await proto.unsubscribe(rpcU)
await sleepAsync(2.seconds)
await subscriptions.notify(defaultTopic, post)
check:
# Check that unsubscribe works as expected
(await responseCompletionFuture.withTimeout(5.seconds)) == false

View File

@ -20,7 +20,7 @@ procSuite "WakuNode":
Port(60000)) Port(60000))
pubSubTopic = "chat" pubSubTopic = "chat"
contentTopic = ContentTopic(1) contentTopic = ContentTopic(1)
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
message = WakuMessage(payload: "hello world".toBytes(), message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic) contentTopic: contentTopic)
@ -72,7 +72,7 @@ procSuite "WakuNode":
Port(60002)) Port(60002))
pubSubTopic = "chat" pubSubTopic = "chat"
contentTopic = ContentTopic(1) contentTopic = ContentTopic(1)
filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])]) filterRequest = FilterRequest(topic: pubSubTopic, contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true)
message = WakuMessage(payload: "hello world".toBytes(), message = WakuMessage(payload: "hello world".toBytes(),
contentTopic: contentTopic) contentTopic: contentTopic)
@ -186,7 +186,7 @@ procSuite "WakuNode":
msg == message msg == message
completionFut.complete(true) completionFut.complete(true)
await node1.subscribe(FilterRequest(topic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(topics: @[contentTopic])]), handler) await node1.subscribe(FilterRequest(topic: "/waku/2/default-waku/proto", contentFilters: @[ContentFilter(topics: @[contentTopic])], subscribe: true), handler)
await sleepAsync(2000.millis) await sleepAsync(2000.millis)

View File

@ -85,7 +85,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
contentTopics.add(ContentTopic(topic)) contentTopics.add(ContentTopic(topic))
filters.add(ContentFilter(topics: contentTopics)) filters.add(ContentFilter(topics: contentTopics))
await node.subscribe(FilterRequest(topic: topic, contentFilters: filters), handler) await node.subscribe(FilterRequest(topic: topic, contentFilters: filters, subscribe: true), handler)
return true return true
rpcsrv.rpc("waku_info") do() -> string: rpcsrv.rpc("waku_info") do() -> string:

View File

@ -85,6 +85,7 @@ type
FilterRequest* = object FilterRequest* = object
contentFilters*: seq[ContentFilter] contentFilters*: seq[ContentFilter]
topic*: string topic*: string
subscribe*: bool
MessagePush* = object MessagePush* = object
messages*: seq[WakuMessage] messages*: seq[WakuMessage]

View File

@ -1,5 +1,5 @@
import import
std/[options, tables, strutils], std/[options, tables, strutils, sequtils],
chronos, chronicles, stew/shims/net as stewNet, chronos, chronicles, stew/shims/net as stewNet,
# TODO: Why do we need eth keys? # TODO: Why do we need eth keys?
eth/keys, eth/keys,
@ -47,6 +47,34 @@ proc initAddress(T: type MultiAddress, str: string): T =
raise newException(ValueError, raise newException(ValueError,
"Invalid bootstrap node multi-address") "Invalid bootstrap node multi-address")
proc removeContentFilters(filters: var Filters, contentFilters: seq[ContentFilter]) {.gcsafe.} =
# Flatten all unsubscribe topics into single seq
var unsubscribeTopics: seq[ContentTopic]
for cf in contentFilters:
unsubscribeTopics = unsubscribeTopics.concat(cf.topics)
debug "unsubscribing", unsubscribeTopics=unsubscribeTopics
var rIdToRemove: seq[string] = @[]
for rId, f in filters.mpairs:
# Iterate filter entries to remove matching content topics
for cf in f.contentFilters.mitems:
# Iterate content filters in filter entry
cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
# make sure we delete the content filter
# if no more topics are left
f.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0)
if f.contentFilters.len == 0:
rIdToRemove.add(rId)
# make sure we delete the filter entry
# if no more content filters left
for rId in rIdToRemove:
filters.del(rId)
debug "filters modified", filters=filters
template tcpEndPoint(address, port): auto = template tcpEndPoint(address, port): auto =
MultiAddress.init(address, tcpProtocol, port) MultiAddress.init(address, tcpProtocol, port)
@ -124,6 +152,10 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
## FilterHandler is a method that takes a MessagePush. ## FilterHandler is a method that takes a MessagePush.
## ##
## Status: Implemented. ## Status: Implemented.
# Sanity check for well-formed subscribe FilterRequest
doAssert(request.subscribe, "invalid subscribe request")
info "subscribe content", filter=request info "subscribe content", filter=request
var id = generateRequestId(node.rng) var id = generateRequestId(node.rng)
@ -151,13 +183,18 @@ proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} =
await wakuRelay.unsubscribeAll(topic) await wakuRelay.unsubscribeAll(topic)
proc unsubscribe*(w: WakuNode, contentFilter: waku_types.ContentFilter) = proc unsubscribe*(node: WakuNode, request: FilterRequest) {.async, gcsafe.} =
echo "NYI"
## Unsubscribe from a content filter. ## Unsubscribe from a content filter.
## ##
## Status: Not yet implemented. ## Status: Implemented.
## TODO Implement.
# Sanity check for well-formed unsubscribe FilterRequest
doAssert(request.subscribe == false, "invalid unsubscribe request")
info "unsubscribe content", filter=request
await node.wakuFilter.unsubscribe(request)
node.filters.removeContentFilters(request.contentFilters)
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a

View File

@ -1,5 +1,5 @@
import import
std/tables, std/[tables, sequtils],
bearssl, bearssl,
chronos, chronicles, metrics, stew/results, chronos, chronicles, metrics, stew/results,
libp2p/protocols/pubsub/pubsubpeer, libp2p/protocols/pubsub/pubsubpeer,
@ -23,6 +23,33 @@ logScope:
const const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1" WakuFilterCodec* = "/vac/waku/filter/2.0.0-beta1"
proc unsubscribeFilters(subscribers: var seq[Subscriber], request: FilterRequest, peerId: PeerID) =
# Flatten all unsubscribe topics into single seq
var unsubscribeTopics: seq[ContentTopic]
for cf in request.contentFilters:
unsubscribeTopics = unsubscribeTopics.concat(cf.topics)
debug "unsubscribing", peerId=peerId, unsubscribeTopics=unsubscribeTopics
for subscriber in subscribers.mitems:
if subscriber.peer.peerId != peerId: continue
# Iterate through subscriber entries matching peer ID to remove matching content topics
for cf in subscriber.filter.contentFilters.mitems:
# Iterate content filters in filter entry
cf.topics.keepIf(proc (t: auto): bool = t notin unsubscribeTopics)
# make sure we delete the content filter
# if no more topics are left
subscriber.filter.contentFilters.keepIf(proc (cf: auto): bool = cf.topics.len > 0)
# make sure we delete the subscriber
# if no more content filters left
subscribers.keepIf(proc (s: auto): bool = s.filter.contentFilters.len > 0)
debug "subscribers modified", subscribers=subscribers
# @TODO: metrics?
proc encode*(filter: ContentFilter): ProtoBuffer = proc encode*(filter: ContentFilter): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
@ -31,12 +58,14 @@ proc encode*(filter: ContentFilter): ProtoBuffer =
proc encode*(rpc: FilterRequest): ProtoBuffer = proc encode*(rpc: FilterRequest): ProtoBuffer =
result = initProtoBuffer() result = initProtoBuffer()
for filter in rpc.contentFilters: result.write(1, uint64(rpc.subscribe))
result.write(1, filter.encode())
result.write(2, rpc.topic) result.write(2, rpc.topic)
for filter in rpc.contentFilters:
result.write(3, filter.encode())
proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] = proc init*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
@ -49,14 +78,18 @@ proc init*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRequest(contentFilters: @[], topic: "") var rpc = FilterRequest(contentFilters: @[], topic: "")
let pb = initProtoBuffer(buffer) let pb = initProtoBuffer(buffer)
var subflag: uint64
if ? pb.getField(1, subflag):
rpc.subscribe = bool(subflag)
discard ? pb.getField(2, rpc.topic)
var buffs: seq[seq[byte]] var buffs: seq[seq[byte]]
discard ? pb.getRepeatedField(1, buffs) discard ? pb.getRepeatedField(3, buffs)
for buf in buffs: for buf in buffs:
rpc.contentFilters.add(? ContentFilter.init(buf)) rpc.contentFilters.add(? ContentFilter.init(buf))
discard ? pb.getField(2, rpc.topic)
ok(rpc) ok(rpc)
proc encode*(push: MessagePush): ProtoBuffer = proc encode*(push: MessagePush): ProtoBuffer =
@ -116,7 +149,10 @@ method init*(wf: WakuFilter) =
if value.push != MessagePush(): if value.push != MessagePush():
wf.pushHandler(value.requestId, value.push) wf.pushHandler(value.requestId, value.push)
if value.request != FilterRequest(): if value.request != FilterRequest():
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request)) if value.request.subscribe:
wf.subscribers.add(Subscriber(peer: conn.peerInfo, requestId: value.requestId, filter: value.request))
else:
wf.subscribers.unsubscribeFilters(value.request, conn.peerInfo.peerId)
wf.handler = handle wf.handler = handle
wf.codec = WakuFilterCodec wf.codec = WakuFilterCodec
@ -157,3 +193,12 @@ proc subscribe*(wf: WakuFilter, request: FilterRequest): Future[string] {.async,
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec) let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer) await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)
result = id result = id
proc unsubscribe*(wf: WakuFilter, request: FilterRequest) {.async, gcsafe.} =
# @TODO: NO REAL REASON TO GENERATE REQUEST ID FOR UNSUBSCRIBE OTHER THAN CREATING SANE-LOOKING RPC.
let id = generateRequestId(wf.rng)
if wf.peers.len >= 1:
let peer = wf.peers[0].peerInfo
# @TODO: THERE SHOULD BE ERROR HANDLING HERE, WHAT IF A PEER IS GONE? WHAT IF THERE IS A TIMEOUT ETC.
let conn = await wf.switch.dial(peer.peerId, peer.addrs, WakuFilterCodec)
await conn.writeLP(FilterRPC(requestId: id, request: request).encode().buffer)