status-protocol-go/transport/whisper/whisper_service.go

640 lines
17 KiB
Go
Raw Normal View History

package whisper
import (
"context"
"crypto/ecdsa"
"encoding/hex"
"fmt"
"log"
"math/big"
2019-07-16 10:43:07 +00:00
"path/filepath"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
2019-07-16 10:43:07 +00:00
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/pkg/errors"
"github.com/status-im/status-go/mailserver"
whisper "github.com/status-im/whisper/whisperv6"
2019-07-16 10:43:07 +00:00
"github.com/status-im/status-protocol-go/sqlite"
"github.com/status-im/status-protocol-go/transport/whisper/filter"
migrations "github.com/status-im/status-protocol-go/transport/whisper/internal/sqlite"
)
const (
// defaultRequestTimeout is the default request timeout in seconds
defaultRequestTimeout = 10
)
var (
// ErrNoMailservers returned if there is no configured mailservers that can be used.
ErrNoMailservers = errors.New("no configured mailservers")
)
2019-07-16 10:43:07 +00:00
type whisperServiceKeysManager struct {
shh *whisper.Whisper
// Identity of the current user.
privateKey *ecdsa.PrivateKey
passToSymKeyMutex sync.RWMutex
passToSymKeyCache map[string]string
}
2019-07-16 10:43:07 +00:00
func (m *whisperServiceKeysManager) AddOrGetKeyPair(priv *ecdsa.PrivateKey) (string, error) {
// caching is handled in Whisper
return m.shh.AddKeyPair(priv)
}
2019-07-16 10:43:07 +00:00
func (m *whisperServiceKeysManager) AddOrGetSymKeyFromPassword(password string) (string, error) {
m.passToSymKeyMutex.Lock()
defer m.passToSymKeyMutex.Unlock()
if val, ok := m.passToSymKeyCache[password]; ok {
return val, nil
}
id, err := m.shh.AddSymKeyFromPassword(password)
if err != nil {
return id, err
}
m.passToSymKeyCache[password] = id
return id, nil
}
2019-07-16 10:43:07 +00:00
func (m *whisperServiceKeysManager) RawSymKey(id string) ([]byte, error) {
return m.shh.GetSymKey(id)
}
// WhisperServiceTransport is a transport based on Whisper service.
type WhisperServiceTransport struct {
2019-07-16 10:43:07 +00:00
node Server
shh *whisper.Whisper
2019-07-16 10:43:07 +00:00
shhAPI *whisper.PublicWhisperAPI // only PublicWhisperAPI implements logic to send messages
keysManager *whisperServiceKeysManager
chats *filter.ChatsManager
mailservers []string
selectedMailServerEnode string
}
var _ WhisperTransport = (*WhisperServiceTransport)(nil)
// NewWhisperService returns a new WhisperServiceTransport.
func NewWhisperServiceTransport(
2019-07-16 10:43:07 +00:00
node Server,
shh *whisper.Whisper,
privateKey *ecdsa.PrivateKey,
2019-07-16 10:43:07 +00:00
dataDir string,
dbKey string,
mailservers []string,
) (*WhisperServiceTransport, error) {
2019-07-16 10:43:07 +00:00
// DB is shared between this package and all sub-packages.
dbPath := filepath.Join(dataDir, "transport.sql")
db, err := sqlite.Open(dbPath, dbKey, sqlite.MigrationConfig{
AssetNames: migrations.AssetNames(),
AssetGetter: func(name string) ([]byte, error) {
return migrations.Asset(name)
},
})
if err != nil {
return nil, err
}
chats, err := filter.New(db, shh, privateKey)
if err != nil {
return nil, err
}
return &WhisperServiceTransport{
2019-07-16 10:43:07 +00:00
node: node,
shh: shh,
shhAPI: whisper.NewPublicWhisperAPI(shh),
keysManager: &whisperServiceKeysManager{
shh: shh,
privateKey: privateKey,
passToSymKeyCache: make(map[string]string),
},
2019-07-16 10:43:07 +00:00
chats: chats,
mailservers: mailservers,
}, nil
}
2019-07-16 10:43:07 +00:00
func (a *WhisperServiceTransport) Init(
chatIDs []string,
publicKeys []*ecdsa.PublicKey,
negotiated []filter.NegotiatedSecret,
genericDiscoveryTopicEnabled bool,
2019-07-16 10:43:07 +00:00
) ([]*filter.Chat, error) {
return a.chats.Init(chatIDs, publicKeys, negotiated, genericDiscoveryTopicEnabled)
2019-07-16 10:43:07 +00:00
}
func (a *WhisperServiceTransport) ProcessNegotiatedSecret(secret filter.NegotiatedSecret) error {
_, err := a.chats.LoadNegotiated(secret)
return err
}
func (a *WhisperServiceTransport) JoinPublic(chatID string) error {
_, err := a.chats.LoadPublic(chatID)
return err
}
2019-07-16 10:43:07 +00:00
func (a *WhisperServiceTransport) LeavePublic(chatID string) error {
chat := a.chats.ChatByID(chatID)
if chat != nil {
return nil
}
return a.chats.Remove(chat)
}
func (a *WhisperServiceTransport) JoinPrivate(publicKey *ecdsa.PublicKey) error {
_, err := a.chats.LoadContactCode(publicKey)
return err
}
2019-07-16 10:43:07 +00:00
func (a *WhisperServiceTransport) LeavePrivate(publicKey *ecdsa.PublicKey) error {
chats := a.chats.ChatsByPublicKey(publicKey)
return a.chats.Remove(chats...)
}
func (a *WhisperServiceTransport) RetrievePublicMessages(chatID string) ([]*whisper.ReceivedMessage, error) {
chat, err := a.chats.LoadPublic(chatID)
if err != nil {
return nil, err
}
2019-07-16 10:43:07 +00:00
f := a.shh.GetFilter(chat.FilterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
2019-07-16 10:43:07 +00:00
return f.Retrieve(), nil
}
2019-07-16 10:43:07 +00:00
func (a *WhisperServiceTransport) RetrievePrivateMessages(publicKey *ecdsa.PublicKey) ([]*whisper.ReceivedMessage, error) {
chats := a.chats.ChatsByPublicKey(publicKey)
discoveryChats, err := a.chats.LoadDiscovery()
if err != nil {
return nil, err
}
2019-07-16 10:43:07 +00:00
var result []*whisper.ReceivedMessage
2019-07-16 10:43:07 +00:00
for _, chat := range append(chats, discoveryChats...) {
f := a.shh.GetFilter(chat.FilterID)
if f == nil {
return nil, errors.New("failed to return a filter")
}
2019-07-16 10:43:07 +00:00
result = append(result, f.Retrieve()...)
}
2019-07-16 10:43:07 +00:00
return result, nil
}
2019-07-16 10:43:07 +00:00
// SendPublic sends a new message using the Whisper service.
// For public chats, chat name is used as an ID as well as
// a topic.
func (a *WhisperServiceTransport) SendPublic(ctx context.Context, newMessage whisper.NewMessage, chatName string) ([]byte, error) {
if err := a.addSig(&newMessage); err != nil {
return nil, err
}
chat, err := a.chats.LoadPublic(chatName)
if err != nil {
return nil, err
}
newMessage.SymKeyID = chat.SymKeyID
newMessage.Topic = chat.Topic
return a.shhAPI.Post(ctx, newMessage)
}
func (a *WhisperServiceTransport) SendPrivateWithSharedSecret(ctx context.Context, newMessage whisper.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) {
if err := a.addSig(&newMessage); err != nil {
return nil, err
}
chat, err := a.chats.LoadNegotiated(filter.NegotiatedSecret{
PublicKey: publicKey,
Key: secret,
})
if err != nil {
return nil, err
}
newMessage.SymKeyID = chat.SymKeyID
newMessage.Topic = chat.Topic
newMessage.PublicKey = nil
return a.shhAPI.Post(ctx, newMessage)
}
func (a *WhisperServiceTransport) SendPrivateWithPartitioned(ctx context.Context, newMessage whisper.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(&newMessage); err != nil {
return nil, err
}
chat, err := a.chats.LoadPartitioned(publicKey)
if err != nil {
return nil, err
}
newMessage.Topic = chat.Topic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, newMessage)
}
func (a *WhisperServiceTransport) SendPrivateOnDiscovery(ctx context.Context, newMessage whisper.NewMessage, publicKey *ecdsa.PublicKey) ([]byte, error) {
if err := a.addSig(&newMessage); err != nil {
return nil, err
}
// There is no need to load any chat
// because listening on the discovery topic
// is done automatically.
// TODO: change this anyway, it should be explicit
// and idempotent.
newMessage.Topic = whisper.BytesToTopic(
filter.ToTopic(filter.DiscoveryTopic),
)
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
return a.shhAPI.Post(ctx, newMessage)
}
func (a *WhisperServiceTransport) addSig(newMessage *whisper.NewMessage) error {
sigID, err := a.keysManager.AddOrGetKeyPair(a.keysManager.privateKey)
if err != nil {
return err
}
newMessage.Sig = sigID
return nil
}
// Request requests messages from mail servers.
func (a *WhisperServiceTransport) Request(ctx context.Context, options RequestOptions) error {
// TODO: remove from here. MailServerEnode must be provided in the params.
enode, err := a.selectAndAddMailServer()
if err != nil {
return err
}
keyID, err := a.keysManager.AddOrGetSymKeyFromPassword(options.Password)
if err != nil {
return err
}
req, err := createRequestMessagesParam(enode, keyID, options)
if err != nil {
return err
}
_, err = a.requestMessages(ctx, req, true)
return err
}
func (a *WhisperServiceTransport) requestMessages(ctx context.Context, req MessagesRequest, followCursor bool) (resp MessagesResponse, err error) {
log.Printf("[WhisperServiceTransport::requestMessages] request for a chunk with %d messages", req.Limit)
start := time.Now()
resp, err = a.requestMessagesWithRetry(RetryConfig{
BaseTimeout: time.Second * 10,
StepTimeout: time.Second,
MaxRetries: 3,
}, req)
if err != nil {
log.Printf("[WhisperServiceTransport::requestMessages] failed with err: %v", err)
return
}
log.Printf("[WhisperServiceTransport::requestMessages] delivery of %d message took %v", req.Limit, time.Since(start))
log.Printf("[WhisperServiceTransport::requestMessages] response: %+v", resp)
if resp.Error != nil {
err = resp.Error
return
}
if !followCursor || resp.Cursor == "" {
return
}
req.Cursor = resp.Cursor
log.Printf("[WhisperServiceTransport::requestMessages] request messages with cursor %v", req.Cursor)
return a.requestMessages(ctx, req, true)
}
// MessagesRequest is a RequestMessages() request payload.
type MessagesRequest struct {
// MailServerPeer is MailServer's enode address.
MailServerPeer string `json:"mailServerPeer"`
// From is a lower bound of time range (optional).
// Default is 24 hours back from now.
From uint32 `json:"from"`
// To is a upper bound of time range (optional).
// Default is now.
To uint32 `json:"to"`
// Limit determines the number of messages sent by the mail server
// for the current paginated request
Limit uint32 `json:"limit"`
// Cursor is used as starting point for paginated requests
Cursor string `json:"cursor"`
// Topic is a regular Whisper topic.
// DEPRECATED
Topic whisper.TopicType `json:"topic"`
// Topics is a list of Whisper topics.
Topics []whisper.TopicType `json:"topics"`
// SymKeyID is an ID of a symmetric key to authenticate to MailServer.
// It's derived from MailServer password.
SymKeyID string `json:"symKeyID"`
// Timeout is the time to live of the request specified in seconds.
// Default is 10 seconds
Timeout time.Duration `json:"timeout"`
// Force ensures that requests will bypass enforced delay.
// TODO(adam): it's currently not handled.
Force bool `json:"force"`
}
func (r *MessagesRequest) setDefaults(now time.Time) {
// set From and To defaults
if r.To == 0 {
r.To = uint32(now.UTC().Unix())
}
if r.From == 0 {
oneDay := uint32(86400) // -24 hours
if r.To < oneDay {
r.From = 0
} else {
r.From = r.To - oneDay
}
}
if r.Timeout == 0 {
r.Timeout = defaultRequestTimeout
}
}
type MessagesResponse struct {
// Cursor from the response can be used to retrieve more messages
// for the previous request.
Cursor string `json:"cursor"`
// Error indicates that something wrong happened when sending messages
// to the requester.
Error error `json:"error"`
}
// RetryConfig specifies configuration for retries with timeout and max amount of retries.
type RetryConfig struct {
BaseTimeout time.Duration
// StepTimeout defines duration increase per each retry.
StepTimeout time.Duration
MaxRetries int
}
func (a *WhisperServiceTransport) requestMessagesWithRetry(conf RetryConfig, r MessagesRequest) (MessagesResponse, error) {
var (
resp MessagesResponse
requestID hexutil.Bytes
err error
retries int
)
events := make(chan whisper.EnvelopeEvent, 10)
for retries <= conf.MaxRetries {
sub := a.shh.SubscribeEnvelopeEvents(events)
r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries)
timeout := r.Timeout
// FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration
r.Timeout = time.Duration(int(r.Timeout.Seconds()))
requestID, err = a.requestMessagesSync(context.Background(), r)
if err != nil {
sub.Unsubscribe()
return resp, err
}
mailServerResp, err := waitForExpiredOrCompleted(common.BytesToHash(requestID), events, timeout)
sub.Unsubscribe()
if err == nil {
resp.Cursor = hex.EncodeToString(mailServerResp.Cursor)
resp.Error = mailServerResp.Error
return resp, nil
}
retries++
log.Printf("requestMessagesSync failed, retyring %d: %v", retries, err)
}
return resp, fmt.Errorf("failed to request messages after %d retries", retries)
}
// RequestMessages sends a request for historic messages to a MailServer.
func (a *WhisperServiceTransport) requestMessagesSync(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) {
now := a.shh.GetCurrentTime()
r.setDefaults(now)
if r.From > r.To {
return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To)
}
// TODO: bring mailserverspackage here
mailServerNode, err := enode.ParseV4(r.MailServerPeer)
if err != nil {
return nil, fmt.Errorf("invalid MailServerPeer: %v", err)
}
var (
symKey []byte
publicKey *ecdsa.PublicKey
)
if r.SymKeyID != "" {
symKey, err = a.shh.GetSymKey(r.SymKeyID)
if err != nil {
return nil, fmt.Errorf("invalid SymKeyID: %v", err)
}
} else {
publicKey = mailServerNode.Pubkey()
}
payload, err := makeMessagesRequestPayload(r)
if err != nil {
return nil, err
}
envelope, err := makeEnvelop(
payload,
symKey,
publicKey,
a.node.NodeID(),
a.shh.MinPow(),
now,
)
if err != nil {
return nil, err
}
hash := envelope.Hash()
if err := a.shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil {
return nil, err
}
return hash[:], nil
}
2019-07-16 10:43:07 +00:00
func (a *WhisperServiceTransport) selectAndAddMailServer() (string, error) {
var enodeAddr string
if a.selectedMailServerEnode != "" {
enodeAddr = a.selectedMailServerEnode
} else {
if len(a.mailservers) == 0 {
return "", ErrNoMailservers
}
enodeAddr = randomItem(a.mailservers)
}
2019-07-16 10:43:07 +00:00
log.Printf("dialing mail server %s", enodeAddr)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
err := dial(ctx, a.node, enodeAddr, dialOpts{PollInterval: 200 * time.Millisecond})
cancel()
if err == nil {
a.selectedMailServerEnode = enodeAddr
return enodeAddr, nil
}
2019-07-16 10:43:07 +00:00
return "", fmt.Errorf("peer %s failed to connect: %v", enodeAddr, err)
}
func createRequestMessagesParam(enode, symKeyID string, options RequestOptions) (MessagesRequest, error) {
req := MessagesRequest{
MailServerPeer: enode,
From: uint32(options.From), // TODO: change to int in status-go
To: uint32(options.To), // TODO: change to int in status-go
Limit: uint32(options.Limit), // TODO: change to int in status-go
SymKeyID: symKeyID,
Topics: options.Topics,
}
return req, nil
}
func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent, timeout time.Duration) (*whisper.MailServerResponse, error) {
expired := fmt.Errorf("request %x expired", requestID)
after := time.NewTimer(timeout)
defer after.Stop()
for {
var ev whisper.EnvelopeEvent
select {
case ev = <-events:
case <-after.C:
return nil, expired
}
if ev.Hash != requestID {
continue
}
switch ev.Event {
case whisper.EventMailServerRequestCompleted:
data, ok := ev.Data.(*whisper.MailServerResponse)
if ok {
return data, nil
}
return nil, errors.New("invalid event data type")
case whisper.EventMailServerRequestExpired:
return nil, expired
}
}
}
// makeMessagesRequestPayload makes a specific payload for MailServer
// to request historic messages.
func makeMessagesRequestPayload(r MessagesRequest) ([]byte, error) {
cursor, err := hex.DecodeString(r.Cursor)
if err != nil {
return nil, fmt.Errorf("invalid cursor: %v", err)
}
if len(cursor) > 0 && len(cursor) != mailserver.CursorLength {
return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.CursorLength, len(cursor))
}
payload := mailserver.MessagesRequestPayload{
Lower: r.From,
Upper: r.To,
Bloom: createBloomFilter(r),
Limit: r.Limit,
Cursor: cursor,
// Client must tell the MailServer if it supports batch responses.
// This can be removed in the future.
Batch: true,
}
return rlp.EncodeToBytes(payload)
}
// makeEnvelop makes an envelop for a historic messages request.
// Symmetric key is used to authenticate to MailServer.
// PK is the current node ID.
func makeEnvelop(
payload []byte,
symKey []byte,
publicKey *ecdsa.PublicKey,
nodeID *ecdsa.PrivateKey,
pow float64,
now time.Time,
) (*whisper.Envelope, error) {
params := whisper.MessageParams{
PoW: pow,
Payload: payload,
2019-07-16 10:43:07 +00:00
WorkTime: DefaultWhisperMessage().PowTime,
Src: nodeID,
}
// Either symKey or public key is required.
// This condition is verified in `message.Wrap()` method.
if len(symKey) > 0 {
params.KeySym = symKey
} else if publicKey != nil {
params.Dst = publicKey
}
message, err := whisper.NewSentMessage(&params)
if err != nil {
return nil, err
}
return message.Wrap(&params, now)
}
func createBloomFilter(r MessagesRequest) []byte {
if len(r.Topics) > 0 {
return topicsToBloom(r.Topics...)
}
return whisper.TopicToBloom(r.Topic)
}
func topicsToBloom(topics ...whisper.TopicType) []byte {
i := new(big.Int)
for _, topic := range topics {
bloom := whisper.TopicToBloom(topic)
i.Or(i, new(big.Int).SetBytes(bloom[:]))
}
combined := make([]byte, whisper.BloomFilterSize)
data := i.Bytes()
copy(combined[whisper.BloomFilterSize-len(data):], data[:])
return combined
}