diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 4b1d608..298f7b0 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -423,7 +423,7 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, storenode } }, logger) - err := app.db.MarkMessagesAsFound(peerID, maps.Keys(foundMissingMessages), options.ClusterID) + err := app.db.MarkMessagesAsFound(peerID, maps.Keys(foundMissingMessages)) if err != nil { logger.Error("could not mark messages as found", zap.Error(err)) return @@ -446,7 +446,7 @@ func (app *Application) countMissingMessages(storenodes []peer.ID) error { now := app.node.Timesource().Now().Add(-2 * time.Hour) // Count messages in last day (not including last two hours) - results, err := app.db.CountMissingMessages(now.Add(-24*time.Hour), now, options.ClusterID) + results, err := app.db.CountMissingMessages(now.Add(-24*time.Hour), now) if err != nil { return err } @@ -455,7 +455,7 @@ func (app *Application) countMissingMessages(storenodes []peer.ID) error { } // Count messages in last week (not including last two hours) - results, err = app.db.CountMissingMessages(now.Add(-24*time.Hour*7), now, options.ClusterID) + results, err = app.db.CountMissingMessages(now.Add(-24*time.Hour*7), now) if err != nil { return err } diff --git a/internal/persistence/database.go b/internal/persistence/database.go index bff69a5..fff7906 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -279,12 +279,12 @@ func (d *DBStore) GetMissingMessages(from time.Time, to time.Time) (map[peer.ID] } func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, topic string, lastSyncTimestamp time.Time) error { - _, err := tx.Exec("INSERT INTO syncTopicStatus(clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3) ON CONFLICT(clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $4", d.clusterID, topic, lastSyncTimestamp.UnixNano(), lastSyncTimestamp.UnixNano()) + _, err := tx.Exec("INSERT INTO syncTopicStatus(fleet, clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3, $4) ON CONFLICT(fleet, clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $5", d.fleetName, d.clusterID, topic, lastSyncTimestamp.UnixNano(), lastSyncTimestamp.UnixNano()) return err } func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, topic string, storenodes []peer.ID, status string) error { - stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)") + stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, fleet, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)") if err != nil { return err } @@ -292,7 +292,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, now := time.Now().UnixNano() for _, s := range storenodes { - _, err := stmt.Exec(uuid, d.clusterID, topic, msgHash.String(), s, status, now) + _, err := stmt.Exec(uuid, d.fleetName, d.clusterID, topic, msgHash.String(), s, status, now) if err != nil { return err } @@ -301,7 +301,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, return nil } -func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error { +func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash) error { if len(messageHashes) == 0 { return nil } @@ -315,7 +315,7 @@ func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.Message } query += ")" - args := []interface{}{clusterID} + args := []interface{}{d.clusterID} for _, messageHash := range messageHashes { args = append(args, messageHash) } @@ -326,12 +326,12 @@ func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.Message } func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) error { - _, err := d.db.Exec("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", uuid, storenode, time.Now().UnixNano()) + _, err := d.db.Exec("INSERT INTO storeNodeUnavailable(runId, fleet, storenode, requestTime) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", uuid, d.fleetName, storenode, time.Now().UnixNano()) return err } -func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) { - rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID) +func (d *DBStore) CountMissingMessages(from time.Time, to time.Time) (map[peer.ID]int, error) { + rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), d.clusterID) if err != nil { return nil, err }