mirror of https://github.com/status-im/go-waku.git
fix: store protocol should have a host regardless if acts as store node or not (#142)
This commit is contained in:
parent
df6757fc33
commit
9426cd133a
|
@ -128,7 +128,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) Start() error {
|
func (w *WakuNode) Start() error {
|
||||||
w.store = store.NewWakuStore(w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
|
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
|
||||||
if w.opts.enableStore {
|
if w.opts.enableStore {
|
||||||
w.startStore()
|
w.startStore()
|
||||||
}
|
}
|
||||||
|
@ -265,7 +265,7 @@ func (w *WakuNode) mountRendezvous() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) startStore() {
|
func (w *WakuNode) startStore() {
|
||||||
w.store.Start(w.ctx, w.host)
|
w.store.Start(w.ctx)
|
||||||
|
|
||||||
if w.opts.shouldResume {
|
if w.opts.shouldResume {
|
||||||
// TODO: extract this to a function and run it when you go offline
|
// TODO: extract this to a function and run it when you go offline
|
||||||
|
|
|
@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
|
||||||
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
||||||
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
s.storeMessage(msg1)
|
s.storeMessage(msg1)
|
||||||
s.storeMessage(msg3)
|
s.storeMessage(msg3)
|
||||||
s.storeMessage(msg5)
|
s.storeMessage(msg5)
|
||||||
|
@ -38,8 +38,8 @@ func TestResume(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
@ -55,8 +55,8 @@ func TestResume(t *testing.T) {
|
||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
||||||
|
@ -87,8 +87,8 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
|
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
|
||||||
|
@ -98,8 +98,8 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
||||||
|
@ -120,8 +120,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
|
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
|
||||||
|
@ -131,8 +131,8 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
||||||
|
|
|
@ -236,9 +236,10 @@ type WakuStore struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
|
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
|
||||||
func NewWakuStore(p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
|
func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
|
||||||
wakuStore := new(WakuStore)
|
wakuStore := new(WakuStore)
|
||||||
wakuStore.msgProvider = p
|
wakuStore.msgProvider = p
|
||||||
|
wakuStore.h = host
|
||||||
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
|
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
|
||||||
return wakuStore
|
return wakuStore
|
||||||
}
|
}
|
||||||
|
@ -249,13 +250,12 @@ func (store *WakuStore) SetMessageProvider(p MessageProvider) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
|
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
|
||||||
func (store *WakuStore) Start(ctx context.Context, h host.Host) {
|
func (store *WakuStore) Start(ctx context.Context) {
|
||||||
if store.started {
|
if store.started {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
store.started = true
|
store.started = true
|
||||||
store.h = h
|
|
||||||
store.ctx = ctx
|
store.ctx = ctx
|
||||||
store.MsgC = make(chan *protocol.Envelope, 1024)
|
store.MsgC = make(chan *protocol.Envelope, 1024)
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ func TestStorePersistence(t *testing.T) {
|
||||||
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(dbStore, 0, 0)
|
s1 := NewWakuStore(nil, dbStore, 0, 0)
|
||||||
s1.fetchDBRecords(ctx)
|
s1.fetchDBRecords(ctx)
|
||||||
require.Len(t, s1.messageQueue.messages, 0)
|
require.Len(t, s1.messageQueue.messages, 0)
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) {
|
||||||
|
|
||||||
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
||||||
|
|
||||||
s2 := NewWakuStore(dbStore, 0, 0)
|
s2 := NewWakuStore(nil, dbStore, 0, 0)
|
||||||
s2.fetchDBRecords(ctx)
|
s2.fetchDBRecords(ctx)
|
||||||
require.Len(t, s2.messageQueue.messages, 1)
|
require.Len(t, s2.messageQueue.messages, 1)
|
||||||
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
|
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
|
||||||
|
|
|
@ -20,8 +20,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
|
@ -39,8 +39,8 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
||||||
// Simulate a message has been received via relay protocol
|
// Simulate a message has been received via relay protocol
|
||||||
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
|
||||||
|
@ -66,8 +66,8 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, 0, 0)
|
||||||
s1.Start(ctx, host1)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
|
@ -92,8 +92,8 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
||||||
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
|
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, 0, 0)
|
||||||
s2.Start(ctx, host2)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
q := Query{
|
q := Query{
|
||||||
|
|
|
@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
|
||||||
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
||||||
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
|
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||||
|
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||||
|
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||||
|
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
|
||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
||||||
|
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
|
||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
pubsubTopic1 := "topic1"
|
pubsubTopic1 := "topic1"
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
||||||
msg.Payload = []byte{byte(i)}
|
msg.Payload = []byte{byte(i)}
|
||||||
|
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
pubsubTopic1 := "topic1"
|
pubsubTopic1 := "topic1"
|
||||||
|
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
msg := &pb.WakuMessage{
|
msg := &pb.WakuMessage{
|
||||||
Payload: []byte{byte(i)},
|
Payload: []byte{byte(i)},
|
||||||
|
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestTemporalHistoryQueries(t *testing.T) {
|
func TestTemporalHistoryQueries(t *testing.T) {
|
||||||
s := NewWakuStore(nil, 0, 0)
|
s := NewWakuStore(nil, nil, 0, 0)
|
||||||
|
|
||||||
var messages []*pb.WakuMessage
|
var messages []*pb.WakuMessage
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
|
Loading…
Reference in New Issue