Add functionality to unsubscribe from pubsub topic (#241)

This commit is contained in:
Hanno Cornelius 2020-10-27 03:13:56 +02:00 committed by GitHub
parent aab93912ef
commit 16c4e65762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 9 deletions

View File

@ -40,11 +40,15 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
## ##
## Status: Implemented. ## Status: Implemented.
proc unsubscribe*(w: WakuNode, topic: Topic) = proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
## Unsubscribe from a topic. ## Unsubscribes a handler from a PubSub topic.
## ##
## Status: Not yet implemented. ## Status: Implemented.
## TODO Implement.
proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} =
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.
proc unsubscribe*(w: WakuNode, contentFilter: ContentFilter) = proc unsubscribe*(w: WakuNode, contentFilter: ContentFilter) =
## Unsubscribe from a content filter. ## Unsubscribe from a content filter.

View File

@ -132,12 +132,24 @@ proc subscribe*(node: WakuNode, request: FilterRequest, handler: ContentFilterHa
id = await node.wakuFilter.subscribe(request) id = await node.wakuFilter.subscribe(request)
node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler) node.filters[id] = Filter(contentFilters: request.contentFilters, handler: handler)
proc unsubscribe*(w: WakuNode, topic: Topic) = proc unsubscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
echo "NYI" ## Unsubscribes a handler from a PubSub topic.
## Unsubscribe from a topic.
## ##
## Status: Not yet implemented. ## Status: Implemented.
## TODO Implement. info "unsubscribe", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.unsubscribe(@[(topic, handler)])
proc unsubscribeAll*(node: WakuNode, topic: Topic) {.async.} =
## Unsubscribes all handlers registered on a specific PubSub topic.
##
## Status: Implemented.
info "unsubscribeAll", topic=topic
let wakuRelay = node.wakuRelay
await wakuRelay.unsubscribeAll(topic)
proc unsubscribe*(w: WakuNode, contentFilter: waku_types.ContentFilter) = proc unsubscribe*(w: WakuNode, contentFilter: waku_types.ContentFilter) =
echo "NYI" echo "NYI"

View File

@ -74,6 +74,14 @@ method unsubscribe*(w: WakuRelay,
else: else:
await procCall FloodSub(w).unsubscribe(topics) await procCall FloodSub(w).unsubscribe(topics)
method unsubscribeAll*(w: WakuRelay,
pubSubTopic: string) {.async.} =
debug "unsubscribeAll"
if w.gossipEnabled:
await procCall GossipSub(w).unsubscribeAll(pubSubTopic)
else:
await procCall FloodSub(w).unsubscribeAll(pubSubTopic)
# GossipSub specific methods -------------------------------------------------- # GossipSub specific methods --------------------------------------------------
method start*(w: WakuRelay) {.async.} = method start*(w: WakuRelay) {.async.} =
debug "start" debug "start"