fix: archive decryption fail (#5076)

* fix_: delay archive import if decryption failed

* chore_: minor cleanup

* chore_: test archive import of encrypted channel

* test(TestImportDecryptedArchiveMessages)_: first iteration

* feat_: GetHashRatchetMessagesCountForGroup

* chore_: log community description and shard info message ids

* test(TestImportDecryptedArchiveMessages)_: cleanup

* fix_: rebase issues

* chore_: remove temporal test

* test(TestImportDecryptedArchiveMessages)_: cleanup

* chore_: lint fix

* fix(TestImportDecryptedArchiveMessages)_: ForceMembersReevaluation
This commit is contained in:
Igor Sirotin 2024-05-27 13:49:09 +01:00 committed by GitHub
parent 644f64b260
commit 19875ed9b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 287 additions and 16 deletions

View File

@ -6,6 +6,7 @@ import (
"crypto/ecdsa"
"database/sql"
"encoding/gob"
"errors"
"strings"
"time"
@ -356,6 +357,18 @@ func (db RawMessagesPersistence) GetHashRatchetMessages(keyID []byte) ([]*types.
return messages, nil
}
func (db RawMessagesPersistence) GetHashRatchetMessagesCountForGroup(groupID []byte) (int, error) {
var count int
err := db.db.QueryRow(`SELECT count(*) FROM hash_ratchet_encrypted_messages WHERE group_id = ?`, groupID).Scan(&count)
if err == nil {
return count, nil
}
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, err
}
func (db RawMessagesPersistence) DeleteHashRatchetMessages(ids [][]byte) error {
if len(ids) == 0 {
return nil

View File

@ -10,6 +10,7 @@ import (
"io/ioutil"
"net"
"os"
"path"
"sort"
"strconv"
"strings"
@ -4363,7 +4364,7 @@ func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics
func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
id := community.IDString()
if _, exists := m.historyArchiveTasks.Load(id); exists {
m.LogStdout("history archive tasks interval already in progres", zap.String("id", id))
m.LogStdout("history archive tasks interval already in progress", zap.String("id", id))
return
}
@ -4445,9 +4446,9 @@ func (m *Manager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexB
}
func (m *Manager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt)
}
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
loadFromDB := len(msgs) == 0
@ -4528,7 +4529,6 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs [
messages = append(messages, *msg)
}
}
}
if len(messages) == 0 {
@ -4542,6 +4542,8 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs [
continue
}
m.LogStdout("creating archive with messages", zap.Int("messagesCount", len(messages)))
// Not only do we partition messages, we also chunk them
// roughly by size, such that each chunk will not exceed a given
// size and archive data doesn't get too big
@ -4990,6 +4992,10 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
}
}
func (m *Manager) SaveMessageArchiveID(communityID types.HexBytes, hash string) error {
return m.persistence.SaveMessageArchiveID(communityID, hash)
}
func (m *Manager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) {
return m.persistence.GetMessageArchiveIDsToImport(communityID)
}
@ -5008,7 +5014,9 @@ func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes,
}
defer dataFile.Close()
m.LogStdout("extracting messages from history archive", zap.String("archive id", archiveID))
m.LogStdout("extracting messages from history archive",
zap.String("communityID", communityID.String()),
zap.String("archiveID", archiveID))
metadata := index.Archives[archiveID]
_, err = dataFile.Seek(int64(metadata.Offset), 0)
@ -5049,7 +5057,7 @@ func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes,
}
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
if err != nil {
m.LogStdout("failed to unmarshal message archive data", zap.Error(err))
m.LogStdout("failed to unmarshal message archive", zap.Error(err))
return nil, err
}
}
@ -5151,15 +5159,15 @@ func (m *Manager) TorrentFileExists(communityID string) bool {
}
func (m *Manager) torrentFile(communityID string) string {
return m.torrentConfig.TorrentDir + "/" + communityID + ".torrent"
return path.Join(m.torrentConfig.TorrentDir, communityID+".torrent")
}
func (m *Manager) archiveIndexFile(communityID string) string {
return m.torrentConfig.DataDir + "/" + communityID + "/index"
return path.Join(m.torrentConfig.DataDir, communityID, "index")
}
func (m *Manager) archiveDataFile(communityID string) string {
return m.torrentConfig.DataDir + "/" + communityID + "/data"
return path.Join(m.torrentConfig.DataDir, communityID, "data")
}
func topicsAsByteArrays(topics []types.TopicType) [][]byte {

View File

@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"math/big"
"os"
"strconv"
"strings"
"sync"
@ -16,17 +17,20 @@ import (
"github.com/google/uuid"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"golang.org/x/exp/maps"
gethcommon "github.com/ethereum/go-ethereum/common"
hexutil "github.com/ethereum/go-ethereum/common/hexutil"
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
"github.com/status-im/status-go/protocol/tt"
)
@ -2133,3 +2137,199 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestReevaluateMemberPermissi
fmt.Println("ReevaluateMembers Time: ", elapsed)
s.Require().Less(elapsed.Seconds(), 2.0)
}
func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMessages() {
// 1.1. Create community
community, chat := s.createCommunity()
// 1.2. Setup permissions
communityPermission := &requests.CreateCommunityTokenPermission{
CommunityID: community.ID(),
Type: protobuf.CommunityTokenPermission_BECOME_MEMBER,
TokenCriteria: []*protobuf.TokenCriteria{
{
Type: protobuf.CommunityTokenType_ERC20,
ContractAddresses: map[uint64]string{testChainID1: "0x124"},
Symbol: "TEST2",
AmountInWei: "100000000000000000000",
Decimals: uint64(18),
},
},
}
channelPermission := &requests.CreateCommunityTokenPermission{
CommunityID: community.ID(),
Type: protobuf.CommunityTokenPermission_CAN_VIEW_AND_POST_CHANNEL,
ChatIds: []string{chat.ID},
TokenCriteria: []*protobuf.TokenCriteria{
{
Type: protobuf.CommunityTokenType_ERC20,
ContractAddresses: map[uint64]string{testChainID1: "0x124"},
Symbol: "TEST2",
AmountInWei: "200000000000000000000",
Decimals: uint64(18),
},
},
}
waitOnChannelKeyAdded := s.waitOnKeyDistribution(func(sub *CommunityAndKeyActions) bool {
action, ok := sub.keyActions.ChannelKeysActions[chat.CommunityChatID()]
if !ok || action.ActionType != communities.EncryptionKeyAdd {
return false
}
_, ok = action.Members[common.PubkeyToHex(&s.owner.identity.PublicKey)]
return ok
})
waitOnCommunityPermissionCreated := waitOnCommunitiesEvent(s.owner, func(sub *communities.Subscription) bool {
return len(sub.Community.TokenPermissions()) == 2
})
response, err := s.owner.CreateCommunityTokenPermission(communityPermission)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
response, err = s.owner.CreateCommunityTokenPermission(channelPermission)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
community = response.Communities()[0]
s.Require().True(community.HasTokenPermissions())
s.Require().Len(community.TokenPermissions(), 2)
err = <-waitOnCommunityPermissionCreated
s.Require().NoError(err)
s.Require().True(community.Encrypted())
err = <-waitOnChannelKeyAdded
s.Require().NoError(err)
// 2. Owner: Send a message A
messageText1 := RandomLettersString(10)
message1 := s.sendChatMessage(s.owner, chat.ID, messageText1)
// 2.2. Retrieve own message (to make it stored in the archive later)
_, err = s.owner.RetrieveAll()
s.Require().NoError(err)
// 3. Owner: Create community archive
const partition = 2 * time.Minute
messageDate := time.UnixMilli(int64(message1.Timestamp))
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := types.BytesToTopic(transport.ToTopic(chat.ID))
topics := []types.TopicType{topic}
torrentConfig := params.TorrentConfig{
Enabled: true,
DataDir: os.TempDir() + "/archivedata",
TorrentDir: os.TempDir() + "/torrents",
Port: 0,
}
// Share archive directory between all users
s.owner.communitiesManager.SetTorrentConfig(&torrentConfig)
s.bob.communitiesManager.SetTorrentConfig(&torrentConfig)
s.owner.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{}
s.bob.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{}
archiveIDs, err := s.owner.communitiesManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted())
s.Require().NoError(err)
s.Require().Len(archiveIDs, 1)
community, err = s.owner.GetCommunityByID(community.ID())
s.Require().NoError(err)
// 4. Bob: join community (satisfying membership, but not channel permissions)
s.makeAddressSatisfyTheCriteria(testChainID1, bobAddress, communityPermission.TokenCriteria[0])
s.advertiseCommunityTo(community, s.bob)
waitForKeysDistributedToBob := s.waitOnKeyDistribution(func(sub *CommunityAndKeyActions) bool {
action := sub.keyActions.CommunityKeyAction
if action.ActionType != communities.EncryptionKeySendToMembers {
return false
}
_, ok := action.Members[s.bob.IdentityPublicKeyString()]
return ok
})
s.joinCommunity(community, s.bob, bobPassword, []string{})
err = <-waitForKeysDistributedToBob
s.Require().NoError(err)
// 5. Bob: Import community archive
// The archive is successfully decrypted, but the message inside is not.
// https://github.com/status-im/status-desktop/issues/13105 can be reproduced at this stage
// by forcing `encryption.ErrHashRatchetGroupIDNotFound` in `ExtractMessagesFromHistoryArchive` after decryption here:
// https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4403
// Ensure owner has archive
archiveIndex, err := s.owner.communitiesManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(archiveIndex.Archives, 1)
// Ensure bob has archive (because they share same local directory)
archiveIndex, err = s.bob.communitiesManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(archiveIndex.Archives, 1)
archiveHash := maps.Keys(archiveIndex.Archives)[0]
// Save message archive ID as in
// https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4325-L4336
err = s.bob.communitiesManager.SaveMessageArchiveID(community.ID(), archiveHash)
s.Require().NoError(err)
// Import archive
s.bob.importDelayer.once.Do(func() {
close(s.bob.importDelayer.wait)
})
cancel := make(chan struct{})
err = s.bob.importHistoryArchives(community.ID(), cancel)
s.Require().NoError(err)
// Ensure message1 wasn't imported, as it's encrypted, and we don't have access to the channel
receivedMessage1, err := s.bob.MessageByID(message1.ID)
s.Require().Nil(receivedMessage1)
s.Require().Error(err)
chatID := []byte(chat.ID)
hashRatchetMessagesCount, err := s.bob.persistence.GetHashRatchetMessagesCountForGroup(chatID)
s.Require().NoError(err)
s.Require().Equal(1, hashRatchetMessagesCount)
// Make bob satisfy channel criteria
waitOnChannelKeyToBeDistributedToBob := s.waitOnKeyDistribution(func(sub *CommunityAndKeyActions) bool {
action, ok := sub.keyActions.ChannelKeysActions[chat.CommunityChatID()]
if !ok || action.ActionType != communities.EncryptionKeySendToMembers {
return false
}
_, ok = action.Members[common.PubkeyToHex(&s.bob.identity.PublicKey)]
return ok
})
s.makeAddressSatisfyTheCriteria(testChainID1, bobAddress, channelPermission.TokenCriteria[0])
// force owner to reevaluate channel members
// in production it will happen automatically, by periodic check
err = s.owner.communitiesManager.ForceMembersReevaluation(community.ID())
s.Require().NoError(err)
err = <-waitOnChannelKeyToBeDistributedToBob
s.Require().NoError(err)
// Finally ensure that the message from archive was retrieved and decrypted
// NOTE: In theory a single RetrieveAll call should be enough,
// because we immediately process all hash ratchet messages
response, err = s.bob.RetrieveAll()
s.Require().NoError(err)
s.Require().Len(response.Messages(), 1)
receivedMessage1, ok := response.messages[message1.ID]
s.Require().True(ok)
s.Require().Equal(messageText1, receivedMessage1.Text)
}

View File

@ -26,8 +26,8 @@ var (
// This should not happen because the protocol forbids sending a message to
// non-paired devices, however, in theory it is possible to receive such a message.
ErrNotPairedDevice = errors.New("received a message from not paired device")
ErrHashRatchetSeqNoTooHigh = errors.New("Hash ratchet seq no is too high")
ErrHashRatchetGroupIDNotFound = errors.New("Hash ratchet group id not found")
ErrHashRatchetSeqNoTooHigh = errors.New("hash ratchet seq no is too high")
ErrHashRatchetGroupIDNotFound = errors.New("hash ratchet group id not found")
ErrNoEncryptionKey = errors.New("no encryption key found for the community")
)

View File

@ -36,6 +36,7 @@ import (
"github.com/status-im/status-go/protocol/communities"
"github.com/status-im/status-go/protocol/communities/token"
"github.com/status-im/status-go/protocol/discord"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/requests"
"github.com/status-im/status-go/protocol/transport"
@ -152,7 +153,14 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err
rawMessage.HashRatchetGroupID = org.ID()
rawMessage.Recipients = members
}
_, err = m.sender.SendPublic(context.Background(), org.IDString(), rawMessage)
messageID, err := m.sender.SendPublic(context.Background(), org.IDString(), rawMessage)
if err == nil {
m.logger.Debug("published community",
zap.String("communityID", org.IDString()),
zap.String("messageID", hexutil.Encode(messageID)),
zap.Uint64("clock", org.Clock()),
)
}
return err
}
@ -366,7 +374,6 @@ func (m *Messenger) handleCommunitiesSubscription(c chan *communities.Subscripti
m.logger.Warn("failed to publish public shard info", zap.Error(err))
return
}
m.logger.Debug("published public shard info")
// signal client with published community
if m.config.messengerSignalsHandler != nil {
@ -3911,8 +3918,20 @@ func (m *Messenger) importHistoryArchives(communityID types.HexBytes, cancel cha
return nil
}
delayImport := false
importMessageArchivesLoop:
for {
if delayImport {
select {
case <-ctx.Done():
m.communitiesManager.LogStdout("interrupted importing history archive messages")
return nil
case <-time.After(1 * time.Hour):
delayImport = false
}
}
select {
case <-ctx.Done():
m.communitiesManager.LogStdout("interrupted importing history archive messages")
@ -3933,7 +3952,7 @@ importMessageArchivesLoop:
break importMessageArchivesLoop
}
m.communitiesManager.LogStdout(fmt.Sprintf("importing message archive, %d left", len(archiveIDsToImport)))
m.communitiesManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport)))
// only process one archive at a time, so in case of cancel we don't
// wait for all archives to be processed first
@ -3941,6 +3960,12 @@ importMessageArchivesLoop:
archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID)
if err != nil {
if errors.Is(err, encryption.ErrHashRatchetGroupIDNotFound) {
// In case we're missing hash ratchet keys, best we can do is
// to wait for them to be received and try import again.
delayImport = true
continue
}
m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err))
continue
}
@ -4054,7 +4079,9 @@ func (m *Messenger) EnableCommunityHistoryArchiveProtocol() error {
if len(controlledCommunities) > 0 {
go m.InitHistoryArchiveTasks(controlledCommunities)
}
m.config.messengerSignalsHandler.HistoryArchivesProtocolEnabled()
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryArchivesProtocolEnabled()
}
return nil
}
@ -4077,7 +4104,9 @@ func (m *Messenger) DisableCommunityHistoryArchiveProtocol() error {
if err != nil {
return err
}
m.config.messengerSignalsHandler.HistoryArchivesProtocolDisabled()
if m.config.messengerSignalsHandler != nil {
m.config.messengerSignalsHandler.HistoryArchivesProtocolDisabled()
}
return nil
}

View File

@ -4,6 +4,8 @@ import (
"context"
"errors"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
@ -59,7 +61,13 @@ func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Communit
}
chatName := transport.CommunityShardInfoTopic(community.IDString())
_, err = m.sender.SendPublic(context.Background(), chatName, rawMessage)
messageID, err := m.sender.SendPublic(context.Background(), chatName, rawMessage)
if err == nil {
m.logger.Debug("published public community shard info",
zap.String("communityID", community.IDString()),
zap.String("messageID", hexutil.Encode(messageID)),
)
}
return err
}

View File

@ -67,6 +67,19 @@ func (m *MessengerSignalsHandlerMock) SendWakuBackedUpKeypair(*wakusync.WakuBack
func (m *MessengerSignalsHandlerMock) SendWakuBackedUpWatchOnlyAccount(*wakusync.WakuBackedUpDataResponse) {
}
func (m *MessengerSignalsHandlerMock) BackupPerformed(uint64) {}
func (m *MessengerSignalsHandlerMock) HistoryArchivesProtocolEnabled() {}
func (m *MessengerSignalsHandlerMock) HistoryArchivesProtocolDisabled() {}
func (m *MessengerSignalsHandlerMock) CreatingHistoryArchives(string) {}
func (m *MessengerSignalsHandlerMock) NoHistoryArchivesCreated(string, int, int) {}
func (m *MessengerSignalsHandlerMock) HistoryArchivesCreated(string, int, int) {}
func (m *MessengerSignalsHandlerMock) HistoryArchivesSeeding(string) {}
func (m *MessengerSignalsHandlerMock) HistoryArchivesUnseeded(string) {}
func (m *MessengerSignalsHandlerMock) HistoryArchiveDownloaded(string, int, int) {}
func (m *MessengerSignalsHandlerMock) DownloadingHistoryArchivesStarted(string) {}
func (m *MessengerSignalsHandlerMock) DownloadingHistoryArchivesFinished(string) {}
func (m *MessengerSignalsHandlerMock) ImportingHistoryArchiveMessages(string) {}
func (m *MessengerSignalsHandlerMock) MessengerResponse(response *MessengerResponse) {
// Non-blocking send
select {