chore: drop legacy filter support (#1037)

richΛrd 2024-02-20 08:47:37 -04:00 committed by GitHub
parent 57cf95cd5c
commit d65a836bb6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 17 additions and 2071 deletions

View File

@@ -364,26 +364,6 @@ var (
Destination: &options.Filter.Timeout,
EnvVars: []string{"WAKUNODE2_FILTER_TIMEOUT"},
})
FilterLegacyFlag = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "legacy-filter",
Usage: "Use filter protocol (legacy)",
Destination: &options.Filter.UseV1,
EnvVars: []string{"WAKUNODE2_USE_LEGACY_FILTER"},
})
FilterLegacyLightClient = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "legacy-filter-light-client",
Usage: "Don't accept legacy filter subscribers",
Destination: &options.Filter.DisableFullNode,
EnvVars: []string{"WAKUNODE2_LEGACY_FILTER_LIGHT_CLIENT"},
})
FilterLegacyNode = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "legacy-filternode",
Usage: "Multiaddr of a peer that supports legacy filter protocol. Option may be repeated",
Value: &cliutils.MultiaddrSlice{
Values: &options.Filter.NodesV1,
},
EnvVars: []string{"WAKUNODE2_LEGACY_FILTERNODE"},
})
LightPush = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "lightpush",
Usage: "Enable lightpush protocol",

View File

@@ -70,9 +70,6 @@ func main() {
FilterFlag,
FilterNode,
FilterTimeout,
FilterLegacyFlag,
FilterLegacyNode,
FilterLegacyLightClient,
LightPush,
LightPushNode,
Discv5Discovery,

View File

@@ -46,7 +46,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/node"
wprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
@@ -253,10 +252,6 @@ func Execute(options NodeOptions) error {
nodeOpts = append(nodeOpts, node.WithWakuFilterFullNode(filter.WithTimeout(options.Filter.Timeout)))
}
if options.Filter.UseV1 {
nodeOpts = append(nodeOpts, node.WithLegacyWakuFilter(!options.Filter.DisableFullNode, legacy_filter.WithTimeout(options.Filter.Timeout)))
}
var dbStore *persistence.DBStore
if requiresDB(options) {
dbOptions := []persistence.DBOption{
@@ -331,12 +326,6 @@ func Execute(options NodeOptions) error {
pubSubTopicMapKeys = append(pubSubTopicMapKeys, k)
}
if options.Filter.UseV1 {
if err := addStaticPeers(wakuNode, options.Filter.NodesV1, pubSubTopicMapKeys, legacy_filter.FilterID_v20beta1); err != nil {
return err
}
}
if err = wakuNode.Start(ctx); err != nil {
return nonRecoverError(err)
}
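With the UseV1 branches gone, enabling a filter service in Execute reduces to the v2 options that remain above. A minimal sketch, assuming the node.New/Start/Stop API and the option names visible elsewhere in this diff; the five-minute timeout is an arbitrary illustrative value and other options a real deployment needs (host address, keys, etc.) are omitted:

package main

import (
	"context"
	"time"

	"github.com/waku-org/go-waku/waku/v2/node"
	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
)

func main() {
	// Build a node that serves the current filter protocol; the legacy
	// WithLegacyWakuFilter option no longer exists after this commit.
	wakuNode, err := node.New(
		node.WithWakuRelay(),
		node.WithWakuFilterFullNode(filter.WithTimeout(5*time.Minute)),
	)
	if err != nil {
		panic(err)
	}
	if err := wakuNode.Start(context.Background()); err != nil {
		panic(err)
	}
	defer wakuNode.Stop()
}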

View File

@@ -53,10 +53,8 @@ type RLNRelayOptions struct {
// restricted devices.
type FilterOptions struct {
Enable bool
UseV1 bool
DisableFullNode bool
Nodes []multiaddr.Multiaddr
NodesV1 []multiaddr.Multiaddr
Timeout time.Duration
}

View File

@@ -6,15 +6,13 @@ import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
)
func IsWakuProtocol(protocol protocol.ID) bool {
return protocol == legacy_filter.FilterID_v20beta1 ||
protocol == filter.FilterPushID_v20beta1 ||
return protocol == filter.FilterPushID_v20beta1 ||
protocol == filter.FilterSubscribeID_v20beta1 ||
protocol == relay.WakuRelayID_v200 ||
protocol == lightpush.LightPushID_v20beta1 ||

View File

@@ -1,64 +0,0 @@
package main
/*
#include <cgo_utils.h>
*/
import "C"
import (
"unsafe"
"github.com/waku-org/go-waku/library"
)
// Creates a subscription to a light node matching a content filter and, optionally, a pubSub topic.
// filterJSON must contain a JSON with this format:
//
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "pubsubTopic": "the pubsub topic" // optional
// }
//
// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
//
//export waku_legacy_filter_subscribe
func waku_legacy_filter_subscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
return onError(err, cb, userData)
}
err = library.LegacyFilterSubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms))
return onError(err, cb, userData)
}
// Removes subscriptions in a light node matching a content filter and, optionally, a pubSub topic.
// filterJSON must contain a JSON with this format:
//
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "pubsubTopic": "the pubsub topic" // optional
// }
//
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
//
//export waku_legacy_filter_unsubscribe
func waku_legacy_filter_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
instance, err := getInstance(ctx)
if err != nil {
return onError(err, cb, userData)
}
err = library.LegacyFilterUnsubscribe(instance, C.GoString(filterJSON), int(ms))
return onError(err, cb, userData)
}
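The comments in the removed binding above describe the expected filterJSON payload. An illustrative payload matching that description; the content topic is a placeholder and the pubsub topic shown is the default Waku topic:

package main

import "fmt"

// exampleFilterJSON follows the format documented in the removed binding:
// a mandatory contentFilters array plus an optional pubsubTopic.
const exampleFilterJSON = `{
  "contentFilters": [
    { "contentTopic": "/example-app/1/chat/proto" }
  ],
  "pubsubTopic": "/waku/2/default-waku/proto"
}`

func main() {
	fmt.Println(exampleFilterJSON)
}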

View File

@@ -19,7 +19,6 @@ type WakuConfig struct {
EnableRelay *bool `json:"relay"`
RelayTopics []string `json:"relayTopics,omitempty"`
GossipSubParams *GossipSubParams `json:"gossipsubParams,omitempty"`
EnableLegacyFilter *bool `json:"legacyFilter,omitempty"`
MinPeersToPublish *int `json:"minPeersToPublish,omitempty"`
DNSDiscoveryURLs []string `json:"dnsDiscoveryURLs,omitempty"`
DNSDiscoveryNameServer string `json:"dnsDiscoveryNameServer,omitempty"`
@@ -49,7 +48,6 @@ var defaultPort = 60000
var defaultKeepAliveInterval = 20
var defaultEnableRelay = true
var defaultMinPeersToPublish = 0
var defaultEnableLegacyFilter = false
var defaultEnableDiscV5 = false
var defaultDiscV5UDPPort = uint(9000)
var defaultLogLevel = "INFO"
@@ -227,10 +225,6 @@ func getConfig(configJSON string) (WakuConfig, error) {
config.EnableRelay = &defaultEnableRelay
}
if config.EnableLegacyFilter == nil {
config.EnableLegacyFilter = &defaultEnableLegacyFilter
}
if config.EnableDiscV5 == nil {
config.EnableDiscV5 = &defaultEnableDiscV5
}
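getConfig above fills defaults only for fields the caller left out, which is why the config fields are pointers. A minimal sketch of that defaulting pattern with a stripped-down stand-in struct (the "discV5" JSON tag is hypothetical, used only for illustration):

package main

import (
	"encoding/json"
	"fmt"
)

type exampleConfig struct {
	EnableRelay  *bool `json:"relay"`
	EnableDiscV5 *bool `json:"discV5"` // hypothetical tag, for illustration only
}

func main() {
	defaultEnableRelay := true
	defaultEnableDiscV5 := false

	var cfg exampleConfig
	_ = json.Unmarshal([]byte(`{"discV5": true}`), &cfg)

	// A nil pointer means "not set", so the default applies; an explicit
	// false from the caller is preserved.
	if cfg.EnableRelay == nil {
		cfg.EnableRelay = &defaultEnableRelay
	}
	if cfg.EnableDiscV5 == nil {
		cfg.EnableDiscV5 = &defaultEnableDiscV5
	}
	fmt.Println(*cfg.EnableRelay, *cfg.EnableDiscV5) // true true
}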

View File

@@ -1,105 +0,0 @@
package library
import (
"context"
"encoding/json"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
)
type legacyFilterArgument struct {
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"`
}
func toLegacyContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) {
var f legacyFilterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return legacy_filter.ContentFilter{}, err
}
result := legacy_filter.ContentFilter{
Topic: f.Topic,
}
for _, cf := range f.ContentFilters {
result.ContentTopics = append(result.ContentTopics, cf.ContentTopic)
}
return result, err
}
// LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages
// Deprecated: Use FilterSubscribe instead
func LegacyFilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error {
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
return err
}
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = instance.ctx
}
var fOptions []legacy_filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return err
}
fOptions = append(fOptions, legacy_filter.WithPeer(p))
} else {
fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection())
}
_, f, err := instance.node.LegacyFilter().Subscribe(ctx, cf, fOptions...)
if err != nil {
return err
}
go func(f legacy_filter.Filter) {
for envelope := range f.Chan {
send(instance, "message", toSubscriptionMessage(envelope))
}
}(f)
return nil
}
// LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead
func LegacyFilterUnsubscribe(instance *WakuInstance, filterJSON string, ms int) error {
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
return err
}
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = instance.ctx
}
return instance.node.LegacyFilter().UnsubscribeFilter(ctx, cf)
}

View File

@@ -1,29 +0,0 @@
package gowaku
import (
"github.com/waku-org/go-waku/library"
)
// LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages
// Deprecated: Use FilterSubscribe instead
func LegacyFilterSubscribe(instanceID uint, filterJSON string, peerID string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.LegacyFilterSubscribe(instance, filterJSON, peerID, ms)
return makeJSONResponse(err)
}
// LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead
func LegacyFilterUnsubscribe(instanceID uint, filterJSON string, ms int) string {
instance, err := library.GetInstance(instanceID)
if err != nil {
return makeJSONResponse(err)
}
err = library.LegacyFilterUnsubscribe(instance, filterJSON, ms)
return makeJSONResponse(err)
}

View File

@@ -195,10 +195,6 @@ func NewNode(instance *WakuInstance, configJSON string) error {
}
}
if *config.EnableLegacyFilter {
opts = append(opts, node.WithLegacyWakuFilter(false))
}
opts = append(opts, node.WithWakuFilterLightNode())
if *config.EnableStore {

View File

@@ -36,7 +36,6 @@ import (
wakuprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/metadata"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
@@ -99,7 +98,6 @@ type WakuNode struct {
peerExchange Service
rendezvous Service
metadata Service
legacyFilter ReceptorService
filterFullNode ReceptorService
filterLightNode Service
store ReceptorService
@@ -194,7 +192,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.log = params.logger.Named("node2")
w.wg = &sync.WaitGroup{}
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode || w.opts.enableLegacyFilter, w.opts.enableStore, w.opts.enableRelay)
w.wakuFlag = enr.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilterFullNode, w.opts.enableStore, w.opts.enableRelay)
w.circuitRelayNodes = make(chan peer.AddrInfo)
w.metrics = newMetrics(params.prometheusReg)
@@ -286,10 +284,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
}
}
w.opts.legacyFilterOpts = append(w.opts.legacyFilterOpts, legacy_filter.WithPeerManager(w.peermanager))
w.opts.filterOpts = append(w.opts.filterOpts, filter.WithPeerManager(w.peermanager))
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.opts.prometheusReg, w.log, w.opts.legacyFilterOpts...)
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.opts.prometheusReg, w.log, w.opts.filterOpts...)
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.prometheusReg, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log)
@@ -443,16 +439,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
}
w.legacyFilter.SetHost(host)
if w.opts.enableLegacyFilter {
sub := w.bcaster.RegisterForAll()
err := w.legacyFilter.Start(ctx, sub)
if err != nil {
return err
}
w.log.Info("Subscribing filter to broadcaster")
}
w.filterFullNode.SetHost(host)
if w.opts.enableFilterFullNode {
sub := w.bcaster.RegisterForAll()
@@ -512,7 +498,6 @@ func (w *WakuNode) Stop() {
w.relay.Stop()
w.lightPush.Stop()
w.store.Stop()
w.legacyFilter.Stop()
w.filterFullNode.Stop()
w.filterLightNode.Stop()
@@ -602,14 +587,6 @@ func (w *WakuNode) Store() store.Store {
return w.store.(store.Store)
}
// LegacyFilter is used to access any operation related to Waku LegacyFilter protocol
func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter {
if result, ok := w.legacyFilter.(*legacy_filter.WakuFilter); ok {
return result
}
return nil
}
// FilterFullNode is used to access any operation related to the Waku Filter protocol full node feature
func (w *WakuNode) FilterFullNode() *filter.WakuFilterFullNode {
if result, ok := w.filterFullNode.(*filter.WakuFilterFullNode); ok {

View File

@@ -19,7 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
@@ -230,7 +230,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
wakuNode1, err := New(
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithLegacyWakuFilter(true),
WithWakuFilterFullNode(),
)
require.NoError(t, err)
err = wakuNode1.Start(ctx)
@@ -251,7 +251,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
wakuNode2, err := New(
WithHostAddress(hostAddr2),
WithLegacyWakuFilter(false),
WithWakuFilterLightNode(),
WithWakuStore(),
WithMessageProvider(dbStore),
)
@@ -260,15 +260,13 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
defer wakuNode2.Stop()
err = wakuNode2.DialPeerWithMultiAddress(ctx, wakuNode1.ListenAddresses()[0])
peerID, err := wakuNode2.AddPeer(wakuNode1.ListenAddresses()[0], peerstore.Static, []string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
require.NoError(t, err)
time.Sleep(2 * time.Second)
_, filter, err := wakuNode2.LegacyFilter().Subscribe(ctx, legacy_filter.ContentFilter{
Topic: string(relay.DefaultWakuTopic),
ContentTopics: []string{"abc"},
}, legacy_filter.WithPeer(wakuNode1.host.ID()))
subscription, err := wakuNode2.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet("abc"),
}, filter.WithPeer(peerID))
require.NoError(t, err)
// Sleep to make sure the filter is subscribed
@@ -284,7 +282,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
go func() {
// MSG1 should be pushed in NODE2 via filter
defer wg.Done()
env, ok := <-filter.Chan
env, ok := <-subscription[0].C
if !ok {
require.Fail(t, "no message")
}
@@ -304,7 +302,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
require.NoError(t, err)
wakuNode3, err := New(
WithHostAddress(hostAddr3),
WithLegacyWakuFilter(false),
WithWakuFilterLightNode(),
)
require.NoError(t, err)
err = wakuNode3.Start(ctx)
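Condensed from the updated test above, a sketch of the v2 light-client flow that replaces the legacy Subscribe call: add the full node as a static peer for the filter-subscribe protocol, subscribe with a protocol.ContentFilter, and read envelopes from the subscription channel. The function wrapper and empty main are only scaffolding for the sketch.

package main

import (
	"context"

	"github.com/multiformats/go-multiaddr"
	"github.com/waku-org/go-waku/waku/v2/node"
	"github.com/waku-org/go-waku/waku/v2/peerstore"
	"github.com/waku-org/go-waku/waku/v2/protocol"
	"github.com/waku-org/go-waku/waku/v2/protocol/filter"
	"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)

func subscribeViaFilterV2(ctx context.Context, lightNode *node.WakuNode, fullNodeAddr multiaddr.Multiaddr) error {
	peerID, err := lightNode.AddPeer(fullNodeAddr, peerstore.Static,
		[]string{relay.DefaultWakuTopic}, filter.FilterSubscribeID_v20beta1)
	if err != nil {
		return err
	}
	subscriptions, err := lightNode.FilterLightnode().Subscribe(ctx, protocol.ContentFilter{
		PubsubTopic:   relay.DefaultWakuTopic,
		ContentTopics: protocol.NewContentTopicSet("abc"),
	}, filter.WithPeer(peerID))
	if err != nil {
		return err
	}
	// Matching messages arrive as envelopes on the subscription channel.
	envelope := <-subscriptions[0].C
	_ = envelope.Message()
	return nil
}

func main() {}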

View File

@@ -28,7 +28,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/rendezvous"
@@ -74,14 +73,11 @@ type WakuNodeParameters struct {
logger *zap.Logger
logLevel logging.LogLevel
enableRelay bool
enableLegacyFilter bool
isLegacyFilterFullNode bool
enableFilterLightNode bool
enableFilterFullNode bool
legacyFilterOpts []legacy_filter.Option
filterOpts []filter.Option
pubsubOpts []pubsub.Option
enableRelay bool
enableFilterLightNode bool
enableFilterFullNode bool
filterOpts []filter.Option
pubsubOpts []pubsub.Option
minRelayPeersToPublish int
maxMsgSizeBytes int
@@ -413,17 +409,6 @@ func WithPeerExchange() WakuNodeOption {
}
}
// WithLegacyWakuFilter enables the legacy Waku Filter protocol. This WakuNodeOption
// accepts a list of legacy WakuFilter options used to set up the protocol
func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableLegacyFilter = true
params.isLegacyFilterFullNode = fullnode
params.legacyFilterOpts = filterOpts
return nil
}
}
// WithWakuFilterLightNode enables the Waku Filter V2 protocol for light node functionality
func WithWakuFilterLightNode() WakuNodeOption {
return func(params *WakuNodeParameters) error {
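Call sites migrate from the removed option to the v2 ones, as the test and mobile changes in this commit show. A condensed before/after, where timeout stands for whatever duration a call site already used:

// Before (removed in this commit):
//	wakuNode, err := node.New(node.WithLegacyWakuFilter(true, legacy_filter.WithTimeout(timeout)))  // full node
//	wakuNode, err := node.New(node.WithLegacyWakuFilter(false))                                     // light client
// After:
//	wakuNode, err := node.New(node.WithWakuFilterFullNode(filter.WithTimeout(timeout)))
//	wakuNode, err := node.New(node.WithWakuFilterLightNode())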

View File

@@ -54,7 +54,6 @@ func TestWakuOptions(t *testing.T) {
WithPrivateKey(prvKey),
WithLibP2POptions(),
WithWakuRelay(),
WithLegacyWakuFilter(true),
WithDiscoveryV5(123, nil, false),
WithWakuStore(),
WithMessageProvider(&persistence.DBStore{}),
@@ -104,7 +103,6 @@ func TestWakuRLNOptions(t *testing.T) {
WithPrivateKey(prvKey),
WithLibP2POptions(),
WithWakuRelay(),
WithLegacyWakuFilter(true),
WithDiscoveryV5(123, nil, false),
WithWakuStore(),
WithMessageProvider(&persistence.DBStore{}),
@@ -145,7 +143,6 @@ func TestWakuRLNOptions(t *testing.T) {
WithPrivateKey(prvKey),
WithLibP2POptions(),
WithWakuRelay(),
WithLegacyWakuFilter(true),
WithDiscoveryV5(123, nil, false),
WithWakuStore(),
WithMessageProvider(&persistence.DBStore{}),

View File

@@ -1,115 +0,0 @@
package legacy_filter
import (
"sync"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
type FilterMap struct {
sync.RWMutex
timesource timesource.Timesource
items map[string]Filter
broadcaster relay.Broadcaster
}
type FilterMapItem struct {
Key string
Value Filter
}
func NewFilterMap(broadcaster relay.Broadcaster, timesource timesource.Timesource) *FilterMap {
return &FilterMap{
timesource: timesource,
items: make(map[string]Filter),
broadcaster: broadcaster,
}
}
func (fm *FilterMap) Set(key string, value Filter) {
fm.Lock()
defer fm.Unlock()
fm.items[key] = value
}
func (fm *FilterMap) Get(key string) (Filter, bool) {
fm.Lock()
defer fm.Unlock()
value, ok := fm.items[key]
return value, ok
}
func (fm *FilterMap) Delete(key string) {
fm.Lock()
defer fm.Unlock()
_, ok := fm.items[key]
if !ok {
return
}
close(fm.items[key].Chan)
delete(fm.items, key)
}
func (fm *FilterMap) RemoveAll() {
fm.Lock()
defer fm.Unlock()
for k, v := range fm.items {
close(v.Chan)
delete(fm.items, k)
}
}
func (fm *FilterMap) Items() <-chan FilterMapItem {
c := make(chan FilterMapItem)
f := func() {
fm.RLock()
defer fm.RUnlock()
for k, v := range fm.items {
c <- FilterMapItem{k, v}
}
close(c)
}
go f()
return c
}
// Notify is used to push a received message from a filter subscription to
// any content filter registered on this node and to the broadcast subscribers
func (fm *FilterMap) Notify(msg *pb.WakuMessage, requestID string) {
fm.RLock()
defer fm.RUnlock()
filter, ok := fm.items[requestID]
if !ok {
// We do this because the key for the filter is set to the requestID received from the filter protocol.
// This means we do not need to check the content filter explicitly, as every MessagePush already contains
// the requestID of the corresponding filter.
return
}
envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic)
// Broadcasting message so it's stored
fm.broadcaster.Submit(envelope)
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}
}

View File

@@ -1,37 +0,0 @@
package legacy_filter
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
func TestFilterMap(t *testing.T) {
b := relay.NewBroadcaster(100)
require.NoError(t, b.Start(context.Background()))
fmap := NewFilterMap(b, timesource.NewDefaultClock())
filter := Filter{
PeerID: "id",
Topic: "test",
ContentFilters: []string{"test"},
Chan: make(chan *protocol.Envelope),
}
fmap.Set("test", filter)
res := <-fmap.Items()
require.Equal(t, "test", res.Key)
item, ok := fmap.Get("test")
require.True(t, ok)
require.Equal(t, "test", item.Topic)
fmap.Delete("test")
_, ok = fmap.Get("test")
require.False(t, ok)
}

View File

@@ -1,167 +0,0 @@
package legacy_filter
import (
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
)
type Subscriber struct {
peer peer.ID
requestID string
filter *pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN?
}
func (sub Subscriber) HasContentTopic(topic string) bool {
if len(sub.filter.ContentFilters) == 0 {
return true // When the subscriber has no specific ContentTopic filter
}
for _, filter := range sub.filter.ContentFilters {
if filter.ContentTopic == topic {
return true
}
}
return false
}
type Subscribers struct {
sync.RWMutex
subscribers []Subscriber
timeout time.Duration
failedPeers map[peer.ID]time.Time
}
func NewSubscribers(timeout time.Duration) *Subscribers {
return &Subscribers{
timeout: timeout,
failedPeers: make(map[peer.ID]time.Time),
}
}
func (sub *Subscribers) Clear() {
sub.Lock()
defer sub.Unlock()
sub.subscribers = nil
sub.failedPeers = make(map[peer.ID]time.Time)
}
func (sub *Subscribers) Append(s Subscriber) int {
sub.Lock()
defer sub.Unlock()
sub.subscribers = append(sub.subscribers, s)
return len(sub.subscribers)
}
func (sub *Subscribers) Items(contentTopic *string) <-chan Subscriber {
c := make(chan Subscriber)
f := func() {
sub.RLock()
defer sub.RUnlock()
for _, s := range sub.subscribers {
if contentTopic == nil || s.HasContentTopic(*contentTopic) {
c <- s
}
}
close(c)
}
go f()
return c
}
func (sub *Subscribers) Length() int {
sub.RLock()
defer sub.RUnlock()
return len(sub.subscribers)
}
func (sub *Subscribers) IsFailedPeer(peerID peer.ID) bool {
sub.RLock()
defer sub.RUnlock()
_, ok := sub.failedPeers[peerID]
return ok
}
func (sub *Subscribers) FlagAsSuccess(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()
_, ok := sub.failedPeers[peerID]
if ok {
delete(sub.failedPeers, peerID)
}
}
func (sub *Subscribers) FlagAsFailure(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()
lastFailure, ok := sub.failedPeers[peerID]
if ok {
elapsedTime := time.Since(lastFailure)
if elapsedTime > sub.timeout {
var tmpSubs []Subscriber
for _, s := range sub.subscribers {
if s.peer != peerID {
tmpSubs = append(tmpSubs, s)
}
}
sub.subscribers = tmpSubs
delete(sub.failedPeers, peerID)
}
} else {
sub.failedPeers[peerID] = time.Now()
}
}
// RemoveContentFilters removes a set of content filters registered for a specific peer
func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, requestID string, contentFilters []*pb.FilterRequest_ContentFilter) {
sub.Lock()
defer sub.Unlock()
var peerIdsToRemove []peer.ID
for subIndex, subscriber := range sub.subscribers {
if subscriber.peer != peerID || subscriber.requestID != requestID {
continue
}
// make sure we delete the content filter
// if no more topics are left
for _, contentFilter := range contentFilters {
subCfs := subscriber.filter.ContentFilters
for i, cf := range subCfs {
if cf.ContentTopic == contentFilter.ContentTopic {
l := len(subCfs) - 1
subCfs[i] = subCfs[l]
subscriber.filter.ContentFilters = subCfs[:l]
}
}
sub.subscribers[subIndex] = subscriber
}
if len(subscriber.filter.ContentFilters) == 0 {
peerIdsToRemove = append(peerIdsToRemove, subscriber.peer)
}
}
// make sure we delete the subscriber
// if no more content filters left
for _, peerID := range peerIdsToRemove {
for i, s := range sub.subscribers {
if s.peer == peerID && s.requestID == requestID {
l := len(sub.subscribers) - 1
sub.subscribers[i] = sub.subscribers[l]
sub.subscribers = sub.subscribers[:l]
}
}
}
}

View File

@@ -1,145 +0,0 @@
package legacy_filter
import (
"testing"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
"github.com/stretchr/testify/assert"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
)
const TOPIC = "/test/topic"
func createPeerID(t *testing.T) peer.ID {
peerID, err := test.RandPeerID()
assert.NoError(t, err)
return peerID
}
func firstSubscriber(subs *Subscribers, contentTopic string) *Subscriber {
for sub := range subs.Items(&contentTopic) {
return &sub
}
return nil
}
func TestAppend(t *testing.T) {
subs := NewSubscribers(10 * time.Second)
peerID := createPeerID(t)
requestID := "request_1"
contentTopic := "topic1"
request := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
}
subs.Append(Subscriber{peerID, requestID, request})
sub := firstSubscriber(subs, contentTopic)
assert.NotNil(t, sub)
}
func TestRemove(t *testing.T) {
subs := NewSubscribers(10 * time.Second)
peerId := createPeerID(t)
requestID := "request_1"
contentTopic := "topic1"
request := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
}
subs.Append(Subscriber{peerId, requestID, request})
subs.RemoveContentFilters(peerId, requestID, request.ContentFilters)
sub := firstSubscriber(subs, contentTopic)
assert.Nil(t, sub)
}
func TestRemovePartial(t *testing.T) {
subs := NewSubscribers(10 * time.Second)
peerId := createPeerID(t)
requestID := "request_1"
topic1 := "topic1"
topic2 := "topic2"
request := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic1}, {ContentTopic: topic2}},
}
subs.Append(Subscriber{peerId, requestID, request})
subs.RemoveContentFilters(peerId, requestID, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic1}})
sub := firstSubscriber(subs, topic2)
assert.NotNil(t, sub)
assert.Len(t, sub.filter.ContentFilters, 1)
}
func TestRemoveDuplicateSubscriptions(t *testing.T) {
subs := NewSubscribers(10 * time.Second)
peerId := createPeerID(t)
topic := "topic"
requestID1 := "request_1"
requestID2 := "request_2"
request1 := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}},
}
request2 := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}},
}
subs.Append(Subscriber{peerId, requestID1, request1})
subs.Append(Subscriber{peerId, requestID2, request2})
subs.RemoveContentFilters(peerId, requestID2, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}})
subs.RemoveContentFilters(peerId, requestID1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}})
sub := firstSubscriber(subs, topic)
assert.Nil(t, sub)
}
func TestRemoveDuplicateSubscriptionsPartial(t *testing.T) {
subs := NewSubscribers(10 * time.Second)
peerId := createPeerID(t)
topic := "topic"
requestID1 := "request_1"
requestID2 := "request_2"
request1 := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}},
}
request2 := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}},
}
subs.Append(Subscriber{peerId, requestID1, request1})
subs.Append(Subscriber{peerId, requestID2, request2})
subs.RemoveContentFilters(peerId, requestID1, []*pb.FilterRequest_ContentFilter{{ContentTopic: topic}})
sub := firstSubscriber(subs, topic)
assert.NotNil(t, sub)
assert.Equal(t, sub.requestID, requestID2)
}
func TestRemoveBogus(t *testing.T) {
subs := NewSubscribers(10 * time.Second)
peerId := createPeerID(t)
requestID := "request_1"
contentTopic := "topic1"
request := &pb.FilterRequest{
Subscribe: true,
Topic: TOPIC,
ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: contentTopic}},
}
subs.Append(Subscriber{peerId, requestID, request})
subs.RemoveContentFilters(peerId, requestID, []*pb.FilterRequest_ContentFilter{{ContentTopic: "does not exist"}, {ContentTopic: contentTopic}})
sub := firstSubscriber(subs, contentTopic)
assert.Nil(t, sub)
}

View File

@@ -1,75 +0,0 @@
package legacy_filter
import (
"github.com/libp2p/go-libp2p/p2p/metricshelper"
"github.com/prometheus/client_golang/prometheus"
)
var filterMessages = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "legacy_filter_messages",
Help: "The number of messages received via legacy filter protocol",
})
var filterErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "legacy_filter_errors",
Help: "The distribution of the legacy filter protocol errors",
},
[]string{"error_type"},
)
var filterSubscribers = prometheus.NewGauge(
prometheus.GaugeOpts{
Name: "legacy_filter_subscriptions",
Help: "The number of legacy filter subscribers",
})
var collectors = []prometheus.Collector{
filterMessages,
filterErrors,
filterSubscribers,
}
// Metrics exposes the functions required to update prometheus metrics for legacy filter protocol
type Metrics interface {
RecordMessages(num int)
RecordSubscribers(num int)
RecordError(err metricsErrCategory)
}
type metricsImpl struct {
reg prometheus.Registerer
}
func newMetrics(reg prometheus.Registerer) Metrics {
metricshelper.RegisterCollectors(reg, collectors...)
return &metricsImpl{
reg: reg,
}
}
// RecordMessages is used to increase the counter for the number of messages received via the legacy filter protocol
func (m *metricsImpl) RecordMessages(num int) {
filterMessages.Add(float64(num))
}
type metricsErrCategory string
var (
decodeRPCFailure metricsErrCategory = "decode_rpc_failure"
dialFailure metricsErrCategory = "dial_failure"
pushWriteError metricsErrCategory = "push_write_error"
peerNotFoundFailure metricsErrCategory = "peer_not_found_failure"
writeRequestFailure metricsErrCategory = "write_request_failure"
)
// RecordError increases the counter for different error types
func (m *metricsImpl) RecordError(err metricsErrCategory) {
filterErrors.WithLabelValues(string(err)).Inc()
}
// RecordSubscribers tracks the current number of filter subscribers
func (m *metricsImpl) RecordSubscribers(num int) {
filterSubscribers.Set(float64(num))
}
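A small sketch, within the now-removed legacy_filter package, of how these metrics were meant to be wired: newMetrics registers the collectors above on a Prometheus registry, and the recorder methods update them (the values below are illustrative):

package legacy_filter

import "github.com/prometheus/client_golang/prometheus"

func exampleMetricsUsage() {
	// newMetrics registers the collectors defined above on the given registry.
	m := newMetrics(prometheus.NewRegistry())
	m.RecordMessages(1)
	m.RecordSubscribers(3)
	m.RecordError(decodeRPCFailure)
}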

View File

@@ -1,5 +0,0 @@
package pb
//go:generate mv ./../../waku-proto/waku/filter/v2beta1/filter.proto ./../../waku-proto/waku/filter/v2beta1/legacy_filter.proto
//go:generate protoc -I./../../waku-proto/waku/filter/v2beta1/. -I./../../waku-proto/ --go_opt=paths=source_relative --go_opt=Mlegacy_filter.proto=github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb --go_opt=Mwaku/message/v1/message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./../../waku-proto/waku/filter/v2beta1/legacy_filter.proto
//go:generate mv ./../../waku-proto/waku/filter/v2beta1/legacy_filter.proto ./../../waku-proto/waku/filter/v2beta1/filter.proto

View File

@@ -1,394 +0,0 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.31.0
// protoc v4.24.4
// source: legacy_filter.proto
// 12/WAKU2-FILTER rfc: https://rfc.vac.dev/spec/12/
// Protocol identifier: /vac/waku/filter/2.0.0-beta1
package pb
import (
pb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type FilterRequest struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Subscribe bool `protobuf:"varint,1,opt,name=subscribe,proto3" json:"subscribe,omitempty"`
Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"`
ContentFilters []*FilterRequest_ContentFilter `protobuf:"bytes,3,rep,name=content_filters,json=contentFilters,proto3" json:"content_filters,omitempty"`
}
func (x *FilterRequest) Reset() {
*x = FilterRequest{}
if protoimpl.UnsafeEnabled {
mi := &file_legacy_filter_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FilterRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FilterRequest) ProtoMessage() {}
func (x *FilterRequest) ProtoReflect() protoreflect.Message {
mi := &file_legacy_filter_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FilterRequest.ProtoReflect.Descriptor instead.
func (*FilterRequest) Descriptor() ([]byte, []int) {
return file_legacy_filter_proto_rawDescGZIP(), []int{0}
}
func (x *FilterRequest) GetSubscribe() bool {
if x != nil {
return x.Subscribe
}
return false
}
func (x *FilterRequest) GetTopic() string {
if x != nil {
return x.Topic
}
return ""
}
func (x *FilterRequest) GetContentFilters() []*FilterRequest_ContentFilter {
if x != nil {
return x.ContentFilters
}
return nil
}
type MessagePush struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Messages []*pb.WakuMessage `protobuf:"bytes,1,rep,name=messages,proto3" json:"messages,omitempty"`
}
func (x *MessagePush) Reset() {
*x = MessagePush{}
if protoimpl.UnsafeEnabled {
mi := &file_legacy_filter_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *MessagePush) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*MessagePush) ProtoMessage() {}
func (x *MessagePush) ProtoReflect() protoreflect.Message {
mi := &file_legacy_filter_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use MessagePush.ProtoReflect.Descriptor instead.
func (*MessagePush) Descriptor() ([]byte, []int) {
return file_legacy_filter_proto_rawDescGZIP(), []int{1}
}
func (x *MessagePush) GetMessages() []*pb.WakuMessage {
if x != nil {
return x.Messages
}
return nil
}
type FilterRpc struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
RequestId string `protobuf:"bytes,1,opt,name=request_id,json=requestId,proto3" json:"request_id,omitempty"`
Request *FilterRequest `protobuf:"bytes,2,opt,name=request,proto3,oneof" json:"request,omitempty"`
Push *MessagePush `protobuf:"bytes,3,opt,name=push,proto3,oneof" json:"push,omitempty"`
}
func (x *FilterRpc) Reset() {
*x = FilterRpc{}
if protoimpl.UnsafeEnabled {
mi := &file_legacy_filter_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FilterRpc) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FilterRpc) ProtoMessage() {}
func (x *FilterRpc) ProtoReflect() protoreflect.Message {
mi := &file_legacy_filter_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FilterRpc.ProtoReflect.Descriptor instead.
func (*FilterRpc) Descriptor() ([]byte, []int) {
return file_legacy_filter_proto_rawDescGZIP(), []int{2}
}
func (x *FilterRpc) GetRequestId() string {
if x != nil {
return x.RequestId
}
return ""
}
func (x *FilterRpc) GetRequest() *FilterRequest {
if x != nil {
return x.Request
}
return nil
}
func (x *FilterRpc) GetPush() *MessagePush {
if x != nil {
return x.Push
}
return nil
}
type FilterRequest_ContentFilter struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
ContentTopic string `protobuf:"bytes,1,opt,name=content_topic,json=contentTopic,proto3" json:"content_topic,omitempty"`
}
func (x *FilterRequest_ContentFilter) Reset() {
*x = FilterRequest_ContentFilter{}
if protoimpl.UnsafeEnabled {
mi := &file_legacy_filter_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *FilterRequest_ContentFilter) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*FilterRequest_ContentFilter) ProtoMessage() {}
func (x *FilterRequest_ContentFilter) ProtoReflect() protoreflect.Message {
mi := &file_legacy_filter_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use FilterRequest_ContentFilter.ProtoReflect.Descriptor instead.
func (*FilterRequest_ContentFilter) Descriptor() ([]byte, []int) {
return file_legacy_filter_proto_rawDescGZIP(), []int{0, 0}
}
func (x *FilterRequest_ContentFilter) GetContentTopic() string {
if x != nil {
return x.ContentTopic
}
return ""
}
var File_legacy_filter_proto protoreflect.FileDescriptor
var file_legacy_filter_proto_rawDesc = []byte{
0x0a, 0x13, 0x6c, 0x65, 0x67, 0x61, 0x63, 0x79, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x13, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x66, 0x69, 0x6c, 0x74,
0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x1a, 0x1d, 0x77, 0x61, 0x6b, 0x75,
0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x65, 0x73, 0x73,
0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd4, 0x01, 0x0a, 0x0d, 0x46, 0x69,
0x6c, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x73,
0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x09,
0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70,
0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12,
0x59, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, 0x66, 0x69, 0x6c, 0x74, 0x65,
0x72, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x30, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e,
0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x46,
0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x2e, 0x43, 0x6f, 0x6e,
0x74, 0x65, 0x6e, 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x0e, 0x63, 0x6f, 0x6e, 0x74,
0x65, 0x6e, 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x73, 0x1a, 0x34, 0x0a, 0x0d, 0x43, 0x6f,
0x6e, 0x74, 0x65, 0x6e, 0x74, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x23, 0x0a, 0x0d, 0x63,
0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x0c, 0x63, 0x6f, 0x6e, 0x74, 0x65, 0x6e, 0x74, 0x54, 0x6f, 0x70, 0x69, 0x63,
0x22, 0x47, 0x0a, 0x0b, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x12,
0x38, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28,
0x0b, 0x32, 0x1c, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x2e, 0x76, 0x31, 0x2e, 0x57, 0x61, 0x6b, 0x75, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x52,
0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x22, 0xbd, 0x01, 0x0a, 0x09, 0x46, 0x69,
0x6c, 0x74, 0x65, 0x72, 0x52, 0x70, 0x63, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71,
0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x41, 0x0a, 0x07, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x66,
0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x46, 0x69,
0x6c, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x48, 0x00, 0x52, 0x07, 0x72,
0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x88, 0x01, 0x01, 0x12, 0x39, 0x0a, 0x04, 0x70, 0x75, 0x73,
0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x20, 0x2e, 0x77, 0x61, 0x6b, 0x75, 0x2e, 0x66,
0x69, 0x6c, 0x74, 0x65, 0x72, 0x2e, 0x76, 0x32, 0x62, 0x65, 0x74, 0x61, 0x31, 0x2e, 0x4d, 0x65,
0x73, 0x73, 0x61, 0x67, 0x65, 0x50, 0x75, 0x73, 0x68, 0x48, 0x01, 0x52, 0x04, 0x70, 0x75, 0x73,
0x68, 0x88, 0x01, 0x01, 0x42, 0x0a, 0x0a, 0x08, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74,
0x42, 0x07, 0x0a, 0x05, 0x5f, 0x70, 0x75, 0x73, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x33,
}
var (
file_legacy_filter_proto_rawDescOnce sync.Once
file_legacy_filter_proto_rawDescData = file_legacy_filter_proto_rawDesc
)
func file_legacy_filter_proto_rawDescGZIP() []byte {
file_legacy_filter_proto_rawDescOnce.Do(func() {
file_legacy_filter_proto_rawDescData = protoimpl.X.CompressGZIP(file_legacy_filter_proto_rawDescData)
})
return file_legacy_filter_proto_rawDescData
}
var file_legacy_filter_proto_msgTypes = make([]protoimpl.MessageInfo, 4)
var file_legacy_filter_proto_goTypes = []interface{}{
(*FilterRequest)(nil), // 0: waku.filter.v2beta1.FilterRequest
(*MessagePush)(nil), // 1: waku.filter.v2beta1.MessagePush
(*FilterRpc)(nil), // 2: waku.filter.v2beta1.FilterRpc
(*FilterRequest_ContentFilter)(nil), // 3: waku.filter.v2beta1.FilterRequest.ContentFilter
(*pb.WakuMessage)(nil), // 4: waku.message.v1.WakuMessage
}
var file_legacy_filter_proto_depIdxs = []int32{
3, // 0: waku.filter.v2beta1.FilterRequest.content_filters:type_name -> waku.filter.v2beta1.FilterRequest.ContentFilter
4, // 1: waku.filter.v2beta1.MessagePush.messages:type_name -> waku.message.v1.WakuMessage
0, // 2: waku.filter.v2beta1.FilterRpc.request:type_name -> waku.filter.v2beta1.FilterRequest
1, // 3: waku.filter.v2beta1.FilterRpc.push:type_name -> waku.filter.v2beta1.MessagePush
4, // [4:4] is the sub-list for method output_type
4, // [4:4] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
}
func init() { file_legacy_filter_proto_init() }
func file_legacy_filter_proto_init() {
if File_legacy_filter_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_legacy_filter_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FilterRequest); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_legacy_filter_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*MessagePush); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_legacy_filter_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FilterRpc); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_legacy_filter_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FilterRequest_ContentFilter); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_legacy_filter_proto_msgTypes[2].OneofWrappers = []interface{}{}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_legacy_filter_proto_rawDesc,
NumEnums: 0,
NumMessages: 4,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_legacy_filter_proto_goTypes,
DependencyIndexes: file_legacy_filter_proto_depIdxs,
MessageInfos: file_legacy_filter_proto_msgTypes,
}.Build()
File_legacy_filter_proto = out.File
file_legacy_filter_proto_rawDesc = nil
file_legacy_filter_proto_goTypes = nil
file_legacy_filter_proto_depIdxs = nil
}

View File

@@ -1,471 +0,0 @@
package legacy_filter
import (
"context"
"encoding/hex"
"errors"
"math"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/pbio"
"github.com/prometheus/client_golang/prometheus"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/service"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
)
type (
Filter struct {
filterID string
PeerID peer.ID
Topic string
ContentFilters []string
Chan chan *protocol.Envelope
}
ContentFilter struct {
Topic string
ContentTopics []string
}
FilterSubscription struct {
RequestID string
Peer peer.ID
}
WakuFilter struct {
*service.CommonService
h host.Host
pm *peermanager.PeerManager
isFullNode bool
msgSub *relay.Subscription
metrics Metrics
log *zap.Logger
filters *FilterMap
subscribers *Subscribers
}
)
// FilterID_v20beta1 is the current Waku Filter protocol identifier
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
// NewWakuFilter returns a new instance of the WakuFilter struct, set up according to the chosen parameters and options
func NewWakuFilter(broadcaster relay.Broadcaster, isFullNode bool, timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...Option) *WakuFilter {
wf := new(WakuFilter)
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
params := new(FilterParameters)
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
wf.isFullNode = isFullNode
wf.CommonService = service.NewCommonService()
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.Timeout)
wf.metrics = newMetrics(reg)
return wf
}
// Sets the host to be able to mount or consume a protocol
func (wf *WakuFilter) SetHost(h host.Host) {
wf.h = h
}
func (wf *WakuFilter) Start(ctx context.Context, sub *relay.Subscription) error {
return wf.CommonService.Start(ctx, func() error {
return wf.start(sub)
})
}
func (wf *WakuFilter) start(sub *relay.Subscription) error {
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest(wf.Context()))
wf.msgSub = sub
wf.WaitGroup().Add(1)
go wf.filterListener(wf.Context())
wf.log.Info("filter protocol started")
return nil
}
func (wf *WakuFilter) onRequest(ctx context.Context) func(network.Stream) {
return func(stream network.Stream) {
peerID := stream.Conn().RemotePeer()
logger := wf.log.With(logging.HostID("peer", peerID))
filterRPCRequest := &pb.FilterRpc{}
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
err := reader.ReadMsg(filterRPCRequest)
if err != nil {
wf.metrics.RecordError(decodeRPCFailure)
logger.Error("reading request", zap.Error(err))
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}
logger.Info("received request")
if filterRPCRequest.Push != nil && len(filterRPCRequest.Push.Messages) > 0 {
// We're on a light node.
// This is a message push coming from a full node.
for _, message := range filterRPCRequest.Push.Messages {
wf.filters.Notify(message, filterRPCRequest.RequestId) // Trigger filter handlers on a light node
}
logger.Info("received a message push", zap.Int("messages", len(filterRPCRequest.Push.Messages)))
wf.metrics.RecordMessages(len(filterRPCRequest.Push.Messages))
} else if filterRPCRequest.Request != nil && wf.isFullNode {
// We're on a full node.
// This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe {
subscriber := Subscriber{peer: stream.Conn().RemotePeer(), requestID: filterRPCRequest.RequestId, filter: filterRPCRequest.Request}
if subscriber.filter.Topic == "" { // @TODO: review if empty topic is possible
subscriber.filter.Topic = relay.DefaultWakuTopic
}
subscribersLen := wf.subscribers.Append(subscriber)
logger.Info("adding subscriber")
wf.metrics.RecordSubscribers(subscribersLen)
} else {
wf.subscribers.RemoveContentFilters(peerID, filterRPCRequest.RequestId, filterRPCRequest.Request.ContentFilters)
logger.Info("removing subscriber")
wf.metrics.RecordSubscribers(wf.subscribers.Length())
}
} else {
logger.Error("can't serve request")
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}
stream.Close()
}
}
func (wf *WakuFilter) pushMessage(ctx context.Context, subscriber Subscriber, msg *wpb.WakuMessage) error {
pushRPC := &pb.FilterRpc{RequestId: subscriber.requestID, Push: &pb.MessagePush{Messages: []*wpb.WakuMessage{msg}}}
logger := wf.log.With(logging.HostID("peer", subscriber.peer))
stream, err := wf.h.NewStream(ctx, subscriber.peer, FilterID_v20beta1)
if err != nil {
wf.subscribers.FlagAsFailure(subscriber.peer)
logger.Error("opening peer stream", zap.Error(err))
wf.metrics.RecordError(dialFailure)
return err
}
writer := pbio.NewDelimitedWriter(stream)
err = writer.WriteMsg(pushRPC)
if err != nil {
logger.Error("pushing messages to peer", zap.Error(err))
wf.subscribers.FlagAsFailure(subscriber.peer)
wf.metrics.RecordError(pushWriteError)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return nil
}
stream.Close()
wf.subscribers.FlagAsSuccess(subscriber.peer)
return nil
}
func (wf *WakuFilter) filterListener(ctx context.Context) {
defer wf.WaitGroup().Done()
// This function is invoked for each message received
// on the full node in context of Waku2-Filter
handle := func(envelope *protocol.Envelope) error { // async
msg := envelope.Message()
pubsubTopic := envelope.PubsubTopic()
logger := wf.log.With(zap.Stringer("message", msg))
g := new(errgroup.Group)
// Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node
for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) {
logger := logger.With(logging.HostID("subscriber", subscriber.peer))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
if subscriber.filter.Topic != pubsubTopic {
logger.Info("pubsub topic mismatch",
zap.String("subscriberTopic", subscriber.filter.Topic),
zap.String("messageTopic", pubsubTopic))
continue
}
// Do a message push to light node
logger.Info("pushing message to light node", zap.String("contentTopic", msg.ContentTopic))
g.Go(func() (err error) {
err = wf.pushMessage(ctx, subscriber, msg)
if err != nil {
logger.Error("pushing message", zap.Error(err))
}
return err
})
}
return g.Wait()
}
for m := range wf.msgSub.Ch {
if err := handle(m); err != nil {
wf.log.Error("handling message", zap.Error(err))
}
}
}
// Having a FilterRequest struct,
// select a peer with filter support, dial it,
// and submit FilterRequest wrapped in FilterRPC
func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) {
params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
optList := DefaultSubscribtionOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if wf.pm != nil && params.selectedPeer == "" {
selectedPeers, _ := wf.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: FilterID_v20beta1,
PubsubTopics: []string{filter.Topic},
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
)
if len(selectedPeers) > 0 {
params.selectedPeer = selectedPeers[0]
}
}
if params.selectedPeer == "" {
wf.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}
var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range filter.ContentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
request := &pb.FilterRequest{
Subscribe: true,
Topic: filter.Topic,
ContentFilters: contentFilters,
}
stream, err := wf.h.NewStream(ctx, params.selectedPeer, FilterID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
return
}
// This is the only successful path to subscription
requestID := hex.EncodeToString(protocol.GenerateRequestID())
writer := pbio.NewDelimitedWriter(stream)
filterRPC := &pb.FilterRpc{RequestId: requestID, Request: request}
wf.log.Debug("sending filterRPC", zap.Stringer("rpc", filterRPC))
err = writer.WriteMsg(filterRPC)
if err != nil {
wf.metrics.RecordError(writeRequestFailure)
wf.log.Error("sending filterRPC", zap.Error(err))
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return
}
stream.Close()
subscription = new(FilterSubscription)
subscription.Peer = params.selectedPeer
subscription.RequestID = requestID
return
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error {
stream, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
return err
}
// Generate a request ID for the unsubscribe request
id := protocol.GenerateRequestID()
var contentFilters []*pb.FilterRequest_ContentFilter
for _, ct := range contentFilter.ContentTopics {
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
}
request := &pb.FilterRequest{
Subscribe: false,
Topic: contentFilter.Topic,
ContentFilters: contentFilters,
}
writer := pbio.NewDelimitedWriter(stream)
filterRPC := &pb.FilterRpc{RequestId: hex.EncodeToString(id), Request: request}
err = writer.WriteMsg(filterRPC)
if err != nil {
wf.metrics.RecordError(writeRequestFailure)
if err := stream.Reset(); err != nil {
wf.log.Error("resetting connection", zap.Error(err))
}
return err
}
stream.Close()
return nil
}
// Stop unmounts the filter protocol
func (wf *WakuFilter) Stop() {
wf.CommonService.Stop(func() {
wf.msgSub.Unsubscribe()
wf.h.RemoveStreamHandler(FilterID_v20beta1)
wf.filters.RemoveAll()
wf.subscribers.Clear()
})
}
// Subscribe sets up a subscription to receive messages that match a specific content filter
func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) {
// TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs
remoteSubs, err := wf.requestSubscription(ctx, f, opts...)
if err != nil || remoteSubs.RequestID == "" {
// Failed to subscribe
wf.log.Error("requesting subscription", zap.Error(err))
return
}
// Register a handler for the filter now that the remote subscription succeeded
filterID = remoteSubs.RequestID
theFilter = Filter{
filterID: filterID,
PeerID: remoteSubs.Peer,
Topic: f.Topic,
ContentFilters: f.ContentTopics,
Chan: make(chan *protocol.Envelope, 1024), // To avoid blocking
}
wf.filters.Set(filterID, theFilter)
return
}
// UnsubscribeByFilter removes a subscription to a filter node completely
// using a filter. It also closes the filter channel
func (wf *WakuFilter) UnsubscribeByFilter(ctx context.Context, filter Filter) error {
err := wf.UnsubscribeFilterByID(ctx, filter.filterID)
if err != nil {
close(filter.Chan)
}
return err
}
// UnsubscribeFilterByID removes a subscription to a filter node completely
// using the filterID returned when the subscription was created
func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string) error {
var f Filter
var ok bool
if f, ok = wf.filters.Get(filterID); !ok {
return errors.New("filter not found")
}
cf := ContentFilter{
Topic: f.Topic,
ContentTopics: f.ContentFilters,
}
err := wf.Unsubscribe(ctx, cf, f.PeerID)
if err != nil {
return err
}
wf.filters.Delete(filterID)
return nil
}
// UnsubscribeFilter removes content topics from a filter subscription. If all
// the contentTopics are removed the subscription is dropped completely
func (wf *WakuFilter) UnsubscribeFilter(ctx context.Context, cf ContentFilter) error {
// Remove local filter
idsToRemove := make(map[string]struct{})
for filterMapItem := range wf.filters.Items() {
f := filterMapItem.Value
id := filterMapItem.Key
if f.Topic != cf.Topic {
continue
}
// Send message to full node in order to unsubscribe
err := wf.Unsubscribe(ctx, cf, f.PeerID)
if err != nil {
return err
}
// Remove the matching content topics from this filter entry using swap-and-truncate,
// keeping track of filters that are left with no content topics so they can be dropped.
for _, cfToDelete := range cf.ContentTopics {
for i, ct := range f.ContentFilters {
if ct == cfToDelete {
l := len(f.ContentFilters) - 1
f.ContentFilters[l], f.ContentFilters[i] = f.ContentFilters[i], f.ContentFilters[l]
f.ContentFilters = f.ContentFilters[:l]
break
}
}
if len(f.ContentFilters) == 0 {
idsToRemove[id] = struct{}{}
}
}
}
for rID := range idsToRemove {
wf.filters.Delete(rID)
}
return nil
}
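// Editorial sketch, not part of the original file: UnsubscribeFilter above removes a content
// topic with the "swap with the last element and truncate" idiom, which deletes an entry
// without shifting the rest of the slice. The same idiom in isolation, with a hypothetical name:
func removeTopic(topics []string, target string) []string {
    for i, t := range topics {
        if t == target {
            last := len(topics) - 1
            topics[i], topics[last] = topics[last], topics[i] // move the match to the end
            return topics[:last]                              // drop it
        }
    }
    return topics // target not present: slice returned unchanged
}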

View File

@@ -1,79 +0,0 @@
package legacy_filter
import (
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
)
type (
FilterSubscribeParameters struct {
host host.Host
selectedPeer peer.ID
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
log *zap.Logger
}
FilterSubscribeOption func(*FilterSubscribeParameters)
FilterParameters struct {
Timeout time.Duration
pm *peermanager.PeerManager
}
Option func(*FilterParameters)
)
func WithTimeout(timeout time.Duration) Option {
return func(params *FilterParameters) {
params.Timeout = timeout
}
}
func WithPeerManager(pm *peermanager.PeerManager) Option {
return func(params *FilterParameters) {
params.pm = pm
}
}
func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
// supports the chosen protocol, otherwise it will choose a peer from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will choose a
// peer from the node peerstore
func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.peerSelectionType = peermanager.LowestRTT
params.preferredPeers = fromThesePeers
}
}
func DefaultOptions() []Option {
return []Option{
WithTimeout(24 * time.Hour),
}
}
func DefaultSubscriptionOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticPeerSelection(),
}
}
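// Editorial sketch, not part of the original file: FilterParameters and Option above follow
// the functional-options pattern. Roughly how a constructor could apply them; the actual
// NewWakuFilter is not shown in this hunk, so the function below is illustrative only.
func applyFilterOptions(opts ...Option) *FilterParameters {
    params := new(FilterParameters)
    // Start from the defaults, then let caller-supplied options override them.
    for _, opt := range DefaultOptions() {
        opt(params)
    }
    for _, opt := range opts {
        opt(params)
    }
    return params
}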

View File

@@ -1,36 +0,0 @@
package legacy_filter
import (
"context"
"crypto/rand"
"testing"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestFilterOption(t *testing.T) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
options := []FilterSubscribeOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(),
}
params := new(FilterSubscribeParameters)
params.host = host
params.log = utils.Logger()
for _, opt := range options {
opt(params)
}
require.Equal(t, host, params.host)
require.NotNil(t, params.selectedPeer)
}

View File

@@ -1,241 +0,0 @@
package legacy_filter
import (
"context"
"crypto/rand"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
var testTopic = "/waku/2/go/filter/test"
var testContentTopic = "TopicA"
func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
relay.SetHost(host)
err = relay.Start(context.Background())
require.NoError(t, err)
sub, err := relay.Subscribe(context.Background(), protocol.NewContentFilter(topic))
require.NoError(t, err)
return relay, sub[0], host
}
func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
b := relay.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
filter := NewWakuFilter(b, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
filter.SetHost(host)
sub := relay.NewSubscription(protocol.NewContentFilter(testTopic, testContentTopic))
err = filter.Start(context.Background(), sub)
require.NoError(t, err)
return filter, host
}
// Node1: Filter subscribed to content topic A
// Node2: Relay + Filter
//
// # Node1 and Node2 are peers
//
// Node2 publishes a message with content topic A
// Node1 receives the message
//
// Node2 publishes a message with content topic B
// Node1 doesn't receive the message
func TestWakuFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
node1, host1 := makeWakuFilter(t)
defer node1.Stop()
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilter(broadcaster, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
node2Filter.SetHost(host2)
sub := broadcaster.Register(protocol.NewContentFilter(testTopic))
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1)
require.NoError(t, err)
contentFilter := &ContentFilter{
Topic: string(testTopic),
ContentTopics: []string{testContentTopic},
}
_, f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID()))
require.NoError(t, err)
// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
env := <-f.Chan
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
}()
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
wg.Add(1)
go func() {
select {
case <-f.Chan:
require.Fail(t, "should not receive another message")
case <-time.After(1 * time.Second):
defer wg.Done()
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()
_, err = node2.Publish(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
wg.Add(1)
go func() {
select {
case <-f.Chan:
require.Fail(t, "should not receive another message")
case <-time.After(1 * time.Second):
defer wg.Done()
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()
err = node1.Unsubscribe(ctx, *contentFilter, node2Filter.h.ID())
require.NoError(t, err)
time.Sleep(1 * time.Second)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
}
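// Editorial sketch, not part of the original test file: the goroutines above assert that no
// further message arrives by selecting on the filter channel against a timer. The same
// pattern as a small helper, with a hypothetical name:
func assertNoEnvelope(t *testing.T, ch <-chan *protocol.Envelope, wait time.Duration) {
    select {
    case <-ch:
        require.Fail(t, "should not receive another message")
    case <-time.After(wait):
        // nothing arrived within the window, which is the expected outcome
    }
}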
func TestWakuFilterPeerFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
node1, host1 := makeWakuFilter(t)
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
broadcaster2 := relay.NewBroadcaster(10)
require.NoError(t, broadcaster2.Start(context.Background()))
node2Filter := NewWakuFilter(broadcaster2, true, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger(), WithTimeout(3*time.Second))
node2Filter.SetHost(host2)
sub := broadcaster.Register(protocol.NewContentFilter(testTopic))
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterID_v20beta1)
require.NoError(t, err)
contentFilter := &ContentFilter{
Topic: string(testTopic),
ContentTopics: []string{testContentTopic},
}
_, f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID()))
require.NoError(t, err)
// Simulate there's been a failure before
node2Filter.subscribers.FlagAsFailure(host1.ID())
// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)
require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID()))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
env := <-f.Chan
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
// Failure is removed
require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID()))
}()
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
// Kill the subscriber
host1.Close()
time.Sleep(1 * time.Second)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
// TODO: find out how to eliminate this sleep
time.Sleep(1 * time.Second)
require.True(t, node2Filter.subscribers.IsFailedPeer(host1.ID()))
time.Sleep(3 * time.Second)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
time.Sleep(1 * time.Second)
require.False(t, node2Filter.subscribers.IsFailedPeer(host1.ID())) // Failed peer has been removed
for subscriber := range node2Filter.subscribers.Items(nil) {
if subscriber.peer == node1.h.ID() {
require.Fail(t, "Subscriber should not exist")
}
}
}