test: resume and time based queries (#108)

This commit is contained in:
Richard Ramos 2021-10-28 09:03:23 -04:00 committed by GitHub
parent 9030907960
commit 8ba64affba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 230 additions and 111 deletions

View File

@ -13,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/host"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
ma "github.com/multiformats/go-multiaddr" ma "github.com/multiformats/go-multiaddr"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
) )
func GetHostAddress(ha host.Host) ma.Multiaddr { func GetHostAddress(ha host.Host) ma.Multiaddr {
@ -67,3 +68,7 @@ func MakeHost(ctx context.Context, port int, randomness io.Reader) (host.Host, e
libp2p.Identity(prvKey), libp2p.Identity(prvKey),
) )
} }
func CreateWakuMessage(contentTopic string, timestamp float64) *pb.WakuMessage {
return &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: contentTopic, Version: 0, Timestamp: timestamp}
}

View File

@ -85,7 +85,7 @@ func TestWakuLightPush(t *testing.T) {
req.PubsubTopic = string(testTopic) req.PubsubTopic = string(testTopic)
// Wait for the mesh connection to happen between node1 and node2 // Wait for the mesh connection to happen between node1 and node2
time.Sleep(5 * time.Second) time.Sleep(2 * time.Second)
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(1) wg.Add(1)

View File

@ -0,0 +1,146 @@
package store
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/stretchr/testify/require"
)
func TestFindLastSeenMessage(t *testing.T) {
msg1 := tests.CreateWakuMessage("1", 1)
msg2 := tests.CreateWakuMessage("2", 2)
msg3 := tests.CreateWakuMessage("3", 3)
msg4 := tests.CreateWakuMessage("4", 4)
msg5 := tests.CreateWakuMessage("5", 5)
s := NewWakuStore(true, nil)
s.storeMessage("test", msg1)
s.storeMessage("test", msg3)
s.storeMessage("test", msg5)
s.storeMessage("test", msg2)
s.storeMessage("test", msg4)
require.Equal(t, msg5.Timestamp, s.findLastSeen())
}
func TestResume(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1.Start(ctx, host1)
defer s1.Stop()
for i := 0; i < 10; i++ {
var contentTopic = "1"
if i%2 == 0 {
contentTopic = "2"
}
msg := tests.CreateWakuMessage(contentTopic, float64(time.Duration(i)*time.Second))
s1.storeMessage("test", msg)
}
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2.Start(ctx, host2)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
msgCount, err := s2.Resume(ctx, "test", []peer.ID{host1.ID()})
require.NoError(t, err)
require.Equal(t, 10, msgCount)
require.Len(t, s2.messages, 10)
// Test duplication
msgCount, err = s2.Resume(ctx, "test", []peer.ID{host1.ID()})
require.NoError(t, err)
require.Equal(t, 0, msgCount)
}
func TestResumeWithListOfPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Host that does not support store protocol
invalidHost, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1.Start(ctx, host1)
defer s1.Stop()
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
s1.storeMessage("test", msg0)
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2.Start(ctx, host2)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
msgCount, err := s2.Resume(ctx, "test", []peer.ID{invalidHost.ID(), host1.ID()})
require.NoError(t, err)
require.Equal(t, 1, msgCount)
require.Len(t, s2.messages, 1)
}
func TestResumeWithoutSpecifyingPeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1.Start(ctx, host1)
defer s1.Stop()
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
s1.storeMessage("test", msg0)
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2.Start(ctx, host2)
defer s2.Stop()
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
msgCount, err := s2.Resume(ctx, "test", []peer.ID{})
require.NoError(t, err)
require.Equal(t, 1, msgCount)
require.Len(t, s2.messages, 1)
}

View File

@ -552,9 +552,10 @@ func (store *WakuStore) queryLoop(ctx context.Context, query *pb.HistoryQuery, c
// returns the number of retrieved messages, or error if all the requests fail // returns the number of retrieved messages, or error if all the requests fail
for _, peer := range candidateList { for _, peer := range candidateList {
result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId()) result, err := store.queryFrom(ctx, query, peer, protocol.GenerateRequestId())
if err != nil { if err == nil {
return result, nil return result, nil
} }
log.Error(fmt.Errorf("resume history with peer %s failed: %w", peer, err))
} }
return nil, ErrFailedQuery return nil, ErrFailedQuery

View File

@ -42,4 +42,7 @@ func TestStorePersistence(t *testing.T) {
s2.fetchDBRecords(ctx) s2.fetchDBRecords(ctx)
require.Len(t, s2.messages, 1) require.Len(t, s2.messages, 1)
require.Equal(t, msg, s2.messages[0].msg) require.Equal(t, msg, s2.messages[0].msg)
// Storing a duplicated message should not crash. It's okay to generate an error log in this case
s1.storeMessage(defaultPubSubTopic, msg)
} }

View File

@ -3,6 +3,7 @@ package store
import ( import (
"testing" "testing"
"github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils" "github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -12,22 +13,11 @@ func TestStoreQuery(t *testing.T) {
defaultPubSubTopic := "test" defaultPubSubTopic := "test"
defaultContentTopic := "1" defaultContentTopic := "1"
msg := &pb.WakuMessage{ msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
Payload: []byte{1, 2, 3}, msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
ContentTopic: defaultContentTopic,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "2",
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil) s := NewWakuStore(true, nil)
s.storeMessage(defaultPubSubTopic, msg) s.storeMessage(defaultPubSubTopic, msg1)
s.storeMessage(defaultPubSubTopic, msg2) s.storeMessage(defaultPubSubTopic, msg2)
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
@ -39,7 +29,7 @@ func TestStoreQuery(t *testing.T) {
}) })
require.Len(t, response.Messages, 1) require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0]) require.Equal(t, msg1, response.Messages[0])
} }
func TestStoreQueryMultipleContentFilters(t *testing.T) { func TestStoreQueryMultipleContentFilters(t *testing.T) {
@ -48,29 +38,13 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
topic2 := "2" topic2 := "2"
topic3 := "3" topic3 := "3"
msg := &pb.WakuMessage{ msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
Payload: []byte{1, 2, 3}, msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
ContentTopic: topic1, msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil) s := NewWakuStore(true, nil)
s.storeMessage(defaultPubSubTopic, msg)
s.storeMessage(defaultPubSubTopic, msg1)
s.storeMessage(defaultPubSubTopic, msg2) s.storeMessage(defaultPubSubTopic, msg2)
s.storeMessage(defaultPubSubTopic, msg3) s.storeMessage(defaultPubSubTopic, msg3)
@ -86,7 +60,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
}) })
require.Len(t, response.Messages, 2) require.Len(t, response.Messages, 2)
require.Contains(t, response.Messages, msg) require.Contains(t, response.Messages, msg1)
require.Contains(t, response.Messages, msg3) require.Contains(t, response.Messages, msg3)
require.NotContains(t, response.Messages, msg2) require.NotContains(t, response.Messages, msg2)
} }
@ -98,29 +72,12 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
pubsubTopic1 := "topic1" pubsubTopic1 := "topic1"
pubsubTopic2 := "topic2" pubsubTopic2 := "topic2"
msg := &pb.WakuMessage{ msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
Payload: []byte{1, 2, 3}, msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
ContentTopic: topic1, msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil) s := NewWakuStore(true, nil)
s.storeMessage(pubsubTopic1, msg) s.storeMessage(pubsubTopic1, msg1)
s.storeMessage(pubsubTopic2, msg2) s.storeMessage(pubsubTopic2, msg2)
s.storeMessage(pubsubTopic2, msg3) s.storeMessage(pubsubTopic2, msg3)
@ -137,7 +94,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
}) })
require.Len(t, response.Messages, 1) require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0]) require.Equal(t, msg1, response.Messages[0])
} }
func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
@ -147,29 +104,12 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
pubsubTopic1 := "topic1" pubsubTopic1 := "topic1"
pubsubTopic2 := "topic2" pubsubTopic2 := "topic2"
msg := &pb.WakuMessage{ msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
Payload: []byte{1, 2, 3}, msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
ContentTopic: topic1, msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil) s := NewWakuStore(true, nil)
s.storeMessage(pubsubTopic2, msg) s.storeMessage(pubsubTopic2, msg1)
s.storeMessage(pubsubTopic2, msg2) s.storeMessage(pubsubTopic2, msg2)
s.storeMessage(pubsubTopic2, msg3) s.storeMessage(pubsubTopic2, msg3)
@ -186,29 +126,12 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
topic3 := "3" topic3 := "3"
pubsubTopic1 := "topic1" pubsubTopic1 := "topic1"
msg := &pb.WakuMessage{ msg1 := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
Payload: []byte{1, 2, 3}, msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
ContentTopic: topic1, msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg2 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic2,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
msg3 := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic3,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s := NewWakuStore(true, nil) s := NewWakuStore(true, nil)
s.storeMessage(pubsubTopic1, msg) s.storeMessage(pubsubTopic1, msg1)
s.storeMessage(pubsubTopic1, msg2) s.storeMessage(pubsubTopic1, msg2)
s.storeMessage(pubsubTopic1, msg3) s.storeMessage(pubsubTopic1, msg3)
@ -217,7 +140,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
}) })
require.Len(t, response.Messages, 3) require.Len(t, response.Messages, 3)
require.Contains(t, response.Messages, msg) require.Contains(t, response.Messages, msg1)
require.Contains(t, response.Messages, msg2) require.Contains(t, response.Messages, msg2)
require.Contains(t, response.Messages, msg3) require.Contains(t, response.Messages, msg3)
} }
@ -228,14 +151,9 @@ func TestStoreQueryForwardPagination(t *testing.T) {
s := NewWakuStore(true, nil) s := NewWakuStore(true, nil)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{ msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
Payload: []byte{byte(i)}, msg.Payload = []byte{byte(i)}
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
s.storeMessage(pubsubTopic1, msg) s.storeMessage(pubsubTopic1, msg)
} }
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
@ -279,3 +197,49 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
require.Equal(t, byte(i), response.Messages[i].Payload[0]) require.Equal(t, byte(i), response.Messages[i].Payload[0])
} }
} }
func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(true, nil)
var messages []*pb.WakuMessage
for i := 0; i < 10; i++ {
contentTopic := "1"
if i%2 == 0 {
contentTopic = "2"
}
msg := tests.CreateWakuMessage(contentTopic, float64(i))
s.storeMessage("test", msg)
messages = append(messages, msg)
}
// handle temporal history query with a valid time window
response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: float64(2),
EndTime: float64(5),
})
require.Len(t, response.Messages, 2)
require.Equal(t, messages[3].Timestamp, response.Messages[0].Timestamp)
require.Equal(t, messages[5].Timestamp, response.Messages[1].Timestamp)
// handle temporal history query with a zero-size time window
response = s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: float64(2),
EndTime: float64(2),
})
require.Len(t, response.Messages, 0)
// handle temporal history query with an invalid time window
response = s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{{ContentTopic: "1"}},
StartTime: float64(5),
EndTime: float64(2),
})
// time window is invalid since start time > end time
// perhaps it should return an error?
require.Len(t, response.Messages, 0)
}