diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 85369dd1..69afa73e 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -173,7 +173,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { func readLoop(ctx context.Context, wakuNode *node.WakuNode) { pubsubTopic := pubSubTopic.String() - sub, err := wakuNode.Relay().SubscribeWithTopic(ctx, pubsubTopic) + sub, err := wakuNode.Relay().SubscribeToTopic(ctx, pubsubTopic) if err != nil { log.Error("Could not subscribe: ", err) return diff --git a/waku/node.go b/waku/node.go index 6f75751b..fe95973b 100644 --- a/waku/node.go +++ b/waku/node.go @@ -232,7 +232,7 @@ func Execute(options Options) { if !options.Relay.Disable { for _, nodeTopic := range options.Relay.Topics { - _, err := wakuNode.Relay().SubscribeWithTopic(ctx, nodeTopic) + _, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) failOnErr(err, "Error subscring to topic") } } diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 4b87af81..612e54f1 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -25,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster) require.NoError(t, err) - sub, err := relay.SubscribeWithTopic(context.Background(), topic) + sub, err := relay.SubscribeToTopic(context.Background(), topic) require.NoError(t, err) return relay, sub, host @@ -95,7 +95,7 @@ func TestWakuFilter(t *testing.T) { require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) }() - _, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic) require.NoError(t, err) wg.Wait() @@ -112,7 +112,7 @@ func TestWakuFilter(t *testing.T) { } }() - _, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic) require.NoError(t, err) wg.Wait() @@ -134,7 +134,7 @@ func TestWakuFilter(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) + _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic) require.NoError(t, err) wg.Wait() } diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 83e95508..290fa670 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -83,7 +83,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { // TODO: Assumes success, should probably be extended to check for network, peers, etc // It might make sense to use WithReadiness option here? - _, err := wakuLP.relay.PublishWithTopic(wakuLP.ctx, message, pubSubTopic) + _, err := wakuLP.relay.PublishToTopic(wakuLP.ctx, message, pubSubTopic) if err != nil { response.IsSuccess = false @@ -181,7 +181,7 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -func (wakuLP *WakuLightPush) PublishWithTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) { +func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } @@ -204,5 +204,5 @@ func (wakuLP *WakuLightPush) PublishWithTopic(ctx context.Context, message *pb.W } func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, opts ...LightPushOption) ([]byte, error) { - return wakuLP.PublishWithTopic(ctx, message, relay.DefaultWakuTopic, opts...) + return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...) } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index b3789083..12df018d 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -27,7 +27,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) require.NoError(t, err) - sub, err := relay.SubscribeWithTopic(context.Background(), topic) + sub, err := relay.SubscribeToTopic(context.Background(), topic) require.NoError(t, err) return relay, sub, host @@ -109,7 +109,7 @@ func TestWakuLightPush(t *testing.T) { require.True(t, resp.IsSuccess) // Checking that msg hash is correct - hash, err := client.PublishWithTopic(ctx, msg2, testTopic) + hash, err := client.PublishToTopic(ctx, msg2, testTopic) require.NoError(t, err) require.Equal(t, protocol.NewEnvelope(msg2, string(testTopic)).Hash(), hash) wg.Wait() @@ -137,6 +137,6 @@ func TestWakuLightPushNoPeers(t *testing.T) { require.NoError(t, err) client := NewWakuLightPush(ctx, clientHost, nil) - _, err = client.PublishWithTopic(ctx, tests.CreateWakuMessage("test", float64(0)), testTopic) + _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", float64(0)), testTopic) require.Errorf(t, err, "no suitable remote peers") } diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 54b977f7..9bff7395 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -143,7 +143,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro return sub, nil } -func (w *WakuRelay) PublishWithTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) { +func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. if w.pubsub == nil { return nil, errors.New("PubSub hasn't been set") @@ -175,7 +175,7 @@ func (w *WakuRelay) PublishWithTopic(ctx context.Context, message *pb.WakuMessag } func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) { - return w.PublishWithTopic(ctx, message, DefaultWakuTopic) + return w.PublishToTopic(ctx, message, DefaultWakuTopic) } func (w *WakuRelay) Stop() { @@ -191,7 +191,7 @@ func (w *WakuRelay) Stop() { w.subscriptions = nil } -func (w *WakuRelay) SubscribeWithTopic(ctx context.Context, topic string) (*Subscription, error) { +func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. sub, err := w.subscribe(topic) @@ -221,7 +221,7 @@ func (w *WakuRelay) SubscribeWithTopic(ctx context.Context, topic string) (*Subs } func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) { - return w.SubscribeWithTopic(ctx, DefaultWakuTopic) + return w.SubscribeToTopic(ctx, DefaultWakuTopic) } func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index d653f275..3fc5b6b9 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -45,7 +45,7 @@ func TestWakuRelay(t *testing.T) { ContentTopic: "test", Timestamp: 0, } - _, err = relay.PublishWithTopic(context.Background(), msg, testTopic) + _, err = relay.PublishToTopic(context.Background(), msg, testTopic) require.NoError(t, err) <-ctx.Done() diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 99a07694..5fac092c 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -24,7 +24,7 @@ func makeFilterService(t *testing.T) *FilterService { err = n.Start() require.NoError(t, err) - _, err = n.Relay().SubscribeWithTopic(context.Background(), testTopic) + _, err = n.Relay().SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) return &FilterService{n} @@ -40,7 +40,7 @@ func TestFilterSubscription(t *testing.T) { node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) require.NoError(t, err) - _, err = node.SubscribeWithTopic(context.Background(), testTopic) + _, err = node.SubscribeToTopic(context.Background(), testTopic) require.NoError(t, err) _ = filter.NewWakuFilter(context.Background(), host, false) diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 4e4a0281..927f7684 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -81,7 +81,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, if args.Topic == "" { _, err = r.node.Relay().Publish(req.Context(), &args.Message) } else { - _, err = r.node.Relay().PublishWithTopic(req.Context(), &args.Message, args.Topic) + _, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic) } if err != nil { log.Error("Error publishing message:", err) @@ -101,7 +101,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r _, err = r.node.Relay().Subscribe(ctx) } else { - _, err = r.node.Relay().SubscribeWithTopic(ctx, topic) + _, err = r.node.Relay().SubscribeToTopic(ctx, topic) } if err != nil { log.Error("Error subscribing to topic:", topic, "err:", err)