mirror of https://github.com/status-im/go-waku.git
refactor: create separate functions for subscriptions and publishing
This commit is contained in:
parent
56ef99e11f
commit
00ee0b7511
|
@ -103,7 +103,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
|
|||
Timestamp: timestamp,
|
||||
}
|
||||
|
||||
_, err = wakuNode.Relay().Publish(ctx, msg, nil)
|
||||
_, err = wakuNode.Relay().Publish(ctx, msg)
|
||||
if err != nil {
|
||||
log.Error("Error sending a message: ", err)
|
||||
}
|
||||
|
@ -117,7 +117,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
|
|||
}
|
||||
|
||||
func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
|
||||
sub, err := wakuNode.Relay().Subscribe(ctx, nil)
|
||||
sub, err := wakuNode.Relay().Subscribe(ctx)
|
||||
if err != nil {
|
||||
log.Error("Could not subscribe: ", err)
|
||||
return
|
||||
|
|
|
@ -51,7 +51,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic
|
|||
|
||||
if useLightPush {
|
||||
cf := filter.ContentFilter{
|
||||
Topic: string(relay.GetTopic(nil)),
|
||||
Topic: relay.DefaultWakuTopic,
|
||||
ContentTopics: []string{contentTopic},
|
||||
}
|
||||
var err error
|
||||
|
@ -61,7 +61,7 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic
|
|||
return nil, err
|
||||
}
|
||||
} else {
|
||||
sub, err := n.Relay().Subscribe(ctx, nil)
|
||||
sub, err := n.Relay().Subscribe(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -124,10 +124,10 @@ func (cr *Chat) Publish(ctx context.Context, message string) error {
|
|||
}
|
||||
|
||||
if cr.useLightPush {
|
||||
_, err = cr.node.Lightpush().Publish(ctx, wakuMsg, nil)
|
||||
_, err = cr.node.Lightpush().Publish(ctx, wakuMsg)
|
||||
|
||||
} else {
|
||||
_, err = cr.node.Relay().Publish(ctx, wakuMsg, nil)
|
||||
_, err = cr.node.Relay().Publish(ctx, wakuMsg)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -158,7 +158,7 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) {
|
|||
Timestamp: timestamp,
|
||||
}
|
||||
|
||||
_, err := wakuNode.Relay().Publish(ctx, msg, nil)
|
||||
_, err := wakuNode.Relay().Publish(ctx, msg)
|
||||
if err != nil {
|
||||
log.Error("Error sending a message: ", err)
|
||||
}
|
||||
|
@ -173,7 +173,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
|
|||
|
||||
func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
|
||||
pubsubTopic := pubSubTopic.String()
|
||||
sub, err := wakuNode.Relay().Subscribe(ctx, &pubsubTopic)
|
||||
sub, err := wakuNode.Relay().SubscribeWithTopic(ctx, pubsubTopic)
|
||||
if err != nil {
|
||||
log.Error("Could not subscribe: ", err)
|
||||
return
|
||||
|
|
|
@ -38,7 +38,7 @@ func TestBasicSendingReceiving(t *testing.T) {
|
|||
|
||||
require.NoError(t, write(ctx, wakuNode, "test"))
|
||||
|
||||
sub, err := wakuNode.Relay().Subscribe(ctx, nil)
|
||||
sub, err := wakuNode.Relay().Subscribe(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
value := <-sub.C
|
||||
|
@ -69,6 +69,6 @@ func write(ctx context.Context, wakuNode *node.WakuNode, msgContent string) erro
|
|||
Timestamp: timestamp,
|
||||
}
|
||||
|
||||
_, err = wakuNode.Relay().Publish(ctx, msg, nil)
|
||||
_, err = wakuNode.Relay().Publish(ctx, msg)
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -232,7 +232,7 @@ func Execute(options Options) {
|
|||
|
||||
if !options.Relay.Disable {
|
||||
for _, nodeTopic := range options.Relay.Topics {
|
||||
_, err := wakuNode.Relay().Subscribe(ctx, &nodeTopic)
|
||||
_, err := wakuNode.Relay().SubscribeWithTopic(ctx, nodeTopic)
|
||||
failOnErr(err, "Error subscring to topic")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -362,7 +362,7 @@ func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
|
|||
}
|
||||
|
||||
if w.opts.enableRelay {
|
||||
_, err = w.relay.Subscribe(w.ctx, nil)
|
||||
_, err = w.relay.Subscribe(w.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel
|
|||
relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster)
|
||||
require.NoError(t, err)
|
||||
|
||||
sub, err := relay.Subscribe(context.Background(), &topic)
|
||||
sub, err := relay.SubscribeWithTopic(context.Background(), topic)
|
||||
require.NoError(t, err)
|
||||
|
||||
return relay, sub, host
|
||||
|
@ -95,7 +95,7 @@ func TestWakuFilter(t *testing.T) {
|
|||
require.Equal(t, contentFilter.ContentTopics[0], env.Message().GetContentTopic())
|
||||
}()
|
||||
|
||||
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, 0), &testTopic)
|
||||
_, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage(testContentTopic, 0), testTopic)
|
||||
require.NoError(t, err)
|
||||
|
||||
wg.Wait()
|
||||
|
@ -112,7 +112,7 @@ func TestWakuFilter(t *testing.T) {
|
|||
}
|
||||
}()
|
||||
|
||||
_, err = node2.Publish(ctx, tests.CreateWakuMessage("TopicB", 1), &testTopic)
|
||||
_, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage("TopicB", 1), testTopic)
|
||||
require.NoError(t, err)
|
||||
|
||||
wg.Wait()
|
||||
|
@ -134,7 +134,7 @@ func TestWakuFilter(t *testing.T) {
|
|||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
_, err = node2.Publish(ctx, tests.CreateWakuMessage(testContentTopic, 2), &testTopic)
|
||||
_, err = node2.PublishWithTopic(ctx, tests.CreateWakuMessage(testContentTopic, 2), testTopic)
|
||||
require.NoError(t, err)
|
||||
wg.Wait()
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
|||
// TODO: Assumes success, should probably be extended to check for network, peers, etc
|
||||
// It might make sense to use WithReadiness option here?
|
||||
|
||||
_, err := wakuLP.relay.Publish(wakuLP.ctx, message, &pubSubTopic)
|
||||
_, err := wakuLP.relay.PublishWithTopic(wakuLP.ctx, message, pubSubTopic)
|
||||
|
||||
if err != nil {
|
||||
response.IsSuccess = false
|
||||
|
@ -181,14 +181,14 @@ func (wakuLP *WakuLightPush) Stop() {
|
|||
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
|
||||
}
|
||||
|
||||
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *string, opts ...LightPushOption) ([]byte, error) {
|
||||
func (wakuLP *WakuLightPush) PublishWithTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) {
|
||||
if message == nil {
|
||||
return nil, errors.New("message can't be null")
|
||||
}
|
||||
|
||||
req := new(pb.PushRequest)
|
||||
req.Message = message
|
||||
req.PubsubTopic = string(relay.GetTopic(topic))
|
||||
req.PubsubTopic = topic
|
||||
|
||||
response, err := wakuLP.request(ctx, req, opts...)
|
||||
if err != nil {
|
||||
|
@ -202,3 +202,7 @@ func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessag
|
|||
return nil, errors.New(response.Info)
|
||||
}
|
||||
}
|
||||
|
||||
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, opts ...LightPushOption) ([]byte, error) {
|
||||
return wakuLP.PublishWithTopic(ctx, message, relay.DefaultWakuTopic, opts...)
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
|
|||
relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10))
|
||||
require.NoError(t, err)
|
||||
|
||||
sub, err := relay.Subscribe(context.Background(), &topic)
|
||||
sub, err := relay.SubscribeWithTopic(context.Background(), topic)
|
||||
require.NoError(t, err)
|
||||
|
||||
return relay, sub, host
|
||||
|
@ -109,7 +109,7 @@ func TestWakuLightPush(t *testing.T) {
|
|||
require.True(t, resp.IsSuccess)
|
||||
|
||||
// Checking that msg hash is correct
|
||||
hash, err := client.Publish(ctx, msg2, &testTopic)
|
||||
hash, err := client.PublishWithTopic(ctx, msg2, testTopic)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, protocol.NewEnvelope(msg2, string(testTopic)).Hash(), hash)
|
||||
wg.Wait()
|
||||
|
@ -137,6 +137,6 @@ func TestWakuLightPushNoPeers(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
client := NewWakuLightPush(ctx, clientHost, nil)
|
||||
|
||||
_, err = client.Publish(ctx, tests.CreateWakuMessage("test", float64(0)), &testTopic)
|
||||
_, err = client.PublishWithTopic(ctx, tests.CreateWakuMessage("test", float64(0)), testTopic)
|
||||
require.Errorf(t, err, "no suitable remote peers")
|
||||
}
|
||||
|
|
|
@ -143,7 +143,7 @@ func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err erro
|
|||
return sub, nil
|
||||
}
|
||||
|
||||
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *string) ([]byte, error) {
|
||||
func (w *WakuRelay) PublishWithTopic(ctx context.Context, message *pb.WakuMessage, topic string) ([]byte, error) {
|
||||
// Publish a `WakuMessage` to a PubSub topic.
|
||||
if w.pubsub == nil {
|
||||
return nil, errors.New("PubSub hasn't been set")
|
||||
|
@ -153,7 +153,7 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic
|
|||
return nil, errors.New("message can't be null")
|
||||
}
|
||||
|
||||
pubSubTopic, err := w.upsertTopic(GetTopic(topic))
|
||||
pubSubTopic, err := w.upsertTopic(topic)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -174,12 +174,8 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic
|
|||
return hash, nil
|
||||
}
|
||||
|
||||
func GetTopic(topic *string) string {
|
||||
t := DefaultWakuTopic
|
||||
if topic != nil {
|
||||
t = *topic
|
||||
}
|
||||
return t
|
||||
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage) ([]byte, error) {
|
||||
return w.PublishWithTopic(ctx, message, DefaultWakuTopic)
|
||||
}
|
||||
|
||||
func (w *WakuRelay) Stop() {
|
||||
|
@ -195,11 +191,10 @@ func (w *WakuRelay) Stop() {
|
|||
w.subscriptions = nil
|
||||
}
|
||||
|
||||
func (w *WakuRelay) Subscribe(ctx context.Context, topic *string) (*Subscription, error) {
|
||||
func (w *WakuRelay) SubscribeWithTopic(ctx context.Context, topic string) (*Subscription, error) {
|
||||
// Subscribes to a PubSub topic.
|
||||
// NOTE The data field SHOULD be decoded as a WakuMessage.
|
||||
t := GetTopic(topic)
|
||||
sub, err := w.subscribe(t)
|
||||
sub, err := w.subscribe(topic)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -214,17 +209,21 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *string) (*Subscription
|
|||
w.subscriptionsMutex.Lock()
|
||||
defer w.subscriptionsMutex.Unlock()
|
||||
|
||||
w.subscriptions[t] = append(w.subscriptions[t], subscription)
|
||||
w.subscriptions[topic] = append(w.subscriptions[topic], subscription)
|
||||
|
||||
if w.bcaster != nil {
|
||||
w.bcaster.Register(subscription.C)
|
||||
}
|
||||
|
||||
go w.subscribeToTopic(t, subscription, sub)
|
||||
go w.subscribeToTopic(topic, subscription, sub)
|
||||
|
||||
return subscription, nil
|
||||
}
|
||||
|
||||
func (w *WakuRelay) Subscribe(ctx context.Context) (*Subscription, error) {
|
||||
return w.SubscribeWithTopic(ctx, DefaultWakuTopic)
|
||||
}
|
||||
|
||||
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
|
||||
if _, ok := w.relaySubs[topic]; !ok {
|
||||
return fmt.Errorf("topics %s is not subscribed", (string)(topic))
|
||||
|
|
|
@ -45,7 +45,7 @@ func TestWakuRelay(t *testing.T) {
|
|||
ContentTopic: "test",
|
||||
Timestamp: 0,
|
||||
}
|
||||
_, err = relay.Publish(context.Background(), msg, &testTopic)
|
||||
_, err = relay.PublishWithTopic(context.Background(), msg, testTopic)
|
||||
require.NoError(t, err)
|
||||
|
||||
<-ctx.Done()
|
||||
|
|
|
@ -24,7 +24,7 @@ func makeFilterService(t *testing.T) *FilterService {
|
|||
err = n.Start()
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = n.Relay().Subscribe(context.Background(), &testTopic)
|
||||
_, err = n.Relay().SubscribeWithTopic(context.Background(), testTopic)
|
||||
require.NoError(t, err)
|
||||
|
||||
return &FilterService{n}
|
||||
|
@ -40,7 +40,7 @@ func TestFilterSubscription(t *testing.T) {
|
|||
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10))
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = node.Subscribe(context.Background(), &testTopic)
|
||||
_, err = node.SubscribeWithTopic(context.Background(), testTopic)
|
||||
require.NoError(t, err)
|
||||
|
||||
_ = filter.NewWakuFilter(context.Background(), host, false)
|
||||
|
|
|
@ -77,7 +77,12 @@ func (r *RelayService) Stop() {
|
|||
}
|
||||
|
||||
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
|
||||
_, err := r.node.Relay().Publish(req.Context(), &args.Message, &args.Topic)
|
||||
var err error
|
||||
if args.Topic == "" {
|
||||
_, err = r.node.Relay().Publish(req.Context(), &args.Message)
|
||||
} else {
|
||||
_, err = r.node.Relay().PublishWithTopic(req.Context(), &args.Message, args.Topic)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Error publishing message:", err)
|
||||
reply.Success = false
|
||||
|
@ -91,7 +96,13 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
|
|||
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||
ctx := req.Context()
|
||||
for _, topic := range args.Topics {
|
||||
_, err := r.node.Relay().Subscribe(ctx, &topic)
|
||||
var err error
|
||||
if topic == "" {
|
||||
_, err = r.node.Relay().Subscribe(ctx)
|
||||
|
||||
} else {
|
||||
_, err = r.node.Relay().SubscribeWithTopic(ctx, topic)
|
||||
}
|
||||
if err != nil {
|
||||
log.Error("Error subscribing to topic:", topic, "err:", err)
|
||||
reply.Success = false
|
||||
|
|
Loading…
Reference in New Issue