501 lines
14 KiB
Go
Raw Normal View History

2024-05-08 16:37:42 -04:00
package main
import (
"context"
"database/sql"
2024-05-29 21:42:22 -04:00
"encoding/hex"
2024-05-20 16:02:09 -04:00
"errors"
2024-05-21 08:02:54 -04:00
"fmt"
"net"
2024-05-09 15:22:50 -04:00
"sync"
2024-05-08 16:37:42 -04:00
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/google/uuid"
2024-05-20 16:02:09 -04:00
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
2024-07-10 11:42:28 -04:00
"github.com/prometheus/client_golang/prometheus"
2024-05-20 16:02:09 -04:00
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
2024-05-08 16:37:42 -04:00
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/storenode-messages/internal/logging"
2024-07-08 10:49:48 -04:00
"github.com/waku-org/storenode-messages/internal/metrics"
2024-05-08 16:37:42 -04:00
"github.com/waku-org/storenode-messages/internal/persistence"
"go.uber.org/zap"
2024-05-28 16:37:00 -04:00
"go.uber.org/zap/zapcore"
2024-05-08 16:37:42 -04:00
"google.golang.org/protobuf/proto"
)
// MessageExistence is the verification status of a message hash on a
// particular storenode during a single verification run.
type MessageExistence int

const (
	// Unknown: the storenode was never successfully queried for this hash.
	Unknown MessageExistence = iota
	// Exists: the storenode reported that it stores the message.
	Exists
	// DoesNotExist: the storenode responded but does not have the message.
	DoesNotExist
)
// Scheduling parameters for the verification loop.
const (
	// timeInterval is how often a new verification run is started.
	timeInterval = 2 * time.Minute
	// delay is how far behind real time queries run, leaving messages time
	// to propagate to the storenodes before they are checked.
	delay = 5 * time.Minute
	// maxAttempts is how many times a storenode query is retried.
	maxAttempts = 3
)
// Application bundles the long-lived dependencies used by the message
// verification loop.
type Application struct {
	node    *node.WakuNode       // local waku node used to issue store queries
	metrics metrics.Metrics      // counters for missing messages and storenode availability
	db      *persistence.DBStore // persistence for topic sync state and findings
}
2024-05-08 16:37:42 -04:00
// Execute runs the storenode message verifier: it sets up logging, the
// optional metrics server, the database, the list of storenodes (from DNS
// discovery and/or static configuration) and a local waku node, then loops
// verifying message history every timeInterval until ctx is cancelled.
func Execute(ctx context.Context, options Options) error {
	// Set encoding for logs (console, json, ...)
	// Note that libp2p reads the encoding from GOLOG_LOG_FMT env var.
	logging.InitLogger(options.LogEncoding, options.LogOutput)
	logger := logging.Logger()

	// Optional Prometheus metrics endpoint.
	// NOTE(review): the server is started but never stopped on return —
	// confirm whether metrics.Server requires an explicit shutdown.
	var metricsServer *metrics.Server
	if options.EnableMetrics {
		metricsServer = metrics.NewMetricsServer(options.MetricsAddress, options.MetricsPort, logger)
		go metricsServer.Start()
	}

	var db *sql.DB
	var migrationFn func(*sql.DB, *zap.Logger) error
	db, migrationFn, err := persistence.ParseURL(options.DatabaseURL, logger)
	if err != nil {
		return err
	}

	dbStore, err := persistence.NewDBStore(logger, persistence.WithDB(db), persistence.WithMigrations(migrationFn), persistence.WithRetentionPolicy(options.RetentionPolicy))
	if err != nil {
		return err
	}
	defer dbStore.Stop()

	// Collect the storenodes to verify: DNS discovery results first, then
	// any statically configured multiaddresses.
	var discoveredNodes []dnsdisc.DiscoveredNode
	if len(options.DNSDiscoveryURLs.Value()) != 0 {
		discoveredNodes = node.GetNodesFromDNSDiscovery(logger, ctx, options.DNSDiscoveryNameserver, options.DNSDiscoveryURLs.Value())
	}

	var storenodes []peer.AddrInfo
	for _, node := range discoveredNodes {
		// Skip discovered entries that carry no dialable addresses.
		if len(node.PeerInfo.Addrs) == 0 {
			continue
		}
		storenodes = append(storenodes, node.PeerInfo)
	}

	for _, node := range options.StoreNodes {
		pInfo, err := peer.AddrInfosFromP2pAddrs(node)
		if err != nil {
			return err
		}
		storenodes = append(storenodes, pInfo...)
	}

	if len(storenodes) == 0 {
		return errors.New("no storenodes specified")
	}

	hostAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", options.Address, options.Port))
	if err != nil {
		return err
	}

	lvl, err := zapcore.ParseLevel(options.LogLevel)
	if err != nil {
		return err
	}

	wakuNode, err := node.New(
		node.WithLogLevel(lvl),
		node.WithNTP(), // NTP time source so query windows line up with the storenodes
		node.WithClusterID(uint16(options.ClusterID)),
		node.WithHostAddress(hostAddr),
	)
	if err != nil {
		return err
	}

	// NOTE(review): this shadows the imported metrics package for the rest
	// of the function; consider a different local name.
	metrics := metrics.NewMetrics(prometheus.DefaultRegisterer, logger)

	err = wakuNode.Start(ctx)
	if err != nil {
		return err
	}
	defer wakuNode.Stop()

	// Make all storenodes permanently dialable from the local peerstore.
	for _, s := range storenodes {
		wakuNode.Host().Peerstore().AddAddrs(s.ID, s.Addrs, peerstore.PermanentAddrTTL)
	}

	err = dbStore.Start(ctx, wakuNode.Timesource())
	if err != nil {
		return err
	}

	application := &Application{
		node:    wakuNode,
		metrics: metrics,
		db:      dbStore,
	}

	// Fire immediately, then rearm for timeInterval after each run finishes
	// (so runs never overlap).
	timer := time.NewTimer(0)
	defer timer.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-timer.C:
			// Each run gets a hex-encoded UUID so its log lines and DB rows
			// can be correlated.
			tmpUUID := uuid.New()
			runId := hex.EncodeToString(tmpUUID[:])
			runIdLogger := logger.With(zap.String("runId", runId))

			runIdLogger.Info("verifying message history...")
			err := application.verifyHistory(ctx, runId, storenodes, runIdLogger)
			if err != nil {
				return err
			}
			runIdLogger.Info("verification complete")

			timer.Reset(timeInterval)
		}
	}
}
2024-05-08 16:37:42 -04:00
// Shared state for a single verification run, guarded by msgMapLock.
// NOTE(review): package-level mutable state means only one verification run
// may be active at a time; verifyHistory resets these maps at the start of
// each run.
var msgMapLock sync.Mutex

// msgMap records, per message hash, which storenodes have (or lack) the message.
var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence

// msgPubsubTopic remembers the pubsub topic each message hash was seen on.
var msgPubsubTopic map[pb.MessageHash]string
2024-05-08 16:37:42 -04:00
func (app *Application) verifyHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, logger *zap.Logger) error {
2024-05-09 15:22:50 -04:00
// [MessageHash][StoreNode] = exists?
msgMapLock.Lock()
2024-05-20 16:02:09 -04:00
msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
2024-07-08 17:00:54 -04:00
msgPubsubTopic = make(map[pb.MessageHash]string)
2024-05-09 15:22:50 -04:00
msgMapLock.Unlock()
2024-05-08 16:37:42 -04:00
topicSyncStatus, err := app.db.GetTopicSyncStatus(ctx, options.ClusterID, options.PubSubTopics.Value())
2024-05-09 15:22:50 -04:00
if err != nil {
return err
}
2024-05-08 16:37:42 -04:00
tx, err := app.db.GetTrx(ctx)
2024-05-09 15:22:50 -04:00
if err != nil {
return err
}
2024-05-08 16:37:42 -04:00
2024-05-09 15:22:50 -04:00
defer func() {
if err == nil {
err = tx.Commit()
return
}
// don't shadow original error
_ = tx.Rollback()
}()
wg := sync.WaitGroup{}
for topic, lastSyncTimestamp := range topicSyncStatus {
wg.Add(1)
go func(topic string, lastSyncTimestamp *time.Time) {
2024-05-09 15:22:50 -04:00
defer wg.Done()
app.retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, tx, logger)
}(topic, lastSyncTimestamp)
2024-05-09 15:22:50 -04:00
}
wg.Wait()
// Verify for each storenode which messages are not available, and query
// for their existence using message hash
// ========================================================================
2024-05-20 16:02:09 -04:00
msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
2024-05-09 15:22:50 -04:00
msgMapLock.Lock()
for msgHash, nodes := range msgMap {
2024-05-20 16:02:09 -04:00
for _, node := range storenodes {
if nodes[node.ID] != Exists {
msgsToVerify[node.ID] = append(msgsToVerify[node.ID], msgHash)
2024-05-09 15:22:50 -04:00
}
}
}
msgMapLock.Unlock()
wg = sync.WaitGroup{}
2024-05-20 16:02:09 -04:00
for peerID, messageHashes := range msgsToVerify {
2024-05-09 15:22:50 -04:00
wg.Add(1)
2024-05-20 16:02:09 -04:00
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
2024-05-09 15:22:50 -04:00
defer wg.Done()
app.verifyMessageExistence(ctx, runId, peerID, messageHashes, logger)
2024-05-20 16:02:09 -04:00
}(peerID, messageHashes)
2024-05-09 15:22:50 -04:00
}
wg.Wait()
// If a message is not available, store in DB in which store nodes it wasnt
2024-07-08 17:00:54 -04:00
// available
2024-05-09 15:22:50 -04:00
// ========================================================================
msgMapLock.Lock()
defer msgMapLock.Unlock()
2024-05-20 16:24:16 -04:00
2024-07-19 11:37:41 -04:00
missingInSummary := make(map[peer.ID]int)
unknownInSummary := make(map[peer.ID]int)
2024-07-10 11:42:28 -04:00
2024-05-09 15:22:50 -04:00
for msgHash, nodes := range msgMap {
2024-07-19 11:37:41 -04:00
var missingIn []peer.ID
var unknownIn []peer.ID
2024-05-20 16:02:09 -04:00
for _, node := range storenodes {
if nodes[node.ID] == DoesNotExist {
2024-07-19 11:37:41 -04:00
missingIn = append(missingIn, node.ID)
missingInSummary[node.ID]++
2024-05-20 16:02:09 -04:00
} else if nodes[node.ID] == Unknown {
2024-07-19 11:37:41 -04:00
unknownIn = append(unknownIn, node.ID)
unknownInSummary[node.ID]++
2024-05-09 15:22:50 -04:00
}
}
2024-05-08 16:37:42 -04:00
2024-05-20 16:24:16 -04:00
if len(missingIn) != 0 {
2024-07-08 17:00:54 -04:00
logger.Info("missing message identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
err := app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], missingIn, "does_not_exist")
2024-05-20 16:24:16 -04:00
if err != nil {
return err
}
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-05-20 16:24:16 -04:00
if len(unknownIn) != 0 {
2024-07-08 17:00:54 -04:00
logger.Info("message with unknown state identified", zap.Stringer("hash", msgHash), zap.String("pubsubTopic", msgPubsubTopic[msgHash]), zap.Int("num_nodes", len(missingIn)))
err = app.db.RecordMessage(runId, tx, msgHash, options.ClusterID, msgPubsubTopic[msgHash], unknownIn, "unknown")
2024-05-20 16:24:16 -04:00
if err != nil {
return err
}
2024-05-09 15:22:50 -04:00
}
}
2024-05-08 16:37:42 -04:00
2024-07-10 11:42:28 -04:00
for s, cnt := range missingInSummary {
app.metrics.RecordMissingMessages(s, "does_not_exist", cnt)
2024-07-19 11:37:41 -04:00
logger.Info("missing message summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt))
2024-07-10 11:42:28 -04:00
}
for s, cnt := range unknownInSummary {
app.metrics.RecordMissingMessages(s, "unknown", cnt)
2024-07-19 11:37:41 -04:00
logger.Info("messages that could not be verified summary", zap.Stringer("storenode", s), zap.Int("numMsgs", cnt))
2024-07-10 13:33:33 -04:00
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-05-09 15:22:50 -04:00
return nil
}
2024-05-08 16:37:42 -04:00
// fetchStoreNodeMessages retrieves from a single storenode all message hashes
// published on topic within [startTime, endTime] (hashes only — IncludeData
// is false), and records each hash as Exists for this storenode in the shared
// msgMap, together with its pubsub topic. If the storenode cannot be reached
// after maxAttempts, it is recorded as unavailable for this run and the
// function returns early, leaving its entries Unknown.
func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string, storenodeID peer.ID, topic string, startTime time.Time, endTime time.Time, logger *zap.Logger) {
	var result *store.Result
	var err error

	queryLogger := logger.With(zap.Stringer("storenode", storenodeID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano()))

	// Initial store query, retried up to maxAttempts with a 1-minute timeout
	// per attempt and a short pause between attempts.
	retry := true
	success := false
	count := 1
	for retry && count <= maxAttempts {
		queryLogger.Info("retrieving message history for topic!", zap.Int("attempt", count))
		tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
		result, err = app.node.Store().Query(tCtx, store.FilterCriteria{
			ContentFilter: protocol.NewContentFilter(topic),
			TimeStart:     proto.Int64(startTime.UnixNano()),
			TimeEnd:       proto.Int64(endTime.UnixNano()),
		}, store.WithPeer(storenodeID), store.WithPaging(false, 100), store.IncludeData(false))
		cancel()
		if err != nil {
			queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count))
			time.Sleep(2 * time.Second)
		} else {
			queryLogger.Info("messages available", zap.Int("len", len(result.Messages())))
			retry = false
			success = true
		}
		count++
	}

	if !success {
		// All attempts failed: mark the storenode unavailable for this run.
		queryLogger.Error("storenode not available")
		err := app.db.RecordStorenodeUnavailable(runId, storenodeID)
		if err != nil {
			queryLogger.Error("could not store node unavailable", zap.Error(err))
		}
		app.metrics.RecordStorenodeAvailability(storenodeID, false)
		return
	}
	app.metrics.RecordStorenodeAvailability(storenodeID, true)

	// Page through the results, folding each page into msgMap before fetching
	// the next one.
	// NOTE(review): this assumes IsComplete() stays false until pagination is
	// exhausted via Next(); if the initial Query (or the final Next) can
	// return messages with IsComplete() already true, that page would be
	// skipped — confirm against go-waku's store.Result semantics.
	for !result.IsComplete() {
		msgMapLock.Lock()
		for _, mkv := range result.Messages() {
			hash := mkv.WakuMessageHash()
			_, ok := msgMap[hash]
			if !ok {
				msgMap[hash] = make(map[peer.ID]MessageExistence)
			}
			msgMap[hash][storenodeID] = Exists
			msgPubsubTopic[hash] = mkv.GetPubsubTopic()
		}
		msgMapLock.Unlock()

		// Fetch the next page, with the same retry policy as the initial query.
		retry := true
		success := false
		count := 1
		cursorLogger := queryLogger.With(zap.String("cursor", hex.EncodeToString(result.Cursor())))
		for retry && count <= maxAttempts {
			cursorLogger.Info("retrieving next page")
			tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
			err = result.Next(tCtx)
			cancel()
			if err != nil {
				cursorLogger.Error("could not query storenode", zap.Error(err))
				time.Sleep(2 * time.Second)
			} else {
				cursorLogger.Info("more messages available", zap.Int("len", len(result.Messages())))
				retry = false
				success = true
			}
			count++
		}

		if !success {
			// Pagination failed mid-way: record unavailability and bail out;
			// hashes already collected from earlier pages are kept.
			cursorLogger.Error("storenode not available")
			err := app.db.RecordStorenodeUnavailable(runId, storenodeID)
			if err != nil {
				cursorLogger.Error("could not store recordnode unavailable", zap.Error(err))
			}
			app.metrics.RecordStorenodeAvailability(storenodeID, false)
			return
		}
		app.metrics.RecordStorenodeAvailability(storenodeID, true)
	}
}
func (app *Application) retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, tx *sql.Tx, logger *zap.Logger) {
logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))
now := app.node.Timesource().Now()
// Query is done with a delay
startTime := now.Add(-(timeInterval + delay))
if lastSyncTimestamp != nil {
startTime = *lastSyncTimestamp
}
endTime := now.Add(-delay)
if startTime.After(endTime) {
logger.Warn("too soon to retrieve messages for topic")
return
}
// Determine if the messages exist across all nodes
wg := sync.WaitGroup{}
for _, node := range storenodes {
wg.Add(1)
go func(peerID peer.ID) {
defer wg.Done()
app.fetchStoreNodeMessages(ctx, runId, peerID, topic, startTime, endTime, logger)
}(node.ID)
}
wg.Wait()
2024-05-08 16:37:42 -04:00
2024-05-09 15:22:50 -04:00
// Update db with last sync time
err := app.db.UpdateTopicSyncState(tx, options.ClusterID, topic, endTime)
if err != nil {
logger.Panic("could not update topic sync state", zap.Error(err))
}
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
// verifyMessageExistence asks a single storenode, by message hash, whether it
// stores each of messageHashes, and updates the shared msgMap accordingly:
// hashes returned by the storenode are marked Exists, the rest DoesNotExist.
// If the storenode never answers, the hashes are left Unknown and the node is
// recorded as unavailable for this run.
func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, logger *zap.Logger) {
	var result *store.Result
	var err error

	peerInfo := app.node.Host().Peerstore().PeerInfo(peerID)

	queryLogger := logger.With(zap.Stringer("storenode", peerID))

	// Initial hash query (hashes only — IncludeData false), retried up to
	// maxAttempts with a 1-minute timeout per attempt.
	retry := true
	success := false
	count := 1
	for retry && count <= maxAttempts {
		queryLogger.Info("querying by hash", zap.Int("attempt", count))
		tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
		result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, 100))
		cancel()
		if err != nil {
			queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count))
			time.Sleep(2 * time.Second)
		} else {
			queryLogger.Info("hashes available", zap.Int("len", len(result.Messages())))
			retry = false
			success = true
		}
		count++
	}

	if !success {
		// All attempts failed: the hashes stay Unknown in msgMap.
		queryLogger.Error("storenode not available")
		err := app.db.RecordStorenodeUnavailable(runId, peerID)
		if err != nil {
			queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
		}
		app.metrics.RecordStorenodeAvailability(peerID, false)
		return
	}
	app.metrics.RecordStorenodeAvailability(peerID, true)

	// Page through the results.
	// NOTE(review): assumes IsComplete() is false until pagination has been
	// exhausted via Next(); if the initial query can already be "complete"
	// (e.g. no hash matched), the body never runs and no hash is marked
	// DoesNotExist — confirm against go-waku's store.Result semantics.
	for !result.IsComplete() {
		msgMapLock.Lock()
		// Mark every hash in the current page as existing on this storenode.
		for _, mkv := range result.Messages() {
			hash := mkv.WakuMessageHash()
			_, ok := msgMap[hash]
			if !ok {
				msgMap[hash] = make(map[peer.ID]MessageExistence)
			}
			msgMap[hash][peerInfo.ID] = Exists
		}
		// Anything not (yet) marked Exists is provisionally DoesNotExist; a
		// later page containing the hash flips it back to Exists.
		for _, msgHash := range messageHashes {
			if msgMap[msgHash][peerInfo.ID] != Exists {
				msgMap[msgHash][peerInfo.ID] = DoesNotExist
			}
		}
		msgMapLock.Unlock()

		// Fetch the next page, with the same retry policy as the initial query.
		retry := true
		success := false
		count := 1
		for retry && count <= maxAttempts {
			queryLogger.Info("executing next while querying hashes", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count))
			tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
			err = result.Next(tCtx)
			cancel()
			if err != nil {
				queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count))
				time.Sleep(2 * time.Second)
			} else {
				// NOTE(review): cursors are logged with hexutil.Encode
				// ("0x"-prefixed) on some lines and hex.EncodeToString on
				// others — consider unifying the encoding.
				queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
				retry = false
				success = true
			}
			count++
		}

		if !success {
			// Pagination failed mid-way: record unavailability and bail out.
			queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
			err := app.db.RecordStorenodeUnavailable(runId, peerID)
			if err != nil {
				logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
			}
			app.metrics.RecordStorenodeAvailability(peerID, false)
			return
		}
		app.metrics.RecordStorenodeAvailability(peerID, true)
	}
}