mirror of
https://github.com/logos-messaging/message-finder.git
synced 2026-01-02 14:03:11 +00:00
initial commit
This commit is contained in:
commit
a28415b70c
6
Makefile
Normal file
6
Makefile
Normal file
@ -0,0 +1,6 @@
|
||||
.PHONY: all build
|
||||
|
||||
tests:
|
||||
go test ./... -count 1 -v
|
||||
|
||||
all: tests
|
||||
17
README.md
Normal file
17
README.md
Normal file
@ -0,0 +1,17 @@
|
||||
# Message Finder
|
||||
Retrieve WakuMessages from Storenodes
|
||||
|
||||
|
||||
### How to use
|
||||
|
||||
1. Edit `main_test.go`. Setup the following variables if neccesary
|
||||
- `nodeList`: set the list of storenode multiaddresses
|
||||
- `clusterID`: the cluster id used by the storenodes (this was set with the `--cluster-id` flag when running the store node)
|
||||
- `pubsubTopic`: pubsub topic on which the message was published. In the status app, `"/waku/2/default-waku/proto"` is used in `status.prod` fleet and `"/waku/2/rs/16/32"` in `shards.test` fleet
|
||||
- `contentTopics`: array of strings with content topics. In the status app use the following format `"/waku/1/0xaabbccdd/rfc26"`
|
||||
- `startTime`: unix timestamp in nanoseconds
|
||||
- `endTime`: unix timestamp in nanoseconds
|
||||
- `envelopeHash`: the hash of the message to find. This is optional. (Use `0x` to not search for an specific message)
|
||||
2. Execute `make`
|
||||
|
||||
The program will attempt to retrieve the messages that match the criteria described in the previous variables, and print basic details about them, as well as use any query cursor returned to retrieve more pages of results.
|
||||
137
go.mod
Normal file
137
go.mod
Normal file
@ -0,0 +1,137 @@
|
||||
module example.com/m
|
||||
|
||||
go 1.19
|
||||
|
||||
replace github.com/ethereum/go-ethereum v1.10.26 => github.com/status-im/go-ethereum v1.10.25-status.4
|
||||
|
||||
//replace github.com/waku-org/go-waku v0.7.0 => /home/richard/waku-org/go-waku
|
||||
|
||||
require (
|
||||
github.com/ethereum/go-ethereum v1.10.26
|
||||
github.com/libp2p/go-libp2p v0.32.2
|
||||
github.com/multiformats/go-multiaddr v0.12.0
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240115144202-0723ff928263
|
||||
go.uber.org/zap v1.26.0
|
||||
google.golang.org/protobuf v1.31.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
|
||||
github.com/avast/retry-go/v4 v4.5.1 // indirect
|
||||
github.com/beevik/ntp v0.3.0 // indirect
|
||||
github.com/benbjohnson/clock v1.3.5 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/btcsuite/btcd v0.20.1-beta // indirect
|
||||
github.com/btcsuite/btcd/btcec/v2 v2.2.1 // indirect
|
||||
github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/containerd/cgroups v1.1.0 // indirect
|
||||
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
|
||||
github.com/cruxic/go-hmac-drbg v0.0.0-20170206035330-84c46983886d // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
|
||||
github.com/deckarep/golang-set v1.8.0 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
github.com/elastic/gosigar v0.14.2 // indirect
|
||||
github.com/flynn/noise v1.0.0 // indirect
|
||||
github.com/francoispqt/gojay v1.2.13 // indirect
|
||||
github.com/go-ole/go-ole v1.2.1 // indirect
|
||||
github.com/go-stack/stack v1.8.1 // indirect
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
||||
github.com/godbus/dbus/v5 v5.1.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang-jwt/jwt/v4 v4.4.2 // indirect
|
||||
github.com/golang-migrate/migrate/v4 v4.16.2 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/golang/snappy v0.0.4 // indirect
|
||||
github.com/google/gopacket v1.1.19 // indirect
|
||||
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect
|
||||
github.com/hashicorp/golang-lru/v2 v2.0.5 // indirect
|
||||
github.com/huin/goupnp v1.3.0 // indirect
|
||||
github.com/ipfs/go-cid v0.4.1 // indirect
|
||||
github.com/ipfs/go-log/v2 v2.5.1 // indirect
|
||||
github.com/jackpal/go-nat-pmp v1.0.2 // indirect
|
||||
github.com/jbenet/go-temp-err-catcher v0.1.0 // indirect
|
||||
github.com/klauspost/compress v1.17.2 // indirect
|
||||
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
|
||||
github.com/koron/go-ssdp v0.0.4 // indirect
|
||||
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
|
||||
github.com/libp2p/go-cidranger v1.1.0 // indirect
|
||||
github.com/libp2p/go-flow-metrics v0.1.0 // indirect
|
||||
github.com/libp2p/go-libp2p-asn-util v0.3.0 // indirect
|
||||
github.com/libp2p/go-libp2p-mplex v0.9.0 // indirect
|
||||
github.com/libp2p/go-libp2p-pubsub v0.10.0 // indirect
|
||||
github.com/libp2p/go-mplex v0.7.0 // indirect
|
||||
github.com/libp2p/go-msgio v0.3.0 // indirect
|
||||
github.com/libp2p/go-nat v0.2.0 // indirect
|
||||
github.com/libp2p/go-netroute v0.2.1 // indirect
|
||||
github.com/libp2p/go-reuseport v0.4.0 // indirect
|
||||
github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
|
||||
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
|
||||
github.com/miekg/dns v1.1.56 // indirect
|
||||
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect
|
||||
github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect
|
||||
github.com/minio/sha256-simd v1.0.1 // indirect
|
||||
github.com/mr-tron/base58 v1.2.0 // indirect
|
||||
github.com/multiformats/go-base32 v0.1.0 // indirect
|
||||
github.com/multiformats/go-base36 v0.2.0 // indirect
|
||||
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
|
||||
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
|
||||
github.com/multiformats/go-multibase v0.2.0 // indirect
|
||||
github.com/multiformats/go-multicodec v0.9.0 // indirect
|
||||
github.com/multiformats/go-multihash v0.2.3 // indirect
|
||||
github.com/multiformats/go-multistream v0.5.0 // indirect
|
||||
github.com/multiformats/go-varint v0.0.7 // indirect
|
||||
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
|
||||
github.com/opencontainers/runtime-spec v1.1.0 // indirect
|
||||
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/prometheus/client_golang v1.14.0 // indirect
|
||||
github.com/prometheus/client_model v0.4.0 // indirect
|
||||
github.com/prometheus/common v0.42.0 // indirect
|
||||
github.com/prometheus/procfs v0.9.0 // indirect
|
||||
github.com/quic-go/qpack v0.4.0 // indirect
|
||||
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
|
||||
github.com/quic-go/quic-go v0.39.4 // indirect
|
||||
github.com/quic-go/webtransport-go v0.6.0 // indirect
|
||||
github.com/raulk/go-watchdog v1.3.0 // indirect
|
||||
github.com/rjeczalik/notify v0.9.3 // indirect
|
||||
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/status-im/status-go/extkeys v1.1.2 // indirect
|
||||
github.com/syndtr/goleveldb v1.0.1-0.20220614013038-64ee5596c38a // indirect
|
||||
github.com/tklauser/go-sysconf v0.3.5 // indirect
|
||||
github.com/tklauser/numcpus v0.2.2 // indirect
|
||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98 // indirect
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 // indirect
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 // indirect
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b // indirect
|
||||
github.com/waku-org/go-zerokit-rln-arm v0.0.0-20230916171929-1dd9494ff065 // indirect
|
||||
github.com/waku-org/go-zerokit-rln-x86_64 v0.0.0-20230916171518-2a77c3734dd1 // indirect
|
||||
github.com/wk8/go-ordered-map v1.0.0 // indirect
|
||||
go.uber.org/dig v1.17.1 // indirect
|
||||
go.uber.org/fx v1.20.1 // indirect
|
||||
go.uber.org/mock v0.3.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/crypto v0.14.0 // indirect
|
||||
golang.org/x/exp v0.0.0-20231006140011-7918f672742d // indirect
|
||||
golang.org/x/mod v0.13.0 // indirect
|
||||
golang.org/x/net v0.17.0 // indirect
|
||||
golang.org/x/sync v0.4.0 // indirect
|
||||
golang.org/x/sys v0.13.0 // indirect
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
golang.org/x/time v0.0.0-20220922220347-f3bd1da661af // indirect
|
||||
golang.org/x/tools v0.14.0 // indirect
|
||||
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
lukechampine.com/blake3 v1.2.1 // indirect
|
||||
)
|
||||
52
main_test.go
Normal file
52
main_test.go
Normal file
@ -0,0 +1,52 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
)
|
||||
|
||||
// If using vscode, go to Preferences > Settings, and edit Go: Test Timeout to at least 60s
|
||||
|
||||
// List of store nodes
|
||||
var nodeList = []string{
|
||||
"/dns4/store-01.do-ams3.shards.test.statusim.net/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT",
|
||||
"/dns4/store-02.do-ams3.shards.test.statusim.net/tcp/30303/p2p/16Uiu2HAm9aDJPkhGxc2SFcEACTFdZ91Q5TJjp76qZEhq9iF59x7R",
|
||||
"/dns4/store-01.gc-us-central1-a.shards.test.statusim.net/tcp/30303/p2p/16Uiu2HAmMELCo218hncCtTvC2Dwbej3rbyHQcR8erXNnKGei7WPZ",
|
||||
"/dns4/store-02.gc-us-central1-a.shards.test.statusim.net/tcp/30303/p2p/16Uiu2HAmJnVR7ZzFaYvciPVafUXuYGLHPzSUigqAmeNw9nJUVGeM",
|
||||
"/dns4/store-01.ac-cn-hongkong-c.shards.test.statusim.net/tcp/30303/p2p/16Uiu2HAm2M7xs7cLPc3jamawkEqbr7cUJX11uvY7LxQ6WFUdUKUT",
|
||||
"/dns4/store-02.ac-cn-hongkong-c.shards.test.statusim.net/tcp/30303/p2p/16Uiu2HAm9CQhsuwPR54q27kNj9iaQVfyRzTGKrhFmr94oD8ujU6P",
|
||||
}
|
||||
|
||||
var clusterID uint16 = 16 // shards.test and status.prod = 16
|
||||
|
||||
// CRITERIA --------------------------------------------------------------------------
|
||||
var pubsubTopic = "/waku/2/rs/16/32" // "/waku/2/default-waku/proto" in status.prod and "/waku/2/rs/16/32" in shards.test
|
||||
var contentTopics = []string{} // []string{"/waku/1/0xaabbccdd/rfc26"}
|
||||
var startTime = time.Now().Add(-20 * time.Minute) // time.Unix(0, 1705486902684656000).Add(-60 * time.Second)
|
||||
var endTime = time.Now() // time.Unix(0, 1705486902684656000).Add(60 * time.Second)
|
||||
var envelopeHash = "0x" // Use "0x" to find all messages that match the pubsub topic, content topic and start/end time
|
||||
|
||||
func (s *StoreSuite) TestFindMessage() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
addNodes(ctx, s.node)
|
||||
hash, err := hexutil.Decode(envelopeHash)
|
||||
if err != nil {
|
||||
panic("invalid envelope hash id")
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for _, addr := range nodeList {
|
||||
wg.Add(1)
|
||||
func(addr string) {
|
||||
defer wg.Done()
|
||||
_, err := queryNode(ctx, s.node, addr, pubsubTopic, contentTopics, startTime, endTime, hash)
|
||||
s.NoError(err)
|
||||
}(addr)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
37
setup_test.go
Normal file
37
setup_test.go
Normal file
@ -0,0 +1,37 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
)
|
||||
|
||||
func TestStoreSuite(t *testing.T) {
|
||||
suite.Run(t, new(StoreSuite))
|
||||
}
|
||||
|
||||
type StoreSuite struct {
|
||||
suite.Suite
|
||||
node *node.WakuNode
|
||||
}
|
||||
|
||||
func (s *StoreSuite) SetupSuite() {
|
||||
wakuNode, err := node.New(
|
||||
node.WithNTP(),
|
||||
node.WithWakuRelayAndMinPeers(1),
|
||||
node.WithClusterID(clusterID),
|
||||
)
|
||||
|
||||
s.NoError(err)
|
||||
|
||||
err = wakuNode.Start(context.Background())
|
||||
s.NoError(err)
|
||||
|
||||
s.node = wakuNode
|
||||
}
|
||||
|
||||
func (s *StoreSuite) TearDownSuite() {
|
||||
s.node.Stop()
|
||||
}
|
||||
93
utils_test.go
Normal file
93
utils_test.go
Normal file
@ -0,0 +1,93 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
var log = utils.Logger().Named("message-finder")
|
||||
|
||||
func addNodes(ctx context.Context, node *node.WakuNode) {
|
||||
for _, addr := range nodeList {
|
||||
ma, err := multiaddr.NewMultiaddr(addr)
|
||||
if err != nil {
|
||||
log.Error("invalid multiaddress", zap.Error(err), zap.String("addr", addr))
|
||||
continue
|
||||
}
|
||||
|
||||
_ = ma
|
||||
|
||||
_, err = node.AddPeer(ma, peerstore.Static, []string{string(store.StoreID_v20beta4)})
|
||||
if err != nil {
|
||||
log.Error("could not add peer", zap.Error(err), zap.Stringer("addr", ma))
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopic string, contentTopics []string, startTime time.Time, endTime time.Time, envelopeHash []byte) (int, error) {
|
||||
p, err := multiaddr.NewMultiaddr(addr)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
info, err := peer.AddrInfoFromP2pAddr(p)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
cnt := 0
|
||||
cursorIterations := 0
|
||||
|
||||
result, err := node.Store().Query(ctx, store.Query{
|
||||
PubsubTopic: pubsubTopic,
|
||||
ContentTopics: contentTopics,
|
||||
StartTime: proto.Int64(startTime.UnixNano()),
|
||||
EndTime: proto.Int64(endTime.UnixNano()),
|
||||
}, store.WithPeer(info.ID), store.WithPaging(false, 20), store.WithRequestID([]byte{1, 2, 3, 4, 5, 6, 7, 8}))
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
for {
|
||||
hasNext, err := result.Next(ctx)
|
||||
if err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if !hasNext { // No more messages available
|
||||
break
|
||||
}
|
||||
cursorIterations += 1
|
||||
|
||||
// uncomment to find message by ID
|
||||
for _, m := range result.GetMessages() {
|
||||
if len(envelopeHash) != 0 && bytes.Equal(m.Hash(pubsubTopic), envelopeHash) {
|
||||
log.Info("⚠️⚠️⚠️ MESSAGE FOUND ⚠️⚠️⚠️", logging.HexBytes("envelopeHash", envelopeHash), logging.HostID("storeNode", info.ID))
|
||||
return 0, nil
|
||||
} else {
|
||||
log.Info("", zap.String("envelopeHash", hex.EncodeToString(m.Hash(pubsubTopic))), zap.String("contentTopic", m.ContentTopic), zap.String("timestamp", fmt.Sprintf("%d", m.GetTimestamp())), logging.HostID("storeNode", info.ID), zap.Int("page", cursorIterations))
|
||||
}
|
||||
}
|
||||
|
||||
cnt += len(result.GetMessages())
|
||||
}
|
||||
|
||||
log.Info(fmt.Sprintf("%d messages found in %s (Used cursor %d times)\n", cnt, info.ID, cursorIterations))
|
||||
|
||||
return cnt, nil
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user