From be01875d1de081c01bb72265d5c0534e31ac3ee1 Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Thu, 13 May 2021 15:30:33 +0200 Subject: [PATCH] remove mailserver logic --- api/geth_backend.go | 38 ---- eth-node/bridge/geth/waku.go | 1 + eth-node/types/mailserver.go | 4 + node/get_status_node.go | 14 -- node/geth_node.go | 46 ----- protocol/transport/transport.go | 9 +- services/ext/service.go | 83 +++------ services/shhext/api_geth.go | 286 ------------------------------ services/shhext/service.go | 100 ----------- services/shhext/service_nimbus.go | 100 ----------- services/wakuext/api.go | 102 ----------- services/wakuext/service.go | 2 +- 12 files changed, 32 insertions(+), 753 deletions(-) delete mode 100644 services/shhext/api_geth.go delete mode 100644 services/shhext/service.go delete mode 100644 services/shhext/service_nimbus.go diff --git a/api/geth_backend.go b/api/geth_backend.go index a1e7173d4..da9465001 100644 --- a/api/geth_backend.go +++ b/api/geth_backend.go @@ -1074,17 +1074,6 @@ func (b *GethStatusBackend) Logout() error { // cleanupServices stops parts of services that doesn't managed by a node and removes injected data from services. func (b *GethStatusBackend) cleanupServices() error { - whisperService, err := b.statusNode.WhisperService() - switch err { - case node.ErrServiceUnknown: // Whisper was never registered - case nil: - if err := whisperService.DeleteKeyPairs(); err != nil { - return fmt.Errorf("%s: %v", ErrWhisperClearIdentitiesFailure, err) - } - b.selectedAccountKeyID = "" - default: - return err - } wakuService, err := b.statusNode.WakuService() switch err { case node.ErrServiceUnknown: // Waku was never registered @@ -1170,39 +1159,12 @@ func (b *GethStatusBackend) injectAccountsIntoServices() error { } identity := chatAccount.AccountKey.PrivateKey - whisperService, err := b.statusNode.WhisperService() - - switch err { - case node.ErrServiceUnknown: // Whisper was never registered - case nil: - if err := whisperService.DeleteKeyPairs(); err != nil { // err is not possible; method return value is incorrect - return err - } - b.selectedAccountKeyID, err = whisperService.AddKeyPair(identity) - if err != nil { - return ErrWhisperIdentityInjectionFailure - } - default: - return err - } acc, err := b.GetActiveAccount() if err != nil { return err } - if whisperService != nil { - st, err := b.statusNode.ShhExtService() - if err != nil { - return err - } - - if err := st.InitProtocol(identity, b.appDB, b.multiaccountsDB, acc, logutils.ZapLogger()); err != nil { - return err - } - return nil - } - wakuService, err := b.statusNode.WakuService() switch err { diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 1793688fb..40bc60d88 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -167,6 +167,7 @@ func (w *gethWakuWrapper) SendMessagesRequest(peerID []byte, r types.MessagesReq Limit: r.Limit, Cursor: r.Cursor, Bloom: r.Bloom, + Topics: r.Topics, }) } diff --git a/eth-node/types/mailserver.go b/eth-node/types/mailserver.go index c17dec62f..42ede10ab 100644 --- a/eth-node/types/mailserver.go +++ b/eth-node/types/mailserver.go @@ -26,6 +26,10 @@ type MessagesRequest struct { Cursor []byte `json:"cursor"` // Bloom is a filter to match requested messages. Bloom []byte `json:"bloom"` + + // Topics is a list of topics. A returned message should + // belong to one of the topics from the list. + Topics [][]byte `json:"topics"` } // SetDefaults sets the From and To defaults diff --git a/node/get_status_node.go b/node/get_status_node.go index a4396b1ee..76f664933 100644 --- a/node/get_status_node.go +++ b/node/get_status_node.go @@ -36,7 +36,6 @@ import ( localnotifications "github.com/status-im/status-go/services/local-notifications" "github.com/status-im/status-go/services/peer" "github.com/status-im/status-go/services/permissions" - "github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/wakuext" "github.com/status-im/status-go/services/wallet" @@ -635,19 +634,6 @@ func (n *StatusNode) WakuService() (w *waku.Waku, err error) { return } -// ShhExtService exposes reference to shh extension service running on top of the node -func (n *StatusNode) ShhExtService() (s *shhext.Service, err error) { - n.mu.RLock() - defer n.mu.RUnlock() - - err = n.gethService(&s) - if err == node.ErrServiceUnknown { - err = ErrServiceUnknown - } - - return -} - // WakuExtService exposes reference to shh extension service running on top of the node func (n *StatusNode) WakuExtService() (s *wakuext.Service, err error) { n.mu.RLock() diff --git a/node/geth_node.go b/node/geth_node.go index a5858baee..f2a5291ab 100644 --- a/node/geth_node.go +++ b/node/geth_node.go @@ -33,7 +33,6 @@ import ( "github.com/status-im/status-go/services/nodebridge" "github.com/status-im/status-go/services/peer" "github.com/status-im/status-go/services/personal" - "github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/wakuext" "github.com/status-im/status-go/static" @@ -139,11 +138,6 @@ func activateNodeServices(stack *node.Node, config *params.NodeConfig, db *level return fmt.Errorf("failed to register NodeBridge: %v", err) } - // start Whisper service. - if err := activateShhService(stack, config, db); err != nil { - return fmt.Errorf("%v: %v", ErrWhisperServiceRegistrationFailure, err) - } - // start Waku service if err := activateWakuService(stack, config, db); err != nil { return fmt.Errorf("%v: %v", ErrWakuServiceRegistrationFailure, err) @@ -324,46 +318,6 @@ func registerWakuMailServer(wakuService *waku.Waku, config *params.WakuConfig) ( return mailServer.Init(wakuService, config) } -// activateShhService configures Whisper and adds it to the given node. -func activateShhService(stack *node.Node, config *params.NodeConfig, db *leveldb.DB) (err error) { - if !config.WhisperConfig.Enabled { - logger.Info("SHH protocol is disabled") - return nil - } - - err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { - return createShhService(ctx, &config.WhisperConfig, &config.ClusterConfig) - }) - if err != nil { - return - } - - // Register Whisper eth-node bridge - err = stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { - var ethnode *nodebridge.NodeService - if err := ctx.Service(ðnode); err != nil { - return nil, err - } - w, err := ethnode.Node.GetWhisper(ctx) - if err != nil { - return nil, err - } - return &nodebridge.WhisperService{Whisper: w}, nil - }) - if err != nil { - return - } - - // TODO(dshulyak) add a config option to enable it by default, but disable if app is started from statusd - return stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { - var ethnode *nodebridge.NodeService - if err := ctx.Service(ðnode); err != nil { - return nil, err - } - return shhext.New(config.ShhextConfig, ethnode.Node, ctx, ext.EnvelopeSignalHandler{}, db), nil - }) -} - // activateWakuService configures Waku and adds it to the given node. func activateWakuService(stack *node.Node, config *params.NodeConfig, db *leveldb.DB) (err error) { if !config.WakuConfig.Enabled { diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index eb8a1199a..5372c0fc3 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -415,7 +415,6 @@ func (t *Transport) SendMessagesRequestForTopics( ) (cursor []byte, err error) { r := createMessagesRequest(from, to, previousCursor, topics) - r.SetDefaults(t.waku.GetCurrentTime()) events := make(chan types.EnvelopeEvent, 10) sub := t.waku.SubscribeEnvelopeEvents(events) @@ -475,13 +474,17 @@ func createMessagesRequest(from, to uint32, cursor []byte, topics []types.TopicT aUUID := uuid.New() // uuid is 16 bytes, converted to hex it's 32 bytes as expected by types.MessagesRequest id := []byte(hex.EncodeToString(aUUID[:])) + var topicBytes [][]byte + for _, t := range topics { + topicBytes = append(topicBytes, t[:]) + } return types.MessagesRequest{ ID: id, From: from, To: to, - Limit: 100, + Limit: 1000, Cursor: cursor, - Bloom: topicsToBloom(topics...), + Topics: topicBytes, } } diff --git a/services/ext/service.go b/services/ext/service.go index fea0ca318..ce172a78b 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -55,23 +55,18 @@ type EnvelopeEventsHandler interface { // Service is a service that provides some additional API to whisper-based protocols like Whisper or Waku. type Service struct { - messenger *protocol.Messenger - identity *ecdsa.PrivateKey - cancelMessenger chan struct{} - storage db.TransactionalStorage - n types.Node - config params.ShhextConfig - mailMonitor *MailRequestMonitor - requestsRegistry *RequestsRegistry - server *p2p.Server - eventSub mailservers.EnvelopeEventSubscriber - peerStore *mailservers.PeerStore - cache *mailservers.Cache - connManager *mailservers.ConnectionManager - lastUsedMonitor *mailservers.LastUsedConnectionMonitor - accountsDB *accounts.Database - multiAccountsDB *multiaccounts.Database - account *multiaccounts.Account + messenger *protocol.Messenger + identity *ecdsa.PrivateKey + cancelMessenger chan struct{} + storage db.TransactionalStorage + n types.Node + config params.ShhextConfig + mailMonitor *MailRequestMonitor + server *p2p.Server + peerStore *mailservers.PeerStore + accountsDB *accounts.Database + multiAccountsDB *multiaccounts.Database + account *multiaccounts.Account } // Make sure that Service implements node.Service interface. @@ -82,20 +77,16 @@ func New( n types.Node, ldb *leveldb.DB, mailMonitor *MailRequestMonitor, - reqRegistry *RequestsRegistry, eventSub mailservers.EnvelopeEventSubscriber, ) *Service { cache := mailservers.NewCache(ldb) peerStore := mailservers.NewPeerStore(cache) return &Service{ - storage: db.NewLevelDBStorage(ldb), - n: n, - config: config, - mailMonitor: mailMonitor, - requestsRegistry: reqRegistry, - peerStore: peerStore, - cache: mailservers.NewCache(ldb), - eventSub: eventSub, + storage: db.NewLevelDBStorage(ldb), + n: n, + config: config, + mailMonitor: mailMonitor, + peerStore: peerStore, } } @@ -106,10 +97,6 @@ func (s *Service) NodeID() *ecdsa.PrivateKey { return s.server.PrivateKey } -func (s *Service) RequestsRegistry() *RequestsRegistry { - return s.requestsRegistry -} - func (s *Service) GetPeer(rawURL string) (*enode.Node, error) { if len(rawURL) == 0 { return mailservers.GetFirstConnected(s.server, s.peerStore) @@ -330,12 +317,12 @@ func (s *Service) UpdateMailservers(nodes []*enode.Node) error { log.Info("Setting messenger") s.messenger.SetMailserver(nodes[0].ID().Bytes()) } + for _, peer := range nodes { + s.server.AddPeer(peer) + } if err := s.peerStore.Update(nodes); err != nil { return err } - if s.connManager != nil { - s.connManager.Notify(nodes) - } return nil } @@ -352,27 +339,6 @@ func (s *Service) APIs() []rpc.API { // Start is run when a service is started. // It does nothing in this case but is required by `node.Service` interface. func (s *Service) Start(server *p2p.Server) error { - if s.config.EnableConnectionManager { - connectionsTarget := s.config.ConnectionTarget - if connectionsTarget == 0 { - connectionsTarget = defaultConnectionsTarget - } - maxFailures := s.config.MaxServerFailures - // if not defined change server on first expired event - if maxFailures == 0 { - maxFailures = 1 - } - s.connManager = mailservers.NewConnectionManager(server, s.eventSub, connectionsTarget, maxFailures, defaultTimeoutWaitAdded) - s.connManager.Start() - if err := mailservers.EnsureUsedRecordsAddedFirst(s.peerStore, s.connManager); err != nil { - return err - } - } - if s.config.EnableLastUsedMonitor { - s.lastUsedMonitor = mailservers.NewLastUsedConnectionMonitor(s.peerStore, s.cache, s.eventSub) - s.lastUsedMonitor.Start() - } - s.mailMonitor.Start() s.server = server return nil } @@ -380,15 +346,6 @@ func (s *Service) Start(server *p2p.Server) error { // Stop is run when a service is stopped. func (s *Service) Stop() error { log.Info("Stopping shhext service") - if s.config.EnableConnectionManager { - s.connManager.Stop() - } - if s.config.EnableLastUsedMonitor { - s.lastUsedMonitor.Stop() - } - s.requestsRegistry.Clear() - s.mailMonitor.Stop() - if s.cancelMessenger != nil { select { case <-s.cancelMessenger: diff --git a/services/shhext/api_geth.go b/services/shhext/api_geth.go deleted file mode 100644 index 9f467efcc..000000000 --- a/services/shhext/api_geth.go +++ /dev/null @@ -1,286 +0,0 @@ -// +build !nimbus - -package shhext - -import ( - "context" - "crypto/ecdsa" - "encoding/hex" - "fmt" - "time" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/p2p/enode" - - gethbridge "github.com/status-im/status-go/eth-node/bridge/geth" - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/services/ext" - "github.com/status-im/status-go/whisper" -) - -const ( - // defaultWorkTime is a work time reported in messages sent to MailServer nodes. - defaultWorkTime = 5 -) - -// PublicAPI extends whisper public API. -type PublicAPI struct { - *ext.PublicAPI - - service *Service - publicAPI types.PublicWhisperAPI - log log.Logger -} - -// NewPublicAPI returns instance of the public API. -func NewPublicAPI(s *Service) *PublicAPI { - return &PublicAPI{ - PublicAPI: ext.NewPublicAPI(s.Service, s.w), - service: s, - publicAPI: s.w.PublicWhisperAPI(), - log: log.New("package", "status-go/services/sshext.PublicAPI"), - } -} - -// makeEnvelop makes an envelop for a historic messages request. -// Symmetric key is used to authenticate to MailServer. -// PK is the current node ID. -// DEPRECATED -func makeEnvelop( - payload []byte, - symKey []byte, - publicKey *ecdsa.PublicKey, - nodeID *ecdsa.PrivateKey, - pow float64, - now time.Time, -) (types.Envelope, error) { - // TODO: replace with an types.Envelope creator passed to the API struct - params := whisper.MessageParams{ - PoW: pow, - Payload: payload, - WorkTime: defaultWorkTime, - Src: nodeID, - } - // Either symKey or public key is required. - // This condition is verified in `message.Wrap()` method. - if len(symKey) > 0 { - params.KeySym = symKey - } else if publicKey != nil { - params.Dst = publicKey - } - message, err := whisper.NewSentMessage(¶ms) - if err != nil { - return nil, err - } - envelope, err := message.Wrap(¶ms, now) - if err != nil { - return nil, err - } - return gethbridge.NewWhisperEnvelope(envelope), nil -} - -// RequestMessages sends a request for historic messages to a MailServer. -func (api *PublicAPI) RequestMessages(_ context.Context, r ext.MessagesRequest) (types.HexBytes, error) { - api.log.Info("RequestMessages", "request", r) - - now := api.service.w.GetCurrentTime() - r.SetDefaults(now) - - if r.From > r.To { - return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To) - } - - mailServerNode, err := api.service.GetPeer(r.MailServerPeer) - if err != nil { - return nil, fmt.Errorf("%v: %v", ext.ErrInvalidMailServerPeer, err) - } - - var ( - symKey []byte - publicKey *ecdsa.PublicKey - ) - - if r.SymKeyID != "" { - symKey, err = api.service.w.GetSymKey(r.SymKeyID) - if err != nil { - return nil, fmt.Errorf("%v: %v", ext.ErrInvalidSymKeyID, err) - } - } else { - publicKey = mailServerNode.Pubkey() - } - - payload, err := ext.MakeMessagesRequestPayload(r) - if err != nil { - return nil, err - } - - envelope, err := makeEnvelop( - payload, - symKey, - publicKey, - api.service.NodeID(), - api.service.w.MinPow(), - now, - ) - if err != nil { - return nil, err - } - hash := envelope.Hash() - - if !r.Force { - err = api.service.RequestsRegistry().Register(hash, r.Topics) - if err != nil { - return nil, err - } - } - - if err := api.service.w.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil { - if !r.Force { - api.service.RequestsRegistry().Unregister(hash) - } - return nil, err - } - - return hash[:], nil -} - -// RequestMessagesSync repeats MessagesRequest using configuration in retry conf. -func (api *PublicAPI) RequestMessagesSync(conf ext.RetryConfig, r ext.MessagesRequest) (ext.MessagesResponse, error) { - var resp ext.MessagesResponse - - events := make(chan types.EnvelopeEvent, 10) - var ( - requestID types.HexBytes - err error - retries int - ) - for retries <= conf.MaxRetries { - sub := api.service.w.SubscribeEnvelopeEvents(events) - r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries) - timeout := r.Timeout - // FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration - r.Timeout = time.Duration(int(r.Timeout.Seconds())) - requestID, err = api.RequestMessages(context.Background(), r) - if err != nil { - sub.Unsubscribe() - return resp, err - } - mailServerResp, err := ext.WaitForExpiredOrCompleted(types.BytesToHash(requestID), events, timeout) - sub.Unsubscribe() - if err == nil { - resp.Cursor = hex.EncodeToString(mailServerResp.Cursor) - resp.Error = mailServerResp.Error - return resp, nil - } - retries++ - api.log.Error("[RequestMessagesSync] failed", "err", err, "retries", retries) - } - return resp, fmt.Errorf("failed to request messages after %d retries", retries) -} - -// SyncMessagesRequest is a SyncMessages() request payload. -type SyncMessagesRequest struct { - // MailServerPeer is MailServer's enode address. - MailServerPeer string `json:"mailServerPeer"` - - // From is a lower bound of time range (optional). - // Default is 24 hours back from now. - From uint32 `json:"from"` - - // To is a upper bound of time range (optional). - // Default is now. - To uint32 `json:"to"` - - // Limit determines the number of messages sent by the mail server - // for the current paginated request - Limit uint32 `json:"limit"` - - // Cursor is used as starting point for paginated requests - Cursor string `json:"cursor"` - - // FollowCursor if true loads messages until cursor is empty. - FollowCursor bool `json:"followCursor"` - - // Topics is a list of Whisper topics. - // If empty, a full bloom filter will be used. - Topics []types.TopicType `json:"topics"` -} - -// SyncMessagesResponse is a response from the mail server -// to which SyncMessagesRequest was sent. -type SyncMessagesResponse struct { - // Cursor from the response can be used to retrieve more messages - // for the previous request. - Cursor string `json:"cursor"` - - // Error indicates that something wrong happened when sending messages - // to the requester. - Error string `json:"error"` -} - -// createSyncMailRequest creates SyncMailRequest. It uses a full bloom filter -// if no topics are given. -func createSyncMailRequest(r SyncMessagesRequest) (types.SyncMailRequest, error) { - var bloom []byte - if len(r.Topics) > 0 { - bloom = ext.TopicsToBloom(r.Topics...) - } else { - bloom = types.MakeFullNodeBloom() - } - - cursor, err := hex.DecodeString(r.Cursor) - if err != nil { - return types.SyncMailRequest{}, err - } - - return types.SyncMailRequest{ - Lower: r.From, - Upper: r.To, - Bloom: bloom, - Limit: r.Limit, - Cursor: cursor, - }, nil -} - -func createSyncMessagesResponse(r types.SyncEventResponse) SyncMessagesResponse { - return SyncMessagesResponse{ - Cursor: hex.EncodeToString(r.Cursor), - Error: r.Error, - } -} - -// SyncMessages sends a request to a given MailServerPeer to sync historic messages. -// MailServerPeers needs to be added as a trusted peer first. -func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) { - log.Info("SyncMessages start", "request", r) - - var response SyncMessagesResponse - - mailServerEnode, err := enode.ParseV4(r.MailServerPeer) - if err != nil { - return response, fmt.Errorf("invalid MailServerPeer: %v", err) - } - mailServerID := mailServerEnode.ID().Bytes() - - request, err := createSyncMailRequest(r) - if err != nil { - return response, fmt.Errorf("failed to create a sync mail request: %v", err) - } - - for { - log.Info("Sending a request to sync messages", "request", request) - - resp, err := api.service.SyncMessages(ctx, mailServerID, request) - if err != nil { - return response, err - } - - log.Info("Syncing messages response", "error", resp.Error, "cursor", fmt.Sprintf("%#x", resp.Cursor)) - - if resp.Error != "" || len(resp.Cursor) == 0 || !r.FollowCursor { - return createSyncMessagesResponse(resp), nil - } - - request.Cursor = resp.Cursor - } -} diff --git a/services/shhext/service.go b/services/shhext/service.go deleted file mode 100644 index 263f70bd7..000000000 --- a/services/shhext/service.go +++ /dev/null @@ -1,100 +0,0 @@ -// +build !nimbus - -package shhext - -import ( - "context" - "fmt" - "time" - - "github.com/syndtr/goleveldb/leveldb" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rpc" - - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/params" - "github.com/status-im/status-go/services/ext" -) - -type Service struct { - *ext.Service - w types.Whisper -} - -func New(config params.ShhextConfig, n types.Node, ctx interface{}, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service { - w, err := n.GetWhisper(ctx) - if err != nil { - panic(err) - } - delay := ext.DefaultRequestsDelay - if config.RequestsDelay != 0 { - delay = config.RequestsDelay - } - requestsRegistry := ext.NewRequestsRegistry(delay) - mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry) - return &Service{ - Service: ext.New(config, n, ldb, mailMonitor, requestsRegistry, w), - w: w, - } -} - -func (s *Service) PublicWhisperAPI() types.PublicWhisperAPI { - return s.w.PublicWhisperAPI() -} - -// APIs returns a list of new APIs. -func (s *Service) APIs() []rpc.API { - apis := []rpc.API{ - { - Namespace: "shhext", - Version: "1.0", - Service: NewPublicAPI(s), - Public: false, - }, - } - return apis -} - -func (s *Service) SyncMessages(ctx context.Context, mailServerID []byte, r types.SyncMailRequest) (resp types.SyncEventResponse, err error) { - err = s.w.SyncMessages(mailServerID, r) - if err != nil { - return - } - - // Wait for the response which is received asynchronously as a p2p packet. - // This packet handler will send an event which contains the response payload. - events := make(chan types.EnvelopeEvent, 1024) - sub := s.w.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - - // Add explicit timeout context, otherwise the request - // can hang indefinitely if not specified by the sender. - // Sender is usually through netcat or some bash tool - // so it's not really possible to specify the timeout. - timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - - for { - select { - case event := <-events: - if event.Event != types.EventMailServerSyncFinished { - continue - } - - log.Info("received EventMailServerSyncFinished event", "data", event.Data) - - var ok bool - - resp, ok = event.Data.(types.SyncEventResponse) - if !ok { - err = fmt.Errorf("did not understand the response event data") - return - } - return - case <-timeoutCtx.Done(): - err = timeoutCtx.Err() - return - } - } -} diff --git a/services/shhext/service_nimbus.go b/services/shhext/service_nimbus.go deleted file mode 100644 index 198207758..000000000 --- a/services/shhext/service_nimbus.go +++ /dev/null @@ -1,100 +0,0 @@ -// +build nimbus - -package shhext - -import ( - "context" - "fmt" - "time" - - "github.com/syndtr/goleveldb/leveldb" - - "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/rpc" - - "github.com/status-im/status-go/eth-node/types" - "github.com/status-im/status-go/params" - "github.com/status-im/status-go/services/ext" -) - -type Service struct { - *ext.Service - w types.Whisper -} - -func New(config params.ShhextConfig, n types.Node, ctx interface{}, handler ext.EnvelopeEventsHandler, ldb *leveldb.DB) *Service { - w, err := n.GetWhisper(ctx) - if err != nil { - panic(err) - } - delay := ext.DefaultRequestsDelay - if config.RequestsDelay != 0 { - delay = config.RequestsDelay - } - requestsRegistry := ext.NewRequestsRegistry(delay) - mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry) - return &Service{ - Service: ext.New(config, n, ldb, mailMonitor, requestsRegistry, w), - w: w, - } -} - -func (s *Service) PublicWhisperAPI() types.PublicWhisperAPI { - return s.w.PublicWhisperAPI() -} - -// APIs returns a list of new APIs. -func (s *Service) APIs() []rpc.API { - apis := []rpc.API{ - { - Namespace: "shhext", - Version: "1.0", - Service: NewPublicAPI(s), - Public: true, - }, - } - return apis -} - -func (s *Service) SyncMessages(ctx context.Context, mailServerID []byte, r types.SyncMailRequest) (resp types.SyncEventResponse, err error) { - err = s.w.SyncMessages(mailServerID, r) - if err != nil { - return - } - - // Wait for the response which is received asynchronously as a p2p packet. - // This packet handler will send an event which contains the response payload. - events := make(chan types.EnvelopeEvent, 1024) - sub := s.w.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - - // Add explicit timeout context, otherwise the request - // can hang indefinitely if not specified by the sender. - // Sender is usually through netcat or some bash tool - // so it's not really possible to specify the timeout. - timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) - defer cancel() - - for { - select { - case event := <-events: - if event.Event != types.EventMailServerSyncFinished { - continue - } - - log.Info("received EventMailServerSyncFinished event", "data", event.Data) - - var ok bool - - resp, ok = event.Data.(types.SyncEventResponse) - if !ok { - err = fmt.Errorf("did not understand the response event data") - return - } - return - case <-timeoutCtx.Done(): - err = timeoutCtx.Err() - return - } - } -} diff --git a/services/wakuext/api.go b/services/wakuext/api.go index 8364abc45..1617bec0e 100644 --- a/services/wakuext/api.go +++ b/services/wakuext/api.go @@ -1,10 +1,7 @@ package wakuext import ( - "context" "crypto/ecdsa" - "encoding/hex" - "fmt" "time" "github.com/ethereum/go-ethereum/log" @@ -72,102 +69,3 @@ func makeEnvelop( } return gethbridge.NewWakuEnvelope(envelope), nil } - -// RequestMessages sends a request for historic messages to a MailServer. -func (api *PublicAPI) RequestMessages(_ context.Context, r ext.MessagesRequest) (types.HexBytes, error) { - api.log.Info("RequestMessages", "request", r) - - now := api.service.w.GetCurrentTime() - r.SetDefaults(now) - - if r.From > r.To { - return nil, fmt.Errorf("Query range is invalid: from > to (%d > %d)", r.From, r.To) - } - - mailServerNode, err := api.service.GetPeer(r.MailServerPeer) - if err != nil { - return nil, fmt.Errorf("%v: %v", ext.ErrInvalidMailServerPeer, err) - } - - var ( - symKey []byte - publicKey *ecdsa.PublicKey - ) - - if r.SymKeyID != "" { - symKey, err = api.service.w.GetSymKey(r.SymKeyID) - if err != nil { - return nil, fmt.Errorf("%v: %v", ext.ErrInvalidSymKeyID, err) - } - } else { - publicKey = mailServerNode.Pubkey() - } - - payload, err := ext.MakeMessagesRequestPayload(r) - if err != nil { - return nil, err - } - - envelope, err := makeEnvelop( - payload, - symKey, - publicKey, - api.service.NodeID(), - api.service.w.MinPow(), - now, - ) - if err != nil { - return nil, err - } - hash := envelope.Hash() - - if !r.Force { - err = api.service.RequestsRegistry().Register(hash, r.Topics) - if err != nil { - return nil, err - } - } - - if err := api.service.w.RequestHistoricMessagesWithTimeout(mailServerNode.ID().Bytes(), envelope, r.Timeout*time.Second); err != nil { - if !r.Force { - api.service.RequestsRegistry().Unregister(hash) - } - return nil, err - } - - return hash[:], nil -} - -// RequestMessagesSync repeats MessagesRequest using configuration in retry conf. -func (api *PublicAPI) RequestMessagesSync(conf ext.RetryConfig, r ext.MessagesRequest) (ext.MessagesResponse, error) { - var resp ext.MessagesResponse - - events := make(chan types.EnvelopeEvent, 10) - var ( - requestID types.HexBytes - err error - retries int - ) - for retries <= conf.MaxRetries { - sub := api.service.w.SubscribeEnvelopeEvents(events) - r.Timeout = conf.BaseTimeout + conf.StepTimeout*time.Duration(retries) - timeout := r.Timeout - // FIXME this weird conversion is required because MessagesRequest expects seconds but defines time.Duration - r.Timeout = time.Duration(int(r.Timeout.Seconds())) - requestID, err = api.RequestMessages(context.Background(), r) - if err != nil { - sub.Unsubscribe() - return resp, err - } - mailServerResp, err := ext.WaitForExpiredOrCompleted(types.BytesToHash(requestID), events, timeout) - sub.Unsubscribe() - if err == nil { - resp.Cursor = hex.EncodeToString(mailServerResp.Cursor) - resp.Error = mailServerResp.Error - return resp, nil - } - retries++ - api.log.Error("[RequestMessagesSync] failed", "err", err, "retries", retries) - } - return resp, fmt.Errorf("failed to request messages after %d retries", retries) -} diff --git a/services/wakuext/service.go b/services/wakuext/service.go index ca4d18e48..f70e35b64 100644 --- a/services/wakuext/service.go +++ b/services/wakuext/service.go @@ -27,7 +27,7 @@ func New(config params.ShhextConfig, n types.Node, ctx interface{}, handler ext. requestsRegistry := ext.NewRequestsRegistry(delay) mailMonitor := ext.NewMailRequestMonitor(w, handler, requestsRegistry) return &Service{ - Service: ext.New(config, n, ldb, mailMonitor, requestsRegistry, w), + Service: ext.New(config, n, ldb, mailMonitor, w), w: w, } }