From 00ee0b7511a46e8fe9b4a4b21bb6faaa059c6b2d Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 19 Nov 2021 16:01:52 -0400 Subject: [PATCH] refactor: create separate functions for subscriptions and publishing --- examples/basic2/main.go | 4 +-- examples/chat2/chat.go | 8 +++--- examples/filter2/main.go | 4 +-- tests/connection_test.go | 4 +-- waku/node.go | 2 +- waku/v2/node/wakunode2.go | 2 +- waku/v2/protocol/filter/waku_filter_test.go | 8 +++--- waku/v2/protocol/lightpush/waku_lightpush.go | 10 +++++--- .../protocol/lightpush/waku_lightpush_test.go | 6 ++--- waku/v2/protocol/relay/waku_relay.go | 25 +++++++++---------- waku/v2/protocol/relay/waku_relay_test.go | 2 +- waku/v2/rpc/filter_test.go | 4 +-- waku/v2/rpc/relay.go | 15 +++++++++-- 13 files changed, 54 insertions(+), 40 deletions(-) diff --git a/examples/basic2/main.go b/examples/basic2/main.go index 59af31d1..813c8ee1 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -103,7 +103,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { Timestamp: timestamp, } - _, err = wakuNode.Relay().Publish(ctx, msg, nil) + _, err = wakuNode.Relay().Publish(ctx, msg) if err != nil { log.Error("Error sending a message: ", err) } @@ -117,7 +117,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { } func readLoop(ctx context.Context, wakuNode *node.WakuNode) { - sub, err := wakuNode.Relay().Subscribe(ctx, nil) + sub, err := wakuNode.Relay().Subscribe(ctx) if err != nil { log.Error("Could not subscribe: ", err) return diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index dcb95f76..b5203db2 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -51,7 +51,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic if useLightPush { cf := filter.ContentFilter{ - Topic: string(relay.GetTopic(nil)), + Topic: relay.DefaultWakuTopic, ContentTopics: []string{contentTopic}, } var err error @@ -61,7 +61,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic return nil, err } } else { - sub, err := n.Relay().Subscribe(ctx, nil) + sub, err := n.Relay().Subscribe(ctx) if err != nil { return nil, err } @@ -124,10 +124,10 @@ func (cr *Chat) Publish(ctx context.Context, message string) error { } if cr.useLightPush { - _, err = cr.node.Lightpush().Publish(ctx, wakuMsg, nil) + _, err = cr.node.Lightpush().Publish(ctx, wakuMsg) } else { - _, err = cr.node.Relay().Publish(ctx, wakuMsg, nil) + _, err = cr.node.Relay().Publish(ctx, wakuMsg) } diff --git a/examples/filter2/main.go b/examples/filter2/main.go index dde6c501..85369dd1 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -158,7 +158,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { Timestamp: timestamp, } - _, err := wakuNode.Relay().Publish(ctx, msg, nil) + _, err := wakuNode.Relay().Publish(ctx, msg) if err != nil { log.Error("Error sending a message: ", err) } @@ -173,7 +173,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { func readLoop(ctx context.Context, wakuNode *node.WakuNode) { pubsubTopic := pubSubTopic.String() - sub, err := wakuNode.Relay().Subscribe(ctx, &pubsubTopic) + sub, err := wakuNode.Relay().SubscribeWithTopic(ctx, pubsubTopic) if err != nil { log.Error("Could not subscribe: ", err) return diff --git a/tests/connection_test.go b/tests/connection_test.go index edcf0374..325ba10e 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -38,7 +38,7 @@ func TestBasicSendingReceiving(t *testing.T) { require.NoError(t, write(ctx, wakuNode, "test")) - sub, err := wakuNode.Relay().Subscribe(ctx, nil) + sub, err := wakuNode.Relay().Subscribe(ctx) require.NoError(t, err) value := <-sub.C @@ -69,6 +69,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro Timestamp: timestamp, } - _, err = wakuNode.Relay().Publish(ctx, msg, nil) + _, err = wakuNode.Relay().Publish(ctx, msg) return err } diff --git a/waku/node.go b/waku/node.go index 5d2ed56e..6f75751b 100644 --- a/waku/node.go +++ b/waku/node.go @@ -232,7 +232,7 @@ func Execute(options Options) { if !options.Relay.Disable { for _, nodeTopic := range options.Relay.Topics { - _, err := wakuNode.Relay().Subscribe(ctx, &nodeTopic) + _, err := wakuNode.Relay().SubscribeWithTopic(ctx, nodeTopic) failOnErr(err, "Error subscring to topic") } } diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 183b678a..6a2802bc 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -362,7 +362,7 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { } if w.opts.enableRelay { - _, err = w.relay.Subscribe(w.ctx, nil) + _, err = w.relay.Subscribe(w.ctx) if err != nil { return err } diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 25df13ed..4b87af81 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -25,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster) require.NoError(t, err) - sub, err := relay.Subscribe(context.Background(), &topic) + sub, err := relay.SubscribeWithTopic(context.Background(), topic) require.NoError(t, err) return relay, sub, host @@ -95,7 +95,7 @@ func TestWakuFilter(t *testing.T) { require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) }() - _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, 0), &testTopic) + _, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) require.NoError(t, err) wg.Wait() @@ -112,7 +112,7 @@ func TestWakuFilter(t *testing.T) { } }() - _, err = node2.Publish(ctx, tests.CreateWakuMessage("TopicB", 1), &testTopic) + _, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic) require.NoError(t, err) wg.Wait() @@ -134,7 +134,7 @@ func TestWakuFilter(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, 2), &testTopic) + _, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) require.NoError(t, err) wg.Wait() } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 6715801d..83e95508 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -83,7 +83,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { // TODO: Assumes success, should probably be extended to check for network, peers, etc // It might make sense to use WithReadiness option here? - _, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic) + _, err := wakuLP.relay.PublishWithTopic(wakuLP.ctx, message, pubSubTopic) if err != nil { response.IsSuccess = false @@ -181,14 +181,14 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *string, opts ...LightPushOption) ([]byte, error) { +func (wakuLP *WakuLightPush) PublishWithTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } req := new(pb.PushRequest) req.Message = message - req.PubsubTopic = string(relay.GetTopic(topic)) + req.PubsubTopic = topic response, err := wakuLP.request(ctx, req, opts...) if err != nil { @@ -202,3 +202,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessag return nil, errors.New(response.Info) } } + +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, opts ...LightPushOption) ([]byte, error) { + return wakuLP.PublishWithTopic(ctx, message, relay.DefaultWakuTopic, opts...) +} diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index b0b86b73..b3789083 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -27,7 +27,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) require.NoError(t, err) - sub, err := relay.Subscribe(context.Background(), &topic) + sub, err := relay.SubscribeWithTopic(context.Background(), topic) require.NoError(t, err) return relay, sub, host @@ -109,7 +109,7 @@ func TestWakuLightPush(t *testing.T) { require.True(t, resp.IsSuccess) // Checking that msg hash is correct - hash, err := client.Publish(ctx, msg2, &testTopic) + hash, err := client.PublishWithTopic(ctx, msg2, testTopic) require.NoError(t, err) require.Equal(t, protocol.NewEnvelope(msg2, string(testTopic)).Hash(), hash) wg.Wait() @@ -137,6 +137,6 @@ func TestWakuLightPushNoPeers(t *testing.T) { require.NoError(t, err) client := NewWakuLightPush(ctx, clientHost, nil) - _, err = client.Publish(ctx, tests.CreateWakuMessage("test", float64(0)), &testTopic) + _, err = client.PublishWithTopic(ctx, tests.CreateWakuMessage("test", float64(0)), testTopic) require.Errorf(t, err, "no suitable remote peers") } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index cff9c519..54b977f7 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -143,7 +143,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return sub, nil } -func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *string) ([]byte, error) { +func (w *WakuRelay) PublishWithTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. if w.pubsub == nil { return nil, errors.New("PubSub hasn't been set") @@ -153,7 +153,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic return nil, errors.New("message can't be null") } - pubSubTopic, err := w.upsertTopic(GetTopic(topic)) + pubSubTopic, err := w.upsertTopic(topic) if err != nil { return nil, err @@ -174,12 +174,8 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic return hash, nil } -func GetTopic(topic *string) string { - t := DefaultWakuTopic - if topic != nil { - t = *topic - } - return t +func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) { + return w.PublishWithTopic(ctx, message, DefaultWakuTopic) } func (w *WakuRelay) Stop() { @@ -195,11 +191,10 @@ func (w *WakuRelay) Stop() { w.subscriptions = nil } -func (w *WakuRelay) Subscribe(ctx context.Context, topic *string) (*Subscription, error) { +func (w *WakuRelay) SubscribeWithTopic(ctx context.Context, topic string) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. - t := GetTopic(topic) - sub, err := w.subscribe(t) + sub, err := w.subscribe(topic) if err != nil { return nil, err @@ -214,17 +209,21 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *string) (*Subscription w.subscriptionsMutex.Lock() defer w.subscriptionsMutex.Unlock() - w.subscriptions[t] = append(w.subscriptions[t], subscription) + w.subscriptions[topic] = append(w.subscriptions[topic], subscription) if w.bcaster != nil { w.bcaster.Register(subscription.C) } - go w.subscribeToTopic(t, subscription, sub) + go w.subscribeToTopic(topic, subscription, sub) return subscription, nil } +func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) { + return w.SubscribeWithTopic(ctx, DefaultWakuTopic) +} + func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { if _, ok := w.relaySubs[topic]; !ok { return fmt.Errorf("topics %s is not subscribed", (string)(topic)) diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 4cd39b02..d653f275 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -45,7 +45,7 @@ func TestWakuRelay(t *testing.T) { ContentTopic: "test", Timestamp: 0, } - _, err = relay.Publish(context.Background(), msg, &testTopic) + _, err = relay.PublishWithTopic(context.Background(), msg, testTopic) require.NoError(t, err) <-ctx.Done() diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index d2c80eb1..99a07694 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -24,7 +24,7 @@ func makeFilterService(t *testing.T) *FilterService { err = n.Start() require.NoError(t, err) - _, err = n.Relay().Subscribe(context.Background(), &testTopic) + _, err = n.Relay().SubscribeWithTopic(context.Background(), testTopic) require.NoError(t, err) return &FilterService{n} @@ -40,7 +40,7 @@ func TestFilterSubscription(t *testing.T) { node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) require.NoError(t, err) - _, err = node.Subscribe(context.Background(), &testTopic) + _, err = node.SubscribeWithTopic(context.Background(), testTopic) require.NoError(t, err) _ = filter.NewWakuFilter(context.Background(), host, false) diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index ec54956e..4e4a0281 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -77,7 +77,12 @@ func (r *RelayService) Stop() { } func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { - _, err := r.node.Relay().Publish(req.Context(), &args.Message, &args.Topic) + var err error + if args.Topic == "" { + _, err = r.node.Relay().Publish(req.Context(), &args.Message) + } else { + _, err = r.node.Relay().PublishWithTopic(req.Context(), &args.Message, args.Topic) + } if err != nil { log.Error("Error publishing message:", err) reply.Success = false @@ -91,7 +96,13 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { ctx := req.Context() for _, topic := range args.Topics { - _, err := r.node.Relay().Subscribe(ctx, &topic) + var err error + if topic == "" { + _, err = r.node.Relay().Subscribe(ctx) + + } else { + _, err = r.node.Relay().SubscribeWithTopic(ctx, topic) + } if err != nil { log.Error("Error subscribing to topic:", topic, "err:", err) reply.Success = false