From 1bab7ae05636a6fdc9e904de07bb31ad6f362e1a Mon Sep 17 00:00:00 2001 From: Pascal Precht <445106+0x-r4bbit@users.noreply.github.com> Date: Fri, 13 Jan 2023 16:40:39 +0100 Subject: [PATCH] refactor(communities): improve archive handling memory pressure Instead of loading the entire torrent file into memory when trying to extrract active messages, we now only read the chunks that are necessary to decode any individual archive and then process extracted messages in chunks. This doesn't introduce a max cap of allowed memory yet, since the chunk size depends entirely on the size of the archive, but this will be done soon. --- protocol/communities/manager.go | 88 +++++++++++++++------------------ protocol/messenger_handler.go | 77 +++++++++++++++++++---------- 2 files changed, 93 insertions(+), 72 deletions(-) diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index 3c000beff..70f42cd68 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -2318,71 +2318,65 @@ func (m *Manager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]st return m.persistence.GetMessageArchiveIDsToImport(communityID) } -func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) { +func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) { id := communityID.String() index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID) if err != nil { return nil, err } - totalData, err := os.ReadFile(m.archiveDataFile(id)) + + dataFile, err := os.Open(m.archiveDataFile(id)) if err != nil { return nil, err } + defer dataFile.Close() - messages := make(map[transport.Filter][]*types.Message) + m.LogStdout("extracting messages from history archive", zap.String("archive id", archiveID)) + metadata := index.Archives[archiveID] - for _, hash := range archiveIDs { - m.LogStdout("extracting messages from history archive", zap.String("archive id", hash)) - metadata := index.Archives[hash] + _, err = dataFile.Seek(int64(metadata.Offset), 0) + if err != nil { + m.LogStdout("failed to seek archive data file", zap.Error(err)) + return nil, err + } - archive := &protobuf.WakuMessageArchive{} - data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding] + data := make([]byte, metadata.Size-metadata.Padding) + _, err = dataFile.Read(data) + if err != nil { + m.LogStdout("failed failed to read archive data", zap.Error(err)) + return nil, err + } - err := proto.Unmarshal(data, archive) + archive := &protobuf.WakuMessageArchive{} + + err = proto.Unmarshal(data, archive) + if err != nil { + // The archive data might eb encrypted so we try to decrypt instead first + var protocolMessage encryption.ProtocolMessage + err := proto.Unmarshal(data, &protocolMessage) if err != nil { - // The archive data might eb encrypted so we try to decrypt instead first - var protocolMessage encryption.ProtocolMessage - err := proto.Unmarshal(data, &protocolMessage) - if err != nil { - m.LogStdout("failed to unmarshal protocol message", zap.Error(err)) - continue - } - - pk, err := crypto.DecompressPubkey(communityID) - if err != nil { - m.logger.Debug("failed to decompress community pubkey", zap.Error(err)) - continue - } - decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0)) - if err != nil { - m.LogStdout("failed to decrypt message archive", zap.Error(err)) - continue - } - err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive) - if err != nil { - m.LogStdout("failed to unmarshal message archive data", zap.Error(err)) - return nil, err - } + m.LogStdout("failed to unmarshal protocol message", zap.Error(err)) + return nil, err } - for _, message := range archive.Messages { - filter := m.transport.FilterByTopic(message.Topic) - if filter != nil { - shhMessage := &types.Message{ - Sig: message.Sig, - Timestamp: uint32(message.Timestamp), - Topic: types.BytesToTopic(message.Topic), - Payload: message.Payload, - Padding: message.Padding, - Hash: message.Hash, - ThirdPartyID: message.ThirdPartyId, - } - messages[*filter] = append(messages[*filter], shhMessage) - } + pk, err := crypto.DecompressPubkey(communityID) + if err != nil { + m.logger.Debug("failed to decompress community pubkey", zap.Error(err)) + return nil, err + } + decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0)) + if err != nil { + m.LogStdout("failed to decrypt message archive", zap.Error(err)) + return nil, err + } + err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive) + if err != nil { + m.LogStdout("failed to unmarshal message archive data", zap.Error(err)) + return nil, err } } - return messages, nil + return archive.Messages, nil } func (m *Manager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error { diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index e67eeda83..3c25e83c9 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -1028,36 +1028,15 @@ importMessageArchivesLoop: // wait for all archives to be processed first downloadedArchiveID := archiveIDsToImport[0] - messagesToHandle, err := m.communitiesManager.ExtractMessagesFromHistoryArchives(id, []string{downloadedArchiveID}) + archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(id, downloadedArchiveID) if err != nil { m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err)) continue } - importedMessages := make(map[transport.Filter][]*types.Message, 0) - otherMessages := make(map[transport.Filter][]*types.Message, 0) - - for filter, messages := range messagesToHandle { - for _, message := range messages { - if message.ThirdPartyID != "" { - importedMessages[filter] = append(importedMessages[filter], message) - } else { - otherMessages[filter] = append(otherMessages[filter], message) - } - } - } - - m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(id)) - - err = m.handleImportedMessages(importedMessages) + response, err := m.handleArchiveMessages(archiveMessages, id) if err != nil { - m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err)) - continue - } - - response, err := m.handleRetrievedMessages(otherMessages, false) - if err != nil { - m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err)) + m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err)) continue } @@ -1079,10 +1058,58 @@ importMessageArchivesLoop: if err != nil { m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err)) } - m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id)) } +func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessage, id types.HexBytes) (*MessengerResponse, error) { + + messagesToHandle := make(map[transport.Filter][]*types.Message) + + for _, message := range archiveMessages { + filter := m.transport.FilterByTopic(message.Topic) + if filter != nil { + shhMessage := &types.Message{ + Sig: message.Sig, + Timestamp: uint32(message.Timestamp), + Topic: types.BytesToTopic(message.Topic), + Payload: message.Payload, + Padding: message.Padding, + Hash: message.Hash, + ThirdPartyID: message.ThirdPartyId, + } + messagesToHandle[*filter] = append(messagesToHandle[*filter], shhMessage) + } + } + + importedMessages := make(map[transport.Filter][]*types.Message, 0) + otherMessages := make(map[transport.Filter][]*types.Message, 0) + + for filter, messages := range messagesToHandle { + for _, message := range messages { + if message.ThirdPartyID != "" { + importedMessages[filter] = append(importedMessages[filter], message) + } else { + otherMessages[filter] = append(otherMessages[filter], message) + } + } + } + + m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(id)) + err := m.handleImportedMessages(importedMessages) + if err != nil { + m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err)) + return nil, err + } + + response, err := m.handleRetrievedMessages(otherMessages, false) + if err != nil { + m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err)) + return nil, err + } + + return response, nil +} + func (m *Messenger) HandleCommunityCancelRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, cancelRequestToJoinProto protobuf.CommunityCancelRequestToJoin) error { if cancelRequestToJoinProto.CommunityId == nil { return errors.New("invalid community id")