Prepare for status-go integration (#27)

status-go requires some special methods to be exposed in order to support the current integration.

Introduced families of methods marked as `DEPRECATED`:
* `SendRaw` to send transit-encoded messages, both public and private. It supports only private PFS-enabled messages. To send private non-PFS messages, we can add this functionality or it needs to happen through `shh_post` directly from status-react.
* `RetrieveRawAll` and `RetrieveRawWithFilter` to get retrieve decrypted but transit-encoded messages.
* `LoadFilters` and `RemoveFilters` to manage Whisper filters directly.
* By default, persistence of messages is disabled. It can be enabled with an option passed to `Messenger`.
* File paths to the databases are configurable in `Messenger`.
This commit is contained in:
Adam Babik 2019-07-23 10:33:57 +02:00 committed by GitHub
parent eb918ec8af
commit 6a11d08b05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 417 additions and 242 deletions

View File

@ -32,14 +32,20 @@ const (
// layers.
type whisperAdapter struct {
privateKey *ecdsa.PrivateKey
transport transport.WhisperTransport
transport *transport.WhisperServiceTransport
protocol *encryption.Protocol
logger *zap.Logger
featureFlags featureFlags
}
func newWhisperAdapter(pk *ecdsa.PrivateKey, t transport.WhisperTransport, p *encryption.Protocol, featureFlags featureFlags, logger *zap.Logger) *whisperAdapter {
func newWhisperAdapter(
pk *ecdsa.PrivateKey,
t *transport.WhisperServiceTransport,
p *encryption.Protocol,
featureFlags featureFlags,
logger *zap.Logger,
) *whisperAdapter {
if logger == nil {
logger = zap.NewNop()
}
@ -160,6 +166,53 @@ func (a *whisperAdapter) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]
return decodedMessages, nil
}
// DEPRECATED
func (a *whisperAdapter) RetrieveRawAll() (map[filter.Chat][]*whisper.Message, error) {
chatWithMessages, err := a.transport.RetrieveRawAll()
if err != nil {
return nil, err
}
logger := a.logger.With(zap.String("site", "RetrieveRawAll"))
result := make(map[filter.Chat][]*whisper.Message)
for chat, messages := range chatWithMessages {
for _, message := range messages {
shhMessage := whisper.ToWhisperMessage(message)
err := a.decryptMessage(context.Background(), shhMessage)
if err != nil {
logger.Warn("failed to decrypt a message", zap.Error(err), zap.Binary("messageID", shhMessage.Hash))
}
result[chat] = append(result[chat], shhMessage)
}
}
return result, nil
}
// DEPRECATED
func (a *whisperAdapter) RetrieveRaw(filterID string) ([]*whisper.Message, error) {
messages, err := a.transport.RetrieveRaw(filterID)
if err != nil {
return nil, err
}
logger := a.logger.With(zap.String("site", "RetrieveRaw"))
var result []*whisper.Message
for _, message := range messages {
shhMessage := whisper.ToWhisperMessage(message)
err := a.decryptMessage(context.Background(), shhMessage)
if err != nil {
logger.Warn("failed to decrypt a message", zap.Error(err), zap.Binary("messageID", shhMessage.Hash))
}
result = append(result, shhMessage)
}
return result, nil
}
func (a *whisperAdapter) decodeMessage(message *whisper.Message) (*protocol.StatusMessage, error) {
publicKey, err := crypto.UnmarshalPubkey(message.Sig)
@ -238,6 +291,10 @@ func (a *whisperAdapter) handleErrDeviceNotFound(ctx context.Context, publicKey
return nil
}
// SendPublic sends a public message passing chat name to the transport layer.
//
// Be aware that this method returns a message ID using protocol.MessageID
// instead of Whisper message hash.
func (a *whisperAdapter) SendPublic(ctx context.Context, chatName, chatID string, data []byte, clock int64) ([]byte, error) {
logger := a.logger.With(zap.String("site", "SendPublic"))
@ -265,13 +322,26 @@ func (a *whisperAdapter) SendPublic(ctx context.Context, chatName, chatID string
return protocol.MessageID(&a.privateKey.PublicKey, encodedMessage), nil
}
// SendPublicRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (a *whisperAdapter) SendPublicRaw(ctx context.Context, chatName string, data []byte) ([]byte, whisper.NewMessage, error) {
newMessage := whisper.NewMessage{
TTL: whisperTTL,
Payload: data,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
hash, err := a.transport.SendPublic(ctx, newMessage, chatName)
return hash, newMessage, err
}
func (a *whisperAdapter) SendContactCode(ctx context.Context, messageSpec *encryption.ProtocolMessageSpec) ([]byte, error) {
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, err
}
return a.transport.SendPublic(ctx, *newMessage, filter.ContactCodeTopic(&a.privateKey.PublicKey))
return a.transport.SendPublic(ctx, newMessage, filter.ContactCodeTopic(&a.privateKey.PublicKey))
}
func (a *whisperAdapter) encodeMessage(message protocol.Message) ([]byte, error) {
@ -292,7 +362,14 @@ func (a *whisperAdapter) encodeMessage(message protocol.Message) ([]byte, error)
}
// SendPrivate sends a one-to-one message. It needs to return it
// because the registered Whisper filter handles only incoming messages.
// because the registered Whisper filter handles only incoming messages
// and our own messages need to be handled manually.
//
// This might be not true if a shared secret is used because it relies on
// symmetric encryption.
//
// Be aware that this method returns a message ID using protocol.MessageID
// instead of Whisper message hash.
func (a *whisperAdapter) SendPrivate(
ctx context.Context,
publicKey *ecdsa.PublicKey,
@ -323,6 +400,35 @@ func (a *whisperAdapter) SendPrivate(
return protocol.MessageID(&a.privateKey.PublicKey, encodedMessage), &message, nil
}
// SendPrivateRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (a *whisperAdapter) SendPrivateRaw(
ctx context.Context,
publicKey *ecdsa.PublicKey,
data []byte,
) ([]byte, whisper.NewMessage, error) {
a.logger.Debug(
"sending a private message",
zap.Binary("public-key", crypto.FromECDSAPub(publicKey)),
zap.String("site", "SendPrivateRaw"),
)
var newMessage whisper.NewMessage
messageSpec, err := a.protocol.BuildDirectMessage(a.privateKey, publicKey, data)
if err != nil {
return nil, newMessage, errors.Wrap(err, "failed to encrypt message")
}
newMessage, err = a.messageSpecToWhisper(messageSpec)
if err != nil {
return nil, newMessage, errors.Wrap(err, "failed to convert ProtocolMessageSpec to whisper.NewMessage")
}
hash, err := a.sendMessageSpec(ctx, publicKey, messageSpec)
return hash, newMessage, err
}
func (a *whisperAdapter) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec) ([]byte, error) {
newMessage, err := a.messageSpecToWhisper(messageSpec)
if err != nil {
@ -333,33 +439,34 @@ func (a *whisperAdapter) sendMessageSpec(ctx context.Context, publicKey *ecdsa.P
switch {
case messageSpec.SharedSecret != nil:
logger.Debug("sending using shared secret")
return a.transport.SendPrivateWithSharedSecret(ctx, *newMessage, publicKey, messageSpec.SharedSecret)
return a.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret)
case messageSpec.PartitionedTopicMode() == encryption.PartitionTopicV1:
logger.Debug("sending partitioned topic")
return a.transport.SendPrivateWithPartitioned(ctx, *newMessage, publicKey)
return a.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
case !a.featureFlags.genericDiscoveryTopicEnabled:
logger.Debug("sending partitioned topic (generic discovery topic disabled)")
return a.transport.SendPrivateWithPartitioned(ctx, *newMessage, publicKey)
return a.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
default:
logger.Debug("sending using discovery topic")
return a.transport.SendPrivateOnDiscovery(ctx, *newMessage, publicKey)
return a.transport.SendPrivateOnDiscovery(ctx, newMessage, publicKey)
}
}
func (a *whisperAdapter) messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*whisper.NewMessage, error) {
func (a *whisperAdapter) messageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (whisper.NewMessage, error) {
var newMessage whisper.NewMessage
payload, err := proto.Marshal(spec.Message)
if err != nil {
return nil, err
return newMessage, err
}
newMessage := whisper.NewMessage{
newMessage = whisper.NewMessage{
TTL: whisperTTL,
Payload: payload,
PowTarget: whisperPoW,
PowTime: whisperPoWTime,
}
return &newMessage, nil
return newMessage, nil
}
func (a *whisperAdapter) handleSharedSecrets(secrets []*sharedsecret.Secret) error {

View File

@ -47,14 +47,14 @@ func setupUser(user string, s *EncryptionServiceMultiDeviceSuite, n int) error {
for i := 0; i < n; i++ {
installationID := fmt.Sprintf("%s%d", user, i+1)
installationDir := fmt.Sprintf("sqlite-persistence-test-%s", installationID)
dir, err := ioutil.TempDir("", installationDir)
dbFileName := fmt.Sprintf("sqlite-persistence-test-%s.sql", installationID)
dbPath, err := ioutil.TempFile("", dbFileName)
if err != nil {
return err
}
protocol, err := New(
dir,
dbPath.Name(),
"some-key",
installationID,
func(s []*multidevice.Installation) {},

View File

@ -31,25 +31,25 @@ func TestEncryptionServiceTestSuite(t *testing.T) {
type EncryptionServiceTestSuite struct {
suite.Suite
logger *zap.Logger
alice *Protocol
bob *Protocol
aliceDir string
bobDir string
logger *zap.Logger
alice *Protocol
bob *Protocol
aliceDBPath *os.File
bobDBPath *os.File
}
func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) {
var err error
s.aliceDir, err = ioutil.TempDir("", "alice-dir")
s.aliceDBPath, err = ioutil.TempFile("", "alice.db.sql")
s.Require().NoError(err)
s.bobDir, err = ioutil.TempDir("", "bob-dir")
s.bobDBPath, err = ioutil.TempFile("", "bob.db.sql")
s.Require().NoError(err)
config.InstallationID = aliceInstallationID
s.alice, err = NewWithEncryptorConfig(
s.aliceDir,
s.aliceDBPath.Name(),
"alice-key",
aliceInstallationID,
config,
@ -62,7 +62,7 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) {
config.InstallationID = bobInstallationID
s.bob, err = NewWithEncryptorConfig(
s.bobDir,
s.bobDBPath.Name(),
"bob-key",
bobInstallationID,
config,
@ -75,17 +75,13 @@ func (s *EncryptionServiceTestSuite) initDatabases(config encryptorConfig) {
}
func (s *EncryptionServiceTestSuite) SetupTest() {
logger, err := zap.NewDevelopment()
s.Require().NoError(err)
s.logger = logger
s.logger = zap.NewNop()
s.initDatabases(defaultEncryptorConfig("none", s.logger))
}
func (s *EncryptionServiceTestSuite) TearDownTest() {
os.Remove(s.aliceDir)
os.Remove(s.bobDir)
os.Remove(s.aliceDBPath.Name())
os.Remove(s.bobDBPath.Name())
_ = s.logger.Sync()
}
@ -457,7 +453,7 @@ func (s *EncryptionServiceTestSuite) TestMaxSkipKeysError() {
}
func (s *EncryptionServiceTestSuite) TestMaxMessageKeysPerSession() {
config := defaultEncryptorConfig("none", s.logger)
config := defaultEncryptorConfig("none", zap.NewNop())
// Set MaxKeep and MaxSkip to an high value so it does not interfere
config.MaxKeep = 100000
config.MaxSkip = 100000
@ -577,8 +573,7 @@ func (s *EncryptionServiceTestSuite) TestMaxKeep() {
// Alice sends a message to Bob
// Bob receives alice message
// Alice receives Bob message
// Bob sends another message to alice and viceversa
// Bob sends another message to alice and vice-versa.
func (s *EncryptionServiceTestSuite) TestConcurrentBundles() {
bobText1 := []byte("bob text 1")
bobText2 := []byte("bob text 2")

View File

@ -5,7 +5,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"path/filepath"
"sync"
"time"
@ -81,8 +80,7 @@ func defaultEncryptorConfig(installationID string, logger *zap.Logger) encryptor
}
// newEncryptor creates a new EncryptionService instance.
func newEncryptor(dbDir, dbKey string, config encryptorConfig) (*encryptor, error) {
dbPath := filepath.Join(dbDir, "sessions.sql")
func newEncryptor(dbPath, dbKey string, config encryptorConfig) (*encryptor, error) {
db, err := sqlite.Open(dbPath, dbKey, sqlite.MigrationConfig{
AssetNames: migrations.AssetNames(),
AssetGetter: func(name string) ([]byte, error) {

View File

@ -83,7 +83,7 @@ var (
// New creates a new ProtocolService instance
func New(
dataDir string,
dbPath string,
dbKey string,
installationID string,
addedBundlesHandler func([]*multidevice.Installation),
@ -92,7 +92,7 @@ func New(
logger *zap.Logger,
) (*Protocol, error) {
return NewWithEncryptorConfig(
dataDir,
dbPath,
dbKey,
installationID,
defaultEncryptorConfig(installationID, logger),
@ -104,7 +104,7 @@ func New(
}
func NewWithEncryptorConfig(
dataDir string,
dbPath string,
dbKey string,
installationID string,
encryptorConfig encryptorConfig,
@ -113,7 +113,7 @@ func NewWithEncryptorConfig(
onSendContactCodeHandler func(*ProtocolMessageSpec),
logger *zap.Logger,
) (*Protocol, error) {
encryptor, err := newEncryptor(dataDir, dbKey, encryptorConfig)
encryptor, err := newEncryptor(dbPath, dbKey, encryptorConfig)
if err != nil {
return nil, err
}
@ -177,7 +177,7 @@ func (p *Protocol) addBundle(myIdentityKey *ecdsa.PrivateKey, msg *ProtocolMessa
logger.Info("adding bundle to the message",
zap.Any("installations", installations),
zap.Stringer("msg", msg))
)
bundle, err := p.encryptor.CreateBundle(myIdentityKey, installations)
if err != nil {
@ -360,7 +360,7 @@ func (p *Protocol) recoverInstallationsFromBundle(myIdentityKey *ecdsa.PrivateKe
signedPreKeys := bundle.GetSignedPreKeys()
for installationID, signedPreKey := range signedPreKeys {
logger.Info("recovered installation %s", zap.String("installation-id", installationID))
logger.Info("recovered installation", zap.String("installation-id", installationID))
if installationID != p.multidevice.InstallationID() {
installations = append(installations, &multidevice.Installation{
Identity: theirIdentityStr,

View File

@ -19,11 +19,11 @@ func TestProtocolServiceTestSuite(t *testing.T) {
type ProtocolServiceTestSuite struct {
suite.Suite
aliceDir string
bobDir string
alice *Protocol
bob *Protocol
logger *zap.Logger
aliceDBPath *os.File
bobDBPath *os.File
alice *Protocol
bob *Protocol
logger *zap.Logger
}
func (s *ProtocolServiceTestSuite) SetupTest() {
@ -33,11 +33,11 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
s.Require().NoError(err)
s.logger = logger
s.aliceDir, err = ioutil.TempDir("", "alice.db")
s.aliceDBPath, err = ioutil.TempFile("", "alice.db.sql")
s.Require().NoError(err)
aliceDBKey := "alice"
s.bobDir, err = ioutil.TempDir("", "bob.db")
s.bobDBPath, err = ioutil.TempFile("", "bob.db.sql")
s.Require().NoError(err)
bobDBKey := "bob"
@ -45,7 +45,7 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
onNewSharedSecretHandler := func(secret []*sharedsecret.Secret) {}
s.alice, err = New(
s.aliceDir,
s.aliceDBPath.Name(),
aliceDBKey,
"1",
addedBundlesHandler,
@ -56,7 +56,7 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
s.Require().NoError(err)
s.bob, err = New(
s.bobDir,
s.bobDBPath.Name(),
bobDBKey,
"2",
addedBundlesHandler,
@ -68,9 +68,8 @@ func (s *ProtocolServiceTestSuite) SetupTest() {
}
func (s *ProtocolServiceTestSuite) TearDownTest() {
os.Remove(s.aliceDir)
os.Remove(s.bobDir)
os.Remove(s.aliceDBPath.Name())
os.Remove(s.bobDBPath.Name())
_ = s.logger.Sync()
}

View File

@ -5,16 +5,6 @@ import (
"strings"
)
type Persistence interface {
// Add adds a shared secret, associated with an identity and an installationID.
Add(identity []byte, secret []byte, installationID string) error
// Get returns a shared secret associated with multiple installationIDs.
Get(identity []byte, installationIDs []string) (*Response, error)
// All returns an array of shared secrets, each one of them represented.
// as a byte array
All() ([][][]byte, error)
}
type Response struct {
secret []byte
installationIDs map[string]bool

View File

@ -45,7 +45,6 @@ func (s *SharedSecretTestSuite) SetupTest() {
func (s *SharedSecretTestSuite) TearDownTest() {
os.Remove(s.path)
_ = s.logger.Sync()
}

View File

@ -21,7 +21,7 @@ type Secret struct {
// SharedSecret generates and manages negotiated secrets.
// Identities (public keys) stored by SharedSecret
// are compressed.
// TODO: make it a part of sqlitePersistence instead of SharedSecret.
// TODO: make compression of public keys a responsibility of sqlitePersistence instead of SharedSecret.
type SharedSecret struct {
persistence *sqlitePersistence
logger *zap.Logger

View File

@ -3,7 +3,6 @@ package statusproto
import (
"context"
"crypto/ecdsa"
"log"
"path/filepath"
"time"
@ -22,6 +21,12 @@ import (
protocol "github.com/status-im/status-protocol-go/v1"
)
const (
// messagesDatabaseFileName is a name of the SQL file in which
// messages are stored.
messagesDatabaseFileName = "messages.sql"
)
var (
ErrChatIDEmpty = errors.New("chat ID is empty")
ErrNotImplemented = errors.New("not implemented")
@ -40,9 +45,13 @@ type Messenger struct {
adapter *whisperAdapter
encryptor *encryption.Protocol
ownMessages map[string][]*protocol.Message
ownMessages map[string][]*protocol.Message
featureFlags featureFlags
messagesPersistenceEnabled bool
shutdownTasks []func()
logger *zap.Logger
shutdownTasks []func() error
}
type featureFlags struct {
@ -56,14 +65,16 @@ type featureFlags struct {
type config struct {
onNewInstallationsHandler func([]*multidevice.Installation)
onNewSharedSecretHandler func([]*sharedsecret.Secret)
onSendContactCodeHandler func(*encryption.ProtocolMessageSpec)
// DEPRECATED: no need to expose it
onNewSharedSecretHandler func([]*sharedsecret.Secret)
// DEPRECATED: no need to expose it
onSendContactCodeHandler func(*encryption.ProtocolMessageSpec)
publicChatNames []string
publicKeys []*ecdsa.PublicKey
secrets []filter.NegotiatedSecret
encryptionLayerFilePath string
transportLayerFilePath string
featureFlags featureFlags
messagesPersistenceEnabled bool
featureFlags featureFlags
logger *zap.Logger
}
@ -84,19 +95,6 @@ func WithOnNewSharedSecret(h func([]*sharedsecret.Secret)) func(c *config) error
}
}
func WithChats(
publicChatNames []string,
publicKeys []*ecdsa.PublicKey,
secrets []filter.NegotiatedSecret,
) func(c *config) error {
return func(c *config) error {
c.publicChatNames = publicChatNames
c.publicKeys = publicKeys
c.secrets = secrets
return nil
}
}
func WithCustomLogger(logger *zap.Logger) func(c *config) error {
return func(c *config) error {
c.logger = logger
@ -111,6 +109,22 @@ func WithGenericDiscoveryTopicSupport() func(c *config) error {
}
}
func WithMessagesPersistenceEnabled() func(c *config) error {
return func(c *config) error {
c.messagesPersistenceEnabled = true
return nil
}
}
// TODO: use this config fileds.
func WithDatabaseFilePaths(encryptionLayerFilePath, transportLayerFilePath string) func(c *config) error {
return func(c *config) error {
c.encryptionLayerFilePath = encryptionLayerFilePath
c.transportLayerFilePath = transportLayerFilePath
return nil
}
}
func WithSendV1Messages() func(c *config) error {
return func(c *config) error {
c.featureFlags.sendV1Messages = true
@ -176,11 +190,19 @@ func NewMessenger(
}
}
// Set default database file paths.
if c.encryptionLayerFilePath == "" {
c.encryptionLayerFilePath = filepath.Join(dataDir, "sessions.sql")
}
if c.transportLayerFilePath == "" {
c.transportLayerFilePath = filepath.Join(dataDir, "transport.sql")
}
t, err := transport.NewWhisperServiceTransport(
server,
shh,
identity,
dataDir,
c.transportLayerFilePath,
dbKey,
nil,
logger,
@ -189,12 +211,8 @@ func NewMessenger(
return nil, errors.Wrap(err, "failed to create a WhisperServiceTransport")
}
if _, err := t.Init(c.publicChatNames, c.publicKeys, c.secrets, c.featureFlags.genericDiscoveryTopicEnabled); err != nil {
return nil, errors.Wrap(err, "failed to initialize WhisperServiceTransport")
}
encryptionProtocol, err := encryption.New(
dataDir,
c.encryptionLayerFilePath,
dbKey,
installationID,
c.onNewInstallationsHandler,
@ -206,7 +224,8 @@ func NewMessenger(
return nil, errors.Wrap(err, "failed to create the encryption layer")
}
messagesDB, err := sqlite.Open(filepath.Join(dataDir, "messages.sql"), dbKey, sqlite.MigrationConfig{
applicationLayerFilePath := filepath.Join(dataDir, messagesDatabaseFileName)
applicationLayerPersistence, err := sqlite.Open(applicationLayerFilePath, dbKey, sqlite.MigrationConfig{
AssetNames: migrations.AssetNames(),
AssetGetter: func(name string) ([]byte, error) {
return migrations.Asset(name)
@ -216,20 +235,22 @@ func NewMessenger(
return nil, errors.Wrap(err, "failed to initialize messages db")
}
persistence := &sqlitePersistence{db: applicationLayerPersistence}
adapter := newWhisperAdapter(identity, t, encryptionProtocol, c.featureFlags, logger)
messenger = &Messenger{
identity: identity,
persistence: &sqlitePersistence{db: messagesDB},
adapter: newWhisperAdapter(identity, t, encryptionProtocol, c.featureFlags, logger),
encryptor: encryptionProtocol,
ownMessages: make(map[string][]*protocol.Message),
shutdownTasks: []func(){
func() { messenger.persistence.Close() },
func() {
if err := logger.Sync(); err != nil {
log.Printf("failed to initialize log on shutdown")
}
},
identity: identity,
persistence: persistence,
adapter: adapter,
encryptor: encryptionProtocol,
ownMessages: make(map[string][]*protocol.Message),
featureFlags: c.featureFlags,
messagesPersistenceEnabled: c.messagesPersistenceEnabled,
shutdownTasks: []func() error{
persistence.Close,
adapter.transport.Reset,
logger.Sync,
},
logger: logger,
}
// Start all services immediately.
@ -238,14 +259,26 @@ func NewMessenger(
return nil, err
}
logger.Debug("messages persistence", zap.Bool("enabled", c.messagesPersistenceEnabled))
return messenger, nil
}
// Shutdown takes care of ensuring a clean shutdown of Messenger
func (m *Messenger) Shutdown() {
func (m *Messenger) Shutdown() (err error) {
for _, task := range m.shutdownTasks {
task()
if tErr := task(); tErr != nil {
if err == nil {
// First error appeared.
err = tErr
} else {
// We return all errors. They will be concatenated in the order of occurrence,
// however, they will also be returned as a single error.
err = errors.Wrap(err, tErr.Error())
}
}
}
return
}
func (m *Messenger) handleSharedSecrets(secrets []*sharedsecret.Secret) error {
@ -268,22 +301,22 @@ func (m *Messenger) SetInstallationMetadata(id string, data *multidevice.Install
return m.encryptor.SetInstallationMetadata(&m.identity.PublicKey, id, data)
}
// NOT_IMPLEMENTED
// NOT IMPLEMENTED
func (m *Messenger) SelectMailserver(id string) error {
return ErrNotImplemented
}
// NOT_IMPLEMENTED
// NOT IMPLEMENTED
func (m *Messenger) AddMailserver(enode string) error {
return ErrNotImplemented
}
// NOT_IMPLEMENTED
// NOT IMPLEMENTED
func (m *Messenger) RemoveMailserver(id string) error {
return ErrNotImplemented
}
// NOT_IMPLEMENTED
// NOT IMPLEMENTED
func (m *Messenger) Mailservers() ([]string, error) {
return nil, ErrNotImplemented
}
@ -326,9 +359,12 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
// Save our message because it won't be received from the transport layer.
message.ID = hash // a Message need ID to be properly stored in the db
message.SigPubKey = &m.identity.PublicKey
_, err = m.persistence.SaveMessages(chat.ID(), []*protocol.Message{message})
if err != nil {
return nil, err
if m.messagesPersistenceEnabled {
_, err = m.persistence.SaveMessages(chat.ID(), []*protocol.Message{message})
if err != nil {
return nil, err
}
}
// Cache it to be returned in Retrieve().
@ -341,6 +377,17 @@ func (m *Messenger) Send(ctx context.Context, chat Chat, data []byte) ([]byte, e
return nil, errors.New("chat is neither public nor private")
}
// SendRaw takes encoded data, encrypts it and sends through the wire.
// DEPRECATED
func (m *Messenger) SendRaw(ctx context.Context, chat Chat, data []byte) ([]byte, whisper.NewMessage, error) {
if chat.PublicKey() != nil {
return m.adapter.SendPrivateRaw(ctx, chat.PublicKey(), data)
} else if chat.PublicName() != "" {
return m.adapter.SendPublicRaw(ctx, chat.PublicName(), data)
}
return nil, whisper.NewMessage{}, errors.New("chat is neither public nor private")
}
type RetrieveConfig struct {
From time.Time
To time.Time
@ -354,14 +401,16 @@ var (
)
func (m *Messenger) Retrieve(ctx context.Context, chat Chat, c RetrieveConfig) (messages []*protocol.Message, err error) {
var latest []*protocol.Message
var (
latest []*protocol.Message
ownLatest []*protocol.Message
)
if chat.PublicKey() != nil {
latest, err = m.adapter.RetrievePrivateMessages(chat.PublicKey())
// Return any own messages for this chat as well.
if ownMessages, ok := m.ownMessages[chat.ID()]; ok {
latest = append(latest, ownMessages...)
delete(m.ownMessages, chat.ID())
ownLatest = ownMessages
}
} else if chat.PublicName() != "" {
latest, err = m.adapter.RetrievePublicMessages(chat.PublicName())
@ -374,15 +423,34 @@ func (m *Messenger) Retrieve(ctx context.Context, chat Chat, c RetrieveConfig) (
return
}
_, err = m.persistence.SaveMessages(chat.ID(), latest)
if err != nil {
return nil, errors.Wrap(err, "failed to save latest messages")
if m.messagesPersistenceEnabled {
_, err = m.persistence.SaveMessages(chat.ID(), latest)
if err != nil {
return nil, errors.Wrap(err, "failed to save latest messages")
}
}
return m.retrieveMessages(ctx, chat, c, latest)
// Confirm received and decrypted messages.
if m.messagesPersistenceEnabled && chat.PublicKey() != nil {
for _, message := range latest {
// Confirm received and decrypted messages.
if err := m.encryptor.ConfirmMessageProcessed(message.ID); err != nil {
return nil, errors.Wrap(err, "failed to confirm message being processed")
}
}
}
// When our messages are returned, we can delete them.
delete(m.ownMessages, chat.ID())
return m.retrieveSaved(ctx, chat, c, append(latest, ownLatest...))
}
func (m *Messenger) retrieveMessages(ctx context.Context, chat Chat, c RetrieveConfig, latest []*protocol.Message) (messages []*protocol.Message, err error) {
func (m *Messenger) retrieveSaved(ctx context.Context, chat Chat, c RetrieveConfig, latest []*protocol.Message) (messages []*protocol.Message, err error) {
if !m.messagesPersistenceEnabled {
return latest, nil
}
if !c.latest {
return m.persistence.Messages(chat.ID(), c.From, c.To)
}
@ -395,3 +463,33 @@ func (m *Messenger) retrieveMessages(ctx context.Context, chat Chat, c RetrieveC
return latest, nil
}
// DEPRECATED
func (m *Messenger) RetrieveRawAll() (map[filter.Chat][]*whisper.Message, error) {
return m.adapter.RetrieveRawAll()
}
// DEPRECATED
func (m *Messenger) RetrieveRawWithFilter(filterID string) ([]*whisper.Message, error) {
return m.adapter.RetrieveRaw(filterID)
}
// DEPRECATED
func (m *Messenger) LoadFilters(chats []*filter.Chat) ([]*filter.Chat, error) {
return m.adapter.transport.LoadFilters(chats, m.featureFlags.genericDiscoveryTopicEnabled)
}
// DEPRECATED
func (m *Messenger) RemoveFilters(chats []*filter.Chat) error {
return m.adapter.transport.RemoveFilters(chats)
}
// DEPRECATED
func (m *Messenger) ConfirmMessagesProcessed(messageIDs [][]byte) error {
for _, id := range messageIDs {
if err := m.encryptor.ConfirmMessageProcessed(id); err != nil {
return err
}
}
return nil
}

View File

@ -76,13 +76,13 @@ func (s *MessengerSuite) SetupTest() {
"some-key",
"installation-1",
WithCustomLogger(logger),
WithMessagesPersistenceEnabled(),
)
s.Require().NoError(err)
}
func (s *MessengerSuite) TearDownTest() {
os.Remove(s.tmpDir)
_ = s.logger.Sync()
}

View File

@ -35,6 +35,7 @@ type NegotiatedSecret struct {
Key []byte
}
// TODO: revise fields encoding/decoding. Some are encoded using hexutil and some using encoding/hex.
type Chat struct {
// ChatID is the identifier of the chat
ChatID string `json:"chatId"`
@ -81,30 +82,34 @@ func New(db *sql.DB, w *whisper.Whisper, privateKey *ecdsa.PrivateKey, logger *z
return nil, err
}
keys, err := persistence.All()
if err != nil {
return nil, err
}
return &ChatsManager{
privateKey: privateKey,
whisper: w,
persistence: persistence,
keys: keys,
chats: make(map[string]*Chat),
logger: logger.With(zap.Namespace("ChatsManager")),
}, nil
}
func (s *ChatsManager) Init(chatIDs []string, publicKeys []*ecdsa.PublicKey, negotiated []NegotiatedSecret, genericDiscoveryTopicEnabled bool) ([]*Chat, error) {
func (s *ChatsManager) Init(
chatIDs []string,
publicKeys []*ecdsa.PublicKey,
genericDiscoveryTopicEnabled bool,
) ([]*Chat, error) {
logger := s.logger.With(zap.String("site", "Init"))
logger.Info("initializing")
s.genericDiscoveryTopicEnabled = genericDiscoveryTopicEnabled
keys, err := s.persistence.All()
if err != nil {
return nil, err
}
s.keys = keys
// Load our contact code.
_, err = s.LoadContactCode(&s.privateKey.PublicKey)
_, err := s.LoadContactCode(&s.privateKey.PublicKey)
if err != nil {
return nil, errors.Wrap(err, "failed to load contact code")
}
@ -136,12 +141,6 @@ func (s *ChatsManager) Init(chatIDs []string, publicKeys []*ecdsa.PublicKey, neg
}
}
for _, secret := range negotiated {
if _, err := s.LoadNegotiated(secret); err != nil {
return nil, err
}
}
s.mutex.Lock()
defer s.mutex.Unlock()
@ -152,7 +151,29 @@ func (s *ChatsManager) Init(chatIDs []string, publicKeys []*ecdsa.PublicKey, neg
return allChats, nil
}
func (s *ChatsManager) Uninitialize() error {
// DEPRECATED
func (s *ChatsManager) InitWithChats(chats []*Chat, genericDiscoveryTopicEnabled bool) ([]*Chat, error) {
var (
chatIDs []string
publicKeys []*ecdsa.PublicKey
)
for _, chat := range chats {
if chat.Identity != "" && chat.OneToOne {
publicKey, err := strToPublicKey(chat.Identity)
if err != nil {
return nil, err
}
publicKeys = append(publicKeys, publicKey)
} else if chat.ChatID != "" {
chatIDs = append(chatIDs, chat.ChatID)
}
}
return s.Init(chatIDs, publicKeys, genericDiscoveryTopicEnabled)
}
func (s *ChatsManager) Reset() error {
var chats []*Chat
s.mutex.Lock()
@ -182,6 +203,17 @@ func (s *ChatsManager) ChatByID(chatID string) *Chat {
return s.chats[chatID]
}
func (s *ChatsManager) ChatByFilterID(filterID string) *Chat {
s.mutex.Lock()
defer s.mutex.Unlock()
for _, chat := range s.chats {
if chat.FilterID == filterID {
return chat
}
}
return nil
}
func (s *ChatsManager) ChatsByPublicKey(publicKey *ecdsa.PublicKey) (result []*Chat) {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -503,57 +535,6 @@ func (s *ChatsManager) GetNegotiated(identity *ecdsa.PublicKey) *Chat {
return s.chats[negotiatedTopic(identity)]
}
// DEPRECATED
func (s *ChatsManager) InitDeprecated(chats []*Chat, secrets []NegotiatedSecret, genericDiscoveryTopicEnabled bool) ([]*Chat, error) {
var (
chatIDs []string
publicKeys []*ecdsa.PublicKey
)
for _, chat := range chats {
if chat.ChatID != "" {
chatIDs = append(chatIDs, chat.ChatID)
} else if chat.Identity != "" {
publicKeyBytes, err := hex.DecodeString(chat.Identity)
if err != nil {
return nil, err
}
publicKey, err := crypto.UnmarshalPubkey(publicKeyBytes)
if err != nil {
return nil, err
}
publicKeys = append(publicKeys, publicKey)
}
}
return s.Init(chatIDs, publicKeys, secrets, genericDiscoveryTopicEnabled)
}
// DEPRECATED
func (s *ChatsManager) Load(chat *Chat) ([]*Chat, error) {
if chat.ChatID != "" {
chat, err := s.LoadPublic(chat.ChatID)
return []*Chat{chat}, err
} else if chat.Identity != "" {
publicKeyBytes, err := hex.DecodeString(chat.Identity)
if err != nil {
return nil, err
}
publicKey, err := crypto.UnmarshalPubkey(publicKeyBytes)
if err != nil {
return nil, err
}
chat, err := s.LoadContactCode(publicKey)
return []*Chat{chat}, err
}
return nil, errors.New("invalid Chat to load")
}
// toTopic converts a string to a whisper topic.
func toTopic(s string) []byte {
return crypto.Keccak256([]byte(s))[:whisper.TopicLength]
@ -563,6 +544,14 @@ func ToTopic(s string) []byte {
return toTopic(s)
}
func strToPublicKey(str string) (*ecdsa.PublicKey, error) {
publicKeyBytes, err := hex.DecodeString(str)
if err != nil {
return nil, err
}
return crypto.UnmarshalPubkey(publicKeyBytes)
}
func publicKeyToStr(publicKey *ecdsa.PublicKey) string {
return hex.EncodeToString(crypto.FromECDSAPub(publicKey))
}

View File

@ -88,12 +88,11 @@ func (s *ChatsTestSuite) SetupTest() {
func (s *ChatsTestSuite) TearDownTest() {
os.Remove(s.dbPath)
_ = s.logger.Sync()
}
func (s *ChatsTestSuite) TestDiscoveryAndPartitionedTopic() {
_, err := s.chats.InitDeprecated(nil, nil, true)
_, err := s.chats.Init(nil, nil, true)
s.Require().NoError(err)
s.Require().Equal(4, len(s.chats.chats), "It creates four filters")
@ -106,7 +105,7 @@ func (s *ChatsTestSuite) TestDiscoveryAndPartitionedTopic() {
}
func (s *ChatsTestSuite) TestPartitionedTopicWithDiscoveryDisabled() {
_, err := s.chats.InitDeprecated(nil, nil, false)
_, err := s.chats.Init(nil, nil, false)
s.Require().NoError(err)
s.Require().Equal(3, len(s.chats.chats), "It creates three filters")

View File

@ -1,30 +1,9 @@
package whisper
import (
"context"
"crypto/ecdsa"
"github.com/status-im/status-protocol-go/transport/whisper/filter"
whisper "github.com/status-im/whisper/whisperv6"
)
// WhisperTransport defines an interface which each Whisper transport
// should conform to.
type WhisperTransport interface {
JoinPublic(string) error
LeavePublic(string) error
JoinPrivate(*ecdsa.PublicKey) error
LeavePrivate(*ecdsa.PublicKey) error
RetrievePublicMessages(string) ([]*whisper.ReceivedMessage, error)
RetrievePrivateMessages(*ecdsa.PublicKey) ([]*whisper.ReceivedMessage, error)
SendPublic(context.Context, whisper.NewMessage, string) ([]byte, error)
SendPrivateWithSharedSecret(context.Context, whisper.NewMessage, *ecdsa.PublicKey, []byte) ([]byte, error)
SendPrivateWithPartitioned(context.Context, whisper.NewMessage, *ecdsa.PublicKey) ([]byte, error)
SendPrivateOnDiscovery(context.Context, whisper.NewMessage, *ecdsa.PublicKey) ([]byte, error)
ProcessNegotiatedSecret(filter.NegotiatedSecret) error
Request(context.Context, RequestOptions) error
}
type RequestOptions struct {
Topics []whisper.TopicType
Password string

View File

@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"math/big"
"path/filepath"
"sync"
"time"
@ -85,20 +84,17 @@ type WhisperServiceTransport struct {
selectedMailServerEnode string
}
var _ WhisperTransport = (*WhisperServiceTransport)(nil)
// NewWhisperService returns a new WhisperServiceTransport.
func NewWhisperServiceTransport(
node Server,
shh *whisper.Whisper,
privateKey *ecdsa.PrivateKey,
dataDir string,
dbPath string,
dbKey string,
mailservers []string,
logger *zap.Logger,
) (*WhisperServiceTransport, error) {
// DB is shared between this package and all sub-packages.
dbPath := filepath.Join(dataDir, "transport.sql")
db, err := sqlite.Open(dbPath, dbKey, sqlite.MigrationConfig{
AssetNames: migrations.AssetNames(),
AssetGetter: func(name string) ([]byte, error) {
@ -128,13 +124,19 @@ func NewWhisperServiceTransport(
logger: logger.With(zap.Namespace("WhisperServiceTransport")),
}, nil
}
func (a *WhisperServiceTransport) Init(
chatIDs []string,
publicKeys []*ecdsa.PublicKey,
negotiated []filter.NegotiatedSecret,
genericDiscoveryTopicEnabled bool,
) ([]*filter.Chat, error) {
return a.chats.Init(chatIDs, publicKeys, negotiated, genericDiscoveryTopicEnabled)
// DEPRECATED
func (a *WhisperServiceTransport) LoadFilters(chats []*filter.Chat, genericDiscoveryTopicEnabled bool) ([]*filter.Chat, error) {
return a.chats.InitWithChats(chats, genericDiscoveryTopicEnabled)
}
// DEPRECATED
func (a *WhisperServiceTransport) RemoveFilters(chats []*filter.Chat) error {
return a.chats.Remove(chats...)
}
func (a *WhisperServiceTransport) Reset() error {
return a.chats.Reset()
}
func (a *WhisperServiceTransport) ProcessNegotiatedSecret(secret filter.NegotiatedSecret) error {
@ -199,6 +201,31 @@ func (a *WhisperServiceTransport) RetrievePrivateMessages(publicKey *ecdsa.Publi
return result, nil
}
// DEPRECATED
func (a *WhisperServiceTransport) RetrieveRawAll() (map[filter.Chat][]*whisper.ReceivedMessage, error) {
result := make(map[filter.Chat][]*whisper.ReceivedMessage)
allChats := a.chats.Chats()
for _, chat := range allChats {
f := a.shh.GetFilter(chat.FilterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
result[*chat] = append(result[*chat], f.Retrieve()...)
}
return result, nil
}
// DEPRECATED
func (a *WhisperServiceTransport) RetrieveRaw(filterID string) ([]*whisper.ReceivedMessage, error) {
f := a.shh.GetFilter(filterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
return f.Retrieve(), nil
}
// SendPublic sends a new message using the Whisper service.
// For public chats, chat name is used as an ID as well as
// a topic.

View File

@ -10,15 +10,10 @@ import (
)
func TestSelectAndAddNoMailservers(t *testing.T) {
dbDir, err := ioutil.TempDir("", "transport")
require.NoError(t, err)
defer os.Remove(dbDir)
logger, err := zap.NewDevelopment()
require.NoError(t, err)
svc, err := NewWhisperServiceTransport(nil, nil, nil, dbDir, "some-key", nil, logger)
require.NoError(t, err)
logger := zap.NewNop()
svc := &WhisperServiceTransport{
logger: logger,
}
rst, err := svc.selectAndAddMailServer()
require.Empty(t, rst)
@ -27,14 +22,14 @@ func TestSelectAndAddNoMailservers(t *testing.T) {
}
func TestNewWhisperServiceTransport(t *testing.T) {
dbDir, err := ioutil.TempDir("", "transport")
dbPath, err := ioutil.TempFile("", "transport.sql")
require.NoError(t, err)
defer os.Remove(dbDir)
defer os.Remove(dbPath.Name())
logger, err := zap.NewDevelopment()
require.NoError(t, err)
defer func() { _ = logger.Sync() }()
_, err = NewWhisperServiceTransport(nil, nil, nil, dbDir, "some-key", nil, logger)
_, err = NewWhisperServiceTransport(nil, nil, nil, dbPath.Name(), "some-key", nil, logger)
require.NoError(t, err)
_ = logger.Sync()
}