mirror of
https://github.com/status-im/status-go.git
synced 2025-01-17 18:22:13 +00:00
feat(telem)_: track total waku message bandwidth (#6157)
This commit is contained in:
parent
d07e61f615
commit
e7cc535292
4
go.mod
4
go.mod
@ -88,7 +88,7 @@ require (
|
||||
github.com/gorilla/sessions v1.2.1
|
||||
github.com/gorilla/websocket v1.5.3
|
||||
github.com/ipfs/go-log/v2 v2.5.1
|
||||
github.com/jellydator/ttlcache/v3 v3.2.0
|
||||
github.com/jellydator/ttlcache/v3 v3.3.0
|
||||
github.com/jmoiron/sqlx v1.3.5
|
||||
github.com/klauspost/reedsolomon v1.12.1
|
||||
github.com/ladydascalie/currency v1.6.0
|
||||
@ -97,7 +97,7 @@ require (
|
||||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
8
go.sum
8
go.sum
@ -1238,8 +1238,8 @@ github.com/jbenet/go-temp-err-catcher v0.1.0 h1:zpb3ZH6wIE8Shj2sKS+khgRvf7T7RABo
|
||||
github.com/jbenet/go-temp-err-catcher v0.1.0/go.mod h1:0kJRvmDZXNMIiJirNPEYfhpPwbGVtZVWC34vc5WLsDk=
|
||||
github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU=
|
||||
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
|
||||
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
|
||||
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
|
||||
github.com/jellydator/ttlcache/v3 v3.3.0 h1:BdoC9cE81qXfrxeb9eoJi9dWrdhSuwXMAnHTbnBm4Wc=
|
||||
github.com/jellydator/ttlcache/v3 v3.3.0/go.mod h1:bj2/e0l4jRnQdrnSTaGTsh4GSXvMjQcy41i7th0GVGw=
|
||||
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
|
||||
github.com/jessevdk/go-flags v1.5.0/go.mod h1:Fw0T6WPc1dYxT4mKEZRfG5kJhaTDP9pj1c2EWnYs/m4=
|
||||
@ -2152,8 +2152,8 @@ github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27
|
||||
github.com/waku-org/go-libp2p-pubsub v0.12.0-gowaku.0.20240823143342-b0f2429ca27f/go.mod h1:Oi0zw9aw8/Y5GC99zt+Ef2gYAl+0nZlwdJonDyOz/sE=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90 h1:p7tehUW7f+D6pvMJYop2yJV03SJU2fFUusmSnKL3uow=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90/go.mod h1:1BRnyg2mQ2aBNLTBaPq6vEvobzywGykPOhGQFbHGf74=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71 h1:P9sQncEeeBqBRQEtiLdgQe5oWcTlAV5IVA5VGMqGslc=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71/go.mod h1:zYhLgqwBE3sGP2vP+aNiM5moOKlf/uSoIv36puAj9WI=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
@ -58,6 +58,8 @@ const (
|
||||
MissedRelevantMessageMetric TelemetryType = "MissedRelevantMessages"
|
||||
// MVDS ack received for a sent message
|
||||
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
|
||||
// Total number and size of Waku messages sent by this node
|
||||
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
|
||||
)
|
||||
|
||||
const MaxRetryCache = 5000
|
||||
@ -145,6 +147,10 @@ func (c *Client) PushMessageDeliveryConfirmed(ctx context.Context, messageHash s
|
||||
c.processAndPushTelemetry(ctx, MessageDeliveryConfirmed{MessageHash: messageHash})
|
||||
}
|
||||
|
||||
func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) {
|
||||
c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize})
|
||||
}
|
||||
|
||||
type ReceivedMessages struct {
|
||||
Filter transport.Filter
|
||||
SSHMessage *types.Message
|
||||
@ -196,6 +202,10 @@ type MessageDeliveryConfirmed struct {
|
||||
MessageHash string
|
||||
}
|
||||
|
||||
type SentMessageTotal struct {
|
||||
Size uint32
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
serverURL string
|
||||
httpClient *http.Client
|
||||
@ -392,6 +402,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
|
||||
TelemetryType: MessageDeliveryConfirmedMetric,
|
||||
TelemetryData: c.ProcessMessageDeliveryConfirmed(v),
|
||||
}
|
||||
case SentMessageTotal:
|
||||
telemetryRequest = TelemetryRequest{
|
||||
Id: c.nextId,
|
||||
TelemetryType: SentMessageTotalMetric,
|
||||
TelemetryData: c.ProcessSentMessageTotal(v),
|
||||
}
|
||||
default:
|
||||
c.logger.Error("Unknown telemetry data type")
|
||||
return
|
||||
@ -567,6 +583,12 @@ func (c *Client) ProcessMessageDeliveryConfirmed(messageDeliveryConfirmed Messag
|
||||
return c.marshalPostBody(postBody)
|
||||
}
|
||||
|
||||
func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *json.RawMessage {
|
||||
postBody := c.commonPostBody()
|
||||
postBody["size"] = sentMessageTotal.Size
|
||||
return c.marshalPostBody(postBody)
|
||||
}
|
||||
|
||||
// Helper function to marshal post body and handle errors
|
||||
func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage {
|
||||
body, err := json.Marshal(postBody)
|
||||
|
@ -597,3 +597,17 @@ func TestProcessDialFailure(t *testing.T) {
|
||||
}
|
||||
runTestCase(t, tc)
|
||||
}
|
||||
|
||||
func TestProcessSentMessageTotal(t *testing.T) {
|
||||
tc := testCase{
|
||||
name: "SentMessageTotal",
|
||||
input: SentMessageTotal{
|
||||
Size: uint32(1234),
|
||||
},
|
||||
expectedType: SentMessageTotalMetric,
|
||||
expectedFields: map[string]interface{}{
|
||||
"size": float64(1234),
|
||||
},
|
||||
}
|
||||
runTestCase(t, tc)
|
||||
}
|
||||
|
1
vendor/github.com/jellydator/ttlcache/v3/README.md
generated
vendored
1
vendor/github.com/jellydator/ttlcache/v3/README.md
generated
vendored
@ -11,6 +11,7 @@
|
||||
- Item expiration and automatic deletion
|
||||
- Automatic expiration time extension on each `Get` call
|
||||
- `Loader` interface that may be used to load/lazily initialize missing cache
|
||||
- Thread Safe
|
||||
items
|
||||
- Event handlers (insertion and eviction)
|
||||
- Metrics
|
||||
|
105
vendor/github.com/jellydator/ttlcache/v3/cache.go
generated
vendored
105
vendor/github.com/jellydator/ttlcache/v3/cache.go
generated
vendored
@ -133,7 +133,7 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] {
|
||||
ttl = c.options.ttl
|
||||
}
|
||||
|
||||
elem := c.get(key, false)
|
||||
elem := c.get(key, false, true)
|
||||
if elem != nil {
|
||||
// update/overwrite an existing item
|
||||
item := elem.Value.(*Item[K, V])
|
||||
@ -176,14 +176,14 @@ func (c *Cache[K, V]) set(key K, value V, ttl time.Duration) *Item[K, V] {
|
||||
// It returns nil if the item is not found or is expired.
|
||||
// Not safe for concurrent use by multiple goroutines without additional
|
||||
// locking.
|
||||
func (c *Cache[K, V]) get(key K, touch bool) *list.Element {
|
||||
func (c *Cache[K, V]) get(key K, touch bool, includeExpired bool) *list.Element {
|
||||
elem := c.items.values[key]
|
||||
if elem == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
item := elem.Value.(*Item[K, V])
|
||||
if item.isExpiredUnsafe() {
|
||||
if !includeExpired && item.isExpiredUnsafe() {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -218,7 +218,7 @@ func (c *Cache[K, V]) getWithOpts(key K, lockAndLoad bool, opts ...Option[K, V])
|
||||
c.items.mu.Lock()
|
||||
}
|
||||
|
||||
elem := c.get(key, !getOpts.disableTouchOnHit)
|
||||
elem := c.get(key, !getOpts.disableTouchOnHit, false)
|
||||
|
||||
if lockAndLoad {
|
||||
c.items.mu.Unlock()
|
||||
@ -339,8 +339,8 @@ func (c *Cache[K, V]) Has(key K) bool {
|
||||
c.items.mu.RLock()
|
||||
defer c.items.mu.RUnlock()
|
||||
|
||||
_, ok := c.items.values[key]
|
||||
return ok
|
||||
elem, ok := c.items.values[key]
|
||||
return ok && !elem.Value.(*Item[K, V]).isExpiredUnsafe()
|
||||
}
|
||||
|
||||
// GetOrSet retrieves an item from the cache by the provided key.
|
||||
@ -436,27 +436,67 @@ func (c *Cache[K, V]) DeleteExpired() {
|
||||
// If the item is not found, the method is no-op.
|
||||
func (c *Cache[K, V]) Touch(key K) {
|
||||
c.items.mu.Lock()
|
||||
c.get(key, true)
|
||||
c.get(key, true, false)
|
||||
c.items.mu.Unlock()
|
||||
}
|
||||
|
||||
// Len returns the total number of items in the cache.
|
||||
// Len returns the number of unexpired items in the cache.
|
||||
func (c *Cache[K, V]) Len() int {
|
||||
c.items.mu.RLock()
|
||||
defer c.items.mu.RUnlock()
|
||||
|
||||
return len(c.items.values)
|
||||
total := c.items.expQueue.Len()
|
||||
if total == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// search the heap-based expQueue by BFS
|
||||
countExpired := func() int {
|
||||
var (
|
||||
q []int
|
||||
res int
|
||||
)
|
||||
|
||||
item := c.items.expQueue[0].Value.(*Item[K, V])
|
||||
if !item.isExpiredUnsafe() {
|
||||
return res
|
||||
}
|
||||
|
||||
q = append(q, 0)
|
||||
for len(q) > 0 {
|
||||
pop := q[0]
|
||||
q = q[1:]
|
||||
res++
|
||||
|
||||
for i := 1; i <= 2; i++ {
|
||||
idx := 2*pop + i
|
||||
if idx >= total {
|
||||
break
|
||||
}
|
||||
|
||||
item = c.items.expQueue[idx].Value.(*Item[K, V])
|
||||
if item.isExpiredUnsafe() {
|
||||
q = append(q, idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
return total - countExpired()
|
||||
}
|
||||
|
||||
// Keys returns all keys currently present in the cache.
|
||||
// Keys returns all unexpired keys in the cache.
|
||||
func (c *Cache[K, V]) Keys() []K {
|
||||
c.items.mu.RLock()
|
||||
defer c.items.mu.RUnlock()
|
||||
|
||||
res := make([]K, 0, len(c.items.values))
|
||||
for k := range c.items.values {
|
||||
res := make([]K, 0)
|
||||
for k, elem := range c.items.values {
|
||||
if !elem.Value.(*Item[K, V]).isExpiredUnsafe() {
|
||||
res = append(res, k)
|
||||
}
|
||||
}
|
||||
|
||||
return res
|
||||
}
|
||||
@ -467,18 +507,18 @@ func (c *Cache[K, V]) Items() map[K]*Item[K, V] {
|
||||
c.items.mu.RLock()
|
||||
defer c.items.mu.RUnlock()
|
||||
|
||||
items := make(map[K]*Item[K, V], len(c.items.values))
|
||||
for k := range c.items.values {
|
||||
item := c.get(k, false)
|
||||
if item != nil {
|
||||
items[k] = item.Value.(*Item[K, V])
|
||||
items := make(map[K]*Item[K, V])
|
||||
for k, elem := range c.items.values {
|
||||
item := elem.Value.(*Item[K, V])
|
||||
if item != nil && !item.isExpiredUnsafe() {
|
||||
items[k] = item
|
||||
}
|
||||
}
|
||||
|
||||
return items
|
||||
}
|
||||
|
||||
// Range calls fn for each item present in the cache. If fn returns false,
|
||||
// Range calls fn for each unexpired item in the cache. If fn returns false,
|
||||
// Range stops the iteration.
|
||||
func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) {
|
||||
c.items.mu.RLock()
|
||||
@ -491,9 +531,10 @@ func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) {
|
||||
|
||||
for item := c.items.lru.Front(); item != c.items.lru.Back().Next(); item = item.Next() {
|
||||
i := item.Value.(*Item[K, V])
|
||||
expired := i.isExpiredUnsafe()
|
||||
c.items.mu.RUnlock()
|
||||
|
||||
if !fn(i) {
|
||||
if !expired && !fn(i) {
|
||||
return
|
||||
}
|
||||
|
||||
@ -503,6 +544,32 @@ func (c *Cache[K, V]) Range(fn func(item *Item[K, V]) bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// RangeBackwards calls fn for each unexpired item in the cache in reverse order.
|
||||
// If fn returns false, RangeBackwards stops the iteration.
|
||||
func (c *Cache[K, V]) RangeBackwards(fn func(item *Item[K, V]) bool) {
|
||||
c.items.mu.RLock()
|
||||
|
||||
// Check if cache is empty
|
||||
if c.items.lru.Len() == 0 {
|
||||
c.items.mu.RUnlock()
|
||||
return
|
||||
}
|
||||
|
||||
for item := c.items.lru.Back(); item != c.items.lru.Front().Prev(); item = item.Prev() {
|
||||
i := item.Value.(*Item[K, V])
|
||||
expired := i.isExpiredUnsafe()
|
||||
c.items.mu.RUnlock()
|
||||
|
||||
if !expired && !fn(i) {
|
||||
return
|
||||
}
|
||||
|
||||
if item.Prev() != nil {
|
||||
c.items.mu.RLock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Metrics returns the metrics of the cache.
|
||||
func (c *Cache[K, V]) Metrics() Metrics {
|
||||
c.metricsMu.RLock()
|
||||
|
27
vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go
generated
vendored
27
vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go
generated
vendored
@ -6,6 +6,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/libp2p/go-libp2p/core/event"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
@ -53,6 +55,12 @@ type MessageSender struct {
|
||||
messageSentCheck ISentCheck
|
||||
rateLimiter *PublishRateLimiter
|
||||
logger *zap.Logger
|
||||
evtMessageSent event.Emitter
|
||||
}
|
||||
|
||||
type MessageSent struct {
|
||||
Size uint32 // Size of payload in bytes
|
||||
Timestamp int64
|
||||
}
|
||||
|
||||
type Request struct {
|
||||
@ -96,6 +104,15 @@ func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *Mess
|
||||
return ms
|
||||
}
|
||||
|
||||
func (ms *MessageSender) WithMessageSentEmitter(host host.Host) *MessageSender {
|
||||
evtMessageSent, err := host.EventBus().Emitter(new(MessageSent))
|
||||
if err != nil {
|
||||
ms.logger.Error("failed to create message sent emitter", zap.Error(err))
|
||||
}
|
||||
ms.evtMessageSent = evtMessageSent
|
||||
return ms
|
||||
}
|
||||
|
||||
func (ms *MessageSender) Send(req *Request) error {
|
||||
logger := ms.logger.With(
|
||||
zap.Stringer("envelopeHash", req.envelope.Hash()),
|
||||
@ -149,6 +166,16 @@ func (ms *MessageSender) Send(req *Request) error {
|
||||
)
|
||||
}
|
||||
|
||||
if ms.evtMessageSent != nil {
|
||||
err := ms.evtMessageSent.Emit(MessageSent{
|
||||
Size: uint32(len(req.envelope.Message().Payload)),
|
||||
Timestamp: req.envelope.Message().GetTimestamp(),
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("failed to emit message sent event", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
20
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go
generated
vendored
20
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go
generated
vendored
@ -54,6 +54,7 @@ type WakuFilterLightNode struct {
|
||||
log *zap.Logger
|
||||
subscriptions *subscription.SubscriptionsMap
|
||||
pm *peermanager.PeerManager
|
||||
limiter *utils.RateLimiter
|
||||
peerPingInterval time.Duration
|
||||
}
|
||||
|
||||
@ -89,6 +90,7 @@ func NewWakuFilterLightNode(
|
||||
onlineChecker onlinechecker.OnlineChecker,
|
||||
reg prometheus.Registerer,
|
||||
log *zap.Logger,
|
||||
opts ...LightNodeOption,
|
||||
) *WakuFilterLightNode {
|
||||
wf := new(WakuFilterLightNode)
|
||||
wf.log = log.Named("filterv2-lightnode")
|
||||
@ -99,6 +101,14 @@ func NewWakuFilterLightNode(
|
||||
wf.CommonService = service.NewCommonService()
|
||||
wf.metrics = newMetrics(reg)
|
||||
wf.peerPingInterval = 1 * time.Minute
|
||||
|
||||
params := &LightNodeParameters{}
|
||||
opts = append(DefaultLightNodeOptions(), opts...)
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
}
|
||||
wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
|
||||
|
||||
return wf
|
||||
}
|
||||
|
||||
@ -155,6 +165,14 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
|
||||
|
||||
logger := wf.log.With(logging.HostID("peerID", peerID))
|
||||
|
||||
if !wf.limiter.Allow(peerID) {
|
||||
wf.metrics.RecordError(rateLimitFailure)
|
||||
if err := stream.Reset(); err != nil {
|
||||
wf.log.Error("resetting connection", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if !wf.subscriptions.IsSubscribedTo(peerID) {
|
||||
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
|
||||
wf.metrics.RecordError(unknownPeerMessagePush)
|
||||
@ -287,7 +305,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
||||
|
||||
}
|
||||
|
||||
if filterSubscribeResponse.RequestId != request.RequestId {
|
||||
if filterSubscribeResponse.RequestId != "N/A" && filterSubscribeResponse.RequestId != request.RequestId {
|
||||
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
|
||||
wf.metrics.RecordError(requestIDMismatch)
|
||||
err := NewFilterError(300, "request_id_mismatch")
|
||||
|
1
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go
generated
vendored
1
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/metrics.go
generated
vendored
@ -96,6 +96,7 @@ var (
|
||||
peerNotFoundFailure metricsErrCategory = "peer_not_found_failure"
|
||||
writeResponseFailure metricsErrCategory = "write_response_failure"
|
||||
pushTimeoutFailure metricsErrCategory = "push_timeout_failure"
|
||||
rateLimitFailure metricsErrCategory = "ratelimit_failure"
|
||||
)
|
||||
|
||||
// RecordError increases the counter for different error types
|
||||
|
31
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go
generated
vendored
31
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/options.go
generated
vendored
@ -11,6 +11,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
func (old *FilterSubscribeParameters) Copy() *FilterSubscribeParameters {
|
||||
@ -57,13 +58,35 @@ type (
|
||||
Timeout time.Duration
|
||||
MaxSubscribers int
|
||||
pm *peermanager.PeerManager
|
||||
limitR rate.Limit
|
||||
limitB int
|
||||
}
|
||||
|
||||
Option func(*FilterParameters)
|
||||
|
||||
LightNodeParameters struct {
|
||||
limitR rate.Limit
|
||||
limitB int
|
||||
}
|
||||
|
||||
LightNodeOption func(*LightNodeParameters)
|
||||
|
||||
FilterSubscribeOption func(*FilterSubscribeParameters) error
|
||||
)
|
||||
|
||||
func WithLightNodeRateLimiter(r rate.Limit, b int) LightNodeOption {
|
||||
return func(params *LightNodeParameters) {
|
||||
params.limitR = r
|
||||
params.limitB = b
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultLightNodeOptions() []LightNodeOption {
|
||||
return []LightNodeOption{
|
||||
WithLightNodeRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func WithTimeout(timeout time.Duration) Option {
|
||||
return func(params *FilterParameters) {
|
||||
params.Timeout = timeout
|
||||
@ -202,9 +225,17 @@ func WithPeerManager(pm *peermanager.PeerManager) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func WithFullNodeRateLimiter(r rate.Limit, b int) Option {
|
||||
return func(params *FilterParameters) {
|
||||
params.limitR = r
|
||||
params.limitB = b
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultOptions() []Option {
|
||||
return []Option{
|
||||
WithTimeout(DefaultIdleSubscriptionTimeout),
|
||||
WithMaxSubscribers(DefaultMaxSubscribers),
|
||||
WithFullNodeRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
12
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go
generated
vendored
12
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/server.go
generated
vendored
@ -39,7 +39,7 @@ type (
|
||||
*service.CommonService
|
||||
subscriptions *SubscribersMap
|
||||
pm *peermanager.PeerManager
|
||||
|
||||
limiter *utils.RateLimiter
|
||||
maxSubscriptions int
|
||||
}
|
||||
)
|
||||
@ -56,6 +56,7 @@ func NewWakuFilterFullNode(timesource timesource.Timesource, reg prometheus.Regi
|
||||
opt(params)
|
||||
}
|
||||
|
||||
wf.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
|
||||
wf.CommonService = service.NewCommonService()
|
||||
wf.metrics = newMetrics(reg)
|
||||
wf.subscriptions = NewSubscribersMap(params.Timeout)
|
||||
@ -93,7 +94,14 @@ func (wf *WakuFilterFullNode) start(sub *relay.Subscription) error {
|
||||
|
||||
func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(network.Stream) {
|
||||
return func(stream network.Stream) {
|
||||
logger := wf.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
|
||||
peerID := stream.Conn().RemotePeer()
|
||||
logger := wf.log.With(logging.HostID("peer", peerID))
|
||||
|
||||
if !wf.limiter.Allow(peerID) {
|
||||
wf.metrics.RecordError(rateLimitFailure)
|
||||
wf.reply(ctx, stream, &pb.FilterSubscribeRequest{RequestId: "N/A"}, http.StatusTooManyRequests, "filter request rejected due rate limit exceeded")
|
||||
return
|
||||
}
|
||||
|
||||
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
|
||||
|
||||
|
5
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go
generated
vendored
5
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/test_utils.go
generated
vendored
@ -22,6 +22,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type LightNodeData struct {
|
||||
@ -133,7 +134,7 @@ func (s *FilterTestSuite) GetWakuFilterFullNode(topic string, withRegisterAll bo
|
||||
|
||||
nodeData := s.GetWakuRelay(topic)
|
||||
|
||||
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log)
|
||||
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), prometheus.DefaultRegisterer, s.Log, WithFullNodeRateLimiter(rate.Inf, 0))
|
||||
node2Filter.SetHost(nodeData.FullNodeHost)
|
||||
|
||||
var sub *relay.Subscription
|
||||
@ -166,7 +167,7 @@ func (s *FilterTestSuite) GetWakuFilterLightNode() LightNodeData {
|
||||
b := relay.NewBroadcaster(10)
|
||||
s.Require().NoError(b.Start(context.Background()))
|
||||
pm := peermanager.NewPeerManager(5, 5, nil, nil, true, s.Log)
|
||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log)
|
||||
filterPush := NewWakuFilterLightNode(b, pm, timesource.NewDefaultClock(), onlinechecker.NewDefaultOnlineChecker(true), prometheus.DefaultRegisterer, s.Log, WithLightNodeRateLimiter(rate.Inf, 0))
|
||||
filterPush.SetHost(host)
|
||||
pm.SetHost(host)
|
||||
return LightNodeData{filterPush, host}
|
||||
|
8
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
8
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
@ -24,7 +24,6 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// LightPushID_v20beta1 is the current Waku LightPush protocol identifier
|
||||
@ -40,7 +39,7 @@ var (
|
||||
type WakuLightPush struct {
|
||||
h host.Host
|
||||
relay *relay.WakuRelay
|
||||
limiter *rate.Limiter
|
||||
limiter *utils.RateLimiter
|
||||
cancel context.CancelFunc
|
||||
pm *peermanager.PeerManager
|
||||
metrics Metrics
|
||||
@ -59,11 +58,12 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
|
||||
wakuLP.metrics = newMetrics(reg)
|
||||
|
||||
params := &LightpushParameters{}
|
||||
opts = append(DefaultLightpushOptions(), opts...)
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
wakuLP.limiter = params.limiter
|
||||
wakuLP.limiter = utils.NewRateLimiter(params.limitR, params.limitB)
|
||||
|
||||
return wakuLP
|
||||
}
|
||||
@ -106,7 +106,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
|
||||
Response: &pb.PushResponse{},
|
||||
}
|
||||
|
||||
if wakuLP.limiter != nil && !wakuLP.limiter.Allow() {
|
||||
if !wakuLP.limiter.Allow(stream.Conn().RemotePeer()) {
|
||||
wakuLP.metrics.RecordError(rateLimitFailure)
|
||||
responseMsg := "exceeds the rate limit"
|
||||
responsePushRPC.Response.Info = &responseMsg
|
||||
|
12
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go
generated
vendored
12
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go
generated
vendored
@ -14,7 +14,8 @@ import (
|
||||
)
|
||||
|
||||
type LightpushParameters struct {
|
||||
limiter *rate.Limiter
|
||||
limitR rate.Limit
|
||||
limitB int
|
||||
}
|
||||
|
||||
type Option func(*LightpushParameters)
|
||||
@ -22,7 +23,14 @@ type Option func(*LightpushParameters)
|
||||
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
|
||||
func WithRateLimiter(r rate.Limit, b int) Option {
|
||||
return func(params *LightpushParameters) {
|
||||
params.limiter = rate.NewLimiter(r, b)
|
||||
params.limitR = r
|
||||
params.limitB = b
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultLightpushOptions() []Option {
|
||||
return []Option{
|
||||
WithRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
||||
|
11
vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go
generated
vendored
11
vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go
generated
vendored
@ -23,7 +23,6 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/service"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// PeerExchangeID_v20alpha1 is the current Waku Peer Exchange protocol identifier
|
||||
@ -51,7 +50,7 @@ type WakuPeerExchange struct {
|
||||
|
||||
peerConnector PeerConnector
|
||||
enrCache *enrCache
|
||||
limiter *rate.Limiter
|
||||
limiter *utils.RateLimiter
|
||||
}
|
||||
|
||||
// NewWakuPeerExchange returns a new instance of WakuPeerExchange struct
|
||||
@ -68,11 +67,12 @@ func NewWakuPeerExchange(disc *discv5.DiscoveryV5, clusterID uint16, peerConnect
|
||||
wakuPX.CommonService = service.NewCommonService()
|
||||
|
||||
params := &PeerExchangeParameters{}
|
||||
opts = append(DefaultPeerExchangeOptions(), opts...)
|
||||
for _, opt := range opts {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
wakuPX.limiter = params.limiter
|
||||
wakuPX.limiter = utils.NewRateLimiter(params.limiterR, params.limiterB)
|
||||
return wakuPX, nil
|
||||
}
|
||||
|
||||
@ -97,9 +97,10 @@ func (wakuPX *WakuPeerExchange) start() error {
|
||||
|
||||
func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
|
||||
return func(stream network.Stream) {
|
||||
logger := wakuPX.log.With(logging.HostID("peer", stream.Conn().RemotePeer()))
|
||||
peerID := stream.Conn().RemotePeer()
|
||||
logger := wakuPX.log.With(logging.HostID("peer", peerID))
|
||||
|
||||
if wakuPX.limiter != nil && !wakuPX.limiter.Allow() {
|
||||
if wakuPX.limiter != nil && !wakuPX.limiter.Allow(peerID) {
|
||||
wakuPX.metrics.RecordError(rateLimitFailure)
|
||||
wakuPX.log.Info("exceeds the rate limit")
|
||||
// TODO: peer exchange protocol should contain an err field
|
||||
|
@ -12,7 +12,8 @@ import (
|
||||
)
|
||||
|
||||
type PeerExchangeParameters struct {
|
||||
limiter *rate.Limiter
|
||||
limiterR rate.Limit
|
||||
limiterB int
|
||||
}
|
||||
|
||||
type Option func(*PeerExchangeParameters)
|
||||
@ -20,7 +21,14 @@ type Option func(*PeerExchangeParameters)
|
||||
// WithRateLimiter is an option used to specify a rate limiter for requests received in lightpush protocol
|
||||
func WithRateLimiter(r rate.Limit, b int) Option {
|
||||
return func(params *PeerExchangeParameters) {
|
||||
params.limiter = rate.NewLimiter(r, b)
|
||||
params.limiterR = r
|
||||
params.limiterB = b
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultPeerExchangeOptions() []Option {
|
||||
return []Option{
|
||||
WithRateLimiter(1, 1),
|
||||
}
|
||||
}
|
||||
|
||||
|
69
vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go
generated
vendored
Normal file
69
vendor/github.com/waku-org/go-waku/waku/v2/utils/limiter.go
generated
vendored
Normal file
@ -0,0 +1,69 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/jellydator/ttlcache/v3"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
type RateLimiter struct {
|
||||
sync.Mutex
|
||||
limiters *ttlcache.Cache[peer.ID, *rate.Limiter]
|
||||
r rate.Limit
|
||||
b int
|
||||
}
|
||||
|
||||
func NewRateLimiter(r rate.Limit, b int) *RateLimiter {
|
||||
return &RateLimiter{
|
||||
r: r,
|
||||
b: b,
|
||||
limiters: ttlcache.New[peer.ID, *rate.Limiter](
|
||||
ttlcache.WithTTL[peer.ID, *rate.Limiter](30 * time.Minute),
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RateLimiter) Start(ctx context.Context) {
|
||||
go func() {
|
||||
t := time.NewTicker(time.Hour)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
r.Lock()
|
||||
r.limiters.DeleteExpired()
|
||||
r.Unlock()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (r *RateLimiter) getOrCreate(peerID peer.ID) *rate.Limiter {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
|
||||
var limiter *rate.Limiter
|
||||
if !r.limiters.Has(peerID) {
|
||||
limiter = rate.NewLimiter(r.r, r.b)
|
||||
r.limiters.Set(peerID, limiter, ttlcache.DefaultTTL)
|
||||
} else {
|
||||
v := r.limiters.Get(peerID)
|
||||
limiter = v.Value()
|
||||
}
|
||||
return limiter
|
||||
}
|
||||
|
||||
func (r *RateLimiter) Allow(peerID peer.ID) bool {
|
||||
return r.getOrCreate(peerID).Allow()
|
||||
}
|
||||
|
||||
func (r *RateLimiter) Wait(ctx context.Context, peerID peer.ID) error {
|
||||
return r.getOrCreate(peerID).Wait(ctx)
|
||||
}
|
4
vendor/modules.txt
vendored
4
vendor/modules.txt
vendored
@ -448,7 +448,7 @@ github.com/jackpal/go-nat-pmp
|
||||
# github.com/jbenet/go-temp-err-catcher v0.1.0
|
||||
## explicit; go 1.13
|
||||
github.com/jbenet/go-temp-err-catcher
|
||||
# github.com/jellydator/ttlcache/v3 v3.2.0
|
||||
# github.com/jellydator/ttlcache/v3 v3.3.0
|
||||
## explicit; go 1.18
|
||||
github.com/jellydator/ttlcache/v3
|
||||
# github.com/jinzhu/copier v0.0.0-20190924061706-b57f9002281a
|
||||
@ -1044,7 +1044,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
||||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20241128183857-1608cf2b0b90
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20241203032230-6550ff35bc71
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
@ -55,6 +55,7 @@ import (
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p/core/metrics"
|
||||
|
||||
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
filterapi "github.com/waku-org/go-waku/waku/v2/api/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/history"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/missing"
|
||||
@ -72,8 +73,6 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
||||
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
|
||||
gocommon "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
@ -119,6 +118,7 @@ type ITelemetryClient interface {
|
||||
PushMissedMessage(ctx context.Context, envelope *protocol.Envelope)
|
||||
PushMissedRelevantMessage(ctx context.Context, message *common.ReceivedMessage)
|
||||
PushMessageDeliveryConfirmed(ctx context.Context, messageHash string)
|
||||
PushSentMessageTotal(ctx context.Context, messageSize uint32)
|
||||
}
|
||||
|
||||
// Waku represents a dark communication interface through the Ethereum
|
||||
@ -1114,12 +1114,18 @@ func (w *Waku) Start() error {
|
||||
peerTelemetryTicker := time.NewTicker(peerTelemetryTickerInterval)
|
||||
defer peerTelemetryTicker.Stop()
|
||||
|
||||
sub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
|
||||
dialErrSub, err := w.node.Host().EventBus().Subscribe(new(utils.DialError))
|
||||
if err != nil {
|
||||
w.logger.Error("failed to subscribe to dial errors", zap.Error(err))
|
||||
return
|
||||
}
|
||||
defer sub.Close()
|
||||
defer dialErrSub.Close()
|
||||
|
||||
messageSentSub, err := w.node.Host().EventBus().Subscribe(new(publish.MessageSent))
|
||||
if err != nil {
|
||||
w.logger.Error("failed to subscribe to message sent events", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -1127,11 +1133,13 @@ func (w *Waku) Start() error {
|
||||
return
|
||||
case <-peerTelemetryTicker.C:
|
||||
w.reportPeerMetrics()
|
||||
case dialErr := <-sub.Out():
|
||||
case dialErr := <-dialErrSub.Out():
|
||||
errors := common.ParseDialErrors(dialErr.(utils.DialError).Err.Error())
|
||||
for _, dialError := range errors {
|
||||
w.statusTelemetryClient.PushDialFailure(w.ctx, common.DialError{ErrType: dialError.ErrType, ErrMsg: dialError.ErrMsg, Protocols: dialError.Protocols})
|
||||
}
|
||||
case messageSent := <-messageSentSub.Out():
|
||||
w.statusTelemetryClient.PushSentMessageTotal(w.ctx, messageSent.(publish.MessageSent).Size)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -1301,6 +1309,10 @@ func (w *Waku) startMessageSender() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if w.cfg.TelemetryServerURL != "" {
|
||||
sender.WithMessageSentEmitter(w.node.Host())
|
||||
}
|
||||
|
||||
if w.cfg.EnableStoreConfirmationForMessagesSent {
|
||||
msgStoredChan := make(chan gethcommon.Hash, 1000)
|
||||
msgExpiredChan := make(chan gethcommon.Hash, 1000)
|
||||
|
Loading…
x
Reference in New Issue
Block a user