chore: do not store timestamps

This commit is contained in:
Richard Ramos 2024-07-08 17:00:54 -04:00
parent 00306bb3cc
commit 5347ec23ff
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
7 changed files with 83 additions and 82 deletions

View File

@ -39,11 +39,6 @@ const timeInterval = 2 * time.Minute
const delay = 5 * time.Minute
const maxAttempts = 3
type MessageAttr struct {
Timestamp uint64
PubsubTopic string
}
func Execute(ctx context.Context, options Options) error {
// Set encoding for logs (console, json, ...)
// Note that libp2p reads the encoding from GOLOG_LOG_FMT env var.
@ -155,14 +150,14 @@ func Execute(ctx context.Context, options Options) error {
var msgMapLock sync.Mutex
var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
var msgAttr map[pb.MessageHash]MessageAttr
var msgPubsubTopic map[pb.MessageHash]string
func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
// [MessageHash][StoreNode] = exists?
msgMapLock.Lock()
msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
msgAttr = make(map[pb.MessageHash]MessageAttr)
msgPubsubTopic = make(map[pb.MessageHash]string)
msgMapLock.Unlock()
topicSyncStatus, err := dbStore.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
@ -219,7 +214,7 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
wg.Wait()
// If a message is not available, store in DB in which store nodes it wasnt
// available and its timestamp
// available
// ========================================================================
msgMapLock.Lock()
defer msgMapLock.Unlock()
@ -236,16 +231,16 @@ func verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo
}
if len(missingIn) != 0 {
logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgAttr[msgHash].PubsubTopic), zap.Int("num_nodes", len(missingIn)))
err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist")
logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], missingIn, "does_not_exist")
if err != nil {
return err
}
}
if len(unknownIn) != 0 {
logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgAttr[msgHash].PubsubTopic), zap.Int("num_nodes", len(missingIn)))
err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown")
logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], unknownIn, "unknown")
if err != nil {
return err
}
@ -294,7 +289,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
ContentFilter: protocol.NewContentFilter(topic),
TimeStart: proto.Int64(startTime.UnixNano()),
TimeEnd: proto.Int64(endTime.UnixNano()),
}, store.WithPeer(node.ID), store.WithPaging(false, 100))
}, store.WithPeer(node.ID), store.WithPaging(false, 100), store.IncludeData(false))
cancel()
if err != nil {
queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", i))
@ -325,10 +320,7 @@ func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrIn
msgMap[hash] = make(map[peer.ID]MessageExistence)
}
msgMap[hash][node.ID] = Exists
msgAttr[hash] = MessageAttr{
Timestamp: uint64(mkv.Message.GetTimestamp()),
PubsubTopic: mkv.GetPubsubTopic(),
}
msgPubsubTopic[hash] = mkv.GetPubsubTopic()
}
msgMapLock.Unlock()

View File

@ -1,7 +1,7 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1_setup.down.sql (209B)
// 1_setup.up.sql (856B)
// 2_timestamp.up.sql (53B)
// doc.go (74B)
package migrations
@ -71,26 +71,6 @@ func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1_setupDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\xf0\x2d\x4e\x37\xb4\xe6\xc2\x23\x6b\x84\x5b\x36\xb8\xa4\x08\xa6\x37\xc4\xd1\xc9\xc7\x15\x49\xb6\xb8\x32\x2f\x39\x24\xbf\x20\x33\x39\xb8\x24\xb1\xa4\xb4\x18\x87\xaa\xdc\xcc\xe2\xe2\xcc\xbc\x74\xdf\xd4\xe2\xe2\xc4\xf4\x54\x5c\xaa\x8a\x4b\xf2\x8b\x52\xfd\xf2\x53\x52\x43\xf3\x12\xcb\x12\x33\x73\x12\x93\x72\x52\xad\xb9\x00\x01\x00\x00\xff\xff\x4f\xc7\x7a\xbd\xd1\x00\x00\x00")
func _1_setupDownSqlBytes() ([]byte, error) {
return bindataRead(
__1_setupDownSql,
"1_setup.down.sql",
)
}
func _1_setupDownSql() (*asset, error) {
bytes, err := _1_setupDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1_setup.down.sql", size: 209, mode: os.FileMode(0664), modTime: time.Unix(1716212112, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x29, 0xd4, 0xea, 0x7a, 0x83, 0x13, 0x9f, 0x16, 0x6f, 0x3f, 0x53, 0x4f, 0x1, 0xcc, 0xeb, 0x14, 0x7a, 0xf0, 0x62, 0x2d, 0x22, 0x96, 0x52, 0x9d, 0x57, 0x54, 0x8f, 0x51, 0xe8, 0x16, 0x8a, 0x33}}
return a, nil
}
var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x9c\x91\x3d\x6f\xc2\x30\x10\x86\x67\xfc\x2b\x6e\x4c\x24\x96\x76\x65\x0a\xe0\x52\xab\x60\xaa\xd8\x48\x30\x1a\x6c\xa5\x96\xf2\x41\x73\x76\xd5\xfe\xfb\x0a\xb7\x8a\x42\x12\x88\xda\xf9\x7c\xcf\xfb\xfa\xb9\x45\x4a\x13\x49\x41\x26\xf3\x35\x05\xf6\x04\x7c\x2b\x81\xee\x99\x90\x02\xf0\xab\x3c\xc9\xea\x6c\x4f\xc2\x29\xe7\x11\x22\x32\x39\xe5\x1e\x9d\xa9\x99\x06\xc6\x25\x5d\xd1\x34\xbc\xe7\xbb\xf5\x7a\x4a\x26\x67\x7f\x44\x7f\x0c\x1b\x20\xe9\x5e\xb6\x67\xb9\x42\x27\x2e\x3c\x5b\x18\x74\xaa\x38\xc3\x9c\xad\x18\xbf\x7a\xf3\x9a\xb2\x4d\x92\x1e\xe0\x85\x1e\x20\x6a\x92\xa6\xd0\xe2\xc6\x24\x9e\x11\x72\xa7\x73\x61\x11\x6d\x99\x6d\x0c\xa2\xca\x4c\xe8\x5c\xfb\x92\xe9\x5e\xa1\x7f\xff\xa4\xf8\x41\x3f\x2b\x7c\xeb\xcf\x30\xbb\xf7\x41\x74\x55\x6d\xca\x4a\x9b\xa1\xc5\x5f\xc9\xdd\x49\xd8\xd1\x89\xeb\xe1\xa0\x23\xac\xd5\x6b\x0a\x4d\xd2\x95\x30\xc6\x97\x74\xdf\x11\x66\xf5\xe7\x06\xb3\x07\xd8\xf2\xae\xbb\xa8\x89\x5e\x52\xb1\x88\x67\xe3\x94\xc7\x21\x4a\xd0\x3f\x72\xb5\x90\xc4\x2b\x6d\x76\xa5\xfa\x50\x36\x57\xc7\xdc\xdc\x3e\xdd\x6d\x8d\xb5\x79\xf7\x06\xdd\xe5\x06\xa3\xbe\x02\xfb\x8f\xa6\x84\xab\x83\xa9\xa1\xbe\x51\x2b\x3c\x9e\x91\xef\x00\x00\x00\xff\xff\x8d\xc8\x7a\x90\x58\x03\x00\x00")
func _1_setupUpSqlBytes() ([]byte, error) {
@ -106,11 +86,31 @@ func _1_setupUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1_setup.up.sql", size: 856, mode: os.FileMode(0664), modTime: time.Unix(1716213117, 0)}
info := bindataFileInfo{name: "1_setup.up.sql", size: 856, mode: os.FileMode(0664), modTime: time.Unix(1716213156, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xe, 0x35, 0xf8, 0x8e, 0x74, 0xf4, 0xdc, 0x81, 0xc1, 0x30, 0x44, 0x54, 0x70, 0x36, 0xdb, 0x7c, 0x41, 0x8b, 0xba, 0xd0, 0x52, 0x26, 0x59, 0xbf, 0x56, 0xa, 0x84, 0x8d, 0x89, 0xfe, 0xf1, 0x69}}
return a, nil
}
var __2_timestampUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\xc8\xcd\x2c\x2e\xce\xcc\x4b\xf7\x4d\x2d\x2e\x4e\x4c\x4f\x2d\x56\x70\x09\xf2\x0f\x50\x70\xf6\xf7\x09\xf5\xf5\x53\xc8\x2d\x4e\x0f\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\xb0\x06\x04\x00\x00\xff\xff\x15\xd6\x7e\xf9\x35\x00\x00\x00")
func _2_timestampUpSqlBytes() ([]byte, error) {
return bindataRead(
__2_timestampUpSql,
"2_timestamp.up.sql",
)
}
func _2_timestampUpSql() (*asset, error) {
bytes, err := _2_timestampUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "2_timestamp.up.sql", size: 53, mode: os.FileMode(0664), modTime: time.Unix(1720472316, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x81, 0x97, 0xcf, 0xf4, 0x2e, 0x3c, 0xca, 0x2, 0xd6, 0x9e, 0x7e, 0xf8, 0x7c, 0x18, 0x9c, 0x82, 0x25, 0x54, 0xd6, 0xd6, 0x21, 0x25, 0xdf, 0xf9, 0xdf, 0x24, 0x7a, 0xca, 0xbf, 0xeb, 0xe5, 0xd2}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -222,10 +222,10 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1_setup.down.sql": _1_setupDownSql,
"1_setup.up.sql": _1_setupUpSql,
"2_timestamp.up.sql": _2_timestampUpSql,
"doc.go": docGo,
}
@ -270,9 +270,9 @@ type bintree struct {
}
var _bintree = &bintree{nil, map[string]*bintree{
"1_setup.down.sql": &bintree{_1_setupDownSql, map[string]*bintree{}},
"1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
"1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}},
"2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.

View File

@ -1,6 +0,0 @@
DROP INDEX IF EXISTS idxMsg1;
DROP INDEX IF EXISTS idxMsg2;
DROP INDEX IF EXISTS idxStr1;
DROP TABLE IF EXISTS syncTopicStatus;
DROP TABLE IF EXISTS missingMessages;
DROP TABLE IF EXISTS storeNodeUnavailable;

View File

@ -0,0 +1 @@
ALTER TABLE missingMessages DROP COLUMN msgTimestamp;

View File

@ -1,7 +1,7 @@
// Code generated by go-bindata. DO NOT EDIT.
// sources:
// 1_setup.down.sql (209B)
// 1_setup.up.sql (927B)
// 2_timestamp.up.sql (581B)
// doc.go (74B)
package migrations
@ -71,26 +71,6 @@ func (fi bindataFileInfo) Sys() interface{} {
return nil
}
var __1_setupDownSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\x09\xf2\x0f\x50\xf0\xf4\x73\x71\x8d\x50\xf0\x74\x53\x70\x8d\xf0\x0c\x0e\x09\x56\xc8\x4c\xa9\xf0\x2d\x4e\x37\xb4\xe6\xc2\x23\x6b\x84\x5b\x36\xb8\xa4\x08\xa6\x37\xc4\xd1\xc9\xc7\x15\x49\xb6\xb8\x32\x2f\x39\x24\xbf\x20\x33\x39\xb8\x24\xb1\xa4\xb4\x18\x87\xaa\xdc\xcc\xe2\xe2\xcc\xbc\x74\xdf\xd4\xe2\xe2\xc4\xf4\x54\x5c\xaa\x8a\x4b\xf2\x8b\x52\xfd\xf2\x53\x52\x43\xf3\x12\xcb\x12\x33\x73\x12\x93\x72\x52\xad\xb9\x00\x01\x00\x00\xff\xff\x4f\xc7\x7a\xbd\xd1\x00\x00\x00")
func _1_setupDownSqlBytes() ([]byte, error) {
return bindataRead(
__1_setupDownSql,
"1_setup.down.sql",
)
}
func _1_setupDownSql() (*asset, error) {
bytes, err := _1_setupDownSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1_setup.down.sql", size: 209, mode: os.FileMode(0664), modTime: time.Unix(1716212168, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x29, 0xd4, 0xea, 0x7a, 0x83, 0x13, 0x9f, 0x16, 0x6f, 0x3f, 0x53, 0x4f, 0x1, 0xcc, 0xeb, 0x14, 0x7a, 0xf0, 0x62, 0x2d, 0x22, 0x96, 0x52, 0x9d, 0x57, 0x54, 0x8f, 0x51, 0xe8, 0x16, 0x8a, 0x33}}
return a, nil
}
var __1_setupUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xa4\x91\x4d\x4f\xc2\x30\x18\xc7\xcf\xf4\x53\x3c\xc7\x2d\xe1\xa2\xd7\x9d\x26\x54\x69\x84\xcd\xb4\x45\xe0\x58\x68\x33\x9b\xec\x05\xf7\xb4\x46\xbf\xbd\xa1\x12\x82\x58\xc7\x81\x73\xfb\xfc\x5f\x7e\xff\x09\xa7\xb9\xa4\x20\xf3\x87\x39\x05\xf6\x08\x45\x29\x81\xae\x99\x90\x02\xf0\xab\xdd\xc9\x6e\x6f\x77\xc2\x29\xe7\x11\x12\x32\xda\xd5\x1e\x9d\xe9\x99\x06\x56\x48\xfa\x44\x79\xf8\x5f\x2c\xe7\xf3\x31\x19\xed\xfd\x16\xfd\x36\x5c\xc0\x6b\xce\x27\xb3\xfc\xd7\x73\xad\xd0\x89\x83\xa4\x6d\x0c\x3a\xd5\xec\x63\x1a\x2f\x9c\x2d\x72\xbe\x81\x67\xba\x81\xe4\xe4\x36\x86\x33\xed\x94\xa4\xb0\x62\x72\x56\x2e\x25\xf0\x72\xc5\xa6\x19\x21\x64\xa0\x46\x63\x11\x6d\x5b\x2d\x0c\xa2\xaa\x4c\xa8\xd1\xfb\x96\xe9\x58\xc6\x5b\xfa\x35\x3f\x06\x33\x85\x6f\xd1\x67\xac\x06\x9b\xa3\xeb\x7a\xd3\x76\xda\xfc\x73\x7c\x1c\x21\xf2\x18\x2e\x75\xee\xfe\xaa\xc2\x05\xd0\xb3\x88\x63\x38\x19\xc6\x80\x1e\x79\xb2\x62\x4a\xd7\x17\x3c\xad\xfe\x5c\x60\x75\x07\x65\x71\x89\x36\x39\x25\x99\x52\x31\x49\xb3\xeb\x2a\xf7\x31\x95\xb0\x4e\x9a\x0d\x8e\x1a\x9c\x8a\x4e\x9b\x65\xab\x3e\x94\xad\xd5\xb6\x36\x83\xcb\x0e\xe2\xed\xcd\xbb\x37\xe8\x0e\xfb\x5c\x87\x18\x2c\x6e\xc2\x27\x5c\x1f\xf0\xc5\x4a\x24\x67\x59\xd2\x8c\x7c\x07\x00\x00\xff\xff\x6e\xd3\x44\xe6\x9f\x03\x00\x00")
func _1_setupUpSqlBytes() ([]byte, error) {
@ -111,6 +91,26 @@ func _1_setupUpSql() (*asset, error) {
return a, nil
}
var __2_timestampUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\xac\x91\x41\x6f\x83\x30\x0c\x85\xcf\xf5\xaf\xf0\xb1\x48\xf9\x07\x9c\xd2\xce\xed\xa2\x41\xa8\x82\x27\xb5\xa7\x89\x96\x88\x21\xad\x50\xe1\xa0\xfd\xfd\xa9\x5d\x85\xd0\xc2\x71\xd7\x7c\x79\xcf\x7e\xcf\x5b\x47\x9a\x09\x59\x6f\x32\x42\xb3\x43\x5b\x30\xd2\xd1\x94\x5c\xe2\xb5\x15\x69\xbb\x26\xf7\x22\x55\xe3\xe5\xa3\xf3\xdf\xb8\x86\xd5\x30\x76\xa6\x46\xa6\x23\x3f\x3e\xdb\xf7\x2c\x53\xb0\xba\x7c\x8d\x12\xfc\x60\x6a\x34\x96\x69\x4f\x6e\x0e\x6f\xe3\x59\xc6\x33\xf7\xb7\xf6\x12\x09\xaf\xbf\xf6\xaf\x95\x7c\x46\x4c\x42\x3f\xf8\xae\xaf\x7d\xac\x92\xa6\x0c\x55\x18\x65\x59\x53\xeb\x80\x1b\xb3\x37\x76\x86\x10\x56\x07\x67\x72\xed\x4e\xf8\x46\x27\x5c\xcf\x06\x2b\x9c\x26\x25\x90\xa4\x00\xc6\x96\xe4\xf8\x1e\xa5\x58\xae\xe1\x51\x82\xc2\x29\xb5\xc2\x59\x46\x85\xcb\xde\x0a\xa7\xb5\x9f\xaf\xb5\x0e\x09\x94\x94\xd1\x96\xf1\xff\x2c\x61\xe7\x8a\xfc\xef\xde\x29\xc0\x8b\x2b\x0e\xcf\x4b\xc7\x50\x67\x4c\x6e\x99\xde\x23\x83\x23\xab\x73\xc2\xb8\x90\x14\xe0\x27\x00\x00\xff\xff\x7a\xcc\xb2\x8c\x45\x02\x00\x00")
func _2_timestampUpSqlBytes() ([]byte, error) {
return bindataRead(
__2_timestampUpSql,
"2_timestamp.up.sql",
)
}
func _2_timestampUpSql() (*asset, error) {
bytes, err := _2_timestampUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "2_timestamp.up.sql", size: 581, mode: os.FileMode(0664), modTime: time.Unix(1720472437, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x54, 0x76, 0x6e, 0xf4, 0x2a, 0x56, 0x88, 0xd0, 0xe3, 0x5e, 0x7d, 0xbd, 0xec, 0x5c, 0x59, 0xfa, 0x44, 0x18, 0x82, 0xae, 0x55, 0x4c, 0xcf, 0x41, 0xa6, 0x7, 0x63, 0xba, 0x41, 0xa4, 0xfc, 0x3}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -222,10 +222,10 @@ func AssetNames() []string {
// _bindata is a table, holding each asset generator, mapped to its name.
var _bindata = map[string]func() (*asset, error){
"1_setup.down.sql": _1_setupDownSql,
"1_setup.up.sql": _1_setupUpSql,
"2_timestamp.up.sql": _2_timestampUpSql,
"doc.go": docGo,
}
@ -270,9 +270,9 @@ type bintree struct {
}
var _bintree = &bintree{nil, map[string]*bintree{
"1_setup.down.sql": &bintree{_1_setupDownSql, map[string]*bintree{}},
"1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
"1_setup.up.sql": &bintree{_1_setupUpSql, map[string]*bintree{}},
"2_timestamp.up.sql": &bintree{_2_timestampUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.

View File

@ -1,6 +0,0 @@
DROP INDEX IF EXISTS idxMsg1;
DROP INDEX IF EXISTS idxMsg2;
DROP INDEX IF EXISTS idxStr1;
DROP TABLE IF EXISTS syncTopicStatus;
DROP TABLE IF EXISTS missingMessages;
DROP TABLE IF EXISTS storeNodeUnavailable;

View File

@ -0,0 +1,20 @@
CREATE TABLE IF NOT EXISTS missingMessages_new (
runId TEXT NOT NULL,
clusterId INTEGER NOT NULL,
pubsubTopic TEXT NOT NULL,
messageHash TEXT NOT NULL,
storenode TEXT NOT NULL,
msgStatus TEXT NOT NULL,
storedAt BIGINT NOT NULL,
PRIMARY KEY (messageHash, storenode)
);
INSERT INTO missingMessages_new (runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt)
SELECT runId, clusterId, pubsubTopic, messageHash, storenode, msgStatus, storedAt
FROM missingMessages;
DROP TABLE missingMessages;
ALTER TABLE missingMessages_new
RENAME TO missingMessages;