From 4c1f96d255de831537e8e92e77041114e7c96529 Mon Sep 17 00:00:00 2001 From: Igor Mandrigin Date: Tue, 7 May 2019 09:05:38 +0200 Subject: [PATCH] Implement subscription for eth and shh filters using signals. (#1455) --- Makefile | 3 +- api/backend.go | 13 ++ api/backend_subs_test.go | 218 +++++++++++++++++++ services/subscriptions/README.md | 79 +++++++ services/subscriptions/api.go | 53 +++++ services/subscriptions/filters.go | 7 + services/subscriptions/filters_eth.go | 65 ++++++ services/subscriptions/filters_shh.go | 57 +++++ services/subscriptions/service.go | 51 +++++ services/subscriptions/signals.go | 19 ++ services/subscriptions/subscription.go | 68 ++++++ services/subscriptions/subscriptions.go | 88 ++++++++ services/subscriptions/subscriptions_test.go | 193 ++++++++++++++++ signal/events_subs.go | 33 +++ 14 files changed, 946 insertions(+), 1 deletion(-) create mode 100644 api/backend_subs_test.go create mode 100644 services/subscriptions/README.md create mode 100644 services/subscriptions/api.go create mode 100644 services/subscriptions/filters.go create mode 100644 services/subscriptions/filters_eth.go create mode 100644 services/subscriptions/filters_shh.go create mode 100644 services/subscriptions/service.go create mode 100644 services/subscriptions/signals.go create mode 100644 services/subscriptions/subscription.go create mode 100644 services/subscriptions/subscriptions.go create mode 100644 services/subscriptions/subscriptions_test.go create mode 100644 signal/events_subs.go diff --git a/Makefile b/Makefile index 1cad562be..e1c19fcc4 100644 --- a/Makefile +++ b/Makefile @@ -257,7 +257,8 @@ test-unit: UNIT_TEST_PACKAGES = $(shell go list ./... | \ grep -v /vendor | \ grep -v /t/e2e | \ grep -v /t/benchmarks | \ - grep -v /lib) + grep -v /lib | \ + grep -v /transactions/fake ) test-unit: ##@tests Run unit and integration tests go test -v -failfast $(UNIT_TEST_PACKAGES) $(gotest_extraflags) diff --git a/api/backend.go b/api/backend.go index a00ec1ce1..32cda0cc6 100644 --- a/api/backend.go +++ b/api/backend.go @@ -27,6 +27,7 @@ import ( "github.com/status-im/status-go/services/rpcfilters" "github.com/status-im/status-go/services/shhext/chat" "github.com/status-im/status-go/services/shhext/chat/crypto" + "github.com/status-im/status-go/services/subscriptions" "github.com/status-im/status-go/services/typeddata" "github.com/status-im/status-go/signal" "github.com/status-im/status-go/transactions" @@ -62,6 +63,7 @@ type StatusBackend struct { connectionState connectionState appState appState log log.Logger + allowAllRPC bool // used only for tests, disables api method restrictions } // NewStatusBackend create a new NewStatusBackend instance @@ -126,6 +128,12 @@ func (b *StatusBackend) rpcFiltersService() gethnode.ServiceConstructor { } } +func (b *StatusBackend) subscriptionService() gethnode.ServiceConstructor { + return func(*gethnode.ServiceContext) (gethnode.Service, error) { + return subscriptions.New(b.statusNode), nil + } +} + func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) { defer func() { if r := recover(); r != nil { @@ -140,6 +148,7 @@ func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) { services := []gethnode.ServiceConstructor{} services = appendIf(config.UpstreamConfig.Enabled, services, b.rpcFiltersService()) + services = append(services, b.subscriptionService()) if err = b.statusNode.StartWithOptions(config, node.StartOptions{ Services: services, @@ -397,6 +406,10 @@ func (b *StatusBackend) registerHandlers() error { }, ) + if b.allowAllRPC { + // this should only happen in unit-tests, this variable is not available outside this package + continue + } client.RegisterHandler(params.SendTransactionMethodName, unsupportedMethodHandler) client.RegisterHandler(params.PersonalSignMethodName, unsupportedMethodHandler) client.RegisterHandler(params.PersonalRecoverMethodName, unsupportedMethodHandler) diff --git a/api/backend_subs_test.go b/api/backend_subs_test.go new file mode 100644 index 000000000..a5c081767 --- /dev/null +++ b/api/backend_subs_test.go @@ -0,0 +1,218 @@ +package api + +import ( + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/status-im/status-go/params" + "github.com/status-im/status-go/signal" + "github.com/status-im/status-go/t/utils" + "github.com/stretchr/testify/require" +) + +const ( + password = "abc" +) + +// since `backend_test` grew too big, subscription tests are moved to its own part + +func TestSubscriptionPendingTransaction(t *testing.T) { + backend := NewStatusBackend() + backend.allowAllRPC = true + + account, _ := initNodeAndLogin(t, backend) + + defer func() { require.NoError(t, backend.StopNode()) }() + + signals := make(chan string) + defer func() { + signal.ResetDefaultNodeNotificationHandler() + close(signals) + }() + + signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { + signals <- jsonEvent + }) + + subID := createSubscription(t, backend, `"eth_newPendingTransactionFilter", []`) + + createTxFmt := ` + { + "jsonrpc":"2.0", + "method":"eth_sendTransaction", + "params":[ + { + "from": "%s", + "to": "0xd46e8dd67c5d32be8058bb8eb970870f07244567", + "gas": "0x100000", + "gasPrice": "0x0", + "value": "0x0", + "data": "0xd46e8dd67c5d32be8d46e8dd67c5d32be8058bb8eb970870f072445675058bb8eb970870f072445675" + }], + "id":99 + }` + + txJSONResponse, err := backend.CallPrivateRPC(fmt.Sprintf(createTxFmt, account)) + require.NoError(t, err) + + createdTxID := extractResult(t, txJSONResponse) + + select { + case event := <-signals: + validateTxEvent(t, subID, event, createdTxID) + case <-time.After(2 * time.Second): + require.Fail(t, "timeout waiting for subscription") + } +} + +func TestSubscriptionWhisperEnvelopes(t *testing.T) { + backend := NewStatusBackend() + + initNodeAndLogin(t, backend) + + defer func() { require.NoError(t, backend.StopNode()) }() + + signals := make(chan string) + defer func() { + signal.ResetDefaultNodeNotificationHandler() + close(signals) + }() + + signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { + signals <- jsonEvent + }) + + topic := "0x12341234" + payload := "0x12312312" + + shhGenSymKeyJSONResponse, err := backend.CallPrivateRPC(`{"jsonrpc":"2.0","method":"shh_generateSymKeyFromPassword","params":["test"],"id":119}`) + require.NoError(t, err) + symKeyID := extractResult(t, shhGenSymKeyJSONResponse) + + subID := createSubscription(t, backend, fmt.Sprintf(`"shh_newMessageFilter", [{ "symKeyID": "%s", "topics": ["%s"] }]`, symKeyID, topic)) + + sendMessageFmt := ` + { + "jsonrpc": "2.0", + "method": "shh_post", + "params": [{ + "ttl": 7, + "symKeyID": "%s", + "topic": "%s", + "powTarget": 2.01, + "powTime": 2, + "payload": "%s" + }], + "id":11 + }` + + numberOfEnvelopes := 5 + + for i := 0; i < numberOfEnvelopes; i++ { + _, err = backend.CallPrivateRPC(fmt.Sprintf(sendMessageFmt, symKeyID, topic, payload)) + require.NoError(t, err) + } + + select { + case event := <-signals: + validateShhEvent(t, event, subID, numberOfEnvelopes, topic, payload) + case <-time.After(2 * time.Second): + require.Fail(t, "timeout waiting for subscription") + } +} + +// * * * * * * * * * * utility methods below * * * * * * * * * * * + +func validateShhEvent(t *testing.T, jsonEvent string, expectedSubID string, numberOfEnvelopes int, topic string, payload string) { + result := struct { + Event signal.SubscriptionDataEvent `json:"event"` + Type string `json:"type"` + }{} + + require.NoError(t, json.Unmarshal([]byte(jsonEvent), &result)) + + require.Equal(t, signal.EventSubscriptionsData, result.Type) + require.Equal(t, expectedSubID, result.Event.FilterID) + + require.Equal(t, numberOfEnvelopes, len(result.Event.Data)) + + for _, item := range result.Event.Data { + dict := item.(map[string]interface{}) + require.Equal(t, dict["topic"], topic) + require.Equal(t, dict["payload"], payload) + } +} + +func validateTxEvent(t *testing.T, expectedSubID string, jsonEvent string, txID string) { + result := struct { + Event signal.SubscriptionDataEvent `json:"event"` + Type string `json:"type"` + }{} + + expectedData := []interface{}{ + txID, + } + + require.NoError(t, json.Unmarshal([]byte(jsonEvent), &result)) + + require.Equal(t, signal.EventSubscriptionsData, result.Type) + require.Equal(t, expectedSubID, result.Event.FilterID) + require.Equal(t, expectedData, result.Event.Data) +} + +func extractResult(t *testing.T, jsonString string) string { + resultMap := make(map[string]interface{}) + err := json.Unmarshal([]byte(jsonString), &resultMap) + require.NoError(t, err) + + value, ok := resultMap["result"] + require.True(t, ok) + + return value.(string) +} + +func createSubscription(t *testing.T, backend *StatusBackend, params string) string { + createSubFmt := ` + { + "jsonrpc": "2.0", + "id": 10, + "method": "eth_subscribeSignal", + "params": [%s] + + }` + + jsonResponse, err := backend.CallPrivateRPC(fmt.Sprintf(createSubFmt, params)) + require.NoError(t, err) + + return extractResult(t, jsonResponse) +} + +func initNodeAndLogin(t *testing.T, backend *StatusBackend) (string, string) { + config, err := utils.MakeTestNodeConfig(params.StatusChainNetworkID) + require.NoError(t, err) + + err = backend.StartNode(config) + require.NoError(t, err) + + info, _, err := backend.AccountManager().CreateAccount(password) + require.NoError(t, err) + + require.NoError(t, backend.AccountManager().SelectAccount(info.WalletAddress, info.ChatAddress, password)) + + unlockFmt := ` + { + "jsonrpc": "2.0", + "id": 11, + "method": "personal_unlockAccount", + "params": ["%s", "%s"] + }` + + unlockResult, err := backend.CallPrivateRPC(fmt.Sprintf(unlockFmt, info.WalletAddress, password)) + require.NoError(t, err) + + require.NotContains(t, unlockResult, "err") + + return info.WalletAddress, info.ChatPubKey +} diff --git a/services/subscriptions/README.md b/services/subscriptions/README.md new file mode 100644 index 000000000..c433305f5 --- /dev/null +++ b/services/subscriptions/README.md @@ -0,0 +1,79 @@ +# Signal Subscriptions + +This package implements subscriptions mechanics using [`signal`](../../signal) package. + +It defines 3 new RPC methods in the `eth` namespace and 2 signals. + +## Methods + +###`eth_subscribeSignal` +Creates a new filter and subscribes to its changes via signals. + +Parameters: receives the method name and parameters for the filter that is created. + +Example 1: +```json +{ + "jsonrpc": "2.0", + "id": 1, + "method": "eth_subscribeSignal", + "params": ["eth_newPendingTransactionFilter", []] +} +``` + +Example 2: +```json +{ + "jsonrpc": "2.0", + "id": 2, + "method": "eth_subscribeSignal", + "params": [ + "shh_newMessageFilter", + [{ "symKeyID":"abcabcabcabc", "topics": ["0x12341234"] }] + ] +} +``` + +Supported filters: `shh_newMessageFilter`, `eth_newFilter`, `eth_newBlockFilter`, `eth_newPendingTransactionFilter` +(see [Ethereum documentation](https://github.com/ethereum/wiki/wiki/JSON-RPC) for respective parameters). + +Returns: error or `subscriptionID`. + + +###`eth_unsubscribeSignal` +Unsubscribes and removes one filter by its ID. +NOTE: Unsubscribing from a filter removes it. + +Parameters: `subscriptionID` obtained from `eth_subscribeSignal` +Returns: error if something went wrong while unsubscribing. + + +## Signals + +1. Subscription data received + +```json +{ + "type": "subscriptions.data", + "event": { + "subscription_id": "shh_0x01", + "data": { + , + , + ... + } +} +``` + +2. Subscription error received + +```json +{ + "type": "subscriptions.error", + "event": { + "subscription_id": "shh_0x01", + "error_message": "can not find filter with id: 0x01" + } +} +``` + diff --git a/services/subscriptions/api.go b/services/subscriptions/api.go new file mode 100644 index 000000000..f15f3d8b4 --- /dev/null +++ b/services/subscriptions/api.go @@ -0,0 +1,53 @@ +package subscriptions + +import ( + "fmt" + "time" + + "github.com/status-im/status-go/node" +) + +type API struct { + node *node.StatusNode + activeSubscriptions *Subscriptions +} + +func NewPublicAPI(node *node.StatusNode) *API { + return &API{ + node: node, + activeSubscriptions: NewSubscriptions(100 * time.Millisecond), + } +} + +func (api *API) SubscribeSignal(method string, args []interface{}) (SubscriptionID, error) { + var ( + filter filter + err error + namespace = method[:3] + ) + + rpc := api.node.RPCPrivateClient() + + switch namespace { + case "shh": + filter, err = installShhFilter(rpc, method, args) + case "eth": + filter, err = installEthFilter(rpc, method, args) + default: + err = fmt.Errorf("unexpected namespace: %s", namespace) + } + + if err != nil { + return SubscriptionID(""), fmt.Errorf("[SubscribeSignal] could not subscribe, failed to call %s: %v", method, err) + } + + return api.activeSubscriptions.Create(namespace, filter) +} + +func (api *API) UnsubscribeSignal(id string) error { + return api.activeSubscriptions.Remove(SubscriptionID(id)) +} + +func (api *API) shutdown() error { + return api.activeSubscriptions.removeAll() +} diff --git a/services/subscriptions/filters.go b/services/subscriptions/filters.go new file mode 100644 index 000000000..bbe117acd --- /dev/null +++ b/services/subscriptions/filters.go @@ -0,0 +1,7 @@ +package subscriptions + +type filter interface { + getID() string + getChanges() ([]interface{}, error) + uninstall() error +} diff --git a/services/subscriptions/filters_eth.go b/services/subscriptions/filters_eth.go new file mode 100644 index 000000000..887de17cd --- /dev/null +++ b/services/subscriptions/filters_eth.go @@ -0,0 +1,65 @@ +package subscriptions + +import ( + "fmt" + + "github.com/status-im/status-go/rpc" +) + +type ethFilter struct { + id string + rpcClient *rpc.Client +} + +func installEthFilter(rpcClient *rpc.Client, method string, args []interface{}) (*ethFilter, error) { + + if err := validateEthMethod(method); err != nil { + return nil, err + } + + var result string + + err := rpcClient.Call(&result, method, args) + + if err != nil { + return nil, err + } + + filter := ðFilter{ + id: result, + rpcClient: rpcClient, + } + + return filter, nil + +} + +func (ef *ethFilter) getID() string { + return ef.id +} + +func (ef *ethFilter) getChanges() ([]interface{}, error) { + var result []interface{} + + err := ef.rpcClient.Call(&result, "eth_getFilterChanges", ef.getID()) + + return result, err +} + +func (ef *ethFilter) uninstall() error { + return ef.rpcClient.Call(nil, "eth_uninstallFilter", ef.getID()) +} + +func validateEthMethod(method string) error { + for _, allowedMethod := range []string{ + "eth_newFilter", + "eth_newBlockFilter", + "eth_newPendingTransactionFilter", + } { + if method == allowedMethod { + return nil + } + } + + return fmt.Errorf("unexpected filter method: %s", method) +} diff --git a/services/subscriptions/filters_shh.go b/services/subscriptions/filters_shh.go new file mode 100644 index 000000000..230f61780 --- /dev/null +++ b/services/subscriptions/filters_shh.go @@ -0,0 +1,57 @@ +package subscriptions + +import ( + "fmt" + + "github.com/status-im/status-go/rpc" +) + +type whisperFilter struct { + id string + rpcClient *rpc.Client +} + +func installShhFilter(rpcClient *rpc.Client, method string, args []interface{}) (*whisperFilter, error) { + + if err := validateShhMethod(method); err != nil { + return nil, err + } + + var result string + + err := rpcClient.Call(&result, method, args...) + + if err != nil { + return nil, err + } + + filter := &whisperFilter{ + id: result, + rpcClient: rpcClient, + } + + return filter, nil +} + +func (wf *whisperFilter) getChanges() ([]interface{}, error) { + var result []interface{} + + err := wf.rpcClient.Call(&result, "shh_getFilterMessages", wf.getID()) + + return result, err +} + +func (wf *whisperFilter) getID() string { + return wf.id +} + +func (wf *whisperFilter) uninstall() error { + return wf.rpcClient.Call(nil, "shh_deleteMessageFilter", wf.getID()) +} + +func validateShhMethod(method string) error { + if method != "shh_newMessageFilter" { + return fmt.Errorf("unexpected filter method: %s", method) + } + return nil +} diff --git a/services/subscriptions/service.go b/services/subscriptions/service.go new file mode 100644 index 000000000..b70f760a1 --- /dev/null +++ b/services/subscriptions/service.go @@ -0,0 +1,51 @@ +package subscriptions + +import ( + gethnode "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/rpc" + + "github.com/status-im/status-go/node" +) + +// Make sure that Service implements node.Service interface. +var _ gethnode.Service = (*Service)(nil) + +// Service represents our own implementation of personal sign operations. +type Service struct { + api *API +} + +// New returns a new Service. +func New(node *node.StatusNode) *Service { + return &Service{ + api: NewPublicAPI(node), + } +} + +// Protocols returns a new protocols list. In this case, there are none. +func (s *Service) Protocols() []p2p.Protocol { + return []p2p.Protocol{} +} + +// APIs returns a list of new APIs. +func (s *Service) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: "eth", + Version: "1.0", + Service: s.api, + Public: true, + }, + } +} + +// Start is run when a service is started. +func (s *Service) Start(server *p2p.Server) error { + return nil +} + +// Stop is run when a service is stopped. +func (s *Service) Stop() error { + return s.api.shutdown() +} diff --git a/services/subscriptions/signals.go b/services/subscriptions/signals.go new file mode 100644 index 000000000..d521ae728 --- /dev/null +++ b/services/subscriptions/signals.go @@ -0,0 +1,19 @@ +package subscriptions + +import "github.com/status-im/status-go/signal" + +type filterSignal struct { + filterID string +} + +func newFilterSignal(filterID string) *filterSignal { + return &filterSignal{filterID} +} + +func (s *filterSignal) SendError(err error) { + signal.SendSubscriptionErrorEvent(s.filterID, err) +} + +func (s *filterSignal) SendData(data []interface{}) { + signal.SendSubscriptionDataEvent(s.filterID, data) +} diff --git a/services/subscriptions/subscription.go b/services/subscriptions/subscription.go new file mode 100644 index 000000000..919687720 --- /dev/null +++ b/services/subscriptions/subscription.go @@ -0,0 +1,68 @@ +package subscriptions + +import ( + "errors" + "fmt" + "time" +) + +type SubscriptionID string + +type Subscription struct { + id SubscriptionID + signal *filterSignal + quit chan struct{} + filter filter + stopped bool +} + +func NewSubscription(namespace string, filter filter) *Subscription { + subscriptionID := NewSubscriptionID(namespace, filter.getID()) + + quit := make(chan struct{}) + + return &Subscription{ + id: subscriptionID, + quit: quit, + signal: newFilterSignal(string(subscriptionID)), + filter: filter, + } +} + +func (s *Subscription) Start(checkPeriod time.Duration) error { + if s.stopped { + return errors.New("it is impossible to start an already stopped subscription") + } + ticker := time.NewTicker(checkPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + filterData, err := s.filter.getChanges() + if err != nil { + s.signal.SendError(err) + } else if len(filterData) > 0 { + s.signal.SendData(filterData) + } + case <-s.quit: + return nil + } + } +} + +func (s *Subscription) Stop(uninstall bool) error { + if s.stopped { + return nil + } + close(s.quit) + if uninstall { + return s.filter.uninstall() + } + s.stopped = true + return nil +} + +func NewSubscriptionID(namespace, filterID string) SubscriptionID { + return SubscriptionID(fmt.Sprintf("%s-%s", namespace, filterID)) +} diff --git a/services/subscriptions/subscriptions.go b/services/subscriptions/subscriptions.go new file mode 100644 index 000000000..01369a468 --- /dev/null +++ b/services/subscriptions/subscriptions.go @@ -0,0 +1,88 @@ +package subscriptions + +import ( + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +type Subscriptions struct { + mu sync.Mutex + subs map[SubscriptionID]*Subscription + checkPeriod time.Duration + log log.Logger +} + +func NewSubscriptions(period time.Duration) *Subscriptions { + return &Subscriptions{ + subs: make(map[SubscriptionID]*Subscription), + checkPeriod: period, + log: log.New("package", "status-go/services/subsriptions.Subscriptions"), + } +} + +func (s *Subscriptions) Create(namespace string, filter filter) (SubscriptionID, error) { + s.mu.Lock() + defer s.mu.Unlock() + + newSub := NewSubscription(namespace, filter) + + go func() { + err := newSub.Start(s.checkPeriod) + if err != nil { + s.log.Error("error while starting subscription", "err", err) + } + }() + + s.subs[newSub.id] = newSub + + return newSub.id, nil +} + +func (s *Subscriptions) Remove(id SubscriptionID) error { + s.mu.Lock() + defer s.mu.Unlock() + + found, err := s.stopSubscription(id, true) + + if found { + delete(s.subs, id) + } + + return err +} + +func (s *Subscriptions) removeAll() error { + s.mu.Lock() + defer s.mu.Unlock() + + unsubscribeErrors := make(map[SubscriptionID]error) + + for id := range s.subs { + _, err := s.stopSubscription(id, false) + if err != nil { + unsubscribeErrors[id] = err + } + } + + s.subs = make(map[SubscriptionID]*Subscription) + + if len(unsubscribeErrors) > 0 { + return fmt.Errorf("errors while cleaning up subscriptions: %+v", unsubscribeErrors) + } + + return nil +} + +// stopSubscription isn't thread safe! +func (s *Subscriptions) stopSubscription(id SubscriptionID, uninstall bool) (bool, error) { + sub, found := s.subs[id] + if !found { + return false, nil + } + + return true, sub.Stop(uninstall) + +} diff --git a/services/subscriptions/subscriptions_test.go b/services/subscriptions/subscriptions_test.go new file mode 100644 index 000000000..c64585a9f --- /dev/null +++ b/services/subscriptions/subscriptions_test.go @@ -0,0 +1,193 @@ +package subscriptions + +import ( + "encoding/json" + "errors" + "fmt" + "testing" + "time" + + "github.com/status-im/status-go/signal" + "github.com/stretchr/testify/require" +) + +const ( + filterID = "123" + filterNS = "tst" +) + +type mockFilter struct { + filterID string + data []interface{} + filterError error + uninstalled bool + uninstallError error +} + +func newMockFilter(filterID string) *mockFilter { + return &mockFilter{ + filterID: filterID, + } +} + +func (mf *mockFilter) getID() string { + return mf.filterID +} +func (mf *mockFilter) getChanges() ([]interface{}, error) { + if mf.filterError != nil { + err := mf.filterError + mf.filterError = nil + return nil, err + } + + data := mf.data + mf.data = nil + return data, nil +} + +func (mf *mockFilter) uninstall() error { + mf.uninstalled = true + return mf.uninstallError +} + +func (mf *mockFilter) setData(data ...interface{}) { + mf.data = data +} + +func (mf *mockFilter) setError(err error) { + mf.data = nil + mf.filterError = err +} + +func TestSubscriptionGetData(t *testing.T) { + filter := newMockFilter(filterID) + + subs := NewSubscriptions(time.Microsecond) + + subID, _ := subs.Create(filterNS, filter) + + require.Equal(t, string(subID), fmt.Sprintf("%s-%s", filterNS, filterID)) + + proceed := make(chan struct{}) + + signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { + defer close(proceed) + validateFilterData(t, jsonEvent, string(subID), "1", "2", "3", "4") + }) + + filter.setData("1", "2", "3", "4") + + select { + case <-proceed: + return + case <-time.After(time.Second): + require.NoError(t, errors.New("timeout while waiting for filter results")) + } + + require.NoError(t, subs.removeAll()) + signal.ResetDefaultNodeNotificationHandler() +} + +func TestSubscriptionGetError(t *testing.T) { + filter := newMockFilter(filterID) + + subs := NewSubscriptions(time.Microsecond) + + subID, _ := subs.Create(filterNS, filter) + + require.Equal(t, string(subID), fmt.Sprintf("%s-%s", filterNS, filterID)) + + proceed := make(chan struct{}) + + expectedError := errors.New("test-error") + + signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) { + defer close(proceed) + validateFilterError(t, jsonEvent, string(subID), expectedError.Error()) + }) + + filter.setError(expectedError) + + select { + case <-proceed: + return + case <-time.After(time.Second): + require.NoError(t, errors.New("timeout while waiting for filter results")) + } + + require.NoError(t, subs.removeAll()) + signal.ResetDefaultNodeNotificationHandler() +} + +func TestSubscriptionRemove(t *testing.T) { + filter := newMockFilter(filterID) + + subs := NewSubscriptions(time.Microsecond) + + subID, _ := subs.Create(filterNS, filter) + + require.NoError(t, subs.Remove(subID)) + + require.True(t, filter.uninstalled) + require.Empty(t, subs.subs) +} + +func TestSubscriptionRemoveError(t *testing.T) { + filter := newMockFilter(filterID) + filter.uninstallError = errors.New("uninstall-error-1") + + subs := NewSubscriptions(time.Microsecond) + + subID, _ := subs.Create(filterNS, filter) + + require.Equal(t, subs.Remove(subID), filter.uninstallError) + + require.True(t, filter.uninstalled) + require.Equal(t, len(subs.subs), 0) +} + +func TestSubscriptionRemoveAll(t *testing.T) { + filter0 := newMockFilter(filterID) + filter1 := newMockFilter(filterID + "1") + + subs := NewSubscriptions(time.Microsecond) + _, err := subs.Create(filterNS, filter0) + require.NoError(t, err) + _, err = subs.Create(filterNS, filter1) + require.NoError(t, err) + + require.Equal(t, len(subs.subs), 2) + + require.NoError(t, subs.removeAll()) + + require.False(t, filter0.uninstalled) + require.False(t, filter1.uninstalled) + + require.Equal(t, len(subs.subs), 0) +} + +func validateFilterError(t *testing.T, jsonEvent string, expectedSubID string, expectedErrorMessage string) { + result := struct { + Event signal.SubscriptionErrorEvent `json:"event"` + Type string `json:"type"` + }{} + + require.NoError(t, json.Unmarshal([]byte(jsonEvent), &result)) + + require.Equal(t, signal.EventSubscriptionsError, result.Type) + require.Equal(t, expectedErrorMessage, result.Event.ErrorMessage) +} + +func validateFilterData(t *testing.T, jsonEvent string, expectedSubID string, expectedData ...interface{}) { + result := struct { + Event signal.SubscriptionDataEvent `json:"event"` + Type string `json:"type"` + }{} + + require.NoError(t, json.Unmarshal([]byte(jsonEvent), &result)) + + require.Equal(t, signal.EventSubscriptionsData, result.Type) + require.Equal(t, expectedData, result.Event.Data) + require.Equal(t, expectedSubID, result.Event.FilterID) + +} diff --git a/signal/events_subs.go b/signal/events_subs.go new file mode 100644 index 000000000..a0c88a8a6 --- /dev/null +++ b/signal/events_subs.go @@ -0,0 +1,33 @@ +package signal + +const ( + // EventSubscriptionsData is triggered when there is new data in any of the subscriptions + EventSubscriptionsData = "subscriptions.data" + // EventSubscriptionsError is triggered when subscriptions failed to get new data + EventSubscriptionsError = "subscriptions.error" +) + +type SubscriptionDataEvent struct { + FilterID string `json:"subscription_id"` + Data []interface{} `json:"data"` +} + +type SubscriptionErrorEvent struct { + FilterID string `json:"subscription_id"` + ErrorMessage string `json:"error_message"` +} + +// SendSubscriptionDataEvent +func SendSubscriptionDataEvent(filterID string, data []interface{}) { + send(EventSubscriptionsData, SubscriptionDataEvent{ + FilterID: filterID, + Data: data, + }) +} + +// SendSubscriptionErrorEvent +func SendSubscriptionErrorEvent(filterID string, err error) { + send(EventSubscriptionsError, SubscriptionErrorEvent{ + ErrorMessage: err.Error(), + }) +}