diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index aa0c38d5..20815a65 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -18,6 +18,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -26,6 +27,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/timesource" "go.uber.org/zap" + "golang.org/x/exp/maps" "golang.org/x/exp/slices" ) @@ -272,6 +274,52 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return nil } +func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) { + params := new(FilterSubscribeParameters) + params.log = wf.log + params.host = wf.h + params.pm = wf.pm + + optList := DefaultSubscriptionOptions() + optList = append(optList, opts...) + for _, opt := range optList { + err := opt(params) + if err != nil { + return nil, nil, err + } + } + + pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return nil, nil, err + } + + //Add Peer to peerstore. + if params.pm != nil && params.peerAddr != nil { + pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1) + if err != nil { + return nil, nil, err + } + wf.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + if params.pm != nil && params.selectedPeer == "" { + params.selectedPeer, err = wf.pm.SelectPeer( + peermanager.PeerSelectionCriteria{ + SelectionType: params.peerSelectionType, + Proto: FilterSubscribeID_v20beta1, + PubsubTopics: maps.Keys(pubSubTopicMap), + SpecificPeers: params.preferredPeers, + Ctx: ctx, + }, + ) + if err != nil { + return nil, nil, err + } + } + return params, pubSubTopicMap, nil +} + // Subscribe setups a subscription to receive messages that match a specific content filter // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request. @@ -283,30 +331,15 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot return nil, err } - params := new(FilterSubscribeParameters) - params.log = wf.log - params.host = wf.h - params.pm = wf.pm - - optList := DefaultSubscriptionOptions() - optList = append(optList, opts...) - for _, opt := range optList { - err := opt(params) - if err != nil { - return nil, err - } - } - - pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter) - + params, pubSubTopicMap, err := wf.handleFilterSubscribeOptions(ctx, contentFilter, opts) if err != nil { return nil, err } + failedContentTopics := []string{} subscriptions := make([]*subscription.SubscriptionDetails, 0) for pubSubTopic, cTopics := range pubSubTopicMap { var selectedPeer peer.ID - //TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic if params.pm != nil && params.selectedPeer == "" { selectedPeer, err = wf.pm.SelectPeer( peermanager.PeerSelectionCriteria{ diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 9495ee35..93c5326b 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -1,11 +1,13 @@ package filter import ( + "errors" "sync" "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" @@ -34,6 +36,7 @@ func WithPingRequestId(requestId []byte) FilterPingOption { type ( FilterSubscribeParameters struct { selectedPeer peer.ID + peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte @@ -65,9 +68,27 @@ func WithTimeout(timeout time.Duration) Option { } } +// WithPeer is an option used to specify the peerID to request the message history. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. func WithPeer(p peer.ID) FilterSubscribeOption { return func(params *FilterSubscribeParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil + } +} + +// WithPeerAddr is an option used to specify a peerAddress. +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. +func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } return nil } } diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index d02bb2da..71d19500 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "testing" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/utils" @@ -29,7 +30,8 @@ func TestFilterOption(t *testing.T) { params.log = utils.Logger() for _, opt := range options { - _ = opt(params) + err = opt(params) + require.NoError(t, err) } require.Equal(t, host, params.host) @@ -52,4 +54,26 @@ func TestFilterOption(t *testing.T) { require.NotNil(t, params2.selectedPeer) require.True(t, params2.unsubscribeAll) + // Mutually Exclusive options + maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") + require.NoError(t, err) + options3 := []FilterSubscribeOption{ + WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), + WithPeerAddr(maddr), + } + + params3 := new(FilterSubscribeParameters) + + for idx, opt := range options3 { + err := opt(params3) + if idx == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } + + require.NotNil(t, params2.selectedPeer) + require.True(t, params2.unsubscribeAll) + } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index f1890ed9..5cdcc6c3 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -239,7 +240,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { - opt(params) + err := opt(params) + if err != nil { + return nil, err + } } if params.pubsubTopic == "" { @@ -249,6 +253,15 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } } + if params.pm != nil && params.peerAddr != nil { + pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1) + if err != nil { + return nil, err + } + wakuLP.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + if params.pm != nil && params.selectedPeer == "" { params.selectedPeer, err = wakuLP.pm.SelectPeer( peermanager.PeerSelectionCriteria{ diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index d1627c16..caf87c0c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -1,8 +1,11 @@ package lightpush import ( + "errors" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -11,6 +14,7 @@ import ( type lightPushParameters struct { host host.Host + peerAddr multiaddr.Multiaddr selectedPeer peer.ID peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice @@ -21,12 +25,29 @@ type lightPushParameters struct { } // Option is the type of options accepted when performing LightPush protocol requests -type Option func(*lightPushParameters) +type Option func(*lightPushParameters) error // WithPeer is an option used to specify the peerID to push a waku message to func WithPeer(p peer.ID) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil + } +} + +// WithPeerAddr is an option used to specify a peerAddress +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. +func WithPeerAddr(pAddr multiaddr.Multiaddr) Option { + return func(params *lightPushParameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil } } @@ -35,9 +56,10 @@ func WithPeer(p peer.ID) Option { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers + return nil } } @@ -46,38 +68,43 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.peerSelectionType = peermanager.LowestRTT + return nil } } // WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted func WithPubSubTopic(pubsubTopic string) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.pubsubTopic = pubsubTopic + return nil } } // WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic func WithDefaultPubsubTopic() Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.pubsubTopic = relay.DefaultWakuTopic + return nil } } // WithRequestID is an option to set a specific request ID to be used when // publishing a message func WithRequestID(requestID []byte) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.requestID = requestID + return nil } } // WithAutomaticRequestID is an option to automatically generate a request ID // when publishing a message func WithAutomaticRequestID() Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.requestID = protocol.GenerateRequestID() + return nil } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 3bb72654..7c69dd9c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "testing" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/utils" @@ -30,10 +31,28 @@ func TestLightPushOption(t *testing.T) { params.log = utils.Logger() for _, opt := range options { - opt(params) + err := opt(params) + require.NoError(t, err) } require.Equal(t, host, params.host) require.NotNil(t, params.selectedPeer) require.NotNil(t, params.requestID) + + maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") + require.NoError(t, err) + + options = []Option{ + WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), + WithPeerAddr(maddr), + } + + for idx, opt := range options { + err = opt(params) + if idx == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } } diff --git a/waku/v2/protocol/peer_exchange/client.go b/waku/v2/protocol/peer_exchange/client.go index 8de15c56..2d8564fb 100644 --- a/waku/v2/protocol/peer_exchange/client.go +++ b/waku/v2/protocol/peer_exchange/client.go @@ -27,8 +27,21 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts optList := DefaultOptions(wakuPX.h) optList = append(optList, opts...) for _, opt := range optList { - opt(params) + err := opt(params) + if err != nil { + return err + } } + + if params.pm != nil && params.peerAddr != nil { + pData, err := wakuPX.pm.AddPeer(params.peerAddr, peerstore.Static, []string{}, PeerExchangeID_v20alpha1) + if err != nil { + return err + } + wakuPX.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + if params.pm != nil && params.selectedPeer == "" { var err error params.selectedPeer, err = wakuPX.pm.SelectPeer( diff --git a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index 57cf2455..55702ad5 100644 --- a/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -1,8 +1,11 @@ package peer_exchange import ( + "errors" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "go.uber.org/zap" ) @@ -10,18 +13,36 @@ import ( type PeerExchangeParameters struct { host host.Host selectedPeer peer.ID + peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice pm *peermanager.PeerManager log *zap.Logger } -type PeerExchangeOption func(*PeerExchangeParameters) +type PeerExchangeOption func(*PeerExchangeParameters) error -// WithPeer is an option used to specify the peerID to push a waku message to +// WithPeer is an option used to specify the peerID to fetch peers from func WithPeer(p peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) { + return func(params *PeerExchangeParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil + } +} + +// WithPeerAddr is an option used to specify a peerAddress to fetch peers from +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. +func WithPeerAddr(pAddr multiaddr.Multiaddr) PeerExchangeOption { + return func(params *PeerExchangeParameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil } } @@ -31,9 +52,10 @@ func WithPeer(p peer.ID) PeerExchangeOption { // from the node peerstore // Note: this option can only be used if WakuNode is initialized which internally intializes the peerManager func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) { + return func(params *PeerExchangeParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers + return nil } } @@ -42,9 +64,10 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore func WithFastestPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption { - return func(params *PeerExchangeParameters) { + return func(params *PeerExchangeParameters) error { params.peerSelectionType = peermanager.LowestRTT params.preferredPeers = fromThesePeers + return nil } }