fix_: reduce filter loop to 300ms (#5768)
Co-authored-by: Andrea Maria Piana <andrea.maria.piana@gmail.com>
This commit is contained in:
parent
0c0888927c
commit
bb6b0866f0
2
go.mod
2
go.mod
|
@ -95,7 +95,7 @@ require (
|
|||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2137,8 +2137,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
|
|||
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5 h1:r5kgO4DWxeKyGF+wq5KhayW710XAqX5iWXhS/4ZqVkc=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49 h1:LKKgMmvUYFOzrWVQYLbI4nmXza4hTY7XsFk9WAXL/r0=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
|
|
@ -14,8 +14,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const MultiplexChannelBuffer = 100
|
||||
|
||||
type FilterConfig struct {
|
||||
MaxPeers int `json:"maxPeers"`
|
||||
Peers []peer.ID `json:"peers"`
|
||||
|
@ -44,14 +42,46 @@ type Sub struct {
|
|||
id string
|
||||
}
|
||||
|
||||
type subscribeParameters struct {
|
||||
batchInterval time.Duration
|
||||
multiplexChannelBuffer int
|
||||
}
|
||||
|
||||
type SubscribeOptions func(*subscribeParameters)
|
||||
|
||||
func WithBatchInterval(t time.Duration) SubscribeOptions {
|
||||
return func(params *subscribeParameters) {
|
||||
params.batchInterval = t
|
||||
}
|
||||
}
|
||||
|
||||
func WithMultiplexChannelBuffer(value int) SubscribeOptions {
|
||||
return func(params *subscribeParameters) {
|
||||
params.multiplexChannelBuffer = value
|
||||
}
|
||||
}
|
||||
|
||||
func defaultOptions() []SubscribeOptions {
|
||||
return []SubscribeOptions{
|
||||
WithBatchInterval(5 * time.Second),
|
||||
WithMultiplexChannelBuffer(100),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe
|
||||
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger) (*Sub, error) {
|
||||
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) {
|
||||
optList := append(defaultOptions(), opts...)
|
||||
params := new(subscribeParameters)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
sub := new(Sub)
|
||||
sub.id = uuid.NewString()
|
||||
sub.wf = wf
|
||||
sub.ctx, sub.cancel = context.WithCancel(ctx)
|
||||
sub.subs = make(subscription.SubscriptionSet)
|
||||
sub.DataCh = make(chan *protocol.Envelope, MultiplexChannelBuffer)
|
||||
sub.DataCh = make(chan *protocol.Envelope, params.multiplexChannelBuffer)
|
||||
sub.ContentFilter = contentFilter
|
||||
sub.Config = config
|
||||
sub.log = log.Named("filter-api").With(zap.String("apisub-id", sub.id), zap.Stringer("content-filter", sub.ContentFilter))
|
||||
|
@ -66,7 +96,7 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
|
|||
}
|
||||
}
|
||||
|
||||
go sub.subscriptionLoop()
|
||||
go sub.subscriptionLoop(params.batchInterval)
|
||||
return sub, nil
|
||||
}
|
||||
|
||||
|
@ -78,8 +108,8 @@ func (apiSub *Sub) Unsubscribe(contentFilter protocol.ContentFilter) {
|
|||
}
|
||||
}
|
||||
|
||||
func (apiSub *Sub) subscriptionLoop() {
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
func (apiSub *Sub) subscriptionLoop(batchInterval time.Duration) {
|
||||
ticker := time.NewTicker(batchInterval)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
select {
|
||||
|
|
|
@ -31,6 +31,7 @@ type appFilterMap map[string]filterConfig
|
|||
type FilterManager struct {
|
||||
sync.Mutex
|
||||
ctx context.Context
|
||||
opts []SubscribeOptions
|
||||
minPeersPerFilter int
|
||||
onlineChecker *onlinechecker.DefaultOnlineChecker
|
||||
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
|
||||
|
@ -59,10 +60,11 @@ type EnevelopeProcessor interface {
|
|||
OnNewEnvelope(env *protocol.Envelope) error
|
||||
}
|
||||
|
||||
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode) *FilterManager {
|
||||
func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter int, envProcessor EnevelopeProcessor, node *filter.WakuFilterLightNode, opts ...SubscribeOptions) *FilterManager {
|
||||
// This fn is being mocked in test
|
||||
mgr := new(FilterManager)
|
||||
mgr.ctx = ctx
|
||||
mgr.opts = opts
|
||||
mgr.logger = logger
|
||||
mgr.minPeersPerFilter = minPeersPerFilter
|
||||
mgr.envProcessor = envProcessor
|
||||
|
@ -151,7 +153,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
|
|||
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
|
||||
ctx, cancel := context.WithCancel(mgr.ctx)
|
||||
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
|
||||
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger)
|
||||
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...)
|
||||
mgr.Lock()
|
||||
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
|
||||
mgr.Unlock()
|
||||
|
|
|
@ -1010,7 +1010,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
|||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240823143238-949684092ec5
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240826153427-69e1b559bc49
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
|
|
@ -1125,9 +1125,13 @@ func (w *Waku) Start() error {
|
|||
if w.cfg.LightClient {
|
||||
// Create FilterManager that will main peer connectivity
|
||||
// for installed filters
|
||||
w.filterManager = filterapi.NewFilterManager(w.ctx, w.logger, w.cfg.MinPeersForFilter,
|
||||
w.filterManager = filterapi.NewFilterManager(
|
||||
w.ctx,
|
||||
w.logger,
|
||||
w.cfg.MinPeersForFilter,
|
||||
w,
|
||||
w.node.FilterLightnode())
|
||||
w.node.FilterLightnode(),
|
||||
filterapi.WithBatchInterval(300*time.Millisecond))
|
||||
}
|
||||
|
||||
err = w.setupRelaySubscriptions()
|
||||
|
|
Loading…
Reference in New Issue