chore: better community archive protocol logs
This commit is contained in:
parent
c7ce9adb5e
commit
4bddfbd466
|
@ -158,7 +158,9 @@ func (m *Manager) Start() error {
|
|||
|
||||
if m.torrentConfig != nil && m.torrentConfig.Enabled {
|
||||
err := m.StartTorrentClient()
|
||||
m.logger.Warn("couldn't start torrent client", zap.Error(err))
|
||||
if err != nil {
|
||||
m.LogStdout("couldn't start torrent client", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -2198,7 +2200,8 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
|
|||
startIndex := int(metadata.Offset) / pieceLength
|
||||
endIndex := startIndex + int(metadata.Size)/pieceLength
|
||||
|
||||
m.LogStdout("downloading data for message archive", zap.String("hash", hash))
|
||||
downloadMsg := fmt.Sprintf("downloading data for message archive (%d/%d)", downloadTaskInfo.TotalDownloadedArchivesCount+1, downloadTaskInfo.TotalArchivesCount)
|
||||
m.LogStdout(downloadMsg, zap.String("hash", hash))
|
||||
m.LogStdout("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1))
|
||||
torrent.DownloadPieces(startIndex, endIndex)
|
||||
|
||||
|
@ -2272,6 +2275,7 @@ func (m *Manager) ExtractMessagesFromHistoryArchives(communityID types.HexBytes,
|
|||
messages := make(map[transport.Filter][]*types.Message)
|
||||
|
||||
for _, hash := range archiveIDs {
|
||||
m.LogStdout("extracting messages from history archive", zap.String("archive id", hash))
|
||||
metadata := index.Archives[hash]
|
||||
|
||||
archive := &protobuf.WakuMessageArchive{}
|
||||
|
|
|
@ -3072,7 +3072,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
}
|
||||
|
||||
for _, msg := range statusMessages {
|
||||
logger := logger.With(zap.String("message-id", msg.ID.String()))
|
||||
logger := logger.With(zap.String("message-id", msg.TransportMessage.ThirdPartyID))
|
||||
logger.Debug("processing message")
|
||||
|
||||
publicKey := msg.SigPubKey()
|
||||
|
@ -3141,8 +3141,10 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
importMessagesCount := len(importMessagesToSave)
|
||||
if importMessagesCount > 0 {
|
||||
if importMessagesCount <= maxChunkSizeMessages {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d discord messages", importMessagesCount))
|
||||
err := m.persistence.SaveDiscordMessages(importMessagesToSave)
|
||||
if err != nil {
|
||||
m.communitiesManager.LogStdout("failed to save discord messages", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
|
@ -3151,12 +3153,14 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
chunks := chunkSlice(importMessagesToSave, maxChunkSizeMessages)
|
||||
chunksCount := len(chunks)
|
||||
for i, msgs := range chunks {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord messages", i+1, chunksCount, len(msgs)))
|
||||
// We can't defer Unlock here because we want to
|
||||
// unlock after every iteration to leave room for
|
||||
// other processes to access the database
|
||||
m.handleMessagesMutex.Lock()
|
||||
err := m.persistence.SaveDiscordMessages(msgs)
|
||||
if err != nil {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("failed to save discord message chunk %d of %d", i+1, chunksCount), zap.Error(err))
|
||||
m.handleMessagesMutex.Unlock()
|
||||
return err
|
||||
}
|
||||
|
@ -3174,9 +3178,11 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
chunks := chunkAttachmentsByByteSize(messageAttachmentsToSave, maxChunkSizeBytes)
|
||||
chunksCount := len(chunks)
|
||||
for i, attachments := range chunks {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d discord message attachments", i+1, chunksCount, len(attachments)))
|
||||
m.handleMessagesMutex.Lock()
|
||||
err := m.persistence.SaveDiscordMessageAttachments(attachments)
|
||||
if err != nil {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("failed to save discord message attachments chunk %d of %d", i+1, chunksCount), zap.Error(err))
|
||||
m.handleMessagesMutex.Unlock()
|
||||
return err
|
||||
}
|
||||
|
@ -3192,6 +3198,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
messagesCount := len(messagesToSave)
|
||||
if messagesCount > 0 {
|
||||
if messagesCount <= maxChunkSizeMessages {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d app messages", messagesCount))
|
||||
err := m.SaveMessages(messagesToSave)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -3200,6 +3207,7 @@ func (m *Messenger) handleImportedMessages(messagesToHandle map[transport.Filter
|
|||
chunks := chunkSlice(messagesToSave, maxChunkSizeMessages)
|
||||
chunksCount := len(chunks)
|
||||
for i, msgs := range chunks {
|
||||
m.communitiesManager.LogStdout(fmt.Sprintf("saving %d/%d chunk with %d app messages", i+1, chunksCount, len(msgs)))
|
||||
m.handleMessagesMutex.Lock()
|
||||
err := m.SaveMessages(msgs)
|
||||
if err != nil {
|
||||
|
|
|
@ -1853,15 +1853,18 @@ func (m *Messenger) handleSyncCommunitySettings(messageState *ReceivedMessageSta
|
|||
|
||||
func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community) {
|
||||
|
||||
m.communitiesManager.LogStdout("initializing history archive tasks")
|
||||
|
||||
for _, c := range communities {
|
||||
|
||||
if c.Joined() {
|
||||
settings, err := m.communitiesManager.GetCommunitySettingsByID(c.ID())
|
||||
if err != nil {
|
||||
m.logger.Debug("failed to get community settings", zap.Error(err))
|
||||
m.communitiesManager.LogStdout("failed to get community settings", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if !settings.HistoryArchiveSupportEnabled {
|
||||
m.communitiesManager.LogStdout("history archive support disabled for community", zap.String("id", c.IDString()))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -1875,7 +1878,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
|
|||
|
||||
filters, err := m.communitiesManager.GetCommunityChatsFilters(c.ID())
|
||||
if err != nil {
|
||||
m.logger.Debug("failed to get community chats filters", zap.Error(err))
|
||||
m.communitiesManager.LogStdout("failed to get community chats filters for community", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -1944,7 +1947,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
|
|||
}
|
||||
timeToNextInterval := messageArchiveInterval - durationSinceLastArchive
|
||||
|
||||
m.communitiesManager.LogStdout("Starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
|
||||
m.communitiesManager.LogStdout("starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
|
||||
time.AfterFunc(timeToNextInterval, func() {
|
||||
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval, c.Encrypted())
|
||||
if err != nil {
|
||||
|
|
Loading…
Reference in New Issue