diff --git a/Makefile b/Makefile index 0a163ca..f448cd3 100644 --- a/Makefile +++ b/Makefile @@ -5,8 +5,6 @@ all: build build: go build -tags=gowaku_no_rln -o build/storemsgcounter ./cmd/storemsgcounter - go build -tags=gowaku_no_rln -o build/populatedb ./cmd/populatedb - lint-install: curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | \ diff --git a/cmd/populatedb/main.go b/cmd/populatedb/main.go deleted file mode 100644 index a6ca320..0000000 --- a/cmd/populatedb/main.go +++ /dev/null @@ -1,135 +0,0 @@ -package main - -import ( - "database/sql" - "fmt" - "math/rand" - "os" - "os/signal" - "syscall" - "time" - - "github.com/google/uuid" - _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "google.golang.org/protobuf/proto" -) - -type dbInfo struct { - id string - db *sql.DB - file string - stmt *sql.Stmt - inserted bool -} - -type dbMap map[string]*dbInfo - -var databases dbMap - -func newDBInfo(file string) { - id := uuid.New().String() - - db, err := sql.Open("sqlite3", file+"?_journal=WAL") - if err != nil { - panic(err.Error()) - } - - stmt, err := db.Prepare("INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) VALUES(?,?,?,?,?,?,?,?)") - if err != nil { - panic(err.Error()) - } - - databases[id] = &dbInfo{ - id: id, - file: file, - db: db, - stmt: stmt, - } -} - -func (d dbMap) values() []*dbInfo { - var result []*dbInfo - for _, x := range d { - result = append(result, x) - } - return result -} - -func main() { - if len(os.Args) == 1 { - fmt.Println("Use: populatedb [path_to_db1 path_to_db2 ...]") - return - } - - databases = make(dbMap) - for _, dbPath := range os.Args[1:] { - newDBInfo(dbPath) - } - - pubsubTopics := []string{ - "/waku/2/rs/1/1", - "/waku/2/rs/1/2", - "/waku/2/rs/1/3", - } - - // Wait for a SIGINT or SIGTERM signal - ch := make(chan os.Signal, 1) - signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) - - timeInterval := 5 * time.Second - t := time.NewTicker(timeInterval) - - minuteTimer := time.NewTimer(0) - - fmt.Print(" \t\t\t\t\t\t") - i := 0 - for range databases { - i++ - fmt.Printf("db_%d\t", i) - } - fmt.Println() - - for { - select { - case currTime := <-minuteTimer.C: - fmt.Println(currTime) - minuteTimer.Reset(1 * time.Minute) - case <-t.C: - insertCnt := rand.Intn(len(databases)) + 1 // Insert in at least one store node - - dbArr := databases.values() - rand.Shuffle(len(dbArr), func(i, j int) { dbArr[i], dbArr[j] = dbArr[j], dbArr[i] }) - - for _, x := range dbArr { - x.inserted = false - } - - now := time.Now().UnixNano() - msg := &pb.WakuMessage{ - Timestamp: proto.Int64(now), - Payload: []byte{1, 2, 3, 4, 5}, - ContentTopic: "test", - } - envelope := protocol.NewEnvelope(msg, now, pubsubTopics[rand.Intn(len(pubsubTopics))]) - hash := envelope.Hash() - for i := 0; i < insertCnt; i++ { - dbArr[i].inserted = true - _, err := dbArr[i].stmt.Exec([]byte(envelope.PubsubTopic()), []byte(msg.ContentTopic), msg.Payload, msg.GetVersion(), msg.GetTimestamp(), envelope.Index().Digest, hash.Bytes(), now) - if err != nil { - panic(err.Error()) - } - } - - fmt.Printf("%s\t%s\t%v\t", hash.String(), envelope.PubsubTopic(), msg.GetTimestamp()) - for _, x := range databases { - fmt.Printf("%v\t", x.inserted) - } - fmt.Println() - - case <-ch: - return - } - } -} diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index eea8aea..b2f6281 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "golang.org/x/exp/maps" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" @@ -128,8 +130,10 @@ func Execute(ctx context.Context, options Options) error { } defer wakuNode.Stop() + var storenodeIDs peer.IDSlice for _, s := range storenodes { wakuNode.Host().Peerstore().AddAddrs(s.ID, s.Addrs, peerstore.PermanentAddrTTL) + storenodeIDs = append(storenodeIDs, s.ID) } err = dbStore.Start(ctx, wakuNode.Timesource()) @@ -143,25 +147,50 @@ func Execute(ctx context.Context, options Options) error { db: dbStore, } - timer := time.NewTimer(0) - defer timer.Stop() + missingMessagesTimer := time.NewTimer(0) + defer missingMessagesTimer.Stop() + + syncCheckTimer := time.NewTicker(30 * time.Minute) + defer syncCheckTimer.Stop() + for { select { case <-ctx.Done(): return nil - case <-timer.C: + case <-missingMessagesTimer.C: tmpUUID := uuid.New() runId := hex.EncodeToString(tmpUUID[:]) runIdLogger := logger.With(zap.String("runId", runId)) runIdLogger.Info("verifying message history...") - err := application.verifyHistory(ctx, runId, storenodes, runIdLogger) + err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger) if err != nil { return err } runIdLogger.Info("verification complete") - timer.Reset(timeInterval) + missingMessagesTimer.Reset(timeInterval) + case <-syncCheckTimer.C: + go func() { + tmpUUID := uuid.New() + runId := hex.EncodeToString(tmpUUID[:]) + runIdLogger := logger.With(zap.String("syncRunId", runId)) + runIdLogger.Info("rechecking missing messages status") + + err := application.checkMissingMessageStatus(ctx, runId, runIdLogger) + if err != nil { + logger.Error("could not recheck the status of missing messages", zap.Error(err)) + return + } + + err = application.countMissingMessages() + if err != nil { + logger.Error("could not count missing messages", zap.Error(err)) + return + } + + runIdLogger.Info("missing messages recheck complete") + }() } } } @@ -170,7 +199,7 @@ var msgMapLock sync.Mutex var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence var msgPubsubTopic map[pb.MessageHash]string -func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, logger *zap.Logger) error { +func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, logger *zap.Logger) error { // [MessageHash][StoreNode] = exists? msgMapLock.Lock() @@ -213,9 +242,9 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash msgMapLock.Lock() for msgHash, nodes := range msgMap { - for _, node := range storenodes { - if nodes[node.ID] != Exists { - msgsToVerify[node.ID] = append(msgsToVerify[node.ID], msgHash) + for _, s := range storenodes { + if nodes[s] != Exists { + msgsToVerify[s] = append(msgsToVerify[s], msgHash) } } } @@ -226,7 +255,27 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno wg.Add(1) go func(peerID peer.ID, messageHashes []pb.MessageHash) { defer wg.Done() - app.verifyMessageExistence(ctx, runId, peerID, messageHashes, logger) + + onResult := func(result *store.Result) { + msgMapLock.Lock() + for _, mkv := range result.Messages() { + hash := mkv.WakuMessageHash() + _, ok := msgMap[hash] + if !ok { + msgMap[hash] = make(map[peer.ID]MessageExistence) + } + msgMap[hash][result.PeerID()] = Exists + } + + for _, msgHash := range messageHashes { + if msgMap[msgHash][result.PeerID()] != Exists { + msgMap[msgHash][result.PeerID()] = DoesNotExist + } + } + msgMapLock.Unlock() + } + + app.verifyMessageExistence(ctx, runId, peerID, messageHashes, onResult, logger) }(peerID, messageHashes) } wg.Wait() @@ -245,13 +294,13 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno var missingIn []peer.ID var unknownIn []peer.ID - for _, node := range storenodes { - if nodes[node.ID] == DoesNotExist { - missingIn = append(missingIn, node.ID) - missingInSummary[node.ID]++ - } else if nodes[node.ID] == Unknown { - unknownIn = append(unknownIn, node.ID) - unknownInSummary[node.ID]++ + for _, s := range storenodes { + if nodes[s] == DoesNotExist { + missingIn = append(missingIn, s) + missingInSummary[s]++ + } else if nodes[s] == Unknown { + unknownIn = append(unknownIn, s) + unknownInSummary[s]++ } } @@ -274,13 +323,13 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno } for _, s := range storenodes { - missingCnt := missingInSummary[s.ID] - app.metrics.RecordMissingMessages(s.ID, "does_not_exist", missingCnt) - logger.Info("missing message summary", zap.Stringer("storenode", s.ID), zap.Int("numMsgs", missingCnt)) + missingCnt := missingInSummary[s] + app.metrics.RecordMissingMessages(s, "does_not_exist", missingCnt) + logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt)) - unknownCnt := unknownInSummary[s.ID] - app.metrics.RecordMissingMessages(s.ID, "unknown", unknownCnt) - logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s.ID), zap.Int("numMsgs", missingCnt)) + unknownCnt := unknownInSummary[s] + app.metrics.RecordMissingMessages(s, "unknown", unknownCnt) + logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt)) } logger.Info("total missing messages", zap.Int("total", totalMissingMessages)) @@ -289,6 +338,70 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno return nil } +func (app *Application) checkMissingMessageStatus(ctx context.Context, runId string, logger *zap.Logger) error { + now := app.node.Timesource().Now() + + // Get all messages whose status is missing or does not exist, and the column found_on_recheck is false + // if found, set found_on_recheck to true + missingMessages, err := app.db.GetMissingMessages(now.Add(-2*time.Hour), now.Add(-time.Hour), options.ClusterID) + if err != nil { + return err + } + + wg := sync.WaitGroup{} + for storenodeID, messageHashes := range missingMessages { + wg.Add(1) + go func(peerID peer.ID, messageHashes []pb.MessageHash) { + defer wg.Done() + + foundMissingMessages := make(map[pb.MessageHash]struct{}) + app.verifyMessageExistence(ctx, runId, peerID, messageHashes, func(result *store.Result) { + for _, mkv := range result.Messages() { + foundMissingMessages[mkv.WakuMessageHash()] = struct{}{} + } + }, logger) + + err := app.db.MarkMessagesAsFound(peerID, maps.Keys(foundMissingMessages), options.ClusterID) + if err != nil { + logger.Error("could not mark messages as found", zap.Error(err)) + return + } + + app.metrics.RecordMissingMessagesPrevHour(peerID, len(messageHashes)-len(foundMissingMessages)) + + }(storenodeID, messageHashes) + } + + wg.Wait() + + return nil +} + +func (app *Application) countMissingMessages() error { + + // not including last two hours in now to let sync work + now := app.node.Timesource().Now().Add(-2 * time.Hour) + + // Count messages in last day (not including last two hours) + results, err := app.db.CountMissingMessages(now.Add(-24*time.Hour), now, options.ClusterID) + if err != nil { + return err + } + for storenode, cnt := range results { + app.metrics.RecordMissingMessagesLastDay(storenode, cnt) + } + + // Count messages in last week (not including last two hours) + results, err = app.db.CountMissingMessages(now.Add(-24*time.Hour*7), now, options.ClusterID) + if err != nil { + return err + } + for storenode, cnt := range results { + app.metrics.RecordMissingMessagesLastWeek(storenode, cnt) + } + return nil +} + func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string, storenodeID peer.ID, topic string, startTime time.Time, endTime time.Time, logger *zap.Logger) { var result *store.Result var err error @@ -379,7 +492,7 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string } } -func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) { +func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes peer.IDSlice, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) { logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp)) if lastSyncTimestamp != nil { @@ -402,12 +515,12 @@ func (app *Application) retrieveHistory(ctx context.Context, runId string, store // Determine if the messages exist across all nodes wg := sync.WaitGroup{} - for _, node := range storenodes { + for _, storePeerID := range storenodes { wg.Add(1) go func(peerID peer.ID) { defer wg.Done() app.fetchStoreNodeMessages(ctx, runId, peerID, topic, startTime, endTime, logger) - }(node.ID) + }(storePeerID) } wg.Wait() @@ -422,7 +535,7 @@ func (app *Application) retrieveHistory(ctx context.Context, runId string, store } -func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, logger *zap.Logger) { +func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, onResult func(result *store.Result), logger *zap.Logger) { var result *store.Result var err error @@ -462,23 +575,7 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string app.metrics.RecordStorenodeAvailability(peerID, true) for !result.IsComplete() { - msgMapLock.Lock() - for _, mkv := range result.Messages() { - hash := mkv.WakuMessageHash() - _, ok := msgMap[hash] - if !ok { - msgMap[hash] = make(map[peer.ID]MessageExistence) - } - msgMap[hash][peerInfo.ID] = Exists - } - - for _, msgHash := range messageHashes { - if msgMap[msgHash][peerInfo.ID] != Exists { - msgMap[msgHash][peerInfo.ID] = DoesNotExist - } - } - - msgMapLock.Unlock() + onResult(result) retry := true success := false diff --git a/cmd/storemsgcounter/flags.go b/cmd/storemsgcounter/flags.go index 39e83e6..ebcdf38 100644 --- a/cmd/storemsgcounter/flags.go +++ b/cmd/storemsgcounter/flags.go @@ -65,7 +65,7 @@ var cliFlags = []cli.Flag{ altsrc.NewStringFlag(&cli.StringFlag{ Name: "db-url", Usage: "The database connection URL for persistent storage.", - Value: "sqlite3://storage.db", + Value: "", Destination: &options.DatabaseURL, EnvVars: []string{"MSG_VERIF_DB_URL"}, }), diff --git a/go.mod b/go.mod index ba94a51..0d76982 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,9 @@ go 1.21 toolchain go1.21.10 -replace github.com/ethereum/go-ethereum v1.10.26 => github.com/status-im/go-ethereum v1.10.25-status.4 +replace github.com/ethereum/go-ethereum v1.10.26 => github.com/status-im/go-ethereum v1.10.25-status.15 + +replace github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 require ( github.com/ethereum/go-ethereum v1.10.26 @@ -12,14 +14,14 @@ require ( github.com/google/uuid v1.4.0 github.com/ipfs/go-log/v2 v2.5.1 github.com/jackc/pgx/v5 v5.4.1 - github.com/libp2p/go-libp2p v0.35.0 - github.com/mattn/go-sqlite3 v1.14.17 + github.com/libp2p/go-libp2p v0.35.2 github.com/multiformats/go-multiaddr v0.12.4 github.com/prometheus/client_golang v1.19.1 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd + github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 go.opencensus.io v0.24.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 google.golang.org/protobuf v1.34.1 ) @@ -33,6 +35,7 @@ require ( github.com/btcsuite/btcd v0.20.1-beta // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect + github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect @@ -56,7 +59,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect - github.com/gorilla/websocket v1.5.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect @@ -107,7 +110,7 @@ require ( github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pion/datachannel v1.5.6 // indirect github.com/pion/dtls/v2 v2.2.11 // indirect - github.com/pion/ice/v2 v2.3.24 // indirect + github.com/pion/ice/v2 v2.3.25 // indirect github.com/pion/interceptor v0.1.29 // indirect github.com/pion/logging v0.2.2 // indirect github.com/pion/mdns v0.0.12 // indirect @@ -149,11 +152,10 @@ require ( github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/dig v1.17.1 // indirect - go.uber.org/fx v1.21.1 // indirect + go.uber.org/fx v1.22.1 // indirect go.uber.org/mock v0.4.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.23.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.25.0 // indirect golang.org/x/sync v0.7.0 // indirect diff --git a/go.sum b/go.sum index d79bd4a..3e9947e 100644 --- a/go.sum +++ b/go.sum @@ -100,6 +100,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs= github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s= github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -316,8 +318,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY= -github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= @@ -475,12 +477,10 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6 github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM= github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro= -github.com/libp2p/go-libp2p v0.35.0 h1:1xS1Bkr9X7GtdvV6ntLnDV9xB1kNjHK1lZ0eaO6gnhc= -github.com/libp2p/go-libp2p v0.35.0/go.mod h1:snyJQix4ET6Tj+LeI0VPjjxTtdWpeOhYt5lEY0KirkQ= +github.com/libp2p/go-libp2p v0.35.2 h1:287oHbuplkrLdAF+syB0n/qDgd50AUBtEODqS0e0HDs= +github.com/libp2p/go-libp2p v0.35.2/go.mod h1:RKCDNt30IkFipGL0tl8wQW/3zVWEGFUZo8g2gAKxwjU= github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94= github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8= -github.com/libp2p/go-libp2p-pubsub v0.11.0 h1:+JvS8Kty0OiyUiN0i8H5JbaCgjnJTRnTHe4rU88dLFc= -github.com/libp2p/go-libp2p-pubsub v0.11.0/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg= github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0= @@ -630,8 +630,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s= github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks= github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE= -github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI= -github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= +github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs= +github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw= github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M= github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4= github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= @@ -771,8 +771,8 @@ github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/status-im/go-ethereum v1.10.25-status.4 h1:Ff35UseaP49DK2RRpSLo3DL7NUmMKzicQ/7/yub6gfU= -github.com/status-im/go-ethereum v1.10.25-status.4/go.mod h1:Dt4K5JYMhJRdtXJwBEyGZLZn9iz/chSOZyjVmt5ZhwQ= +github.com/status-im/go-ethereum v1.10.25-status.15 h1:il5fD124sV2i6+2hf6iK4MRSRQIeZAl51kc/svZTsdo= +github.com/status-im/go-ethereum v1.10.25-status.15/go.mod h1:Dt4K5JYMhJRdtXJwBEyGZLZn9iz/chSOZyjVmt5ZhwQ= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/status-im/status-go/extkeys v1.1.2 h1:FSjARgDathJ3rIapJt851LsIXP9Oyuu2M2jPJKuzloU= @@ -820,10 +820,12 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc= github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw= +github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16Et17jSGWM3mQhlIOZYiG+O+rlX5BsrBumw5flxk= +github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd h1:g5EneT88eHOakr8Zukx4RP1JICgrSl9ZtmgGS+wQbC8= -github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k= +github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 h1:aTOUQm0kKtHiqraFpqj1Ja++C+qyZyeiSPKtXe3Ctac= +github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= @@ -858,8 +860,8 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc= go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE= -go.uber.org/fx v1.21.1 h1:RqBh3cYdzZS0uqwVeEjOX2p73dddLpym315myy/Bpb0= -go.uber.org/fx v1.21.1/go.mod h1:HT2M7d7RHo+ebKGh9NRcrsrHHfpZ60nW3QRubMRfv48= +go.uber.org/fx v1.22.1 h1:nvvln7mwyT5s1q201YE29V/BFrGor6vMiDNpU/78Mys= +go.uber.org/fx v1.22.1/go.mod h1:HT2M7d7RHo+ebKGh9NRcrsrHHfpZ60nW3QRubMRfv48= go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index aa8d2ee..8842bc6 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -40,11 +40,36 @@ var topicLastSync = prometheus.NewGaugeVec( []string{"pubsubtopic"}, ) +var missingMessagesLastDay = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "msgcounter_missing_messages_last_day", + Help: "The number of messages missing in last 24hr (with 2hr delay)", + }, + []string{"storenode"}, +) + +var missingMessagesLastWeek = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "msgcounter_missing_messages_last_week", + Help: "The number of messages missing in last week (with 2hr delay)", + }, + []string{"storenode"}, +) + +var missingMessagesPreviousHour = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "msgcounter_missing_messages_prev_hour", + Help: "The number of messages missing in the previous hour", + }, + []string{"storenode"}, +) + var collectors = []prometheus.Collector{ missingMessages, storenodeAvailability, totalMissingMessages, topicLastSync, + missingMessagesLastDay, } // Metrics exposes the functions required to update prometheus metrics for relay protocol @@ -53,6 +78,9 @@ type Metrics interface { RecordStorenodeAvailability(peerID peer.ID, available bool) RecordTotalMissingMessages(cnt int) RecordLastSyncDate(topic string, date time.Time) + RecordMissingMessagesLastDay(peerID peer.ID, cnt int) + RecordMissingMessagesLastWeek(peerID peer.ID, cnt int) + RecordMissingMessagesPrevHour(peerID peer.ID, cnt int) } type metricsImpl struct { @@ -95,3 +123,21 @@ func (m *metricsImpl) RecordLastSyncDate(topic string, date time.Time) { topicLastSync.WithLabelValues(topic).Set(float64(date.Unix())) }() } + +func (m *metricsImpl) RecordMissingMessagesLastDay(peerID peer.ID, cnt int) { + go func() { + missingMessagesLastDay.WithLabelValues(peerID.String()).Set(float64(cnt)) + }() +} + +func (m *metricsImpl) RecordMissingMessagesLastWeek(peerID peer.ID, cnt int) { + go func() { + missingMessagesLastWeek.WithLabelValues(peerID.String()).Set(float64(cnt)) + }() +} + +func (m *metricsImpl) RecordMissingMessagesPrevHour(peerID peer.ID, cnt int) { + go func() { + missingMessagesPreviousHour.WithLabelValues(peerID.String()).Set(float64(cnt)) + }() +} diff --git a/internal/persistence/database.go b/internal/persistence/database.go index 76e46ea..61236a9 100644 --- a/internal/persistence/database.go +++ b/internal/persistence/database.go @@ -3,9 +3,11 @@ package persistence import ( "context" "database/sql" + "fmt" "sync" "time" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/timesource" @@ -237,6 +239,40 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub return result, nil } +func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) { + rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist'", from.UnixNano(), to.UnixNano(), clusterID) + if err != nil { + return nil, err + } + defer rows.Close() + + results := make(map[peer.ID][]pb.MessageHash) + for rows.Next() { + var messageHashStr string + var peerIDStr string + err := rows.Scan(&messageHashStr, &peerIDStr) + if err != nil { + return nil, err + } + + peerID, err := peer.Decode(peerIDStr) + if err != nil { + d.log.Warn("could not decode peerID", zap.String("peerIDStr", peerIDStr), zap.Error(err)) + continue + } + + messageHashBytes, err := hexutil.Decode(messageHashStr) + if err != nil { + d.log.Warn("could not decode messageHash", zap.String("messageHashStr", messageHashStr), zap.Error(err)) + continue + } + + results[peerID] = append(results[peerID], pb.ToMessageHash(messageHashBytes)) + } + + return results, nil +} + func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string, lastSyncTimestamp time.Time) error { stmt, err := tx.Prepare("INSERT INTO syncTopicStatus(clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3) ON CONFLICT(clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $4") if err != nil { @@ -270,6 +306,26 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, return nil } +func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error { + query := "UPDATE missingMessages SET foundOnRecheck = true WHERE clusterID = $1 AND messageHash IN (" + for i := range messageHashes { + if i > 0 { + query += ", " + } + query += fmt.Sprintf("$%d", i+2) + } + query += ")" + + args := []interface{}{clusterID} + for _, messageHash := range messageHashes { + args = append(args, messageHash) + } + + _, err := d.db.Exec(query, args...) + + return err +} + func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) error { stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING") if err != nil { @@ -285,3 +341,31 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) err return nil } + +func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) { + rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID) + if err != nil { + return nil, err + } + + defer rows.Close() + results := make(map[peer.ID]int) + for rows.Next() { + var peerIDStr string + var cnt int + err := rows.Scan(&peerIDStr, &cnt) + if err != nil { + return nil, err + } + + peerID, err := peer.Decode(peerIDStr) + if err != nil { + d.log.Warn("could not decode peerID", zap.String("peerIDStr", peerIDStr), zap.Error(err)) + continue + } + + results[peerID] = cnt + } + + return results, nil +} diff --git a/internal/persistence/postgres/migrations/bindata.go b/internal/persistence/postgres/migrations/bindata.go index 55bf137..e6ea36a 100644 --- a/internal/persistence/postgres/migrations/bindata.go +++ b/internal/persistence/postgres/migrations/bindata.go @@ -2,6 +2,7 @@ // sources: // 1_setup.up.sql (856B) // 2_timestamp.up.sql (53B) +// 3_found.up.sql (86B) // doc.go (74B) package migrations @@ -111,6 +112,26 @@ func _2_timestampUpSql() (*asset, error) { return a, nil } +var __3_foundUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x04\xc0\x41\x0e\x82\x40\x0c\x05\xd0\x3d\xa7\xf8\xf7\x70\x55\x9c\xb2\xfa\xb4\x09\x76\x0e\x60\xb0\x22\x31\x8e\x8b\xc6\xfb\xfb\x84\xa1\x1b\x42\x66\x2a\x3e\x67\xd5\x39\x8e\x35\xab\xee\x47\xd6\x24\xad\xe1\xea\xec\xab\xe1\xf9\xfd\x8d\x87\x8f\x2d\xf7\x57\xee\x6f\xcc\xee\x54\x31\x34\x5d\xa4\x33\xb0\x08\x6f\x0a\xf3\x80\x75\xf2\x32\xfd\x03\x00\x00\xff\xff\xc0\x0e\x74\x77\x56\x00\x00\x00") + +func _3_foundUpSqlBytes() ([]byte, error) { + return bindataRead( + __3_foundUpSql, + "3_found.up.sql", + ) +} + +func _3_foundUpSql() (*asset, error) { + bytes, err := _3_foundUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "3_found.up.sql", size: 86, mode: os.FileMode(0664), modTime: time.Unix(1723057360, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x65, 0x30, 0x49, 0xb0, 0x22, 0x8c, 0xab, 0xc4, 0x3a, 0x4e, 0x39, 0xe, 0x31, 0x1, 0x53, 0xbe, 0x5c, 0x7a, 0x9a, 0xcd, 0x84, 0xe9, 0x28, 0x7d, 0xeb, 0xb4, 0x2, 0xc2, 0x3d, 0xee, 0x5d, 0x7a}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -226,6 +247,8 @@ var _bindata = map[string]func() (*asset, error){ "2_timestamp.up.sql": _2_timestampUpSql, + "3_found.up.sql": _3_foundUpSql, + "doc.go": docGo, } @@ -272,6 +295,7 @@ type bintree struct { var _bintree = &bintree{nil, map[string]*bintree{ "1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}}, "2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}}, + "3_found.up.sql": &bintree{_3_foundUpSql, map[string]*bintree{}}, "doc.go": &bintree{docGo, map[string]*bintree{}}, }} diff --git a/internal/persistence/postgres/migrations/sql/3_found.up.sql b/internal/persistence/postgres/migrations/sql/3_found.up.sql new file mode 100644 index 0000000..99c376c --- /dev/null +++ b/internal/persistence/postgres/migrations/sql/3_found.up.sql @@ -0,0 +1,2 @@ +ALTER TABLE missingMessages +ADD COLUMN foundOnRecheck BOOLEAN DEFAULT FALSE NOT NULL; diff --git a/internal/persistence/sqlite/migrations/bindata.go b/internal/persistence/sqlite/migrations/bindata.go deleted file mode 100644 index afc5f79..0000000 --- a/internal/persistence/sqlite/migrations/bindata.go +++ /dev/null @@ -1,319 +0,0 @@ -// Code generated by go-bindata. DO NOT EDIT. -// sources: -// 1_setup.up.sql (927B) -// 2_timestamp.up.sql (581B) -// doc.go (74B) - -package migrations - -import ( - "bytes" - "compress/gzip" - "crypto/sha256" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "strings" - "time" -) - -func bindataRead(data []byte, name string) ([]byte, error) { - gz, err := gzip.NewReader(bytes.NewBuffer(data)) - if err != nil { - return nil, fmt.Errorf("read %q: %v", name, err) - } - - var buf bytes.Buffer - _, err = io.Copy(&buf, gz) - clErr := gz.Close() - - if err != nil { - return nil, fmt.Errorf("read %q: %v", name, err) - } - if clErr != nil { - return nil, err - } - - return buf.Bytes(), nil -} - -type asset struct { - bytes []byte - info os.FileInfo - digest [sha256.Size]byte -} - -type bindataFileInfo struct { - name string - size int64 - mode os.FileMode - modTime time.Time -} - -func (fi bindataFileInfo) Name() string { - return fi.name -} -func (fi bindataFileInfo) Size() int64 { - return fi.size -} -func (fi bindataFileInfo) Mode() os.FileMode { - return fi.mode -} -func (fi bindataFileInfo) ModTime() time.Time { - return fi.modTime -} -func (fi bindataFileInfo) IsDir() bool { - return false -} -func (fi bindataFileInfo) Sys() interface{} { - return nil -} - -var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x91\x4d\x4f\xc2\x30\x18\xc7\xcf\xf4\x53\x3c\xc7\x2d\xe1\xa2\xd7\x9d\x26\x54\x69\x84\xcd\xb4\x45\xe0\x58\x68\x33\x9b\xec\x05\xf7\xb4\x46\xbf\xbd\xa1\x12\x82\x58\xc7\x81\x73\xfb\xfc\x5f\x7e\xff\x09\xa7\xb9\xa4\x20\xf3\x87\x39\x05\xf6\x08\x45\x29\x81\xae\x99\x90\x02\xf0\xab\xdd\xc9\x6e\x6f\x77\xc2\x29\xe7\x11\x12\x32\xda\xd5\x1e\x9d\xe9\x99\x06\x56\x48\xfa\x44\x79\xf8\x5f\x2c\xe7\xf3\x31\x19\xed\xfd\x16\xfd\x36\x5c\xc0\x6b\xce\x27\xb3\xfc\xd7\x73\xad\xd0\x89\x83\xa4\x6d\x0c\x3a\xd5\xec\x63\x1a\x2f\x9c\x2d\x72\xbe\x81\x67\xba\x81\xe4\xe4\x36\x86\x33\xed\x94\xa4\xb0\x62\x72\x56\x2e\x25\xf0\x72\xc5\xa6\x19\x21\x64\xa0\x46\x63\x11\x6d\x5b\x2d\x0c\xa2\xaa\x4c\xa8\xd1\xfb\x96\xe9\x58\xc6\x5b\xfa\x35\x3f\x06\x33\x85\x6f\xd1\x67\xac\x06\x9b\xa3\xeb\x7a\xd3\x76\xda\xfc\x73\x7c\x1c\x21\xf2\x18\x2e\x75\xee\xfe\xaa\xc2\x05\xd0\xb3\x88\x63\x38\x19\xc6\x80\x1e\x79\xb2\x62\x4a\xd7\x17\x3c\xad\xfe\x5c\x60\x75\x07\x65\x71\x89\x36\x39\x25\x99\x52\x31\x49\xb3\xeb\x2a\xf7\x31\x95\xb0\x4e\x9a\x0d\x8e\x1a\x9c\x8a\x4e\x9b\x65\xab\x3e\x94\xad\xd5\xb6\x36\x83\xcb\x0e\xe2\xed\xcd\xbb\x37\xe8\x0e\xfb\x5c\x87\x18\x2c\x6e\xc2\x27\x5c\x1f\xf0\xc5\x4a\x24\x67\x59\xd2\x8c\x7c\x07\x00\x00\xff\xff\x6e\xd3\x44\xe6\x9f\x03\x00\x00") - -func _1_setupUpSqlBytes() ([]byte, error) { - return bindataRead( - __1_setupUpSql, - "1_setup.up.sql", - ) -} - -func _1_setupUpSql() (*asset, error) { - bytes, err := _1_setupUpSqlBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "1_setup.up.sql", size: 927, mode: os.FileMode(0664), modTime: time.Unix(1716212183, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xcc, 0x43, 0xbc, 0x2c, 0x21, 0xab, 0xc4, 0xe7, 0xc5, 0x35, 0xea, 0xbb, 0x4b, 0x1c, 0x98, 0x4, 0x62, 0x56, 0x6d, 0xaf, 0x18, 0x28, 0xe5, 0xf8, 0x8, 0xfd, 0xa1, 0x3, 0xa8, 0xe0, 0x65, 0xe1}} - return a, nil -} - -var __2_timestampUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x91\x41\x6f\x83\x30\x0c\x85\xcf\xf5\xaf\xf0\xb1\x48\xf9\x07\x9c\xd2\xce\xed\xa2\x41\xa8\x82\x27\xb5\xa7\x89\x96\x88\x21\xad\x50\xe1\xa0\xfd\xfd\xa9\x5d\x85\xd0\xc2\x71\xd7\x7c\x79\xcf\x7e\xcf\x5b\x47\x9a\x09\x59\x6f\x32\x42\xb3\x43\x5b\x30\xd2\xd1\x94\x5c\xe2\xb5\x15\x69\xbb\x26\xf7\x22\x55\xe3\xe5\xa3\xf3\xdf\xb8\x86\xd5\x30\x76\xa6\x46\xa6\x23\x3f\x3e\xdb\xf7\x2c\x53\xb0\xba\x7c\x8d\x12\xfc\x60\x6a\x34\x96\x69\x4f\x6e\x0e\x6f\xe3\x59\xc6\x33\xf7\xb7\xf6\x12\x09\xaf\xbf\xf6\xaf\x95\x7c\x46\x4c\x42\x3f\xf8\xae\xaf\x7d\xac\x92\xa6\x0c\x55\x18\x65\x59\x53\xeb\x80\x1b\xb3\x37\x76\x86\x10\x56\x07\x67\x72\xed\x4e\xf8\x46\x27\x5c\xcf\x06\x2b\x9c\x26\x25\x90\xa4\x00\xc6\x96\xe4\xf8\x1e\xa5\x58\xae\xe1\x51\x82\xc2\x29\xb5\xc2\x59\x46\x85\xcb\xde\x0a\xa7\xb5\x9f\xaf\xb5\x0e\x09\x94\x94\xd1\x96\xf1\xff\x2c\x61\xe7\x8a\xfc\xef\xde\x29\xc0\x8b\x2b\x0e\xcf\x4b\xc7\x50\x67\x4c\x6e\x99\xde\x23\x83\x23\xab\x73\xc2\xb8\x90\x14\xe0\x27\x00\x00\xff\xff\x7a\xcc\xb2\x8c\x45\x02\x00\x00") - -func _2_timestampUpSqlBytes() ([]byte, error) { - return bindataRead( - __2_timestampUpSql, - "2_timestamp.up.sql", - ) -} - -func _2_timestampUpSql() (*asset, error) { - bytes, err := _2_timestampUpSqlBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "2_timestamp.up.sql", size: 581, mode: os.FileMode(0664), modTime: time.Unix(1720472437, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x54, 0x76, 0x6e, 0xf4, 0x2a, 0x56, 0x88, 0xd0, 0xe3, 0x5e, 0x7d, 0xbd, 0xec, 0x5c, 0x59, 0xfa, 0x44, 0x18, 0x82, 0xae, 0x55, 0x4c, 0xcf, 0x41, 0xa6, 0x7, 0x63, 0xba, 0x41, 0xa4, 0xfc, 0x3}} - return a, nil -} - -var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") - -func docGoBytes() ([]byte, error) { - return bindataRead( - _docGo, - "doc.go", - ) -} - -func docGo() (*asset, error) { - bytes, err := docGoBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1715177003, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}} - return a, nil -} - -// Asset loads and returns the asset for the given name. -// It returns an error if the asset could not be found or -// could not be loaded. -func Asset(name string) ([]byte, error) { - canonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[canonicalName]; ok { - a, err := f() - if err != nil { - return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err) - } - return a.bytes, nil - } - return nil, fmt.Errorf("Asset %s not found", name) -} - -// AssetString returns the asset contents as a string (instead of a []byte). -func AssetString(name string) (string, error) { - data, err := Asset(name) - return string(data), err -} - -// MustAsset is like Asset but panics when Asset would return an error. -// It simplifies safe initialization of global variables. -func MustAsset(name string) []byte { - a, err := Asset(name) - if err != nil { - panic("asset: Asset(" + name + "): " + err.Error()) - } - - return a -} - -// MustAssetString is like AssetString but panics when Asset would return an -// error. It simplifies safe initialization of global variables. -func MustAssetString(name string) string { - return string(MustAsset(name)) -} - -// AssetInfo loads and returns the asset info for the given name. -// It returns an error if the asset could not be found or -// could not be loaded. -func AssetInfo(name string) (os.FileInfo, error) { - canonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[canonicalName]; ok { - a, err := f() - if err != nil { - return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err) - } - return a.info, nil - } - return nil, fmt.Errorf("AssetInfo %s not found", name) -} - -// AssetDigest returns the digest of the file with the given name. It returns an -// error if the asset could not be found or the digest could not be loaded. -func AssetDigest(name string) ([sha256.Size]byte, error) { - canonicalName := strings.Replace(name, "\\", "/", -1) - if f, ok := _bindata[canonicalName]; ok { - a, err := f() - if err != nil { - return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err) - } - return a.digest, nil - } - return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name) -} - -// Digests returns a map of all known files and their checksums. -func Digests() (map[string][sha256.Size]byte, error) { - mp := make(map[string][sha256.Size]byte, len(_bindata)) - for name := range _bindata { - a, err := _bindata[name]() - if err != nil { - return nil, err - } - mp[name] = a.digest - } - return mp, nil -} - -// AssetNames returns the names of the assets. -func AssetNames() []string { - names := make([]string, 0, len(_bindata)) - for name := range _bindata { - names = append(names, name) - } - return names -} - -// _bindata is a table, holding each asset generator, mapped to its name. -var _bindata = map[string]func() (*asset, error){ - "1_setup.up.sql": _1_setupUpSql, - - "2_timestamp.up.sql": _2_timestampUpSql, - - "doc.go": docGo, -} - -// AssetDir returns the file names below a certain -// directory embedded in the file by go-bindata. -// For example if you run go-bindata on data/... and data contains the -// following hierarchy: -// data/ -// foo.txt -// img/ -// a.png -// b.png -// then AssetDir("data") would return []string{"foo.txt", "img"}, -// AssetDir("data/img") would return []string{"a.png", "b.png"}, -// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and -// AssetDir("") will return []string{"data"}. -func AssetDir(name string) ([]string, error) { - node := _bintree - if len(name) != 0 { - canonicalName := strings.Replace(name, "\\", "/", -1) - pathList := strings.Split(canonicalName, "/") - for _, p := range pathList { - node = node.Children[p] - if node == nil { - return nil, fmt.Errorf("Asset %s not found", name) - } - } - } - if node.Func != nil { - return nil, fmt.Errorf("Asset %s not found", name) - } - rv := make([]string, 0, len(node.Children)) - for childName := range node.Children { - rv = append(rv, childName) - } - return rv, nil -} - -type bintree struct { - Func func() (*asset, error) - Children map[string]*bintree -} - -var _bintree = &bintree{nil, map[string]*bintree{ - "1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}}, - "2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}}, - "doc.go": &bintree{docGo, map[string]*bintree{}}, -}} - -// RestoreAsset restores an asset under the given directory. -func RestoreAsset(dir, name string) error { - data, err := Asset(name) - if err != nil { - return err - } - info, err := AssetInfo(name) - if err != nil { - return err - } - err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755)) - if err != nil { - return err - } - err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode()) - if err != nil { - return err - } - return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime()) -} - -// RestoreAssets restores an asset under the given directory recursively. -func RestoreAssets(dir, name string) error { - children, err := AssetDir(name) - // File - if err != nil { - return RestoreAsset(dir, name) - } - // Dir - for _, child := range children { - err = RestoreAssets(dir, filepath.Join(name, child)) - if err != nil { - return err - } - } - return nil -} - -func _filePath(dir, name string) string { - canonicalName := strings.Replace(name, "\\", "/", -1) - return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...) -} diff --git a/internal/persistence/sqlite/migrations/sql/1_setup.up.sql b/internal/persistence/sqlite/migrations/sql/1_setup.up.sql deleted file mode 100644 index 0d7835b..0000000 --- a/internal/persistence/sqlite/migrations/sql/1_setup.up.sql +++ /dev/null @@ -1,31 +0,0 @@ -CREATE TABLE IF NOT EXISTS syncTopicStatus ( - clusterId INTEGER NOT NULL, - pubsubTopic VARCHAR NOT NULL, - lastSyncTimestamp INTEGER NOT NULL, - PRIMARY KEY (clusterId, pubsubTopic) -) WITHOUT ROWID; - - -CREATE TABLE IF NOT EXISTS missingMessages ( - runId VARCHAR NOT NULL, - clusterId INTEGER NOT NULL, - pubsubTopic VARCHAR NOT NULL, - messageHash VARCHAR NOT NULL, - msgTimestamp INTEGER NOT NULL, - storenode VARCHAR NOT NULL, - msgStatus VARCHAR NOT NULL, - storedAt INTEGER NOT NULL, - PRIMARY KEY (messageHash, storenode) -) WITHOUT ROWID; - -CREATE INDEX IF NOT EXISTS idxMsg1 ON missingMessages(storedAt DESC); -CREATE INDEX IF NOT EXISTS idxMsg2 ON missingMessages(runId); - -CREATE TABLE IF NOT EXISTS storeNodeUnavailable ( - runId VARCHAR NOT NULL, - storenode VARCHAR NOT NULL, - requestTime INTEGER NOT NULL, - PRIMARY KEY (runId, storenode) -) WITHOUT ROWID; - -CREATE INDEX IF NOT EXISTS idxStr1 ON storeNodeUnavailable(requestTime); diff --git a/internal/persistence/sqlite/migrations/sql/2_timestamp.up.sql b/internal/persistence/sqlite/migrations/sql/2_timestamp.up.sql deleted file mode 100644 index ca96ea2..0000000 --- a/internal/persistence/sqlite/migrations/sql/2_timestamp.up.sql +++ /dev/null @@ -1,20 +0,0 @@ -CREATE TABLE IF NOT EXISTS missingMessages_new ( - runId TEXT NOT NULL, - clusterId INTEGER NOT NULL, - pubsubTopic TEXT NOT NULL, - messageHash TEXT NOT NULL, - storenode TEXT NOT NULL, - msgStatus TEXT NOT NULL, - storedAt BIGINT NOT NULL, - PRIMARY KEY (messageHash, storenode) -); - -INSERT INTO missingMessages_new (runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) -SELECT runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt -FROM missingMessages; - -DROP TABLE missingMessages; - -ALTER TABLE missingMessages_new -RENAME TO missingMessages; - diff --git a/internal/persistence/sqlite/migrations/sql/doc.go b/internal/persistence/sqlite/migrations/sql/doc.go deleted file mode 100644 index e0a0603..0000000 --- a/internal/persistence/sqlite/migrations/sql/doc.go +++ /dev/null @@ -1,3 +0,0 @@ -package sql - -//go:generate go-bindata -pkg migrations -o ../bindata.go ./ diff --git a/internal/persistence/sqlite/sqlite.go b/internal/persistence/sqlite/sqlite.go deleted file mode 100644 index cc797d4..0000000 --- a/internal/persistence/sqlite/sqlite.go +++ /dev/null @@ -1,58 +0,0 @@ -package sqlite - -import ( - "database/sql" - "strings" - - "github.com/golang-migrate/migrate/v4/database" - "github.com/golang-migrate/migrate/v4/database/sqlite3" - _ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver - "github.com/waku-org/storenode-messages/internal/persistence/migrate" - "github.com/waku-org/storenode-messages/internal/persistence/sqlite/migrations" - "go.uber.org/zap" -) - -func addSqliteURLDefaults(dburl string) string { - if !strings.Contains(dburl, "?") { - dburl += "?" - } - - if !strings.Contains(dburl, "_journal=") { - dburl += "&_journal=WAL" - } - - if !strings.Contains(dburl, "_timeout=") { - dburl += "&_timeout=5000" - } - - return dburl -} - -// NewDB creates a sqlite3 DB in the specified path -func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) { - db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl)) - if err != nil { - return nil, err - } - - // Disable concurrent access as not supported by the driver - db.SetMaxOpenConns(1) - - return db, nil -} - -func migrationDriver(db *sql.DB) (database.Driver, error) { - return sqlite3.WithInstance(db, &sqlite3.Config{ - MigrationsTable: "message_counter_" + sqlite3.DefaultMigrationsTable, - }) -} - -// Migrations is the function used for DB migration with sqlite driver -func Migrations(db *sql.DB, logger *zap.Logger) error { - migrationDriver, err := migrationDriver(db) - if err != nil { - return err - } - - return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset) -} diff --git a/internal/persistence/utils.go b/internal/persistence/utils.go index e358806..adc7ecc 100644 --- a/internal/persistence/utils.go +++ b/internal/persistence/utils.go @@ -7,7 +7,6 @@ import ( "strings" "github.com/waku-org/storenode-messages/internal/persistence/postgres" - "github.com/waku-org/storenode-messages/internal/persistence/sqlite" "go.uber.org/zap" ) @@ -42,10 +41,8 @@ func ParseURL(databaseURL string, logger *zap.Logger) (*sql.DB, func(*sql.DB, *z dbURLParts := strings.Split(dbURL, "://") dbEngine := dbURLParts[0] dbParams := dbURLParts[1] + _ = dbParams switch dbEngine { - case "sqlite3": - db, err = sqlite.NewDB(dbParams, logger) - migrationFn = sqlite.Migrations case "postgres", "postgresql": db, err = postgres.NewDB(dbURL, logger) migrationFn = postgres.Migrations