From fd4df9221ee5c0f50ba1c4e39b645c0fa2f974b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Fri, 8 Dec 2023 11:55:15 -0400 Subject: [PATCH 1/6] fix(lightpush): register lightpush protocol when instantiated (#951) --- waku/v2/protocol/lightpush/waku_lightpush.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 5cdcc6c3..361f12ba 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -53,6 +53,10 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p wakuLP.pm = pm wakuLP.metrics = newMetrics(reg) + if pm != nil { + wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) + } + return wakuLP } @@ -73,9 +77,6 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error { wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx)) wakuLP.log.Info("Light Push protocol started") - if wakuLP.pm != nil { - wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField) - } return nil } From e340337d64622d22cb94a969255efe4e36637df0 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Fri, 8 Dec 2023 14:40:30 -0400 Subject: [PATCH 2/6] fix: use https instead of ssh for submodules --- .gitmodules | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitmodules b/.gitmodules index 16cbf156..83e12df5 100644 --- a/.gitmodules +++ b/.gitmodules @@ -3,4 +3,4 @@ url = https://github.com/waku-org/waku-rln-contract.git [submodule "waku/v2/protocol/waku-proto"] path = waku/v2/protocol/waku-proto - url = git@github.com:waku-org/waku-proto.git + url = https://github.com/waku-org/waku-proto From 0ba1b630662bd1c53d2d56ba3e53f2d0608284d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Sat, 9 Dec 2023 13:59:35 -0400 Subject: [PATCH 3/6] chore: remove bridging topics feature (#949) --- cmd/waku/flags.go | 9 ----- cmd/waku/main.go | 1 - cmd/waku/options.go | 1 - cmd/waku/relay.go | 74 ----------------------------------------- waku/cliutils/bridge.go | 56 ------------------------------- 5 files changed, 141 deletions(-) delete mode 100644 waku/cliutils/bridge.go diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index 56a8def3..91a82d5c 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -264,15 +264,6 @@ var ( Destination: &options.Relay.ContentTopics, EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"}, }) - BridgeTopics = altsrc.NewGenericFlag(&cli.GenericFlag{ - Name: "bridge-topics", - Usage: "Bridge two pubsub topics, from_topic:to_topic. Argument may be repeated.", - EnvVars: []string{"WAKUNODE2_BRIDGE_TOPIC"}, - Value: &cliutils.BridgeTopicSlice{ - Values: &options.Relay.BridgeTopics, - }, - Hidden: true, - }) ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "protected-topic", Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index ce6c3a17..f868505f 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -56,7 +56,6 @@ func main() { Topics, ContentTopics, PubSubTopics, - BridgeTopics, ProtectedTopics, RelayPeerExchange, MinRelayPeersToPublish, diff --git a/cmd/waku/options.go b/cmd/waku/options.go index d85f094a..ee32bb2d 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -25,7 +25,6 @@ type DiscV5Options struct { type RelayOptions struct { Enable bool Topics cli.StringSlice - BridgeTopics []cliutils.BridgeTopic ProtectedTopics []cliutils.ProtectedTopic PubSubTopics cli.StringSlice ContentTopics cli.StringSlice diff --git a/cmd/waku/relay.go b/cmd/waku/relay.go index 6b59f941..26ae1de3 100644 --- a/cmd/waku/relay.go +++ b/cmd/waku/relay.go @@ -1,23 +1,16 @@ package main import ( - "bytes" "context" "sync" "time" - "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/node" wprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/rendezvous" - "github.com/waku-org/go-waku/waku/v2/utils" - "go.uber.org/zap" - "golang.org/x/exp/maps" ) -var fwdMetaTag = []byte{102, 119, 100} //"fwd" - func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode, pubSubTopicMap map[string][]string) error { for nodeTopic, cTopics := range pubSubTopicMap { nodeTopic := nodeTopic @@ -85,72 +78,5 @@ func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.W } } - err := bridgeTopics(ctx, wg, wakuNode) - if err != nil { - return err - } - - return nil -} - -func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode) error { - // Bridge topics - bridgedTopics := make(map[string]map[string]struct{}) - bridgedTopicsSet := make(map[string]struct{}) - for _, topics := range options.Relay.BridgeTopics { - _, ok := bridgedTopics[topics.FromTopic] - if !ok { - bridgedTopics[topics.FromTopic] = make(map[string]struct{}) - } - - bridgedTopics[topics.FromTopic][topics.ToTopic] = struct{}{} - bridgedTopicsSet[topics.FromTopic] = struct{}{} - bridgedTopicsSet[topics.ToTopic] = struct{}{} - } - - // Make sure all topics are subscribed - for _, topic := range maps.Keys(bridgedTopicsSet) { - if !wakuNode.Relay().IsSubscribed(topic) { - _, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(topic), relay.WithoutConsumer()) - if err != nil { - return err - } - } - } - - for fromTopic, toTopics := range bridgedTopics { - subscriptions, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(fromTopic)) - if err != nil { - return err - } - - topics := maps.Keys(toTopics) - for _, subscription := range subscriptions { - wg.Add(1) - go func(subscription *relay.Subscription, topics []string) { - defer wg.Done() - for env := range subscription.Ch { - for _, topic := range topics { - // HACK: message has been already fwded - metaLen := len(env.Message().Meta) - fwdTagLen := len(fwdMetaTag) - if metaLen >= fwdTagLen && bytes.Equal(env.Message().Meta[metaLen-fwdTagLen:], fwdMetaTag) { - continue - } - - // HACK: We append magic numbers here, just so the pubsub message ID will change - env.Message().Meta = append(env.Message().Meta, fwdMetaTag...) - _, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic)) - if err != nil { - utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()), - zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic), - zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err)) - } - } - } - }(subscription, topics) - } - } - return nil } diff --git a/waku/cliutils/bridge.go b/waku/cliutils/bridge.go deleted file mode 100644 index 2ba1c59a..00000000 --- a/waku/cliutils/bridge.go +++ /dev/null @@ -1,56 +0,0 @@ -package cliutils - -import ( - "errors" - "fmt" - "strings" - - "golang.org/x/exp/slices" -) - -type BridgeTopic struct { - FromTopic string - ToTopic string -} - -func (p BridgeTopic) String() string { - return fmt.Sprintf("%s:%s", p.FromTopic, p.ToTopic) -} - -type BridgeTopicSlice struct { - Values *[]BridgeTopic -} - -func (k *BridgeTopicSlice) Set(value string) error { - topicParts := strings.Split(value, ":") - if len(topicParts) != 2 { - return errors.New("expected from_topic:to_topic") - } - - for i := range topicParts { - topicParts[i] = strings.TrimSpace(topicParts[i]) - } - - if slices.Contains(topicParts, "") { - return errors.New("topic can't be empty") - } - - *k.Values = append(*k.Values, BridgeTopic{ - FromTopic: topicParts[0], - ToTopic: topicParts[1], - }) - - return nil -} - -func (k *BridgeTopicSlice) String() string { - if k.Values == nil { - return "" - } - var output []string - for _, v := range *k.Values { - output = append(output, v.String()) - } - - return strings.Join(output, ", ") -} From c403388ec2146eb93155ee47f92f8eb030190857 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Mon, 11 Dec 2023 11:46:23 -0400 Subject: [PATCH 4/6] fix: remove extremely noisy logs (#956) --- waku/v2/discv5/discover.go | 4 +--- waku/v2/peermanager/peer_manager.go | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index cf014adc..13496034 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -272,16 +272,14 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool { if node == nil { return false } - d.log.Debug("found a peer", logging.ENode("enr", node)) // node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage if !isWakuNode(node) { d.log.Debug("peer is not waku node", logging.ENode("enr", node)) return false } - d.log.Debug("peer is a waku node", logging.ENode("enr", node)) - _, err := wenr.EnodeToPeerInfo(node) + _, err := wenr.EnodeToPeerInfo(node) if err != nil { d.metrics.RecordError(peerInfoFailure) d.log.Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err)) diff --git a/waku/v2/peermanager/peer_manager.go b/waku/v2/peermanager/peer_manager.go index 4eddd090..9294104a 100644 --- a/waku/v2/peermanager/peer_manager.go +++ b/waku/v2/peermanager/peer_manager.go @@ -352,7 +352,6 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) { enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID) // Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen) if err == nil && enr.Record().Seq() > p.ENR.Seq() { - pm.logger.Debug("found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID)) return } } From 5d1477d5b443745f431f9c4e48729f8d7e0b95a6 Mon Sep 17 00:00:00 2001 From: kaichao Date: Wed, 13 Dec 2023 22:46:23 +0800 Subject: [PATCH 5/6] fix: hash calculation of message to include timestamp (#959) --- waku/v2/protocol/envelope_test.go | 2 +- waku/v2/protocol/pb/utils.go | 10 +++++++++- waku/v2/protocol/pb/utils_test.go | 31 +++++++++++++++++++++++++++---- 3 files changed, 37 insertions(+), 6 deletions(-) diff --git a/waku/v2/protocol/envelope_test.go b/waku/v2/protocol/envelope_test.go index f0feaa44..b95426b4 100644 --- a/waku/v2/protocol/envelope_test.go +++ b/waku/v2/protocol/envelope_test.go @@ -24,7 +24,7 @@ func TestEnvelope(t *testing.T) { require.Equal( t, - []uint8{70, 218, 246, 174, 188, 127, 199, 220, 111, 30, 61, 218, 238, 60, 83, 3, 179, 98, 85, 35, 7, 107, 188, 138, 32, 70, 170, 126, 55, 21, 71, 70}, + []byte{0x91, 0x0, 0xe4, 0xa5, 0xcf, 0xf7, 0x19, 0x27, 0x49, 0x81, 0x66, 0xb3, 0xdf, 0xc7, 0xa6, 0x31, 0xf0, 0x87, 0xc7, 0x29, 0xb4, 0x28, 0x83, 0xb9, 0x5c, 0x31, 0x25, 0x33, 0x3, 0xc9, 0x7, 0x95}, hash, ) } diff --git a/waku/v2/protocol/pb/utils.go b/waku/v2/protocol/pb/utils.go index ba73b92b..d21f527c 100644 --- a/waku/v2/protocol/pb/utils.go +++ b/waku/v2/protocol/pb/utils.go @@ -1,10 +1,18 @@ package pb import ( + "encoding/binary" + "github.com/waku-org/go-waku/waku/v2/hash" ) // Hash calculates the hash of a waku message func (msg *WakuMessage) Hash(pubsubTopic string) []byte { - return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta) + return hash.SHA256([]byte(pubsubTopic), msg.Payload, []byte(msg.ContentTopic), msg.Meta, toBytes(msg.GetTimestamp())) +} + +func toBytes(i int64) []byte { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, uint64(i)) + return b } diff --git a/waku/v2/protocol/pb/utils_test.go b/waku/v2/protocol/pb/utils_test.go index 6934666d..f15421db 100644 --- a/waku/v2/protocol/pb/utils_test.go +++ b/waku/v2/protocol/pb/utils_test.go @@ -22,7 +22,7 @@ func TestEnvelopeHash(t *testing.T) { msg.Timestamp = proto.Int64(123456789123456789) msg.Version = proto.Uint32(1) - expected := []byte{0xee, 0xcf, 0xf5, 0xb7, 0xdd, 0x54, 0x2d, 0x68, 0x9e, 0x7d, 0x64, 0xa3, 0xb8, 0x50, 0x8b, 0xba, 0xc, 0xf1, 0xac, 0xb6, 0xf7, 0x1c, 0x9f, 0xf2, 0x32, 0x7, 0x5b, 0xfd, 0x90, 0x5c, 0xe5, 0xa1} + expected := []byte{0xb6, 0x59, 0x60, 0x7f, 0x2a, 0xae, 0x18, 0x84, 0x8d, 0xca, 0xa7, 0xd5, 0x1c, 0xb3, 0x7e, 0x6c, 0xc6, 0xfc, 0x33, 0x40, 0x2c, 0x70, 0x4f, 0xf0, 0xc0, 0x16, 0x33, 0x7d, 0x83, 0xad, 0x61, 0x50} result := msg.Hash("test") require.Equal(t, expected, result) } @@ -39,7 +39,7 @@ func TestEmptyMeta(t *testing.T) { messageHash := msg.Hash(pubsubTopic) - require.Equal(t, "87619d05e563521d9126749b45bd4cc2430df0607e77e23572d874ed9c1aaa62", hex.EncodeToString(messageHash)) + require.Equal(t, "f0183c2e370e473ff471bbe1028d0d8a940949c02f3007a1ccd21fed356852a0", hex.EncodeToString(messageHash)) } func Test13ByteMeta(t *testing.T) { @@ -48,11 +48,12 @@ func Test13ByteMeta(t *testing.T) { msg.ContentTopic = "/waku/2/default-content/proto" msg.Payload = []byte("\x01\x02\x03\x04TEST\x05\x06\x07\x08") msg.Meta = []byte("\x73\x75\x70\x65\x72\x2d\x73\x65\x63\x72\x65\x74") + msg.Timestamp = proto.Int64(123456789123456789) msg.Version = proto.Uint32(1) messageHash := msg.Hash(pubsubTopic) - require.Equal(t, "4fdde1099c9f77f6dae8147b6b3179aba1fc8e14a7bf35203fc253ee479f135f", hex.EncodeToString(messageHash)) + require.Equal(t, "f673cd2c9c973d685b52ca74c2559e001733a3a31a49ffc7b6e8713decba5a55", hex.EncodeToString(messageHash)) } func TestZeroLenPayload(t *testing.T) { @@ -61,9 +62,31 @@ func TestZeroLenPayload(t *testing.T) { msg.ContentTopic = "/waku/2/default-content/proto" msg.Payload = []byte{} msg.Meta = []byte("\x73\x75\x70\x65\x72\x2d\x73\x65\x63\x72\x65\x74") + msg.Timestamp = proto.Int64(123456789123456789) msg.Version = proto.Uint32(1) messageHash := msg.Hash(pubsubTopic) - require.Equal(t, "e1a9596237dbe2cc8aaf4b838c46a7052df6bc0d42ba214b998a8bfdbe8487d6", hex.EncodeToString(messageHash)) + require.Equal(t, "978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", hex.EncodeToString(messageHash)) +} + +func TestHashWithTimestamp(t *testing.T) { + pubsubTopic := "/waku/2/default-waku/proto" + msg := new(WakuMessage) + msg.ContentTopic = "/waku/2/default-content/proto" + msg.Payload = []byte{} + msg.Meta = []byte("\x73\x75\x70\x65\x72\x2d\x73\x65\x63\x72\x65\x74") + msg.Version = proto.Uint32(1) + + messageHash := msg.Hash(pubsubTopic) + require.Equal(t, "58e2fc032a82c4adeb967a8b87086d0d6fb304912f120d4404e6236add8f1f56", hex.EncodeToString(messageHash)) + + msg.Timestamp = proto.Int64(123456789123456789) + messageHash = msg.Hash(pubsubTopic) + require.Equal(t, "978ccc9a665029f9829d42d84e3a49ad3a4791cce53fb5a8b581ef43ad6b4d2f", hex.EncodeToString(messageHash)) +} + +func TestIntToBytes(t *testing.T) { + require.Equal(t, []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x27, 0x10}, toBytes(10000)) + require.Equal(t, []byte{0x0, 0x0, 0x0, 0x0, 0x0, 0x98, 0x96, 0x80}, toBytes(10000000)) } From 0218169b5f46959935a09c46f0b7b0d8aecf1a2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Wed, 13 Dec 2023 10:54:43 -0400 Subject: [PATCH 6/6] fix: remove more noisy logs from discv5 (#961) --- waku/v2/discv5/discover.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/waku/v2/discv5/discover.go b/waku/v2/discv5/discover.go index 13496034..5fe8d5c4 100644 --- a/waku/v2/discv5/discover.go +++ b/waku/v2/discv5/discover.go @@ -409,20 +409,17 @@ func (d *DiscoveryV5) DefaultPredicate() Predicate { } if nodeRS == nil { - d.log.Debug("node has no shards registered", logging.ENode("node", n)) // Node has no shards registered. return false } if nodeRS.ClusterID != localRS.ClusterID { - d.log.Debug("cluster id mismatch from local clusterid", logging.ENode("node", n), zap.Error(err)) return false } // Contains any for _, idx := range localRS.ShardIDs { if nodeRS.Contains(localRS.ClusterID, idx) { - d.log.Debug("shards match for discovered node", logging.ENode("node", n)) return true } }