2024-05-08 16:37:42 -04:00
package main
import (
"context"
"database/sql"
2024-05-29 21:42:22 -04:00
"encoding/hex"
2024-05-20 16:02:09 -04:00
"errors"
2024-05-21 08:02:54 -04:00
"fmt"
"net"
2024-05-09 15:22:50 -04:00
"sync"
2024-05-08 16:37:42 -04:00
"time"
2024-08-07 16:14:06 -04:00
"golang.org/x/exp/maps"
2024-05-08 16:37:42 -04:00
"github.com/ethereum/go-ethereum/common/hexutil"
2024-05-09 16:03:33 -04:00
"github.com/google/uuid"
2024-05-20 16:02:09 -04:00
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
2024-07-10 11:42:28 -04:00
"github.com/prometheus/client_golang/prometheus"
2024-05-20 16:02:09 -04:00
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
2024-05-08 16:37:42 -04:00
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/storenode-messages/internal/logging"
2024-07-08 10:49:48 -04:00
"github.com/waku-org/storenode-messages/internal/metrics"
2024-05-08 16:37:42 -04:00
"github.com/waku-org/storenode-messages/internal/persistence"
"go.uber.org/zap"
2024-05-28 16:37:00 -04:00
"go.uber.org/zap/zapcore"
2024-05-08 16:37:42 -04:00
"google.golang.org/protobuf/proto"
)
// MessageExistence tracks whether a given message hash is known to be stored
// on a particular storenode.
type MessageExistence int

const (
	// Unknown means the storenode has not (yet) been successfully queried for the hash.
	Unknown MessageExistence = iota
	// Exists means the storenode returned the message for a query.
	Exists
	// DoesNotExist means the storenode answered a query without returning the hash.
	DoesNotExist
)
2024-05-09 15:22:50 -04:00
// timeInterval is the period between history-verification runs, and also the
// maximum time window queried per run.
const timeInterval = 2 * time.Minute

// delay keeps queries away from the most recent messages so storenodes have
// had time to receive and persist them.
const delay = 5 * time.Minute

// maxAttempts is the number of tries for a storenode query before the node is
// recorded as unavailable.
const maxAttempts = 3

// maxMsgHashesPerRequest bounds the batch size of hash-based store queries.
const maxMsgHashesPerRequest = 25
2024-05-08 16:37:42 -04:00
2024-07-19 14:14:02 -04:00
// Application bundles the waku node, the metrics recorder and the database
// store used by the message-verification loops.
type Application struct {
	node    *node.WakuNode
	metrics metrics.Metrics
	db      *persistence.DBStore
}
2024-05-08 16:37:42 -04:00
func Execute ( ctx context . Context , options Options ) error {
// Set encoding for logs (console, json, ...)
// Note that libp2p reads the encoding from GOLOG_LOG_FMT env var.
logging . InitLogger ( options . LogEncoding , options . LogOutput )
logger := logging . Logger ( )
2024-07-19 18:34:17 -04:00
2024-07-19 18:38:27 -04:00
logger . Warn ( "AppStart" )
2024-05-08 16:37:42 -04:00
2024-07-08 10:49:48 -04:00
var metricsServer * metrics . Server
if options . EnableMetrics {
metricsServer = metrics . NewMetricsServer ( options . MetricsAddress , options . MetricsPort , logger )
go metricsServer . Start ( )
}
2024-05-08 16:37:42 -04:00
var db * sql . DB
var migrationFn func ( * sql . DB , * zap . Logger ) error
db , migrationFn , err := persistence . ParseURL ( options . DatabaseURL , logger )
if err != nil {
return err
}
2024-08-22 11:05:56 -04:00
dbStore , err := persistence . NewDBStore (
options . ClusterID ,
options . FleetName ,
logger ,
persistence . WithDB ( db ) ,
persistence . WithMigrations ( migrationFn ) ,
persistence . WithRetentionPolicy ( options . RetentionPolicy ) ,
)
2024-05-08 16:37:42 -04:00
if err != nil {
return err
}
defer dbStore . Stop ( )
2024-05-20 16:02:09 -04:00
var discoveredNodes [ ] dnsdisc . DiscoveredNode
if len ( options . DNSDiscoveryURLs . Value ( ) ) != 0 {
discoveredNodes = node . GetNodesFromDNSDiscovery ( logger , ctx , options . DNSDiscoveryNameserver , options . DNSDiscoveryURLs . Value ( ) )
}
var storenodes [ ] peer . AddrInfo
for _ , node := range discoveredNodes {
if len ( node . PeerInfo . Addrs ) == 0 {
continue
}
storenodes = append ( storenodes , node . PeerInfo )
}
for _ , node := range options . StoreNodes {
pInfo , err := peer . AddrInfosFromP2pAddrs ( node )
if err != nil {
return err
}
storenodes = append ( storenodes , pInfo ... )
}
if len ( storenodes ) == 0 {
return errors . New ( "no storenodes specified" )
}
2024-05-21 08:02:54 -04:00
hostAddr , err := net . ResolveTCPAddr ( "tcp" , fmt . Sprintf ( "%s:%d" , options . Address , options . Port ) )
if err != nil {
return err
}
2024-05-28 16:37:00 -04:00
lvl , err := zapcore . ParseLevel ( options . LogLevel )
if err != nil {
return err
}
2024-05-08 16:37:42 -04:00
wakuNode , err := node . New (
2024-05-28 16:37:00 -04:00
node . WithLogLevel ( lvl ) ,
2024-05-08 16:37:42 -04:00
node . WithNTP ( ) ,
node . WithClusterID ( uint16 ( options . ClusterID ) ) ,
2024-05-21 08:02:54 -04:00
node . WithHostAddress ( hostAddr ) ,
2024-09-18 17:29:03 -04:00
node . WithWakuStoreRateLimit ( 7 ) ,
2024-05-08 16:37:42 -04:00
)
2024-05-09 16:03:33 -04:00
if err != nil {
return err
}
2024-07-10 11:42:28 -04:00
2024-08-22 11:12:42 -04:00
metrics := metrics . NewMetrics ( options . ClusterID , options . FleetName , prometheus . DefaultRegisterer , logger )
2024-07-10 11:42:28 -04:00
2024-05-08 16:37:42 -04:00
err = wakuNode . Start ( ctx )
if err != nil {
return err
}
defer wakuNode . Stop ( )
2024-08-07 16:14:06 -04:00
var storenodeIDs peer . IDSlice
2024-05-20 16:02:09 -04:00
for _ , s := range storenodes {
wakuNode . Host ( ) . Peerstore ( ) . AddAddrs ( s . ID , s . Addrs , peerstore . PermanentAddrTTL )
2024-08-07 16:14:06 -04:00
storenodeIDs = append ( storenodeIDs , s . ID )
2024-05-20 16:02:09 -04:00
}
2024-05-08 16:37:42 -04:00
err = dbStore . Start ( ctx , wakuNode . Timesource ( ) )
if err != nil {
return err
}
2024-07-19 14:14:02 -04:00
application := & Application {
node : wakuNode ,
metrics : metrics ,
db : dbStore ,
}
2024-11-20 13:50:28 -04:00
metricsChan := make ( chan * metricsSummary , 50 )
2024-08-08 11:10:46 -04:00
go func ( ) {
missingMessagesTimer := time . NewTimer ( 0 )
defer missingMessagesTimer . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return
case <- missingMessagesTimer . C :
2024-08-07 16:14:06 -04:00
tmpUUID := uuid . New ( )
runId := hex . EncodeToString ( tmpUUID [ : ] )
2024-08-22 17:43:27 -04:00
runIdLogger := logger . With ( zap . String ( "runId" , runId ) , zap . String ( "fleet" , options . FleetName ) , zap . Uint ( "clusterID" , options . ClusterID ) )
2024-08-07 16:14:06 -04:00
2024-08-08 11:10:46 -04:00
runIdLogger . Info ( "verifying message history..." )
2024-11-20 13:50:28 -04:00
shouldResetTimer , err := application . verifyHistory ( ctx , runId , storenodeIDs , metricsChan , runIdLogger )
2024-08-07 16:14:06 -04:00
if err != nil {
2024-08-08 11:10:46 -04:00
runIdLogger . Error ( "could not verify message history" , zap . Error ( err ) )
2024-08-07 16:14:06 -04:00
}
2024-08-08 11:10:46 -04:00
runIdLogger . Info ( "verification complete" )
2024-08-07 16:14:06 -04:00
2024-08-19 15:38:32 -04:00
if shouldResetTimer {
missingMessagesTimer . Reset ( 0 )
} else {
missingMessagesTimer . Reset ( timeInterval )
}
2024-08-08 11:10:46 -04:00
}
2024-05-09 15:22:50 -04:00
}
2024-08-08 11:10:46 -04:00
} ( )
go func ( ) {
2024-08-08 14:07:57 -04:00
syncCheckTimer := time . NewTimer ( 0 )
2024-08-08 11:10:46 -04:00
defer syncCheckTimer . Stop ( )
for {
select {
case <- ctx . Done ( ) :
return
case <- syncCheckTimer . C :
go func ( ) {
tmpUUID := uuid . New ( )
runId := hex . EncodeToString ( tmpUUID [ : ] )
runIdLogger := logger . With ( zap . String ( "syncRunId" , runId ) )
err := application . checkMissingMessageStatus ( ctx , storenodeIDs , runId , runIdLogger )
if err != nil {
logger . Error ( "could not recheck the status of missing messages" , zap . Error ( err ) )
return
}
err = application . countMissingMessages ( storenodeIDs )
if err != nil {
logger . Error ( "could not count missing messages" , zap . Error ( err ) )
return
}
runIdLogger . Info ( "missing messages recheck complete" )
2024-08-08 14:07:57 -04:00
2024-10-23 17:38:04 -04:00
syncCheckTimer . Reset ( 15 * time . Minute )
2024-08-08 11:10:46 -04:00
} ( )
}
}
2024-11-20 13:50:28 -04:00
} ( )
go func ( ) {
// Metrics accumulator
t := time . NewTicker ( 20 * time . Second )
defer t . Stop ( )
2024-08-08 11:10:46 -04:00
2024-11-20 13:50:28 -04:00
metricsSummaryMissing := make ( map [ peer . ID ] int )
metricsSummaryUnknown := make ( map [ peer . ID ] int )
for {
select {
case <- ctx . Done ( ) :
return
case <- t . C :
for s , missingCnt := range metricsSummaryMissing {
metrics . RecordMissingMessages ( s , "does_not_exist" , missingCnt )
}
for s , unknownCnt := range metricsSummaryUnknown {
metrics . RecordMissingMessages ( s , "unknown" , unknownCnt )
}
metricsSummaryMissing = make ( map [ peer . ID ] int )
metricsSummaryUnknown = make ( map [ peer . ID ] int )
case m := <- metricsChan :
metricsSummaryMissing [ m . storenode ] += m . missingMessages
metricsSummaryUnknown [ m . storenode ] += m . unknownMessages
}
}
2024-08-08 11:10:46 -04:00
} ( )
<- ctx . Done ( )
return nil
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-11-20 13:50:28 -04:00
// metricsSummary carries per-storenode counters from a verification run to
// the metrics accumulator goroutine.
type metricsSummary struct {
	storenode       peer.ID
	missingMessages int
	unknownMessages int
}
2024-05-09 16:03:33 -04:00
// msgMapLock guards msgMap and msgPubsubTopic, which are shared between the
// query goroutines spawned during a verification run.
var msgMapLock sync.Mutex

// msgMap records, per message hash, the existence state of the message on
// each storenode. It is reset at the start of every verification run.
var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence

// msgPubsubTopic remembers the pubsub topic each message hash was seen on.
var msgPubsubTopic map[pb.MessageHash]string
2024-05-08 16:37:42 -04:00
2024-11-20 13:50:28 -04:00
func ( app * Application ) verifyHistory ( ctx context . Context , runId string , storenodes peer . IDSlice , metricsChan chan * metricsSummary , logger * zap . Logger ) ( shouldResetTimer bool , err error ) {
2024-05-09 16:03:33 -04:00
2024-05-09 15:22:50 -04:00
// [MessageHash][StoreNode] = exists?
msgMapLock . Lock ( )
2024-05-20 16:02:09 -04:00
msgMap = make ( map [ pb . MessageHash ] map [ peer . ID ] MessageExistence )
2024-07-08 17:00:54 -04:00
msgPubsubTopic = make ( map [ pb . MessageHash ] string )
2024-05-09 15:22:50 -04:00
msgMapLock . Unlock ( )
2024-05-08 16:37:42 -04:00
2024-08-22 11:05:56 -04:00
topicSyncStatus , err := app . db . GetTopicSyncStatus ( ctx , options . PubSubTopics . Value ( ) )
2024-05-09 15:22:50 -04:00
if err != nil {
2024-08-19 15:38:32 -04:00
return false , err
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-07-19 14:14:02 -04:00
tx , err := app . db . GetTrx ( ctx )
2024-05-09 15:22:50 -04:00
if err != nil {
2024-08-19 15:38:32 -04:00
return false , err
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-05-09 15:22:50 -04:00
defer func ( ) {
if err == nil {
err = tx . Commit ( )
return
}
// don't shadow original error
_ = tx . Rollback ( )
} ( )
wg := sync . WaitGroup { }
for topic , lastSyncTimestamp := range topicSyncStatus {
2024-05-09 16:03:33 -04:00
wg . Add ( 1 )
2024-08-19 15:38:32 -04:00
logger = logger . With ( zap . String ( "topic" , topic ) , zap . Timep ( "lastSyncTimestamp" , lastSyncTimestamp ) )
if lastSyncTimestamp != nil {
app . metrics . RecordLastSyncDate ( topic , * lastSyncTimestamp )
}
now := app . node . Timesource ( ) . Now ( )
// Query is done with a delay
startTime := now . Add ( - ( timeInterval + delay ) )
if lastSyncTimestamp != nil {
startTime = * lastSyncTimestamp
}
endTime := now . Add ( - delay )
if startTime . After ( endTime ) {
logger . Warn ( "too soon to retrieve messages for topic" )
continue
}
// This avoids extremely large resultsets
if endTime . Sub ( startTime ) > timeInterval {
2024-08-19 18:39:08 -04:00
endTime = startTime . Add ( timeInterval )
2024-08-19 15:38:32 -04:00
shouldResetTimer = true
}
go func ( topic string , startTime time . Time , endTime time . Time , logger * zap . Logger ) {
2024-05-09 15:22:50 -04:00
defer wg . Done ( )
2024-08-19 15:38:32 -04:00
app . retrieveHistory ( ctx , runId , storenodes , topic , startTime , endTime , tx , logger )
} ( topic , startTime , endTime , logger )
2024-05-09 15:22:50 -04:00
}
wg . Wait ( )
// Verify for each storenode which messages are not available, and query
// for their existence using message hash
// ========================================================================
2024-05-20 16:02:09 -04:00
msgsToVerify := make ( map [ peer . ID ] [ ] pb . MessageHash ) // storenode -> msgHash
2024-05-09 15:22:50 -04:00
msgMapLock . Lock ( )
for msgHash , nodes := range msgMap {
2024-08-07 16:14:06 -04:00
for _ , s := range storenodes {
if nodes [ s ] != Exists {
msgsToVerify [ s ] = append ( msgsToVerify [ s ] , msgHash )
2024-05-09 15:22:50 -04:00
}
}
}
msgMapLock . Unlock ( )
wg = sync . WaitGroup { }
2024-05-20 16:02:09 -04:00
for peerID , messageHashes := range msgsToVerify {
2024-05-09 15:22:50 -04:00
wg . Add ( 1 )
2024-05-20 16:02:09 -04:00
go func ( peerID peer . ID , messageHashes [ ] pb . MessageHash ) {
2024-05-09 15:22:50 -04:00
defer wg . Done ( )
2024-08-07 16:14:06 -04:00
onResult := func ( result * store . Result ) {
msgMapLock . Lock ( )
for _ , mkv := range result . Messages ( ) {
hash := mkv . WakuMessageHash ( )
_ , ok := msgMap [ hash ]
if ! ok {
msgMap [ hash ] = make ( map [ peer . ID ] MessageExistence )
}
msgMap [ hash ] [ result . PeerID ( ) ] = Exists
}
for _ , msgHash := range messageHashes {
if msgMap [ msgHash ] [ result . PeerID ( ) ] != Exists {
msgMap [ msgHash ] [ result . PeerID ( ) ] = DoesNotExist
}
}
msgMapLock . Unlock ( )
}
app . verifyMessageExistence ( ctx , runId , peerID , messageHashes , onResult , logger )
2024-05-20 16:02:09 -04:00
} ( peerID , messageHashes )
2024-05-09 15:22:50 -04:00
}
wg . Wait ( )
// If a message is not available, store in DB in which store nodes it wasnt
2024-07-08 17:00:54 -04:00
// available
2024-05-09 15:22:50 -04:00
// ========================================================================
msgMapLock . Lock ( )
defer msgMapLock . Unlock ( )
2024-05-20 16:24:16 -04:00
2024-07-19 11:37:41 -04:00
missingInSummary := make ( map [ peer . ID ] int )
unknownInSummary := make ( map [ peer . ID ] int )
2024-07-19 16:22:13 -04:00
totalMissingMessages := 0
2024-07-10 11:42:28 -04:00
2024-05-09 15:22:50 -04:00
for msgHash , nodes := range msgMap {
2024-07-19 11:37:41 -04:00
var missingIn [ ] peer . ID
var unknownIn [ ] peer . ID
2024-07-19 16:22:13 -04:00
2024-08-07 16:14:06 -04:00
for _ , s := range storenodes {
if nodes [ s ] == DoesNotExist {
missingIn = append ( missingIn , s )
missingInSummary [ s ] ++
} else if nodes [ s ] == Unknown {
unknownIn = append ( unknownIn , s )
unknownInSummary [ s ] ++
2024-05-09 15:22:50 -04:00
}
}
2024-05-08 16:37:42 -04:00
2024-05-20 16:24:16 -04:00
if len ( missingIn ) != 0 {
2024-07-08 17:00:54 -04:00
logger . Info ( "missing message identified" , zap . Stringer ( "hash" , msgHash ) , zap . String ( "pubsubTopic" , msgPubsubTopic [ msgHash ] ) , zap . Int ( "num_nodes" , len ( missingIn ) ) )
2024-08-22 11:05:56 -04:00
err := app . db . RecordMessage ( runId , tx , msgHash , msgPubsubTopic [ msgHash ] , missingIn , "does_not_exist" )
2024-05-20 16:24:16 -04:00
if err != nil {
2024-08-19 15:38:32 -04:00
return false , err
2024-05-20 16:24:16 -04:00
}
2024-07-19 16:22:13 -04:00
totalMissingMessages ++
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-05-20 16:24:16 -04:00
if len ( unknownIn ) != 0 {
2024-11-20 13:50:28 -04:00
logger . Info ( "message with unknown state identified" , zap . Stringer ( "hash" , msgHash ) , zap . String ( "pubsubTopic" , msgPubsubTopic [ msgHash ] ) , zap . Int ( "num_nodes" , len ( unknownIn ) ) )
2024-08-22 11:05:56 -04:00
err = app . db . RecordMessage ( runId , tx , msgHash , msgPubsubTopic [ msgHash ] , unknownIn , "unknown" )
2024-05-20 16:24:16 -04:00
if err != nil {
2024-08-19 15:38:32 -04:00
return false , err
2024-05-20 16:24:16 -04:00
}
2024-05-09 15:22:50 -04:00
}
}
2024-05-08 16:37:42 -04:00
2024-07-19 14:26:25 -04:00
for _ , s := range storenodes {
2024-08-07 16:14:06 -04:00
missingCnt := missingInSummary [ s ]
unknownCnt := unknownInSummary [ s ]
2024-11-20 13:50:28 -04:00
metricsChan <- & metricsSummary {
storenode : s ,
missingMessages : missingInSummary [ s ] ,
unknownMessages : unknownInSummary [ s ] ,
}
logger . Info ( "missing message summary" , zap . Stringer ( "storenode" , s ) , zap . Int ( "missing" , missingCnt ) , zap . Int ( "unknown" , unknownCnt ) )
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-07-19 18:29:40 -04:00
logger . Info ( "total missing messages" , zap . Int ( "total" , totalMissingMessages ) )
2024-07-19 16:22:13 -04:00
app . metrics . RecordTotalMissingMessages ( totalMissingMessages )
2024-08-19 15:38:32 -04:00
return shouldResetTimer , nil
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-08-07 21:11:01 -04:00
func ( app * Application ) checkMissingMessageStatus ( ctx context . Context , storenodes [ ] peer . ID , runId string , logger * zap . Logger ) error {
2024-08-07 16:14:06 -04:00
now := app . node . Timesource ( ) . Now ( )
2024-08-08 14:07:57 -04:00
from := now . Add ( - 2 * time . Hour )
to := now . Add ( - time . Hour )
2024-08-22 17:43:27 -04:00
logger . Info ( "rechecking missing messages status" , zap . Time ( "from" , from ) , zap . Time ( "to" , to ) )
2024-08-08 14:07:57 -04:00
2024-08-07 16:14:06 -04:00
// Get all messages whose status is missing or does not exist, and the column found_on_recheck is false
// if found, set found_on_recheck to true
2024-08-22 11:05:56 -04:00
missingMessages , err := app . db . GetMissingMessages ( from , to )
2024-08-07 16:14:06 -04:00
if err != nil {
return err
}
wg := sync . WaitGroup { }
2024-08-07 21:11:01 -04:00
for _ , storenodeID := range storenodes {
2024-08-07 16:14:06 -04:00
wg . Add ( 1 )
go func ( peerID peer . ID , messageHashes [ ] pb . MessageHash ) {
defer wg . Done ( )
foundMissingMessages := make ( map [ pb . MessageHash ] struct { } )
app . verifyMessageExistence ( ctx , runId , peerID , messageHashes , func ( result * store . Result ) {
for _ , mkv := range result . Messages ( ) {
foundMissingMessages [ mkv . WakuMessageHash ( ) ] = struct { } { }
}
} , logger )
2024-08-22 16:06:46 -04:00
err := app . db . MarkMessagesAsFound ( peerID , maps . Keys ( foundMissingMessages ) )
2024-08-07 16:14:06 -04:00
if err != nil {
logger . Error ( "could not mark messages as found" , zap . Error ( err ) )
return
}
2024-08-07 22:06:16 -04:00
cnt := len ( messageHashes ) - len ( foundMissingMessages )
app . metrics . RecordMissingMessagesPrevHour ( peerID , cnt )
logger . Info ( "missingMessages for the previous hour" , zap . Stringer ( "storenode" , peerID ) , zap . Int ( "cnt" , cnt ) )
2024-08-07 16:14:06 -04:00
2024-08-07 21:11:01 -04:00
} ( storenodeID , missingMessages [ storenodeID ] )
2024-08-07 16:14:06 -04:00
}
wg . Wait ( )
return nil
}
2024-08-07 21:11:01 -04:00
func ( app * Application ) countMissingMessages ( storenodes [ ] peer . ID ) error {
2024-10-23 17:38:04 -04:00
now := app . node . Timesource ( ) . Now ( ) . Add ( - delay )
// Count messages in last hour (not including last 5 minutes)
results , err := app . db . CountMissingMessages ( now . Add ( - time . Hour ) , now )
if err != nil {
return err
}
2024-10-25 12:01:33 -04:00
for _ , storenodeID := range storenodes {
app . metrics . RecordMissingMessagesLastHour ( storenodeID , results [ storenodeID ] )
2024-10-23 17:38:04 -04:00
}
2024-08-07 16:14:06 -04:00
// not including last two hours in now to let sync work
2024-10-23 17:38:04 -04:00
_2hAgo := now . Add ( - 2 * time . Hour )
2024-08-07 16:14:06 -04:00
// Count messages in last day (not including last two hours)
2024-10-23 17:38:04 -04:00
results , err = app . db . CountMissingMessages ( now . Add ( - 24 * time . Hour ) , _2hAgo )
2024-08-07 16:14:06 -04:00
if err != nil {
return err
}
2024-10-25 12:01:33 -04:00
for _ , storenodeID := range storenodes {
app . metrics . RecordMissingMessagesLastDay ( storenodeID , results [ storenodeID ] )
2024-08-07 16:14:06 -04:00
}
// Count messages in last week (not including last two hours)
2024-10-23 17:38:04 -04:00
results , err = app . db . CountMissingMessages ( _2hAgo . Add ( - 24 * time . Hour * 7 ) , _2hAgo )
2024-08-07 16:14:06 -04:00
if err != nil {
return err
}
2024-08-07 21:11:01 -04:00
for _ , storenodeID := range storenodes {
app . metrics . RecordMissingMessagesLastWeek ( storenodeID , results [ storenodeID ] )
2024-08-07 16:14:06 -04:00
}
return nil
}
2024-07-19 14:14:02 -04:00
// fetchStoreNodeMessages queries one storenode for all messages on `topic`
// between startTime and endTime (hashes only, payloads excluded) and marks
// every returned hash as Exists for that storenode in the shared msgMap,
// also recording the hash's pubsub topic. Every request — the initial query
// and each pagination step — is retried up to maxAttempts times; if a request
// keeps failing, the storenode is recorded as unavailable and the function
// returns early, leaving the remaining pages unfetched.
func (app *Application) fetchStoreNodeMessages(ctx context.Context, runId string, storenodeID peer.ID, topic string, startTime time.Time, endTime time.Time, logger *zap.Logger) {
	var result *store.Result
	var err error

	queryLogger := logger.With(zap.Stringer("storenode", storenodeID), zap.Int64("startTime", startTime.UnixNano()), zap.Int64("endTime", endTime.UnixNano()))

	// Initial query, with retries.
	retry := true
	success := false
	count := 1
	for retry && count <= maxAttempts {
		requestID := protocol.GenerateRequestID()
		queryLogger.Info("retrieving message history for topic",
			zap.Int("attempt", count),
			zap.String("requestID", hex.EncodeToString(requestID)))

		tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
		// IncludeData(false): only message hashes are needed to check existence.
		result, err = app.node.Store().Query(tCtx, store.FilterCriteria{
			ContentFilter: protocol.NewContentFilter(topic),
			TimeStart:     proto.Int64(startTime.UnixNano()),
			TimeEnd:       proto.Int64(endTime.UnixNano()),
		}, store.WithPeer(storenodeID), store.WithPaging(false, 100), store.IncludeData(false))
		cancel()
		if err != nil {
			queryLogger.Error("could not query storenode",
				zap.Error(err), zap.Int("attempt", count),
				zap.String("requestID", hex.EncodeToString(requestID)))
			time.Sleep(2 * time.Second)
		} else {
			queryLogger.Info("messages available", zap.Int("len", len(result.Messages())))
			retry = false
			success = true
		}
		count++
	}

	if !success {
		queryLogger.Error("storenode not available")
		err := app.db.RecordStorenodeUnavailable(runId, storenodeID)
		if err != nil {
			queryLogger.Error("could not store node unavailable", zap.Error(err))
		}
		app.metrics.RecordStorenodeAvailability(storenodeID, false)
		return
	}

	app.metrics.RecordStorenodeAvailability(storenodeID, true)

	// Page through the result set, registering each hash as existing on this
	// storenode, until the store client reports the cursor is exhausted.
	for !result.IsComplete() {
		msgMapLock.Lock()
		for _, mkv := range result.Messages() {
			hash := mkv.WakuMessageHash()
			_, ok := msgMap[hash]
			if !ok {
				msgMap[hash] = make(map[peer.ID]MessageExistence)
			}
			msgMap[hash][storenodeID] = Exists
			msgPubsubTopic[hash] = topic
		}
		msgMapLock.Unlock()

		// Fetch the next page, with retries (shadows the outer retry state on purpose).
		retry := true
		success := false
		count := 1
		cursorLogger := queryLogger.With(zap.String("cursor", hex.EncodeToString(result.Cursor())))
		for retry && count <= maxAttempts {
			requestID := protocol.GenerateRequestID()
			cursorLogger.Info("retrieving next page", zap.String("requestID", hex.EncodeToString(requestID)))

			tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
			err = result.Next(tCtx, store.WithRequestID(requestID))
			cancel()

			if err != nil {
				cursorLogger.Error("could not query storenode", zap.Error(err))
				time.Sleep(2 * time.Second)
			} else {
				msgLen := len(result.Messages())
				if msgLen != 0 {
					cursorLogger.Info("more messages available", zap.Int("len", msgLen))
				}
				retry = false
				success = true
			}
			count++
		}

		if !success {
			cursorLogger.Error("storenode not available")
			err := app.db.RecordStorenodeUnavailable(runId, storenodeID)
			if err != nil {
				cursorLogger.Error("could not store recordnode unavailable", zap.Error(err))
			}
			app.metrics.RecordStorenodeAvailability(storenodeID, false)
			return
		}

		app.metrics.RecordStorenodeAvailability(storenodeID, true)
	}
}
2024-08-19 15:38:32 -04:00
func ( app * Application ) retrieveHistory ( ctx context . Context , runId string , storenodes peer . IDSlice , topic string , startTime time . Time , endTime time . Time , tx * sql . Tx , logger * zap . Logger ) {
2024-07-19 14:14:02 -04:00
// Determine if the messages exist across all nodes
wg := sync . WaitGroup { }
2024-08-07 16:14:06 -04:00
for _ , storePeerID := range storenodes {
2024-07-19 14:14:02 -04:00
wg . Add ( 1 )
go func ( peerID peer . ID ) {
defer wg . Done ( )
app . fetchStoreNodeMessages ( ctx , runId , peerID , topic , startTime , endTime , logger )
2024-08-07 16:14:06 -04:00
} ( storePeerID )
2024-07-19 14:14:02 -04:00
}
wg . Wait ( )
2024-05-08 16:37:42 -04:00
2024-05-09 15:22:50 -04:00
// Update db with last sync time
2024-08-22 11:05:56 -04:00
err := app . db . UpdateTopicSyncState ( tx , topic , endTime )
2024-05-16 19:12:10 -04:00
if err != nil {
logger . Panic ( "could not update topic sync state" , zap . Error ( err ) )
}
2024-07-19 18:24:49 -04:00
app . metrics . RecordLastSyncDate ( topic , endTime )
2024-05-09 15:22:50 -04:00
}
2024-05-08 16:37:42 -04:00
2024-08-07 16:14:06 -04:00
// verifyMessageExistence asks a single storenode, by message hash, which of
// messageHashes it holds. The hashes are split into batches of at most
// maxMsgHashesPerRequest, each batch queried in its own goroutine; every
// result page is passed to onResult. Requests are retried up to maxAttempts
// times; on persistent failure the storenode is recorded as unavailable and
// that batch's remaining pages are abandoned.
//
// NOTE(review): onResult may be invoked concurrently from multiple batch
// goroutines — callers' callbacks must be safe for that (the callers in this
// file take msgMapLock or use per-goroutine state).
func (app *Application) verifyMessageExistence(ctx context.Context, runId string, peerID peer.ID, messageHashes []pb.MessageHash, onResult func(result *store.Result), logger *zap.Logger) {
	// Nothing to verify.
	if len(messageHashes) == 0 {
		return
	}

	peerInfo := app.node.Host().Peerstore().PeerInfo(peerID)

	queryLogger := logger.With(zap.Stringer("storenode", peerID))

	wg := sync.WaitGroup{}
	// Split into batches
	for i := 0; i < len(messageHashes); i += maxMsgHashesPerRequest {
		j := i + maxMsgHashesPerRequest
		if j > len(messageHashes) {
			j = len(messageHashes)
		}

		wg.Add(1)
		go func(messageHashes []pb.MessageHash) {
			defer wg.Done()

			var result *store.Result
			var err error

			// Initial hash query, with retries.
			retry := true
			success := false
			count := 1
			for retry && count <= maxAttempts {
				queryLogger.Info("querying by hash", zap.Int("attempt", count))

				tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
				// Page size equals the batch size so a single page can hold
				// every requested hash; IncludeData(false) fetches hashes only.
				result, err = app.node.Store().QueryByHash(tCtx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID), store.WithPaging(false, uint64(len(messageHashes))))
				cancel()
				if err != nil {
					queryLogger.Error("could not query storenode", zap.Error(err), zap.Int("attempt", count))
					time.Sleep(2 * time.Second)
				} else {
					queryLogger.Info("hashes available", zap.Int("len", len(result.Messages())))
					retry = false
					success = true
				}
				count++
			}

			if !success {
				queryLogger.Error("storenode not available")
				err := app.db.RecordStorenodeUnavailable(runId, peerID)
				if err != nil {
					queryLogger.Error("could not store recordnode unavailable", zap.Error(err))
				}
				app.metrics.RecordStorenodeAvailability(peerID, false)
				return
			}

			app.metrics.RecordStorenodeAvailability(peerID, true)

			// Deliver each page to onResult and advance the cursor, retrying
			// each pagination step, until the result set is exhausted.
			for !result.IsComplete() {
				onResult(result)
				retry := true
				success := false
				count := 1
				for retry && count <= maxAttempts {
					requestID := protocol.GenerateRequestID()
					queryLogger.Info("executing next while querying hashes", zap.String("requestID", hex.EncodeToString(requestID)), zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Int("attempt", count))

					tCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
					err = result.Next(tCtx, store.WithRequestID(requestID))
					cancel()
					if err != nil {
						queryLogger.Error("could not query storenode", zap.String("cursor", hexutil.Encode(result.Cursor())), zap.Error(err), zap.Int("attempt", count))
						time.Sleep(2 * time.Second)
					} else {
						queryLogger.Info("more hashes available", zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Int("len", len(result.Messages())))
						retry = false
						success = true
					}
					count++
				}
				if !success {
					queryLogger.Error("storenode not available", zap.String("cursor", hexutil.Encode(result.Cursor())))
					err := app.db.RecordStorenodeUnavailable(runId, peerID)
					if err != nil {
						logger.Error("could not store recordnode unavailable", zap.Error(err), zap.String("cursor", hex.EncodeToString(result.Cursor())), zap.Stringer("storenode", peerInfo))
					}
					app.metrics.RecordStorenodeAvailability(peerID, false)
					return
				}
				app.metrics.RecordStorenodeAvailability(peerID, true)
			}
		}(messageHashes[i:j])
	}
	wg.Wait()
}