From 5de3d9f619a95ac186089f8c67e7ead43bb2d673 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Mon, 10 Apr 2023 16:39:49 -0400 Subject: [PATCH] feat(rest): store --- waku/v2/rest/store.go | 212 ++++++++++++++++++++++++++++++++++++ waku/v2/rest/store_api.yaml | 203 ++++++++++++++++++++++++++++++++++ waku/v2/rest/store_test.go | 95 ++++++++++++++++ waku/v2/rest/utils.go | 17 +++ waku/v2/rest/utils_test.go | 22 ++++ waku/v2/rest/waku_rest.go | 2 + 6 files changed, 551 insertions(+) create mode 100644 waku/v2/rest/store.go create mode 100644 waku/v2/rest/store_api.yaml create mode 100644 waku/v2/rest/store_test.go create mode 100644 waku/v2/rest/utils_test.go diff --git a/waku/v2/rest/store.go b/waku/v2/rest/store.go new file mode 100644 index 00000000..c300592f --- /dev/null +++ b/waku/v2/rest/store.go @@ -0,0 +1,212 @@ +package rest + +import ( + "context" + "encoding/base64" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/go-chi/chi/v5" + "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/protocol/store/pb" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +type StoreService struct { + node *node.WakuNode + mux *chi.Mux +} + +type StoreResponse struct { + Messages []StoreWakuMessage `json:"messages"` + Cursor *HistoryCursor `json:"cursor,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type HistoryCursor struct { + PubsubTopic string `json:"pubsub_topic"` + SenderTime string `json:"sender_time"` + StoreTime string `json:"store_time"` + Digest []byte `json:"digest"` +} + +type StoreWakuMessage struct { + Payload []byte `json:"payload"` + ContentTopic string `json:"content_topic"` + Version int32 `json:"version"` + Timestamp int64 `json:"timestamp"` + Meta []byte `json:"meta"` +} + +const ROUTE_STORE_MESSAGESV1 = "/store/v1/messages" + +func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService { + s := &StoreService{ + node: node, + mux: m, + } + + m.Get(ROUTE_STORE_MESSAGESV1, s.getV1Messages) + + return s +} + +func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) { + query := &store.Query{} + var options []store.HistoryRequestOption + + peerAddrStr := r.URL.Query().Get("peerAddr") + m, err := multiaddr.NewMultiaddr(peerAddrStr) + if err != nil { + return nil, nil, nil, err + } + + peerID, err := utils.GetPeerID(m) + if err != nil { + return nil, nil, nil, err + } + + options = append(options, store.WithPeer(peerID)) + + query.Topic = r.URL.Query().Get("pubsubTopic") + + contentTopics := r.URL.Query().Get("contentTopics") + if contentTopics != "" { + query.ContentTopics = strings.Split(contentTopics, ",") + } + + startTimeStr := r.URL.Query().Get("startTime") + if startTimeStr != "" { + query.StartTime, err = strconv.ParseInt(startTimeStr, 10, 64) + if err != nil { + return nil, nil, nil, err + } + } + + endTimeStr := r.URL.Query().Get("endTime") + if endTimeStr != "" { + query.EndTime, err = strconv.ParseInt(endTimeStr, 10, 64) + if err != nil { + return nil, nil, nil, err + } + } + + var cursor *pb.Index + + senderTimeStr := r.URL.Query().Get("senderTime") + storeTimeStr := r.URL.Query().Get("storeTime") + digestStr := r.URL.Query().Get("digest") + + if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" { + cursor = &pb.Index{} + + if senderTimeStr != "" { + cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64) + if err != nil { + return nil, nil, nil, err + } + } + + if storeTimeStr != "" { + cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64) + if err != nil { + return nil, nil, nil, err + } + } + + if digestStr != "" { + cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr) + if err != nil { + return nil, nil, nil, err + } + } + + cursor.PubsubTopic = query.Topic + + options = append(options, store.WithCursor(cursor)) + } + + pageSizeStr := r.URL.Query().Get("pageSize") + ascendingStr := r.URL.Query().Get("ascending") + if ascendingStr != "" || pageSizeStr != "" { + ascending := true + pageSize := uint64(store.MaxPageSize) + if ascendingStr != "" { + ascending, err = strconv.ParseBool(ascendingStr) + if err != nil { + return nil, nil, nil, err + } + } + + if pageSizeStr != "" { + pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64) + if err != nil { + return nil, nil, nil, err + } + } + + options = append(options, store.WithPaging(ascending, pageSize)) + } + + return m, query, options, nil +} + +func writeStoreError(w http.ResponseWriter, code int, err error) { + writeResponse(w, StoreResponse{ErrorMessage: err.Error()}, code) +} + +func toStoreResponse(result *store.Result) StoreResponse { + response := StoreResponse{} + + cursor := result.Cursor() + if cursor != nil { + response.Cursor = &HistoryCursor{ + PubsubTopic: cursor.PubsubTopic, + SenderTime: fmt.Sprintf("%d", cursor.SenderTime), + StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime), + Digest: cursor.Digest, + } + } + + for _, m := range result.Messages { + response.Messages = append(response.Messages, StoreWakuMessage{ + Payload: m.Payload, + ContentTopic: m.ContentTopic, + Version: int32(m.Version), + Timestamp: m.Timestamp, + Meta: m.Meta, + }) + } + + return response +} + +func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) { + peerAddr, query, options, err := getStoreParams(r) + if err != nil { + writeStoreError(w, http.StatusBadRequest, err) + return + } + + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + _, err = d.node.AddPeer(peerAddr) + if err != nil { + writeStoreError(w, http.StatusInternalServerError, err) + return + } + + result, err := d.node.Store().Query(ctx, *query, options...) + if err != nil { + writeStoreError(w, http.StatusInternalServerError, err) + return + } + + writeErrOrResponse(w, nil, toStoreResponse(result)) +} diff --git a/waku/v2/rest/store_api.yaml b/waku/v2/rest/store_api.yaml new file mode 100644 index 00000000..cbd653ca --- /dev/null +++ b/waku/v2/rest/store_api.yaml @@ -0,0 +1,203 @@ +openapi: 3.0.3 +info: + title: Waku V2 node REST API + version: 1.0.0 + contact: + name: VAC Team + url: https://forum.vac.dev/ + +tags: + - name: store + description: Store REST API for WakuV2 node + +paths: + /store/v1/messages: + get: + summary: Gets message history + description: > + Retrieves WakuV2 message history. The returned history + can be potentially filtered by optional request parameters. + operationId: getMessageHistory + tags: + - store + parameters: + - name: peerAddr + in: query + schema: + type: string + required: true + description: > + P2P fully qualified peer multiaddress + in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded. + example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN' + + - name: pubsubTopic + in: query + schema: + type: string + description: > + The pubsub topic on which a WakuMessage is published. + If left empty, no filtering is applied. + It is also intended for pagination purposes. + It should be a URL-encoded string. + example: 'my%20pubsub%20topic' + + - name: contentTopics + in: query + schema: string + description: > + Comma-separated list of content topics. When specified, + only WakuMessages that are linked to any of the given + content topics will be delivered in the get response. + It should be a URL-encoded-comma-separated string. + example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic' + + - name: startTime + in: query + schema: + type: string + description: > + The inclusive lower bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' + + - name: endTime + in: query + schema: + type: string + description: > + The inclusive upper bound on the timestamp of + queried WakuMessages. This field holds the + Unix epoch time in nanoseconds as a 64-bits + integer value. + example: '1680590945000000000' + + - name: senderTime + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + Represents the Unix time in nanoseconds at which a message was generated. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: '1680590947000000000' + + - name: storeTime + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + Represents the Unix time in nanoseconds at which a message was stored. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: '1680590945000000000' + + - name: digest + in: query + schema: + type: string + description: > + Cursor field intended for pagination purposes. + URL-base64-encoded string computed as a hash of the + a message content topic plus a message payload. + It could be empty for retrieving the first page, + and will be returned from the GET response so that + it can be part of the next page request. + example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D' + + - name: pageSize + in: query + schema: + type: string + description: > + Number of messages to retrieve per page + example: '5' + + - name: ascending + in: query + schema: + type: string + description: > + "true" for paging forward, "false" for paging backward + example: "true" + + responses: + '200': + description: WakuV2 message history. + content: + application/json: + schema: + $ref: '#/components/schemas/StoreResponse' + '400': + description: Bad request error. + content: + text/plain: + type: string + '412': + description: Precondition failed. + content: + text/plain: + type: string + '500': + description: Internal server error. + content: + text/plain: + type: string + +components: + schemas: + StoreResponse: + type: object + properties: + messages: + type: array + items: + $ref: '#/components/schemas/WakuMessage' + cursor: + $ref: '#/components/schemas/HistoryCursor' + error_message: + type: string + required: + - messages + + HistoryCursor: + type: object + properties: + pubsub_topic: + type: string + sender_time: + type: string + store_time: + type: string + digest: + type: string + required: + - pubsub_topic + - sender_time + - store_time + - digest + + WakuMessage: + type: object + properties: + payload: + type: string + content_topic: + type: string + version: + type: integer + format: int32 + timestamp: + type: integer + format: int64 + ephemeral: + type: boolean + required: + - payload + - content_topic diff --git a/waku/v2/rest/store_test.go b/waku/v2/rest/store_test.go new file mode 100644 index 00000000..e9d19f77 --- /dev/null +++ b/waku/v2/rest/store_test.go @@ -0,0 +1,95 @@ +package rest + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/go-chi/chi/v5" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/tests" + "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func TestGetMessages(t *testing.T) { + + db := MemoryDB(t) + + node1, err := node.New(node.WithWakuStore(), node.WithMessageProvider(db)) + require.NoError(t, err) + err = node1.Start(context.Background()) + require.NoError(t, err) + defer node1.Stop() + + topic1 := "1" + pubsubTopic1 := "topic1" + + msg1 := tests.CreateWakuMessage(topic1, 1) + msg2 := tests.CreateWakuMessage(topic1, 2) + msg3 := tests.CreateWakuMessage(topic1, 3) + + node1.Store().MessageChannel() <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1) + node1.Store().MessageChannel() <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1) + node1.Store().MessageChannel() <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1) + + n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().Pretty())) + n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo) + + node2, err := node.New() + require.NoError(t, err) + err = node2.Start(context.Background()) + require.NoError(t, err) + defer node2.Stop() + router := chi.NewRouter() + + _ = NewStoreService(node2, router) + + // TEST: get cursor + // TEST: get no messages + + // First page + rr := httptest.NewRecorder() + queryParams := url.Values{ + "peerAddr": {n1Addr.String()}, + "pubsubTopic": {pubsubTopic1}, + "pageSize": {"2"}, + } + path := ROUTE_STORE_MESSAGESV1 + "?" + queryParams.Encode() + req, _ := http.NewRequest(http.MethodGet, path, nil) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + response := StoreResponse{} + err = json.Unmarshal(rr.Body.Bytes(), &response) + require.NoError(t, err) + require.Len(t, response.Messages, 2) + + // Second page + rr = httptest.NewRecorder() + queryParams = url.Values{ + "peerAddr": {n1Addr.String()}, + "pubsubTopic": {pubsubTopic1}, + "senderTime": {response.Cursor.SenderTime}, + "storeTime": {response.Cursor.StoreTime}, + "digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)}, + "pageSize": {"2"}, + } + path = ROUTE_STORE_MESSAGESV1 + "?" + queryParams.Encode() + req, _ = http.NewRequest(http.MethodGet, path, nil) + router.ServeHTTP(rr, req) + require.Equal(t, http.StatusOK, rr.Code) + + response = StoreResponse{} + err = json.Unmarshal(rr.Body.Bytes(), &response) + require.NoError(t, err) + require.Len(t, response.Messages, 1) + require.Nil(t, response.Cursor) +} diff --git a/waku/v2/rest/utils.go b/waku/v2/rest/utils.go index f1fde64e..7d549d71 100644 --- a/waku/v2/rest/utils.go +++ b/waku/v2/rest/utils.go @@ -25,3 +25,20 @@ func writeErrOrResponse(w http.ResponseWriter, err error, value interface{}) { return } } + +func writeResponse(w http.ResponseWriter, value interface{}, code int) { + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + jsonResponse, err := json.Marshal(value) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + _, err = w.Write(jsonResponse) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(code) +} diff --git a/waku/v2/rest/utils_test.go b/waku/v2/rest/utils_test.go new file mode 100644 index 00000000..fe3fdd18 --- /dev/null +++ b/waku/v2/rest/utils_test.go @@ -0,0 +1,22 @@ +package rest + +import ( + "database/sql" + "testing" + + "github.com/stretchr/testify/require" + "github.com/waku-org/go-waku/waku/persistence" + "github.com/waku-org/go-waku/waku/persistence/sqlite" + "github.com/waku-org/go-waku/waku/v2/utils" +) + +func MemoryDB(t *testing.T) *persistence.DBStore { + var db *sql.DB + db, migration, err := sqlite.NewDB(":memory:") + require.NoError(t, err) + + dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration)) + require.NoError(t, err) + + return dbStore +} diff --git a/waku/v2/rest/waku_rest.go b/waku/v2/rest/waku_rest.go index fc98d18b..fcf2ddd2 100644 --- a/waku/v2/rest/waku_rest.go +++ b/waku/v2/rest/waku_rest.go @@ -35,6 +35,8 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enableAdmin bool _ = NewDebugService(node, mux) + _ = NewStoreService(node, mux) + listenAddr := fmt.Sprintf("%s:%d", address, port) server := &http.Server{