Upgrade status-protocol-go (#1593)

This commit is contained in:
Andrea Maria Piana 2019-09-02 11:29:06 +02:00 committed by GitHub
parent 3dbcb6a19e
commit 1a47893e75
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 658 additions and 776 deletions

2
go.mod
View File

@ -21,7 +21,7 @@ require (
github.com/status-im/doubleratchet v2.0.0+incompatible
github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed
github.com/status-im/rendezvous v1.3.0
github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655
github.com/status-im/status-protocol-go v0.0.0-20190701094942-8a6d5a6b49ee40f22ffac18377835e4a0fd7032e
github.com/status-im/whisper v1.4.14
github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467
github.com/syndtr/goleveldb v1.0.0

10
go.sum
View File

@ -53,6 +53,8 @@ github.com/btcsuite/snappy-go v0.0.0-20151229074030-0bdef8d06723/go.mod h1:8woku
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792 h1:R8vQdOQdZ9Y3SkEwmHoWBmX1DNXhXZqlTpq6s4tyJGc=
github.com/btcsuite/websocket v0.0.0-20150119174127-31079b680792/go.mod h1:ghJtEyQwv5/p4Mg4C0fgbePVuGr935/5ddU9Z3TmDRY=
github.com/btcsuite/winsvc v1.0.0/go.mod h1:jsenWakMcC0zFBFurPLEAyrnc/teJEM1O46fmI40EZs=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cespare/cp v0.1.0 h1:SE+dxFebS7Iik5LK0tsi1k9ZCxEaFX4AjQmoyA+1dJk=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
@ -450,8 +452,8 @@ github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf0
github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA=
github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4Lbjg4=
github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655 h1:7lJRfkt9fzt+wpL+1rgfhvFThs2yP8VJm3BHGbgfWsg=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655/go.mod h1:zEvtd2lNRzsqo4RCLU6WlK0EKFTGEoAAuYfj+lZjHNQ=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-8a6d5a6b49ee40f22ffac18377835e4a0fd7032e h1:D17xbTZlT2Wdrylvcu+gTLryFYzOeq2QAzKgxmCmXIE=
github.com/status-im/status-protocol-go v0.0.0-20190701094942-8a6d5a6b49ee40f22ffac18377835e4a0fd7032e/go.mod h1:9cLBinLjWaeYCMWBq4TBMf4P/bp2aFQDwzS/9uhHf6I=
github.com/status-im/whisper v1.4.14 h1:9VHqx4+PUYfhDnYYtDxHkg/3cfVvkHjPNciY4LO83yc=
github.com/status-im/whisper v1.4.14/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@ -469,8 +471,8 @@ github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpP
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c h1:O7gT6vNipoBxFe19iWtDyUjIcIbkJ5MYhMXLS+RRCFs=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II=
github.com/vacp2p/mvds v0.0.21 h1:YeYja8noKsHvrnOHM4pqjBvDwwy+kUzXMkX1IBYJAbU=
github.com/vacp2p/mvds v0.0.21/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II=
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=
github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc/go.mod h1:bopw91TMyo8J3tvftk8xmU2kPmlrt4nScJQZU2hE5EM=

View File

@ -41,3 +41,18 @@ install-dev:
generate:
go generate ./...
.PHONY: generate
new-migration:
@if [ -z "$$DIR" ]; then \
echo 'missing DIR var'; \
exit 1; \
fi
@if [ -z "$$NAME" ]; then \
echo 'missing NAME var'; \
exit 1; \
fi
mkdir -p $(DIR)
touch $(DIR)/`date +"%s"`_$(NAME).down.sql ./$(DIR)/`date +"%s"`_$(NAME).up.sql
.PHONY: create-migration

View File

@ -1,620 +0,0 @@
package statusproto
import (
"context"
"crypto/ecdsa"
"time"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
"github.com/status-im/status-protocol-go/encryption/sharedsecret"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
transport "github.com/status-im/status-protocol-go/transport/whisper"
protocol "github.com/status-im/status-protocol-go/v1"
"github.com/status-im/status-protocol-go/datasync"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
datasyncproto "github.com/vacp2p/mvds/protobuf"
)
// Whisper message properties.
const (
whisperTTL = 15
whisperPoW = 0.002
whisperPoWTime = 5
)
// whisperAdapter is a bridge between encryption and transport
// layers.
type whisperAdapter struct {
privateKey *ecdsa.PrivateKey
transport *transport.WhisperServiceTransport
protocol *encryption.Protocol
datasync *datasync.DataSync
logger *zap.Logger
featureFlags featureFlags
}
func newWhisperAdapter(
pk *ecdsa.PrivateKey,
t *transport.WhisperServiceTransport,
p *encryption.Protocol,
d *datasync.DataSync,
featureFlags featureFlags,
logger *zap.Logger,
) *whisperAdapter {
if logger == nil {
logger = zap.NewNop()
}
adapter := &whisperAdapter{
privateKey: pk,
transport: t,
protocol: p,
datasync: d,
featureFlags: featureFlags,
logger: logger.With(zap.Namespace("whisperAdapter")),
}
if featureFlags.datasync {
// We pass our encryption/transport handling to the datasync
// so it's correctly encrypted.
d.Init(adapter.sendDataSync)
}
return adapter
}
func (a *whisperAdapter) JoinPublic(chatID string) error {
return a.transport.JoinPublic(chatID)
}
func (a *whisperAdapter) LeavePublic(chatID string) error {
return a.transport.LeavePublic(chatID)
}
func (a *whisperAdapter) JoinPrivate(publicKey *ecdsa.PublicKey) error {
return a.transport.JoinPrivate(publicKey)
}
func (a *whisperAdapter) LeavePrivate(publicKey *ecdsa.PublicKey) error {
return a.transport.LeavePrivate(publicKey)
}
type ChatMessages struct {
Messages []*protocol.Message
Public bool
ChatID string
}
func (a *whisperAdapter) RetrieveAllMessages() ([]ChatMessages, error) {
chatMessages, err := a.transport.RetrieveAllMessages()
if err != nil {
return nil, err
}
var result []ChatMessages
for _, messages := range chatMessages {
protoMessages, err := a.handleRetrievedMessages(messages.Messages)
if err != nil {
return nil, err
}
result = append(result, ChatMessages{
Messages: protoMessages,
Public: messages.Public,
ChatID: messages.ChatID,
})
}
return result, nil
}
// RetrievePublicMessages retrieves the collected public messages.
// It implies joining a chat if it has not been joined yet.
func (a *whisperAdapter) RetrievePublicMessages(chatID string) ([]*protocol.Message, error) {
messages, err := a.transport.RetrievePublicMessages(chatID)
if err != nil {
return nil, err
}
return a.handleRetrievedMessages(messages)
}
// RetrievePrivateMessages retrieves the collected private messages.
// It implies joining a chat if it has not been joined yet.
func (a *whisperAdapter) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*protocol.Message, error) {
messages, err := a.transport.RetrievePrivateMessages(publicKey)
if err != nil {
return nil, err
}
return a.handleRetrievedMessages(messages)
}
func (a *whisperAdapter) handleRetrievedMessages(messages []*whisper.ReceivedMessage) ([]*protocol.Message, error) {
logger := a.logger.With(zap.String("site", "handleRetrievedMessages"))
decodedMessages := make([]*protocol.Message, 0, len(messages))
for _, item := range messages {
shhMessage := whisper.ToWhisperMessage(item)
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
hlogger.Debug("handling a received message")
statusMessages, err := a.handleMessages(shhMessage, true)
if err != nil {
hlogger.Info("failed to decode messages", zap.Error(err))
continue
}
for _, statusMessage := range statusMessages {
switch m := statusMessage.ParsedMessage.(type) {
case protocol.Message:
m.ID = statusMessage.ID
m.SigPubKey = statusMessage.SigPubKey()
decodedMessages = append(decodedMessages, &m)
case protocol.PairMessage:
fromOurDevice := isPubKeyEqual(statusMessage.SigPubKey(), &a.privateKey.PublicKey)
if !fromOurDevice {
hlogger.Debug("received PairMessage from not our device, skipping")
break
}
metadata := &multidevice.InstallationMetadata{
Name: m.Name,
FCMToken: m.FCMToken,
DeviceType: m.DeviceType,
}
err := a.protocol.SetInstallationMetadata(&a.privateKey.PublicKey, m.InstallationID, metadata)
if err != nil {
return nil, err
}
default:
hlogger.Error("skipped a public message of unsupported type")
}
}
}
return decodedMessages, nil
}
// DEPRECATED
func (a *whisperAdapter) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMessage, error) {
chatWithMessages, err := a.transport.RetrieveRawAll()
if err != nil {
return nil, err
}
logger := a.logger.With(zap.String("site", "RetrieveRawAll"))
result := make(map[transport.Filter][]*protocol.StatusMessage)
for chat, messages := range chatWithMessages {
for _, message := range messages {
shhMessage := whisper.ToWhisperMessage(message)
statusMessages, err := a.handleMessages(shhMessage, false)
if err != nil {
logger.Info("failed to decode messages", zap.Error(err))
continue
}
result[chat] = append(result[chat], statusMessages...)
}
}
return result, nil
}
// handleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before
func (a *whisperAdapter) handleMessages(shhMessage *whisper.Message, applicationLayer bool) ([]*protocol.StatusMessage, error) {
logger := a.logger.With(zap.String("site", "handleMessages"))
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
var statusMessage protocol.StatusMessage
err := statusMessage.HandleTransport(shhMessage)
if err != nil {
hlogger.Error("failed to handle transport layer message", zap.Error(err))
return nil, err
}
err = a.handleEncryptionLayer(context.Background(), &statusMessage)
if err != nil {
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
}
statusMessages, err := statusMessage.HandleDatasync(a.datasync)
if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err))
}
for _, statusMessage := range statusMessages {
err := statusMessage.HandleApplicationMetadata()
if err != nil {
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
}
if applicationLayer {
err = statusMessage.HandleApplication()
if err != nil {
hlogger.Error("failed to handle application layer message")
}
}
}
return statusMessages, nil
}
func (a *whisperAdapter) handleEncryptionLayer(ctx context.Context, message *protocol.StatusMessage) error {
publicKey := message.SigPubKey()
logger := a.logger.With(zap.String("site", "handleEncryptionLayer"))
err := message.HandleEncryption(a.privateKey, publicKey, a.protocol)
if err == encryption.ErrDeviceNotFound {
handleErr := a.handleErrDeviceNotFound(ctx, publicKey)
if handleErr != nil {
logger.Error("failed to handle error", zap.Error(err), zap.NamedError("handleErr", handleErr))
}
}
if err != nil {
return errors.Wrap(err, "failed to process an encrypted message")
}
return nil
}
func (a *whisperAdapter) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := a.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
return err
}
if !advertise {
return nil
}
messageSpec, err := a.protocol.BuildBundleAdvertiseMessage(a.privateKey, publicKey)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, _, err = a.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
a.protocol.ConfirmBundleAdvertisement(publicKey, now)
return nil
}
// SendPublic sends a public message passing chat name to the transport layer.
//
// Be aware that this method returns a message ID using protocol.MessageID
// instead of Whisper message hash.
func (a *whisperAdapter) SendPublic(ctx context.Context, chatName, chatID string, data []byte, clock int64) ([]byte, error) {
logger := a.logger.With(zap.String("site", "SendPublic"))
logger.Debug("sending a public message", zap.String("chat-name", chatName))
message := protocol.CreatePublicTextMessage(data, clock, chatName)
encodedMessage, err := a.encodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
wrappedMessage, err := a.tryWrapMessageV1(encodedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := a.protocol.BuildPublicMessage(a.privateKey, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to build public message")
}
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
hash, err := a.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage)
a.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
// SendPublicRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (a *whisperAdapter) SendPublicRaw(ctx context.Context, chatName string, data []byte) ([]byte, error) {
var newMessage whisper.NewMessage
wrappedMessage, err := a.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
hash, err := a.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage)
a.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
func (a *whisperAdapter) SendContactCode(ctx context.Context, messageSpec *encryption.ProtocolMessageSpec) ([]byte, error) {
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
return a.transport.SendPublic(ctx, &newMessage, transport.ContactCodeTopic(&a.privateKey.PublicKey))
}
func (a *whisperAdapter) tryWrapMessageV1(encodedMessage []byte) ([]byte, error) {
if a.featureFlags.sendV1Messages {
wrappedMessage, err := protocol.WrapMessageV1(encodedMessage, a.privateKey)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
return wrappedMessage, nil
}
return encodedMessage, nil
}
func (a *whisperAdapter) encodeMessage(message protocol.Message) ([]byte, error) {
encodedMessage, err := protocol.EncodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
return encodedMessage, nil
}
// SendPrivate sends a one-to-one message. It needs to return it
// because the registered Whisper filter handles only incoming messages
// and our own messages need to be handled manually.
//
// This might be not true if a shared secret is used because it relies on
// symmetric encryption.
//
// Be aware that this method returns a message ID using protocol.MessageID
// instead of Whisper message hash.
func (a *whisperAdapter) SendPrivate(
ctx context.Context,
publicKey *ecdsa.PublicKey,
chatID string,
data []byte,
clock int64,
) ([]byte, *protocol.Message, error) {
logger := a.logger.With(zap.String("site", "SendPrivate"))
logger.Debug("sending a private message", zap.Binary("public-key", crypto.FromECDSAPub(publicKey)))
message := protocol.CreatePrivateTextMessage(data, clock, chatID)
encodedMessage, err := a.encodeMessage(message)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to encode message")
}
wrappedMessage, err := a.tryWrapMessageV1(encodedMessage)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to wrap message")
}
if a.featureFlags.datasync {
if err := a.sendWithDataSync(publicKey, wrappedMessage); err != nil {
return nil, nil, errors.Wrap(err, "failed to send message with datasync")
}
} else {
err = a.encryptAndSend(ctx, publicKey, wrappedMessage)
if err != nil {
return nil, nil, err
}
}
return protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage), &message, nil
}
func (a *whisperAdapter) sendWithDataSync(publicKey *ecdsa.PublicKey, message []byte) error {
groupID := datasync.ToOneToOneGroupID(&a.privateKey.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := a.datasync.IsPeerInGroup(groupID, peerID)
if err != nil {
return errors.Wrap(err, "failed to check if peer is in group")
}
if !exist {
if err := a.datasync.AddPeer(groupID, peerID); err != nil {
return errors.Wrap(err, "failed to add peer")
}
}
_, err = a.datasync.AppendMessage(groupID, message)
if err != nil {
return errors.Wrap(err, "failed to append message to datasync")
}
return nil
}
// SendPrivateRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (a *whisperAdapter) SendPrivateRaw(
ctx context.Context,
publicKey *ecdsa.PublicKey,
data []byte,
) ([]byte, error) {
a.logger.Debug(
"sending a private message",
zap.Binary("public-key", crypto.FromECDSAPub(publicKey)),
zap.String("site", "SendPrivateRaw"),
)
wrappedMessage, err := a.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
messageID := protocol.MessageID(&a.privateKey.PublicKey, wrappedMessage)
if a.featureFlags.datasync {
if err := a.sendWithDataSync(publicKey, wrappedMessage); err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
return messageID, nil
}
hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec)
a.transport.Track([][]byte{messageID}, hash, *newMessage)
return messageID, err
}
func (a *whisperAdapter) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec) ([]byte, *whisper.NewMessage, error) {
var err error
var hash []byte
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
logger := a.logger.With(zap.String("site", "sendMessageSpec"))
switch {
case messageSpec.SharedSecret != nil:
logger.Debug("sending using shared secret")
hash, err = a.transport.SendPrivateWithSharedSecret(ctx, &newMessage, publicKey, messageSpec.SharedSecret)
case messageSpec.PartitionedTopicMode() == encryption.PartitionTopicV1:
logger.Debug("sending partitioned topic")
hash, err = a.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
case !a.featureFlags.genericDiscoveryTopicEnabled:
logger.Debug("sending partitioned topic (generic discovery topic disabled)")
hash, err = a.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
default:
logger.Debug("sending using discovery topic")
hash, err = a.transport.SendPrivateOnDiscovery(ctx, &newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
return hash, &newMessage, nil
}
func (a *whisperAdapter) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, datasyncPayload *datasyncproto.Payload) error {
var messageIDs [][]byte
for _, payload := range datasyncPayload.Messages {
messageIDs = append(messageIDs, protocol.MessageID(&a.privateKey.PublicKey, payload.Body))
}
messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, encodedMessage)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
a.transport.Track(messageIDs, hash, *newMessage)
return nil
}
func (a *whisperAdapter) encryptAndSend(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte) error {
messageID := protocol.MessageID(&a.privateKey.PublicKey, encodedMessage)
messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, encodedMessage)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := a.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
a.transport.Track([][]byte{messageID}, hash, *newMessage)
return nil
}
func (a *whisperAdapter) messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (whisper.NewMessage, error) {
var newMessage whisper.NewMessage
payload, err := proto.Marshal(spec.Message)
if err != nil {
return newMessage, err
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
return newMessage, nil
}
func (a *whisperAdapter) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) {
logger := a.logger.With(zap.String("site", "handleSharedSecrets"))
var filters []*transport.Filter
for _, secret := range secrets {
logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity)))
fSecret := transport.NegotiatedSecret{
PublicKey: secret.Identity,
Key: secret.Key,
}
filter, err := a.transport.ProcessNegotiatedSecret(fSecret)
if err != nil {
return nil, err
}
filters = append(filters, filter)
}
return filters, nil
}
func (a *whisperAdapter) Stop() {
a.transport.Stop()
}
// isPubKeyEqual checks that two public keys are equal
func isPubKeyEqual(a, b *ecdsa.PublicKey) bool {
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}

View File

@ -47,6 +47,7 @@ type Chat struct {
UnviewedMessagesCount uint `json:"unviewedMessagesCount"`
LastMessageContentType string `json:"lastMessageContentType"`
LastMessageContent string `json:"lastMessageContent"`
LastMessageTimestamp int64 `json:"lastMessageTimestamp"`
// Group chat fields
// Members are the members who have been invited to the group chat

View File

@ -2,6 +2,7 @@ package datasync
import (
"crypto/ecdsa"
"github.com/golang/protobuf/proto"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
datasyncnode "github.com/vacp2p/mvds/node"
@ -31,7 +32,6 @@ func (d *DataSync) Add(publicKey *ecdsa.PublicKey, datasyncMessage datasyncproto
}
func (d *DataSync) Handle(sender *ecdsa.PublicKey, payload []byte) [][]byte {
var payloads [][]byte
logger := d.logger.With(zap.String("site", "Handle"))

View File

@ -4,6 +4,7 @@ go 1.12
require (
github.com/aristanetworks/goarista v0.0.0-20190704150520-f44d68189fd7 // indirect
github.com/cenkalti/backoff/v3 v3.0.0
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/ethereum/go-ethereum v1.8.27
github.com/golang/protobuf v1.3.2
@ -20,7 +21,7 @@ require (
github.com/status-im/migrate/v4 v4.0.0-20190821140204-a9d340ec8fb76af4afda06acf01740d45d2661ed
github.com/status-im/whisper v1.4.14
github.com/stretchr/testify v1.3.1-0.20190712000136-221dbe5ed467
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c
github.com/vacp2p/mvds v0.0.21
go.uber.org/zap v1.10.0
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 // indirect

View File

@ -30,6 +30,8 @@ github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCS
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6 h1:Eey/GGQ/E5Xp1P2Lyx1qj007hLZfbi0+CoVeJruGCtI=
github.com/btcsuite/btcd v0.0.0-20171128150713-2e60448ffcc6/go.mod h1:Dmm/EzmjnCiweXmzRIAiUWCInVmPgjkzgv5k4tVyXiQ=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
@ -252,8 +254,8 @@ github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2K
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c h1:O7gT6vNipoBxFe19iWtDyUjIcIbkJ5MYhMXLS+RRCFs=
github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II=
github.com/vacp2p/mvds v0.0.21 h1:YeYja8noKsHvrnOHM4pqjBvDwwy+kUzXMkX1IBYJAbU=
github.com/vacp2p/mvds v0.0.21/go.mod h1:pIqr2Hg4cIkTJniGPCp4ptong2jxgxx6uToVoY94+II=
github.com/xanzy/go-gitlab v0.15.0/go.mod h1:8zdQa/ri1dfn8eS3Ir1SyfvOKlw7WBJ8DVThkpGiXrs=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=

View File

@ -0,0 +1,467 @@
package statusproto
import (
"context"
"crypto/ecdsa"
"database/sql"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"
whisper "github.com/status-im/whisper/whisperv6"
"go.uber.org/zap"
"github.com/status-im/status-protocol-go/datasync"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
transport "github.com/status-im/status-protocol-go/transport/whisper"
protocol "github.com/status-im/status-protocol-go/v1"
datasyncnode "github.com/vacp2p/mvds/node"
datasyncproto "github.com/vacp2p/mvds/protobuf"
)
// Whisper message properties.
const (
whisperTTL = 15
whisperPoW = 0.002
whisperPoWTime = 5
)
type messageProcessor struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
protocol *encryption.Protocol
transport *transport.WhisperServiceTransport
logger *zap.Logger
featureFlags featureFlags
}
func newMessageProcessor(
identity *ecdsa.PrivateKey,
database *sql.DB,
enc *encryption.Protocol,
transport *transport.WhisperServiceTransport,
logger *zap.Logger,
features featureFlags,
) (*messageProcessor, error) {
dataSyncTransport := datasync.NewDataSyncNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
database,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
logger,
)
if err != nil {
return nil, err
}
ds := datasync.New(dataSyncNode, dataSyncTransport, features.datasync, logger)
p := &messageProcessor{
identity: identity,
datasync: ds,
protocol: enc,
transport: transport,
logger: logger,
featureFlags: features,
}
// Initializing DataSync is required to encrypt and send messages.
// With DataSync enabled, messages are added to the DataSync
// but actual encrypt and send calls are postponed.
// sendDataSync is responsible for encrypting and sending postponed messages.
if features.datasync {
ds.Init(p.sendDataSync)
ds.Start(300 * time.Millisecond)
}
return p, nil
}
func (p *messageProcessor) Stop() {
p.datasync.Stop() // idempotent op
}
func (p *messageProcessor) SendPrivate(
ctx context.Context,
publicKey *ecdsa.PublicKey,
chatID string,
data []byte,
clock int64,
) ([]byte, *protocol.Message, error) {
logger := p.logger.With(zap.String("site", "SendPrivate"))
logger.Debug("sending a private message", zap.Binary("public-key", crypto.FromECDSAPub(publicKey)))
message := protocol.CreatePrivateTextMessage(data, clock, chatID)
encodedMessage, err := p.encodeMessage(message)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to encode message")
}
messageID, err := p.SendPrivateRaw(ctx, publicKey, encodedMessage)
if err != nil {
return nil, nil, err
}
return messageID, &message, nil
}
// SendPrivateRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (p *messageProcessor) SendPrivateRaw(
ctx context.Context,
publicKey *ecdsa.PublicKey,
data []byte,
) ([]byte, error) {
p.logger.Debug(
"sending a private message",
zap.Binary("public-key", crypto.FromECDSAPub(publicKey)),
zap.String("site", "SendPrivateRaw"),
)
wrappedMessage, err := p.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
if p.featureFlags.datasync {
if err := p.addToDataSync(publicKey, wrappedMessage); err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
} else {
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return nil, errors.Wrap(err, "failed to send a message spec")
}
p.transport.Track([][]byte{messageID}, hash, *newMessage)
}
return messageID, nil
}
func (p *messageProcessor) SendPublic(ctx context.Context, chatName, chatID string, data []byte, clock int64) ([]byte, error) {
logger := p.logger.With(zap.String("site", "SendPublic"))
logger.Debug("sending a public message", zap.String("chat-name", chatName))
message := protocol.CreatePublicTextMessage(data, clock, chatName)
encodedMessage, err := p.encodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
wrappedMessage, err := p.tryWrapMessageV1(encodedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to build public message")
}
newMessage, err := messageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
hash, err := p.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
p.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
// SendPublicRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (p *messageProcessor) SendPublicRaw(ctx context.Context, chatName string, data []byte) ([]byte, error) {
var newMessage whisper.NewMessage
wrappedMessage, err := p.tryWrapMessageV1(data)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: wrappedMessage,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
hash, err := p.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
return nil, err
}
messageID := protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
p.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
func (p *messageProcessor) Process(messages []*whisper.ReceivedMessage) ([]*protocol.Message, error) {
logger := p.logger.With(zap.String("site", "handleRetrievedMessages"))
decodedMessages := make([]*protocol.Message, 0, len(messages))
for _, item := range messages {
shhMessage := whisper.ToWhisperMessage(item)
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
hlogger.Debug("handling a received message")
statusMessages, err := p.handleMessages(shhMessage, true)
if err != nil {
hlogger.Info("failed to decode messages", zap.Error(err))
continue
}
for _, statusMessage := range statusMessages {
switch m := statusMessage.ParsedMessage.(type) {
case protocol.Message:
m.ID = statusMessage.ID
m.SigPubKey = statusMessage.SigPubKey()
decodedMessages = append(decodedMessages, &m)
case protocol.PairMessage:
fromOurDevice := isPubKeyEqual(statusMessage.SigPubKey(), &p.identity.PublicKey)
if !fromOurDevice {
hlogger.Debug("received PairMessage from not our device, skipping")
break
}
metadata := &multidevice.InstallationMetadata{
Name: m.Name,
FCMToken: m.FCMToken,
DeviceType: m.DeviceType,
}
err := p.protocol.SetInstallationMetadata(&p.identity.PublicKey, m.InstallationID, metadata)
if err != nil {
return nil, err
}
default:
hlogger.Error("skipped a public message of unsupported type")
}
}
}
return decodedMessages, nil
}
// handleMessages expects a whisper message as input, and it will go through
// a series of transformations until the message is parsed into an application
// layer message, or in case of Raw methods, the processing stops at the layer
// before
func (p *messageProcessor) handleMessages(shhMessage *whisper.Message, applicationLayer bool) ([]*protocol.StatusMessage, error) {
logger := p.logger.With(zap.String("site", "handleMessages"))
hlogger := logger.With(zap.Binary("hash", shhMessage.Hash))
var statusMessage protocol.StatusMessage
err := statusMessage.HandleTransport(shhMessage)
if err != nil {
hlogger.Error("failed to handle transport layer message", zap.Error(err))
return nil, err
}
err = p.handleEncryptionLayer(context.Background(), &statusMessage)
if err != nil {
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
}
statusMessages, err := statusMessage.HandleDatasync(p.datasync)
if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err))
}
for _, statusMessage := range statusMessages {
err := statusMessage.HandleApplicationMetadata()
if err != nil {
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
}
if applicationLayer {
err = statusMessage.HandleApplication()
if err != nil {
hlogger.Error("failed to handle application layer message")
}
}
}
return statusMessages, nil
}
func (p *messageProcessor) handleEncryptionLayer(ctx context.Context, message *protocol.StatusMessage) error {
logger := p.logger.With(zap.String("site", "handleEncryptionLayer"))
publicKey := message.SigPubKey()
err := message.HandleEncryption(p.identity, publicKey, p.protocol)
if err == encryption.ErrDeviceNotFound {
handleErr := p.handleErrDeviceNotFound(ctx, publicKey)
if handleErr != nil {
logger.Error("failed to handle error", zap.Error(err), zap.NamedError("handleErr", handleErr))
}
}
if err != nil {
return errors.Wrap(err, "failed to process an encrypted message")
}
return nil
}
func (p *messageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
return err
}
if !advertise {
return nil
}
messageSpec, err := p.protocol.BuildBundleAdvertiseMessage(p.identity, publicKey)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
_, _, err = p.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
p.protocol.ConfirmBundleAdvertisement(publicKey, now)
return nil
}
func (p *messageProcessor) encodeMessage(message protocol.Message) ([]byte, error) {
encodedMessage, err := protocol.EncodeMessage(message)
if err != nil {
return nil, errors.Wrap(err, "failed to encode message")
}
return encodedMessage, nil
}
func (p *messageProcessor) tryWrapMessageV1(encodedMessage []byte) ([]byte, error) {
if p.featureFlags.sendV1Messages {
wrappedMessage, err := protocol.WrapMessageV1(encodedMessage, p.identity)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
return wrappedMessage, nil
}
return encodedMessage, nil
}
func (p *messageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) error {
groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := p.datasync.IsPeerInGroup(groupID, peerID)
if err != nil {
return errors.Wrap(err, "failed to check if peer is in group")
}
if !exist {
if err := p.datasync.AddPeer(groupID, peerID); err != nil {
return errors.Wrap(err, "failed to add peer")
}
}
_, err = p.datasync.AppendMessage(groupID, message)
if err != nil {
return errors.Wrap(err, "failed to append message to datasync")
}
return nil
}
// sendDataSync sends a message scheduled by the data sync layer.
func (p *messageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
messageIDs := make([][]byte, 0, len(payload.Messages))
for _, payload := range payload.Messages {
messageIDs = append(messageIDs, protocol.MessageID(&p.identity.PublicKey, payload.Body))
}
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, encodedMessage)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec)
if err != nil {
return err
}
p.transport.Track(messageIDs, hash, *newMessage)
return nil
}
func (p *messageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec) ([]byte, *whisper.NewMessage, error) {
newMessage, err := messageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
logger := p.logger.With(zap.String("site", "sendMessageSpec"))
var hash []byte
switch {
case messageSpec.SharedSecret != nil:
logger.Debug("sending using shared secret")
hash, err = p.transport.SendPrivateWithSharedSecret(ctx, &newMessage, publicKey, messageSpec.SharedSecret)
case messageSpec.PartitionedTopicMode() == encryption.PartitionTopicV1:
logger.Debug("sending partitioned topic")
hash, err = p.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
case !p.featureFlags.genericDiscoveryTopicEnabled:
logger.Debug("sending partitioned topic (generic discovery topic disabled)")
hash, err = p.transport.SendPrivateWithPartitioned(ctx, &newMessage, publicKey)
default:
logger.Debug("sending using discovery topic")
hash, err = p.transport.SendPrivateOnDiscovery(ctx, &newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
return hash, &newMessage, nil
}
func messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (whisper.NewMessage, error) {
var newMessage whisper.NewMessage
payload, err := proto.Marshal(spec.Message)
if err != nil {
return newMessage, err
}
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
return newMessage, nil
}
// isPubKeyEqual checks that two public keys are equal
func isPubKeyEqual(a, b *ecdsa.PublicKey) bool {
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}

View File

@ -6,20 +6,17 @@ import (
"database/sql"
"time"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/crypto"
"github.com/pkg/errors"
whisper "github.com/status-im/whisper/whisperv6"
"go.uber.org/zap"
"github.com/status-im/status-protocol-go/datasync"
datasyncpeer "github.com/status-im/status-protocol-go/datasync/peer"
"github.com/status-im/status-protocol-go/encryption"
"github.com/status-im/status-protocol-go/encryption/multidevice"
"github.com/status-im/status-protocol-go/encryption/sharedsecret"
"github.com/status-im/status-protocol-go/sqlite"
transport "github.com/status-im/status-protocol-go/transport/whisper"
protocol "github.com/status-im/status-protocol-go/v1"
datasyncnode "github.com/vacp2p/mvds/node"
)
var (
@ -37,11 +34,12 @@ var (
type Messenger struct {
identity *ecdsa.PrivateKey
persistence *sqlitePersistence
adapter *whisperAdapter
transport *transport.WhisperServiceTransport
encryptor *encryption.Protocol
processor *messageProcessor
logger *zap.Logger
ownMessages map[string][]*protocol.Message
ownMessages []*protocol.Message
featureFlags featureFlags
messagesPersistenceEnabled bool
shutdownTasks []func() error
@ -212,9 +210,17 @@ func NewMessenger(
c.onSendContactCodeHandler = func(messageSpec *encryption.ProtocolMessageSpec) {
slogger := logger.With(zap.String("site", "onSendContactCodeHandler"))
slogger.Info("received a SendContactCode request")
newMessage, err := messageSpecToWhisper(messageSpec)
if err != nil {
slogger.Warn("failed to convert spec to Whisper message", zap.Error(err))
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err := messenger.adapter.SendContactCode(ctx, messageSpec)
chatName := transport.ContactCodeTopic(&messenger.identity.PublicKey)
_, err = messenger.transport.SendPublic(ctx, &newMessage, chatName)
if err != nil {
slogger.Warn("failed to send a contact code", zap.Error(err))
}
@ -265,40 +271,34 @@ func NewMessenger(
logger,
)
// Initialize data sync.
dataSyncTransport := datasync.NewDataSyncNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
processor, err := newMessageProcessor(
identity,
database,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
encryptionProtocol,
t,
logger,
c.featureFlags,
)
if err != nil {
return nil, errors.Wrap(err, "failed to create a persistent datasync node")
return nil, errors.Wrap(err, "failed to create messageProcessor")
}
datasync := datasync.New(dataSyncNode, dataSyncTransport, c.featureFlags.datasync, logger)
adapter := newWhisperAdapter(identity, t, encryptionProtocol, datasync, c.featureFlags, logger)
messenger = &Messenger{
identity: identity,
persistence: &sqlitePersistence{db: database},
adapter: adapter,
transport: t,
encryptor: encryptionProtocol,
ownMessages: make(map[string][]*protocol.Message),
processor: processor,
featureFlags: c.featureFlags,
messagesPersistenceEnabled: c.messagesPersistenceEnabled,
shutdownTasks: []func() error{
database.Close,
adapter.transport.Reset,
func() error { datasync.Stop(); return nil },
t.Reset,
t.Stop,
func() error { processor.Stop(); return nil },
// Currently this often fails, seems like it's safe to ignore them
// https://github.com/uber-go/zap/issues/328
func() error { _ = logger.Sync; return nil },
func() error { adapter.Stop(); return nil },
},
logger: logger,
}
@ -308,9 +308,6 @@ func NewMessenger(
if err := encryptionProtocol.Start(identity); err != nil {
return nil, err
}
if c.featureFlags.datasync {
dataSyncNode.Start(300 * time.Millisecond)
}
logger.Debug("messages persistence", zap.Bool("enabled", c.messagesPersistenceEnabled))
@ -373,7 +370,7 @@ func (m *Messenger) Init() error {
publicKeys = append(publicKeys, publicKey)
}
_, err = m.adapter.transport.InitFilters(publicChatIDs, publicKeys)
_, err = m.transport.InitFilters(publicChatIDs, publicKeys)
return err
}
@ -395,7 +392,21 @@ func (m *Messenger) Shutdown() (err error) {
}
func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) ([]*transport.Filter, error) {
return m.adapter.handleSharedSecrets(secrets)
logger := m.logger.With(zap.String("site", "handleSharedSecrets"))
var result []*transport.Filter
for _, secret := range secrets {
logger.Debug("received shared secret", zap.Binary("identity", crypto.FromECDSAPub(secret.Identity)))
fSecret := transport.NegotiatedSecret{
PublicKey: secret.Identity,
Key: secret.Key,
}
filter, err := m.transport.ProcessNegotiatedSecret(fSecret)
if err != nil {
return nil, err
}
result = append(result, filter)
}
return result, nil
}
func (m *Messenger) EnableInstallation(id string) error {
@ -436,18 +447,18 @@ func (m *Messenger) Mailservers() ([]string, error) {
func (m *Messenger) Join(chat Chat) error {
if chat.PublicKey != nil {
return m.adapter.JoinPrivate(chat.PublicKey)
return m.transport.JoinPrivate(chat.PublicKey)
} else if chat.Name != "" {
return m.adapter.JoinPublic(chat.Name)
return m.transport.JoinPublic(chat.Name)
}
return errors.New("chat is neither public nor private")
}
func (m *Messenger) Leave(chat Chat) error {
if chat.PublicKey != nil {
return m.adapter.LeavePrivate(chat.PublicKey)
return m.transport.LeavePrivate(chat.PublicKey)
} else if chat.Name != "" {
return m.adapter.LeavePublic(chat.Name)
return m.transport.LeavePublic(chat.Name)
}
return errors.New("chat is neither public nor private")
}
@ -477,6 +488,8 @@ func (m *Messenger) Contacts() ([]*Contact, error) {
}
func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, error) {
logger := m.logger.With(zap.String("site", "Send"), zap.String("chatID", chat.ID))
chatID := chat.ID
if chatID == "" {
return nil, ErrChatIDEmpty
@ -487,8 +500,12 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
return nil, err
}
logger.Debug("last message clock received", zap.Int64("clock", clock))
if chat.PublicKey != nil {
hash, message, err := m.adapter.SendPrivate(ctx, chat.PublicKey, chat.ID, data, clock)
logger.Debug("sending private message", zap.Binary("publicKey", crypto.FromECDSAPub(chat.PublicKey)))
hash, message, err := m.processor.SendPrivate(ctx, chat.PublicKey, chat.ID, data, clock)
if err != nil {
return nil, err
}
@ -498,18 +515,19 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
message.SigPubKey = &m.identity.PublicKey
if m.messagesPersistenceEnabled {
_, err = m.persistence.SaveMessages(chat.ID, []*protocol.Message{message})
_, err = m.persistence.SaveMessages([]*protocol.Message{message})
if err != nil {
return nil, err
}
}
// Cache it to be returned in Retrieve().
m.ownMessages[chatID] = append(m.ownMessages[chatID], message)
m.ownMessages = append(m.ownMessages, message)
return hash, nil
} else if chat.Name != "" {
return m.adapter.SendPublic(ctx, chat.Name, chat.ID, data, clock)
logger.Debug("sending public message", zap.String("chatName", chat.Name))
return m.processor.SendPublic(ctx, chat.Name, chat.ID, data, clock)
}
return nil, errors.New("chat is neither public nor private")
}
@ -518,9 +536,9 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
// DEPRECATED
func (m *Messenger) SendRaw(ctx context.Context, chat Chat, data []byte) ([]byte, error) {
if chat.PublicKey != nil {
return m.adapter.SendPrivateRaw(ctx, chat.PublicKey, data)
return m.processor.SendPrivateRaw(ctx, chat.PublicKey, data)
} else if chat.Name != "" {
return m.adapter.SendPublicRaw(ctx, chat.Name, data)
return m.processor.SendPublicRaw(ctx, chat.Name, data)
}
return nil, errors.New("chat is neither public nor private")
}
@ -538,129 +556,96 @@ var (
)
// RetrieveAll retrieves all previously fetched messages
func (m *Messenger) RetrieveAll(ctx context.Context, c RetrieveConfig) (allMessages []*protocol.Message, err error) {
latest, err := m.adapter.RetrieveAllMessages()
func (m *Messenger) RetrieveAll(ctx context.Context, c RetrieveConfig) ([]*protocol.Message, error) {
latest, err := m.transport.RetrieveAllMessages()
if err != nil {
err = errors.Wrap(err, "failed to retrieve messages")
return
return nil, errors.Wrap(err, "failed to retrieve messages")
}
for _, messages := range latest {
chatID := messages.ChatID
logger := m.logger.With(zap.String("site", "RetrieveAll"))
logger.Debug("retrieved messages grouped by chat", zap.Int("count", len(latest)))
_, err = m.persistence.SaveMessages(chatID, messages.Messages)
var result []*protocol.Message
for _, chat := range latest {
logger.Debug("processing chat", zap.String("chatID", chat.ChatID))
protoMessages, err := m.processor.Process(chat.Messages)
if err != nil {
return nil, errors.Wrap(err, "failed to save messages")
return nil, err
}
if !messages.Public {
// Return any own messages for this chat as well.
if ownMessages, ok := m.ownMessages[chatID]; ok {
messages.Messages = append(messages.Messages, ownMessages...)
}
}
retrievedMessages, err := m.retrieveSaved(ctx, chatID, c, messages.Messages)
if err != nil {
return nil, errors.Wrap(err, "failed to get saved messages")
}
allMessages = append(allMessages, retrievedMessages...)
}
// Delete own messages as they were added to the result.
for _, messages := range latest {
if !messages.Public {
delete(m.ownMessages, messages.ChatID)
}
}
return
}
func (m *Messenger) Retrieve(ctx context.Context, chat Chat, c RetrieveConfig) (messages []*protocol.Message, err error) {
var (
latest []*protocol.Message
ownLatest []*protocol.Message
)
if chat.PublicKey != nil {
latest, err = m.adapter.RetrievePrivateMessages(chat.PublicKey)
// Return any own messages for this chat as well.
if ownMessages, ok := m.ownMessages[chat.ID]; ok {
ownLatest = ownMessages
}
} else if chat.Name != "" {
latest, err = m.adapter.RetrievePublicMessages(chat.Name)
} else {
return nil, errors.New("chat is neither public nor private")
result = append(result, protoMessages...)
}
_, err = m.persistence.SaveMessages(result)
if err != nil {
err = errors.Wrap(err, "failed to retrieve messages")
return
return nil, errors.Wrap(err, "failed to save messages")
}
if m.messagesPersistenceEnabled {
_, err = m.persistence.SaveMessages(chat.ID, latest)
if err != nil {
return nil, errors.Wrap(err, "failed to save latest messages")
}
}
// Confirm received and decrypted messages.
if m.messagesPersistenceEnabled && chat.PublicKey != nil {
for _, message := range latest {
// Confirm received and decrypted messages.
if err := m.encryptor.ConfirmMessageProcessed(message.ID); err != nil {
return nil, errors.Wrap(err, "failed to confirm message being processed")
}
}
}
// We may need to add more messages from the past.
result, err := m.retrieveSaved(ctx, chat.ID, c, append(latest, ownLatest...))
retrievedMessages, err := m.retrieveSaved(ctx, c)
if err != nil {
return nil, err
return nil, errors.Wrap(err, "failed to get saved messages")
}
result = append(result, retrievedMessages...)
// When our messages are returned, we can delete them.
delete(m.ownMessages, chat.ID)
// Include own messages.
result = append(result, m.ownMessages...)
m.ownMessages = nil
return result, nil
}
func (m *Messenger) retrieveSaved(ctx context.Context, chatID string, c RetrieveConfig, latest []*protocol.Message) (messages []*protocol.Message, err error) {
func (m *Messenger) retrieveSaved(ctx context.Context, c RetrieveConfig) (messages []*protocol.Message, err error) {
if !m.messagesPersistenceEnabled {
return latest, nil
return nil, nil
}
if !c.latest {
return m.persistence.Messages(chatID, c.From, c.To)
return m.persistence.Messages(c.From, c.To)
}
if c.last24Hours {
to := time.Now()
from := to.Add(-time.Hour * 24)
return m.persistence.Messages(chatID, from, to)
return m.persistence.Messages(from, to)
}
return latest, nil
return nil, nil
}
// DEPRECATED
func (m *Messenger) RetrieveRawAll() (map[transport.Filter][]*protocol.StatusMessage, error) {
return m.adapter.RetrieveRawAll()
chatWithMessages, err := m.transport.RetrieveRawAll()
if err != nil {
return nil, err
}
logger := m.logger.With(zap.String("site", "RetrieveRawAll"))
result := make(map[transport.Filter][]*protocol.StatusMessage)
for chat, messages := range chatWithMessages {
for _, message := range messages {
shhMessage := whisper.ToWhisperMessage(message)
// TODO: fix this to use an exported method.
statusMessages, err := m.processor.handleMessages(shhMessage, false)
if err != nil {
logger.Info("failed to decode messages", zap.Error(err))
continue
}
result[chat] = append(result[chat], statusMessages...)
}
}
return result, nil
}
// DEPRECATED
func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return m.adapter.transport.LoadFilters(filters)
return m.transport.LoadFilters(filters)
}
// DEPRECATED
func (m *Messenger) RemoveFilters(filters []*transport.Filter) error {
return m.adapter.transport.RemoveFilters(filters)
return m.transport.RemoveFilters(filters)
}
// DEPRECATED

View File

@ -3,7 +3,7 @@
// 000001_init.down.db.sql (82B)
// 000001_init.up.db.sql (840B)
// 000002_add_chats.down.db.sql (74B)
// 000002_add_chats.up.db.sql (541B)
// 000002_add_chats.up.db.sql (569B)
// 000003_add_contacts.down.db.sql (21B)
// 000003_add_contacts.up.db.sql (251B)
// 000004_user_messages_compatibility.down.sql (33B)
@ -137,7 +137,7 @@ func _000002_add_chatsDownDbSql() (*asset, error) {
return a, nil
}
var __000002_add_chatsUpDbSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x91\x4f\x4b\x03\x31\x10\xc5\xef\xf3\x29\x06\x3c\x54\x61\x0f\x7a\x10\x85\x9e\xb2\xdb\x14\x17\xe3\xa6\xa4\xa9\xd8\x53\x48\xb3\x83\x5d\xba\xff\x68\xb2\x95\x7e\x7b\x69\x5d\xda\xea\x2a\x78\x4c\xde\xef\xbd\x49\xe6\x25\x8a\x33\xcd\x51\xb3\x58\x70\x4c\xa7\x98\x49\x8d\xfc\x2d\x9d\xeb\x39\xba\xb5\x0d\x1e\xaf\xa1\xc8\xf1\x95\xa9\xe4\x89\x29\x9c\xa9\xf4\x85\xa9\x25\x3e\xf3\x25\xca\x0c\x13\x99\x4d\x45\x9a\x68\x54\x7c\x26\x58\xc2\x23\xa8\x6d\x45\x27\xfa\x90\x95\x2d\x84\x88\xc0\x35\x65\xb3\x1d\xdc\xe3\x84\x4f\xd9\x42\x68\x1c\x5d\xd9\xbb\xc7\x87\xfc\x7e\x14\x41\xd8\xb7\x84\x69\xa6\x2f\xcc\xd6\x85\x62\x47\x18\x4b\x29\x38\xcb\x86\x6e\xad\x16\x3c\x82\x50\x54\xe4\x83\xad\xda\x1f\xee\x9c\x4a\x0a\x94\x1b\x1b\x8c\x2b\x1b\xb7\x31\x3b\x5b\x76\xdf\x47\x9c\x92\x6e\x23\x68\xbb\x55\x59\x38\xb3\xa1\x3d\xc6\x42\xc6\x11\x74\xf5\xae\xa0\x0f\xca\x4d\x45\xde\xdb\x77\x32\xae\xe9\xea\xf0\xa7\xbf\xb4\xfe\x7f\x83\x8e\xe0\x39\xb3\x0e\x54\x07\x73\xfc\x7d\xbf\xa6\xdf\x91\xb3\x5a\x51\xb5\xa2\xad\xef\x9f\xd9\x9f\xd6\x45\x6b\xba\x36\xb7\x81\xbe\x04\xb8\x19\x03\xc0\x44\xc9\x59\x5f\xf1\x90\x1b\x5f\xca\x87\xce\x4d\xcf\x8c\xe1\x33\x00\x00\xff\xff\xf3\xb4\xa4\xad\x1d\x02\x00\x00")
var __000002_add_chatsUpDbSql = []byte("\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\xff\x8c\x92\xcf\x6a\x32\x31\x14\xc5\xf7\xf7\x29\x2e\x7c\x0b\xbf\x42\x16\xed\xa2\xb4\xe0\x2a\x33\x46\x3a\x34\x9d\x48\x8c\xa5\xae\x42\xcc\x5c\xea\xe0\xfc\xc3\xc9\x58\x7c\xfb\xa2\x1d\xac\x76\x2a\x74\x99\x9c\xdf\x39\x97\x93\xdc\x58\x0b\x6e\x04\x1a\x1e\x49\x81\xc9\x14\x53\x65\x50\xbc\x25\x73\x33\x47\xbf\x76\xa1\xc5\xff\x90\x67\xf8\xca\x75\xfc\xc4\x35\xce\x74\xf2\xc2\xf5\x12\x9f\xc5\x12\x55\x8a\xb1\x4a\xa7\x32\x89\x0d\x6a\x31\x93\x3c\x16\x0c\x2a\x57\xd2\x89\x3e\x64\xa5\x0b\x29\x19\xf8\xba\xa8\xb7\x83\x7b\x9c\x88\x29\x5f\x48\x83\xa3\x7f\xee\xee\xf1\x21\xbb\x1f\x31\x08\xfb\x86\x30\x49\xcd\x99\xd9\xf9\x90\xef\x08\x23\xa5\xa4\xe0\xe9\xd0\x6d\xf4\x42\x30\x08\x79\x49\x6d\x70\x65\xf3\xc3\x9d\x51\x41\x81\x32\xeb\x82\xf5\x45\xed\x37\x76\xe7\x8a\xee\x72\xc4\x29\xe9\x96\x41\xd3\xad\x8a\xdc\xdb\x0d\xed\x31\x92\x2a\x62\xd0\x55\xbb\x9c\x3e\x28\xb3\x25\xb5\xad\x7b\x27\xeb\xeb\xae\x0a\x57\xfd\x85\x6b\xff\x36\xe8\x08\x7e\x67\x56\x81\xaa\x60\x8f\xed\xfb\x67\xfa\x1d\xb9\xa2\x5e\xb4\x67\x50\x52\xb9\xa2\x6d\xdb\x57\xe8\x4f\xeb\xbc\xb1\x5d\x93\xb9\x40\x5f\x02\xdc\x8c\x01\x60\xa2\xd5\xac\xff\xfe\x21\x37\x3e\x97\x0f\xfb\x60\x7b\x66\x0c\x9f\x01\x00\x00\xff\xff\x78\x3a\x6f\x25\x39\x02\x00\x00")
func _000002_add_chatsUpDbSqlBytes() ([]byte, error) {
return bindataRead(
@ -152,8 +152,8 @@ func _000002_add_chatsUpDbSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000002_add_chats.up.db.sql", size: 541, mode: os.FileMode(0644), modTime: time.Unix(1565597460, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xd, 0x7f, 0x3a, 0xd7, 0xf6, 0x8b, 0x6e, 0x4d, 0xce, 0x7d, 0x63, 0x1d, 0x30, 0xa2, 0xc1, 0xb, 0xa0, 0x35, 0x2e, 0xfa, 0xef, 0xf0, 0x39, 0xf7, 0x22, 0xdd, 0x31, 0x11, 0xb1, 0xff, 0xbf, 0xb3}}
info := bindataFileInfo{name: "000002_add_chats.up.db.sql", size: 569, mode: os.FileMode(0644), modTime: time.Unix(1567170206, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0x89, 0xb, 0x12, 0x5, 0x4b, 0xda, 0xab, 0xb2, 0x47, 0x1b, 0x66, 0xe, 0x47, 0x8a, 0xb0, 0x9c, 0xa0, 0xe4, 0x12, 0xa4, 0xf9, 0xaa, 0x72, 0xba, 0xd9, 0x17, 0x8f, 0xac, 0x7f, 0xfd, 0x85, 0xa9}}
return a, nil
}
@ -232,7 +232,7 @@ func _000004_user_messages_compatibilityUpSql() (*asset, error) {
return nil, err
}
info := bindataFileInfo{name: "000004_user_messages_compatibility.up.sql", size: 928, mode: os.FileMode(0644), modTime: time.Unix(1565697832, 0)}
info := bindataFileInfo{name: "000004_user_messages_compatibility.up.sql", size: 928, mode: os.FileMode(0644), modTime: time.Unix(1566286773, 0)}
a := &asset{bytes: bytes, info: info, digest: [32]uint8{0xdf, 0xc4, 0x5c, 0xed, 0x4, 0x26, 0xb1, 0xb2, 0x53, 0xac, 0x1, 0x20, 0xf3, 0x17, 0x37, 0xb3, 0x3d, 0x84, 0x5e, 0xd8, 0x1, 0x53, 0x88, 0x9a, 0x9c, 0xaf, 0x9, 0xdf, 0x58, 0x2e, 0xf0, 0x19}}
return a, nil
}

View File

@ -78,8 +78,8 @@ func (db sqlitePersistence) SaveChat(chat Chat) error {
}
// Insert record
stmt, err := db.db.Prepare(`INSERT INTO chats(id, name, color, active, type, timestamp, deleted_at_clock_value, public_key, unviewed_message_count, last_clock_value, last_message_content_type, last_message_content, members, membership_updates)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
stmt, err := db.db.Prepare(`INSERT INTO chats(id, name, color, active, type, timestamp, deleted_at_clock_value, public_key, unviewed_message_count, last_clock_value, last_message_content_type, last_message_content, last_message_timestamp, members, membership_updates)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
return err
}
@ -98,6 +98,7 @@ func (db sqlitePersistence) SaveChat(chat Chat) error {
chat.LastClockValue,
chat.LastMessageContentType,
chat.LastMessageContent,
chat.LastMessageTimestamp,
encodedMembers.Bytes(),
encodedMembershipUpdates.Bytes(),
)
@ -149,6 +150,7 @@ func (db sqlitePersistence) chats(tx *sql.Tx) ([]*Chat, error) {
last_clock_value,
last_message_content_type,
last_message_content,
last_message_timestamp,
members,
membership_updates
FROM chats
@ -163,6 +165,7 @@ func (db sqlitePersistence) chats(tx *sql.Tx) ([]*Chat, error) {
for rows.Next() {
var lastMessageContentType sql.NullString
var lastMessageContent sql.NullString
var lastMessageTimestamp sql.NullInt64
chat := &Chat{}
encodedMembers := []byte{}
@ -181,6 +184,7 @@ func (db sqlitePersistence) chats(tx *sql.Tx) ([]*Chat, error) {
&chat.LastClockValue,
&lastMessageContentType,
&lastMessageContent,
&lastMessageTimestamp,
&encodedMembers,
&encodedMembershipUpdates,
)
@ -189,6 +193,7 @@ func (db sqlitePersistence) chats(tx *sql.Tx) ([]*Chat, error) {
}
chat.LastMessageContent = lastMessageContent.String
chat.LastMessageContentType = lastMessageContentType.String
chat.LastMessageTimestamp = lastMessageTimestamp.Int64
// Restore members
membersDecoder := gob.NewDecoder(bytes.NewBuffer(encodedMembers))
@ -335,19 +340,30 @@ func (db sqlitePersistence) SaveContact(contact Contact, tx *sql.Tx) error {
}
// Messages returns messages for a given contact, in a given period. Ordered by a timestamp.
func (db sqlitePersistence) Messages(chatID string, from, to time.Time) (result []*protocol.Message, err error) {
func (db sqlitePersistence) Messages(from, to time.Time) (result []*protocol.Message, err error) {
rows, err := db.db.Query(`SELECT
id, content_type, message_type, text, clock, timestamp, content_chat_id, content_text, public_key, flags
FROM user_messages WHERE chat_id = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp`,
chatID, protocol.TimestampInMsFromTime(from), protocol.TimestampInMsFromTime(to))
id,
content_type,
message_type,
text,
clock,
timestamp,
content_chat_id,
content_text,
public_key,
flags
FROM user_messages
WHERE timestamp >= ? AND timestamp <= ?
ORDER BY timestamp`,
protocol.TimestampInMsFromTime(from),
protocol.TimestampInMsFromTime(to),
)
if err != nil {
return nil, err
}
defer rows.Close()
var (
rst = []*protocol.Message{}
)
var rst []*protocol.Message
for rows.Next() {
msg := protocol.Message{
Content: protocol.Content{},
@ -458,7 +474,7 @@ func (db sqlitePersistence) UnreadMessages(chatID string) ([]*protocol.Message,
return result, nil
}
func (db sqlitePersistence) SaveMessages(chatID string, messages []*protocol.Message) (last int64, err error) {
func (db sqlitePersistence) SaveMessages(messages []*protocol.Message) (last int64, err error) {
var (
tx *sql.Tx
stmt *sql.Stmt
@ -491,7 +507,7 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
pkey, err = marshalECDSAPub(msg.SigPubKey)
}
rst, err = stmt.Exec(
msg.ID, chatID, msg.ContentT, msg.MessageT, msg.Text,
msg.ID, msg.ChatID, msg.ContentT, msg.MessageT, msg.Text,
msg.Clock, msg.Timestamp, msg.Content.ChatID, msg.Content.Text,
pkey, msg.Flags)
if err != nil {

View File

@ -371,6 +371,7 @@ func (db sqlitePersistence) BlockContact(contact Contact) ([]*Chat, error) {
SET
unviewed_message_count = (SELECT COUNT(1) FROM user_messages_legacy WHERE seen = 0 AND chat_id = chats.id),
last_message_content = (SELECT content from user_messages_legacy WHERE chat_id = chats.id ORDER BY clock_value DESC LIMIT 1),
last_message_timestamp = (SELECT timestamp from user_messages_legacy WHERE chat_id = chats.id ORDER BY clock_value DESC LIMIT 1),
last_message_content_type = (SELECT content_type from user_messages_legacy WHERE chat_id = chats.id ORDER BY clock_value DESC LIMIT 1)`)
if err != nil {
return nil, err

View File

@ -167,7 +167,11 @@ func (a *WhisperServiceTransport) LeavePublic(chatID string) error {
}
func (a *WhisperServiceTransport) JoinPrivate(publicKey *ecdsa.PublicKey) error {
_, err := a.filters.LoadContactCode(publicKey)
_, err := a.filters.LoadDiscovery()
if err != nil {
return err
}
_, err = a.filters.LoadContactCode(publicKey)
return err
}
@ -191,10 +195,11 @@ func (a *WhisperServiceTransport) RetrieveAllMessages() ([]ChatMessages, error)
return nil, errors.New("failed to return a filter")
}
messages := chatMessages[filter.ChatID]
messages.ChatID = filter.ChatID
messages.Public = filter.IsPublic()
messages.Messages = append(messages.Messages, f.Retrieve()...)
ch := chatMessages[filter.ChatID]
ch.ChatID = filter.ChatID
ch.Public = filter.IsPublic()
ch.Messages = append(ch.Messages, f.Retrieve()...)
chatMessages[filter.ChatID] = ch
}
var result []ChatMessages
@ -354,10 +359,11 @@ func (a *WhisperServiceTransport) Track(identifiers [][]byte, hash []byte, newMe
}
}
func (a *WhisperServiceTransport) Stop() {
func (a *WhisperServiceTransport) Stop() error {
if a.envelopesMonitor != nil {
a.envelopesMonitor.Stop()
}
return nil
}
// MessagesRequest is a RequestMessages() request payload.

View File

@ -2,9 +2,10 @@ package statusproto
import (
"crypto/ecdsa"
"github.com/pkg/errors"
"log"
"github.com/pkg/errors"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/golang/protobuf/proto"
@ -94,12 +95,17 @@ func (m *StatusMessage) HandleEncryption(myKey *ecdsa.PrivateKey, senderKey *ecd
return nil
}
// HandleDatasync processes StatusMessage through data sync layer.
// This is optional and DataSync might be nil. In such a case,
// only one payload will be returned equal to DecryptedPayload.
func (m *StatusMessage) HandleDatasync(datasync *datasync.DataSync) ([]*StatusMessage, error) {
var statusMessages []*StatusMessage
payloads := datasync.Handle(
m.SigPubKey(),
m.DecryptedPayload,
)
for _, payload := range payloads {
message, err := m.Clone()
if err != nil {

4
vendor/modules.txt vendored
View File

@ -317,7 +317,7 @@ github.com/status-im/migrate/v4/database/sqlcipher
github.com/status-im/rendezvous
github.com/status-im/rendezvous/protocol
github.com/status-im/rendezvous/server
# github.com/status-im/status-protocol-go v0.0.0-20190701094942-822d18916e417b768a13f85601a7b10e02236655
# github.com/status-im/status-protocol-go v0.0.0-20190701094942-8a6d5a6b49ee40f22ffac18377835e4a0fd7032e
github.com/status-im/status-protocol-go/zaputil
github.com/status-im/status-protocol-go
github.com/status-im/status-protocol-go/encryption/multidevice
@ -353,7 +353,7 @@ github.com/syndtr/goleveldb/leveldb/filter
github.com/syndtr/goleveldb/leveldb/journal
github.com/syndtr/goleveldb/leveldb/memdb
github.com/syndtr/goleveldb/leveldb/table
# github.com/vacp2p/mvds v0.0.0-20190705123435-943ab5e228078f6389dffe00946c0d0a5b495a5c
# github.com/vacp2p/mvds v0.0.21
github.com/vacp2p/mvds/node
github.com/vacp2p/mvds/protobuf
github.com/vacp2p/mvds/state