feat: support for lightpush to use more than 1 peer

This commit is contained in:
Prem Chaitanya Prathi 2024-07-11 15:15:44 +05:30
parent c2366e7c47
commit 36bd8d0663
4 changed files with 84 additions and 32 deletions

View File

@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
"sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
@ -187,7 +188,7 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu
}
// request sends a message via lightPush protocol to either a specified peer or peer that is selected.
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters) (*pb.PushResponse, error) {
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) {
if params == nil {
return nil, errors.New("lightpush params are mandatory")
}
@ -196,9 +197,9 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, p
return nil, ErrInvalidID
}
logger := wakuLP.log.With(logging.HostID("peer", params.selectedPeer))
logger := wakuLP.log.With(logging.HostID("peer", peer))
stream, err := wakuLP.h.NewStream(ctx, params.selectedPeer, LightPushID_v20beta1)
stream, err := wakuLP.h.NewStream(ctx, peer, LightPushID_v20beta1)
if err != nil {
logger.Error("creating stream to peer", zap.Error(err))
wakuLP.metrics.RecordError(dialFailure)
@ -281,10 +282,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
return nil, err
}
wakuLP.pm.Connect(pData)
params.selectedPeer = pData.AddrInfo.ID
params.selectedPeers = append(params.selectedPeers, pData.AddrInfo.ID)
}
if params.pm != nil && params.selectedPeer == "" {
reqPeerCount := params.maxPeers - len(params.selectedPeers)
if params.pm != nil && reqPeerCount > 0 {
var selectedPeers peer.IDSlice
//TODO: update this to work with multiple peer selection
selectedPeers, err = wakuLP.pm.SelectPeers(
@ -293,17 +294,17 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
Proto: LightPushID_v20beta1,
PubsubTopics: []string{params.pubsubTopic},
SpecificPeers: params.preferredPeers,
MaxPeers: reqPeerCount,
Ctx: ctx,
},
)
if err == nil {
params.selectedPeer = selectedPeers[0]
params.selectedPeers = append(params.selectedPeers, selectedPeers...)
}
}
if params.selectedPeer == "" {
if len(params.selectedPeers) == 0 {
if err != nil {
params.log.Error("selecting peer", zap.Error(err))
params.log.Error("selecting peers", zap.Error(err))
wakuLP.metrics.RecordError(peerNotFoundFailure)
return nil, ErrNoPeersAvailable
}
@ -327,25 +328,41 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
req.Message = message
req.PubsubTopic = params.pubsubTopic
logger := message.Logger(wakuLP.log, params.pubsubTopic).With(logging.HostID("peerID", params.selectedPeer))
logger := message.Logger(wakuLP.log, params.pubsubTopic).With(zap.Stringers("peerIDs", params.selectedPeers))
logger.Debug("publishing message")
response, err := wakuLP.request(ctx, req, params)
if err != nil {
logger.Error("could not publish message", zap.Error(err))
return wpb.MessageHash{}, err
var wg sync.WaitGroup
var responses []*pb.PushResponse
for _, peerID := range params.selectedPeers {
wg.Add(1)
go func(id peer.ID) {
defer wg.Done()
response, err := wakuLP.request(ctx, req, params, id)
if err != nil {
logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id))
}
responses = append(responses, response)
}(peerID)
}
if response.IsSuccess {
hash := message.Hash(params.pubsubTopic)
utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]))
return hash, nil
}
wg.Wait()
var successCount int
errMsg := "lightpush error"
if response.Info != nil {
errMsg = *response.Info
for _, response := range responses {
if response.GetIsSuccess() {
successCount++
} else {
if response.GetInfo() != "" {
errMsg += *response.Info
}
}
}
//in case of partial failure, should we retry here or build a layer above that takes care of these things?
if successCount > 0 {
hash := message.Hash(params.pubsubTopic)
utils.MessagesLogger("lightpush").Debug("waku.lightpush published", logging.HexBytes("hash", hash[:]), zap.Int("num-peers", len(responses)))
return hash, nil
}
return wpb.MessageHash{}, errors.New(errMsg)

View File

@ -29,7 +29,8 @@ func WithRateLimiter(r rate.Limit, b int) Option {
type lightPushRequestParameters struct {
host host.Host
peerAddr multiaddr.Multiaddr
selectedPeer peer.ID
selectedPeers peer.IDSlice
maxPeers int
peerSelectionType peermanager.PeerSelection
preferredPeers peer.IDSlice
requestID []byte
@ -41,10 +42,17 @@ type lightPushRequestParameters struct {
// RequestOption is the type of options accepted when performing LightPush protocol requests
type RequestOption func(*lightPushRequestParameters) error
func WithMaxPeers(num int) RequestOption {
return func(params *lightPushRequestParameters) error {
params.maxPeers = num
return nil
}
}
// WithPeer is an option used to specify the peerID to push a waku message to
func WithPeer(p peer.ID) RequestOption {
return func(params *lightPushRequestParameters) error {
params.selectedPeer = p
params.selectedPeers = append(params.selectedPeers, p)
if params.peerAddr != nil {
return errors.New("peerAddr and peerId options are mutually exclusive")
}
@ -58,7 +66,7 @@ func WithPeer(p peer.ID) RequestOption {
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
return func(params *lightPushRequestParameters) error {
params.peerAddr = pAddr
if params.selectedPeer != "" {
if len(params.selectedPeers) != 0 {
return errors.New("peerAddr and peerId options are mutually exclusive")
}
return nil
@ -127,5 +135,6 @@ func DefaultOptions(host host.Host) []RequestOption {
return []RequestOption{
WithAutomaticRequestID(),
WithAutomaticPeerSelection(),
WithMaxPeers(1), //keeping default as 2 for status use-case
}
}

View File

@ -36,7 +36,7 @@ func TestLightPushOption(t *testing.T) {
}
require.Equal(t, host, params.host)
require.NotNil(t, params.selectedPeer)
require.NotEqual(t, 0, len(params.selectedPeers))
require.NotNil(t, params.requestID)
maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy")

View File

@ -43,15 +43,19 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su
// Node1: Relay
// Node2: Relay+Lightpush
// Node3: Relay+Lightpush
// Client that will lightpush a message
//
// Node1 and Node 2 are peers
// Node1 and Node 3 are peers
// Client and Node 2 are peers
// Client will use lightpush request, sending the message to Node2
// Client and Node 3 are peers
// Client will use lightpush request, sending the message to Node2 and Node3
//
// Client send a successful message using lightpush
// Node2 receive the message and broadcast it
// Node1 receive the message
// Node2, Node3 receive the message and broadcast it
// Node1 receive the messages
func TestWakuLightPush(t *testing.T) {
testTopic := "/waku/2/go/lightpush/test"
node1, sub1, host1 := makeWakuRelay(t, testTopic)
@ -69,6 +73,16 @@ func TestWakuLightPush(t *testing.T) {
require.NoError(t, err)
defer lightPushNode2.Stop()
node3, sub3, host3 := makeWakuRelay(t, testTopic)
defer node3.Stop()
defer sub3.Unsubscribe()
lightPushNode3 := NewWakuLightPush(node3, nil, prometheus.DefaultRegisterer, utils.Logger())
lightPushNode3.SetHost(host3)
err = lightPushNode3.Start(ctx)
require.NoError(t, err)
defer lightPushNode3.Stop()
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
@ -84,10 +98,21 @@ func TestWakuLightPush(t *testing.T) {
err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID()))
require.NoError(t, err)
host3.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host3.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200)
require.NoError(t, err)
err = host3.Connect(ctx, host3.Peerstore().PeerInfo(host1.ID()))
require.NoError(t, err)
clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1)
require.NoError(t, err)
clientHost.Peerstore().AddAddr(host3.ID(), tests.GetHostAddress(host3), peerstore.PermanentAddrTTL)
err = clientHost.Peerstore().AddProtocols(host3.ID(), LightPushID_v20beta1)
require.NoError(t, err)
msg2 := tests.CreateWakuMessage("test2", utils.GetUnixEpoch())
// Wait for the mesh connection to happen between node1 and node2
@ -109,6 +134,7 @@ func TestWakuLightPush(t *testing.T) {
var lpOptions []RequestOption
lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
lpOptions = append(lpOptions, WithPeer(host2.ID()))
lpOptions = append(lpOptions, WithMaxPeers(2))
// Checking that msg hash is correct
hash, err := client.Publish(ctx, msg2, lpOptions...)