refactor: namespaced pubsub topics

This commit is contained in:
Richard Ramos 2023-03-07 18:11:52 -04:00 committed by RichΛrd
parent 3bba1a86f1
commit aeddc7848a
3 changed files with 185 additions and 42 deletions

View File

@ -405,7 +405,7 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
}
func PubsubTopic(name string, encoding string) string {
return protocol.NewPubsubTopic(name, encoding).String()
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}
func DefaultPubsubTopic() string {

View File

@ -3,11 +3,22 @@ package protocol
import (
"errors"
"fmt"
"runtime/debug"
"strconv"
"strings"
)
const Waku2PubsubTopicPrefix = "/waku/2"
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"
var ErrInvalidFormat = errors.New("invalid format")
var ErrInvalidStructure = errors.New("invalid topic structure")
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
var ErrMissingTopicName = errors.New("missing topic-name")
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")
type ContentTopic struct {
ApplicationName string
@ -54,38 +65,156 @@ func StringToContentTopic(s string) (ContentTopic, error) {
}, nil
}
type PubsubTopic struct {
Name string
Encoding string
type NamespacedPubsubTopicKind int
const (
StaticSharding NamespacedPubsubTopicKind = iota
NamedSharding
)
type ShardedPubsubTopic interface {
String() string
Kind() NamespacedPubsubTopicKind
Equal(ShardedPubsubTopic) bool
}
func (t PubsubTopic) String() string {
return fmt.Sprintf("/waku/2/%s/%s", t.Name, t.Encoding)
type NamedShardingPubsubTopic struct {
ShardedPubsubTopic
kind NamespacedPubsubTopicKind
name string
}
func DefaultPubsubTopic() PubsubTopic {
return NewPubsubTopic("default-waku", "proto")
}
func NewPubsubTopic(name string, encoding string) PubsubTopic {
return PubsubTopic{
Name: name,
Encoding: encoding,
func NewNamedShardingPubsubTopic(name string) ShardedPubsubTopic {
return NamedShardingPubsubTopic{
kind: NamedSharding,
name: name,
}
}
func (t PubsubTopic) Equal(t2 PubsubTopic) bool {
return t.Name == t2.Name && t.Encoding == t2.Encoding
func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}
func StringToPubsubTopic(s string) (PubsubTopic, error) {
p := strings.Split(s, "/")
if len(p) != 5 || p[0] != "" || p[1] != "waku" || p[2] != "2" || p[3] == "" || p[4] == "" {
return PubsubTopic{}, ErrInvalidFormat
func (n NamedShardingPubsubTopic) Name() string {
return n.name
}
func (s NamedShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool {
return s.String() == t2.String()
}
func (n NamedShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name)
}
func (s *NamedShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) {
return ErrInvalidTopicPrefix
}
return PubsubTopic{
Name: p[3],
Encoding: p[4],
}, nil
topicName := topic[8:]
if len(topicName) == 0 {
return ErrMissingTopicName
}
s.kind = NamedSharding
s.name = topicName
return nil
}
type StaticShardingPubsubTopic struct {
ShardedPubsubTopic
kind NamespacedPubsubTopicKind
cluster uint16
shard uint16
}
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) ShardedPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,
shard: shard,
}
}
func (n StaticShardingPubsubTopic) Cluster() uint16 {
return n.cluster
}
func (n StaticShardingPubsubTopic) Shard() uint16 {
return n.shard
}
func (n StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}
func (s StaticShardingPubsubTopic) Equal(t2 ShardedPubsubTopic) bool {
return s.String() == t2.String()
}
func (n StaticShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, n.cluster, n.shard)
}
func (s *StaticShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
fmt.Println(topic, StaticShardingPubsubTopicPrefix)
return ErrInvalidShardedTopicPrefix
}
parts := strings.Split(topic[11:], "/")
if len(parts) != 2 {
return ErrInvalidStructure
}
clusterPart := parts[0]
if len(clusterPart) == 0 {
return ErrMissingClusterIndex
}
clusterInt, err := strconv.ParseUint(clusterPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}
shardPart := parts[1]
if len(shardPart) == 0 {
return ErrMissingShardNumber
}
shardInt, err := strconv.ParseUint(shardPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}
s.shard = uint16(shardInt)
s.cluster = uint16(clusterInt)
s.kind = StaticSharding
return nil
}
func ToShardedPubsubTopic(topic string) (ShardedPubsubTopic, error) {
if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
s := StaticShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
} else {
debug.PrintStack()
s := NamedShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
}
}
func DefaultPubsubTopic() ShardedPubsubTopic {
return NewNamedShardingPubsubTopic("default-waku/proto")
}

View File

@ -31,27 +31,41 @@ func TestContentTopic(t *testing.T) {
require.False(t, ct.Equal(ct3))
}
func TestTopic(t *testing.T) {
topic := NewPubsubTopic("test", "proto")
require.Equal(t, topic.String(), "/waku/2/test/proto")
func TestNsPubsubTopic(t *testing.T) {
ns1 := NewNamedShardingPubsubTopic("waku-dev")
require.Equal(t, "/waku/2/waku-dev", ns1.String())
_, err := StringToPubsubTopic("/waku/-1/a/b")
require.Error(t, ErrInvalidFormat, err)
ns2 := NewStaticShardingPubsubTopic(0, 2)
require.Equal(t, "/waku/2/rs/0/2", ns2.String())
_, err = StringToPubsubTopic("waku/2/a/b")
require.Error(t, ErrInvalidFormat, err)
require.True(t, ns1.Equal(ns1))
require.False(t, ns1.Equal(ns2))
_, err = StringToPubsubTopic("////")
require.Error(t, ErrInvalidFormat, err)
_, err = StringToPubsubTopic("/waku/2/a")
require.Error(t, ErrInvalidFormat, err)
topic2, err := StringToPubsubTopic("/waku/2/test/proto")
topic := "/waku/2/waku-dev"
ns, err := ToShardedPubsubTopic(topic)
require.NoError(t, err)
require.Equal(t, topic.String(), topic2.String())
require.True(t, topic.Equal(topic2))
require.Equal(t, NamedSharding, ns.Kind())
require.Equal(t, "waku-dev", ns.(NamedShardingPubsubTopic).Name())
topic3 := NewPubsubTopic("test2", "proto")
require.False(t, topic.Equal(topic3))
topic = "/waku/2/rs/16/42"
ns, err = ToShardedPubsubTopic(topic)
require.NoError(t, err)
require.Equal(t, StaticSharding, ns.Kind())
require.Equal(t, uint16(16), ns.(StaticShardingPubsubTopic).Cluster())
require.Equal(t, uint16(42), ns.(StaticShardingPubsubTopic).Shard())
topic = "/waku/1/rs/16/42"
_, err = ToShardedPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidTopicPrefix)
topic = "/waku/2/rs//02"
_, err = ToShardedPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrMissingClusterIndex)
topic = "/waku/2/rs/xx/77"
_, err = ToShardedPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidNumberFormat)
}