diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 556e95f5..93c5326b 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -80,7 +80,7 @@ func WithPeer(p peer.ID) FilterSubscribeOption { } } -// WithPeerAddr is an option used to specify a peerAddress to request the message history. +// WithPeerAddr is an option used to specify a peerAddress. // This new peer will be added to peerStore. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption { diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index 63984aeb..71d19500 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -30,7 +30,8 @@ func TestFilterOption(t *testing.T) { params.log = utils.Logger() for _, opt := range options { - _ = opt(params) + err = opt(params) + require.NoError(t, err) } require.Equal(t, host, params.host) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index f1890ed9..5cdcc6c3 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -239,7 +240,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe optList := append(DefaultOptions(wakuLP.h), opts...) for _, opt := range optList { - opt(params) + err := opt(params) + if err != nil { + return nil, err + } } if params.pubsubTopic == "" { @@ -249,6 +253,15 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe } } + if params.pm != nil && params.peerAddr != nil { + pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1) + if err != nil { + return nil, err + } + wakuLP.pm.Connect(pData) + params.selectedPeer = pData.AddrInfo.ID + } + if params.pm != nil && params.selectedPeer == "" { params.selectedPeer, err = wakuLP.pm.SelectPeer( peermanager.PeerSelectionCriteria{ diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index d1627c16..caf87c0c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -1,8 +1,11 @@ package lightpush import ( + "errors" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -11,6 +14,7 @@ import ( type lightPushParameters struct { host host.Host + peerAddr multiaddr.Multiaddr selectedPeer peer.ID peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice @@ -21,12 +25,29 @@ type lightPushParameters struct { } // Option is the type of options accepted when performing LightPush protocol requests -type Option func(*lightPushParameters) +type Option func(*lightPushParameters) error // WithPeer is an option used to specify the peerID to push a waku message to func WithPeer(p peer.ID) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.selectedPeer = p + if params.peerAddr != nil { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil + } +} + +// WithPeerAddr is an option used to specify a peerAddress +// This new peer will be added to peerStore. +// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. +func WithPeerAddr(pAddr multiaddr.Multiaddr) Option { + return func(params *lightPushParameters) error { + params.peerAddr = pAddr + if params.selectedPeer != "" { + return errors.New("peerAddr and peerId options are mutually exclusive") + } + return nil } } @@ -35,9 +56,10 @@ func WithPeer(p peer.ID) Option { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers + return nil } } @@ -46,38 +68,43 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { // from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from the node peerstore func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.peerSelectionType = peermanager.LowestRTT + return nil } } // WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted func WithPubSubTopic(pubsubTopic string) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.pubsubTopic = pubsubTopic + return nil } } // WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic func WithDefaultPubsubTopic() Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.pubsubTopic = relay.DefaultWakuTopic + return nil } } // WithRequestID is an option to set a specific request ID to be used when // publishing a message func WithRequestID(requestID []byte) Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.requestID = requestID + return nil } } // WithAutomaticRequestID is an option to automatically generate a request ID // when publishing a message func WithAutomaticRequestID() Option { - return func(params *lightPushParameters) { + return func(params *lightPushParameters) error { params.requestID = protocol.GenerateRequestID() + return nil } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 3bb72654..7c69dd9c 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -5,6 +5,7 @@ import ( "crypto/rand" "testing" + "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/waku/v2/utils" @@ -30,10 +31,28 @@ func TestLightPushOption(t *testing.T) { params.log = utils.Logger() for _, opt := range options { - opt(params) + err := opt(params) + require.NoError(t, err) } require.Equal(t, host, params.host) require.NotNil(t, params.selectedPeer) require.NotNil(t, params.requestID) + + maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") + require.NoError(t, err) + + options = []Option{ + WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), + WithPeerAddr(maddr), + } + + for idx, opt := range options { + err = opt(params) + if idx == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + } + } }