diff --git a/cmd/storeverif/flags.go b/cmd/storeverif/flags.go index 197e3da..030cc6a 100644 --- a/cmd/storeverif/flags.go +++ b/cmd/storeverif/flags.go @@ -1,6 +1,8 @@ package main import ( + "time" + cli "github.com/urfave/cli/v2" "github.com/urfave/cli/v2/altsrc" "github.com/waku-org/go-waku/waku/cliutils" @@ -40,6 +42,13 @@ var cliFlags = []cli.Flag{ Destination: &options.DatabaseURL, EnvVars: []string{"MSG_VERIF_DB_URL"}, }), + altsrc.NewDurationFlag(&cli.DurationFlag{ + Name: "retention-policy", + Usage: "Retention policy. ", + Destination: &options.RetentionPolicy, + Value: 15 * 24 * time.Hour, + EnvVars: []string{"MSGVERIF_RETENTION_POLICY"}, + }), cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{ Name: "log-level", Aliases: []string{"l"}, diff --git a/cmd/storeverif/options.go b/cmd/storeverif/options.go index 7aa6823..33b4f0d 100644 --- a/cmd/storeverif/options.go +++ b/cmd/storeverif/options.go @@ -1,16 +1,19 @@ package main import ( + "time" + "github.com/multiformats/go-multiaddr" "github.com/urfave/cli/v2" ) type Options struct { - LogLevel string - LogEncoding string - LogOutput string - ClusterID uint - PubSubTopics cli.StringSlice - DatabaseURL string - StoreNodes []multiaddr.Multiaddr + LogLevel string + LogEncoding string + LogOutput string + ClusterID uint + PubSubTopics cli.StringSlice + DatabaseURL string + RetentionPolicy time.Duration + StoreNodes []multiaddr.Multiaddr } diff --git a/internal/persistence/database.go b/internal/persistence/database.go index 9d31e36..a118e71 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -13,8 +13,9 @@ import ( // DBStore is a MessageProvider that has a *sql.DB connection type DBStore struct { - db *sql.DB - migrationFn func(db *sql.DB, logger *zap.Logger) error + db *sql.DB + migrationFn func(db *sql.DB, logger *zap.Logger) error + retentionPolicy time.Duration timesource timesource.Timesource log *zap.Logger @@ -36,6 +37,13 @@ func WithDB(db *sql.DB) DBOption { } } +func WithRetentionPolicy(duration time.Duration) DBOption { + return func(d *DBStore) error { + d.retentionPolicy = duration + return nil + } +} + // ConnectionPoolOptions is the options to be used for DB connection pooling type ConnectionPoolOptions struct { MaxOpenConnections int @@ -115,6 +123,8 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e d.cancel = cancel d.timesource = timesource + d.log.Info("Using db retention policy", zap.String("duration", d.retentionPolicy.String())) + err := d.cleanOlderRecords(ctx) if err != nil { return err @@ -129,7 +139,7 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e func (d *DBStore) cleanOlderRecords(ctx context.Context) error { d.log.Debug("cleaning older records...") - deleteFrom := time.Now().Add(-14 * 24 * time.Hour).UnixNano() + deleteFrom := time.Now().Add(d.retentionPolicy).UnixNano() _, err := d.db.ExecContext(ctx, "DELETE FROM missingMessages WHERE storedAt < $1", deleteFrom) if err != nil { return err