From 56ef99e11f06fa27f25b94d16dcab7095ae52c5c Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 19 Nov 2021 12:19:48 -0400 Subject: [PATCH] refactor: remove topic type --- examples/filter2/main.go | 3 +- waku/node.go | 3 +- waku/v2/protocol/filter/waku_filter_test.go | 4 +- waku/v2/protocol/lightpush/waku_lightpush.go | 4 +- .../protocol/lightpush/waku_lightpush_test.go | 6 +-- waku/v2/protocol/relay/waku_relay.go | 46 ++++++++----------- waku/v2/protocol/relay/waku_relay_test.go | 2 +- waku/v2/rpc/filter_test.go | 4 +- waku/v2/rpc/relay.go | 7 ++- 9 files changed, 35 insertions(+), 44 deletions(-) diff --git a/examples/filter2/main.go b/examples/filter2/main.go index da4c717f..dde6c501 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -17,7 +17,6 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/utils" ) @@ -173,7 +172,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) { } func readLoop(ctx context.Context, wakuNode *node.WakuNode) { - pubsubTopic := relay.Topic(pubSubTopic.String()) + pubsubTopic := pubSubTopic.String() sub, err := wakuNode.Relay().Subscribe(ctx, &pubsubTopic) if err != nil { log.Error("Could not subscribe: ", err) diff --git a/waku/node.go b/waku/node.go index a392b155..5d2ed56e 100644 --- a/waku/node.go +++ b/waku/node.go @@ -231,8 +231,7 @@ func Execute(options Options) { } if !options.Relay.Disable { - for _, t := range options.Relay.Topics { - nodeTopic := relay.Topic(t) + for _, nodeTopic := range options.Relay.Topics { _, err := wakuNode.Relay().Subscribe(ctx, &nodeTopic) failOnErr(err, "Error subscring to topic") } diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 6e28b012..25df13ed 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -15,7 +15,7 @@ import ( "github.com/stretchr/testify/require" ) -func makeWakuRelay(t *testing.T, topic relay.Topic, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { +func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -57,7 +57,7 @@ func TestWakuFilter(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds defer cancel() - var testTopic relay.Topic = "/waku/2/go/filter/test" + testTopic := "/waku/2/go/filter/test" testContentTopic := "TopicA" node1, host1 := makeWakuFilter(t) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 053364b4..6715801d 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -77,7 +77,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) { log.Info("lightpush push request") response := new(pb.PushResponse) if !wakuLP.IsClientOnly() { - pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic) + pubSubTopic := requestPushRPC.Query.PubsubTopic message := requestPushRPC.Query.Message // TODO: Assumes success, should probably be extended to check for network, peers, etc @@ -181,7 +181,7 @@ func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) } -func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) { +func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *string, opts ...LightPushOption) ([]byte, error) { if message == nil { return nil, errors.New("message can't be null") } diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index a30e3415..b0b86b73 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -17,7 +17,7 @@ import ( "github.com/stretchr/testify/require" ) -func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *relay.Subscription, host.Host) { +func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscription, host.Host) { port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) @@ -45,7 +45,7 @@ func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *relay.Su // Node2 receive the message and broadcast it // Node1 receive the message func TestWakuLightPush(t *testing.T) { - var testTopic relay.Topic = "/waku/2/go/lightpush/test" + testTopic := "/waku/2/go/lightpush/test" node1, sub1, host1 := makeWakuRelay(t, testTopic) defer node1.Stop() defer sub1.Unsubscribe() @@ -131,7 +131,7 @@ func TestWakuLightPushNoPeers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - testTopic := relay.Topic("abc") + testTopic := "abc" clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader) require.NoError(t, err) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index be1e2cf4..cff9c519 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -24,11 +24,9 @@ import ( var log = logging.Logger("wakurelay") -type Topic string - const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0") -var DefaultWakuTopic Topic = Topic(waku_proto.DefaultPubsubTopic().String()) +var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String() type WakuRelay struct { host host.Host @@ -37,13 +35,12 @@ type WakuRelay struct { bcaster v2.Broadcaster // TODO: convert to concurrent maps - topics map[Topic]struct{} topicsMutex sync.Mutex - wakuRelayTopics map[Topic]*pubsub.Topic - relaySubs map[Topic]*pubsub.Subscription + wakuRelayTopics map[string]*pubsub.Topic + relaySubs map[string]*pubsub.Subscription // TODO: convert to concurrent maps - subscriptions map[Topic][]*Subscription + subscriptions map[string][]*Subscription subscriptionsMutex sync.Mutex } @@ -56,10 +53,9 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h - w.topics = make(map[Topic]struct{}) - w.wakuRelayTopics = make(map[Topic]*pubsub.Topic) - w.relaySubs = make(map[Topic]*pubsub.Subscription) - w.subscriptions = make(map[Topic][]*Subscription) + w.wakuRelayTopics = make(map[string]*pubsub.Topic) + w.relaySubs = make(map[string]*pubsub.Subscription) + w.subscriptions = make(map[string][]*Subscription) w.bcaster = bcaster // default options required by WakuRelay @@ -96,12 +92,12 @@ func (w *WakuRelay) PubSub() *pubsub.PubSub { return w.pubsub } -func (w *WakuRelay) Topics() []Topic { +func (w *WakuRelay) Topics() []string { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() - var result []Topic - for topic := range w.topics { + var result []string + for topic := range w.relaySubs { result = append(result, topic) } return result @@ -111,11 +107,10 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) { w.pubsub = pubSub } -func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { +func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) { defer w.topicsMutex.Unlock() w.topicsMutex.Lock() - w.topics[topic] = struct{}{} pubSubTopic, ok := w.wakuRelayTopics[topic] if !ok { // Joins topic if node hasn't joined yet newTopic, err := w.pubsub.Join(string(topic)) @@ -128,7 +123,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) { return pubSubTopic, nil } -func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error) { +func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) { sub, ok := w.relaySubs[topic] if !ok { pubSubTopic, err := w.upsertTopic(topic) @@ -148,7 +143,7 @@ func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error return sub, nil } -func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) { +func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *string) ([]byte, error) { // Publish a `WakuMessage` to a PubSub topic. if w.pubsub == nil { return nil, errors.New("PubSub hasn't been set") @@ -179,8 +174,8 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic return hash, nil } -func GetTopic(topic *Topic) Topic { - var t Topic = DefaultWakuTopic +func GetTopic(topic *string) string { + t := DefaultWakuTopic if topic != nil { t = *topic } @@ -200,7 +195,7 @@ func (w *WakuRelay) Stop() { w.subscriptions = nil } -func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, error) { +func (w *WakuRelay) Subscribe(ctx context.Context, topic *string) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. t := GetTopic(topic) @@ -230,12 +225,11 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, return subscription, nil } -func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error { - if _, ok := w.topics[topic]; !ok { +func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error { + if _, ok := w.relaySubs[topic]; !ok { return fmt.Errorf("topics %s is not subscribed", (string)(topic)) } log.Info("Unsubscribing from topic ", topic) - delete(w.topics, topic) for _, sub := range w.subscriptions[topic] { sub.Unsubscribe() @@ -268,7 +262,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < log.Error(fmt.Errorf("subscription failed: %w", err)) sub.Cancel() close(msgChannel) - for _, subscription := range w.subscriptions[Topic(sub.Topic())] { + for _, subscription := range w.subscriptions[sub.Topic()] { subscription.Unsubscribe() } } @@ -279,7 +273,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) < return msgChannel } -func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *pubsub.Subscription) { +func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) { ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay")) if err != nil { log.Error(err) diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index d242ef5a..4cd39b02 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -11,7 +11,7 @@ import ( ) func TestWakuRelay(t *testing.T) { - var testTopic Topic = "/waku/2/go/relay/test" + testTopic := "/waku/2/go/relay/test" port, err := tests.FindFreePort(t, "", 5) require.NoError(t, err) diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 9812484f..d2c80eb1 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -24,7 +24,7 @@ func makeFilterService(t *testing.T) *FilterService { err = n.Start() require.NoError(t, err) - _, err = n.Relay().Subscribe(context.Background(), (*relay.Topic)(&testTopic)) + _, err = n.Relay().Subscribe(context.Background(), &testTopic) require.NoError(t, err) return &FilterService{n} @@ -40,7 +40,7 @@ func TestFilterSubscription(t *testing.T) { node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) require.NoError(t, err) - _, err = node.Subscribe(context.Background(), (*relay.Topic)(&testTopic)) + _, err = node.Subscribe(context.Background(), &testTopic) require.NoError(t, err) _ = filter.NewWakuFilter(context.Background(), host, false) diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index fd569c73..ec54956e 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -8,7 +8,6 @@ import ( "github.com/status-im/go-waku/waku/v2/node" "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" - "github.com/status-im/go-waku/waku/v2/protocol/relay" ) type RelayService struct { @@ -78,7 +77,7 @@ func (r *RelayService) Stop() { } func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { - _, err := r.node.Relay().Publish(req.Context(), &args.Message, (*relay.Topic)(&args.Topic)) + _, err := r.node.Relay().Publish(req.Context(), &args.Message, &args.Topic) if err != nil { log.Error("Error publishing message:", err) reply.Success = false @@ -92,7 +91,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { ctx := req.Context() for _, topic := range args.Topics { - _, err := r.node.Relay().Subscribe(ctx, (*relay.Topic)(&topic)) + _, err := r.node.Relay().Subscribe(ctx, &topic) if err != nil { log.Error("Error subscribing to topic:", topic, "err:", err) reply.Success = false @@ -108,7 +107,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error { ctx := req.Context() for _, topic := range args.Topics { - err := r.node.Relay().Unsubscribe(ctx, (relay.Topic)(topic)) + err := r.node.Relay().Unsubscribe(ctx, topic) if err != nil { log.Error("Error unsubscribing from topic:", topic, "err:", err) reply.Success = false