Replace adapter with message processor (#58)

This commit is contained in:
Adam Babik 2019-08-29 09:01:59 +02:00 committed by GitHub
parent 822d18916e
commit 0dd1cd585b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 715 additions and 886 deletions

View File

@ -1,620 +0,0 @@
package statusproto
import (
"context"
"crypto/ecdsa"
"time"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/status-im/status-protocol-go/encryption/sharedsecret"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
transport "github.com/status-im/status-protocol-go/transport/whisper"
protocol "github.com/status-im/status-protocol-go/v1"
"github.com/status-im/status-protocol-go/datasync"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
datasyncproto "github.com/vacp2p/mvds/protobuf"
)
// Whisper message properties.
const (
whisperTTL = 15
whisperPoW = 0.002
whisperPoWTime = 5
)
// whisperAdapter is a bridge between encryption and transport
// layers.
type whisperAdapter struct {
privateKey *ecdsa.PrivateKey
transport *transport.WhisperServiceTransport
protocol *encryption.Protocol
datasync *datasync.DataSync
logger *zap.Logger
featureFlags featureFlags
}
func newWhisperAdapter(
pk *ecdsa.PrivateKey,
t *transport.WhisperServiceTransport,
p *encryption.Protocol,
d *datasync.DataSync,
featureFlags featureFlags,
logger *zap.Logger,
) *whisperAdapter {
if logger == nil {
logger = zap.NewNop()
}
adapter := &whisperAdapter{
privateKey: pk,
transport: t,
protocol: p,
datasync: d,
featureFlags: featureFlags,
logger: logger.With(zap.Namespace("whisperAdapter")),
}
if featureFlags.datasync {
// We pass our encryption/transport handling to the datasync
// so it's correctly encrypted.
d.Init(adapter.sendDataSync)
}
return adapter
}
func (a *whisperAdapter) JoinPublic(chatID string) error {
return a.transport.JoinPublic(chatID)
}
func (a *whisperAdapter) LeavePublic(chatID string) error {
return a.transport.LeavePublic(chatID)
}
func (a *whisperAdapter) JoinPrivate(publicKey *ecdsa.PublicKey) error {
return a.transport.JoinPrivate(publicKey)
}
func (a *whisperAdapter) LeavePrivate(publicKey *ecdsa.PublicKey) error {
return a.transport.LeavePrivate(publicKey)
}
type ChatMessages struct {
Messages []*protocol.Message
Public bool
ChatID string
}
func (a *whisperAdapter) RetrieveAllMessages() ([]ChatMessages, error) {
chatMessages, err := a.transport.RetrieveAllMessages()
if err != nil {
return nil, err
}
var result []ChatMessages
for _, messages := range chatMessages {
protoMessages, err := a.handleRetrievedMessages(messages.Messages)
if err != nil {
return nil, err
}
result = append(result, ChatMessages{
Messages: protoMessages,
Public: messages.Public,
ChatID: messages.ChatID,
})
}
return result, nil
}
// RetrievePublicMessages retrieves the collected public messages.
// It implies joining a chat if it has not been joined yet.
func (a *whisperAdapter) RetrievePublicMessages(chatID string) ([]*protocol.Message, error) {
messages, err := a.transport.RetrievePublicMessages(chatID)
if err != nil {
return nil, err
}
return a.handleRetrievedMessages(messages)
}
// RetrievePrivateMessages retrieves the collected private messages.
// It implies joining a chat if it has not been joined yet.
func (a *whisperAdapter) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*protocol.Message, error) {
messages, err := a.transport.RetrievePrivateMessages(publicKey)
if err != nil {
return nil, err
}
return a.handleRetrievedMessages(messages)
}
func (a *whisperAdapter) handleRetrievedMessages(messages []*whisper.ReceivedMessage) ([]*protocol.Message, error) {
logger := a.logger.With(zap.String("site", "handleRetrievedMessages"))
decodedMessages := make([]*protocol.Message, 0, len(messages))
for _, item := range messages {
shhMessage := whisper.ToWhisperMessage(item)
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
hlogger.Debug("handling a received message")
statusMessages, err := a.handleMessages(shhMessage, true)
if err != nil {
hlogger.Info("failed to decode messages", zap.Error(err))
continue
}
for _, statusMessage := range statusMessages {
switch m := statusMessage.ParsedMessage.(type) {
case protocol.Message:
m.ID = statusMessage.ID
m.SigPubKey = statusMessage.SigPubKey()
decodedMessages = append(decodedMessages, &m)
case protocol.PairMessage:
fromOurDevice := isPubKeyEqual(statusMessage.SigPubKey(), &a.privateKey.PublicKey)
if !fromOurDevice {
hlogger.Debug("received PairMessage from not our device, skipping")
break
}
metadata := &multidevice.InstallationMetadata{
Name: m.Name,
FCMToken: m.FCMToken,
DeviceType: m.DeviceType,
}
err := a.protocol.SetInstallationMetadata(&a.privateKey.PublicKey, m.InstallationID, metadata)
if err != nil {
return nil, err
}
default:
hlogger.Error("skipped a public message of unsupported type")
}
}
}
return decodedMessages, nil
}
// DEPRECATED
func (a *whisperAdapter) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMessage, error) {
chatWithMessages, err := a.transport.RetrieveRawAll()
if err != nil {
return nil, err
}
logger := a.logger.With(zap.String("site", "RetrieveRawAll"))
result := make(map[transport.Filter][]*protocol.StatusMessage)
for chat, messages := range chatWithMessages {
for _, message := range messages {
shhMessage := whisper.ToWhisperMessage(message)
statusMessages, err := a.handleMessages(shhMessage, false)
if err != nil {
logger.Info("failed to decode messages", zap.Error(err))
continue
}
result[chat] = append(result[chat], statusMessages...)
}
}
return result, nil
}
// handleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before
func (a *whisperAdapter) handleMessages(shhMessage *whisper.Message, applicationLayer bool) ([]*protocol.StatusMessage, error) {
logger := a.logger.With(zap.String("site", "handleMessages"))
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
var statusMessage protocol.StatusMessage
err := statusMessage.HandleTransport(shhMessage)
if err != nil {
hlogger.Error("failed to handle transport layer message", zap.Error(err))
return nil, err
}
err = a.handleEncryptionLayer(context.Background(), &statusMessage)
if err != nil {
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
}
statusMessages, err := statusMessage.HandleDatasync(a.datasync)
if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err))
}
for _, statusMessage := range statusMessages {
err := statusMessage.HandleApplicationMetadata()
if err != nil {
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
}
if applicationLayer {
err = statusMessage.HandleApplication()
if err != nil {
hlogger.Error("failed to handle application layer message")
}
}
}
return statusMessages, nil
}
func (a *whisperAdapter) handleEncryptionLayer(ctx context.Context, message *protocol.StatusMessage) error {
publicKey := message.SigPubKey()
logger := a.logger.With(zap.String("site", "handleEncryptionLayer"))
err := message.HandleEncryption(a.privateKey, publicKey, a.protocol)
if err == encryption.ErrDeviceNotFound {
handleErr := a.handleErrDeviceNotFound(ctx, publicKey)
if handleErr != nil {
logger.Error("failed to handle error", zap.Error(err), zap.NamedError("handleErr", handleErr))
}
}
if err != nil {
return errors.Wrap(err, "failed to process an encrypted message")
}
return nil
}
func (a *whisperAdapter) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := a.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
return err
}
if !advertise {
return nil
}
messageSpec, err := a.protocol.BuildBundleAdvertiseMessage(a.privateKey, publicKey)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, _, err = a.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
a.protocol.ConfirmBundleAdvertisement(publicKey, now)
return nil
}
// SendPublic sends a public message passing chat name to the transport layer.
//
// Be aware that this method returns a message ID using protocol.MessageID
// instead of Whisper message hash.
func (a *whisperAdapter) SendPublic(ctx context.Context, chatName, chatID string, data []byte, clock int64) ([]byte, error) {
logger := a.logger.With(zap.String("site", "SendPublic"))
logger.Debug("sending a public message", zap.String("chat-name", chatName))
message := protocol.CreatePublicTextMessage(data, clock, chatName)
encodedMessage, err := a.encodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
wrappedMessage, err := a.tryWrapMessageV1(encodedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := a.protocol.BuildPublicMessage(a.privateKey, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to build public message")
}
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
hash, err := a.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage)
a.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
// SendPublicRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (a *whisperAdapter) SendPublicRaw(ctx context.Context, chatName string, data []byte) ([]byte, error) {
var newMessage whisper.NewMessage
wrappedMessage, err := a.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
hash, err := a.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage)
a.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
func (a *whisperAdapter) SendContactCode(ctx context.Context, messageSpec *encryption.ProtocolMessageSpec) ([]byte, error) {
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
return a.transport.SendPublic(ctx, &newMessage, transport.ContactCodeTopic(&a.privateKey.PublicKey))
}
func (a *whisperAdapter) tryWrapMessageV1(encodedMessage []byte) ([]byte, error) {
if a.featureFlags.sendV1Messages {
wrappedMessage, err := protocol.WrapMessageV1(encodedMessage, a.privateKey)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
return wrappedMessage, nil
}
return encodedMessage, nil
}
func (a *whisperAdapter) encodeMessage(message protocol.Message) ([]byte, error) {
encodedMessage, err := protocol.EncodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
return encodedMessage, nil
}
// SendPrivate sends a one-to-one message. It needs to return it
// because the registered Whisper filter handles only incoming messages
// and our own messages need to be handled manually.
//
// This might be not true if a shared secret is used because it relies on
// symmetric encryption.
//
// Be aware that this method returns a message ID using protocol.MessageID
// instead of Whisper message hash.
func (a *whisperAdapter) SendPrivate(
ctx context.Context,
publicKey *ecdsa.PublicKey,
chatID string,
data []byte,
clock int64,
) ([]byte, *protocol.Message, error) {
logger := a.logger.With(zap.String("site", "SendPrivate"))
logger.Debug("sending a private message", zap.Binary("public-key", crypto.FromECDSAPub(publicKey)))
message := protocol.CreatePrivateTextMessage(data, clock, chatID)
encodedMessage, err := a.encodeMessage(message)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to encode message")
}
wrappedMessage, err := a.tryWrapMessageV1(encodedMessage)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to wrap message")
}
if a.featureFlags.datasync {
if err := a.sendWithDataSync(publicKey, wrappedMessage); err != nil {
return nil, nil, errors.Wrap(err, "failed to send message with datasync")
}
} else {
err = a.encryptAndSend(ctx, publicKey, wrappedMessage)
if err != nil {
return nil, nil, err
}
}
return protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage), &message, nil
}
func (a *whisperAdapter) sendWithDataSync(publicKey *ecdsa.PublicKey, message []byte) error {
groupID := datasync.ToOneToOneGroupID(&a.privateKey.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := a.datasync.IsPeerInGroup(groupID, peerID)
if err != nil {
return errors.Wrap(err, "failed to check if peer is in group")
}
if !exist {
if err := a.datasync.AddPeer(groupID, peerID); err != nil {
return errors.Wrap(err, "failed to add peer")
}
}
_, err = a.datasync.AppendMessage(groupID, message)
if err != nil {
return errors.Wrap(err, "failed to append message to datasync")
}
return nil
}
// SendPrivateRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (a *whisperAdapter) SendPrivateRaw(
ctx context.Context,
publicKey *ecdsa.PublicKey,
data []byte,
) ([]byte, error) {
a.logger.Debug(
"sending a private message",
zap.Binary("public-key", crypto.FromECDSAPub(publicKey)),
zap.String("site", "SendPrivateRaw"),
)
wrappedMessage, err := a.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage)
if a.featureFlags.datasync {
if err := a.sendWithDataSync(publicKey, wrappedMessage); err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
return messageID, nil
}
hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec)
a.transport.Track([][]byte{messageID}, hash, *newMessage)
return messageID, err
}
func (a *whisperAdapter) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec) ([]byte, *whisper.NewMessage, error) {
var err error
var hash []byte
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
logger := a.logger.With(zap.String("site", "sendMessageSpec"))
switch {
case messageSpec.SharedSecret != nil:
logger.Debug("sending using shared secret")
hash, err = a.transport.SendPrivateWithSharedSecret(ctx, &newMessage, publicKey, messageSpec.SharedSecret)
case messageSpec.PartitionedTopicMode() == encryption.PartitionTopicV1:
logger.Debug("sending partitioned topic")
hash, err = a.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
case !a.featureFlags.genericDiscoveryTopicEnabled:
logger.Debug("sending partitioned topic (generic discovery topic disabled)")
hash, err = a.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
default:
logger.Debug("sending using discovery topic")
hash, err = a.transport.SendPrivateOnDiscovery(ctx, &newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
return hash, &newMessage, nil
}
func (a *whisperAdapter) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, datasyncPayload *datasyncproto.Payload) error {
var messageIDs [][]byte
for _, payload := range datasyncPayload.Messages {
messageIDs = append(messageIDs, protocol.MessageID(&a.privateKey.PublicKey, payload.Body))
}
messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, encodedMessage)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
a.transport.Track(messageIDs, hash, *newMessage)
return nil
}
func (a *whisperAdapter) encryptAndSend(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte) error {
messageID := protocol.MessageID(&a.privateKey.PublicKey, encodedMessage)
messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, encodedMessage)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
a.transport.Track([][]byte{messageID}, hash, *newMessage)
return nil
}
func (a *whisperAdapter) messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (whisper.NewMessage, error) {
var newMessage whisper.NewMessage
payload, err := proto.Marshal(spec.Message)
if err != nil {
return newMessage, err
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
return newMessage, nil
}
func (a *whisperAdapter) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) {
logger := a.logger.With(zap.String("site", "handleSharedSecrets"))
var filters []*transport.Filter
for _, secret := range secrets {
logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity)))
fSecret := transport.NegotiatedSecret{
PublicKey: secret.Identity,
Key: secret.Key,
}
filter, err := a.transport.ProcessNegotiatedSecret(fSecret)
if err != nil {
return nil, err
}
filters = append(filters, filter)
}
return filters, nil
}
func (a *whisperAdapter) Stop() {
a.transport.Stop()
}
// isPubKeyEqual checks that two public keys are equal
func isPubKeyEqual(a, b *ecdsa.PublicKey) bool {
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}

View File

@ -2,6 +2,7 @@ package datasync
import (
"crypto/ecdsa"
"github.com/golang/protobuf/proto"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
datasyncnode "github.com/vacp2p/mvds/node"
@ -31,7 +32,6 @@ func (d *DataSync) Add(publicKey *ecdsa.PublicKey, datasyncMessage datasyncproto
}
func (d *DataSync) Handle(sender *ecdsa.PublicKey, payload []byte) [][]byte {
var payloads [][]byte
logger := d.logger.With(zap.String("site", "Handle"))

467
message_processor.go Normal file
View File

@ -0,0 +1,467 @@
package statusproto
import (
"context"
"crypto/ecdsa"
"database/sql"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
whisper "github.com/status-im/whisper/whisperv6"
"go.uber.org/zap"
"github.com/status-im/status-protocol-go/datasync"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
transport "github.com/status-im/status-protocol-go/transport/whisper"
protocol "github.com/status-im/status-protocol-go/v1"
datasyncnode "github.com/vacp2p/mvds/node"
datasyncproto "github.com/vacp2p/mvds/protobuf"
)
// Whisper message properties.
const (
whisperTTL = 15
whisperPoW = 0.002
whisperPoWTime = 5
)
type messageProcessor struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
protocol *encryption.Protocol
transport *transport.WhisperServiceTransport
logger *zap.Logger
featureFlags featureFlags
}
func newMessageProcessor(
identity *ecdsa.PrivateKey,
database *sql.DB,
enc *encryption.Protocol,
transport *transport.WhisperServiceTransport,
logger *zap.Logger,
features featureFlags,
) (*messageProcessor, error) {
dataSyncTransport := datasync.NewDataSyncNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
database,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
logger,
)
if err != nil {
return nil, err
}
ds := datasync.New(dataSyncNode, dataSyncTransport, features.datasync, logger)
p := &messageProcessor{
identity: identity,
datasync: ds,
protocol: enc,
transport: transport,
logger: logger,
featureFlags: features,
}
// Initializing DataSync is required to encrypt and send messages.
// With DataSync enabled, messages are added to the DataSync
// but actual encrypt and send calls are postponed.
// sendDataSync is responsible for encrypting and sending postponed messages.
if features.datasync {
ds.Init(p.sendDataSync)
ds.Start(300 * time.Millisecond)
}
return p, nil
}
func (p *messageProcessor) Stop() {
p.datasync.Stop() // idempotent op
}
func (p *messageProcessor) SendPrivate(
ctx context.Context,
publicKey *ecdsa.PublicKey,
chatID string,
data []byte,
clock int64,
) ([]byte, *protocol.Message, error) {
logger := p.logger.With(zap.String("site", "SendPrivate"))
logger.Debug("sending a private message", zap.Binary("public-key", crypto.FromECDSAPub(publicKey)))
message := protocol.CreatePrivateTextMessage(data, clock, chatID)
encodedMessage, err := p.encodeMessage(message)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to encode message")
}
messageID, err := p.SendPrivateRaw(ctx, publicKey, encodedMessage)
if err != nil {
return nil, nil, err
}
return messageID, &message, nil
}
// SendPrivateRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (p *messageProcessor) SendPrivateRaw(
ctx context.Context,
publicKey *ecdsa.PublicKey,
data []byte,
) ([]byte, error) {
p.logger.Debug(
"sending a private message",
zap.Binary("public-key", crypto.FromECDSAPub(publicKey)),
zap.String("site", "SendPrivateRaw"),
)
wrappedMessage, err := p.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
if p.featureFlags.datasync {
if err := p.addToDataSync(publicKey, wrappedMessage); err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
} else {
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return nil, errors.Wrap(err, "failed to send a message spec")
}
p.transport.Track([][]byte{messageID}, hash, *newMessage)
}
return messageID, nil
}
func (p *messageProcessor) SendPublic(ctx context.Context, chatName, chatID string, data []byte, clock int64) ([]byte, error) {
logger := p.logger.With(zap.String("site", "SendPublic"))
logger.Debug("sending a public message", zap.String("chat-name", chatName))
message := protocol.CreatePublicTextMessage(data, clock, chatName)
encodedMessage, err := p.encodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
wrappedMessage, err := p.tryWrapMessageV1(encodedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to build public message")
}
newMessage, err := messageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
hash, err := p.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
p.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
// SendPublicRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (p *messageProcessor) SendPublicRaw(ctx context.Context, chatName string, data []byte) ([]byte, error) {
var newMessage whisper.NewMessage
wrappedMessage, err := p.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
hash, err := p.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
p.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
func (p *messageProcessor) Process(messages []*whisper.ReceivedMessage) ([]*protocol.Message, error) {
logger := p.logger.With(zap.String("site", "handleRetrievedMessages"))
decodedMessages := make([]*protocol.Message, 0, len(messages))
for _, item := range messages {
shhMessage := whisper.ToWhisperMessage(item)
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
hlogger.Debug("handling a received message")
statusMessages, err := p.handleMessages(shhMessage, true)
if err != nil {
hlogger.Info("failed to decode messages", zap.Error(err))
continue
}
for _, statusMessage := range statusMessages {
switch m := statusMessage.ParsedMessage.(type) {
case protocol.Message:
m.ID = statusMessage.ID
m.SigPubKey = statusMessage.SigPubKey()
decodedMessages = append(decodedMessages, &m)
case protocol.PairMessage:
fromOurDevice := isPubKeyEqual(statusMessage.SigPubKey(), &p.identity.PublicKey)
if !fromOurDevice {
hlogger.Debug("received PairMessage from not our device, skipping")
break
}
metadata := &multidevice.InstallationMetadata{
Name: m.Name,
FCMToken: m.FCMToken,
DeviceType: m.DeviceType,
}
err := p.protocol.SetInstallationMetadata(&p.identity.PublicKey, m.InstallationID, metadata)
if err != nil {
return nil, err
}
default:
hlogger.Error("skipped a public message of unsupported type")
}
}
}
return decodedMessages, nil
}
// handleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before
func (p *messageProcessor) handleMessages(shhMessage *whisper.Message, applicationLayer bool) ([]*protocol.StatusMessage, error) {
logger := p.logger.With(zap.String("site", "handleMessages"))
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
var statusMessage protocol.StatusMessage
err := statusMessage.HandleTransport(shhMessage)
if err != nil {
hlogger.Error("failed to handle transport layer message", zap.Error(err))
return nil, err
}
err = p.handleEncryptionLayer(context.Background(), &statusMessage)
if err != nil {
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
}
statusMessages, err := statusMessage.HandleDatasync(p.datasync)
if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err))
}
for _, statusMessage := range statusMessages {
err := statusMessage.HandleApplicationMetadata()
if err != nil {
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
}
if applicationLayer {
err = statusMessage.HandleApplication()
if err != nil {
hlogger.Error("failed to handle application layer message")
}
}
}
return statusMessages, nil
}
func (p *messageProcessor) handleEncryptionLayer(ctx context.Context, message *protocol.StatusMessage) error {
logger := p.logger.With(zap.String("site", "handleEncryptionLayer"))
publicKey := message.SigPubKey()
err := message.HandleEncryption(p.identity, publicKey, p.protocol)
if err == encryption.ErrDeviceNotFound {
handleErr := p.handleErrDeviceNotFound(ctx, publicKey)
if handleErr != nil {
logger.Error("failed to handle error", zap.Error(err), zap.NamedError("handleErr", handleErr))
}
}
if err != nil {
return errors.Wrap(err, "failed to process an encrypted message")
}
return nil
}
func (p *messageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
return err
}
if !advertise {
return nil
}
messageSpec, err := p.protocol.BuildBundleAdvertiseMessage(p.identity, publicKey)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, _, err = p.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
p.protocol.ConfirmBundleAdvertisement(publicKey, now)
return nil
}
func (p *messageProcessor) encodeMessage(message protocol.Message) ([]byte, error) {
encodedMessage, err := protocol.EncodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
return encodedMessage, nil
}
func (p *messageProcessor) tryWrapMessageV1(encodedMessage []byte) ([]byte, error) {
if p.featureFlags.sendV1Messages {
wrappedMessage, err := protocol.WrapMessageV1(encodedMessage, p.identity)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
return wrappedMessage, nil
}
return encodedMessage, nil
}
func (p *messageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) error {
groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := p.datasync.IsPeerInGroup(groupID, peerID)
if err != nil {
return errors.Wrap(err, "failed to check if peer is in group")
}
if !exist {
if err := p.datasync.AddPeer(groupID, peerID); err != nil {
return errors.Wrap(err, "failed to add peer")
}
}
_, err = p.datasync.AppendMessage(groupID, message)
if err != nil {
return errors.Wrap(err, "failed to append message to datasync")
}
return nil
}
// sendDataSync sends a message scheduled by the data sync layer.
func (p *messageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
messageIDs := make([][]byte, 0, len(payload.Messages))
for _, payload := range payload.Messages {
messageIDs = append(messageIDs, protocol.MessageID(&p.identity.PublicKey, payload.Body))
}
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, encodedMessage)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
p.transport.Track(messageIDs, hash, *newMessage)
return nil
}
func (p *messageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec) ([]byte, *whisper.NewMessage, error) {
newMessage, err := messageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
logger := p.logger.With(zap.String("site", "sendMessageSpec"))
var hash []byte
switch {
case messageSpec.SharedSecret != nil:
logger.Debug("sending using shared secret")
hash, err = p.transport.SendPrivateWithSharedSecret(ctx, &newMessage, publicKey, messageSpec.SharedSecret)
case messageSpec.PartitionedTopicMode() == encryption.PartitionTopicV1:
logger.Debug("sending partitioned topic")
hash, err = p.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
case !p.featureFlags.genericDiscoveryTopicEnabled:
logger.Debug("sending partitioned topic (generic discovery topic disabled)")
hash, err = p.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
default:
logger.Debug("sending using discovery topic")
hash, err = p.transport.SendPrivateOnDiscovery(ctx, &newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
return hash, &newMessage, nil
}
func messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (whisper.NewMessage, error) {
var newMessage whisper.NewMessage
payload, err := proto.Marshal(spec.Message)
if err != nil {
return newMessage, err
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
return newMessage, nil
}
// isPubKeyEqual checks that two public keys are equal
func isPubKeyEqual(a, b *ecdsa.PublicKey) bool {
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}

View File

@ -1,35 +1,41 @@
package statusproto
import (
"crypto/ecdsa"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
"github.com/status-im/status-protocol-go/sqlite"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
"github.com/status-im/status-protocol-go/encryption/sharedsecret"
transport "github.com/status-im/status-protocol-go/transport/whisper"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-protocol-go/datasync"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
datasyncnode "github.com/vacp2p/mvds/node"
datasyncproto "github.com/vacp2p/mvds/protobuf"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
"github.com/status-im/status-protocol-go/encryption/sharedsecret"
"github.com/status-im/status-protocol-go/sqlite"
transport "github.com/status-im/status-protocol-go/transport/whisper"
protocol "github.com/status-im/status-protocol-go/v1"
whisper "github.com/status-im/whisper/whisperv6"
datasyncproto "github.com/vacp2p/mvds/protobuf"
)
var (
testMessageStruct = protocol.Message{
func TestMessageProcessorSuite(t *testing.T) {
suite.Run(t, new(MessageProcessorSuite))
}
type MessageProcessorSuite struct {
suite.Suite
processor *messageProcessor
tmpDir string
testMessage protocol.Message
logger *zap.Logger
}
func (s *MessageProcessorSuite) SetupTest() {
s.testMessage = protocol.Message{
Text: "abc123",
ContentT: "text/plain",
MessageT: "public-group-user-message",
@ -40,53 +46,19 @@ var (
Text: "abc123",
},
}
)
func TestAdaptersSuite(t *testing.T) {
suite.Run(t, new(AdaptersSuite))
}
type AdaptersSuite struct {
suite.Suite
a *whisperAdapter
tmpDir string
privateKey *ecdsa.PrivateKey
senderEncryptionProtocol *encryption.Protocol
logger *zap.Logger
}
func (s *AdaptersSuite) SetupTest() {
var err error
logger, err := zap.NewDevelopment()
s.Require().NoError(err)
s.logger = logger
s.tmpDir, err = ioutil.TempDir("", "adapters-test")
s.logger, err = zap.NewDevelopment()
s.Require().NoError(err)
database, err := sqlite.Open(filepath.Join(s.tmpDir, "transport.db.sql"), "some-key")
s.tmpDir, err = ioutil.TempDir("", "")
s.Require().NoError(err)
s.privateKey, err = crypto.GenerateKey()
identity, err := crypto.GenerateKey()
s.Require().NoError(err)
whisperConfig := whisper.DefaultConfig
whisperConfig.MinimumAcceptedPOW = 0
shh := whisper.New(&whisperConfig)
s.Require().NoError(shh.Start(nil))
config := &config{}
s.Require().NoError(WithDatasync()(config))
whisperTransport, err := transport.NewWhisperServiceTransport(
shh,
s.privateKey,
database,
nil,
nil,
logger,
)
database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key")
s.Require().NoError(err)
onNewInstallations := func([]*multidevice.Installation) {}
@ -98,79 +70,74 @@ func (s *AdaptersSuite) SetupTest() {
onNewInstallations,
onNewSharedSecret,
onSendContactCode,
logger,
s.logger,
)
senderDatabase, err := sqlite.Open(filepath.Join(s.tmpDir, "sender.db.sql"), "some-key")
s.Require().NoError(err)
s.senderEncryptionProtocol = encryption.New(
senderDatabase,
"installation-2",
onNewInstallations,
onNewSharedSecret,
onSendContactCode,
logger,
)
whisperConfig := whisper.DefaultConfig
whisperConfig.MinimumAcceptedPOW = 0
shh := whisper.New(&whisperConfig)
s.Require().NoError(shh.Start(nil))
config := &config{}
s.Require().NoError(WithDatasync()(config))
dataSyncTransport := datasync.NewDataSyncNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
senderDatabase,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(s.privateKey.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
logger,
whisperTransport, err := transport.NewWhisperServiceTransport(
shh,
identity,
database,
nil,
nil,
s.logger,
)
s.Require().NoError(err)
datasync := datasync.New(dataSyncNode, dataSyncTransport, true, logger)
s.a = newWhisperAdapter(
s.privateKey,
whisperTransport,
s.processor, err = newMessageProcessor(
identity,
database,
encryptionProtocol,
datasync,
config.featureFlags,
logger,
whisperTransport,
s.logger,
featureFlags{},
)
dataSyncNode.Start(100 * time.Second)
s.Require().NoError(err)
}
func (s *AdaptersSuite) TestHandleDecodedMessagesSingle() {
func (s *MessageProcessorSuite) TearDownTest() {
os.Remove(s.tmpDir)
_ = s.logger.Sync()
}
func (s *MessageProcessorSuite) TestHandleDecodedMessagesSingle() {
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
encodedPayload, err := protocol.EncodeMessage(testMessageStruct)
encodedPayload, err := protocol.EncodeMessage(s.testMessage)
s.Require().NoError(err)
message := &whisper.Message{}
message.Sig = crypto.FromECDSAPub(&privateKey.PublicKey)
message.Payload = encodedPayload
decodedMessages, err := s.a.handleMessages(message, true)
decodedMessages, err := s.processor.handleMessages(message, true)
s.Require().NoError(err)
s.Require().Equal(1, len(decodedMessages))
s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload)
s.Require().Equal(&privateKey.PublicKey, decodedMessages[0].SigPubKey())
s.Require().Equal(protocol.MessageID(&privateKey.PublicKey, encodedPayload), decodedMessages[0].ID)
s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage)
s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage)
}
func (s *AdaptersSuite) TestHandleDecodedMessagesRaw() {
func (s *MessageProcessorSuite) TestHandleDecodedMessagesRaw() {
privateKey, err := crypto.GenerateKey()
s.Require().NoError(err)
encodedPayload, err := protocol.EncodeMessage(testMessageStruct)
encodedPayload, err := protocol.EncodeMessage(s.testMessage)
s.Require().NoError(err)
message := &whisper.Message{}
message.Sig = crypto.FromECDSAPub(&privateKey.PublicKey)
message.Payload = encodedPayload
decodedMessages, err := s.a.handleMessages(message, false)
decodedMessages, err := s.processor.handleMessages(message, false)
s.Require().NoError(err)
s.Require().Equal(1, len(decodedMessages))
s.Require().Equal(message, decodedMessages[0].TransportMessage)
@ -180,14 +147,14 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesRaw() {
s.Require().Equal(nil, decodedMessages[0].ParsedMessage)
}
func (s *AdaptersSuite) TestHandleDecodedMessagesWrapped() {
func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() {
relayerKey, err := crypto.GenerateKey()
s.Require().NoError(err)
authorKey, err := crypto.GenerateKey()
s.Require().NoError(err)
encodedPayload, err := protocol.EncodeMessage(testMessageStruct)
encodedPayload, err := protocol.EncodeMessage(s.testMessage)
s.Require().NoError(err)
wrappedPayload, err := protocol.WrapMessageV1(encodedPayload, authorKey)
@ -197,25 +164,24 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesWrapped() {
message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey)
message.Payload = wrappedPayload
decodedMessages, err := s.a.handleMessages(message, true)
decodedMessages, err := s.processor.handleMessages(message, true)
s.Require().NoError(err)
s.Require().Equal(1, len(decodedMessages))
s.Require().Equal(&authorKey.PublicKey, decodedMessages[0].SigPubKey())
s.Require().Equal(protocol.MessageID(&authorKey.PublicKey, wrappedPayload), decodedMessages[0].ID)
s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload)
s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage)
s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage)
}
func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() {
func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() {
relayerKey, err := crypto.GenerateKey()
s.Require().NoError(err)
authorKey, err := crypto.GenerateKey()
s.Require().NoError(err)
encodedPayload, err := protocol.EncodeMessage(testMessageStruct)
encodedPayload, err := protocol.EncodeMessage(s.testMessage)
s.Require().NoError(err)
wrappedPayload, err := protocol.WrapMessageV1(encodedPayload, authorKey)
@ -223,8 +189,8 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() {
dataSyncMessage := datasyncproto.Payload{
Messages: []*datasyncproto.Message{
&datasyncproto.Message{Body: encodedPayload},
&datasyncproto.Message{Body: wrappedPayload},
{Body: encodedPayload},
{Body: wrappedPayload},
},
}
marshalledDataSyncMessage, err := proto.Marshal(&dataSyncMessage)
@ -233,7 +199,7 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() {
message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey)
message.Payload = marshalledDataSyncMessage
decodedMessages, err := s.a.handleMessages(message, true)
decodedMessages, err := s.processor.handleMessages(message, true)
s.Require().NoError(err)
// We send two messages, the unwrapped one will be attributed to the relayer, while the wrapped one will be attributed to the author
@ -241,22 +207,22 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasync() {
s.Require().Equal(&relayerKey.PublicKey, decodedMessages[0].SigPubKey())
s.Require().Equal(protocol.MessageID(&relayerKey.PublicKey, encodedPayload), decodedMessages[0].ID)
s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload)
s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage)
s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage)
s.Require().Equal(&authorKey.PublicKey, decodedMessages[1].SigPubKey())
s.Require().Equal(protocol.MessageID(&authorKey.PublicKey, wrappedPayload), decodedMessages[1].ID)
s.Require().Equal(encodedPayload, decodedMessages[1].DecryptedPayload)
s.Require().Equal(testMessageStruct, decodedMessages[1].ParsedMessage)
s.Require().Equal(s.testMessage, decodedMessages[1].ParsedMessage)
}
func (s *AdaptersSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
relayerKey, err := crypto.GenerateKey()
s.Require().NoError(err)
authorKey, err := crypto.GenerateKey()
s.Require().NoError(err)
encodedPayload, err := protocol.EncodeMessage(testMessageStruct)
encodedPayload, err := protocol.EncodeMessage(s.testMessage)
s.Require().NoError(err)
wrappedPayload, err := protocol.WrapMessageV1(encodedPayload, authorKey)
@ -271,7 +237,23 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
marshalledDataSyncMessage, err := proto.Marshal(&dataSyncMessage)
s.Require().NoError(err)
messageSpec, err := s.senderEncryptionProtocol.BuildDirectMessage(relayerKey, &s.privateKey.PublicKey, marshalledDataSyncMessage)
// Create sender encryption protocol.
senderDatabase, err := sqlite.Open(filepath.Join(s.tmpDir, "sender.db.sql"), "")
s.Require().NoError(err)
senderEncryptionProtocol := encryption.New(
senderDatabase,
"installation-2",
func([]*multidevice.Installation) {},
func([]*sharedsecret.Secret) {},
func(*encryption.ProtocolMessageSpec) {},
s.logger,
)
messageSpec, err := senderEncryptionProtocol.BuildDirectMessage(
relayerKey,
&s.processor.identity.PublicKey,
marshalledDataSyncMessage,
)
s.Require().NoError(err)
encryptedPayload, err := proto.Marshal(messageSpec.Message)
@ -281,24 +263,19 @@ func (s *AdaptersSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey)
message.Payload = encryptedPayload
decodedMessages, err := s.a.handleMessages(message, true)
decodedMessages, err := s.processor.handleMessages(message, true)
s.Require().NoError(err)
// We send two messages, the unwrapped one will be attributed to the relayer, while the wrapped one will be attributed to the author
// We send two messages, the unwrapped one will be attributed to the relayer,
// while the wrapped one will be attributed to the author.
s.Require().Equal(2, len(decodedMessages))
s.Require().Equal(&relayerKey.PublicKey, decodedMessages[0].SigPubKey())
s.Require().Equal(protocol.MessageID(&relayerKey.PublicKey, encodedPayload), decodedMessages[0].ID)
s.Require().Equal(encodedPayload, decodedMessages[0].DecryptedPayload)
s.Require().Equal(testMessageStruct, decodedMessages[0].ParsedMessage)
s.Require().Equal(s.testMessage, decodedMessages[0].ParsedMessage)
s.Require().Equal(&authorKey.PublicKey, decodedMessages[1].SigPubKey())
s.Require().Equal(protocol.MessageID(&authorKey.PublicKey, wrappedPayload), decodedMessages[1].ID)
s.Require().Equal(encodedPayload, decodedMessages[1].DecryptedPayload)
s.Require().Equal(testMessageStruct, decodedMessages[1].ParsedMessage)
}
func (s *AdaptersSuite) TearDownTest() {
os.Remove(s.tmpDir)
_ = s.logger.Sync()
s.Require().Equal(s.testMessage, decodedMessages[1].ParsedMessage)
}

View File

@ -6,20 +6,17 @@ import (
"database/sql"
"time"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
whisper "github.com/status-im/whisper/whisperv6"
"go.uber.org/zap"
"github.com/status-im/status-protocol-go/datasync"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
"github.com/status-im/status-protocol-go/encryption/sharedsecret"
"github.com/status-im/status-protocol-go/sqlite"
transport "github.com/status-im/status-protocol-go/transport/whisper"
protocol "github.com/status-im/status-protocol-go/v1"
datasyncnode "github.com/vacp2p/mvds/node"
)
var (
@ -37,11 +34,12 @@ var (
type Messenger struct {
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
adapter *whisperAdapter
transport *transport.WhisperServiceTransport
encryptor *encryption.Protocol
processor *messageProcessor
logger *zap.Logger
ownMessages map[string][]*protocol.Message
ownMessages []*protocol.Message
featureFlags featureFlags
messagesPersistenceEnabled bool
shutdownTasks []func() error
@ -212,9 +210,17 @@ func NewMessenger(
c.onSendContactCodeHandler = func(messageSpec *encryption.ProtocolMessageSpec) {
slogger := logger.With(zap.String("site", "onSendContactCodeHandler"))
slogger.Info("received a SendContactCode request")
newMessage, err := messageSpecToWhisper(messageSpec)
if err != nil {
slogger.Warn("failed to convert spec to Whisper message", zap.Error(err))
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := messenger.adapter.SendContactCode(ctx, messageSpec)
chatName := transport.ContactCodeTopic(&messenger.identity.PublicKey)
_, err = messenger.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
slogger.Warn("failed to send a contact code", zap.Error(err))
}
@ -265,40 +271,34 @@ func NewMessenger(
logger,
)
// Initialize data sync.
dataSyncTransport := datasync.NewDataSyncNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
processor, err := newMessageProcessor(
identity,
database,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
encryptionProtocol,
t,
logger,
c.featureFlags,
)
if err != nil {
return nil, errors.Wrap(err, "failed to create a persistent datasync node")
return nil, errors.Wrap(err, "failed to create messageProcessor")
}
datasync := datasync.New(dataSyncNode, dataSyncTransport, c.featureFlags.datasync, logger)
adapter := newWhisperAdapter(identity, t, encryptionProtocol, datasync, c.featureFlags, logger)
messenger = &Messenger{
identity: identity,
persistence: &sqlitePersistence{db: database},
adapter: adapter,
transport: t,
encryptor: encryptionProtocol,
ownMessages: make(map[string][]*protocol.Message),
processor: processor,
featureFlags: c.featureFlags,
messagesPersistenceEnabled: c.messagesPersistenceEnabled,
shutdownTasks: []func() error{
database.Close,
adapter.transport.Reset,
func() error { datasync.Stop(); return nil },
t.Reset,
t.Stop,
func() error { processor.Stop(); return nil },
// Currently this often fails, seems like it's safe to ignore them
// https://github.com/uber-go/zap/issues/328
func() error { _ = logger.Sync; return nil },
func() error { adapter.Stop(); return nil },
},
logger: logger,
}
@ -308,9 +308,6 @@ func NewMessenger(
if err := encryptionProtocol.Start(identity); err != nil {
return nil, err
}
if c.featureFlags.datasync {
dataSyncNode.Start(300 * time.Millisecond)
}
logger.Debug("messages persistence", zap.Bool("enabled", c.messagesPersistenceEnabled))
@ -373,7 +370,7 @@ func (m *Messenger) Init() error {
publicKeys = append(publicKeys, publicKey)
}
_, err = m.adapter.transport.InitFilters(publicChatIDs, publicKeys)
_, err = m.transport.InitFilters(publicChatIDs, publicKeys)
return err
}
@ -395,7 +392,21 @@ func (m *Messenger) Shutdown() (err error) {
}
func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) {
return m.adapter.handleSharedSecrets(secrets)
logger := m.logger.With(zap.String("site", "handleSharedSecrets"))
var result []*transport.Filter
for _, secret := range secrets {
logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity)))
fSecret := transport.NegotiatedSecret{
PublicKey: secret.Identity,
Key: secret.Key,
}
filter, err := m.transport.ProcessNegotiatedSecret(fSecret)
if err != nil {
return nil, err
}
result = append(result, filter)
}
return result, nil
}
func (m *Messenger) EnableInstallation(id string) error {
@ -436,18 +447,18 @@ func (m *Messenger) Mailservers() ([]string, error) {
func (m *Messenger) Join(chat Chat) error {
if chat.PublicKey != nil {
return m.adapter.JoinPrivate(chat.PublicKey)
return m.transport.JoinPrivate(chat.PublicKey)
} else if chat.Name != "" {
return m.adapter.JoinPublic(chat.Name)
return m.transport.JoinPublic(chat.Name)
}
return errors.New("chat is neither public nor private")
}
func (m *Messenger) Leave(chat Chat) error {
if chat.PublicKey != nil {
return m.adapter.LeavePrivate(chat.PublicKey)
return m.transport.LeavePrivate(chat.PublicKey)
} else if chat.Name != "" {
return m.adapter.LeavePublic(chat.Name)
return m.transport.LeavePublic(chat.Name)
}
return errors.New("chat is neither public nor private")
}
@ -477,6 +488,8 @@ func (m *Messenger) Contacts() ([]*Contact, error) {
}
func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, error) {
logger := m.logger.With(zap.String("site", "Send"), zap.String("chatID", chat.ID))
chatID := chat.ID
if chatID == "" {
return nil, ErrChatIDEmpty
@ -487,8 +500,12 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
return nil, err
}
logger.Debug("last message clock received", zap.Int64("clock", clock))
if chat.PublicKey != nil {
hash, message, err := m.adapter.SendPrivate(ctx, chat.PublicKey, chat.ID, data, clock)
logger.Debug("sending private message", zap.Binary("publicKey", crypto.FromECDSAPub(chat.PublicKey)))
hash, message, err := m.processor.SendPrivate(ctx, chat.PublicKey, chat.ID, data, clock)
if err != nil {
return nil, err
}
@ -498,18 +515,19 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
message.SigPubKey = &m.identity.PublicKey
if m.messagesPersistenceEnabled {
_, err = m.persistence.SaveMessages(chat.ID, []*protocol.Message{message})
_, err = m.persistence.SaveMessages([]*protocol.Message{message})
if err != nil {
return nil, err
}
}
// Cache it to be returned in Retrieve().
m.ownMessages[chatID] = append(m.ownMessages[chatID], message)
m.ownMessages = append(m.ownMessages, message)
return hash, nil
} else if chat.Name != "" {
return m.adapter.SendPublic(ctx, chat.Name, chat.ID, data, clock)
logger.Debug("sending public message", zap.String("chatName", chat.Name))
return m.processor.SendPublic(ctx, chat.Name, chat.ID, data, clock)
}
return nil, errors.New("chat is neither public nor private")
}
@ -518,9 +536,9 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
// DEPRECATED
func (m *Messenger) SendRaw(ctx context.Context, chat Chat, data []byte) ([]byte, error) {
if chat.PublicKey != nil {
return m.adapter.SendPrivateRaw(ctx, chat.PublicKey, data)
return m.processor.SendPrivateRaw(ctx, chat.PublicKey, data)
} else if chat.Name != "" {
return m.adapter.SendPublicRaw(ctx, chat.Name, data)
return m.processor.SendPublicRaw(ctx, chat.Name, data)
}
return nil, errors.New("chat is neither public nor private")
}
@ -538,129 +556,96 @@ var (
)
// RetrieveAll retrieves all previously fetched messages
func (m *Messenger) RetrieveAll(ctx context.Context, c RetrieveConfig) (allMessages []*protocol.Message, err error) {
latest, err := m.adapter.RetrieveAllMessages()
func (m *Messenger) RetrieveAll(ctx context.Context, c RetrieveConfig) ([]*protocol.Message, error) {
latest, err := m.transport.RetrieveAllMessages()
if err != nil {
err = errors.Wrap(err, "failed to retrieve messages")
return
return nil, errors.Wrap(err, "failed to retrieve messages")
}
for _, messages := range latest {
chatID := messages.ChatID
logger := m.logger.With(zap.String("site", "RetrieveAll"))
logger.Debug("retrieved messages grouped by chat", zap.Int("count", len(latest)))
_, err = m.persistence.SaveMessages(chatID, messages.Messages)
var result []*protocol.Message
for _, chat := range latest {
logger.Debug("processing chat", zap.String("chatID", chat.ChatID))
protoMessages, err := m.processor.Process(chat.Messages)
if err != nil {
return nil, errors.Wrap(err, "failed to save messages")
return nil, err
}
if !messages.Public {
// Return any own messages for this chat as well.
if ownMessages, ok := m.ownMessages[chatID]; ok {
messages.Messages = append(messages.Messages, ownMessages...)
}
}
retrievedMessages, err := m.retrieveSaved(ctx, chatID, c, messages.Messages)
if err != nil {
return nil, errors.Wrap(err, "failed to get saved messages")
}
allMessages = append(allMessages, retrievedMessages...)
}
// Delete own messages as they were added to the result.
for _, messages := range latest {
if !messages.Public {
delete(m.ownMessages, messages.ChatID)
}
}
return
}
func (m *Messenger) Retrieve(ctx context.Context, chat Chat, c RetrieveConfig) (messages []*protocol.Message, err error) {
var (
latest []*protocol.Message
ownLatest []*protocol.Message
)
if chat.PublicKey != nil {
latest, err = m.adapter.RetrievePrivateMessages(chat.PublicKey)
// Return any own messages for this chat as well.
if ownMessages, ok := m.ownMessages[chat.ID]; ok {
ownLatest = ownMessages
}
} else if chat.Name != "" {
latest, err = m.adapter.RetrievePublicMessages(chat.Name)
} else {
return nil, errors.New("chat is neither public nor private")
result = append(result, protoMessages...)
}
_, err = m.persistence.SaveMessages(result)
if err != nil {
err = errors.Wrap(err, "failed to retrieve messages")
return
return nil, errors.Wrap(err, "failed to save messages")
}
if m.messagesPersistenceEnabled {
_, err = m.persistence.SaveMessages(chat.ID, latest)
if err != nil {
return nil, errors.Wrap(err, "failed to save latest messages")
}
}
// Confirm received and decrypted messages.
if m.messagesPersistenceEnabled && chat.PublicKey != nil {
for _, message := range latest {
// Confirm received and decrypted messages.
if err := m.encryptor.ConfirmMessageProcessed(message.ID); err != nil {
return nil, errors.Wrap(err, "failed to confirm message being processed")
}
}
}
// We may need to add more messages from the past.
result, err := m.retrieveSaved(ctx, chat.ID, c, append(latest, ownLatest...))
retrievedMessages, err := m.retrieveSaved(ctx, c)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to get saved messages")
}
result = append(result, retrievedMessages...)
// When our messages are returned, we can delete them.
delete(m.ownMessages, chat.ID)
// Include own messages.
result = append(result, m.ownMessages...)
m.ownMessages = nil
return result, nil
}
func (m *Messenger) retrieveSaved(ctx context.Context, chatID string, c RetrieveConfig, latest []*protocol.Message) (messages []*protocol.Message, err error) {
func (m *Messenger) retrieveSaved(ctx context.Context, c RetrieveConfig) (messages []*protocol.Message, err error) {
if !m.messagesPersistenceEnabled {
return latest, nil
return nil, nil
}
if !c.latest {
return m.persistence.Messages(chatID, c.From, c.To)
return m.persistence.Messages(c.From, c.To)
}
if c.last24Hours {
to := time.Now()
from := to.Add(-time.Hour * 24)
return m.persistence.Messages(chatID, from, to)
return m.persistence.Messages(from, to)
}
return latest, nil
return nil, nil
}
// DEPRECATED
func (m *Messenger) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMessage, error) {
return m.adapter.RetrieveRawAll()
chatWithMessages, err := m.transport.RetrieveRawAll()
if err != nil {
return nil, err
}
logger := m.logger.With(zap.String("site", "RetrieveRawAll"))
result := make(map[transport.Filter][]*protocol.StatusMessage)
for chat, messages := range chatWithMessages {
for _, message := range messages {
shhMessage := whisper.ToWhisperMessage(message)
// TODO: fix this to use an exported method.
statusMessages, err := m.processor.handleMessages(shhMessage, false)
if err != nil {
logger.Info("failed to decode messages", zap.Error(err))
continue
}
result[chat] = append(result[chat], statusMessages...)
}
}
return result, nil
}
// DEPRECATED
func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return m.adapter.transport.LoadFilters(filters)
return m.transport.LoadFilters(filters)
}
// DEPRECATED
func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
return m.adapter.transport.RemoveFilters(filters)
return m.transport.RemoveFilters(filters)
}
// DEPRECATED

View File

@ -10,14 +10,11 @@ import (
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/status-im/status-protocol-go/tt"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
whisper "github.com/status-im/whisper/whisperv6"
)
func TestMessengerSuite(t *testing.T) {
@ -196,7 +193,7 @@ func (s *MessengerSuite) TestInit() {
tc.Prep()
err := s.m.Init()
s.Require().NoError(err)
filters := s.m.adapter.transport.Filters()
filters := s.m.transport.Filters()
expectedFilters += tc.AddedFilters
s.Equal(expectedFilters, len(filters))
})
@ -225,12 +222,12 @@ func (s *MessengerSuite) TestRetrievePublic() {
time.Sleep(time.Millisecond * 500)
// Retrieve chat
messages, err := s.m.Retrieve(context.Background(), chat, RetrieveLatest)
messages, err := s.m.RetrieveAll(context.Background(), RetrieveLatest)
s.NoError(err)
s.Len(messages, 1)
// Retrieve again to test skipping already existing err.
messages, err = s.m.Retrieve(context.Background(), chat, RetrieveLastDay)
messages, err = s.m.RetrieveAll(context.Background(), RetrieveLastDay)
s.NoError(err)
s.Require().Len(messages, 1)
@ -252,12 +249,12 @@ func (s *MessengerSuite) TestRetrievePrivate() {
time.Sleep(time.Millisecond * 500)
// Retrieve chat
messages, err := s.m.Retrieve(context.Background(), chat, RetrieveLatest)
messages, err := s.m.RetrieveAll(context.Background(), RetrieveLatest)
s.NoError(err)
s.Len(messages, 1)
// Retrieve again to test skipping already existing err.
messages, err = s.m.Retrieve(context.Background(), chat, RetrieveLastDay)
messages, err = s.m.RetrieveAll(context.Background(), RetrieveLastDay)
s.NoError(err)
s.Len(messages, 1)

View File

@ -335,19 +335,30 @@ func (db sqlitePersistence) SaveContact(contact Contact, tx *sql.Tx) error {
}
// Messages returns messages for a given contact, in a given period. Ordered by a timestamp.
func (db sqlitePersistence) Messages(chatID string, from, to time.Time) (result []*protocol.Message, err error) {
func (db sqlitePersistence) Messages(from, to time.Time) (result []*protocol.Message, err error) {
rows, err := db.db.Query(`SELECT
id, content_type, message_type, text, clock, timestamp, content_chat_id, content_text, public_key, flags
FROM user_messages WHERE chat_id = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp`,
chatID, protocol.TimestampInMsFromTime(from), protocol.TimestampInMsFromTime(to))
id,
content_type,
message_type,
text,
clock,
timestamp,
content_chat_id,
content_text,
public_key,
flags
FROM user_messages
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp`,
protocol.TimestampInMsFromTime(from),
protocol.TimestampInMsFromTime(to),
)
if err != nil {
return nil, err
}
defer rows.Close()
var (
rst = []*protocol.Message{}
)
var rst []*protocol.Message
for rows.Next() {
msg := protocol.Message{
Content: protocol.Content{},
@ -458,7 +469,7 @@ func (db sqlitePersistence) UnreadMessages(chatID string) ([]*protocol.Message,
return result, nil
}
func (db sqlitePersistence) SaveMessages(chatID string, messages []*protocol.Message) (last int64, err error) {
func (db sqlitePersistence) SaveMessages(messages []*protocol.Message) (last int64, err error) {
var (
tx *sql.Tx
stmt *sql.Stmt
@ -491,7 +502,7 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
pkey, err = marshalECDSAPub(msg.SigPubKey)
}
rst, err = stmt.Exec(
msg.ID, chatID, msg.ContentT, msg.MessageT, msg.Text,
msg.ID, msg.ChatID, msg.ContentT, msg.MessageT, msg.Text,
msg.Clock, msg.Timestamp, msg.Content.ChatID, msg.Content.Text,
pkey, msg.Flags)
if err != nil {

View File

@ -167,7 +167,11 @@ func (a *WhisperServiceTransport) LeavePublic(chatID string) error {
}
func (a *WhisperServiceTransport) JoinPrivate(publicKey *ecdsa.PublicKey) error {
_, err := a.filters.LoadContactCode(publicKey)
_, err := a.filters.LoadDiscovery()
if err != nil {
return err
}
_, err = a.filters.LoadContactCode(publicKey)
return err
}
@ -191,10 +195,11 @@ func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error)
return nil, errors.New("failed to return a filter")
}
messages := chatMessages[filter.ChatID]
messages.ChatID = filter.ChatID
messages.Public = filter.IsPublic()
messages.Messages = append(messages.Messages, f.Retrieve()...)
ch := chatMessages[filter.ChatID]
ch.ChatID = filter.ChatID
ch.Public = filter.IsPublic()
ch.Messages = append(ch.Messages, f.Retrieve()...)
chatMessages[filter.ChatID] = ch
}
var result []ChatMessages
@ -354,10 +359,11 @@ func (a *WhisperServiceTransport) Track(identifiers [][]byte, hash []byte, newMe
}
}
func (a *WhisperServiceTransport) Stop() {
func (a *WhisperServiceTransport) Stop() error {
if a.envelopesMonitor != nil {
a.envelopesMonitor.Stop()
}
return nil
}
// MessagesRequest is a RequestMessages() request payload.

View File

@ -2,9 +2,10 @@ package statusproto
import (
"crypto/ecdsa"
"github.com/pkg/errors"
"log"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
@ -94,12 +95,17 @@ func (m *StatusMessage) HandleEncryption(myKey *ecdsa.PrivateKey, senderKey *ecd
return nil
}
// HandleDatasync processes StatusMessage through data sync layer.
// This is optional and DataSync might be nil. In such a case,
// only one payload will be returned equal to DecryptedPayload.
func (m *StatusMessage) HandleDatasync(datasync *datasync.DataSync) ([]*StatusMessage, error) {
var statusMessages []*StatusMessage
payloads := datasync.Handle(
m.SigPubKey(),
m.DecryptedPayload,
)
for _, payload := range payloads {
message, err := m.Clone()
if err != nil {