mirror of
https://github.com/status-im/status-go.git
synced 2025-02-19 18:28:18 +00:00
refactor: only use shards (#5474)
* refactor_: use shards by default * fix_: metadata lightclient check * chore_: update go-waku
This commit is contained in:
parent
094d26dc66
commit
9a703162c4
@ -155,7 +155,6 @@ func SetFleet(fleet string, nodeConfig *params.NodeConfig) error {
|
||||
|
||||
if fleet == params.FleetShardsTest {
|
||||
nodeConfig.ClusterConfig.ClusterID = shardsTestClusterID
|
||||
nodeConfig.WakuV2Config.UseShardAsDefaultTopic = true
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -59,7 +59,6 @@ func (s *MessengerRawMessageResendTest) SetupTest() {
|
||||
EnableDiscV5: true,
|
||||
EnablePeerExchangeServer: true,
|
||||
ClusterID: 16,
|
||||
UseShardAsDefaultTopic: true,
|
||||
DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
|
||||
}
|
||||
s.exchangeBootNode, err = wakuv2.New(nil, "", exchangeNodeConfig, s.logger.Named("pxServerNode"), nil, nil, nil, nil)
|
||||
|
@ -48,8 +48,8 @@ var (
|
||||
seedPhrase = flag.String("seed-phrase", "", "Seed phrase")
|
||||
version = flag.Bool("version", false, "Print version and dump configuration")
|
||||
communityID = flag.String("community-id", "", "The id of the community")
|
||||
shardCluster = flag.Int("shard-cluster", shard.UndefinedShardValue, "The shard cluster in which the of the community is published")
|
||||
shardIndex = flag.Int("shard-index", shard.UndefinedShardValue, "The shard index in which the community is published")
|
||||
shardCluster = flag.Int("shard-cluster", shard.MainStatusShardCluster, "The shard cluster in which the of the community is published")
|
||||
shardIndex = flag.Int("shard-index", shard.DefaultShardIndex, "The shard index in which the community is published")
|
||||
chatID = flag.String("chat-id", "", "The id of the chat")
|
||||
|
||||
dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data")
|
||||
@ -149,7 +149,7 @@ func main() {
|
||||
messenger := wakuextservice.Messenger()
|
||||
|
||||
var s *shard.Shard = nil
|
||||
if shardCluster != nil && shardIndex != nil && *shardCluster != shard.UndefinedShardValue && *shardIndex != shard.UndefinedShardValue {
|
||||
if shardCluster != nil && shardIndex != nil {
|
||||
s = &shard.Shard{
|
||||
Cluster: uint16(*shardCluster),
|
||||
Index: uint16(*shardIndex),
|
||||
|
@ -28,7 +28,6 @@
|
||||
]
|
||||
},
|
||||
"WakuV2Config": {
|
||||
"Enabled": true,
|
||||
"UseShardAsDefaultTopic": true
|
||||
"Enabled": true
|
||||
}
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -96,7 +96,7 @@ require (
|
||||
github.com/schollz/peerdiscovery v1.7.0
|
||||
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
|
||||
github.com/urfave/cli/v2 v2.27.2
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240713051642-554e93791529
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d
|
||||
github.com/wk8/go-ordered-map/v2 v2.1.7
|
||||
github.com/yeqown/go-qrcode/v2 v2.2.1
|
||||
github.com/yeqown/go-qrcode/writer/standard v1.2.1
|
||||
|
4
go.sum
4
go.sum
@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
|
||||
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
|
||||
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240713051642-554e93791529 h1:iP8SuT74PV+jOtWHuzlBYGjsyw9RrY/V8o2Y71zshsw=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240713051642-554e93791529/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d h1:bXYwTtpvJSckgV1Jks2XpnZzvk1x1StJw9cL1odqOwY=
|
||||
github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
|
||||
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
|
||||
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=
|
||||
|
@ -334,7 +334,6 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
|
||||
UDPPort: nodeConfig.WakuV2Config.UDPPort,
|
||||
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
|
||||
DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
|
||||
UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic,
|
||||
TelemetryServerURL: nodeConfig.WakuV2Config.TelemetryServerURL,
|
||||
ClusterID: nodeConfig.ClusterConfig.ClusterID,
|
||||
EnableMissingMessageVerification: nodeConfig.WakuV2Config.EnableMissingMessageVerification,
|
||||
|
@ -240,13 +240,11 @@ func insertWakuV2ConfigPostMigration(tx *sql.Tx, c *params.NodeConfig) error {
|
||||
SET enable_store = ?,
|
||||
store_capacity = ?,
|
||||
store_seconds = ?,
|
||||
use_shard_default_topic = ?,
|
||||
enable_missing_message_verification = ?,
|
||||
enable_store_confirmation_for_messages_sent = ?
|
||||
WHERE synthetic_id = 'id'`,
|
||||
c.WakuV2Config.EnableStore, c.WakuV2Config.StoreCapacity, c.WakuV2Config.StoreSeconds,
|
||||
c.WakuV2Config.UseShardAsDefaultTopic, c.WakuV2Config.EnableMissingMessageVerification,
|
||||
c.WakuV2Config.EnableStoreConfirmationForMessagesSent,
|
||||
c.WakuV2Config.EnableMissingMessageVerification, c.WakuV2Config.EnableStoreConfirmationForMessagesSent,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
@ -680,14 +678,14 @@ func loadNodeConfig(tx *sql.Tx) (*params.NodeConfig, error) {
|
||||
err = tx.QueryRow(`
|
||||
SELECT enabled, host, port, light_client, full_node, discovery_limit, data_dir,
|
||||
max_message_size, enable_confirmations, peer_exchange, enable_discv5, udp_port, auto_update,
|
||||
enable_store, store_capacity, store_seconds, use_shard_default_topic, enable_missing_message_verification,
|
||||
enable_store, store_capacity, store_seconds, enable_missing_message_verification,
|
||||
enable_store_confirmation_for_messages_sent
|
||||
FROM wakuv2_config WHERE synthetic_id = 'id'
|
||||
`).Scan(
|
||||
&nodecfg.WakuV2Config.Enabled, &nodecfg.WakuV2Config.Host, &nodecfg.WakuV2Config.Port, &nodecfg.WakuV2Config.LightClient, &nodecfg.WakuV2Config.FullNode,
|
||||
&nodecfg.WakuV2Config.DiscoveryLimit, &nodecfg.WakuV2Config.DataDir, &nodecfg.WakuV2Config.MaxMessageSize, &nodecfg.WakuV2Config.EnableConfirmations,
|
||||
&nodecfg.WakuV2Config.PeerExchange, &nodecfg.WakuV2Config.EnableDiscV5, &nodecfg.WakuV2Config.UDPPort, &nodecfg.WakuV2Config.AutoUpdate,
|
||||
&nodecfg.WakuV2Config.EnableStore, &nodecfg.WakuV2Config.StoreCapacity, &nodecfg.WakuV2Config.StoreSeconds, &nodecfg.WakuV2Config.UseShardAsDefaultTopic,
|
||||
&nodecfg.WakuV2Config.EnableStore, &nodecfg.WakuV2Config.StoreCapacity, &nodecfg.WakuV2Config.StoreSeconds,
|
||||
&nodecfg.WakuV2Config.EnableMissingMessageVerification, &nodecfg.WakuV2Config.EnableStoreConfirmationForMessagesSent,
|
||||
)
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
|
@ -212,9 +212,6 @@ type WakuV2Config struct {
|
||||
|
||||
TelemetryServerURL string
|
||||
|
||||
// UseShardAsDefaultTopic indicates whether the default shard should be used instead of the default relay topic
|
||||
UseShardAsDefaultTopic bool
|
||||
|
||||
// EnableMissingMessageVerification indicates whether the storenodes must be queried periodically to retrieve any missing message
|
||||
EnableMissingMessageVerification bool
|
||||
|
||||
|
@ -42,7 +42,6 @@ func (s *Shard) PubsubTopic() string {
|
||||
const MainStatusShardCluster = 16
|
||||
const DefaultShardIndex = 32
|
||||
const NonProtectedShardIndex = 64
|
||||
const UndefinedShardValue = 0
|
||||
|
||||
func DefaultShardPubsubTopic() string {
|
||||
return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String()
|
||||
|
@ -2155,6 +2155,12 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var cShard *shard.Shard
|
||||
if communityShard == nil {
|
||||
cShard = &shard.Shard{Cluster: shard.MainStatusShardCluster, Index: shard.DefaultShardIndex}
|
||||
} else {
|
||||
cShard = shard.FromProtobuff(communityShard)
|
||||
}
|
||||
config := Config{
|
||||
CommunityDescription: processedDescription,
|
||||
Logger: m.logger,
|
||||
@ -2162,7 +2168,7 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des
|
||||
MemberIdentity: m.identity,
|
||||
ID: pubKey,
|
||||
ControlNode: signer,
|
||||
Shard: shard.FromProtobuff(communityShard),
|
||||
Shard: cShard,
|
||||
}
|
||||
|
||||
var descriptionEncryptor DescriptionEncryptor
|
||||
|
@ -474,8 +474,7 @@ func advertiseCommunityTo(s *suite.Suite, community *communities.Community, owne
|
||||
messageState := user.buildMessageState()
|
||||
messageState.CurrentMessageState = &CurrentMessageState{}
|
||||
messageState.CurrentMessageState.PublicKey = &user.identity.PublicKey
|
||||
// TODO: handle shards?
|
||||
err = user.handleCommunityDescription(messageState, signer, description, wrappedCommunity, nil, nil)
|
||||
err = user.handleCommunityDescription(messageState, signer, description, wrappedCommunity, nil, community.Shard().Protobuffer())
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,7 @@ func (s *MessengerCommunitiesSharedMemberAddressSuite) SetupTest() {
|
||||
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob", "alice"})
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "bob", "alice"})
|
||||
|
||||
s.ownerWaku = wakuNodes[0]
|
||||
s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, "owner", []Option{})
|
||||
|
@ -4797,7 +4797,7 @@ func (s *MessengerCommunitiesSuite) TestIgnoreOutdatedCommunityDescription() {
|
||||
// Handle first community description
|
||||
{
|
||||
messageState := s.bob.buildMessageState()
|
||||
err = s.bob.handleCommunityDescription(messageState, signer, description1, wrappedDescription1, nil, nil)
|
||||
err = s.bob.handleCommunityDescription(messageState, signer, description1, wrappedDescription1, nil, community.Shard().Protobuffer())
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(messageState.Response.Communities(), 1)
|
||||
s.Require().Equal(description1.Clock, messageState.Response.Communities()[0].Clock())
|
||||
@ -4806,7 +4806,7 @@ func (s *MessengerCommunitiesSuite) TestIgnoreOutdatedCommunityDescription() {
|
||||
// Handle third community description
|
||||
{
|
||||
messageState := s.bob.buildMessageState()
|
||||
err = s.bob.handleCommunityDescription(messageState, signer, description3, wrappedDescription3, nil, nil)
|
||||
err = s.bob.handleCommunityDescription(messageState, signer, description3, wrappedDescription3, nil, community.Shard().Protobuffer())
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(messageState.Response.Communities(), 1)
|
||||
s.Require().Equal(description3.Clock, messageState.Response.Communities()[0].Clock())
|
||||
@ -4821,7 +4821,7 @@ func (s *MessengerCommunitiesSuite) TestIgnoreOutdatedCommunityDescription() {
|
||||
// It should be ignored
|
||||
{
|
||||
messageState := s.bob.buildMessageState()
|
||||
err = s.bob.handleCommunityDescription(messageState, signer, description2, wrappedDescription2, nil, nil)
|
||||
err = s.bob.handleCommunityDescription(messageState, signer, description2, wrappedDescription2, nil, community.Shard().Protobuffer())
|
||||
s.Require().Len(messageState.Response.Communities(), 0)
|
||||
s.Require().Len(messageState.Response.CommunityChanges, 0)
|
||||
s.Require().ErrorIs(err, communities.ErrInvalidCommunityDescriptionClockOutdated)
|
||||
|
@ -163,7 +163,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) SetupTest() {
|
||||
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob", "alice"})
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "bob", "alice"})
|
||||
|
||||
s.ownerWaku = wakuNodes[0]
|
||||
s.owner = s.newMessenger(ownerPassword, []string{ownerAddress}, s.ownerWaku, "owner", []Option{})
|
||||
@ -465,10 +465,9 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions(
|
||||
var err error
|
||||
|
||||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("store-node-waku"),
|
||||
enableStore: false,
|
||||
useShardAsDefaultTopic: false,
|
||||
clusterID: shard.UndefinedShardValue,
|
||||
logger: s.logger.Named("store-node-waku"),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
}
|
||||
wakuStoreNode := NewTestWakuV2(&s.Suite, cfg)
|
||||
|
||||
@ -480,7 +479,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestBecomeMemberPermissions(
|
||||
|
||||
// Create messengers
|
||||
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob"})
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "bob"})
|
||||
s.ownerWaku = wakuNodes[0]
|
||||
s.bobWaku = wakuNodes[1]
|
||||
|
||||
|
@ -1757,12 +1757,10 @@ func (m *Messenger) InitFilters() error {
|
||||
|
||||
logger := m.logger.With(zap.String("site", "Init"))
|
||||
|
||||
if m.useShards() {
|
||||
// Community requests will arrive in this pubsub topic
|
||||
err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Community requests will arrive in this pubsub topic
|
||||
err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -156,6 +156,7 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err
|
||||
messageID, err := m.sender.SendPublic(context.Background(), org.IDString(), rawMessage)
|
||||
if err == nil {
|
||||
m.logger.Debug("published community",
|
||||
zap.String("pubsubTopic", org.PubsubTopic()),
|
||||
zap.String("communityID", org.IDString()),
|
||||
zap.String("messageID", hexutil.Encode(messageID)),
|
||||
zap.Uint64("clock", org.Clock()),
|
||||
@ -2423,16 +2424,8 @@ func (m *Messenger) DeleteCommunityChat(communityID types.HexBytes, chatID strin
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func (m *Messenger) useShards() bool {
|
||||
nodeConfig, err := m.settings.GetNodeConfig()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return nodeConfig.WakuV2Config.UseShardAsDefaultTopic
|
||||
}
|
||||
|
||||
func (m *Messenger) InitCommunityFilters(communityFiltersToInitialize []transport.CommunityFilterToInitialize) ([]*transport.Filter, error) {
|
||||
return m.transport.InitCommunityFilters(communityFiltersToInitialize, m.useShards())
|
||||
return m.transport.InitCommunityFilters(communityFiltersToInitialize)
|
||||
}
|
||||
|
||||
func (m *Messenger) DefaultFilters(o *communities.Community) []transport.FiltersToInitialize {
|
||||
@ -2449,12 +2442,7 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters
|
||||
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
|
||||
}
|
||||
|
||||
if m.useShards() {
|
||||
filters = append(filters, transport.FiltersToInitialize{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()})
|
||||
} else {
|
||||
filters = append(filters, transport.FiltersToInitialize{ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic})
|
||||
{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()},
|
||||
}
|
||||
|
||||
return filters
|
||||
@ -3769,7 +3757,7 @@ func (m *Messenger) handleSyncInstallationCommunity(messageState *ReceivedMessag
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO: handle shard
|
||||
// Passing shard as nil so that defaultProtected shard 32 is considered
|
||||
err = m.handleCommunityDescription(messageState, signer, &cd, syncCommunity.Description, signer, nil)
|
||||
// Even if the Description is outdated we should proceed in order to sync settings and joined state
|
||||
if err != nil && err != communities.ErrInvalidCommunityDescriptionClockOutdated {
|
||||
|
@ -39,10 +39,9 @@ type MessengerCommunitiesShardingSuite struct {
|
||||
func (s *MessengerCommunitiesShardingSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, true, []string{"owner", "alice"})
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "alice"})
|
||||
|
||||
nodeConfig := defaultTestCommunitiesMessengerNodeConfig()
|
||||
nodeConfig.WakuV2Config.UseShardAsDefaultTopic = true
|
||||
|
||||
s.ownerWaku = wakuNodes[0]
|
||||
s.owner = newTestCommunitiesMessenger(&s.Suite, s.ownerWaku, testCommunitiesMessengerConfig{
|
||||
|
@ -3846,7 +3846,7 @@ func (m *Messenger) HandlePushNotificationRequest(state *ReceivedMessageState, m
|
||||
}
|
||||
|
||||
func (m *Messenger) HandleCommunityDescription(state *ReceivedMessageState, message *protobuf.CommunityDescription, statusMessage *v1protocol.StatusMessage) error {
|
||||
// TODO: handle shard
|
||||
// shard passed as nil since it is handled within by using default shard
|
||||
err := m.handleCommunityDescription(state, state.CurrentMessageState.PublicKey, message, statusMessage.EncryptionLayer.Payload, nil, nil)
|
||||
if err != nil {
|
||||
m.logger.Warn("failed to handle CommunityDescription", zap.Error(err))
|
||||
|
@ -91,7 +91,7 @@ type MessengerMessagesTrackingSuite struct {
|
||||
func (s *MessengerMessagesTrackingSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"bob", "alice"})
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"bob", "alice"})
|
||||
|
||||
s.bobWaku = wakuNodes[0]
|
||||
s.bob, s.bobInterceptor = s.newMessenger(s.bobWaku, s.logger.With(zap.String("name", "bob")))
|
||||
|
@ -42,7 +42,7 @@ func TestMessengerOfflineSuite(t *testing.T) {
|
||||
func (s *MessengerOfflineSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, false, []string{"owner", "bob", "alice"})
|
||||
wakuNodes := CreateWakuV2Network(&s.Suite, s.logger, []string{"owner", "bob", "alice"})
|
||||
|
||||
ownerLogger := s.logger.With(zap.String("name", "owner"))
|
||||
s.ownerWaku = wakuNodes[0]
|
||||
|
@ -86,10 +86,9 @@ func (s *MessengerStoreNodeCommunitySuite) TearDown() {
|
||||
|
||||
func (s *MessengerStoreNodeCommunitySuite) createStore(name string) (*waku2.Waku, string) {
|
||||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named(name),
|
||||
enableStore: true,
|
||||
useShardAsDefaultTopic: false,
|
||||
clusterID: shard.UndefinedShardValue,
|
||||
logger: s.logger.Named(name),
|
||||
enableStore: true,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
}
|
||||
|
||||
storeNode := NewTestWakuV2(&s.Suite, cfg)
|
||||
@ -104,10 +103,9 @@ func (s *MessengerStoreNodeCommunitySuite) newMessenger(name, storenodeAddress s
|
||||
|
||||
logger := s.logger.Named(name)
|
||||
cfg := testWakuV2Config{
|
||||
logger: logger,
|
||||
enableStore: false,
|
||||
useShardAsDefaultTopic: false,
|
||||
clusterID: shard.UndefinedShardValue,
|
||||
logger: logger,
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
}
|
||||
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
|
||||
wakuV2Wrapper := gethbridge.NewGethWakuV2Wrapper(wakuV2)
|
||||
|
@ -155,10 +155,9 @@ func (s *MessengerStoreNodeRequestSuite) TearDown() {
|
||||
|
||||
func (s *MessengerStoreNodeRequestSuite) createStore() {
|
||||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("store-waku"),
|
||||
enableStore: true,
|
||||
useShardAsDefaultTopic: false,
|
||||
clusterID: shard.UndefinedShardValue,
|
||||
logger: s.logger.Named("store-waku"),
|
||||
enableStore: true,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
}
|
||||
|
||||
s.wakuStoreNode = NewTestWakuV2(&s.Suite, cfg)
|
||||
@ -174,10 +173,9 @@ func (s *MessengerStoreNodeRequestSuite) tearDownOwner() {
|
||||
func (s *MessengerStoreNodeRequestSuite) createOwner() {
|
||||
|
||||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("owner-waku"),
|
||||
enableStore: false,
|
||||
useShardAsDefaultTopic: false,
|
||||
clusterID: shard.UndefinedShardValue,
|
||||
logger: s.logger.Named("owner-waku"),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
}
|
||||
|
||||
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
|
||||
@ -196,10 +194,9 @@ func (s *MessengerStoreNodeRequestSuite) createOwner() {
|
||||
|
||||
func (s *MessengerStoreNodeRequestSuite) createBob() {
|
||||
cfg := testWakuV2Config{
|
||||
logger: s.logger.Named("bob-waku"),
|
||||
enableStore: false,
|
||||
useShardAsDefaultTopic: false,
|
||||
clusterID: shard.UndefinedShardValue,
|
||||
logger: s.logger.Named("bob-waku"),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
}
|
||||
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
|
||||
s.bobWaku = gethbridge.NewGethWakuV2Wrapper(wakuV2)
|
||||
@ -691,24 +688,33 @@ func (s *MessengerStoreNodeRequestSuite) TestSequentialUpdates() {
|
||||
func (s *MessengerStoreNodeRequestSuite) TestRequestShardAndCommunityInfo() {
|
||||
s.createOwner()
|
||||
s.createBob()
|
||||
|
||||
community := s.createCommunity(s.owner)
|
||||
|
||||
topicPrivKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
expectedShard := &shard.Shard{
|
||||
Cluster: shard.MainStatusShardCluster,
|
||||
Index: 23,
|
||||
}
|
||||
|
||||
err = s.wakuStoreNode.SubscribeToPubsubTopic(expectedShard.PubsubTopic(), &topicPrivKey.PublicKey)
|
||||
s.Require().NoError(err)
|
||||
|
||||
topicPrivKeyBytes := crypto.FromECDSA(topicPrivKey)
|
||||
h := types.HexBytes(topicPrivKeyBytes)
|
||||
|
||||
shardRequest := &requests.SetCommunityShard{
|
||||
CommunityID: community.ID(),
|
||||
Shard: expectedShard,
|
||||
PrivateKey: &h,
|
||||
}
|
||||
|
||||
shardTopic := transport.CommunityShardInfoTopic(community.IDString())
|
||||
contentContentTopic := wakuV2common.BytesToTopic(transport.ToTopic(shardTopic))
|
||||
storeNodeSubscription := s.setupStoreNodeEnvelopesWatcher(&contentContentTopic)
|
||||
|
||||
_, err := s.owner.SetCommunityShard(shardRequest)
|
||||
_, err = s.owner.SetCommunityShard(shardRequest)
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.waitForEnvelopes(storeNodeSubscription, 1)
|
||||
@ -831,13 +837,12 @@ type testFetchRealCommunityExampleTokenInfo struct {
|
||||
}
|
||||
|
||||
var testFetchRealCommunityExample = []struct {
|
||||
CommunityID string
|
||||
CommunityURL string // If set, takes precedence over CommunityID
|
||||
CommunityShard *shard.Shard // WARNING: I didn't test a sharded community
|
||||
Fleet string
|
||||
UseShardAsDefaultTopic bool
|
||||
ClusterID uint16
|
||||
UserPrivateKeyString string // When empty a new user will be created
|
||||
CommunityID string
|
||||
CommunityURL string // If set, takes precedence over CommunityID
|
||||
CommunityShard *shard.Shard // WARNING: I didn't test a sharded community
|
||||
Fleet string
|
||||
ClusterID uint16
|
||||
UserPrivateKeyString string // When empty a new user will be created
|
||||
// Optional request parameters
|
||||
CustomOptions []StoreNodeRequestOption
|
||||
// Setup OwnerPublicKey and CommunityTokens if the community has owner token
|
||||
@ -852,19 +857,17 @@ var testFetchRealCommunityExample = []struct {
|
||||
}{
|
||||
{
|
||||
//Example 1, status.prod fleet
|
||||
CommunityID: "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a",
|
||||
CommunityShard: nil,
|
||||
Fleet: params.FleetStatusProd,
|
||||
UseShardAsDefaultTopic: false,
|
||||
ClusterID: shard.UndefinedShardValue,
|
||||
CommunityID: "0x03073514d4c14a7d10ae9fc9b0f05abc904d84166a6ac80add58bf6a3542a4e50a",
|
||||
CommunityShard: nil,
|
||||
Fleet: params.FleetStatusProd,
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
},
|
||||
{
|
||||
// Example 3, shards.test fleet
|
||||
// https://status.app/c/CxiACi8KFGFwIHJlcSAxIHN0dCBiZWMgbWVtEgdkc2Fkc2FkGAMiByM0MzYwREYqAxkrHAM=#zQ3shwDYZHtrLE7NqoTGjTWzWUu6hom5D4qxfskLZfgfyGRyL
|
||||
CommunityID: "0x03f64be95ed5c925022265f9250f538f65ed3dcf6e4ef6c139803dc02a3487ae7b",
|
||||
Fleet: params.FleetShardsTest,
|
||||
UseShardAsDefaultTopic: true,
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
CommunityID: "0x03f64be95ed5c925022265f9250f538f65ed3dcf6e4ef6c139803dc02a3487ae7b",
|
||||
Fleet: params.FleetShardsTest,
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
|
||||
CheckExpectedEnvelopes: true,
|
||||
ExpectedShardEnvelopes: []string{
|
||||
@ -967,7 +970,6 @@ var testFetchRealCommunityExample = []struct {
|
||||
//Example 1, shards.test fleet
|
||||
CommunityID: "0x02471dd922756a3a50b623e59cf3b99355d6587e43d5c517eb55f9aea9d3fe9fe9",
|
||||
Fleet: params.FleetShardsTest,
|
||||
UseShardAsDefaultTopic: true,
|
||||
ClusterID: shard.MainStatusShardCluster,
|
||||
CheckExpectedEnvelopes: true,
|
||||
ExpectedShardEnvelopes: []string{
|
||||
@ -984,9 +986,8 @@ var testFetchRealCommunityExample = []struct {
|
||||
{
|
||||
CommunityURL: "https://status.app/c/G4IAAMQn9ucHF-V3W5Ouuy0xf0BtTjlwCANJEmwB2CG5p2xKUYzK_l37kzXulUppltT1t6mBcCEJsljRoGrKCP7rWommQomrMA2gBN7RrvCMkFqQwnCNzkNYWrLG85E6GVoM_nolTtfIzl53J1N-tj8fz4_TnO4IIw==#zQ3shZeEJqTC1xhGUjxuS4rtHSrhJ8vUYp64v6qWkLpvdy9L9",
|
||||
//CommunityID: "0x02b5bdaf5a25fcfe2ee14c501fab1836b8de57f61621080c3d52073d16de0d98d6",
|
||||
Fleet: params.FleetShardsTest,
|
||||
UseShardAsDefaultTopic: true,
|
||||
OwnerPublicKey: "0x04953f5f0d355b37c39d1d6460a31ed1114455f8263b3fd1b84406c5f12c9eb7dfb76ba7513b92186010928254984fe98aee069b4c7e20f9ea3da497c3ae769477",
|
||||
Fleet: params.FleetShardsTest,
|
||||
OwnerPublicKey: "0x04953f5f0d355b37c39d1d6460a31ed1114455f8263b3fd1b84406c5f12c9eb7dfb76ba7513b92186010928254984fe98aee069b4c7e20f9ea3da497c3ae769477",
|
||||
CommunityTokens: []testFetchRealCommunityExampleTokenInfo{
|
||||
{
|
||||
ChainID: 10,
|
||||
@ -1029,7 +1030,6 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
|
||||
communityID := exampleToRun.CommunityID
|
||||
communityShard := exampleToRun.CommunityShard
|
||||
fleet := exampleToRun.Fleet
|
||||
useShardAsDefaultTopic := exampleToRun.UseShardAsDefaultTopic
|
||||
clusterID := exampleToRun.ClusterID
|
||||
userPrivateKeyString := exampleToRun.UserPrivateKeyString
|
||||
ownerPublicKey := exampleToRun.OwnerPublicKey
|
||||
@ -1081,10 +1081,9 @@ func (s *MessengerStoreNodeRequestSuite) TestFetchRealCommunity() {
|
||||
messengerLogger := s.logger.Named(fmt.Sprintf("user-messenger-%d", i))
|
||||
|
||||
cfg := testWakuV2Config{
|
||||
logger: wakuLogger,
|
||||
enableStore: false,
|
||||
useShardAsDefaultTopic: useShardAsDefaultTopic,
|
||||
clusterID: clusterID,
|
||||
logger: wakuLogger,
|
||||
enableStore: false,
|
||||
clusterID: clusterID,
|
||||
}
|
||||
wakuV2 := NewTestWakuV2(&s.Suite, cfg)
|
||||
userWaku := gethbridge.NewGethWakuV2Wrapper(wakuV2)
|
||||
|
@ -145,7 +145,7 @@ type CommunityFilterToInitialize struct {
|
||||
PrivKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize, useShards bool) ([]*Filter, error) {
|
||||
func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) {
|
||||
var filters []*Filter
|
||||
f.mutex.Lock()
|
||||
defer f.mutex.Unlock()
|
||||
@ -158,12 +158,8 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com
|
||||
}
|
||||
|
||||
topics := make([]string, 0)
|
||||
if useShards {
|
||||
topics = append(topics, shard.DefaultNonProtectedPubsubTopic())
|
||||
} else {
|
||||
topics = append(topics, "") // empty PubsubTopic means default pubsub topic,
|
||||
// to be overridden with proper value in Waku layer
|
||||
}
|
||||
topics = append(topics, shard.DefaultNonProtectedPubsubTopic())
|
||||
topics = append(topics, communityFilter.Shard.PubsubTopic())
|
||||
|
||||
for _, pubsubTopic := range topics {
|
||||
pk := &cf.PrivKey.PublicKey
|
||||
@ -174,7 +170,7 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com
|
||||
return nil, err
|
||||
|
||||
}
|
||||
filterID := identityStr + "-admin"
|
||||
filterID := identityStr + "-admin" + pubsubTopic
|
||||
filter := &Filter{
|
||||
ChatID: filterID,
|
||||
FilterID: rawFilter.FilterID,
|
||||
@ -187,7 +183,7 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com
|
||||
|
||||
f.filters[filterID] = filter
|
||||
|
||||
f.logger.Debug("registering filter for", zap.String("chatID", filterID), zap.String("type", "community"), zap.String("topic", rawFilter.Topic.String()))
|
||||
f.logger.Debug("registering filter for", zap.String("chatID", filterID), zap.String("type", "community"), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", rawFilter.Topic.String()))
|
||||
|
||||
filters = append(filters, filter)
|
||||
}
|
||||
@ -543,6 +539,17 @@ func (f *FiltersManager) LoadPublic(chatID string, pubsubTopic string) (*Filter,
|
||||
defer f.mutex.Unlock()
|
||||
|
||||
if chat, ok := f.filters[chatID]; ok {
|
||||
if chat.PubsubTopic != pubsubTopic {
|
||||
f.logger.Debug("updating pubsub topic for filter",
|
||||
zap.String("chatID", chatID),
|
||||
zap.String("type", "public"),
|
||||
zap.String("oldTOpic", chat.PubsubTopic),
|
||||
zap.String("newTopic", pubsubTopic),
|
||||
)
|
||||
chat.PubsubTopic = pubsubTopic
|
||||
f.filters[chatID] = chat
|
||||
}
|
||||
|
||||
return chat, nil
|
||||
}
|
||||
|
||||
|
@ -165,8 +165,8 @@ func (t *Transport) LoadFilters(filters []*Filter) ([]*Filter, error) {
|
||||
return t.filters.InitWithFilters(filters)
|
||||
}
|
||||
|
||||
func (t *Transport) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize, useShards bool) ([]*Filter, error) {
|
||||
return t.filters.InitCommunityFilters(communityFiltersToInitialize, useShards)
|
||||
func (t *Transport) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) {
|
||||
return t.filters.InitCommunityFilters(communityFiltersToInitialize)
|
||||
}
|
||||
|
||||
func (t *Transport) RemoveFilters(filters []*Filter) error {
|
||||
|
@ -18,16 +18,14 @@ import (
|
||||
)
|
||||
|
||||
type testWakuV2Config struct {
|
||||
logger *zap.Logger
|
||||
enableStore bool
|
||||
useShardAsDefaultTopic bool
|
||||
clusterID uint16
|
||||
nodekey []byte
|
||||
logger *zap.Logger
|
||||
enableStore bool
|
||||
clusterID uint16
|
||||
nodekey []byte
|
||||
}
|
||||
|
||||
func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
|
||||
wakuConfig := &waku2.Config{
|
||||
UseShardAsDefaultTopic: cfg.useShardAsDefaultTopic,
|
||||
ClusterID: cfg.clusterID,
|
||||
LightClient: false,
|
||||
EnablePeerExchangeServer: true,
|
||||
@ -41,12 +39,10 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
|
||||
}
|
||||
|
||||
var db *sql.DB
|
||||
db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
|
||||
s.Require().NoError(err)
|
||||
|
||||
if cfg.enableStore {
|
||||
var err error
|
||||
db, err = helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
|
||||
s.Require().NoError(err)
|
||||
|
||||
wakuConfig.EnableStore = true
|
||||
wakuConfig.StoreCapacity = 200
|
||||
wakuConfig.StoreSeconds = 200
|
||||
@ -65,21 +61,24 @@ func NewTestWakuV2(s *suite.Suite, cfg testWakuV2Config) *waku2.Waku {
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = wakuNode.Start()
|
||||
if cfg.enableStore {
|
||||
err := wakuNode.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil)
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
s.Require().NoError(err)
|
||||
|
||||
return wakuNode
|
||||
}
|
||||
|
||||
func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, useShardAsDefaultTopic bool, nodeNames []string) []types.Waku {
|
||||
func CreateWakuV2Network(s *suite.Suite, parentLogger *zap.Logger, nodeNames []string) []types.Waku {
|
||||
nodes := make([]*waku2.Waku, len(nodeNames))
|
||||
wrappers := make([]types.Waku, len(nodes))
|
||||
|
||||
for i, name := range nodeNames {
|
||||
nodes[i] = NewTestWakuV2(s, testWakuV2Config{
|
||||
logger: parentLogger.Named("waku-" + name),
|
||||
enableStore: false,
|
||||
useShardAsDefaultTopic: useShardAsDefaultTopic,
|
||||
clusterID: shard.UndefinedShardValue, // FIXME: why it was 0 here?
|
||||
logger: parentLogger.Named("waku-" + name),
|
||||
enableStore: false,
|
||||
clusterID: shard.MainStatusShardCluster,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -198,7 +198,6 @@ func TestTelemetryUponPublishError(t *testing.T) {
|
||||
wakuConfig.EnableDiscV5 = false
|
||||
wakuConfig.DiscV5BootstrapNodes = []string{enrTreeAddress}
|
||||
wakuConfig.DiscoveryLimit = 20
|
||||
wakuConfig.UseShardAsDefaultTopic = true
|
||||
wakuConfig.ClusterID = 16
|
||||
wakuConfig.WakuNodes = []string{enrTreeAddress}
|
||||
wakuConfig.TelemetryServerURL = client.serverURL
|
||||
@ -337,7 +336,6 @@ func TestRetryCacheCleanup(t *testing.T) {
|
||||
}
|
||||
func setDefaultConfig(config *wakuv2.Config, lightMode bool) {
|
||||
config.ClusterID = 16
|
||||
config.UseShardAsDefaultTopic = true
|
||||
|
||||
if lightMode {
|
||||
config.EnablePeerExchangeClient = true
|
||||
|
26
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go
generated
vendored
26
vendor/github.com/waku-org/go-waku/waku/v2/protocol/filter/client.go
generated
vendored
@ -147,6 +147,17 @@ func (wf *WakuFilterLightNode) Stop() {
|
||||
})
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) unsubscribeWithoutSubscription(cf protocol.ContentFilter, peerID peer.ID) {
|
||||
err := wf.request(
|
||||
wf.Context(),
|
||||
protocol.GenerateRequestID(),
|
||||
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
|
||||
cf, peerID)
|
||||
if err != nil {
|
||||
wf.log.Warn("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Stream) {
|
||||
return func(stream network.Stream) {
|
||||
peerID := stream.Conn().RemotePeer()
|
||||
@ -156,6 +167,9 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
|
||||
if !wf.subscriptions.IsSubscribedTo(peerID) {
|
||||
logger.Warn("received message push from unknown peer", logging.HostID("peerID", peerID))
|
||||
wf.metrics.RecordError(unknownPeerMessagePush)
|
||||
//Send a wildcard unsubscribe to this peer so that further requests are not forwarded to us
|
||||
//This could be happening due to https://github.com/waku-org/go-waku/issues/1124
|
||||
go wf.unsubscribeWithoutSubscription(protocol.ContentFilter{}, peerID)
|
||||
if err := stream.Reset(); err != nil {
|
||||
wf.log.Error("resetting connection", zap.Error(err))
|
||||
}
|
||||
@ -199,22 +213,24 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(network.Strea
|
||||
}
|
||||
|
||||
logger = messagePush.WakuMessage.Logger(logger, pubSubTopic)
|
||||
|
||||
if !wf.subscriptions.Has(peerID, protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
|
||||
cf := protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)
|
||||
if !wf.subscriptions.Has(peerID, cf) {
|
||||
logger.Warn("received messagepush with invalid subscription parameters")
|
||||
//Unsubscribe from that peer for the contentTopic, possibly due to https://github.com/waku-org/go-waku/issues/1124
|
||||
go wf.unsubscribeWithoutSubscription(cf, peerID)
|
||||
wf.metrics.RecordError(invalidSubscriptionMessage)
|
||||
return
|
||||
}
|
||||
|
||||
wf.metrics.RecordMessage()
|
||||
|
||||
wf.notify(peerID, pubSubTopic, messagePush.WakuMessage)
|
||||
wf.notify(ctx, peerID, pubSubTopic, messagePush.WakuMessage)
|
||||
|
||||
logger.Info("received message push")
|
||||
}
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
|
||||
func (wf *WakuFilterLightNode) notify(ctx context.Context, remotePeerID peer.ID, pubsubTopic string, msg *wpb.WakuMessage) {
|
||||
envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic)
|
||||
|
||||
if wf.broadcaster != nil {
|
||||
@ -222,7 +238,7 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,
|
||||
wf.broadcaster.Submit(envelope)
|
||||
}
|
||||
// Notify filter subscribers
|
||||
wf.subscriptions.Notify(remotePeerID, envelope)
|
||||
wf.subscriptions.Notify(ctx, remotePeerID, envelope)
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) request(ctx context.Context, requestID []byte,
|
||||
|
11
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
11
vendor/github.com/waku-org/go-waku/waku/v2/protocol/lightpush/waku_lightpush.go
generated
vendored
@ -189,13 +189,6 @@ func (wakuLP *WakuLightPush) reply(stream network.Stream, responsePushRPC *pb.Pu
|
||||
|
||||
// request sends a message via lightPush protocol to either a specified peer or peer that is selected.
|
||||
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushRequestParameters, peer peer.ID) (*pb.PushResponse, error) {
|
||||
if params == nil {
|
||||
return nil, errors.New("lightpush params are mandatory")
|
||||
}
|
||||
|
||||
if len(params.requestID) == 0 {
|
||||
return nil, ErrInvalidID
|
||||
}
|
||||
|
||||
logger := wakuLP.log.With(logging.HostID("peer", peer))
|
||||
|
||||
@ -336,8 +329,10 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessa
|
||||
for _, peerID := range params.selectedPeers {
|
||||
wg.Add(1)
|
||||
go func(id peer.ID) {
|
||||
paramsValue := *params
|
||||
paramsValue.requestID = protocol.GenerateRequestID()
|
||||
defer wg.Done()
|
||||
response, err := wakuLP.request(ctx, req, params, id)
|
||||
response, err := wakuLP.request(ctx, req, ¶msValue, id)
|
||||
if err != nil {
|
||||
logger.Error("could not publish message", zap.Error(err), zap.Stringer("peer", id))
|
||||
}
|
||||
|
@ -133,7 +133,6 @@ func WithAutomaticRequestID() RequestOption {
|
||||
// DefaultOptions are the default options to be used when using the lightpush protocol
|
||||
func DefaultOptions(host host.Host) []RequestOption {
|
||||
return []RequestOption{
|
||||
WithAutomaticRequestID(),
|
||||
WithAutomaticPeerSelection(),
|
||||
WithMaxPeers(1), //keeping default as 2 for status use-case
|
||||
}
|
||||
|
21
vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go
generated
vendored
21
vendor/github.com/waku-org/go-waku/waku/v2/protocol/metadata/waku_metadata.go
generated
vendored
@ -117,8 +117,6 @@ func (wakuM *WakuMetadata) Request(ctx context.Context, peerID peer.ID) (*pb.Wak
|
||||
request := &pb.WakuMetadataRequest{}
|
||||
request.ClusterId = clusterID
|
||||
request.Shards = shards
|
||||
// TODO: remove with nwaku 0.28 deployment
|
||||
request.ShardsDeprecated = shards // nolint: staticcheck
|
||||
|
||||
writer := pbio.NewDelimitedWriter(stream)
|
||||
reader := pbio.NewDelimitedReader(stream, math.MaxInt32)
|
||||
@ -173,8 +171,6 @@ func (wakuM *WakuMetadata) onRequest(ctx context.Context) func(network.Stream) {
|
||||
} else {
|
||||
response.ClusterId = clusterID
|
||||
response.Shards = shards
|
||||
// TODO: remove with nwaku 0.28 deployment
|
||||
response.ShardsDeprecated = shards // nolint: staticcheck
|
||||
}
|
||||
|
||||
err = writer.WriteMsg(response)
|
||||
@ -245,14 +241,6 @@ func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) {
|
||||
rClusterID := uint16(*response.ClusterId)
|
||||
var rs protocol.RelayShards
|
||||
|
||||
if _, err = wakuM.h.Peerstore().SupportsProtocols(peerID, relay.WakuRelayID_v200); err == nil {
|
||||
wakuM.log.Debug("light peer only checking clusterID")
|
||||
if rClusterID != wakuM.clusterID {
|
||||
wakuM.disconnectPeer(peerID, errors.New("different clusterID reported"))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
wakuM.log.Debug("relay peer checking cluster and shards")
|
||||
|
||||
var rShardIDs []uint16
|
||||
@ -261,9 +249,12 @@ func (wakuM *WakuMetadata) Connected(n network.Network, cc network.Conn) {
|
||||
rShardIDs = append(rShardIDs, uint16(i))
|
||||
}
|
||||
} else {
|
||||
// TODO: remove with nwaku 0.28 deployment
|
||||
for _, i := range response.ShardsDeprecated { // nolint: staticcheck
|
||||
rShardIDs = append(rShardIDs, uint16(i))
|
||||
if proto, err := wakuM.h.Peerstore().FirstSupportedProtocol(peerID, relay.WakuRelayID_v200); err == nil && proto == "" {
|
||||
wakuM.log.Debug("light peer only checking clusterID")
|
||||
if rClusterID != wakuM.clusterID {
|
||||
wakuM.disconnectPeer(peerID, errors.New("different clusterID reported"))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
wakuM.log.Debug("getting remote cluster and shards")
|
||||
|
@ -1,6 +1,7 @@
|
||||
package subscription
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
@ -178,17 +179,17 @@ func (sub *SubscriptionsMap) Clear() {
|
||||
sub.clear()
|
||||
}
|
||||
|
||||
func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) {
|
||||
func (sub *SubscriptionsMap) Notify(ctx context.Context, peerID peer.ID, envelope *protocol.Envelope) {
|
||||
sub.RLock()
|
||||
defer sub.RUnlock()
|
||||
|
||||
subscriptions, ok := sub.items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()]
|
||||
if ok {
|
||||
iterateSubscriptionSet(sub.logger, subscriptions, envelope)
|
||||
iterateSubscriptionSet(ctx, sub.logger, subscriptions, envelope)
|
||||
}
|
||||
}
|
||||
|
||||
func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) {
|
||||
func iterateSubscriptionSet(ctx context.Context, logger *zap.Logger, subscriptions SubscriptionSet, envelope *protocol.Envelope) {
|
||||
for _, subscription := range subscriptions {
|
||||
func(subscription *SubscriptionDetails) {
|
||||
subscription.RLock()
|
||||
@ -201,6 +202,8 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e
|
||||
|
||||
if !subscription.Closed {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case subscription.C <- envelope:
|
||||
default:
|
||||
logger.Warn("can't deliver message to subscription. subscriber too slow")
|
||||
|
2
vendor/modules.txt
vendored
2
vendor/modules.txt
vendored
@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
|
||||
github.com/waku-org/go-libp2p-rendezvous
|
||||
github.com/waku-org/go-libp2p-rendezvous/db
|
||||
github.com/waku-org/go-libp2p-rendezvous/pb
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240713051642-554e93791529
|
||||
# github.com/waku-org/go-waku v0.8.1-0.20240715141727-dacff8a6ae5d
|
||||
## explicit; go 1.21
|
||||
github.com/waku-org/go-waku/logging
|
||||
github.com/waku-org/go-waku/tests
|
||||
|
@ -24,8 +24,7 @@ import (
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/wakuv2/common"
|
||||
)
|
||||
|
||||
@ -58,7 +57,7 @@ func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) {
|
||||
}
|
||||
|
||||
found := false
|
||||
candidates := w.filters.GetWatchersByTopic(relay.DefaultWakuTopic, t1)
|
||||
candidates := w.filters.GetWatchersByTopic(shard.DefaultShardPubsubTopic(), t1)
|
||||
for _, f := range candidates {
|
||||
if maps.Equal(f.ContentTopics, common.NewTopicSet(crit.ContentTopics)) {
|
||||
found = true
|
||||
|
@ -23,8 +23,6 @@ import (
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
|
||||
ethdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc"
|
||||
@ -63,7 +61,6 @@ type Config struct {
|
||||
TelemetrySendPeriodMs int `toml:",omitempty"` // Number of milliseconds to wait between sending requests to telemetry service
|
||||
DefaultShardPubsubTopic string `toml:",omitempty"` // Pubsub topic to be used by default for messages that do not have a topic assigned (depending whether sharding is used or not)
|
||||
DefaultShardedPubsubTopics []string `toml:", omitempty"`
|
||||
UseShardAsDefaultTopic bool `toml:",omitempty"`
|
||||
ClusterID uint16 `toml:",omitempty"`
|
||||
EnableConfirmations bool `toml:",omitempty"` // Enable sending message confirmations
|
||||
SkipPublishToTopic bool `toml:",omitempty"` // Used in testing
|
||||
@ -119,14 +116,10 @@ func setDefaults(cfg *Config) *Config {
|
||||
}
|
||||
|
||||
if cfg.DefaultShardPubsubTopic == "" {
|
||||
if cfg.UseShardAsDefaultTopic {
|
||||
cfg.DefaultShardPubsubTopic = shard.DefaultShardPubsubTopic()
|
||||
//For now populating with both used shards, but this can be populated from user subscribed communities etc once community sharding is implemented
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultShardPubsubTopic())
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultNonProtectedPubsubTopic())
|
||||
} else {
|
||||
cfg.DefaultShardPubsubTopic = relay.DefaultWakuTopic
|
||||
}
|
||||
cfg.DefaultShardPubsubTopic = shard.DefaultShardPubsubTopic()
|
||||
//For now populating with both used shards, but this can be populated from user subscribed communities etc once community sharding is implemented
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultShardPubsubTopic())
|
||||
cfg.DefaultShardedPubsubTopics = append(cfg.DefaultShardedPubsubTopics, shard.DefaultNonProtectedPubsubTopic())
|
||||
}
|
||||
|
||||
return cfg
|
||||
|
@ -592,9 +592,10 @@ func (w *Waku) runPeerExchangeLoop() {
|
||||
}
|
||||
|
||||
func (w *Waku) GetPubsubTopic(topic string) string {
|
||||
if topic == "" || !w.cfg.UseShardAsDefaultTopic {
|
||||
if topic == "" {
|
||||
topic = w.cfg.DefaultShardPubsubTopic
|
||||
}
|
||||
|
||||
return topic
|
||||
}
|
||||
|
||||
@ -1928,7 +1929,7 @@ func (w *Waku) AddStorePeer(address string) (peer.ID, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
peerID, err := w.node.AddPeer(addr, wps.Static, []string{}, legacy_store.StoreID_v20beta4)
|
||||
peerID, err := w.node.AddPeer(addr, wps.Static, w.cfg.DefaultShardedPubsubTopics, legacy_store.StoreID_v20beta4)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
@ -1945,7 +1946,7 @@ func (w *Waku) AddRelayPeer(address string) (peer.ID, error) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
peerID, err := w.node.AddPeer(addr, wps.Static, []string{}, relay.WakuRelayID_v200)
|
||||
peerID, err := w.node.AddPeer(addr, wps.Static, w.cfg.DefaultShardedPubsubTopics, relay.WakuRelayID_v200)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
@ -29,11 +29,11 @@ import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
|
||||
"github.com/status-im/status-go/appdatabase"
|
||||
"github.com/status-im/status-go/connection"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
"github.com/status-im/status-go/wakuv2/common"
|
||||
@ -44,7 +44,6 @@ var testBootENRBootstrap = "enrtree://AMOJVZX4V6EXP7NTJPMAYJYST2QP6AJXYW76IU6VGJ
|
||||
|
||||
func setDefaultConfig(config *Config, lightMode bool) {
|
||||
config.ClusterID = 16
|
||||
config.UseShardAsDefaultTopic = true
|
||||
|
||||
if lightMode {
|
||||
config.EnablePeerExchangeClient = true
|
||||
@ -197,7 +196,7 @@ func TestBasicWakuV2(t *testing.T) {
|
||||
filter := &common.Filter{
|
||||
PubsubTopic: config.DefaultShardPubsubTopic,
|
||||
Messages: common.NewMemoryMessageStore(),
|
||||
ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}),
|
||||
ContentTopics: common.NewTopicSetFromBytes([][]byte{{1, 2, 3, 4}}),
|
||||
}
|
||||
|
||||
_, err = w.Subscribe(filter)
|
||||
@ -452,14 +451,13 @@ func TestWakuV2Filter(t *testing.T) {
|
||||
func TestWakuV2Store(t *testing.T) {
|
||||
// Configuration for the first Waku node
|
||||
config1 := &Config{
|
||||
Port: 0,
|
||||
UseShardAsDefaultTopic: true,
|
||||
ClusterID: 16,
|
||||
EnableDiscV5: false,
|
||||
DiscoveryLimit: 20,
|
||||
EnableStore: false,
|
||||
StoreCapacity: 100,
|
||||
StoreSeconds: 3600,
|
||||
Port: 0,
|
||||
ClusterID: 16,
|
||||
EnableDiscV5: false,
|
||||
DiscoveryLimit: 20,
|
||||
EnableStore: false,
|
||||
StoreCapacity: 100,
|
||||
StoreSeconds: 3600,
|
||||
}
|
||||
w1PeersCh := make(chan []string, 100) // buffered not to block on the send side
|
||||
|
||||
@ -478,14 +476,13 @@ func TestWakuV2Store(t *testing.T) {
|
||||
sql2, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
|
||||
require.NoError(t, err)
|
||||
config2 := &Config{
|
||||
Port: 0,
|
||||
UseShardAsDefaultTopic: true,
|
||||
ClusterID: 16,
|
||||
EnableDiscV5: false,
|
||||
DiscoveryLimit: 20,
|
||||
EnableStore: true,
|
||||
StoreCapacity: 100,
|
||||
StoreSeconds: 3600,
|
||||
Port: 0,
|
||||
ClusterID: 16,
|
||||
EnableDiscV5: false,
|
||||
DiscoveryLimit: 20,
|
||||
EnableStore: true,
|
||||
StoreCapacity: 100,
|
||||
StoreSeconds: 3600,
|
||||
}
|
||||
|
||||
// Start the second Waku node
|
||||
@ -622,7 +619,7 @@ func TestConfirmMessageDelivered(t *testing.T) {
|
||||
msgTimestamp := aliceNode.timestamp()
|
||||
contentTopic := maps.Keys(filter.ContentTopics)[0]
|
||||
|
||||
_, err = aliceNode.Send(relay.DefaultWakuTopic, &pb.WakuMessage{
|
||||
_, err = aliceNode.Send(shard.DefaultShardPubsubTopic(), &pb.WakuMessage{
|
||||
Payload: []byte{1, 2, 3, 4, 5},
|
||||
ContentTopic: contentTopic.ContentTopic(),
|
||||
Version: proto.Uint32(0),
|
||||
|
Loading…
x
Reference in New Issue
Block a user