go-waku/waku/v2/protocol/relay/subscription.go

58 lines
1.6 KiB
Go

package relay
import (
"context"
"github.com/waku-org/go-waku/waku/v2/protocol"
"golang.org/x/exp/slices"
)
// Subscription handles the details of a particular Topic subscription. There may be many subscriptions for a given topic.
type Subscription struct {
ID int
Unsubscribe func() //for internal use only. For relay Subscription use relay protocol's unsubscribe
Ch chan *protocol.Envelope
contentFilter protocol.ContentFilter
subType SubscriptionType
noConsume bool
}
type SubscriptionType int
const (
SpecificContentTopics SubscriptionType = iota
AllContentTopics
)
// Submit allows a message to be submitted for a subscription
func (s *Subscription) Submit(ctx context.Context, msg *protocol.Envelope) {
//Filter and notify
// - if contentFilter doesn't have a contentTopic
// - if contentFilter has contentTopics and it matches with message
if !s.noConsume && (len(s.contentFilter.ContentTopicsList()) == 0 ||
(len(s.contentFilter.ContentTopicsList()) > 0 && slices.Contains[string](s.contentFilter.ContentTopicsList(), msg.Message().ContentTopic))) {
select {
case <-ctx.Done():
return
case s.Ch <- msg:
}
}
}
// NewSubscription creates a subscription that will only receive messages based on the contentFilter
func NewSubscription(contentFilter protocol.ContentFilter) *Subscription {
ch := make(chan *protocol.Envelope)
var subType SubscriptionType
if len(contentFilter.ContentTopicsList()) == 0 {
subType = AllContentTopics
}
return &Subscription{
Unsubscribe: func() {
close(ch)
},
Ch: ch,
contentFilter: contentFilter,
subType: subType,
}
}