feat_: aggregate filter subscriptions to do bulk subs with peer (#5440)

* feat_: aggregate filter subscriptions to do bulk subs with peer

* chore_: take possible deadlock fix in go-waku

* fix_: don't resubscribe filters unless there is a change in shard for community (#5467)
This commit is contained in:
Prem Chaitanya Prathi 2024-07-04 10:34:53 +05:30 committed by GitHub
parent 1e0a24f1b8
commit 437f830b51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 182 additions and 74 deletions

2
go.mod
View File

@ -93,7 +93,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0 github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2 github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28 github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0
github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2137,8 +2137,8 @@ github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97Jry
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28 h1:7BqEcKgJs9QNzrLlC4jn1opCjGKZxNX2B/AVqhsvwzw= github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0 h1:3Idg7XvXc9iQpUyg8KNKgZnziHJRs3xm7EDJHFzC9to=
github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28/go.mod h1:fHQ6WCSAlTollYHvAeZeO+d7lOYwcvQxHk+DyGLeoMI= github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0/go.mod h1:hkW5zXyM/ZIMDPniVooTk4dOGwY+OzrB0Q6fx+1sLpo=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -3554,7 +3554,7 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti
} }
// Unsubscribing from existing shard // Unsubscribing from existing shard
if community.Shard() != nil { if community.Shard() != nil && community.Shard() != shard.FromProtobuff(message.GetShard()) {
err := m.unsubscribeFromShard(community.Shard()) err := m.unsubscribeFromShard(community.Shard())
if err != nil { if err != nil {
return err return err
@ -3567,12 +3567,14 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti
if err != nil { if err != nil {
return err return err
} }
// Update community filters in case of change of shard
if community.Shard() != shard.FromProtobuff(message.GetShard()) {
err = m.UpdateCommunityFilters(community)
if err != nil {
return err
}
err = m.UpdateCommunityFilters(community)
if err != nil {
return err
} }
return nil return nil
} }

View File

@ -140,9 +140,13 @@ func (s *MessengerCommunitiesShardingSuite) TestPostToCommunityChat() {
s.testPostToCommunityChat(shard, community, chat) s.testPostToCommunityChat(shard, community, chat)
} }
// Members should continue to receive messages in a community if sharding is disabled after it was previously enabled. // Members should continue to receive messages in a community if it is moved back to default shard.
{ {
s.testPostToCommunityChat(nil, community, chat) shard := &shard.Shard{
Cluster: shard.MainStatusShardCluster,
Index: 32,
}
s.testPostToCommunityChat(shard, community, chat)
} }
} }

View File

@ -71,8 +71,12 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
return sub, nil return sub, nil
} }
func (apiSub *Sub) Unsubscribe() { func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
apiSub.cancel() _, err := apiSub.wf.Unsubscribe(apiSub.ctx, contentFilter)
//Not reading result unless we want to do specific error handling?
if err != nil {
apiSub.log.Debug("failed to unsubscribe", zap.Error(err), zap.Stringer("content-filter", contentFilter))
}
} }
func (apiSub *Sub) subscriptionLoop() { func (apiSub *Sub) subscriptionLoop() {

View File

@ -240,11 +240,19 @@ func selectWSListenAddresses(addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
func selectCircuitRelayListenAddresses(ctx context.Context, addresses []ma.Multiaddr) ([]ma.Multiaddr, error) { func selectCircuitRelayListenAddresses(ctx context.Context, addresses []ma.Multiaddr) ([]ma.Multiaddr, error) {
var result []ma.Multiaddr var result []ma.Multiaddr
for _, addr := range addresses { for _, addr := range addresses {
addr, err := decapsulateCircuitRelayAddr(ctx, addr) addr, err := decapsulateCircuitRelayAddr(ctx, addr)
if err != nil { if err != nil {
continue continue
} }
_, noWS := addr.ValueForProtocol(ma.P_WSS)
_, noWSS := addr.ValueForProtocol(ma.P_WS)
if noWS == nil || noWSS == nil { // WS or WSS found
continue
}
result = append(result, addr) result = append(result, addr)
} }

View File

@ -120,7 +120,8 @@ func Multiaddress(node *enode.Node) (peer.ID, []multiaddr.Multiaddr, error) {
maRaw := multiaddrRaw[offset+2 : offset+2+int(maSize)] maRaw := multiaddrRaw[offset+2 : offset+2+int(maSize)]
addr, err := multiaddr.NewMultiaddrBytes(maRaw) addr, err := multiaddr.NewMultiaddrBytes(maRaw)
if err != nil { if err != nil {
return "", nil, fmt.Errorf("invalid multiaddress field length") // The value is not a multiaddress. Ignoring...
continue
} }
hostInfoStr := fmt.Sprintf("/p2p/%s", peerID.String()) hostInfoStr := fmt.Sprintf("/p2p/%s", peerID.String())

View File

@ -37,6 +37,8 @@ type SubscriptionDetails struct {
} }
func (s *SubscriptionDetails) Add(contentTopics ...string) { func (s *SubscriptionDetails) Add(contentTopics ...string) {
s.mapRef.Lock()
defer s.mapRef.Unlock()
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -44,14 +46,14 @@ func (s *SubscriptionDetails) Add(contentTopics ...string) {
if _, ok := s.ContentFilter.ContentTopics[ct]; !ok { if _, ok := s.ContentFilter.ContentTopics[ct]; !ok {
s.ContentFilter.ContentTopics[ct] = struct{}{} s.ContentFilter.ContentTopics[ct] = struct{}{}
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair // Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
s.mapRef.Lock()
s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct) s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct)
s.mapRef.Unlock()
} }
} }
} }
func (s *SubscriptionDetails) Remove(contentTopics ...string) { func (s *SubscriptionDetails) Remove(contentTopics ...string) {
s.mapRef.Lock()
defer s.mapRef.Unlock()
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -59,15 +61,13 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) {
if _, ok := s.ContentFilter.ContentTopics[ct]; ok { if _, ok := s.ContentFilter.ContentTopics[ct]; ok {
delete(s.ContentFilter.ContentTopics, ct) delete(s.ContentFilter.ContentTopics, ct)
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair // Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
s.mapRef.Lock()
s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct) s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct)
s.mapRef.Unlock()
} }
} }
if len(s.ContentFilter.ContentTopics) == 0 { if len(s.ContentFilter.ContentTopics) == 0 {
// err doesn't matter // err doesn't matter
_ = s.mapRef.Delete(s) _ = s.mapRef.DeleteNoLock(s)
} }
} }
@ -105,7 +105,9 @@ func (s *SubscriptionDetails) CloseC() {
func (s *SubscriptionDetails) Close() error { func (s *SubscriptionDetails) Close() error {
s.CloseC() s.CloseC()
return s.mapRef.Delete(s) s.mapRef.Lock()
defer s.mapRef.Unlock()
return s.mapRef.DeleteNoLock(s)
} }
func (s *SubscriptionDetails) SetClosing() { func (s *SubscriptionDetails) SetClosing() {

View File

@ -130,9 +130,9 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool
return true return true
} }
func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.Lock() // Caller has to acquire lock before invoking this method.This is done to avoid possible deadlock
defer sub.Unlock() func (sub *SubscriptionsMap) DeleteNoLock(subscription *SubscriptionDetails) error {
peerSubscription, ok := sub.items[subscription.PeerID] peerSubscription, ok := sub.items[subscription.PeerID]
if !ok { if !ok {

2
vendor/modules.txt vendored
View File

@ -1015,7 +1015,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240628140035-3604bf39ad28 # github.com/waku-org/go-waku v0.8.1-0.20240701141800-5b5ea977afe0
## explicit; go 1.21 ## explicit; go 1.21
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/tests

View File

@ -3,6 +3,9 @@ package wakuv2
import ( import (
"context" "context"
"sync" "sync"
"time"
"github.com/google/uuid"
"github.com/status-im/status-go/wakuv2/common" "github.com/status-im/status-go/wakuv2/common"
@ -15,34 +18,39 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
) )
// Methods on FilterManager maintain filter peer health // Methods on FilterManager just aggregate filters from application and subscribe to them
// //
// runFilterLoop is the main event loop // startFilterSubLoop runs a loop where-in it waits for an interval to batch subscriptions
// //
// Filter Install/Uninstall events are pushed onto eventChan // runFilterSubscriptionLoop runs a loop for receiving messages from underlying subscriptions and invokes onNewEnvelopes
// Subscribe, UnsubscribeWithSubscription, IsSubscriptionAlive calls
// are invoked from goroutines and request results pushed onto eventChan
// //
// filterSubs is the map of filter IDs to subscriptions // filterConfigs is the map of filer IDs to filter configs
// filterSubscriptions is the map of filter subscription IDs to subscriptions
const filterSubBatchSize = 90
type appFilterMap map[string]filterConfig
type FilterManager struct { type FilterManager struct {
sync.Mutex sync.Mutex
ctx context.Context ctx context.Context
cfg *Config cfg *Config
filters map[string]SubDetails // map of filters to apiSub details onlineChecker *onlinechecker.DefaultOnlineChecker
onNewEnvelopes func(env *protocol.Envelope) error filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
logger *zap.Logger onNewEnvelopes func(env *protocol.Envelope) error
node *filter.WakuFilterLightNode logger *zap.Logger
onlineChecker *onlinechecker.DefaultOnlineChecker node *filter.WakuFilterLightNode
filterQueue chan filterConfig filterSubBatchDuration time.Duration
incompleteFilterBatch map[string]filterConfig
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
} }
type SubDetails struct { type SubDetails struct {
cancel func() cancel func()
sub *api.Sub sub *api.Sub
} }
const filterQueueSize = 1000
type filterConfig struct { type filterConfig struct {
ID string ID string
contentFilter protocol.ContentFilter contentFilter protocol.ContentFilter
@ -55,62 +63,131 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, cfg *Config, onNe
mgr.logger = logger mgr.logger = logger
mgr.cfg = cfg mgr.cfg = cfg
mgr.onNewEnvelopes = onNewEnvelopes mgr.onNewEnvelopes = onNewEnvelopes
mgr.filters = make(map[string]SubDetails) mgr.filterSubscriptions = make(map[string]SubDetails)
mgr.node = node mgr.node = node
mgr.filterQueue = make(chan filterConfig, filterQueueSize)
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker) mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
mgr.node.SetOnlineChecker(mgr.onlineChecker) mgr.node.SetOnlineChecker(mgr.onlineChecker)
mgr.filterSubBatchDuration = 5 * time.Second
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
go mgr.startFilterSubLoop()
return mgr return mgr
} }
func (mgr *FilterManager) startFilterSubLoop() {
ticker := time.NewTicker(mgr.filterSubBatchDuration)
defer ticker.Stop()
for {
select {
case <-mgr.ctx.Done():
return
case <-ticker.C:
// TODO: Optimization, handle case where 1st addFilter happens just before ticker expires.
if mgr.onlineChecker.IsOnline() {
mgr.Lock()
for _, af := range mgr.incompleteFilterBatch {
mgr.logger.Debug("ticker hit, hence subscribing", zap.String("agg-filter-id", af.ID), zap.Int("batch-size", len(af.contentFilter.ContentTopics)),
zap.Stringer("agg-content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
}
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.Unlock()
}
}
}
}
/*
addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
once batchlimit is hit, all filters are subscribed to and new batch is created.
if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created
*/
func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) { func (mgr *FilterManager) addFilter(filterID string, f *common.Filter) {
mgr.logger.Debug("adding filter", zap.String("filter-id", filterID))
mgr.Lock() mgr.Lock()
defer mgr.Unlock() defer mgr.Unlock()
contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
mgr.logger.Debug("adding filter", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter))
if mgr.onlineChecker.IsOnline() { afilter, ok := mgr.incompleteFilterBatch[f.PubsubTopic]
go mgr.subscribeAndRunLoop(filterConfig{filterID, contentFilter}) if !ok {
//no existing batch for pubsubTopic
mgr.logger.Debug("new pubsubTopic batch", zap.String("topic", f.PubsubTopic))
cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
afilter = filterConfig{uuid.NewString(), cf}
mgr.incompleteFilterBatch[f.PubsubTopic] = afilter
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
} else { } else {
mgr.logger.Debug("queuing filter as not online", zap.String("filter-id", filterID), zap.Stringer("content-filter", contentFilter)) mgr.logger.Debug("existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic))
mgr.filterQueue <- filterConfig{filterID, contentFilter} if len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics) > filterSubBatchSize {
//filter batch limit is hit
if mgr.onlineChecker.IsOnline() {
//node is online, go ahead and subscribe the batch
mgr.logger.Debug("crossed pubsubTopic batchsize and online, subscribing to filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics)))
go mgr.subscribeAndRunLoop(afilter)
} else {
mgr.logger.Debug("crossed pubsubTopic batchsize and offline, queuing filters", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)+len(f.ContentTopics)))
// queue existing batch as node is not online
mgr.waitingToSubQueue <- afilter
}
cf := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
afilter = filterConfig{uuid.NewString(), cf}
mgr.logger.Debug("creating a new pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.String("topic", f.PubsubTopic), zap.Stringer("content-filter", cf))
mgr.incompleteFilterBatch[f.PubsubTopic] = afilter
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
} else {
//add to existing batch as batch limit not reached
var contentTopics []string
for _, ct := range maps.Keys(f.ContentTopics) {
afilter.contentFilter.ContentTopics[ct.ContentTopic()] = struct{}{}
contentTopics = append(contentTopics, ct.ContentTopic())
}
cf := protocol.NewContentFilter(f.PubsubTopic, contentTopics...)
mgr.logger.Debug("adding to existing pubsubTopic batch", zap.String("agg-filter-id", afilter.ID), zap.Stringer("content-filter", cf), zap.Int("batch-size", len(afilter.contentFilter.ContentTopics)))
mgr.filterConfigs[filterID] = filterConfig{afilter.ID, cf}
}
} }
} }
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) { func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
ctx, cancel := context.WithCancel(mgr.ctx) ctx, cancel := context.WithCancel(mgr.ctx)
config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter} config := api.FilterConfig{MaxPeers: mgr.cfg.MinPeersForFilter}
sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger) sub, err := api.Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger)
mgr.Lock() mgr.Lock()
mgr.filters[f.ID] = SubDetails{cancel, sub} mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
mgr.Unlock() mgr.Unlock()
if err == nil { if err == nil {
mgr.logger.Debug("subscription successful, running loop", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter)) mgr.logger.Debug("subscription successful, running loop", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter))
mgr.runFilterSubscriptionLoop(sub) mgr.runFilterSubscriptionLoop(sub)
} else { } else {
mgr.logger.Error("subscription fail, need to debug issue", zap.String("filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err)) mgr.logger.Error("subscription fail, need to debug issue", zap.String("agg-filter-id", f.ID), zap.Stringer("content-filter", f.contentFilter), zap.Error(err))
} }
} }
func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) { func (mgr *FilterManager) onConnectionStatusChange(pubsubTopic string, newStatus bool) {
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
zap.Int("filters count", len(mgr.filters)), zap.Int("filter-queue-len", len(mgr.filterQueue))) zap.Int("agg filters count", len(mgr.filterSubscriptions)))
//TODO: Needs optimization because only on transition from offline to online should trigger this logic. if newStatus && !mgr.onlineChecker.IsOnline() { //switched from offline to Online
if newStatus { //Online mgr.logger.Debug("switching from offline to online")
if len(mgr.filterQueue) > 0 { mgr.Lock()
//Check if any filter subs are pending and subscribe them if len(mgr.waitingToSubQueue) > 0 {
for filter := range mgr.filterQueue { for af := range mgr.waitingToSubQueue {
mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", filter.ID), zap.Stringer("content-filter", filter.contentFilter)) // TODO: change the below logic once topic specific health is implemented for lightClients
go mgr.subscribeAndRunLoop(filter) if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
if len(mgr.filterQueue) == 0 { // Check if any filter subs are pending and subscribe them
mgr.logger.Debug("filter queue empty") mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
go mgr.subscribeAndRunLoop(af)
} else {
// TODO: Can this cause issues?
mgr.waitingToSubQueue <- af
}
if len(mgr.waitingToSubQueue) == 0 {
mgr.logger.Debug("no pending subscriptions")
break break
} }
} }
} }
mgr.Unlock()
} }
mgr.onlineChecker.SetOnline(newStatus) mgr.onlineChecker.SetOnline(newStatus)
@ -120,15 +197,24 @@ func (mgr *FilterManager) removeFilter(filterID string) {
mgr.Lock() mgr.Lock()
defer mgr.Unlock() defer mgr.Unlock()
mgr.logger.Debug("removing filter", zap.String("filter-id", filterID)) mgr.logger.Debug("removing filter", zap.String("filter-id", filterID))
filterConfig, ok := mgr.filterConfigs[filterID]
subDetails, ok := mgr.filters[filterID] if !ok {
if ok {
delete(mgr.filters, filterID)
// close goroutine running runFilterSubscriptionLoop
// this will also close api.Sub
subDetails.cancel()
} else {
mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID)) mgr.logger.Debug("filter removal: filter not found", zap.String("filter-id", filterID))
return
}
af, ok := mgr.filterSubscriptions[filterConfig.ID]
if ok {
delete(mgr.filterConfigs, filterID)
for ct := range filterConfig.contentFilter.ContentTopics {
delete(af.sub.ContentFilter.ContentTopics, ct)
}
if len(af.sub.ContentFilter.ContentTopics) == 0 {
af.cancel()
} else {
go af.sub.Unsubscribe(filterConfig.contentFilter)
}
} else {
mgr.logger.Debug("filter removal: aggregated filter not found", zap.String("filter-id", filterID), zap.String("agg-filter-id", filterConfig.ID))
} }
} }

View File

@ -343,7 +343,7 @@ func TestWakuV2Filter(t *testing.T) {
w, err := New(nil, "", config, nil, nil, nil, nil, nil) w, err := New(nil, "", config, nil, nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, w.Start()) require.NoError(t, w.Start())
w.filterManager.filterSubBatchDuration = 1 * time.Second
options := func(b *backoff.ExponentialBackOff) { options := func(b *backoff.ExponentialBackOff) {
b.MaxElapsedTime = 10 * time.Second b.MaxElapsedTime = 10 * time.Second
} }
@ -371,7 +371,7 @@ func TestWakuV2Filter(t *testing.T) {
ContentTopics: common.NewTopicSetFromBytes([][]byte{contentTopicBytes}), ContentTopics: common.NewTopicSetFromBytes([][]byte{contentTopicBytes}),
} }
_, err = w.Subscribe(filter) fID, err := w.Subscribe(filter)
require.NoError(t, err) require.NoError(t, err)
msgTimestamp := w.timestamp() msgTimestamp := w.timestamp()
@ -415,7 +415,8 @@ func TestWakuV2Filter(t *testing.T) {
messages = filter.Retrieve() messages = filter.Retrieve()
require.Len(t, messages, 1) require.Len(t, messages, 1)
err = w.Unsubscribe(context.Background(), fID)
require.NoError(t, err)
require.NoError(t, w.Stop()) require.NoError(t, w.Stop())
} }