From 75516a8f96be78dcbedd9bc2c926b8e9110ab3c3 Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Thu, 18 Nov 2021 15:20:58 +0100 Subject: [PATCH] feat: implement rcp relay get messages --- waku/v2/node/wakunode2.go | 4 +++ waku/v2/rpc/relay.go | 71 +++++++++++++++++++++++++++++++++++-- waku/v2/rpc/relay_test.go | 74 ++++++++++++++++++++++++++++++++++++++- waku/v2/rpc/store_test.go | 2 +- waku/v2/rpc/waku_rpc.go | 14 ++++++-- 5 files changed, 159 insertions(+), 6 deletions(-) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 182af24d..183b678a 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -350,6 +350,10 @@ func (w *WakuNode) DiscV5() *discv5.DiscoveryV5 { return w.discoveryV5 } +func (w *WakuNode) Broadcaster() v2.Broadcaster { + return w.bcaster +} + func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { var err error w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...) diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index c4123a46..fd569c73 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -3,14 +3,22 @@ package rpc import ( "fmt" "net/http" + "sync" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" ) type RelayService struct { node *node.WakuNode + + messages map[string][]*pb.WakuMessage + messagesMutex sync.RWMutex + + ch chan *protocol.Envelope + quit chan bool } type RelayMessageArgs struct { @@ -22,6 +30,53 @@ type TopicsArgs struct { Topics []string `json:"topics,omitempty"` } +type TopicArgs struct { + Topic string `json:"topic,omitempty"` +} + +type MessagesReply struct { + Messages []*pb.WakuMessage `json:"messages,omitempty"` +} + +func NewRelayService(node *node.WakuNode) *RelayService { + return &RelayService{ + node: node, + messages: make(map[string][]*pb.WakuMessage), + quit: make(chan bool), + } +} + +func (r *RelayService) addEnvelope(envelope *protocol.Envelope) { + r.messagesMutex.Lock() + defer r.messagesMutex.Unlock() + + if _, ok := r.messages[envelope.PubsubTopic()]; !ok { + return + } + + r.messages[envelope.PubsubTopic()] = append(r.messages[envelope.PubsubTopic()], envelope.Message()) +} + +func (r *RelayService) Start() { + r.ch = make(chan *protocol.Envelope, 1024) + r.node.Broadcaster().Register(r.ch) + + for { + select { + case <-r.quit: + return + case envelope := <-r.ch: + r.addEnvelope(envelope) + } + } +} + +func (r *RelayService) Stop() { + r.quit <- true + r.node.Broadcaster().Unregister(r.ch) + close(r.ch) +} + func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { _, err := r.node.Relay().Publish(req.Context(), &args.Message, (*relay.Topic)(&args.Topic)) if err != nil { @@ -44,6 +99,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r reply.Error = err.Error() return nil } + r.messages[topic] = make([]*pb.WakuMessage, 0) } reply.Success = true return nil @@ -59,11 +115,22 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply.Error = err.Error() return nil } + + delete(r.messages, topic) } reply.Success = true return nil } -func (r *RelayService) GetV1Messages(req *http.Request, args *Empty, reply *Empty) error { - return fmt.Errorf("not implemented") +func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { + r.messagesMutex.Lock() + defer r.messagesMutex.Unlock() + + if _, ok := r.messages[args.Topic]; !ok { + return fmt.Errorf("topic %s not subscribed", args.Topic) + } + + reply.Messages = r.messages[args.Topic] + r.messages[args.Topic] = make([]*pb.WakuMessage, 0) + return nil } diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index b59de4e4..a6d97669 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -2,9 +2,13 @@ package rpc import ( "context" + "fmt" "testing" + "time" + "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/stretchr/testify/require" ) @@ -14,7 +18,7 @@ func makeRelayService(t *testing.T) *RelayService { require.NoError(t, err) err = n.Start() require.NoError(t, err) - return &RelayService{n} + return NewRelayService(n) } func TestPostV1Message(t *testing.T) { @@ -53,3 +57,71 @@ func TestRelaySubscription(t *testing.T) { require.NoError(t, err) require.True(t, reply.Success) } + +func TestRelayGetV1Messages(t *testing.T) { + serviceA := makeRelayService(t) + var reply SuccessReply + + serviceB := makeRelayService(t) + go serviceB.Start() + defer serviceB.Stop() + + hostInfo, err := multiaddr.NewMultiaddr(fmt.Sprintf("/p2p/%s", serviceB.node.Host().ID().Pretty())) + require.NoError(t, err) + + var addr multiaddr.Multiaddr + for _, a := range serviceB.node.Host().Addrs() { + addr = a.Encapsulate(hostInfo) + break + } + err = serviceA.node.DialPeerWithMultiAddress(context.Background(), addr) + require.NoError(t, err) + + // Wait for the dial to complete + time.Sleep(1 * time.Second) + + args := &TopicsArgs{Topics: []string{"test"}} + err = serviceB.PostV1Subscription( + makeRequest(t), + args, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) + + // Wait for the subscription to be started + time.Sleep(1 * time.Second) + + err = serviceA.PostV1Message( + makeRequest(t), + &RelayMessageArgs{ + Topic: "test", + Message: pb.WakuMessage{ + Payload: []byte("test"), + }, + }, + &reply, + ) + require.NoError(t, err) + require.True(t, reply.Success) + + // Wait for the message to be received + time.Sleep(1 * time.Second) + + var messagesReply MessagesReply + err = serviceB.GetV1Messages( + makeRequest(t), + &TopicArgs{"test"}, + &messagesReply, + ) + require.NoError(t, err) + require.Len(t, messagesReply.Messages, 1) + + err = serviceB.GetV1Messages( + makeRequest(t), + &TopicArgs{"test"}, + &messagesReply, + ) + require.NoError(t, err) + require.Len(t, messagesReply.Messages, 0) +} diff --git a/waku/v2/rpc/store_test.go b/waku/v2/rpc/store_test.go index 4f25b02e..68d1924e 100644 --- a/waku/v2/rpc/store_test.go +++ b/waku/v2/rpc/store_test.go @@ -17,7 +17,7 @@ func makeStoreService(t *testing.T) *StoreService { return &StoreService{n} } -func TestGetV1Message(t *testing.T) { +func TestStoreGetV1Messages(t *testing.T) { var reply StoreMessagesReply s := makeStoreService(t) diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index ec2f54fd..d433b4e3 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -16,6 +16,8 @@ var log = logging.Logger("wakurpc") type WakuRpc struct { node *node.WakuNode server *http.Server + + relayService *RelayService } func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { @@ -28,7 +30,9 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { log.Error(err) } - err = s.RegisterService(&RelayService{node}, "Relay") + relayService := NewRelayService(node) + + err = s.RegisterService(relayService, "Relay") if err != nil { log.Error(err) } @@ -67,10 +71,16 @@ func NewWakuRpc(node *node.WakuNode, address string, port int) *WakuRpc { Handler: mux, } - return &WakuRpc{node: node, server: server} + return &WakuRpc{ + node: node, + server: server, + relayService: relayService, + } } func (r *WakuRpc) Start() { + go r.relayService.Start() + defer r.relayService.Stop() log.Info("Rpc server started at ", r.server.Addr) log.Info("server stopped ", r.server.ListenAndServe()) }