Create pluggable store (#210)

* Add store factory
* Add to test
This commit is contained in:
Nicholas Molnar 2022-03-18 12:56:34 -07:00 committed by GitHub
parent af6f36ec54
commit 21b2e1d97c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 47 additions and 4 deletions

View File

@ -44,6 +44,8 @@ type Peer struct {
Connected bool
}
type storeFactory func(w *WakuNode) store.Store
type WakuNode struct {
host host.Host
opts *WakuNodeParameters
@ -53,7 +55,7 @@ type WakuNode struct {
filter *filter.WakuFilter
lightPush *lightpush.WakuLightPush
rendezvous *rendezvous.RendezvousService
store *store.WakuStore
store store.Store
swap *swap.WakuSwap
wakuFlag utils.WakuEnrBitfield
@ -79,6 +81,12 @@ type WakuNode struct {
// Channel passed to WakuNode constructor
// receiving connection status notifications
connStatusChan chan ConnStatus
storeFactory storeFactory
}
func defaultStoreFactory(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
}
func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
@ -136,6 +144,12 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
w.keepAliveFails = make(map[peer.ID]int)
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
if params.storeFactory != nil {
w.storeFactory = params.storeFactory
} else {
w.storeFactory = defaultStoreFactory
}
if w.protocolEventSub, err = host.EventBus().Subscribe(new(event.EvtPeerProtocolsUpdated)); err != nil {
return nil, err
}
@ -247,7 +261,7 @@ func (w *WakuNode) Start() error {
swap.WithThreshold(w.opts.swapPaymentThreshold, w.opts.swapDisconnectThreshold),
}...)
w.store = store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
w.store = w.storeFactory(w)
if w.opts.enableStore {
w.startStore()
}
@ -298,7 +312,7 @@ func (w *WakuNode) Start() error {
// Subscribe store to topic
if w.opts.storeMsgs {
w.log.Info("Subscribing store to broadcaster")
w.bcaster.Register(w.store.MsgC)
w.bcaster.Register(w.store.MessageChannel())
}
if w.filter != nil {
@ -360,7 +374,7 @@ func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
}
func (w *WakuNode) Store() *store.WakuStore {
func (w *WakuNode) Store() store.Store {
return w.store
}

View File

@ -74,6 +74,8 @@ type WakuNodeParameters struct {
enableLightPush bool
connStatusC chan ConnStatus
storeFactory storeFactory
}
type WakuNodeOption func(*WakuNodeParameters) error
@ -245,6 +247,14 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
}
}
func WithWakuStoreFactory(factory storeFactory) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.storeFactory = factory
return nil
}
}
// WithWakuSwap set the option of the Waku V2 Swap protocol
func WithWakuSwap(mode int, disconnectThreshold, paymentThreshold int) WakuNodeOption {
return func(params *WakuNodeParameters) error {

View File

@ -9,6 +9,7 @@ import (
"github.com/multiformats/go-multiaddr"
rendezvous "github.com/status-im/go-waku-rendezvous"
"github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/stretchr/testify/require"
)
@ -28,6 +29,10 @@ func TestWakuOptions(t *testing.T) {
advertiseAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
storeFactory := func(w *WakuNode) store.Store {
return store.NewWakuStore(w.host, w.swap, w.opts.messageProvider, w.opts.maxMessages, w.opts.maxDuration, w.log)
}
options := []WakuNodeOption{
WithHostAddress(hostAddr),
WithAdvertiseAddress(advertiseAddr, false, 4000),
@ -45,6 +50,7 @@ func TestWakuOptions(t *testing.T) {
WithLightPush(),
WithKeepAlive(time.Hour),
WithConnectionStatusChannel(connStatusChan),
WithWakuStoreFactory(storeFactory),
}
params := new(WakuNodeParameters)

View File

@ -240,6 +240,15 @@ type WakuStore struct {
swap *swap.WakuSwap
}
type Store interface {
Start(ctx context.Context)
Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error)
Next(ctx context.Context, r *Result) (*Result, error)
Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error)
MessageChannel() chan *protocol.Envelope
Stop()
}
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(host host.Host, swap *swap.WakuSwap, p MessageProvider, maxNumberOfMessages int, maxRetentionDuration time.Duration, log *zap.SugaredLogger) *WakuStore {
wakuStore := new(WakuStore)
@ -775,6 +784,10 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
return msgCount, nil
}
func (store *WakuStore) MessageChannel() chan *protocol.Envelope {
return store.MsgC
}
// TODO: queryWithAccounting
// Stop closes the store message channel and removes the protocol stream handler