package common import ( "bytes" "context" "crypto/ecdsa" "database/sql" "math" "sync" "time" "github.com/golang/protobuf/proto" "github.com/jinzhu/copier" "github.com/pkg/errors" datasyncnode "github.com/vacp2p/mvds/node" datasyncproto "github.com/vacp2p/mvds/protobuf" "go.uber.org/zap" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/protocol/datasync" datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer" "github.com/status-im/status-go/protocol/encryption" "github.com/status-im/status-go/protocol/encryption/sharedsecret" "github.com/status-im/status-go/protocol/protobuf" "github.com/status-im/status-go/protocol/transport" v1protocol "github.com/status-im/status-go/protocol/v1" ) // Whisper message properties. const ( whisperTTL = 15 whisperDefaultPoW = 0.002 // whisperLargeSizePoW is the PoWTarget for larger payload sizes whisperLargeSizePoW = 0.000002 // largeSizeInBytes is when should we be using a lower POW. // Roughly this is 50KB largeSizeInBytes = 50000 whisperPoWTime = 5 ) // RekeyCompatibility indicates whether we should be sending // keys in 1-to-1 messages as well as in the newer format var RekeyCompatibility = true // SentMessage reprent a message that has been passed to the transport layer type SentMessage struct { PublicKey *ecdsa.PublicKey Spec *encryption.ProtocolMessageSpec MessageIDs [][]byte } type MessageEventType uint32 const ( MessageScheduled = iota + 1 MessageSent ) type MessageEvent struct { Recipient *ecdsa.PublicKey Type MessageEventType SentMessage *SentMessage RawMessage *RawMessage } type MessageSender struct { identity *ecdsa.PrivateKey datasync *datasync.DataSync database *sql.DB protocol *encryption.Protocol transport *transport.Transport logger *zap.Logger persistence *RawMessagesPersistence datasyncEnabled bool // ephemeralKeys is a map that contains the ephemeral keys of the client, used // to decrypt messages ephemeralKeys map[string]*ecdsa.PrivateKey ephemeralKeysMutex sync.Mutex // messageEventsSubscriptions contains all the subscriptions for message events messageEventsSubscriptions []chan<- *MessageEvent featureFlags FeatureFlags // handleSharedSecrets is a callback that is called every time a new shared secret is negotiated handleSharedSecrets func([]*sharedsecret.Secret) error } func NewMessageSender( identity *ecdsa.PrivateKey, database *sql.DB, enc *encryption.Protocol, transport *transport.Transport, logger *zap.Logger, features FeatureFlags, ) (*MessageSender, error) { p := &MessageSender{ identity: identity, datasyncEnabled: features.Datasync, protocol: enc, database: database, persistence: NewRawMessagesPersistence(database), transport: transport, logger: logger, ephemeralKeys: make(map[string]*ecdsa.PrivateKey), featureFlags: features, } // Initializing DataSync is required to encrypt and send messages. // With DataSync enabled, messages are added to the DataSync // but actual encrypt and send calls are postponed. // sendDataSync is responsible for encrypting and sending postponed messages. if p.datasyncEnabled { err := p.StartDatasync() if err != nil { return nil, err } } return p, nil } func (s *MessageSender) Stop() { for _, c := range s.messageEventsSubscriptions { close(c) } s.messageEventsSubscriptions = nil s.StopDatasync() } func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) { s.handleSharedSecrets = handler } // SendPrivate takes encoded data, encrypts it and sends through the wire. func (s *MessageSender) SendPrivate( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, ) ([]byte, error) { s.logger.Debug( "sending a private message", zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))), zap.String("site", "SendPrivate"), ) // Currently we don't support sending through datasync and setting custom waku fields, // as the datasync interface is not rich enough to propagate that information, so we // would have to add some complexity to handle this. if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) { return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported") } // Set sender identity if not specified if rawMessage.Sender == nil { rawMessage.Sender = s.identity } return s.sendPrivate(ctx, recipient, rawMessage) } // SendCommunityMessage takes encoded data, encrypts it and sends through the wire // using the community topic and their key func (s *MessageSender) SendCommunityMessage( ctx context.Context, rawMessage RawMessage, ) ([]byte, error) { s.logger.Debug( "sending a community message", zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)), zap.String("site", "SendCommunityMessage"), ) rawMessage.Sender = s.identity return s.sendCommunity(ctx, &rawMessage) } // SendPubsubTopicKey sends the protected topic key for a community to a list of recipients func (s *MessageSender) SendPubsubTopicKey( ctx context.Context, rawMessage *RawMessage, ) ([]byte, error) { s.logger.Debug( "sending the protected topic key for a community", zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)), zap.String("site", "SendPubsubTopicKey"), ) rawMessage.Sender = s.identity messageID, err := s.getMessageID(rawMessage) if err != nil { return nil, err } rawMessage.ID = types.EncodeHex(messageID) // Notify before dispatching, otherwise the dispatch subscription might happen // earlier than the scheduled s.notifyOnScheduledMessage(nil, rawMessage) // Send to each recipients for _, recipient := range rawMessage.Recipients { _, err = s.sendPrivate(ctx, recipient, rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to send message") } } return messageID, nil } // SendGroup takes encoded data, encrypts it and sends through the wire, // always return the messageID func (s *MessageSender) SendGroup( ctx context.Context, recipients []*ecdsa.PublicKey, rawMessage RawMessage, ) ([]byte, error) { s.logger.Debug( "sending a private group message", zap.String("site", "SendGroup"), ) // Set sender if not specified if rawMessage.Sender == nil { rawMessage.Sender = s.identity } // Calculate messageID first and set on raw message wrappedMessage, err := s.wrapMessageV1(&rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) rawMessage.ID = types.EncodeHex(messageID) // We call it only once, and we nil the function after so it doesn't get called again if rawMessage.BeforeDispatch != nil { if err := rawMessage.BeforeDispatch(&rawMessage); err != nil { return nil, err } } rawMessage.BeforeDispatch = nil // Send to each recipients for _, recipient := range recipients { _, err = s.sendPrivate(ctx, recipient, &rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to send message") } } return messageID, nil } func (s *MessageSender) getMessageID(rawMessage *RawMessage) (types.HexBytes, error) { wrappedMessage, err := s.wrapMessageV1(rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) return messageID, nil } func ShouldCommunityMessageBeEncrypted(msgType protobuf.ApplicationMetadataMessage_Type) bool { return msgType == protobuf.ApplicationMetadataMessage_CHAT_MESSAGE || msgType == protobuf.ApplicationMetadataMessage_EDIT_MESSAGE || msgType == protobuf.ApplicationMetadataMessage_DELETE_MESSAGE || msgType == protobuf.ApplicationMetadataMessage_PIN_MESSAGE || msgType == protobuf.ApplicationMetadataMessage_EMOJI_REACTION } // sendCommunity sends a message that's to be sent in a community // If it's a chat message, it will go to the respective topic derived by the // chat id, if it's not a chat message, it will go to the community topic. func (s *MessageSender) sendCommunity( ctx context.Context, rawMessage *RawMessage, ) ([]byte, error) { s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(&rawMessage.Sender.PublicKey)))) // Set sender if rawMessage.Sender == nil { rawMessage.Sender = s.identity } messageID, err := s.getMessageID(rawMessage) if err != nil { return nil, err } rawMessage.ID = types.EncodeHex(messageID) messageIDs := [][]byte{messageID} if rawMessage.BeforeDispatch != nil { if err := rawMessage.BeforeDispatch(rawMessage); err != nil { return nil, err } } // Notify before dispatching, otherwise the dispatch subscription might happen // earlier than the scheduled s.notifyOnScheduledMessage(nil, rawMessage) var hashes [][]byte var newMessages []*types.NewMessage // Check if it's a key exchange message. In this case we send it // to all the recipients if rawMessage.CommunityKeyExMsgType != KeyExMsgNone { forceRekey := rawMessage.CommunityKeyExMsgType == KeyExMsgRekey // If rekeycompatibility is on, we always // want to execute below, otherwise we execute // only when we want to fill up old keys to a given user if RekeyCompatibility || !forceRekey { keyExMessageSpecs, err := s.protocol.GetKeyExMessageSpecs(rawMessage.HashRatchetGroupID, s.identity, rawMessage.Recipients, forceRekey) if err != nil { return nil, err } for i, spec := range keyExMessageSpecs { recipient := rawMessage.Recipients[i] _, _, err = s.sendMessageSpec(ctx, recipient, spec, messageIDs) if err != nil { return nil, err } } } if forceRekey { var ratchet *encryption.HashRatchetKeyCompatibility // We have just rekeyed, pull the latest if RekeyCompatibility { ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID) if err != nil { return nil, err } } // We send the message over the community topic messages, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, ratchet) if err != nil { return nil, err } for _, m := range messages { payload, err := proto.Marshal(m.Message) if err != nil { return nil, err } rawMessage.Payload = payload newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, PowTarget: calculatePoW(payload), PowTime: whisperPoWTime, } newMessage.Ephemeral = rawMessage.Ephemeral newMessage.PubsubTopic = rawMessage.PubsubTopic messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, payload) rawMessage.ID = types.EncodeHex(messageID) // notify before dispatching s.notifyOnScheduledMessage(nil, rawMessage) newMessages, err = s.segmentMessage(newMessage) if err != nil { return nil, err } for _, newMessage := range newMessages { _, err = s.transport.SendPublic(ctx, newMessage, types.EncodeHex(rawMessage.CommunityID)) if err != nil { return nil, err } } } } return nil, nil } wrappedMessage, err := s.wrapMessageV1(rawMessage) if err != nil { return nil, err } // If it's a chat message, we send it on the community chat topic if ShouldCommunityMessageBeEncrypted(rawMessage.MessageType) { messageSpec, err := s.protocol.BuildHashRatchetMessage(rawMessage.HashRatchetGroupID, wrappedMessage) if err != nil { return nil, err } payload, err := proto.Marshal(messageSpec.Message) if err != nil { return nil, errors.Wrap(err, "failed to marshal") } hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload) if err != nil { return nil, err } sentMessage := &SentMessage{ Spec: messageSpec, MessageIDs: messageIDs, } s.notifyOnSentMessage(sentMessage) } else { payload := wrappedMessage pubkey, err := crypto.DecompressPubkey(rawMessage.CommunityID) if err != nil { return nil, errors.Wrap(err, "failed to decompress pubkey") } hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs, rawMessage.PubsubTopic) if err != nil { s.logger.Error("failed to send a community message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } } for i, newMessage := range newMessages { s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i]))) s.transport.Track(messageIDs, hashes[i], newMessage) } return messageID, nil } // sendPrivate sends data to the recipient identifying with a given public key. func (s *MessageSender) sendPrivate( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, ) ([]byte, error) { s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) wrappedMessage, err := s.wrapMessageV1(rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) rawMessage.ID = types.EncodeHex(messageID) if rawMessage.BeforeDispatch != nil { if err := rawMessage.BeforeDispatch(rawMessage); err != nil { return nil, err } } // Notify before dispatching, otherwise the dispatch subscription might happen // earlier than the scheduled s.notifyOnScheduledMessage(recipient, rawMessage) if s.featureFlags.Datasync && rawMessage.ResendAutomatically { // No need to call transport tracking. // It is done in a data sync dispatch step. datasyncID, err := s.addToDataSync(recipient, wrappedMessage) if err != nil { return nil, errors.Wrap(err, "failed to send message with datasync") } // We don't need to receive confirmations from our own devices if !IsPubKeyEqual(recipient, &s.identity.PublicKey) { confirmation := &RawMessageConfirmation{ DataSyncID: datasyncID, MessageID: messageID, PublicKey: crypto.CompressPubkey(recipient), } err = s.persistence.InsertPendingConfirmation(confirmation) if err != nil { return nil, err } } } else if rawMessage.SkipEncryptionLayer { // When SkipProtocolLayer is set we don't pass the message to the encryption layer messageIDs := [][]byte{messageID} hashes, newMessages, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs) if err != nil { s.logger.Error("failed to send a private message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } for i, newMessage := range newMessages { s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i]))) s.transport.Track(messageIDs, hashes[i], newMessage) } } else { messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage) if err != nil { return nil, errors.Wrap(err, "failed to encrypt message") } // The shared secret needs to be handle before we send a message // otherwise the topic might not be set up before we receive a message if s.handleSharedSecrets != nil { err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret}) if err != nil { return nil, err } } messageIDs := [][]byte{messageID} hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) if err != nil { s.logger.Error("failed to send a private message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } for i, newMessage := range newMessages { s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i]))) s.transport.Track(messageIDs, hashes[i], newMessage) } } return messageID, nil } // sendPairInstallation sends data to the recipients, using DH func (s *MessageSender) SendPairInstallation( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage RawMessage, ) ([]byte, error) { s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) wrappedMessage, err := s.wrapMessageV1(&rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } messageSpec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrappedMessage) if err != nil { return nil, errors.Wrap(err, "failed to encrypt message") } messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage) messageIDs := [][]byte{messageID} hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) if err != nil { return nil, errors.Wrap(err, "failed to send a message spec") } for i, newMessage := range newMessages { s.transport.Track(messageIDs, hashes[i], newMessage) } return messageID, nil } func (s *MessageSender) encodeMembershipUpdate( message v1protocol.MembershipUpdateMessage, chatEntity ChatEntity, ) ([]byte, error) { if chatEntity != nil { chatEntityProtobuf := chatEntity.GetProtobuf() switch chatEntityProtobuf := chatEntityProtobuf.(type) { case *protobuf.ChatMessage: message.Message = chatEntityProtobuf case *protobuf.EmojiReaction: message.EmojiReaction = chatEntityProtobuf } } encodedMessage, err := v1protocol.EncodeMembershipUpdateMessage(message) if err != nil { return nil, errors.Wrap(err, "failed to encode membership update message") } return encodedMessage, nil } // EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire. // All the events in a group are encoded and added to the payload func (s *MessageSender) EncodeMembershipUpdate( group *v1protocol.Group, chatEntity ChatEntity, ) ([]byte, error) { message := v1protocol.MembershipUpdateMessage{ ChatID: group.ChatID(), Events: group.Events(), } return s.encodeMembershipUpdate(message, chatEntity) } // EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire. // Only the events relevant to the current group are encoded func (s *MessageSender) EncodeAbridgedMembershipUpdate( group *v1protocol.Group, chatEntity ChatEntity, ) ([]byte, error) { message := v1protocol.MembershipUpdateMessage{ ChatID: group.ChatID(), Events: group.AbridgedEvents(), } return s.encodeMembershipUpdate(message, chatEntity) } func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte) ([][]byte, []*types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: wrappedMessage, PowTarget: calculatePoW(wrappedMessage), PowTime: whisperPoWTime, PubsubTopic: rawMessage.PubsubTopic, } if rawMessage.BeforeDispatch != nil { if err := rawMessage.BeforeDispatch(rawMessage); err != nil { return nil, nil, err } } // notify before dispatching s.notifyOnScheduledMessage(nil, rawMessage) newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } hashes := make([][]byte, 0, len(newMessages)) for _, newMessage := range newMessages { hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID) if err != nil { return nil, nil, err } hashes = append(hashes, hash) } return hashes, newMessages, nil } // SendPublic takes encoded data, encrypts it and sends through the wire. func (s *MessageSender) SendPublic( ctx context.Context, chatName string, rawMessage RawMessage, ) ([]byte, error) { // Set sender if rawMessage.Sender == nil { rawMessage.Sender = s.identity } wrappedMessage, err := s.wrapMessageV1(&rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } var newMessage *types.NewMessage messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage) if err != nil { s.logger.Error("failed to send a public message", zap.Error(err)) return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer") } if !rawMessage.SkipEncryptionLayer { newMessage, err = MessageSpecToWhisper(messageSpec) if err != nil { return nil, err } } else { newMessage = &types.NewMessage{ TTL: whisperTTL, Payload: wrappedMessage, PowTarget: calculatePoW(wrappedMessage), PowTime: whisperPoWTime, } } newMessage.Ephemeral = rawMessage.Ephemeral newMessage.PubsubTopic = rawMessage.PubsubTopic messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) rawMessage.ID = types.EncodeHex(messageID) if rawMessage.BeforeDispatch != nil { if err := rawMessage.BeforeDispatch(&rawMessage); err != nil { return nil, err } } // notify before dispatching s.notifyOnScheduledMessage(nil, &rawMessage) newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, err } hashes := make([][]byte, 0, len(newMessages)) for _, newMessage := range newMessages { hash, err := s.transport.SendPublic(ctx, newMessage, chatName) if err != nil { return nil, err } hashes = append(hashes, hash) s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) } sentMessage := &SentMessage{ Spec: messageSpec, MessageIDs: [][]byte{messageID}, } s.notifyOnSentMessage(sentMessage) for i, newMessage := range newMessages { s.transport.Track([][]byte{messageID}, hashes[i], newMessage) } return messageID, nil } // unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success // returns cloned messages with replaced payloads func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataSync) ([]*v1protocol.StatusMessage, [][]byte, error) { var statusMessages []*v1protocol.StatusMessage payloads, acks, err := datasync.UnwrapPayloadsAndAcks( m.SigPubKey(), m.EncryptionLayer.Payload, ) if err != nil { return nil, nil, err } for _, payload := range payloads { message, err := m.Clone() if err != nil { return nil, nil, err } message.EncryptionLayer.Payload = payload statusMessages = append(statusMessages, message) } return statusMessages, acks, nil } // HandleMessages expects a whisper message as input, and it will go through // a series of transformations until the message is parsed into an application // layer message, or in case of Raw methods, the processing stops at the layer // before. // It returns an error only if the processing of required steps failed. func (s *MessageSender) HandleMessages(wakuMessage *types.Message) ([]*v1protocol.StatusMessage, [][]byte, error) { logger := s.logger.With(zap.String("site", "HandleMessages")) hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String())) var statusMessages []*v1protocol.StatusMessage var acks [][]byte response, err := s.handleMessage(wakuMessage) if err != nil { // Hash ratchet with a group id not found yet, save the message for future processing if err == encryption.ErrHashRatchetGroupIDNotFound && len(response.Message.EncryptionLayer.HashRatchetInfo) == 1 { info := response.Message.EncryptionLayer.HashRatchetInfo[0] return nil, nil, s.persistence.SaveHashRatchetMessage(info.GroupID, info.KeyID, wakuMessage) } // The current message segment has been successfully retrieved. // However, the collection of segments is not yet complete. if err == ErrMessageSegmentsIncomplete { return nil, nil, nil } return nil, nil, err } statusMessages = append(statusMessages, response.Messages()...) acks = append(acks, response.DatasyncAcks...) // Process queued hash ratchet messages for _, hashRatchetInfo := range response.Message.EncryptionLayer.HashRatchetInfo { messages, err := s.persistence.GetHashRatchetMessages(hashRatchetInfo.KeyID) if err != nil { return nil, nil, err } var processedIds [][]byte for _, message := range messages { response, err := s.handleMessage(message) if err != nil { hlogger.Debug("failed to handle hash ratchet message", zap.Error(err)) continue } statusMessages = append(statusMessages, response.Messages()...) acks = append(acks, response.DatasyncAcks...) processedIds = append(processedIds, message.Hash) } err = s.persistence.DeleteHashRatchetMessages(processedIds) if err != nil { s.logger.Warn("failed to delete hash ratchet messages", zap.Error(err)) return nil, nil, err } } return statusMessages, acks, nil } type handleMessageResponse struct { Message *v1protocol.StatusMessage DatasyncMessages []*v1protocol.StatusMessage DatasyncAcks [][]byte } func (h *handleMessageResponse) Messages() []*v1protocol.StatusMessage { if len(h.DatasyncMessages) > 0 { return h.DatasyncMessages } return []*v1protocol.StatusMessage{h.Message} } func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessageResponse, error) { logger := s.logger.With(zap.String("site", "handleMessage")) hlogger := logger.With(zap.ByteString("hash", wakuMessage.Hash)) response := &handleMessageResponse{ Message: &v1protocol.StatusMessage{}, DatasyncMessages: []*v1protocol.StatusMessage{}, DatasyncAcks: [][]byte{}, } err := response.Message.HandleTransportLayer(wakuMessage) if err != nil { hlogger.Error("failed to handle transport layer message", zap.Error(err)) return nil, err } err = s.handleSegmentationLayer(response.Message) if err != nil { hlogger.Debug("failed to handle segmentation layer message", zap.Error(err)) // Segments not completed yet, stop processing if err == ErrMessageSegmentsIncomplete { return nil, err } // Segments already completed, stop processing if err == ErrMessageSegmentsAlreadyCompleted { return nil, err } } err = s.handleEncryptionLayer(context.Background(), response.Message) if err != nil { hlogger.Debug("failed to handle an encryption message", zap.Error(err)) // Hash ratchet with a group id not found yet, stop processing if err == encryption.ErrHashRatchetGroupIDNotFound { return response, err } } if s.datasync != nil && s.datasyncEnabled { datasyncMessages, as, err := unwrapDatasyncMessage(response.Message, s.datasync) if err != nil { hlogger.Debug("failed to handle datasync message", zap.Error(err)) } else { response.DatasyncMessages = append(response.DatasyncMessages, datasyncMessages...) response.DatasyncAcks = append(response.DatasyncAcks, as...) } } for _, msg := range response.Messages() { err := msg.HandleApplicationLayer() if err != nil { hlogger.Error("failed to handle application metadata layer message", zap.Error(err)) } } return response, nil } // fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) { destinationID := types.EncodeHex(crypto.FromECDSAPub(destination)) s.ephemeralKeysMutex.Lock() decryptionKey, ok := s.ephemeralKeys[destinationID] s.ephemeralKeysMutex.Unlock() // the key is not there, fallback on identity if !ok { return s.identity, false } return decryptionKey, true } func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error { logger := s.logger.With(zap.String("site", "handleEncryptionLayer")) publicKey := message.SigPubKey() // if it's an ephemeral key, we don't negotiate a topic decryptionKey, skipNegotiation := s.fetchDecryptionKey(message.TransportLayer.Dst) err := message.HandleEncryptionLayer(decryptionKey, publicKey, s.protocol, skipNegotiation) // if it's an ephemeral key, we don't have to handle a device not found error if err == encryption.ErrDeviceNotFound && !skipNegotiation { if err := s.handleErrDeviceNotFound(ctx, publicKey); err != nil { logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err)) } } if err != nil { logger.Error("failed to handle an encrypted message", zap.Error(err)) return err } return nil } func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error { now := time.Now().Unix() advertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, now) if err != nil { return err } if !advertise { return nil } messageSpec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey) if err != nil { return err } ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() // We don't pass an array of messageIDs as no action needs to be taken // when sending a bundle _, _, err = s.sendMessageSpec(ctx, publicKey, messageSpec, nil) if err != nil { return err } s.protocol.ConfirmBundleAdvertisement(publicKey, now) return nil } func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) { wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } return wrappedMessage, nil } func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) { groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey) peerID := datasyncpeer.PublicKeyToPeerID(*publicKey) exist, err := s.datasync.IsPeerInGroup(groupID, peerID) if err != nil { return nil, errors.Wrap(err, "failed to check if peer is in group") } if !exist { if err := s.datasync.AddPeer(groupID, peerID); err != nil { return nil, errors.Wrap(err, "failed to add peer") } } id, err := s.datasync.AppendMessage(groupID, message) if err != nil { return nil, errors.Wrap(err, "failed to append message to datasync") } return id[:], nil } // sendDataSync sends a message scheduled by the data sync layer. // Data Sync layer calls this method "dispatch" function. func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, marshalledDatasyncPayload []byte, payload *datasyncproto.Payload) error { // Calculate the messageIDs messageIDs := make([][]byte, 0, len(payload.Messages)) hexMessageIDs := make([]string, 0, len(payload.Messages)) for _, payload := range payload.Messages { mid := v1protocol.MessageID(&s.identity.PublicKey, payload.Body) messageIDs = append(messageIDs, mid) hexMessageIDs = append(hexMessageIDs, mid.String()) } messageSpec, err := s.protocol.BuildEncryptedMessage(s.identity, publicKey, marshalledDatasyncPayload) if err != nil { return errors.Wrap(err, "failed to encrypt message") } // The shared secret needs to be handle before we send a message // otherwise the topic might not be set up before we receive a message if s.handleSharedSecrets != nil { err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret}) if err != nil { return err } } hashes, newMessages, err := s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs) if err != nil { s.logger.Error("failed to send a datasync message", zap.Error(err)) return err } for i, newMessage := range newMessages { s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hashes[i]))) s.transport.Track(messageIDs, hashes[i], newMessage) } return nil } // sendPrivateRawMessage sends a message not wrapped in an encryption layer func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, PowTarget: calculatePoW(payload), PowTime: whisperPoWTime, PubsubTopic: rawMessage.PubsubTopic, } newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } hashes := make([][]byte, 0, len(newMessages)) var hash []byte for _, newMessage := range newMessages { if rawMessage.SendOnPersonalTopic { hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey) } else { hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) } if err != nil { return nil, nil, err } hashes = append(hashes, hash) } return hashes, newMessages, nil } // sendCommunityMessage sends a message not wrapped in an encryption layer // to a community func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte, pubsubTopic string) ([][]byte, []*types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, PowTarget: calculatePoW(payload), PowTime: whisperPoWTime, PubsubTopic: pubsubTopic, } newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } hashes := make([][]byte, 0, len(newMessages)) for _, newMessage := range newMessages { hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey) if err != nil { return nil, nil, err } hashes = append(hashes, hash) } return hashes, newMessages, nil } // sendMessageSpec analyses the spec properties and selects a proper transport method. func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) { logger := s.logger.With(zap.String("site", "sendMessageSpec")) newMessage, err := MessageSpecToWhisper(messageSpec) if err != nil { return nil, nil, err } newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } hashes := make([][]byte, 0, len(newMessages)) var hash []byte for _, newMessage := range newMessages { // process shared secret if messageSpec.AgreedSecret { logger.Debug("sending using shared secret") hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key) } else { logger.Debug("sending partitioned topic") hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) } if err != nil { return nil, nil, err } hashes = append(hashes, hash) } sentMessage := &SentMessage{ PublicKey: publicKey, Spec: messageSpec, MessageIDs: messageIDs, } s.notifyOnSentMessage(sentMessage) return hashes, newMessages, nil } func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent { c := make(chan *MessageEvent, 100) s.messageEventsSubscriptions = append(s.messageEventsSubscriptions, c) return c } func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) { event := &MessageEvent{ Type: MessageSent, SentMessage: sentMessage, } // Publish on channels, drop if buffer is full for _, c := range s.messageEventsSubscriptions { select { case c <- event: default: s.logger.Warn("message events subscription channel full when publishing sent event, dropping message") } } } func (s *MessageSender) notifyOnScheduledMessage(recipient *ecdsa.PublicKey, message *RawMessage) { event := &MessageEvent{ Recipient: recipient, Type: MessageScheduled, RawMessage: message, } // Publish on channels, drop if buffer is full for _, c := range s.messageEventsSubscriptions { select { case c <- event: default: s.logger.Warn("message events subscription channel full when publishing scheduled event, dropping message") } } } func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) { return s.transport.JoinPublic(id) } // AddEphemeralKey adds an ephemeral key that we will be listening to // note that we never removed them from now, as waku/whisper does not // recalculate topics on removal, so effectively there's no benefit. // On restart they will be gone. func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) { s.ephemeralKeysMutex.Lock() s.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey s.ephemeralKeysMutex.Unlock() return s.transport.LoadKeyFilters(privateKey) } func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) { var newMessage *types.NewMessage payload, err := proto.Marshal(spec.Message) if err != nil { return newMessage, err } newMessage = &types.NewMessage{ TTL: whisperTTL, Payload: payload, PowTarget: calculatePoW(payload), PowTime: whisperPoWTime, } return newMessage, nil } // calculatePoW returns the PoWTarget to be used. // We check the size and arbitrarily set it to a lower PoW if the packet is // greater than 50KB. We do this as the defaultPoW is too high for clients to send // large messages. func calculatePoW(payload []byte) float64 { if len(payload) > largeSizeInBytes { return whisperLargeSizePoW } return whisperDefaultPoW } func (s *MessageSender) StopDatasync() { if s.datasync != nil { s.datasync.Stop() } } func (s *MessageSender) StartDatasync() error { if !s.datasyncEnabled { return nil } dataSyncTransport := datasync.NewNodeTransport() dataSyncNode, err := datasyncnode.NewPersistentNode( s.database, dataSyncTransport, datasyncpeer.PublicKeyToPeerID(s.identity.PublicKey), datasyncnode.BATCH, datasync.CalculateSendTime, s.logger, ) if err != nil { return err } s.datasync = datasync.New(dataSyncNode, dataSyncTransport, true, s.logger) s.datasync.Init(s.sendDataSync, s.logger) s.datasync.Start(datasync.DatasyncTicker) return nil } // GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group func (s *MessageSender) GetCurrentKeyForGroup(groupID []byte) (*encryption.HashRatchetKeyCompatibility, error) { return s.protocol.GetCurrentKeyForGroup(groupID) } // GetKeyIDsForGroup returns a slice of key IDs belonging to a given group ID func (s *MessageSender) GetKeysForGroup(groupID []byte) ([]*encryption.HashRatchetKeyCompatibility, error) { return s.protocol.GetKeysForGroup(groupID) } // Segments message into smaller chunks if the size exceeds the maximum allowed func segmentMessage(newMessage *types.NewMessage, maxSegmentSize int) ([]*types.NewMessage, error) { if len(newMessage.Payload) <= maxSegmentSize { return []*types.NewMessage{newMessage}, nil } createSegment := func(chunkPayload []byte) (*types.NewMessage, error) { copy := &types.NewMessage{} err := copier.Copy(copy, newMessage) if err != nil { return nil, err } copy.Payload = chunkPayload copy.PowTarget = calculatePoW(chunkPayload) return copy, nil } entireMessageHash := crypto.Keccak256(newMessage.Payload) payloadSize := len(newMessage.Payload) segmentsCount := int(math.Ceil(float64(payloadSize) / float64(maxSegmentSize))) var segmentMessages []*types.NewMessage for start, index := 0, 0; start < payloadSize; start += maxSegmentSize { end := start + maxSegmentSize if end > payloadSize { end = payloadSize } chunk := newMessage.Payload[start:end] segmentMessageProto := &protobuf.SegmentMessage{ EntireMessageHash: entireMessageHash, Index: uint32(index), SegmentsCount: uint32(segmentsCount), Payload: chunk, } chunkPayload, err := proto.Marshal(segmentMessageProto) if err != nil { return nil, err } segmentMessage, err := createSegment(chunkPayload) if err != nil { return nil, err } segmentMessages = append(segmentMessages, segmentMessage) index++ } return segmentMessages, nil } func (s *MessageSender) segmentMessage(newMessage *types.NewMessage) ([]*types.NewMessage, error) { // We set the max message size to 3/4 of the allowed message size, to leave // room for segment message metadata. newMessages, err := segmentMessage(newMessage, int(s.transport.MaxMessageSize()/4*3)) s.logger.Debug("message segmented", zap.Int("segments", len(newMessages))) return newMessages, err } var ErrMessageSegmentsIncomplete = errors.New("message segments incomplete") var ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed") var ErrMessageSegmentsInvalidCount = errors.New("invalid segments count") var ErrMessageSegmentsHashMismatch = errors.New("hash of entire payload does not match") func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error { logger := s.logger.With(zap.String("site", "handleSegmentationLayer")) hlogger := logger.With(zap.String("hash", types.HexBytes(message.TransportLayer.Hash).String())) var segmentMessage protobuf.SegmentMessage err := proto.Unmarshal(message.TransportLayer.Payload, &segmentMessage) if err != nil { return errors.Wrap(err, "failed to unmarshal SegmentMessage") } hlogger.Debug("handling message segment", zap.String("EntireMessageHash", types.HexBytes(segmentMessage.EntireMessageHash).String()), zap.Uint32("Index", segmentMessage.Index), zap.Uint32("SegmentsCount", segmentMessage.SegmentsCount)) alreadyCompleted, err := s.persistence.IsMessageAlreadyCompleted(segmentMessage.EntireMessageHash) if err != nil { return err } if alreadyCompleted { return ErrMessageSegmentsAlreadyCompleted } if segmentMessage.SegmentsCount < 2 { return ErrMessageSegmentsInvalidCount } err = s.persistence.SaveMessageSegment(&segmentMessage, message.TransportLayer.SigPubKey, time.Now().Unix()) if err != nil { return err } segments, err := s.persistence.GetMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey) if err != nil { return err } if len(segments) != int(segmentMessage.SegmentsCount) { return ErrMessageSegmentsIncomplete } // Combine payload var entirePayload bytes.Buffer for _, segment := range segments { _, err := entirePayload.Write(segment.Payload) if err != nil { return errors.Wrap(err, "failed to write segment payload") } } // Sanity check entirePayloadHash := crypto.Keccak256(entirePayload.Bytes()) if !bytes.Equal(entirePayloadHash, segmentMessage.EntireMessageHash) { return ErrMessageSegmentsHashMismatch } err = s.persistence.CompleteMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey, time.Now().Unix()) if err != nil { return err } message.TransportLayer.Payload = entirePayload.Bytes() return nil } func (s *MessageSender) CleanupSegments() error { weekAgo := time.Now().AddDate(0, 0, -7).Unix() monthAgo := time.Now().AddDate(0, -1, 0).Unix() err := s.persistence.RemoveMessageSegmentsOlderThan(weekAgo) if err != nil { return err } err = s.persistence.RemoveMessageSegmentsCompletedOlderThan(monthAgo) if err != nil { return err } return nil }