feat(rest): store

This commit is contained in:
Richard Ramos 2023-04-10 16:39:49 -04:00 committed by RichΛrd
parent bbb558e685
commit 5de3d9f619
6 changed files with 551 additions and 0 deletions

212
waku/v2/rest/store.go Normal file
View File

@ -0,0 +1,212 @@
package rest
import (
"context"
"encoding/base64"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
type StoreService struct {
node *node.WakuNode
mux *chi.Mux
}
type StoreResponse struct {
Messages []StoreWakuMessage `json:"messages"`
Cursor *HistoryCursor `json:"cursor,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}
type HistoryCursor struct {
PubsubTopic string `json:"pubsub_topic"`
SenderTime string `json:"sender_time"`
StoreTime string `json:"store_time"`
Digest []byte `json:"digest"`
}
type StoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"content_topic"`
Version int32 `json:"version"`
Timestamp int64 `json:"timestamp"`
Meta []byte `json:"meta"`
}
const ROUTE_STORE_MESSAGESV1 = "/store/v1/messages"
func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
s := &StoreService{
node: node,
mux: m,
}
m.Get(ROUTE_STORE_MESSAGESV1, s.getV1Messages)
return s
}
func getStoreParams(r *http.Request) (multiaddr.Multiaddr, *store.Query, []store.HistoryRequestOption, error) {
query := &store.Query{}
var options []store.HistoryRequestOption
peerAddrStr := r.URL.Query().Get("peerAddr")
m, err := multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, nil, err
}
peerID, err := utils.GetPeerID(m)
if err != nil {
return nil, nil, nil, err
}
options = append(options, store.WithPeer(peerID))
query.Topic = r.URL.Query().Get("pubsubTopic")
contentTopics := r.URL.Query().Get("contentTopics")
if contentTopics != "" {
query.ContentTopics = strings.Split(contentTopics, ",")
}
startTimeStr := r.URL.Query().Get("startTime")
if startTimeStr != "" {
query.StartTime, err = strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
}
endTimeStr := r.URL.Query().Get("endTime")
if endTimeStr != "" {
query.EndTime, err = strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
}
var cursor *pb.Index
senderTimeStr := r.URL.Query().Get("senderTime")
storeTimeStr := r.URL.Query().Get("storeTime")
digestStr := r.URL.Query().Get("digest")
if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" {
cursor = &pb.Index{}
if senderTimeStr != "" {
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
}
if storeTimeStr != "" {
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
}
if digestStr != "" {
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
if err != nil {
return nil, nil, nil, err
}
}
cursor.PubsubTopic = query.Topic
options = append(options, store.WithCursor(cursor))
}
pageSizeStr := r.URL.Query().Get("pageSize")
ascendingStr := r.URL.Query().Get("ascending")
if ascendingStr != "" || pageSizeStr != "" {
ascending := true
pageSize := uint64(store.MaxPageSize)
if ascendingStr != "" {
ascending, err = strconv.ParseBool(ascendingStr)
if err != nil {
return nil, nil, nil, err
}
}
if pageSizeStr != "" {
pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64)
if err != nil {
return nil, nil, nil, err
}
}
options = append(options, store.WithPaging(ascending, pageSize))
}
return m, query, options, nil
}
func writeStoreError(w http.ResponseWriter, code int, err error) {
writeResponse(w, StoreResponse{ErrorMessage: err.Error()}, code)
}
func toStoreResponse(result *store.Result) StoreResponse {
response := StoreResponse{}
cursor := result.Cursor()
if cursor != nil {
response.Cursor = &HistoryCursor{
PubsubTopic: cursor.PubsubTopic,
SenderTime: fmt.Sprintf("%d", cursor.SenderTime),
StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime),
Digest: cursor.Digest,
}
}
for _, m := range result.Messages {
response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: int32(m.Version),
Timestamp: m.Timestamp,
Meta: m.Meta,
})
}
return response
}
func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
peerAddr, query, options, err := getStoreParams(r)
if err != nil {
writeStoreError(w, http.StatusBadRequest, err)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
_, err = d.node.AddPeer(peerAddr)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
writeErrOrResponse(w, nil, toStoreResponse(result))
}

203
waku/v2/rest/store_api.yaml Normal file
View File

@ -0,0 +1,203 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: store
description: Store REST API for WakuV2 node
paths:
/store/v1/messages:
get:
summary: Gets message history
description: >
Retrieves WakuV2 message history. The returned history
can be potentially filtered by optional request parameters.
operationId: getMessageHistory
tags:
- store
parameters:
- name: peerAddr
in: query
schema:
type: string
required: true
description: >
P2P fully qualified peer multiaddress
in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded.
example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN'
- name: pubsubTopic
in: query
schema:
type: string
description: >
The pubsub topic on which a WakuMessage is published.
If left empty, no filtering is applied.
It is also intended for pagination purposes.
It should be a URL-encoded string.
example: 'my%20pubsub%20topic'
- name: contentTopics
in: query
schema: string
description: >
Comma-separated list of content topics. When specified,
only WakuMessages that are linked to any of the given
content topics will be delivered in the get response.
It should be a URL-encoded-comma-separated string.
example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic'
- name: startTime
in: query
schema:
type: string
description: >
The inclusive lower bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: endTime
in: query
schema:
type: string
description: >
The inclusive upper bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: senderTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was generated.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590947000000000'
- name: storeTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was stored.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590945000000000'
- name: digest
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
URL-base64-encoded string computed as a hash of the
a message content topic plus a message payload.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D'
- name: pageSize
in: query
schema:
type: string
description: >
Number of messages to retrieve per page
example: '5'
- name: ascending
in: query
schema:
type: string
description: >
"true" for paging forward, "false" for paging backward
example: "true"
responses:
'200':
description: WakuV2 message history.
content:
application/json:
schema:
$ref: '#/components/schemas/StoreResponse'
'400':
description: Bad request error.
content:
text/plain:
type: string
'412':
description: Precondition failed.
content:
text/plain:
type: string
'500':
description: Internal server error.
content:
text/plain:
type: string
components:
schemas:
StoreResponse:
type: object
properties:
messages:
type: array
items:
$ref: '#/components/schemas/WakuMessage'
cursor:
$ref: '#/components/schemas/HistoryCursor'
error_message:
type: string
required:
- messages
HistoryCursor:
type: object
properties:
pubsub_topic:
type: string
sender_time:
type: string
store_time:
type: string
digest:
type: string
required:
- pubsub_topic
- sender_time
- store_time
- digest
WakuMessage:
type: object
properties:
payload:
type: string
content_topic:
type: string
version:
type: integer
format: int32
timestamp:
type: integer
format: int64
ephemeral:
type: boolean
required:
- payload
- content_topic

View File

@ -0,0 +1,95 @@
package rest
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestGetMessages(t *testing.T) {
db := MemoryDB(t)
node1, err := node.New(node.WithWakuStore(), node.WithMessageProvider(db))
require.NoError(t, err)
err = node1.Start(context.Background())
require.NoError(t, err)
defer node1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
msg1 := tests.CreateWakuMessage(topic1, 1)
msg2 := tests.CreateWakuMessage(topic1, 2)
msg3 := tests.CreateWakuMessage(topic1, 3)
node1.Store().MessageChannel() <- protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), pubsubTopic1)
node1.Store().MessageChannel() <- protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), pubsubTopic1)
node1.Store().MessageChannel() <- protocol.NewEnvelope(msg3, utils.GetUnixEpoch(), pubsubTopic1)
n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().Pretty()))
n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo)
node2, err := node.New()
require.NoError(t, err)
err = node2.Start(context.Background())
require.NoError(t, err)
defer node2.Stop()
router := chi.NewRouter()
_ = NewStoreService(node2, router)
// TEST: get cursor
// TEST: get no messages
// First page
rr := httptest.NewRecorder()
queryParams := url.Values{
"peerAddr": {n1Addr.String()},
"pubsubTopic": {pubsubTopic1},
"pageSize": {"2"},
}
path := ROUTE_STORE_MESSAGESV1 + "?" + queryParams.Encode()
req, _ := http.NewRequest(http.MethodGet, path, nil)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
response := StoreResponse{}
err = json.Unmarshal(rr.Body.Bytes(), &response)
require.NoError(t, err)
require.Len(t, response.Messages, 2)
// Second page
rr = httptest.NewRecorder()
queryParams = url.Values{
"peerAddr": {n1Addr.String()},
"pubsubTopic": {pubsubTopic1},
"senderTime": {response.Cursor.SenderTime},
"storeTime": {response.Cursor.StoreTime},
"digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)},
"pageSize": {"2"},
}
path = ROUTE_STORE_MESSAGESV1 + "?" + queryParams.Encode()
req, _ = http.NewRequest(http.MethodGet, path, nil)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
response = StoreResponse{}
err = json.Unmarshal(rr.Body.Bytes(), &response)
require.NoError(t, err)
require.Len(t, response.Messages, 1)
require.Nil(t, response.Cursor)
}

View File

@ -25,3 +25,20 @@ func writeErrOrResponse(w http.ResponseWriter, err error, value interface{}) {
return
}
}
func writeResponse(w http.ResponseWriter, value interface{}, code int) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
jsonResponse, err := json.Marshal(value)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
_, err = w.Write(jsonResponse)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(code)
}

View File

@ -0,0 +1,22 @@
package rest
import (
"database/sql"
"testing"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, migration, err := sqlite.NewDB(":memory:")
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(migration))
require.NoError(t, err)
return dbStore
}

View File

@ -35,6 +35,8 @@ func NewWakuRest(node *node.WakuNode, address string, port int, enableAdmin bool
_ = NewDebugService(node, mux)
_ = NewStoreService(node, mux)
listenAddr := fmt.Sprintf("%s:%d", address, port)
server := &http.Server{