From 0dd1cd585befdc5be7d3a8f8665e94f45f50fc50 Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Thu, 29 Aug 2019 09:01:59 +0200 Subject: [PATCH] Replace adapter with message processor (#58) --- adapters.go | 620 ------------------ datasync/datasync.go | 2 +- message_processor.go | 467 +++++++++++++ adapters_test.go => message_processor_test.go | 211 +++--- messenger.go | 229 +++---- messenger_test.go | 17 +- persistence.go | 29 +- transport/whisper/whisper_service.go | 18 +- v1/status_message.go | 8 +- 9 files changed, 715 insertions(+), 886 deletions(-) delete mode 100644 adapters.go create mode 100644 message_processor.go rename adapters_test.go => message_processor_test.go (67%) diff --git a/adapters.go b/adapters.go deleted file mode 100644 index 3f5769e..0000000 --- a/adapters.go +++ /dev/null @@ -1,620 +0,0 @@ -package statusproto - -import ( - "context" - "crypto/ecdsa" - "time" - - "go.uber.org/zap" - - "github.com/ethereum/go-ethereum/crypto" - "github.com/golang/protobuf/proto" - "github.com/pkg/errors" - "github.com/status-im/status-protocol-go/encryption/sharedsecret" - whisper "github.com/status-im/whisper/whisperv6" - - "github.com/status-im/status-protocol-go/encryption" - "github.com/status-im/status-protocol-go/encryption/multidevice" - transport "github.com/status-im/status-protocol-go/transport/whisper" - protocol "github.com/status-im/status-protocol-go/v1" - - "github.com/status-im/status-protocol-go/datasync" - datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer" - datasyncproto "github.com/vacp2p/mvds/protobuf" -) - -// Whisper message properties. -const ( - whisperTTL = 15 - whisperPoW = 0.002 - whisperPoWTime = 5 -) - -// whisperAdapter is a bridge between encryption and transport -// layers. -type whisperAdapter struct { - privateKey *ecdsa.PrivateKey - transport *transport.WhisperServiceTransport - protocol *encryption.Protocol - datasync *datasync.DataSync - logger *zap.Logger - - featureFlags featureFlags -} - -func newWhisperAdapter( - pk *ecdsa.PrivateKey, - t *transport.WhisperServiceTransport, - p *encryption.Protocol, - d *datasync.DataSync, - featureFlags featureFlags, - logger *zap.Logger, -) *whisperAdapter { - if logger == nil { - logger = zap.NewNop() - } - - adapter := &whisperAdapter{ - privateKey: pk, - transport: t, - protocol: p, - datasync: d, - featureFlags: featureFlags, - logger: logger.With(zap.Namespace("whisperAdapter")), - } - - if featureFlags.datasync { - // We pass our encryption/transport handling to the datasync - // so it's correctly encrypted. - d.Init(adapter.sendDataSync) - } - - return adapter -} - -func (a *whisperAdapter) JoinPublic(chatID string) error { - return a.transport.JoinPublic(chatID) -} - -func (a *whisperAdapter) LeavePublic(chatID string) error { - return a.transport.LeavePublic(chatID) -} - -func (a *whisperAdapter) JoinPrivate(publicKey *ecdsa.PublicKey) error { - return a.transport.JoinPrivate(publicKey) -} - -func (a *whisperAdapter) LeavePrivate(publicKey *ecdsa.PublicKey) error { - return a.transport.LeavePrivate(publicKey) -} - -type ChatMessages struct { - Messages []*protocol.Message - Public bool - ChatID string -} - -func (a *whisperAdapter) RetrieveAllMessages() ([]ChatMessages, error) { - chatMessages, err := a.transport.RetrieveAllMessages() - if err != nil { - return nil, err - } - - var result []ChatMessages - for _, messages := range chatMessages { - protoMessages, err := a.handleRetrievedMessages(messages.Messages) - if err != nil { - return nil, err - } - - result = append(result, ChatMessages{ - Messages: protoMessages, - Public: messages.Public, - ChatID: messages.ChatID, - }) - } - return result, nil -} - -// RetrievePublicMessages retrieves the collected public messages. -// It implies joining a chat if it has not been joined yet. -func (a *whisperAdapter) RetrievePublicMessages(chatID string) ([]*protocol.Message, error) { - messages, err := a.transport.RetrievePublicMessages(chatID) - if err != nil { - return nil, err - } - - return a.handleRetrievedMessages(messages) -} - -// RetrievePrivateMessages retrieves the collected private messages. -// It implies joining a chat if it has not been joined yet. -func (a *whisperAdapter) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*protocol.Message, error) { - messages, err := a.transport.RetrievePrivateMessages(publicKey) - if err != nil { - return nil, err - } - - return a.handleRetrievedMessages(messages) -} - -func (a *whisperAdapter) handleRetrievedMessages(messages []*whisper.ReceivedMessage) ([]*protocol.Message, error) { - logger := a.logger.With(zap.String("site", "handleRetrievedMessages")) - - decodedMessages := make([]*protocol.Message, 0, len(messages)) - for _, item := range messages { - shhMessage := whisper.ToWhisperMessage(item) - - hlogger := logger.With(zap.Binary("hash", shhMessage.Hash)) - hlogger.Debug("handling a received message") - - statusMessages, err := a.handleMessages(shhMessage, true) - if err != nil { - hlogger.Info("failed to decode messages", zap.Error(err)) - continue - } - - for _, statusMessage := range statusMessages { - switch m := statusMessage.ParsedMessage.(type) { - case protocol.Message: - m.ID = statusMessage.ID - m.SigPubKey = statusMessage.SigPubKey() - decodedMessages = append(decodedMessages, &m) - case protocol.PairMessage: - fromOurDevice := isPubKeyEqual(statusMessage.SigPubKey(), &a.privateKey.PublicKey) - if !fromOurDevice { - hlogger.Debug("received PairMessage from not our device, skipping") - break - } - - metadata := &multidevice.InstallationMetadata{ - Name: m.Name, - FCMToken: m.FCMToken, - DeviceType: m.DeviceType, - } - err := a.protocol.SetInstallationMetadata(&a.privateKey.PublicKey, m.InstallationID, metadata) - if err != nil { - return nil, err - } - default: - hlogger.Error("skipped a public message of unsupported type") - } - } - } - return decodedMessages, nil -} - -// DEPRECATED -func (a *whisperAdapter) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMessage, error) { - chatWithMessages, err := a.transport.RetrieveRawAll() - if err != nil { - return nil, err - } - - logger := a.logger.With(zap.String("site", "RetrieveRawAll")) - result := make(map[transport.Filter][]*protocol.StatusMessage) - - for chat, messages := range chatWithMessages { - for _, message := range messages { - shhMessage := whisper.ToWhisperMessage(message) - statusMessages, err := a.handleMessages(shhMessage, false) - if err != nil { - logger.Info("failed to decode messages", zap.Error(err)) - continue - } - - result[chat] = append(result[chat], statusMessages...) - - } - } - - return result, nil -} - -// handleMessages expects a whisper message as input, and it will go through -// a series of transformations until the message is parsed into an application -// layer message, or in case of Raw methods, the processing stops at the layer -// before -func (a *whisperAdapter) handleMessages(shhMessage *whisper.Message, applicationLayer bool) ([]*protocol.StatusMessage, error) { - logger := a.logger.With(zap.String("site", "handleMessages")) - hlogger := logger.With(zap.Binary("hash", shhMessage.Hash)) - var statusMessage protocol.StatusMessage - - err := statusMessage.HandleTransport(shhMessage) - if err != nil { - hlogger.Error("failed to handle transport layer message", zap.Error(err)) - return nil, err - } - - err = a.handleEncryptionLayer(context.Background(), &statusMessage) - if err != nil { - hlogger.Debug("failed to handle an encryption message", zap.Error(err)) - } - - statusMessages, err := statusMessage.HandleDatasync(a.datasync) - if err != nil { - hlogger.Debug("failed to handle datasync message", zap.Error(err)) - } - - for _, statusMessage := range statusMessages { - err := statusMessage.HandleApplicationMetadata() - if err != nil { - hlogger.Error("failed to handle application metadata layer message", zap.Error(err)) - } - - if applicationLayer { - err = statusMessage.HandleApplication() - if err != nil { - hlogger.Error("failed to handle application layer message") - } - } - } - - return statusMessages, nil -} - -func (a *whisperAdapter) handleEncryptionLayer(ctx context.Context, message *protocol.StatusMessage) error { - publicKey := message.SigPubKey() - - logger := a.logger.With(zap.String("site", "handleEncryptionLayer")) - - err := message.HandleEncryption(a.privateKey, publicKey, a.protocol) - if err == encryption.ErrDeviceNotFound { - handleErr := a.handleErrDeviceNotFound(ctx, publicKey) - if handleErr != nil { - logger.Error("failed to handle error", zap.Error(err), zap.NamedError("handleErr", handleErr)) - } - } - if err != nil { - return errors.Wrap(err, "failed to process an encrypted message") - } - - return nil -} - -func (a *whisperAdapter) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error { - now := time.Now().Unix() - advertise, err := a.protocol.ShouldAdvertiseBundle(publicKey, now) - if err != nil { - return err - } - if !advertise { - return nil - } - - messageSpec, err := a.protocol.BuildBundleAdvertiseMessage(a.privateKey, publicKey) - if err != nil { - return err - } - - ctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - _, _, err = a.sendMessageSpec(ctx, publicKey, messageSpec) - if err != nil { - return err - } - - a.protocol.ConfirmBundleAdvertisement(publicKey, now) - - return nil -} - -// SendPublic sends a public message passing chat name to the transport layer. -// -// Be aware that this method returns a message ID using protocol.MessageID -// instead of Whisper message hash. -func (a *whisperAdapter) SendPublic(ctx context.Context, chatName, chatID string, data []byte, clock int64) ([]byte, error) { - logger := a.logger.With(zap.String("site", "SendPublic")) - - logger.Debug("sending a public message", zap.String("chat-name", chatName)) - - message := protocol.CreatePublicTextMessage(data, clock, chatName) - - encodedMessage, err := a.encodeMessage(message) - if err != nil { - return nil, errors.Wrap(err, "failed to encode message") - } - - wrappedMessage, err := a.tryWrapMessageV1(encodedMessage) - if err != nil { - return nil, errors.Wrap(err, "failed to wrap message") - } - - messageSpec, err := a.protocol.BuildPublicMessage(a.privateKey, wrappedMessage) - if err != nil { - return nil, errors.Wrap(err, "failed to build public message") - } - - newMessage, err := a.messageSpecToWhisper(messageSpec) - if err != nil { - return nil, err - } - - hash, err := a.transport.SendPublic(ctx, &newMessage, chatName) - if err != nil { - return nil, err - } - messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage) - - a.transport.Track([][]byte{messageID}, hash, newMessage) - - return messageID, nil -} - -// SendPublicRaw takes encoded data, encrypts it and sends through the wire. -// DEPRECATED -func (a *whisperAdapter) SendPublicRaw(ctx context.Context, chatName string, data []byte) ([]byte, error) { - - var newMessage whisper.NewMessage - - wrappedMessage, err := a.tryWrapMessageV1(data) - if err != nil { - return nil, errors.Wrap(err, "failed to wrap message") - } - - newMessage = whisper.NewMessage{ - TTL: whisperTTL, - Payload: wrappedMessage, - PowTarget: whisperPoW, - PowTime: whisperPoWTime, - } - - hash, err := a.transport.SendPublic(ctx, &newMessage, chatName) - if err != nil { - return nil, err - } - - messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage) - - a.transport.Track([][]byte{messageID}, hash, newMessage) - - return messageID, nil -} - -func (a *whisperAdapter) SendContactCode(ctx context.Context, messageSpec *encryption.ProtocolMessageSpec) ([]byte, error) { - newMessage, err := a.messageSpecToWhisper(messageSpec) - if err != nil { - return nil, err - } - - return a.transport.SendPublic(ctx, &newMessage, transport.ContactCodeTopic(&a.privateKey.PublicKey)) -} - -func (a *whisperAdapter) tryWrapMessageV1(encodedMessage []byte) ([]byte, error) { - if a.featureFlags.sendV1Messages { - wrappedMessage, err := protocol.WrapMessageV1(encodedMessage, a.privateKey) - if err != nil { - return nil, errors.Wrap(err, "failed to wrap message") - } - - return wrappedMessage, nil - - } - - return encodedMessage, nil -} - -func (a *whisperAdapter) encodeMessage(message protocol.Message) ([]byte, error) { - encodedMessage, err := protocol.EncodeMessage(message) - if err != nil { - return nil, errors.Wrap(err, "failed to encode message") - } - return encodedMessage, nil -} - -// SendPrivate sends a one-to-one message. It needs to return it -// because the registered Whisper filter handles only incoming messages -// and our own messages need to be handled manually. -// -// This might be not true if a shared secret is used because it relies on -// symmetric encryption. -// -// Be aware that this method returns a message ID using protocol.MessageID -// instead of Whisper message hash. -func (a *whisperAdapter) SendPrivate( - ctx context.Context, - publicKey *ecdsa.PublicKey, - chatID string, - data []byte, - clock int64, -) ([]byte, *protocol.Message, error) { - logger := a.logger.With(zap.String("site", "SendPrivate")) - - logger.Debug("sending a private message", zap.Binary("public-key", crypto.FromECDSAPub(publicKey))) - - message := protocol.CreatePrivateTextMessage(data, clock, chatID) - - encodedMessage, err := a.encodeMessage(message) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to encode message") - } - - wrappedMessage, err := a.tryWrapMessageV1(encodedMessage) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to wrap message") - } - - if a.featureFlags.datasync { - if err := a.sendWithDataSync(publicKey, wrappedMessage); err != nil { - return nil, nil, errors.Wrap(err, "failed to send message with datasync") - } - } else { - err = a.encryptAndSend(ctx, publicKey, wrappedMessage) - if err != nil { - return nil, nil, err - } - } - - return protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage), &message, nil -} - -func (a *whisperAdapter) sendWithDataSync(publicKey *ecdsa.PublicKey, message []byte) error { - groupID := datasync.ToOneToOneGroupID(&a.privateKey.PublicKey, publicKey) - peerID := datasyncpeer.PublicKeyToPeerID(*publicKey) - exist, err := a.datasync.IsPeerInGroup(groupID, peerID) - if err != nil { - return errors.Wrap(err, "failed to check if peer is in group") - } - if !exist { - if err := a.datasync.AddPeer(groupID, peerID); err != nil { - return errors.Wrap(err, "failed to add peer") - } - } - _, err = a.datasync.AppendMessage(groupID, message) - if err != nil { - return errors.Wrap(err, "failed to append message to datasync") - } - - return nil -} - -// SendPrivateRaw takes encoded data, encrypts it and sends through the wire. -// DEPRECATED -func (a *whisperAdapter) SendPrivateRaw( - ctx context.Context, - publicKey *ecdsa.PublicKey, - data []byte, -) ([]byte, error) { - a.logger.Debug( - "sending a private message", - zap.Binary("public-key", crypto.FromECDSAPub(publicKey)), - zap.String("site", "SendPrivateRaw"), - ) - - wrappedMessage, err := a.tryWrapMessageV1(data) - if err != nil { - return nil, errors.Wrap(err, "failed to wrap message") - } - - messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, wrappedMessage) - if err != nil { - return nil, errors.Wrap(err, "failed to encrypt message") - } - - messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage) - - if a.featureFlags.datasync { - if err := a.sendWithDataSync(publicKey, wrappedMessage); err != nil { - return nil, errors.Wrap(err, "failed to send message with datasync") - } - return messageID, nil - } - - hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec) - a.transport.Track([][]byte{messageID}, hash, *newMessage) - - return messageID, err -} - -func (a *whisperAdapter) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec) ([]byte, *whisper.NewMessage, error) { - var err error - var hash []byte - newMessage, err := a.messageSpecToWhisper(messageSpec) - if err != nil { - return nil, nil, err - } - - logger := a.logger.With(zap.String("site", "sendMessageSpec")) - switch { - case messageSpec.SharedSecret != nil: - logger.Debug("sending using shared secret") - hash, err = a.transport.SendPrivateWithSharedSecret(ctx, &newMessage, publicKey, messageSpec.SharedSecret) - case messageSpec.PartitionedTopicMode() == encryption.PartitionTopicV1: - logger.Debug("sending partitioned topic") - hash, err = a.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey) - case !a.featureFlags.genericDiscoveryTopicEnabled: - logger.Debug("sending partitioned topic (generic discovery topic disabled)") - hash, err = a.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey) - default: - logger.Debug("sending using discovery topic") - hash, err = a.transport.SendPrivateOnDiscovery(ctx, &newMessage, publicKey) - } - if err != nil { - return nil, nil, err - } - - return hash, &newMessage, nil -} - -func (a *whisperAdapter) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, datasyncPayload *datasyncproto.Payload) error { - var messageIDs [][]byte - for _, payload := range datasyncPayload.Messages { - messageIDs = append(messageIDs, protocol.MessageID(&a.privateKey.PublicKey, payload.Body)) - } - messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, encodedMessage) - if err != nil { - return errors.Wrap(err, "failed to encrypt message") - } - - hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec) - if err != nil { - return err - } - - a.transport.Track(messageIDs, hash, *newMessage) - - return nil -} - -func (a *whisperAdapter) encryptAndSend(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte) error { - messageID := protocol.MessageID(&a.privateKey.PublicKey, encodedMessage) - messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, encodedMessage) - if err != nil { - return errors.Wrap(err, "failed to encrypt message") - } - - hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec) - if err != nil { - return err - } - - a.transport.Track([][]byte{messageID}, hash, *newMessage) - - return nil -} - -func (a *whisperAdapter) messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (whisper.NewMessage, error) { - var newMessage whisper.NewMessage - - payload, err := proto.Marshal(spec.Message) - if err != nil { - return newMessage, err - } - - newMessage = whisper.NewMessage{ - TTL: whisperTTL, - Payload: payload, - PowTarget: whisperPoW, - PowTime: whisperPoWTime, - } - return newMessage, nil -} - -func (a *whisperAdapter) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) { - logger := a.logger.With(zap.String("site", "handleSharedSecrets")) - var filters []*transport.Filter - for _, secret := range secrets { - logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity))) - - fSecret := transport.NegotiatedSecret{ - PublicKey: secret.Identity, - Key: secret.Key, - } - filter, err := a.transport.ProcessNegotiatedSecret(fSecret) - if err != nil { - return nil, err - } - filters = append(filters, filter) - } - return filters, nil -} - -func (a *whisperAdapter) Stop() { - a.transport.Stop() -} - -// isPubKeyEqual checks that two public keys are equal -func isPubKeyEqual(a, b *ecdsa.PublicKey) bool { - // the curve is always the same, just compare the points - return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0 -} diff --git a/datasync/datasync.go b/datasync/datasync.go index 18a5dcc..1e6a089 100644 --- a/datasync/datasync.go +++ b/datasync/datasync.go @@ -2,6 +2,7 @@ package datasync import ( "crypto/ecdsa" + "github.com/golang/protobuf/proto" datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer" datasyncnode "github.com/vacp2p/mvds/node" @@ -31,7 +32,6 @@ func (d *DataSync) Add(publicKey *ecdsa.PublicKey, datasyncMessage datasyncproto } func (d *DataSync) Handle(sender *ecdsa.PublicKey, payload []byte) [][]byte { - var payloads [][]byte logger := d.logger.With(zap.String("site", "Handle")) diff --git a/message_processor.go b/message_processor.go new file mode 100644 index 0000000..53e7bb9 --- /dev/null +++ b/message_processor.go @@ -0,0 +1,467 @@ +package statusproto + +import ( + "context" + "crypto/ecdsa" + "database/sql" + "time" + + "github.com/ethereum/go-ethereum/crypto" + "github.com/golang/protobuf/proto" + "github.com/pkg/errors" + whisper "github.com/status-im/whisper/whisperv6" + "go.uber.org/zap" + + "github.com/status-im/status-protocol-go/datasync" + datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer" + "github.com/status-im/status-protocol-go/encryption" + "github.com/status-im/status-protocol-go/encryption/multidevice" + transport "github.com/status-im/status-protocol-go/transport/whisper" + protocol "github.com/status-im/status-protocol-go/v1" + datasyncnode "github.com/vacp2p/mvds/node" + datasyncproto "github.com/vacp2p/mvds/protobuf" +) + +// Whisper message properties. +const ( + whisperTTL = 15 + whisperPoW = 0.002 + whisperPoWTime = 5 +) + +type messageProcessor struct { + identity *ecdsa.PrivateKey + datasync *datasync.DataSync + protocol *encryption.Protocol + transport *transport.WhisperServiceTransport + logger *zap.Logger + + featureFlags featureFlags +} + +func newMessageProcessor( + identity *ecdsa.PrivateKey, + database *sql.DB, + enc *encryption.Protocol, + transport *transport.WhisperServiceTransport, + logger *zap.Logger, + features featureFlags, +) (*messageProcessor, error) { + + dataSyncTransport := datasync.NewDataSyncNodeTransport() + dataSyncNode, err := datasyncnode.NewPersistentNode( + database, + dataSyncTransport, + datasyncpeer.PublicKeyToPeerID(identity.PublicKey), + datasyncnode.BATCH, + datasync.CalculateSendTime, + logger, + ) + if err != nil { + return nil, err + } + ds := datasync.New(dataSyncNode, dataSyncTransport, features.datasync, logger) + + p := &messageProcessor{ + identity: identity, + datasync: ds, + protocol: enc, + transport: transport, + logger: logger, + featureFlags: features, + } + + // Initializing DataSync is required to encrypt and send messages. + // With DataSync enabled, messages are added to the DataSync + // but actual encrypt and send calls are postponed. + // sendDataSync is responsible for encrypting and sending postponed messages. + if features.datasync { + ds.Init(p.sendDataSync) + ds.Start(300 * time.Millisecond) + } + + return p, nil +} + +func (p *messageProcessor) Stop() { + p.datasync.Stop() // idempotent op +} + +func (p *messageProcessor) SendPrivate( + ctx context.Context, + publicKey *ecdsa.PublicKey, + chatID string, + data []byte, + clock int64, +) ([]byte, *protocol.Message, error) { + logger := p.logger.With(zap.String("site", "SendPrivate")) + logger.Debug("sending a private message", zap.Binary("public-key", crypto.FromECDSAPub(publicKey))) + + message := protocol.CreatePrivateTextMessage(data, clock, chatID) + encodedMessage, err := p.encodeMessage(message) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to encode message") + } + + messageID, err := p.SendPrivateRaw(ctx, publicKey, encodedMessage) + if err != nil { + return nil, nil, err + } + + return messageID, &message, nil +} + +// SendPrivateRaw takes encoded data, encrypts it and sends through the wire. +// DEPRECATED +func (p *messageProcessor) SendPrivateRaw( + ctx context.Context, + publicKey *ecdsa.PublicKey, + data []byte, +) ([]byte, error) { + p.logger.Debug( + "sending a private message", + zap.Binary("public-key", crypto.FromECDSAPub(publicKey)), + zap.String("site", "SendPrivateRaw"), + ) + + wrappedMessage, err := p.tryWrapMessageV1(data) + if err != nil { + return nil, errors.Wrap(err, "failed to wrap message") + } + + messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage) + + if p.featureFlags.datasync { + if err := p.addToDataSync(publicKey, wrappedMessage); err != nil { + return nil, errors.Wrap(err, "failed to send message with datasync") + } + } else { + messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, wrappedMessage) + if err != nil { + return nil, errors.Wrap(err, "failed to encrypt message") + } + + hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec) + if err != nil { + return nil, errors.Wrap(err, "failed to send a message spec") + } + + p.transport.Track([][]byte{messageID}, hash, *newMessage) + } + + return messageID, nil +} + +func (p *messageProcessor) SendPublic(ctx context.Context, chatName, chatID string, data []byte, clock int64) ([]byte, error) { + logger := p.logger.With(zap.String("site", "SendPublic")) + logger.Debug("sending a public message", zap.String("chat-name", chatName)) + + message := protocol.CreatePublicTextMessage(data, clock, chatName) + + encodedMessage, err := p.encodeMessage(message) + if err != nil { + return nil, errors.Wrap(err, "failed to encode message") + } + + wrappedMessage, err := p.tryWrapMessageV1(encodedMessage) + if err != nil { + return nil, errors.Wrap(err, "failed to wrap message") + } + + messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage) + if err != nil { + return nil, errors.Wrap(err, "failed to build public message") + } + + newMessage, err := messageSpecToWhisper(messageSpec) + if err != nil { + return nil, err + } + + hash, err := p.transport.SendPublic(ctx, &newMessage, chatName) + if err != nil { + return nil, err + } + messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage) + + p.transport.Track([][]byte{messageID}, hash, newMessage) + + return messageID, nil +} + +// SendPublicRaw takes encoded data, encrypts it and sends through the wire. +// DEPRECATED +func (p *messageProcessor) SendPublicRaw(ctx context.Context, chatName string, data []byte) ([]byte, error) { + var newMessage whisper.NewMessage + + wrappedMessage, err := p.tryWrapMessageV1(data) + if err != nil { + return nil, errors.Wrap(err, "failed to wrap message") + } + + newMessage = whisper.NewMessage{ + TTL: whisperTTL, + Payload: wrappedMessage, + PowTarget: whisperPoW, + PowTime: whisperPoWTime, + } + + hash, err := p.transport.SendPublic(ctx, &newMessage, chatName) + if err != nil { + return nil, err + } + + messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage) + + p.transport.Track([][]byte{messageID}, hash, newMessage) + + return messageID, nil +} + +func (p *messageProcessor) Process(messages []*whisper.ReceivedMessage) ([]*protocol.Message, error) { + logger := p.logger.With(zap.String("site", "handleRetrievedMessages")) + + decodedMessages := make([]*protocol.Message, 0, len(messages)) + for _, item := range messages { + shhMessage := whisper.ToWhisperMessage(item) + + hlogger := logger.With(zap.Binary("hash", shhMessage.Hash)) + hlogger.Debug("handling a received message") + + statusMessages, err := p.handleMessages(shhMessage, true) + if err != nil { + hlogger.Info("failed to decode messages", zap.Error(err)) + continue + } + + for _, statusMessage := range statusMessages { + switch m := statusMessage.ParsedMessage.(type) { + case protocol.Message: + m.ID = statusMessage.ID + m.SigPubKey = statusMessage.SigPubKey() + decodedMessages = append(decodedMessages, &m) + case protocol.PairMessage: + fromOurDevice := isPubKeyEqual(statusMessage.SigPubKey(), &p.identity.PublicKey) + if !fromOurDevice { + hlogger.Debug("received PairMessage from not our device, skipping") + break + } + + metadata := &multidevice.InstallationMetadata{ + Name: m.Name, + FCMToken: m.FCMToken, + DeviceType: m.DeviceType, + } + err := p.protocol.SetInstallationMetadata(&p.identity.PublicKey, m.InstallationID, metadata) + if err != nil { + return nil, err + } + default: + hlogger.Error("skipped a public message of unsupported type") + } + } + } + return decodedMessages, nil +} + +// handleMessages expects a whisper message as input, and it will go through +// a series of transformations until the message is parsed into an application +// layer message, or in case of Raw methods, the processing stops at the layer +// before +func (p *messageProcessor) handleMessages(shhMessage *whisper.Message, applicationLayer bool) ([]*protocol.StatusMessage, error) { + logger := p.logger.With(zap.String("site", "handleMessages")) + hlogger := logger.With(zap.Binary("hash", shhMessage.Hash)) + var statusMessage protocol.StatusMessage + + err := statusMessage.HandleTransport(shhMessage) + if err != nil { + hlogger.Error("failed to handle transport layer message", zap.Error(err)) + return nil, err + } + + err = p.handleEncryptionLayer(context.Background(), &statusMessage) + if err != nil { + hlogger.Debug("failed to handle an encryption message", zap.Error(err)) + } + + statusMessages, err := statusMessage.HandleDatasync(p.datasync) + if err != nil { + hlogger.Debug("failed to handle datasync message", zap.Error(err)) + } + + for _, statusMessage := range statusMessages { + err := statusMessage.HandleApplicationMetadata() + if err != nil { + hlogger.Error("failed to handle application metadata layer message", zap.Error(err)) + } + + if applicationLayer { + err = statusMessage.HandleApplication() + if err != nil { + hlogger.Error("failed to handle application layer message") + } + } + } + + return statusMessages, nil +} + +func (p *messageProcessor) handleEncryptionLayer(ctx context.Context, message *protocol.StatusMessage) error { + logger := p.logger.With(zap.String("site", "handleEncryptionLayer")) + publicKey := message.SigPubKey() + + err := message.HandleEncryption(p.identity, publicKey, p.protocol) + if err == encryption.ErrDeviceNotFound { + handleErr := p.handleErrDeviceNotFound(ctx, publicKey) + if handleErr != nil { + logger.Error("failed to handle error", zap.Error(err), zap.NamedError("handleErr", handleErr)) + } + } + if err != nil { + return errors.Wrap(err, "failed to process an encrypted message") + } + + return nil +} + +func (p *messageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error { + now := time.Now().Unix() + advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now) + if err != nil { + return err + } + if !advertise { + return nil + } + + messageSpec, err := p.protocol.BuildBundleAdvertiseMessage(p.identity, publicKey) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + _, _, err = p.sendMessageSpec(ctx, publicKey, messageSpec) + if err != nil { + return err + } + + p.protocol.ConfirmBundleAdvertisement(publicKey, now) + + return nil +} + +func (p *messageProcessor) encodeMessage(message protocol.Message) ([]byte, error) { + encodedMessage, err := protocol.EncodeMessage(message) + if err != nil { + return nil, errors.Wrap(err, "failed to encode message") + } + return encodedMessage, nil +} + +func (p *messageProcessor) tryWrapMessageV1(encodedMessage []byte) ([]byte, error) { + if p.featureFlags.sendV1Messages { + wrappedMessage, err := protocol.WrapMessageV1(encodedMessage, p.identity) + if err != nil { + return nil, errors.Wrap(err, "failed to wrap message") + } + return wrappedMessage, nil + } + return encodedMessage, nil +} + +func (p *messageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) error { + groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey) + peerID := datasyncpeer.PublicKeyToPeerID(*publicKey) + exist, err := p.datasync.IsPeerInGroup(groupID, peerID) + if err != nil { + return errors.Wrap(err, "failed to check if peer is in group") + } + if !exist { + if err := p.datasync.AddPeer(groupID, peerID); err != nil { + return errors.Wrap(err, "failed to add peer") + } + } + _, err = p.datasync.AppendMessage(groupID, message) + if err != nil { + return errors.Wrap(err, "failed to append message to datasync") + } + + return nil +} + +// sendDataSync sends a message scheduled by the data sync layer. +func (p *messageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error { + messageIDs := make([][]byte, 0, len(payload.Messages)) + for _, payload := range payload.Messages { + messageIDs = append(messageIDs, protocol.MessageID(&p.identity.PublicKey, payload.Body)) + } + + messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, encodedMessage) + if err != nil { + return errors.Wrap(err, "failed to encrypt message") + } + + hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec) + if err != nil { + return err + } + + p.transport.Track(messageIDs, hash, *newMessage) + + return nil +} + +func (p *messageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec) ([]byte, *whisper.NewMessage, error) { + newMessage, err := messageSpecToWhisper(messageSpec) + if err != nil { + return nil, nil, err + } + + logger := p.logger.With(zap.String("site", "sendMessageSpec")) + + var hash []byte + + switch { + case messageSpec.SharedSecret != nil: + logger.Debug("sending using shared secret") + hash, err = p.transport.SendPrivateWithSharedSecret(ctx, &newMessage, publicKey, messageSpec.SharedSecret) + case messageSpec.PartitionedTopicMode() == encryption.PartitionTopicV1: + logger.Debug("sending partitioned topic") + hash, err = p.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey) + case !p.featureFlags.genericDiscoveryTopicEnabled: + logger.Debug("sending partitioned topic (generic discovery topic disabled)") + hash, err = p.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey) + default: + logger.Debug("sending using discovery topic") + hash, err = p.transport.SendPrivateOnDiscovery(ctx, &newMessage, publicKey) + } + if err != nil { + return nil, nil, err + } + + return hash, &newMessage, nil +} + +func messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (whisper.NewMessage, error) { + var newMessage whisper.NewMessage + + payload, err := proto.Marshal(spec.Message) + if err != nil { + return newMessage, err + } + + newMessage = whisper.NewMessage{ + TTL: whisperTTL, + Payload: payload, + PowTarget: whisperPoW, + PowTime: whisperPoWTime, + } + return newMessage, nil +} + +// isPubKeyEqual checks that two public keys are equal +func isPubKeyEqual(a, b *ecdsa.PublicKey) bool { + // the curve is always the same, just compare the points + return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0 +} diff --git a/adapters_test.go b/message_processor_test.go similarity index 67% rename from adapters_test.go rename to message_processor_test.go index c3b2a26..e3f50a6 100644 --- a/adapters_test.go +++ b/message_processor_test.go @@ -1,35 +1,41 @@ package statusproto import ( - "crypto/ecdsa" "io/ioutil" "os" "path/filepath" "testing" - "time" - - "github.com/status-im/status-protocol-go/sqlite" + "github.com/ethereum/go-ethereum/crypto" "github.com/golang/protobuf/proto" - "github.com/status-im/status-protocol-go/encryption" - "github.com/status-im/status-protocol-go/encryption/multidevice" - "github.com/status-im/status-protocol-go/encryption/sharedsecret" - transport "github.com/status-im/status-protocol-go/transport/whisper" + whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/suite" "go.uber.org/zap" - "github.com/ethereum/go-ethereum/crypto" - "github.com/status-im/status-protocol-go/datasync" - datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer" - datasyncnode "github.com/vacp2p/mvds/node" - datasyncproto "github.com/vacp2p/mvds/protobuf" - + "github.com/status-im/status-protocol-go/encryption" + "github.com/status-im/status-protocol-go/encryption/multidevice" + "github.com/status-im/status-protocol-go/encryption/sharedsecret" + "github.com/status-im/status-protocol-go/sqlite" + transport "github.com/status-im/status-protocol-go/transport/whisper" protocol "github.com/status-im/status-protocol-go/v1" - whisper "github.com/status-im/whisper/whisperv6" + datasyncproto "github.com/vacp2p/mvds/protobuf" ) -var ( - testMessageStruct = protocol.Message{ +func TestMessageProcessorSuite(t *testing.T) { + suite.Run(t, new(MessageProcessorSuite)) +} + +type MessageProcessorSuite struct { + suite.Suite + + processor *messageProcessor + tmpDir string + testMessage protocol.Message + logger *zap.Logger +} + +func (s *MessageProcessorSuite) SetupTest() { + s.testMessage = protocol.Message{ Text: "abc123", ContentT: "text/plain", MessageT: "public-group-user-message", @@ -40,53 +46,19 @@ var ( Text: "abc123", }, } -) -func TestAdaptersSuite(t *testing.T) { - suite.Run(t, new(AdaptersSuite)) -} - -type AdaptersSuite struct { - suite.Suite - - a *whisperAdapter - tmpDir string - privateKey *ecdsa.PrivateKey - senderEncryptionProtocol *encryption.Protocol - logger *zap.Logger -} - -func (s *AdaptersSuite) SetupTest() { var err error - logger, err := zap.NewDevelopment() - s.Require().NoError(err) - s.logger = logger - - s.tmpDir, err = ioutil.TempDir("", "adapters-test") + s.logger, err = zap.NewDevelopment() s.Require().NoError(err) - database, err := sqlite.Open(filepath.Join(s.tmpDir, "transport.db.sql"), "some-key") + s.tmpDir, err = ioutil.TempDir("", "") s.Require().NoError(err) - s.privateKey, err = crypto.GenerateKey() + identity, err := crypto.GenerateKey() s.Require().NoError(err) - whisperConfig := whisper.DefaultConfig - whisperConfig.MinimumAcceptedPOW = 0 - shh := whisper.New(&whisperConfig) - s.Require().NoError(shh.Start(nil)) - config := &config{} - s.Require().NoError(WithDatasync()(config)) - - whisperTransport, err := transport.NewWhisperServiceTransport( - shh, - s.privateKey, - database, - nil, - nil, - logger, - ) + database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key") s.Require().NoError(err) onNewInstallations := func([]*multidevice.Installation) {} @@ -98,79 +70,74 @@ func (s *AdaptersSuite) SetupTest() { onNewInstallations, onNewSharedSecret, onSendContactCode, - logger, + s.logger, ) - senderDatabase, err := sqlite.Open(filepath.Join(s.tmpDir, "sender.db.sql"), "some-key") - s.Require().NoError(err) - s.senderEncryptionProtocol = encryption.New( - senderDatabase, - "installation-2", - onNewInstallations, - onNewSharedSecret, - onSendContactCode, - logger, - ) + whisperConfig := whisper.DefaultConfig + whisperConfig.MinimumAcceptedPOW = 0 + shh := whisper.New(&whisperConfig) + s.Require().NoError(shh.Start(nil)) + config := &config{} + s.Require().NoError(WithDatasync()(config)) - dataSyncTransport := datasync.NewDataSyncNodeTransport() - dataSyncNode, err := datasyncnode.NewPersistentNode( - senderDatabase, - dataSyncTransport, - datasyncpeer.PublicKeyToPeerID(s.privateKey.PublicKey), - datasyncnode.BATCH, - datasync.CalculateSendTime, - logger, + whisperTransport, err := transport.NewWhisperServiceTransport( + shh, + identity, + database, + nil, + nil, + s.logger, ) s.Require().NoError(err) - datasync := datasync.New(dataSyncNode, dataSyncTransport, true, logger) - - s.a = newWhisperAdapter( - s.privateKey, - whisperTransport, + s.processor, err = newMessageProcessor( + identity, + database, encryptionProtocol, - datasync, - config.featureFlags, - logger, + whisperTransport, + s.logger, + featureFlags{}, ) - - dataSyncNode.Start(100 * time.Second) + s.Require().NoError(err) } -func (s *AdaptersSuite) TestHandleDecodedMessagesSingle() { +func (s *MessageProcessorSuite) TearDownTest() { + os.Remove(s.tmpDir) + _ = s.logger.Sync() +} +func (s *MessageProcessorSuite) TestHandleDecodedMessagesSingle() { privateKey, err := crypto.GenerateKey() s.Require().NoError(err) - encodedPayload, err := protocol.EncodeMessage(testMessageStruct) + encodedPayload, err := protocol.EncodeMessage(s.testMessage) s.Require().NoError(err) message := &whisper.Message{} message.Sig = crypto.FromECDSAPub(&privateKey.PublicKey) message.Payload = encodedPayload - decodedMessages, err := s.a.handleMessages(message, true) + decodedMessages, err := s.processor.handleMessages(message, true) s.Require().NoError(err) s.Require().Equal(1, len(decodedMessages)) s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload) s.Require().Equal(&privateKey.PublicKey, decodedMessages[0].SigPubKey()) s.Require().Equal(protocol.MessageID(&privateKey.PublicKey, encodedPayload), decodedMessages[0].ID) - s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage) + s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage) } -func (s *AdaptersSuite) TestHandleDecodedMessagesRaw() { - +func (s *MessageProcessorSuite) TestHandleDecodedMessagesRaw() { privateKey, err := crypto.GenerateKey() s.Require().NoError(err) - encodedPayload, err := protocol.EncodeMessage(testMessageStruct) + encodedPayload, err := protocol.EncodeMessage(s.testMessage) s.Require().NoError(err) message := &whisper.Message{} message.Sig = crypto.FromECDSAPub(&privateKey.PublicKey) message.Payload = encodedPayload - decodedMessages, err := s.a.handleMessages(message, false) + decodedMessages, err := s.processor.handleMessages(message, false) s.Require().NoError(err) s.Require().Equal(1, len(decodedMessages)) s.Require().Equal(message, decodedMessages[0].TransportMessage) @@ -180,14 +147,14 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesRaw() { s.Require().Equal(nil, decodedMessages[0].ParsedMessage) } -func (s *AdaptersSuite) TestHandleDecodedMessagesWrapped() { +func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() { relayerKey, err := crypto.GenerateKey() s.Require().NoError(err) authorKey, err := crypto.GenerateKey() s.Require().NoError(err) - encodedPayload, err := protocol.EncodeMessage(testMessageStruct) + encodedPayload, err := protocol.EncodeMessage(s.testMessage) s.Require().NoError(err) wrappedPayload, err := protocol.WrapMessageV1(encodedPayload, authorKey) @@ -197,25 +164,24 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesWrapped() { message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey) message.Payload = wrappedPayload - decodedMessages, err := s.a.handleMessages(message, true) + decodedMessages, err := s.processor.handleMessages(message, true) s.Require().NoError(err) s.Require().Equal(1, len(decodedMessages)) s.Require().Equal(&authorKey.PublicKey, decodedMessages[0].SigPubKey()) s.Require().Equal(protocol.MessageID(&authorKey.PublicKey, wrappedPayload), decodedMessages[0].ID) s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload) - s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage) + s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage) } -func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() { - +func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() { relayerKey, err := crypto.GenerateKey() s.Require().NoError(err) authorKey, err := crypto.GenerateKey() s.Require().NoError(err) - encodedPayload, err := protocol.EncodeMessage(testMessageStruct) + encodedPayload, err := protocol.EncodeMessage(s.testMessage) s.Require().NoError(err) wrappedPayload, err := protocol.WrapMessageV1(encodedPayload, authorKey) @@ -223,8 +189,8 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() { dataSyncMessage := datasyncproto.Payload{ Messages: []*datasyncproto.Message{ - &datasyncproto.Message{Body: encodedPayload}, - &datasyncproto.Message{Body: wrappedPayload}, + {Body: encodedPayload}, + {Body: wrappedPayload}, }, } marshalledDataSyncMessage, err := proto.Marshal(&dataSyncMessage) @@ -233,7 +199,7 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() { message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey) message.Payload = marshalledDataSyncMessage - decodedMessages, err := s.a.handleMessages(message, true) + decodedMessages, err := s.processor.handleMessages(message, true) s.Require().NoError(err) // We send two messages, the unwrapped one will be attributed to the relayer, while the wrapped one will be attributed to the author @@ -241,22 +207,22 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() { s.Require().Equal(&relayerKey.PublicKey, decodedMessages[0].SigPubKey()) s.Require().Equal(protocol.MessageID(&relayerKey.PublicKey, encodedPayload), decodedMessages[0].ID) s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload) - s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage) + s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage) s.Require().Equal(&authorKey.PublicKey, decodedMessages[1].SigPubKey()) s.Require().Equal(protocol.MessageID(&authorKey.PublicKey, wrappedPayload), decodedMessages[1].ID) s.Require().Equal(encodedPayload, decodedMessages[1].DecryptedPayload) - s.Require().Equal(testMessageStruct, decodedMessages[1].ParsedMessage) + s.Require().Equal(s.testMessage, decodedMessages[1].ParsedMessage) } -func (s *AdaptersSuite) TestHandleDecodedMessagesDatasyncEncrypted() { +func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() { relayerKey, err := crypto.GenerateKey() s.Require().NoError(err) authorKey, err := crypto.GenerateKey() s.Require().NoError(err) - encodedPayload, err := protocol.EncodeMessage(testMessageStruct) + encodedPayload, err := protocol.EncodeMessage(s.testMessage) s.Require().NoError(err) wrappedPayload, err := protocol.WrapMessageV1(encodedPayload, authorKey) @@ -271,7 +237,23 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasyncEncrypted() { marshalledDataSyncMessage, err := proto.Marshal(&dataSyncMessage) s.Require().NoError(err) - messageSpec, err := s.senderEncryptionProtocol.BuildDirectMessage(relayerKey, &s.privateKey.PublicKey, marshalledDataSyncMessage) + // Create sender encryption protocol. + senderDatabase, err := sqlite.Open(filepath.Join(s.tmpDir, "sender.db.sql"), "") + s.Require().NoError(err) + senderEncryptionProtocol := encryption.New( + senderDatabase, + "installation-2", + func([]*multidevice.Installation) {}, + func([]*sharedsecret.Secret) {}, + func(*encryption.ProtocolMessageSpec) {}, + s.logger, + ) + + messageSpec, err := senderEncryptionProtocol.BuildDirectMessage( + relayerKey, + &s.processor.identity.PublicKey, + marshalledDataSyncMessage, + ) s.Require().NoError(err) encryptedPayload, err := proto.Marshal(messageSpec.Message) @@ -281,24 +263,19 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasyncEncrypted() { message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey) message.Payload = encryptedPayload - decodedMessages, err := s.a.handleMessages(message, true) + decodedMessages, err := s.processor.handleMessages(message, true) s.Require().NoError(err) - // We send two messages, the unwrapped one will be attributed to the relayer, while the wrapped one will be attributed to the author + // We send two messages, the unwrapped one will be attributed to the relayer, + // while the wrapped one will be attributed to the author. s.Require().Equal(2, len(decodedMessages)) s.Require().Equal(&relayerKey.PublicKey, decodedMessages[0].SigPubKey()) s.Require().Equal(protocol.MessageID(&relayerKey.PublicKey, encodedPayload), decodedMessages[0].ID) s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload) - s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage) + s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage) s.Require().Equal(&authorKey.PublicKey, decodedMessages[1].SigPubKey()) s.Require().Equal(protocol.MessageID(&authorKey.PublicKey, wrappedPayload), decodedMessages[1].ID) s.Require().Equal(encodedPayload, decodedMessages[1].DecryptedPayload) - s.Require().Equal(testMessageStruct, decodedMessages[1].ParsedMessage) -} - -func (s *AdaptersSuite) TearDownTest() { - os.Remove(s.tmpDir) - - _ = s.logger.Sync() + s.Require().Equal(s.testMessage, decodedMessages[1].ParsedMessage) } diff --git a/messenger.go b/messenger.go index ec29105..48494f2 100644 --- a/messenger.go +++ b/messenger.go @@ -6,20 +6,17 @@ import ( "database/sql" "time" - "go.uber.org/zap" - + "github.com/ethereum/go-ethereum/crypto" "github.com/pkg/errors" whisper "github.com/status-im/whisper/whisperv6" + "go.uber.org/zap" - "github.com/status-im/status-protocol-go/datasync" - datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer" "github.com/status-im/status-protocol-go/encryption" "github.com/status-im/status-protocol-go/encryption/multidevice" "github.com/status-im/status-protocol-go/encryption/sharedsecret" "github.com/status-im/status-protocol-go/sqlite" transport "github.com/status-im/status-protocol-go/transport/whisper" protocol "github.com/status-im/status-protocol-go/v1" - datasyncnode "github.com/vacp2p/mvds/node" ) var ( @@ -37,11 +34,12 @@ var ( type Messenger struct { identity *ecdsa.PrivateKey persistence *sqlitePersistence - adapter *whisperAdapter + transport *transport.WhisperServiceTransport encryptor *encryption.Protocol + processor *messageProcessor logger *zap.Logger - ownMessages map[string][]*protocol.Message + ownMessages []*protocol.Message featureFlags featureFlags messagesPersistenceEnabled bool shutdownTasks []func() error @@ -212,9 +210,17 @@ func NewMessenger( c.onSendContactCodeHandler = func(messageSpec *encryption.ProtocolMessageSpec) { slogger := logger.With(zap.String("site", "onSendContactCodeHandler")) slogger.Info("received a SendContactCode request") + + newMessage, err := messageSpecToWhisper(messageSpec) + if err != nil { + slogger.Warn("failed to convert spec to Whisper message", zap.Error(err)) + return + } + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _, err := messenger.adapter.SendContactCode(ctx, messageSpec) + chatName := transport.ContactCodeTopic(&messenger.identity.PublicKey) + _, err = messenger.transport.SendPublic(ctx, &newMessage, chatName) if err != nil { slogger.Warn("failed to send a contact code", zap.Error(err)) } @@ -265,40 +271,34 @@ func NewMessenger( logger, ) - // Initialize data sync. - dataSyncTransport := datasync.NewDataSyncNodeTransport() - dataSyncNode, err := datasyncnode.NewPersistentNode( + processor, err := newMessageProcessor( + identity, database, - dataSyncTransport, - datasyncpeer.PublicKeyToPeerID(identity.PublicKey), - datasyncnode.BATCH, - datasync.CalculateSendTime, + encryptionProtocol, + t, logger, + c.featureFlags, ) if err != nil { - return nil, errors.Wrap(err, "failed to create a persistent datasync node") + return nil, errors.Wrap(err, "failed to create messageProcessor") } - datasync := datasync.New(dataSyncNode, dataSyncTransport, c.featureFlags.datasync, logger) - - adapter := newWhisperAdapter(identity, t, encryptionProtocol, datasync, c.featureFlags, logger) - messenger = &Messenger{ identity: identity, persistence: &sqlitePersistence{db: database}, - adapter: adapter, + transport: t, encryptor: encryptionProtocol, - ownMessages: make(map[string][]*protocol.Message), + processor: processor, featureFlags: c.featureFlags, messagesPersistenceEnabled: c.messagesPersistenceEnabled, shutdownTasks: []func() error{ database.Close, - adapter.transport.Reset, - func() error { datasync.Stop(); return nil }, + t.Reset, + t.Stop, + func() error { processor.Stop(); return nil }, // Currently this often fails, seems like it's safe to ignore them // https://github.com/uber-go/zap/issues/328 func() error { _ = logger.Sync; return nil }, - func() error { adapter.Stop(); return nil }, }, logger: logger, } @@ -308,9 +308,6 @@ func NewMessenger( if err := encryptionProtocol.Start(identity); err != nil { return nil, err } - if c.featureFlags.datasync { - dataSyncNode.Start(300 * time.Millisecond) - } logger.Debug("messages persistence", zap.Bool("enabled", c.messagesPersistenceEnabled)) @@ -373,7 +370,7 @@ func (m *Messenger) Init() error { publicKeys = append(publicKeys, publicKey) } - _, err = m.adapter.transport.InitFilters(publicChatIDs, publicKeys) + _, err = m.transport.InitFilters(publicChatIDs, publicKeys) return err } @@ -395,7 +392,21 @@ func (m *Messenger) Shutdown() (err error) { } func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) { - return m.adapter.handleSharedSecrets(secrets) + logger := m.logger.With(zap.String("site", "handleSharedSecrets")) + var result []*transport.Filter + for _, secret := range secrets { + logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity))) + fSecret := transport.NegotiatedSecret{ + PublicKey: secret.Identity, + Key: secret.Key, + } + filter, err := m.transport.ProcessNegotiatedSecret(fSecret) + if err != nil { + return nil, err + } + result = append(result, filter) + } + return result, nil } func (m *Messenger) EnableInstallation(id string) error { @@ -436,18 +447,18 @@ func (m *Messenger) Mailservers() ([]string, error) { func (m *Messenger) Join(chat Chat) error { if chat.PublicKey != nil { - return m.adapter.JoinPrivate(chat.PublicKey) + return m.transport.JoinPrivate(chat.PublicKey) } else if chat.Name != "" { - return m.adapter.JoinPublic(chat.Name) + return m.transport.JoinPublic(chat.Name) } return errors.New("chat is neither public nor private") } func (m *Messenger) Leave(chat Chat) error { if chat.PublicKey != nil { - return m.adapter.LeavePrivate(chat.PublicKey) + return m.transport.LeavePrivate(chat.PublicKey) } else if chat.Name != "" { - return m.adapter.LeavePublic(chat.Name) + return m.transport.LeavePublic(chat.Name) } return errors.New("chat is neither public nor private") } @@ -477,6 +488,8 @@ func (m *Messenger) Contacts() ([]*Contact, error) { } func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, error) { + logger := m.logger.With(zap.String("site", "Send"), zap.String("chatID", chat.ID)) + chatID := chat.ID if chatID == "" { return nil, ErrChatIDEmpty @@ -487,8 +500,12 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e return nil, err } + logger.Debug("last message clock received", zap.Int64("clock", clock)) + if chat.PublicKey != nil { - hash, message, err := m.adapter.SendPrivate(ctx, chat.PublicKey, chat.ID, data, clock) + logger.Debug("sending private message", zap.Binary("publicKey", crypto.FromECDSAPub(chat.PublicKey))) + + hash, message, err := m.processor.SendPrivate(ctx, chat.PublicKey, chat.ID, data, clock) if err != nil { return nil, err } @@ -498,18 +515,19 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e message.SigPubKey = &m.identity.PublicKey if m.messagesPersistenceEnabled { - _, err = m.persistence.SaveMessages(chat.ID, []*protocol.Message{message}) + _, err = m.persistence.SaveMessages([]*protocol.Message{message}) if err != nil { return nil, err } } // Cache it to be returned in Retrieve(). - m.ownMessages[chatID] = append(m.ownMessages[chatID], message) + m.ownMessages = append(m.ownMessages, message) return hash, nil } else if chat.Name != "" { - return m.adapter.SendPublic(ctx, chat.Name, chat.ID, data, clock) + logger.Debug("sending public message", zap.String("chatName", chat.Name)) + return m.processor.SendPublic(ctx, chat.Name, chat.ID, data, clock) } return nil, errors.New("chat is neither public nor private") } @@ -518,9 +536,9 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e // DEPRECATED func (m *Messenger) SendRaw(ctx context.Context, chat Chat, data []byte) ([]byte, error) { if chat.PublicKey != nil { - return m.adapter.SendPrivateRaw(ctx, chat.PublicKey, data) + return m.processor.SendPrivateRaw(ctx, chat.PublicKey, data) } else if chat.Name != "" { - return m.adapter.SendPublicRaw(ctx, chat.Name, data) + return m.processor.SendPublicRaw(ctx, chat.Name, data) } return nil, errors.New("chat is neither public nor private") } @@ -538,129 +556,96 @@ var ( ) // RetrieveAll retrieves all previously fetched messages -func (m *Messenger) RetrieveAll(ctx context.Context, c RetrieveConfig) (allMessages []*protocol.Message, err error) { - latest, err := m.adapter.RetrieveAllMessages() +func (m *Messenger) RetrieveAll(ctx context.Context, c RetrieveConfig) ([]*protocol.Message, error) { + latest, err := m.transport.RetrieveAllMessages() if err != nil { - err = errors.Wrap(err, "failed to retrieve messages") - return + return nil, errors.Wrap(err, "failed to retrieve messages") } - for _, messages := range latest { - chatID := messages.ChatID + logger := m.logger.With(zap.String("site", "RetrieveAll")) + logger.Debug("retrieved messages grouped by chat", zap.Int("count", len(latest))) - _, err = m.persistence.SaveMessages(chatID, messages.Messages) + var result []*protocol.Message + + for _, chat := range latest { + logger.Debug("processing chat", zap.String("chatID", chat.ChatID)) + protoMessages, err := m.processor.Process(chat.Messages) if err != nil { - return nil, errors.Wrap(err, "failed to save messages") + return nil, err } - - if !messages.Public { - // Return any own messages for this chat as well. - if ownMessages, ok := m.ownMessages[chatID]; ok { - messages.Messages = append(messages.Messages, ownMessages...) - } - } - - retrievedMessages, err := m.retrieveSaved(ctx, chatID, c, messages.Messages) - if err != nil { - return nil, errors.Wrap(err, "failed to get saved messages") - } - - allMessages = append(allMessages, retrievedMessages...) - } - - // Delete own messages as they were added to the result. - for _, messages := range latest { - if !messages.Public { - delete(m.ownMessages, messages.ChatID) - } - } - - return -} - -func (m *Messenger) Retrieve(ctx context.Context, chat Chat, c RetrieveConfig) (messages []*protocol.Message, err error) { - var ( - latest []*protocol.Message - ownLatest []*protocol.Message - ) - - if chat.PublicKey != nil { - latest, err = m.adapter.RetrievePrivateMessages(chat.PublicKey) - // Return any own messages for this chat as well. - if ownMessages, ok := m.ownMessages[chat.ID]; ok { - ownLatest = ownMessages - } - } else if chat.Name != "" { - latest, err = m.adapter.RetrievePublicMessages(chat.Name) - } else { - return nil, errors.New("chat is neither public nor private") + result = append(result, protoMessages...) } + _, err = m.persistence.SaveMessages(result) if err != nil { - err = errors.Wrap(err, "failed to retrieve messages") - return + return nil, errors.Wrap(err, "failed to save messages") } - if m.messagesPersistenceEnabled { - _, err = m.persistence.SaveMessages(chat.ID, latest) - if err != nil { - return nil, errors.Wrap(err, "failed to save latest messages") - } - } - - // Confirm received and decrypted messages. - if m.messagesPersistenceEnabled && chat.PublicKey != nil { - for _, message := range latest { - // Confirm received and decrypted messages. - if err := m.encryptor.ConfirmMessageProcessed(message.ID); err != nil { - return nil, errors.Wrap(err, "failed to confirm message being processed") - } - } - } - - // We may need to add more messages from the past. - result, err := m.retrieveSaved(ctx, chat.ID, c, append(latest, ownLatest...)) + retrievedMessages, err := m.retrieveSaved(ctx, c) if err != nil { - return nil, err + return nil, errors.Wrap(err, "failed to get saved messages") } + result = append(result, retrievedMessages...) - // When our messages are returned, we can delete them. - delete(m.ownMessages, chat.ID) + // Include own messages. + result = append(result, m.ownMessages...) + m.ownMessages = nil return result, nil } -func (m *Messenger) retrieveSaved(ctx context.Context, chatID string, c RetrieveConfig, latest []*protocol.Message) (messages []*protocol.Message, err error) { +func (m *Messenger) retrieveSaved(ctx context.Context, c RetrieveConfig) (messages []*protocol.Message, err error) { if !m.messagesPersistenceEnabled { - return latest, nil + return nil, nil } if !c.latest { - return m.persistence.Messages(chatID, c.From, c.To) + return m.persistence.Messages(c.From, c.To) } if c.last24Hours { to := time.Now() from := to.Add(-time.Hour * 24) - return m.persistence.Messages(chatID, from, to) + return m.persistence.Messages(from, to) } - return latest, nil + return nil, nil } // DEPRECATED func (m *Messenger) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMessage, error) { - return m.adapter.RetrieveRawAll() + chatWithMessages, err := m.transport.RetrieveRawAll() + if err != nil { + return nil, err + } + + logger := m.logger.With(zap.String("site", "RetrieveRawAll")) + result := make(map[transport.Filter][]*protocol.StatusMessage) + + for chat, messages := range chatWithMessages { + for _, message := range messages { + shhMessage := whisper.ToWhisperMessage(message) + // TODO: fix this to use an exported method. + statusMessages, err := m.processor.handleMessages(shhMessage, false) + if err != nil { + logger.Info("failed to decode messages", zap.Error(err)) + continue + } + result[chat] = append(result[chat], statusMessages...) + } + } + + return result, nil } // DEPRECATED func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) { - return m.adapter.transport.LoadFilters(filters) + return m.transport.LoadFilters(filters) } // DEPRECATED func (m *Messenger) RemoveFilters(filters []*transport.Filter) error { - return m.adapter.transport.RemoveFilters(filters) + return m.transport.RemoveFilters(filters) } // DEPRECATED diff --git a/messenger_test.go b/messenger_test.go index 76a694a..3e5986b 100644 --- a/messenger_test.go +++ b/messenger_test.go @@ -10,14 +10,11 @@ import ( "time" "github.com/ethereum/go-ethereum/common/hexutil" - + "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/status-protocol-go/tt" - + whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/suite" "go.uber.org/zap" - - "github.com/ethereum/go-ethereum/crypto" - whisper "github.com/status-im/whisper/whisperv6" ) func TestMessengerSuite(t *testing.T) { @@ -196,7 +193,7 @@ func (s *MessengerSuite) TestInit() { tc.Prep() err := s.m.Init() s.Require().NoError(err) - filters := s.m.adapter.transport.Filters() + filters := s.m.transport.Filters() expectedFilters += tc.AddedFilters s.Equal(expectedFilters, len(filters)) }) @@ -225,12 +222,12 @@ func (s *MessengerSuite) TestRetrievePublic() { time.Sleep(time.Millisecond * 500) // Retrieve chat - messages, err := s.m.Retrieve(context.Background(), chat, RetrieveLatest) + messages, err := s.m.RetrieveAll(context.Background(), RetrieveLatest) s.NoError(err) s.Len(messages, 1) // Retrieve again to test skipping already existing err. - messages, err = s.m.Retrieve(context.Background(), chat, RetrieveLastDay) + messages, err = s.m.RetrieveAll(context.Background(), RetrieveLastDay) s.NoError(err) s.Require().Len(messages, 1) @@ -252,12 +249,12 @@ func (s *MessengerSuite) TestRetrievePrivate() { time.Sleep(time.Millisecond * 500) // Retrieve chat - messages, err := s.m.Retrieve(context.Background(), chat, RetrieveLatest) + messages, err := s.m.RetrieveAll(context.Background(), RetrieveLatest) s.NoError(err) s.Len(messages, 1) // Retrieve again to test skipping already existing err. - messages, err = s.m.Retrieve(context.Background(), chat, RetrieveLastDay) + messages, err = s.m.RetrieveAll(context.Background(), RetrieveLastDay) s.NoError(err) s.Len(messages, 1) diff --git a/persistence.go b/persistence.go index 8d9da52..7c94637 100644 --- a/persistence.go +++ b/persistence.go @@ -335,19 +335,30 @@ func (db sqlitePersistence) SaveContact(contact Contact, tx *sql.Tx) error { } // Messages returns messages for a given contact, in a given period. Ordered by a timestamp. -func (db sqlitePersistence) Messages(chatID string, from, to time.Time) (result []*protocol.Message, err error) { +func (db sqlitePersistence) Messages(from, to time.Time) (result []*protocol.Message, err error) { rows, err := db.db.Query(`SELECT -id, content_type, message_type, text, clock, timestamp, content_chat_id, content_text, public_key, flags -FROM user_messages WHERE chat_id = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp`, - chatID, protocol.TimestampInMsFromTime(from), protocol.TimestampInMsFromTime(to)) + id, + content_type, + message_type, + text, + clock, + timestamp, + content_chat_id, + content_text, + public_key, + flags + FROM user_messages + WHERE timestamp >= ? AND timestamp <= ? + ORDER BY timestamp`, + protocol.TimestampInMsFromTime(from), + protocol.TimestampInMsFromTime(to), + ) if err != nil { return nil, err } defer rows.Close() - var ( - rst = []*protocol.Message{} - ) + var rst []*protocol.Message for rows.Next() { msg := protocol.Message{ Content: protocol.Content{}, @@ -458,7 +469,7 @@ func (db sqlitePersistence) UnreadMessages(chatID string) ([]*protocol.Message, return result, nil } -func (db sqlitePersistence) SaveMessages(chatID string, messages []*protocol.Message) (last int64, err error) { +func (db sqlitePersistence) SaveMessages(messages []*protocol.Message) (last int64, err error) { var ( tx *sql.Tx stmt *sql.Stmt @@ -491,7 +502,7 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`) pkey, err = marshalECDSAPub(msg.SigPubKey) } rst, err = stmt.Exec( - msg.ID, chatID, msg.ContentT, msg.MessageT, msg.Text, + msg.ID, msg.ChatID, msg.ContentT, msg.MessageT, msg.Text, msg.Clock, msg.Timestamp, msg.Content.ChatID, msg.Content.Text, pkey, msg.Flags) if err != nil { diff --git a/transport/whisper/whisper_service.go b/transport/whisper/whisper_service.go index e7355d5..284e776 100644 --- a/transport/whisper/whisper_service.go +++ b/transport/whisper/whisper_service.go @@ -167,7 +167,11 @@ func (a *WhisperServiceTransport) LeavePublic(chatID string) error { } func (a *WhisperServiceTransport) JoinPrivate(publicKey *ecdsa.PublicKey) error { - _, err := a.filters.LoadContactCode(publicKey) + _, err := a.filters.LoadDiscovery() + if err != nil { + return err + } + _, err = a.filters.LoadContactCode(publicKey) return err } @@ -191,10 +195,11 @@ func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error) return nil, errors.New("failed to return a filter") } - messages := chatMessages[filter.ChatID] - messages.ChatID = filter.ChatID - messages.Public = filter.IsPublic() - messages.Messages = append(messages.Messages, f.Retrieve()...) + ch := chatMessages[filter.ChatID] + ch.ChatID = filter.ChatID + ch.Public = filter.IsPublic() + ch.Messages = append(ch.Messages, f.Retrieve()...) + chatMessages[filter.ChatID] = ch } var result []ChatMessages @@ -354,10 +359,11 @@ func (a *WhisperServiceTransport) Track(identifiers [][]byte, hash []byte, newMe } } -func (a *WhisperServiceTransport) Stop() { +func (a *WhisperServiceTransport) Stop() error { if a.envelopesMonitor != nil { a.envelopesMonitor.Stop() } + return nil } // MessagesRequest is a RequestMessages() request payload. diff --git a/v1/status_message.go b/v1/status_message.go index 6ce6824..a04e8ca 100644 --- a/v1/status_message.go +++ b/v1/status_message.go @@ -2,9 +2,10 @@ package statusproto import ( "crypto/ecdsa" - "github.com/pkg/errors" "log" + "github.com/pkg/errors" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/golang/protobuf/proto" @@ -94,12 +95,17 @@ func (m *StatusMessage) HandleEncryption(myKey *ecdsa.PrivateKey, senderKey *ecd return nil } +// HandleDatasync processes StatusMessage through data sync layer. +// This is optional and DataSync might be nil. In such a case, +// only one payload will be returned equal to DecryptedPayload. func (m *StatusMessage) HandleDatasync(datasync *datasync.DataSync) ([]*StatusMessage, error) { var statusMessages []*StatusMessage + payloads := datasync.Handle( m.SigPubKey(), m.DecryptedPayload, ) + for _, payload := range payloads { message, err := m.Clone() if err != nil {