diff --git a/waku/node.go b/waku/node.go index 857a3741..635509f3 100644 --- a/waku/node.go +++ b/waku/node.go @@ -173,7 +173,7 @@ func Execute(options Options) { } if options.Filter.Enable { - nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode)) + nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(time.Duration(options.Filter.Timeout)*time.Second))) } if options.Store.Enable { diff --git a/waku/options.go b/waku/options.go index d34359ca..82d1cfd2 100644 --- a/waku/options.go +++ b/waku/options.go @@ -30,6 +30,7 @@ type FilterOptions struct { Enable bool `long:"filter" description:"Enable filter protocol"` DisableFullNode bool `long:"light-client" description:"Don't accept filter subscribers"` Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"` + Timeout int `long:"filter-timeout" description:"Timeout for filter node in seconds" default:"14400"` } // LightpushOptions are settings used to enable the lightpush protocol. This is diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 788b5618..c0410651 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -259,7 +259,11 @@ func (w *WakuNode) Start() error { } if w.opts.enableFilter { - w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode) + filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.opts.filterOpts...) + if err != nil { + return err + } + w.filter = filter } if w.opts.enableRendezvous { diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index f074bff2..8b3860b3 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -17,6 +17,7 @@ import ( ma "github.com/multiformats/go-multiaddr" manet "github.com/multiformats/go-multiaddr/net" rendezvous "github.com/status-im/go-waku-rendezvous" + "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/store" ) @@ -37,6 +38,7 @@ type WakuNodeParameters struct { enableRelay bool enableFilter bool isFilterFullNode bool + filterOpts []filter.Option wOpts []pubsub.Option minRelayPeersToPublish int @@ -210,10 +212,11 @@ func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption { // WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption // accepts a list of WakuFilter gossipsub options to setup the protocol -func WithWakuFilter(fullNode bool) WakuNodeOption { +func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableFilter = true params.isFilterFullNode = fullNode + params.filterOpts = filterOpts return nil } } diff --git a/waku/v2/protocol/filter/filter_subscribers.go b/waku/v2/protocol/filter/filter_subscribers.go index 3b5c2b9e..82c17f1c 100644 --- a/waku/v2/protocol/filter/filter_subscribers.go +++ b/waku/v2/protocol/filter/filter_subscribers.go @@ -2,6 +2,7 @@ package filter import ( "sync" + "time" "github.com/libp2p/go-libp2p-core/peer" "github.com/status-im/go-waku/waku/v2/protocol/pb" @@ -16,10 +17,15 @@ type Subscriber struct { type Subscribers struct { sync.RWMutex subscribers []Subscriber + timeout time.Duration + failedPeers map[peer.ID]time.Time } -func NewSubscribers() *Subscribers { - return &Subscribers{} +func NewSubscribers(timeout time.Duration) *Subscribers { + return &Subscribers{ + timeout: timeout, + failedPeers: make(map[peer.ID]time.Time), + } } func (sub *Subscribers) Append(s Subscriber) int { @@ -53,7 +59,45 @@ func (sub *Subscribers) Length() int { return len(sub.subscribers) } +func (sub *Subscribers) FlagAsSuccess(peerID peer.ID) { + sub.Lock() + defer sub.Unlock() + + _, ok := sub.failedPeers[peerID] + if ok { + delete(sub.failedPeers, peerID) + } +} + +func (sub *Subscribers) FlagAsFailure(peerID peer.ID) { + sub.Lock() + defer sub.Unlock() + + lastFailure, ok := sub.failedPeers[peerID] + if ok { + elapsedTime := time.Since(lastFailure) + if elapsedTime > sub.timeout { + log.Debug("filter timeout reached for peer:", peerID) + + var tmpSubs []Subscriber + for _, s := range sub.subscribers { + if s.peer != peerID { + tmpSubs = append(tmpSubs, s) + } + } + sub.subscribers = tmpSubs + + delete(sub.failedPeers, peerID) + } + } else { + sub.failedPeers[peerID] = time.Now() + } +} + func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) { + sub.Lock() + defer sub.Unlock() + var peerIdsToRemove []peer.ID for _, subscriber := range sub.subscribers { diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 9e6a2346..72d7c3f4 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -62,10 +62,18 @@ type ( // relay protocol. const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") -func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter { +func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, opts ...Option) (*WakuFilter, error) { ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) if err != nil { log.Error(err) + return nil, errors.New("could not start waku filter") + } + + params := new(FilterParameters) + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) } wf := new(WakuFilter) @@ -75,7 +83,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi wf.h = host wf.isFullNode = isFullNode wf.filters = NewFilterMap() - wf.subscribers = NewSubscribers() + wf.subscribers = NewSubscribers(params.timeout) wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) @@ -88,7 +96,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi log.Info("Filter protocol started (only client mode)") } - return wf + return wf, nil } func (wf *WakuFilter) onRequest(s network.Stream) { @@ -140,11 +148,11 @@ func (wf *WakuFilter) onRequest(s network.Stream) { func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error { pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}} - conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1) - // TODO: keep track of errors to automatically unsubscribe a peer? + conn, err := wf.h.NewStream(wf.ctx, subscriber.peer, FilterID_v20beta1) if err != nil { - // @TODO more sophisticated error handling here - log.Error("failed to open peer stream") + wf.subscribers.FlagAsFailure(subscriber.peer) + + log.Error("failed to open peer stream", err) //waku_filter_errors.inc(labelValues = [dialFailure]) return err } @@ -153,10 +161,12 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er writer := protoio.NewDelimitedWriter(conn) err = writer.WriteMsg(pushRPC) if err != nil { - log.Error("failed to push messages to remote peer") + log.Error("failed to push messages to remote peer", err) + wf.subscribers.FlagAsFailure(subscriber.peer) return nil } + wf.subscribers.FlagAsSuccess(subscriber.peer) return nil } @@ -206,7 +216,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil params := new(FilterSubscribeParameters) params.host = wf.h - optList := DefaultOptions() + optList := DefaultSubscribtionOptions() optList = append(optList, opts...) for _, opt := range optList { opt(params) diff --git a/waku/v2/protocol/filter/waku_filter_option.go b/waku/v2/protocol/filter/waku_filter_option.go index 11c35953..8a6a07b7 100644 --- a/waku/v2/protocol/filter/waku_filter_option.go +++ b/waku/v2/protocol/filter/waku_filter_option.go @@ -2,6 +2,7 @@ package filter import ( "context" + "time" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" @@ -15,8 +16,20 @@ type ( } FilterSubscribeOption func(*FilterSubscribeParameters) + + FilterParameters struct { + timeout time.Duration + } + + Option func(*FilterParameters) ) +func WithTimeout(timeout time.Duration) Option { + return func(params *FilterParameters) { + params.timeout = timeout + } +} + func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) { params.selectedPeer = p @@ -45,7 +58,13 @@ func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption { } } -func DefaultOptions() []FilterSubscribeOption { +func DefaultOptions() []Option { + return []Option{ + WithTimeout(24 * time.Hour), + } +} + +func DefaultSubscribtionOptions() []FilterSubscribeOption { return []FilterSubscribeOption{ WithAutomaticPeerSelection(), } diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index ef3e4a00..5c1af5c4 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -38,7 +38,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - filter := NewWakuFilter(context.Background(), host, false) + filter, _ := NewWakuFilter(context.Background(), host, false) return filter, host } @@ -68,7 +68,7 @@ func TestWakuFilter(t *testing.T) { defer node2.Stop() defer sub2.Unsubscribe() - node2Filter := NewWakuFilter(ctx, host2, true) + node2Filter, _ := NewWakuFilter(ctx, host2, true) broadcaster.Register(node2Filter.MsgC) host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) @@ -138,3 +138,90 @@ func TestWakuFilter(t *testing.T) { require.NoError(t, err) wg.Wait() } + +func TestWakuFilterPeerFailure(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds + defer cancel() + + testTopic := "/waku/2/go/filter/test" + testContentTopic := "TopicA" + + node1, host1 := makeWakuFilter(t) + + broadcaster := v2.NewBroadcaster(10) + node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster) + defer node2.Stop() + defer sub2.Unsubscribe() + + node2Filter, _ := NewWakuFilter(ctx, host2, true, WithTimeout(3*time.Second)) + broadcaster.Register(node2Filter.MsgC) + + host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) + err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1)) + require.NoError(t, err) + + contentFilter := &ContentFilter{ + Topic: string(testTopic), + ContentTopics: []string{testContentTopic}, + } + + _, f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID())) + require.NoError(t, err) + + // Simulate there's been a failure before + node2Filter.subscribers.FlagAsFailure(host1.ID()) + + // Sleep to make sure the filter is subscribed + time.Sleep(2 * time.Second) + + _, ok := node2Filter.subscribers.failedPeers[host1.ID()] + require.True(t, ok) + + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + env := <-f.Chan + require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) + + // Failure is removed + _, ok := node2Filter.subscribers.failedPeers[host1.ID()] + require.False(t, ok) + + }() + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + require.NoError(t, err) + + wg.Wait() + + // Kill the subscriber + host1.Close() + + time.Sleep(1 * time.Second) + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic) + require.NoError(t, err) + + // TODO: find out how to eliminate this sleep + time.Sleep(1 * time.Second) + _, ok = node2Filter.subscribers.failedPeers[host1.ID()] + require.True(t, ok) + + time.Sleep(3 * time.Second) + + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + _, ok = node2Filter.subscribers.failedPeers[host1.ID()] + require.False(t, ok) // Failed peer has been removed + + for subscriber := range node2Filter.subscribers.Items() { + if subscriber.peer == node1.h.ID() { + require.Fail(t, "Subscriber should not exist") + } + } + +} diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index c8cd669b..72be6aad 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -44,7 +44,7 @@ func TestFilterSubscription(t *testing.T) { _, err = node.SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) - _ = filter.NewWakuFilter(context.Background(), host, false) + _, _ = filter.NewWakuFilter(context.Background(), host, false) d := makeFilterService(t) defer d.node.Stop()