272 lines
7.2 KiB
Go
Raw Permalink Normal View History

2024-06-06 19:29:09 -04:00
package main
import (
"context"
"crypto/ecdsa"
"fmt"
"os"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
cli "github.com/urfave/cli/v2"
"github.com/urfave/cli/v2/altsrc"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/cliutils"
"github.com/waku-org/go-waku/waku/v2/node"
2024-06-06 19:43:27 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol"
2024-06-06 19:29:09 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
2024-06-06 19:43:27 -04:00
"github.com/waku-org/go-waku/waku/v2/protocol/store"
2024-06-06 19:29:09 -04:00
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/protobuf/proto"
)
type Options struct {
NodeKey *ecdsa.PrivateKey
ClusterID uint
PubSubTopic string
ContentTopics cli.StringSlice
StartTime int64
EndTime int64
2024-06-06 19:43:27 -04:00
PageSize uint64
2024-06-06 19:29:09 -04:00
StoreNode *multiaddr.Multiaddr
2024-06-06 19:43:27 -04:00
UseLegacy bool
2024-06-06 19:29:09 -04:00
QueryTimeout time.Duration
LogLevel string
LogEncoding string
LogOutput string
}
var options Options
var flags []cli.Flag = []cli.Flag{
&cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"},
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
Name: "nodekey",
Usage: "P2P node private key as hex.",
Value: &cliutils.PrivateKeyValue{
Value: &options.NodeKey,
},
}),
altsrc.NewUintFlag(&cli.UintFlag{
Name: "cluster-id",
Value: 0,
Usage: "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
Destination: &options.ClusterID,
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "pubsub-topic",
Usage: "Query pubsub topic.",
Destination: &options.PubSubTopic,
Required: true,
}),
altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "content-topic",
Usage: "Query content topic. Argument may be repeated.",
Destination: &options.ContentTopics,
}),
altsrc.NewInt64Flag(&cli.Int64Flag{
Name: "start-time",
Usage: "Query start time in nanoseconds",
Destination: &options.StartTime,
}),
altsrc.NewInt64Flag(&cli.Int64Flag{
Name: "end-time",
Usage: "Query end time in nanoseconds",
Destination: &options.EndTime,
}),
2024-06-06 19:43:27 -04:00
altsrc.NewUint64Flag(&cli.Uint64Flag{
Name: "pagesize",
Value: 20,
Usage: "Pagesize",
Destination: &options.PageSize,
}),
2024-06-06 19:29:09 -04:00
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
Name: "storenode",
Usage: "Multiaddr of a peer that supports store protocol",
Value: &cliutils.MultiaddrValue{
Value: &options.StoreNode,
},
Required: true,
}),
2024-06-06 19:43:27 -04:00
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "use-legacy",
Usage: "Use legacy store",
Destination: &options.UseLegacy,
}),
2024-06-06 19:29:09 -04:00
altsrc.NewDurationFlag(&cli.DurationFlag{
Name: "timeout",
Usage: "timeout for each individual store query request",
Destination: &options.QueryTimeout,
Value: 1 * time.Minute,
}),
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
Name: "log-level",
Aliases: []string{"l"},
Value: &cliutils.ChoiceValue{
Choices: []string{"DEBUG", "INFO", "WARN", "ERROR", "DPANIC", "PANIC", "FATAL"},
Value: &options.LogLevel,
},
Usage: "Define the logging level (allowed values: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL)",
}),
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
Name: "log-encoding",
Usage: "Define the encoding used for the logs (allowed values: console, nocolor, json)",
Value: &cliutils.ChoiceValue{
Choices: []string{"console", "nocolor", "json"},
Value: &options.LogEncoding,
},
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "log-output",
Value: "stdout",
Usage: "specifies where logging output should be written (stdout, file, file:./filename.log)",
Destination: &options.LogOutput,
}),
}
func main() {
// Defaults
options.LogLevel = "INFO"
options.LogEncoding = "console"
app := &cli.App{
Name: "query",
Version: "0.0.1",
Before: altsrc.InitInputSourceWithContext(flags, altsrc.NewTomlSourceFromFlagFunc("config-file")),
Flags: flags,
Action: func(c *cli.Context) error {
err := Execute(c.Context, options)
if err != nil {
return cli.Exit(err.Error(), 1)
}
return nil
},
}
err := app.Run(os.Args)
if err != nil {
panic(err)
}
}
func Execute(ctx context.Context, opts Options) error {
2025-04-17 17:41:01 +02:00
utils.InitLogger(options.LogEncoding, options.LogOutput, "query", zapcore.DebugLevel)
2024-06-06 19:29:09 -04:00
var prvKey *ecdsa.PrivateKey
var err error
if options.NodeKey != nil {
prvKey = options.NodeKey
} else {
if prvKey, err = crypto.GenerateKey(); err != nil {
return fmt.Errorf("error generating key: %w", err)
}
}
p2pPrvKey := utils.EcdsaPrivKeyToSecp256k1PrivKey(prvKey)
id, err := peer.IDFromPublicKey(p2pPrvKey.GetPublic())
if err != nil {
return err
}
logger := utils.Logger().With(logging.HostID("node", id))
lvl, err := zapcore.ParseLevel(options.LogLevel)
if err != nil {
return err
}
libp2pOpts := append(node.DefaultLibP2POptions, libp2p.NATPortMap()) // Attempt to open ports using uPNP for NATed hosts.)
wakuNode, err := node.New(
node.WithLogger(logger),
node.WithLogLevel(lvl),
node.WithPrivateKey(prvKey),
node.WithClusterID(uint16(options.ClusterID)),
2025-04-17 17:41:01 +02:00
node.WithShards([]uint16{32, 64}),
2024-06-06 19:29:09 -04:00
node.WithNTP(),
node.WithLibP2POptions(libp2pOpts...),
)
if err != nil {
return fmt.Errorf("could not instantiate waku: %w", err)
}
if err = wakuNode.Start(ctx); err != nil {
return err
}
defer wakuNode.Stop()
cnt := 0
2024-06-06 19:43:27 -04:00
if !options.UseLegacy {
criteria := store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(options.PubSubTopic, options.ContentTopics.Value()...),
TimeStart: proto.Int64(options.StartTime),
TimeEnd: proto.Int64(options.EndTime),
}
2024-06-06 19:29:09 -04:00
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
2024-06-06 19:43:27 -04:00
result, err := wakuNode.Store().Query(ctx, criteria,
store.WithPeerAddr(*options.StoreNode),
store.WithPaging(false, options.PageSize),
store.IncludeData(false),
)
cancel()
2024-06-06 19:29:09 -04:00
if err != nil {
return err
}
2024-06-06 19:43:27 -04:00
for !result.IsComplete() {
cnt += len(result.Messages())
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
err := result.Next(ctx)
cancel()
if err != nil {
return err
}
}
} else {
query := legacy_store.Query{
PubsubTopic: options.PubSubTopic,
ContentTopics: options.ContentTopics.Value(),
StartTime: proto.Int64(options.StartTime),
EndTime: proto.Int64(options.EndTime),
}
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
result, err := wakuNode.LegacyStore().Query(ctx, query,
legacy_store.WithPeerAddr(*options.StoreNode),
legacy_store.WithPaging(false, 20),
)
cancel()
if err != nil {
return err
2024-06-06 19:29:09 -04:00
}
2024-06-06 19:43:27 -04:00
for {
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
hasNext, err := result.Next(ctx)
cancel()
if err != nil {
return err
}
if !hasNext { // No more messages available
break
}
cnt += len(result.GetMessages())
}
2024-06-06 19:29:09 -04:00
}
logger.Info("TOTAL MESSAGES RETRIEVED", zap.Int("num", cnt))
return nil
}