mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-01-09 09:23:07 +00:00
feat: bridge relay topics (#854)
This commit is contained in:
parent
e0c6ab8ee1
commit
d51c207a1f
@ -264,6 +264,15 @@ var (
|
||||
Destination: &options.Relay.ContentTopics,
|
||||
EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"},
|
||||
})
|
||||
BridgeTopics = altsrc.NewGenericFlag(&cli.GenericFlag{
|
||||
Name: "bridge-topics",
|
||||
Usage: "Bridge two pubsub topics, from_topic:to_topic. Argument may be repeated.",
|
||||
EnvVars: []string{"WAKUNODE2_BRIDGE_TOPIC"},
|
||||
Value: &cliutils.BridgeTopicSlice{
|
||||
Values: &options.Relay.BridgeTopics,
|
||||
},
|
||||
Hidden: true,
|
||||
})
|
||||
ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
|
||||
Name: "protected-topic",
|
||||
Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",
|
||||
|
||||
@ -56,6 +56,7 @@ func main() {
|
||||
Topics,
|
||||
ContentTopics,
|
||||
PubSubTopics,
|
||||
BridgeTopics,
|
||||
ProtectedTopics,
|
||||
RelayPeerExchange,
|
||||
MinRelayPeersToPublish,
|
||||
|
||||
@ -383,69 +383,8 @@ func Execute(options NodeOptions) error {
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if options.Relay.Enable {
|
||||
for nodeTopic, cTopics := range pubSubTopicMap {
|
||||
nodeTopic := nodeTopic
|
||||
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(options.Rendezvous.Nodes) != 0 {
|
||||
// Register the node in rendezvous point
|
||||
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)
|
||||
|
||||
wg.Add(1)
|
||||
go func(nodeTopic string) {
|
||||
t := time.NewTicker(rendezvous.RegisterDefaultTTL)
|
||||
defer t.Stop()
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
// Register in rendezvous points periodically
|
||||
wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints())
|
||||
}
|
||||
}
|
||||
}(nodeTopic)
|
||||
|
||||
wg.Add(1)
|
||||
go func(nodeTopic string) {
|
||||
defer wg.Done()
|
||||
desiredOutDegree := wakuNode.Relay().Params().D
|
||||
t := time.NewTicker(7 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic))
|
||||
peersToFind := desiredOutDegree - peerCnt
|
||||
if peersToFind <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
rp := <-iter.Next(ctx)
|
||||
if rp == nil {
|
||||
continue
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
|
||||
wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}(nodeTopic)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
for _, protectedTopic := range options.Relay.ProtectedTopics {
|
||||
if err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey); err != nil {
|
||||
return nonRecoverErrorMsg("could not add signed topic validator: %w", err)
|
||||
}
|
||||
if err = handleRelayTopics(ctx, &wg, wakuNode, pubSubTopicMap); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -25,6 +25,7 @@ type DiscV5Options struct {
|
||||
type RelayOptions struct {
|
||||
Enable bool
|
||||
Topics cli.StringSlice
|
||||
BridgeTopics []cliutils.BridgeTopic
|
||||
ProtectedTopics []cliutils.ProtectedTopic
|
||||
PubSubTopics cli.StringSlice
|
||||
ContentTopics cli.StringSlice
|
||||
|
||||
156
cmd/waku/relay.go
Normal file
156
cmd/waku/relay.go
Normal file
@ -0,0 +1,156 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/waku-org/go-waku/logging"
|
||||
"github.com/waku-org/go-waku/waku/v2/node"
|
||||
wprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
||||
"github.com/waku-org/go-waku/waku/v2/utils"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
var fwdMetaTag = []byte{102, 119, 100} //"fwd"
|
||||
|
||||
func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode, pubSubTopicMap map[string][]string) error {
|
||||
for nodeTopic, cTopics := range pubSubTopicMap {
|
||||
nodeTopic := nodeTopic
|
||||
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(options.Rendezvous.Nodes) != 0 {
|
||||
// Register the node in rendezvous point
|
||||
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)
|
||||
|
||||
wg.Add(1)
|
||||
go func(nodeTopic string) {
|
||||
t := time.NewTicker(rendezvous.RegisterDefaultTTL)
|
||||
defer t.Stop()
|
||||
defer wg.Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
// Register in rendezvous points periodically
|
||||
wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints())
|
||||
}
|
||||
}
|
||||
}(nodeTopic)
|
||||
|
||||
wg.Add(1)
|
||||
go func(nodeTopic string) {
|
||||
defer wg.Done()
|
||||
desiredOutDegree := wakuNode.Relay().Params().D
|
||||
t := time.NewTicker(7 * time.Second)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic))
|
||||
peersToFind := desiredOutDegree - peerCnt
|
||||
if peersToFind <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
rp := <-iter.Next(ctx)
|
||||
if rp == nil {
|
||||
continue
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
|
||||
wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
}(nodeTopic)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
// Protected topics
|
||||
for _, protectedTopic := range options.Relay.ProtectedTopics {
|
||||
if err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey); err != nil {
|
||||
return nonRecoverErrorMsg("could not add signed topic validator: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
err := bridgeTopics(ctx, wg, wakuNode)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode) error {
|
||||
// Bridge topics
|
||||
bridgedTopics := make(map[string]map[string]struct{})
|
||||
bridgedTopicsSet := make(map[string]struct{})
|
||||
for _, topics := range options.Relay.BridgeTopics {
|
||||
_, ok := bridgedTopics[topics.FromTopic]
|
||||
if !ok {
|
||||
bridgedTopics[topics.FromTopic] = make(map[string]struct{})
|
||||
}
|
||||
|
||||
bridgedTopics[topics.FromTopic][topics.ToTopic] = struct{}{}
|
||||
bridgedTopicsSet[topics.FromTopic] = struct{}{}
|
||||
bridgedTopicsSet[topics.ToTopic] = struct{}{}
|
||||
}
|
||||
|
||||
// Make sure all topics are subscribed
|
||||
for _, topic := range maps.Keys(bridgedTopicsSet) {
|
||||
if !wakuNode.Relay().IsSubscribed(topic) {
|
||||
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(topic), relay.WithoutConsumer())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for fromTopic, toTopics := range bridgedTopics {
|
||||
subscriptions, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(fromTopic))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
topics := maps.Keys(toTopics)
|
||||
for _, subscription := range subscriptions {
|
||||
wg.Add(1)
|
||||
go func(subscription *relay.Subscription, topics []string) {
|
||||
defer wg.Done()
|
||||
for env := range subscription.Ch {
|
||||
for _, topic := range topics {
|
||||
// HACK: message has been already fwded
|
||||
metaLen := len(env.Message().Meta)
|
||||
fwdTagLen := len(fwdMetaTag)
|
||||
if metaLen >= fwdTagLen && bytes.Equal(env.Message().Meta[metaLen-fwdTagLen:], fwdMetaTag) {
|
||||
continue
|
||||
}
|
||||
|
||||
// HACK: We append magic numbers here, just so the pubsub message ID will change
|
||||
env.Message().Meta = append(env.Message().Meta, fwdMetaTag...)
|
||||
_, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic))
|
||||
if err != nil {
|
||||
utils.Logger().Warn("could not bridge message", logging.HexString("hash", env.Hash()),
|
||||
zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic),
|
||||
zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}(subscription, topics)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
56
waku/cliutils/bridge.go
Normal file
56
waku/cliutils/bridge.go
Normal file
@ -0,0 +1,56 @@
|
||||
package cliutils
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
type BridgeTopic struct {
|
||||
FromTopic string
|
||||
ToTopic string
|
||||
}
|
||||
|
||||
func (p BridgeTopic) String() string {
|
||||
return fmt.Sprintf("%s:%s", p.FromTopic, p.ToTopic)
|
||||
}
|
||||
|
||||
type BridgeTopicSlice struct {
|
||||
Values *[]BridgeTopic
|
||||
}
|
||||
|
||||
func (k *BridgeTopicSlice) Set(value string) error {
|
||||
topicParts := strings.Split(value, ":")
|
||||
if len(topicParts) != 2 {
|
||||
return errors.New("expected from_topic:to_topic")
|
||||
}
|
||||
|
||||
for i := range topicParts {
|
||||
topicParts[i] = strings.TrimSpace(topicParts[i])
|
||||
}
|
||||
|
||||
if slices.Contains(topicParts, "") {
|
||||
return errors.New("topic can't be empty")
|
||||
}
|
||||
|
||||
*k.Values = append(*k.Values, BridgeTopic{
|
||||
FromTopic: topicParts[0],
|
||||
ToTopic: topicParts[1],
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *BridgeTopicSlice) String() string {
|
||||
if k.Values == nil {
|
||||
return ""
|
||||
}
|
||||
var output []string
|
||||
for _, v := range *k.Values {
|
||||
output = append(output, v.String())
|
||||
}
|
||||
|
||||
return strings.Join(output, ", ")
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user