feat(communities): add archiveLogger for stdout logs

The default logger writes to `geth.log`, which makes debugging
the archive protocol pretty hard.

This adds an additional logger that logs to stdout, while keeping
the default logger intact for production.
This commit is contained in:
Pascal Precht 2022-10-06 22:51:04 +02:00 committed by r4bbit.eth
parent 616a58f5c9
commit dd49c4c954
2 changed files with 49 additions and 27 deletions

View File

@ -43,6 +43,7 @@ type Manager struct {
ensVerifier *ens.Verifier
identity *ecdsa.PublicKey
logger *zap.Logger
stdoutLogger *zap.Logger
transport *transport.Transport
quit chan struct{}
torrentConfig *params.TorrentConfig
@ -64,8 +65,14 @@ func NewManager(identity *ecdsa.PublicKey, db *sql.DB, logger *zap.Logger, verif
}
}
stdoutLogger, err := zap.NewDevelopment()
if err != nil {
return nil, errors.Wrap(err, "failed to create archive logger")
}
manager := &Manager{
logger: logger,
stdoutLogger: stdoutLogger,
identity: identity,
quit: make(chan struct{}),
transport: transport,
@ -88,6 +95,11 @@ func NewManager(identity *ecdsa.PublicKey, db *sql.DB, logger *zap.Logger, verif
return manager, nil
}
func (m *Manager) LogStdout(msg string, fields ...zap.Field) {
m.stdoutLogger.Info(msg, fields...)
m.logger.Debug(msg, fields...)
}
type archiveMDSlice []*archiveMetadata
type archiveMetadata struct {
@ -1475,7 +1487,7 @@ func (m *Manager) GetLastMessageArchiveEndDate(communityID types.HexBytes) (uint
func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.HexBytes) (uint64, error) {
filters, err := m.GetCommunityChatsFilters(communityID)
if err != nil {
m.logger.Warn("failed to get community chats filters", zap.Error(err))
m.LogStdout("failed to get community chats filters", zap.Error(err))
return 0, err
}
@ -1494,7 +1506,7 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex
lastArchiveEndDateTimestamp, err := m.GetLastMessageArchiveEndDate(communityID)
if err != nil {
m.logger.Debug("failed to get last archive end date", zap.Error(err))
m.LogStdout("failed to get last archive end date", zap.Error(err))
return 0, err
}
@ -1505,12 +1517,13 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex
// this community
lastArchiveEndDateTimestamp, err = m.GetOldestWakuMessageTimestamp(topics)
if err != nil {
m.logger.Warn("failed to get oldest waku message timestamp", zap.Error(err))
m.LogStdout("failed to get oldest waku message timestamp", zap.Error(err))
return 0, err
}
if lastArchiveEndDateTimestamp == 0 {
// This means there's no waku message stored for this community so far
// (even after requesting possibly missed messages), so no messages exist yet that can be archived
m.LogStdout("can't find valid `lastArchiveEndTimestamp`")
return 0, nil
}
}
@ -1532,7 +1545,7 @@ func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interva
_, exists := m.historyArchiveTasks[id]
if exists {
m.logger.Debug("History archive tasks interval already runs for community: ", zap.Any("id", id))
m.LogStdout("history archive tasks interval already in progres", zap.String("id", id))
return
}
@ -1543,26 +1556,27 @@ func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interva
ticker := time.NewTicker(interval)
defer ticker.Stop()
m.logger.Debug("Starting history archive tasks interval", zap.Any("id", id))
m.LogStdout("starting history archive tasks interval", zap.String("id", id))
for {
select {
case <-ticker.C:
m.logger.Debug("Executing history archive tasks", zap.Any("id", id))
m.LogStdout("starting archive task...", zap.String("id", id))
lastArchiveEndDateTimestamp, err := m.GetHistoryArchivePartitionStartTimestamp(community.ID())
if err != nil {
m.logger.Debug("failed to get last archive end date", zap.Error(err))
m.LogStdout("failed to get last archive end date", zap.Error(err))
continue
}
if lastArchiveEndDateTimestamp == 0 {
// This means there are no waku messages for this community,
// so nothing to do here
m.LogStdout("couldn't determine archive start date - skipping")
continue
}
topics, err := m.GetCommunityChatsTopics(community.ID())
if err != nil {
m.logger.Debug("failed to get community chats topics", zap.Error(err))
m.LogStdout("failed to get community chat topics ", zap.Error(err))
continue
}
@ -1572,7 +1586,7 @@ func (m *Manager) StartHistoryArchiveTasksInterval(community *Community, interva
err = m.CreateAndSeedHistoryArchive(community.ID(), topics, lastArchiveEndDate, to, interval)
if err != nil {
m.logger.Debug("failed to create and seed history archive", zap.Error(err))
m.LogStdout("failed to create and seed history archive", zap.Error(err))
continue
}
case <-cancel:
@ -1659,7 +1673,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
CommunityID: communityID.String(),
}})
m.logger.Debug("Creating archives...",
m.LogStdout("creating archives",
zap.Any("startDate", startDate),
zap.Any("endDate", endDate),
zap.Duration("partition", partition),
@ -1668,11 +1682,11 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
if from.Equal(endDate) || from.After(endDate) {
break
}
m.logger.Debug("Creating message archive",
zap.Duration("partition", partition),
m.LogStdout("creating message archive",
zap.Any("from", from),
zap.Any("to", to),
)
messages, err := m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix()))
if err != nil {
return archiveIDs, err
@ -1680,7 +1694,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
if len(messages) == 0 {
// No need to create an archive with zero messages
m.logger.Debug("No messages in this partition")
m.LogStdout("no messages in this partition")
from = to
to = to.Add(partition)
if to.After(endDate) {
@ -1792,6 +1806,8 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
return archiveIDs, err
}
m.LogStdout("torrent created", zap.Any("from", startDate.Unix()), zap.Any("to", endDate.Unix()))
m.publish(&Subscription{
HistoryArchivesCreatedSignal: &signal.HistoryArchivesCreatedSignal{
CommunityID: communityID.String(),
@ -1800,7 +1816,7 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
},
})
} else {
m.logger.Debug("No archives created")
m.LogStdout("no archives created")
m.publish(&Subscription{
NoHistoryArchivesCreatedSignal: &signal.NoHistoryArchivesCreatedSignal{
CommunityID: communityID.String(),
@ -1860,8 +1876,10 @@ func (m *Manager) SeedHistoryArchiveTorrent(communityID types.HexBytes) error {
CommunityID: communityID.String(),
},
})
m.logger.Info("Seeding torrent", zap.String("id", id))
m.logger.Info(metaInfo.Magnet(nil, &info).String())
magnetLink := metaInfo.Magnet(nil, &info).String()
m.LogStdout("seeding torrent", zap.String("id", id), zap.String("magnetLink", magnetLink))
return nil
}
@ -1907,6 +1925,8 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
}
m.torrentTasks[id] = ml.InfoHash
timeout := time.After(20 * time.Second)
m.LogStdout("fetching torrent info", zap.String("magnetlink", magnetlink))
select {
case <-timeout:
return nil, errors.New("torrent has timed out")
@ -1921,8 +1941,8 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
indexFile := files[i]
indexFile.Download()
m.logger.Debug("downloading history archive index")
m.LogStdout("downloading history archive index")
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
@ -1953,13 +1973,14 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
continue
}
if hasArchive {
m.LogStdout("has archive", zap.String("hash", hash))
continue
}
startIndex := int(metadata.Offset) / pieceLength
endIndex := startIndex + int(metadata.Size)/pieceLength
m.logger.Debug("downloading data for message archive", zap.String("hash", hash))
m.LogStdout("downloading data for message archive", zap.String("hash", hash))
m.logger.Debug("pieces (start, end)", zap.Any("startIndex", startIndex), zap.Any("endIndex", endIndex-1))
torrent.DownloadPieces(startIndex, endIndex)
@ -2001,6 +2022,7 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
CommunityID: communityID.String(),
},
})
m.LogStdout("finished download archives")
return archiveIDs, nil
}
}

View File

@ -1667,7 +1667,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
if m.communitiesManager.TorrentFileExists(c.IDString()) {
err = m.communitiesManager.SeedHistoryArchiveTorrent(c.ID())
if err != nil {
m.logger.Debug("failed to seed history archive", zap.Error(err))
m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err))
}
}
@ -1678,7 +1678,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
}
if len(filters) == 0 {
m.logger.Debug("no filters or chats for this community starting interval", zap.String("id", c.IDString()))
m.communitiesManager.LogStdout("no filters or chats for this community starting interval", zap.String("id", c.IDString()))
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
continue
}
@ -1694,7 +1694,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
// possibly missed since then
latestWakuMessageTimestamp, err := m.communitiesManager.GetLatestWakuMessageTimestamp(topics)
if err != nil {
m.logger.Debug("failed to get Latest waku message timestamp", zap.Error(err))
m.communitiesManager.LogStdout("failed to get Latest waku message timestamp", zap.Error(err))
continue
}
@ -1711,7 +1711,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
// Request possibly missed waku messages for community
_, err = m.syncFiltersFrom(filters, uint32(latestWakuMessageTimestamp))
if err != nil {
m.logger.Debug("failed to request missing messages", zap.Error(err))
m.communitiesManager.LogStdout("failed to request missing messages", zap.Error(err))
continue
}
@ -1720,7 +1720,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
// If the last end date is at least `interval` ago, we create an archive immediately first
lastArchiveEndDateTimestamp, err := m.communitiesManager.GetHistoryArchivePartitionStartTimestamp(c.ID())
if err != nil {
m.logger.Debug("failed to get archive partition start timestamp", zap.Error(err))
m.communitiesManager.LogStdout("failed to get archive partition start timestamp", zap.Error(err))
continue
}
@ -1738,15 +1738,15 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
// Seed current archive in the meantime
err := m.communitiesManager.SeedHistoryArchiveTorrent(c.ID())
if err != nil {
m.logger.Debug("failed to seed history archive", zap.Error(err))
m.communitiesManager.LogStdout("failed to seed history archive", zap.Error(err))
}
timeToNextInterval := messageArchiveInterval - durationSinceLastArchive
m.logger.Debug("Starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
m.communitiesManager.LogStdout("Starting history archive tasks interval in", zap.Any("timeLeft", timeToNextInterval))
time.AfterFunc(timeToNextInterval, func() {
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to.Add(timeToNextInterval), messageArchiveInterval)
if err != nil {
m.logger.Debug("failed to get create and seed history archive", zap.Error(err))
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)
})
@ -1756,7 +1756,7 @@ func (m *Messenger) InitHistoryArchiveTasks(communities []*communities.Community
// creation loop
err := m.communitiesManager.CreateAndSeedHistoryArchive(c.ID(), topics, lastArchiveEndDate, to, messageArchiveInterval)
if err != nil {
m.logger.Debug("failed to get create and seed history archive", zap.Error(err))
m.communitiesManager.LogStdout("failed to get create and seed history archive", zap.Error(err))
}
go m.communitiesManager.StartHistoryArchiveTasksInterval(c, messageArchiveInterval)