diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index cb63fc63a..1438f678f 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -94,11 +94,9 @@ type Manager struct { tokenManager TokenManager collectiblesManager CollectiblesManager logger *zap.Logger - stdoutLogger *zap.Logger transport *transport.Transport timesource common.TimeSource quit chan struct{} - torrentManager *TorrentManager walletConfig *params.WalletConfig communityTokensService CommunityTokensServiceInterface membersReevaluationTasks sync.Map // stores `membersReevaluationTask` @@ -318,7 +316,7 @@ type OwnerVerifier interface { SafeGetSignerPubKey(ctx context.Context, chainID uint64, communityID string) (string, error) } -func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, ensverifier *ens.Verifier, ownerVerifier OwnerVerifier, transport *transport.Transport, timesource common.TimeSource, keyDistributor KeyDistributor, torrentConfig *params.TorrentConfig, opts ...ManagerOption) (*Manager, error) { +func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, ensverifier *ens.Verifier, ownerVerifier OwnerVerifier, transport *transport.Transport, timesource common.TimeSource, keyDistributor KeyDistributor, opts ...ManagerOption) (*Manager, error) { if identity == nil { return nil, errors.New("empty identity") } @@ -334,11 +332,6 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e } } - stdoutLogger, err := zap.NewDevelopment() - if err != nil { - return nil, errors.Wrap(err, "failed to create archive logger") - } - managerConfig := managerOptions{} for _, opt := range opts { opt(&managerConfig) @@ -346,7 +339,6 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e manager := &Manager{ logger: logger, - stdoutLogger: stdoutLogger, encryptor: encryptor, identity: identity, installationID: installationID, @@ -358,11 +350,10 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e communityLock: NewCommunityLock(logger), } - persistence := &Persistence{ + manager.persistence = &Persistence{ db: db, recordBundleToCommunity: manager.dbRecordBundleToCommunity, } - manager.persistence = persistence if managerConfig.accountsManager != nil { manager.accountsManager = managerConfig.accountsManager @@ -407,16 +398,9 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e manager.forceMembersReevaluation = make(map[string]chan struct{}, 10) } - manager.torrentManager = NewTorrentManager(torrentConfig, logger, stdoutLogger, persistence, transport, identity, encryptor, manager) - return manager, nil } -func (m *Manager) LogStdout(msg string, fields ...zap.Field) { - m.stdoutLogger.Info(msg, fields...) - m.logger.Debug(msg, fields...) -} - type Subscription struct { Community *Community CreatingHistoryArchivesSignal *signal.CreatingHistoryArchivesSignal @@ -648,7 +632,6 @@ func (m *Manager) Stop() error { for _, c := range m.subscriptions { close(c) } - m.torrentManager.StopTorrentClient() return nil } @@ -4865,6 +4848,11 @@ func (m *Manager) decryptCommunityDescription(keyIDSeqNo string, d []byte) (*Dec return decryptCommunityResponse, nil } +// GetPersistence returns the instantiated *Persistence used by the Manager +func (m *Manager) GetPersistence() *Persistence { + return m.persistence +} + func ToLinkPreveiwThumbnail(image images.IdentityImage) (*common.LinkPreviewThumbnail, error) { thumbnail := &common.LinkPreviewThumbnail{} diff --git a/protocol/communities/torrent_manager.go b/protocol/communities/torrent_manager.go index 3d14d1960..96454b36a 100644 --- a/protocol/communities/torrent_manager.go +++ b/protocol/communities/torrent_manager.go @@ -2,6 +2,7 @@ package communities import ( "crypto/ecdsa" + "errors" "fmt" "net" "os" @@ -14,7 +15,6 @@ import ( "github.com/anacrolix/torrent/bencode" "github.com/anacrolix/torrent/metainfo" "github.com/golang/protobuf/proto" - "github.com/pkg/errors" "go.uber.org/zap" "github.com/status-im/status-go/eth-node/crypto" @@ -76,7 +76,12 @@ type TorrentManager struct { publisher Publisher } -func NewTorrentManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *TorrentManager { +func NewTorrentManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) (*TorrentManager, error) { + stdoutLogger, err := zap.NewDevelopment() + if err != nil { + return nil, fmt.Errorf("failed to create archive logger %w", err) + } + return &TorrentManager{ torrentConfig: torrentConfig, torrentTasks: make(map[string]metainfo.Hash), @@ -91,7 +96,7 @@ func NewTorrentManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger encryptor: encryptor, publisher: publisher, - } + }, nil } // LogStdout is copied directly from Manager, consider a refactor @@ -203,17 +208,17 @@ func (m *TorrentManager) StartTorrentClient() error { return nil } -func (m *TorrentManager) StopTorrentClient() []error { +func (m *TorrentManager) StopTorrentClient() error { if m.TorrentClientStarted() { m.StopHistoryArchiveTasksIntervals() m.logger.Info("Stopping torrent client") errs := m.torrentClient.Close() if len(errs) > 0 { - return errs + return errors.Join(errs...) } m.torrentClient = nil } - return make([]error, 0) + return nil } func (m *TorrentManager) TorrentClientStarted() bool { diff --git a/protocol/communities_messenger_token_permissions_test.go b/protocol/communities_messenger_token_permissions_test.go index 638931c79..53d14702a 100644 --- a/protocol/communities_messenger_token_permissions_test.go +++ b/protocol/communities_messenger_token_permissions_test.go @@ -2265,12 +2265,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe } // Share archive directory between all users - s.owner.communitiesManager.SetTorrentConfig(&torrentConfig) - s.bob.communitiesManager.SetTorrentConfig(&torrentConfig) + s.owner.torrentManager.SetTorrentConfig(&torrentConfig) + s.bob.torrentManager.SetTorrentConfig(&torrentConfig) s.owner.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} s.bob.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{} - archiveIDs, err := s.owner.communitiesManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted()) + archiveIDs, err := s.owner.torrentManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted()) s.Require().NoError(err) s.Require().Len(archiveIDs, 1) @@ -2302,12 +2302,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe // https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4403 // Ensure owner has archive - archiveIndex, err := s.owner.communitiesManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID()) + archiveIndex, err := s.owner.torrentManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID()) s.Require().NoError(err) s.Require().Len(archiveIndex.Archives, 1) // Ensure bob has archive (because they share same local directory) - archiveIndex, err = s.bob.communitiesManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID()) + archiveIndex, err = s.bob.torrentManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID()) s.Require().NoError(err) s.Require().Len(archiveIndex.Archives, 1) @@ -2315,7 +2315,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe // Save message archive ID as in // https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4325-L4336 - err = s.bob.communitiesManager.SaveMessageArchiveID(community.ID(), archiveHash) + err = s.bob.torrentManager.SaveMessageArchiveID(community.ID(), archiveHash) s.Require().NoError(err) // Import archive diff --git a/protocol/messenger.go b/protocol/messenger.go index edcb397ec..05165e830 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -113,6 +113,7 @@ type Messenger struct { pushNotificationClient *pushnotificationclient.Client pushNotificationServer *pushnotificationserver.Server communitiesManager *communities.Manager + torrentManager *communities.TorrentManager communitiesKeyDistributor communities.KeyDistributor accountsManager account.Manager mentionsManager *MentionManager @@ -492,7 +493,12 @@ func NewMessenger( encryptor: encryptionProtocol, } - communitiesManager, err := communities.NewManager(identity, installationID, database, encryptionProtocol, logger, ensVerifier, c.communityTokensService, transp, transp, communitiesKeyDistributor, c.torrentConfig, managerOptions...) + communitiesManager, err := communities.NewManager(identity, installationID, database, encryptionProtocol, logger, ensVerifier, c.communityTokensService, transp, transp, communitiesKeyDistributor, managerOptions...) + if err != nil { + return nil, err + } + + torrentManager, err := communities.NewTorrentManager(c.torrentConfig, logger, communitiesManager.GetPersistence(), transp, identity, encryptionProtocol, communitiesManager) if err != nil { return nil, err } @@ -527,6 +533,7 @@ func NewMessenger( pushNotificationServer: pushNotificationServer, communitiesManager: communitiesManager, communitiesKeyDistributor: communitiesKeyDistributor, + torrentManager: torrentManager, accountsManager: c.accountsManager, ensVerifier: ensVerifier, featureFlags: c.featureFlags, @@ -572,6 +579,7 @@ func NewMessenger( ensVerifier.Stop, pushNotificationClient.Stop, communitiesManager.Stop, + torrentManager.StopTorrentClient, encryptionProtocol.Stop, func() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second) @@ -911,8 +919,8 @@ func (m *Messenger) handleConnectionChange(online bool) { } // Update Communities manager - if m.communitiesManager != nil { - m.communitiesManager.SetOnline(online) + if m.torrentManager != nil { + m.torrentManager.SetOnline(online) } // Publish contact code @@ -3735,11 +3743,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter importMessagesToSave := messageState.Response.DiscordMessages() if len(importMessagesToSave) > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord messages", len(importMessagesToSave))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d discord messages", len(importMessagesToSave))) m.handleImportMessagesMutex.Lock() err := m.persistence.SaveDiscordMessages(importMessagesToSave) if err != nil { - m.communitiesManager.LogStdout("failed to save discord messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to save discord messages", zap.Error(err)) m.handleImportMessagesMutex.Unlock() return err } @@ -3748,11 +3756,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter messageAttachmentsToSave := messageState.Response.DiscordMessageAttachments() if len(messageAttachmentsToSave) > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord message attachments", len(messageAttachmentsToSave))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d discord message attachments", len(messageAttachmentsToSave))) m.handleImportMessagesMutex.Lock() err := m.persistence.SaveDiscordMessageAttachments(messageAttachmentsToSave) if err != nil { - m.communitiesManager.LogStdout("failed to save discord message attachments", zap.Error(err)) + m.torrentManager.LogStdout("failed to save discord message attachments", zap.Error(err)) m.handleImportMessagesMutex.Unlock() return err } @@ -3761,7 +3769,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter messagesToSave := messageState.Response.Messages() if len(messagesToSave) > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d app messages", len(messagesToSave))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d app messages", len(messagesToSave))) m.handleMessagesMutex.Lock() err := m.SaveMessages(messagesToSave) if err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index c2d60676c..437fab2f8 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -1895,8 +1895,8 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ Shard: community.Shard().Protobuffer(), } - if m.torrentClientReady() && m.communitiesManager.TorrentFileExists(community.IDString()) { - magnetlink, err := m.communitiesManager.GetHistoryArchiveMagnetlink(community.ID()) + if m.torrentClientReady() && m.torrentManager.TorrentFileExists(community.IDString()) { + magnetlink, err := m.torrentManager.GetHistoryArchiveMagnetlink(community.ID()) if err != nil { m.logger.Warn("couldn't get magnet link for community", zap.Error(err)) return nil, err @@ -2082,7 +2082,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon return nil, err } - m.communitiesManager.StopHistoryArchiveTasksInterval(communityID) + m.torrentManager.StopHistoryArchiveTasksInterval(communityID) err = m.syncCommunity(context.Background(), community, m.dispatchMessage) if err != nil { @@ -2464,7 +2464,7 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef } if m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && communitySettings.HistoryArchiveSupportEnabled { - go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) } return response, nil @@ -2806,8 +2806,8 @@ func (m *Messenger) EditCommunity(request *requests.EditCommunity) (*MessengerRe if m.torrentClientReady() { if !communitySettings.HistoryArchiveSupportEnabled { - m.communitiesManager.StopHistoryArchiveTasksInterval(id) - } else if !m.communitiesManager.IsSeedingHistoryArchiveTorrent(id) { + m.torrentManager.StopHistoryArchiveTasksInterval(id) + } else if !m.torrentManager.IsSeedingHistoryArchiveTorrent(id) { var communities []*communities.Community communities = append(communities, community) go m.InitHistoryArchiveTasks(communities) @@ -3779,38 +3779,38 @@ func (m *Messenger) HandleSyncCommunitySettings(messageState *ReceivedMessageSta func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community) { - m.communitiesManager.LogStdout("initializing history archive tasks") + m.torrentManager.LogStdout("initializing history archive tasks") for _, c := range communities { if c.Joined() { settings, err := m.communitiesManager.GetCommunitySettingsByID(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to get community settings", zap.Error(err)) + m.torrentManager.LogStdout("failed to get community settings", zap.Error(err)) continue } if !settings.HistoryArchiveSupportEnabled { - m.communitiesManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString())) + m.torrentManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString())) continue } // Check if there's already a torrent file for this community and seed it - if m.communitiesManager.TorrentFileExists(c.IDString()) { - err = m.communitiesManager.SeedHistoryArchiveTorrent(c.ID()) + if m.torrentManager.TorrentFileExists(c.IDString()) { + err = m.torrentManager.SeedHistoryArchiveTorrent(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to seed history archive", zap.Error(err)) } } - filters, err := m.communitiesManager.GetCommunityChatsFilters(c.ID()) + filters, err := m.torrentManager.GetCommunityChatsFilters(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to get community chats filters for community", zap.Error(err)) + m.torrentManager.LogStdout("failed to get community chats filters for community", zap.Error(err)) continue } if len(filters) == 0 { - m.communitiesManager.LogStdout("no filters or chats for this community starting interval", zap.String("id", c.IDString())) - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + m.torrentManager.LogStdout("no filters or chats for this community starting interval", zap.String("id", c.IDString())) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) continue } @@ -3825,7 +3825,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community // possibly missed since then latestWakuMessageTimestamp, err := m.communitiesManager.GetLatestWakuMessageTimestamp(topics) if err != nil { - m.communitiesManager.LogStdout("failed to get Latest waku message timestamp", zap.Error(err)) + m.torrentManager.LogStdout("failed to get Latest waku message timestamp", zap.Error(err)) continue } @@ -3843,16 +3843,16 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community ms := m.getActiveMailserver(c.ID().String()) _, err = m.syncFiltersFrom(*ms, filters, uint32(latestWakuMessageTimestamp)) if err != nil { - m.communitiesManager.LogStdout("failed to request missing messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to request missing messages", zap.Error(err)) continue } // We figure out the end date of the last created archive and schedule // the interval for creating future archives // If the last end date is at least `interval` ago, we create an archive immediately first - lastArchiveEndDateTimestamp, err := m.communitiesManager.GetHistoryArchivePartitionStartTimestamp(c.ID()) + lastArchiveEndDateTimestamp, err := m.torrentManager.GetHistoryArchivePartitionStartTimestamp(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to get archive partition start timestamp", zap.Error(err)) + m.torrentManager.LogStdout("failed to get archive partition start timestamp", zap.Error(err)) continue } @@ -3863,35 +3863,35 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community if lastArchiveEndDateTimestamp == 0 { // No prior messages to be archived, so we just kick off the archive creation loop // for future archives - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) } else if durationSinceLastArchive < messageArchiveInterval { // Last archive is less than `interval` old, wait until `interval` is complete, // then create archive and kick off archive creation loop for future archives // Seed current archive in the meantime - err := m.communitiesManager.SeedHistoryArchiveTorrent(c.ID()) + err := m.torrentManager.SeedHistoryArchiveTorrent(c.ID()) if err != nil { - m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to seed history archive", zap.Error(err)) } timeToNextInterval := messageArchiveInterval - durationSinceLastArchive - m.communitiesManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval)) + m.torrentManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval)) time.AfterFunc(timeToNextInterval, func() { - err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted()) + err := m.torrentManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted()) if err != nil { - m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) }) } else { // Looks like the last archive was generated more than `interval` // ago, so lets create a new archive now and then schedule the archive // creation loop - err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval, c.Encrypted()) + err := m.torrentManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval, c.Encrypted()) if err != nil { - m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) + m.torrentManager.LogStdout("failed to get create and seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval) } } } @@ -3909,12 +3909,12 @@ func (m *Messenger) enableHistoryArchivesImportAfterDelay() { func (m *Messenger) checkIfIMemberOfCommunity(communityID types.HexBytes) error { community, err := m.communitiesManager.GetByID(communityID) if err != nil { - m.communitiesManager.LogStdout("couldn't get community to import archives", zap.Error(err)) + m.torrentManager.LogStdout("couldn't get community to import archives", zap.Error(err)) return err } if !community.HasMember(&m.identity.PublicKey) { - m.communitiesManager.LogStdout("can't import archives when user not a member of community") + m.torrentManager.LogStdout("can't import archives when user not a member of community") return ErrUserNotMember } @@ -3922,7 +3922,7 @@ func (m *Messenger) checkIfIMemberOfCommunity(communityID types.HexBytes) error } func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) error { - archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID) + archiveIDsToImport, err := m.torrentManager.GetMessageArchiveIDsToImport(communityID) if err != nil { return err } @@ -3936,7 +3936,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro return err } - currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(communityID.String()) + currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(communityID.String()) // no need to resume imports if there's already a task ongoing if currentTask != nil { return nil @@ -3949,7 +3949,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro Cancelled: false, } - m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task) + m.torrentManager.AddHistoryArchiveDownloadTask(communityID.String(), task) // this wait groups tracks the ongoing task for a particular community task.Waiter.Add(1) @@ -3958,7 +3958,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro defer task.Waiter.Done() err := m.importHistoryArchives(communityID, task.CancelChan) if err != nil { - m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err)) + m.torrentManager.LogStdout("failed to import history archives", zap.Error(err)) } m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(communityID)) }() @@ -3997,7 +3997,7 @@ importMessageArchivesLoop: if delayImport { select { case <-ctx.Done(): - m.communitiesManager.LogStdout("interrupted importing history archive messages") + m.torrentManager.LogStdout("interrupted importing history archive messages") return nil case <-time.After(1 * time.Hour): delayImport = false @@ -4006,31 +4006,31 @@ importMessageArchivesLoop: select { case <-ctx.Done(): - m.communitiesManager.LogStdout("interrupted importing history archive messages") + m.torrentManager.LogStdout("interrupted importing history archive messages") return nil case <-importTicker.C: err := m.checkIfIMemberOfCommunity(communityID) if err != nil { break importMessageArchivesLoop } - archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID) + archiveIDsToImport, err := m.torrentManager.GetMessageArchiveIDsToImport(communityID) if err != nil { - m.communitiesManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err)) + m.torrentManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err)) return err } if len(archiveIDsToImport) == 0 { - m.communitiesManager.LogStdout("no message archives to import") + m.torrentManager.LogStdout("no message archives to import") break importMessageArchivesLoop } - m.communitiesManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport))) + m.torrentManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport))) // only process one archive at a time, so in case of cancel we don't // wait for all archives to be processed first downloadedArchiveID := archiveIDsToImport[0] - archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID) + archiveMessages, err := m.torrentManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID) if err != nil { if errors.Is(err, encryption.ErrHashRatchetGroupIDNotFound) { // In case we're missing hash ratchet keys, best we can do is @@ -4038,7 +4038,7 @@ importMessageArchivesLoop: delayImport = true continue } - m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to extract history archive messages", zap.Error(err)) continue } @@ -4047,14 +4047,14 @@ importMessageArchivesLoop: for _, messagesChunk := range chunkSlice(archiveMessages, importMessagesChunkSize) { if err := m.importRateLimiter.Wait(ctx); err != nil { if !errors.Is(err, context.Canceled) { - m.communitiesManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err)) + m.torrentManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err)) } continue importMessageArchivesLoop } response, err := m.handleArchiveMessages(messagesChunk) if err != nil { - m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to handle archive messages", zap.Error(err)) continue importMessageArchivesLoop } @@ -4066,9 +4066,9 @@ importMessageArchivesLoop: } } - err = m.communitiesManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true) + err = m.torrentManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true) if err != nil { - m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err)) + m.torrentManager.LogStdout("failed to mark history message archive as imported", zap.Error(err)) continue } } @@ -4083,7 +4083,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error { return err } - magnetlink, err := m.communitiesManager.GetHistoryArchiveMagnetlink(community.ID()) + magnetlink, err := m.torrentManager.GetHistoryArchiveMagnetlink(community.ID()) if err != nil { return err } @@ -4137,8 +4137,8 @@ func (m *Messenger) EnableCommunityHistoryArchiveProtocol() error { } m.config.torrentConfig = &nodeConfig.TorrentConfig - m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig) - err = m.communitiesManager.StartTorrentClient() + m.torrentManager.SetTorrentConfig(&nodeConfig.TorrentConfig) + err = m.torrentManager.StartTorrentClient() if err != nil { return err } @@ -4167,12 +4167,12 @@ func (m *Messenger) DisableCommunityHistoryArchiveProtocol() error { return nil } - m.communitiesManager.StopTorrentClient() + m.torrentManager.StopTorrentClient() nodeConfig.TorrentConfig.Enabled = false err = m.settings.SaveSetting("node-config", nodeConfig) m.config.torrentConfig = &nodeConfig.TorrentConfig - m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig) + m.torrentManager.SetTorrentConfig(&nodeConfig.TorrentConfig) if err != nil { return err } @@ -4284,7 +4284,7 @@ func (m *Messenger) torrentClientReady() bool { // be instantiated (for example in case of port conflicts) return m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && - m.communitiesManager.TorrentClientStarted() + m.torrentManager.TorrentClientStarted() } func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c *communities.Community) ([]*types.Message, error) { diff --git a/protocol/messenger_communities_import_discord.go b/protocol/messenger_communities_import_discord.go index f9e0f61d9..66dd5cf4f 100644 --- a/protocol/messenger_communities_import_discord.go +++ b/protocol/messenger_communities_import_discord.go @@ -684,7 +684,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC chunksCount := len(discordMessageChunks) for ii, msgs := range discordMessageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) err := m.persistence.SaveDiscordMessages(msgs) if err != nil { m.cleanUpImportChannel(request.CommunityID.String(), newChat.ID) @@ -727,7 +727,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC chunksCount = len(messageChunks) for ii, msgs := range messageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) err := m.persistence.SaveMessages(msgs) if err != nil { m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID) @@ -791,7 +791,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC go func(id string, author *protobuf.DiscordMessageAuthor) { defer wg.Done() - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) if err != nil { errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) @@ -845,7 +845,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC defer wg.Done() for ii, attachment := range attachments { - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) if err != nil { @@ -889,7 +889,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC chunksCount = len(attachmentChunks) for ii, attachments := range attachmentChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) err := m.persistence.SaveDiscordMessageAttachments(attachments) if err != nil { importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error())) @@ -944,7 +944,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC wakuMessages := append(wakuChatMessages, wakuPinMessages...) - topics, err := m.communitiesManager.GetCommunityChatsTopics(request.CommunityID) + topics, err := m.torrentManager.GetCommunityChatsTopics(request.CommunityID) if err != nil { m.logger.Error("failed to get community chat topics", zap.Error(err)) continue @@ -953,7 +953,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) endDate := time.Now() - _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( + _, err = m.torrentManager.CreateHistoryArchiveTorrentFromMessages( request.CommunityID, wakuMessages, topics, @@ -973,11 +973,11 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC } if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { - err = m.communitiesManager.SeedHistoryArchiveTorrent(request.CommunityID) + err = m.torrentManager.SeedHistoryArchiveTorrent(request.CommunityID) if err != nil { m.logger.Error("failed to seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval) } } @@ -1460,7 +1460,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor chunksCount := len(discordMessageChunks) for ii, msgs := range discordMessageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs))) err = m.persistence.SaveDiscordMessages(msgs) if err != nil { m.cleanUpImport(communityID) @@ -1503,7 +1503,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor chunksCount = len(messageChunks) for ii, msgs := range messageChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs))) err = m.persistence.SaveMessages(msgs) if err != nil { m.cleanUpImport(communityID) @@ -1563,7 +1563,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor go func(id string, author *protobuf.DiscordMessageAuthor) { defer wg.Done() - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl) if err != nil { errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error()) @@ -1615,7 +1615,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor defer wg.Done() for ii, attachment := range attachments { - m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount)) assetPayload, contentType, err := discord.DownloadAsset(attachment.Url) if err != nil { @@ -1659,7 +1659,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor chunksCount = len(attachmentChunks) for ii, attachments := range attachmentChunks { - m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) + m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments))) err = m.persistence.SaveDiscordMessageAttachments(attachments) if err != nil { m.cleanUpImport(communityID) @@ -1714,7 +1714,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor wakuMessages := append(wakuChatMessages, wakuPinMessages...) - topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID()) + topics, err := m.torrentManager.GetCommunityChatsTopics(discordCommunity.ID()) if err != nil { m.logger.Error("failed to get community chat topics", zap.Error(err)) continue @@ -1723,7 +1723,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0) endDate := time.Now() - _, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages( + _, err = m.torrentManager.CreateHistoryArchiveTorrentFromMessages( discordCommunity.ID(), wakuMessages, topics, @@ -1739,11 +1739,11 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled { - err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) + err = m.torrentManager.SeedHistoryArchiveTorrent(discordCommunity.ID()) if err != nil { m.logger.Error("failed to seed history archive", zap.Error(err)) } - go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) + go m.torrentManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval) } } diff --git a/protocol/messenger_handler.go b/protocol/messenger_handler.go index dcbadc1f1..fc313b3af 100644 --- a/protocol/messenger_handler.go +++ b/protocol/messenger_handler.go @@ -1356,12 +1356,12 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage // part of and doesn't own the private key at the same time if !community.IsControlNode() && community.Joined() && clock >= lastClock { if lastSeenMagnetlink == magnetlink { - m.communitiesManager.LogStdout("already processed this magnetlink") + m.torrentManager.LogStdout("already processed this magnetlink") return nil } - m.communitiesManager.UnseedHistoryArchiveTorrent(id) - currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(id.String()) + m.torrentManager.UnseedHistoryArchiveTorrent(id) + currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(id.String()) go func(currentTask *communities.HistoryArchiveDownloadTask, communityID types.HexBytes) { @@ -1378,7 +1378,7 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage Cancelled: false, } - m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task) + m.torrentManager.AddHistoryArchiveDownloadTask(communityID.String(), task) // this wait groups tracks the ongoing task for a particular community task.Waiter.Add(1) @@ -1397,32 +1397,32 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage } func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetlink string, cancel chan struct{}) { - downloadTaskInfo, err := m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) + downloadTaskInfo, err := m.torrentManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) if err != nil { logMsg := "failed to download history archive data" if err == communities.ErrTorrentTimedout { - m.communitiesManager.LogStdout("torrent has timed out, trying once more...") - downloadTaskInfo, err = m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) + m.torrentManager.LogStdout("torrent has timed out, trying once more...") + downloadTaskInfo, err = m.torrentManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel) if err != nil { - m.communitiesManager.LogStdout(logMsg, zap.Error(err)) + m.torrentManager.LogStdout(logMsg, zap.Error(err)) return } } else { - m.communitiesManager.LogStdout(logMsg, zap.Error(err)) + m.torrentManager.LogStdout(logMsg, zap.Error(err)) return } } if downloadTaskInfo.Cancelled { if downloadTaskInfo.TotalDownloadedArchivesCount > 0 { - m.communitiesManager.LogStdout(fmt.Sprintf("downloaded %d of %d archives so far", downloadTaskInfo.TotalDownloadedArchivesCount, downloadTaskInfo.TotalArchivesCount)) + m.torrentManager.LogStdout(fmt.Sprintf("downloaded %d of %d archives so far", downloadTaskInfo.TotalDownloadedArchivesCount, downloadTaskInfo.TotalArchivesCount)) } return } err = m.communitiesManager.UpdateLastSeenMagnetlink(id, magnetlink) if err != nil { - m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err)) + m.torrentManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err)) } err = m.checkIfIMemberOfCommunity(id) @@ -1432,7 +1432,7 @@ func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetli err = m.importHistoryArchives(id, cancel) if err != nil { - m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err)) + m.torrentManager.LogStdout("failed to import history archives", zap.Error(err)) m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id)) return } @@ -1475,13 +1475,13 @@ func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessag err := m.handleImportedMessages(importedMessages) if err != nil { - m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err)) + m.torrentManager.LogStdout("failed to handle imported messages", zap.Error(err)) return nil, err } response, err := m.handleRetrievedMessages(otherMessages, false, true) if err != nil { - m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err)) + m.torrentManager.LogStdout("failed to write history archive messages to database", zap.Error(err)) return nil, err } @@ -1730,7 +1730,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS magnetlink := requestToJoinResponseProto.MagnetUri if m.torrentClientReady() && communitySettings != nil && communitySettings.HistoryArchiveSupportEnabled && magnetlink != "" { - currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(community.IDString()) + currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(community.IDString()) go func(currentTask *communities.HistoryArchiveDownloadTask) { // Cancel ongoing download/import task @@ -1744,7 +1744,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS Waiter: *new(sync.WaitGroup), Cancelled: false, } - m.communitiesManager.AddHistoryArchiveDownloadTask(community.IDString(), task) + m.torrentManager.AddHistoryArchiveDownloadTask(community.IDString(), task) task.Waiter.Add(1) defer task.Waiter.Done() diff --git a/services/ext/signal.go b/services/ext/signal.go index f7c62c6e6..9b0903754 100644 --- a/services/ext/signal.go +++ b/services/ext/signal.go @@ -12,7 +12,7 @@ import ( // EnvelopeSignalHandler sends signals when envelope is sent or expired. type EnvelopeSignalHandler struct{} -// EnvelopeSent triggered when envelope delivered atleast to 1 peer. +// EnvelopeSent triggered when envelope delivered at least to 1 peer. func (h EnvelopeSignalHandler) EnvelopeSent(identifiers [][]byte) { signal.SendEnvelopeSent(identifiers) } @@ -51,21 +51,21 @@ func (h PublisherSignalHandler) Stats(stats types.StatsSummary) { signal.SendStats(stats) } -// MessengerSignalHandler sends signals on messenger events +// MessengerSignalsHandler sends signals on messenger events type MessengerSignalsHandler struct{} // MessageDelivered passes information that message was delivered -func (m MessengerSignalsHandler) MessageDelivered(chatID string, messageID string) { +func (m *MessengerSignalsHandler) MessageDelivered(chatID string, messageID string) { signal.SendMessageDelivered(chatID, messageID) } // BackupPerformed passes information that a backup was performed -func (m MessengerSignalsHandler) BackupPerformed(lastBackup uint64) { +func (m *MessengerSignalsHandler) BackupPerformed(lastBackup uint64) { signal.SendBackupPerformed(lastBackup) } -// MessageDelivered passes info about community that was requested before -func (m MessengerSignalsHandler) CommunityInfoFound(community *communities.Community) { +// CommunityInfoFound passes info about community that was requested before +func (m *MessengerSignalsHandler) CommunityInfoFound(community *communities.Community) { signal.SendCommunityInfoFound(community) }