feat: remove named topic (#844)

* feat: remove named topic

* fix: update examples

* Update library/mobile/api.go
This commit is contained in:
harsh jain 2023-10-30 21:56:26 +07:00 committed by GitHub
parent 279752344f
commit ddf188bbf8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 102 additions and 201 deletions

View File

@ -22,12 +22,3 @@ fun ContentTopic(applicationName: String, applicationVersion: Long, contentTopic
return Gowaku.contentTopic(applicationName, applicationVersion, contentTopicName, encoding)
}
/**
* Create a pubsub topic string
* @param name
* @param encoding
* @return Pubsub topic string according to RFC 23
*/
fun PubsubTopic(name: String, encoding: String): String {
return Gowaku.pubsubTopic(name, encoding)
}

View File

@ -24,7 +24,7 @@ import (
var log = logging.Logger("filter2")
var pubSubTopic = protocol.DefaultPubsubTopic()
var pubSubTopic = protocol.DefaultPubsubTopic{}
const contentTopic = "/filter2test/1/testTopic/proto"

View File

@ -15,7 +15,7 @@ const credentialsPath = ""
const credentialsPassword = ""
var contentTopic = protocol.NewContentTopic("rln", 1, "test", "proto").String()
var pubsubTopic = protocol.DefaultPubsubTopic()
var pubsubTopic = protocol.DefaultPubsubTopic{}
```
The private key used here should contain enough Sepolia ETH to register on the contract (0.001 ETH). An ethereum client address is required as well. After updating these values, execute `make`

View File

@ -31,7 +31,7 @@ var keystorePath = "./rlnKeystore.json"
var keystorePassword = "password"
var membershipIndex = uint(0)
var contentTopic, _ = protocol.NewContentTopic("rln", 1, "test", "proto")
var pubsubTopic = protocol.DefaultPubsubTopic()
var pubsubTopic = protocol.DefaultPubsubTopic{}
// ============================================================================

View File

@ -34,19 +34,5 @@ namespace Waku
return Response.PtrToStringUtf8(ptr);
}
[DllImport(Constants.dllName)]
internal static extern IntPtr waku_pubsub_topic(string name, string encoding);
/// <summary>
/// Create a pubsub topic string
/// </summary>
/// <param name="name"></param>
/// <param name="encoding"></param>
/// <returns>Pubsub topic string according to RFC 23</returns>
public static string PubsubTopic(string name, string encoding)
{
IntPtr ptr = waku_pubsub_topic(name, encoding);
return Response.PtrToStringUtf8(ptr);
}
}
}

View File

@ -199,13 +199,6 @@ func waku_content_topic(applicationName *C.char, applicationVersion C.uint, cont
return onSuccesfulResponse(contentTopic.String(), cb, userData)
}
// Create a pubsub topic string according to RFC 23
//
//export waku_pubsub_topic
func waku_pubsub_topic(name *C.char, encoding *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int {
topic := library.PubsubTopic(C.GoString(name), C.GoString(encoding))
return onSuccesfulResponse(topic, cb, userData)
}
// Get the default pubsub topic used in waku2: /waku/2/default-waku/proto
//

View File

@ -78,14 +78,9 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
return contentTopic.String()
}
// PubsubTopic creates a pubsub topic string according to RFC 23
func PubsubTopic(name string, encoding string) string {
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}
// DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto
func DefaultPubsubTopic() string {
return protocol.DefaultPubsubTopic().String()
return library.DefaultPubsubTopic()
}
// Peers retrieves the list of peers known by the waku node

View File

@ -340,14 +340,9 @@ func ContentTopic(applicationName string, applicationVersion int, contentTopicNa
return contentTopic.String()
}
// PubsubTopic creates a pubsub topic string according to RFC 23
func PubsubTopic(name string, encoding string) string {
return protocol.NewNamedShardingPubsubTopic(name + "/" + encoding).String()
}
// DefaultPubsubTopic returns the default pubsub topic used in waku2: /waku/2/default-waku/proto
func DefaultPubsubTopic() string {
return protocol.DefaultPubsubTopic().String()
return protocol.DefaultPubsubTopic{}.String()
}
type subscriptionMsg struct {

View File

@ -16,7 +16,7 @@ func RelayEnoughPeers(topic string) (bool, error) {
return false, errWakuNodeNotReady
}
topicToCheck := protocol.DefaultPubsubTopic().String()
topicToCheck := protocol.DefaultPubsubTopic{}.String()
if topic != "" {
topicToCheck = topic
}

View File

@ -7,9 +7,6 @@ import (
"strings"
)
// DefaultContentTopic is the default content topic used in Waku network if no content topic is specified.
const DefaultContentTopic = "/waku/2/default-content/proto"
var ErrInvalidFormat = errors.New("invalid content topic format")
var ErrMissingGeneration = errors.New("missing part: generation")
var ErrInvalidGeneration = errors.New("generation should be a number")

View File

@ -122,22 +122,22 @@ func ContainsShard(record *enr.Record, cluster uint16, index uint16) bool {
return rs.Contains(cluster, index)
}
func ContainsShardWithNsTopic(record *enr.Record, topic protocol.NamespacedPubsubTopic) bool {
if topic.Kind() != protocol.StaticSharding {
func ContainsShardWithWakuTopic(record *enr.Record, topic protocol.WakuPubSubTopic) bool {
if shardTopic, err := protocol.ToShardPubsubTopic(topic); err != nil {
return false
}
shardTopic := topic.(protocol.StaticShardingPubsubTopic)
} else {
return ContainsShard(record, shardTopic.Cluster(), shardTopic.Shard())
}
}
func ContainsRelayShard(record *enr.Record, topic protocol.StaticShardingPubsubTopic) bool {
return ContainsShardWithNsTopic(record, topic)
return ContainsShardWithWakuTopic(record, topic)
}
func ContainsShardTopic(record *enr.Record, topic string) bool {
shardTopic, err := protocol.ToShardedPubsubTopic(topic)
shardTopic, err := protocol.ToWakuPubsubTopic(topic)
if err != nil {
return false
}
return ContainsShardWithNsTopic(record, shardTopic)
return ContainsShardWithWakuTopic(record, shardTopic)
}

View File

@ -7,18 +7,27 @@ import (
"strings"
)
// Waku2PubsubTopicPrefix is the expected prefix to be used for pubsub topics
const Waku2PubsubTopicPrefix = "/waku/2"
type WakuPubSubTopic interface {
String() string
}
const defaultPubsubTopic = "/waku/2/default-waku/proto"
type DefaultPubsubTopic struct{}
func (DefaultPubsubTopic) String() string {
return defaultPubsubTopic
}
// StaticShardingPubsubTopicPrefix is the expected prefix to be used for static sharding pubsub topics
const StaticShardingPubsubTopicPrefix = Waku2PubsubTopicPrefix + "/rs"
const StaticShardingPubsubTopicPrefix = "/waku/2/rs"
// ErrInvalidStructure indicates that the pubsub topic is malformed
// waku pubsub topic errors
var ErrNotWakuPubsubTopic = errors.New("not a waku pubsub topic")
// shard pubsub topic errors
var ErrNotShardPubsubTopic = errors.New("not a shard pubsub topic")
var ErrInvalidStructure = errors.New("invalid topic structure")
// ErrInvalidTopicPrefix indicates that the pubsub topic is missing the prefix /waku/2
var ErrInvalidTopicPrefix = errors.New("must start with " + Waku2PubsubTopicPrefix)
var ErrMissingTopicName = errors.New("missing topic-name")
var ErrInvalidShardedTopicPrefix = errors.New("must start with " + StaticShardingPubsubTopicPrefix)
var ErrMissingClusterIndex = errors.New("missing shard_cluster_index")
var ErrMissingShardNumber = errors.New("missing shard_number")
@ -26,77 +35,8 @@ var ErrMissingShardNumber = errors.New("missing shard_number")
// ErrInvalidNumberFormat indicates that a number exceeds the allowed range
var ErrInvalidNumberFormat = errors.New("only 2^16 numbers are allowed")
// NamespacedPubsubTopicKind used to represent kind of NamespacedPubsubTopicKind
type NamespacedPubsubTopicKind int
const (
StaticSharding NamespacedPubsubTopicKind = iota
NamedSharding
)
// NamespacedPubsubTopic is an interface for namespace based pubSub topic
type NamespacedPubsubTopic interface {
String() string
Kind() NamespacedPubsubTopicKind
Equal(NamespacedPubsubTopic) bool
}
// NamedShardingPubsubTopic is object for a NamedSharding type pubSub topic
type NamedShardingPubsubTopic struct {
NamespacedPubsubTopic
kind NamespacedPubsubTopicKind
name string
}
// NewNamedShardingPubsubTopic creates a new NamedShardingPubSubTopic
func NewNamedShardingPubsubTopic(name string) NamespacedPubsubTopic {
return NamedShardingPubsubTopic{
kind: NamedSharding,
name: name,
}
}
// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
func (n NamedShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return n.kind
}
// Name is the name of the NamedSharded pubsub topic.
func (n NamedShardingPubsubTopic) Name() string {
return n.name
}
// Equal compares NamedShardingPubsubTopic
func (n NamedShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
return n.String() == t2.String()
}
// String formats NamedShardingPubsubTopic to RFC 23 specific string format for pubsub topic.
func (n NamedShardingPubsubTopic) String() string {
return fmt.Sprintf("%s/%s", Waku2PubsubTopicPrefix, n.name)
}
// Parse parses a topic string into a NamedShardingPubsubTopic
func (n *NamedShardingPubsubTopic) Parse(topic string) error {
if !strings.HasPrefix(topic, Waku2PubsubTopicPrefix) {
return ErrInvalidTopicPrefix
}
topicName := topic[8:]
if len(topicName) == 0 {
return ErrMissingTopicName
}
n.kind = NamedSharding
n.name = topicName
return nil
}
// StaticShardingPubsubTopic describes a pubSub topic as per StaticSharding
type StaticShardingPubsubTopic struct {
NamespacedPubsubTopic
kind NamespacedPubsubTopicKind
cluster uint16
shard uint16
}
@ -104,7 +44,6 @@ type StaticShardingPubsubTopic struct {
// NewStaticShardingPubsubTopic creates a new pubSub topic
func NewStaticShardingPubsubTopic(cluster uint16, shard uint16) StaticShardingPubsubTopic {
return StaticShardingPubsubTopic{
kind: StaticSharding,
cluster: cluster,
shard: shard,
}
@ -120,13 +59,8 @@ func (s StaticShardingPubsubTopic) Shard() uint16 {
return s.shard
}
// Kind returns the type of PubsubTopic whether it is StaticShared or NamedSharded
func (s StaticShardingPubsubTopic) Kind() NamespacedPubsubTopicKind {
return s.kind
}
// Equal compares StaticShardingPubsubTopic
func (s StaticShardingPubsubTopic) Equal(t2 NamespacedPubsubTopic) bool {
func (s StaticShardingPubsubTopic) Equal(t2 StaticShardingPubsubTopic) bool {
return s.String() == t2.String()
}
@ -168,31 +102,30 @@ func (s *StaticShardingPubsubTopic) Parse(topic string) error {
s.shard = uint16(shardInt)
s.cluster = uint16(clusterInt)
s.kind = StaticSharding
return nil
}
// ToShardedPubsubTopic takes a pubSub topic string and creates a NamespacedPubsubTopic object.
func ToShardedPubsubTopic(topic string) (NamespacedPubsubTopic, error) {
func ToShardPubsubTopic(topic WakuPubSubTopic) (StaticShardingPubsubTopic, error) {
result, ok := topic.(StaticShardingPubsubTopic)
if !ok {
return StaticShardingPubsubTopic{}, ErrNotShardPubsubTopic
}
return result, nil
}
// ToWakuPubsubTopic takes a pubSub topic string and creates a WakuPubsubTopic object.
func ToWakuPubsubTopic(topic string) (WakuPubSubTopic, error) {
if topic == defaultPubsubTopic {
return DefaultPubsubTopic{}, nil
}
if strings.HasPrefix(topic, StaticShardingPubsubTopicPrefix) {
s := StaticShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
return s, err
}
return s, nil
}
s := NamedShardingPubsubTopic{}
err := s.Parse(topic)
if err != nil {
return nil, err
}
return s, nil
}
// DefaultPubsubTopic is the default pubSub topic used in waku
func DefaultPubsubTopic() NamespacedPubsubTopic {
return NewNamedShardingPubsubTopic("default-waku/proto")
return nil, ErrNotWakuPubsubTopic
}

View File

@ -25,7 +25,7 @@ import (
const WakuRelayID_v200 = protocol.ID("/vac/waku/relay/2.0.0")
// DefaultWakuTopic is the default pubsub topic used across all Waku protocols
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic().String()
var DefaultWakuTopic string = waku_proto.DefaultPubsubTopic{}.String()
// WakuRelay is the implementation of the Waku Relay protocol
type WakuRelay struct {

View File

@ -49,8 +49,8 @@ func NewRelayShards(cluster uint16, indices ...uint16) (RelayShards, error) {
return RelayShards{Cluster: cluster, Indices: indices}, nil
}
func (rs RelayShards) Topics() []NamespacedPubsubTopic {
var result []NamespacedPubsubTopic
func (rs RelayShards) Topics() []WakuPubSubTopic {
var result []WakuPubSubTopic
for _, i := range rs.Indices {
result = append(result, NewStaticShardingPubsubTopic(rs.Cluster, i))
}
@ -72,14 +72,12 @@ func (rs RelayShards) Contains(cluster uint16, index uint16) bool {
return found
}
func (rs RelayShards) ContainsNamespacedTopic(topic NamespacedPubsubTopic) bool {
if topic.Kind() != StaticSharding {
func (rs RelayShards) ContainsShardPubsubTopic(topic WakuPubSubTopic) bool {
if shardedTopic, err := ToShardPubsubTopic(topic); err != nil {
return false
}
shardedTopic := topic.(StaticShardingPubsubTopic)
} else {
return rs.Contains(shardedTopic.Cluster(), shardedTopic.Shard())
}
}
func TopicsToRelayShards(topic ...string) ([]RelayShards, error) {
@ -123,11 +121,11 @@ func TopicsToRelayShards(topic ...string) ([]RelayShards, error) {
}
func (rs RelayShards) ContainsTopic(topic string) bool {
nsTopic, err := ToShardedPubsubTopic(topic)
wTopic, err := ToWakuPubsubTopic(topic)
if err != nil {
return false
}
return rs.ContainsNamespacedTopic(nsTopic)
return rs.ContainsShardPubsubTopic(wTopic)
}
func (rs RelayShards) IndicesList() ([]byte, error) {

View File

@ -13,6 +13,7 @@ import (
)
func TestIndexComputation(t *testing.T) {
testContentTopic := "/waku/2/default-content/proto"
msg := &wpb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: utils.GetUnixEpoch(),
@ -27,14 +28,14 @@ func TestIndexComputation(t *testing.T) {
msg1 := &wpb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: 123,
ContentTopic: protocol.DefaultContentTopic,
ContentTopic: testContentTopic,
}
idx1 := protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), "test").Index()
msg2 := &wpb.WakuMessage{
Payload: []byte{1, 2, 3},
Timestamp: 123,
ContentTopic: protocol.DefaultContentTopic,
ContentTopic: testContentTopic,
}
idx2 := protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), "test").Index()

View File

@ -114,41 +114,53 @@ func TestShardChoiceSimulation(t *testing.T) {
}
}
func TestNsPubsubTopic(t *testing.T) {
ns1 := NewNamedShardingPubsubTopic("waku-dev")
require.Equal(t, "/waku/2/waku-dev", ns1.String())
func TestShardPubsubTopic(t *testing.T) {
{ // not waku topci
topic := "/waku/1/2/3"
_, err := ToWakuPubsubTopic(topic)
require.Error(t, ErrNotWakuPubsubTopic, err)
}
ns2 := NewStaticShardingPubsubTopic(0, 2)
require.Equal(t, "/waku/2/rs/0/2", ns2.String())
require.True(t, ns1.Equal(ns1))
require.False(t, ns1.Equal(ns2))
topic := "/waku/2/waku-dev"
ns, err := ToShardedPubsubTopic(topic)
{ // check default pubsub topic
topic := defaultPubsubTopic
wakuTopic, err := ToWakuPubsubTopic(topic)
require.NoError(t, err)
require.Equal(t, NamedSharding, ns.Kind())
require.Equal(t, "waku-dev", ns.(NamedShardingPubsubTopic).Name())
require.Equal(t, defaultPubsubTopic, wakuTopic.String())
}
topic = "/waku/2/rs/16/42"
ns, err = ToShardedPubsubTopic(topic)
{ // check behavior of waku topic
topic := "/waku/2/rs/16/42"
wakuTopic, err := ToWakuPubsubTopic(topic)
require.NoError(t, err)
require.Equal(t, StaticSharding, ns.Kind())
require.Equal(t, uint16(16), ns.(StaticShardingPubsubTopic).Cluster())
require.Equal(t, uint16(42), ns.(StaticShardingPubsubTopic).Shard())
require.Equal(t, topic, wakuTopic.String())
require.Equal(t, uint16(16), wakuTopic.(StaticShardingPubsubTopic).Cluster())
require.Equal(t, uint16(42), wakuTopic.(StaticShardingPubsubTopic).Shard())
}
topic = "/waku/1/rs/16/42"
_, err = ToShardedPubsubTopic(topic)
{ // check if shard pubtopic checks for prefix
topic := "/waku/1/rs/16/42"
err := (&StaticShardingPubsubTopic{}).Parse(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidTopicPrefix)
require.ErrorIs(t, err, ErrInvalidShardedTopicPrefix)
}
topic = "/waku/2/rs//02"
_, err = ToShardedPubsubTopic(topic)
{ // check if cluster/index is missing
topic := "/waku/2/rs//02"
_, err := ToWakuPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrMissingClusterIndex)
topic = "/waku/2/rs/xx/77"
_, err = ToShardedPubsubTopic(topic)
topic = "/waku/2/rs/1/"
_, err = ToWakuPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrMissingShardNumber)
}
{ // check if the cluster/index are number
topic := "/waku/2/rs/xx/77"
_, err := ToWakuPubsubTopic(topic)
require.Error(t, err)
require.ErrorIs(t, err, ErrInvalidNumberFormat)
}
}

View File

@ -80,7 +80,7 @@ const registerMaxRetries = 7
// Discover is used to find a number of peers that use the default pubsub topic
func (r *Rendezvous) Discover(ctx context.Context, rp *RendezvousPoint, numPeers int) {
r.DiscoverWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rp, numPeers)
r.DiscoverWithNamespace(ctx, protocol.DefaultPubsubTopic{}.String(), rp, numPeers)
}
// DiscoverShard is used to find a number of peers that support an specific cluster and shard index
@ -137,7 +137,7 @@ func (r *Rendezvous) callRegister(ctx context.Context, namespace string, rendezv
// Register registers the node in the rendezvous points using the default pubsub topic as namespace
func (r *Rendezvous) Register(ctx context.Context, rendezvousPoints []*RendezvousPoint) {
r.RegisterWithNamespace(ctx, protocol.DefaultPubsubTopic().String(), rendezvousPoints)
r.RegisterWithNamespace(ctx, protocol.DefaultPubsubTopic{}.String(), rendezvousPoints)
}
// RegisterShard registers the node in the rendezvous points using a shard as namespace