mirror of
https://github.com/logos-messaging/logos-messaging-go.git
synced 2026-04-18 10:43:19 +00:00
refactor(relay): use single data structure to contain pubsub items (#907)
This commit is contained in:
parent
e743069387
commit
ad8f349817
@ -53,11 +53,8 @@ type WakuRelay struct {
|
||||
topicValidators map[string][]validatorFn
|
||||
defaultTopicValidators []validatorFn
|
||||
|
||||
// TODO: convert to concurrent maps
|
||||
topicsMutex sync.RWMutex
|
||||
wakuRelayTopics map[string]*pubsub.Topic
|
||||
relaySubs map[string]*pubsub.Subscription
|
||||
topicEvtHanders map[string]*pubsub.TopicEventHandler
|
||||
topicsMutex sync.RWMutex
|
||||
topics map[string]*pubsubTopicSubscriptionDetails
|
||||
|
||||
events event.Bus
|
||||
emitters struct {
|
||||
@ -65,18 +62,23 @@ type WakuRelay struct {
|
||||
EvtRelayUnsubscribed event.Emitter
|
||||
EvtPeerTopic event.Emitter
|
||||
}
|
||||
contentSubs map[string]map[int]*Subscription
|
||||
|
||||
*service.CommonService
|
||||
}
|
||||
|
||||
type pubsubTopicSubscriptionDetails struct {
|
||||
topic *pubsub.Topic
|
||||
subscription *pubsub.Subscription
|
||||
topicEventHandler *pubsub.TopicEventHandler
|
||||
contentSubs map[int]*Subscription
|
||||
}
|
||||
|
||||
// NewWakuRelay returns a new instance of a WakuRelay struct
|
||||
func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesource.Timesource,
|
||||
reg prometheus.Registerer, log *zap.Logger, opts ...pubsub.Option) *WakuRelay {
|
||||
w := new(WakuRelay)
|
||||
w.timesource = timesource
|
||||
w.wakuRelayTopics = make(map[string]*pubsub.Topic)
|
||||
w.relaySubs = make(map[string]*pubsub.Subscription)
|
||||
w.topicEvtHanders = make(map[string]*pubsub.TopicEventHandler)
|
||||
w.topics = make(map[string]*pubsubTopicSubscriptionDetails)
|
||||
w.topicValidators = make(map[string][]validatorFn)
|
||||
w.bcaster = bcaster
|
||||
w.minPeersToPublish = minPeersToPublish
|
||||
@ -88,7 +90,6 @@ func NewWakuRelay(bcaster Broadcaster, minPeersToPublish int, timesource timesou
|
||||
|
||||
// default options required by WakuRelay
|
||||
w.opts = append(w.defaultPubsubOptions(), opts...)
|
||||
w.contentSubs = make(map[string]map[int]*Subscription)
|
||||
return w
|
||||
}
|
||||
|
||||
@ -148,7 +149,7 @@ func (w *WakuRelay) Topics() []string {
|
||||
w.topicsMutex.RLock()
|
||||
|
||||
var result []string
|
||||
for topic := range w.relaySubs {
|
||||
for topic := range w.topics {
|
||||
result = append(result, topic)
|
||||
}
|
||||
return result
|
||||
@ -158,7 +159,7 @@ func (w *WakuRelay) Topics() []string {
|
||||
func (w *WakuRelay) IsSubscribed(topic string) bool {
|
||||
w.topicsMutex.RLock()
|
||||
defer w.topicsMutex.RUnlock()
|
||||
_, ok := w.relaySubs[topic]
|
||||
_, ok := w.topics[topic]
|
||||
return ok
|
||||
}
|
||||
|
||||
@ -168,10 +169,7 @@ func (w *WakuRelay) SetPubSub(pubSub *pubsub.PubSub) {
|
||||
}
|
||||
|
||||
func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
|
||||
w.topicsMutex.Lock()
|
||||
defer w.topicsMutex.Unlock()
|
||||
|
||||
pubSubTopic, ok := w.wakuRelayTopics[topic]
|
||||
topicData, ok := w.topics[topic]
|
||||
if !ok { // Joins topic if node hasn't joined yet
|
||||
err := w.pubsub.RegisterTopicValidator(topic, w.topicValidator(topic))
|
||||
if err != nil {
|
||||
@ -188,44 +186,55 @@ func (w *WakuRelay) upsertTopic(topic string) (*pubsub.Topic, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.wakuRelayTopics[topic] = newTopic
|
||||
pubSubTopic = newTopic
|
||||
w.topics[topic] = &pubsubTopicSubscriptionDetails{
|
||||
topic: newTopic,
|
||||
}
|
||||
|
||||
return newTopic, nil
|
||||
}
|
||||
return pubSubTopic, nil
|
||||
|
||||
return topicData.topic, nil
|
||||
}
|
||||
|
||||
func (w *WakuRelay) subscribeToPubsubTopic(topic string) (subs *pubsub.Subscription, err error) {
|
||||
sub, ok := w.relaySubs[topic]
|
||||
func (w *WakuRelay) subscribeToPubsubTopic(topic string) (*pubsubTopicSubscriptionDetails, error) {
|
||||
w.topicsMutex.Lock()
|
||||
defer w.topicsMutex.Unlock()
|
||||
|
||||
result, ok := w.topics[topic]
|
||||
if !ok {
|
||||
pubSubTopic, err := w.upsertTopic(topic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sub, err = pubSubTopic.Subscribe(pubsub.WithBufferSize(1024))
|
||||
subscription, err := pubSubTopic.Subscribe(pubsub.WithBufferSize(1024))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.WaitGroup().Add(1)
|
||||
go w.pubsubTopicMsgHandler(topic, sub)
|
||||
go w.pubsubTopicMsgHandler(subscription)
|
||||
|
||||
evtHandler, err := w.addPeerTopicEventListener(pubSubTopic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
w.topicEvtHanders[topic] = evtHandler
|
||||
w.relaySubs[topic] = sub
|
||||
|
||||
w.topics[topic].contentSubs = make(map[int]*Subscription)
|
||||
w.topics[topic].subscription = subscription
|
||||
w.topics[topic].topicEventHandler = evtHandler
|
||||
|
||||
err = w.emitters.EvtRelaySubscribed.Emit(EvtRelaySubscribed{topic, pubSubTopic})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
w.log.Info("gossipsub subscription", zap.String("pubsubTopic", sub.Topic()))
|
||||
w.log.Info("gossipsub subscription", zap.String("pubsubTopic", subscription.Topic()))
|
||||
|
||||
result = w.topics[topic]
|
||||
}
|
||||
|
||||
return sub, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic. The pubsubTopic is derived from contentTopic
|
||||
@ -262,6 +271,9 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
|
||||
return nil, errors.New("not enough peers to publish")
|
||||
}
|
||||
|
||||
w.topicsMutex.RLock()
|
||||
defer w.topicsMutex.RUnlock()
|
||||
|
||||
pubSubTopic, err := w.upsertTopic(params.pubsubTopic)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -284,6 +296,24 @@ func (w *WakuRelay) Publish(ctx context.Context, message *pb.WakuMessage, opts .
|
||||
return hash, nil
|
||||
}
|
||||
|
||||
func (w *WakuRelay) getSubscription(contentFilter waku_proto.ContentFilter) (*Subscription, error) {
|
||||
w.topicsMutex.RLock()
|
||||
defer w.topicsMutex.RUnlock()
|
||||
topicData, ok := w.topics[contentFilter.PubsubTopic]
|
||||
if ok {
|
||||
for _, sub := range topicData.contentSubs {
|
||||
if sub.contentFilter.Equals(contentFilter) {
|
||||
if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned
|
||||
continue
|
||||
}
|
||||
return sub, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("no subscription found for content topic")
|
||||
}
|
||||
|
||||
// GetSubscriptionWithPubsubTopic fetches subscription matching pubsub and contentTopic
|
||||
func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTopic string) (*Subscription, error) {
|
||||
var contentFilter waku_proto.ContentFilter
|
||||
@ -292,16 +322,8 @@ func (w *WakuRelay) GetSubscriptionWithPubsubTopic(pubsubTopic string, contentTo
|
||||
} else {
|
||||
contentFilter = waku_proto.NewContentFilter(pubsubTopic)
|
||||
}
|
||||
cSubs := w.contentSubs[pubsubTopic]
|
||||
for _, sub := range cSubs {
|
||||
if sub.contentFilter.Equals(contentFilter) {
|
||||
if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned
|
||||
continue
|
||||
}
|
||||
return sub, nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("no subscription found for content topic")
|
||||
|
||||
return w.getSubscription(contentFilter)
|
||||
}
|
||||
|
||||
// GetSubscription fetches subscription matching a contentTopic(via autosharding)
|
||||
@ -311,16 +333,8 @@ func (w *WakuRelay) GetSubscription(contentTopic string) (*Subscription, error)
|
||||
return nil, err
|
||||
}
|
||||
contentFilter := waku_proto.NewContentFilter(pubsubTopic, contentTopic)
|
||||
cSubs := w.contentSubs[pubsubTopic]
|
||||
for _, sub := range cSubs {
|
||||
if sub.contentFilter.Equals(contentFilter) {
|
||||
if sub.noConsume { //This check is to ensure that default no-consumer subscription is not returned
|
||||
continue
|
||||
}
|
||||
return sub, nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("no subscription found for content topic")
|
||||
|
||||
return w.getSubscription(contentFilter)
|
||||
}
|
||||
|
||||
// Stop unmounts the relay protocol and stops all subscriptions
|
||||
@ -365,7 +379,7 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
|
||||
}
|
||||
|
||||
for pubSubTopic, cTopics := range pubSubTopicMap {
|
||||
w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contenTopics", cTopics))
|
||||
w.log.Info("subscribing to", zap.String("pubsubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics))
|
||||
var cFilter waku_proto.ContentFilter
|
||||
cFilter.PubsubTopic = pubSubTopic
|
||||
cFilter.ContentTopics = waku_proto.NewContentTopicSet(cTopics...)
|
||||
@ -383,13 +397,13 @@ func (w *WakuRelay) subscribe(ctx context.Context, contentFilter waku_proto.Cont
|
||||
WithConsumerOption(params.dontConsume))
|
||||
|
||||
// Create Content subscription
|
||||
w.topicsMutex.RLock()
|
||||
if _, ok := w.contentSubs[pubSubTopic]; !ok {
|
||||
w.contentSubs[pubSubTopic] = map[int]*Subscription{}
|
||||
w.topicsMutex.Lock()
|
||||
topicData, ok := w.topics[pubSubTopic]
|
||||
if ok {
|
||||
topicData.contentSubs[subscription.ID] = subscription
|
||||
}
|
||||
w.contentSubs[pubSubTopic][subscription.ID] = subscription
|
||||
w.topicsMutex.Unlock()
|
||||
|
||||
w.topicsMutex.RUnlock()
|
||||
subscriptions = append(subscriptions, subscription)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
@ -420,20 +434,22 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
|
||||
for pubSubTopic, cTopics := range pubSubTopicMap {
|
||||
cfTemp := waku_proto.NewContentFilter(pubSubTopic, cTopics...)
|
||||
pubsubUnsubscribe := false
|
||||
sub, ok := w.relaySubs[pubSubTopic]
|
||||
sub, ok := w.topics[pubSubTopic]
|
||||
if !ok {
|
||||
return errors.New("not subscribed to topic")
|
||||
}
|
||||
cSubs := w.contentSubs[pubSubTopic]
|
||||
if cSubs != nil {
|
||||
|
||||
topicData, ok := w.topics[pubSubTopic]
|
||||
if ok {
|
||||
//Remove relevant subscription
|
||||
for subID, sub := range cSubs {
|
||||
for subID, sub := range topicData.contentSubs {
|
||||
if sub.contentFilter.Equals(cfTemp) {
|
||||
sub.Unsubscribe()
|
||||
delete(cSubs, subID)
|
||||
delete(topicData.contentSubs, subID)
|
||||
}
|
||||
}
|
||||
if len(cSubs) == 0 {
|
||||
|
||||
if len(topicData.contentSubs) == 0 {
|
||||
pubsubUnsubscribe = true
|
||||
}
|
||||
} else {
|
||||
@ -456,40 +472,29 @@ func (w *WakuRelay) Unsubscribe(ctx context.Context, contentFilter waku_proto.Co
|
||||
|
||||
// unsubscribeFromPubsubTopic unsubscribes subscription from underlying pubsub.
|
||||
// Note: caller has to acquire topicsMutex in order to avoid race conditions
|
||||
func (w *WakuRelay) unsubscribeFromPubsubTopic(sub *pubsub.Subscription) error {
|
||||
func (w *WakuRelay) unsubscribeFromPubsubTopic(topicData *pubsubTopicSubscriptionDetails) error {
|
||||
|
||||
pubSubTopic := sub.Topic()
|
||||
pubSubTopic := topicData.subscription.Topic()
|
||||
w.log.Info("unsubscribing from topic", zap.String("topic", pubSubTopic))
|
||||
|
||||
sub.Cancel()
|
||||
delete(w.relaySubs, pubSubTopic)
|
||||
topicData.subscription.Cancel()
|
||||
topicData.topicEventHandler.Cancel()
|
||||
|
||||
w.bcaster.UnRegister(pubSubTopic)
|
||||
|
||||
delete(w.contentSubs, pubSubTopic)
|
||||
|
||||
evtHandler, ok := w.topicEvtHanders[pubSubTopic]
|
||||
if ok {
|
||||
evtHandler.Cancel()
|
||||
delete(w.topicEvtHanders, pubSubTopic)
|
||||
}
|
||||
|
||||
err := w.wakuRelayTopics[pubSubTopic].Close()
|
||||
err := topicData.topic.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
delete(w.wakuRelayTopics, pubSubTopic)
|
||||
|
||||
w.RemoveTopicValidator(pubSubTopic)
|
||||
|
||||
err = w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{pubSubTopic})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
delete(w.topics, pubSubTopic)
|
||||
|
||||
return w.emitters.EvtRelayUnsubscribed.Emit(EvtRelayUnsubscribed{pubSubTopic})
|
||||
}
|
||||
|
||||
func (w *WakuRelay) pubsubTopicMsgHandler(pubsubTopic string, sub *pubsub.Subscription) {
|
||||
func (w *WakuRelay) pubsubTopicMsgHandler(sub *pubsub.Subscription) {
|
||||
defer w.WaitGroup().Done()
|
||||
|
||||
for {
|
||||
@ -508,7 +513,7 @@ func (w *WakuRelay) pubsubTopicMsgHandler(pubsubTopic string, sub *pubsub.Subscr
|
||||
return
|
||||
}
|
||||
|
||||
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), pubsubTopic)
|
||||
envelope := waku_proto.NewEnvelope(wakuMessage, w.timesource.Now().UnixNano(), sub.Topic())
|
||||
w.metrics.RecordMessage(envelope)
|
||||
|
||||
w.bcaster.Submit(envelope)
|
||||
|
||||
@ -106,11 +106,11 @@ func TestGossipsubScore(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
sub, err := relay[i].subscribeToPubsubTopic(testTopic)
|
||||
topicData, err := relay[i].subscribeToPubsubTopic(testTopic)
|
||||
require.NoError(t, err)
|
||||
go func() {
|
||||
for {
|
||||
_, err := sub.Next(context.Background())
|
||||
_, err := topicData.subscription.Next(context.Background())
|
||||
if err != nil {
|
||||
t.Log(err)
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user