Update to FilterV2 (#3392)

* Update to FilterV2

* Use a separate goroutine for each filter subscription

* Add multiple filter sub test

* Fix test (add Stop() call)
This commit is contained in:
Vitaliy Vlasov 2023-06-07 12:02:19 +03:00 committed by GitHub
parent d74d930b70
commit 4006cb78b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 252 additions and 51 deletions

View File

@ -30,6 +30,7 @@ type Config struct {
PeerExchange bool `toml:",omitempty"` PeerExchange bool `toml:",omitempty"`
KeepAliveInterval int `toml:",omitempty"` KeepAliveInterval int `toml:",omitempty"`
MinPeersForRelay int `toml:",omitempty"` MinPeersForRelay int `toml:",omitempty"`
MinPeersForFilter int `toml:",omitempty"`
LightClient bool `toml:",omitempty"` LightClient bool `toml:",omitempty"`
WakuNodes []string `toml:",omitempty"` WakuNodes []string `toml:",omitempty"`
Rendezvous bool `toml:",omitempty"` Rendezvous bool `toml:",omitempty"`
@ -52,6 +53,7 @@ var DefaultConfig = Config{
KeepAliveInterval: 10, // second KeepAliveInterval: 10, // second
DiscoveryLimit: 40, DiscoveryLimit: 40,
MinPeersForRelay: 1, // TODO: determine correct value with Vac team MinPeersForRelay: 1, // TODO: determine correct value with Vac team
MinPeersForFilter: 1, // TODO: determine correct value with Vac team and via testing
AutoUpdate: false, AutoUpdate: false,
} }
@ -80,5 +82,9 @@ func setDefaults(cfg *Config) *Config {
cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay cfg.MinPeersForRelay = DefaultConfig.MinPeersForRelay
} }
if cfg.MinPeersForFilter == 0 {
cfg.MinPeersForFilter = DefaultConfig.MinPeersForFilter
}
return cfg return cfg
} }

View File

@ -28,6 +28,7 @@ import (
"math" "math"
"net" "net"
"runtime" "runtime"
"sort"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -55,7 +56,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
@ -80,6 +81,7 @@ const bootnodesMaxRetries = 7
type settings struct { type settings struct {
LightClient bool // Indicates if the node is a light client LightClient bool // Indicates if the node is a light client
MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol MinPeersForRelay int // Indicates the minimum number of peers required for using Relay Protocol
MinPeersForFilter int // Indicates the minimum number of peers required for using Filter Protocol
MaxMsgSize uint32 // Maximal message length allowed by the waku node MaxMsgSize uint32 // Maximal message length allowed by the waku node
EnableConfirmations bool // Enable sending message confirmations EnableConfirmations bool // Enable sending message confirmations
PeerExchange bool // Enable peer exchange PeerExchange bool // Enable peer exchange
@ -99,8 +101,12 @@ type Waku struct {
dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery dnsAddressCache map[string][]dnsdisc.DiscoveredNode // Map to store the multiaddresses returned by dns discovery
dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map dnsAddressCacheLock *sync.RWMutex // lock to handle access to the map
filters *common.Filters // Message filters installed with Subscribe function // Filter-related
filterMsgChannel chan *protocol.Envelope // Channel for wakuv2 filter messages filters *common.Filters // Message filters installed with Subscribe function
filterSubscriptions map[*common.Filter]map[string]*filter.SubscriptionDetails // wakuv2 filter subscription details
filterPeerDisconnectMap map[peer.ID]int64
isFilterSubAlive func(sub *filter.SubscriptionDetails) error
privateKeys map[string]*ecdsa.PrivateKey // Private key storage privateKeys map[string]*ecdsa.PrivateKey // Private key storage
symKeys map[string][]byte // Symmetric key storage symKeys map[string][]byte // Symmetric key storage
@ -201,11 +207,17 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode), dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode),
dnsAddressCacheLock: &sync.RWMutex{}, dnsAddressCacheLock: &sync.RWMutex{},
storeMsgIDs: make(map[gethcommon.Hash]bool), storeMsgIDs: make(map[gethcommon.Hash]bool),
filterPeerDisconnectMap: make(map[peer.ID]int64),
filterSubscriptions: make(map[*common.Filter]map[string]*filter.SubscriptionDetails),
timesource: ts, timesource: ts,
storeMsgIDsMu: sync.RWMutex{}, storeMsgIDsMu: sync.RWMutex{},
logger: logger, logger: logger,
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,
} }
// This fn is being mocked in test
waku.isFilterSubAlive = func(sub *filter.SubscriptionDetails) error {
return waku.node.FilterLightnode().IsSubscriptionAlive(context.Background(), sub)
}
// Disabling light client mode if using status.prod or undefined // Disabling light client mode if using status.prod or undefined
if fleet == "status.prod" || fleet == "" { if fleet == "status.prod" || fleet == "" {
@ -213,13 +225,14 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
} }
waku.settings = settings{ waku.settings = settings{
MaxMsgSize: cfg.MaxMessageSize, MaxMsgSize: cfg.MaxMessageSize,
LightClient: cfg.LightClient, LightClient: cfg.LightClient,
MinPeersForRelay: cfg.MinPeersForRelay, MinPeersForRelay: cfg.MinPeersForRelay,
PeerExchange: cfg.PeerExchange, MinPeersForFilter: cfg.MinPeersForFilter,
DiscoveryLimit: cfg.DiscoveryLimit, PeerExchange: cfg.PeerExchange,
Nameserver: cfg.Nameserver, DiscoveryLimit: cfg.DiscoveryLimit,
EnableDiscV5: cfg.EnableDiscV5, Nameserver: cfg.Nameserver,
EnableDiscV5: cfg.EnableDiscV5,
} }
waku.filters = common.NewFilters() waku.filters = common.NewFilters()
@ -271,7 +284,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
} }
if cfg.LightClient { if cfg.LightClient {
opts = append(opts, node.WithLegacyWakuFilter(false)) opts = append(opts, node.WithWakuFilterLightNode())
} else { } else {
relayOpts := []pubsub.Option{ relayOpts := []pubsub.Option{
pubsub.WithMaxMessageSize(int(waku.settings.MaxMsgSize)), pubsub.WithMaxMessageSize(int(waku.settings.MaxMsgSize)),
@ -616,6 +629,24 @@ func (w *Waku) runRelayMsgLoop() {
} }
} }
func (w *Waku) runFilterSubscriptionLoop(sub *filter.SubscriptionDetails) {
for {
select {
case <-w.quit:
return
case env, ok := <-sub.C:
if ok {
envelopeErrors, err := w.OnNewEnvelopes(env, common.RelayedMessageType)
// TODO: should these be handled?
_ = envelopeErrors
_ = err
} else {
return
}
}
}
}
func (w *Waku) runFilterMsgLoop() { func (w *Waku) runFilterMsgLoop() {
defer w.wg.Done() defer w.wg.Done()
@ -623,41 +654,61 @@ func (w *Waku) runFilterMsgLoop() {
return return
} }
// Use it to ping filter peer(s) periodically
ticker := time.NewTicker(time.Duration(w.cfg.KeepAliveInterval) * time.Second)
defer ticker.Stop()
for { for {
select { select {
case <-w.quit: case <-w.quit:
return return
case env, ok := <-w.filterMsgChannel: case <-ticker.C:
if ok { for f, subMap := range w.filterSubscriptions {
envelopeErrors, err := w.OnNewEnvelopes(env, common.RelayedMessageType) if len(subMap) == 0 {
// TODO: should these be handled? // All peers have disconnected on previous iteration,
_ = envelopeErrors // attempt full reconnect
_ = err err := w.subscribeToFilter(f)
if err != nil {
w.logger.Error("Failed to subscribe to filter")
}
continue
}
for id, sub := range subMap {
err := w.isFilterSubAlive(sub)
if err != nil {
w.filterPeerDisconnectMap[sub.PeerID] = time.Now().Unix()
delete(subMap, id)
// TODO Is it necessary to unsubscribe?
// Re-subscribe
peers := w.findFilterPeers()
if len(peers) > 0 && len(subMap) < w.settings.MinPeersForFilter {
contentFilter := w.buildContentFilter(f.Topics)
subDetails, err := w.node.FilterLightnode().Subscribe(context.Background(), contentFilter, filter.WithPeer(peers[0]))
if err != nil {
w.logger.Warn("could not add wakuv2 filter for peer", zap.Any("peer", peers[0]))
}
subMap[subDetails.ID] = subDetails
go w.runFilterSubscriptionLoop(subDetails)
break
}
}
}
} }
} }
} }
} }
func (w *Waku) buildContentFilter(topics [][]byte) filter.ContentFilter {
func (w *Waku) subscribeWakuFilterTopic(topics [][]byte) { contentFilter := filter.ContentFilter{
var contentTopics []string Topic: relay.DefaultWakuTopic,
}
for _, topic := range topics { for _, topic := range topics {
contentTopics = append(contentTopics, common.BytesToTopic(topic).ContentTopic()) contentFilter.ContentTopics = append(contentFilter.ContentTopics, common.BytesToTopic(topic).ContentTopic())
} }
var err error return contentFilter
contentFilter := legacy_filter.ContentFilter{
Topic: relay.DefaultWakuTopic,
ContentTopics: contentTopics,
}
var wakuFilter legacy_filter.Filter
_, wakuFilter, err = w.node.LegacyFilter().Subscribe(context.Background(), contentFilter)
if err != nil {
w.logger.Warn("could not add wakuv2 filter for topics", zap.Any("topics", topics))
return
}
w.filterMsgChannel = wakuFilter.Chan
} }
// MaxMessageSize returns the maximum accepted message size. // MaxMessageSize returns the maximum accepted message size.
@ -945,35 +996,28 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) {
// Subscribe installs a new message handler used for filtering, decrypting // Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages. // and subsequent storing of incoming messages.
func (w *Waku) Subscribe(f *common.Filter) (string, error) { func (w *Waku) Subscribe(f *common.Filter) (string, error) {
s, err := w.filters.Install(f) s, err := w.filters.Install(f)
if err != nil { if err != nil {
return s, err return s, err
} }
if w.settings.LightClient { if w.settings.LightClient {
w.subscribeWakuFilterTopic(f.Topics) err = w.subscribeToFilter(f)
if err != nil {
return "", err
}
} }
return s, nil return s, nil
} }
// GetFilter returns the filter by id.
func (w *Waku) GetFilter(id string) *common.Filter {
return w.filters.Get(id)
}
// Unsubscribe removes an installed message handler. // Unsubscribe removes an installed message handler.
func (w *Waku) Unsubscribe(id string) error { func (w *Waku) Unsubscribe(id string) error {
f := w.filters.Get(id) f := w.filters.Get(id)
if f != nil && w.settings.LightClient { if f != nil && w.settings.LightClient {
contentFilter := legacy_filter.ContentFilter{ contentFilter := w.buildContentFilter(f.Topics)
Topic: relay.DefaultWakuTopic, if _, err := w.node.FilterLightnode().Unsubscribe(context.Background(), contentFilter); err != nil {
}
for _, topic := range f.Topics {
contentFilter.ContentTopics = append(contentFilter.ContentTopics, common.BytesToTopic(topic).ContentTopic())
}
if err := w.node.LegacyFilter().UnsubscribeFilter(context.Background(), contentFilter); err != nil {
return fmt.Errorf("failed to unsubscribe: %w", err) return fmt.Errorf("failed to unsubscribe: %w", err)
} }
} }
@ -985,6 +1029,11 @@ func (w *Waku) Unsubscribe(id string) error {
return nil return nil
} }
// GetFilter returns the filter by id.
func (w *Waku) GetFilter(id string) *common.Filter {
return w.filters.Get(id)
}
// Unsubscribe removes an installed message handler. // Unsubscribe removes an installed message handler.
func (w *Waku) UnsubscribeMany(ids []string) error { func (w *Waku) UnsubscribeMany(ids []string) error {
for _, id := range ids { for _, id := range ids {
@ -1110,7 +1159,6 @@ func (w *Waku) Start() error {
} }
w.quit = make(chan struct{}) w.quit = make(chan struct{})
w.filterMsgChannel = make(chan *protocol.Envelope, 1024)
w.connectionChanged = make(chan struct{}) w.connectionChanged = make(chan struct{})
ctx := context.Background() ctx := context.Background()
@ -1202,7 +1250,6 @@ func (w *Waku) Stop() error {
close(w.quit) close(w.quit)
w.identifyService.Close() w.identifyService.Close()
w.node.Stop() w.node.Stop()
close(w.filterMsgChannel)
close(w.connectionChanged) close(w.connectionChanged)
w.wg.Wait() w.wg.Wait()
return nil return nil
@ -1587,3 +1634,63 @@ func formatConnStatus(wakuNode *node.WakuNode, c node.ConnStatus) types.ConnStat
Peers: FormatPeerStats(wakuNode, c.Peers), Peers: FormatPeerStats(wakuNode, c.Peers),
} }
} }
// Find suitable peer(s). For this we use a peerDisconnectMap, it works so that
// peers that have been recently disconnected from have lower priority
func (w *Waku) findFilterPeers() []peer.ID {
allPeers := w.node.Host().Peerstore().Peers()
var peers peer.IDSlice
for _, peer := range allPeers {
protocols, err := w.node.Host().Peerstore().SupportsProtocols(peer, filter.FilterSubscribeID_v20beta1, relay.WakuRelayID_v200)
if err != nil {
continue
}
if len(protocols) == 2 {
peers = append(peers, peer)
}
}
if len(peers) > 0 {
sort.Slice(peers, func(i, j int) bool {
// If element not found in map, [] operator will return 0
return w.filterPeerDisconnectMap[peers[i]] < w.filterPeerDisconnectMap[peers[j]]
})
}
var peerLen = len(peers)
if w.settings.MinPeersForFilter < peerLen {
peerLen = w.settings.MinPeersForFilter
}
peers = peers[0:peerLen]
return peers
}
func (w *Waku) subscribeToFilter(f *common.Filter) error {
peers := w.findFilterPeers()
if len(peers) > 0 {
contentFilter := w.buildContentFilter(f.Topics)
for i := 0; i < len(peers) && i < w.settings.MinPeersForFilter; i++ {
subDetails, err := w.node.FilterLightnode().Subscribe(context.Background(), contentFilter, filter.WithPeer(peers[i]))
if err != nil {
w.logger.Warn("could not add wakuv2 filter for peer", zap.Any("peer", peers[i]))
continue
}
subMap := w.filterSubscriptions[f]
if subMap == nil {
subMap = make(map[string]*filter.SubscriptionDetails)
w.filterSubscriptions[f] = subMap
}
subMap[subDetails.ID] = subDetails
go w.runFilterSubscriptionLoop(subDetails)
}
} else {
return errors.New("could not select a suitable peer for filter")
}
return nil
}

View File

@ -12,6 +12,7 @@ import (
"github.com/cenkalti/backoff/v3" "github.com/cenkalti/backoff/v3"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/dnsdisc"
waku_filter "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/protocol/store"
@ -159,4 +160,91 @@ func TestBasicWakuV2(t *testing.T) {
storeResult, err := w.query(context.Background(), storeNode.PeerID, []common.TopicType{contentTopic}, uint64(timestampInSeconds-20), uint64(timestampInSeconds+20), []store.HistoryRequestOption{}) storeResult, err := w.query(context.Background(), storeNode.PeerID, []common.TopicType{contentTopic}, uint64(timestampInSeconds-20), uint64(timestampInSeconds+20), []store.HistoryRequestOption{})
require.NoError(t, err) require.NoError(t, err)
require.NotZero(t, len(storeResult.Messages)) require.NotZero(t, len(storeResult.Messages))
require.NoError(t, w.Stop())
}
func TestWakuV2Filter(t *testing.T) {
enrTreeAddress := testENRBootstrap
envEnrTreeAddress := os.Getenv("ENRTREE_ADDRESS")
if envEnrTreeAddress != "" {
enrTreeAddress = envEnrTreeAddress
}
config := &Config{}
config.Port = 0
config.LightClient = true
config.KeepAliveInterval = 1
config.MinPeersForFilter = 2
config.EnableDiscV5 = true
config.DiscV5BootstrapNodes = []string{enrTreeAddress}
config.DiscoveryLimit = 20
config.UDPPort = 9001
config.WakuNodes = []string{enrTreeAddress}
fleet := "status.test" // Need a name fleet so that LightClient is not set to false
w, err := New("", fleet, config, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, w.Start())
// DNSDiscovery
// Wait for some peers to be discovered
time.Sleep(10 * time.Second)
// At least 3 peers should have been discovered
require.Greater(t, w.PeerCount(), 3)
filter := &common.Filter{
Messages: common.NewMemoryMessageStore(),
Topics: [][]byte{
{1, 2, 3, 4},
},
}
_, err = w.Subscribe(filter)
require.NoError(t, err)
msgTimestamp := w.timestamp()
contentTopic := common.BytesToTopic(filter.Topics[0])
_, err = w.Send(&pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(),
Version: 0,
Timestamp: msgTimestamp,
})
require.NoError(t, err)
time.Sleep(1 * time.Second)
// Ensure there is 1 active filter subscription
require.Len(t, w.filterSubscriptions, 1)
subMap := w.filterSubscriptions[filter]
// Ensure there are some active peers for this filter subscription
require.Greater(t, len(subMap), 0)
messages := filter.Retrieve()
//require.Len(t, messages, 1)
require.Len(t, messages, 1)
// Mock peers going down
isFilterSubAliveBak := w.isFilterSubAlive
w.settings.MinPeersForFilter = 0
w.isFilterSubAlive = func(sub *waku_filter.SubscriptionDetails) error {
return errors.New("peer down")
}
time.Sleep(10 * time.Second)
// Ensure there are 0 active peers now
require.Len(t, subMap, 0)
// Reconnect
w.settings.MinPeersForFilter = 2
w.isFilterSubAlive = isFilterSubAliveBak
time.Sleep(10 * time.Second)
// Ensure there are some active peers now
require.Greater(t, len(subMap), 0)
require.NoError(t, w.Stop())
} }