go-waku/waku/v2/protocol/pubsub_topic.go

199 lines
5.5 KiB
Go
Raw Normal View History

2021-11-10 14:28:45 +00:00
package protocol
import (
"errors"
"fmt"
"strconv"
"strings"
)
// Waku2PubsubTopicPrefix is the expected prefix to be used for pubsub topics
2023-03-07 22:11:52 +00:00
const Waku2PubsubTopicPrefix = "/waku/2"
// StaticShardingPubsubTopicPrefix is the expected prefix to be used for static sharding pubsub topics
2023-03-07 22:11:52 +00:00
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"
// ErrInvalidStructure indicates that the pubsub topic is malformed
2023-03-07 22:11:52 +00:00
var ErrInvalidStructure = errors.New("invalid topic structure")
// ErrInvalidTopicPrefix indicates that the pubsub topic is missing the prefix /waku/2
2023-03-07 22:11:52 +00:00
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
var ErrMissingTopicName = errors.New("missing topic-name")
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")
// ErrInvalidNumberFormat indicates that a number exceeds the allowed range
2023-03-07 22:11:52 +00:00
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")
2021-11-10 14:28:45 +00:00
// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind
2023-03-07 22:11:52 +00:00
type NamespacedPubsubTopicKind int
const (
StaticSharding NamespacedPubsubTopicKind = iota
NamedSharding
)
// NamespacedPubsubTopic is an interface for namespace based pubSub topic
2023-04-20 18:51:13 +00:00
type NamespacedPubsubTopic interface {
2023-03-07 22:11:52 +00:00
String() string
Kind() NamespacedPubsubTopicKind
2023-04-20 18:51:13 +00:00
Equal(NamespacedPubsubTopic) bool
2023-03-07 22:11:52 +00:00
}
// NamedShardingPubsubTopic is object for a NamedSharding type pubSub topic
2023-03-07 22:11:52 +00:00
type NamedShardingPubsubTopic struct {
2023-04-20 18:51:13 +00:00
NamespacedPubsubTopic
2023-03-07 22:11:52 +00:00
kind NamespacedPubsubTopicKind
name string
}
// NewNamedShardingPubsubTopic creates a new NamedShardingPubSubTopic
2023-04-20 18:51:13 +00:00
func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic {
2023-03-07 22:11:52 +00:00
return NamedShardingPubsubTopic{
kind: NamedSharding,
name: name,
}
}
// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
2023-03-07 22:11:52 +00:00
func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}
// Name is the name of the NamedSharded pubsub topic.
2023-03-07 22:11:52 +00:00
func (n NamedShardingPubsubTopic) Name() string {
return n.name
2021-11-10 14:28:45 +00:00
}
// Equal compares NamedShardingPubsubTopic
func (n NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
return n.String() == t2.String()
2021-11-10 14:28:45 +00:00
}
// String formats NamedShardingPubsubTopic to RFC 23 specific string format for pubsub topic.
2023-03-07 22:11:52 +00:00
func (n NamedShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name)
2021-11-10 14:28:45 +00:00
}
// Parse parses a topic string into a NamedShardingPubsubTopic
func (n *NamedShardingPubsubTopic) Parse(topic string) error {
2023-03-07 22:11:52 +00:00
if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) {
return ErrInvalidTopicPrefix
2021-11-10 14:28:45 +00:00
}
2023-03-07 22:11:52 +00:00
topicName := topic[8:]
if len(topicName) == 0 {
return ErrMissingTopicName
}
n.kind = NamedSharding
n.name = topicName
2023-03-07 22:11:52 +00:00
return nil
2021-11-10 14:28:45 +00:00
}
// StaticShardingPubsubTopic describes a pubSub topic as per StaticSharding
2023-03-07 22:11:52 +00:00
type StaticShardingPubsubTopic struct {
2023-04-20 18:51:13 +00:00
NamespacedPubsubTopic
2023-03-07 22:11:52 +00:00
kind NamespacedPubsubTopicKind
cluster uint16
shard uint16
2021-11-10 14:28:45 +00:00
}
// NewStaticShardingPubsubTopic creates a new pubSub topic
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic {
2023-03-07 22:11:52 +00:00
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,
shard: shard,
2021-11-10 14:28:45 +00:00
}
2023-03-07 22:11:52 +00:00
}
2021-11-10 14:28:45 +00:00
// Cluster returns the sharded cluster index
func (s StaticShardingPubsubTopic) Cluster() uint16 {
return s.cluster
2023-03-07 22:11:52 +00:00
}
// Shard returns the shard number
func (s StaticShardingPubsubTopic) Shard() uint16 {
return s.shard
2023-03-07 22:11:52 +00:00
}
// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
func (s StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return s.kind
2023-03-07 22:11:52 +00:00
}
// Equal compares StaticShardingPubsubTopic
2023-04-20 18:51:13 +00:00
func (s StaticShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
2023-03-07 22:11:52 +00:00
return s.String() == t2.String()
}
// String formats StaticShardingPubsubTopic to RFC 23 specific string format for pubsub topic.
func (s StaticShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, s.cluster, s.shard)
2023-03-07 22:11:52 +00:00
}
// Parse parses a topic string into a StaticShardingPubsubTopic
2023-03-07 22:11:52 +00:00
func (s *StaticShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
return ErrInvalidShardedTopicPrefix
}
parts := strings.Split(topic[11:], "/")
if len(parts) != 2 {
return ErrInvalidStructure
}
clusterPart := parts[0]
if len(clusterPart) == 0 {
return ErrMissingClusterIndex
}
clusterInt, err := strconv.ParseUint(clusterPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}
shardPart := parts[1]
if len(shardPart) == 0 {
return ErrMissingShardNumber
}
shardInt, err := strconv.ParseUint(shardPart, 10, 16)
if err != nil {
return ErrInvalidNumberFormat
}
s.shard = uint16(shardInt)
s.cluster = uint16(clusterInt)
s.kind = StaticSharding
return nil
}
// ToShardedPubsubTopic takes a pubSub topic string and creates a NamespacedPubsubTopic object.
2023-04-20 18:51:13 +00:00
func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) {
2023-03-07 22:11:52 +00:00
if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
s := StaticShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
}
s := NamedShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
2023-03-07 22:11:52 +00:00
}
// DefaultPubsubTopic is the default pubSub topic used in waku
2023-04-20 18:51:13 +00:00
func DefaultPubsubTopic() NamespacedPubsubTopic {
2023-03-07 22:11:52 +00:00
return NewNamedShardingPubsubTopic("default-waku/proto")
2021-11-10 14:28:45 +00:00
}