From 3b5ec53babcbfe8bcd1064c6ec57453fb8c44d08 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 26 Aug 2024 11:09:15 -0400 Subject: [PATCH] feat(api): parameterize filter subscriptions --- waku/v2/api/filter/filter.go | 44 ++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 7 deletions(-) diff --git a/waku/v2/api/filter/filter.go b/waku/v2/api/filter/filter.go index 6bd041e6..f8123704 100644 --- a/waku/v2/api/filter/filter.go +++ b/waku/v2/api/filter/filter.go @@ -14,8 +14,6 @@ import ( "go.uber.org/zap" ) -const MultiplexChannelBuffer = 100 - type FilterConfig struct { MaxPeers int `json:"maxPeers"` Peers []peer.ID `json:"peers"` @@ -44,14 +42,46 @@ type Sub struct { id string } +type subscribeParameters struct { + batchInterval time.Duration + multiplexChannelBuffer int +} + +type SubscribeOptions func(*subscribeParameters) + +func WithBatchInterval(t time.Duration) SubscribeOptions { + return func(params *subscribeParameters) { + params.batchInterval = t + } +} + +func WithMultiplexChannelBuffer(value int) SubscribeOptions { + return func(params *subscribeParameters) { + params.multiplexChannelBuffer = value + } +} + +func defaultOptions() []SubscribeOptions { + return []SubscribeOptions{ + WithBatchInterval(5 * time.Second), + WithMultiplexChannelBuffer(100), + } +} + // Subscribe -func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { +func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) { + optList := append(defaultOptions(), opts...) + params := new(subscribeParameters) + for _, opt := range optList { + opt(params) + } + sub := new(Sub) sub.id = uuid.NewString() sub.wf = wf sub.ctx, sub.cancel = context.WithCancel(ctx) sub.subs = make(subscription.SubscriptionSet) - sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer) + sub.DataCh = make(chan *protocol.Envelope, params.multiplexChannelBuffer) sub.ContentFilter = contentFilter sub.Config = config sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) @@ -66,7 +96,7 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte } } - go sub.subscriptionLoop() + go sub.subscriptionLoop(params.batchInterval) return sub, nil } @@ -78,8 +108,8 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) { } } -func (apiSub *Sub) subscriptionLoop() { - ticker := time.NewTicker(5 * time.Second) +func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) { + ticker := time.NewTicker(batchInterval) defer ticker.Stop() for { select {