From f2f32c42b013f771ebddbfd86f829de99e50c549 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Fri, 10 Jan 2025 23:19:21 +0530 Subject: [PATCH] fix_: handle network change by resubscribing waku filters (#6232) --- go.mod | 2 +- go.sum | 4 +- .../waku/v2/api/filter/filter_manager.go | 108 ++++++++++++++---- .../go-waku/waku/v2/protocol/filter/client.go | 2 +- vendor/modules.txt | 2 +- wakuv2/waku.go | 2 +- 6 files changed, 94 insertions(+), 26 deletions(-) diff --git a/go.mod b/go.mod index fbbb53549..77fdac5f8 100644 --- a/go.mod +++ b/go.mod @@ -97,7 +97,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376 + github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index f3cfc649e..1d9120e07 100644 --- a/go.sum +++ b/go.sum @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376 h1:KAnNhZAxwjPoCK5danoSSZwwerOtiA/WIz2qVHdiZtU= -github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= +github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a h1:20W3RwuJvLWEtXxXZ7RWwoVfTQJtJrjde3qQETP9QMs= +github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go index 665d577bd..66762238f 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter_manager.go @@ -26,6 +26,7 @@ import ( // filterSubscriptions is the map of filter subscription IDs to subscriptions const filterSubBatchSize = 90 +const initNetworkConnType = 255 type appFilterMap map[string]filterConfig @@ -43,6 +44,7 @@ type FilterManager struct { filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} waitingToSubQueue chan filterConfig envProcessor EnevelopeProcessor + networkConnType byte } type SubDetails struct { @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + mgr.networkConnType = initNetworkConnType //parsing the subscribe params only to read the batchInterval passed. mgr.params = new(subscribeParameters) @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() { } } -// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch -// once batchlimit is hit, all filters are subscribed to and new batch is created. +// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +// once batch-limit is hit, all filters are subscribed to and new batch is created. // if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) { @@ -182,37 +185,102 @@ func (mgr *FilterManager) NetworkChange() { mgr.node.PingPeers() // ping all peers to check if subscriptions are alive } +func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) { + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + break + } + } + } +} + +func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) { + defer wg.Done() + asub.cancel() + for { + env, ok := <-asub.sub.DataCh + if !ok { + mgr.logger.Debug("unsubscribed filter", zap.Strings("content-topics", asub.sub.ContentFilter.ContentTopics.ToList())) + return + } + // process any in-flight envelopes + err := mgr.envProcessor.OnNewEnvelope(env) + if err != nil { + mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err)) + } + } +} + +func (mgr *FilterManager) resubscribeAllSubscriptions() { + filterSubsCount := len(mgr.filterSubscriptions) + mgr.Lock() + mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", filterSubsCount)) + var wg sync.WaitGroup + wg.Add(len(mgr.filterSubscriptions)) + + for _, asub := range mgr.filterSubscriptions { + go mgr.closeAndWait(&wg, &asub) + } + mgr.filterSubscriptions = make(map[string]SubDetails) + + mgr.Unlock() + + wg.Wait() //Waiting till all unsubs are done to avoid race between sub and unsub + + mgr.logger.Debug("unsubscribed all filter subscriptions", zap.Int("subs-count", filterSubsCount)) + + // locking to protect filterConfigs map, can't lock while calling subscribe as same lock is acquired inside subscribe + mgr.Lock() + localMap := make(appFilterMap) + for filterID, config := range mgr.filterConfigs { + localMap[filterID] = config + } + mgr.Unlock() + + for filterID, config := range localMap { + mgr.SubscribeFilter(filterID, config.contentFilter) + } + +} + // OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa // Note that pubsubTopic specific change can be triggered by specifying pubsubTopic, // if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online. -func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) { +func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) { subs := mgr.node.Subscriptions() mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + /* + Checking for initialization condition because when filterManager is initialized networkConnType is set to 255 and when first time node goes online + the network conn type will be set and will trigger resubscribe which is not desired. + Change in connType refers to scenario where the localnode's network has changed e.g: a mobile switching between wifi and cellular, + this in-turn means ip address of the localnode has changed. + this can cause issues in filter-push where it never recovers and hence resubscribing all filters + */ + if mgr.networkConnType != initNetworkConnType && + mgr.networkConnType != connType { // + // resubscribe all existing filters + go mgr.resubscribeAllSubscriptions() + } if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online mgr.onlineChecker.SetOnline(newStatus) mgr.NetworkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() - if len(mgr.waitingToSubQueue) > 0 { - for af := range mgr.waitingToSubQueue { - // TODO: change the below logic once topic specific health is implemented for lightClients - if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { - // check if any filter subs are pending and subscribe them - mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) - go mgr.subscribeAndRunLoop(af) - } else { - mgr.waitingToSubQueue <- af - } - if len(mgr.waitingToSubQueue) == 0 { - mgr.logger.Debug("no pending subscriptions") - break - } - } - } + mgr.checkAndProcessQueue(pubsubTopic) mgr.Unlock() } - + mgr.networkConnType = connType mgr.onlineChecker.SetOnline(newStatus) } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 8fbcd91c1..c1e762e9b 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } if !wf.subscriptions.IsSubscribedTo(peerID) { - logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) + logger.Warn("received message push from unknown peer") wf.metrics.RecordError(unknownPeerMessagePush) //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us if err := stream.Reset(); err != nil { diff --git a/vendor/modules.txt b/vendor/modules.txt index 90e13ffa1..3d17156b7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376 +# github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index e7132662f..d381d679c 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1740,7 +1740,7 @@ func (w *Waku) ConnectionChanged(state connection.State) { //TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114 go func() { defer gocommon.LogOnPanic() - w.filterManager.OnConnectionStatusChange("", isOnline) + w.filterManager.OnConnectionStatusChange("", isOnline, byte(state.Type)) }() w.handleNetworkChangeFromApp(state) } else {