From 0c3f109d9e48ea94e0bcfbb4a05e11ab94409e46 Mon Sep 17 00:00:00 2001 From: Vitaliy Vlasov Date: Mon, 14 Jun 2021 15:54:50 +0300 Subject: [PATCH] Add UnsubscribeFilter --- examples/filter2/main.go | 6 ++++ waku/v2/node/wakunode2.go | 43 ++++++++++++++++++++++++-- waku/v2/protocol/filter/waku_filter.go | 40 +++++++++++++++++++++++- 3 files changed, 85 insertions(+), 4 deletions(-) diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 38d93a1d..e7b3b96a 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -92,6 +92,12 @@ func main() { go writeLoop(ctx, fullNode) go readLoop(fullNode) + go func() { + // Unsubscribe filter after 5 seconds + time.Sleep(5 * time.Second) + filterRequest.Subscribe = false + lightNode.UnsubscribeFilter(ctx, filterRequest) + }() // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index c554a4a3..a799235a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -357,14 +357,13 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, request pb.FilterRequ // Sanity check for well-formed subscribe FilterRequest //doAssert(request.subscribe, "invalid subscribe request") - log.Info("subscribe content", request) + log.Info("SubscribeFilter, request: ", request) var id string if node.filter != nil { id, err := node.filter.Subscribe(ctx, request) - // TODO if id == "" || err != nil { // Failed to subscribe log.Error("remote subscription to filter failed", request) @@ -374,7 +373,45 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, request pb.FilterRequ } // Register handler for filter, whether remote subscription succeeded or not - node.filters[string(id)] = filter.Filter{ContentFilters: request.ContentFilters, Chan: ch} + node.filters[id] = filter.Filter{ContentFilters: request.ContentFilters, Chan: ch} +} + +func (node *WakuNode) UnsubscribeFilter(ctx context.Context, request pb.FilterRequest) { + + log.Info("UnsubscribeFilter, request: ", request) + // Send message to full node in order to unsubscribe + node.filter.Unsubscribe(ctx, request) + + // Remove local filter + var idsToRemove []string + for id, f := range node.filters { + // Iterate filter entries to remove matching content topics + // make sure we delete the content filter + // if no more topics are left + for _, cfToDelete := range request.ContentFilters { + for i, cf := range f.ContentFilters { + if cf.ContentTopic == cfToDelete.ContentTopic { + l := len(f.ContentFilters) - 1 + f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l] + f.ContentFilters = f.ContentFilters[:l] + break + } + + } + if len(f.ContentFilters) == 0 { + idsToRemove = append(idsToRemove, id) + } + } + } + + for _, rId := range idsToRemove { + for id, _ := range node.filters { + if id == rId { + delete(node.filters, id) + break + } + } + } } func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index a92cecc0..62f46b3f 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -135,7 +135,45 @@ func (wf *WakuFilter) onRequest(s network.Stream) { wf.subscribers = append(wf.subscribers, subscriber) log.Info("Full node, add a filter subscriber ", subscriber) } else { - // TODO wf.subscribers.unsubscribeFilters(filterRPCRequest.Request, conn.peerInfo.peerId) + peerId := string(s.Conn().RemotePeer()) + log.Info("Full node, remove a filter subscriber ", peerId) + contentFilters := filterRPCRequest.Request.ContentFilters + var peerIdsToRemove []string + for _, subscriber := range wf.subscribers { + if subscriber.peer != peerId { + continue + } + + // make sure we delete the content filter + // if no more topics are left + for i, contentFilter := range contentFilters { + subCfs := subscriber.filter.ContentFilters + for _, cf := range subCfs { + if cf.ContentTopic == contentFilter.ContentTopic { + l := len(subCfs) - 1 + subCfs[l], subCfs[i] = subCfs[i], subCfs[l] + subscriber.filter.ContentFilters = subCfs[:l] + } + } + } + + if len(subscriber.filter.ContentFilters) == 0 { + peerIdsToRemove = append(peerIdsToRemove, subscriber.peer) + } + } + + // make sure we delete the subscriber + // if no more content filters left + for _, peerId := range peerIdsToRemove { + for i, s := range wf.subscribers { + if s.peer == peerId { + l := len(wf.subscribers) - 1 + wf.subscribers[l], wf.subscribers[i] = wf.subscribers[i], wf.subscribers[l] + wf.subscribers = wf.subscribers[:l] + break + } + } + } } } else if filterRPCRequest.Push != nil { // We're on a light node.