// 2023-09-13 10:50:23 +00:00
package wakuv2
import (
"context"
"crypto/rand"
"errors"
"math/big"
"sync"
"time"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/status-im/status-go/wakuv2/common"
// 2023-10-12 19:21:49 +00:00
"go.uber.org/zap"
"golang.org/x/exp/maps"
// 2023-09-13 10:50:23 +00:00
node "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)
// Event types carried in FilterEvent.eventType and dispatched by
// FilterManager.processEvents.
const (
// FilterEventAdded: a filter was installed. The manager creates an empty
// subscription set for it and starts subscribing to peers.
FilterEventAdded = iota
// FilterEventRemoved: a filter was uninstalled. Its subscriptions are
// unsubscribed and its entry is deleted.
FilterEventRemoved
// FilterEventPingResult: outcome of an aliveness check for a peer, or a
// report that a filter currently has no subscriptions at all.
FilterEventPingResult
// FilterEventSubscribeResult: outcome of a subscribeToFilter call.
FilterEventSubscribeResult
// FilterEventUnsubscribeResult: outcome of an unsubscribeFromFilter call;
// it is only logged.
FilterEventUnsubscribeResult
// FilterEventGetStats: request for a snapshot of the current subscriptions,
// answered on FilterEvent.ch.
FilterEventGetStats
)
// pingTimeout bounds each subscription aliveness check performed through
// FilterManager.isFilterSubAlive.
const pingTimeout = 10 * time . Second
// FilterSubs maps a filter ID to the set of subscriptions held for that
// filter. Inside FilterManager a nil entry in a set is a temporary
// placeholder for a subscribe request that is still in flight.
type FilterSubs map [ string ] subscription . SubscriptionSet
// FilterEvent is the message type sent over FilterManager.eventChan.
// Which fields are meaningful depends on eventType.
type FilterEvent struct {
// eventType is one of the FilterEvent* constants.
eventType int
// filterID identifies the filter the event refers to. On ping results it
// is only set when the filter has no subscriptions to check.
filterID string
// success reports the outcome for ping/subscribe/unsubscribe results.
success bool
// peerID is the peer that was pinged (ping results for a specific peer).
peerID peer . ID
// tempID is the key of the placeholder entry created before subscribing;
// it is removed when the subscribe result is processed.
tempID string
// sub is the subscription the event refers to; nil on a failed subscribe.
sub * subscription . SubscriptionDetails
// ch receives the snapshot produced for FilterEventGetStats.
ch chan FilterSubs
}
// FilterManager maintains filter peer health: it keeps every installed
// filter subscribed to enough peers and replaces subscriptions whose peer
// stops answering pings.
//
// runFilterLoop is the main event loop and the only place where filterSubs
// and peers are mutated.
//
// Filter install/uninstall events are pushed onto eventChan.
// Subscribe, UnsubscribeWithSubscription and IsSubscriptionAlive calls
// are invoked from goroutines, and their results are pushed onto eventChan.
//
// filterSubs is the map of filter IDs to subscriptions.
type FilterManager struct {
// ctx stops the loops when cancelled and is the parent of request timeouts.
ctx context . Context
// filterSubs holds the subscriptions per filter ID (nil entries are
// placeholders for pending subscribe requests).
filterSubs FilterSubs
// eventChan carries events to runFilterLoop.
eventChan chan ( FilterEvent )
// isFilterSubAlive checks whether a subscription is still alive; a nil
// error means alive.
isFilterSubAlive func ( sub * subscription . SubscriptionDetails ) error
// getFilter looks up a filter by ID; subscribeToFilter treats nil as
// "filter not found".
getFilter func ( string ) * common . Filter
// onNewEnvelopes is called for every envelope received on a subscription.
onNewEnvelopes func ( env * protocol . Envelope ) error
// peers is the list of candidate filter peers, refreshed on every tick of
// runFilterLoop.
peers [ ] peer . ID
logger * zap . Logger
// settings supplies MinPeersForFilter, the subscription count resubscribe
// tries to reach.
settings settings
// node provides the filter light node and the libp2p host/peerstore.
node * node . WakuNode
}
// newFilterManager builds a FilterManager with an empty subscription map,
// an empty peer list and an event channel buffered for 100 events. The
// returned manager does nothing until runFilterLoop is started.
func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(string) *common.Filter, settings settings, onNewEnvelopes func(env *protocol.Envelope) error, node *node.WakuNode) *FilterManager {
	mgr := &FilterManager{
		ctx:            ctx,
		logger:         logger,
		getFilter:      getFilterFn,
		onNewEnvelopes: onNewEnvelopes,
		filterSubs:     make(FilterSubs),
		eventChan:      make(chan FilterEvent, 100),
		peers:          make([]peer.ID, 0),
		settings:       settings,
		node:           node,
	}

	// This fn is being mocked in test
	// (presumably this refers to isFilterSubAlive, the only replaceable
	// function set up here; confirm against the tests).
	mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
		// Each aliveness check gets its own deadline derived from the
		// manager context.
		pingCtx, cancelPing := context.WithTimeout(ctx, pingTimeout)
		defer cancelPing()
		return mgr.node.FilterLightnode().IsSubscriptionAlive(pingCtx, sub)
	}

	return mgr
}
// runFilterLoop is the manager's event loop. It runs until mgr.ctx is done
// and calls wg.Done on exit. On every tick it refreshes the candidate peer
// list and pings the peers currently subscribed to; every event received on
// eventChan is handed to processEvents.
func (mgr *FilterManager) runFilterLoop(wg *sync.WaitGroup) {
	defer wg.Done()

	// Interval between peer list refreshes / aliveness checks.
	const pingInterval = 5 * time.Second
	pingTicker := time.NewTicker(pingInterval)
	defer pingTicker.Stop()

	// Initial population of the ordered list of peers to select from.
	mgr.peers = mgr.findFilterPeers()

	for {
		select {
		case <-mgr.ctx.Done():
			return
		case event := <-mgr.eventChan:
			mgr.processEvents(&event)
		case <-pingTicker.C:
			mgr.peers = mgr.findFilterPeers()
			mgr.pingPeers()
		}
	}
}
// processEvents applies one event to the manager state. It is called from
// runFilterLoop; network calls it triggers are started in goroutines, which
// report back through eventChan.
func (mgr *FilterManager) processEvents(ev *FilterEvent) {
	switch ev.eventType {
	case FilterEventAdded:
		// New filter: start from an empty set and subscribe to peers.
		mgr.filterSubs[ev.filterID] = make(subscription.SubscriptionSet)
		mgr.resubscribe(ev.filterID)

	case FilterEventRemoved:
		for _, s := range mgr.filterSubs[ev.filterID] {
			// nil entries are temp subs with nothing to unsubscribe.
			if s != nil {
				go mgr.unsubscribeFromFilter(ev.filterID, s)
			}
		}
		delete(mgr.filterSubs, ev.filterID)

	case FilterEventPingResult:
		if ev.success {
			return
		}

		// filterID is only set when there were no subs to check for this
		// filter, so there is no particular peer that could be unreachable.
		if ev.filterID != "" {
			// Filter has no peers: trigger a full resubscribe.
			mgr.logger.Debug("filter has no subs", zap.String("filterId", ev.filterID))
			mgr.resubscribe(ev.filterID)
			return
		}

		// Drop the unreachable peer from the candidate list.
		for idx := range mgr.peers {
			if mgr.peers[idx] == ev.peerID {
				mgr.peers = append(mgr.peers[:idx], mgr.peers[idx+1:]...)
				break
			}
		}

		// Drop every sub held with that peer, then top each filter up again.
		for id, set := range mgr.filterSubs {
			for _, s := range set {
				// Skip temp subs and subs that belong to other peers.
				if s == nil || s.PeerID != ev.peerID {
					continue
				}
				mgr.logger.Debug("filter sub is inactive", zap.String("filterId", id), zap.String("subID", s.ID))
				delete(set, s.ID)
				go mgr.unsubscribeFromFilter(id, s)
			}
			mgr.resubscribe(id)
		}

	case FilterEventSubscribeResult:
		set, installed := mgr.filterSubs[ev.filterID]
		switch {
		case ev.success && installed:
			set[ev.sub.ID] = ev.sub
			go mgr.runFilterSubscriptionLoop(ev.sub)
		case ev.success:
			// We subscribed to a filter that is already uninstalled;
			// invoke unsubscribe.
			go mgr.unsubscribeFromFilter(ev.filterID, ev.sub)
		}
		if installed {
			// The temp subscription record has served its purpose.
			delete(set, ev.tempID)
		}

	case FilterEventUnsubscribeResult:
		mgr.logger.Debug("filter event unsubscribe_result", zap.String("filterId", ev.filterID), zap.Stringer("peerID", ev.sub.PeerID))

	case FilterEventGetStats:
		// Reply with a copy that leaves out the temp subs.
		snapshot := make(FilterSubs)
		for id, set := range mgr.filterSubs {
			established := make(subscription.SubscriptionSet)
			for subID, s := range set {
				if s != nil {
					established[subID] = s
				}
			}
			snapshot[id] = established
		}
		ev.ch <- snapshot
	}
}
// subscribeToFilter subscribes the filter identified by filterID to the
// given peer and reports the outcome as a FilterEventSubscribeResult on
// eventChan. tempID is echoed back in the event so that the placeholder
// entry created by resubscribe can be removed.
//
// The call is bounded by requestTimeout. It is started as a goroutine by
// resubscribe and never touches manager state directly.
func (mgr *FilterManager) subscribeToFilter(filterID string, peer peer.ID, tempID string) {
	f := mgr.getFilter(filterID)
	if f == nil {
		mgr.logger.Error("filter subscribeToFilter: No filter found", zap.String("id", filterID))
		mgr.eventChan <- FilterEvent{eventType: FilterEventSubscribeResult, filterID: filterID, tempID: tempID, success: false}
		return
	}

	contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
	mgr.logger.Debug("filter subscribe to filter node", zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))

	ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout)
	defer cancel()

	subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithPeer(peer))
	if err == nil && len(subDetails) == 0 {
		// Without this check subDetails[0] below would panic in this
		// goroutine; report an empty result as a failed subscribe instead.
		err = errors.New("filter subscribe returned no subscription details")
	}
	if err != nil {
		mgr.logger.Warn("filter could not add wakuv2 filter for peer", zap.String("filterId", filterID), zap.Stringer("peer", peer), zap.Error(err))
		mgr.eventChan <- FilterEvent{eventType: FilterEventSubscribeResult, filterID: filterID, tempID: tempID, success: false}
		return
	}

	mgr.logger.Debug("filter subscription success", zap.String("filterId", filterID), zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
	mgr.eventChan <- FilterEvent{eventType: FilterEventSubscribeResult, filterID: filterID, tempID: tempID, sub: subDetails[0], success: true}
}
// unsubscribeFromFilter cancels sub on the filter light node and reports the
// outcome as a FilterEventUnsubscribeResult on eventChan. The call is
// bounded by requestTimeout; a failure is logged and reported with
// success set to false.
func (mgr *FilterManager) unsubscribeFromFilter(filterID string, sub *subscription.SubscriptionDetails) {
	mgr.logger.Debug("filter unsubscribe from filter node", zap.String("filterId", filterID), zap.String("subId", sub.ID), zap.Stringer("peer", sub.PeerID))

	reqCtx, cancelReq := context.WithTimeout(mgr.ctx, requestTimeout)
	defer cancelReq()

	// Assume success and downgrade if the light node reports an error.
	result := FilterEvent{eventType: FilterEventUnsubscribeResult, filterID: filterID, success: true, sub: sub}
	if _, err := mgr.node.FilterLightnode().UnsubscribeWithSubscription(reqCtx, sub); err != nil {
		mgr.logger.Warn("could not unsubscribe wakuv2 filter for peer", zap.String("filterId", filterID), zap.String("subId", sub.ID), zap.Error(err))
		result.success = false
	}
	mgr.eventChan <- result
}
// pingPeers checks whether each of the installed filters has alive
// subscriptions to peers.
//
// A filter without any entry in its subscription set is reported as a
// failed ping carrying its filterID, which makes processEvents trigger a
// full resubscribe. Otherwise every distinct peer that holds at least one
// subscription is pinged once, and the result is reported with its peerID.
//
// All results are sent from goroutines: pingPeers is called from
// runFilterLoop, which is also the reader of eventChan, so a direct send
// could block the loop on a full channel.
func (mgr *FilterManager) pingPeers() {
	mgr.logger.Debug("filter pingPeers")

	distinctPeers := make(map[peer.ID]struct{})
	for filterID, subs := range mgr.filterSubs {
		if len(subs) == 0 {
			// No subs found, trigger full resubscribe
			mgr.logger.Debug("filter ping peer no subs", zap.String("filterId", filterID))
			// filterID is passed as an argument rather than captured: before
			// Go 1.22 the range variable is shared by all iterations, so a
			// capturing goroutine could report a different filter's ID.
			go func(filterID string) {
				mgr.eventChan <- FilterEvent{eventType: FilterEventPingResult, filterID: filterID, success: false}
			}(filterID)
			continue
		}

		for _, sub := range subs {
			if sub == nil {
				// Skip temp subs
				continue
			}
			if _, found := distinctPeers[sub.PeerID]; found {
				// This peer is already being pinged in this round.
				continue
			}
			distinctPeers[sub.PeerID] = struct{}{}

			mgr.logger.Debug("filter ping peer", zap.Stringer("peerId", sub.PeerID))
			go func(sub *subscription.SubscriptionDetails) {
				err := mgr.isFilterSubAlive(sub)
				alive := err == nil
				if alive {
					mgr.logger.Debug("filter aliveness check succeeded", zap.Stringer("peerId", sub.PeerID))
				} else {
					mgr.logger.Debug("filter aliveness check failed", zap.Stringer("peerId", sub.PeerID), zap.Error(err))
				}
				mgr.eventChan <- FilterEvent{eventType: FilterEventPingResult, peerID: sub.PeerID, success: alive}
			}(sub)
		}
	}
}
// buildContentFilter converts a pubsub topic plus a set of content topics
// into the protocol.ContentFilter passed to the filter light node.
func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet common.TopicSet) protocol.ContentFilter {
	keys := maps.Keys(contentTopicSet)

	// Turn every topic key into its content topic string.
	topicNames := make([]string, 0, len(keys))
	for _, key := range keys {
		topicNames = append(topicNames, key.ContentTopic())
	}

	var contentFilter protocol.ContentFilter
	contentFilter.PubsubTopic = pubsubTopic
	contentFilter.ContentTopics = protocol.NewContentTopicSet(topicNames...)
	return contentFilter
}
// findFilterPeers returns the peers from the host's peerstore that support
// both the filter-subscribe and the relay protocol. Peers whose protocol
// lookup fails are skipped. The result is never nil.
func (mgr *FilterManager) findFilterPeers() []peer.ID {
	candidates := make([]peer.ID, 0)

	for _, id := range mgr.node.Host().Peerstore().Peers() {
		supported, err := mgr.node.Host().Peerstore().SupportsProtocols(id, filter.FilterSubscribeID_v20beta1, relay.WakuRelayID_v200)
		switch {
		case err != nil:
			mgr.logger.Debug("SupportsProtocols error", zap.Error(err))
		case len(supported) == 2:
			// Both requested protocols are supported.
			candidates = append(candidates, id)
		}
	}

	mgr.logger.Debug("Filtered peers", zap.Int("cnt", len(candidates)))
	return candidates
}
// findPeerCandidate picks a random peer from mgr.peers.
//
// It returns an error when the peer list is empty or when the random
// source fails; in both cases the returned peer ID is empty.
func (mgr *FilterManager) findPeerCandidate() (peer.ID, error) {
	if len(mgr.peers) == 0 {
		return "", errors.New("filter could not select a suitable peer")
	}

	n, err := rand.Int(rand.Reader, big.NewInt(int64(len(mgr.peers))))
	if err != nil {
		// n is nil when rand.Int fails; using it would panic.
		return "", err
	}
	return mgr.peers[n.Int64()], nil
}
// resubscribe tops up the subscriptions of filterID until the set holds
// settings.MinPeersForFilter entries. For every missing entry it picks a
// random candidate peer, records a nil placeholder under a fresh UUID and
// starts subscribeToFilter in a goroutine. Placeholders count towards the
// total, so pending requests are not issued again.
func (mgr *FilterManager) resubscribe(filterID string) {
	subs, installed := mgr.filterSubs[filterID]
	if !installed {
		mgr.logger.Error("resubscribe filter not found", zap.String("filterId", filterID))
		return
	}

	mgr.logger.Debug("filter active subscriptions count:", zap.String("filterId", filterID), zap.Int("len", len(subs)))

	for count := len(subs); count < mgr.settings.MinPeersForFilter; count++ {
		mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID))

		candidate, err := mgr.findPeerCandidate()
		if err != nil {
			mgr.logger.Error("filter resubscribe findPeer error", zap.Error(err))
			continue
		}

		// Create sub placeholder in order to avoid potentially too many subs
		placeholderID := uuid.NewString()
		subs[placeholderID] = nil
		go mgr.subscribeToFilter(filterID, candidate, placeholderID)
	}
}
// runFilterSubscriptionLoop forwards every envelope received on sub.C to
// onNewEnvelopes. It returns when mgr.ctx is done or when sub.C is closed;
// handler errors are logged and do not stop the loop.
func (mgr *FilterManager) runFilterSubscriptionLoop(sub *subscription.SubscriptionDetails) {
	for {
		select {
		case <-mgr.ctx.Done():
			return
		case envelope, open := <-sub.C:
			if !open {
				mgr.logger.Debug("filter sub is closed", zap.String("id", sub.ID))
				return
			}
			if err := mgr.onNewEnvelopes(envelope); err != nil {
				mgr.logger.Error("OnNewEnvelopes error", zap.Error(err))
			}
		}
	}
}