test: filterv2

This commit is contained in:
Richard Ramos 2023-02-15 14:43:51 -04:00 committed by RichΛrd
parent 600a8f1c8d
commit 335f7b6771
8 changed files with 336 additions and 64 deletions

View File

@ -162,9 +162,10 @@ func Execute(options Options) {
if options.Filter.Enable {
if options.Filter.UseV2 {
if !options.Filter.DisableFullNode {
nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode())
} else {
nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filter.WithTimeout(options.Filter.Timeout)))
}
nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode())
} else {
nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout)))
}

View File

@ -211,8 +211,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Full = filterv2.NewWakuFilter(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Light = filterv2.NewWakuFilterPush(w.host, w.bcaster, w.timesource, w.log)
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
if w.opts.enableSwap {
@ -240,8 +240,6 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err
}
w.enrChangeCh = make(chan struct{}, 10)
if params.connStatusC != nil {
w.connStatusChan = params.connStatusC
}
@ -293,6 +291,8 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.connectionNotif = NewConnectionNotifier(ctx, w.host, w.log)
w.host.Network().Notify(w.connectionNotif)
w.enrChangeCh = make(chan struct{}, 10)
w.wg.Add(3)
go w.connectednessListener(ctx)
go w.watchMultiaddressChanges(ctx)
@ -513,8 +513,8 @@ func (w *WakuNode) Filter() *filter.WakuFilter {
}
// FilterV2 is used to access any operation related to Waku Filter protocol
func (w *WakuNode) FilterV2() *filterv2.WakuFilterPush {
if result, ok := w.filterV2Light.(*filterv2.WakuFilterPush); ok {
func (w *WakuNode) FilterV2() *filterv2.WakuFilterLightnode {
if result, ok := w.filterV2Light.(*filterv2.WakuFilterLightnode); ok {
return result
}
return nil

View File

@ -32,7 +32,7 @@ var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
)
type WakuFilterPush struct {
type WakuFilterLightnode struct {
cancel context.CancelFunc
ctx context.Context
h host.Host
@ -54,9 +54,9 @@ type WakuFilterPushResult struct {
}
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterPush {
wf := new(WakuFilterPush)
wf.log = log.Named("filter")
func NewWakuFilterLightnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
wf := new(WakuFilterLightnode)
wf.log = log.Named("filterv2-lightnode")
wf.broadcaster = broadcaster
wf.timesource = timesource
wf.wg = &sync.WaitGroup{}
@ -65,7 +65,7 @@ func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource ti
return wf
}
func (wf *WakuFilterPush) Start(ctx context.Context) error {
func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
@ -81,8 +81,7 @@ func (wf *WakuFilterPush) Start(ctx context.Context) error {
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx))
wf.wg.Add(1)
// wf.wg.Add(1)
// TODO: go wf.keepAliveSubscriptions(ctx)
wf.log.Info("filter protocol (light) started")
@ -91,7 +90,7 @@ func (wf *WakuFilterPush) Start(ctx context.Context) error {
}
// Stop unmounts the filter protocol
func (wf *WakuFilterPush) Stop() {
func (wf *WakuFilterLightnode) Stop() {
if wf.cancel == nil {
return
}
@ -100,14 +99,14 @@ func (wf *WakuFilterPush) Stop() {
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
wf.UnsubscribeAll(wf.ctx)
_, _ = wf.UnsubscribeAll(wf.ctx)
wf.subscriptions.Clear()
wf.wg.Wait()
}
func (wf *WakuFilterPush) onRequest(ctx context.Context) func(s network.Stream) {
func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
@ -127,7 +126,7 @@ func (wf *WakuFilterPush) onRequest(ctx context.Context) func(s network.Stream)
}
}
func (wf *WakuFilterPush) notify(remotePeerID peer.ID, pubsubTopic string, msg *pb.WakuMessage) {
func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, msg *pb.WakuMessage) {
envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic)
// Broadcasting message so it's stored
@ -137,7 +136,7 @@ func (wf *WakuFilterPush) notify(remotePeerID peer.ID, pubsubTopic string, msg *
wf.subscriptions.Notify(remotePeerID, envelope)
}
func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
return err
@ -182,13 +181,13 @@ func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribePa
}
// Subscribe setups a subscription to receive messages that match a specific content filter
func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) error {
func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) {
if contentFilter.Topic == "" {
return errors.New("topic is required")
return nil, errors.New("topic is required")
}
if len(contentFilter.ContentTopics) == 0 {
return errors.New("at least one content topic is required")
return nil, errors.New("at least one content topic is required")
}
params := new(FilterSubscribeParameters)
@ -202,23 +201,23 @@ func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFi
}
if params.selectedPeer == "" {
return ErrNoPeersAvailable
return nil, ErrNoPeersAvailable
}
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, contentFilter)
if err != nil {
return err
return nil, err
}
return nil
return wf.FilterSubscription(params.selectedPeer, contentFilter), nil
}
// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterPush) FilterSubscription(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails {
return wf.subscriptions.NewSubscription(peerID, topic, contentTopics)
func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) *SubscriptionDetails {
return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics)
}
func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
params := new(FilterUnsubscribeParameters)
params.log = wf.log
for _, opt := range opts {
@ -233,18 +232,18 @@ func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOpti
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (error, <-chan WakuFilterPushResult) {
func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
if contentFilter.Topic == "" {
return errors.New("topic is required"), nil
return nil, errors.New("topic is required")
}
if len(contentFilter.ContentTopics) == 0 {
return errors.New("at least one content topic is required"), nil
return nil, errors.New("at least one content topic is required")
}
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return err, nil
return nil, err
}
localWg := sync.WaitGroup{}
@ -262,7 +261,7 @@ func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter Content
ctx,
&FilterSubscribeParameters{selectedPeer: peerID},
pb.FilterSubscribeRequest_UNSUBSCRIBE,
ContentFilter{})
contentFilter)
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
@ -277,11 +276,23 @@ func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter Content
localWg.Wait()
close(resultChan)
return nil, resultChan
return resultChan, nil
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
var contentTopics []string
for k := range sub.contentTopics {
contentTopics = append(contentTopics, k)
}
opts = append(opts, Peer(sub.peerID))
return wf.Unsubscribe(ctx, ContentFilter{Topic: sub.pubsubTopic, ContentTopics: contentTopics}, opts...)
}
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err

View File

@ -0,0 +1,230 @@
package filterv2
import (
"context"
"crypto/rand"
"sync"
"testing"
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
relay := relay.NewWakuRelay(host, broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
err = relay.Start(context.Background())
require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)
require.NoError(t, err)
return relay, sub, host
}
func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
filterPush := NewWakuFilterLightnode(host, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger())
err = filterPush.Start(context.Background())
require.NoError(t, err)
return filterPush, host
}
// Node1: Filter subscribed to content topic A
// Node2: Relay + Filter
//
// # Node1 and Node2 are peers
//
// Node2 send a successful message with topic A
// Node1 receive the message
//
// Node2 send a successful message with topic B
// Node1 doesn't receive the message
func TestWakuFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
testTopic := "/waku/2/go/filter/test"
testContentTopic := "TopicA"
node1, host1 := makeWakuFilterLightNode(t)
defer node1.Stop()
broadcaster := v2.NewBroadcaster(10)
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(host2, broadcaster, timesource.NewDefaultClock(), utils.Logger())
err := node2Filter.Start(ctx)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), string(FilterSubscribeID_v20beta1))
require.NoError(t, err)
contentFilter := ContentFilter{
Topic: string(testTopic),
ContentTopics: []string{testContentTopic},
}
subscriptionChannel, err := node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID()))
require.NoError(t, err)
// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
env := <-subscriptionChannel.C
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
}()
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
require.NoError(t, err)
wg.Wait()
wg.Add(1)
go func() {
select {
case <-subscriptionChannel.C:
require.Fail(t, "should not receive another message")
case <-time.After(1 * time.Second):
defer wg.Done()
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic)
require.NoError(t, err)
wg.Wait()
wg.Add(1)
go func() {
select {
case <-subscriptionChannel.C:
require.Fail(t, "should not receive another message")
case <-time.After(1 * time.Second):
defer wg.Done()
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()
_, err = node1.Unsubscribe(ctx, contentFilter, Peer(node2Filter.h.ID()))
require.NoError(t, err)
time.Sleep(1 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
require.NoError(t, err)
wg.Wait()
}
func TestWakuFilterPeerFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
testTopic := "/waku/2/go/filter/test"
testContentTopic := "TopicA"
node1, host1 := makeWakuFilterLightNode(t)
broadcaster := v2.NewBroadcaster(10)
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(host2, v2.NewBroadcaster(10), timesource.NewDefaultClock(), utils.Logger(), filter.WithTimeout(5*time.Second))
err := node2Filter.Start(ctx)
require.NoError(t, err)
broadcaster.Register(&testTopic, node2Filter.MessageChannel())
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), string(FilterPushID_v20beta1))
require.NoError(t, err)
contentFilter := &ContentFilter{
Topic: string(testTopic),
ContentTopics: []string{testContentTopic},
}
f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID()))
require.NoError(t, err)
// Simulate there's been a failure before
node2Filter.subscriptions.FlagAsFailure(host1.ID())
// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)
require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID()))
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
env := <-f.C
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
// Failure is removed
require.False(t, node2Filter.subscriptions.IsFailedPeer(host1.ID()))
}()
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
require.NoError(t, err)
wg.Wait()
// Kill the subscriber
host1.Close()
time.Sleep(1 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 1), testTopic)
require.NoError(t, err)
// TODO: find out how to eliminate this sleep
time.Sleep(1 * time.Second)
require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID()))
time.Sleep(2 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
require.NoError(t, err)
time.Sleep(2 * time.Second)
require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) // Failed peer has been removed
require.False(t, node2Filter.subscriptions.Has(host1.ID())) // Failed peer has been removed
}

View File

@ -28,7 +28,7 @@ import (
const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1")
type (
WakuFilter struct {
WakuFilterFull struct {
cancel context.CancelFunc
h host.Host
msgC chan *protocol.Envelope
@ -39,9 +39,9 @@ type (
}
)
// NewWakuFilter returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilter {
wf := new(WakuFilter)
// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilterFull {
wf := new(WakuFilterFull)
wf.log = log.Named("filterv2-fullnode")
params := new(filter.FilterParameters)
@ -58,7 +58,7 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, timesource timeso
return wf
}
func (wf *WakuFilter) Start(ctx context.Context) error {
func (wf *WakuFilterFull) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
@ -82,7 +82,7 @@ func (wf *WakuFilter) Start(ctx context.Context) error {
return nil
}
func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) {
func (wf *WakuFilterFull) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
@ -132,7 +132,7 @@ func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequ
}
}
func (wf *WakuFilter) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
if exists {
@ -142,7 +142,7 @@ func (wf *WakuFilter) ping(s network.Stream, logger *zap.Logger, request *pb.Fil
}
}
func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
@ -160,7 +160,7 @@ func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *p
reply(s, logger, request, http.StatusOK)
}
func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
return
@ -179,7 +179,7 @@ func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request
}
}
func (wf *WakuFilter) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
if err != nil {
reply(s, logger, request, http.StatusNotFound)
@ -188,7 +188,7 @@ func (wf *WakuFilter) unsubscribeAll(s network.Stream, logger *zap.Logger, reque
}
}
func (wf *WakuFilter) filterListener(ctx context.Context) {
func (wf *WakuFilterFull) filterListener(ctx context.Context) {
defer wf.wg.Done()
// This function is invoked for each message received
@ -225,7 +225,7 @@ func (wf *WakuFilter) filterListener(ctx context.Context) {
}
}
func (wf *WakuFilter) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
func (wf *WakuFilterFull) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
logger := wf.log.With(logging.HostID("peer", peerID))
messagePush := &pb.MessagePushV2{
@ -264,7 +264,7 @@ func (wf *WakuFilter) pushMessage(ctx context.Context, peerID peer.ID, env *prot
}
// Stop unmounts the filter protocol
func (wf *WakuFilter) Stop() {
func (wf *WakuFilterFull) Stop() {
if wf.cancel == nil {
return
}
@ -278,6 +278,6 @@ func (wf *WakuFilter) Stop() {
wf.wg.Wait()
}
func (wf *WakuFilter) MessageChannel() chan *protocol.Envelope {
func (wf *WakuFilterFull) MessageChannel() chan *protocol.Envelope {
return wf.msgC
}

View File

@ -37,6 +37,15 @@ func NewSubscribersMap(timeout time.Duration) *SubscribersMap {
}
}
func (sub *SubscribersMap) Clear() {
sub.Lock()
defer sub.Unlock()
sub.items = make(map[peer.ID]PubsubTopics)
sub.interestMap = make(map[string]PeerSet)
sub.failedPeers = make(map[peer.ID]time.Time)
}
func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) {
sub.Lock()
defer sub.Unlock()
@ -134,7 +143,6 @@ func (sub *SubscribersMap) deleteAll(peerID peer.ID) error {
}
delete(sub.items, peerID)
delete(sub.failedPeers, peerID)
return nil
}
@ -223,8 +231,8 @@ func (sub *SubscribersMap) FlagAsFailure(peerID peer.ID) {
lastFailure, ok := sub.failedPeers[peerID]
if ok {
elapsedTime := time.Since(lastFailure)
if elapsedTime > sub.timeout {
sub.deleteAll(peerID)
if elapsedTime < sub.timeout {
_ = sub.deleteAll(peerID)
}
} else {
sub.failedPeers[peerID] = time.Now()

View File

@ -32,21 +32,21 @@ func TestAppend(t *testing.T) {
subs.Set(peerId, TOPIC, []string{"topic1"})
sub := firstSubscriber(subs, TOPIC, "topic1")
assert.NotNil(t, sub)
assert.NotEmpty(t, sub)
// Adding to existing peer
subs.Set(peerId, TOPIC, []string{"topic2"})
sub = firstSubscriber(subs, TOPIC, "topic2")
assert.NotNil(t, sub)
assert.NotEmpty(t, sub)
subs.Set(peerId, TOPIC+"2", []string{"topic1"})
sub = firstSubscriber(subs, TOPIC+"2", "topic1")
assert.NotNil(t, sub)
assert.NotEmpty(t, sub)
sub = firstSubscriber(subs, TOPIC, "topic2")
assert.Nil(t, sub)
sub = firstSubscriber(subs, TOPIC+"2", "topic2")
assert.Empty(t, sub)
}
func TestRemove(t *testing.T) {
@ -56,16 +56,17 @@ func TestRemove(t *testing.T) {
subs.Set(peerId, TOPIC+"1", []string{"topic1", "topic2"})
subs.Set(peerId, TOPIC+"2", []string{"topic1"})
subs.DeleteAll(peerId)
err := subs.DeleteAll(peerId)
assert.Empty(t, err)
sub := firstSubscriber(subs, TOPIC+"1", "topic1")
assert.Nil(t, sub)
assert.Empty(t, sub)
sub = firstSubscriber(subs, TOPIC+"1", "topic2")
assert.Nil(t, sub)
assert.Empty(t, sub)
sub = firstSubscriber(subs, TOPIC+"2", "topic1")
assert.Nil(t, sub)
assert.Empty(t, sub)
assert.False(t, subs.Has(peerId))
@ -85,7 +86,7 @@ func TestRemovePartial(t *testing.T) {
require.NoError(t, err)
sub := firstSubscriber(subs, TOPIC, "topic2")
assert.NotNil(t, sub)
assert.NotEmpty(t, sub)
}
func TestRemoveBogus(t *testing.T) {
@ -97,9 +98,9 @@ func TestRemoveBogus(t *testing.T) {
require.NoError(t, err)
sub := firstSubscriber(subs, TOPIC, "topic1")
assert.Nil(t, sub)
assert.Empty(t, sub)
sub = firstSubscriber(subs, TOPIC, "does not exist")
assert.Nil(t, sub)
assert.Empty(t, sub)
err = subs.Delete(peerId, "DOES_NOT_EXIST", []string{"topic1"})
require.Error(t, err)

View File

@ -124,6 +124,27 @@ func (s *SubscriptionDetails) Close() error {
return s.mapRef.Delete(s)
}
func (s *SubscriptionDetails) Clone() *SubscriptionDetails {
s.RLock()
defer s.RUnlock()
result := &SubscriptionDetails{
id: uuid.NewString(),
mapRef: s.mapRef,
closed: false,
peerID: s.peerID,
pubsubTopic: s.pubsubTopic,
contentTopics: make(map[string]struct{}),
C: make(chan *protocol.Envelope),
}
for k := range s.contentTopics {
result.contentTopics[k] = struct{}{}
}
return result
}
func (sub *SubscriptionsMap) clear() {
for _, peerSubscription := range sub.items {
for _, subscriptionSet := range peerSubscription.subscriptionsPerTopic {