From 4e6118d42d293955b7f5014a767552e4f091ca15 Mon Sep 17 00:00:00 2001
From: Richard Ramos
Date: Mon, 20 May 2024 16:02:09 -0400
Subject: [PATCH] feat: support dns discovery URLs

---
 cmd/storemsgcounter/execute.go   | 130 +++++++++++++++++++------------
 cmd/storemsgcounter/flags.go     |  32 +++++---
 cmd/storemsgcounter/options.go   |  18 +++--
 internal/persistence/database.go |   9 ++-
 4 files changed, 119 insertions(+), 70 deletions(-)

diff --git a/cmd/storemsgcounter/execute.go b/cmd/storemsgcounter/execute.go
index d6f8762..297162d 100644
--- a/cmd/storemsgcounter/execute.go
+++ b/cmd/storemsgcounter/execute.go
@@ -3,12 +3,15 @@ package main
 import (
 	"context"
 	"database/sql"
+	"errors"
 	"sync"
 	"time"

 	"github.com/ethereum/go-ethereum/common/hexutil"
 	"github.com/google/uuid"
-	"github.com/multiformats/go-multiaddr"
+	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/libp2p/go-libp2p/core/peerstore"
+	"github.com/waku-org/go-waku/waku/v2/dnsdisc"
 	"github.com/waku-org/go-waku/waku/v2/node"
 	"github.com/waku-org/go-waku/waku/v2/protocol"
 	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
@@ -56,6 +59,32 @@ func Execute(ctx context.Context, options Options) error {
 	}
 	defer dbStore.Stop()

+	var discoveredNodes []dnsdisc.DiscoveredNode
+	if len(options.DNSDiscoveryURLs.Value()) != 0 {
+		discoveredNodes = node.GetNodesFromDNSDiscovery(logger, ctx, options.DNSDiscoveryNameserver, options.DNSDiscoveryURLs.Value())
+	}
+
+	var storenodes []peer.AddrInfo
+	for _, node := range discoveredNodes {
+		if len(node.PeerInfo.Addrs) == 0 {
+			continue
+		}
+		storenodes = append(storenodes, node.PeerInfo)
+	}
+
+	for _, node := range options.StoreNodes {
+		pInfo, err := peer.AddrInfosFromP2pAddrs(node)
+		if err != nil {
+			return err
+		}
+
+		storenodes = append(storenodes, pInfo...)
+	}
+
+	if len(storenodes) == 0 {
+		return errors.New("no storenodes specified")
+	}
+
 	wakuNode, err := node.New(
 		node.WithNTP(),
 		node.WithClusterID(uint16(options.ClusterID)),
@@ -69,6 +98,10 @@ func Execute(ctx context.Context, options Options) error {
 	}
 	defer wakuNode.Stop()

+	for _, s := range storenodes {
+		wakuNode.Host().Peerstore().AddAddrs(s.ID, s.Addrs, peerstore.PermanentAddrTTL)
+	}
+
 	err = dbStore.Start(ctx, wakuNode.Timesource())
 	if err != nil {
 		return err
@@ -82,7 +115,7 @@ func Execute(ctx context.Context, options Options) error {
 			return nil
 		case <-timer.C:
 			logger.Info("verifying message history...")
-			err := verifyHistory(ctx, wakuNode, dbStore, logger)
+			err := verifyHistory(ctx, storenodes, wakuNode, dbStore, logger)
 			if err != nil {
 				return err
 			}
@@ -94,17 +127,17 @@ func Execute(ctx context.Context, options Options) error {
 }

 var msgMapLock sync.Mutex
-var msgMap map[pb.MessageHash]map[string]MessageExistence
+var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
 var msgAttr map[pb.MessageHash]MessageAttr

-func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
+func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
 	runId := uuid.New().String()
 	logger = logger.With(zap.String("runId", runId))

 	// [MessageHash][StoreNode] = exists?
 	msgMapLock.Lock()
-	msgMap = make(map[pb.MessageHash]map[string]MessageExistence)
+	msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
 	msgAttr = make(map[pb.MessageHash]MessageAttr)
 	msgMapLock.Unlock()
@@ -132,7 +165,7 @@
 		wg.Add(1)
 		go func(topic string, lastSyncTimestamp *time.Time) {
 			defer wg.Done()
-			retrieveHistory(ctx, runId, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
+			retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
 		}(topic, lastSyncTimestamp)
 	}
 	wg.Wait()
@@ -140,26 +173,24 @@
 	// Verify for each storenode which messages are not available, and query
 	// for their existence using message hash
 	// ========================================================================
-	msgsToVerify := make(map[string][]pb.MessageHash) // storenode -> msgHash
+	msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
 	msgMapLock.Lock()
 	for msgHash, nodes := range msgMap {
-		for _, node := range options.StoreNodes {
-			nodeStr := node.String()
-			if nodes[nodeStr] != Exists {
-				msgsToVerify[nodeStr] = append(msgsToVerify[nodeStr], msgHash)
+		for _, node := range storenodes {
+			if nodes[node.ID] != Exists {
+				msgsToVerify[node.ID] = append(msgsToVerify[node.ID], msgHash)
 			}
 		}
 	}
 	msgMapLock.Unlock()

 	wg = sync.WaitGroup{}
-	for node, messageHashes := range msgsToVerify {
+	for peerID, messageHashes := range msgsToVerify {
 		wg.Add(1)
-		go func(node string, messageHashes []pb.MessageHash) {
+		go func(peerID peer.ID, messageHashes []pb.MessageHash) {
 			defer wg.Done()
-			nodeMultiaddr, _ := multiaddr.NewMultiaddr(node)
-			verifyMessageExistence(ctx, runId, tx, nodeMultiaddr, messageHashes, wakuNode, dbStore, logger)
-		}(node, messageHashes)
+			verifyMessageExistence(ctx, runId, tx, peerID, messageHashes, wakuNode, dbStore, logger)
+		}(peerID, messageHashes)
 	}
 	wg.Wait()
@@ -169,14 +200,13 @@
 	msgMapLock.Lock()
 	defer msgMapLock.Unlock()
 	for msgHash, nodes := range msgMap {
-		var missingIn []string
-		var unknownIn []string
-		for _, node := range options.StoreNodes {
-			nodeStr := node.String()
-			if nodes[nodeStr] == DoesNotExist {
-				missingIn = append(missingIn, nodeStr)
-			} else if nodes[nodeStr] == Unknown {
-				unknownIn = append(unknownIn, nodeStr)
+		var missingIn []peer.AddrInfo
+		var unknownIn []peer.AddrInfo
+		for _, node := range storenodes {
+			if nodes[node.ID] == DoesNotExist {
+				missingIn = append(missingIn, node)
+			} else if nodes[node.ID] == Unknown {
+				unknownIn = append(unknownIn, node)
 			}
 		}
@@ -198,7 +228,7 @@
 	return nil
 }

-func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, logger *zap.Logger) {
+func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, logger *zap.Logger) {
 	logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))

 	now := wakuNode.Timesource().Now()
@@ -216,7 +246,7 @@
 	}

 	// Determine if the messages exist across all nodes
-	for _, node := range options.StoreNodes {
+	for _, node := range storenodes {
 		storeNodeFailure := false
 		var result *store.Result
 		var err error
@@ -229,7 +259,7 @@
 				ContentFilter: protocol.NewContentFilter(topic),
 				TimeStart:     proto.Int64(startTime.UnixNano()),
 				TimeEnd:       proto.Int64(endTime.UnixNano()),
-			}, store.WithPeerAddr(node))
+			}, store.WithPeer(node.ID))
 			if err != nil {
 				logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
 				storeNodeFailure = true
@@ -243,9 +273,9 @@
 		if storeNodeFailure {
 			logger.Error("storenode not available", zap.Stringer("storenode", node), zap.Time("startTime", startTime), zap.Time("endTime", endTime))
-			err := dbStore.RecordStorenodeUnavailable(runId, node.String())
+			err := dbStore.RecordStorenodeUnavailable(runId, node)
 			if err != nil {
-				logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", node))
+				logger.Error("could not store node unavailable", zap.Error(err), zap.Stringer("storenode", node))
 			}
 		} else {
@@ -257,9 +287,9 @@
 				hash := mkv.WakuMessageHash()
 				_, ok := msgMap[hash]
 				if !ok {
-					msgMap[hash] = make(map[string]MessageExistence)
+					msgMap[hash] = make(map[peer.ID]MessageExistence)
 				}
-				msgMap[hash][node.String()] = Exists
+				msgMap[hash][node.ID] = Exists
 				msgAttr[hash] = MessageAttr{
 					Timestamp:   uint64(mkv.Message.GetTimestamp()),
 					PubsubTopic: mkv.GetPubsubTopic(),
@@ -283,13 +313,17 @@
 			}

 			if storeNodeFailure {
-				// TODO: Notify that storenode was not available from X to Y time
 				logger.Error("storenode not available",
 					zap.Stringer("storenode", node),
 					zap.Time("startTime", startTime),
 					zap.Time("endTime", endTime),
 					zap.String("topic", topic),
 					zap.String("cursor", hexutil.Encode(result.Cursor())))
+
+				err := dbStore.RecordStorenodeUnavailable(runId, node)
+				if err != nil {
+					logger.Error("could not store node unavailable", zap.Error(err), zap.Stringer("storenode", node))
+				}
 				break iteratorLbl
 			}
 		}
@@ -303,16 +337,18 @@
 	}
 }

-func verifyMessageExistence(ctx context.Context, runId string, tx *sql.Tx, nodeAddr multiaddr.Multiaddr, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) {
+func verifyMessageExistence(ctx context.Context, runId string, tx *sql.Tx, peerID peer.ID, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) {
 	storeNodeFailure := false
 	var result *store.Result
 	var err error

+	peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
+
queryLbl:
 	for i := 0; i < maxAttempts; i++ {
-		result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeerAddr(nodeAddr))
+		result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID))
 		if err != nil {
-			logger.Error("could not query storenode", zap.Stringer("storenode", nodeAddr), zap.Error(err))
+			logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err))
 			storeNodeFailure = true
 			time.Sleep(2 * time.Second)
 		} else {
@@ -323,30 +359,28 @@
 	if storeNodeFailure {
 		logger.Error("storenode not available",
-			zap.Stringer("storenode", nodeAddr),
+			zap.Stringer("storenode", peerInfo),
 			zap.Stringers("hashes", messageHashes))

-		err := dbStore.RecordStorenodeUnavailable(runId, nodeAddr.String())
+		err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
 		if err != nil {
-			logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", nodeAddr))
+			logger.Error("could not store node unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo))
 		}
 	} else {
 		for !result.IsComplete() {
-			nodeAddrStr := nodeAddr.String()
-
 			msgMapLock.Lock()
 			for _, mkv := range result.Messages() {
 				hash := mkv.WakuMessageHash()
 				_, ok := msgMap[hash]
 				if !ok {
-					msgMap[hash] = make(map[string]MessageExistence)
+					msgMap[hash] = make(map[peer.ID]MessageExistence)
 				}
-				msgMap[hash][nodeAddrStr] = Exists
+				msgMap[hash][peerInfo.ID] = Exists
 			}

 			for _, msgHash := range messageHashes {
-				if msgMap[msgHash][nodeAddrStr] != Exists {
-					msgMap[msgHash][nodeAddrStr] = DoesNotExist
+				if msgMap[msgHash][peerInfo.ID] != Exists {
+					msgMap[msgHash][peerInfo.ID] = DoesNotExist
 				}
 			}
@@ -358,7 +392,7 @@
 			for i := 0; i < maxAttempts; i++ {
 				err = result.Next(ctx)
 				if err != nil {
-					logger.Error("could not query storenode", zap.Stringer("storenode", nodeAddr), zap.Error(err))
+					logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err))
 					storeNodeFailure = true
 					time.Sleep(2 * time.Second)
 				} else {
@@ -369,13 +403,13 @@
 			if storeNodeFailure {
 				logger.Error("storenode not available",
-					zap.Stringer("storenode", nodeAddr),
+					zap.Stringer("storenode", peerInfo),
 					zap.Stringers("hashes", messageHashes),
 					zap.String("cursor", hexutil.Encode(result.Cursor())))

-				err := dbStore.RecordStorenodeUnavailable(runId, nodeAddr.String())
+				err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
 				if err != nil {
-					logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", nodeAddr))
+					logger.Error("could not store node unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo))
 				}
 			}
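Note on the discovery flow in execute.go: discovered peers and static multiaddrs are merged into one []peer.AddrInfo, and discovered entries without dialable addresses are skipped. The sketch below condenses those steps into a single helper. resolveStorenodes is hypothetical (not part of this patch) and is written against go-waku's dnsdisc.RetrieveNodes/WithNameserver API instead of the node.GetNodesFromDNSDiscovery wrapper used in the diff.

package main

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/multiformats/go-multiaddr"
	"github.com/waku-org/go-waku/waku/v2/dnsdisc"
)

// resolveStorenodes merges DNS-discovered peers with statically configured
// multiaddrs into a single []peer.AddrInfo, skipping discovered entries that
// carry no dialable addresses (ENR tree entries may omit them).
func resolveStorenodes(ctx context.Context, urls []string, nameserver string, static []multiaddr.Multiaddr) ([]peer.AddrInfo, error) {
	var out []peer.AddrInfo
	for _, url := range urls {
		nodes, err := dnsdisc.RetrieveNodes(ctx, url, dnsdisc.WithNameserver(nameserver))
		if err != nil {
			return nil, err
		}
		for _, n := range nodes {
			if len(n.PeerInfo.Addrs) == 0 {
				continue // nothing to dial; mirror the patch's skip
			}
			out = append(out, n.PeerInfo)
		}
	}
	for _, addr := range static {
		infos, err := peer.AddrInfosFromP2pAddrs(addr)
		if err != nil {
			return nil, err
		}
		out = append(out, infos...)
	}
	return out, nil
}
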
not available", - zap.Stringer("storenode", nodeAddr), + zap.Stringer("storenode", peerInfo), zap.Stringers("hashes", messageHashes)) - err := dbStore.RecordStorenodeUnavailable(runId, nodeAddr.String()) + err := dbStore.RecordStorenodeUnavailable(runId, peerInfo) if err != nil { - logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", nodeAddr)) + logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo)) } } else { for !result.IsComplete() { - nodeAddrStr := nodeAddr.String() - msgMapLock.Lock() for _, mkv := range result.Messages() { hash := mkv.WakuMessageHash() _, ok := msgMap[hash] if !ok { - msgMap[hash] = make(map[string]MessageExistence) + msgMap[hash] = make(map[peer.ID]MessageExistence) } - msgMap[hash][nodeAddrStr] = Exists + msgMap[hash][peerInfo.ID] = Exists } for _, msgHash := range messageHashes { - if msgMap[msgHash][nodeAddrStr] != Exists { - msgMap[msgHash][nodeAddrStr] = DoesNotExist + if msgMap[msgHash][peerInfo.ID] != Exists { + msgMap[msgHash][peerInfo.ID] = DoesNotExist } } @@ -358,7 +392,7 @@ queryLbl: for i := 0; i < maxAttempts; i++ { err = result.Next(ctx) if err != nil { - logger.Error("could not query storenode", zap.Stringer("storenode", nodeAddr), zap.Error(err)) + logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err)) storeNodeFailure = true time.Sleep(2 * time.Second) } else { @@ -369,13 +403,13 @@ queryLbl: if storeNodeFailure { logger.Error("storenode not available", - zap.Stringer("storenode", nodeAddr), + zap.Stringer("storenode", peerInfo), zap.Stringers("hashes", messageHashes), zap.String("cursor", hexutil.Encode(result.Cursor()))) - err := dbStore.RecordStorenodeUnavailable(runId, nodeAddr.String()) + err := dbStore.RecordStorenodeUnavailable(runId, peerInfo) if err != nil { - logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", nodeAddr)) + logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo)) } } diff --git a/cmd/storemsgcounter/flags.go b/cmd/storemsgcounter/flags.go index 030cc6a..03ed182 100644 --- a/cmd/storemsgcounter/flags.go +++ b/cmd/storemsgcounter/flags.go @@ -12,20 +12,32 @@ import ( var cliFlags = []cli.Flag{ &cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"}, cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ - Name: "storenode", - Required: true, - Usage: "Multiaddr of peers that supports storeV3 protocol. Option may be repeated", + Name: "storenode", + Usage: "Multiaddr of peers that supports storeV3 protocol. Option may be repeated", Value: &cliutils.MultiaddrSlice{ Values: &options.StoreNodes, }, - EnvVars: []string{"MSGVERIF_STORENODE"}, + EnvVars: []string{"STORE_MSG_CTR_STORENODE"}, + }), + altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: "dns-discovery-url", + Usage: "URL for DNS node list in format 'enrtree://@'. 
diff --git a/cmd/storemsgcounter/options.go b/cmd/storemsgcounter/options.go
index 33b4f0d..e71d899 100644
--- a/cmd/storemsgcounter/options.go
+++ b/cmd/storemsgcounter/options.go
@@ -8,12 +8,14 @@ import (
 )

 type Options struct {
-	LogLevel        string
-	LogEncoding     string
-	LogOutput       string
-	ClusterID       uint
-	PubSubTopics    cli.StringSlice
-	DatabaseURL     string
-	RetentionPolicy time.Duration
-	StoreNodes      []multiaddr.Multiaddr
+	LogLevel               string
+	LogEncoding            string
+	LogOutput              string
+	ClusterID              uint
+	PubSubTopics           cli.StringSlice
+	DatabaseURL            string
+	RetentionPolicy        time.Duration
+	StoreNodes             []multiaddr.Multiaddr
+	DNSDiscoveryNameserver string
+	DNSDiscoveryURLs       cli.StringSlice
 }
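The two new Options fields are plain strings at this layer; the URLs are only parsed later, during discovery. If early validation were wanted, a stdlib-only check of the expected 'enrtree://<key>@<fqdn>' shape could look like the hypothetical helper below (not part of this patch).

package main

import (
	"fmt"
	"strings"
)

// validateDiscoveryURL is an illustrative, hypothetical check that an
// Options.DNSDiscoveryURLs entry has the 'enrtree://<key>@<fqdn>' shape.
// go-waku performs its own, authoritative parsing when the URL is resolved.
func validateDiscoveryURL(raw string) error {
	rest, ok := strings.CutPrefix(raw, "enrtree://")
	if !ok {
		return fmt.Errorf("missing enrtree:// scheme: %q", raw)
	}
	key, fqdn, ok := strings.Cut(rest, "@")
	if !ok || key == "" || fqdn == "" {
		return fmt.Errorf("expected enrtree://<key>@<fqdn>, got %q", raw)
	}
	return nil
}
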
diff --git a/internal/persistence/database.go b/internal/persistence/database.go
index ab3e00c..a1f1cda 100644
--- a/internal/persistence/database.go
+++ b/internal/persistence/database.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 	"time"

+	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/waku-org/go-waku/waku/v2/protocol/pb"
 	"github.com/waku-org/go-waku/waku/v2/timesource"
 	"go.uber.org/zap"
@@ -238,7 +239,7 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string,
 	return stmt.Close()
 }

-func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []string, status string) error {
+func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []peer.AddrInfo, status string) error {
 	if len(storenodes) == 0 {
 		return nil
 	}
@@ -251,7 +252,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
 	now := time.Now().UnixNano()

 	for _, s := range storenodes {
-		_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, s, status, now)
+		_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, s.Addrs[0].String(), status, now)
 		if err != nil {
 			return err
 		}
@@ -260,7 +261,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
 	return nil
 }

-func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) error {
+func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.AddrInfo) error {
 	stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING")
 	if err != nil {
 		return err
@@ -268,7 +269,7 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) erro
 	defer stmt.Close()

 	now := time.Now().UnixNano()
-	_, err = stmt.Exec(uuid, storenode, now)
+	_, err = stmt.Exec(uuid, storenode.Addrs[0].String(), now)
 	if err != nil {
 		return err
 	}
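A note on the persistence changes above: both methods now store storenode.Addrs[0], which assumes every peer.AddrInfo carries at least one address. Execute upholds that invariant by skipping discovered peers with empty Addrs, but a defensive formatter like the hypothetical helper below (not part of this patch) would avoid an index-out-of-range panic if the invariant ever broke.

package persistence

import "github.com/libp2p/go-libp2p/core/peer"

// storenodeColumn renders a peer for the storenode DB column: the first
// multiaddr when one is available, otherwise the bare peer ID.
func storenodeColumn(p peer.AddrInfo) string {
	if len(p.Addrs) > 0 {
		return p.Addrs[0].String()
	}
	return p.ID.String()
}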