From 4bddfbd466a109cc616e711a371e2c553fd7075c Mon Sep 17 00:00:00 2001 From: Pascal Precht <445106+PascalPrecht@users.noreply.github.com> Date: Fri, 9 Dec 2022 10:37:04 +0100 Subject: [PATCH] chore: better community archive protocol logs --- protocol/communities/manager.go | 8 ++++++-- protocol/messenger.go | 10 +++++++++- protocol/messenger_communities.go | 9 ++++++--- 3 files changed, 21 insertions(+), 6 deletions(-) diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index ecfb7e90a..6beecf22a 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -158,7 +158,9 @@ func (m *Manager) Start() error { if m.torrentConfig != nil && m.torrentConfig.Enabled { err := m.StartTorrentClient() - m.logger.Warn("couldn't start torrent client", zap.Error(err)) + if err != nil { + m.LogStdout("couldn't start torrent client", zap.Error(err)) + } } return nil @@ -2198,7 +2200,8 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes startIndex := int(metadata.Offset) / pieceLength endIndex := startIndex + int(metadata.Size)/pieceLength - m.LogStdout("downloading data for message archive", zap.String("hash", hash)) + downloadMsg := fmt.Sprintf("downloading data for message archive (%d/%d)", downloadTaskInfo.TotalDownloadedArchivesCount+1, downloadTaskInfo.TotalArchivesCount) + m.LogStdout(downloadMsg, zap.String("hash", hash)) m.LogStdout("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1)) torrent.DownloadPieces(startIndex, endIndex) @@ -2272,6 +2275,7 @@ func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes, messages := make(map[transport.Filter][]*types.Message) for _, hash := range archiveIDs { + m.LogStdout("extracting messages from history archive", zap.String("archive id", hash)) metadata := index.Archives[hash] archive := &protobuf.WakuMessageArchive{} diff --git a/protocol/messenger.go b/protocol/messenger.go index d8db49362..cfa52f01b 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -3072,7 +3072,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter } for _, msg := range statusMessages { - logger := logger.With(zap.String("message-id", msg.ID.String())) + logger := logger.With(zap.String("message-id", msg.TransportMessage.ThirdPartyID)) logger.Debug("processing message") publicKey := msg.SigPubKey() @@ -3141,8 +3141,10 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter importMessagesCount := len(importMessagesToSave) if importMessagesCount > 0 { if importMessagesCount <= maxChunkSizeMessages { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord messages", importMessagesCount)) err := m.persistence.SaveDiscordMessages(importMessagesToSave) if err != nil { + m.communitiesManager.LogStdout("failed to save discord messages", zap.Error(err)) return err } } else { @@ -3151,12 +3153,14 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter chunks := chunkSlice(importMessagesToSave, maxChunkSizeMessages) chunksCount := len(chunks) for i, msgs := range chunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", i+1, chunksCount, len(msgs))) // We can't defer Unlock here because we want to // unlock after every iteration to leave room for // other processes to access the database m.handleMessagesMutex.Lock() err := m.persistence.SaveDiscordMessages(msgs) if err != nil { + m.communitiesManager.LogStdout(fmt.Sprintf("failed to save discord message chunk %d of %d", i+1, chunksCount), zap.Error(err)) m.handleMessagesMutex.Unlock() return err } @@ -3174,9 +3178,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter chunks := chunkAttachmentsByByteSize(messageAttachmentsToSave, maxChunkSizeBytes) chunksCount := len(chunks) for i, attachments := range chunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", i+1, chunksCount, len(attachments))) m.handleMessagesMutex.Lock() err := m.persistence.SaveDiscordMessageAttachments(attachments) if err != nil { + m.communitiesManager.LogStdout(fmt.Sprintf("failed to save discord message attachments chunk %d of %d", i+1, chunksCount), zap.Error(err)) m.handleMessagesMutex.Unlock() return err } @@ -3192,6 +3198,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter messagesCount := len(messagesToSave) if messagesCount > 0 { if messagesCount <= maxChunkSizeMessages { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d app messages", messagesCount)) err := m.SaveMessages(messagesToSave) if err != nil { return err @@ -3200,6 +3207,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter chunks := chunkSlice(messagesToSave, maxChunkSizeMessages) chunksCount := len(chunks) for i, msgs := range chunks { + m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", i+1, chunksCount, len(msgs))) m.handleMessagesMutex.Lock() err := m.SaveMessages(msgs) if err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index d0be8ff6f..c05414e69 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -1853,15 +1853,18 @@ func (m *Messenger) handleSyncCommunitySettings(messageState *ReceivedMessageSta func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community) { + m.communitiesManager.LogStdout("initializing history archive tasks") + for _, c := range communities { if c.Joined() { settings, err := m.communitiesManager.GetCommunitySettingsByID(c.ID()) if err != nil { - m.logger.Debug("failed to get community settings", zap.Error(err)) + m.communitiesManager.LogStdout("failed to get community settings", zap.Error(err)) continue } if !settings.HistoryArchiveSupportEnabled { + m.communitiesManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString())) continue } @@ -1875,7 +1878,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community filters, err := m.communitiesManager.GetCommunityChatsFilters(c.ID()) if err != nil { - m.logger.Debug("failed to get community chats filters", zap.Error(err)) + m.communitiesManager.LogStdout("failed to get community chats filters for community", zap.Error(err)) continue } @@ -1944,7 +1947,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community } timeToNextInterval := messageArchiveInterval - durationSinceLastArchive - m.communitiesManager.LogStdout("Starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval)) + m.communitiesManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval)) time.AfterFunc(timeToNextInterval, func() { err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted()) if err != nil {