mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-02 14:13:11 +00:00
feat: insert fleetname
This commit is contained in:
parent
e0f369326e
commit
4378920387
@ -423,7 +423,7 @@ func (app *Application) checkMissingMessageStatus(ctx context.Context, storenode
|
||||
}
|
||||
}, logger)
|
||||
|
||||
err := app.db.MarkMessagesAsFound(peerID, maps.Keys(foundMissingMessages), options.ClusterID)
|
||||
err := app.db.MarkMessagesAsFound(peerID, maps.Keys(foundMissingMessages))
|
||||
if err != nil {
|
||||
logger.Error("could not mark messages as found", zap.Error(err))
|
||||
return
|
||||
@ -446,7 +446,7 @@ func (app *Application) countMissingMessages(storenodes []peer.ID) error {
|
||||
now := app.node.Timesource().Now().Add(-2 * time.Hour)
|
||||
|
||||
// Count messages in last day (not including last two hours)
|
||||
results, err := app.db.CountMissingMessages(now.Add(-24*time.Hour), now, options.ClusterID)
|
||||
results, err := app.db.CountMissingMessages(now.Add(-24*time.Hour), now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -455,7 +455,7 @@ func (app *Application) countMissingMessages(storenodes []peer.ID) error {
|
||||
}
|
||||
|
||||
// Count messages in last week (not including last two hours)
|
||||
results, err = app.db.CountMissingMessages(now.Add(-24*time.Hour*7), now, options.ClusterID)
|
||||
results, err = app.db.CountMissingMessages(now.Add(-24*time.Hour*7), now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -279,12 +279,12 @@ func (d *DBStore) GetMissingMessages(from time.Time, to time.Time) (map[peer.ID]
|
||||
}
|
||||
|
||||
func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, topic string, lastSyncTimestamp time.Time) error {
|
||||
_, err := tx.Exec("INSERT INTO syncTopicStatus(clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3) ON CONFLICT(clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $4", d.clusterID, topic, lastSyncTimestamp.UnixNano(), lastSyncTimestamp.UnixNano())
|
||||
_, err := tx.Exec("INSERT INTO syncTopicStatus(fleet, clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3, $4) ON CONFLICT(fleet, clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $5", d.fleetName, d.clusterID, topic, lastSyncTimestamp.UnixNano(), lastSyncTimestamp.UnixNano())
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, topic string, storenodes []peer.ID, status string) error {
|
||||
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7)")
|
||||
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, fleet, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -292,7 +292,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
|
||||
|
||||
now := time.Now().UnixNano()
|
||||
for _, s := range storenodes {
|
||||
_, err := stmt.Exec(uuid, d.clusterID, topic, msgHash.String(), s, status, now)
|
||||
_, err := stmt.Exec(uuid, d.fleetName, d.clusterID, topic, msgHash.String(), s, status, now)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -301,7 +301,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error {
|
||||
func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash) error {
|
||||
if len(messageHashes) == 0 {
|
||||
return nil
|
||||
}
|
||||
@ -315,7 +315,7 @@ func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.Message
|
||||
}
|
||||
query += ")"
|
||||
|
||||
args := []interface{}{clusterID}
|
||||
args := []interface{}{d.clusterID}
|
||||
for _, messageHash := range messageHashes {
|
||||
args = append(args, messageHash)
|
||||
}
|
||||
@ -326,12 +326,12 @@ func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.Message
|
||||
}
|
||||
|
||||
func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) error {
|
||||
_, err := d.db.Exec("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING", uuid, storenode, time.Now().UnixNano())
|
||||
_, err := d.db.Exec("INSERT INTO storeNodeUnavailable(runId, fleet, storenode, requestTime) VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING", uuid, d.fleetName, storenode, time.Now().UnixNano())
|
||||
return err
|
||||
}
|
||||
|
||||
func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) {
|
||||
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
|
||||
func (d *DBStore) CountMissingMessages(from time.Time, to time.Time) (map[peer.ID]int, error) {
|
||||
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' AND foundOnRecheck = false GROUP BY storenode", from.UnixNano(), to.UnixNano(), d.clusterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user