feat: draft changes enabling auto sharding in all clusters

parent 7d767c0105
commit bfd47a8d36
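
The hunks below thread an explicit cluster ID through the autosharding helpers instead of relying on the hard-coded ClusterIndex constant. As a rough sketch of the resulting call pattern — the function names and signatures are taken from the hunks below, but the module import path is an assumption and may differ in this repository:

package main

import (
	"fmt"

	// Assumed import path; substitute this repository's real protocol package.
	wprotocol "github.com/logos-messaging/logos-messaging-go/waku/v2/protocol"
)

func main() {
	// Callers now pass the cluster explicitly; DefaultClusterIndex (== 1) keeps the previous behaviour.
	clusterID := uint16(wprotocol.DefaultClusterIndex)

	pubsubTopic, err := wprotocol.GetPubSubTopicFromContentTopic(clusterID, "/toychat/2/huilong/proto")
	if err != nil {
		panic(err)
	}
	fmt.Println(pubsubTopic) // a static-sharding topic of the form /waku/2/rs/<cluster>/<shard>
}

Node-level call sites (REST filter service, peer manager, WakuNode) pass their own ClusterID() the same way, as the hunks below show.
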
@@ -455,7 +455,7 @@ func processTopics(options NodeOptions) (map[string][]string, error) {
 		if err != nil {
 			return nil, err
 		}
-		pTopic := wprotocol.GetShardFromContentTopic(contentTopic, wprotocol.GenerationZeroShardsCount)
+		pTopic := wprotocol.GetShardFromContentTopic(contentTopic, uint16(options.ClusterID), wprotocol.GenerationZeroShardsCount)
 		if _, ok := pubSubTopicMap[pTopic.String()]; !ok {
 			pubSubTopicMap[pTopic.String()] = []string{}
 		}
@@ -34,7 +34,7 @@ type FilterService struct {
 func (s *FilterService) Start(ctx context.Context) {

 	for _, sub := range s.node.FilterLightnode().Subscriptions() {
-		s.cache.subscribe(sub.ContentFilter)
+		s.cache.subscribe(s.node.ClusterID(), sub.ContentFilter)
 	}

 	ctx, cancel := context.WithCancel(ctx)
@@ -190,7 +190,7 @@ func (s *FilterService) subscribe(w http.ResponseWriter, req *http.Request) {
 	}

 	// on success
-	s.cache.subscribe(contentFilter)
+	s.cache.subscribe(s.node.ClusterID(), contentFilter)
 	writeResponse(w, filterSubscriptionResponse{
 		RequestID:  message.RequestID,
 		StatusDesc: http.StatusText(http.StatusOK),
@@ -346,7 +346,7 @@ func (s *FilterService) getMessagesByContentTopic(w http.ResponseWriter, req *ht
 	if contentTopic == "" {
 		return
 	}
-	pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
+	pubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(s.node.ClusterID(), contentTopic)
 	if err != nil {
 		writeGetMessageErr(w, fmt.Errorf("bad content topic"), http.StatusBadRequest, s.log)
 		return
@@ -23,11 +23,11 @@ func newFilterCache(capacity int, log *zap.Logger) *filterCache {
 	}
 }

-func (c *filterCache) subscribe(contentFilter protocol.ContentFilter) {
+func (c *filterCache) subscribe(clusterID uint16, contentFilter protocol.ContentFilter) {
 	c.mu.Lock()
 	defer c.mu.Unlock()

-	pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(contentFilter)
+	pubSubTopicMap, _ := protocol.ContentFilterToPubSubTopicMap(clusterID, contentFilter)
 	for pubsubTopic, contentTopics := range pubSubTopicMap {
 		if c.data[pubsubTopic] == nil {
 			c.data[pubsubTopic] = make(map[string][]*RestWakuMessage)
@@ -261,7 +261,7 @@ func TestFilterGetMessages(t *testing.T) {
 	contentTopic := "/waku/2/app/1"

 	// get nodes add connect them
-	generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(contentTopic)
+	generatedPubsubTopic, err := protocol.GetPubSubTopicFromContentTopic(protocol.DefaultClusterIndex, contentTopic)
 	require.NoError(t, err)
 	node1, node2 := twoFilterConnectedNodes(t, pubsubTopic, generatedPubsubTopic)
 	defer func() {
@@ -378,7 +378,7 @@ func toMessage(envs []*protocol.Envelope) []*pb.WakuMessage {

 func genMessage(pubsubTopic, contentTopic string) *protocol.Envelope {
 	if pubsubTopic == "" {
-		pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(contentTopic)
+		pubsubTopic, _ = protocol.GetPubSubTopicFromContentTopic(protocol.DefaultClusterIndex, contentTopic)
 	}
 	return protocol.NewEnvelope(
 		&pb.WakuMessage{
@@ -846,7 +846,7 @@ func (w *WakuNode) PeersByStaticShard(cluster uint16, shard uint16) peer.IDSlice

 // PeersByContentTopics filters peers based on contentTopic
 func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
-	pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic)
+	pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(w.ClusterID(), contentTopic)
 	if err != nil {
 		return nil
 	}
@@ -24,8 +24,14 @@ type peerSet map[peer.ID]struct{}
 // If a peer cannot be found in the service slot, a peer will be selected from node peerstore
 func (pm *PeerManager) SelectPeerByContentTopics(proto protocol.ID, contentTopics []string, specificPeers ...peer.ID) (peer.ID, error) {
 	pubsubTopics := []string{}
+	clusterID := new(uint32) // allocate so the dereference below cannot panic on a nil pointer
+	*clusterID = 0           // 0 until metadata provides the actual cluster
 	for _, cTopic := range contentTopics {
-		pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(cTopic)
+		if pm.metadata != nil {
+			clusterID, _, _ = pm.metadata.ClusterAndShards()
+		}
+
+		pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(uint16(*clusterID), cTopic)
 		if err != nil {
 			pm.logger.Debug("selectPeer: failed to get contentTopic from pubsubTopic", zap.String("contentTopic", cTopic))
 			return "", err
@@ -51,6 +51,6 @@ func (cf ContentFilter) Equals(cf1 ContentFilter) bool {
 }

 // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
-func ContentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
-	return GeneratePubsubToContentTopicMap(contentFilter.PubsubTopic, contentFilter.ContentTopicsList())
+func ContentFilterToPubSubTopicMap(clusterID uint16, contentFilter ContentFilter) (map[PubsubTopicStr][]ContentTopicStr, error) {
+	return GeneratePubsubToContentTopicMap(clusterID, contentFilter.PubsubTopic, contentFilter.ContentTopicsList())
 }
@@ -12,9 +12,9 @@ import (

 const MaxShardIndex = uint16(1023)

-// ClusterIndex is the clusterID used in sharding space.
+// DefaultClusterIndex is the clusterID used in sharding space.
 // For shardIDs allocation and other magic numbers refer to RFC 51
-const ClusterIndex = 1
+const DefaultClusterIndex = 1

 // GenerationZeroShardsCount is number of shards supported in generation-0
 const GenerationZeroShardsCount = 8
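
For reference, the renamed DefaultClusterIndex still points at cluster 1, and generation-0 autosharding spreads content topics over 8 shards. Under RFC 51 a static-sharding pubsub topic is written as /waku/2/rs/<clusterID>/<shardID>, so the default-cluster shard topics run from /waku/2/rs/1/0 to /waku/2/rs/1/7. A standalone sketch — the topic string format is assumed from RFC 51, not taken from this diff:

package main

import "fmt"

func main() {
	const defaultClusterIndex = 1       // mirrors DefaultClusterIndex above
	const generationZeroShardsCount = 8 // mirrors GenerationZeroShardsCount above

	// Assumed static-sharding topic form per RFC 51: /waku/2/rs/<cluster>/<shard>
	for shard := 0; shard < generationZeroShardsCount; shard++ {
		fmt.Printf("/waku/2/rs/%d/%d\n", defaultClusterIndex, shard)
	}
}
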
@@ -229,7 +229,7 @@ func FromBitVector(buf []byte) (RelayShards, error) {

 // GetShardFromContentTopic runs Autosharding logic and returns a pubSubTopic
 // This is based on Autosharding algorithm defined in RFC 51
-func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticShardingPubsubTopic {
+func GetShardFromContentTopic(topic ContentTopic, clusterID uint16, shardCount int) StaticShardingPubsubTopic {
 	bytes := []byte(topic.ApplicationName)
 	bytes = append(bytes, []byte(topic.ApplicationVersion)...)
@@ -238,28 +238,30 @@ func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticSharding
 	hashValue := binary.BigEndian.Uint64(hash[24:])

 	shard := hashValue % uint64(shardCount)

-	return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard))
+	if clusterID == 0 {
+		//TODO: should we return error??
+	}
+	return NewStaticShardingPubsubTopic(clusterID, uint16(shard))
 }

-func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) {
+func GetPubSubTopicFromContentTopic(clusterID uint16, cTopicString string) (string, error) {
 	cTopic, err := StringToContentTopic(cTopicString)
 	if err != nil {
 		return "", fmt.Errorf("%s : %s", err.Error(), cTopicString)
 	}
-	pTopic := GetShardFromContentTopic(cTopic, GenerationZeroShardsCount)
+	pTopic := GetShardFromContentTopic(cTopic, clusterID, GenerationZeroShardsCount)

 	return pTopic.String(), nil
 }

-func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string) (map[string][]string, error) {
+func GeneratePubsubToContentTopicMap(clusterID uint16, pubsubTopic string, contentTopics []string) (map[string][]string, error) {

 	pubSubTopicMap := make(map[string][]string, 0)

 	if pubsubTopic == "" {
 		//Should we derive pubsub topic from contentTopic so that peer selection and discovery can be done accordingly?
 		for _, cTopic := range contentTopics {
-			pTopic, err := GetPubSubTopicFromContentTopic(cTopic)
+			pTopic, err := GetPubSubTopicFromContentTopic(clusterID, cTopic)
 			if err != nil {
 				return nil, err
 			}
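
Only the cluster label on the resulting topic changes in the hunk above; the shard selection itself is untouched: hash the content topic's application name and version, read the last 8 bytes of the digest as a big-endian integer, and reduce it modulo the shard count. A self-contained sketch of that computation, assuming the hash is SHA-256 (the 32-byte digest slice hash[24:] shown above suggests as much):

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// shardFor mirrors the body of GetShardFromContentTopic shown above:
// the digest of applicationName||applicationVersion picks one of shardCount shards.
func shardFor(applicationName, applicationVersion string, shardCount uint64) uint64 {
	bytes := []byte(applicationName)
	bytes = append(bytes, []byte(applicationVersion)...)

	hash := sha256.Sum256(bytes) // assumption: SHA-256, i.e. a 32-byte digest
	hashValue := binary.BigEndian.Uint64(hash[24:])

	return hashValue % shardCount
}

func main() {
	// "/toychat/2/huilong/proto" parses to application "toychat", version "2";
	// the test in the next hunk expects shard 3 for it on the default cluster.
	fmt.Println(shardFor("toychat", "2", 8))
}
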
@@ -41,8 +41,8 @@ func TestContentTopicAndSharding(t *testing.T) {
 	ct6, err := StringToContentTopic("/toychat/2/huilong/proto")
 	require.NoError(t, err)

-	nsPubSubT1 := GetShardFromContentTopic(ct6, GenerationZeroShardsCount)
-	require.Equal(t, NewStaticShardingPubsubTopic(ClusterIndex, 3), nsPubSubT1)
+	nsPubSubT1 := GetShardFromContentTopic(ct6, DefaultClusterIndex, GenerationZeroShardsCount)
+	require.Equal(t, NewStaticShardingPubsubTopic(DefaultClusterIndex, 3), nsPubSubT1)

 	_, err = StringToContentTopic("/abc/toychat/2/huilong/proto")
 	require.Error(t, err, ErrInvalidGeneration)
@@ -91,7 +91,7 @@ func TestShardChoiceSimulation(t *testing.T) {

 	// When
 	for _, topic := range topics {
-		pubsub := GetShardFromContentTopic(topic, GenerationZeroShardsCount)
+		pubsub := GetShardFromContentTopic(topic, DefaultClusterIndex, GenerationZeroShardsCount)
 		counts[pubsub.Shard()]++
 	}