fix: ignore outdated COMMUNITY_SHARD_KEY messages
This mitigates issue where community shard on client's side was not in sync with owner's. relates to: status-im/status-desktop#13217
This commit is contained in:
parent
67e18a8ff0
commit
ac81c15805
|
@ -1075,9 +1075,9 @@ func (m *Manager) DeleteCommunity(id types.HexBytes) error {
|
|||
return m.persistence.DeleteCommunitySettings(id)
|
||||
}
|
||||
|
||||
func (m *Manager) UpdateShard(community *Community, shard *shard.Shard) error {
|
||||
func (m *Manager) UpdateShard(community *Community, shard *shard.Shard, clock uint64) error {
|
||||
community.config.Shard = shard
|
||||
return m.persistence.SaveCommunityShard(community.ID(), shard, community.Clock())
|
||||
return m.persistence.SaveCommunityShard(community.ID(), shard, clock)
|
||||
}
|
||||
|
||||
// SetShard assigns a shard to a community
|
||||
|
@ -1092,7 +1092,7 @@ func (m *Manager) SetShard(communityID types.HexBytes, shard *shard.Shard) (*Com
|
|||
|
||||
community.increaseClock()
|
||||
|
||||
err = m.UpdateShard(community, shard)
|
||||
err = m.UpdateShard(community, shard, community.Clock())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -2486,7 +2486,7 @@ func (m *Messenger) SendCommunityShardKey(community *communities.Community, pubk
|
|||
}
|
||||
|
||||
communityShardKey := &protobuf.CommunityShardKey{
|
||||
Clock: m.getTimesource().GetCurrentTime(),
|
||||
Clock: community.Clock(),
|
||||
CommunityId: community.ID(),
|
||||
PrivateKey: crypto.FromECDSA(key),
|
||||
Shard: community.Shard().Protobuffer(),
|
||||
|
@ -2799,11 +2799,6 @@ func (m *Messenger) HandleCommunityEventsMessageRejected(state *ReceivedMessageS
|
|||
|
||||
// HandleCommunityShardKey handles the private keys for the community shards
|
||||
func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message *protobuf.CommunityShardKey, statusMessage *v1protocol.StatusMessage) error {
|
||||
// TODO: @cammellos: This is risky, it does not seem to support out of order messages
|
||||
// (say that the community changes shards twice, last one wins, but we don't check clock
|
||||
// etc)
|
||||
|
||||
// TODO: @cammellos: getbyid returns nil if the community is not in the db, so we need to handle it
|
||||
community, err := m.communitiesManager.GetByID(message.CommunityId)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -2819,7 +2814,7 @@ func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message
|
|||
return errors.New("signer can't be nil")
|
||||
}
|
||||
|
||||
err = m.handleCommunityShardAndFiltersFromProto(community, shard.FromProtobuff(message.Shard), message.PrivateKey)
|
||||
err = m.handleCommunityShardAndFiltersFromProto(community, message)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -2829,15 +2824,15 @@ func (m *Messenger) HandleCommunityShardKey(state *ReceivedMessageState, message
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communities.Community, shard *shard.Shard, privateKeyBytes []byte) error {
|
||||
err := m.communitiesManager.UpdateShard(community, shard)
|
||||
func (m *Messenger) handleCommunityShardAndFiltersFromProto(community *communities.Community, message *protobuf.CommunityShardKey) error {
|
||||
err := m.communitiesManager.UpdateShard(community, shard.FromProtobuff(message.Shard), community.Clock())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var privKey *ecdsa.PrivateKey = nil
|
||||
if privateKeyBytes != nil {
|
||||
privKey, err = crypto.ToECDSA(privateKeyBytes)
|
||||
if message.PrivateKey != nil {
|
||||
privKey, err = crypto.ToECDSA(message.PrivateKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -2,15 +2,19 @@ package protocol
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/stretchr/testify/suite"
|
||||
"go.uber.org/zap"
|
||||
|
||||
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/requests"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
)
|
||||
|
@ -25,8 +29,9 @@ type MessengerCommunitiesShardingSuite struct {
|
|||
owner *Messenger
|
||||
ownerWaku types.Waku
|
||||
|
||||
alice *Messenger
|
||||
aliceWaku types.Waku
|
||||
alice *Messenger
|
||||
aliceWaku types.Waku
|
||||
aliceUnhandledMessagesTracker *unhandledMessagesTracker
|
||||
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
@ -48,11 +53,15 @@ func (s *MessengerCommunitiesShardingSuite) SetupTest() {
|
|||
nodeConfig: nodeConfig,
|
||||
})
|
||||
|
||||
s.aliceUnhandledMessagesTracker = &unhandledMessagesTracker{
|
||||
messages: map[protobuf.ApplicationMetadataMessage_Type][]*unhandedMessage{},
|
||||
}
|
||||
s.aliceWaku = wakuNodes[1]
|
||||
s.alice = newTestCommunitiesMessenger(&s.Suite, s.aliceWaku, testCommunitiesMessengerConfig{
|
||||
testMessengerConfig: testMessengerConfig{
|
||||
name: "alice",
|
||||
logger: s.logger,
|
||||
name: "alice",
|
||||
logger: s.logger,
|
||||
unhandledMessagesTracker: s.aliceUnhandledMessagesTracker,
|
||||
},
|
||||
nodeConfig: nodeConfig,
|
||||
})
|
||||
|
@ -136,3 +145,78 @@ func (s *MessengerCommunitiesShardingSuite) TestPostToCommunityChat() {
|
|||
s.testPostToCommunityChat(nil, community, chat)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *MessengerCommunitiesShardingSuite) TestIgnoreOutdatedShardKey() {
|
||||
community, _ := createCommunity(&s.Suite, s.owner)
|
||||
|
||||
advertiseCommunityToUserOldWay(&s.Suite, community, s.owner, s.alice)
|
||||
joinCommunity(&s.Suite, community, s.owner, s.alice, &requests.RequestToJoinCommunity{CommunityID: community.ID()}, "")
|
||||
|
||||
shard := &shard.Shard{
|
||||
Cluster: shard.MainStatusShardCluster,
|
||||
Index: 128,
|
||||
}
|
||||
|
||||
// Members should receive shard update.
|
||||
{
|
||||
response, err := s.owner.SetCommunityShard(&requests.SetCommunityShard{
|
||||
CommunityID: community.ID(),
|
||||
Shard: shard,
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
s.Require().Len(response.Communities(), 1)
|
||||
community = response.Communities()[0]
|
||||
|
||||
_, err = WaitOnMessengerResponse(s.alice, func(mr *MessengerResponse) bool {
|
||||
return len(mr.communities) > 0 && mr.Communities()[0].Shard() != nil && mr.Communities()[0].Shard().Index == shard.Index
|
||||
}, "shard info not updated")
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
|
||||
// Members should ignore outdated shard update.
|
||||
{
|
||||
// Simulate outdated CommunityShardKey message.
|
||||
shard.Index = 256
|
||||
communityShardKey := &protobuf.CommunityShardKey{
|
||||
Clock: community.Clock() - 1, // simulate outdated clock
|
||||
CommunityId: community.ID(),
|
||||
Shard: shard.Protobuffer(),
|
||||
}
|
||||
|
||||
encodedMessage, err := proto.Marshal(communityShardKey)
|
||||
s.Require().NoError(err)
|
||||
|
||||
rawMessage := common.RawMessage{
|
||||
Recipients: []*ecdsa.PublicKey{&s.alice.identity.PublicKey},
|
||||
ResendAutomatically: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_SHARD_KEY,
|
||||
Payload: encodedMessage,
|
||||
}
|
||||
|
||||
_, err = s.owner.sender.SendPubsubTopicKey(context.Background(), &rawMessage)
|
||||
s.Require().NoError(err)
|
||||
|
||||
_, err = WaitOnMessengerResponse(s.alice, func(mr *MessengerResponse) bool {
|
||||
msgType := protobuf.ApplicationMetadataMessage_COMMUNITY_SHARD_KEY
|
||||
msgs, exists := s.aliceUnhandledMessagesTracker.messages[msgType]
|
||||
if !exists {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, msg := range msgs {
|
||||
p := &protobuf.CommunityShardKey{}
|
||||
err := proto.Unmarshal(msg.ApplicationLayer.Payload, p)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
if msg.err == communities.ErrOldShardInfo && p.Shard != nil && p.Shard.Index == int32(shard.Index) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}, "shard info with outdated clock either not received or not ignored")
|
||||
s.Require().NoError(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
"github.com/status-im/status-go/multiaccounts/settings"
|
||||
walletsettings "github.com/status-im/status-go/multiaccounts/settings_wallet"
|
||||
"github.com/status-im/status-go/protocol/common"
|
||||
"github.com/status-im/status-go/protocol/common/shard"
|
||||
"github.com/status-im/status-go/protocol/communities"
|
||||
"github.com/status-im/status-go/protocol/encryption/multidevice"
|
||||
"github.com/status-im/status-go/protocol/identity"
|
||||
|
@ -1636,7 +1635,14 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
|
|||
return err
|
||||
}
|
||||
|
||||
err = m.handleCommunityShardAndFiltersFromProto(community, shard.FromProtobuff(requestToJoinResponseProto.Shard), requestToJoinResponseProto.ProtectedTopicPrivateKey)
|
||||
communityShardKey := &protobuf.CommunityShardKey{
|
||||
CommunityId: requestToJoinResponseProto.CommunityId,
|
||||
PrivateKey: requestToJoinResponseProto.ProtectedTopicPrivateKey,
|
||||
Clock: requestToJoinResponseProto.Community.Clock,
|
||||
Shard: requestToJoinResponseProto.Shard,
|
||||
}
|
||||
|
||||
err = m.handleCommunityShardAndFiltersFromProto(community, communityShardKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue