chore(no-torrent)_: Ensured move of all torrent funcs and structs

I also ensured that the order of functions matches the original code, to make comparison easier during review.
This commit is contained in:
Samuel Hawksby-Robinson 2024-05-29 20:25:47 +01:00
parent 21325345f9
commit c084ca4230
2 changed files with 285 additions and 284 deletions

View File

@ -411,25 +411,6 @@ func (m *Manager) LogStdout(msg string, fields ...zap.Field) {
m.logger.Debug(msg, fields...)
}
type archiveMDSlice []*archiveMetadata
type archiveMetadata struct {
hash string
from uint64
}
func (md archiveMDSlice) Len() int {
return len(md)
}
func (md archiveMDSlice) Swap(i, j int) {
md[i], md[j] = md[j], md[i]
}
func (md archiveMDSlice) Less(i, j int) bool {
return md[i].from > md[j].from
}
type Subscription struct {
Community *Community
CreatingHistoryArchivesSignal *signal.CreatingHistoryArchivesSignal
@ -4045,37 +4026,6 @@ func (m *Manager) GetLatestWakuMessageTimestamp(topics []types.TopicType) (uint6
return m.persistence.GetLatestWakuMessageTimestamp(topics)
}
type EncodedArchiveData struct {
padding int
bytes []byte
}
type HistoryArchiveDownloadTaskInfo struct {
TotalDownloadedArchivesCount int
TotalArchivesCount int
Cancelled bool
}
func (m *Manager) SaveMessageArchiveID(communityID types.HexBytes, hash string) error {
return m.persistence.SaveMessageArchiveID(communityID, hash)
}
func (m *Manager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) {
return m.persistence.GetMessageArchiveIDsToImport(communityID)
}
func (m *Manager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error {
return m.persistence.SetMessageArchiveIDImported(communityID, hash, imported)
}
func topicsAsByteArrays(topics []types.TopicType) [][]byte {
var topicsAsByteArrays [][]byte
for _, t := range topics {
topic := types.TopicTypeToByteArray(t)
topicsAsByteArrays = append(topicsAsByteArrays, topic)
}
return topicsAsByteArrays
}
func (m *Manager) GetCommunityToken(communityID string, chainID int, address string) (*community_token.CommunityToken, error) {
return m.persistence.GetCommunityToken(communityID, chainID, address)
}

View File

@ -27,6 +27,36 @@ import (
"github.com/status-im/status-go/signal"
)
type archiveMDSlice []*archiveMetadata
type archiveMetadata struct {
hash string
from uint64
}
func (md archiveMDSlice) Len() int {
return len(md)
}
func (md archiveMDSlice) Swap(i, j int) {
md[i], md[j] = md[j], md[i]
}
func (md archiveMDSlice) Less(i, j int) bool {
return md[i].from > md[j].from
}
type EncodedArchiveData struct {
padding int
bytes []byte
}
type HistoryArchiveDownloadTaskInfo struct {
TotalDownloadedArchivesCount int
TotalArchivesCount int
Cancelled bool
}
type TorrentManager struct {
torrentConfig *params.TorrentConfig
torrentClient *torrent.Client
@ -49,6 +79,27 @@ func NewTorrentManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger
}
}
// LogStdout is copied directly from Manager, consider a refactor
func (m *TorrentManager) LogStdout(msg string, fields ...zap.Field) {
m.stdoutLogger.Info(msg, fields...)
m.logger.Debug(msg, fields...)
}
func (m *TorrentManager) SetOnline(online bool) {
if online {
if m.torrentConfig != nil && m.torrentConfig.Enabled && !m.TorrentClientStarted() {
err := m.StartTorrentClient()
if err != nil {
m.LogStdout("couldn't start torrent client", zap.Error(err))
}
}
}
}
func (m *TorrentManager) SetTorrentConfig(config *params.TorrentConfig) {
m.torrentConfig = config
}
// getTCPandUDPport will return the same port number given if != 0,
// otherwise, it will attempt to find a free random tcp and udp port using
// the same number for both protocols
@ -99,26 +150,6 @@ func (m *TorrentManager) getTCPandUDPport(portNumber int) (int, error) {
return 0, fmt.Errorf("no free port found")
}
func (m *TorrentManager) LogStdout(msg string, fields ...zap.Field) {
m.stdoutLogger.Info(msg, fields...)
m.logger.Debug(msg, fields...)
}
func (m *TorrentManager) SetTorrentConfig(config *params.TorrentConfig) {
m.torrentConfig = config
}
func (m *TorrentManager) SetOnline(online bool) {
if online {
if m.torrentConfig != nil && m.torrentConfig.Enabled && !m.TorrentClientStarted() {
err := m.StartTorrentClient()
if err != nil {
m.LogStdout("couldn't start torrent client", zap.Error(err))
}
}
}
}
func (m *TorrentManager) StartTorrentClient() error {
if m.torrentConfig == nil {
return fmt.Errorf("can't start torrent client: missing torrentConfig")
@ -174,6 +205,180 @@ func (m *TorrentManager) TorrentClientStarted() bool {
return m.torrentClient != nil
}
func (m *TorrentManager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) {
chatIDs, err := m.persistence.GetCommunityChatIDs(communityID)
if err != nil {
return nil, err
}
filters := []*transport.Filter{}
for _, cid := range chatIDs {
filters = append(filters, m.transport.FilterByChatID(cid))
}
return filters, nil
}
func (m *TorrentManager) GetCommunityChatsTopics(communityID types.HexBytes) ([]types.TopicType, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
return nil, err
}
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
return topics, nil
}
func (m *TorrentManager) GetOldestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
return m.persistence.GetOldestWakuMessageTimestamp(topics)
}
func (m *TorrentManager) GetLastMessageArchiveEndDate(communityID types.HexBytes) (uint64, error) {
return m.persistence.GetLastMessageArchiveEndDate(communityID)
}
func (m *TorrentManager) GetHistoryArchivePartitionStartTimestamp(communityID types.HexBytes) (uint64, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
m.LogStdout("failed to get community chats filters", zap.Error(err))
return 0, err
}
if len(filters) == 0 {
// If we don't have chat filters, we likely don't have any chats
// associated to this community, which means there's nothing more
// to do here
return 0, nil
}
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
lastArchiveEndDateTimestamp, err := m.GetLastMessageArchiveEndDate(communityID)
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// If we don't have a tracked last message archive end date, it
// means we haven't created an archive before, which means
// the next thing to look at is the oldest waku message timestamp for
// this community
lastArchiveEndDateTimestamp, err = m.GetOldestWakuMessageTimestamp(topics)
if err != nil {
m.LogStdout("failed to get oldest waku message timestamp", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// This means there's no waku message stored for this community so far
// (even after requesting possibly missed messages), so no messages exist yet that can be archived
m.LogStdout("can't find valid `lastArchiveEndTimestamp`")
return 0, nil
}
}
return lastArchiveEndDateTimestamp, nil
}
func (m *TorrentManager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error {
m.UnseedHistoryArchiveTorrent(communityID)
_, err := m.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt)
if err != nil {
return err
}
return m.SeedHistoryArchiveTorrent(communityID)
}
func (m *TorrentManager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
id := community.IDString()
if _, exists := m.historyArchiveTasks.Load(id); exists {
m.LogStdout("history archive tasks interval already in progress", zap.String("id", id))
return
}
cancel := make(chan struct{})
m.historyArchiveTasks.Store(id, cancel)
m.historyArchiveTasksWaitGroup.Add(1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
m.LogStdout("starting history archive tasks interval", zap.String("id", id))
for {
select {
case <-ticker.C:
m.LogStdout("starting archive task...", zap.String("id", id))
lastArchiveEndDateTimestamp, err := m.GetHistoryArchivePartitionStartTimestamp(community.ID())
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
continue
}
if lastArchiveEndDateTimestamp == 0 {
// This means there are no waku messages for this community,
// so nothing to do here
m.LogStdout("couldn't determine archive start date - skipping")
continue
}
topics, err := m.GetCommunityChatsTopics(community.ID())
if err != nil {
m.LogStdout("failed to get community chat topics ", zap.Error(err))
continue
}
ts := time.Now().Unix()
to := time.Unix(ts, 0)
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval, community.Encrypted())
if err != nil {
m.LogStdout("failed to create and seed history archive", zap.Error(err))
continue
}
case <-cancel:
m.UnseedHistoryArchiveTorrent(community.ID())
m.historyArchiveTasks.Delete(id)
m.historyArchiveTasksWaitGroup.Done()
return
}
}
}
func (m *TorrentManager) StopHistoryArchiveTasksIntervals() {
m.historyArchiveTasks.Range(func(_, task interface{}) bool {
close(task.(chan struct{})) // Need to cast to the chan
return true
})
// Stoping archive interval tasks is async, so we need
// to wait for all of them to be closed before we shutdown
// the torrent client
m.historyArchiveTasksWaitGroup.Wait()
}
func (m *TorrentManager) StopHistoryArchiveTasksInterval(communityID types.HexBytes) {
task, exists := m.historyArchiveTasks.Load(communityID.String())
if exists {
m.logger.Info("Stopping history archive tasks interval", zap.Any("id", communityID.String()))
close(task.(chan struct{})) // Need to cast to the chan
}
}
func (m *TorrentManager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt)
}
func (m *TorrentManager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt)
}
func (m *TorrentManager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
loadFromDB := len(msgs) == 0
@ -500,79 +705,6 @@ func (m *TorrentManager) SeedHistoryArchiveTorrent(communityID types.HexBytes) e
return nil
}
func (m *TorrentManager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error {
m.UnseedHistoryArchiveTorrent(communityID)
_, err := m.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt)
if err != nil {
return err
}
return m.SeedHistoryArchiveTorrent(communityID)
}
func (m *TorrentManager) StartHistoryArchiveTasksInterval(community *Community, interval time.Duration) {
id := community.IDString()
if _, exists := m.historyArchiveTasks.Load(id); exists {
m.LogStdout("history archive tasks interval already in progress", zap.String("id", id))
return
}
cancel := make(chan struct{})
m.historyArchiveTasks.Store(id, cancel)
m.historyArchiveTasksWaitGroup.Add(1)
ticker := time.NewTicker(interval)
defer ticker.Stop()
m.LogStdout("starting history archive tasks interval", zap.String("id", id))
for {
select {
case <-ticker.C:
m.LogStdout("starting archive task...", zap.String("id", id))
lastArchiveEndDateTimestamp, err := m.GetHistoryArchivePartitionStartTimestamp(community.ID())
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
continue
}
if lastArchiveEndDateTimestamp == 0 {
// This means there are no waku messages for this community,
// so nothing to do here
m.LogStdout("couldn't determine archive start date - skipping")
continue
}
topics, err := m.GetCommunityChatsTopics(community.ID())
if err != nil {
m.LogStdout("failed to get community chat topics ", zap.Error(err))
continue
}
ts := time.Now().Unix()
to := time.Unix(ts, 0)
lastArchiveEndDate := time.Unix(int64(lastArchiveEndDateTimestamp), 0)
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval, community.Encrypted())
if err != nil {
m.LogStdout("failed to create and seed history archive", zap.Error(err))
continue
}
case <-cancel:
m.UnseedHistoryArchiveTorrent(community.ID())
m.historyArchiveTasks.Delete(id)
m.historyArchiveTasksWaitGroup.Done()
return
}
}
}
func (m *TorrentManager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt)
}
func (m *TorrentManager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt)
}
func (m *TorrentManager) UnseedHistoryArchiveTorrent(communityID types.HexBytes) {
id := communityID.String()
@ -601,6 +733,18 @@ func (m *TorrentManager) IsSeedingHistoryArchiveTorrent(communityID types.HexByt
return ok && torrent.Seeding()
}
func (m *TorrentManager) GetHistoryArchiveDownloadTask(communityID string) *HistoryArchiveDownloadTask {
return m.historyArchiveDownloadTasks[communityID]
}
func (m *TorrentManager) DeleteHistoryArchiveDownloadTask(communityID string) {
delete(m.historyArchiveDownloadTasks, communityID)
}
func (m *TorrentManager) AddHistoryArchiveDownloadTask(communityID string, task *HistoryArchiveDownloadTask) {
m.historyArchiveDownloadTasks[communityID] = task
}
func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes, magnetlink string, cancelTask chan struct{}) (*HistoryArchiveDownloadTaskInfo, error) {
id := communityID.String()
@ -772,47 +916,15 @@ func (m *TorrentManager) DownloadHistoryArchivesByMagnetlink(communityID types.H
}
}
func (m *TorrentManager) torrentFile(communityID string) string {
return path.Join(m.torrentConfig.TorrentDir, communityID+".torrent")
func (m *TorrentManager) SaveMessageArchiveID(communityID types.HexBytes, hash string) error {
return m.persistence.SaveMessageArchiveID(communityID, hash)
}
func (m *TorrentManager) archiveIndexFile(communityID string) string {
return path.Join(m.torrentConfig.DataDir, communityID, "index")
func (m *TorrentManager) GetMessageArchiveIDsToImport(communityID types.HexBytes) ([]string, error) {
return m.persistence.GetMessageArchiveIDsToImport(communityID)
}
func (m *TorrentManager) archiveDataFile(communityID string) string {
return path.Join(m.torrentConfig.DataDir, communityID, "data")
}
func (m *TorrentManager) StopHistoryArchiveTasksIntervals() {
m.historyArchiveTasks.Range(func(_, task interface{}) bool {
close(task.(chan struct{})) // Need to cast to the chan
return true
})
// Stoping archive interval tasks is async, so we need
// to wait for all of them to be closed before we shutdown
// the torrent client
m.historyArchiveTasksWaitGroup.Wait()
}
func (m *TorrentManager) StopHistoryArchiveTasksInterval(communityID types.HexBytes) {
task, exists := m.historyArchiveTasks.Load(communityID.String())
if exists {
m.logger.Info("Stopping history archive tasks interval", zap.Any("id", communityID.String()))
close(task.(chan struct{})) // Need to cast to the chan
}
}
func (m *TorrentManager) GetHistoryArchiveDownloadTask(communityID string) *HistoryArchiveDownloadTask {
return m.historyArchiveDownloadTasks[communityID]
}
func (m *TorrentManager) DeleteHistoryArchiveDownloadTask(communityID string) {
delete(m.historyArchiveDownloadTasks, communityID)
}
func (m *TorrentManager) AddHistoryArchiveDownloadTask(communityID string, task *HistoryArchiveDownloadTask) {
m.historyArchiveDownloadTasks[communityID] = task
func (m *TorrentManager) SetMessageArchiveIDImported(communityID types.HexBytes, hash string, imported bool) error {
return m.persistence.SetMessageArchiveIDImported(communityID, hash, imported)
}
func (m *TorrentManager) ExtractMessagesFromHistoryArchive(communityID types.HexBytes, archiveID string) ([]*protobuf.WakuMessage, error) {
@ -896,6 +1008,36 @@ func (m *TorrentManager) GetHistoryArchiveMagnetlink(communityID types.HexBytes)
return metaInfo.Magnet(nil, &info).String(), nil
}
func (m *TorrentManager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive {
var wakuMessages []*protobuf.WakuMessage
for _, msg := range messages {
topic := types.TopicTypeToByteArray(msg.Topic)
wakuMessage := &protobuf.WakuMessage{
Sig: msg.Sig,
Timestamp: uint64(msg.Timestamp),
Topic: topic,
Payload: msg.Payload,
Padding: msg.Padding,
Hash: msg.Hash,
ThirdPartyId: msg.ThirdPartyID,
}
wakuMessages = append(wakuMessages, wakuMessage)
}
metadata := protobuf.WakuMessageArchiveMetadata{
From: uint64(from.Unix()),
To: uint64(to.Unix()),
ContentTopic: topics,
}
wakuMessageArchive := &protobuf.WakuMessageArchive{
Metadata: &metadata,
Messages: wakuMessages,
}
return wakuMessageArchive
}
func (m *TorrentManager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.WakuMessageArchiveIndex, error) {
wakuMessageArchiveIndexProto := &protobuf.WakuMessageArchiveIndex{}
@ -934,121 +1076,30 @@ func (m *TorrentManager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey
return wakuMessageArchiveIndexProto, nil
}
func (m *TorrentManager) GetLastMessageArchiveEndDate(communityID types.HexBytes) (uint64, error) {
return m.persistence.GetLastMessageArchiveEndDate(communityID)
}
func (m *TorrentManager) GetOldestWakuMessageTimestamp(topics []types.TopicType) (uint64, error) {
return m.persistence.GetOldestWakuMessageTimestamp(topics)
}
func (m *TorrentManager) TorrentFileExists(communityID string) bool {
_, err := os.Stat(m.torrentFile(communityID))
return err == nil
}
func (m *TorrentManager) GetHistoryArchivePartitionStartTimestamp(communityID types.HexBytes) (uint64, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
m.LogStdout("failed to get community chats filters", zap.Error(err))
return 0, err
}
if len(filters) == 0 {
// If we don't have chat filters, we likely don't have any chats
// associated to this community, which means there's nothing more
// to do here
return 0, nil
}
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
lastArchiveEndDateTimestamp, err := m.GetLastMessageArchiveEndDate(communityID)
if err != nil {
m.LogStdout("failed to get last archive end date", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// If we don't have a tracked last message archive end date, it
// means we haven't created an archive before, which means
// the next thing to look at is the oldest waku message timestamp for
// this community
lastArchiveEndDateTimestamp, err = m.GetOldestWakuMessageTimestamp(topics)
if err != nil {
m.LogStdout("failed to get oldest waku message timestamp", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// This means there's no waku message stored for this community so far
// (even after requesting possibly missed messages), so no messages exist yet that can be archived
m.LogStdout("can't find valid `lastArchiveEndTimestamp`")
return 0, nil
}
}
return lastArchiveEndDateTimestamp, nil
func (m *TorrentManager) torrentFile(communityID string) string {
return path.Join(m.torrentConfig.TorrentDir, communityID+".torrent")
}
func (m *TorrentManager) GetCommunityChatsTopics(communityID types.HexBytes) ([]types.TopicType, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
return nil, err
}
topics := []types.TopicType{}
for _, filter := range filters {
topics = append(topics, filter.ContentTopic)
}
return topics, nil
func (m *TorrentManager) archiveIndexFile(communityID string) string {
return path.Join(m.torrentConfig.DataDir, communityID, "index")
}
func (m *TorrentManager) GetCommunityChatsFilters(communityID types.HexBytes) ([]*transport.Filter, error) {
chatIDs, err := m.persistence.GetCommunityChatIDs(communityID)
if err != nil {
return nil, err
}
filters := []*transport.Filter{}
for _, cid := range chatIDs {
filters = append(filters, m.transport.FilterByChatID(cid))
}
return filters, nil
func (m *TorrentManager) archiveDataFile(communityID string) string {
return path.Join(m.torrentConfig.DataDir, communityID, "data")
}
func (m *TorrentManager) createWakuMessageArchive(from time.Time, to time.Time, messages []types.Message, topics [][]byte) *protobuf.WakuMessageArchive {
var wakuMessages []*protobuf.WakuMessage
for _, msg := range messages {
topic := types.TopicTypeToByteArray(msg.Topic)
wakuMessage := &protobuf.WakuMessage{
Sig: msg.Sig,
Timestamp: uint64(msg.Timestamp),
Topic: topic,
Payload: msg.Payload,
Padding: msg.Padding,
Hash: msg.Hash,
ThirdPartyId: msg.ThirdPartyID,
}
wakuMessages = append(wakuMessages, wakuMessage)
func topicsAsByteArrays(topics []types.TopicType) [][]byte {
var topicsAsByteArrays [][]byte
for _, t := range topics {
topic := types.TopicTypeToByteArray(t)
topicsAsByteArrays = append(topicsAsByteArrays, topic)
}
metadata := protobuf.WakuMessageArchiveMetadata{
From: uint64(from.Unix()),
To: uint64(to.Unix()),
ContentTopic: topics,
}
wakuMessageArchive := &protobuf.WakuMessageArchive{
Metadata: &metadata,
Messages: wakuMessages,
}
return wakuMessageArchive
return topicsAsByteArrays
}
func findIndexFile(files []*torrent.File) (index int, ok bool) {