From ee2af4646cc1d3408eaf52ad7dd5fa926455eb00 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 8 Aug 2023 11:46:32 -0400 Subject: [PATCH] feat: sqlite vacuum and optional migrations --- cmd/waku/flags.go | 13 +++++++++++++ cmd/waku/main.go | 2 ++ cmd/waku/node.go | 18 +++++++++++++----- cmd/waku/options.go | 2 ++ cmd/waku/rest/utils_test.go | 2 +- library/node.go | 2 +- waku/persistence/sqlite/sqlite.go | 14 +++++++++++++- waku/persistence/utils/db.go | 12 ++++++++++-- waku/v2/node/connectedness_test.go | 2 +- waku/v2/node/wakunode2_test.go | 2 +- waku/v2/protocol/store/utils_test.go | 2 +- waku/v2/rendezvous/rendezvous_test.go | 2 +- 12 files changed, 59 insertions(+), 14 deletions(-) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index cef87e8c..81ea1675 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -288,6 +288,19 @@ var ( Destination: &options.Store.DatabaseURL, EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_URL"}, }) + StoreMessageDBVacuum = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "store-message-db-vacuum", + Usage: "Enable database vacuuming at start. Only supported by SQLite database engine.", + Destination: &options.Store.Vacuum, + EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_VACUUM"}, + }) + StoreMessageDBMigration = altsrc.NewBoolFlag(&cli.BoolFlag{ + Name: "store-message-db-migration", + Usage: "Enable database migration at start.", + Destination: &options.Store.Migration, + Value: true, + EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_MIGRATION"}, + }) StoreResumePeer = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "store-resume-peer", Usage: "Peer multiaddress to resume the message store at boot. Option may be repeated", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 805e620d..b84c66a9 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -56,6 +56,8 @@ func main() { StoreMessageDBURL, StoreMessageRetentionTime, StoreMessageRetentionCapacity, + StoreMessageDBVacuum, + StoreMessageDBMigration, StoreResumePeer, FilterFlag, FilterNode, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 6035353c..f9d779db 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -96,8 +96,11 @@ func Execute(options NodeOptions) { var db *sql.DB var migrationFn func(*sql.DB) error - if requiresDB(options) { - db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL) + if requiresDB(options) && options.Store.Migration { + dbSettings := dbutils.DBSettings{ + SQLiteVacuum: options.Store.Vacuum, + } + db, migrationFn, err = dbutils.ExtractDBAndMigration(options.Store.DatabaseURL, dbSettings, logger) failOnErr(err, "Could not connect to DB") } @@ -211,11 +214,16 @@ func Execute(options NodeOptions) { var dbStore *persistence.DBStore if requiresDB(options) { - dbStore, err = persistence.NewDBStore(logger, + dbOptions := []persistence.DBOption{ persistence.WithDB(db), - persistence.WithMigrations(migrationFn), // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB persistence.WithRetentionPolicy(options.Store.RetentionMaxMessages, options.Store.RetentionTime), - ) + } + + if options.Store.Migration { + dbOptions = append(dbOptions, persistence.WithMigrations(migrationFn)) // TODO: refactor migrations out of DBStore, or merge DBStore with rendezvous DB + } + + dbStore, err = persistence.NewDBStore(logger, dbOptions...) failOnErr(err, "DBStore") nodeOpts = append(nodeOpts, node.WithMessageProvider(dbStore)) } diff --git a/cmd/waku/options.go b/cmd/waku/options.go index c5a23e61..d21a9930 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -81,6 +81,8 @@ type StoreOptions struct { RetentionMaxMessages int ResumeNodes []multiaddr.Multiaddr Nodes []multiaddr.Multiaddr + Vacuum bool + Migration bool } // DNSDiscoveryOptions are settings used for enabling DNS-based discovery diff --git a/cmd/waku/rest/utils_test.go b/cmd/waku/rest/utils_test.go index fe3fdd18..020eccb4 100644 --- a/cmd/waku/rest/utils_test.go +++ b/cmd/waku/rest/utils_test.go @@ -12,7 +12,7 @@ import ( func MemoryDB(t *testing.T) *persistence.DBStore { var db *sql.DB - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) diff --git a/library/node.go b/library/node.go index e8352c9a..fb1accfd 100644 --- a/library/node.go +++ b/library/node.go @@ -131,7 +131,7 @@ func NewNode(configJSON string) error { if *config.EnableStore { var db *sql.DB var migrationFn func(*sql.DB) error - db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL) + db, migrationFn, err = dbutils.ExtractDBAndMigration(*config.DatabaseURL, dbutils.DBSettings{SQLiteVacuum: true}, utils.Logger()) if err != nil { return err } diff --git a/waku/persistence/sqlite/sqlite.go b/waku/persistence/sqlite/sqlite.go index 947e9d56..675d7ee1 100644 --- a/waku/persistence/sqlite/sqlite.go +++ b/waku/persistence/sqlite/sqlite.go @@ -11,6 +11,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/persistence/migrate" "github.com/waku-org/go-waku/waku/persistence/sqlite/migrations" + "go.uber.org/zap" ) func addSqliteURLDefaults(dburl string) string { @@ -56,7 +57,7 @@ func WithDB(dburl string, migrate bool) persistence.DBOption { } // NewDB creates a sqlite3 DB in the specified path -func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { +func NewDB(dburl string, shouldVacuum bool, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) { db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl)) if err != nil { return nil, nil, err @@ -65,6 +66,17 @@ func NewDB(dburl string) (*sql.DB, func(*sql.DB) error, error) { // Disable concurrent access as not supported by the driver db.SetMaxOpenConns(1) + logger.Info("starting sqlite database vacuuming") + + if shouldVacuum { + _, err := db.Exec("VACUUM") + if err != nil { + return nil, nil, err + } + } + + logger.Info("finished sqlite database vacuuming") + return db, Migrate, nil } diff --git a/waku/persistence/utils/db.go b/waku/persistence/utils/db.go index 5d58ee62..2832cf99 100644 --- a/waku/persistence/utils/db.go +++ b/waku/persistence/utils/db.go @@ -8,6 +8,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence/postgres" "github.com/waku-org/go-waku/waku/persistence/sqlite" + "go.uber.org/zap" ) func validateDBUrl(val string) error { @@ -18,12 +19,19 @@ func validateDBUrl(val string) error { return nil } +// DBSettings hold db specific configuration settings required during the db initialization +type DBSettings struct { + SQLiteVacuum bool +} + // ExtractDBAndMigration will return a database connection, and migration function that should be used depending on a database connection string -func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, error) { +func ExtractDBAndMigration(databaseURL string, dbSettings DBSettings, logger *zap.Logger) (*sql.DB, func(*sql.DB) error, error) { var db *sql.DB var migrationFn func(*sql.DB) error var err error + logger = logger.Named("db-setup") + dbURL := "" if databaseURL != "" { err := validateDBUrl(databaseURL) @@ -41,7 +49,7 @@ func ExtractDBAndMigration(databaseURL string) (*sql.DB, func(*sql.DB) error, er dbParams := dbURLParts[1] switch dbEngine { case "sqlite3": - db, migrationFn, err = sqlite.NewDB(dbParams) + db, migrationFn, err = sqlite.NewDB(dbParams, dbSettings.SQLiteVacuum, logger) case "postgresql": db, migrationFn, err = postgres.NewDB(dbURL) default: diff --git a/waku/v2/node/connectedness_test.go b/waku/v2/node/connectedness_test.go index a1b8a57f..3a199819 100644 --- a/waku/v2/node/connectedness_test.go +++ b/waku/v2/node/connectedness_test.go @@ -69,7 +69,7 @@ func TestConnectionStatusChanges(t *testing.T) { err = node2.Start(ctx) require.NoError(t, err) - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) require.NoError(t, err) diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index b7c48a15..a38cd978 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -230,7 +230,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { subs.Unsubscribe() // NODE2: Filter Client/Store - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) require.NoError(t, err) diff --git a/waku/v2/protocol/store/utils_test.go b/waku/v2/protocol/store/utils_test.go index 24121b3c..b9914bf4 100644 --- a/waku/v2/protocol/store/utils_test.go +++ b/waku/v2/protocol/store/utils_test.go @@ -12,7 +12,7 @@ import ( func MemoryDB(t *testing.T) *persistence.DBStore { var db *sql.DB - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) diff --git a/waku/v2/rendezvous/rendezvous_test.go b/waku/v2/rendezvous/rendezvous_test.go index 671d5a09..b0350730 100644 --- a/waku/v2/rendezvous/rendezvous_test.go +++ b/waku/v2/rendezvous/rendezvous_test.go @@ -47,7 +47,7 @@ func TestRendezvous(t *testing.T) { require.NoError(t, err) var db *sql.DB - db, migration, err := sqlite.NewDB(":memory:") + db, migration, err := sqlite.NewDB(":memory:", false, utils.Logger()) require.NoError(t, err) err = migration(db)