2023-09-13 10:50:23 +00:00
package wakuv2
import (
"context"
"sync"
"time"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/status-go/wakuv2/common"
2023-10-12 19:21:49 +00:00
"go.uber.org/zap"
"golang.org/x/exp/maps"
2023-09-13 10:50:23 +00:00
node "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)
const (
FilterEventAdded = iota
FilterEventRemoved
FilterEventPingResult
FilterEventSubscribeResult
FilterEventUnsubscribeResult
FilterEventGetStats
)
type FilterSubs map [ string ] subscription . SubscriptionSet
type FilterEvent struct {
eventType int
filterID string
success bool
peerID peer . ID
tempID string
sub * subscription . SubscriptionDetails
ch chan FilterSubs
}
// Methods on FilterManager maintain filter peer health
//
// runFilterLoop is the main event loop
//
// Filter Install/Uninstall events are pushed onto eventChan
// Subscribe, UnsubscribeWithSubscription, IsSubscriptionAlive calls
// are invoked from goroutines and request results pushed onto eventChan
//
// filterSubs is the map of filter IDs to subscriptions
type FilterManager struct {
ctx context . Context
filterSubs FilterSubs
eventChan chan ( FilterEvent )
isFilterSubAlive func ( sub * subscription . SubscriptionDetails ) error
getFilter func ( string ) * common . Filter
onNewEnvelopes func ( env * protocol . Envelope ) error
logger * zap . Logger
2024-03-04 09:21:20 +00:00
config * Config
2023-09-13 10:50:23 +00:00
node * node . WakuNode
}
2024-03-04 09:21:20 +00:00
func newFilterManager ( ctx context . Context , logger * zap . Logger , getFilterFn func ( string ) * common . Filter , config * Config , onNewEnvelopes func ( env * protocol . Envelope ) error , node * node . WakuNode ) * FilterManager {
2023-09-13 10:50:23 +00:00
// This fn is being mocked in test
mgr := new ( FilterManager )
mgr . ctx = ctx
mgr . logger = logger
mgr . getFilter = getFilterFn
mgr . onNewEnvelopes = onNewEnvelopes
mgr . filterSubs = make ( FilterSubs )
mgr . eventChan = make ( chan FilterEvent , 100 )
2024-03-04 09:21:20 +00:00
mgr . config = config
2023-09-13 10:50:23 +00:00
mgr . node = node
mgr . isFilterSubAlive = func ( sub * subscription . SubscriptionDetails ) error {
2024-06-03 13:19:18 +00:00
return nil
2023-09-13 10:50:23 +00:00
}
return mgr
}
func ( mgr * FilterManager ) runFilterLoop ( wg * sync . WaitGroup ) {
defer wg . Done ( )
// Use it to ping filter peer(s) periodically
ticker := time . NewTicker ( 5 * time . Second )
defer ticker . Stop ( )
for {
select {
case <- mgr . ctx . Done ( ) :
2024-01-30 11:45:08 +00:00
mgr . logger . Debug ( "filter loop stopped" )
2023-09-13 10:50:23 +00:00
return
case <- ticker . C :
mgr . pingPeers ( )
case ev := <- mgr . eventChan :
mgr . processEvents ( & ev )
}
}
}
func ( mgr * FilterManager ) processEvents ( ev * FilterEvent ) {
switch ev . eventType {
case FilterEventAdded :
mgr . filterSubs [ ev . filterID ] = make ( subscription . SubscriptionSet )
mgr . resubscribe ( ev . filterID )
case FilterEventRemoved :
for _ , sub := range mgr . filterSubs [ ev . filterID ] {
if sub == nil {
// Skip temp subs
continue
}
go mgr . unsubscribeFromFilter ( ev . filterID , sub )
}
delete ( mgr . filterSubs , ev . filterID )
case FilterEventPingResult :
if ev . success {
break
}
// filterID field is only set when there are no subs to check for this filter,
// therefore no particular peers that could be unreachable.
if ev . filterID != "" {
2023-10-10 11:31:05 +00:00
// Trigger full resubscribe, filter has too few peers
mgr . logger . Debug ( "filter has too few subs" , zap . String ( "filterId" , ev . filterID ) )
2023-09-13 10:50:23 +00:00
mgr . resubscribe ( ev . filterID )
break
}
2024-01-12 07:09:35 +00:00
2023-09-13 10:50:23 +00:00
// Delete subs for removed peer
for filterID , subs := range mgr . filterSubs {
for _ , sub := range subs {
if sub == nil {
// Skip temp subs
continue
}
if sub . PeerID == ev . peerID {
2023-10-10 11:31:05 +00:00
mgr . logger . Debug ( "filter sub is inactive" , zap . String ( "filterId" , filterID ) , zap . Stringer ( "peerId" , sub . PeerID ) , zap . String ( "subID" , sub . ID ) )
2023-09-13 10:50:23 +00:00
delete ( subs , sub . ID )
go mgr . unsubscribeFromFilter ( filterID , sub )
}
}
mgr . resubscribe ( filterID )
}
case FilterEventSubscribeResult :
subs , found := mgr . filterSubs [ ev . filterID ]
if ev . success {
if found {
subs [ ev . sub . ID ] = ev . sub
go mgr . runFilterSubscriptionLoop ( ev . sub )
} else {
// We subscribed to a filter that is already uninstalled; invoke unsubscribe
go mgr . unsubscribeFromFilter ( ev . filterID , ev . sub )
}
}
if found {
// Delete temp subscription record
delete ( subs , ev . tempID )
}
case FilterEventUnsubscribeResult :
2023-10-10 11:31:05 +00:00
mgr . logger . Debug ( "filter event unsubscribe result" , zap . String ( "filterId" , ev . filterID ) , zap . Stringer ( "peerID" , ev . sub . PeerID ) )
2023-09-13 10:50:23 +00:00
case FilterEventGetStats :
stats := make ( FilterSubs )
for id , subs := range mgr . filterSubs {
stats [ id ] = make ( subscription . SubscriptionSet )
for subID , sub := range subs {
if sub == nil {
// Skip temp subs
continue
}
stats [ id ] [ subID ] = sub
}
}
ev . ch <- stats
}
}
2024-01-12 07:09:35 +00:00
func ( mgr * FilterManager ) subscribeToFilter ( filterID string , tempID string ) {
2023-09-13 10:50:23 +00:00
2023-10-10 11:31:05 +00:00
logger := mgr . logger . With ( zap . String ( "filterId" , filterID ) )
2023-09-13 10:50:23 +00:00
f := mgr . getFilter ( filterID )
if f == nil {
2023-10-10 11:31:05 +00:00
logger . Error ( "filter subscribeToFilter: No filter found" )
2023-09-13 10:50:23 +00:00
mgr . eventChan <- FilterEvent { eventType : FilterEventSubscribeResult , filterID : filterID , tempID : tempID , success : false }
return
}
contentFilter := mgr . buildContentFilter ( f . PubsubTopic , f . ContentTopics )
2024-01-12 07:09:35 +00:00
logger . Debug ( "filter subscribe to filter node" , zap . String ( "pubsubTopic" , contentFilter . PubsubTopic ) , zap . Strings ( "contentTopics" , contentFilter . ContentTopicsList ( ) ) )
2023-09-13 10:50:23 +00:00
ctx , cancel := context . WithTimeout ( mgr . ctx , requestTimeout )
defer cancel ( )
2024-01-12 07:09:35 +00:00
subDetails , err := mgr . node . FilterLightnode ( ) . Subscribe ( ctx , contentFilter , filter . WithAutomaticPeerSelection ( ) )
2023-09-13 10:50:23 +00:00
var sub * subscription . SubscriptionDetails
if err != nil {
2024-01-12 07:09:35 +00:00
logger . Warn ( "filter could not add wakuv2 filter for peers" , zap . Error ( err ) )
2023-09-13 10:50:23 +00:00
} else {
sub = subDetails [ 0 ]
2024-01-12 07:09:35 +00:00
logger . Debug ( "filter subscription success" , zap . Stringer ( "peer" , sub . PeerID ) , zap . String ( "pubsubTopic" , contentFilter . PubsubTopic ) , zap . Strings ( "contentTopics" , contentFilter . ContentTopicsList ( ) ) )
2023-09-13 10:50:23 +00:00
}
success := err == nil
mgr . eventChan <- FilterEvent { eventType : FilterEventSubscribeResult , filterID : filterID , tempID : tempID , sub : sub , success : success }
}
func ( mgr * FilterManager ) unsubscribeFromFilter ( filterID string , sub * subscription . SubscriptionDetails ) {
mgr . logger . Debug ( "filter unsubscribe from filter node" , zap . String ( "filterId" , filterID ) , zap . String ( "subId" , sub . ID ) , zap . Stringer ( "peer" , sub . PeerID ) )
// Unsubscribe on light node
ctx , cancel := context . WithTimeout ( mgr . ctx , requestTimeout )
defer cancel ( )
_ , err := mgr . node . FilterLightnode ( ) . UnsubscribeWithSubscription ( ctx , sub )
if err != nil {
mgr . logger . Warn ( "could not unsubscribe wakuv2 filter for peer" , zap . String ( "filterId" , filterID ) , zap . String ( "subId" , sub . ID ) , zap . Error ( err ) )
}
success := err == nil
mgr . eventChan <- FilterEvent { eventType : FilterEventUnsubscribeResult , filterID : filterID , success : success , sub : sub }
}
// Check whether each of the installed filters
// has enough alive subscriptions to peers
func ( mgr * FilterManager ) pingPeers ( ) {
mgr . logger . Debug ( "filter pingPeers" )
distinctPeers := make ( map [ peer . ID ] struct { } )
for filterID , subs := range mgr . filterSubs {
2023-10-10 11:31:05 +00:00
logger := mgr . logger . With ( zap . String ( "filterId" , filterID ) )
nilSubsCnt := 0
for _ , s := range subs {
if s == nil {
nilSubsCnt ++
}
}
logger . Debug ( "filter ping peers" , zap . Int ( "len" , len ( subs ) ) , zap . Int ( "len(nilSubs)" , nilSubsCnt ) )
2024-03-04 09:21:20 +00:00
if len ( subs ) < mgr . config . MinPeersForFilter {
2023-10-10 11:31:05 +00:00
// Trigger full resubscribe
logger . Debug ( "filter ping peers not enough subs" )
go func ( filterID string ) {
2023-09-13 10:50:23 +00:00
mgr . eventChan <- FilterEvent { eventType : FilterEventPingResult , filterID : filterID , success : false }
2023-10-10 11:31:05 +00:00
} ( filterID )
2023-09-13 10:50:23 +00:00
}
for _ , sub := range subs {
if sub == nil {
// Skip temp subs
continue
}
_ , found := distinctPeers [ sub . PeerID ]
if found {
continue
}
distinctPeers [ sub . PeerID ] = struct { } { }
2023-10-10 11:31:05 +00:00
logger . Debug ( "filter ping peer" , zap . Stringer ( "peerId" , sub . PeerID ) )
2023-09-13 10:50:23 +00:00
go func ( sub * subscription . SubscriptionDetails ) {
err := mgr . isFilterSubAlive ( sub )
alive := err == nil
if alive {
2023-10-10 11:31:05 +00:00
logger . Debug ( "filter aliveness check succeeded" , zap . Stringer ( "peerId" , sub . PeerID ) )
2023-09-13 10:50:23 +00:00
} else {
2023-10-10 11:31:05 +00:00
logger . Debug ( "filter aliveness check failed" , zap . Stringer ( "peerId" , sub . PeerID ) , zap . Error ( err ) )
2023-09-13 10:50:23 +00:00
}
mgr . eventChan <- FilterEvent { eventType : FilterEventPingResult , peerID : sub . PeerID , success : alive }
} ( sub )
}
}
}
func ( mgr * FilterManager ) buildContentFilter ( pubsubTopic string , contentTopicSet common . TopicSet ) protocol . ContentFilter {
contentTopics := make ( [ ] string , len ( contentTopicSet ) )
for i , ct := range maps . Keys ( contentTopicSet ) {
contentTopics [ i ] = ct . ContentTopic ( )
}
2023-11-10 00:29:15 +00:00
return protocol . NewContentFilter ( pubsubTopic , contentTopics ... )
2023-09-13 10:50:23 +00:00
}
func ( mgr * FilterManager ) resubscribe ( filterID string ) {
subs , found := mgr . filterSubs [ filterID ]
if ! found {
mgr . logger . Error ( "resubscribe filter not found" , zap . String ( "filterId" , filterID ) )
return
}
2024-03-04 09:21:20 +00:00
if len ( subs ) > mgr . config . MinPeersForFilter {
2023-10-10 11:31:05 +00:00
mgr . logger . Error ( "filter resubscribe too many subs" , zap . String ( "filterId" , filterID ) , zap . Int ( "len" , len ( subs ) ) )
}
2024-03-04 09:21:20 +00:00
if len ( subs ) == mgr . config . MinPeersForFilter {
2023-10-10 11:31:05 +00:00
// do nothing
return
}
mgr . logger . Debug ( "filter resubscribe subs count:" , zap . String ( "filterId" , filterID ) , zap . Int ( "len" , len ( subs ) ) )
2024-03-04 09:21:20 +00:00
for i := len ( subs ) ; i < mgr . config . MinPeersForFilter ; i ++ {
2023-09-13 10:50:23 +00:00
mgr . logger . Debug ( "filter check not passed, try subscribing to peers" , zap . String ( "filterId" , filterID ) )
2024-01-12 07:09:35 +00:00
// Create sub placeholder in order to avoid potentially too many subs
tempID := uuid . NewString ( )
subs [ tempID ] = nil
go mgr . subscribeToFilter ( filterID , tempID )
2023-09-13 10:50:23 +00:00
}
}
func ( mgr * FilterManager ) runFilterSubscriptionLoop ( sub * subscription . SubscriptionDetails ) {
for {
select {
case <- mgr . ctx . Done ( ) :
return
case env , ok := <- sub . C :
if ok {
err := ( mgr . onNewEnvelopes ) ( env )
if err != nil {
mgr . logger . Error ( "OnNewEnvelopes error" , zap . Error ( err ) )
}
} else {
mgr . logger . Debug ( "filter sub is closed" , zap . String ( "id" , sub . ID ) )
return
}
}
}
}