mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-08 08:53:12 +00:00
feat: add option to specify peerAddr for filter subscribe
This commit is contained in:
parent
9764801702
commit
446d1e5d1c
@ -18,6 +18,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
@ -26,6 +27,7 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/service"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
@ -272,6 +274,52 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
|
||||
return nil
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context, contentFilter protocol.ContentFilter, opts []FilterSubscribeOption) (*FilterSubscribeParameters, map[string][]string, error) {
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.log = wf.log
|
||||
params.host = wf.h
|
||||
params.pm = wf.pm
|
||||
|
||||
optList := DefaultSubscriptionOptions()
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
err := opt(params)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
//Add Peer to peerstore.
|
||||
if params.pm != nil && params.peerAddr != nil {
|
||||
pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
wf.pm.Connect(pData)
|
||||
params.selectedPeer = pData.AddrInfo.ID
|
||||
}
|
||||
if params.pm != nil && params.selectedPeer == "" {
|
||||
params.selectedPeer, err = wf.pm.SelectPeer(
|
||||
peermanager.PeerSelectionCriteria{
|
||||
SelectionType: params.peerSelectionType,
|
||||
Proto: FilterSubscribeID_v20beta1,
|
||||
PubsubTopics: maps.Keys(pubSubTopicMap),
|
||||
SpecificPeers: params.preferredPeers,
|
||||
Ctx: ctx,
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
return params, pubSubTopicMap, nil
|
||||
}
|
||||
|
||||
// Subscribe setups a subscription to receive messages that match a specific content filter
|
||||
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
|
||||
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
|
||||
@ -283,30 +331,15 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter prot
|
||||
return nil, err
|
||||
}
|
||||
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.log = wf.log
|
||||
params.host = wf.h
|
||||
params.pm = wf.pm
|
||||
|
||||
optList := DefaultSubscriptionOptions()
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
err := opt(params)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
pubSubTopicMap, err := protocol.ContentFilterToPubSubTopicMap(contentFilter)
|
||||
|
||||
params, pubSubTopicMap, err := wf.handleFilterSubscribeOptions(ctx, contentFilter, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
failedContentTopics := []string{}
|
||||
subscriptions := make([]*subscription.SubscriptionDetails, 0)
|
||||
for pubSubTopic, cTopics := range pubSubTopicMap {
|
||||
var selectedPeer peer.ID
|
||||
//TO Optimize: find a peer with all pubSubTopics in the list if possible, if not only then look for single pubSubTopic
|
||||
if params.pm != nil && params.selectedPeer == "" {
|
||||
selectedPeer, err = wf.pm.SelectPeer(
|
||||
peermanager.PeerSelectionCriteria{
|
||||
|
||||
@ -1,11 +1,13 @@
|
||||
package filter
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"go.uber.org/zap"
|
||||
@ -34,6 +36,7 @@ func WithPingRequestId(requestId []byte) FilterPingOption {
|
||||
type (
|
||||
FilterSubscribeParameters struct {
|
||||
selectedPeer peer.ID
|
||||
peerAddr multiaddr.Multiaddr
|
||||
peerSelectionType peermanager.PeerSelection
|
||||
preferredPeers peer.IDSlice
|
||||
requestID []byte
|
||||
@ -65,9 +68,27 @@ func WithTimeout(timeout time.Duration) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// WithPeer is an option used to specify the peerID to request the message history.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.selectedPeer = p
|
||||
if params.peerAddr != nil {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
||||
// This new peer will be added to peerStore.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) error {
|
||||
params.peerAddr = pAddr
|
||||
if params.selectedPeer != "" {
|
||||
return errors.New("peerAddr and peerId options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"crypto/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
@ -52,4 +53,26 @@ func TestFilterOption(t *testing.T) {
|
||||
require.NotNil(t, params2.selectedPeer)
|
||||
require.True(t, params2.unsubscribeAll)
|
||||
|
||||
// Mutually Exclusive options
|
||||
maddr, err := multiaddr.NewMultiaddr("/ip4/127.0.0.1/tcp/12345/p2p/16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy")
|
||||
require.NoError(t, err)
|
||||
options3 := []FilterSubscribeOption{
|
||||
WithPeer("16Uiu2HAm8KUwGRruseAaEGD6xGg6XKrDo8Py5dwDoL9wUpCxawGy"),
|
||||
WithPeerAddr(maddr),
|
||||
}
|
||||
|
||||
params3 := new(FilterSubscribeParameters)
|
||||
|
||||
for idx, opt := range options3 {
|
||||
err := opt(params3)
|
||||
if idx == 0 {
|
||||
require.NoError(t, err)
|
||||
} else {
|
||||
require.Error(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
require.NotNil(t, params2.selectedPeer)
|
||||
require.True(t, params2.unsubscribeAll)
|
||||
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user