feat: autosharding core logic (#669)

* feat: autosharding core logic
This commit is contained in:
Prem Chaitanya Prathi 2023-08-25 09:55:38 +05:30 committed by GitHub
parent cb3f5da322
commit 44d3ef6d78
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 217 additions and 69 deletions

View File

@ -199,8 +199,8 @@ func waku_peer_cnt(onOkCb C.WakuCallBack, onErrCb C.WakuCallBack) C.int {
//
//export waku_content_topic
func waku_content_topic(applicationName *C.char, applicationVersion C.uint, contentTopicName *C.char, encoding *C.char, onOkCb C.WakuCallBack) C.int {
contentTopic := protocol.NewContentTopic(C.GoString(applicationName), uint(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding)).String()
return execOkCB(onOkCb, contentTopic)
contentTopic, _ := protocol.NewContentTopic(C.GoString(applicationName), uint32(applicationVersion), C.GoString(contentTopicName), C.GoString(encoding))
return execOkCB(onOkCb, contentTopic.String())
}
// Create a pubsub topic string according to RFC 23

View File

@ -74,7 +74,8 @@ func PeerCnt() string {
// ContentTopic creates a content topic string according to RFC 23
func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string {
return protocol.NewContentTopic(applicationName, uint(applicationVersion), contentTopicName, encoding).String()
contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding)
return contentTopic.String()
}
// PubsubTopic creates a pubsub topic string according to RFC 23

View File

@ -336,7 +336,8 @@ func PeerCnt() (int, error) {
// ContentTopic creates a content topic string according to RFC 23
func ContentTopic(applicationName string, applicationVersion int, contentTopicName string, encoding string) string {
return protocol.NewContentTopic(applicationName, uint(applicationVersion), contentTopicName, encoding).String()
contentTopic, _ := protocol.NewContentTopic(applicationName, uint32(applicationVersion), contentTopicName, encoding)
return contentTopic.String()
}
// PubsubTopic creates a pubsub topic string according to RFC 23

View File

@ -0,0 +1,127 @@
package protocol
import (
"errors"
"fmt"
"strconv"
"strings"
)
// DefaultContentTopic is the default content topic used in Waku network if no content topic is specified.
const DefaultContentTopic = "/waku/2/default-content/proto"
var ErrInvalidFormat = errors.New("invalid format")
var ErrMissingGeneration = errors.New("missing part: generation")
var ErrInvalidGeneration = errors.New("generation should be a number")
// ContentTopic is used for content based.
type ContentTopic struct {
ContentTopicParams
ApplicationName string
ApplicationVersion uint32
ContentTopicName string
Encoding string
}
// ContentTopicParams contains all the optional params for a content topic
type ContentTopicParams struct {
Generation int
}
// Equal method used to compare 2 contentTopicParams
func (ctp ContentTopicParams) Equal(ctp2 ContentTopicParams) bool {
return ctp.Generation == ctp2.Generation
}
// ContentTopicOption is following the options pattern to define optional params
type ContentTopicOption func(*ContentTopicParams)
// String formats a content topic in string format as per RFC 23.
func (ct ContentTopic) String() string {
return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding)
}
// NewContentTopic creates a new content topic based on params specified.
// Returns ErrInvalidGeneration if an unsupported generation is specified.
func NewContentTopic(applicationName string, applicationVersion uint32,
contentTopicName string, encoding string, opts ...ContentTopicOption) (ContentTopic, error) {
params := new(ContentTopicParams)
optList := DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if params.Generation > 0 {
return ContentTopic{}, ErrInvalidGeneration
}
return ContentTopic{
ContentTopicParams: *params,
ApplicationName: applicationName,
ApplicationVersion: applicationVersion,
ContentTopicName: contentTopicName,
Encoding: encoding,
}, nil
}
// WithGeneration option can be used to specify explicitly a generation for contentTopic
func WithGeneration(generation int) ContentTopicOption {
return func(params *ContentTopicParams) {
params.Generation = generation
}
}
// DefaultOptions sets default values for contentTopic optional params.
func DefaultOptions() []ContentTopicOption {
return []ContentTopicOption{
WithGeneration(0),
}
}
// Equal to compare 2 content topics.
func (ct ContentTopic) Equal(ct2 ContentTopic) bool {
return ct.ApplicationName == ct2.ApplicationName && ct.ApplicationVersion == ct2.ApplicationVersion &&
ct.ContentTopicName == ct2.ContentTopicName && ct.Encoding == ct2.Encoding &&
ct.ContentTopicParams.Equal(ct2.ContentTopicParams)
}
// StringToContentTopic can be used to create a ContentTopic object from a string
func StringToContentTopic(s string) (ContentTopic, error) {
p := strings.Split(s, "/")
switch len(p) {
case 5:
vNum, err := strconv.ParseUint(p[2], 10, 32)
if err != nil {
return ContentTopic{}, ErrInvalidFormat
}
return ContentTopic{
ApplicationName: p[1],
ApplicationVersion: uint32(vNum),
ContentTopicName: p[3],
Encoding: p[4],
}, nil
case 6:
if len(p[1]) == 0 {
return ContentTopic{}, ErrMissingGeneration
}
generation, err := strconv.Atoi(p[1])
if err != nil || generation > 0 {
return ContentTopic{}, ErrInvalidGeneration
}
vNum, err := strconv.ParseUint(p[3], 10, 32)
if err != nil {
return ContentTopic{}, ErrInvalidFormat
}
return ContentTopic{
ContentTopicParams: ContentTopicParams{Generation: generation},
ApplicationName: p[2],
ApplicationVersion: uint32(vNum),
ContentTopicName: p[4],
Encoding: p[5],
}, nil
default:
return ContentTopic{}, ErrInvalidFormat
}
}

View File

@ -10,7 +10,6 @@ import (
const Waku2PubsubTopicPrefix = "/waku/2"
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"
var ErrInvalidFormat = errors.New("invalid format")
var ErrInvalidStructure = errors.New("invalid topic structure")
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
var ErrMissingTopicName = errors.New("missing topic-name")
@ -19,51 +18,7 @@ var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")
type ContentTopic struct {
ApplicationName string
ApplicationVersion uint
ContentTopicName string
Encoding string
}
func (ct ContentTopic) String() string {
return fmt.Sprintf("/%s/%d/%s/%s", ct.ApplicationName, ct.ApplicationVersion, ct.ContentTopicName, ct.Encoding)
}
func NewContentTopic(applicationName string, applicationVersion uint, contentTopicName string, encoding string) ContentTopic {
return ContentTopic{
ApplicationName: applicationName,
ApplicationVersion: applicationVersion,
ContentTopicName: contentTopicName,
Encoding: encoding,
}
}
func (ct ContentTopic) Equal(ct2 ContentTopic) bool {
return ct.ApplicationName == ct2.ApplicationName && ct.ApplicationVersion == ct2.ApplicationVersion &&
ct.ContentTopicName == ct2.ContentTopicName && ct.Encoding == ct2.Encoding
}
func StringToContentTopic(s string) (ContentTopic, error) {
p := strings.Split(s, "/")
if len(p) != 5 || p[0] != "" || p[1] == "" || p[2] == "" || p[3] == "" || p[4] == "" {
return ContentTopic{}, ErrInvalidFormat
}
vNum, err := strconv.ParseUint(p[2], 10, 32)
if err != nil {
return ContentTopic{}, ErrInvalidFormat
}
return ContentTopic{
ApplicationName: p[1],
ApplicationVersion: uint(vNum),
ContentTopicName: p[3],
Encoding: p[4],
}, nil
}
// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind
type NamespacedPubsubTopicKind int
const (
@ -71,18 +26,21 @@ const (
NamedSharding
)
// NamespacedPubsubTopic is an interface for namespace based pubSub topic
type NamespacedPubsubTopic interface {
String() string
Kind() NamespacedPubsubTopicKind
Equal(NamespacedPubsubTopic) bool
}
// NamedShardingPubsubTopic is object for a NamedSharding type pubSub topic
type NamedShardingPubsubTopic struct {
NamespacedPubsubTopic
kind NamespacedPubsubTopicKind
name string
}
// NewNamedShardingPubsubTopic creates a new NamedShardingPubSubTopic
func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic {
return NamedShardingPubsubTopic{
kind: NamedSharding,
@ -90,23 +48,28 @@ func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic {
}
}
// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}
// Name is the name of the NamedSharded pubsub topic.
func (n NamedShardingPubsubTopic) Name() string {
return n.name
}
func (s NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
return s.String() == t2.String()
// Equal compares NamedShardingPubsubTopic
func (n NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
return n.String() == t2.String()
}
// String formats NamedShardingPubsubTopic to RFC 23 specific string format for pubsub topic.
func (n NamedShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name)
}
func (s *NamedShardingPubsubTopic) Parse(topic string) error {
// Parse parses a topic string into a NamedShardingPubsubTopic
func (n *NamedShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) {
return ErrInvalidTopicPrefix
}
@ -116,12 +79,13 @@ func (s *NamedShardingPubsubTopic) Parse(topic string) error {
return ErrMissingTopicName
}
s.kind = NamedSharding
s.name = topicName
n.kind = NamedSharding
n.name = topicName
return nil
}
// StaticShardingPubsubTopic describes a pubSub topic as per StaticSharding
type StaticShardingPubsubTopic struct {
NamespacedPubsubTopic
kind NamespacedPubsubTopicKind
@ -129,6 +93,7 @@ type StaticShardingPubsubTopic struct {
shard uint16
}
// NewStaticShardingPubsubTopic creates a new pubSub topic
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) NamespacedPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
@ -137,26 +102,32 @@ func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) NamespacedPubsub
}
}
func (n StaticShardingPubsubTopic) Cluster() uint16 {
return n.cluster
// Cluster returns the sharded cluster index
func (s StaticShardingPubsubTopic) Cluster() uint16 {
return s.cluster
}
func (n StaticShardingPubsubTopic) Shard() uint16 {
return n.shard
// Cluster returns the shard number
func (s StaticShardingPubsubTopic) Shard() uint16 {
return s.shard
}
func (n StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
func (s StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return s.kind
}
// Equal compares StaticShardingPubsubTopic
func (s StaticShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
return s.String() == t2.String()
}
func (n StaticShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, n.cluster, n.shard)
// String formats StaticShardingPubsubTopic to RFC 23 specific string format for pubsub topic.
func (s StaticShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%d/%d", StaticShardingPubsubTopicPrefix, s.cluster, s.shard)
}
// Parse parses a topic string into a StaticShardingPubsubTopic
func (s *StaticShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
return ErrInvalidShardedTopicPrefix
@ -194,6 +165,7 @@ func (s *StaticShardingPubsubTopic) Parse(topic string) error {
return nil
}
// ToShardedPubsubTopic takes a pubSub topic string and creates a NamespacedPubsubTopic object.
func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) {
if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
s := StaticShardingPubsubTopic{}
@ -212,6 +184,7 @@ func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) {
}
}
// DefaultPubsubTopic is the default pubSub topic used in waku
func DefaultPubsubTopic() NamespacedPubsubTopic {
return NewNamedShardingPubsubTopic("default-waku/proto")
}

View File

@ -6,10 +6,19 @@ import (
"fmt"
"math"
"strings"
"github.com/waku-org/go-waku/waku/v2/hash"
)
const MaxShardIndex = uint16(1023)
// ClusterIndex is the clusterID used in sharding space.
// For indices allocation and other magic numbers refer to RFC 51
const ClusterIndex = 1
// GenerationZeroShardsCount is number of shards supported in generation-0
const GenerationZeroShardsCount = 8
type RelayShards struct {
Cluster uint16
Indices []uint16
@ -205,3 +214,18 @@ func FromBitVector(buf []byte) (RelayShards, error) {
return RelayShards{Cluster: cluster, Indices: indices}, nil
}
// GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic
// This is based on Autosharding algorithm defined in RFC 51
func GetShardFromContentTopic(topic ContentTopic, shardCount int) NamespacedPubsubTopic {
bytes := []byte(topic.ApplicationName)
bytes = append(bytes, []byte(fmt.Sprintf("%d", topic.ApplicationVersion))...)
hash := hash.SHA256(bytes)
//We only use the last 64 bits of the hash as having more shards is unlikely.
hashValue := binary.BigEndian.Uint64(hash[24:])
shard := hashValue % uint64(shardCount)
return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard))
}

View File

@ -27,14 +27,14 @@ func TestIndexComputation(t *testing.T) {
msg1 := &wpb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: 123,
ContentTopic: "/waku/2/default-content/proto",
ContentTopic: protocol.DefaultContentTopic,
}
idx1 := protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), "test").Index()
msg2 := &wpb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: 123,
ContentTopic: "/waku/2/default-content/proto",
ContentTopic: protocol.DefaultContentTopic,
}
idx2 := protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), "test").Index()

View File

@ -6,11 +6,12 @@ import (
"github.com/stretchr/testify/require"
)
func TestContentTopic(t *testing.T) {
ct := NewContentTopic("waku", 2, "test", "proto")
func TestContentTopicAndSharding(t *testing.T) {
ct, err := NewContentTopic("waku", 2, "test", "proto")
require.NoError(t, err)
require.Equal(t, ct.String(), "/waku/2/test/proto")
_, err := StringToContentTopic("/waku/-1/a/b")
_, err = StringToContentTopic("/waku/-1/a/b")
require.Error(t, ErrInvalidFormat, err)
_, err = StringToContentTopic("waku/1/a/b")
@ -27,8 +28,29 @@ func TestContentTopic(t *testing.T) {
require.Equal(t, ct.String(), ct2.String())
require.True(t, ct.Equal(ct2))
ct3 := NewContentTopic("waku", 2, "test2", "proto")
ct3, err := NewContentTopic("waku", 2, "test2", "proto")
require.NoError(t, err)
require.False(t, ct.Equal(ct3))
ct4, err := StringToContentTopic("/0/toychat/2/huilong/proto")
require.NoError(t, err)
require.Equal(t, ct4.Generation, 0)
ct6, err := StringToContentTopic("/toychat/2/huilong/proto")
require.NoError(t, err)
nsPubSubT1 := GetShardFromContentTopic(ct6, GenerationZeroShardsCount)
require.Equal(t, NewStaticShardingPubsubTopic(ClusterIndex, 3), nsPubSubT1)
_, err = StringToContentTopic("/abc/toychat/2/huilong/proto")
require.Error(t, ErrInvalidGeneration, err)
_, err = StringToContentTopic("/1/toychat/2/huilong/proto")
require.Error(t, ErrInvalidGeneration, err)
ct5, err := NewContentTopic("waku", 2, "test2", "proto", WithGeneration(0))
require.NoError(t, err)
require.Equal(t, ct5.Generation, 0)
}
func TestNsPubsubTopic(t *testing.T) {