mirror of
https://github.com/status-im/status-go.git
synced 2025-01-22 20:54:23 +00:00
f75f7bb738
This commit expands the confirmation mechanism to allow private group chat messages to be confirmed: Changes: - Added a separate table for message confirmations as group chat messages have same messageID but multiple datasyncID - Removed DataSyncID from raw message (I haven't removed the column name as it can't be done in sqlite without copying over the table)
816 lines
26 KiB
Go
816 lines
26 KiB
Go
package common
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"database/sql"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/pkg/errors"
|
|
datasyncnode "github.com/vacp2p/mvds/node"
|
|
datasyncproto "github.com/vacp2p/mvds/protobuf"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/protocol/datasync"
|
|
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
|
|
"github.com/status-im/status-go/protocol/encryption"
|
|
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
"github.com/status-im/status-go/protocol/transport"
|
|
v1protocol "github.com/status-im/status-go/protocol/v1"
|
|
)
|
|
|
|
// Whisper message properties.
|
|
const (
|
|
whisperTTL = 15
|
|
whisperDefaultPoW = 0.002
|
|
// whisperLargeSizePoW is the PoWTarget for larger payload sizes
|
|
whisperLargeSizePoW = 0.000002
|
|
// largeSizeInBytes is when should we be using a lower POW.
|
|
// Roughly this is 50KB
|
|
largeSizeInBytes = 50000
|
|
whisperPoWTime = 5
|
|
)
|
|
|
|
// SentMessage reprent a message that has been passed to the transport layer
|
|
type SentMessage struct {
|
|
PublicKey *ecdsa.PublicKey
|
|
Spec *encryption.ProtocolMessageSpec
|
|
MessageIDs [][]byte
|
|
}
|
|
|
|
type MessageProcessor struct {
|
|
identity *ecdsa.PrivateKey
|
|
datasync *datasync.DataSync
|
|
protocol *encryption.Protocol
|
|
transport transport.Transport
|
|
logger *zap.Logger
|
|
persistence *RawMessagesPersistence
|
|
|
|
// ephemeralKeys is a map that contains the ephemeral keys of the client, used
|
|
// to decrypt messages
|
|
ephemeralKeys map[string]*ecdsa.PrivateKey
|
|
ephemeralKeysMutex sync.Mutex
|
|
|
|
// sentMessagesSubscriptions contains all the subscriptions for sent messages
|
|
sentMessagesSubscriptions []chan<- *SentMessage
|
|
// sentMessagesSubscriptions contains all the subscriptions for scheduled messages
|
|
scheduledMessagesSubscriptions []chan<- *RawMessage
|
|
|
|
featureFlags FeatureFlags
|
|
|
|
// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
|
|
handleSharedSecrets func([]*sharedsecret.Secret) error
|
|
}
|
|
|
|
func NewMessageProcessor(
|
|
identity *ecdsa.PrivateKey,
|
|
database *sql.DB,
|
|
enc *encryption.Protocol,
|
|
transport transport.Transport,
|
|
logger *zap.Logger,
|
|
features FeatureFlags,
|
|
) (*MessageProcessor, error) {
|
|
dataSyncTransport := datasync.NewNodeTransport()
|
|
dataSyncNode, err := datasyncnode.NewPersistentNode(
|
|
database,
|
|
dataSyncTransport,
|
|
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
|
|
datasyncnode.BATCH,
|
|
datasync.CalculateSendTime,
|
|
logger,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger)
|
|
|
|
p := &MessageProcessor{
|
|
identity: identity,
|
|
datasync: ds,
|
|
protocol: enc,
|
|
persistence: NewRawMessagesPersistence(database),
|
|
transport: transport,
|
|
logger: logger,
|
|
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
|
|
featureFlags: features,
|
|
}
|
|
|
|
// Initializing DataSync is required to encrypt and send messages.
|
|
// With DataSync enabled, messages are added to the DataSync
|
|
// but actual encrypt and send calls are postponed.
|
|
// sendDataSync is responsible for encrypting and sending postponed messages.
|
|
if features.Datasync {
|
|
// We set the max message size to 3/4 of the allowed message size, to leave
|
|
// room for encryption.
|
|
// Messages will be tried to send in any case, even if they exceed this
|
|
// value
|
|
ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger)
|
|
ds.Start(datasync.DatasyncTicker)
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (p *MessageProcessor) Stop() {
|
|
for _, c := range p.sentMessagesSubscriptions {
|
|
close(c)
|
|
}
|
|
p.sentMessagesSubscriptions = nil
|
|
p.datasync.Stop() // idempotent op
|
|
}
|
|
|
|
func (p *MessageProcessor) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
|
|
p.handleSharedSecrets = handler
|
|
}
|
|
|
|
// SendPrivate takes encoded data, encrypts it and sends through the wire.
|
|
func (p *MessageProcessor) SendPrivate(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage *RawMessage,
|
|
) ([]byte, error) {
|
|
p.logger.Debug(
|
|
"sending a private message",
|
|
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
|
|
zap.String("site", "SendPrivate"),
|
|
)
|
|
// Currently we don't support sending through datasync and setting custom waku fields,
|
|
// as the datasync interface is not rich enough to propagate that information, so we
|
|
// would have to add some complexity to handle this.
|
|
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryption || rawMessage.SendOnPersonalTopic) {
|
|
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
|
|
}
|
|
|
|
// Set sender identity if not specified
|
|
if rawMessage.Sender == nil {
|
|
rawMessage.Sender = p.identity
|
|
}
|
|
|
|
return p.sendPrivate(ctx, recipient, rawMessage)
|
|
}
|
|
|
|
// SendCommunityMessage takes encoded data, encrypts it and sends through the wire
|
|
// using the community topic and their key
|
|
func (p *MessageProcessor) SendCommunityMessage(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
p.logger.Debug(
|
|
"sending a community message",
|
|
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
|
|
zap.String("site", "SendPrivate"),
|
|
)
|
|
rawMessage.Sender = p.identity
|
|
|
|
return p.sendCommunity(ctx, recipient, &rawMessage)
|
|
}
|
|
|
|
// SendGroup takes encoded data, encrypts it and sends through the wire,
|
|
// always return the messageID
|
|
func (p *MessageProcessor) SendGroup(
|
|
ctx context.Context,
|
|
recipients []*ecdsa.PublicKey,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
p.logger.Debug(
|
|
"sending a private group message",
|
|
zap.String("site", "SendGroup"),
|
|
)
|
|
// Set sender if not specified
|
|
if rawMessage.Sender == nil {
|
|
rawMessage.Sender = p.identity
|
|
}
|
|
|
|
// Calculate messageID first and set on raw message
|
|
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
// Send to each recipients
|
|
for _, recipient := range recipients {
|
|
_, err = p.sendPrivate(ctx, recipient, &rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to send message")
|
|
}
|
|
}
|
|
return messageID, nil
|
|
}
|
|
|
|
// sendCommunity sends data to the recipient identifying with a given public key.
|
|
func (p *MessageProcessor) sendCommunity(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage *RawMessage,
|
|
) ([]byte, error) {
|
|
p.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
|
|
|
wrappedMessage, err := p.wrapMessageV1(rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
// Notify before dispatching, otherwise the dispatch subscription might happen
|
|
// earlier than the scheduled
|
|
p.notifyOnScheduledMessage(rawMessage)
|
|
|
|
messageIDs := [][]byte{messageID}
|
|
hash, newMessage, err := p.sendCommunityRawMessage(ctx, recipient, wrappedMessage, messageIDs)
|
|
if err != nil {
|
|
p.logger.Error("failed to send a community message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
// sendPrivate sends data to the recipient identifying with a given public key.
|
|
func (p *MessageProcessor) sendPrivate(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage *RawMessage,
|
|
) ([]byte, error) {
|
|
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
|
|
|
wrappedMessage, err := p.wrapMessageV1(rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
// Notify before dispatching, otherwise the dispatch subscription might happen
|
|
// earlier than the scheduled
|
|
p.notifyOnScheduledMessage(rawMessage)
|
|
|
|
if p.featureFlags.Datasync && rawMessage.ResendAutomatically {
|
|
// No need to call transport tracking.
|
|
// It is done in a data sync dispatch step.
|
|
datasyncID, err := p.addToDataSync(recipient, wrappedMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to send message with datasync")
|
|
}
|
|
// We don't need to receive confirmations from our own devices
|
|
if !IsPubKeyEqual(recipient, &p.identity.PublicKey) {
|
|
confirmation := &RawMessageConfirmation{
|
|
DataSyncID: datasyncID,
|
|
MessageID: messageID,
|
|
PublicKey: crypto.CompressPubkey(recipient),
|
|
}
|
|
|
|
err = p.persistence.InsertPendingConfirmation(confirmation)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
} else if rawMessage.SkipEncryption {
|
|
// When SkipEncryption is set we don't pass the message to the encryption layer
|
|
messageIDs := [][]byte{messageID}
|
|
hash, newMessage, err := p.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs)
|
|
if err != nil {
|
|
p.logger.Error("failed to send a private message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
|
|
|
} else {
|
|
messageSpec, err := p.protocol.BuildDirectMessage(rawMessage.Sender, recipient, wrappedMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encrypt message")
|
|
}
|
|
|
|
// The shared secret needs to be handle before we send a message
|
|
// otherwise the topic might not be set up before we receive a message
|
|
if p.handleSharedSecrets != nil {
|
|
err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
}
|
|
|
|
messageIDs := [][]byte{messageID}
|
|
hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
|
|
if err != nil {
|
|
p.logger.Error("failed to send a private message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
|
}
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
// sendPairInstallation sends data to the recipients, using DH
|
|
func (p *MessageProcessor) SendPairInstallation(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
|
|
|
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
|
|
messageSpec, err := p.protocol.BuildDHMessage(p.identity, recipient, wrappedMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encrypt message")
|
|
}
|
|
|
|
messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
|
|
messageIDs := [][]byte{messageID}
|
|
|
|
hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
func (p *MessageProcessor) encodeMembershipUpdate(
|
|
message v1protocol.MembershipUpdateMessage,
|
|
chatEntity ChatEntity,
|
|
) ([]byte, error) {
|
|
|
|
if chatEntity != nil {
|
|
chatEntityProtobuf := chatEntity.GetProtobuf()
|
|
switch chatEntityProtobuf := chatEntityProtobuf.(type) {
|
|
case *protobuf.ChatMessage:
|
|
message.Message = chatEntityProtobuf
|
|
case *protobuf.EmojiReaction:
|
|
message.EmojiReaction = chatEntityProtobuf
|
|
|
|
}
|
|
}
|
|
|
|
encodedMessage, err := v1protocol.EncodeMembershipUpdateMessage(message)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encode membership update message")
|
|
}
|
|
|
|
return encodedMessage, nil
|
|
}
|
|
|
|
// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
|
|
// All the events in a group are encoded and added to the payload
|
|
func (p *MessageProcessor) EncodeMembershipUpdate(
|
|
group *v1protocol.Group,
|
|
chatEntity ChatEntity,
|
|
) ([]byte, error) {
|
|
message := v1protocol.MembershipUpdateMessage{
|
|
ChatID: group.ChatID(),
|
|
Events: group.Events(),
|
|
}
|
|
|
|
return p.encodeMembershipUpdate(message, chatEntity)
|
|
}
|
|
|
|
// EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
|
|
// Only the events relevant to the sender are encoded
|
|
func (p *MessageProcessor) EncodeAbridgedMembershipUpdate(
|
|
group *v1protocol.Group,
|
|
chatEntity ChatEntity,
|
|
) ([]byte, error) {
|
|
message := v1protocol.MembershipUpdateMessage{
|
|
ChatID: group.ChatID(),
|
|
Events: group.AbridgedEvents(&p.identity.PublicKey),
|
|
}
|
|
return p.encodeMembershipUpdate(message, chatEntity)
|
|
}
|
|
|
|
// SendPublic takes encoded data, encrypts it and sends through the wire.
|
|
func (p *MessageProcessor) SendPublic(
|
|
ctx context.Context,
|
|
chatName string,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
// Set sender
|
|
if rawMessage.Sender == nil {
|
|
rawMessage.Sender = p.identity
|
|
}
|
|
|
|
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
|
|
var newMessage *types.NewMessage
|
|
|
|
messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage)
|
|
if err != nil {
|
|
p.logger.Error("failed to send a public message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
|
|
}
|
|
|
|
if !rawMessage.SkipEncryption {
|
|
newMessage, err = MessageSpecToWhisper(messageSpec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
newMessage = &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: wrappedMessage,
|
|
PowTarget: calculatePoW(wrappedMessage),
|
|
PowTime: whisperPoWTime,
|
|
}
|
|
}
|
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
// notify before dispatching
|
|
p.notifyOnScheduledMessage(&rawMessage)
|
|
|
|
hash, err := p.transport.SendPublic(ctx, newMessage, chatName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sentMessage := &SentMessage{
|
|
Spec: messageSpec,
|
|
MessageIDs: [][]byte{messageID},
|
|
}
|
|
|
|
p.notifyOnSentMessage(sentMessage)
|
|
|
|
p.transport.Track([][]byte{messageID}, hash, newMessage)
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
|
|
// returns cloned messages with replaced payloads
|
|
func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataSync) ([]*v1protocol.StatusMessage, [][]byte, error) {
|
|
var statusMessages []*v1protocol.StatusMessage
|
|
|
|
payloads, acks, err := datasync.UnwrapPayloadsAndAcks(
|
|
m.SigPubKey(),
|
|
m.DecryptedPayload,
|
|
)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
for _, payload := range payloads {
|
|
message, err := m.Clone()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
message.DecryptedPayload = payload
|
|
statusMessages = append(statusMessages, message)
|
|
}
|
|
return statusMessages, acks, nil
|
|
}
|
|
|
|
// HandleMessages expects a whisper message as input, and it will go through
|
|
// a series of transformations until the message is parsed into an application
|
|
// layer message, or in case of Raw methods, the processing stops at the layer
|
|
// before.
|
|
// It returns an error only if the processing of required steps failed.
|
|
func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, [][]byte, error) {
|
|
logger := p.logger.With(zap.String("site", "handleMessages"))
|
|
hlogger := logger.With(zap.ByteString("hash", shhMessage.Hash))
|
|
var statusMessage v1protocol.StatusMessage
|
|
var statusMessages []*v1protocol.StatusMessage
|
|
|
|
err := statusMessage.HandleTransport(shhMessage)
|
|
if err != nil {
|
|
hlogger.Error("failed to handle transport layer message", zap.Error(err))
|
|
return nil, nil, err
|
|
}
|
|
|
|
err = p.handleEncryptionLayer(context.Background(), &statusMessage)
|
|
if err != nil {
|
|
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
|
|
}
|
|
|
|
statusMessages, acks, err := unwrapDatasyncMessage(&statusMessage, p.datasync)
|
|
if err != nil {
|
|
hlogger.Debug("failed to handle datasync message", zap.Error(err))
|
|
//that wasn't a datasync message, so use the original payload
|
|
statusMessages = append(statusMessages, &statusMessage)
|
|
}
|
|
|
|
for _, statusMessage := range statusMessages {
|
|
err := statusMessage.HandleApplicationMetadata()
|
|
if err != nil {
|
|
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
|
|
}
|
|
|
|
if applicationLayer {
|
|
err = statusMessage.HandleApplication()
|
|
if err != nil {
|
|
hlogger.Error("failed to handle application layer message", zap.Error(err))
|
|
}
|
|
}
|
|
}
|
|
|
|
return statusMessages, acks, nil
|
|
}
|
|
|
|
// fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key
|
|
func (p *MessageProcessor) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
|
|
destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))
|
|
|
|
p.ephemeralKeysMutex.Lock()
|
|
decryptionKey, ok := p.ephemeralKeys[destinationID]
|
|
p.ephemeralKeysMutex.Unlock()
|
|
|
|
// the key is not there, fallback on identity
|
|
if !ok {
|
|
return p.identity, false
|
|
}
|
|
return decryptionKey, true
|
|
}
|
|
|
|
func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
|
|
logger := p.logger.With(zap.String("site", "handleEncryptionLayer"))
|
|
publicKey := message.SigPubKey()
|
|
|
|
// if it's an ephemeral key, we don't negotiate a topic
|
|
decryptionKey, skipNegotiation := p.fetchDecryptionKey(message.Dst)
|
|
|
|
err := message.HandleEncryption(decryptionKey, publicKey, p.protocol, skipNegotiation)
|
|
|
|
// if it's an ephemeral key, we don't have to handle a device not found error
|
|
if err == encryption.ErrDeviceNotFound && !skipNegotiation {
|
|
if err := p.handleErrDeviceNotFound(ctx, publicKey); err != nil {
|
|
logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
|
|
}
|
|
}
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to process an encrypted message")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
|
|
now := time.Now().Unix()
|
|
advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !advertise {
|
|
return nil
|
|
}
|
|
|
|
messageSpec, err := p.protocol.BuildBundleAdvertiseMessage(p.identity, publicKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
|
defer cancel()
|
|
// We don't pass an array of messageIDs as no action needs to be taken
|
|
// when sending a bundle
|
|
_, _, err = p.sendMessageSpec(ctx, publicKey, messageSpec, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
p.protocol.ConfirmBundleAdvertisement(publicKey, now)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
|
|
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
return wrappedMessage, nil
|
|
}
|
|
|
|
func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
|
|
groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey)
|
|
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
|
|
exist, err := p.datasync.IsPeerInGroup(groupID, peerID)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to check if peer is in group")
|
|
}
|
|
if !exist {
|
|
if err := p.datasync.AddPeer(groupID, peerID); err != nil {
|
|
return nil, errors.Wrap(err, "failed to add peer")
|
|
}
|
|
}
|
|
id, err := p.datasync.AppendMessage(groupID, message)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to append message to datasync")
|
|
}
|
|
|
|
return id[:], nil
|
|
}
|
|
|
|
// sendDataSync sends a message scheduled by the data sync layer.
|
|
// Data Sync layer calls this method "dispatch" function.
|
|
func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
|
|
// Calculate the messageIDs
|
|
messageIDs := make([][]byte, 0, len(payload.Messages))
|
|
for _, payload := range payload.Messages {
|
|
messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body))
|
|
}
|
|
|
|
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, encodedMessage)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to encrypt message")
|
|
}
|
|
|
|
// The shared secret needs to be handle before we send a message
|
|
// otherwise the topic might not be set up before we receive a message
|
|
if p.handleSharedSecrets != nil {
|
|
err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
}
|
|
|
|
hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
|
|
if err != nil {
|
|
p.logger.Error("failed to send a datasync message", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
|
|
|
return nil
|
|
}
|
|
|
|
// sendPrivateRawMessage sends a message not wrapped in an encryption layer
|
|
func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
|
|
newMessage := &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: payload,
|
|
PowTarget: calculatePoW(payload),
|
|
PowTime: whisperPoWTime,
|
|
}
|
|
var hash []byte
|
|
var err error
|
|
|
|
if rawMessage.SendOnPersonalTopic {
|
|
hash, err = p.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
|
|
} else {
|
|
hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
|
|
}
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return hash, newMessage, nil
|
|
}
|
|
|
|
// sendCommunityRawMessage sends a message not wrapped in an encryption layer
|
|
// to a community
|
|
func (p *MessageProcessor) sendCommunityRawMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
|
|
newMessage := &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: payload,
|
|
PowTarget: calculatePoW(payload),
|
|
PowTime: whisperPoWTime,
|
|
}
|
|
|
|
hash, err := p.transport.SendCommunityMessage(ctx, newMessage, publicKey)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
return hash, newMessage, nil
|
|
}
|
|
|
|
// sendMessageSpec analyses the spec properties and selects a proper transport method.
|
|
func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
|
|
newMessage, err := MessageSpecToWhisper(messageSpec)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
logger := p.logger.With(zap.String("site", "sendMessageSpec"))
|
|
|
|
var hash []byte
|
|
|
|
// process shared secret
|
|
if messageSpec.AgreedSecret {
|
|
logger.Debug("sending using shared secret")
|
|
hash, err = p.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
|
|
} else {
|
|
logger.Debug("sending partitioned topic")
|
|
hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
|
|
}
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
sentMessage := &SentMessage{
|
|
PublicKey: publicKey,
|
|
Spec: messageSpec,
|
|
MessageIDs: messageIDs,
|
|
}
|
|
|
|
p.notifyOnSentMessage(sentMessage)
|
|
|
|
return hash, newMessage, nil
|
|
}
|
|
|
|
// SubscribeToSentMessages returns a channel where we publish every time a message is sent
|
|
func (p *MessageProcessor) SubscribeToSentMessages() <-chan *SentMessage {
|
|
c := make(chan *SentMessage, 100)
|
|
p.sentMessagesSubscriptions = append(p.sentMessagesSubscriptions, c)
|
|
return c
|
|
}
|
|
|
|
func (p *MessageProcessor) notifyOnSentMessage(sentMessage *SentMessage) {
|
|
// Publish on channels, drop if buffer is full
|
|
for _, c := range p.sentMessagesSubscriptions {
|
|
select {
|
|
case c <- sentMessage:
|
|
default:
|
|
p.logger.Warn("sent messages subscription channel full, dropping message")
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
// SubscribeToScheduledMessages returns a channel where we publish every time a message is scheduled for sending
|
|
func (p *MessageProcessor) SubscribeToScheduledMessages() <-chan *RawMessage {
|
|
c := make(chan *RawMessage, 100)
|
|
p.scheduledMessagesSubscriptions = append(p.scheduledMessagesSubscriptions, c)
|
|
return c
|
|
}
|
|
|
|
func (p *MessageProcessor) notifyOnScheduledMessage(message *RawMessage) {
|
|
// Publish on channels, drop if buffer is full
|
|
for _, c := range p.scheduledMessagesSubscriptions {
|
|
select {
|
|
case c <- message:
|
|
default:
|
|
p.logger.Warn("scheduled messages subscription channel full, dropping message")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *MessageProcessor) JoinPublic(id string) (*transport.Filter, error) {
|
|
return p.transport.JoinPublic(id)
|
|
}
|
|
|
|
// AddEphemeralKey adds an ephemeral key that we will be listening to
|
|
// note that we never removed them from now, as waku/whisper does not
|
|
// recalculate topics on removal, so effectively there's no benefit.
|
|
// On restart they will be gone.
|
|
func (p *MessageProcessor) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
|
|
p.ephemeralKeysMutex.Lock()
|
|
p.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
|
|
p.ephemeralKeysMutex.Unlock()
|
|
return p.transport.LoadKeyFilters(privateKey)
|
|
}
|
|
|
|
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
|
|
var newMessage *types.NewMessage
|
|
|
|
payload, err := proto.Marshal(spec.Message)
|
|
if err != nil {
|
|
return newMessage, err
|
|
}
|
|
|
|
newMessage = &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: payload,
|
|
PowTarget: calculatePoW(payload),
|
|
PowTime: whisperPoWTime,
|
|
}
|
|
return newMessage, nil
|
|
}
|
|
|
|
// calculatePoW returns the PoWTarget to be used.
|
|
// We check the size and arbitrarily set it to a lower PoW if the packet is
|
|
// greater than 50KB. We do this as the defaultPoW is too high for clients to send
|
|
// large messages.
|
|
func calculatePoW(payload []byte) float64 {
|
|
if len(payload) > largeSizeInBytes {
|
|
return whisperLargeSizePoW
|
|
}
|
|
return whisperDefaultPoW
|
|
}
|