From 27b5ab9c51f59d76d5aa74b39344f41a35e5ca33 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 14 Jun 2022 15:53:48 -0400 Subject: [PATCH] chore: test store being decoupled from relay and fix empty contenttopics on filter --- waku/v2/node/wakunode2_test.go | 99 +++++++++++++++++++ waku/v2/protocol/filter/filter_subscribers.go | 8 +- waku/v2/protocol/filter/waku_filter.go | 6 +- 3 files changed, 108 insertions(+), 5 deletions(-) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index f26d42a0..61f30c00 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -11,6 +11,12 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/go-waku/tests" + "github.com/status-im/go-waku/waku/persistence" + "github.com/status-im/go-waku/waku/persistence/sqlite" + "github.com/status-im/go-waku/waku/v2/protocol/filter" + "github.com/status-im/go-waku/waku/v2/protocol/relay" + "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -142,3 +148,96 @@ func Test5000(t *testing.T) { wg.Wait() } + +func TestDecoupledStoreFromRelay(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // NODE1: Relay Node + Filter Server + hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + wakuNode1, err := New(ctx, + WithHostAddress(hostAddr1), + WithWakuRelay(), + WithWakuFilter(true), + ) + require.NoError(t, err) + err = wakuNode1.Start() + require.NoError(t, err) + defer wakuNode1.Stop() + + // NODE2: Filter Client/Store + db, err := sqlite.NewDB(":memory:") + require.NoError(t, err) + dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db)) + require.NoError(t, err) + + hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + wakuNode2, err := New(ctx, + WithHostAddress(hostAddr2), + WithWakuFilter(false), + WithWakuStore(true, false), + WithMessageProvider(dbStore), + ) + require.NoError(t, err) + err = wakuNode2.Start() + require.NoError(t, err) + defer wakuNode2.Stop() + + err = wakuNode2.DialPeerWithMultiAddress(ctx, wakuNode1.ListenAddresses()[0]) + require.NoError(t, err) + + time.Sleep(2 * time.Second) + + _, filter, err := wakuNode2.Filter().Subscribe(ctx, filter.ContentFilter{ + Topic: string(relay.DefaultWakuTopic), + }) + require.NoError(t, err) + + // Sleep to make sure the filter is subscribed + time.Sleep(1 * time.Second) + + // Send MSG1 on NODE1 + msg := createTestMsg(0) + msg.Payload = []byte{1, 2, 3, 4, 5} + msg.Timestamp = utils.GetUnixEpoch() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + // MSG1 should be pushed in NODE2 via filter + defer wg.Done() + env := <-filter.Chan + require.Equal(t, msg.Timestamp, env.Message().Timestamp) + }() + + time.Sleep(500 * time.Millisecond) + + if err := wakuNode1.Publish(ctx, msg); err != nil { + require.Fail(t, "Could not publish all messages") + } + + wg.Wait() + + // NODE3: Query from NODE2 + hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + require.NoError(t, err) + wakuNode3, err := New(ctx, + WithHostAddress(hostAddr3), + WithWakuFilter(false), + ) + require.NoError(t, err) + err = wakuNode3.Start() + require.NoError(t, err) + defer wakuNode3.Stop() + + err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0]) + require.NoError(t, err) + + // NODE2 should have returned the message received via filter + result, err := wakuNode3.Store().Query(ctx, store.Query{}) + require.NoError(t, err) + require.Len(t, result.Messages, 1) + require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp) +} diff --git a/waku/v2/protocol/filter/filter_subscribers.go b/waku/v2/protocol/filter/filter_subscribers.go index 49c1d306..fdfd7389 100644 --- a/waku/v2/protocol/filter/filter_subscribers.go +++ b/waku/v2/protocol/filter/filter_subscribers.go @@ -15,6 +15,10 @@ type Subscriber struct { } func (sub Subscriber) HasContentTopic(topic string) bool { + if len(sub.filter.ContentFilters) == 0 { + return true // When the subscriber has no specific ContentTopic filter + } + for _, filter := range sub.filter.ContentFilters { if filter.ContentTopic == topic { return true @@ -45,14 +49,14 @@ func (sub *Subscribers) Append(s Subscriber) int { return len(sub.subscribers) } -func (sub *Subscribers) Items(topic *string) <-chan Subscriber { +func (sub *Subscribers) Items(contentTopic *string) <-chan Subscriber { c := make(chan Subscriber) f := func() { sub.RLock() defer sub.RUnlock() for _, s := range sub.subscribers { - if topic == nil || s.HasContentTopic(*topic) { + if contentTopic == nil || s.HasContentTopic(*contentTopic) { c <- s } } diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index b7f2fbc1..7b41431c 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -184,7 +184,7 @@ func (wf *WakuFilter) FilterListener() { // on the full node in context of Waku2-Filter handle := func(envelope *protocol.Envelope) error { // async msg := envelope.Message() - topic := envelope.PubsubTopic() + pubsubTopic := envelope.PubsubTopic() logger := wf.log.With(zap.Stringer("message", msg)) g := new(errgroup.Group) // Each subscriber is a light node that earlier on invoked @@ -192,10 +192,10 @@ func (wf *WakuFilter) FilterListener() { for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) { logger := logger.With(logging.HostID("subscriber", subscriber.peer)) subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines - if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { + if subscriber.filter.Topic != "" && subscriber.filter.Topic != pubsubTopic { logger.Info("pubsub topic mismatch", zap.String("subscriberTopic", subscriber.filter.Topic), - zap.String("messageTopic", topic)) + zap.String("messageTopic", pubsubTopic)) continue }