feat: message recheck and extra metrics

This commit is contained in:
Richard Ramos 2024-08-07 16:14:06 -04:00
parent fdf8352dc1
commit d046ad3499
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
16 changed files with 325 additions and 639 deletions

View File

@ -5,8 +5,6 @@ all: build
build:
go build -tags=gowaku_no_rln -o build/storemsgcounter ./cmd/storemsgcounter
go build -tags=gowaku_no_rln -o build/populatedb ./cmd/populatedb
lint-install:
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | \

View File

@ -1,135 +0,0 @@
package main
import (
"database/sql"
"fmt"
"math/rand"
"os"
"os/signal"
"syscall"
"time"
"github.com/google/uuid"
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"google.golang.org/protobuf/proto"
)
type dbInfo struct {
id string
db *sql.DB
file string
stmt *sql.Stmt
inserted bool
}
type dbMap map[string]*dbInfo
var databases dbMap
func newDBInfo(file string) {
id := uuid.New().String()
db, err := sql.Open("sqlite3", file+"?_journal=WAL")
if err != nil {
panic(err.Error())
}
stmt, err := db.Prepare("INSERT INTO message(pubsubTopic, contentTopic, payload, version, timestamp, id, messageHash, storedAt) VALUES(?,?,?,?,?,?,?,?)")
if err != nil {
panic(err.Error())
}
databases[id] = &dbInfo{
id: id,
file: file,
db: db,
stmt: stmt,
}
}
func (d dbMap) values() []*dbInfo {
var result []*dbInfo
for _, x := range d {
result = append(result, x)
}
return result
}
func main() {
if len(os.Args) == 1 {
fmt.Println("Use: populatedb [path_to_db1 path_to_db2 ...]")
return
}
databases = make(dbMap)
for _, dbPath := range os.Args[1:] {
newDBInfo(dbPath)
}
pubsubTopics := []string{
"/waku/2/rs/1/1",
"/waku/2/rs/1/2",
"/waku/2/rs/1/3",
}
// Wait for a SIGINT or SIGTERM signal
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
timeInterval := 5 * time.Second
t := time.NewTicker(timeInterval)
minuteTimer := time.NewTimer(0)
fmt.Print(" \t\t\t\t\t\t")
i := 0
for range databases {
i++
fmt.Printf("db_%d\t", i)
}
fmt.Println()
for {
select {
case currTime := <-minuteTimer.C:
fmt.Println(currTime)
minuteTimer.Reset(1 * time.Minute)
case <-t.C:
insertCnt := rand.Intn(len(databases)) + 1 // Insert in at least one store node
dbArr := databases.values()
rand.Shuffle(len(dbArr), func(i, j int) { dbArr[i], dbArr[j] = dbArr[j], dbArr[i] })
for _, x := range dbArr {
x.inserted = false
}
now := time.Now().UnixNano()
msg := &pb.WakuMessage{
Timestamp: proto.Int64(now),
Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: "test",
}
envelope := protocol.NewEnvelope(msg, now, pubsubTopics[rand.Intn(len(pubsubTopics))])
hash := envelope.Hash()
for i := 0; i < insertCnt; i++ {
dbArr[i].inserted = true
_, err := dbArr[i].stmt.Exec([]byte(envelope.PubsubTopic()), []byte(msg.ContentTopic), msg.Payload, msg.GetVersion(), msg.GetTimestamp(), envelope.Index().Digest, hash.Bytes(), now)
if err != nil {
panic(err.Error())
}
}
fmt.Printf("%s\t%s\t%v\t", hash.String(), envelope.PubsubTopic(), msg.GetTimestamp())
for _, x := range databases {
fmt.Printf("%v\t", x.inserted)
}
fmt.Println()
case <-ch:
return
}
}
}

View File

@ -10,6 +10,8 @@ import (
"sync"
"time"
"golang.org/x/exp/maps"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
@ -128,8 +130,10 @@ func Execute(ctx context.Context, options Options) error {
}
defer wakuNode.Stop()
var storenodeIDs peer.IDSlice
for _, s := range storenodes {
wakuNode.Host().Peerstore().AddAddrs(s.ID, s.Addrs, peerstore.PermanentAddrTTL)
storenodeIDs = append(storenodeIDs, s.ID)
}
err = dbStore.Start(ctx, wakuNode.Timesource())
@ -143,25 +147,50 @@ func Execute(ctx context.Context, options Options) error {
db: dbStore,
}
timer := time.NewTimer(0)
defer timer.Stop()
missingMessagesTimer := time.NewTimer(0)
defer missingMessagesTimer.Stop()
syncCheckTimer := time.NewTicker(30 * time.Minute)
defer syncCheckTimer.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-timer.C:
case <-missingMessagesTimer.C:
tmpUUID := uuid.New()
runId := hex.EncodeToString(tmpUUID[:])
runIdLogger := logger.With(zap.String("runId", runId))
runIdLogger.Info("verifying message history...")
err := application.verifyHistory(ctx, runId, storenodes, runIdLogger)
err := application.verifyHistory(ctx, runId, storenodeIDs, runIdLogger)
if err != nil {
return err
}
runIdLogger.Info("verification complete")
timer.Reset(timeInterval)
missingMessagesTimer.Reset(timeInterval)
case <-syncCheckTimer.C:
go func() {
tmpUUID := uuid.New()
runId := hex.EncodeToString(tmpUUID[:])
runIdLogger := logger.With(zap.String("syncRunId", runId))
runIdLogger.Info("rechecking missing messages status")
err := application.checkMissingMessageStatus(ctx, runId, runIdLogger)
if err != nil {
logger.Error("could not recheck the status of missing messages", zap.Error(err))
return
}
err = application.countMissingMessages()
if err != nil {
logger.Error("could not count missing messages", zap.Error(err))
return
}
runIdLogger.Info("missing messages recheck complete")
}()
}
}
}
@ -170,7 +199,7 @@ var msgMapLock sync.Mutex
var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
var msgPubsubTopic map[pb.MessageHash]string
func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, logger *zap.Logger) error {
func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes peer.IDSlice, logger *zap.Logger) error {
// [MessageHash][StoreNode] = exists?
msgMapLock.Lock()
@ -213,9 +242,9 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
msgMapLock.Lock()
for msgHash, nodes := range msgMap {
for _, node := range storenodes {
if nodes[node.ID] != Exists {
msgsToVerify[node.ID] = append(msgsToVerify[node.ID], msgHash)
for _, s := range storenodes {
if nodes[s] != Exists {
msgsToVerify[s] = append(msgsToVerify[s], msgHash)
}
}
}
@ -226,7 +255,27 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
wg.Add(1)
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
defer wg.Done()
app.verifyMessageExistence(ctx, runId, peerID, messageHashes, logger)
onResult := func(result *store.Result) {
msgMapLock.Lock()
for _, mkv := range result.Messages() {
hash := mkv.WakuMessageHash()
_, ok := msgMap[hash]
if !ok {
msgMap[hash] = make(map[peer.ID]MessageExistence)
}
msgMap[hash][result.PeerID()] = Exists
}
for _, msgHash := range messageHashes {
if msgMap[msgHash][result.PeerID()] != Exists {
msgMap[msgHash][result.PeerID()] = DoesNotExist
}
}
msgMapLock.Unlock()
}
app.verifyMessageExistence(ctx, runId, peerID, messageHashes, onResult, logger)
}(peerID, messageHashes)
}
wg.Wait()
@ -245,13 +294,13 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
var missingIn []peer.ID
var unknownIn []peer.ID
for _, node := range storenodes {
if nodes[node.ID] == DoesNotExist {
missingIn = append(missingIn, node.ID)
missingInSummary[node.ID]++
} else if nodes[node.ID] == Unknown {
unknownIn = append(unknownIn, node.ID)
unknownInSummary[node.ID]++
for _, s := range storenodes {
if nodes[s] == DoesNotExist {
missingIn = append(missingIn, s)
missingInSummary[s]++
} else if nodes[s] == Unknown {
unknownIn = append(unknownIn, s)
unknownInSummary[s]++
}
}
@ -274,13 +323,13 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
}
for _, s := range storenodes {
missingCnt := missingInSummary[s.ID]
app.metrics.RecordMissingMessages(s.ID, "does_not_exist", missingCnt)
logger.Info("missing message summary", zap.Stringer("storenode", s.ID), zap.Int("numMsgs", missingCnt))
missingCnt := missingInSummary[s]
app.metrics.RecordMissingMessages(s, "does_not_exist", missingCnt)
logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt))
unknownCnt := unknownInSummary[s.ID]
app.metrics.RecordMissingMessages(s.ID, "unknown", unknownCnt)
logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s.ID), zap.Int("numMsgs", missingCnt))
unknownCnt := unknownInSummary[s]
app.metrics.RecordMissingMessages(s, "unknown", unknownCnt)
logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", missingCnt))
}
logger.Info("total missing messages", zap.Int("total", totalMissingMessages))
@ -289,6 +338,70 @@ func (app *Application) verifyHistory(ctx context.Context, runId string, storeno
return nil
}
func (app *Application) checkMissingMessageStatus(ctx context.Context, runId string, logger *zap.Logger) error {
now := app.node.Timesource().Now()
// Get all messages whose status is missing or does not exist, and the column found_on_recheck is false
// if found, set found_on_recheck to true
missingMessages, err := app.db.GetMissingMessages(now.Add(-2*time.Hour), now.Add(-time.Hour), options.ClusterID)
if err != nil {
return err
}
wg := sync.WaitGroup{}
for storenodeID, messageHashes := range missingMessages {
wg.Add(1)
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
defer wg.Done()
foundMissingMessages := make(map[pb.MessageHash]struct{})
app.verifyMessageExistence(ctx, runId, peerID, messageHashes, func(result *store.Result) {
for _, mkv := range result.Messages() {
foundMissingMessages[mkv.WakuMessageHash()] = struct{}{}
}
}, logger)
err := app.db.MarkMessagesAsFound(peerID, maps.Keys(foundMissingMessages), options.ClusterID)
if err != nil {
logger.Error("could not mark messages as found", zap.Error(err))
return
}
app.metrics.RecordMissingMessagesPrevHour(peerID, len(messageHashes)-len(foundMissingMessages))
}(storenodeID, messageHashes)
}
wg.Wait()
return nil
}
func (app *Application) countMissingMessages() error {
// not including last two hours in now to let sync work
now := app.node.Timesource().Now().Add(-2 * time.Hour)
// Count messages in last day (not including last two hours)
results, err := app.db.CountMissingMessages(now.Add(-24*time.Hour), now, options.ClusterID)
if err != nil {
return err
}
for storenode, cnt := range results {
app.metrics.RecordMissingMessagesLastDay(storenode, cnt)
}
// Count messages in last week (not including last two hours)
results, err = app.db.CountMissingMessages(now.Add(-24*time.Hour*7), now, options.ClusterID)
if err != nil {
return err
}
for storenode, cnt := range results {
app.metrics.RecordMissingMessagesLastWeek(storenode, cnt)
}
return nil
}
func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string, storenodeID peer.ID, topic string, startTime time.Time, endTime time.Time, logger *zap.Logger) {
var result *store.Result
var err error
@ -379,7 +492,7 @@ func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string
}
}
func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) {
func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes peer.IDSlice, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) {
logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))
if lastSyncTimestamp != nil {
@ -402,12 +515,12 @@ func (app *Application) retrieveHistory(ctx context.Context, runId string, store
// Determine if the messages exist across all nodes
wg := sync.WaitGroup{}
for _, node := range storenodes {
for _, storePeerID := range storenodes {
wg.Add(1)
go func(peerID peer.ID) {
defer wg.Done()
app.fetchStoreNodeMessages(ctx, runId, peerID, topic, startTime, endTime, logger)
}(node.ID)
}(storePeerID)
}
wg.Wait()
@ -422,7 +535,7 @@ func (app *Application) retrieveHistory(ctx context.Context, runId string, store
}
func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, logger *zap.Logger) {
func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, onResult func(result *store.Result), logger *zap.Logger) {
var result *store.Result
var err error
@ -462,23 +575,7 @@ func (app *Application) verifyMessageExistence(ctx context.Context, runId string
app.metrics.RecordStorenodeAvailability(peerID, true)
for !result.IsComplete() {
msgMapLock.Lock()
for _, mkv := range result.Messages() {
hash := mkv.WakuMessageHash()
_, ok := msgMap[hash]
if !ok {
msgMap[hash] = make(map[peer.ID]MessageExistence)
}
msgMap[hash][peerInfo.ID] = Exists
}
for _, msgHash := range messageHashes {
if msgMap[msgHash][peerInfo.ID] != Exists {
msgMap[msgHash][peerInfo.ID] = DoesNotExist
}
}
msgMapLock.Unlock()
onResult(result)
retry := true
success := false

View File

@ -65,7 +65,7 @@ var cliFlags = []cli.Flag{
altsrc.NewStringFlag(&cli.StringFlag{
Name: "db-url",
Usage: "The database connection URL for persistent storage.",
Value: "sqlite3://storage.db",
Value: "",
Destination: &options.DatabaseURL,
EnvVars: []string{"MSG_VERIF_DB_URL"},
}),

18
go.mod
View File

@ -4,7 +4,9 @@ go 1.21
toolchain go1.21.10
replace github.com/ethereum/go-ethereum v1.10.26 => github.com/status-im/go-ethereum v1.10.25-status.4
replace github.com/ethereum/go-ethereum v1.10.26 => github.com/status-im/go-ethereum v1.10.25-status.15
replace github.com/libp2p/go-libp2p-pubsub v0.11.0 => github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5
require (
github.com/ethereum/go-ethereum v1.10.26
@ -12,14 +14,14 @@ require (
github.com/google/uuid v1.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/jackc/pgx/v5 v5.4.1
github.com/libp2p/go-libp2p v0.35.0
github.com/mattn/go-sqlite3 v1.14.17
github.com/libp2p/go-libp2p v0.35.2
github.com/multiformats/go-multiaddr v0.12.4
github.com/prometheus/client_golang v1.19.1
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9
go.opencensus.io v0.24.0
go.uber.org/zap v1.27.0
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
google.golang.org/protobuf v1.34.1
)
@ -33,6 +35,7 @@ require (
github.com/btcsuite/btcd v0.20.1-beta // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
@ -56,7 +59,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
@ -107,7 +110,7 @@ require (
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pion/datachannel v1.5.6 // indirect
github.com/pion/dtls/v2 v2.2.11 // indirect
github.com/pion/ice/v2 v2.3.24 // indirect
github.com/pion/ice/v2 v2.3.25 // indirect
github.com/pion/interceptor v0.1.29 // indirect
github.com/pion/logging v0.2.2 // indirect
github.com/pion/mdns v0.0.12 // indirect
@ -149,11 +152,10 @@ require (
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.21.1 // indirect
go.uber.org/fx v1.22.1 // indirect
go.uber.org/mock v0.4.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sync v0.7.0 // indirect

30
go.sum
View File

@ -100,6 +100,8 @@ github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtE
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23/go.mod h1:bbYlZJ7hK1yFx9hf58LP0zeX7UjIGs20ufpu3evjr+s=
github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOCSiVIqS34=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo=
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -316,8 +318,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.1 h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/QY=
github.com/gorilla/websocket v1.5.1/go.mod h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v1.3.0/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
@ -475,12 +477,10 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-flow-metrics v0.1.0 h1:0iPhMI8PskQwzh57jB9WxIuIOQ0r+15PChFGkx3Q3WM=
github.com/libp2p/go-flow-metrics v0.1.0/go.mod h1:4Xi8MX8wj5aWNDAZttg6UPmc0ZrnFNsMtpsYUClFtro=
github.com/libp2p/go-libp2p v0.35.0 h1:1xS1Bkr9X7GtdvV6ntLnDV9xB1kNjHK1lZ0eaO6gnhc=
github.com/libp2p/go-libp2p v0.35.0/go.mod h1:snyJQix4ET6Tj+LeI0VPjjxTtdWpeOhYt5lEY0KirkQ=
github.com/libp2p/go-libp2p v0.35.2 h1:287oHbuplkrLdAF+syB0n/qDgd50AUBtEODqS0e0HDs=
github.com/libp2p/go-libp2p v0.35.2/go.mod h1:RKCDNt30IkFipGL0tl8wQW/3zVWEGFUZo8g2gAKxwjU=
github.com/libp2p/go-libp2p-asn-util v0.4.1 h1:xqL7++IKD9TBFMgnLPZR6/6iYhawHKHl950SO9L6n94=
github.com/libp2p/go-libp2p-asn-util v0.4.1/go.mod h1:d/NI6XZ9qxw67b4e+NgpQexCIiFYJjErASrYW4PFDN8=
github.com/libp2p/go-libp2p-pubsub v0.11.0 h1:+JvS8Kty0OiyUiN0i8H5JbaCgjnJTRnTHe4rU88dLFc=
github.com/libp2p/go-libp2p-pubsub v0.11.0/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
github.com/libp2p/go-libp2p-testing v0.12.0/go.mod h1:KcGDRXyN7sQCllucn1cOOS+Dmm7ujhfEyXQL5lvkcPg=
github.com/libp2p/go-msgio v0.3.0 h1:mf3Z8B1xcFN314sWX+2vOTShIE0Mmn2TXn3YCUQGNj0=
@ -630,8 +630,8 @@ github.com/pion/datachannel v1.5.6/go.mod h1:1eKT6Q85pRnr2mHiWHxJwO50SfZRtWHTsNI
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.11 h1:9U/dpCYl1ySttROPWJgqWKEylUdT0fXp/xst6JwY5Ks=
github.com/pion/dtls/v2 v2.2.11/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/ice/v2 v2.3.24 h1:RYgzhH/u5lH0XO+ABatVKCtRd+4U1GEaCXSMjNr13tI=
github.com/pion/ice/v2 v2.3.24/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/ice/v2 v2.3.25 h1:M5rJA07dqhi3nobJIg+uPtcVjFECTrhcR3n0ns8kDZs=
github.com/pion/ice/v2 v2.3.25/go.mod h1:KXJJcZK7E8WzrBEYnV4UtqEZsGeWfHxsNqhVcVvgjxw=
github.com/pion/interceptor v0.1.29 h1:39fsnlP1U8gw2JzOFWdfCU82vHvhW9o0rZnZF56wF+M=
github.com/pion/interceptor v0.1.29/go.mod h1:ri+LGNjRUc5xUNtDEPzfdkmSqISixVTBF/z/Zms/6T4=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
@ -771,8 +771,8 @@ github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/status-im/go-ethereum v1.10.25-status.4 h1:Ff35UseaP49DK2RRpSLo3DL7NUmMKzicQ/7/yub6gfU=
github.com/status-im/go-ethereum v1.10.25-status.4/go.mod h1:Dt4K5JYMhJRdtXJwBEyGZLZn9iz/chSOZyjVmt5ZhwQ=
github.com/status-im/go-ethereum v1.10.25-status.15 h1:il5fD124sV2i6+2hf6iK4MRSRQIeZAl51kc/svZTsdo=
github.com/status-im/go-ethereum v1.10.25-status.15/go.mod h1:Dt4K5JYMhJRdtXJwBEyGZLZn9iz/chSOZyjVmt5ZhwQ=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 h1:Gb2Tyox57NRNuZ2d3rmvB3pcmbu7O1RS3m8WRx7ilrg=
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
github.com/status-im/status-go/extkeys v1.1.2 h1:FSjARgDathJ3rIapJt851LsIXP9Oyuu2M2jPJKuzloU=
@ -820,10 +820,12 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5 h1:4K3IS97JryAEV8pRXB//qPcg+8bPXl/O+AOLt3FeCKc=
github.com/waku-org/go-discover v0.0.0-20240506173252-4912704efdc5/go.mod h1:eBHgM6T4EG0RZzxpxKy+rGz/6Dw2Nd8DWxS0lm9ESDw=
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16Et17jSGWM3mQhlIOZYiG+O+rlX5BsrBumw5flxk=
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd h1:g5EneT88eHOakr8Zukx4RP1JICgrSl9ZtmgGS+wQbC8=
github.com/waku-org/go-waku v0.8.1-0.20240605190333-d2d2f5672ebd/go.mod h1:biffO55kWbvfO8jdu/aAPiWcmozrfFKPum4EMFDib+k=
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 h1:aTOUQm0kKtHiqraFpqj1Ja++C+qyZyeiSPKtXe3Ctac=
github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
@ -858,8 +860,8 @@ go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc=
go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
go.uber.org/fx v1.21.1 h1:RqBh3cYdzZS0uqwVeEjOX2p73dddLpym315myy/Bpb0=
go.uber.org/fx v1.21.1/go.mod h1:HT2M7d7RHo+ebKGh9NRcrsrHHfpZ60nW3QRubMRfv48=
go.uber.org/fx v1.22.1 h1:nvvln7mwyT5s1q201YE29V/BFrGor6vMiDNpU/78Mys=
go.uber.org/fx v1.22.1/go.mod h1:HT2M7d7RHo+ebKGh9NRcrsrHHfpZ60nW3QRubMRfv48=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -40,11 +40,36 @@ var topicLastSync = prometheus.NewGaugeVec(
[]string{"pubsubtopic"},
)
var missingMessagesLastDay = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "msgcounter_missing_messages_last_day",
Help: "The number of messages missing in last 24hr (with 2hr delay)",
},
[]string{"storenode"},
)
var missingMessagesLastWeek = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "msgcounter_missing_messages_last_week",
Help: "The number of messages missing in last week (with 2hr delay)",
},
[]string{"storenode"},
)
var missingMessagesPreviousHour = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "msgcounter_missing_messages_prev_hour",
Help: "The number of messages missing in the previous hour",
},
[]string{"storenode"},
)
var collectors = []prometheus.Collector{
missingMessages,
storenodeAvailability,
totalMissingMessages,
topicLastSync,
missingMessagesLastDay,
}
// Metrics exposes the functions required to update prometheus metrics for relay protocol
@ -53,6 +78,9 @@ type Metrics interface {
RecordStorenodeAvailability(peerID peer.ID, available bool)
RecordTotalMissingMessages(cnt int)
RecordLastSyncDate(topic string, date time.Time)
RecordMissingMessagesLastDay(peerID peer.ID, cnt int)
RecordMissingMessagesLastWeek(peerID peer.ID, cnt int)
RecordMissingMessagesPrevHour(peerID peer.ID, cnt int)
}
type metricsImpl struct {
@ -95,3 +123,21 @@ func (m *metricsImpl) RecordLastSyncDate(topic string, date time.Time) {
topicLastSync.WithLabelValues(topic).Set(float64(date.Unix()))
}()
}
func (m *metricsImpl) RecordMissingMessagesLastDay(peerID peer.ID, cnt int) {
go func() {
missingMessagesLastDay.WithLabelValues(peerID.String()).Set(float64(cnt))
}()
}
func (m *metricsImpl) RecordMissingMessagesLastWeek(peerID peer.ID, cnt int) {
go func() {
missingMessagesLastWeek.WithLabelValues(peerID.String()).Set(float64(cnt))
}()
}
func (m *metricsImpl) RecordMissingMessagesPrevHour(peerID peer.ID, cnt int) {
go func() {
missingMessagesPreviousHour.WithLabelValues(peerID.String()).Set(float64(cnt))
}()
}

View File

@ -3,9 +3,11 @@ package persistence
import (
"context"
"database/sql"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
@ -237,6 +239,40 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub
return result, nil
}
func (d *DBStore) GetMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID][]pb.MessageHash, error) {
rows, err := d.db.Query("SELECT messageHash, storenode FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist'", from.UnixNano(), to.UnixNano(), clusterID)
if err != nil {
return nil, err
}
defer rows.Close()
results := make(map[peer.ID][]pb.MessageHash)
for rows.Next() {
var messageHashStr string
var peerIDStr string
err := rows.Scan(&messageHashStr, &peerIDStr)
if err != nil {
return nil, err
}
peerID, err := peer.Decode(peerIDStr)
if err != nil {
d.log.Warn("could not decode peerID", zap.String("peerIDStr", peerIDStr), zap.Error(err))
continue
}
messageHashBytes, err := hexutil.Decode(messageHashStr)
if err != nil {
d.log.Warn("could not decode messageHash", zap.String("messageHashStr", messageHashStr), zap.Error(err))
continue
}
results[peerID] = append(results[peerID], pb.ToMessageHash(messageHashBytes))
}
return results, nil
}
func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string, lastSyncTimestamp time.Time) error {
stmt, err := tx.Prepare("INSERT INTO syncTopicStatus(clusterId, pubsubTopic, lastSyncTimestamp) VALUES ($1, $2, $3) ON CONFLICT(clusterId, pubsubTopic) DO UPDATE SET lastSyncTimestamp = $4")
if err != nil {
@ -270,6 +306,26 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
return nil
}
func (d *DBStore) MarkMessagesAsFound(peerID peer.ID, messageHashes []pb.MessageHash, clusterID uint) error {
query := "UPDATE missingMessages SET foundOnRecheck = true WHERE clusterID = $1 AND messageHash IN ("
for i := range messageHashes {
if i > 0 {
query += ", "
}
query += fmt.Sprintf("$%d", i+2)
}
query += ")"
args := []interface{}{clusterID}
for _, messageHash := range messageHashes {
args = append(args, messageHash)
}
_, err := d.db.Exec(query, args...)
return err
}
func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) error {
stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING")
if err != nil {
@ -285,3 +341,31 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.ID) err
return nil
}
func (d *DBStore) CountMissingMessages(from time.Time, to time.Time, clusterID uint) (map[peer.ID]int, error) {
rows, err := d.db.Query("SELECT storenode, count(1) as cnt FROM missingMessages WHERE storedAt >= $1 AND storedAt <= $2 AND clusterId = $3 AND msgStatus = 'does_not_exist' GROUP BY storenode", from.UnixNano(), to.UnixNano(), clusterID)
if err != nil {
return nil, err
}
defer rows.Close()
results := make(map[peer.ID]int)
for rows.Next() {
var peerIDStr string
var cnt int
err := rows.Scan(&peerIDStr, &cnt)
if err != nil {
return nil, err
}
peerID, err := peer.Decode(peerIDStr)
if err != nil {
d.log.Warn("could not decode peerID", zap.String("peerIDStr", peerIDStr), zap.Error(err))
continue
}
results[peerID] = cnt
}
return results, nil
}

View File

@ -2,6 +2,7 @@
// sources:
// 1_setup.up.sql (856B)
// 2_timestamp.up.sql (53B)
// 3_found.up.sql (86B)
// doc.go (74B)
package migrations
@ -111,6 +112,26 @@ func _2_timestampUpSql() (*asset, error) {
return a, nil
}
var __3_foundUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x04\xc0\x41\x0e\x82\x40\x0c\x05\xd0\x3d\xa7\xf8\xf7\x70\x55\x9c\xb2\xfa\xb4\x09\x76\x0e\x60\xb0\x22\x31\x8e\x8b\xc6\xfb\xfb\x84\xa1\x1b\x42\x66\x2a\x3e\x67\xd5\x39\x8e\x35\xab\xee\x47\xd6\x24\xad\xe1\xea\xec\xab\xe1\xf9\xfd\x8d\x87\x8f\x2d\xf7\x57\xee\x6f\xcc\xee\x54\x31\x34\x5d\xa4\x33\xb0\x08\x6f\x0a\xf3\x80\x75\xf2\x32\xfd\x03\x00\x00\xff\xff\xc0\x0e\x74\x77\x56\x00\x00\x00")
func _3_foundUpSqlBytes() ([]byte, error) {
return bindataRead(
__3_foundUpSql,
"3_found.up.sql",
)
}
func _3_foundUpSql() (*asset, error) {
bytes, err := _3_foundUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "3_found.up.sql", size: 86, mode: os.FileMode(0664), modTime: time.Unix(1723057360, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x65, 0x30, 0x49, 0xb0, 0x22, 0x8c, 0xab, 0xc4, 0x3a, 0x4e, 0x39, 0xe, 0x31, 0x1, 0x53, 0xbe, 0x5c, 0x7a, 0x9a, 0xcd, 0x84, 0xe9, 0x28, 0x7d, 0xeb, 0xb4, 0x2, 0xc2, 0x3d, 0xee, 0x5d, 0x7a}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -226,6 +247,8 @@ var _bindata = map[string]func() (*asset, error){
"2_timestamp.up.sql": _2_timestampUpSql,
"3_found.up.sql": _3_foundUpSql,
"doc.go": docGo,
}
@ -272,6 +295,7 @@ type bintree struct {
var _bintree = &bintree{nil, map[string]*bintree{
"1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}},
"2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}},
"3_found.up.sql": &bintree{_3_foundUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}

View File

@ -0,0 +1,2 @@
ALTER TABLE missingMessages
ADD COLUMN foundOnRecheck BOOLEAN DEFAULT FALSE NOT NULL;

View File

@ -1,319 +0,0 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1_setup.up.sql (927B)
// 2_timestamp.up.sql (581B)
// doc.go (74B)
package migrations
import (
"bytes"
"compress/gzip"
"crypto/sha256"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
"time"
)
func bindataRead(data []byte, name string) ([]byte, error) {
gz, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, gz)
clErr := gz.Close()
if err != nil {
return nil, fmt.Errorf("read %q: %v", name, err)
}
if clErr != nil {
return nil, err
}
return buf.Bytes(), nil
}
type asset struct {
bytes []byte
info os.FileInfo
digest [sha256.Size]byte
}
type bindataFileInfo struct {
name string
size int64
mode os.FileMode
modTime time.Time
}
func (fi bindataFileInfo) Name() string {
return fi.name
}
func (fi bindataFileInfo) Size() int64 {
return fi.size
}
func (fi bindataFileInfo) Mode() os.FileMode {
return fi.mode
}
func (fi bindataFileInfo) ModTime() time.Time {
return fi.modTime
}
func (fi bindataFileInfo) IsDir() bool {
return false
}
func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x91\x4d\x4f\xc2\x30\x18\xc7\xcf\xf4\x53\x3c\xc7\x2d\xe1\xa2\xd7\x9d\x26\x54\x69\x84\xcd\xb4\x45\xe0\x58\x68\x33\x9b\xec\x05\xf7\xb4\x46\xbf\xbd\xa1\x12\x82\x58\xc7\x81\x73\xfb\xfc\x5f\x7e\xff\x09\xa7\xb9\xa4\x20\xf3\x87\x39\x05\xf6\x08\x45\x29\x81\xae\x99\x90\x02\xf0\xab\xdd\xc9\x6e\x6f\x77\xc2\x29\xe7\x11\x12\x32\xda\xd5\x1e\x9d\xe9\x99\x06\x56\x48\xfa\x44\x79\xf8\x5f\x2c\xe7\xf3\x31\x19\xed\xfd\x16\xfd\x36\x5c\xc0\x6b\xce\x27\xb3\xfc\xd7\x73\xad\xd0\x89\x83\xa4\x6d\x0c\x3a\xd5\xec\x63\x1a\x2f\x9c\x2d\x72\xbe\x81\x67\xba\x81\xe4\xe4\x36\x86\x33\xed\x94\xa4\xb0\x62\x72\x56\x2e\x25\xf0\x72\xc5\xa6\x19\x21\x64\xa0\x46\x63\x11\x6d\x5b\x2d\x0c\xa2\xaa\x4c\xa8\xd1\xfb\x96\xe9\x58\xc6\x5b\xfa\x35\x3f\x06\x33\x85\x6f\xd1\x67\xac\x06\x9b\xa3\xeb\x7a\xd3\x76\xda\xfc\x73\x7c\x1c\x21\xf2\x18\x2e\x75\xee\xfe\xaa\xc2\x05\xd0\xb3\x88\x63\x38\x19\xc6\x80\x1e\x79\xb2\x62\x4a\xd7\x17\x3c\xad\xfe\x5c\x60\x75\x07\x65\x71\x89\x36\x39\x25\x99\x52\x31\x49\xb3\xeb\x2a\xf7\x31\x95\xb0\x4e\x9a\x0d\x8e\x1a\x9c\x8a\x4e\x9b\x65\xab\x3e\x94\xad\xd5\xb6\x36\x83\xcb\x0e\xe2\xed\xcd\xbb\x37\xe8\x0e\xfb\x5c\x87\x18\x2c\x6e\xc2\x27\x5c\x1f\xf0\xc5\x4a\x24\x67\x59\xd2\x8c\x7c\x07\x00\x00\xff\xff\x6e\xd3\x44\xe6\x9f\x03\x00\x00")
func _1_setupUpSqlBytes() ([]byte, error) {
return bindataRead(
__1_setupUpSql,
"1_setup.up.sql",
)
}
func _1_setupUpSql() (*asset, error) {
bytes, err := _1_setupUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1_setup.up.sql", size: 927, mode: os.FileMode(0664), modTime: time.Unix(1716212183, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xcc, 0x43, 0xbc, 0x2c, 0x21, 0xab, 0xc4, 0xe7, 0xc5, 0x35, 0xea, 0xbb, 0x4b, 0x1c, 0x98, 0x4, 0x62, 0x56, 0x6d, 0xaf, 0x18, 0x28, 0xe5, 0xf8, 0x8, 0xfd, 0xa1, 0x3, 0xa8, 0xe0, 0x65, 0xe1}}
return a, nil
}
var __2_timestampUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x91\x41\x6f\x83\x30\x0c\x85\xcf\xf5\xaf\xf0\xb1\x48\xf9\x07\x9c\xd2\xce\xed\xa2\x41\xa8\x82\x27\xb5\xa7\x89\x96\x88\x21\xad\x50\xe1\xa0\xfd\xfd\xa9\x5d\x85\xd0\xc2\x71\xd7\x7c\x79\xcf\x7e\xcf\x5b\x47\x9a\x09\x59\x6f\x32\x42\xb3\x43\x5b\x30\xd2\xd1\x94\x5c\xe2\xb5\x15\x69\xbb\x26\xf7\x22\x55\xe3\xe5\xa3\xf3\xdf\xb8\x86\xd5\x30\x76\xa6\x46\xa6\x23\x3f\x3e\xdb\xf7\x2c\x53\xb0\xba\x7c\x8d\x12\xfc\x60\x6a\x34\x96\x69\x4f\x6e\x0e\x6f\xe3\x59\xc6\x33\xf7\xb7\xf6\x12\x09\xaf\xbf\xf6\xaf\x95\x7c\x46\x4c\x42\x3f\xf8\xae\xaf\x7d\xac\x92\xa6\x0c\x55\x18\x65\x59\x53\xeb\x80\x1b\xb3\x37\x76\x86\x10\x56\x07\x67\x72\xed\x4e\xf8\x46\x27\x5c\xcf\x06\x2b\x9c\x26\x25\x90\xa4\x00\xc6\x96\xe4\xf8\x1e\xa5\x58\xae\xe1\x51\x82\xc2\x29\xb5\xc2\x59\x46\x85\xcb\xde\x0a\xa7\xb5\x9f\xaf\xb5\x0e\x09\x94\x94\xd1\x96\xf1\xff\x2c\x61\xe7\x8a\xfc\xef\xde\x29\xc0\x8b\x2b\x0e\xcf\x4b\xc7\x50\x67\x4c\x6e\x99\xde\x23\x83\x23\xab\x73\xc2\xb8\x90\x14\xe0\x27\x00\x00\xff\xff\x7a\xcc\xb2\x8c\x45\x02\x00\x00")
func _2_timestampUpSqlBytes() ([]byte, error) {
return bindataRead(
__2_timestampUpSql,
"2_timestamp.up.sql",
)
}
func _2_timestampUpSql() (*asset, error) {
bytes, err := _2_timestampUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "2_timestamp.up.sql", size: 581, mode: os.FileMode(0664), modTime: time.Unix(1720472437, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x54, 0x76, 0x6e, 0xf4, 0x2a, 0x56, 0x88, 0xd0, 0xe3, 0x5e, 0x7d, 0xbd, 0xec, 0x5c, 0x59, 0xfa, 0x44, 0x18, 0x82, 0xae, 0x55, 0x4c, 0xcf, 0x41, 0xa6, 0x7, 0x63, 0xba, 0x41, 0xa4, 0xfc, 0x3}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
return bindataRead(
_docGo,
"doc.go",
)
}
func docGo() (*asset, error) {
bytes, err := docGoBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "doc.go", size: 74, mode: os.FileMode(0664), modTime: time.Unix(1715177003, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xde, 0x7c, 0x28, 0xcd, 0x47, 0xf2, 0xfa, 0x7c, 0x51, 0x2d, 0xd8, 0x38, 0xb, 0xb0, 0x34, 0x9d, 0x4c, 0x62, 0xa, 0x9e, 0x28, 0xc3, 0x31, 0x23, 0xd9, 0xbb, 0x89, 0x9f, 0xa0, 0x89, 0x1f, 0xe8}}
return a, nil
}
// Asset loads and returns the asset for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func Asset(name string) ([]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("Asset %s can't read by error: %v", name, err)
}
return a.bytes, nil
}
return nil, fmt.Errorf("Asset %s not found", name)
}
// AssetString returns the asset contents as a string (instead of a []byte).
func AssetString(name string) (string, error) {
data, err := Asset(name)
return string(data), err
}
// MustAsset is like Asset but panics when Asset would return an error.
// It simplifies safe initialization of global variables.
func MustAsset(name string) []byte {
a, err := Asset(name)
if err != nil {
panic("asset: Asset(" + name + "): " + err.Error())
}
return a
}
// MustAssetString is like AssetString but panics when Asset would return an
// error. It simplifies safe initialization of global variables.
func MustAssetString(name string) string {
return string(MustAsset(name))
}
// AssetInfo loads and returns the asset info for the given name.
// It returns an error if the asset could not be found or
// could not be loaded.
func AssetInfo(name string) (os.FileInfo, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return nil, fmt.Errorf("AssetInfo %s can't read by error: %v", name, err)
}
return a.info, nil
}
return nil, fmt.Errorf("AssetInfo %s not found", name)
}
// AssetDigest returns the digest of the file with the given name. It returns an
// error if the asset could not be found or the digest could not be loaded.
func AssetDigest(name string) ([sha256.Size]byte, error) {
canonicalName := strings.Replace(name, "\\", "/", -1)
if f, ok := _bindata[canonicalName]; ok {
a, err := f()
if err != nil {
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s can't read by error: %v", name, err)
}
return a.digest, nil
}
return [sha256.Size]byte{}, fmt.Errorf("AssetDigest %s not found", name)
}
// Digests returns a map of all known files and their checksums.
func Digests() (map[string][sha256.Size]byte, error) {
mp := make(map[string][sha256.Size]byte, len(_bindata))
for name := range _bindata {
a, err := _bindata[name]()
if err != nil {
return nil, err
}
mp[name] = a.digest
}
return mp, nil
}
// AssetNames returns the names of the assets.
func AssetNames() []string {
names := make([]string, 0, len(_bindata))
for name := range _bindata {
names = append(names, name)
}
return names
}
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1_setup.up.sql": _1_setupUpSql,
"2_timestamp.up.sql": _2_timestampUpSql,
"doc.go": docGo,
}
// AssetDir returns the file names below a certain
// directory embedded in the file by go-bindata.
// For example if you run go-bindata on data/... and data contains the
// following hierarchy:
// data/
// foo.txt
// img/
// a.png
// b.png
// then AssetDir("data") would return []string{"foo.txt", "img"},
// AssetDir("data/img") would return []string{"a.png", "b.png"},
// AssetDir("foo.txt") and AssetDir("notexist") would return an error, and
// AssetDir("") will return []string{"data"}.
func AssetDir(name string) ([]string, error) {
node := _bintree
if len(name) != 0 {
canonicalName := strings.Replace(name, "\\", "/", -1)
pathList := strings.Split(canonicalName, "/")
for _, p := range pathList {
node = node.Children[p]
if node == nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
}
}
if node.Func != nil {
return nil, fmt.Errorf("Asset %s not found", name)
}
rv := make([]string, 0, len(node.Children))
for childName := range node.Children {
rv = append(rv, childName)
}
return rv, nil
}
type bintree struct {
Func func() (*asset, error)
Children map[string]*bintree
}
var _bintree = &bintree{nil, map[string]*bintree{
"1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}},
"2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.
func RestoreAsset(dir, name string) error {
data, err := Asset(name)
if err != nil {
return err
}
info, err := AssetInfo(name)
if err != nil {
return err
}
err = os.MkdirAll(_filePath(dir, filepath.Dir(name)), os.FileMode(0755))
if err != nil {
return err
}
err = ioutil.WriteFile(_filePath(dir, name), data, info.Mode())
if err != nil {
return err
}
return os.Chtimes(_filePath(dir, name), info.ModTime(), info.ModTime())
}
// RestoreAssets restores an asset under the given directory recursively.
func RestoreAssets(dir, name string) error {
children, err := AssetDir(name)
// File
if err != nil {
return RestoreAsset(dir, name)
}
// Dir
for _, child := range children {
err = RestoreAssets(dir, filepath.Join(name, child))
if err != nil {
return err
}
}
return nil
}
func _filePath(dir, name string) string {
canonicalName := strings.Replace(name, "\\", "/", -1)
return filepath.Join(append([]string{dir}, strings.Split(canonicalName, "/")...)...)
}

View File

@ -1,31 +0,0 @@
CREATE TABLE IF NOT EXISTS syncTopicStatus (
clusterId INTEGER NOT NULL,
pubsubTopic VARCHAR NOT NULL,
lastSyncTimestamp INTEGER NOT NULL,
PRIMARY KEY (clusterId, pubsubTopic)
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS missingMessages (
runId VARCHAR NOT NULL,
clusterId INTEGER NOT NULL,
pubsubTopic VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
msgTimestamp INTEGER NOT NULL,
storenode VARCHAR NOT NULL,
msgStatus VARCHAR NOT NULL,
storedAt INTEGER NOT NULL,
PRIMARY KEY (messageHash, storenode)
) WITHOUT ROWID;
CREATE INDEX IF NOT EXISTS idxMsg1 ON missingMessages(storedAt DESC);
CREATE INDEX IF NOT EXISTS idxMsg2 ON missingMessages(runId);
CREATE TABLE IF NOT EXISTS storeNodeUnavailable (
runId VARCHAR NOT NULL,
storenode VARCHAR NOT NULL,
requestTime INTEGER NOT NULL,
PRIMARY KEY (runId, storenode)
) WITHOUT ROWID;
CREATE INDEX IF NOT EXISTS idxStr1 ON storeNodeUnavailable(requestTime);

View File

@ -1,20 +0,0 @@
CREATE TABLE IF NOT EXISTS missingMessages_new (
runId TEXT NOT NULL,
clusterId INTEGER NOT NULL,
pubsubTopic TEXT NOT NULL,
messageHash TEXT NOT NULL,
storenode TEXT NOT NULL,
msgStatus TEXT NOT NULL,
storedAt BIGINT NOT NULL,
PRIMARY KEY (messageHash, storenode)
);
INSERT INTO missingMessages_new (runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt)
SELECT runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt
FROM missingMessages;
DROP TABLE missingMessages;
ALTER TABLE missingMessages_new
RENAME TO missingMessages;

View File

@ -1,3 +0,0 @@
package sql
//go:generate go-bindata -pkg migrations -o ../bindata.go ./

View File

@ -1,58 +0,0 @@
package sqlite
import (
"database/sql"
"strings"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/sqlite3"
_ "github.com/mattn/go-sqlite3" // Blank import to register the sqlite3 driver
"github.com/waku-org/storenode-messages/internal/persistence/migrate"
"github.com/waku-org/storenode-messages/internal/persistence/sqlite/migrations"
"go.uber.org/zap"
)
func addSqliteURLDefaults(dburl string) string {
if !strings.Contains(dburl, "?") {
dburl += "?"
}
if !strings.Contains(dburl, "_journal=") {
dburl += "&_journal=WAL"
}
if !strings.Contains(dburl, "_timeout=") {
dburl += "&_timeout=5000"
}
return dburl
}
// NewDB creates a sqlite3 DB in the specified path
func NewDB(dburl string, logger *zap.Logger) (*sql.DB, error) {
db, err := sql.Open("sqlite3", addSqliteURLDefaults(dburl))
if err != nil {
return nil, err
}
// Disable concurrent access as not supported by the driver
db.SetMaxOpenConns(1)
return db, nil
}
func migrationDriver(db *sql.DB) (database.Driver, error) {
return sqlite3.WithInstance(db, &sqlite3.Config{
MigrationsTable: "message_counter_" + sqlite3.DefaultMigrationsTable,
})
}
// Migrations is the function used for DB migration with sqlite driver
func Migrations(db *sql.DB, logger *zap.Logger) error {
migrationDriver, err := migrationDriver(db)
if err != nil {
return err
}
return migrate.Migrate(db, migrationDriver, migrations.AssetNames(), migrations.Asset)
}

View File

@ -7,7 +7,6 @@ import (
"strings"
"github.com/waku-org/storenode-messages/internal/persistence/postgres"
"github.com/waku-org/storenode-messages/internal/persistence/sqlite"
"go.uber.org/zap"
)
@ -42,10 +41,8 @@ func ParseURL(databaseURL string, logger *zap.Logger) (*sql.DB, func(*sql.DB, *z
dbURLParts := strings.Split(dbURL, "://")
dbEngine := dbURLParts[0]
dbParams := dbURLParts[1]
_ = dbParams
switch dbEngine {
case "sqlite3":
db, err = sqlite.NewDB(dbParams, logger)
migrationFn = sqlite.Migrations
case "postgres", "postgresql":
db, err = postgres.NewDB(dbURL, logger)
migrationFn = postgres.Migrations