feat: support dns discovery URLs

This commit is contained in:
Richard Ramos 2024-05-20 16:02:09 -04:00
parent b40732b442
commit 4e6118d42d
No known key found for this signature in database
GPG Key ID: 1CE87DB518195760
4 changed files with 119 additions and 70 deletions

View File

@ -3,12 +3,15 @@ package main
import (
"context"
"database/sql"
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/google/uuid"
"github.com/multiformats/go-multiaddr"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -56,6 +59,32 @@ func Execute(ctx context.Context, options Options) error {
}
defer dbStore.Stop()
var discoveredNodes []dnsdisc.DiscoveredNode
if len(options.DNSDiscoveryURLs.Value()) != 0 {
discoveredNodes = node.GetNodesFromDNSDiscovery(logger, ctx, options.DNSDiscoveryNameserver, options.DNSDiscoveryURLs.Value())
}
var storenodes []peer.AddrInfo
for _, node := range discoveredNodes {
if len(node.PeerInfo.Addrs) == 0 {
continue
}
storenodes = append(storenodes, node.PeerInfo)
}
for _, node := range options.StoreNodes {
pInfo, err := peer.AddrInfosFromP2pAddrs(node)
if err != nil {
return err
}
storenodes = append(storenodes, pInfo...)
}
if len(storenodes) == 0 {
return errors.New("no storenodes specified")
}
wakuNode, err := node.New(
node.WithNTP(),
node.WithClusterID(uint16(options.ClusterID)),
@ -69,6 +98,10 @@ func Execute(ctx context.Context, options Options) error {
}
defer wakuNode.Stop()
for _, s := range storenodes {
wakuNode.Host().Peerstore().AddAddrs(s.ID, s.Addrs, peerstore.PermanentAddrTTL)
}
err = dbStore.Start(ctx, wakuNode.Timesource())
if err != nil {
return err
@ -82,7 +115,7 @@ func Execute(ctx context.Context, options Options) error {
return nil
case <-timer.C:
logger.Info("verifying message history...")
err := verifyHistory(ctx, wakuNode, dbStore, logger)
err := verifyHistory(ctx, storenodes, wakuNode, dbStore, logger)
if err != nil {
return err
}
@ -94,17 +127,17 @@ func Execute(ctx context.Context, options Options) error {
}
var msgMapLock sync.Mutex
var msgMap map[pb.MessageHash]map[string]MessageExistence
var msgMap map[pb.MessageHash]map[peer.ID]MessageExistence
var msgAttr map[pb.MessageHash]MessageAttr
func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
func verifyHistory(ctx context.Context, storenodes []peer.AddrInfo, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) error {
runId := uuid.New().String()
logger = logger.With(zap.String("runId", runId))
// [MessageHash][StoreNode] = exists?
msgMapLock.Lock()
msgMap = make(map[pb.MessageHash]map[string]MessageExistence)
msgMap = make(map[pb.MessageHash]map[peer.ID]MessageExistence)
msgAttr = make(map[pb.MessageHash]MessageAttr)
msgMapLock.Unlock()
@ -132,7 +165,7 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
wg.Add(1)
go func(topic string, lastSyncTimestamp *time.Time) {
defer wg.Done()
retrieveHistory(ctx, runId, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
retrieveHistory(ctx, runId, storenodes, topic, lastSyncTimestamp, wakuNode, dbStore, tx, logger)
}(topic, lastSyncTimestamp)
}
wg.Wait()
@ -140,26 +173,24 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
// Verify for each storenode which messages are not available, and query
// for their existence using message hash
// ========================================================================
msgsToVerify := make(map[string][]pb.MessageHash) // storenode -> msgHash
msgsToVerify := make(map[peer.ID][]pb.MessageHash) // storenode -> msgHash
msgMapLock.Lock()
for msgHash, nodes := range msgMap {
for _, node := range options.StoreNodes {
nodeStr := node.String()
if nodes[nodeStr] != Exists {
msgsToVerify[nodeStr] = append(msgsToVerify[nodeStr], msgHash)
for _, node := range storenodes {
if nodes[node.ID] != Exists {
msgsToVerify[node.ID] = append(msgsToVerify[node.ID], msgHash)
}
}
}
msgMapLock.Unlock()
wg = sync.WaitGroup{}
for node, messageHashes := range msgsToVerify {
for peerID, messageHashes := range msgsToVerify {
wg.Add(1)
go func(node string, messageHashes []pb.MessageHash) {
go func(peerID peer.ID, messageHashes []pb.MessageHash) {
defer wg.Done()
nodeMultiaddr, _ := multiaddr.NewMultiaddr(node)
verifyMessageExistence(ctx, runId, tx, nodeMultiaddr, messageHashes, wakuNode, dbStore, logger)
}(node, messageHashes)
verifyMessageExistence(ctx, runId, tx, peerID, messageHashes, wakuNode, dbStore, logger)
}(peerID, messageHashes)
}
wg.Wait()
@ -169,14 +200,13 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
msgMapLock.Lock()
defer msgMapLock.Unlock()
for msgHash, nodes := range msgMap {
var missingIn []string
var unknownIn []string
for _, node := range options.StoreNodes {
nodeStr := node.String()
if nodes[nodeStr] == DoesNotExist {
missingIn = append(missingIn, nodeStr)
} else if nodes[nodeStr] == Unknown {
unknownIn = append(unknownIn, nodeStr)
var missingIn []peer.AddrInfo
var unknownIn []peer.AddrInfo
for _, node := range storenodes {
if nodes[node.ID] == DoesNotExist {
missingIn = append(missingIn, node)
} else if nodes[node.ID] == Unknown {
unknownIn = append(unknownIn, node)
}
}
@ -198,7 +228,7 @@ func verifyHistory(ctx context.Context, wakuNode *node.WakuNode, dbStore *persis
return nil
}
func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, logger *zap.Logger) {
func retrieveHistory(ctx context.Context, runId string, storenodes []peer.AddrInfo, topic string, lastSyncTimestamp *time.Time, wakuNode *node.WakuNode, dbStore *persistence.DBStore, tx *sql.Tx, logger *zap.Logger) {
logger = logger.With(zap.String("topic", topic), zap.Timep("lastSyncTimestamp", lastSyncTimestamp))
now := wakuNode.Timesource().Now()
@ -216,7 +246,7 @@ func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTi
}
// Determine if the messages exist across all nodes
for _, node := range options.StoreNodes {
for _, node := range storenodes {
storeNodeFailure := false
var result *store.Result
var err error
@ -229,7 +259,7 @@ func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTi
ContentFilter: protocol.NewContentFilter(topic),
TimeStart: proto.Int64(startTime.UnixNano()),
TimeEnd: proto.Int64(endTime.UnixNano()),
}, store.WithPeerAddr(node))
}, store.WithPeer(node.ID))
if err != nil {
logger.Error("could not query storenode", zap.Stringer("storenode", node), zap.Error(err))
storeNodeFailure = true
@ -243,9 +273,9 @@ func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTi
if storeNodeFailure {
logger.Error("storenode not available", zap.Stringer("storenode", node), zap.Time("startTime", startTime), zap.Time("endTime", endTime))
err := dbStore.RecordStorenodeUnavailable(runId, node.String())
err := dbStore.RecordStorenodeUnavailable(runId, node)
if err != nil {
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", node))
logger.Error("could not store node unavailable", zap.Error(err), zap.Stringer("storenode", node))
}
} else {
@ -257,9 +287,9 @@ func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTi
hash := mkv.WakuMessageHash()
_, ok := msgMap[hash]
if !ok {
msgMap[hash] = make(map[string]MessageExistence)
msgMap[hash] = make(map[peer.ID]MessageExistence)
}
msgMap[hash][node.String()] = Exists
msgMap[hash][node.ID] = Exists
msgAttr[hash] = MessageAttr{
Timestamp: uint64(mkv.Message.GetTimestamp()),
PubsubTopic: mkv.GetPubsubTopic(),
@ -283,13 +313,17 @@ func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTi
}
if storeNodeFailure {
// TODO: Notify that storenode was not available from X to Y time
logger.Error("storenode not available",
zap.Stringer("storenode", node),
zap.Time("startTime", startTime),
zap.Time("endTime", endTime),
zap.String("topic", topic),
zap.String("cursor", hexutil.Encode(result.Cursor())))
err := dbStore.RecordStorenodeUnavailable(runId, node)
if err != nil {
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", node))
}
break iteratorLbl
}
}
@ -303,16 +337,18 @@ func retrieveHistory(ctx context.Context, runId string, topic string, lastSyncTi
}
}
func verifyMessageExistence(ctx context.Context, runId string, tx *sql.Tx, nodeAddr multiaddr.Multiaddr, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) {
func verifyMessageExistence(ctx context.Context, runId string, tx *sql.Tx, peerID peer.ID, messageHashes []pb.MessageHash, wakuNode *node.WakuNode, dbStore *persistence.DBStore, logger *zap.Logger) {
storeNodeFailure := false
var result *store.Result
var err error
peerInfo := wakuNode.Host().Peerstore().PeerInfo(peerID)
queryLbl:
for i := 0; i < maxAttempts; i++ {
result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeerAddr(nodeAddr))
result, err = wakuNode.Store().QueryByHash(ctx, messageHashes, store.IncludeData(false), store.WithPeer(peerInfo.ID))
if err != nil {
logger.Error("could not query storenode", zap.Stringer("storenode", nodeAddr), zap.Error(err))
logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err))
storeNodeFailure = true
time.Sleep(2 * time.Second)
} else {
@ -323,30 +359,28 @@ queryLbl:
if storeNodeFailure {
logger.Error("storenode not available",
zap.Stringer("storenode", nodeAddr),
zap.Stringer("storenode", peerInfo),
zap.Stringers("hashes", messageHashes))
err := dbStore.RecordStorenodeUnavailable(runId, nodeAddr.String())
err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
if err != nil {
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", nodeAddr))
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo))
}
} else {
for !result.IsComplete() {
nodeAddrStr := nodeAddr.String()
msgMapLock.Lock()
for _, mkv := range result.Messages() {
hash := mkv.WakuMessageHash()
_, ok := msgMap[hash]
if !ok {
msgMap[hash] = make(map[string]MessageExistence)
msgMap[hash] = make(map[peer.ID]MessageExistence)
}
msgMap[hash][nodeAddrStr] = Exists
msgMap[hash][peerInfo.ID] = Exists
}
for _, msgHash := range messageHashes {
if msgMap[msgHash][nodeAddrStr] != Exists {
msgMap[msgHash][nodeAddrStr] = DoesNotExist
if msgMap[msgHash][peerInfo.ID] != Exists {
msgMap[msgHash][peerInfo.ID] = DoesNotExist
}
}
@ -358,7 +392,7 @@ queryLbl:
for i := 0; i < maxAttempts; i++ {
err = result.Next(ctx)
if err != nil {
logger.Error("could not query storenode", zap.Stringer("storenode", nodeAddr), zap.Error(err))
logger.Error("could not query storenode", zap.Stringer("storenode", peerInfo), zap.Error(err))
storeNodeFailure = true
time.Sleep(2 * time.Second)
} else {
@ -369,13 +403,13 @@ queryLbl:
if storeNodeFailure {
logger.Error("storenode not available",
zap.Stringer("storenode", nodeAddr),
zap.Stringer("storenode", peerInfo),
zap.Stringers("hashes", messageHashes),
zap.String("cursor", hexutil.Encode(result.Cursor())))
err := dbStore.RecordStorenodeUnavailable(runId, nodeAddr.String())
err := dbStore.RecordStorenodeUnavailable(runId, peerInfo)
if err != nil {
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", nodeAddr))
logger.Error("could not store recordnode unavailable", zap.Error(err), zap.Stringer("storenode", peerInfo))
}
}

View File

@ -12,20 +12,32 @@ import (
var cliFlags = []cli.Flag{
&cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"},
cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "storenode",
Required: true,
Usage: "Multiaddr of peers that supports storeV3 protocol. Option may be repeated",
Name: "storenode",
Usage: "Multiaddr of peers that supports storeV3 protocol. Option may be repeated",
Value: &cliutils.MultiaddrSlice{
Values: &options.StoreNodes,
},
EnvVars: []string{"MSGVERIF_STORENODE"},
EnvVars: []string{"STORE_MSG_CTR_STORENODE"},
}),
altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "dns-discovery-url",
Usage: "URL for DNS node list in format 'enrtree://<key>@<fqdn>'. Option may be repeated",
Destination: &options.DNSDiscoveryURLs,
EnvVars: []string{"STORE_MSG_CTR_DNS_DISC_URL"},
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "dns-discovery-name-server",
Aliases: []string{"dns-discovery-nameserver"},
Usage: "DNS nameserver IP to query (empty to use system's default)",
Destination: &options.DNSDiscoveryNameserver,
EnvVars: []string{"STORE_MSG_CTR_DNS_DISC_NAMESERVER"},
}),
altsrc.NewUintFlag(&cli.UintFlag{
Name: "cluster-id",
Usage: "ClusterID to use",
Destination: &options.ClusterID,
Value: 0,
EnvVars: []string{"MSGVERIF_CLUSTER_ID"},
EnvVars: []string{"STORE_MSG_CTR_CLUSTER_ID"},
}),
altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "pubsub-topic",
@ -33,7 +45,7 @@ var cliFlags = []cli.Flag{
Usage: "Pubsub topic used for the query. Argument may be repeated.",
Value: cli.NewStringSlice(relay.DefaultWakuTopic),
Destination: &options.PubSubTopics,
EnvVars: []string{"MSGVERIF_PUBSUB_TOPICS"},
EnvVars: []string{"STORE_MSG_CTR_PUBSUB_TOPICS"},
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "db-url",
@ -47,7 +59,7 @@ var cliFlags = []cli.Flag{
Usage: "Retention policy. ",
Destination: &options.RetentionPolicy,
Value: 15 * 24 * time.Hour,
EnvVars: []string{"MSGVERIF_RETENTION_POLICY"},
EnvVars: []string{"STORE_MSG_CTR_RETENTION_POLICY"},
}),
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
Name: "log-level",
@ -57,7 +69,7 @@ var cliFlags = []cli.Flag{
Value: &options.LogLevel,
},
Usage: "Define the logging level (allowed values: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL)",
EnvVars: []string{"MSGVERIF_LOG_LEVEL"},
EnvVars: []string{"STORE_MSG_CTR_LOG_LEVEL"},
}),
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
Name: "log-encoding",
@ -66,13 +78,13 @@ var cliFlags = []cli.Flag{
Choices: []string{"console", "nocolor", "json"},
Value: &options.LogEncoding,
},
EnvVars: []string{"MSGVERIF_LOG_ENCODING"},
EnvVars: []string{"STORE_MSG_CTR_LOG_ENCODING"},
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "log-output",
Value: "stdout",
Usage: "specifies where logging output should be written (stdout, file, file:./filename.log)",
Destination: &options.LogOutput,
EnvVars: []string{"MSGVERIF_LOG_OUTPUT"},
EnvVars: []string{"STORE_MSG_CTR_LOG_OUTPUT"},
}),
}

View File

@ -8,12 +8,14 @@ import (
)
type Options struct {
LogLevel string
LogEncoding string
LogOutput string
ClusterID uint
PubSubTopics cli.StringSlice
DatabaseURL string
RetentionPolicy time.Duration
StoreNodes []multiaddr.Multiaddr
LogLevel string
LogEncoding string
LogOutput string
ClusterID uint
PubSubTopics cli.StringSlice
DatabaseURL string
RetentionPolicy time.Duration
StoreNodes []multiaddr.Multiaddr
DNSDiscoveryNameserver string
DNSDiscoveryURLs cli.StringSlice
}

View File

@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
@ -238,7 +239,7 @@ func (d *DBStore) UpdateTopicSyncState(tx *sql.Tx, clusterID uint, topic string,
return stmt.Close()
}
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []string, status string) error {
func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash, clusterID uint, topic string, timestamp uint64, storenodes []peer.AddrInfo, status string) error {
if len(storenodes) == 0 {
return nil
}
@ -251,7 +252,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
now := time.Now().UnixNano()
for _, s := range storenodes {
_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, s, status, now)
_, err := stmt.Exec(uuid, clusterID, topic, msgHash.String(), timestamp, s.Addrs[0].String(), status, now)
if err != nil {
return err
}
@ -260,7 +261,7 @@ func (d *DBStore) RecordMessage(uuid string, tx *sql.Tx, msgHash pb.MessageHash,
return nil
}
func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) error {
func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode peer.AddrInfo) error {
stmt, err := d.db.Prepare("INSERT INTO storeNodeUnavailable(runId, storenode, requestTime) VALUES ($1, $2, $3) ON CONFLICT DO NOTHING")
if err != nil {
return err
@ -268,7 +269,7 @@ func (d *DBStore) RecordStorenodeUnavailable(uuid string, storenode string) erro
defer stmt.Close()
now := time.Now().UnixNano()
_, err = stmt.Exec(uuid, storenode, now)
_, err = stmt.Exec(uuid, storenode.Addrs[0].String(), now)
if err != nil {
return err
}