feat(api): parameterize filter subscriptions

This commit is contained in:
Richard Ramos 2024-08-26 11:09:15 -04:00
parent 949684092e
commit 3b5ec53bab
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760

View File

@ -14,8 +14,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const MultiplexChannelBuffer = 100
type FilterConfig struct { type FilterConfig struct {
MaxPeers int `json:"maxPeers"` MaxPeers int `json:"maxPeers"`
Peers []peer.ID `json:"peers"` Peers []peer.ID `json:"peers"`
@ -44,14 +42,46 @@ type Sub struct {
id string id string
} }
type subscribeParameters struct {
batchInterval time.Duration
multiplexChannelBuffer int
}
type SubscribeOptions func(*subscribeParameters)
func WithBatchInterval(t time.Duration) SubscribeOptions {
return func(params *subscribeParameters) {
params.batchInterval = t
}
}
func WithMultiplexChannelBuffer(value int) SubscribeOptions {
return func(params *subscribeParameters) {
params.multiplexChannelBuffer = value
}
}
func defaultOptions() []SubscribeOptions {
return []SubscribeOptions{
WithBatchInterval(5 * time.Second),
WithMultiplexChannelBuffer(100),
}
}
// Subscribe // Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) { func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) {
optList := append(defaultOptions(), opts...)
params := new(subscribeParameters)
for _, opt := range optList {
opt(params)
}
sub := new(Sub) sub := new(Sub)
sub.id = uuid.NewString() sub.id = uuid.NewString()
sub.wf = wf sub.wf = wf
sub.ctx, sub.cancel = context.WithCancel(ctx) sub.ctx, sub.cancel = context.WithCancel(ctx)
sub.subs = make(subscription.SubscriptionSet) sub.subs = make(subscription.SubscriptionSet)
sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer) sub.DataCh = make(chan *protocol.Envelope, params.multiplexChannelBuffer)
sub.ContentFilter = contentFilter sub.ContentFilter = contentFilter
sub.Config = config sub.Config = config
sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter)) sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
@ -66,7 +96,7 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
} }
} }
go sub.subscriptionLoop() go sub.subscriptionLoop(params.batchInterval)
return sub, nil return sub, nil
} }
@ -78,8 +108,8 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
} }
} }
func (apiSub *Sub) subscriptionLoop() { func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
ticker := time.NewTicker(5 * time.Second) ticker := time.NewTicker(batchInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {