From 11161b8919d2d2ad28b9f51dc38ca28ce2c94b72 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 10 Feb 2023 16:17:23 -0400 Subject: [PATCH] feat(c-bindings): expose store protocol --- library/README.md | 13 +++ library/api.go | 4 + library/api_store.go | 33 +++++++ mobile/api.go | 64 ++++++++++++-- mobile/api_store.go | 85 ++++++++++++------- waku/db.go | 2 +- waku/node.go | 2 +- waku/v2/protocol/store/waku_store_client.go | 43 +++++++++- .../store/waku_store_protocol_test.go | 40 +++++++++ 9 files changed, 242 insertions(+), 44 deletions(-) diff --git a/library/README.md b/library/README.md index 7c03d5cc..0ee293ad 100644 --- a/library/README.md +++ b/library/README.md @@ -277,6 +277,10 @@ interface JsonConfig { discV5?: boolean; discV5BootstrapNodes?: Array; discV5UDPPort?: number; + store?: boolean; + databaseURL?: string; + storeRetentionMaxMessages?: number; + storeRetentionTimeSeconds?: number; } ``` @@ -311,6 +315,15 @@ If a key is `undefined`, or `null`, a default value will be set. - `discV5BootstrapNodes`: Array of bootstrap nodes ENR - `discV5UDPPort`: UDP port for DiscoveryV5 Default `9000` +- `store`: Enable store protocol to persist message history + Default `false` +- `databaseURL`: url connection string. Accepts SQLite and PostgreSQL connection strings + Default: `sqlite3://store.db` +- `storeRetentionMaxMessages`: max number of messages to store in the database. + Default `10000` +- `storeRetentionTimeSeconds`: max number of seconds that a message will be persisted in the database. + Default `2592000` (30d) + For example: ```json diff --git a/library/api.go b/library/api.go index 86e4807f..4199198e 100644 --- a/library/api.go +++ b/library/api.go @@ -34,6 +34,10 @@ func main() {} // - discV5BootstrapNodes: Array of bootstrap nodes ENR // - discV5UDPPort: UDP port for DiscoveryV5 // - logLevel: Set the log level. Default `INFO`. Allowed values "DEBUG", "INFO", "WARN", "ERROR", "DPANIC", "PANIC", "FATAL" +// - store: Enable Store. Default `false` +// - databaseURL: url connection string. Default: "sqlite3://store.db". Also accepts PostgreSQL connection strings +// - storeRetentionMaxMessages: max number of messages to store in the database. Default 10000 +// - storeRetentionTimeSeconds: max number of seconds that a message will be persisted in the database. Default 2592000 (30d) // //export waku_new func waku_new(configJSON *C.char) *C.char { diff --git a/library/api_store.go b/library/api_store.go index 87e45ba1..73715f86 100644 --- a/library/api_store.go +++ b/library/api_store.go @@ -40,3 +40,36 @@ func waku_store_query(queryJSON *C.char, peerID *C.char, ms C.int) *C.char { response := mobile.StoreQuery(C.GoString(queryJSON), C.GoString(peerID), int(ms)) return C.CString(response) } + +// Query historic messages stored in the localDB using waku store protocol. +// queryJSON must contain a valid json string with the following format: +// +// { +// "pubsubTopic": "...", // optional string +// "startTime": 1234, // optional, unix epoch time in nanoseconds +// "endTime": 1234, // optional, unix epoch time in nanoseconds +// "contentFilters": [ // optional +// { +// contentTopic: "contentTopic1" +// }, ... +// ], +// "pagingOptions": {// optional pagination information +// "pageSize": 40, // number +// "cursor": { // optional +// "digest": ..., +// "receiverTime": ..., +// "senderTime": ..., +// "pubsubTopic" ..., +// } +// "forward": true, // sort order +// } +// } +// +// If a non empty cursor is returned, this function should be executed again, setting the `cursor` attribute with the cursor returned in the response +// Requires the `store` option to be passed when setting up the initial configuration +// +//export waku_store_local_query +func waku_store_local_query(queryJSON *C.char) *C.char { + response := mobile.StoreLocalQuery(C.GoString(queryJSON)) + return C.CString(response) +} diff --git a/mobile/api.go b/mobile/api.go index 4cd8997b..c9814f92 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -7,6 +7,7 @@ import ( "crypto/ecdsa" "crypto/elliptic" "crypto/rand" + "database/sql" "encoding/hex" "encoding/json" "errors" @@ -14,7 +15,7 @@ import ( "net" "time" - logging "github.com/ipfs/go-log/v2" + "go.uber.org/zap/zapcore" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" @@ -22,6 +23,8 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku" + "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/protocol" @@ -52,11 +55,15 @@ type wakuConfig struct { KeepAliveInterval *int `json:"keepAliveInterval,omitempty"` EnableRelay *bool `json:"relay"` RelayTopics []string `json:"relayTopics,omitempty"` - EnableFilter *bool `json:"filter"` - MinPeersToPublish *int `json:"minPeersToPublish"` - EnableDiscV5 *bool `json:"discV5"` - DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes"` - DiscV5UDPPort *uint `json:"discV5UDPPort"` + EnableFilter *bool `json:"filter,omitempty"` + MinPeersToPublish *int `json:"minPeersToPublish,omitempty"` + EnableDiscV5 *bool `json:"discV5,omitempty"` + DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes,omitempty"` + DiscV5UDPPort *uint `json:"discV5UDPPort,omitempty"` + EnableStore *bool `json:"store,omitempty"` + DatabaseURL *string `json:"databaseURL,omitempty"` + RetentionMaxMessages *int `json:"storeRetentionMaxMessages,omitempty"` + RetentionTimeSeconds *int `json:"storeRetentionTimeSeconds,omitempty"` } var defaultHost = "0.0.0.0" @@ -68,6 +75,10 @@ var defaultEnableFilter = false var defaultEnableDiscV5 = false var defaultDiscV5UDPPort = uint(9000) var defaultLogLevel = "INFO" +var defaultEnableStore = false +var defaultDatabaseURL = "sqlite3://store.db" +var defaultRetentionMaxMessages = 10000 +var defaultRetentionTimeSeconds = 30 * 24 * 60 * 60 // 30d func getConfig(configJSON string) (wakuConfig, error) { var config wakuConfig @@ -118,6 +129,22 @@ func getConfig(configJSON string) (wakuConfig, error) { config.LogLevel = &defaultLogLevel } + if config.EnableStore == nil { + config.EnableStore = &defaultEnableStore + } + + if config.DatabaseURL == nil { + config.DatabaseURL = &defaultDatabaseURL + } + + if config.RetentionMaxMessages == nil { + config.RetentionMaxMessages = &defaultRetentionMaxMessages + } + + if config.RetentionTimeSeconds == nil { + config.RetentionTimeSeconds = &defaultRetentionTimeSeconds + } + return config, nil } @@ -168,6 +195,25 @@ func NewNode(configJSON string) string { opts = append(opts, node.WithWakuFilter(false)) } + if *config.EnableStore { + var db *sql.DB + var migrationFn func(*sql.DB) error + db, migrationFn, err = waku.ExtractDBAndMigration(*config.DatabaseURL) + if err != nil { + return MakeJSONResponse(err) + } + opts = append(opts, node.WithWakuStore()) + dbStore, err := persistence.NewDBStore(utils.Logger(), + persistence.WithDB(db), + persistence.WithMigrations(migrationFn), + persistence.WithRetentionPolicy(*config.RetentionMaxMessages, time.Duration(*config.RetentionTimeSeconds)*time.Second), + ) + if err != nil { + return MakeJSONResponse(err) + } + opts = append(opts, node.WithMessageProvider(dbStore)) + } + if *config.EnableDiscV5 { var bootnodes []*enode.Node for _, addr := range config.DiscV5BootstrapNodes { @@ -182,12 +228,12 @@ func NewNode(configJSON string) string { wakuRelayTopics = config.RelayTopics - // for go-libp2p loggers - lvl, err := logging.LevelFromString(*config.LogLevel) + lvl, err := zapcore.ParseLevel(*config.LogLevel) if err != nil { return MakeJSONResponse(err) } - logging.SetAllLoggers(lvl) + + opts = append(opts, node.WithLogLevel(lvl)) w, err := node.New(opts...) if err != nil { diff --git a/mobile/api_store.go b/mobile/api_store.go index f0270f17..e189c185 100644 --- a/mobile/api_store.go +++ b/mobile/api_store.go @@ -34,6 +34,39 @@ type storeMessagesReply struct { Error string `json:"error,omitempty"` } +func queryResponse(ctx context.Context, args storeMessagesArgs, options []store.HistoryRequestOption) string { + var contentTopics []string + for _, ct := range args.ContentFilters { + contentTopics = append(contentTopics, ct.ContentTopic) + } + + res, err := wakuNode.Store().Query( + ctx, + store.Query{ + Topic: args.Topic, + ContentTopics: contentTopics, + StartTime: args.StartTime, + EndTime: args.EndTime, + }, + options..., + ) + + reply := storeMessagesReply{} + + if err != nil { + reply.Error = err.Error() + return PrepareJSONResponse(reply, nil) + } + reply.Messages = res.Messages + reply.PagingInfo = storePagingOptions{ + PageSize: args.PagingOptions.PageSize, + Cursor: res.Cursor(), + Forward: args.PagingOptions.Forward, + } + + return PrepareJSONResponse(reply, nil) +} + func StoreQuery(queryJSON string, peerID string, ms int) string { if wakuNode == nil { return MakeJSONResponse(errWakuNodeNotReady) @@ -61,8 +94,6 @@ func StoreQuery(queryJSON string, peerID string, ms int) string { options = append(options, store.WithAutomaticPeerSelection()) } - reply := storeMessagesReply{} - var ctx context.Context var cancel context.CancelFunc @@ -73,32 +104,26 @@ func StoreQuery(queryJSON string, peerID string, ms int) string { ctx = context.Background() } - var contentTopics []string - for _, ct := range args.ContentFilters { - contentTopics = append(contentTopics, ct.ContentTopic) - } - - res, err := wakuNode.Store().Query( - ctx, - store.Query{ - Topic: args.Topic, - ContentTopics: contentTopics, - StartTime: args.StartTime, - EndTime: args.EndTime, - }, - options..., - ) - - if err != nil { - reply.Error = err.Error() - return PrepareJSONResponse(reply, nil) - } - reply.Messages = res.Messages - reply.PagingInfo = storePagingOptions{ - PageSize: args.PagingOptions.PageSize, - Cursor: res.Cursor(), - Forward: args.PagingOptions.Forward, - } - - return PrepareJSONResponse(reply, nil) + return queryResponse(ctx, args, options) +} + +func StoreLocalQuery(queryJSON string) string { + if wakuNode == nil { + return MakeJSONResponse(errWakuNodeNotReady) + } + + var args storeMessagesArgs + err := json.Unmarshal([]byte(queryJSON), &args) + if err != nil { + return MakeJSONResponse(err) + } + + options := []store.HistoryRequestOption{ + store.WithAutomaticRequestId(), + store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), + store.WithCursor(args.PagingOptions.Cursor), + store.WithLocalQuery(), + } + + return queryResponse(context.TODO(), args, options) } diff --git a/waku/db.go b/waku/db.go index 42b96442..d3fc3f1c 100644 --- a/waku/db.go +++ b/waku/db.go @@ -18,7 +18,7 @@ func validateDBUrl(val string) error { return nil } -func extractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) { +func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) { var db *sql.DB var migrationFn func(*sql.DB) error var err error diff --git a/waku/node.go b/waku/node.go index b393af9f..08322738 100644 --- a/waku/node.go +++ b/waku/node.go @@ -81,7 +81,7 @@ func Execute(options Options) { var db *sql.DB var migrationFn func(*sql.DB) error if options.Store.Enable { - db, migrationFn, err = extractDBAndMigration(options.Store.DatabaseURL) + db, migrationFn, err = ExtractDBAndMigration(options.Store.DatabaseURL) failOnErr(err, "Could not connect to DB") } diff --git a/waku/v2/protocol/store/waku_store_client.go b/waku/v2/protocol/store/waku_store_client.go index 527e5f21..6f63f459 100644 --- a/waku/v2/protocol/store/waku_store_client.go +++ b/waku/v2/protocol/store/waku_store_client.go @@ -82,6 +82,7 @@ type criteriaFN = func(msg *pb.WakuMessage) (bool, error) type HistoryRequestParameters struct { selectedPeer peer.ID + localQuery bool requestId []byte cursor *pb.Index pageSize uint64 @@ -155,6 +156,12 @@ func WithPaging(asc bool, pageSize uint64) HistoryRequestOption { } } +func WithLocalQuery() HistoryRequestOption { + return func(params *HistoryRequestParameters) { + params.localQuery = true + } +} + // Default options to be used when querying a store node for results func DefaultOptions() []HistoryRequestOption { return []HistoryRequestOption{ @@ -197,7 +204,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return nil, err } - historyResponseRPC := &pb.HistoryRPC{} + historyResponseRPC := &pb.HistoryRPC{RequestId: historyRequest.RequestId} err = reader.ReadMsg(historyResponseRPC) if err != nil { logger.Error("reading response", zap.Error(err)) @@ -217,6 +224,29 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec return historyResponseRPC.Response, nil } +func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestId []byte) (*pb.HistoryResponse, error) { + logger := store.log + logger.Info("querying local message history") + + if !store.started { + return nil, errors.New("not running local store") + } + + historyResponseRPC := &pb.HistoryRPC{ + RequestId: hex.EncodeToString(requestId), + Response: store.FindMessages(query), + } + + if historyResponseRPC.Response == nil { + // Empty response + return &pb.HistoryResponse{ + PagingInfo: &pb.PagingInfo{}, + }, nil + } + + return historyResponseRPC.Response, nil +} + func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) { q := &pb.HistoryQuery{ PubsubTopic: query.Topic, @@ -243,7 +273,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR opt(params) } - if params.selectedPeer == "" { + if !params.localQuery && params.selectedPeer == "" { return nil, ErrNoPeersAvailable } @@ -267,7 +297,14 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR } q.PagingInfo.PageSize = pageSize - response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId) + var response *pb.HistoryResponse + var err error + + if params.localQuery { + response, err = store.localQuery(q, params.requestId) + } else { + response, err = store.queryFrom(ctx, q, params.selectedPeer, params.requestId) + } if err != nil { return nil, err } diff --git a/waku/v2/protocol/store/waku_store_protocol_test.go b/waku/v2/protocol/store/waku_store_protocol_test.go index e7d3363b..85710ed5 100644 --- a/waku/v2/protocol/store/waku_store_protocol_test.go +++ b/waku/v2/protocol/store/waku_store_protocol_test.go @@ -3,6 +3,7 @@ package store import ( "context" "testing" + "time" "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p/core/peerstore" @@ -63,6 +64,45 @@ func TestWakuStoreProtocolQuery(t *testing.T) { require.Equal(t, msg, response.Messages[0]) } +func TestWakuStoreProtocolLocalQuery(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0")) + require.NoError(t, err) + + s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger()) + err = s1.Start(ctx) + require.NoError(t, err) + + defer s1.Stop() + + topic1 := "1" + pubsubTopic1 := "topic1" + + msg := &pb.WakuMessage{ + Payload: []byte{1, 2, 3}, + ContentTopic: topic1, + Version: 0, + Timestamp: utils.GetUnixEpoch(), + } + + // Simulate a message has been received via relay protocol + s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1) + + time.Sleep(100 * time.Millisecond) + + q := Query{ + Topic: pubsubTopic1, + ContentTopics: []string{topic1}, + } + response, err := s1.Query(ctx, q, WithLocalQuery()) + + require.NoError(t, err) + require.Len(t, response.Messages, 1) + require.Equal(t, msg, response.Messages[0]) +} + func TestWakuStoreProtocolNext(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()