Publisher refactor (#1508)
This commit is contained in:
parent
699b70fea2
commit
aa39647094
|
@ -17,8 +17,15 @@ import (
|
|||
"github.com/status-im/status-go/messaging/chat/protobuf"
|
||||
)
|
||||
|
||||
var ErrSessionNotFound = errors.New("session not found")
|
||||
var ErrDeviceNotFound = errors.New("device not found")
|
||||
var (
|
||||
ErrSessionNotFound = errors.New("session not found")
|
||||
ErrDeviceNotFound = errors.New("device not found")
|
||||
// ErrNotPairedDevice means that we received a message signed with our public key
|
||||
// but from a device that has not been paired.
|
||||
// This should not happen because the protocol forbids sending a message to
|
||||
// non-paired devices, however, in theory it is possible to receive such a message.
|
||||
ErrNotPairedDevice = errors.New("received a message from not paired device")
|
||||
)
|
||||
|
||||
// If we have no bundles, we use a constant so that the message can reach any device.
|
||||
const noInstallationID = "none"
|
||||
|
@ -228,9 +235,12 @@ func (s *EncryptionService) DecryptPayload(myIdentityKey *ecdsa.PrivateKey, thei
|
|||
}
|
||||
|
||||
// We should not be sending a signal if it's coming from us, as we receive our own messages
|
||||
if msg == nil && *theirIdentityKey != myIdentityKey.PublicKey {
|
||||
if msg == nil && !samePublicKeys(*theirIdentityKey, myIdentityKey.PublicKey) {
|
||||
return nil, ErrDeviceNotFound
|
||||
} else if msg == nil {
|
||||
return nil, ErrNotPairedDevice
|
||||
}
|
||||
|
||||
payload := msg.GetPayload()
|
||||
|
||||
if x3dhHeader := msg.GetX3DHHeader(); x3dhHeader != nil {
|
||||
|
@ -567,3 +577,7 @@ func (s *EncryptionService) EncryptPayload(theirIdentityKey *ecdsa.PublicKey, my
|
|||
|
||||
return response, targetedInstallations, nil
|
||||
}
|
||||
|
||||
func samePublicKeys(pubKey1, pubKey2 ecdsa.PublicKey) bool {
|
||||
return pubKey1.X.Cmp(pubKey2.X) == 0 && pubKey1.Y.Cmp(pubKey2.Y) == 0
|
||||
}
|
||||
|
|
|
@ -32,7 +32,10 @@ type ProtocolService struct {
|
|||
Enabled bool
|
||||
}
|
||||
|
||||
var ErrNotProtocolMessage = errors.New("Not a protocol message")
|
||||
var (
|
||||
ErrNotProtocolMessage = errors.New("not a protocol message")
|
||||
ErrNoPayload = errors.New("no payload")
|
||||
)
|
||||
|
||||
// NewProtocolService creates a new ProtocolService instance
|
||||
func NewProtocolService(encryption *EncryptionService, secret *sharedsecret.Service, multidevice *multidevice.Service, addedBundlesHandler func([]*multidevice.Installation), onNewSharedSecretHandler func([]*sharedsecret.Secret)) *ProtocolService {
|
||||
|
@ -313,7 +316,7 @@ func (p *ProtocolService) HandleMessage(myIdentityKey *ecdsa.PrivateKey, theirPu
|
|||
}
|
||||
|
||||
// Return error
|
||||
return nil, errors.New("no payload")
|
||||
return nil, ErrNoPayload
|
||||
}
|
||||
|
||||
func getProtocolVersion(bundles []*protobuf.Bundle, installationID string) uint32 {
|
||||
|
|
|
@ -0,0 +1,448 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
|
||||
"github.com/status-im/status-go/messaging/chat"
|
||||
"github.com/status-im/status-go/messaging/chat/multidevice"
|
||||
"github.com/status-im/status-go/messaging/chat/protobuf"
|
||||
"github.com/status-im/status-go/messaging/filter"
|
||||
|
||||
"github.com/status-im/status-go/services/shhext/whisperutils"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
)
|
||||
|
||||
const (
|
||||
tickerInterval = 120
|
||||
// How often we should publish a contact code in seconds
|
||||
publishInterval = 21600
|
||||
)
|
||||
|
||||
var (
|
||||
errProtocolNotInitialized = errors.New("protocol is not initialized")
|
||||
// ErrPFSNotEnabled is returned when an endpoint PFS only is called but
|
||||
// PFS is disabled.
|
||||
ErrPFSNotEnabled = errors.New("pfs not enabled")
|
||||
errNoKeySelected = errors.New("no key selected")
|
||||
// ErrNoProtocolMessage means that a message was not a protocol message,
|
||||
// that is it could not be unmarshaled.
|
||||
ErrNoProtocolMessage = errors.New("not a protocol message")
|
||||
)
|
||||
|
||||
type Publisher struct {
|
||||
config Config
|
||||
whisper *whisper.Whisper
|
||||
online func() bool
|
||||
whisperAPI *whisper.PublicWhisperAPI
|
||||
protocol *chat.ProtocolService
|
||||
persistence Persistence
|
||||
log log.Logger
|
||||
filter *filter.Service
|
||||
quit chan struct{}
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
PFSEnabled bool
|
||||
}
|
||||
|
||||
func New(w *whisper.Whisper, c Config) *Publisher {
|
||||
return &Publisher{
|
||||
config: c,
|
||||
whisper: w,
|
||||
whisperAPI: whisper.NewPublicWhisperAPI(w),
|
||||
log: log.New("package", "status-go/services/publisher.Publisher"),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Publisher) Init(db *sql.DB, protocol *chat.ProtocolService, filter *filter.Service) {
|
||||
p.persistence = NewSQLLitePersistence(db)
|
||||
p.protocol = protocol
|
||||
p.filter = filter
|
||||
}
|
||||
|
||||
func (p *Publisher) ProcessPublicBundle(myIdentityKey *ecdsa.PrivateKey, bundle *protobuf.Bundle) ([]*multidevice.Installation, error) {
|
||||
if p.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
return p.protocol.ProcessPublicBundle(myIdentityKey, bundle)
|
||||
}
|
||||
|
||||
func (p *Publisher) GetBundle(myIdentityKey *ecdsa.PrivateKey) (*protobuf.Bundle, error) {
|
||||
if p.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
return p.protocol.GetBundle(myIdentityKey)
|
||||
}
|
||||
|
||||
// EnableInstallation enables an installation for multi-device sync.
|
||||
func (p *Publisher) EnableInstallation(installationID string) error {
|
||||
if p.protocol == nil {
|
||||
return errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := p.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.protocol.EnableInstallation(&privateKey.PublicKey, installationID)
|
||||
}
|
||||
|
||||
// DisableInstallation disables an installation for multi-device sync.
|
||||
func (p *Publisher) DisableInstallation(installationID string) error {
|
||||
if p.protocol == nil {
|
||||
return errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := p.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.protocol.DisableInstallation(&privateKey.PublicKey, installationID)
|
||||
}
|
||||
|
||||
// GetOurInstallations returns all the installations available given an identity
|
||||
func (p *Publisher) GetOurInstallations() ([]*multidevice.Installation, error) {
|
||||
if p.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := p.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return nil, errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return p.protocol.GetOurInstallations(&privateKey.PublicKey)
|
||||
}
|
||||
|
||||
// SetInstallationMetadata sets the metadata for our own installation
|
||||
func (p *Publisher) SetInstallationMetadata(installationID string, data *multidevice.InstallationMetadata) error {
|
||||
if p.protocol == nil {
|
||||
return errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := p.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return p.protocol.SetInstallationMetadata(&privateKey.PublicKey, installationID, data)
|
||||
}
|
||||
|
||||
func (p *Publisher) GetPublicBundle(identityKey *ecdsa.PublicKey) (*protobuf.Bundle, error) {
|
||||
if p.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
return p.protocol.GetPublicBundle(identityKey)
|
||||
}
|
||||
|
||||
func (p *Publisher) Start(online func() bool, startTicker bool) error {
|
||||
p.online = online
|
||||
if startTicker {
|
||||
p.startTicker()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Publisher) Stop() error {
|
||||
if p.filter != nil {
|
||||
if err := p.filter.Stop(); err != nil {
|
||||
log.Error("Failed to stop filter service with error", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Publisher) getNegotiatedChat(identity *ecdsa.PublicKey) *filter.Chat {
|
||||
return p.filter.GetNegotiated(identity)
|
||||
}
|
||||
|
||||
func (p *Publisher) LoadFilters(chats []*filter.Chat) ([]*filter.Chat, error) {
|
||||
return p.filter.Init(chats)
|
||||
}
|
||||
|
||||
func (p *Publisher) LoadFilter(chat *filter.Chat) ([]*filter.Chat, error) {
|
||||
return p.filter.Load(chat)
|
||||
}
|
||||
|
||||
func (p *Publisher) RemoveFilters(chats []*filter.Chat) error {
|
||||
return p.filter.Remove(chats)
|
||||
}
|
||||
|
||||
func (p *Publisher) ProcessMessage(msg *whisper.Message, msgID []byte) error {
|
||||
if !p.config.PFSEnabled {
|
||||
return ErrPFSNotEnabled
|
||||
}
|
||||
|
||||
privateKeyID := p.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
publicKey, err := crypto.UnmarshalPubkey(msg.Sig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Unmarshal message
|
||||
protocolMessage := &protobuf.ProtocolMessage{}
|
||||
|
||||
if err := proto.Unmarshal(msg.Payload, protocolMessage); err != nil {
|
||||
p.log.Debug("Not a protocol message", "err", err)
|
||||
return ErrNoProtocolMessage
|
||||
}
|
||||
|
||||
response, err := p.protocol.HandleMessage(privateKey, publicKey, protocolMessage, msgID)
|
||||
if err == nil {
|
||||
msg.Payload = response
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateDirectMessage creates a 1:1 chat message
|
||||
func (p *Publisher) CreateDirectMessage(signature string, destination hexutil.Bytes, DH bool, payload []byte) (*whisper.NewMessage, error) {
|
||||
if !p.config.PFSEnabled {
|
||||
return nil, ErrPFSNotEnabled
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(signature)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
publicKey, err := crypto.UnmarshalPubkey(destination)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var msgSpec *chat.ProtocolMessageSpec
|
||||
|
||||
if DH {
|
||||
p.log.Debug("Building dh message")
|
||||
msgSpec, err = p.protocol.BuildDHMessage(privateKey, publicKey, payload)
|
||||
} else {
|
||||
p.log.Debug("Building direct message")
|
||||
msgSpec, err = p.protocol.BuildDirectMessage(privateKey, publicKey, payload)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
whisperMessage, err := p.directMessageToWhisper(privateKey, publicKey, destination, signature, msgSpec)
|
||||
if err != nil {
|
||||
p.log.Error("sshext-service", "error building whisper message", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return whisperMessage, nil
|
||||
}
|
||||
|
||||
func (p *Publisher) directMessageToWhisper(myPrivateKey *ecdsa.PrivateKey, theirPublicKey *ecdsa.PublicKey, destination hexutil.Bytes, signature string, spec *chat.ProtocolMessageSpec) (*whisper.NewMessage, error) {
|
||||
// marshal for sending to wire
|
||||
marshaledMessage, err := proto.Marshal(spec.Message)
|
||||
if err != nil {
|
||||
p.log.Error("encryption-service", "error marshaling message", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
whisperMessage := whisperutils.DefaultWhisperMessage()
|
||||
whisperMessage.Payload = marshaledMessage
|
||||
whisperMessage.Sig = signature
|
||||
|
||||
if spec.SharedSecret != nil {
|
||||
chat := p.getNegotiatedChat(theirPublicKey)
|
||||
if chat != nil {
|
||||
p.log.Debug("Sending on negotiated topic", "public-key", destination)
|
||||
whisperMessage.SymKeyID = chat.SymKeyID
|
||||
whisperMessage.Topic = chat.Topic
|
||||
whisperMessage.PublicKey = nil
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
} else if spec.PartitionedTopic() == chat.PartitionTopicV1 {
|
||||
p.log.Debug("Sending on partitioned topic", "public-key", destination)
|
||||
// Create filter on demand
|
||||
if _, err := p.filter.LoadPartitioned(myPrivateKey, theirPublicKey, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t := filter.PublicKeyToPartitionedTopicBytes(theirPublicKey)
|
||||
whisperMessage.Topic = whisper.BytesToTopic(t)
|
||||
whisperMessage.PublicKey = destination
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
|
||||
p.log.Debug("Sending on old discovery topic", "public-key", destination)
|
||||
whisperMessage.Topic = whisperutils.DiscoveryTopicBytes
|
||||
whisperMessage.PublicKey = destination
|
||||
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
|
||||
// CreatePublicMessage sends a public chat message to the underlying transport
|
||||
func (p *Publisher) CreatePublicMessage(signature string, chatID string, payload []byte, wrap bool) (*whisper.NewMessage, error) {
|
||||
if !p.config.PFSEnabled {
|
||||
return nil, ErrPFSNotEnabled
|
||||
}
|
||||
|
||||
filter := p.filter.GetByID(chatID)
|
||||
if filter == nil {
|
||||
return nil, errors.New("not subscribed to chat")
|
||||
}
|
||||
p.log.Info("SIG", signature)
|
||||
|
||||
// Enrich with transport layer info
|
||||
whisperMessage := whisperutils.DefaultWhisperMessage()
|
||||
whisperMessage.Sig = signature
|
||||
whisperMessage.Topic = whisperutils.ToTopic(chatID)
|
||||
whisperMessage.SymKeyID = filter.SymKeyID
|
||||
|
||||
if wrap {
|
||||
privateKeyID := p.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return nil, errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
message, err := p.protocol.BuildPublicMessage(privateKey, payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
marshaledMessage, err := proto.Marshal(message)
|
||||
if err != nil {
|
||||
p.log.Error("encryption-service", "error marshaling message", err)
|
||||
return nil, err
|
||||
}
|
||||
whisperMessage.Payload = marshaledMessage
|
||||
|
||||
} else {
|
||||
whisperMessage.Payload = payload
|
||||
}
|
||||
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
|
||||
func (p *Publisher) ConfirmMessagesProcessed(ids [][]byte) error {
|
||||
return p.protocol.ConfirmMessagesProcessed(ids)
|
||||
}
|
||||
|
||||
func (p *Publisher) startTicker() {
|
||||
p.ticker = time.NewTicker(tickerInterval * time.Second)
|
||||
p.quit = make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-p.ticker.C:
|
||||
_, err := p.sendContactCode()
|
||||
if err != nil {
|
||||
p.log.Error("could not execute tick", "err", err)
|
||||
}
|
||||
case <-p.quit:
|
||||
p.ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (p *Publisher) sendContactCode() (*whisper.NewMessage, error) {
|
||||
p.log.Info("publishing bundle")
|
||||
if !p.config.PFSEnabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lastPublished, err := p.persistence.Get()
|
||||
if err != nil {
|
||||
p.log.Error("could not fetch config from db", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
|
||||
if now-lastPublished < publishInterval {
|
||||
fmt.Println("NOTHING")
|
||||
p.log.Debug("nothing to do")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !p.online() {
|
||||
p.log.Debug("not connected")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
privateKeyID := p.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return nil, errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := p.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
identity := fmt.Sprintf("%x", crypto.FromECDSAPub(&privateKey.PublicKey))
|
||||
|
||||
message, err := p.CreatePublicMessage("0x"+identity, filter.ContactCodeTopic(identity), nil, true)
|
||||
if err != nil {
|
||||
p.log.Error("could not build contact code", "identity", identity, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = p.whisperAPI.Post(context.TODO(), *message)
|
||||
if err != nil {
|
||||
p.log.Error("could not publish contact code on whisper", "identity", identity, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = p.persistence.Set(now)
|
||||
if err != nil {
|
||||
p.log.Error("could not set last published", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return message, nil
|
||||
}
|
|
@ -4,13 +4,16 @@ import (
|
|||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/crypto/ecies"
|
||||
"github.com/status-im/status-go/messaging/chat"
|
||||
"github.com/status-im/status-go/messaging/chat/multidevice"
|
||||
"github.com/status-im/status-go/messaging/chat/sharedsecret"
|
||||
"github.com/status-im/status-go/messaging/filter"
|
||||
"github.com/status-im/status-go/services/shhext/dedup"
|
||||
"github.com/status-im/status-go/services/shhext/whisperutils"
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
@ -28,91 +31,80 @@ type TestKey struct {
|
|||
|
||||
type ServiceTestSuite struct {
|
||||
suite.Suite
|
||||
alice *Service
|
||||
bob *Service
|
||||
alice *Publisher
|
||||
bob *Publisher
|
||||
aliceKey *TestKey
|
||||
bobKey *TestKey
|
||||
}
|
||||
|
||||
func (s *ServiceTestSuite) createPublisher(installationID string) (*Publisher, *TestKey) {
|
||||
dir, err := ioutil.TempDir("", "publisher-test")
|
||||
s.Require().NoError(err)
|
||||
|
||||
config := Config{PFSEnabled: true}
|
||||
|
||||
whisper := whisper.New(nil)
|
||||
err = whisper.SetMinimumPoW(0)
|
||||
s.Require().NoError(err)
|
||||
|
||||
publisher := New(whisper, config)
|
||||
|
||||
pk, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
keyID, err := whisper.AddKeyPair(pk)
|
||||
s.Require().NoError(err)
|
||||
|
||||
testKey := TestKey{
|
||||
privateKey: pk,
|
||||
keyID: keyID,
|
||||
publicKeyBytes: crypto.FromECDSAPub(&pk.PublicKey),
|
||||
}
|
||||
|
||||
persistence, err := chat.NewSQLLitePersistence(filepath.Join(dir, "db1.sql"), "pass")
|
||||
s.Require().NoError(err)
|
||||
|
||||
sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage())
|
||||
|
||||
filterService := filter.New(whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService)
|
||||
|
||||
multideviceConfig := &multidevice.Config{
|
||||
InstallationID: installationID,
|
||||
ProtocolVersion: chat.ProtocolVersion,
|
||||
MaxInstallations: 3,
|
||||
}
|
||||
multideviceService := multidevice.New(multideviceConfig, persistence.GetMultideviceStorage())
|
||||
|
||||
protocolService := chat.NewProtocolService(
|
||||
chat.NewEncryptionService(
|
||||
persistence,
|
||||
chat.DefaultEncryptionServiceConfig(installationID)),
|
||||
sharedSecretService,
|
||||
multideviceService,
|
||||
func(addedBundles []*multidevice.Installation) {},
|
||||
func(sharedSecrets []*sharedsecret.Secret) {
|
||||
for _, sharedSecret := range sharedSecrets {
|
||||
_, _ = filterService.ProcessNegotiatedSecret(sharedSecret)
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
publisher.Init(persistence.DB, protocolService, filterService)
|
||||
|
||||
err = publisher.Start(func() bool { return true }, false)
|
||||
s.Require().NoError(err)
|
||||
|
||||
return publisher, &testKey
|
||||
}
|
||||
|
||||
func (s *ServiceTestSuite) SetupTest() {
|
||||
|
||||
dir1, err := ioutil.TempDir("", "publisher-test")
|
||||
s.alice, s.aliceKey = s.createPublisher("installation-1")
|
||||
_, err := s.alice.LoadFilters([]*filter.Chat{})
|
||||
s.Require().NoError(err)
|
||||
|
||||
config1 := &Config{
|
||||
PfsEnabled: true,
|
||||
DataDir: dir1,
|
||||
InstallationID: "1",
|
||||
}
|
||||
|
||||
whisper1 := whisper.New(nil)
|
||||
err = whisper1.SetMinimumPoW(0)
|
||||
s.bob, s.bobKey = s.createPublisher("installation-2")
|
||||
_, err = s.bob.LoadFilters([]*filter.Chat{})
|
||||
s.Require().NoError(err)
|
||||
|
||||
service1 := New(config1, whisper1)
|
||||
|
||||
pk1, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
keyID1, err := whisper1.AddKeyPair(pk1)
|
||||
s.Require().NoError(err)
|
||||
|
||||
key1 := &TestKey{
|
||||
privateKey: pk1,
|
||||
keyID: keyID1,
|
||||
publicKeyBytes: crypto.FromECDSAPub(&pk1.PublicKey),
|
||||
}
|
||||
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = service1.Start(func() bool { return true }, false)
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = service1.InitProtocolWithPassword("1", "")
|
||||
s.Require().NoError(err)
|
||||
_, err = service1.LoadFilters([]*filter.Chat{})
|
||||
s.Require().NoError(err)
|
||||
|
||||
dir2, err := ioutil.TempDir("", "publisher-test")
|
||||
s.Require().NoError(err)
|
||||
|
||||
config2 := &Config{
|
||||
PfsEnabled: true,
|
||||
DataDir: dir2,
|
||||
InstallationID: "2",
|
||||
}
|
||||
|
||||
whisper2 := whisper.New(nil)
|
||||
err = whisper2.SetMinimumPoW(0)
|
||||
s.Require().NoError(err)
|
||||
|
||||
service2 := New(config2, whisper2)
|
||||
|
||||
pk2, err := crypto.GenerateKey()
|
||||
s.Require().NoError(err)
|
||||
|
||||
keyID2, err := whisper2.AddKeyPair(pk2)
|
||||
s.Require().NoError(err)
|
||||
|
||||
key2 := &TestKey{
|
||||
privateKey: pk2,
|
||||
keyID: keyID2,
|
||||
publicKeyBytes: crypto.FromECDSAPub(&pk2.PublicKey),
|
||||
}
|
||||
|
||||
err = service2.Start(func() bool { return true }, false)
|
||||
s.Require().NoError(err)
|
||||
|
||||
err = service2.InitProtocolWithPassword("1", "")
|
||||
s.Require().NoError(err)
|
||||
|
||||
_, err = service2.LoadFilters([]*filter.Chat{})
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.alice = service1
|
||||
s.aliceKey = key1
|
||||
s.bob = service2
|
||||
s.bobKey = key2
|
||||
}
|
||||
|
||||
func (s *ServiceTestSuite) TestCreateDirectMessage() {
|
||||
|
@ -125,14 +117,9 @@ func (s *ServiceTestSuite) TestCreateDirectMessage() {
|
|||
Payload: newMessage.Payload,
|
||||
Dst: newMessage.PublicKey,
|
||||
}
|
||||
dedupMessage := dedup.DeduplicateMessage{
|
||||
DedupID: []byte("1"),
|
||||
Message: message,
|
||||
}
|
||||
|
||||
err = s.bob.ProcessMessage(dedupMessage)
|
||||
err = s.bob.ProcessMessage(message, []byte("1"))
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.Require().Equal([]byte("hello"), message.Payload)
|
||||
}
|
||||
|
||||
|
@ -164,13 +151,8 @@ func (s *ServiceTestSuite) TestTopic() {
|
|||
}
|
||||
|
||||
// We receive the contact code
|
||||
dedupMessage2 := dedup.DeduplicateMessage{
|
||||
DedupID: []byte("1"),
|
||||
Message: message2,
|
||||
}
|
||||
|
||||
err = s.alice.ProcessMessage(dedupMessage2)
|
||||
s.Require().NoError(err)
|
||||
err = s.alice.ProcessMessage(message2, []byte("1"))
|
||||
s.Require().EqualError(err, chat.ErrNoPayload.Error())
|
||||
|
||||
// We build another message, this time it should use the partitioned topic
|
||||
newMessage3, err := s.alice.CreateDirectMessage(s.aliceKey.keyID, s.bobKey.publicKeyBytes, false, []byte("hello"))
|
||||
|
@ -187,12 +169,7 @@ func (s *ServiceTestSuite) TestTopic() {
|
|||
s.Require().Equal(expectedTopic3, message3.Topic)
|
||||
|
||||
// We receive the message
|
||||
dedupMessage3 := dedup.DeduplicateMessage{
|
||||
DedupID: []byte("1"),
|
||||
Message: message3,
|
||||
}
|
||||
|
||||
err = s.bob.ProcessMessage(dedupMessage3)
|
||||
err = s.bob.ProcessMessage(message3, []byte("1"))
|
||||
s.Require().NoError(err)
|
||||
|
||||
// We build another message, this time it should use the negotiated topic
|
||||
|
@ -217,12 +194,7 @@ func (s *ServiceTestSuite) TestTopic() {
|
|||
s.Require().Equal(negotiatedTopic, message4.Topic)
|
||||
|
||||
// We receive the message
|
||||
dedupMessage4 := dedup.DeduplicateMessage{
|
||||
DedupID: []byte("1"),
|
||||
Message: message4,
|
||||
}
|
||||
|
||||
err = s.alice.ProcessMessage(dedupMessage4)
|
||||
err = s.alice.ProcessMessage(message4, []byte("1"))
|
||||
s.Require().NoError(err)
|
||||
|
||||
// Alice sends another message to Bob, this time it should use the negotiated topic
|
|
@ -1,591 +0,0 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/status-im/status-go/messaging/chat"
|
||||
chatDB "github.com/status-im/status-go/messaging/chat/db"
|
||||
"github.com/status-im/status-go/messaging/chat/multidevice"
|
||||
"github.com/status-im/status-go/messaging/chat/protobuf"
|
||||
"github.com/status-im/status-go/messaging/chat/sharedsecret"
|
||||
"github.com/status-im/status-go/messaging/filter"
|
||||
"github.com/status-im/status-go/services/shhext/dedup"
|
||||
"github.com/status-im/status-go/services/shhext/whisperutils"
|
||||
"github.com/status-im/status-go/signal"
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
"golang.org/x/crypto/sha3"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
tickerInterval = 120
|
||||
// How often we should publish a contact code in seconds
|
||||
publishInterval = 21600
|
||||
maxInstallations = 3
|
||||
)
|
||||
|
||||
var (
|
||||
errProtocolNotInitialized = errors.New("protocol is not initialized")
|
||||
// ErrPFSNotEnabled is returned when an endpoint PFS only is called but
|
||||
// PFS is disabled
|
||||
ErrPFSNotEnabled = errors.New("pfs not enabled")
|
||||
errNoKeySelected = errors.New("no key selected")
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
whisper *whisper.Whisper
|
||||
online func() bool
|
||||
whisperAPI *whisper.PublicWhisperAPI
|
||||
protocol *chat.ProtocolService
|
||||
persistence Persistence
|
||||
log log.Logger
|
||||
filter *filter.Service
|
||||
config *Config
|
||||
quit chan struct{}
|
||||
ticker *time.Ticker
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
PfsEnabled bool
|
||||
DataDir string
|
||||
InstallationID string
|
||||
}
|
||||
|
||||
func New(config *Config, w *whisper.Whisper) *Service {
|
||||
return &Service{
|
||||
config: config,
|
||||
whisper: w,
|
||||
whisperAPI: whisper.NewPublicWhisperAPI(w),
|
||||
log: log.New("package", "status-go/services/publisher.Service"),
|
||||
}
|
||||
}
|
||||
|
||||
// InitProtocolWithPassword creates an instance of ProtocolService given an address and password used to generate an encryption key.
|
||||
func (s *Service) InitProtocolWithPassword(address string, password string) error {
|
||||
digest := sha3.Sum256([]byte(password))
|
||||
encKey := fmt.Sprintf("%x", digest)
|
||||
return s.initProtocol(address, encKey, password)
|
||||
}
|
||||
|
||||
// InitProtocolWithEncyptionKey creates an instance of ProtocolService given an address and encryption key.
|
||||
func (s *Service) InitProtocolWithEncyptionKey(address string, encKey string) error {
|
||||
return s.initProtocol(address, encKey, "")
|
||||
}
|
||||
|
||||
func (s *Service) initProtocol(address, encKey, password string) error {
|
||||
if !s.config.PfsEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(filepath.Clean(s.config.DataDir), os.ModePerm); err != nil {
|
||||
return err
|
||||
}
|
||||
v0Path := filepath.Join(s.config.DataDir, fmt.Sprintf("%x.db", address))
|
||||
v1Path := filepath.Join(s.config.DataDir, fmt.Sprintf("%s.db", s.config.InstallationID))
|
||||
v2Path := filepath.Join(s.config.DataDir, fmt.Sprintf("%s.v2.db", s.config.InstallationID))
|
||||
v3Path := filepath.Join(s.config.DataDir, fmt.Sprintf("%s.v3.db", s.config.InstallationID))
|
||||
v4Path := filepath.Join(s.config.DataDir, fmt.Sprintf("%s.v4.db", s.config.InstallationID))
|
||||
|
||||
if password != "" {
|
||||
if err := chatDB.MigrateDBFile(v0Path, v1Path, "ON", password); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := chatDB.MigrateDBFile(v1Path, v2Path, password, encKey); err != nil {
|
||||
// Remove db file as created with a blank password and never used,
|
||||
// and there's no need to rekey in this case
|
||||
os.Remove(v1Path)
|
||||
os.Remove(v2Path)
|
||||
}
|
||||
}
|
||||
|
||||
if err := chatDB.MigrateDBKeyKdfIterations(v2Path, v3Path, encKey); err != nil {
|
||||
os.Remove(v2Path)
|
||||
os.Remove(v3Path)
|
||||
}
|
||||
|
||||
// Fix IOS not encrypting database
|
||||
if err := chatDB.EncryptDatabase(v3Path, v4Path, encKey); err != nil {
|
||||
os.Remove(v3Path)
|
||||
os.Remove(v4Path)
|
||||
}
|
||||
|
||||
// Desktop was passing a network dependent directory, which meant that
|
||||
// if running on testnet it would not access the right db. This copies
|
||||
// the db from mainnet to the root location.
|
||||
networkDependentPath := filepath.Join(s.config.DataDir, "ethereum", "mainnet_rpc", fmt.Sprintf("%s.v4.db", s.config.InstallationID))
|
||||
if _, err := os.Stat(networkDependentPath); err == nil {
|
||||
if err := os.Rename(networkDependentPath, v4Path); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
persistence, err := chat.NewSQLLitePersistence(v4Path, encKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addedBundlesHandler := func(addedBundles []*multidevice.Installation) {
|
||||
handler := SignalHandler{}
|
||||
for _, bundle := range addedBundles {
|
||||
handler.BundleAdded(bundle.Identity, bundle.ID)
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize persistence
|
||||
s.persistence = NewSQLLitePersistence(persistence.DB)
|
||||
|
||||
// Initialize sharedsecret
|
||||
sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage())
|
||||
// Initialize filter
|
||||
filterService := filter.New(s.whisper, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService)
|
||||
s.filter = filterService
|
||||
|
||||
// Initialize multidevice
|
||||
multideviceConfig := &multidevice.Config{
|
||||
InstallationID: s.config.InstallationID,
|
||||
ProtocolVersion: chat.ProtocolVersion,
|
||||
MaxInstallations: maxInstallations,
|
||||
}
|
||||
multideviceService := multidevice.New(multideviceConfig, persistence.GetMultideviceStorage())
|
||||
|
||||
s.protocol = chat.NewProtocolService(
|
||||
chat.NewEncryptionService(
|
||||
persistence,
|
||||
chat.DefaultEncryptionServiceConfig(s.config.InstallationID)),
|
||||
sharedSecretService,
|
||||
multideviceService,
|
||||
addedBundlesHandler,
|
||||
s.onNewSharedSecretHandler)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) ProcessPublicBundle(myIdentityKey *ecdsa.PrivateKey, bundle *protobuf.Bundle) ([]*multidevice.Installation, error) {
|
||||
if s.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
return s.protocol.ProcessPublicBundle(myIdentityKey, bundle)
|
||||
}
|
||||
|
||||
func (s *Service) GetBundle(myIdentityKey *ecdsa.PrivateKey) (*protobuf.Bundle, error) {
|
||||
if s.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
return s.protocol.GetBundle(myIdentityKey)
|
||||
}
|
||||
|
||||
// EnableInstallation enables an installation for multi-device sync.
|
||||
func (s *Service) EnableInstallation(installationID string) error {
|
||||
if s.protocol == nil {
|
||||
return errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := s.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.protocol.EnableInstallation(&privateKey.PublicKey, installationID)
|
||||
}
|
||||
|
||||
// DisableInstallation disables an installation for multi-device sync.
|
||||
func (s *Service) DisableInstallation(installationID string) error {
|
||||
if s.protocol == nil {
|
||||
return errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := s.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.protocol.DisableInstallation(&privateKey.PublicKey, installationID)
|
||||
}
|
||||
|
||||
// GetOurInstallations returns all the installations available given an identity
|
||||
func (s *Service) GetOurInstallations() ([]*multidevice.Installation, error) {
|
||||
if s.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := s.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return nil, errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.protocol.GetOurInstallations(&privateKey.PublicKey)
|
||||
}
|
||||
|
||||
// SetInstallationMetadata sets the metadata for our own installation
|
||||
func (s *Service) SetInstallationMetadata(installationID string, data *multidevice.InstallationMetadata) error {
|
||||
if s.protocol == nil {
|
||||
return errProtocolNotInitialized
|
||||
}
|
||||
|
||||
privateKeyID := s.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return s.protocol.SetInstallationMetadata(&privateKey.PublicKey, installationID, data)
|
||||
}
|
||||
|
||||
func (s *Service) GetPublicBundle(identityKey *ecdsa.PublicKey) (*protobuf.Bundle, error) {
|
||||
if s.protocol == nil {
|
||||
return nil, errProtocolNotInitialized
|
||||
}
|
||||
|
||||
return s.protocol.GetPublicBundle(identityKey)
|
||||
}
|
||||
|
||||
func (s *Service) Start(online func() bool, startTicker bool) error {
|
||||
s.online = online
|
||||
if startTicker {
|
||||
s.startTicker()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Stop() error {
|
||||
if s.filter != nil {
|
||||
if err := s.filter.Stop(); err != nil {
|
||||
log.Error("Failed to stop filter service with error", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) getNegotiatedChat(identity *ecdsa.PublicKey) *filter.Chat {
|
||||
return s.filter.GetNegotiated(identity)
|
||||
}
|
||||
|
||||
func (s *Service) LoadFilters(chats []*filter.Chat) ([]*filter.Chat, error) {
|
||||
return s.filter.Init(chats)
|
||||
}
|
||||
|
||||
func (s *Service) LoadFilter(chat *filter.Chat) ([]*filter.Chat, error) {
|
||||
return s.filter.Load(chat)
|
||||
}
|
||||
|
||||
func (s *Service) RemoveFilters(chats []*filter.Chat) error {
|
||||
return s.filter.Remove(chats)
|
||||
}
|
||||
|
||||
func (s *Service) onNewSharedSecretHandler(sharedSecrets []*sharedsecret.Secret) {
|
||||
var filters []*signal.Filter
|
||||
for _, sharedSecret := range sharedSecrets {
|
||||
chat, err := s.filter.ProcessNegotiatedSecret(sharedSecret)
|
||||
if err != nil {
|
||||
log.Error("Failed to process negotiated secret", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
filter := &signal.Filter{
|
||||
ChatID: chat.ChatID,
|
||||
SymKeyID: chat.SymKeyID,
|
||||
Listen: chat.Listen,
|
||||
FilterID: chat.FilterID,
|
||||
Identity: chat.Identity,
|
||||
Topic: chat.Topic,
|
||||
}
|
||||
|
||||
filters = append(filters, filter)
|
||||
|
||||
}
|
||||
if len(filters) != 0 {
|
||||
handler := SignalHandler{}
|
||||
handler.WhisperFilterAdded(filters)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func (s *Service) ProcessMessage(dedupMessage dedup.DeduplicateMessage) error {
|
||||
if !s.config.PfsEnabled {
|
||||
return nil
|
||||
}
|
||||
msg := dedupMessage.Message
|
||||
|
||||
privateKeyID := s.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
publicKey, err := crypto.UnmarshalPubkey(msg.Sig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Unmarshal message
|
||||
protocolMessage := &protobuf.ProtocolMessage{}
|
||||
|
||||
if err := proto.Unmarshal(msg.Payload, protocolMessage); err != nil {
|
||||
s.log.Debug("Not a protocol message", "err", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
response, err := s.protocol.HandleMessage(privateKey, publicKey, protocolMessage, dedupMessage.DedupID)
|
||||
|
||||
switch err {
|
||||
case nil:
|
||||
// Set the decrypted payload
|
||||
msg.Payload = response
|
||||
case chat.ErrDeviceNotFound:
|
||||
// Notify that someone tried to contact us using an invalid bundle
|
||||
if privateKey.PublicKey != *publicKey {
|
||||
s.log.Warn("Device not found, sending signal", "err", err)
|
||||
keyString := fmt.Sprintf("0x%x", crypto.FromECDSAPub(publicKey))
|
||||
handler := SignalHandler{}
|
||||
handler.DecryptMessageFailed(keyString)
|
||||
}
|
||||
default:
|
||||
// Log and pass to the client, even if failed to decrypt
|
||||
s.log.Error("Failed handling message with error", "err", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateDirectMessage creates a 1:1 chat message
|
||||
func (s *Service) CreateDirectMessage(signature string, destination hexutil.Bytes, DH bool, payload []byte) (*whisper.NewMessage, error) {
|
||||
if !s.config.PfsEnabled {
|
||||
return nil, ErrPFSNotEnabled
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(signature)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
publicKey, err := crypto.UnmarshalPubkey(destination)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var msgSpec *chat.ProtocolMessageSpec
|
||||
|
||||
if DH {
|
||||
s.log.Debug("Building dh message")
|
||||
msgSpec, err = s.protocol.BuildDHMessage(privateKey, publicKey, payload)
|
||||
} else {
|
||||
s.log.Debug("Building direct message")
|
||||
msgSpec, err = s.protocol.BuildDirectMessage(privateKey, publicKey, payload)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
whisperMessage, err := s.directMessageToWhisper(privateKey, publicKey, destination, signature, msgSpec)
|
||||
if err != nil {
|
||||
s.log.Error("sshext-service", "error building whisper message", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return whisperMessage, nil
|
||||
}
|
||||
|
||||
func (s *Service) directMessageToWhisper(myPrivateKey *ecdsa.PrivateKey, theirPublicKey *ecdsa.PublicKey, destination hexutil.Bytes, signature string, spec *chat.ProtocolMessageSpec) (*whisper.NewMessage, error) {
|
||||
// marshal for sending to wire
|
||||
marshaledMessage, err := proto.Marshal(spec.Message)
|
||||
if err != nil {
|
||||
s.log.Error("encryption-service", "error marshaling message", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
whisperMessage := whisperutils.DefaultWhisperMessage()
|
||||
whisperMessage.Payload = marshaledMessage
|
||||
whisperMessage.Sig = signature
|
||||
|
||||
if spec.SharedSecret != nil {
|
||||
chat := s.getNegotiatedChat(theirPublicKey)
|
||||
if chat != nil {
|
||||
s.log.Debug("Sending on negotiated topic", "public-key", destination)
|
||||
whisperMessage.SymKeyID = chat.SymKeyID
|
||||
whisperMessage.Topic = chat.Topic
|
||||
whisperMessage.PublicKey = nil
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
} else if spec.PartitionedTopic() == chat.PartitionTopicV1 {
|
||||
s.log.Debug("Sending on partitioned topic", "public-key", destination)
|
||||
// Create filter on demand
|
||||
if _, err := s.filter.LoadPartitioned(myPrivateKey, theirPublicKey, false); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t := filter.PublicKeyToPartitionedTopicBytes(theirPublicKey)
|
||||
whisperMessage.Topic = whisper.BytesToTopic(t)
|
||||
whisperMessage.PublicKey = destination
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
|
||||
s.log.Debug("Sending on old discovery topic", "public-key", destination)
|
||||
whisperMessage.Topic = whisperutils.DiscoveryTopicBytes
|
||||
whisperMessage.PublicKey = destination
|
||||
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
|
||||
// CreatePublicMessage sends a public chat message to the underlying transport
|
||||
func (s *Service) CreatePublicMessage(signature string, chatID string, payload []byte, wrap bool) (*whisper.NewMessage, error) {
|
||||
if !s.config.PfsEnabled {
|
||||
return nil, ErrPFSNotEnabled
|
||||
}
|
||||
|
||||
filter := s.filter.GetByID(chatID)
|
||||
if filter == nil {
|
||||
return nil, errors.New("not subscribed to chat")
|
||||
}
|
||||
s.log.Info("SIG", signature)
|
||||
|
||||
// Enrich with transport layer info
|
||||
whisperMessage := whisperutils.DefaultWhisperMessage()
|
||||
whisperMessage.Sig = signature
|
||||
whisperMessage.Topic = whisperutils.ToTopic(chatID)
|
||||
whisperMessage.SymKeyID = filter.SymKeyID
|
||||
|
||||
if wrap {
|
||||
privateKeyID := s.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return nil, errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
message, err := s.protocol.BuildPublicMessage(privateKey, payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
marshaledMessage, err := proto.Marshal(message)
|
||||
if err != nil {
|
||||
s.log.Error("encryption-service", "error marshaling message", err)
|
||||
return nil, err
|
||||
}
|
||||
whisperMessage.Payload = marshaledMessage
|
||||
|
||||
} else {
|
||||
whisperMessage.Payload = payload
|
||||
}
|
||||
|
||||
return &whisperMessage, nil
|
||||
}
|
||||
|
||||
func (s *Service) ConfirmMessagesProcessed(ids [][]byte) error {
|
||||
return s.protocol.ConfirmMessagesProcessed(ids)
|
||||
}
|
||||
|
||||
func (s *Service) startTicker() {
|
||||
s.ticker = time.NewTicker(tickerInterval * time.Second)
|
||||
s.quit = make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ticker.C:
|
||||
_, err := s.sendContactCode()
|
||||
if err != nil {
|
||||
s.log.Error("could not execute tick", "err", err)
|
||||
}
|
||||
case <-s.quit:
|
||||
s.ticker.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Service) sendContactCode() (*whisper.NewMessage, error) {
|
||||
s.log.Info("publishing bundle")
|
||||
if !s.config.PfsEnabled {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lastPublished, err := s.persistence.Get()
|
||||
if err != nil {
|
||||
s.log.Error("could not fetch config from db", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
now := time.Now().Unix()
|
||||
|
||||
if now-lastPublished < publishInterval {
|
||||
fmt.Println("NOTHING")
|
||||
s.log.Debug("nothing to do")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !s.online() {
|
||||
s.log.Debug("not connected")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
privateKeyID := s.whisper.SelectedKeyPairID()
|
||||
if privateKeyID == "" {
|
||||
return nil, errNoKeySelected
|
||||
}
|
||||
|
||||
privateKey, err := s.whisper.GetPrivateKey(privateKeyID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
identity := fmt.Sprintf("%x", crypto.FromECDSAPub(&privateKey.PublicKey))
|
||||
|
||||
message, err := s.CreatePublicMessage("0x"+identity, filter.ContactCodeTopic(identity), nil, true)
|
||||
if err != nil {
|
||||
s.log.Error("could not build contact code", "identity", identity, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = s.whisperAPI.Post(context.TODO(), *message)
|
||||
if err != nil {
|
||||
s.log.Error("could not publish contact code on whisper", "identity", identity, "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = s.persistence.Set(now)
|
||||
if err != nil {
|
||||
s.log.Error("could not set last published", "err", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return message, nil
|
||||
}
|
|
@ -1,20 +0,0 @@
|
|||
package publisher
|
||||
|
||||
import (
|
||||
"github.com/status-im/status-go/signal"
|
||||
)
|
||||
|
||||
// SignalHandler sends signals on protocol events
|
||||
type SignalHandler struct{}
|
||||
|
||||
func (h SignalHandler) DecryptMessageFailed(pubKey string) {
|
||||
signal.SendDecryptMessageFailed(pubKey)
|
||||
}
|
||||
|
||||
func (h SignalHandler) BundleAdded(identity string, installationID string) {
|
||||
signal.SendBundleAdded(identity, installationID)
|
||||
}
|
||||
|
||||
func (h SignalHandler) WhisperFilterAdded(filters []*signal.Filter) {
|
||||
signal.SendWhisperFilterAdded(filters)
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/ethereum/go-ethereum/crypto"
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/rlp"
|
||||
|
@ -420,9 +421,23 @@ func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]dedup.Deduplicate
|
|||
|
||||
// Attempt to decrypt message, otherwise leave unchanged
|
||||
for _, dedupMessage := range dedupMessages {
|
||||
err := api.service.ProcessMessage(dedupMessage.Message, dedupMessage.DedupID)
|
||||
switch err {
|
||||
case chat.ErrNotPairedDevice:
|
||||
api.log.Info("Received a message from non-paired device", "err", err)
|
||||
case chat.ErrDeviceNotFound:
|
||||
api.log.Warn("Device not found, sending signal", "err", err)
|
||||
|
||||
if err := api.service.ProcessMessage(dedupMessage); err != nil {
|
||||
return nil, err
|
||||
publicKey, err := crypto.UnmarshalPubkey(dedupMessage.Message.Sig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to handler chat.ErrDeviceNotFound: %v", err)
|
||||
}
|
||||
|
||||
keyString := fmt.Sprintf("%#x", crypto.FromECDSAPub(publicKey))
|
||||
handler := PublisherSignalHandler{}
|
||||
handler.DecryptMessageFailed(keyString)
|
||||
default:
|
||||
api.log.Error("Failed handling message with error", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"context"
|
||||
"crypto/ecdsa"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
|
@ -12,14 +14,22 @@ import (
|
|||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
"github.com/ethereum/go-ethereum/rpc"
|
||||
|
||||
"github.com/status-im/status-go/db"
|
||||
"github.com/status-im/status-go/messaging/chat"
|
||||
chatDB "github.com/status-im/status-go/messaging/chat/db"
|
||||
"github.com/status-im/status-go/messaging/chat/multidevice"
|
||||
"github.com/status-im/status-go/messaging/chat/sharedsecret"
|
||||
"github.com/status-im/status-go/messaging/filter"
|
||||
"github.com/status-im/status-go/messaging/publisher"
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/services/shhext/dedup"
|
||||
"github.com/status-im/status-go/services/shhext/mailservers"
|
||||
"github.com/status-im/status-go/signal"
|
||||
|
||||
whisper "github.com/status-im/whisper/whisperv6"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"golang.org/x/crypto/sha3"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -27,6 +37,8 @@ const (
|
|||
defaultConnectionsTarget = 1
|
||||
// defaultTimeoutWaitAdded is a timeout to use to establish initial connections.
|
||||
defaultTimeoutWaitAdded = 5 * time.Second
|
||||
// maxInstallations is a maximum number of supported devices for one account.
|
||||
maxInstallations = 3
|
||||
)
|
||||
|
||||
// EnvelopeEventsHandler used for two different event types.
|
||||
|
@ -39,7 +51,7 @@ type EnvelopeEventsHandler interface {
|
|||
|
||||
// Service is a service that provides some additional Whisper API.
|
||||
type Service struct {
|
||||
*publisher.Service
|
||||
*publisher.Publisher
|
||||
storage db.TransactionalStorage
|
||||
w *whisper.Whisper
|
||||
config params.ShhextConfig
|
||||
|
@ -77,14 +89,9 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, con
|
|||
requestsRegistry: requestsRegistry,
|
||||
}
|
||||
envelopesMonitor := NewEnvelopesMonitor(w, handler, config.MailServerConfirmations, ps, config.MaxMessageDeliveryAttempts)
|
||||
publisherConfig := &publisher.Config{
|
||||
PfsEnabled: config.PFSEnabled,
|
||||
DataDir: config.BackupDisabledDataDir,
|
||||
InstallationID: config.InstallationID,
|
||||
}
|
||||
publisherService := publisher.New(publisherConfig, w)
|
||||
publisher := publisher.New(w, publisher.Config{PFSEnabled: config.PFSEnabled})
|
||||
return &Service{
|
||||
Service: publisherService,
|
||||
Publisher: publisher,
|
||||
storage: db.NewLevelDBStorage(ldb),
|
||||
w: w,
|
||||
config: config,
|
||||
|
@ -98,6 +105,137 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, ldb *leveldb.DB, con
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Service) InitProtocolWithPassword(address string, password string) error {
|
||||
digest := sha3.Sum256([]byte(password))
|
||||
encKey := fmt.Sprintf("%x", digest)
|
||||
return s.initProtocol(address, encKey, password)
|
||||
}
|
||||
|
||||
// InitProtocolWithEncyptionKey creates an instance of ProtocolService given an address and encryption key.
|
||||
func (s *Service) InitProtocolWithEncyptionKey(address string, encKey string) error {
|
||||
return s.initProtocol(address, encKey, "")
|
||||
}
|
||||
|
||||
func (s *Service) initProtocol(address, encKey, password string) error {
|
||||
if !s.config.PFSEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
dataDir := filepath.Clean(s.config.BackupDisabledDataDir)
|
||||
|
||||
if err := os.MkdirAll(dataDir, os.ModePerm); err != nil {
|
||||
return err
|
||||
}
|
||||
v0Path := filepath.Join(dataDir, fmt.Sprintf("%x.db", address))
|
||||
v1Path := filepath.Join(dataDir, fmt.Sprintf("%s.db", s.config.InstallationID))
|
||||
v2Path := filepath.Join(dataDir, fmt.Sprintf("%s.v2.db", s.config.InstallationID))
|
||||
v3Path := filepath.Join(dataDir, fmt.Sprintf("%s.v3.db", s.config.InstallationID))
|
||||
v4Path := filepath.Join(dataDir, fmt.Sprintf("%s.v4.db", s.config.InstallationID))
|
||||
|
||||
if password != "" {
|
||||
if err := chatDB.MigrateDBFile(v0Path, v1Path, "ON", password); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := chatDB.MigrateDBFile(v1Path, v2Path, password, encKey); err != nil {
|
||||
// Remove db file as created with a blank password and never used,
|
||||
// and there's no need to rekey in this case
|
||||
os.Remove(v1Path)
|
||||
os.Remove(v2Path)
|
||||
}
|
||||
}
|
||||
|
||||
if err := chatDB.MigrateDBKeyKdfIterations(v2Path, v3Path, encKey); err != nil {
|
||||
os.Remove(v2Path)
|
||||
os.Remove(v3Path)
|
||||
}
|
||||
|
||||
// Fix IOS not encrypting database
|
||||
if err := chatDB.EncryptDatabase(v3Path, v4Path, encKey); err != nil {
|
||||
os.Remove(v3Path)
|
||||
os.Remove(v4Path)
|
||||
}
|
||||
|
||||
// Desktop was passing a network dependent directory, which meant that
|
||||
// if running on testnet it would not access the right db. This copies
|
||||
// the db from mainnet to the root location.
|
||||
networkDependentPath := filepath.Join(dataDir, "ethereum", "mainnet_rpc", fmt.Sprintf("%s.v4.db", s.config.InstallationID))
|
||||
if _, err := os.Stat(networkDependentPath); err == nil {
|
||||
if err := os.Rename(networkDependentPath, v4Path); err != nil {
|
||||
return err
|
||||
}
|
||||
} else if !os.IsNotExist(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
persistence, err := chat.NewSQLLitePersistence(v4Path, encKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize sharedsecret
|
||||
sharedSecretService := sharedsecret.NewService(persistence.GetSharedSecretStorage())
|
||||
|
||||
// Initialize filter
|
||||
filterService := filter.New(s.w, filter.NewSQLLitePersistence(persistence.DB), sharedSecretService)
|
||||
|
||||
// Initialize multidevice
|
||||
multideviceConfig := &multidevice.Config{
|
||||
InstallationID: s.config.InstallationID,
|
||||
ProtocolVersion: chat.ProtocolVersion,
|
||||
MaxInstallations: maxInstallations,
|
||||
}
|
||||
multideviceService := multidevice.New(multideviceConfig, persistence.GetMultideviceStorage())
|
||||
|
||||
addedBundlesHandler := func(addedBundles []*multidevice.Installation) {
|
||||
handler := PublisherSignalHandler{}
|
||||
for _, bundle := range addedBundles {
|
||||
handler.BundleAdded(bundle.Identity, bundle.ID)
|
||||
}
|
||||
}
|
||||
|
||||
protocolService := chat.NewProtocolService(
|
||||
chat.NewEncryptionService(
|
||||
persistence,
|
||||
chat.DefaultEncryptionServiceConfig(s.config.InstallationID)),
|
||||
sharedSecretService,
|
||||
multideviceService,
|
||||
addedBundlesHandler,
|
||||
s.newSharedSecretHandler(filterService))
|
||||
|
||||
s.Publisher.Init(persistence.DB, protocolService, filterService)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) newSharedSecretHandler(filterService *filter.Service) func([]*sharedsecret.Secret) {
|
||||
return func(sharedSecrets []*sharedsecret.Secret) {
|
||||
var filters []*signal.Filter
|
||||
for _, sharedSecret := range sharedSecrets {
|
||||
chat, err := filterService.ProcessNegotiatedSecret(sharedSecret)
|
||||
if err != nil {
|
||||
log.Error("Failed to process negotiated secret", "err", err)
|
||||
return
|
||||
}
|
||||
|
||||
filter := &signal.Filter{
|
||||
ChatID: chat.ChatID,
|
||||
SymKeyID: chat.SymKeyID,
|
||||
Listen: chat.Listen,
|
||||
FilterID: chat.FilterID,
|
||||
Identity: chat.Identity,
|
||||
Topic: chat.Topic,
|
||||
}
|
||||
|
||||
filters = append(filters, filter)
|
||||
}
|
||||
if len(filters) != 0 {
|
||||
handler := PublisherSignalHandler{}
|
||||
handler.WhisperFilterAdded(filters)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateMailservers updates information about selected mail servers.
|
||||
func (s *Service) UpdateMailservers(nodes []*enode.Node) error {
|
||||
if err := s.peerStore.Update(nodes); err != nil {
|
||||
|
@ -154,7 +292,7 @@ func (s *Service) Start(server *p2p.Server) error {
|
|||
s.mailMonitor.Start()
|
||||
s.nodeID = server.PrivateKey
|
||||
s.server = server
|
||||
return s.Service.Start(s.online, true)
|
||||
return s.Publisher.Start(s.online, true)
|
||||
}
|
||||
|
||||
func (s *Service) online() bool {
|
||||
|
@ -179,7 +317,7 @@ func (s *Service) Stop() error {
|
|||
}
|
||||
}
|
||||
|
||||
return s.Service.Stop()
|
||||
return s.Publisher.Stop()
|
||||
}
|
||||
|
||||
func (s *Service) syncMessages(ctx context.Context, mailServerID []byte, r whisper.SyncMailRequest) (resp whisper.SyncEventResponse, err error) {
|
||||
|
|
|
@ -27,3 +27,18 @@ func (h EnvelopeSignalHandler) MailServerRequestCompleted(requestID common.Hash,
|
|||
func (h EnvelopeSignalHandler) MailServerRequestExpired(hash common.Hash) {
|
||||
signal.SendMailServerRequestExpired(hash)
|
||||
}
|
||||
|
||||
// PublisherSignalHandler sends signals on protocol events
|
||||
type PublisherSignalHandler struct{}
|
||||
|
||||
func (h PublisherSignalHandler) DecryptMessageFailed(pubKey string) {
|
||||
signal.SendDecryptMessageFailed(pubKey)
|
||||
}
|
||||
|
||||
func (h PublisherSignalHandler) BundleAdded(identity string, installationID string) {
|
||||
signal.SendBundleAdded(identity, installationID)
|
||||
}
|
||||
|
||||
func (h PublisherSignalHandler) WhisperFilterAdded(filters []*signal.Filter) {
|
||||
signal.SendWhisperFilterAdded(filters)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue