refactor: use envelopes in the store

Also do some linting and add documentation for some functions
This commit is contained in:
Richard Ramos 2021-11-05 16:09:48 -04:00
parent 0df1a21dba
commit c98769b7f2
14 changed files with 131 additions and 100 deletions

View File

@ -5,6 +5,8 @@ import (
"net" "net"
) )
// GetResolver returns a *net.Resolver object using a custom nameserver, or
// the default system resolver if no nameserver is specified
func GetResolver(ctx context.Context, nameserver string) *net.Resolver { func GetResolver(ctx context.Context, nameserver string) *net.Resolver {
if nameserver == "" { if nameserver == "" {
return net.DefaultResolver return net.DefaultResolver

View File

@ -45,7 +45,7 @@ func TestConnectionStatusChanges(t *testing.T) {
node1, err := New(ctx, node1, err := New(ctx,
WithHostAddress([]*net.TCPAddr{hostAddr1}), WithHostAddress([]*net.TCPAddr{hostAddr1}),
WithWakuRelay(), WithWakuRelay(),
WithConnStatusChan(connStatusChan), WithConnectionStatusChannel(connStatusChan),
) )
require.NoError(t, err) require.NoError(t, err)
err = node1.Start() err = node1.Start()

View File

@ -48,7 +48,7 @@ type KeyInfo struct {
} }
// Encodes a payload depending on the version parameter. // Encode encodes a payload depending on the version parameter.
// 0 for raw unencrypted data, and 1 for using WakuV1 encoding. // 0 for raw unencrypted data, and 1 for using WakuV1 encoding.
func (payload Payload) Encode(version uint32) ([]byte, error) { func (payload Payload) Encode(version uint32) ([]byte, error) {
switch version { switch version {
@ -105,7 +105,7 @@ func EncodeWakuMessage(message *pb.WakuMessage, keyInfo *KeyInfo) error {
return nil return nil
} }
// Decodes a WakuMessage depending on the version parameter. // DecodePayload decodes a WakuMessage depending on the version parameter.
// 0 for raw unencrypted data, and 1 for using WakuV1 decoding // 0 for raw unencrypted data, and 1 for using WakuV1 decoding
func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) { func DecodePayload(message *pb.WakuMessage, keyInfo *KeyInfo) (*DecodedPayload, error) {
switch message.Version { switch message.Version {

View File

@ -112,8 +112,8 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) {
return nil, err return nil, err
} }
if params.connStatusChan != nil { if params.connStatusC != nil {
w.connStatusChan = params.connStatusChan w.connStatusChan = params.connStatusC
} }
w.connectionNotif = NewConnectionNotifier(ctx, host) w.connectionNotif = NewConnectionNotifier(ctx, host)
@ -346,8 +346,8 @@ func (w *WakuNode) AddPeer(address ma.Multiaddr, protocolID p2pproto.ID) (*peer.
// Wrapper around WakuFilter.Subscribe // Wrapper around WakuFilter.Subscribe
// that adds a Filter object to node.filters // that adds a Filter object to node.filters
func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) { func (w *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) {
if node.filter == nil { if w.filter == nil {
err = errors.New("WakuFilter is not set") err = errors.New("WakuFilter is not set")
return return
} }
@ -357,7 +357,7 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilte
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received. // Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
// ContentFilterChan takes MessagePush structs // ContentFilterChan takes MessagePush structs
subs, err := node.filter.Subscribe(ctx, f) subs, err := w.filter.Subscribe(ctx, f)
if err != nil || subs.RequestID == "" { if err != nil || subs.RequestID == "" {
// Failed to subscribe // Failed to subscribe
log.Error("remote subscription to filter failed", err) log.Error("remote subscription to filter failed", err)
@ -367,7 +367,7 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilte
ch = make(chan *protocol.Envelope, 1024) // To avoid blocking ch = make(chan *protocol.Envelope, 1024) // To avoid blocking
// Register handler for filter, whether remote subscription succeeded or not // Register handler for filter, whether remote subscription succeeded or not
node.filters[subs.RequestID] = filter.Filter{ w.filters[subs.RequestID] = filter.Filter{
PeerID: subs.Peer, PeerID: subs.Peer,
Topic: f.Topic, Topic: f.Topic,
ContentFilters: f.ContentTopics, ContentFilters: f.ContentTopics,
@ -379,11 +379,11 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilte
// UnsubscribeFilterByID removes a subscription to a filter node completely // UnsubscribeFilterByID removes a subscription to a filter node completely
// using the filterID returned when the subscription was created // using the filterID returned when the subscription was created
func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string) error { func (w *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string) error {
var f filter.Filter var f filter.Filter
var ok bool var ok bool
if f, ok = node.filters[filterID]; !ok { if f, ok = w.filters[filterID]; !ok {
return errors.New("filter not found") return errors.New("filter not found")
} }
@ -392,29 +392,29 @@ func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string
ContentTopics: f.ContentFilters, ContentTopics: f.ContentFilters,
} }
err := node.filter.Unsubscribe(ctx, cf, f.PeerID) err := w.filter.Unsubscribe(ctx, cf, f.PeerID)
if err != nil { if err != nil {
return err return err
} }
close(f.Chan) close(f.Chan)
delete(node.filters, filterID) delete(w.filters, filterID)
return nil return nil
} }
// Unsubscribe filter removes content topics from a filter subscription. If all // Unsubscribe filter removes content topics from a filter subscription. If all
// the contentTopics are removed the subscription is dropped completely // the contentTopics are removed the subscription is dropped completely
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFilter) error { func (w *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFilter) error {
// Remove local filter // Remove local filter
var idsToRemove []string var idsToRemove []string
for id, f := range node.filters { for id, f := range w.filters {
if f.Topic != cf.Topic { if f.Topic != cf.Topic {
continue continue
} }
// Send message to full node in order to unsubscribe // Send message to full node in order to unsubscribe
err := node.filter.Unsubscribe(ctx, cf, f.PeerID) err := w.filter.Unsubscribe(ctx, cf, f.PeerID)
if err != nil { if err != nil {
return err return err
} }
@ -439,10 +439,10 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFi
} }
for _, rId := range idsToRemove { for _, rId := range idsToRemove {
for id := range node.filters { for id := range w.filters {
if id == rId { if id == rId {
close(node.filters[id].Chan) close(w.filters[id].Chan)
delete(node.filters, id) delete(w.filters, id)
break break
} }
} }

View File

@ -47,15 +47,17 @@ type WakuNodeParameters struct {
enableLightPush bool enableLightPush bool
connStatusChan chan ConnStatus connStatusC chan ConnStatus
} }
type WakuNodeOption func(*WakuNodeParameters) error type WakuNodeOption func(*WakuNodeParameters) error
// MultiAddresses return the list of multiaddresses configured in the node
func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr { func (w WakuNodeParameters) MultiAddresses() []ma.Multiaddr {
return w.multiAddr return w.multiAddr
} }
// Identity returns a libp2p option containing the identity used by the node
func (w WakuNodeParameters) Identity() config.Option { func (w WakuNodeParameters) Identity() config.Option {
return libp2p.Identity(*w.privKey) return libp2p.Identity(*w.privKey)
} }
@ -134,6 +136,8 @@ func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {
} }
} }
// WithRendezvous is a WakuOption used to enable go-waku-rendezvous discovery.
// It accepts an optional list of DiscoveryOpt options
func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption { func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableRendezvous = true params.enableRendezvous = true
@ -142,6 +146,8 @@ func WithRendezvous(discoverOpts ...pubsub.DiscoverOpt) WakuNodeOption {
} }
} }
// WithRendezvousServer is a WakuOption used to set the node as a rendezvous
// point, using an specific storage for the peer information
func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption { func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableRendezvousServer = true params.enableRendezvousServer = true
@ -188,6 +194,8 @@ func WithLightPush() WakuNodeOption {
} }
} }
// WithKeepAlive is a WakuNodeOption used to set the interval of time when
// each peer will be ping to keep the TCP connection alive
func WithKeepAlive(t time.Duration) WakuNodeOption { func WithKeepAlive(t time.Duration) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.keepAliveInterval = t params.keepAliveInterval = t
@ -195,9 +203,12 @@ func WithKeepAlive(t time.Duration) WakuNodeOption {
} }
} }
func WithConnStatusChan(connStatusChan chan ConnStatus) WakuNodeOption { // WithConnectionStatusChannel is a WakuNodeOption used to set a channel where the
// connection status changes will be pushed to. It's useful to identify when peer
// connections and disconnections occur
func WithConnectionStatusChannel(connStatus chan ConnStatus) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.connStatusChan = connStatusChan params.connStatusC = connStatus
return nil return nil
} }
} }

View File

@ -2,6 +2,9 @@ package protocol
import "github.com/status-im/go-waku/waku/v2/protocol/pb" import "github.com/status-im/go-waku/waku/v2/protocol/pb"
// Envelope contains information about the pubsub topic of a WakuMessage
// and a hash used to identify a message based on the bytes of a WakuMessage
// protobuffer
type Envelope struct { type Envelope struct {
msg *pb.WakuMessage msg *pb.WakuMessage
pubsubTopic string pubsubTopic string

View File

@ -220,11 +220,11 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o
return pushResponseRPC.Response, nil return pushResponseRPC.Response, nil
} }
func (w *WakuLightPush) Stop() { func (wakuLP *WakuLightPush) Stop() {
w.h.RemoveStreamHandler(LightPushID_v20beta1) wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
} }
func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) { func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) {
if message == nil { if message == nil {
return nil, errors.New("message can't be null") return nil, errors.New("message can't be null")
} }
@ -233,7 +233,7 @@ func (w *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, to
req.Message = message req.Message = message
req.PubsubTopic = string(relay.GetTopic(topic)) req.PubsubTopic = string(relay.GetTopic(topic))
response, err := w.request(ctx, req, opts...) response, err := wakuLP.request(ctx, req, opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -9,25 +9,26 @@ import (
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestFindLastSeenMessage(t *testing.T) { func TestFindLastSeenMessage(t *testing.T) {
msg1 := tests.CreateWakuMessage("1", 1) msg1 := protocol.NewEnvelope(tests.CreateWakuMessage("1", 1), "test")
msg2 := tests.CreateWakuMessage("2", 2) msg2 := protocol.NewEnvelope(tests.CreateWakuMessage("2", 2), "test")
msg3 := tests.CreateWakuMessage("3", 3) msg3 := protocol.NewEnvelope(tests.CreateWakuMessage("3", 3), "test")
msg4 := tests.CreateWakuMessage("4", 4) msg4 := protocol.NewEnvelope(tests.CreateWakuMessage("4", 4), "test")
msg5 := tests.CreateWakuMessage("5", 5) msg5 := protocol.NewEnvelope(tests.CreateWakuMessage("5", 5), "test")
s := NewWakuStore(nil) s := NewWakuStore(nil)
s.storeMessage("test", msg1) s.storeMessage(msg1)
s.storeMessage("test", msg3) s.storeMessage(msg3)
s.storeMessage("test", msg5) s.storeMessage(msg5)
s.storeMessage("test", msg2) s.storeMessage(msg2)
s.storeMessage("test", msg4) s.storeMessage(msg4)
require.Equal(t, msg5.Timestamp, s.findLastSeen()) require.Equal(t, msg5.Message().Timestamp, s.findLastSeen())
} }
func TestResume(t *testing.T) { func TestResume(t *testing.T) {
@ -47,8 +48,8 @@ func TestResume(t *testing.T) {
contentTopic = "2" contentTopic = "2"
} }
msg := tests.CreateWakuMessage(contentTopic, float64(time.Duration(i)*time.Second)) msg := protocol.NewEnvelope(tests.CreateWakuMessage(contentTopic, float64(time.Duration(i)*time.Second)), "test")
s1.storeMessage("test", msg) s1.storeMessage(msg)
} }
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
@ -92,7 +93,7 @@ func TestResumeWithListOfPeers(t *testing.T) {
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
s1.storeMessage("test", msg0) s1.storeMessage(protocol.NewEnvelope(msg0, "test"))
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)
@ -125,7 +126,7 @@ func TestResumeWithoutSpecifyingPeer(t *testing.T) {
msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)} msg0 := &pb.WakuMessage{Payload: []byte{1, 2, 3}, ContentTopic: "2", Version: 0, Timestamp: float64(0 * time.Second)}
s1.storeMessage("test", msg0) s1.storeMessage(protocol.NewEnvelope(msg0, "test"))
host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) host2, err := libp2p.New(ctx, libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err) require.NoError(t, err)

View File

@ -3,7 +3,6 @@ package store
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/sha256"
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
@ -27,8 +26,11 @@ import (
var log = logging.Logger("wakustore") var log = logging.Logger("wakustore")
// StoreID_v20beta3 is the current Waku Store protocol identifier
const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3") const StoreID_v20beta3 = libp2pProtocol.ID("/vac/waku/store/2.0.0-beta3")
const MaxPageSize = 100 // Maximum number of waku messages in each page
// MaxPageSize is the maximum number of waku messages to return per page
const MaxPageSize = 100
var ( var (
ErrNoPeersAvailable = errors.New("no suitable remote peers") ErrNoPeersAvailable = errors.New("no suitable remote peers")
@ -138,11 +140,11 @@ func paginateWithoutIndex(list []IndexedWakuMessage, pinfo *pb.PagingInfo) (resM
return return
} }
func (w *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse { func (store *WakuStore) FindMessages(query *pb.HistoryQuery) *pb.HistoryResponse {
result := new(pb.HistoryResponse) result := new(pb.HistoryResponse)
// data holds IndexedWakuMessage whose topics match the query // data holds IndexedWakuMessage whose topics match the query
var data []IndexedWakuMessage var data []IndexedWakuMessage
for _, indexedMsg := range w.messages { for _, indexedMsg := range store.messages {
// temporal filtering // temporal filtering
// check whether the history query contains a time filter // check whether the history query contains a time filter
if query.StartTime != 0 && query.EndTime != 0 { if query.StartTime != 0 && query.EndTime != 0 {
@ -195,6 +197,7 @@ type Query struct {
EndTime float64 EndTime float64
} }
// Result represents a valid response from a store node
type Result struct { type Result struct {
Messages []*pb.WakuMessage Messages []*pb.WakuMessage
@ -235,6 +238,7 @@ type WakuStore struct {
h host.Host h host.Host
} }
// NewWakuStore creates a WakuStore using an specific MessageProvider for storing the messages
func NewWakuStore(p MessageProvider) *WakuStore { func NewWakuStore(p MessageProvider) *WakuStore {
wakuStore := new(WakuStore) wakuStore := new(WakuStore)
wakuStore.msgProvider = p wakuStore.msgProvider = p
@ -243,10 +247,12 @@ func NewWakuStore(p MessageProvider) *WakuStore {
return wakuStore return wakuStore
} }
func (store *WakuStore) SetMsgProvider(p MessageProvider) { // SetMessageProvider allows switching the message provider used with a WakuStore
func (store *WakuStore) SetMessageProvider(p MessageProvider) {
store.msgProvider = p store.msgProvider = p
} }
// Start initializes the WakuStore by enabling the protocol and fetching records from a message provider
func (store *WakuStore) Start(ctx context.Context, h host.Host) { func (store *WakuStore) Start(ctx context.Context, h host.Host) {
if store.started { if store.started {
return return
@ -307,8 +313,8 @@ func (store *WakuStore) storeMessageWithIndex(pubsubTopic string, idx *pb.Index,
store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic}) store.messages = append(store.messages, IndexedWakuMessage{msg: msg, index: idx, pubsubTopic: pubsubTopic})
} }
func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) { func (store *WakuStore) storeMessage(env *protocol.Envelope) {
index, err := computeIndex(msg) index, err := computeIndex(env)
if err != nil { if err != nil {
log.Error("could not calculate message index", err) log.Error("could not calculate message index", err)
return return
@ -317,14 +323,14 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
store.messagesMutex.Lock() store.messagesMutex.Lock()
defer store.messagesMutex.Unlock() defer store.messagesMutex.Unlock()
store.storeMessageWithIndex(pubSubTopic, index, msg) store.storeMessageWithIndex(env.PubsubTopic(), index, env.Message())
if store.msgProvider == nil { if store.msgProvider == nil {
metrics.RecordMessage(store.ctx, "stored", len(store.messages)) metrics.RecordMessage(store.ctx, "stored", len(store.messages))
return return
} }
err = store.msgProvider.Put(index, pubSubTopic, msg) // Should the index be stored? err = store.msgProvider.Put(index, env.PubsubTopic(), env.Message()) // Should the index be stored?
if err != nil { if err != nil {
log.Error("could not store message", err) log.Error("could not store message", err)
@ -337,7 +343,7 @@ func (store *WakuStore) storeMessage(pubSubTopic string, msg *pb.WakuMessage) {
func (store *WakuStore) storeIncomingMessages(ctx context.Context) { func (store *WakuStore) storeIncomingMessages(ctx context.Context) {
for envelope := range store.MsgC { for envelope := range store.MsgC {
store.storeMessage(envelope.PubsubTopic(), envelope.Message()) store.storeMessage(envelope)
} }
} }
@ -371,16 +377,11 @@ func (store *WakuStore) onRequest(s network.Stream) {
} }
} }
func computeIndex(msg *pb.WakuMessage) (*pb.Index, error) { func computeIndex(env *protocol.Envelope) (*pb.Index, error) {
data, err := msg.Marshal()
if err != nil {
return nil, err
}
digest := sha256.Sum256(data)
return &pb.Index{ return &pb.Index{
Digest: digest[:], Digest: env.Hash(),
ReceiverTime: utils.GetUnixEpoch(), ReceiverTime: utils.GetUnixEpoch(),
SenderTime: msg.Timestamp, SenderTime: env.Message().Timestamp,
}, nil }, nil
} }
@ -436,12 +437,15 @@ type HistoryRequestParameters struct {
type HistoryRequestOption func(*HistoryRequestParameters) type HistoryRequestOption func(*HistoryRequestParameters)
// WithPeer is an option used to specify the peerID to request the message history
func WithPeer(p peer.ID) HistoryRequestOption { func WithPeer(p peer.ID) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) {
params.selectedPeer = p params.selectedPeer = p
} }
} }
// WithAutomaticPeerSelection is an option used to randomly select a peer from the store
// to request the message history
func WithAutomaticPeerSelection() HistoryRequestOption { func WithAutomaticPeerSelection() HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) {
p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3)) p, err := utils.SelectPeer(params.s.h, string(StoreID_v20beta3))
@ -471,6 +475,7 @@ func WithCursor(c *pb.Index) HistoryRequestOption {
} }
} }
// WithPaging is an option used to specify the order and maximum number of records to return
func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
return func(params *HistoryRequestParameters) { return func(params *HistoryRequestParameters) {
params.asc = asc params.asc = asc
@ -478,11 +483,12 @@ func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
} }
} }
// Default options to be used when querying a store node for results
func DefaultOptions() []HistoryRequestOption { func DefaultOptions() []HistoryRequestOption {
return []HistoryRequestOption{ return []HistoryRequestOption{
WithAutomaticRequestId(), WithAutomaticRequestId(),
WithAutomaticPeerSelection(), WithAutomaticPeerSelection(),
WithPaging(true, 0), WithPaging(true, MaxPageSize),
} }
} }
@ -583,6 +589,10 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
}, nil }, nil
} }
// Next is used with to retrieve the next page of rows from a query response.
// If no more records are found, the result will not contain any messages.
// This function is useful for iterating over results without having to manually
// specify the cursor and pagination order and max number of results
func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) { func (store *WakuStore) Next(ctx context.Context, r *Result) (*Result, error) {
q := &pb.HistoryQuery{ q := &pb.HistoryQuery{
PubsubTopic: r.query.PubsubTopic, PubsubTopic: r.query.PubsubTopic,
@ -641,7 +651,7 @@ func (store *WakuStore) findLastSeen() float64 {
return lastSeenTime return lastSeenTime
} }
// resume proc retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online // Resume retrieves the history of waku messages published on the default waku pubsub topic since the last time the waku store node has been online
// messages are stored in the store node's messages field and in the message db // messages are stored in the store node's messages field and in the message db
// the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message // the offline time window is measured as the difference between the current time and the timestamp of the most recent persisted waku message
// an offset of 20 second is added to the time window to count for nodes asynchrony // an offset of 20 second is added to the time window to count for nodes asynchrony
@ -649,7 +659,6 @@ func (store *WakuStore) findLastSeen() float64 {
// peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed). // peerList indicates the list of peers to query from. The history is fetched from the first available peer in this list. Such candidates should be found through a discovery method (to be developed).
// if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window. // if no peerList is passed, one of the peers in the underlying peer manager unit of the store protocol is picked randomly to fetch the history from. The history gets fetched successfully if the dialed peer has been online during the queried time window.
// the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string // the resume proc returns the number of retrieved messages if no error occurs, otherwise returns the error string
func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) { func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList []peer.ID) (int, error) {
if !store.started { if !store.started {
return 0, errors.New("can't resume: store has not started") return 0, errors.New("can't resume: store has not started")
@ -695,7 +704,7 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
} }
for _, msg := range response.Messages { for _, msg := range response.Messages {
store.storeMessage(pubsubTopic, msg) store.storeMessage(protocol.NewEnvelope(msg, pubsubTopic))
} }
log.Info("Retrieved messages since the last online time: ", len(response.Messages)) log.Info("Retrieved messages since the last online time: ", len(response.Messages))
@ -705,14 +714,15 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList
// TODO: queryWithAccounting // TODO: queryWithAccounting
func (w *WakuStore) Stop() { // Stop closes the store message channel and removes the protocol stream handler
w.started = false func (store *WakuStore) Stop() {
store.started = false
if w.MsgC != nil { if store.MsgC != nil {
close(w.MsgC) close(store.MsgC)
} }
if w.h != nil { if store.h != nil {
w.h.RemoveStreamHandler(StoreID_v20beta3) store.h.RemoveStreamHandler(StoreID_v20beta3)
} }
} }

View File

@ -4,6 +4,7 @@ import (
"sort" "sort"
"testing" "testing"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils" "github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -15,7 +16,7 @@ func TestIndexComputation(t *testing.T) {
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),
} }
idx, err := computeIndex(msg) idx, err := computeIndex(protocol.NewEnvelope(msg, "test"))
require.NoError(t, err) require.NoError(t, err)
require.NotZero(t, idx.ReceiverTime) require.NotZero(t, idx.ReceiverTime)
require.Equal(t, msg.Timestamp, idx.SenderTime) require.Equal(t, msg.Timestamp, idx.SenderTime)
@ -27,7 +28,7 @@ func TestIndexComputation(t *testing.T) {
Timestamp: 123, Timestamp: 123,
ContentTopic: "/waku/2/default-content/proto", ContentTopic: "/waku/2/default-content/proto",
} }
idx1, err := computeIndex(msg1) idx1, err := computeIndex(protocol.NewEnvelope(msg1, "test"))
require.NoError(t, err) require.NoError(t, err)
msg2 := &pb.WakuMessage{ msg2 := &pb.WakuMessage{
@ -35,7 +36,7 @@ func TestIndexComputation(t *testing.T) {
Timestamp: 123, Timestamp: 123,
ContentTopic: "/waku/2/default-content/proto", ContentTopic: "/waku/2/default-content/proto",
} }
idx2, err := computeIndex(msg2) idx2, err := computeIndex(protocol.NewEnvelope(msg2, "test"))
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, idx1.Digest, idx2.Digest) require.Equal(t, idx1.Digest, idx2.Digest)
@ -174,7 +175,7 @@ func TestForwardPagination(t *testing.T) {
require.Equal(t, uint64(0), newPagingInfo.PageSize) require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test for an invalid cursor // test for an invalid cursor
invalidIndex, err := computeIndex(&pb.WakuMessage{Payload: []byte{255, 255, 255}}) invalidIndex, err := computeIndex(protocol.NewEnvelope(&pb.WakuMessage{Payload: []byte{255, 255, 255}}, "test"))
require.NoError(t, err) require.NoError(t, err)
pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_FORWARD} pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_FORWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)
@ -258,7 +259,7 @@ func TestBackwardPagination(t *testing.T) {
require.Equal(t, uint64(0), newPagingInfo.PageSize) require.Equal(t, uint64(0), newPagingInfo.PageSize)
// test for an invalid cursor // test for an invalid cursor
invalidIndex, err := computeIndex(&pb.WakuMessage{Payload: []byte{255, 255, 255}}) invalidIndex, err := computeIndex(protocol.NewEnvelope(&pb.WakuMessage{Payload: []byte{255, 255, 255}}, "test"))
require.NoError(t, err) require.NoError(t, err)
pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_BACKWARD} pagingInfo = &pb.PagingInfo{PageSize: 10, Cursor: invalidIndex, Direction: pb.PagingInfo_BACKWARD}
messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo) messages, newPagingInfo = paginateWithoutIndex(msgList, pagingInfo)

View File

@ -7,6 +7,7 @@ import (
"github.com/status-im/go-waku/waku/persistence" "github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/persistence/sqlite" "github.com/status-im/go-waku/waku/persistence/sqlite"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils" "github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -36,7 +37,7 @@ func TestStorePersistence(t *testing.T) {
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),
} }
s1.storeMessage(defaultPubSubTopic, msg) s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
s2 := NewWakuStore(dbStore) s2 := NewWakuStore(dbStore)
s2.fetchDBRecords(ctx) s2.fetchDBRecords(ctx)
@ -44,5 +45,5 @@ func TestStorePersistence(t *testing.T) {
require.Equal(t, msg, s2.messages[0].msg) require.Equal(t, msg, s2.messages[0].msg)
// Storing a duplicated message should not crash. It's okay to generate an error log in this case // Storing a duplicated message should not crash. It's okay to generate an error log in this case
s1.storeMessage(defaultPubSubTopic, msg) s1.storeMessage(protocol.NewEnvelope(msg, defaultPubSubTopic))
} }

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
"github.com/status-im/go-waku/tests" "github.com/status-im/go-waku/tests"
"github.com/status-im/go-waku/waku/v2/protocol"
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/utils" "github.com/status-im/go-waku/waku/v2/utils"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -17,8 +18,8 @@ func TestStoreQuery(t *testing.T) {
msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("2", utils.GetUnixEpoch())
s := NewWakuStore(nil) s := NewWakuStore(nil)
s.storeMessage(defaultPubSubTopic, msg1) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
s.storeMessage(defaultPubSubTopic, msg2) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{ ContentFilters: []*pb.ContentFilter{
@ -44,9 +45,9 @@ func TestStoreQueryMultipleContentFilters(t *testing.T) {
s := NewWakuStore(nil) s := NewWakuStore(nil)
s.storeMessage(defaultPubSubTopic, msg1) s.storeMessage(protocol.NewEnvelope(msg1, defaultPubSubTopic))
s.storeMessage(defaultPubSubTopic, msg2) s.storeMessage(protocol.NewEnvelope(msg2, defaultPubSubTopic))
s.storeMessage(defaultPubSubTopic, msg3) s.storeMessage(protocol.NewEnvelope(msg3, defaultPubSubTopic))
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
ContentFilters: []*pb.ContentFilter{ ContentFilters: []*pb.ContentFilter{
@ -77,9 +78,9 @@ func TestStoreQueryPubsubTopicFilter(t *testing.T) {
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil) s := NewWakuStore(nil)
s.storeMessage(pubsubTopic1, msg1) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
s.storeMessage(pubsubTopic2, msg2) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
s.storeMessage(pubsubTopic2, msg3) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1, PubsubTopic: pubsubTopic1,
@ -109,9 +110,9 @@ func TestStoreQueryPubsubTopicNoMatch(t *testing.T) {
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil) s := NewWakuStore(nil)
s.storeMessage(pubsubTopic2, msg1) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic2))
s.storeMessage(pubsubTopic2, msg2) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic2))
s.storeMessage(pubsubTopic2, msg3) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic2))
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1, PubsubTopic: pubsubTopic1,
@ -131,9 +132,9 @@ func TestStoreQueryPubsubTopicAllMessages(t *testing.T) {
msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch()) msg3 := tests.CreateWakuMessage(topic3, utils.GetUnixEpoch())
s := NewWakuStore(nil) s := NewWakuStore(nil)
s.storeMessage(pubsubTopic1, msg1) s.storeMessage(protocol.NewEnvelope(msg1, pubsubTopic1))
s.storeMessage(pubsubTopic1, msg2) s.storeMessage(protocol.NewEnvelope(msg2, pubsubTopic1))
s.storeMessage(pubsubTopic1, msg3) s.storeMessage(protocol.NewEnvelope(msg3, pubsubTopic1))
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
PubsubTopic: pubsubTopic1, PubsubTopic: pubsubTopic1,
@ -153,7 +154,7 @@ func TestStoreQueryForwardPagination(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch()) msg := tests.CreateWakuMessage(topic1, utils.GetUnixEpoch())
msg.Payload = []byte{byte(i)} msg.Payload = []byte{byte(i)}
s.storeMessage(pubsubTopic1, msg) s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1))
} }
response := s.FindMessages(&pb.HistoryQuery{ response := s.FindMessages(&pb.HistoryQuery{
@ -181,7 +182,7 @@ func TestStoreQueryBackwardPagination(t *testing.T) {
Version: 0, Version: 0,
Timestamp: utils.GetUnixEpoch(), Timestamp: utils.GetUnixEpoch(),
} }
s.storeMessage(pubsubTopic1, msg) s.storeMessage(protocol.NewEnvelope(msg, pubsubTopic1))
} }
@ -208,7 +209,7 @@ func TestTemporalHistoryQueries(t *testing.T) {
contentTopic = "2" contentTopic = "2"
} }
msg := tests.CreateWakuMessage(contentTopic, float64(i)) msg := tests.CreateWakuMessage(contentTopic, float64(i))
s.storeMessage("test", msg) s.storeMessage(protocol.NewEnvelope(msg, "test"))
messages = append(messages, msg) messages = append(messages, msg)
} }

View File

@ -2,10 +2,14 @@ package utils
import "time" import "time"
func GetUnixEpochFrom(now func() time.Time) float64 { // GetUnixEpoch converts a time into a unix timestamp with the integer part
return float64(now().UnixNano()) / float64(time.Second) // representing seconds and the decimal part representing subseconds
func GetUnixEpochFrom(now time.Time) float64 {
return float64(now.UnixNano()) / float64(time.Second)
} }
// GetUnixEpoch returns the current time in unix timestamp with the integer part
// representing seconds and the decimal part representing subseconds
func GetUnixEpoch() float64 { func GetUnixEpoch() float64 {
return GetUnixEpochFrom(time.Now) return GetUnixEpochFrom(time.Now())
} }

View File

@ -9,10 +9,7 @@ import (
func TestGetUnixEpochFrom(t *testing.T) { func TestGetUnixEpochFrom(t *testing.T) {
loc := time.UTC loc := time.UTC
timeFn := func() time.Time { timestamp := GetUnixEpochFrom(time.Date(2019, 1, 1, 0, 0, 0, 0, loc))
return time.Date(2019, 1, 1, 0, 0, 0, 0, loc)
}
timestamp := GetUnixEpochFrom(timeFn)
require.Equal(t, float64(1546300800), timestamp) require.Equal(t, float64(1546300800), timestamp)
} }