From 60a49fc7d92d8bc28d3fc5a374d0cc798d53c89f Mon Sep 17 00:00:00 2001 From: Parvesh Monu Date: Wed, 3 Aug 2022 04:38:01 +0530 Subject: [PATCH] Timeout automatic status updates (#2757) --- VERSION | 2 +- protocol/messenger.go | 1 + protocol/messenger_communities.go | 8 +- protocol/messenger_config.go | 1 + protocol/messenger_status_updates.go | 55 +++++++ protocol/messenger_status_updates_test.go | 171 ++++++++++++++++++++++ protocol/persistence.go | 52 +++++++ services/ext/signal.go | 4 + signal/events_messenger.go | 8 +- 9 files changed, 296 insertions(+), 6 deletions(-) create mode 100644 protocol/messenger_status_updates_test.go diff --git a/VERSION b/VERSION index e49057b33..6ce4388fd 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.104.0 +0.104.1 diff --git a/protocol/messenger.go b/protocol/messenger.go index 49eb149c2..c67dcbe1e 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -648,6 +648,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) { m.watchExpiredMessages() m.watchIdentityImageChanges() m.broadcastLatestUserStatus() + m.timeoutAutomaticStatusUpdates() m.startBackupLoop() err = m.startAutoMessageLoop() if err != nil { diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 1f872d685..9db2c3a5a 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -1322,13 +1322,13 @@ func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) { return } - //if there is no info helpful for client, we don't post it - if community.Name() == "" && community.DescriptionText() == "" && community.MembersCount() == 0 { + if err != nil { + m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err)) return } - if err != nil { - m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err)) + //if there is no info helpful for client, we don't post it + if community.Name() == "" && community.DescriptionText() == "" && community.MembersCount() == 0 { return } diff --git a/protocol/messenger_config.go b/protocol/messenger_config.go index b0431e032..d4554604a 100644 --- a/protocol/messenger_config.go +++ b/protocol/messenger_config.go @@ -44,6 +44,7 @@ type MessengerSignalsHandler interface { HistoryArchivesSeeding(communityID string) HistoryArchivesUnseeded(communityID string) HistoryArchiveDownloaded(communityID string, from int, to int) + StatusUpdatesTimedOut(statusUpdates *[]UserStatus) } type config struct { diff --git a/protocol/messenger_status_updates.go b/protocol/messenger_status_updates.go index 3ecc10387..045d7d088 100644 --- a/protocol/messenger_status_updates.go +++ b/protocol/messenger_status_updates.go @@ -267,3 +267,58 @@ func (m *Messenger) HandleStatusUpdate(state *ReceivedMessageState, statusMessag func (m *Messenger) StatusUpdates() ([]UserStatus, error) { return m.persistence.StatusUpdates() } + +func (m *Messenger) timeoutStatusUpdates(fromClock uint64, tillClock uint64) { + // Most of the time we only need to time out just one status update, + // but the range covers special cases like, other status updates had the same clock value + // or the received another status update with higher clock value than the reference clock but + // lower clock value than the nextClock + deactivatedStatusUpdates, err := m.persistence.DeactivatedAutomaticStatusUpdates(fromClock, tillClock) + + // Send deactivatedStatusUpdates to Client + if err == nil { + m.config.messengerSignalsHandler.StatusUpdatesTimedOut(&deactivatedStatusUpdates) + } else { + m.logger.Debug("Unable to get deactivated automatic status updates from db", zap.Error(err)) + } +} + +func (m *Messenger) timeoutAutomaticStatusUpdates() { + + nextClock := uint64(0) + waitDuration := uint64(10) // Initial 10 sec wait, to make sure new status updates are fetched before starting timing out loop + fiveMinutes := uint64(5 * 60) + referenceClock := uint64(time.Now().Unix()) - fiveMinutes + + go func() { + for { + select { + case <-time.After(time.Duration(waitDuration) * time.Second): + tempNextClock, err := m.persistence.NextHigherClockValueOfAutomaticStatusUpdates(referenceClock) + + if err == nil { + if nextClock == 0 || tempNextClock > nextClock { + nextClock = tempNextClock + // Extra 5 sec wait (broadcast receiving delay) + waitDuration = tempNextClock + fiveMinutes + 5 - uint64(time.Now().Unix()) + if waitDuration < 0 { + waitDuration = 0 + } + } else { + m.timeoutStatusUpdates(referenceClock, tempNextClock) + waitDuration = 0 + referenceClock = tempNextClock + } + } else if err == common.ErrRecordNotFound { + // No More status updates to timeout, keep loop running at five minutes interval + waitDuration = fiveMinutes + } else { + m.logger.Debug("Unable to timeout automatic status updates", zap.Error(err)) + return + } + case <-m.quit: + return + } + } + }() +} diff --git a/protocol/messenger_status_updates_test.go b/protocol/messenger_status_updates_test.go new file mode 100644 index 000000000..e029fa971 --- /dev/null +++ b/protocol/messenger_status_updates_test.go @@ -0,0 +1,171 @@ +package protocol + +import ( + "crypto/ecdsa" + "testing" + + "github.com/stretchr/testify/suite" + "go.uber.org/zap" + + gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" + "github.com/status-im/status-go/eth-node/crypto" + "github.com/status-im/status-go/eth-node/types" + "github.com/status-im/status-go/protocol/protobuf" + + "github.com/status-im/status-go/protocol/tt" + "github.com/status-im/status-go/waku" +) + +func TestMessengerStatusUpdatesSuite(t *testing.T) { + suite.Run(t, new(MessengerStatusUpdatesSuite)) +} + +type MessengerStatusUpdatesSuite struct { + suite.Suite + m *Messenger + privateKey *ecdsa.PrivateKey // private key for the main instance of Messenger + // If one wants to send messages between different instances of Messenger, + // a single waku service should be shared. + shh types.Waku + logger *zap.Logger +} + +func (s *MessengerStatusUpdatesSuite) SetupTest() { + s.logger = tt.MustCreateTestLogger() + + config := waku.DefaultConfig + config.MinimumAcceptedPoW = 0 + shh := waku.New(&config, s.logger) + s.shh = gethbridge.NewGethWakuWrapper(shh) + s.Require().NoError(shh.Start()) + + s.m = s.newMessenger() + s.privateKey = s.m.identity + _, err := s.m.Start() + s.Require().NoError(err) + +} + +func (s *MessengerStatusUpdatesSuite) TearDownTest() { + s.Require().NoError(s.m.Shutdown()) +} + +func (s *MessengerStatusUpdatesSuite) newMessenger() *Messenger { + privateKey, err := crypto.GenerateKey() + s.Require().NoError(err) + + messenger, err := newMessengerWithKey(s.shh, privateKey, s.logger, nil) + s.Require().NoError(err) + return messenger +} + +func (s *MessengerStatusUpdatesSuite) TestNextHigherClockValueOfAutomaticStatusUpdates() { + + statusUpdate1 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 100, + CustomText: "", + PublicKey: "pub-key1", + } + + err := s.m.persistence.InsertStatusUpdate(statusUpdate1) + s.Require().NoError(err) + + statusUpdate2 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 200, + CustomText: "", + PublicKey: "pub-key2", + } + + err = s.m.persistence.InsertStatusUpdate(statusUpdate2) + s.Require().NoError(err) + + statusUpdate3 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 300, + CustomText: "", + PublicKey: "pub-key3", + } + + err = s.m.persistence.InsertStatusUpdate(statusUpdate3) + s.Require().NoError(err) + + // nextClock: clock value next higher than passed clock, of status update of type StatusUpdate_AUTOMATIC + nextClock, err := s.m.persistence.NextHigherClockValueOfAutomaticStatusUpdates(100) + s.Require().NoError(err) + + s.Require().Equal(nextClock, uint64(200)) + +} + +func (s *MessengerStatusUpdatesSuite) TestDeactivatedStatusUpdates() { + + statusUpdate1 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 100, + CustomText: "", + PublicKey: "pub-key1", + } + + err := s.m.persistence.InsertStatusUpdate(statusUpdate1) + s.Require().NoError(err) + + statusUpdate2 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 200, + CustomText: "", + PublicKey: "pub-key2", + } + + err = s.m.persistence.InsertStatusUpdate(statusUpdate2) + s.Require().NoError(err) + + statusUpdate3 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 400, + CustomText: "", + PublicKey: "pub-key3", + } + + err = s.m.persistence.InsertStatusUpdate(statusUpdate3) + s.Require().NoError(err) + + statusUpdate4 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 400, // Adding duplicate clock value for testing + CustomText: "", + PublicKey: "pub-key4", + } + + err = s.m.persistence.InsertStatusUpdate(statusUpdate4) + s.Require().NoError(err) + + statusUpdate5 := UserStatus{ + StatusType: int(protobuf.StatusUpdate_AUTOMATIC), + Clock: 500, + CustomText: "", + PublicKey: "pub-key5", + } + + err = s.m.persistence.InsertStatusUpdate(statusUpdate5) + s.Require().NoError(err) + + // Lower limit is not included, but upper limit is included + // So every status update in this range (lowerClock upperClock] will be deactivated + deactivatedAutomaticStatusUpdates, err := s.m.persistence.DeactivatedAutomaticStatusUpdates(100, 400) + s.Require().NoError(err) + + count := len(deactivatedAutomaticStatusUpdates) + s.Require().Equal(3, count) + + // Status is deactivated + s.Require().Equal(int(protobuf.StatusUpdate_INACTIVE), deactivatedAutomaticStatusUpdates[0].StatusType) + + // Lower range starts at 201 (clock + 1) + // (clock is bumped, so that client replaces old status update with new one) + s.Require().Equal(uint64(201), deactivatedAutomaticStatusUpdates[0].Clock) + + //Upper rannge ends at 401 (clock + 1) + s.Require().Equal(uint64(401), deactivatedAutomaticStatusUpdates[count-1].Clock) +} diff --git a/protocol/persistence.go b/protocol/persistence.go index 9f9068a02..a66791c60 100644 --- a/protocol/persistence.go +++ b/protocol/persistence.go @@ -1005,6 +1005,58 @@ func (db sqlitePersistence) StatusUpdates() (statusUpdates []UserStatus, err err return } +func (db sqlitePersistence) NextHigherClockValueOfAutomaticStatusUpdates(clock uint64) (uint64, error) { + var nextClock uint64 + + err := db.db.QueryRow(` + SELECT clock + FROM status_updates + WHERE clock > ? AND status_type = ? + LIMIT 1 + `, clock, protobuf.StatusUpdate_AUTOMATIC).Scan(&nextClock) + + switch err { + case sql.ErrNoRows: + return 0, common.ErrRecordNotFound + case nil: + return nextClock, nil + default: + return 0, err + } +} + +func (db sqlitePersistence) DeactivatedAutomaticStatusUpdates(fromClock uint64, tillClock uint64) (statusUpdates []UserStatus, err error) { + rows, err := db.db.Query(` + SELECT + public_key, + ?, + clock + 1, + custom_text + FROM status_updates + WHERE clock > ? AND clock <= ? AND status_type = ? + `, protobuf.StatusUpdate_INACTIVE, fromClock, tillClock, protobuf.StatusUpdate_AUTOMATIC) + if err != nil { + return + } + defer rows.Close() + + for rows.Next() { + var userStatus UserStatus + err = rows.Scan( + &userStatus.PublicKey, + &userStatus.StatusType, + &userStatus.Clock, + &userStatus.CustomText, + ) + if err != nil { + return + } + statusUpdates = append(statusUpdates, userStatus) + } + + return +} + func (db *sqlitePersistence) AddBookmark(bookmark browsers.Bookmark) (browsers.Bookmark, error) { tx, err := db.db.Begin() if err != nil { diff --git a/services/ext/signal.go b/services/ext/signal.go index aca8f6f1e..b2cf87eb0 100644 --- a/services/ext/signal.go +++ b/services/ext/signal.go @@ -118,3 +118,7 @@ func (m *MessengerSignalsHandler) HistoryArchivesUnseeded(communityID string) { func (m *MessengerSignalsHandler) HistoryArchiveDownloaded(communityID string, from int, to int) { signal.SendHistoryArchiveDownloaded(communityID, from, to) } + +func (m *MessengerSignalsHandler) StatusUpdatesTimedOut(statusUpdates *[]protocol.UserStatus) { + signal.SendStatusUpdatesTimedOut(statusUpdates) +} diff --git a/signal/events_messenger.go b/signal/events_messenger.go index 804bb5428..f3cba2dbc 100644 --- a/signal/events_messenger.go +++ b/signal/events_messenger.go @@ -8,6 +8,9 @@ const ( // EventCommunityFound triggered when user requested info about some community and messenger successfully // retrieved it from mailserver EventCommunityInfoFound = "community.found" + + // Event Automatic Status Updates Timed out + EventStatusUpdatesTimedOut = "status.updates.timedout" ) // MessageDeliveredSignal specifies chat and message that was delivered @@ -16,7 +19,6 @@ type MessageDeliveredSignal struct { MessageID string `json:"messageID"` } -// MessageDeliveredSignal specifies chat and message that was delivered type CommunityInfoFoundSignal struct { Name string `json:"name"` Description string `json:"description"` @@ -33,3 +35,7 @@ func SendMessageDelivered(chatID string, messageID string) { func SendCommunityInfoFound(community interface{}) { send(EventCommunityInfoFound, community) } + +func SendStatusUpdatesTimedOut(statusUpdates interface{}) { + send(EventStatusUpdatesTimedOut, statusUpdates) +}