package protocol import ( "context" "fmt" "time" "github.com/golang/protobuf/proto" "go.uber.org/zap" datasyncnode "github.com/status-im/mvds/node" datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer" "github.com/status-im/status-go/multiaccounts/settings" "github.com/status-im/status-go/protocol/common" "github.com/status-im/status-go/protocol/communities" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/transport" v1protocol "github.com/status-im/status-go/protocol/v1" ) func (m *Messenger) GetCurrentUserStatus() (*UserStatus, error) { status := &UserStatus{ StatusType: int(protobuf.StatusUpdate_AUTOMATIC), Clock: 0, CustomText: "", } err := m.settings.GetCurrentStatus(status) if err != nil { m.logger.Debug("Error obtaining latest status", zap.Error(err)) return nil, err } return status, nil } func (m *Messenger) sendUserStatus(ctx context.Context, status UserStatus) error { shouldBroadcastUserStatus, err := m.settings.ShouldBroadcastUserStatus() if err != nil { return err } if !shouldBroadcastUserStatus { m.logger.Debug("user status should not be broadcasted") return nil } status.Clock = uint64(time.Now().Unix()) err = m.settings.SaveSettingField(settings.CurrentUserStatus, status) if err != nil { return err } statusUpdate := &protobuf.StatusUpdate{ Clock: status.Clock, StatusType: protobuf.StatusUpdate_StatusType(status.StatusType), CustomText: status.CustomText, } encodedMessage, err := proto.Marshal(statusUpdate) if err != nil { return err } contactCodeTopic := transport.ContactCodeTopic(&m.identity.PublicKey) rawMessage := common.RawMessage{ LocalChatID: contactCodeTopic, Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_STATUS_UPDATE, ResendType: common.ResendTypeNone, // does this need to be resent? Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC, Priority: &common.LowPriority, } _, err = m.sender.SendPublic(ctx, contactCodeTopic, rawMessage) if err != nil { return err } joinedCommunities, err := m.communitiesManager.Joined() if err != nil { return err } for _, community := range joinedCommunities { rawMessage.LocalChatID = community.StatusUpdatesChannelID() rawMessage.PubsubTopic = community.PubsubTopic() _, err = m.sender.SendPublic(ctx, rawMessage.LocalChatID, rawMessage) if err != nil { return err } } return nil } func (m *Messenger) sendCurrentUserStatus(ctx context.Context) { err := m.persistence.CleanOlderStatusUpdates() if err != nil { m.logger.Debug("Error cleaning status updates", zap.Error(err)) return } shouldBroadcastUserStatus, err := m.settings.ShouldBroadcastUserStatus() if err != nil { m.logger.Debug("Error while getting status broadcast setting", zap.Error(err)) return } if !shouldBroadcastUserStatus { m.logger.Debug("user status should not be broadcasted") return } currStatus, err := m.GetCurrentUserStatus() if err != nil { m.logger.Debug("Error obtaining latest status", zap.Error(err)) return } if err := m.sendUserStatus(ctx, *currStatus); err != nil { m.logger.Debug("Error when sending the latest user status", zap.Error(err)) } } func (m *Messenger) sendCurrentUserStatusToCommunity(ctx context.Context, community *communities.Community) error { logger := m.logger.Named("sendCurrentUserStatusToCommunity") shouldBroadcastUserStatus, err := m.settings.ShouldBroadcastUserStatus() if err != nil { logger.Debug("m.settings.ShouldBroadcastUserStatus error", zap.Error(err)) return err } if !shouldBroadcastUserStatus { logger.Debug("user status should not be broadcasted") return nil } status, err := m.GetCurrentUserStatus() if err != nil { logger.Debug("Error obtaining latest status", zap.Error(err)) return err } status.Clock = uint64(time.Now().Unix()) err = m.settings.SaveSettingField(settings.CurrentUserStatus, status) if err != nil { logger.Debug("m.settings.SaveSetting error", zap.Any("current-user-status", status), zap.Error(err)) return err } statusUpdate := &protobuf.StatusUpdate{ Clock: status.Clock, StatusType: protobuf.StatusUpdate_StatusType(status.StatusType), CustomText: status.CustomText, } encodedMessage, err := proto.Marshal(statusUpdate) if err != nil { logger.Debug("proto.Marshal error", zap.Any("protobuf.StatusUpdate", statusUpdate), zap.Error(err)) return err } rawMessage := common.RawMessage{ LocalChatID: community.StatusUpdatesChannelID(), Payload: encodedMessage, MessageType: protobuf.ApplicationMetadataMessage_STATUS_UPDATE, ResendType: common.ResendTypeNone, // does this need to be resent? Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC, PubsubTopic: community.PubsubTopic(), Priority: &common.LowPriority, } _, err = m.sender.SendPublic(ctx, rawMessage.LocalChatID, rawMessage) if err != nil { logger.Debug("m.sender.SendPublic error", zap.Error(err)) return err } return nil } func (m *Messenger) broadcastLatestUserStatus() { m.logger.Debug("broadcasting user status") ctx := context.Background() go func() { // Ensure that we are connected before sending a message time.Sleep(5 * time.Second) m.sendCurrentUserStatus(ctx) }() go func() { for { select { case <-time.After(5 * time.Minute): m.sendCurrentUserStatus(ctx) case <-m.quit: return } } }() } func (m *Messenger) SetUserStatus(ctx context.Context, newStatus int, newCustomText string) error { if len([]rune(newCustomText)) > maxStatusMessageText { return fmt.Errorf("custom text shouldn't be longer than %d", maxStatusMessageText) } if newStatus != int(protobuf.StatusUpdate_AUTOMATIC) && newStatus != int(protobuf.StatusUpdate_DO_NOT_DISTURB) && newStatus != int(protobuf.StatusUpdate_ALWAYS_ONLINE) && newStatus != int(protobuf.StatusUpdate_INACTIVE) { return fmt.Errorf("unknown status type") } currStatus, err := m.GetCurrentUserStatus() if err != nil { m.logger.Debug("Error obtaining latest status", zap.Error(err)) return err } if newStatus == currStatus.StatusType && newCustomText == currStatus.CustomText { m.logger.Debug("Status type did not change") return nil } currStatus.StatusType = newStatus currStatus.CustomText = newCustomText return m.sendUserStatus(ctx, *currStatus) } func (m *Messenger) HandleStatusUpdate(state *ReceivedMessageState, message *protobuf.StatusUpdate, statusMessage *v1protocol.StatusMessage) error { if err := ValidateStatusUpdate(message); err != nil { return err } if common.IsPubKeyEqual(state.CurrentMessageState.PublicKey, &m.identity.PublicKey) { // Status message is ours currentStatus, err := m.GetCurrentUserStatus() if err != nil { m.logger.Debug("Error obtaining latest status", zap.Error(err)) return err } if currentStatus.Clock >= message.Clock { return nil // older status message, or status does not change ignoring it } newStatus := ToUserStatus(message) err = m.settings.SaveSettingField(settings.CurrentUserStatus, newStatus) if err != nil { return err } state.Response.SetCurrentStatus(newStatus) } else { statusUpdate := ToUserStatus(message) statusUpdate.PublicKey = state.CurrentMessageState.Contact.ID err := m.persistence.InsertStatusUpdate(statusUpdate) if err != nil { return err } state.Response.AddStatusUpdate(statusUpdate) if statusUpdate.StatusType == int(protobuf.StatusUpdate_AUTOMATIC) || statusUpdate.StatusType == int(protobuf.StatusUpdate_ALWAYS_ONLINE) || statusUpdate.StatusType == int(protobuf.StatusUpdate_INACTIVE) { m.logger.Debug("reset data sync for peer", zap.String("public_key", statusUpdate.PublicKey), zap.Uint64("clock", statusUpdate.Clock)) select { case m.mvdsStatusChangeEvent <- datasyncnode.PeerStatusChangeEvent{ PeerID: datasyncpeer.PublicKeyToPeerID(*state.CurrentMessageState.PublicKey), Status: datasyncnode.OnlineStatus, EventTime: statusUpdate.Clock, }: default: m.logger.Debug("mvdsStatusChangeEvent channel is full") } } } return nil } func (m *Messenger) StatusUpdates() ([]UserStatus, error) { return m.persistence.StatusUpdates() } func (m *Messenger) timeoutStatusUpdates(fromClock uint64, tillClock uint64) { // Most of the time we only need to time out just one status update, // but the range covers special cases like, other status updates had the same clock value // or the received another status update with higher clock value than the reference clock but // lower clock value than the nextClock deactivatedStatusUpdates, err := m.persistence.DeactivatedAutomaticStatusUpdates(fromClock, tillClock) // Send deactivatedStatusUpdates to Client if err == nil { if m.config.messengerSignalsHandler != nil { m.config.messengerSignalsHandler.StatusUpdatesTimedOut(&deactivatedStatusUpdates) } } else { m.logger.Debug("Unable to get deactivated automatic status updates from db", zap.Error(err)) } } func (m *Messenger) timeoutAutomaticStatusUpdates() { nextClock := uint64(0) waitDuration := uint64(10) // Initial 10 sec wait, to make sure new status updates are fetched before starting timing out loop fiveMinutes := uint64(5 * 60) referenceClock := uint64(time.Now().Unix()) - fiveMinutes go func() { for { select { case <-time.After(time.Duration(waitDuration) * time.Second): tempNextClock, err := m.persistence.NextHigherClockValueOfAutomaticStatusUpdates(referenceClock) if err == nil { if nextClock == 0 || tempNextClock > nextClock { nextClock = tempNextClock // Extra 5 sec wait (broadcast receiving delay) waitDuration = tempNextClock + fiveMinutes + 5 - uint64(time.Now().Unix()) } else { m.timeoutStatusUpdates(referenceClock, tempNextClock) waitDuration = 0 referenceClock = tempNextClock } } else if err == common.ErrRecordNotFound { // No More status updates to timeout, keep loop running at five minutes interval waitDuration = fiveMinutes } else { m.logger.Debug("Unable to timeout automatic status updates", zap.Error(err)) return } case <-m.quit: return } } }() }