feat(CommunitiesManager): introduce `CreateHistoryArchiveTorrentFromMessages` API

Prior to this commit we had a `CreateHistoryArchiveTorrent()` API which
takes a `startDate`, an `endDate` and a `partition` to create a bunch of
message archives, given a certain time range.

The function expects the messages to live in the database, which means,
all messages that need to be archived have to be saved there at some
point.

This turns out to be an issue when importing communities from third
party services, where, sometimes, there are several thousands of messages
including attachment payloads, that have to be save to the database
first.

There are only two options to get the messages into the database:

1. Make one write operation with all messages - this slow, takes a long
   time and blocks the database until done
2. Create message chunks and perform multiple write operations - this is
   also slow, takes long but makes the database a bit more responsive as
   it's many smaller operations instead of one big one

Option 2) turned out to not be super feasible either as sometimes,
inserting even a single such message can take up to 10 seconds
(depending on payload)

Which brings me to the third option.

**A third option** is to not store those imported messages as waku
message into the database, just to later query them again to create the
archives, but instead create the archives right away from all the
messages that have been loaded into memory.

This is significantly faster and doesn't block the database.

To make this possible, this commit introduces
a `CreateHistoryArchiveTorrentFromMessages()` API, and
a `CreateHistoryArchiveTorrentFromDB()` API which can be used for
different use cases.
This commit is contained in:
Pascal Precht 2022-10-20 16:37:04 +02:00 committed by r4bbit.eth
parent 7eb66d09e7
commit 0ccdec2985
2 changed files with 174 additions and 12 deletions

View File

@ -1538,7 +1538,7 @@ func (m *Manager) GetHistoryArchivePartitionStartTimestamp(communityID types.Hex
func (m *Manager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error {
m.UnseedHistoryArchiveTorrent(communityID)
_, err := m.CreateHistoryArchiveTorrent(communityID, topics, startDate, endDate, partition, encrypt)
_, err := m.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt)
if err != nil {
return err
}
@ -1626,7 +1626,17 @@ type EncodedArchiveData struct {
bytes []byte
}
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
func (m *Manager) CreateHistoryArchiveTorrentFromMessages(communityID types.HexBytes, messages []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, messages, topics, startDate, endDate, partition, encrypt)
}
func (m *Manager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
return m.CreateHistoryArchiveTorrent(communityID, make([]*types.Message, 0), topics, startDate, endDate, partition, encrypt)
}
func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, msgs []*types.Message, topics []types.TopicType, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
loadFromDB := len(msgs) == 0
from := startDate
to := from.Add(partition)
@ -1692,10 +1702,20 @@ func (m *Manager) CreateHistoryArchiveTorrent(communityID types.HexBytes, topics
zap.Any("to", to),
)
messages, err := m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix()))
var messages []types.Message
if loadFromDB {
messages, err = m.persistence.GetWakuMessagesByFilterTopic(topics, uint64(from.Unix()), uint64(to.Unix()))
if err != nil {
return archiveIDs, err
}
} else {
for _, msg := range msgs {
if int64(msg.Timestamp) >= from.Unix() && int64(msg.Timestamp) < to.Unix() {
messages = append(messages, *msg)
}
}
}
if len(messages) == 0 {
// No need to create an archive with zero messages

View File

@ -275,7 +275,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_WithoutMessages() {
// Partition of 7 days
partition := 7 * 24 * time.Hour
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
s.Require().NoError(err)
// There are no waku messages in the database so we don't expect
@ -317,7 +317,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateArchive() {
err = s.manager.StoreWakuMessage(&message3)
s.Require().NoError(err)
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
s.Require().NoError(err)
_, err = os.Stat(s.manager.archiveDataFile(community.IDString()))
@ -378,7 +378,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldCreateMultipleArchi
err = s.manager.StoreWakuMessage(&message4)
s.Require().NoError(err)
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
s.Require().NoError(err)
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
@ -427,7 +427,7 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() {
err = s.manager.StoreWakuMessage(&message1)
s.Require().NoError(err)
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
s.Require().NoError(err)
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
@ -442,7 +442,149 @@ func (s *ManagerSuite) TestCreateHistoryArchiveTorrent_ShouldAppendArchives() {
err = s.manager.StoreWakuMessage(&message2)
s.Require().NoError(err)
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
s.Require().NoError(err)
index, err = s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(index.Archives, 2)
}
func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages() {
torrentConfig := buildTorrentConfig()
s.manager.SetTorrentConfig(&torrentConfig)
community, chatID, err := s.buildCommunityWithChat()
s.Require().NoError(err)
topic := types.BytesToTopic(transport.ToTopic(chatID))
topics := []types.TopicType{topic}
// Time range of 7 days
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
// Partition of 7 days, this should create a single archive
partition := 7 * 24 * time.Hour
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
// This message is outside of the startDate-endDate range and should not
// be part of the archive
message3 := buildMessage(endDate.Add(2*time.Hour), topic, []byte{3})
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1, &message2, &message3}, topics, startDate, endDate, partition, false)
s.Require().NoError(err)
_, err = os.Stat(s.manager.archiveDataFile(community.IDString()))
s.Require().NoError(err)
_, err = os.Stat(s.manager.archiveIndexFile(community.IDString()))
s.Require().NoError(err)
_, err = os.Stat(s.manager.torrentFile(community.IDString()))
s.Require().NoError(err)
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(index.Archives, 1)
totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString()))
s.Require().NoError(err)
for _, metadata := range index.Archives {
archive := &protobuf.WakuMessageArchive{}
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
err = proto.Unmarshal(data, archive)
s.Require().NoError(err)
s.Require().Len(archive.Messages, 2)
}
}
func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages_ShouldCreateMultipleArchives() {
torrentConfig := buildTorrentConfig()
s.manager.SetTorrentConfig(&torrentConfig)
community, chatID, err := s.buildCommunityWithChat()
s.Require().NoError(err)
topic := types.BytesToTopic(transport.ToTopic(chatID))
topics := []types.TopicType{topic}
// Time range of 3 weeks
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
endDate := time.Date(2020, 1, 21, 00, 00, 00, 0, time.UTC)
// 7 days partition, this should create three archives
partition := 7 * 24 * time.Hour
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
// We expect 2 archives to be created for startDate - endDate of each
// 7 days of data. This message should end up in the second archive
message3 := buildMessage(startDate.Add(8*24*time.Hour), topic, []byte{3})
// This one should end up in the third archive
message4 := buildMessage(startDate.Add(14*24*time.Hour), topic, []byte{4})
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1, &message2, &message3, &message4}, topics, startDate, endDate, partition, false)
s.Require().NoError(err)
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(index.Archives, 3)
totalData, err := os.ReadFile(s.manager.archiveDataFile(community.IDString()))
s.Require().NoError(err)
// First archive has 2 messages
// Second archive has 1 message
// Third archive has 1 message
fromMap := map[uint64]int{
uint64(startDate.Unix()): 2,
uint64(startDate.Add(partition).Unix()): 1,
uint64(startDate.Add(partition * 2).Unix()): 1,
}
for _, metadata := range index.Archives {
archive := &protobuf.WakuMessageArchive{}
data := totalData[metadata.Offset : metadata.Offset+metadata.Size-metadata.Padding]
err = proto.Unmarshal(data, archive)
s.Require().NoError(err)
s.Require().Len(archive.Messages, fromMap[metadata.Metadata.From])
}
}
func (s *ManagerSuite) TestCreateHistoryArchiveTorrentFromMessages_ShouldAppendArchives() {
torrentConfig := buildTorrentConfig()
s.manager.SetTorrentConfig(&torrentConfig)
community, chatID, err := s.buildCommunityWithChat()
s.Require().NoError(err)
topic := types.BytesToTopic(transport.ToTopic(chatID))
topics := []types.TopicType{topic}
// Time range of 1 week
startDate := time.Date(2020, 1, 1, 00, 00, 00, 0, time.UTC)
endDate := time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
// 7 days partition, this should create one archive
partition := 7 * 24 * time.Hour
message1 := buildMessage(startDate.Add(1*time.Hour), topic, []byte{1})
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message1}, topics, startDate, endDate, partition, false)
s.Require().NoError(err)
index, err := s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
s.Require().NoError(err)
s.Require().Len(index.Archives, 1)
// Time range of next week
startDate = time.Date(2020, 1, 7, 00, 00, 00, 0, time.UTC)
endDate = time.Date(2020, 1, 14, 00, 00, 00, 0, time.UTC)
message2 := buildMessage(startDate.Add(2*time.Hour), topic, []byte{2})
_, err = s.manager.CreateHistoryArchiveTorrentFromMessages(community.ID(), []*types.Message{&message2}, topics, startDate, endDate, partition, false)
s.Require().NoError(err)
index, err = s.manager.LoadHistoryArchiveIndexFromFile(s.manager.identity, community.ID())
@ -472,7 +614,7 @@ func (s *ManagerSuite) TestSeedHistoryArchiveTorrent() {
err = s.manager.StoreWakuMessage(&message1)
s.Require().NoError(err)
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
s.Require().NoError(err)
err = s.manager.SeedHistoryArchiveTorrent(community.ID())
@ -509,7 +651,7 @@ func (s *ManagerSuite) TestUnseedHistoryArchiveTorrent() {
err = s.manager.StoreWakuMessage(&message1)
s.Require().NoError(err)
_, err = s.manager.CreateHistoryArchiveTorrent(community.ID(), topics, startDate, endDate, partition, false)
_, err = s.manager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, false)
s.Require().NoError(err)
err = s.manager.SeedHistoryArchiveTorrent(community.ID())