From 9bb957afebcbfc87bc51581b6bee463449871fad Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Mon, 6 Dec 2021 11:49:13 +0100 Subject: [PATCH] feat: build swap and attach it to the store --- waku/options.go | 8 ++++++++ waku/v2/node/wakunode2.go | 9 ++++++++- waku/v2/node/wakuoptions.go | 14 ++++++++++++++ waku/v2/protocol/store/waku_resume_test.go | 14 +++++++------- waku/v2/protocol/store/waku_store.go | 5 ++++- .../store/waku_store_persistence_test.go | 4 ++-- .../protocol/store/waku_store_protocol_test.go | 8 ++++---- waku/v2/protocol/store/waku_store_query_test.go | 16 ++++++++-------- 8 files changed, 55 insertions(+), 23 deletions(-) diff --git a/waku/options.go b/waku/options.go index 45811c28..ff226546 100644 --- a/waku/options.go +++ b/waku/options.go @@ -53,6 +53,13 @@ type StoreOptions struct { Nodes []string `long:"store-node" description:"Multiaddr of a peer that supports store protocol. Option may be repeated"` } +// SwapOptions are settings used for configuring the swap protocol +type SwapOptions struct { + Mode int `long:"swap-mode" description:"Swap mode: 0=soft, 1=mock, 2=hard" default:"0"` + PaymentThreshold int `long:"swap-payment-threshold" description:"Threshold for payment" default:"100"` + DisconnectThreshold int `long:"swap-disconnect-threshold" description:"Threshold for disconnecting" default:"-100"` +} + func (s *StoreOptions) RetentionMaxDaysDuration() time.Duration { return time.Duration(s.RetentionMaxDays) * time.Hour * 24 } @@ -103,6 +110,7 @@ type Options struct { Relay RelayOptions `group:"Relay Options"` Store StoreOptions `group:"Store Options"` + Swap SwapOptions `group:"Swap Options"` Filter FilterOptions `group:"Filter Options"` LightPush LightpushOptions `group:"LightPush Options"` DiscV5 DiscV5Options `group:"DiscoveryV5 Options"` diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 8b9e274f..132b956b 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -30,6 +30,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/lightpush" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/status-im/go-waku/waku/v2/protocol/swap" "github.com/status-im/go-waku/waku/v2/utils" ) @@ -55,6 +56,7 @@ type WakuNode struct { lightPush *lightpush.WakuLightPush rendezvous *rendezvous.RendezvousService store *store.WakuStore + swap *swap.WakuSwap wakuFlag utils.WakuEnrBitfield addrChan chan ma.Multiaddr @@ -241,7 +243,12 @@ func (w *WakuNode) checkForAddressChanges() { } func (w *WakuNode) Start() error { - w.store = store.NewWakuStore(w.host, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) + w.swap = swap.NewWakuSwap([]swap.SwapOption{ + swap.WithMode(w.opts.swapMode), + swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold), + }...) + + w.store = store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration) if w.opts.enableStore { w.startStore() } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 06165237..5956997f 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -43,6 +43,10 @@ type WakuNodeParameters struct { maxMessages int maxDuration time.Duration + swapMode int + swapDisconnectThreshold int + swapPaymentThreshold int + enableRendezvous bool enableRendezvousServer bool rendevousStorage rendezvous.Storage @@ -213,6 +217,16 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { } } +// WithWakuSwap set the option of the Waku V2 Swap protocol +func WithWakuSwap(mode int, disconnectThreshold, paymentThreshold int) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.swapMode = mode + params.swapDisconnectThreshold = disconnectThreshold + params.swapPaymentThreshold = paymentThreshold + return nil + } +} + // WithWakuStoreAndRetentionPolicy enables the Waku V2 Store protocol, storing them in an optional message provider // applying an specific retention policy func WithWakuStoreAndRetentionPolicy(shouldResume bool, maxDuration time.Duration, maxMessages int) WakuNodeOption { diff --git a/waku/v2/protocol/store/waku_resume_test.go b/waku/v2/protocol/store/waku_resume_test.go index cf13ef3d..65b3e733 100644 --- a/waku/v2/protocol/store/waku_resume_test.go +++ b/waku/v2/protocol/store/waku_resume_test.go @@ -21,7 +21,7 @@ func TestFindLastSeenMessage(t *testing.T) { msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test") msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test") - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) s.storeMessage(msg1) s.storeMessage(msg3) s.storeMessage(msg5) @@ -38,7 +38,7 @@ func TestResume(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -55,7 +55,7 @@ func TestResume(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0) s2.Start(ctx) defer s2.Stop() @@ -87,7 +87,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -98,7 +98,7 @@ func TestResumeWithListOfPeers(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0) s2.Start(ctx) defer s2.Stop() @@ -120,7 +120,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -131,7 +131,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) { host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index b1048f28..bef39de6 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -22,6 +22,7 @@ import ( "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/swap" "github.com/status-im/go-waku/waku/v2/utils" ) @@ -235,13 +236,15 @@ type WakuStore struct { messageQueue *MessageQueue msgProvider MessageProvider h host.Host + swap *swap.WakuSwap } // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages -func NewWakuStore(host host.Host, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { +func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration) *WakuStore { wakuStore := new(WakuStore) wakuStore.msgProvider = p wakuStore.h = host + wakuStore.swap = swap wakuStore.wg = &sync.WaitGroup{} wakuStore.messageQueue = NewMessageQueue(maxNumberOfMessages, maxRetentionDuration) return wakuStore diff --git a/waku/v2/protocol/store/waku_store_persistence_test.go b/waku/v2/protocol/store/waku_store_persistence_test.go index f239c118..4d484d35 100644 --- a/waku/v2/protocol/store/waku_store_persistence_test.go +++ b/waku/v2/protocol/store/waku_store_persistence_test.go @@ -24,7 +24,7 @@ func TestStorePersistence(t *testing.T) { dbStore, err := persistence.NewDBStore(persistence.WithDB(db)) require.NoError(t, err) - s1 := NewWakuStore(nil, dbStore, 0, 0) + s1 := NewWakuStore(nil, nil, dbStore, 0, 0) s1.fetchDBRecords(ctx) require.Len(t, s1.messageQueue.messages, 0) @@ -39,7 +39,7 @@ func TestStorePersistence(t *testing.T) { s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic)) - s2 := NewWakuStore(nil, dbStore, 0, 0) + s2 := NewWakuStore(nil, nil, dbStore, 0, 0) s2.fetchDBRecords(ctx) require.Len(t, s2.messageQueue.messages, 1) require.Equal(t, msg, s2.messageQueue.messages[0].msg) diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index 50bc943a..65dbf085 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) { // Simulate a message has been received via relay protocol s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1) - s2 := NewWakuStore(host2, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0) s2.Start(ctx) defer s2.Stop() @@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) require.NoError(t, err) - s1 := NewWakuStore(host1, nil, 0, 0) + s1 := NewWakuStore(host1, nil, nil, 0, 0) s1.Start(ctx) defer s1.Stop() @@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) { err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3)) require.NoError(t, err) - s2 := NewWakuStore(host2, nil, 0, 0) + s2 := NewWakuStore(host2, nil, nil, 0, 0) s2.Start(ctx) defer s2.Stop() diff --git a/waku/v2/protocol/store/waku_store_query_test.go b/waku/v2/protocol/store/waku_store_query_test.go index 0f39e25b..f4411c74 100644 --- a/waku/v2/protocol/store/waku_store_query_test.go +++ b/waku/v2/protocol/store/waku_store_query_test.go @@ -17,7 +17,7 @@ func TestStoreQuery(t *testing.T) { msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -43,7 +43,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic)) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic)) @@ -77,7 +77,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -109,7 +109,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2)) @@ -131,7 +131,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) { msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1)) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1)) @@ -150,7 +150,7 @@ func TestStoreQueryForwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) for i := 0; i < 10; i++ { msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg.Payload = []byte{byte(i)} @@ -174,7 +174,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { topic1 := "1" pubsubTopic1 := "topic1" - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) for i := 0; i < 10; i++ { msg := &pb.WakuMessage{ Payload: []byte{byte(i)}, @@ -200,7 +200,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) { } func TestTemporalHistoryQueries(t *testing.T) { - s := NewWakuStore(nil, nil, 0, 0) + s := NewWakuStore(nil, nil, nil, 0, 0) var messages []*pb.WakuMessage for i := 0; i < 10; i++ {