diff --git a/examples/chat2-reliable/chat.go b/examples/chat2-reliable/chat.go index 689a4a46..bbfe60bc 100644 --- a/examples/chat2-reliable/chat.go +++ b/examples/chat2-reliable/chat.go @@ -79,7 +79,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. PubsubTopic: relay.DefaultWakuTopic, ContentTopics: protocol.NewContentTopicSet(options.ContentTopic), } - var filterOpt filter.FilterSubscribeOption + var filterOpt filter.SubscribeOption peerID, err := options.Filter.NodePeerID() if err != nil { filterOpt = filter.WithAutomaticPeerSelection() diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index 48c39d4f..c3366776 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -65,7 +65,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. PubsubTopic: relay.DefaultWakuTopic, ContentTopics: protocol.NewContentTopicSet(options.ContentTopic), } - var filterOpt filter.FilterSubscribeOption + var filterOpt filter.SubscribeOption peerID, err := options.Filter.NodePeerID() if err != nil { filterOpt = filter.WithAutomaticPeerSelection() diff --git a/library/filter.go b/library/filter.go index ef1d8cc0..af8a1e4d 100644 --- a/library/filter.go +++ b/library/filter.go @@ -57,7 +57,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m ctx = instance.ctx } - var fOptions []filter.FilterSubscribeOption + var fOptions []filter.SubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { @@ -141,7 +141,7 @@ func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string, ctx = instance.ctx } - var fOptions []filter.FilterSubscribeOption + var fOptions []filter.SubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { @@ -185,7 +185,7 @@ func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string ctx = instance.ctx } - var fOptions []filter.FilterSubscribeOption + var fOptions []filter.SubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index 020bb23f..e9d1ebd0 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -194,7 +194,7 @@ func possibleRecursiveError(err error) bool { func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) { // Low-level subscribe, returns a set of SubscriptionDetails - options := make([]filter.FilterSubscribeOption, 0) + options := make([]filter.SubscribeOption, 0) options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount))) for _, p := range apiSub.Config.Peers { options = append(options, filter.WithPeer(p)) diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index d9b2335a..c9e5b745 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -102,7 +102,7 @@ func NewWakuFilterLightNode( wf.metrics = newMetrics(reg) wf.peerPingInterval = 1 * time.Minute - params := &FilterLightNodeParameters{} + params := &LightNodeParameters{} opts = append(DefaultLightNodeOptions(), opts...) for _, opt := range opts { opt(params) @@ -318,8 +318,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, return nil } -func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) { - params := new(FilterSubscribeParameters) +func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []SubscribeOption) (*SubscribeParameters, map[string][]string, error) { + params := new(SubscribeParameters) params.log = wf.log params.host = wf.h params.pm = wf.pm @@ -377,7 +377,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request. // Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics. -func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -479,8 +479,8 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter return wf.subscriptions.NewSubscription(peerID, contentFilter), nil } -func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) { - params := new(FilterSubscribeParameters) +func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption) (*SubscribeParameters, error) { + params := new(SubscribeParameters) params.log = wf.log opts = append(DefaultUnsubscribeOptions(), opts...) for _, opt := range opts { @@ -492,14 +492,14 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO return params, nil } -func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error { +func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...PingOption) error { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { return err } - params := &FilterPingParameters{} + params := &PingParameters{} for _, opt := range opts { opt(params) } @@ -516,7 +516,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts .. } // Unsubscribe is used to stop receiving messages from specified peers for the content filter -func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -621,7 +621,7 @@ func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) boo // If there are no more subscriptions matching the passed [peer, contentFilter] pair, // server unsubscribe is also performed func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails, - opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { + opts ...SubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -668,7 +668,7 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, reques // close all subscribe for selectedPeer or if selectedPeer == "", then all peers // send the unsubscribeAll request to the peers -func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) { params, err := wf.getUnsubscribeParameters(opts...) if err != nil { return nil, err @@ -739,7 +739,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte } // UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions -func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) { +func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index 6614bfdc..3c87ef8d 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -58,7 +58,7 @@ func (s *FilterTestSuite) TestMultipleMessages() { s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "second"}) } -func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters, +func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *SubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error { const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd") @@ -111,7 +111,7 @@ func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, pa return nil } -func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) { +func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { @@ -129,7 +129,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest) } - params := new(FilterSubscribeParameters) + params := new(SubscribeParameters) params.log = wf.log params.host = wf.h params.pm = wf.pm diff --git a/waku/v2/protocol/filter/filter_subscribe_test.go b/waku/v2/protocol/filter/filter_subscribe_test.go index c8ec33c9..edaa793f 100644 --- a/waku/v2/protocol/filter/filter_subscribe_test.go +++ b/waku/v2/protocol/filter/filter_subscribe_test.go @@ -392,7 +392,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() { s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID()) // With valid peer - opts := []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID())} + opts := []SubscribeOption{WithPeer(s.FullNodeHost.ID())} // Positive case _, _, err := s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) @@ -401,7 +401,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() { addr := s.FullNodeHost.Addrs()[0] // Combine mutually exclusive options - opts = []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)} + opts = []SubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)} // Should fail on wrong option combination _, _, err = s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts) diff --git a/waku/v2/protocol/filter/options.go b/waku/v2/protocol/filter/options.go index 176c326b..6f1957b5 100644 --- a/waku/v2/protocol/filter/options.go +++ b/waku/v2/protocol/filter/options.go @@ -14,28 +14,28 @@ import ( "golang.org/x/time/rate" ) -func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters { - return &FilterSubscribeParameters{ +func (old *SubscribeParameters) Copy() *SubscribeParameters { + return &SubscribeParameters{ selectedPeers: old.selectedPeers, requestID: old.requestID, } } type ( - FilterPingParameters struct { + PingParameters struct { requestID []byte } - FilterPingOption func(*FilterPingParameters) + PingOption func(*PingParameters) ) -func WithPingRequestId(requestId []byte) FilterPingOption { - return func(params *FilterPingParameters) { +func WithPingRequestId(requestId []byte) PingOption { + return func(params *PingParameters) { params.requestID = requestId } } type ( - FilterSubscribeParameters struct { + SubscribeParameters struct { selectedPeers peer.IDSlice peerAddr multiaddr.Multiaddr peerSelectionType peermanager.PeerSelection @@ -54,7 +54,7 @@ type ( wg *sync.WaitGroup } - FilterFullNodeParameters struct { + FullNodeParameters struct { Timeout time.Duration MaxSubscribers int pm *peermanager.PeerManager @@ -62,20 +62,20 @@ type ( limitB int } - FullNodeOption func(*FilterFullNodeParameters) + FullNodeOption func(*FullNodeParameters) - FilterLightNodeParameters struct { + LightNodeParameters struct { limitR rate.Limit limitB int } - LightNodeOption func(*FilterLightNodeParameters) + LightNodeOption func(*LightNodeParameters) - FilterSubscribeOption func(*FilterSubscribeParameters) error + SubscribeOption func(*SubscribeParameters) error ) func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption { - return func(params *FilterLightNodeParameters) { + return func(params *LightNodeParameters) { params.limitR = r params.limitB = b } @@ -88,15 +88,15 @@ func DefaultLightNodeOptions() []LightNodeOption { } func WithTimeout(timeout time.Duration) FullNodeOption { - return func(params *FilterFullNodeParameters) { + return func(params *FullNodeParameters) { params.Timeout = timeout } } // WithPeer is an option used to specify the peerID to request the message history. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeer(p peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithPeer(p peer.ID) SubscribeOption { + return func(params *SubscribeParameters) error { params.selectedPeers = append(params.selectedPeers, p) if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -108,8 +108,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption { // WithPeerAddr is an option used to specify a peerAddress. // This new peer will be added to peerStore. // Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used. -func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithPeerAddr(pAddr multiaddr.Multiaddr) SubscribeOption { + return func(params *SubscribeParameters) error { params.peerAddr = pAddr if len(params.selectedPeers) != 0 { return errors.New("peerAddr and peerId options are mutually exclusive") @@ -118,16 +118,16 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption { } } -func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithMaxPeersPerContentFilter(numPeers int) SubscribeOption { + return func(params *SubscribeParameters) error { params.maxPeers = numPeers return nil } } // WithPeersToExclude option excludes the peers that are specified from selection -func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithPeersToExclude(peers ...peer.ID) SubscribeOption { + return func(params *SubscribeParameters) error { params.peersToExclude = peermanager.PeerSliceToMap(peers) return nil } @@ -136,8 +136,8 @@ func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption { // WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. // If a list of specific peers is passed, the peer will be chosen from that list assuming it // supports the chosen protocol, otherwise it will chose a peer from the node peerstore -func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) SubscribeOption { + return func(params *SubscribeParameters) error { params.peerSelectionType = peermanager.Automatic params.preferredPeers = fromThesePeers return nil @@ -148,8 +148,8 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption // with the lowest ping If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a // peer from the node peerstore -func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithFastestPeerSelection(fromThesePeers ...peer.ID) SubscribeOption { + return func(params *SubscribeParameters) error { params.peerSelectionType = peermanager.LowestRTT return nil } @@ -157,8 +157,8 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { // WithRequestID is an option to set a specific request ID to be used when // creating/removing a filter subscription -func WithRequestID(requestID []byte) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithRequestID(requestID []byte) SubscribeOption { + return func(params *SubscribeParameters) error { params.requestID = requestID return nil } @@ -166,23 +166,23 @@ func WithRequestID(requestID []byte) FilterSubscribeOption { // WithAutomaticRequestID is an option to automatically generate a request ID // when creating a filter subscription -func WithAutomaticRequestID() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithAutomaticRequestID() SubscribeOption { + return func(params *SubscribeParameters) error { params.requestID = protocol.GenerateRequestID() return nil } } -func DefaultSubscriptionOptions() []FilterSubscribeOption { - return []FilterSubscribeOption{ +func DefaultSubscriptionOptions() []SubscribeOption { + return []SubscribeOption{ WithAutomaticPeerSelection(), WithAutomaticRequestID(), WithMaxPeersPerContentFilter(1), } } -func UnsubscribeAll() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func UnsubscribeAll() SubscribeOption { + return func(params *SubscribeParameters) error { params.unsubscribeAll = true return nil } @@ -190,8 +190,8 @@ func UnsubscribeAll() FilterSubscribeOption { // WithWaitGroup allows specifying a waitgroup to wait until all // unsubscribe requests are complete before the function is complete -func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func WithWaitGroup(wg *sync.WaitGroup) SubscribeOption { + return func(params *SubscribeParameters) error { params.wg = wg return nil } @@ -199,34 +199,34 @@ func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption { // DontWait is used to fire and forget an unsubscription, and don't // care about the results of it -func DontWait() FilterSubscribeOption { - return func(params *FilterSubscribeParameters) error { +func DontWait() SubscribeOption { + return func(params *SubscribeParameters) error { params.wg = nil return nil } } -func DefaultUnsubscribeOptions() []FilterSubscribeOption { - return []FilterSubscribeOption{ +func DefaultUnsubscribeOptions() []SubscribeOption { + return []SubscribeOption{ WithAutomaticRequestID(), WithWaitGroup(&sync.WaitGroup{}), } } func WithMaxSubscribers(maxSubscribers int) FullNodeOption { - return func(params *FilterFullNodeParameters) { + return func(params *FullNodeParameters) { params.MaxSubscribers = maxSubscribers } } func WithPeerManager(pm *peermanager.PeerManager) FullNodeOption { - return func(params *FilterFullNodeParameters) { + return func(params *FullNodeParameters) { params.pm = pm } } func WithFullNodeRateLimiter(r rate.Limit, b int) FullNodeOption { - return func(params *FilterFullNodeParameters) { + return func(params *FullNodeParameters) { params.limitR = r params.limitB = b } diff --git a/waku/v2/protocol/filter/options_test.go b/waku/v2/protocol/filter/options_test.go index a8a36148..a101ef2b 100644 --- a/waku/v2/protocol/filter/options_test.go +++ b/waku/v2/protocol/filter/options_test.go @@ -19,13 +19,13 @@ func TestFilterOption(t *testing.T) { require.NoError(t, err) // subscribe options - options := []FilterSubscribeOption{ + options := []SubscribeOption{ WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), WithAutomaticPeerSelection(), WithFastestPeerSelection(), } - params := new(FilterSubscribeParameters) + params := new(SubscribeParameters) params.host = host params.log = utils.Logger() @@ -38,13 +38,13 @@ func TestFilterOption(t *testing.T) { require.NotEqual(t, 0, params.selectedPeers) // Unsubscribe options - options2 := []FilterSubscribeOption{ + options2 := []SubscribeOption{ WithAutomaticRequestID(), UnsubscribeAll(), WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"), } - params2 := new(FilterSubscribeParameters) + params2 := new(SubscribeParameters) for _, opt := range options2 { err := opt(params2) @@ -57,12 +57,12 @@ func TestFilterOption(t *testing.T) { // Mutually Exclusive options maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") require.NoError(t, err) - options3 := []FilterSubscribeOption{ + options3 := []SubscribeOption{ WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"), WithPeerAddr(maddr), } - params3 := new(FilterSubscribeParameters) + params3 := new(SubscribeParameters) for idx, opt := range options3 { err := opt(params3) diff --git a/waku/v2/protocol/filter/server.go b/waku/v2/protocol/filter/server.go index 50b8373a..b9c88b45 100644 --- a/waku/v2/protocol/filter/server.go +++ b/waku/v2/protocol/filter/server.go @@ -49,7 +49,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi wf := new(WakuFilterFullNode) wf.log = log.Named("filterv2-fullnode") - params := new(FilterFullNodeParameters) + params := new(FullNodeParameters) optList := DefaultFullNodeOptions() optList = append(optList, opts...) for _, opt := range optList {