From 8510aef48ddb4c74b9a8c5ef2fc432c2cc69a068 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Tue, 17 Sep 2024 10:38:14 -0400 Subject: [PATCH] fix: update on insert --- cmd/storemsgcounter/execute.go | 10 ++++++++-- go.mod | 2 +- go.sum | 2 ++ internal/persistence/database.go | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 1a4ec68..fc20b16 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -475,7 +475,11 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string success := false count := 1 for retry && count <= maxAttempts { - queryLogger.Info("retrieving message history for topic", zap.Int("attempt", count)) + requestID := protocol.GenerateRequestID() + + queryLogger.Info("retrieving message history for topic", + zap.Int("attempt", count), + zap.String("requestID", hex.EncodeToString(requestID))) tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute) result, err = app.node.Store().Query(tCtx, store.FilterCriteria{ @@ -486,7 +490,9 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string cancel() if err != nil { - queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count)) + queryLogger.Error("could not query storenode", + zap.Error(err), zap.Int("attempt", count), + zap.String("requestID", hex.EncodeToString(requestID))) time.Sleep(2 * time.Second) } else { queryLogger.Info("messages available", zap.Int("len", len(result.Messages()))) diff --git a/go.mod b/go.mod index 0d76982..399e027 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/multiformats/go-multiaddr v0.12.4 github.com/prometheus/client_golang v1.19.1 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 + github.com/waku-org/go-waku v0.8.1-0.20240917141301-991e872de95c go.opencensus.io v0.24.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 diff --git a/go.sum b/go.sum index 3e9947e..d97cd5a 100644 --- a/go.sum +++ b/go.sum @@ -826,6 +826,8 @@ github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 h1:aTOUQm0kKtHiqraFpqj1Ja++C+qyZyeiSPKtXe3Ctac= github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240917141301-991e872de95c h1:dyJ0JOZjQNNXtOCDNfN7tjpCZKTJLOVOrL5zNbJ/4zQ= +github.com/waku-org/go-waku v0.8.1-0.20240917141301-991e872de95c/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/internal/persistence/database.go b/internal/persistence/database.go index 8fa9e07..ee3d484 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -284,7 +284,7 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, topic string, lastSyncTimesta } func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, topic string, storenodes []peer.ID, status string) error { - stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, fleet, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)") + stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, fleet, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (messageHash, storenode, fleet) DO UPDATE pubsubTopic = EXCLUDED.pubsubTopic, msgStatus = EXCLUDED.msgStatus, storedAt = EXCLUDED.storedAt, clusterId = EXCLUDED.clusterId") if err != nil { return err }