remove mailserver logic
This commit is contained in:
parent
cf6ef3171a
commit
be01875d1d
|
@ -1074,17 +1074,6 @@ func (b *GethStatusBackend) Logout() error {
|
||||||
|
|
||||||
// cleanupServices stops parts of services that doesn't managed by a node and removes injected data from services.
|
// cleanupServices stops parts of services that doesn't managed by a node and removes injected data from services.
|
||||||
func (b *GethStatusBackend) cleanupServices() error {
|
func (b *GethStatusBackend) cleanupServices() error {
|
||||||
whisperService, err := b.statusNode.WhisperService()
|
|
||||||
switch err {
|
|
||||||
case node.ErrServiceUnknown: // Whisper was never registered
|
|
||||||
case nil:
|
|
||||||
if err := whisperService.DeleteKeyPairs(); err != nil {
|
|
||||||
return fmt.Errorf("%s: %v", ErrWhisperClearIdentitiesFailure, err)
|
|
||||||
}
|
|
||||||
b.selectedAccountKeyID = ""
|
|
||||||
default:
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
wakuService, err := b.statusNode.WakuService()
|
wakuService, err := b.statusNode.WakuService()
|
||||||
switch err {
|
switch err {
|
||||||
case node.ErrServiceUnknown: // Waku was never registered
|
case node.ErrServiceUnknown: // Waku was never registered
|
||||||
|
@ -1170,39 +1159,12 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
identity := chatAccount.AccountKey.PrivateKey
|
identity := chatAccount.AccountKey.PrivateKey
|
||||||
whisperService, err := b.statusNode.WhisperService()
|
|
||||||
|
|
||||||
switch err {
|
|
||||||
case node.ErrServiceUnknown: // Whisper was never registered
|
|
||||||
case nil:
|
|
||||||
if err := whisperService.DeleteKeyPairs(); err != nil { // err is not possible; method return value is incorrect
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
b.selectedAccountKeyID, err = whisperService.AddKeyPair(identity)
|
|
||||||
if err != nil {
|
|
||||||
return ErrWhisperIdentityInjectionFailure
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
acc, err := b.GetActiveAccount()
|
acc, err := b.GetActiveAccount()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if whisperService != nil {
|
|
||||||
st, err := b.statusNode.ShhExtService()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := st.InitProtocol(identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
wakuService, err := b.statusNode.WakuService()
|
wakuService, err := b.statusNode.WakuService()
|
||||||
|
|
||||||
switch err {
|
switch err {
|
||||||
|
|
|
@ -167,6 +167,7 @@ func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesReq
|
||||||
Limit: r.Limit,
|
Limit: r.Limit,
|
||||||
Cursor: r.Cursor,
|
Cursor: r.Cursor,
|
||||||
Bloom: r.Bloom,
|
Bloom: r.Bloom,
|
||||||
|
Topics: r.Topics,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,6 +26,10 @@ type MessagesRequest struct {
|
||||||
Cursor []byte `json:"cursor"`
|
Cursor []byte `json:"cursor"`
|
||||||
// Bloom is a filter to match requested messages.
|
// Bloom is a filter to match requested messages.
|
||||||
Bloom []byte `json:"bloom"`
|
Bloom []byte `json:"bloom"`
|
||||||
|
|
||||||
|
// Topics is a list of topics. A returned message should
|
||||||
|
// belong to one of the topics from the list.
|
||||||
|
Topics [][]byte `json:"topics"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetDefaults sets the From and To defaults
|
// SetDefaults sets the From and To defaults
|
||||||
|
|
|
@ -36,7 +36,6 @@ import (
|
||||||
localnotifications "github.com/status-im/status-go/services/local-notifications"
|
localnotifications "github.com/status-im/status-go/services/local-notifications"
|
||||||
"github.com/status-im/status-go/services/peer"
|
"github.com/status-im/status-go/services/peer"
|
||||||
"github.com/status-im/status-go/services/permissions"
|
"github.com/status-im/status-go/services/permissions"
|
||||||
"github.com/status-im/status-go/services/shhext"
|
|
||||||
"github.com/status-im/status-go/services/status"
|
"github.com/status-im/status-go/services/status"
|
||||||
"github.com/status-im/status-go/services/wakuext"
|
"github.com/status-im/status-go/services/wakuext"
|
||||||
"github.com/status-im/status-go/services/wallet"
|
"github.com/status-im/status-go/services/wallet"
|
||||||
|
@ -635,19 +634,6 @@ func (n *StatusNode) WakuService() (w *waku.Waku, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// ShhExtService exposes reference to shh extension service running on top of the node
|
|
||||||
func (n *StatusNode) ShhExtService() (s *shhext.Service, err error) {
|
|
||||||
n.mu.RLock()
|
|
||||||
defer n.mu.RUnlock()
|
|
||||||
|
|
||||||
err = n.gethService(&s)
|
|
||||||
if err == node.ErrServiceUnknown {
|
|
||||||
err = ErrServiceUnknown
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// WakuExtService exposes reference to shh extension service running on top of the node
|
// WakuExtService exposes reference to shh extension service running on top of the node
|
||||||
func (n *StatusNode) WakuExtService() (s *wakuext.Service, err error) {
|
func (n *StatusNode) WakuExtService() (s *wakuext.Service, err error) {
|
||||||
n.mu.RLock()
|
n.mu.RLock()
|
||||||
|
|
|
@ -33,7 +33,6 @@ import (
|
||||||
"github.com/status-im/status-go/services/nodebridge"
|
"github.com/status-im/status-go/services/nodebridge"
|
||||||
"github.com/status-im/status-go/services/peer"
|
"github.com/status-im/status-go/services/peer"
|
||||||
"github.com/status-im/status-go/services/personal"
|
"github.com/status-im/status-go/services/personal"
|
||||||
"github.com/status-im/status-go/services/shhext"
|
|
||||||
"github.com/status-im/status-go/services/status"
|
"github.com/status-im/status-go/services/status"
|
||||||
"github.com/status-im/status-go/services/wakuext"
|
"github.com/status-im/status-go/services/wakuext"
|
||||||
"github.com/status-im/status-go/static"
|
"github.com/status-im/status-go/static"
|
||||||
|
@ -139,11 +138,6 @@ func activateNodeServices(stack *node.Node, config *params.NodeConfig, db *level
|
||||||
return fmt.Errorf("failed to register NodeBridge: %v", err)
|
return fmt.Errorf("failed to register NodeBridge: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// start Whisper service.
|
|
||||||
if err := activateShhService(stack, config, db); err != nil {
|
|
||||||
return fmt.Errorf("%v: %v", ErrWhisperServiceRegistrationFailure, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// start Waku service
|
// start Waku service
|
||||||
if err := activateWakuService(stack, config, db); err != nil {
|
if err := activateWakuService(stack, config, db); err != nil {
|
||||||
return fmt.Errorf("%v: %v", ErrWakuServiceRegistrationFailure, err)
|
return fmt.Errorf("%v: %v", ErrWakuServiceRegistrationFailure, err)
|
||||||
|
@ -324,46 +318,6 @@ func registerWakuMailServer(wakuService *waku.Waku, config *params.WakuConfig) (
|
||||||
return mailServer.Init(wakuService, config)
|
return mailServer.Init(wakuService, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
// activateShhService configures Whisper and adds it to the given node.
|
|
||||||
func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb.DB) (err error) {
|
|
||||||
if !config.WhisperConfig.Enabled {
|
|
||||||
logger.Info("SHH protocol is disabled")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
|
|
||||||
return createShhService(ctx, &config.WhisperConfig, &config.ClusterConfig)
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register Whisper eth-node bridge
|
|
||||||
err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
|
|
||||||
var ethnode *nodebridge.NodeService
|
|
||||||
if err := ctx.Service(ðnode); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
w, err := ethnode.Node.GetWhisper(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return &nodebridge.WhisperService{Whisper: w}, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(dshulyak) add a config option to enable it by default, but disable if app is started from statusd
|
|
||||||
return stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
|
|
||||||
var ethnode *nodebridge.NodeService
|
|
||||||
if err := ctx.Service(ðnode); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return shhext.New(config.ShhextConfig, ethnode.Node, ctx, ext.EnvelopeSignalHandler{}, db), nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// activateWakuService configures Waku and adds it to the given node.
|
// activateWakuService configures Waku and adds it to the given node.
|
||||||
func activateWakuService(stack *node.Node, config *params.NodeConfig, db *leveldb.DB) (err error) {
|
func activateWakuService(stack *node.Node, config *params.NodeConfig, db *leveldb.DB) (err error) {
|
||||||
if !config.WakuConfig.Enabled {
|
if !config.WakuConfig.Enabled {
|
||||||
|
|
|
@ -415,7 +415,6 @@ func (t *Transport) SendMessagesRequestForTopics(
|
||||||
) (cursor []byte, err error) {
|
) (cursor []byte, err error) {
|
||||||
|
|
||||||
r := createMessagesRequest(from, to, previousCursor, topics)
|
r := createMessagesRequest(from, to, previousCursor, topics)
|
||||||
r.SetDefaults(t.waku.GetCurrentTime())
|
|
||||||
|
|
||||||
events := make(chan types.EnvelopeEvent, 10)
|
events := make(chan types.EnvelopeEvent, 10)
|
||||||
sub := t.waku.SubscribeEnvelopeEvents(events)
|
sub := t.waku.SubscribeEnvelopeEvents(events)
|
||||||
|
@ -475,13 +474,17 @@ func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicT
|
||||||
aUUID := uuid.New()
|
aUUID := uuid.New()
|
||||||
// uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest
|
// uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest
|
||||||
id := []byte(hex.EncodeToString(aUUID[:]))
|
id := []byte(hex.EncodeToString(aUUID[:]))
|
||||||
|
var topicBytes [][]byte
|
||||||
|
for _, t := range topics {
|
||||||
|
topicBytes = append(topicBytes, t[:])
|
||||||
|
}
|
||||||
return types.MessagesRequest{
|
return types.MessagesRequest{
|
||||||
ID: id,
|
ID: id,
|
||||||
From: from,
|
From: from,
|
||||||
To: to,
|
To: to,
|
||||||
Limit: 100,
|
Limit: 1000,
|
||||||
Cursor: cursor,
|
Cursor: cursor,
|
||||||
Bloom: topicsToBloom(topics...),
|
Topics: topicBytes,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -62,13 +62,8 @@ type Service struct {
|
||||||
n types.Node
|
n types.Node
|
||||||
config params.ShhextConfig
|
config params.ShhextConfig
|
||||||
mailMonitor *MailRequestMonitor
|
mailMonitor *MailRequestMonitor
|
||||||
requestsRegistry *RequestsRegistry
|
|
||||||
server *p2p.Server
|
server *p2p.Server
|
||||||
eventSub mailservers.EnvelopeEventSubscriber
|
|
||||||
peerStore *mailservers.PeerStore
|
peerStore *mailservers.PeerStore
|
||||||
cache *mailservers.Cache
|
|
||||||
connManager *mailservers.ConnectionManager
|
|
||||||
lastUsedMonitor *mailservers.LastUsedConnectionMonitor
|
|
||||||
accountsDB *accounts.Database
|
accountsDB *accounts.Database
|
||||||
multiAccountsDB *multiaccounts.Database
|
multiAccountsDB *multiaccounts.Database
|
||||||
account *multiaccounts.Account
|
account *multiaccounts.Account
|
||||||
|
@ -82,7 +77,6 @@ func New(
|
||||||
n types.Node,
|
n types.Node,
|
||||||
ldb *leveldb.DB,
|
ldb *leveldb.DB,
|
||||||
mailMonitor *MailRequestMonitor,
|
mailMonitor *MailRequestMonitor,
|
||||||
reqRegistry *RequestsRegistry,
|
|
||||||
eventSub mailservers.EnvelopeEventSubscriber,
|
eventSub mailservers.EnvelopeEventSubscriber,
|
||||||
) *Service {
|
) *Service {
|
||||||
cache := mailservers.NewCache(ldb)
|
cache := mailservers.NewCache(ldb)
|
||||||
|
@ -92,10 +86,7 @@ func New(
|
||||||
n: n,
|
n: n,
|
||||||
config: config,
|
config: config,
|
||||||
mailMonitor: mailMonitor,
|
mailMonitor: mailMonitor,
|
||||||
requestsRegistry: reqRegistry,
|
|
||||||
peerStore: peerStore,
|
peerStore: peerStore,
|
||||||
cache: mailservers.NewCache(ldb),
|
|
||||||
eventSub: eventSub,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,10 +97,6 @@ func (s *Service) NodeID() *ecdsa.PrivateKey {
|
||||||
return s.server.PrivateKey
|
return s.server.PrivateKey
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) RequestsRegistry() *RequestsRegistry {
|
|
||||||
return s.requestsRegistry
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) GetPeer(rawURL string) (*enode.Node, error) {
|
func (s *Service) GetPeer(rawURL string) (*enode.Node, error) {
|
||||||
if len(rawURL) == 0 {
|
if len(rawURL) == 0 {
|
||||||
return mailservers.GetFirstConnected(s.server, s.peerStore)
|
return mailservers.GetFirstConnected(s.server, s.peerStore)
|
||||||
|
@ -330,12 +317,12 @@ func (s *Service) UpdateMailservers(nodes []*enode.Node) error {
|
||||||
log.Info("Setting messenger")
|
log.Info("Setting messenger")
|
||||||
s.messenger.SetMailserver(nodes[0].ID().Bytes())
|
s.messenger.SetMailserver(nodes[0].ID().Bytes())
|
||||||
}
|
}
|
||||||
|
for _, peer := range nodes {
|
||||||
|
s.server.AddPeer(peer)
|
||||||
|
}
|
||||||
if err := s.peerStore.Update(nodes); err != nil {
|
if err := s.peerStore.Update(nodes); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if s.connManager != nil {
|
|
||||||
s.connManager.Notify(nodes)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,27 +339,6 @@ func (s *Service) APIs() []rpc.API {
|
||||||
// Start is run when a service is started.
|
// Start is run when a service is started.
|
||||||
// It does nothing in this case but is required by `node.Service` interface.
|
// It does nothing in this case but is required by `node.Service` interface.
|
||||||
func (s *Service) Start(server *p2p.Server) error {
|
func (s *Service) Start(server *p2p.Server) error {
|
||||||
if s.config.EnableConnectionManager {
|
|
||||||
connectionsTarget := s.config.ConnectionTarget
|
|
||||||
if connectionsTarget == 0 {
|
|
||||||
connectionsTarget = defaultConnectionsTarget
|
|
||||||
}
|
|
||||||
maxFailures := s.config.MaxServerFailures
|
|
||||||
// if not defined change server on first expired event
|
|
||||||
if maxFailures == 0 {
|
|
||||||
maxFailures = 1
|
|
||||||
}
|
|
||||||
s.connManager = mailservers.NewConnectionManager(server, s.eventSub, connectionsTarget, maxFailures, defaultTimeoutWaitAdded)
|
|
||||||
s.connManager.Start()
|
|
||||||
if err := mailservers.EnsureUsedRecordsAddedFirst(s.peerStore, s.connManager); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if s.config.EnableLastUsedMonitor {
|
|
||||||
s.lastUsedMonitor = mailservers.NewLastUsedConnectionMonitor(s.peerStore, s.cache, s.eventSub)
|
|
||||||
s.lastUsedMonitor.Start()
|
|
||||||
}
|
|
||||||
s.mailMonitor.Start()
|
|
||||||
s.server = server
|
s.server = server
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -380,15 +346,6 @@ func (s *Service) Start(server *p2p.Server) error {
|
||||||
// Stop is run when a service is stopped.
|
// Stop is run when a service is stopped.
|
||||||
func (s *Service) Stop() error {
|
func (s *Service) Stop() error {
|
||||||
log.Info("Stopping shhext service")
|
log.Info("Stopping shhext service")
|
||||||
if s.config.EnableConnectionManager {
|
|
||||||
s.connManager.Stop()
|
|
||||||
}
|
|
||||||
if s.config.EnableLastUsedMonitor {
|
|
||||||
s.lastUsedMonitor.Stop()
|
|
||||||
}
|
|
||||||
s.requestsRegistry.Clear()
|
|
||||||
s.mailMonitor.Stop()
|
|
||||||
|
|
||||||
if s.cancelMessenger != nil {
|
if s.cancelMessenger != nil {
|
||||||
select {
|
select {
|
||||||
case <-s.cancelMessenger:
|
case <-s.cancelMessenger:
|
||||||
|
|
|
@ -1,286 +0,0 @@
|
||||||
// +build !nimbus
|
|
||||||
|
|
||||||
package shhext
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
||||||
|
|
||||||
gethbridge "github.com/status-im/status-go/eth-node/bridge/geth"
|
|
||||||
"github.com/status-im/status-go/eth-node/types"
|
|
||||||
"github.com/status-im/status-go/services/ext"
|
|
||||||
"github.com/status-im/status-go/whisper"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
// defaultWorkTime is a work time reported in messages sent to MailServer nodes.
|
|
||||||
defaultWorkTime = 5
|
|
||||||
)
|
|
||||||
|
|
||||||
// PublicAPI extends whisper public API.
|
|
||||||
type PublicAPI struct {
|
|
||||||
*ext.PublicAPI
|
|
||||||
|
|
||||||
service *Service
|
|
||||||
publicAPI types.PublicWhisperAPI
|
|
||||||
log log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewPublicAPI returns instance of the public API.
|
|
||||||
func NewPublicAPI(s *Service) *PublicAPI {
|
|
||||||
return &PublicAPI{
|
|
||||||
PublicAPI: ext.NewPublicAPI(s.Service, s.w),
|
|
||||||
service: s,
|
|
||||||
publicAPI: s.w.PublicWhisperAPI(),
|
|
||||||
log: log.New("package", "status-go/services/sshext.PublicAPI"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// makeEnvelop makes an envelop for a historic messages request.
|
|
||||||
// Symmetric key is used to authenticate to MailServer.
|
|
||||||
// PK is the current node ID.
|
|
||||||
// DEPRECATED
|
|
||||||
func makeEnvelop(
|
|
||||||
payload []byte,
|
|
||||||
symKey []byte,
|
|
||||||
publicKey *ecdsa.PublicKey,
|
|
||||||
nodeID *ecdsa.PrivateKey,
|
|
||||||
pow float64,
|
|
||||||
now time.Time,
|
|
||||||
) (types.Envelope, error) {
|
|
||||||
// TODO: replace with an types.Envelope creator passed to the API struct
|
|
||||||
params := whisper.MessageParams{
|
|
||||||
PoW: pow,
|
|
||||||
Payload: payload,
|
|
||||||
WorkTime: defaultWorkTime,
|
|
||||||
Src: nodeID,
|
|
||||||
}
|
|
||||||
// Either symKey or public key is required.
|
|
||||||
// This condition is verified in `message.Wrap()` method.
|
|
||||||
if len(symKey) > 0 {
|
|
||||||
params.KeySym = symKey
|
|
||||||
} else if publicKey != nil {
|
|
||||||
params.Dst = publicKey
|
|
||||||
}
|
|
||||||
message, err := whisper.NewSentMessage(¶ms)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
envelope, err := message.Wrap(¶ms, now)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return gethbridge.NewWhisperEnvelope(envelope), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestMessages sends a request for historic messages to a MailServer.
|
|
||||||
func (api *PublicAPI) RequestMessages(_ context.Context, r ext.MessagesRequest) (types.HexBytes, error) {
|
|
||||||
api.log.Info("RequestMessages", "request", r)
|
|
||||||
|
|
||||||
now := api.service.w.GetCurrentTime()
|
|
||||||
r.SetDefaults(now)
|
|
||||||
|
|
||||||
if r.From > r.To {
|
|
||||||
return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To)
|
|
||||||
}
|
|
||||||
|
|
||||||
mailServerNode, err := api.service.GetPeer(r.MailServerPeer)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("%v: %v", ext.ErrInvalidMailServerPeer, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
symKey []byte
|
|
||||||
publicKey *ecdsa.PublicKey
|
|
||||||
)
|
|
||||||
|
|
||||||
if r.SymKeyID != "" {
|
|
||||||
symKey, err = api.service.w.GetSymKey(r.SymKeyID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("%v: %v", ext.ErrInvalidSymKeyID, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
publicKey = mailServerNode.Pubkey()
|
|
||||||
}
|
|
||||||
|
|
||||||
payload, err := ext.MakeMessagesRequestPayload(r)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
envelope, err := makeEnvelop(
|
|
||||||
payload,
|
|
||||||
symKey,
|
|
||||||
publicKey,
|
|
||||||
api.service.NodeID(),
|
|
||||||
api.service.w.MinPow(),
|
|
||||||
now,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
hash := envelope.Hash()
|
|
||||||
|
|
||||||
if !r.Force {
|
|
||||||
err = api.service.RequestsRegistry().Register(hash, r.Topics)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := api.service.w.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil {
|
|
||||||
if !r.Force {
|
|
||||||
api.service.RequestsRegistry().Unregister(hash)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return hash[:], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestMessagesSync repeats MessagesRequest using configuration in retry conf.
|
|
||||||
func (api *PublicAPI) RequestMessagesSync(conf ext.RetryConfig, r ext.MessagesRequest) (ext.MessagesResponse, error) {
|
|
||||||
var resp ext.MessagesResponse
|
|
||||||
|
|
||||||
events := make(chan types.EnvelopeEvent, 10)
|
|
||||||
var (
|
|
||||||
requestID types.HexBytes
|
|
||||||
err error
|
|
||||||
retries int
|
|
||||||
)
|
|
||||||
for retries <= conf.MaxRetries {
|
|
||||||
sub := api.service.w.SubscribeEnvelopeEvents(events)
|
|
||||||
r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries)
|
|
||||||
timeout := r.Timeout
|
|
||||||
// FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration
|
|
||||||
r.Timeout = time.Duration(int(r.Timeout.Seconds()))
|
|
||||||
requestID, err = api.RequestMessages(context.Background(), r)
|
|
||||||
if err != nil {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
mailServerResp, err := ext.WaitForExpiredOrCompleted(types.BytesToHash(requestID), events, timeout)
|
|
||||||
sub.Unsubscribe()
|
|
||||||
if err == nil {
|
|
||||||
resp.Cursor = hex.EncodeToString(mailServerResp.Cursor)
|
|
||||||
resp.Error = mailServerResp.Error
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
retries++
|
|
||||||
api.log.Error("[RequestMessagesSync] failed", "err", err, "retries", retries)
|
|
||||||
}
|
|
||||||
return resp, fmt.Errorf("failed to request messages after %d retries", retries)
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncMessagesRequest is a SyncMessages() request payload.
|
|
||||||
type SyncMessagesRequest struct {
|
|
||||||
// MailServerPeer is MailServer's enode address.
|
|
||||||
MailServerPeer string `json:"mailServerPeer"`
|
|
||||||
|
|
||||||
// From is a lower bound of time range (optional).
|
|
||||||
// Default is 24 hours back from now.
|
|
||||||
From uint32 `json:"from"`
|
|
||||||
|
|
||||||
// To is a upper bound of time range (optional).
|
|
||||||
// Default is now.
|
|
||||||
To uint32 `json:"to"`
|
|
||||||
|
|
||||||
// Limit determines the number of messages sent by the mail server
|
|
||||||
// for the current paginated request
|
|
||||||
Limit uint32 `json:"limit"`
|
|
||||||
|
|
||||||
// Cursor is used as starting point for paginated requests
|
|
||||||
Cursor string `json:"cursor"`
|
|
||||||
|
|
||||||
// FollowCursor if true loads messages until cursor is empty.
|
|
||||||
FollowCursor bool `json:"followCursor"`
|
|
||||||
|
|
||||||
// Topics is a list of Whisper topics.
|
|
||||||
// If empty, a full bloom filter will be used.
|
|
||||||
Topics []types.TopicType `json:"topics"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncMessagesResponse is a response from the mail server
|
|
||||||
// to which SyncMessagesRequest was sent.
|
|
||||||
type SyncMessagesResponse struct {
|
|
||||||
// Cursor from the response can be used to retrieve more messages
|
|
||||||
// for the previous request.
|
|
||||||
Cursor string `json:"cursor"`
|
|
||||||
|
|
||||||
// Error indicates that something wrong happened when sending messages
|
|
||||||
// to the requester.
|
|
||||||
Error string `json:"error"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// createSyncMailRequest creates SyncMailRequest. It uses a full bloom filter
|
|
||||||
// if no topics are given.
|
|
||||||
func createSyncMailRequest(r SyncMessagesRequest) (types.SyncMailRequest, error) {
|
|
||||||
var bloom []byte
|
|
||||||
if len(r.Topics) > 0 {
|
|
||||||
bloom = ext.TopicsToBloom(r.Topics...)
|
|
||||||
} else {
|
|
||||||
bloom = types.MakeFullNodeBloom()
|
|
||||||
}
|
|
||||||
|
|
||||||
cursor, err := hex.DecodeString(r.Cursor)
|
|
||||||
if err != nil {
|
|
||||||
return types.SyncMailRequest{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return types.SyncMailRequest{
|
|
||||||
Lower: r.From,
|
|
||||||
Upper: r.To,
|
|
||||||
Bloom: bloom,
|
|
||||||
Limit: r.Limit,
|
|
||||||
Cursor: cursor,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func createSyncMessagesResponse(r types.SyncEventResponse) SyncMessagesResponse {
|
|
||||||
return SyncMessagesResponse{
|
|
||||||
Cursor: hex.EncodeToString(r.Cursor),
|
|
||||||
Error: r.Error,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// SyncMessages sends a request to a given MailServerPeer to sync historic messages.
|
|
||||||
// MailServerPeers needs to be added as a trusted peer first.
|
|
||||||
func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) {
|
|
||||||
log.Info("SyncMessages start", "request", r)
|
|
||||||
|
|
||||||
var response SyncMessagesResponse
|
|
||||||
|
|
||||||
mailServerEnode, err := enode.ParseV4(r.MailServerPeer)
|
|
||||||
if err != nil {
|
|
||||||
return response, fmt.Errorf("invalid MailServerPeer: %v", err)
|
|
||||||
}
|
|
||||||
mailServerID := mailServerEnode.ID().Bytes()
|
|
||||||
|
|
||||||
request, err := createSyncMailRequest(r)
|
|
||||||
if err != nil {
|
|
||||||
return response, fmt.Errorf("failed to create a sync mail request: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
log.Info("Sending a request to sync messages", "request", request)
|
|
||||||
|
|
||||||
resp, err := api.service.SyncMessages(ctx, mailServerID, request)
|
|
||||||
if err != nil {
|
|
||||||
return response, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("Syncing messages response", "error", resp.Error, "cursor", fmt.Sprintf("%#x", resp.Cursor))
|
|
||||||
|
|
||||||
if resp.Error != "" || len(resp.Cursor) == 0 || !r.FollowCursor {
|
|
||||||
return createSyncMessagesResponse(resp), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
request.Cursor = resp.Cursor
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,100 +0,0 @@
|
||||||
// +build !nimbus
|
|
||||||
|
|
||||||
package shhext
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
|
||||||
|
|
||||||
"github.com/status-im/status-go/eth-node/types"
|
|
||||||
"github.com/status-im/status-go/params"
|
|
||||||
"github.com/status-im/status-go/services/ext"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Service struct {
|
|
||||||
*ext.Service
|
|
||||||
w types.Whisper
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(config params.ShhextConfig, n types.Node, ctx interface{}, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service {
|
|
||||||
w, err := n.GetWhisper(ctx)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
delay := ext.DefaultRequestsDelay
|
|
||||||
if config.RequestsDelay != 0 {
|
|
||||||
delay = config.RequestsDelay
|
|
||||||
}
|
|
||||||
requestsRegistry := ext.NewRequestsRegistry(delay)
|
|
||||||
mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry)
|
|
||||||
return &Service{
|
|
||||||
Service: ext.New(config, n, ldb, mailMonitor, requestsRegistry, w),
|
|
||||||
w: w,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) PublicWhisperAPI() types.PublicWhisperAPI {
|
|
||||||
return s.w.PublicWhisperAPI()
|
|
||||||
}
|
|
||||||
|
|
||||||
// APIs returns a list of new APIs.
|
|
||||||
func (s *Service) APIs() []rpc.API {
|
|
||||||
apis := []rpc.API{
|
|
||||||
{
|
|
||||||
Namespace: "shhext",
|
|
||||||
Version: "1.0",
|
|
||||||
Service: NewPublicAPI(s),
|
|
||||||
Public: false,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
return apis
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) SyncMessages(ctx context.Context, mailServerID []byte, r types.SyncMailRequest) (resp types.SyncEventResponse, err error) {
|
|
||||||
err = s.w.SyncMessages(mailServerID, r)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the response which is received asynchronously as a p2p packet.
|
|
||||||
// This packet handler will send an event which contains the response payload.
|
|
||||||
events := make(chan types.EnvelopeEvent, 1024)
|
|
||||||
sub := s.w.SubscribeEnvelopeEvents(events)
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Add explicit timeout context, otherwise the request
|
|
||||||
// can hang indefinitely if not specified by the sender.
|
|
||||||
// Sender is usually through netcat or some bash tool
|
|
||||||
// so it's not really possible to specify the timeout.
|
|
||||||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-events:
|
|
||||||
if event.Event != types.EventMailServerSyncFinished {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("received EventMailServerSyncFinished event", "data", event.Data)
|
|
||||||
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
resp, ok = event.Data.(types.SyncEventResponse)
|
|
||||||
if !ok {
|
|
||||||
err = fmt.Errorf("did not understand the response event data")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case <-timeoutCtx.Done():
|
|
||||||
err = timeoutCtx.Err()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,100 +0,0 @@
|
||||||
// +build nimbus
|
|
||||||
|
|
||||||
package shhext
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/syndtr/goleveldb/leveldb"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/rpc"
|
|
||||||
|
|
||||||
"github.com/status-im/status-go/eth-node/types"
|
|
||||||
"github.com/status-im/status-go/params"
|
|
||||||
"github.com/status-im/status-go/services/ext"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Service struct {
|
|
||||||
*ext.Service
|
|
||||||
w types.Whisper
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(config params.ShhextConfig, n types.Node, ctx interface{}, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service {
|
|
||||||
w, err := n.GetWhisper(ctx)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
delay := ext.DefaultRequestsDelay
|
|
||||||
if config.RequestsDelay != 0 {
|
|
||||||
delay = config.RequestsDelay
|
|
||||||
}
|
|
||||||
requestsRegistry := ext.NewRequestsRegistry(delay)
|
|
||||||
mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry)
|
|
||||||
return &Service{
|
|
||||||
Service: ext.New(config, n, ldb, mailMonitor, requestsRegistry, w),
|
|
||||||
w: w,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) PublicWhisperAPI() types.PublicWhisperAPI {
|
|
||||||
return s.w.PublicWhisperAPI()
|
|
||||||
}
|
|
||||||
|
|
||||||
// APIs returns a list of new APIs.
|
|
||||||
func (s *Service) APIs() []rpc.API {
|
|
||||||
apis := []rpc.API{
|
|
||||||
{
|
|
||||||
Namespace: "shhext",
|
|
||||||
Version: "1.0",
|
|
||||||
Service: NewPublicAPI(s),
|
|
||||||
Public: true,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
return apis
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) SyncMessages(ctx context.Context, mailServerID []byte, r types.SyncMailRequest) (resp types.SyncEventResponse, err error) {
|
|
||||||
err = s.w.SyncMessages(mailServerID, r)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for the response which is received asynchronously as a p2p packet.
|
|
||||||
// This packet handler will send an event which contains the response payload.
|
|
||||||
events := make(chan types.EnvelopeEvent, 1024)
|
|
||||||
sub := s.w.SubscribeEnvelopeEvents(events)
|
|
||||||
defer sub.Unsubscribe()
|
|
||||||
|
|
||||||
// Add explicit timeout context, otherwise the request
|
|
||||||
// can hang indefinitely if not specified by the sender.
|
|
||||||
// Sender is usually through netcat or some bash tool
|
|
||||||
// so it's not really possible to specify the timeout.
|
|
||||||
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case event := <-events:
|
|
||||||
if event.Event != types.EventMailServerSyncFinished {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Info("received EventMailServerSyncFinished event", "data", event.Data)
|
|
||||||
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
resp, ok = event.Data.(types.SyncEventResponse)
|
|
||||||
if !ok {
|
|
||||||
err = fmt.Errorf("did not understand the response event data")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
return
|
|
||||||
case <-timeoutCtx.Done():
|
|
||||||
err = timeoutCtx.Err()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,10 +1,7 @@
|
||||||
package wakuext
|
package wakuext
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"encoding/hex"
|
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
@ -72,102 +69,3 @@ func makeEnvelop(
|
||||||
}
|
}
|
||||||
return gethbridge.NewWakuEnvelope(envelope), nil
|
return gethbridge.NewWakuEnvelope(envelope), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestMessages sends a request for historic messages to a MailServer.
|
|
||||||
func (api *PublicAPI) RequestMessages(_ context.Context, r ext.MessagesRequest) (types.HexBytes, error) {
|
|
||||||
api.log.Info("RequestMessages", "request", r)
|
|
||||||
|
|
||||||
now := api.service.w.GetCurrentTime()
|
|
||||||
r.SetDefaults(now)
|
|
||||||
|
|
||||||
if r.From > r.To {
|
|
||||||
return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To)
|
|
||||||
}
|
|
||||||
|
|
||||||
mailServerNode, err := api.service.GetPeer(r.MailServerPeer)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("%v: %v", ext.ErrInvalidMailServerPeer, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
|
||||||
symKey []byte
|
|
||||||
publicKey *ecdsa.PublicKey
|
|
||||||
)
|
|
||||||
|
|
||||||
if r.SymKeyID != "" {
|
|
||||||
symKey, err = api.service.w.GetSymKey(r.SymKeyID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("%v: %v", ext.ErrInvalidSymKeyID, err)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
publicKey = mailServerNode.Pubkey()
|
|
||||||
}
|
|
||||||
|
|
||||||
payload, err := ext.MakeMessagesRequestPayload(r)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
envelope, err := makeEnvelop(
|
|
||||||
payload,
|
|
||||||
symKey,
|
|
||||||
publicKey,
|
|
||||||
api.service.NodeID(),
|
|
||||||
api.service.w.MinPow(),
|
|
||||||
now,
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
hash := envelope.Hash()
|
|
||||||
|
|
||||||
if !r.Force {
|
|
||||||
err = api.service.RequestsRegistry().Register(hash, r.Topics)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := api.service.w.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil {
|
|
||||||
if !r.Force {
|
|
||||||
api.service.RequestsRegistry().Unregister(hash)
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return hash[:], nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RequestMessagesSync repeats MessagesRequest using configuration in retry conf.
|
|
||||||
func (api *PublicAPI) RequestMessagesSync(conf ext.RetryConfig, r ext.MessagesRequest) (ext.MessagesResponse, error) {
|
|
||||||
var resp ext.MessagesResponse
|
|
||||||
|
|
||||||
events := make(chan types.EnvelopeEvent, 10)
|
|
||||||
var (
|
|
||||||
requestID types.HexBytes
|
|
||||||
err error
|
|
||||||
retries int
|
|
||||||
)
|
|
||||||
for retries <= conf.MaxRetries {
|
|
||||||
sub := api.service.w.SubscribeEnvelopeEvents(events)
|
|
||||||
r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries)
|
|
||||||
timeout := r.Timeout
|
|
||||||
// FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration
|
|
||||||
r.Timeout = time.Duration(int(r.Timeout.Seconds()))
|
|
||||||
requestID, err = api.RequestMessages(context.Background(), r)
|
|
||||||
if err != nil {
|
|
||||||
sub.Unsubscribe()
|
|
||||||
return resp, err
|
|
||||||
}
|
|
||||||
mailServerResp, err := ext.WaitForExpiredOrCompleted(types.BytesToHash(requestID), events, timeout)
|
|
||||||
sub.Unsubscribe()
|
|
||||||
if err == nil {
|
|
||||||
resp.Cursor = hex.EncodeToString(mailServerResp.Cursor)
|
|
||||||
resp.Error = mailServerResp.Error
|
|
||||||
return resp, nil
|
|
||||||
}
|
|
||||||
retries++
|
|
||||||
api.log.Error("[RequestMessagesSync] failed", "err", err, "retries", retries)
|
|
||||||
}
|
|
||||||
return resp, fmt.Errorf("failed to request messages after %d retries", retries)
|
|
||||||
}
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ func New(config params.ShhextConfig, n types.Node, ctx interface{}, handler ext.
|
||||||
requestsRegistry := ext.NewRequestsRegistry(delay)
|
requestsRegistry := ext.NewRequestsRegistry(delay)
|
||||||
mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry)
|
mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry)
|
||||||
return &Service{
|
return &Service{
|
||||||
Service: ext.New(config, n, ldb, mailMonitor, requestsRegistry, w),
|
Service: ext.New(config, n, ldb, mailMonitor, w),
|
||||||
w: w,
|
w: w,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue