status-go/protocol/messenger.go
Andrea Maria Piana 2d1f6c1cfa Don't pass contacts without custom fields to the client
When receiving a message we save a contact in the database in order to
avoid re-calculating image/profile.
This contact is then passed to the client, which can negatively impact
performance.

This commit changes the behavior so that only those contacts that have
some custom fields (have been explicitly added by the user, have been
blocked by the user, have sent a contact request or have a verified ens
name) are passed to the client.
2020-04-21 09:09:27 +02:00

2931 lines
77 KiB
Go

package protocol
import (
"context"
"crypto/ecdsa"
"database/sql"
"math/rand"
"sync"
"time"
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/golang/protobuf/proto"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
enstypes "github.com/status-im/status-go/eth-node/types/ens"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/encryption/multidevice"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
"github.com/status-im/status-go/protocol/identity/alias"
"github.com/status-im/status-go/protocol/identity/identicon"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/sqlite"
"github.com/status-im/status-go/protocol/transport"
wakutransp "github.com/status-im/status-go/protocol/transport/waku"
shhtransp "github.com/status-im/status-go/protocol/transport/whisper"
v1protocol "github.com/status-im/status-go/protocol/v1"
)
// PubKeyStringLength is the length, in characters, of a public key string.
// NOTE(review): presumably a hex-encoded uncompressed key including the
// "0x" prefix — confirm against the callers.
const PubKeyStringLength = 132

// transactionSentTxt is the fixed text "Transaction sent".
// NOTE(review): its use is outside this part of the file — confirm.
const transactionSentTxt = "Transaction sent"

var (
	// ErrChatIDEmpty signals that a chat ID was empty.
	ErrChatIDEmpty = errors.New("chat ID is empty")
	// ErrChatNotFound is returned when a chat ID is not present in the
	// in-memory chat cache.
	ErrChatNotFound = errors.New("can't find chat")
	// ErrNotImplemented is returned by methods that are only stubs.
	ErrNotImplemented = errors.New("not implemented")
)
// Messenger is an entity managing chats and messages.
// It acts as a bridge between the application and encryption
// layers.
// It needs to expose an interface to manage installations
// because installations are managed by the user.
// Similarly, it needs to expose an interface to manage
// mailservers because they can also be managed by the user.
type Messenger struct {
	// node gives access to the Whisper/Waku services.
	node types.Node
	// identity is our private key; its public part identifies us.
	identity *ecdsa.PrivateKey
	// persistence is the sqlite-backed store for chats, contacts and messages.
	persistence *sqlitePersistence
	// transport sends messages and manages filters.
	transport transport.Transport
	// encryptor is the encryption protocol; it also tracks installations.
	encryptor *encryption.Protocol
	// processor encodes and sends raw messages.
	processor *messageProcessor
	handler *MessageHandler
	logger *zap.Logger
	verifyTransactionClient EthClient
	featureFlags featureFlags
	messagesPersistenceEnabled bool
	// shutdownTasks are run in order by Shutdown.
	shutdownTasks []func() error
	systemMessagesTranslations map[protobuf.MembershipUpdateEvent_EventType]string
	// allChats caches chats in memory, keyed by chat ID.
	allChats map[string]*Chat
	// allContacts caches contacts in memory, keyed by contact ID.
	allContacts map[string]*Contact
	// allInstallations caches our installations, keyed by installation ID.
	allInstallations map[string]*multidevice.Installation
	// modifiedInstallations flags installation IDs newly seen by the
	// new-installations handler.
	modifiedInstallations map[string]bool
	installationID string
	// mutex is taken by the exported methods before they touch the
	// in-memory caches above.
	mutex sync.Mutex
}
// RawResponse pairs a transport filter with a list of status messages.
// NOTE(review): presumably the messages are those received on that
// filter — confirm against the code that builds it.
type RawResponse struct {
	Filter *transport.Filter `json:"filter"`
	Messages []*v1protocol.StatusMessage `json:"messages"`
}
// MessengerResponse groups the chats, messages, contacts and installations
// returned by Messenger operations. Empty collections are omitted from
// the JSON encoding.
type MessengerResponse struct {
	Chats []*Chat `json:"chats,omitempty"`
	Messages []*Message `json:"messages,omitempty"`
	Contacts []*Contact `json:"contacts,omitempty"`
	Installations []*multidevice.Installation `json:"installations,omitempty"`
}
// IsEmpty reports whether the response carries no chats, messages,
// contacts or installations.
func (m *MessengerResponse) IsEmpty() bool {
	total := len(m.Chats) + len(m.Messages) + len(m.Contacts) + len(m.Installations)
	return total == 0
}
// featureFlags holds switches for optional messenger behaviour.
type featureFlags struct {
	// datasync indicates whether direct messages should be sent exclusively
	// using datasync, breaking change for non-v1 clients. Public messages
	// are not impacted
	datasync bool
}
// dbConfig holds the path and key passed to sqlite.Open when no database
// instance is supplied.
type dbConfig struct {
	dbPath string
	dbKey string
}
// config collects the settings applied by Option functions before
// NewMessenger builds a Messenger.
type config struct {
	// This needs to be exposed until we move here mailserver logic
	// as otherwise the client is not notified of a new filter and
	// won't be pulling messages from mailservers until it reloads the chats/filters
	onNegotiatedFilters func([]*transport.Filter)
	// DEPRECATED: no need to expose it
	onSendContactCodeHandler func(*encryption.ProtocolMessageSpec)
	// systemMessagesTranslations holds translations for system-messages
	systemMessagesTranslations map[protobuf.MembershipUpdateEvent_EventType]string
	// Config for the envelopes monitor
	envelopesMonitorConfig *transport.EnvelopesMonitorConfig
	// messagesPersistenceEnabled is set by WithMessagesPersistenceEnabled.
	messagesPersistenceEnabled bool
	featureFlags featureFlags
	// A path to a database or a database instance is required.
	// The database instance has a higher priority.
	dbConfig dbConfig
	db *sql.DB
	// verifyTransactionClient is set by WithVerifyTransactionClient.
	verifyTransactionClient EthClient
	// logger is optional; NewMessenger creates a development logger when nil.
	logger *zap.Logger
}
// Option mutates the messenger configuration. NewMessenger applies the
// options in order and aborts on the first one that returns an error.
type Option func(*config) error

// WithSystemMessagesTranslations is required for Group Chats which are currently disabled.
// nolint: unused
func WithSystemMessagesTranslations(t map[protobuf.MembershipUpdateEvent_EventType]string) Option {
	return func(c *config) error {
		c.systemMessagesTranslations = t
		return nil
	}
}

// WithOnNegotiatedFilters registers the callback invoked with the filters
// produced when new shared secrets are processed.
func WithOnNegotiatedFilters(h func([]*transport.Filter)) Option {
	return func(c *config) error {
		c.onNegotiatedFilters = h
		return nil
	}
}

// WithCustomLogger sets the logger used by the messenger.
func WithCustomLogger(logger *zap.Logger) Option {
	return func(c *config) error {
		c.logger = logger
		return nil
	}
}

// WithMessagesPersistenceEnabled turns the messages persistence flag on.
func WithMessagesPersistenceEnabled() Option {
	return func(c *config) error {
		c.messagesPersistenceEnabled = true
		return nil
	}
}

// WithDatabaseConfig sets the path and key used to open the database when
// no database instance is provided via WithDatabase.
func WithDatabaseConfig(dbPath, dbKey string) Option {
	return func(c *config) error {
		c.dbConfig = dbConfig{dbPath: dbPath, dbKey: dbKey}
		return nil
	}
}

// WithVerifyTransactionClient sets the client stored as the messenger's
// verifyTransactionClient.
func WithVerifyTransactionClient(client EthClient) Option {
	return func(c *config) error {
		c.verifyTransactionClient = client
		return nil
	}
}

// WithDatabase supplies an already opened database instance. It takes
// priority over WithDatabaseConfig.
func WithDatabase(db *sql.DB) Option {
	return func(c *config) error {
		c.db = db
		return nil
	}
}

// WithDatasync enables the datasync feature flag.
//
// It returns Option like every other constructor; the previous unnamed
// func(*config) error result type remains assignable from Option, so
// existing callers keep compiling.
func WithDatasync() Option {
	return func(c *config) error {
		c.featureFlags.datasync = true
		return nil
	}
}

// WithEnvelopesMonitorConfig sets the envelopes monitor configuration
// handed to the transport.
func WithEnvelopesMonitorConfig(emc *transport.EnvelopesMonitorConfig) Option {
	return func(c *config) error {
		c.envelopesMonitorConfig = emc
		return nil
	}
}
// NewMessenger builds a Messenger for the given identity and node.
//
// It applies opts, falls back to a development logger when none is
// configured, opens the database (a database instance takes priority over
// a database path), applies migrations, picks the Whisper transport or
// falls back to Waku, and wires the encryption layer, message processor
// and message handler together.
//
// An error is returned when an option fails, when neither a database
// instance nor a database config is provided, or when any of the
// components cannot be created.
func NewMessenger(
	identity *ecdsa.PrivateKey,
	node types.Node,
	installationID string,
	opts ...Option,
) (*Messenger, error) {
	// The handlers below close over messenger; it is only assigned once
	// every component has been created.
	var messenger *Messenger

	c := config{}
	for _, opt := range opts {
		if err := opt(&c); err != nil {
			return nil, err
		}
	}

	logger := c.logger
	if c.logger == nil {
		var err error
		if logger, err = zap.NewDevelopment(); err != nil {
			return nil, errors.Wrap(err, "failed to create a logger")
		}
	}

	// Track installations that belong to our own identity and that we have
	// not seen before.
	onNewInstallationsHandler := func(installations []*multidevice.Installation) {
		for _, installation := range installations {
			if installation.Identity == contactIDFromPublicKey(&messenger.identity.PublicKey) {
				if _, ok := messenger.allInstallations[installation.ID]; !ok {
					messenger.allInstallations[installation.ID] = installation
					messenger.modifiedInstallations[installation.ID] = true
				}
			}
		}
	}

	// Set default config fields.
	onNewSharedSecretHandler := func(secrets []*sharedsecret.Secret) {
		filters, err := messenger.handleSharedSecrets(secrets)
		if err != nil {
			slogger := logger.With(zap.String("site", "onNewSharedSecretHandler"))
			slogger.Warn("failed to process secrets", zap.Error(err))
		}

		if c.onNegotiatedFilters != nil {
			c.onNegotiatedFilters(filters)
		}
	}
	if c.onSendContactCodeHandler == nil {
		c.onSendContactCodeHandler = func(messageSpec *encryption.ProtocolMessageSpec) {
			slogger := logger.With(zap.String("site", "onSendContactCodeHandler"))
			slogger.Debug("received a SendContactCode request")

			newMessage, err := messageSpecToWhisper(messageSpec)
			if err != nil {
				slogger.Warn("failed to convert spec to Whisper message", zap.Error(err))
				return
			}

			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
			defer cancel()
			chatName := transport.ContactCodeTopic(&messenger.identity.PublicKey)
			_, err = messenger.transport.SendPublic(ctx, newMessage, chatName)
			if err != nil {
				slogger.Warn("failed to send a contact code", zap.Error(err))
			}
		}
	}
	if c.systemMessagesTranslations == nil {
		c.systemMessagesTranslations = defaultSystemMessagesTranslations
	}

	// Configure the database.
	database := c.db
	if c.db == nil && c.dbConfig == (dbConfig{}) {
		return nil, errors.New("database instance or database path needs to be provided")
	}
	if c.db == nil {
		logger.Info("opening a database", zap.String("dbPath", c.dbConfig.dbPath))
		var err error
		database, err = sqlite.Open(c.dbConfig.dbPath, c.dbConfig.dbKey)
		if err != nil {
			return nil, errors.Wrap(err, "failed to initialize database from the db config")
		}
	}

	// Apply migrations for all components.
	err := sqlite.Migrate(database)
	if err != nil {
		return nil, errors.Wrap(err, "failed to apply migrations")
	}

	// Initialize transport layer.
	var transp transport.Transport
	if shh, err := node.GetWhisper(nil); err == nil && shh != nil {
		transp, err = shhtransp.NewTransport(
			shh,
			identity,
			database,
			nil,
			c.envelopesMonitorConfig,
			logger,
		)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create Transport")
		}
	} else {
		logger.Info("failed to find Whisper service; trying Waku", zap.Error(err))
		waku, err := node.GetWaku(nil)
		if err != nil {
			return nil, errors.Wrap(err, "failed to find Whisper and Waku services")
		}
		if waku == nil {
			// errors.Wrap returns nil for a nil error, which would make this
			// function return (nil, nil); build an explicit error instead.
			return nil, errors.New("failed to find Whisper and Waku services")
		}
		transp, err = wakutransp.NewTransport(
			waku,
			identity,
			database,
			nil,
			c.envelopesMonitorConfig,
			logger,
		)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create Transport")
		}
	}

	// Initialize encryption layer.
	encryptionProtocol := encryption.New(
		database,
		installationID,
		onNewInstallationsHandler,
		onNewSharedSecretHandler,
		c.onSendContactCodeHandler,
		logger,
	)

	processor, err := newMessageProcessor(
		identity,
		database,
		encryptionProtocol,
		transp,
		logger,
		c.featureFlags,
	)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create messageProcessor")
	}

	handler := newMessageHandler(identity, logger, &sqlitePersistence{db: database})

	messenger = &Messenger{
		node:                       node,
		identity:                   identity,
		persistence:                &sqlitePersistence{db: database},
		transport:                  transp,
		encryptor:                  encryptionProtocol,
		processor:                  processor,
		handler:                    handler,
		featureFlags:               c.featureFlags,
		systemMessagesTranslations: c.systemMessagesTranslations,
		allChats:                   make(map[string]*Chat),
		allContacts:                make(map[string]*Contact),
		allInstallations:           make(map[string]*multidevice.Installation),
		installationID:             installationID,
		modifiedInstallations:      make(map[string]bool),
		messagesPersistenceEnabled: c.messagesPersistenceEnabled,
		verifyTransactionClient:    c.verifyTransactionClient,
		shutdownTasks: []func() error{
			database.Close,
			transp.ResetFilters,
			transp.Stop,
			func() error { processor.Stop(); return nil },
			// Currently this often fails, seems like it's safe to ignore them
			// https://github.com/uber-go/zap/issues/328
			// Sync must actually be called; its error is deliberately dropped.
			func() error { _ = logger.Sync(); return nil },
		},
		logger: logger,
	}

	logger.Debug("messages persistence", zap.Bool("enabled", c.messagesPersistenceEnabled))

	return messenger, nil
}
// Start starts the encryption layer with the messenger's identity and
// returns whatever error that reports.
func (m *Messenger) Start() error {
	if err := m.encryptor.Start(m.identity); err != nil {
		return err
	}
	return nil
}
// Init loads the persisted chats, contacts and our own installations into
// memory and sets up the transport filters which are responsible for
// retrieving messages.
//
// Chats that fail validation are logged and skipped. Every valid chat is
// cached, but only active chats contribute to the filters. A contact
// contributes its public key only when it was added by us and is not
// blocked.
func (m *Messenger) Init() error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Seed the random source (the original note says this is for color generation).
	rand.Seed(time.Now().Unix())

	log := m.logger.With(zap.String("site", "Init"))

	var (
		chatIDs []string
		keys    []*ecdsa.PublicKey
	)

	// Collect chat IDs and public keys from the existing chats.
	// TODO: Get only active chats by the query.
	storedChats, err := m.persistence.Chats()
	if err != nil {
		return err
	}
	for _, chat := range storedChats {
		if invalid := chat.Validate(); invalid != nil {
			log.Warn("failed to validate chat", zap.Error(invalid))
			continue
		}

		// Inactive chats are cached too, they just get no filters.
		m.allChats[chat.ID] = chat
		if !chat.Active {
			continue
		}

		switch chat.ChatType {
		case ChatTypePublic:
			chatIDs = append(chatIDs, chat.ID)
		case ChatTypeOneToOne:
			key, err := chat.PublicKey()
			if err != nil {
				return err
			}
			keys = append(keys, key)
		case ChatTypePrivateGroupChat:
			for _, member := range chat.Members {
				key, err := member.PublicKey()
				if err != nil {
					return errors.Wrapf(err, "invalid public key for member %s in chat %s", member.ID, chat.Name)
				}
				keys = append(keys, key)
			}
		default:
			return errors.New("invalid chat type")
		}
	}

	// Collect public keys from the contacts.
	storedContacts, err := m.persistence.Contacts()
	if err != nil {
		return err
	}
	for _, contact := range storedContacts {
		m.allContacts[contact.ID] = contact

		// We only need filters for contacts added by us and not blocked.
		if contact.IsAdded() && !contact.IsBlocked() {
			key, err := contact.PublicKey()
			if err != nil {
				log.Error("failed to get contact's public key", zap.Error(err))
				continue
			}
			keys = append(keys, key)
		}
	}

	ours, err := m.encryptor.GetOurInstallations(&m.identity.PublicKey)
	if err != nil {
		return err
	}
	for _, installation := range ours {
		m.allInstallations[installation.ID] = installation
	}

	_, err = m.transport.InitFilters(chatIDs, keys)
	return err
}
// Shutdown runs every shutdown task in order, regardless of earlier
// failures, and returns nil when all of them succeed.
//
// When several tasks fail, the failures are folded into a single error:
// each later failure's message is wrapped around the error accumulated so
// far, so the most recent message comes first in the resulting text.
func (m *Messenger) Shutdown() (err error) {
	for _, task := range m.shutdownTasks {
		taskErr := task()
		switch {
		case taskErr == nil:
			continue
		case err == nil:
			// First error appeared.
			err = taskErr
		default:
			err = errors.Wrap(err, taskErr.Error())
		}
	}
	return err
}
// handleSharedSecrets hands each negotiated secret to the transport and
// collects the resulting filters. It stops at the first failure and then
// returns a nil slice together with that error.
func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) {
	log := m.logger.With(zap.String("site", "handleSharedSecrets"))

	var filters []*transport.Filter
	for _, s := range secrets {
		log.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(s.Identity)))

		f, err := m.transport.ProcessNegotiatedSecret(types.NegotiatedSecret{
			PublicKey: s.Identity,
			Key:       s.Key,
		})
		if err != nil {
			return nil, err
		}
		filters = append(filters, f)
	}
	return filters, nil
}
// EnableInstallation enables the installation with the given ID, first in
// the encryption layer and then in the in-memory cache. It fails when the
// installation is not cached.
func (m *Messenger) EnableInstallation(id string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	target, known := m.allInstallations[id]
	if !known {
		return errors.New("no installation found")
	}

	if err := m.encryptor.EnableInstallation(&m.identity.PublicKey, id); err != nil {
		return err
	}

	target.Enabled = true
	m.allInstallations[id] = target
	return nil
}
// DisableInstallation disables the installation with the given ID, first
// in the encryption layer and then in the in-memory cache. It fails when
// the installation is not cached.
func (m *Messenger) DisableInstallation(id string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	target, known := m.allInstallations[id]
	if !known {
		return errors.New("no installation found")
	}

	if err := m.encryptor.DisableInstallation(&m.identity.PublicKey, id); err != nil {
		return err
	}

	target.Enabled = false
	m.allInstallations[id] = target
	return nil
}
// Installations returns the cached installations in unspecified order.
// The result is never nil, even when there are no installations.
func (m *Messenger) Installations() []*multidevice.Installation {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	list := make([]*multidevice.Installation, 0, len(m.allInstallations))
	for _, installation := range m.allInstallations {
		list = append(list, installation)
	}
	return list
}
// setInstallationMetadata stores the metadata for the installation with
// the given ID. It fails when the installation is not cached.
//
// The metadata is persisted through the encryption layer first and the
// cached installation is updated only on success, as EnableInstallation
// and DisableInstallation do, so a persistence failure does not leave the
// cache ahead of the store.
//
// The caller must hold m.mutex.
func (m *Messenger) setInstallationMetadata(id string, data *multidevice.InstallationMetadata) error {
	installation, ok := m.allInstallations[id]
	if !ok {
		return errors.New("no installation found")
	}

	if err := m.encryptor.SetInstallationMetadata(&m.identity.PublicKey, id, data); err != nil {
		return err
	}

	installation.InstallationMetadata = data
	return nil
}
// SetInstallationMetadata takes the messenger lock and stores the metadata
// for the installation with the given ID.
func (m *Messenger) SetInstallationMetadata(id string, data *multidevice.InstallationMetadata) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	err := m.setInstallationMetadata(id, data)
	return err
}
// SelectMailserver is NOT IMPLEMENTED: it ignores id and always returns
// ErrNotImplemented.
func (m *Messenger) SelectMailserver(id string) error {
	return ErrNotImplemented
}
// AddMailserver is NOT IMPLEMENTED: it ignores enode and always returns
// ErrNotImplemented.
func (m *Messenger) AddMailserver(enode string) error {
	return ErrNotImplemented
}
// RemoveMailserver is NOT IMPLEMENTED: it ignores id and always returns
// ErrNotImplemented.
func (m *Messenger) RemoveMailserver(id string) error {
	return ErrNotImplemented
}
// Mailservers is NOT IMPLEMENTED: it always returns a nil list and
// ErrNotImplemented.
func (m *Messenger) Mailservers() ([]string, error) {
	return nil, ErrNotImplemented
}
// Join makes the transport join the given chat: by chat ID for public
// chats, by the chat's public key for one-to-one chats and by the
// members' public keys for private group chats. Any other chat type is
// rejected with an error.
func (m *Messenger) Join(chat Chat) error {
	switch chat.ChatType {
	case ChatTypePublic:
		return m.transport.JoinPublic(chat.ID)
	case ChatTypeOneToOne:
		key, err := chat.PublicKey()
		if err != nil {
			return err
		}
		return m.transport.JoinPrivate(key)
	case ChatTypePrivateGroupChat:
		keys, err := chat.MembersAsPublicKeys()
		if err != nil {
			return err
		}
		return m.transport.JoinGroup(keys)
	}
	return errors.New("chat is neither public nor private")
}
// Leave makes the transport leave the given chat: by the chat's public
// key for one-to-one chats, by the members' public keys for private group
// chats and by chat ID for public chats. Any other chat type is rejected
// with an error.
//
// This is not accurate, it should not leave transport on removal of chat/group
// only once there is no more: Group chat with that member, one-to-one chat, contact added by us
func (m *Messenger) Leave(chat Chat) error {
	switch chat.ChatType {
	case ChatTypeOneToOne:
		pk, err := chat.PublicKey()
		if err != nil {
			return err
		}
		return m.transport.LeavePrivate(pk)
	case ChatTypePrivateGroupChat:
		members, err := chat.MembersAsPublicKeys()
		if err != nil {
			return err
		}
		return m.transport.LeaveGroup(members)
	case ChatTypePublic:
		// Public chats are joined, initialised and addressed by chat.ID
		// everywhere else in this file, so leave by the same key.
		return m.transport.LeavePublic(chat.ID)
	default:
		return errors.New("chat is neither public nor private")
	}
}
// CreateGroupChatWithMembers creates a new private group chat named name
// with ourselves as creator, adds members to it, sends the membership
// update to all group members and persists the chat together with the
// system messages derived from its membership updates.
//
// On success the response carries the new chat and those system messages.
// If persisting the chat itself fails, the populated response is returned
// along with the error; every earlier failure returns a nil response.
//
// The chat has to be registered in the in-memory cache before dispatching
// because dispatchMessage looks it up there. That registration is rolled
// back if the chat does not end up persisted, so a failed call no longer
// leaves an unpersisted chat in the cache.
func (m *Messenger) CreateGroupChatWithMembers(ctx context.Context, name string, members []string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse
	logger := m.logger.With(zap.String("site", "CreateGroupChatWithMembers"))
	logger.Info("Creating group chat", zap.String("name", name), zap.Any("members", members))

	chat := CreateGroupChat(m.getTimesource())
	clock, _ := chat.NextClockAndTimestamp(m.getTimesource())
	group, err := v1protocol.NewGroupWithCreator(name, clock, m.identity)
	if err != nil {
		return nil, err
	}
	chat.LastClockValue = clock
	chat.updateChatFromProtocolGroup(group)

	clock, _ = chat.NextClockAndTimestamp(m.getTimesource())
	// Add members
	event := v1protocol.NewMembersAddedEvent(members, clock)
	event.ChatID = chat.ID
	err = event.Sign(m.identity)
	if err != nil {
		return nil, err
	}

	err = group.ProcessEvent(event)
	if err != nil {
		return nil, err
	}

	recipients, err := stringSliceToPublicKeys(group.Members(), true)
	if err != nil {
		return nil, err
	}

	encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
	if err != nil {
		return nil, err
	}

	// Provisional registration; undone below unless the chat is persisted.
	// The deferred cleanup runs before the mutex is released.
	chatID := chat.ID
	m.allChats[chatID] = &chat
	persisted := false
	defer func() {
		if !persisted {
			delete(m.allChats, chatID)
		}
	}()

	_, err = m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
		Recipients:          recipients,
		ResendAutomatically: true,
	})
	if err != nil {
		return nil, err
	}

	chat.updateChatFromProtocolGroup(group)

	response.Chats = []*Chat{&chat}
	response.Messages = buildSystemMessages(chat.MembershipUpdates, m.systemMessagesTranslations)
	err = m.persistence.SaveMessagesLegacy(response.Messages)
	if err != nil {
		return nil, err
	}

	if err := m.saveChat(&chat); err != nil {
		return &response, err
	}
	persisted = true
	return &response, nil
}
// RemoveMemberFromGroupChat removes member from the group chat chatID,
// sends the membership update and persists the chat together with the
// system messages derived from its membership updates.
//
// It returns ErrChatNotFound when the chat is not cached. The update is
// sent to the members as they were before the removal, so the removed
// member is notified as well.
func (m *Messenger) RemoveMemberFromGroupChat(ctx context.Context, chatID string, member string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse
	logger := m.logger.With(zap.String("site", "RemoveMemberFromGroupChat"))
	logger.Info("Removing member from group chat", zap.String("chatID", chatID), zap.String("member", member))

	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, ErrChatNotFound
	}

	group, err := newProtocolGroupFromChat(chat)
	if err != nil {
		return nil, err
	}

	// We save the initial recipients as we want to send updates to also
	// the members kicked out
	oldRecipients, err := stringSliceToPublicKeys(group.Members(), true)
	if err != nil {
		return nil, err
	}

	clock, _ := chat.NextClockAndTimestamp(m.getTimesource())
	// Remove member
	event := v1protocol.NewMemberRemovedEvent(member, clock)
	event.ChatID = chat.ID
	err = event.Sign(m.identity)
	if err != nil {
		return nil, err
	}

	err = group.ProcessEvent(event)
	if err != nil {
		return nil, err
	}

	encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
	if err != nil {
		return nil, err
	}
	_, err = m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
		Recipients:          oldRecipients,
		ResendAutomatically: true,
	})
	if err != nil {
		return nil, err
	}

	chat.updateChatFromProtocolGroup(group)

	response.Chats = []*Chat{chat}
	response.Messages = buildSystemMessages(chat.MembershipUpdates, m.systemMessagesTranslations)
	err = m.persistence.SaveMessagesLegacy(response.Messages)
	if err != nil {
		return nil, err
	}

	return &response, m.saveChat(chat)
}
// AddMembersToGroupChat adds members to the group chat chatID, sends the
// membership update to the resulting member list and persists the chat
// together with the system message built from the event.
//
// It returns ErrChatNotFound when the chat is not cached.
func (m *Messenger) AddMembersToGroupChat(ctx context.Context, chatID string, members []string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse
	// The log site and message now name this operation correctly.
	logger := m.logger.With(zap.String("site", "AddMembersToGroupChat"))
	logger.Info("Adding members to group chat", zap.String("chatID", chatID), zap.Any("members", members))

	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, ErrChatNotFound
	}

	group, err := newProtocolGroupFromChat(chat)
	if err != nil {
		return nil, err
	}

	clock, _ := chat.NextClockAndTimestamp(m.getTimesource())
	// Add members
	event := v1protocol.NewMembersAddedEvent(members, clock)
	event.ChatID = chat.ID
	err = event.Sign(m.identity)
	if err != nil {
		return nil, err
	}

	err = group.ProcessEvent(event)
	if err != nil {
		return nil, err
	}

	recipients, err := stringSliceToPublicKeys(group.Members(), true)
	if err != nil {
		return nil, err
	}

	encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
	if err != nil {
		return nil, err
	}
	_, err = m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
		Recipients:          recipients,
		ResendAutomatically: true,
	})
	if err != nil {
		return nil, err
	}

	chat.updateChatFromProtocolGroup(group)

	response.Chats = []*Chat{chat}
	response.Messages = buildSystemMessages([]v1protocol.MembershipUpdateEvent{event}, m.systemMessagesTranslations)
	err = m.persistence.SaveMessagesLegacy(response.Messages)
	if err != nil {
		return nil, err
	}

	return &response, m.saveChat(chat)
}
// ChangeGroupChatName renames the group chat chatID to name, sends the
// membership update to the group members and persists the chat together
// with the system message built from the event.
//
// It returns ErrChatNotFound when the chat is not cached.
func (m *Messenger) ChangeGroupChatName(ctx context.Context, chatID string, name string) (*MessengerResponse, error) {
	log := m.logger.With(zap.String("site", "ChangeGroupChatName"))
	log.Info("Changing group chat name", zap.String("chatID", chatID), zap.String("name", name))

	m.mutex.Lock()
	defer m.mutex.Unlock()

	chat, found := m.allChats[chatID]
	if !found {
		return nil, ErrChatNotFound
	}

	group, err := newProtocolGroupFromChat(chat)
	if err != nil {
		return nil, err
	}

	// Build and sign the name-changed event.
	ts, _ := chat.NextClockAndTimestamp(m.getTimesource())
	event := v1protocol.NewNameChangedEvent(name, ts)
	event.ChatID = chat.ID
	if err := event.Sign(m.identity); err != nil {
		return nil, err
	}

	// Update in-memory group
	if err := group.ProcessEvent(event); err != nil {
		return nil, err
	}

	targets, err := stringSliceToPublicKeys(group.Members(), true)
	if err != nil {
		return nil, err
	}

	payload, err := m.processor.EncodeMembershipUpdate(group, nil)
	if err != nil {
		return nil, err
	}

	raw := &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
		Recipients:          targets,
		ResendAutomatically: true,
	}
	if _, err := m.dispatchMessage(ctx, raw); err != nil {
		return nil, err
	}

	chat.updateChatFromProtocolGroup(group)

	response := MessengerResponse{
		Chats:    []*Chat{chat},
		Messages: buildSystemMessages([]v1protocol.MembershipUpdateEvent{event}, m.systemMessagesTranslations),
	}
	if err := m.persistence.SaveMessagesLegacy(response.Messages); err != nil {
		return nil, err
	}

	err = m.saveChat(chat)
	return &response, err
}
// AddAdminsToGroupChat promotes members to admins of the group chat
// chatID, sends the membership update to the group members and persists
// the chat together with the system message built from the event.
//
// It returns ErrChatNotFound when the chat is not cached.
func (m *Messenger) AddAdminsToGroupChat(ctx context.Context, chatID string, members []string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	log := m.logger.With(zap.String("site", "AddAdminsToGroupChat"))
	log.Info("Add admins to group chat", zap.String("chatID", chatID), zap.Any("members", members))

	chat, found := m.allChats[chatID]
	if !found {
		return nil, ErrChatNotFound
	}

	group, err := newProtocolGroupFromChat(chat)
	if err != nil {
		return nil, err
	}

	// Build and sign the admins-added event.
	ts, _ := chat.NextClockAndTimestamp(m.getTimesource())
	event := v1protocol.NewAdminsAddedEvent(members, ts)
	event.ChatID = chat.ID
	if err := event.Sign(m.identity); err != nil {
		return nil, err
	}

	if err := group.ProcessEvent(event); err != nil {
		return nil, err
	}

	targets, err := stringSliceToPublicKeys(group.Members(), true)
	if err != nil {
		return nil, err
	}

	payload, err := m.processor.EncodeMembershipUpdate(group, nil)
	if err != nil {
		return nil, err
	}

	raw := &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
		Recipients:          targets,
		ResendAutomatically: true,
	}
	if _, err := m.dispatchMessage(ctx, raw); err != nil {
		return nil, err
	}

	chat.updateChatFromProtocolGroup(group)

	response := MessengerResponse{
		Chats:    []*Chat{chat},
		Messages: buildSystemMessages([]v1protocol.MembershipUpdateEvent{event}, m.systemMessagesTranslations),
	}
	if err := m.persistence.SaveMessagesLegacy(response.Messages); err != nil {
		return nil, err
	}

	err = m.saveChat(chat)
	return &response, err
}
// ConfirmJoiningGroup joins the transport for the group chat chatID,
// records a member-joined event, sends the membership update to the group
// members and persists the chat together with the system message built
// from the event.
//
// It returns ErrChatNotFound when the chat is not cached.
func (m *Messenger) ConfirmJoiningGroup(ctx context.Context, chatID string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	chat, found := m.allChats[chatID]
	if !found {
		return nil, ErrChatNotFound
	}

	// The transport is joined before anything is signed or sent.
	if err := m.Join(*chat); err != nil {
		return nil, err
	}

	group, err := newProtocolGroupFromChat(chat)
	if err != nil {
		return nil, err
	}

	ts, _ := chat.NextClockAndTimestamp(m.getTimesource())
	event := v1protocol.NewMemberJoinedEvent(ts)
	event.ChatID = chat.ID
	if err := event.Sign(m.identity); err != nil {
		return nil, err
	}

	if err := group.ProcessEvent(event); err != nil {
		return nil, err
	}

	targets, err := stringSliceToPublicKeys(group.Members(), true)
	if err != nil {
		return nil, err
	}

	payload, err := m.processor.EncodeMembershipUpdate(group, nil)
	if err != nil {
		return nil, err
	}

	raw := &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
		Recipients:          targets,
		ResendAutomatically: true,
	}
	if _, err := m.dispatchMessage(ctx, raw); err != nil {
		return nil, err
	}

	chat.updateChatFromProtocolGroup(group)

	response := MessengerResponse{
		Chats:    []*Chat{chat},
		Messages: buildSystemMessages([]v1protocol.MembershipUpdateEvent{event}, m.systemMessagesTranslations),
	}
	if err := m.persistence.SaveMessagesLegacy(response.Messages); err != nil {
		return nil, err
	}

	err = m.saveChat(chat)
	return &response, err
}
// LeaveGroupChat leaves the transport for the group chat chatID, records
// our own removal from the group, sends the membership update to the
// members left after that removal and persists the chat together with the
// system message built from the event. When remove is true the chat is
// also marked inactive.
//
// It returns ErrChatNotFound when the chat is not cached.
func (m *Messenger) LeaveGroupChat(ctx context.Context, chatID string, remove bool) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	chat, found := m.allChats[chatID]
	if !found {
		return nil, ErrChatNotFound
	}

	// The transport is left before anything is signed or sent.
	if err := m.Leave(*chat); err != nil {
		return nil, err
	}

	group, err := newProtocolGroupFromChat(chat)
	if err != nil {
		return nil, err
	}

	ts, _ := chat.NextClockAndTimestamp(m.getTimesource())
	self := contactIDFromPublicKey(&m.identity.PublicKey)
	event := v1protocol.NewMemberRemovedEvent(self, ts)
	event.ChatID = chat.ID
	if err := event.Sign(m.identity); err != nil {
		return nil, err
	}

	if err := group.ProcessEvent(event); err != nil {
		return nil, err
	}

	targets, err := stringSliceToPublicKeys(group.Members(), true)
	if err != nil {
		return nil, err
	}

	payload, err := m.processor.EncodeMembershipUpdate(group, nil)
	if err != nil {
		return nil, err
	}

	raw := &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE,
		Recipients:          targets,
		ResendAutomatically: true,
	}
	if _, err := m.dispatchMessage(ctx, raw); err != nil {
		return nil, err
	}

	chat.updateChatFromProtocolGroup(group)
	if remove {
		chat.Active = false
	}

	response := MessengerResponse{
		Chats:    []*Chat{chat},
		Messages: buildSystemMessages([]v1protocol.MembershipUpdateEvent{event}, m.systemMessagesTranslations),
	}
	if err := m.persistence.SaveMessagesLegacy(response.Messages); err != nil {
		return nil, err
	}

	err = m.saveChat(chat)
	return &response, err
}
// saveChat persists the chat and then stores it in the in-memory cache.
// A chat that is not cached yet, is active and is public is synced via
// syncPublicChat before being persisted; a sync failure aborts the save.
//
// The caller must hold m.mutex.
func (m *Messenger) saveChat(chat *Chat) error {
	// Sync chat if it's a new active public chat
	if _, cached := m.allChats[chat.ID]; !cached && chat.Active && chat.Public() {
		if err := m.syncPublicChat(context.Background(), chat); err != nil {
			return err
		}
	}

	if err := m.persistence.SaveChat(*chat); err != nil {
		return err
	}

	m.allChats[chat.ID] = chat
	return nil
}
// saveChats persists all the chats in one call and, on success, stores
// each of them in the in-memory cache keyed by chat ID.
func (m *Messenger) saveChats(chats []*Chat) error {
	if err := m.persistence.SaveChats(chats); err != nil {
		return err
	}
	for i := range chats {
		m.allChats[chats[i].ID] = chats[i]
	}
	return nil
}
// SaveChat takes the messenger lock and saves the chat.
func (m *Messenger) SaveChat(chat *Chat) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	err := m.saveChat(chat)
	return err
}
// Chats returns the cached chats in unspecified order. The result is nil
// when no chats are cached.
func (m *Messenger) Chats() []*Chat {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var list []*Chat
	for id := range m.allChats {
		list = append(list, m.allChats[id])
	}
	return list
}
// DeleteChat deletes the chat from the persistence layer and, on success,
// from the in-memory cache.
func (m *Messenger) DeleteChat(chatID string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if err := m.persistence.DeleteChat(chatID); err != nil {
		return err
	}

	delete(m.allChats, chatID)
	return nil
}
// isNewContact reports whether contact is added while the cached version
// of it, if any, is not.
func (m *Messenger) isNewContact(contact *Contact) bool {
	cached, known := m.allContacts[contact.ID]
	if !contact.IsAdded() {
		return false
	}
	if !known {
		return true
	}
	return !cached.IsAdded()
}
// saveContact regenerates the contact's identicon and alias from its ID,
// syncs the contact when it is newly added, persists it and finally
// stores it in the in-memory cache. Any failure aborts the save.
//
// The caller must hold m.mutex.
func (m *Messenger) saveContact(contact *Contact) error {
	icon, err := identicon.GenerateBase64(contact.ID)
	if err != nil {
		return err
	}
	contact.Identicon = icon

	generatedAlias, err := alias.GenerateFromPublicKeyString(contact.ID)
	if err != nil {
		return err
	}
	contact.Alias = generatedAlias

	// The check runs before the cache is updated, so it compares against
	// the previously cached state.
	if m.isNewContact(contact) {
		if err := m.syncContact(context.Background(), contact); err != nil {
			return err
		}
	}

	if err := m.persistence.SaveContact(contact, nil); err != nil {
		return err
	}

	m.allContacts[contact.ID] = contact
	return nil
}
// SaveContact takes the messenger lock and saves the contact.
func (m *Messenger) SaveContact(contact *Contact) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	err := m.saveContact(contact)
	return err
}
// BlockContact blocks the contact in the persistence layer and returns
// the chats that call reports. On success the contact and those chats
// are stored in the in-memory caches, and the chat whose ID equals the
// contact's ID is dropped from the chat cache.
func (m *Messenger) BlockContact(contact *Contact) ([]*Chat, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	affected, err := m.persistence.BlockContact(contact)
	if err != nil {
		return nil, err
	}

	m.allContacts[contact.ID] = contact
	for i := range affected {
		m.allChats[affected[i].ID] = affected[i]
	}
	delete(m.allChats, contact.ID)

	return affected, nil
}
// Contacts returns the cached contacts that have custom fields, in
// unspecified order. The result is nil when no contact qualifies.
func (m *Messenger) Contacts() []*Contact {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var selected []*Contact
	for _, c := range m.allContacts {
		if !c.HasCustomFields() {
			continue
		}
		selected = append(selected, c)
	}
	return selected
}
// GetContactByID looks the contact up in the in-memory cache and returns
// an error when it is not there.
// It assumes pubKey includes 0x prefix.
func (m *Messenger) GetContactByID(pubKey string) (*Contact, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	if contact, known := m.allContacts[pubKey]; known {
		return contact, nil
	}
	return nil, errors.New("no contact found")
}
// ReSendChatMessage pulls a message from the database and sends it again
// to the chat it was stored for. It fails when the message cannot be
// loaded or when its chat is not in the in-memory cache.
func (m *Messenger) ReSendChatMessage(ctx context.Context, messageID string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	stored, err := m.persistence.RawMessageByID(messageID)
	if err != nil {
		return err
	}

	chat, found := m.allChats[stored.LocalChatID]
	if !found {
		return errors.New("chat not found")
	}

	resend := &RawMessage{
		LocalChatID: chat.ID,
		Payload:     stored.Payload,
		MessageType: stored.MessageType,
		Recipients:  stored.Recipients,
	}
	_, err = m.dispatchMessage(ctx, resend)
	return err
}
// hasPairedDevices reports whether more than one cached installation is
// enabled.
func (m *Messenger) hasPairedDevices() bool {
	enabled := 0
	for _, installation := range m.allInstallations {
		if !installation.Enabled {
			continue
		}
		enabled++
	}
	return enabled >= 2
}
// sendToPairedDevices sends the payload privately to our own public key
// when we have paired devices, and does nothing otherwise.
func (m *Messenger) sendToPairedDevices(ctx context.Context, payload []byte, messageType protobuf.ApplicationMetadataMessage_Type) error {
	if !m.hasPairedDevices() {
		return nil
	}

	// We send a message to any paired device
	if _, err := m.processor.SendPrivateRaw(ctx, &m.identity.PublicKey, payload, messageType); err != nil {
		return err
	}
	return nil
}
// dispatchPairInstallationMessage sends spec as a pair-installation
// message addressed to our own public key. On success it sets spec.ID to
// the hex-encoded message ID, increments spec.SendCount, persists spec and
// returns the raw message ID.
func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec *RawMessage) ([]byte, error) {
	messageID, err := m.processor.SendPairInstallation(ctx, &m.identity.PublicKey, spec.Payload, spec.MessageType)
	if err != nil {
		return nil, err
	}

	spec.ID = types.EncodeHex(messageID)
	spec.SendCount++

	if err := m.persistence.SaveRawMessage(spec); err != nil {
		return nil, err
	}
	return messageID, nil
}
// dispatchMessage sends spec to the chat identified by spec.LocalChatID,
// which must be present in the in-memory chat cache.
//
//   - One-to-one chats: the payload is sent privately to the chat's public
//     key unless that key is our own, and then to our paired devices.
//   - Public chats: the payload is sent publicly using the chat ID.
//   - Private group chats: the payload is sent to spec.Recipients, which
//     defaults to the chat members when nil. Without paired devices our
//     own key is filtered out of the recipients in place. The message
//     type is always MEMBERSHIP_UPDATE_MESSAGE.
//
// After sending, spec.ID is set to the hex-encoded message ID,
// spec.SendCount is incremented and spec is persisted. The raw message ID
// is returned.
func (m *Messenger) dispatchMessage(ctx context.Context, spec *RawMessage) ([]byte, error) {
	var (
		id  []byte
		err error
	)
	log := m.logger.With(zap.String("site", "dispatchMessage"), zap.String("chatID", spec.LocalChatID))

	chat, found := m.allChats[spec.LocalChatID]
	if !found {
		return nil, errors.New("no chat found")
	}

	switch chat.ChatType {
	case ChatTypeOneToOne:
		// err is shadowed in this case; id still refers to the outer variable.
		peerKey, err := chat.PublicKey()
		if err != nil {
			return nil, err
		}
		if !isPubKeyEqual(peerKey, &m.identity.PublicKey) {
			id, err = m.processor.SendPrivateRaw(ctx, peerKey, spec.Payload, spec.MessageType)
			if err != nil {
				return nil, err
			}
		}

		if err = m.sendToPairedDevices(ctx, spec.Payload, spec.MessageType); err != nil {
			return nil, err
		}

	case ChatTypePublic:
		log.Debug("sending public message", zap.String("chatName", chat.Name))
		id, err = m.processor.SendPublicRaw(ctx, chat.ID, spec.Payload, spec.MessageType)
		if err != nil {
			return nil, err
		}

	case ChatTypePrivateGroupChat:
		log.Debug("sending group message", zap.String("chatName", chat.Name))
		if spec.Recipients == nil {
			spec.Recipients, err = chat.MembersAsPublicKeys()
			if err != nil {
				return nil, err
			}
		}

		if !m.hasPairedDevices() {
			// Filter out my key from the recipients, reusing the same
			// backing array.
			kept := spec.Recipients[:0]
			for _, recipient := range spec.Recipients {
				if !isPubKeyEqual(recipient, &m.identity.PublicKey) {
					kept = append(kept, recipient)
				}
			}
			spec.Recipients = kept
		}

		// We always wrap in group information
		id, err = m.processor.SendGroupRaw(ctx, spec.Recipients, spec.Payload, protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE)
		if err != nil {
			return nil, err
		}

	default:
		return nil, errors.New("chat type not supported")
	}

	spec.ID = types.EncodeHex(id)
	spec.SendCount++
	if err = m.persistence.SaveRawMessage(spec); err != nil {
		return nil, err
	}

	return id, nil
}
// SendChatMessage completes a minimal message from the chat named by
// message.ChatId, encodes it according to the chat type, dispatches it and
// stores the result.
//
// The returned response carries the sent message (as read back from the
// database) and the updated chat. The chat must already be known to the
// messenger.
func (m *Messenger) SendChatMessage(ctx context.Context, message *Message) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	sendLog := m.logger.With(zap.String("site", "Send"), zap.String("chatID", message.ChatId))

	// A valid added chat is required.
	chat, found := m.allChats[message.ChatId]
	if !found {
		return nil, errors.New("Chat not found")
	}

	if err := extendMessageFromChat(message, chat, &m.identity.PublicKey, m.getTimesource()); err != nil {
		return nil, err
	}

	var (
		payload []byte
		err     error
	)
	switch chat.ChatType {
	case ChatTypeOneToOne:
		sendLog.Debug("sending private message")
		message.MessageType = protobuf.ChatMessage_ONE_TO_ONE
		payload, err = proto.Marshal(message)
	case ChatTypePublic:
		sendLog.Debug("sending public message", zap.String("chatName", chat.Name))
		message.MessageType = protobuf.ChatMessage_PUBLIC_GROUP
		payload, err = proto.Marshal(message)
	case ChatTypePrivateGroupChat:
		message.MessageType = protobuf.ChatMessage_PRIVATE_GROUP
		sendLog.Debug("sending group message", zap.String("chatName", chat.Name))
		group, groupErr := newProtocolGroupFromChat(chat)
		if groupErr != nil {
			return nil, groupErr
		}
		// Group messages travel inside a membership update.
		payload, err = m.processor.EncodeMembershipUpdate(group, &message.ChatMessage)
	default:
		return nil, errors.New("chat type not supported")
	}
	if err != nil {
		return nil, err
	}

	messageID, err := m.dispatchMessage(ctx, &RawMessage{
		LocalChatID: chat.ID,
		Payload:     payload,
		MessageType: protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
	})
	if err != nil {
		return nil, err
	}
	message.ID = types.EncodeHex(messageID)

	if err = message.PrepareContent(); err != nil {
		return nil, err
	}
	if err = chat.UpdateFromMessage(message, m.getTimesource()); err != nil {
		return nil, err
	}
	if err = m.persistence.SaveMessagesLegacy([]*Message{message}); err != nil {
		return nil, err
	}

	stored, err := m.pullMessagesAndResponsesFromDB([]*Message{message})
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{
		Messages: stored,
		Chats:    []*Chat{chat},
	}
	return response, m.saveChat(chat)
}
// SendContactUpdates sends a contact update carrying ensName and profileImage
// first to our own contact ID and then to every contact we have added.
// It stops at, and returns, the first error.
func (m *Messenger) SendContactUpdates(ctx context.Context, ensName, profileImage string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	ownID := contactIDFromPublicKey(&m.identity.PublicKey)
	_, err := m.sendContactUpdate(ctx, ownID, ensName, profileImage)
	if err != nil {
		return err
	}

	// TODO: This should not be sending paired messages, as we do it above
	for _, contact := range m.allContacts {
		if !contact.IsAdded() {
			continue
		}
		if _, err = m.sendContactUpdate(ctx, contact.ID, ensName, profileImage); err != nil {
			return err
		}
	}
	return nil
}
// SendContactUpdate sends a contact update carrying ensName and profileImage
// to the user identified by chatID and adds that user to contacts.
// It holds the messenger mutex for the duration of the call and delegates the
// actual work to sendContactUpdate.
func (m *Messenger) SendContactUpdate(ctx context.Context, chatID, ensName, profileImage string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	return m.sendContactUpdate(ctx, chatID, ensName, profileImage)
}
// sendContactUpdate sends a ContactUpdate with ensName and profileImage to the
// one-to-one chat identified by chatID.
//
// An unknown contact is built from chatID (which must then be a hex-encoded
// public key) and an unknown chat is created as an inactive one-to-one chat.
// After a successful dispatch the contact is tagged as added unless it already
// is or it is ourselves, and both chat and contact are saved. The response
// carries that contact and chat. The caller is expected to hold m.mutex.
func (m *Messenger) sendContactUpdate(ctx context.Context, chatID, ensName, profileImage string) (*MessengerResponse, error) {
	contact, known := m.allContacts[chatID]
	if !known {
		rawKey, err := types.DecodeHex(chatID)
		if err != nil {
			return nil, err
		}
		pubKey, err := crypto.UnmarshalPubkey(rawKey)
		if err != nil {
			return nil, err
		}
		if contact, err = buildContact(pubKey); err != nil {
			return nil, err
		}
	}

	chat, exists := m.allChats[chatID]
	if !exists {
		pubKey, err := contact.PublicKey()
		if err != nil {
			return nil, err
		}
		chat = OneToOneFromPublicKey(pubKey, m.getTimesource())
		// We don't want to show the chat to the user
		chat.Active = false
	}
	// dispatchMessage resolves the chat through m.allChats, so register it first.
	m.allChats[chat.ID] = chat

	clock, _ := chat.NextClockAndTimestamp(m.getTimesource())
	payload, err := proto.Marshal(&protobuf.ContactUpdate{
		Clock:        clock,
		EnsName:      ensName,
		ProfileImage: profileImage,
	})
	if err != nil {
		return nil, err
	}

	if _, err = m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chatID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_CONTACT_UPDATE,
		ResendAutomatically: true,
	}); err != nil {
		return nil, err
	}

	if !contact.IsAdded() {
		// Never tag our own contact as added.
		if contact.ID != contactIDFromPublicKey(&m.identity.PublicKey) {
			contact.SystemTags = append(contact.SystemTags, contactAdded)
		}
	}

	chat.LastClockValue = clock
	if err = m.saveChat(chat); err != nil {
		return nil, err
	}

	response := &MessengerResponse{
		Contacts: []*Contact{contact},
		Chats:    []*Chat{chat},
	}
	return response, m.saveContact(contact)
}
// SyncDevices sends a contact update for ourselves (ensName, photoPath) and
// then syncs every active public chat and every added contact other than
// ourselves with paired devices. It stops at, and returns, the first error.
func (m *Messenger) SyncDevices(ctx context.Context, ensName, photoPath string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	ownID := contactIDFromPublicKey(&m.identity.PublicKey)
	_, err := m.sendContactUpdate(ctx, ownID, ensName, photoPath)
	if err != nil {
		return err
	}

	for _, chat := range m.allChats {
		if !chat.Public() || !chat.Active {
			continue
		}
		if err = m.syncPublicChat(ctx, chat); err != nil {
			return err
		}
	}

	for _, contact := range m.allContacts {
		if !contact.IsAdded() || contact.ID == ownID {
			continue
		}
		if err = m.syncContact(ctx, contact); err != nil {
			return err
		}
	}
	return nil
}
// SendPairInstallation sends a PairInstallation message describing our own
// installation (name, ID and device type) through the one-to-one chat keyed by
// our own public key, creating that chat as inactive when it does not exist.
// It fails when our installation is unknown or has no metadata. The response
// carries the chat that was used.
func (m *Messenger) SendPairInstallation(ctx context.Context) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	installation, found := m.allInstallations[m.installationID]
	if !found {
		return nil, errors.New("no installation found")
	}
	metadata := installation.InstallationMetadata
	if metadata == nil {
		return nil, errors.New("no installation metadata")
	}

	ownChatID := contactIDFromPublicKey(&m.identity.PublicKey)
	chat, found := m.allChats[ownChatID]
	if !found {
		chat = OneToOneFromPublicKey(&m.identity.PublicKey, m.getTimesource())
		// We don't want to show the chat to the user
		chat.Active = false
	}
	m.allChats[chat.ID] = chat

	clock, _ := chat.NextClockAndTimestamp(m.getTimesource())
	payload, err := proto.Marshal(&protobuf.PairInstallation{
		Clock:          clock,
		Name:           metadata.Name,
		InstallationId: installation.ID,
		DeviceType:     metadata.DeviceType,
	})
	if err != nil {
		return nil, err
	}

	if _, err = m.dispatchPairInstallationMessage(ctx, &RawMessage{
		LocalChatID:         ownChatID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_PAIR_INSTALLATION,
		ResendAutomatically: true,
	}); err != nil {
		return nil, err
	}

	chat.LastClockValue = clock
	if err = m.saveChat(chat); err != nil {
		return nil, err
	}
	return &MessengerResponse{Chats: []*Chat{chat}}, nil
}
// syncPublicChat announces publicChat to paired devices by dispatching a
// SyncInstallationPublicChat message through the one-to-one chat keyed by our
// own public key (created as inactive when missing). It is a no-op when there
// are no paired devices.
func (m *Messenger) syncPublicChat(ctx context.Context, publicChat *Chat) error {
	if !m.hasPairedDevices() {
		return nil
	}

	ownChatID := contactIDFromPublicKey(&m.identity.PublicKey)
	chat, found := m.allChats[ownChatID]
	if !found {
		chat = OneToOneFromPublicKey(&m.identity.PublicKey, m.getTimesource())
		// We don't want to show the chat to the user
		chat.Active = false
	}
	m.allChats[chat.ID] = chat

	clock, _ := chat.NextClockAndTimestamp(m.getTimesource())
	payload, err := proto.Marshal(&protobuf.SyncInstallationPublicChat{
		Clock: clock,
		Id:    publicChat.ID,
	})
	if err != nil {
		return err
	}

	if _, err = m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         ownChatID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_SYNC_INSTALLATION_PUBLIC_CHAT,
		ResendAutomatically: true,
	}); err != nil {
		return err
	}

	chat.LastClockValue = clock
	return m.saveChat(chat)
}
// syncContact shares contact (ID, name and photo) with paired devices by
// dispatching a SyncInstallationContact message through the one-to-one chat
// keyed by our own public key (created as inactive when missing). It is a
// no-op when there are no paired devices.
func (m *Messenger) syncContact(ctx context.Context, contact *Contact) error {
	if !m.hasPairedDevices() {
		return nil
	}

	ownChatID := contactIDFromPublicKey(&m.identity.PublicKey)
	chat, found := m.allChats[ownChatID]
	if !found {
		chat = OneToOneFromPublicKey(&m.identity.PublicKey, m.getTimesource())
		// We don't want to show the chat to the user
		chat.Active = false
	}
	m.allChats[chat.ID] = chat

	clock, _ := chat.NextClockAndTimestamp(m.getTimesource())
	payload, err := proto.Marshal(&protobuf.SyncInstallationContact{
		Clock:        clock,
		Id:           contact.ID,
		EnsName:      contact.Name,
		ProfileImage: contact.Photo,
	})
	if err != nil {
		return err
	}

	if _, err = m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         ownChatID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_SYNC_INSTALLATION_CONTACT,
		ResendAutomatically: true,
	}); err != nil {
		return err
	}

	chat.LastClockValue = clock
	return m.saveChat(chat)
}
// RetrieveAll fetches the raw messages of every filter from the transport,
// processes them and returns the resulting MessengerResponse for the client.
func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
	rawByFilter, retrieveErr := m.transport.RetrieveRawAll()
	if retrieveErr != nil {
		return nil, retrieveErr
	}

	return m.handleRetrievedMessages(rawByFilter)
}
// CurrentMessageState describes the single received message that is currently
// being processed. handleRetrievedMessages builds a fresh value for every
// message before handing it to the handlers.
type CurrentMessageState struct {
	// Message is the protobuf message received
	Message protobuf.ChatMessage
	// MessageID is the ID of the message
	MessageID string
	// WhisperTimestamp is the whisper timestamp of the message
	// (handleRetrievedMessages fills it with the transport timestamp
	// multiplied by 1000).
	WhisperTimestamp uint64
	// Contact is the contact associated with the author of the message
	Contact *Contact
	// PublicKey is the public key of the author of the message
	PublicKey *ecdsa.PublicKey
}
// ReceivedMessageState is the working state shared by the message handlers
// while a batch of received messages is processed: the in-memory chats,
// contacts and installations, the sets of IDs modified so far, and the
// response that is being assembled for the client.
type ReceivedMessageState struct {
	// State of the message currently being processed
	CurrentMessageState *CurrentMessageState
	// AllChats in memory
	AllChats map[string]*Chat
	// Set of IDs of the chats modified
	ModifiedChats map[string]bool
	// All contacts in memory
	AllContacts map[string]*Contact
	// Set of IDs of the contacts modified
	ModifiedContacts map[string]bool
	// All installations in memory
	AllInstallations map[string]*multidevice.Installation
	// Set of IDs of the installations modified
	ModifiedInstallations map[string]bool
	// Map of existing messages
	ExistingMessagesMap map[string]bool
	// Response to the client
	Response *MessengerResponse
	// Timesource is a time source for clock values/timestamps.
	Timesource TimeSource
}
// handleRetrievedMessages decodes the raw transport messages grouped by
// filter, routes every decoded message to the handler matching its parsed
// type, and persists the chats, messages and contacts touched along the way.
//
// Transport messages that fail to decode, messages from blocked contacts,
// messages that already exist and messages whose handler fails are skipped
// (with a log entry where the code emits one); none of them aborts the batch.
// Errors while persisting chats, messages, contacts or installation metadata
// abort processing and are returned.
//
// The returned response contains the modified chats, the modified contacts
// that have custom fields, the modified installations and the handled
// messages as read back from the database.
func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filter][]*types.Message) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	messageState := &ReceivedMessageState{
		AllChats:              m.allChats,
		ModifiedChats:         make(map[string]bool),
		AllContacts:           m.allContacts,
		ModifiedContacts:      make(map[string]bool),
		AllInstallations:      m.allInstallations,
		ModifiedInstallations: m.modifiedInstallations,
		ExistingMessagesMap:   make(map[string]bool),
		Response:              &MessengerResponse{},
		Timesource:            m.getTimesource(),
	}

	logger := m.logger.With(zap.String("site", "RetrieveAll"))

	for _, messages := range chatWithMessages {
		for _, shhMessage := range messages {
			// TODO: fix this to use an exported method.
			statusMessages, err := m.processor.handleMessages(shhMessage, true)
			if err != nil {
				logger.Info("failed to decode messages", zap.Error(err))
				continue
			}

			logger.Debug("processing messages further", zap.Int("count", len(statusMessages)))

			for _, msg := range statusMessages {
				publicKey := msg.SigPubKey()

				// Check for messages from blocked users
				senderID := contactIDFromPublicKey(publicKey)
				contact, known := messageState.AllContacts[senderID]
				if known && contact.IsBlocked() {
					continue
				}

				// Don't process duplicates
				messageID := types.EncodeHex(msg.ID)
				exists, err := m.handler.messageExists(messageID, messageState.ExistingMessagesMap)
				if err != nil {
					// Best effort: the failure is logged and the message is
					// processed according to whatever `exists` reports.
					logger.Warn("failed to check message exists", zap.Error(err))
				}
				if exists {
					continue
				}

				// First message from this sender: build a contact so that the
				// name/image do not have to be recalculated later.
				if !known {
					contact, err = buildContact(publicKey)
					if err != nil {
						logger.Info("failed to build contact", zap.Error(err))
						continue
					}
					messageState.AllContacts[senderID] = contact
					messageState.ModifiedContacts[contact.ID] = true
				}

				messageState.CurrentMessageState = &CurrentMessageState{
					MessageID:        messageID,
					WhisperTimestamp: uint64(msg.TransportMessage.Timestamp) * 1000,
					Contact:          contact,
					PublicKey:        publicKey,
				}

				if msg.ParsedMessage == nil {
					continue
				}

				logger.Debug("Handling parsed message")
				switch parsed := msg.ParsedMessage.(type) {
				case protobuf.MembershipUpdateMessage:
					logger.Debug("Handling MembershipUpdateMessage")
					err = m.handler.HandleMembershipUpdate(messageState, messageState.AllChats[parsed.ChatId], parsed, m.systemMessagesTranslations)
					if err != nil {
						logger.Warn("failed to handle MembershipUpdate", zap.Error(err))
						continue
					}

				case protobuf.ChatMessage:
					logger.Debug("Handling ChatMessage")
					messageState.CurrentMessageState.Message = parsed
					err = m.handler.HandleChatMessage(messageState)
					if err != nil {
						logger.Warn("failed to handle ChatMessage", zap.Error(err))
						continue
					}

				case protobuf.PairInstallation:
					// Installation messages are only accepted from our own key.
					if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
						logger.Warn("not coming from us, ignoring")
						continue
					}
					logger.Debug("Handling PairInstallation", zap.Any("message", parsed))
					err = m.handler.HandlePairInstallation(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle PairInstallation", zap.Error(err))
						continue
					}

				case protobuf.SyncInstallationContact:
					if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
						logger.Warn("not coming from us, ignoring")
						continue
					}
					logger.Debug("Handling SyncInstallationContact", zap.Any("message", parsed))
					err = m.handler.HandleSyncInstallationContact(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle SyncInstallationContact", zap.Error(err))
						continue
					}

				case protobuf.SyncInstallationPublicChat:
					if !isPubKeyEqual(messageState.CurrentMessageState.PublicKey, &m.identity.PublicKey) {
						logger.Warn("not coming from us, ignoring")
						continue
					}
					logger.Debug("Handling SyncInstallationPublicChat", zap.Any("message", parsed))
					err = m.handler.HandleSyncInstallationPublicChat(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle SyncInstallationPublicChat", zap.Error(err))
						continue
					}

				case protobuf.RequestAddressForTransaction:
					logger.Debug("Handling RequestAddressForTransaction", zap.Any("message", parsed))
					err = m.handler.HandleRequestAddressForTransaction(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle RequestAddressForTransaction", zap.Error(err))
						continue
					}

				case protobuf.SendTransaction:
					logger.Debug("Handling SendTransaction", zap.Any("message", parsed))
					err = m.handler.HandleSendTransaction(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle SendTransaction", zap.Error(err))
						continue
					}

				case protobuf.AcceptRequestAddressForTransaction:
					logger.Debug("Handling AcceptRequestAddressForTransaction")
					err = m.handler.HandleAcceptRequestAddressForTransaction(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle AcceptRequestAddressForTransaction", zap.Error(err))
						continue
					}

				case protobuf.DeclineRequestAddressForTransaction:
					logger.Debug("Handling DeclineRequestAddressForTransaction")
					err = m.handler.HandleDeclineRequestAddressForTransaction(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle DeclineRequestAddressForTransaction", zap.Error(err))
						continue
					}

				case protobuf.DeclineRequestTransaction:
					logger.Debug("Handling DeclineRequestTransaction")
					err = m.handler.HandleDeclineRequestTransaction(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle DeclineRequestTransaction", zap.Error(err))
						continue
					}

				case protobuf.RequestTransaction:
					logger.Debug("Handling RequestTransaction")
					err = m.handler.HandleRequestTransaction(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle RequestTransaction", zap.Error(err))
						continue
					}

				case protobuf.ContactUpdate:
					logger.Debug("Handling ContactUpdate")
					err = m.handler.HandleContactUpdate(messageState, parsed)
					if err != nil {
						logger.Warn("failed to handle ContactUpdate", zap.Error(err))
						continue
					}

				default:
					logger.Debug("message not handled")
				}
			}
		}
	}

	for id := range messageState.ModifiedChats {
		messageState.Response.Chats = append(messageState.Response.Chats, messageState.AllChats[id])
	}

	var contactsToSave []*Contact
	for id := range messageState.ModifiedContacts {
		contact := messageState.AllContacts[id]
		if contact != nil {
			// We save all contacts so we can pull back name/image,
			// but we only send to client those
			// that have some custom fields
			contactsToSave = append(contactsToSave, contact)
			if contact.HasCustomFields() {
				messageState.Response.Contacts = append(messageState.Response.Contacts, contact)
			}
		}
	}

	for id := range messageState.ModifiedInstallations {
		installation := messageState.AllInstallations[id]
		// Same guard as for contacts: an ID without a backing installation
		// has nothing to report or persist and must not be dereferenced.
		if installation == nil {
			continue
		}
		messageState.Response.Installations = append(messageState.Response.Installations, installation)

		if installation.InstallationMetadata != nil {
			err := m.setInstallationMetadata(id, installation.InstallationMetadata)
			if err != nil {
				return nil, err
			}
		}
	}

	var err error
	if len(messageState.Response.Chats) > 0 {
		err = m.saveChats(messageState.Response.Chats)
		if err != nil {
			return nil, err
		}
	}
	if len(messageState.Response.Messages) > 0 {
		err = m.SaveMessages(messageState.Response.Messages)
		if err != nil {
			return nil, err
		}
	}
	if len(contactsToSave) > 0 {
		err = m.persistence.SaveContacts(contactsToSave)
		if err != nil {
			return nil, err
		}
	}

	// Read the messages back so the response reflects what is stored.
	messagesWithResponses, err := m.pullMessagesAndResponsesFromDB(messageState.Response.Messages)
	if err != nil {
		return nil, err
	}
	messageState.Response.Messages = messagesWithResponses

	// Reset installations
	m.modifiedInstallations = make(map[string]bool)

	return messageState.Response, nil
}
// RequestHistoricMessages passes its arguments unchanged to the transport's
// SendMessagesRequest and returns whatever it returns.
// NOTE(review): from/to presumably bound the requested time window and cursor
// continues a previous request — confirm against the transport.
func (m *Messenger) RequestHistoricMessages(
	ctx context.Context,
	peer []byte, // should be removed after mailserver logic is ported
	from, to uint32,
	cursor []byte,
) ([]byte, error) {
	return m.transport.SendMessagesRequest(ctx, peer, from, to, cursor)
}
// LoadFilters delegates to the transport's LoadFilters with the given filters
// and returns its result unchanged.
func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
	return m.transport.LoadFilters(filters)
}
// RemoveFilters delegates to the transport's RemoveFilters with the given
// filters and returns its error unchanged.
func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
	return m.transport.RemoveFilters(filters)
}
// ConfirmMessagesProcessed reports every ID in messageIDs to the encryptor as
// processed, in order. It stops at, and returns, the first error.
func (m *Messenger) ConfirmMessagesProcessed(messageIDs [][]byte) error {
	for i := range messageIDs {
		err := m.encryptor.ConfirmMessageProcessed(messageIDs[i])
		if err != nil {
			return err
		}
	}
	return nil
}
// MessageByID returns the result of the persistence layer's MessageByID
// lookup for id. It does not take the messenger mutex.
func (m *Messenger) MessageByID(id string) (*Message, error) {
	return m.persistence.MessageByID(id)
}
// MessagesExist returns the result of the persistence layer's MessagesExist
// check for ids.
// NOTE(review): the map is presumably keyed by message ID — confirm against
// the persistence layer.
func (m *Messenger) MessagesExist(ids []string) (map[string]bool, error) {
	return m.persistence.MessagesExist(ids)
}
// MessageByChatID returns the result of the persistence layer's
// MessageByChatID query for chatID, cursor and limit.
// NOTE(review): the returned string is presumably the cursor for the next
// page — confirm against the persistence layer.
func (m *Messenger) MessageByChatID(chatID, cursor string, limit int) ([]*Message, string, error) {
	return m.persistence.MessageByChatID(chatID, cursor, limit)
}
// SaveMessages stores messages through the persistence layer's
// SaveMessagesLegacy and returns its error unchanged.
func (m *Messenger) SaveMessages(messages []*Message) error {
	return m.persistence.SaveMessagesLegacy(messages)
}
// DeleteMessage asks the persistence layer to delete the message with the
// given id and returns its error unchanged.
func (m *Messenger) DeleteMessage(id string) error {
	return m.persistence.DeleteMessage(id)
}
// DeleteMessagesByChatID asks the persistence layer to delete the messages of
// the chat with the given id and returns its error unchanged.
func (m *Messenger) DeleteMessagesByChatID(id string) error {
	return m.persistence.DeleteMessagesByChatID(id)
}
// MarkMessagesSeen marks messages with `ids` as seen in the chat `chatID`.
// It returns the number of affected messages or error. If there is an error,
// the number of affected messages is always zero.
//
// After marking, the in-memory chat is replaced by the copy reloaded from the
// database.
func (m *Messenger) MarkMessagesSeen(chatID string, ids []string) (uint64, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	count, err := m.persistence.MarkMessagesSeen(chatID, ids)
	if err != nil {
		return 0, err
	}

	chat, err := m.persistence.Chat(chatID)
	if err != nil {
		return 0, err
	}
	// Never cache a nil chat: the rest of the messenger treats any entry
	// present in allChats as a usable chat and would dereference it.
	// NOTE(review): this covers the persistence layer reporting a missing
	// chat as (nil, nil) — confirm that it can.
	if chat != nil {
		m.allChats[chatID] = chat
	}
	return count, nil
}
// MarkAllRead marks every message of the chat chatID as read in the database
// and resets the unviewed message counter of the in-memory chat.
// It fails when the chat is not known to the messenger.
func (m *Messenger) MarkAllRead(chatID string) error {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	chat, found := m.allChats[chatID]
	if !found {
		return errors.New("chat not found")
	}

	if err := m.persistence.MarkAllRead(chatID); err != nil {
		return err
	}

	chat.UnviewedMessagesCount = 0
	m.allChats[chat.ID] = chat
	return nil
}
// UpdateMessageOutgoingStatus asks the persistence layer to set the outgoing
// status of the message with the given id to newOutgoingStatus and returns its
// error unchanged.
func (m *Messenger) UpdateMessageOutgoingStatus(id, newOutgoingStatus string) error {
	return m.persistence.UpdateMessageOutgoingStatus(id, newOutgoingStatus)
}
// Identicon returns an identicon based on the input string.
// The result is whatever identicon.GenerateBase64 produces for id.
func Identicon(id string) (string, error) {
	return identicon.GenerateBase64(id)
}
// VerifyENSNames checks, for every contact that shouldENSBeVerified selects,
// that the registered ENS name matches the contact's public key.
//
// Each checked contact gets its verification timestamp updated; on success
// the verified flag is stored, on a resolution error the retry counter is
// incremented. All checked contacts are saved and returned in the response.
// A result that refers to an unknown contact aborts with an error.
func (m *Messenger) VerifyENSNames(ctx context.Context, rpcEndpoint, contractAddress string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.logger.Debug("verifying ENS Names", zap.String("endpoint", rpcEndpoint))
	verifier := m.node.NewENSVerifier(m.logger)

	// Now in seconds
	nowSeconds := m.getTimesource().GetCurrentTime() / 1000

	var pending []enstypes.ENSDetails
	for _, contact := range m.allContacts {
		if !shouldENSBeVerified(contact, nowSeconds) {
			continue
		}
		pending = append(pending, enstypes.ENSDetails{
			// Drop the two-character prefix of the contact ID.
			PublicKeyString: contact.ID[2:],
			Name:            contact.Name,
		})
	}

	results, err := verifier.CheckBatch(pending, rpcEndpoint, contractAddress)
	if err != nil {
		return nil, err
	}

	response := &MessengerResponse{}
	for _, details := range results {
		contact, known := m.allContacts["0x"+details.PublicKeyString]
		if !known {
			return nil, errors.New("contact must be existing")
		}

		m.logger.Debug("verifying ENS Name", zap.Any("details", details), zap.Any("contact", contact))

		contact.ENSVerifiedAt = uint64(details.VerifiedAt)

		if details.Error != nil {
			m.logger.Warn("Failed to resolve ens name",
				zap.String("name", details.Name),
				zap.String("publicKey", details.PublicKeyString),
				zap.Error(details.Error),
			)
			contact.ENSVerificationRetries++
		} else {
			contact.ENSVerified = details.Verified
			m.allContacts[contact.ID] = contact
		}

		response.Contacts = append(response.Contacts, contact)
	}

	if len(response.Contacts) > 0 {
		if err = m.persistence.SaveContacts(response.Contacts); err != nil {
			return nil, err
		}
	}

	return response, nil
}
// GenerateAlias returns the generated name for a public key given as a
// hex-encoded string prefixed with 0x.
// The result is whatever alias.GenerateFromPublicKeyString produces for id.
func GenerateAlias(id string) (string, error) {
	return alias.GenerateFromPublicKeyString(id)
}
// RequestTransaction sends a RequestTransaction command (value, contract,
// address) to the one-to-one chat chatID and stores the matching
// transaction-command message locally.
//
// The chat must be known and be a one-to-one chat. The response carries the
// updated chat and the stored message, whose ID and command ID are the
// hex-encoded ID of the dispatched message.
func (m *Messenger) RequestTransaction(ctx context.Context, chatID, value, contract, address string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse

	// A valid added chat is required.
	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, errors.New("Chat not found")
	}
	if chat.ChatType != ChatTypeOneToOne {
		return nil, errors.New("Need to be a one-to-one chat")
	}

	message := &Message{}
	err := extendMessageFromChat(message, chat, &m.identity.PublicKey, m.transport)
	if err != nil {
		return nil, err
	}

	message.MessageType = protobuf.ChatMessage_ONE_TO_ONE
	message.ContentType = protobuf.ChatMessage_TRANSACTION_COMMAND
	message.Text = "Request transaction"

	request := &protobuf.RequestTransaction{
		Clock:    message.Clock,
		Address:  address,
		Value:    value,
		Contract: contract,
	}
	encodedMessage, err := proto.Marshal(request)
	if err != nil {
		return nil, err
	}

	id, err := m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_REQUEST_TRANSACTION,
		ResendAutomatically: true,
	})
	// Check the dispatch result before the returned ID is used for anything.
	if err != nil {
		return nil, err
	}

	messageID := types.EncodeHex(id)
	message.ID = messageID
	message.CommandParameters = &CommandParameters{
		ID:           messageID,
		Value:        value,
		Address:      address,
		Contract:     contract,
		CommandState: CommandStateRequestTransaction,
	}

	err = message.PrepareContent()
	if err != nil {
		return nil, err
	}

	err = chat.UpdateFromMessage(message, m.transport)
	if err != nil {
		return nil, err
	}

	err = m.persistence.SaveMessagesLegacy([]*Message{message})
	if err != nil {
		return nil, err
	}

	response.Chats = []*Chat{chat}
	response.Messages = []*Message{message}
	return &response, m.saveChat(chat)
}
// RequestAddressForTransaction sends a RequestAddressForTransaction command
// (value, contract) to the one-to-one chat chatID and stores the matching
// transaction-command message locally, remembering from in its command
// parameters.
//
// The chat must be known and be a one-to-one chat. The response carries the
// updated chat and the stored message, whose ID and command ID are the
// hex-encoded ID of the dispatched message.
func (m *Messenger) RequestAddressForTransaction(ctx context.Context, chatID, from, value, contract string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse

	// A valid added chat is required.
	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, errors.New("Chat not found")
	}
	if chat.ChatType != ChatTypeOneToOne {
		return nil, errors.New("Need to be a one-to-one chat")
	}

	message := &Message{}
	err := extendMessageFromChat(message, chat, &m.identity.PublicKey, m.transport)
	if err != nil {
		return nil, err
	}

	message.MessageType = protobuf.ChatMessage_ONE_TO_ONE
	message.ContentType = protobuf.ChatMessage_TRANSACTION_COMMAND
	message.Text = "Request address for transaction"

	request := &protobuf.RequestAddressForTransaction{
		Clock:    message.Clock,
		Value:    value,
		Contract: contract,
	}
	encodedMessage, err := proto.Marshal(request)
	if err != nil {
		return nil, err
	}

	id, err := m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_REQUEST_ADDRESS_FOR_TRANSACTION,
		ResendAutomatically: true,
	})
	// Check the dispatch result before the returned ID is used for anything.
	if err != nil {
		return nil, err
	}

	messageID := types.EncodeHex(id)
	message.ID = messageID
	message.CommandParameters = &CommandParameters{
		ID:           messageID,
		From:         from,
		Value:        value,
		Contract:     contract,
		CommandState: CommandStateRequestAddressForTransaction,
	}

	err = message.PrepareContent()
	if err != nil {
		return nil, err
	}

	err = chat.UpdateFromMessage(message, m.transport)
	if err != nil {
		return nil, err
	}

	err = m.persistence.SaveMessagesLegacy([]*Message{message})
	if err != nil {
		return nil, err
	}

	response.Chats = []*Chat{chat}
	response.Messages = []*Message{message}
	return &response, m.saveChat(chat)
}
// AcceptRequestAddressForTransaction accepts the address request stored under
// messageID by answering with address.
//
// The stored message is reused as the reply: it gets a new clock, timestamps,
// text and ID, replaces the previous command message (which is hidden), and
// its command parameters are updated with the address and the accepted state.
// The message must exist, carry command parameters and belong to a known
// one-to-one chat. The response carries the updated chat and message.
func (m *Messenger) AcceptRequestAddressForTransaction(ctx context.Context, messageID, address string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse

	message, err := m.MessageByID(messageID)
	if err != nil {
		return nil, err
	}
	if message == nil {
		return nil, errors.New("message not found")
	}
	// The command parameters are updated further down; reject a message
	// without them up front, before anything is hidden or sent, instead of
	// dereferencing a nil pointer afterwards.
	if message.CommandParameters == nil {
		return nil, errors.New("message has no command parameters")
	}

	chatID := message.LocalChatID

	// A valid added chat is required.
	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, errors.New("Chat not found")
	}
	if chat.ChatType != ChatTypeOneToOne {
		return nil, errors.New("Need to be a one-to-one chat")
	}

	clock, timestamp := chat.NextClockAndTimestamp(m.transport)
	message.Clock = clock
	message.WhisperTimestamp = timestamp
	message.Timestamp = timestamp
	message.Text = "Request address for transaction accepted"
	message.OutgoingStatus = OutgoingStatusSending

	// Hide previous message
	previousMessage, err := m.persistence.MessageByCommandID(chatID, messageID)
	if err != nil {
		return nil, err
	}
	if previousMessage == nil {
		return nil, errors.New("No previous message found")
	}
	err = m.persistence.HideMessage(previousMessage.ID)
	if err != nil {
		return nil, err
	}
	message.Replace = previousMessage.ID

	request := &protobuf.AcceptRequestAddressForTransaction{
		Clock:   message.Clock,
		Id:      messageID,
		Address: address,
	}
	encodedMessage, err := proto.Marshal(request)
	if err != nil {
		return nil, err
	}

	newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_ACCEPT_REQUEST_ADDRESS_FOR_TRANSACTION,
		ResendAutomatically: true,
	})
	if err != nil {
		return nil, err
	}

	message.ID = types.EncodeHex(newMessageID)
	message.CommandParameters.Address = address
	message.CommandParameters.CommandState = CommandStateRequestAddressForTransactionAccepted

	err = message.PrepareContent()
	if err != nil {
		return nil, err
	}

	err = chat.UpdateFromMessage(message, m.transport)
	if err != nil {
		return nil, err
	}

	err = m.persistence.SaveMessagesLegacy([]*Message{message})
	if err != nil {
		return nil, err
	}

	response.Chats = []*Chat{chat}
	response.Messages = []*Message{message}
	return &response, m.saveChat(chat)
}
// DeclineRequestTransaction declines the transaction request stored under
// messageID.
//
// The stored message is reused as the reply: it gets a new clock, timestamps,
// text and ID, replaces the original message (which is hidden), and its
// command state is set to declined. The message must exist, carry command
// parameters and belong to a known one-to-one chat. The response carries the
// updated chat and message.
func (m *Messenger) DeclineRequestTransaction(ctx context.Context, messageID string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse

	message, err := m.MessageByID(messageID)
	if err != nil {
		return nil, err
	}
	if message == nil {
		return nil, errors.New("message not found")
	}
	// The command state is updated further down; reject a message without
	// command parameters up front, before anything is hidden or sent, instead
	// of dereferencing a nil pointer afterwards.
	if message.CommandParameters == nil {
		return nil, errors.New("message has no command parameters")
	}

	chatID := message.LocalChatID

	// A valid added chat is required.
	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, errors.New("Chat not found")
	}
	if chat.ChatType != ChatTypeOneToOne {
		return nil, errors.New("Need to be a one-to-one chat")
	}

	clock, timestamp := chat.NextClockAndTimestamp(m.transport)
	message.Clock = clock
	message.WhisperTimestamp = timestamp
	message.Timestamp = timestamp
	message.Text = "Transaction request declined"
	message.OutgoingStatus = OutgoingStatusSending
	message.Replace = messageID

	// The original request is superseded by the reply built here.
	err = m.persistence.HideMessage(messageID)
	if err != nil {
		return nil, err
	}

	request := &protobuf.DeclineRequestTransaction{
		Clock: message.Clock,
		Id:    messageID,
	}
	encodedMessage, err := proto.Marshal(request)
	if err != nil {
		return nil, err
	}

	newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_DECLINE_REQUEST_TRANSACTION,
		ResendAutomatically: true,
	})
	if err != nil {
		return nil, err
	}

	message.ID = types.EncodeHex(newMessageID)
	message.CommandParameters.CommandState = CommandStateRequestTransactionDeclined

	err = message.PrepareContent()
	if err != nil {
		return nil, err
	}

	err = chat.UpdateFromMessage(message, m.transport)
	if err != nil {
		return nil, err
	}

	err = m.persistence.SaveMessagesLegacy([]*Message{message})
	if err != nil {
		return nil, err
	}

	response.Chats = []*Chat{chat}
	response.Messages = []*Message{message}
	return &response, m.saveChat(chat)
}
// DeclineRequestAddressForTransaction declines the address request stored
// under messageID.
//
// The stored message is reused as the reply: it gets a new clock, timestamps,
// text and ID, replaces the original message (which is hidden), and its
// command state is set to declined. The message must exist, carry command
// parameters and belong to a known one-to-one chat. The response carries the
// updated chat and message.
func (m *Messenger) DeclineRequestAddressForTransaction(ctx context.Context, messageID string) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse

	message, err := m.MessageByID(messageID)
	if err != nil {
		return nil, err
	}
	if message == nil {
		return nil, errors.New("message not found")
	}
	// The command state is updated further down; reject a message without
	// command parameters up front, before anything is hidden or sent, instead
	// of dereferencing a nil pointer afterwards.
	if message.CommandParameters == nil {
		return nil, errors.New("message has no command parameters")
	}

	chatID := message.LocalChatID

	// A valid added chat is required.
	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, errors.New("Chat not found")
	}
	if chat.ChatType != ChatTypeOneToOne {
		return nil, errors.New("Need to be a one-to-one chat")
	}

	clock, timestamp := chat.NextClockAndTimestamp(m.transport)
	message.Clock = clock
	message.WhisperTimestamp = timestamp
	message.Timestamp = timestamp
	message.Text = "Request address for transaction declined"
	message.OutgoingStatus = OutgoingStatusSending
	message.Replace = messageID

	// The original request is superseded by the reply built here.
	err = m.persistence.HideMessage(messageID)
	if err != nil {
		return nil, err
	}

	request := &protobuf.DeclineRequestAddressForTransaction{
		Clock: message.Clock,
		Id:    messageID,
	}
	encodedMessage, err := proto.Marshal(request)
	if err != nil {
		return nil, err
	}

	newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_DECLINE_REQUEST_ADDRESS_FOR_TRANSACTION,
		ResendAutomatically: true,
	})
	if err != nil {
		return nil, err
	}

	message.ID = types.EncodeHex(newMessageID)
	message.CommandParameters.CommandState = CommandStateRequestAddressForTransactionDeclined

	err = message.PrepareContent()
	if err != nil {
		return nil, err
	}

	err = chat.UpdateFromMessage(message, m.transport)
	if err != nil {
		return nil, err
	}

	err = m.persistence.SaveMessagesLegacy([]*Message{message})
	if err != nil {
		return nil, err
	}

	response.Chats = []*Chat{chat}
	response.Messages = []*Message{message}
	return &response, m.saveChat(chat)
}
// AcceptRequestTransaction accepts the request-transaction command stored
// under messageID by announcing the transaction that fulfils it.
//
// The stored message is looked up, re-stamped with a fresh clock/timestamp
// from its one-to-one chat and marked as "Transaction sent". Any earlier
// message found via MessageByCommandID for the same command is hidden and
// recorded as replaced, the original message is hidden, and a SEND_TRANSACTION
// payload carrying transactionHash and signature is dispatched to the chat.
// The updated message (now carrying the ID of the dispatched message) is then
// persisted.
//
// It returns a response containing the affected chat and the updated message.
// An error is returned when the message cannot be found, is not a command
// message, when its chat is unknown or not one-to-one, or when any
// persistence/dispatch step fails.
func (m *Messenger) AcceptRequestTransaction(ctx context.Context, transactionHash, messageID string, signature []byte) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	var response MessengerResponse

	message, err := m.MessageByID(messageID)
	if err != nil {
		return nil, err
	}
	if message == nil {
		return nil, errors.New("message not found")
	}

	chatID := message.LocalChatID

	// A valid added chat is required.
	chat, ok := m.allChats[chatID]
	if !ok {
		return nil, errors.New("Chat not found")
	}
	if chat.ChatType != ChatTypeOneToOne {
		return nil, errors.New("Need to be a one-to-one chat")
	}

	// CommandParameters is dereferenced after the transaction has been
	// dispatched. Reject non-command messages here, before anything is hidden
	// or sent, instead of panicking halfway through.
	if message.CommandParameters == nil {
		return nil, errors.New("message is not a command")
	}

	clock, timestamp := chat.NextClockAndTimestamp(m.transport)
	message.Clock = clock
	message.WhisperTimestamp = timestamp
	message.Timestamp = timestamp
	message.Text = transactionSentTxt
	message.OutgoingStatus = OutgoingStatusSending

	// Hide previous message
	previousMessage, err := m.persistence.MessageByCommandID(chatID, messageID)
	// A missing previous message is not an error: there is simply nothing to hide.
	if err != nil && err != errRecordNotFound {
		return nil, err
	}
	if previousMessage != nil {
		err = m.persistence.HideMessage(previousMessage.ID)
		if err != nil {
			return nil, err
		}
		message.Replace = previousMessage.ID
	}

	err = m.persistence.HideMessage(messageID)
	if err != nil {
		return nil, err
	}

	request := &protobuf.SendTransaction{
		Clock:           message.Clock,
		Id:              messageID,
		TransactionHash: transactionHash,
		Signature:       signature,
	}
	encodedMessage, err := proto.Marshal(request)
	if err != nil {
		return nil, err
	}

	newMessageID, err := m.dispatchMessage(ctx, &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             encodedMessage,
		MessageType:         protobuf.ApplicationMetadataMessage_SEND_TRANSACTION,
		ResendAutomatically: true,
	})
	if err != nil {
		return nil, err
	}

	// The local message takes the ID of the message that was just dispatched.
	message.ID = types.EncodeHex(newMessageID)
	message.CommandParameters.TransactionHash = transactionHash
	message.CommandParameters.Signature = signature
	message.CommandParameters.CommandState = CommandStateTransactionSent

	err = message.PrepareContent()
	if err != nil {
		return nil, err
	}

	err = chat.UpdateFromMessage(message, m.transport)
	if err != nil {
		return nil, err
	}

	err = m.persistence.SaveMessagesLegacy([]*Message{message})
	if err != nil {
		return nil, err
	}

	response.Chats = []*Chat{chat}
	response.Messages = []*Message{message}
	return &response, m.saveChat(chat)
}
// SendTransaction announces an already-submitted transaction in the one-to-one
// chat identified by chatID.
//
// A new transaction-command message is built for the chat, a SEND_TRANSACTION
// payload carrying transactionHash and signature is dispatched, and the
// resulting message (with value and contract recorded in its command
// parameters) is persisted together with the chat.
//
// It returns a response holding the chat and the new message. An error is
// returned when the chat is unknown or not one-to-one, or when building,
// dispatching or persisting the message fails.
func (m *Messenger) SendTransaction(ctx context.Context, chatID, value, contract, transactionHash string, signature []byte) (*MessengerResponse, error) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Only chats that have already been added can be used.
	chat, found := m.allChats[chatID]
	switch {
	case !found:
		return nil, errors.New("Chat not found")
	case chat.ChatType != ChatTypeOneToOne:
		return nil, errors.New("Need to be a one-to-one chat")
	}

	msg := &Message{}
	if err := extendMessageFromChat(msg, chat, &m.identity.PublicKey, m.transport); err != nil {
		return nil, err
	}
	msg.MessageType = protobuf.ChatMessage_ONE_TO_ONE
	msg.ContentType = protobuf.ChatMessage_TRANSACTION_COMMAND
	msg.LocalChatID = chatID

	// Stamp the message with the chat's next clock value and timestamp.
	nextClock, now := chat.NextClockAndTimestamp(m.transport)
	msg.Clock = nextClock
	msg.WhisperTimestamp = now
	msg.Timestamp = now
	msg.Text = transactionSentTxt

	payload, err := proto.Marshal(&protobuf.SendTransaction{
		Clock:           msg.Clock,
		TransactionHash: transactionHash,
		Signature:       signature,
	})
	if err != nil {
		return nil, err
	}

	raw := &RawMessage{
		LocalChatID:         chat.ID,
		Payload:             payload,
		MessageType:         protobuf.ApplicationMetadataMessage_SEND_TRANSACTION,
		ResendAutomatically: true,
	}
	dispatchedID, err := m.dispatchMessage(ctx, raw)
	if err != nil {
		return nil, err
	}

	// The local message is stored under the ID of the dispatched message.
	msg.ID = types.EncodeHex(dispatchedID)
	msg.CommandParameters = &CommandParameters{
		TransactionHash: transactionHash,
		Value:           value,
		Contract:        contract,
		Signature:       signature,
		CommandState:    CommandStateTransactionSent,
	}

	if err = msg.PrepareContent(); err != nil {
		return nil, err
	}
	if err = chat.UpdateFromMessage(msg, m.transport); err != nil {
		return nil, err
	}
	if err = m.persistence.SaveMessagesLegacy([]*Message{msg}); err != nil {
		return nil, err
	}

	result := &MessengerResponse{
		Chats:    []*Chat{chat},
		Messages: []*Message{msg},
	}
	return result, m.saveChat(chat)
}
// ValidateTransactions runs a TransactionValidator over the given addresses
// and turns every validation result into a "Transaction received" message in
// the one-to-one chat with the transaction's sender.
//
// For each result the sender's chat is looked up (or created from the sender's
// public key when it is not yet known), the message attached to the result is
// reused or a fresh one is built, and its command parameters are filled from
// the validated transaction. When the command has an ID, the earlier message
// found for that command is hidden and recorded as replaced.
//
// It returns a response holding every produced message and every chat that
// was touched; the messages are saved before returning. When no
// verifyTransactionClient is configured the method does nothing and returns
// (nil, nil).
func (m *Messenger) ValidateTransactions(ctx context.Context, addresses []types.Address) (*MessengerResponse, error) {
	if m.verifyTransactionClient == nil {
		return nil, nil
	}
	m.mutex.Lock()
	defer m.mutex.Unlock()

	// Tracks the IDs of the chats touched below so each is reported once.
	modifiedChats := make(map[string]bool)

	logger := m.logger.With(zap.String("site", "ValidateTransactions"))
	logger.Debug("Validating transactions")
	txs, err := m.persistence.TransactionsToValidate()
	if err != nil {
		logger.Error("Error pulling", zap.Error(err))
		return nil, err
	}
	logger.Debug("Txs", zap.Int("count", len(txs)), zap.Any("txs", txs))

	var response MessengerResponse
	validator := NewTransactionValidator(addresses, m.persistence, m.verifyTransactionClient, m.logger)
	responses, err := validator.ValidateTransactions(ctx)
	if err != nil {
		logger.Error("Error validating", zap.Error(err))
		return nil, err
	}

	for _, validationResult := range responses {
		var message *Message
		chatID := contactIDFromPublicKey(validationResult.Transaction.From)
		chat, ok := m.allChats[chatID]
		if !ok {
			chat = OneToOneFromPublicKey(validationResult.Transaction.From, m.transport)
		}
		if validationResult.Message != nil {
			message = validationResult.Message
		} else {
			message = &Message{}
			err := extendMessageFromChat(message, chat, &m.identity.PublicKey, m.transport)
			if err != nil {
				return nil, err
			}
		}

		message.MessageType = protobuf.ChatMessage_ONE_TO_ONE
		message.ContentType = protobuf.ChatMessage_TRANSACTION_COMMAND
		message.LocalChatID = chatID
		message.OutgoingStatus = ""

		clock, timestamp := chat.NextClockAndTimestamp(m.transport)
		message.Clock = clock
		message.Timestamp = timestamp
		message.WhisperTimestamp = timestamp
		message.Text = "Transaction received"
		message.Seen = false
		message.ID = validationResult.Transaction.MessageID

		// Existing command parameters are kept as they are. When the message
		// came from validationResult.Message they are already that message's
		// parameters, so there is nothing to copy, and reading them through
		// validationResult.Message would panic whenever the message had been
		// built locally (validationResult.Message == nil).
		if message.CommandParameters == nil {
			message.CommandParameters = &CommandParameters{}
		}
		message.CommandParameters.Value = validationResult.Value
		message.CommandParameters.Contract = validationResult.Contract
		message.CommandParameters.Address = validationResult.Address
		message.CommandParameters.CommandState = CommandStateTransactionSent
		message.CommandParameters.TransactionHash = validationResult.Transaction.TransactionHash

		err = message.PrepareContent()
		if err != nil {
			return nil, err
		}

		err = chat.UpdateFromMessage(message, m.transport)
		if err != nil {
			return nil, err
		}

		if len(message.CommandParameters.ID) != 0 {
			// Hide previous message
			previousMessage, err := m.persistence.MessageByCommandID(chatID, message.CommandParameters.ID)
			// A missing previous message is not an error: nothing to hide.
			if err != nil && err != errRecordNotFound {
				return nil, err
			}
			if previousMessage != nil {
				err = m.persistence.HideMessage(previousMessage.ID)
				if err != nil {
					return nil, err
				}
				message.Replace = previousMessage.ID
			}
		}

		response.Messages = append(response.Messages, message)
		m.allChats[chat.ID] = chat
		modifiedChats[chat.ID] = true
	}

	for id := range modifiedChats {
		response.Chats = append(response.Chats, m.allChats[id])
	}

	if len(response.Messages) > 0 {
		err = m.SaveMessages(response.Messages)
		if err != nil {
			return nil, err
		}
	}
	return &response, nil
}
// pullMessagesAndResponsesFromDB reloads the given messages from persistence,
// together with every message they were sent in response to.
func (m *Messenger) pullMessagesAndResponsesFromDB(messages []*Message) ([]*Message, error) {
	// Gather each message's own ID followed, when it is a reply, by the ID of
	// the message it answers.
	var ids []string
	for i := range messages {
		current := messages[i]
		ids = append(ids, current.ID)
		if len(current.ResponseTo) == 0 {
			continue
		}
		ids = append(ids, current.ResponseTo)
	}

	// Fetching everything in one go lets the persistence layer assemble the
	// messages and their replies itself.
	return m.persistence.MessagesByIDs(ids)
}
// getTimesource returns the messenger's transport as the TimeSource to use.
func (m *Messenger) getTimesource() TimeSource {
	var source TimeSource = m.transport
	return source
}
// Timesource is the exported accessor for the messenger's TimeSource; it
// returns whatever getTimesource returns.
func (m *Messenger) Timesource() TimeSource {
	source := m.getTimesource()
	return source
}