202 lines
4.8 KiB
Go
202 lines
4.8 KiB
Go
package subscriptions
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/status-im/status-go/signal"
|
|
)
|
|
|
|
// filterID is the identifier given to the mock filters used by these tests.
const filterID = "123"

// filterNS is the namespace under which the tests create subscriptions.
const filterNS = "tst"
|
|
|
|
// mockFilter is an in-memory filter used as a test double. The embedded
// mutex guards every field; the methods on *mockFilter take it before
// touching state.
type mockFilter struct {
	sync.Mutex

	// filterID is the identifier reported by getID.
	filterID string

	// data holds the changes handed out (and then cleared) by getChanges.
	data []interface{}
	// filterError, when non-nil, is returned once by getChanges and then reset.
	filterError error

	// uninstalled records that uninstall has been called.
	uninstalled bool
	// uninstallError is the value returned by uninstall.
	uninstallError error
}
|
|
|
|
func newMockFilter(filterID string) *mockFilter {
|
|
return &mockFilter{
|
|
filterID: filterID,
|
|
}
|
|
}
|
|
|
|
func (mf *mockFilter) getID() string {
|
|
mf.Lock()
|
|
defer mf.Unlock()
|
|
return mf.filterID
|
|
}
|
|
func (mf *mockFilter) getChanges() ([]interface{}, error) {
|
|
mf.Lock()
|
|
defer mf.Unlock()
|
|
|
|
if mf.filterError != nil {
|
|
err := mf.filterError
|
|
mf.filterError = nil
|
|
return nil, err
|
|
}
|
|
|
|
data := mf.data
|
|
mf.data = nil
|
|
return data, nil
|
|
}
|
|
|
|
func (mf *mockFilter) uninstall() error {
|
|
mf.Lock()
|
|
defer mf.Unlock()
|
|
mf.uninstalled = true
|
|
return mf.uninstallError
|
|
}
|
|
|
|
func (mf *mockFilter) setData(data ...interface{}) {
|
|
mf.Lock()
|
|
defer mf.Unlock()
|
|
mf.data = data
|
|
}
|
|
|
|
func (mf *mockFilter) setError(err error) {
|
|
mf.Lock()
|
|
defer mf.Unlock()
|
|
mf.data = nil
|
|
mf.filterError = err
|
|
}
|
|
|
|
func TestSubscriptionGetData(t *testing.T) {
|
|
filter := newMockFilter(filterID)
|
|
|
|
subs := NewSubscriptions(time.Microsecond)
|
|
|
|
subID, _ := subs.Create(filterNS, filter)
|
|
|
|
require.Equal(t, string(subID), fmt.Sprintf("%s-%s", filterNS, filterID))
|
|
|
|
proceed := make(chan struct{})
|
|
|
|
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
|
defer close(proceed)
|
|
validateFilterData(t, jsonEvent, string(subID), "1", "2", "3", "4")
|
|
})
|
|
|
|
filter.setData("1", "2", "3", "4")
|
|
|
|
select {
|
|
case <-proceed:
|
|
return
|
|
case <-time.After(time.Second):
|
|
require.NoError(t, errors.New("timeout while waiting for filter results"))
|
|
}
|
|
|
|
require.NoError(t, subs.removeAll())
|
|
signal.ResetDefaultNodeNotificationHandler()
|
|
}
|
|
|
|
func TestSubscriptionGetError(t *testing.T) {
|
|
filter := newMockFilter(filterID)
|
|
|
|
subs := NewSubscriptions(time.Microsecond)
|
|
|
|
subID, _ := subs.Create(filterNS, filter)
|
|
|
|
require.Equal(t, string(subID), fmt.Sprintf("%s-%s", filterNS, filterID))
|
|
|
|
proceed := make(chan struct{})
|
|
|
|
expectedError := errors.New("test-error")
|
|
|
|
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
|
|
defer close(proceed)
|
|
validateFilterError(t, jsonEvent, string(subID), expectedError.Error())
|
|
})
|
|
|
|
filter.setError(expectedError)
|
|
|
|
select {
|
|
case <-proceed:
|
|
return
|
|
case <-time.After(time.Second):
|
|
require.NoError(t, errors.New("timeout while waiting for filter results"))
|
|
}
|
|
|
|
require.NoError(t, subs.removeAll())
|
|
signal.ResetDefaultNodeNotificationHandler()
|
|
}
|
|
|
|
func TestSubscriptionRemove(t *testing.T) {
|
|
filter := newMockFilter(filterID)
|
|
subs := NewSubscriptions(time.Microsecond)
|
|
|
|
subID, err := subs.Create(filterNS, filter)
|
|
require.NoError(t, err)
|
|
time.Sleep(time.Millisecond * 100) // create starts in a goroutine
|
|
|
|
require.NoError(t, subs.Remove(subID))
|
|
require.True(t, filter.uninstalled)
|
|
require.Empty(t, subs.subs)
|
|
}
|
|
|
|
func TestSubscriptionRemoveError(t *testing.T) {
|
|
filter := newMockFilter(filterID)
|
|
filter.uninstallError = errors.New("uninstall-error-1")
|
|
|
|
subs := NewSubscriptions(time.Microsecond)
|
|
subID, err := subs.Create(filterNS, filter)
|
|
require.NoError(t, err)
|
|
time.Sleep(time.Millisecond * 100) // create starts in a goroutine
|
|
|
|
require.Equal(t, subs.Remove(subID), filter.uninstallError)
|
|
require.True(t, filter.uninstalled)
|
|
require.Equal(t, len(subs.subs), 0)
|
|
}
|
|
|
|
func TestSubscriptionRemoveAll(t *testing.T) {
|
|
filter0 := newMockFilter(filterID)
|
|
filter1 := newMockFilter(filterID + "1")
|
|
|
|
subs := NewSubscriptions(time.Microsecond)
|
|
_, err := subs.Create(filterNS, filter0)
|
|
require.NoError(t, err)
|
|
_, err = subs.Create(filterNS, filter1)
|
|
require.NoError(t, err)
|
|
time.Sleep(time.Millisecond * 100) // create starts in a goroutine
|
|
|
|
require.Equal(t, len(subs.subs), 2)
|
|
err = subs.removeAll()
|
|
require.NoError(t, err)
|
|
require.False(t, filter0.uninstalled)
|
|
require.False(t, filter1.uninstalled)
|
|
require.Equal(t, len(subs.subs), 0)
|
|
}
|
|
|
|
func validateFilterError(t *testing.T, jsonEvent string, expectedSubID string, expectedErrorMessage string) {
|
|
result := struct {
|
|
Event signal.SubscriptionErrorEvent `json:"event"`
|
|
Type string `json:"type"`
|
|
}{}
|
|
require.NoError(t, json.Unmarshal([]byte(jsonEvent), &result))
|
|
require.Equal(t, signal.EventSubscriptionsError, result.Type)
|
|
require.Equal(t, expectedErrorMessage, result.Event.ErrorMessage)
|
|
}
|
|
|
|
func validateFilterData(t *testing.T, jsonEvent string, expectedSubID string, expectedData ...interface{}) {
|
|
result := struct {
|
|
Event signal.SubscriptionDataEvent `json:"event"`
|
|
Type string `json:"type"`
|
|
}{}
|
|
require.NoError(t, json.Unmarshal([]byte(jsonEvent), &result))
|
|
require.Equal(t, signal.EventSubscriptionsData, result.Type)
|
|
require.Equal(t, expectedData, result.Event.Data)
|
|
require.Equal(t, expectedSubID, result.Event.FilterID)
|
|
}
|