From d13b1f0aa3008d29c6c16d5bf443f78c7d5e43ae Mon Sep 17 00:00:00 2001 From: Prem Chaitanya Prathi Date: Wed, 6 Sep 2023 10:07:21 +0530 Subject: [PATCH] autosharding content-topic config (#696) * chore: add shard choice simulation test * feat: add new flags for pubsub and contentTopics and deprecate topic flag * chore: remove store-resume-peer config and comment out functionality until redesign of store is done * chore: fix code to use contentTopics value * fix: use default waku topic only if no other topics are provided in the config --- cmd/waku/flags.go | 22 ++++++----- cmd/waku/main.go | 3 +- cmd/waku/node.go | 67 +++++++++++++++++--------------- cmd/waku/options.go | 10 +++-- examples/basic2/main.go | 1 - waku/v2/protocol/pubsub_topic.go | 2 +- waku/v2/protocol/shard.go | 2 +- waku/v2/protocol/topic_test.go | 61 +++++++++++++++++++++++++++++ 8 files changed, 119 insertions(+), 49 deletions(-) diff --git a/cmd/waku/flags.go b/cmd/waku/flags.go index b0d54157..89807747 100644 --- a/cmd/waku/flags.go +++ b/cmd/waku/flags.go @@ -227,10 +227,22 @@ var ( }) Topics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ Name: "topic", - Usage: "Pubsub topic to subscribe to. Argument may be repeated", + Usage: "Default topic to subscribe to. Argument may be repeated. Deprecated! Please use pubsub-topic and/or content-topic instead.", Destination: &options.Relay.Topics, EnvVars: []string{"WAKUNODE2_TOPICS"}, }) + PubSubTopics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: "pubsub-topic", + Usage: "Default pubsub topic to subscribe to. Argument may be repeated.", + Destination: &options.Relay.PubSubTopics, + EnvVars: []string{"WAKUNODE2_PUBSUB_TOPICS"}, + }) + ContentTopics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{ + Name: "content-topic", + Usage: "Default content topic to subscribe to. Argument may be repeated.", + Destination: &options.Relay.ContentTopics, + EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"}, + }) ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ Name: "protected-topic", Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.", @@ -301,14 +313,6 @@ var ( Value: true, EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_MIGRATION"}, }) - StoreResumePeer = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{ - Name: "store-resume-peer", - Usage: "Peer multiaddress to resume the message store at boot. Option may be repeated", - Value: &cliutils.MultiaddrSlice{ - Values: &options.Store.ResumeNodes, - }, - EnvVars: []string{"WAKUNODE2_STORE_RESUME_PEER"}, - }) FilterFlag = altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "filter", Usage: "Enable filter protocol", diff --git a/cmd/waku/main.go b/cmd/waku/main.go index a4192839..db7ae6e5 100644 --- a/cmd/waku/main.go +++ b/cmd/waku/main.go @@ -49,6 +49,8 @@ func main() { AgentString, Relay, Topics, + ContentTopics, + PubSubTopics, ProtectedTopics, RelayPeerExchange, MinRelayPeersToPublish, @@ -59,7 +61,6 @@ func main() { StoreMessageRetentionCapacity, StoreMessageDBVacuum, StoreMessageDBMigration, - StoreResumePeer, FilterFlag, FilterNode, FilterTimeout, diff --git a/cmd/waku/node.go b/cmd/waku/node.go index 36b0782e..3d778682 100644 --- a/cmd/waku/node.go +++ b/cmd/waku/node.go @@ -27,7 +27,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" dssql "github.com/ipfs/go-ds-sql" - "github.com/urfave/cli/v2" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -48,6 +47,7 @@ import ( "github.com/waku-org/go-waku/waku/persistence" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/node" + wprotocol "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" @@ -305,6 +305,9 @@ func Execute(options NodeOptions) { addStaticPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1) } + //Process pubSub and contentTopics specified and arrive at all corresponding pubSubTopics + pubSubTopicMap := processTopics(options) + if err = wakuNode.Start(ctx); err != nil { logger.Fatal("starting waku node", zap.Error(err)) } @@ -318,14 +321,10 @@ func Execute(options NodeOptions) { addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID) addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1) - if len(options.Relay.Topics.Value()) == 0 { - options.Relay.Topics = *cli.NewStringSlice(relay.DefaultWakuTopic) - } - var wg sync.WaitGroup if options.Relay.Enable { - for _, nodeTopic := range options.Relay.Topics.Value() { + for nodeTopic := range pubSubTopicMap { nodeTopic := nodeTopic sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic) failOnErr(err, "Error subscring to topic") @@ -435,32 +434,6 @@ func Execute(options NodeOptions) { } } - if options.Store.Enable && len(options.Store.ResumeNodes) != 0 { - // TODO: extract this to a function and run it when you go offline - // TODO: determine if a store is listening to a topic - - var peerIDs []peer.ID - for _, n := range options.Store.ResumeNodes { - pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4) - if err != nil { - logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err)) - } - peerIDs = append(peerIDs, pID) - } - - for _, t := range options.Relay.Topics.Value() { - wg.Add(1) - go func(topic string) { - defer wg.Done() - ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second) - defer ctxCancel() - if _, err := wakuNode.Store().Resume(ctxWithTimeout, topic, peerIDs); err != nil { - logger.Error("Could not resume history", zap.Error(err)) - } - }(t) - } - } - var rpcServer *rpc.WakuRpc if options.RPCServer.Enable { rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger) @@ -507,6 +480,36 @@ func Execute(options NodeOptions) { } } +func processTopics(options NodeOptions) map[string]struct{} { + //Using a map to avoid duplicate pub-sub topics that can result from autosharding + // or same-topic being passed twice. + pubSubTopicMap := make(map[string]struct{}) + + for _, topic := range options.Relay.Topics.Value() { + pubSubTopicMap[topic] = struct{}{} + } + + for _, topic := range options.Relay.PubSubTopics.Value() { + pubSubTopicMap[topic] = struct{}{} + } + + //Get pubSub topics from contentTopics if they are as per autosharding + for _, cTopic := range options.Relay.ContentTopics.Value() { + contentTopic, err := wprotocol.StringToContentTopic(cTopic) + if err != nil { + failOnErr(err, "failed to parse content topic") + } + pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount) + pubSubTopicMap[pTopic.String()] = struct{}{} + } + //If no topics are passed, then use default waku topic. + if len(pubSubTopicMap) == 0 { + pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{} + } + + return pubSubTopicMap +} + func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) { for _, addr := range addresses { _, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...) diff --git a/cmd/waku/options.go b/cmd/waku/options.go index 1facf0b3..1d35a5a0 100644 --- a/cmd/waku/options.go +++ b/cmd/waku/options.go @@ -26,6 +26,8 @@ type RelayOptions struct { Enable bool Topics cli.StringSlice ProtectedTopics []cliutils.ProtectedTopic + PubSubTopics cli.StringSlice + ContentTopics cli.StringSlice PeerExchange bool MinRelayPeersToPublish int } @@ -76,10 +78,10 @@ type StoreOptions struct { DatabaseURL string RetentionTime time.Duration RetentionMaxMessages int - ResumeNodes []multiaddr.Multiaddr - Nodes []multiaddr.Multiaddr - Vacuum bool - Migration bool + //ResumeNodes []multiaddr.Multiaddr + Nodes []multiaddr.Multiaddr + Vacuum bool + Migration bool } // DNSDiscoveryOptions are settings used for enabling DNS-based discovery diff --git a/examples/basic2/main.go b/examples/basic2/main.go index e43521a0..a5407bd7 100644 --- a/examples/basic2/main.go +++ b/examples/basic2/main.go @@ -29,7 +29,6 @@ func main() { return } contentTopic := cTopic.String() - hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0") key, err := randomHex(32) if err != nil { diff --git a/waku/v2/protocol/pubsub_topic.go b/waku/v2/protocol/pubsub_topic.go index 9ed1f030..59ceaa40 100644 --- a/waku/v2/protocol/pubsub_topic.go +++ b/waku/v2/protocol/pubsub_topic.go @@ -94,7 +94,7 @@ type StaticShardingPubsubTopic struct { } // NewStaticShardingPubsubTopic creates a new pubSub topic -func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) NamespacedPubsubTopic { +func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic { return StaticShardingPubsubTopic{ kind: StaticSharding, cluster: cluster, diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 1bad7546..79fb4b14 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -217,7 +217,7 @@ func FromBitVector(buf []byte) (RelayShards, error) { // GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic // This is based on Autosharding algorithm defined in RFC 51 -func GetShardFromContentTopic(topic ContentTopic, shardCount int) NamespacedPubsubTopic { +func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticShardingPubsubTopic { bytes := []byte(topic.ApplicationName) bytes = append(bytes, []byte(fmt.Sprintf("%d", topic.ApplicationVersion))...) diff --git a/waku/v2/protocol/topic_test.go b/waku/v2/protocol/topic_test.go index 9b5dbe9b..e96c646b 100644 --- a/waku/v2/protocol/topic_test.go +++ b/waku/v2/protocol/topic_test.go @@ -1,7 +1,9 @@ package protocol import ( + "math/rand" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -53,6 +55,65 @@ func TestContentTopicAndSharding(t *testing.T) { require.Equal(t, ct5.Generation, 0) } +func randomContentTopic() (ContentTopic, error) { + var app = "" + const WordLength = 5 + rand.New(rand.NewSource(time.Now().Unix())) + + //Generate a random character between lowercase a to z + for i := 0; i < WordLength; i++ { + randomChar := 'a' + rune(rand.Intn(26)) + app = app + string(randomChar) + } + version := uint32(1) + + var name = "" + + for i := 0; i < WordLength; i++ { + randomChar := 'a' + rune(rand.Intn(26)) + name = name + string(randomChar) + } + var enc = "proto" + + return NewContentTopic(app, version, name, enc) +} + +func TestShardChoiceSimulation(t *testing.T) { + //Given + var topics []ContentTopic + for i := 0; i < 100000; i++ { + ct, err := randomContentTopic() + require.NoError(t, err) + topics = append(topics, ct) + } + + var counts [GenerationZeroShardsCount]int + + // When + for _, topic := range topics { + pubsub := GetShardFromContentTopic(topic, GenerationZeroShardsCount) + counts[pubsub.Shard()]++ + } + + t.Logf("Total number of topics simulated %d", len(topics)) + for i := 0; i < GenerationZeroShardsCount; i++ { + t.Logf("Topics assigned to shard %d is %d", i, counts[i]) + } + + // Then + for i := 1; i < GenerationZeroShardsCount; i++ { + //t.Logf("float64(counts[%d]) %f float64(counts[%d]) %f", i-1, float64(counts[i-1]), i, float64(counts[i])) + if float64(counts[i-1]) <= (float64(counts[i])*1.05) && + float64(counts[i]) <= (float64(counts[i-1])*1.05) && + float64(counts[i-1]) >= (float64(counts[i])*0.95) && + float64(counts[i]) >= (float64(counts[i-1])*0.95) { + t.Logf("Shard choice simulation successful") + } else { + t.FailNow() + } + } +} + func TestNsPubsubTopic(t *testing.T) { ns1 := NewNamedShardingPubsubTopic("waku-dev") require.Equal(t, "/waku/2/waku-dev", ns1.String())