chore_: take fixes from go-waku

This commit is contained in:
Prem Chaitanya Prathi 2024-09-21 06:51:59 +05:30
parent c1ce30ad20
commit 2d1350bf4a
13 changed files with 91 additions and 33 deletions

2
go.mod
View File

@ -95,7 +95,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
github.com/waku-org/go-waku v0.8.1-0.20240921010631-7956fad0b135
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2136,8 +2136,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da h1:bkAJVlJL4Ba83frABWjI9p9MeLGmEHuD/QcjYu3HNbQ=
github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-waku v0.8.1-0.20240921010631-7956fad0b135 h1:RvHU25kgaDcQLArG6+d1uqDXRhIlc9P9KWJJRJUx6b0=
github.com/waku-org/go-waku v0.8.1-0.20240921010631-7956fad0b135/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -27,6 +27,8 @@ func (fc FilterConfig) String() string {
return string(jsonStr)
}
const filterSubLoopInterval = 5 * time.Second
type Sub struct {
ContentFilter protocol.ContentFilter
DataCh chan *protocol.Envelope
@ -69,13 +71,7 @@ func defaultOptions() []SubscribeOptions {
}
// Subscribe
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, opts ...SubscribeOptions) (*Sub, error) {
optList := append(defaultOptions(), opts...)
params := new(subscribeParameters)
for _, opt := range optList {
opt(params)
}
func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilter protocol.ContentFilter, config FilterConfig, log *zap.Logger, params *subscribeParameters) (*Sub, error) {
sub := new(Sub)
sub.id = uuid.NewString()
sub.wf = wf
@ -95,8 +91,9 @@ func Subscribe(ctx context.Context, wf *filter.WakuFilterLightNode, contentFilte
sub.multiplex(subs)
}
}
go sub.subscriptionLoop(params.batchInterval)
// filter subscription loop is to check if target subscriptions for a filter are active and if not
// trigger resubscribe.
go sub.subscriptionLoop(filterSubLoopInterval)
return sub, nil
}

View File

@ -31,7 +31,7 @@ type appFilterMap map[string]filterConfig
type FilterManager struct {
sync.Mutex
ctx context.Context
opts []SubscribeOptions
params *subscribeParameters
minPeersPerFilter int
onlineChecker *onlinechecker.DefaultOnlineChecker
filterSubscriptions map[string]SubDetails // map of aggregated filters to apiSub details
@ -64,7 +64,6 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
// This fn is being mocked in test
mgr := new(FilterManager)
mgr.ctx = ctx
mgr.opts = opts
mgr.logger = logger
mgr.minPeersPerFilter = minPeersPerFilter
mgr.envProcessor = envProcessor
@ -72,10 +71,17 @@ func NewFilterManager(ctx context.Context, logger *zap.Logger, minPeersPerFilter
mgr.node = node
mgr.onlineChecker = onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker)
mgr.node.SetOnlineChecker(mgr.onlineChecker)
mgr.filterSubBatchDuration = 5 * time.Second
mgr.incompleteFilterBatch = make(map[string]filterConfig)
mgr.filterConfigs = make(appFilterMap)
mgr.waitingToSubQueue = make(chan filterConfig, 100)
//parsing the subscribe params only to read the batchInterval passed.
mgr.params = new(subscribeParameters)
opts = append(defaultOptions(), opts...)
for _, opt := range opts {
opt(mgr.params)
}
mgr.filterSubBatchDuration = mgr.params.batchInterval
go mgr.startFilterSubLoop()
return mgr
}
@ -153,7 +159,7 @@ func (mgr *FilterManager) SubscribeFilter(filterID string, cf protocol.ContentFi
func (mgr *FilterManager) subscribeAndRunLoop(f filterConfig) {
ctx, cancel := context.WithCancel(mgr.ctx)
config := FilterConfig{MaxPeers: mgr.minPeersPerFilter}
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.opts...)
sub, err := Subscribe(ctx, mgr.node, f.contentFilter, config, mgr.logger, mgr.params)
mgr.Lock()
mgr.filterSubscriptions[f.ID] = SubDetails{cancel, sub}
mgr.Unlock()

View File

@ -292,7 +292,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.opts.onlineChecker, w.opts.prometheusReg, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.opts.prometheusReg, w.log, w.opts.lightpushOpts...)
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log)
w.store = store.NewWakuStore(w.peermanager, w.timesource, w.log, w.opts.storeRateLimit)
if params.storeFactory != nil {
w.storeFactory = params.storeFactory

View File

@ -38,6 +38,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
)
// Default UserAgent
@ -94,6 +95,8 @@ type WakuNodeParameters struct {
enableStore bool
messageProvider legacy_store.MessageProvider
storeRateLimit rate.Limit
enableRendezvousPoint bool
rendezvousDB *rendezvous.DB
@ -139,6 +142,7 @@ var DefaultWakuNodeOptions = []WakuNodeOption{
WithCircuitRelayParams(2*time.Second, 3*time.Minute),
WithPeerStoreCapacity(DefaultMaxPeerStoreCapacity),
WithOnlineChecker(onlinechecker.NewDefaultOnlineChecker(true)),
WithWakuStoreRateLimit(8), // Value currently set in status.staging
}
// MultiAddresses return the list of multiaddresses configured in the node
@ -458,6 +462,16 @@ func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
}
}
// WithWakuStoreRateLimit is used to set a default rate limit on which storenodes will
// be sent per peerID to avoid running into a TOO_MANY_REQUESTS (429) error when consuming
// the store protocol from a storenode
func WithWakuStoreRateLimit(value rate.Limit) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.storeRateLimit = value
return nil
}
}
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider.
func WithWakuStore() WakuNodeOption {

View File

@ -123,13 +123,11 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
}
if params.clusterID != 0 {
wakuPX.log.Debug("clusterID is non zero, filtering by shard")
rs, err := wenr.RelaySharding(enrRecord)
if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) {
wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard))
continue
}
wakuPX.log.Debug("peer matches filter", zap.Int("shard", params.shard))
}
enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)

View File

@ -13,6 +13,10 @@ import (
var DefaultRelaySubscriptionBufferSize int = 1024
// trying to match value here https://github.com/vacp2p/nim-libp2p/pull/1077
// note that nim-libp2p has 2 peer queues 1 for priority and other non-priority, whereas go-libp2p seems to have single peer-queue
var DefaultPeerOutboundQSize int = 1024
type RelaySubscribeParameters struct {
dontConsume bool
cacheSize uint
@ -109,6 +113,7 @@ func (w *WakuRelay) defaultPubsubOptions() []pubsub.Option {
pubsub.WithSeenMessagesTTL(2 * time.Minute),
pubsub.WithPeerScore(w.peerScoreParams, w.peerScoreThresholds),
pubsub.WithPeerScoreInspect(w.peerScoreInspector, 6*time.Second),
pubsub.WithPeerOutboundQueueSize(DefaultPeerOutboundQSize),
}
}

View File

@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/time/rate"
"google.golang.org/protobuf/proto"
)
@ -69,14 +70,19 @@ type WakuStore struct {
timesource timesource.Timesource
log *zap.Logger
pm *peermanager.PeerManager
defaultRatelimit rate.Limit
rateLimiters map[peer.ID]*rate.Limiter
}
// NewWakuStore is used to instantiate a StoreV3 client
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger) *WakuStore {
func NewWakuStore(pm *peermanager.PeerManager, timesource timesource.Timesource, log *zap.Logger, defaultRatelimit rate.Limit) *WakuStore {
s := new(WakuStore)
s.log = log.Named("store-client")
s.timesource = timesource
s.pm = pm
s.defaultRatelimit = defaultRatelimit
s.rateLimiters = make(map[peer.ID]*rate.Limiter)
if pm != nil {
pm.RegisterWakuProtocol(StoreQueryID_v300, StoreENRField)
@ -171,7 +177,7 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
return nil, err
}
response, err := s.queryFrom(ctx, storeRequest, params.selectedPeer)
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
@ -211,7 +217,7 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
return len(result.messages) != 0, nil
}
func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
func (s *WakuStore) next(ctx context.Context, r *Result, opts ...RequestOption) (*Result, error) {
if r.IsComplete() {
return &Result{
store: s,
@ -223,11 +229,22 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
}, nil
}
params := new(Parameters)
params.selectedPeer = r.PeerID()
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
err := opt(params)
if err != nil {
return nil, err
}
}
storeRequest := proto.Clone(r.storeRequest).(*pb.StoreQueryRequest)
storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
storeRequest.PaginationCursor = r.Cursor()
response, err := s.queryFrom(ctx, storeRequest, r.PeerID())
response, err := s.queryFrom(ctx, storeRequest, params)
if err != nil {
return nil, err
}
@ -245,16 +262,28 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
}
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, selectedPeer peer.ID) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
func (s *WakuStore) queryFrom(ctx context.Context, storeRequest *pb.StoreQueryRequest, params *Parameters) (*pb.StoreQueryResponse, error) {
logger := s.log.With(logging.HostID("peer", params.selectedPeer), zap.String("requestId", hex.EncodeToString([]byte(storeRequest.RequestId))))
logger.Debug("sending store request")
stream, err := s.h.NewStream(ctx, selectedPeer, StoreQueryID_v300)
if !params.skipRatelimit {
rateLimiter, ok := s.rateLimiters[params.selectedPeer]
if !ok {
rateLimiter = rate.NewLimiter(s.defaultRatelimit, 1)
s.rateLimiters[params.selectedPeer] = rateLimiter
}
err := rateLimiter.Wait(ctx)
if err != nil {
return nil, err
}
}
stream, err := s.h.NewStream(ctx, params.selectedPeer, StoreQueryID_v300)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
if ps, ok := s.h.Peerstore().(peerstore.WakuPeerstore); ok {
ps.AddConnFailure(selectedPeer)
ps.AddConnFailure(params.selectedPeer)
}
return nil, err
}

View File

@ -19,6 +19,7 @@ type Parameters struct {
pageLimit uint64
forward bool
includeData bool
skipRatelimit bool
}
type RequestOption func(*Parameters) error
@ -115,6 +116,14 @@ func IncludeData(v bool) RequestOption {
}
}
// Skips the rate limiting for the current request (might cause the store request to fail with TOO_MANY_REQUESTS (429))
func SkipRateLimit() RequestOption {
return func(params *Parameters) error {
params.skipRatelimit = true
return nil
}
}
// Default options to be used when querying a store node for results
func DefaultOptions() []RequestOption {
return []RequestOption{

View File

@ -2,6 +2,7 @@ package pb
import (
"errors"
"fmt"
)
// MaxContentTopics is the maximum number of allowed contenttopics in a query
@ -10,7 +11,6 @@ const MaxContentTopics = 10
var (
errMissingRequestID = errors.New("missing RequestId field")
errMessageHashOtherFields = errors.New("cannot use MessageHashes with ContentTopics/PubsubTopic")
errRequestIDMismatch = errors.New("requestID in response does not match request")
errMaxContentTopics = errors.New("exceeds the maximum number of ContentTopics allowed")
errEmptyContentTopic = errors.New("one or more content topics specified is empty")
errMissingPubsubTopic = errors.New("missing PubsubTopic field")
@ -57,8 +57,8 @@ func (x *StoreQueryRequest) Validate() error {
}
func (x *StoreQueryResponse) Validate(requestID string) error {
if x.RequestId != "" && x.RequestId != requestID {
return errRequestIDMismatch
if x.RequestId != "" && x.RequestId != "N/A" && x.RequestId != requestID {
return fmt.Errorf("requestID %s in response does not match requestID in request %s", x.RequestId, requestID)
}
if x.StatusCode == nil {

View File

@ -39,14 +39,14 @@ func (r *Result) Response() *pb.StoreQueryResponse {
return r.storeResponse
}
func (r *Result) Next(ctx context.Context) error {
func (r *Result) Next(ctx context.Context, opts ...RequestOption) error {
if r.cursor == nil {
r.done = true
r.messages = nil
return nil
}
newResult, err := r.store.next(ctx, r)
newResult, err := r.store.next(ctx, r, opts...)
if err != nil {
return err
}

2
vendor/modules.txt vendored
View File

@ -1007,7 +1007,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240904143057-f9e7895202da
# github.com/waku-org/go-waku v0.8.1-0.20240921010631-7956fad0b135
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests