// Mirror of https://github.com/logos-messaging/logos-messaging-test-query.git
// Synced 2026-01-02 14:03:10 +00:00 (272 lines, 7.2 KiB, Go).

package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/crypto"
|
|
"github.com/libp2p/go-libp2p"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
cli "github.com/urfave/cli/v2"
|
|
"github.com/urfave/cli/v2/altsrc"
|
|
"github.com/waku-org/go-waku/logging"
|
|
"github.com/waku-org/go-waku/waku/cliutils"
|
|
"github.com/waku-org/go-waku/waku/v2/node"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
|
"github.com/waku-org/go-waku/waku/v2/utils"
|
|
"go.uber.org/zap"
|
|
"go.uber.org/zap/zapcore"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
type Options struct {
|
|
NodeKey *ecdsa.PrivateKey
|
|
ClusterID uint
|
|
PubSubTopic string
|
|
ContentTopics cli.StringSlice
|
|
StartTime int64
|
|
EndTime int64
|
|
PageSize uint64
|
|
StoreNode *multiaddr.Multiaddr
|
|
UseLegacy bool
|
|
QueryTimeout time.Duration
|
|
LogLevel string
|
|
LogEncoding string
|
|
LogOutput string
|
|
}
|
|
|
|
// options holds the tool's configuration. The flag definitions write into it
// through their Destination/Value pointers, and main passes it to Execute.
var options Options
|
|
|
|
var flags []cli.Flag = []cli.Flag{
|
|
&cli.StringFlag{Name: "config-file", Usage: "loads configuration from a TOML file (cmd-line parameters take precedence)"},
|
|
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
|
|
Name: "nodekey",
|
|
Usage: "P2P node private key as hex.",
|
|
Value: &cliutils.PrivateKeyValue{
|
|
Value: &options.NodeKey,
|
|
},
|
|
}),
|
|
altsrc.NewUintFlag(&cli.UintFlag{
|
|
Name: "cluster-id",
|
|
Value: 0,
|
|
Usage: "Cluster id that the node is running in. Node in a different cluster id is disconnected.",
|
|
Destination: &options.ClusterID,
|
|
}),
|
|
altsrc.NewStringFlag(&cli.StringFlag{
|
|
Name: "pubsub-topic",
|
|
Usage: "Query pubsub topic.",
|
|
Destination: &options.PubSubTopic,
|
|
Required: true,
|
|
}),
|
|
altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
|
|
Name: "content-topic",
|
|
Usage: "Query content topic. Argument may be repeated.",
|
|
Destination: &options.ContentTopics,
|
|
}),
|
|
altsrc.NewInt64Flag(&cli.Int64Flag{
|
|
Name: "start-time",
|
|
Usage: "Query start time in nanoseconds",
|
|
Destination: &options.StartTime,
|
|
}),
|
|
altsrc.NewInt64Flag(&cli.Int64Flag{
|
|
Name: "end-time",
|
|
Usage: "Query end time in nanoseconds",
|
|
Destination: &options.EndTime,
|
|
}),
|
|
altsrc.NewUint64Flag(&cli.Uint64Flag{
|
|
Name: "pagesize",
|
|
Value: 20,
|
|
Usage: "Pagesize",
|
|
Destination: &options.PageSize,
|
|
}),
|
|
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
|
|
Name: "storenode",
|
|
Usage: "Multiaddr of a peer that supports store protocol",
|
|
Value: &cliutils.MultiaddrValue{
|
|
Value: &options.StoreNode,
|
|
},
|
|
Required: true,
|
|
}),
|
|
altsrc.NewBoolFlag(&cli.BoolFlag{
|
|
Name: "use-legacy",
|
|
Usage: "Use legacy store",
|
|
Destination: &options.UseLegacy,
|
|
}),
|
|
altsrc.NewDurationFlag(&cli.DurationFlag{
|
|
Name: "timeout",
|
|
Usage: "timeout for each individual store query request",
|
|
Destination: &options.QueryTimeout,
|
|
Value: 1 * time.Minute,
|
|
}),
|
|
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
|
|
Name: "log-level",
|
|
Aliases: []string{"l"},
|
|
Value: &cliutils.ChoiceValue{
|
|
Choices: []string{"DEBUG", "INFO", "WARN", "ERROR", "DPANIC", "PANIC", "FATAL"},
|
|
Value: &options.LogLevel,
|
|
},
|
|
Usage: "Define the logging level (allowed values: DEBUG, INFO, WARN, ERROR, DPANIC, PANIC, FATAL)",
|
|
}),
|
|
cliutils.NewGenericFlagSingleValue(&cli.GenericFlag{
|
|
Name: "log-encoding",
|
|
Usage: "Define the encoding used for the logs (allowed values: console, nocolor, json)",
|
|
Value: &cliutils.ChoiceValue{
|
|
Choices: []string{"console", "nocolor", "json"},
|
|
Value: &options.LogEncoding,
|
|
},
|
|
}),
|
|
altsrc.NewStringFlag(&cli.StringFlag{
|
|
Name: "log-output",
|
|
Value: "stdout",
|
|
Usage: "specifies where logging output should be written (stdout, file, file:./filename.log)",
|
|
Destination: &options.LogOutput,
|
|
}),
|
|
}
|
|
|
|
func main() {
|
|
// Defaults
|
|
options.LogLevel = "INFO"
|
|
options.LogEncoding = "console"
|
|
|
|
app := &cli.App{
|
|
Name: "query",
|
|
Version: "0.0.1",
|
|
Before: altsrc.InitInputSourceWithContext(flags, altsrc.NewTomlSourceFromFlagFunc("config-file")),
|
|
Flags: flags,
|
|
Action: func(c *cli.Context) error {
|
|
err := Execute(c.Context, options)
|
|
if err != nil {
|
|
return cli.Exit(err.Error(), 1)
|
|
}
|
|
return nil
|
|
},
|
|
}
|
|
|
|
err := app.Run(os.Args)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func Execute(ctx context.Context, opts Options) error {
|
|
utils.InitLogger(options.LogEncoding, options.LogOutput, "query", zapcore.DebugLevel)
|
|
|
|
var prvKey *ecdsa.PrivateKey
|
|
var err error
|
|
|
|
if options.NodeKey != nil {
|
|
prvKey = options.NodeKey
|
|
} else {
|
|
if prvKey, err = crypto.GenerateKey(); err != nil {
|
|
return fmt.Errorf("error generating key: %w", err)
|
|
}
|
|
}
|
|
|
|
p2pPrvKey := utils.EcdsaPrivKeyToSecp256k1PrivKey(prvKey)
|
|
id, err := peer.IDFromPublicKey(p2pPrvKey.GetPublic())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
logger := utils.Logger().With(logging.HostID("node", id))
|
|
|
|
lvl, err := zapcore.ParseLevel(options.LogLevel)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
libp2pOpts := append(node.DefaultLibP2POptions, libp2p.NATPortMap()) // Attempt to open ports using uPNP for NATed hosts.)
|
|
|
|
wakuNode, err := node.New(
|
|
node.WithLogger(logger),
|
|
node.WithLogLevel(lvl),
|
|
node.WithPrivateKey(prvKey),
|
|
node.WithClusterID(uint16(options.ClusterID)),
|
|
node.WithShards([]uint16{32, 64}),
|
|
node.WithNTP(),
|
|
node.WithLibP2POptions(libp2pOpts...),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("could not instantiate waku: %w", err)
|
|
}
|
|
|
|
if err = wakuNode.Start(ctx); err != nil {
|
|
return err
|
|
}
|
|
defer wakuNode.Stop()
|
|
|
|
cnt := 0
|
|
|
|
if !options.UseLegacy {
|
|
criteria := store.FilterCriteria{
|
|
ContentFilter: protocol.NewContentFilter(options.PubSubTopic, options.ContentTopics.Value()...),
|
|
TimeStart: proto.Int64(options.StartTime),
|
|
TimeEnd: proto.Int64(options.EndTime),
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
|
|
result, err := wakuNode.Store().Query(ctx, criteria,
|
|
store.WithPeerAddr(*options.StoreNode),
|
|
store.WithPaging(false, options.PageSize),
|
|
store.IncludeData(false),
|
|
)
|
|
cancel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for !result.IsComplete() {
|
|
cnt += len(result.Messages())
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
|
|
err := result.Next(ctx)
|
|
cancel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
} else {
|
|
query := legacy_store.Query{
|
|
PubsubTopic: options.PubSubTopic,
|
|
ContentTopics: options.ContentTopics.Value(),
|
|
StartTime: proto.Int64(options.StartTime),
|
|
EndTime: proto.Int64(options.EndTime),
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
|
|
result, err := wakuNode.LegacyStore().Query(ctx, query,
|
|
legacy_store.WithPeerAddr(*options.StoreNode),
|
|
legacy_store.WithPaging(false, 20),
|
|
)
|
|
cancel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
ctx, cancel := context.WithTimeout(context.Background(), options.QueryTimeout)
|
|
hasNext, err := result.Next(ctx)
|
|
cancel()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !hasNext { // No more messages available
|
|
break
|
|
}
|
|
|
|
cnt += len(result.GetMessages())
|
|
}
|
|
}
|
|
|
|
logger.Info("TOTAL MESSAGES RETRIEVED", zap.Int("num", cnt))
|
|
|
|
return nil
|
|
}
|