diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 8ce40e80..0c450af9 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -44,6 +44,8 @@ type Peer struct { Connected bool } +type storeFactory func(w *WakuNode) store.Store + type WakuNode struct { host host.Host opts *WakuNodeParameters @@ -53,7 +55,7 @@ type WakuNode struct { filter *filter.WakuFilter lightPush *lightpush.WakuLightPush rendezvous *rendezvous.RendezvousService - store *store.WakuStore + store store.Store swap *swap.WakuSwap wakuFlag utils.WakuEnrBitfield @@ -79,6 +81,12 @@ type WakuNode struct { // Channel passed to WakuNode constructor // receiving connection status notifications connStatusChan chan ConnStatus + + storeFactory storeFactory +} + +func defaultStoreFactory(w *WakuNode) store.Store { + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) } func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { @@ -136,6 +144,12 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.keepAliveFails = make(map[peer.ID]int) w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay) + if params.storeFactory != nil { + w.storeFactory = params.storeFactory + } else { + w.storeFactory = defaultStoreFactory + } + if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil { return nil, err } @@ -247,7 +261,7 @@ func (w *WakuNode) Start() error { swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold), }...) - w.store = store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) + w.store = w.storeFactory(w) if w.opts.enableStore { w.startStore() } @@ -298,7 +312,7 @@ func (w *WakuNode) Start() error { // Subscribe store to topic if w.opts.storeMsgs { w.log.Info("Subscribing store to broadcaster") - w.bcaster.Register(w.store.MsgC) + w.bcaster.Register(w.store.MessageChannel()) } if w.filter != nil { @@ -360,7 +374,7 @@ func (w *WakuNode) Relay() *relay.WakuRelay { return w.relay } -func (w *WakuNode) Store() *store.WakuStore { +func (w *WakuNode) Store() store.Store { return w.store } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 0615c281..3197265e 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -74,6 +74,8 @@ type WakuNodeParameters struct { enableLightPush bool connStatusC chan ConnStatus + + storeFactory storeFactory } type WakuNodeOption func(*WakuNodeParameters) error @@ -245,6 +247,14 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption { } } +func WithWakuStoreFactory(factory storeFactory) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.storeFactory = factory + + return nil + } +} + // WithWakuSwap set the option of the Waku V2 Swap protocol func WithWakuSwap(mode int, disconnectThreshold, paymentThreshold int) WakuNodeOption { return func(params *WakuNodeParameters) error { diff --git a/waku/v2/node/wakuoptions_test.go b/waku/v2/node/wakuoptions_test.go index 3f678cab..502cb0f2 100644 --- a/waku/v2/node/wakuoptions_test.go +++ b/waku/v2/node/wakuoptions_test.go @@ -9,6 +9,7 @@ import ( "github.com/multiformats/go-multiaddr" rendezvous "github.com/status-im/go-waku-rendezvous" "github.com/status-im/go-waku/tests" + "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/stretchr/testify/require" ) @@ -28,6 +29,10 @@ func TestWakuOptions(t *testing.T) { advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") + storeFactory := func(w *WakuNode) store.Store { + return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log) + } + options := []WakuNodeOption{ WithHostAddress(hostAddr), WithAdvertiseAddress(advertiseAddr, false, 4000), @@ -45,6 +50,7 @@ func TestWakuOptions(t *testing.T) { WithLightPush(), WithKeepAlive(time.Hour), WithConnectionStatusChannel(connStatusChan), + WithWakuStoreFactory(storeFactory), } params := new(WakuNodeParameters) diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index 9d245baa..9215fed5 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -240,6 +240,15 @@ type WakuStore struct { swap *swap.WakuSwap } +type Store interface { + Start(ctx context.Context) + Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) + Next(ctx context.Context, r *Result) (*Result, error) + Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) + MessageChannel() chan *protocol.Envelope + Stop() +} + // NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.SugaredLogger) *WakuStore { wakuStore := new(WakuStore) @@ -775,6 +784,10 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList return msgCount, nil } +func (store *WakuStore) MessageChannel() chan *protocol.Envelope { + return store.MsgC +} + // TODO: queryWithAccounting // Stop closes the store message channel and removes the protocol stream handler