diff --git a/cmd/msgfinder/execute.go b/cmd/msgfinder/execute.go
index 4dd3905..52d8784 100644
--- a/cmd/msgfinder/execute.go
+++ b/cmd/msgfinder/execute.go
@@ -8,6 +8,7 @@ import (
 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/ethereum/go-ethereum/log"
+	"github.com/google/uuid"
 	"github.com/multiformats/go-multiaddr"
 	"github.com/waku-org/go-waku/waku/v2/node"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
@@ -36,6 +37,8 @@ type MessageAttr struct {
 	PubsubTopic string
 }
 
+type contextKey string
+
 func Execute(ctx context.Context, options Options) error {
 	// Set encoding for logs (console, json, ...)
 	// Note that libp2p reads the encoding from GOLOG_LOG_FMT env var.
@@ -60,6 +63,9 @@ func Execute(ctx context.Context, options Options) error {
 		node.WithNTP(),
 		node.WithClusterID(uint16(options.ClusterID)),
 	)
+	if err != nil {
+		return err
+	}
 	err = wakuNode.Start(ctx)
 	if err != nil {
 		return err
@@ -71,23 +77,31 @@ func Execute(ctx context.Context, options Options) error {
 		return err
 	}
 
-	ticker := time.NewTicker(timeInterval)
-	defer ticker.Stop()
+	timer := time.NewTimer(0)
+	defer timer.Stop()
 
 	for {
 		select {
 		case <-ctx.Done():
-			break
-		case <-ticker.C:
-			verifyHistory(ctx, wakuNode, dbStore, logger)
+			return nil
+		case <-timer.C:
+			err := verifyHistory(ctx, wakuNode, dbStore, logger)
+			if err != nil {
+				return err
+			}
+			timer.Reset(timeInterval)
 		}
 	}
 }
 
-var msgMapLock sync.Locker
+var msgMapLock sync.Mutex
 var msgMap map[pb.MessageHash]map[string]MessageExistence
 var msgAttr map[pb.MessageHash]MessageAttr
 
 func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
+	runId := uuid.New().String()
+
+	logger = logger.With(zap.String("runId", runId))
+
 	// [MessageHash][StoreNode] = exists?
 	msgMapLock.Lock()
 	msgMap := make(map[pb.MessageHash]map[string]MessageExistence)
@@ -115,10 +129,11 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
 	wg := sync.WaitGroup{}
 	for topic, lastSyncTimestamp := range topicSyncStatus {
-		go func() {
+		wg.Add(1)
+		go func(topic string, lastSyncTimestamp *time.Time) {
 			defer wg.Done()
 			retrieveHistory(ctx, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
-		}()
+		}(topic, lastSyncTimestamp)
 	}
 	wg.Wait()
@@ -139,11 +154,11 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
 	wg = sync.WaitGroup{}
 	for node, messageHashes := range msgsToVerify {
 		wg.Add(1)
-		go func() {
+		go func(node string, messageHashes []pb.MessageHash) {
 			defer wg.Done()
 			nodeMultiaddr, _ := multiaddr.NewMultiaddr(node)
 			verifyMessageExistence(ctx, nodeMultiaddr, messageHashes, wakuNode, logger)
-		}()
+		}(node, messageHashes)
 	}
 	wg.Wait()
@@ -163,12 +178,12 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
 			}
 		}
 
-		err := dbStore.RecordMessage(tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist")
+		err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist")
 		if err != nil {
 			return err
 		}
 
-		err = dbStore.RecordMessage(tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown")
+		err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown")
 		if err != nil {
 			return err
 		}
diff --git a/cmd/msgfinder/main.go b/cmd/msgfinder/main.go
index 2cd33bf..04601be 100644
--- a/cmd/msgfinder/main.go
+++ b/cmd/msgfinder/main.go
@@ -14,16 +14,13 @@ func main() {
 	options.LogLevel = "INFO"
 	options.LogEncoding = "console"
 
-	cliFlags := []cli.Flag{}
-
 	app := &cli.App{
 		Name:    "storenode-messages",
 		Version: "0.0.1",
 		Before:  altsrc.InitInputSourceWithContext(cliFlags, altsrc.NewTomlSourceFromFlagFunc("config-file")),
 		Flags:   cliFlags,
 		Action: func(c *cli.Context) error {
-			Execute(c.Context, options)
-			return nil
+			return Execute(c.Context, options)
 		},
 	}
diff --git a/internal/persistence/database.go b/internal/persistence/database.go
index 42fd612..ecfcd94 100644
--- a/internal/persistence/database.go
+++ b/internal/persistence/database.go
@@ -130,16 +130,15 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e
 }
 
 func (d *DBStore) cleanOlderRecords(ctx context.Context) error {
-	d.log.Info("Cleaning older records...")
+	d.log.Debug("Cleaning older records...")
 
-	// TODO:
 	deleteFrom := time.Now().Add(-14 * 24 * time.Hour)
-	_, err := d.db.Exec("DELETE FROM missingMessages WHERE storedAt <", deleteFrom)
+	_, err := d.db.ExecContext(ctx, "DELETE FROM missingMessages WHERE storedAt < ?", deleteFrom.Unix())
 	if err != nil {
 		return err
 	}
 
-	d.log.Info("Older records removed")
+	d.log.Debug("Older records removed")
 
 	return nil
 }
@@ -181,7 +180,7 @@ func (d *DBStore) GetTrx(ctx context.Context) (*sql.Tx, error) {
 func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsubTopics []string) (map[string]*time.Time, error) {
 	result := make(map[string]*time.Time)
 	for _, topic := range pubsubTopics {
-		result[topic] = &time.Time{}
+		result[topic] = nil
 	}
 
 	sqlQuery := `SELECT pubsubTopic, lastSyncTimestamp FROM syncTopicStatus WHERE clusterId = ?`
@@ -198,8 +197,10 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub
 			return nil, err
 		}
 
-		t := time.Unix(0, lastSyncTimestamp)
-		result[pubsubTopic] = &t
+		if lastSyncTimestamp != 0 {
+			t := time.Unix(0, lastSyncTimestamp)
+			result[pubsubTopic] = &t
+		}
 	}
 
 	defer rows.Close()
@@ -220,15 +221,15 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string,
 	return stmt.Close()
 }
 
-func (d *DBStore) RecordMessage(tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []string, status string) error {
-	stmt, err := tx.Prepare("INSERT INTO missingMessages(clusterId, pubsubTopic, messageHash, msgTimestamp, storenode, status, storedAt) VALUES (?, ?, ?, ?, ?, ?, ?)")
+func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []string, status string) error {
+	stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, msgTimestamp, storenode, status, storedAt) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
 	if err != nil {
 		return err
 	}
 
 	now := time.Now().Unix()
 	for _, s := range storenodes {
-		_, err := stmt.Exec(clusterID, topic, msgHash.String(), timestamp, s, status, now)
+		_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, s, status, now)
 		if err != nil {
 			return err
 		}
diff --git a/internal/persistence/sqlite/migrations/bindata.go b/internal/persistence/sqlite/migrations/bindata.go
index f8a75d4..572e795 100644
--- a/internal/persistence/sqlite/migrations/bindata.go
+++ b/internal/persistence/sqlite/migrations/bindata.go
@@ -1,7 +1,7 @@
 // Code generated by go-bindata. DO NOT EDIT.
 // sources:
-// 1_messages.down.sql (34B)
-// 1_messages.up.sql (192B)
+// 1_setup.down.sql (106B)
+// 1_setup.up.sql (580B)
 // doc.go (74B)
 
 package migrations
@@ -71,43 +71,43 @@ func (fi bindataFileInfo) Sys() interface{} {
 	return nil
 }
 
-var __1_messagesDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\x28\xae\xcc\x4b\x8e\x2f\xc9\x2f\xc8\x4c\x2e\xb6\xe6\x02\x04\x00\x00\xff\xff\xb2\x54\x30\xb5\x22\x00\x00\x00")
+var __1_setupDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\xf0\x2d\x4e\x37\xb4\xe6\x02\xcb\x86\x38\x3a\xf9\xb8\x22\xc9\x16\x57\xe6\x25\x87\xe4\x17\x64\x26\x07\x97\x24\x96\x94\x16\xe3\x50\x95\x9b\x59\x5c\x9c\x99\x97\xee\x9b\x5a\x5c\x9c\x98\x9e\x5a\x6c\xcd\x05\x08\x00\x00\xff\xff\x21\x92\x2c\xdf\x6a\x00\x00\x00")
 
-func _1_messagesDownSqlBytes() ([]byte, error) {
+func _1_setupDownSqlBytes() ([]byte, error) {
 	return bindataRead(
-		__1_messagesDownSql,
-		"1_messages.down.sql",
+		__1_setupDownSql,
+		"1_setup.down.sql",
 	)
 }
 
-func _1_messagesDownSql() (*asset, error) {
-	bytes, err := _1_messagesDownSqlBytes()
+func _1_setupDownSql() (*asset, error) {
+	bytes, err := _1_setupDownSqlBytes()
 	if err != nil {
 		return nil, err
 	}
 
-	info := bindataFileInfo{name: "1_messages.down.sql", size: 34, mode: os.FileMode(0664), modTime: time.Unix(1715177338, 0)}
-	a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x37, 0x66, 0x71, 0x87, 0x7d, 0xb7, 0xb1, 0x90, 0x2, 0x5e, 0x13, 0xb3, 0xb8, 0xa3, 0x93, 0x72, 0xb5, 0x8, 0xf, 0x79, 0xaf, 0xba, 0xfc, 0x38, 0x64, 0x79, 0x48, 0xac, 0x30, 0x59, 0x80, 0x21}}
+	info := bindataFileInfo{name: "1_setup.down.sql", size: 106, mode: os.FileMode(0664), modTime: time.Unix(1715268524, 0)}
+	a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x3b, 0x6c, 0x2, 0x65, 0x57, 0xd3, 0xeb, 0xf4, 0xfc, 0x58, 0x8a, 0x59, 0x1e, 0x6c, 0xa, 0x59, 0xbc, 0x22, 0x76, 0x8d, 0x78, 0x88, 0x84, 0xdd, 0x12, 0xe, 0x90, 0x96, 0xe7, 0xe6, 0x26, 0xe0}}
 	return a, nil
 }
 
-var __1_messagesUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x6c\xcd\x31\xae\x82\x40\x14\x85\xe1\x1a\x56\x71\x4a\x48\xd8\xc1\xab\xe6\xe1\x55\x6e\x44\x30\xc3\x45\xa4\x32\x30\x52\x90\x00\x12\x67\x28\xd8\xbd\xd1\xc2\x68\x62\x7d\xfe\x7c\x27\xd6\xa4\x84\x20\xea\x3f\x25\xf0\x16\x59\x2e\xa0\x33\x17\x52\xc0\xae\x93\xb9\xb8\xdb\xdc\x1b\x8b\xc0\xf7\xcc\xb0\x58\xd7\xdd\xf9\x0a\xce\x84\x76\xa4\x5f\x6d\x56\xa6\x69\xe4\x7b\xf3\xd2\xda\xa5\x95\x67\x8c\x93\xd2\x71\xa2\xbe\xe6\xa1\xb1\xae\x58\x27\x23\xfd\xd8\x59\xd7\x8c\xf3\x2f\xe3\xa8\xf9\xa0\x74\x8d\x3d\xd5\x08\xde\x6f\x11\x3e\xec\xd0\x0f\x51\xb1\x24\x79\x29\xd0\x79\xc5\x9b\xbf\x47\x00\x00\x00\xff\xff\x19\xfe\xc7\x67\xc0\x00\x00\x00")
+var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x90\xbd\x6e\x83\x30\x14\x85\xe7\xf8\x29\xee\x08\x12\x4b\xe7\x4c\x2e\xb8\xc5\x2a\x98\xca\x76\x9a\x64\x74\xc0\xa2\x96\xc2\x8f\x72\x8d\xd4\xbe\x7d\x55\x1a\xa1\x94\x52\x96\xcc\xc7\xe7\xf3\x3d\x5f\x2c\x19\xd5\x0c\x34\x7d\xcc\x18\xf0\x27\x10\x85\x06\x76\xe0\x4a\x2b\xc0\xcf\xb6\xd4\x5d\xef\x4a\xe5\x8d\x1f\x10\x02\xb2\x29\xcf\x03\x7a\x7b\xe1\x15\x70\xa1\xd9\x33\x93\xe3\x7b\xb1\xcb\xb2\x88\x6c\xfa\xe1\x84\xc3\x69\x6c\xc0\x1b\x95\x71\x4a\x7f\xc5\x67\x83\x5e\x7d\x23\x5d\x63\xd1\x9b\xa6\x5f\x62\xbc\x4a\x9e\x53\x79\x84\x17\x76\x84\x60\xfa\x2d\x82\x1b\x76\x48\x42\xd8\x73\x9d\x16\x3b\x0d\xb2\xd8\xf3\x64\x4b\x08\x59\x99\xd1\x38\x44\xd7\xd6\xb9\x45\x34\xb5\xbd\x7b\x46\xf3\xc3\x49\x0d\xbe\x2f\xc6\x58\xaf\x0e\x44\xdf\x5d\x6c\xdb\x55\xf6\x9f\xf2\xd5\xf5\x42\x38\x36\x2b\xea\xff\x52\x61\xe6\xed\xe6\xc4\x08\xa6\x0f\x97\xbc\x5d\xb5\x71\x91\xb0\xc3\x4c\x9b\xab\x3e\x72\xac\x1f\xa0\x10\x73\x83\xc1\x74\x49\xc2\x54\x1c\x6e\xc9\x57\x00\x00\x00\xff\xff\x4d\xb6\x1f\x1f\x44\x02\x00\x00")
 
-func _1_messagesUpSqlBytes() ([]byte, error) {
+func _1_setupUpSqlBytes() ([]byte, error) {
 	return bindataRead(
-		__1_messagesUpSql,
-		"1_messages.up.sql",
+		__1_setupUpSql,
+		"1_setup.up.sql",
 	)
 }
 
-func _1_messagesUpSql() (*asset, error) {
-	bytes, err := _1_messagesUpSqlBytes()
+func _1_setupUpSql() (*asset, error) {
+	bytes, err := _1_setupUpSqlBytes()
 	if err != nil {
 		return nil, err
 	}
 
-	info := bindataFileInfo{name: "1_messages.up.sql", size: 192, mode: os.FileMode(0664), modTime: time.Unix(1715177323, 0)}
-	a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7b, 0xd, 0x5e, 0xfd, 0xd, 0x7f, 0x5a, 0xac, 0x26, 0x8f, 0xd2, 0xc5, 0xfe, 0xcc, 0xf6, 0xfd, 0x95, 0x56, 0x40, 0xd4, 0xed, 0xe, 0x27, 0x33, 0x7d, 0xc5, 0x66, 0x86, 0x9c, 0xff, 0x2b, 0x47}}
+	info := bindataFileInfo{name: "1_setup.up.sql", size: 580, mode: os.FileMode(0664), modTime: time.Unix(1715270769, 0)}
+	a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xa9, 0xbe, 0x16, 0x81, 0x21, 0xb1, 0x90, 0xd0, 0xf2, 0x7c, 0x5c, 0x19, 0x62, 0xf4, 0x6a, 0xb7, 0xa4, 0x2, 0x54, 0xc8, 0xec, 0x2c, 0xdb, 0x3b, 0x11, 0xe9, 0x85, 0x6a, 0x73, 0x76, 0xe4, 0xf}}
 	return a, nil
 }
 
@@ -222,9 +222,9 @@ func AssetNames() []string {
 
 // _bindata is a table, holding each asset generator, mapped to its name.
 var _bindata = map[string]func() (*asset, error){
-	"1_messages.down.sql": _1_messagesDownSql,
+	"1_setup.down.sql": _1_setupDownSql,
 
-	"1_messages.up.sql": _1_messagesUpSql,
+	"1_setup.up.sql": _1_setupUpSql,
 
 	"doc.go": docGo,
 }
@@ -270,9 +270,9 @@ type bintree struct {
 }
 
 var _bintree = &bintree{nil, map[string]*bintree{
-	"1_messages.down.sql": &bintree{_1_messagesDownSql, map[string]*bintree{}},
-	"1_messages.up.sql":   &bintree{_1_messagesUpSql, map[string]*bintree{}},
-	"doc.go":              &bintree{docGo, map[string]*bintree{}},
+	"1_setup.down.sql": &bintree{_1_setupDownSql, map[string]*bintree{}},
+	"1_setup.up.sql":   &bintree{_1_setupUpSql, map[string]*bintree{}},
+	"doc.go":           &bintree{docGo, map[string]*bintree{}},
}}
 
 // RestoreAsset restores an asset under the given directory.
diff --git a/internal/persistence/sqlite/migrations/sql/1_setup.down.sql b/internal/persistence/sqlite/migrations/sql/1_setup.down.sql
index 2080a7b..836a265 100644
--- a/internal/persistence/sqlite/migrations/sql/1_setup.down.sql
+++ b/internal/persistence/sqlite/migrations/sql/1_setup.down.sql
@@ -1,3 +1,4 @@
 DROP INDEX IF EXISTS idxMsg1;
+DROP INDEX IF EXISTS idxMsg2;
 DROP TABLE IF EXISTS syncTopicStatus;
 DROP TABLE IF EXISTS missingMessages;
diff --git a/internal/persistence/sqlite/migrations/sql/1_setup.up.sql b/internal/persistence/sqlite/migrations/sql/1_setup.up.sql
index 20dd191..7730aba 100644
--- a/internal/persistence/sqlite/migrations/sql/1_setup.up.sql
+++ b/internal/persistence/sqlite/migrations/sql/1_setup.up.sql
@@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS syncTopicStatus (
 
 CREATE TABLE IF NOT EXISTS missingMessages (
+	runId VARCHAR NOT NULL,
 	clusterId INTEGER NOT NULL,
 	pubsubTopic VARCHAR NOT NULL,
 	messageHash VARCHAR NOT NULL,
@@ -18,3 +19,4 @@ CREATE TABLE IF NOT EXISTS missingMessages (
 ) WITHOUT ROWID;
 
 CREATE INDEX IF NOT EXISTS idxMsg1 ON missingMessages(storedAt DESC);
+CREATE INDEX IF NOT EXISTS idxMsg2 ON missingMessages(runId);