mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-09 17:33:08 +00:00
fix: code review
This commit is contained in:
parent
e37768a78b
commit
366e0dd5c7
@ -23,6 +23,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
|
||||
@ -37,8 +38,8 @@ func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
|
||||
|
||||
// node2 connects to node1
|
||||
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
|
||||
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
|
||||
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
|
||||
node1 := createNode(t, node.WithWakuFilterFullNode(filter.WithFullNodeRateLimiter(rate.Inf, 0))) // full node filter
|
||||
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
|
||||
|
||||
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
|
||||
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1)
|
||||
|
||||
@ -79,7 +79,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
|
||||
PubsubTopic: relay.DefaultWakuTopic,
|
||||
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
|
||||
}
|
||||
var filterOpt filter.SubscribeOption
|
||||
var filterOpt filter.FilterSubscribeOption
|
||||
peerID, err := options.Filter.NodePeerID()
|
||||
if err != nil {
|
||||
filterOpt = filter.WithAutomaticPeerSelection()
|
||||
|
||||
@ -65,7 +65,7 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
|
||||
PubsubTopic: relay.DefaultWakuTopic,
|
||||
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
|
||||
}
|
||||
var filterOpt filter.SubscribeOption
|
||||
var filterOpt filter.FilterSubscribeOption
|
||||
peerID, err := options.Filter.NodePeerID()
|
||||
if err != nil {
|
||||
filterOpt = filter.WithAutomaticPeerSelection()
|
||||
|
||||
@ -57,7 +57,7 @@ func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, m
|
||||
ctx = instance.ctx
|
||||
}
|
||||
|
||||
var fOptions []filter.SubscribeOption
|
||||
var fOptions []filter.FilterSubscribeOption
|
||||
if peerID != "" {
|
||||
p, err := peer.Decode(peerID)
|
||||
if err != nil {
|
||||
@ -141,7 +141,7 @@ func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string,
|
||||
ctx = instance.ctx
|
||||
}
|
||||
|
||||
var fOptions []filter.SubscribeOption
|
||||
var fOptions []filter.FilterSubscribeOption
|
||||
if peerID != "" {
|
||||
p, err := peer.Decode(peerID)
|
||||
if err != nil {
|
||||
@ -185,7 +185,7 @@ func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string
|
||||
ctx = instance.ctx
|
||||
}
|
||||
|
||||
var fOptions []filter.SubscribeOption
|
||||
var fOptions []filter.FilterSubscribeOption
|
||||
if peerID != "" {
|
||||
p, err := peer.Decode(peerID)
|
||||
if err != nil {
|
||||
|
||||
@ -194,7 +194,7 @@ func possibleRecursiveError(err error) bool {
|
||||
|
||||
func (apiSub *Sub) subscribe(contentFilter protocol.ContentFilter, peerCount int, peersToExclude ...peer.ID) ([]*subscription.SubscriptionDetails, error) {
|
||||
// Low-level subscribe, returns a set of SubscriptionDetails
|
||||
options := make([]filter.SubscribeOption, 0)
|
||||
options := make([]filter.FilterSubscribeOption, 0)
|
||||
options = append(options, filter.WithMaxPeersPerContentFilter(int(peerCount)))
|
||||
for _, p := range apiSub.Config.Peers {
|
||||
options = append(options, filter.WithPeer(p))
|
||||
|
||||
@ -85,7 +85,7 @@ type WakuNodeParameters struct {
|
||||
enableRelay bool
|
||||
enableFilterLightNode bool
|
||||
enableFilterFullNode bool
|
||||
filterOpts []filter.FullNodeOption
|
||||
filterOpts []filter.Option
|
||||
pubsubOpts []pubsub.Option
|
||||
lightpushOpts []lightpush.Option
|
||||
|
||||
@ -471,7 +471,7 @@ func WithWakuFilterLightNode() WakuNodeOption {
|
||||
|
||||
// WithWakuFilterFullNode enables the Waku Filter V2 protocol full node functionality.
|
||||
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
|
||||
func WithWakuFilterFullNode(filterOpts ...filter.FullNodeOption) WakuNodeOption {
|
||||
func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.enableFilterFullNode = true
|
||||
params.filterOpts = filterOpts
|
||||
|
||||
@ -167,6 +167,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
|
||||
|
||||
if !wf.limiter.Allow(peerID) {
|
||||
wf.metrics.RecordError(rateLimitFailure)
|
||||
if err := stream.Reset(); err != nil {
|
||||
wf.log.Error("resetting connection", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -318,8 +321,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []SubscribeOption) (*SubscribeParameters, map[string][]string, error) {
|
||||
params := new(SubscribeParameters)
|
||||
func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) {
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.log = wf.log
|
||||
params.host = wf.h
|
||||
params.pm = wf.pm
|
||||
@ -377,7 +380,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
|
||||
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
|
||||
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
|
||||
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
|
||||
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) {
|
||||
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
@ -479,8 +482,8 @@ func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter
|
||||
return wf.subscriptions.NewSubscription(peerID, contentFilter), nil
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption) (*SubscribeParameters, error) {
|
||||
params := new(SubscribeParameters)
|
||||
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterSubscribeOption) (*FilterSubscribeParameters, error) {
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.log = wf.log
|
||||
opts = append(DefaultUnsubscribeOptions(), opts...)
|
||||
for _, opt := range opts {
|
||||
@ -492,14 +495,14 @@ func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...SubscribeOption)
|
||||
return params, nil
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...PingOption) error {
|
||||
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ...FilterPingOption) error {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
params := &PingParameters{}
|
||||
params := &FilterPingParameters{}
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
}
|
||||
@ -516,7 +519,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID, opts ..
|
||||
}
|
||||
|
||||
// Unsubscribe is used to stop receiving messages from specified peers for the content filter
|
||||
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
|
||||
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
@ -621,7 +624,7 @@ func (wf *WakuFilterLightNode) IsListening(pubsubTopic, contentTopic string) boo
|
||||
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
|
||||
// server unsubscribe is also performed
|
||||
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
|
||||
opts ...SubscribeOption) (*WakuFilterPushResult, error) {
|
||||
opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
@ -668,7 +671,7 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, reques
|
||||
|
||||
// close all subscribe for selectedPeer or if selectedPeer == "", then all peers
|
||||
// send the unsubscribeAll request to the peers
|
||||
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
|
||||
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
params, err := wf.getUnsubscribeParameters(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -739,7 +742,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Subsc
|
||||
}
|
||||
|
||||
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
|
||||
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...SubscribeOption) (*WakuFilterPushResult, error) {
|
||||
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
|
||||
@ -58,7 +58,7 @@ func (s *FilterTestSuite) TestMultipleMessages() {
|
||||
s.waitForMsg(&WakuMsg{s.TestTopic, s.TestContentTopic, "second"})
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *SubscribeParameters,
|
||||
func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, params *FilterSubscribeParameters,
|
||||
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
|
||||
|
||||
const FilterSubscribeID_Incorrect1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/abcd")
|
||||
@ -111,7 +111,7 @@ func (wf *WakuFilterLightNode) incorrectSubscribeRequest(ctx context.Context, pa
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...SubscribeOption) ([]*subscription.SubscriptionDetails, error) {
|
||||
func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
@ -129,7 +129,7 @@ func (wf *WakuFilterLightNode) IncorrectSubscribe(ctx context.Context, contentFi
|
||||
return nil, fmt.Errorf("exceeds maximum content topics: %d", MaxContentTopicsPerRequest)
|
||||
}
|
||||
|
||||
params := new(SubscribeParameters)
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.log = wf.log
|
||||
params.host = wf.h
|
||||
params.pm = wf.pm
|
||||
|
||||
@ -392,7 +392,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() {
|
||||
s.subscribe(s.TestTopic, s.TestContentTopic, s.FullNodeHost.ID())
|
||||
|
||||
// With valid peer
|
||||
opts := []SubscribeOption{WithPeer(s.FullNodeHost.ID())}
|
||||
opts := []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID())}
|
||||
|
||||
// Positive case
|
||||
_, _, err := s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
|
||||
@ -401,7 +401,7 @@ func (s *FilterTestSuite) TestHandleFilterSubscribeOptions() {
|
||||
addr := s.FullNodeHost.Addrs()[0]
|
||||
|
||||
// Combine mutually exclusive options
|
||||
opts = []SubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)}
|
||||
opts = []FilterSubscribeOption{WithPeer(s.FullNodeHost.ID()), WithPeerAddr(addr)}
|
||||
|
||||
// Should fail on wrong option combination
|
||||
_, _, err = s.LightNode.handleFilterSubscribeOptions(s.ctx, contentFilter, opts)
|
||||
|
||||
@ -14,28 +14,28 @@ import (
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func (old *SubscribeParameters) Copy() *SubscribeParameters {
|
||||
return &SubscribeParameters{
|
||||
func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
|
||||
return &FilterSubscribeParameters{
|
||||
selectedPeers: old.selectedPeers,
|
||||
requestID: old.requestID,
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
PingParameters struct {
|
||||
FilterPingParameters struct {
|
||||
requestID []byte
|
||||
}
|
||||
PingOption func(*PingParameters)
|
||||
FilterPingOption func(*FilterPingParameters)
|
||||
)
|
||||
|
||||
func WithPingRequestId(requestId []byte) PingOption {
|
||||
return func(params *PingParameters) {
|
||||
func WithPingRequestId(requestId []byte) FilterPingOption {
|
||||
return func(params *FilterPingParameters) {
|
||||
params.requestID = requestId
|
||||
}
|
||||
}
|
||||
|
||||
type (
|
||||
SubscribeParameters struct {
|
||||
FilterSubscribeParameters struct {
|
||||
selectedPeers peer.IDSlice
|
||||
peerAddr multiaddr.Multiaddr
|
||||
peerSelectionType peermanager.PeerSelection
|
||||
@ -54,7 +54,7 @@ type (
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
FullNodeParameters struct {
|
||||
FilterParameters struct {
|
||||
Timeout time.Duration
|
||||
MaxSubscribers int
|
||||
pm *peermanager.PeerManager
|
||||
@ -62,7 +62,7 @@ type (
|
||||
limitB int
|
||||
}
|
||||
|
||||
FullNodeOption func(*FullNodeParameters)
|
||||
Option func(*FilterParameters)
|
||||
|
||||
LightNodeParameters struct {
|
||||
limitR rate.Limit
|
||||
@ -71,7 +71,7 @@ type (
|
||||
|
||||
LightNodeOption func(*LightNodeParameters)
|
||||
|
||||
SubscribeOption func(*SubscribeParameters) error
|
||||
FilterSubscribeOption func(*FilterSubscribeParameters) error
|
||||
)
|
||||
|
||||
func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption {
|
||||
@ -83,20 +83,20 @@ func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption {
|
||||
|
||||
func DefaultLightNodeOptions() []LightNodeOption {
|
||||
return []LightNodeOption{
|
||||
WithLightNodeRateLimiter(rate.Inf, 0),
|
||||
WithLightNodeRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func WithTimeout(timeout time.Duration) FullNodeOption {
|
||||
return func(params *FullNodeParameters) {
|
||||
func WithTimeout(timeout time.Duration) Option {
|
||||
return func(params *FilterParameters) {
|
||||
params.Timeout = timeout
|
||||
}
|
||||
}
|
||||
|
||||
// WithPeer is an option used to specify the peerID to request the message history.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeer(p peer.ID) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.selectedPeers = append(params.selectedPeers, p)
|
||||
if params.peerAddr != nil {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
@ -108,8 +108,8 @@ func WithPeer(p peer.ID) SubscribeOption {
|
||||
// WithPeerAddr is an option used to specify a peerAddress.
|
||||
// This new peer will be added to peerStore.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.peerAddr = pAddr
|
||||
if len(params.selectedPeers) != 0 {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
@ -118,16 +118,16 @@ func WithPeerAddr(pAddr multiaddr.Multiaddr) SubscribeOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxPeersPerContentFilter(numPeers int) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithMaxPeersPerContentFilter(numPeers int) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.maxPeers = numPeers
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPeersToExclude option excludes the peers that are specified from selection
|
||||
func WithPeersToExclude(peers ...peer.ID) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithPeersToExclude(peers ...peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.peersToExclude = peermanager.PeerSliceToMap(peers)
|
||||
return nil
|
||||
}
|
||||
@ -136,8 +136,8 @@ func WithPeersToExclude(peers ...peer.ID) SubscribeOption {
|
||||
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
|
||||
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
|
||||
// supports the chosen protocol, otherwise it will chose a peer from the node peerstore
|
||||
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.peerSelectionType = peermanager.Automatic
|
||||
params.preferredPeers = fromThesePeers
|
||||
return nil
|
||||
@ -148,8 +148,8 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) SubscribeOption {
|
||||
// with the lowest ping If a list of specific peers is passed, the peer will be chosen
|
||||
// from that list assuming it supports the chosen protocol, otherwise it will chose a
|
||||
// peer from the node peerstore
|
||||
func WithFastestPeerSelection(fromThesePeers ...peer.ID) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithFastestPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.peerSelectionType = peermanager.LowestRTT
|
||||
return nil
|
||||
}
|
||||
@ -157,8 +157,8 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) SubscribeOption {
|
||||
|
||||
// WithRequestID is an option to set a specific request ID to be used when
|
||||
// creating/removing a filter subscription
|
||||
func WithRequestID(requestID []byte) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithRequestID(requestID []byte) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.requestID = requestID
|
||||
return nil
|
||||
}
|
||||
@ -166,23 +166,23 @@ func WithRequestID(requestID []byte) SubscribeOption {
|
||||
|
||||
// WithAutomaticRequestID is an option to automatically generate a request ID
|
||||
// when creating a filter subscription
|
||||
func WithAutomaticRequestID() SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithAutomaticRequestID() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.requestID = protocol.GenerateRequestID()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultSubscriptionOptions() []SubscribeOption {
|
||||
return []SubscribeOption{
|
||||
func DefaultSubscriptionOptions() []FilterSubscribeOption {
|
||||
return []FilterSubscribeOption{
|
||||
WithAutomaticPeerSelection(),
|
||||
WithAutomaticRequestID(),
|
||||
WithMaxPeersPerContentFilter(1),
|
||||
}
|
||||
}
|
||||
|
||||
func UnsubscribeAll() SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func UnsubscribeAll() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.unsubscribeAll = true
|
||||
return nil
|
||||
}
|
||||
@ -190,8 +190,8 @@ func UnsubscribeAll() SubscribeOption {
|
||||
|
||||
// WithWaitGroup allows specifying a waitgroup to wait until all
|
||||
// unsubscribe requests are complete before the function is complete
|
||||
func WithWaitGroup(wg *sync.WaitGroup) SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func WithWaitGroup(wg *sync.WaitGroup) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.wg = wg
|
||||
return nil
|
||||
}
|
||||
@ -199,43 +199,43 @@ func WithWaitGroup(wg *sync.WaitGroup) SubscribeOption {
|
||||
|
||||
// DontWait is used to fire and forget an unsubscription, and don't
|
||||
// care about the results of it
|
||||
func DontWait() SubscribeOption {
|
||||
return func(params *SubscribeParameters) error {
|
||||
func DontWait() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.wg = nil
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultUnsubscribeOptions() []SubscribeOption {
|
||||
return []SubscribeOption{
|
||||
func DefaultUnsubscribeOptions() []FilterSubscribeOption {
|
||||
return []FilterSubscribeOption{
|
||||
WithAutomaticRequestID(),
|
||||
WithWaitGroup(&sync.WaitGroup{}),
|
||||
}
|
||||
}
|
||||
|
||||
func WithMaxSubscribers(maxSubscribers int) FullNodeOption {
|
||||
return func(params *FullNodeParameters) {
|
||||
func WithMaxSubscribers(maxSubscribers int) Option {
|
||||
return func(params *FilterParameters) {
|
||||
params.MaxSubscribers = maxSubscribers
|
||||
}
|
||||
}
|
||||
|
||||
func WithPeerManager(pm *peermanager.PeerManager) FullNodeOption {
|
||||
return func(params *FullNodeParameters) {
|
||||
func WithPeerManager(pm *peermanager.PeerManager) Option {
|
||||
return func(params *FilterParameters) {
|
||||
params.pm = pm
|
||||
}
|
||||
}
|
||||
|
||||
func WithFullNodeRateLimiter(r rate.Limit, b int) FullNodeOption {
|
||||
return func(params *FullNodeParameters) {
|
||||
func WithFullNodeRateLimiter(r rate.Limit, b int) Option {
|
||||
return func(params *FilterParameters) {
|
||||
params.limitR = r
|
||||
params.limitB = b
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultFullNodeOptions() []FullNodeOption {
|
||||
return []FullNodeOption{
|
||||
func DefaultOptions() []Option {
|
||||
return []Option{
|
||||
WithTimeout(DefaultIdleSubscriptionTimeout),
|
||||
WithMaxSubscribers(DefaultMaxSubscribers),
|
||||
WithFullNodeRateLimiter(rate.Inf, 0),
|
||||
WithFullNodeRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,13 +19,13 @@ func TestFilterOption(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
// subscribe options
|
||||
options := []SubscribeOption{
|
||||
options := []FilterSubscribeOption{
|
||||
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
||||
WithAutomaticPeerSelection(),
|
||||
WithFastestPeerSelection(),
|
||||
}
|
||||
|
||||
params := new(SubscribeParameters)
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.host = host
|
||||
params.log = utils.Logger()
|
||||
|
||||
@ -38,13 +38,13 @@ func TestFilterOption(t *testing.T) {
|
||||
require.NotEqual(t, 0, params.selectedPeers)
|
||||
|
||||
// Unsubscribe options
|
||||
options2 := []SubscribeOption{
|
||||
options2 := []FilterSubscribeOption{
|
||||
WithAutomaticRequestID(),
|
||||
UnsubscribeAll(),
|
||||
WithPeer("QmWLxGxG65CZ7vRj5oNXCJvbY9WkF9d9FxuJg8cg8Y7q3"),
|
||||
}
|
||||
|
||||
params2 := new(SubscribeParameters)
|
||||
params2 := new(FilterSubscribeParameters)
|
||||
|
||||
for _, opt := range options2 {
|
||||
err := opt(params2)
|
||||
@ -57,12 +57,12 @@ func TestFilterOption(t *testing.T) {
|
||||
// Mutually Exclusive options
|
||||
maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy")
|
||||
require.NoError(t, err)
|
||||
options3 := []SubscribeOption{
|
||||
options3 := []FilterSubscribeOption{
|
||||
WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"),
|
||||
WithPeerAddr(maddr),
|
||||
}
|
||||
|
||||
params3 := new(SubscribeParameters)
|
||||
params3 := new(FilterSubscribeParameters)
|
||||
|
||||
for idx, opt := range options3 {
|
||||
err := opt(params3)
|
||||
|
||||
@ -45,12 +45,12 @@ type (
|
||||
)
|
||||
|
||||
// NewWakuFilterFullNode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
|
||||
func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...FullNodeOption) *WakuFilterFullNode {
|
||||
func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Registerer, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
|
||||
wf := new(WakuFilterFullNode)
|
||||
wf.log = log.Named("filterv2-fullnode")
|
||||
|
||||
params := new(FullNodeParameters)
|
||||
optList := DefaultFullNodeOptions()
|
||||
params := new(FilterParameters)
|
||||
optList := DefaultOptions()
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
|
||||
@ -22,6 +22,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type LightNodeData struct {
|
||||
@ -133,7 +134,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo
|
||||
|
||||
nodeData := s.GetWakuRelay(topic)
|
||||
|
||||
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
|
||||
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, WithFullNodeRateLimiter(rate.Inf, 0))
|
||||
node2Filter.SetHost(nodeData.FullNodeHost)
|
||||
|
||||
var sub *relay.Subscription
|
||||
@ -166,7 +167,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
|
||||
b := relay.NewBroadcaster(10)
|
||||
s.Require().NoError(b.Start(context.Background()))
|
||||
pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log)
|
||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
|
||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log, WithLightNodeRateLimiter(rate.Inf, 0))
|
||||
filterPush.SetHost(host)
|
||||
pm.SetHost(host)
|
||||
return LightNodeData{filterPush, host}
|
||||
|
||||
@ -58,7 +58,7 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
|
||||
wakuLP.metrics = newMetrics(reg)
|
||||
|
||||
params := &LightpushParameters{}
|
||||
opts = append(DefaultOptions(), opts...)
|
||||
opts = append(DefaultLightpushOptions(), opts...)
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
}
|
||||
@ -257,7 +257,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
|
||||
params.pm = wakuLP.pm
|
||||
var err error
|
||||
|
||||
optList := append(DefaultRequestOptions(wakuLP.h), opts...)
|
||||
optList := append(DefaultOptions(wakuLP.h), opts...)
|
||||
for _, opt := range optList {
|
||||
err := opt(params)
|
||||
if err != nil {
|
||||
|
||||
@ -28,9 +28,9 @@ func WithRateLimiter(r rate.Limit, b int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultOptions() []Option {
|
||||
func DefaultLightpushOptions() []Option {
|
||||
return []Option{
|
||||
WithRateLimiter(rate.Inf, 0),
|
||||
WithRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@ -138,8 +138,8 @@ func WithAutomaticRequestID() RequestOption {
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultRequestOptions are the default options to be used when using the lightpush protocol
|
||||
func DefaultRequestOptions(host host.Host) []RequestOption {
|
||||
// DefaultOptions are the default options to be used when using the lightpush protocol
|
||||
func DefaultOptions(host host.Host) []RequestOption {
|
||||
return []RequestOption{
|
||||
WithAutomaticPeerSelection(),
|
||||
WithMaxPeers(1), //keeping default as 2 for status use-case
|
||||
|
||||
@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peerstore"
|
||||
@ -273,7 +274,7 @@ func TestWakuLightPushCornerCases(t *testing.T) {
|
||||
defer node2.Stop()
|
||||
defer sub2.Unsubscribe()
|
||||
|
||||
lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger())
|
||||
lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0))
|
||||
lightPushNode2.SetHost(host2)
|
||||
err := lightPushNode2.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
@ -358,7 +359,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) {
|
||||
|
||||
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger())
|
||||
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0))
|
||||
client.SetHost(clientHost)
|
||||
|
||||
// Node2
|
||||
@ -366,7 +367,7 @@ func TestWakuLightPushWithStaticSharding(t *testing.T) {
|
||||
defer node2.Stop()
|
||||
defer sub2.Unsubscribe()
|
||||
|
||||
lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger())
|
||||
lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger(), WithRateLimiter(rate.Inf, 0))
|
||||
lightPushNode2.SetHost(host2)
|
||||
err = lightPushNode2.Start(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
@ -26,7 +26,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
||||
params.log = wakuPX.log
|
||||
params.pm = wakuPX.pm
|
||||
|
||||
optList := DefaultRequestOptions(wakuPX.h)
|
||||
optList := DefaultOptions(wakuPX.h)
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
err := opt(params)
|
||||
|
||||
@ -67,7 +67,7 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnect
|
||||
wakuPX.CommonService = service.NewCommonService()
|
||||
|
||||
params := &PeerExchangeParameters{}
|
||||
opts = append(DefaultOptions(), opts...)
|
||||
opts = append(DefaultPeerExchangeOptions(), opts...)
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
@ -26,9 +26,9 @@ func WithRateLimiter(r rate.Limit, b int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultOptions() []Option {
|
||||
func DefaultPeerExchangeOptions() []Option {
|
||||
return []Option{
|
||||
WithRateLimiter(rate.Inf, 0),
|
||||
WithRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@ -95,8 +95,8 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) RequestOption {
|
||||
}
|
||||
}
|
||||
|
||||
// DefaultRequestOptions are the default options to be used when using the lightpush protocol
|
||||
func DefaultRequestOptions(host host.Host) []RequestOption {
|
||||
// DefaultOptions are the default options to be used when using the lightpush protocol
|
||||
func DefaultOptions(host host.Host) []RequestOption {
|
||||
return []RequestOption{
|
||||
WithAutomaticPeerSelection(),
|
||||
}
|
||||
|
||||
@ -248,7 +248,7 @@ func TestPeerExchangeOptions(t *testing.T) {
|
||||
params.log = px1.log
|
||||
params.pm = px1.pm
|
||||
|
||||
optList := DefaultRequestOptions(px1.h)
|
||||
optList := DefaultOptions(px1.h)
|
||||
optList = append(optList, WithPeerAddr(host1.Addrs()[0]))
|
||||
for _, opt := range optList {
|
||||
err := opt(params)
|
||||
@ -258,7 +258,7 @@ func TestPeerExchangeOptions(t *testing.T) {
|
||||
require.Equal(t, host1.Addrs()[0], params.peerAddr)
|
||||
|
||||
// Test WithFastestPeerSelection()
|
||||
optList = DefaultRequestOptions(px1.h)
|
||||
optList = DefaultOptions(px1.h)
|
||||
optList = append(optList, WithFastestPeerSelection(host1.ID()))
|
||||
for _, opt := range optList {
|
||||
err := opt(params)
|
||||
|
||||
@ -65,8 +65,5 @@ func (r *RateLimiter) Allow(peerID peer.ID) bool {
|
||||
}
|
||||
|
||||
func (r *RateLimiter) Wait(ctx context.Context, peerID peer.ID) error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
return r.getOrCreate(peerID).Wait(ctx)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user