mirror of
https://github.com/status-im/go-waku.git
synced 2025-02-26 20:10:44 +00:00
fix: handle stale clients in filter protocol (#174)
This commit is contained in:
parent
2b225e90e7
commit
c45e8a3c31
@ -173,7 +173,7 @@ func Execute(options Options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if options.Filter.Enable {
|
if options.Filter.Enable {
|
||||||
nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode))
|
nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(time.Duration(options.Filter.Timeout)*time.Second)))
|
||||||
}
|
}
|
||||||
|
|
||||||
if options.Store.Enable {
|
if options.Store.Enable {
|
||||||
|
@ -30,6 +30,7 @@ type FilterOptions struct {
|
|||||||
Enable bool `long:"filter" description:"Enable filter protocol"`
|
Enable bool `long:"filter" description:"Enable filter protocol"`
|
||||||
DisableFullNode bool `long:"light-client" description:"Don't accept filter subscribers"`
|
DisableFullNode bool `long:"light-client" description:"Don't accept filter subscribers"`
|
||||||
Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"`
|
Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"`
|
||||||
|
Timeout int `long:"filter-timeout" description:"Timeout for filter node in seconds" default:"14400"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// LightpushOptions are settings used to enable the lightpush protocol. This is
|
// LightpushOptions are settings used to enable the lightpush protocol. This is
|
||||||
|
@ -259,7 +259,11 @@ func (w *WakuNode) Start() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if w.opts.enableFilter {
|
if w.opts.enableFilter {
|
||||||
w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode)
|
filter, err := filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, w.opts.filterOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
w.filter = filter
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.opts.enableRendezvous {
|
if w.opts.enableRendezvous {
|
||||||
|
@ -17,6 +17,7 @@ import (
|
|||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
manet "github.com/multiformats/go-multiaddr/net"
|
manet "github.com/multiformats/go-multiaddr/net"
|
||||||
rendezvous "github.com/status-im/go-waku-rendezvous"
|
rendezvous "github.com/status-im/go-waku-rendezvous"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -37,6 +38,7 @@ type WakuNodeParameters struct {
|
|||||||
enableRelay bool
|
enableRelay bool
|
||||||
enableFilter bool
|
enableFilter bool
|
||||||
isFilterFullNode bool
|
isFilterFullNode bool
|
||||||
|
filterOpts []filter.Option
|
||||||
wOpts []pubsub.Option
|
wOpts []pubsub.Option
|
||||||
|
|
||||||
minRelayPeersToPublish int
|
minRelayPeersToPublish int
|
||||||
@ -210,10 +212,11 @@ func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption {
|
|||||||
|
|
||||||
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
|
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
|
||||||
// accepts a list of WakuFilter gossipsub options to setup the protocol
|
// accepts a list of WakuFilter gossipsub options to setup the protocol
|
||||||
func WithWakuFilter(fullNode bool) WakuNodeOption {
|
func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
|
||||||
return func(params *WakuNodeParameters) error {
|
return func(params *WakuNodeParameters) error {
|
||||||
params.enableFilter = true
|
params.enableFilter = true
|
||||||
params.isFilterFullNode = fullNode
|
params.isFilterFullNode = fullNode
|
||||||
|
params.filterOpts = filterOpts
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,6 +2,7 @@ package filter
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
@ -16,10 +17,15 @@ type Subscriber struct {
|
|||||||
type Subscribers struct {
|
type Subscribers struct {
|
||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
subscribers []Subscriber
|
subscribers []Subscriber
|
||||||
|
timeout time.Duration
|
||||||
|
failedPeers map[peer.ID]time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSubscribers() *Subscribers {
|
func NewSubscribers(timeout time.Duration) *Subscribers {
|
||||||
return &Subscribers{}
|
return &Subscribers{
|
||||||
|
timeout: timeout,
|
||||||
|
failedPeers: make(map[peer.ID]time.Time),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sub *Subscribers) Append(s Subscriber) int {
|
func (sub *Subscribers) Append(s Subscriber) int {
|
||||||
@ -53,7 +59,45 @@ func (sub *Subscribers) Length() int {
|
|||||||
return len(sub.subscribers)
|
return len(sub.subscribers)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sub *Subscribers) FlagAsSuccess(peerID peer.ID) {
|
||||||
|
sub.Lock()
|
||||||
|
defer sub.Unlock()
|
||||||
|
|
||||||
|
_, ok := sub.failedPeers[peerID]
|
||||||
|
if ok {
|
||||||
|
delete(sub.failedPeers, peerID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (sub *Subscribers) FlagAsFailure(peerID peer.ID) {
|
||||||
|
sub.Lock()
|
||||||
|
defer sub.Unlock()
|
||||||
|
|
||||||
|
lastFailure, ok := sub.failedPeers[peerID]
|
||||||
|
if ok {
|
||||||
|
elapsedTime := time.Since(lastFailure)
|
||||||
|
if elapsedTime > sub.timeout {
|
||||||
|
log.Debug("filter timeout reached for peer:", peerID)
|
||||||
|
|
||||||
|
var tmpSubs []Subscriber
|
||||||
|
for _, s := range sub.subscribers {
|
||||||
|
if s.peer != peerID {
|
||||||
|
tmpSubs = append(tmpSubs, s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
sub.subscribers = tmpSubs
|
||||||
|
|
||||||
|
delete(sub.failedPeers, peerID)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
sub.failedPeers[peerID] = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) {
|
func (sub *Subscribers) RemoveContentFilters(peerID peer.ID, contentFilters []*pb.FilterRequest_ContentFilter) {
|
||||||
|
sub.Lock()
|
||||||
|
defer sub.Unlock()
|
||||||
|
|
||||||
var peerIdsToRemove []peer.ID
|
var peerIdsToRemove []peer.ID
|
||||||
|
|
||||||
for _, subscriber := range sub.subscribers {
|
for _, subscriber := range sub.subscribers {
|
||||||
|
@ -62,10 +62,18 @@ type (
|
|||||||
// relay protocol.
|
// relay protocol.
|
||||||
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
|
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
|
||||||
|
|
||||||
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFilter {
|
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, opts ...Option) (*WakuFilter, error) {
|
||||||
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
return nil, errors.New("could not start waku filter")
|
||||||
|
}
|
||||||
|
|
||||||
|
params := new(FilterParameters)
|
||||||
|
optList := DefaultOptions()
|
||||||
|
optList = append(optList, opts...)
|
||||||
|
for _, opt := range optList {
|
||||||
|
opt(params)
|
||||||
}
|
}
|
||||||
|
|
||||||
wf := new(WakuFilter)
|
wf := new(WakuFilter)
|
||||||
@ -75,7 +83,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi
|
|||||||
wf.h = host
|
wf.h = host
|
||||||
wf.isFullNode = isFullNode
|
wf.isFullNode = isFullNode
|
||||||
wf.filters = NewFilterMap()
|
wf.filters = NewFilterMap()
|
||||||
wf.subscribers = NewSubscribers()
|
wf.subscribers = NewSubscribers(params.timeout)
|
||||||
|
|
||||||
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
||||||
|
|
||||||
@ -88,7 +96,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool) *WakuFi
|
|||||||
log.Info("Filter protocol started (only client mode)")
|
log.Info("Filter protocol started (only client mode)")
|
||||||
}
|
}
|
||||||
|
|
||||||
return wf
|
return wf, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilter) onRequest(s network.Stream) {
|
func (wf *WakuFilter) onRequest(s network.Stream) {
|
||||||
@ -140,11 +148,11 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
|
|||||||
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
|
func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) error {
|
||||||
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
|
pushRPC := &pb.FilterRPC{RequestId: subscriber.requestId, Push: &pb.MessagePush{Messages: []*pb.WakuMessage{msg}}}
|
||||||
|
|
||||||
conn, err := wf.h.NewStream(wf.ctx, peer.ID(subscriber.peer), FilterID_v20beta1)
|
conn, err := wf.h.NewStream(wf.ctx, subscriber.peer, FilterID_v20beta1)
|
||||||
// TODO: keep track of errors to automatically unsubscribe a peer?
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// @TODO more sophisticated error handling here
|
wf.subscribers.FlagAsFailure(subscriber.peer)
|
||||||
log.Error("failed to open peer stream")
|
|
||||||
|
log.Error("failed to open peer stream", err)
|
||||||
//waku_filter_errors.inc(labelValues = [dialFailure])
|
//waku_filter_errors.inc(labelValues = [dialFailure])
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -153,10 +161,12 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er
|
|||||||
writer := protoio.NewDelimitedWriter(conn)
|
writer := protoio.NewDelimitedWriter(conn)
|
||||||
err = writer.WriteMsg(pushRPC)
|
err = writer.WriteMsg(pushRPC)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("failed to push messages to remote peer")
|
log.Error("failed to push messages to remote peer", err)
|
||||||
|
wf.subscribers.FlagAsFailure(subscriber.peer)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wf.subscribers.FlagAsSuccess(subscriber.peer)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -206,7 +216,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
|
|||||||
params := new(FilterSubscribeParameters)
|
params := new(FilterSubscribeParameters)
|
||||||
params.host = wf.h
|
params.host = wf.h
|
||||||
|
|
||||||
optList := DefaultOptions()
|
optList := DefaultSubscribtionOptions()
|
||||||
optList = append(optList, opts...)
|
optList = append(optList, opts...)
|
||||||
for _, opt := range optList {
|
for _, opt := range optList {
|
||||||
opt(params)
|
opt(params)
|
||||||
|
@ -2,6 +2,7 @@ package filter
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p-core/host"
|
"github.com/libp2p/go-libp2p-core/host"
|
||||||
"github.com/libp2p/go-libp2p-core/peer"
|
"github.com/libp2p/go-libp2p-core/peer"
|
||||||
@ -15,8 +16,20 @@ type (
|
|||||||
}
|
}
|
||||||
|
|
||||||
FilterSubscribeOption func(*FilterSubscribeParameters)
|
FilterSubscribeOption func(*FilterSubscribeParameters)
|
||||||
|
|
||||||
|
FilterParameters struct {
|
||||||
|
timeout time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
Option func(*FilterParameters)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func WithTimeout(timeout time.Duration) Option {
|
||||||
|
return func(params *FilterParameters) {
|
||||||
|
params.timeout = timeout
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func WithPeer(p peer.ID) FilterSubscribeOption {
|
func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||||
return func(params *FilterSubscribeParameters) {
|
return func(params *FilterSubscribeParameters) {
|
||||||
params.selectedPeer = p
|
params.selectedPeer = p
|
||||||
@ -45,7 +58,13 @@ func WithFastestPeerSelection(ctx context.Context) FilterSubscribeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func DefaultOptions() []FilterSubscribeOption {
|
func DefaultOptions() []Option {
|
||||||
|
return []Option{
|
||||||
|
WithTimeout(24 * time.Hour),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func DefaultSubscribtionOptions() []FilterSubscribeOption {
|
||||||
return []FilterSubscribeOption{
|
return []FilterSubscribeOption{
|
||||||
WithAutomaticPeerSelection(),
|
WithAutomaticPeerSelection(),
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ func makeWakuFilter(t *testing.T) (*WakuFilter, host.Host) {
|
|||||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
filter := NewWakuFilter(context.Background(), host, false)
|
filter, _ := NewWakuFilter(context.Background(), host, false)
|
||||||
|
|
||||||
return filter, host
|
return filter, host
|
||||||
}
|
}
|
||||||
@ -68,7 +68,7 @@ func TestWakuFilter(t *testing.T) {
|
|||||||
defer node2.Stop()
|
defer node2.Stop()
|
||||||
defer sub2.Unsubscribe()
|
defer sub2.Unsubscribe()
|
||||||
|
|
||||||
node2Filter := NewWakuFilter(ctx, host2, true)
|
node2Filter, _ := NewWakuFilter(ctx, host2, true)
|
||||||
broadcaster.Register(node2Filter.MsgC)
|
broadcaster.Register(node2Filter.MsgC)
|
||||||
|
|
||||||
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||||
@ -138,3 +138,90 @@ func TestWakuFilter(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWakuFilterPeerFailure(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
testTopic := "/waku/2/go/filter/test"
|
||||||
|
testContentTopic := "TopicA"
|
||||||
|
|
||||||
|
node1, host1 := makeWakuFilter(t)
|
||||||
|
|
||||||
|
broadcaster := v2.NewBroadcaster(10)
|
||||||
|
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
|
||||||
|
defer node2.Stop()
|
||||||
|
defer sub2.Unsubscribe()
|
||||||
|
|
||||||
|
node2Filter, _ := NewWakuFilter(ctx, host2, true, WithTimeout(3*time.Second))
|
||||||
|
broadcaster.Register(node2Filter.MsgC)
|
||||||
|
|
||||||
|
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
|
||||||
|
err := host1.Peerstore().AddProtocols(host2.ID(), string(FilterID_v20beta1))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
contentFilter := &ContentFilter{
|
||||||
|
Topic: string(testTopic),
|
||||||
|
ContentTopics: []string{testContentTopic},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID()))
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// Simulate there's been a failure before
|
||||||
|
node2Filter.subscribers.FlagAsFailure(host1.ID())
|
||||||
|
|
||||||
|
// Sleep to make sure the filter is subscribed
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
_, ok := node2Filter.subscribers.failedPeers[host1.ID()]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
env := <-f.Chan
|
||||||
|
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
|
||||||
|
|
||||||
|
// Failure is removed
|
||||||
|
_, ok := node2Filter.subscribers.failedPeers[host1.ID()]
|
||||||
|
require.False(t, ok)
|
||||||
|
|
||||||
|
}()
|
||||||
|
|
||||||
|
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
// Kill the subscriber
|
||||||
|
host1.Close()
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// TODO: find out how to eliminate this sleep
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
_, ok = node2Filter.subscribers.failedPeers[host1.ID()]
|
||||||
|
require.True(t, ok)
|
||||||
|
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
|
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
_, ok = node2Filter.subscribers.failedPeers[host1.ID()]
|
||||||
|
require.False(t, ok) // Failed peer has been removed
|
||||||
|
|
||||||
|
for subscriber := range node2Filter.subscribers.Items() {
|
||||||
|
if subscriber.peer == node1.h.ID() {
|
||||||
|
require.Fail(t, "Subscriber should not exist")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
@ -44,7 +44,7 @@ func TestFilterSubscription(t *testing.T) {
|
|||||||
_, err = node.SubscribeToTopic(context.Background(), testTopic)
|
_, err = node.SubscribeToTopic(context.Background(), testTopic)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_ = filter.NewWakuFilter(context.Background(), host, false)
|
_, _ = filter.NewWakuFilter(context.Background(), host, false)
|
||||||
|
|
||||||
d := makeFilterService(t)
|
d := makeFilterService(t)
|
||||||
defer d.node.Stop()
|
defer d.node.Stop()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user