From 38202e7a2e8db6982f53aaad3805f92d4498e634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 30 Oct 2023 12:30:25 -0400 Subject: [PATCH] refactor: publish API for relay and lightpush (#845) --- Makefile | 5 ++- cmd/waku/server/rest/relay.go | 2 +- cmd/waku/server/rpc/filter_test.go | 4 +- cmd/waku/server/rpc/relay.go | 2 +- docs/api/lightpush.md | 17 +++----- docs/api/relay.md | 33 +++++----------- examples/basic2/main.go | 2 +- examples/chat2/chat.go | 10 ++--- examples/filter2/main.go | 4 +- examples/noise/main.go | 8 ++-- examples/rln/main.go | 3 +- library/lightpush.go | 2 +- library/relay.go | 2 +- tests/connection_test.go | 2 +- waku/v2/node/wakunode2_test.go | 4 +- .../filter/filter_proto_ident_test.go | 22 ++++++----- waku/v2/protocol/filter/filter_test.go | 8 ++-- .../legacy_filter/waku_filter_test.go | 12 +++--- waku/v2/protocol/lightpush/waku_lightpush.go | 15 +++---- .../lightpush/waku_lightpush_option.go | 21 +++++++--- .../protocol/lightpush/waku_lightpush_test.go | 4 +- .../protocol/noise/pairing_relay_messenger.go | 2 +- waku/v2/protocol/relay/options.go | 22 +++++++++++ waku/v2/protocol/relay/waku_relay.go | 39 +++++++++++-------- waku/v2/protocol/relay/waku_relay_test.go | 6 +-- 25 files changed, 137 insertions(+), 114 deletions(-) create mode 100644 waku/v2/protocol/relay/options.go diff --git a/Makefile b/Makefile index 211b4c72..e68348b1 100644 --- a/Makefile +++ b/Makefile @@ -124,10 +124,13 @@ build-example-filter2: build-example-c-bindings: cd examples/c-bindings && $(MAKE) +build-example-noise: + cd examples/noise && $(MAKE) + build-example-rln: cd examples/rln && $(MAKE) -build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-rln +build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-noise build-example-rln static-library: @echo "Building static library..." diff --git a/cmd/waku/server/rest/relay.go b/cmd/waku/server/rest/relay.go index b788ed99..1261f6ed 100644 --- a/cmd/waku/server/rest/relay.go +++ b/cmd/waku/server/rest/relay.go @@ -220,7 +220,7 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) { return } - _, err = r.node.Relay().PublishToTopic(req.Context(), message, strings.Replace(topic, "\n", "", -1)) + _, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1))) if err != nil { r.log.Error("publishing message", zap.Error(err)) } diff --git a/cmd/waku/server/rpc/filter_test.go b/cmd/waku/server/rpc/filter_test.go index 8986de53..e6c7d698 100644 --- a/cmd/waku/server/rpc/filter_test.go +++ b/cmd/waku/server/rpc/filter_test.go @@ -143,10 +143,10 @@ func TestFilterGetV1Messages(t *testing.T) { // Wait for the subscription to be started time.Sleep(1 * time.Second) - _, err = serviceA.node.Relay().PublishToTopic( + _, err = serviceA.node.Relay().Publish( context.Background(), &wpb.WakuMessage{ContentTopic: "ct"}, - testTopic, + relay.WithPubSubTopic(testTopic), ) require.NoError(t, err) require.True(t, reply) diff --git a/cmd/waku/server/rpc/relay.go b/cmd/waku/server/rpc/relay.go index 8c7d10a4..5f431046 100644 --- a/cmd/waku/server/rpc/relay.go +++ b/cmd/waku/server/rpc/relay.go @@ -115,7 +115,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, return err } - _, err = r.node.Relay().PublishToTopic(req.Context(), msg, topic) + _, err = r.node.Relay().Publish(req.Context(), msg, relay.WithPubSubTopic(topic)) if err != nil { r.log.Error("publishing message", zap.Error(err)) return err diff --git a/docs/api/lightpush.md b/docs/api/lightpush.md index 35454638..b22acabd 100644 --- a/docs/api/lightpush.md +++ b/docs/api/lightpush.md @@ -65,22 +65,17 @@ if err != nil { ``` -To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized +To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. +The payload of the message is not limited to strings. Any kind of data that can be serialized into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB) -The following functions can be used to publish a message: -- `wakuNode.Lightpush().Publish(ctx, msg, opts...)` - to send a message to the default waku pubsub topic -- `wakuNode.Lightpush().PublishToTopic(ctx, msg, topic, opts...)` - to send a message to a custom pubsub topic +`wakuNode.Lightpush().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published. -Both of these functions will return a message id on success, or an error if the message could not be published. - -If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush. This behaviour can be controlled via options: +If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options: ### Options - +- `lightpush.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic +- `lightpush.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic - `lightpush.WithPeer(peerID)` - use an specific peer ID (which should be part of the node peerstore) to broadcast the message with - `lightpush.WithAutomaticPeerSelection(host)` - automatically select a peer that supports lightpush protocol from the peerstore to broadcast the message with - `lightpush.WithFastestPeerSelection(ctx)` - automatically select a peer based on its ping reply time - - - diff --git a/docs/api/relay.md b/docs/api/relay.md index 3525ab2c..84f3530c 100644 --- a/docs/api/relay.md +++ b/docs/api/relay.md @@ -41,34 +41,19 @@ One of these options must be specified when instantiating a node supporting the ## Receiving messages ```go ... -sub, err := wakuNode.Relay().Subscribe(context.Background()) +contentFilter := protocol.NewContentFilter(relay.DefaultWakuTopic) +sub, err := wakuNode.Relay().Subscribe(context.Background, contentFilter) ([]*Subscription, error) if err != nil { fmt.Println(err) return } -for value := range sub.C { +for value := range sub[0].C { fmt.Println("Received msg:", string(value.Message().Payload)) } ... ``` -To receive messages sent via the relay protocol, you need to subscribe to a pubsub topic. This can be done via any of these functions: -- `wakuNode.Relay().Subscribe(ctx)` - subscribes to the default waku pubsub topic `/waku/2/default-waku/proto` -- `wakuNode.Relay().SubscribeToTopic(ctx, topic)` - subscribes to a custom pubsub topic - -These functions return a `Subscription` struct containing a channel on which messages will be received. To stop receiving messages in this channel `sub.Unsubscribe()` can be executed which will close the channel (without unsubscribing from the pubsub topic) - -> Pubsub topics should follow the [recommended usage](https://rfc.vac.dev/spec/23/) structure. For this purpose, the `NewPubsubTopic` helper function was created: -```go -import "github.com/waku-org/go-waku/waku/v2/protocol" - -topic := protocol.NewPubsubTopic("the_topic_name", "the_encoding") -/* -fmt.Println(topic.String()) // => `/waku/2/the_topic_name/the_encoding` -*/ -``` - - +To receive messages sent via the relay protocol, you need to subscribe specifying a content filter with the function `Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error)`. This functions return a list of `Subscription` struct containing a channel on which messages will be received. To stop receiving messages `WakuRelay`'s `Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error` can be executed which will close the channel (without unsubscribing from the pubsub topic) which will make sure the subscription is stopped, and if no other subscriptions exist for underlying pubsub topic, the pubsub is also unsubscribed. ## Sending messages @@ -95,11 +80,13 @@ if err != nil { To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB) -The following functions can be used to publish a message: -- `wakuNode.Relay().Publish(ctx, msg)` - to send a message to the default waku pubsub topic -- `wakuNode.Relay().PublishToTopic(ctx, msg, topic)` - to send a message to a custom pubsub topic +`wakuNode.Relay().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published. -Both of these functions will return a message id on success, or an error if the message could not be published. +If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Relay and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options: + +### Options +- `relay.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic +- `relay.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic > If `WithWakuRelayAndMinPeers` was used during the instantiation of the wakuNode, it should be possible to verify if there's enough peers for publishing to a topic with `wakuNode.Relay().EnoughPeersToPublish()` and `wakuNode.Relay().EnoughPeersToPublishToTopic(topic)` diff --git a/examples/basic2/main.go b/examples/basic2/main.go index 1cad0b8f..ce93be2c 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -103,7 +103,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, contentTopic string, ms Timestamp: timestamp, } - _, err = wakuNode.Relay().Publish(ctx, msg) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()) if err != nil { log.Error("Error sending a message", zap.Error(err)) } diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index fcf7e221..847a3032 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -319,18 +319,18 @@ func (c *Chat) publish(ctx context.Context, message string) error { } if c.options.LightPush.Enable { - var lightOpt lightpush.Option + lightOpt := []lightpush.Option{lightpush.WithDefaultPubsubTopic()} var peerID peer.ID peerID, err = options.LightPush.NodePeerID() if err != nil { - lightOpt = lightpush.WithAutomaticPeerSelection() + lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection()) } else { - lightOpt = lightpush.WithPeer(peerID) + lightOpt = append(lightOpt, lightpush.WithPeer(peerID)) } - _, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt) + _, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...) } else { - _, err = c.node.Relay().Publish(ctx, wakuMsg) + _, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic()) } return err diff --git a/examples/filter2/main.go b/examples/filter2/main.go index a4f24c1a..70dd1dc7 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -19,6 +19,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" ) @@ -98,6 +99,7 @@ func main() { // Send FilterRequest from light node to full node cf := protocol.ContentFilter{ + PubsubTopic: relay.DefaultWakuTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic), } @@ -157,7 +159,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { Timestamp: timestamp, } - _, err := wakuNode.Relay().Publish(ctx, msg) + _, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String())) if err != nil { log.Error("Error sending a message: ", err) } diff --git a/examples/noise/main.go b/examples/noise/main.go index 80f27d4a..f5d0998c 100644 --- a/examples/noise/main.go +++ b/examples/noise/main.go @@ -18,7 +18,9 @@ import ( n "github.com/waku-org/go-noise" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" + "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/noise" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" @@ -186,7 +188,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P msg.Timestamp = wakuNode.Timesource().Now().UnixNano() - _, err = wakuNode.Relay().Publish(ctx, msg) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()) if err != nil { log.Error("Error sending a message", zap.Error(err)) } @@ -194,13 +196,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P } func readLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.Pairing) { - sub, err := wakuNode.Relay().Subscribe(ctx) + sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic)) if err != nil { log.Error("Could not subscribe", zap.Error(err)) return } - for value := range sub.Ch { + for value := range sub[0].Ch { if value.Message().ContentTopic != pairingObj.ContentTopic { continue } diff --git a/examples/rln/main.go b/examples/rln/main.go index 6a2aec2a..ee8d7099 100644 --- a/examples/rln/main.go +++ b/examples/rln/main.go @@ -17,6 +17,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/payload" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/utils" "go.uber.org/zap" ) @@ -128,7 +129,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) { log.Error("Error appending proof", zap.Error(err)) } - _, err = wakuNode.Relay().PublishToTopic(ctx, msg, pubsubTopic.String()) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic.String())) if err != nil { log.Error("Error sending a message", zap.Error(err)) } diff --git a/library/lightpush.go b/library/lightpush.go index 16389a1f..62bc2bee 100644 --- a/library/lightpush.go +++ b/library/lightpush.go @@ -41,7 +41,7 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic)) } - hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...) + hash, err := wakuState.node.Lightpush().Publish(ctx, msg, lpOptions...) return hexutil.Encode(hash), err } diff --git a/library/relay.go b/library/relay.go index 63a5d232..0cd2093a 100644 --- a/library/relay.go +++ b/library/relay.go @@ -39,7 +39,7 @@ func relayPublish(msg *pb.WakuMessage, pubsubTopic string, ms int) (string, erro ctx = context.Background() } - hash, err := wakuState.node.Relay().PublishToTopic(ctx, msg, pubsubTopic) + hash, err := wakuState.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic)) return hexutil.Encode(hash), err } diff --git a/tests/connection_test.go b/tests/connection_test.go index 1bc98450..910679da 100644 --- a/tests/connection_test.go +++ b/tests/connection_test.go @@ -72,6 +72,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro Timestamp: timestamp, } - _, err = wakuNode.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic) + _, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()) return err } diff --git a/waku/v2/node/wakunode2_test.go b/waku/v2/node/wakunode2_test.go index 9d4d75e8..8becee59 100644 --- a/waku/v2/node/wakunode2_test.go +++ b/waku/v2/node/wakunode2_test.go @@ -208,7 +208,7 @@ func Test500(t *testing.T) { msg := createTestMsg(0) msg.Payload = int2Bytes(i) msg.Timestamp = int64(i) - if _, err := wakuNode2.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil { + if _, err := wakuNode2.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil { require.Fail(t, "Could not publish all messages") } time.Sleep(5 * time.Millisecond) @@ -292,7 +292,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) { time.Sleep(500 * time.Millisecond) - if _, err := wakuNode1.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil { + if _, err := wakuNode1.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil { require.Fail(t, "Could not publish all messages") } diff --git a/waku/v2/protocol/filter/filter_proto_ident_test.go b/waku/v2/protocol/filter/filter_proto_ident_test.go index d41c70f8..03919d8c 100644 --- a/waku/v2/protocol/filter/filter_proto_ident_test.go +++ b/waku/v2/protocol/filter/filter_proto_ident_test.go @@ -5,16 +5,18 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/libp2p/go-msgio/pbio" - "github.com/waku-org/go-waku/waku/v2/peermanager" - "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" - "golang.org/x/exp/slices" "math" "net/http" "strings" "sync" "time" + "github.com/libp2p/go-msgio/pbio" + "github.com/waku-org/go-waku/waku/v2/peermanager" + "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "golang.org/x/exp/slices" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" "github.com/libp2p/go-libp2p/core/peer" @@ -30,7 +32,7 @@ func (s *FilterTestSuite) TestCreateSubscription() { // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -42,7 +44,7 @@ func (s *FilterTestSuite) TestModifySubscription() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -52,7 +54,7 @@ func (s *FilterTestSuite) TestModifySubscription() { s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -64,13 +66,13 @@ func (s *FilterTestSuite) TestMultipleMessages() { s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -286,7 +288,7 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() { time.Sleep(1 * time.Second) // Send message - _, err = s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) + _, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) // Message should never arrive -> exit after timeout diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index 0bdada3e..b1cbfcb3 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -275,13 +275,13 @@ func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload payload = "123" } - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), topic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), relay.WithPubSubTopic(topic)) s.Require().NoError(err) } func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) { for _, m := range msgs { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), m.pubSubTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), relay.WithPubSubTopic(m.pubSubTopic)) s.Require().NoError(err) } } @@ -495,7 +495,7 @@ func (s *FilterTestSuite) TestAutoShard() { s.log.Info("Testing Autoshard:CreateSubscription") s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) @@ -528,7 +528,7 @@ func (s *FilterTestSuite) TestAutoShard() { s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID()) s.waitForMsg(func() { - _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + _, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic)) s.Require().NoError(err) }, s.subDetails[0].C) diff --git a/waku/v2/protocol/legacy_filter/waku_filter_test.go b/waku/v2/protocol/legacy_filter/waku_filter_test.go index a4aa32bb..4f35ace2 100644 --- a/waku/v2/protocol/legacy_filter/waku_filter_test.go +++ b/waku/v2/protocol/legacy_filter/waku_filter_test.go @@ -110,7 +110,7 @@ func TestWakuFilter(t *testing.T) { require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic()) }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() @@ -127,7 +127,7 @@ func TestWakuFilter(t *testing.T) { } }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() @@ -149,7 +149,7 @@ func TestWakuFilter(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() } @@ -207,7 +207,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { }() - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) wg.Wait() @@ -217,7 +217,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(1 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) // TODO: find out how to eliminate this sleep @@ -226,7 +226,7 @@ func TestWakuFilterPeerFailure(t *testing.T) { time.Sleep(3 * time.Second) - _, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic) + _, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic)) require.NoError(t, err) time.Sleep(1 * time.Second) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index a54dee24..3439c17b 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -118,7 +118,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream) // TODO: Assumes success, should probably be extended to check for network, peers, etc // It might make sense to use WithReadiness option here? - _, err = wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic) + _, err = wakuLP.relay.Publish(ctx, message, relay.WithPubSubTopic(pubSubTopic)) if err != nil { logger.Error("publishing message", zap.Error(err)) wakuLP.metrics.RecordError(messagePushFailure) @@ -261,9 +261,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe return params, nil } -// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol -// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding. -func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { +// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the +// contentTopic) via lightpush protocol. If auto-sharding is not to be used, then the +// `WithPubSubTopic` option should be provided to publish the message to an specific pubSubTopic +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } @@ -289,9 +290,3 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa return nil, errors.New(response.Info) } - -// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the contentTopic) via lightpush protocol -// If auto-sharding is not to be used, then PublishToTopic API should be used -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { - return wakuLP.PublishToTopic(ctx, message, opts...) -} diff --git a/waku/v2/protocol/lightpush/waku_lightpush_option.go b/waku/v2/protocol/lightpush/waku_lightpush_option.go index 55dba60a..d1627c16 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_option.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_option.go @@ -5,6 +5,7 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/waku-org/go-waku/waku/v2/peermanager" "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "go.uber.org/zap" ) @@ -40,12 +41,6 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option { } } -func WithPubSubTopic(pubsubTopic string) Option { - return func(params *lightPushParameters) { - params.pubsubTopic = pubsubTopic - } -} - // WithFastestPeerSelection is an option used to select a peer from the peer store // with the lowest ping. If a list of specific peers is passed, the peer will be chosen // from that list assuming it supports the chosen protocol, otherwise it will chose a peer @@ -56,6 +51,20 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option { } } +// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted +func WithPubSubTopic(pubsubTopic string) Option { + return func(params *lightPushParameters) { + params.pubsubTopic = pubsubTopic + } +} + +// WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic +func WithDefaultPubsubTopic() Option { + return func(params *lightPushParameters) { + params.pubsubTopic = relay.DefaultWakuTopic + } +} + // WithRequestID is an option to set a specific request ID to be used when // publishing a message func WithRequestID(requestID []byte) Option { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index ab4455a8..41beb261 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -109,7 +109,7 @@ func TestWakuLightPush(t *testing.T) { lpOptions = append(lpOptions, WithPeer(host2.ID())) // Checking that msg hash is correct - hash, err := client.PublishToTopic(ctx, msg2, lpOptions...) + hash, err := client.Publish(ctx, msg2, lpOptions...) require.NoError(t, err) require.Equal(t, protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), string(testTopic)).Hash(), hash) wg.Wait() @@ -141,7 +141,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { var lpOptions []Option lpOptions = append(lpOptions, WithPubSubTopic(testTopic)) - _, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...) + _, err = client.Publish(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...) require.Errorf(t, err, "no suitable remote peers") } diff --git a/waku/v2/protocol/noise/pairing_relay_messenger.go b/waku/v2/protocol/noise/pairing_relay_messenger.go index 489fc446..05a9bdd5 100644 --- a/waku/v2/protocol/noise/pairing_relay_messenger.go +++ b/waku/v2/protocol/noise/pairing_relay_messenger.go @@ -129,7 +129,7 @@ func (r *NoiseWakuRelay) Publish(ctx context.Context, contentTopic string, paylo message.ContentTopic = contentTopic message.Timestamp = r.timesource.Now().UnixNano() - _, err = r.relay.PublishToTopic(ctx, message, r.pubsubTopic) + _, err = r.relay.Publish(ctx, message, relay.WithPubSubTopic(r.pubsubTopic)) return err } diff --git a/waku/v2/protocol/relay/options.go b/waku/v2/protocol/relay/options.go new file mode 100644 index 00000000..9d7979fe --- /dev/null +++ b/waku/v2/protocol/relay/options.go @@ -0,0 +1,22 @@ +package relay + +type publishParameters struct { + pubsubTopic string +} + +// PublishOption is the type of options accepted when publishing WakuMessages +type PublishOption func(*publishParameters) + +// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted +func WithPubSubTopic(pubsubTopic string) PublishOption { + return func(params *publishParameters) { + params.pubsubTopic = pubsubTopic + } +} + +// WithPubSubTopic is used to indicate that the message should be broadcasted in the default pubsub topic +func WithDefaultPubsubTopic() PublishOption { + return func(params *publishParameters) { + params.pubsubTopic = DefaultWakuTopic + } +} diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index d051e308..f55eba1e 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -223,8 +223,10 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscript return sub, nil } -// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic -func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) { +// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic +// specified in the message via autosharding. To publish to a specific pubsubTopic, the `WithPubSubTopic` option should +// be provided +func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts ...PublishOption) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. if w.pubsub == nil { return nil, errors.New("PubSub hasn't been set") @@ -234,15 +236,28 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, return nil, errors.New("message can't be null") } - if err := message.Validate(); err != nil { + err := message.Validate() + if err != nil { return nil, err } - if !w.EnoughPeersToPublishToTopic(topic) { + params := new(publishParameters) + for _, opt := range opts { + opt(params) + } + + if params.pubsubTopic == "" { + params.pubsubTopic, err = waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic) + if err != nil { + return nil, err + } + } + + if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) { return nil, errors.New("not enough peers to publish") } - pubSubTopic, err := w.upsertTopic(topic) + pubSubTopic, err := w.upsertTopic(params.pubsubTopic) if err != nil { return nil, err } @@ -257,23 +272,13 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, return nil, err } - hash := message.Hash(topic) + hash := message.Hash(params.pubsubTopic) - w.log.Debug("waku.relay published", zap.String("pubsubTopic", topic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) + w.log.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload))) return hash, nil } -// Publish is used to broadcast a WakuMessage, the pubsubTopic is derived from contentTopic specified in the message via autosharding. -// To publish to a specific pubsubTopic, please use PublishToTopic -func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) { - pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic) - if err != nil { - return nil, err - } - return w.PublishToTopic(ctx, message, pubSubTopic) -} - func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) { pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic) if err != nil { diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index e49db217..8130dc4f 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -63,7 +63,7 @@ func TestWakuRelay(t *testing.T) { ContentTopic: "test", Timestamp: 0, } - _, err = relay.PublishToTopic(context.Background(), msg, testTopic) + _, err = relay.Publish(context.Background(), msg, WithPubSubTopic(testTopic)) require.NoError(t, err) time.Sleep(2 * time.Second) @@ -273,7 +273,7 @@ func TestWakuRelayAutoShard(t *testing.T) { Timestamp: 0, } - _, err = relay.PublishToTopic(context.Background(), msg1, subs[0].contentFilter.PubsubTopic) + _, err = relay.Publish(context.Background(), msg1, WithPubSubTopic(subs[0].contentFilter.PubsubTopic)) require.NoError(t, err) wg = waitForMsg(t, subs1[0].Ch, testcTopic1) @@ -300,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) { Timestamp: 1, } - _, err = relay.PublishToTopic(context.Background(), msg2, subs[0].contentFilter.PubsubTopic) + _, err = relay.Publish(context.Background(), msg2, WithPubSubTopic(subs[0].contentFilter.PubsubTopic)) require.NoError(t, err) wg2.Wait()