fix_: handle network change by resubscribing waku filters (#6232)

Author: Prem Chaitanya Prathi 2025-01-10 23:19:21 +05:30, committed by GitHub
parent 4faaa3e55f
commit f2f32c42b0
6 changed files with 94 additions and 26 deletions
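
In short: this change bumps the vendored go-waku, teaches FilterManager to remember the network connection type it last saw, and resubscribes every filter subscription when that type changes (for example, a phone hopping between Wi-Fi and cellular), since such a hop usually changes the node's IP address and can leave filter-push subscriptions dead with no way to recover.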

go.mod

@@ -97,7 +97,7 @@ require (
 	github.com/schollz/peerdiscovery v1.7.0
 	github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
 	github.com/urfave/cli/v2 v2.27.2
-	github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376
+	github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a
 	github.com/wk8/go-ordered-map/v2 v2.1.7
 	github.com/yeqown/go-qrcode/v2 v2.2.1
 	github.com/yeqown/go-qrcode/writer/standard v1.2.1

go.sum

@@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
 github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
-github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376 h1:KAnNhZAxwjPoCK5danoSSZwwerOtiA/WIz2qVHdiZtU=
-github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI=
+github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a h1:20W3RwuJvLWEtXxXZ7RWwoVfTQJtJrjde3qQETP9QMs=
+github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
 github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
 github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=


@@ -26,6 +26,7 @@ import (
 // filterSubscriptions is the map of filter subscription IDs to subscriptions
 const filterSubBatchSize = 90
+const initNetworkConnType = 255
 type appFilterMap map[string]filterConfig
@@ -43,6 +44,7 @@ type FilterManager struct {
 	filterConfigs     appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
 	waitingToSubQueue chan filterConfig
 	envProcessor      EnevelopeProcessor
+	networkConnType   byte
 }

 type SubDetails struct {
@@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
 	mgr.incompleteFilterBatch = make(map[string]filterConfig)
 	mgr.filterConfigs = make(appFilterMap)
 	mgr.waitingToSubQueue = make(chan filterConfig, 100)
+	mgr.networkConnType = initNetworkConnType
 	// parsing the subscribe params only to read the batchInterval passed
 	mgr.params = new(subscribeParameters)
@@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() {
 	}
 }

-// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
-// once batchlimit is hit, all filters are subscribed to and new batch is created.
+// SubscribeFilter checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch;
+// once the batch limit is hit, all filters are subscribed to and a new batch is created.
 // if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created
 func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
@@ -182,37 +185,102 @@ func (mgr *FilterManager) NetworkChange() {
 	mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
 }

+func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
+	if len(mgr.waitingToSubQueue) > 0 {
+		for af := range mgr.waitingToSubQueue {
+			// TODO: change the below logic once topic specific health is implemented for lightClients
+			if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
+				// check if any filter subs are pending and subscribe them
+				mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
+				go mgr.subscribeAndRunLoop(af)
+			} else {
+				mgr.waitingToSubQueue <- af
+			}
+			if len(mgr.waitingToSubQueue) == 0 {
+				mgr.logger.Debug("no pending subscriptions")
+				break
+			}
+		}
+	}
+}
+
+func (mgr *FilterManager) closeAndWait(wg *sync.WaitGroup, asub *SubDetails) {
+	defer wg.Done()
+	asub.cancel()
+	for {
+		env, ok := <-asub.sub.DataCh
+		if !ok {
+			mgr.logger.Debug("unsubscribed filter", zap.Strings("content-topics", asub.sub.ContentFilter.ContentTopics.ToList()))
+			return
+		}
+		// drain and process any in-flight envelopes before declaring the subscription closed
+		err := mgr.envProcessor.OnNewEnvelope(env)
+		if err != nil {
+			mgr.logger.Error("invoking onNewEnvelopes error", zap.Error(err))
+		}
+	}
+}
+
+func (mgr *FilterManager) resubscribeAllSubscriptions() {
+	filterSubsCount := len(mgr.filterSubscriptions)
+	mgr.Lock()
+	mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", filterSubsCount))
+	var wg sync.WaitGroup
+	wg.Add(len(mgr.filterSubscriptions))
+	for _, asub := range mgr.filterSubscriptions {
+		asub := asub // copy: each goroutine must get its own variable, not the shared loop variable
+		go mgr.closeAndWait(&wg, &asub)
+	}
+	mgr.filterSubscriptions = make(map[string]SubDetails)
+	mgr.Unlock()
+	wg.Wait() // wait till all unsubs are done to avoid a race between sub and unsub
+	mgr.logger.Debug("unsubscribed all filter subscriptions", zap.Int("subs-count", filterSubsCount))
+	// lock only to copy the filterConfigs map; the lock can't be held while calling SubscribeFilter, which acquires the same lock
+	mgr.Lock()
+	localMap := make(appFilterMap)
+	for filterID, config := range mgr.filterConfigs {
+		localMap[filterID] = config
+	}
+	mgr.Unlock()
+	for filterID, config := range localMap {
+		mgr.SubscribeFilter(filterID, config.contentFilter)
+	}
+}
+
 // OnConnectionStatusChange is to be triggered when a connection status change is detected, either from offline to online or vice versa.
 // Note that a pubsubTopic-specific change can be triggered by specifying pubsubTopic;
 // if pubsubTopic is empty it indicates a node-wide connection status change, i.e. the node went offline or came back online.
-func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
+func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) {
 	subs := mgr.node.Subscriptions()
 	mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
 		zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
+	/*
+		The initialization check matters because networkConnType is set to 255 when the FilterManager is created;
+		the first time the node goes online the real connection type gets recorded, and that first transition must not trigger a resubscribe.
+		A change in connType means the local node's network has changed, e.g. a mobile device switching between Wi-Fi and cellular,
+		which in turn means the local node's IP address has changed.
+		That can leave filter-push in a state it never recovers from, hence all filters are resubscribed.
+	*/
+	if mgr.networkConnType != initNetworkConnType &&
+		mgr.networkConnType != connType {
+		// resubscribe all existing filters
+		go mgr.resubscribeAllSubscriptions()
+	}
 	if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to online
 		mgr.onlineChecker.SetOnline(newStatus)
 		mgr.NetworkChange()
 		mgr.logger.Debug("switching from offline to online")
 		mgr.Lock()
-		if len(mgr.waitingToSubQueue) > 0 {
-			for af := range mgr.waitingToSubQueue {
-				// TODO: change the below logic once topic specific health is implemented for lightClients
-				if pubsubTopic == "" || pubsubTopic == af.contentFilter.PubsubTopic {
-					// check if any filter subs are pending and subscribe them
-					mgr.logger.Debug("subscribing from filter queue", zap.String("filter-id", af.ID), zap.Stringer("content-filter", af.contentFilter))
-					go mgr.subscribeAndRunLoop(af)
-				} else {
-					mgr.waitingToSubQueue <- af
-				}
-				if len(mgr.waitingToSubQueue) == 0 {
-					mgr.logger.Debug("no pending subscriptions")
-					break
-				}
-			}
-		}
+		mgr.checkAndProcessQueue(pubsubTopic)
 		mgr.Unlock()
 	}
+	mgr.networkConnType = connType
 	mgr.onlineChecker.SetOnline(newStatus)
 }
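
To make the new resubscription trigger concrete, here is a minimal, self-contained Go sketch of the rule the hunk above implements. connTracker and its method names are illustrative stand-ins, not the FilterManager API: the first observed connection type only seeds the tracker, and only a later change of type warrants a full resubscribe.

package main

import "fmt"

const initConnType byte = 255 // sentinel meaning "no connection type observed yet"

// connTracker is a hypothetical reduction of FilterManager's networkConnType logic.
type connTracker struct {
	connType byte
}

// onStatusChange returns true when a full filter resubscription is warranted,
// i.e. the connection type changed after having been observed at least once.
func (t *connTracker) onStatusChange(newType byte) bool {
	resubscribe := t.connType != initConnType && t.connType != newType
	t.connType = newType
	return resubscribe
}

func main() {
	t := &connTracker{connType: initConnType}
	fmt.Println(t.onStatusChange(1)) // false: first online event only records the type (e.g. Wi-Fi)
	fmt.Println(t.onStatusChange(1)) // false: same network, nothing to do
	fmt.Println(t.onStatusChange(2)) // true: Wi-Fi -> cellular, IP likely changed, resubscribe
}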


@@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream)
 	}
 	if !wf.subscriptions.IsSubscribedTo(peerID) {
-		logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
+		logger.Warn("received message push from unknown peer")
 		wf.metrics.RecordError(unknownPeerMessagePush)
 		// send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
 		if err := stream.Reset(); err != nil {

vendor/modules.txt

@@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire
 github.com/waku-org/go-libp2p-rendezvous
 github.com/waku-org/go-libp2p-rendezvous/db
 github.com/waku-org/go-libp2p-rendezvous/pb
-# github.com/waku-org/go-waku v0.8.1-0.20241224085853-c0afa070a376
+# github.com/waku-org/go-waku v0.8.1-0.20250103101727-4ef460cb951a
 ## explicit; go 1.21
 github.com/waku-org/go-waku/logging
 github.com/waku-org/go-waku/tests


@@ -1740,7 +1740,7 @@ func (w *Waku) ConnectionChanged(state connection.State) {
 		// TODO: Update this as per https://github.com/waku-org/go-waku/issues/1114
 		go func() {
 			defer gocommon.LogOnPanic()
-			w.filterManager.OnConnectionStatusChange("", isOnline)
+			w.filterManager.OnConnectionStatusChange("", isOnline, byte(state.Type))
 		}()
 		w.handleNetworkChangeFromApp(state)
 	} else {
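
For context, here is a hedged sketch of the caller's side of this contract: the application forwards every OS-level network change, and the filter manager decides whether a resubscribe is needed. The state struct and the typeWiFi/typeCellular constants below are simplified stand-ins, not the real status-go connection package.

package main

import "fmt"

// Simplified stand-in for status-go's connection.State; the real type carries more fields.
type connType byte

const (
	typeWiFi     connType = 1
	typeCellular connType = 2
)

type state struct {
	offline bool
	kind    connType
}

// notifyWaku mimics the call site above: an empty pubsubTopic signals a node-wide status change.
func notifyWaku(onChange func(pubsubTopic string, online bool, ct byte), s state) {
	onChange("", !s.offline, byte(s.kind))
}

func main() {
	onChange := func(topic string, online bool, ct byte) {
		fmt.Printf("topic=%q online=%v connType=%d\n", topic, online, ct)
	}
	notifyWaku(onChange, state{offline: false, kind: typeWiFi})     // app starts on Wi-Fi: type is recorded, no resubscribe
	notifyWaku(onChange, state{offline: false, kind: typeCellular}) // switch to cellular: triggers resubscription of all filters
}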