fix: send request on non protected topics, and add missing shard information on invite
This commit is contained in:
parent
4a9220bf96
commit
c012f94681
|
@ -462,7 +462,7 @@ func (m *Manager) runOwnerVerificationLoop() {
|
|||
}
|
||||
|
||||
// TODO: handle shards
|
||||
response, err := m.HandleCommunityDescriptionMessage(signer, description, communityToValidate.payload, ownerPK)
|
||||
response, err := m.HandleCommunityDescriptionMessage(signer, description, communityToValidate.payload, ownerPK, nil)
|
||||
if err != nil {
|
||||
m.logger.Error("failed to handle community", zap.Error(err))
|
||||
err = m.persistence.DeleteCommunityToValidate(communityToValidate.id, communityToValidate.clock)
|
||||
|
@ -1577,7 +1577,7 @@ func (m *Manager) Queue(signer *ecdsa.PublicKey, community *Community, clock uin
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte, verifiedOwner *ecdsa.PublicKey) (*CommunityResponse, error) {
|
||||
func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, payload []byte, verifiedOwner *ecdsa.PublicKey, communityShard *protobuf.Shard) (*CommunityResponse, error) {
|
||||
if signer == nil {
|
||||
return nil, errors.New("signer can't be nil")
|
||||
}
|
||||
|
@ -1618,6 +1618,7 @@ func (m *Manager) HandleCommunityDescriptionMessage(signer *ecdsa.PublicKey, des
|
|||
MemberIdentity: &m.identity.PublicKey,
|
||||
ID: pubKey,
|
||||
ControlNode: signer,
|
||||
Shard: shard.FromProtobuff(communityShard),
|
||||
}
|
||||
|
||||
community, err = New(config, m.timesource)
|
||||
|
|
|
@ -1611,7 +1611,7 @@ func (s *ManagerSuite) TestCommunityQueue() {
|
|||
|
||||
subscription := m.Subscribe()
|
||||
|
||||
response, err := m.HandleCommunityDescriptionMessage(¬TheOwner.PublicKey, description, payload, nil)
|
||||
response, err := m.HandleCommunityDescriptionMessage(¬TheOwner.PublicKey, description, payload, nil, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// No response, as it should be queued
|
||||
|
@ -1703,7 +1703,7 @@ func (s *ManagerSuite) TestCommunityQueueMultipleDifferentSigners() {
|
|||
|
||||
subscription := m.Subscribe()
|
||||
|
||||
response, err := m.HandleCommunityDescriptionMessage(&oldOwner.PublicKey, description, payload, nil)
|
||||
response, err := m.HandleCommunityDescriptionMessage(&oldOwner.PublicKey, description, payload, nil, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// No response, as it should be queued
|
||||
|
@ -1721,7 +1721,7 @@ func (s *ManagerSuite) TestCommunityQueueMultipleDifferentSigners() {
|
|||
payload, err = v1.WrapMessageV1(payload, protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, newOwner)
|
||||
s.Require().NoError(err)
|
||||
|
||||
response, err = m.HandleCommunityDescriptionMessage(&newOwner.PublicKey, description, payload, nil)
|
||||
response, err = m.HandleCommunityDescriptionMessage(&newOwner.PublicKey, description, payload, nil, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// No response, as it should be queued
|
||||
|
@ -1833,7 +1833,7 @@ func (s *ManagerSuite) TestCommunityQueueMultipleDifferentSignersIgnoreIfNotRetu
|
|||
|
||||
subscription := m.Subscribe()
|
||||
|
||||
response, err := m.HandleCommunityDescriptionMessage(&oldOwner.PublicKey, description, payload, nil)
|
||||
response, err := m.HandleCommunityDescriptionMessage(&oldOwner.PublicKey, description, payload, nil, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// No response, as it should be queued
|
||||
|
@ -1849,7 +1849,7 @@ func (s *ManagerSuite) TestCommunityQueueMultipleDifferentSignersIgnoreIfNotRetu
|
|||
payload, err = v1.WrapMessageV1(payload, protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, newOwner)
|
||||
s.Require().NoError(err)
|
||||
|
||||
response, err = m.HandleCommunityDescriptionMessage(&newOwner.PublicKey, description, payload, nil)
|
||||
response, err = m.HandleCommunityDescriptionMessage(&newOwner.PublicKey, description, payload, nil, nil)
|
||||
s.Require().NoError(err)
|
||||
|
||||
// No response, as it should be queued
|
||||
|
|
|
@ -40,6 +40,7 @@ import (
|
|||
sociallinkssettings "github.com/status-im/status-go/multiaccounts/settings_social_links"
|
||||
"github.com/status-im/status-go/protocol/anonmetrics"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/encryption"
|
||||
"github.com/status-im/status-go/protocol/encryption/multidevice"
|
||||
|
@ -1673,6 +1674,14 @@ func (m *Messenger) Init() error {
|
|||
|
||||
logger := m.logger.With(zap.String("site", "Init"))
|
||||
|
||||
if m.useShards() {
|
||||
// Community requests will arrive in this pubsub topic
|
||||
err := m.SubscribeToPubsubTopic(shard.DefaultNonProtectedPubsubTopic(), nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
filtersToInit []transport.FiltersToInitialize
|
||||
publicKeys []*ecdsa.PublicKey
|
||||
|
@ -1734,25 +1743,6 @@ func (m *Messenger) Init() error {
|
|||
filtersToInit = append(filtersToInit, m.DefaultFilters(org)...)
|
||||
}
|
||||
|
||||
// Init filters for the communities we control
|
||||
var communityFiltersToInitialize []transport.CommunityFilterToInitialize
|
||||
controlledCommunities, err := m.communitiesManager.Controlled()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, c := range controlledCommunities {
|
||||
communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{
|
||||
Shard: c.Shard(),
|
||||
PrivKey: c.PrivateKey(),
|
||||
})
|
||||
}
|
||||
|
||||
_, err = m.InitCommunityFilters(communityFiltersToInitialize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Get chat IDs and public keys from the existing chats.
|
||||
// TODO: Get only active chats by the query.
|
||||
chats, err := m.persistence.Chats()
|
||||
|
@ -1872,7 +1862,30 @@ func (m *Messenger) Init() error {
|
|||
}
|
||||
|
||||
_, err = m.transport.InitFilters(filtersToInit, publicKeys)
|
||||
return err
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Init filters for the communities we control
|
||||
var communityFiltersToInitialize []transport.CommunityFilterToInitialize
|
||||
controlledCommunities, err := m.communitiesManager.Controlled()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, c := range controlledCommunities {
|
||||
communityFiltersToInitialize = append(communityFiltersToInitialize, transport.CommunityFilterToInitialize{
|
||||
Shard: c.Shard(),
|
||||
PrivKey: c.PrivateKey(),
|
||||
})
|
||||
}
|
||||
|
||||
_, err = m.InitCommunityFilters(communityFiltersToInitialize)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Shutdown takes care of ensuring a clean shutdown of Messenger
|
||||
|
|
|
@ -1906,7 +1906,6 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters
|
|||
|
||||
filters := []transport.FiltersToInitialize{
|
||||
{ChatID: cID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: updatesChannelID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: mlChannelID, PubsubTopic: communityPubsubTopic},
|
||||
{ChatID: memberUpdateChannelID, PubsubTopic: communityPubsubTopic},
|
||||
|
@ -1914,6 +1913,8 @@ func (m *Messenger) DefaultFilters(o *communities.Community) []transport.Filters
|
|||
|
||||
if m.useShards() {
|
||||
filters = append(filters, transport.FiltersToInitialize{ChatID: uncompressedPubKey, PubsubTopic: shard.DefaultNonProtectedPubsubTopic()})
|
||||
} else {
|
||||
filters = append(filters, transport.FiltersToInitialize{ChatID: uncompressedPubKey, PubsubTopic: communityPubsubTopic})
|
||||
}
|
||||
|
||||
return filters
|
||||
|
@ -2315,6 +2316,7 @@ func (m *Messenger) ShareCommunity(request *requests.ShareCommunity) (*Messenger
|
|||
message := common.NewMessage()
|
||||
message.ChatId = pk.String()
|
||||
message.CommunityID = request.CommunityID.String()
|
||||
message.Shard = community.Shard().Protobuffer()
|
||||
message.Text = fmt.Sprintf("Community %s has been shared with you", community.Name())
|
||||
if request.InviteMessage != "" {
|
||||
message.Text = request.InviteMessage
|
||||
|
@ -2804,8 +2806,8 @@ func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) {
|
|||
}
|
||||
|
||||
// handleCommunityDescription handles an community description
|
||||
func (m *Messenger) handleCommunityDescription(state *ReceivedMessageState, signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, rawPayload []byte) error {
|
||||
communityResponse, err := m.communitiesManager.HandleCommunityDescriptionMessage(signer, description, rawPayload, nil)
|
||||
func (m *Messenger) handleCommunityDescription(state *ReceivedMessageState, signer *ecdsa.PublicKey, description *protobuf.CommunityDescription, rawPayload []byte, shard *protobuf.Shard) error {
|
||||
communityResponse, err := m.communitiesManager.HandleCommunityDescriptionMessage(signer, description, rawPayload, nil, shard)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3188,7 +3190,8 @@ func (m *Messenger) handleSyncInstallationCommunity(messageState *ReceivedMessag
|
|||
return err
|
||||
}
|
||||
|
||||
err = m.handleCommunityDescription(messageState, orgPubKey, &cd, syncCommunity.Description)
|
||||
// TODO: handle shard
|
||||
err = m.handleCommunityDescription(messageState, orgPubKey, &cd, syncCommunity.Description, nil)
|
||||
if err != nil {
|
||||
logger.Debug("m.handleCommunityDescription error", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -2300,7 +2300,7 @@ func (m *Messenger) handleChatMessage(state *ReceivedMessageState, forceSeen boo
|
|||
return err
|
||||
}
|
||||
|
||||
err = m.handleCommunityDescription(state, signer, description, receivedMessage.GetCommunity())
|
||||
err = m.handleCommunityDescription(state, signer, description, receivedMessage.GetCommunity(), receivedMessage.GetShard())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -3587,8 +3587,8 @@ func (m *Messenger) HandlePushNotificationRequest(state *ReceivedMessageState, m
|
|||
}
|
||||
|
||||
func (m *Messenger) HandleCommunityDescription(state *ReceivedMessageState, message *protobuf.CommunityDescription, statusMessage *v1protocol.StatusMessage) error {
|
||||
|
||||
err := m.handleCommunityDescription(state, state.CurrentMessageState.PublicKey, message, statusMessage.EncryptionLayer.Payload)
|
||||
// TODO: handle shard
|
||||
err := m.handleCommunityDescription(state, state.CurrentMessageState.PublicKey, message, statusMessage.EncryptionLayer.Payload, nil)
|
||||
if err != nil {
|
||||
m.logger.Warn("failed to handle CommunityDescription", zap.Error(err))
|
||||
return err
|
||||
|
|
|
@ -157,17 +157,12 @@ func (f *FiltersManager) InitCommunityFilters(communityFiltersToInitialize []Com
|
|||
|
||||
topics := make([]string, 0)
|
||||
if useShards {
|
||||
topics = append(topics, shard.DefaultShardPubsubTopic())
|
||||
if cf.Shard.PubsubTopic() != "" {
|
||||
topics = append(topics, cf.Shard.PubsubTopic())
|
||||
}
|
||||
topics = append(topics, shard.DefaultNonProtectedPubsubTopic())
|
||||
} else {
|
||||
topics = append(topics, "") // empty PubsubTopic means default pubsub topic,
|
||||
// to be overridden with proper value in Waku layer
|
||||
}
|
||||
|
||||
// TODO: requests to join / cancels are currently being sent into the default waku topic.
|
||||
// They must be sent into an specific non protected shard
|
||||
for _, pubsubTopic := range topics {
|
||||
pk := &cf.PrivKey.PublicKey
|
||||
identityStr := PublicKeyToStr(pk)
|
||||
|
|
Loading…
Reference in New Issue