Merge branch 'master' into fix/rln-issues

This commit is contained in:
Prem Chaitanya Prathi 2023-12-13 21:28:10 +05:30 committed by GitHub
commit 625f0a3f99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 43 additions and 158 deletions

2
.gitmodules vendored
View File

@ -3,4 +3,4 @@
url = https://github.com/waku-org/waku-rln-contract.git
[submodule "waku/v2/protocol/waku-proto"]
path = waku/v2/protocol/waku-proto
url = git@github.com:waku-org/waku-proto.git
url = https://github.com/waku-org/waku-proto

View File

@ -264,15 +264,6 @@ var (
Destination: &options.Relay.ContentTopics,
EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"},
})
BridgeTopics = altsrc.NewGenericFlag(&cli.GenericFlag{
Name: "bridge-topics",
Usage: "Bridge two pubsub topics, from_topic:to_topic. Argument may be repeated.",
EnvVars: []string{"WAKUNODE2_BRIDGE_TOPIC"},
Value: &cliutils.BridgeTopicSlice{
Values: &options.Relay.BridgeTopics,
},
Hidden: true,
})
ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "protected-topic",
Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",

View File

@ -56,7 +56,6 @@ func main() {
Topics,
ContentTopics,
PubSubTopics,
BridgeTopics,
ProtectedTopics,
RelayPeerExchange,
MinRelayPeersToPublish,

View File

@ -25,7 +25,6 @@ type DiscV5Options struct {
type RelayOptions struct {
Enable bool
Topics cli.StringSlice
BridgeTopics []cliutils.BridgeTopic
ProtectedTopics []cliutils.ProtectedTopic
PubSubTopics cli.StringSlice
ContentTopics cli.StringSlice

View File

@ -1,23 +1,16 @@
package main
import (
"bytes"
"context"
"sync"
"time"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
wprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/rendezvous"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
var fwdMetaTag = []byte{102, 119, 100} //"fwd"
func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode, pubSubTopicMap map[string][]string) error {
for nodeTopic, cTopics := range pubSubTopicMap {
nodeTopic := nodeTopic
@ -85,72 +78,5 @@ func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.W
}
}
err := bridgeTopics(ctx, wg, wakuNode)
if err != nil {
return err
}
return nil
}
func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode) error {
// Bridge topics
bridgedTopics := make(map[string]map[string]struct{})
bridgedTopicsSet := make(map[string]struct{})
for _, topics := range options.Relay.BridgeTopics {
_, ok := bridgedTopics[topics.FromTopic]
if !ok {
bridgedTopics[topics.FromTopic] = make(map[string]struct{})
}
bridgedTopics[topics.FromTopic][topics.ToTopic] = struct{}{}
bridgedTopicsSet[topics.FromTopic] = struct{}{}
bridgedTopicsSet[topics.ToTopic] = struct{}{}
}
// Make sure all topics are subscribed
for _, topic := range maps.Keys(bridgedTopicsSet) {
if !wakuNode.Relay().IsSubscribed(topic) {
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(topic), relay.WithoutConsumer())
if err != nil {
return err
}
}
}
for fromTopic, toTopics := range bridgedTopics {
subscriptions, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(fromTopic))
if err != nil {
return err
}
topics := maps.Keys(toTopics)
for _, subscription := range subscriptions {
wg.Add(1)
go func(subscription *relay.Subscription, topics []string) {
defer wg.Done()
for env := range subscription.Ch {
for _, topic := range topics {
// HACK: message has been already fwded
metaLen := len(env.Message().Meta)
fwdTagLen := len(fwdMetaTag)
if metaLen >= fwdTagLen && bytes.Equal(env.Message().Meta[metaLen-fwdTagLen:], fwdMetaTag) {
continue
}
// HACK: We append magic numbers here, just so the pubsub message ID will change
env.Message().Meta = append(env.Message().Meta, fwdMetaTag...)
_, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic))
if err != nil {
utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()),
zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic),
zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
}
}
}
}(subscription, topics)
}
}
return nil
}

View File

@ -1,56 +0,0 @@
package cliutils
import (
"errors"
"fmt"
"strings"
"golang.org/x/exp/slices"
)
type BridgeTopic struct {
FromTopic string
ToTopic string
}
func (p BridgeTopic) String() string {
return fmt.Sprintf("%s:%s", p.FromTopic, p.ToTopic)
}
type BridgeTopicSlice struct {
Values *[]BridgeTopic
}
func (k *BridgeTopicSlice) Set(value string) error {
topicParts := strings.Split(value, ":")
if len(topicParts) != 2 {
return errors.New("expected from_topic:to_topic")
}
for i := range topicParts {
topicParts[i] = strings.TrimSpace(topicParts[i])
}
if slices.Contains(topicParts, "") {
return errors.New("topic can't be empty")
}
*k.Values = append(*k.Values, BridgeTopic{
FromTopic: topicParts[0],
ToTopic: topicParts[1],
})
return nil
}
func (k *BridgeTopicSlice) String() string {
if k.Values == nil {
return ""
}
var output []string
for _, v := range *k.Values {
output = append(output, v.String())
}
return strings.Join(output, ", ")
}

View File

@ -272,16 +272,14 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
if node == nil {
return false
}
d.log.Debug("found a peer", logging.ENode("enr", node))
// node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage
if !isWakuNode(node) {
d.log.Debug("peer is not waku node", logging.ENode("enr", node))
return false
}
d.log.Debug("peer is a waku node", logging.ENode("enr", node))
_, err := wenr.EnodeToPeerInfo(node)
_, err := wenr.EnodeToPeerInfo(node)
if err != nil {
d.metrics.RecordError(peerInfoFailure)
d.log.Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err))
@ -411,20 +409,17 @@ func (d *DiscoveryV5) DefaultPredicate() Predicate {
}
if nodeRS == nil {
d.log.Debug("node has no shards registered", logging.ENode("node", n))
// Node has no shards registered.
return false
}
if nodeRS.ClusterID != localRS.ClusterID {
d.log.Debug("cluster id mismatch from local clusterid", logging.ENode("node", n), zap.Error(err))
return false
}
// Contains any
for _, idx := range localRS.ShardIDs {
if nodeRS.Contains(localRS.ClusterID, idx) {
d.log.Debug("shards match for discovered node", logging.ENode("node", n))
return true
}
}

View File

@ -352,7 +352,6 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID)
// Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen)
if err == nil && enr.Record().Seq() > p.ENR.Seq() {
pm.logger.Debug("found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID))
return
}
}

View File

@ -24,7 +24,7 @@ func TestEnvelope(t *testing.T) {
require.Equal(
t,
[]uint8{70, 218, 246, 174, 188, 127, 199, 220, 111, 30, 61, 218, 238, 60, 83, 3, 179, 98, 85, 35, 7, 107, 188, 138, 32, 70, 170, 126, 55, 21, 71, 70},
[]byte{0x91, 0x0, 0xe4, 0xa5, 0xcf, 0xf7, 0x19, 0x27, 0x49, 0x81, 0x66, 0xb3, 0xdf, 0xc7, 0xa6, 0x31, 0xf0, 0x87, 0xc7, 0x29, 0xb4, 0x28, 0x83, 0xb9, 0x5c, 0x31, 0x25, 0x33, 0x3, 0xc9, 0x7, 0x95},
hash,
)
}

View File

@ -53,6 +53,10 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
wakuLP.pm = pm
wakuLP.metrics = newMetrics(reg)
if pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}
return wakuLP
}
@ -73,9 +77,6 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx))
wakuLP.log.Info("Light Push protocol started")
if wakuLP.pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}
return nil
}

View File

@ -1,10 +1,18 @@
package pb
import (
"encoding/binary"
"github.com/waku-org/go-waku/waku/v2/hash"
)
// Hash calculates the hash of a waku message
func (msg *WakuMessage) Hash(pubsubTopic string) []byte {
return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta)
return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta, toBytes(msg.GetTimestamp()))
}
func toBytes(i int64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(i))
return b
}

View File

@ -22,7 +22,7 @@ func TestEnvelopeHash(t *testing.T) {
msg.Timestamp = proto.Int64(123456789123456789)
msg.Version = proto.Uint32(1)
expected := []byte{0xee, 0xcf, 0xf5, 0xb7, 0xdd, 0x54, 0x2d, 0x68, 0x9e, 0x7d, 0x64, 0xa3, 0xb8, 0x50, 0x8b, 0xba, 0xc, 0xf1, 0xac, 0xb6, 0xf7, 0x1c, 0x9f, 0xf2, 0x32, 0x7, 0x5b, 0xfd, 0x90, 0x5c, 0xe5, 0xa1}
expected := []byte{0xb6, 0x59, 0x60, 0x7f, 0x2a, 0xae, 0x18, 0x84, 0x8d, 0xca, 0xa7, 0xd5, 0x1c, 0xb3, 0x7e, 0x6c, 0xc6, 0xfc, 0x33, 0x40, 0x2c, 0x70, 0x4f, 0xf0, 0xc0, 0x16, 0x33, 0x7d, 0x83, 0xad, 0x61, 0x50}
result := msg.Hash("test")
require.Equal(t, expected, result)
}
@ -39,7 +39,7 @@ func TestEmptyMeta(t *testing.T) {
messageHash := msg.Hash(pubsubTopic)
require.Equal(t, "87619d05e563521d9126749b45bd4cc2430df0607e77e23572d874ed9c1aaa62", hex.EncodeToString(messageHash))
require.Equal(t, "f0183c2e370e473ff471bbe1028d0d8a940949c02f3007a1ccd21fed356852a0", hex.EncodeToString(messageHash))
}
func Test13ByteMeta(t *testing.T) {
@ -48,11 +48,12 @@ func Test13ByteMeta(t *testing.T) {
msg.ContentTopic = "/waku/2/default-content/proto"
msg.Payload = []byte("\x01\x02\x03\x04TEST\x05\x06\x07\x08")
msg.Meta = []byte("\x73\x75\x70\x65\x72\x2d\x73\x65\x63\x72\x65\x74")
msg.Timestamp = proto.Int64(123456789123456789)
msg.Version = proto.Uint32(1)
messageHash := msg.Hash(pubsubTopic)
require.Equal(t, "4fdde1099c9f77f6dae8147b6b3179aba1fc8e14a7bf35203fc253ee479f135f", hex.EncodeToString(messageHash))
require.Equal(t, "f673cd2c9c973d685b52ca74c2559e001733a3a31a49ffc7b6e8713decba5a55", hex.EncodeToString(messageHash))
}
func TestZeroLenPayload(t *testing.T) {
@ -61,9 +62,31 @@ func TestZeroLenPayload(t *testing.T) {
msg.ContentTopic = "/waku/2/default-content/proto"
msg.Payload = []byte{}
msg.Meta = []byte("\x73\x75\x70\x65\x72\x2d\x73\x65\x63\x72\x65\x74")
msg.Timestamp = proto.Int64(123456789123456789)
msg.Version = proto.Uint32(1)
messageHash := msg.Hash(pubsubTopic)
require.Equal(t, "e1a9596237dbe2cc8aaf4b838c46a7052df6bc0d42ba214b998a8bfdbe8487d6", hex.EncodeToString(messageHash))
require.Equal(t, "978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", hex.EncodeToString(messageHash))
}
func TestHashWithTimestamp(t *testing.T) {
pubsubTopic := "/waku/2/default-waku/proto"
msg := new(WakuMessage)
msg.ContentTopic = "/waku/2/default-content/proto"
msg.Payload = []byte{}
msg.Meta = []byte("\x73\x75\x70\x65\x72\x2d\x73\x65\x63\x72\x65\x74")
msg.Version = proto.Uint32(1)
messageHash := msg.Hash(pubsubTopic)
require.Equal(t, "58e2fc032a82c4adeb967a8b87086d0d6fb304912f120d4404e6236add8f1f56", hex.EncodeToString(messageHash))
msg.Timestamp = proto.Int64(123456789123456789)
messageHash = msg.Hash(pubsubTopic)
require.Equal(t, "978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", hex.EncodeToString(messageHash))
}
func TestIntToBytes(t *testing.T) {
require.Equal(t, []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x27, 0x10}, toBytes(10000))
require.Equal(t, []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x98, 0x96, 0x80}, toBytes(10000000))
}