refactor: publish API for relay and lightpush (#845)

This commit is contained in:
richΛrd 2023-10-30 12:30:25 -04:00 committed by GitHub
parent ddf188bbf8
commit 38202e7a2e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 137 additions and 114 deletions

View File

@ -124,10 +124,13 @@ build-example-filter2:
build-example-c-bindings:
cd examples/c-bindings && $(MAKE)
build-example-noise:
cd examples/noise && $(MAKE)
build-example-rln:
cd examples/rln && $(MAKE)
build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-rln
build-example: build-example-basic2 build-example-chat-2 build-example-filter2 build-example-c-bindings build-example-noise build-example-rln
static-library:
@echo "Building static library..."

View File

@ -220,7 +220,7 @@ func (r *RelayService) postV1Message(w http.ResponseWriter, req *http.Request) {
return
}
_, err = r.node.Relay().PublishToTopic(req.Context(), message, strings.Replace(topic, "\n", "", -1))
_, err = r.node.Relay().Publish(req.Context(), message, relay.WithPubSubTopic(strings.Replace(topic, "\n", "", -1)))
if err != nil {
r.log.Error("publishing message", zap.Error(err))
}

View File

@ -143,10 +143,10 @@ func TestFilterGetV1Messages(t *testing.T) {
// Wait for the subscription to be started
time.Sleep(1 * time.Second)
_, err = serviceA.node.Relay().PublishToTopic(
_, err = serviceA.node.Relay().Publish(
context.Background(),
&wpb.WakuMessage{ContentTopic: "ct"},
testTopic,
relay.WithPubSubTopic(testTopic),
)
require.NoError(t, err)
require.True(t, reply)

View File

@ -115,7 +115,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
return err
}
_, err = r.node.Relay().PublishToTopic(req.Context(), msg, topic)
_, err = r.node.Relay().Publish(req.Context(), msg, relay.WithPubSubTopic(topic))
if err != nil {
r.log.Error("publishing message", zap.Error(err))
return err

View File

@ -65,22 +65,17 @@ if err != nil {
```
To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized
To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer.
The payload of the message is not limited to strings. Any kind of data that can be serialized
into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB)
The following functions can be used to publish a message:
- `wakuNode.Lightpush().Publish(ctx, msg, opts...)` - to send a message to the default waku pubsub topic
- `wakuNode.Lightpush().PublishToTopic(ctx, msg, topic, opts...)` - to send a message to a custom pubsub topic
`wakuNode.Lightpush().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published.
Both of these functions will return a message id on success, or an error if the message could not be published.
If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush. This behaviour can be controlled via options:
If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Lightpush and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options:
### Options
- `lightpush.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic
- `lightpush.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic
- `lightpush.WithPeer(peerID)` - use an specific peer ID (which should be part of the node peerstore) to broadcast the message with
- `lightpush.WithAutomaticPeerSelection(host)` - automatically select a peer that supports lightpush protocol from the peerstore to broadcast the message with
- `lightpush.WithFastestPeerSelection(ctx)` - automatically select a peer based on its ping reply time

View File

@ -41,34 +41,19 @@ One of these options must be specified when instantiating a node supporting the
## Receiving messages
```go
...
sub, err := wakuNode.Relay().Subscribe(context.Background())
contentFilter := protocol.NewContentFilter(relay.DefaultWakuTopic)
sub, err := wakuNode.Relay().Subscribe(context.Background, contentFilter) ([]*Subscription, error)
if err != nil {
fmt.Println(err)
return
}
for value := range sub.C {
for value := range sub[0].C {
fmt.Println("Received msg:", string(value.Message().Payload))
}
...
```
To receive messages sent via the relay protocol, you need to subscribe to a pubsub topic. This can be done via any of these functions:
- `wakuNode.Relay().Subscribe(ctx)` - subscribes to the default waku pubsub topic `/waku/2/default-waku/proto`
- `wakuNode.Relay().SubscribeToTopic(ctx, topic)` - subscribes to a custom pubsub topic
These functions return a `Subscription` struct containing a channel on which messages will be received. To stop receiving messages in this channel `sub.Unsubscribe()` can be executed which will close the channel (without unsubscribing from the pubsub topic)
> Pubsub topics should follow the [recommended usage](https://rfc.vac.dev/spec/23/) structure. For this purpose, the `NewPubsubTopic` helper function was created:
```go
import "github.com/waku-org/go-waku/waku/v2/protocol"
topic := protocol.NewPubsubTopic("the_topic_name", "the_encoding")
/*
fmt.Println(topic.String()) // => `/waku/2/the_topic_name/the_encoding`
*/
```
To receive messages sent via the relay protocol, you need to subscribe specifying a content filter with the function `Subscribe(ctx context.Context, contentFilter waku_proto.ContentFilter, opts ...RelaySubscribeOption) ([]*Subscription, error)`. This functions return a list of `Subscription` struct containing a channel on which messages will be received. To stop receiving messages `WakuRelay`'s `Unsubscribe(ctx context.Context, contentFilter waku_proto.ContentFilter) error` can be executed which will close the channel (without unsubscribing from the pubsub topic) which will make sure the subscription is stopped, and if no other subscriptions exist for underlying pubsub topic, the pubsub is also unsubscribed.
## Sending messages
@ -95,11 +80,13 @@ if err != nil {
To send a message, it needs to be wrapped into a [`WakuMessage`](https://rfc.vac.dev/spec/14/) protobuffer. The payload of the message is not limited to strings. Any kind of data that can be serialized
into a `[]byte` can be sent as long as it does not exceed the maximum length a message can have (~1MB)
The following functions can be used to publish a message:
- `wakuNode.Relay().Publish(ctx, msg)` - to send a message to the default waku pubsub topic
- `wakuNode.Relay().PublishToTopic(ctx, msg, topic)` - to send a message to a custom pubsub topic
`wakuNode.Relay().Publish(ctx, msg, opts...)` is used to publish a message. This function will return a message id on success, or an error if the message could not be published.
Both of these functions will return a message id on success, or an error if the message could not be published.
If no options are specified, go-waku will automatically choose the peer used to broadcast the message via Relay and publish the message to a pubsub topic derived from the content topic of the message. This behaviour can be controlled via options:
### Options
- `relay.WithPubSubTopic(topic)` - broadcast the message using a custom pubsub topic
- `relay.WithDefaultPubsubTopic()` - broadcast the message to the default pubsub topic
> If `WithWakuRelayAndMinPeers` was used during the instantiation of the wakuNode, it should be possible to verify if there's enough peers for publishing to a topic with `wakuNode.Relay().EnoughPeersToPublish()` and `wakuNode.Relay().EnoughPeersToPublishToTopic(topic)`

View File

@ -103,7 +103,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, contentTopic string, ms
Timestamp: timestamp,
}
_, err = wakuNode.Relay().Publish(ctx, msg)
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
if err != nil {
log.Error("Error sending a message", zap.Error(err))
}

View File

@ -319,18 +319,18 @@ func (c *Chat) publish(ctx context.Context, message string) error {
}
if c.options.LightPush.Enable {
var lightOpt lightpush.Option
lightOpt := []lightpush.Option{lightpush.WithDefaultPubsubTopic()}
var peerID peer.ID
peerID, err = options.LightPush.NodePeerID()
if err != nil {
lightOpt = lightpush.WithAutomaticPeerSelection()
lightOpt = append(lightOpt, lightpush.WithAutomaticPeerSelection())
} else {
lightOpt = lightpush.WithPeer(peerID)
lightOpt = append(lightOpt, lightpush.WithPeer(peerID))
}
_, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt)
_, err = c.node.Lightpush().Publish(c.ctx, wakuMsg, lightOpt...)
} else {
_, err = c.node.Relay().Publish(ctx, wakuMsg)
_, err = c.node.Relay().Publish(ctx, wakuMsg, relay.WithDefaultPubsubTopic())
}
return err

View File

@ -19,6 +19,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
)
@ -98,6 +99,7 @@ func main() {
// Send FilterRequest from light node to full node
cf := protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: protocol.NewContentTopicSet(contentTopic),
}
@ -157,7 +159,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
Timestamp: timestamp,
}
_, err := wakuNode.Relay().Publish(ctx, msg)
_, err := wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubSubTopic.String()))
if err != nil {
log.Error("Error sending a message: ", err)
}

View File

@ -18,7 +18,9 @@ import (
n "github.com/waku-org/go-noise"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/noise"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
@ -186,7 +188,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P
msg.Timestamp = wakuNode.Timesource().Now().UnixNano()
_, err = wakuNode.Relay().Publish(ctx, msg)
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
if err != nil {
log.Error("Error sending a message", zap.Error(err))
}
@ -194,13 +196,13 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.P
}
func readLoop(ctx context.Context, wakuNode *node.WakuNode, pairingObj *noise.Pairing) {
sub, err := wakuNode.Relay().Subscribe(ctx)
sub, err := wakuNode.Relay().Subscribe(ctx, protocol.NewContentFilter(relay.DefaultWakuTopic))
if err != nil {
log.Error("Could not subscribe", zap.Error(err))
return
}
for value := range sub.Ch {
for value := range sub[0].Ch {
if value.Message().ContentTopic != pairingObj.ContentTopic {
continue
}

View File

@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/payload"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
@ -128,7 +129,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
log.Error("Error appending proof", zap.Error(err))
}
_, err = wakuNode.Relay().PublishToTopic(ctx, msg, pubsubTopic.String())
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic.String()))
if err != nil {
log.Error("Error sending a message", zap.Error(err))
}

View File

@ -41,7 +41,7 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms
lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic))
}
hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...)
hash, err := wakuState.node.Lightpush().Publish(ctx, msg, lpOptions...)
return hexutil.Encode(hash), err
}

View File

@ -39,7 +39,7 @@ func relayPublish(msg *pb.WakuMessage, pubsubTopic string, ms int) (string, erro
ctx = context.Background()
}
hash, err := wakuState.node.Relay().PublishToTopic(ctx, msg, pubsubTopic)
hash, err := wakuState.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic))
return hexutil.Encode(hash), err
}

View File

@ -72,6 +72,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro
Timestamp: timestamp,
}
_, err = wakuNode.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic)
_, err = wakuNode.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic())
return err
}

View File

@ -208,7 +208,7 @@ func Test500(t *testing.T) {
msg := createTestMsg(0)
msg.Payload = int2Bytes(i)
msg.Timestamp = int64(i)
if _, err := wakuNode2.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil {
if _, err := wakuNode2.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil {
require.Fail(t, "Could not publish all messages")
}
time.Sleep(5 * time.Millisecond)
@ -292,7 +292,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
time.Sleep(500 * time.Millisecond)
if _, err := wakuNode1.Relay().PublishToTopic(ctx, msg, relay.DefaultWakuTopic); err != nil {
if _, err := wakuNode1.Relay().Publish(ctx, msg, relay.WithDefaultPubsubTopic()); err != nil {
require.Fail(t, "Could not publish all messages")
}

View File

@ -5,16 +5,18 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"golang.org/x/exp/slices"
"math"
"net/http"
"strings"
"sync"
"time"
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"golang.org/x/exp/slices"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/peer"
@ -30,7 +32,7 @@ func (s *FilterTestSuite) TestCreateSubscription() {
// Initial subscribe
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
@ -42,7 +44,7 @@ func (s *FilterTestSuite) TestModifySubscription() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
@ -52,7 +54,7 @@ func (s *FilterTestSuite) TestModifySubscription() {
s.subDetails = s.subscribe(s.testTopic, newContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
@ -64,13 +66,13 @@ func (s *FilterTestSuite) TestMultipleMessages() {
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
@ -286,7 +288,7 @@ func (s *FilterTestSuite) TestIncorrectPushIdentifier() {
time.Sleep(1 * time.Second)
// Send message
_, err = s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
_, err = s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
// Message should never arrive -> exit after timeout

View File

@ -275,13 +275,13 @@ func (s *FilterTestSuite) publishMsg(topic, contentTopic string, optionalPayload
payload = "123"
}
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), topic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch(), payload), relay.WithPubSubTopic(topic))
s.Require().NoError(err)
}
func (s *FilterTestSuite) publishMessages(msgs []WakuMsg) {
for _, m := range msgs {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), m.pubSubTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(m.contentTopic, utils.GetUnixEpoch(), m.payload), relay.WithPubSubTopic(m.pubSubTopic))
s.Require().NoError(err)
}
}
@ -495,7 +495,7 @@ func (s *FilterTestSuite) TestAutoShard() {
s.log.Info("Testing Autoshard:CreateSubscription")
s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)
@ -528,7 +528,7 @@ func (s *FilterTestSuite) TestAutoShard() {
s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID())
s.waitForMsg(func() {
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
_, err := s.relayNode.Publish(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(s.testTopic))
s.Require().NoError(err)
}, s.subDetails[0].C)

View File

@ -110,7 +110,7 @@ func TestWakuFilter(t *testing.T) {
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
}()
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
@ -127,7 +127,7 @@ func TestWakuFilter(t *testing.T) {
}
}()
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), testTopic)
_, err = node2.Publish(ctx, tests.CreateWakuMessage("TopicB", utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
@ -149,7 +149,7 @@ func TestWakuFilter(t *testing.T) {
time.Sleep(1 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
}
@ -207,7 +207,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
}()
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
wg.Wait()
@ -217,7 +217,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
time.Sleep(1 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
// TODO: find out how to eliminate this sleep
@ -226,7 +226,7 @@ func TestWakuFilterPeerFailure(t *testing.T) {
time.Sleep(3 * time.Second)
_, err = node2.PublishToTopic(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), testTopic)
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch()), relay.WithPubSubTopic(testTopic))
require.NoError(t, err)
time.Sleep(1 * time.Second)

View File

@ -118,7 +118,7 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(network.Stream)
// TODO: Assumes success, should probably be extended to check for network, peers, etc
// It might make sense to use WithReadiness option here?
_, err = wakuLP.relay.PublishToTopic(ctx, message, pubSubTopic)
_, err = wakuLP.relay.Publish(ctx, message, relay.WithPubSubTopic(pubSubTopic))
if err != nil {
logger.Error("publishing message", zap.Error(err))
wakuLP.metrics.RecordError(messagePushFailure)
@ -261,9 +261,10 @@ func (wakuLP *WakuLightPush) handleOpts(ctx context.Context, message *wpb.WakuMe
return params, nil
}
// Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol
// If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the
// contentTopic) via lightpush protocol. If auto-sharding is not to be used, then the
// `WithPubSubTopic` option should be provided to publish the message to an specific pubSubTopic
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
if message == nil {
return nil, errors.New("message can't be null")
}
@ -289,9 +290,3 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa
return nil, errors.New(response.Info)
}
// Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the contentTopic) via lightpush protocol
// If auto-sharding is not to be used, then PublishToTopic API should be used
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
return wakuLP.PublishToTopic(ctx, message, opts...)
}

View File

@ -5,6 +5,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
)
@ -40,12 +41,6 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
}
}
func WithPubSubTopic(pubsubTopic string) Option {
return func(params *lightPushParameters) {
params.pubsubTopic = pubsubTopic
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer
@ -56,6 +51,20 @@ func WithFastestPeerSelection(fromThesePeers ...peer.ID) Option {
}
}
// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted
func WithPubSubTopic(pubsubTopic string) Option {
return func(params *lightPushParameters) {
params.pubsubTopic = pubsubTopic
}
}
// WithDefaultPubsubTopic is used to indicate that the message should be broadcasted in the default pubsub topic
func WithDefaultPubsubTopic() Option {
return func(params *lightPushParameters) {
params.pubsubTopic = relay.DefaultWakuTopic
}
}
// WithRequestID is an option to set a specific request ID to be used when
// publishing a message
func WithRequestID(requestID []byte) Option {

View File

@ -109,7 +109,7 @@ func TestWakuLightPush(t *testing.T) {
lpOptions = append(lpOptions, WithPeer(host2.ID()))
// Checking that msg hash is correct
hash, err := client.PublishToTopic(ctx, msg2, lpOptions...)
hash, err := client.Publish(ctx, msg2, lpOptions...)
require.NoError(t, err)
require.Equal(t, protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), string(testTopic)).Hash(), hash)
wg.Wait()
@ -141,7 +141,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
var lpOptions []Option
lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...)
_, err = client.Publish(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...)
require.Errorf(t, err, "no suitable remote peers")
}

View File

@ -129,7 +129,7 @@ func (r *NoiseWakuRelay) Publish(ctx context.Context, contentTopic string, paylo
message.ContentTopic = contentTopic
message.Timestamp = r.timesource.Now().UnixNano()
_, err = r.relay.PublishToTopic(ctx, message, r.pubsubTopic)
_, err = r.relay.Publish(ctx, message, relay.WithPubSubTopic(r.pubsubTopic))
return err
}

View File

@ -0,0 +1,22 @@
package relay
type publishParameters struct {
pubsubTopic string
}
// PublishOption is the type of options accepted when publishing WakuMessages
type PublishOption func(*publishParameters)
// WithPubSubTopic is used to specify the pubsub topic on which a WakuMessage will be broadcasted
func WithPubSubTopic(pubsubTopic string) PublishOption {
return func(params *publishParameters) {
params.pubsubTopic = pubsubTopic
}
}
// WithPubSubTopic is used to indicate that the message should be broadcasted in the default pubsub topic
func WithDefaultPubsubTopic() PublishOption {
return func(params *publishParameters) {
params.pubsubTopic = DefaultWakuTopic
}
}

View File

@ -223,8 +223,10 @@ func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscript
return sub, nil
}
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic
func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) {
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic
// specified in the message via autosharding. To publish to a specific pubsubTopic, the `WithPubSubTopic` option should
// be provided
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts ...PublishOption) ([]byte, error) {
// Publish a `WakuMessage` to a PubSub topic.
if w.pubsub == nil {
return nil, errors.New("PubSub hasn't been set")
@ -234,15 +236,28 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage,
return nil, errors.New("message can't be null")
}
if err := message.Validate(); err != nil {
err := message.Validate()
if err != nil {
return nil, err
}
if !w.EnoughPeersToPublishToTopic(topic) {
params := new(publishParameters)
for _, opt := range opts {
opt(params)
}
if params.pubsubTopic == "" {
params.pubsubTopic, err = waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
}
if !w.EnoughPeersToPublishToTopic(params.pubsubTopic) {
return nil, errors.New("not enough peers to publish")
}
pubSubTopic, err := w.upsertTopic(topic)
pubSubTopic, err := w.upsertTopic(params.pubsubTopic)
if err != nil {
return nil, err
}
@ -257,23 +272,13 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage,
return nil, err
}
hash := message.Hash(topic)
hash := message.Hash(params.pubsubTopic)
w.log.Debug("waku.relay published", zap.String("pubsubTopic", topic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))
w.log.Debug("waku.relay published", zap.String("pubsubTopic", params.pubsubTopic), logging.HexString("hash", hash), zap.Int64("publishTime", w.timesource.Now().UnixNano()), zap.Int("payloadSizeBytes", len(message.Payload)))
return hash, nil
}
// Publish is used to broadcast a WakuMessage, the pubsubTopic is derived from contentTopic specified in the message via autosharding.
// To publish to a specific pubsubTopic, please use PublishToTopic
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) {
pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
return w.PublishToTopic(ctx, message, pubSubTopic)
}
func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error) {
pubSubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {

View File

@ -63,7 +63,7 @@ func TestWakuRelay(t *testing.T) {
ContentTopic: "test",
Timestamp: 0,
}
_, err = relay.PublishToTopic(context.Background(), msg, testTopic)
_, err = relay.Publish(context.Background(), msg, WithPubSubTopic(testTopic))
require.NoError(t, err)
time.Sleep(2 * time.Second)
@ -273,7 +273,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
Timestamp: 0,
}
_, err = relay.PublishToTopic(context.Background(), msg1, subs[0].contentFilter.PubsubTopic)
_, err = relay.Publish(context.Background(), msg1, WithPubSubTopic(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
wg = waitForMsg(t, subs1[0].Ch, testcTopic1)
@ -300,7 +300,7 @@ func TestWakuRelayAutoShard(t *testing.T) {
Timestamp: 1,
}
_, err = relay.PublishToTopic(context.Background(), msg2, subs[0].contentFilter.PubsubTopic)
_, err = relay.Publish(context.Background(), msg2, WithPubSubTopic(subs[0].contentFilter.PubsubTopic))
require.NoError(t, err)
wg2.Wait()