autosharding content-topic config (#696)

* chore: add shard choice simulation test

* feat: add new flags for pubsub and contentTopics and deprecate topic flag

* chore: remove store-resume-peer config and comment out functionality until redesign of store is done

* chore: fix code to use contentTopics value

* fix: use default waku topic only if no other topics are provided in the config
This commit is contained in:
Prem Chaitanya Prathi 2023-09-06 10:07:21 +05:30 committed by GitHub
parent e66f0e3b9c
commit d13b1f0aa3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 119 additions and 49 deletions

View File

@ -227,10 +227,22 @@ var (
})
Topics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "topic",
Usage: "Pubsub topic to subscribe to. Argument may be repeated",
Usage: "Default topic to subscribe to. Argument may be repeated. Deprecated! Please use pubsub-topic and/or content-topic instead.",
Destination: &options.Relay.Topics,
EnvVars: []string{"WAKUNODE2_TOPICS"},
})
PubSubTopics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "pubsub-topic",
Usage: "Default pubsub topic to subscribe to. Argument may be repeated.",
Destination: &options.Relay.PubSubTopics,
EnvVars: []string{"WAKUNODE2_PUBSUB_TOPICS"},
})
ContentTopics = altsrc.NewStringSliceFlag(&cli.StringSliceFlag{
Name: "content-topic",
Usage: "Default content topic to subscribe to. Argument may be repeated.",
Destination: &options.Relay.ContentTopics,
EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"},
})
ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "protected-topic",
Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",
@ -301,14 +313,6 @@ var (
Value: true,
EnvVars: []string{"WAKUNODE2_STORE_MESSAGE_DB_MIGRATION"},
})
StoreResumePeer = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "store-resume-peer",
Usage: "Peer multiaddress to resume the message store at boot. Option may be repeated",
Value: &cliutils.MultiaddrSlice{
Values: &options.Store.ResumeNodes,
},
EnvVars: []string{"WAKUNODE2_STORE_RESUME_PEER"},
})
FilterFlag = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "filter",
Usage: "Enable filter protocol",

View File

@ -49,6 +49,8 @@ func main() {
AgentString,
Relay,
Topics,
ContentTopics,
PubSubTopics,
ProtectedTopics,
RelayPeerExchange,
MinRelayPeersToPublish,
@ -59,7 +61,6 @@ func main() {
StoreMessageRetentionCapacity,
StoreMessageDBVacuum,
StoreMessageDBMigration,
StoreResumePeer,
FilterFlag,
FilterNode,
FilterTimeout,

View File

@ -27,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
dssql "github.com/ipfs/go-ds-sql"
"github.com/urfave/cli/v2"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
@ -48,6 +47,7 @@ import (
"github.com/waku-org/go-waku/waku/persistence"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/node"
wprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
@ -305,6 +305,9 @@ func Execute(options NodeOptions) {
addStaticPeers(wakuNode, options.Filter.NodesV1, legacy_filter.FilterID_v20beta1)
}
//Process pubSub and contentTopics specified and arrive at all corresponding pubSubTopics
pubSubTopicMap := processTopics(options)
if err = wakuNode.Start(ctx); err != nil {
logger.Fatal("starting waku node", zap.Error(err))
}
@ -318,14 +321,10 @@ func Execute(options NodeOptions) {
addStaticPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
addStaticPeers(wakuNode, options.Filter.Nodes, filter.FilterSubscribeID_v20beta1)
if len(options.Relay.Topics.Value()) == 0 {
options.Relay.Topics = *cli.NewStringSlice(relay.DefaultWakuTopic)
}
var wg sync.WaitGroup
if options.Relay.Enable {
for _, nodeTopic := range options.Relay.Topics.Value() {
for nodeTopic := range pubSubTopicMap {
nodeTopic := nodeTopic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, nodeTopic)
failOnErr(err, "Error subscring to topic")
@ -435,32 +434,6 @@ func Execute(options NodeOptions) {
}
}
if options.Store.Enable && len(options.Store.ResumeNodes) != 0 {
// TODO: extract this to a function and run it when you go offline
// TODO: determine if a store is listening to a topic
var peerIDs []peer.ID
for _, n := range options.Store.ResumeNodes {
pID, err := wakuNode.AddPeer(n, wakupeerstore.Static, store.StoreID_v20beta4)
if err != nil {
logger.Warn("adding peer to peerstore", logging.MultiAddrs("peer", n), zap.Error(err))
}
peerIDs = append(peerIDs, pID)
}
for _, t := range options.Relay.Topics.Value() {
wg.Add(1)
go func(topic string) {
defer wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := wakuNode.Store().Resume(ctxWithTimeout, topic, peerIDs); err != nil {
logger.Error("Could not resume history", zap.Error(err))
}
}(t)
}
}
var rpcServer *rpc.WakuRpc
if options.RPCServer.Enable {
rpcServer = rpc.NewWakuRpc(wakuNode, options.RPCServer.Address, options.RPCServer.Port, options.RPCServer.Admin, options.PProf, options.RPCServer.RelayCacheCapacity, logger)
@ -507,6 +480,36 @@ func Execute(options NodeOptions) {
}
}
func processTopics(options NodeOptions) map[string]struct{} {
//Using a map to avoid duplicate pub-sub topics that can result from autosharding
// or same-topic being passed twice.
pubSubTopicMap := make(map[string]struct{})
for _, topic := range options.Relay.Topics.Value() {
pubSubTopicMap[topic] = struct{}{}
}
for _, topic := range options.Relay.PubSubTopics.Value() {
pubSubTopicMap[topic] = struct{}{}
}
//Get pubSub topics from contentTopics if they are as per autosharding
for _, cTopic := range options.Relay.ContentTopics.Value() {
contentTopic, err := wprotocol.StringToContentTopic(cTopic)
if err != nil {
failOnErr(err, "failed to parse content topic")
}
pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount)
pubSubTopicMap[pTopic.String()] = struct{}{}
}
//If no topics are passed, then use default waku topic.
if len(pubSubTopicMap) == 0 {
pubSubTopicMap[relay.DefaultWakuTopic] = struct{}{}
}
return pubSubTopicMap
}
func addStaticPeers(wakuNode *node.WakuNode, addresses []multiaddr.Multiaddr, protocols ...protocol.ID) {
for _, addr := range addresses {
_, err := wakuNode.AddPeer(addr, wakupeerstore.Static, protocols...)

View File

@ -26,6 +26,8 @@ type RelayOptions struct {
Enable bool
Topics cli.StringSlice
ProtectedTopics []cliutils.ProtectedTopic
PubSubTopics cli.StringSlice
ContentTopics cli.StringSlice
PeerExchange bool
MinRelayPeersToPublish int
}
@ -76,7 +78,7 @@ type StoreOptions struct {
DatabaseURL string
RetentionTime time.Duration
RetentionMaxMessages int
ResumeNodes []multiaddr.Multiaddr
//ResumeNodes []multiaddr.Multiaddr
Nodes []multiaddr.Multiaddr
Vacuum bool
Migration bool

View File

@ -29,7 +29,6 @@ func main() {
return
}
contentTopic := cTopic.String()
hostAddr, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:0")
key, err := randomHex(32)
if err != nil {

View File

@ -94,7 +94,7 @@ type StaticShardingPubsubTopic struct {
}
// NewStaticShardingPubsubTopic creates a new pubSub topic
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) NamespacedPubsubTopic {
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,

View File

@ -217,7 +217,7 @@ func FromBitVector(buf []byte) (RelayShards, error) {
// GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic
// This is based on Autosharding algorithm defined in RFC 51
func GetShardFromContentTopic(topic ContentTopic, shardCount int) NamespacedPubsubTopic {
func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticShardingPubsubTopic {
bytes := []byte(topic.ApplicationName)
bytes = append(bytes, []byte(fmt.Sprintf("%d", topic.ApplicationVersion))...)

View File

@ -1,7 +1,9 @@
package protocol
import (
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
)
@ -53,6 +55,65 @@ func TestContentTopicAndSharding(t *testing.T) {
require.Equal(t, ct5.Generation, 0)
}
func randomContentTopic() (ContentTopic, error) {
var app = ""
const WordLength = 5
rand.New(rand.NewSource(time.Now().Unix()))
//Generate a random character between lowercase a to z
for i := 0; i < WordLength; i++ {
randomChar := 'a' + rune(rand.Intn(26))
app = app + string(randomChar)
}
version := uint32(1)
var name = ""
for i := 0; i < WordLength; i++ {
randomChar := 'a' + rune(rand.Intn(26))
name = name + string(randomChar)
}
var enc = "proto"
return NewContentTopic(app, version, name, enc)
}
func TestShardChoiceSimulation(t *testing.T) {
//Given
var topics []ContentTopic
for i := 0; i < 100000; i++ {
ct, err := randomContentTopic()
require.NoError(t, err)
topics = append(topics, ct)
}
var counts [GenerationZeroShardsCount]int
// When
for _, topic := range topics {
pubsub := GetShardFromContentTopic(topic, GenerationZeroShardsCount)
counts[pubsub.Shard()]++
}
t.Logf("Total number of topics simulated %d", len(topics))
for i := 0; i < GenerationZeroShardsCount; i++ {
t.Logf("Topics assigned to shard %d is %d", i, counts[i])
}
// Then
for i := 1; i < GenerationZeroShardsCount; i++ {
//t.Logf("float64(counts[%d]) %f float64(counts[%d]) %f", i-1, float64(counts[i-1]), i, float64(counts[i]))
if float64(counts[i-1]) <= (float64(counts[i])*1.05) &&
float64(counts[i]) <= (float64(counts[i-1])*1.05) &&
float64(counts[i-1]) >= (float64(counts[i])*0.95) &&
float64(counts[i]) >= (float64(counts[i-1])*0.95) {
t.Logf("Shard choice simulation successful")
} else {
t.FailNow()
}
}
}
func TestNsPubsubTopic(t *testing.T) {
ns1 := NewNamedShardingPubsubTopic("waku-dev")
require.Equal(t, "/waku/2/waku-dev", ns1.String())