From 1794b93c16b1f68cdb7125b4b66cc8926da9eed2 Mon Sep 17 00:00:00 2001 From: Vitaly Vlasov Date: Wed, 15 Nov 2023 17:58:15 +0200 Subject: [PATCH] Always set PubsubTopic in filters --- cmd/ping-community/main.go | 14 +-- .../protobuf/pubkey.go => common/utils.go | 5 +- node/status_node_services.go | 4 +- protocol/common/message_sender.go | 2 - protocol/common/shard.go | 44 --------- protocol/common/shard/shard.go | 55 +++++++++++ protocol/communities/community.go | 31 ++----- protocol/communities/community_event.go | 3 +- protocol/communities/manager.go | 31 +++---- protocol/communities/persistence.go | 7 +- .../communities_messenger_helpers_test.go | 5 +- protocol/linkpreview_unfurler_status.go | 5 +- protocol/messenger.go | 17 ++-- protocol/messenger_communities.go | 91 +++++++++++++------ .../messenger_communities_import_discord.go | 6 +- protocol/messenger_contacts.go | 2 +- protocol/messenger_handler.go | 3 +- protocol/messenger_mailserver.go | 2 +- protocol/messenger_share_urls.go | 7 +- protocol/requests/set_community_shard.go | 7 +- protocol/transport/filters_manager.go | 53 +++++------ protocol/transport/transport.go | 41 ++------- protocol/v1/status_message.go | 4 +- services/chat/api.go | 3 +- services/ext/api.go | 5 +- services/ext/service.go | 3 +- services/mailservers/api_test.go | 17 ++-- services/status/service.go | 4 +- wakuv2/waku.go | 40 +++----- wakuv2/waku_test.go | 4 +- 30 files changed, 250 insertions(+), 265 deletions(-) rename protocol/protobuf/pubkey.go => common/utils.go (65%) delete mode 100644 protocol/common/shard.go create mode 100644 protocol/common/shard/shard.go diff --git a/cmd/ping-community/main.go b/cmd/ping-community/main.go index d57ab7fbf..6072933cc 100644 --- a/cmd/ping-community/main.go +++ b/cmd/ping-community/main.go @@ -29,9 +29,9 @@ import ( "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/identity/alias" "github.com/status-im/status-go/protocol/protobuf" - "github.com/status-im/status-go/protocol/transport" wakuextn "github.com/status-im/status-go/services/wakuext" ) @@ -48,8 +48,8 @@ var ( seedPhrase = flag.String("seed-phrase", "", "Seed phrase") version = flag.Bool("version", false, "Print version and dump configuration") communityID = flag.String("community-id", "", "The id of the community") - shardCluster = flag.Int("shard-cluster", transport.UndefinedShardValue, "The shard cluster in which the of the community is published") - shardIndex = flag.Int("shard-index", transport.UndefinedShardValue, "The shard index in which the community is published") + shardCluster = flag.Int("shard-cluster", shard.UndefinedShardValue, "The shard cluster in which the of the community is published") + shardIndex = flag.Int("shard-index", shard.UndefinedShardValue, "The shard index in which the community is published") chatID = flag.String("chat-id", "", "The id of the chat") dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data") @@ -148,9 +148,9 @@ func main() { messenger := wakuextservice.Messenger() - var shard *common.Shard = nil - if shardCluster != nil && shardIndex != nil && *shardCluster != transport.UndefinedShardValue && *shardIndex != transport.UndefinedShardValue { - shard = &common.Shard{ + var s *shard.Shard = nil + if shardCluster != nil && shardIndex != nil && *shardCluster != shard.UndefinedShardValue && *shardIndex != shard.UndefinedShardValue { + s = &shard.Shard{ Cluster: uint16(*shardCluster), Index: uint16(*shardIndex), } @@ -158,7 +158,7 @@ func main() { community, err := messenger.FetchCommunity(&protocol.FetchCommunityRequest{ CommunityKey: *communityID, - Shard: shard, + Shard: s, TryDatabase: true, WaitForResponse: true, }) diff --git a/protocol/protobuf/pubkey.go b/common/utils.go similarity index 65% rename from protocol/protobuf/pubkey.go rename to common/utils.go index 77f01624a..43b4bdd95 100644 --- a/protocol/protobuf/pubkey.go +++ b/common/utils.go @@ -1,12 +1,13 @@ -package protobuf +package common import ( "crypto/ecdsa" "github.com/status-im/status-go/eth-node/crypto" + "github.com/status-im/status-go/protocol/protobuf" ) -func (m *ApplicationMetadataMessage) RecoverKey() (*ecdsa.PublicKey, error) { +func RecoverKey(m *protobuf.ApplicationMetadataMessage) (*ecdsa.PublicKey, error) { if m.Signature == nil { return nil, nil } diff --git a/node/status_node_services.go b/node/status_node_services.go index c1809d520..573495601 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -7,7 +7,7 @@ import ( "reflect" "time" - "github.com/status-im/status-go/protocol/transport" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/server" "github.com/status-im/status-go/signal" "github.com/status-im/status-go/transactions" @@ -327,7 +327,7 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig, telemetryServe EnableDiscV5: nodeConfig.WakuV2Config.EnableDiscV5, UDPPort: nodeConfig.WakuV2Config.UDPPort, AutoUpdate: nodeConfig.WakuV2Config.AutoUpdate, - DefaultShardPubsubTopic: transport.DefaultShardPubsubTopic(), + DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(), UseShardAsDefaultTopic: nodeConfig.WakuV2Config.UseShardAsDefaultTopic, TelemetryServerURL: telemetryServerURL, } diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 2827e6d3a..33c6c86dd 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -464,8 +464,6 @@ func (s *MessageSender) sendPrivate( messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) rawMessage.ID = types.EncodeHex(messageID) - rawMessage.PubsubTopic = transport.DefaultShardPubsubTopic() // TODO: determine which pubsub topic should be used for 1:1 messages - if rawMessage.BeforeDispatch != nil { if err := rawMessage.BeforeDispatch(rawMessage); err != nil { return nil, err diff --git a/protocol/common/shard.go b/protocol/common/shard.go deleted file mode 100644 index d4c75f2ab..000000000 --- a/protocol/common/shard.go +++ /dev/null @@ -1,44 +0,0 @@ -package common - -import ( - "github.com/status-im/status-go/protocol/protobuf" - "github.com/status-im/status-go/protocol/transport" -) - -type Shard struct { - Cluster uint16 `json:"cluster"` - Index uint16 `json:"index"` -} - -func ShardFromProtobuff(p *protobuf.Shard) *Shard { - if p == nil { - return nil - } - - return &Shard{ - Cluster: uint16(p.Cluster), - Index: uint16(p.Index), - } -} - -func (s *Shard) TransportShard() *transport.Shard { - if s == nil { - return nil - } - - return &transport.Shard{ - Cluster: s.Cluster, - Index: s.Index, - } -} - -func (s *Shard) Protobuffer() *protobuf.Shard { - if s == nil { - return nil - } - - return &protobuf.Shard{ - Cluster: int32(s.Cluster), - Index: int32(s.Index), - } -} diff --git a/protocol/common/shard/shard.go b/protocol/common/shard/shard.go new file mode 100644 index 000000000..c1354aa0e --- /dev/null +++ b/protocol/common/shard/shard.go @@ -0,0 +1,55 @@ +package shard + +import ( + "github.com/status-im/status-go/protocol/protobuf" + wakuproto "github.com/waku-org/go-waku/waku/v2/protocol" +) + +type Shard struct { + Cluster uint16 `json:"cluster"` + Index uint16 `json:"index"` +} + +func FromProtobuff(p *protobuf.Shard) *Shard { + if p == nil { + return nil + } + + return &Shard{ + Cluster: uint16(p.Cluster), + Index: uint16(p.Index), + } +} + +func (s *Shard) Protobuffer() *protobuf.Shard { + if s == nil { + return nil + } + + return &protobuf.Shard{ + Cluster: int32(s.Cluster), + Index: int32(s.Index), + } +} +func (s *Shard) PubsubTopic() string { + if s != nil { + return wakuproto.NewStaticShardingPubsubTopic(s.Cluster, s.Index).String() + } + return "" +} + +func DefaultNonProtectedPubsubTopic() string { + return (&Shard{ + Cluster: MainStatusShardCluster, + Index: NonProtectedShardIndex, + }).PubsubTopic() +} + +const MainStatusShardCluster = 16 +const DefaultShardIndex = 32 +const NonProtectedShardIndex = 64 +const UndefinedShardValue = 0 + +func DefaultShardPubsubTopic() string { + return wakuproto.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String() +} diff --git a/protocol/communities/community.go b/protocol/communities/community.go index a91fdc865..3a455757d 100644 --- a/protocol/communities/community.go +++ b/protocol/communities/community.go @@ -19,10 +19,10 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/images" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" community_token "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" - "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/v1" ) @@ -47,7 +47,7 @@ type Config struct { MemberIdentity *ecdsa.PublicKey SyncedAt uint64 EventsData *EventsData - Shard *common.Shard + Shard *shard.Shard PubsubTopicPrivateKey *ecdsa.PrivateKey } @@ -147,7 +147,7 @@ func (o *Community) MarshalPublicAPIJSON() ([]byte, error) { ActiveMembersCount uint64 `json:"activeMembersCount"` PubsubTopic string `json:"pubsubTopic"` PubsubTopicKey string `json:"pubsubTopicKey"` - Shard *common.Shard `json:"shard"` + Shard *shard.Shard `json:"shard"` }{ ID: o.ID(), Verified: o.config.Verified, @@ -265,7 +265,7 @@ func (o *Community) MarshalJSON() ([]byte, error) { ActiveMembersCount uint64 `json:"activeMembersCount"` PubsubTopic string `json:"pubsubTopic"` PubsubTopicKey string `json:"pubsubTopicKey"` - Shard *common.Shard `json:"shard"` + Shard *shard.Shard `json:"shard"` }{ ID: o.ID(), MemberRole: o.MemberRole(o.MemberIdentity()), @@ -382,7 +382,7 @@ func (o *Community) DescriptionText() string { return "" } -func (o *Community) Shard() *common.Shard { +func (o *Community) Shard() *shard.Shard { if o != nil && o.config != nil { return o.config.Shard } @@ -1321,7 +1321,7 @@ func (o *Community) MemberUpdateChannelID() string { } func (o *Community) PubsubTopic() string { - return transport.GetPubsubTopic(o.Shard().TransportShard()) + return o.Shard().PubsubTopic() } func (o *Community) PubsubTopicPrivateKey() *ecdsa.PrivateKey { @@ -1339,25 +1339,6 @@ func (o *Community) PubsubTopicKey() string { return hexutil.Encode(crypto.FromECDSAPub(&o.config.PubsubTopicPrivateKey.PublicKey)) } -func (o *Community) DefaultFilters() []transport.FiltersToInitialize { - cID := o.IDString() - uncompressedPubKey := common.PubkeyToHex(o.config.ID)[2:] - updatesChannelID := o.StatusUpdatesChannelID() - mlChannelID := o.MagnetlinkMessageChannelID() - memberUpdateChannelID := o.MemberUpdateChannelID() - - communityPubsubTopic := o.PubsubTopic() - - return []transport.FiltersToInitialize{ - {ChatID: cID, PubsubTopic: communityPubsubTopic}, - {ChatID: uncompressedPubKey, PubsubTopic: transport.DefaultNonProtectedPubsubTopic(o.Shard().TransportShard())}, - {ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic}, - {ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic}, - {ChatID: mlChannelID, PubsubTopic: communityPubsubTopic}, - {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic}, - } -} - func (o *Community) PrivateKey() *ecdsa.PrivateKey { return o.config.PrivateKey } diff --git a/protocol/communities/community_event.go b/protocol/communities/community_event.go index 801c9e4f4..afac18aa9 100644 --- a/protocol/communities/community_event.go +++ b/protocol/communities/community_event.go @@ -7,6 +7,7 @@ import ( "github.com/golang/protobuf/proto" + utils "github.com/status-im/status-go/common" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/protobuf" ) @@ -382,7 +383,7 @@ func validateAndGetEventsMessageCommunityDescription(signedDescription []byte, s return nil, ErrInvalidMessage } - signer, err := metadata.RecoverKey() + signer, err := utils.RecoverKey(metadata) if err != nil { return nil, err } diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index 79c7c38d3..248930381 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -25,11 +25,13 @@ import ( gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/status-im/status-go/account" + utils "github.com/status-im/status-go/common" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/images" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" community_token "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/ens" @@ -633,8 +635,8 @@ func (m *Manager) All() ([]*Community, error) { } type CommunityShard struct { - CommunityID string `json:"communityID"` - Shard *common.Shard `json:"shard"` + CommunityID string `json:"communityID"` + Shard *shard.Shard `json:"shard"` } type CuratedCommunities struct { @@ -1086,13 +1088,13 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error { return m.persistence.DeleteCommunitySettings(id) } -func (m *Manager) UpdateShard(community *Community, shard *common.Shard) error { +func (m *Manager) UpdateShard(community *Community, shard *shard.Shard) error { community.config.Shard = shard return m.persistence.SaveCommunity(community) } // SetShard assigns a shard to a community -func (m *Manager) SetShard(communityID types.HexBytes, shard *common.Shard) (*Community, error) { +func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Community, error) { community, err := m.GetByID(communityID) if err != nil { return nil, err @@ -1118,7 +1120,8 @@ func (m *Manager) UpdatePubsubTopicPrivateKey(community *Community, privKey *ecd community.SetPubsubTopicPrivateKey(privKey) if privKey != nil { - if err := m.transport.StorePubsubTopicKey(community.PubsubTopic(), privKey); err != nil { + topic := community.PubsubTopic() + if err := m.transport.StorePubsubTopicKey(topic, privKey); err != nil { return err } } @@ -2910,7 +2913,7 @@ func UnwrapCommunityDescriptionMessage(payload []byte) (*ecdsa.PublicKey, *proto if applicationMetadataMessage.Type != protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION { return nil, nil, ErrInvalidMessage } - signer, err := applicationMetadataMessage.RecoverKey() + signer, err := utils.RecoverKey(applicationMetadataMessage) if err != nil { return nil, nil, err } @@ -3207,7 +3210,8 @@ func (m *Manager) initializeCommunity(community *Community) error { } if m.transport != nil && m.transport.WakuVersion() == 2 { - privKey, err := m.transport.RetrievePubsubTopicKey(community.PubsubTopic()) + topic := community.PubsubTopic() + privKey, err := m.transport.RetrievePubsubTopicKey(topic) if err != nil { return err } @@ -3370,19 +3374,6 @@ func (m *Manager) DeclinedPendingRequestsToJoinForCommunity(id types.HexBytes) ( } -func (m *Manager) GetPubsubTopic(communityID string) (string, error) { - community, err := m.GetByIDString(communityID) - if err != nil { - return "", err - } - - if community == nil { - return transport.DefaultShardPubsubTopic(), nil - } - - return transport.GetPubsubTopic(community.Shard().TransportShard()), nil -} - func (m *Manager) RequestsToJoinForCommunityAwaitingAddresses(id types.HexBytes) ([]*RequestToJoin, error) { m.logger.Info("fetching ownership changed invitations", zap.String("community-id", id.String())) return m.persistence.RequestsToJoinForCommunityAwaitingAddresses(id) diff --git a/protocol/communities/persistence.go b/protocol/communities/persistence.go index 0e2a6bb1e..a1b4246cf 100644 --- a/protocol/communities/persistence.go +++ b/protocol/communities/persistence.go @@ -17,6 +17,7 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/services/wallet/bigint" @@ -377,9 +378,9 @@ func (p *Persistence) unmarshalCommunityFromDB(memberIdentity *ecdsa.PublicKey, } - var shard *common.Shard = nil + var s *shard.Shard = nil if cluster != nil && index != nil { - shard = &common.Shard{ + s = &shard.Shard{ Cluster: uint16(*cluster), Index: uint16(*index), } @@ -401,7 +402,7 @@ func (p *Persistence) unmarshalCommunityFromDB(memberIdentity *ecdsa.PublicKey, Joined: joined, Spectated: spectated, EventsData: eventsData, - Shard: shard, + Shard: s, } community, err := New(config, p.timesource) if err != nil { diff --git a/protocol/communities_messenger_helpers_test.go b/protocol/communities_messenger_helpers_test.go index 57fec1911..d3735a85b 100644 --- a/protocol/communities_messenger_helpers_test.go +++ b/protocol/communities_messenger_helpers_test.go @@ -29,10 +29,10 @@ import ( "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" - "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/services/communitytokens" walletToken "github.com/status-im/status-go/services/wallet/token" @@ -148,7 +148,7 @@ func (c *CollectiblesServiceMock) DeploymentSignatureDigest(chainID uint64, addr func newWakuV2(s *suite.Suite, logger *zap.Logger, useLocalWaku bool) *waku.Waku { config := &waku.Config{ - DefaultShardPubsubTopic: transport.DefaultShardPubsubTopic(), + DefaultShardPubsubTopic: shard.DefaultShardPubsubTopic(), } var onPeerStats func(connStatus types.ConnStatus) @@ -289,7 +289,6 @@ func newCommunitiesTestMessenger(shh types.Waku, privateKey *ecdsa.PrivateKey, l if err != nil { return nil, err } - options := []Option{ WithCustomLogger(logger), WithDatabase(appDb), diff --git a/protocol/linkpreview_unfurler_status.go b/protocol/linkpreview_unfurler_status.go index 0db19f88f..29752f245 100644 --- a/protocol/linkpreview_unfurler_status.go +++ b/protocol/linkpreview_unfurler_status.go @@ -8,6 +8,7 @@ import ( "github.com/status-im/status-go/api/multiformat" "github.com/status-im/status-go/images" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" ) @@ -94,7 +95,7 @@ func (u *StatusUnfurler) fillCommunityImages(community *communities.Community, i return nil } -func (u *StatusUnfurler) buildCommunityData(communityID string, shard *common.Shard) (*communities.Community, *common.StatusCommunityLinkPreview, error) { +func (u *StatusUnfurler) buildCommunityData(communityID string, shard *shard.Shard) (*communities.Community, *common.StatusCommunityLinkPreview, error) { // This automatically checks the database community, err := u.m.FetchCommunity(&FetchCommunityRequest{ CommunityKey: communityID, @@ -127,7 +128,7 @@ func (u *StatusUnfurler) buildCommunityData(communityID string, shard *common.Sh return community, c, nil } -func (u *StatusUnfurler) buildChannelData(channelUUID string, communityID string, communityShard *common.Shard) (*common.StatusCommunityChannelLinkPreview, error) { +func (u *StatusUnfurler) buildChannelData(channelUUID string, communityID string, communityShard *shard.Shard) (*common.StatusCommunityChannelLinkPreview, error) { community, communityData, err := u.buildCommunityData(communityID, communityShard) if err != nil { return nil, fmt.Errorf("failed to build channel community data: %w", err) diff --git a/protocol/messenger.go b/protocol/messenger.go index d7be6c008..2ac7d77f6 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -1684,7 +1684,7 @@ func (m *Messenger) Init() error { } for _, org := range joinedCommunities { // the org advertise on the public topic derived by the pk - filtersToInit = append(filtersToInit, org.DefaultFilters()...) + filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) // This is for status-go versions that didn't have `CommunitySettings` // We need to ensure communities that existed before community settings @@ -1730,9 +1730,8 @@ func (m *Messenger) Init() error { if err != nil { return err } - for _, org := range spectatedCommunities { - filtersToInit = append(filtersToInit, org.DefaultFilters()...) + filtersToInit = append(filtersToInit, m.DefaultFilters(org)...) } // Init filters for the communities we control @@ -1744,12 +1743,12 @@ func (m *Messenger) Init() error { for _, c := range controlledCommunities { communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{ - Shard: c.Shard().TransportShard(), + Shard: c.Shard(), PrivKey: c.PrivateKey(), }) } - _, err = m.transport.InitCommunityFilters(communityFiltersToInitialize) + _, err = m.InitCommunityFilters(communityFiltersToInitialize) if err != nil { return err } @@ -1781,7 +1780,7 @@ func (m *Messenger) Init() error { switch chat.ChatType { case ChatTypePublic, ChatTypeProfile: - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: transport.DefaultShardPubsubTopic()}) + filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID}) case ChatTypeCommunityChat: communityID, err := hexutil.Decode(chat.CommunityID) if err != nil { @@ -1797,7 +1796,7 @@ func (m *Messenger) Init() error { communityInfo[chat.CommunityID] = community } - filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: transport.GetPubsubTopic(community.Shard().TransportShard())}) + filtersToInit = append(filtersToInit, transport.FiltersToInitialize{ChatID: chat.ID, PubsubTopic: community.PubsubTopic()}) case ChatTypeOneToOne: pk, err := chat.PublicKey() if err != nil { @@ -2154,10 +2153,12 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe return rawMessage, err } case ChatTypeCommunityChat: - rawMessage.PubsubTopic, err = m.communitiesManager.GetPubsubTopic(chat.CommunityID) + + community, err := m.communitiesManager.GetByIDString(chat.CommunityID) if err != nil { return rawMessage, err } + rawMessage.PubsubTopic = community.PubsubTopic() // TODO: add grant canPost, err := m.communitiesManager.CanPost(&m.identity.PublicKey, chat.CommunityID, chat.CommunityChatID(), nil) diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 85719044e..ef68abfdb 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -26,6 +26,7 @@ import ( "github.com/status-im/status-go/images" "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/discord" @@ -60,10 +61,10 @@ const ( type FetchCommunityRequest struct { // CommunityKey should be either a public or a private community key - CommunityKey string `json:"communityKey"` - Shard *common.Shard `json:"shard"` - TryDatabase bool `json:"tryDatabase"` - WaitForResponse bool `json:"waitForResponse"` + CommunityKey string `json:"communityKey"` + Shard *shard.Shard `json:"shard"` + TryDatabase bool `json:"tryDatabase"` + WaitForResponse bool `json:"waitForResponse"` } func (r *FetchCommunityRequest) Validate() error { @@ -95,6 +96,7 @@ func (m *Messenger) publishOrg(org *communities.Community) error { m.logger.Debug("publishing org", zap.String("org-id", org.IDString()), zap.Any("org", org)) payload, err := org.MarshaledDescription() + if err != nil { return err } @@ -556,8 +558,7 @@ func (m *Messenger) SpectatedCommunities() ([]*communities.Community, error) { func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Chat, error) { logger := m.logger.Named("initCommunityChats") - - publicFiltersToInit := community.DefaultFilters() + publicFiltersToInit := m.DefaultFilters(community) chats := CreateCommunityChats(community, m.getTimesource()) @@ -575,8 +576,9 @@ func (m *Messenger) initCommunityChats(community *communities.Community) ([]*Cha if community.IsControlNode() { // Init the community filter so we can receive messages on the community - communityFilters, err := m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ - Shard: community.Shard().TransportShard(), + + communityFilters, err := m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ + Shard: community.Shard(), PrivKey: community.PrivateKey(), }}) @@ -651,13 +653,13 @@ func (m *Messenger) JoinCommunity(ctx context.Context, communityID types.HexByte return mr, nil } -func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *common.Shard) error { +func (m *Messenger) subscribeToCommunityShard(communityID []byte, shard *shard.Shard) error { if m.transport.WakuVersion() != 2 { return nil } // TODO: this should probably be moved completely to transport once pubsub topic logic is implemented - pubsubTopic := transport.GetPubsubTopic(shard.TransportShard()) + pubsubTopic := shard.PubsubTopic() privK, err := m.transport.RetrievePubsubTopicKey(pubsubTopic) if err != nil { @@ -1042,7 +1044,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun CommunityID: community.ID(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN, - PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()), + PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), } _, err = m.sender.SendCommunityMessage(context.Background(), rawMessage) @@ -1361,7 +1363,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r CommunityID: community.ID(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN, - PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()), + PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), } _, err = m.sender.SendCommunityMessage(context.Background(), rawMessage) @@ -1468,7 +1470,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ Sender: community.PrivateKey(), SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN_RESPONSE, - PubsubTopic: transport.DefaultNonProtectedPubsubTopic(community.Shard().TransportShard()), + PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), } _, err = m.sender.SendPrivate(context.Background(), pk, rawMessage) @@ -1881,6 +1883,42 @@ func (m *Messenger) DeleteCommunityChat(communityID types.HexBytes, chatID strin return response, nil } +func (m *Messenger) useShards() bool { + nodeConfig, err := m.settings.GetNodeConfig() + if err != nil { + return false + } + return nodeConfig.WakuV2Config.UseShardAsDefaultTopic +} + +func (m *Messenger) InitCommunityFilters(communityFiltersToInitialize []transport.CommunityFilterToInitialize) ([]*transport.Filter, error) { + return m.transport.InitCommunityFilters(communityFiltersToInitialize, m.useShards()) +} + +func (m *Messenger) DefaultFilters(o *communities.Community) []transport.FiltersToInitialize { + cID := o.IDString() + uncompressedPubKey := common.PubkeyToHex(o.PublicKey())[2:] + updatesChannelID := o.StatusUpdatesChannelID() + mlChannelID := o.MagnetlinkMessageChannelID() + memberUpdateChannelID := o.MemberUpdateChannelID() + + communityPubsubTopic := o.PubsubTopic() + + filters := []transport.FiltersToInitialize{ + {ChatID: cID, PubsubTopic: communityPubsubTopic}, + {ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic}, + {ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic}, + {ChatID: mlChannelID, PubsubTopic: communityPubsubTopic}, + {ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic}, + } + + if m.useShards() { + filters = append(filters, transport.FiltersToInitialize{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()}) + } + + return filters +} + func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDefaultChannel bool) (*MessengerResponse, error) { if err := request.Validate(); err != nil { return nil, err @@ -1907,8 +1945,8 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef } // Init the community filter so we can receive messages on the community - _, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ - Shard: community.Shard().TransportShard(), + _, err = m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ + Shard: community.Shard(), PrivKey: community.PrivateKey(), }}) if err != nil { @@ -1916,7 +1954,7 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef } // Init the default community filters - _, err = m.transport.InitPublicFilters(community.DefaultFilters()) + _, err = m.transport.InitPublicFilters(m.DefaultFilters(community)) if err != nil { return nil, err } @@ -1998,9 +2036,10 @@ func (m *Messenger) SetCommunityShard(request *requests.SetCommunityShard) (*Mes } func (m *Messenger) UpdateCommunityFilters(community *communities.Community) error { - publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(community.DefaultFilters())+len(community.Chats())) + defaultFilters := m.DefaultFilters(community) + publicFiltersToInit := make([]transport.FiltersToInitialize, 0, len(defaultFilters)+len(community.Chats())) - publicFiltersToInit = append(publicFiltersToInit, community.DefaultFilters()...) + publicFiltersToInit = append(publicFiltersToInit, defaultFilters...) for chatID := range community.Chats() { communityChatID := community.IDString() + chatID @@ -2017,8 +2056,8 @@ func (m *Messenger) UpdateCommunityFilters(community *communities.Community) err } // Init the community filter so we can receive messages on the community - _, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ - Shard: community.Shard().TransportShard(), + _, err = m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ + Shard: community.Shard(), PrivKey: community.PrivateKey(), }}) if err != nil { @@ -2209,7 +2248,7 @@ func (m *Messenger) ImportCommunity(ctx context.Context, key *ecdsa.PrivateKey) } // Load filters - _, err = m.transport.InitPublicFilters(community.DefaultFilters()) + _, err = m.transport.InitPublicFilters(m.DefaultFilters(community)) if err != nil { return nil, err } @@ -2521,7 +2560,7 @@ func (m *Messenger) FetchCommunity(request *FetchCommunityRequest) (*communities // requestCommunityInfoFromMailserver installs filter for community and requests its details // from mailserver. When response received it will be passed through signals handler -func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *common.Shard, waitForResponse bool) (*communities.Community, error) { +func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard *shard.Shard, waitForResponse bool) (*communities.Community, error) { m.logger.Info("requesting community info", zap.String("communityID", communityID), zap.Any("shard", shard)) @@ -2538,7 +2577,7 @@ func (m *Messenger) requestCommunityInfoFromMailserver(communityID string, shard if filter == nil { filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{ ChatID: communityID, - PubsubTopic: transport.GetPubsubTopic(shard.TransportShard()), + PubsubTopic: shard.PubsubTopic(), }}) if err != nil { return nil, fmt.Errorf("Can't install filter for community: %v", err) @@ -2628,7 +2667,7 @@ func (m *Messenger) requestCommunitiesFromMailserver(communities []communities.C if filter == nil { filters, err := m.transport.InitPublicFilters([]transport.FiltersToInitialize{{ ChatID: c.CommunityID, - PubsubTopic: transport.GetPubsubTopic(c.Shard.TransportShard()), + PubsubTopic: c.Shard.PubsubTopic(), }}) if err != nil { m.logger.Error("Can't install filter for community", zap.Error(err)) @@ -2950,7 +2989,7 @@ func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message return errors.New("signer can't be nil") } - err = m.handleCommunityShardAndFiltersFromProto(community, common.ShardFromProtobuff(message.Shard), message.PrivateKey) + err = m.handleCommunityShardAndFiltersFromProto(community, shard.FromProtobuff(message.Shard), message.PrivateKey) if err != nil { return err } @@ -2960,7 +2999,7 @@ func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message return nil } -func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communities.Community, shard *common.Shard, privateKeyBytes []byte) error { +func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communities.Community, shard *shard.Shard, privateKeyBytes []byte) error { err := m.communitiesManager.UpdateShard(community, shard) if err != nil { return err diff --git a/protocol/messenger_communities_import_discord.go b/protocol/messenger_communities_import_discord.go index 100fcb0f0..c1e295d82 100644 --- a/protocol/messenger_communities_import_discord.go +++ b/protocol/messenger_communities_import_discord.go @@ -1778,8 +1778,8 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor } // Init the community filter so we can receive messages on the community - _, err = m.transport.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ - Shard: discordCommunity.Shard().TransportShard(), + _, err = m.InitCommunityFilters([]transport.CommunityFilterToInitialize{{ + Shard: discordCommunity.Shard(), PrivKey: discordCommunity.PrivateKey(), }}) if err != nil { @@ -1799,7 +1799,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor return } - _, err = m.transport.InitPublicFilters(discordCommunity.DefaultFilters()) + _, err = m.transport.InitPublicFilters(m.DefaultFilters(discordCommunity)) if err != nil { m.cleanUpImport(communityID) importProgress.AddTaskError(discord.InitCommunityTask, discord.Error(err.Error())) diff --git a/protocol/messenger_contacts.go b/protocol/messenger_contacts.go index 68f8d64b0..f5e6d714c 100644 --- a/protocol/messenger_contacts.go +++ b/protocol/messenger_contacts.go @@ -478,7 +478,7 @@ func (m *Messenger) addContact(ctx context.Context, pubKey, ensName, nickname, d if !deprecation.ChatProfileDeprecated { response.AddChat(profileChat) - _, err = m.transport.InitFilters([]transport.FiltersToInitialize{{ChatID: profileChat.ID, PubsubTopic: transport.DefaultShardPubsubTopic()}}, []*ecdsa.PublicKey{publicKey}) + _, err = m.transport.InitFilters([]transport.FiltersToInitialize{{ChatID: profileChat.ID}}, []*ecdsa.PublicKey{publicKey}) if err != nil { return nil, err } diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index ab00ffeea..713e1b60b 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -24,6 +24,7 @@ import ( multiaccountscommon "github.com/status-im/status-go/multiaccounts/common" "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/encryption/multidevice" "github.com/status-im/status-go/protocol/identity" @@ -1608,7 +1609,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS return err } - err = m.handleCommunityShardAndFiltersFromProto(community, common.ShardFromProtobuff(requestToJoinResponseProto.Shard), requestToJoinResponseProto.ProtectedTopicPrivateKey) + err = m.handleCommunityShardAndFiltersFromProto(community, shard.FromProtobuff(requestToJoinResponseProto.Shard), requestToJoinResponseProto.ProtectedTopicPrivateKey) if err != nil { return err } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index ae2efc076..00cc74f33 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -255,7 +255,7 @@ func (m *Messenger) syncBackup() error { to := m.calculateMailserverTo() from := uint32(m.getTimesource().GetCurrentTime()/1000) - oneMonthInSeconds - batch := MailserverBatch{From: from, To: to, PubsubTopic: transport.DefaultShardPubsubTopic(), Topics: []types.TopicType{filter.ContentTopic}} + batch := MailserverBatch{From: from, To: to, Topics: []types.TopicType{filter.ContentTopic}} err := m.processMailserverBatch(batch) if err != nil { return err diff --git a/protocol/messenger_share_urls.go b/protocol/messenger_share_urls.go index 880fe1b12..d36a55936 100644 --- a/protocol/messenger_share_urls.go +++ b/protocol/messenger_share_urls.go @@ -11,6 +11,7 @@ import ( "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" @@ -45,7 +46,7 @@ type URLDataResponse struct { Community *CommunityURLData `json:"community"` Channel *CommunityChannelURLData `json:"channel"` Contact *ContactURLData `json:"contact"` - Shard *common.Shard `json:"shard,omitempty"` + Shard *shard.Shard `json:"shard,omitempty"` } const baseShareURL = "https://status.app" @@ -197,7 +198,7 @@ func parseCommunityURLWithData(data string, chatKey string) (*URLDataResponse, e TagIndices: communityProto.TagIndices, CommunityID: types.EncodeHex(communityID), }, - Shard: common.ShardFromProtobuff(urlDataProto.Shard), + Shard: shard.FromProtobuff(urlDataProto.Shard), }, nil } @@ -366,7 +367,7 @@ func parseCommunityChannelURLWithData(data string, chatKey string) (*URLDataResp Color: channelProto.Color, ChannelUUID: channelProto.Uuid, }, - Shard: common.ShardFromProtobuff(urlDataProto.Shard), + Shard: shard.FromProtobuff(urlDataProto.Shard), }, nil } diff --git a/protocol/requests/set_community_shard.go b/protocol/requests/set_community_shard.go index 0798cc6b5..0b1240b56 100644 --- a/protocol/requests/set_community_shard.go +++ b/protocol/requests/set_community_shard.go @@ -4,13 +4,12 @@ import ( "errors" "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/protocol/common" - "github.com/status-im/status-go/protocol/transport" + "github.com/status-im/status-go/protocol/common/shard" ) type SetCommunityShard struct { CommunityID types.HexBytes `json:"communityId"` - Shard *common.Shard `json:"shard,omitempty"` + Shard *shard.Shard `json:"shard,omitempty"` PrivateKey *types.HexBytes `json:"privateKey,omitempty"` } @@ -20,7 +19,7 @@ func (s *SetCommunityShard) Validate() error { } if s.Shard != nil { // TODO: for now only MainStatusShard(16) is accepted - if s.Shard.Cluster != transport.MainStatusShardCluster { + if s.Shard.Cluster != shard.MainStatusShardCluster { return errors.New("invalid shard cluster") } if s.Shard.Index > 1023 { diff --git a/protocol/transport/filters_manager.go b/protocol/transport/filters_manager.go index 3255b6274..13ac5df04 100644 --- a/protocol/transport/filters_manager.go +++ b/protocol/transport/filters_manager.go @@ -11,17 +11,13 @@ import ( "go.uber.org/zap" "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/common/shard" ) const ( minPow = 0.0 ) -type Shard struct { - Cluster uint16 - Index uint16 -} - type RawFilter struct { FilterID string Topic types.TopicType @@ -145,11 +141,11 @@ func (f *FiltersManager) InitPublicFilters(publicFiltersToInit []FiltersToInitia } type CommunityFilterToInitialize struct { - Shard *Shard + Shard *shard.Shard PrivKey *ecdsa.PrivateKey } -func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) { +func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize, useShards bool) ([]*Filter, error) { var filters []*Filter f.mutex.Lock() defer f.mutex.Unlock() @@ -159,10 +155,15 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com continue } - communityPubsubTopic := GetPubsubTopic(cf.Shard) - topics := []string{communityPubsubTopic} - if communityPubsubTopic != DefaultShardPubsubTopic() { - topics = append(topics, DefaultShardPubsubTopic()) + topics := make([]string, 0) + if useShards { + topics = append(topics, shard.DefaultShardPubsubTopic()) + if cf.Shard.PubsubTopic() != "" { + topics = append(topics, cf.Shard.PubsubTopic()) + } + } else { + topics = append(topics, "") // empty PubsubTopic means default pubsub topic, + // to be overridden with proper value in Waku layer } // TODO: requests to join / cancels are currently being sent into the default waku topic. @@ -386,11 +387,9 @@ func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds return f.filters[chatID], nil } - pubsubTopic := DefaultShardPubsubTopic() - // We set up a filter so we can publish, // but we discard envelopes if listen is false. - filter, err := f.addAsymmetric(chatID, pubsubTopic, identity, listen) + filter, err := f.addAsymmetric(chatID, "", identity, listen) if err != nil { f.logger.Debug("could not register personal topic filter", zap.Error(err)) return nil, err @@ -400,7 +399,6 @@ func (f *FiltersManager) LoadPersonal(publicKey *ecdsa.PublicKey, identity *ecds ChatID: chatID, FilterID: filter.FilterID, ContentTopic: filter.Topic, - PubsubTopic: pubsubTopic, Identity: PublicKeyToStr(publicKey), Listen: listen, OneToOne: true, @@ -427,11 +425,9 @@ func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e return f.filters[chatID], nil } - pubsubTopic := DefaultShardPubsubTopic() - // We set up a filter so we can publish, // but we discard envelopes if listen is false. - filter, err := f.addAsymmetric(chatID, pubsubTopic, identity, listen) + filter, err := f.addAsymmetric(chatID, "", identity, listen) if err != nil { f.logger.Debug("could not register partitioned topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -441,7 +437,6 @@ func (f *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *e ChatID: chatID, FilterID: filter.FilterID, ContentTopic: filter.Topic, - PubsubTopic: pubsubTopic, Identity: PublicKeyToStr(publicKey), Listen: listen, Ephemeral: ephemeral, @@ -466,9 +461,8 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, return f.filters[chatID], nil } - pubsubTopic := DefaultShardPubsubTopic() keyString := hex.EncodeToString(secret.Key) - filter, err := f.addSymmetric(keyString, pubsubTopic) + filter, err := f.addSymmetric(keyString, "") if err != nil { f.logger.Debug("could not register negotiated topic", zap.Error(err)) return nil, err @@ -477,7 +471,6 @@ func (f *FiltersManager) LoadNegotiated(secret types.NegotiatedSecret) (*Filter, chat := &Filter{ ChatID: chatID, ContentTopic: filter.Topic, - PubsubTopic: pubsubTopic, SymKeyID: filter.SymKeyID, FilterID: filter.FilterID, Identity: PublicKeyToStr(secret.PublicKey), @@ -518,12 +511,11 @@ func (f *FiltersManager) LoadDiscovery() ([]*Filter, error) { // Load personal discovery personalDiscoveryChat := &Filter{ - ChatID: personalDiscoveryTopic, - Identity: identityStr, - PubsubTopic: DefaultShardPubsubTopic(), - Discovery: true, - Listen: true, - OneToOne: true, + ChatID: personalDiscoveryTopic, + Identity: identityStr, + Discovery: true, + Listen: true, + OneToOne: true, } discoveryResponse, err := f.addAsymmetric(personalDiscoveryChat.ChatID, personalDiscoveryChat.PubsubTopic, f.privateKey, true) @@ -591,9 +583,7 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro return f.filters[chatID], nil } - pubsubTopic := DefaultShardPubsubTopic() - - contactCodeFilter, err := f.addSymmetric(chatID, pubsubTopic) + contactCodeFilter, err := f.addSymmetric(chatID, "") if err != nil { f.logger.Debug("could not register contact code topic", zap.String("chatID", chatID), zap.Error(err)) return nil, err @@ -605,7 +595,6 @@ func (f *FiltersManager) LoadContactCode(pubKey *ecdsa.PublicKey) (*Filter, erro ContentTopic: contactCodeFilter.Topic, SymKeyID: contactCodeFilter.SymKeyID, Identity: PublicKeyToStr(pubKey), - PubsubTopic: pubsubTopic, Listen: true, } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index e05cbdff8..315366b40 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -15,8 +15,6 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" - "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/crypto" @@ -166,8 +164,8 @@ func (t *Transport) LoadFilters(filters []*Filter) ([]*Filter, error) { return t.filters.InitWithFilters(filters) } -func (t *Transport) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize) ([]*Filter, error) { - return t.filters.InitCommunityFilters(communityFiltersToInitialize) +func (t *Transport) InitCommunityFilters(communityFiltersToInitialize []CommunityFilterToInitialize, useShards bool) ([]*Filter, error) { + return t.filters.InitCommunityFilters(communityFiltersToInitialize, useShards) } func (t *Transport) RemoveFilters(filters []*Filter) error { @@ -191,7 +189,7 @@ func (t *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Fil } func (t *Transport) JoinPublic(chatID string) (*Filter, error) { - return t.filters.LoadPublic(chatID, DefaultShardPubsubTopic()) + return t.filters.LoadPublic(chatID, "") } func (t *Transport) LeavePublic(chatID string) error { @@ -288,6 +286,7 @@ func (t *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage newMessage.SymKeyID = filter.SymKeyID newMessage.Topic = filter.ContentTopic + newMessage.PubsubTopic = filter.PubsubTopic return t.api.Post(ctx, *newMessage) } @@ -307,6 +306,7 @@ func (t *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage newMessage.SymKeyID = filter.SymKeyID newMessage.Topic = filter.ContentTopic + newMessage.PubsubTopic = filter.PubsubTopic newMessage.PublicKey = nil return t.api.Post(ctx, *newMessage) @@ -322,6 +322,7 @@ func (t *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage * return nil, err } + newMessage.PubsubTopic = filter.PubsubTopic newMessage.Topic = filter.ContentTopic newMessage.PublicKey = crypto.FromECDSAPub(publicKey) @@ -338,6 +339,7 @@ func (t *Transport) SendPrivateOnPersonalTopic(ctx context.Context, newMessage * return nil, err } + newMessage.PubsubTopic = filter.PubsubTopic newMessage.Topic = filter.ContentTopic newMessage.PublicKey = crypto.FromECDSAPub(publicKey) @@ -363,6 +365,7 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types. return nil, err } + newMessage.PubsubTopic = filter.PubsubTopic newMessage.Topic = filter.ContentTopic newMessage.PublicKey = crypto.FromECDSAPub(publicKey) @@ -676,31 +679,3 @@ func (t *Transport) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) func (t *Transport) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { return t.waku.RetrievePubsubTopicKey(topic) } - -func GetPubsubTopic(shard *Shard) string { - if shard != nil { - return protocol.NewStaticShardingPubsubTopic(shard.Cluster, shard.Index).String() - } - - return DefaultShardPubsubTopic() -} - -func DefaultNonProtectedPubsubTopic(shard *Shard) string { - if shard != nil { - return GetPubsubTopic(&Shard{ - Cluster: MainStatusShardCluster, - Index: NonProtectedShardIndex, - }) - } - - return DefaultShardPubsubTopic() -} - -const MainStatusShardCluster = 16 -const DefaultShardIndex = 32 -const NonProtectedShardIndex = 64 -const UndefinedShardValue = 0 - -func DefaultShardPubsubTopic() string { - return protocol.NewStaticShardingPubsubTopic(MainStatusShardCluster, DefaultShardIndex).String() -} diff --git a/protocol/v1/status_message.go b/protocol/v1/status_message.go index 51b867647..1c5ff96a5 100644 --- a/protocol/v1/status_message.go +++ b/protocol/v1/status_message.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" + utils "github.com/status-im/status-go/common" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/encryption" @@ -152,12 +153,13 @@ func (m *StatusMessage) HandleEncryptionLayer(myKey *ecdsa.PrivateKey, senderKey } func (m *StatusMessage) HandleApplicationLayer() error { + message, err := protobuf.Unmarshal(m.EncryptionLayer.Payload) if err != nil { return err } - recoveredKey, err := message.RecoverKey() + recoveredKey, err := utils.RecoverKey(message) if err != nil { return err } diff --git a/services/chat/api.go b/services/chat/api.go index 4b15b6476..46ba12b97 100644 --- a/services/chat/api.go +++ b/services/chat/api.go @@ -13,6 +13,7 @@ import ( "github.com/status-im/status-go/images" "github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" @@ -106,7 +107,7 @@ type ChannelGroup struct { CheckChannelPermissionResponses map[string]*communities.CheckChannelPermissionsResponse `json:"checkChannelPermissionResponses"` PubsubTopic string `json:"pubsubTopic"` PubsubTopicKey string `json:"pubsubTopicKey"` - Shard *common.Shard `json:"shard"` + Shard *shard.Shard `json:"shard"` } func NewAPI(service *Service) *API { diff --git a/services/ext/api.go b/services/ext/api.go index 1750c3903..f5d66a334 100644 --- a/services/ext/api.go +++ b/services/ext/api.go @@ -26,6 +26,7 @@ import ( "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/discord" @@ -1228,7 +1229,7 @@ func (api *PublicAPI) RequestCommunityInfoFromMailserver(communityID string) (*c // Deprecated: RequestCommunityInfoFromMailserverWithShard is deprecated in favor of // configurable FetchCommunity. -func (api *PublicAPI) RequestCommunityInfoFromMailserverWithShard(communityID string, shard *common.Shard) (*communities.Community, error) { +func (api *PublicAPI) RequestCommunityInfoFromMailserverWithShard(communityID string, shard *shard.Shard) (*communities.Community, error) { request := &protocol.FetchCommunityRequest{ CommunityKey: communityID, Shard: shard, @@ -1253,7 +1254,7 @@ func (api *PublicAPI) RequestCommunityInfoFromMailserverAsync(communityID string // Deprecated: RequestCommunityInfoFromMailserverAsyncWithShard is deprecated in favor of // configurable FetchCommunity. -func (api *PublicAPI) RequestCommunityInfoFromMailserverAsyncWithShard(communityID string, shard *common.Shard) error { +func (api *PublicAPI) RequestCommunityInfoFromMailserverAsyncWithShard(communityID string, shard *shard.Shard) error { request := &protocol.FetchCommunityRequest{ CommunityKey: communityID, Shard: shard, diff --git a/services/ext/service.go b/services/ext/service.go index 374df01a2..c1dac4876 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -39,6 +39,7 @@ import ( "github.com/status-im/status-go/protocol" "github.com/status-im/status-go/protocol/anonmetrics" "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/protobuf" @@ -673,7 +674,7 @@ func (s *Service) fetchCommunity(communityID string) (*communities.Community, er // TODO: we need the shard information in the collectible to be able to retrieve info for // communities that have specific shards - var shard *common.Shard = nil // TODO: build this with info from token + var shard *shard.Shard = nil // TODO: build this with info from token community, err := s.messenger.FetchCommunity(&protocol.FetchCommunityRequest{ CommunityKey: communityID, Shard: shard, diff --git a/services/mailservers/api_test.go b/services/mailservers/api_test.go index 183fe25d7..640edd441 100644 --- a/services/mailservers/api_test.go +++ b/services/mailservers/api_test.go @@ -8,6 +8,7 @@ import ( "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/t/helpers" ) @@ -58,9 +59,9 @@ func TestTopic(t *testing.T) { defer close() topicA := "0x61000000" topicD := "0x64000000" - topic1 := MailserverTopic{PubsubTopic: transport.DefaultShardPubsubTopic(), ContentTopic: topicA, LastRequest: 1} - topic2 := MailserverTopic{PubsubTopic: transport.DefaultShardPubsubTopic(), ContentTopic: "0x6200000", LastRequest: 2} - topic3 := MailserverTopic{PubsubTopic: transport.DefaultShardPubsubTopic(), ContentTopic: "0x6300000", LastRequest: 3} + topic1 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: topicA, LastRequest: 1} + topic2 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: "0x6200000", LastRequest: 2} + topic3 := MailserverTopic{PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: "0x6300000", LastRequest: 3} require.NoError(t, db.AddTopic(topic1)) require.NoError(t, db.AddTopic(topic2)) @@ -73,14 +74,14 @@ func TestTopic(t *testing.T) { filters := []*transport.Filter{ // Existing topic, is not updated { - PubsubTopic: transport.DefaultShardPubsubTopic(), + PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: types.BytesToTopic([]byte{0x61}), }, // Non existing topic is not inserted { Discovery: true, Negotiated: true, - PubsubTopic: transport.DefaultShardPubsubTopic(), + PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: types.BytesToTopic([]byte{0x64}), }, } @@ -156,7 +157,7 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) { defer close() api := &API{db: db} testTopic := MailserverTopic{ - PubsubTopic: transport.DefaultShardPubsubTopic(), + PubsubTopic: shard.DefaultShardPubsubTopic(), ContentTopic: "topic-001", ChatIDs: []string{"chatID01", "chatID02"}, LastRequest: 10, @@ -169,14 +170,14 @@ func TestAddGetDeleteMailserverTopics(t *testing.T) { require.NoError(t, err) require.EqualValues(t, []MailserverTopic{testTopic}, topics) - err = api.DeleteMailserverTopic(context.Background(), transport.DefaultShardPubsubTopic(), testTopic.ContentTopic) + err = api.DeleteMailserverTopic(context.Background(), shard.DefaultShardPubsubTopic(), testTopic.ContentTopic) require.NoError(t, err) topics, err = api.GetMailserverTopics(context.Background()) require.NoError(t, err) require.EqualValues(t, ([]MailserverTopic)(nil), topics) // Delete non-existing topic. - err = api.DeleteMailserverTopic(context.Background(), transport.DefaultShardPubsubTopic(), "non-existing-topic") + err = api.DeleteMailserverTopic(context.Background(), shard.DefaultShardPubsubTopic(), "non-existing-topic") require.NoError(t, err) } diff --git a/services/status/service.go b/services/status/service.go index cf9a1c5a1..abfa1531e 100644 --- a/services/status/service.go +++ b/services/status/service.go @@ -10,7 +10,7 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol" - "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/common/shard" ) // Make sure that Service implements node.Lifecycle interface. @@ -70,7 +70,7 @@ type PublicAPI struct { service *Service } -func (p *PublicAPI) CommunityInfo(communityID types.HexBytes, shard *common.Shard) (json.RawMessage, error) { +func (p *PublicAPI) CommunityInfo(communityID types.HexBytes, shard *shard.Shard) (json.RawMessage, error) { if p.service.messenger == nil { return nil, ErrNotInitialized } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 0f7541839..5d9e5d970 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -616,12 +616,19 @@ func (w *Waku) runPeerExchangeLoop() { } } +func (w *Waku) getPubsubTopic(topic string) string { + if topic == "" || !w.cfg.UseShardAsDefaultTopic { + topic = w.settings.DefaultPubsubTopic + } + return topic +} + func (w *Waku) subscribeToPubsubTopicWithWakuRelay(topic string, pubkey *ecdsa.PublicKey) error { if w.settings.LightClient { return errors.New("only available for full nodes") } - topic = getPubsubTopic(topic, w.cfg.UseShardAsDefaultTopic, w.settings.DefaultPubsubTopic) + topic = w.getPubsubTopic(topic) if w.node.Relay().IsSubscribed(topic) { return nil @@ -949,10 +956,7 @@ func (w *Waku) GetSymKey(id string) ([]byte, error) { // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. func (w *Waku) Subscribe(f *common.Filter) (string, error) { - if f.PubsubTopic == "" { - f.PubsubTopic = w.settings.DefaultPubsubTopic - } - + f.PubsubTopic = w.getPubsubTopic(f.PubsubTopic) id, err := w.filters.Install(f) if err != nil { return id, err @@ -1005,20 +1009,11 @@ func (w *Waku) UnsubscribeMany(ids []string) error { return nil } -func getPubsubTopic(pubsubTopic string, useShardAsDefaultTopic bool, defaultShardPubsubTopic string) string { - // Override the pubsub topic used, in case the default shard is being used - // and the configuration indicates we need to use the default waku topic from relay - if !useShardAsDefaultTopic && pubsubTopic == defaultShardPubsubTopic { - pubsubTopic = relay.DefaultWakuTopic - } - return pubsubTopic -} - func (w *Waku) broadcast() { for { select { case envelope := <-w.sendQueue: - pubsubTopic := getPubsubTopic(envelope.PubsubTopic(), w.cfg.UseShardAsDefaultTopic, w.cfg.DefaultShardPubsubTopic) + pubsubTopic := envelope.PubsubTopic() var err error logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().Timestamp)) if w.settings.LightClient { @@ -1055,10 +1050,7 @@ func (w *Waku) broadcast() { // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { - if pubsubTopic == "" { - pubsubTopic = w.settings.DefaultPubsubTopic - } - + pubsubTopic = w.getPubsubTopic(pubsubTopic) if w.protectedTopicStore != nil { privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) if err != nil { @@ -1323,12 +1315,6 @@ func (w *Waku) OnNewEnvelopes(envelope *protocol.Envelope, msgType common.Messag return nil } - // Override the message pubsub topci in case the configuration indicates we shouldn't - // use the default shard but the default waku topic from relay instead - if !w.cfg.UseShardAsDefaultTopic && recvMessage.PubsubTopic == relay.DefaultWakuTopic { - recvMessage.PubsubTopic = w.cfg.DefaultShardPubsubTopic - } - if w.statusTelemetryClient != nil { w.statusTelemetryClient.PushReceivedEnvelope(envelope) } @@ -1483,6 +1469,8 @@ func (w *Waku) ListenAddresses() []string { } func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) error { + topic = w.getPubsubTopic(topic) + if !w.settings.LightClient { err := w.subscribeToPubsubTopicWithWakuRelay(topic, pubkey) if err != nil { @@ -1493,6 +1481,7 @@ func (w *Waku) SubscribeToPubsubTopic(topic string, pubkey *ecdsa.PublicKey) err } func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { + topic = w.getPubsubTopic(topic) if w.protectedTopicStore == nil { return nil, nil } @@ -1501,6 +1490,7 @@ func (w *Waku) RetrievePubsubTopicKey(topic string) (*ecdsa.PrivateKey, error) { } func (w *Waku) StorePubsubTopicKey(topic string, privKey *ecdsa.PrivateKey) error { + topic = w.getPubsubTopic(topic) if w.protectedTopicStore == nil { return nil } diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index a77500305..b13199f66 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -234,7 +234,7 @@ func TestWakuV2Filter(t *testing.T) { msgTimestamp := w.timestamp() contentTopic := maps.Keys(filter.ContentTopics)[0] - _, err = w.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ + _, err = w.Send("", &pb.WakuMessage{ Payload: []byte{1, 2, 3, 4, 5}, ContentTopic: contentTopic.ContentTopic(), Version: 0, @@ -242,7 +242,7 @@ func TestWakuV2Filter(t *testing.T) { }) require.NoError(t, err) - time.Sleep(5 * time.Second) + time.Sleep(15 * time.Second) // Ensure there is at least 1 active filter subscription subscriptions := w.node.FilterLightnode().Subscriptions()