diff --git a/cmd/waku/node.go b/cmd/waku/node.go
index 4a5a352b..81d071d2 100644
--- a/cmd/waku/node.go
+++ b/cmd/waku/node.go
@@ -455,7 +455,7 @@ func processTopics(options NodeOptions) (map[string][]string, error) {
 		if err != nil {
 			return nil, err
 		}
-		pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount)
+		pTopic := wprotocol.GetShardFromContentTopic(contentTopic, uint16(options.ClusterID), wprotocol.GenerationZeroShardsCount)
 		if _, ok := pubSubTopicMap[pTopic.String()]; !ok {
 			pubSubTopicMap[pTopic.String()] = []string{}
 		}
diff --git a/cmd/waku/server/rest/filter.go b/cmd/waku/server/rest/filter.go
index 7973cc02..28fe1848 100644
--- a/cmd/waku/server/rest/filter.go
+++ b/cmd/waku/server/rest/filter.go
@@ -34,7 +34,7 @@ type FilterService struct {
 
 func (s *FilterService) Start(ctx context.Context) {
 	for _, sub := range s.node.FilterLightnode().Subscriptions() {
-		s.cache.subscribe(sub.ContentFilter)
+		s.cache.subscribe(s.node.ClusterID(), sub.ContentFilter)
 	}
 
 	ctx, cancel := context.WithCancel(ctx)
@@ -190,7 +190,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
 	}
 
 	// on success
-	s.cache.subscribe(contentFilter)
+	s.cache.subscribe(s.node.ClusterID(), contentFilter)
 	writeResponse(w, filterSubscriptionResponse{
 		RequestID:  message.RequestID,
 		StatusDesc: http.StatusText(http.StatusOK),
@@ -346,7 +346,7 @@ func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *ht
 	if contentTopic == "" {
 		return
 	}
-	pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
+	pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(s.node.ClusterID(), contentTopic)
 	if err != nil {
 		writeGetMessageErr(w, fmt.Errorf("bad content topic"), http.StatusBadRequest, s.log)
 		return
diff --git a/cmd/waku/server/rest/filter_cache.go b/cmd/waku/server/rest/filter_cache.go
index 77144435..0a088f13 100644
--- a/cmd/waku/server/rest/filter_cache.go
+++ b/cmd/waku/server/rest/filter_cache.go
@@ -23,11 +23,11 @@ func newFilterCache(capacity int, log *zap.Logger) *filterCache {
 	}
 }
 
-func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
+func (c *filterCache) subscribe(clusterID uint16, contentFilter protocol.ContentFilter) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
+	pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(clusterID, contentFilter)
 	for pubsubTopic, contentTopics := range pubSubTopicMap {
 		if c.data[pubsubTopic] == nil {
 			c.data[pubsubTopic] = make(map[string][]*RestWakuMessage)
diff --git a/cmd/waku/server/rest/filter_test.go b/cmd/waku/server/rest/filter_test.go
index 1641175b..57d3b80f 100644
--- a/cmd/waku/server/rest/filter_test.go
+++ b/cmd/waku/server/rest/filter_test.go
@@ -261,7 +261,7 @@ func TestFilterGetMessages(t *testing.T) {
 	contentTopic := "/waku/2/app/1"
 
 	// get nodes add connect them
-	generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
+	generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(protocol.DefaultClusterIndex, contentTopic)
 	require.NoError(t, err)
 	node1, node2 := twoFilterConnectedNodes(t, pubsubTopic, generatedPubsubTopic)
 	defer func() {
@@ -378,7 +378,7 @@ func toMessage(envs []*protocol.Envelope) []*pb.WakuMessage {
 
 func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope {
 	if pubsubTopic == "" {
-		pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(contentTopic)
+		pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(protocol.DefaultClusterIndex, contentTopic)
 	}
 	return protocol.NewEnvelope(
 		&pb.WakuMessage{
diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go
index 7f2a7cbe..3779238f 100644
--- a/waku/v2/node/wakunode2.go
+++ b/waku/v2/node/wakunode2.go
@@ -846,7 +846,7 @@ func (w *WakuNode) PeersByStaticShard(cluster uint16, shard uint16) peer.IDSlice
 
 // PeersByContentTopics filters peers based on contentTopic
 func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
-	pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic)
+	pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(w.ClusterID(), contentTopic)
 	if err != nil {
 		return nil
 	}
diff --git a/waku/v2/peermanager/peer_selection.go b/waku/v2/peermanager/peer_selection.go
index 2b4c3807..a02f41aa 100644
--- a/waku/v2/peermanager/peer_selection.go
+++ b/waku/v2/peermanager/peer_selection.go
@@ -24,8 +24,14 @@ type peerSet map[peer.ID]struct{}
 // If a peer cannot be found in the service slot, a peer will be selected from node peerstore
 func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopics []string, specificPeers ...peer.ID) (peer.ID, error) {
 	pubsubTopics := []string{}
+	// default to cluster 0 when the metadata protocol is unavailable
+	clusterID := new(uint32)
 	for _, cTopic := range contentTopics {
-		pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic)
+		if pm.metadata != nil {
+			clusterID, _, _ = pm.metadata.ClusterAndShards()
+		}
+
+		pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(uint16(*clusterID), cTopic)
 		if err != nil {
 			pm.logger.Debug("selectPeer: failed to get contentTopic from pubsubTopic", zap.String("contentTopic", cTopic))
 			return "", err
diff --git a/waku/v2/protocol/content_filter.go b/waku/v2/protocol/content_filter.go
index f09cf52b..6f14099e 100644
--- a/waku/v2/protocol/content_filter.go
+++ b/waku/v2/protocol/content_filter.go
@@ -51,6 +51,6 @@ func (cf ContentFilter) Equals(cf1 ContentFilter) bool {
 }
 
 // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
-func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
-	return GeneratePubsubToContentTopicMap(contentFilter.PubsubTopic, contentFilter.ContentTopicsList())
+func ContentFilterToPubSubTopicMap(clusterID uint16, contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
+	return GeneratePubsubToContentTopicMap(clusterID, contentFilter.PubsubTopic, contentFilter.ContentTopicsList())
 }
diff --git a/waku/v2/protocol/shard.go b/waku/v2/protocol/shard.go
index 8b101484..ab27b248 100644
--- a/waku/v2/protocol/shard.go
+++ b/waku/v2/protocol/shard.go
@@ -12,9 +12,9 @@ import (
 
 const MaxShardIndex = uint16(1023)
 
-// ClusterIndex is the clusterID used in sharding space.
+// DefaultClusterIndex is the clusterID used in sharding space.
 // For shardIDs allocation and other magic numbers refer to RFC 51
-const ClusterIndex = 1
+const DefaultClusterIndex = 1
 
 // GenerationZeroShardsCount is number of shards supported in generation-0
 const GenerationZeroShardsCount = 8
@@ -229,7 +229,7 @@ func FromBitVector(buf []byte) (RelayShards, error) {
 
 // GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic
 // This is based on Autosharding algorithm defined in RFC 51
-func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticShardingPubsubTopic {
+func GetShardFromContentTopic(topic ContentTopic, clusterID uint16, shardCount int) StaticShardingPubsubTopic {
 	bytes := []byte(topic.ApplicationName)
 	bytes = append(bytes, []byte(topic.ApplicationVersion)...)
 
@@ -238,28 +238,30 @@ func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticSharding
 	hashValue := binary.BigEndian.Uint64(hash[24:])
 
 	shard := hashValue % uint64(shardCount)
-
-	return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard))
+	if clusterID == 0 {
+		//TODO: should we return error??
+	}
+	return NewStaticShardingPubsubTopic(clusterID, uint16(shard))
 }
 
-func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) {
+func GetPubSubTopicFromContentTopic(clusterID uint16, cTopicString string) (string, error) {
 	cTopic, err := StringToContentTopic(cTopicString)
 	if err != nil {
 		return "", fmt.Errorf("%s : %s", err.Error(), cTopicString)
 	}
 
-	pTopic := GetShardFromContentTopic(cTopic, GenerationZeroShardsCount)
+	pTopic := GetShardFromContentTopic(cTopic, clusterID, GenerationZeroShardsCount)
 
 	return pTopic.String(), nil
 }
 
-func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string) (map[string][]string, error) {
+func GeneratePubsubToContentTopicMap(clusterID uint16, pubsubTopic string, contentTopics []string) (map[string][]string, error) {
 	pubSubTopicMap := make(map[string][]string, 0)
 
 	if pubsubTopic == "" {
 		//Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly?
 		for _, cTopic := range contentTopics {
-			pTopic, err := GetPubSubTopicFromContentTopic(cTopic)
+			pTopic, err := GetPubSubTopicFromContentTopic(clusterID, cTopic)
 			if err != nil {
 				return nil, err
 			}
diff --git a/waku/v2/protocol/topic_test.go b/waku/v2/protocol/topic_test.go
index c1708d4a..238019d1 100644
--- a/waku/v2/protocol/topic_test.go
+++ b/waku/v2/protocol/topic_test.go
@@ -41,8 +41,8 @@ func TestContentTopicAndSharding(t *testing.T) {
 	ct6, err := StringToContentTopic("/toychat/2/huilong/proto")
 	require.NoError(t, err)
 
-	nsPubSubT1 := GetShardFromContentTopic(ct6, GenerationZeroShardsCount)
-	require.Equal(t, NewStaticShardingPubsubTopic(ClusterIndex, 3), nsPubSubT1)
+	nsPubSubT1 := GetShardFromContentTopic(ct6, DefaultClusterIndex, GenerationZeroShardsCount)
+	require.Equal(t, NewStaticShardingPubsubTopic(DefaultClusterIndex, 3), nsPubSubT1)
 
 	_, err = StringToContentTopic("/abc/toychat/2/huilong/proto")
 	require.Error(t, err, ErrInvalidGeneration)
@@ -91,7 +91,7 @@ func TestShardChoiceSimulation(t *testing.T) {
 
 	// When
 	for _, topic := range topics {
-		pubsub := GetShardFromContentTopic(topic, GenerationZeroShardsCount)
+		pubsub := GetShardFromContentTopic(topic, DefaultClusterIndex, GenerationZeroShardsCount)
 		counts[pubsub.Shard()]++
 	}
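
Not part of the patch: a minimal sketch of how callers are expected to invoke the reworked autosharding helper after this change. Only `GetPubSubTopicFromContentTopic` and `DefaultClusterIndex` come from the diff above; the import path and the printed topic strings are assumptions based on the go-waku module layout and the `/waku/2/rs/<cluster>/<shard>` static-sharding topic format.

```go
package main

import (
	"fmt"

	// assumed import path for the protocol package touched by this diff
	"github.com/waku-org/go-waku/waku/v2/protocol"
)

func main() {
	// Callers on the default cluster now pass DefaultClusterIndex (1) explicitly.
	pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(protocol.DefaultClusterIndex, "/toychat/2/huilong/proto")
	if err != nil {
		panic(err)
	}
	fmt.Println(pubsubTopic) // expected form: /waku/2/rs/1/3

	// Nodes configured for another cluster pass their own cluster ID instead;
	// this changes only the cluster part of the derived pubsub topic, not the shard.
	pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(uint16(16), "/toychat/2/huilong/proto")
	if err != nil {
		panic(err)
	}
	fmt.Println(pubsubTopic) // expected form: /waku/2/rs/16/3
}
```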