refactor: store and lightpush (#118)

This commit is contained in:
Richard Ramos 2021-11-01 08:38:03 -04:00 committed by GitHub
parent 98255060f3
commit c0ba800af7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 149 additions and 134 deletions

View File

@ -205,7 +205,7 @@ func main() {
q := store.Query{
ContentTopics: []string{*contentTopicFlag},
}
response, err := wakuNode.Query(tCtx, q,
response, err := wakuNode.Store().Query(tCtx, q,
store.WithAutomaticRequestId(),
store.WithPeer(*storeNodeId),
store.WithPaging(true, 0))

View File

@ -138,6 +138,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
}
func (w *WakuNode) Start() error {
w.store = store.NewWakuStore(w.opts.messageProvider)
if w.opts.enableStore {
w.startStore()
}
@ -162,8 +163,11 @@ func (w *WakuNode) Start() error {
}
}
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
if w.opts.enableLightPush {
w.mountLightPush()
if err := w.lightPush.Start(); err != nil {
return err
}
}
if w.opts.enableRendezvousServer {
@ -208,13 +212,8 @@ func (w *WakuNode) Stop() {
w.filters = nil
}
if w.lightPush != nil {
w.lightPush.Stop()
}
if w.store != nil {
w.store.Stop()
}
w.lightPush.Stop()
w.store.Stop()
w.host.Close()
}
@ -240,10 +239,18 @@ func (w *WakuNode) Relay() *relay.WakuRelay {
return w.relay
}
func (w *WakuNode) Store() *store.WakuStore {
return w.store
}
func (w *WakuNode) Filter() *filter.WakuFilter {
return w.filter
}
func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
return w.lightPush
}
func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...)
@ -273,10 +280,6 @@ func (w *WakuNode) mountFilter() error {
return nil
}
func (w *WakuNode) mountLightPush() {
w.lightPush = lightpush.NewWakuLightPush(w.ctx, w.host, w.relay)
}
func (w *WakuNode) mountRendezvous() error {
w.rendezvous = rendezvous.NewRendezvousService(w.host, w.opts.rendevousStorage)
@ -289,7 +292,6 @@ func (w *WakuNode) mountRendezvous() error {
}
func (w *WakuNode) startStore() {
w.store = w.opts.store
w.store.Start(w.ctx, w.host)
if w.opts.shouldResume {
@ -313,7 +315,7 @@ func (w *WakuNode) startStore() {
ctxWithTimeout, ctxCancel := context.WithTimeout(w.ctx, 20*time.Second)
defer ctxCancel()
if err := w.Resume(ctxWithTimeout, nil); err != nil {
if _, err := w.store.Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), nil); err != nil {
log.Info("Retrying in 10s...")
time.Sleep(10 * time.Second)
} else {
@ -344,37 +346,6 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.
return &info.ID, w.addPeer(info, protocolID)
}
func (w *WakuNode) Query(ctx context.Context, query store.Query, opts ...store.HistoryRequestOption) (*store.Result, error) {
if w.store == nil {
return nil, errors.New("WakuStore is not set")
}
return w.store.Query(ctx, query, opts...)
}
func (w *WakuNode) Next(ctx context.Context, result *store.Result) (*store.Result, error) {
if w.store == nil {
return nil, errors.New("WakuStore is not set")
}
return w.store.Next(ctx, result)
}
func (w *WakuNode) Resume(ctx context.Context, peerList []peer.ID) error {
if w.store == nil {
return errors.New("WakuStore is not set")
}
result, err := w.store.Resume(ctx, string(relay.DefaultWakuTopic), peerList)
if err != nil {
return err
}
log.Info("Retrieved messages since the last online time: ", result)
return nil
}
func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subscription, error) {
// Subscribes to a PubSub topic.
// NOTE The data field SHOULD be decoded as a WakuMessage.
@ -386,9 +357,9 @@ func (node *WakuNode) Subscribe(ctx context.Context, topic *relay.Topic) (*Subsc
sub, isNew, err := node.relay.Subscribe(t)
// Subscribe store to topic
if isNew && node.opts.store != nil && node.opts.storeMsgs {
if isNew && node.opts.storeMsgs {
log.Info("Subscribing store to topic ", t)
node.bcaster.Register(node.opts.store.MsgC)
node.bcaster.Register(node.store.MsgC)
}
// Subscribe filter
@ -574,10 +545,6 @@ func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topi
return nil, errors.New("message can't be null")
}
if node.lightPush != nil {
return node.LightPush(ctx, message, topic)
}
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set")
}
@ -589,32 +556,6 @@ func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topi
return hash, nil
}
func (node *WakuNode) LightPush(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...lightpush.LightPushOption) ([]byte, error) {
if node.lightPush == nil {
return nil, errors.New("WakuLightPush hasn't been set")
}
if message == nil {
return nil, errors.New("message can't be null")
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic))
response, err := node.lightPush.Request(ctx, req, opts...)
if err != nil {
return nil, err
}
if response.IsSuccess {
hash, _ := message.Hash()
return hash, nil
} else {
return nil, errors.New(response.Info)
}
}
func (w *WakuNode) DialPeerWithMultiAddress(ctx context.Context, address ma.Multiaddr) error {
info, err := peer.AddrInfoFromP2pAddr(address)
if err != nil {

View File

@ -33,10 +33,10 @@ type WakuNodeParameters struct {
isFilterFullNode bool
wOpts []pubsub.Option
enableStore bool
shouldResume bool
storeMsgs bool
store *store.WakuStore
enableStore bool
shouldResume bool
storeMsgs bool
messageProvider store.MessageProvider
enableRendezvous bool
enableRendezvousServer bool
@ -166,7 +166,6 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableStore = true
params.storeMsgs = shouldStoreMessages
params.store = store.NewWakuStore(shouldStoreMessages, nil)
params.shouldResume = shouldResume
return nil
}
@ -176,11 +175,7 @@ func WithWakuStore(shouldStoreMessages bool, shouldResume bool) WakuNodeOption {
// used to store and retrieve persisted messages
func WithMessageProvider(s store.MessageProvider) WakuNodeOption {
return func(params *WakuNodeParameters) error {
if params.store != nil {
params.store.SetMsgProvider(s)
} else {
params.store = store.NewWakuStore(true, s)
}
params.messageProvider = s
return nil
}
}

View File

@ -40,14 +40,18 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay)
wakuLP.ctx = ctx
wakuLP.h = h
if relay != nil {
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
log.Info("Light Push protocol started")
} else {
log.Info("Light Push protocol started (only client mode)")
return wakuLP
}
func (wakuLP *WakuLightPush) Start() error {
if wakuLP.relay == nil {
return errors.New("relay is required")
}
return wakuLP
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
log.Info("Light Push protocol started")
return nil
}
func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
@ -74,7 +78,9 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
response := new(pb.PushResponse)
if wakuLP.relay != nil {
// XXX Assumes success, should probably be extended to check for network, peers, etc
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
_, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic)
if err != nil {
@ -157,7 +163,7 @@ func DefaultOptions() []LightPushOption {
}
}
func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...LightPushOption) (*pb.PushResponse, error) {
params := new(LightPushParameters)
params.lp = wakuLP
@ -217,3 +223,25 @@ func (wakuLP *WakuLightPush) Request(ctx context.Context, req *pb.PushRequest, o
func (w *WakuLightPush) Stop() {
w.h.RemoveStreamHandler(LightPushID_v20beta1)
}
func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
req := new(pb.PushRequest)
req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic))
response, err := w.request(ctx, req, opts...)
if err != nil {
return nil, err
}
if response.IsSuccess {
hash, _ := message.Hash()
return hash, nil
} else {
return nil, errors.New(response.Info)
}
}

View File

@ -11,6 +11,7 @@ import (
"github.com/libp2p/go-libp2p-core/peerstore"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/relay"
"github.com/stretchr/testify/require"
@ -55,6 +56,8 @@ func TestWakuLightPush(t *testing.T) {
ctx := context.Background()
lightPushNode2 := NewWakuLightPush(ctx, host2, node2)
err := lightPushNode2.Start()
require.NoError(t, err)
defer lightPushNode2.Stop()
port, err := tests.FindFreePort(t, "", 5)
@ -75,13 +78,11 @@ func TestWakuLightPush(t *testing.T) {
err = clientHost.Peerstore().AddProtocols(host2.ID(), string(LightPushID_v20beta1))
require.NoError(t, err)
msg1 := tests.CreateWakuMessage("test1", float64(0))
msg2 := tests.CreateWakuMessage("test2", float64(1))
req := new(pb.PushRequest)
req.Message = &pb.WakuMessage{
Payload: []byte{1},
Version: 0,
ContentTopic: "test",
Timestamp: 0,
}
req.Message = msg1
req.PubsubTopic = string(testTopic)
// Wait for the mesh connection to happen between node1 and node2
@ -93,6 +94,9 @@ func TestWakuLightPush(t *testing.T) {
defer wg.Done()
_, err := sub1.Next(context.Background())
require.NoError(t, err)
_, err = sub1.Next(context.Background())
require.NoError(t, err)
}()
wg.Add(1)
@ -100,11 +104,46 @@ func TestWakuLightPush(t *testing.T) {
defer wg.Done()
_, err := sub2.Next(context.Background())
require.NoError(t, err)
_, err = sub2.Next(context.Background())
require.NoError(t, err)
}()
resp, err := client.Request(ctx, req, []LightPushOption{}...)
// Verifying succesful request
resp, err := client.request(ctx, req)
require.NoError(t, err)
require.True(t, resp.IsSuccess)
// Checking that msg hash is correct
hash, err := client.Publish(ctx, msg2, &testTopic)
require.NoError(t, err)
require.Equal(t, protocol.NewEnvelope(msg2, string(testTopic)).Hash(), hash)
wg.Wait()
}
func TestWakuLightPushStartWithoutRelay(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil)
err = client.Start()
require.Errorf(t, err, "relay is required")
}
func TestWakuLightPushNoPeers(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
testTopic := relay.Topic("abc")
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(ctx, clientHost, nil)
_, err = client.Publish(ctx, tests.CreateWakuMessage("test", float64(0)), &testTopic)
require.Errorf(t, err, "no suitable remote peers")
}

View File

@ -20,7 +20,7 @@ func TestFindLastSeenMessage(t *testing.T) {
msg4 := tests.CreateWakuMessage("4", 4)
msg5 := tests.CreateWakuMessage("5", 5)
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
s.storeMessage("test", msg1)
s.storeMessage("test", msg3)
s.storeMessage("test", msg5)
@ -37,7 +37,7 @@ func TestResume(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1 := NewWakuStore(nil)
s1.Start(ctx, host1)
defer s1.Stop()
@ -54,7 +54,7 @@ func TestResume(t *testing.T) {
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2 := NewWakuStore(nil)
s2.Start(ctx, host2)
defer s2.Stop()
@ -86,7 +86,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1 := NewWakuStore(nil)
s1.Start(ctx, host1)
defer s1.Stop()
@ -97,7 +97,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2 := NewWakuStore(nil)
s2.Start(ctx, host2)
defer s2.Stop()
@ -119,7 +119,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1 := NewWakuStore(nil)
s1.Start(ctx, host1)
defer s1.Stop()
@ -130,7 +130,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2 := NewWakuStore(nil)
s2.Start(ctx, host2)
defer s2.Stop()

View File

@ -227,18 +227,17 @@ type WakuStore struct {
messages []IndexedWakuMessage
seen map[[32]byte]struct{}
started bool
messagesMutex sync.Mutex
storeMsgs bool
msgProvider MessageProvider
h host.Host
}
func NewWakuStore(shouldStoreMessages bool, p MessageProvider) *WakuStore {
func NewWakuStore(p MessageProvider) *WakuStore {
wakuStore := new(WakuStore)
wakuStore.MsgC = make(chan *protocol.Envelope)
wakuStore.msgProvider = p
wakuStore.storeMsgs = shouldStoreMessages
wakuStore.seen = make(map[[32]byte]struct{})
return wakuStore
@ -249,14 +248,15 @@ func (store *WakuStore) SetMsgProvider(p MessageProvider) {
}
func (store *WakuStore) Start(ctx context.Context, h host.Host) {
store.h = h
store.ctx = ctx
if !store.storeMsgs {
log.Info("Store protocol started (messages aren't stored)")
if store.started {
return
}
store.started = true
store.h = h
store.ctx = ctx
store.MsgC = make(chan *protocol.Envelope)
store.h.SetStreamHandlerMatch(StoreID_v20beta3, protocol.PrefixTextMatch(string(StoreID_v20beta3)), store.onRequest)
go store.storeIncomingMessages(ctx)
@ -272,7 +272,11 @@ func (store *WakuStore) Start(ctx context.Context, h host.Host) {
}
func (store *WakuStore) fetchDBRecords(ctx context.Context) {
storedMessages, err := store.msgProvider.GetAll()
if store.msgProvider == nil {
return
}
storedMessages, err := (store.msgProvider).GetAll()
if err != nil {
log.Error("could not load DBProvider messages", err)
metrics.RecordStoreError(ctx, "store_load_failure")
@ -647,6 +651,10 @@ func (store *WakuStore) findLastSeen() float64 {
// the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) {
if !store.started {
return 0, errors.New("can't resume: store has not started")
}
currentTime := utils.GetUnixEpoch()
lastSeenTime := store.findLastSeen()
@ -690,11 +698,15 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
store.storeMessage(pubsubTopic, msg)
}
log.Info("Retrieved messages since the last online time: ", len(response.Messages))
return len(response.Messages), nil
}
// TODO: queryWithAccounting
func (w *WakuStore) Stop() {
w.started = false
close(w.MsgC)
w.h.RemoveStreamHandler(StoreID_v20beta3)
}

View File

@ -23,7 +23,7 @@ func TestStorePersistence(t *testing.T) {
dbStore, err := persistence.NewDBStore(persistence.WithDB(db))
require.NoError(t, err)
s1 := NewWakuStore(true, dbStore)
s1 := NewWakuStore(dbStore)
s1.fetchDBRecords(ctx)
require.Len(t, s1.messages, 0)
@ -38,7 +38,7 @@ func TestStorePersistence(t *testing.T) {
s1.storeMessage(defaultPubSubTopic, msg)
s2 := NewWakuStore(true, dbStore)
s2 := NewWakuStore(dbStore)
s2.fetchDBRecords(ctx)
require.Len(t, s2.messages, 1)
require.Equal(t, msg, s2.messages[0].msg)

View File

@ -20,7 +20,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1 := NewWakuStore(nil)
s1.Start(ctx, host1)
defer s1.Stop()
@ -39,7 +39,7 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, pubsubTopic1)
s2 := NewWakuStore(false, nil)
s2 := NewWakuStore(nil)
s2.Start(ctx, host2)
defer s2.Stop()
@ -66,7 +66,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
host1, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(true, nil)
s1 := NewWakuStore(nil)
s1.Start(ctx, host1)
defer s1.Stop()
@ -92,7 +92,7 @@ func TestWakuStoreProtocolNext(t *testing.T) {
err = host2.Peerstore().AddProtocols(host1.ID(), string(StoreID_v20beta3))
require.NoError(t, err)
s2 := NewWakuStore(false, nil)
s2 := NewWakuStore(nil)
s2.Start(ctx, host2)
defer s2.Stop()

View File

@ -16,7 +16,7 @@ func TestStoreQuery(t *testing.T) {
msg1 := tests.CreateWakuMessage(defaultContentTopic, utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
s.storeMessage(defaultPubSubTopic, msg1)
s.storeMessage(defaultPubSubTopic, msg2)
@ -42,7 +42,7 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
s.storeMessage(defaultPubSubTopic, msg1)
s.storeMessage(defaultPubSubTopic, msg2)
@ -76,7 +76,7 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
s.storeMessage(pubsubTopic1, msg1)
s.storeMessage(pubsubTopic2, msg2)
s.storeMessage(pubsubTopic2, msg3)
@ -108,7 +108,7 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
s.storeMessage(pubsubTopic2, msg1)
s.storeMessage(pubsubTopic2, msg2)
s.storeMessage(pubsubTopic2, msg3)
@ -130,7 +130,7 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg2 := tests.CreateWakuMessage(topic2, utils.GetUnixEpoch())
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
s.storeMessage(pubsubTopic1, msg1)
s.storeMessage(pubsubTopic1, msg2)
s.storeMessage(pubsubTopic1, msg3)
@ -149,7 +149,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)}
@ -173,7 +173,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
topic1 := "1"
pubsubTopic1 := "topic1"
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
for i := 0; i < 10; i++ {
msg := &pb.WakuMessage{
Payload: []byte{byte(i)},
@ -199,7 +199,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
}
func TestTemporalHistoryQueries(t *testing.T) {
s := NewWakuStore(true, nil)
s := NewWakuStore(nil)
var messages []*pb.WakuMessage
for i := 0; i < 10; i++ {