From ea35803eef860a7b52f5bad4f36665abb9f1d754 Mon Sep 17 00:00:00 2001 From: Vaclav Pavlin Date: Fri, 12 Jul 2024 12:34:56 +0200 Subject: [PATCH] feat(waku)_: add lightpush rate-limiter (#5504) * feat(waku)_: add lightpush rate-limiter * chore_: update go-waku --- go.mod | 2 +- go.sum | 4 +- .../go-waku/waku/v2/node/wakunode2.go | 2 +- .../go-waku/waku/v2/node/wakuoptions.go | 7 +- .../v2/protocol/lightpush/waku_lightpush.go | 67 +++++++---- .../lightpush/waku_lightpush_option.go | 15 ++- .../v2/protocol/peer_exchange/protocol.go | 3 + vendor/modules.txt | 2 +- wakuv2/waku.go | 2 +- wakuv2/waku_test.go | 108 +++++++++++++++++- 10 files changed, 175 insertions(+), 37 deletions(-) diff --git a/go.mod b/go.mod index 511015217..8f2656ae5 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 + github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 023138815..34752760c 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 h1:0YV8dN8qdNV7MwcMB2s9ky826Tv5KAK3Untabf4RIAU= -github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= +github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13 h1:2OCOlUdH4Vvt26Gj6EXnZpzgrHjWty9vUfm+6ZXvCxo= +github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index 5032c823c..4eefa7e27 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -272,7 +272,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { } } - w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.opts.clusterID, w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log) + w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.opts.clusterID, w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log, w.opts.peerExchangeOptions...) if err != nil { return nil, err } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go index 82d964611..2e34ace73 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakuoptions.go @@ -31,6 +31,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/rendezvous" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" @@ -102,7 +103,8 @@ type WakuNodeParameters struct { discV5bootnodes []*enode.Node discV5autoUpdate bool - enablePeerExchange bool + enablePeerExchange bool + peerExchangeOptions []peer_exchange.Option enableRLN bool rlnRelayMemIndex *uint @@ -411,9 +413,10 @@ func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) Wak } // WithPeerExchange is a WakuOption used to enable Peer Exchange -func WithPeerExchange() WakuNodeOption { +func WithPeerExchange(options ...peer_exchange.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enablePeerExchange = true + params.peerExchangeOptions = options return nil } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 19708d116..b8f3c0f39 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math" + "sync" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -187,7 +188,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu } // request sends a message via lightPush protocol to either a specified peer or peer that is selected. -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) { if params == nil { return nil, errors.New("lightpush params are mandatory") } @@ -196,9 +197,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p return nil, ErrInvalidID } - logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) + logger := wakuLP.log.With(logging.HostID("peer", peer)) - stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) + stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) @@ -281,10 +282,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe return nil, err } wakuLP.pm.Connect(pData) - params.selectedPeer = pData.AddrInfo.ID + params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } - - if params.pm != nil && params.selectedPeer == "" { + reqPeerCount := params.maxPeers - len(params.selectedPeers) + if params.pm != nil && reqPeerCount > 0 { var selectedPeers peer.IDSlice //TODO: update this to work with multiple peer selection selectedPeers, err = wakuLP.pm.SelectPeers( @@ -293,17 +294,17 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe Proto: LightPushID_v20beta1, PubsubTopics: []string{params.pubsubTopic}, SpecificPeers: params.preferredPeers, + MaxPeers: reqPeerCount, Ctx: ctx, }, ) if err == nil { - params.selectedPeer = selectedPeers[0] + params.selectedPeers = append(params.selectedPeers, selectedPeers...) } - } - if params.selectedPeer == "" { + if len(params.selectedPeers) == 0 { if err != nil { - params.log.Error("selecting peer", zap.Error(err)) + params.log.Error("selecting peers", zap.Error(err)) wakuLP.metrics.RecordError(peerNotFoundFailure) return nil, ErrNoPeersAvailable } @@ -327,25 +328,41 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa req.Message = message req.PubsubTopic = params.pubsubTopic - logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer)) + logger := message.Logger(wakuLP.log, params.pubsubTopic).With(zap.Stringers("peerIDs", params.selectedPeers)) logger.Debug("publishing message") - - response, err := wakuLP.request(ctx, req, params) - if err != nil { - logger.Error("could not publish message", zap.Error(err)) - return wpb.MessageHash{}, err + var wg sync.WaitGroup + var responses []*pb.PushResponse + for _, peerID := range params.selectedPeers { + wg.Add(1) + go func(id peer.ID) { + defer wg.Done() + response, err := wakuLP.request(ctx, req, params, id) + if err != nil { + logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id)) + } + responses = append(responses, response) + }(peerID) } - - if response.IsSuccess { - hash := message.Hash(params.pubsubTopic) - utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:])) - return hash, nil - } - + wg.Wait() + var successCount int errMsg := "lightpush error" - if response.Info != nil { - errMsg = *response.Info + + for _, response := range responses { + if response.GetIsSuccess() { + successCount++ + } else { + if response.GetInfo() != "" { + errMsg += *response.Info + } + } + } + + //in case of partial failure, should we retry here or build a layer above that takes care of these things? + if successCount > 0 { + hash := message.Hash(params.pubsubTopic) + utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]), zap.Int("num-peers", len(responses))) + return hash, nil } return wpb.MessageHash{}, errors.New(errMsg) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go index 6ec258990..b192fd17a 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -29,7 +29,8 @@ func WithRateLimiter(r rate.Limit, b int) Option { type lightPushRequestParameters struct { host host.Host peerAddr multiaddr.Multiaddr - selectedPeer peer.ID + selectedPeers peer.IDSlice + maxPeers int peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte @@ -41,10 +42,17 @@ type lightPushRequestParameters struct { // RequestOption is the type of options accepted when performing LightPush protocol requests type RequestOption func(*lightPushRequestParameters) error +func WithMaxPeers(num int) RequestOption { + return func(params *lightPushRequestParameters) error { + params.maxPeers = num + return nil + } +} + // WithPeer is an option used to specify the peerID to push a waku message to func WithPeer(p peer.ID) RequestOption { return func(params *lightPushRequestParameters) error { - params.selectedPeer = p + params.selectedPeers = append(params.selectedPeers, p) if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") } @@ -58,7 +66,7 @@ func WithPeer(p peer.ID) RequestOption { func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { return func(params *lightPushRequestParameters) error { params.peerAddr = pAddr - if params.selectedPeer != "" { + if len(params.selectedPeers) != 0 { return errors.New("peerAddr and peerId options are mutually exclusive") } return nil @@ -127,5 +135,6 @@ func DefaultOptions(host host.Host) []RequestOption { return []RequestOption{ WithAutomaticRequestID(), WithAutomaticPeerSelection(), + WithMaxPeers(1), //keeping default as 2 for status use-case } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go index c02cdca6e..5f103e12c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/protocol.go @@ -100,6 +100,9 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) { wakuPX.metrics.RecordError(rateLimitFailure) wakuPX.log.Error("exceeds the rate limit") // TODO: peer exchange protocol should contain an err field + if err := stream.Reset(); err != nil { + wakuPX.log.Error("resetting connection", zap.Error(err)) + } return } diff --git a/vendor/modules.txt b/vendor/modules.txt index f27ee62cd..2aa347fc0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 +# github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 11802c5a3..8a45bbdbc 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -330,7 +330,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge if !cfg.LightClient { opts = append(opts, node.WithWakuFilterFullNode(filter.WithMaxSubscribers(20))) - opts = append(opts, node.WithLightPush()) + opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1))) } if appDB != nil { diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 0e9a18da2..11ab976fa 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -25,6 +25,7 @@ import ( "google.golang.org/protobuf/proto" "github.com/waku-org/go-waku/waku/v2/dnsdisc" + wps "github.com/waku-org/go-waku/waku/v2/peerstore" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" @@ -550,7 +551,11 @@ func TestWakuV2Store(t *testing.T) { } func waitForPeerConnection(t *testing.T, peerID string, peerCh chan []string) { - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + waitForPeerConnectionWithTimeout(t, peerID, peerCh, 3*time.Second) +} + +func waitForPeerConnectionWithTimeout(t *testing.T, peerID string, peerCh chan []string, timeout time.Duration) { + ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() for { select { @@ -677,3 +682,104 @@ func TestOnlineChecker(t *testing.T) { lightNode.filterManager.addFilter("test", &common.Filter{}) } + +func TestLightpushRateLimit(t *testing.T) { + logger, err := zap.NewDevelopment() + require.NoError(t, err) + + config0 := &Config{} + setDefaultConfig(config0, false) + w0PeersCh := make(chan []string, 5) // buffered not to block on the send side + + // Start the relayu node + w0, err := New(nil, "", config0, logger.Named("relayNode"), nil, nil, nil, func(cs types.ConnStatus) { + w0PeersCh <- maps.Keys(cs.Peers) + }) + require.NoError(t, err) + require.NoError(t, w0.Start()) + defer func() { + require.NoError(t, w0.Stop()) + close(w0PeersCh) + }() + + contentTopics := common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}) + filter := &common.Filter{ + PubsubTopic: config0.DefaultShardPubsubTopic, + Messages: common.NewMemoryMessageStore(), + ContentTopics: contentTopics, + } + + _, err = w0.Subscribe(filter) + require.NoError(t, err) + + config1 := &Config{} + setDefaultConfig(config1, false) + w1PeersCh := make(chan []string, 5) // buffered not to block on the send side + + // Start the full node + w1, err := New(nil, "", config1, logger.Named("fullNode"), nil, nil, nil, func(cs types.ConnStatus) { + w1PeersCh <- maps.Keys(cs.Peers) + }) + require.NoError(t, err) + require.NoError(t, w1.Start()) + defer func() { + require.NoError(t, w1.Stop()) + close(w1PeersCh) + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + //Connect the relay peer and full node + err = w1.node.DialPeer(ctx, w0.node.ListenAddresses()[0].String()) + require.NoError(t, err) + + err = tt.RetryWithBackOff(func() error { + if len(w1.Peers()) == 0 { + return errors.New("no peers discovered") + } + return nil + }) + require.NoError(t, err) + + config2 := &Config{} + setDefaultConfig(config2, true) + w2PeersCh := make(chan []string, 5) // buffered not to block on the send side + + // Start the light node + w2, err := New(nil, "", config2, logger.Named("lightNode"), nil, nil, nil, func(cs types.ConnStatus) { + w2PeersCh <- maps.Keys(cs.Peers) + }) + require.NoError(t, err) + require.NoError(t, w2.Start()) + defer func() { + require.NoError(t, w2.Stop()) + close(w2PeersCh) + }() + + //Use this instead of DialPeer to make sure the peer is added to PeerStore and can be selected for Lighpush + w2.node.AddDiscoveredPeer(w1.PeerID(), w1.node.ListenAddresses(), wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true) + + waitForPeerConnectionWithTimeout(t, w2.node.ID(), w1PeersCh, 5*time.Second) + + event := make(chan common.EnvelopeEvent, 10) + w2.SubscribeEnvelopeEvents(event) + + for i := range [4]int{} { + msgTimestamp := w2.timestamp() + _, err := w2.Send(config2.DefaultShardPubsubTopic, &pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5, 6, byte(i)}, + ContentTopic: maps.Keys(contentTopics)[0].ContentTopic(), + Version: proto.Uint32(0), + Timestamp: &msgTimestamp, + }) + + require.NoError(t, err) + + time.Sleep(550 * time.Millisecond) + + } + + messages := filter.Retrieve() + require.Len(t, messages, 2) + +}