Timeout automatic status updates (#2757)
This commit is contained in:
parent
19807ce338
commit
60a49fc7d9
|
@ -648,6 +648,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
|
||||||
m.watchExpiredMessages()
|
m.watchExpiredMessages()
|
||||||
m.watchIdentityImageChanges()
|
m.watchIdentityImageChanges()
|
||||||
m.broadcastLatestUserStatus()
|
m.broadcastLatestUserStatus()
|
||||||
|
m.timeoutAutomaticStatusUpdates()
|
||||||
m.startBackupLoop()
|
m.startBackupLoop()
|
||||||
err = m.startAutoMessageLoop()
|
err = m.startAutoMessageLoop()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -1322,13 +1322,13 @@ func (m *Messenger) passStoredCommunityInfoToSignalHandler(communityID string) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
//if there is no info helpful for client, we don't post it
|
if err != nil {
|
||||||
if community.Name() == "" && community.DescriptionText() == "" && community.MembersCount() == 0 {
|
m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
//if there is no info helpful for client, we don't post it
|
||||||
m.logger.Warn("cant get community and pass it to signal handler", zap.Error(err))
|
if community.Name() == "" && community.DescriptionText() == "" && community.MembersCount() == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ type MessengerSignalsHandler interface {
|
||||||
HistoryArchivesSeeding(communityID string)
|
HistoryArchivesSeeding(communityID string)
|
||||||
HistoryArchivesUnseeded(communityID string)
|
HistoryArchivesUnseeded(communityID string)
|
||||||
HistoryArchiveDownloaded(communityID string, from int, to int)
|
HistoryArchiveDownloaded(communityID string, from int, to int)
|
||||||
|
StatusUpdatesTimedOut(statusUpdates *[]UserStatus)
|
||||||
}
|
}
|
||||||
|
|
||||||
type config struct {
|
type config struct {
|
||||||
|
|
|
@ -267,3 +267,58 @@ func (m *Messenger) HandleStatusUpdate(state *ReceivedMessageState, statusMessag
|
||||||
func (m *Messenger) StatusUpdates() ([]UserStatus, error) {
|
func (m *Messenger) StatusUpdates() ([]UserStatus, error) {
|
||||||
return m.persistence.StatusUpdates()
|
return m.persistence.StatusUpdates()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Messenger) timeoutStatusUpdates(fromClock uint64, tillClock uint64) {
|
||||||
|
// Most of the time we only need to time out just one status update,
|
||||||
|
// but the range covers special cases like, other status updates had the same clock value
|
||||||
|
// or the received another status update with higher clock value than the reference clock but
|
||||||
|
// lower clock value than the nextClock
|
||||||
|
deactivatedStatusUpdates, err := m.persistence.DeactivatedAutomaticStatusUpdates(fromClock, tillClock)
|
||||||
|
|
||||||
|
// Send deactivatedStatusUpdates to Client
|
||||||
|
if err == nil {
|
||||||
|
m.config.messengerSignalsHandler.StatusUpdatesTimedOut(&deactivatedStatusUpdates)
|
||||||
|
} else {
|
||||||
|
m.logger.Debug("Unable to get deactivated automatic status updates from db", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Messenger) timeoutAutomaticStatusUpdates() {
|
||||||
|
|
||||||
|
nextClock := uint64(0)
|
||||||
|
waitDuration := uint64(10) // Initial 10 sec wait, to make sure new status updates are fetched before starting timing out loop
|
||||||
|
fiveMinutes := uint64(5 * 60)
|
||||||
|
referenceClock := uint64(time.Now().Unix()) - fiveMinutes
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Duration(waitDuration) * time.Second):
|
||||||
|
tempNextClock, err := m.persistence.NextHigherClockValueOfAutomaticStatusUpdates(referenceClock)
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
if nextClock == 0 || tempNextClock > nextClock {
|
||||||
|
nextClock = tempNextClock
|
||||||
|
// Extra 5 sec wait (broadcast receiving delay)
|
||||||
|
waitDuration = tempNextClock + fiveMinutes + 5 - uint64(time.Now().Unix())
|
||||||
|
if waitDuration < 0 {
|
||||||
|
waitDuration = 0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
m.timeoutStatusUpdates(referenceClock, tempNextClock)
|
||||||
|
waitDuration = 0
|
||||||
|
referenceClock = tempNextClock
|
||||||
|
}
|
||||||
|
} else if err == common.ErrRecordNotFound {
|
||||||
|
// No More status updates to timeout, keep loop running at five minutes interval
|
||||||
|
waitDuration = fiveMinutes
|
||||||
|
} else {
|
||||||
|
m.logger.Debug("Unable to timeout automatic status updates", zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-m.quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,171 @@
|
||||||
|
package protocol
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/suite"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
||||||
|
"github.com/status-im/status-go/eth-node/crypto"
|
||||||
|
"github.com/status-im/status-go/eth-node/types"
|
||||||
|
"github.com/status-im/status-go/protocol/protobuf"
|
||||||
|
|
||||||
|
"github.com/status-im/status-go/protocol/tt"
|
||||||
|
"github.com/status-im/status-go/waku"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestMessengerStatusUpdatesSuite(t *testing.T) {
|
||||||
|
suite.Run(t, new(MessengerStatusUpdatesSuite))
|
||||||
|
}
|
||||||
|
|
||||||
|
type MessengerStatusUpdatesSuite struct {
|
||||||
|
suite.Suite
|
||||||
|
m *Messenger
|
||||||
|
privateKey *ecdsa.PrivateKey // private key for the main instance of Messenger
|
||||||
|
// If one wants to send messages between different instances of Messenger,
|
||||||
|
// a single waku service should be shared.
|
||||||
|
shh types.Waku
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerStatusUpdatesSuite) SetupTest() {
|
||||||
|
s.logger = tt.MustCreateTestLogger()
|
||||||
|
|
||||||
|
config := waku.DefaultConfig
|
||||||
|
config.MinimumAcceptedPoW = 0
|
||||||
|
shh := waku.New(&config, s.logger)
|
||||||
|
s.shh = gethbridge.NewGethWakuWrapper(shh)
|
||||||
|
s.Require().NoError(shh.Start())
|
||||||
|
|
||||||
|
s.m = s.newMessenger()
|
||||||
|
s.privateKey = s.m.identity
|
||||||
|
_, err := s.m.Start()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerStatusUpdatesSuite) TearDownTest() {
|
||||||
|
s.Require().NoError(s.m.Shutdown())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerStatusUpdatesSuite) newMessenger() *Messenger {
|
||||||
|
privateKey, err := crypto.GenerateKey()
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
messenger, err := newMessengerWithKey(s.shh, privateKey, s.logger, nil)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
return messenger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerStatusUpdatesSuite) TestNextHigherClockValueOfAutomaticStatusUpdates() {
|
||||||
|
|
||||||
|
statusUpdate1 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 100,
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key1",
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.m.persistence.InsertStatusUpdate(statusUpdate1)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
statusUpdate2 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 200,
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key2",
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.m.persistence.InsertStatusUpdate(statusUpdate2)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
statusUpdate3 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 300,
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key3",
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.m.persistence.InsertStatusUpdate(statusUpdate3)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
// nextClock: clock value next higher than passed clock, of status update of type StatusUpdate_AUTOMATIC
|
||||||
|
nextClock, err := s.m.persistence.NextHigherClockValueOfAutomaticStatusUpdates(100)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
s.Require().Equal(nextClock, uint64(200))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MessengerStatusUpdatesSuite) TestDeactivatedStatusUpdates() {
|
||||||
|
|
||||||
|
statusUpdate1 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 100,
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key1",
|
||||||
|
}
|
||||||
|
|
||||||
|
err := s.m.persistence.InsertStatusUpdate(statusUpdate1)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
statusUpdate2 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 200,
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key2",
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.m.persistence.InsertStatusUpdate(statusUpdate2)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
statusUpdate3 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 400,
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key3",
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.m.persistence.InsertStatusUpdate(statusUpdate3)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
statusUpdate4 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 400, // Adding duplicate clock value for testing
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key4",
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.m.persistence.InsertStatusUpdate(statusUpdate4)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
statusUpdate5 := UserStatus{
|
||||||
|
StatusType: int(protobuf.StatusUpdate_AUTOMATIC),
|
||||||
|
Clock: 500,
|
||||||
|
CustomText: "",
|
||||||
|
PublicKey: "pub-key5",
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.m.persistence.InsertStatusUpdate(statusUpdate5)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
// Lower limit is not included, but upper limit is included
|
||||||
|
// So every status update in this range (lowerClock upperClock] will be deactivated
|
||||||
|
deactivatedAutomaticStatusUpdates, err := s.m.persistence.DeactivatedAutomaticStatusUpdates(100, 400)
|
||||||
|
s.Require().NoError(err)
|
||||||
|
|
||||||
|
count := len(deactivatedAutomaticStatusUpdates)
|
||||||
|
s.Require().Equal(3, count)
|
||||||
|
|
||||||
|
// Status is deactivated
|
||||||
|
s.Require().Equal(int(protobuf.StatusUpdate_INACTIVE), deactivatedAutomaticStatusUpdates[0].StatusType)
|
||||||
|
|
||||||
|
// Lower range starts at 201 (clock + 1)
|
||||||
|
// (clock is bumped, so that client replaces old status update with new one)
|
||||||
|
s.Require().Equal(uint64(201), deactivatedAutomaticStatusUpdates[0].Clock)
|
||||||
|
|
||||||
|
//Upper rannge ends at 401 (clock + 1)
|
||||||
|
s.Require().Equal(uint64(401), deactivatedAutomaticStatusUpdates[count-1].Clock)
|
||||||
|
}
|
|
@ -1005,6 +1005,58 @@ func (db sqlitePersistence) StatusUpdates() (statusUpdates []UserStatus, err err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (db sqlitePersistence) NextHigherClockValueOfAutomaticStatusUpdates(clock uint64) (uint64, error) {
|
||||||
|
var nextClock uint64
|
||||||
|
|
||||||
|
err := db.db.QueryRow(`
|
||||||
|
SELECT clock
|
||||||
|
FROM status_updates
|
||||||
|
WHERE clock > ? AND status_type = ?
|
||||||
|
LIMIT 1
|
||||||
|
`, clock, protobuf.StatusUpdate_AUTOMATIC).Scan(&nextClock)
|
||||||
|
|
||||||
|
switch err {
|
||||||
|
case sql.ErrNoRows:
|
||||||
|
return 0, common.ErrRecordNotFound
|
||||||
|
case nil:
|
||||||
|
return nextClock, nil
|
||||||
|
default:
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (db sqlitePersistence) DeactivatedAutomaticStatusUpdates(fromClock uint64, tillClock uint64) (statusUpdates []UserStatus, err error) {
|
||||||
|
rows, err := db.db.Query(`
|
||||||
|
SELECT
|
||||||
|
public_key,
|
||||||
|
?,
|
||||||
|
clock + 1,
|
||||||
|
custom_text
|
||||||
|
FROM status_updates
|
||||||
|
WHERE clock > ? AND clock <= ? AND status_type = ?
|
||||||
|
`, protobuf.StatusUpdate_INACTIVE, fromClock, tillClock, protobuf.StatusUpdate_AUTOMATIC)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
for rows.Next() {
|
||||||
|
var userStatus UserStatus
|
||||||
|
err = rows.Scan(
|
||||||
|
&userStatus.PublicKey,
|
||||||
|
&userStatus.StatusType,
|
||||||
|
&userStatus.Clock,
|
||||||
|
&userStatus.CustomText,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
statusUpdates = append(statusUpdates, userStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (db *sqlitePersistence) AddBookmark(bookmark browsers.Bookmark) (browsers.Bookmark, error) {
|
func (db *sqlitePersistence) AddBookmark(bookmark browsers.Bookmark) (browsers.Bookmark, error) {
|
||||||
tx, err := db.db.Begin()
|
tx, err := db.db.Begin()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -118,3 +118,7 @@ func (m *MessengerSignalsHandler) HistoryArchivesUnseeded(communityID string) {
|
||||||
func (m *MessengerSignalsHandler) HistoryArchiveDownloaded(communityID string, from int, to int) {
|
func (m *MessengerSignalsHandler) HistoryArchiveDownloaded(communityID string, from int, to int) {
|
||||||
signal.SendHistoryArchiveDownloaded(communityID, from, to)
|
signal.SendHistoryArchiveDownloaded(communityID, from, to)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MessengerSignalsHandler) StatusUpdatesTimedOut(statusUpdates *[]protocol.UserStatus) {
|
||||||
|
signal.SendStatusUpdatesTimedOut(statusUpdates)
|
||||||
|
}
|
||||||
|
|
|
@ -8,6 +8,9 @@ const (
|
||||||
// EventCommunityFound triggered when user requested info about some community and messenger successfully
|
// EventCommunityFound triggered when user requested info about some community and messenger successfully
|
||||||
// retrieved it from mailserver
|
// retrieved it from mailserver
|
||||||
EventCommunityInfoFound = "community.found"
|
EventCommunityInfoFound = "community.found"
|
||||||
|
|
||||||
|
// Event Automatic Status Updates Timed out
|
||||||
|
EventStatusUpdatesTimedOut = "status.updates.timedout"
|
||||||
)
|
)
|
||||||
|
|
||||||
// MessageDeliveredSignal specifies chat and message that was delivered
|
// MessageDeliveredSignal specifies chat and message that was delivered
|
||||||
|
@ -16,7 +19,6 @@ type MessageDeliveredSignal struct {
|
||||||
MessageID string `json:"messageID"`
|
MessageID string `json:"messageID"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// MessageDeliveredSignal specifies chat and message that was delivered
|
|
||||||
type CommunityInfoFoundSignal struct {
|
type CommunityInfoFoundSignal struct {
|
||||||
Name string `json:"name"`
|
Name string `json:"name"`
|
||||||
Description string `json:"description"`
|
Description string `json:"description"`
|
||||||
|
@ -33,3 +35,7 @@ func SendMessageDelivered(chatID string, messageID string) {
|
||||||
func SendCommunityInfoFound(community interface{}) {
|
func SendCommunityInfoFound(community interface{}) {
|
||||||
send(EventCommunityInfoFound, community)
|
send(EventCommunityInfoFound, community)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func SendStatusUpdatesTimedOut(statusUpdates interface{}) {
|
||||||
|
send(EventStatusUpdatesTimedOut, statusUpdates)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue