diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index aa0c38d5..20815a65 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -26,6 +27,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) @@ -272,6 +274,52 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return nil } +func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) { + params := new(FilterSubscribeParameters) + params.log = wf.log + params.host = wf.h + params.pm = wf.pm + + optList := DefaultSubscriptionOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, nil, err + } + } + + pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return nil, nil, err + } + + //Add Peer to peerstore. + if params.pm != nil && params.peerAddr != nil { + pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1) + if err != nil { + return nil, nil, err + } + wf.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + if params.pm != nil && params.selectedPeer == "" { + params.selectedPeer, err = wf.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: FilterSubscribeID_v20beta1, + PubsubTopics: maps.Keys(pubSubTopicMap), + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, nil, err + } + } + return params, pubSubTopicMap, nil +} + // Subscribe setups a subscription to receive messages that match a specific content filter // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request. @@ -283,30 +331,15 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot return nil, err } - params := new(FilterSubscribeParameters) - params.log = wf.log - params.host = wf.h - params.pm = wf.pm - - optList := DefaultSubscriptionOptions() - optList = append(optList, opts...) - for _, opt := range optList { - err := opt(params) - if err != nil { - return nil, err - } - } - - pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) - + params, pubSubTopicMap, err := wf.handleFilterSubscribeOptions(ctx, contentFilter, opts) if err != nil { return nil, err } + failedContentTopics := []string{} subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { var selectedPeer peer.ID - //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic if params.pm != nil && params.selectedPeer == "" { selectedPeer, err = wf.pm.SelectPeer( peermanager.PeerSelectionCriteria{ diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 9495ee35..556e95f5 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -1,11 +1,13 @@ package filter import ( + "errors" "sync" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" @@ -34,6 +36,7 @@ func WithPingRequestId(requestId []byte) FilterPingOption { type ( FilterSubscribeParameters struct { selectedPeer peer.ID + peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte @@ -65,9 +68,27 @@ func WithTimeout(timeout time.Duration) Option { } } +// WithPeer is an option used to specify the peerID to request the message history. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil + } +} + +// WithPeerAddr is an option used to specify a peerAddress to request the message history. +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. +func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } return nil } } diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index d02bb2da..63984aeb 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "testing" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/utils" @@ -52,4 +53,26 @@ func TestFilterOption(t *testing.T) { require.NotNil(t, params2.selectedPeer) require.True(t, params2.unsubscribeAll) + // Mutually Exclusive options + maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") + require.NoError(t, err) + options3 := []FilterSubscribeOption{ + WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), + WithPeerAddr(maddr), + } + + params3 := new(FilterSubscribeParameters) + + for idx, opt := range options3 { + err := opt(params3) + if idx == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } + + require.NotNil(t, params2.selectedPeer) + require.True(t, params2.unsubscribeAll) + }