status-go/protocol/transport/whisper/whisper_service.go

441 lines
11 KiB
Go
Raw Normal View History

2019-07-17 22:25:42 +00:00
package whisper
import (
"bytes"
2019-07-17 22:25:42 +00:00
"context"
"crypto/ecdsa"
2019-07-30 06:14:13 +00:00
"database/sql"
2019-07-17 22:25:42 +00:00
"sync"
"time"
2019-07-17 22:25:42 +00:00
"github.com/pkg/errors"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/transport"
2019-07-17 22:25:42 +00:00
)
var (
// ErrNoMailservers returned if there is no configured mailservers that can be used.
ErrNoMailservers = errors.New("no configured mailservers")
)
type whisperServiceKeysManager struct {
shh types.Whisper
2019-07-17 22:25:42 +00:00
// Identity of the current user.
privateKey *ecdsa.PrivateKey
passToSymKeyMutex sync.RWMutex
passToSymKeyCache map[string]string
}
func (m *whisperServiceKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) {
// caching is handled in Whisper
return m.shh.AddKeyPair(priv)
}
func (m *whisperServiceKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) {
m.passToSymKeyMutex.Lock()
defer m.passToSymKeyMutex.Unlock()
if val, ok := m.passToSymKeyCache[password]; ok {
return val, nil
}
id, err := m.shh.AddSymKeyFromPassword(password)
if err != nil {
return id, err
}
m.passToSymKeyCache[password] = id
return id, nil
}
func (m *whisperServiceKeysManager) RawSymKey(id string) ([]byte, error) {
return m.shh.GetSymKey(id)
}
type Option func(*Transport) error
2019-08-29 06:33:46 +00:00
// Transport is a transport based on Whisper service.
type Transport struct {
shh types.Whisper
shhAPI types.PublicWhisperAPI // only PublicWhisperAPI implements logic to send messages
2019-07-17 22:25:42 +00:00
keysManager *whisperServiceKeysManager
filters *transport.FiltersManager
2019-07-17 22:25:42 +00:00
logger *zap.Logger
mailservers []string
envelopesMonitor *EnvelopesMonitor
2019-07-17 22:25:42 +00:00
}
// NewTransport returns a new Transport.
2019-10-14 14:10:48 +00:00
// TODO: leaving a chat should verify that for a given public key
// there are no other chats. It may happen that we leave a private chat
// but still have a public chat for a given public key.
func NewTransport(
shh types.Whisper,
2019-07-17 22:25:42 +00:00
privateKey *ecdsa.PrivateKey,
2019-07-30 06:14:13 +00:00
db *sql.DB,
2019-07-17 22:25:42 +00:00
mailservers []string,
envelopesMonitorConfig *transport.EnvelopesMonitorConfig,
2019-07-17 22:25:42 +00:00
logger *zap.Logger,
2019-08-29 06:33:46 +00:00
opts ...Option,
) (*Transport, error) {
filtersManager, err := transport.NewFiltersManager(newSQLitePersistence(db), shh, privateKey, logger)
2019-07-17 22:25:42 +00:00
if err != nil {
return nil, err
}
var envelopesMonitor *EnvelopesMonitor
if envelopesMonitorConfig != nil {
envelopesMonitor = NewEnvelopesMonitor(shh, *envelopesMonitorConfig)
envelopesMonitor.Start()
}
2019-08-29 06:33:46 +00:00
var shhAPI types.PublicWhisperAPI
if shh != nil {
shhAPI = shh.PublicWhisperAPI()
}
t := &Transport{
shh: shh,
shhAPI: shhAPI,
envelopesMonitor: envelopesMonitor,
2019-07-17 22:25:42 +00:00
keysManager: &whisperServiceKeysManager{
shh: shh,
privateKey: privateKey,
passToSymKeyCache: make(map[string]string),
},
2019-08-29 06:33:46 +00:00
filters: filtersManager,
2019-07-17 22:25:42 +00:00
mailservers: mailservers,
logger: logger.With(zap.Namespace("Transport")),
2019-08-29 06:33:46 +00:00
}
for _, opt := range opts {
if err := opt(t); err != nil {
return nil, err
}
}
return t, nil
}
func (a *Transport) InitFilters(chatIDs []string, publicKeys []*ecdsa.PublicKey) ([]*transport.Filter, error) {
return a.filters.Init(chatIDs, publicKeys)
2019-08-29 06:33:46 +00:00
}
func (a *Transport) Filters() []*transport.Filter {
2019-08-29 06:33:46 +00:00
return a.filters.Filters()
2019-07-17 22:25:42 +00:00
}
// DEPRECATED
func (a *Transport) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return a.filters.InitWithFilters(filters)
2019-07-17 22:25:42 +00:00
}
// DEPRECATED
func (a *Transport) RemoveFilters(filters []*transport.Filter) error {
2019-08-29 06:33:46 +00:00
return a.filters.Remove(filters...)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) ResetFilters() error {
2019-08-29 06:33:46 +00:00
return a.filters.Reset()
2019-07-17 22:25:42 +00:00
}
func (a *Transport) ProcessNegotiatedSecret(secret types.NegotiatedSecret) (*transport.Filter, error) {
2019-08-29 06:33:46 +00:00
filter, err := a.filters.LoadNegotiated(secret)
if err != nil {
return nil, err
}
return filter, nil
2019-07-17 22:25:42 +00:00
}
func (a *Transport) JoinPublic(chatID string) error {
2019-08-29 06:33:46 +00:00
_, err := a.filters.LoadPublic(chatID)
2019-07-17 22:25:42 +00:00
return err
}
func (a *Transport) LeavePublic(chatID string) error {
2019-08-29 06:33:46 +00:00
chat := a.filters.Filter(chatID)
2019-07-17 22:25:42 +00:00
if chat != nil {
return nil
}
2019-08-29 06:33:46 +00:00
return a.filters.Remove(chat)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) JoinPrivate(publicKey *ecdsa.PublicKey) error {
2019-09-02 09:29:06 +00:00
_, err := a.filters.LoadDiscovery()
if err != nil {
return err
}
_, err = a.filters.LoadContactCode(publicKey)
2019-07-17 22:25:42 +00:00
return err
}
func (a *Transport) LeavePrivate(publicKey *ecdsa.PublicKey) error {
2019-08-29 06:33:46 +00:00
filters := a.filters.FiltersByPublicKey(publicKey)
return a.filters.Remove(filters...)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) JoinGroup(publicKeys []*ecdsa.PublicKey) error {
2019-10-14 14:10:48 +00:00
_, err := a.filters.LoadDiscovery()
if err != nil {
return err
}
for _, pk := range publicKeys {
_, err = a.filters.LoadContactCode(pk)
if err != nil {
return err
}
}
return nil
}
func (a *Transport) LeaveGroup(publicKeys []*ecdsa.PublicKey) error {
2019-10-14 14:10:48 +00:00
for _, publicKey := range publicKeys {
filters := a.filters.FiltersByPublicKey(publicKey)
if err := a.filters.Remove(filters...); err != nil {
return err
}
}
return nil
}
type Message struct {
Message *types.Message
Public bool
}
func (a *Transport) RetrieveAllMessages() ([]Message, error) {
var messages []Message
2019-08-29 06:33:46 +00:00
for _, filter := range a.filters.Filters() {
filterMsgs, err := a.shhAPI.GetFilterMessages(filter.FilterID)
if err != nil {
return nil, err
}
for _, m := range filterMsgs {
messages = append(messages, Message{
Message: m,
Public: filter.IsPublic(),
})
}
}
return messages, nil
}
func (a *Transport) RetrievePublicMessages(chatID string) ([]*types.Message, error) {
2019-08-29 06:33:46 +00:00
filter, err := a.filters.LoadPublic(chatID)
2019-07-17 22:25:42 +00:00
if err != nil {
return nil, err
}
return a.shhAPI.GetFilterMessages(filter.FilterID)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*types.Message, error) {
2019-08-29 06:33:46 +00:00
chats := a.filters.FiltersByPublicKey(publicKey)
discoveryChats, err := a.filters.Init(nil, nil)
2019-07-17 22:25:42 +00:00
if err != nil {
return nil, err
}
var result []*types.Message
2019-07-17 22:25:42 +00:00
for _, chat := range append(chats, discoveryChats...) {
filterMsgs, err := a.shhAPI.GetFilterMessages(chat.FilterID)
if err != nil {
return nil, err
2019-07-17 22:25:42 +00:00
}
result = append(result, filterMsgs...)
2019-07-17 22:25:42 +00:00
}
return result, nil
}
func (a *Transport) RetrieveRawAll() (map[transport.Filter][]*types.Message, error) {
result := make(map[transport.Filter][]*types.Message)
2019-07-17 22:25:42 +00:00
allFilters := a.filters.Filters()
for _, filter := range allFilters {
msgs, err := a.shhAPI.GetFilterMessages(filter.FilterID)
if err != nil {
continue
}
result[*filter] = append(result[*filter], msgs...)
}
return result, nil
2019-07-17 22:25:42 +00:00
}
// SendPublic sends a new message using the Whisper service.
// For public filters, chat name is used as an ID as well as
2019-07-17 22:25:42 +00:00
// a topic.
func (a *Transport) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
2019-07-17 22:25:42 +00:00
return nil, err
}
2019-08-29 06:33:46 +00:00
filter, err := a.filters.LoadPublic(chatName)
2019-07-17 22:25:42 +00:00
if err != nil {
return nil, err
}
2019-08-29 06:33:46 +00:00
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
2019-07-17 22:25:42 +00:00
return a.shhAPI.Post(ctx, *newMessage)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
2019-07-17 22:25:42 +00:00
return nil, err
}
filter, err := a.filters.LoadNegotiated(types.NegotiatedSecret{
2019-07-17 22:25:42 +00:00
PublicKey: publicKey,
Key: secret,
})
if err != nil {
return nil, err
}
2019-08-29 06:33:46 +00:00
newMessage.SymKeyID = filter.SymKeyID
newMessage.Topic = filter.Topic
2019-07-17 22:25:42 +00:00
newMessage.PublicKey = nil
return a.shhAPI.Post(ctx, *newMessage)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) SendPrivateWithPartitioned(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
2019-07-17 22:25:42 +00:00
return nil, err
}
filter, err := a.filters.LoadPartitioned(publicKey, a.keysManager.privateKey, false)
2019-07-17 22:25:42 +00:00
if err != nil {
return nil, err
}
newMessage.Topic = filter.Topic
2019-07-17 22:25:42 +00:00
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, *newMessage)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) SendPrivateOnDiscovery(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(newMessage); err != nil {
2019-07-17 22:25:42 +00:00
return nil, err
}
// There is no need to load any chat
// because listening on the discovery topic
// is done automatically.
// TODO: change this anyway, it should be explicit
// and idempotent.
newMessage.Topic = types.BytesToTopic(transport.ToTopic(transport.DiscoveryTopic()))
2019-07-17 22:25:42 +00:00
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, *newMessage)
2019-07-17 22:25:42 +00:00
}
func (a *Transport) addSig(newMessage *types.NewMessage) error {
2019-07-17 22:25:42 +00:00
sigID, err := a.keysManager.AddOrGetKeyPair(a.keysManager.privateKey)
if err != nil {
return err
}
2019-10-28 13:50:33 +00:00
newMessage.SigID = sigID
2019-07-17 22:25:42 +00:00
return nil
}
func (a *Transport) Track(identifiers [][]byte, hash []byte, newMessage *types.NewMessage) {
if a.envelopesMonitor != nil {
a.envelopesMonitor.Add(identifiers, types.BytesToHash(hash), *newMessage)
2019-07-17 22:25:42 +00:00
}
}
// GetCurrentTime returns the current unix timestamp in milliseconds
func (a *Transport) GetCurrentTime() uint64 {
return uint64(a.shh.GetCurrentTime().UnixNano() / int64(time.Millisecond))
}
func (a *Transport) MaxMessageSize() uint32 {
return a.shh.MaxMessageSize()
}
func (a *Transport) Stop() error {
if a.envelopesMonitor != nil {
a.envelopesMonitor.Stop()
2019-07-17 22:25:42 +00:00
}
2019-09-02 09:29:06 +00:00
return nil
2019-07-17 22:25:42 +00:00
}
// RequestHistoricMessages requests historic messages for all registered filters.
func (a *Transport) SendMessagesRequest(
ctx context.Context,
peerID []byte,
from, to uint32,
previousCursor []byte,
) (cursor []byte, err error) {
topics := make([]types.TopicType, len(a.Filters()))
for _, f := range a.Filters() {
topics = append(topics, f.Topic)
}
2019-07-17 22:25:42 +00:00
r := createMessagesRequest(from, to, previousCursor, topics)
r.SetDefaults(a.shh.GetCurrentTime())
2019-07-17 22:25:42 +00:00
events := make(chan types.EnvelopeEvent, 10)
sub := a.shh.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
2019-07-17 22:25:42 +00:00
err = a.shh.SendMessagesRequest(peerID, r)
if err != nil {
return
}
2019-07-17 22:25:42 +00:00
resp, err := a.waitForRequestCompleted(ctx, r.ID, events)
if err == nil && resp != nil && resp.Error != nil {
err = resp.Error
} else if err == nil && resp != nil {
cursor = resp.Cursor
}
return
}
func (a *Transport) LoadKeyFilters(key *ecdsa.PrivateKey) (*transport.Filter, error) {
return a.filters.LoadPartitioned(&key.PublicKey, key, true)
}
func (a *Transport) waitForRequestCompleted(ctx context.Context, requestID []byte, events chan types.EnvelopeEvent) (*types.MailServerResponse, error) {
for {
select {
case ev := <-events:
a.logger.Debug(
"waiting for request completed and received an event",
zap.Binary("requestID", requestID),
zap.Any("event", ev),
)
if !bytes.Equal(ev.Hash.Bytes(), requestID) {
continue
}
if ev.Event != types.EventMailServerRequestCompleted {
continue
}
data, ok := ev.Data.(*types.MailServerResponse)
if ok {
return data, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
2019-07-17 22:25:42 +00:00
}