feat_: increase peers to publish for lightpush to 2

This commit is contained in:
Prem Chaitanya Prathi 2024-07-11 15:27:56 +05:30
parent a986894c79
commit 89e914575a
6 changed files with 60 additions and 33 deletions

2
go.mod
View File

@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0 github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2 github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec
github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 h1:0YV8dN8qdNV7MwcMB2s9ky826Tv5KAK3Untabf4RIAU= github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec h1:3e/uM2R0uRVTASzQrv/9NtdGL24QBd/Fi9aq7ygWS4M=
github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"math" "math"
"sync"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/network"
@ -187,7 +188,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu
} }
// request sends a message via lightPush protocol to either a specified peer or peer that is selected. // request sends a message via lightPush protocol to either a specified peer or peer that is selected.
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) { func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) {
if params == nil { if params == nil {
return nil, errors.New("lightpush params are mandatory") return nil, errors.New("lightpush params are mandatory")
} }
@ -196,9 +197,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
return nil, ErrInvalidID return nil, ErrInvalidID
} }
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) logger := wakuLP.log.With(logging.HostID("peer", peer))
stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1)
if err != nil { if err != nil {
logger.Error("creating stream to peer", zap.Error(err)) logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure) wakuLP.metrics.RecordError(dialFailure)
@ -281,10 +282,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
return nil, err return nil, err
} }
wakuLP.pm.Connect(pData) wakuLP.pm.Connect(pData)
params.selectedPeer = pData.AddrInfo.ID params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
} }
reqPeerCount := params.maxPeers - len(params.selectedPeers)
if params.pm != nil && params.selectedPeer == "" { if params.pm != nil && reqPeerCount > 0 {
var selectedPeers peer.IDSlice var selectedPeers peer.IDSlice
//TODO: update this to work with multiple peer selection //TODO: update this to work with multiple peer selection
selectedPeers, err = wakuLP.pm.SelectPeers( selectedPeers, err = wakuLP.pm.SelectPeers(
@ -293,17 +294,17 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
Proto: LightPushID_v20beta1, Proto: LightPushID_v20beta1,
PubsubTopics: []string{params.pubsubTopic}, PubsubTopics: []string{params.pubsubTopic},
SpecificPeers: params.preferredPeers, SpecificPeers: params.preferredPeers,
MaxPeers: reqPeerCount,
Ctx: ctx, Ctx: ctx,
}, },
) )
if err == nil { if err == nil {
params.selectedPeer = selectedPeers[0] params.selectedPeers = append(params.selectedPeers, selectedPeers...)
} }
} }
if params.selectedPeer == "" { if len(params.selectedPeers) == 0 {
if err != nil { if err != nil {
params.log.Error("selecting peer", zap.Error(err)) params.log.Error("selecting peers", zap.Error(err))
wakuLP.metrics.RecordError(peerNotFoundFailure) wakuLP.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable return nil, ErrNoPeersAvailable
} }
@ -327,25 +328,41 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
req.Message = message req.Message = message
req.PubsubTopic = params.pubsubTopic req.PubsubTopic = params.pubsubTopic
logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer)) logger := message.Logger(wakuLP.log, params.pubsubTopic).With(zap.Stringers("peerIDs", params.selectedPeers))
logger.Debug("publishing message") logger.Debug("publishing message")
var wg sync.WaitGroup
response, err := wakuLP.request(ctx, req, params) var responses []*pb.PushResponse
if err != nil { for _, peerID := range params.selectedPeers {
logger.Error("could not publish message", zap.Error(err)) wg.Add(1)
return wpb.MessageHash{}, err go func(id peer.ID) {
defer wg.Done()
response, err := wakuLP.request(ctx, req, params, id)
if err != nil {
logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id))
}
responses = append(responses, response)
}(peerID)
} }
wg.Wait()
if response.IsSuccess { var successCount int
hash := message.Hash(params.pubsubTopic)
utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]))
return hash, nil
}
errMsg := "lightpush error" errMsg := "lightpush error"
if response.Info != nil {
errMsg = *response.Info for _, response := range responses {
if response.GetIsSuccess() {
successCount++
} else {
if response.GetInfo() != "" {
errMsg += *response.Info
}
}
}
//in case of partial failure, should we retry here or build a layer above that takes care of these things?
if successCount > 0 {
hash := message.Hash(params.pubsubTopic)
utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]), zap.Int("num-peers", len(responses)))
return hash, nil
} }
return wpb.MessageHash{}, errors.New(errMsg) return wpb.MessageHash{}, errors.New(errMsg)

View File

@ -29,7 +29,8 @@ func WithRateLimiter(r rate.Limit, b int) Option {
type lightPushRequestParameters struct { type lightPushRequestParameters struct {
host host.Host host host.Host
peerAddr multiaddr.Multiaddr peerAddr multiaddr.Multiaddr
selectedPeer peer.ID selectedPeers peer.IDSlice
maxPeers int
peerSelectionType peermanager.PeerSelection peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice preferredPeers peer.IDSlice
requestID []byte requestID []byte
@ -41,10 +42,17 @@ type lightPushRequestParameters struct {
// RequestOption is the type of options accepted when performing LightPush protocol requests // RequestOption is the type of options accepted when performing LightPush protocol requests
type RequestOption func(*lightPushRequestParameters) error type RequestOption func(*lightPushRequestParameters) error
func WithMaxPeers(num int) RequestOption {
return func(params *lightPushRequestParameters) error {
params.maxPeers = num
return nil
}
}
// WithPeer is an option used to specify the peerID to push a waku message to // WithPeer is an option used to specify the peerID to push a waku message to
func WithPeer(p peer.ID) RequestOption { func WithPeer(p peer.ID) RequestOption {
return func(params *lightPushRequestParameters) error { return func(params *lightPushRequestParameters) error {
params.selectedPeer = p params.selectedPeers = append(params.selectedPeers, p)
if params.peerAddr != nil { if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive") return errors.New("peerAddr and peerId options are mutually exclusive")
} }
@ -58,7 +66,7 @@ func WithPeer(p peer.ID) RequestOption {
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
return func(params *lightPushRequestParameters) error { return func(params *lightPushRequestParameters) error {
params.peerAddr = pAddr params.peerAddr = pAddr
if params.selectedPeer != "" { if len(params.selectedPeers) != 0 {
return errors.New("peerAddr and peerId options are mutually exclusive") return errors.New("peerAddr and peerId options are mutually exclusive")
} }
return nil return nil
@ -127,5 +135,6 @@ func DefaultOptions(host host.Host) []RequestOption {
return []RequestOption{ return []RequestOption{
WithAutomaticRequestID(), WithAutomaticRequestID(),
WithAutomaticPeerSelection(), WithAutomaticPeerSelection(),
WithMaxPeers(1), //keeping default as 2 for status use-case
} }
} }

2
vendor/modules.txt vendored
View File

@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 # github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec
## explicit; go 1.21 ## explicit; go 1.21
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/tests

View File

@ -91,6 +91,7 @@ const messageExpiredPerid = 10 // in seconds
const maxRelayPeers = 300 const maxRelayPeers = 300
const randomPeersKeepAliveInterval = 5 * time.Second const randomPeersKeepAliveInterval = 5 * time.Second
const allPeersKeepAliveInterval = 5 * time.Minute const allPeersKeepAliveInterval = 5 * time.Minute
const PeersToPublishForLightpush = 2
type SentEnvelope struct { type SentEnvelope struct {
Envelope *protocol.Envelope Envelope *protocol.Envelope
@ -1009,7 +1010,7 @@ func (w *Waku) broadcast() {
publishMethod = LightPush publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error { fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush") logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic())) _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(PeersToPublishForLightpush))
return err return err
} }
} else { } else {