diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 2a2dfe00..387c4492 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -364,26 +364,6 @@ var ( Destination: &options.Filter.Timeout, EnvVars: []string{"WAKUNODE2_FILTER_TIMEOUT"}, }) - FilterLegacyFlag = altsrc.NewBoolFlag(&cli.BoolFlag{ - Name: "legacy-filter", - Usage: "Use filter protocol (legacy)", - Destination: &options.Filter.UseV1, - EnvVars: []string{"WAKUNODE2_USE_LEGACY_FILTER"}, - }) - FilterLegacyLightClient = altsrc.NewBoolFlag(&cli.BoolFlag{ - Name: "legacy-filter-light-client", - Usage: "Don't accept legacy filter subscribers", - Destination: &options.Filter.DisableFullNode, - EnvVars: []string{"WAKUNODE2_LEGACY_FILTER_LIGHT_CLIENT"}, - }) - FilterLegacyNode = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ - Name: "legacy-filternode", - Usage: "Multiaddr of a peer that supports legacy filter protocol. Option may be repeated", - Value: &cliutils.MultiaddrSlice{ - Values: &options.Filter.NodesV1, - }, - EnvVars: []string{"WAKUNODE2_LEGACY_FILTERNODE"}, - }) LightPush = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "lightpush", Usage: "Enable lightpush protocol", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 4316aae5..cb97c027 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -70,9 +70,6 @@ func main() { FilterFlag, FilterNode, FilterTimeout, - FilterLegacyFlag, - FilterLegacyNode, - FilterLegacyLightClient, LightPush, LightPushNode, Discv5Discovery, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index f59bf571..4a5a352b 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -46,7 +46,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/node" wprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -253,10 +252,6 @@ func Execute(options NodeOptions) error { nodeOpts = append(nodeOpts, node.WithWakuFilterFullNode(filter.WithTimeout(options.Filter.Timeout))) } - if options.Filter.UseV1 { - nodeOpts = append(nodeOpts, node.WithLegacyWakuFilter(!options.Filter.DisableFullNode, legacy_filter.WithTimeout(options.Filter.Timeout))) - } - var dbStore *persistence.DBStore if requiresDB(options) { dbOptions := []persistence.DBOption{ @@ -331,12 +326,6 @@ func Execute(options NodeOptions) error { pubSubTopicMapKeys = append(pubSubTopicMapKeys, k) } - if options.Filter.UseV1 { - if err := addStaticPeers(wakuNode, options.Filter.NodesV1, pubSubTopicMapKeys, legacy_filter.FilterID_v20beta1); err != nil { - return err - } - } - if err = wakuNode.Start(ctx); err != nil { return nonRecoverError(err) } diff --git a/cmd/waku/options.go b/cmd/waku/options.go index dc824e5b..212fcf1c 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -53,10 +53,8 @@ type RLNRelayOptions struct { // restricted devices. type FilterOptions struct { Enable bool - UseV1 bool DisableFullNode bool Nodes []multiaddr.Multiaddr - NodesV1 []multiaddr.Multiaddr Timeout time.Duration } diff --git a/cmd/waku/server/utils.go b/cmd/waku/server/utils.go index f11cee5c..865d0900 100644 --- a/cmd/waku/server/utils.go +++ b/cmd/waku/server/utils.go @@ -6,15 +6,13 @@ import ( "github.com/libp2p/go-libp2p/core/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" ) func IsWakuProtocol(protocol protocol.ID) bool { - return protocol == legacy_filter.FilterID_v20beta1 || - protocol == filter.FilterPushID_v20beta1 || + return protocol == filter.FilterPushID_v20beta1 || protocol == filter.FilterSubscribeID_v20beta1 || protocol == relay.WakuRelayID_v200 || protocol == lightpush.LightPushID_v20beta1 || diff --git a/library/c/api_legacy_filter.go b/library/c/api_legacy_filter.go deleted file mode 100644 index 68b21429..00000000 --- a/library/c/api_legacy_filter.go +++ /dev/null @@ -1,64 +0,0 @@ -package main - -/* -#include -*/ -import "C" -import ( - "unsafe" - - "github.com/waku-org/go-waku/library" -) - -// Creates a subscription to a light node matching a content filter and, optionally, a pubSub topic. -// filterJSON must contain a JSON with this format: -// -// { -// "contentFilters": [ // mandatory -// { -// "contentTopic": "the content topic" -// }, ... -// ], -// "pubsubTopic": "the pubsub topic" // optional -// } -// -// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node -// If ms is greater than 0, the subscription must happen before the timeout -// (in milliseconds) is reached, or an error will be returned -// -//export waku_legacy_filter_subscribe -func waku_legacy_filter_subscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - instance, err := getInstance(ctx) - if err != nil { - onError(err, cb, userData) - } - - err = library.LegacyFilterSubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms)) - return onError(err, cb, userData) -} - -// Removes subscriptions in a light node matching a content filter and, optionally, a pubSub topic. -// filterJSON must contain a JSON with this format: -// -// { -// "contentFilters": [ // mandatory -// { -// "contentTopic": "the content topic" -// }, ... -// ], -// "pubsubTopic": "the pubsub topic" // optional -// } -// -// If ms is greater than 0, the subscription must happen before the timeout -// (in milliseconds) is reached, or an error will be returned -// -//export waku_legacy_filter_unsubscribe -func waku_legacy_filter_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - instance, err := getInstance(ctx) - if err != nil { - onError(err, cb, userData) - } - - err = library.LegacyFilterUnsubscribe(instance, C.GoString(filterJSON), int(ms)) - return onError(err, cb, userData) -} diff --git a/library/config.go b/library/config.go index 22bd2883..344d88a3 100644 --- a/library/config.go +++ b/library/config.go @@ -19,7 +19,6 @@ type WakuConfig struct { EnableRelay *bool `json:"relay"` RelayTopics []string `json:"relayTopics,omitempty"` GossipSubParams *GossipSubParams `json:"gossipsubParams,omitempty"` - EnableLegacyFilter *bool `json:"legacyFilter,omitempty"` MinPeersToPublish *int `json:"minPeersToPublish,omitempty"` DNSDiscoveryURLs []string `json:"dnsDiscoveryURLs,omitempty"` DNSDiscoveryNameServer string `json:"dnsDiscoveryNameServer,omitempty"` @@ -49,7 +48,6 @@ var defaultPort = 60000 var defaultKeepAliveInterval = 20 var defaultEnableRelay = true var defaultMinPeersToPublish = 0 -var defaultEnableLegacyFilter = false var defaultEnableDiscV5 = false var defaultDiscV5UDPPort = uint(9000) var defaultLogLevel = "INFO" @@ -227,10 +225,6 @@ func getConfig(configJSON string) (WakuConfig, error) { config.EnableRelay = &defaultEnableRelay } - if config.EnableLegacyFilter == nil { - config.EnableLegacyFilter = &defaultEnableLegacyFilter - } - if config.EnableDiscV5 == nil { config.EnableDiscV5 = &defaultEnableDiscV5 } diff --git a/library/legacy_filter.go b/library/legacy_filter.go deleted file mode 100644 index 41433c68..00000000 --- a/library/legacy_filter.go +++ /dev/null @@ -1,105 +0,0 @@ -package library - -import ( - "context" - "encoding/json" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" -) - -type legacyFilterArgument struct { - Topic string `json:"pubsubTopic,omitempty"` - ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"` -} - -func toLegacyContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) { - var f legacyFilterArgument - err := json.Unmarshal([]byte(filterJSON), &f) - if err != nil { - return legacy_filter.ContentFilter{}, err - } - - result := legacy_filter.ContentFilter{ - Topic: f.Topic, - } - for _, cf := range f.ContentFilters { - result.ContentTopics = append(result.ContentTopics, cf.ContentTopic) - } - - return result, err -} - -// LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages -// Deprecated: Use FilterSubscribe instead -func LegacyFilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error { - cf, err := toLegacyContentFilter(filterJSON) - if err != nil { - return err - } - - if err := validateInstance(instance, MustBeStarted); err != nil { - return err - } - - var ctx context.Context - var cancel context.CancelFunc - - if ms > 0 { - ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) - defer cancel() - } else { - ctx = instance.ctx - } - - var fOptions []legacy_filter.FilterSubscribeOption - if peerID != "" { - p, err := peer.Decode(peerID) - if err != nil { - return err - } - fOptions = append(fOptions, legacy_filter.WithPeer(p)) - } else { - fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection()) - } - - _, f, err := instance.node.LegacyFilter().Subscribe(ctx, cf, fOptions...) - if err != nil { - return err - } - - go func(f legacy_filter.Filter) { - for envelope := range f.Chan { - send(instance, "message", toSubscriptionMessage(envelope)) - } - }(f) - - return nil -} - -// LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node -// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead -func LegacyFilterUnsubscribe(instance *WakuInstance, filterJSON string, ms int) error { - cf, err := toLegacyContentFilter(filterJSON) - if err != nil { - return err - } - - if err := validateInstance(instance, MustBeStarted); err != nil { - return err - } - - var ctx context.Context - var cancel context.CancelFunc - - if ms > 0 { - ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) - defer cancel() - } else { - ctx = instance.ctx - } - - return instance.node.LegacyFilter().UnsubscribeFilter(ctx, cf) -} diff --git a/library/mobile/api_legacy_filter.go b/library/mobile/api_legacy_filter.go deleted file mode 100644 index 43382e95..00000000 --- a/library/mobile/api_legacy_filter.go +++ /dev/null @@ -1,29 +0,0 @@ -package gowaku - -import ( - "github.com/waku-org/go-waku/library" -) - -// LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages -// Deprecated: Use FilterSubscribe instead -func LegacyFilterSubscribe(instanceID uint, filterJSON string, peerID string, ms int) string { - instance, err := library.GetInstance(instanceID) - if err != nil { - return makeJSONResponse(err) - } - - err = library.LegacyFilterSubscribe(instance, filterJSON, peerID, ms) - return makeJSONResponse(err) -} - -// LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node -// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead -func LegacyFilterUnsubscribe(instanceID uint, filterJSON string, ms int) string { - instance, err := library.GetInstance(instanceID) - if err != nil { - return makeJSONResponse(err) - } - - err = library.LegacyFilterUnsubscribe(instance, filterJSON, ms) - return makeJSONResponse(err) -} diff --git a/library/node.go b/library/node.go index 34c06931..d8b78798 100644 --- a/library/node.go +++ b/library/node.go @@ -195,10 +195,6 @@ func NewNode(instance *WakuInstance, configJSON string) error { } } - if *config.EnableLegacyFilter { - opts = append(opts, node.WithLegacyWakuFilter(false)) - } - opts = append(opts, node.WithWakuFilterLightNode()) if *config.EnableStore { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 2c89eb9e..dda7c092 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -36,7 +36,6 @@ import ( wakuprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/enr" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/metadata" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -99,7 +98,6 @@ type WakuNode struct { peerExchange Service rendezvous Service metadata Service - legacyFilter ReceptorService filterFullNode ReceptorService filterLightNode Service store ReceptorService @@ -194,7 +192,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.log = params.logger.Named("node2") w.wg = &sync.WaitGroup{} w.keepAliveFails = make(map[peer.ID]int) - w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode || w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay) + w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay) w.circuitRelayNodes = make(chan peer.AddrInfo) w.metrics = newMetrics(params.prometheusReg) @@ -286,10 +284,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } } - w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager)) w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager)) - w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...) w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...) w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log) w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log) @@ -443,16 +439,6 @@ func (w *WakuNode) Start(ctx context.Context) error { } } - w.legacyFilter.SetHost(host) - if w.opts.enableLegacyFilter { - sub := w.bcaster.RegisterForAll() - err := w.legacyFilter.Start(ctx, sub) - if err != nil { - return err - } - w.log.Info("Subscribing filter to broadcaster") - } - w.filterFullNode.SetHost(host) if w.opts.enableFilterFullNode { sub := w.bcaster.RegisterForAll() @@ -512,7 +498,6 @@ func (w *WakuNode) Stop() { w.relay.Stop() w.lightPush.Stop() w.store.Stop() - w.legacyFilter.Stop() w.filterFullNode.Stop() w.filterLightNode.Stop() @@ -602,14 +587,6 @@ func (w *WakuNode) Store() store.Store { return w.store.(store.Store) } -// LegacyFilter is used to access any operation related to Waku LegacyFilter protocol -func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter { - if result, ok := w.legacyFilter.(*legacy_filter.WakuFilter); ok { - return result - } - return nil -} - // FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature func (w *WakuNode) FilterFullNode() *filter.WakuFilterFullNode { if result, ok := w.filterFullNode.(*filter.WakuFilterFullNode); ok { diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index d8c78b0c..ba22108f 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -19,7 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/store" @@ -230,7 +230,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { wakuNode1, err := New( WithHostAddress(hostAddr1), WithWakuRelay(), - WithLegacyWakuFilter(true), + WithWakuFilterFullNode(), ) require.NoError(t, err) err = wakuNode1.Start(ctx) @@ -251,7 +251,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) wakuNode2, err := New( WithHostAddress(hostAddr2), - WithLegacyWakuFilter(false), + WithWakuFilterLightNode(), WithWakuStore(), WithMessageProvider(dbStore), ) @@ -260,15 +260,13 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) defer wakuNode2.Stop() - err = wakuNode2.DialPeerWithMultiAddress(ctx, wakuNode1.ListenAddresses()[0]) + peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1) require.NoError(t, err) - time.Sleep(2 * time.Second) - - _, filter, err := wakuNode2.LegacyFilter().Subscribe(ctx, legacy_filter.ContentFilter{ - Topic: string(relay.DefaultWakuTopic), - ContentTopics: []string{"abc"}, - }, legacy_filter.WithPeer(wakuNode1.host.ID())) + subscription, err := wakuNode2.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{ + PubsubTopic: relay.DefaultWakuTopic, + ContentTopics: protocol.NewContentTopicSet("abc"), + }, filter.WithPeer(peerID)) require.NoError(t, err) // Sleep to make sure the filter is subscribed @@ -284,7 +282,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { go func() { // MSG1 should be pushed in NODE2 via filter defer wg.Done() - env, ok := <-filter.Chan + env, ok := <-subscription[0].C if !ok { require.Fail(t, "no message") } @@ -304,7 +302,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { require.NoError(t, err) wakuNode3, err := New( WithHostAddress(hostAddr3), - WithLegacyWakuFilter(false), + WithWakuFilterLightNode(), ) require.NoError(t, err) err = wakuNode3.Start(ctx) diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 50970a1c..9372d456 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -28,7 +28,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/rendezvous" @@ -74,14 +73,11 @@ type WakuNodeParameters struct { logger *zap.Logger logLevel logging.LogLevel - enableRelay bool - enableLegacyFilter bool - isLegacyFilterFullNode bool - enableFilterLightNode bool - enableFilterFullNode bool - legacyFilterOpts []legacy_filter.Option - filterOpts []filter.Option - pubsubOpts []pubsub.Option + enableRelay bool + enableFilterLightNode bool + enableFilterFullNode bool + filterOpts []filter.Option + pubsubOpts []pubsub.Option minRelayPeersToPublish int maxMsgSizeBytes int @@ -413,17 +409,6 @@ func WithPeerExchange() WakuNodeOption { } } -// WithLegacyWakuFilter enables the legacy Waku Filter protocol. This WakuNodeOption -// accepts a list of WakuFilter gossipsub options to setup the protocol -func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption { - return func(params *WakuNodeParameters) error { - params.enableLegacyFilter = true - params.isLegacyFilterFullNode = fullnode - params.legacyFilterOpts = filterOpts - return nil - } -} - // WithWakuFilter enables the Waku Filter V2 protocol for lightnode functionality func WithWakuFilterLightNode() WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index addd80a5..eb4793d6 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -54,7 +54,6 @@ func TestWakuOptions(t *testing.T) { WithPrivateKey(prvKey), WithLibP2POptions(), WithWakuRelay(), - WithLegacyWakuFilter(true), WithDiscoveryV5(123, nil, false), WithWakuStore(), WithMessageProvider(&persistence.DBStore{}), @@ -104,7 +103,6 @@ func TestWakuRLNOptions(t *testing.T) { WithPrivateKey(prvKey), WithLibP2POptions(), WithWakuRelay(), - WithLegacyWakuFilter(true), WithDiscoveryV5(123, nil, false), WithWakuStore(), WithMessageProvider(&persistence.DBStore{}), @@ -145,7 +143,6 @@ func TestWakuRLNOptions(t *testing.T) { WithPrivateKey(prvKey), WithLibP2POptions(), WithWakuRelay(), - WithLegacyWakuFilter(true), WithDiscoveryV5(123, nil, false), WithWakuStore(), WithMessageProvider(&persistence.DBStore{}), diff --git a/waku/v2/protocol/legacy_filter/filter_map.go b/waku/v2/protocol/legacy_filter/filter_map.go deleted file mode 100644 index f71e35a6..00000000 --- a/waku/v2/protocol/legacy_filter/filter_map.go +++ /dev/null @@ -1,115 +0,0 @@ -package legacy_filter - -import ( - "sync" - - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/timesource" -) - -type FilterMap struct { - sync.RWMutex - timesource timesource.Timesource - items map[string]Filter - broadcaster relay.Broadcaster -} - -type FilterMapItem struct { - Key string - Value Filter -} - -func NewFilterMap(broadcaster relay.Broadcaster, timesource timesource.Timesource) *FilterMap { - return &FilterMap{ - timesource: timesource, - items: make(map[string]Filter), - broadcaster: broadcaster, - } -} - -func (fm *FilterMap) Set(key string, value Filter) { - fm.Lock() - defer fm.Unlock() - - fm.items[key] = value -} - -func (fm *FilterMap) Get(key string) (Filter, bool) { - fm.Lock() - defer fm.Unlock() - - value, ok := fm.items[key] - - return value, ok -} - -func (fm *FilterMap) Delete(key string) { - fm.Lock() - defer fm.Unlock() - - _, ok := fm.items[key] - if !ok { - return - } - - close(fm.items[key].Chan) - delete(fm.items, key) -} - -func (fm *FilterMap) RemoveAll() { - fm.Lock() - defer fm.Unlock() - - for k, v := range fm.items { - close(v.Chan) - delete(fm.items, k) - } -} - -func (fm *FilterMap) Items() <-chan FilterMapItem { - c := make(chan FilterMapItem) - - f := func() { - fm.RLock() - defer fm.RUnlock() - - for k, v := range fm.items { - c <- FilterMapItem{k, v} - } - close(c) - } - go f() - - return c -} - -// Notify is used to push a received message from a filter subscription to -// any content filter registered on this node and to the broadcast subscribers -func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestID string) { - fm.RLock() - defer fm.RUnlock() - - filter, ok := fm.items[requestID] - if !ok { - // We do this because the key for the filter is set to the requestID received from the filter protocol. - // This means we do not need to check the content filter explicitly as all MessagePushs already contain - // the requestID of the coresponding filter. - return - } - - envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic) - - // Broadcasting message so it's stored - fm.broadcaster.Submit(envelope) - - // TODO: In case of no topics we should either trigger here for all messages, - // or we should not allow such filter to exist in the first place. - for _, contentTopic := range filter.ContentFilters { - if msg.ContentTopic == contentTopic { - filter.Chan <- envelope - break - } - } -} diff --git a/waku/v2/protocol/legacy_filter/filter_map_test.go b/waku/v2/protocol/legacy_filter/filter_map_test.go deleted file mode 100644 index 1a38c8f7..00000000 --- a/waku/v2/protocol/legacy_filter/filter_map_test.go +++ /dev/null @@ -1,37 +0,0 @@ -package legacy_filter - -import ( - "context" - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/timesource" -) - -func TestFilterMap(t *testing.T) { - b := relay.NewBroadcaster(100) - require.NoError(t, b.Start(context.Background())) - fmap := NewFilterMap(b, timesource.NewDefaultClock()) - - filter := Filter{ - PeerID: "id", - Topic: "test", - ContentFilters: []string{"test"}, - Chan: make(chan *protocol.Envelope), - } - - fmap.Set("test", filter) - res := <-fmap.Items() - require.Equal(t, "test", res.Key) - - item, ok := fmap.Get("test") - require.True(t, ok) - require.Equal(t, "test", item.Topic) - - fmap.Delete("test") - - _, ok = fmap.Get("test") - require.False(t, ok) -} diff --git a/waku/v2/protocol/legacy_filter/filter_subscribers.go b/waku/v2/protocol/legacy_filter/filter_subscribers.go deleted file mode 100644 index d3bd6862..00000000 --- a/waku/v2/protocol/legacy_filter/filter_subscribers.go +++ /dev/null @@ -1,167 +0,0 @@ -package legacy_filter - -import ( - "sync" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" -) - -type Subscriber struct { - peer peer.ID - requestID string - filter *pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? -} - -func (sub Subscriber) HasContentTopic(topic string) bool { - if len(sub.filter.ContentFilters) == 0 { - return true // When the subscriber has no specific ContentTopic filter - } - - for _, filter := range sub.filter.ContentFilters { - if filter.ContentTopic == topic { - return true - } - } - return false -} - -type Subscribers struct { - sync.RWMutex - subscribers []Subscriber - timeout time.Duration - failedPeers map[peer.ID]time.Time -} - -func NewSubscribers(timeout time.Duration) *Subscribers { - return &Subscribers{ - timeout: timeout, - failedPeers: make(map[peer.ID]time.Time), - } -} - -func (sub *Subscribers) Clear() { - sub.Lock() - defer sub.Unlock() - - sub.subscribers = nil - sub.failedPeers = make(map[peer.ID]time.Time) -} - -func (sub *Subscribers) Append(s Subscriber) int { - sub.Lock() - defer sub.Unlock() - - sub.subscribers = append(sub.subscribers, s) - return len(sub.subscribers) -} - -func (sub *Subscribers) Items(contentTopic *string) <-chan Subscriber { - c := make(chan Subscriber) - - f := func() { - sub.RLock() - defer sub.RUnlock() - for _, s := range sub.subscribers { - if contentTopic == nil || s.HasContentTopic(*contentTopic) { - c <- s - } - } - close(c) - } - go f() - - return c -} - -func (sub *Subscribers) Length() int { - sub.RLock() - defer sub.RUnlock() - - return len(sub.subscribers) -} - -func (sub *Subscribers) IsFailedPeer(peerID peer.ID) bool { - sub.RLock() - defer sub.RUnlock() - _, ok := sub.failedPeers[peerID] - return ok -} - -func (sub *Subscribers) FlagAsSuccess(peerID peer.ID) { - sub.Lock() - defer sub.Unlock() - - _, ok := sub.failedPeers[peerID] - if ok { - delete(sub.failedPeers, peerID) - } -} - -func (sub *Subscribers) FlagAsFailure(peerID peer.ID) { - sub.Lock() - defer sub.Unlock() - - lastFailure, ok := sub.failedPeers[peerID] - if ok { - elapsedTime := time.Since(lastFailure) - if elapsedTime > sub.timeout { - var tmpSubs []Subscriber - for _, s := range sub.subscribers { - if s.peer != peerID { - tmpSubs = append(tmpSubs, s) - } - } - sub.subscribers = tmpSubs - - delete(sub.failedPeers, peerID) - } - } else { - sub.failedPeers[peerID] = time.Now() - } -} - -// RemoveContentFilters removes a set of content filters registered for an specific peer -func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, requestID string, contentFilters []*pb.FilterRequest_ContentFilter) { - sub.Lock() - defer sub.Unlock() - - var peerIdsToRemove []peer.ID - - for subIndex, subscriber := range sub.subscribers { - if subscriber.peer != peerID || subscriber.requestID != requestID { - continue - } - - // make sure we delete the content filter - // if no more topics are left - for _, contentFilter := range contentFilters { - subCfs := subscriber.filter.ContentFilters - for i, cf := range subCfs { - if cf.ContentTopic == contentFilter.ContentTopic { - l := len(subCfs) - 1 - subCfs[i] = subCfs[l] - subscriber.filter.ContentFilters = subCfs[:l] - } - } - sub.subscribers[subIndex] = subscriber - } - - if len(subscriber.filter.ContentFilters) == 0 { - peerIdsToRemove = append(peerIdsToRemove, subscriber.peer) - } - } - - // make sure we delete the subscriber - // if no more content filters left - for _, peerID := range peerIdsToRemove { - for i, s := range sub.subscribers { - if s.peer == peerID && s.requestID == requestID { - l := len(sub.subscribers) - 1 - sub.subscribers[i] = sub.subscribers[l] - sub.subscribers = sub.subscribers[:l] - } - } - } -} diff --git a/waku/v2/protocol/legacy_filter/filter_subscribers_test.go b/waku/v2/protocol/legacy_filter/filter_subscribers_test.go deleted file mode 100644 index cd8cf609..00000000 --- a/waku/v2/protocol/legacy_filter/filter_subscribers_test.go +++ /dev/null @@ -1,145 +0,0 @@ -package legacy_filter - -import ( - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/test" - "github.com/stretchr/testify/assert" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" -) - -const TOPIC = "/test/topic" - -func createPeerID(t *testing.T) peer.ID { - peerID, err := test.RandPeerID() - assert.NoError(t, err) - return peerID -} - -func firstSubscriber(subs *Subscribers, contentTopic string) *Subscriber { - for sub := range subs.Items(&contentTopic) { - return &sub - } - return nil -} - -func TestAppend(t *testing.T) { - subs := NewSubscribers(10 * time.Second) - peerID := createPeerID(t) - requestID := "request_1" - contentTopic := "topic1" - request := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, - } - subs.Append(Subscriber{peerID, requestID, request}) - - sub := firstSubscriber(subs, contentTopic) - assert.NotNil(t, sub) -} - -func TestRemove(t *testing.T) { - subs := NewSubscribers(10 * time.Second) - peerId := createPeerID(t) - requestID := "request_1" - contentTopic := "topic1" - request := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, - } - subs.Append(Subscriber{peerId, requestID, request}) - subs.RemoveContentFilters(peerId, requestID, request.ContentFilters) - - sub := firstSubscriber(subs, contentTopic) - assert.Nil(t, sub) -} - -func TestRemovePartial(t *testing.T) { - subs := NewSubscribers(10 * time.Second) - peerId := createPeerID(t) - requestID := "request_1" - topic1 := "topic1" - topic2 := "topic2" - request := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic1}, {ContentTopic: topic2}}, - } - subs.Append(Subscriber{peerId, requestID, request}) - subs.RemoveContentFilters(peerId, requestID, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic1}}) - - sub := firstSubscriber(subs, topic2) - assert.NotNil(t, sub) - assert.Len(t, sub.filter.ContentFilters, 1) -} - -func TestRemoveDuplicateSubscriptions(t *testing.T) { - subs := NewSubscribers(10 * time.Second) - peerId := createPeerID(t) - topic := "topic" - requestID1 := "request_1" - requestID2 := "request_2" - request1 := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}, - } - request2 := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}, - } - subs.Append(Subscriber{peerId, requestID1, request1}) - subs.Append(Subscriber{peerId, requestID2, request2}) - subs.RemoveContentFilters(peerId, requestID2, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) - subs.RemoveContentFilters(peerId, requestID1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) - - sub := firstSubscriber(subs, topic) - assert.Nil(t, sub) -} - -func TestRemoveDuplicateSubscriptionsPartial(t *testing.T) { - subs := NewSubscribers(10 * time.Second) - peerId := createPeerID(t) - topic := "topic" - requestID1 := "request_1" - requestID2 := "request_2" - request1 := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}, - } - request2 := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}, - } - subs.Append(Subscriber{peerId, requestID1, request1}) - subs.Append(Subscriber{peerId, requestID2, request2}) - subs.RemoveContentFilters(peerId, requestID1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}}) - - sub := firstSubscriber(subs, topic) - assert.NotNil(t, sub) - assert.Equal(t, sub.requestID, requestID2) -} - -func TestRemoveBogus(t *testing.T) { - subs := NewSubscribers(10 * time.Second) - peerId := createPeerID(t) - requestID := "request_1" - contentTopic := "topic1" - request := &pb.FilterRequest{ - Subscribe: true, - Topic: TOPIC, - ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}}, - } - subs.Append(Subscriber{peerId, requestID, request}) - subs.RemoveContentFilters(peerId, requestID, []*pb.FilterRequest_ContentFilter{{ContentTopic: "does not exist"}, {ContentTopic: contentTopic}}) - - sub := firstSubscriber(subs, contentTopic) - assert.Nil(t, sub) -} diff --git a/waku/v2/protocol/legacy_filter/metrics.go b/waku/v2/protocol/legacy_filter/metrics.go deleted file mode 100644 index abb59822..00000000 --- a/waku/v2/protocol/legacy_filter/metrics.go +++ /dev/null @@ -1,75 +0,0 @@ -package legacy_filter - -import ( - "github.com/libp2p/go-libp2p/p2p/metricshelper" - "github.com/prometheus/client_golang/prometheus" -) - -var filterMessages = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "legacy_filter_messages", - Help: "The number of messages received via legacy filter protocol", - }) - -var filterErrors = prometheus.NewCounterVec( - prometheus.CounterOpts{ - Name: "legacy_filter_errors", - Help: "The distribution of the legacy filter protocol errors", - }, - []string{"error_type"}, -) - -var filterSubscribers = prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: "legacy_filter_subscriptions", - Help: "The number of legacy filter subscribers", - }) - -var collectors = []prometheus.Collector{ - filterMessages, - filterErrors, - filterSubscribers, -} - -// Metrics exposes the functions required to update prometheus metrics for legacy filter protocol -type Metrics interface { - RecordMessages(num int) - RecordSubscribers(num int) - RecordError(err metricsErrCategory) -} - -type metricsImpl struct { - reg prometheus.Registerer -} - -func newMetrics(reg prometheus.Registerer) Metrics { - metricshelper.RegisterCollectors(reg, collectors...) - return &metricsImpl{ - reg: reg, - } -} - -// RecordMessage is used to increase the counter for the number of messages received via waku filter -func (m *metricsImpl) RecordMessages(num int) { - filterMessages.Add(float64(num)) -} - -type metricsErrCategory string - -var ( - decodeRPCFailure metricsErrCategory = "decode_rpc_failure" - dialFailure metricsErrCategory = "dial_failure" - pushWriteError metricsErrCategory = "push_write_error" - peerNotFoundFailure metricsErrCategory = "peer_not_found_failure" - writeRequestFailure metricsErrCategory = "write_request_failure" -) - -// RecordError increases the counter for different error types -func (m *metricsImpl) RecordError(err metricsErrCategory) { - filterErrors.WithLabelValues(string(err)).Inc() -} - -// RecordSubscribers track the current number of filter subscribers -func (m *metricsImpl) RecordSubscribers(num int) { - filterSubscribers.Set(float64(num)) -} diff --git a/waku/v2/protocol/legacy_filter/pb/generate.go b/waku/v2/protocol/legacy_filter/pb/generate.go deleted file mode 100644 index 45524a8c..00000000 --- a/waku/v2/protocol/legacy_filter/pb/generate.go +++ /dev/null @@ -1,5 +0,0 @@ -package pb - -//go:generate mv ./../../waku-proto/waku/filter/v2beta1/filter.proto ./../../waku-proto/waku/filter/v2beta1/legacy_filter.proto -//go:generate protoc -I./../../waku-proto/waku/filter/v2beta1/. -I./../../waku-proto/ --go_opt=paths=source_relative --go_opt=Mlegacy_filter.proto=github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb --go_opt=Mwaku/message/v1/message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./../../waku-proto/waku/filter/v2beta1/legacy_filter.proto -//go:generate mv ./../../waku-proto/waku/filter/v2beta1/legacy_filter.proto ./../../waku-proto/waku/filter/v2beta1/filter.proto diff --git a/waku/v2/protocol/legacy_filter/pb/legacy_filter.pb.go b/waku/v2/protocol/legacy_filter/pb/legacy_filter.pb.go deleted file mode 100644 index 9bbb181a..00000000 --- a/waku/v2/protocol/legacy_filter/pb/legacy_filter.pb.go +++ /dev/null @@ -1,394 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.31.0 -// protoc v4.24.4 -// source: legacy_filter.proto - -// 12/WAKU2-FILTER rfc: https://rfc.vac.dev/spec/12/ -// Protocol identifier: /vac/waku/filter/2.0.0-beta1 - -package pb - -import ( - pb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type FilterRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Subscribe bool `protobuf:"varint,1,opt,name=subscribe,proto3" json:"subscribe,omitempty"` - Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` - ContentFilters []*FilterRequest_ContentFilter `protobuf:"bytes,3,rep,name=content_filters,json=contentFilters,proto3" json:"content_filters,omitempty"` -} - -func (x *FilterRequest) Reset() { - *x = FilterRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_legacy_filter_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *FilterRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*FilterRequest) ProtoMessage() {} - -func (x *FilterRequest) ProtoReflect() protoreflect.Message { - mi := &file_legacy_filter_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use FilterRequest.ProtoReflect.Descriptor instead. -func (*FilterRequest) Descriptor() ([]byte, []int) { - return file_legacy_filter_proto_rawDescGZIP(), []int{0} -} - -func (x *FilterRequest) GetSubscribe() bool { - if x != nil { - return x.Subscribe - } - return false -} - -func (x *FilterRequest) GetTopic() string { - if x != nil { - return x.Topic - } - return "" -} - -func (x *FilterRequest) GetContentFilters() []*FilterRequest_ContentFilter { - if x != nil { - return x.ContentFilters - } - return nil -} - -type MessagePush struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Messages []*pb.WakuMessage `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"` -} - -func (x *MessagePush) Reset() { - *x = MessagePush{} - if protoimpl.UnsafeEnabled { - mi := &file_legacy_filter_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *MessagePush) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*MessagePush) ProtoMessage() {} - -func (x *MessagePush) ProtoReflect() protoreflect.Message { - mi := &file_legacy_filter_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use MessagePush.ProtoReflect.Descriptor instead. -func (*MessagePush) Descriptor() ([]byte, []int) { - return file_legacy_filter_proto_rawDescGZIP(), []int{1} -} - -func (x *MessagePush) GetMessages() []*pb.WakuMessage { - if x != nil { - return x.Messages - } - return nil -} - -type FilterRpc struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"` - Request *FilterRequest `protobuf:"bytes,2,opt,name=request,proto3,oneof" json:"request,omitempty"` - Push *MessagePush `protobuf:"bytes,3,opt,name=push,proto3,oneof" json:"push,omitempty"` -} - -func (x *FilterRpc) Reset() { - *x = FilterRpc{} - if protoimpl.UnsafeEnabled { - mi := &file_legacy_filter_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *FilterRpc) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*FilterRpc) ProtoMessage() {} - -func (x *FilterRpc) ProtoReflect() protoreflect.Message { - mi := &file_legacy_filter_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use FilterRpc.ProtoReflect.Descriptor instead. -func (*FilterRpc) Descriptor() ([]byte, []int) { - return file_legacy_filter_proto_rawDescGZIP(), []int{2} -} - -func (x *FilterRpc) GetRequestId() string { - if x != nil { - return x.RequestId - } - return "" -} - -func (x *FilterRpc) GetRequest() *FilterRequest { - if x != nil { - return x.Request - } - return nil -} - -func (x *FilterRpc) GetPush() *MessagePush { - if x != nil { - return x.Push - } - return nil -} - -type FilterRequest_ContentFilter struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ContentTopic string `protobuf:"bytes,1,opt,name=content_topic,json=contentTopic,proto3" json:"content_topic,omitempty"` -} - -func (x *FilterRequest_ContentFilter) Reset() { - *x = FilterRequest_ContentFilter{} - if protoimpl.UnsafeEnabled { - mi := &file_legacy_filter_proto_msgTypes[3] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *FilterRequest_ContentFilter) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*FilterRequest_ContentFilter) ProtoMessage() {} - -func (x *FilterRequest_ContentFilter) ProtoReflect() protoreflect.Message { - mi := &file_legacy_filter_proto_msgTypes[3] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use FilterRequest_ContentFilter.ProtoReflect.Descriptor instead. -func (*FilterRequest_ContentFilter) Descriptor() ([]byte, []int) { - return file_legacy_filter_proto_rawDescGZIP(), []int{0, 0} -} - -func (x *FilterRequest_ContentFilter) GetContentTopic() string { - if x != nil { - return x.ContentTopic - } - return "" -} - -var File_legacy_filter_proto protoreflect.FileDescriptor - -var file_legacy_filter_proto_rawDesc = []byte{ - 0x0a, 0x13, 0x6c, 0x65, 0x67, 0x61, 0x63, 0x79, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x13, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x66, 0x69, 0x6c, 0x74, - 0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x1a, 0x1d, 0x77, 0x61, 0x6b, 0x75, - 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x73, 0x73, - 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd4, 0x01, 0x0a, 0x0d, 0x46, 0x69, - 0x6c, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73, - 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09, - 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, - 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, - 0x59, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, - 0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, - 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x46, - 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x6f, 0x6e, - 0x74, 0x65, 0x6e, 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x74, - 0x65, 0x6e, 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x1a, 0x34, 0x0a, 0x0d, 0x43, 0x6f, - 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x23, 0x0a, 0x0d, 0x63, - 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63, - 0x22, 0x47, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x12, - 0x38, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x1c, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, - 0x2e, 0x76, 0x31, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52, - 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0xbd, 0x01, 0x0a, 0x09, 0x46, 0x69, - 0x6c, 0x74, 0x65, 0x72, 0x52, 0x70, 0x63, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x41, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x66, - 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x46, 0x69, - 0x6c, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x88, 0x01, 0x01, 0x12, 0x39, 0x0a, 0x04, 0x70, 0x75, 0x73, - 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x66, - 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x4d, 0x65, - 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x48, 0x01, 0x52, 0x04, 0x70, 0x75, 0x73, - 0x68, 0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x33, -} - -var ( - file_legacy_filter_proto_rawDescOnce sync.Once - file_legacy_filter_proto_rawDescData = file_legacy_filter_proto_rawDesc -) - -func file_legacy_filter_proto_rawDescGZIP() []byte { - file_legacy_filter_proto_rawDescOnce.Do(func() { - file_legacy_filter_proto_rawDescData = protoimpl.X.CompressGZIP(file_legacy_filter_proto_rawDescData) - }) - return file_legacy_filter_proto_rawDescData -} - -var file_legacy_filter_proto_msgTypes = make([]protoimpl.MessageInfo, 4) -var file_legacy_filter_proto_goTypes = []interface{}{ - (*FilterRequest)(nil), // 0: waku.filter.v2beta1.FilterRequest - (*MessagePush)(nil), // 1: waku.filter.v2beta1.MessagePush - (*FilterRpc)(nil), // 2: waku.filter.v2beta1.FilterRpc - (*FilterRequest_ContentFilter)(nil), // 3: waku.filter.v2beta1.FilterRequest.ContentFilter - (*pb.WakuMessage)(nil), // 4: waku.message.v1.WakuMessage -} -var file_legacy_filter_proto_depIdxs = []int32{ - 3, // 0: waku.filter.v2beta1.FilterRequest.content_filters:type_name -> waku.filter.v2beta1.FilterRequest.ContentFilter - 4, // 1: waku.filter.v2beta1.MessagePush.messages:type_name -> waku.message.v1.WakuMessage - 0, // 2: waku.filter.v2beta1.FilterRpc.request:type_name -> waku.filter.v2beta1.FilterRequest - 1, // 3: waku.filter.v2beta1.FilterRpc.push:type_name -> waku.filter.v2beta1.MessagePush - 4, // [4:4] is the sub-list for method output_type - 4, // [4:4] is the sub-list for method input_type - 4, // [4:4] is the sub-list for extension type_name - 4, // [4:4] is the sub-list for extension extendee - 0, // [0:4] is the sub-list for field type_name -} - -func init() { file_legacy_filter_proto_init() } -func file_legacy_filter_proto_init() { - if File_legacy_filter_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_legacy_filter_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FilterRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_legacy_filter_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*MessagePush); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_legacy_filter_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FilterRpc); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_legacy_filter_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*FilterRequest_ContentFilter); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - file_legacy_filter_proto_msgTypes[2].OneofWrappers = []interface{}{} - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_legacy_filter_proto_rawDesc, - NumEnums: 0, - NumMessages: 4, - NumExtensions: 0, - NumServices: 0, - }, - GoTypes: file_legacy_filter_proto_goTypes, - DependencyIndexes: file_legacy_filter_proto_depIdxs, - MessageInfos: file_legacy_filter_proto_msgTypes, - }.Build() - File_legacy_filter_proto = out.File - file_legacy_filter_proto_rawDesc = nil - file_legacy_filter_proto_goTypes = nil - file_legacy_filter_proto_depIdxs = nil -} diff --git a/waku/v2/protocol/legacy_filter/waku_filter.go b/waku/v2/protocol/legacy_filter/waku_filter.go deleted file mode 100644 index 116b1be6..00000000 --- a/waku/v2/protocol/legacy_filter/waku_filter.go +++ /dev/null @@ -1,471 +0,0 @@ -package legacy_filter - -import ( - "context" - "encoding/hex" - "errors" - "math" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" - "github.com/libp2p/go-msgio/pbio" - "github.com/prometheus/client_golang/prometheus" - "github.com/waku-org/go-waku/logging" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" - wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/service" - "github.com/waku-org/go-waku/waku/v2/timesource" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" -) - -var ( - ErrNoPeersAvailable = errors.New("no suitable remote peers") -) - -type ( - Filter struct { - filterID string - PeerID peer.ID - Topic string - ContentFilters []string - Chan chan *protocol.Envelope - } - - ContentFilter struct { - Topic string - ContentTopics []string - } - - FilterSubscription struct { - RequestID string - Peer peer.ID - } - - WakuFilter struct { - *service.CommonService - h host.Host - pm *peermanager.PeerManager - isFullNode bool - msgSub *relay.Subscription - metrics Metrics - log *zap.Logger - - filters *FilterMap - subscribers *Subscribers - } -) - -// FilterID_v20beta1 is the current Waku Filter protocol identifier -const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") - -// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options -func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...Option) *WakuFilter { - wf := new(WakuFilter) - wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode)) - - params := new(FilterParameters) - optList := DefaultOptions() - optList = append(optList, opts...) - for _, opt := range optList { - opt(params) - } - - wf.isFullNode = isFullNode - wf.CommonService = service.NewCommonService() - wf.filters = NewFilterMap(broadcaster, timesource) - wf.subscribers = NewSubscribers(params.Timeout) - wf.metrics = newMetrics(reg) - - return wf -} - -// Sets the host to be able to mount or consume a protocol -func (wf *WakuFilter) SetHost(h host.Host) { - wf.h = h -} - -func (wf *WakuFilter) Start(ctx context.Context, sub *relay.Subscription) error { - return wf.CommonService.Start(ctx, func() error { - return wf.start(sub) - }) -} - -func (wf *WakuFilter) start(sub *relay.Subscription) error { - wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest(wf.Context())) - wf.msgSub = sub - wf.WaitGroup().Add(1) - go wf.filterListener(wf.Context()) - wf.log.Info("filter protocol started") - return nil -} -func (wf *WakuFilter) onRequest(ctx context.Context) func(network.Stream) { - return func(stream network.Stream) { - peerID := stream.Conn().RemotePeer() - logger := wf.log.With(logging.HostID("peer", peerID)) - - filterRPCRequest := &pb.FilterRpc{} - - reader := pbio.NewDelimitedReader(stream, math.MaxInt32) - - err := reader.ReadMsg(filterRPCRequest) - if err != nil { - wf.metrics.RecordError(decodeRPCFailure) - logger.Error("reading request", zap.Error(err)) - if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) - } - return - } - - logger.Info("received request") - - if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 { - // We're on a light node. - // This is a message push coming from a full node. - for _, message := range filterRPCRequest.Push.Messages { - wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node - } - - logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages))) - wf.metrics.RecordMessages(len(filterRPCRequest.Push.Messages)) - } else if filterRPCRequest.Request != nil && wf.isFullNode { - // We're on a full node. - // This is a filter request coming from a light node. - if filterRPCRequest.Request.Subscribe { - subscriber := Subscriber{peer: stream.Conn().RemotePeer(), requestID: filterRPCRequest.RequestId, filter: filterRPCRequest.Request} - if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible - subscriber.filter.Topic = relay.DefaultWakuTopic - } - - subscribersLen := wf.subscribers.Append(subscriber) - - logger.Info("adding subscriber") - wf.metrics.RecordSubscribers(subscribersLen) - } else { - wf.subscribers.RemoveContentFilters(peerID, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters) - - logger.Info("removing subscriber") - wf.metrics.RecordSubscribers(wf.subscribers.Length()) - } - } else { - logger.Error("can't serve request") - if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) - } - return - } - - stream.Close() - } -} - -func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, msg *wpb.WakuMessage) error { - pushRPC := &pb.FilterRpc{RequestId: subscriber.requestID, Push: &pb.MessagePush{Messages: []*wpb.WakuMessage{msg}}} - logger := wf.log.With(logging.HostID("peer", subscriber.peer)) - - stream, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1) - if err != nil { - wf.subscribers.FlagAsFailure(subscriber.peer) - logger.Error("opening peer stream", zap.Error(err)) - wf.metrics.RecordError(dialFailure) - return err - } - - writer := pbio.NewDelimitedWriter(stream) - err = writer.WriteMsg(pushRPC) - if err != nil { - logger.Error("pushing messages to peer", zap.Error(err)) - wf.subscribers.FlagAsFailure(subscriber.peer) - wf.metrics.RecordError(pushWriteError) - if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) - } - return nil - } - - stream.Close() - - wf.subscribers.FlagAsSuccess(subscriber.peer) - return nil -} - -func (wf *WakuFilter) filterListener(ctx context.Context) { - defer wf.WaitGroup().Done() - - // This function is invoked for each message received - // on the full node in context of Waku2-Filter - handle := func(envelope *protocol.Envelope) error { // async - msg := envelope.Message() - pubsubTopic := envelope.PubsubTopic() - logger := wf.log.With(zap.Stringer("message", msg)) - g := new(errgroup.Group) - // Each subscriber is a light node that earlier on invoked - // a FilterRequest on this node - for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) { - logger := logger.With(logging.HostID("subscriber", subscriber.peer)) - subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines - if subscriber.filter.Topic != pubsubTopic { - logger.Info("pubsub topic mismatch", - zap.String("subscriberTopic", subscriber.filter.Topic), - zap.String("messageTopic", pubsubTopic)) - continue - } - - // Do a message push to light node - logger.Info("pushing message to light node", zap.String("contentTopic", msg.ContentTopic)) - g.Go(func() (err error) { - err = wf.pushMessage(ctx, subscriber, msg) - if err != nil { - logger.Error("pushing message", zap.Error(err)) - } - return err - }) - } - - return g.Wait() - } - - for m := range wf.msgSub.Ch { - if err := handle(m); err != nil { - wf.log.Error("handling message", zap.Error(err)) - } - } -} - -// Having a FilterRequest struct, -// select a peer with filter support, dial it, -// and submit FilterRequest wrapped in FilterRPC -func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) { - params := new(FilterSubscribeParameters) - params.log = wf.log - params.host = wf.h - - optList := DefaultSubscribtionOptions() - optList = append(optList, opts...) - for _, opt := range optList { - opt(params) - } - if wf.pm != nil && params.selectedPeer == "" { - selectedPeers, _ := wf.pm.SelectPeers( - peermanager.PeerSelectionCriteria{ - SelectionType: params.peerSelectionType, - Proto: FilterID_v20beta1, - PubsubTopics: []string{filter.Topic}, - SpecificPeers: params.preferredPeers, - Ctx: ctx, - }, - ) - if err != nil { - params.selectedPeer = selectedPeers[0] - } - } - if params.selectedPeer == "" { - wf.metrics.RecordError(peerNotFoundFailure) - return nil, ErrNoPeersAvailable - } - - var contentFilters []*pb.FilterRequest_ContentFilter - for _, ct := range filter.ContentTopics { - contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) - } - - request := &pb.FilterRequest{ - Subscribe: true, - Topic: filter.Topic, - ContentFilters: contentFilters, - } - - stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterID_v20beta1) - if err != nil { - wf.metrics.RecordError(dialFailure) - return - } - - // This is the only successful path to subscription - requestID := hex.EncodeToString(protocol.GenerateRequestID()) - - writer := pbio.NewDelimitedWriter(stream) - filterRPC := &pb.FilterRpc{RequestId: requestID, Request: request} - wf.log.Debug("sending filterRPC", zap.Stringer("rpc", filterRPC)) - err = writer.WriteMsg(filterRPC) - if err != nil { - wf.metrics.RecordError(writeRequestFailure) - wf.log.Error("sending filterRPC", zap.Error(err)) - if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) - } - return - } - - stream.Close() - - subscription = new(FilterSubscription) - subscription.Peer = params.selectedPeer - subscription.RequestID = requestID - - return -} - -// Unsubscribe is used to stop receiving messages from a peer that match a content filter -func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error { - stream, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) - if err != nil { - wf.metrics.RecordError(dialFailure) - return err - } - - // This is the only successful path to subscription - id := protocol.GenerateRequestID() - - var contentFilters []*pb.FilterRequest_ContentFilter - for _, ct := range contentFilter.ContentTopics { - contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) - } - - request := &pb.FilterRequest{ - Subscribe: false, - Topic: contentFilter.Topic, - ContentFilters: contentFilters, - } - - writer := pbio.NewDelimitedWriter(stream) - filterRPC := &pb.FilterRpc{RequestId: hex.EncodeToString(id), Request: request} - err = writer.WriteMsg(filterRPC) - if err != nil { - wf.metrics.RecordError(writeRequestFailure) - if err := stream.Reset(); err != nil { - wf.log.Error("resetting connection", zap.Error(err)) - } - return err - } - - stream.Close() - - return nil -} - -// Stop unmounts the filter protocol -func (wf *WakuFilter) Stop() { - wf.CommonService.Stop(func() { - wf.msgSub.Unsubscribe() - - wf.h.RemoveStreamHandler(FilterID_v20beta1) - wf.filters.RemoveAll() - wf.subscribers.Clear() - }) -} - -// Subscribe setups a subscription to receive messages that match a specific content filter -func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) { - // TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID - - // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. - // ContentFilterChan takes MessagePush structs - remoteSubs, err := wf.requestSubscription(ctx, f, opts...) - if err != nil || remoteSubs.RequestID == "" { - // Failed to subscribe - wf.log.Error("requesting subscription", zap.Error(err)) - return - } - - // Register handler for filter, whether remote subscription succeeded or not - - filterID = remoteSubs.RequestID - theFilter = Filter{ - filterID: filterID, - PeerID: remoteSubs.Peer, - Topic: f.Topic, - ContentFilters: f.ContentTopics, - Chan: make(chan *protocol.Envelope, 1024), // To avoid blocking - } - wf.filters.Set(filterID, theFilter) - - return -} - -// UnsubscribeFilterByID removes a subscription to a filter node completely -// using using a filter. It also closes the filter channel -func (wf *WakuFilter) UnsubscribeByFilter(ctx context.Context, filter Filter) error { - err := wf.UnsubscribeFilterByID(ctx, filter.filterID) - if err != nil { - close(filter.Chan) - } - return err -} - -// UnsubscribeFilterByID removes a subscription to a filter node completely -// using the filterID returned when the subscription was created -func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string) error { - - var f Filter - var ok bool - - if f, ok = wf.filters.Get(filterID); !ok { - return errors.New("filter not found") - } - - cf := ContentFilter{ - Topic: f.Topic, - ContentTopics: f.ContentFilters, - } - - err := wf.Unsubscribe(ctx, cf, f.PeerID) - if err != nil { - return err - } - - wf.filters.Delete(filterID) - - return nil -} - -// Unsubscribe filter removes content topics from a filter subscription. If all -// the contentTopics are removed the subscription is dropped completely -func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) error { - // Remove local filter - idsToRemove := make(map[string]struct{}) - for filterMapItem := range wf.filters.Items() { - f := filterMapItem.Value - id := filterMapItem.Key - - if f.Topic != cf.Topic { - continue - } - - // Send message to full node in order to unsubscribe - err := wf.Unsubscribe(ctx, cf, f.PeerID) - if err != nil { - return err - } - - // Iterate filter entries to remove matching content topics - // make sure we delete the content filter - // if no more topics are left - for _, cfToDelete := range cf.ContentTopics { - for i, cf := range f.ContentFilters { - if cf == cfToDelete { - l := len(f.ContentFilters) - 1 - f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l] - f.ContentFilters = f.ContentFilters[:l] - break - } - - } - if len(f.ContentFilters) == 0 { - idsToRemove[id] = struct{}{} - } - } - } - - for rID := range idsToRemove { - wf.filters.Delete(rID) - } - - return nil -} diff --git a/waku/v2/protocol/legacy_filter/waku_filter_option.go b/waku/v2/protocol/legacy_filter/waku_filter_option.go deleted file mode 100644 index 4b103a36..00000000 --- a/waku/v2/protocol/legacy_filter/waku_filter_option.go +++ /dev/null @@ -1,79 +0,0 @@ -package legacy_filter - -import ( - "time" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "go.uber.org/zap" -) - -type ( - FilterSubscribeParameters struct { - host host.Host - selectedPeer peer.ID - peerSelectionType peermanager.PeerSelection - preferredPeers peer.IDSlice - log *zap.Logger - } - - FilterSubscribeOption func(*FilterSubscribeParameters) - - FilterParameters struct { - Timeout time.Duration - pm *peermanager.PeerManager - } - - Option func(*FilterParameters) -) - -func WithTimeout(timeout time.Duration) Option { - return func(params *FilterParameters) { - params.Timeout = timeout - } -} - -func WithPeerManager(pm *peermanager.PeerManager) Option { - return func(params *FilterParameters) { - params.pm = pm - } -} - -func WithPeer(p peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - params.selectedPeer = p - } -} - -// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. -// If a list of specific peers is passed, the peer will be chosen from that list assuming it -// supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - params.peerSelectionType = peermanager.Automatic - params.preferredPeers = fromThesePeers - } -} - -// WithFastestPeerSelection is an option used to select a peer from the peer store -// with the lowest ping If a list of specific peers is passed, the peer will be chosen -// from that list assuming it supports the chosen protocol, otherwise it will chose a -// peer from the node peerstore -func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { - params.peerSelectionType = peermanager.LowestRTT - } -} - -func DefaultOptions() []Option { - return []Option{ - WithTimeout(24 * time.Hour), - } -} - -func DefaultSubscribtionOptions() []FilterSubscribeOption { - return []FilterSubscribeOption{ - WithAutomaticPeerSelection(), - } -} diff --git a/waku/v2/protocol/legacy_filter/waku_filter_option_test.go b/waku/v2/protocol/legacy_filter/waku_filter_option_test.go deleted file mode 100644 index bf0e0ffc..00000000 --- a/waku/v2/protocol/legacy_filter/waku_filter_option_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package legacy_filter - -import ( - "context" - "crypto/rand" - "testing" - - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -func TestFilterOption(t *testing.T) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) - - host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) - - options := []FilterSubscribeOption{ - WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), - WithAutomaticPeerSelection(), - WithFastestPeerSelection(), - } - - params := new(FilterSubscribeParameters) - params.host = host - params.log = utils.Logger() - - for _, opt := range options { - opt(params) - } - - require.Equal(t, host, params.host) - require.NotNil(t, params.selectedPeer) -} diff --git a/waku/v2/protocol/legacy_filter/waku_filter_test.go b/waku/v2/protocol/legacy_filter/waku_filter_test.go deleted file mode 100644 index 4f35ace2..00000000 --- a/waku/v2/protocol/legacy_filter/waku_filter_test.go +++ /dev/null @@ -1,241 +0,0 @@ -package legacy_filter - -import ( - "context" - "crypto/rand" - "sync" - "testing" - "time" - - "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peerstore" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/require" - "github.com/waku-org/go-waku/tests" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" - "github.com/waku-org/go-waku/waku/v2/timesource" - "github.com/waku-org/go-waku/waku/v2/utils" -) - -var testTopic = "/waku/2/go/filter/test" -var testContentTopic = "TopicA" - -func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) - - host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) - - relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - relay.SetHost(host) - err = relay.Start(context.Background()) - require.NoError(t, err) - - sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic)) - require.NoError(t, err) - - return relay, sub[0], host -} - -func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { - port, err := tests.FindFreePort(t, "", 5) - require.NoError(t, err) - - host, err := tests.MakeHost(context.Background(), port, rand.Reader) - require.NoError(t, err) - - b := relay.NewBroadcaster(10) - require.NoError(t, b.Start(context.Background())) - filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - filter.SetHost(host) - sub := relay.NewSubscription(protocol.NewContentFilter(testTopic, testContentTopic)) - err = filter.Start(context.Background(), sub) - require.NoError(t, err) - - return filter, host -} - -// Node1: Filter subscribed to content topic A -// Node2: Relay + Filter -// -// # Node1 and Node2 are peers -// -// Node2 send a successful message with topic A -// Node1 receive the message -// -// Node2 send a successful message with topic B -// Node1 doesn't receive the message -func TestWakuFilter(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() - - node1, host1 := makeWakuFilter(t) - defer node1.Stop() - - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() - - node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger()) - node2Filter.SetHost(host2) - sub := broadcaster.Register(protocol.NewContentFilter(testTopic)) - err := node2Filter.Start(ctx, sub) - require.NoError(t, err) - - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1) - require.NoError(t, err) - - contentFilter := &ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{testContentTopic}, - } - - _, f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) - - // Sleep to make sure the filter is subscribed - time.Sleep(2 * time.Second) - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - env := <-f.Chan - require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) - }() - - _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) - require.NoError(t, err) - - wg.Wait() - - wg.Add(1) - go func() { - select { - case <-f.Chan: - require.Fail(t, "should not receive another message") - case <-time.After(1 * time.Second): - defer wg.Done() - case <-ctx.Done(): - require.Fail(t, "test exceeded allocated time") - } - }() - - _, err = node2.Publish(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) - require.NoError(t, err) - - wg.Wait() - - wg.Add(1) - go func() { - select { - case <-f.Chan: - require.Fail(t, "should not receive another message") - case <-time.After(1 * time.Second): - defer wg.Done() - case <-ctx.Done(): - require.Fail(t, "test exceeded allocated time") - } - }() - - err = node1.Unsubscribe(ctx, *contentFilter, node2Filter.h.ID()) - require.NoError(t, err) - - time.Sleep(1 * time.Second) - - _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) - require.NoError(t, err) - wg.Wait() -} - -func TestWakuFilterPeerFailure(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds - defer cancel() - - node1, host1 := makeWakuFilter(t) - - broadcaster := relay.NewBroadcaster(10) - require.NoError(t, broadcaster.Start(context.Background())) - node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) - defer node2.Stop() - defer sub2.Unsubscribe() - - broadcaster2 := relay.NewBroadcaster(10) - require.NoError(t, broadcaster2.Start(context.Background())) - node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger(), WithTimeout(3*time.Second)) - node2Filter.SetHost(host2) - sub := broadcaster.Register(protocol.NewContentFilter(testTopic)) - err := node2Filter.Start(ctx, sub) - require.NoError(t, err) - - host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) - err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1) - require.NoError(t, err) - - contentFilter := &ContentFilter{ - Topic: string(testTopic), - ContentTopics: []string{testContentTopic}, - } - - _, f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID())) - require.NoError(t, err) - - // Simulate there's been a failure before - node2Filter.subscribers.FlagAsFailure(host1.ID()) - - // Sleep to make sure the filter is subscribed - time.Sleep(2 * time.Second) - - require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) - - var wg sync.WaitGroup - - wg.Add(1) - go func() { - defer wg.Done() - env := <-f.Chan - require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) - - // Failure is removed - require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) - - }() - - _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) - require.NoError(t, err) - - wg.Wait() - - // Kill the subscriber - host1.Close() - - time.Sleep(1 * time.Second) - - _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) - require.NoError(t, err) - - // TODO: find out how to eliminate this sleep - time.Sleep(1 * time.Second) - require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) - - time.Sleep(3 * time.Second) - - _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) - require.NoError(t, err) - - time.Sleep(1 * time.Second) - require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) // Failed peer has been removed - - for subscriber := range node2Filter.subscribers.Items(nil) { - if subscriber.peer == node1.h.ID() { - require.Fail(t, "Subscriber should not exist") - } - } - -}