feat: append RLN proofs when posting messages in REST/RPC

This commit is contained in:
Richard Ramos 2023-09-07 14:00:59 -04:00 committed by richΛrd
parent 18efd2c737
commit cc28267951
36 changed files with 91 additions and 15 deletions

View File

@ -40,8 +40,8 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds" "github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoreds"
ws "github.com/libp2p/go-libp2p/p2p/transport/websocket" ws "github.com/libp2p/go-libp2p/p2p/transport/websocket"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/cmd/waku/rest" "github.com/waku-org/go-waku/cmd/waku/server/rest"
"github.com/waku-org/go-waku/cmd/waku/rpc" "github.com/waku-org/go-waku/cmd/waku/server/rpc"
"github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/metrics" "github.com/waku-org/go-waku/waku/metrics"
"github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/persistence"

13
cmd/waku/server/no_rln.go Normal file
View File

@ -0,0 +1,13 @@
//go:build !gowaku_rln
// +build !gowaku_rln
package server
import (
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error {
return nil
}

View File

@ -3,11 +3,13 @@ package rest
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"errors"
"net/http" "net/http"
"strings" "strings"
"sync" "sync"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -15,9 +17,10 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
const ROUTE_RELAY_SUBSCRIPTIONSV1 = "/relay/v1/subscriptions" const routeRelayV1Subscriptions = "/relay/v1/subscriptions"
const ROUTE_RELAY_MESSAGESV1 = "/relay/v1/messages/{topic}" const routeRelayV1Messages = "/relay/v1/messages/{topic}"
// RelayService represents the REST service for WakuRelay
type RelayService struct { type RelayService struct {
node *node.WakuNode node *node.WakuNode
cancel context.CancelFunc cancel context.CancelFunc
@ -31,6 +34,7 @@ type RelayService struct {
runner *runnerService runner *runnerService
} }
// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService { func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *zap.Logger) *RelayService {
s := &RelayService{ s := &RelayService{
node: node, node: node,
@ -41,10 +45,10 @@ func NewRelayService(node *node.WakuNode, m *chi.Mux, cacheCapacity int, log *za
s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope)
m.Post(ROUTE_RELAY_SUBSCRIPTIONSV1, s.postV1Subscriptions) m.Post(routeRelayV1Subscriptions, s.postV1Subscriptions)
m.Delete(ROUTE_RELAY_SUBSCRIPTIONSV1, s.deleteV1Subscriptions) m.Delete(routeRelayV1Subscriptions, s.deleteV1Subscriptions)
m.Get(ROUTE_RELAY_MESSAGESV1, s.getV1Messages) m.Get(routeRelayV1Messages, s.getV1Messages)
m.Post(ROUTE_RELAY_MESSAGESV1, s.postV1Message) m.Post(routeRelayV1Messages, s.postV1Message)
return s return s
} }
@ -65,6 +69,7 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
} }
// Start starts the RelayService
func (r *RelayService) Start(ctx context.Context) { func (r *RelayService) Start(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel r.cancel = cancel
@ -80,6 +85,7 @@ func (r *RelayService) Start(ctx context.Context) {
r.runner.Start(ctx) r.runner.Start(ctx)
} }
// Stop stops the RelayService
func (r *RelayService) Stop() { func (r *RelayService) Stop() {
if r.cancel == nil { if r.cancel == nil {
return return
@ -187,11 +193,20 @@ func (d *RelayService) postV1Message(w http.ResponseWriter, r *http.Request) {
var err error var err error
if topic == "" { if topic == "" {
_, err = d.node.Relay().Publish(r.Context(), message) topic = relay.DefaultWakuTopic
} else {
_, err = d.node.Relay().PublishToTopic(r.Context(), message, strings.Replace(topic, "\n", "", -1))
} }
if !d.node.Relay().IsSubscribed(topic) {
writeErrOrResponse(w, errors.New("not subscribed to pubsubTopic"), nil)
return
}
if err = server.AppendRLNProof(d.node, message); err != nil {
writeErrOrResponse(w, err, nil)
return
}
_, err = d.node.Relay().PublishToTopic(r.Context(), message, strings.Replace(topic, "\n", "", -1))
if err != nil { if err != nil {
d.log.Error("publishing message", zap.Error(err)) d.log.Error("publishing message", zap.Error(err))
} }

23
cmd/waku/server/rln.go Normal file
View File

@ -0,0 +1,23 @@
//go:build gowaku_rln
// +build gowaku_rln
package server
import (
"fmt"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/rln"
)
func AppendRLNProof(node *node.WakuNode, msg *pb.WakuMessage) error {
_, rlnEnabled := node.RLNRelay().(*rln.WakuRLNRelay)
if rlnEnabled {
err := node.RLNRelay().AppendRLNProof(msg, node.Timesource().Now())
if err != nil {
return fmt.Errorf("could not append rln proof: %w", err)
}
}
return nil
}

View File

@ -1,10 +1,12 @@
package rpc package rpc
import ( import (
"errors"
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
"github.com/waku-org/go-waku/cmd/waku/server"
"github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
@ -12,6 +14,7 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// RelayService represents the JSON RPC service for WakuRelay
type RelayService struct { type RelayService struct {
node *node.WakuNode node *node.WakuNode
@ -24,19 +27,23 @@ type RelayService struct {
runner *runnerService runner *runnerService
} }
// RelayMessageArgs represents the requests used for posting messages
type RelayMessageArgs struct { type RelayMessageArgs struct {
Topic string `json:"topic,omitempty"` Topic string `json:"topic,omitempty"`
Message *RPCWakuMessage `json:"message,omitempty"` Message *RPCWakuMessage `json:"message,omitempty"`
} }
// TopicsArgs represents the lists of topics to use when subscribing / unsubscribing
type TopicsArgs struct { type TopicsArgs struct {
Topics []string `json:"topics,omitempty"` Topics []string `json:"topics,omitempty"`
} }
// TopicArgs represents a request that contains a single topic
type TopicArgs struct { type TopicArgs struct {
Topic string `json:"topic,omitempty"` Topic string `json:"topic,omitempty"`
} }
// NewRelayService returns an instance of RelayService
func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService { func NewRelayService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *RelayService {
s := &RelayService{ s := &RelayService{
node: node, node: node,
@ -66,6 +73,7 @@ func (r *RelayService) addEnvelope(envelope *protocol.Envelope) {
r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message())
} }
// Start starts the RelayService
func (r *RelayService) Start() { func (r *RelayService) Start() {
r.messagesMutex.Lock() r.messagesMutex.Lock()
// Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these
@ -78,18 +86,31 @@ func (r *RelayService) Start() {
r.runner.Start() r.runner.Start()
} }
// Stop stops the RelayService
func (r *RelayService) Stop() { func (r *RelayService) Stop() {
r.runner.Stop() r.runner.Stop()
} }
// PostV1Message is invoked when the json rpc request uses the post_waku_v2_relay_v1_message method
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
var err error var err error
if args.Topic == "" { topic := relay.DefaultWakuTopic
_, err = r.node.Relay().Publish(req.Context(), args.Message.toProto()) if args.Topic != "" {
} else { topic = args.Topic
_, err = r.node.Relay().PublishToTopic(req.Context(), args.Message.toProto(), args.Topic)
} }
if !r.node.Relay().IsSubscribed(topic) {
return errors.New("not subscribed to pubsubTopic")
}
msg := args.Message.toProto()
if err = server.AppendRLNProof(r.node, msg); err != nil {
return err
}
_, err = r.node.Relay().PublishToTopic(req.Context(), msg, topic)
if err != nil { if err != nil {
r.log.Error("publishing message", zap.Error(err)) r.log.Error("publishing message", zap.Error(err))
return err return err
@ -99,6 +120,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return nil return nil
} }
// PostV1Subscription is invoked when the json rpc request uses the post_waku_v2_relay_v1_subscription method
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context() ctx := req.Context()
for _, topic := range args.Topics { for _, topic := range args.Topics {
@ -129,6 +151,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
return nil return nil
} }
// DeleteV1Subscription is invoked when the json rpc request uses the delete_waku_v2_relay_v1_subscription method
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
ctx := req.Context() ctx := req.Context()
for _, topic := range args.Topics { for _, topic := range args.Topics {
@ -145,6 +168,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs,
return nil return nil
} }
// GetV1Messages is invoked when the json rpc request uses the get_waku_v2_relay_v1_messages method
func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error {
r.messagesMutex.Lock() r.messagesMutex.Lock()
defer r.messagesMutex.Unlock() defer r.messagesMutex.Unlock()

View File

@ -5,6 +5,7 @@ package node
import "context" import "context"
// RLNRelay is used to access any operation related to Waku RLN protocol
func (w *WakuNode) RLNRelay() RLNRelay { func (w *WakuNode) RLNRelay() RLNRelay {
return nil return nil
} }