feat(REST): storev3 client (#1100)

This commit is contained in:
richΛrd 2024-05-14 09:06:11 -04:00 committed by GitHub
parent e8dc887c6f
commit 7028a0b1cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 666 additions and 335 deletions

View File

@ -0,0 +1,212 @@
package rest
import (
"context"
"encoding/base64"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
)
type LegacyStoreService struct {
node *node.WakuNode
mux *chi.Mux
}
type LegacyStoreResponse struct {
Messages []LegacyStoreWakuMessage `json:"messages"`
Cursor *LegacyHistoryCursor `json:"cursor,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}
type LegacyHistoryCursor struct {
PubsubTopic string `json:"pubsubTopic"`
SenderTime string `json:"senderTime"`
StoreTime string `json:"storeTime"`
Digest []byte `json:"digest"`
}
type LegacyStoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}
const routeLegacyStoreMessagesV1 = "/store/v1/messages"
func NewLegacyStoreService(node *node.WakuNode, m *chi.Mux) *LegacyStoreService {
s := &LegacyStoreService{
node: node,
mux: m,
}
m.Get(routeLegacyStoreMessagesV1, s.getV1Messages)
return s
}
func getLegacyStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.HistoryRequestOption, error) {
query := &legacy_store.Query{}
var options []legacy_store.HistoryRequestOption
var err error
peerAddrStr := r.URL.Query().Get("peerAddr")
var m multiaddr.Multiaddr
if peerAddrStr != "" {
m, err = multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, err
}
options = append(options, legacy_store.WithPeerAddr(m))
} else {
// The user didn't specify a peer address and self-node is configured as a store node.
// In this case we assume that the user is willing to retrieve the messages stored by
// the local/self store node.
options = append(options, legacy_store.WithLocalQuery())
}
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")
contentTopics := r.URL.Query().Get("contentTopics")
if contentTopics != "" {
query.ContentTopics = strings.Split(contentTopics, ",")
}
startTimeStr := r.URL.Query().Get("startTime")
if startTimeStr != "" {
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
query.StartTime = &startTime
}
endTimeStr := r.URL.Query().Get("endTime")
if endTimeStr != "" {
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
query.EndTime = &endTime
}
var cursor *pb.Index
senderTimeStr := r.URL.Query().Get("senderTime")
storeTimeStr := r.URL.Query().Get("storeTime")
digestStr := r.URL.Query().Get("digest")
if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" {
cursor = &pb.Index{}
if senderTimeStr != "" {
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
}
if storeTimeStr != "" {
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
}
if digestStr != "" {
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
if err != nil {
return nil, nil, err
}
}
cursor.PubsubTopic = query.PubsubTopic
options = append(options, legacy_store.WithCursor(cursor))
}
pageSizeStr := r.URL.Query().Get("pageSize")
ascendingStr := r.URL.Query().Get("ascending")
if ascendingStr != "" || pageSizeStr != "" {
ascending := true
pageSize := uint64(legacy_store.DefaultPageSize)
if ascendingStr != "" {
ascending, err = strconv.ParseBool(ascendingStr)
if err != nil {
return nil, nil, err
}
}
if pageSizeStr != "" {
pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64)
if err != nil {
return nil, nil, err
}
if pageSize > legacy_store.MaxPageSize {
pageSize = legacy_store.MaxPageSize
}
}
options = append(options, legacy_store.WithPaging(ascending, pageSize))
}
return query, options, nil
}
func writeLegacyStoreError(w http.ResponseWriter, code int, err error) {
writeResponse(w, LegacyStoreResponse{ErrorMessage: err.Error()}, code)
}
func toLegacyStoreResponse(result *legacy_store.Result) LegacyStoreResponse {
response := LegacyStoreResponse{}
cursor := result.Cursor()
if cursor != nil {
response.Cursor = &LegacyHistoryCursor{
PubsubTopic: cursor.PubsubTopic,
SenderTime: fmt.Sprintf("%d", cursor.SenderTime),
StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime),
Digest: cursor.Digest,
}
}
for _, m := range result.Messages {
response.Messages = append(response.Messages, LegacyStoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: m.Version,
Timestamp: m.Timestamp,
Meta: m.Meta,
})
}
return response
}
func (d *LegacyStoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
query, options, err := getLegacyStoreParams(r)
if err != nil {
writeLegacyStoreError(w, http.StatusBadRequest, err)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
result, err := d.node.LegacyStore().Query(ctx, *query, options...)
if err != nil {
writeLegacyStoreError(w, http.StatusInternalServerError, err)
return
}
writeErrOrResponse(w, nil, toLegacyStoreResponse(result))
}

View File

@ -0,0 +1,203 @@
openapi: 3.0.3
info:
title: Waku V2 node Store REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: store
description: Store REST API for WakuV2 node
paths:
/store/v1/messages:
get:
summary: Gets message history
description: >
Retrieves WakuV2 message history. The returned history
can be potentially filtered by optional request parameters.
operationId: getMessageHistory
tags:
- store
parameters:
- name: peerAddr
in: query
schema:
type: string
required: true
description: >
P2P fully qualified peer multiaddress
in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded.
example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN'
- name: pubsubTopic
in: query
schema:
type: string
description: >
The pubsub topic on which a WakuMessage is published.
If left empty, no filtering is applied.
It is also intended for pagination purposes.
It should be a URL-encoded string.
example: 'my%20pubsub%20topic'
- name: contentTopics
in: query
schema: string
description: >
Comma-separated list of content topics. When specified,
only WakuMessages that are linked to any of the given
content topics will be delivered in the get response.
It should be a URL-encoded-comma-separated string.
example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic'
- name: startTime
in: query
schema:
type: string
description: >
The inclusive lower bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: endTime
in: query
schema:
type: string
description: >
The inclusive upper bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: senderTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was generated.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590947000000000'
- name: storeTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was stored.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590945000000000'
- name: digest
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
URL-base64-encoded string computed as a hash of the
a message content topic plus a message payload.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D'
- name: pageSize
in: query
schema:
type: string
description: >
Number of messages to retrieve per page
example: '5'
- name: ascending
in: query
schema:
type: string
description: >
"true" for paging forward, "false" for paging backward
example: "true"
responses:
'200':
description: WakuV2 message history.
content:
application/json:
schema:
$ref: '#/components/schemas/StoreResponse'
'400':
description: Bad request error.
content:
text/plain:
type: string
'412':
description: Precondition failed.
content:
text/plain:
type: string
'500':
description: Internal server error.
content:
text/plain:
type: string
components:
schemas:
StoreResponse:
type: object
properties:
messages:
type: array
items:
$ref: '#/components/schemas/WakuMessage'
cursor:
$ref: '#/components/schemas/HistoryCursor'
error_message:
type: string
required:
- messages
HistoryCursor:
type: object
properties:
pubsub_topic:
type: string
sender_time:
type: string
store_time:
type: string
digest:
type: string
required:
- pubsub_topic
- sender_time
- store_time
- digest
WakuMessage:
type: object
properties:
payload:
type: string
content_topic:
type: string
version:
type: integer
format: int32
timestamp:
type: integer
format: int64
ephemeral:
type: boolean
required:
- payload
- content_topic

View File

@ -52,7 +52,7 @@ func TestGetMessages(t *testing.T) {
defer node2.Stop()
router := chi.NewRouter()
_ = NewStoreService(node2, router)
_ = NewLegacyStoreService(node2, router)
// TEST: get cursor
// TEST: get no messages
@ -64,12 +64,12 @@ func TestGetMessages(t *testing.T) {
"pubsubTopic": {pubsubTopic1},
"pageSize": {"2"},
}
path := routeStoreMessagesV1 + "?" + queryParams.Encode()
path := routeLegacyStoreMessagesV1 + "?" + queryParams.Encode()
req, _ := http.NewRequest(http.MethodGet, path, nil)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
response := StoreResponse{}
response := LegacyStoreResponse{}
err = json.Unmarshal(rr.Body.Bytes(), &response)
require.NoError(t, err)
require.Len(t, response.Messages, 2)
@ -84,12 +84,12 @@ func TestGetMessages(t *testing.T) {
"digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)},
"pageSize": {"2"},
}
path = routeStoreMessagesV1 + "?" + queryParams.Encode()
path = routeLegacyStoreMessagesV1 + "?" + queryParams.Encode()
req, _ = http.NewRequest(http.MethodGet, path, nil)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
response = StoreResponse{}
response = LegacyStoreResponse{}
err = json.Unmarshal(rr.Body.Bytes(), &response)
require.NoError(t, err)
require.Len(t, response.Messages, 1)

View File

@ -3,7 +3,7 @@ package rest
import (
"context"
"encoding/base64"
"fmt"
"errors"
"net/http"
"strconv"
"strings"
@ -12,52 +12,34 @@ import (
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
storepb "github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
"google.golang.org/protobuf/proto"
)
type StoreService struct {
type StoreQueryService struct {
node *node.WakuNode
mux *chi.Mux
}
type StoreResponse struct {
Messages []StoreWakuMessage `json:"messages"`
Cursor *HistoryCursor `json:"cursor,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}
const routeStoreMessagesV1 = "/store/v3/messages"
type HistoryCursor struct {
PubsubTopic string `json:"pubsubTopic"`
SenderTime string `json:"senderTime"`
StoreTime string `json:"storeTime"`
Digest []byte `json:"digest"`
}
type StoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}
const routeStoreMessagesV1 = "/store/v1/messages"
func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
s := &StoreService{
func NewStoreQueryService(node *node.WakuNode, m *chi.Mux) *StoreQueryService {
s := &StoreQueryService{
node: node,
mux: m,
}
m.Get(routeStoreMessagesV1, s.getV1Messages)
m.Get(routeStoreMessagesV1, s.getV3Messages)
return s
}
func getStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.HistoryRequestOption, error) {
query := &legacy_store.Query{}
var options []legacy_store.HistoryRequestOption
func getStoreParams(r *http.Request) (store.Criteria, []store.RequestOption, error) {
var options []store.RequestOption
var err error
peerAddrStr := r.URL.Query().Get("peerAddr")
var m multiaddr.Multiaddr
@ -66,72 +48,78 @@ func getStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.Histor
if err != nil {
return nil, nil, err
}
options = append(options, legacy_store.WithPeerAddr(m))
} else {
// The user didn't specify a peer address and self-node is configured as a store node.
// In this case we assume that the user is willing to retrieve the messages stored by
// the local/self store node.
options = append(options, legacy_store.WithLocalQuery())
options = append(options, store.WithPeerAddr(m))
}
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")
includeData := false
includeDataStr := r.URL.Query().Get("includeData")
if includeDataStr != "" {
includeData, err = strconv.ParseBool(includeDataStr)
if err != nil {
return nil, nil, errors.New("invalid value for includeData. Use true|false")
}
}
options = append(options, store.IncludeData(includeData))
pubsubTopic := r.URL.Query().Get("pubsubTopic")
contentTopics := r.URL.Query().Get("contentTopics")
var contentTopicsArr []string
if contentTopics != "" {
query.ContentTopics = strings.Split(contentTopics, ",")
contentTopicsArr = strings.Split(contentTopics, ",")
}
hashesStr := r.URL.Query().Get("hashes")
var hashes []pb.MessageHash
if hashesStr != "" {
hashesStrArr := strings.Split(hashesStr, ",")
for _, hashStr := range hashesStrArr {
hash, err := base64.URLEncoding.DecodeString(hashStr)
if err != nil {
return nil, nil, err
}
hashes = append(hashes, pb.ToMessageHash(hash))
}
}
isMsgHashCriteria := false
if len(hashes) != 0 {
isMsgHashCriteria = true
if pubsubTopic != "" || len(contentTopics) != 0 {
return nil, nil, errors.New("cant use content filters while specifying message hashes")
}
} else {
if pubsubTopic == "" || len(contentTopicsArr) != 0 {
return nil, nil, errors.New("pubsubTOpic and contentTopics are required")
}
}
startTimeStr := r.URL.Query().Get("startTime")
var startTime int64
if startTimeStr != "" {
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
startTime, err = strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
query.StartTime = &startTime
}
endTimeStr := r.URL.Query().Get("endTime")
var endTime int64
if endTimeStr != "" {
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
endTime, err = strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
query.EndTime = &endTime
}
var cursor *pb.Index
senderTimeStr := r.URL.Query().Get("senderTime")
storeTimeStr := r.URL.Query().Get("storeTime")
digestStr := r.URL.Query().Get("digest")
if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" {
cursor = &pb.Index{}
if senderTimeStr != "" {
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
var cursor []byte
cursorStr := r.URL.Query().Get("cursor")
if cursorStr != "" {
cursor, err = base64.URLEncoding.DecodeString(cursorStr)
if err != nil {
return nil, nil, err
}
if storeTimeStr != "" {
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
}
if digestStr != "" {
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
if err != nil {
return nil, nil, err
}
}
cursor.PubsubTopic = query.PubsubTopic
options = append(options, legacy_store.WithCursor(cursor))
options = append(options, store.WithCursor(cursor))
}
pageSizeStr := r.URL.Query().Get("pageSize")
@ -156,43 +144,30 @@ func getStoreParams(r *http.Request) (*legacy_store.Query, []legacy_store.Histor
}
}
options = append(options, legacy_store.WithPaging(ascending, pageSize))
options = append(options, store.WithPaging(ascending, pageSize))
}
var query store.Criteria
if isMsgHashCriteria {
query = store.MessageHashCriteria{
MessageHashes: hashes,
}
} else {
query = store.FilterCriteria{
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopicsArr...),
TimeStart: &startTime,
TimeEnd: &endTime,
}
}
return query, options, nil
}
func writeStoreError(w http.ResponseWriter, code int, err error) {
writeResponse(w, StoreResponse{ErrorMessage: err.Error()}, code)
writeResponse(w, &storepb.StoreQueryResponse{StatusCode: proto.Uint32(uint32(code)), StatusDesc: proto.String(err.Error())}, code)
}
func toStoreResponse(result *legacy_store.Result) StoreResponse {
response := StoreResponse{}
cursor := result.Cursor()
if cursor != nil {
response.Cursor = &HistoryCursor{
PubsubTopic: cursor.PubsubTopic,
SenderTime: fmt.Sprintf("%d", cursor.SenderTime),
StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime),
Digest: cursor.Digest,
}
}
for _, m := range result.Messages {
response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: m.Version,
Timestamp: m.Timestamp,
Meta: m.Meta,
})
}
return response
}
func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
func (d *StoreQueryService) getV3Messages(w http.ResponseWriter, r *http.Request) {
query, options, err := getStoreParams(r)
if err != nil {
writeStoreError(w, http.StatusBadRequest, err)
@ -202,11 +177,11 @@ func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
result, err := d.node.LegacyStore().Query(ctx, *query, options...)
result, err := d.node.Store().Request(ctx, query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
writeLegacyStoreError(w, http.StatusInternalServerError, err)
return
}
writeErrOrResponse(w, nil, toStoreResponse(result))
writeErrOrResponse(w, nil, result.Response())
}

View File

@ -1,203 +1,135 @@
openapi: 3.0.3
info:
title: Waku V2 node Store REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: store
description: Store REST API for WakuV2 node
paths:
/store/v1/messages:
get:
summary: Gets message history
# /store/v3/messages:
get:
summary: Gets message history
description: >
Retrieves Waku message history. The returned history
can be potentially filtered by optional request parameters.
operationId: getMessageHistory
tags:
- store
parameters:
- name: peerAddr
in: query
schema:
type: string
required: true
description: >
Retrieves WakuV2 message history. The returned history
can be potentially filtered by optional request parameters.
operationId: getMessageHistory
tags:
- store
parameters:
- name: peerAddr
in: query
P2P fully qualified peer multiaddress
in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded.
example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN'
- name: includeData
in: query
schema:
type: string
description: >
Boolean indicating if the query should return messages (data) or hashes only.
A value of 'false' returns hashes only.
A value of 'true' returns hashes AND messages.
Default value is 'false'
example: 'true'
- name: pubsubTopic
in: query
schema:
type: string
description: >
The pubsub topic on which a WakuMessage is published.
If left empty, no filtering is applied.
It is also intended for pagination purposes.
It should be a URL-encoded string.
example: 'my%20pubsub%20topic'
- name: contentTopics
in: query
schema: string
description: >
Comma-separated list of content topics. When specified,
only WakuMessages that are linked to any of the given
content topics will be delivered in the get response.
It should be a URL-encoded-comma-separated string.
example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic'
- name: startTime
in: query
schema:
type: string
description: >
The inclusive lower bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: endTime
in: query
schema:
type: string
description: >
The inclusive upper bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: hashes
in: query
schema:
type: string
description: >
Comma-separated list of message hashes.
URL-base64-encoded string computed as a hash of messages.
Used to find messages by hash.
example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D'
- name: cursor
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
URL-base64-encoded string computed as a hash of a message.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D'
- name: pageSize
in: query
schema:
type: string
description: >
Number of messages to retrieve per page
example: '5'
- name: ascending
in: query
schema:
type: string
description: >
"true" for paging forward, "false" for paging backward.
If not specified or if specified with an invalid value, the default is "true".
example: "true"
responses:
'200':
description: Waku message history.
content:
application/json:
schema:
type: string
required: true
description: >
P2P fully qualified peer multiaddress
in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded.
example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN'
- name: pubsubTopic
in: query
schema:
type: string
description: >
The pubsub topic on which a WakuMessage is published.
If left empty, no filtering is applied.
It is also intended for pagination purposes.
It should be a URL-encoded string.
example: 'my%20pubsub%20topic'
- name: contentTopics
in: query
schema: string
description: >
Comma-separated list of content topics. When specified,
only WakuMessages that are linked to any of the given
content topics will be delivered in the get response.
It should be a URL-encoded-comma-separated string.
example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic'
- name: startTime
in: query
schema:
type: string
description: >
The inclusive lower bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: endTime
in: query
schema:
type: string
description: >
The inclusive upper bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: senderTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was generated.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590947000000000'
- name: storeTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was stored.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590945000000000'
- name: digest
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
URL-base64-encoded string computed as a hash of the
a message content topic plus a message payload.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D'
- name: pageSize
in: query
schema:
type: string
description: >
Number of messages to retrieve per page
example: '5'
- name: ascending
in: query
schema:
type: string
description: >
"true" for paging forward, "false" for paging backward
example: "true"
responses:
'200':
description: WakuV2 message history.
content:
application/json:
schema:
$ref: '#/components/schemas/StoreResponse'
'400':
description: Bad request error.
content:
text/plain:
type: string
'412':
description: Precondition failed.
content:
text/plain:
type: string
'500':
description: Internal server error.
content:
text/plain:
type: string
components:
schemas:
StoreResponse:
type: object
properties:
messages:
type: array
items:
$ref: '#/components/schemas/WakuMessage'
cursor:
$ref: '#/components/schemas/HistoryCursor'
error_message:
$ref: '#/components/schemas/StoreQueryResponse'
'400':
description: Bad request error.
content:
text/plain:
type: string
required:
- messages
HistoryCursor:
type: object
properties:
pubsub_topic:
'412':
description: Precondition failed.
content:
text/plain:
type: string
sender_time:
type: string
store_time:
type: string
digest:
type: string
required:
- pubsub_topic
- sender_time
- store_time
- digest
WakuMessage:
type: object
properties:
payload:
type: string
content_topic:
type: string
version:
type: integer
format: int32
timestamp:
type: integer
format: int64
ephemeral:
type: boolean
required:
- payload
- content_topic
'500':
description: Internal server error.
content:
text/plain:
type: string

View File

@ -51,7 +51,8 @@ func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuR
_ = NewDebugService(node, mux)
_ = NewHealthService(node, mux)
_ = NewStoreService(node, mux)
_ = NewStoreQueryService(node, mux)
_ = NewLegacyStoreService(node, mux)
_ = NewLightpushService(node, mux, log)
listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port)

View File

@ -177,11 +177,12 @@ func (s *WakuStore) Request(ctx context.Context, criteria Criteria, opts ...Requ
}
result := &Result{
store: s,
messages: response.Messages,
storeRequest: storeRequest,
peerID: params.selectedPeer,
cursor: response.PaginationCursor,
store: s,
messages: response.Messages,
storeRequest: storeRequest,
storeResponse: response,
peerID: params.selectedPeer,
cursor: response.PaginationCursor,
}
return result, nil
@ -213,12 +214,13 @@ func (s *WakuStore) Exists(ctx context.Context, messageHash wpb.MessageHash, opt
func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
if r.IsComplete() {
return &Result{
store: s,
started: true,
messages: []*pb.WakuMessageKeyValue{},
cursor: nil,
storeRequest: r.storeRequest,
peerID: r.PeerID(),
store: s,
started: true,
messages: []*pb.WakuMessageKeyValue{},
cursor: nil,
storeRequest: r.storeRequest,
storeResponse: r.storeResponse,
peerID: r.PeerID(),
}, nil
}
@ -232,12 +234,13 @@ func (s *WakuStore) next(ctx context.Context, r *Result) (*Result, error) {
}
result := &Result{
started: true,
store: s,
messages: response.Messages,
storeRequest: storeRequest,
peerID: r.PeerID(),
cursor: response.PaginationCursor,
started: true,
store: s,
messages: response.Messages,
storeRequest: storeRequest,
storeResponse: response,
peerID: r.PeerID(),
cursor: response.PaginationCursor,
}
return result, nil

View File

@ -9,12 +9,13 @@ import (
// Result represents a valid response from a store node
type Result struct {
started bool
messages []*pb.WakuMessageKeyValue
store *WakuStore
storeRequest *pb.StoreQueryRequest
cursor []byte
peerID peer.ID
started bool
messages []*pb.WakuMessageKeyValue
store *WakuStore
storeRequest *pb.StoreQueryRequest
storeResponse *pb.StoreQueryResponse
cursor []byte
peerID peer.ID
}
func (r *Result) Cursor() []byte {
@ -33,6 +34,10 @@ func (r *Result) Query() *pb.StoreQueryRequest {
return r.storeRequest
}
func (r *Result) Response() *pb.StoreQueryResponse {
return r.storeResponse
}
func (r *Result) Next(ctx context.Context) (bool, error) {
if !r.started {
r.started = true