From 36bd8d0663848d4165b13f80a20bb117d0519b09 Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Thu, 11 Jul 2024 15:15:44 +0530 Subject: [PATCH] feat: support for lightpush to use more than 1 peer --- waku/v2/protocol/lightpush/waku_lightpush.go | 67 ++++++++++++------- .../lightpush/waku_lightpush_option.go | 15 ++++- .../lightpush/waku_lightpush_option_test.go | 2 +- .../protocol/lightpush/waku_lightpush_test.go | 32 ++++++++- 4 files changed, 84 insertions(+), 32 deletions(-) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 19708d11..b8f3c0f3 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "math" + "sync" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -187,7 +188,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu } // request sends a message via lightPush protocol to either a specified peer or peer that is selected. -func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) { +func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) { if params == nil { return nil, errors.New("lightpush params are mandatory") } @@ -196,9 +197,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p return nil, ErrInvalidID } - logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer)) + logger := wakuLP.log.With(logging.HostID("peer", peer)) - stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1) + stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1) if err != nil { logger.Error("creating stream to peer", zap.Error(err)) wakuLP.metrics.RecordError(dialFailure) @@ -281,10 +282,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe return nil, err } wakuLP.pm.Connect(pData) - params.selectedPeer = pData.AddrInfo.ID + params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID) } - - if params.pm != nil && params.selectedPeer == "" { + reqPeerCount := params.maxPeers - len(params.selectedPeers) + if params.pm != nil && reqPeerCount > 0 { var selectedPeers peer.IDSlice //TODO: update this to work with multiple peer selection selectedPeers, err = wakuLP.pm.SelectPeers( @@ -293,17 +294,17 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe Proto: LightPushID_v20beta1, PubsubTopics: []string{params.pubsubTopic}, SpecificPeers: params.preferredPeers, + MaxPeers: reqPeerCount, Ctx: ctx, }, ) if err == nil { - params.selectedPeer = selectedPeers[0] + params.selectedPeers = append(params.selectedPeers, selectedPeers...) } - } - if params.selectedPeer == "" { + if len(params.selectedPeers) == 0 { if err != nil { - params.log.Error("selecting peer", zap.Error(err)) + params.log.Error("selecting peers", zap.Error(err)) wakuLP.metrics.RecordError(peerNotFoundFailure) return nil, ErrNoPeersAvailable } @@ -327,25 +328,41 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa req.Message = message req.PubsubTopic = params.pubsubTopic - logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer)) + logger := message.Logger(wakuLP.log, params.pubsubTopic).With(zap.Stringers("peerIDs", params.selectedPeers)) logger.Debug("publishing message") - - response, err := wakuLP.request(ctx, req, params) - if err != nil { - logger.Error("could not publish message", zap.Error(err)) - return wpb.MessageHash{}, err + var wg sync.WaitGroup + var responses []*pb.PushResponse + for _, peerID := range params.selectedPeers { + wg.Add(1) + go func(id peer.ID) { + defer wg.Done() + response, err := wakuLP.request(ctx, req, params, id) + if err != nil { + logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id)) + } + responses = append(responses, response) + }(peerID) } - - if response.IsSuccess { - hash := message.Hash(params.pubsubTopic) - utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:])) - return hash, nil - } - + wg.Wait() + var successCount int errMsg := "lightpush error" - if response.Info != nil { - errMsg = *response.Info + + for _, response := range responses { + if response.GetIsSuccess() { + successCount++ + } else { + if response.GetInfo() != "" { + errMsg += *response.Info + } + } + } + + //in case of partial failure, should we retry here or build a layer above that takes care of these things? + if successCount > 0 { + hash := message.Hash(params.pubsubTopic) + utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]), zap.Int("num-peers", len(responses))) + return hash, nil } return wpb.MessageHash{}, errors.New(errMsg) diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 6ec25899..b192fd17 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -29,7 +29,8 @@ func WithRateLimiter(r rate.Limit, b int) Option { type lightPushRequestParameters struct { host host.Host peerAddr multiaddr.Multiaddr - selectedPeer peer.ID + selectedPeers peer.IDSlice + maxPeers int peerSelectionType peermanager.PeerSelection preferredPeers peer.IDSlice requestID []byte @@ -41,10 +42,17 @@ type lightPushRequestParameters struct { // RequestOption is the type of options accepted when performing LightPush protocol requests type RequestOption func(*lightPushRequestParameters) error +func WithMaxPeers(num int) RequestOption { + return func(params *lightPushRequestParameters) error { + params.maxPeers = num + return nil + } +} + // WithPeer is an option used to specify the peerID to push a waku message to func WithPeer(p peer.ID) RequestOption { return func(params *lightPushRequestParameters) error { - params.selectedPeer = p + params.selectedPeers = append(params.selectedPeers, p) if params.peerAddr != nil { return errors.New("peerAddr and peerId options are mutually exclusive") } @@ -58,7 +66,7 @@ func WithPeer(p peer.ID) RequestOption { func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption { return func(params *lightPushRequestParameters) error { params.peerAddr = pAddr - if params.selectedPeer != "" { + if len(params.selectedPeers) != 0 { return errors.New("peerAddr and peerId options are mutually exclusive") } return nil @@ -127,5 +135,6 @@ func DefaultOptions(host host.Host) []RequestOption { return []RequestOption{ WithAutomaticRequestID(), WithAutomaticPeerSelection(), + WithMaxPeers(1), //keeping default as 2 for status use-case } } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go index 6998b1fa..94d2bea9 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option_test.go @@ -36,7 +36,7 @@ func TestLightPushOption(t *testing.T) { } require.Equal(t, host, params.host) - require.NotNil(t, params.selectedPeer) + require.NotEqual(t, 0, len(params.selectedPeers)) require.NotNil(t, params.requestID) maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy") diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 99525b1e..8f8a55c4 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -43,15 +43,19 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su // Node1: Relay // Node2: Relay+Lightpush +// Node3: Relay+Lightpush + // Client that will lightpush a message // // Node1 and Node 2 are peers +// Node1 and Node 3 are peers // Client and Node 2 are peers -// Client will use lightpush request, sending the message to Node2 +// Client and Node 3 are peers +// Client will use lightpush request, sending the message to Node2 and Node3 // // Client send a successful message using lightpush -// Node2 receive the message and broadcast it -// Node1 receive the message +// Node2, Node3 receive the message and broadcast it +// Node1 receive the messages func TestWakuLightPush(t *testing.T) { testTopic := "/waku/2/go/lightpush/test" node1, sub1, host1 := makeWakuRelay(t, testTopic) @@ -69,6 +73,16 @@ func TestWakuLightPush(t *testing.T) { require.NoError(t, err) defer lightPushNode2.Stop() + node3, sub3, host3 := makeWakuRelay(t, testTopic) + defer node3.Stop() + defer sub3.Unsubscribe() + + lightPushNode3 := NewWakuLightPush(node3, nil, prometheus.DefaultRegisterer, utils.Logger()) + lightPushNode3.SetHost(host3) + err = lightPushNode3.Start(ctx) + require.NoError(t, err) + defer lightPushNode3.Stop() + port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -84,10 +98,21 @@ func TestWakuLightPush(t *testing.T) { err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID())) require.NoError(t, err) + host3.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL) + err = host3.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200) + require.NoError(t, err) + + err = host3.Connect(ctx, host3.Peerstore().PeerInfo(host1.ID())) + require.NoError(t, err) + clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL) err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) require.NoError(t, err) + clientHost.Peerstore().AddAddr(host3.ID(), tests.GetHostAddress(host3), peerstore.PermanentAddrTTL) + err = clientHost.Peerstore().AddProtocols(host3.ID(), LightPushID_v20beta1) + require.NoError(t, err) + msg2 := tests.CreateWakuMessage("test2", utils.GetUnixEpoch()) // Wait for the mesh connection to happen between node1 and node2 @@ -109,6 +134,7 @@ func TestWakuLightPush(t *testing.T) { var lpOptions []RequestOption lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) lpOptions = append(lpOptions, WithPeer(host2.ID())) + lpOptions = append(lpOptions, WithMaxPeers(2)) // Checking that msg hash is correct hash, err := client.Publish(ctx, msg2, lpOptions...)