From 19875ed9b5e2325b57a2603dc4d4e72939e7e441 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Mon, 27 May 2024 13:49:09 +0100 Subject: [PATCH] fix: archive decryption fail (#5076) * fix_: delay archive import if decryption failed * chore_: minor cleanup * chore_: test archive import of encrypted channel * test(TestImportDecryptedArchiveMessages)_: first iteration * feat_: GetHashRatchetMessagesCountForGroup * chore_: log community description and shard info message ids * test(TestImportDecryptedArchiveMessages)_: cleanup * fix_: rebase issues * chore_: remove temporal test * test(TestImportDecryptedArchiveMessages)_: cleanup * chore_: lint fix * fix(TestImportDecryptedArchiveMessages)_: ForceMembersReevaluation --- protocol/common/raw_messages_persistence.go | 13 ++ protocol/communities/manager.go | 24 ++- ...nities_messenger_token_permissions_test.go | 200 ++++++++++++++++++ protocol/encryption/encryptor.go | 4 +- protocol/messenger_communities.go | 39 +++- protocol/messenger_community_shard.go | 10 +- protocol/messenger_testing_utils.go | 13 ++ 7 files changed, 287 insertions(+), 16 deletions(-) diff --git a/protocol/common/raw_messages_persistence.go b/protocol/common/raw_messages_persistence.go index ab8cbce93..69ff7689b 100644 --- a/protocol/common/raw_messages_persistence.go +++ b/protocol/common/raw_messages_persistence.go @@ -6,6 +6,7 @@ import ( "crypto/ecdsa" "database/sql" "encoding/gob" + "errors" "strings" "time" @@ -356,6 +357,18 @@ func (db RawMessagesPersistence) GetHashRatchetMessages(keyID []byte) ([]*types. return messages, nil } +func (db RawMessagesPersistence) GetHashRatchetMessagesCountForGroup(groupID []byte) (int, error) { + var count int + err := db.db.QueryRow(`SELECT count(*) FROM hash_ratchet_encrypted_messages WHERE group_id = ?`, groupID).Scan(&count) + if err == nil { + return count, nil + } + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return 0, err +} + func (db RawMessagesPersistence) DeleteHashRatchetMessages(ids [][]byte) error { if len(ids) == 0 { return nil diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index d8fb5a748..bd8b04c75 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "net" "os" + "path" "sort" "strconv" "strings" @@ -4363,7 +4364,7 @@ func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) { id := community.IDString() if _, exists := m.historyArchiveTasks.Load(id); exists { - m.LogStdout("history archive tasks interval already in progres", zap.String("id", id)) + m.LogStdout("history archive tasks interval already in progress", zap.String("id", id)) return } @@ -4445,9 +4446,9 @@ func (m *Manager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexB } func (m *Manager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { - return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt) } + func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { loadFromDB := len(msgs) == 0 @@ -4528,7 +4529,6 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs [ messages = append(messages, *msg) } } - } if len(messages) == 0 { @@ -4542,6 +4542,8 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs [ continue } + m.LogStdout("creating archive with messages", zap.Int("messagesCount", len(messages))) + // Not only do we partition messages, we also chunk them // roughly by size, such that each chunk will not exceed a given // size and archive data doesn't get too big @@ -4990,6 +4992,10 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes } } +func (m *Manager) SaveMessageArchiveID(communityID types.HexBytes, hash string) error { + return m.persistence.SaveMessageArchiveID(communityID, hash) +} + func (m *Manager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) { return m.persistence.GetMessageArchiveIDsToImport(communityID) } @@ -5008,7 +5014,9 @@ func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, } defer dataFile.Close() - m.LogStdout("extracting messages from history archive", zap.String("archive id", archiveID)) + m.LogStdout("extracting messages from history archive", + zap.String("communityID", communityID.String()), + zap.String("archiveID", archiveID)) metadata := index.Archives[archiveID] _, err = dataFile.Seek(int64(metadata.Offset), 0) @@ -5049,7 +5057,7 @@ func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, } err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive) if err != nil { - m.LogStdout("failed to unmarshal message archive data", zap.Error(err)) + m.LogStdout("failed to unmarshal message archive", zap.Error(err)) return nil, err } } @@ -5151,15 +5159,15 @@ func (m *Manager) TorrentFileExists(communityID string) bool { } func (m *Manager) torrentFile(communityID string) string { - return m.torrentConfig.TorrentDir + "/" + communityID + ".torrent" + return path.Join(m.torrentConfig.TorrentDir, communityID+".torrent") } func (m *Manager) archiveIndexFile(communityID string) string { - return m.torrentConfig.DataDir + "/" + communityID + "/index" + return path.Join(m.torrentConfig.DataDir, communityID, "index") } func (m *Manager) archiveDataFile(communityID string) string { - return m.torrentConfig.DataDir + "/" + communityID + "/data" + return path.Join(m.torrentConfig.DataDir, communityID, "data") } func topicsAsByteArrays(topics []types.TopicType) [][]byte { diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index fc0db76b6..be1905284 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "math/big" + "os" "strconv" "strings" "sync" @@ -16,17 +17,20 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/suite" "go.uber.org/zap" + "golang.org/x/exp/maps" gethcommon "github.com/ethereum/go-ethereum/common" hexutil "github.com/ethereum/go-ethereum/common/hexutil" gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/params" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/common/shard" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" + "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/protocol/tt" ) @@ -2133,3 +2137,199 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestReevaluateMemberPermissi fmt.Println("ReevaluateMembers Time: ", elapsed) s.Require().Less(elapsed.Seconds(), 2.0) } + +func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMessages() { + // 1.1. Create community + community, chat := s.createCommunity() + + // 1.2. Setup permissions + communityPermission := &requests.CreateCommunityTokenPermission{ + CommunityID: community.ID(), + Type: protobuf.CommunityTokenPermission_BECOME_MEMBER, + TokenCriteria: []*protobuf.TokenCriteria{ + { + Type: protobuf.CommunityTokenType_ERC20, + ContractAddresses: map[uint64]string{testChainID1: "0x124"}, + Symbol: "TEST2", + AmountInWei: "100000000000000000000", + Decimals: uint64(18), + }, + }, + } + + channelPermission := &requests.CreateCommunityTokenPermission{ + CommunityID: community.ID(), + Type: protobuf.CommunityTokenPermission_CAN_VIEW_AND_POST_CHANNEL, + ChatIds: []string{chat.ID}, + TokenCriteria: []*protobuf.TokenCriteria{ + { + Type: protobuf.CommunityTokenType_ERC20, + ContractAddresses: map[uint64]string{testChainID1: "0x124"}, + Symbol: "TEST2", + AmountInWei: "200000000000000000000", + Decimals: uint64(18), + }, + }, + } + + waitOnChannelKeyAdded := s.waitOnKeyDistribution(func(sub *CommunityAndKeyActions) bool { + action, ok := sub.keyActions.ChannelKeysActions[chat.CommunityChatID()] + if !ok || action.ActionType != communities.EncryptionKeyAdd { + return false + } + _, ok = action.Members[common.PubkeyToHex(&s.owner.identity.PublicKey)] + return ok + }) + + waitOnCommunityPermissionCreated := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool { + return len(sub.Community.TokenPermissions()) == 2 + }) + + response, err := s.owner.CreateCommunityTokenPermission(communityPermission) + s.Require().NoError(err) + s.Require().NotNil(response) + s.Require().Len(response.Communities(), 1) + + response, err = s.owner.CreateCommunityTokenPermission(channelPermission) + s.Require().NoError(err) + s.Require().NotNil(response) + s.Require().Len(response.Communities(), 1) + + community = response.Communities()[0] + s.Require().True(community.HasTokenPermissions()) + s.Require().Len(community.TokenPermissions(), 2) + + err = <-waitOnCommunityPermissionCreated + s.Require().NoError(err) + s.Require().True(community.Encrypted()) + + err = <-waitOnChannelKeyAdded + s.Require().NoError(err) + + // 2. Owner: Send a message A + messageText1 := RandomLettersString(10) + message1 := s.sendChatMessage(s.owner, chat.ID, messageText1) + + // 2.2. Retrieve own message (to make it stored in the archive later) + _, err = s.owner.RetrieveAll() + s.Require().NoError(err) + + // 3. Owner: Create community archive + const partition = 2 * time.Minute + messageDate := time.UnixMilli(int64(message1.Timestamp)) + startDate := messageDate.Add(-time.Minute) + endDate := messageDate.Add(time.Minute) + topic := types.BytesToTopic(transport.ToTopic(chat.ID)) + topics := []types.TopicType{topic} + + torrentConfig := params.TorrentConfig{ + Enabled: true, + DataDir: os.TempDir() + "/archivedata", + TorrentDir: os.TempDir() + "/torrents", + Port: 0, + } + + // Share archive directory between all users + s.owner.communitiesManager.SetTorrentConfig(&torrentConfig) + s.bob.communitiesManager.SetTorrentConfig(&torrentConfig) + s.owner.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} + s.bob.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} + + archiveIDs, err := s.owner.communitiesManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted()) + s.Require().NoError(err) + s.Require().Len(archiveIDs, 1) + + community, err = s.owner.GetCommunityByID(community.ID()) + s.Require().NoError(err) + + // 4. Bob: join community (satisfying membership, but not channel permissions) + s.makeAddressSatisfyTheCriteria(testChainID1, bobAddress, communityPermission.TokenCriteria[0]) + s.advertiseCommunityTo(community, s.bob) + + waitForKeysDistributedToBob := s.waitOnKeyDistribution(func(sub *CommunityAndKeyActions) bool { + action := sub.keyActions.CommunityKeyAction + if action.ActionType != communities.EncryptionKeySendToMembers { + return false + } + _, ok := action.Members[s.bob.IdentityPublicKeyString()] + return ok + }) + + s.joinCommunity(community, s.bob, bobPassword, []string{}) + + err = <-waitForKeysDistributedToBob + s.Require().NoError(err) + + // 5. Bob: Import community archive + // The archive is successfully decrypted, but the message inside is not. + // https://github.com/status-im/status-desktop/issues/13105 can be reproduced at this stage + // by forcing `encryption.ErrHashRatchetGroupIDNotFound` in `ExtractMessagesFromHistoryArchive` after decryption here: + // https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4403 + + // Ensure owner has archive + archiveIndex, err := s.owner.communitiesManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID()) + s.Require().NoError(err) + s.Require().Len(archiveIndex.Archives, 1) + + // Ensure bob has archive (because they share same local directory) + archiveIndex, err = s.bob.communitiesManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID()) + s.Require().NoError(err) + s.Require().Len(archiveIndex.Archives, 1) + + archiveHash := maps.Keys(archiveIndex.Archives)[0] + + // Save message archive ID as in + // https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4325-L4336 + err = s.bob.communitiesManager.SaveMessageArchiveID(community.ID(), archiveHash) + s.Require().NoError(err) + + // Import archive + s.bob.importDelayer.once.Do(func() { + close(s.bob.importDelayer.wait) + }) + cancel := make(chan struct{}) + err = s.bob.importHistoryArchives(community.ID(), cancel) + s.Require().NoError(err) + + // Ensure message1 wasn't imported, as it's encrypted, and we don't have access to the channel + receivedMessage1, err := s.bob.MessageByID(message1.ID) + s.Require().Nil(receivedMessage1) + s.Require().Error(err) + + chatID := []byte(chat.ID) + hashRatchetMessagesCount, err := s.bob.persistence.GetHashRatchetMessagesCountForGroup(chatID) + s.Require().NoError(err) + s.Require().Equal(1, hashRatchetMessagesCount) + + // Make bob satisfy channel criteria + waitOnChannelKeyToBeDistributedToBob := s.waitOnKeyDistribution(func(sub *CommunityAndKeyActions) bool { + action, ok := sub.keyActions.ChannelKeysActions[chat.CommunityChatID()] + if !ok || action.ActionType != communities.EncryptionKeySendToMembers { + return false + } + _, ok = action.Members[common.PubkeyToHex(&s.bob.identity.PublicKey)] + return ok + }) + + s.makeAddressSatisfyTheCriteria(testChainID1, bobAddress, channelPermission.TokenCriteria[0]) + + // force owner to reevaluate channel members + // in production it will happen automatically, by periodic check + err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID()) + s.Require().NoError(err) + + err = <-waitOnChannelKeyToBeDistributedToBob + s.Require().NoError(err) + + // Finally ensure that the message from archive was retrieved and decrypted + + // NOTE: In theory a single RetrieveAll call should be enough, + // because we immediately process all hash ratchet messages + response, err = s.bob.RetrieveAll() + s.Require().NoError(err) + s.Require().Len(response.Messages(), 1) + + receivedMessage1, ok := response.messages[message1.ID] + s.Require().True(ok) + s.Require().Equal(messageText1, receivedMessage1.Text) +} diff --git a/protocol/encryption/encryptor.go b/protocol/encryption/encryptor.go index 521df4ec0..dd6dbbd6b 100644 --- a/protocol/encryption/encryptor.go +++ b/protocol/encryption/encryptor.go @@ -26,8 +26,8 @@ var ( // This should not happen because the protocol forbids sending a message to // non-paired devices, however, in theory it is possible to receive such a message. ErrNotPairedDevice = errors.New("received a message from not paired device") - ErrHashRatchetSeqNoTooHigh = errors.New("Hash ratchet seq no is too high") - ErrHashRatchetGroupIDNotFound = errors.New("Hash ratchet group id not found") + ErrHashRatchetSeqNoTooHigh = errors.New("hash ratchet seq no is too high") + ErrHashRatchetGroupIDNotFound = errors.New("hash ratchet group id not found") ErrNoEncryptionKey = errors.New("no encryption key found for the community") ) diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index f70c093b9..74b842907 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -36,6 +36,7 @@ import ( "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/communities/token" "github.com/status-im/status-go/protocol/discord" + "github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/requests" "github.com/status-im/status-go/protocol/transport" @@ -152,7 +153,14 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err rawMessage.HashRatchetGroupID = org.ID() rawMessage.Recipients = members } - _, err = m.sender.SendPublic(context.Background(), org.IDString(), rawMessage) + messageID, err := m.sender.SendPublic(context.Background(), org.IDString(), rawMessage) + if err == nil { + m.logger.Debug("published community", + zap.String("communityID", org.IDString()), + zap.String("messageID", hexutil.Encode(messageID)), + zap.Uint64("clock", org.Clock()), + ) + } return err } @@ -366,7 +374,6 @@ func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscripti m.logger.Warn("failed to publish public shard info", zap.Error(err)) return } - m.logger.Debug("published public shard info") // signal client with published community if m.config.messengerSignalsHandler != nil { @@ -3911,8 +3918,20 @@ func (m *Messenger) importHistoryArchives(communityID types.HexBytes, cancel cha return nil } + delayImport := false + importMessageArchivesLoop: for { + if delayImport { + select { + case <-ctx.Done(): + m.communitiesManager.LogStdout("interrupted importing history archive messages") + return nil + case <-time.After(1 * time.Hour): + delayImport = false + } + } + select { case <-ctx.Done(): m.communitiesManager.LogStdout("interrupted importing history archive messages") @@ -3933,7 +3952,7 @@ importMessageArchivesLoop: break importMessageArchivesLoop } - m.communitiesManager.LogStdout(fmt.Sprintf("importing message archive, %d left", len(archiveIDsToImport))) + m.communitiesManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport))) // only process one archive at a time, so in case of cancel we don't // wait for all archives to be processed first @@ -3941,6 +3960,12 @@ importMessageArchivesLoop: archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID) if err != nil { + if errors.Is(err, encryption.ErrHashRatchetGroupIDNotFound) { + // In case we're missing hash ratchet keys, best we can do is + // to wait for them to be received and try import again. + delayImport = true + continue + } m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err)) continue } @@ -4054,7 +4079,9 @@ func (m *Messenger) EnableCommunityHistoryArchiveProtocol() error { if len(controlledCommunities) > 0 { go m.InitHistoryArchiveTasks(controlledCommunities) } - m.config.messengerSignalsHandler.HistoryArchivesProtocolEnabled() + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryArchivesProtocolEnabled() + } return nil } @@ -4077,7 +4104,9 @@ func (m *Messenger) DisableCommunityHistoryArchiveProtocol() error { if err != nil { return err } - m.config.messengerSignalsHandler.HistoryArchivesProtocolDisabled() + if m.config.messengerSignalsHandler != nil { + m.config.messengerSignalsHandler.HistoryArchivesProtocolDisabled() + } return nil } diff --git a/protocol/messenger_community_shard.go b/protocol/messenger_community_shard.go index 097c3ea8a..c32b5b1a5 100644 --- a/protocol/messenger_community_shard.go +++ b/protocol/messenger_community_shard.go @@ -4,6 +4,8 @@ import ( "context" "errors" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/golang/protobuf/proto" "go.uber.org/zap" @@ -59,7 +61,13 @@ func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Communit } chatName := transport.CommunityShardInfoTopic(community.IDString()) - _, err = m.sender.SendPublic(context.Background(), chatName, rawMessage) + messageID, err := m.sender.SendPublic(context.Background(), chatName, rawMessage) + if err == nil { + m.logger.Debug("published public community shard info", + zap.String("communityID", community.IDString()), + zap.String("messageID", hexutil.Encode(messageID)), + ) + } return err } diff --git a/protocol/messenger_testing_utils.go b/protocol/messenger_testing_utils.go index b79775160..f04d182a9 100644 --- a/protocol/messenger_testing_utils.go +++ b/protocol/messenger_testing_utils.go @@ -67,6 +67,19 @@ func (m *MessengerSignalsHandlerMock) SendWakuBackedUpKeypair(*wakusync.WakuBack func (m *MessengerSignalsHandlerMock) SendWakuBackedUpWatchOnlyAccount(*wakusync.WakuBackedUpDataResponse) { } +func (m *MessengerSignalsHandlerMock) BackupPerformed(uint64) {} +func (m *MessengerSignalsHandlerMock) HistoryArchivesProtocolEnabled() {} +func (m *MessengerSignalsHandlerMock) HistoryArchivesProtocolDisabled() {} +func (m *MessengerSignalsHandlerMock) CreatingHistoryArchives(string) {} +func (m *MessengerSignalsHandlerMock) NoHistoryArchivesCreated(string, int, int) {} +func (m *MessengerSignalsHandlerMock) HistoryArchivesCreated(string, int, int) {} +func (m *MessengerSignalsHandlerMock) HistoryArchivesSeeding(string) {} +func (m *MessengerSignalsHandlerMock) HistoryArchivesUnseeded(string) {} +func (m *MessengerSignalsHandlerMock) HistoryArchiveDownloaded(string, int, int) {} +func (m *MessengerSignalsHandlerMock) DownloadingHistoryArchivesStarted(string) {} +func (m *MessengerSignalsHandlerMock) DownloadingHistoryArchivesFinished(string) {} +func (m *MessengerSignalsHandlerMock) ImportingHistoryArchiveMessages(string) {} + func (m *MessengerSignalsHandlerMock) MessengerResponse(response *MessengerResponse) { // Non-blocking send select {