mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-27 05:56:07 +00:00
feat: build swap and attach it to the store
This commit is contained in:
parent
ed91f47ff0
commit
9bb957afeb
@ -53,6 +53,13 @@ type StoreOptions struct {
|
|||||||
Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"`
|
Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SwapOptions are settings used for configuring the swap protocol
|
||||||
|
type SwapOptions struct {
|
||||||
|
Mode int `long:"swap-mode" description:"Swap mode: 0=soft, 1=mock, 2=hard" default:"0"`
|
||||||
|
PaymentThreshold int `long:"swap-payment-threshold" description:"Threshold for payment" default:"100"`
|
||||||
|
DisconnectThreshold int `long:"swap-disconnect-threshold" description:"Threshold for disconnecting" default:"-100"`
|
||||||
|
}
|
||||||
|
|
||||||
func (s *StoreOptions) RetentionMaxDaysDuration() time.Duration {
|
func (s *StoreOptions) RetentionMaxDaysDuration() time.Duration {
|
||||||
return time.Duration(s.RetentionMaxDays) * time.Hour * 24
|
return time.Duration(s.RetentionMaxDays) * time.Hour * 24
|
||||||
}
|
}
|
||||||
@ -103,6 +110,7 @@ type Options struct {
|
|||||||
|
|
||||||
Relay RelayOptions `group:"Relay Options"`
|
Relay RelayOptions `group:"Relay Options"`
|
||||||
Store StoreOptions `group:"Store Options"`
|
Store StoreOptions `group:"Store Options"`
|
||||||
|
Swap SwapOptions `group:"Swap Options"`
|
||||||
Filter FilterOptions `group:"Filter Options"`
|
Filter FilterOptions `group:"Filter Options"`
|
||||||
LightPush LightpushOptions `group:"LightPush Options"`
|
LightPush LightpushOptions `group:"LightPush Options"`
|
||||||
DiscV5 DiscV5Options `group:"DiscoveryV5 Options"`
|
DiscV5 DiscV5Options `group:"DiscoveryV5 Options"`
|
||||||
|
@ -30,6 +30,7 @@ import (
|
|||||||
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
|
"github.com/status-im/go-waku/waku/v2/protocol/lightpush"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
"github.com/status-im/go-waku/waku/v2/protocol/store"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol/swap"
|
||||||
"github.com/status-im/go-waku/waku/v2/utils"
|
"github.com/status-im/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -55,6 +56,7 @@ type WakuNode struct {
|
|||||||
lightPush *lightpush.WakuLightPush
|
lightPush *lightpush.WakuLightPush
|
||||||
rendezvous *rendezvous.RendezvousService
|
rendezvous *rendezvous.RendezvousService
|
||||||
store *store.WakuStore
|
store *store.WakuStore
|
||||||
|
swap *swap.WakuSwap
|
||||||
wakuFlag utils.WakuEnrBitfield
|
wakuFlag utils.WakuEnrBitfield
|
||||||
|
|
||||||
addrChan chan ma.Multiaddr
|
addrChan chan ma.Multiaddr
|
||||||
@ -241,7 +243,12 @@ func (w *WakuNode) checkForAddressChanges() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuNode) Start() error {
|
func (w *WakuNode) Start() error {
|
||||||
w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
|
w.swap = swap.NewWakuSwap([]swap.SwapOption{
|
||||||
|
swap.WithMode(w.opts.swapMode),
|
||||||
|
swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold),
|
||||||
|
}...)
|
||||||
|
|
||||||
|
w.store = store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration)
|
||||||
if w.opts.enableStore {
|
if w.opts.enableStore {
|
||||||
w.startStore()
|
w.startStore()
|
||||||
}
|
}
|
||||||
|
@ -43,6 +43,10 @@ type WakuNodeParameters struct {
|
|||||||
maxMessages int
|
maxMessages int
|
||||||
maxDuration time.Duration
|
maxDuration time.Duration
|
||||||
|
|
||||||
|
swapMode int
|
||||||
|
swapDisconnectThreshold int
|
||||||
|
swapPaymentThreshold int
|
||||||
|
|
||||||
enableRendezvous bool
|
enableRendezvous bool
|
||||||
enableRendezvousServer bool
|
enableRendezvousServer bool
|
||||||
rendevousStorage rendezvous.Storage
|
rendevousStorage rendezvous.Storage
|
||||||
@ -213,6 +217,16 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithWakuSwap set the option of the Waku V2 Swap protocol
|
||||||
|
func WithWakuSwap(mode int, disconnectThreshold, paymentThreshold int) WakuNodeOption {
|
||||||
|
return func(params *WakuNodeParameters) error {
|
||||||
|
params.swapMode = mode
|
||||||
|
params.swapDisconnectThreshold = disconnectThreshold
|
||||||
|
params.swapPaymentThreshold = paymentThreshold
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithWakuStoreAndRetentionPolicy enables the Waku V2 Store protocol, storing them in an optional message provider
|
// WithWakuStoreAndRetentionPolicy enables the Waku V2 Store protocol, storing them in an optional message provider
|
||||||
// applying an specific retention policy
|
// applying an specific retention policy
|
||||||
func WithWakuStoreAndRetentionPolicy(shouldResume bool, maxDuration time.Duration, maxMessages int) WakuNodeOption {
|
func WithWakuStoreAndRetentionPolicy(shouldResume bool, maxDuration time.Duration, maxMessages int) WakuNodeOption {
|
||||||
|
@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) {
|
|||||||
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
|
||||||
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
s.storeMessage(msg1)
|
s.storeMessage(msg1)
|
||||||
s.storeMessage(msg3)
|
s.storeMessage(msg3)
|
||||||
s.storeMessage(msg5)
|
s.storeMessage(msg5)
|
||||||
@ -38,7 +38,7 @@ func TestResume(t *testing.T) {
|
|||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||||
s1.Start(ctx)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
@ -55,7 +55,7 @@ func TestResume(t *testing.T) {
|
|||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||||
s2.Start(ctx)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
|||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||||
s1.Start(ctx)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
|
|||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||||
s2.Start(ctx)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
|||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||||
s1.Start(ctx)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
|
|||||||
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||||
s2.Start(ctx)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/status-im/go-waku/waku/v2/metrics"
|
"github.com/status-im/go-waku/waku/v2/metrics"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol/swap"
|
||||||
"github.com/status-im/go-waku/waku/v2/utils"
|
"github.com/status-im/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -235,13 +236,15 @@ type WakuStore struct {
|
|||||||
messageQueue *MessageQueue
|
messageQueue *MessageQueue
|
||||||
msgProvider MessageProvider
|
msgProvider MessageProvider
|
||||||
h host.Host
|
h host.Host
|
||||||
|
swap *swap.WakuSwap
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
|
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
|
||||||
func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
|
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore {
|
||||||
wakuStore := new(WakuStore)
|
wakuStore := new(WakuStore)
|
||||||
wakuStore.msgProvider = p
|
wakuStore.msgProvider = p
|
||||||
wakuStore.h = host
|
wakuStore.h = host
|
||||||
|
wakuStore.swap = swap
|
||||||
wakuStore.wg = &sync.WaitGroup{}
|
wakuStore.wg = &sync.WaitGroup{}
|
||||||
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
|
wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration)
|
||||||
return wakuStore
|
return wakuStore
|
||||||
|
@ -24,7 +24,7 @@ func TestStorePersistence(t *testing.T) {
|
|||||||
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(nil, dbStore, 0, 0)
|
s1 := NewWakuStore(nil, nil, dbStore, 0, 0)
|
||||||
s1.fetchDBRecords(ctx)
|
s1.fetchDBRecords(ctx)
|
||||||
require.Len(t, s1.messageQueue.messages, 0)
|
require.Len(t, s1.messageQueue.messages, 0)
|
||||||
|
|
||||||
@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) {
|
|||||||
|
|
||||||
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
|
||||||
|
|
||||||
s2 := NewWakuStore(nil, dbStore, 0, 0)
|
s2 := NewWakuStore(nil, nil, dbStore, 0, 0)
|
||||||
s2.fetchDBRecords(ctx)
|
s2.fetchDBRecords(ctx)
|
||||||
require.Len(t, s2.messageQueue.messages, 1)
|
require.Len(t, s2.messageQueue.messages, 1)
|
||||||
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
|
require.Equal(t, msg, s2.messageQueue.messages[0].msg)
|
||||||
|
@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
|||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||||
s1.Start(ctx)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
|
|||||||
// Simulate a message has been received via relay protocol
|
// Simulate a message has been received via relay protocol
|
||||||
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
|
||||||
|
|
||||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||||
s2.Start(ctx)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
|||||||
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s1 := NewWakuStore(host1, nil, 0, 0)
|
s1 := NewWakuStore(host1, nil, nil, 0, 0)
|
||||||
s1.Start(ctx)
|
s1.Start(ctx)
|
||||||
defer s1.Stop()
|
defer s1.Stop()
|
||||||
|
|
||||||
@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
|
|||||||
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
|
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
s2 := NewWakuStore(host2, nil, 0, 0)
|
s2 := NewWakuStore(host2, nil, nil, 0, 0)
|
||||||
s2.Start(ctx)
|
s2.Start(ctx)
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) {
|
|||||||
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
|
||||||
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
|
|||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
|
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
|
||||||
@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
|
|||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||||
@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
|
|||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
|
||||||
@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
|
|||||||
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
|
||||||
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
|
||||||
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
|
||||||
@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
|
|||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
pubsubTopic1 := "topic1"
|
pubsubTopic1 := "topic1"
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
|
||||||
msg.Payload = []byte{byte(i)}
|
msg.Payload = []byte{byte(i)}
|
||||||
@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
|||||||
topic1 := "1"
|
topic1 := "1"
|
||||||
pubsubTopic1 := "topic1"
|
pubsubTopic1 := "topic1"
|
||||||
|
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
msg := &pb.WakuMessage{
|
msg := &pb.WakuMessage{
|
||||||
Payload: []byte{byte(i)},
|
Payload: []byte{byte(i)},
|
||||||
@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestTemporalHistoryQueries(t *testing.T) {
|
func TestTemporalHistoryQueries(t *testing.T) {
|
||||||
s := NewWakuStore(nil, nil, 0, 0)
|
s := NewWakuStore(nil, nil, nil, 0, 0)
|
||||||
|
|
||||||
var messages []*pb.WakuMessage
|
var messages []*pb.WakuMessage
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user