Codex for History Archives
As indicated in the Team-NLBR Solution Proposal, the central entry point to the history management is InitHistoryArchiveTasks. InitHistoryArchiveTasks is called from two main places:
- During Messenger.Start() (startup)
- When enabling archive protocol
In Creating History Archives - InitHistoryArchiveTasks we find the complete initialization flow:
System Startup
↓
Messenger.Start()
↓
Wait for Store Node Availability
↓
InitHistoryArchiveTasks(controlledCommunities)
│
├─ For each community owner controls:
│   ├─ Check if archive support enabled
│   ├─ Seed existing torrents (if available)
│   ├─ CreateAndSeedHistoryArchive
│   ├─ Get community topics and sync missed messages
│   ├─ Check when last archive was created
│   └─ Based on last archive timing:
│       ├─ No archives → StartHistoryArchiveTasksInterval() immediately
│       ├─ Recent archive → Seed + delayed CreateAndSeedHistoryArchive followed by StartHistoryArchiveTasksInterval()
│       └─ Old archive → Create new archive + CreateAndSeedHistoryArchive + StartHistoryArchiveTasksInterval()
│
└─ Each StartHistoryArchiveTasksInterval():
    ├─ Runs as background goroutine
    ├─ Creates ticker with 7-day interval
    ├─ Every 7 days: CreateAndSeedHistoryArchive()
    ├─ After seeding: publishes HistoryArchivesSeedingSignal
    ├─ Signal triggers: dispatchMagnetlinkMessage()
    └─ Magnetlink sent to all community members via Waku
We will go through this flow step by step and apply the changes (where we need to diverge, we will...).
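The three timing branches in the flow above can be sketched as a small decision function. This is only an illustration: `startupPlan` and `planStartup` are hypothetical names, timestamps are assumed to be Unix seconds with 0 meaning "no archive yet", and the delay for the "recent archive" branch is assumed to be whatever remains of the 7-day interval.

```go
package main

import (
	"fmt"
	"time"
)

// archiveInterval mirrors the 7-day ticker interval from the flow above.
const archiveInterval = 7 * 24 * time.Hour

// startupPlan is a hypothetical summary of what to do for one community
// before StartHistoryArchiveTasksInterval() takes over.
type startupPlan struct {
	SeedExisting bool          // seed the archives that already exist
	CreateNow    bool          // create (and seed) a new archive right away
	Delay        time.Duration // wait this long before a delayed creation (0 if none)
}

// planStartup picks one of the three branches. Both arguments are Unix
// seconds; lastArchiveEnd == 0 is assumed to mean "no archives yet".
func planStartup(lastArchiveEnd, now int64) startupPlan {
	if lastArchiveEnd == 0 {
		// No archives: nothing to seed, the interval loop starts immediately.
		return startupPlan{}
	}
	since := time.Duration(now-lastArchiveEnd) * time.Second
	if since < archiveInterval {
		// Recent archive: seed what exists, create the next one later.
		return startupPlan{SeedExisting: true, Delay: archiveInterval - since}
	}
	// Old archive: create a new one now.
	return startupPlan{CreateNow: true}
}

func main() {
	const day = int64(24 * 60 * 60)
	now := time.Now().Unix()
	fmt.Printf("no archives:    %+v\n", planStartup(0, now))
	fmt.Printf("recent archive: %+v\n", planStartup(now-2*day, now))
	fmt.Printf("old archive:    %+v\n", planStartup(now-10*day, now))
}
```

In all three cases the periodic loop is started afterwards; the plan only describes what happens before it.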
BitTorrent - with or without
In the first pass, we do not delete the BitTorrent-related code; instead, we try to add Codex extensions next to it. This way, I hope, it will be easier to move things around without being too destructive from the beginning.
Seed existing torrents (if available)
This step is only needed for torrents. Codex has its own persistence and will start seeding immediately after it starts.
CreateAndSeedHistoryArchive
The first function that asks for attention is CreateAndSeedHistoryArchive. It is part of the ArchiveService interface.
func (m *ArchiveManager) CreateAndSeedHistoryArchive(communityID types.HexBytes, topics []messagingtypes.ContentTopic, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) error {
	m.UnseedHistoryArchiveTorrent(communityID)
	_, err := m.ArchiveFileManager.CreateHistoryArchiveTorrentFromDB(communityID, topics, startDate, endDate, partition, encrypt)
	if err != nil {
		return err
	}
	return m.SeedHistoryArchiveTorrent(communityID)
}
It calls CreateHistoryArchiveTorrentFromDB, which then calls createHistoryArchiveTorrent:
func (m *ArchiveFileManager) CreateHistoryArchiveTorrentFromDB(communityID types.HexBytes, topics []messagingtypes.ContentTopic, startDate time.Time, endDate time.Time, partition time.Duration, encrypt bool) ([]string, error) {
	return m.createHistoryArchiveTorrent(communityID, make([]*messagingtypes.ReceivedMessage, 0), topics, startDate, endDate, partition, encrypt)
}
createHistoryArchiveTorrent (ArchiveFileManager) is where the work is done.
Protobuf messages
Here we list all the Protobuf messages that are relevant to message archives:
message CommunityMessageArchiveMagnetlink {
  uint64 clock = 1;
  string magnet_uri = 2;
}
message WakuMessage {
  bytes sig = 1;
  uint64 timestamp = 2;
  bytes topic = 3;
  bytes payload = 4;
  bytes padding = 5;
  bytes hash = 6;
  string thirdPartyId = 7;
}
message WakuMessageArchiveMetadata {
  uint32 version = 1;
  uint64 from = 2;
  uint64 to = 3;
  repeated bytes contentTopic = 4;
}
message WakuMessageArchive {
  uint32 version = 1;
  WakuMessageArchiveMetadata metadata = 2;
  repeated WakuMessage messages = 3;
}
message WakuMessageArchiveIndexMetadata {
  uint32 version = 1;
  WakuMessageArchiveMetadata metadata = 2;
  uint64 offset = 3;
  uint64 size = 4;
  uint64 padding = 5;
}
message WakuMessageArchiveIndex {
  map<string, WakuMessageArchiveIndexMetadata> archives = 1;
}
All of them are defined in protocol/protobuf/communities.proto. There is one more message that is not directly related but, for some reason, contains a magnet_uri field (to be checked later):
message CommunityRequestToJoinResponse {
  uint64 clock = 1;
  CommunityDescription community = 2 [deprecated = true];
  bool accepted = 3;
  bytes grant = 4;
  bytes community_id = 5;
  string magnet_uri = 6;
  bytes protected_topic_private_key = 7;
  Shard shard = 8;
  // CommunityDescription protocol message with owner signature
  bytes community_description_protocol_message = 9;
}
We see that most are independent from BitTorrent. The ones that are BitTorrent specific are:
- CommunityMessageArchiveMagnetlink
- WakuMessageArchiveIndexMetadata
- WakuMessageArchiveIndex (because it depends on WakuMessageArchiveIndexMetadata)
- CommunityRequestToJoinResponse (because of the magnet_uri field)
Now, starting with something simple (after all, we are building a PoC here), we know that the Codex API operates on CIDs encoded as base58btc strings. In WakuMessageArchiveIndexMetadata, the offset, size, and padding fields are relevant only to the current BitTorrent-based implementation. For Codex we can use something simpler:
message CodexWakuMessageArchiveIndexMetadata {
  uint32 version = 1;
  WakuMessageArchiveMetadata metadata = 2;
  string cid = 3;
}
message CodexWakuMessageArchiveIndex {
  map<string, CodexWakuMessageArchiveIndexMetadata> archives = 1;
}
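To get a feeling for how the Codex index would be used, here is a minimal Go sketch that mirrors the two proposed messages with plain structs. These are not the generated protobuf types; `addArchive` and `cidFor` are hypothetical helpers, the map key is assumed to be some archive ID string, and the CID value is a placeholder rather than a real Codex CID.

```go
package main

import "fmt"

// Plain-struct mirrors of the proposed messages (hand-written, not generated).
type WakuMessageArchiveMetadata struct {
	Version      uint32
	From         uint64
	To           uint64
	ContentTopic [][]byte
}

type CodexWakuMessageArchiveIndexMetadata struct {
	Version  uint32
	Metadata *WakuMessageArchiveMetadata
	Cid      string // base58btc-encoded CID returned by Codex
}

type CodexWakuMessageArchiveIndex struct {
	Archives map[string]*CodexWakuMessageArchiveIndexMetadata
}

// addArchive records the CID of one uploaded archive under archiveID.
func (idx *CodexWakuMessageArchiveIndex) addArchive(archiveID string, from, to uint64, topics [][]byte, cid string) {
	if idx.Archives == nil {
		idx.Archives = make(map[string]*CodexWakuMessageArchiveIndexMetadata)
	}
	idx.Archives[archiveID] = &CodexWakuMessageArchiveIndexMetadata{
		Version: 1,
		Metadata: &WakuMessageArchiveMetadata{
			Version:      1,
			From:         from,
			To:           to,
			ContentTopic: topics,
		},
		Cid: cid,
	}
}

// cidFor returns the CID stored for archiveID, if there is one.
func (idx *CodexWakuMessageArchiveIndex) cidFor(archiveID string) (string, bool) {
	entry, ok := idx.Archives[archiveID]
	if !ok {
		return "", false
	}
	return entry.Cid, true
}

func main() {
	var idx CodexWakuMessageArchiveIndex
	// "zPlaceholderCid" is a made-up value used only for illustration.
	idx.addArchive("archive-1", 1000, 2000, [][]byte{{0x01, 0x02, 0x03, 0x04}}, "zPlaceholderCid")
	cid, ok := idx.cidFor("archive-1")
	fmt.Println(cid, ok)
}
```

Compared with the BitTorrent index, there is no offset, size, or padding to track: each entry only needs the archive metadata and the CID under which the archive can be fetched.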
Appending the index file
In a more production-ready version we will not operate on the local file system. Here, for simplicity, we will use a physical index file and a separate file for each archive. For this reason, in the initial implementation, a community owner will not query Codex for the current index file. To do that, we could use the http://localhost:8001/api/codex/v1/data/${CID} API, which returns 404 when the file does not exist in the local store:
curl -s -D - -o /dev/null "http://localhost:8001/api/codex/v1/data/${CID}"
HTTP/1.1 404 Not Found
Connection: close
Server: nim-presto/0.0.3 (amd64/linux)
Content-Length: 74
Date: Thu, 25 Sep 2025 02:15:07 GMT
Content-Type: text/html; charset=utf-8
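For reference, the same check could look roughly like this in Go. This is a sketch under assumptions: `hasLocalCopy` is a hypothetical helper, only the endpoint path and the 404 behavior come from the curl call above, and `newFakeCodex` is an in-process stand-in for a Codex node used purely so that the example runs without one.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
)

// hasLocalCopy reports whether the node at baseURL serves the given CID.
// A 404 is treated as "not in the local store", as in the curl output above.
func hasLocalCopy(baseURL, cid string) (bool, error) {
	resp, err := http.Get(baseURL + "/api/codex/v1/data/" + url.PathEscape(cid))
	if err != nil {
		return false, err
	}
	// Only the status code matters here, so the body is closed unread.
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusOK:
		return true, nil
	case http.StatusNotFound:
		return false, nil
	default:
		return false, fmt.Errorf("unexpected status %s", resp.Status)
	}
}

// newFakeCodex starts a fake node that knows only the listed CIDs (demo only).
func newFakeCodex(known ...string) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		cid := strings.TrimPrefix(r.URL.Path, "/api/codex/v1/data/")
		for _, k := range known {
			if k == cid {
				fmt.Fprint(w, "archive bytes")
				return
			}
		}
		http.NotFound(w, r)
	}))
}

func main() {
	srv := newFakeCodex("zKnownCid")
	defer srv.Close()

	for _, cid := range []string{"zKnownCid", "zMissingCid"} {
		ok, err := hasLocalCopy(srv.URL, cid)
		fmt.Println(cid, ok, err)
	}
}
```

Against a real node the call would be `hasLocalCopy("http://localhost:8001", cid)`. Note that a plain GET starts transferring the content when the file exists, so for large archives a cheaper existence check may be preferable.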
Instead, for this initial implementation, we will just read it from a local directory. For now, we will reuse BitTorrent configuration. BitTorrent config stores the index file under:
path.Join(m.torrentConfig.DataDir, communityID, "index")
For Codex, we will store it under:
path.Join(m.torrentConfig.DataDir, "codex", communityID, "index")
In a similar way, for the individual archive to be uploaded, we will use:
path.Join(m.torrentConfig.DataDir, "codex", communityID, "data")
This data file is temporary and will be overwritten for each new archive created. With Codex, we do not have to append, so we no longer need the previous data file. We use a file for now only because it may be easier to start this way.
Reading the index file is done with LoadHistoryArchiveIndexFromFile.
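The directory layout described above can be collected in a few small helpers. The function names are hypothetical, and the community ID is passed as a plain string for simplicity; only the path components come from the text.

```go
package main

import (
	"fmt"
	"path"
)

// torrentIndexPath is the existing BitTorrent location of the index file.
func torrentIndexPath(dataDir, communityID string) string {
	return path.Join(dataDir, communityID, "index")
}

// codexIndexPath is the proposed location of the Codex index file.
func codexIndexPath(dataDir, communityID string) string {
	return path.Join(dataDir, "codex", communityID, "index")
}

// codexDataPath is the temporary file holding the archive to be uploaded.
// It is overwritten for every new archive.
func codexDataPath(dataDir, communityID string) string {
	return path.Join(dataDir, "codex", communityID, "data")
}

func main() {
	dataDir := "/tmp/archivedata"
	communityID := "0x1234"

	fmt.Println(torrentIndexPath(dataDir, communityID)) // /tmp/archivedata/0x1234/index
	fmt.Println(codexIndexPath(dataDir, communityID))   // /tmp/archivedata/codex/0x1234/index
	fmt.Println(codexDataPath(dataDir, communityID))    // /tmp/archivedata/codex/0x1234/data
}
```

Keeping the Codex files under a separate "codex" subdirectory means both implementations can share the same DataDir without overwriting each other's index.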
Testing
A number of tests will need to be adjusted or fixed.
But there is one test that has a slightly more end-to-end nature. It is from the protocol package:
protocol/communities_messenger_token_permissions_test.go
This test calls a couple of important functions, which gives a good indication of which functions will need to be taken into account.
The test - TestImportDecryptedArchiveMessages - first creates a community and sets up the corresponding permissions. Then the community owner sends a message to the community and then immediately retrieves it so that it is now recorded in the DB.
After that, it prepares the archive parameters: startDate, endDate, partition, and community topics. All of these will be passed to CreateHistoryArchiveTorrentFromDB, our entry point to creating a history archive torrent.
// 1.1. Create community
community, chat := s.createCommunity()
// ...
// 1.2. Setup permissions
// ...
// 2. Owner: Send a message A
messageText1 := RandomLettersString(10)
message1 := s.sendChatMessage(s.owner, chat.ID, messageText1)
// 2.2. Retrieve own message (to make it stored in the archive later)
_, err = s.owner.RetrieveAll()
s.Require().NoError(err)
// 3. Owner: Create community archive
const partition = 2 * time.Minute
messageDate := time.UnixMilli(int64(message1.Timestamp))
startDate := messageDate.Add(-time.Minute)
endDate := messageDate.Add(time.Minute)
topic := messagingtypes.BytesToContentTopic(messaging.ToContentTopic(chat.ID))
communityCommonTopic := messagingtypes.BytesToContentTopic(messaging.ToContentTopic(community.UniversalChatID()))
topics := []messagingtypes.ContentTopic{topic, communityCommonTopic}
torrentConfig := params.TorrentConfig{
	Enabled:    true,
	DataDir:    os.TempDir() + "/archivedata",
	TorrentDir: os.TempDir() + "/torrents",
	Port:       0,
}
// Share archive directory between all users
s.owner.archiveManager.SetTorrentConfig(&torrentConfig)
s.bob.archiveManager.SetTorrentConfig(&torrentConfig)
s.owner.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{}
s.bob.config.messengerSignalsHandler = &MessengerSignalsHandlerMock{}
Finally, we call CreateHistoryArchiveTorrentFromDB:
archiveIDs, err := s.owner.archiveManager.CreateHistoryArchiveTorrentFromDB(community.ID(), topics, startDate, endDate, partition, community.Encrypted())
s.Require().NoError(err)
s.Require().Len(archiveIDs, 1)
Notice that exactly one archive is expected.
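The arithmetic behind the single expected archive can be checked with a short sketch. It assumes that archive creation walks the [startDate, endDate) window in partition-sized steps and produces at most one archive per step; `archiveWindow` and `partitionCount` are hypothetical helpers that only reproduce the numbers from the test.

```go
package main

import (
	"fmt"
	"time"
)

// partition matches the value used in the test.
const partition = 2 * time.Minute

// archiveWindow reproduces the test setup: one minute before and one minute
// after the message timestamp (given in milliseconds).
func archiveWindow(messageTimestampMs uint64) (time.Time, time.Time) {
	messageDate := time.UnixMilli(int64(messageTimestampMs))
	return messageDate.Add(-time.Minute), messageDate.Add(time.Minute)
}

// partitionCount returns how many partition-sized steps are needed to cover
// the window [start, end). It is an upper bound on the number of archives.
func partitionCount(start, end time.Time, partition time.Duration) int {
	if partition <= 0 || !end.After(start) {
		return 0
	}
	window := end.Sub(start)
	return int((window + partition - 1) / partition) // ceiling division
}

func main() {
	// Any timestamp works; only the window length matters.
	startDate, endDate := archiveWindow(1758766507000)
	fmt.Println(endDate.Sub(startDate))                        // 2m0s
	fmt.Println(partitionCount(startDate, endDate, partition)) // 1
}
```

The window is two minutes long and the partition is two minutes, so the whole window fits into a single step containing the one message sent by the owner.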
CreateHistoryArchiveTorrentFromDB is called directly here, which in a way bypasses the torrent seeding: in the normal flow, CreateHistoryArchiveTorrentFromDB is called from CreateAndSeedHistoryArchive, which calls SeedHistoryArchiveTorrent immediately after creating the archive. CreateHistoryArchiveTorrentFromDB in turn calls createHistoryArchiveTorrent, which is central to archive creation.