mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-12 14:54:19 +00:00
chore: remove RPC server (#1008)
Co-authored-by: Prem Chaitanya Prathi <chaitanyaprem@gmail.com>
This commit is contained in:
parent
8e95f75a38
commit
57cf95cd5c
@ -499,40 +499,6 @@ var (
|
||||
Destination: &options.Metrics.Port,
|
||||
EnvVars: []string{"WAKUNODE2_METRICS_SERVER_PORT"},
|
||||
})
|
||||
RPCFlag = altsrc.NewBoolFlag(&cli.BoolFlag{
|
||||
Name: "rpc",
|
||||
Usage: "Enable the rpc server",
|
||||
Destination: &options.RPCServer.Enable,
|
||||
EnvVars: []string{"WAKUNODE2_RPC"},
|
||||
})
|
||||
RPCPort = altsrc.NewIntFlag(&cli.IntFlag{
|
||||
Name: "rpc-port",
|
||||
Value: 8545,
|
||||
Usage: "Listening port of the rpc server",
|
||||
Destination: &options.RPCServer.Port,
|
||||
EnvVars: []string{"WAKUNODE2_RPC_PORT"},
|
||||
})
|
||||
RPCAddress = altsrc.NewStringFlag(&cli.StringFlag{
|
||||
Name: "rpc-address",
|
||||
Value: "127.0.0.1",
|
||||
Usage: "Listening address of the rpc server",
|
||||
Destination: &options.RPCServer.Address,
|
||||
EnvVars: []string{"WAKUNODE2_RPC_ADDRESS"},
|
||||
})
|
||||
RPCRelayCacheCapacity = altsrc.NewIntFlag(&cli.IntFlag{
|
||||
Name: "rpc-relay-cache-capacity",
|
||||
Value: 30,
|
||||
Usage: "Capacity of the Relay REST API message cache",
|
||||
Destination: &options.RPCServer.RelayCacheCapacity,
|
||||
EnvVars: []string{"WAKUNODE2_RPC_RELAY_CACHE_CAPACITY"},
|
||||
})
|
||||
RPCAdmin = altsrc.NewBoolFlag(&cli.BoolFlag{
|
||||
Name: "rpc-admin",
|
||||
Value: false,
|
||||
Usage: "Enable access to JSON-RPC Admin API",
|
||||
Destination: &options.RPCServer.Admin,
|
||||
EnvVars: []string{"WAKUNODE2_RPC_ADMIN"},
|
||||
})
|
||||
RESTFlag = altsrc.NewBoolFlag(&cli.BoolFlag{
|
||||
Name: "rest",
|
||||
Usage: "Enable Waku REST HTTP server",
|
||||
|
@ -90,11 +90,6 @@ func main() {
|
||||
MetricsServer,
|
||||
MetricsServerAddress,
|
||||
MetricsServerPort,
|
||||
RPCFlag,
|
||||
RPCPort,
|
||||
RPCAddress,
|
||||
RPCRelayCacheCapacity,
|
||||
RPCAdmin,
|
||||
RESTFlag,
|
||||
RESTAddress,
|
||||
RESTPort,
|
||||
|
@ -40,7 +40,6 @@ import (
|
||||
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/waku-org/go-waku/cmd/waku/server/rest"
|
||||
"github.com/waku-org/go-waku/cmd/waku/server/rpc"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/metrics"
|
||||
"github.com/waku-org/go-waku/waku/persistence"
|
||||
@ -400,12 +399,6 @@ func Execute(options NodeOptions) error {
|
||||
}
|
||||
}
|
||||
|
||||
var rpcServer *rpc.WakuRPC
|
||||
if options.RPCServer.Enable {
|
||||
rpcServer = rpc.NewWakuRPC(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger)
|
||||
rpcServer.Start()
|
||||
}
|
||||
|
||||
var restServer *rest.WakuRest
|
||||
if options.RESTServer.Enable {
|
||||
wg.Add(1)
|
||||
@ -432,12 +425,6 @@ func Execute(options NodeOptions) error {
|
||||
// shut the node down
|
||||
wakuNode.Stop()
|
||||
|
||||
if options.RPCServer.Enable {
|
||||
if err := rpcServer.Stop(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if options.RESTServer.Enable {
|
||||
if err := restServer.Stop(ctx); err != nil {
|
||||
return err
|
||||
|
@ -101,15 +101,6 @@ type MetricsOptions struct {
|
||||
Port int
|
||||
}
|
||||
|
||||
// RPCServerOptions are settings used to start a json rpc server
|
||||
type RPCServerOptions struct {
|
||||
Enable bool
|
||||
Port int
|
||||
Address string
|
||||
Admin bool
|
||||
RelayCacheCapacity int
|
||||
}
|
||||
|
||||
// RESTServerOptions are settings used to start a rest http server
|
||||
type RESTServerOptions struct {
|
||||
Enable bool
|
||||
@ -185,6 +176,5 @@ type NodeOptions struct {
|
||||
DNSDiscovery DNSDiscoveryOptions
|
||||
Rendezvous RendezvousOptions
|
||||
Metrics MetricsOptions
|
||||
RPCServer RPCServerOptions
|
||||
RESTServer RESTServerOptions
|
||||
}
|
||||
|
@ -1,78 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
ma "github.com/multiformats/go-multiaddr"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/waku-org/go-waku/cmd/waku/server"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
)
|
||||
|
||||
type AdminService struct {
|
||||
node *node.WakuNode
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
type GetPeersArgs struct {
|
||||
}
|
||||
|
||||
type PeersArgs struct {
|
||||
Peers []string `json:"peers,omitempty"`
|
||||
}
|
||||
|
||||
type PeerReply struct {
|
||||
Multiaddr string `json:"multiaddr,omitempty"`
|
||||
Protocol protocol.ID `json:"protocol,omitempty"`
|
||||
Connected bool `json:"connected,omitempty"`
|
||||
}
|
||||
|
||||
type PeersReply []PeerReply
|
||||
|
||||
func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *SuccessReply) error {
|
||||
for _, peer := range args.Peers {
|
||||
addr, err := ma.NewMultiaddr(peer)
|
||||
if err != nil {
|
||||
a.log.Error("building multiaddr", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
err = a.node.DialPeerWithMultiAddress(req.Context(), addr)
|
||||
if err != nil {
|
||||
a.log.Error("dialing peers", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *AdminService) GetV1Peers(req *http.Request, args *GetPeersArgs, reply *PeersReply) error {
|
||||
peers, err := a.node.Peers()
|
||||
if err != nil {
|
||||
a.log.Error("getting peers", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
for _, peer := range peers {
|
||||
if peer.ID.String() == a.node.Host().ID().String() {
|
||||
//Skip own node id
|
||||
continue
|
||||
}
|
||||
for _, addr := range peer.Addrs {
|
||||
for _, proto := range peer.Protocols {
|
||||
if !server.IsWakuProtocol(proto) {
|
||||
continue
|
||||
}
|
||||
*reply = append(*reply, PeerReply{
|
||||
Multiaddr: addr.String(),
|
||||
Protocol: proto,
|
||||
Connected: peer.Connected,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,77 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
func makeAdminService(t *testing.T) *AdminService {
|
||||
options := node.WithWakuRelay()
|
||||
n, err := node.New(options)
|
||||
require.NoError(t, err)
|
||||
err = n.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
return &AdminService{n, utils.Logger()}
|
||||
}
|
||||
|
||||
func TestV1Peers(t *testing.T) {
|
||||
port, err := tests.FindFreePort(t, "", 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
broadcaster := relay.NewBroadcaster(10)
|
||||
require.NoError(t, broadcaster.Start(context.Background()))
|
||||
|
||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
relay := relay.NewWakuRelay(broadcaster, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||
relay.SetHost(host)
|
||||
err = relay.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
defer relay.Stop()
|
||||
|
||||
var reply PeersReply
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte("")))
|
||||
require.NoError(t, err)
|
||||
|
||||
a := makeAdminService(t)
|
||||
|
||||
err = a.GetV1Peers(request, &GetPeersArgs{}, &reply)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, reply, 0)
|
||||
|
||||
var reply2 SuccessReply
|
||||
|
||||
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().String()))
|
||||
require.NoError(t, err)
|
||||
|
||||
var addr multiaddr.Multiaddr
|
||||
for _, a := range host.Addrs() {
|
||||
addr = a.Encapsulate(hostInfo)
|
||||
break
|
||||
}
|
||||
err = a.PostV1Peers(request, &PeersArgs{Peers: []string{addr.String()}}, &reply2)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply2)
|
||||
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
err = a.GetV1Peers(request, &GetPeersArgs{}, &reply)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, reply, 1)
|
||||
}
|
@ -1,195 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"github.com/gorilla/rpc/v2"
|
||||
"golang.org/x/text/cases"
|
||||
"golang.org/x/text/language"
|
||||
)
|
||||
|
||||
// Based on github.com/gorilla/rpc/v2/json which is governed by a BSD-style license
|
||||
|
||||
var null = json.RawMessage([]byte("null"))
|
||||
|
||||
// An Error is a wrapper for a JSON interface value. It can be used by either
|
||||
// a service's handler func to write more complex JSON data to an error field
|
||||
// of a server's response, or by a client to read it.
|
||||
type Error struct {
|
||||
Data interface{}
|
||||
}
|
||||
|
||||
func (e *Error) Error() string {
|
||||
return fmt.Sprintf("%v", e.Data)
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Request and Response
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// serverRequest represents a JSON-RPC request received by the server.
|
||||
type serverRequest struct {
|
||||
// A String containing the name of the method to be invoked.
|
||||
Method string `json:"method"`
|
||||
// An Array of objects to pass as arguments to the method.
|
||||
Params *json.RawMessage `json:"params"`
|
||||
// The request id. This can be of any type. It is used to match the
|
||||
// response with the request that it is replying to.
|
||||
ID *json.RawMessage `json:"id"`
|
||||
}
|
||||
|
||||
// serverResponse represents a JSON-RPC response returned by the server.
|
||||
type serverResponse struct {
|
||||
// The Object that was returned by the invoked method. This must be null
|
||||
// in case there was an error invoking the method.
|
||||
Result interface{} `json:"result"`
|
||||
// An Error object if there was an error invoking the method. It must be
|
||||
// null if there was no error.
|
||||
Error interface{} `json:"error"`
|
||||
// This must be the same id as the request it is responding to.
|
||||
ID *json.RawMessage `json:"id"`
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// Codec
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// NewCodec returns a new SnakeCaseCodec Codec.
|
||||
func NewSnakeCaseCodec() *SnakeCaseCodec {
|
||||
return &SnakeCaseCodec{}
|
||||
}
|
||||
|
||||
// SnakeCaseCodec creates a CodecRequest to process each request.
|
||||
type SnakeCaseCodec struct {
|
||||
}
|
||||
|
||||
// NewRequest returns a CodecRequest.
|
||||
func (c *SnakeCaseCodec) NewRequest(r *http.Request) rpc.CodecRequest {
|
||||
return newCodecRequest(r)
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// CodecRequest
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// newCodecRequest returns a new CodecRequest.
|
||||
func newCodecRequest(r *http.Request) rpc.CodecRequest {
|
||||
// Decode the request body and check if RPC method is valid.
|
||||
req := new(serverRequest)
|
||||
err := json.NewDecoder(r.Body).Decode(req)
|
||||
r.Body.Close()
|
||||
return &CodecRequest{request: req, err: err}
|
||||
}
|
||||
|
||||
// CodecRequest decodes and encodes a single request.
|
||||
type CodecRequest struct {
|
||||
request *serverRequest
|
||||
err error
|
||||
}
|
||||
|
||||
// Method returns the RPC method for the current request.
|
||||
//
|
||||
// The method uses a dotted notation as in "Service.Method".
|
||||
func (c *CodecRequest) Method() (string, error) {
|
||||
if c.err == nil {
|
||||
return toWakuMethod(c.request.Method), nil
|
||||
}
|
||||
return "", c.err
|
||||
}
|
||||
|
||||
// toWakuMethod transform get_waku_v2_debug_v1_info to Debug.GetV1Info
|
||||
func toWakuMethod(input string) string {
|
||||
typ := "get"
|
||||
if strings.HasPrefix(input, "post") {
|
||||
typ = "post"
|
||||
} else if strings.HasPrefix(input, "delete") {
|
||||
typ = "delete"
|
||||
}
|
||||
|
||||
base := typ + "_waku_v2_"
|
||||
cleanedInput := strings.Replace(input, base, "", 1)
|
||||
splited := strings.Split(cleanedInput, "_")
|
||||
|
||||
c := cases.Title(language.AmericanEnglish)
|
||||
|
||||
method := c.String(typ)
|
||||
for _, val := range splited[1:] {
|
||||
method = method + c.String(val)
|
||||
}
|
||||
|
||||
return c.String(splited[0]) + "." + method
|
||||
}
|
||||
|
||||
// ReadRequest fills the request object for the RPC method.
|
||||
func (c *CodecRequest) ReadRequest(args interface{}) error {
|
||||
if c.err == nil {
|
||||
if c.request.Params != nil {
|
||||
// JSON params is array value. RPC params is struct.
|
||||
// Attempt to unmarshal into array containing the request struct.
|
||||
params := [1]interface{}{args}
|
||||
err := json.Unmarshal(*c.request.Params, ¶ms)
|
||||
if err != nil {
|
||||
// This failed so we might have received an array of parameters
|
||||
// instead of a object
|
||||
argsValueOf := reflect.Indirect(reflect.ValueOf(args))
|
||||
if argsValueOf.Kind() == reflect.Struct {
|
||||
var params []interface{}
|
||||
for i := 0; i < argsValueOf.NumField(); i++ {
|
||||
params = append(params, argsValueOf.Field(i).Addr().Interface())
|
||||
}
|
||||
c.err = json.Unmarshal(*c.request.Params, ¶ms)
|
||||
} else {
|
||||
// Unknown field type...
|
||||
c.err = err
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
c.err = errors.New("rpc: method request ill-formed: missing params field")
|
||||
}
|
||||
}
|
||||
return c.err
|
||||
}
|
||||
|
||||
// WriteResponse encodes the response and writes it to the ResponseWriter.
|
||||
func (c *CodecRequest) WriteResponse(w http.ResponseWriter, reply interface{}) {
|
||||
if c.request.ID != nil {
|
||||
// Id is null for notifications and they don't have a response.
|
||||
res := &serverResponse{
|
||||
Result: reply,
|
||||
Error: &null,
|
||||
ID: c.request.ID,
|
||||
}
|
||||
c.writeServerResponse(w, 200, res)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CodecRequest) WriteError(w http.ResponseWriter, _ int, err error) {
|
||||
res := &serverResponse{
|
||||
Result: &null,
|
||||
ID: c.request.ID,
|
||||
}
|
||||
if jsonErr, ok := err.(*Error); ok {
|
||||
res.Error = jsonErr.Data
|
||||
} else {
|
||||
res.Error = err.Error()
|
||||
}
|
||||
c.writeServerResponse(w, 400, res)
|
||||
}
|
||||
|
||||
func (c *CodecRequest) writeServerResponse(w http.ResponseWriter, status int, res *serverResponse) {
|
||||
b, err := json.Marshal(res)
|
||||
if err == nil {
|
||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||
w.WriteHeader(status)
|
||||
_, _ = w.Write(b)
|
||||
} else {
|
||||
// Not sure in which case will this happen. But seems harmless.
|
||||
rpc.WriteError(w, status, err.Error())
|
||||
}
|
||||
}
|
@ -1,18 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestConvertWakuMethod(t *testing.T) {
|
||||
res := toWakuMethod("get_waku_v2_debug_v1_info")
|
||||
require.Equal(t, "Debug.GetV1Info", res)
|
||||
|
||||
res = toWakuMethod("post_waku_v2_relay_v1_message")
|
||||
require.Equal(t, "Relay.PostV1Message", res)
|
||||
|
||||
res = toWakuMethod("delete_waku_v2_relay_v1_subscriptions")
|
||||
require.Equal(t, "Relay.DeleteV1Subscriptions", res)
|
||||
}
|
@ -1,40 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
)
|
||||
|
||||
type DebugService struct {
|
||||
node *node.WakuNode
|
||||
}
|
||||
|
||||
type InfoArgs struct {
|
||||
}
|
||||
|
||||
type InfoReply struct {
|
||||
ENRUri string `json:"enrUri,omitempty"`
|
||||
ListenAddresses []string `json:"listenAddresses,omitempty"`
|
||||
}
|
||||
|
||||
func NewDebugService(node *node.WakuNode) *DebugService {
|
||||
return &DebugService{
|
||||
node: node,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DebugService) GetV1Info(r *http.Request, args *InfoArgs, reply *InfoReply) error {
|
||||
reply.ENRUri = d.node.ENR().String()
|
||||
for _, addr := range d.node.ListenAddresses() {
|
||||
reply.ListenAddresses = append(reply.ListenAddresses, addr.String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type VersionResponse string
|
||||
|
||||
func (d *DebugService) GetV1Version(r *http.Request, args *InfoArgs, reply *VersionResponse) error {
|
||||
*reply = VersionResponse(node.GetVersionInfo().String())
|
||||
return nil
|
||||
}
|
@ -1,31 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
)
|
||||
|
||||
func TestGetV1Info(t *testing.T) {
|
||||
var reply InfoReply
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte("")))
|
||||
require.NoError(t, err)
|
||||
|
||||
wakuNode1, err := node.New()
|
||||
require.NoError(t, err)
|
||||
defer wakuNode1.Stop()
|
||||
err = wakuNode1.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
d := &DebugService{
|
||||
node: wakuNode1,
|
||||
}
|
||||
|
||||
err = d.GetV1Info(request, &InfoArgs{}, &reply)
|
||||
require.NoError(t, err)
|
||||
}
|
@ -1,139 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type FilterService struct {
|
||||
node *node.WakuNode
|
||||
log *zap.Logger
|
||||
|
||||
messages map[string][]*wpb.WakuMessage
|
||||
cacheCapacity int
|
||||
messagesMutex sync.RWMutex
|
||||
|
||||
runner *runnerService
|
||||
}
|
||||
|
||||
type FilterContentArgs struct {
|
||||
Topic string `json:"topic,omitempty"`
|
||||
ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"`
|
||||
}
|
||||
|
||||
type ContentTopicArgs struct {
|
||||
ContentTopic string `json:"contentTopic,omitempty"`
|
||||
}
|
||||
|
||||
func NewFilterService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *FilterService {
|
||||
s := &FilterService{
|
||||
node: node,
|
||||
log: log.Named("filter"),
|
||||
cacheCapacity: cacheCapacity,
|
||||
messages: make(map[string][]*wpb.WakuMessage),
|
||||
}
|
||||
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
|
||||
return s
|
||||
}
|
||||
|
||||
func makeContentFilter(args *FilterContentArgs) legacy_filter.ContentFilter {
|
||||
var contentTopics []string
|
||||
for _, contentFilter := range args.ContentFilters {
|
||||
contentTopics = append(contentTopics, contentFilter.ContentTopic)
|
||||
}
|
||||
|
||||
return legacy_filter.ContentFilter{
|
||||
Topic: args.Topic,
|
||||
ContentTopics: contentTopics,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *FilterService) addEnvelope(envelope *protocol.Envelope) {
|
||||
f.messagesMutex.Lock()
|
||||
defer f.messagesMutex.Unlock()
|
||||
|
||||
contentTopic := envelope.Message().ContentTopic
|
||||
if _, ok := f.messages[contentTopic]; !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Keep a specific max number of messages per topic
|
||||
if len(f.messages[envelope.PubsubTopic()]) >= f.cacheCapacity {
|
||||
f.messages[envelope.PubsubTopic()] = f.messages[envelope.PubsubTopic()][1:]
|
||||
}
|
||||
|
||||
f.messages[contentTopic] = append(f.messages[contentTopic], envelope.Message())
|
||||
}
|
||||
|
||||
func (f *FilterService) Start() {
|
||||
f.runner.Start()
|
||||
}
|
||||
|
||||
func (f *FilterService) Stop() {
|
||||
f.runner.Stop()
|
||||
}
|
||||
|
||||
func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
|
||||
_, _, err := f.node.LegacyFilter().Subscribe(
|
||||
req.Context(),
|
||||
makeContentFilter(args),
|
||||
legacy_filter.WithAutomaticPeerSelection(),
|
||||
)
|
||||
if err != nil {
|
||||
f.log.Error("subscribing to topic", zap.String("topic", args.Topic), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
for _, contentFilter := range args.ContentFilters {
|
||||
f.messages[contentFilter.ContentTopic] = make([]*wpb.WakuMessage, 0)
|
||||
}
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
|
||||
err := f.node.LegacyFilter().UnsubscribeFilter(
|
||||
req.Context(),
|
||||
makeContentFilter(args),
|
||||
)
|
||||
if err != nil {
|
||||
f.log.Error("unsubscribing from topic", zap.String("topic", args.Topic), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
for _, contentFilter := range args.ContentFilters {
|
||||
delete(f.messages, contentFilter.ContentTopic)
|
||||
}
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs, reply *MessagesReply) error {
|
||||
f.messagesMutex.Lock()
|
||||
defer f.messagesMutex.Unlock()
|
||||
|
||||
if _, ok := f.messages[args.ContentTopic]; !ok {
|
||||
return fmt.Errorf("topic %s not subscribed", args.ContentTopic)
|
||||
}
|
||||
|
||||
for i := range f.messages[args.ContentTopic] {
|
||||
msg := f.messages[args.ContentTopic][i]
|
||||
rpcMsg, err := ProtoToRPC(msg)
|
||||
if err != nil {
|
||||
f.log.Warn("could not include message in response", zap.Error(err))
|
||||
} else {
|
||||
*reply = append(*reply, rpcMsg)
|
||||
}
|
||||
}
|
||||
|
||||
f.messages[args.ContentTopic] = make([]*wpb.WakuMessage, 0)
|
||||
return nil
|
||||
}
|
@ -1,177 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/tests"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/peerstore"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
var testTopic = "test"
|
||||
|
||||
func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
|
||||
var nodeOpts []node.WakuNodeOption
|
||||
|
||||
nodeOpts = append(nodeOpts, node.WithLegacyWakuFilter(isFullNode))
|
||||
if isFullNode {
|
||||
nodeOpts = append(nodeOpts, node.WithWakuRelay())
|
||||
}
|
||||
|
||||
n, err := node.New(nodeOpts...)
|
||||
require.NoError(t, err)
|
||||
err = n.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
if isFullNode {
|
||||
sub, err := n.Relay().Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
|
||||
go func() {
|
||||
for range sub[0].Ch {
|
||||
}
|
||||
}()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
return NewFilterService(n, 30, utils.Logger())
|
||||
}
|
||||
|
||||
func TestFilterSubscription(t *testing.T) {
|
||||
t.Skip("skipping since it is legacy filter")
|
||||
port, err := tests.FindFreePort(t, "", 5)
|
||||
require.NoError(t, err)
|
||||
|
||||
host, err := tests.MakeHost(context.Background(), port, rand.Reader)
|
||||
require.NoError(t, err)
|
||||
|
||||
b := relay.NewBroadcaster(10)
|
||||
require.NoError(t, b.Start(context.Background()))
|
||||
node := relay.NewWakuRelay(b, 0, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||
node.SetHost(host)
|
||||
err = node.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = node.Subscribe(context.Background(), protocol.NewContentFilter(testTopic))
|
||||
require.NoError(t, err)
|
||||
|
||||
b2 := relay.NewBroadcaster(10)
|
||||
require.NoError(t, b2.Start(context.Background()))
|
||||
f := legacy_filter.NewWakuFilter(b2, false, timesource.NewDefaultClock(), prometheus.DefaultRegisterer, utils.Logger())
|
||||
f.SetHost(host)
|
||||
sub := relay.NewSubscription(protocol.NewContentFilter(relay.DefaultWakuTopic))
|
||||
err = f.Start(context.Background(), sub)
|
||||
require.NoError(t, err)
|
||||
|
||||
d := makeFilterService(t, true)
|
||||
defer d.node.Stop()
|
||||
|
||||
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", host.ID().String()))
|
||||
require.NoError(t, err)
|
||||
|
||||
var addr multiaddr.Multiaddr
|
||||
for _, a := range host.Addrs() {
|
||||
addr = a.Encapsulate(hostInfo)
|
||||
break
|
||||
}
|
||||
|
||||
_, err = d.node.AddPeer(addr, peerstore.Static, []string{testTopic}, legacy_filter.FilterID_v20beta1)
|
||||
require.NoError(t, err)
|
||||
|
||||
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}
|
||||
|
||||
var reply SuccessReply
|
||||
err = d.PostV1Subscription(
|
||||
makeRequest(t),
|
||||
args,
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
|
||||
err = d.DeleteV1Subscription(
|
||||
makeRequest(t),
|
||||
args,
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
}
|
||||
|
||||
func TestFilterGetV1Messages(t *testing.T) {
|
||||
t.Skip("skipping since it is legacy filter")
|
||||
|
||||
serviceA := makeFilterService(t, true)
|
||||
var reply SuccessReply
|
||||
|
||||
serviceB := makeFilterService(t, false)
|
||||
go serviceB.Start()
|
||||
defer serviceB.Stop()
|
||||
|
||||
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String()))
|
||||
require.NoError(t, err)
|
||||
|
||||
var addr multiaddr.Multiaddr
|
||||
for _, a := range serviceB.node.Host().Addrs() {
|
||||
addr = a.Encapsulate(hostInfo)
|
||||
break
|
||||
}
|
||||
err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for the dial to complete
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}
|
||||
err = serviceB.PostV1Subscription(
|
||||
makeRequest(t),
|
||||
args,
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
|
||||
// Wait for the subscription to be started
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
_, err = serviceA.node.Relay().Publish(
|
||||
context.Background(),
|
||||
&wpb.WakuMessage{ContentTopic: "ct"},
|
||||
relay.WithPubSubTopic(testTopic),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
|
||||
// Wait for the message to be received
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
var messagesReply1 MessagesReply
|
||||
err = serviceB.GetV1Messages(
|
||||
makeRequest(t),
|
||||
&ContentTopicArgs{"ct"},
|
||||
&messagesReply1,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, messagesReply1, 1)
|
||||
|
||||
var messagesReply2 MessagesReply
|
||||
err = serviceB.GetV1Messages(
|
||||
makeRequest(t),
|
||||
&ContentTopicArgs{"ct"},
|
||||
&messagesReply2,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, messagesReply2, 0)
|
||||
}
|
@ -1,229 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/waku-org/go-waku/cmd/waku/server"
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var errChannelClosed = errors.New("consume channel is closed for subscription")
|
||||
|
||||
// RelayService represents the JSON RPC service for WakuRelay
|
||||
type RelayService struct {
|
||||
node *node.WakuNode
|
||||
|
||||
log *zap.Logger
|
||||
|
||||
cacheCapacity int
|
||||
}
|
||||
|
||||
// RelayMessageArgs represents the requests used for posting messages
|
||||
type RelayMessageArgs struct {
|
||||
Topic string `json:"topic,omitempty"`
|
||||
Message *RPCWakuMessage `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
// RelayAutoMessageArgs represents the requests used for posting messages
|
||||
type RelayAutoMessageArgs struct {
|
||||
Message *RPCWakuMessage `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
// TopicsArgs represents the lists of topics to use when subscribing / unsubscribing
|
||||
type TopicsArgs struct {
|
||||
Topics []string `json:"topics,omitempty"`
|
||||
}
|
||||
|
||||
// TopicArgs represents a request that contains a single topic
|
||||
type TopicArgs struct {
|
||||
Topic string `json:"topic,omitempty"`
|
||||
}
|
||||
|
||||
// NewRelayService returns an instance of RelayService
|
||||
func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService {
|
||||
s := &RelayService{
|
||||
node: node,
|
||||
cacheCapacity: cacheCapacity,
|
||||
log: log.Named("relay"),
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
// Start starts the RelayService
|
||||
func (r *RelayService) Start() {
|
||||
}
|
||||
|
||||
// Stop stops the RelayService
|
||||
func (r *RelayService) Stop() {
|
||||
}
|
||||
|
||||
// PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method
|
||||
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
|
||||
var err error
|
||||
|
||||
topic := relay.DefaultWakuTopic
|
||||
if args.Topic != "" {
|
||||
topic = args.Topic
|
||||
}
|
||||
|
||||
msg, err := args.Message.toProto()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = server.AppendRLNProof(r.node, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = r.node.Relay().Publish(req.Context(), msg, relay.WithPubSubTopic(topic))
|
||||
if err != nil {
|
||||
r.log.Error("publishing message", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// PostV1AutoSubscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_subscription
|
||||
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
|
||||
func (r *RelayService) PostV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||
|
||||
_, err := r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter("", args.Topics...), relay.WithCacheSize(uint(r.cacheCapacity)))
|
||||
if err != nil {
|
||||
r.log.Error("subscribing to topics", zap.Strings("topics", args.Topics), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
//TODO: Handle partial errors.
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteV1AutoSubscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_auto_subscription
|
||||
// Note that this method takes contentTopics as an argument instead of pubsubtopics and uses autosharding to derive pubsubTopics.
|
||||
func (r *RelayService) DeleteV1AutoSubscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||
ctx := req.Context()
|
||||
|
||||
err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter("", args.Topics...))
|
||||
if err != nil {
|
||||
r.log.Error("unsubscribing from topics", zap.Strings("topic", args.Topics), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
//TODO: Handle partial errors.
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// PostV1AutoMessage is invoked when the json rpc request uses the post_waku_v2_relay_v1_auto_message
|
||||
func (r *RelayService) PostV1AutoMessage(req *http.Request, args *RelayAutoMessageArgs, reply *SuccessReply) error {
|
||||
msg, err := args.Message.toProto()
|
||||
if err != nil {
|
||||
err = fmt.Errorf("invalid message format received: %w", err)
|
||||
r.log.Error("publishing message", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
if err = server.AppendRLNProof(r.node, msg); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = r.node.Relay().Publish(req.Context(), msg)
|
||||
if err != nil {
|
||||
r.log.Error("publishing message", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetV1AutoMessages is invoked when the json rpc request uses the get_waku_v2_relay_v1_auto_messages method
|
||||
// Note that this method takes contentTopic as an argument instead of pubSubtopic and uses autosharding.
|
||||
func (r *RelayService) GetV1AutoMessages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
|
||||
sub, err := r.node.Relay().GetSubscription(args.Topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case msg, open := <-sub.Ch:
|
||||
if !open {
|
||||
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic))
|
||||
return errChannelClosed
|
||||
}
|
||||
rpcMsg, err := ProtoToRPC(msg.Message())
|
||||
if err != nil {
|
||||
r.log.Warn("could not include message in response", logging.HexBytes("hash", msg.Hash()), zap.Error(err))
|
||||
} else {
|
||||
*reply = append(*reply, rpcMsg)
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
|
||||
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||
|
||||
for _, topic := range args.Topics {
|
||||
var err error
|
||||
if topic == "" {
|
||||
topic = relay.DefaultWakuTopic
|
||||
}
|
||||
_, err = r.node.Relay().Subscribe(r.node.Relay().Context(), protocol.NewContentFilter(topic), relay.WithCacheSize(uint(r.cacheCapacity)))
|
||||
if err != nil {
|
||||
r.log.Error("subscribing to topic", zap.String("topic", topic), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteV1Subscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_subscription method
|
||||
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||
ctx := req.Context()
|
||||
for _, topic := range args.Topics {
|
||||
err := r.node.Relay().Unsubscribe(ctx, protocol.NewContentFilter(topic))
|
||||
if err != nil {
|
||||
r.log.Error("unsubscribing from topic", zap.String("topic", topic), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
*reply = true
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method
|
||||
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
|
||||
|
||||
sub, err := r.node.Relay().GetSubscriptionWithPubsubTopic(args.Topic, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
select {
|
||||
case msg, open := <-sub.Ch:
|
||||
if !open {
|
||||
r.log.Error("consume channel is closed for subscription", zap.String("pubsubTopic", args.Topic))
|
||||
return errChannelClosed
|
||||
}
|
||||
m, err := ProtoToRPC(msg.Message())
|
||||
if err == nil {
|
||||
*reply = append(*reply, m)
|
||||
}
|
||||
default:
|
||||
break
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,159 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
func makeRelayService(t *testing.T) *RelayService {
|
||||
options := node.WithWakuRelayAndMinPeers(0)
|
||||
n, err := node.New(options)
|
||||
require.NoError(t, err)
|
||||
err = n.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
return NewRelayService(n, 30, utils.Logger())
|
||||
}
|
||||
|
||||
func TestPostV1Message(t *testing.T) {
|
||||
var reply SuccessReply
|
||||
|
||||
d := makeRelayService(t)
|
||||
|
||||
msg := &pb.WakuMessage{
|
||||
Payload: []byte{1, 2, 3},
|
||||
ContentTopic: "abc",
|
||||
Timestamp: utils.GetUnixEpoch(),
|
||||
}
|
||||
|
||||
rpcWakuMsg, err := ProtoToRPC(msg)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = d.PostV1Message(
|
||||
makeRequest(t),
|
||||
&RelayMessageArgs{
|
||||
Message: rpcWakuMsg,
|
||||
},
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
}
|
||||
|
||||
func TestRelaySubscription(t *testing.T) {
|
||||
var reply SuccessReply
|
||||
|
||||
d := makeRelayService(t)
|
||||
args := &TopicsArgs{Topics: []string{"test"}}
|
||||
|
||||
err := d.PostV1Subscription(
|
||||
makeRequest(t),
|
||||
args,
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
|
||||
err = d.DeleteV1Subscription(
|
||||
makeRequest(t),
|
||||
args,
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
}
|
||||
|
||||
func TestRelayGetV1Messages(t *testing.T) {
|
||||
serviceA := makeRelayService(t)
|
||||
go serviceA.Start()
|
||||
defer serviceA.Stop()
|
||||
|
||||
var reply SuccessReply
|
||||
|
||||
serviceB := makeRelayService(t)
|
||||
go serviceB.Start()
|
||||
defer serviceB.Stop()
|
||||
|
||||
hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().String()))
|
||||
require.NoError(t, err)
|
||||
|
||||
var addr multiaddr.Multiaddr
|
||||
for _, a := range serviceB.node.Host().Addrs() {
|
||||
addr = a.Encapsulate(hostInfo)
|
||||
break
|
||||
}
|
||||
err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr)
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for the dial to complete
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
args := &TopicsArgs{Topics: []string{"test"}}
|
||||
|
||||
// Subscribe A to topic
|
||||
err = serviceA.PostV1Subscription(
|
||||
makeRequest(t),
|
||||
args,
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
|
||||
// Subscribe B to topic
|
||||
err = serviceB.PostV1Subscription(
|
||||
makeRequest(t),
|
||||
args,
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
|
||||
// Wait for the subscription to be started
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
rpcWakuMsg, err := ProtoToRPC(&pb.WakuMessage{
|
||||
Payload: []byte("test"),
|
||||
ContentTopic: "test",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
err = serviceA.PostV1Message(
|
||||
makeRequest(t),
|
||||
&RelayMessageArgs{
|
||||
Topic: "test",
|
||||
Message: rpcWakuMsg,
|
||||
},
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.True(t, reply)
|
||||
|
||||
// Wait for the message to be received
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
var messagesReply1 MessagesReply
|
||||
err = serviceB.GetV1Messages(
|
||||
makeRequest(t),
|
||||
&TopicArgs{"test"},
|
||||
&messagesReply1,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, messagesReply1, 1)
|
||||
|
||||
var messagesReply2 MessagesReply
|
||||
err = serviceB.GetV1Messages(
|
||||
makeRequest(t),
|
||||
&TopicArgs{"test"},
|
||||
&messagesReply2,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, messagesReply2, 0)
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type SuccessReply = bool
|
||||
|
||||
type Empty struct {
|
||||
}
|
||||
|
||||
type MessagesReply = []*RPCWakuMessage
|
||||
|
||||
type Base64URLByte []byte
|
||||
|
||||
// UnmarshalText is used by json.Unmarshal to decode both url-safe and standard
|
||||
// base64 encoded strings with and without padding
|
||||
func (h *Base64URLByte) UnmarshalText(b []byte) error {
|
||||
inputValue := ""
|
||||
if b != nil {
|
||||
inputValue = string(b)
|
||||
}
|
||||
|
||||
enc := base64.StdEncoding
|
||||
if strings.ContainsAny(inputValue, "-_") {
|
||||
enc = base64.URLEncoding
|
||||
}
|
||||
if len(inputValue)%4 != 0 {
|
||||
enc = enc.WithPadding(base64.NoPadding)
|
||||
}
|
||||
|
||||
decodedBytes, err := enc.DecodeString(inputValue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
*h = decodedBytes
|
||||
|
||||
return nil
|
||||
}
|
@ -1,32 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
)
|
||||
|
||||
type Adder func(msg *protocol.Envelope)
|
||||
|
||||
type runnerService struct {
|
||||
broadcaster relay.Broadcaster
|
||||
sub *relay.Subscription
|
||||
adder Adder
|
||||
}
|
||||
|
||||
func newRunnerService(broadcaster relay.Broadcaster, adder Adder) *runnerService {
|
||||
return &runnerService{
|
||||
broadcaster: broadcaster,
|
||||
adder: adder,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *runnerService) Start() {
|
||||
r.sub = r.broadcaster.RegisterForAll(relay.WithBufferSize(relay.DefaultRelaySubscriptionBufferSize))
|
||||
for envelope := range r.sub.Ch {
|
||||
r.adder(envelope)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *runnerService) Stop() {
|
||||
r.sub.Unsubscribe()
|
||||
}
|
@ -1,79 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/store/pb"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type StoreService struct {
|
||||
node *node.WakuNode
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
// cursor *pb.Index
|
||||
// pageSize uint64
|
||||
// asc bool
|
||||
|
||||
type StorePagingOptions struct {
|
||||
PageSize uint64 `json:"pageSize,omitempty"`
|
||||
Cursor *pb.Index `json:"cursor,omitempty"`
|
||||
Forward bool `json:"forward,omitempty"`
|
||||
}
|
||||
|
||||
type StoreMessagesArgs struct {
|
||||
Topic string `json:"pubsubTopic,omitempty"`
|
||||
ContentFilters []string `json:"contentFilters,omitempty"`
|
||||
StartTime *int64 `json:"startTime,omitempty"`
|
||||
EndTime *int64 `json:"endTime,omitempty"`
|
||||
PagingOptions StorePagingOptions `json:"pagingOptions,omitempty"`
|
||||
}
|
||||
|
||||
type StoreMessagesReply struct {
|
||||
Messages []*RPCWakuMessage `json:"messages,omitempty"`
|
||||
PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, reply *StoreMessagesReply) error {
|
||||
options := []store.HistoryRequestOption{
|
||||
store.WithAutomaticRequestID(),
|
||||
store.WithAutomaticPeerSelection(),
|
||||
store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize),
|
||||
store.WithCursor(args.PagingOptions.Cursor),
|
||||
}
|
||||
res, err := s.node.Store().Query(
|
||||
req.Context(),
|
||||
store.Query{
|
||||
PubsubTopic: args.Topic,
|
||||
ContentTopics: args.ContentFilters,
|
||||
StartTime: args.StartTime,
|
||||
EndTime: args.EndTime,
|
||||
},
|
||||
options...,
|
||||
)
|
||||
if err != nil {
|
||||
s.log.Error("querying messages", zap.Error(err))
|
||||
reply.Error = err.Error()
|
||||
return nil
|
||||
}
|
||||
|
||||
reply.Messages = make([]*RPCWakuMessage, len(res.Messages))
|
||||
for i := range res.Messages {
|
||||
msg, err := ProtoToRPC(res.Messages[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply.Messages[i] = msg
|
||||
}
|
||||
|
||||
reply.PagingInfo = StorePagingOptions{
|
||||
PageSize: args.PagingOptions.PageSize,
|
||||
Cursor: res.Cursor(),
|
||||
Forward: args.PagingOptions.Forward,
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,33 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
func makeStoreService(t *testing.T) *StoreService {
|
||||
options := node.WithWakuStore()
|
||||
n, err := node.New(options)
|
||||
require.NoError(t, err)
|
||||
err = n.Start(context.Background())
|
||||
require.NoError(t, err)
|
||||
return &StoreService{n, utils.Logger()}
|
||||
}
|
||||
|
||||
func TestStoreGetV1Messages(t *testing.T) {
|
||||
var reply StoreMessagesReply
|
||||
|
||||
s := makeStoreService(t)
|
||||
|
||||
err := s.GetV1Messages(
|
||||
makeRequest(t),
|
||||
&StoreMessagesArgs{},
|
||||
&reply,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, reply.Error)
|
||||
}
|
@ -1,41 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
)
|
||||
|
||||
func makeRequest(t *testing.T) *http.Request {
|
||||
request, err := http.NewRequest(http.MethodPost, "url", bytes.NewReader([]byte("")))
|
||||
require.NoError(t, err)
|
||||
return request
|
||||
}
|
||||
|
||||
func TestBase64Encoding(t *testing.T) {
|
||||
input := "Hello World"
|
||||
|
||||
rpcMsg, err := ProtoToRPC(&pb.WakuMessage{
|
||||
Payload: []byte(input),
|
||||
ContentTopic: "test",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
jsonBytes, err := json.Marshal(rpcMsg)
|
||||
require.NoError(t, err)
|
||||
|
||||
m := make(map[string]interface{})
|
||||
err = json.Unmarshal(jsonBytes, &m)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, base64.StdEncoding.EncodeToString([]byte(input)), m["payload"])
|
||||
|
||||
decodedRPCMsg := new(RPCWakuMessage)
|
||||
err = json.Unmarshal(jsonBytes, decodedRPCMsg)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, input, string(decodedRPCMsg.Payload))
|
||||
}
|
@ -1,103 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/waku-org/go-waku/cmd/waku/server"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||
rlnpb "github.com/waku-org/go-waku/waku/v2/protocol/rln/pb"
|
||||
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
type RateLimitProof struct {
|
||||
Proof Base64URLByte `json:"proof,omitempty"`
|
||||
MerkleRoot Base64URLByte `json:"merkle_root,omitempty"`
|
||||
Epoch Base64URLByte `json:"epoch,omitempty"`
|
||||
ShareX Base64URLByte `json:"share_x,omitempty"`
|
||||
ShareY Base64URLByte `json:"share_y,omitempty"`
|
||||
Nullifier Base64URLByte `json:"nullifier,omitempty"`
|
||||
RlnIdentifier Base64URLByte `json:"rln_identifier,omitempty"`
|
||||
}
|
||||
|
||||
type RPCWakuMessage struct {
|
||||
Payload server.Base64URLByte `json:"payload,omitempty"`
|
||||
ContentTopic string `json:"contentTopic,omitempty"`
|
||||
Version uint32 `json:"version"`
|
||||
Timestamp int64 `json:"timestamp,omitempty"`
|
||||
RateLimitProof *RateLimitProof `json:"rateLimitProof,omitempty"`
|
||||
Ephemeral bool `json:"ephemeral,omitempty"`
|
||||
}
|
||||
|
||||
func ProtoToRPC(input *pb.WakuMessage) (*RPCWakuMessage, error) {
|
||||
if input == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := input.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rpcWakuMsg := &RPCWakuMessage{
|
||||
Payload: input.Payload,
|
||||
ContentTopic: input.ContentTopic,
|
||||
Version: input.GetVersion(),
|
||||
Timestamp: input.GetTimestamp(),
|
||||
Ephemeral: input.GetEphemeral(),
|
||||
}
|
||||
|
||||
if input.RateLimitProof != nil {
|
||||
rateLimitProof := &rlnpb.RateLimitProof{}
|
||||
err := proto.Unmarshal(input.RateLimitProof, rateLimitProof)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rpcWakuMsg.RateLimitProof = &RateLimitProof{
|
||||
Proof: rateLimitProof.Proof,
|
||||
MerkleRoot: rateLimitProof.MerkleRoot,
|
||||
Epoch: rateLimitProof.Epoch,
|
||||
ShareX: rateLimitProof.ShareX,
|
||||
ShareY: rateLimitProof.ShareY,
|
||||
Nullifier: rateLimitProof.Nullifier,
|
||||
RlnIdentifier: rateLimitProof.RlnIdentifier,
|
||||
}
|
||||
}
|
||||
|
||||
return rpcWakuMsg, nil
|
||||
}
|
||||
|
||||
func (r *RPCWakuMessage) toProto() (*pb.WakuMessage, error) {
|
||||
if r == nil {
|
||||
return nil, errors.New("wakumessage is missing")
|
||||
}
|
||||
|
||||
msg := &pb.WakuMessage{
|
||||
Payload: r.Payload,
|
||||
ContentTopic: r.ContentTopic,
|
||||
Version: proto.Uint32(r.Version),
|
||||
Timestamp: proto.Int64(r.Timestamp),
|
||||
Ephemeral: proto.Bool(r.Ephemeral),
|
||||
}
|
||||
|
||||
if r.RateLimitProof != nil {
|
||||
rateLimitProof := &rlnpb.RateLimitProof{
|
||||
Proof: r.RateLimitProof.Proof,
|
||||
MerkleRoot: r.RateLimitProof.MerkleRoot,
|
||||
Epoch: r.RateLimitProof.Epoch,
|
||||
ShareX: r.RateLimitProof.ShareX,
|
||||
ShareY: r.RateLimitProof.ShareY,
|
||||
Nullifier: r.RateLimitProof.Nullifier,
|
||||
RlnIdentifier: r.RateLimitProof.RlnIdentifier,
|
||||
}
|
||||
|
||||
b, err := proto.Marshal(rateLimitProof)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msg.RateLimitProof = b
|
||||
}
|
||||
|
||||
return msg, nil
|
||||
}
|
@ -1,121 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/pprof"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/rpc/v2"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type WakuRPC struct {
|
||||
node *node.WakuNode
|
||||
server *http.Server
|
||||
|
||||
log *zap.Logger
|
||||
|
||||
relayService *RelayService
|
||||
filterService *FilterService
|
||||
adminService *AdminService
|
||||
}
|
||||
|
||||
func NewWakuRPC(node *node.WakuNode, address string, port int, enableAdmin bool, enablePProf bool, cacheCapacity int, log *zap.Logger) *WakuRPC {
|
||||
wrpc := new(WakuRPC)
|
||||
wrpc.log = log.Named("rpc")
|
||||
|
||||
s := rpc.NewServer()
|
||||
s.RegisterCodec(NewSnakeCaseCodec(), "application/json")
|
||||
s.RegisterCodec(NewSnakeCaseCodec(), "application/json;charset=UTF-8")
|
||||
|
||||
mux := mux.NewRouter()
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
t := time.Now()
|
||||
s.ServeHTTP(w, r)
|
||||
wrpc.log.Info("served request", zap.String("path", r.URL.Path), zap.Duration("duration", time.Since(t)))
|
||||
})
|
||||
|
||||
if enablePProf {
|
||||
mux.PathPrefix("/debug/").Handler(http.DefaultServeMux)
|
||||
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||
}
|
||||
|
||||
debugService := NewDebugService(node)
|
||||
err := s.RegisterService(debugService, "Debug")
|
||||
if err != nil {
|
||||
wrpc.log.Error("registering debug service", zap.Error(err))
|
||||
}
|
||||
|
||||
var relayService *RelayService
|
||||
if node.Relay() != nil {
|
||||
relayService = NewRelayService(node, cacheCapacity, log)
|
||||
err = s.RegisterService(relayService, "Relay")
|
||||
if err != nil {
|
||||
wrpc.log.Error("registering relay service", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
err = s.RegisterService(&StoreService{node, log}, "Store")
|
||||
if err != nil {
|
||||
wrpc.log.Error("registering store service", zap.Error(err))
|
||||
}
|
||||
|
||||
if enableAdmin {
|
||||
adminService := &AdminService{node, log.Named("admin")}
|
||||
err = s.RegisterService(adminService, "Admin")
|
||||
if err != nil {
|
||||
wrpc.log.Error("registering admin service", zap.Error(err))
|
||||
}
|
||||
wrpc.adminService = adminService
|
||||
}
|
||||
|
||||
filterService := NewFilterService(node, cacheCapacity, log)
|
||||
err = s.RegisterService(filterService, "Filter")
|
||||
if err != nil {
|
||||
wrpc.log.Error("registering filter service", zap.Error(err))
|
||||
}
|
||||
|
||||
listenAddr := fmt.Sprintf("%s:%d", address, port)
|
||||
|
||||
server := &http.Server{
|
||||
Addr: listenAddr,
|
||||
Handler: mux,
|
||||
}
|
||||
|
||||
server.RegisterOnShutdown(func() {
|
||||
filterService.Stop()
|
||||
|
||||
if relayService != nil {
|
||||
relayService.Stop()
|
||||
}
|
||||
})
|
||||
|
||||
wrpc.node = node
|
||||
wrpc.server = server
|
||||
wrpc.relayService = relayService
|
||||
wrpc.filterService = filterService
|
||||
|
||||
return wrpc
|
||||
}
|
||||
|
||||
func (r *WakuRPC) Start() {
|
||||
if r.relayService != nil {
|
||||
go r.relayService.Start()
|
||||
}
|
||||
|
||||
go r.filterService.Start()
|
||||
|
||||
go func() {
|
||||
_ = r.server.ListenAndServe()
|
||||
}()
|
||||
r.log.Info("server started", zap.String("addr", r.server.Addr))
|
||||
}
|
||||
|
||||
func (r *WakuRPC) Stop(ctx context.Context) error {
|
||||
r.log.Info("shutting down server")
|
||||
return r.server.Shutdown(ctx)
|
||||
}
|
@ -1,19 +0,0 @@
|
||||
package rpc
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
)
|
||||
|
||||
func TestWakuRpc(t *testing.T) {
|
||||
options := node.WithWakuStore()
|
||||
n, err := node.New(options)
|
||||
require.NoError(t, err)
|
||||
|
||||
rpc := NewWakuRPC(n, "127.0.0.1", 8080, true, false, 30, utils.Logger())
|
||||
require.NotNil(t, rpc.server)
|
||||
require.Equal(t, rpc.server.Addr, "127.0.0.1:8080")
|
||||
}
|
@ -29,7 +29,7 @@
|
||||
];
|
||||
doCheck = false;
|
||||
# FIXME: This needs to be manually changed when updating modules.
|
||||
vendorSha256 = "sha256-PnN+61S9F58A/VeO2M1DW7IJYYUP7xpkZrYYnWoO8lc=";
|
||||
vendorSha256 = "sha256-D0IwlMmCW32T/bfmJjFu3Mlg7pgW4j8IJGZUQ6fnHJQ=";
|
||||
# Fix for 'nix run' trying to execute 'go-waku'.
|
||||
meta = { mainProgram = "waku"; };
|
||||
};
|
||||
|
4
go.mod
4
go.mod
@ -10,7 +10,6 @@ require (
|
||||
github.com/ethereum/go-ethereum v1.10.26
|
||||
github.com/golang-migrate/migrate/v4 v4.15.2
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/gorilla/rpc v1.2.0
|
||||
github.com/ipfs/go-ds-sql v0.3.0
|
||||
github.com/ipfs/go-log/v2 v2.5.1
|
||||
github.com/libp2p/go-libp2p v0.32.2
|
||||
@ -27,9 +26,8 @@ require (
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/waku-org/go-discover v0.0.0-20221209174356-61c833f34d98
|
||||
golang.org/x/text v0.13.0
|
||||
golang.org/x/text v0.13.0 // indirect
|
||||
)
|
||||
|
||||
require (
|
||||
|
3
go.sum
3
go.sum
@ -772,10 +772,7 @@ github.com/gorilla/handlers v1.4.2/go.mod h1:Qkdc/uu4tH4g6mTK6auzZ766c4CA0Ng8+o/
|
||||
github.com/gorilla/mux v1.7.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
|
||||
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
|
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
|
||||
github.com/gorilla/rpc v1.2.0 h1:WvvdC2lNeT1SP32zrIce5l0ECBfbAlmrmSBsuc57wfk=
|
||||
github.com/gorilla/rpc v1.2.0/go.mod h1:V4h9r+4sF5HnzqbwIez0fKSpANP0zlYd3qR7p36jkTQ=
|
||||
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
|
||||
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
|
||||
|
Loading…
x
Reference in New Issue
Block a user