From 9b458c63e0ddf031ca06cd61dd499229a8f1479f Mon Sep 17 00:00:00 2001 From: Samuel Hawksby-Robinson Date: Fri, 31 May 2024 15:20:36 +0100 Subject: [PATCH] chore(no-torrent)_: I've fully split Manager from TorrentManager I've removed any mention or dependency of TorrentManager from Manager. There is still more work to do, but Messenger now communicates directly with a TorrentManager rather than asking the communities Manager to handle it. I've ensured that LogStdout() is only called from TorrentManager and removed entirely from Manager, this functionality seems to be some kind of debug tool specifically for torrent related functionality. Next I need to focus on functions within Messenger that call a TorrentManager and see how to isolate these from the main flows, following that I also need fix the tests that are broken. I will also need to refactor torrentClientReady() so that it is a function of TorrentManager, this may allow for pushing more functions into TorrentManager which will lead to better torrent encapsulation. --- protocol/communities/manager.go | 26 ++--- protocol/communities/torrent_manager.go | 17 ++- ...nities_messenger_token_permissions_test.go | 12 +- protocol/messenger.go | 24 ++-- protocol/messenger_communities.go | 108 +++++++++--------- .../messenger_communities_import_discord.go | 36 +++--- protocol/messenger_handler.go | 32 +++--- services/ext/signal.go | 12 +- 8 files changed, 134 insertions(+), 133 deletions(-) diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index cb63fc63a..1438f678f 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -94,11 +94,9 @@ type Manager struct { tokenManager TokenManager collectiblesManager CollectiblesManager logger *zap.Logger - stdoutLogger *zap.Logger transport *transport.Transport timesource common.TimeSource quit chan struct{} - torrentManager *TorrentManager walletConfig *params.WalletConfig communityTokensService CommunityTokensServiceInterface membersReevaluationTasks sync.Map // stores `membersReevaluationTask` @@ -318,7 +316,7 @@ type OwnerVerifier interface { SafeGetSignerPubKey(ctx context.Context, chainID uint64, communityID string) (string, error) } -func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, ensverifier *ens.Verifier, ownerVerifier OwnerVerifier, transport *transport.Transport, timesource common.TimeSource, keyDistributor KeyDistributor, torrentConfig *params.TorrentConfig, opts ...ManagerOption) (*Manager, error) { +func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, ensverifier *ens.Verifier, ownerVerifier OwnerVerifier, transport *transport.Transport, timesource common.TimeSource, keyDistributor KeyDistributor, opts ...ManagerOption) (*Manager, error) { if identity == nil { return nil, errors.New("empty identity") } @@ -334,11 +332,6 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e } } - stdoutLogger, err := zap.NewDevelopment() - if err != nil { - return nil, errors.Wrap(err, "failed to create archive logger") - } - managerConfig := managerOptions{} for _, opt := range opts { opt(&managerConfig) @@ -346,7 +339,6 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e manager := &Manager{ logger: logger, - stdoutLogger: stdoutLogger, encryptor: encryptor, identity: identity, installationID: installationID, @@ -358,11 +350,10 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e communityLock: NewCommunityLock(logger), } - persistence := &Persistence{ + manager.persistence = &Persistence{ db: db, recordBundleToCommunity: manager.dbRecordBundleToCommunity, } - manager.persistence = persistence if managerConfig.accountsManager != nil { manager.accountsManager = managerConfig.accountsManager @@ -407,16 +398,9 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e manager.forceMembersReevaluation = make(map[string]chan struct{}, 10) } - manager.torrentManager = NewTorrentManager(torrentConfig, logger, stdoutLogger, persistence, transport, identity, encryptor, manager) - return manager, nil } -func (m *Manager) LogStdout(msg string, fields ...zap.Field) { - m.stdoutLogger.Info(msg, fields...) - m.logger.Debug(msg, fields...) -} - type Subscription struct { Community *Community CreatingHistoryArchivesSignal *signal.CreatingHistoryArchivesSignal @@ -648,7 +632,6 @@ func (m *Manager) Stop() error { for _, c := range m.subscriptions { close(c) } - m.torrentManager.StopTorrentClient() return nil } @@ -4865,6 +4848,11 @@ func (m *Manager) decryptCommunityDescription(keyIDSeqNo string, d []byte) (*Dec return decryptCommunityResponse, nil } +// GetPersistence returns the instantiated *Persistence used by the Manager +func (m *Manager) GetPersistence() *Persistence { + return m.persistence +} + func ToLinkPreveiwThumbnail(image images.IdentityImage) (*common.LinkPreviewThumbnail, error) { thumbnail := &common.LinkPreviewThumbnail{} diff --git a/protocol/communities/torrent_manager.go b/protocol/communities/torrent_manager.go index 3d14d1960..96454b36a 100644 --- a/protocol/communities/torrent_manager.go +++ b/protocol/communities/torrent_manager.go @@ -2,6 +2,7 @@ package communities import ( "crypto/ecdsa" + "errors" "fmt" "net" "os" @@ -14,7 +15,6 @@ import ( "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" "github.com/golang/protobuf/proto" - "github.com/pkg/errors" "go.uber.org/zap" "github.com/status-im/status-go/eth-node/crypto" @@ -76,7 +76,12 @@ type TorrentManager struct { publisher Publisher } -func NewTorrentManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *TorrentManager { +func NewTorrentManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) (*TorrentManager, error) { + stdoutLogger, err := zap.NewDevelopment() + if err != nil { + return nil, fmt.Errorf("failed to create archive logger %w", err) + } + return &TorrentManager{ torrentConfig: torrentConfig, torrentTasks: make(map[string]metainfo.Hash), @@ -91,7 +96,7 @@ func NewTorrentManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger encryptor: encryptor, publisher: publisher, - } + }, nil } // LogStdout is copied directly from Manager, consider a refactor @@ -203,17 +208,17 @@ func (m *TorrentManager) StartTorrentClient() error { return nil } -func (m *TorrentManager) StopTorrentClient() []error { +func (m *TorrentManager) StopTorrentClient() error { if m.TorrentClientStarted() { m.StopHistoryArchiveTasksIntervals() m.logger.Info("Stopping torrent client") errs := m.torrentClient.Close() if len(errs) > 0 { - return errs + return errors.Join(errs...) } m.torrentClient = nil } - return make([]error, 0) + return nil } func (m *TorrentManager) TorrentClientStarted() bool { diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index 638931c79..53d14702a 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -2265,12 +2265,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe } // Share archive directory between all users - s.owner.communitiesManager.SetTorrentConfig(&torrentConfig) - s.bob.communitiesManager.SetTorrentConfig(&torrentConfig) + s.owner.torrentManager.SetTorrentConfig(&torrentConfig) + s.bob.torrentManager.SetTorrentConfig(&torrentConfig) s.owner.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} s.bob.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} - archiveIDs, err := s.owner.communitiesManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted()) + archiveIDs, err := s.owner.torrentManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted()) s.Require().NoError(err) s.Require().Len(archiveIDs, 1) @@ -2302,12 +2302,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe // https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4403 // Ensure owner has archive - archiveIndex, err := s.owner.communitiesManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID()) + archiveIndex, err := s.owner.torrentManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID()) s.Require().NoError(err) s.Require().Len(archiveIndex.Archives, 1) // Ensure bob has archive (because they share same local directory) - archiveIndex, err = s.bob.communitiesManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID()) + archiveIndex, err = s.bob.torrentManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID()) s.Require().NoError(err) s.Require().Len(archiveIndex.Archives, 1) @@ -2315,7 +2315,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe // Save message archive ID as in // https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4325-L4336 - err = s.bob.communitiesManager.SaveMessageArchiveID(community.ID(), archiveHash) + err = s.bob.torrentManager.SaveMessageArchiveID(community.ID(), archiveHash) s.Require().NoError(err) // Import archive diff --git a/protocol/messenger.go b/protocol/messenger.go index edcb397ec..05165e830 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -113,6 +113,7 @@ type Messenger struct { pushNotificationClient *pushnotificationclient.Client pushNotificationServer *pushnotificationserver.Server communitiesManager *communities.Manager + torrentManager *communities.TorrentManager communitiesKeyDistributor communities.KeyDistributor accountsManager account.Manager mentionsManager *MentionManager @@ -492,7 +493,12 @@ func NewMessenger( encryptor: encryptionProtocol, } - communitiesManager, err := communities.NewManager(identity, installationID, database, encryptionProtocol, logger, ensVerifier, c.communityTokensService, transp, transp, communitiesKeyDistributor, c.torrentConfig, managerOptions...) + communitiesManager, err := communities.NewManager(identity, installationID, database, encryptionProtocol, logger, ensVerifier, c.communityTokensService, transp, transp, communitiesKeyDistributor, managerOptions...) + if err != nil { + return nil, err + } + + torrentManager, err := communities.NewTorrentManager(c.torrentConfig, logger, communitiesManager.GetPersistence(), transp, identity, encryptionProtocol, communitiesManager) if err != nil { return nil, err } @@ -527,6 +533,7 @@ func NewMessenger( pushNotificationServer: pushNotificationServer, communitiesManager: communitiesManager, communitiesKeyDistributor: communitiesKeyDistributor, + torrentManager: torrentManager, accountsManager: c.accountsManager, ensVerifier: ensVerifier, featureFlags: c.featureFlags, @@ -572,6 +579,7 @@ func NewMessenger( ensVerifier.Stop, pushNotificationClient.Stop, communitiesManager.Stop, + torrentManager.StopTorrentClient, encryptionProtocol.Stop, func() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -911,8 +919,8 @@ func (m *Messenger) handleConnectionChange(online bool) { } // Update Communities manager - if m.communitiesManager != nil { - m.communitiesManager.SetOnline(online) + if m.torrentManager != nil { + m.torrentManager.SetOnline(online) } // Publish contact code @@ -3735,11 +3743,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter importMessagesToSave := messageState.Response.DiscordMessages() if len(importMessagesToSave) > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord messages", len(importMessagesToSave))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d discord messages", len(importMessagesToSave))) m.handleImportMessagesMutex.Lock() err := m.persistence.SaveDiscordMessages(importMessagesToSave) if err != nil { - m.communitiesManager.LogStdout("failed to save discord messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to save discord messages", zap.Error(err)) m.handleImportMessagesMutex.Unlock() return err } @@ -3748,11 +3756,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter messageAttachmentsToSave := messageState.Response.DiscordMessageAttachments() if len(messageAttachmentsToSave) > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord message attachments", len(messageAttachmentsToSave))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d discord message attachments", len(messageAttachmentsToSave))) m.handleImportMessagesMutex.Lock() err := m.persistence.SaveDiscordMessageAttachments(messageAttachmentsToSave) if err != nil { - m.communitiesManager.LogStdout("failed to save discord message attachments", zap.Error(err)) + m.torrentManager.LogStdout("failed to save discord message attachments", zap.Error(err)) m.handleImportMessagesMutex.Unlock() return err } @@ -3761,7 +3769,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter messagesToSave := messageState.Response.Messages() if len(messagesToSave) > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d app messages", len(messagesToSave))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d app messages", len(messagesToSave))) m.handleMessagesMutex.Lock() err := m.SaveMessages(messagesToSave) if err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index c2d60676c..437fab2f8 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -1895,8 +1895,8 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ Shard: community.Shard().Protobuffer(), } - if m.torrentClientReady() && m.communitiesManager.TorrentFileExists(community.IDString()) { - magnetlink, err := m.communitiesManager.GetHistoryArchiveMagnetlink(community.ID()) + if m.torrentClientReady() && m.torrentManager.TorrentFileExists(community.IDString()) { + magnetlink, err := m.torrentManager.GetHistoryArchiveMagnetlink(community.ID()) if err != nil { m.logger.Warn("couldn't get magnet link for community", zap.Error(err)) return nil, err @@ -2082,7 +2082,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon return nil, err } - m.communitiesManager.StopHistoryArchiveTasksInterval(communityID) + m.torrentManager.StopHistoryArchiveTasksInterval(communityID) err = m.syncCommunity(context.Background(), community, m.dispatchMessage) if err != nil { @@ -2464,7 +2464,7 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef } if m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && communitySettings.HistoryArchiveSupportEnabled { - go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) } return response, nil @@ -2806,8 +2806,8 @@ func (m *Messenger) EditCommunity(request *requests.EditCommunity) (*MessengerRe if m.torrentClientReady() { if !communitySettings.HistoryArchiveSupportEnabled { - m.communitiesManager.StopHistoryArchiveTasksInterval(id) - } else if !m.communitiesManager.IsSeedingHistoryArchiveTorrent(id) { + m.torrentManager.StopHistoryArchiveTasksInterval(id) + } else if !m.torrentManager.IsSeedingHistoryArchiveTorrent(id) { var communities []*communities.Community communities = append(communities, community) go m.InitHistoryArchiveTasks(communities) @@ -3779,38 +3779,38 @@ func (m *Messenger) HandleSyncCommunitySettings(messageState *ReceivedMessageSta func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community) { - m.communitiesManager.LogStdout("initializing history archive tasks") + m.torrentManager.LogStdout("initializing history archive tasks") for _, c := range communities { if c.Joined() { settings, err := m.communitiesManager.GetCommunitySettingsByID(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to get community settings", zap.Error(err)) + m.torrentManager.LogStdout("failed to get community settings", zap.Error(err)) continue } if !settings.HistoryArchiveSupportEnabled { - m.communitiesManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString())) + m.torrentManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString())) continue } // Check if there's already a torrent file for this community and seed it - if m.communitiesManager.TorrentFileExists(c.IDString()) { - err = m.communitiesManager.SeedHistoryArchiveTorrent(c.ID()) + if m.torrentManager.TorrentFileExists(c.IDString()) { + err = m.torrentManager.SeedHistoryArchiveTorrent(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to seed history archive", zap.Error(err)) } } - filters, err := m.communitiesManager.GetCommunityChatsFilters(c.ID()) + filters, err := m.torrentManager.GetCommunityChatsFilters(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to get community chats filters for community", zap.Error(err)) + m.torrentManager.LogStdout("failed to get community chats filters for community", zap.Error(err)) continue } if len(filters) == 0 { - m.communitiesManager.LogStdout("no filters or chats for this community starting interval", zap.String("id", c.IDString())) - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + m.torrentManager.LogStdout("no filters or chats for this community starting interval", zap.String("id", c.IDString())) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) continue } @@ -3825,7 +3825,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community // possibly missed since then latestWakuMessageTimestamp, err := m.communitiesManager.GetLatestWakuMessageTimestamp(topics) if err != nil { - m.communitiesManager.LogStdout("failed to get Latest waku message timestamp", zap.Error(err)) + m.torrentManager.LogStdout("failed to get Latest waku message timestamp", zap.Error(err)) continue } @@ -3843,16 +3843,16 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community ms := m.getActiveMailserver(c.ID().String()) _, err = m.syncFiltersFrom(*ms, filters, uint32(latestWakuMessageTimestamp)) if err != nil { - m.communitiesManager.LogStdout("failed to request missing messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to request missing messages", zap.Error(err)) continue } // We figure out the end date of the last created archive and schedule // the interval for creating future archives // If the last end date is at least `interval` ago, we create an archive immediately first - lastArchiveEndDateTimestamp, err := m.communitiesManager.GetHistoryArchivePartitionStartTimestamp(c.ID()) + lastArchiveEndDateTimestamp, err := m.torrentManager.GetHistoryArchivePartitionStartTimestamp(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to get archive partition start timestamp", zap.Error(err)) + m.torrentManager.LogStdout("failed to get archive partition start timestamp", zap.Error(err)) continue } @@ -3863,35 +3863,35 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community if lastArchiveEndDateTimestamp == 0 { // No prior messages to be archived, so we just kick off the archive creation loop // for future archives - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) } else if durationSinceLastArchive < messageArchiveInterval { // Last archive is less than `interval` old, wait until `interval` is complete, // then create archive and kick off archive creation loop for future archives // Seed current archive in the meantime - err := m.communitiesManager.SeedHistoryArchiveTorrent(c.ID()) + err := m.torrentManager.SeedHistoryArchiveTorrent(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to seed history archive", zap.Error(err)) } timeToNextInterval := messageArchiveInterval - durationSinceLastArchive - m.communitiesManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval)) + m.torrentManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval)) time.AfterFunc(timeToNextInterval, func() { - err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted()) + err := m.torrentManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted()) if err != nil { - m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) }) } else { // Looks like the last archive was generated more than `interval` // ago, so lets create a new archive now and then schedule the archive // creation loop - err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval, c.Encrypted()) + err := m.torrentManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval, c.Encrypted()) if err != nil { - m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) } } } @@ -3909,12 +3909,12 @@ func (m *Messenger) enableHistoryArchivesImportAfterDelay() { func (m *Messenger) checkIfIMemberOfCommunity(communityID types.HexBytes) error { community, err := m.communitiesManager.GetByID(communityID) if err != nil { - m.communitiesManager.LogStdout("couldn't get community to import archives", zap.Error(err)) + m.torrentManager.LogStdout("couldn't get community to import archives", zap.Error(err)) return err } if !community.HasMember(&m.identity.PublicKey) { - m.communitiesManager.LogStdout("can't import archives when user not a member of community") + m.torrentManager.LogStdout("can't import archives when user not a member of community") return ErrUserNotMember } @@ -3922,7 +3922,7 @@ func (m *Messenger) checkIfIMemberOfCommunity(communityID types.HexBytes) error } func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) error { - archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID) + archiveIDsToImport, err := m.torrentManager.GetMessageArchiveIDsToImport(communityID) if err != nil { return err } @@ -3936,7 +3936,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro return err } - currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(communityID.String()) + currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(communityID.String()) // no need to resume imports if there's already a task ongoing if currentTask != nil { return nil @@ -3949,7 +3949,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro Cancelled: false, } - m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task) + m.torrentManager.AddHistoryArchiveDownloadTask(communityID.String(), task) // this wait groups tracks the ongoing task for a particular community task.Waiter.Add(1) @@ -3958,7 +3958,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro defer task.Waiter.Done() err := m.importHistoryArchives(communityID, task.CancelChan) if err != nil { - m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err)) + m.torrentManager.LogStdout("failed to import history archives", zap.Error(err)) } m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(communityID)) }() @@ -3997,7 +3997,7 @@ importMessageArchivesLoop: if delayImport { select { case <-ctx.Done(): - m.communitiesManager.LogStdout("interrupted importing history archive messages") + m.torrentManager.LogStdout("interrupted importing history archive messages") return nil case <-time.After(1 * time.Hour): delayImport = false @@ -4006,31 +4006,31 @@ importMessageArchivesLoop: select { case <-ctx.Done(): - m.communitiesManager.LogStdout("interrupted importing history archive messages") + m.torrentManager.LogStdout("interrupted importing history archive messages") return nil case <-importTicker.C: err := m.checkIfIMemberOfCommunity(communityID) if err != nil { break importMessageArchivesLoop } - archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID) + archiveIDsToImport, err := m.torrentManager.GetMessageArchiveIDsToImport(communityID) if err != nil { - m.communitiesManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err)) + m.torrentManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err)) return err } if len(archiveIDsToImport) == 0 { - m.communitiesManager.LogStdout("no message archives to import") + m.torrentManager.LogStdout("no message archives to import") break importMessageArchivesLoop } - m.communitiesManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport))) + m.torrentManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport))) // only process one archive at a time, so in case of cancel we don't // wait for all archives to be processed first downloadedArchiveID := archiveIDsToImport[0] - archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID) + archiveMessages, err := m.torrentManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID) if err != nil { if errors.Is(err, encryption.ErrHashRatchetGroupIDNotFound) { // In case we're missing hash ratchet keys, best we can do is @@ -4038,7 +4038,7 @@ importMessageArchivesLoop: delayImport = true continue } - m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to extract history archive messages", zap.Error(err)) continue } @@ -4047,14 +4047,14 @@ importMessageArchivesLoop: for _, messagesChunk := range chunkSlice(archiveMessages, importMessagesChunkSize) { if err := m.importRateLimiter.Wait(ctx); err != nil { if !errors.Is(err, context.Canceled) { - m.communitiesManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err)) + m.torrentManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err)) } continue importMessageArchivesLoop } response, err := m.handleArchiveMessages(messagesChunk) if err != nil { - m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to handle archive messages", zap.Error(err)) continue importMessageArchivesLoop } @@ -4066,9 +4066,9 @@ importMessageArchivesLoop: } } - err = m.communitiesManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true) + err = m.torrentManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true) if err != nil { - m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err)) + m.torrentManager.LogStdout("failed to mark history message archive as imported", zap.Error(err)) continue } } @@ -4083,7 +4083,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error { return err } - magnetlink, err := m.communitiesManager.GetHistoryArchiveMagnetlink(community.ID()) + magnetlink, err := m.torrentManager.GetHistoryArchiveMagnetlink(community.ID()) if err != nil { return err } @@ -4137,8 +4137,8 @@ func (m *Messenger) EnableCommunityHistoryArchiveProtocol() error { } m.config.torrentConfig = &nodeConfig.TorrentConfig - m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig) - err = m.communitiesManager.StartTorrentClient() + m.torrentManager.SetTorrentConfig(&nodeConfig.TorrentConfig) + err = m.torrentManager.StartTorrentClient() if err != nil { return err } @@ -4167,12 +4167,12 @@ func (m *Messenger) DisableCommunityHistoryArchiveProtocol() error { return nil } - m.communitiesManager.StopTorrentClient() + m.torrentManager.StopTorrentClient() nodeConfig.TorrentConfig.Enabled = false err = m.settings.SaveSetting("node-config", nodeConfig) m.config.torrentConfig = &nodeConfig.TorrentConfig - m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig) + m.torrentManager.SetTorrentConfig(&nodeConfig.TorrentConfig) if err != nil { return err } @@ -4284,7 +4284,7 @@ func (m *Messenger) torrentClientReady() bool { // be instantiated (for example in case of port conflicts) return m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && - m.communitiesManager.TorrentClientStarted() + m.torrentManager.TorrentClientStarted() } func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c *communities.Community) ([]*types.Message, error) { diff --git a/protocol/messenger_communities_import_discord.go b/protocol/messenger_communities_import_discord.go index f9e0f61d9..66dd5cf4f 100644 --- a/protocol/messenger_communities_import_discord.go +++ b/protocol/messenger_communities_import_discord.go @@ -684,7 +684,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC chunksCount := len(discordMessageChunks) for ii, msgs := range discordMessageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) err := m.persistence.SaveDiscordMessages(msgs) if err != nil { m.cleanUpImportChannel(request.CommunityID.String(), newChat.ID) @@ -727,7 +727,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC chunksCount = len(messageChunks) for ii, msgs := range messageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) err := m.persistence.SaveMessages(msgs) if err != nil { m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID) @@ -791,7 +791,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC go func(id string, author *protobuf.DiscordMessageAuthor) { defer wg.Done() - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) if err != nil { errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) @@ -845,7 +845,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC defer wg.Done() for ii, attachment := range attachments { - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) if err != nil { @@ -889,7 +889,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC chunksCount = len(attachmentChunks) for ii, attachments := range attachmentChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) err := m.persistence.SaveDiscordMessageAttachments(attachments) if err != nil { importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) @@ -944,7 +944,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC wakuMessages := append(wakuChatMessages, wakuPinMessages...) - topics, err := m.communitiesManager.GetCommunityChatsTopics(request.CommunityID) + topics, err := m.torrentManager.GetCommunityChatsTopics(request.CommunityID) if err != nil { m.logger.Error("failed to get community chat topics", zap.Error(err)) continue @@ -953,7 +953,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) endDate := time.Now() - _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( + _, err = m.torrentManager.CreateHistoryArchiveTorrentFromMessages( request.CommunityID, wakuMessages, topics, @@ -973,11 +973,11 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC } if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { - err = m.communitiesManager.SeedHistoryArchiveTorrent(request.CommunityID) + err = m.torrentManager.SeedHistoryArchiveTorrent(request.CommunityID) if err != nil { m.logger.Error("failed to seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) } } @@ -1460,7 +1460,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor chunksCount := len(discordMessageChunks) for ii, msgs := range discordMessageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) err = m.persistence.SaveDiscordMessages(msgs) if err != nil { m.cleanUpImport(communityID) @@ -1503,7 +1503,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor chunksCount = len(messageChunks) for ii, msgs := range messageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) err = m.persistence.SaveMessages(msgs) if err != nil { m.cleanUpImport(communityID) @@ -1563,7 +1563,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor go func(id string, author *protobuf.DiscordMessageAuthor) { defer wg.Done() - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) if err != nil { errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) @@ -1615,7 +1615,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor defer wg.Done() for ii, attachment := range attachments { - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) if err != nil { @@ -1659,7 +1659,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor chunksCount = len(attachmentChunks) for ii, attachments := range attachmentChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) err = m.persistence.SaveDiscordMessageAttachments(attachments) if err != nil { m.cleanUpImport(communityID) @@ -1714,7 +1714,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor wakuMessages := append(wakuChatMessages, wakuPinMessages...) - topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID()) + topics, err := m.torrentManager.GetCommunityChatsTopics(discordCommunity.ID()) if err != nil { m.logger.Error("failed to get community chat topics", zap.Error(err)) continue @@ -1723,7 +1723,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) endDate := time.Now() - _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( + _, err = m.torrentManager.CreateHistoryArchiveTorrentFromMessages( discordCommunity.ID(), wakuMessages, topics, @@ -1739,11 +1739,11 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { - err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) + err = m.torrentManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) if err != nil { m.logger.Error("failed to seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) } } diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index dcbadc1f1..fc313b3af 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -1356,12 +1356,12 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage // part of and doesn't own the private key at the same time if !community.IsControlNode() && community.Joined() && clock >= lastClock { if lastSeenMagnetlink == magnetlink { - m.communitiesManager.LogStdout("already processed this magnetlink") + m.torrentManager.LogStdout("already processed this magnetlink") return nil } - m.communitiesManager.UnseedHistoryArchiveTorrent(id) - currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(id.String()) + m.torrentManager.UnseedHistoryArchiveTorrent(id) + currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(id.String()) go func(currentTask *communities.HistoryArchiveDownloadTask, communityID types.HexBytes) { @@ -1378,7 +1378,7 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage Cancelled: false, } - m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task) + m.torrentManager.AddHistoryArchiveDownloadTask(communityID.String(), task) // this wait groups tracks the ongoing task for a particular community task.Waiter.Add(1) @@ -1397,32 +1397,32 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage } func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetlink string, cancel chan struct{}) { - downloadTaskInfo, err := m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) + downloadTaskInfo, err := m.torrentManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) if err != nil { logMsg := "failed to download history archive data" if err == communities.ErrTorrentTimedout { - m.communitiesManager.LogStdout("torrent has timed out, trying once more...") - downloadTaskInfo, err = m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) + m.torrentManager.LogStdout("torrent has timed out, trying once more...") + downloadTaskInfo, err = m.torrentManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) if err != nil { - m.communitiesManager.LogStdout(logMsg, zap.Error(err)) + m.torrentManager.LogStdout(logMsg, zap.Error(err)) return } } else { - m.communitiesManager.LogStdout(logMsg, zap.Error(err)) + m.torrentManager.LogStdout(logMsg, zap.Error(err)) return } } if downloadTaskInfo.Cancelled { if downloadTaskInfo.TotalDownloadedArchivesCount > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("downloaded %d of %d archives so far", downloadTaskInfo.TotalDownloadedArchivesCount, downloadTaskInfo.TotalArchivesCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloaded %d of %d archives so far", downloadTaskInfo.TotalDownloadedArchivesCount, downloadTaskInfo.TotalArchivesCount)) } return } err = m.communitiesManager.UpdateLastSeenMagnetlink(id, magnetlink) if err != nil { - m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err)) + m.torrentManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err)) } err = m.checkIfIMemberOfCommunity(id) @@ -1432,7 +1432,7 @@ func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetli err = m.importHistoryArchives(id, cancel) if err != nil { - m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err)) + m.torrentManager.LogStdout("failed to import history archives", zap.Error(err)) m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id)) return } @@ -1475,13 +1475,13 @@ func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessag err := m.handleImportedMessages(importedMessages) if err != nil { - m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to handle imported messages", zap.Error(err)) return nil, err } response, err := m.handleRetrievedMessages(otherMessages, false, true) if err != nil { - m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err)) + m.torrentManager.LogStdout("failed to write history archive messages to database", zap.Error(err)) return nil, err } @@ -1730,7 +1730,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS magnetlink := requestToJoinResponseProto.MagnetUri if m.torrentClientReady() && communitySettings != nil && communitySettings.HistoryArchiveSupportEnabled && magnetlink != "" { - currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(community.IDString()) + currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(community.IDString()) go func(currentTask *communities.HistoryArchiveDownloadTask) { // Cancel ongoing download/import task @@ -1744,7 +1744,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS Waiter: *new(sync.WaitGroup), Cancelled: false, } - m.communitiesManager.AddHistoryArchiveDownloadTask(community.IDString(), task) + m.torrentManager.AddHistoryArchiveDownloadTask(community.IDString(), task) task.Waiter.Add(1) defer task.Waiter.Done() diff --git a/services/ext/signal.go b/services/ext/signal.go index f7c62c6e6..9b0903754 100644 --- a/services/ext/signal.go +++ b/services/ext/signal.go @@ -12,7 +12,7 @@ import ( // EnvelopeSignalHandler sends signals when envelope is sent or expired. type EnvelopeSignalHandler struct{} -// EnvelopeSent triggered when envelope delivered atleast to 1 peer. +// EnvelopeSent triggered when envelope delivered at least to 1 peer. func (h EnvelopeSignalHandler) EnvelopeSent(identifiers [][]byte) { signal.SendEnvelopeSent(identifiers) } @@ -51,21 +51,21 @@ func (h PublisherSignalHandler) Stats(stats types.StatsSummary) { signal.SendStats(stats) } -// MessengerSignalHandler sends signals on messenger events +// MessengerSignalsHandler sends signals on messenger events type MessengerSignalsHandler struct{} // MessageDelivered passes information that message was delivered -func (m MessengerSignalsHandler) MessageDelivered(chatID string, messageID string) { +func (m *MessengerSignalsHandler) MessageDelivered(chatID string, messageID string) { signal.SendMessageDelivered(chatID, messageID) } // BackupPerformed passes information that a backup was performed -func (m MessengerSignalsHandler) BackupPerformed(lastBackup uint64) { +func (m *MessengerSignalsHandler) BackupPerformed(lastBackup uint64) { signal.SendBackupPerformed(lastBackup) } -// MessageDelivered passes info about community that was requested before -func (m MessengerSignalsHandler) CommunityInfoFound(community *communities.Community) { +// CommunityInfoFound passes info about community that was requested before +func (m *MessengerSignalsHandler) CommunityInfoFound(community *communities.Community) { signal.SendCommunityInfoFound(community) }