diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index b36f0567..f3930111 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -263,7 +263,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot optList := DefaultSubscriptionOptions() optList = append(optList, opts...) for _, opt := range optList { - opt(params) + err := opt(params) + if err != nil { + return nil, err + } } if params.selectedPeer == "" { @@ -317,7 +320,10 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO params.log = wf.log opts = append(DefaultUnsubscribeOptions(), opts...) for _, opt := range opts { - opt(params) + err := opt(params) + if err != nil { + return nil, err + } } return params, nil diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 036165fb..991dde86 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -35,7 +35,7 @@ type ( Option func(*FilterParameters) - FilterSubscribeOption func(*FilterSubscribeParameters) + FilterSubscribeOption func(*FilterSubscribeParameters) error ) func WithTimeout(timeout time.Duration) Option { @@ -45,8 +45,9 @@ func WithTimeout(timeout time.Duration) Option { } func WithPeer(p peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.selectedPeer = p + return nil } } @@ -54,7 +55,7 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // If a list of specific peers is passed, the peer will be chosen from that list assuming it // supports the chosen protocol, otherwise it will chose a peer from the node peerstore func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { var p peer.ID var err error if params.pm == nil { @@ -62,11 +63,13 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption } else { p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...) } - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) + + if err != nil { + return err } + + params.selectedPeer = p + return nil } } @@ -75,29 +78,32 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) - if err == nil { - params.selectedPeer = p - } else { - params.log.Info("selecting peer", zap.Error(err)) + if err != nil { + return err } + + params.selectedPeer = p + return nil } } // WithRequestID is an option to set a specific request ID to be used when // creating/removing a filter subscription func WithRequestID(requestID []byte) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.requestID = requestID + return nil } } // WithAutomaticRequestID is an option to automatically generate a request ID // when creating a filter subscription func WithAutomaticRequestID() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.requestID = protocol.GenerateRequestID() + return nil } } @@ -109,24 +115,27 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption { } func UnsubscribeAll() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.unsubscribeAll = true + return nil } } // WithWaitGroup allows specifying a waitgroup to wait until all // unsubscribe requests are complete before the function is complete func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.wg = wg + return nil } } // DontWait is used to fire and forget an unsubscription, and don't // care about the results of it func DontWait() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) { + return func(params *FilterSubscribeParameters) error { params.wg = nil + return nil } } diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index 28ad20b2..eeca00ee 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -29,7 +29,7 @@ func TestFilterOption(t *testing.T) { params.log = utils.Logger() for _, opt := range options { - opt(params) + _ = opt(params) } require.Equal(t, host, params.host) @@ -45,7 +45,8 @@ func TestFilterOption(t *testing.T) { params2 := new(FilterSubscribeParameters) for _, opt := range options2 { - opt(params2) + err := opt(params2) + require.NoError(t, err) } require.NotNil(t, params2.selectedPeer)