feat(waku)_: add lightpush rate-limiter (#5504)
* feat(waku)_: add lightpush rate-limiter * chore_: update go-waku
This commit is contained in:
parent
5f0d344b73
commit
ea35803eef
2
go.mod
2
go.mod
|
@ -96,7 +96,7 @@ require (
|
|||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
|
|||
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 h1:0YV8dN8qdNV7MwcMB2s9ky826Tv5KAK3Untabf4RIAU=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13 h1:2OCOlUdH4Vvt26Gj6EXnZpzgrHjWty9vUfm+6ZXvCxo=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
|
|
@ -272,7 +272,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||
}
|
||||
}
|
||||
|
||||
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.opts.clusterID, w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log)
|
||||
w.peerExchange, err = peer_exchange.NewWakuPeerExchange(w.DiscV5(), w.opts.clusterID, w.peerConnector, w.peermanager, w.opts.prometheusReg, w.log, w.opts.peerExchangeOptions...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import (
|
|||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
|
||||
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
@ -102,7 +103,8 @@ type WakuNodeParameters struct {
|
|||
discV5bootnodes []*enode.Node
|
||||
discV5autoUpdate bool
|
||||
|
||||
enablePeerExchange bool
|
||||
enablePeerExchange bool
|
||||
peerExchangeOptions []peer_exchange.Option
|
||||
|
||||
enableRLN bool
|
||||
rlnRelayMemIndex *uint
|
||||
|
@ -411,9 +413,10 @@ func WithDiscoveryV5(udpPort uint, bootnodes []*enode.Node, autoUpdate bool) Wak
|
|||
}
|
||||
|
||||
// WithPeerExchange is a WakuOption used to enable Peer Exchange
|
||||
func WithPeerExchange() WakuNodeOption {
|
||||
func WithPeerExchange(options ...peer_exchange.Option) WakuNodeOption {
|
||||
return func(params *WakuNodeParameters) error {
|
||||
params.enablePeerExchange = true
|
||||
params.peerExchangeOptions = options
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
67
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
67
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
|
@ -187,7 +188,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu
|
|||
}
|
||||
|
||||
// request sends a message via lightPush protocol to either a specified peer or peer that is selected.
|
||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) {
|
||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) {
|
||||
if params == nil {
|
||||
return nil, errors.New("lightpush params are mandatory")
|
||||
}
|
||||
|
@ -196,9 +197,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
|
|||
return nil, ErrInvalidID
|
||||
}
|
||||
|
||||
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
|
||||
logger := wakuLP.log.With(logging.HostID("peer", peer))
|
||||
|
||||
stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
|
||||
stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1)
|
||||
if err != nil {
|
||||
logger.Error("creating stream to peer", zap.Error(err))
|
||||
wakuLP.metrics.RecordError(dialFailure)
|
||||
|
@ -281,10 +282,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
|
|||
return nil, err
|
||||
}
|
||||
wakuLP.pm.Connect(pData)
|
||||
params.selectedPeer = pData.AddrInfo.ID
|
||||
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
|
||||
}
|
||||
|
||||
if params.pm != nil && params.selectedPeer == "" {
|
||||
reqPeerCount := params.maxPeers - len(params.selectedPeers)
|
||||
if params.pm != nil && reqPeerCount > 0 {
|
||||
var selectedPeers peer.IDSlice
|
||||
//TODO: update this to work with multiple peer selection
|
||||
selectedPeers, err = wakuLP.pm.SelectPeers(
|
||||
|
@ -293,17 +294,17 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
|
|||
Proto: LightPushID_v20beta1,
|
||||
PubsubTopics: []string{params.pubsubTopic},
|
||||
SpecificPeers: params.preferredPeers,
|
||||
MaxPeers: reqPeerCount,
|
||||
Ctx: ctx,
|
||||
},
|
||||
)
|
||||
if err == nil {
|
||||
params.selectedPeer = selectedPeers[0]
|
||||
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
|
||||
}
|
||||
|
||||
}
|
||||
if params.selectedPeer == "" {
|
||||
if len(params.selectedPeers) == 0 {
|
||||
if err != nil {
|
||||
params.log.Error("selecting peer", zap.Error(err))
|
||||
params.log.Error("selecting peers", zap.Error(err))
|
||||
wakuLP.metrics.RecordError(peerNotFoundFailure)
|
||||
return nil, ErrNoPeersAvailable
|
||||
}
|
||||
|
@ -327,25 +328,41 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
|
|||
req.Message = message
|
||||
req.PubsubTopic = params.pubsubTopic
|
||||
|
||||
logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer))
|
||||
logger := message.Logger(wakuLP.log, params.pubsubTopic).With(zap.Stringers("peerIDs", params.selectedPeers))
|
||||
|
||||
logger.Debug("publishing message")
|
||||
|
||||
response, err := wakuLP.request(ctx, req, params)
|
||||
if err != nil {
|
||||
logger.Error("could not publish message", zap.Error(err))
|
||||
return wpb.MessageHash{}, err
|
||||
var wg sync.WaitGroup
|
||||
var responses []*pb.PushResponse
|
||||
for _, peerID := range params.selectedPeers {
|
||||
wg.Add(1)
|
||||
go func(id peer.ID) {
|
||||
defer wg.Done()
|
||||
response, err := wakuLP.request(ctx, req, params, id)
|
||||
if err != nil {
|
||||
logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id))
|
||||
}
|
||||
responses = append(responses, response)
|
||||
}(peerID)
|
||||
}
|
||||
|
||||
if response.IsSuccess {
|
||||
hash := message.Hash(params.pubsubTopic)
|
||||
utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]))
|
||||
return hash, nil
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
var successCount int
|
||||
errMsg := "lightpush error"
|
||||
if response.Info != nil {
|
||||
errMsg = *response.Info
|
||||
|
||||
for _, response := range responses {
|
||||
if response.GetIsSuccess() {
|
||||
successCount++
|
||||
} else {
|
||||
if response.GetInfo() != "" {
|
||||
errMsg += *response.Info
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//in case of partial failure, should we retry here or build a layer above that takes care of these things?
|
||||
if successCount > 0 {
|
||||
hash := message.Hash(params.pubsubTopic)
|
||||
utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]), zap.Int("num-peers", len(responses)))
|
||||
return hash, nil
|
||||
}
|
||||
|
||||
return wpb.MessageHash{}, errors.New(errMsg)
|
||||
|
|
15
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go
generated
vendored
15
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go
generated
vendored
|
@ -29,7 +29,8 @@ func WithRateLimiter(r rate.Limit, b int) Option {
|
|||
type lightPushRequestParameters struct {
|
||||
host host.Host
|
||||
peerAddr multiaddr.Multiaddr
|
||||
selectedPeer peer.ID
|
||||
selectedPeers peer.IDSlice
|
||||
maxPeers int
|
||||
peerSelectionType peermanager.PeerSelection
|
||||
preferredPeers peer.IDSlice
|
||||
requestID []byte
|
||||
|
@ -41,10 +42,17 @@ type lightPushRequestParameters struct {
|
|||
// RequestOption is the type of options accepted when performing LightPush protocol requests
|
||||
type RequestOption func(*lightPushRequestParameters) error
|
||||
|
||||
func WithMaxPeers(num int) RequestOption {
|
||||
return func(params *lightPushRequestParameters) error {
|
||||
params.maxPeers = num
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPeer is an option used to specify the peerID to push a waku message to
|
||||
func WithPeer(p peer.ID) RequestOption {
|
||||
return func(params *lightPushRequestParameters) error {
|
||||
params.selectedPeer = p
|
||||
params.selectedPeers = append(params.selectedPeers, p)
|
||||
if params.peerAddr != nil {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
}
|
||||
|
@ -58,7 +66,7 @@ func WithPeer(p peer.ID) RequestOption {
|
|||
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
|
||||
return func(params *lightPushRequestParameters) error {
|
||||
params.peerAddr = pAddr
|
||||
if params.selectedPeer != "" {
|
||||
if len(params.selectedPeers) != 0 {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
|
@ -127,5 +135,6 @@ func DefaultOptions(host host.Host) []RequestOption {
|
|||
return []RequestOption{
|
||||
WithAutomaticRequestID(),
|
||||
WithAutomaticPeerSelection(),
|
||||
WithMaxPeers(1), //keeping default as 2 for status use-case
|
||||
}
|
||||
}
|
||||
|
|
|
@ -100,6 +100,9 @@ func (wakuPX *WakuPeerExchange) onRequest() func(network.Stream) {
|
|||
wakuPX.metrics.RecordError(rateLimitFailure)
|
||||
wakuPX.log.Error("exceeds the rate limit")
|
||||
// TODO: peer exchange protocol should contain an err field
|
||||
if err := stream.Reset(); err != nil {
|
||||
wakuPX.log.Error("resetting connection", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
|||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240712043904-2f333c1e1c13
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
|
|
@ -330,7 +330,7 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
|
|||
|
||||
if !cfg.LightClient {
|
||||
opts = append(opts, node.WithWakuFilterFullNode(filter.WithMaxSubscribers(20)))
|
||||
opts = append(opts, node.WithLightPush())
|
||||
opts = append(opts, node.WithLightPush(lightpush.WithRateLimiter(1, 1)))
|
||||
}
|
||||
|
||||
if appDB != nil {
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"google.golang.org/protobuf/proto"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
||||
wps "github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
|
@ -550,7 +551,11 @@ func TestWakuV2Store(t *testing.T) {
|
|||
}
|
||||
|
||||
func waitForPeerConnection(t *testing.T, peerID string, peerCh chan []string) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
waitForPeerConnectionWithTimeout(t, peerID, peerCh, 3*time.Second)
|
||||
}
|
||||
|
||||
func waitForPeerConnectionWithTimeout(t *testing.T, peerID string, peerCh chan []string, timeout time.Duration) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
for {
|
||||
select {
|
||||
|
@ -677,3 +682,104 @@ func TestOnlineChecker(t *testing.T) {
|
|||
lightNode.filterManager.addFilter("test", &common.Filter{})
|
||||
|
||||
}
|
||||
|
||||
func TestLightpushRateLimit(t *testing.T) {
|
||||
logger, err := zap.NewDevelopment()
|
||||
require.NoError(t, err)
|
||||
|
||||
config0 := &Config{}
|
||||
setDefaultConfig(config0, false)
|
||||
w0PeersCh := make(chan []string, 5) // buffered not to block on the send side
|
||||
|
||||
// Start the relayu node
|
||||
w0, err := New(nil, "", config0, logger.Named("relayNode"), nil, nil, nil, func(cs types.ConnStatus) {
|
||||
w0PeersCh <- maps.Keys(cs.Peers)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w0.Start())
|
||||
defer func() {
|
||||
require.NoError(t, w0.Stop())
|
||||
close(w0PeersCh)
|
||||
}()
|
||||
|
||||
contentTopics := common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}})
|
||||
filter := &common.Filter{
|
||||
PubsubTopic: config0.DefaultShardPubsubTopic,
|
||||
Messages: common.NewMemoryMessageStore(),
|
||||
ContentTopics: contentTopics,
|
||||
}
|
||||
|
||||
_, err = w0.Subscribe(filter)
|
||||
require.NoError(t, err)
|
||||
|
||||
config1 := &Config{}
|
||||
setDefaultConfig(config1, false)
|
||||
w1PeersCh := make(chan []string, 5) // buffered not to block on the send side
|
||||
|
||||
// Start the full node
|
||||
w1, err := New(nil, "", config1, logger.Named("fullNode"), nil, nil, nil, func(cs types.ConnStatus) {
|
||||
w1PeersCh <- maps.Keys(cs.Peers)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w1.Start())
|
||||
defer func() {
|
||||
require.NoError(t, w1.Stop())
|
||||
close(w1PeersCh)
|
||||
}()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
//Connect the relay peer and full node
|
||||
err = w1.node.DialPeer(ctx, w0.node.ListenAddresses()[0].String())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = tt.RetryWithBackOff(func() error {
|
||||
if len(w1.Peers()) == 0 {
|
||||
return errors.New("no peers discovered")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
config2 := &Config{}
|
||||
setDefaultConfig(config2, true)
|
||||
w2PeersCh := make(chan []string, 5) // buffered not to block on the send side
|
||||
|
||||
// Start the light node
|
||||
w2, err := New(nil, "", config2, logger.Named("lightNode"), nil, nil, nil, func(cs types.ConnStatus) {
|
||||
w2PeersCh <- maps.Keys(cs.Peers)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, w2.Start())
|
||||
defer func() {
|
||||
require.NoError(t, w2.Stop())
|
||||
close(w2PeersCh)
|
||||
}()
|
||||
|
||||
//Use this instead of DialPeer to make sure the peer is added to PeerStore and can be selected for Lighpush
|
||||
w2.node.AddDiscoveredPeer(w1.PeerID(), w1.node.ListenAddresses(), wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true)
|
||||
|
||||
waitForPeerConnectionWithTimeout(t, w2.node.ID(), w1PeersCh, 5*time.Second)
|
||||
|
||||
event := make(chan common.EnvelopeEvent, 10)
|
||||
w2.SubscribeEnvelopeEvents(event)
|
||||
|
||||
for i := range [4]int{} {
|
||||
msgTimestamp := w2.timestamp()
|
||||
_, err := w2.Send(config2.DefaultShardPubsubTopic, &pb.WakuMessage{
|
||||
Payload: []byte{1, 2, 3, 4, 5, 6, byte(i)},
|
||||
ContentTopic: maps.Keys(contentTopics)[0].ContentTopic(),
|
||||
Version: proto.Uint32(0),
|
||||
Timestamp: &msgTimestamp,
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(550 * time.Millisecond)
|
||||
|
||||
}
|
||||
|
||||
messages := filter.Retrieve()
|
||||
require.Len(t, messages, 2)
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue