diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 56a8def3..91a82d5c 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -264,15 +264,6 @@ var ( Destination: &options.Relay.ContentTopics, EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"}, }) - BridgeTopics = altsrc.NewGenericFlag(&cli.GenericFlag{ - Name: "bridge-topics", - Usage: "Bridge two pubsub topics, from_topic:to_topic. Argument may be repeated.", - EnvVars: []string{"WAKUNODE2_BRIDGE_TOPIC"}, - Value: &cliutils.BridgeTopicSlice{ - Values: &options.Relay.BridgeTopics, - }, - Hidden: true, - }) ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "protected-topic", Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index ce6c3a17..f868505f 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -56,7 +56,6 @@ func main() { Topics, ContentTopics, PubSubTopics, - BridgeTopics, ProtectedTopics, RelayPeerExchange, MinRelayPeersToPublish, diff --git a/cmd/waku/options.go b/cmd/waku/options.go index d85f094a..ee32bb2d 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -25,7 +25,6 @@ type DiscV5Options struct { type RelayOptions struct { Enable bool Topics cli.StringSlice - BridgeTopics []cliutils.BridgeTopic ProtectedTopics []cliutils.ProtectedTopic PubSubTopics cli.StringSlice ContentTopics cli.StringSlice diff --git a/cmd/waku/relay.go b/cmd/waku/relay.go index 6b59f941..26ae1de3 100644 --- a/cmd/waku/relay.go +++ b/cmd/waku/relay.go @@ -1,23 +1,16 @@ package main import ( - "bytes" "context" "sync" "time" - "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/node" wprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/rendezvous" - "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" - "golang.org/x/exp/maps" ) -var fwdMetaTag = []byte{102, 119, 100} //"fwd" - func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode, pubSubTopicMap map[string][]string) error { for nodeTopic, cTopics := range pubSubTopicMap { nodeTopic := nodeTopic @@ -85,72 +78,5 @@ func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.W } } - err := bridgeTopics(ctx, wg, wakuNode) - if err != nil { - return err - } - - return nil -} - -func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode) error { - // Bridge topics - bridgedTopics := make(map[string]map[string]struct{}) - bridgedTopicsSet := make(map[string]struct{}) - for _, topics := range options.Relay.BridgeTopics { - _, ok := bridgedTopics[topics.FromTopic] - if !ok { - bridgedTopics[topics.FromTopic] = make(map[string]struct{}) - } - - bridgedTopics[topics.FromTopic][topics.ToTopic] = struct{}{} - bridgedTopicsSet[topics.FromTopic] = struct{}{} - bridgedTopicsSet[topics.ToTopic] = struct{}{} - } - - // Make sure all topics are subscribed - for _, topic := range maps.Keys(bridgedTopicsSet) { - if !wakuNode.Relay().IsSubscribed(topic) { - _, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(topic), relay.WithoutConsumer()) - if err != nil { - return err - } - } - } - - for fromTopic, toTopics := range bridgedTopics { - subscriptions, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(fromTopic)) - if err != nil { - return err - } - - topics := maps.Keys(toTopics) - for _, subscription := range subscriptions { - wg.Add(1) - go func(subscription *relay.Subscription, topics []string) { - defer wg.Done() - for env := range subscription.Ch { - for _, topic := range topics { - // HACK: message has been already fwded - metaLen := len(env.Message().Meta) - fwdTagLen := len(fwdMetaTag) - if metaLen >= fwdTagLen && bytes.Equal(env.Message().Meta[metaLen-fwdTagLen:], fwdMetaTag) { - continue - } - - // HACK: We append magic numbers here, just so the pubsub message ID will change - env.Message().Meta = append(env.Message().Meta, fwdMetaTag...) - _, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic)) - if err != nil { - utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()), - zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic), - zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err)) - } - } - } - }(subscription, topics) - } - } - return nil } diff --git a/waku/cliutils/bridge.go b/waku/cliutils/bridge.go deleted file mode 100644 index 2ba1c59a..00000000 --- a/waku/cliutils/bridge.go +++ /dev/null @@ -1,56 +0,0 @@ -package cliutils - -import ( - "errors" - "fmt" - "strings" - - "golang.org/x/exp/slices" -) - -type BridgeTopic struct { - FromTopic string - ToTopic string -} - -func (p BridgeTopic) String() string { - return fmt.Sprintf("%s:%s", p.FromTopic, p.ToTopic) -} - -type BridgeTopicSlice struct { - Values *[]BridgeTopic -} - -func (k *BridgeTopicSlice) Set(value string) error { - topicParts := strings.Split(value, ":") - if len(topicParts) != 2 { - return errors.New("expected from_topic:to_topic") - } - - for i := range topicParts { - topicParts[i] = strings.TrimSpace(topicParts[i]) - } - - if slices.Contains(topicParts, "") { - return errors.New("topic can't be empty") - } - - *k.Values = append(*k.Values, BridgeTopic{ - FromTopic: topicParts[0], - ToTopic: topicParts[1], - }) - - return nil -} - -func (k *BridgeTopicSlice) String() string { - if k.Values == nil { - return "" - } - var output []string - for _, v := range *k.Values { - output = append(output, v.String()) - } - - return strings.Join(output, ", ") -}