Always set PubsubTopic in filters

This commit is contained in:
Vitaly Vlasov 2023-11-15 17:58:15 +02:00 committed by Vit∀ly Vlasov
parent c88ab45d76
commit 1794b93c16
30 changed files with 250 additions and 265 deletions

View File

@ -29,9 +29,9 @@ import (
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/identity/alias" "github.com/status-im/status-go/protocol/identity/alias"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
wakuextn "github.com/status-im/status-go/services/wakuext" wakuextn "github.com/status-im/status-go/services/wakuext"
) )
@ -48,8 +48,8 @@ var (
seedPhrase = flag.String("seed-phrase", "", "Seed phrase") seedPhrase = flag.String("seed-phrase", "", "Seed phrase")
version = flag.Bool("version", false, "Print version and dump configuration") version = flag.Bool("version", false, "Print version and dump configuration")
communityID = flag.String("community-id", "", "The id of the community") communityID = flag.String("community-id", "", "The id of the community")
shardCluster = flag.Int("shard-cluster", transport.UndefinedShardValue, "The shard cluster in which the of the community is published") shardCluster = flag.Int("shard-cluster", shard.UndefinedShardValue, "The shard cluster in which the of the community is published")
shardIndex = flag.Int("shard-index", transport.UndefinedShardValue, "The shard index in which the community is published") shardIndex = flag.Int("shard-index", shard.UndefinedShardValue, "The shard index in which the community is published")
chatID = flag.String("chat-id", "", "The id of the chat") chatID = flag.String("chat-id", "", "The id of the chat")
dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data") dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data")
@ -148,9 +148,9 @@ func main() {
messenger := wakuextservice.Messenger() messenger := wakuextservice.Messenger()
var shard *common.Shard = nil var s *shard.Shard = nil
if shardCluster != nil && shardIndex != nil && *shardCluster != transport.UndefinedShardValue && *shardIndex != transport.UndefinedShardValue { if shardCluster != nil && shardIndex != nil && *shardCluster != shard.UndefinedShardValue && *shardIndex != shard.UndefinedShardValue {
shard = &common.Shard{ s = &shard.Shard{
Cluster: uint16(*shardCluster), Cluster: uint16(*shardCluster),
Index: uint16(*shardIndex), Index: uint16(*shardIndex),
} }
@ -158,7 +158,7 @@ func main() {
community, err := messenger.FetchCommunity(&protocol.FetchCommunityRequest{ community, err := messenger.FetchCommunity(&protocol.FetchCommunityRequest{
CommunityKey: *communityID, CommunityKey: *communityID,
Shard: shard, Shard: s,
TryDatabase: true, TryDatabase: true,
WaitForResponse: true, WaitForResponse: true,
}) })

View File

@ -1,12 +1,13 @@
package protobuf package common
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/protocol/protobuf"
) )
func (m *ApplicationMetadataMessage) RecoverKey() (*ecdsa.PublicKey, error) { func RecoverKey(m *protobuf.ApplicationMetadataMessage) (*ecdsa.PublicKey, error) {
if m.Signature == nil { if m.Signature == nil {
return nil, nil return nil, nil
} }

View File

@ -7,7 +7,7 @@ import (
"reflect" "reflect"
"time" "time"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/server" "github.com/status-im/status-go/server"
"github.com/status-im/status-go/signal" "github.com/status-im/status-go/signal"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
@ -327,7 +327,7 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig, telemetryServe
EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5, EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5,
UDPPort: nodeConfig.WakuV2Config.UDPPort, UDPPort: nodeConfig.WakuV2Config.UDPPort,
AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate, AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate,
DefaultShardPubsubTopic: transport.DefaultShardPubsubTopic(), DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic, UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic,
TelemetryServerURL: telemetryServerURL, TelemetryServerURL: telemetryServerURL,
} }

View File

@ -464,8 +464,6 @@ func (s *MessageSender) sendPrivate(
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID) rawMessage.ID = types.EncodeHex(messageID)
rawMessage.PubsubTopic = transport.DefaultShardPubsubTopic() // TODO: determine which pubsub topic should be used for 1:1 messages
if rawMessage.BeforeDispatch != nil { if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil { if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
return nil, err return nil, err

View File

@ -1,44 +0,0 @@
package common
import (
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
)
type Shard struct {
Cluster uint16 `json:"cluster"`
Index uint16 `json:"index"`
}
func ShardFromProtobuff(p *protobuf.Shard) *Shard {
if p == nil {
return nil
}
return &Shard{
Cluster: uint16(p.Cluster),
Index: uint16(p.Index),
}
}
func (s *Shard) TransportShard() *transport.Shard {
if s == nil {
return nil
}
return &transport.Shard{
Cluster: s.Cluster,
Index: s.Index,
}
}
func (s *Shard) Protobuffer() *protobuf.Shard {
if s == nil {
return nil
}
return &protobuf.Shard{
Cluster: int32(s.Cluster),
Index: int32(s.Index),
}
}

View File

@ -0,0 +1,55 @@
package shard
import (
"github.com/status-im/status-go/protocol/protobuf"
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
)
type Shard struct {
Cluster uint16 `json:"cluster"`
Index uint16 `json:"index"`
}
func FromProtobuff(p *protobuf.Shard) *Shard {
if p == nil {
return nil
}
return &Shard{
Cluster: uint16(p.Cluster),
Index: uint16(p.Index),
}
}
func (s *Shard) Protobuffer() *protobuf.Shard {
if s == nil {
return nil
}
return &protobuf.Shard{
Cluster: int32(s.Cluster),
Index: int32(s.Index),
}
}
func (s *Shard) PubsubTopic() string {
if s != nil {
return wakuproto.NewStaticShardingPubsubTopic(s.Cluster, s.Index).String()
}
return ""
}
func DefaultNonProtectedPubsubTopic() string {
return (&Shard{
Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex,
}).PubsubTopic()
}
const MainStatusShardCluster = 16
const DefaultShardIndex = 32
const NonProtectedShardIndex = 64
const UndefinedShardValue = 0
func DefaultShardPubsubTopic() string {
return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String()
}

View File

@ -19,10 +19,10 @@ import (
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/images" "github.com/status-im/status-go/images"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
community_token "github.com/status-im/status-go/protocol/communities/token" community_token "github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/protocol/v1"
) )
@ -47,7 +47,7 @@ type Config struct {
MemberIdentity *ecdsa.PublicKey MemberIdentity *ecdsa.PublicKey
SyncedAt uint64 SyncedAt uint64
EventsData *EventsData EventsData *EventsData
Shard *common.Shard Shard *shard.Shard
PubsubTopicPrivateKey *ecdsa.PrivateKey PubsubTopicPrivateKey *ecdsa.PrivateKey
} }
@ -147,7 +147,7 @@ func (o *Community) MarshalPublicAPIJSON() ([]byte, error) {
ActiveMembersCount uint64 `json:"activeMembersCount"` ActiveMembersCount uint64 `json:"activeMembersCount"`
PubsubTopic string `json:"pubsubTopic"` PubsubTopic string `json:"pubsubTopic"`
PubsubTopicKey string `json:"pubsubTopicKey"` PubsubTopicKey string `json:"pubsubTopicKey"`
Shard *common.Shard `json:"shard"` Shard *shard.Shard `json:"shard"`
}{ }{
ID: o.ID(), ID: o.ID(),
Verified: o.config.Verified, Verified: o.config.Verified,
@ -265,7 +265,7 @@ func (o *Community) MarshalJSON() ([]byte, error) {
ActiveMembersCount uint64 `json:"activeMembersCount"` ActiveMembersCount uint64 `json:"activeMembersCount"`
PubsubTopic string `json:"pubsubTopic"` PubsubTopic string `json:"pubsubTopic"`
PubsubTopicKey string `json:"pubsubTopicKey"` PubsubTopicKey string `json:"pubsubTopicKey"`
Shard *common.Shard `json:"shard"` Shard *shard.Shard `json:"shard"`
}{ }{
ID: o.ID(), ID: o.ID(),
MemberRole: o.MemberRole(o.MemberIdentity()), MemberRole: o.MemberRole(o.MemberIdentity()),
@ -382,7 +382,7 @@ func (o *Community) DescriptionText() string {
return "" return ""
} }
func (o *Community) Shard() *common.Shard { func (o *Community) Shard() *shard.Shard {
if o != nil && o.config != nil { if o != nil && o.config != nil {
return o.config.Shard return o.config.Shard
} }
@ -1321,7 +1321,7 @@ func (o *Community) MemberUpdateChannelID() string {
} }
func (o *Community) PubsubTopic() string { func (o *Community) PubsubTopic() string {
return transport.GetPubsubTopic(o.Shard().TransportShard()) return o.Shard().PubsubTopic()
} }
func (o *Community) PubsubTopicPrivateKey() *ecdsa.PrivateKey { func (o *Community) PubsubTopicPrivateKey() *ecdsa.PrivateKey {
@ -1339,25 +1339,6 @@ func (o *Community) PubsubTopicKey() string {
return hexutil.Encode(crypto.FromECDSAPub(&o.config.PubsubTopicPrivateKey.PublicKey)) return hexutil.Encode(crypto.FromECDSAPub(&o.config.PubsubTopicPrivateKey.PublicKey))
} }
func (o *Community) DefaultFilters() []transport.FiltersToInitialize {
cID := o.IDString()
uncompressedPubKey := common.PubkeyToHex(o.config.ID)[2:]
updatesChannelID := o.StatusUpdatesChannelID()
mlChannelID := o.MagnetlinkMessageChannelID()
memberUpdateChannelID := o.MemberUpdateChannelID()
communityPubsubTopic := o.PubsubTopic()
return []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: communityPubsubTopic},
{ChatID: uncompressedPubKey, PubsubTopic: transport.DefaultNonProtectedPubsubTopic(o.Shard().TransportShard())},
{ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
}
}
func (o *Community) PrivateKey() *ecdsa.PrivateKey { func (o *Community) PrivateKey() *ecdsa.PrivateKey {
return o.config.PrivateKey return o.config.PrivateKey
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
utils "github.com/status-im/status-go/common"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
) )
@ -382,7 +383,7 @@ func validateAndGetEventsMessageCommunityDescription(signedDescription []byte, s
return nil, ErrInvalidMessage return nil, ErrInvalidMessage
} }
signer, err := metadata.RecoverKey() signer, err := utils.RecoverKey(metadata)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -25,11 +25,13 @@ import (
gethcommon "github.com/ethereum/go-ethereum/common" gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/status-im/status-go/account" "github.com/status-im/status-go/account"
utils "github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/images" "github.com/status-im/status-go/images"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
community_token "github.com/status-im/status-go/protocol/communities/token" community_token "github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/ens" "github.com/status-im/status-go/protocol/ens"
@ -634,7 +636,7 @@ func (m *Manager) All() ([]*Community, error) {
type CommunityShard struct { type CommunityShard struct {
CommunityID string `json:"communityID"` CommunityID string `json:"communityID"`
Shard *common.Shard `json:"shard"` Shard *shard.Shard `json:"shard"`
} }
type CuratedCommunities struct { type CuratedCommunities struct {
@ -1086,13 +1088,13 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error {
return m.persistence.DeleteCommunitySettings(id) return m.persistence.DeleteCommunitySettings(id)
} }
func (m *Manager) UpdateShard(community *Community, shard *common.Shard) error { func (m *Manager) UpdateShard(community *Community, shard *shard.Shard) error {
community.config.Shard = shard community.config.Shard = shard
return m.persistence.SaveCommunity(community) return m.persistence.SaveCommunity(community)
} }
// SetShard assigns a shard to a community // SetShard assigns a shard to a community
func (m *Manager) SetShard(communityID types.HexBytes, shard *common.Shard) (*Community, error) { func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Community, error) {
community, err := m.GetByID(communityID) community, err := m.GetByID(communityID)
if err != nil { if err != nil {
return nil, err return nil, err
@ -1118,7 +1120,8 @@ func (m *Manager) UpdatePubsubTopicPrivateKey(community *Community, privKey *ecd
community.SetPubsubTopicPrivateKey(privKey) community.SetPubsubTopicPrivateKey(privKey)
if privKey != nil { if privKey != nil {
if err := m.transport.StorePubsubTopicKey(community.PubsubTopic(), privKey); err != nil { topic := community.PubsubTopic()
if err := m.transport.StorePubsubTopicKey(topic, privKey); err != nil {
return err return err
} }
} }
@ -2910,7 +2913,7 @@ func UnwrapCommunityDescriptionMessage(payload []byte) (*ecdsa.PublicKey, *proto
if applicationMetadataMessage.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION { if applicationMetadataMessage.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
return nil, nil, ErrInvalidMessage return nil, nil, ErrInvalidMessage
} }
signer, err := applicationMetadataMessage.RecoverKey() signer, err := utils.RecoverKey(applicationMetadataMessage)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -3207,7 +3210,8 @@ func (m *Manager) initializeCommunity(community *Community) error {
} }
if m.transport != nil && m.transport.WakuVersion() == 2 { if m.transport != nil && m.transport.WakuVersion() == 2 {
privKey, err := m.transport.RetrievePubsubTopicKey(community.PubsubTopic()) topic := community.PubsubTopic()
privKey, err := m.transport.RetrievePubsubTopicKey(topic)
if err != nil { if err != nil {
return err return err
} }
@ -3370,19 +3374,6 @@ func (m *Manager) DeclinedPendingRequestsToJoinForCommunity(id types.HexBytes) (
} }
func (m *Manager) GetPubsubTopic(communityID string) (string, error) {
community, err := m.GetByIDString(communityID)
if err != nil {
return "", err
}
if community == nil {
return transport.DefaultShardPubsubTopic(), nil
}
return transport.GetPubsubTopic(community.Shard().TransportShard()), nil
}
func (m *Manager) RequestsToJoinForCommunityAwaitingAddresses(id types.HexBytes) ([]*RequestToJoin, error) { func (m *Manager) RequestsToJoinForCommunityAwaitingAddresses(id types.HexBytes) ([]*RequestToJoin, error) {
m.logger.Info("fetching ownership changed invitations", zap.String("community-id", id.String())) m.logger.Info("fetching ownership changed invitations", zap.String("community-id", id.String()))
return m.persistence.RequestsToJoinForCommunityAwaitingAddresses(id) return m.persistence.RequestsToJoinForCommunityAwaitingAddresses(id)

View File

@ -17,6 +17,7 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/services/wallet/bigint" "github.com/status-im/status-go/services/wallet/bigint"
@ -377,9 +378,9 @@ func (p *Persistence) unmarshalCommunityFromDB(memberIdentity *ecdsa.PublicKey,
} }
var shard *common.Shard = nil var s *shard.Shard = nil
if cluster != nil && index != nil { if cluster != nil && index != nil {
shard = &common.Shard{ s = &shard.Shard{
Cluster: uint16(*cluster), Cluster: uint16(*cluster),
Index: uint16(*index), Index: uint16(*index),
} }
@ -401,7 +402,7 @@ func (p *Persistence) unmarshalCommunityFromDB(memberIdentity *ecdsa.PublicKey,
Joined: joined, Joined: joined,
Spectated: spectated, Spectated: spectated,
EventsData: eventsData, EventsData: eventsData,
Shard: shard, Shard: s,
} }
community, err := New(config, p.timesource) community, err := New(config, p.timesource)
if err != nil { if err != nil {

View File

@ -29,10 +29,10 @@ import (
"github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/protocol/tt"
"github.com/status-im/status-go/services/communitytokens" "github.com/status-im/status-go/services/communitytokens"
walletToken "github.com/status-im/status-go/services/wallet/token" walletToken "github.com/status-im/status-go/services/wallet/token"
@ -148,7 +148,7 @@ func (c *CollectiblesServiceMock) DeploymentSignatureDigest(chainID uint64, addr
func newWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool) *waku.Waku { func newWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool) *waku.Waku {
config := &waku.Config{ config := &waku.Config{
DefaultShardPubsubTopic: transport.DefaultShardPubsubTopic(), DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(),
} }
var onPeerStats func(connStatus types.ConnStatus) var onPeerStats func(connStatus types.ConnStatus)
@ -289,7 +289,6 @@ func newCommunitiesTestMessenger(shh types.Waku, privateKey *ecdsa.PrivateKey, l
if err != nil { if err != nil {
return nil, err return nil, err
} }
options := []Option{ options := []Option{
WithCustomLogger(logger), WithCustomLogger(logger),
WithDatabase(appDb), WithDatabase(appDb),

View File

@ -8,6 +8,7 @@ import (
"github.com/status-im/status-go/api/multiformat" "github.com/status-im/status-go/api/multiformat"
"github.com/status-im/status-go/images" "github.com/status-im/status-go/images"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
) )
@ -94,7 +95,7 @@ func (u *StatusUnfurler) fillCommunityImages(community *communities.Community, i
return nil return nil
} }
func (u *StatusUnfurler) buildCommunityData(communityID string, shard *common.Shard) (*communities.Community, *common.StatusCommunityLinkPreview, error) { func (u *StatusUnfurler) buildCommunityData(communityID string, shard *shard.Shard) (*communities.Community, *common.StatusCommunityLinkPreview, error) {
// This automatically checks the database // This automatically checks the database
community, err := u.m.FetchCommunity(&FetchCommunityRequest{ community, err := u.m.FetchCommunity(&FetchCommunityRequest{
CommunityKey: communityID, CommunityKey: communityID,
@ -127,7 +128,7 @@ func (u *StatusUnfurler) buildCommunityData(communityID string, shard *common.Sh
return community, c, nil return community, c, nil
} }
func (u *StatusUnfurler) buildChannelData(channelUUID string, communityID string, communityShard *common.Shard) (*common.StatusCommunityChannelLinkPreview, error) { func (u *StatusUnfurler) buildChannelData(channelUUID string, communityID string, communityShard *shard.Shard) (*common.StatusCommunityChannelLinkPreview, error) {
community, communityData, err := u.buildCommunityData(communityID, communityShard) community, communityData, err := u.buildCommunityData(communityID, communityShard)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to build channel community data: %w", err) return nil, fmt.Errorf("failed to build channel community data: %w", err)

View File

@ -1684,7 +1684,7 @@ func (m *Messenger) Init() error {
} }
for _, org := range joinedCommunities { for _, org := range joinedCommunities {
// the org advertise on the public topic derived by the pk // the org advertise on the public topic derived by the pk
filtersToInit = append(filtersToInit, org.DefaultFilters()...) filtersToInit = append(filtersToInit, m.DefaultFilters(org)...)
// This is for status-go versions that didn't have `CommunitySettings` // This is for status-go versions that didn't have `CommunitySettings`
// We need to ensure communities that existed before community settings // We need to ensure communities that existed before community settings
@ -1730,9 +1730,8 @@ func (m *Messenger) Init() error {
if err != nil { if err != nil {
return err return err
} }
for _, org := range spectatedCommunities { for _, org := range spectatedCommunities {
filtersToInit = append(filtersToInit, org.DefaultFilters()...) filtersToInit = append(filtersToInit, m.DefaultFilters(org)...)
} }
// Init filters for the communities we control // Init filters for the communities we control
@ -1744,12 +1743,12 @@ func (m *Messenger) Init() error {
for _, c := range controlledCommunities { for _, c := range controlledCommunities {
communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{ communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{
Shard: c.Shard().TransportShard(), Shard: c.Shard(),
PrivKey: c.PrivateKey(), PrivKey: c.PrivateKey(),
}) })
} }
_, err = m.transport.InitCommunityFilters(communityFiltersToInitialize) _, err = m.InitCommunityFilters(communityFiltersToInitialize)
if err != nil { if err != nil {
return err return err
} }
@ -1781,7 +1780,7 @@ func (m *Messenger) Init() error {
switch chat.ChatType { switch chat.ChatType {
case ChatTypePublic, ChatTypeProfile: case ChatTypePublic, ChatTypeProfile:
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: transport.DefaultShardPubsubTopic()}) filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID})
case ChatTypeCommunityChat: case ChatTypeCommunityChat:
communityID, err := hexutil.Decode(chat.CommunityID) communityID, err := hexutil.Decode(chat.CommunityID)
if err != nil { if err != nil {
@ -1797,7 +1796,7 @@ func (m *Messenger) Init() error {
communityInfo[chat.CommunityID] = community communityInfo[chat.CommunityID] = community
} }
filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: transport.GetPubsubTopic(community.Shard().TransportShard())}) filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()})
case ChatTypeOneToOne: case ChatTypeOneToOne:
pk, err := chat.PublicKey() pk, err := chat.PublicKey()
if err != nil { if err != nil {
@ -2154,10 +2153,12 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
return rawMessage, err return rawMessage, err
} }
case ChatTypeCommunityChat: case ChatTypeCommunityChat:
rawMessage.PubsubTopic, err = m.communitiesManager.GetPubsubTopic(chat.CommunityID)
community, err := m.communitiesManager.GetByIDString(chat.CommunityID)
if err != nil { if err != nil {
return rawMessage, err return rawMessage, err
} }
rawMessage.PubsubTopic = community.PubsubTopic()
// TODO: add grant // TODO: add grant
canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), nil) canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), nil)

View File

@ -26,6 +26,7 @@ import (
"github.com/status-im/status-go/images" "github.com/status-im/status-go/images"
"github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/discord" "github.com/status-im/status-go/protocol/discord"
@ -61,7 +62,7 @@ const (
type FetchCommunityRequest struct { type FetchCommunityRequest struct {
// CommunityKey should be either a public or a private community key // CommunityKey should be either a public or a private community key
CommunityKey string `json:"communityKey"` CommunityKey string `json:"communityKey"`
Shard *common.Shard `json:"shard"` Shard *shard.Shard `json:"shard"`
TryDatabase bool `json:"tryDatabase"` TryDatabase bool `json:"tryDatabase"`
WaitForResponse bool `json:"waitForResponse"` WaitForResponse bool `json:"waitForResponse"`
} }
@ -95,6 +96,7 @@ func (m *Messenger) publishOrg(org *communities.Community) error {
m.logger.Debug("publishing org", zap.String("org-id", org.IDString()), zap.Any("org", org)) m.logger.Debug("publishing org", zap.String("org-id", org.IDString()), zap.Any("org", org))
payload, err := org.MarshaledDescription() payload, err := org.MarshaledDescription()
if err != nil { if err != nil {
return err return err
} }
@ -556,8 +558,7 @@ func (m *Messenger) SpectatedCommunities() ([]*communities.Community, error) {
func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Chat, error) { func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Chat, error) {
logger := m.logger.Named("initCommunityChats") logger := m.logger.Named("initCommunityChats")
publicFiltersToInit := m.DefaultFilters(community)
publicFiltersToInit := community.DefaultFilters()
chats := CreateCommunityChats(community, m.getTimesource()) chats := CreateCommunityChats(community, m.getTimesource())
@ -575,8 +576,9 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha
if community.IsControlNode() { if community.IsControlNode() {
// Init the community filter so we can receive messages on the community // Init the community filter so we can receive messages on the community
communityFilters, err := m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
Shard: community.Shard().TransportShard(), communityFilters, err := m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
Shard: community.Shard(),
PrivKey: community.PrivateKey(), PrivKey: community.PrivateKey(),
}}) }})
@ -651,13 +653,13 @@ func (m *Messenger) JoinCommunity(ctx context.Context, communityID types.HexByte
return mr, nil return mr, nil
} }
func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *common.Shard) error { func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.Shard) error {
if m.transport.WakuVersion() != 2 { if m.transport.WakuVersion() != 2 {
return nil return nil
} }
// TODO: this should probably be moved completely to transport once pubsub topic logic is implemented // TODO: this should probably be moved completely to transport once pubsub topic logic is implemented
pubsubTopic := transport.GetPubsubTopic(shard.TransportShard()) pubsubTopic := shard.PubsubTopic()
privK, err := m.transport.RetrievePubsubTopicKey(pubsubTopic) privK, err := m.transport.RetrievePubsubTopicKey(pubsubTopic)
if err != nil { if err != nil {
@ -1042,7 +1044,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
CommunityID: community.ID(), CommunityID: community.ID(),
SkipEncryptionLayer: true, SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()), PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
} }
_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage) _, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)
@ -1361,7 +1363,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r
CommunityID: community.ID(), CommunityID: community.ID(),
SkipEncryptionLayer: true, SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN,
PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()), PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
} }
_, err = m.sender.SendCommunityMessage(context.Background(), rawMessage) _, err = m.sender.SendCommunityMessage(context.Background(), rawMessage)
@ -1468,7 +1470,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ
Sender: community.PrivateKey(), Sender: community.PrivateKey(),
SkipEncryptionLayer: true, SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE,
PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()), PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
} }
_, err = m.sender.SendPrivate(context.Background(), pk, rawMessage) _, err = m.sender.SendPrivate(context.Background(), pk, rawMessage)
@ -1881,6 +1883,42 @@ func (m *Messenger) DeleteCommunityChat(communityID types.HexBytes, chatID strin
return response, nil return response, nil
} }
func (m *Messenger) useShards() bool {
nodeConfig, err := m.settings.GetNodeConfig()
if err != nil {
return false
}
return nodeConfig.WakuV2Config.UseShardAsDefaultTopic
}
func (m *Messenger) InitCommunityFilters(communityFiltersToInitialize []transport.CommunityFilterToInitialize) ([]*transport.Filter, error) {
return m.transport.InitCommunityFilters(communityFiltersToInitialize, m.useShards())
}
func (m *Messenger) DefaultFilters(o *communities.Community) []transport.FiltersToInitialize {
cID := o.IDString()
uncompressedPubKey := common.PubkeyToHex(o.PublicKey())[2:]
updatesChannelID := o.StatusUpdatesChannelID()
mlChannelID := o.MagnetlinkMessageChannelID()
memberUpdateChannelID := o.MemberUpdateChannelID()
communityPubsubTopic := o.PubsubTopic()
filters := []transport.FiltersToInitialize{
{ChatID: cID, PubsubTopic: communityPubsubTopic},
{ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic},
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
}
if m.useShards() {
filters = append(filters, transport.FiltersToInitialize{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()})
}
return filters
}
func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDefaultChannel bool) (*MessengerResponse, error) { func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDefaultChannel bool) (*MessengerResponse, error) {
if err := request.Validate(); err != nil { if err := request.Validate(); err != nil {
return nil, err return nil, err
@ -1907,8 +1945,8 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef
} }
// Init the community filter so we can receive messages on the community // Init the community filter so we can receive messages on the community
_, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ _, err = m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
Shard: community.Shard().TransportShard(), Shard: community.Shard(),
PrivKey: community.PrivateKey(), PrivKey: community.PrivateKey(),
}}) }})
if err != nil { if err != nil {
@ -1916,7 +1954,7 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef
} }
// Init the default community filters // Init the default community filters
_, err = m.transport.InitPublicFilters(community.DefaultFilters()) _, err = m.transport.InitPublicFilters(m.DefaultFilters(community))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1998,9 +2036,10 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes
} }
func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error { func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error {
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(community.DefaultFilters())+len(community.Chats())) defaultFilters := m.DefaultFilters(community)
publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats()))
publicFiltersToInit = append(publicFiltersToInit, community.DefaultFilters()...) publicFiltersToInit = append(publicFiltersToInit, defaultFilters...)
for chatID := range community.Chats() { for chatID := range community.Chats() {
communityChatID := community.IDString() + chatID communityChatID := community.IDString() + chatID
@ -2017,8 +2056,8 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err
} }
// Init the community filter so we can receive messages on the community // Init the community filter so we can receive messages on the community
_, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ _, err = m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
Shard: community.Shard().TransportShard(), Shard: community.Shard(),
PrivKey: community.PrivateKey(), PrivKey: community.PrivateKey(),
}}) }})
if err != nil { if err != nil {
@ -2209,7 +2248,7 @@ func (m *Messenger) ImportCommunity(ctx context.Context, key *ecdsa.PrivateKey)
} }
// Load filters // Load filters
_, err = m.transport.InitPublicFilters(community.DefaultFilters()) _, err = m.transport.InitPublicFilters(m.DefaultFilters(community))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -2521,7 +2560,7 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities
// requestCommunityInfoFromMailserver installs filter for community and requests its details // requestCommunityInfoFromMailserver installs filter for community and requests its details
// from mailserver. When response received it will be passed through signals handler // from mailserver. When response received it will be passed through signals handler
func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *common.Shard, waitForResponse bool) (*communities.Community, error) { func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *shard.Shard, waitForResponse bool) (*communities.Community, error) {
m.logger.Info("requesting community info", zap.String("communityID", communityID), zap.Any("shard", shard)) m.logger.Info("requesting community info", zap.String("communityID", communityID), zap.Any("shard", shard))
@ -2538,7 +2577,7 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard
if filter == nil { if filter == nil {
filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{ filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{
ChatID: communityID, ChatID: communityID,
PubsubTopic: transport.GetPubsubTopic(shard.TransportShard()), PubsubTopic: shard.PubsubTopic(),
}}) }})
if err != nil { if err != nil {
return nil, fmt.Errorf("Can't install filter for community: %v", err) return nil, fmt.Errorf("Can't install filter for community: %v", err)
@ -2628,7 +2667,7 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C
if filter == nil { if filter == nil {
filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{ filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{
ChatID: c.CommunityID, ChatID: c.CommunityID,
PubsubTopic: transport.GetPubsubTopic(c.Shard.TransportShard()), PubsubTopic: c.Shard.PubsubTopic(),
}}) }})
if err != nil { if err != nil {
m.logger.Error("Can't install filter for community", zap.Error(err)) m.logger.Error("Can't install filter for community", zap.Error(err))
@ -2950,7 +2989,7 @@ func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message
return errors.New("signer can't be nil") return errors.New("signer can't be nil")
} }
err = m.handleCommunityShardAndFiltersFromProto(community, common.ShardFromProtobuff(message.Shard), message.PrivateKey) err = m.handleCommunityShardAndFiltersFromProto(community, shard.FromProtobuff(message.Shard), message.PrivateKey)
if err != nil { if err != nil {
return err return err
} }
@ -2960,7 +2999,7 @@ func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message
return nil return nil
} }
func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communities.Community, shard *common.Shard, privateKeyBytes []byte) error { func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communities.Community, shard *shard.Shard, privateKeyBytes []byte) error {
err := m.communitiesManager.UpdateShard(community, shard) err := m.communitiesManager.UpdateShard(community, shard)
if err != nil { if err != nil {
return err return err

View File

@ -1778,8 +1778,8 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
} }
// Init the community filter so we can receive messages on the community // Init the community filter so we can receive messages on the community
_, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ _, err = m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{
Shard: discordCommunity.Shard().TransportShard(), Shard: discordCommunity.Shard(),
PrivKey: discordCommunity.PrivateKey(), PrivKey: discordCommunity.PrivateKey(),
}}) }})
if err != nil { if err != nil {
@ -1799,7 +1799,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
return return
} }
_, err = m.transport.InitPublicFilters(discordCommunity.DefaultFilters()) _, err = m.transport.InitPublicFilters(m.DefaultFilters(discordCommunity))
if err != nil { if err != nil {
m.cleanUpImport(communityID) m.cleanUpImport(communityID)
importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error())) importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error()))

View File

@ -478,7 +478,7 @@ func (m *Messenger) addContact(ctx context.Context, pubKey, ensName, nickname, d
if !deprecation.ChatProfileDeprecated { if !deprecation.ChatProfileDeprecated {
response.AddChat(profileChat) response.AddChat(profileChat)
_, err = m.transport.InitFilters([]transport.FiltersToInitialize{{ChatID: profileChat.ID, PubsubTopic: transport.DefaultShardPubsubTopic()}}, []*ecdsa.PublicKey{publicKey}) _, err = m.transport.InitFilters([]transport.FiltersToInitialize{{ChatID: profileChat.ID}}, []*ecdsa.PublicKey{publicKey})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -24,6 +24,7 @@ import (
multiaccountscommon "github.com/status-im/status-go/multiaccounts/common" multiaccountscommon "github.com/status-im/status-go/multiaccounts/common"
"github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/encryption/multidevice" "github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/identity" "github.com/status-im/status-go/protocol/identity"
@ -1608,7 +1609,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
return err return err
} }
err = m.handleCommunityShardAndFiltersFromProto(community, common.ShardFromProtobuff(requestToJoinResponseProto.Shard), requestToJoinResponseProto.ProtectedTopicPrivateKey) err = m.handleCommunityShardAndFiltersFromProto(community, shard.FromProtobuff(requestToJoinResponseProto.Shard), requestToJoinResponseProto.ProtectedTopicPrivateKey)
if err != nil { if err != nil {
return err return err
} }

View File

@ -255,7 +255,7 @@ func (m *Messenger) syncBackup() error {
to := m.calculateMailserverTo() to := m.calculateMailserverTo()
from := uint32(m.getTimesource().GetCurrentTime()/1000) - oneMonthInSeconds from := uint32(m.getTimesource().GetCurrentTime()/1000) - oneMonthInSeconds
batch := MailserverBatch{From: from, To: to, PubsubTopic: transport.DefaultShardPubsubTopic(), Topics: []types.TopicType{filter.ContentTopic}} batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}}
err := m.processMailserverBatch(batch) err := m.processMailserverBatch(batch)
if err != nil { if err != nil {
return err return err

View File

@ -11,6 +11,7 @@ import (
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/requests"
@ -45,7 +46,7 @@ type URLDataResponse struct {
Community *CommunityURLData `json:"community"` Community *CommunityURLData `json:"community"`
Channel *CommunityChannelURLData `json:"channel"` Channel *CommunityChannelURLData `json:"channel"`
Contact *ContactURLData `json:"contact"` Contact *ContactURLData `json:"contact"`
Shard *common.Shard `json:"shard,omitempty"` Shard *shard.Shard `json:"shard,omitempty"`
} }
const baseShareURL = "https://status.app" const baseShareURL = "https://status.app"
@ -197,7 +198,7 @@ func parseCommunityURLWithData(data string, chatKey string) (*URLDataResponse, e
TagIndices: communityProto.TagIndices, TagIndices: communityProto.TagIndices,
CommunityID: types.EncodeHex(communityID), CommunityID: types.EncodeHex(communityID),
}, },
Shard: common.ShardFromProtobuff(urlDataProto.Shard), Shard: shard.FromProtobuff(urlDataProto.Shard),
}, nil }, nil
} }
@ -366,7 +367,7 @@ func parseCommunityChannelURLWithData(data string, chatKey string) (*URLDataResp
Color: channelProto.Color, Color: channelProto.Color,
ChannelUUID: channelProto.Uuid, ChannelUUID: channelProto.Uuid,
}, },
Shard: common.ShardFromProtobuff(urlDataProto.Shard), Shard: shard.FromProtobuff(urlDataProto.Shard),
}, nil }, nil
} }

View File

@ -4,13 +4,12 @@ import (
"errors" "errors"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/transport"
) )
type SetCommunityShard struct { type SetCommunityShard struct {
CommunityID types.HexBytes `json:"communityId"` CommunityID types.HexBytes `json:"communityId"`
Shard *common.Shard `json:"shard,omitempty"` Shard *shard.Shard `json:"shard,omitempty"`
PrivateKey *types.HexBytes `json:"privateKey,omitempty"` PrivateKey *types.HexBytes `json:"privateKey,omitempty"`
} }
@ -20,7 +19,7 @@ func (s *SetCommunityShard) Validate() error {
} }
if s.Shard != nil { if s.Shard != nil {
// TODO: for now only MainStatusShard(16) is accepted // TODO: for now only MainStatusShard(16) is accepted
if s.Shard.Cluster != transport.MainStatusShardCluster { if s.Shard.Cluster != shard.MainStatusShardCluster {
return errors.New("invalid shard cluster") return errors.New("invalid shard cluster")
} }
if s.Shard.Index > 1023 { if s.Shard.Index > 1023 {

View File

@ -11,17 +11,13 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common/shard"
) )
const ( const (
minPow = 0.0 minPow = 0.0
) )
type Shard struct {
Cluster uint16
Index uint16
}
type RawFilter struct { type RawFilter struct {
FilterID string FilterID string
Topic types.TopicType Topic types.TopicType
@ -145,11 +141,11 @@ func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitia
} }
type CommunityFilterToInitialize struct { type CommunityFilterToInitialize struct {
Shard *Shard Shard *shard.Shard
PrivKey *ecdsa.PrivateKey PrivKey *ecdsa.PrivateKey
} }
func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) { func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize, useShards bool) ([]*Filter, error) {
var filters []*Filter var filters []*Filter
f.mutex.Lock() f.mutex.Lock()
defer f.mutex.Unlock() defer f.mutex.Unlock()
@ -159,10 +155,15 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com
continue continue
} }
communityPubsubTopic := GetPubsubTopic(cf.Shard) topics := make([]string, 0)
topics := []string{communityPubsubTopic} if useShards {
if communityPubsubTopic != DefaultShardPubsubTopic() { topics = append(topics, shard.DefaultShardPubsubTopic())
topics = append(topics, DefaultShardPubsubTopic()) if cf.Shard.PubsubTopic() != "" {
topics = append(topics, cf.Shard.PubsubTopic())
}
} else {
topics = append(topics, "") // empty PubsubTopic means default pubsub topic,
// to be overridden with proper value in Waku layer
} }
// TODO: requests to join / cancels are currently being sent into the default waku topic. // TODO: requests to join / cancels are currently being sent into the default waku topic.
@ -386,11 +387,9 @@ func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds
return f.filters[chatID], nil return f.filters[chatID], nil
} }
pubsubTopic := DefaultShardPubsubTopic()
// We set up a filter so we can publish, // We set up a filter so we can publish,
// but we discard envelopes if listen is false. // but we discard envelopes if listen is false.
filter, err := f.addAsymmetric(chatID, pubsubTopic, identity, listen) filter, err := f.addAsymmetric(chatID, "", identity, listen)
if err != nil { if err != nil {
f.logger.Debug("could not register personal topic filter", zap.Error(err)) f.logger.Debug("could not register personal topic filter", zap.Error(err))
return nil, err return nil, err
@ -400,7 +399,6 @@ func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds
ChatID: chatID, ChatID: chatID,
FilterID: filter.FilterID, FilterID: filter.FilterID,
ContentTopic: filter.Topic, ContentTopic: filter.Topic,
PubsubTopic: pubsubTopic,
Identity: PublicKeyToStr(publicKey), Identity: PublicKeyToStr(publicKey),
Listen: listen, Listen: listen,
OneToOne: true, OneToOne: true,
@ -427,11 +425,9 @@ func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e
return f.filters[chatID], nil return f.filters[chatID], nil
} }
pubsubTopic := DefaultShardPubsubTopic()
// We set up a filter so we can publish, // We set up a filter so we can publish,
// but we discard envelopes if listen is false. // but we discard envelopes if listen is false.
filter, err := f.addAsymmetric(chatID, pubsubTopic, identity, listen) filter, err := f.addAsymmetric(chatID, "", identity, listen)
if err != nil { if err != nil {
f.logger.Debug("could not register partitioned topic", zap.String("chatID", chatID), zap.Error(err)) f.logger.Debug("could not register partitioned topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err return nil, err
@ -441,7 +437,6 @@ func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e
ChatID: chatID, ChatID: chatID,
FilterID: filter.FilterID, FilterID: filter.FilterID,
ContentTopic: filter.Topic, ContentTopic: filter.Topic,
PubsubTopic: pubsubTopic,
Identity: PublicKeyToStr(publicKey), Identity: PublicKeyToStr(publicKey),
Listen: listen, Listen: listen,
Ephemeral: ephemeral, Ephemeral: ephemeral,
@ -466,9 +461,8 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
return f.filters[chatID], nil return f.filters[chatID], nil
} }
pubsubTopic := DefaultShardPubsubTopic()
keyString := hex.EncodeToString(secret.Key) keyString := hex.EncodeToString(secret.Key)
filter, err := f.addSymmetric(keyString, pubsubTopic) filter, err := f.addSymmetric(keyString, "")
if err != nil { if err != nil {
f.logger.Debug("could not register negotiated topic", zap.Error(err)) f.logger.Debug("could not register negotiated topic", zap.Error(err))
return nil, err return nil, err
@ -477,7 +471,6 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter,
chat := &Filter{ chat := &Filter{
ChatID: chatID, ChatID: chatID,
ContentTopic: filter.Topic, ContentTopic: filter.Topic,
PubsubTopic: pubsubTopic,
SymKeyID: filter.SymKeyID, SymKeyID: filter.SymKeyID,
FilterID: filter.FilterID, FilterID: filter.FilterID,
Identity: PublicKeyToStr(secret.PublicKey), Identity: PublicKeyToStr(secret.PublicKey),
@ -520,7 +513,6 @@ func (f *FiltersManager) LoadDiscovery() ([]*Filter, error) {
personalDiscoveryChat := &Filter{ personalDiscoveryChat := &Filter{
ChatID: personalDiscoveryTopic, ChatID: personalDiscoveryTopic,
Identity: identityStr, Identity: identityStr,
PubsubTopic: DefaultShardPubsubTopic(),
Discovery: true, Discovery: true,
Listen: true, Listen: true,
OneToOne: true, OneToOne: true,
@ -591,9 +583,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
return f.filters[chatID], nil return f.filters[chatID], nil
} }
pubsubTopic := DefaultShardPubsubTopic() contactCodeFilter, err := f.addSymmetric(chatID, "")
contactCodeFilter, err := f.addSymmetric(chatID, pubsubTopic)
if err != nil { if err != nil {
f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err)) f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err))
return nil, err return nil, err
@ -605,7 +595,6 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro
ContentTopic: contactCodeFilter.Topic, ContentTopic: contactCodeFilter.Topic,
SymKeyID: contactCodeFilter.SymKeyID, SymKeyID: contactCodeFilter.SymKeyID,
Identity: PublicKeyToStr(pubKey), Identity: PublicKeyToStr(pubKey),
PubsubTopic: pubsubTopic,
Listen: true, Listen: true,
} }

View File

@ -15,8 +15,6 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/connection" "github.com/status-im/status-go/connection"
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
@ -166,8 +164,8 @@ func (t *Transport) LoadFilters(filters []*Filter) ([]*Filter, error) {
return t.filters.InitWithFilters(filters) return t.filters.InitWithFilters(filters)
} }
func (t *Transport) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) { func (t *Transport) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize, useShards bool) ([]*Filter, error) {
return t.filters.InitCommunityFilters(communityFiltersToInitialize) return t.filters.InitCommunityFilters(communityFiltersToInitialize, useShards)
} }
func (t *Transport) RemoveFilters(filters []*Filter) error { func (t *Transport) RemoveFilters(filters []*Filter) error {
@ -191,7 +189,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil
} }
func (t *Transport) JoinPublic(chatID string) (*Filter, error) { func (t *Transport) JoinPublic(chatID string) (*Filter, error) {
return t.filters.LoadPublic(chatID, DefaultShardPubsubTopic()) return t.filters.LoadPublic(chatID, "")
} }
func (t *Transport) LeavePublic(chatID string) error { func (t *Transport) LeavePublic(chatID string) error {
@ -288,6 +286,7 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage
newMessage.SymKeyID = filter.SymKeyID newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
return t.api.Post(ctx, *newMessage) return t.api.Post(ctx, *newMessage)
} }
@ -307,6 +306,7 @@ func (t *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage
newMessage.SymKeyID = filter.SymKeyID newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.ContentTopic newMessage.Topic = filter.ContentTopic
newMessage.PubsubTopic = filter.PubsubTopic
newMessage.PublicKey = nil newMessage.PublicKey = nil
return t.api.Post(ctx, *newMessage) return t.api.Post(ctx, *newMessage)
@ -322,6 +322,7 @@ func (t *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *
return nil, err return nil, err
} }
newMessage.PubsubTopic = filter.PubsubTopic
newMessage.Topic = filter.ContentTopic newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey) newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
@ -338,6 +339,7 @@ func (t *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage *
return nil, err return nil, err
} }
newMessage.PubsubTopic = filter.PubsubTopic
newMessage.Topic = filter.ContentTopic newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey) newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
@ -363,6 +365,7 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
return nil, err return nil, err
} }
newMessage.PubsubTopic = filter.PubsubTopic
newMessage.Topic = filter.ContentTopic newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey) newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
@ -676,31 +679,3 @@ func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey)
func (t *Transport) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { func (t *Transport) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
return t.waku.RetrievePubsubTopicKey(topic) return t.waku.RetrievePubsubTopicKey(topic)
} }
func GetPubsubTopic(shard *Shard) string {
if shard != nil {
return protocol.NewStaticShardingPubsubTopic(shard.Cluster, shard.Index).String()
}
return DefaultShardPubsubTopic()
}
func DefaultNonProtectedPubsubTopic(shard *Shard) string {
if shard != nil {
return GetPubsubTopic(&Shard{
Cluster: MainStatusShardCluster,
Index: NonProtectedShardIndex,
})
}
return DefaultShardPubsubTopic()
}
const MainStatusShardCluster = 16
const DefaultShardIndex = 32
const NonProtectedShardIndex = 64
const UndefinedShardValue = 0
func DefaultShardPubsubTopic() string {
return protocol.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String()
}

View File

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
utils "github.com/status-im/status-go/common"
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/encryption"
@ -152,12 +153,13 @@ func (m *StatusMessage) HandleEncryptionLayer(myKey *ecdsa.PrivateKey, senderKey
} }
func (m *StatusMessage) HandleApplicationLayer() error { func (m *StatusMessage) HandleApplicationLayer() error {
message, err := protobuf.Unmarshal(m.EncryptionLayer.Payload) message, err := protobuf.Unmarshal(m.EncryptionLayer.Payload)
if err != nil { if err != nil {
return err return err
} }
recoveredKey, err := message.RecoverKey() recoveredKey, err := utils.RecoverKey(message)
if err != nil { if err != nil {
return err return err
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/status-im/status-go/images" "github.com/status-im/status-go/images"
"github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/requests"
@ -106,7 +107,7 @@ type ChannelGroup struct {
CheckChannelPermissionResponses map[string]*communities.CheckChannelPermissionsResponse `json:"checkChannelPermissionResponses"` CheckChannelPermissionResponses map[string]*communities.CheckChannelPermissionsResponse `json:"checkChannelPermissionResponses"`
PubsubTopic string `json:"pubsubTopic"` PubsubTopic string `json:"pubsubTopic"`
PubsubTopicKey string `json:"pubsubTopicKey"` PubsubTopicKey string `json:"pubsubTopicKey"`
Shard *common.Shard `json:"shard"` Shard *shard.Shard `json:"shard"`
} }
func NewAPI(service *Service) *API { func NewAPI(service *Service) *API {

View File

@ -26,6 +26,7 @@ import (
"github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/discord" "github.com/status-im/status-go/protocol/discord"
@ -1228,7 +1229,7 @@ func (api *PublicAPI) RequestCommunityInfoFromMailserver(communityID string) (*c
// Deprecated: RequestCommunityInfoFromMailserverWithShard is deprecated in favor of // Deprecated: RequestCommunityInfoFromMailserverWithShard is deprecated in favor of
// configurable FetchCommunity. // configurable FetchCommunity.
func (api *PublicAPI) RequestCommunityInfoFromMailserverWithShard(communityID string, shard *common.Shard) (*communities.Community, error) { func (api *PublicAPI) RequestCommunityInfoFromMailserverWithShard(communityID string, shard *shard.Shard) (*communities.Community, error) {
request := &protocol.FetchCommunityRequest{ request := &protocol.FetchCommunityRequest{
CommunityKey: communityID, CommunityKey: communityID,
Shard: shard, Shard: shard,
@ -1253,7 +1254,7 @@ func (api *PublicAPI) RequestCommunityInfoFromMailserverAsync(communityID string
// Deprecated: RequestCommunityInfoFromMailserverAsyncWithShard is deprecated in favor of // Deprecated: RequestCommunityInfoFromMailserverAsyncWithShard is deprecated in favor of
// configurable FetchCommunity. // configurable FetchCommunity.
func (api *PublicAPI) RequestCommunityInfoFromMailserverAsyncWithShard(communityID string, shard *common.Shard) error { func (api *PublicAPI) RequestCommunityInfoFromMailserverAsyncWithShard(communityID string, shard *shard.Shard) error {
request := &protocol.FetchCommunityRequest{ request := &protocol.FetchCommunityRequest{
CommunityKey: communityID, CommunityKey: communityID,
Shard: shard, Shard: shard,

View File

@ -39,6 +39,7 @@ import (
"github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/anonmetrics" "github.com/status-im/status-go/protocol/anonmetrics"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/protobuf"
@ -673,7 +674,7 @@ func (s *Service) fetchCommunity(communityID string) (*communities.Community, er
// TODO: we need the shard information in the collectible to be able to retrieve info for // TODO: we need the shard information in the collectible to be able to retrieve info for
// communities that have specific shards // communities that have specific shards
var shard *common.Shard = nil // TODO: build this with info from token var shard *shard.Shard = nil // TODO: build this with info from token
community, err := s.messenger.FetchCommunity(&protocol.FetchCommunityRequest{ community, err := s.messenger.FetchCommunity(&protocol.FetchCommunityRequest{
CommunityKey: communityID, CommunityKey: communityID,
Shard: shard, Shard: shard,

View File

@ -8,6 +8,7 @@ import (
"github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/t/helpers"
) )
@ -58,9 +59,9 @@ func TestTopic(t *testing.T) {
defer close() defer close()
topicA := "0x61000000" topicA := "0x61000000"
topicD := "0x64000000" topicD := "0x64000000"
topic1 := MailserverTopic{PubsubTopic: transport.DefaultShardPubsubTopic(), ContentTopic: topicA, LastRequest: 1} topic1 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: topicA, LastRequest: 1}
topic2 := MailserverTopic{PubsubTopic: transport.DefaultShardPubsubTopic(), ContentTopic: "0x6200000", LastRequest: 2} topic2 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: "0x6200000", LastRequest: 2}
topic3 := MailserverTopic{PubsubTopic: transport.DefaultShardPubsubTopic(), ContentTopic: "0x6300000", LastRequest: 3} topic3 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: "0x6300000", LastRequest: 3}
require.NoError(t, db.AddTopic(topic1)) require.NoError(t, db.AddTopic(topic1))
require.NoError(t, db.AddTopic(topic2)) require.NoError(t, db.AddTopic(topic2))
@ -73,14 +74,14 @@ func TestTopic(t *testing.T) {
filters := []*transport.Filter{ filters := []*transport.Filter{
// Existing topic, is not updated // Existing topic, is not updated
{ {
PubsubTopic: transport.DefaultShardPubsubTopic(), PubsubTopic: shard.DefaultShardPubsubTopic(),
ContentTopic: types.BytesToTopic([]byte{0x61}), ContentTopic: types.BytesToTopic([]byte{0x61}),
}, },
// Non existing topic is not inserted // Non existing topic is not inserted
{ {
Discovery: true, Discovery: true,
Negotiated: true, Negotiated: true,
PubsubTopic: transport.DefaultShardPubsubTopic(), PubsubTopic: shard.DefaultShardPubsubTopic(),
ContentTopic: types.BytesToTopic([]byte{0x64}), ContentTopic: types.BytesToTopic([]byte{0x64}),
}, },
} }
@ -156,7 +157,7 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) {
defer close() defer close()
api := &API{db: db} api := &API{db: db}
testTopic := MailserverTopic{ testTopic := MailserverTopic{
PubsubTopic: transport.DefaultShardPubsubTopic(), PubsubTopic: shard.DefaultShardPubsubTopic(),
ContentTopic: "topic-001", ContentTopic: "topic-001",
ChatIDs: []string{"chatID01", "chatID02"}, ChatIDs: []string{"chatID01", "chatID02"},
LastRequest: 10, LastRequest: 10,
@ -169,14 +170,14 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, []MailserverTopic{testTopic}, topics) require.EqualValues(t, []MailserverTopic{testTopic}, topics)
err = api.DeleteMailserverTopic(context.Background(), transport.DefaultShardPubsubTopic(), testTopic.ContentTopic) err = api.DeleteMailserverTopic(context.Background(), shard.DefaultShardPubsubTopic(), testTopic.ContentTopic)
require.NoError(t, err) require.NoError(t, err)
topics, err = api.GetMailserverTopics(context.Background()) topics, err = api.GetMailserverTopics(context.Background())
require.NoError(t, err) require.NoError(t, err)
require.EqualValues(t, ([]MailserverTopic)(nil), topics) require.EqualValues(t, ([]MailserverTopic)(nil), topics)
// Delete non-existing topic. // Delete non-existing topic.
err = api.DeleteMailserverTopic(context.Background(), transport.DefaultShardPubsubTopic(), "non-existing-topic") err = api.DeleteMailserverTopic(context.Background(), shard.DefaultShardPubsubTopic(), "non-existing-topic")
require.NoError(t, err) require.NoError(t, err)
} }

View File

@ -10,7 +10,7 @@ import (
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common/shard"
) )
// Make sure that Service implements node.Lifecycle interface. // Make sure that Service implements node.Lifecycle interface.
@ -70,7 +70,7 @@ type PublicAPI struct {
service *Service service *Service
} }
func (p *PublicAPI) CommunityInfo(communityID types.HexBytes, shard *common.Shard) (json.RawMessage, error) { func (p *PublicAPI) CommunityInfo(communityID types.HexBytes, shard *shard.Shard) (json.RawMessage, error) {
if p.service.messenger == nil { if p.service.messenger == nil {
return nil, ErrNotInitialized return nil, ErrNotInitialized
} }

View File

@ -616,12 +616,19 @@ func (w *Waku) runPeerExchangeLoop() {
} }
} }
func (w *Waku) getPubsubTopic(topic string) string {
if topic == "" || !w.cfg.UseShardAsDefaultTopic {
topic = w.settings.DefaultPubsubTopic
}
return topic
}
func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error { func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error {
if w.settings.LightClient { if w.settings.LightClient {
return errors.New("only available for full nodes") return errors.New("only available for full nodes")
} }
topic = getPubsubTopic(topic, w.cfg.UseShardAsDefaultTopic, w.settings.DefaultPubsubTopic) topic = w.getPubsubTopic(topic)
if w.node.Relay().IsSubscribed(topic) { if w.node.Relay().IsSubscribed(topic) {
return nil return nil
@ -949,10 +956,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) {
// Subscribe installs a new message handler used for filtering, decrypting // Subscribe installs a new message handler used for filtering, decrypting
// and subsequent storing of incoming messages. // and subsequent storing of incoming messages.
func (w *Waku) Subscribe(f *common.Filter) (string, error) { func (w *Waku) Subscribe(f *common.Filter) (string, error) {
if f.PubsubTopic == "" { f.PubsubTopic = w.getPubsubTopic(f.PubsubTopic)
f.PubsubTopic = w.settings.DefaultPubsubTopic
}
id, err := w.filters.Install(f) id, err := w.filters.Install(f)
if err != nil { if err != nil {
return id, err return id, err
@ -1005,20 +1009,11 @@ func (w *Waku) UnsubscribeMany(ids []string) error {
return nil return nil
} }
func getPubsubTopic(pubsubTopic string, useShardAsDefaultTopic bool, defaultShardPubsubTopic string) string {
// Override the pubsub topic used, in case the default shard is being used
// and the configuration indicates we need to use the default waku topic from relay
if !useShardAsDefaultTopic && pubsubTopic == defaultShardPubsubTopic {
pubsubTopic = relay.DefaultWakuTopic
}
return pubsubTopic
}
func (w *Waku) broadcast() { func (w *Waku) broadcast() {
for { for {
select { select {
case envelope := <-w.sendQueue: case envelope := <-w.sendQueue:
pubsubTopic := getPubsubTopic(envelope.PubsubTopic(), w.cfg.UseShardAsDefaultTopic, w.cfg.DefaultShardPubsubTopic) pubsubTopic := envelope.PubsubTopic()
var err error var err error
logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().Timestamp)) logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().Timestamp))
if w.settings.LightClient { if w.settings.LightClient {
@ -1055,10 +1050,7 @@ func (w *Waku) broadcast() {
// Send injects a message into the waku send queue, to be distributed in the // Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles. // network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
if pubsubTopic == "" { pubsubTopic = w.getPubsubTopic(pubsubTopic)
pubsubTopic = w.settings.DefaultPubsubTopic
}
if w.protectedTopicStore != nil { if w.protectedTopicStore != nil {
privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic)
if err != nil { if err != nil {
@ -1323,12 +1315,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag
return nil return nil
} }
// Override the message pubsub topci in case the configuration indicates we shouldn't
// use the default shard but the default waku topic from relay instead
if !w.cfg.UseShardAsDefaultTopic && recvMessage.PubsubTopic == relay.DefaultWakuTopic {
recvMessage.PubsubTopic = w.cfg.DefaultShardPubsubTopic
}
if w.statusTelemetryClient != nil { if w.statusTelemetryClient != nil {
w.statusTelemetryClient.PushReceivedEnvelope(envelope) w.statusTelemetryClient.PushReceivedEnvelope(envelope)
} }
@ -1483,6 +1469,8 @@ func (w *Waku) ListenAddresses() []string {
} }
func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error { func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error {
topic = w.getPubsubTopic(topic)
if !w.settings.LightClient { if !w.settings.LightClient {
err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey) err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey)
if err != nil { if err != nil {
@ -1493,6 +1481,7 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err
} }
func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
topic = w.getPubsubTopic(topic)
if w.protectedTopicStore == nil { if w.protectedTopicStore == nil {
return nil, nil return nil, nil
} }
@ -1501,6 +1490,7 @@ func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) {
} }
func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error {
topic = w.getPubsubTopic(topic)
if w.protectedTopicStore == nil { if w.protectedTopicStore == nil {
return nil return nil
} }

View File

@ -234,7 +234,7 @@ func TestWakuV2Filter(t *testing.T) {
msgTimestamp := w.timestamp() msgTimestamp := w.timestamp()
contentTopic := maps.Keys(filter.ContentTopics)[0] contentTopic := maps.Keys(filter.ContentTopics)[0]
_, err = w.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ _, err = w.Send("", &pb.WakuMessage{
Payload: []byte{1, 2, 3, 4, 5}, Payload: []byte{1, 2, 3, 4, 5},
ContentTopic: contentTopic.ContentTopic(), ContentTopic: contentTopic.ContentTopic(),
Version: 0, Version: 0,
@ -242,7 +242,7 @@ func TestWakuV2Filter(t *testing.T) {
}) })
require.NoError(t, err) require.NoError(t, err)
time.Sleep(5 * time.Second) time.Sleep(15 * time.Second)
// Ensure there is at least 1 active filter subscription // Ensure there is at least 1 active filter subscription
subscriptions := w.node.FilterLightnode().Subscriptions() subscriptions := w.node.FilterLightnode().Subscriptions()