diff --git a/go.mod b/go.mod index 511015217..3fcbf82e6 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 + github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 023138815..4966eb9d4 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 h1:0YV8dN8qdNV7MwcMB2s9ky826Tv5KAK3Untabf4RIAU= -github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= +github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec h1:3e/uM2R0uRVTASzQrv/9NtdGL24QBd/Fi9aq7ygWS4M= +github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go index 19708d116..b8f3c0f39 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math" + "sync" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -187,7 +188,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu } // request sends a message via lightPush protocol to either a specified peer or peer that is selected. -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) { if params == nil { return nil, errors.New("lightpush params are mandatory") } @@ -196,9 +197,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p return nil, ErrInvalidID } - logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) + logger := wakuLP.log.With(logging.HostID("peer", peer)) - stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) + stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) @@ -281,10 +282,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe return nil, err } wakuLP.pm.Connect(pData) - params.selectedPeer = pData.AddrInfo.ID + params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } - - if params.pm != nil && params.selectedPeer == "" { + reqPeerCount := params.maxPeers - len(params.selectedPeers) + if params.pm != nil && reqPeerCount > 0 { var selectedPeers peer.IDSlice //TODO: update this to work with multiple peer selection selectedPeers, err = wakuLP.pm.SelectPeers( @@ -293,17 +294,17 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe Proto: LightPushID_v20beta1, PubsubTopics: []string{params.pubsubTopic}, SpecificPeers: params.preferredPeers, + MaxPeers: reqPeerCount, Ctx: ctx, }, ) if err == nil { - params.selectedPeer = selectedPeers[0] + params.selectedPeers = append(params.selectedPeers, selectedPeers...) } - } - if params.selectedPeer == "" { + if len(params.selectedPeers) == 0 { if err != nil { - params.log.Error("selecting peer", zap.Error(err)) + params.log.Error("selecting peers", zap.Error(err)) wakuLP.metrics.RecordError(peerNotFoundFailure) return nil, ErrNoPeersAvailable } @@ -327,25 +328,41 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa req.Message = message req.PubsubTopic = params.pubsubTopic - logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer)) + logger := message.Logger(wakuLP.log, params.pubsubTopic).With(zap.Stringers("peerIDs", params.selectedPeers)) logger.Debug("publishing message") - - response, err := wakuLP.request(ctx, req, params) - if err != nil { - logger.Error("could not publish message", zap.Error(err)) - return wpb.MessageHash{}, err + var wg sync.WaitGroup + var responses []*pb.PushResponse + for _, peerID := range params.selectedPeers { + wg.Add(1) + go func(id peer.ID) { + defer wg.Done() + response, err := wakuLP.request(ctx, req, params, id) + if err != nil { + logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id)) + } + responses = append(responses, response) + }(peerID) } - - if response.IsSuccess { - hash := message.Hash(params.pubsubTopic) - utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:])) - return hash, nil - } - + wg.Wait() + var successCount int errMsg := "lightpush error" - if response.Info != nil { - errMsg = *response.Info + + for _, response := range responses { + if response.GetIsSuccess() { + successCount++ + } else { + if response.GetInfo() != "" { + errMsg += *response.Info + } + } + } + + //in case of partial failure, should we retry here or build a layer above that takes care of these things? + if successCount > 0 { + hash := message.Hash(params.pubsubTopic) + utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]), zap.Int("num-peers", len(responses))) + return hash, nil } return wpb.MessageHash{}, errors.New(errMsg) diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go index 6ec258990..b192fd17a 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -29,7 +29,8 @@ func WithRateLimiter(r rate.Limit, b int) Option { type lightPushRequestParameters struct { host host.Host peerAddr multiaddr.Multiaddr - selectedPeer peer.ID + selectedPeers peer.IDSlice + maxPeers int peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte @@ -41,10 +42,17 @@ type lightPushRequestParameters struct { // RequestOption is the type of options accepted when performing LightPush protocol requests type RequestOption func(*lightPushRequestParameters) error +func WithMaxPeers(num int) RequestOption { + return func(params *lightPushRequestParameters) error { + params.maxPeers = num + return nil + } +} + // WithPeer is an option used to specify the peerID to push a waku message to func WithPeer(p peer.ID) RequestOption { return func(params *lightPushRequestParameters) error { - params.selectedPeer = p + params.selectedPeers = append(params.selectedPeers, p) if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") } @@ -58,7 +66,7 @@ func WithPeer(p peer.ID) RequestOption { func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { return func(params *lightPushRequestParameters) error { params.peerAddr = pAddr - if params.selectedPeer != "" { + if len(params.selectedPeers) != 0 { return errors.New("peerAddr and peerId options are mutually exclusive") } return nil @@ -127,5 +135,6 @@ func DefaultOptions(host host.Host) []RequestOption { return []RequestOption{ WithAutomaticRequestID(), WithAutomaticPeerSelection(), + WithMaxPeers(1), //keeping default as 2 for status use-case } } diff --git a/vendor/modules.txt b/vendor/modules.txt index f27ee62cd..360e3854f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240711160252-9412af28dd81 +# github.com/waku-org/go-waku v0.8.1-0.20240712035823-bb74e39ed9ec ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 35a80208f..10ad362ea 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -91,6 +91,7 @@ const messageExpiredPerid = 10 // in seconds const maxRelayPeers = 300 const randomPeersKeepAliveInterval = 5 * time.Second const allPeersKeepAliveInterval = 5 * time.Minute +const PeersToPublishForLightpush = 2 type SentEnvelope struct { Envelope *protocol.Envelope @@ -1009,7 +1010,7 @@ func (w *Waku) broadcast() { publishMethod = LightPush fn = func(env *protocol.Envelope, logger *zap.Logger) error { logger.Info("publishing message via lightpush") - _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic())) + _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(PeersToPublishForLightpush)) return err } } else {