From 2dd4165122a8c46f1dcadaa15e51c38d98e9fee2 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 25 Dec 2024 11:00:09 +0530 Subject: [PATCH] fix: handle network change scenario and resubscribe all filters --- waku/v2/api/filter/filter_manager.go | 65 +++++++++++++++++++--------- waku/v2/api/filter/filter_test.go | 4 +- waku/v2/protocol/filter/client.go | 2 +- 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/waku/v2/api/filter/filter_manager.go b/waku/v2/api/filter/filter_manager.go index 665d577b..b7754251 100644 --- a/waku/v2/api/filter/filter_manager.go +++ b/waku/v2/api/filter/filter_manager.go @@ -26,6 +26,7 @@ import ( // filterSubscriptions is the map of filter subscription IDs to subscriptions const filterSubBatchSize = 90 +const initNetworkConnType = 255 type appFilterMap map[string]filterConfig @@ -43,6 +44,7 @@ type FilterManager struct { filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter} waitingToSubQueue chan filterConfig envProcessor EnevelopeProcessor + networkConnType byte } type SubDetails struct { @@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter mgr.incompleteFilterBatch = make(map[string]filterConfig) mgr.filterConfigs = make(appFilterMap) mgr.waitingToSubQueue = make(chan filterConfig, 100) + mgr.networkConnType = initNetworkConnType //parsing the subscribe params only to read the batchInterval passed. mgr.params = new(subscribeParameters) @@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() { } } -// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch -// once batchlimit is hit, all filters are subscribed to and new batch is created. +// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch +// once batch-limit is hit, all filters are subscribed to and new batch is created. // if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) { @@ -182,37 +185,59 @@ func (mgr *FilterManager) NetworkChange() { mgr.node.PingPeers() // ping all peers to check if subscriptions are alive } +func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) { + if len(mgr.waitingToSubQueue) > 0 { + for af := range mgr.waitingToSubQueue { + // TODO: change the below logic once topic specific health is implemented for lightClients + if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { + // check if any filter subs are pending and subscribe them + mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) + go mgr.subscribeAndRunLoop(af) + } else { + mgr.waitingToSubQueue <- af + } + if len(mgr.waitingToSubQueue) == 0 { + mgr.logger.Debug("no pending subscriptions") + break + } + } + } +} + +func (mgr *FilterManager) resubscribeAllSubscriptions() { + mgr.Lock() + mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", len(mgr.filterSubscriptions))) + for _, asub := range mgr.filterSubscriptions { + asub.sub.cancel() + } + mgr.Unlock() + // TODO: protect range with lock + for filterID, config := range mgr.filterConfigs { + mgr.SubscribeFilter(filterID, config.contentFilter) + } +} + // OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa // Note that pubsubTopic specific change can be triggered by specifying pubsubTopic, // if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online. -func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) { +func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) { subs := mgr.node.Subscriptions() mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus), zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs))) + if mgr.networkConnType != initNetworkConnType && //checking for initialization condition + mgr.networkConnType != connType { // this means ip address of the node has changed which can cause issues in filter-push and hence resubscribing all filters + // resubscribe all existing filters + mgr.resubscribeAllSubscriptions() + } if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online mgr.onlineChecker.SetOnline(newStatus) mgr.NetworkChange() mgr.logger.Debug("switching from offline to online") mgr.Lock() - if len(mgr.waitingToSubQueue) > 0 { - for af := range mgr.waitingToSubQueue { - // TODO: change the below logic once topic specific health is implemented for lightClients - if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic { - // check if any filter subs are pending and subscribe them - mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter)) - go mgr.subscribeAndRunLoop(af) - } else { - mgr.waitingToSubQueue <- af - } - if len(mgr.waitingToSubQueue) == 0 { - mgr.logger.Debug("no pending subscriptions") - break - } - } - } + mgr.checkAndProcessQueue(pubsubTopic) mgr.Unlock() } - + mgr.networkConnType = connType mgr.onlineChecker.SetOnline(newStatus) } diff --git a/waku/v2/api/filter/filter_test.go b/waku/v2/api/filter/filter_test.go index 8a5f2d40..8a720ea6 100644 --- a/waku/v2/api/filter/filter_test.go +++ b/waku/v2/api/filter/filter_test.go @@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() { // Mock peers going down s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID()) - fm.OnConnectionStatusChange("", false) + fm.OnConnectionStatusChange("", false, 0) time.Sleep(2 * time.Second) - fm.OnConnectionStatusChange("", true) + fm.OnConnectionStatusChange("", true, 0) s.ConnectToFullNode(s.LightNode, s.FullNode) time.Sleep(3 * time.Second) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 8fbcd91c..c1e762e9 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea } if !wf.subscriptions.IsSubscribedTo(peerID) { - logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) + logger.Warn("received message push from unknown peer") wf.metrics.RecordError(unknownPeerMessagePush) //Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us if err := stream.Reset(); err != nil {