chore: remove REST API

This commit is contained in:
Prem Chaitanya Prathi 2024-02-21 11:16:56 +05:30
parent d65a836bb6
commit e399ea70a4
No known key found for this signature in database
33 changed files with 0 additions and 3708 deletions

View File

@ -479,47 +479,6 @@ var (
Destination: &options.Metrics.Port,
EnvVars: []string{"WAKUNODE2_METRICS_SERVER_PORT"},
})
RESTFlag = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "rest",
Usage: "Enable Waku REST HTTP server",
Destination: &options.RESTServer.Enable,
EnvVars: []string{"WAKUNODE2_REST"},
})
RESTAddress = altsrc.NewStringFlag(&cli.StringFlag{
Name: "rest-address",
Value: "127.0.0.1",
Usage: "Listening address of the REST HTTP server",
Destination: &options.RESTServer.Address,
EnvVars: []string{"WAKUNODE2_REST_ADDRESS"},
})
RESTPort = altsrc.NewIntFlag(&cli.IntFlag{
Name: "rest-port",
Value: 8645,
Usage: "Listening port of the REST HTTP server",
Destination: &options.RESTServer.Port,
EnvVars: []string{"WAKUNODE2_REST_PORT"},
})
RESTRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "rest-relay-cache-capacity",
Value: 1000,
Usage: "Capacity of the Relay REST API message cache",
Destination: &options.RESTServer.RelayCacheCapacity,
EnvVars: []string{"WAKUNODE2_REST_RELAY_CACHE_CAPACITY"},
})
RESTFilterCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{
Name: "rest-filter-cache-capacity",
Value: 30,
Usage: "Capacity of the Filter REST API message cache",
Destination: &options.RESTServer.FilterCacheCapacity,
EnvVars: []string{"WAKUNODE2_REST_FILTER_CACHE_CAPACITY"},
})
RESTAdmin = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "rest-admin",
Value: false,
Usage: "Enable access to REST HTTP Admin API",
Destination: &options.RESTServer.Admin,
EnvVars: []string{"WAKUNODE2_REST_ADMIN"},
})
PProf = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "pprof",
Usage: "provides runtime profiling data at /debug/pprof in both REST and RPC servers if they're enabled",

View File

@ -87,12 +87,6 @@ func main() {
MetricsServer,
MetricsServerAddress,
MetricsServerPort,
RESTFlag,
RESTAddress,
RESTPort,
RESTRelayCacheCapacity,
RESTFilterCacheCapacity,
RESTAdmin,
PProf,
}

View File

@ -39,7 +39,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds" // nolint: staticcheck
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/cmd/waku/server/rest"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/metrics"
"github.com/waku-org/go-waku/waku/persistence"
@ -388,20 +387,6 @@ func Execute(options NodeOptions) error {
}
}
var restServer *rest.WakuRest
if options.RESTServer.Enable {
wg.Add(1)
restConfig := rest.RestConfig{Address: options.RESTServer.Address,
Port: uint(options.RESTServer.Port),
EnablePProf: options.PProf,
EnableAdmin: options.RESTServer.Admin,
RelayCacheCapacity: uint(options.RESTServer.RelayCacheCapacity),
FilterCacheCapacity: uint(options.RESTServer.FilterCacheCapacity)}
restServer = rest.NewWakuRest(wakuNode, restConfig, logger)
restServer.Start(ctx, &wg)
}
wg.Wait()
logger.Info("Node setup complete")
@ -414,12 +399,6 @@ func Execute(options NodeOptions) error {
// shut the node down
wakuNode.Stop()
if options.RESTServer.Enable {
if err := restServer.Stop(ctx); err != nil {
return err
}
}
if options.Metrics.Enable {
if err = metricsServer.Stop(ctx); err != nil {
return err

View File

@ -99,16 +99,6 @@ type MetricsOptions struct {
Port int
}
// RESTServerOptions are settings used to start a rest http server
type RESTServerOptions struct {
Enable bool
Port int
Address string
Admin bool
RelayCacheCapacity int
FilterCacheCapacity int
}
// WSOptions are settings used for enabling websockets and secure websockets
// support
type WSOptions struct {
@ -174,5 +164,4 @@ type NodeOptions struct {
DNSDiscovery DNSDiscoveryOptions
Rendezvous RendezvousOptions
Metrics MetricsOptions
RESTServer RESTServerOptions
}

View File

@ -1,13 +0,0 @@
//go:build gowaku_no_rln
// +build gowaku_no_rln
package server
import (
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error {
return nil
}

View File

@ -1,135 +0,0 @@
package rest
import (
"encoding/json"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peerstore"
waku_proto "github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)
type AdminService struct {
node *node.WakuNode
mux *chi.Mux
log *zap.Logger
}
type WakuPeer struct {
ID string `json:"id"`
MultiAddrs []string `json:"multiaddrs"`
Protocols []string `json:"protocols"`
Connected bool `json:"connected"`
PubsubTopics []string `json:"pubsubTopics"`
}
type WakuPeerInfo struct {
MultiAddr string `json:"multiaddr"`
Shards []int `json:"shards"`
Protocols []string `json:"protocols"`
}
const routeAdminV1Peers = "/admin/v1/peers"
func NewAdminService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *AdminService {
d := &AdminService{
node: node,
mux: m,
log: log,
}
m.Get(routeAdminV1Peers, d.getV1Peers)
m.Post(routeAdminV1Peers, d.postV1Peer)
return d
}
func (a *AdminService) getV1Peers(w http.ResponseWriter, req *http.Request) {
peers, err := a.node.Peers()
if err != nil {
a.log.Error("failed to fetch peers", zap.Error(err))
writeErrOrResponse(w, err, nil)
return
}
a.log.Info("fetched peers", zap.Int("count", len(peers)))
response := make([]WakuPeer, 0)
for _, peer := range peers {
if peer.ID.String() == a.node.Host().ID().String() {
//Skip own node id
continue
}
wPeer := WakuPeer{
ID: peer.ID.String(),
Connected: peer.Connected,
}
for _, addr := range peer.Addrs {
wPeer.MultiAddrs = append(wPeer.MultiAddrs, addr.String())
}
for _, proto := range peer.Protocols {
if !server.IsWakuProtocol(proto) {
a.log.Debug("skipping protocol as it is a non-waku protocol", logging.HostID("peer", peer.ID), zap.String("protocol", string(proto)))
continue
}
wPeer.Protocols = append(wPeer.Protocols, string(proto))
}
wPeer.PubsubTopics = peer.PubsubTopics
response = append(response, wPeer)
}
writeErrOrResponse(w, nil, response)
}
func (a *AdminService) postV1Peer(w http.ResponseWriter, req *http.Request) {
var pInfo WakuPeerInfo
var topics []string
var protos []protocol.ID
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&pInfo); err != nil {
a.log.Error("failed to decode request", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
addr, err := ma.NewMultiaddr(pInfo.MultiAddr)
if err != nil {
a.log.Error("building multiaddr", zap.Error(err))
writeErrOrResponse(w, err, nil)
return
}
for _, shard := range pInfo.Shards {
topic := waku_proto.NewStaticShardingPubsubTopic(a.node.ClusterID(), uint16(shard))
topics = append(topics, topic.String())
}
for _, proto := range pInfo.Protocols {
protos = append(protos, protocol.ID(proto))
}
id, err := a.node.AddPeer(addr, peerstore.Static, topics, protos...)
if err != nil {
a.log.Error("failed to add peer", zap.Error(err))
writeErrOrResponse(w, err, nil)
return
}
a.log.Info("add peer successful", logging.HostID("peerID", id))
pi := peer.AddrInfo{ID: id, Addrs: []ma.Multiaddr{addr}}
err = a.node.Host().Connect(req.Context(), pi)
if err != nil {
a.log.Error("failed to connect to peer", logging.HostID("peerID", id), zap.Error(err))
writeErrOrResponse(w, err, nil)
return
}
writeErrOrResponse(w, nil, nil)
}

View File

@ -1,92 +0,0 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: admin
description: Admin REST API for WakuV2 node
paths:
/admin/v1/peers:
get:
summary: Get connected peers info
description: Retrieve information about connected peers.
operationId: getPeerInfo
tags:
- admin
responses:
'200':
description: Information about a Waku v2 node.
content:
application/json:
schema:
type: array
items:
$ref: '#/components/schemas/WakuPeer'
'5XX':
description: Unexpected error.
post:
summary: Adds new peer(s) to connect with
description: Adds new peer(s) to connect with.
operationId: postPeerInfo
tags:
- admin
requestBody:
content:
application/json:
schema:
type: object
items:
$ref: '#/components/schemas/WakuPeerInfo'
responses:
'200':
description: Ok
'400':
description: Cannot connect to one or more peers.
'5XX':
description: Unexpected error.
components:
schemas:
WakuPeerInfo:
type: object
required:
- multiaddr
- shards
- protocols
protocols:
type: array
items:
type: string
shards:
type: array
items:
type: integer
WakuPeer:
type: object
required:
- id
- addrs
- protocols
- connected
properties:
connected:
type: string
addrs:
type: array
items:
type: string
protocols:
type: array
items:
type: string
connected:
type: boolean
pubsubTopics:
type: array
items:
type: string

View File

@ -1,52 +0,0 @@
package rest
import (
"net/http"
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/waku/v2/node"
)
type DebugService struct {
node *node.WakuNode
mux *chi.Mux
}
type InfoArgs struct {
}
type InfoReply struct {
ENRUri string `json:"enrUri,omitempty"`
ListenAddresses []string `json:"listenAddresses,omitempty"`
}
const routeDebugInfoV1 = "/debug/v1/info"
const routeDebugVersionV1 = "/debug/v1/version"
func NewDebugService(node *node.WakuNode, m *chi.Mux) *DebugService {
d := &DebugService{
node: node,
mux: m,
}
m.Get(routeDebugInfoV1, d.getV1Info)
m.Get(routeDebugVersionV1, d.getV1Version)
return d
}
type VersionResponse string
func (d *DebugService) getV1Info(w http.ResponseWriter, req *http.Request) {
response := new(InfoReply)
response.ENRUri = d.node.ENR().String()
for _, addr := range d.node.ListenAddresses() {
response.ListenAddresses = append(response.ListenAddresses, addr.String())
}
writeErrOrResponse(w, nil, response)
}
func (d *DebugService) getV1Version(w http.ResponseWriter, req *http.Request) {
response := VersionResponse(node.GetVersionInfo().String())
writeErrOrResponse(w, nil, response)
}

View File

@ -1,43 +0,0 @@
openapi: 3.0.3
info:
title: Waku V2 node Debug REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: debug
description: Debug REST API for WakuV2 node
paths:
/debug/v1/info:
get:
summary: Get node info
description: Retrieve information about a Waku v2 node.
operationId: getNodeInfo
tags:
- debug
responses:
'200':
description: Information about a Waku v2 node.
content:
application/json:
schema:
$ref: '#/components/schemas/WakuInfo'
'5XX':
description: Unexpected error.
components:
schemas:
WakuInfo:
type: object
properties:
listenAddresses:
type: array
items:
type: string
enrUri:
type: string
required:
- listenAddresses

View File

@ -1,33 +0,0 @@
package rest
import (
"bytes"
"context"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/node"
)
func TestGetV1Info(t *testing.T) {
wakuNode1, err := node.New()
require.NoError(t, err)
defer wakuNode1.Stop()
err = wakuNode1.Start(context.Background())
require.NoError(t, err)
d := &DebugService{
node: wakuNode1,
}
request, err := http.NewRequest(http.MethodPost, routeDebugInfoV1, bytes.NewReader([]byte("")))
require.NoError(t, err)
rr := httptest.NewRecorder()
d.getV1Info(rr, request)
require.Equal(t, http.StatusOK, rr.Code)
}

View File

@ -1,380 +0,0 @@
package rest
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"go.uber.org/zap"
)
const filterV2Subscriptions = "/filter/v2/subscriptions"
const filterv2Messages = "/filter/v2/messages"
// FilterService represents the REST service for Filter client
type FilterService struct {
node *node.WakuNode
cancel context.CancelFunc
log *zap.Logger
cache *filterCache
runner *runnerService
}
// Start starts the RelayService
func (s *FilterService) Start(ctx context.Context) {
for _, sub := range s.node.FilterLightnode().Subscriptions() {
s.cache.subscribe(sub.ContentFilter)
}
ctx, cancel := context.WithCancel(ctx)
s.cancel = cancel
s.runner.Start(ctx)
}
// Stop stops the RelayService
func (r *FilterService) Stop() {
if r.cancel == nil {
return
}
r.cancel()
}
// NewFilterService returns an instance of FilterService
func NewFilterService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *FilterService {
logger := log.Named("filter")
s := &FilterService{
node: node,
log: logger,
cache: newFilterCache(cacheCapacity, logger),
}
m.Route(filterV2Subscriptions, func(r chi.Router) {
r.Get("/", s.ping)
r.Get("/{requestId}", s.ping)
r.Post("/", s.subscribe)
r.Delete("/", s.unsubscribe)
r.Delete("/all", s.unsubscribeAll)
})
m.Route(filterv2Messages, func(r chi.Router) {
r.Get("/{contentTopic}", s.getMessagesByContentTopic)
r.Get("/{pubsubTopic}/{contentTopic}", s.getMessagesByPubsubTopic)
})
s.runner = newRunnerService(node.Broadcaster(), s.cache.addMessage)
return s
}
func convertFilterErrorToHttpStatus(err error) (int, string) {
code := http.StatusInternalServerError
statusDesc := "ping request failed"
filterErrorCode := filter.ExtractCodeFromFilterError(err.Error())
switch filterErrorCode {
case 404:
code = http.StatusNotFound
statusDesc = "peer has no subscription"
case 300:
case 400:
code = http.StatusBadRequest
statusDesc = "bad request format"
case 504:
code = http.StatusGatewayTimeout
case 503:
code = http.StatusServiceUnavailable
}
return code, statusDesc
}
// 400 for bad requestId
// 404 when request failed or no suitable peers
// 200 when ping successful
func (s *FilterService) ping(w http.ResponseWriter, req *http.Request) {
requestID := chi.URLParam(req, "requestId")
if requestID == "" {
writeResponse(w, &filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "bad request id",
}, http.StatusBadRequest)
return
}
// selecting random peer that supports filter protocol
peerId := s.getRandomFilterPeer(req.Context(), requestID, w)
if peerId == "" {
return
}
if err := s.node.FilterLightnode().Ping(req.Context(), peerId, filter.WithPingRequestId([]byte(requestID))); err != nil {
s.log.Error("ping request failed", zap.Error(err))
code, statusDesc := convertFilterErrorToHttpStatus(err)
writeResponse(w, &filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: statusDesc,
}, code)
return
}
// success
writeResponse(w, &filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: http.StatusText(http.StatusOK),
}, http.StatusOK)
}
// same for FilterUnsubscribeRequest
type filterSubscriptionRequest struct {
RequestID string `json:"requestId"`
ContentFilters []string `json:"contentFilters"`
PubsubTopic string `json:"pubsubTopic"`
}
type filterSubscriptionResponse struct {
RequestID string `json:"requestId"`
StatusDesc string `json:"statusDesc"`
}
// 400 on invalid request
// 404 on failed subscription
// 200 on single returned successful subscription
// NOTE: subscribe on filter client randomly selects a peer if missing for given pubSubTopic
func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
message := filterSubscriptionRequest{}
if !s.readBody(w, req, &message) {
return
}
contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...)
//
subscriptions, err := s.node.FilterLightnode().Subscribe(req.Context(),
contentFilter,
filter.WithRequestID([]byte(message.RequestID)))
// on partial subscribe failure
if len(subscriptions) > 0 && err != nil {
s.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: err.Error(),
}, http.StatusOK)
}
if err != nil {
s.log.Error("subscription failed", zap.Error(err))
code := filter.ExtractCodeFromFilterError(err.Error())
if code == -1 {
code = http.StatusBadRequest
}
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: "subscription failed",
}, code)
return
}
// on success
s.cache.subscribe(contentFilter)
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: http.StatusText(http.StatusOK),
}, http.StatusOK)
}
// 400 on invalid request
// 500 on failed subscription
// 200 on successful unsubscribe
// NOTE: unsubscribe on filter client will remove subscription from all peers with matching pubSubTopic, if peerId is not provided
// to match functionality in nwaku, we will randomly select a peer that supports filter protocol.
func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
message := filterSubscriptionRequest{} // as pubSubTopics can also be present
if !s.readBody(w, req, &message) {
return
}
peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w)
if peerId == "" {
return
}
contentFilter := protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...)
// unsubscribe on filter
result, err := s.node.FilterLightnode().Unsubscribe(
req.Context(),
contentFilter,
filter.WithRequestID([]byte(message.RequestID)),
filter.WithPeer(peerId),
)
if err != nil {
s.log.Error("unsubscribe failed", zap.Error(err))
if result == nil {
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: err.Error(),
}, http.StatusBadRequest)
}
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: err.Error(),
}, http.StatusServiceUnavailable)
return
}
// on success
for cTopic := range contentFilter.ContentTopics {
if !s.node.FilterLightnode().IsListening(contentFilter.PubsubTopic, cTopic) {
s.cache.unsubscribe(contentFilter.PubsubTopic, cTopic)
}
}
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: s.unsubscribeGetMessage(result),
}, http.StatusOK)
}
func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResult) string {
if result == nil {
return http.StatusText(http.StatusOK)
}
var peerIds string
ind := 0
for _, entry := range result.Errors() {
if entry.Err != nil {
s.log.Error("can't unsubscribe", logging.HostID("peer", entry.PeerID), zap.Error(entry.Err))
if ind != 0 {
peerIds += ", "
}
peerIds += entry.PeerID.String()
}
ind++
}
if peerIds != "" {
return "can't unsubscribe from " + peerIds
}
return http.StatusText(http.StatusOK)
}
type filterUnsubscribeAllRequest struct {
RequestID string `json:"requestId"`
}
func (s *FilterService) readBody(w http.ResponseWriter, req *http.Request, message interface{}) bool {
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(message); err != nil {
s.log.Error("bad request", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return false
}
defer req.Body.Close()
return true
}
// 400 on invalid request
// 500 on failed subscription
// 200 on all successful unsubscribe
// unsubscribe all subscriptions for a given peer
func (s *FilterService) unsubscribeAll(w http.ResponseWriter, req *http.Request) {
message := filterUnsubscribeAllRequest{}
if !s.readBody(w, req, &message) {
return
}
peerId := s.getRandomFilterPeer(req.Context(), message.RequestID, w)
if peerId == "" {
return
}
// unsubscribe all subscriptions for a given peer
errCh, err := s.node.FilterLightnode().UnsubscribeAll(
req.Context(),
filter.WithRequestID([]byte(message.RequestID)),
filter.WithPeer(peerId),
)
if err != nil {
s.log.Error("unsubscribeAll failed", zap.Error(err))
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: err.Error(),
}, http.StatusServiceUnavailable)
return
}
// on success
writeResponse(w, filterSubscriptionResponse{
RequestID: message.RequestID,
StatusDesc: s.unsubscribeGetMessage(errCh),
}, http.StatusOK)
}
func (s FilterService) getRandomFilterPeer(ctx context.Context, requestId string, w http.ResponseWriter) peer.ID {
// selecting random peer that supports filter protocol
peerIds, err := s.node.PeerManager().SelectPeers(peermanager.PeerSelectionCriteria{
SelectionType: peermanager.Automatic,
Proto: filter.FilterSubscribeID_v20beta1,
Ctx: ctx,
})
if err != nil {
s.log.Error("selecting peer", zap.Error(err))
writeResponse(w, filterSubscriptionResponse{
RequestID: requestId,
StatusDesc: "No suitable peers",
}, http.StatusServiceUnavailable)
return ""
}
return peerIds[0]
}
func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *http.Request) {
contentTopic := topicFromPath(w, req, "contentTopic", s.log)
if contentTopic == "" {
return
}
pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
writeGetMessageErr(w, fmt.Errorf("bad content topic"), http.StatusBadRequest, s.log)
return
}
s.getMessages(w, req, pubsubTopic, contentTopic)
}
func (s *FilterService) getMessagesByPubsubTopic(w http.ResponseWriter, req *http.Request) {
contentTopic := topicFromPath(w, req, "contentTopic", s.log)
if contentTopic == "" {
return
}
pubsubTopic := topicFromPath(w, req, "pubsubTopic", s.log)
if pubsubTopic == "" {
return
}
s.getMessages(w, req, pubsubTopic, contentTopic)
}
// 400 on invalid request
// 500 on failed subscription
// 200 on all successful unsubscribe
// unsubscribe all subscriptions for a given peer
func (s *FilterService) getMessages(w http.ResponseWriter, req *http.Request, pubsubTopic, contentTopic string) {
msgs, err := s.cache.getMessages(pubsubTopic, contentTopic)
if err != nil {
writeGetMessageErr(w, err, http.StatusNotFound, s.log)
return
}
writeResponse(w, msgs, http.StatusOK)
}

View File

@ -1,337 +0,0 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: filter
description: Filter REST API for WakuV2 node
paths:
/filter/v2/subscriptions/{requestId}:
get: # get_waku_v2_filter_v2_subscription - ping
summary: Subscriber-ping - a peer can query if there is a registered subscription for it
description: |
Subscriber peer can query its subscription existence on service node.
Returns HTTP200 if exists and HTTP404 if not.
Client must not fill anything but requestId in the request body.
operationId: subscriberPing
tags:
- filter
parameters:
- in: path
name: requestId
required: true
schema:
type: string
description: Id of ping request
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/subscriptions:
post: # post_waku_v2_filter_v2_subscription
summary: Subscribe a peer to an array of content topics under a pubsubTopic
description: |
Subscribe a peer to an array of content topics under a pubsubTopic.
It is allowed to refresh or add new content topic to an existing subscription.
Fields pubsubTopic and contentFilters must be filled.
operationId: postSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
delete: # delete_waku_v2_filter_v2_subscription
summary: Unsubscribe a peer from content topics
description: |
Unsubscribe a peer from content topics
Only that subscription will be removed which matches existing.
operationId: deleteSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterUnsubscribeRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/subscriptions/all:
delete: # delete_waku_v2_filter_v2_subscription
summary: Unsubscribe a peer from all content topics
description: |
Unsubscribe a peer from all content topics
operationId: deleteAllSubscriptions
tags:
- filter
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/FilterUnsubscribeAllRequest'
responses:
'200':
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'400':
description: Bad request.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'404':
description: Not found.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
'5XX':
description: Unexpected error.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterSubscriptionResponse'
/filter/v2/messages/{contentTopic}:
get: # get_waku_v2_filter_v2_messages
summary: Get the latest messages on the polled content topic
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- filter
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: Content topic of message
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
/filter/v2/messages/{pubsubTopic}/{contentTopic}:
get: # get_waku_v2_filter_v2_messages
summary: Get the latest messages on the polled pubsub/content topic pair
description: Get a list of messages that were received on a subscribed content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- filter
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: Content topic of message
- in: path
name: pubsubTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: pubsub topic of message
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/FilterGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'404':
description: Not found.
content:
text/plain:
schema:
type: string
'5XX':
description: Unexpected error.
content:
text/plain:
schema:
type: string
components:
PubSubTopic:
type: string
ContentTopic:
type: string
FilterSubscriptionResponse:
type: object
properties:
requestId:
type: string
statusDesc:
type: string
required:
- requestId
FilterSubscribeRequest:
type: object
properties:
requestId:
type: string
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
- requestId
- contentFilters
- pubsubTopic
FilterUnsubscribeRequest:
type: object
properties:
requestId:
type: string
contentFilters:
type: array
items:
$ref: '#/components/schemas/ContentTopic'
pubsubTopic:
$ref: "#/components/schemas/PubSubTopic"
required:
- requestId
- contentFilters
FilterUnsubscribeAllRequest:
type: object
properties:
requestId:
type: string
required:
- requestId
FilterGetMessagesResponse:
type: array
items:
$ref: '#/components/schemas/FilterWakuMessage'
FilterWakuMessage:
type: object
properties:
payload:
type: string
format: byte
contentTopic:
$ref: '#/components/schemas/ContentTopic'
version:
type: number
timestamp:
type: number
required:
- payload

View File

@ -1,84 +0,0 @@
package rest
import (
"fmt"
"sync"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)
type filterCache struct {
capacity int
mu sync.RWMutex
log *zap.Logger
data map[string]map[string][]*RestWakuMessage
}
func newFilterCache(capacity int, log *zap.Logger) *filterCache {
return &filterCache{
capacity: capacity,
data: make(map[string]map[string][]*RestWakuMessage),
log: log.Named("cache"),
}
}
func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
c.mu.Lock()
defer c.mu.Unlock()
pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
for pubsubTopic, contentTopics := range pubSubTopicMap {
if c.data[pubsubTopic] == nil {
c.data[pubsubTopic] = make(map[string][]*RestWakuMessage)
}
for _, topic := range contentTopics {
if c.data[pubsubTopic][topic] == nil {
c.data[pubsubTopic][topic] = []*RestWakuMessage{}
}
}
}
}
func (c *filterCache) unsubscribe(pubsubTopic string, contentTopic string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.data[pubsubTopic], contentTopic)
}
func (c *filterCache) addMessage(envelope *protocol.Envelope) {
c.mu.Lock()
defer c.mu.Unlock()
pubsubTopic := envelope.PubsubTopic()
contentTopic := envelope.Message().ContentTopic
if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil {
return
}
// Keep a specific max number of message per topic
if len(c.data[pubsubTopic][contentTopic]) >= c.capacity {
c.data[pubsubTopic][contentTopic] = c.data[pubsubTopic][contentTopic][1:]
}
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
c.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
return
}
c.data[pubsubTopic][contentTopic] = append(c.data[pubsubTopic][contentTopic], message)
}
func (c *filterCache) getMessages(pubsubTopic string, contentTopic string) ([]*RestWakuMessage, error) {
c.mu.RLock()
defer c.mu.RUnlock()
if c.data[pubsubTopic] == nil || c.data[pubsubTopic][contentTopic] == nil {
return nil, fmt.Errorf("not subscribed to pubsubTopic:%s contentTopic: %s", pubsubTopic, contentTopic)
}
msgs := c.data[pubsubTopic][contentTopic]
c.data[pubsubTopic][contentTopic] = []*RestWakuMessage{}
return msgs, nil
}

View File

@ -1,392 +0,0 @@
package rest
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func createNode(t *testing.T, opts ...node.WakuNodeOption) *node.WakuNode {
node, err := node.New(opts...)
require.NoError(t, err)
err = node.Start(context.Background())
require.NoError(t, err)
return node
}
// node2 connects to node1
func twoFilterConnectedNodes(t *testing.T, pubSubTopics ...string) (*node.WakuNode, *node.WakuNode) {
node1 := createNode(t, node.WithWakuFilterFullNode()) // full node filter
node2 := createNode(t, node.WithWakuFilterLightNode()) // light node filter
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), filter.FilterSubscribeID_v20beta1)
require.NoError(t, err)
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), pubSubTopics)
require.NoError(t, err)
return node1, node2
}
// test 400, 404 status code for ping rest endpoint
// both requests are not successful
func TestFilterPingFailure(t *testing.T) {
node1, node2 := twoFilterConnectedNodes(t)
defer func() {
node1.Stop()
node2.Stop()
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, 0, utils.Logger())
// with empty requestID
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", ""), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: "",
StatusDesc: "bad request id",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusBadRequest, rr.Code)
// no subscription with peer
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("/filter/v2/subscriptions/%s", requestID), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "peer has no subscription",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusNotFound, rr.Code)
}
// create a filter subscription to the peer and try peer that peer
// both steps should be successful
func TestFilterSubscribeAndPing(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopics := []string{"test"}
requestID := hex.EncodeToString(protocol.GenerateRequestID())
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
defer func() {
node1.Stop()
node2.Stop()
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, 0, utils.Logger())
// create subscription to peer
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: contentTopics,
}))
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
// trying pinging the peer once there is subscription to it
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
}
// create subscription to peer
// delete the subscription to the peer with matching pubSub and contentTopic
func TestFilterSubscribeAndUnsubscribe(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopics := []string{"test"}
requestID := hex.EncodeToString(protocol.GenerateRequestID())
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
defer func() {
node1.Stop()
node2.Stop()
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, 0, utils.Logger())
// create subscription to peer
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: contentTopics,
}))
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
// delete the subscription to the peer with matching pubSub and contentTopic
requestID = hex.EncodeToString(protocol.GenerateRequestID())
rr = httptest.NewRecorder()
reqReader = strings.NewReader(toString(t, filterSubscriptionRequest{
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: contentTopics,
}))
req, _ = http.NewRequest(http.MethodDelete, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
}
// create 2 subscription from filter client to server
// make a unsubscribeAll request
// try pinging the peer, if 404 is returned then unsubscribeAll was successful
func TestFilterAllUnsubscribe(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopics1 := "ct_1"
contentTopics2 := "ct_2"
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic)
defer func() {
node1.Stop()
node2.Stop()
}()
router := chi.NewRouter()
_ = NewFilterService(node2, router, 0, utils.Logger())
// create 2 different subscription to peer
for _, ct := range []string{contentTopics1, contentTopics2} {
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: []string{ct},
}))
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
}
// delete all subscription to the peer
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterUnsubscribeAllRequest{
RequestID: requestID,
}))
req, _ := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/all", filterV2Subscriptions), reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
// check if all subscriptions are deleted to the peer are deleted
requestID = hex.EncodeToString(protocol.GenerateRequestID())
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", filterV2Subscriptions, requestID), nil)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "peer has no subscription",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusNotFound, rr.Code)
}
func checkJSON(t *testing.T, expected, actual interface{}) {
require.JSONEq(t, toString(t, expected), toString(t, actual))
}
func getFilterResponse(t *testing.T, body *bytes.Buffer) filterSubscriptionResponse {
resp := filterSubscriptionResponse{}
err := json.Unmarshal(body.Bytes(), &resp)
require.NoError(t, err)
return resp
}
func getMessageResponse(t *testing.T, body *bytes.Buffer) []*pb.WakuMessage {
resp := []*pb.WakuMessage{}
err := json.Unmarshal(body.Bytes(), &resp)
require.NoError(t, err)
return resp
}
func toString(t *testing.T, data interface{}) string {
bytes, err := json.Marshal(data)
require.NoError(t, err)
return string(bytes)
}
func TestFilterGetMessages(t *testing.T) {
pubsubTopic := "/waku/2/test/proto"
contentTopic := "/waku/2/app/1"
// get nodes add connect them
generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
require.NoError(t, err)
node1, node2 := twoFilterConnectedNodes(t, pubsubTopic, generatedPubsubTopic)
defer func() {
node1.Stop()
node2.Stop()
}()
// set router and start filter service
router := chi.NewRouter()
service := NewFilterService(node2, router, 2, utils.Logger())
go service.Start(context.Background())
defer service.Stop()
{ // create subscription so that messages are cached
for _, pubsubTopic := range []string{"", pubsubTopic} {
requestID := hex.EncodeToString(protocol.GenerateRequestID())
rr := httptest.NewRecorder()
reqReader := strings.NewReader(toString(t, filterSubscriptionRequest{
RequestID: requestID,
PubsubTopic: pubsubTopic,
ContentFilters: []string{contentTopic},
}))
req, _ := http.NewRequest(http.MethodPost, filterV2Subscriptions, reqReader)
router.ServeHTTP(rr, req)
checkJSON(t, filterSubscriptionResponse{
RequestID: requestID,
StatusDesc: "OK",
}, getFilterResponse(t, rr.Body))
require.Equal(t, http.StatusOK, rr.Code)
}
}
// submit messages
messageByContentTopic := []*protocol.Envelope{
genMessage("", contentTopic),
genMessage("", contentTopic),
genMessage("", contentTopic),
}
messageByPubsubTopic := []*protocol.Envelope{
genMessage(pubsubTopic, contentTopic),
}
for _, envelope := range append(messageByContentTopic, messageByPubsubTopic...) {
node2.Broadcaster().Submit(envelope)
}
time.Sleep(1 * time.Second)
{ // with malformed contentTopic
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape("/waku/2/wrongtopic")),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "bad content topic", rr.Body.String())
}
{ // with check if the cache is working properly
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("%s/%s", filterv2Messages, url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
checkJSON(t, toMessage(messageByContentTopic[1:]), getMessageResponse(t, rr.Body))
}
{ // check if pubsubTopic is present in the url
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("%s//%s", filterv2Messages, url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
require.Equal(t, "missing pubsubTopic", rr.Body.String())
}
{ // check messages by pubsub/contentTopic pair
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(pubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
checkJSON(t, toMessage(messageByPubsubTopic), getMessageResponse(t, rr.Body))
}
{ // check if pubsubTopic/contentTOpic is subscribed or not.
rr := httptest.NewRecorder()
notSubscibredPubsubTopic := "/waku/2/test2/proto"
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("%s/%s/%s", filterv2Messages, url.QueryEscape(notSubscibredPubsubTopic), url.QueryEscape(contentTopic)),
nil,
)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
require.Equal(t,
fmt.Sprintf("not subscribed to pubsubTopic:%s contentTopic: %s", notSubscibredPubsubTopic, contentTopic),
rr.Body.String(),
)
}
}
func toMessage(envs []*protocol.Envelope) []*pb.WakuMessage {
msgs := make([]*pb.WakuMessage, len(envs))
for i, env := range envs {
msgs[i] = env.Message()
}
return msgs
}
func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope {
if pubsubTopic == "" {
pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(contentTopic)
}
return protocol.NewEnvelope(
&pb.WakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: contentTopic,
Timestamp: utils.GetUnixEpoch(),
},
0,
pubsubTopic,
)
}

View File

@ -1,52 +0,0 @@
package rest
import (
"context"
"errors"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/waku/v2/node"
)
type HealthService struct {
node *node.WakuNode
mux *chi.Mux
}
const routeHealth = "/health"
func NewHealthService(node *node.WakuNode, m *chi.Mux) *HealthService {
h := &HealthService{
node: node,
mux: m,
}
m.Get(routeHealth, h.getHealth)
return h
}
type HealthResponse string
func (d *HealthService) getHealth(w http.ResponseWriter, r *http.Request) {
if d.node.RLNRelay() != nil {
isReady, err := d.node.RLNRelay().IsReady(r.Context())
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
writeResponse(w, HealthResponse("Health check timed out"), http.StatusInternalServerError)
} else {
writeResponse(w, HealthResponse(err.Error()), http.StatusInternalServerError)
}
return
}
if isReady {
writeResponse(w, HealthResponse("Node is healthy"), http.StatusOK)
} else {
writeResponse(w, HealthResponse("Node is not ready"), http.StatusInternalServerError)
}
} else {
writeResponse(w, HealthResponse("Non RLN healthcheck is not implemented"), http.StatusNotImplemented)
}
}

View File

@ -1,41 +0,0 @@
openapi: 3.0.3
info:
title: Waku V2 node Health REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: health
description: Healt check REST API for WakuV2 node
paths:
/health:
get:
summary: Get node health status
description: Retrieve readiness of a Waku v2 node.
operationId: healthcheck
tags:
- health
responses:
'200':
description: Waku v2 node is up and running.
content:
text/plain:
schema:
type: string
example: Node is healty
'500':
description: Internal server error
content:
text/plain:
schema:
type: string
'503':
description: Node not initialized or having issues
content:
text/plain:
schema:
type: string
example: Node is not initialized

View File

@ -1,84 +0,0 @@
openapi: 3.0.3
info:
title: Waku V2 node REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: lightpush
description: Lightpush REST API for WakuV2 node
paths:
/lightpush/v1/message:
post:
summary: Request a message relay from a LightPush service provider
description: Push a message to be relayed on a PubSub topic.
operationId: postMessagesToPubsubTopic
tags:
- lightpush
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/PushRequest'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
'400':
description: Bad request.
content:
text/plain:
schema:
type: string
'500':
description: Internal server error
content:
text/plain:
schema:
type: string
'503':
description: Service not available
content:
text/plain:
schema:
type: string
components:
schemas:
PubsubTopic:
type: string
ContentTopic:
type: string
WakuMessage:
type: object
properties:
payload:
type: string
format: byte
contentTopic:
$ref: '#/components/schemas/ContentTopic'
version:
type: number
timestamp:
type: number
required:
- payload
- contentTopic
PushRequest:
type: object
properties:
pusbsubTopic:
$ref: '#/components/schemas/PubsubTopic'
message:
$ref: '#/components/schemas/WakuMessage'
required:
- message

View File

@ -1,80 +0,0 @@
package rest
import (
"encoding/json"
"errors"
"net/http"
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"go.uber.org/zap"
)
const routeLightPushV1Messages = "/lightpush/v1/message"
type LightpushService struct {
node *node.WakuNode
log *zap.Logger
}
func NewLightpushService(node *node.WakuNode, m *chi.Mux, log *zap.Logger) *LightpushService {
serv := &LightpushService{
node: node,
log: log.Named("lightpush"),
}
m.Post(routeLightPushV1Messages, serv.postMessagev1)
return serv
}
func (msg lightpushRequest) Check() error {
if msg.Message == nil {
return errors.New("waku message is required")
}
return nil
}
type lightpushRequest struct {
PubSubTopic string `json:"pubsubTopic"`
Message *RestWakuMessage `json:"message"`
}
// handled error codes are 200, 400, 500, 503
func (serv *LightpushService) postMessagev1(w http.ResponseWriter, req *http.Request) {
request := &lightpushRequest{}
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(request); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
if err := request.Check(); err != nil {
w.WriteHeader(http.StatusBadRequest)
_, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err))
return
}
if serv.node.Lightpush() == nil {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
message, err := request.Message.ToProto()
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
_, err = serv.node.Lightpush().Publish(req.Context(), message, lightpush.WithPubSubTopic(request.PubSubTopic))
if err != nil {
w.WriteHeader(http.StatusServiceUnavailable)
_, err = w.Write([]byte(err.Error()))
serv.log.Error("writing response", zap.Error(err))
} else {
writeErrOrResponse(w, err, true)
}
}

View File

@ -1,61 +0,0 @@
package rest
import (
"bytes"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"github.com/go-chi/chi/v5"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
wakupeerstore "github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/utils"
)
// node2 connects to node1
func twoLightPushConnectedNodes(t *testing.T, pubSubTopic string) (*node.WakuNode, *node.WakuNode) {
node1 := createNode(t, node.WithLightPush(), node.WithWakuRelay())
node2 := createNode(t, node.WithLightPush(), node.WithWakuRelay())
node2.Host().Peerstore().AddAddr(node1.Host().ID(), tests.GetHostAddress(node1.Host()), peerstore.PermanentAddrTTL)
err := node2.Host().Peerstore().AddProtocols(node1.Host().ID(), lightpush.LightPushID_v20beta1)
require.NoError(t, err)
err = node2.Host().Peerstore().(*wakupeerstore.WakuPeerstoreImpl).SetPubSubTopics(node1.Host().ID(), []string{pubSubTopic})
require.NoError(t, err)
return node1, node2
}
func TestLightpushMessagev1(t *testing.T) {
pubSubTopic := "/waku/2/default-waku/proto"
node1, node2 := twoLightPushConnectedNodes(t, pubSubTopic)
defer func() {
node1.Stop()
node2.Stop()
}()
router := chi.NewRouter()
serv := NewLightpushService(node2, router, utils.Logger())
_ = serv
msg := lightpushRequest{
PubSubTopic: pubSubTopic,
Message: &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Timestamp: utils.GetUnixEpoch(),
},
}
msgJSONBytes, err := json.Marshal(msg)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeLightPushV1Messages, bytes.NewReader(msgJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
}

View File

@ -1,46 +0,0 @@
package rest
import (
"errors"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
type RestWakuMessage struct {
Payload server.Base64URLByte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}
func (r *RestWakuMessage) FromProto(input *pb.WakuMessage) error {
if err := input.Validate(); err != nil {
return err
}
r.Payload = input.Payload
r.ContentTopic = input.ContentTopic
r.Timestamp = input.Timestamp
r.Version = input.Version
r.Meta = input.Meta
return nil
}
func (r *RestWakuMessage) ToProto() (*pb.WakuMessage, error) {
if r == nil {
return nil, errors.New("wakumessage is missing")
}
msg := &pb.WakuMessage{
Payload: r.Payload,
ContentTopic: r.ContentTopic,
Version: r.Version,
Timestamp: r.Timestamp,
Meta: r.Meta,
}
return msg, nil
}

View File

@ -1,316 +0,0 @@
package rest
import (
"encoding/json"
"net/http"
"strings"
"github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
const routeRelayV1Subscriptions = "/relay/v1/subscriptions"
const routeRelayV1Messages = "/relay/v1/messages/{topic}"
const routeRelayV1AutoSubscriptions = "/relay/v1/auto/subscriptions"
const routeRelayV1AutoMessages = "/relay/v1/auto/messages"
// RelayService represents the REST service for WakuRelay
type RelayService struct {
node *node.WakuNode
log *zap.Logger
cacheCapacity uint
}
// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity uint, log *zap.Logger) *RelayService {
s := &RelayService{
node: node,
log: log.Named("relay"),
cacheCapacity: cacheCapacity,
}
m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
m.Get(routeRelayV1Messages, s.getV1Messages)
m.Post(routeRelayV1Messages, s.postV1Message)
m.Post(routeRelayV1AutoSubscriptions, s.postV1AutoSubscriptions)
m.Delete(routeRelayV1AutoSubscriptions, s.deleteV1AutoSubscriptions)
m.Route(routeRelayV1AutoMessages, func(r chi.Router) {
r.Get("/{contentTopic}", s.getV1AutoMessages)
r.Post("/", s.postV1AutoMessage)
})
return s
}
func (r *RelayService) deleteV1Subscriptions(w http.ResponseWriter, req *http.Request) {
var topics []string
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&topics); err != nil {
r.log.Error("decoding request failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
var err error
for _, topic := range topics {
err = r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter(topic))
if err != nil {
r.log.Error("unsubscribing from topic", zap.String("topic", strings.Replace(strings.Replace(topic, "\n", "", -1), "\r", "", -1)), zap.Error(err))
}
}
writeErrOrResponse(w, err, true)
}
func (r *RelayService) postV1Subscriptions(w http.ResponseWriter, req *http.Request) {
var topics []string
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&topics); err != nil {
r.log.Error("decoding request failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
var err error
var successCnt int
var topicToSubscribe string
for _, topic := range topics {
if topic == "" {
topicToSubscribe = relay.DefaultWakuTopic
} else {
topicToSubscribe = topic
}
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topicToSubscribe), relay.WithCacheSize(r.cacheCapacity))
if err != nil {
r.log.Error("subscribing to topic", zap.String("topic", strings.Replace(topicToSubscribe, "\n", "", -1)), zap.Error(err))
continue
}
successCnt++
}
// on partial subscribe failure
if successCnt > 0 && err != nil {
r.log.Error("partial subscribe failed", zap.Error(err))
// on partial failure
writeResponse(w, err, http.StatusOK)
return
}
writeErrOrResponse(w, err, true)
}
func (r *RelayService) getV1Messages(w http.ResponseWriter, req *http.Request) {
topic := topicFromPath(w, req, "topic", r.log)
if topic == "" {
r.log.Debug("topic is not specified, using default waku topic")
topic = relay.DefaultWakuTopic
}
//TODO: Update the API to also take a contentTopic since relay now supports filtering based on contentTopic as well.
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(topic, "")
if err != nil {
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte(err.Error()))
r.log.Error("writing response", zap.Error(err))
return
}
var response []*RestWakuMessage
done := false
for {
if done || len(response) > int(r.cacheCapacity) {
break
}
select {
case envelope, open := <-sub.Ch:
if !open {
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", topic))
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte("consume channel is closed for subscription"))
if err != nil {
r.log.Error("writing response", zap.Error(err))
}
return
}
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
case <-req.Context().Done():
done = true
default:
done = true
}
}
writeErrOrResponse(w, nil, response)
}
func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
topic := topicFromPath(w, req, "topic", r.log)
if topic == "" {
r.log.Debug("topic is not specified, using default waku topic")
topic = relay.DefaultWakuTopic
}
var restMessage *RestWakuMessage
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&restMessage); err != nil {
r.log.Error("decoding request failure", zap.Error(err))
writeErrResponse(w, r.log, err, http.StatusBadRequest)
return
}
defer req.Body.Close()
message, err := restMessage.ToProto()
if err != nil {
r.log.Error("failed to convert message to proto", zap.Error(err))
writeErrResponse(w, r.log, err, http.StatusBadRequest)
return
}
if err := server.AppendRLNProof(r.node, message); err != nil {
r.log.Error("failed to append RLN proof for the message", zap.Error(err))
writeErrOrResponse(w, err, nil)
return
}
_, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1)))
if err != nil {
r.log.Error("publishing message", zap.Error(err))
if err == pb.ErrMissingPayload || err == pb.ErrMissingContentTopic || err == pb.ErrInvalidMetaLength {
writeErrResponse(w, r.log, err, http.StatusBadRequest)
return
}
}
writeErrOrResponse(w, err, true)
}
func (r *RelayService) deleteV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) {
var cTopics []string
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&cTopics); err != nil {
r.log.Error("decoding request failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
err := r.node.Relay().Unsubscribe(req.Context(), protocol.NewContentFilter("", cTopics...))
if err != nil {
r.log.Error("unsubscribing from topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
writeErrOrResponse(w, err, true)
}
func (r *RelayService) postV1AutoSubscriptions(w http.ResponseWriter, req *http.Request) {
var cTopics []string
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&cTopics); err != nil {
r.log.Error("decoding request failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
var err error
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", cTopics...), relay.WithCacheSize(r.cacheCapacity))
if err != nil {
r.log.Error("subscribing to topics", zap.Strings("contentTopics", cTopics), zap.Error(err))
}
r.log.Debug("subscribed to topics", zap.Strings("contentTopics", cTopics))
if err != nil {
r.log.Error("writing response", zap.Error(err))
writeErrResponse(w, r.log, err, http.StatusBadRequest)
} else {
writeErrOrResponse(w, err, true)
}
}
func (r *RelayService) getV1AutoMessages(w http.ResponseWriter, req *http.Request) {
cTopic := topicFromPath(w, req, "contentTopic", r.log)
sub, err := r.node.Relay().GetSubscription(cTopic)
if err != nil {
r.log.Error("writing response", zap.Error(err))
writeErrResponse(w, r.log, err, http.StatusNotFound)
return
}
var response []*RestWakuMessage
done := false
for {
if done || len(response) > int(r.cacheCapacity) {
break
}
select {
case envelope := <-sub.Ch:
message := &RestWakuMessage{}
if err := message.FromProto(envelope.Message()); err != nil {
r.log.Error("converting protobuffer msg into rest msg", zap.Error(err))
} else {
response = append(response, message)
}
case <-req.Context().Done():
done = true
default:
done = true
}
}
writeErrOrResponse(w, nil, response)
}
func (r *RelayService) postV1AutoMessage(w http.ResponseWriter, req *http.Request) {
var restMessage *RestWakuMessage
decoder := json.NewDecoder(req.Body)
if err := decoder.Decode(&restMessage); err != nil {
r.log.Error("decoding request failure", zap.Error(err))
w.WriteHeader(http.StatusBadRequest)
return
}
defer req.Body.Close()
message, err := restMessage.ToProto()
if err != nil {
writeErrOrResponse(w, err, nil)
return
}
if err = server.AppendRLNProof(r.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
}
_, err = r.node.Relay().Publish(req.Context(), message)
if err != nil {
r.log.Error("publishing message", zap.Error(err))
if err == pb.ErrMissingPayload || err == pb.ErrMissingContentTopic || err == pb.ErrInvalidMetaLength {
writeErrResponse(w, r.log, err, http.StatusBadRequest)
return
}
writeErrResponse(w, r.log, err, http.StatusBadRequest)
} else {
w.WriteHeader(http.StatusOK)
}
}

View File

@ -1,245 +0,0 @@
openapi: 3.0.3
info:
title: Waku V2 node Relay REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: relay
description: Relay REST API for WakuV2 node
paths:
/relay/v1/messages/{topic}: # Note the plural in messages
get: # get_waku_v2_relay_v1_messages
summary: Get the latest messages on the polled topic
description: Get a list of messages that were received on a subscribed PubSub topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- relay
parameters:
- in: path
name: topic # Note the name is the same as in the path
required: true
schema:
type: string
description: The user ID
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/RelayGetMessagesResponse'
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
post: # post_waku_v2_relay_v1_message
summary: Publish a message to be relayed
description: Publishes a message to be relayed on a PubSub topic.
operationId: postMessagesToTopic
tags:
- relay
parameters:
- in: path
name: topic # Note the name is the same as in the path
description: The messages content topic
required: true
schema:
$ref: '#/components/schemas/RelayPostMessagesRequest'
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayPostMessagesRequest'
responses:
'200':
description: OK
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
/relay/v1/subscriptions:
post: # post_waku_v2_relay_v1_subscriptions
summary: Subscribe a node to an array of topics
description: Subscribe a node to an array of PubSub topics.
operationId: postSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayPostSubscriptionsRequest'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
delete: # delete_waku_v2_relay_v1_subscriptions
summary: Unsubscribe a node from an array of topics
description: Unsubscribe a node from an array of PubSub topics.
operationId: deleteSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayDeleteSubscriptionsRequest'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
# TODO: Review the possible errors of this endpoint
'5XX':
description: Unexpected error.
/relay/v1/auto/messages/{contentTopic}: # Note the plural in messages
get: # get_waku_v2_relay_v1_auto_messages
summary: Get the latest messages on the polled topic
description: Get a list of messages that were received on a subscribed Content topic after the last time this method was called.
operationId: getMessagesByTopic
tags:
- relay
parameters:
- in: path
name: contentTopic # Note the name is the same as in the path
required: true
schema:
type: string
description: The user ID
responses:
'200':
description: The latest messages on the polled topic.
content:
application/json:
schema:
$ref: '#/components/schemas/RelayGetMessagesResponse'
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
/relay/v1/auto/messages: # Note the plural in messages
post: # post_waku_v2_relay_v1_auto_message
summary: Publish a message to be relayed
description: Publishes a message to be relayed on a Content topic.
operationId: postMessagesToTopic
tags:
- relay
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/RelayPostMessagesRequest'
responses:
'200':
description: OK
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
/relay/v1/auto/subscriptions:
post: # post_waku_v2_relay_v1_auto_subscriptions
summary: Subscribe a node to an array of topics
description: Subscribe a node to an array of Content topics.
operationId: postSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
type array:
items:
$ref: '#/components/schemas/ContentTopic'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
delete: # delete_waku_v2_relay_v1_auto_subscriptions
summary: Unsubscribe a node from an array of topics
description: Unsubscribe a node from an array of Content topics.
operationId: deleteSubscriptions
tags:
- relay
requestBody:
content:
application/json:
schema:
type array:
items:
$ref: '#/components/schemas/ContentTopic'
responses:
'200':
description: OK
content:
text/plain:
schema:
type: string
'4XX':
description: Bad request.
'5XX':
description: Unexpected error.
components:
schemas:
PubSubTopic:
type: string
ContentTopic:
type: string
RelayWakuMessage:
type: object
properties:
payload:
type: string
format: byte
contentTopic:
$ref: '#/components/schemas/ContentTopic'
version:
type: number
timestamp:
type: number
required:
- payload
RelayGetMessagesResponse:
type: array
items:
$ref: '#/components/schemas/RelayWakuMessage'
RelayPostMessagesRequest:
$ref: '#/components/schemas/RelayWakuMessage'
RelayPostSubscriptionsRequest:
type: array
items:
$ref: '#/components/schemas/PubSubTopic'
RelayDeleteSubscriptionsRequest:
type: array
items:
$ref: '#/components/schemas/PubSubTopic'

View File

@ -1,296 +0,0 @@
package rest
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)
func makeRelayService(t *testing.T, mux *chi.Mux) *RelayService {
options := node.WithWakuRelayAndMinPeers(0)
n, err := node.New(options)
require.NoError(t, err)
err = n.Start(context.Background())
require.NoError(t, err)
return NewRelayService(n, mux, 3, utils.Logger())
}
func TestPostV1Message(t *testing.T) {
router := chi.NewRouter()
_ = makeRelayService(t, router)
msg := &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "abc",
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
}
func TestRelaySubscription(t *testing.T) {
router := chi.NewRouter()
r := makeRelayService(t, router)
// Wait for node to start
time.Sleep(500 * time.Millisecond)
topics := []string{"test"}
topicsJSONBytes, err := json.Marshal(topics)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
// Test max messages in subscription
now := *utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", proto.Int64(now+1)), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", proto.Int64(now+2)), relay.WithPubSubTopic("test"))
require.NoError(t, err)
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage("test", proto.Int64(now+3)), relay.WithPubSubTopic("test"))
require.NoError(t, err)
// Wait for the messages to be processed
time.Sleep(5 * time.Millisecond)
// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
}
func TestRelayGetV1Messages(t *testing.T) {
router := chi.NewRouter()
router1 := chi.NewRouter()
serviceA := makeRelayService(t, router)
serviceB := makeRelayService(t, router1)
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String()))
require.NoError(t, err)
var addr multiaddr.Multiaddr
for _, a := range serviceB.node.Host().Addrs() {
addr = a.Encapsulate(hostInfo)
break
}
err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr)
require.NoError(t, err)
// Wait for the dial to complete
time.Sleep(1 * time.Second)
topics := []string{"test"}
topicsJSONBytes, err := json.Marshal(topics)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1Subscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
// Wait for the subscription to be started
time.Sleep(1 * time.Second)
msg := &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "test",
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
require.NoError(t, err)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodPost, "/relay/v1/messages/test", bytes.NewReader(msgJsonBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
// Wait for the message to be received
time.Sleep(1 * time.Second)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{}))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
var messages []*pb.WakuMessage
err = json.Unmarshal(rr.Body.Bytes(), &messages)
require.NoError(t, err)
require.Len(t, messages, 1)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, "/relay/v1/messages/test", bytes.NewReader([]byte{}))
router1.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
}
func TestPostAutoV1Message(t *testing.T) {
router := chi.NewRouter()
_ = makeRelayService(t, router)
msg := &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: "/toychat/1/huilong/proto",
Timestamp: utils.GetUnixEpoch(),
}
msgJSONBytes, err := json.Marshal(msg)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
}
func TestRelayAutoSubUnsub(t *testing.T) {
router := chi.NewRouter()
r := makeRelayService(t, router)
// Wait for node to start
time.Sleep(500 * time.Millisecond)
cTopic1 := "/toychat/1/huilong/proto"
cTopics := []string{cTopic1}
topicsJSONBytes, err := json.Marshal(cTopics)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
// Test publishing messages after subscription
now := *utils.GetUnixEpoch()
_, err = r.node.Relay().Publish(context.Background(),
tests.CreateWakuMessage(cTopic1, proto.Int64(now+1)))
require.NoError(t, err)
// Wait for the messages to be processed
time.Sleep(5 * time.Millisecond)
// Test deletion
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodDelete, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
cTopics = append(cTopics, "test")
topicsJSONBytes, err = json.Marshal(cTopics)
require.NoError(t, err)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusBadRequest, rr.Code)
}
func TestRelayGetV1AutoMessages(t *testing.T) {
router := chi.NewRouter()
router1 := chi.NewRouter()
serviceA := makeRelayService(t, router)
serviceB := makeRelayService(t, router1)
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String()))
require.NoError(t, err)
var addr multiaddr.Multiaddr
for _, a := range serviceB.node.Host().Addrs() {
addr = a.Encapsulate(hostInfo)
break
}
err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr)
require.NoError(t, err)
// Wait for the dial to complete
time.Sleep(1 * time.Second)
cTopic1 := "/toychat/1/huilong/proto"
cTopics := []string{cTopic1}
topicsJSONBytes, err := json.Marshal(cTopics)
require.NoError(t, err)
rr := httptest.NewRecorder()
req, _ := http.NewRequest(http.MethodPost, routeRelayV1AutoSubscriptions, bytes.NewReader(topicsJSONBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
require.Equal(t, "true", rr.Body.String())
// Wait for the subscription to be started
time.Sleep(1 * time.Second)
msg := &RestWakuMessage{
Payload: []byte{1, 2, 3},
ContentTopic: cTopic1,
Timestamp: utils.GetUnixEpoch(),
}
msgJsonBytes, err := json.Marshal(msg)
require.NoError(t, err)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodPost, routeRelayV1AutoMessages, bytes.NewReader(msgJsonBytes))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
// Wait for the message to be received
time.Sleep(1 * time.Second)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{}))
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
var messages []*pb.WakuMessage
err = json.Unmarshal(rr.Body.Bytes(), &messages)
require.NoError(t, err)
require.Len(t, messages, 1)
rr = httptest.NewRecorder()
req, _ = http.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", routeRelayV1AutoMessages, url.QueryEscape(cTopic1)), bytes.NewReader([]byte{}))
router1.ServeHTTP(rr, req)
require.Equal(t, http.StatusNotFound, rr.Code)
}

View File

@ -1,48 +0,0 @@
package rest
import (
"context"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
type Adder func(msg *protocol.Envelope)
type runnerService struct {
broadcaster relay.Broadcaster
sub *relay.Subscription
cancel context.CancelFunc
adder Adder
}
func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService {
return &runnerService{
broadcaster: broadcaster,
adder: adder,
}
}
func (r *runnerService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
for {
select {
case <-ctx.Done():
return
case envelope, ok := <-r.sub.Ch:
if ok {
r.adder(envelope)
}
}
}
}
func (r *runnerService) Stop() {
if r.cancel == nil {
return
}
r.sub.Unsubscribe()
r.cancel()
}

View File

@ -1,206 +0,0 @@
package rest
import (
"context"
"encoding/base64"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
)
type StoreService struct {
node *node.WakuNode
mux *chi.Mux
}
type StoreResponse struct {
Messages []StoreWakuMessage `json:"messages"`
Cursor *HistoryCursor `json:"cursor,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}
type HistoryCursor struct {
PubsubTopic string `json:"pubsubTopic"`
SenderTime string `json:"senderTime"`
StoreTime string `json:"storeTime"`
Digest []byte `json:"digest"`
}
type StoreWakuMessage struct {
Payload []byte `json:"payload"`
ContentTopic string `json:"contentTopic"`
Version *uint32 `json:"version,omitempty"`
Timestamp *int64 `json:"timestamp,omitempty"`
Meta []byte `json:"meta,omitempty"`
}
const routeStoreMessagesV1 = "/store/v1/messages"
func NewStoreService(node *node.WakuNode, m *chi.Mux) *StoreService {
s := &StoreService{
node: node,
mux: m,
}
m.Get(routeStoreMessagesV1, s.getV1Messages)
return s
}
func getStoreParams(r *http.Request) (*store.Query, []store.HistoryRequestOption, error) {
query := &store.Query{}
var options []store.HistoryRequestOption
var err error
peerAddrStr := r.URL.Query().Get("peerAddr")
var m multiaddr.Multiaddr
if peerAddrStr != "" {
m, err = multiaddr.NewMultiaddr(peerAddrStr)
if err != nil {
return nil, nil, err
}
options = append(options, store.WithPeerAddr(m))
}
query.PubsubTopic = r.URL.Query().Get("pubsubTopic")
contentTopics := r.URL.Query().Get("contentTopics")
if contentTopics != "" {
query.ContentTopics = strings.Split(contentTopics, ",")
}
startTimeStr := r.URL.Query().Get("startTime")
if startTimeStr != "" {
startTime, err := strconv.ParseInt(startTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
query.StartTime = &startTime
}
endTimeStr := r.URL.Query().Get("endTime")
if endTimeStr != "" {
endTime, err := strconv.ParseInt(endTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
query.EndTime = &endTime
}
var cursor *pb.Index
senderTimeStr := r.URL.Query().Get("senderTime")
storeTimeStr := r.URL.Query().Get("storeTime")
digestStr := r.URL.Query().Get("digest")
if senderTimeStr != "" || storeTimeStr != "" || digestStr != "" {
cursor = &pb.Index{}
if senderTimeStr != "" {
cursor.SenderTime, err = strconv.ParseInt(senderTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
}
if storeTimeStr != "" {
cursor.ReceiverTime, err = strconv.ParseInt(storeTimeStr, 10, 64)
if err != nil {
return nil, nil, err
}
}
if digestStr != "" {
cursor.Digest, err = base64.URLEncoding.DecodeString(digestStr)
if err != nil {
return nil, nil, err
}
}
cursor.PubsubTopic = query.PubsubTopic
options = append(options, store.WithCursor(cursor))
}
pageSizeStr := r.URL.Query().Get("pageSize")
ascendingStr := r.URL.Query().Get("ascending")
if ascendingStr != "" || pageSizeStr != "" {
ascending := true
pageSize := uint64(store.DefaultPageSize)
if ascendingStr != "" {
ascending, err = strconv.ParseBool(ascendingStr)
if err != nil {
return nil, nil, err
}
}
if pageSizeStr != "" {
pageSize, err = strconv.ParseUint(pageSizeStr, 10, 64)
if err != nil {
return nil, nil, err
}
if pageSize > store.MaxPageSize {
pageSize = store.MaxPageSize
}
}
options = append(options, store.WithPaging(ascending, pageSize))
}
return query, options, nil
}
func writeStoreError(w http.ResponseWriter, code int, err error) {
writeResponse(w, StoreResponse{ErrorMessage: err.Error()}, code)
}
func toStoreResponse(result *store.Result) StoreResponse {
response := StoreResponse{}
cursor := result.Cursor()
if cursor != nil {
response.Cursor = &HistoryCursor{
PubsubTopic: cursor.PubsubTopic,
SenderTime: fmt.Sprintf("%d", cursor.SenderTime),
StoreTime: fmt.Sprintf("%d", cursor.ReceiverTime),
Digest: cursor.Digest,
}
}
for _, m := range result.Messages {
response.Messages = append(response.Messages, StoreWakuMessage{
Payload: m.Payload,
ContentTopic: m.ContentTopic,
Version: m.Version,
Timestamp: m.Timestamp,
Meta: m.Meta,
})
}
return response
}
func (d *StoreService) getV1Messages(w http.ResponseWriter, r *http.Request) {
query, options, err := getStoreParams(r)
if err != nil {
writeStoreError(w, http.StatusBadRequest, err)
return
}
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
defer cancel()
result, err := d.node.Store().Query(ctx, *query, options...)
if err != nil {
writeStoreError(w, http.StatusInternalServerError, err)
return
}
writeErrOrResponse(w, nil, toStoreResponse(result))
}

View File

@ -1,203 +0,0 @@
openapi: 3.0.3
info:
title: Waku V2 node Store REST API
version: 1.0.0
contact:
name: VAC Team
url: https://forum.vac.dev/
tags:
- name: store
description: Store REST API for WakuV2 node
paths:
/store/v1/messages:
get:
summary: Gets message history
description: >
Retrieves WakuV2 message history. The returned history
can be potentially filtered by optional request parameters.
operationId: getMessageHistory
tags:
- store
parameters:
- name: peerAddr
in: query
schema:
type: string
required: true
description: >
P2P fully qualified peer multiaddress
in the format `(ip4|ip6)/tcp/p2p/$peerId` and URL-encoded.
example: '%2Fip4%2F127.0.0.1%2Ftcp%2F60001%2Fp2p%2F16Uiu2HAmVFXtAfSj4EiR7mL2KvL4EE2wztuQgUSBoj2Jx2KeXFLN'
- name: pubsubTopic
in: query
schema:
type: string
description: >
The pubsub topic on which a WakuMessage is published.
If left empty, no filtering is applied.
It is also intended for pagination purposes.
It should be a URL-encoded string.
example: 'my%20pubsub%20topic'
- name: contentTopics
in: query
schema: string
description: >
Comma-separated list of content topics. When specified,
only WakuMessages that are linked to any of the given
content topics will be delivered in the get response.
It should be a URL-encoded-comma-separated string.
example: 'my%20first%20content%20topic%2Cmy%20second%20content%20topic%2Cmy%20third%20content%20topic'
- name: startTime
in: query
schema:
type: string
description: >
The inclusive lower bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: endTime
in: query
schema:
type: string
description: >
The inclusive upper bound on the timestamp of
queried WakuMessages. This field holds the
Unix epoch time in nanoseconds as a 64-bits
integer value.
example: '1680590945000000000'
- name: senderTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was generated.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590947000000000'
- name: storeTime
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
Represents the Unix time in nanoseconds at which a message was stored.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: '1680590945000000000'
- name: digest
in: query
schema:
type: string
description: >
Cursor field intended for pagination purposes.
URL-base64-encoded string computed as a hash of the
a message content topic plus a message payload.
It could be empty for retrieving the first page,
and will be returned from the GET response so that
it can be part of the next page request.
example: 'Gc4ACThW5t2QQO82huq3WnDv%2FapPPJpD%2FwJfxDxAnR0%3D'
- name: pageSize
in: query
schema:
type: string
description: >
Number of messages to retrieve per page
example: '5'
- name: ascending
in: query
schema:
type: string
description: >
"true" for paging forward, "false" for paging backward
example: "true"
responses:
'200':
description: WakuV2 message history.
content:
application/json:
schema:
$ref: '#/components/schemas/StoreResponse'
'400':
description: Bad request error.
content:
text/plain:
type: string
'412':
description: Precondition failed.
content:
text/plain:
type: string
'500':
description: Internal server error.
content:
text/plain:
type: string
components:
schemas:
StoreResponse:
type: object
properties:
messages:
type: array
items:
$ref: '#/components/schemas/WakuMessage'
cursor:
$ref: '#/components/schemas/HistoryCursor'
error_message:
type: string
required:
- messages
HistoryCursor:
type: object
properties:
pubsub_topic:
type: string
sender_time:
type: string
store_time:
type: string
digest:
type: string
required:
- pubsub_topic
- sender_time
- store_time
- digest
WakuMessage:
type: object
properties:
payload:
type: string
content_topic:
type: string
version:
type: integer
format: int32
timestamp:
type: integer
format: int64
ephemeral:
type: boolean
required:
- payload
- content_topic

View File

@ -1,97 +0,0 @@
package rest
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"github.com/go-chi/chi/v5"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"google.golang.org/protobuf/proto"
)
func TestGetMessages(t *testing.T) {
db := MemoryDB(t)
node1, err := node.New(node.WithWakuStore(), node.WithMessageProvider(db))
require.NoError(t, err)
err = node1.Start(context.Background())
require.NoError(t, err)
defer node1.Stop()
topic1 := "1"
pubsubTopic1 := "topic1"
now := *utils.GetUnixEpoch()
msg1 := tests.CreateWakuMessage(topic1, proto.Int64(now+1))
msg2 := tests.CreateWakuMessage(topic1, proto.Int64(now+2))
msg3 := tests.CreateWakuMessage(topic1, proto.Int64(now+3))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg1, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg2, *utils.GetUnixEpoch(), pubsubTopic1))
node1.Broadcaster().Submit(protocol.NewEnvelope(msg3, *utils.GetUnixEpoch(), pubsubTopic1))
n1HostInfo, _ := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", node1.Host().ID().String()))
n1Addr := node1.ListenAddresses()[0].Encapsulate(n1HostInfo)
node2, err := node.New()
require.NoError(t, err)
err = node2.Start(context.Background())
require.NoError(t, err)
defer node2.Stop()
router := chi.NewRouter()
_ = NewStoreService(node2, router)
// TEST: get cursor
// TEST: get no messages
// First page
rr := httptest.NewRecorder()
queryParams := url.Values{
"peerAddr": {n1Addr.String()},
"pubsubTopic": {pubsubTopic1},
"pageSize": {"2"},
}
path := routeStoreMessagesV1 + "?" + queryParams.Encode()
req, _ := http.NewRequest(http.MethodGet, path, nil)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
response := StoreResponse{}
err = json.Unmarshal(rr.Body.Bytes(), &response)
require.NoError(t, err)
require.Len(t, response.Messages, 2)
// Second page
rr = httptest.NewRecorder()
queryParams = url.Values{
"peerAddr": {n1Addr.String()},
"pubsubTopic": {pubsubTopic1},
"senderTime": {response.Cursor.SenderTime},
"storeTime": {response.Cursor.StoreTime},
"digest": {base64.URLEncoding.EncodeToString(response.Cursor.Digest)},
"pageSize": {"2"},
}
path = routeStoreMessagesV1 + "?" + queryParams.Encode()
req, _ = http.NewRequest(http.MethodGet, path, nil)
router.ServeHTTP(rr, req)
require.Equal(t, http.StatusOK, rr.Code)
response = StoreResponse{}
err = json.Unmarshal(rr.Body.Bytes(), &response)
require.NoError(t, err)
require.Len(t, response.Messages, 1)
require.Nil(t, response.Cursor)
}

View File

@ -1,83 +0,0 @@
package rest
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"github.com/go-chi/chi/v5"
"go.uber.org/zap"
)
// The functions writes error response in plain text format with specified statusCode
func writeErrResponse(w http.ResponseWriter, log *zap.Logger, err error, statusCode int) {
w.WriteHeader(statusCode)
_, err = w.Write([]byte(err.Error()))
if err != nil {
log.Error("error while writing response", zap.Error(err))
}
}
// This function writes error or response in json format with statusCode as 500 in case of error
func writeErrOrResponse(w http.ResponseWriter, err error, value interface{}) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
jsonResponse, err := json.Marshal(value)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
_, err = w.Write(jsonResponse)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
// This function writes a response in json format
func writeResponse(w http.ResponseWriter, value interface{}, code int) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
jsonResponse, err := json.Marshal(value)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// w.Write implicitly writes a 200 status code
// and only once we can write 2xx-5xx status code
// so any statusCode apart from 1xx being written to the header, will be ignored.
w.WriteHeader(code)
_, _ = w.Write(jsonResponse)
}
func topicFromPath(w http.ResponseWriter, req *http.Request, field string, logger *zap.Logger) string {
topic := chi.URLParam(req, field)
if topic == "" {
errMissing := fmt.Errorf("missing %s", field)
writeGetMessageErr(w, errMissing, http.StatusBadRequest, logger)
return ""
}
topic, err := url.QueryUnescape(topic)
if err != nil {
errInvalid := fmt.Errorf("invalid %s format", field)
writeGetMessageErr(w, errInvalid, http.StatusBadRequest, logger)
return ""
}
return topic
}
func writeGetMessageErr(w http.ResponseWriter, err error, code int, logger *zap.Logger) {
// write status before the body
w.WriteHeader(code)
logger.Error("get message", zap.Error(err))
if _, err := w.Write([]byte(err.Error())); err != nil {
logger.Error("writing response", zap.Error(err))
}
}

View File

@ -1,23 +0,0 @@
package rest
import (
"database/sql"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/persistence/sqlite"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func MemoryDB(t *testing.T) *persistence.DBStore {
var db *sql.DB
db, err := sqlite.NewDB(":memory:", utils.Logger())
require.NoError(t, err)
dbStore, err := persistence.NewDBStore(prometheus.DefaultRegisterer, utils.Logger(), persistence.WithDB(db), persistence.WithMigrations(sqlite.Migrations))
require.NoError(t, err)
return dbStore
}

View File

@ -1,97 +0,0 @@
package rest
import (
"context"
"fmt"
"net/http"
"sync"
"github.com/go-chi/chi/v5"
"github.com/go-chi/chi/v5/middleware"
"github.com/waku-org/go-waku/waku/v2/node"
"go.uber.org/zap"
)
type WakuRest struct {
node *node.WakuNode
server *http.Server
log *zap.Logger
relayService *RelayService
filterService *FilterService
}
type RestConfig struct {
Address string
Port uint
EnablePProf bool
EnableAdmin bool
RelayCacheCapacity uint
FilterCacheCapacity uint
}
func NewWakuRest(node *node.WakuNode, config RestConfig, log *zap.Logger) *WakuRest {
wrpc := new(WakuRest)
wrpc.log = log.Named("rest")
mux := chi.NewRouter()
mux.Use(middleware.Logger)
mux.Use(middleware.NoCache)
if config.EnablePProf {
mux.Mount("/debug", middleware.Profiler())
}
_ = NewDebugService(node, mux)
_ = NewHealthService(node, mux)
_ = NewStoreService(node, mux)
_ = NewLightpushService(node, mux, log)
listenAddr := fmt.Sprintf("%s:%d", config.Address, config.Port)
server := &http.Server{
Addr: listenAddr,
Handler: mux,
}
wrpc.node = node
wrpc.server = server
if node.Relay() != nil {
relayService := NewRelayService(node, mux, config.RelayCacheCapacity, log)
wrpc.relayService = relayService
}
if config.EnableAdmin {
_ = NewAdminService(node, mux, wrpc.log)
}
if node.FilterLightnode() != nil {
filterService := NewFilterService(node, mux, int(config.FilterCacheCapacity), log)
server.RegisterOnShutdown(func() {
filterService.Stop()
})
wrpc.filterService = filterService
}
return wrpc
}
func (r *WakuRest) Start(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
if r.node.FilterLightnode() != nil {
go r.filterService.Start(ctx)
}
go func() {
_ = r.server.ListenAndServe()
}()
r.log.Info("server started", zap.String("addr", r.server.Addr))
}
func (r *WakuRest) Stop(ctx context.Context) error {
r.log.Info("shutting down server")
return r.server.Shutdown(ctx)
}

View File

@ -1,19 +0,0 @@
package rest
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/utils"
)
func TestWakuRest(t *testing.T) {
options := node.WithWakuStore()
n, err := node.New(options)
require.NoError(t, err)
rpc := NewWakuRest(n, RestConfig{Address: "127.0.0.1", Port: 8080, EnablePProf: false, EnableAdmin: false, RelayCacheCapacity: 10}, utils.Logger())
require.NotNil(t, rpc.server)
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
}

View File

@ -1,23 +0,0 @@
//go:build !gowaku_no_rln
// +build !gowaku_no_rln
package server
import (
"fmt"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
)
func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error {
_, rlnEnabled := node.RLNRelay().(*rln.WakuRLNRelay)
if rlnEnabled {
err := node.RLNRelay().AppendRLNProof(msg, node.Timesource().Now())
if err != nil {
return fmt.Errorf("could not append rln proof: %w", err)
}
}
return nil
}

View File

@ -1,48 +0,0 @@
package server
import (
"encoding/base64"
"strings"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
)
func IsWakuProtocol(protocol protocol.ID) bool {
return protocol == filter.FilterPushID_v20beta1 ||
protocol == filter.FilterSubscribeID_v20beta1 ||
protocol == relay.WakuRelayID_v200 ||
protocol == lightpush.LightPushID_v20beta1 ||
protocol == store.StoreID_v20beta4
}
type Base64URLByte []byte
// UnmarshalText is used by json.Unmarshal to decode both url-safe and standard
// base64 encoded strings with and without padding
func (h *Base64URLByte) UnmarshalText(b []byte) error {
inputValue := ""
if b != nil {
inputValue = string(b)
}
enc := base64.StdEncoding
if strings.ContainsAny(inputValue, "-_") {
enc = base64.URLEncoding
}
if len(inputValue)%4 != 0 {
enc = enc.WithPadding(base64.NoPadding)
}
decodedBytes, err := enc.DecodeString(inputValue)
if err != nil {
return err
}
*h = decodedBytes
return nil
}