From b81bd6ff308a98ce8f93c0303efca6f06c5bc3ba Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Tue, 9 Nov 2021 15:15:45 +0100 Subject: [PATCH] feat: Add rpc store Add test for rpc services --- waku/v2/rpc/coded_test.go | 2 -- waku/v2/rpc/debug_test.go | 22 ++++++++++++ waku/v2/rpc/relay_test.go | 55 +++++++++++++++++++++++++++++ waku/v2/rpc/store.go | 68 ++++++++++++++++++++++++++++++++++++ waku/v2/rpc/store_test.go | 32 +++++++++++++++++ waku/v2/rpc/util_test.go | 15 ++++++++ waku/v2/rpc/waku_rpc.go | 5 +++ waku/v2/rpc/waku_rpc_test.go | 19 ++++++++++ 8 files changed, 216 insertions(+), 2 deletions(-) create mode 100644 waku/v2/rpc/debug_test.go create mode 100644 waku/v2/rpc/relay_test.go create mode 100644 waku/v2/rpc/store.go create mode 100644 waku/v2/rpc/store_test.go create mode 100644 waku/v2/rpc/util_test.go create mode 100644 waku/v2/rpc/waku_rpc_test.go diff --git a/waku/v2/rpc/coded_test.go b/waku/v2/rpc/coded_test.go index a6ad73a2..7f593c32 100644 --- a/waku/v2/rpc/coded_test.go +++ b/waku/v2/rpc/coded_test.go @@ -1,7 +1,5 @@ package rpc -// - import ( "testing" diff --git a/waku/v2/rpc/debug_test.go b/waku/v2/rpc/debug_test.go new file mode 100644 index 00000000..e84696d0 --- /dev/null +++ b/waku/v2/rpc/debug_test.go @@ -0,0 +1,22 @@ +package rpc + +import ( + "bytes" + "net/http" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGetV1Info(t *testing.T) { + var reply InfoReply + + request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte(""))) + require.NoError(t, err) + + d := &DebugService{nil} + + err = d.GetV1Info(request, &InfoArgs{}, &reply) + require.NoError(t, err) + require.Equal(t, "2.0", reply.Version) +} diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go new file mode 100644 index 00000000..83342adc --- /dev/null +++ b/waku/v2/rpc/relay_test.go @@ -0,0 +1,55 @@ +package rpc + +import ( + "context" + "testing" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/stretchr/testify/require" +) + +func makeRelayService(t *testing.T) *RelayService { + options := node.WithWakuRelay() + n, err := node.New(context.Background(), options) + require.NoError(t, err) + err = n.Start() + require.NoError(t, err) + return &RelayService{n} +} + +func TestPostV1Message(t *testing.T) { + var reply SuccessReply + + d := makeRelayService(t) + + err := d.PostV1Message( + makeRequest(t), + &RelayMessageArgs{}, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) +} + +func TestSubscription(t *testing.T) { + var reply SuccessReply + + d := makeRelayService(t) + args := &TopicsArgs{Topics: []string{"test"}} + + err := d.PostV1Subscription( + makeRequest(t), + args, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) + + err = d.DeleteV1Subscription( + makeRequest(t), + args, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) +} diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go new file mode 100644 index 00000000..7354f123 --- /dev/null +++ b/waku/v2/rpc/store.go @@ -0,0 +1,68 @@ +package rpc + +import ( + "net/http" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/pb" + "github.com/status-im/go-waku/waku/v2/protocol/store" +) + +type StoreService struct { + node *node.WakuNode +} + +// cursor *pb.Index +// pageSize uint64 +// asc bool + +type StorePagingOptions struct { + PageSize uint64 `json:"pageSize,omitempty"` + Cursor *pb.Index `json:"cursor,omitempty"` + Forward bool `json:"forward,omitempty"` +} + +type StoreMessagesArgs struct { + Topic string `json:"pubsubTopic,omitempty"` + ContentFilters []string `json:"contentFilters,omitempty"` + StartTime float64 `json:"startTime,omitempty"` + EndTime float64 `json:"endTime,omitempty"` + PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"` +} + +type StoreMessagesReply struct { + Messages []*pb.WakuMessage `json:"messages,omitempty"` + PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"` + Error string `json:"error,omitempty"` +} + +func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, reply *StoreMessagesReply) error { + options := []store.HistoryRequestOption{ + store.WithAutomaticRequestId(), + store.WithAutomaticPeerSelection(), + store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize), + store.WithCursor(args.PagingOptions.Cursor), + } + res, err := s.node.Store().Query( + req.Context(), + store.Query{ + Topic: args.Topic, + ContentTopics: args.ContentFilters, + StartTime: args.StartTime, + EndTime: args.EndTime, + }, + options..., + ) + if err != nil { + log.Error("Error querying messages:", err) + reply.Error = err.Error() + return nil + } + reply.Messages = res.Messages + reply.PagingInfo = StorePagingOptions{ + PageSize: args.PagingOptions.PageSize, + Cursor: res.Cursor(), + Forward: args.PagingOptions.Forward, + } + return nil +} diff --git a/waku/v2/rpc/store_test.go b/waku/v2/rpc/store_test.go new file mode 100644 index 00000000..4f25b02e --- /dev/null +++ b/waku/v2/rpc/store_test.go @@ -0,0 +1,32 @@ +package rpc + +import ( + "context" + "testing" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/stretchr/testify/require" +) + +func makeStoreService(t *testing.T) *StoreService { + options := node.WithWakuStore(false, false) + n, err := node.New(context.Background(), options) + require.NoError(t, err) + err = n.Start() + require.NoError(t, err) + return &StoreService{n} +} + +func TestGetV1Message(t *testing.T) { + var reply StoreMessagesReply + + s := makeStoreService(t) + + err := s.GetV1Messages( + makeRequest(t), + &StoreMessagesArgs{}, + &reply, + ) + require.NoError(t, err) + require.NotEmpty(t, reply.Error) +} diff --git a/waku/v2/rpc/util_test.go b/waku/v2/rpc/util_test.go new file mode 100644 index 00000000..adf0abd4 --- /dev/null +++ b/waku/v2/rpc/util_test.go @@ -0,0 +1,15 @@ +package rpc + +import ( + "bytes" + "net/http" + "testing" + + "github.com/stretchr/testify/require" +) + +func makeRequest(t *testing.T) *http.Request { + request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte(""))) + require.NoError(t, err) + return request +} diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index 21f9f2f2..a4b9fa4a 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -33,6 +33,11 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { log.Error(err) } + err = s.RegisterService(&StoreService{node}, "Store") + if err != nil { + log.Error(err) + } + mux := http.NewServeMux() mux.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) { t := time.Now() diff --git a/waku/v2/rpc/waku_rpc_test.go b/waku/v2/rpc/waku_rpc_test.go new file mode 100644 index 00000000..1cec40e8 --- /dev/null +++ b/waku/v2/rpc/waku_rpc_test.go @@ -0,0 +1,19 @@ +package rpc + +import ( + "context" + "testing" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/stretchr/testify/require" +) + +func TestWakuRpc(t *testing.T) { + options := node.WithWakuStore(false, false) + n, err := node.New(context.Background(), options) + require.NoError(t, err) + + rpc := NewWakuRpc(n, "127.0.0.1", 8080) + require.NotNil(t, rpc.server) + require.Equal(t, rpc.server.Addr, "127.0.0.1:8080") +}