chore: cleanup message segments

closes: #4297
This commit is contained in:
Patryk Osmaczko 2023-11-15 12:01:02 +01:00 committed by osmaczko
parent b994cedfc3
commit d04f99d56d
5 changed files with 95 additions and 13 deletions

View File

@ -1349,7 +1349,7 @@ func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessag
return ErrMessageSegmentsInvalidCount
}
err = s.persistence.SaveMessageSegment(&segmentMessage, message.TransportLayer.SigPubKey)
err = s.persistence.SaveMessageSegment(&segmentMessage, message.TransportLayer.SigPubKey, time.Now().Unix())
if err != nil {
return err
}
@ -1378,7 +1378,7 @@ func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessag
return ErrMessageSegmentsHashMismatch
}
err = s.persistence.CompleteMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey)
err = s.persistence.CompleteMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey, time.Now().Unix())
if err != nil {
return err
}
@ -1387,3 +1387,20 @@ func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessag
return nil
}
func (s *MessageSender) CleanupSegments() error {
weekAgo := time.Now().AddDate(0, 0, -7).Unix()
monthAgo := time.Now().AddDate(0, -1, 0).Unix()
err := s.persistence.RemoveMessageSegmentsOlderThan(weekAgo)
if err != nil {
return err
}
err = s.persistence.RemoveMessageSegmentsCompletedOlderThan(monthAgo)
if err != nil {
return err
}
return nil
}

View File

@ -347,11 +347,11 @@ func (db *RawMessagesPersistence) IsMessageAlreadyCompleted(hash []byte) (bool,
return alreadyCompleted > 0, nil
}
func (db *RawMessagesPersistence) SaveMessageSegment(segment *protobuf.SegmentMessage, sigPubKey *ecdsa.PublicKey) error {
func (db *RawMessagesPersistence) SaveMessageSegment(segment *protobuf.SegmentMessage, sigPubKey *ecdsa.PublicKey, timestamp int64) error {
sigPubKeyBlob := crypto.CompressPubkey(sigPubKey)
_, err := db.db.Exec("INSERT INTO message_segments (hash, segment_index, segments_count, sig_pub_key, payload) VALUES (?, ?, ?, ?, ?)",
segment.EntireMessageHash, segment.Index, segment.SegmentsCount, sigPubKeyBlob, segment.Payload)
_, err := db.db.Exec("INSERT INTO message_segments (hash, segment_index, segments_count, sig_pub_key, payload, timestamp) VALUES (?, ?, ?, ?, ?, ?)",
segment.EntireMessageHash, segment.Index, segment.SegmentsCount, sigPubKeyBlob, segment.Payload, timestamp)
return err
}
@ -383,7 +383,12 @@ func (db *RawMessagesPersistence) GetMessageSegments(hash []byte, sigPubKey *ecd
return segments, nil
}
func (db *RawMessagesPersistence) CompleteMessageSegments(hash []byte, sigPubKey *ecdsa.PublicKey) error {
func (db *RawMessagesPersistence) RemoveMessageSegmentsOlderThan(timestamp int64) error {
_, err := db.db.Exec("DELETE FROM message_segments WHERE timestamp < ?", timestamp)
return err
}
func (db *RawMessagesPersistence) CompleteMessageSegments(hash []byte, sigPubKey *ecdsa.PublicKey, timestamp int64) error {
tx, err := db.db.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return err
@ -405,10 +410,15 @@ func (db *RawMessagesPersistence) CompleteMessageSegments(hash []byte, sigPubKey
return err
}
_, err = tx.Exec("INSERT INTO message_segments_completed (hash, sig_pub_key) VALUES (?,?)", hash, sigPubKeyBlob)
_, err = tx.Exec("INSERT INTO message_segments_completed (hash, sig_pub_key, timestamp) VALUES (?,?,?)", hash, sigPubKeyBlob, timestamp)
if err != nil {
return err
}
return err
}
func (db *RawMessagesPersistence) RemoveMessageSegmentsCompletedOlderThan(timestamp int64) error {
_, err := db.db.Exec("DELETE FROM message_segments_completed WHERE timestamp < ?", timestamp)
return err
}

View File

@ -824,6 +824,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
m.startSettingsChangesLoop()
m.startCommunityRekeyLoop()
m.startCuratedCommunitiesUpdateLoop()
m.startMessageSegmentsCleanupLoop()
if err := m.cleanTopics(); err != nil {
return nil, err
@ -5724,3 +5725,26 @@ func (m *Messenger) handleSyncSocialLinks(message *protobuf.SyncSocialLinks, cal
func (m *Messenger) GetDeleteForMeMessages() ([]*protobuf.SyncDeleteForMeMessage, error) {
return m.persistence.GetDeleteForMeMessages()
}
func (m *Messenger) startMessageSegmentsCleanupLoop() {
logger := m.logger.Named("messageSegmentsCleanupLoop")
go func() {
var interval time.Duration = 0
for {
select {
case <-time.After(interval):
// Immediate execution on first run, then set to regular interval
interval = 1 * time.Hour
err := m.sender.CleanupSegments()
if err != nil {
logger.Error("failed to cleanup segments", zap.Error(err))
}
case <-m.quit:
return
}
}
}()
}

View File

@ -111,6 +111,7 @@
// 1698746210_add_signature_to_revealed_addresses.up.sql (87B)
// 1699041816_profile_showcase_contacts.up.sql (2.206kB)
// 1699554099_message_segments.up.sql (426B)
// 1700044186_message_segments_timestamp.up.sql (322B)
// README.md (554B)
// doc.go (850B)
@ -2375,7 +2376,7 @@ func _1699041816_profile_showcase_contactsUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1699041816_profile_showcase_contacts.up.sql", size: 2206, mode: os.FileMode(0644), modTime: time.Unix(1699622242, 0)}
info := bindataFileInfo{name: "1699041816_profile_showcase_contacts.up.sql", size: 2206, mode: os.FileMode(0644), modTime: time.Unix(1699887700, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd5, 0x7b, 0x55, 0xda, 0x93, 0x4a, 0x92, 0xf8, 0x45, 0xb2, 0x9f, 0x32, 0xf4, 0x37, 0xc, 0x5f, 0x62, 0xba, 0x33, 0xe2, 0x5c, 0x91, 0x1c, 0xc, 0x7, 0x9, 0xc2, 0x27, 0x5, 0x90, 0x94, 0xf3}}
return a, nil
}
@ -2395,11 +2396,31 @@ func _1699554099_message_segmentsUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "1699554099_message_segments.up.sql", size: 426, mode: os.FileMode(0644), modTime: time.Unix(1699622242, 0)}
info := bindataFileInfo{name: "1699554099_message_segments.up.sql", size: 426, mode: os.FileMode(0644), modTime: time.Unix(1699976109, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x73, 0xca, 0xd, 0xfa, 0xfa, 0x17, 0xef, 0x7e, 0x24, 0xf9, 0x28, 0xbd, 0x39, 0x75, 0xff, 0x34, 0x31, 0x27, 0x58, 0x3c, 0x17, 0x77, 0xfd, 0xc2, 0x66, 0x47, 0x63, 0x58, 0x3e, 0xb3, 0x88, 0x1a}}
return a, nil
}
var __1700044186_message_segments_timestampUpSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x72\xf4\x09\x71\x0d\x52\x08\x71\x74\xf2\x71\x55\xc8\x4d\x2d\x2e\x4e\x4c\x4f\x8d\x2f\x4e\x4d\xcf\x4d\xcd\x2b\x29\xe6\x72\x74\x71\x51\x70\xf6\xf7\x09\xf5\xf5\x53\x28\xc9\xcc\x4d\x2d\x2e\x49\xcc\x2d\x50\xf0\xf4\x0b\x71\x75\x77\x0d\x52\x70\x71\x75\x73\x0c\xf5\x09\x51\x30\xb0\xe6\xe2\xc2\x67\x4a\x7c\x72\x7e\x6e\x41\x4e\x6a\x49\x6a\x0a\xf1\xe6\x39\x07\xb9\x3a\x86\xb8\x2a\x78\xfa\xb9\xb8\x46\x28\x64\xa6\x54\xc4\x63\x18\x8a\xd0\xef\xef\x87\x61\xa5\x06\x5c\x56\xd3\x9a\x08\xb3\xe0\x0e\xc4\x6f\x2a\x42\x1d\x8a\xf9\x80\x00\x00\x00\xff\xff\x95\x31\x20\x93\x42\x01\x00\x00")
func _1700044186_message_segments_timestampUpSqlBytes() ([]byte, error) {
return bindataRead(
__1700044186_message_segments_timestampUpSql,
"1700044186_message_segments_timestamp.up.sql",
)
}
func _1700044186_message_segments_timestampUpSql() (*asset, error) {
bytes, err := _1700044186_message_segments_timestampUpSqlBytes()
if err != nil {
return nil, err
}
info := bindataFileInfo{name: "1700044186_message_segments_timestamp.up.sql", size: 322, mode: os.FileMode(0644), modTime: time.Unix(1700049804, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x3e, 0x4e, 0x7, 0x86, 0x71, 0xc8, 0x1f, 0x2f, 0xf4, 0xbc, 0xc5, 0xc4, 0x37, 0x56, 0xa1, 0x47, 0xd9, 0xc9, 0xfd, 0xdf, 0x9a, 0x48, 0x1d, 0xfd, 0xb4, 0xeb, 0xb6, 0xb1, 0xc2, 0x73, 0x11, 0x19}}
return a, nil
}
var _readmeMd = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x54\x91\xc1\xce\xd3\x30\x10\x84\xef\x7e\x8a\x91\x7a\x01\xa9\x2a\x8f\xc0\x0d\x71\x82\x03\x48\x1c\xc9\x36\x9e\x36\x96\x1c\x6f\xf0\xae\x93\xe6\xed\x91\xa3\xc2\xdf\xff\x66\xed\xd8\x33\xdf\x78\x4f\xa7\x13\xbe\xea\x06\x57\x6c\x35\x39\x31\xa7\x7b\x15\x4f\x5a\xec\x73\x08\xbf\x08\x2d\x79\x7f\x4a\x43\x5b\x86\x17\xfd\x8c\x21\xea\x56\x5e\x47\x90\x4a\x14\x75\x48\xde\x64\x37\x2c\x6a\x96\xae\x99\x48\x05\xf6\x27\x77\x13\xad\x08\xae\x8a\x51\xe7\x25\xf3\xf1\xa9\x9f\xf9\x58\x58\x2c\xad\xbc\xe0\x8b\x56\xf0\x21\x5d\xeb\x4c\x95\xb3\xae\x84\x60\xd4\xdc\xe6\x82\x5d\x1b\x36\x6d\x39\x62\x92\xf5\xb8\x11\xdb\x92\xd3\x28\xce\xe0\x13\xe1\x72\xcd\x3c\x63\xd4\x65\x87\xae\xac\xe8\xc3\x28\x2e\x67\x44\x66\x3a\x21\x25\xa2\x72\xac\x14\x67\xbc\x84\x9f\x53\x32\x8c\x52\x70\x25\x56\xd6\xfd\x8d\x05\x37\xad\x30\x9d\x9f\xa6\x86\x0f\xcd\x58\x7f\xcf\x34\x93\x3b\xed\x90\x9f\xa4\x1f\xcf\x30\x85\x4d\x07\x58\xaf\x7f\x25\xc4\x9d\xf3\x72\x64\x84\xd0\x7f\xf9\x9b\x3a\x2d\x84\xef\x85\x48\x66\x8d\xd8\x88\x9b\x8c\x8c\x98\x5b\xf6\x74\x14\x4e\x33\x0d\xc9\xe0\x93\x38\xda\x12\xc5\x69\xbd\xe4\xf0\x2e\x7a\x78\x07\x1c\xfe\x13\x9f\x91\x29\x31\x95\x7b\x7f\x62\x59\x37\xb4\xe5\x5e\x25\xfe\x33\xee\xd5\x53\x71\xd6\xda\x3a\xd8\xcb\xde\x2e\xf8\xa1\x90\x55\x53\x0c\xc7\xaa\x0d\xe9\x76\x14\x29\x1c\x7b\x68\xdd\x2f\xe1\x6f\x00\x00\x00\xff\xff\x3c\x0a\xc2\xfe\x2a\x02\x00\x00")
func readmeMdBytes() ([]byte, error) {
@ -2642,8 +2663,9 @@ var _bindata = map[string]func() (*asset, error){
"1698746210_add_signature_to_revealed_addresses.up.sql": _1698746210_add_signature_to_revealed_addressesUpSql,
"1699041816_profile_showcase_contacts.up.sql": _1699041816_profile_showcase_contactsUpSql,
"1699554099_message_segments.up.sql": _1699554099_message_segmentsUpSql,
"README.md": readmeMd,
"doc.go": docGo,
"1700044186_message_segments_timestamp.up.sql": _1700044186_message_segments_timestampUpSql,
"README.md": readmeMd,
"doc.go": docGo,
}
// AssetDebug is true if the assets were built with the debug flag enabled.
@ -2803,8 +2825,9 @@ var _bintree = &bintree{nil, map[string]*bintree{
"1698746210_add_signature_to_revealed_addresses.up.sql": {_1698746210_add_signature_to_revealed_addressesUpSql, map[string]*bintree{}},
"1699041816_profile_showcase_contacts.up.sql": {_1699041816_profile_showcase_contactsUpSql, map[string]*bintree{}},
"1699554099_message_segments.up.sql": {_1699554099_message_segmentsUpSql, map[string]*bintree{}},
"README.md": {readmeMd, map[string]*bintree{}},
"doc.go": {docGo, map[string]*bintree{}},
"1700044186_message_segments_timestamp.up.sql": {_1700044186_message_segments_timestampUpSql, map[string]*bintree{}},
"README.md": {readmeMd, map[string]*bintree{}},
"doc.go": {docGo, map[string]*bintree{}},
}}
// RestoreAsset restores an asset under the given directory.

View File

@ -0,0 +1,8 @@
ALTER TABLE message_segments
ADD COLUMN timestamp INTEGER DEFAULT 0;
ALTER TABLE message_segments_completed
ADD COLUMN timestamp INTEGER DEFAULT 0;
CREATE INDEX idx_message_segments_timestamp ON message_segments(timestamp);
CREATE INDEX idx_message_segments_completed_timestamp ON message_segments_completed(timestamp);