mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-02 14:13:11 +00:00
fix: sql query and use custom requestIDs and bump go-waku
This commit is contained in:
parent
8510aef48d
commit
3df361b6fd
@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/google/uuid"
|
||||
@ -138,10 +139,13 @@ func Execute(ctx context.Context, options Options) error {
|
||||
}
|
||||
defer wakuNode.Stop()
|
||||
|
||||
rateLimiters := make(map[peer.ID]*rate.Limiter)
|
||||
|
||||
var storenodeIDs peer.IDSlice
|
||||
for _, s := range storenodes {
|
||||
wakuNode.Host().Peerstore().AddAddrs(s.ID, s.Addrs, peerstore.PermanentAddrTTL)
|
||||
storenodeIDs = append(storenodeIDs, s.ID)
|
||||
rateLimiters[s.ID] = rate.NewLimiter(7, 1)
|
||||
}
|
||||
|
||||
err = dbStore.Start(ctx, wakuNode.Timesource())
|
||||
@ -476,7 +480,6 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string
|
||||
count := 1
|
||||
for retry && count <= maxAttempts {
|
||||
requestID := protocol.GenerateRequestID()
|
||||
|
||||
queryLogger.Info("retrieving message history for topic",
|
||||
zap.Int("attempt", count),
|
||||
zap.String("requestID", hex.EncodeToString(requestID)))
|
||||
@ -532,9 +535,10 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string
|
||||
count := 1
|
||||
cursorLogger := queryLogger.With(zap.String("cursor", hex.EncodeToString(result.Cursor())))
|
||||
for retry && count <= maxAttempts {
|
||||
cursorLogger.Info("retrieving next page")
|
||||
requestID := protocol.GenerateRequestID()
|
||||
cursorLogger.Info("retrieving next page", zap.String("requestID", hex.EncodeToString(requestID)))
|
||||
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
err = result.Next(tCtx)
|
||||
err = result.Next(tCtx, store.WithRequestID(requestID))
|
||||
cancel()
|
||||
if err != nil {
|
||||
cursorLogger.Error("could not query storenode", zap.Error(err))
|
||||
@ -616,6 +620,7 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
|
||||
count := 1
|
||||
for retry && count <= maxAttempts {
|
||||
queryLogger.Info("querying by hash", zap.Int("attempt", count))
|
||||
|
||||
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
|
||||
cancel()
|
||||
@ -649,9 +654,10 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
|
||||
success := false
|
||||
count := 1
|
||||
for retry && count <= maxAttempts {
|
||||
queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count))
|
||||
requestID := protocol.GenerateRequestID()
|
||||
queryLogger.Info("executing next while querying hashes", zap.String("requestID", hex.EncodeToString(requestID)), zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count))
|
||||
tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
err = result.Next(tCtx)
|
||||
err = result.Next(tCtx, store.WithRequestID(requestID))
|
||||
cancel()
|
||||
if err != nil {
|
||||
queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count))
|
||||
|
||||
4
go.mod
4
go.mod
@ -18,10 +18,11 @@ require (
|
||||
github.com/multiformats/go-multiaddr v0.12.4
|
||||
github.com/prometheus/client_golang v1.19.1
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240917141301-991e872de95c
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240918210937-f0acee4d1dd3
|
||||
go.opencensus.io v0.24.0
|
||||
go.uber.org/zap v1.27.0
|
||||
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
|
||||
golang.org/x/time v0.5.0
|
||||
google.golang.org/protobuf v1.34.1
|
||||
)
|
||||
|
||||
@ -161,7 +162,6 @@ require (
|
||||
golang.org/x/sync v0.7.0 // indirect
|
||||
golang.org/x/sys v0.20.0 // indirect
|
||||
golang.org/x/text v0.15.0 // indirect
|
||||
golang.org/x/time v0.5.0 // indirect
|
||||
golang.org/x/tools v0.21.0 // indirect
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
|
||||
6
go.sum
6
go.sum
@ -824,10 +824,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
|
||||
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 h1:aTOUQm0kKtHiqraFpqj1Ja++C+qyZyeiSPKtXe3Ctac=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240917141301-991e872de95c h1:dyJ0JOZjQNNXtOCDNfN7tjpCZKTJLOVOrL5zNbJ/4zQ=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240917141301-991e872de95c/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240918210937-f0acee4d1dd3 h1:K03h68dOG+lOj+giCqrocjYd4cG9o3mIB/Lvf0HXitk=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240918210937-f0acee4d1dd3/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
||||
@ -284,7 +284,7 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, topic string, lastSyncTimesta
|
||||
}
|
||||
|
||||
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, topic string, storenodes []peer.ID, status string) error {
|
||||
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, fleet, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (messageHash, storenode, fleet) DO UPDATE pubsubTopic = EXCLUDED.pubsubTopic, msgStatus = EXCLUDED.msgStatus, storedAt = EXCLUDED.storedAt, clusterId = EXCLUDED.clusterId")
|
||||
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, fleet, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (messageHash, storenode, fleet) DO UPDATE SET pubsubTopic = EXCLUDED.pubsubTopic, msgStatus = EXCLUDED.msgStatus, storedAt = EXCLUDED.storedAt, clusterId = EXCLUDED.clusterId")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user