diff --git a/library/c/api.go b/library/c/api.go index aa311a65..81ba8f57 100644 --- a/library/c/api.go +++ b/library/c/api.go @@ -199,8 +199,8 @@ func waku_peer_cnt(onOkCb C.WakuCallBack, onErrCb C.WakuCallBack) C.int { // //export waku_content_topic func waku_content_topic(applicationName *C.char, applicationVersion C.uint, contentTopicName *C.char, encoding *C.char, onOkCb C.WakuCallBack) C.int { - contentTopic := protocol.NewContentTopic(C.GoString(applicationName), uint(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding)).String() - return execOkCB(onOkCb, contentTopic) + contentTopic, _ := protocol.NewContentTopic(C.GoString(applicationName), uint32(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding)) + return execOkCB(onOkCb, contentTopic.String()) } // Create a pubsub topic string according to RFC 23 diff --git a/library/mobile/api.go b/library/mobile/api.go index 5e4cacc6..5a43b436 100644 --- a/library/mobile/api.go +++ b/library/mobile/api.go @@ -74,7 +74,8 @@ func PeerCnt() string { // ContentTopic creates a content topic string according to RFC 23 func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string { - return protocol.NewContentTopic(applicationName, uint(applicationVersion), contentTopicName, encoding).String() + contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding) + return contentTopic.String() } // PubsubTopic creates a pubsub topic string according to RFC 23 diff --git a/library/node.go b/library/node.go index 27ab9b05..587ffc87 100644 --- a/library/node.go +++ b/library/node.go @@ -336,7 +336,8 @@ func PeerCnt() (int, error) { // ContentTopic creates a content topic string according to RFC 23 func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string { - return protocol.NewContentTopic(applicationName, uint(applicationVersion), contentTopicName, encoding).String() + contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding) + return contentTopic.String() } // PubsubTopic creates a pubsub topic string according to RFC 23 diff --git a/waku/v2/protocol/content_topic.go b/waku/v2/protocol/content_topic.go new file mode 100644 index 00000000..38caed6d --- /dev/null +++ b/waku/v2/protocol/content_topic.go @@ -0,0 +1,127 @@ +package protocol + +import ( + "errors" + "fmt" + "strconv" + "strings" +) + +// DefaultContentTopic is the default content topic used in Waku network if no content topic is specified. +const DefaultContentTopic = "/waku/2/default-content/proto" + +var ErrInvalidFormat = errors.New("invalid format") +var ErrMissingGeneration = errors.New("missing part: generation") +var ErrInvalidGeneration = errors.New("generation should be a number") + +// ContentTopic is used for content based. +type ContentTopic struct { + ContentTopicParams + ApplicationName string + ApplicationVersion uint32 + ContentTopicName string + Encoding string +} + +// ContentTopicParams contains all the optional params for a content topic +type ContentTopicParams struct { + Generation int +} + +// Equal method used to compare 2 contentTopicParams +func (ctp ContentTopicParams) Equal(ctp2 ContentTopicParams) bool { + return ctp.Generation == ctp2.Generation +} + +// ContentTopicOption is following the options pattern to define optional params +type ContentTopicOption func(*ContentTopicParams) + +// String formats a content topic in string format as per RFC 23. +func (ct ContentTopic) String() string { + return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding) +} + +// NewContentTopic creates a new content topic based on params specified. +// Returns ErrInvalidGeneration if an unsupported generation is specified. +func NewContentTopic(applicationName string, applicationVersion uint32, + contentTopicName string, encoding string, opts ...ContentTopicOption) (ContentTopic, error) { + + params := new(ContentTopicParams) + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + if params.Generation > 0 { + return ContentTopic{}, ErrInvalidGeneration + } + return ContentTopic{ + ContentTopicParams: *params, + ApplicationName: applicationName, + ApplicationVersion: applicationVersion, + ContentTopicName: contentTopicName, + Encoding: encoding, + }, nil +} + +// WithGeneration option can be used to specify explicitly a generation for contentTopic +func WithGeneration(generation int) ContentTopicOption { + return func(params *ContentTopicParams) { + params.Generation = generation + } +} + +// DefaultOptions sets default values for contentTopic optional params. +func DefaultOptions() []ContentTopicOption { + return []ContentTopicOption{ + WithGeneration(0), + } +} + +// Equal to compare 2 content topics. +func (ct ContentTopic) Equal(ct2 ContentTopic) bool { + return ct.ApplicationName == ct2.ApplicationName && ct.ApplicationVersion == ct2.ApplicationVersion && + ct.ContentTopicName == ct2.ContentTopicName && ct.Encoding == ct2.Encoding && + ct.ContentTopicParams.Equal(ct2.ContentTopicParams) +} + +// StringToContentTopic can be used to create a ContentTopic object from a string +func StringToContentTopic(s string) (ContentTopic, error) { + p := strings.Split(s, "/") + switch len(p) { + case 5: + vNum, err := strconv.ParseUint(p[2], 10, 32) + if err != nil { + return ContentTopic{}, ErrInvalidFormat + } + + return ContentTopic{ + ApplicationName: p[1], + ApplicationVersion: uint32(vNum), + ContentTopicName: p[3], + Encoding: p[4], + }, nil + case 6: + if len(p[1]) == 0 { + return ContentTopic{}, ErrMissingGeneration + } + generation, err := strconv.Atoi(p[1]) + if err != nil || generation > 0 { + return ContentTopic{}, ErrInvalidGeneration + } + vNum, err := strconv.ParseUint(p[3], 10, 32) + if err != nil { + return ContentTopic{}, ErrInvalidFormat + } + + return ContentTopic{ + ContentTopicParams: ContentTopicParams{Generation: generation}, + ApplicationName: p[2], + ApplicationVersion: uint32(vNum), + ContentTopicName: p[4], + Encoding: p[5], + }, nil + default: + return ContentTopic{}, ErrInvalidFormat + } +} diff --git a/waku/v2/protocol/topic.go b/waku/v2/protocol/pubsub_topic.go similarity index 64% rename from waku/v2/protocol/topic.go rename to waku/v2/protocol/pubsub_topic.go index dd16c136..9ed1f030 100644 --- a/waku/v2/protocol/topic.go +++ b/waku/v2/protocol/pubsub_topic.go @@ -10,7 +10,6 @@ import ( const Waku2PubsubTopicPrefix = "/waku/2" const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs" -var ErrInvalidFormat = errors.New("invalid format") var ErrInvalidStructure = errors.New("invalid topic structure") var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix) var ErrMissingTopicName = errors.New("missing topic-name") @@ -19,51 +18,7 @@ var ErrMissingClusterIndex = errors.New("missing shard_cluster_index") var ErrMissingShardNumber = errors.New("missing shard_number") var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed") -type ContentTopic struct { - ApplicationName string - ApplicationVersion uint - ContentTopicName string - Encoding string -} - -func (ct ContentTopic) String() string { - return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding) -} - -func NewContentTopic(applicationName string, applicationVersion uint, contentTopicName string, encoding string) ContentTopic { - return ContentTopic{ - ApplicationName: applicationName, - ApplicationVersion: applicationVersion, - ContentTopicName: contentTopicName, - Encoding: encoding, - } -} - -func (ct ContentTopic) Equal(ct2 ContentTopic) bool { - return ct.ApplicationName == ct2.ApplicationName && ct.ApplicationVersion == ct2.ApplicationVersion && - ct.ContentTopicName == ct2.ContentTopicName && ct.Encoding == ct2.Encoding -} - -func StringToContentTopic(s string) (ContentTopic, error) { - p := strings.Split(s, "/") - - if len(p) != 5 || p[0] != "" || p[1] == "" || p[2] == "" || p[3] == "" || p[4] == "" { - return ContentTopic{}, ErrInvalidFormat - } - - vNum, err := strconv.ParseUint(p[2], 10, 32) - if err != nil { - return ContentTopic{}, ErrInvalidFormat - } - - return ContentTopic{ - ApplicationName: p[1], - ApplicationVersion: uint(vNum), - ContentTopicName: p[3], - Encoding: p[4], - }, nil -} - +// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind type NamespacedPubsubTopicKind int const ( @@ -71,18 +26,21 @@ const ( NamedSharding ) +// NamespacedPubsubTopic is an interface for namespace based pubSub topic type NamespacedPubsubTopic interface { String() string Kind() NamespacedPubsubTopicKind Equal(NamespacedPubsubTopic) bool } +// NamedShardingPubsubTopic is object for a NamedSharding type pubSub topic type NamedShardingPubsubTopic struct { NamespacedPubsubTopic kind NamespacedPubsubTopicKind name string } +// NewNamedShardingPubsubTopic creates a new NamedShardingPubSubTopic func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic { return NamedShardingPubsubTopic{ kind: NamedSharding, @@ -90,23 +48,28 @@ func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic { } } +// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { return n.kind } +// Name is the name of the NamedSharded pubsub topic. func (n NamedShardingPubsubTopic) Name() string { return n.name } -func (s NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool { - return s.String() == t2.String() +// Equal compares NamedShardingPubsubTopic +func (n NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool { + return n.String() == t2.String() } +// String formats NamedShardingPubsubTopic to RFC 23 specific string format for pubsub topic. func (n NamedShardingPubsubTopic) String() string { return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name) } -func (s *NamedShardingPubsubTopic) Parse(topic string) error { +// Parse parses a topic string into a NamedShardingPubsubTopic +func (n *NamedShardingPubsubTopic) Parse(topic string) error { if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) { return ErrInvalidTopicPrefix } @@ -116,12 +79,13 @@ func (s *NamedShardingPubsubTopic) Parse(topic string) error { return ErrMissingTopicName } - s.kind = NamedSharding - s.name = topicName + n.kind = NamedSharding + n.name = topicName return nil } +// StaticShardingPubsubTopic describes a pubSub topic as per StaticSharding type StaticShardingPubsubTopic struct { NamespacedPubsubTopic kind NamespacedPubsubTopicKind @@ -129,6 +93,7 @@ type StaticShardingPubsubTopic struct { shard uint16 } +// NewStaticShardingPubsubTopic creates a new pubSub topic func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) NamespacedPubsubTopic { return StaticShardingPubsubTopic{ kind: StaticSharding, @@ -137,26 +102,32 @@ func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) NamespacedPubsub } } -func (n StaticShardingPubsubTopic) Cluster() uint16 { - return n.cluster +// Cluster returns the sharded cluster index +func (s StaticShardingPubsubTopic) Cluster() uint16 { + return s.cluster } -func (n StaticShardingPubsubTopic) Shard() uint16 { - return n.shard +// Cluster returns the shard number +func (s StaticShardingPubsubTopic) Shard() uint16 { + return s.shard } -func (n StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { - return n.kind +// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded +func (s StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind { + return s.kind } +// Equal compares StaticShardingPubsubTopic func (s StaticShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool { return s.String() == t2.String() } -func (n StaticShardingPubsubTopic) String() string { - return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, n.cluster, n.shard) +// String formats StaticShardingPubsubTopic to RFC 23 specific string format for pubsub topic. +func (s StaticShardingPubsubTopic) String() string { + return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, s.cluster, s.shard) } +// Parse parses a topic string into a StaticShardingPubsubTopic func (s *StaticShardingPubsubTopic) Parse(topic string) error { if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { return ErrInvalidShardedTopicPrefix @@ -194,6 +165,7 @@ func (s *StaticShardingPubsubTopic) Parse(topic string) error { return nil } +// ToShardedPubsubTopic takes a pubSub topic string and creates a NamespacedPubsubTopic object. func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) { if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) { s := StaticShardingPubsubTopic{} @@ -212,6 +184,7 @@ func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) { } } +// DefaultPubsubTopic is the default pubSub topic used in waku func DefaultPubsubTopic() NamespacedPubsubTopic { return NewNamedShardingPubsubTopic("default-waku/proto") } diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go index 4795524b..e98496ac 100644 --- a/waku/v2/protocol/shard.go +++ b/waku/v2/protocol/shard.go @@ -6,10 +6,19 @@ import ( "fmt" "math" "strings" + + "github.com/waku-org/go-waku/waku/v2/hash" ) const MaxShardIndex = uint16(1023) +// ClusterIndex is the clusterID used in sharding space. +// For indices allocation and other magic numbers refer to RFC 51 +const ClusterIndex = 1 + +// GenerationZeroShardsCount is number of shards supported in generation-0 +const GenerationZeroShardsCount = 8 + type RelayShards struct { Cluster uint16 Indices []uint16 @@ -205,3 +214,18 @@ func FromBitVector(buf []byte) (RelayShards, error) { return RelayShards{Cluster: cluster, Indices: indices}, nil } + +// GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic +// This is based on Autosharding algorithm defined in RFC 51 +func GetShardFromContentTopic(topic ContentTopic, shardCount int) NamespacedPubsubTopic { + bytes := []byte(topic.ApplicationName) + bytes = append(bytes, []byte(fmt.Sprintf("%d", topic.ApplicationVersion))...) + + hash := hash.SHA256(bytes) + //We only use the last 64 bits of the hash as having more shards is unlikely. + hashValue := binary.BigEndian.Uint64(hash[24:]) + + shard := hashValue % uint64(shardCount) + + return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard)) +} diff --git a/waku/v2/protocol/store/waku_store_pagination_test.go b/waku/v2/protocol/store/waku_store_pagination_test.go index 73e54e46..673b75d3 100644 --- a/waku/v2/protocol/store/waku_store_pagination_test.go +++ b/waku/v2/protocol/store/waku_store_pagination_test.go @@ -27,14 +27,14 @@ func TestIndexComputation(t *testing.T) { msg1 := &wpb.WakuMessage{ Payload: []byte{1, 2, 3}, Timestamp: 123, - ContentTopic: "/waku/2/default-content/proto", + ContentTopic: protocol.DefaultContentTopic, } idx1 := protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), "test").Index() msg2 := &wpb.WakuMessage{ Payload: []byte{1, 2, 3}, Timestamp: 123, - ContentTopic: "/waku/2/default-content/proto", + ContentTopic: protocol.DefaultContentTopic, } idx2 := protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), "test").Index() diff --git a/waku/v2/protocol/topic_test.go b/waku/v2/protocol/topic_test.go index bdc2aa4f..9b5dbe9b 100644 --- a/waku/v2/protocol/topic_test.go +++ b/waku/v2/protocol/topic_test.go @@ -6,11 +6,12 @@ import ( "github.com/stretchr/testify/require" ) -func TestContentTopic(t *testing.T) { - ct := NewContentTopic("waku", 2, "test", "proto") +func TestContentTopicAndSharding(t *testing.T) { + ct, err := NewContentTopic("waku", 2, "test", "proto") + require.NoError(t, err) require.Equal(t, ct.String(), "/waku/2/test/proto") - _, err := StringToContentTopic("/waku/-1/a/b") + _, err = StringToContentTopic("/waku/-1/a/b") require.Error(t, ErrInvalidFormat, err) _, err = StringToContentTopic("waku/1/a/b") @@ -27,8 +28,29 @@ func TestContentTopic(t *testing.T) { require.Equal(t, ct.String(), ct2.String()) require.True(t, ct.Equal(ct2)) - ct3 := NewContentTopic("waku", 2, "test2", "proto") + ct3, err := NewContentTopic("waku", 2, "test2", "proto") + require.NoError(t, err) require.False(t, ct.Equal(ct3)) + + ct4, err := StringToContentTopic("/0/toychat/2/huilong/proto") + require.NoError(t, err) + require.Equal(t, ct4.Generation, 0) + + ct6, err := StringToContentTopic("/toychat/2/huilong/proto") + require.NoError(t, err) + + nsPubSubT1 := GetShardFromContentTopic(ct6, GenerationZeroShardsCount) + require.Equal(t, NewStaticShardingPubsubTopic(ClusterIndex, 3), nsPubSubT1) + + _, err = StringToContentTopic("/abc/toychat/2/huilong/proto") + require.Error(t, ErrInvalidGeneration, err) + + _, err = StringToContentTopic("/1/toychat/2/huilong/proto") + require.Error(t, ErrInvalidGeneration, err) + + ct5, err := NewContentTopic("waku", 2, "test2", "proto", WithGeneration(0)) + require.NoError(t, err) + require.Equal(t, ct5.Generation, 0) } func TestNsPubsubTopic(t *testing.T) {