diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go index 8b0d496..2be66c8 100644 --- a/cmd/storemsgcounter/execute.go +++ b/cmd/storemsgcounter/execute.go @@ -39,11 +39,6 @@ const timeInterval = 2 * time.Minute const delay = 5 * time.Minute const maxAttempts = 3 -type MessageAttr struct { - Timestamp uint64 - PubsubTopic string -} - func Execute(ctx context.Context, options Options) error { // Set encoding for logs (console, json, ...) // Note that libp2p reads the encoding from GOLOG_LOG_FMT env var. @@ -155,14 +150,14 @@ func Execute(ctx context.Context, options Options) error { var msgMapLock sync.Mutex var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence -var msgAttr map[pb.MessageHash]MessageAttr +var msgPubsubTopic map[pb.MessageHash]string func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error { // [MessageHash][StoreNode] = exists? msgMapLock.Lock() msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence) - msgAttr = make(map[pb.MessageHash]MessageAttr) + msgPubsubTopic = make(map[pb.MessageHash]string) msgMapLock.Unlock() topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value()) @@ -219,7 +214,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo wg.Wait() // If a message is not available, store in DB in which store nodes it wasnt - // available and its timestamp + // available // ======================================================================== msgMapLock.Lock() defer msgMapLock.Unlock() @@ -236,16 +231,16 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo } if len(missingIn) != 0 { - logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgAttr[msgHash].PubsubTopic), zap.Int("num_nodes", len(missingIn))) - err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist") + logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn))) + err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], missingIn, "does_not_exist") if err != nil { return err } } if len(unknownIn) != 0 { - logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgAttr[msgHash].PubsubTopic), zap.Int("num_nodes", len(missingIn))) - err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown") + logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn))) + err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], unknownIn, "unknown") if err != nil { return err } @@ -294,7 +289,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn ContentFilter: protocol.NewContentFilter(topic), TimeStart: proto.Int64(startTime.UnixNano()), TimeEnd: proto.Int64(endTime.UnixNano()), - }, store.WithPeer(node.ID), store.WithPaging(false, 100)) + }, store.WithPeer(node.ID), store.WithPaging(false, 100), store.IncludeData(false)) cancel() if err != nil { queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i)) @@ -325,10 +320,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn msgMap[hash] = make(map[peer.ID]MessageExistence) } msgMap[hash][node.ID] = Exists - msgAttr[hash] = MessageAttr{ - Timestamp: uint64(mkv.Message.GetTimestamp()), - PubsubTopic: mkv.GetPubsubTopic(), - } + msgPubsubTopic[hash] = mkv.GetPubsubTopic() } msgMapLock.Unlock() diff --git a/internal/persistence/postgres/migrations/bindata.go b/internal/persistence/postgres/migrations/bindata.go index 775715b..55bf137 100644 --- a/internal/persistence/postgres/migrations/bindata.go +++ b/internal/persistence/postgres/migrations/bindata.go @@ -1,7 +1,7 @@ // Code generated by go-bindata. DO NOT EDIT. // sources: -// 1_setup.down.sql (209B) // 1_setup.up.sql (856B) +// 2_timestamp.up.sql (53B) // doc.go (74B) package migrations @@ -71,26 +71,6 @@ func (fi bindataFileInfo) Sys() interface{} { return nil } -var __1_setupDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\xf0\x2d\x4e\x37\xb4\xe6\xc2\x23\x6b\x84\x5b\x36\xb8\xa4\x08\xa6\x37\xc4\xd1\xc9\xc7\x15\x49\xb6\xb8\x32\x2f\x39\x24\xbf\x20\x33\x39\xb8\x24\xb1\xa4\xb4\x18\x87\xaa\xdc\xcc\xe2\xe2\xcc\xbc\x74\xdf\xd4\xe2\xe2\xc4\xf4\x54\x5c\xaa\x8a\x4b\xf2\x8b\x52\xfd\xf2\x53\x52\x43\xf3\x12\xcb\x12\x33\x73\x12\x93\x72\x52\xad\xb9\x00\x01\x00\x00\xff\xff\x4f\xc7\x7a\xbd\xd1\x00\x00\x00") - -func _1_setupDownSqlBytes() ([]byte, error) { - return bindataRead( - __1_setupDownSql, - "1_setup.down.sql", - ) -} - -func _1_setupDownSql() (*asset, error) { - bytes, err := _1_setupDownSqlBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "1_setup.down.sql", size: 209, mode: os.FileMode(0664), modTime: time.Unix(1716212112, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x29, 0xd4, 0xea, 0x7a, 0x83, 0x13, 0x9f, 0x16, 0x6f, 0x3f, 0x53, 0x4f, 0x1, 0xcc, 0xeb, 0x14, 0x7a, 0xf0, 0x62, 0x2d, 0x22, 0x96, 0x52, 0x9d, 0x57, 0x54, 0x8f, 0x51, 0xe8, 0x16, 0x8a, 0x33}} - return a, nil -} - var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x9c\x91\x3d\x6f\xc2\x30\x10\x86\x67\xfc\x2b\x6e\x4c\x24\x96\x76\x65\x0a\xe0\x52\xab\x60\xaa\xd8\x48\x30\x1a\x6c\xa5\x96\xf2\x41\x73\x76\xd5\xfe\xfb\x0a\xb7\x8a\x42\x12\x88\xda\xf9\x7c\xcf\xfb\xfa\xb9\x45\x4a\x13\x49\x41\x26\xf3\x35\x05\xf6\x04\x7c\x2b\x81\xee\x99\x90\x02\xf0\xab\x3c\xc9\xea\x6c\x4f\xc2\x29\xe7\x11\x22\x32\x39\xe5\x1e\x9d\xa9\x99\x06\xc6\x25\x5d\xd1\x34\xbc\xe7\xbb\xf5\x7a\x4a\x26\x67\x7f\x44\x7f\x0c\x1b\x20\xe9\x5e\xb6\x67\xb9\x42\x27\x2e\x3c\x5b\x18\x74\xaa\x38\xc3\x9c\xad\x18\xbf\x7a\xf3\x9a\xb2\x4d\x92\x1e\xe0\x85\x1e\x20\x6a\x92\xa6\xd0\xe2\xc6\x24\x9e\x11\x72\xa7\x73\x61\x11\x6d\x99\x6d\x0c\xa2\xca\x4c\xe8\x5c\xfb\x92\xe9\x5e\xa1\x7f\xff\xa4\xf8\x41\x3f\x2b\x7c\xeb\xcf\x30\xbb\xf7\x41\x74\x55\x6d\xca\x4a\x9b\xa1\xc5\x5f\xc9\xdd\x49\xd8\xd1\x89\xeb\xe1\xa0\x23\xac\xd5\x6b\x0a\x4d\xd2\x95\x30\xc6\x97\x74\xdf\x11\x66\xf5\xe7\x06\xb3\x07\xd8\xf2\xae\xbb\xa8\x89\x5e\x52\xb1\x88\x67\xe3\x94\xc7\x21\x4a\xd0\x3f\x72\xb5\x90\xc4\x2b\x6d\x76\xa5\xfa\x50\x36\x57\xc7\xdc\xdc\x3e\xdd\x6d\x8d\xb5\x79\xf7\x06\xdd\xe5\x06\xa3\xbe\x02\xfb\x8f\xa6\x84\xab\x83\xa9\xa1\xbe\x51\x2b\x3c\x9e\x91\xef\x00\x00\x00\xff\xff\x8d\xc8\x7a\x90\x58\x03\x00\x00") func _1_setupUpSqlBytes() ([]byte, error) { @@ -106,11 +86,31 @@ func _1_setupUpSql() (*asset, error) { return nil, err } - info := bindataFileInfo{name: "1_setup.up.sql", size: 856, mode: os.FileMode(0664), modTime: time.Unix(1716213117, 0)} + info := bindataFileInfo{name: "1_setup.up.sql", size: 856, mode: os.FileMode(0664), modTime: time.Unix(1716213156, 0)} a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe, 0x35, 0xf8, 0x8e, 0x74, 0xf4, 0xdc, 0x81, 0xc1, 0x30, 0x44, 0x54, 0x70, 0x36, 0xdb, 0x7c, 0x41, 0x8b, 0xba, 0xd0, 0x52, 0x26, 0x59, 0xbf, 0x56, 0xa, 0x84, 0x8d, 0x89, 0xfe, 0xf1, 0x69}} return a, nil } +var __2_timestampUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\xc8\xcd\x2c\x2e\xce\xcc\x4b\xf7\x4d\x2d\x2e\x4e\x4c\x4f\x2d\x56\x70\x09\xf2\x0f\x50\x70\xf6\xf7\x09\xf5\xf5\x53\xc8\x2d\x4e\x0f\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\xb0\x06\x04\x00\x00\xff\xff\x15\xd6\x7e\xf9\x35\x00\x00\x00") + +func _2_timestampUpSqlBytes() ([]byte, error) { + return bindataRead( + __2_timestampUpSql, + "2_timestamp.up.sql", + ) +} + +func _2_timestampUpSql() (*asset, error) { + bytes, err := _2_timestampUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "2_timestamp.up.sql", size: 53, mode: os.FileMode(0664), modTime: time.Unix(1720472316, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x81, 0x97, 0xcf, 0xf4, 0x2e, 0x3c, 0xca, 0x2, 0xd6, 0x9e, 0x7e, 0xf8, 0x7c, 0x18, 0x9c, 0x82, 0x25, 0x54, 0xd6, 0xd6, 0x21, 0x25, 0xdf, 0xf9, 0xdf, 0x24, 0x7a, 0xca, 0xbf, 0xeb, 0xe5, 0xd2}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -222,10 +222,10 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ - "1_setup.down.sql": _1_setupDownSql, - "1_setup.up.sql": _1_setupUpSql, + "2_timestamp.up.sql": _2_timestampUpSql, + "doc.go": docGo, } @@ -270,9 +270,9 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1_setup.down.sql": &bintree{_1_setupDownSql, map[string]*bintree{}}, - "1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}}, - "doc.go": &bintree{docGo, map[string]*bintree{}}, + "1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}}, + "2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. diff --git a/internal/persistence/postgres/migrations/sql/1_setup.down.sql b/internal/persistence/postgres/migrations/sql/1_setup.down.sql deleted file mode 100644 index 9a523b0..0000000 --- a/internal/persistence/postgres/migrations/sql/1_setup.down.sql +++ /dev/null @@ -1,6 +0,0 @@ -DROP INDEX IF EXISTS idxMsg1; -DROP INDEX IF EXISTS idxMsg2; -DROP INDEX IF EXISTS idxStr1; -DROP TABLE IF EXISTS syncTopicStatus; -DROP TABLE IF EXISTS missingMessages; -DROP TABLE IF EXISTS storeNodeUnavailable; diff --git a/internal/persistence/postgres/migrations/sql/2_timestamp.up.sql b/internal/persistence/postgres/migrations/sql/2_timestamp.up.sql new file mode 100644 index 0000000..8bf9ccc --- /dev/null +++ b/internal/persistence/postgres/migrations/sql/2_timestamp.up.sql @@ -0,0 +1 @@ +ALTER TABLE missingMessages DROP COLUMN msgTimestamp; \ No newline at end of file diff --git a/internal/persistence/sqlite/migrations/bindata.go b/internal/persistence/sqlite/migrations/bindata.go index 9c56d00..afc5f79 100644 --- a/internal/persistence/sqlite/migrations/bindata.go +++ b/internal/persistence/sqlite/migrations/bindata.go @@ -1,7 +1,7 @@ // Code generated by go-bindata. DO NOT EDIT. // sources: -// 1_setup.down.sql (209B) // 1_setup.up.sql (927B) +// 2_timestamp.up.sql (581B) // doc.go (74B) package migrations @@ -71,26 +71,6 @@ func (fi bindataFileInfo) Sys() interface{} { return nil } -var __1_setupDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\xf0\x2d\x4e\x37\xb4\xe6\xc2\x23\x6b\x84\x5b\x36\xb8\xa4\x08\xa6\x37\xc4\xd1\xc9\xc7\x15\x49\xb6\xb8\x32\x2f\x39\x24\xbf\x20\x33\x39\xb8\x24\xb1\xa4\xb4\x18\x87\xaa\xdc\xcc\xe2\xe2\xcc\xbc\x74\xdf\xd4\xe2\xe2\xc4\xf4\x54\x5c\xaa\x8a\x4b\xf2\x8b\x52\xfd\xf2\x53\x52\x43\xf3\x12\xcb\x12\x33\x73\x12\x93\x72\x52\xad\xb9\x00\x01\x00\x00\xff\xff\x4f\xc7\x7a\xbd\xd1\x00\x00\x00") - -func _1_setupDownSqlBytes() ([]byte, error) { - return bindataRead( - __1_setupDownSql, - "1_setup.down.sql", - ) -} - -func _1_setupDownSql() (*asset, error) { - bytes, err := _1_setupDownSqlBytes() - if err != nil { - return nil, err - } - - info := bindataFileInfo{name: "1_setup.down.sql", size: 209, mode: os.FileMode(0664), modTime: time.Unix(1716212168, 0)} - a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x29, 0xd4, 0xea, 0x7a, 0x83, 0x13, 0x9f, 0x16, 0x6f, 0x3f, 0x53, 0x4f, 0x1, 0xcc, 0xeb, 0x14, 0x7a, 0xf0, 0x62, 0x2d, 0x22, 0x96, 0x52, 0x9d, 0x57, 0x54, 0x8f, 0x51, 0xe8, 0x16, 0x8a, 0x33}} - return a, nil -} - var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x91\x4d\x4f\xc2\x30\x18\xc7\xcf\xf4\x53\x3c\xc7\x2d\xe1\xa2\xd7\x9d\x26\x54\x69\x84\xcd\xb4\x45\xe0\x58\x68\x33\x9b\xec\x05\xf7\xb4\x46\xbf\xbd\xa1\x12\x82\x58\xc7\x81\x73\xfb\xfc\x5f\x7e\xff\x09\xa7\xb9\xa4\x20\xf3\x87\x39\x05\xf6\x08\x45\x29\x81\xae\x99\x90\x02\xf0\xab\xdd\xc9\x6e\x6f\x77\xc2\x29\xe7\x11\x12\x32\xda\xd5\x1e\x9d\xe9\x99\x06\x56\x48\xfa\x44\x79\xf8\x5f\x2c\xe7\xf3\x31\x19\xed\xfd\x16\xfd\x36\x5c\xc0\x6b\xce\x27\xb3\xfc\xd7\x73\xad\xd0\x89\x83\xa4\x6d\x0c\x3a\xd5\xec\x63\x1a\x2f\x9c\x2d\x72\xbe\x81\x67\xba\x81\xe4\xe4\x36\x86\x33\xed\x94\xa4\xb0\x62\x72\x56\x2e\x25\xf0\x72\xc5\xa6\x19\x21\x64\xa0\x46\x63\x11\x6d\x5b\x2d\x0c\xa2\xaa\x4c\xa8\xd1\xfb\x96\xe9\x58\xc6\x5b\xfa\x35\x3f\x06\x33\x85\x6f\xd1\x67\xac\x06\x9b\xa3\xeb\x7a\xd3\x76\xda\xfc\x73\x7c\x1c\x21\xf2\x18\x2e\x75\xee\xfe\xaa\xc2\x05\xd0\xb3\x88\x63\x38\x19\xc6\x80\x1e\x79\xb2\x62\x4a\xd7\x17\x3c\xad\xfe\x5c\x60\x75\x07\x65\x71\x89\x36\x39\x25\x99\x52\x31\x49\xb3\xeb\x2a\xf7\x31\x95\xb0\x4e\x9a\x0d\x8e\x1a\x9c\x8a\x4e\x9b\x65\xab\x3e\x94\xad\xd5\xb6\x36\x83\xcb\x0e\xe2\xed\xcd\xbb\x37\xe8\x0e\xfb\x5c\x87\x18\x2c\x6e\xc2\x27\x5c\x1f\xf0\xc5\x4a\x24\x67\x59\xd2\x8c\x7c\x07\x00\x00\xff\xff\x6e\xd3\x44\xe6\x9f\x03\x00\x00") func _1_setupUpSqlBytes() ([]byte, error) { @@ -111,6 +91,26 @@ func _1_setupUpSql() (*asset, error) { return a, nil } +var __2_timestampUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x91\x41\x6f\x83\x30\x0c\x85\xcf\xf5\xaf\xf0\xb1\x48\xf9\x07\x9c\xd2\xce\xed\xa2\x41\xa8\x82\x27\xb5\xa7\x89\x96\x88\x21\xad\x50\xe1\xa0\xfd\xfd\xa9\x5d\x85\xd0\xc2\x71\xd7\x7c\x79\xcf\x7e\xcf\x5b\x47\x9a\x09\x59\x6f\x32\x42\xb3\x43\x5b\x30\xd2\xd1\x94\x5c\xe2\xb5\x15\x69\xbb\x26\xf7\x22\x55\xe3\xe5\xa3\xf3\xdf\xb8\x86\xd5\x30\x76\xa6\x46\xa6\x23\x3f\x3e\xdb\xf7\x2c\x53\xb0\xba\x7c\x8d\x12\xfc\x60\x6a\x34\x96\x69\x4f\x6e\x0e\x6f\xe3\x59\xc6\x33\xf7\xb7\xf6\x12\x09\xaf\xbf\xf6\xaf\x95\x7c\x46\x4c\x42\x3f\xf8\xae\xaf\x7d\xac\x92\xa6\x0c\x55\x18\x65\x59\x53\xeb\x80\x1b\xb3\x37\x76\x86\x10\x56\x07\x67\x72\xed\x4e\xf8\x46\x27\x5c\xcf\x06\x2b\x9c\x26\x25\x90\xa4\x00\xc6\x96\xe4\xf8\x1e\xa5\x58\xae\xe1\x51\x82\xc2\x29\xb5\xc2\x59\x46\x85\xcb\xde\x0a\xa7\xb5\x9f\xaf\xb5\x0e\x09\x94\x94\xd1\x96\xf1\xff\x2c\x61\xe7\x8a\xfc\xef\xde\x29\xc0\x8b\x2b\x0e\xcf\x4b\xc7\x50\x67\x4c\x6e\x99\xde\x23\x83\x23\xab\x73\xc2\xb8\x90\x14\xe0\x27\x00\x00\xff\xff\x7a\xcc\xb2\x8c\x45\x02\x00\x00") + +func _2_timestampUpSqlBytes() ([]byte, error) { + return bindataRead( + __2_timestampUpSql, + "2_timestamp.up.sql", + ) +} + +func _2_timestampUpSql() (*asset, error) { + bytes, err := _2_timestampUpSqlBytes() + if err != nil { + return nil, err + } + + info := bindataFileInfo{name: "2_timestamp.up.sql", size: 581, mode: os.FileMode(0664), modTime: time.Unix(1720472437, 0)} + a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x54, 0x76, 0x6e, 0xf4, 0x2a, 0x56, 0x88, 0xd0, 0xe3, 0x5e, 0x7d, 0xbd, 0xec, 0x5c, 0x59, 0xfa, 0x44, 0x18, 0x82, 0xae, 0x55, 0x4c, 0xcf, 0x41, 0xa6, 0x7, 0x63, 0xba, 0x41, 0xa4, 0xfc, 0x3}} + return a, nil +} + var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00") func docGoBytes() ([]byte, error) { @@ -222,10 +222,10 @@ func AssetNames() []string { // _bindata is a table, holding each asset generator, mapped to its name. var _bindata = map[string]func() (*asset, error){ - "1_setup.down.sql": _1_setupDownSql, - "1_setup.up.sql": _1_setupUpSql, + "2_timestamp.up.sql": _2_timestampUpSql, + "doc.go": docGo, } @@ -270,9 +270,9 @@ type bintree struct { } var _bintree = &bintree{nil, map[string]*bintree{ - "1_setup.down.sql": &bintree{_1_setupDownSql, map[string]*bintree{}}, - "1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}}, - "doc.go": &bintree{docGo, map[string]*bintree{}}, + "1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}}, + "2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}}, + "doc.go": &bintree{docGo, map[string]*bintree{}}, }} // RestoreAsset restores an asset under the given directory. diff --git a/internal/persistence/sqlite/migrations/sql/1_setup.down.sql b/internal/persistence/sqlite/migrations/sql/1_setup.down.sql deleted file mode 100644 index 9a523b0..0000000 --- a/internal/persistence/sqlite/migrations/sql/1_setup.down.sql +++ /dev/null @@ -1,6 +0,0 @@ -DROP INDEX IF EXISTS idxMsg1; -DROP INDEX IF EXISTS idxMsg2; -DROP INDEX IF EXISTS idxStr1; -DROP TABLE IF EXISTS syncTopicStatus; -DROP TABLE IF EXISTS missingMessages; -DROP TABLE IF EXISTS storeNodeUnavailable; diff --git a/internal/persistence/sqlite/migrations/sql/2_timestamp.up.sql b/internal/persistence/sqlite/migrations/sql/2_timestamp.up.sql new file mode 100644 index 0000000..ca96ea2 --- /dev/null +++ b/internal/persistence/sqlite/migrations/sql/2_timestamp.up.sql @@ -0,0 +1,20 @@ +CREATE TABLE IF NOT EXISTS missingMessages_new ( + runId TEXT NOT NULL, + clusterId INTEGER NOT NULL, + pubsubTopic TEXT NOT NULL, + messageHash TEXT NOT NULL, + storenode TEXT NOT NULL, + msgStatus TEXT NOT NULL, + storedAt BIGINT NOT NULL, + PRIMARY KEY (messageHash, storenode) +); + +INSERT INTO missingMessages_new (runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt) +SELECT runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt +FROM missingMessages; + +DROP TABLE missingMessages; + +ALTER TABLE missingMessages_new +RENAME TO missingMessages; +