mirror of https://github.com/status-im/go-waku.git
fix: handle network change scenario and resubscribe all filters
This commit is contained in:
parent
5893927f1b
commit
2dd4165122
|
@ -26,6 +26,7 @@ import (
|
||||||
// filterSubscriptions is the map of filter subscription IDs to subscriptions
|
// filterSubscriptions is the map of filter subscription IDs to subscriptions
|
||||||
|
|
||||||
const filterSubBatchSize = 90
|
const filterSubBatchSize = 90
|
||||||
|
const initNetworkConnType = 255
|
||||||
|
|
||||||
type appFilterMap map[string]filterConfig
|
type appFilterMap map[string]filterConfig
|
||||||
|
|
||||||
|
@ -43,6 +44,7 @@ type FilterManager struct {
|
||||||
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
|
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
|
||||||
waitingToSubQueue chan filterConfig
|
waitingToSubQueue chan filterConfig
|
||||||
envProcessor EnevelopeProcessor
|
envProcessor EnevelopeProcessor
|
||||||
|
networkConnType byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type SubDetails struct {
|
type SubDetails struct {
|
||||||
|
@ -76,6 +78,7 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
|
||||||
mgr.incompleteFilterBatch = make(map[string]filterConfig)
|
mgr.incompleteFilterBatch = make(map[string]filterConfig)
|
||||||
mgr.filterConfigs = make(appFilterMap)
|
mgr.filterConfigs = make(appFilterMap)
|
||||||
mgr.waitingToSubQueue = make(chan filterConfig, 100)
|
mgr.waitingToSubQueue = make(chan filterConfig, 100)
|
||||||
|
mgr.networkConnType = initNetworkConnType
|
||||||
|
|
||||||
//parsing the subscribe params only to read the batchInterval passed.
|
//parsing the subscribe params only to read the batchInterval passed.
|
||||||
mgr.params = new(subscribeParameters)
|
mgr.params = new(subscribeParameters)
|
||||||
|
@ -114,8 +117,8 @@ func (mgr *FilterManager) startFilterSubLoop() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
|
// SubscribeFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
|
||||||
// once batchlimit is hit, all filters are subscribed to and new batch is created.
|
// once batch-limit is hit, all filters are subscribed to and new batch is created.
|
||||||
// if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created
|
// if node is not online, then batch is pushed to a queue to be picked up later for subscription and new batch is created
|
||||||
|
|
||||||
func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
|
func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFilter) {
|
||||||
|
@ -182,18 +185,7 @@ func (mgr *FilterManager) NetworkChange() {
|
||||||
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
|
mgr.node.PingPeers() // ping all peers to check if subscriptions are alive
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa
|
func (mgr *FilterManager) checkAndProcessQueue(pubsubTopic string) {
|
||||||
// Note that pubsubTopic specific change can be triggered by specifying pubsubTopic,
|
|
||||||
// if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online.
|
|
||||||
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool) {
|
|
||||||
subs := mgr.node.Subscriptions()
|
|
||||||
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
|
|
||||||
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
|
|
||||||
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
|
|
||||||
mgr.onlineChecker.SetOnline(newStatus)
|
|
||||||
mgr.NetworkChange()
|
|
||||||
mgr.logger.Debug("switching from offline to online")
|
|
||||||
mgr.Lock()
|
|
||||||
if len(mgr.waitingToSubQueue) > 0 {
|
if len(mgr.waitingToSubQueue) > 0 {
|
||||||
for af := range mgr.waitingToSubQueue {
|
for af := range mgr.waitingToSubQueue {
|
||||||
// TODO: change the below logic once topic specific health is implemented for lightClients
|
// TODO: change the below logic once topic specific health is implemented for lightClients
|
||||||
|
@ -210,9 +202,42 @@ func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mgr.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mgr *FilterManager) resubscribeAllSubscriptions() {
|
||||||
|
mgr.Lock()
|
||||||
|
mgr.logger.Debug("unsubscribing all filter subscriptions", zap.Int("subs-count", len(mgr.filterSubscriptions)))
|
||||||
|
for _, asub := range mgr.filterSubscriptions {
|
||||||
|
asub.sub.cancel()
|
||||||
|
}
|
||||||
|
mgr.Unlock()
|
||||||
|
// TODO: protect range with lock
|
||||||
|
for filterID, config := range mgr.filterConfigs {
|
||||||
|
mgr.SubscribeFilter(filterID, config.contentFilter)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// OnConnectionStatusChange to be triggered when connection status change is detected either from offline to online or vice-versa
|
||||||
|
// Note that pubsubTopic specific change can be triggered by specifying pubsubTopic,
|
||||||
|
// if pubsubTopic is empty it indicates complete connection status change such as node went offline or came back online.
|
||||||
|
func (mgr *FilterManager) OnConnectionStatusChange(pubsubTopic string, newStatus bool, connType byte) {
|
||||||
|
subs := mgr.node.Subscriptions()
|
||||||
|
mgr.logger.Debug("inside on connection status change", zap.Bool("new-status", newStatus),
|
||||||
|
zap.Int("agg filters count", len(mgr.filterSubscriptions)), zap.Int("filter subs count", len(subs)))
|
||||||
|
if mgr.networkConnType != initNetworkConnType && //checking for initialization condition
|
||||||
|
mgr.networkConnType != connType { // this means ip address of the node has changed which can cause issues in filter-push and hence resubscribing all filters
|
||||||
|
// resubscribe all existing filters
|
||||||
|
mgr.resubscribeAllSubscriptions()
|
||||||
|
}
|
||||||
|
if newStatus && !mgr.onlineChecker.IsOnline() { // switched from offline to Online
|
||||||
|
mgr.onlineChecker.SetOnline(newStatus)
|
||||||
|
mgr.NetworkChange()
|
||||||
|
mgr.logger.Debug("switching from offline to online")
|
||||||
|
mgr.Lock()
|
||||||
|
mgr.checkAndProcessQueue(pubsubTopic)
|
||||||
|
mgr.Unlock()
|
||||||
|
}
|
||||||
|
mgr.networkConnType = connType
|
||||||
mgr.onlineChecker.SetOnline(newStatus)
|
mgr.onlineChecker.SetOnline(newStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -161,9 +161,9 @@ func (s *FilterApiTestSuite) TestFilterManager() {
|
||||||
// Mock peers going down
|
// Mock peers going down
|
||||||
s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID())
|
s.LightNodeHost.Peerstore().RemovePeer(s.FullNodeHost.ID())
|
||||||
|
|
||||||
fm.OnConnectionStatusChange("", false)
|
fm.OnConnectionStatusChange("", false, 0)
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
fm.OnConnectionStatusChange("", true)
|
fm.OnConnectionStatusChange("", true, 0)
|
||||||
s.ConnectToFullNode(s.LightNode, s.FullNode)
|
s.ConnectToFullNode(s.LightNode, s.FullNode)
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
|
|
@ -175,7 +175,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
|
||||||
}
|
}
|
||||||
|
|
||||||
if !wf.subscriptions.IsSubscribedTo(peerID) {
|
if !wf.subscriptions.IsSubscribedTo(peerID) {
|
||||||
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
|
logger.Warn("received message push from unknown peer")
|
||||||
wf.metrics.RecordError(unknownPeerMessagePush)
|
wf.metrics.RecordError(unknownPeerMessagePush)
|
||||||
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
|
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
|
||||||
if err := stream.Reset(); err != nil {
|
if err := stream.Reset(); err != nil {
|
||||||
|
|
Loading…
Reference in New Issue