diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index e83e44dc..dcbca197 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -264,6 +264,15 @@ var ( Destination: &options.Relay.ContentTopics, EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"}, }) + BridgeTopics = altsrc.NewGenericFlag(&cli.GenericFlag{ + Name: "bridge-topics", + Usage: "Bridge two pubsub topics, from_topic:to_topic. Argument may be repeated.", + EnvVars: []string{"WAKUNODE2_BRIDGE_TOPIC"}, + Value: &cliutils.BridgeTopicSlice{ + Values: &options.Relay.BridgeTopics, + }, + Hidden: true, + }) ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "protected-topic", Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index 93ffd946..c590f38d 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -56,6 +56,7 @@ func main() { Topics, ContentTopics, PubSubTopics, + BridgeTopics, ProtectedTopics, RelayPeerExchange, MinRelayPeersToPublish, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index b705d155..3d5113ef 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -383,69 +383,8 @@ func Execute(options NodeOptions) error { var wg sync.WaitGroup if options.Relay.Enable { - for nodeTopic, cTopics := range pubSubTopicMap { - nodeTopic := nodeTopic - _, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer()) - if err != nil { - return err - } - - if len(options.Rendezvous.Nodes) != 0 { - // Register the node in rendezvous point - iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes) - - wg.Add(1) - go func(nodeTopic string) { - t := time.NewTicker(rendezvous.RegisterDefaultTTL) - defer t.Stop() - defer wg.Done() - - for { - select { - case <-ctx.Done(): - return - case <-t.C: - // Register in rendezvous points periodically - wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints()) - } - } - }(nodeTopic) - - wg.Add(1) - go func(nodeTopic string) { - defer wg.Done() - desiredOutDegree := wakuNode.Relay().Params().D - t := time.NewTicker(7 * time.Second) - defer t.Stop() - for { - select { - case <-ctx.Done(): - return - case <-t.C: - peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic)) - peersToFind := desiredOutDegree - peerCnt - if peersToFind <= 0 { - continue - } - - rp := <-iter.Next(ctx) - if rp == nil { - continue - } - ctx, cancel := context.WithTimeout(ctx, 7*time.Second) - wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind) - cancel() - } - } - }(nodeTopic) - - } - } - - for _, protectedTopic := range options.Relay.ProtectedTopics { - if err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey); err != nil { - return nonRecoverErrorMsg("could not add signed topic validator: %w", err) - } + if err = handleRelayTopics(ctx, &wg, wakuNode, pubSubTopicMap); err != nil { + return err } } diff --git a/cmd/waku/options.go b/cmd/waku/options.go index 35b79249..611f64e4 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -25,6 +25,7 @@ type DiscV5Options struct { type RelayOptions struct { Enable bool Topics cli.StringSlice + BridgeTopics []cliutils.BridgeTopic ProtectedTopics []cliutils.ProtectedTopic PubSubTopics cli.StringSlice ContentTopics cli.StringSlice diff --git a/cmd/waku/relay.go b/cmd/waku/relay.go new file mode 100644 index 00000000..513582e3 --- /dev/null +++ b/cmd/waku/relay.go @@ -0,0 +1,156 @@ +package main + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/waku-org/go-waku/logging" + "github.com/waku-org/go-waku/waku/v2/node" + wprotocol "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/rendezvous" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" + "golang.org/x/exp/maps" +) + +var fwdMetaTag = []byte{102, 119, 100} //"fwd" + +func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode, pubSubTopicMap map[string][]string) error { + for nodeTopic, cTopics := range pubSubTopicMap { + nodeTopic := nodeTopic + _, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer()) + if err != nil { + return err + } + + if len(options.Rendezvous.Nodes) != 0 { + // Register the node in rendezvous point + iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes) + + wg.Add(1) + go func(nodeTopic string) { + t := time.NewTicker(rendezvous.RegisterDefaultTTL) + defer t.Stop() + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + case <-t.C: + // Register in rendezvous points periodically + wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints()) + } + } + }(nodeTopic) + + wg.Add(1) + go func(nodeTopic string) { + defer wg.Done() + desiredOutDegree := wakuNode.Relay().Params().D + t := time.NewTicker(7 * time.Second) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return + case <-t.C: + peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic)) + peersToFind := desiredOutDegree - peerCnt + if peersToFind <= 0 { + continue + } + + rp := <-iter.Next(ctx) + if rp == nil { + continue + } + ctx, cancel := context.WithTimeout(ctx, 7*time.Second) + wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind) + cancel() + } + } + }(nodeTopic) + + } + } + + // Protected topics + for _, protectedTopic := range options.Relay.ProtectedTopics { + if err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey); err != nil { + return nonRecoverErrorMsg("could not add signed topic validator: %w", err) + } + } + + err := bridgeTopics(ctx, wg, wakuNode) + if err != nil { + return err + } + + return nil +} + +func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode) error { + // Bridge topics + bridgedTopics := make(map[string]map[string]struct{}) + bridgedTopicsSet := make(map[string]struct{}) + for _, topics := range options.Relay.BridgeTopics { + _, ok := bridgedTopics[topics.FromTopic] + if !ok { + bridgedTopics[topics.FromTopic] = make(map[string]struct{}) + } + + bridgedTopics[topics.FromTopic][topics.ToTopic] = struct{}{} + bridgedTopicsSet[topics.FromTopic] = struct{}{} + bridgedTopicsSet[topics.ToTopic] = struct{}{} + } + + // Make sure all topics are subscribed + for _, topic := range maps.Keys(bridgedTopicsSet) { + if !wakuNode.Relay().IsSubscribed(topic) { + _, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(topic), relay.WithoutConsumer()) + if err != nil { + return err + } + } + } + + for fromTopic, toTopics := range bridgedTopics { + subscriptions, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(fromTopic)) + if err != nil { + return err + } + + topics := maps.Keys(toTopics) + for _, subscription := range subscriptions { + wg.Add(1) + go func(subscription *relay.Subscription, topics []string) { + defer wg.Done() + for env := range subscription.Ch { + for _, topic := range topics { + // HACK: message has been already fwded + metaLen := len(env.Message().Meta) + fwdTagLen := len(fwdMetaTag) + if metaLen >= fwdTagLen && bytes.Equal(env.Message().Meta[metaLen-fwdTagLen:], fwdMetaTag) { + continue + } + + // HACK: We append magic numbers here, just so the pubsub message ID will change + env.Message().Meta = append(env.Message().Meta, fwdMetaTag...) + _, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic)) + if err != nil { + utils.Logger().Warn("could not bridge message", logging.HexString("hash", env.Hash()), + zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic), + zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err)) + } + } + } + }(subscription, topics) + } + } + + return nil +} diff --git a/waku/cliutils/bridge.go b/waku/cliutils/bridge.go new file mode 100644 index 00000000..2ba1c59a --- /dev/null +++ b/waku/cliutils/bridge.go @@ -0,0 +1,56 @@ +package cliutils + +import ( + "errors" + "fmt" + "strings" + + "golang.org/x/exp/slices" +) + +type BridgeTopic struct { + FromTopic string + ToTopic string +} + +func (p BridgeTopic) String() string { + return fmt.Sprintf("%s:%s", p.FromTopic, p.ToTopic) +} + +type BridgeTopicSlice struct { + Values *[]BridgeTopic +} + +func (k *BridgeTopicSlice) Set(value string) error { + topicParts := strings.Split(value, ":") + if len(topicParts) != 2 { + return errors.New("expected from_topic:to_topic") + } + + for i := range topicParts { + topicParts[i] = strings.TrimSpace(topicParts[i]) + } + + if slices.Contains(topicParts, "") { + return errors.New("topic can't be empty") + } + + *k.Values = append(*k.Values, BridgeTopic{ + FromTopic: topicParts[0], + ToTopic: topicParts[1], + }) + + return nil +} + +func (k *BridgeTopicSlice) String() string { + if k.Values == nil { + return "" + } + var output []string + for _, v := range *k.Values { + output = append(output, v.String()) + } + + return strings.Join(output, ", ") +}