mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-12 23:04:45 +00:00
fix: return errors in FilterSubscribeOption (#794)
This commit is contained in:
parent
9017f9816a
commit
3aa477cbc6
@ -263,7 +263,10 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
|
||||
optList := DefaultSubscriptionOptions()
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
err := opt(params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if params.selectedPeer == "" {
|
||||
@ -317,7 +320,10 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO
|
||||
params.log = wf.log
|
||||
opts = append(DefaultUnsubscribeOptions(), opts...)
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
err := opt(params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return params, nil
|
||||
|
@ -35,7 +35,7 @@ type (
|
||||
|
||||
Option func(*FilterParameters)
|
||||
|
||||
FilterSubscribeOption func(*FilterSubscribeParameters)
|
||||
FilterSubscribeOption func(*FilterSubscribeParameters) error
|
||||
)
|
||||
|
||||
func WithTimeout(timeout time.Duration) Option {
|
||||
@ -45,8 +45,9 @@ func WithTimeout(timeout time.Duration) Option {
|
||||
}
|
||||
|
||||
func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.selectedPeer = p
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -54,7 +55,7 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
|
||||
// supports the chosen protocol, otherwise it will chose a peer from the node peerstore
|
||||
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
var p peer.ID
|
||||
var err error
|
||||
if params.pm == nil {
|
||||
@ -62,11 +63,13 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
|
||||
} else {
|
||||
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...)
|
||||
}
|
||||
if err == nil {
|
||||
params.selectedPeer = p
|
||||
} else {
|
||||
params.log.Info("selecting peer", zap.Error(err))
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
params.selectedPeer = p
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -75,29 +78,32 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
|
||||
// from that list assuming it supports the chosen protocol, otherwise it will chose a
|
||||
// peer from the node peerstore
|
||||
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
|
||||
if err == nil {
|
||||
params.selectedPeer = p
|
||||
} else {
|
||||
params.log.Info("selecting peer", zap.Error(err))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
params.selectedPeer = p
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithRequestID is an option to set a specific request ID to be used when
|
||||
// creating/removing a filter subscription
|
||||
func WithRequestID(requestID []byte) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.requestID = requestID
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithAutomaticRequestID is an option to automatically generate a request ID
|
||||
// when creating a filter subscription
|
||||
func WithAutomaticRequestID() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.requestID = protocol.GenerateRequestID()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -109,24 +115,27 @@ func DefaultSubscriptionOptions() []FilterSubscribeOption {
|
||||
}
|
||||
|
||||
func UnsubscribeAll() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.unsubscribeAll = true
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithWaitGroup allows specifying a waitgroup to wait until all
|
||||
// unsubscribe requests are complete before the function is complete
|
||||
func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.wg = wg
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// DontWait is used to fire and forget an unsubscription, and don't
|
||||
// care about the results of it
|
||||
func DontWait() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.wg = nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -29,7 +29,7 @@ func TestFilterOption(t *testing.T) {
|
||||
params.log = utils.Logger()
|
||||
|
||||
for _, opt := range options {
|
||||
opt(params)
|
||||
_ = opt(params)
|
||||
}
|
||||
|
||||
require.Equal(t, host, params.host)
|
||||
@ -45,7 +45,8 @@ func TestFilterOption(t *testing.T) {
|
||||
params2 := new(FilterSubscribeParameters)
|
||||
|
||||
for _, opt := range options2 {
|
||||
opt(params2)
|
||||
err := opt(params2)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.NotNil(t, params2.selectedPeer)
|
||||
|
Loading…
x
Reference in New Issue
Block a user