mirror of https://github.com/status-im/go-waku.git
refactor: remove topic type
This commit is contained in:
parent
75516a8f96
commit
56ef99e11f
|
@ -17,7 +17,6 @@ import (
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/filter"
|
"github.com/status-im/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
|
||||||
"github.com/status-im/go-waku/waku/v2/utils"
|
"github.com/status-im/go-waku/waku/v2/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -173,7 +172,7 @@ func writeLoop(ctx context.Context, wakuNode *node.WakuNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
|
func readLoop(ctx context.Context, wakuNode *node.WakuNode) {
|
||||||
pubsubTopic := relay.Topic(pubSubTopic.String())
|
pubsubTopic := pubSubTopic.String()
|
||||||
sub, err := wakuNode.Relay().Subscribe(ctx, &pubsubTopic)
|
sub, err := wakuNode.Relay().Subscribe(ctx, &pubsubTopic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Could not subscribe: ", err)
|
log.Error("Could not subscribe: ", err)
|
||||||
|
|
|
@ -231,8 +231,7 @@ func Execute(options Options) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !options.Relay.Disable {
|
if !options.Relay.Disable {
|
||||||
for _, t := range options.Relay.Topics {
|
for _, nodeTopic := range options.Relay.Topics {
|
||||||
nodeTopic := relay.Topic(t)
|
|
||||||
_, err := wakuNode.Relay().Subscribe(ctx, &nodeTopic)
|
_, err := wakuNode.Relay().Subscribe(ctx, &nodeTopic)
|
||||||
failOnErr(err, "Error subscring to topic")
|
failOnErr(err, "Error subscring to topic")
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,7 +15,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeWakuRelay(t *testing.T, topic relay.Topic, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
|
func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*relay.WakuRelay, *relay.Subscription, host.Host) {
|
||||||
port, err := tests.FindFreePort(t, "", 5)
|
port, err := tests.FindFreePort(t, "", 5)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -57,7 +57,7 @@ func TestWakuFilter(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) // Test can't exceed 10 seconds
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
var testTopic relay.Topic = "/waku/2/go/filter/test"
|
testTopic := "/waku/2/go/filter/test"
|
||||||
testContentTopic := "TopicA"
|
testContentTopic := "TopicA"
|
||||||
|
|
||||||
node1, host1 := makeWakuFilter(t)
|
node1, host1 := makeWakuFilter(t)
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (wakuLP *WakuLightPush) onRequest(s network.Stream) {
|
||||||
log.Info("lightpush push request")
|
log.Info("lightpush push request")
|
||||||
response := new(pb.PushResponse)
|
response := new(pb.PushResponse)
|
||||||
if !wakuLP.IsClientOnly() {
|
if !wakuLP.IsClientOnly() {
|
||||||
pubSubTopic := relay.Topic(requestPushRPC.Query.PubsubTopic)
|
pubSubTopic := requestPushRPC.Query.PubsubTopic
|
||||||
message := requestPushRPC.Query.Message
|
message := requestPushRPC.Query.Message
|
||||||
|
|
||||||
// TODO: Assumes success, should probably be extended to check for network, peers, etc
|
// TODO: Assumes success, should probably be extended to check for network, peers, etc
|
||||||
|
@ -181,7 +181,7 @@ func (wakuLP *WakuLightPush) Stop() {
|
||||||
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
|
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic, opts ...LightPushOption) ([]byte, error) {
|
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *pb.WakuMessage, topic *string, opts ...LightPushOption) ([]byte, error) {
|
||||||
if message == nil {
|
if message == nil {
|
||||||
return nil, errors.New("message can't be null")
|
return nil, errors.New("message can't be null")
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *relay.Subscription, host.Host) {
|
func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscription, host.Host) {
|
||||||
port, err := tests.FindFreePort(t, "", 5)
|
port, err := tests.FindFreePort(t, "", 5)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
@ -45,7 +45,7 @@ func makeWakuRelay(t *testing.T, topic relay.Topic) (*relay.WakuRelay, *relay.Su
|
||||||
// Node2 receive the message and broadcast it
|
// Node2 receive the message and broadcast it
|
||||||
// Node1 receive the message
|
// Node1 receive the message
|
||||||
func TestWakuLightPush(t *testing.T) {
|
func TestWakuLightPush(t *testing.T) {
|
||||||
var testTopic relay.Topic = "/waku/2/go/lightpush/test"
|
testTopic := "/waku/2/go/lightpush/test"
|
||||||
node1, sub1, host1 := makeWakuRelay(t, testTopic)
|
node1, sub1, host1 := makeWakuRelay(t, testTopic)
|
||||||
defer node1.Stop()
|
defer node1.Stop()
|
||||||
defer sub1.Unsubscribe()
|
defer sub1.Unsubscribe()
|
||||||
|
@ -131,7 +131,7 @@ func TestWakuLightPushNoPeers(t *testing.T) {
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
testTopic := relay.Topic("abc")
|
testTopic := "abc"
|
||||||
|
|
||||||
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
|
clientHost, err := tests.MakeHost(context.Background(), 0, rand.Reader)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -24,11 +24,9 @@ import (
|
||||||
|
|
||||||
var log = logging.Logger("wakurelay")
|
var log = logging.Logger("wakurelay")
|
||||||
|
|
||||||
type Topic string
|
|
||||||
|
|
||||||
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
|
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
|
||||||
|
|
||||||
var DefaultWakuTopic Topic = Topic(waku_proto.DefaultPubsubTopic().String())
|
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
|
||||||
|
|
||||||
type WakuRelay struct {
|
type WakuRelay struct {
|
||||||
host host.Host
|
host host.Host
|
||||||
|
@ -37,13 +35,12 @@ type WakuRelay struct {
|
||||||
bcaster v2.Broadcaster
|
bcaster v2.Broadcaster
|
||||||
|
|
||||||
// TODO: convert to concurrent maps
|
// TODO: convert to concurrent maps
|
||||||
topics map[Topic]struct{}
|
|
||||||
topicsMutex sync.Mutex
|
topicsMutex sync.Mutex
|
||||||
wakuRelayTopics map[Topic]*pubsub.Topic
|
wakuRelayTopics map[string]*pubsub.Topic
|
||||||
relaySubs map[Topic]*pubsub.Subscription
|
relaySubs map[string]*pubsub.Subscription
|
||||||
|
|
||||||
// TODO: convert to concurrent maps
|
// TODO: convert to concurrent maps
|
||||||
subscriptions map[Topic][]*Subscription
|
subscriptions map[string][]*Subscription
|
||||||
subscriptionsMutex sync.Mutex
|
subscriptionsMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,10 +53,9 @@ func msgIdFn(pmsg *pubsub_pb.Message) string {
|
||||||
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) {
|
func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) {
|
||||||
w := new(WakuRelay)
|
w := new(WakuRelay)
|
||||||
w.host = h
|
w.host = h
|
||||||
w.topics = make(map[Topic]struct{})
|
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
|
||||||
w.wakuRelayTopics = make(map[Topic]*pubsub.Topic)
|
w.relaySubs = make(map[string]*pubsub.Subscription)
|
||||||
w.relaySubs = make(map[Topic]*pubsub.Subscription)
|
w.subscriptions = make(map[string][]*Subscription)
|
||||||
w.subscriptions = make(map[Topic][]*Subscription)
|
|
||||||
w.bcaster = bcaster
|
w.bcaster = bcaster
|
||||||
|
|
||||||
// default options required by WakuRelay
|
// default options required by WakuRelay
|
||||||
|
@ -96,12 +92,12 @@ func (w *WakuRelay) PubSub() *pubsub.PubSub {
|
||||||
return w.pubsub
|
return w.pubsub
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) Topics() []Topic {
|
func (w *WakuRelay) Topics() []string {
|
||||||
defer w.topicsMutex.Unlock()
|
defer w.topicsMutex.Unlock()
|
||||||
w.topicsMutex.Lock()
|
w.topicsMutex.Lock()
|
||||||
|
|
||||||
var result []Topic
|
var result []string
|
||||||
for topic := range w.topics {
|
for topic := range w.relaySubs {
|
||||||
result = append(result, topic)
|
result = append(result, topic)
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
|
@ -111,11 +107,10 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
|
||||||
w.pubsub = pubSub
|
w.pubsub = pubSub
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
|
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
|
||||||
defer w.topicsMutex.Unlock()
|
defer w.topicsMutex.Unlock()
|
||||||
w.topicsMutex.Lock()
|
w.topicsMutex.Lock()
|
||||||
|
|
||||||
w.topics[topic] = struct{}{}
|
|
||||||
pubSubTopic, ok := w.wakuRelayTopics[topic]
|
pubSubTopic, ok := w.wakuRelayTopics[topic]
|
||||||
if !ok { // Joins topic if node hasn't joined yet
|
if !ok { // Joins topic if node hasn't joined yet
|
||||||
newTopic, err := w.pubsub.Join(string(topic))
|
newTopic, err := w.pubsub.Join(string(topic))
|
||||||
|
@ -128,7 +123,7 @@ func (w *WakuRelay) upsertTopic(topic Topic) (*pubsub.Topic, error) {
|
||||||
return pubSubTopic, nil
|
return pubSubTopic, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error) {
|
func (w *WakuRelay) subscribe(topic string) (subs *pubsub.Subscription, err error) {
|
||||||
sub, ok := w.relaySubs[topic]
|
sub, ok := w.relaySubs[topic]
|
||||||
if !ok {
|
if !ok {
|
||||||
pubSubTopic, err := w.upsertTopic(topic)
|
pubSubTopic, err := w.upsertTopic(topic)
|
||||||
|
@ -148,7 +143,7 @@ func (w *WakuRelay) subscribe(topic Topic) (subs *pubsub.Subscription, err error
|
||||||
return sub, nil
|
return sub, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *Topic) ([]byte, error) {
|
func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic *string) ([]byte, error) {
|
||||||
// Publish a `WakuMessage` to a PubSub topic.
|
// Publish a `WakuMessage` to a PubSub topic.
|
||||||
if w.pubsub == nil {
|
if w.pubsub == nil {
|
||||||
return nil, errors.New("PubSub hasn't been set")
|
return nil, errors.New("PubSub hasn't been set")
|
||||||
|
@ -179,8 +174,8 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, topic
|
||||||
return hash, nil
|
return hash, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTopic(topic *Topic) Topic {
|
func GetTopic(topic *string) string {
|
||||||
var t Topic = DefaultWakuTopic
|
t := DefaultWakuTopic
|
||||||
if topic != nil {
|
if topic != nil {
|
||||||
t = *topic
|
t = *topic
|
||||||
}
|
}
|
||||||
|
@ -200,7 +195,7 @@ func (w *WakuRelay) Stop() {
|
||||||
w.subscriptions = nil
|
w.subscriptions = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription, error) {
|
func (w *WakuRelay) Subscribe(ctx context.Context, topic *string) (*Subscription, error) {
|
||||||
// Subscribes to a PubSub topic.
|
// Subscribes to a PubSub topic.
|
||||||
// NOTE The data field SHOULD be decoded as a WakuMessage.
|
// NOTE The data field SHOULD be decoded as a WakuMessage.
|
||||||
t := GetTopic(topic)
|
t := GetTopic(topic)
|
||||||
|
@ -230,12 +225,11 @@ func (w *WakuRelay) Subscribe(ctx context.Context, topic *Topic) (*Subscription,
|
||||||
return subscription, nil
|
return subscription, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic Topic) error {
|
func (w *WakuRelay) Unsubscribe(ctx context.Context, topic string) error {
|
||||||
if _, ok := w.topics[topic]; !ok {
|
if _, ok := w.relaySubs[topic]; !ok {
|
||||||
return fmt.Errorf("topics %s is not subscribed", (string)(topic))
|
return fmt.Errorf("topics %s is not subscribed", (string)(topic))
|
||||||
}
|
}
|
||||||
log.Info("Unsubscribing from topic ", topic)
|
log.Info("Unsubscribing from topic ", topic)
|
||||||
delete(w.topics, topic)
|
|
||||||
|
|
||||||
for _, sub := range w.subscriptions[topic] {
|
for _, sub := range w.subscriptions[topic] {
|
||||||
sub.Unsubscribe()
|
sub.Unsubscribe()
|
||||||
|
@ -268,7 +262,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
|
||||||
log.Error(fmt.Errorf("subscription failed: %w", err))
|
log.Error(fmt.Errorf("subscription failed: %w", err))
|
||||||
sub.Cancel()
|
sub.Cancel()
|
||||||
close(msgChannel)
|
close(msgChannel)
|
||||||
for _, subscription := range w.subscriptions[Topic(sub.Topic())] {
|
for _, subscription := range w.subscriptions[sub.Topic()] {
|
||||||
subscription.Unsubscribe()
|
subscription.Unsubscribe()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -279,7 +273,7 @@ func (w *WakuRelay) nextMessage(ctx context.Context, sub *pubsub.Subscription) <
|
||||||
return msgChannel
|
return msgChannel
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WakuRelay) subscribeToTopic(t Topic, subscription *Subscription, sub *pubsub.Subscription) {
|
func (w *WakuRelay) subscribeToTopic(t string, subscription *Subscription, sub *pubsub.Subscription) {
|
||||||
ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay"))
|
ctx, err := tag.New(context.Background(), tag.Insert(metrics.KeyType, "relay"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestWakuRelay(t *testing.T) {
|
func TestWakuRelay(t *testing.T) {
|
||||||
var testTopic Topic = "/waku/2/go/relay/test"
|
testTopic := "/waku/2/go/relay/test"
|
||||||
|
|
||||||
port, err := tests.FindFreePort(t, "", 5)
|
port, err := tests.FindFreePort(t, "", 5)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
|
@ -24,7 +24,7 @@ func makeFilterService(t *testing.T) *FilterService {
|
||||||
err = n.Start()
|
err = n.Start()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = n.Relay().Subscribe(context.Background(), (*relay.Topic)(&testTopic))
|
_, err = n.Relay().Subscribe(context.Background(), &testTopic)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
return &FilterService{n}
|
return &FilterService{n}
|
||||||
|
@ -40,7 +40,7 @@ func TestFilterSubscription(t *testing.T) {
|
||||||
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10))
|
node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_, err = node.Subscribe(context.Background(), (*relay.Topic)(&testTopic))
|
_, err = node.Subscribe(context.Background(), &testTopic)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
_ = filter.NewWakuFilter(context.Background(), host, false)
|
_ = filter.NewWakuFilter(context.Background(), host, false)
|
||||||
|
|
|
@ -8,7 +8,6 @@ import (
|
||||||
"github.com/status-im/go-waku/waku/v2/node"
|
"github.com/status-im/go-waku/waku/v2/node"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol"
|
"github.com/status-im/go-waku/waku/v2/protocol"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
"github.com/status-im/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type RelayService struct {
|
type RelayService struct {
|
||||||
|
@ -78,7 +77,7 @@ func (r *RelayService) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
|
func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error {
|
||||||
_, err := r.node.Relay().Publish(req.Context(), &args.Message, (*relay.Topic)(&args.Topic))
|
_, err := r.node.Relay().Publish(req.Context(), &args.Message, &args.Topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error publishing message:", err)
|
log.Error("Error publishing message:", err)
|
||||||
reply.Success = false
|
reply.Success = false
|
||||||
|
@ -92,7 +91,7 @@ func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs,
|
||||||
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||||
ctx := req.Context()
|
ctx := req.Context()
|
||||||
for _, topic := range args.Topics {
|
for _, topic := range args.Topics {
|
||||||
_, err := r.node.Relay().Subscribe(ctx, (*relay.Topic)(&topic))
|
_, err := r.node.Relay().Subscribe(ctx, &topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error subscribing to topic:", topic, "err:", err)
|
log.Error("Error subscribing to topic:", topic, "err:", err)
|
||||||
reply.Success = false
|
reply.Success = false
|
||||||
|
@ -108,7 +107,7 @@ func (r *RelayService) PostV1Subscription(req *http.Request, args *TopicsArgs, r
|
||||||
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, reply *SuccessReply) error {
|
||||||
ctx := req.Context()
|
ctx := req.Context()
|
||||||
for _, topic := range args.Topics {
|
for _, topic := range args.Topics {
|
||||||
err := r.node.Relay().Unsubscribe(ctx, (relay.Topic)(topic))
|
err := r.node.Relay().Unsubscribe(ctx, topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Error unsubscribing from topic:", topic, "err:", err)
|
log.Error("Error unsubscribing from topic:", topic, "err:", err)
|
||||||
reply.Success = false
|
reply.Success = false
|
||||||
|
|
Loading…
Reference in New Issue