chore(filter2): test updates

This commit is contained in:
Vitaliy Vlasov 2023-08-14 23:29:00 +03:00 committed by GitHub
parent c320b38cbe
commit 419adcb6a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 287 additions and 225 deletions

View File

@ -122,8 +122,14 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
}
// CreateWakuMessage creates a WakuMessage protobuffer with default values and a custom contenttopic and timestamp
func CreateWakuMessage(contentTopic string, timestamp int64) *pb.WakuMessage {
return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp}
func CreateWakuMessage(contentTopic string, timestamp int64, optionalPayload ...string) *pb.WakuMessage {
var payload []byte
if len(optionalPayload) > 0 {
payload = []byte(optionalPayload[0])
} else {
payload = []byte{1, 2, 3}
}
return &pb.WakuMessage{Payload: payload, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp}
}
// RandomHex returns a random hex string of n bytes

View File

@ -90,8 +90,8 @@ type WakuNode struct {
peerExchange Service
rendezvous Service
legacyFilter ReceptorService
filterFullnode ReceptorService
filterLightnode Service
filterFullNode ReceptorService
filterLightNode Service
store ReceptorService
rlnRelay RLNRelay
@ -272,9 +272,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.rendezvous = rendezvous.NewRendezvous(w.opts.rendezvousDB, w.peerConnector, w.log)
w.relay = relay.NewWakuRelay(w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullnode = filter.NewWakuFilterFullnode(w.timesource, w.log, w.opts.filterOpts...)
w.filterLightnode = filter.NewWakuFilterLightnode(w.bcaster, w.peermanager, w.timesource, w.log)
w.legacyFilter = legacy_filter.NewWakuFilter(w.bcaster, w.opts.isLegacyFilterFullNode, w.timesource, w.log, w.opts.legacyFilterOpts...)
w.filterFullNode = filter.NewWakuFilterFullNode(w.timesource, w.log, w.opts.filterOpts...)
w.filterLightNode = filter.NewWakuFilterLightNode(w.bcaster, w.peermanager, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.Relay(), w.peermanager, w.log)
if params.storeFactory != nil {
@ -434,10 +434,10 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.log.Info("Subscribing filter to broadcaster")
}
w.filterFullnode.SetHost(host)
w.filterFullNode.SetHost(host)
if w.opts.enableFilterFullNode {
sub := w.bcaster.RegisterForAll()
err := w.filterFullnode.Start(ctx, sub)
err := w.filterFullNode.Start(ctx, sub)
if err != nil {
return err
}
@ -445,9 +445,9 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
w.filterLightnode.SetHost(host)
w.filterLightNode.SetHost(host)
if w.opts.enableFilterLightNode {
err := w.filterLightnode.Start(ctx)
err := w.filterLightNode.Start(ctx)
if err != nil {
return err
}
@ -501,7 +501,7 @@ func (w *WakuNode) Stop() {
w.lightPush.Stop()
w.store.Stop()
w.legacyFilter.Stop()
w.filterFullnode.Stop()
w.filterFullNode.Stop()
if w.opts.enableDiscV5 {
w.discoveryV5.Stop()
@ -598,16 +598,16 @@ func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter {
}
// FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature
func (w *WakuNode) FilterFullnode() *filter.WakuFilterFullNode {
if result, ok := w.filterFullnode.(*filter.WakuFilterFullNode); ok {
func (w *WakuNode) FilterFullNode() *filter.WakuFilterFullNode {
if result, ok := w.filterFullNode.(*filter.WakuFilterFullNode); ok {
return result
}
return nil
}
// FilterFullnode is used to access any operation related to Waku Filter protocol Light node feature
func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightnode {
if result, ok := w.filterLightnode.(*filter.WakuFilterLightnode); ok {
// FilterFullNode is used to access any operation related to Waku Filter protocol Light node feature
func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightNode {
if result, ok := w.filterLightNode.(*filter.WakuFilterLightNode); ok {
return result
}
return nil

View File

@ -68,7 +68,7 @@ type WakuNodeParameters struct {
enableRelay bool
enableLegacyFilter bool
isLegacyFilterFullnode bool
isLegacyFilterFullNode bool
enableFilterLightNode bool
enableFilterFullNode bool
legacyFilterOpts []legacy_filter.Option
@ -363,7 +363,7 @@ func WithPeerExchange() WakuNodeOption {
func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableLegacyFilter = true
params.isLegacyFilterFullnode = fullnode
params.isLegacyFilterFullNode = fullnode
params.legacyFilterOpts = filterOpts
return nil
}

View File

@ -34,7 +34,7 @@ var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
)
type WakuFilterLightnode struct {
type WakuFilterLightNode struct {
cancel context.CancelFunc
ctx context.Context
h host.Host
@ -59,9 +59,9 @@ type WakuFilterPushResult struct {
// NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
// If using libp2p host, then pass peermanager as nil
func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightnode {
wf := new(WakuFilterLightnode)
func NewWakuFilterLightNode(broadcaster relay.Broadcaster, pm *peermanager.PeerManager,
timesource timesource.Timesource, log *zap.Logger) *WakuFilterLightNode {
wf := new(WakuFilterLightNode)
wf.log = log.Named("filterv2-lightnode")
wf.broadcaster = broadcaster
wf.timesource = timesource
@ -72,11 +72,11 @@ func NewWakuFilterLightnode(broadcaster relay.Broadcaster, pm *peermanager.PeerM
}
// Sets the host to be able to mount or consume a protocol
func (wf *WakuFilterLightnode) SetHost(h host.Host) {
func (wf *WakuFilterLightNode) SetHost(h host.Host) {
wf.h = h
}
func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
@ -98,7 +98,7 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
}
// Stop unmounts the filter protocol
func (wf *WakuFilterLightnode) Stop() {
func (wf *WakuFilterLightNode) Stop() {
if wf.cancel == nil {
return
}
@ -114,7 +114,7 @@ func (wf *WakuFilterLightnode) Stop() {
wf.wg.Wait()
}
func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Stream) {
func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
@ -149,7 +149,7 @@ func (wf *WakuFilterLightnode) onRequest(ctx context.Context) func(s network.Str
}
}
func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic)
// Broadcasting message so it's stored
@ -159,7 +159,7 @@ func (wf *WakuFilterLightnode) notify(remotePeerID peer.ID, pubsubTopic string,
wf.subscriptions.Notify(remotePeerID, envelope)
}
func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
metrics.RecordFilterError(ctx, "dial_failure")
@ -210,7 +210,7 @@ func (wf *WakuFilterLightnode) request(ctx context.Context, params *FilterSubscr
}
// Subscribe setups a subscription to receive messages that match a specific content filter
func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) {
if contentFilter.Topic == "" {
return nil, errors.New("topic is required")
}
@ -248,7 +248,7 @@ func (wf *WakuFilterLightnode) Subscribe(ctx context.Context, contentFilter Cont
}
// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) {
if !wf.subscriptions.Has(peerID, contentFilter.Topic, contentFilter.ContentTopics...) {
return nil, errors.New("subscription does not exist")
}
@ -256,7 +256,7 @@ func (wf *WakuFilterLightnode) FilterSubscription(peerID peer.ID, contentFilter
return wf.subscriptions.NewSubscription(peerID, contentFilter.Topic, contentFilter.ContentTopics), nil
}
func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
func (wf *WakuFilterLightNode) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
params := new(FilterUnsubscribeParameters)
params.log = wf.log
for _, opt := range opts {
@ -266,7 +266,7 @@ func (wf *WakuFilterLightnode) getUnsubscribeParameters(opts ...FilterUnsubscrib
return params, nil
}
func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error {
func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
return wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID},
@ -274,11 +274,11 @@ func (wf *WakuFilterLightnode) Ping(ctx context.Context, peerID peer.ID) error {
ContentFilter{})
}
func (wf *WakuFilterLightnode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
return wf.Ping(ctx, subscription.PeerID)
}
func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails {
func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
wf.subscriptions.RLock()
defer wf.subscriptions.RUnlock()
@ -295,7 +295,7 @@ func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails {
return output
}
func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) {
func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) {
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()
@ -327,7 +327,7 @@ func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilte
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
if contentFilter.Topic == "" {
return nil, errors.New("topic is required")
}
@ -391,7 +391,7 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
var contentTopics []string
for k := range sub.ContentTopics {
contentTopics = append(contentTopics, k)
@ -403,7 +403,7 @@ func (wf *WakuFilterLightnode) UnsubscribeWithSubscription(ctx context.Context,
}
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return nil, err

View File

@ -9,270 +9,326 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
func makeWakuRelay(t *testing.T, topic string, broadcaster relay.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
func TestFilterSuite(t *testing.T) {
suite.Run(t, new(FilterTestSuite))
}
type FilterTestSuite struct {
suite.Suite
testTopic string
testContentTopic string
ctx context.Context
ctxCancel context.CancelFunc
lightNode *WakuFilterLightNode
lightNodeHost host.Host
relayNode *relay.WakuRelay
relaySub *relay.Subscription
fullNode *WakuFilterFullNode
fullNodeHost host.Host
wg *sync.WaitGroup
contentFilter ContentFilter
subDetails *SubscriptionDetails
log *zap.Logger
}
func (s *FilterTestSuite) makeWakuRelay(topic string) (*relay.WakuRelay, *relay.Subscription, host.Host, relay.Broadcaster) {
broadcaster := relay.NewBroadcaster(10)
s.Require().NoError(broadcaster.Start(context.Background()))
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
s.Require().NoError(err)
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), utils.Logger())
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), s.log)
relay.SetHost(host)
s.fullNodeHost = host
err = relay.Start(context.Background())
require.NoError(t, err)
s.Require().NoError(err)
sub, err := relay.SubscribeToTopic(context.Background(), topic)
require.NoError(t, err)
s.Require().NoError(err)
return relay, sub, host
return relay, sub, host, broadcaster
}
func makeWakuFilterLightNode(t *testing.T) (*WakuFilterLightnode, host.Host) {
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
func (s *FilterTestSuite) makeWakuFilterLightNode() *WakuFilterLightNode {
port, err := tests.FindFreePort(s.T(), "", 5)
s.Require().NoError(err)
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
s.Require().NoError(err)
b := relay.NewBroadcaster(10)
require.NoError(t, b.Start(context.Background()))
filterPush := NewWakuFilterLightnode(b, nil, timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(b.Start(context.Background()))
filterPush := NewWakuFilterLightNode(b, nil, timesource.NewDefaultClock(), s.log)
filterPush.SetHost(host)
s.lightNodeHost = host
err = filterPush.Start(context.Background())
require.NoError(t, err)
s.Require().NoError(err)
return filterPush, host
return filterPush
}
// Node1: Filter subscribed to content topic A
// Node2: Relay + Filter
//
// # Node1 and Node2 are peers
//
// Node2 send a successful message with topic A
// Node1 receive the message
//
// Node2 send a successful message with topic B
// Node1 doesn't receive the message
func TestWakuFilter(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
func (s *FilterTestSuite) makeWakuFilterFullNode(topic string) (*relay.WakuRelay, *WakuFilterFullNode) {
node, relaySub, host, broadcaster := s.makeWakuRelay(topic)
s.relaySub = relaySub
testTopic := "/waku/2/go/filter/test"
testContentTopic := "TopicA"
node2Filter := NewWakuFilterFullNode(timesource.NewDefaultClock(), s.log)
node2Filter.SetHost(host)
sub := broadcaster.Register(topic)
err := node2Filter.Start(s.ctx, sub)
s.Require().NoError(err)
node1, host1 := makeWakuFilterLightNode(t)
defer node1.Stop()
return node, node2Filter
}
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
func (s *FilterTestSuite) waitForMsg(fn func(), ch chan *protocol.Envelope) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case env := <-ch:
s.Require().Equal(s.contentFilter.ContentTopics[0], env.Message().GetContentTopic())
case <-time.After(5 * time.Second):
s.Require().Fail("Message timeout")
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}()
node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
sub := broadcaster.Register(testTopic)
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
fn()
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1)
require.NoError(t, err)
s.wg.Wait()
}
contentFilter := ContentFilter{
Topic: string(testTopic),
ContentTopics: []string{testContentTopic},
func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
select {
case <-ch:
s.Require().Fail("should not receive another message")
case <-time.After(1 * time.Second):
// Timeout elapsed, all good
case <-s.ctx.Done():
s.Require().Fail("test exceeded allocated time")
}
}()
fn()
s.wg.Wait()
}
func (s *FilterTestSuite) subscribe(topic string, contentTopic string, peer peer.ID) *SubscriptionDetails {
s.contentFilter = ContentFilter{
Topic: string(topic),
ContentTopics: []string{contentTopic},
}
subscriptionChannel, err := node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID()))
require.NoError(t, err)
subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err)
// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)
var wg sync.WaitGroup
return subDetails
}
wg.Add(1)
go func() {
defer wg.Done()
env := <-subscriptionChannel.C
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
}()
func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload ...string) {
var payload string
if len(optionalPayload) > 0 {
payload = optionalPayload[0]
} else {
payload = "123"
}
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), topic)
s.Require().NoError(err)
}
wg.Wait()
func (s *FilterTestSuite) SetupTest() {
log := utils.Logger() //.Named("filterv2-test")
s.log = log
// Use a pointer to WaitGroup so that to avoid copying
// https://pkg.go.dev/sync#WaitGroup
s.wg = &sync.WaitGroup{}
wg.Add(1)
go func() {
select {
case <-subscriptionChannel.C:
require.Fail(t, "should not receive another message")
case <-time.After(1 * time.Second):
defer wg.Done()
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()
// Create test context
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
s.ctx = ctx
s.ctxCancel = cancel
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
s.testTopic = "/waku/2/go/filter/test"
s.testContentTopic = "TopicA"
wg.Wait()
s.lightNode = s.makeWakuFilterLightNode()
wg.Add(1)
go func() {
select {
case <-subscriptionChannel.C:
require.Fail(t, "should not receive another message")
case <-time.After(1 * time.Second):
defer wg.Done()
case <-ctx.Done():
require.Fail(t, "test exceeded allocated time")
}
}()
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic)
_, err = node1.Unsubscribe(ctx, contentFilter, Peer(node2Filter.h.ID()))
require.NoError(t, err)
// Connect nodes
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
err := s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
s.Require().NoError(err)
}
func (s *FilterTestSuite) TearDownTest() {
s.fullNode.Stop()
s.relayNode.Stop()
s.relaySub.Unsubscribe()
s.lightNode.Stop()
s.ctxCancel()
}
func (s *FilterTestSuite) TestWakuFilter() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Should be received
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "first")
}, s.subDetails.C)
// Wrong content topic
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, "TopicB", "second")
}, s.subDetails.C)
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID()))
s.Require().NoError(err)
time.Sleep(1 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
wg.Wait()
// Should not receive after unsubscribe
s.waitForTimeout(func() {
s.publishMsg(s.testTopic, s.testContentTopic, "third")
}, s.subDetails.C)
}
func TestSubscriptionPing(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
func (s *FilterTestSuite) TestSubscriptionPing() {
testTopic := "/waku/2/go/filter/test"
node1, host1 := makeWakuFilterLightNode(t)
defer node1.Stop()
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger())
node2Filter.SetHost(host2)
err := node2Filter.Start(ctx, relay.NoopSubscription())
require.NoError(t, err)
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterSubscribeID_v20beta1)
require.NoError(t, err)
err = node1.Ping(context.Background(), host2.ID())
require.Error(t, err)
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().Error(err)
filterErr, ok := err.(*FilterError)
require.True(t, ok)
require.Equal(t, filterErr.Code, http.StatusNotFound)
s.Require().True(ok)
s.Require().Equal(filterErr.Code, http.StatusNotFound)
contentFilter := ContentFilter{
Topic: string(testTopic),
ContentTopics: []string{"abc"},
}
_, err = node1.Subscribe(ctx, contentFilter, WithPeer(node2Filter.h.ID()))
require.NoError(t, err)
contentTopic := "abc"
s.subDetails = s.subscribe(s.testTopic, contentTopic, s.fullNodeHost.ID())
err = node1.Ping(context.Background(), host2.ID())
require.NoError(t, err)
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
s.Require().NoError(err)
}
func TestWakuFilterPeerFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
defer cancel()
testTopic := "/waku/2/go/filter/test"
testContentTopic := "TopicA"
node1, host1 := makeWakuFilterLightNode(t)
broadcaster := relay.NewBroadcaster(10)
require.NoError(t, broadcaster.Start(context.Background()))
node2, sub2, host2 := makeWakuRelay(t, testTopic, broadcaster)
defer node2.Stop()
defer sub2.Unsubscribe()
func (s *FilterTestSuite) TestPeerFailure() {
broadcaster2 := relay.NewBroadcaster(10)
require.NoError(t, broadcaster2.Start(context.Background()))
node2Filter := NewWakuFilterFullnode(timesource.NewDefaultClock(), utils.Logger(), WithTimeout(5*time.Second))
node2Filter.SetHost(host2)
sub := broadcaster.Register(testTopic)
err := node2Filter.Start(ctx, sub)
require.NoError(t, err)
s.Require().NoError(broadcaster2.Start(context.Background()))
host1.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = host1.Peerstore().AddProtocols(host2.ID(), FilterPushID_v20beta1)
require.NoError(t, err)
contentFilter := &ContentFilter{
Topic: string(testTopic),
ContentTopics: []string{testContentTopic},
}
f, err := node1.Subscribe(ctx, *contentFilter, WithPeer(node2Filter.h.ID()))
require.NoError(t, err)
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
// Simulate there's been a failure before
node2Filter.subscriptions.FlagAsFailure(host1.ID())
s.fullNode.subscriptions.FlagAsFailure(s.lightNodeHost.ID())
// Sleep to make sure the filter is subscribed
time.Sleep(2 * time.Second)
require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID()))
s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID()))
var wg sync.WaitGroup
s.waitForMsg(func() {
s.publishMsg(s.testTopic, s.testContentTopic)
}, s.subDetails.C)
wg.Add(1)
go func() {
defer wg.Done()
env := <-f.C
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
// Failure is removed
require.False(t, node2Filter.subscriptions.IsFailedPeer(host1.ID()))
}()
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
wg.Wait()
// Failure is removed
s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID()))
// Kill the subscriber
host1.Close()
s.lightNodeHost.Close()
time.Sleep(1 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
s.publishMsg(s.testTopic, s.testContentTopic)
// TODO: find out how to eliminate this sleep
time.Sleep(1 * time.Second)
require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID()))
s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID()))
time.Sleep(2 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
require.NoError(t, err)
s.publishMsg(s.testTopic, s.testContentTopic)
time.Sleep(2 * time.Second)
require.True(t, node2Filter.subscriptions.IsFailedPeer(host1.ID())) // Failed peer has been removed
require.False(t, node2Filter.subscriptions.Has(host1.ID())) // Failed peer has been removed
s.Require().True(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) // Failed peer has been removed
s.Require().False(s.fullNode.subscriptions.Has(s.lightNodeHost.ID())) // Failed peer has been removed
}
func (s *FilterTestSuite) TestCreateSubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails.C)
}
func (s *FilterTestSuite) TestModifySubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails.C)
// Subscribe to another content_topic
newContentTopic := "Topic_modified"
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
s.Require().NoError(err)
}, s.subDetails.C)
}
func (s *FilterTestSuite) TestMultipleMessages() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic)
s.Require().NoError(err)
}, s.subDetails.C)
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
s.Require().NoError(err)
}, s.subDetails.C)
}

View File

@ -45,8 +45,8 @@ type (
}
)
// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullnode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
// NewWakuFilterFullNode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterFullNode(timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
wf := new(WakuFilterFullNode)
wf.log = log.Named("filterv2-fullnode")