mirror of
https://github.com/status-im/status-go.git
synced 2025-02-16 08:50:09 +00:00
Use ephemeral keys for sending messsages
This commit is contained in:
parent
2be8dff54a
commit
63af6aa79b
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"database/sql"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
@ -42,11 +43,13 @@ type SentMessage struct {
|
||||
}
|
||||
|
||||
type MessageProcessor struct {
|
||||
identity *ecdsa.PrivateKey
|
||||
datasync *datasync.DataSync
|
||||
protocol *encryption.Protocol
|
||||
transport transport.Transport
|
||||
logger *zap.Logger
|
||||
identity *ecdsa.PrivateKey
|
||||
datasync *datasync.DataSync
|
||||
protocol *encryption.Protocol
|
||||
transport transport.Transport
|
||||
logger *zap.Logger
|
||||
ephemeralKeys map[string]*ecdsa.PrivateKey
|
||||
mutex sync.Mutex
|
||||
|
||||
subscriptions []chan<- *SentMessage
|
||||
|
||||
@ -76,12 +79,13 @@ func NewMessageProcessor(
|
||||
ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger)
|
||||
|
||||
p := &MessageProcessor{
|
||||
identity: identity,
|
||||
datasync: ds,
|
||||
protocol: enc,
|
||||
transport: transport,
|
||||
logger: logger,
|
||||
featureFlags: features,
|
||||
identity: identity,
|
||||
datasync: ds,
|
||||
protocol: enc,
|
||||
transport: transport,
|
||||
logger: logger,
|
||||
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
|
||||
featureFlags: features,
|
||||
}
|
||||
|
||||
// Initializing DataSync is required to encrypt and send messages.
|
||||
@ -111,7 +115,7 @@ func (p *MessageProcessor) SendPrivate(
|
||||
) ([]byte, error) {
|
||||
p.logger.Debug(
|
||||
"sending a private message",
|
||||
zap.Binary("public-key", crypto.FromECDSAPub(recipient)),
|
||||
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
|
||||
zap.String("site", "SendPrivate"),
|
||||
)
|
||||
return p.sendPrivate(ctx, recipient, rawMessage)
|
||||
@ -128,12 +132,15 @@ func (p *MessageProcessor) SendGroup(
|
||||
"sending a private group message",
|
||||
zap.String("site", "SendGroup"),
|
||||
)
|
||||
if rawMessage.Sender == nil {
|
||||
rawMessage.Sender = p.identity
|
||||
}
|
||||
// Calculate messageID first
|
||||
wrappedMessage, err := p.wrapMessageV1(rawMessage)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to wrap message")
|
||||
}
|
||||
messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
|
||||
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
||||
|
||||
// Send to each recipients
|
||||
for _, recipient := range recipients {
|
||||
@ -151,24 +158,42 @@ func (p *MessageProcessor) sendPrivate(
|
||||
recipient *ecdsa.PublicKey,
|
||||
rawMessage *RawMessage,
|
||||
) ([]byte, error) {
|
||||
p.logger.Debug("sending private message", zap.Binary("recipient", crypto.FromECDSAPub(recipient)))
|
||||
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
||||
|
||||
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipNegotiation) {
|
||||
return nil, errors.New("setting identity, skip-negotiation and datasync not supported")
|
||||
}
|
||||
|
||||
// If we use our own key we don't skip negotiation
|
||||
if rawMessage.Sender == nil {
|
||||
rawMessage.Sender = p.identity
|
||||
}
|
||||
|
||||
wrappedMessage, err := p.wrapMessageV1(rawMessage)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to wrap message")
|
||||
}
|
||||
|
||||
messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
|
||||
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
||||
|
||||
if p.featureFlags.Datasync && rawMessage.ResendAutomatically {
|
||||
// No need to call transport tracking.
|
||||
// It is done in a data sync dispatch step.
|
||||
if err := p.addToDataSync(recipient, wrappedMessage); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to send message with datasync")
|
||||
}
|
||||
|
||||
// No need to call transport tracking.
|
||||
// It is done in a data sync dispatch step.
|
||||
} else if rawMessage.SkipNegotiation {
|
||||
messageIDs := [][]byte{messageID}
|
||||
hash, newMessage, err := p.sendRawMessage(ctx, recipient, wrappedMessage, messageIDs)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to send a message spec")
|
||||
}
|
||||
|
||||
p.transport.Track(messageIDs, hash, newMessage)
|
||||
|
||||
} else {
|
||||
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, recipient, wrappedMessage)
|
||||
messageSpec, err := p.protocol.BuildDirectMessage(rawMessage.Sender, recipient, wrappedMessage)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to encrypt message")
|
||||
}
|
||||
@ -191,7 +216,7 @@ func (p *MessageProcessor) SendPairInstallation(
|
||||
recipient *ecdsa.PublicKey,
|
||||
rawMessage *RawMessage,
|
||||
) ([]byte, error) {
|
||||
p.logger.Debug("sending private message", zap.Binary("recipient", crypto.FromECDSAPub(recipient)))
|
||||
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
||||
|
||||
wrappedMessage, err := p.wrapMessageV1(rawMessage)
|
||||
if err != nil {
|
||||
@ -243,6 +268,9 @@ func (p *MessageProcessor) SendPublic(
|
||||
rawMessage *RawMessage,
|
||||
) ([]byte, error) {
|
||||
var newMessage *types.NewMessage
|
||||
if rawMessage.Sender == nil {
|
||||
rawMessage.Sender = p.identity
|
||||
}
|
||||
|
||||
wrappedMessage, err := p.wrapMessageV1(rawMessage)
|
||||
if err != nil {
|
||||
@ -261,7 +289,7 @@ func (p *MessageProcessor) SendPublic(
|
||||
return nil, err
|
||||
}
|
||||
|
||||
messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
|
||||
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
||||
|
||||
p.transport.Track([][]byte{messageID}, hash, newMessage)
|
||||
|
||||
@ -314,8 +342,20 @@ func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, application
|
||||
func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
|
||||
logger := p.logger.With(zap.String("site", "handleEncryptionLayer"))
|
||||
publicKey := message.SigPubKey()
|
||||
destination := message.Dst
|
||||
|
||||
err := message.HandleEncryption(p.identity, publicKey, p.protocol)
|
||||
destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))
|
||||
p.mutex.Lock()
|
||||
decryptionKey, ok := p.ephemeralKeys[destinationID]
|
||||
p.mutex.Unlock()
|
||||
logger.Info("destination id", zap.String("desti", destinationID))
|
||||
skipNegotiation := true
|
||||
if !ok {
|
||||
skipNegotiation = false
|
||||
decryptionKey = p.identity
|
||||
}
|
||||
|
||||
err := message.HandleEncryption(decryptionKey, publicKey, p.protocol, skipNegotiation)
|
||||
if err == encryption.ErrDeviceNotFound {
|
||||
if err := p.handleErrDeviceNotFound(ctx, publicKey); err != nil {
|
||||
logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
|
||||
@ -358,7 +398,7 @@ func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKe
|
||||
}
|
||||
|
||||
func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
|
||||
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, p.identity)
|
||||
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to wrap message")
|
||||
}
|
||||
@ -409,6 +449,24 @@ func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.Pu
|
||||
return nil
|
||||
}
|
||||
|
||||
// sendRawMessage sends a message not wrapped in an encryption layer
|
||||
func (p *MessageProcessor) sendRawMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
|
||||
newMessage := &types.NewMessage{
|
||||
TTL: whisperTTL,
|
||||
Payload: payload,
|
||||
PowTarget: calculatePoW(payload),
|
||||
PowTime: whisperPoWTime,
|
||||
}
|
||||
|
||||
hash, err := p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return hash, newMessage, nil
|
||||
|
||||
}
|
||||
|
||||
// sendMessageSpec analyses the spec properties and selects a proper transport method.
|
||||
func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
|
||||
newMessage, err := MessageSpecToWhisper(messageSpec)
|
||||
@ -462,6 +520,13 @@ func (p *MessageProcessor) JoinPublic(chatID string) error {
|
||||
return p.transport.JoinPublic(chatID)
|
||||
}
|
||||
|
||||
func (p *MessageProcessor) LoadKeyFilters(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
|
||||
p.mutex.Lock()
|
||||
p.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
|
||||
p.mutex.Unlock()
|
||||
return p.transport.LoadKeyFilters(privateKey)
|
||||
}
|
||||
|
||||
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
|
||||
var newMessage *types.NewMessage
|
||||
|
||||
|
@ -15,7 +15,9 @@ type RawMessage struct {
|
||||
SendCount int
|
||||
Sent bool
|
||||
ResendAutomatically bool
|
||||
SkipNegotiation bool
|
||||
MessageType protobuf.ApplicationMetadataMessage_Type
|
||||
Payload []byte
|
||||
Sender *ecdsa.PrivateKey
|
||||
Recipients []*ecdsa.PublicKey
|
||||
}
|
||||
|
@ -1379,6 +1379,7 @@ func (m *Messenger) SendChatMessage(ctx context.Context, message *Message) (*Mes
|
||||
|
||||
}
|
||||
logger := m.logger.With(zap.String("site", "Send"), zap.String("chatID", message.ChatId))
|
||||
logger.Info("SENDING CHAT MESSAGE")
|
||||
var response MessengerResponse
|
||||
|
||||
// A valid added chat is required.
|
||||
@ -1426,6 +1427,7 @@ func (m *Messenger) SendChatMessage(ctx context.Context, message *Message) (*Mes
|
||||
return nil, errors.New("chat type not supported")
|
||||
}
|
||||
|
||||
// THERE'S A RACE CONDITION, WE SHOULD CALCULATE AND TRACK THE ID FIRST
|
||||
id, err := m.dispatchMessage(ctx, &common.RawMessage{
|
||||
LocalChatID: chat.ID,
|
||||
Payload: encodedMessage,
|
||||
@ -1743,7 +1745,6 @@ func (m *Messenger) syncContact(ctx context.Context, contact *Contact) error {
|
||||
// RetrieveAll retrieves messages from all filters, processes them and returns a
|
||||
// MessengerResponse to the client
|
||||
func (m *Messenger) RetrieveAll() (*MessengerResponse, error) {
|
||||
m.logger.Info("RETRIEVING ALL", zap.String("installation-id", m.installationID))
|
||||
chatWithMessages, err := m.transport.RetrieveRawAll()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -15,7 +15,7 @@ import (
|
||||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/whisper/v6"
|
||||
"github.com/status-im/status-go/waku"
|
||||
)
|
||||
|
||||
func TestMessengerContactUpdateSuite(t *testing.T) {
|
||||
@ -27,8 +27,8 @@ type MessengerContactUpdateSuite struct {
|
||||
m *Messenger // main instance of Messenger
|
||||
privateKey *ecdsa.PrivateKey // private key for the main instance of Messenger
|
||||
// If one wants to send messages between different instances of Messenger,
|
||||
// a single Whisper service should be shared.
|
||||
shh types.Whisper
|
||||
// a single waku service should be shared.
|
||||
shh types.Waku
|
||||
tmpFiles []*os.File // files to clean up
|
||||
logger *zap.Logger
|
||||
}
|
||||
@ -36,17 +36,17 @@ type MessengerContactUpdateSuite struct {
|
||||
func (s *MessengerContactUpdateSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
config := whisper.DefaultConfig
|
||||
config.MinimumAcceptedPOW = 0
|
||||
shh := whisper.New(&config)
|
||||
s.shh = gethbridge.NewGethWhisperWrapper(shh)
|
||||
config := waku.DefaultConfig
|
||||
config.MinimumAcceptedPoW = 0
|
||||
shh := waku.New(&config, s.logger)
|
||||
s.shh = gethbridge.NewGethWakuWrapper(shh)
|
||||
s.Require().NoError(shh.Start(nil))
|
||||
|
||||
s.m = s.newMessenger(s.shh)
|
||||
s.privateKey = s.m.identity
|
||||
}
|
||||
|
||||
func (s *MessengerContactUpdateSuite) newMessengerWithKey(shh types.Whisper, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
func (s *MessengerContactUpdateSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
tmpFile, err := ioutil.TempFile("", "")
|
||||
s.Require().NoError(err)
|
||||
|
||||
@ -72,7 +72,7 @@ func (s *MessengerContactUpdateSuite) newMessengerWithKey(shh types.Whisper, pri
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *MessengerContactUpdateSuite) newMessenger(shh types.Whisper) *Messenger {
|
||||
func (s *MessengerContactUpdateSuite) newMessenger(shh types.Waku) *Messenger {
|
||||
privateKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -17,7 +17,7 @@ import (
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/encryption/multidevice"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/whisper/v6"
|
||||
"github.com/status-im/status-go/waku"
|
||||
)
|
||||
|
||||
func TestMessengerInstallationSuite(t *testing.T) {
|
||||
@ -30,8 +30,8 @@ type MessengerInstallationSuite struct {
|
||||
privateKey *ecdsa.PrivateKey // private key for the main instance of Messenger
|
||||
|
||||
// If one wants to send messages between different instances of Messenger,
|
||||
// a single Whisper service should be shared.
|
||||
shh types.Whisper
|
||||
// a single Waku service should be shared.
|
||||
shh types.Waku
|
||||
|
||||
tmpFiles []*os.File // files to clean up
|
||||
logger *zap.Logger
|
||||
@ -40,17 +40,17 @@ type MessengerInstallationSuite struct {
|
||||
func (s *MessengerInstallationSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
config := whisper.DefaultConfig
|
||||
config.MinimumAcceptedPOW = 0
|
||||
shh := whisper.New(&config)
|
||||
s.shh = gethbridge.NewGethWhisperWrapper(shh)
|
||||
config := waku.DefaultConfig
|
||||
config.MinimumAcceptedPoW = 0
|
||||
shh := waku.New(&config, s.logger)
|
||||
s.shh = gethbridge.NewGethWakuWrapper(shh)
|
||||
s.Require().NoError(shh.Start(nil))
|
||||
|
||||
s.m = s.newMessenger(s.shh)
|
||||
s.privateKey = s.m.identity
|
||||
}
|
||||
|
||||
func (s *MessengerInstallationSuite) newMessengerWithKey(shh types.Whisper, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
func (s *MessengerInstallationSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
tmpFile, err := ioutil.TempFile("", "")
|
||||
s.Require().NoError(err)
|
||||
|
||||
@ -77,7 +77,7 @@ func (s *MessengerInstallationSuite) newMessengerWithKey(shh types.Whisper, priv
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *MessengerInstallationSuite) newMessenger(shh types.Whisper) *Messenger {
|
||||
func (s *MessengerInstallationSuite) newMessenger(shh types.Waku) *Messenger {
|
||||
privateKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -14,7 +14,7 @@ import (
|
||||
"github.com/status-im/status-go/eth-node/crypto"
|
||||
"github.com/status-im/status-go/eth-node/types"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/whisper/v6"
|
||||
"github.com/status-im/status-go/waku"
|
||||
)
|
||||
|
||||
func TestMessengerMuteSuite(t *testing.T) {
|
||||
@ -27,8 +27,8 @@ type MessengerMuteSuite struct {
|
||||
privateKey *ecdsa.PrivateKey // private key for the main instance of Messenger
|
||||
|
||||
// If one wants to send messages between different instances of Messenger,
|
||||
// a single Whisper service should be shared.
|
||||
shh types.Whisper
|
||||
// a single Waku service should be shared.
|
||||
shh types.Waku
|
||||
|
||||
tmpFiles []*os.File // files to clean up
|
||||
logger *zap.Logger
|
||||
@ -37,17 +37,17 @@ type MessengerMuteSuite struct {
|
||||
func (s *MessengerMuteSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
config := whisper.DefaultConfig
|
||||
config.MinimumAcceptedPOW = 0
|
||||
shh := whisper.New(&config)
|
||||
s.shh = gethbridge.NewGethWhisperWrapper(shh)
|
||||
config := waku.DefaultConfig
|
||||
config.MinimumAcceptedPoW = 0
|
||||
shh := waku.New(&config, s.logger)
|
||||
s.shh = gethbridge.NewGethWakuWrapper(shh)
|
||||
s.Require().NoError(shh.Start(nil))
|
||||
|
||||
s.m = s.newMessenger(s.shh)
|
||||
s.privateKey = s.m.identity
|
||||
}
|
||||
|
||||
func (s *MessengerMuteSuite) newMessengerWithKey(shh types.Whisper, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
func (s *MessengerMuteSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
tmpFile, err := ioutil.TempFile("", "")
|
||||
s.Require().NoError(err)
|
||||
|
||||
@ -74,7 +74,7 @@ func (s *MessengerMuteSuite) newMessengerWithKey(shh types.Whisper, privateKey *
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *MessengerMuteSuite) newMessenger(shh types.Whisper) *Messenger {
|
||||
func (s *MessengerMuteSuite) newMessenger(shh types.Waku) *Messenger {
|
||||
privateKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
"github.com/status-im/status-go/protocol/protobuf"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
v1protocol "github.com/status-im/status-go/protocol/v1"
|
||||
"github.com/status-im/status-go/whisper/v6"
|
||||
"github.com/status-im/status-go/waku"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -62,13 +62,13 @@ type MessengerSuite struct {
|
||||
privateKey *ecdsa.PrivateKey // private key for the main instance of Messenger
|
||||
// If one wants to send messages between different instances of Messenger,
|
||||
// a single Whisper service should be shared.
|
||||
shh types.Whisper
|
||||
shh types.Waku
|
||||
tmpFiles []*os.File // files to clean up
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
type testNode struct {
|
||||
shh types.Whisper
|
||||
shh types.Waku
|
||||
}
|
||||
|
||||
func (n *testNode) NewENSVerifier(_ *zap.Logger) enstypes.ENSVerifier {
|
||||
@ -84,27 +84,27 @@ func (n *testNode) RemovePeer(_ string) error {
|
||||
}
|
||||
|
||||
func (n *testNode) GetWaku(_ interface{}) (types.Waku, error) {
|
||||
panic("not implemented")
|
||||
return n.shh, nil
|
||||
}
|
||||
|
||||
func (n *testNode) GetWhisper(_ interface{}) (types.Whisper, error) {
|
||||
return n.shh, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *MessengerSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
config := whisper.DefaultConfig
|
||||
config.MinimumAcceptedPOW = 0
|
||||
shh := whisper.New(&config)
|
||||
s.shh = gethbridge.NewGethWhisperWrapper(shh)
|
||||
config := waku.DefaultConfig
|
||||
config.MinimumAcceptedPoW = 0
|
||||
shh := waku.New(&config, s.logger)
|
||||
s.shh = gethbridge.NewGethWakuWrapper(shh)
|
||||
s.Require().NoError(shh.Start(nil))
|
||||
|
||||
s.m = s.newMessenger(s.shh)
|
||||
s.privateKey = s.m.identity
|
||||
}
|
||||
|
||||
func (s *MessengerSuite) newMessengerWithKey(shh types.Whisper, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
func (s *MessengerSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
tmpFile, err := ioutil.TempFile("", "")
|
||||
s.Require().NoError(err)
|
||||
|
||||
@ -132,7 +132,7 @@ func (s *MessengerSuite) newMessengerWithKey(shh types.Whisper, privateKey *ecds
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *MessengerSuite) newMessenger(shh types.Whisper) *Messenger {
|
||||
func (s *MessengerSuite) newMessenger(shh types.Waku) *Messenger {
|
||||
privateKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
return s.newMessengerWithKey(shh, privateKey)
|
||||
@ -2056,7 +2056,7 @@ type MockEthClient struct {
|
||||
}
|
||||
|
||||
type mockSendMessagesRequest struct {
|
||||
types.Whisper
|
||||
types.Waku
|
||||
req types.MessagesRequest
|
||||
}
|
||||
|
||||
@ -2101,7 +2101,7 @@ func (s *MessengerSuite) TestMessageJSON() {
|
||||
|
||||
func (s *MessengerSuite) TestRequestHistoricMessagesRequest() {
|
||||
shh := &mockSendMessagesRequest{
|
||||
Whisper: s.shh,
|
||||
Waku: s.shh,
|
||||
}
|
||||
m := s.newMessenger(shh)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
||||
|
@ -288,6 +288,7 @@ func (c *Client) HandleMessageSent(sentMessage *common.SentMessage) error {
|
||||
|
||||
// Nothing to do
|
||||
if len(trackedMessageIDs) == 0 {
|
||||
c.config.Logger.Info("nothing to do")
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -611,6 +612,14 @@ func (c *Client) sendNotification(publicKey *ecdsa.PublicKey, installationIDs []
|
||||
}
|
||||
|
||||
c.config.Logger.Info("actionable info", zap.Int("count", len(actionableInfos)))
|
||||
ephemeralKey, err := crypto.GenerateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, err = c.messageProcessor.LoadKeyFilters(ephemeralKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var actionedInfo []*PushNotificationInfo
|
||||
for _, infos := range actionableInfos {
|
||||
@ -636,8 +645,10 @@ func (c *Client) sendNotification(publicKey *ecdsa.PublicKey, installationIDs []
|
||||
}
|
||||
|
||||
rawMessage := &common.RawMessage{
|
||||
Payload: payload,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_REQUEST,
|
||||
Payload: payload,
|
||||
Sender: ephemeralKey,
|
||||
SkipNegotiation: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_REQUEST,
|
||||
}
|
||||
|
||||
// TODO: We should use the messageID for the response
|
||||
@ -1076,11 +1087,24 @@ func (c *Client) QueryPushNotificationInfo(publicKey *ecdsa.PublicKey) error {
|
||||
return err
|
||||
}
|
||||
|
||||
rawMessage := &common.RawMessage{
|
||||
Payload: encodedMessage,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY,
|
||||
ephemeralKey, err := crypto.GenerateKey()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rawMessage := &common.RawMessage{
|
||||
Payload: encodedMessage,
|
||||
Sender: ephemeralKey,
|
||||
SkipNegotiation: true,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY,
|
||||
}
|
||||
|
||||
filter, err := c.messageProcessor.LoadKeyFilters(ephemeralKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.config.Logger.Debug("Filter", zap.String("filter-id", filter.FilterID))
|
||||
|
||||
encodedPublicKey := hex.EncodeToString(hashedPublicKey)
|
||||
c.config.Logger.Info("sending query")
|
||||
messageID, err := c.messageProcessor.SendPublic(context.Background(), encodedPublicKey, rawMessage)
|
||||
|
@ -360,8 +360,9 @@ func (p *Server) HandlePushNotificationQuery2(publicKey *ecdsa.PublicKey, messag
|
||||
}
|
||||
|
||||
rawMessage := &common.RawMessage{
|
||||
Payload: encodedMessage,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY_RESPONSE,
|
||||
Payload: encodedMessage,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY_RESPONSE,
|
||||
SkipNegotiation: true,
|
||||
}
|
||||
|
||||
_, err = p.messageProcessor.SendPrivate(context.Background(), publicKey, rawMessage)
|
||||
@ -382,8 +383,9 @@ func (s *Server) HandlePushNotificationRequest2(publicKey *ecdsa.PublicKey,
|
||||
}
|
||||
|
||||
rawMessage := &common.RawMessage{
|
||||
Payload: encodedMessage,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_RESPONSE,
|
||||
Payload: encodedMessage,
|
||||
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_RESPONSE,
|
||||
SkipNegotiation: true,
|
||||
}
|
||||
|
||||
_, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, rawMessage)
|
||||
|
@ -20,7 +20,7 @@ import (
|
||||
"github.com/status-im/status-go/protocol/push_notification_client"
|
||||
"github.com/status-im/status-go/protocol/push_notification_server"
|
||||
"github.com/status-im/status-go/protocol/tt"
|
||||
"github.com/status-im/status-go/whisper/v6"
|
||||
"github.com/status-im/status-go/waku"
|
||||
)
|
||||
|
||||
func TestMessengerPushNotificationSuite(t *testing.T) {
|
||||
@ -32,8 +32,8 @@ type MessengerPushNotificationSuite struct {
|
||||
m *Messenger // main instance of Messenger
|
||||
privateKey *ecdsa.PrivateKey // private key for the main instance of Messenger
|
||||
// If one wants to send messages between different instances of Messenger,
|
||||
// a single Whisper service should be shared.
|
||||
shh types.Whisper
|
||||
// a single Waku service should be shared.
|
||||
shh types.Waku
|
||||
tmpFiles []*os.File // files to clean up
|
||||
logger *zap.Logger
|
||||
}
|
||||
@ -41,17 +41,25 @@ type MessengerPushNotificationSuite struct {
|
||||
func (s *MessengerPushNotificationSuite) SetupTest() {
|
||||
s.logger = tt.MustCreateTestLogger()
|
||||
|
||||
config := whisper.DefaultConfig
|
||||
config.MinimumAcceptedPOW = 0
|
||||
shh := whisper.New(&config)
|
||||
s.shh = gethbridge.NewGethWhisperWrapper(shh)
|
||||
config := waku.DefaultConfig
|
||||
config.MinimumAcceptedPoW = 0
|
||||
shh := waku.New(&config, s.logger)
|
||||
s.shh = gethbridge.NewGethWakuWrapper(shh)
|
||||
s.Require().NoError(shh.Start(nil))
|
||||
|
||||
s.m = s.newMessenger(s.shh)
|
||||
s.privateKey = s.m.identity
|
||||
}
|
||||
|
||||
func (s *MessengerPushNotificationSuite) newMessengerWithOptions(shh types.Whisper, privateKey *ecdsa.PrivateKey, options []Option) *Messenger {
|
||||
func (s *MessengerPushNotificationSuite) TearDownTest() {
|
||||
s.Require().NoError(s.m.Shutdown())
|
||||
for _, f := range s.tmpFiles {
|
||||
_ = os.Remove(f.Name())
|
||||
}
|
||||
_ = s.logger.Sync()
|
||||
}
|
||||
|
||||
func (s *MessengerPushNotificationSuite) newMessengerWithOptions(shh types.Waku, privateKey *ecdsa.PrivateKey, options []Option) *Messenger {
|
||||
tmpFile, err := ioutil.TempFile("", "")
|
||||
s.Require().NoError(err)
|
||||
|
||||
@ -71,7 +79,7 @@ func (s *MessengerPushNotificationSuite) newMessengerWithOptions(shh types.Whisp
|
||||
return m
|
||||
}
|
||||
|
||||
func (s *MessengerPushNotificationSuite) newMessengerWithKey(shh types.Whisper, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
func (s *MessengerPushNotificationSuite) newMessengerWithKey(shh types.Waku, privateKey *ecdsa.PrivateKey) *Messenger {
|
||||
tmpFile, err := ioutil.TempFile("", "")
|
||||
s.Require().NoError(err)
|
||||
|
||||
@ -84,14 +92,14 @@ func (s *MessengerPushNotificationSuite) newMessengerWithKey(shh types.Whisper,
|
||||
return s.newMessengerWithOptions(shh, privateKey, options)
|
||||
}
|
||||
|
||||
func (s *MessengerPushNotificationSuite) newMessenger(shh types.Whisper) *Messenger {
|
||||
func (s *MessengerPushNotificationSuite) newMessenger(shh types.Waku) *Messenger {
|
||||
privateKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
return s.newMessengerWithKey(s.shh, privateKey)
|
||||
}
|
||||
|
||||
func (s *MessengerPushNotificationSuite) newPushNotificationServer(shh types.Whisper) *Messenger {
|
||||
func (s *MessengerPushNotificationSuite) newPushNotificationServer(shh types.Waku) *Messenger {
|
||||
privateKey, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
@ -274,6 +282,9 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotification() {
|
||||
return nil
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
s.Require().NoError(bob2.Shutdown())
|
||||
s.Require().NoError(alice.Shutdown())
|
||||
s.Require().NoError(server.Shutdown())
|
||||
}
|
||||
|
||||
func (s *MessengerPushNotificationSuite) TestReceivePushNotificationFromContactOnly() {
|
||||
@ -402,6 +413,8 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationFromContactO
|
||||
})
|
||||
|
||||
s.Require().NoError(err)
|
||||
s.Require().NoError(alice.Shutdown())
|
||||
s.Require().NoError(server.Shutdown())
|
||||
}
|
||||
|
||||
func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
||||
@ -614,4 +627,6 @@ func (s *MessengerPushNotificationSuite) TestReceivePushNotificationRetries() {
|
||||
})
|
||||
|
||||
s.Require().NoError(err)
|
||||
s.Require().NoError(alice.Shutdown())
|
||||
s.Require().NoError(server.Shutdown())
|
||||
}
|
||||
|
@ -218,15 +218,15 @@ func (s *FiltersManager) Remove(filters ...*Filter) error {
|
||||
}
|
||||
|
||||
// LoadPartitioned creates a filter for a partitioned topic.
|
||||
func (s *FiltersManager) LoadPartitioned(publicKey *ecdsa.PublicKey) (*Filter, error) {
|
||||
return s.loadPartitioned(publicKey, false)
|
||||
func (s *FiltersManager) LoadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
|
||||
return s.loadPartitioned(publicKey, identity, listen)
|
||||
}
|
||||
|
||||
func (s *FiltersManager) loadMyPartitioned() (*Filter, error) {
|
||||
return s.loadPartitioned(&s.privateKey.PublicKey, true)
|
||||
return s.loadPartitioned(&s.privateKey.PublicKey, s.privateKey, true)
|
||||
}
|
||||
|
||||
func (s *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, listen bool) (*Filter, error) {
|
||||
func (s *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, identity *ecdsa.PrivateKey, listen bool) (*Filter, error) {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
@ -237,7 +237,7 @@ func (s *FiltersManager) loadPartitioned(publicKey *ecdsa.PublicKey, listen bool
|
||||
|
||||
// We set up a filter so we can publish,
|
||||
// but we discard envelopes if listen is false.
|
||||
filter, err := s.addAsymmetric(chatID, listen)
|
||||
filter, err := s.addAsymmetric(chatID, identity, listen)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -321,7 +321,7 @@ func (s *FiltersManager) LoadDiscovery() ([]*Filter, error) {
|
||||
OneToOne: true,
|
||||
}
|
||||
|
||||
discoveryResponse, err := s.addAsymmetric(personalDiscoveryChat.ChatID, true)
|
||||
discoveryResponse, err := s.addAsymmetric(personalDiscoveryChat.ChatID, s.privateKey, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -439,7 +439,7 @@ func (s *FiltersManager) addSymmetric(chatID string) (*RawFilter, error) {
|
||||
|
||||
// addAsymmetricFilter adds a filter with our private key
|
||||
// and set minPow according to the listen parameter.
|
||||
func (s *FiltersManager) addAsymmetric(chatID string, listen bool) (*RawFilter, error) {
|
||||
func (s *FiltersManager) addAsymmetric(chatID string, identity *ecdsa.PrivateKey, listen bool) (*RawFilter, error) {
|
||||
var (
|
||||
err error
|
||||
pow = 1.0 // use PoW high enough to discard all messages for the filter
|
||||
@ -452,7 +452,7 @@ func (s *FiltersManager) addAsymmetric(chatID string, listen bool) (*RawFilter,
|
||||
topic := ToTopic(chatID)
|
||||
topics := [][]byte{topic}
|
||||
|
||||
privateKeyID, err := s.service.AddKeyPair(s.privateKey)
|
||||
privateKeyID, err := s.service.AddKeyPair(identity)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ type Transport interface {
|
||||
RemoveFilters(filters []*Filter) error
|
||||
ResetFilters() error
|
||||
Filters() []*Filter
|
||||
LoadKeyFilters(*ecdsa.PrivateKey) (*Filter, error)
|
||||
ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*Filter, error)
|
||||
RetrieveRawAll() (map[Filter][]*types.Message, error)
|
||||
}
|
||||
|
@ -204,61 +204,6 @@ func (a *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type Message struct {
|
||||
Message *types.Message
|
||||
Public bool
|
||||
}
|
||||
|
||||
func (a *Transport) RetrieveAllMessages() ([]Message, error) {
|
||||
var messages []Message
|
||||
|
||||
for _, filter := range a.filters.Filters() {
|
||||
filterMsgs, err := a.api.GetFilterMessages(filter.FilterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, m := range filterMsgs {
|
||||
messages = append(messages, Message{
|
||||
Message: m,
|
||||
Public: filter.IsPublic(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
||||
|
||||
func (a *Transport) RetrievePublicMessages(chatID string) ([]*types.Message, error) {
|
||||
filter, err := a.filters.LoadPublic(chatID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return a.api.GetFilterMessages(filter.FilterID)
|
||||
}
|
||||
|
||||
func (a *Transport) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*types.Message, error) {
|
||||
chats := a.filters.FiltersByPublicKey(publicKey)
|
||||
discoveryChats, err := a.filters.Init(nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result []*types.Message
|
||||
|
||||
for _, chat := range append(chats, discoveryChats...) {
|
||||
filterMsgs, err := a.api.GetFilterMessages(chat.FilterID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result = append(result, filterMsgs...)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (a *Transport) RetrieveRawAll() (map[transport.Filter][]*types.Message, error) {
|
||||
result := make(map[transport.Filter][]*types.Message)
|
||||
|
||||
@ -318,7 +263,7 @@ func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filter, err := a.filters.LoadPartitioned(publicKey)
|
||||
filter, err := a.filters.LoadPartitioned(publicKey, a.keysManager.privateKey, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -329,6 +274,10 @@ func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *
|
||||
return a.api.Post(ctx, *newMessage)
|
||||
}
|
||||
|
||||
func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) {
|
||||
return a.filters.LoadPartitioned(&key.PublicKey, key, true)
|
||||
}
|
||||
|
||||
func (a *Transport) SendPrivateOnDiscovery(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
|
||||
if err := a.addSig(newMessage); err != nil {
|
||||
return nil, err
|
||||
|
@ -318,7 +318,7 @@ func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *
|
||||
return nil, err
|
||||
}
|
||||
|
||||
filter, err := a.filters.LoadPartitioned(publicKey)
|
||||
filter, err := a.filters.LoadPartitioned(publicKey, a.keysManager.privateKey, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -406,6 +406,10 @@ func (a *Transport) SendMessagesRequest(
|
||||
return
|
||||
}
|
||||
|
||||
func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) {
|
||||
return a.filters.LoadPartitioned(&key.PublicKey, key, true)
|
||||
}
|
||||
|
||||
func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) {
|
||||
for {
|
||||
select {
|
||||
|
@ -101,10 +101,14 @@ func (m *StatusMessage) HandleTransport(shhMessage *types.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *StatusMessage) HandleEncryption(myKey *ecdsa.PrivateKey, senderKey *ecdsa.PublicKey, enc *encryption.Protocol) error {
|
||||
func (m *StatusMessage) HandleEncryption(myKey *ecdsa.PrivateKey, senderKey *ecdsa.PublicKey, enc *encryption.Protocol, skipNegotiation bool) error {
|
||||
// As we handle non-encrypted messages, we make sure that DecryptPayload
|
||||
// is set regardless of whether this step is successful
|
||||
m.DecryptedPayload = m.TransportPayload
|
||||
// Nothing to do
|
||||
if skipNegotiation {
|
||||
return nil
|
||||
}
|
||||
|
||||
var protocolMessage encryption.ProtocolMessage
|
||||
err := proto.Unmarshal(m.TransportPayload, &protocolMessage)
|
||||
|
Loading…
x
Reference in New Issue
Block a user