diff --git a/mobile/api.go b/mobile/api.go index 6b653f52..961291c5 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -405,7 +405,7 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa } func PubsubTopic(name string, encoding string) string { - return protocol.NewPubsubTopic(name, encoding).String() + return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String() } func DefaultPubsubTopic() string { diff --git a/waku/v2/protocol/topic.go b/waku/v2/protocol/topic.go index 2c9aba4f..b5f6a7fb 100644 --- a/waku/v2/protocol/topic.go +++ b/waku/v2/protocol/topic.go @@ -3,11 +3,22 @@ package protocol import ( "errors" "fmt" + "runtime/debug" "strconv" "strings" ) +const Waku2PubsubTopicPrefix = "/waku/2" +const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs" + var ErrInvalidFormat = errors.New("invalid format") +var ErrInvalidStructure = errors.New("invalid topic structure") +var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix) +var ErrMissingTopicName = errors.New("missing topic-name") +var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix) +var ErrMissingClusterIndex = errors.New("missing shard_cluster_index") +var ErrMissingShardNumber = errors.New("missing shard_number") +var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed") type ContentTopic struct { ApplicationName string @@ -54,38 +65,156 @@ func StringToContentTopic(s string) (ContentTopic, error) { }, nil } -type PubsubTopic struct { - Name string - Encoding string +type NamespacedPubsubTopicKind int + +const ( + StaticSharding NamespacedPubsubTopicKind = iota + NamedSharding +) + +type ShardedPubsubTopic interface { + String() string + Kind() NamespacedPubsubTopicKind + Equal(ShardedPubsubTopic) bool } -func (t PubsubTopic) String() string { - return fmt.Sprintf("/waku/2/%s/%s", t.Name, t.Encoding) +type NamedShardingPubsubTopic struct { + ShardedPubsubTopic + kind NamespacedPubsubTopicKind + name string } -func DefaultPubsubTopic() PubsubTopic { - return NewPubsubTopic("default-waku", "proto") -} - -func NewPubsubTopic(name string, encoding string) PubsubTopic { - return PubsubTopic{ - Name: name, - Encoding: encoding, +func NewNamedShardingPubsubTopic(name string) ShardedPubsubTopic { + return NamedShardingPubsubTopic{ + kind: NamedSharding, + name: name, } } -func (t PubsubTopic) Equal(t2 PubsubTopic) bool { - return t.Name == t2.Name && t.Encoding == t2.Encoding +func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { + return n.kind } -func StringToPubsubTopic(s string) (PubsubTopic, error) { - p := strings.Split(s, "/") - if len(p) != 5 || p[0] != "" || p[1] != "waku" || p[2] != "2" || p[3] == "" || p[4] == "" { - return PubsubTopic{}, ErrInvalidFormat +func (n NamedShardingPubsubTopic) Name() string { + return n.name +} + +func (s NamedShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool { + return s.String() == t2.String() +} + +func (n NamedShardingPubsubTopic) String() string { + return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name) +} + +func (s *NamedShardingPubsubTopic) Parse(topic string) error { + if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) { + return ErrInvalidTopicPrefix } - return PubsubTopic{ - Name: p[3], - Encoding: p[4], - }, nil + topicName := topic[8:] + if len(topicName) == 0 { + return ErrMissingTopicName + } + + s.kind = NamedSharding + s.name = topicName + + return nil +} + +type StaticShardingPubsubTopic struct { + ShardedPubsubTopic + kind NamespacedPubsubTopicKind + cluster uint16 + shard uint16 +} + +func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) ShardedPubsubTopic { + return StaticShardingPubsubTopic{ + kind: StaticSharding, + cluster: cluster, + shard: shard, + } +} + +func (n StaticShardingPubsubTopic) Cluster() uint16 { + return n.cluster +} + +func (n StaticShardingPubsubTopic) Shard() uint16 { + return n.shard +} + +func (n StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { + return n.kind +} + +func (s StaticShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool { + return s.String() == t2.String() +} + +func (n StaticShardingPubsubTopic) String() string { + return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, n.cluster, n.shard) +} + +func (s *StaticShardingPubsubTopic) Parse(topic string) error { + if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { + fmt.Println(topic, StaticShardingPubsubTopicPrefix) + return ErrInvalidShardedTopicPrefix + } + + parts := strings.Split(topic[11:], "/") + if len(parts) != 2 { + return ErrInvalidStructure + } + + clusterPart := parts[0] + if len(clusterPart) == 0 { + return ErrMissingClusterIndex + } + + clusterInt, err := strconv.ParseUint(clusterPart, 10, 16) + if err != nil { + return ErrInvalidNumberFormat + } + + shardPart := parts[1] + if len(shardPart) == 0 { + return ErrMissingShardNumber + } + + shardInt, err := strconv.ParseUint(shardPart, 10, 16) + if err != nil { + return ErrInvalidNumberFormat + } + + s.shard = uint16(shardInt) + s.cluster = uint16(clusterInt) + s.kind = StaticSharding + + return nil +} + +func ToShardedPubsubTopic(topic string) (ShardedPubsubTopic, error) { + if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { + s := StaticShardingPubsubTopic{} + err := s.Parse(topic) + if err != nil { + return nil, err + } + return s, nil + } else { + debug.PrintStack() + s := NamedShardingPubsubTopic{} + err := s.Parse(topic) + if err != nil { + return nil, err + } + return s, nil + } +} + +func DefaultPubsubTopic() ShardedPubsubTopic { + return NewNamedShardingPubsubTopic("default-waku/proto") } diff --git a/waku/v2/protocol/topic_test.go b/waku/v2/protocol/topic_test.go index 07cd2de7..bdc2aa4f 100644 --- a/waku/v2/protocol/topic_test.go +++ b/waku/v2/protocol/topic_test.go @@ -31,27 +31,41 @@ func TestContentTopic(t *testing.T) { require.False(t, ct.Equal(ct3)) } -func TestTopic(t *testing.T) { - topic := NewPubsubTopic("test", "proto") - require.Equal(t, topic.String(), "/waku/2/test/proto") +func TestNsPubsubTopic(t *testing.T) { + ns1 := NewNamedShardingPubsubTopic("waku-dev") + require.Equal(t, "/waku/2/waku-dev", ns1.String()) - _, err := StringToPubsubTopic("/waku/-1/a/b") - require.Error(t, ErrInvalidFormat, err) + ns2 := NewStaticShardingPubsubTopic(0, 2) + require.Equal(t, "/waku/2/rs/0/2", ns2.String()) - _, err = StringToPubsubTopic("waku/2/a/b") - require.Error(t, ErrInvalidFormat, err) + require.True(t, ns1.Equal(ns1)) + require.False(t, ns1.Equal(ns2)) - _, err = StringToPubsubTopic("////") - require.Error(t, ErrInvalidFormat, err) - - _, err = StringToPubsubTopic("/waku/2/a") - require.Error(t, ErrInvalidFormat, err) - - topic2, err := StringToPubsubTopic("/waku/2/test/proto") + topic := "/waku/2/waku-dev" + ns, err := ToShardedPubsubTopic(topic) require.NoError(t, err) - require.Equal(t, topic.String(), topic2.String()) - require.True(t, topic.Equal(topic2)) + require.Equal(t, NamedSharding, ns.Kind()) + require.Equal(t, "waku-dev", ns.(NamedShardingPubsubTopic).Name()) - topic3 := NewPubsubTopic("test2", "proto") - require.False(t, topic.Equal(topic3)) + topic = "/waku/2/rs/16/42" + ns, err = ToShardedPubsubTopic(topic) + require.NoError(t, err) + require.Equal(t, StaticSharding, ns.Kind()) + require.Equal(t, uint16(16), ns.(StaticShardingPubsubTopic).Cluster()) + require.Equal(t, uint16(42), ns.(StaticShardingPubsubTopic).Shard()) + + topic = "/waku/1/rs/16/42" + _, err = ToShardedPubsubTopic(topic) + require.Error(t, err) + require.ErrorIs(t, err, ErrInvalidTopicPrefix) + + topic = "/waku/2/rs//02" + _, err = ToShardedPubsubTopic(topic) + require.Error(t, err) + require.ErrorIs(t, err, ErrMissingClusterIndex) + + topic = "/waku/2/rs/xx/77" + _, err = ToShardedPubsubTopic(topic) + require.Error(t, err) + require.ErrorIs(t, err, ErrInvalidNumberFormat) }