refactor_: start using nwaku
- some minor progress to add nwaku in status-go - nwaku.go: GetNumConnectedPeers controls when passed pubsub is empty - waku_test.go: adapt TestWakuV2Store - add missing shard.go - feat_: build nwaku with nix and use build tags to choose between go-waku and nwaku (#5896) - chore_: update nwaku - nwaku bump (#5911) - bump: nwaku - chore: add USE_NWAKU env flag - fix: build libwaku only if needed - feat: testing discovery and dialing with nwaku integration (#5940) - message publisher and sent verifier (#5966) - storenode requestor for missing message retrieval and result iterator impl (#5971) - uncomment code that would allow status-go/go-waku to compile and libwaku test to run (#5986) - supporting peer exchange with nwaku (#5983) - store queries - ping - ping storenodes using AddrInfo (#6004) - dial, drop and retrieve connected peers (#6013) - integrate on-demand DNS discovery and implement discoverAndConnectPeers (#6017) - extract libwaku calls into WakuNode struct (#6027) - async nwaku - remove nwaku process loop - receive messages via relay (#6185) - extract timeout from context - use correct port field, get free ports and uncomment some functions (#6200) - enable filter/lightpush/px and setup rate limits - add protected topics
This commit is contained in:
parent
d291204473
commit
a3bff47800
|
@ -64,7 +64,6 @@ coverage.html
|
|||
Session.vim
|
||||
.undodir/*
|
||||
/.idea/
|
||||
/.vscode/
|
||||
/cmd/*/.ethereum/
|
||||
*.iml
|
||||
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
[submodule "third_party/nwaku"]
|
||||
path = third_party/nwaku
|
||||
url = https://github.com/waku-org/nwaku
|
|
@ -8,4 +8,7 @@
|
|||
"cSpell.words": [
|
||||
"unmarshalling"
|
||||
],
|
||||
"gopls":{
|
||||
"buildFlags": ["-tags=use_nwaku,gowaku_skip_migrations,gowaku_no_rln"]
|
||||
}
|
||||
}
|
||||
|
|
49
Makefile
49
Makefile
|
@ -1,5 +1,6 @@
|
|||
.PHONY: statusgo all test clean help
|
||||
.PHONY: statusgo-android statusgo-ios
|
||||
.PHONY: build-libwaku test-libwaku clean-libwaku rebuild-libwaku
|
||||
|
||||
# Clear any GOROOT set outside of the Nix shell
|
||||
export GOROOT=
|
||||
|
@ -60,6 +61,10 @@ GIT_AUTHOR ?= $(shell git config user.email || echo $$USER)
|
|||
ENABLE_METRICS ?= true
|
||||
BUILD_TAGS ?= gowaku_no_rln
|
||||
|
||||
ifeq ($(USE_NWAKU), true)
|
||||
BUILD_TAGS += use_nwaku
|
||||
endif
|
||||
|
||||
BUILD_FLAGS ?= -ldflags="-X github.com/status-im/status-go/vendor/github.com/ethereum/go-ethereum/metrics.EnabledStr=$(ENABLE_METRICS)"
|
||||
BUILD_FLAGS_MOBILE ?=
|
||||
|
||||
|
@ -207,8 +212,19 @@ statusgo-library: ##@cross-compile Build status-go as static library for current
|
|||
@echo "Static library built:"
|
||||
@ls -la build/bin/libstatus.*
|
||||
|
||||
statusgo-shared-library: generate
|
||||
statusgo-shared-library: ##@cross-compile Build status-go as shared library for current platform
|
||||
|
||||
LIBWAKU := third_party/nwaku/build/libwaku.$(GOBIN_SHARED_LIB_EXT)
|
||||
$(LIBWAKU):
|
||||
@echo "Building libwaku"
|
||||
$(MAKE) -C third_party/nwaku update || { echo "nwaku make update failed"; exit 1; }
|
||||
$(MAKE) -C ./third_party/nwaku libwaku
|
||||
|
||||
build-libwaku: $(LIBWAKU)
|
||||
|
||||
statusgo-shared-library: generate ##@cross-compile Build status-go as shared library for current platform
|
||||
ifeq ($(USE_NWAKU),true)
|
||||
$(MAKE) $(LIBWAKU)
|
||||
endif
|
||||
## cmd/library/README.md explains the magic incantation behind this
|
||||
mkdir -p build/bin/statusgo-lib
|
||||
go run cmd/library/*.go > build/bin/statusgo-lib/main.go
|
||||
|
@ -291,9 +307,38 @@ lint-fix:
|
|||
-w {} \;
|
||||
$(MAKE) vendor
|
||||
|
||||
mock: ##@other Regenerate mocks
|
||||
mockgen -package=fake -destination=transactions/fake/mock.go -source=transactions/fake/txservice.go
|
||||
mockgen -package=status -destination=services/status/account_mock.go -source=services/status/service.go
|
||||
mockgen -package=peer -destination=services/peer/discoverer_mock.go -source=services/peer/service.go
|
||||
mockgen -package=mock_transactor -destination=transactions/mock_transactor/transactor.go -source=transactions/transactor.go
|
||||
mockgen -package=mock_pathprocessor -destination=services/wallet/router/pathprocessor/mock_pathprocessor/processor.go -source=services/wallet/router/pathprocessor/processor.go
|
||||
mockgen -package=mock_bridge -destination=services/wallet/bridge/mock_bridge/bridge.go -source=services/wallet/bridge/bridge.go
|
||||
mockgen -package=mock_client -destination=rpc/chain/mock/client/client.go -source=rpc/chain/client.go
|
||||
mockgen -package=mock_token -destination=services/wallet/token/mock/token/tokenmanager.go -source=services/wallet/token/token.go
|
||||
mockgen -package=mock_thirdparty -destination=services/wallet/thirdparty/mock/types.go -source=services/wallet/thirdparty/types.go
|
||||
mockgen -package=mock_balance_persistence -destination=services/wallet/token/mock/balance_persistence/balance_persistence.go -source=services/wallet/token/balance_persistence.go
|
||||
mockgen -package=mock_network -destination=rpc/network/mock/network.go -source=rpc/network/network.go
|
||||
mockgen -package=mock_rpcclient -destination=rpc/mock/client/client.go -source=rpc/client.go
|
||||
mockgen -package=mock_collectibles -destination=services/wallet/collectibles/mock/collection_data_db.go -source=services/wallet/collectibles/collection_data_db.go
|
||||
mockgen -package=mock_collectibles -destination=services/wallet/collectibles/mock/collectible_data_db.go -source=services/wallet/collectibles/collectible_data_db.go
|
||||
mockgen -package=mock_thirdparty -destination=services/wallet/thirdparty/mock/collectible_types.go -source=services/wallet/thirdparty/collectible_types.go
|
||||
mockgen -package=mock_paraswap -destination=services/wallet/thirdparty/paraswap/mock/types.go -source=services/wallet/thirdparty/paraswap/types.go
|
||||
mockgen -package=mock_onramp -destination=services/wallet/onramp/mock/types.go -source=services/wallet/onramp/types.go
|
||||
|
||||
|
||||
docker-test: ##@tests Run tests in a docker container with golang.
|
||||
docker run --privileged --rm -it -v "$(PWD):$(DOCKER_TEST_WORKDIR)" -w "$(DOCKER_TEST_WORKDIR)" $(DOCKER_TEST_IMAGE) go test ${ARGS}
|
||||
|
||||
test-libwaku: | $(LIBWAKU)
|
||||
go test -tags '$(BUILD_TAGS) use_nwaku' -run TestBasicWakuV2 ./wakuv2/... -count 1 -v -json | jq -r '.Output'
|
||||
|
||||
clean-libwaku:
|
||||
@echo "Removing libwaku"
|
||||
rm $(LIBWAKU)
|
||||
|
||||
rebuild-libwaku: | clean-libwaku $(LIBWAKU)
|
||||
|
||||
test: test-unit ##@tests Run basic, short tests during development
|
||||
|
||||
test-unit-prep: generate
|
||||
|
|
|
@ -24,13 +24,13 @@ import (
|
|||
"github.com/status-im/status-go/multiaccounts"
|
||||
"github.com/status-im/status-go/multiaccounts/accounts"
|
||||
"github.com/status-im/status-go/multiaccounts/settings"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
|
||||
"github.com/status-im/status-go/cmd/utils"
|
||||
"github.com/status-im/status-go/logutils"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/protocol"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/identity/alias"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
wakuextn "github.com/status-im/status-go/services/wakuext"
|
||||
|
@ -49,8 +49,8 @@ var (
|
|||
seedPhrase = flag.String("seed-phrase", "", "Seed phrase")
|
||||
version = flag.Bool("version", false, "Print version and dump configuration")
|
||||
communityID = flag.String("community-id", "", "The id of the community")
|
||||
shardCluster = flag.Int("shard-cluster", shard.MainStatusShardCluster, "The shard cluster in which the of the community is published")
|
||||
shardIndex = flag.Int("shard-index", shard.DefaultShardIndex, "The shard index in which the community is published")
|
||||
shardCluster = flag.Int("shard-cluster", wakuv2.MainStatusShardCluster, "The shard cluster in which the of the community is published")
|
||||
shardIndex = flag.Int("shard-index", wakuv2.DefaultShardIndex, "The shard index in which the community is published")
|
||||
chatID = flag.String("chat-id", "", "The id of the chat")
|
||||
|
||||
dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data")
|
||||
|
@ -152,9 +152,9 @@ func main() {
|
|||
|
||||
messenger := wakuextservice.Messenger()
|
||||
|
||||
var s *shard.Shard = nil
|
||||
var s *wakuv2.Shard = nil
|
||||
if shardCluster != nil && shardIndex != nil {
|
||||
s = &shard.Shard{
|
||||
s = &wakuv2.Shard{
|
||||
Cluster: uint16(*shardCluster),
|
||||
Index: uint16(*shardIndex),
|
||||
}
|
||||
|
|
|
@ -63,11 +63,6 @@ func (w *GethWakuWrapper) StopDiscV5() error {
|
|||
return errors.New("not available in WakuV1")
|
||||
}
|
||||
|
||||
// PeerCount function only added for compatibility with waku V2
|
||||
func (w *GethWakuWrapper) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
return "", errors.New("not available in WakuV1")
|
||||
}
|
||||
|
||||
// SubscribeToPubsubTopic function only added for compatibility with waku V2
|
||||
func (w *GethWakuWrapper) SubscribeToPubsubTopic(topic string, optPublicKey *ecdsa.PublicKey) error {
|
||||
// not available in WakuV1
|
||||
|
|
|
@ -206,10 +206,6 @@ func (w *gethWakuV2Wrapper) RemovePubsubTopicKey(topic string) error {
|
|||
return w.waku.RemovePubsubTopicKey(topic)
|
||||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
return w.waku.AddStorePeer(address)
|
||||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
return w.waku.AddRelayPeer(address)
|
||||
}
|
||||
|
@ -227,7 +223,7 @@ func (w *gethWakuV2Wrapper) DialPeerByID(peerID peer.ID) error {
|
|||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) ListenAddresses() ([]multiaddr.Multiaddr, error) {
|
||||
return w.waku.ListenAddresses(), nil
|
||||
return w.waku.ListenAddresses()
|
||||
}
|
||||
|
||||
func (w *gethWakuV2Wrapper) RelayPeersByTopic(topic string) (*types.PeerList, error) {
|
||||
|
|
|
@ -134,8 +134,6 @@ type Waku interface {
|
|||
|
||||
RemovePubsubTopicKey(topic string) error
|
||||
|
||||
AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error)
|
||||
|
||||
AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error)
|
||||
|
||||
DialPeer(address multiaddr.Multiaddr) error
|
||||
|
|
|
@ -26,7 +26,7 @@ in mkShell {
|
|||
|
||||
buildInputs = with pkgs; [
|
||||
git jq which
|
||||
go golangci-lint go-junit-report gopls go-bindata gomobileMod codecov-cli go-generate-fast
|
||||
go golangci-lint go-junit-report gopls go-bindata gomobileMod codecov-cli go-generate-fast openssl
|
||||
mockgen protobuf3_20 protoc-gen-go gotestsum go-modvendor openjdk
|
||||
] ++ lib.optionals (stdenv.isDarwin) [ xcodeWrapper ];
|
||||
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/server"
|
||||
"github.com/status-im/status-go/signal"
|
||||
"github.com/status-im/status-go/transactions"
|
||||
|
@ -338,7 +337,7 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
|
|||
Nameserver: nodeConfig.WakuV2Config.Nameserver,
|
||||
UDPPort: nodeConfig.WakuV2Config.UDPPort,
|
||||
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
|
||||
DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
|
||||
DefaultShardPubsubTopic: wakuv2.DefaultShardPubsubTopic(),
|
||||
TelemetryServerURL: nodeConfig.WakuV2Config.TelemetryServerURL,
|
||||
ClusterID: nodeConfig.ClusterConfig.ClusterID,
|
||||
EnableMissingMessageVerification: nodeConfig.WakuV2Config.EnableMissingMessageVerification,
|
||||
|
|
|
@ -23,12 +23,12 @@ import (
|
|||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/images"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
community_token "github.com/status-im/status-go/protocol/communities/token"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/requests"
|
||||
"github.com/status-im/status-go/protocol/v1"
|
||||
"github.com/status-im/status-go/server"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
const signatureLength = 65
|
||||
|
@ -55,7 +55,7 @@ type Config struct {
|
|||
RequestsToJoin []*RequestToJoin
|
||||
MemberIdentity *ecdsa.PrivateKey
|
||||
EventsData *EventsData
|
||||
Shard *shard.Shard
|
||||
Shard *wakuv2.Shard
|
||||
PubsubTopicPrivateKey *ecdsa.PrivateKey
|
||||
LastOpenedAt int64
|
||||
}
|
||||
|
@ -172,7 +172,7 @@ func (o *Community) MarshalPublicAPIJSON() ([]byte, error) {
|
|||
ActiveMembersCount uint64 `json:"activeMembersCount"`
|
||||
PubsubTopic string `json:"pubsubTopic"`
|
||||
PubsubTopicKey string `json:"pubsubTopicKey"`
|
||||
Shard *shard.Shard `json:"shard"`
|
||||
Shard *wakuv2.Shard `json:"shard"`
|
||||
}{
|
||||
ID: o.ID(),
|
||||
Verified: o.config.Verified,
|
||||
|
@ -308,7 +308,7 @@ func (o *Community) MarshalJSON() ([]byte, error) {
|
|||
ActiveMembersCount uint64 `json:"activeMembersCount"`
|
||||
PubsubTopic string `json:"pubsubTopic"`
|
||||
PubsubTopicKey string `json:"pubsubTopicKey"`
|
||||
Shard *shard.Shard `json:"shard"`
|
||||
Shard *wakuv2.Shard `json:"shard"`
|
||||
LastOpenedAt int64 `json:"lastOpenedAt"`
|
||||
Clock uint64 `json:"clock"`
|
||||
}{
|
||||
|
@ -461,7 +461,7 @@ func (o *Community) DescriptionText() string {
|
|||
return ""
|
||||
}
|
||||
|
||||
func (o *Community) Shard() *shard.Shard {
|
||||
func (o *Community) Shard() *wakuv2.Shard {
|
||||
if o != nil && o.config != nil {
|
||||
return o.config.Shard
|
||||
}
|
||||
|
|
|
@ -31,7 +31,6 @@ import (
|
|||
multiaccountscommon "github.com/status-im/status-go/multiaccounts/common"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
community_token "github.com/status-im/status-go/protocol/communities/token"
|
||||
"github.com/status-im/status-go/protocol/encryption"
|
||||
"github.com/status-im/status-go/protocol/ens"
|
||||
|
@ -46,6 +45,7 @@ import (
|
|||
"github.com/status-im/status-go/services/wallet/token"
|
||||
"github.com/status-im/status-go/services/wallet/wallettypes"
|
||||
"github.com/status-im/status-go/signal"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
type Publisher interface {
|
||||
|
@ -768,8 +768,8 @@ func (m *Manager) All() ([]*Community, error) {
|
|||
}
|
||||
|
||||
type CommunityShard struct {
|
||||
CommunityID string `json:"communityID"`
|
||||
Shard *shard.Shard `json:"shard"`
|
||||
CommunityID string `json:"communityID"`
|
||||
Shard *wakuv2.Shard `json:"shard"`
|
||||
}
|
||||
|
||||
type CuratedCommunities struct {
|
||||
|
@ -1577,7 +1577,7 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error {
|
|||
return m.persistence.DeleteCommunitySettings(id)
|
||||
}
|
||||
|
||||
func (m *Manager) updateShard(community *Community, shard *shard.Shard, clock uint64) error {
|
||||
func (m *Manager) updateShard(community *Community, shard *wakuv2.Shard, clock uint64) error {
|
||||
community.config.Shard = shard
|
||||
if shard == nil {
|
||||
return m.persistence.DeleteCommunityShard(community.ID())
|
||||
|
@ -1586,7 +1586,7 @@ func (m *Manager) updateShard(community *Community, shard *shard.Shard, clock ui
|
|||
return m.persistence.SaveCommunityShard(community.ID(), shard, clock)
|
||||
}
|
||||
|
||||
func (m *Manager) UpdateShard(community *Community, shard *shard.Shard, clock uint64) error {
|
||||
func (m *Manager) UpdateShard(community *Community, shard *wakuv2.Shard, clock uint64) error {
|
||||
m.communityLock.Lock(community.ID())
|
||||
defer m.communityLock.Unlock(community.ID())
|
||||
|
||||
|
@ -1594,7 +1594,7 @@ func (m *Manager) UpdateShard(community *Community, shard *shard.Shard, clock ui
|
|||
}
|
||||
|
||||
// SetShard assigns a shard to a community
|
||||
func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Community, error) {
|
||||
func (m *Manager) SetShard(communityID types.HexBytes, shard *wakuv2.Shard) (*Community, error) {
|
||||
m.communityLock.Lock(communityID)
|
||||
defer m.communityLock.Unlock(communityID)
|
||||
|
||||
|
@ -2207,11 +2207,11 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var cShard *shard.Shard
|
||||
var cShard *wakuv2.Shard
|
||||
if communityShard == nil {
|
||||
cShard = &shard.Shard{Cluster: shard.MainStatusShardCluster, Index: shard.DefaultShardIndex}
|
||||
cShard = &wakuv2.Shard{Cluster: wakuv2.MainStatusShardCluster, Index: wakuv2.DefaultShardIndex}
|
||||
} else {
|
||||
cShard = shard.FromProtobuff(communityShard)
|
||||
cShard = wakuv2.FromProtobuff(communityShard)
|
||||
}
|
||||
config := Config{
|
||||
CommunityDescription: processedDescription,
|
||||
|
@ -3996,11 +3996,11 @@ func (m *Manager) GetByIDString(idString string) (*Community, error) {
|
|||
return m.GetByID(id)
|
||||
}
|
||||
|
||||
func (m *Manager) GetCommunityShard(communityID types.HexBytes) (*shard.Shard, error) {
|
||||
func (m *Manager) GetCommunityShard(communityID types.HexBytes) (*wakuv2.Shard, error) {
|
||||
return m.persistence.GetCommunityShard(communityID)
|
||||
}
|
||||
|
||||
func (m *Manager) SaveCommunityShard(communityID types.HexBytes, shard *shard.Shard, clock uint64) error {
|
||||
func (m *Manager) SaveCommunityShard(communityID types.HexBytes, shard *wakuv2.Shard, clock uint64) error {
|
||||
m.communityLock.Lock(communityID)
|
||||
defer m.communityLock.Unlock(communityID)
|
||||
|
||||
|
|
|
@ -16,11 +16,11 @@ import (
|
|||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities/token"
|
||||
"github.com/status-im/status-go/protocol/encryption"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/services/wallet/bigint"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
type Persistence struct {
|
||||
|
@ -1766,7 +1766,7 @@ func (p *Persistence) AllNonApprovedCommunitiesRequestsToJoin() ([]*RequestToJoi
|
|||
return nonApprovedRequestsToJoin, nil
|
||||
}
|
||||
|
||||
func (p *Persistence) SaveCommunityShard(communityID types.HexBytes, shard *shard.Shard, clock uint64) error {
|
||||
func (p *Persistence) SaveCommunityShard(communityID types.HexBytes, shard *wakuv2.Shard, clock uint64) error {
|
||||
var cluster, index *uint16
|
||||
|
||||
if shard != nil {
|
||||
|
@ -1801,7 +1801,7 @@ func (p *Persistence) SaveCommunityShard(communityID types.HexBytes, shard *shar
|
|||
}
|
||||
|
||||
// if data will not be found, will return sql.ErrNoRows. Must be handled on the caller side
|
||||
func (p *Persistence) GetCommunityShard(communityID types.HexBytes) (*shard.Shard, error) {
|
||||
func (p *Persistence) GetCommunityShard(communityID types.HexBytes) (*wakuv2.Shard, error) {
|
||||
var cluster sql.NullInt64
|
||||
var index sql.NullInt64
|
||||
err := p.db.QueryRow(`SELECT shard_cluster, shard_index FROM communities_shards WHERE community_id = ?`,
|
||||
|
@ -1815,7 +1815,7 @@ func (p *Persistence) GetCommunityShard(communityID types.HexBytes) (*shard.Shar
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return &shard.Shard{
|
||||
return &wakuv2.Shard{
|
||||
Cluster: uint16(cluster.Int64),
|
||||
Index: uint16(index.Int64),
|
||||
}, nil
|
||||
|
|
|
@ -7,8 +7,8 @@ import (
|
|||
|
||||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/server"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
func communityToRecord(community *Community) (*CommunityRecord, error) {
|
||||
|
@ -118,9 +118,9 @@ func recordBundleToCommunity(
|
|||
}
|
||||
}
|
||||
|
||||
var s *shard.Shard = nil
|
||||
var s *wakuv2.Shard = nil
|
||||
if r.community.shardCluster != nil && r.community.shardIndex != nil {
|
||||
s = &shard.Shard{
|
||||
s = &wakuv2.Shard{
|
||||
Cluster: uint16(*r.community.shardCluster),
|
||||
Index: uint16(*r.community.shardIndex),
|
||||
}
|
||||
|
|
|
@ -15,13 +15,13 @@ import (
|
|||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities/token"
|
||||
"github.com/status-im/status-go/protocol/encryption"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/sqlite"
|
||||
"github.com/status-im/status-go/services/wallet/bigint"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
func TestPersistenceSuite(t *testing.T) {
|
||||
|
@ -787,7 +787,7 @@ func (s *PersistenceSuite) TestSaveShardInfo() {
|
|||
s.Require().Nil(resultShard)
|
||||
|
||||
// not nil shard
|
||||
expectedShard := &shard.Shard{
|
||||
expectedShard := &wakuv2.Shard{
|
||||
Cluster: 1,
|
||||
Index: 2,
|
||||
}
|
||||
|
|
|
@ -25,13 +25,13 @@ import (
|
|||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/requests"
|
||||
"github.com/status-im/status-go/protocol/transport"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/services/wallet/thirdparty"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
const testChainID1 = 1
|
||||
|
@ -488,11 +488,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions(
|
|||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("store-node-waku"),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
clusterID: wakuv2.MainStatusShardCluster,
|
||||
}
|
||||
wakuStoreNode := NewTestWakuV2(&s.Suite, cfg)
|
||||
|
||||
storeNodeListenAddresses := wakuStoreNode.ListenAddresses()
|
||||
storeNodeListenAddresses, err := wakuStoreNode.ListenAddresses()
|
||||
s.Require().NoError(err)
|
||||
s.Require().LessOrEqual(1, len(storeNodeListenAddresses))
|
||||
|
||||
storeNodeAddress := storeNodeListenAddresses[0]
|
||||
|
|
|
@ -8,8 +8,8 @@ import (
|
|||
"github.com/status-im/status-go/api/multiformat"
|
||||
"github.com/status-im/status-go/images"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
type StatusUnfurler struct {
|
||||
|
@ -83,7 +83,7 @@ func (u *StatusUnfurler) buildContactData(publicKey string) (*common.StatusConta
|
|||
return c, nil
|
||||
}
|
||||
|
||||
func (u *StatusUnfurler) buildCommunityData(communityID string, shard *shard.Shard) (*communities.Community, *common.StatusCommunityLinkPreview, error) {
|
||||
func (u *StatusUnfurler) buildCommunityData(communityID string, shard *wakuv2.Shard) (*communities.Community, *common.StatusCommunityLinkPreview, error) {
|
||||
// This automatically checks the database
|
||||
community, err := u.m.FetchCommunity(&FetchCommunityRequest{
|
||||
CommunityKey: communityID,
|
||||
|
@ -108,7 +108,7 @@ func (u *StatusUnfurler) buildCommunityData(communityID string, shard *shard.Sha
|
|||
return community, statusCommunityLinkPreviews, nil
|
||||
}
|
||||
|
||||
func (u *StatusUnfurler) buildChannelData(channelUUID string, communityID string, communityShard *shard.Shard) (*common.StatusCommunityChannelLinkPreview, error) {
|
||||
func (u *StatusUnfurler) buildChannelData(channelUUID string, communityID string, communityShard *wakuv2.Shard) (*common.StatusCommunityChannelLinkPreview, error) {
|
||||
community, communityData, err := u.buildCommunityData(communityID, communityShard)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to build channel community data: %w", err)
|
||||
|
|
|
@ -830,18 +830,11 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
|
|||
}
|
||||
response := &MessengerResponse{}
|
||||
|
||||
storenodes, err := m.AllMailservers()
|
||||
response.Mailservers, err = m.AllMailservers()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = m.setupStorenodes(storenodes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response.Mailservers = storenodes
|
||||
|
||||
m.transport.SetStorenodeConfigProvider(m)
|
||||
|
||||
if err := m.communityStorenodes.ReloadFromDB(); err != nil {
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
gocommon "github.com/status-im/status-go/common"
|
||||
utils "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
|
||||
"github.com/status-im/status-go/account"
|
||||
multiaccountscommon "github.com/status-im/status-go/multiaccounts/common"
|
||||
|
@ -34,7 +35,6 @@ import (
|
|||
"github.com/status-im/status-go/images"
|
||||
"github.com/status-im/status-go/multiaccounts/accounts"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/communities/token"
|
||||
"github.com/status-im/status-go/protocol/discord"
|
||||
|
@ -89,10 +89,10 @@ const (
|
|||
|
||||
type FetchCommunityRequest struct {
|
||||
// CommunityKey should be either a public or a private community key
|
||||
CommunityKey string `json:"communityKey"`
|
||||
Shard *shard.Shard `json:"shard"`
|
||||
TryDatabase bool `json:"tryDatabase"`
|
||||
WaitForResponse bool `json:"waitForResponse"`
|
||||
CommunityKey string `json:"communityKey"`
|
||||
Shard *wakuv2.Shard `json:"shard"`
|
||||
TryDatabase bool `json:"tryDatabase"`
|
||||
WaitForResponse bool `json:"waitForResponse"`
|
||||
}
|
||||
|
||||
func (r *FetchCommunityRequest) Validate() error {
|
||||
|
@ -346,7 +346,7 @@ func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscripti
|
|||
Sender: community.PrivateKey(),
|
||||
SkipEncryptionLayer: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_USER_KICKED,
|
||||
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic(),
|
||||
}
|
||||
|
||||
_, err = m.sender.SendPrivate(context.Background(), pk, rawMessage)
|
||||
|
@ -681,7 +681,7 @@ func (m *Messenger) handleCommunitySharedAddressesRequest(state *ReceivedMessage
|
|||
CommunityID: community.ID(),
|
||||
SkipEncryptionLayer: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_SHARED_ADDRESSES_RESPONSE,
|
||||
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic(),
|
||||
ResendType: common.ResendTypeRawMessage,
|
||||
ResendMethod: common.ResendMethodSendPrivate,
|
||||
Recipients: []*ecdsa.PublicKey{signer},
|
||||
|
@ -1044,7 +1044,7 @@ func (m *Messenger) JoinCommunity(ctx context.Context, communityID types.HexByte
|
|||
return mr, nil
|
||||
}
|
||||
|
||||
func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.Shard) error {
|
||||
func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *wakuv2.Shard) error {
|
||||
if m.transport.WakuVersion() != 2 {
|
||||
return nil
|
||||
}
|
||||
|
@ -1065,7 +1065,7 @@ func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.S
|
|||
return m.transport.SubscribeToPubsubTopic(pubsubTopic, pubK)
|
||||
}
|
||||
|
||||
func (m *Messenger) unsubscribeFromShard(shard *shard.Shard) error {
|
||||
func (m *Messenger) unsubscribeFromShard(shard *wakuv2.Shard) error {
|
||||
if m.transport.WakuVersion() != 2 {
|
||||
return nil
|
||||
}
|
||||
|
@ -1494,7 +1494,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
|
|||
ResendType: common.ResendTypeRawMessage,
|
||||
SkipEncryptionLayer: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
|
||||
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic(),
|
||||
Priority: &common.HighPriority,
|
||||
}
|
||||
|
||||
|
@ -1872,7 +1872,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r
|
|||
CommunityID: community.ID(),
|
||||
SkipEncryptionLayer: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN,
|
||||
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic(),
|
||||
ResendType: common.ResendTypeRawMessage,
|
||||
Priority: &common.HighPriority,
|
||||
}
|
||||
|
@ -2028,7 +2028,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ
|
|||
CommunityID: community.ID(),
|
||||
SkipEncryptionLayer: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE,
|
||||
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic(),
|
||||
ResendType: common.ResendTypeRawMessage,
|
||||
ResendMethod: common.ResendMethodSendPrivate,
|
||||
Recipients: []*ecdsa.PublicKey{pk},
|
||||
|
@ -2503,7 +2503,7 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters
|
|||
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()},
|
||||
{ChatID: uncompressedPubKey, PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic()},
|
||||
}
|
||||
|
||||
return filters
|
||||
|
@ -3562,7 +3562,7 @@ func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message
|
|||
}
|
||||
|
||||
func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communities.Community, message *protobuf.CommunityShardKey) error {
|
||||
err := m.communitiesManager.UpdateShard(community, shard.FromProtobuff(message.Shard), message.Clock)
|
||||
err := m.communitiesManager.UpdateShard(community, wakuv2.FromProtobuff(message.Shard), message.Clock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3584,7 +3584,7 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti
|
|||
}
|
||||
|
||||
// Unsubscribing from existing shard
|
||||
if community.Shard() != nil && community.Shard() != shard.FromProtobuff(message.GetShard()) {
|
||||
if community.Shard() != nil && community.Shard() != wakuv2.FromProtobuff(message.GetShard()) {
|
||||
err := m.unsubscribeFromShard(community.Shard())
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -3598,7 +3598,7 @@ func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communiti
|
|||
return err
|
||||
}
|
||||
// Update community filters in case of change of shard
|
||||
if community.Shard() != shard.FromProtobuff(message.GetShard()) {
|
||||
if community.Shard() != wakuv2.FromProtobuff(message.GetShard()) {
|
||||
err = m.UpdateCommunityFilters(community)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -12,11 +12,11 @@ import (
|
|||
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/requests"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
func TestMessengerCommunitiesShardingSuite(t *testing.T) {
|
||||
|
@ -108,7 +108,7 @@ func (s *MessengerCommunitiesShardingSuite) TearDownTest() {
|
|||
_ = s.logger.Sync()
|
||||
}
|
||||
|
||||
func (s *MessengerCommunitiesShardingSuite) testPostToCommunityChat(shard *shard.Shard, community *communities.Community, chat *Chat) {
|
||||
func (s *MessengerCommunitiesShardingSuite) testPostToCommunityChat(shard *wakuv2.Shard, community *communities.Community, chat *Chat) {
|
||||
_, err := s.owner.SetCommunityShard(&requests.SetCommunityShard{
|
||||
CommunityID: community.ID(),
|
||||
Shard: shard,
|
||||
|
@ -144,8 +144,8 @@ func (s *MessengerCommunitiesShardingSuite) TestPostToCommunityChat() {
|
|||
|
||||
// Members should be able to receive messages in a community with sharding enabled.
|
||||
{
|
||||
shard := &shard.Shard{
|
||||
Cluster: shard.MainStatusShardCluster,
|
||||
shard := &wakuv2.Shard{
|
||||
Cluster: wakuv2.MainStatusShardCluster,
|
||||
Index: 128,
|
||||
}
|
||||
s.testPostToCommunityChat(shard, community, chat)
|
||||
|
@ -153,8 +153,8 @@ func (s *MessengerCommunitiesShardingSuite) TestPostToCommunityChat() {
|
|||
|
||||
// Members should be able to receive messages in a community where the sharding configuration has been edited.
|
||||
{
|
||||
shard := &shard.Shard{
|
||||
Cluster: shard.MainStatusShardCluster,
|
||||
shard := &wakuv2.Shard{
|
||||
Cluster: wakuv2.MainStatusShardCluster,
|
||||
Index: 256,
|
||||
}
|
||||
s.testPostToCommunityChat(shard, community, chat)
|
||||
|
@ -162,8 +162,8 @@ func (s *MessengerCommunitiesShardingSuite) TestPostToCommunityChat() {
|
|||
|
||||
// Members should continue to receive messages in a community if it is moved back to default shard.
|
||||
{
|
||||
shard := &shard.Shard{
|
||||
Cluster: shard.MainStatusShardCluster,
|
||||
shard := &wakuv2.Shard{
|
||||
Cluster: wakuv2.MainStatusShardCluster,
|
||||
Index: 32,
|
||||
}
|
||||
s.testPostToCommunityChat(shard, community, chat)
|
||||
|
@ -176,8 +176,8 @@ func (s *MessengerCommunitiesShardingSuite) TestIgnoreOutdatedShardKey() {
|
|||
advertiseCommunityToUserOldWay(&s.Suite, community, s.owner, s.alice)
|
||||
joinCommunity(&s.Suite, community.ID(), s.owner, s.alice, alicePassword, []string{aliceAddress1})
|
||||
|
||||
shard := &shard.Shard{
|
||||
Cluster: shard.MainStatusShardCluster,
|
||||
shard := &wakuv2.Shard{
|
||||
Cluster: wakuv2.MainStatusShardCluster,
|
||||
Index: 128,
|
||||
}
|
||||
|
||||
|
|
|
@ -12,11 +12,11 @@ import (
|
|||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/transport"
|
||||
v1protocol "github.com/status-im/status-go/protocol/v1"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Community) error {
|
||||
|
@ -57,7 +57,7 @@ func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Communit
|
|||
// we don't want to wrap in an encryption layer message
|
||||
SkipEncryptionLayer: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_SHARD_INFO,
|
||||
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), // it must be sent always to default shard pubsub topic
|
||||
PubsubTopic: wakuv2.DefaultNonProtectedPubsubTopic(), // it must be sent always to default shard pubsub topic
|
||||
Priority: &common.HighPriority,
|
||||
}
|
||||
|
||||
|
@ -89,7 +89,7 @@ func (m *Messenger) HandleCommunityPublicShardInfo(state *ReceivedMessageState,
|
|||
return err
|
||||
}
|
||||
|
||||
err = m.communitiesManager.SaveCommunityShard(publicShardInfo.CommunityId, shard.FromProtobuff(publicShardInfo.Shard), publicShardInfo.Clock)
|
||||
err = m.communitiesManager.SaveCommunityShard(publicShardInfo.CommunityId, wakuv2.FromProtobuff(publicShardInfo.Shard), publicShardInfo.Clock)
|
||||
if err != nil && err != communities.ErrOldShardInfo {
|
||||
logError(err)
|
||||
return err
|
||||
|
|
|
@ -12,9 +12,9 @@ import (
|
|||
|
||||
gocommon "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/deprecation"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/transport"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
// InitFilters analyzes chats and contacts in order to setup filters
|
||||
|
@ -24,7 +24,7 @@ func (m *Messenger) InitFilters() error {
|
|||
rand.Seed(time.Now().Unix())
|
||||
|
||||
// Community requests will arrive in this pubsub topic
|
||||
if err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil); err != nil {
|
||||
if err := m.SubscribeToPubsubTopic(wakuv2.DefaultNonProtectedPubsubTopic(), nil); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -4,8 +4,6 @@ import (
|
|||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
|
||||
gocommon "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/services/mailservers"
|
||||
|
@ -39,28 +37,6 @@ func (m *Messenger) AllMailservers() ([]mailservers.Mailserver, error) {
|
|||
return allMailservers, nil
|
||||
}
|
||||
|
||||
func (m *Messenger) setupStorenodes(storenodes []mailservers.Mailserver) error {
|
||||
if m.transport.WakuVersion() != 2 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, storenode := range storenodes {
|
||||
|
||||
peerInfo, err := storenode.PeerInfo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, addr := range utils.EncapsulatePeerID(peerInfo.ID, peerInfo.Addrs...) {
|
||||
_, err := m.transport.AddStorePeer(addr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) getFleet() (string, error) {
|
||||
var fleet string
|
||||
dbFleet, err := m.settings.GetFleet()
|
||||
|
|
|
@ -11,10 +11,6 @@ import (
|
|||
"github.com/status-im/status-go/eth-node/types"
|
||||
)
|
||||
|
||||
func (m *Messenger) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
return m.transport.AddStorePeer(address)
|
||||
}
|
||||
|
||||
func (m *Messenger) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
return m.transport.AddRelayPeer(address)
|
||||
}
|
||||
|
|
|
@ -15,11 +15,11 @@ import (
|
|||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/requests"
|
||||
"github.com/status-im/status-go/services/utils"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
type CommunityURLData struct {
|
||||
|
@ -49,7 +49,7 @@ type URLDataResponse struct {
|
|||
Community *CommunityURLData `json:"community"`
|
||||
Channel *CommunityChannelURLData `json:"channel"`
|
||||
Contact *ContactURLData `json:"contact"`
|
||||
Shard *shard.Shard `json:"shard,omitempty"`
|
||||
Shard *wakuv2.Shard `json:"shard,omitempty"`
|
||||
}
|
||||
|
||||
const baseShareURL = "https://status.app"
|
||||
|
@ -204,7 +204,7 @@ func parseCommunityURLWithData(data string, chatKey string) (*URLDataResponse, e
|
|||
TagIndices: tagIndices,
|
||||
CommunityID: types.EncodeHex(communityID),
|
||||
},
|
||||
Shard: shard.FromProtobuff(urlDataProto.Shard),
|
||||
Shard: wakuv2.FromProtobuff(urlDataProto.Shard),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -380,7 +380,7 @@ func parseCommunityChannelURLWithData(data string, chatKey string) (*URLDataResp
|
|||
Color: channelProto.Color,
|
||||
ChannelUUID: channelProto.Uuid,
|
||||
},
|
||||
Shard: shard.FromProtobuff(urlDataProto.Shard),
|
||||
Shard: wakuv2.FromProtobuff(urlDataProto.Shard),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -8,17 +8,16 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/api/history"
|
||||
|
||||
gocommon "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/history"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/transport"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -84,7 +83,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(ctx context.Context, community
|
|||
zap.Any("community", community),
|
||||
zap.Any("config", cfg))
|
||||
|
||||
requestCommunity := func(communityID string, shard *shard.Shard) (*communities.Community, StoreNodeRequestStats, error) {
|
||||
requestCommunity := func(communityID string, shard *wakuv2.Shard) (*communities.Community, StoreNodeRequestStats, error) {
|
||||
channel, err := m.subscribeToRequest(ctx, storeNodeCommunityRequest, communityID, shard, cfg)
|
||||
if err != nil {
|
||||
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a request for community: %w", err)
|
||||
|
@ -102,7 +101,7 @@ func (m *StoreNodeRequestManager) FetchCommunity(ctx context.Context, community
|
|||
communityShard := community.Shard
|
||||
if communityShard == nil {
|
||||
id := transport.CommunityShardInfoTopic(community.CommunityID)
|
||||
fetchedShard, err := m.subscribeToRequest(ctx, storeNodeShardRequest, id, shard.DefaultNonProtectedShard(), cfg)
|
||||
fetchedShard, err := m.subscribeToRequest(ctx, storeNodeShardRequest, id, wakuv2.DefaultNonProtectedShard(), cfg)
|
||||
if err != nil {
|
||||
return nil, StoreNodeRequestStats{}, fmt.Errorf("failed to create a shard info request: %w", err)
|
||||
}
|
||||
|
@ -180,7 +179,7 @@ func (m *StoreNodeRequestManager) FetchContact(ctx context.Context, contactID st
|
|||
// subscribeToRequest checks if a request for given community/contact is already in progress, creates and installs
|
||||
// a new one if not found, and returns a subscription to the result of the found/started request.
|
||||
// The subscription can then be used to get the result of the request, this could be either a community/contact or an error.
|
||||
func (m *StoreNodeRequestManager) subscribeToRequest(ctx context.Context, requestType storeNodeRequestType, dataID string, shard *shard.Shard, cfg StoreNodeRequestConfig) (storeNodeResponseSubscription, error) {
|
||||
func (m *StoreNodeRequestManager) subscribeToRequest(ctx context.Context, requestType storeNodeRequestType, dataID string, shard *wakuv2.Shard, cfg StoreNodeRequestConfig) (storeNodeResponseSubscription, error) {
|
||||
// It's important to unlock only after getting the subscription channel.
|
||||
// We also lock `activeRequestsLock` during finalizing the requests. This ensures that the subscription
|
||||
// created in this function will get the result even if the requests proceeds faster than this function ends.
|
||||
|
@ -235,7 +234,7 @@ func (m *StoreNodeRequestManager) newStoreNodeRequest(ctx context.Context) *stor
|
|||
|
||||
// getFilter checks if a filter for a given community is already created and creates one of not found.
|
||||
// Returns the found/created filter, a flag if the filter was created by the function and an error.
|
||||
func (m *StoreNodeRequestManager) getFilter(requestType storeNodeRequestType, dataID string, shard *shard.Shard) (*transport.Filter, bool, error) {
|
||||
func (m *StoreNodeRequestManager) getFilter(requestType storeNodeRequestType, dataID string, shard *wakuv2.Shard) (*transport.Filter, bool, error) {
|
||||
// First check if such filter already exists.
|
||||
filter := m.messenger.transport.FilterByChatID(dataID)
|
||||
if filter != nil {
|
||||
|
@ -338,7 +337,7 @@ type storeNodeRequestResult struct {
|
|||
// One of data fields (community or contact) will be present depending on request type
|
||||
community *communities.Community
|
||||
contact *Contact
|
||||
shard *shard.Shard
|
||||
shard *wakuv2.Shard
|
||||
}
|
||||
|
||||
type storeNodeResponseSubscription = chan storeNodeRequestResult
|
||||
|
|
|
@ -10,9 +10,9 @@ import (
|
|||
"github.com/multiformats/go-multiaddr"
|
||||
|
||||
"github.com/status-im/status-go/protocol/storenodes"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
|
||||
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
|
||||
|
@ -92,11 +92,12 @@ func (s *MessengerStoreNodeCommunitySuite) createStore(name string) (*waku2.Waku
|
|||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named(name),
|
||||
enableStore: true,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
clusterID: wakuv2.MainStatusShardCluster,
|
||||
}
|
||||
|
||||
storeNode := NewTestWakuV2(&s.Suite, cfg)
|
||||
addresses := storeNode.ListenAddresses()
|
||||
addresses, err := storeNode.ListenAddresses()
|
||||
s.Require().NoError(err)
|
||||
s.Require().GreaterOrEqual(len(addresses), 1, "no storenode listen address")
|
||||
return storeNode, addresses[0]
|
||||
}
|
||||
|
@ -109,7 +110,7 @@ func (s *MessengerStoreNodeCommunitySuite) newMessenger(name string, storenodeAd
|
|||
cfg := testWakuV2Config{
|
||||
logger: logger,
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
clusterID: wakuv2.MainStatusShardCluster,
|
||||
}
|
||||
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
|
||||
wakuV2Wrapper := gethbridge.NewGethWakuV2Wrapper(wakuV2)
|
||||
|
|
|
@ -24,7 +24,6 @@ import (
|
|||
"github.com/status-im/status-go/multiaccounts/accounts"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/communities/token"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
|
@ -34,6 +33,7 @@ import (
|
|||
mailserversDB "github.com/status-im/status-go/services/mailservers"
|
||||
"github.com/status-im/status-go/services/wallet/bigint"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
waku2 "github.com/status-im/status-go/wakuv2"
|
||||
wakuV2common "github.com/status-im/status-go/wakuv2/common"
|
||||
)
|
||||
|
@ -160,7 +160,7 @@ func (s *MessengerStoreNodeRequestSuite) createStore() {
|
|||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("store-waku"),
|
||||
enableStore: true,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
clusterID: wakuv2.MainStatusShardCluster,
|
||||
}
|
||||
|
||||
s.wakuStoreNode = NewTestWakuV2(&s.Suite, cfg)
|
||||
|
@ -178,7 +178,7 @@ func (s *MessengerStoreNodeRequestSuite) createOwner() {
|
|||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("owner-waku"),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
clusterID: wakuv2.MainStatusShardCluster,
|
||||
}
|
||||
|
||||
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
|
||||
|
@ -199,7 +199,7 @@ func (s *MessengerStoreNodeRequestSuite) createBob() {
|
|||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("bob-waku"),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
clusterID: wakuv2.MainStatusShardCluster,
|
||||
}
|
||||
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
|
||||
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
|
||||
|
@ -368,7 +368,8 @@ func (s *MessengerStoreNodeRequestSuite) waitForEnvelopes(subscription <-chan st
|
|||
}
|
||||
|
||||
func (s *MessengerStoreNodeRequestSuite) wakuListenAddress(waku *waku2.Waku) multiaddr.Multiaddr {
|
||||
addresses := waku.ListenAddresses()
|
||||
addresses, err := waku.ListenAddresses()
|
||||
s.Require().NoError(err)
|
||||
s.Require().LessOrEqual(1, len(addresses))
|
||||
return addresses[0]
|
||||
}
|
||||
|
@ -698,8 +699,8 @@ func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() {
|
|||
topicPrivKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
expectedShard := &shard.Shard{
|
||||
Cluster: shard.MainStatusShardCluster,
|
||||
expectedShard := &wakuv2.Shard{
|
||||
Cluster: wakuv2.MainStatusShardCluster,
|
||||
Index: 23,
|
||||
}
|
||||
|
||||
|
@ -843,8 +844,8 @@ type testFetchRealCommunityExampleTokenInfo struct {
|
|||
|
||||
var testFetchRealCommunityExample = []struct {
|
||||
CommunityID string
|
||||
CommunityURL string // If set, takes precedence over CommunityID
|
||||
CommunityShard *shard.Shard // WARNING: I didn't test a sharded community
|
||||
CommunityURL string // If set, takes precedence over CommunityID
|
||||
CommunityShard *wakuv2.Shard // WARNING: I didn't test a sharded community
|
||||
Fleet string
|
||||
ClusterID uint16
|
||||
UserPrivateKeyString string // When empty a new user will be created
|
||||
|
@ -865,14 +866,14 @@ var testFetchRealCommunityExample = []struct {
|
|||
CommunityID: "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a",
|
||||
CommunityShard: nil,
|
||||
Fleet: params.FleetStatusProd,
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
ClusterID: wakuv2.MainStatusShardCluster,
|
||||
},
|
||||
{
|
||||
// Example 3,
|
||||
// https://status.app/c/CxiACi8KFGFwIHJlcSAxIHN0dCBiZWMgbWVtEgdkc2Fkc2FkGAMiByM0MzYwREYqAxkrHAM=#zQ3shwDYZHtrLE7NqoTGjTWzWUu6hom5D4qxfskLZfgfyGRyL
|
||||
CommunityID: "0x03f64be95ed5c925022265f9250f538f65ed3dcf6e4ef6c139803dc02a3487ae7b",
|
||||
Fleet: params.FleetStatusProd,
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
ClusterID: wakuv2.MainStatusShardCluster,
|
||||
|
||||
CheckExpectedEnvelopes: true,
|
||||
ExpectedShardEnvelopes: []string{
|
||||
|
@ -975,7 +976,7 @@ var testFetchRealCommunityExample = []struct {
|
|||
//Example 1,
|
||||
CommunityID: "0x02471dd922756a3a50b623e59cf3b99355d6587e43d5c517eb55f9aea9d3fe9fe9",
|
||||
Fleet: params.FleetStatusProd,
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
ClusterID: wakuv2.MainStatusShardCluster,
|
||||
CheckExpectedEnvelopes: true,
|
||||
ExpectedShardEnvelopes: []string{
|
||||
"0xc3e68e838d09e0117b3f3fd27aabe5f5a509d13e9045263c78e6890953d43547",
|
||||
|
@ -1015,7 +1016,7 @@ var testFetchRealCommunityExample = []struct {
|
|||
ContractAddress: "0x21F6F5Cb75E81e5104D890D750270eD6538C50cb",
|
||||
},
|
||||
},
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
ClusterID: wakuv2.MainStatusShardCluster,
|
||||
CheckExpectedEnvelopes: false,
|
||||
CustomOptions: []StoreNodeRequestOption{
|
||||
WithInitialPageSize(1),
|
||||
|
|
|
@ -14,11 +14,11 @@ import (
|
|||
|
||||
gocommon "github.com/status-im/status-go/common"
|
||||
"github.com/status-im/status-go/protocol/wakusync"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
|
||||
"github.com/status-im/status-go/protocol/identity"
|
||||
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
waku2 "github.com/status-im/status-go/wakuv2"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
|
@ -206,7 +206,7 @@ func WaitOnSignaledCommunityFound(m *Messenger, action func(), condition func(co
|
|||
}
|
||||
}
|
||||
|
||||
func WaitForConnectionStatus(s *suite.Suite, waku *waku2.Waku, action func() bool) {
|
||||
func WaitForConnectionStatus(s *suite.Suite, waku *wakuv2.Waku, action func() bool) {
|
||||
subscription := waku.SubscribeToConnStatusChanges()
|
||||
defer subscription.Unsubscribe()
|
||||
|
||||
|
@ -238,7 +238,7 @@ func hasAllPeers(m map[peer.ID]types.WakuV2Peer, checkSlice peer.IDSlice) bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func WaitForPeersConnected(s *suite.Suite, waku *waku2.Waku, action func() peer.IDSlice) {
|
||||
func WaitForPeersConnected(s *suite.Suite, waku *wakuv2.Waku, action func() peer.IDSlice) {
|
||||
subscription := waku.SubscribeToConnStatusChanges()
|
||||
defer subscription.Unsubscribe()
|
||||
|
||||
|
|
|
@ -4,12 +4,12 @@ import (
|
|||
"errors"
|
||||
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
type SetCommunityShard struct {
|
||||
CommunityID types.HexBytes `json:"communityId"`
|
||||
Shard *shard.Shard `json:"shard,omitempty"`
|
||||
Shard *wakuv2.Shard `json:"shard,omitempty"`
|
||||
PrivateKey *types.HexBytes `json:"privateKey,omitempty"`
|
||||
}
|
||||
|
||||
|
@ -19,7 +19,7 @@ func (s *SetCommunityShard) Validate() error {
|
|||
}
|
||||
if s.Shard != nil {
|
||||
// TODO: for now only MainStatusShard(16) is accepted
|
||||
if s.Shard.Cluster != shard.MainStatusShardCluster {
|
||||
if s.Shard.Cluster != wakuv2.MainStatusShardCluster {
|
||||
return errors.New("invalid shard cluster")
|
||||
}
|
||||
if s.Shard.Index > 1023 {
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -141,7 +141,7 @@ func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitia
|
|||
}
|
||||
|
||||
type CommunityFilterToInitialize struct {
|
||||
Shard *shard.Shard
|
||||
Shard *wakuv2.Shard
|
||||
PrivKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
|
@ -158,7 +158,7 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com
|
|||
}
|
||||
|
||||
topics := make([]string, 0)
|
||||
topics = append(topics, shard.DefaultNonProtectedPubsubTopic())
|
||||
topics = append(topics, wakuv2.DefaultNonProtectedPubsubTopic())
|
||||
topics = append(topics, communityFilter.Shard.PubsubTopic())
|
||||
|
||||
for _, pubsubTopic := range topics {
|
||||
|
|
|
@ -516,10 +516,6 @@ func (t *Transport) ENR() (*enode.Node, error) {
|
|||
return t.waku.ENR()
|
||||
}
|
||||
|
||||
func (t *Transport) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
return t.waku.AddStorePeer(address)
|
||||
}
|
||||
|
||||
func (t *Transport) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
return t.waku.AddRelayPeer(address)
|
||||
}
|
||||
|
|
|
@ -12,7 +12,6 @@ import (
|
|||
"github.com/status-im/status-go/appdatabase"
|
||||
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
waku2 "github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
@ -62,7 +61,7 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
|
|||
|
||||
err = wakuNode.Start()
|
||||
if cfg.enableStore {
|
||||
err := wakuNode.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil)
|
||||
err := wakuNode.SubscribeToPubsubTopic(waku2.DefaultNonProtectedPubsubTopic(), nil)
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
s.Require().NoError(err)
|
||||
|
@ -78,7 +77,7 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s
|
|||
nodes[i] = NewTestWakuV2(s, testWakuV2Config{
|
||||
logger: parentLogger.Named("waku-" + name),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
clusterID: waku2.MainStatusShardCluster,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -89,9 +88,10 @@ func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []s
|
|||
continue
|
||||
}
|
||||
|
||||
addrs := nodes[j].ListenAddresses()
|
||||
addrs, err := nodes[j].ListenAddresses()
|
||||
s.Require().NoError(err)
|
||||
s.Require().Greater(len(addrs), 0)
|
||||
_, err := nodes[i].AddRelayPeer(addrs[0])
|
||||
_, err = nodes[i].AddRelayPeer(addrs[0])
|
||||
s.Require().NoError(err)
|
||||
err = nodes[i].DialPeer(addrs[0])
|
||||
s.Require().NoError(err)
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/status-im/status-go/services/browsers"
|
||||
"github.com/status-im/status-go/services/wallet"
|
||||
"github.com/status-im/status-go/services/wallet/bigint"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
|
@ -33,7 +34,6 @@ import (
|
|||
"github.com/status-im/status-go/multiaccounts/settings"
|
||||
"github.com/status-im/status-go/protocol"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/communities/token"
|
||||
"github.com/status-im/status-go/protocol/discord"
|
||||
|
@ -1279,7 +1279,7 @@ func (api *PublicAPI) RequestCommunityInfoFromMailserver(communityID string) (*c
|
|||
|
||||
// Deprecated: RequestCommunityInfoFromMailserverWithShard is deprecated in favor of
|
||||
// configurable FetchCommunity.
|
||||
func (api *PublicAPI) RequestCommunityInfoFromMailserverWithShard(communityID string, shard *shard.Shard) (*communities.Community, error) {
|
||||
func (api *PublicAPI) RequestCommunityInfoFromMailserverWithShard(communityID string, shard *wakuv2.Shard) (*communities.Community, error) {
|
||||
request := &protocol.FetchCommunityRequest{
|
||||
CommunityKey: communityID,
|
||||
Shard: shard,
|
||||
|
@ -1304,7 +1304,7 @@ func (api *PublicAPI) RequestCommunityInfoFromMailserverAsync(communityID string
|
|||
|
||||
// Deprecated: RequestCommunityInfoFromMailserverAsyncWithShard is deprecated in favor of
|
||||
// configurable FetchCommunity.
|
||||
func (api *PublicAPI) RequestCommunityInfoFromMailserverAsyncWithShard(communityID string, shard *shard.Shard) error {
|
||||
func (api *PublicAPI) RequestCommunityInfoFromMailserverAsyncWithShard(communityID string, shard *wakuv2.Shard) error {
|
||||
request := &protocol.FetchCommunityRequest{
|
||||
CommunityKey: communityID,
|
||||
Shard: shard,
|
||||
|
@ -1448,14 +1448,6 @@ func (api *PublicAPI) StorePubsubTopicKey(topic string, privKey string) error {
|
|||
return api.service.messenger.StorePubsubTopicKey(topic, p)
|
||||
}
|
||||
|
||||
func (api *PublicAPI) AddStorePeer(address string) (peer.ID, error) {
|
||||
maddr, err := multiaddr.NewMultiaddr(address)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return api.service.messenger.AddStorePeer(maddr)
|
||||
}
|
||||
|
||||
func (api *PublicAPI) AddRelayPeer(address string) (peer.ID, error) {
|
||||
maddr, err := multiaddr.NewMultiaddr(address)
|
||||
if err != nil {
|
||||
|
|
|
@ -8,10 +8,10 @@ import (
|
|||
|
||||
"github.com/status-im/status-go/appdatabase"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/sqlite"
|
||||
"github.com/status-im/status-go/protocol/transport"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
func setupTestDB(t *testing.T) (*Database, func()) {
|
||||
|
@ -62,9 +62,9 @@ func TestTopic(t *testing.T) {
|
|||
defer close()
|
||||
topicA := "0x61000000"
|
||||
topicD := "0x64000000"
|
||||
topic1 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: topicA, LastRequest: 1}
|
||||
topic2 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: "0x6200000", LastRequest: 2}
|
||||
topic3 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: "0x6300000", LastRequest: 3}
|
||||
topic1 := MailserverTopic{PubsubTopic: wakuv2.DefaultShardPubsubTopic(), ContentTopic: topicA, LastRequest: 1}
|
||||
topic2 := MailserverTopic{PubsubTopic: wakuv2.DefaultShardPubsubTopic(), ContentTopic: "0x6200000", LastRequest: 2}
|
||||
topic3 := MailserverTopic{PubsubTopic: wakuv2.DefaultShardPubsubTopic(), ContentTopic: "0x6300000", LastRequest: 3}
|
||||
|
||||
require.NoError(t, db.AddTopic(topic1))
|
||||
require.NoError(t, db.AddTopic(topic2))
|
||||
|
@ -77,14 +77,14 @@ func TestTopic(t *testing.T) {
|
|||
filters := []*transport.Filter{
|
||||
// Existing topic, is not updated
|
||||
{
|
||||
PubsubTopic: shard.DefaultShardPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultShardPubsubTopic(),
|
||||
ContentTopic: types.BytesToTopic([]byte{0x61}),
|
||||
},
|
||||
// Non existing topic is not inserted
|
||||
{
|
||||
Discovery: true,
|
||||
Negotiated: true,
|
||||
PubsubTopic: shard.DefaultShardPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultShardPubsubTopic(),
|
||||
ContentTopic: types.BytesToTopic([]byte{0x64}),
|
||||
},
|
||||
}
|
||||
|
@ -160,7 +160,7 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) {
|
|||
defer close()
|
||||
api := &API{db: db}
|
||||
testTopic := MailserverTopic{
|
||||
PubsubTopic: shard.DefaultShardPubsubTopic(),
|
||||
PubsubTopic: wakuv2.DefaultShardPubsubTopic(),
|
||||
ContentTopic: "topic-001",
|
||||
ChatIDs: []string{"chatID01", "chatID02"},
|
||||
LastRequest: 10,
|
||||
|
@ -173,14 +173,14 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.EqualValues(t, []MailserverTopic{testTopic}, topics)
|
||||
|
||||
err = api.DeleteMailserverTopic(context.Background(), shard.DefaultShardPubsubTopic(), testTopic.ContentTopic)
|
||||
err = api.DeleteMailserverTopic(context.Background(), wakuv2.DefaultShardPubsubTopic(), testTopic.ContentTopic)
|
||||
require.NoError(t, err)
|
||||
topics, err = api.GetMailserverTopics(context.Background())
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, ([]MailserverTopic)(nil), topics)
|
||||
|
||||
// Delete non-existing topic.
|
||||
err = api.DeleteMailserverTopic(context.Background(), shard.DefaultShardPubsubTopic(), "non-existing-topic")
|
||||
err = api.DeleteMailserverTopic(context.Background(), wakuv2.DefaultShardPubsubTopic(), "non-existing-topic")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/wakuv2"
|
||||
)
|
||||
|
||||
// Make sure that Service implements node.Lifecycle interface.
|
||||
|
@ -70,7 +70,7 @@ type PublicAPI struct {
|
|||
service *Service
|
||||
}
|
||||
|
||||
func (p *PublicAPI) CommunityInfo(communityID types.HexBytes, shard *shard.Shard) (json.RawMessage, error) {
|
||||
func (p *PublicAPI) CommunityInfo(communityID types.HexBytes, shard *wakuv2.Shard) (json.RawMessage, error) {
|
||||
if p.service.messenger == nil {
|
||||
return nil, ErrNotInitialized
|
||||
}
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Subproject commit c6f47f8aeb1f730108dd0daa7b88beedb5931895
|
|
@ -2,6 +2,7 @@ package timesource
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -144,8 +145,8 @@ type NTPTimeSource struct {
|
|||
timeQuery ntpQuery // for ease of testing
|
||||
now func() time.Time
|
||||
|
||||
quit chan struct{}
|
||||
started bool
|
||||
cancel context.CancelFunc
|
||||
|
||||
mu sync.RWMutex
|
||||
latestOffset time.Duration
|
||||
|
@ -175,7 +176,7 @@ func (s *NTPTimeSource) updateOffset() error {
|
|||
|
||||
// runPeriodically runs periodically the given function based on NTPTimeSource
|
||||
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
|
||||
func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod bool) {
|
||||
func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) {
|
||||
if s.started {
|
||||
return
|
||||
}
|
||||
|
@ -184,7 +185,7 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
|
|||
if starWithSlowSyncPeriod {
|
||||
period = s.slowNTPSyncPeriod
|
||||
}
|
||||
s.quit = make(chan struct{})
|
||||
|
||||
go func() {
|
||||
defer common.LogOnPanic()
|
||||
for {
|
||||
|
@ -196,7 +197,7 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
|
|||
period = s.fastNTPSyncPeriod
|
||||
}
|
||||
|
||||
case <-s.quit:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -204,11 +205,13 @@ func (s *NTPTimeSource) runPeriodically(fn func() error, starWithSlowSyncPeriod
|
|||
}
|
||||
|
||||
// Start initializes the local offset and starts a goroutine that periodically updates the local offset.
|
||||
func (s *NTPTimeSource) Start() {
|
||||
func (s *NTPTimeSource) Start(ctx context.Context) error {
|
||||
if s.started {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
// Attempt to update the offset synchronously so that user can have reliable messages right away
|
||||
err := s.updateOffset()
|
||||
if err != nil {
|
||||
|
@ -217,23 +220,30 @@ func (s *NTPTimeSource) Start() {
|
|||
logutils.ZapLogger().Error("failed to update offset", zap.Error(err))
|
||||
}
|
||||
|
||||
s.runPeriodically(s.updateOffset, err == nil)
|
||||
s.runPeriodically(ctx, s.updateOffset, err == nil)
|
||||
|
||||
s.started = true
|
||||
}
|
||||
s.cancel = cancel
|
||||
|
||||
// Stop goroutine that updates time source.
|
||||
func (s *NTPTimeSource) Stop() error {
|
||||
if s.quit == nil {
|
||||
return nil
|
||||
}
|
||||
close(s.quit)
|
||||
s.started = false
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop goroutine that updates time source.
|
||||
func (s *NTPTimeSource) Stop() {
|
||||
if s.cancel == nil {
|
||||
return
|
||||
}
|
||||
|
||||
s.cancel()
|
||||
s.started = false
|
||||
}
|
||||
|
||||
func (s *NTPTimeSource) GetCurrentTime() time.Time {
|
||||
s.Start()
|
||||
err := s.Start(context.Background())
|
||||
if err != nil {
|
||||
panic("could not obtain timesource")
|
||||
}
|
||||
|
||||
return s.Now()
|
||||
}
|
||||
|
||||
|
@ -243,7 +253,11 @@ func (s *NTPTimeSource) GetCurrentTimeInMillis() uint64 {
|
|||
|
||||
func GetCurrentTime() time.Time {
|
||||
ts := Default()
|
||||
ts.Start()
|
||||
err := ts.Start(context.Background())
|
||||
if err != nil {
|
||||
panic("could not obtain timesource")
|
||||
}
|
||||
|
||||
return ts.Now()
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package timesource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -214,7 +215,7 @@ func TestRunningPeriodically(t *testing.T) {
|
|||
// on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod)
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
source.runPeriodically(func() error {
|
||||
source.runPeriodically(context.TODO(), func() error {
|
||||
mu.Lock()
|
||||
periods = append(periods, time.Since(lastCall))
|
||||
mu.Unlock()
|
||||
|
@ -277,14 +278,12 @@ func TestGetCurrentTimeInMillis(t *testing.T) {
|
|||
// test repeat invoke GetCurrentTimeInMillis
|
||||
n = ts.GetCurrentTimeInMillis()
|
||||
require.Equal(t, expectedTime, n)
|
||||
e := ts.Stop()
|
||||
require.NoError(t, e)
|
||||
ts.Stop()
|
||||
|
||||
// test invoke after stop
|
||||
n = ts.GetCurrentTimeInMillis()
|
||||
require.Equal(t, expectedTime, n)
|
||||
e = ts.Stop()
|
||||
require.NoError(t, e)
|
||||
ts.Stop()
|
||||
}
|
||||
|
||||
func TestGetCurrentTimeOffline(t *testing.T) {
|
||||
|
|
|
@ -8,5 +8,5 @@ import (
|
|||
)
|
||||
|
||||
type StorenodeRequestor interface {
|
||||
Query(ctx context.Context, peerID peer.ID, query *pb.StoreQueryRequest) (StoreRequestResult, error)
|
||||
Query(ctx context.Context, peerInfo peer.AddrInfo, query *pb.StoreQueryRequest) (StoreRequestResult, error)
|
||||
}
|
||||
|
|
|
@ -338,6 +338,24 @@ func (m *StorenodeCycle) GetActiveStorenode() peer.ID {
|
|||
return m.activeStorenode
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) GetActiveStorenodePeerInfo() peer.AddrInfo {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
storeNodes, err := m.storenodeConfigProvider.Storenodes()
|
||||
if err != nil {
|
||||
return peer.AddrInfo{}
|
||||
}
|
||||
|
||||
for _, p := range storeNodes {
|
||||
if p.ID == m.activeStorenode {
|
||||
return p
|
||||
}
|
||||
}
|
||||
|
||||
return peer.AddrInfo{}
|
||||
}
|
||||
|
||||
func (m *StorenodeCycle) IsStorenodeAvailable(peerID peer.ID) bool {
|
||||
return m.storenodeStatus(peerID) == connected
|
||||
}
|
||||
|
|
|
@ -37,7 +37,7 @@ type HistoryRetriever struct {
|
|||
|
||||
type HistoryProcessor interface {
|
||||
OnEnvelope(env *protocol.Envelope, processEnvelopes bool) error
|
||||
OnRequestFailed(requestID []byte, peerID peer.ID, err error)
|
||||
OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error)
|
||||
}
|
||||
|
||||
func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor HistoryProcessor, logger *zap.Logger) *HistoryRetriever {
|
||||
|
@ -51,7 +51,7 @@ func NewHistoryRetriever(store common.StorenodeRequestor, historyProcessor Histo
|
|||
func (hr *HistoryRetriever) Query(
|
||||
ctx context.Context,
|
||||
criteria store.FilterCriteria,
|
||||
storenodeID peer.ID,
|
||||
storenode peer.AddrInfo,
|
||||
pageLimit uint64,
|
||||
shouldProcessNextPage func(int) (bool, uint64),
|
||||
processEnvelopes bool,
|
||||
|
@ -178,7 +178,7 @@ loop:
|
|||
newCriteria.TimeStart = timeStart
|
||||
newCriteria.TimeEnd = timeEnd
|
||||
|
||||
cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenodeID, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
|
||||
cursor, envelopesCount, err := hr.createMessagesRequest(queryCtx, storenode, newCriteria, w.cursor, w.limit, true, processEnvelopes, logger)
|
||||
queryCancel()
|
||||
|
||||
if err != nil {
|
||||
|
@ -241,7 +241,7 @@ loop:
|
|||
|
||||
func (hr *HistoryRetriever) createMessagesRequest(
|
||||
ctx context.Context,
|
||||
peerID peer.ID,
|
||||
peerInfo peer.AddrInfo,
|
||||
criteria store.FilterCriteria,
|
||||
cursor []byte,
|
||||
limit uint64,
|
||||
|
@ -257,7 +257,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
|
|||
})
|
||||
|
||||
go func() {
|
||||
storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, processEnvelopes)
|
||||
storeCursor, envelopesCount, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, processEnvelopes)
|
||||
resultCh <- struct {
|
||||
storeCursor []byte
|
||||
envelopesCount int
|
||||
|
@ -273,7 +273,7 @@ func (hr *HistoryRetriever) createMessagesRequest(
|
|||
}
|
||||
} else {
|
||||
go func() {
|
||||
_, _, err = hr.requestStoreMessages(ctx, peerID, criteria, cursor, limit, false)
|
||||
_, _, err = hr.requestStoreMessages(ctx, peerInfo, criteria, cursor, limit, false)
|
||||
if err != nil {
|
||||
logger.Error("failed to request store messages", zap.Error(err))
|
||||
}
|
||||
|
@ -283,9 +283,9 @@ func (hr *HistoryRetriever) createMessagesRequest(
|
|||
return
|
||||
}
|
||||
|
||||
func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID peer.ID, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
|
||||
func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerInfo peer.AddrInfo, criteria store.FilterCriteria, cursor []byte, limit uint64, processEnvelopes bool) ([]byte, int, error) {
|
||||
requestID := protocol.GenerateRequestID()
|
||||
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerID))
|
||||
logger := hr.logger.With(zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", peerInfo.ID))
|
||||
|
||||
logger.Debug("store.query",
|
||||
logging.Timep("startTime", criteria.TimeStart),
|
||||
|
@ -307,12 +307,12 @@ func (hr *HistoryRetriever) requestStoreMessages(ctx context.Context, peerID pee
|
|||
}
|
||||
|
||||
queryStart := time.Now()
|
||||
result, err := hr.store.Query(ctx, peerID, storeQueryRequest)
|
||||
result, err := hr.store.Query(ctx, peerInfo, storeQueryRequest)
|
||||
queryDuration := time.Since(queryStart)
|
||||
if err != nil {
|
||||
logger.Error("error querying storenode", zap.Error(err))
|
||||
|
||||
hr.historyProcessor.OnRequestFailed(requestID, peerID, err)
|
||||
hr.historyProcessor.OnRequestFailed(requestID, peerInfo, err)
|
||||
|
||||
return nil, 0, err
|
||||
}
|
||||
|
|
|
@ -10,7 +10,7 @@ import (
|
|||
)
|
||||
|
||||
type criteriaInterest struct {
|
||||
peerID peer.ID
|
||||
peerInfo peer.AddrInfo
|
||||
contentFilter protocol.ContentFilter
|
||||
lastChecked time.Time
|
||||
|
||||
|
@ -19,7 +19,7 @@ type criteriaInterest struct {
|
|||
}
|
||||
|
||||
func (c criteriaInterest) equals(other criteriaInterest) bool {
|
||||
if c.peerID != other.peerID {
|
||||
if c.peerInfo.ID != other.peerInfo.ID {
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
|
@ -20,10 +20,10 @@ type defaultStorenodeRequestor struct {
|
|||
store *store.WakuStore
|
||||
}
|
||||
|
||||
func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
|
||||
return d.store.QueryByHash(ctx, messageHashes, store.WithPeer(peerID), store.WithPaging(false, pageSize))
|
||||
func (d *defaultStorenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (common.StoreRequestResult, error) {
|
||||
return d.store.QueryByHash(ctx, messageHashes, store.WithPeerAddr(peerInfo.Addrs...), store.WithPaging(false, pageSize))
|
||||
}
|
||||
|
||||
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerID peer.ID, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
|
||||
return d.store.RequestRaw(ctx, peerID, storeQueryRequest)
|
||||
func (d *defaultStorenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeQueryRequest *storepb.StoreQueryRequest) (common.StoreRequestResult, error) {
|
||||
return d.store.RequestRaw(ctx, peerInfo, storeQueryRequest)
|
||||
}
|
||||
|
|
|
@ -66,13 +66,13 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes
|
|||
}
|
||||
}
|
||||
|
||||
func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilter protocol.ContentFilter) {
|
||||
func (m *MissingMessageVerifier) SetCriteriaInterest(peerInfo peer.AddrInfo, contentFilter protocol.ContentFilter) {
|
||||
m.criteriaInterestMu.Lock()
|
||||
defer m.criteriaInterestMu.Unlock()
|
||||
|
||||
ctx, cancel := context.WithCancel(m.ctx)
|
||||
criteriaInterest := criteriaInterest{
|
||||
peerID: peerID,
|
||||
peerInfo: peerInfo,
|
||||
contentFilter: contentFilter,
|
||||
lastChecked: m.timesource.Now().Add(-m.params.delay),
|
||||
ctx: ctx,
|
||||
|
@ -164,7 +164,7 @@ func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, inter
|
|||
}
|
||||
|
||||
m.logger.Error("could not fetch history",
|
||||
zap.Stringer("peerID", interest.peerID),
|
||||
zap.Stringer("peerID", interest.peerInfo.ID),
|
||||
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
|
||||
zap.Strings("contentTopics", contentTopics))
|
||||
continue
|
||||
|
@ -207,7 +207,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
|||
contentTopics := interest.contentFilter.ContentTopics.ToList()
|
||||
|
||||
logger := m.logger.With(
|
||||
zap.Stringer("peerID", interest.peerID),
|
||||
zap.Stringer("peerID", interest.peerInfo.ID),
|
||||
zap.Strings("contentTopics", contentTopics[batchFrom:batchTo]),
|
||||
zap.String("pubsubTopic", interest.contentFilter.PubsubTopic),
|
||||
logging.Epoch("from", interest.lastChecked),
|
||||
|
@ -226,7 +226,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
|||
|
||||
return m.storenodeRequestor.Query(
|
||||
ctx,
|
||||
interest.peerID,
|
||||
interest.peerInfo,
|
||||
storeQueryRequest,
|
||||
)
|
||||
}, logger, "retrieving history to check for missing messages")
|
||||
|
@ -309,7 +309,7 @@ func (m *MissingMessageVerifier) fetchMessagesBatch(c chan<- *protocol.Envelope,
|
|||
PaginationLimit: proto.Uint64(maxMsgHashesPerRequest),
|
||||
}
|
||||
|
||||
return m.storenodeRequestor.Query(queryCtx, interest.peerID, storeQueryRequest)
|
||||
return m.storenodeRequestor.Query(queryCtx, interest.peerInfo, storeQueryRequest)
|
||||
}, logger, "retrieving missing messages")
|
||||
if err != nil {
|
||||
if !errors.Is(err, context.Canceled) {
|
||||
|
|
|
@ -18,10 +18,10 @@ type defaultStorenodeMessageVerifier struct {
|
|||
store *store.WakuStore
|
||||
}
|
||||
|
||||
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||
func (d *defaultStorenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||
var opts []store.RequestOption
|
||||
opts = append(opts, store.WithRequestID(requestID))
|
||||
opts = append(opts, store.WithPeer(peerID))
|
||||
opts = append(opts, store.WithPeerAddr(peerID.Addrs...))
|
||||
opts = append(opts, store.WithPaging(false, pageSize))
|
||||
opts = append(opts, store.IncludeData(false))
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ type ISentCheck interface {
|
|||
|
||||
type StorenodeMessageVerifier interface {
|
||||
// MessagesExist returns a list of the messages it found from a list of message hashes
|
||||
MessageHashesExist(ctx context.Context, requestID []byte, peerID peer.ID, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
|
||||
MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error)
|
||||
}
|
||||
|
||||
// MessageSentCheck tracks the outgoing messages and check against store node
|
||||
|
@ -211,8 +211,8 @@ func (m *MessageSentCheck) Start() {
|
|||
}
|
||||
|
||||
func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash {
|
||||
selectedPeer := m.storenodeCycle.GetActiveStorenode()
|
||||
if selectedPeer == "" {
|
||||
selectedPeer := m.storenodeCycle.GetActiveStorenodePeerInfo()
|
||||
if selectedPeer.ID == "" {
|
||||
m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic))
|
||||
return []common.Hash{}
|
||||
}
|
||||
|
@ -224,13 +224,13 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
|
|||
messageHashes[i] = pb.ToMessageHash(hash.Bytes())
|
||||
}
|
||||
|
||||
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Stringers("messageHashes", messageHashes))
|
||||
m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Stringers("messageHashes", messageHashes))
|
||||
|
||||
queryCtx, cancel := context.WithTimeout(ctx, m.storeQueryTimeout)
|
||||
defer cancel()
|
||||
result, err := m.messageVerifier.MessageHashesExist(queryCtx, requestID, selectedPeer, m.maxHashQueryLength, messageHashes)
|
||||
if err != nil {
|
||||
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err))
|
||||
m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer.ID), zap.Error(err))
|
||||
return []common.Hash{}
|
||||
}
|
||||
|
||||
|
|
|
@ -703,8 +703,8 @@ func (w *WakuNode) startStore(ctx context.Context, sub *relay.Subscription) erro
|
|||
|
||||
// AddPeer is used to add a peer and the protocols it support to the node peerstore
|
||||
// TODO: Need to update this for autosharding, to only take contentTopics and optional pubSubTopics or provide an alternate API only for contentTopics.
|
||||
func (w *WakuNode) AddPeer(address ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
|
||||
pData, err := w.peermanager.AddPeer(address, origin, pubSubTopics, protocols...)
|
||||
func (w *WakuNode) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubSubTopics []string, protocols ...protocol.ID) (peer.ID, error) {
|
||||
pData, err := w.peermanager.AddPeer(addresses, origin, pubSubTopics, protocols...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
|
@ -684,13 +684,19 @@ func AddrInfoToPeerData(origin wps.Origin, peerID peer.ID, host host.Host, pubsu
|
|||
}
|
||||
|
||||
// AddPeer adds peer to the peerStore and also to service slots
|
||||
func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
|
||||
func (pm *PeerManager) AddPeer(addresses []ma.Multiaddr, origin wps.Origin, pubsubTopics []string, protocols ...protocol.ID) (*service.PeerData, error) {
|
||||
//Assuming all addresses have peerId
|
||||
info, err := peer.AddrInfoFromP2pAddr(address)
|
||||
infoArr, err := peer.AddrInfosFromP2pAddrs(addresses...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(infoArr) > 1 {
|
||||
return nil, errors.New("only a single peerID is expected in AddPeer")
|
||||
}
|
||||
|
||||
info := infoArr[0]
|
||||
|
||||
//Add Service peers to serviceSlots.
|
||||
for _, proto := range protocols {
|
||||
pm.addPeerToServiceSlot(proto, info.ID)
|
||||
|
@ -703,11 +709,8 @@ func (pm *PeerManager) AddPeer(address ma.Multiaddr, origin wps.Origin, pubsubTo
|
|||
}
|
||||
|
||||
pData := &service.PeerData{
|
||||
Origin: origin,
|
||||
AddrInfo: peer.AddrInfo{
|
||||
ID: info.ID,
|
||||
Addrs: info.Addrs,
|
||||
},
|
||||
Origin: origin,
|
||||
AddrInfo: info,
|
||||
PubsubTopics: pubsubTopics,
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
|
||||
|
@ -343,7 +344,7 @@ func (wf *WakuFilterLightNode) handleFilterSubscribeOptions(ctx context.Context,
|
|||
|
||||
//Add Peer to peerstore.
|
||||
if params.pm != nil && params.peerAddr != nil {
|
||||
pData, err := wf.pm.AddPeer(params.peerAddr, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1)
|
||||
pData, err := wf.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, maps.Keys(pubSubTopicMap), FilterSubscribeID_v20beta1)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
|
@ -102,7 +103,7 @@ func (s *FilterTestSuite) TearDownTest() {
|
|||
|
||||
func (s *FilterTestSuite) ConnectToFullNode(h1 *WakuFilterLightNode, h2 *WakuFilterFullNode) {
|
||||
mAddr := tests.GetAddr(h2.h)
|
||||
_, err := h1.pm.AddPeer(mAddr, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
|
||||
_, err := h1.pm.AddPeer([]multiaddr.Multiaddr{mAddr}, wps.Static, []string{s.TestTopic}, FilterSubscribeID_v20beta1)
|
||||
s.Log.Info("add peer", zap.Stringer("mAddr", mAddr))
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
|
|
|
@ -310,7 +310,7 @@ func (store *WakuStore) Query(ctx context.Context, query Query, opts ...HistoryR
|
|||
|
||||
//Add Peer to peerstore.
|
||||
if store.pm != nil && params.peerAddr != nil {
|
||||
pData, err := store.pm.AddPeer(params.peerAddr, peerstore.Static, pubsubTopics, StoreID_v20beta4)
|
||||
pData, err := store.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, pubsubTopics, StoreID_v20beta4)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
3
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
3
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
|
@ -14,6 +14,7 @@ import (
|
|||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
|
@ -273,7 +274,7 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
|
|||
}
|
||||
|
||||
if params.pm != nil && params.peerAddr != nil {
|
||||
pData, err := wakuLP.pm.AddPeer(params.peerAddr, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1)
|
||||
pData, err := wakuLP.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{params.pubsubTopic}, LightPushID_v20beta1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/ethereum/go-ethereum/rlp"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/libp2p/go-msgio/pbio"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/waku/v2/peermanager"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
@ -36,7 +37,7 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
|
|||
}
|
||||
|
||||
if params.pm != nil && params.peerAddr != nil {
|
||||
pData, err := wakuPX.pm.AddPeer(params.peerAddr, peerstore.Static, []string{}, PeerExchangeID_v20alpha1)
|
||||
pData, err := wakuPX.pm.AddPeer([]multiaddr.Multiaddr{params.peerAddr}, peerstore.Static, []string{}, PeerExchangeID_v20alpha1)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -194,15 +194,15 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
|
|||
return result, nil
|
||||
}
|
||||
|
||||
func (s *WakuStore) RequestRaw(ctx context.Context, peerID peer.ID, storeRequest *pb.StoreQueryRequest) (Result, error) {
|
||||
func (s *WakuStore) RequestRaw(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *pb.StoreQueryRequest) (Result, error) {
|
||||
err := storeRequest.Validate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var params Parameters
|
||||
params.selectedPeer = peerID
|
||||
if params.selectedPeer == "" {
|
||||
params.peerAddr = peerInfo.Addrs
|
||||
if len(params.peerAddr) == 0 {
|
||||
return nil, ErrMustSelectPeer
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
|
||||
type Parameters struct {
|
||||
selectedPeer peer.ID
|
||||
peerAddr multiaddr.Multiaddr
|
||||
peerAddr []multiaddr.Multiaddr
|
||||
peerSelectionType peermanager.PeerSelection
|
||||
preferredPeers peer.IDSlice
|
||||
requestID []byte
|
||||
|
@ -33,7 +33,7 @@ type RequestOption func(*Parameters) error
|
|||
func WithPeer(p peer.ID) RequestOption {
|
||||
return func(params *Parameters) error {
|
||||
params.selectedPeer = p
|
||||
if params.peerAddr != nil {
|
||||
if len(params.peerAddr) != 0 {
|
||||
return errors.New("WithPeer and WithPeerAddr options are mutually exclusive")
|
||||
}
|
||||
return nil
|
||||
|
@ -43,7 +43,7 @@ func WithPeer(p peer.ID) RequestOption {
|
|||
// WithPeerAddr is an option used to specify a peerAddress to request the message history.
|
||||
// This new peer will be added to peerStore.
|
||||
// Note that this option is mutually exclusive to WithPeerAddr, only one of them can be used.
|
||||
func WithPeerAddr(pAddr multiaddr.Multiaddr) RequestOption {
|
||||
func WithPeerAddr(pAddr ...multiaddr.Multiaddr) RequestOption {
|
||||
return func(params *Parameters) error {
|
||||
params.peerAddr = pAddr
|
||||
if params.selectedPeer != "" {
|
||||
|
|
|
@ -18,13 +18,13 @@
|
|||
|
||||
package wakuv2
|
||||
|
||||
/* TODO-nwaku
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/wakuv2/common"
|
||||
)
|
||||
|
||||
|
@ -57,7 +57,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
|
|||
}
|
||||
|
||||
found := false
|
||||
candidates := w.filters.GetWatchersByTopic(shard.DefaultShardPubsubTopic(), t1)
|
||||
candidates := w.filters.GetWatchersByTopic(DefaultShardPubsubTopic(), t1)
|
||||
for _, f := range candidates {
|
||||
if maps.Equal(f.ContentTopics, common.NewTopicSet(crit.ContentTopics)) {
|
||||
found = true
|
||||
|
@ -69,3 +69,4 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
|
|||
t.Fatalf("Could not find filter with both topics")
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
)
|
||||
|
||||
// Envelope contains information about the pubsub topic of a WakuMessage
|
||||
// and a hash used to identify a message based on the bytes of a WakuMessage
|
||||
// protobuffer
|
||||
type Envelope interface {
|
||||
Message() *pb.WakuMessage
|
||||
PubsubTopic() string
|
||||
Hash() pb.MessageHash
|
||||
}
|
||||
|
||||
type envelopeImpl struct {
|
||||
msg *pb.WakuMessage
|
||||
topic string
|
||||
hash pb.MessageHash
|
||||
}
|
||||
|
||||
type tmpWakuMessageJson struct {
|
||||
Payload []byte `json:"payload,omitempty"`
|
||||
ContentTopic string `json:"contentTopic,omitempty"`
|
||||
Version *uint32 `json:"version,omitempty"`
|
||||
Timestamp *int64 `json:"timestamp,omitempty"`
|
||||
Meta []byte `json:"meta,omitempty"`
|
||||
Ephemeral *bool `json:"ephemeral,omitempty"`
|
||||
RateLimitProof []byte `json:"proof,omitempty"`
|
||||
}
|
||||
|
||||
type tmpEnvelopeStruct struct {
|
||||
WakuMessage tmpWakuMessageJson `json:"wakuMessage"`
|
||||
PubsubTopic string `json:"pubsubTopic"`
|
||||
MessageHash string `json:"messageHash"`
|
||||
}
|
||||
|
||||
// NewEnvelope creates a new Envelope from a json string generated in nwaku
|
||||
func NewEnvelope(jsonEventStr string) (Envelope, error) {
|
||||
tmpEnvelopeStruct := tmpEnvelopeStruct{}
|
||||
err := json.Unmarshal([]byte(jsonEventStr), &tmpEnvelopeStruct)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hash, err := hexutil.Decode(tmpEnvelopeStruct.MessageHash)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &envelopeImpl{
|
||||
msg: &pb.WakuMessage{
|
||||
Payload: tmpEnvelopeStruct.WakuMessage.Payload,
|
||||
ContentTopic: tmpEnvelopeStruct.WakuMessage.ContentTopic,
|
||||
Version: tmpEnvelopeStruct.WakuMessage.Version,
|
||||
Timestamp: tmpEnvelopeStruct.WakuMessage.Timestamp,
|
||||
Meta: tmpEnvelopeStruct.WakuMessage.Meta,
|
||||
Ephemeral: tmpEnvelopeStruct.WakuMessage.Ephemeral,
|
||||
RateLimitProof: tmpEnvelopeStruct.WakuMessage.RateLimitProof,
|
||||
},
|
||||
topic: tmpEnvelopeStruct.PubsubTopic,
|
||||
hash: pb.ToMessageHash(hash),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (e *envelopeImpl) Message() *pb.WakuMessage {
|
||||
return e.msg
|
||||
}
|
||||
|
||||
func (e *envelopeImpl) PubsubTopic() string {
|
||||
return e.topic
|
||||
}
|
||||
|
||||
func (e *envelopeImpl) Hash() pb.MessageHash {
|
||||
return e.hash
|
||||
}
|
|
@ -9,7 +9,6 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/payload"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
||||
"github.com/status-im/status-go/logutils"
|
||||
|
||||
|
@ -41,7 +40,7 @@ type MessageParams struct {
|
|||
// ReceivedMessage represents a data packet to be received through the
|
||||
// WakuV2 protocol and successfully decrypted.
|
||||
type ReceivedMessage struct {
|
||||
Envelope *protocol.Envelope // Wrapped Waku Message
|
||||
Envelope Envelope // Wrapped Waku Message
|
||||
|
||||
MsgType MessageType
|
||||
|
||||
|
@ -105,7 +104,7 @@ type MemoryMessageStore struct {
|
|||
messages map[common.Hash]*ReceivedMessage
|
||||
}
|
||||
|
||||
func NewReceivedMessage(env *protocol.Envelope, msgType MessageType) *ReceivedMessage {
|
||||
func NewReceivedMessage(env Envelope, msgType MessageType) *ReceivedMessage {
|
||||
ct, err := ExtractTopicFromContentTopic(env.Message().ContentTopic)
|
||||
if err != nil {
|
||||
logutils.ZapLogger().Debug("failed to extract content topic from message",
|
||||
|
|
|
@ -23,8 +23,6 @@ import (
|
|||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
|
||||
ethdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
|
||||
|
||||
"github.com/status-im/status-go/wakuv2/common"
|
||||
|
@ -117,10 +115,10 @@ func setDefaults(cfg *Config) *Config {
|
|||
}
|
||||
|
||||
if cfg.DefaultShardPubsubTopic == "" {
|
||||
cfg.DefaultShardPubsubTopic = shard.DefaultShardPubsubTopic()
|
||||
cfg.DefaultShardPubsubTopic = DefaultShardPubsubTopic()
|
||||
//For now populating with both used shards, but this can be populated from user subscribed communities etc once community sharding is implemented
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultShardPubsubTopic())
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultNonProtectedPubsubTopic())
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, DefaultShardPubsubTopic())
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, DefaultNonProtectedPubsubTopic())
|
||||
}
|
||||
|
||||
return cfg
|
||||
|
|
|
@ -1,3 +1,6 @@
|
|||
//go:build !use_nwaku
|
||||
// +build !use_nwaku
|
||||
|
||||
// Copyright 2019 The Waku Library Authors.
|
||||
//
|
||||
// The Waku library is free software: you can redistribute it and/or modify
|
||||
|
@ -1624,8 +1627,8 @@ func (w *Waku) RelayPeersByTopic(topic string) (*types.PeerList, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (w *Waku) ListenAddresses() []multiaddr.Multiaddr {
|
||||
return w.node.ListenAddresses()
|
||||
func (w *Waku) ListenAddresses() ([]multiaddr.Multiaddr, error) {
|
||||
return w.node.ListenAddresses(), nil
|
||||
}
|
||||
|
||||
func (w *Waku) ENR() (*enode.Node, error) {
|
||||
|
@ -1878,20 +1881,12 @@ func (w *Waku) restartDiscV5(useOnlyDNSDiscCache bool) error {
|
|||
return w.node.SetDiscV5Bootnodes(bootnodes)
|
||||
}
|
||||
|
||||
func (w *Waku) AddStorePeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, store.StoreQueryID_v300)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return peerID, nil
|
||||
}
|
||||
|
||||
func (w *Waku) timestamp() int64 {
|
||||
return w.timesource.Now().UnixNano()
|
||||
}
|
||||
|
||||
func (w *Waku) AddRelayPeer(address multiaddr.Multiaddr) (peer.ID, error) {
|
||||
peerID, err := w.node.AddPeer(address, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200)
|
||||
peerID, err := w.node.AddPeer([]multiaddr.Multiaddr{address}, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -2009,3 +2004,8 @@ func FormatPeerConnFailures(wakuNode *node.WakuNode) map[string]int {
|
|||
func (w *Waku) LegacyStoreNode() legacy_store.Store {
|
||||
return w.node.LegacyStore()
|
||||
}
|
||||
|
||||
func (w *Waku) ListPeersInMesh(pubsubTopic string) (int, error) {
|
||||
listPeers := w.node.Relay().PubSub().ListPeers(pubsubTopic)
|
||||
return len(listPeers), nil
|
||||
}
|
|
@ -3,10 +3,10 @@ package wakuv2
|
|||
import (
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
|
||||
"github.com/status-im/status-go/wakuv2/common"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/api/history"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
|
||||
"github.com/status-im/status-go/wakuv2/common"
|
||||
)
|
||||
|
||||
type HistoryProcessorWrapper struct {
|
||||
|
@ -21,6 +21,6 @@ func (hr *HistoryProcessorWrapper) OnEnvelope(env *protocol.Envelope, processEnv
|
|||
return hr.waku.OnNewEnvelopes(env, common.StoreMessageType, processEnvelopes)
|
||||
}
|
||||
|
||||
func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerID peer.ID, err error) {
|
||||
hr.waku.onHistoricMessagesRequestFailed(requestID, peerID, err)
|
||||
func (hr *HistoryProcessorWrapper) OnRequestFailed(requestID []byte, peerInfo peer.AddrInfo, err error) {
|
||||
hr.waku.onHistoricMessagesRequestFailed(requestID, peerInfo, err)
|
||||
}
|
||||
|
|
|
@ -92,6 +92,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
|
|||
err = w.messageSender.Send(publish.NewRequest(w.ctx, envelope))
|
||||
}
|
||||
|
||||
/* TODO-nwaku
|
||||
if w.statusTelemetryClient != nil {
|
||||
if err == nil {
|
||||
w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()})
|
||||
|
@ -99,6 +100,7 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
|
|||
w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}})
|
||||
}
|
||||
}
|
||||
*/
|
||||
|
||||
if err != nil {
|
||||
logger.Error("could not send message", zap.Error(err))
|
||||
|
|
3367
wakuv2/nwaku.go
3367
wakuv2/nwaku.go
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,58 @@
|
|||
package wakuv2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
type NwakuInfo struct {
|
||||
ListenAddresses []string `json:"listenAddresses"`
|
||||
EnrUri string `json:"enrUri"`
|
||||
}
|
||||
|
||||
func GetNwakuInfo(host *string, port *int) (NwakuInfo, error) {
|
||||
nwakuRestPort := 8645
|
||||
if port != nil {
|
||||
nwakuRestPort = *port
|
||||
}
|
||||
envNwakuRestPort := os.Getenv("NWAKU_REST_PORT")
|
||||
if envNwakuRestPort != "" {
|
||||
v, err := strconv.Atoi(envNwakuRestPort)
|
||||
if err != nil {
|
||||
return NwakuInfo{}, err
|
||||
}
|
||||
nwakuRestPort = v
|
||||
}
|
||||
|
||||
nwakuRestHost := "localhost"
|
||||
if host != nil {
|
||||
nwakuRestHost = *host
|
||||
}
|
||||
envNwakuRestHost := os.Getenv("NWAKU_REST_HOST")
|
||||
if envNwakuRestHost != "" {
|
||||
nwakuRestHost = envNwakuRestHost
|
||||
}
|
||||
|
||||
resp, err := http.Get(fmt.Sprintf("http://%s:%d/debug/v1/info", nwakuRestHost, nwakuRestPort))
|
||||
if err != nil {
|
||||
return NwakuInfo{}, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return NwakuInfo{}, err
|
||||
}
|
||||
|
||||
var data NwakuInfo
|
||||
err = json.Unmarshal(body, &data)
|
||||
if err != nil {
|
||||
return NwakuInfo{}, err
|
||||
}
|
||||
|
||||
return data, nil
|
||||
}
|
|
@ -0,0 +1,26 @@
|
|||
//go:build use_nwaku
|
||||
// +build use_nwaku
|
||||
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
)
|
||||
|
||||
type pinger struct {
|
||||
node *WakuNode
|
||||
}
|
||||
|
||||
func newPinger(node *WakuNode) commonapi.Pinger {
|
||||
return &pinger{
|
||||
node: node,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *pinger) PingPeer(ctx context.Context, peerInfo peer.AddrInfo) (time.Duration, error) {
|
||||
return p.node.PingPeer(ctx, peerInfo)
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
//go:build use_nwaku
|
||||
// +build use_nwaku
|
||||
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/publish"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
)
|
||||
|
||||
type nwakuPublisher struct {
|
||||
node *WakuNode
|
||||
}
|
||||
|
||||
func newPublisher(node *WakuNode) publish.Publisher {
|
||||
return &nwakuPublisher{
|
||||
node: node,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *nwakuPublisher) RelayListPeers(pubsubTopic string) ([]peer.ID, error) {
|
||||
// TODO-nwaku
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (p *nwakuPublisher) RelayPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string) (pb.MessageHash, error) {
|
||||
return p.node.RelayPublish(ctx, message, pubsubTopic)
|
||||
}
|
||||
|
||||
// LightpushPublish publishes a message via WakuLightPush
|
||||
func (p *nwakuPublisher) LightpushPublish(ctx context.Context, message *pb.WakuMessage, pubsubTopic string, maxPeers int) (pb.MessageHash, error) {
|
||||
// TODO-nwaku
|
||||
return pb.MessageHash{}, nil
|
||||
}
|
|
@ -0,0 +1,77 @@
|
|||
//go:build use_nwaku
|
||||
// +build use_nwaku
|
||||
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
)
|
||||
|
||||
type storeResultImpl struct {
|
||||
done bool
|
||||
|
||||
node *WakuNode
|
||||
storeRequest *storepb.StoreQueryRequest
|
||||
storeResponse *storepb.StoreQueryResponse
|
||||
peerInfo peer.AddrInfo
|
||||
}
|
||||
|
||||
func newStoreResultImpl(node *WakuNode, peerInfo peer.AddrInfo, storeRequest *storepb.StoreQueryRequest, storeResponse *storepb.StoreQueryResponse) *storeResultImpl {
|
||||
return &storeResultImpl{
|
||||
node: node,
|
||||
storeRequest: storeRequest,
|
||||
storeResponse: storeResponse,
|
||||
peerInfo: peerInfo,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Cursor() []byte {
|
||||
return r.storeResponse.GetPaginationCursor()
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) IsComplete() bool {
|
||||
return r.done
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) PeerID() peer.ID {
|
||||
return r.peerInfo.ID
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Query() *storepb.StoreQueryRequest {
|
||||
return r.storeRequest
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Response() *storepb.StoreQueryResponse {
|
||||
return r.storeResponse
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Next(ctx context.Context, opts ...store.RequestOption) error {
|
||||
// TODO: opts is being ignored. Will require some changes in go-waku. For now using this
|
||||
// is not necessary
|
||||
|
||||
if r.storeResponse.GetPaginationCursor() == nil {
|
||||
r.done = true
|
||||
return nil
|
||||
}
|
||||
|
||||
r.storeRequest.RequestId = hex.EncodeToString(protocol.GenerateRequestID())
|
||||
r.storeRequest.PaginationCursor = r.storeResponse.PaginationCursor
|
||||
|
||||
storeResponse, err := r.node.StoreQuery(ctx, r.storeRequest, r.peerInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.storeResponse = storeResponse
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *storeResultImpl) Messages() []*storepb.WakuMessageKeyValue {
|
||||
return r.storeResponse.GetMessages()
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package shard
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
|
|
@ -0,0 +1,59 @@
|
|||
//go:build use_nwaku
|
||||
// +build use_nwaku
|
||||
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/api/publish"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
)
|
||||
|
||||
type storenodeMessageVerifier struct {
|
||||
node *WakuNode
|
||||
}
|
||||
|
||||
func newStorenodeMessageVerifier(node *WakuNode) publish.StorenodeMessageVerifier {
|
||||
return &storenodeMessageVerifier{
|
||||
node: node,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *storenodeMessageVerifier) MessageHashesExist(ctx context.Context, requestID []byte, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) ([]pb.MessageHash, error) {
|
||||
requestIDStr := hex.EncodeToString(requestID)
|
||||
storeRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: requestIDStr,
|
||||
MessageHashes: make([][]byte, len(messageHashes)),
|
||||
IncludeData: false,
|
||||
PaginationCursor: nil,
|
||||
PaginationForward: false,
|
||||
PaginationLimit: proto.Uint64(pageSize),
|
||||
}
|
||||
|
||||
for i, mhash := range messageHashes {
|
||||
storeRequest.MessageHashes[i] = mhash.Bytes()
|
||||
}
|
||||
|
||||
response, err := d.node.StoreQuery(ctx, storeRequest, peerInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if response.GetStatusCode() != http.StatusOK {
|
||||
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, response.GetStatusCode(), response.GetStatusDesc())
|
||||
}
|
||||
|
||||
result := make([]pb.MessageHash, len(response.Messages))
|
||||
for i, msg := range response.Messages {
|
||||
result[i] = pb.ToMessageHash(msg.GetMessageHash())
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
//go:build use_nwaku
|
||||
// +build use_nwaku
|
||||
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
commonapi "github.com/waku-org/go-waku/waku/v2/api/common"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type storenodeRequestor struct {
|
||||
node *WakuNode
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
func newStorenodeRequestor(node *WakuNode, logger *zap.Logger) commonapi.StorenodeRequestor {
|
||||
return &storenodeRequestor{
|
||||
node: node,
|
||||
logger: logger.Named("storenodeRequestor"),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *storenodeRequestor) GetMessagesByHash(ctx context.Context, peerInfo peer.AddrInfo, pageSize uint64, messageHashes []pb.MessageHash) (commonapi.StoreRequestResult, error) {
|
||||
requestIDStr := hex.EncodeToString(protocol.GenerateRequestID())
|
||||
|
||||
logger := s.logger.With(zap.Stringer("peerID", peerInfo.ID), zap.String("requestID", requestIDStr))
|
||||
|
||||
logger.Debug("sending store request")
|
||||
|
||||
storeRequest := &storepb.StoreQueryRequest{
|
||||
RequestId: requestIDStr,
|
||||
MessageHashes: make([][]byte, len(messageHashes)),
|
||||
IncludeData: true,
|
||||
PaginationCursor: nil,
|
||||
PaginationForward: false,
|
||||
PaginationLimit: proto.Uint64(pageSize),
|
||||
}
|
||||
|
||||
for i, mhash := range messageHashes {
|
||||
storeRequest.MessageHashes[i] = mhash.Bytes()
|
||||
}
|
||||
|
||||
storeResponse, err := s.node.StoreQuery(ctx, storeRequest, peerInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||
return nil, fmt.Errorf("could not query storenode: %s %d %s", requestIDStr, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||
}
|
||||
|
||||
return newStoreResultImpl(s.node, peerInfo, storeRequest, storeResponse), nil
|
||||
}
|
||||
|
||||
func (s *storenodeRequestor) Query(ctx context.Context, peerInfo peer.AddrInfo, storeRequest *storepb.StoreQueryRequest) (commonapi.StoreRequestResult, error) {
|
||||
storeResponse, err := s.node.StoreQuery(ctx, storeRequest, peerInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if storeResponse.GetStatusCode() != http.StatusOK {
|
||||
return nil, fmt.Errorf("could not query storenode: %s %d %s", storeRequest.RequestId, storeResponse.GetStatusCode(), storeResponse.GetStatusDesc())
|
||||
}
|
||||
|
||||
return newStoreResultImpl(s.node, peerInfo, storeRequest, storeResponse), nil
|
||||
}
|
|
@ -1,3 +1,6 @@
|
|||
//go:build !use_nwaku
|
||||
// +build !use_nwaku
|
||||
|
||||
package wakuv2
|
||||
|
||||
import (
|
||||
|
@ -545,8 +548,9 @@ func TestWakuV2Store(t *testing.T) {
|
|||
}()
|
||||
|
||||
// Connect the two nodes directly
|
||||
peer2Addr := w2.node.ListenAddresses()[0].String()
|
||||
err = w1.node.DialPeer(context.Background(), peer2Addr)
|
||||
peer2Addr, err := w2.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
err = w1.node.DialPeer(context.Background(), peer2Addr[0].String())
|
||||
require.NoError(t, err)
|
||||
|
||||
waitForPeerConnection(t, w2.node.Host().ID(), w1PeersCh)
|
||||
|
@ -719,7 +723,9 @@ func TestLightpushRateLimit(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
//Connect the relay peer and full node
|
||||
err = w1.node.DialPeer(ctx, w0.node.ListenAddresses()[0].String())
|
||||
peerAddr, err := w0.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
err = w1.node.DialPeer(ctx, peerAddr[0].String())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = tt.RetryWithBackOff(func() error {
|
||||
|
@ -746,7 +752,9 @@ func TestLightpushRateLimit(t *testing.T) {
|
|||
}()
|
||||
|
||||
//Use this instead of DialPeer to make sure the peer is added to PeerStore and can be selected for Lighpush
|
||||
w2.node.AddDiscoveredPeer(w1.PeerID(), w1.node.ListenAddresses(), wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true)
|
||||
addresses, err := w1.ListenAddresses()
|
||||
require.NoError(t, err)
|
||||
w2.node.AddDiscoveredPeer(w1.PeerID(), addresses, wps.Static, w1.cfg.DefaultShardedPubsubTopics, w1.node.ENR(), true)
|
||||
|
||||
waitForPeerConnectionWithTimeout(t, w2.node.Host().ID(), w1PeersCh, 5*time.Second)
|
||||
|
||||
|
|
Loading…
Reference in New Issue