status-go/protocol/common/message_sender.go
Andrea Maria Piana 605fe40e32 Fix encryption metadata issues #4613
This commit fixes a few issues with communities encryption:

Key distribution was disconnected from the community description, this created a case where the key would arrive after the community description and that would result in the client thinking that it was kicked.
To overcome this, we added a message that signals the user that is kicked. Also, we distribute the key with the community description so that there's no more issues with timing.
This is a bit expensive for large communities, and it will require some further optimizations.

Key distribution is now also connected to the request to join response, so there are no timing issues.

Fixes an issue with key distribution (race condition) where the community would be modified before being compared, resulting in a comparison of two identical communities, which would result in no key being distributed. This commit only partially address the issue.
2024-02-07 10:25:41 +00:00

1421 lines
43 KiB
Go

package common
import (
"bytes"
"context"
"crypto/ecdsa"
"database/sql"
"math"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
datasyncnode "github.com/status-im/mvds/node"
datasyncproto "github.com/status-im/mvds/protobuf"
"github.com/status-im/mvds/state"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/datasync"
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
)
// Whisper message properties.
const (
whisperTTL = 15
whisperDefaultPoW = 0.002
// whisperLargeSizePoW is the PoWTarget for larger payload sizes
whisperLargeSizePoW = 0.000002
// largeSizeInBytes is when should we be using a lower POW.
// Roughly this is 50KB
largeSizeInBytes = 50000
whisperPoWTime = 5
)
// RekeyCompatibility indicates whether we should be sending
// keys in 1-to-1 messages as well as in the newer format
var RekeyCompatibility = true
// SentMessage reprent a message that has been passed to the transport layer
type SentMessage struct {
PublicKey *ecdsa.PublicKey
Spec *encryption.ProtocolMessageSpec
MessageIDs [][]byte
}
type MessageEventType uint32
const (
MessageScheduled = iota + 1
MessageSent
)
type MessageEvent struct {
Recipient *ecdsa.PublicKey
Type MessageEventType
SentMessage *SentMessage
RawMessage *RawMessage
}
type MessageSender struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
database *sql.DB
protocol *encryption.Protocol
transport *transport.Transport
logger *zap.Logger
persistence *RawMessagesPersistence
datasyncEnabled bool
// ephemeralKeys is a map that contains the ephemeral keys of the client, used
// to decrypt messages
ephemeralKeys map[string]*ecdsa.PrivateKey
ephemeralKeysMutex sync.Mutex
// messageEventsSubscriptions contains all the subscriptions for message events
messageEventsSubscriptions []chan<- *MessageEvent
featureFlags FeatureFlags
// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
handleSharedSecrets func([]*sharedsecret.Secret) error
}
func NewMessageSender(
identity *ecdsa.PrivateKey,
database *sql.DB,
enc *encryption.Protocol,
transport *transport.Transport,
logger *zap.Logger,
features FeatureFlags,
) (*MessageSender, error) {
p := &MessageSender{
identity: identity,
datasyncEnabled: features.Datasync,
protocol: enc,
database: database,
persistence: NewRawMessagesPersistence(database),
transport: transport,
logger: logger,
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
featureFlags: features,
}
return p, nil
}
func (s *MessageSender) Stop() {
for _, c := range s.messageEventsSubscriptions {
close(c)
}
s.messageEventsSubscriptions = nil
s.StopDatasync()
}
func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
s.handleSharedSecrets = handler
}
func (s *MessageSender) StartDatasync(handler func(peer state.PeerID, payload *datasyncproto.Payload) error) error {
if !s.datasyncEnabled {
return nil
}
dataSyncTransport := datasync.NewNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
s.database,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(s.identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
s.logger,
)
if err != nil {
return err
}
s.datasync = datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)
s.datasync.Init(handler, s.logger)
s.datasync.Start(datasync.DatasyncTicker)
return nil
}
// SendPrivate takes encoded data, encrypts it and sends through the wire.
func (s *MessageSender) SendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a private message",
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
zap.String("site", "SendPrivate"),
)
// Currently we don't support sending through datasync and setting custom waku fields,
// as the datasync interface is not rich enough to propagate that information, so we
// would have to add some complexity to handle this.
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
}
// Set sender identity if not specified
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
return s.sendPrivate(ctx, recipient, rawMessage)
}
// SendCommunityMessage takes encoded data, encrypts it and sends through the wire
// using the community topic and their key
func (s *MessageSender) SendCommunityMessage(
ctx context.Context,
rawMessage RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a community message",
zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
zap.String("site", "SendCommunityMessage"),
)
rawMessage.Sender = s.identity
return s.sendCommunity(ctx, &rawMessage)
}
// SendPubsubTopicKey sends the protected topic key for a community to a list of recipients
func (s *MessageSender) SendPubsubTopicKey(
ctx context.Context,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending the protected topic key for a community",
zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
zap.String("site", "SendPubsubTopicKey"),
)
rawMessage.Sender = s.identity
messageID, err := s.getMessageID(rawMessage)
if err != nil {
return nil, err
}
rawMessage.ID = types.EncodeHex(messageID)
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
s.notifyOnScheduledMessage(nil, rawMessage)
// Send to each recipients
for _, recipient := range rawMessage.Recipients {
_, err = s.sendPrivate(ctx, recipient, rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message")
}
}
return messageID, nil
}
// SendGroup takes encoded data, encrypts it and sends through the wire,
// always return the messageID
func (s *MessageSender) SendGroup(
ctx context.Context,
recipients []*ecdsa.PublicKey,
rawMessage RawMessage,
) ([]byte, error) {
s.logger.Debug(
"sending a private group message",
zap.String("site", "SendGroup"),
)
// Set sender if not specified
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
// Calculate messageID first and set on raw message
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
// We call it only once, and we nil the function after so it doesn't get called again
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
return nil, err
}
}
// Send to each recipients
for _, recipient := range recipients {
_, err = s.sendPrivate(ctx, recipient, &rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message")
}
}
return messageID, nil
}
func (s *MessageSender) getMessageID(rawMessage *RawMessage) (types.HexBytes, error) {
wrappedMessage, err := s.wrapMessageV1(rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
return messageID, nil
}
func ShouldCommunityMessageBeEncrypted(msgType protobuf.ApplicationMetadataMessage_Type) bool {
return msgType == protobuf.ApplicationMetadataMessage_CHAT_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_EDIT_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_DELETE_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_PIN_MESSAGE ||
msgType == protobuf.ApplicationMetadataMessage_EMOJI_REACTION
}
// sendCommunity sends a message that's to be sent in a community
// If it's a chat message, it will go to the respective topic derived by the
// chat id, if it's not a chat message, it will go to the community topic.
func (s *MessageSender) sendCommunity(
ctx context.Context,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(&rawMessage.Sender.PublicKey))))
// Set sender
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
messageID, err := s.getMessageID(rawMessage)
if err != nil {
return nil, err
}
rawMessage.ID = types.EncodeHex(messageID)
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
return nil, err
}
}
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
s.notifyOnScheduledMessage(nil, rawMessage)
var hashes [][]byte
var newMessages []*types.NewMessage
forceRekey := rawMessage.CommunityKeyExMsgType == KeyExMsgRekey
// Check if it's a key exchange message. In this case we send it
// to all the recipients
if rawMessage.CommunityKeyExMsgType != KeyExMsgNone {
// If rekeycompatibility is on, we always
// want to execute below, otherwise we execute
// only when we want to fill up old keys to a given user
if RekeyCompatibility || !forceRekey {
keyExMessageSpecs, err := s.protocol.GetKeyExMessageSpecs(rawMessage.HashRatchetGroupID, s.identity, rawMessage.Recipients, forceRekey)
if err != nil {
return nil, err
}
for i, spec := range keyExMessageSpecs {
recipient := rawMessage.Recipients[i]
_, _, err = s.sendMessageSpec(ctx, recipient, spec, [][]byte{messageID})
if err != nil {
return nil, err
}
}
}
}
wrappedMessage, err := s.wrapMessageV1(rawMessage)
if err != nil {
return nil, err
}
// If it's a chat message, we send it on the community chat topic
if ShouldCommunityMessageBeEncrypted(rawMessage.MessageType) {
messageSpec, err := s.protocol.BuildHashRatchetMessage(rawMessage.HashRatchetGroupID, wrappedMessage)
if err != nil {
return nil, err
}
payload, err := proto.Marshal(messageSpec.Message)
if err != nil {
return nil, errors.Wrap(err, "failed to marshal")
}
hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload, forceRekey)
if err != nil {
return nil, err
}
sentMessage := &SentMessage{
Spec: messageSpec,
MessageIDs: [][]byte{messageID},
}
s.notifyOnSentMessage(sentMessage)
} else {
pubkey, err := crypto.DecompressPubkey(rawMessage.CommunityID)
if err != nil {
return nil, errors.Wrap(err, "failed to decompress pubkey")
}
hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, wrappedMessage, rawMessage.PubsubTopic, forceRekey, rawMessage)
if err != nil {
s.logger.Error("failed to send a community message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
}
s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
return messageID, nil
}
// sendPrivate sends data to the recipient identifying with a given public key.
func (s *MessageSender) sendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
) ([]byte, error) {
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
var wrappedMessage []byte
var err error
if rawMessage.SkipApplicationWrap {
wrappedMessage = rawMessage.Payload
} else {
wrappedMessage, err = s.wrapMessageV1(rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
}
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
return nil, err
}
}
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
s.notifyOnScheduledMessage(recipient, rawMessage)
if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendAutomatically {
// No need to call transport tracking.
// It is done in a data sync dispatch step.
datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
// We don't need to receive confirmations from our own devices
if !IsPubKeyEqual(recipient, &s.identity.PublicKey) {
confirmation := &RawMessageConfirmation{
DataSyncID: datasyncID,
MessageID: messageID,
PublicKey: crypto.CompressPubkey(recipient),
}
err = s.persistence.InsertPendingConfirmation(confirmation)
if err != nil {
return nil, err
}
}
} else if rawMessage.SkipEncryptionLayer {
messageBytes := wrappedMessage
if rawMessage.CommunityKeyExMsgType == KeyExMsgReuse {
groupID := rawMessage.HashRatchetGroupID
ratchets, err := s.protocol.GetKeysForGroup(groupID)
if err != nil {
return nil, err
}
message, err := s.protocol.BuildHashRatchetKeyExchangeMessageWithPayload(s.identity, recipient, groupID, ratchets, wrappedMessage)
if err != nil {
return nil, err
}
messageBytes, err = proto.Marshal(message.Message)
if err != nil {
return nil, err
}
}
// When SkipProtocolLayer is set we don't pass the message to the encryption layer
hashes, newMessages, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, messageBytes)
if err != nil {
s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
} else {
messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
if err != nil {
s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
}
return messageID, nil
}
// sendPairInstallation sends data to the recipients, using DH
func (s *MessageSender) SendPairInstallation(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage RawMessage,
) ([]byte, error) {
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage)
hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
if err != nil {
return nil, errors.Wrap(err, "failed to send a message spec")
}
s.transport.Track(messageID, hashes, newMessages)
return messageID, nil
}
func (s *MessageSender) encodeMembershipUpdate(
message v1protocol.MembershipUpdateMessage,
chatEntity ChatEntity,
) ([]byte, error) {
if chatEntity != nil {
chatEntityProtobuf := chatEntity.GetProtobuf()
switch chatEntityProtobuf := chatEntityProtobuf.(type) {
case *protobuf.ChatMessage:
message.Message = chatEntityProtobuf
case *protobuf.EmojiReaction:
message.EmojiReaction = chatEntityProtobuf
}
}
encodedMessage, err := v1protocol.EncodeMembershipUpdateMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode membership update message")
}
return encodedMessage, nil
}
// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// All the events in a group are encoded and added to the payload
func (s *MessageSender) EncodeMembershipUpdate(
group *v1protocol.Group,
chatEntity ChatEntity,
) ([]byte, error) {
message := v1protocol.MembershipUpdateMessage{
ChatID: group.ChatID(),
Events: group.Events(),
}
return s.encodeMembershipUpdate(message, chatEntity)
}
// EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// Only the events relevant to the current group are encoded
func (s *MessageSender) EncodeAbridgedMembershipUpdate(
group *v1protocol.Group,
chatEntity ChatEntity,
) ([]byte, error) {
message := v1protocol.MembershipUpdateMessage{
ChatID: group.ChatID(),
Events: group.AbridgedEvents(),
}
return s.encodeMembershipUpdate(message, chatEntity)
}
func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte, rekey bool) ([][]byte, []*types.NewMessage, error) {
payload := wrappedMessage
var err error
if rekey && len(rawMessage.HashRatchetGroupID) != 0 {
var ratchet *encryption.HashRatchetKeyCompatibility
// We have just rekeyed, pull the latest
if RekeyCompatibility {
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
if err != nil {
return nil, nil, err
}
}
// We send the message over the community topic
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
if err != nil {
return nil, nil, err
}
payload, err = proto.Marshal(spec.Message)
if err != nil {
return nil, nil, err
}
}
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
}
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
return nil, nil, err
}
}
// notify before dispatching
s.notifyOnScheduledMessage(nil, rawMessage)
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID)
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
}
// SendPublic takes encoded data, encrypts it and sends through the wire.
func (s *MessageSender) SendPublic(
ctx context.Context,
chatName string,
rawMessage RawMessage,
) ([]byte, error) {
// Set sender
if rawMessage.Sender == nil {
rawMessage.Sender = s.identity
}
var wrappedMessage []byte
var err error
if rawMessage.SkipApplicationWrap {
wrappedMessage = rawMessage.Payload
} else {
wrappedMessage, err = s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
}
var newMessage *types.NewMessage
messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage)
if err != nil {
s.logger.Error("failed to send a public message", zap.Error(err))
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
}
if len(rawMessage.HashRatchetGroupID) != 0 {
var ratchet *encryption.HashRatchetKeyCompatibility
var err error
// We have just rekeyed, pull the latest
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
if err != nil {
return nil, err
}
keyID, err := ratchet.GetKeyID()
if err != nil {
return nil, err
}
s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
// We send the message over the community topic
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
if err != nil {
return nil, err
}
newMessage, err = MessageSpecToWhisper(spec)
if err != nil {
return nil, err
}
} else if !rawMessage.SkipEncryptionLayer {
newMessage, err = MessageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
} else {
newMessage = &types.NewMessage{
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: calculatePoW(wrappedMessage),
PowTime: whisperPoWTime,
}
}
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
rawMessage.ID = types.EncodeHex(messageID)
if rawMessage.BeforeDispatch != nil {
if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
return nil, err
}
}
// notify before dispatching
s.notifyOnScheduledMessage(nil, &rawMessage)
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, err
}
hashes := make([][]byte, 0, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendPublic(ctx, newMessage, chatName)
if err != nil {
return nil, err
}
hashes = append(hashes, hash)
}
sentMessage := &SentMessage{
Spec: messageSpec,
MessageIDs: [][]byte{messageID},
}
s.notifyOnSentMessage(sentMessage)
s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
s.transport.Track(messageID, hashes, newMessages)
return messageID, nil
}
// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
// returns cloned messages with replaced payloads
func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, response *handleMessageResponse) error {
datasyncMessage, err := s.datasync.Unwrap(
m.SigPubKey(),
m.EncryptionLayer.Payload,
)
if err != nil {
return err
}
response.DatasyncSender = m.SigPubKey()
response.DatasyncAcks = append(response.DatasyncAcks, datasyncMessage.Acks...)
response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...)
for _, o := range datasyncMessage.GroupOffers {
for _, mID := range o.MessageIds {
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{GroupID: o.GroupId, MessageID: mID})
}
}
for _, ds := range datasyncMessage.Messages {
message, err := m.Clone()
if err != nil {
return err
}
message.EncryptionLayer.Payload = ds.Body
response.DatasyncMessages = append(response.DatasyncMessages, message)
}
return nil
}
// HandleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before.
// It returns an error only if the processing of required steps failed.
func (s *MessageSender) HandleMessages(wakuMessage *types.Message) (*HandleMessageResponse, error) {
logger := s.logger.With(zap.String("site", "HandleMessages"))
hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String()))
response, err := s.handleMessage(wakuMessage)
if err != nil {
// Hash ratchet with a group id not found yet, save the message for future processing
if err == encryption.ErrHashRatchetGroupIDNotFound && len(response.Message.EncryptionLayer.HashRatchetInfo) == 1 {
info := response.Message.EncryptionLayer.HashRatchetInfo[0]
return nil, s.persistence.SaveHashRatchetMessage(info.GroupID, info.KeyID, wakuMessage)
}
// The current message segment has been successfully retrieved.
// However, the collection of segments is not yet complete.
if err == ErrMessageSegmentsIncomplete {
return nil, nil
}
return nil, err
}
// Process queued hash ratchet messages
for _, hashRatchetInfo := range response.Message.EncryptionLayer.HashRatchetInfo {
messages, err := s.persistence.GetHashRatchetMessages(hashRatchetInfo.KeyID)
if err != nil {
return nil, err
}
var processedIds [][]byte
for _, message := range messages {
hlogger.Info("handling out of order encrypted messages", zap.String("hash", types.Bytes2Hex(message.Hash)))
r, err := s.handleMessage(message)
if err != nil {
hlogger.Debug("failed to handle hash ratchet message", zap.Error(err))
continue
}
response.DatasyncMessages = append(response.toPublicResponse().StatusMessages, r.Messages()...)
response.DatasyncAcks = append(response.DatasyncAcks, r.DatasyncAcks...)
processedIds = append(processedIds, message.Hash)
}
err = s.persistence.DeleteHashRatchetMessages(processedIds)
if err != nil {
s.logger.Warn("failed to delete hash ratchet messages", zap.Error(err))
return nil, err
}
}
return response.toPublicResponse(), nil
}
type DatasyncOffer struct {
GroupID []byte
MessageID []byte
}
type HandleMessageResponse struct {
Hash []byte
StatusMessages []*v1protocol.StatusMessage
DatasyncSender *ecdsa.PublicKey
DatasyncAcks [][]byte
DatasyncOffers []DatasyncOffer
DatasyncRequests [][]byte
}
func (h *handleMessageResponse) toPublicResponse() *HandleMessageResponse {
return &HandleMessageResponse{
Hash: h.Hash,
StatusMessages: h.Messages(),
DatasyncSender: h.DatasyncSender,
DatasyncAcks: h.DatasyncAcks,
DatasyncOffers: h.DatasyncOffers,
DatasyncRequests: h.DatasyncRequests,
}
}
type handleMessageResponse struct {
Hash []byte
Message *v1protocol.StatusMessage
DatasyncMessages []*v1protocol.StatusMessage
DatasyncSender *ecdsa.PublicKey
DatasyncAcks [][]byte
DatasyncOffers []DatasyncOffer
DatasyncRequests [][]byte
}
func (h *handleMessageResponse) Messages() []*v1protocol.StatusMessage {
if len(h.DatasyncMessages) > 0 {
return h.DatasyncMessages
}
return []*v1protocol.StatusMessage{h.Message}
}
func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessageResponse, error) {
logger := s.logger.With(zap.String("site", "handleMessage"))
hlogger := logger.With(zap.String("hash", types.EncodeHex(wakuMessage.Hash)))
message := &v1protocol.StatusMessage{}
response := &handleMessageResponse{
Hash: wakuMessage.Hash,
Message: message,
DatasyncMessages: []*v1protocol.StatusMessage{},
DatasyncAcks: [][]byte{},
}
err := message.HandleTransportLayer(wakuMessage)
if err != nil {
hlogger.Error("failed to handle transport layer message", zap.Error(err))
return nil, err
}
err = s.handleSegmentationLayer(message)
if err != nil {
hlogger.Debug("failed to handle segmentation layer message", zap.Error(err))
// Segments not completed yet, stop processing
if err == ErrMessageSegmentsIncomplete {
return nil, err
}
// Segments already completed, stop processing
if err == ErrMessageSegmentsAlreadyCompleted {
return nil, err
}
}
err = s.handleEncryptionLayer(context.Background(), message)
if err != nil {
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
// Hash ratchet with a group id not found yet, stop processing
if err == encryption.ErrHashRatchetGroupIDNotFound {
return response, err
}
}
if s.datasync != nil && s.datasyncEnabled {
err := s.unwrapDatasyncMessage(message, response)
if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err))
}
}
for _, msg := range response.Messages() {
err := msg.HandleApplicationLayer()
if err != nil {
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
}
}
return response, nil
}
// fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key
func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))
s.ephemeralKeysMutex.Lock()
decryptionKey, ok := s.ephemeralKeys[destinationID]
s.ephemeralKeysMutex.Unlock()
// the key is not there, fallback on identity
if !ok {
return s.identity, false
}
return decryptionKey, true
}
func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
logger := s.logger.With(zap.String("site", "handleEncryptionLayer"))
publicKey := message.SigPubKey()
// if it's an ephemeral key, we don't negotiate a topic
decryptionKey, skipNegotiation := s.fetchDecryptionKey(message.TransportLayer.Dst)
err := message.HandleEncryptionLayer(decryptionKey, publicKey, s.protocol, skipNegotiation)
// if it's an ephemeral key, we don't have to handle a device not found error
if err == encryption.ErrDeviceNotFound && !skipNegotiation {
if err := s.handleErrDeviceNotFound(ctx, publicKey); err != nil {
logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
}
}
if err != nil {
logger.Error("failed to handle an encrypted message", zap.Error(err))
return err
}
return nil
}
func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
return err
}
if !advertise {
return nil
}
messageSpec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
// We don't pass an array of messageIDs as no action needs to be taken
// when sending a bundle
_, _, err = s.sendMessageSpec(ctx, publicKey, messageSpec, nil)
if err != nil {
return err
}
s.protocol.ConfirmBundleAdvertisement(publicKey, now)
return nil
}
func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
return wrappedMessage, nil
}
func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := s.datasync.IsPeerInGroup(groupID, peerID)
if err != nil {
return nil, errors.Wrap(err, "failed to check if peer is in group")
}
if !exist {
if err := s.datasync.AddPeer(groupID, peerID); err != nil {
return nil, errors.Wrap(err, "failed to add peer")
}
}
id, err := s.datasync.AppendMessage(groupID, message)
if err != nil {
return nil, errors.Wrap(err, "failed to append message to datasync")
}
return id[:], nil
}
// sendPrivateRawMessage sends a message not wrapped in an encryption layer
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte) ([][]byte, []*types.NewMessage, error) {
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic,
}
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
var hash []byte
for _, newMessage := range newMessages {
if rawMessage.SendOnPersonalTopic {
hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
} else {
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
}
// sendCommunityMessage sends a message not wrapped in an encryption layer
// to a community
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, wrappedMessage []byte, pubsubTopic string, rekey bool, rawMessage *RawMessage) ([][]byte, []*types.NewMessage, error) {
payload := wrappedMessage
if rekey && len(rawMessage.HashRatchetGroupID) != 0 {
var ratchet *encryption.HashRatchetKeyCompatibility
var err error
// We have just rekeyed, pull the latest
if RekeyCompatibility {
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
if err != nil {
return nil, nil, err
}
}
keyID, err := ratchet.GetKeyID()
if err != nil {
return nil, nil, err
}
s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
// We send the message over the community topic
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
if err != nil {
return nil, nil, err
}
payload, err = proto.Marshal(spec.Message)
if err != nil {
return nil, nil, err
}
}
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
PubsubTopic: pubsubTopic,
}
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
}
func (s *MessageSender) SendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
return s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
}
// sendMessageSpec analyses the spec properties and selects a proper transport method.
func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
logger := s.logger.With(zap.String("site", "sendMessageSpec"))
newMessage, err := MessageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
newMessages, err := s.segmentMessage(newMessage)
if err != nil {
return nil, nil, err
}
hashes := make([][]byte, 0, len(newMessages))
var hash []byte
for _, newMessage := range newMessages {
// The shared secret needs to be handle before we send a message
// otherwise the topic might not be set up before we receive a message
if messageSpec.SharedSecret != nil && s.handleSharedSecrets != nil {
err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
if err != nil {
return nil, nil, err
}
}
// process shared secret
if messageSpec.AgreedSecret {
logger.Debug("sending using shared secret")
hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
} else {
logger.Debug("sending partitioned topic")
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
sentMessage := &SentMessage{
PublicKey: publicKey,
Spec: messageSpec,
MessageIDs: messageIDs,
}
s.notifyOnSentMessage(sentMessage)
return hashes, newMessages, nil
}
func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent {
c := make(chan *MessageEvent, 100)
s.messageEventsSubscriptions = append(s.messageEventsSubscriptions, c)
return c
}
func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) {
event := &MessageEvent{
Type: MessageSent,
SentMessage: sentMessage,
}
// Publish on channels, drop if buffer is full
for _, c := range s.messageEventsSubscriptions {
select {
case c <- event:
default:
s.logger.Warn("message events subscription channel full when publishing sent event, dropping message")
}
}
}
func (s *MessageSender) notifyOnScheduledMessage(recipient *ecdsa.PublicKey, message *RawMessage) {
event := &MessageEvent{
Recipient: recipient,
Type: MessageScheduled,
RawMessage: message,
}
// Publish on channels, drop if buffer is full
for _, c := range s.messageEventsSubscriptions {
select {
case c <- event:
default:
s.logger.Warn("message events subscription channel full when publishing scheduled event, dropping message")
}
}
}
func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) {
return s.transport.JoinPublic(id)
}
// AddEphemeralKey adds an ephemeral key that we will be listening to
// note that we never removed them from now, as waku/whisper does not
// recalculate topics on removal, so effectively there's no benefit.
// On restart they will be gone.
func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
s.ephemeralKeysMutex.Lock()
s.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
s.ephemeralKeysMutex.Unlock()
return s.transport.LoadKeyFilters(privateKey)
}
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
var newMessage *types.NewMessage
payload, err := proto.Marshal(spec.Message)
if err != nil {
return newMessage, err
}
newMessage = &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: calculatePoW(payload),
PowTime: whisperPoWTime,
}
return newMessage, nil
}
// calculatePoW returns the PoWTarget to be used.
// We check the size and arbitrarily set it to a lower PoW if the packet is
// greater than 50KB. We do this as the defaultPoW is too high for clients to send
// large messages.
func calculatePoW(payload []byte) float64 {
if len(payload) > largeSizeInBytes {
return whisperLargeSizePoW
}
return whisperDefaultPoW
}
func (s *MessageSender) StopDatasync() {
if s.datasync != nil {
s.datasync.Stop()
}
}
// GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group
func (s *MessageSender) GetCurrentKeyForGroup(groupID []byte) (*encryption.HashRatchetKeyCompatibility, error) {
return s.protocol.GetCurrentKeyForGroup(groupID)
}
// GetKeyIDsForGroup returns a slice of key IDs belonging to a given group ID
func (s *MessageSender) GetKeysForGroup(groupID []byte) ([]*encryption.HashRatchetKeyCompatibility, error) {
return s.protocol.GetKeysForGroup(groupID)
}
// Segments message into smaller chunks if the size exceeds the maximum allowed
func segmentMessage(newMessage *types.NewMessage, maxSegmentSize int) ([]*types.NewMessage, error) {
if len(newMessage.Payload) <= maxSegmentSize {
return []*types.NewMessage{newMessage}, nil
}
createSegment := func(chunkPayload []byte) (*types.NewMessage, error) {
copy := &types.NewMessage{}
err := copier.Copy(copy, newMessage)
if err != nil {
return nil, err
}
copy.Payload = chunkPayload
copy.PowTarget = calculatePoW(chunkPayload)
return copy, nil
}
entireMessageHash := crypto.Keccak256(newMessage.Payload)
payloadSize := len(newMessage.Payload)
segmentsCount := int(math.Ceil(float64(payloadSize) / float64(maxSegmentSize)))
var segmentMessages []*types.NewMessage
for start, index := 0, 0; start < payloadSize; start += maxSegmentSize {
end := start + maxSegmentSize
if end > payloadSize {
end = payloadSize
}
chunk := newMessage.Payload[start:end]
segmentMessageProto := &protobuf.SegmentMessage{
EntireMessageHash: entireMessageHash,
Index: uint32(index),
SegmentsCount: uint32(segmentsCount),
Payload: chunk,
}
chunkPayload, err := proto.Marshal(segmentMessageProto)
if err != nil {
return nil, err
}
segmentMessage, err := createSegment(chunkPayload)
if err != nil {
return nil, err
}
segmentMessages = append(segmentMessages, segmentMessage)
index++
}
return segmentMessages, nil
}
func (s *MessageSender) segmentMessage(newMessage *types.NewMessage) ([]*types.NewMessage, error) {
// We set the max message size to 3/4 of the allowed message size, to leave
// room for segment message metadata.
newMessages, err := segmentMessage(newMessage, int(s.transport.MaxMessageSize()/4*3))
s.logger.Debug("message segmented", zap.Int("segments", len(newMessages)))
return newMessages, err
}
var ErrMessageSegmentsIncomplete = errors.New("message segments incomplete")
var ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed")
var ErrMessageSegmentsInvalidCount = errors.New("invalid segments count")
var ErrMessageSegmentsHashMismatch = errors.New("hash of entire payload does not match")
func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error {
logger := s.logger.With(zap.String("site", "handleSegmentationLayer"))
hlogger := logger.With(zap.String("hash", types.HexBytes(message.TransportLayer.Hash).String()))
var segmentMessage protobuf.SegmentMessage
err := proto.Unmarshal(message.TransportLayer.Payload, &segmentMessage)
if err != nil {
return errors.Wrap(err, "failed to unmarshal SegmentMessage")
}
hlogger.Debug("handling message segment", zap.String("EntireMessageHash", types.HexBytes(segmentMessage.EntireMessageHash).String()),
zap.Uint32("Index", segmentMessage.Index), zap.Uint32("SegmentsCount", segmentMessage.SegmentsCount))
alreadyCompleted, err := s.persistence.IsMessageAlreadyCompleted(segmentMessage.EntireMessageHash)
if err != nil {
return err
}
if alreadyCompleted {
return ErrMessageSegmentsAlreadyCompleted
}
if segmentMessage.SegmentsCount < 2 {
return ErrMessageSegmentsInvalidCount
}
err = s.persistence.SaveMessageSegment(&segmentMessage, message.TransportLayer.SigPubKey, time.Now().Unix())
if err != nil {
return err
}
segments, err := s.persistence.GetMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey)
if err != nil {
return err
}
if len(segments) != int(segmentMessage.SegmentsCount) {
return ErrMessageSegmentsIncomplete
}
// Combine payload
var entirePayload bytes.Buffer
for _, segment := range segments {
_, err := entirePayload.Write(segment.Payload)
if err != nil {
return errors.Wrap(err, "failed to write segment payload")
}
}
// Sanity check
entirePayloadHash := crypto.Keccak256(entirePayload.Bytes())
if !bytes.Equal(entirePayloadHash, segmentMessage.EntireMessageHash) {
return ErrMessageSegmentsHashMismatch
}
err = s.persistence.CompleteMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey, time.Now().Unix())
if err != nil {
return err
}
message.TransportLayer.Payload = entirePayload.Bytes()
return nil
}
func (s *MessageSender) CleanupSegments() error {
weekAgo := time.Now().AddDate(0, 0, -7).Unix()
monthAgo := time.Now().AddDate(0, -1, 0).Unix()
err := s.persistence.RemoveMessageSegmentsOlderThan(weekAgo)
if err != nil {
return err
}
err = s.persistence.RemoveMessageSegmentsCompletedOlderThan(monthAgo)
if err != nil {
return err
}
return nil
}