status-go/protocol/common/message_sender.go

1432 lines
43 KiB
Go

package common
import (
"bytes"
"context"
"crypto/ecdsa"
"database/sql"
"math"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
datasyncnode "github.com/status-im/mvds/node"
datasyncproto "github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/datasync"
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
)
// Whisper message properties.
const (
whisperTTL = 15
whisperDefaultPoW = 0.002
// whisperLargeSizePoW is the PoWTarget for larger payload sizes
whisperLargeSizePoW = 0.000002
// largeSizeInBytes is when should we be using a lower POW.
// Roughly this is 50KB
largeSizeInBytes = 50000
whisperPoWTime = 5
)
// RekeyCompatibility indicates whether we should be sending
// keys in 1-to-1 messages as well as in the newer format
var RekeyCompatibility = true
// SentMessage reprent a message that has been passed to the transport layer
type SentMessage struct {
PublicKey *ecdsa.PublicKey
Spec *encryption.ProtocolMessageSpec
MessageIDs [][]byte
}
type MessageEventType uint32
const (
MessageScheduled = iota + 1
MessageSent
)
type MessageEvent struct {
Recipient *ecdsa.PublicKey
Type MessageEventType
SentMessage *SentMessage
RawMessage *RawMessage
}
type MessageSender struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
database *sql.DB
protocol *encryption.Protocol
transport *transport.Transport
logger *zap.Logger
persistence *RawMessagesPersistence
datasyncEnabled bool
// ephemeralKeys is a map that contains the ephemeral keys of the client, used
// to decrypt messages
ephemeralKeys map[string]*ecdsa.PrivateKey
ephemeralKeysMutex sync.Mutex
// messageEventsSubscriptions contains all the subscriptions for message events
messageEventsSubscriptions []chan<- *MessageEvent
messageEventsSubscriptionsMutex sync.Mutex
featureFlags FeatureFlags
// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
handleSharedSecrets func([]*sharedsecret.Secret) error
}
func NewMessageSender(
identity *ecdsa.PrivateKey,
database *sql.DB,
enc *encryption.Protocol,
transport *transport.Transport,
logger *zap.Logger,
features FeatureFlags,
) (*MessageSender, error) {
p := &MessageSender{
identity: identity,
datasyncEnabled: features.Datasync,
protocol: enc,
database: database,
persistence: NewRawMessagesPersistence(database),
transport: transport,
logger: logger,
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
featureFlags: features,
}
return p, nil
}
func (s *MessageSender) Stop() {
s.messageEventsSubscriptionsMutex.Lock()
defer s.messageEventsSubscriptionsMutex.Unlock()
for _, c := range s.messageEventsSubscriptions {
close(c)
}
s.messageEventsSubscriptions = nil
s.StopDatasync()
}
func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
s.handleSharedSecrets = handler
}
func (s *MessageSender) StartDatasync(handler func(peer state.PeerID, payload *datasyncproto.Payload) error) error {
if !s.datasyncEnabled {
return nil
}
dataSyncTransport := datasync.NewNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
s.database,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(s.identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
s.logger,
)
if err != nil {
return err
}
s.datasync = datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)
s.datasync.Init(handler, s.logger)
s.datasync.Start(datasync.DatasyncTicker)
return nil
}
// SendPrivate takes encoded data, encrypts it and sends through the wire.
func (s *MessageSender) SendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a private message",
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
zap.String("site", "SendPrivate"),
)
// Currently we don't support sending through datasync and setting custom waku fields,
// as the datasync interface is not rich enough to propagate that information, so we
// would have to add some complexity to handle this.
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
}
// Set sender identity if not specified
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
return s.sendPrivate(ctx, recipient, rawMessage)
}
// SendCommunityMessage takes encoded data, encrypts it and sends through the wire
// using the community topic and their key
func (s *MessageSender) SendCommunityMessage(
ctx context.Context,
rawMessage RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a community message",
zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
zap.String("site", "SendCommunityMessage"),
)
rawMessage.Sender = s.identity
return s.sendCommunity(ctx, &rawMessage)
}
// SendPubsubTopicKey sends the protected topic key for a community to a list of recipients
func (s *MessageSender) SendPubsubTopicKey(
ctx context.Context,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending the protected topic key for a community",
zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
zap.String("site", "SendPubsubTopicKey"),
)
rawMessage.Sender = s.identity
messageID, err := s.getMessageID(rawMessage)
if err != nil {
return nil, err
}
rawMessage.ID = types.EncodeHex(messageID)
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
s.notifyOnScheduledMessage(nil, rawMessage)
// Send to each recipients
for _, recipient := range rawMessage.Recipients {
_, err = s.sendPrivate(ctx, recipient, rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message")
}
}
return messageID, nil
}
// SendGroup takes encoded data, encrypts it and sends through the wire,
// always return the messageID
func (s *MessageSender) SendGroup(
ctx context.Context,
recipients []*ecdsa.PublicKey,
rawMessage RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a private group message",
zap.String("site", "SendGroup"),
)
// Set sender if not specified
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
// Calculate messageID first and set on raw message
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
// We call it only once, and we nil the function after so it doesn't get called again
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
return nil, err
}
}
// Send to each recipients
for _, recipient := range recipients {
_, err = s.sendPrivate(ctx, recipient, &rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message")
}
}
return messageID, nil
}
func (s *MessageSender) getMessageID(rawMessage *RawMessage) (types.HexBytes, error) {
wrappedMessage, err := s.wrapMessageV1(rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
return messageID, nil
}
func ShouldCommunityMessageBeEncrypted(msgType protobuf.ApplicationMetadataMessage_Type) bool {
return msgType == protobuf.ApplicationMetadataMessage_CHAT_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_EDIT_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_DELETE_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_PIN_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_EMOJI_REACTION
}
// sendCommunity sends a message that's to be sent in a community
// If it's a chat message, it will go to the respective topic derived by the
// chat id, if it's not a chat message, it will go to the community topic.
func (s *MessageSender) sendCommunity(
ctx context.Context,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(&rawMessage.Sender.PublicKey))))
// Set sender
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
messageID, err := s.getMessageID(rawMessage)
if err != nil {
return nil, err
}
rawMessage.ID = types.EncodeHex(messageID)
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
return nil, err
}
}
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
s.notifyOnScheduledMessage(nil, rawMessage)
var hashes [][]byte
var newMessages []*types.NewMessage
forceRekey := rawMessage.CommunityKeyExMsgType == KeyExMsgRekey
// Check if it's a key exchange message. In this case we send it
// to all the recipients
if rawMessage.CommunityKeyExMsgType != KeyExMsgNone {
// If rekeycompatibility is on, we always
// want to execute below, otherwise we execute
// only when we want to fill up old keys to a given user
if RekeyCompatibility || !forceRekey {
keyExMessageSpecs, err := s.protocol.GetKeyExMessageSpecs(rawMessage.HashRatchetGroupID, s.identity, rawMessage.Recipients, forceRekey)
if err != nil {
return nil, err
}
for i, spec := range keyExMessageSpecs {
recipient := rawMessage.Recipients[i]
_, _, err = s.sendMessageSpec(ctx, recipient, spec, [][]byte{messageID})
if err != nil {
return nil, err
}
}
}
}
wrappedMessage, err := s.wrapMessageV1(rawMessage)
if err != nil {
return nil, err
}
// If it's a chat message, we send it on the community chat topic
if ShouldCommunityMessageBeEncrypted(rawMessage.MessageType) {
messageSpec, err := s.protocol.BuildHashRatchetMessage(rawMessage.HashRatchetGroupID, wrappedMessage)
if err != nil {
return nil, err
}
payload, err := proto.Marshal(messageSpec.Message)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal")
}
hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload, forceRekey)
if err != nil {
return nil, err
}
sentMessage := &SentMessage{
Spec: messageSpec,
MessageIDs: [][]byte{messageID},
}
s.notifyOnSentMessage(sentMessage)
} else {
pubkey, err := crypto.DecompressPubkey(rawMessage.CommunityID)
if err != nil {
return nil, errors.Wrap(err, "failed to decompress pubkey")
}
hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, wrappedMessage, rawMessage.PubsubTopic, forceRekey, rawMessage)
if err != nil {
s.logger.Error("failed to send a community message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
}
s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
return messageID, nil
}
// sendPrivate sends data to the recipient identifying with a given public key.
func (s *MessageSender) sendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
var wrappedMessage []byte
var err error
if rawMessage.SkipApplicationWrap {
wrappedMessage = rawMessage.Payload
} else {
wrappedMessage, err = s.wrapMessageV1(rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
}
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
return nil, err
}
}
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
s.notifyOnScheduledMessage(recipient, rawMessage)
if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendAutomatically {
// No need to call transport tracking.
// It is done in a data sync dispatch step.
datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
// We don't need to receive confirmations from our own devices
if !IsPubKeyEqual(recipient, &s.identity.PublicKey) {
confirmation := &RawMessageConfirmation{
DataSyncID: datasyncID,
MessageID: messageID,
PublicKey: crypto.CompressPubkey(recipient),
}
err = s.persistence.InsertPendingConfirmation(confirmation)
if err != nil {
return nil, err
}
}
} else if rawMessage.SkipEncryptionLayer {
messageBytes := wrappedMessage
if rawMessage.CommunityKeyExMsgType == KeyExMsgReuse {
groupID := rawMessage.HashRatchetGroupID
ratchets, err := s.protocol.GetKeysForGroup(groupID)
if err != nil {
return nil, err
}
message, err := s.protocol.BuildHashRatchetKeyExchangeMessageWithPayload(s.identity, recipient, groupID, ratchets, wrappedMessage)
if err != nil {
return nil, err
}
messageBytes, err = proto.Marshal(message.Message)
if err != nil {
return nil, err
}
}
// When SkipProtocolLayer is set we don't pass the message to the encryption layer
hashes, newMessages, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, messageBytes)
if err != nil {
s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
} else {
messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
if err != nil {
s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
}
return messageID, nil
}
// sendPairInstallation sends data to the recipients, using DH
func (s *MessageSender) SendPairInstallation(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage RawMessage,
) ([]byte, error) {
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage)
hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
if err != nil {
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.transport.Track(messageID, hashes, newMessages)
return messageID, nil
}
func (s *MessageSender) encodeMembershipUpdate(
message v1protocol.MembershipUpdateMessage,
chatEntity ChatEntity,
) ([]byte, error) {
if chatEntity != nil {
chatEntityProtobuf := chatEntity.GetProtobuf()
switch chatEntityProtobuf := chatEntityProtobuf.(type) {
case *protobuf.ChatMessage:
message.Message = chatEntityProtobuf
case *protobuf.EmojiReaction:
message.EmojiReaction = chatEntityProtobuf
}
}
encodedMessage, err := v1protocol.EncodeMembershipUpdateMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode membership update message")
}
return encodedMessage, nil
}
// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// All the events in a group are encoded and added to the payload
func (s *MessageSender) EncodeMembershipUpdate(
group *v1protocol.Group,
chatEntity ChatEntity,
) ([]byte, error) {
message := v1protocol.MembershipUpdateMessage{
ChatID: group.ChatID(),
Events: group.Events(),
}
return s.encodeMembershipUpdate(message, chatEntity)
}
// EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// Only the events relevant to the current group are encoded
func (s *MessageSender) EncodeAbridgedMembershipUpdate(
group *v1protocol.Group,
chatEntity ChatEntity,
) ([]byte, error) {
message := v1protocol.MembershipUpdateMessage{
ChatID: group.ChatID(),
Events: group.AbridgedEvents(),
}
return s.encodeMembershipUpdate(message, chatEntity)
}
func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte, rekey bool) ([][]byte, []*types.NewMessage, error) {
payload := wrappedMessage
var err error
if rekey && len(rawMessage.HashRatchetGroupID) != 0 {
var ratchet *encryption.HashRatchetKeyCompatibility
// We have just rekeyed, pull the latest
if RekeyCompatibility {
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
if err != nil {
return nil, nil, err
}
}
// We send the message over the community topic
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
if err != nil {
return nil, nil, err
}
payload, err = proto.Marshal(spec.Message)
if err != nil {
return nil, nil, err
}
}
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
}
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
return nil, nil, err
}
}
// notify before dispatching
s.notifyOnScheduledMessage(nil, rawMessage)
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID)
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
}
// SendPublic takes encoded data, encrypts it and sends through the wire.
func (s *MessageSender) SendPublic(
ctx context.Context,
chatName string,
rawMessage RawMessage,
) ([]byte, error) {
// Set sender
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
var wrappedMessage []byte
var err error
if rawMessage.SkipApplicationWrap {
wrappedMessage = rawMessage.Payload
} else {
wrappedMessage, err = s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
}
var newMessage *types.NewMessage
messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage)
if err != nil {
s.logger.Error("failed to send a public message", zap.Error(err))
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
}
if len(rawMessage.HashRatchetGroupID) != 0 {
var ratchet *encryption.HashRatchetKeyCompatibility
var err error
// We have just rekeyed, pull the latest
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
if err != nil {
return nil, err
}
keyID, err := ratchet.GetKeyID()
if err != nil {
return nil, err
}
s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
// We send the message over the community topic
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
if err != nil {
return nil, err
}
newMessage, err = MessageSpecToWhisper(spec)
if err != nil {
return nil, err
}
} else if !rawMessage.SkipEncryptionLayer {
newMessage, err = MessageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
} else {
newMessage = &types.NewMessage{
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: calculatePoW(wrappedMessage),
PowTime: whisperPoWTime,
}
}
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
return nil, err
}
}
// notify before dispatching
s.notifyOnScheduledMessage(nil, &rawMessage)
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, err
}
hashes := make([][]byte, 0, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendPublic(ctx, newMessage, chatName)
if err != nil {
return nil, err
}
hashes = append(hashes, hash)
}
sentMessage := &SentMessage{
Spec: messageSpec,
MessageIDs: [][]byte{messageID},
}
s.notifyOnSentMessage(sentMessage)
s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
return messageID, nil
}
// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
// returns cloned messages with replaced payloads
func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, response *handleMessageResponse) error {
datasyncMessage, err := s.datasync.Unwrap(
m.SigPubKey(),
m.EncryptionLayer.Payload,
)
if err != nil {
return err
}
response.DatasyncSender = m.SigPubKey()
response.DatasyncAcks = append(response.DatasyncAcks, datasyncMessage.Acks...)
response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...)
for _, o := range datasyncMessage.GroupOffers {
for _, mID := range o.MessageIds {
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{GroupID: o.GroupId, MessageID: mID})
}
}
for _, ds := range datasyncMessage.Messages {
message, err := m.Clone()
if err != nil {
return err
}
message.EncryptionLayer.Payload = ds.Body
response.DatasyncMessages = append(response.DatasyncMessages, message)
}
return nil
}
// HandleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before.
// It returns an error only if the processing of required steps failed.
func (s *MessageSender) HandleMessages(wakuMessage *types.Message) (*HandleMessageResponse, error) {
logger := s.logger.With(zap.String("site", "HandleMessages"))
hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String()))
response, err := s.handleMessage(wakuMessage)
if err != nil {
// Hash ratchet with a group id not found yet, save the message for future processing
if err == encryption.ErrHashRatchetGroupIDNotFound && len(response.Message.EncryptionLayer.HashRatchetInfo) == 1 {
info := response.Message.EncryptionLayer.HashRatchetInfo[0]
return nil, s.persistence.SaveHashRatchetMessage(info.GroupID, info.KeyID, wakuMessage)
}
// The current message segment has been successfully retrieved.
// However, the collection of segments is not yet complete.
if err == ErrMessageSegmentsIncomplete {
return nil, nil
}
return nil, err
}
// Process queued hash ratchet messages
for _, hashRatchetInfo := range response.Message.EncryptionLayer.HashRatchetInfo {
messages, err := s.persistence.GetHashRatchetMessages(hashRatchetInfo.KeyID)
if err != nil {
return nil, err
}
var processedIds [][]byte
for _, message := range messages {
hlogger.Info("handling out of order encrypted messages", zap.String("hash", types.Bytes2Hex(message.Hash)))
r, err := s.handleMessage(message)
if err != nil {
hlogger.Debug("failed to handle hash ratchet message", zap.Error(err))
continue
}
response.DatasyncMessages = append(response.toPublicResponse().StatusMessages, r.Messages()...)
response.DatasyncAcks = append(response.DatasyncAcks, r.DatasyncAcks...)
processedIds = append(processedIds, message.Hash)
}
err = s.persistence.DeleteHashRatchetMessages(processedIds)
if err != nil {
s.logger.Warn("failed to delete hash ratchet messages", zap.Error(err))
return nil, err
}
}
return response.toPublicResponse(), nil
}
type DatasyncOffer struct {
GroupID []byte
MessageID []byte
}
type HandleMessageResponse struct {
Hash []byte
StatusMessages []*v1protocol.StatusMessage
DatasyncSender *ecdsa.PublicKey
DatasyncAcks [][]byte
DatasyncOffers []DatasyncOffer
DatasyncRequests [][]byte
}
func (h *handleMessageResponse) toPublicResponse() *HandleMessageResponse {
return &HandleMessageResponse{
Hash: h.Hash,
StatusMessages: h.Messages(),
DatasyncSender: h.DatasyncSender,
DatasyncAcks: h.DatasyncAcks,
DatasyncOffers: h.DatasyncOffers,
DatasyncRequests: h.DatasyncRequests,
}
}
type handleMessageResponse struct {
Hash []byte
Message *v1protocol.StatusMessage
DatasyncMessages []*v1protocol.StatusMessage
DatasyncSender *ecdsa.PublicKey
DatasyncAcks [][]byte
DatasyncOffers []DatasyncOffer
DatasyncRequests [][]byte
}
func (h *handleMessageResponse) Messages() []*v1protocol.StatusMessage {
if len(h.DatasyncMessages) > 0 {
return h.DatasyncMessages
}
return []*v1protocol.StatusMessage{h.Message}
}
func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessageResponse, error) {
logger := s.logger.With(zap.String("site", "handleMessage"))
hlogger := logger.With(zap.String("hash", types.EncodeHex(wakuMessage.Hash)))
message := &v1protocol.StatusMessage{}
response := &handleMessageResponse{
Hash: wakuMessage.Hash,
Message: message,
DatasyncMessages: []*v1protocol.StatusMessage{},
DatasyncAcks: [][]byte{},
}
err := message.HandleTransportLayer(wakuMessage)
if err != nil {
hlogger.Error("failed to handle transport layer message", zap.Error(err))
return nil, err
}
err = s.handleSegmentationLayer(message)
if err != nil {
hlogger.Debug("failed to handle segmentation layer message", zap.Error(err))
// Segments not completed yet, stop processing
if err == ErrMessageSegmentsIncomplete {
return nil, err
}
// Segments already completed, stop processing
if err == ErrMessageSegmentsAlreadyCompleted {
return nil, err
}
}
err = s.handleEncryptionLayer(context.Background(), message)
if err != nil {
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
// Hash ratchet with a group id not found yet, stop processing
if err == encryption.ErrHashRatchetGroupIDNotFound {
return response, err
}
}
if s.datasync != nil && s.datasyncEnabled {
err := s.unwrapDatasyncMessage(message, response)
if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err))
}
}
for _, msg := range response.Messages() {
err := msg.HandleApplicationLayer()
if err != nil {
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
}
}
return response, nil
}
// fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key
func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))
s.ephemeralKeysMutex.Lock()
decryptionKey, ok := s.ephemeralKeys[destinationID]
s.ephemeralKeysMutex.Unlock()
// the key is not there, fallback on identity
if !ok {
return s.identity, false
}
return decryptionKey, true
}
func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
logger := s.logger.With(zap.String("site", "handleEncryptionLayer"))
publicKey := message.SigPubKey()
// if it's an ephemeral key, we don't negotiate a topic
decryptionKey, skipNegotiation := s.fetchDecryptionKey(message.TransportLayer.Dst)
err := message.HandleEncryptionLayer(decryptionKey, publicKey, s.protocol, skipNegotiation)
// if it's an ephemeral key, we don't have to handle a device not found error
if err == encryption.ErrDeviceNotFound && !skipNegotiation {
if err := s.handleErrDeviceNotFound(ctx, publicKey); err != nil {
logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
}
}
if err != nil {
logger.Error("failed to handle an encrypted message", zap.Error(err))
return err
}
return nil
}
func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
return err
}
if !advertise {
return nil
}
messageSpec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
// We don't pass an array of messageIDs as no action needs to be taken
// when sending a bundle
_, _, err = s.sendMessageSpec(ctx, publicKey, messageSpec, nil)
if err != nil {
return err
}
s.protocol.ConfirmBundleAdvertisement(publicKey, now)
return nil
}
func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
return wrappedMessage, nil
}
func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := s.datasync.IsPeerInGroup(groupID, peerID)
if err != nil {
return nil, errors.Wrap(err, "failed to check if peer is in group")
}
if !exist {
if err := s.datasync.AddPeer(groupID, peerID); err != nil {
return nil, errors.Wrap(err, "failed to add peer")
}
}
id, err := s.datasync.AppendMessage(groupID, message)
if err != nil {
return nil, errors.Wrap(err, "failed to append message to datasync")
}
return id[:], nil
}
// sendPrivateRawMessage sends a message not wrapped in an encryption layer
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte) ([][]byte, []*types.NewMessage, error) {
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
}
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
var hash []byte
for _, newMessage := range newMessages {
if rawMessage.SendOnPersonalTopic {
hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
} else {
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
}
// sendCommunityMessage sends a message not wrapped in an encryption layer
// to a community
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, wrappedMessage []byte, pubsubTopic string, rekey bool, rawMessage *RawMessage) ([][]byte, []*types.NewMessage, error) {
payload := wrappedMessage
if rekey && len(rawMessage.HashRatchetGroupID) != 0 {
var ratchet *encryption.HashRatchetKeyCompatibility
var err error
// We have just rekeyed, pull the latest
if RekeyCompatibility {
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
if err != nil {
return nil, nil, err
}
}
keyID, err := ratchet.GetKeyID()
if err != nil {
return nil, nil, err
}
s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
// We send the message over the community topic
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
if err != nil {
return nil, nil, err
}
payload, err = proto.Marshal(spec.Message)
if err != nil {
return nil, nil, err
}
}
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: pubsubTopic,
}
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
}
func (s *MessageSender) SendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
return s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
}
// sendMessageSpec analyses the spec properties and selects a proper transport method.
func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
logger := s.logger.With(zap.String("site", "sendMessageSpec"))
newMessage, err := MessageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
var hash []byte
for _, newMessage := range newMessages {
// The shared secret needs to be handle before we send a message
// otherwise the topic might not be set up before we receive a message
if messageSpec.SharedSecret != nil && s.handleSharedSecrets != nil {
err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
if err != nil {
return nil, nil, err
}
}
// process shared secret
if messageSpec.AgreedSecret {
logger.Debug("sending using shared secret")
hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
} else {
logger.Debug("sending partitioned topic")
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
sentMessage := &SentMessage{
PublicKey: publicKey,
Spec: messageSpec,
MessageIDs: messageIDs,
}
s.notifyOnSentMessage(sentMessage)
return hashes, newMessages, nil
}
func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent {
c := make(chan *MessageEvent, 100)
s.messageEventsSubscriptions = append(s.messageEventsSubscriptions, c)
return c
}
func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) {
event := &MessageEvent{
Type: MessageSent,
SentMessage: sentMessage,
}
s.messageEventsSubscriptionsMutex.Lock()
defer s.messageEventsSubscriptionsMutex.Unlock()
// Publish on channels, drop if buffer is full
for _, c := range s.messageEventsSubscriptions {
select {
case c <- event:
default:
s.logger.Warn("message events subscription channel full when publishing sent event, dropping message")
}
}
}
func (s *MessageSender) notifyOnScheduledMessage(recipient *ecdsa.PublicKey, message *RawMessage) {
event := &MessageEvent{
Recipient: recipient,
Type: MessageScheduled,
RawMessage: message,
}
s.messageEventsSubscriptionsMutex.Lock()
defer s.messageEventsSubscriptionsMutex.Unlock()
// Publish on channels, drop if buffer is full
for _, c := range s.messageEventsSubscriptions {
select {
case c <- event:
default:
s.logger.Warn("message events subscription channel full when publishing scheduled event, dropping message")
}
}
}
func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) {
return s.transport.JoinPublic(id)
}
// AddEphemeralKey adds an ephemeral key that we will be listening to
// note that we never removed them from now, as waku/whisper does not
// recalculate topics on removal, so effectively there's no benefit.
// On restart they will be gone.
func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
s.ephemeralKeysMutex.Lock()
s.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
s.ephemeralKeysMutex.Unlock()
return s.transport.LoadKeyFilters(privateKey)
}
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
var newMessage *types.NewMessage
payload, err := proto.Marshal(spec.Message)
if err != nil {
return newMessage, err
}
newMessage = &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
}
return newMessage, nil
}
// calculatePoW returns the PoWTarget to be used.
// We check the size and arbitrarily set it to a lower PoW if the packet is
// greater than 50KB. We do this as the defaultPoW is too high for clients to send
// large messages.
func calculatePoW(payload []byte) float64 {
if len(payload) > largeSizeInBytes {
return whisperLargeSizePoW
}
return whisperDefaultPoW
}
func (s *MessageSender) StopDatasync() {
if s.datasync != nil {
s.datasync.Stop()
}
}
// GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group
func (s *MessageSender) GetCurrentKeyForGroup(groupID []byte) (*encryption.HashRatchetKeyCompatibility, error) {
return s.protocol.GetCurrentKeyForGroup(groupID)
}
// GetKeyIDsForGroup returns a slice of key IDs belonging to a given group ID
func (s *MessageSender) GetKeysForGroup(groupID []byte) ([]*encryption.HashRatchetKeyCompatibility, error) {
return s.protocol.GetKeysForGroup(groupID)
}
// Segments message into smaller chunks if the size exceeds the maximum allowed
func segmentMessage(newMessage *types.NewMessage, maxSegmentSize int) ([]*types.NewMessage, error) {
if len(newMessage.Payload) <= maxSegmentSize {
return []*types.NewMessage{newMessage}, nil
}
createSegment := func(chunkPayload []byte) (*types.NewMessage, error) {
copy := &types.NewMessage{}
err := copier.Copy(copy, newMessage)
if err != nil {
return nil, err
}
copy.Payload = chunkPayload
copy.PowTarget = calculatePoW(chunkPayload)
return copy, nil
}
entireMessageHash := crypto.Keccak256(newMessage.Payload)
payloadSize := len(newMessage.Payload)
segmentsCount := int(math.Ceil(float64(payloadSize) / float64(maxSegmentSize)))
var segmentMessages []*types.NewMessage
for start, index := 0, 0; start < payloadSize; start += maxSegmentSize {
end := start + maxSegmentSize
if end > payloadSize {
end = payloadSize
}
chunk := newMessage.Payload[start:end]
segmentMessageProto := &protobuf.SegmentMessage{
EntireMessageHash: entireMessageHash,
Index: uint32(index),
SegmentsCount: uint32(segmentsCount),
Payload: chunk,
}
chunkPayload, err := proto.Marshal(segmentMessageProto)
if err != nil {
return nil, err
}
segmentMessage, err := createSegment(chunkPayload)
if err != nil {
return nil, err
}
segmentMessages = append(segmentMessages, segmentMessage)
index++
}
return segmentMessages, nil
}
func (s *MessageSender) segmentMessage(newMessage *types.NewMessage) ([]*types.NewMessage, error) {
// We set the max message size to 3/4 of the allowed message size, to leave
// room for segment message metadata.
newMessages, err := segmentMessage(newMessage, int(s.transport.MaxMessageSize()/4*3))
s.logger.Debug("message segmented", zap.Int("segments", len(newMessages)))
return newMessages, err
}
var ErrMessageSegmentsIncomplete = errors.New("message segments incomplete")
var ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed")
var ErrMessageSegmentsInvalidCount = errors.New("invalid segments count")
var ErrMessageSegmentsHashMismatch = errors.New("hash of entire payload does not match")
func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error {
logger := s.logger.With(zap.String("site", "handleSegmentationLayer"))
hlogger := logger.With(zap.String("hash", types.HexBytes(message.TransportLayer.Hash).String()))
var segmentMessage protobuf.SegmentMessage
err := proto.Unmarshal(message.TransportLayer.Payload, &segmentMessage)
if err != nil {
return errors.Wrap(err, "failed to unmarshal SegmentMessage")
}
hlogger.Debug("handling message segment", zap.String("EntireMessageHash", types.HexBytes(segmentMessage.EntireMessageHash).String()),
zap.Uint32("Index", segmentMessage.Index), zap.Uint32("SegmentsCount", segmentMessage.SegmentsCount))
alreadyCompleted, err := s.persistence.IsMessageAlreadyCompleted(segmentMessage.EntireMessageHash)
if err != nil {
return err
}
if alreadyCompleted {
return ErrMessageSegmentsAlreadyCompleted
}
if segmentMessage.SegmentsCount < 2 {
return ErrMessageSegmentsInvalidCount
}
err = s.persistence.SaveMessageSegment(&segmentMessage, message.TransportLayer.SigPubKey, time.Now().Unix())
if err != nil {
return err
}
segments, err := s.persistence.GetMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey)
if err != nil {
return err
}
if len(segments) != int(segmentMessage.SegmentsCount) {
return ErrMessageSegmentsIncomplete
}
// Combine payload
var entirePayload bytes.Buffer
for _, segment := range segments {
_, err := entirePayload.Write(segment.Payload)
if err != nil {
return errors.Wrap(err, "failed to write segment payload")
}
}
// Sanity check
entirePayloadHash := crypto.Keccak256(entirePayload.Bytes())
if !bytes.Equal(entirePayloadHash, segmentMessage.EntireMessageHash) {
return ErrMessageSegmentsHashMismatch
}
err = s.persistence.CompleteMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey, time.Now().Unix())
if err != nil {
return err
}
message.TransportLayer.Payload = entirePayload.Bytes()
return nil
}
func (s *MessageSender) CleanupSegments() error {
weekAgo := time.Now().AddDate(0, 0, -7).Unix()
monthAgo := time.Now().AddDate(0, -1, 0).Unix()
err := s.persistence.RemoveMessageSegmentsOlderThan(weekAgo)
if err != nil {
return err
}
err = s.persistence.RemoveMessageSegmentsCompletedOlderThan(monthAgo)
if err != nil {
return err
}
return nil
}