Keep track of last seen magnetlink

This is to prevent processing magnetlinks if they haven't changed
This commit is contained in:
Pascal Precht 2022-12-19 09:34:37 +01:00 committed by r4bbit
parent e7d827fff1
commit b4bdfd3df6
6 changed files with 130 additions and 53 deletions

View File

@ -39,6 +39,7 @@
// 1664783660_add_sync_info_to_saved_addresses.up.sql (388B)
// 1668109917_wakunodes.up.sql (99B)
// 1670836810_add_imported_flag_to_community_archive_hashes.up.sql (144B)
// 1671438731_add_magnetlink_uri_to_communities_archive_info.up.sql (86B)
// doc.go (74B)
package migrations
@ -883,11 +884,31 @@ func _1670836810_add_imported_flag_to_community_archive_hashesUpSql() (*asset, e
return nil, err
}
info := bindataFileInfo{name: "1670836810_add_imported_flag_to_community_archive_hashes.up.sql", size: 144, mode: os.FileMode(0664), modTime: time.Unix(1670836854, 0)}
info := bindataFileInfo{name: "1670836810_add_imported_flag_to_community_archive_hashes.up.sql", size: 144, mode: os.FileMode(0664), modTime: time.Unix(1670944751, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x6f, 0xf, 0xf0, 0xbd, 0xfe, 0x63, 0x25, 0x8f, 0x5e, 0x46, 0x4b, 0x45, 0x31, 0x8b, 0x3e, 0xd8, 0x6b, 0x5d, 0x9d, 0x6d, 0x10, 0x9a, 0x87, 0x4b, 0x18, 0xc6, 0x39, 0x81, 0x6e, 0xe4, 0x75, 0xfb}}
return a, nil
}
var __1671438731_add_magnetlink_uri_to_communities_archive_infoUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x04\xc0\xc1\x0a\xc2\x30\x0c\x06\xe0\xfb\x9e\xe2\x67\xaf\xe1\x29\xda\x78\x8a\x0a\x92\x81\xb7\x50\x46\xd5\xe0\x9a\xc1\xd6\xfa\xfc\xfb\x48\x94\x9f\x50\x3a\x0b\x63\x5e\x6b\xed\xe1\xcd\xcb\x6e\x79\x9b\xbf\xfe\x2f\xe6\xf1\x5e\x41\x29\xe1\xf2\x90\xe9\x76\xc7\x92\xf7\x66\x35\x7f\xa2\xb4\xc5\xe3\x67\x7d\x73\x28\xbf\x14\x89\xaf\x34\x89\x62\x1c\x4f\xc3\x70\x04\x00\x00\xff\xff\x53\xcc\x9d\x7d\x56\x00\x00\x00")
func _1671438731_add_magnetlink_uri_to_communities_archive_infoUpSqlBytes() ([]byte, error) {
return bindataRead(
__1671438731_add_magnetlink_uri_to_communities_archive_infoUpSql,
"1671438731_add_magnetlink_uri_to_communities_archive_info.up.sql",
)
}
func _1671438731_add_magnetlink_uri_to_communities_archive_infoUpSql() (*asset, error) {
bytes, err := _1671438731_add_magnetlink_uri_to_communities_archive_infoUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1671438731_add_magnetlink_uri_to_communities_archive_info.up.sql", size: 86, mode: os.FileMode(0664), modTime: time.Unix(1671438768, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xda, 0x8b, 0x4b, 0xd6, 0xd8, 0xe2, 0x3d, 0xf7, 0x6b, 0xcd, 0x1e, 0x70, 0x9, 0x2e, 0x35, 0x4, 0x61, 0xc3, 0xb5, 0x9d, 0xc5, 0x27, 0x21, 0xa, 0x5a, 0xd6, 0x3e, 0xa6, 0x24, 0xa2, 0x12, 0xdf}}
return a, nil
}
var _docGo = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x2c\xc9\xb1\x0d\xc4\x20\x0c\x05\xd0\x9e\x29\xfe\x02\xd8\xfd\x6d\xe3\x4b\xac\x2f\x44\x82\x09\x78\x7f\xa5\x49\xfd\xa6\x1d\xdd\xe8\xd8\xcf\x55\x8a\x2a\xe3\x47\x1f\xbe\x2c\x1d\x8c\xfa\x6f\xe3\xb4\x34\xd4\xd9\x89\xbb\x71\x59\xb6\x18\x1b\x35\x20\xa2\x9f\x0a\x03\xa2\xe5\x0d\x00\x00\xff\xff\x60\xcd\x06\xbe\x4a\x00\x00\x00")
func docGoBytes() ([]byte, error) {
@ -1077,6 +1098,8 @@ var _bindata = map[string]func() (*asset, error){
"1670836810_add_imported_flag_to_community_archive_hashes.up.sql": _1670836810_add_imported_flag_to_community_archive_hashesUpSql,
"1671438731_add_magnetlink_uri_to_communities_archive_info.up.sql": _1671438731_add_magnetlink_uri_to_communities_archive_infoUpSql,
"doc.go": docGo,
}
@ -1160,6 +1183,7 @@ var _bintree = &bintree{nil, map[string]*bintree{
"1664783660_add_sync_info_to_saved_addresses.up.sql": &bintree{_1664783660_add_sync_info_to_saved_addressesUpSql, map[string]*bintree{}},
"1668109917_wakunodes.up.sql": &bintree{_1668109917_wakunodesUpSql, map[string]*bintree{}},
"1670836810_add_imported_flag_to_community_archive_hashes.up.sql": &bintree{_1670836810_add_imported_flag_to_community_archive_hashesUpSql, map[string]*bintree{}},
"1671438731_add_magnetlink_uri_to_communities_archive_info.up.sql": &bintree{_1671438731_add_magnetlink_uri_to_communities_archive_infoUpSql, map[string]*bintree{}},
"doc.go": &bintree{docGo, map[string]*bintree{}},
}}

View File

@ -0,0 +1,2 @@
ALTER TABLE communities_archive_info ADD COLUMN last_magnetlink_uri TEXT DEFAULT "";

2
go.mod
View File

@ -68,7 +68,7 @@ require (
go.uber.org/zap v1.23.0
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb
google.golang.org/protobuf v1.28.1
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0

View File

@ -1153,6 +1153,14 @@ func (m *Manager) UpdateMagnetlinkMessageClock(communityID types.HexBytes, clock
return m.persistence.UpdateMagnetlinkMessageClock(communityID, clock)
}
func (m *Manager) UpdateLastSeenMagnetlink(communityID types.HexBytes, magnetlinkURI string) error {
return m.persistence.UpdateLastSeenMagnetlink(communityID, magnetlinkURI)
}
func (m *Manager) GetLastSeenMagnetlink(communityID types.HexBytes) (string, error) {
return m.persistence.GetLastSeenMagnetlink(communityID)
}
func (m *Manager) LeaveCommunity(id types.HexBytes) (*Community, error) {
community, err := m.GetByID(id)
if err != nil {
@ -2121,6 +2129,10 @@ func (m *Manager) GetHistoryArchiveDownloadTask(communityID string) *HistoryArch
return m.historyArchiveDownloadTasks[communityID]
}
func (m *Manager) DeleteHistoryArchiveDownloadTask(communityID string) {
delete(m.historyArchiveDownloadTasks, communityID)
}
func (m *Manager) AddHistoryArchiveDownloadTask(communityID string, task *HistoryArchiveDownloadTask) {
m.historyArchiveDownloadTasks[communityID] = task
}
@ -2146,6 +2158,12 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
return nil, err
}
downloadTaskInfo := &HistoryArchiveDownloadTaskInfo{
TotalDownloadedArchivesCount: 0,
TotalArchivesCount: 0,
Cancelled: false,
}
m.torrentTasks[id] = ml.InfoHash
timeout := time.After(20 * time.Second)
@ -2153,6 +2171,10 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
select {
case <-timeout:
return nil, ErrTorrentTimedout
case <-cancelTask:
m.LogStdout("cancelled fetching torrent info")
downloadTaskInfo.Cancelled = true
return downloadTaskInfo, nil
case <-torrent.GotInfo():
files := torrent.Files()
@ -2170,12 +2192,6 @@ func (m *Manager) DownloadHistoryArchivesByMagnetlink(communityID types.HexBytes
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
downloadTaskInfo := &HistoryArchiveDownloadTaskInfo{
TotalDownloadedArchivesCount: 0,
TotalArchivesCount: 0,
Cancelled: false,
}
for {
select {
case <-cancelTask:

View File

@ -597,6 +597,15 @@ func (p *Persistence) HasCommunityArchiveInfo(communityID types.HexBytes) (exist
return exists, err
}
func (p *Persistence) GetLastSeenMagnetlink(communityID types.HexBytes) (string, error) {
var magnetlinkURI string
err := p.db.QueryRow(`SELECT last_magnetlink_uri FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&magnetlinkURI)
if err == sql.ErrNoRows {
return "", nil
}
return magnetlinkURI, err
}
func (p *Persistence) GetMagnetlinkMessageClock(communityID types.HexBytes) (uint64, error) {
var magnetlinkClock uint64
err := p.db.QueryRow(`SELECT magnetlink_clock FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&magnetlinkClock)
@ -623,6 +632,15 @@ func (p *Persistence) UpdateMagnetlinkMessageClock(communityID types.HexBytes, c
return err
}
func (p *Persistence) UpdateLastSeenMagnetlink(communityID types.HexBytes, magnetlinkURI string) error {
_, err := p.db.Exec(`UPDATE communities_archive_info SET
last_magnetlink_uri = ?
WHERE community_id = ?`,
magnetlinkURI,
communityID.String())
return err
}
func (p *Persistence) SaveLastMessageArchiveEndDate(communityID types.HexBytes, endDate uint64) error {
_, err := p.db.Exec(`INSERT INTO communities_archive_info (last_message_archive_end_date, community_id) VALUES (?, ?)`,
endDate,

View File

@ -919,15 +919,23 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage
if err != nil {
return err
}
lastSeenMagnetlink, err := m.communitiesManager.GetLastSeenMagnetlink(id)
if err != nil {
return err
}
// We are only interested in a community archive magnet link
// if it originates from a community that the current account is
// part of and doesn't own the private key at the same time
if !signedByOwnedCommunity && joinedCommunity && clock >= lastClock {
if lastSeenMagnetlink == magnetlink {
m.communitiesManager.LogStdout("already processed this magnetlink")
return nil
}
m.communitiesManager.UnseedHistoryArchiveTorrent(id)
currentTask := m.communitiesManager.GetHistoryArchiveDownloadTask(id.String())
go func(currentTask *communities.HistoryArchiveDownloadTask) {
go func(currentTask *communities.HistoryArchiveDownloadTask, communityID types.HexBytes) {
// Cancel ongoing download/import task
if currentTask != nil {
@ -941,17 +949,20 @@ func (m *Messenger) HandleHistoryArchiveMagnetlinkMessage(state *ReceivedMessage
Waiter: *new(sync.WaitGroup),
}
m.communitiesManager.AddHistoryArchiveDownloadTask(id.String(), task)
m.communitiesManager.AddHistoryArchiveDownloadTask(communityID.String(), task)
// this wait groups tracks the ongoing task for a particular community
task.Waiter.Add(1)
defer task.Waiter.Done()
defer func() {
task.Waiter.Done()
m.communitiesManager.DeleteHistoryArchiveDownloadTask(communityID.String())
}()
// this wait groups tracks all ongoing tasks across communities
m.downloadHistoryArchiveTasksWaitGroup.Add(1)
defer m.downloadHistoryArchiveTasksWaitGroup.Done()
m.downloadAndImportHistoryArchives(id, magnetlink, task.Cancel)
}(currentTask)
m.downloadAndImportHistoryArchives(communityID, magnetlink, task.Cancel)
}(currentTask, id)
return m.communitiesManager.UpdateMagnetlinkMessageClock(id, clock)
}
@ -977,7 +988,9 @@ func (m *Messenger) downloadAndImportHistoryArchives(id types.HexBytes, magnetli
}
if downloadTaskInfo.Cancelled {
if downloadTaskInfo.TotalDownloadedArchivesCount > 0 {
m.communitiesManager.LogStdout(fmt.Sprintf("downloaded %d of %d archives so far", downloadTaskInfo.TotalDownloadedArchivesCount, downloadTaskInfo.TotalArchivesCount))
}
return
}
@ -1056,6 +1069,10 @@ importMessageArchivesLoop:
}
}
}
err = m.communitiesManager.UpdateLastSeenMagnetlink(id, magnetlink)
if err != nil {
m.communitiesManager.LogStdout("couldn't update last seen magnetlink", zap.Error(err))
}
m.config.messengerSignalsHandler.DownloadingHistoryArchivesFinished(types.EncodeHex(id))
}