2018-04-11 15:41:51 +00:00
|
|
|
package shhext
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
2018-04-26 05:56:19 +00:00
|
|
|
"crypto/ecdsa"
|
2018-07-02 07:38:10 +00:00
|
|
|
"encoding/hex"
|
2018-04-26 05:56:19 +00:00
|
|
|
"errors"
|
|
|
|
"fmt"
|
2018-10-15 21:15:04 +00:00
|
|
|
"math/big"
|
2018-04-26 05:56:19 +00:00
|
|
|
"time"
|
2018-04-11 15:41:51 +00:00
|
|
|
|
2018-10-19 09:09:13 +00:00
|
|
|
"github.com/ethereum/go-ethereum/rlp"
|
|
|
|
|
2018-04-11 15:41:51 +00:00
|
|
|
"github.com/ethereum/go-ethereum/common"
|
2018-04-24 15:50:26 +00:00
|
|
|
"github.com/ethereum/go-ethereum/common/hexutil"
|
2018-09-24 18:07:34 +00:00
|
|
|
"github.com/ethereum/go-ethereum/crypto"
|
2019-05-17 11:06:56 +00:00
|
|
|
|
2018-04-26 05:56:19 +00:00
|
|
|
"github.com/ethereum/go-ethereum/log"
|
2018-11-14 07:03:58 +00:00
|
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
2019-05-17 11:06:56 +00:00
|
|
|
"github.com/golang/protobuf/proto"
|
2019-04-30 06:46:12 +00:00
|
|
|
"github.com/status-im/status-go/db"
|
2018-12-11 10:23:47 +00:00
|
|
|
"github.com/status-im/status-go/mailserver"
|
2018-09-24 18:07:34 +00:00
|
|
|
"github.com/status-im/status-go/services/shhext/chat"
|
2019-02-19 12:58:42 +00:00
|
|
|
"github.com/status-im/status-go/services/shhext/dedup"
|
2018-12-05 13:57:05 +00:00
|
|
|
"github.com/status-im/status-go/services/shhext/mailservers"
|
2018-09-25 07:05:38 +00:00
|
|
|
whisper "github.com/status-im/whisper/whisperv6"
|
2018-04-11 15:41:51 +00:00
|
|
|
)
|
|
|
|
|
2018-04-26 05:56:19 +00:00
|
|
|
// Defaults used when building MailServer requests.
const (
	// defaultWorkTime is the WorkTime value makeEnvelop puts into the
	// message parameters of every MailServer request envelope.
	defaultWorkTime = 5

	// defaultRequestTimeout is the request timeout, expressed in seconds,
	// that setDefaults applies when MessagesRequest.Timeout is zero.
	defaultRequestTimeout = 10
)
|
|
|
|
|
|
|
|
// Sentinel errors returned by the public API.
var (
	// ErrInvalidMailServerPeer reports that the MailServer enode given in
	// the request parameters could not be resolved or parsed.
	ErrInvalidMailServerPeer = errors.New("invalid mailServerPeer value")

	// ErrInvalidSymKeyID reports that no symmetric key could be fetched for
	// the supplied key ID.
	ErrInvalidSymKeyID = errors.New("invalid symKeyID value")

	// ErrInvalidPublicKey reports that a public key could not be extracted
	// from the MailServer's node ID.
	ErrInvalidPublicKey = errors.New("can't extract public key")

	// ErrPFSNotEnabled is returned by endpoints that only work with PFS
	// when PFS is disabled on the service.
	ErrPFSNotEnabled = errors.New("pfs not enabled")
)
|
|
|
|
|
|
|
|
// -----
|
|
|
|
// PAYLOADS
|
|
|
|
// -----
|
|
|
|
|
2018-12-14 11:21:34 +00:00
|
|
|
// MessagesRequest is a RequestMessages() request payload.
|
2018-04-26 05:56:19 +00:00
|
|
|
type MessagesRequest struct {
|
|
|
|
// MailServerPeer is MailServer's enode address.
|
|
|
|
MailServerPeer string `json:"mailServerPeer"`
|
|
|
|
|
|
|
|
// From is a lower bound of time range (optional).
|
|
|
|
// Default is 24 hours back from now.
|
|
|
|
From uint32 `json:"from"`
|
|
|
|
|
|
|
|
// To is a upper bound of time range (optional).
|
|
|
|
// Default is now.
|
|
|
|
To uint32 `json:"to"`
|
|
|
|
|
2018-07-02 07:38:10 +00:00
|
|
|
// Limit determines the number of messages sent by the mail server
|
|
|
|
// for the current paginated request
|
|
|
|
Limit uint32 `json:"limit"`
|
|
|
|
|
|
|
|
// Cursor is used as starting point for paginated requests
|
|
|
|
Cursor string `json:"cursor"`
|
|
|
|
|
2018-04-26 05:56:19 +00:00
|
|
|
// Topic is a regular Whisper topic.
|
2018-10-15 21:15:04 +00:00
|
|
|
// DEPRECATED
|
2018-04-26 05:56:19 +00:00
|
|
|
Topic whisper.TopicType `json:"topic"`
|
|
|
|
|
2018-10-15 21:15:04 +00:00
|
|
|
// Topics is a list of Whisper topics.
|
|
|
|
Topics []whisper.TopicType `json:"topics"`
|
|
|
|
|
2018-04-26 05:56:19 +00:00
|
|
|
// SymKeyID is an ID of a symmetric key to authenticate to MailServer.
|
|
|
|
// It's derived from MailServer password.
|
|
|
|
SymKeyID string `json:"symKeyID"`
|
2018-06-15 15:12:31 +00:00
|
|
|
|
|
|
|
// Timeout is the time to live of the request specified in seconds.
|
|
|
|
// Default is 10 seconds
|
|
|
|
Timeout time.Duration `json:"timeout"`
|
2019-01-15 09:21:33 +00:00
|
|
|
|
|
|
|
// Force ensures that requests will bypass enforced delay.
|
|
|
|
Force bool `json:"force"`
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
|
|
|
|
2018-05-04 08:23:38 +00:00
|
|
|
func (r *MessagesRequest) setDefaults(now time.Time) {
|
2018-04-26 05:56:19 +00:00
|
|
|
// set From and To defaults
|
2018-05-10 14:47:54 +00:00
|
|
|
if r.To == 0 {
|
2018-05-04 08:23:38 +00:00
|
|
|
r.To = uint32(now.UTC().Unix())
|
2018-04-11 15:41:51 +00:00
|
|
|
}
|
2018-05-10 14:47:54 +00:00
|
|
|
|
|
|
|
if r.From == 0 {
|
|
|
|
oneDay := uint32(86400) // -24 hours
|
|
|
|
if r.To < oneDay {
|
|
|
|
r.From = 0
|
|
|
|
} else {
|
|
|
|
r.From = r.To - oneDay
|
|
|
|
}
|
|
|
|
}
|
2018-06-15 15:12:31 +00:00
|
|
|
|
|
|
|
if r.Timeout == 0 {
|
|
|
|
r.Timeout = defaultRequestTimeout
|
|
|
|
}
|
2018-04-11 15:41:51 +00:00
|
|
|
}
|
|
|
|
|
2019-03-15 13:27:08 +00:00
|
|
|
// MessagesResponse is the result returned by RequestMessagesSync
// (the shhext_requestMessages2 method).
type MessagesResponse struct {
	// Cursor is the hex-encoded cursor reported by the mail server. Pass it
	// back in a follow-up request to fetch more messages for the same query.
	Cursor string `json:"cursor"`

	// Error is the error reported by the mail server if something went
	// wrong while it was sending messages to the requester.
	// NOTE(review): encoding/json serializes an error value through its
	// exported fields, so typical errors encode as {} — confirm that
	// consumers do not rely on the message text.
	Error error `json:"error"`
}
|
|
|
|
|
2018-12-14 11:21:34 +00:00
|
|
|
// SyncMessagesRequest is a SyncMessages() request payload.
|
|
|
|
type SyncMessagesRequest struct {
|
|
|
|
// MailServerPeer is MailServer's enode address.
|
|
|
|
MailServerPeer string `json:"mailServerPeer"`
|
|
|
|
|
|
|
|
// From is a lower bound of time range (optional).
|
|
|
|
// Default is 24 hours back from now.
|
|
|
|
From uint32 `json:"from"`
|
|
|
|
|
|
|
|
// To is a upper bound of time range (optional).
|
|
|
|
// Default is now.
|
|
|
|
To uint32 `json:"to"`
|
|
|
|
|
|
|
|
// Limit determines the number of messages sent by the mail server
|
|
|
|
// for the current paginated request
|
|
|
|
Limit uint32 `json:"limit"`
|
|
|
|
|
|
|
|
// Cursor is used as starting point for paginated requests
|
|
|
|
Cursor string `json:"cursor"`
|
|
|
|
|
|
|
|
// Topics is a list of Whisper topics.
|
|
|
|
// If empty, a full bloom filter will be used.
|
|
|
|
Topics []whisper.TopicType `json:"topics"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// SyncMessagesResponse is what the mail server answered to a
// SyncMessagesRequest.
type SyncMessagesResponse struct {
	// Cursor is the hex-encoded cursor from the response. Pass it back in a
	// follow-up request to fetch more messages for the same query.
	Cursor string `json:"cursor"`

	// Error is the error text reported by the mail server if something went
	// wrong while it was sending messages to the requester.
	Error string `json:"error"`
}
|
|
|
|
|
2019-04-30 06:46:12 +00:00
|
|
|
// InitiateHistoryRequestParams type for initiating history requests from a peer.
|
|
|
|
type InitiateHistoryRequestParams struct {
|
|
|
|
Peer string
|
|
|
|
SymKeyID string
|
|
|
|
Requests []TopicRequest
|
|
|
|
Force bool
|
|
|
|
Timeout time.Duration
|
|
|
|
}
|
|
|
|
|
2018-04-26 05:56:19 +00:00
|
|
|
// -----
|
|
|
|
// PUBLIC API
|
|
|
|
// -----
|
|
|
|
|
2018-04-11 15:41:51 +00:00
|
|
|
// PublicAPI extends whisper public API.
|
|
|
|
type PublicAPI struct {
|
2018-04-26 05:56:19 +00:00
|
|
|
service *Service
|
2018-04-24 15:50:26 +00:00
|
|
|
publicAPI *whisper.PublicWhisperAPI
|
2018-04-26 05:56:19 +00:00
|
|
|
log log.Logger
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewPublicAPI returns instance of the public API.
|
|
|
|
func NewPublicAPI(s *Service) *PublicAPI {
|
|
|
|
return &PublicAPI{
|
|
|
|
service: s,
|
|
|
|
publicAPI: whisper.NewPublicWhisperAPI(s.w),
|
|
|
|
log: log.New("package", "status-go/services/sshext.PublicAPI"),
|
|
|
|
}
|
2018-04-11 15:41:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Post shamelessly copied from whisper codebase with slight modifications.
|
2019-03-01 13:36:21 +00:00
|
|
|
func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hexutil.Bytes, error) {
|
|
|
|
hexID, err := api.publicAPI.Post(ctx, req)
|
2018-04-11 15:41:51 +00:00
|
|
|
if err == nil {
|
2019-03-01 13:36:21 +00:00
|
|
|
api.service.envelopesMonitor.Add(common.BytesToHash(hexID), req)
|
|
|
|
} else {
|
|
|
|
return nil, err
|
2018-04-11 15:41:51 +00:00
|
|
|
}
|
2019-03-01 13:36:21 +00:00
|
|
|
mID := messageID(req)
|
|
|
|
return mID[:], err
|
2018-04-11 15:41:51 +00:00
|
|
|
}
|
2018-04-26 05:56:19 +00:00
|
|
|
|
2018-12-05 13:57:05 +00:00
|
|
|
func (api *PublicAPI) getPeer(rawurl string) (*enode.Node, error) {
|
|
|
|
if len(rawurl) == 0 {
|
|
|
|
return mailservers.GetFirstConnected(api.service.server, api.service.peerStore)
|
|
|
|
}
|
|
|
|
return enode.ParseV4(rawurl)
|
|
|
|
}
|
|
|
|
|
2019-01-21 14:00:10 +00:00
|
|
|
// RetryConfig describes how RequestMessagesSync retries a request: the
// timeout of the first attempt, how much it grows per retry, and the maximum
// number of retries.
type RetryConfig struct {
	// BaseTimeout is the timeout of the first attempt.
	BaseTimeout time.Duration
	// StepTimeout is added to the timeout once per retry already made.
	StepTimeout time.Duration
	// MaxRetries is the number of retries allowed after the first attempt.
	MaxRetries int
}
|
|
|
|
|
|
|
|
// RequestMessagesSync repeats MessagesRequest using configuration in retry conf.
|
2019-03-15 13:27:08 +00:00
|
|
|
func (api *PublicAPI) RequestMessagesSync(conf RetryConfig, r MessagesRequest) (MessagesResponse, error) {
|
|
|
|
var resp MessagesResponse
|
|
|
|
|
2019-01-21 14:00:10 +00:00
|
|
|
shh := api.service.w
|
|
|
|
events := make(chan whisper.EnvelopeEvent, 10)
|
|
|
|
var (
|
|
|
|
requestID hexutil.Bytes
|
|
|
|
err error
|
|
|
|
retries int
|
|
|
|
)
|
|
|
|
for retries <= conf.MaxRetries {
|
2019-05-20 08:10:26 +00:00
|
|
|
sub := shh.SubscribeEnvelopeEvents(events)
|
2019-01-21 14:00:10 +00:00
|
|
|
r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries)
|
2019-05-20 08:10:26 +00:00
|
|
|
timeout := r.Timeout
|
2019-01-21 14:00:10 +00:00
|
|
|
// FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration
|
|
|
|
r.Timeout = time.Duration(int(r.Timeout.Seconds()))
|
|
|
|
requestID, err = api.RequestMessages(context.Background(), r)
|
|
|
|
if err != nil {
|
2019-05-20 08:10:26 +00:00
|
|
|
sub.Unsubscribe()
|
2019-03-15 13:27:08 +00:00
|
|
|
return resp, err
|
2019-01-21 14:00:10 +00:00
|
|
|
}
|
2019-05-20 08:10:26 +00:00
|
|
|
mailServerResp, err := waitForExpiredOrCompleted(common.BytesToHash(requestID), events, timeout)
|
|
|
|
sub.Unsubscribe()
|
2019-01-21 14:00:10 +00:00
|
|
|
if err == nil {
|
2019-03-15 13:27:08 +00:00
|
|
|
resp.Cursor = hex.EncodeToString(mailServerResp.Cursor)
|
|
|
|
resp.Error = mailServerResp.Error
|
|
|
|
return resp, nil
|
2019-01-21 14:00:10 +00:00
|
|
|
}
|
|
|
|
retries++
|
2019-03-15 13:27:08 +00:00
|
|
|
api.log.Error("[RequestMessagesSync] failed", "err", err, "retries", retries)
|
2019-01-21 14:00:10 +00:00
|
|
|
}
|
2019-03-15 13:27:08 +00:00
|
|
|
return resp, fmt.Errorf("failed to request messages after %d retries", retries)
|
2019-01-21 14:00:10 +00:00
|
|
|
}
|
|
|
|
|
2019-05-20 08:10:26 +00:00
|
|
|
func waitForExpiredOrCompleted(requestID common.Hash, events chan whisper.EnvelopeEvent, timeout time.Duration) (*whisper.MailServerResponse, error) {
|
|
|
|
expired := fmt.Errorf("request %x expired", requestID)
|
|
|
|
after := time.NewTimer(timeout)
|
|
|
|
defer after.Stop()
|
2019-01-21 14:00:10 +00:00
|
|
|
for {
|
2019-05-20 08:10:26 +00:00
|
|
|
var ev whisper.EnvelopeEvent
|
|
|
|
select {
|
|
|
|
case ev = <-events:
|
|
|
|
case <-after.C:
|
|
|
|
return nil, expired
|
|
|
|
}
|
2019-01-21 14:00:10 +00:00
|
|
|
if ev.Hash != requestID {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
switch ev.Event {
|
|
|
|
case whisper.EventMailServerRequestCompleted:
|
2019-03-15 13:27:08 +00:00
|
|
|
data, ok := ev.Data.(*whisper.MailServerResponse)
|
|
|
|
if ok {
|
|
|
|
return data, nil
|
|
|
|
}
|
|
|
|
return nil, errors.New("invalid event data type")
|
2019-01-21 14:00:10 +00:00
|
|
|
case whisper.EventMailServerRequestExpired:
|
2019-05-20 08:10:26 +00:00
|
|
|
return nil, expired
|
2019-01-21 14:00:10 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-04-26 05:56:19 +00:00
|
|
|
// RequestMessages sends a request for historic messages to a MailServer.
|
2018-06-15 15:12:31 +00:00
|
|
|
func (api *PublicAPI) RequestMessages(_ context.Context, r MessagesRequest) (hexutil.Bytes, error) {
|
2018-04-26 05:56:19 +00:00
|
|
|
api.log.Info("RequestMessages", "request", r)
|
|
|
|
shh := api.service.w
|
2018-05-04 08:23:38 +00:00
|
|
|
now := api.service.w.GetCurrentTime()
|
|
|
|
r.setDefaults(now)
|
2018-06-26 08:41:03 +00:00
|
|
|
|
|
|
|
if r.From > r.To {
|
|
|
|
return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To)
|
|
|
|
}
|
|
|
|
|
2018-12-05 13:57:05 +00:00
|
|
|
mailServerNode, err := api.getPeer(r.MailServerPeer)
|
2018-04-26 05:56:19 +00:00
|
|
|
if err != nil {
|
2018-06-15 15:12:31 +00:00
|
|
|
return nil, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err)
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
|
|
|
|
2018-07-04 09:30:57 +00:00
|
|
|
var (
|
|
|
|
symKey []byte
|
|
|
|
publicKey *ecdsa.PublicKey
|
|
|
|
)
|
|
|
|
|
|
|
|
if r.SymKeyID != "" {
|
|
|
|
symKey, err = shh.GetSymKey(r.SymKeyID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("%v: %v", ErrInvalidSymKeyID, err)
|
|
|
|
}
|
|
|
|
} else {
|
2018-11-14 07:03:58 +00:00
|
|
|
publicKey = mailServerNode.Pubkey()
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
|
|
|
|
2018-12-11 10:23:47 +00:00
|
|
|
payload, err := makeMessagesRequestPayload(r)
|
2018-10-19 09:09:13 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-07-04 09:30:57 +00:00
|
|
|
envelope, err := makeEnvelop(
|
2018-10-19 09:09:13 +00:00
|
|
|
payload,
|
2018-07-04 09:30:57 +00:00
|
|
|
symKey,
|
|
|
|
publicKey,
|
|
|
|
api.service.nodeID,
|
|
|
|
shh.MinPow(),
|
|
|
|
now,
|
|
|
|
)
|
2018-04-26 05:56:19 +00:00
|
|
|
if err != nil {
|
2018-06-15 15:12:31 +00:00
|
|
|
return nil, err
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
2019-01-15 09:21:33 +00:00
|
|
|
hash := envelope.Hash()
|
2019-02-20 12:02:08 +00:00
|
|
|
|
2019-01-15 09:21:33 +00:00
|
|
|
if !r.Force {
|
|
|
|
err = api.service.requestsRegistry.Register(hash, r.Topics)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
2018-06-15 15:12:31 +00:00
|
|
|
|
2018-12-05 13:57:05 +00:00
|
|
|
if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil {
|
2019-02-20 12:02:08 +00:00
|
|
|
if !r.Force {
|
|
|
|
api.service.requestsRegistry.Unregister(hash)
|
|
|
|
}
|
2018-06-15 15:12:31 +00:00
|
|
|
return nil, err
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
2019-02-20 12:02:08 +00:00
|
|
|
|
2018-06-15 15:12:31 +00:00
|
|
|
return hash[:], nil
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
|
|
|
|
2018-12-14 11:21:34 +00:00
|
|
|
// createSyncMailRequest creates SyncMailRequest. It uses a full bloom filter
|
|
|
|
// if no topics are given.
|
|
|
|
func createSyncMailRequest(r SyncMessagesRequest) (whisper.SyncMailRequest, error) {
|
|
|
|
var bloom []byte
|
|
|
|
if len(r.Topics) > 0 {
|
|
|
|
bloom = topicsToBloom(r.Topics...)
|
|
|
|
} else {
|
|
|
|
bloom = whisper.MakeFullNodeBloom()
|
|
|
|
}
|
|
|
|
|
|
|
|
cursor, err := hex.DecodeString(r.Cursor)
|
|
|
|
if err != nil {
|
|
|
|
return whisper.SyncMailRequest{}, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return whisper.SyncMailRequest{
|
|
|
|
Lower: r.From,
|
|
|
|
Upper: r.To,
|
|
|
|
Bloom: bloom,
|
|
|
|
Limit: r.Limit,
|
|
|
|
Cursor: cursor,
|
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func createSyncMessagesResponse(r whisper.SyncEventResponse) SyncMessagesResponse {
|
|
|
|
return SyncMessagesResponse{
|
|
|
|
Cursor: hex.EncodeToString(r.Cursor),
|
|
|
|
Error: r.Error,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// SyncMessages sends a request to a given MailServerPeer to sync historic messages.
|
|
|
|
// MailServerPeers needs to be added as a trusted peer first.
|
|
|
|
func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) {
|
|
|
|
var response SyncMessagesResponse
|
|
|
|
|
|
|
|
mailServerEnode, err := enode.ParseV4(r.MailServerPeer)
|
|
|
|
if err != nil {
|
|
|
|
return response, fmt.Errorf("invalid MailServerPeer: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
request, err := createSyncMailRequest(r)
|
|
|
|
if err != nil {
|
|
|
|
return response, fmt.Errorf("failed to create a sync mail request: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := api.service.w.SyncMessages(mailServerEnode.ID().Bytes(), request); err != nil {
|
|
|
|
return response, fmt.Errorf("failed to send a sync request: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Wait for the response which is received asynchronously as a p2p packet.
|
|
|
|
// This packet handler will send an event which contains the response payload.
|
|
|
|
events := make(chan whisper.EnvelopeEvent)
|
|
|
|
sub := api.service.w.SubscribeEnvelopeEvents(events)
|
|
|
|
defer sub.Unsubscribe()
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case event := <-events:
|
|
|
|
if event.Event != whisper.EventMailServerSyncFinished {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
log.Info("received EventMailServerSyncFinished event", "data", event.Data)
|
|
|
|
|
|
|
|
if resp, ok := event.Data.(whisper.SyncEventResponse); ok {
|
|
|
|
return createSyncMessagesResponse(resp), nil
|
|
|
|
}
|
|
|
|
return response, fmt.Errorf("did not understand the response event data")
|
|
|
|
case <-ctx.Done():
|
|
|
|
return response, ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-04-20 11:26:54 +00:00
|
|
|
// GetNewFilterMessages is a prototype method with deduplication
|
2019-02-19 12:58:42 +00:00
|
|
|
func (api *PublicAPI) GetNewFilterMessages(filterID string) ([]dedup.DeduplicateMessage, error) {
|
2018-04-20 11:26:54 +00:00
|
|
|
msgs, err := api.publicAPI.GetFilterMessages(filterID)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-09-24 18:07:34 +00:00
|
|
|
|
|
|
|
dedupMessages := api.service.deduplicator.Deduplicate(msgs)
|
|
|
|
|
|
|
|
if api.service.pfsEnabled {
|
|
|
|
// Attempt to decrypt message, otherwise leave unchanged
|
2019-02-19 12:58:42 +00:00
|
|
|
for _, dedupMessage := range dedupMessages {
|
2018-09-24 18:07:34 +00:00
|
|
|
|
2019-02-19 12:58:42 +00:00
|
|
|
if err := api.processPFSMessage(dedupMessage); err != nil {
|
2018-09-24 18:07:34 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return dedupMessages, nil
|
2018-05-02 12:14:08 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ConfirmMessagesProcessed is a method to confirm that messages was consumed by
|
|
|
|
// the client side.
|
2019-05-06 06:33:19 +00:00
|
|
|
func (api *PublicAPI) ConfirmMessagesProcessed(messages []*whisper.Message) (err error) {
|
|
|
|
tx := api.service.storage.NewTx()
|
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = tx.Commit()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
ctx := NewContextFromService(context.Background(), api.service, tx)
|
2019-04-30 06:46:12 +00:00
|
|
|
for _, msg := range messages {
|
|
|
|
if msg.P2P {
|
2019-05-06 06:33:19 +00:00
|
|
|
err = api.service.historyUpdates.UpdateTopicHistory(ctx, msg.Topic, time.Unix(int64(msg.Timestamp), 0))
|
2019-04-30 06:46:12 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2019-05-06 06:33:19 +00:00
|
|
|
err = api.service.deduplicator.AddMessages(messages)
|
|
|
|
return err
|
2018-04-20 11:26:54 +00:00
|
|
|
}
|
|
|
|
|
2019-02-19 12:58:42 +00:00
|
|
|
// ConfirmMessagesProcessedByID is a method to confirm that messages was consumed by
|
|
|
|
// the client side.
|
|
|
|
func (api *PublicAPI) ConfirmMessagesProcessedByID(messageIDs [][]byte) error {
|
|
|
|
if err := api.service.protocol.ConfirmMessagesProcessed(messageIDs); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return api.service.deduplicator.AddMessageByID(messageIDs)
|
|
|
|
}
|
|
|
|
|
2018-09-24 18:07:34 +00:00
|
|
|
// SendPublicMessage sends a public chat message to the underlying transport
|
|
|
|
func (api *PublicAPI) SendPublicMessage(ctx context.Context, msg chat.SendPublicMessageRPC) (hexutil.Bytes, error) {
|
|
|
|
privateKey, err := api.service.w.GetPrivateKey(msg.Sig)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This is transport layer agnostic
|
|
|
|
protocolMessage, err := api.service.protocol.BuildPublicMessage(privateKey, msg.Payload)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
symKeyID, err := api.service.w.AddSymKeyFromPassword(msg.Chat)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2019-05-17 11:06:56 +00:00
|
|
|
// marshal for sending to wire
|
|
|
|
marshaledMessage, err := proto.Marshal(protocolMessage)
|
|
|
|
if err != nil {
|
|
|
|
api.log.Error("encryption-service", "error marshaling message", err)
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-09-24 18:07:34 +00:00
|
|
|
// Enrich with transport layer info
|
2019-05-17 11:06:56 +00:00
|
|
|
whisperMessage := chat.PublicMessageToWhisper(msg, marshaledMessage)
|
2018-09-24 18:07:34 +00:00
|
|
|
whisperMessage.SymKeyID = symKeyID
|
|
|
|
|
|
|
|
// And dispatch
|
2018-10-16 10:31:05 +00:00
|
|
|
return api.Post(ctx, whisperMessage)
|
2018-09-24 18:07:34 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// SendDirectMessage sends a 1:1 chat message to the underlying transport
|
2019-02-12 11:07:13 +00:00
|
|
|
func (api *PublicAPI) SendDirectMessage(ctx context.Context, msg chat.SendDirectMessageRPC) (hexutil.Bytes, error) {
|
2018-09-24 18:07:34 +00:00
|
|
|
if !api.service.pfsEnabled {
|
|
|
|
return nil, ErrPFSNotEnabled
|
|
|
|
}
|
|
|
|
// To be completely agnostic from whisper we should not be using whisper to store the key
|
|
|
|
privateKey, err := api.service.w.GetPrivateKey(msg.Sig)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
publicKey, err := crypto.UnmarshalPubkey(msg.PubKey)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// This is transport layer-agnostic
|
2019-05-17 11:06:56 +00:00
|
|
|
var protocolMessage *chat.ProtocolMessage
|
|
|
|
// The negotiated secret
|
|
|
|
var topic []byte
|
2019-02-12 11:07:13 +00:00
|
|
|
|
2019-05-17 11:06:56 +00:00
|
|
|
api.log.Info("BUILDING MESSAGE")
|
2019-02-12 11:07:13 +00:00
|
|
|
if msg.DH {
|
2019-05-17 11:06:56 +00:00
|
|
|
protocolMessage, topic, err = api.service.protocol.BuildDHMessage(privateKey, &privateKey.PublicKey, msg.Payload)
|
2019-02-12 11:07:13 +00:00
|
|
|
} else {
|
2019-05-17 11:06:56 +00:00
|
|
|
protocolMessage, topic, err = api.service.protocol.BuildDirectMessage(privateKey, publicKey, msg.Payload)
|
|
|
|
}
|
|
|
|
|
|
|
|
api.log.Info("BUILT MESSAGE", "topic", topic)
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2019-02-12 11:07:13 +00:00
|
|
|
}
|
|
|
|
|
2019-05-17 11:06:56 +00:00
|
|
|
// marshal for sending to wire
|
|
|
|
marshaledMessage, err := proto.Marshal(protocolMessage)
|
2018-09-24 18:07:34 +00:00
|
|
|
if err != nil {
|
2019-05-17 11:06:56 +00:00
|
|
|
api.log.Error("encryption-service", "error marshaling message", err)
|
2018-09-24 18:07:34 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2019-05-17 11:06:56 +00:00
|
|
|
// TODO: Refactor this as it's not quite the right abstraction anymore
|
|
|
|
whisperMessage := chat.DirectMessageToWhisper(msg, marshaledMessage, topic)
|
2019-02-12 11:07:13 +00:00
|
|
|
// Enrich with transport layer info
|
2019-05-17 11:06:56 +00:00
|
|
|
if topic != nil {
|
|
|
|
api.log.Info("GETTING SYM KEY", "symkey", api.service.GetNegotiatedChat(publicKey))
|
|
|
|
|
|
|
|
chat := api.service.GetNegotiatedChat(publicKey)
|
|
|
|
|
|
|
|
if chat != nil {
|
|
|
|
whisperMessage.SymKeyID = chat.SymKeyID
|
|
|
|
whisperMessage.Topic = whisper.BytesToTopic(chat.Topic)
|
|
|
|
whisperMessage.PublicKey = nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
api.log.Info("WHISPER MESSAGE", "message", whisperMessage)
|
2018-09-24 18:07:34 +00:00
|
|
|
|
2019-02-12 11:07:13 +00:00
|
|
|
// And dispatch
|
|
|
|
return api.Post(ctx, whisperMessage)
|
2018-09-24 18:07:34 +00:00
|
|
|
}
|
|
|
|
|
2019-04-30 06:46:12 +00:00
|
|
|
func (api *PublicAPI) requestMessagesUsingPayload(request db.HistoryRequest, peer, symkeyID string, payload []byte, force bool, timeout time.Duration, topics []whisper.TopicType) (hash common.Hash, err error) {
|
|
|
|
shh := api.service.w
|
|
|
|
now := api.service.w.GetCurrentTime()
|
|
|
|
|
|
|
|
mailServerNode, err := api.getPeer(peer)
|
|
|
|
if err != nil {
|
|
|
|
return hash, fmt.Errorf("%v: %v", ErrInvalidMailServerPeer, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
symKey []byte
|
|
|
|
publicKey *ecdsa.PublicKey
|
|
|
|
)
|
|
|
|
|
|
|
|
if symkeyID != "" {
|
|
|
|
symKey, err = shh.GetSymKey(symkeyID)
|
|
|
|
if err != nil {
|
|
|
|
return hash, fmt.Errorf("%v: %v", ErrInvalidSymKeyID, err)
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
publicKey = mailServerNode.Pubkey()
|
|
|
|
}
|
|
|
|
|
|
|
|
envelope, err := makeEnvelop(
|
|
|
|
payload,
|
|
|
|
symKey,
|
|
|
|
publicKey,
|
|
|
|
api.service.nodeID,
|
|
|
|
shh.MinPow(),
|
|
|
|
now,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return hash, err
|
|
|
|
}
|
|
|
|
hash = envelope.Hash()
|
|
|
|
|
2019-05-01 10:33:18 +00:00
|
|
|
err = request.Replace(hash)
|
2019-04-30 06:46:12 +00:00
|
|
|
if err != nil {
|
|
|
|
return hash, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if !force {
|
|
|
|
err = api.service.requestsRegistry.Register(hash, topics)
|
|
|
|
if err != nil {
|
|
|
|
return hash, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := shh.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, timeout); err != nil {
|
|
|
|
if !force {
|
|
|
|
api.service.requestsRegistry.Unregister(hash)
|
|
|
|
}
|
|
|
|
return hash, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return hash, nil
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// InitiateHistoryRequests is a stateful API for initiating history request for each topic.
|
|
|
|
// Caller of this method needs to define only two parameters per each TopicRequest:
|
|
|
|
// - Topic
|
|
|
|
// - Duration in nanoseconds. Will be used to determine starting time for history request.
|
|
|
|
// After that status-go will guarantee that request for this topic and date will be performed.
|
2019-05-06 06:33:19 +00:00
|
|
|
func (api *PublicAPI) InitiateHistoryRequests(parent context.Context, request InitiateHistoryRequestParams) (rst []hexutil.Bytes, err error) {
|
|
|
|
tx := api.service.storage.NewTx()
|
|
|
|
defer func() {
|
|
|
|
if err == nil {
|
|
|
|
err = tx.Commit()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
ctx := NewContextFromService(parent, api.service, tx)
|
|
|
|
requests, err := api.service.historyUpdates.CreateRequests(ctx, request.Requests)
|
2019-04-30 06:46:12 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-05-06 06:33:19 +00:00
|
|
|
var (
|
|
|
|
payload []byte
|
|
|
|
hash common.Hash
|
|
|
|
)
|
2019-04-30 06:46:12 +00:00
|
|
|
for i := range requests {
|
|
|
|
req := requests[i]
|
|
|
|
options := CreateTopicOptionsFromRequest(req)
|
|
|
|
bloom := options.ToBloomFilterOption()
|
2019-05-06 06:33:19 +00:00
|
|
|
payload, err = bloom.ToMessagesRequestPayload()
|
2019-04-30 06:46:12 +00:00
|
|
|
if err != nil {
|
|
|
|
return rst, err
|
|
|
|
}
|
2019-05-06 06:33:19 +00:00
|
|
|
hash, err = api.requestMessagesUsingPayload(req, request.Peer, request.SymKeyID, payload, request.Force, request.Timeout, options.Topics())
|
2019-04-30 06:46:12 +00:00
|
|
|
if err != nil {
|
|
|
|
return rst, err
|
|
|
|
}
|
2019-05-06 06:33:19 +00:00
|
|
|
rst = append(rst, hash.Bytes())
|
2019-04-30 06:46:12 +00:00
|
|
|
}
|
2019-05-06 06:33:19 +00:00
|
|
|
return rst, err
|
2019-04-30 06:46:12 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// CompleteRequest client must mark request completed when all envelopes were processed.
|
2019-05-06 06:33:19 +00:00
|
|
|
func (api *PublicAPI) CompleteRequest(parent context.Context, hex string) (err error) {
|
|
|
|
tx := api.service.storage.NewTx()
|
|
|
|
ctx := NewContextFromService(parent, api.service, tx)
|
|
|
|
err = api.service.historyUpdates.UpdateFinishedRequest(ctx, common.HexToHash(hex))
|
|
|
|
if err == nil {
|
|
|
|
return tx.Commit()
|
|
|
|
}
|
|
|
|
return err
|
2019-04-30 06:46:12 +00:00
|
|
|
}
|
|
|
|
|
2019-02-19 12:58:42 +00:00
|
|
|
func (api *PublicAPI) processPFSMessage(dedupMessage dedup.DeduplicateMessage) error {
|
|
|
|
msg := dedupMessage.Message
|
2018-09-24 18:07:34 +00:00
|
|
|
|
2018-12-21 10:07:25 +00:00
|
|
|
privateKeyID := api.service.w.SelectedKeyPairID()
|
|
|
|
if privateKeyID == "" {
|
|
|
|
return errors.New("no key selected")
|
|
|
|
}
|
2018-09-24 18:07:34 +00:00
|
|
|
|
2018-12-21 10:07:25 +00:00
|
|
|
privateKey, err := api.service.w.GetPrivateKey(privateKeyID)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2018-09-24 18:07:34 +00:00
|
|
|
}
|
|
|
|
|
2018-12-24 07:18:27 +00:00
|
|
|
publicKey, err := crypto.UnmarshalPubkey(msg.Sig)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-12-21 10:07:25 +00:00
|
|
|
|
2019-05-17 11:06:56 +00:00
|
|
|
// Unmarshal message
|
|
|
|
protocolMessage := &chat.ProtocolMessage{}
|
|
|
|
|
|
|
|
if err := proto.Unmarshal(msg.Payload, protocolMessage); err != nil {
|
|
|
|
api.log.Debug("Not a protocol message", "err", err)
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
response, err := api.service.protocol.HandleMessage(privateKey, publicKey, protocolMessage, dedupMessage.DedupID)
|
2018-09-24 18:07:34 +00:00
|
|
|
|
2019-02-12 11:07:13 +00:00
|
|
|
switch err {
|
|
|
|
case nil:
|
|
|
|
// Set the decrypted payload
|
|
|
|
msg.Payload = response
|
|
|
|
case chat.ErrDeviceNotFound:
|
|
|
|
// Notify that someone tried to contact us using an invalid bundle
|
|
|
|
if privateKey.PublicKey != *publicKey {
|
|
|
|
api.log.Warn("Device not found, sending signal", "err", err)
|
|
|
|
keyString := fmt.Sprintf("0x%x", crypto.FromECDSAPub(publicKey))
|
|
|
|
handler := EnvelopeSignalHandler{}
|
|
|
|
handler.DecryptMessageFailed(keyString)
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
// Log and pass to the client, even if failed to decrypt
|
2018-10-16 10:31:05 +00:00
|
|
|
api.log.Error("Failed handling message with error", "err", err)
|
2019-02-12 11:07:13 +00:00
|
|
|
}
|
2018-09-24 18:07:34 +00:00
|
|
|
|
2018-10-16 10:31:05 +00:00
|
|
|
return nil
|
2018-09-24 18:07:34 +00:00
|
|
|
}
|
|
|
|
|
2018-04-26 05:56:19 +00:00
|
|
|
// -----
|
|
|
|
// HELPER
|
|
|
|
// -----
|
|
|
|
|
|
|
|
// makeEnvelop makes an envelop for a historic messages request.
|
|
|
|
// Symmetric key is used to authenticate to MailServer.
|
|
|
|
// PK is the current node ID.
|
2018-07-04 09:30:57 +00:00
|
|
|
func makeEnvelop(
|
|
|
|
payload []byte,
|
|
|
|
symKey []byte,
|
|
|
|
publicKey *ecdsa.PublicKey,
|
|
|
|
nodeID *ecdsa.PrivateKey,
|
|
|
|
pow float64,
|
|
|
|
now time.Time,
|
|
|
|
) (*whisper.Envelope, error) {
|
2018-04-26 05:56:19 +00:00
|
|
|
params := whisper.MessageParams{
|
|
|
|
PoW: pow,
|
|
|
|
Payload: payload,
|
|
|
|
WorkTime: defaultWorkTime,
|
|
|
|
Src: nodeID,
|
|
|
|
}
|
2018-07-04 09:30:57 +00:00
|
|
|
// Either symKey or public key is required.
|
|
|
|
// This condition is verified in `message.Wrap()` method.
|
|
|
|
if len(symKey) > 0 {
|
|
|
|
params.KeySym = symKey
|
|
|
|
} else if publicKey != nil {
|
|
|
|
params.Dst = publicKey
|
|
|
|
}
|
2018-04-26 05:56:19 +00:00
|
|
|
message, err := whisper.NewSentMessage(¶ms)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-05-04 08:23:38 +00:00
|
|
|
return message.Wrap(¶ms, now)
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
|
|
|
|
2018-12-11 10:23:47 +00:00
|
|
|
// makeMessagesRequestPayload makes a specific payload for MailServer
|
|
|
|
// to request historic messages.
|
|
|
|
func makeMessagesRequestPayload(r MessagesRequest) ([]byte, error) {
|
2018-10-19 09:09:13 +00:00
|
|
|
cursor, err := hex.DecodeString(r.Cursor)
|
2018-12-11 10:23:47 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("invalid cursor: %v", err)
|
|
|
|
}
|
2019-05-09 10:58:02 +00:00
|
|
|
|
|
|
|
if len(cursor) > 0 && len(cursor) != mailserver.CursorLength {
|
|
|
|
return nil, fmt.Errorf("invalid cursor size: expected %d but got %d", mailserver.CursorLength, len(cursor))
|
2018-10-19 09:09:13 +00:00
|
|
|
}
|
|
|
|
|
2018-12-11 10:23:47 +00:00
|
|
|
payload := mailserver.MessagesRequestPayload{
|
2018-10-19 09:09:13 +00:00
|
|
|
Lower: r.From,
|
|
|
|
Upper: r.To,
|
|
|
|
Bloom: createBloomFilter(r),
|
|
|
|
Limit: r.Limit,
|
|
|
|
Cursor: cursor,
|
|
|
|
// Client must tell the MailServer if it supports batch responses.
|
|
|
|
// This can be removed in the future.
|
|
|
|
Batch: true,
|
2018-07-02 07:38:10 +00:00
|
|
|
}
|
|
|
|
|
2018-10-19 09:09:13 +00:00
|
|
|
return rlp.EncodeToBytes(payload)
|
2018-04-26 05:56:19 +00:00
|
|
|
}
|
2018-10-15 21:15:04 +00:00
|
|
|
|
|
|
|
func createBloomFilter(r MessagesRequest) []byte {
|
|
|
|
if len(r.Topics) > 0 {
|
|
|
|
return topicsToBloom(r.Topics...)
|
|
|
|
}
|
|
|
|
|
|
|
|
return whisper.TopicToBloom(r.Topic)
|
|
|
|
}
|
|
|
|
|
|
|
|
func topicsToBloom(topics ...whisper.TopicType) []byte {
|
|
|
|
i := new(big.Int)
|
|
|
|
for _, topic := range topics {
|
|
|
|
bloom := whisper.TopicToBloom(topic)
|
|
|
|
i.Or(i, new(big.Int).SetBytes(bloom[:]))
|
|
|
|
}
|
|
|
|
|
|
|
|
combined := make([]byte, whisper.BloomFilterSize)
|
|
|
|
data := i.Bytes()
|
|
|
|
copy(combined[whisper.BloomFilterSize-len(data):], data[:])
|
|
|
|
|
|
|
|
return combined
|
|
|
|
}
|