mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-08 08:53:12 +00:00
feat: add option to specify peerAddr for lightpush
This commit is contained in:
parent
446d1e5d1c
commit
04fe20a60c
@ -80,7 +80,7 @@ func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||
}
|
||||
}
|
||||
|
||||
// WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
||||
// WithPeerAddr is an option used to specify a peerAddress.
|
||||
// This new peer will be added to peerStore.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption {
|
||||
|
||||
@ -30,7 +30,8 @@ func TestFilterOption(t *testing.T) {
|
||||
params.log = utils.Logger()
|
||||
|
||||
for _, opt := range options {
|
||||
_ = opt(params)
|
||||
err = opt(params)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, host, params.host)
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
|
||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
@ -239,7 +240,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
|
||||
|
||||
optList := append(DefaultOptions(wakuLP.h), opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
err := opt(params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if params.pubsubTopic == "" {
|
||||
@ -249,6 +253,15 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
|
||||
}
|
||||
}
|
||||
|
||||
if params.pm != nil && params.peerAddr != nil {
|
||||
pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wakuLP.pm.Connect(pData)
|
||||
params.selectedPeer = pData.AddrInfo.ID
|
||||
}
|
||||
|
||||
if params.pm != nil && params.selectedPeer == "" {
|
||||
params.selectedPeer, err = wakuLP.pm.SelectPeer(
|
||||
peermanager.PeerSelectionCriteria{
|
||||
|
||||
@ -1,8 +1,11 @@
|
||||
package lightpush
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
@ -11,6 +14,7 @@ import (
|
||||
|
||||
type lightPushParameters struct {
|
||||
host host.Host
|
||||
peerAddr multiaddr.Multiaddr
|
||||
selectedPeer peer.ID
|
||||
peerSelectionType peermanager.PeerSelection
|
||||
preferredPeers peer.IDSlice
|
||||
@ -21,12 +25,29 @@ type lightPushParameters struct {
|
||||
}
|
||||
|
||||
// Option is the type of options accepted when performing LightPush protocol requests
|
||||
type Option func(*lightPushParameters)
|
||||
type Option func(*lightPushParameters) error
|
||||
|
||||
// WithPeer is an option used to specify the peerID to push a waku message to
|
||||
func WithPeer(p peer.ID) Option {
|
||||
return func(params *lightPushParameters) {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.selectedPeer = p
|
||||
if params.peerAddr != nil {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPeerAddr is an option used to specify a peerAddress
|
||||
// This new peer will be added to peerStore.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) Option {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.peerAddr = pAddr
|
||||
if params.selectedPeer != "" {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,9 +56,10 @@ func WithPeer(p peer.ID) Option {
|
||||
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
|
||||
// from the node peerstore
|
||||
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
|
||||
return func(params *lightPushParameters) {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.peerSelectionType = peermanager.Automatic
|
||||
params.preferredPeers = fromThesePeers
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -46,38 +68,43 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
|
||||
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
|
||||
// from the node peerstore
|
||||
func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option {
|
||||
return func(params *lightPushParameters) {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.peerSelectionType = peermanager.LowestRTT
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted
|
||||
func WithPubSubTopic(pubsubTopic string) Option {
|
||||
return func(params *lightPushParameters) {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.pubsubTopic = pubsubTopic
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic
|
||||
func WithDefaultPubsubTopic() Option {
|
||||
return func(params *lightPushParameters) {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.pubsubTopic = relay.DefaultWakuTopic
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithRequestID is an option to set a specific request ID to be used when
|
||||
// publishing a message
|
||||
func WithRequestID(requestID []byte) Option {
|
||||
return func(params *lightPushParameters) {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.requestID = requestID
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithAutomaticRequestID is an option to automatically generate a request ID
|
||||
// when publishing a message
|
||||
func WithAutomaticRequestID() Option {
|
||||
return func(params *lightPushParameters) {
|
||||
return func(params *lightPushParameters) error {
|
||||
params.requestID = protocol.GenerateRequestID()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
@ -30,10 +31,28 @@ func TestLightPushOption(t *testing.T) {
|
||||
params.log = utils.Logger()
|
||||
|
||||
for _, opt := range options {
|
||||
opt(params)
|
||||
err := opt(params)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
require.Equal(t, host, params.host)
|
||||
require.NotNil(t, params.selectedPeer)
|
||||
require.NotNil(t, params.requestID)
|
||||
|
||||
maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy")
|
||||
require.NoError(t, err)
|
||||
|
||||
options = []Option{
|
||||
WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"),
|
||||
WithPeerAddr(maddr),
|
||||
}
|
||||
|
||||
for idx, opt := range options {
|
||||
err = opt(params)
|
||||
if idx == 0 {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.Error(t, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user