chore: test store being decoupled from relay and fix empty contenttopics on filter

This commit is contained in:
Richard Ramos 2022-06-14 15:53:48 -04:00
parent 2c2725308f
commit 27b5ab9c51
3 changed files with 108 additions and 5 deletions

View File

@ -11,6 +11,12 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/persistence/sqlite"
"github.com/status-im/go-waku/waku/v2/protocol/filter"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -142,3 +148,96 @@ func Test5000(t *testing.T) {
wg.Wait() wg.Wait()
} }
func TestDecoupledStoreFromRelay(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// NODE1: Relay Node + Filter Server
hostAddr1, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode1, err := New(ctx,
WithHostAddress(hostAddr1),
WithWakuRelay(),
WithWakuFilter(true),
)
require.NoError(t, err)
err = wakuNode1.Start()
require.NoError(t, err)
defer wakuNode1.Stop()
// NODE2: Filter Client/Store
db, err := sqlite.NewDB(":memory:")
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db))
require.NoError(t, err)
hostAddr2, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode2, err := New(ctx,
WithHostAddress(hostAddr2),
WithWakuFilter(false),
WithWakuStore(true, false),
WithMessageProvider(dbStore),
)
require.NoError(t, err)
err = wakuNode2.Start()
require.NoError(t, err)
defer wakuNode2.Stop()
err = wakuNode2.DialPeerWithMultiAddress(ctx, wakuNode1.ListenAddresses()[0])
require.NoError(t, err)
time.Sleep(2 * time.Second)
_, filter, err := wakuNode2.Filter().Subscribe(ctx, filter.ContentFilter{
Topic: string(relay.DefaultWakuTopic),
})
require.NoError(t, err)
// Sleep to make sure the filter is subscribed
time.Sleep(1 * time.Second)
// Send MSG1 on NODE1
msg := createTestMsg(0)
msg.Payload = []byte{1, 2, 3, 4, 5}
msg.Timestamp = utils.GetUnixEpoch()
var wg sync.WaitGroup
wg.Add(1)
go func() {
// MSG1 should be pushed in NODE2 via filter
defer wg.Done()
env := <-filter.Chan
require.Equal(t, msg.Timestamp, env.Message().Timestamp)
}()
time.Sleep(500 * time.Millisecond)
if err := wakuNode1.Publish(ctx, msg); err != nil {
require.Fail(t, "Could not publish all messages")
}
wg.Wait()
// NODE3: Query from NODE2
hostAddr3, err := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
require.NoError(t, err)
wakuNode3, err := New(ctx,
WithHostAddress(hostAddr3),
WithWakuFilter(false),
)
require.NoError(t, err)
err = wakuNode3.Start()
require.NoError(t, err)
defer wakuNode3.Stop()
err = wakuNode3.DialPeerWithMultiAddress(ctx, wakuNode2.ListenAddresses()[0])
require.NoError(t, err)
// NODE2 should have returned the message received via filter
result, err := wakuNode3.Store().Query(ctx, store.Query{})
require.NoError(t, err)
require.Len(t, result.Messages, 1)
require.Equal(t, msg.Timestamp, result.Messages[0].Timestamp)
}

View File

@ -15,6 +15,10 @@ type Subscriber struct {
} }
func (sub Subscriber) HasContentTopic(topic string) bool { func (sub Subscriber) HasContentTopic(topic string) bool {
if len(sub.filter.ContentFilters) == 0 {
return true // When the subscriber has no specific ContentTopic filter
}
for _, filter := range sub.filter.ContentFilters { for _, filter := range sub.filter.ContentFilters {
if filter.ContentTopic == topic { if filter.ContentTopic == topic {
return true return true
@ -45,14 +49,14 @@ func (sub *Subscribers) Append(s Subscriber) int {
return len(sub.subscribers) return len(sub.subscribers)
} }
func (sub *Subscribers) Items(topic *string) <-chan Subscriber { func (sub *Subscribers) Items(contentTopic *string) <-chan Subscriber {
c := make(chan Subscriber) c := make(chan Subscriber)
f := func() { f := func() {
sub.RLock() sub.RLock()
defer sub.RUnlock() defer sub.RUnlock()
for _, s := range sub.subscribers { for _, s := range sub.subscribers {
if topic == nil || s.HasContentTopic(*topic) { if contentTopic == nil || s.HasContentTopic(*contentTopic) {
c <- s c <- s
} }
} }

View File

@ -184,7 +184,7 @@ func (wf *WakuFilter) FilterListener() {
// on the full node in context of Waku2-Filter // on the full node in context of Waku2-Filter
handle := func(envelope *protocol.Envelope) error { // async handle := func(envelope *protocol.Envelope) error { // async
msg := envelope.Message() msg := envelope.Message()
topic := envelope.PubsubTopic() pubsubTopic := envelope.PubsubTopic()
logger := wf.log.With(zap.Stringer("message", msg)) logger := wf.log.With(zap.Stringer("message", msg))
g := new(errgroup.Group) g := new(errgroup.Group)
// Each subscriber is a light node that earlier on invoked // Each subscriber is a light node that earlier on invoked
@ -192,10 +192,10 @@ func (wf *WakuFilter) FilterListener() {
for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) { for subscriber := range wf.subscribers.Items(&(msg.ContentTopic)) {
logger := logger.With(logging.HostID("subscriber", subscriber.peer)) logger := logger.With(logging.HostID("subscriber", subscriber.peer))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
if subscriber.filter.Topic != "" && subscriber.filter.Topic != topic { if subscriber.filter.Topic != "" && subscriber.filter.Topic != pubsubTopic {
logger.Info("pubsub topic mismatch", logger.Info("pubsub topic mismatch",
zap.String("subscriberTopic", subscriber.filter.Topic), zap.String("subscriberTopic", subscriber.filter.Topic),
zap.String("messageTopic", topic)) zap.String("messageTopic", pubsubTopic))
continue continue
} }