refactor(communities): improve archive handling memory pressure

Instead of loading the entire torrent file into memory when trying
to extract active messages, we now only read the chunks that are
necessary to decode any individual archive and then process
extracted messages in chunks.

This doesn't introduce a max cap of allowed memory yet, since the
chunk size depends entirely on the size of the archive, but this
will be done soon.
This commit is contained in:
Pascal Precht 2023-01-13 16:40:39 +01:00 committed by r4bbit
parent 8fd291f27c
commit 1bab7ae056
2 changed files with 93 additions and 72 deletions

View File

@ -2318,71 +2318,65 @@ func (m *Manager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]st
return m.persistence.GetMessageArchiveIDsToImport(communityID)
}
func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, archiveIDs []string) (map[transport.Filter][]*types.Message, error) {
func (m *Manager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) {
id := communityID.String()
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
if err != nil {
return nil, err
}
totalData, err := os.ReadFile(m.archiveDataFile(id))
dataFile, err := os.Open(m.archiveDataFile(id))
if err != nil {
return nil, err
}
defer dataFile.Close()
messages := make(map[transport.Filter][]*types.Message)
m.LogStdout("extracting messages from history archive", zap.String("archive id", archiveID))
metadata := index.Archives[archiveID]
for _, hash := range archiveIDs {
m.LogStdout("extracting messages from history archive", zap.String("archive id", hash))
metadata := index.Archives[hash]
_, err = dataFile.Seek(int64(metadata.Offset), 0)
if err != nil {
m.LogStdout("failed to seek archive data file", zap.Error(err))
return nil, err
}
archive := &protobuf.WakuMessageArchive{}
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
data := make([]byte, metadata.Size-metadata.Padding)
_, err = dataFile.Read(data)
if err != nil {
m.LogStdout("failed failed to read archive data", zap.Error(err))
return nil, err
}
err := proto.Unmarshal(data, archive)
archive := &protobuf.WakuMessageArchive{}
err = proto.Unmarshal(data, archive)
if err != nil {
// The archive data might eb encrypted so we try to decrypt instead first
var protocolMessage encryption.ProtocolMessage
err := proto.Unmarshal(data, &protocolMessage)
if err != nil {
// The archive data might eb encrypted so we try to decrypt instead first
var protocolMessage encryption.ProtocolMessage
err := proto.Unmarshal(data, &protocolMessage)
if err != nil {
m.LogStdout("failed to unmarshal protocol message", zap.Error(err))
continue
}
pk, err := crypto.DecompressPubkey(communityID)
if err != nil {
m.logger.Debug("failed to decompress community pubkey", zap.Error(err))
continue
}
decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0))
if err != nil {
m.LogStdout("failed to decrypt message archive", zap.Error(err))
continue
}
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
if err != nil {
m.LogStdout("failed to unmarshal message archive data", zap.Error(err))
return nil, err
}
m.LogStdout("failed to unmarshal protocol message", zap.Error(err))
return nil, err
}
for _, message := range archive.Messages {
filter := m.transport.FilterByTopic(message.Topic)
if filter != nil {
shhMessage := &types.Message{
Sig: message.Sig,
Timestamp: uint32(message.Timestamp),
Topic: types.BytesToTopic(message.Topic),
Payload: message.Payload,
Padding: message.Padding,
Hash: message.Hash,
ThirdPartyID: message.ThirdPartyId,
}
messages[*filter] = append(messages[*filter], shhMessage)
}
pk, err := crypto.DecompressPubkey(communityID)
if err != nil {
m.logger.Debug("failed to decompress community pubkey", zap.Error(err))
return nil, err
}
decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0))
if err != nil {
m.LogStdout("failed to decrypt message archive", zap.Error(err))
return nil, err
}
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
if err != nil {
m.LogStdout("failed to unmarshal message archive data", zap.Error(err))
return nil, err
}
}
return messages, nil
return archive.Messages, nil
}
func (m *Manager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error {

View File

@ -1028,36 +1028,15 @@ importMessageArchivesLoop:
// wait for all archives to be processed first
downloadedArchiveID := archiveIDsToImport[0]
messagesToHandle, err := m.communitiesManager.ExtractMessagesFromHistoryArchives(id, []string{downloadedArchiveID})
archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(id, downloadedArchiveID)
if err != nil {
m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err))
continue
}
importedMessages := make(map[transport.Filter][]*types.Message, 0)
otherMessages := make(map[transport.Filter][]*types.Message, 0)
for filter, messages := range messagesToHandle {
for _, message := range messages {
if message.ThirdPartyID != "" {
importedMessages[filter] = append(importedMessages[filter], message)
} else {
otherMessages[filter] = append(otherMessages[filter], message)
}
}
}
m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(id))
err = m.handleImportedMessages(importedMessages)
response, err := m.handleArchiveMessages(archiveMessages, id)
if err != nil {
m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err))
continue
}
response, err := m.handleRetrievedMessages(otherMessages, false)
if err != nil {
m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err))
m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
continue
}
@ -1079,10 +1058,58 @@ importMessageArchivesLoop:
if err != nil {
m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err))
}
m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id))
}
func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessage, id types.HexBytes) (*MessengerResponse, error) {
messagesToHandle := make(map[transport.Filter][]*types.Message)
for _, message := range archiveMessages {
filter := m.transport.FilterByTopic(message.Topic)
if filter != nil {
shhMessage := &types.Message{
Sig: message.Sig,
Timestamp: uint32(message.Timestamp),
Topic: types.BytesToTopic(message.Topic),
Payload: message.Payload,
Padding: message.Padding,
Hash: message.Hash,
ThirdPartyID: message.ThirdPartyId,
}
messagesToHandle[*filter] = append(messagesToHandle[*filter], shhMessage)
}
}
importedMessages := make(map[transport.Filter][]*types.Message, 0)
otherMessages := make(map[transport.Filter][]*types.Message, 0)
for filter, messages := range messagesToHandle {
for _, message := range messages {
if message.ThirdPartyID != "" {
importedMessages[filter] = append(importedMessages[filter], message)
} else {
otherMessages[filter] = append(otherMessages[filter], message)
}
}
}
m.config.messengerSignalsHandler.ImportingHistoryArchiveMessages(types.EncodeHex(id))
err := m.handleImportedMessages(importedMessages)
if err != nil {
m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err))
return nil, err
}
response, err := m.handleRetrievedMessages(otherMessages, false)
if err != nil {
m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err))
return nil, err
}
return response, nil
}
func (m *Messenger) HandleCommunityCancelRequestToJoin(state *ReceivedMessageState, signer *ecdsa.PublicKey, cancelRequestToJoinProto protobuf.CommunityCancelRequestToJoin) error {
if cancelRequestToJoinProto.CommunityId == nil {
return errors.New("invalid community id")