feat(c-bindings): expose store protocol

This commit is contained in:
Richard Ramos 2023-02-10 16:17:23 -04:00 committed by RichΛrd
parent eb9e727b1b
commit 11161b8919
9 changed files with 242 additions and 44 deletions

View File

@ -277,6 +277,10 @@ interface JsonConfig {
discV5?: boolean;
discV5BootstrapNodes?: Array<string>;
discV5UDPPort?: number;
store?: boolean;
databaseURL?: string;
storeRetentionMaxMessages?: number;
storeRetentionTimeSeconds?: number;
}
```
@ -311,6 +315,15 @@ If a key is `undefined`, or `null`, a default value will be set.
- `discV5BootstrapNodes`: Array of bootstrap nodes ENR
- `discV5UDPPort`: UDP port for DiscoveryV5
Default `9000`
- `store`: Enable store protocol to persist message history
Default `false`
- `databaseURL`: url connection string. Accepts SQLite and PostgreSQL connection strings
Default: `sqlite3://store.db`
- `storeRetentionMaxMessages`: max number of messages to store in the database.
Default `10000`
- `storeRetentionTimeSeconds`: max number of seconds that a message will be persisted in the database.
Default `2592000` (30d)
For example:
```json

View File

@ -34,6 +34,10 @@ func main() {}
// - discV5BootstrapNodes: Array of bootstrap nodes ENR
// - discV5UDPPort: UDP port for DiscoveryV5
// - logLevel: Set the log level. Default `INFO`. Allowed values "DEBUG", "INFO", "WARN", "ERROR", "DPANIC", "PANIC", "FATAL"
// - store: Enable Store. Default `false`
// - databaseURL: url connection string. Default: "sqlite3://store.db". Also accepts PostgreSQL connection strings
// - storeRetentionMaxMessages: max number of messages to store in the database. Default 10000
// - storeRetentionTimeSeconds: max number of seconds that a message will be persisted in the database. Default 2592000 (30d)
//
//export waku_new
func waku_new(configJSON *C.char) *C.char {

View File

@ -40,3 +40,36 @@ func waku_store_query(queryJSON *C.char, peerID *C.char, ms C.int) *C.char {
response := mobile.StoreQuery(C.GoString(queryJSON), C.GoString(peerID), int(ms))
return C.CString(response)
}
// Query historic messages stored in the localDB using waku store protocol.
// queryJSON must contain a valid json string with the following format:
//
// {
// "pubsubTopic": "...", // optional string
// "startTime": 1234, // optional, unix epoch time in nanoseconds
// "endTime": 1234, // optional, unix epoch time in nanoseconds
// "contentFilters": [ // optional
// {
// contentTopic: "contentTopic1"
// }, ...
// ],
// "pagingOptions": {// optional pagination information
// "pageSize": 40, // number
// "cursor": { // optional
// "digest": ...,
// "receiverTime": ...,
// "senderTime": ...,
// "pubsubTopic" ...,
// }
// "forward": true, // sort order
// }
// }
//
// If a non empty cursor is returned, this function should be executed again, setting the `cursor` attribute with the cursor returned in the response
// Requires the `store` option to be passed when setting up the initial configuration
//
//export waku_store_local_query
func waku_store_local_query(queryJSON *C.char) *C.char {
response := mobile.StoreLocalQuery(C.GoString(queryJSON))
return C.CString(response)
}

View File

@ -7,6 +7,7 @@ import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"database/sql"
"encoding/hex"
"encoding/json"
"errors"
@ -14,7 +15,7 @@ import (
"net"
"time"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/zap/zapcore"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
@ -22,6 +23,8 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
@ -52,11 +55,15 @@ type wakuConfig struct {
KeepAliveInterval *int `json:"keepAliveInterval,omitempty"`
EnableRelay *bool `json:"relay"`
RelayTopics []string `json:"relayTopics,omitempty"`
EnableFilter *bool `json:"filter"`
MinPeersToPublish *int `json:"minPeersToPublish"`
EnableDiscV5 *bool `json:"discV5"`
DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes"`
DiscV5UDPPort *uint `json:"discV5UDPPort"`
EnableFilter *bool `json:"filter,omitempty"`
MinPeersToPublish *int `json:"minPeersToPublish,omitempty"`
EnableDiscV5 *bool `json:"discV5,omitempty"`
DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes,omitempty"`
DiscV5UDPPort *uint `json:"discV5UDPPort,omitempty"`
EnableStore *bool `json:"store,omitempty"`
DatabaseURL *string `json:"databaseURL,omitempty"`
RetentionMaxMessages *int `json:"storeRetentionMaxMessages,omitempty"`
RetentionTimeSeconds *int `json:"storeRetentionTimeSeconds,omitempty"`
}
var defaultHost = "0.0.0.0"
@ -68,6 +75,10 @@ var defaultEnableFilter = false
var defaultEnableDiscV5 = false
var defaultDiscV5UDPPort = uint(9000)
var defaultLogLevel = "INFO"
var defaultEnableStore = false
var defaultDatabaseURL = "sqlite3://store.db"
var defaultRetentionMaxMessages = 10000
var defaultRetentionTimeSeconds = 30 * 24 * 60 * 60 // 30d
func getConfig(configJSON string) (wakuConfig, error) {
var config wakuConfig
@ -118,6 +129,22 @@ func getConfig(configJSON string) (wakuConfig, error) {
config.LogLevel = &defaultLogLevel
}
if config.EnableStore == nil {
config.EnableStore = &defaultEnableStore
}
if config.DatabaseURL == nil {
config.DatabaseURL = &defaultDatabaseURL
}
if config.RetentionMaxMessages == nil {
config.RetentionMaxMessages = &defaultRetentionMaxMessages
}
if config.RetentionTimeSeconds == nil {
config.RetentionTimeSeconds = &defaultRetentionTimeSeconds
}
return config, nil
}
@ -168,6 +195,25 @@ func NewNode(configJSON string) string {
opts = append(opts, node.WithWakuFilter(false))
}
if *config.EnableStore {
var db *sql.DB
var migrationFn func(*sql.DB) error
db, migrationFn, err = waku.ExtractDBAndMigration(*config.DatabaseURL)
if err != nil {
return MakeJSONResponse(err)
}
opts = append(opts, node.WithWakuStore())
dbStore, err := persistence.NewDBStore(utils.Logger(),
persistence.WithDB(db),
persistence.WithMigrations(migrationFn),
persistence.WithRetentionPolicy(*config.RetentionMaxMessages, time.Duration(*config.RetentionTimeSeconds)*time.Second),
)
if err != nil {
return MakeJSONResponse(err)
}
opts = append(opts, node.WithMessageProvider(dbStore))
}
if *config.EnableDiscV5 {
var bootnodes []*enode.Node
for _, addr := range config.DiscV5BootstrapNodes {
@ -182,12 +228,12 @@ func NewNode(configJSON string) string {
wakuRelayTopics = config.RelayTopics
// for go-libp2p loggers
lvl, err := logging.LevelFromString(*config.LogLevel)
lvl, err := zapcore.ParseLevel(*config.LogLevel)
if err != nil {
return MakeJSONResponse(err)
}
logging.SetAllLoggers(lvl)
opts = append(opts, node.WithLogLevel(lvl))
w, err := node.New(opts...)
if err != nil {

View File

@ -34,6 +34,39 @@ type storeMessagesReply struct {
Error string `json:"error,omitempty"`
}
func queryResponse(ctx context.Context, args storeMessagesArgs, options []store.HistoryRequestOption) string {
var contentTopics []string
for _, ct := range args.ContentFilters {
contentTopics = append(contentTopics, ct.ContentTopic)
}
res, err := wakuNode.Store().Query(
ctx,
store.Query{
Topic: args.Topic,
ContentTopics: contentTopics,
StartTime: args.StartTime,
EndTime: args.EndTime,
},
options...,
)
reply := storeMessagesReply{}
if err != nil {
reply.Error = err.Error()
return PrepareJSONResponse(reply, nil)
}
reply.Messages = res.Messages
reply.PagingInfo = storePagingOptions{
PageSize: args.PagingOptions.PageSize,
Cursor: res.Cursor(),
Forward: args.PagingOptions.Forward,
}
return PrepareJSONResponse(reply, nil)
}
func StoreQuery(queryJSON string, peerID string, ms int) string {
if wakuNode == nil {
return MakeJSONResponse(errWakuNodeNotReady)
@ -61,8 +94,6 @@ func StoreQuery(queryJSON string, peerID string, ms int) string {
options = append(options, store.WithAutomaticPeerSelection())
}
reply := storeMessagesReply{}
var ctx context.Context
var cancel context.CancelFunc
@ -73,32 +104,26 @@ func StoreQuery(queryJSON string, peerID string, ms int) string {
ctx = context.Background()
}
var contentTopics []string
for _, ct := range args.ContentFilters {
contentTopics = append(contentTopics, ct.ContentTopic)
}
res, err := wakuNode.Store().Query(
ctx,
store.Query{
Topic: args.Topic,
ContentTopics: contentTopics,
StartTime: args.StartTime,
EndTime: args.EndTime,
},
options...,
)
if err != nil {
reply.Error = err.Error()
return PrepareJSONResponse(reply, nil)
}
reply.Messages = res.Messages
reply.PagingInfo = storePagingOptions{
PageSize: args.PagingOptions.PageSize,
Cursor: res.Cursor(),
Forward: args.PagingOptions.Forward,
}
return PrepareJSONResponse(reply, nil)
return queryResponse(ctx, args, options)
}
func StoreLocalQuery(queryJSON string) string {
if wakuNode == nil {
return MakeJSONResponse(errWakuNodeNotReady)
}
var args storeMessagesArgs
err := json.Unmarshal([]byte(queryJSON), &args)
if err != nil {
return MakeJSONResponse(err)
}
options := []store.HistoryRequestOption{
store.WithAutomaticRequestId(),
store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize),
store.WithCursor(args.PagingOptions.Cursor),
store.WithLocalQuery(),
}
return queryResponse(context.TODO(), args, options)
}

View File

@ -18,7 +18,7 @@ func validateDBUrl(val string) error {
return nil
}
func extractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) {
func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) {
var db *sql.DB
var migrationFn func(*sql.DB) error
var err error

View File

@ -81,7 +81,7 @@ func Execute(options Options) {
var db *sql.DB
var migrationFn func(*sql.DB) error
if options.Store.Enable {
db, migrationFn, err = extractDBAndMigration(options.Store.DatabaseURL)
db, migrationFn, err = ExtractDBAndMigration(options.Store.DatabaseURL)
failOnErr(err, "Could not connect to DB")
}

View File

@ -82,6 +82,7 @@ type criteriaFN = func(msg *pb.WakuMessage) (bool, error)
type HistoryRequestParameters struct {
selectedPeer peer.ID
localQuery bool
requestId []byte
cursor *pb.Index
pageSize uint64
@ -155,6 +156,12 @@ func WithPaging(asc bool, pageSize uint64) HistoryRequestOption {
}
}
func WithLocalQuery() HistoryRequestOption {
return func(params *HistoryRequestParameters) {
params.localQuery = true
}
}
// Default options to be used when querying a store node for results
func DefaultOptions() []HistoryRequestOption {
return []HistoryRequestOption{
@ -197,7 +204,7 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
return nil, err
}
historyResponseRPC := &pb.HistoryRPC{}
historyResponseRPC := &pb.HistoryRPC{RequestId: historyRequest.RequestId}
err = reader.ReadMsg(historyResponseRPC)
if err != nil {
logger.Error("reading response", zap.Error(err))
@ -217,6 +224,29 @@ func (store *WakuStore) queryFrom(ctx context.Context, q *pb.HistoryQuery, selec
return historyResponseRPC.Response, nil
}
func (store *WakuStore) localQuery(query *pb.HistoryQuery, requestId []byte) (*pb.HistoryResponse, error) {
logger := store.log
logger.Info("querying local message history")
if !store.started {
return nil, errors.New("not running local store")
}
historyResponseRPC := &pb.HistoryRPC{
RequestId: hex.EncodeToString(requestId),
Response: store.FindMessages(query),
}
if historyResponseRPC.Response == nil {
// Empty response
return &pb.HistoryResponse{
PagingInfo: &pb.PagingInfo{},
}, nil
}
return historyResponseRPC.Response, nil
}
func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryRequestOption) (*Result, error) {
q := &pb.HistoryQuery{
PubsubTopic: query.Topic,
@ -243,7 +273,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
opt(params)
}
if params.selectedPeer == "" {
if !params.localQuery && params.selectedPeer == "" {
return nil, ErrNoPeersAvailable
}
@ -267,7 +297,14 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
}
q.PagingInfo.PageSize = pageSize
response, err := store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
var response *pb.HistoryResponse
var err error
if params.localQuery {
response, err = store.localQuery(q, params.requestId)
} else {
response, err = store.queryFrom(ctx, q, params.selectedPeer, params.requestId)
}
if err != nil {
return nil, err
}

View File

@ -3,6 +3,7 @@ package store
import (
"context"
"testing"
"time"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peerstore"
@ -63,6 +64,45 @@ func TestWakuStoreProtocolQuery(t *testing.T) {
require.Equal(t, msg, response.Messages[0])
}
func TestWakuStoreProtocolLocalQuery(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
host1, err := libp2p.New(libp2p.DefaultTransports, libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"))
require.NoError(t, err)
s1 := NewWakuStore(host1, nil, MemoryDB(t), timesource.NewDefaultClock(), utils.Logger())
err = s1.Start(ctx)
require.NoError(t, err)
defer s1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
msg := &pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: topic1,
Version: 0,
Timestamp: utils.GetUnixEpoch(),
}
// Simulate a message has been received via relay protocol
s1.MsgC <- protocol.NewEnvelope(msg, utils.GetUnixEpoch(), pubsubTopic1)
time.Sleep(100 * time.Millisecond)
q := Query{
Topic: pubsubTopic1,
ContentTopics: []string{topic1},
}
response, err := s1.Query(ctx, q, WithLocalQuery())
require.NoError(t, err)
require.Len(t, response.Messages, 1)
require.Equal(t, msg, response.Messages[0])
}
func TestWakuStoreProtocolNext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()