refactor: rename to avoid stuttering

This commit is contained in:
Richard Ramos 2024-11-26 08:57:58 -04:00
parent 5403100d91
commit e37768a78b
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
10 changed files with 73 additions and 73 deletions

View File

@ -79,7 +79,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
var filterOpt filter.SubscribeOption
peerID, err := options.Filter.NodePeerID()
if err != nil {
filterOpt = filter.WithAutomaticPeerSelection()

View File

@ -65,7 +65,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
var filterOpt filter.SubscribeOption
peerID, err := options.Filter.NodePeerID()
if err != nil {
filterOpt = filter.WithAutomaticPeerSelection()

View File

@ -57,7 +57,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m
ctx = instance.ctx
}
var fOptions []filter.FilterSubscribeOption
var fOptions []filter.SubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
@ -141,7 +141,7 @@ func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string,
ctx = instance.ctx
}
var fOptions []filter.FilterSubscribeOption
var fOptions []filter.SubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
@ -185,7 +185,7 @@ func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string
ctx = instance.ctx
}
var fOptions []filter.FilterSubscribeOption
var fOptions []filter.SubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {

View File

@ -194,7 +194,7 @@ func possibleRecursiveError(err error) bool {
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
// Low-level subscribe, returns a set of SubscriptionDetails
options := make([]filter.FilterSubscribeOption, 0)
options := make([]filter.SubscribeOption, 0)
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
for _, p := range apiSub.Config.Peers {
options = append(options, filter.WithPeer(p))

View File

@ -102,7 +102,7 @@ func NewWakuFilterLightNode(
wf.metrics = newMetrics(reg)
wf.peerPingInterval = 1 * time.Minute
params := &FilterLightNodeParameters{}
params := &LightNodeParameters{}
opts = append(DefaultLightNodeOptions(), opts...)
for _, opt := range opts {
opt(params)
@ -318,8 +318,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
return nil
}
func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) {
params := new(FilterSubscribeParameters)
func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []SubscribeOption) (*SubscribeParameters, map[string][]string, error) {
params := new(SubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm
@ -377,7 +377,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
@ -479,8 +479,8 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
}
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) {
params := new(FilterSubscribeParameters)
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption) (*SubscribeParameters, error) {
params := new(SubscribeParameters)
params.log = wf.log
opts = append(DefaultUnsubscribeOptions(), opts...)
for _, opt := range opts {
@ -492,14 +492,14 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeO
return params, nil
}
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error {
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...PingOption) error {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
return err
}
params := &FilterPingParameters{}
params := &PingParameters{}
for _, opt := range opts {
opt(params)
}
@ -516,7 +516,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ..
}
// Unsubscribe is used to stop receiving messages from specified peers for the content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
@ -621,7 +621,7 @@ func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) boo
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
opts ...SubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
@ -668,7 +668,7 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, reques
// close all subscribe for selectedPeer or if selectedPeer == "", then all peers
// send the unsubscribeAll request to the peers
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err
@ -739,7 +739,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
}
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {

View File

@ -58,7 +58,7 @@ func (s *FilterTestSuite) TestMultipleMessages() {
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "second"})
}
func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters,
func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *SubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd")
@ -111,7 +111,7 @@ func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, pa
return nil
}
func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
@ -129,7 +129,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
}
params := new(FilterSubscribeParameters)
params := new(SubscribeParameters)
params.log = wf.log
params.host = wf.h
params.pm = wf.pm

View File

@ -392,7 +392,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() {
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
// With valid peer
opts := []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID())}
opts := []SubscribeOption{WithPeer(s.FullNodeHost.ID())}
// Positive case
_, _, err := s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
@ -401,7 +401,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() {
addr := s.FullNodeHost.Addrs()[0]
// Combine mutually exclusive options
opts = []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)}
opts = []SubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)}
// Should fail on wrong option combination
_, _, err = s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)

View File

@ -14,28 +14,28 @@ import (
"golang.org/x/time/rate"
)
func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
return &FilterSubscribeParameters{
func (old *SubscribeParameters) Copy() *SubscribeParameters {
return &SubscribeParameters{
selectedPeers: old.selectedPeers,
requestID: old.requestID,
}
}
type (
FilterPingParameters struct {
PingParameters struct {
requestID []byte
}
FilterPingOption func(*FilterPingParameters)
PingOption func(*PingParameters)
)
func WithPingRequestId(requestId []byte) FilterPingOption {
return func(params *FilterPingParameters) {
func WithPingRequestId(requestId []byte) PingOption {
return func(params *PingParameters) {
params.requestID = requestId
}
}
type (
FilterSubscribeParameters struct {
SubscribeParameters struct {
selectedPeers peer.IDSlice
peerAddr multiaddr.Multiaddr
peerSelectionType peermanager.PeerSelection
@ -54,7 +54,7 @@ type (
wg *sync.WaitGroup
}
FilterFullNodeParameters struct {
FullNodeParameters struct {
Timeout time.Duration
MaxSubscribers int
pm *peermanager.PeerManager
@ -62,20 +62,20 @@ type (
limitB int
}
FullNodeOption func(*FilterFullNodeParameters)
FullNodeOption func(*FullNodeParameters)
FilterLightNodeParameters struct {
LightNodeParameters struct {
limitR rate.Limit
limitB int
}
LightNodeOption func(*FilterLightNodeParameters)
LightNodeOption func(*LightNodeParameters)
FilterSubscribeOption func(*FilterSubscribeParameters) error
SubscribeOption func(*SubscribeParameters) error
)
func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption {
return func(params *FilterLightNodeParameters) {
return func(params *LightNodeParameters) {
params.limitR = r
params.limitB = b
}
@ -88,15 +88,15 @@ func DefaultLightNodeOptions() []LightNodeOption {
}
func WithTimeout(timeout time.Duration) FullNodeOption {
return func(params *FilterFullNodeParameters) {
return func(params *FullNodeParameters) {
params.Timeout = timeout
}
}
// WithPeer is an option used to specify the peerID to request the message history.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithPeer(p peer.ID) SubscribeOption {
return func(params *SubscribeParameters) error {
params.selectedPeers = append(params.selectedPeers, p)
if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive")
@ -108,8 +108,8 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
// WithPeerAddr is an option used to specify a peerAddress.
// This new peer will be added to peerStore.
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithPeerAddr(pAddr multiaddr.Multiaddr) SubscribeOption {
return func(params *SubscribeParameters) error {
params.peerAddr = pAddr
if len(params.selectedPeers) != 0 {
return errors.New("peerAddr and peerId options are mutually exclusive")
@ -118,16 +118,16 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption {
}
}
func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithMaxPeersPerContentFilter(numPeers int) SubscribeOption {
return func(params *SubscribeParameters) error {
params.maxPeers = numPeers
return nil
}
}
// WithPeersToExclude option excludes the peers that are specified from selection
func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithPeersToExclude(peers ...peer.ID) SubscribeOption {
return func(params *SubscribeParameters) error {
params.peersToExclude = peermanager.PeerSliceToMap(peers)
return nil
}
@ -136,8 +136,8 @@ func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption {
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
// supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) SubscribeOption {
return func(params *SubscribeParameters) error {
params.peerSelectionType = peermanager.Automatic
params.preferredPeers = fromThesePeers
return nil
@ -148,8 +148,8 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
// with the lowest ping If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a
// peer from the node peerstore
func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithFastestPeerSelection(fromThesePeers ...peer.ID) SubscribeOption {
return func(params *SubscribeParameters) error {
params.peerSelectionType = peermanager.LowestRTT
return nil
}
@ -157,8 +157,8 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
// WithRequestID is an option to set a specific request ID to be used when
// creating/removing a filter subscription
func WithRequestID(requestID []byte) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithRequestID(requestID []byte) SubscribeOption {
return func(params *SubscribeParameters) error {
params.requestID = requestID
return nil
}
@ -166,23 +166,23 @@ func WithRequestID(requestID []byte) FilterSubscribeOption {
// WithAutomaticRequestID is an option to automatically generate a request ID
// when creating a filter subscription
func WithAutomaticRequestID() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithAutomaticRequestID() SubscribeOption {
return func(params *SubscribeParameters) error {
params.requestID = protocol.GenerateRequestID()
return nil
}
}
func DefaultSubscriptionOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
func DefaultSubscriptionOptions() []SubscribeOption {
return []SubscribeOption{
WithAutomaticPeerSelection(),
WithAutomaticRequestID(),
WithMaxPeersPerContentFilter(1),
}
}
func UnsubscribeAll() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func UnsubscribeAll() SubscribeOption {
return func(params *SubscribeParameters) error {
params.unsubscribeAll = true
return nil
}
@ -190,8 +190,8 @@ func UnsubscribeAll() FilterSubscribeOption {
// WithWaitGroup allows specifying a waitgroup to wait until all
// unsubscribe requests are complete before the function is complete
func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func WithWaitGroup(wg *sync.WaitGroup) SubscribeOption {
return func(params *SubscribeParameters) error {
params.wg = wg
return nil
}
@ -199,34 +199,34 @@ func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption {
// DontWait is used to fire and forget an unsubscription, and don't
// care about the results of it
func DontWait() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) error {
func DontWait() SubscribeOption {
return func(params *SubscribeParameters) error {
params.wg = nil
return nil
}
}
func DefaultUnsubscribeOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
func DefaultUnsubscribeOptions() []SubscribeOption {
return []SubscribeOption{
WithAutomaticRequestID(),
WithWaitGroup(&sync.WaitGroup{}),
}
}
func WithMaxSubscribers(maxSubscribers int) FullNodeOption {
return func(params *FilterFullNodeParameters) {
return func(params *FullNodeParameters) {
params.MaxSubscribers = maxSubscribers
}
}
func WithPeerManager(pm *peermanager.PeerManager) FullNodeOption {
return func(params *FilterFullNodeParameters) {
return func(params *FullNodeParameters) {
params.pm = pm
}
}
func WithFullNodeRateLimiter(r rate.Limit, b int) FullNodeOption {
return func(params *FilterFullNodeParameters) {
return func(params *FullNodeParameters) {
params.limitR = r
params.limitB = b
}

View File

@ -19,13 +19,13 @@ func TestFilterOption(t *testing.T) {
require.NoError(t, err)
// subscribe options
options := []FilterSubscribeOption{
options := []SubscribeOption{
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
WithAutomaticPeerSelection(),
WithFastestPeerSelection(),
}
params := new(FilterSubscribeParameters)
params := new(SubscribeParameters)
params.host = host
params.log = utils.Logger()
@ -38,13 +38,13 @@ func TestFilterOption(t *testing.T) {
require.NotEqual(t, 0, params.selectedPeers)
// Unsubscribe options
options2 := []FilterSubscribeOption{
options2 := []SubscribeOption{
WithAutomaticRequestID(),
UnsubscribeAll(),
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
}
params2 := new(FilterSubscribeParameters)
params2 := new(SubscribeParameters)
for _, opt := range options2 {
err := opt(params2)
@ -57,12 +57,12 @@ func TestFilterOption(t *testing.T) {
// Mutually Exclusive options
maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy")
require.NoError(t, err)
options3 := []FilterSubscribeOption{
options3 := []SubscribeOption{
WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"),
WithPeerAddr(maddr),
}
params3 := new(FilterSubscribeParameters)
params3 := new(SubscribeParameters)
for idx, opt := range options3 {
err := opt(params3)

View File

@ -49,7 +49,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
wf := new(WakuFilterFullNode)
wf.log = log.Named("filterv2-fullnode")
params := new(FilterFullNodeParameters)
params := new(FullNodeParameters)
optList := DefaultFullNodeOptions()
optList = append(optList, opts...)
for _, opt := range optList {