fix: add runId to logs, and fix running issues

This commit is contained in:
Richard Ramos 2024-05-09 16:03:33 -04:00
parent acd3bf02e6
commit 4ce0b98106
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
6 changed files with 65 additions and 49 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
@ -36,6 +37,8 @@ type MessageAttr struct {
PubsubTopic string
}
type contextKey string
func Execute(ctx context.Context, options Options) error {
// Set encoding for logs (console, json, ...)
// Note that libp2p reads the encoding from GOLOG_LOG_FMT env var.
@ -60,6 +63,9 @@ func Execute(ctx context.Context, options Options) error {
node.WithNTP(),
node.WithClusterID(uint16(options.ClusterID)),
)
if err != nil {
return err
}
err = wakuNode.Start(ctx)
if err != nil {
return err
@ -71,23 +77,31 @@ func Execute(ctx context.Context, options Options) error {
return err
}
ticker := time.NewTicker(timeInterval)
defer ticker.Stop()
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
break
case <-ticker.C:
verifyHistory(ctx, wakuNode, dbStore, logger)
return nil
case <-timer.C:
err := verifyHistory(ctx, wakuNode, dbStore, logger)
if err != nil {
return err
}
timer.Reset(timeInterval)
}
}
}
var msgMapLock sync.Locker
var msgMapLock sync.Mutex
var msgMap map[pb.MessageHash]map[string]MessageExistence
var msgAttr map[pb.MessageHash]MessageAttr
func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
runId := uuid.New().String()
logger = logger.With(zap.String("runId", runId))
// [MessageHash][StoreNode] = exists?
msgMapLock.Lock()
msgMap := make(map[pb.MessageHash]map[string]MessageExistence)
@ -115,10 +129,11 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
wg := sync.WaitGroup{}
for topic, lastSyncTimestamp := range topicSyncStatus {
go func() {
wg.Add(1)
go func(topic string, lastSyncTimestamp *time.Time) {
defer wg.Done()
retrieveHistory(ctx, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
}()
}(topic, lastSyncTimestamp)
}
wg.Wait()
@ -139,11 +154,11 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
wg = sync.WaitGroup{}
for node, messageHashes := range msgsToVerify {
wg.Add(1)
go func() {
go func(node string, messageHashes []pb.MessageHash) {
defer wg.Done()
nodeMultiaddr, _ := multiaddr.NewMultiaddr(node)
verifyMessageExistence(ctx, nodeMultiaddr, messageHashes, wakuNode, logger)
}()
}(node, messageHashes)
}
wg.Wait()
@ -163,12 +178,12 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
}
}
err := dbStore.RecordMessage(tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist")
err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist")
if err != nil {
return err
}
err = dbStore.RecordMessage(tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown")
err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown")
if err != nil {
return err
}

View File

@ -14,16 +14,13 @@ func main() {
options.LogLevel = "INFO"
options.LogEncoding = "console"
cliFlags := []cli.Flag{}
app := &cli.App{
Name: "storenode-messages",
Version: "0.0.1",
Before: altsrc.InitInputSourceWithContext(cliFlags, altsrc.NewTomlSourceFromFlagFunc("config-file")),
Flags: cliFlags,
Action: func(c *cli.Context) error {
Execute(c.Context, options)
return nil
return Execute(c.Context, options)
},
}

View File

@ -130,16 +130,15 @@ func (d *DBStore) Start(ctx context.Context, timesource timesource.Timesource) e
}
func (d *DBStore) cleanOlderRecords(ctx context.Context) error {
d.log.Info("Cleaning older records...")
d.log.Debug("Cleaning older records...")
// TODO:
deleteFrom := time.Now().Add(-14 * 24 * time.Hour)
_, err := d.db.Exec("DELETE FROM missingMessages WHERE storedAt <", deleteFrom)
_, err := d.db.ExecContext(ctx, "DELETE FROM missingMessages WHERE storedAt < ?", deleteFrom)
if err != nil {
return err
}
d.log.Info("Older records removed")
d.log.Debug("Older records removed")
return nil
}
@ -181,7 +180,7 @@ func (d *DBStore) GetTrx(ctx context.Context) (*sql.Tx, error) {
func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsubTopics []string) (map[string]*time.Time, error) {
result := make(map[string]*time.Time)
for _, topic := range pubsubTopics {
result[topic] = &time.Time{}
result[topic] = nil
}
sqlQuery := `SELECT pubsubTopic, lastSyncTimestamp FROM syncTopicStatus WHERE clusterId = ?`
@ -198,8 +197,10 @@ func (d *DBStore) GetTopicSyncStatus(ctx context.Context, clusterID uint, pubsub
return nil, err
}
t := time.Unix(0, lastSyncTimestamp)
result[pubsubTopic] = &t
if lastSyncTimestamp != 0 {
t := time.Unix(0, lastSyncTimestamp)
result[pubsubTopic] = &t
}
}
defer rows.Close()
@ -220,15 +221,15 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string,
return stmt.Close()
}
func (d *DBStore) RecordMessage(tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []string, status string) error {
stmt, err := tx.Prepare("INSERT INTO missingMessages(clusterId, pubsubTopic, messageHash, msgTimestamp, storenode, status, storedAt) VALUES (?, ?, ?, ?, ?, ?, ?)")
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []string, status string) error {
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, msgTimestamp, storenode, status, storedAt) VALUES (?, ?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
now := time.Now().Unix()
for _, s := range storenodes {
_, err := stmt.Exec(clusterID, topic, msgHash.String(), timestamp, s, status, now)
_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, s, status, now)
if err != nil {
return err
}

View File

@ -1,7 +1,7 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1_messages.down.sql (34B)
// 1_messages.up.sql (192B)
// 1_setup.down.sql (106B)
// 1_setup.up.sql (580B)
// doc.go (74B)
package migrations
@ -71,43 +71,43 @@ func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1_messagesDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\x08\x71\x74\xf2\x71\x55\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\x28\xae\xcc\x4b\x8e\x2f\xc9\x2f\xc8\x4c\x2e\xb6\xe6\x02\x04\x00\x00\xff\xff\xb2\x54\x30\xb5\x22\x00\x00\x00")
var __1_setupDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\xf0\x2d\x4e\x37\xb4\xe6\x02\xcb\x86\x38\x3a\xf9\xb8\x22\xc9\x16\x57\xe6\x25\x87\xe4\x17\x64\x26\x07\x97\x24\x96\x94\x16\xe3\x50\x95\x9b\x59\x5c\x9c\x99\x97\xee\x9b\x5a\x5c\x9c\x98\x9e\x5a\x6c\xcd\x05\x08\x00\x00\xff\xff\x21\x92\x2c\xdf\x6a\x00\x00\x00")
func _1_messagesDownSqlBytes() ([]byte, error) {
func _1_setupDownSqlBytes() ([]byte, error) {
return bindataRead(
__1_messagesDownSql,
"1_messages.down.sql",
__1_setupDownSql,
"1_setup.down.sql",
)
}
func _1_messagesDownSql() (*asset, error) {
bytes, err := _1_messagesDownSqlBytes()
func _1_setupDownSql() (*asset, error) {
bytes, err := _1_setupDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1_messages.down.sql", size: 34, mode: os.FileMode(0664), modTime: time.Unix(1715177338, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x37, 0x66, 0x71, 0x87, 0x7d, 0xb7, 0xb1, 0x90, 0x2, 0x5e, 0x13, 0xb3, 0xb8, 0xa3, 0x93, 0x72, 0xb5, 0x8, 0xf, 0x79, 0xaf, 0xba, 0xfc, 0x38, 0x64, 0x79, 0x48, 0xac, 0x30, 0x59, 0x80, 0x21}}
info := bindataFileInfo{name: "1_setup.down.sql", size: 106, mode: os.FileMode(0664), modTime: time.Unix(1715268524, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x3b, 0x6c, 0x2, 0x65, 0x57, 0xd3, 0xeb, 0xf4, 0xfc, 0x58, 0x8a, 0x59, 0x1e, 0x6c, 0xa, 0x59, 0xbc, 0x22, 0x76, 0x8d, 0x78, 0x88, 0x84, 0xdd, 0x12, 0xe, 0x90, 0x96, 0xe7, 0xe6, 0x26, 0xe0}}
return a, nil
}
var __1_messagesUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x6c\xcd\x31\xae\x82\x40\x14\x85\xe1\x1a\x56\x71\x4a\x48\xd8\xc1\xab\xe6\xe1\x55\x6e\x44\x30\xc3\x45\xa4\x32\x30\x52\x90\x00\x12\x67\x28\xd8\xbd\xd1\xc2\x68\x62\x7d\xfe\x7c\x27\xd6\xa4\x84\x20\xea\x3f\x25\xf0\x16\x59\x2e\xa0\x33\x17\x52\xc0\xae\x93\xb9\xb8\xdb\xdc\x1b\x8b\xc0\xf7\xcc\xb0\x58\xd7\xdd\xf9\x0a\xce\x84\x76\xa4\x5f\x6d\x56\xa6\x69\xe4\x7b\xf3\xd2\xda\xa5\x95\x67\x8c\x93\xd2\x71\xa2\xbe\xe6\xa1\xb1\xae\x58\x27\x23\xfd\xd8\x59\xd7\x8c\xf3\x2f\xe3\xa8\xf9\xa0\x74\x8d\x3d\xd5\x08\xde\x6f\x11\x3e\xec\xd0\x0f\x51\xb1\x24\x79\x29\xd0\x79\xc5\x9b\xbf\x47\x00\x00\x00\xff\xff\x19\xfe\xc7\x67\xc0\x00\x00\x00")
var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x90\xbd\x6e\x83\x30\x14\x85\xe7\xf8\x29\xee\x08\x12\x4b\xe7\x4c\x2e\xb8\xc5\x2a\x98\xca\x76\x9a\x64\x74\xc0\xa2\x96\xc2\x8f\x72\x8d\xd4\xbe\x7d\x55\x1a\xa1\x94\x52\x96\xcc\xc7\xe7\xf3\x3d\x5f\x2c\x19\xd5\x0c\x34\x7d\xcc\x18\xf0\x27\x10\x85\x06\x76\xe0\x4a\x2b\xc0\xcf\xb6\xd4\x5d\xef\x4a\xe5\x8d\x1f\x10\x02\xb2\x29\xcf\x03\x7a\x7b\xe1\x15\x70\xa1\xd9\x33\x93\xe3\x7b\xb1\xcb\xb2\x88\x6c\xfa\xe1\x84\xc3\x69\x6c\xc0\x1b\x95\x71\x4a\x7f\xc5\x67\x83\x5e\x7d\x23\x5d\x63\xd1\x9b\xa6\x5f\x62\xbc\x4a\x9e\x53\x79\x84\x17\x76\x84\x60\xfa\x2d\x82\x1b\x76\x48\x42\xd8\x73\x9d\x16\x3b\x0d\xb2\xd8\xf3\x64\x4b\x08\x59\x99\xd1\x38\x44\xd7\xd6\xb9\x45\x34\xb5\xbd\x7b\x46\xf3\xc3\x49\x0d\xbe\x2f\xc6\x58\xaf\x0e\x44\xdf\x5d\x6c\xdb\x55\xf6\x9f\xf2\xd5\xf5\x42\x38\x36\x2b\xea\xff\x52\x61\xe6\xed\xe6\xc4\x08\xa6\x0f\x97\xbc\x5d\xb5\x71\x91\xb0\xc3\x4c\x9b\xab\x3e\x72\xac\x1f\xa0\x10\x73\x83\xc1\x74\x49\xc2\x54\x1c\x6e\xc9\x57\x00\x00\x00\xff\xff\x4d\xb6\x1f\x1f\x44\x02\x00\x00")
func _1_messagesUpSqlBytes() ([]byte, error) {
func _1_setupUpSqlBytes() ([]byte, error) {
return bindataRead(
__1_messagesUpSql,
"1_messages.up.sql",
__1_setupUpSql,
"1_setup.up.sql",
)
}
func _1_messagesUpSql() (*asset, error) {
bytes, err := _1_messagesUpSqlBytes()
func _1_setupUpSql() (*asset, error) {
bytes, err := _1_setupUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1_messages.up.sql", size: 192, mode: os.FileMode(0664), modTime: time.Unix(1715177323, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x7b, 0xd, 0x5e, 0xfd, 0xd, 0x7f, 0x5a, 0xac, 0x26, 0x8f, 0xd2, 0xc5, 0xfe, 0xcc, 0xf6, 0xfd, 0x95, 0x56, 0x40, 0xd4, 0xed, 0xe, 0x27, 0x33, 0x7d, 0xc5, 0x66, 0x86, 0x9c, 0xff, 0x2b, 0x47}}
info := bindataFileInfo{name: "1_setup.up.sql", size: 580, mode: os.FileMode(0664), modTime: time.Unix(1715270769, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xa9, 0xbe, 0x16, 0x81, 0x21, 0xb1, 0x90, 0xd0, 0xf2, 0x7c, 0x5c, 0x19, 0x62, 0xf4, 0x6a, 0xb7, 0xa4, 0x2, 0x54, 0xc8, 0xec, 0x2c, 0xdb, 0x3b, 0x11, 0xe9, 0x85, 0x6a, 0x73, 0x76, 0xe4, 0xf}}
return a, nil
}
@ -222,9 +222,9 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1_messages.down.sql": _1_messagesDownSql,
"1_setup.down.sql": _1_setupDownSql,
"1_messages.up.sql": _1_messagesUpSql,
"1_setup.up.sql": _1_setupUpSql,
"doc.go": docGo,
}
@ -270,9 +270,9 @@ type bintree struct {
}
var _bintree = &bintree{nil, map[string]*bintree{
"1_messages.down.sql": &bintree{_1_messagesDownSql, map[string]*bintree{}},
"1_messages.up.sql": &bintree{_1_messagesUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
"1_setup.down.sql": &bintree{_1_setupDownSql, map[string]*bintree{}},
"1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.

View File

@ -1,3 +1,4 @@
DROP INDEX IF EXISTS idxMsg1;
DROP INDEX IF EXISTS idxMsg2;
DROP TABLE IF EXISTS syncTopicStatus;
DROP TABLE IF EXISTS missingMessages;

View File

@ -7,6 +7,7 @@ CREATE TABLE IF NOT EXISTS syncTopicStatus (
CREATE TABLE IF NOT EXISTS missingMessages (
runId VARCHAR NOT NULL,
clusterId INTEGER NOT NULL,
pubsubTopic VARCHAR NOT NULL,
messageHash VARCHAR NOT NULL,
@ -18,3 +19,4 @@ CREATE TABLE IF NOT EXISTS missingMessages (
) WITHOUT ROWID;
CREATE INDEX IF NOT EXISTS idxMsg1 ON missingMessages(storedAt DESC);
CREATE INDEX IF NOT EXISTS idxMsg2 ON missingMessages(runId);