chore(no-torrent)_: I've fully split Manager from TorrentManager

I've removed any mention or dependency of TorrentManager from Manager. There is still more work to do, but Messenger now communicates directly with a TorrentManager rather than asking the communities Manager to handle it. I've ensured that LogStdout() is only called from TorrentManager and removed entirely from Manager, this functionality seems to be some kind of debug tool specifically for torrent related functionality. Next I need to focus on functions within Messenger that call a TorrentManager and see how to isolate these from the main flows, following that I also need fix the tests that are broken. I will also need to refactor torrentClientReady() so that it is a function of TorrentManager, this may allow for pushing more functions into TorrentManager which will lead to better torrent encapsulation.
This commit is contained in:
Samuel Hawksby-Robinson 2024-05-31 15:20:36 +01:00
parent 6c52d0b120
commit 9b458c63e0
8 changed files with 134 additions and 133 deletions

View File

@ -94,11 +94,9 @@ type Manager struct {
tokenManager TokenManager
collectiblesManager CollectiblesManager
logger *zap.Logger
stdoutLogger *zap.Logger
transport *transport.Transport
timesource common.TimeSource
quit chan struct{}
torrentManager *TorrentManager
walletConfig *params.WalletConfig
communityTokensService CommunityTokensServiceInterface
membersReevaluationTasks sync.Map // stores `membersReevaluationTask`
@ -318,7 +316,7 @@ type OwnerVerifier interface {
SafeGetSignerPubKey(ctx context.Context, chainID uint64, communityID string) (string, error)
}
func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, ensverifier *ens.Verifier, ownerVerifier OwnerVerifier, transport *transport.Transport, timesource common.TimeSource, keyDistributor KeyDistributor, torrentConfig *params.TorrentConfig, opts ...ManagerOption) (*Manager, error) {
func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, encryptor *encryption.Protocol, logger *zap.Logger, ensverifier *ens.Verifier, ownerVerifier OwnerVerifier, transport *transport.Transport, timesource common.TimeSource, keyDistributor KeyDistributor, opts ...ManagerOption) (*Manager, error) {
if identity == nil {
return nil, errors.New("empty identity")
}
@ -334,11 +332,6 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e
}
}
stdoutLogger, err := zap.NewDevelopment()
if err != nil {
return nil, errors.Wrap(err, "failed to create archive logger")
}
managerConfig := managerOptions{}
for _, opt := range opts {
opt(&managerConfig)
@ -346,7 +339,6 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e
manager := &Manager{
logger: logger,
stdoutLogger: stdoutLogger,
encryptor: encryptor,
identity: identity,
installationID: installationID,
@ -358,11 +350,10 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e
communityLock: NewCommunityLock(logger),
}
persistence := &Persistence{
manager.persistence = &Persistence{
db: db,
recordBundleToCommunity: manager.dbRecordBundleToCommunity,
}
manager.persistence = persistence
if managerConfig.accountsManager != nil {
manager.accountsManager = managerConfig.accountsManager
@ -407,16 +398,9 @@ func NewManager(identity *ecdsa.PrivateKey, installationID string, db *sql.DB, e
manager.forceMembersReevaluation = make(map[string]chan struct{}, 10)
}
manager.torrentManager = NewTorrentManager(torrentConfig, logger, stdoutLogger, persistence, transport, identity, encryptor, manager)
return manager, nil
}
func (m *Manager) LogStdout(msg string, fields ...zap.Field) {
m.stdoutLogger.Info(msg, fields...)
m.logger.Debug(msg, fields...)
}
type Subscription struct {
Community *Community
CreatingHistoryArchivesSignal *signal.CreatingHistoryArchivesSignal
@ -648,7 +632,6 @@ func (m *Manager) Stop() error {
for _, c := range m.subscriptions {
close(c)
}
m.torrentManager.StopTorrentClient()
return nil
}
@ -4865,6 +4848,11 @@ func (m *Manager) decryptCommunityDescription(keyIDSeqNo string, d []byte) (*Dec
return decryptCommunityResponse, nil
}
// GetPersistence returns the instantiated *Persistence used by the Manager
func (m *Manager) GetPersistence() *Persistence {
return m.persistence
}
func ToLinkPreveiwThumbnail(image images.IdentityImage) (*common.LinkPreviewThumbnail, error) {
thumbnail := &common.LinkPreviewThumbnail{}

View File

@ -2,6 +2,7 @@ package communities
import (
"crypto/ecdsa"
"errors"
"fmt"
"net"
"os"
@ -14,7 +15,6 @@ import (
"github.com/anacrolix/torrent/bencode"
"github.com/anacrolix/torrent/metainfo"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
@ -76,7 +76,12 @@ type TorrentManager struct {
publisher Publisher
}
func NewTorrentManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) *TorrentManager {
func NewTorrentManager(torrentConfig *params.TorrentConfig, logger *zap.Logger, persistence *Persistence, transport *transport.Transport, identity *ecdsa.PrivateKey, encryptor *encryption.Protocol, publisher Publisher) (*TorrentManager, error) {
stdoutLogger, err := zap.NewDevelopment()
if err != nil {
return nil, fmt.Errorf("failed to create archive logger %w", err)
}
return &TorrentManager{
torrentConfig: torrentConfig,
torrentTasks: make(map[string]metainfo.Hash),
@ -91,7 +96,7 @@ func NewTorrentManager(torrentConfig *params.TorrentConfig, logger, stdoutLogger
encryptor: encryptor,
publisher: publisher,
}
}, nil
}
// LogStdout is copied directly from Manager, consider a refactor
@ -203,17 +208,17 @@ func (m *TorrentManager) StartTorrentClient() error {
return nil
}
func (m *TorrentManager) StopTorrentClient() []error {
func (m *TorrentManager) StopTorrentClient() error {
if m.TorrentClientStarted() {
m.StopHistoryArchiveTasksIntervals()
m.logger.Info("Stopping torrent client")
errs := m.torrentClient.Close()
if len(errs) > 0 {
return errs
return errors.Join(errs...)
}
m.torrentClient = nil
}
return make([]error, 0)
return nil
}
func (m *TorrentManager) TorrentClientStarted() bool {

View File

@ -2265,12 +2265,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
}
// Share archive directory between all users
s.owner.communitiesManager.SetTorrentConfig(&torrentConfig)
s.bob.communitiesManager.SetTorrentConfig(&torrentConfig)
s.owner.torrentManager.SetTorrentConfig(&torrentConfig)
s.bob.torrentManager.SetTorrentConfig(&torrentConfig)
s.owner.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{}
s.bob.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{}
archiveIDs, err := s.owner.communitiesManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted())
archiveIDs, err := s.owner.torrentManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted())
s.Require().NoError(err)
s.Require().Len(archiveIDs, 1)
@ -2302,12 +2302,12 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
// https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4403
// Ensure owner has archive
archiveIndex, err := s.owner.communitiesManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID())
archiveIndex, err := s.owner.torrentManager.LoadHistoryArchiveIndexFromFile(s.owner.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(archiveIndex.Archives, 1)
// Ensure bob has archive (because they share same local directory)
archiveIndex, err = s.bob.communitiesManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID())
archiveIndex, err = s.bob.torrentManager.LoadHistoryArchiveIndexFromFile(s.bob.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(archiveIndex.Archives, 1)
@ -2315,7 +2315,7 @@ func (s *MessengerCommunitiesTokenPermissionsSuite) TestImportDecryptedArchiveMe
// Save message archive ID as in
// https://github.com/status-im/status-go/blob/6c82a6c2be7ebed93bcae3b9cf5053da3820de50/protocol/communities/manager.go#L4325-L4336
err = s.bob.communitiesManager.SaveMessageArchiveID(community.ID(), archiveHash)
err = s.bob.torrentManager.SaveMessageArchiveID(community.ID(), archiveHash)
s.Require().NoError(err)
// Import archive

View File

@ -113,6 +113,7 @@ type Messenger struct {
pushNotificationClient *pushnotificationclient.Client
pushNotificationServer *pushnotificationserver.Server
communitiesManager *communities.Manager
torrentManager *communities.TorrentManager
communitiesKeyDistributor communities.KeyDistributor
accountsManager account.Manager
mentionsManager *MentionManager
@ -492,7 +493,12 @@ func NewMessenger(
encryptor: encryptionProtocol,
}
communitiesManager, err := communities.NewManager(identity, installationID, database, encryptionProtocol, logger, ensVerifier, c.communityTokensService, transp, transp, communitiesKeyDistributor, c.torrentConfig, managerOptions...)
communitiesManager, err := communities.NewManager(identity, installationID, database, encryptionProtocol, logger, ensVerifier, c.communityTokensService, transp, transp, communitiesKeyDistributor, managerOptions...)
if err != nil {
return nil, err
}
torrentManager, err := communities.NewTorrentManager(c.torrentConfig, logger, communitiesManager.GetPersistence(), transp, identity, encryptionProtocol, communitiesManager)
if err != nil {
return nil, err
}
@ -527,6 +533,7 @@ func NewMessenger(
pushNotificationServer: pushNotificationServer,
communitiesManager: communitiesManager,
communitiesKeyDistributor: communitiesKeyDistributor,
torrentManager: torrentManager,
accountsManager: c.accountsManager,
ensVerifier: ensVerifier,
featureFlags: c.featureFlags,
@ -572,6 +579,7 @@ func NewMessenger(
ensVerifier.Stop,
pushNotificationClient.Stop,
communitiesManager.Stop,
torrentManager.StopTorrentClient,
encryptionProtocol.Stop,
func() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@ -911,8 +919,8 @@ func (m *Messenger) handleConnectionChange(online bool) {
}
// Update Communities manager
if m.communitiesManager != nil {
m.communitiesManager.SetOnline(online)
if m.torrentManager != nil {
m.torrentManager.SetOnline(online)
}
// Publish contact code
@ -3735,11 +3743,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
importMessagesToSave := messageState.Response.DiscordMessages()
if len(importMessagesToSave) > 0 {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord messages", len(importMessagesToSave)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d discord messages", len(importMessagesToSave)))
m.handleImportMessagesMutex.Lock()
err := m.persistence.SaveDiscordMessages(importMessagesToSave)
if err != nil {
m.communitiesManager.LogStdout("failed to save discord messages", zap.Error(err))
m.torrentManager.LogStdout("failed to save discord messages", zap.Error(err))
m.handleImportMessagesMutex.Unlock()
return err
}
@ -3748,11 +3756,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
messageAttachmentsToSave := messageState.Response.DiscordMessageAttachments()
if len(messageAttachmentsToSave) > 0 {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord message attachments", len(messageAttachmentsToSave)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d discord message attachments", len(messageAttachmentsToSave)))
m.handleImportMessagesMutex.Lock()
err := m.persistence.SaveDiscordMessageAttachments(messageAttachmentsToSave)
if err != nil {
m.communitiesManager.LogStdout("failed to save discord message attachments", zap.Error(err))
m.torrentManager.LogStdout("failed to save discord message attachments", zap.Error(err))
m.handleImportMessagesMutex.Unlock()
return err
}
@ -3761,7 +3769,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
messagesToSave := messageState.Response.Messages()
if len(messagesToSave) > 0 {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d app messages", len(messagesToSave)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d app messages", len(messagesToSave)))
m.handleMessagesMutex.Lock()
err := m.SaveMessages(messagesToSave)
if err != nil {

View File

@ -1895,8 +1895,8 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ
Shard: community.Shard().Protobuffer(),
}
if m.torrentClientReady() && m.communitiesManager.TorrentFileExists(community.IDString()) {
magnetlink, err := m.communitiesManager.GetHistoryArchiveMagnetlink(community.ID())
if m.torrentClientReady() && m.torrentManager.TorrentFileExists(community.IDString()) {
magnetlink, err := m.torrentManager.GetHistoryArchiveMagnetlink(community.ID())
if err != nil {
m.logger.Warn("couldn't get magnet link for community", zap.Error(err))
return nil, err
@ -2082,7 +2082,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon
return nil, err
}
m.communitiesManager.StopHistoryArchiveTasksInterval(communityID)
m.torrentManager.StopHistoryArchiveTasksInterval(communityID)
err = m.syncCommunity(context.Background(), community, m.dispatchMessage)
if err != nil {
@ -2464,7 +2464,7 @@ func (m *Messenger) CreateCommunity(request *requests.CreateCommunity, createDef
}
if m.config.torrentConfig != nil && m.config.torrentConfig.Enabled && communitySettings.HistoryArchiveSupportEnabled {
go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval)
go m.torrentManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval)
}
return response, nil
@ -2806,8 +2806,8 @@ func (m *Messenger) EditCommunity(request *requests.EditCommunity) (*MessengerRe
if m.torrentClientReady() {
if !communitySettings.HistoryArchiveSupportEnabled {
m.communitiesManager.StopHistoryArchiveTasksInterval(id)
} else if !m.communitiesManager.IsSeedingHistoryArchiveTorrent(id) {
m.torrentManager.StopHistoryArchiveTasksInterval(id)
} else if !m.torrentManager.IsSeedingHistoryArchiveTorrent(id) {
var communities []*communities.Community
communities = append(communities, community)
go m.InitHistoryArchiveTasks(communities)
@ -3779,38 +3779,38 @@ func (m *Messenger) HandleSyncCommunitySettings(messageState *ReceivedMessageSta
func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community) {
m.communitiesManager.LogStdout("initializing history archive tasks")
m.torrentManager.LogStdout("initializing history archive tasks")
for _, c := range communities {
if c.Joined() {
settings, err := m.communitiesManager.GetCommunitySettingsByID(c.ID())
if err != nil {
m.communitiesManager.LogStdout("failed to get community settings", zap.Error(err))
m.torrentManager.LogStdout("failed to get community settings", zap.Error(err))
continue
}
if !settings.HistoryArchiveSupportEnabled {
m.communitiesManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString()))
m.torrentManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString()))
continue
}
// Check if there's already a torrent file for this community and seed it
if m.communitiesManager.TorrentFileExists(c.IDString()) {
err = m.communitiesManager.SeedHistoryArchiveTorrent(c.ID())
if m.torrentManager.TorrentFileExists(c.IDString()) {
err = m.torrentManager.SeedHistoryArchiveTorrent(c.ID())
if err != nil {
m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err))
m.torrentManager.LogStdout("failed to seed history archive", zap.Error(err))
}
}
filters, err := m.communitiesManager.GetCommunityChatsFilters(c.ID())
filters, err := m.torrentManager.GetCommunityChatsFilters(c.ID())
if err != nil {
m.communitiesManager.LogStdout("failed to get community chats filters for community", zap.Error(err))
m.torrentManager.LogStdout("failed to get community chats filters for community", zap.Error(err))
continue
}
if len(filters) == 0 {
m.communitiesManager.LogStdout("no filters or chats for this community starting interval", zap.String("id", c.IDString()))
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
m.torrentManager.LogStdout("no filters or chats for this community starting interval", zap.String("id", c.IDString()))
go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
continue
}
@ -3825,7 +3825,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
// possibly missed since then
latestWakuMessageTimestamp, err := m.communitiesManager.GetLatestWakuMessageTimestamp(topics)
if err != nil {
m.communitiesManager.LogStdout("failed to get Latest waku message timestamp", zap.Error(err))
m.torrentManager.LogStdout("failed to get Latest waku message timestamp", zap.Error(err))
continue
}
@ -3843,16 +3843,16 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
ms := m.getActiveMailserver(c.ID().String())
_, err = m.syncFiltersFrom(*ms, filters, uint32(latestWakuMessageTimestamp))
if err != nil {
m.communitiesManager.LogStdout("failed to request missing messages", zap.Error(err))
m.torrentManager.LogStdout("failed to request missing messages", zap.Error(err))
continue
}
// We figure out the end date of the last created archive and schedule
// the interval for creating future archives
// If the last end date is at least `interval` ago, we create an archive immediately first
lastArchiveEndDateTimestamp, err := m.communitiesManager.GetHistoryArchivePartitionStartTimestamp(c.ID())
lastArchiveEndDateTimestamp, err := m.torrentManager.GetHistoryArchivePartitionStartTimestamp(c.ID())
if err != nil {
m.communitiesManager.LogStdout("failed to get archive partition start timestamp", zap.Error(err))
m.torrentManager.LogStdout("failed to get archive partition start timestamp", zap.Error(err))
continue
}
@ -3863,35 +3863,35 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
if lastArchiveEndDateTimestamp == 0 {
// No prior messages to be archived, so we just kick off the archive creation loop
// for future archives
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
} else if durationSinceLastArchive < messageArchiveInterval {
// Last archive is less than `interval` old, wait until `interval` is complete,
// then create archive and kick off archive creation loop for future archives
// Seed current archive in the meantime
err := m.communitiesManager.SeedHistoryArchiveTorrent(c.ID())
err := m.torrentManager.SeedHistoryArchiveTorrent(c.ID())
if err != nil {
m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err))
m.torrentManager.LogStdout("failed to seed history archive", zap.Error(err))
}
timeToNextInterval := messageArchiveInterval - durationSinceLastArchive
m.communitiesManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
m.torrentManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
time.AfterFunc(timeToNextInterval, func() {
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted())
err := m.torrentManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted())
if err != nil {
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
m.torrentManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
})
} else {
// Looks like the last archive was generated more than `interval`
// ago, so lets create a new archive now and then schedule the archive
// creation loop
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval, c.Encrypted())
err := m.torrentManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval, c.Encrypted())
if err != nil {
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
m.torrentManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
go m.torrentManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
}
}
}
@ -3909,12 +3909,12 @@ func (m *Messenger) enableHistoryArchivesImportAfterDelay() {
func (m *Messenger) checkIfIMemberOfCommunity(communityID types.HexBytes) error {
community, err := m.communitiesManager.GetByID(communityID)
if err != nil {
m.communitiesManager.LogStdout("couldn't get community to import archives", zap.Error(err))
m.torrentManager.LogStdout("couldn't get community to import archives", zap.Error(err))
return err
}
if !community.HasMember(&m.identity.PublicKey) {
m.communitiesManager.LogStdout("can't import archives when user not a member of community")
m.torrentManager.LogStdout("can't import archives when user not a member of community")
return ErrUserNotMember
}
@ -3922,7 +3922,7 @@ func (m *Messenger) checkIfIMemberOfCommunity(communityID types.HexBytes) error
}
func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) error {
archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID)
archiveIDsToImport, err := m.torrentManager.GetMessageArchiveIDsToImport(communityID)
if err != nil {
return err
}
@ -3936,7 +3936,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro
return err
}
currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(communityID.String())
currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(communityID.String())
// no need to resume imports if there's already a task ongoing
if currentTask != nil {
return nil
@ -3949,7 +3949,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro
Cancelled: false,
}
m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
m.torrentManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
// this wait groups tracks the ongoing task for a particular community
task.Waiter.Add(1)
@ -3958,7 +3958,7 @@ func (m *Messenger) resumeHistoryArchivesImport(communityID types.HexBytes) erro
defer task.Waiter.Done()
err := m.importHistoryArchives(communityID, task.CancelChan)
if err != nil {
m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err))
m.torrentManager.LogStdout("failed to import history archives", zap.Error(err))
}
m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(communityID))
}()
@ -3997,7 +3997,7 @@ importMessageArchivesLoop:
if delayImport {
select {
case <-ctx.Done():
m.communitiesManager.LogStdout("interrupted importing history archive messages")
m.torrentManager.LogStdout("interrupted importing history archive messages")
return nil
case <-time.After(1 * time.Hour):
delayImport = false
@ -4006,31 +4006,31 @@ importMessageArchivesLoop:
select {
case <-ctx.Done():
m.communitiesManager.LogStdout("interrupted importing history archive messages")
m.torrentManager.LogStdout("interrupted importing history archive messages")
return nil
case <-importTicker.C:
err := m.checkIfIMemberOfCommunity(communityID)
if err != nil {
break importMessageArchivesLoop
}
archiveIDsToImport, err := m.communitiesManager.GetMessageArchiveIDsToImport(communityID)
archiveIDsToImport, err := m.torrentManager.GetMessageArchiveIDsToImport(communityID)
if err != nil {
m.communitiesManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err))
m.torrentManager.LogStdout("couldn't get message archive IDs to import", zap.Error(err))
return err
}
if len(archiveIDsToImport) == 0 {
m.communitiesManager.LogStdout("no message archives to import")
m.torrentManager.LogStdout("no message archives to import")
break importMessageArchivesLoop
}
m.communitiesManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport)))
m.torrentManager.LogStdout("importing message archive", zap.Int("left", len(archiveIDsToImport)))
// only process one archive at a time, so in case of cancel we don't
// wait for all archives to be processed first
downloadedArchiveID := archiveIDsToImport[0]
archiveMessages, err := m.communitiesManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID)
archiveMessages, err := m.torrentManager.ExtractMessagesFromHistoryArchive(communityID, downloadedArchiveID)
if err != nil {
if errors.Is(err, encryption.ErrHashRatchetGroupIDNotFound) {
// In case we're missing hash ratchet keys, best we can do is
@ -4038,7 +4038,7 @@ importMessageArchivesLoop:
delayImport = true
continue
}
m.communitiesManager.LogStdout("failed to extract history archive messages", zap.Error(err))
m.torrentManager.LogStdout("failed to extract history archive messages", zap.Error(err))
continue
}
@ -4047,14 +4047,14 @@ importMessageArchivesLoop:
for _, messagesChunk := range chunkSlice(archiveMessages, importMessagesChunkSize) {
if err := m.importRateLimiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
m.communitiesManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err))
m.torrentManager.LogStdout("rate limiter error when handling archive messages", zap.Error(err))
}
continue importMessageArchivesLoop
}
response, err := m.handleArchiveMessages(messagesChunk)
if err != nil {
m.communitiesManager.LogStdout("failed to handle archive messages", zap.Error(err))
m.torrentManager.LogStdout("failed to handle archive messages", zap.Error(err))
continue importMessageArchivesLoop
}
@ -4066,9 +4066,9 @@ importMessageArchivesLoop:
}
}
err = m.communitiesManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true)
err = m.torrentManager.SetMessageArchiveIDImported(communityID, downloadedArchiveID, true)
if err != nil {
m.communitiesManager.LogStdout("failed to mark history message archive as imported", zap.Error(err))
m.torrentManager.LogStdout("failed to mark history message archive as imported", zap.Error(err))
continue
}
}
@ -4083,7 +4083,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error {
return err
}
magnetlink, err := m.communitiesManager.GetHistoryArchiveMagnetlink(community.ID())
magnetlink, err := m.torrentManager.GetHistoryArchiveMagnetlink(community.ID())
if err != nil {
return err
}
@ -4137,8 +4137,8 @@ func (m *Messenger) EnableCommunityHistoryArchiveProtocol() error {
}
m.config.torrentConfig = &nodeConfig.TorrentConfig
m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig)
err = m.communitiesManager.StartTorrentClient()
m.torrentManager.SetTorrentConfig(&nodeConfig.TorrentConfig)
err = m.torrentManager.StartTorrentClient()
if err != nil {
return err
}
@ -4167,12 +4167,12 @@ func (m *Messenger) DisableCommunityHistoryArchiveProtocol() error {
return nil
}
m.communitiesManager.StopTorrentClient()
m.torrentManager.StopTorrentClient()
nodeConfig.TorrentConfig.Enabled = false
err = m.settings.SaveSetting("node-config", nodeConfig)
m.config.torrentConfig = &nodeConfig.TorrentConfig
m.communitiesManager.SetTorrentConfig(&nodeConfig.TorrentConfig)
m.torrentManager.SetTorrentConfig(&nodeConfig.TorrentConfig)
if err != nil {
return err
}
@ -4284,7 +4284,7 @@ func (m *Messenger) torrentClientReady() bool {
// be instantiated (for example in case of port conflicts)
return m.config.torrentConfig != nil &&
m.config.torrentConfig.Enabled &&
m.communitiesManager.TorrentClientStarted()
m.torrentManager.TorrentClientStarted()
}
func (m *Messenger) chatMessagesToWakuMessages(chatMessages []*common.Message, c *communities.Community) ([]*types.Message, error) {

View File

@ -684,7 +684,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
chunksCount := len(discordMessageChunks)
for ii, msgs := range discordMessageChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs)))
err := m.persistence.SaveDiscordMessages(msgs)
if err != nil {
m.cleanUpImportChannel(request.CommunityID.String(), newChat.ID)
@ -727,7 +727,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
chunksCount = len(messageChunks)
for ii, msgs := range messageChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs)))
err := m.persistence.SaveMessages(msgs)
if err != nil {
m.cleanUpImportChannel(request.CommunityID.String(), request.DiscordChannelID)
@ -791,7 +791,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
go func(id string, author *protobuf.DiscordMessageAuthor) {
defer wg.Done()
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl)
if err != nil {
errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error())
@ -845,7 +845,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
defer wg.Done()
for ii, attachment := range attachments {
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
assetPayload, contentType, err := discord.DownloadAsset(attachment.Url)
if err != nil {
@ -889,7 +889,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
chunksCount = len(attachmentChunks)
for ii, attachments := range attachmentChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments)))
err := m.persistence.SaveDiscordMessageAttachments(attachments)
if err != nil {
importProgress.AddTaskError(discord.DownloadAssetsTask, discord.Warning(err.Error()))
@ -944,7 +944,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
wakuMessages := append(wakuChatMessages, wakuPinMessages...)
topics, err := m.communitiesManager.GetCommunityChatsTopics(request.CommunityID)
topics, err := m.torrentManager.GetCommunityChatsTopics(request.CommunityID)
if err != nil {
m.logger.Error("failed to get community chat topics", zap.Error(err))
continue
@ -953,7 +953,7 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0)
endDate := time.Now()
_, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages(
_, err = m.torrentManager.CreateHistoryArchiveTorrentFromMessages(
request.CommunityID,
wakuMessages,
topics,
@ -973,11 +973,11 @@ func (m *Messenger) RequestImportDiscordChannel(request *requests.ImportDiscordC
}
if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled {
err = m.communitiesManager.SeedHistoryArchiveTorrent(request.CommunityID)
err = m.torrentManager.SeedHistoryArchiveTorrent(request.CommunityID)
if err != nil {
m.logger.Error("failed to seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval)
go m.torrentManager.StartHistoryArchiveTasksInterval(community, messageArchiveInterval)
}
}
@ -1460,7 +1460,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
chunksCount := len(discordMessageChunks)
for ii, msgs := range discordMessageChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", ii+1, chunksCount, len(msgs)))
err = m.persistence.SaveDiscordMessages(msgs)
if err != nil {
m.cleanUpImport(communityID)
@ -1503,7 +1503,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
chunksCount = len(messageChunks)
for ii, msgs := range messageChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", ii+1, chunksCount, len(msgs)))
err = m.persistence.SaveMessages(msgs)
if err != nil {
m.cleanUpImport(communityID)
@ -1563,7 +1563,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
go func(id string, author *protobuf.DiscordMessageAuthor) {
defer wg.Done()
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
imagePayload, err := discord.DownloadAvatarAsset(author.AvatarUrl)
if err != nil {
errmsg := fmt.Sprintf("Couldn't download profile avatar '%s': %s", author.AvatarUrl, err.Error())
@ -1615,7 +1615,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
defer wg.Done()
for ii, attachment := range attachments {
m.communitiesManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
m.torrentManager.LogStdout(fmt.Sprintf("downloading asset %d/%d", assetCounter.Value()+1, totalAssetsCount))
assetPayload, contentType, err := discord.DownloadAsset(attachment.Url)
if err != nil {
@ -1659,7 +1659,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
chunksCount = len(attachmentChunks)
for ii, attachments := range attachmentChunks {
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments)))
m.torrentManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", ii+1, chunksCount, len(attachments)))
err = m.persistence.SaveDiscordMessageAttachments(attachments)
if err != nil {
m.cleanUpImport(communityID)
@ -1714,7 +1714,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
wakuMessages := append(wakuChatMessages, wakuPinMessages...)
topics, err := m.communitiesManager.GetCommunityChatsTopics(discordCommunity.ID())
topics, err := m.torrentManager.GetCommunityChatsTopics(discordCommunity.ID())
if err != nil {
m.logger.Error("failed to get community chat topics", zap.Error(err))
continue
@ -1723,7 +1723,7 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
startDate := time.Unix(int64(exportData.OldestMessageTimestamp), 0)
endDate := time.Now()
_, err = m.communitiesManager.CreateHistoryArchiveTorrentFromMessages(
_, err = m.torrentManager.CreateHistoryArchiveTorrentFromMessages(
discordCommunity.ID(),
wakuMessages,
topics,
@ -1739,11 +1739,11 @@ func (m *Messenger) RequestImportDiscordCommunity(request *requests.ImportDiscor
if m.torrentClientReady() && communitySettings.HistoryArchiveSupportEnabled {
err = m.communitiesManager.SeedHistoryArchiveTorrent(discordCommunity.ID())
err = m.torrentManager.SeedHistoryArchiveTorrent(discordCommunity.ID())
if err != nil {
m.logger.Error("failed to seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval)
go m.torrentManager.StartHistoryArchiveTasksInterval(discordCommunity, messageArchiveInterval)
}
}

View File

@ -1356,12 +1356,12 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage
// part of and doesn't own the private key at the same time
if !community.IsControlNode() && community.Joined() && clock >= lastClock {
if lastSeenMagnetlink == magnetlink {
m.communitiesManager.LogStdout("already processed this magnetlink")
m.torrentManager.LogStdout("already processed this magnetlink")
return nil
}
m.communitiesManager.UnseedHistoryArchiveTorrent(id)
currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(id.String())
m.torrentManager.UnseedHistoryArchiveTorrent(id)
currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(id.String())
go func(currentTask *communities.HistoryArchiveDownloadTask, communityID types.HexBytes) {
@ -1378,7 +1378,7 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage
Cancelled: false,
}
m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
m.torrentManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
// this wait groups tracks the ongoing task for a particular community
task.Waiter.Add(1)
@ -1397,32 +1397,32 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage
}
func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetlink string, cancel chan struct{}) {
downloadTaskInfo, err := m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel)
downloadTaskInfo, err := m.torrentManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel)
if err != nil {
logMsg := "failed to download history archive data"
if err == communities.ErrTorrentTimedout {
m.communitiesManager.LogStdout("torrent has timed out, trying once more...")
downloadTaskInfo, err = m.communitiesManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel)
m.torrentManager.LogStdout("torrent has timed out, trying once more...")
downloadTaskInfo, err = m.torrentManager.DownloadHistoryArchivesByMagnetlink(id, magnetlink, cancel)
if err != nil {
m.communitiesManager.LogStdout(logMsg, zap.Error(err))
m.torrentManager.LogStdout(logMsg, zap.Error(err))
return
}
} else {
m.communitiesManager.LogStdout(logMsg, zap.Error(err))
m.torrentManager.LogStdout(logMsg, zap.Error(err))
return
}
}
if downloadTaskInfo.Cancelled {
if downloadTaskInfo.TotalDownloadedArchivesCount > 0 {
m.communitiesManager.LogStdout(fmt.Sprintf("downloaded %d of %d archives so far", downloadTaskInfo.TotalDownloadedArchivesCount, downloadTaskInfo.TotalArchivesCount))
m.torrentManager.LogStdout(fmt.Sprintf("downloaded %d of %d archives so far", downloadTaskInfo.TotalDownloadedArchivesCount, downloadTaskInfo.TotalArchivesCount))
}
return
}
err = m.communitiesManager.UpdateLastSeenMagnetlink(id, magnetlink)
if err != nil {
m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err))
m.torrentManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err))
}
err = m.checkIfIMemberOfCommunity(id)
@ -1432,7 +1432,7 @@ func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetli
err = m.importHistoryArchives(id, cancel)
if err != nil {
m.communitiesManager.LogStdout("failed to import history archives", zap.Error(err))
m.torrentManager.LogStdout("failed to import history archives", zap.Error(err))
m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id))
return
}
@ -1475,13 +1475,13 @@ func (m *Messenger) handleArchiveMessages(archiveMessages []*protobuf.WakuMessag
err := m.handleImportedMessages(importedMessages)
if err != nil {
m.communitiesManager.LogStdout("failed to handle imported messages", zap.Error(err))
m.torrentManager.LogStdout("failed to handle imported messages", zap.Error(err))
return nil, err
}
response, err := m.handleRetrievedMessages(otherMessages, false, true)
if err != nil {
m.communitiesManager.LogStdout("failed to write history archive messages to database", zap.Error(err))
m.torrentManager.LogStdout("failed to write history archive messages to database", zap.Error(err))
return nil, err
}
@ -1730,7 +1730,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
magnetlink := requestToJoinResponseProto.MagnetUri
if m.torrentClientReady() && communitySettings != nil && communitySettings.HistoryArchiveSupportEnabled && magnetlink != "" {
currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(community.IDString())
currentTask := m.torrentManager.GetHistoryArchiveDownloadTask(community.IDString())
go func(currentTask *communities.HistoryArchiveDownloadTask) {
// Cancel ongoing download/import task
@ -1744,7 +1744,7 @@ func (m *Messenger) HandleCommunityRequestToJoinResponse(state *ReceivedMessageS
Waiter: *new(sync.WaitGroup),
Cancelled: false,
}
m.communitiesManager.AddHistoryArchiveDownloadTask(community.IDString(), task)
m.torrentManager.AddHistoryArchiveDownloadTask(community.IDString(), task)
task.Waiter.Add(1)
defer task.Waiter.Done()

View File

@ -12,7 +12,7 @@ import (
// EnvelopeSignalHandler sends signals when envelope is sent or expired.
type EnvelopeSignalHandler struct{}
// EnvelopeSent triggered when envelope delivered atleast to 1 peer.
// EnvelopeSent triggered when envelope delivered at least to 1 peer.
func (h EnvelopeSignalHandler) EnvelopeSent(identifiers [][]byte) {
signal.SendEnvelopeSent(identifiers)
}
@ -51,21 +51,21 @@ func (h PublisherSignalHandler) Stats(stats types.StatsSummary) {
signal.SendStats(stats)
}
// MessengerSignalHandler sends signals on messenger events
// MessengerSignalsHandler sends signals on messenger events
type MessengerSignalsHandler struct{}
// MessageDelivered passes information that message was delivered
func (m MessengerSignalsHandler) MessageDelivered(chatID string, messageID string) {
func (m *MessengerSignalsHandler) MessageDelivered(chatID string, messageID string) {
signal.SendMessageDelivered(chatID, messageID)
}
// BackupPerformed passes information that a backup was performed
func (m MessengerSignalsHandler) BackupPerformed(lastBackup uint64) {
func (m *MessengerSignalsHandler) BackupPerformed(lastBackup uint64) {
signal.SendBackupPerformed(lastBackup)
}
// MessageDelivered passes info about community that was requested before
func (m MessengerSignalsHandler) CommunityInfoFound(community *communities.Community) {
// CommunityInfoFound passes info about community that was requested before
func (m *MessengerSignalsHandler) CommunityInfoFound(community *communities.Community) {
signal.SendCommunityInfoFound(community)
}