Samuel Hawksby-Robinson 3d68013332 chore(no-torrent)_: Renamed Torrent to Archive
I've renamed TorrentManager to ArchiveManager, ArchiveManager to ArchiveFileManager, TorrentContract to ArchiveService, ArchiveContract to ArchiveFileService. I've also renamed all init functions and struct fields to the appropriate archive-centric naming.
2024-06-07 13:44:12 +01:00

514 lines
15 KiB

//go:build !disable_torrent
// +build !disable_torrent
// Attribution to Pascal Precht, for further context please view the below issues
// -
// -
// -
// -
package communities
import (
type ArchiveFileManager struct {
torrentConfig *params.TorrentConfig
logger *zap.Logger
persistence *Persistence
identity *ecdsa.PrivateKey
encryptor *encryption.Protocol
publisher Publisher
func NewArchiveFileManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, persistence *Persistence, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *ArchiveFileManager {
return &ArchiveFileManager{
torrentConfig: torrentConfig,
logger: logger,
persistence: persistence,
identity: identity,
encryptor: encryptor,
publisher: publisher,
// createHistoryArchiveTorrent partitions the period startDate..endDate into
// windows of length partition, builds (optionally encrypted) message archives
// for each window, appends them to the community's data file, rewrites the
// index file and writes a torrent metainfo file for the archive directory.
//
// When msgs is empty the messages of each window are loaded from persistence
// by topic; otherwise msgs is filtered by timestamp. It returns the IDs of the
// archives created by this call, together with the first error encountered
// (the IDs collected so far are returned alongside the error).
//
// NOTE(review): closing braces and some short statements (for example the
// loop exit after the range check and the skip after an oversized message)
// are not visible in this view of the file; the comments below describe only
// the statements that are visible.
func (m *ArchiveFileManager) createHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
// No messages supplied means the messages are fetched from the database.
loadFromDB := len(msgs) == 0
// [from, to) is the current partition window; to is clamped to endDate.
from := startDate
to := from.Add(partition)
if to.After(endDate) {
to = endDate
// Locations of the community's archive directory, index and data files.
archiveDir := m.torrentConfig.DataDir + "/" + communityID.String()
torrentDir := m.torrentConfig.TorrentDir
indexPath := archiveDir + "/index"
dataPath := archiveDir + "/data"
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
wakuMessageArchiveIndex := make(map[string]*protobuf.WakuMessageArchiveIndexMetadata)
archiveIDs := make([]string, 0)
// Create the archive and torrent directories if they do not exist yet.
if _, err := os.Stat(archiveDir); os.IsNotExist(err) {
err := os.MkdirAll(archiveDir, 0700)
if err != nil {
return archiveIDs, err
if _, err := os.Stat(torrentDir); os.IsNotExist(err) {
err := os.MkdirAll(torrentDir, 0700)
if err != nil {
return archiveIDs, err
// An existing index file is loaded so new archives extend it.
_, err := os.Stat(indexPath)
if err == nil {
wakuMessageArchiveIndexProto, err = m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
if err != nil {
return archiveIDs, err
// The write offset for new archives is the summed size of all archives
// already listed in the index; existing entries are copied into the new map.
var offset uint64 = 0
for hash, metadata := range wakuMessageArchiveIndexProto.Archives {
offset = offset + metadata.Size
wakuMessageArchiveIndex[hash] = metadata
var encodedArchives []*EncodedArchiveData
topicsAsByteArrays := topicsAsByteArrays(topics)
// Announce that archive creation has started for this community.
m.publisher.publish(&Subscription{CreatingHistoryArchivesSignal: &signal.CreatingHistoryArchivesSignal{
CommunityID: communityID.String(),
m.logger.Debug("creating archives",
zap.Any("startDate", startDate),
zap.Any("endDate", endDate),
zap.Duration("partition", partition),
// Walk the period one partition window at a time.
for {
// The window start reaching endDate marks the end of the period.
// NOTE(review): the statement executed in this branch is not visible here.
if from.Equal(endDate) || from.After(endDate) {
m.logger.Debug("creating message archive",
zap.Any("from", from),
zap.Any("to", to),
// Collect the messages belonging to the current window.
var messages []types.Message
if loadFromDB {
messages, err = m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix()))
if err != nil {
return archiveIDs, err
} else {
// Keep supplied messages whose timestamp lies in [from, to).
for _, msg := range msgs {
if int64(msg.Timestamp) >= from.Unix() && int64(msg.Timestamp) < to.Unix() {
messages = append(messages, *msg)
if len(messages) == 0 {
// No need to create an archive with zero messages
m.logger.Debug("no messages in this partition")
// Advance to the next window, clamping its end to endDate.
from = to
to = to.Add(partition)
if to.After(endDate) {
to = endDate
m.logger.Debug("creating archive with messages", zap.Int("messagesCount", len(messages)))
// Not only do we partition messages, we also chunk them
// roughly by size, such that each chunk will not exceed a given
// size and archive data doesn't get too big
messageChunks := make([][]types.Message, 0)
currentChunkSize := 0
currentChunk := make([]types.Message, 0)
for _, msg := range messages {
// A message's size is counted as payload plus signature bytes.
msgSize := len(msg.Payload) + len(msg.Sig)
if msgSize > maxArchiveSizeInBytes {
// we drop messages this big
// Start a new chunk when adding this message would exceed the limit.
if currentChunkSize+msgSize > maxArchiveSizeInBytes {
messageChunks = append(messageChunks, currentChunk)
currentChunk = make([]types.Message, 0)
currentChunkSize = 0
currentChunk = append(currentChunk, msg)
currentChunkSize = currentChunkSize + msgSize
// The last (possibly partial) chunk is always appended.
messageChunks = append(messageChunks, currentChunk)
// Each chunk becomes one archive covering the current window.
for _, messages := range messageChunks {
wakuMessageArchive := m.createWakuMessageArchive(from, to, messages, topicsAsByteArrays)
encodedArchive, err := proto.Marshal(wakuMessageArchive)
if err != nil {
return archiveIDs, err
// When requested, the marshalled archive is wrapped in a hash ratchet
// message and the marshalled message replaces the plain bytes.
if encrypt {
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, encodedArchive)
if err != nil {
return archiveIDs, err
encodedArchive, err = proto.Marshal(messageSpec.Message)
if err != nil {
return archiveIDs, err
// Pad the archive up to a multiple of pieceLength.
// NOTE(review): when rawSize is larger than pieceLength and already an
// exact multiple of it, the modulo is zero and a whole extra piece of
// padding is added - confirm this is intended.
rawSize := len(encodedArchive)
padding := 0
size := 0
if rawSize > pieceLength {
size = rawSize + pieceLength - (rawSize % pieceLength)
padding = size - rawSize
} else {
padding = pieceLength - rawSize
size = rawSize + padding
// Index entry describing where the archive lives in the data file.
wakuMessageArchiveIndexMetadata := &protobuf.WakuMessageArchiveIndexMetadata{
Metadata: wakuMessageArchive.Metadata,
Offset: offset,
Size: uint64(size),
Padding: uint64(padding),
wakuMessageArchiveIndexMetadataBytes, err := proto.Marshal(wakuMessageArchiveIndexMetadata)
if err != nil {
return archiveIDs, err
// The archive ID is the Keccak256 hash of the marshalled index entry.
archiveID := crypto.Keccak256Hash(wakuMessageArchiveIndexMetadataBytes).String()
archiveIDs = append(archiveIDs, archiveID)
wakuMessageArchiveIndex[archiveID] = wakuMessageArchiveIndexMetadata
encodedArchives = append(encodedArchives, &EncodedArchiveData{bytes: encodedArchive, padding: padding})
// The next archive starts right after this one, padding included.
offset = offset + uint64(rawSize) + uint64(padding)
// Advance to the next window, clamping its end to endDate.
from = to
to = to.Add(partition)
if to.After(endDate) {
to = endDate
// Persist the new archives only if at least one was produced.
if len(encodedArchives) > 0 {
// Concatenate every archive followed by its zero-byte padding.
dataBytes := make([]byte, 0)
for _, encodedArchiveData := range encodedArchives {
dataBytes = append(dataBytes, encodedArchiveData.bytes...)
dataBytes = append(dataBytes, make([]byte, encodedArchiveData.padding)...)
// Serialise the merged index (existing plus new entries).
wakuMessageArchiveIndexProto.Archives = wakuMessageArchiveIndex
indexBytes, err := proto.Marshal(wakuMessageArchiveIndexProto)
if err != nil {
return archiveIDs, err
// The index is encrypted the same way as the archives when requested.
if encrypt {
messageSpec, err := m.encryptor.BuildHashRatchetMessage(communityID, indexBytes)
if err != nil {
return archiveIDs, err
indexBytes, err = proto.Marshal(messageSpec.Message)
if err != nil {
return archiveIDs, err
// The index file is overwritten in full ...
err = os.WriteFile(indexPath, indexBytes, 0644) // nolint: gosec
if err != nil {
return archiveIDs, err
// ... while new archive bytes are appended to the data file.
file, err := os.OpenFile(dataPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
return archiveIDs, err
defer file.Close()
_, err = file.Write(dataBytes)
if err != nil {
return archiveIDs, err
// Build torrent metainfo over the archive directory, recording this
// manager's public key as the creator.
metaInfo := metainfo.MetaInfo{
AnnounceList: defaultAnnounceList,
metaInfo.CreatedBy = common.PubkeyToHex(&m.identity.PublicKey)
info := metainfo.Info{
PieceLength: int64(pieceLength),
err = info.BuildFromFilePath(archiveDir)
if err != nil {
return archiveIDs, err
metaInfo.InfoBytes, err = bencode.Marshal(info)
if err != nil {
return archiveIDs, err
metaInfoBytes, err := bencode.Marshal(metaInfo)
if err != nil {
return archiveIDs, err
// Write the bencoded metainfo to the community's torrent file.
err = os.WriteFile(torrentFile(m.torrentConfig.TorrentDir, communityID.String()), metaInfoBytes, 0644) // nolint: gosec
if err != nil {
return archiveIDs, err
m.logger.Debug("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix()))
// NOTE(review): the call that receives this signal value is not visible
// in this view - presumably m.publisher.publish; confirm.
HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{
CommunityID: communityID.String(),
From: int(startDate.Unix()),
To: int(endDate.Unix()),
} else {
m.logger.Debug("no archives created")
// NOTE(review): as above, the enclosing call is not visible here.
NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{
CommunityID: communityID.String(),
From: int(startDate.Unix()),
To: int(endDate.Unix()),
// Record how far archiving has progressed: update the stored end date when
// one already exists (> 0), otherwise save it for the first time.
lastMessageArchiveEndDate, err := m.persistence.GetLastMessageArchiveEndDate(communityID)
if err != nil {
return archiveIDs, err
if lastMessageArchiveEndDate > 0 {
err = m.persistence.UpdateLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
} else {
err = m.persistence.SaveLastMessageArchiveEndDate(communityID, uint64(from.Unix()))
if err != nil {
return archiveIDs, err
return archiveIDs, nil
func (m *ArchiveFileManager) archiveIndexFile(communityID string) string {
return path.Join(m.torrentConfig.DataDir, communityID, "index")
func (m *ArchiveFileManager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive {
var wakuMessages []*protobuf.WakuMessage
for _, msg := range messages {
topic := types.TopicTypeToByteArray(msg.Topic)
wakuMessage := &protobuf.WakuMessage{
Sig: msg.Sig,
Timestamp: uint64(msg.Timestamp),
Topic: topic,
Payload: msg.Payload,
Padding: msg.Padding,
Hash: msg.Hash,
ThirdPartyId: msg.ThirdPartyID,
wakuMessages = append(wakuMessages, wakuMessage)
metadata := protobuf.WakuMessageArchiveMetadata{
From: uint64(from.Unix()),
To: uint64(to.Unix()),
ContentTopic: topics,
wakuMessageArchive := &protobuf.WakuMessageArchive{
Metadata: &metadata,
Messages: wakuMessages,
return wakuMessageArchive
func (m *ArchiveFileManager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.createHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt)
func (m *ArchiveFileManager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.createHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt)
func (m *ArchiveFileManager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) {
return m.persistence.GetMessageArchiveIDsToImport(communityID)
func (m *ArchiveFileManager) SaveMessageArchiveID(communityID types.HexBytes, hash string) error {
return m.persistence.SaveMessageArchiveID(communityID, hash)
func (m *ArchiveFileManager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error {
return m.persistence.SetMessageArchiveIDImported(communityID, hash, imported)
func (m *ArchiveFileManager) GetHistoryArchiveMagnetlink(communityID types.HexBytes) (string, error) {
id := communityID.String()
torrentFile := torrentFile(m.torrentConfig.TorrentDir, id)
metaInfo, err := metainfo.LoadFromFile(torrentFile)
if err != nil {
return "", err
info, err := metaInfo.UnmarshalInfo()
if err != nil {
return "", err
return metaInfo.Magnet(nil, &info).String(), nil
func (m *ArchiveFileManager) archiveDataFile(communityID string) string {
return path.Join(m.torrentConfig.DataDir, communityID, "data")
func (m *ArchiveFileManager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) {
id := communityID.String()
index, err := m.LoadHistoryArchiveIndexFromFile(m.identity, communityID)
if err != nil {
return nil, err
dataFile, err := os.Open(m.archiveDataFile(id))
if err != nil {
return nil, err
defer dataFile.Close()
m.logger.Debug("extracting messages from history archive",
zap.String("communityID", communityID.String()),
zap.String("archiveID", archiveID))
metadata := index.Archives[archiveID]
_, err = dataFile.Seek(int64(metadata.Offset), 0)
if err != nil {
m.logger.Error("failed to seek archive data file", zap.Error(err))
return nil, err
data := make([]byte, metadata.Size-metadata.Padding)
m.logger.Debug("loading history archive data into memory", zap.Float64("data_size_MB", float64(metadata.Size-metadata.Padding)/1024.0/1024.0))
_, err = dataFile.Read(data)
if err != nil {
m.logger.Error("failed failed to read archive data", zap.Error(err))
return nil, err
archive := &protobuf.WakuMessageArchive{}
err = proto.Unmarshal(data, archive)
if err != nil {
// The archive data might eb encrypted so we try to decrypt instead first
var protocolMessage encryption.ProtocolMessage
err := proto.Unmarshal(data, &protocolMessage)
if err != nil {
m.logger.Error("failed to unmarshal protocol message", zap.Error(err))
return nil, err
pk, err := crypto.DecompressPubkey(communityID)
if err != nil {
m.logger.Error("failed to decompress community pubkey", zap.Error(err))
return nil, err
decryptedBytes, err := m.encryptor.HandleMessage(m.identity, pk, &protocolMessage, make([]byte, 0))
if err != nil {
m.logger.Error("failed to decrypt message archive", zap.Error(err))
return nil, err
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, archive)
if err != nil {
m.logger.Error("failed to unmarshal message archive", zap.Error(err))
return nil, err
return archive.Messages, nil
func (m *ArchiveFileManager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
indexPath := m.archiveIndexFile(communityID.String())
indexData, err := os.ReadFile(indexPath)
if err != nil {
return nil, err
err = proto.Unmarshal(indexData, wakuMessageArchiveIndexProto)
if err != nil {
return nil, err
if len(wakuMessageArchiveIndexProto.Archives) == 0 && len(indexData) > 0 {
// This means we're dealing with an encrypted index file, so we have to decrypt it first
var protocolMessage encryption.ProtocolMessage
err := proto.Unmarshal(indexData, &protocolMessage)
if err != nil {
return nil, err
pk, err := crypto.DecompressPubkey(communityID)
if err != nil {
return nil, err
decryptedBytes, err := m.encryptor.HandleMessage(myKey, pk, &protocolMessage, make([]byte, 0))
if err != nil {
return nil, err
err = proto.Unmarshal(decryptedBytes.DecryptedMessage, wakuMessageArchiveIndexProto)
if err != nil {
return nil, err
return wakuMessageArchiveIndexProto, nil