From 1151ba292c3bb1c9ec147c67cb32f151b33ea26f Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 22 Aug 2024 11:05:56 -0400 Subject: [PATCH] chore: add fleet name --- cmd/storemsgcounter/execute.go | 19 ++++--- cmd/storemsgcounter/flags.go | 6 +++ cmd/storemsgcounter/options.go | 1 + internal/persistence/database.go | 50 +++++++------------ .../postgres/migrations/bindata.go | 24 +++++++++ .../postgres/migrations/sql/4_fleet.up.sql | 3 ++ 6 files changed, 64 insertions(+), 39 deletions(-) create mode 100644 internal/persistence/postgres/migrations/sql/4_fleet.up.sql diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 8891885..9925ff8 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -71,7 +71,14 @@ func Execute(ctx context.Context, options Options) error { return err } - dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(options.RetentionPolicy)) + dbStore, err := persistence.NewDBStore( + options.ClusterID, + options.FleetName, + logger, + persistence.WithDB(db), + persistence.WithMigrations(migrationFn), + persistence.WithRetentionPolicy(options.RetentionPolicy), + ) if err != nil { return err } @@ -230,7 +237,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno msgPubsubTopic = make(map[pb.MessageHash]string) msgMapLock.Unlock() - topicSyncStatus, err := app.db.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value()) + topicSyncStatus, err := app.db.GetTopicSyncStatus(ctx, options.PubSubTopics.Value()) if err != nil { return false, err } @@ -355,7 +362,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno if len(missingIn) != 0 { logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn))) - err := app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], missingIn, "does_not_exist") + err := app.db.RecordMessage(runId, tx, msgHash, msgPubsubTopic[msgHash], missingIn, "does_not_exist") if err != nil { return false, err } @@ -364,7 +371,7 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno if len(unknownIn) != 0 { logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn))) - err = app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], unknownIn, "unknown") + err = app.db.RecordMessage(runId, tx, msgHash, msgPubsubTopic[msgHash], unknownIn, "unknown") if err != nil { return false, err } @@ -397,7 +404,7 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, storenode // Get all messages whose status is missing or does not exist, and the column found_on_recheck is false // if found, set found_on_recheck to true - missingMessages, err := app.db.GetMissingMessages(from, to, options.ClusterID) + missingMessages, err := app.db.GetMissingMessages(from, to) if err != nil { return err } @@ -562,7 +569,7 @@ func (app *Application) retrieveHistory(ctx context.Context, runId string, store wg.Wait() // Update db with last sync time - err := app.db.UpdateTopicSyncState(tx, options.ClusterID, topic, endTime) + err := app.db.UpdateTopicSyncState(tx, topic, endTime) if err != nil { logger.Panic("could not update topic sync state", zap.Error(err)) } diff --git a/cmd/storemsgcounter/flags.go b/cmd/storemsgcounter/flags.go index ebcdf38..d6a78c5 100644 --- a/cmd/storemsgcounter/flags.go +++ b/cmd/storemsgcounter/flags.go @@ -27,6 +27,12 @@ var cliFlags = []cli.Flag{ EnvVars: []string{"STORE_MSG_CTR_ADDRESS"}, }), &cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"}, + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "fleet-name", + Usage: "Fleet name", + Destination: &options.FleetName, + EnvVars: []string{"STORE_MSG_CTR_FLEET_NAME"}, + }), cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "storenode", Usage: "Multiaddr of peers that supports storeV3 protocol. Option may be repeated", diff --git a/cmd/storemsgcounter/options.go b/cmd/storemsgcounter/options.go index f7a63a8..fddbc41 100644 --- a/cmd/storemsgcounter/options.go +++ b/cmd/storemsgcounter/options.go @@ -13,6 +13,7 @@ type Options struct { LogLevel string LogEncoding string LogOutput string + FleetName string ClusterID uint PubSubTopics cli.StringSlice DatabaseURL string diff --git a/internal/persistence/database.go b/internal/persistence/database.go index 6604da8..bff69a5 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -20,6 +20,9 @@ type DBStore struct { migrationFn func(db *sql.DB, logger *zap.Logger) error retentionPolicy time.Duration + clusterID uint + fleetName string + timesource timesource.Timesource log *zap.Logger @@ -95,9 +98,11 @@ func DefaultOptions() []DBOption { // Creates a new DB store using the db specified via options. // It will run migrations if enabled // clean up records according to the retention policy used -func NewDBStore(log *zap.Logger, options ...DBOption) (*DBStore, error) { +func NewDBStore(clusterID uint, fleetName string, log *zap.Logger, options ...DBOption) (*DBStore, error) { result := new(DBStore) result.log = log.Named("dbstore") + result.clusterID = clusterID + result.fleetName = fleetName optList := DefaultOptions() optList = append(optList, options...) @@ -205,14 +210,14 @@ func (d *DBStore) GetTrx(ctx context.Context) (*sql.Tx, error) { return d.db.BeginTx(ctx, &sql.TxOptions{}) } -func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsubTopics []string) (map[string]*time.Time, error) { +func (d *DBStore) GetTopicSyncStatus(ctx context.Context, pubsubTopics []string) (map[string]*time.Time, error) { result := make(map[string]*time.Time) for _, topic := range pubsubTopics { result[topic] = nil } sqlQuery := `SELECT pubsubTopic, lastSyncTimestamp FROM syncTopicStatus WHERE clusterId = $1` - rows, err := d.db.QueryContext(ctx, sqlQuery, clusterID) + rows, err := d.db.QueryContext(ctx, sqlQuery, d.clusterID) if err != nil { return nil, err } @@ -239,8 +244,8 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub return result, nil } -func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) { - rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false", from.UnixNano(), to.UnixNano(), clusterID) +func (d *DBStore) GetMissingMessages(from time.Time, to time.Time) (map[peer.ID][]pb.MessageHash, error) { + rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false", from.UnixNano(), to.UnixNano(), d.clusterID) if err != nil { return nil, err } @@ -273,21 +278,12 @@ func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uin return results, nil } -func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string, lastSyncTimestamp time.Time) error { - stmt, err := tx.Prepare("INSERT INTO syncTopicStatus(clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3) ON CONFLICT(clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $4") - if err != nil { - return err - } - - _, err = stmt.Exec(clusterID, topic, lastSyncTimestamp.UnixNano(), lastSyncTimestamp.UnixNano()) - if err != nil { - return err - } - - return stmt.Close() +func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, topic string, lastSyncTimestamp time.Time) error { + _, err := tx.Exec("INSERT INTO syncTopicStatus(clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3) ON CONFLICT(clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $4", d.clusterID, topic, lastSyncTimestamp.UnixNano(), lastSyncTimestamp.UnixNano()) + return err } -func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, storenodes []peer.ID, status string) error { +func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, topic string, storenodes []peer.ID, status string) error { stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)") if err != nil { return err @@ -296,11 +292,10 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, now := time.Now().UnixNano() for _, s := range storenodes { - _, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), s, status, now) + _, err := stmt.Exec(uuid, d.clusterID, topic, msgHash.String(), s, status, now) if err != nil { return err } - } return nil @@ -331,19 +326,8 @@ func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.Message } func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) error { - stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING") - if err != nil { - return err - } - defer stmt.Close() - - now := time.Now().UnixNano() - _, err = stmt.Exec(uuid, storenode, now) - if err != nil { - return err - } - - return nil + _, err := d.db.Exec("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", uuid, storenode, time.Now().UnixNano()) + return err } func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) { diff --git a/internal/persistence/postgres/migrations/bindata.go b/internal/persistence/postgres/migrations/bindata.go index e6ea36a..ab8c935 100644 --- a/internal/persistence/postgres/migrations/bindata.go +++ b/internal/persistence/postgres/migrations/bindata.go @@ -3,6 +3,7 @@ // 1_setup.up.sql (856B) // 2_timestamp.up.sql (53B) // 3_found.up.sql (86B) +// 4_fleet.up.sql (158B) // doc.go (74B) package migrations @@ -132,6 +133,26 @@ func _3_foundUpSql() (*asset, error) { return a, nil } +var __4_fleetUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\xc8\xcd\x2c\x2e\xce\xcc\x4b\xf7\x4d\x2d\x2e\x4e\x4c\x4f\x2d\x56\x70\x74\x71\x51\x70\xf6\xf7\x09\xf5\xf5\x53\x48\xcb\x49\x4d\x2d\x51\x08\x71\x8d\x08\xb1\xe6\x42\xd6\x52\x5c\x92\x5f\x94\xea\x97\x9f\x92\x1a\x9a\x97\x58\x96\x98\x99\x93\x98\x94\x93\x4a\x94\xbe\xca\xbc\xe4\x90\xfc\x82\xcc\xe4\xe0\x92\xc4\x92\x52\x9c\x56\x01\x02\x00\x00\xff\xff\x89\x58\xdc\x2d\x9e\x00\x00\x00") + +func _4_fleetUpSqlBytes() ([]byte, error) { + return bindataRead( + __4_fleetUpSql, + "4_fleet.up.sql", + ) +} + +func _4_fleetUpSql() (*asset, error) { + bytes, err := _4_fleetUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "4_fleet.up.sql", size: 158, mode: os.FileMode(0664), modTime: time.Unix(1724337954, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xae, 0x83, 0x6f, 0x67, 0x41, 0xfe, 0xb8, 0xd6, 0x2b, 0x27, 0xb6, 0xee, 0xa5, 0xe9, 0x52, 0x1c, 0xd4, 0xdc, 0xb5, 0xa4, 0x79, 0x15, 0x33, 0xd0, 0x8a, 0x56, 0x0, 0xbc, 0x48, 0x2f, 0x98, 0x9c}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -249,6 +270,8 @@ var _bindata = map[string]func() (*asset, error){ "3_found.up.sql": _3_foundUpSql, + "4_fleet.up.sql": _4_fleetUpSql, + "doc.go": docGo, } @@ -296,6 +319,7 @@ var _bintree = &bintree{nil, map[string]*bintree{ "1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}}, "2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}}, "3_found.up.sql": &bintree{_3_foundUpSql, map[string]*bintree{}}, + "4_fleet.up.sql": &bintree{_4_fleetUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/internal/persistence/postgres/migrations/sql/4_fleet.up.sql b/internal/persistence/postgres/migrations/sql/4_fleet.up.sql new file mode 100644 index 0000000..abb6e2d --- /dev/null +++ b/internal/persistence/postgres/migrations/sql/4_fleet.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE missingMessages ADD COLUMN fleet TEXT; +ALTER TABLE storeNodeUnavailable ADD COLUMN fleet TEXT; +ALTER TABLE syncTopicStatus ADD COLUMN fleet TEXT;