diff --git a/go.mod b/go.mod index ee911ff22..567f093e6 100644 --- a/go.mod +++ b/go.mod @@ -88,7 +88,7 @@ require ( github.com/gorilla/sessions v1.2.1 github.com/gorilla/websocket v1.5.3 github.com/ipfs/go-log/v2 v2.5.1 - github.com/jellydator/ttlcache/v3 v3.2.0 + github.com/jellydator/ttlcache/v3 v3.3.0 github.com/jmoiron/sqlx v1.3.5 github.com/klauspost/reedsolomon v1.12.1 github.com/ladydascalie/currency v1.6.0 @@ -97,7 +97,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 + github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 3917ff3c3..d7b5baeee 100644 --- a/go.sum +++ b/go.sum @@ -1238,8 +1238,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU= -github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= -github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= +github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc= +github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4= @@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27 github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 h1:p7tehUW7f+D6pvMJYop2yJV03SJU2fFUusmSnKL3uow= -github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74= +github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71 h1:P9sQncEeeBqBRQEtiLdgQe5oWcTlAV5IVA5VGMqGslc= +github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/telemetry/client.go b/telemetry/client.go index 85cbbf218..5ded3e7b7 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -58,6 +58,8 @@ const ( MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessages" // MVDS ack received for a sent message MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" + // Total number and size of Waku messages sent by this node + SentMessageTotalMetric TelemetryType = "SentMessageTotal" ) const MaxRetryCache = 5000 @@ -145,6 +147,10 @@ func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash s c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash}) } +func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) { + c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -196,6 +202,10 @@ type MessageDeliveryConfirmed struct { MessageHash string } +type SentMessageTotal struct { + Size uint32 +} + type Client struct { serverURL string httpClient *http.Client @@ -392,6 +402,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: MessageDeliveryConfirmedMetric, TelemetryData: c.ProcessMessageDeliveryConfirmed(v), } + case SentMessageTotal: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: SentMessageTotalMetric, + TelemetryData: c.ProcessSentMessageTotal(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -567,6 +583,12 @@ func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed Messag return c.marshalPostBody(postBody) } +func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *json.RawMessage { + postBody := c.commonPostBody() + postBody["size"] = sentMessageTotal.Size + return c.marshalPostBody(postBody) +} + // Helper function to marshal post body and handle errors func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage { body, err := json.Marshal(postBody) diff --git a/telemetry/client_test.go b/telemetry/client_test.go index a549da455..7c1e6e784 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -597,3 +597,17 @@ func TestProcessDialFailure(t *testing.T) { } runTestCase(t, tc) } + +func TestProcessSentMessageTotal(t *testing.T) { + tc := testCase{ + name: "SentMessageTotal", + input: SentMessageTotal{ + Size: uint32(1234), + }, + expectedType: SentMessageTotalMetric, + expectedFields: map[string]interface{}{ + "size": float64(1234), + }, + } + runTestCase(t, tc) +} diff --git a/vendor/github.com/jellydator/ttlcache/v3/README.md b/vendor/github.com/jellydator/ttlcache/v3/README.md index 3a557b030..a17cb2437 100644 --- a/vendor/github.com/jellydator/ttlcache/v3/README.md +++ b/vendor/github.com/jellydator/ttlcache/v3/README.md @@ -10,7 +10,8 @@ - Type parameters - Item expiration and automatic deletion - Automatic expiration time extension on each `Get` call -- `Loader` interface that may be used to load/lazily initialize missing cache +- `Loader` interface that may be used to load/lazily initialize missing cache +- Thread Safe items - Event handlers (insertion and eviction) - Metrics diff --git a/vendor/github.com/jellydator/ttlcache/v3/cache.go b/vendor/github.com/jellydator/ttlcache/v3/cache.go index 1ad3afbec..1b9e72ef0 100644 --- a/vendor/github.com/jellydator/ttlcache/v3/cache.go +++ b/vendor/github.com/jellydator/ttlcache/v3/cache.go @@ -133,7 +133,7 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] { ttl = c.options.ttl } - elem := c.get(key, false) + elem := c.get(key, false, true) if elem != nil { // update/overwrite an existing item item := elem.Value.(*Item[K, V]) @@ -176,14 +176,14 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] { // It returns nil if the item is not found or is expired. // Not safe for concurrent use by multiple goroutines without additional // locking. -func (c *Cache[K, V]) get(key K, touch bool) *list.Element { +func (c *Cache[K, V]) get(key K, touch bool, includeExpired bool) *list.Element { elem := c.items.values[key] if elem == nil { return nil } item := elem.Value.(*Item[K, V]) - if item.isExpiredUnsafe() { + if !includeExpired && item.isExpiredUnsafe() { return nil } @@ -218,7 +218,7 @@ func (c *Cache[K, V]) getWithOpts(key K, lockAndLoad bool, opts ...Option[K, V]) c.items.mu.Lock() } - elem := c.get(key, !getOpts.disableTouchOnHit) + elem := c.get(key, !getOpts.disableTouchOnHit, false) if lockAndLoad { c.items.mu.Unlock() @@ -339,8 +339,8 @@ func (c *Cache[K, V]) Has(key K) bool { c.items.mu.RLock() defer c.items.mu.RUnlock() - _, ok := c.items.values[key] - return ok + elem, ok := c.items.values[key] + return ok && !elem.Value.(*Item[K, V]).isExpiredUnsafe() } // GetOrSet retrieves an item from the cache by the provided key. @@ -436,26 +436,66 @@ func (c *Cache[K, V]) DeleteExpired() { // If the item is not found, the method is no-op. func (c *Cache[K, V]) Touch(key K) { c.items.mu.Lock() - c.get(key, true) + c.get(key, true, false) c.items.mu.Unlock() } -// Len returns the total number of items in the cache. +// Len returns the number of unexpired items in the cache. func (c *Cache[K, V]) Len() int { c.items.mu.RLock() defer c.items.mu.RUnlock() - return len(c.items.values) + total := c.items.expQueue.Len() + if total == 0 { + return 0 + } + + // search the heap-based expQueue by BFS + countExpired := func() int { + var ( + q []int + res int + ) + + item := c.items.expQueue[0].Value.(*Item[K, V]) + if !item.isExpiredUnsafe() { + return res + } + + q = append(q, 0) + for len(q) > 0 { + pop := q[0] + q = q[1:] + res++ + + for i := 1; i <= 2; i++ { + idx := 2*pop + i + if idx >= total { + break + } + + item = c.items.expQueue[idx].Value.(*Item[K, V]) + if item.isExpiredUnsafe() { + q = append(q, idx) + } + } + } + return res + } + + return total - countExpired() } -// Keys returns all keys currently present in the cache. +// Keys returns all unexpired keys in the cache. func (c *Cache[K, V]) Keys() []K { c.items.mu.RLock() defer c.items.mu.RUnlock() - res := make([]K, 0, len(c.items.values)) - for k := range c.items.values { - res = append(res, k) + res := make([]K, 0) + for k, elem := range c.items.values { + if !elem.Value.(*Item[K, V]).isExpiredUnsafe() { + res = append(res, k) + } } return res @@ -467,18 +507,18 @@ func (c *Cache[K, V]) Items() map[K]*Item[K, V] { c.items.mu.RLock() defer c.items.mu.RUnlock() - items := make(map[K]*Item[K, V], len(c.items.values)) - for k := range c.items.values { - item := c.get(k, false) - if item != nil { - items[k] = item.Value.(*Item[K, V]) + items := make(map[K]*Item[K, V]) + for k, elem := range c.items.values { + item := elem.Value.(*Item[K, V]) + if item != nil && !item.isExpiredUnsafe() { + items[k] = item } } return items } -// Range calls fn for each item present in the cache. If fn returns false, +// Range calls fn for each unexpired item in the cache. If fn returns false, // Range stops the iteration. func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { c.items.mu.RLock() @@ -491,9 +531,10 @@ func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { for item := c.items.lru.Front(); item != c.items.lru.Back().Next(); item = item.Next() { i := item.Value.(*Item[K, V]) + expired := i.isExpiredUnsafe() c.items.mu.RUnlock() - if !fn(i) { + if !expired && !fn(i) { return } @@ -503,6 +544,32 @@ func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) { } } +// RangeBackwards calls fn for each unexpired item in the cache in reverse order. +// If fn returns false, RangeBackwards stops the iteration. +func (c *Cache[K, V]) RangeBackwards(fn func(item *Item[K, V]) bool) { + c.items.mu.RLock() + + // Check if cache is empty + if c.items.lru.Len() == 0 { + c.items.mu.RUnlock() + return + } + + for item := c.items.lru.Back(); item != c.items.lru.Front().Prev(); item = item.Prev() { + i := item.Value.(*Item[K, V]) + expired := i.isExpiredUnsafe() + c.items.mu.RUnlock() + + if !expired && !fn(i) { + return + } + + if item.Prev() != nil { + c.items.mu.RLock() + } + } +} + // Metrics returns the metrics of the cache. func (c *Cache[K, V]) Metrics() Metrics { c.metricsMu.RLock() diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go index c457589e7..62dcb4af7 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go @@ -6,6 +6,8 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/event" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -53,6 +55,12 @@ type MessageSender struct { messageSentCheck ISentCheck rateLimiter *PublishRateLimiter logger *zap.Logger + evtMessageSent event.Emitter +} + +type MessageSent struct { + Size uint32 // Size of payload in bytes + Timestamp int64 } type Request struct { @@ -96,6 +104,15 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess return ms } +func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender { + evtMessageSent, err := host.EventBus().Emitter(new(MessageSent)) + if err != nil { + ms.logger.Error("failed to create message sent emitter", zap.Error(err)) + } + ms.evtMessageSent = evtMessageSent + return ms +} + func (ms *MessageSender) Send(req *Request) error { logger := ms.logger.With( zap.Stringer("envelopeHash", req.envelope.Hash()), @@ -149,6 +166,16 @@ func (ms *MessageSender) Send(req *Request) error { ) } + if ms.evtMessageSent != nil { + err := ms.evtMessageSent.Emit(MessageSent{ + Size: uint32(len(req.envelope.Message().Payload)), + Timestamp: req.envelope.Message().GetTimestamp(), + }) + if err != nil { + logger.Error("failed to emit message sent event", zap.Error(err)) + } + } + return nil } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go index 3b56d4700..3d81048d6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go @@ -54,6 +54,7 @@ type WakuFilterLightNode struct { log *zap.Logger subscriptions *subscription.SubscriptionsMap pm *peermanager.PeerManager + limiter *utils.RateLimiter peerPingInterval time.Duration } @@ -89,6 +90,7 @@ func NewWakuFilterLightNode( onlineChecker onlinechecker.OnlineChecker, reg prometheus.Registerer, log *zap.Logger, + opts ...LightNodeOption, ) *WakuFilterLightNode { wf := new(WakuFilterLightNode) wf.log = log.Named("filterv2-lightnode") @@ -99,6 +101,14 @@ func NewWakuFilterLightNode( wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.peerPingInterval = 1 * time.Minute + + params := &LightNodeParameters{} + opts = append(DefaultLightNodeOptions(), opts...) + for _, opt := range opts { + opt(params) + } + wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB) + return wf } @@ -155,6 +165,14 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea logger := wf.log.With(logging.HostID("peerID", peerID)) + if !wf.limiter.Allow(peerID) { + wf.metrics.RecordError(rateLimitFailure) + if err := stream.Reset(); err != nil { + wf.log.Error("resetting connection", zap.Error(err)) + } + return + } + if !wf.subscriptions.IsSubscribedTo(peerID) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID)) wf.metrics.RecordError(unknownPeerMessagePush) @@ -287,7 +305,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte, } - if filterSubscribeResponse.RequestId != request.RequestId { + if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId { wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) wf.metrics.RecordError(requestIDMismatch) err := NewFilterError(300, "request_id_mismatch") diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go index 51e3b356d..89ac8e4aa 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go @@ -96,6 +96,7 @@ var ( peerNotFoundFailure metricsErrCategory = "peer_not_found_failure" writeResponseFailure metricsErrCategory = "write_response_failure" pushTimeoutFailure metricsErrCategory = "push_timeout_failure" + rateLimitFailure metricsErrCategory = "ratelimit_failure" ) // RecordError increases the counter for different error types diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go index f19876212..bde105e47 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" "go.uber.org/zap" + "golang.org/x/time/rate" ) func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters { @@ -57,13 +58,35 @@ type ( Timeout time.Duration MaxSubscribers int pm *peermanager.PeerManager + limitR rate.Limit + limitB int } Option func(*FilterParameters) + LightNodeParameters struct { + limitR rate.Limit + limitB int + } + + LightNodeOption func(*LightNodeParameters) + FilterSubscribeOption func(*FilterSubscribeParameters) error ) +func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption { + return func(params *LightNodeParameters) { + params.limitR = r + params.limitB = b + } +} + +func DefaultLightNodeOptions() []LightNodeOption { + return []LightNodeOption{ + WithLightNodeRateLimiter(1, 1), + } +} + func WithTimeout(timeout time.Duration) Option { return func(params *FilterParameters) { params.Timeout = timeout @@ -202,9 +225,17 @@ func WithPeerManager(pm *peermanager.PeerManager) Option { } } +func WithFullNodeRateLimiter(r rate.Limit, b int) Option { + return func(params *FilterParameters) { + params.limitR = r + params.limitB = b + } +} + func DefaultOptions() []Option { return []Option{ WithTimeout(DefaultIdleSubscriptionTimeout), WithMaxSubscribers(DefaultMaxSubscribers), + WithFullNodeRateLimiter(1, 1), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go index bacfe85cc..82c4c47da 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go @@ -37,9 +37,9 @@ type ( metrics Metrics log *zap.Logger *service.CommonService - subscriptions *SubscribersMap - pm *peermanager.PeerManager - + subscriptions *SubscribersMap + pm *peermanager.PeerManager + limiter *utils.RateLimiter maxSubscriptions int } ) @@ -56,6 +56,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi opt(params) } + wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB) wf.CommonService = service.NewCommonService() wf.metrics = newMetrics(reg) wf.subscriptions = NewSubscribersMap(params.Timeout) @@ -93,7 +94,14 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error { func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) { return func(stream network.Stream) { - logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) + peerID := stream.Conn().RemotePeer() + logger := wf.log.With(logging.HostID("peer", peerID)) + + if !wf.limiter.Allow(peerID) { + wf.metrics.RecordError(rateLimitFailure) + wf.reply(ctx, stream, &pb.FilterSubscribeRequest{RequestId: "N/A"}, http.StatusTooManyRequests, "filter request rejected due rate limit exceeded") + return + } reader := pbio.NewDelimitedReader(stream, math.MaxInt32) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go index 015cb352e..88b9e04e6 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go @@ -22,6 +22,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" + "golang.org/x/time/rate" ) type LightNodeData struct { @@ -133,7 +134,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo nodeData := s.GetWakuRelay(topic) - node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log) + node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, WithFullNodeRateLimiter(rate.Inf, 0)) node2Filter.SetHost(nodeData.FullNodeHost) var sub *relay.Subscription @@ -166,7 +167,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData { b := relay.NewBroadcaster(10) s.Require().NoError(b.Start(context.Background())) pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log) - filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log) + filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log, WithLightNodeRateLimiter(rate.Inf, 0)) filterPush.SetHost(host) pm.SetHost(host) return LightNodeData{filterPush, host} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 9d6744315..7e411a4ac 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -24,7 +24,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "golang.org/x/time/rate" ) // LightPushID_v20beta1 is the current Waku LightPush protocol identifier @@ -40,7 +39,7 @@ var ( type WakuLightPush struct { h host.Host relay *relay.WakuRelay - limiter *rate.Limiter + limiter *utils.RateLimiter cancel context.CancelFunc pm *peermanager.PeerManager metrics Metrics @@ -59,11 +58,12 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.metrics = newMetrics(reg) params := &LightpushParameters{} + opts = append(DefaultLightpushOptions(), opts...) for _, opt := range opts { opt(params) } - wakuLP.limiter = params.limiter + wakuLP.limiter = utils.NewRateLimiter(params.limitR, params.limitB) return wakuLP } @@ -106,7 +106,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) Response: &pb.PushResponse{}, } - if wakuLP.limiter != nil && !wakuLP.limiter.Allow() { + if !wakuLP.limiter.Allow(stream.Conn().RemotePeer()) { wakuLP.metrics.RecordError(rateLimitFailure) responseMsg := "exceeds the rate limit" responsePushRPC.Response.Info = &responseMsg diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go index 7ed043705..b9740ab4c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -14,7 +14,8 @@ import ( ) type LightpushParameters struct { - limiter *rate.Limiter + limitR rate.Limit + limitB int } type Option func(*LightpushParameters) @@ -22,7 +23,14 @@ type Option func(*LightpushParameters) // WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol func WithRateLimiter(r rate.Limit, b int) Option { return func(params *LightpushParameters) { - params.limiter = rate.NewLimiter(r, b) + params.limitR = r + params.limitB = b + } +} + +func DefaultLightpushOptions() []Option { + return []Option{ + WithRateLimiter(1, 1), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go index 5a3821b96..dc181fb42 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go @@ -23,7 +23,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/service" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" - "golang.org/x/time/rate" ) // PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier @@ -51,7 +50,7 @@ type WakuPeerExchange struct { peerConnector PeerConnector enrCache *enrCache - limiter *rate.Limiter + limiter *utils.RateLimiter } // NewWakuPeerExchange returns a new instance of WakuPeerExchange struct @@ -68,11 +67,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnect wakuPX.CommonService = service.NewCommonService() params := &PeerExchangeParameters{} + opts = append(DefaultPeerExchangeOptions(), opts...) for _, opt := range opts { opt(params) } - wakuPX.limiter = params.limiter + wakuPX.limiter = utils.NewRateLimiter(params.limiterR, params.limiterB) return wakuPX, nil } @@ -97,9 +97,10 @@ func (wakuPX *WakuPeerExchange) start() error { func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { return func(stream network.Stream) { - logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer())) + peerID := stream.Conn().RemotePeer() + logger := wakuPX.log.With(logging.HostID("peer", peerID)) - if wakuPX.limiter != nil && !wakuPX.limiter.Allow() { + if wakuPX.limiter != nil && !wakuPX.limiter.Allow(peerID) { wakuPX.metrics.RecordError(rateLimitFailure) wakuPX.log.Info("exceeds the rate limit") // TODO: peer exchange protocol should contain an err field diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go index c08988091..c25078b73 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go @@ -12,7 +12,8 @@ import ( ) type PeerExchangeParameters struct { - limiter *rate.Limiter + limiterR rate.Limit + limiterB int } type Option func(*PeerExchangeParameters) @@ -20,7 +21,14 @@ type Option func(*PeerExchangeParameters) // WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol func WithRateLimiter(r rate.Limit, b int) Option { return func(params *PeerExchangeParameters) { - params.limiter = rate.NewLimiter(r, b) + params.limiterR = r + params.limiterB = b + } +} + +func DefaultPeerExchangeOptions() []Option { + return []Option{ + WithRateLimiter(1, 1), } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go b/vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go new file mode 100644 index 000000000..a587659f8 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go @@ -0,0 +1,69 @@ +package utils + +import ( + "context" + "sync" + "time" + + "github.com/jellydator/ttlcache/v3" + "github.com/libp2p/go-libp2p/core/peer" + "golang.org/x/time/rate" +) + +type RateLimiter struct { + sync.Mutex + limiters *ttlcache.Cache[peer.ID, *rate.Limiter] + r rate.Limit + b int +} + +func NewRateLimiter(r rate.Limit, b int) *RateLimiter { + return &RateLimiter{ + r: r, + b: b, + limiters: ttlcache.New[peer.ID, *rate.Limiter]( + ttlcache.WithTTL[peer.ID, *rate.Limiter](30 * time.Minute), + ), + } +} + +func (r *RateLimiter) Start(ctx context.Context) { + go func() { + t := time.NewTicker(time.Hour) + defer t.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + r.Lock() + r.limiters.DeleteExpired() + r.Unlock() + } + } + }() +} + +func (r *RateLimiter) getOrCreate(peerID peer.ID) *rate.Limiter { + r.Lock() + defer r.Unlock() + + var limiter *rate.Limiter + if !r.limiters.Has(peerID) { + limiter = rate.NewLimiter(r.r, r.b) + r.limiters.Set(peerID, limiter, ttlcache.DefaultTTL) + } else { + v := r.limiters.Get(peerID) + limiter = v.Value() + } + return limiter +} + +func (r *RateLimiter) Allow(peerID peer.ID) bool { + return r.getOrCreate(peerID).Allow() +} + +func (r *RateLimiter) Wait(ctx context.Context, peerID peer.ID) error { + return r.getOrCreate(peerID).Wait(ctx) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index a30c3e7c4..eece9e4f7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -448,7 +448,7 @@ github.com/jackpal/go-nat-pmp # github.com/jbenet/go-temp-err-catcher v0.1.0 ## explicit; go 1.13 github.com/jbenet/go-temp-err-catcher -# github.com/jellydator/ttlcache/v3 v3.2.0 +# github.com/jellydator/ttlcache/v3 v3.3.0 ## explicit; go 1.18 github.com/jellydator/ttlcache/v3 # github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a @@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 +# github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 7566efda6..b04866f6e 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -55,6 +55,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/metrics" + commonapi "github.com/waku-org/go-waku/waku/v2/api/common" filterapi "github.com/waku-org/go-waku/waku/v2/api/filter" "github.com/waku-org/go-waku/waku/v2/api/history" "github.com/waku-org/go-waku/waku/v2/api/missing" @@ -72,8 +73,6 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/waku-org/go-waku/waku/v2/utils" - commonapi "github.com/waku-org/go-waku/waku/v2/api/common" - gocommon "github.com/status-im/status-go/common" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" @@ -119,6 +118,7 @@ type ITelemetryClient interface { PushMissedMessage(ctx context.Context, envelope *protocol.Envelope) PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage) PushMessageDeliveryConfirmed(ctx context.Context, messageHash string) + PushSentMessageTotal(ctx context.Context, messageSize uint32) } // Waku represents a dark communication interface through the Ethereum @@ -1114,12 +1114,18 @@ func (w *Waku) Start() error { peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval) defer peerTelemetryTicker.Stop() - sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError)) + dialErrSub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError)) if err != nil { w.logger.Error("failed to subscribe to dial errors", zap.Error(err)) return } - defer sub.Close() + defer dialErrSub.Close() + + messageSentSub, err := w.node.Host().EventBus().Subscribe(new(publish.MessageSent)) + if err != nil { + w.logger.Error("failed to subscribe to message sent events", zap.Error(err)) + return + } for { select { @@ -1127,11 +1133,13 @@ func (w *Waku) Start() error { return case <-peerTelemetryTicker.C: w.reportPeerMetrics() - case dialErr := <-sub.Out(): + case dialErr := <-dialErrSub.Out(): errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error()) for _, dialError := range errors { w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols}) } + case messageSent := <-messageSentSub.Out(): + w.statusTelemetryClient.PushSentMessageTotal(w.ctx, messageSent.(publish.MessageSent).Size) } } }() @@ -1301,6 +1309,10 @@ func (w *Waku) startMessageSender() error { return err } + if w.cfg.TelemetryServerURL != "" { + sender.WithMessageSentEmitter(w.node.Host()) + } + if w.cfg.EnableStoreConfirmationForMessagesSent { msgStoredChan := make(chan gethcommon.Hash, 1000) msgExpiredChan := make(chan gethcommon.Hash, 1000)