mirror of
https://github.com/logos-messaging/storenode-messages-counter.git
synced 2026-01-02 14:13:11 +00:00
chore: log missing messages info
This commit is contained in:
parent
4e6118d42d
commit
c579a272d0
@ -199,6 +199,7 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no
|
||||
// ========================================================================
|
||||
msgMapLock.Lock()
|
||||
defer msgMapLock.Unlock()
|
||||
|
||||
for msgHash, nodes := range msgMap {
|
||||
var missingIn []peer.AddrInfo
|
||||
var unknownIn []peer.AddrInfo
|
||||
@ -210,14 +211,20 @@ func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *no
|
||||
}
|
||||
}
|
||||
|
||||
err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist")
|
||||
if err != nil {
|
||||
return err
|
||||
if len(missingIn) != 0 {
|
||||
logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgAttr[msgHash].PubsubTopic), zap.Int("num_nodes", len(missingIn)))
|
||||
err := dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, missingIn, "does_not_exist")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown")
|
||||
if err != nil {
|
||||
return err
|
||||
if len(unknownIn) != 0 {
|
||||
logger.Debug("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgAttr[msgHash].PubsubTopic), zap.Int("num_nodes", len(missingIn)))
|
||||
err = dbStore.RecordMessage(runId, tx, msgHash, options.ClusterID, msgAttr[msgHash].PubsubTopic, msgAttr[msgHash].Timestamp, unknownIn, "unknown")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -27,7 +27,6 @@ var cliFlags = []cli.Flag{
|
||||
}),
|
||||
altsrc.NewStringFlag(&cli.StringFlag{
|
||||
Name: "dns-discovery-name-server",
|
||||
Aliases: []string{"dns-discovery-nameserver"},
|
||||
Usage: "DNS nameserver IP to query (empty to use system's default)",
|
||||
Destination: &options.DNSDiscoveryNameserver,
|
||||
EnvVars: []string{"STORE_MSG_CTR_DNS_DISC_NAMESERVER"},
|
||||
|
||||
@ -240,10 +240,6 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string,
|
||||
}
|
||||
|
||||
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []peer.AddrInfo, status string) error {
|
||||
if len(storenodes) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
stmt, err := tx.Prepare("INSERT INTO missingMessages(runId, clusterId, pubsubTopic, messageHash, msgTimestamp, storenode, msgStatus, storedAt) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)")
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user