diff --git a/protocol/communities/manager_archive.go b/protocol/communities/manager_archive.go new file mode 100644 index 000000000..04e0d5436 --- /dev/null +++ b/protocol/communities/manager_archive.go @@ -0,0 +1,513 @@ +package communities + +import ( + "crypto/ecdsa" + "go.uber.org/zap" + "os" + "path" + "time" + + "github.com/status-im/status-go/eth-node/crypto" + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/params" + "github.com/status-im/status-go/protocol/common" + "github.com/status-im/status-go/protocol/encryption" + "github.com/status-im/status-go/protocol/protobuf" + "github.com/status-im/status-go/signal" + + "github.com/anacrolix/torrent/bencode" + "github.com/anacrolix/torrent/metainfo" + "github.com/golang/protobuf/proto" +) + +type ArchiveManager struct { + torrentConfig *params.TorrentConfig + + logger *zap.Logger + stdoutLogger *zap.Logger + + persistence *Persistence + identity *ecdsa.PrivateKey + encryptor *encryption.Protocol + + publisher Publisher +} + +func NewArchiveManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger *zap.Logger, persistence *Persistence, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *ArchiveManager { + return &ArchiveManager{ + torrentConfig: torrentConfig, + logger: logger, + stdoutLogger: stdoutLogger, + persistence: persistence, + identity: identity, + encryptor: encryptor, + publisher: publisher, + } +} + +// LogStdout appears to be some kind of debug tool specifically for torrent functionality +func (m *ArchiveManager) LogStdout(msg string, fields ...zap.Field) { + m.stdoutLogger.Info(msg, fields...) + m.logger.Debug(msg, fields...) +} + +func (m *ArchiveManager) createHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { + + loadFromDB := len(msgs) == 0 + + from := startDate + to := from.Add(partition) + if to.After(endDate) { + to = endDate + } + + archiveDir := m.torrentConfig.DataDir + "/" + communityID.String() + torrentDir := m.torrentConfig.TorrentDir + indexPath := archiveDir + "/index" + dataPath := archiveDir + "/data" + + wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} + wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata) + archiveIDs := make([]string, 0) + + if _, err := os.Stat(archiveDir); os.IsNotExist(err) { + err := os.MkdirAll(archiveDir, 0700) + if err != nil { + return archiveIDs, err + } + } + if _, err := os.Stat(torrentDir); os.IsNotExist(err) { + err := os.MkdirAll(torrentDir, 0700) + if err != nil { + return archiveIDs, err + } + } + + _, err := os.Stat(indexPath) + if err == nil { + wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(m.identity, communityID) + if err != nil { + return archiveIDs, err + } + } + + var offset uint64 = 0 + + for hash, metadata := range wakuMessageArchiveIndexProto.Archives { + offset = offset + metadata.Size + wakuMessageArchiveIndex[hash] = metadata + } + + var encodedArchives []*EncodedArchiveData + topicsAsByteArrays := topicsAsByteArrays(topics) + + m.publisher.publish(&Subscription{CreatingHistoryArchivesSignal: &signal.CreatingHistoryArchivesSignal{ + CommunityID: communityID.String(), + }}) + + m.LogStdout("creating archives", + zap.Any("startDate", startDate), + zap.Any("endDate", endDate), + zap.Duration("partition", partition), + ) + for { + if from.Equal(endDate) || from.After(endDate) { + break + } + m.LogStdout("creating message archive", + zap.Any("from", from), + zap.Any("to", to), + ) + + var messages []types.Message + if loadFromDB { + messages, err = m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix())) + if err != nil { + return archiveIDs, err + } + } else { + for _, msg := range msgs { + if int64(msg.Timestamp) >= from.Unix() && int64(msg.Timestamp) < to.Unix() { + messages = append(messages, *msg) + } + } + } + + if len(messages) == 0 { + // No need to create an archive with zero messages + m.LogStdout("no messages in this partition") + from = to + to = to.Add(partition) + if to.After(endDate) { + to = endDate + } + continue + } + + m.LogStdout("creating archive with messages", zap.Int("messagesCount", len(messages))) + + // Not only do we partition messages, we also chunk them + // roughly by size, such that each chunk will not exceed a given + // size and archive data doesn't get too big + messageChunks := make([][]types.Message, 0) + currentChunkSize := 0 + currentChunk := make([]types.Message, 0) + + for _, msg := range messages { + msgSize := len(msg.Payload) + len(msg.Sig) + if msgSize > maxArchiveSizeInBytes { + // we drop messages this big + continue + } + + if currentChunkSize+msgSize > maxArchiveSizeInBytes { + messageChunks = append(messageChunks, currentChunk) + currentChunk = make([]types.Message, 0) + currentChunkSize = 0 + } + currentChunk = append(currentChunk, msg) + currentChunkSize = currentChunkSize + msgSize + } + messageChunks = append(messageChunks, currentChunk) + + for _, messages := range messageChunks { + wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays) + encodedArchive, err := proto.Marshal(wakuMessageArchive) + if err != nil { + return archiveIDs, err + } + + if encrypt { + messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, encodedArchive) + if err != nil { + return archiveIDs, err + } + + encodedArchive, err = proto.Marshal(messageSpec.Message) + if err != nil { + return archiveIDs, err + } + } + + rawSize := len(encodedArchive) + padding := 0 + size := 0 + + if rawSize > pieceLength { + size = rawSize + pieceLength - (rawSize % pieceLength) + padding = size - rawSize + } else { + padding = pieceLength - rawSize + size = rawSize + padding + } + + wakuMessageArchiveIndexMetadata := &protobuf.WakuMessageArchiveIndexMetadata{ + Metadata: wakuMessageArchive.Metadata, + Offset: offset, + Size: uint64(size), + Padding: uint64(padding), + } + + wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata) + if err != nil { + return archiveIDs, err + } + + archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String() + archiveIDs = append(archiveIDs, archiveID) + wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata + encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding}) + offset = offset + uint64(rawSize) + uint64(padding) + } + + from = to + to = to.Add(partition) + if to.After(endDate) { + to = endDate + } + } + + if len(encodedArchives) > 0 { + + dataBytes := make([]byte, 0) + + for _, encodedArchiveData := range encodedArchives { + dataBytes = append(dataBytes, encodedArchiveData.bytes...) + dataBytes = append(dataBytes, make([]byte, encodedArchiveData.padding)...) + } + + wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex + indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto) + if err != nil { + return archiveIDs, err + } + + if encrypt { + messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, indexBytes) + if err != nil { + return archiveIDs, err + } + indexBytes, err = proto.Marshal(messageSpec.Message) + if err != nil { + return archiveIDs, err + } + } + + err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec + if err != nil { + return archiveIDs, err + } + + file, err := os.OpenFile(dataPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return archiveIDs, err + } + defer file.Close() + + _, err = file.Write(dataBytes) + if err != nil { + return archiveIDs, err + } + + metaInfo := metainfo.MetaInfo{ + AnnounceList: defaultAnnounceList, + } + metaInfo.SetDefaults() + metaInfo.CreatedBy = common.PubkeyToHex(&m.identity.PublicKey) + + info := metainfo.Info{ + PieceLength: int64(pieceLength), + } + + err = info.BuildFromFilePath(archiveDir) + if err != nil { + return archiveIDs, err + } + + metaInfo.InfoBytes, err = bencode.Marshal(info) + if err != nil { + return archiveIDs, err + } + + metaInfoBytes, err := bencode.Marshal(metaInfo) + if err != nil { + return archiveIDs, err + } + + err = os.WriteFile(torrentFile(m.torrentConfig.TorrentDir, communityID.String()), metaInfoBytes, 0644) // nolint: gosec + if err != nil { + return archiveIDs, err + } + + m.LogStdout("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix())) + + m.publisher.publish(&Subscription{ + HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{ + CommunityID: communityID.String(), + From: int(startDate.Unix()), + To: int(endDate.Unix()), + }, + }) + } else { + m.LogStdout("no archives created") + m.publisher.publish(&Subscription{ + NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{ + CommunityID: communityID.String(), + From: int(startDate.Unix()), + To: int(endDate.Unix()), + }, + }) + } + + lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID) + if err != nil { + return archiveIDs, err + } + + if lastMessageArchiveEndDate > 0 { + err = m.persistence.UpdateLastMessageArchiveEndDate(communityID, uint64(from.Unix())) + } else { + err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix())) + } + if err != nil { + return archiveIDs, err + } + return archiveIDs, nil +} + +func (m *ArchiveManager) archiveIndexFile(communityID string) string { + return path.Join(m.torrentConfig.DataDir, communityID, "index") +} + +func (m *ArchiveManager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive { + var wakuMessages []*protobuf.WakuMessage + + for _, msg := range messages { + topic := types.TopicTypeToByteArray(msg.Topic) + wakuMessage := &protobuf.WakuMessage{ + Sig: msg.Sig, + Timestamp: uint64(msg.Timestamp), + Topic: topic, + Payload: msg.Payload, + Padding: msg.Padding, + Hash: msg.Hash, + ThirdPartyId: msg.ThirdPartyID, + } + wakuMessages = append(wakuMessages, wakuMessage) + } + + metadata := protobuf.WakuMessageArchiveMetadata{ + From: uint64(from.Unix()), + To: uint64(to.Unix()), + ContentTopic: topics, + } + + wakuMessageArchive := &protobuf.WakuMessageArchive{ + Metadata: &metadata, + Messages: wakuMessages, + } + return wakuMessageArchive +} + +func (m *ArchiveManager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { + return m.createHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt) +} + +func (m *ArchiveManager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { + return m.createHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt) +} + +func (m *ArchiveManager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) { + return m.persistence.GetMessageArchiveIDsToImport(communityID) +} + +func (m *ArchiveManager) SaveMessageArchiveID(communityID types.HexBytes, hash string) error { + return m.persistence.SaveMessageArchiveID(communityID, hash) +} + +func (m *ArchiveManager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error { + return m.persistence.SetMessageArchiveIDImported(communityID, hash, imported) +} + +func (m *ArchiveManager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) { + id := communityID.String() + torrentFile := torrentFile(m.torrentConfig.TorrentDir, id) + + metaInfo, err := metainfo.LoadFromFile(torrentFile) + if err != nil { + return "", err + } + + info, err := metaInfo.UnmarshalInfo() + if err != nil { + return "", err + } + + return metaInfo.Magnet(nil, &info).String(), nil +} + +func (m *ArchiveManager) archiveDataFile(communityID string) string { + return path.Join(m.torrentConfig.DataDir, communityID, "data") +} + +func (m *ArchiveManager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) { + id := communityID.String() + + index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID) + if err != nil { + return nil, err + } + + dataFile, err := os.Open(m.archiveDataFile(id)) + if err != nil { + return nil, err + } + defer dataFile.Close() + + m.LogStdout("extracting messages from history archive", + zap.String("communityID", communityID.String()), + zap.String("archiveID", archiveID)) + metadata := index.Archives[archiveID] + + _, err = dataFile.Seek(int64(metadata.Offset), 0) + if err != nil { + m.LogStdout("failed to seek archive data file", zap.Error(err)) + return nil, err + } + + data := make([]byte, metadata.Size-metadata.Padding) + m.LogStdout("loading history archive data into memory", zap.Float64("data_size_MB", float64(metadata.Size-metadata.Padding)/1024.0/1024.0)) + _, err = dataFile.Read(data) + if err != nil { + m.LogStdout("failed failed to read archive data", zap.Error(err)) + return nil, err + } + + archive := &protobuf.WakuMessageArchive{} + + err = proto.Unmarshal(data, archive) + if err != nil { + // The archive data might eb encrypted so we try to decrypt instead first + var protocolMessage encryption.ProtocolMessage + err := proto.Unmarshal(data, &protocolMessage) + if err != nil { + m.LogStdout("failed to unmarshal protocol message", zap.Error(err)) + return nil, err + } + + pk, err := crypto.DecompressPubkey(communityID) + if err != nil { + m.logger.Debug("failed to decompress community pubkey", zap.Error(err)) + return nil, err + } + decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0)) + if err != nil { + m.LogStdout("failed to decrypt message archive", zap.Error(err)) + return nil, err + } + err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive) + if err != nil { + m.LogStdout("failed to unmarshal message archive", zap.Error(err)) + return nil, err + } + } + return archive.Messages, nil +} + +func (m *ArchiveManager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { + wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} + + indexPath := m.archiveIndexFile(communityID.String()) + indexData, err := os.ReadFile(indexPath) + if err != nil { + return nil, err + } + + err = proto.Unmarshal(indexData, wakuMessageArchiveIndexProto) + if err != nil { + return nil, err + } + + if len(wakuMessageArchiveIndexProto.Archives) == 0 && len(indexData) > 0 { + // This means we're dealing with an encrypted index file, so we have to decrypt it first + var protocolMessage encryption.ProtocolMessage + err := proto.Unmarshal(indexData, &protocolMessage) + if err != nil { + return nil, err + } + pk, err := crypto.DecompressPubkey(communityID) + if err != nil { + return nil, err + } + decryptedBytes, err := m.encryptor.HandleMessage(myKey, pk, &protocolMessage, make([]byte, 0)) + if err != nil { + return nil, err + } + err = proto.Unmarshal(decryptedBytes.DecryptedMessage, wakuMessageArchiveIndexProto) + if err != nil { + return nil, err + } + } + + return wakuMessageArchiveIndexProto, nil +} diff --git a/protocol/communities/torrent_manager.go b/protocol/communities/torrent_manager.go index 59c5e328f..91693c452 100644 --- a/protocol/communities/torrent_manager.go +++ b/protocol/communities/torrent_manager.go @@ -12,17 +12,12 @@ import ( "time" "github.com/anacrolix/torrent" - "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" - "github.com/golang/protobuf/proto" "go.uber.org/zap" - "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/params" - "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/encryption" - "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/transport" "github.com/status-im/status-go/signal" ) @@ -73,6 +68,7 @@ type TorrentManager struct { identity *ecdsa.PrivateKey encryptor *encryption.Protocol + *ArchiveManager publisher Publisher } @@ -95,7 +91,8 @@ func NewTorrentManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, identity: identity, encryptor: encryptor, - publisher: publisher, + publisher: publisher, + ArchiveManager: NewArchiveManager(torrentConfig, logger, stdoutLogger, persistence, identity, encryptor, publisher), }, nil } @@ -318,7 +315,7 @@ func (m *TorrentManager) GetHistoryArchivePartitionStartTimestamp(communityID ty func (m *TorrentManager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error { m.UnseedHistoryArchiveTorrent(communityID) - _, err := m.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt) + _, err := m.ArchiveManager.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt) if err != nil { return err } @@ -400,303 +397,11 @@ func (m *TorrentManager) StopHistoryArchiveTasksInterval(communityID types.HexBy } } -func (m *TorrentManager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { - return m.createHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt) -} - -func (m *TorrentManager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { - return m.createHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt) -} - -func (m *TorrentManager) createHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { - - loadFromDB := len(msgs) == 0 - - from := startDate - to := from.Add(partition) - if to.After(endDate) { - to = endDate - } - - archiveDir := m.torrentConfig.DataDir + "/" + communityID.String() - torrentDir := m.torrentConfig.TorrentDir - indexPath := archiveDir + "/index" - dataPath := archiveDir + "/data" - - wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} - wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata) - archiveIDs := make([]string, 0) - - if _, err := os.Stat(archiveDir); os.IsNotExist(err) { - err := os.MkdirAll(archiveDir, 0700) - if err != nil { - return archiveIDs, err - } - } - if _, err := os.Stat(torrentDir); os.IsNotExist(err) { - err := os.MkdirAll(torrentDir, 0700) - if err != nil { - return archiveIDs, err - } - } - - _, err := os.Stat(indexPath) - if err == nil { - wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(m.identity, communityID) - if err != nil { - return archiveIDs, err - } - } - - var offset uint64 = 0 - - for hash, metadata := range wakuMessageArchiveIndexProto.Archives { - offset = offset + metadata.Size - wakuMessageArchiveIndex[hash] = metadata - } - - var encodedArchives []*EncodedArchiveData - topicsAsByteArrays := topicsAsByteArrays(topics) - - m.publisher.publish(&Subscription{CreatingHistoryArchivesSignal: &signal.CreatingHistoryArchivesSignal{ - CommunityID: communityID.String(), - }}) - - m.LogStdout("creating archives", - zap.Any("startDate", startDate), - zap.Any("endDate", endDate), - zap.Duration("partition", partition), - ) - for { - if from.Equal(endDate) || from.After(endDate) { - break - } - m.LogStdout("creating message archive", - zap.Any("from", from), - zap.Any("to", to), - ) - - var messages []types.Message - if loadFromDB { - messages, err = m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix())) - if err != nil { - return archiveIDs, err - } - } else { - for _, msg := range msgs { - if int64(msg.Timestamp) >= from.Unix() && int64(msg.Timestamp) < to.Unix() { - messages = append(messages, *msg) - } - } - } - - if len(messages) == 0 { - // No need to create an archive with zero messages - m.LogStdout("no messages in this partition") - from = to - to = to.Add(partition) - if to.After(endDate) { - to = endDate - } - continue - } - - m.LogStdout("creating archive with messages", zap.Int("messagesCount", len(messages))) - - // Not only do we partition messages, we also chunk them - // roughly by size, such that each chunk will not exceed a given - // size and archive data doesn't get too big - messageChunks := make([][]types.Message, 0) - currentChunkSize := 0 - currentChunk := make([]types.Message, 0) - - for _, msg := range messages { - msgSize := len(msg.Payload) + len(msg.Sig) - if msgSize > maxArchiveSizeInBytes { - // we drop messages this big - continue - } - - if currentChunkSize+msgSize > maxArchiveSizeInBytes { - messageChunks = append(messageChunks, currentChunk) - currentChunk = make([]types.Message, 0) - currentChunkSize = 0 - } - currentChunk = append(currentChunk, msg) - currentChunkSize = currentChunkSize + msgSize - } - messageChunks = append(messageChunks, currentChunk) - - for _, messages := range messageChunks { - wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays) - encodedArchive, err := proto.Marshal(wakuMessageArchive) - if err != nil { - return archiveIDs, err - } - - if encrypt { - messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, encodedArchive) - if err != nil { - return archiveIDs, err - } - - encodedArchive, err = proto.Marshal(messageSpec.Message) - if err != nil { - return archiveIDs, err - } - } - - rawSize := len(encodedArchive) - padding := 0 - size := 0 - - if rawSize > pieceLength { - size = rawSize + pieceLength - (rawSize % pieceLength) - padding = size - rawSize - } else { - padding = pieceLength - rawSize - size = rawSize + padding - } - - wakuMessageArchiveIndexMetadata := &protobuf.WakuMessageArchiveIndexMetadata{ - Metadata: wakuMessageArchive.Metadata, - Offset: offset, - Size: uint64(size), - Padding: uint64(padding), - } - - wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata) - if err != nil { - return archiveIDs, err - } - - archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String() - archiveIDs = append(archiveIDs, archiveID) - wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata - encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding}) - offset = offset + uint64(rawSize) + uint64(padding) - } - - from = to - to = to.Add(partition) - if to.After(endDate) { - to = endDate - } - } - - if len(encodedArchives) > 0 { - - dataBytes := make([]byte, 0) - - for _, encodedArchiveData := range encodedArchives { - dataBytes = append(dataBytes, encodedArchiveData.bytes...) - dataBytes = append(dataBytes, make([]byte, encodedArchiveData.padding)...) - } - - wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex - indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto) - if err != nil { - return archiveIDs, err - } - - if encrypt { - messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, indexBytes) - if err != nil { - return archiveIDs, err - } - indexBytes, err = proto.Marshal(messageSpec.Message) - if err != nil { - return archiveIDs, err - } - } - - err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec - if err != nil { - return archiveIDs, err - } - - file, err := os.OpenFile(dataPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - return archiveIDs, err - } - defer file.Close() - - _, err = file.Write(dataBytes) - if err != nil { - return archiveIDs, err - } - - metaInfo := metainfo.MetaInfo{ - AnnounceList: defaultAnnounceList, - } - metaInfo.SetDefaults() - metaInfo.CreatedBy = common.PubkeyToHex(&m.identity.PublicKey) - - info := metainfo.Info{ - PieceLength: int64(pieceLength), - } - - err = info.BuildFromFilePath(archiveDir) - if err != nil { - return archiveIDs, err - } - - metaInfo.InfoBytes, err = bencode.Marshal(info) - if err != nil { - return archiveIDs, err - } - - metaInfoBytes, err := bencode.Marshal(metaInfo) - if err != nil { - return archiveIDs, err - } - - err = os.WriteFile(m.torrentFile(communityID.String()), metaInfoBytes, 0644) // nolint: gosec - if err != nil { - return archiveIDs, err - } - - m.LogStdout("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix())) - - m.publisher.publish(&Subscription{ - HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{ - CommunityID: communityID.String(), - From: int(startDate.Unix()), - To: int(endDate.Unix()), - }, - }) - } else { - m.LogStdout("no archives created") - m.publisher.publish(&Subscription{ - NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{ - CommunityID: communityID.String(), - From: int(startDate.Unix()), - To: int(endDate.Unix()), - }, - }) - } - - lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID) - if err != nil { - return archiveIDs, err - } - - if lastMessageArchiveEndDate > 0 { - err = m.persistence.UpdateLastMessageArchiveEndDate(communityID, uint64(from.Unix())) - } else { - err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix())) - } - if err != nil { - return archiveIDs, err - } - return archiveIDs, nil -} - func (m *TorrentManager) SeedHistoryArchiveTorrent(communityID types.HexBytes) error { m.UnseedHistoryArchiveTorrent(communityID) id := communityID.String() - torrentFile := m.torrentFile(id) + torrentFile := torrentFile(m.torrentConfig.TorrentDir, id) metaInfo, err := metainfo.LoadFromFile(torrentFile) if err != nil { @@ -828,7 +533,7 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H case <-ticker.C: if indexFile.BytesCompleted() == indexFile.Length() { - index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID) + index, err := m.ArchiveManager.LoadHistoryArchiveIndexFromFile(m.identity, communityID) if err != nil { return nil, err } @@ -941,183 +646,11 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H } } -func (m *TorrentManager) SaveMessageArchiveID(communityID types.HexBytes, hash string) error { - return m.persistence.SaveMessageArchiveID(communityID, hash) -} - -func (m *TorrentManager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) { - return m.persistence.GetMessageArchiveIDsToImport(communityID) -} -func (m *TorrentManager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error { - return m.persistence.SetMessageArchiveIDImported(communityID, hash, imported) -} - -func (m *TorrentManager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) { - id := communityID.String() - - index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID) - if err != nil { - return nil, err - } - - dataFile, err := os.Open(m.archiveDataFile(id)) - if err != nil { - return nil, err - } - defer dataFile.Close() - - m.LogStdout("extracting messages from history archive", - zap.String("communityID", communityID.String()), - zap.String("archiveID", archiveID)) - metadata := index.Archives[archiveID] - - _, err = dataFile.Seek(int64(metadata.Offset), 0) - if err != nil { - m.LogStdout("failed to seek archive data file", zap.Error(err)) - return nil, err - } - - data := make([]byte, metadata.Size-metadata.Padding) - m.LogStdout("loading history archive data into memory", zap.Float64("data_size_MB", float64(metadata.Size-metadata.Padding)/1024.0/1024.0)) - _, err = dataFile.Read(data) - if err != nil { - m.LogStdout("failed failed to read archive data", zap.Error(err)) - return nil, err - } - - archive := &protobuf.WakuMessageArchive{} - - err = proto.Unmarshal(data, archive) - if err != nil { - // The archive data might eb encrypted so we try to decrypt instead first - var protocolMessage encryption.ProtocolMessage - err := proto.Unmarshal(data, &protocolMessage) - if err != nil { - m.LogStdout("failed to unmarshal protocol message", zap.Error(err)) - return nil, err - } - - pk, err := crypto.DecompressPubkey(communityID) - if err != nil { - m.logger.Debug("failed to decompress community pubkey", zap.Error(err)) - return nil, err - } - decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0)) - if err != nil { - m.LogStdout("failed to decrypt message archive", zap.Error(err)) - return nil, err - } - err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive) - if err != nil { - m.LogStdout("failed to unmarshal message archive", zap.Error(err)) - return nil, err - } - } - return archive.Messages, nil -} - -func (m *TorrentManager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) { - id := communityID.String() - torrentFile := m.torrentFile(id) - - metaInfo, err := metainfo.LoadFromFile(torrentFile) - if err != nil { - return "", err - } - - info, err := metaInfo.UnmarshalInfo() - if err != nil { - return "", err - } - - return metaInfo.Magnet(nil, &info).String(), nil -} - -func (m *TorrentManager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive { - var wakuMessages []*protobuf.WakuMessage - - for _, msg := range messages { - topic := types.TopicTypeToByteArray(msg.Topic) - wakuMessage := &protobuf.WakuMessage{ - Sig: msg.Sig, - Timestamp: uint64(msg.Timestamp), - Topic: topic, - Payload: msg.Payload, - Padding: msg.Padding, - Hash: msg.Hash, - ThirdPartyId: msg.ThirdPartyID, - } - wakuMessages = append(wakuMessages, wakuMessage) - } - - metadata := protobuf.WakuMessageArchiveMetadata{ - From: uint64(from.Unix()), - To: uint64(to.Unix()), - ContentTopic: topics, - } - - wakuMessageArchive := &protobuf.WakuMessageArchive{ - Metadata: &metadata, - Messages: wakuMessages, - } - return wakuMessageArchive -} - -func (m *TorrentManager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) { - wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{} - - indexPath := m.archiveIndexFile(communityID.String()) - indexData, err := os.ReadFile(indexPath) - if err != nil { - return nil, err - } - - err = proto.Unmarshal(indexData, wakuMessageArchiveIndexProto) - if err != nil { - return nil, err - } - - if len(wakuMessageArchiveIndexProto.Archives) == 0 && len(indexData) > 0 { - // This means we're dealing with an encrypted index file, so we have to decrypt it first - var protocolMessage encryption.ProtocolMessage - err := proto.Unmarshal(indexData, &protocolMessage) - if err != nil { - return nil, err - } - pk, err := crypto.DecompressPubkey(communityID) - if err != nil { - return nil, err - } - decryptedBytes, err := m.encryptor.HandleMessage(myKey, pk, &protocolMessage, make([]byte, 0)) - if err != nil { - return nil, err - } - err = proto.Unmarshal(decryptedBytes.DecryptedMessage, wakuMessageArchiveIndexProto) - if err != nil { - return nil, err - } - } - - return wakuMessageArchiveIndexProto, nil -} - func (m *TorrentManager) TorrentFileExists(communityID string) bool { - _, err := os.Stat(m.torrentFile(communityID)) + _, err := os.Stat(torrentFile(m.torrentConfig.TorrentDir, communityID)) return err == nil } -func (m *TorrentManager) torrentFile(communityID string) string { - return path.Join(m.torrentConfig.TorrentDir, communityID+".torrent") -} - -func (m *TorrentManager) archiveIndexFile(communityID string) string { - return path.Join(m.torrentConfig.DataDir, communityID, "index") -} - -func (m *TorrentManager) archiveDataFile(communityID string) string { - return path.Join(m.torrentConfig.DataDir, communityID, "data") -} - func topicsAsByteArrays(topics []types.TopicType) [][]byte { var topicsAsByteArrays [][]byte for _, t := range topics { @@ -1135,3 +668,7 @@ func findIndexFile(files []*torrent.File) (index int, ok bool) { } return 0, false } + +func torrentFile(torrentDir, communityID string) string { + return path.Join(torrentDir, communityID+".torrent") +} diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index fee0d1383..68ff7acd7 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -3998,7 +3998,7 @@ importMessageArchivesLoop: if delayImport { select { case <-ctx.Done(): - m.torrentManager.LogStdout("interrupted importing history archive messages") + m.logger.Debug("interrupted importing history archive messages") return nil case <-time.After(1 * time.Hour): delayImport = false @@ -4007,7 +4007,7 @@ importMessageArchivesLoop: select { case <-ctx.Done(): - m.torrentManager.LogStdout("interrupted importing history archive messages") + m.logger.Debug("interrupted importing history archive messages") return nil case <-importTicker.C: err := m.checkIfIMemberOfCommunity(communityID) @@ -4016,16 +4016,16 @@ importMessageArchivesLoop: } archiveIDsToImport, err := m.torrentManager.GetMessageArchiveIDsToImport(communityID) if err != nil { - m.torrentManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err)) + m.logger.Error("couldn't get message archive IDs to import", zap.Error(err)) return err } if len(archiveIDsToImport) == 0 { - m.torrentManager.LogStdout("no message archives to import") + m.logger.Debug("no message archives to import") break importMessageArchivesLoop } - m.torrentManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport))) + m.logger.Info("importing message archive", zap.Int("left", len(archiveIDsToImport))) // only process one archive at a time, so in case of cancel we don't // wait for all archives to be processed first @@ -4039,7 +4039,7 @@ importMessageArchivesLoop: delayImport = true continue } - m.torrentManager.LogStdout("failed to extract history archive messages", zap.Error(err)) + m.logger.Error("failed to extract history archive messages", zap.Error(err)) continue } @@ -4048,14 +4048,14 @@ importMessageArchivesLoop: for _, messagesChunk := range chunkSlice(archiveMessages, importMessagesChunkSize) { if err := m.importRateLimiter.Wait(ctx); err != nil { if !errors.Is(err, context.Canceled) { - m.torrentManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err)) + m.logger.Error("rate limiter error when handling archive messages", zap.Error(err)) } continue importMessageArchivesLoop } response, err := m.handleArchiveMessages(messagesChunk) if err != nil { - m.torrentManager.LogStdout("failed to handle archive messages", zap.Error(err)) + m.logger.Error("failed to handle archive messages", zap.Error(err)) continue importMessageArchivesLoop } @@ -4069,7 +4069,7 @@ importMessageArchivesLoop: err = m.torrentManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true) if err != nil { - m.torrentManager.LogStdout("failed to mark history message archive as imported", zap.Error(err)) + m.logger.Error("failed to mark history message archive as imported", zap.Error(err)) continue } }