2023-09-13 13:50:23 +03:00
package wakuv2
import (
"context"
"sync"
2024-07-04 10:34:53 +05:30
"time"
"github.com/google/uuid"
2023-09-13 13:50:23 +03:00
"github.com/status-im/status-go/wakuv2/common"
2023-10-12 15:21:49 -04:00
"go.uber.org/zap"
"golang.org/x/exp/maps"
2024-06-14 15:41:45 +03:00
"github.com/waku-org/go-waku/waku/v2/api"
2024-06-28 09:54:48 -04:00
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
2023-09-13 13:50:23 +03:00
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
)
2024-07-04 10:34:53 +05:30
// Methods on FilterManager just aggregate filters from application and subscribe to them
2023-09-13 13:50:23 +03:00
//
2024-07-04 10:34:53 +05:30
// startFilterSubLoop runs a loop wherein it waits for an interval to batch subscriptions
2023-09-13 13:50:23 +03:00
//
2024-07-04 10:34:53 +05:30
// runFilterSubscriptionLoop runs a loop for receiving messages from underlying subscriptions and invokes onNewEnvelopes
2023-09-13 13:50:23 +03:00
//
2024-07-04 10:34:53 +05:30
// filterConfigs is the map of filter IDs to filter configs
// filterSubscriptions is the map of filter subscription IDs to subscriptions
// filterSubBatchSize is the maximum number of content topics that addFilter
// lets accumulate in one aggregated batch before the batch is handed off for
// subscription and a new one is started.
const filterSubBatchSize = 90
// appFilterMap maps an application filter ID to its filterConfig.
type appFilterMap map [ string ] filterConfig
2023-09-13 13:50:23 +03:00
type FilterManager struct {
2024-06-14 15:41:45 +03:00
sync . Mutex
2024-07-04 10:34:53 +05:30
ctx context . Context
cfg * Config
onlineChecker * onlinechecker . DefaultOnlineChecker
filterSubscriptions map [ string ] SubDetails // map of aggregated filters to apiSub details
onNewEnvelopes func ( env * protocol . Envelope ) error
logger * zap . Logger
node * filter . WakuFilterLightNode
filterSubBatchDuration time . Duration
incompleteFilterBatch map [ string ] filterConfig
filterConfigs appFilterMap // map of application filterID to {aggregatedFilterID, application ContentFilter}
waitingToSubQueue chan filterConfig
2024-06-14 15:41:45 +03:00
}
2024-07-04 10:34:53 +05:30
2024-06-14 15:41:45 +03:00
// SubDetails is what the manager keeps for one aggregated subscription.
type SubDetails struct {
	// cancel cancels the context that was handed to api.Subscribe for this
	// subscription.
	cancel func()
	// sub is the subscription handle returned by api.Subscribe.
	sub *api.Sub
}
type filterConfig struct {
ID string
contentFilter protocol . ContentFilter
2023-09-13 13:50:23 +03:00
}
2024-06-14 15:41:45 +03:00
func newFilterManager ( ctx context . Context , logger * zap . Logger , cfg * Config , onNewEnvelopes func ( env * protocol . Envelope ) error , node * filter . WakuFilterLightNode ) * FilterManager {
2023-09-13 13:50:23 +03:00
// This fn is being mocked in test
mgr := new ( FilterManager )
mgr . ctx = ctx
mgr . logger = logger
2024-06-14 15:41:45 +03:00
mgr . cfg = cfg
2023-09-13 13:50:23 +03:00
mgr . onNewEnvelopes = onNewEnvelopes
2024-07-04 10:34:53 +05:30
mgr . filterSubscriptions = make ( map [ string ] SubDetails )
2023-09-13 13:50:23 +03:00
mgr . node = node
2024-06-28 09:54:48 -04:00
mgr . onlineChecker = onlinechecker . NewDefaultOnlineChecker ( false ) . ( * onlinechecker . DefaultOnlineChecker )
mgr . node . SetOnlineChecker ( mgr . onlineChecker )
2024-07-04 10:34:53 +05:30
mgr . filterSubBatchDuration = 5 * time . Second
mgr . incompleteFilterBatch = make ( map [ string ] filterConfig )
mgr . filterConfigs = make ( appFilterMap )
mgr . waitingToSubQueue = make ( chan filterConfig , 100 )
go mgr . startFilterSubLoop ( )
2023-09-13 13:50:23 +03:00
return mgr
}
2024-07-04 10:34:53 +05:30
// startFilterSubLoop wakes up every filterSubBatchDuration and, when the node
// is online, starts a subscription goroutine for every pending batch and then
// resets the pending set. It returns when the manager context is done.
func (mgr *FilterManager) startFilterSubLoop() {
	batchTicker := time.NewTicker(mgr.filterSubBatchDuration)
	defer batchTicker.Stop()

	for {
		select {
		case <-mgr.ctx.Done():
			return
		case <-batchTicker.C:
		}

		// TODO: Optimization, handle case where 1st addFilter happens just before ticker expires.
		if !mgr.onlineChecker.IsOnline() {
			// Offline: leave the pending batches untouched until a later tick.
			continue
		}

		mgr.Lock()
		for _, batch := range mgr.incompleteFilterBatch {
			mgr.logger.Debug("ticker hit, hence subscribing",
				zap.String("agg-filter-id", batch.ID),
				zap.Int("batch-size", len(batch.contentFilter.ContentTopics)),
				zap.Stringer("agg-content-filter", batch.contentFilter))
			go mgr.subscribeAndRunLoop(batch)
		}
		mgr.incompleteFilterBatch = make(map[string]filterConfig)
		mgr.Unlock()
	}
}
/ *
addFilter method checks if there are existing waiting filters for the pubsubTopic to be subscribed and adds the new filter to the same batch
once batchlimit is hit , all filters are subscribed to and new batch is created .
if node is not online , then batch is pushed to a queue to be picked up later for subscription and new batch is created
* /
2024-06-14 15:41:45 +03:00
func ( mgr * FilterManager ) addFilter ( filterID string , f * common . Filter ) {
2024-07-04 10:34:53 +05:30
mgr . logger . Debug ( "adding filter" , zap . String ( "filter-id" , filterID ) )
2024-06-14 15:41:45 +03:00
mgr . Lock ( )
defer mgr . Unlock ( )
2023-09-13 13:50:23 +03:00
2024-07-04 10:34:53 +05:30
afilter , ok := mgr . incompleteFilterBatch [ f . PubsubTopic ]
if ! ok {
//no existing batch for pubsubTopic
mgr . logger . Debug ( "new pubsubTopic batch" , zap . String ( "topic" , f . PubsubTopic ) )
cf := mgr . buildContentFilter ( f . PubsubTopic , f . ContentTopics )
afilter = filterConfig { uuid . NewString ( ) , cf }
mgr . incompleteFilterBatch [ f . PubsubTopic ] = afilter
mgr . filterConfigs [ filterID ] = filterConfig { afilter . ID , cf }
2023-09-13 13:50:23 +03:00
} else {
2024-07-04 10:34:53 +05:30
mgr . logger . Debug ( "existing pubsubTopic batch" , zap . String ( "agg-filter-id" , afilter . ID ) , zap . String ( "topic" , f . PubsubTopic ) )
if len ( afilter . contentFilter . ContentTopics ) + len ( f . ContentTopics ) > filterSubBatchSize {
//filter batch limit is hit
if mgr . onlineChecker . IsOnline ( ) {
//node is online, go ahead and subscribe the batch
mgr . logger . Debug ( "crossed pubsubTopic batchsize and online, subscribing to filters" , zap . String ( "agg-filter-id" , afilter . ID ) , zap . String ( "topic" , f . PubsubTopic ) , zap . Int ( "batch-size" , len ( afilter . contentFilter . ContentTopics ) + len ( f . ContentTopics ) ) )
go mgr . subscribeAndRunLoop ( afilter )
} else {
mgr . logger . Debug ( "crossed pubsubTopic batchsize and offline, queuing filters" , zap . String ( "agg-filter-id" , afilter . ID ) , zap . String ( "topic" , f . PubsubTopic ) , zap . Int ( "batch-size" , len ( afilter . contentFilter . ContentTopics ) + len ( f . ContentTopics ) ) )
// queue existing batch as node is not online
mgr . waitingToSubQueue <- afilter
}
cf := mgr . buildContentFilter ( f . PubsubTopic , f . ContentTopics )
afilter = filterConfig { uuid . NewString ( ) , cf }
mgr . logger . Debug ( "creating a new pubsubTopic batch" , zap . String ( "agg-filter-id" , afilter . ID ) , zap . String ( "topic" , f . PubsubTopic ) , zap . Stringer ( "content-filter" , cf ) )
mgr . incompleteFilterBatch [ f . PubsubTopic ] = afilter
mgr . filterConfigs [ filterID ] = filterConfig { afilter . ID , cf }
} else {
//add to existing batch as batch limit not reached
var contentTopics [ ] string
for _ , ct := range maps . Keys ( f . ContentTopics ) {
afilter . contentFilter . ContentTopics [ ct . ContentTopic ( ) ] = struct { } { }
contentTopics = append ( contentTopics , ct . ContentTopic ( ) )
}
cf := protocol . NewContentFilter ( f . PubsubTopic , contentTopics ... )
mgr . logger . Debug ( "adding to existing pubsubTopic batch" , zap . String ( "agg-filter-id" , afilter . ID ) , zap . Stringer ( "content-filter" , cf ) , zap . Int ( "batch-size" , len ( afilter . contentFilter . ContentTopics ) ) )
mgr . filterConfigs [ filterID ] = filterConfig { afilter . ID , cf }
}
2023-09-13 13:50:23 +03:00
}
}
2024-06-14 15:41:45 +03:00
func ( mgr * FilterManager ) subscribeAndRunLoop ( f filterConfig ) {
ctx , cancel := context . WithCancel ( mgr . ctx )
config := api . FilterConfig { MaxPeers : mgr . cfg . MinPeersForFilter }
2024-06-28 09:54:48 -04:00
sub , err := api . Subscribe ( ctx , mgr . node , f . contentFilter , config , mgr . logger )
2024-06-14 15:41:45 +03:00
mgr . Lock ( )
2024-07-04 10:34:53 +05:30
mgr . filterSubscriptions [ f . ID ] = SubDetails { cancel , sub }
2024-06-14 15:41:45 +03:00
mgr . Unlock ( )
if err == nil {
2024-07-04 10:34:53 +05:30
mgr . logger . Debug ( "subscription successful, running loop" , zap . String ( "agg-filter-id" , f . ID ) , zap . Stringer ( "content-filter" , f . contentFilter ) )
2024-06-14 15:41:45 +03:00
mgr . runFilterSubscriptionLoop ( sub )
} else {
2024-07-04 10:34:53 +05:30
mgr . logger . Error ( "subscription fail, need to debug issue" , zap . String ( "agg-filter-id" , f . ID ) , zap . Stringer ( "content-filter" , f . contentFilter ) , zap . Error ( err ) )
2023-09-13 13:50:23 +03:00
}
}
2024-06-14 15:41:45 +03:00
func ( mgr * FilterManager ) onConnectionStatusChange ( pubsubTopic string , newStatus bool ) {
mgr . logger . Debug ( "inside on connection status change" , zap . Bool ( "new-status" , newStatus ) ,
2024-07-04 10:34:53 +05:30
zap . Int ( "agg filters count" , len ( mgr . filterSubscriptions ) ) )
if newStatus && ! mgr . onlineChecker . IsOnline ( ) { //switched from offline to Online
mgr . logger . Debug ( "switching from offline to online" )
mgr . Lock ( )
if len ( mgr . waitingToSubQueue ) > 0 {
for af := range mgr . waitingToSubQueue {
// TODO: change the below logic once topic specific health is implemented for lightClients
if pubsubTopic == "" || pubsubTopic == af . contentFilter . PubsubTopic {
// Check if any filter subs are pending and subscribe them
mgr . logger . Debug ( "subscribing from filter queue" , zap . String ( "filter-id" , af . ID ) , zap . Stringer ( "content-filter" , af . contentFilter ) )
go mgr . subscribeAndRunLoop ( af )
} else {
// TODO: Can this cause issues?
mgr . waitingToSubQueue <- af
}
if len ( mgr . waitingToSubQueue ) == 0 {
mgr . logger . Debug ( "no pending subscriptions" )
2024-06-14 15:41:45 +03:00
break
}
2023-10-10 14:31:05 +03:00
}
}
2024-07-04 10:34:53 +05:30
mgr . Unlock ( )
2024-06-14 15:41:45 +03:00
}
2024-06-28 09:54:48 -04:00
mgr . onlineChecker . SetOnline ( newStatus )
2024-06-14 15:41:45 +03:00
}
2023-09-13 13:50:23 +03:00
2024-06-14 15:41:45 +03:00
func ( mgr * FilterManager ) removeFilter ( filterID string ) {
mgr . Lock ( )
defer mgr . Unlock ( )
mgr . logger . Debug ( "removing filter" , zap . String ( "filter-id" , filterID ) )
2024-07-04 10:34:53 +05:30
filterConfig , ok := mgr . filterConfigs [ filterID ]
if ! ok {
mgr . logger . Debug ( "filter removal: filter not found" , zap . String ( "filter-id" , filterID ) )
return
}
af , ok := mgr . filterSubscriptions [ filterConfig . ID ]
2024-06-14 15:41:45 +03:00
if ok {
2024-07-04 10:34:53 +05:30
delete ( mgr . filterConfigs , filterID )
for ct := range filterConfig . contentFilter . ContentTopics {
delete ( af . sub . ContentFilter . ContentTopics , ct )
}
if len ( af . sub . ContentFilter . ContentTopics ) == 0 {
af . cancel ( )
} else {
go af . sub . Unsubscribe ( filterConfig . contentFilter )
}
2024-06-14 15:41:45 +03:00
} else {
2024-07-04 10:34:53 +05:30
mgr . logger . Debug ( "filter removal: aggregated filter not found" , zap . String ( "filter-id" , filterID ) , zap . String ( "agg-filter-id" , filterConfig . ID ) )
2023-09-13 13:50:23 +03:00
}
}
func ( mgr * FilterManager ) buildContentFilter ( pubsubTopic string , contentTopicSet common . TopicSet ) protocol . ContentFilter {
contentTopics := make ( [ ] string , len ( contentTopicSet ) )
for i , ct := range maps . Keys ( contentTopicSet ) {
contentTopics [ i ] = ct . ContentTopic ( )
}
2023-11-09 20:29:15 -04:00
return protocol . NewContentFilter ( pubsubTopic , contentTopics ... )
2023-09-13 13:50:23 +03:00
}
2024-06-14 15:41:45 +03:00
func ( mgr * FilterManager ) runFilterSubscriptionLoop ( sub * api . Sub ) {
2023-09-13 13:50:23 +03:00
for {
select {
case <- mgr . ctx . Done ( ) :
2024-06-14 15:41:45 +03:00
mgr . logger . Debug ( "subscription loop ended" , zap . Stringer ( "content-filter" , sub . ContentFilter ) )
2023-09-13 13:50:23 +03:00
return
2024-06-14 15:41:45 +03:00
case env , ok := <- sub . DataCh :
2023-09-13 13:50:23 +03:00
if ok {
err := ( mgr . onNewEnvelopes ) ( env )
if err != nil {
2024-06-14 15:41:45 +03:00
mgr . logger . Error ( "invoking onNewEnvelopes error" , zap . Error ( err ) )
2023-09-13 13:50:23 +03:00
}
} else {
2024-06-14 15:41:45 +03:00
mgr . logger . Debug ( "filter sub is closed" , zap . Any ( "content-filter" , sub . ContentFilter ) )
2023-09-13 13:50:23 +03:00
return
}
}
}
}