From 0ccdec2985ee4d11be3b6b6d8a6a7652995ac4b6 Mon Sep 17 00:00:00 2001 From: Pascal Precht <445106+PascalPrecht@users.noreply.github.com> Date: Thu, 20 Oct 2022 16:37:04 +0200 Subject: [PATCH] feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which takes a `startDate`, an `endDate` and a `partition` to create a bunch of message archives, given a certain time range. The function expects the messages to live in the database, which means, all messages that need to be archived have to be saved there at some point. This turns out to be an issue when importing communities from third party services, where, sometimes, there are several thousands of messages including attachment payloads, that have to be save to the database first. There are only two options to get the messages into the database: 1. Make one write operation with all messages - this slow, takes a long time and blocks the database until done 2. Create message chunks and perform multiple write operations - this is also slow, takes long but makes the database a bit more responsive as it's many smaller operations instead of one big one Option 2) turned out to not be super feasible either as sometimes, inserting even a single such message can take up to 10 seconds (depending on payload) Which brings me to the third option. **A third option** is to not store those imported messages as waku message into the database, just to later query them again to create the archives, but instead create the archives right away from all the messages that have been loaded into memory. This is significantly faster and doesn't block the database. To make this possible, this commit introduces a `CreateHistoryArchiveTorrentFromMessages()` API, and a `CreateHistoryArchiveTorrentFromDB()` API which can be used for different use cases. --- protocol/communities/manager.go | 30 +++++- protocol/communities/manager_test.go | 156 +++++++++++++++++++++++++-- 2 files changed, 174 insertions(+), 12 deletions(-) diff --git a/protocol/communities/manager.go b/protocol/communities/manager.go index 54d323626..73fc84e7d 100644 --- a/protocol/communities/manager.go +++ b/protocol/communities/manager.go @@ -1538,7 +1538,7 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error { m.UnseedHistoryArchiveTorrent(communityID) - _, err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition, encrypt) + _, err := m.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt) if err != nil { return err } @@ -1626,7 +1626,17 @@ type EncodedArchiveData struct { bytes []byte } -func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { +func (m *Manager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { + return m.CreateHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt) +} + +func (m *Manager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { + + return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt) +} +func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) { + + loadFromDB := len(msgs) == 0 from := startDate to := from.Add(partition) @@ -1692,9 +1702,19 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics zap.Any("to", to), ) - messages, err := m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix())) - if err != nil { - return archiveIDs, err + var messages []types.Message + if loadFromDB { + messages, err = m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix())) + if err != nil { + return archiveIDs, err + } + } else { + for _, msg := range msgs { + if int64(msg.Timestamp) >= from.Unix() && int64(msg.Timestamp) < to.Unix() { + messages = append(messages, *msg) + } + } + } if len(messages) == 0 { diff --git a/protocol/communities/manager_test.go b/protocol/communities/manager_test.go index e0195cf26..82161dde9 100644 --- a/protocol/communities/manager_test.go +++ b/protocol/communities/manager_test.go @@ -275,7 +275,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() { // Partition of 7 days partition := 7 * 24 * time.Hour - _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false) + _, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false) s.Require().NoError(err) // There are no waku messages in the database so we don't expect @@ -317,7 +317,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() { err = s.manager.StoreWakuMessage(&message3) s.Require().NoError(err) - _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false) + _, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false) s.Require().NoError(err) _, err = os.Stat(s.manager.archiveDataFile(community.IDString())) @@ -378,7 +378,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateMultipleArchi err = s.manager.StoreWakuMessage(&message4) s.Require().NoError(err) - _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false) + _, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false) s.Require().NoError(err) index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID()) @@ -427,7 +427,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false) + _, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false) s.Require().NoError(err) index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID()) @@ -442,7 +442,149 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() { err = s.manager.StoreWakuMessage(&message2) s.Require().NoError(err) - _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false) + _, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false) + s.Require().NoError(err) + + index, err = s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID()) + s.Require().NoError(err) + s.Require().Len(index.Archives, 2) +} + +func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages() { + torrentConfig := buildTorrentConfig() + s.manager.SetTorrentConfig(&torrentConfig) + + community, chatID, err := s.buildCommunityWithChat() + s.Require().NoError(err) + + topic := types.BytesToTopic(transport.ToTopic(chatID)) + topics := []types.TopicType{topic} + + // Time range of 7 days + startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC) + endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC) + // Partition of 7 days, this should create a single archive + partition := 7 * 24 * time.Hour + + message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1}) + message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2}) + // This message is outside of the startDate-endDate range and should not + // be part of the archive + message3 := buildMessage(endDate.Add(2*time.Hour), topic, []byte{3}) + + _, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1, &message2, &message3}, topics, startDate, endDate, partition, false) + s.Require().NoError(err) + + _, err = os.Stat(s.manager.archiveDataFile(community.IDString())) + s.Require().NoError(err) + _, err = os.Stat(s.manager.archiveIndexFile(community.IDString())) + s.Require().NoError(err) + _, err = os.Stat(s.manager.torrentFile(community.IDString())) + s.Require().NoError(err) + + index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID()) + s.Require().NoError(err) + s.Require().Len(index.Archives, 1) + + totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString())) + s.Require().NoError(err) + + for _, metadata := range index.Archives { + archive := &protobuf.WakuMessageArchive{} + data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding] + + err = proto.Unmarshal(data, archive) + s.Require().NoError(err) + + s.Require().Len(archive.Messages, 2) + } +} + +func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages_ShouldCreateMultipleArchives() { + torrentConfig := buildTorrentConfig() + s.manager.SetTorrentConfig(&torrentConfig) + + community, chatID, err := s.buildCommunityWithChat() + s.Require().NoError(err) + + topic := types.BytesToTopic(transport.ToTopic(chatID)) + topics := []types.TopicType{topic} + + // Time range of 3 weeks + startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC) + endDate := time.Date(2020, 1, 21, 00, 00, 00, 0, time.UTC) + // 7 days partition, this should create three archives + partition := 7 * 24 * time.Hour + + message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1}) + message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2}) + // We expect 2 archives to be created for startDate - endDate of each + // 7 days of data. This message should end up in the second archive + message3 := buildMessage(startDate.Add(8*24*time.Hour), topic, []byte{3}) + // This one should end up in the third archive + message4 := buildMessage(startDate.Add(14*24*time.Hour), topic, []byte{4}) + + _, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1, &message2, &message3, &message4}, topics, startDate, endDate, partition, false) + s.Require().NoError(err) + + index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID()) + s.Require().NoError(err) + s.Require().Len(index.Archives, 3) + + totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString())) + s.Require().NoError(err) + + // First archive has 2 messages + // Second archive has 1 message + // Third archive has 1 message + fromMap := map[uint64]int{ + uint64(startDate.Unix()): 2, + uint64(startDate.Add(partition).Unix()): 1, + uint64(startDate.Add(partition * 2).Unix()): 1, + } + + for _, metadata := range index.Archives { + archive := &protobuf.WakuMessageArchive{} + data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding] + + err = proto.Unmarshal(data, archive) + s.Require().NoError(err) + s.Require().Len(archive.Messages, fromMap[metadata.Metadata.From]) + } +} + +func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages_ShouldAppendArchives() { + torrentConfig := buildTorrentConfig() + s.manager.SetTorrentConfig(&torrentConfig) + + community, chatID, err := s.buildCommunityWithChat() + s.Require().NoError(err) + + topic := types.BytesToTopic(transport.ToTopic(chatID)) + topics := []types.TopicType{topic} + + // Time range of 1 week + startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC) + endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC) + // 7 days partition, this should create one archive + partition := 7 * 24 * time.Hour + + message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1}) + + _, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1}, topics, startDate, endDate, partition, false) + s.Require().NoError(err) + + index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID()) + s.Require().NoError(err) + s.Require().Len(index.Archives, 1) + + // Time range of next week + startDate = time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC) + endDate = time.Date(2020, 1, 14, 00, 00, 00, 0, time.UTC) + + message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2}) + + _, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message2}, topics, startDate, endDate, partition, false) s.Require().NoError(err) index, err = s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID()) @@ -472,7 +614,7 @@ func (s *ManagerSuite) TestSeedHistoryArchiveTorrent() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false) + _, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false) s.Require().NoError(err) err = s.manager.SeedHistoryArchiveTorrent(community.ID()) @@ -509,7 +651,7 @@ func (s *ManagerSuite) TestUnseedHistoryArchiveTorrent() { err = s.manager.StoreWakuMessage(&message1) s.Require().NoError(err) - _, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false) + _, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false) s.Require().NoError(err) err = s.manager.SeedHistoryArchiveTorrent(community.ID())