feat: improvements on filter protocol (client)

This commit is contained in:
Richard Ramos 2023-02-08 19:33:06 -04:00 committed by RichΛrd
parent f255adffd9
commit 52f7c8d86e
7 changed files with 604 additions and 72 deletions

2
go.mod
View File

@ -76,7 +76,7 @@ require (
github.com/golang/mock v1.6.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect

View File

@ -79,7 +79,8 @@ type WakuNode struct {
discoveryV5 Service
peerExchange Service
filter ReceptorService
filterV2 ReceptorService
filterV2Full ReceptorService
filterV2Light Service
store ReceptorService
rlnRelay RLNRelay
@ -210,7 +211,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2 = filterv2.NewWakuFilter(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Full = filterv2.NewWakuFilter(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2Light = filterv2.NewWakuFilterPush(w.host, w.bcaster, w.timesource, w.log)
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
if w.opts.enableSwap {
@ -357,13 +359,20 @@ func (w *WakuNode) Start(ctx context.Context) error {
}
if w.opts.enableFilterV2FullNode {
err := w.filterV2.Start(ctx)
err := w.filterV2Full.Start(ctx)
if err != nil {
return err
}
w.log.Info("Subscribing filterV2 to broadcaster")
w.bcaster.Register(nil, w.filterV2.MessageChannel())
w.bcaster.Register(nil, w.filterV2Full.MessageChannel())
}
if w.opts.enableFilterV2LightNode {
err := w.filterV2Light.Start(ctx)
if err != nil {
return err
}
}
err = w.setupENR(ctx, w.ListenAddresses())
@ -407,7 +416,7 @@ func (w *WakuNode) Stop() {
w.lightPush.Stop()
w.store.Stop()
w.filter.Stop()
w.filterV2.Stop()
w.filterV2Full.Stop()
w.peerExchange.Stop()
if w.opts.enableDiscV5 {
@ -503,6 +512,14 @@ func (w *WakuNode) Filter() *filter.WakuFilter {
return nil
}
// FilterV2 is used to access any operation related to Waku Filter protocol
func (w *WakuNode) FilterV2() *filterv2.WakuFilterPush {
if result, ok := w.filterV2Light.(*filterv2.WakuFilterPush); ok {
return result
}
return nil
}
// Lightpush is used to access any operation related to Waku Lightpush protocol
func (w *WakuNode) Lightpush() *lightpush.WakuLightPush {
if result, ok := w.lightPush.(*lightpush.WakuLightPush); ok {

View File

@ -1,7 +1,270 @@
package filterv2
import libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
import (
"context"
"encoding/hex"
"errors"
"math"
"sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/tag"
"go.uber.org/zap"
)
// FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow
// filter service nodes to push messages matching registered subscriptions to this client.
const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-beta1")
var (
ErrNoPeersAvailable = errors.New("no suitable remote peers")
)
type WakuFilterPush struct {
cancel context.CancelFunc
ctx context.Context
h host.Host
broadcaster v2.Broadcaster
timesource timesource.Timesource
wg *sync.WaitGroup
log *zap.Logger
subscriptions *SubscriptionsMap
}
type ContentFilter struct {
Topic string
ContentTopics []string
}
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterPush {
wf := new(WakuFilterPush)
wf.log = log.Named("filter")
wf.broadcaster = broadcaster
wf.timesource = timesource
wf.wg = &sync.WaitGroup{}
wf.h = host
return wf
}
func (wf *WakuFilterPush) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
wf.log.Error("creating tag map", zap.Error(err))
return errors.New("could not start waku filter")
}
ctx, cancel := context.WithCancel(ctx)
wf.cancel = cancel
wf.ctx = ctx
wf.subscriptions = NewSubscriptionMap()
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx))
wf.wg.Add(1)
// TODO: go wf.keepAliveSubscriptions(ctx)
wf.log.Info("filter protocol (light) started")
return nil
}
// Stop unmounts the filter protocol
func (wf *WakuFilterPush) Stop() {
if wf.cancel == nil {
return
}
wf.cancel()
wf.h.RemoveStreamHandler(FilterPushID_v20beta1)
wf.UnsubscribeAll(wf.ctx)
wf.subscriptions.Clear()
wf.wg.Wait()
}
func (wf *WakuFilterPush) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
messagePush := &pb.MessagePushV2{}
err := reader.ReadMsg(messagePush)
if err != nil {
logger.Error("reading message push", zap.Error(err))
return
}
wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage)
logger.Info("received message push")
}
}
func (wf *WakuFilterPush) notify(remotePeerID peer.ID, pubsubTopic string, msg *pb.WakuMessage) {
envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic)
// Broadcasting message so it's stored
wf.broadcaster.Submit(envelope)
// Notify filter subscribers
wf.subscriptions.Notify(remotePeerID, envelope)
}
func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer))
if err != nil {
return err
}
var conn network.Stream
conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
return err
}
writer := protoio.NewDelimitedWriter(conn)
request := &pb.FilterSubscribeRequest{
RequestId: hex.EncodeToString(params.requestId),
FilterSubscribeType: reqType,
PubsubTopic: contentFilter.Topic,
ContentTopics: contentFilter.ContentTopics,
}
wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request))
err = writer.WriteMsg(request)
if err != nil {
wf.log.Error("sending FilterSubscribeRequest", zap.Error(err))
return err
}
defer conn.Close()
return nil
}
// Subscribe setups a subscription to receive messages that match a specific content filter
func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) error {
// TODO: validate content filters
params := new(FilterSubscribeParameters)
params.log = wf.log
params.host = wf.h
optList := DefaultSubscriptionOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
if params.selectedPeer == "" {
return ErrNoPeersAvailable
}
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, contentFilter)
if err != nil {
return err
}
return nil
}
// SubscriptionChannel is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterPush) SubscriptionChannel(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails {
return wf.subscriptions.NewSubscription(peerID, topic, contentTopics)
}
func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) {
params := new(FilterUnsubscribeParameters)
params.log = wf.log
for _, opt := range opts {
opt(params)
}
if !params.unsubscribeAll && params.selectedPeer == "" {
return nil, ErrNoPeersAvailable
}
return params, nil
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) error {
// TODO: checks if a subscription exists with the chosen criteria
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return err
}
for peerID := range wf.subscriptions.items {
if !params.unsubscribeAll && peerID != params.selectedPeer {
continue
}
go func(peerID peer.ID) {
defer wf.wg.Done()
err := wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID},
pb.FilterSubscribeRequest_UNSUBSCRIBE,
ContentFilter{})
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
}(peerID)
}
return nil
}
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) error {
params, err := wf.getUnsubscribeParameters(opts...)
if err != nil {
return err
}
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()
wf.wg.Add(len(wf.subscriptions.items))
for peerID := range wf.subscriptions.items {
if !params.unsubscribeAll && peerID != params.selectedPeer {
continue
}
go func(peerID peer.ID) {
defer wf.wg.Done()
err := wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID},
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
ContentFilter{})
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
}(peerID)
}
return nil
}

View File

@ -0,0 +1,108 @@
package filterv2
import (
"context"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
)
type (
FilterSubscribeParameters struct {
host host.Host
selectedPeer peer.ID
requestId []byte
log *zap.Logger
}
FilterUnsubscribeParameters struct {
unsubscribeAll bool
selectedPeer peer.ID
requestId []byte
log *zap.Logger
}
FilterSubscribeOption func(*FilterSubscribeParameters)
FilterUnsubscribeOption func(*FilterUnsubscribeParameters)
)
func WithPeer(p peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.selectedPeer = p
}
}
// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store.
// If a list of specific peers is passed, the peer will be chosen from that list assuming it
// supports the chosen protocol, otherwise it will chose a peer from the node peerstore
func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeer(params.host, string(FilterSubscribeID_v20beta1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a
// peer from the node peerstore
func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterSubscribeID_v20beta1), fromThesePeers, params.log)
if err == nil {
params.selectedPeer = p
} else {
params.log.Info("selecting peer", zap.Error(err))
}
}
}
func WithRequestId(requestId []byte) FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.requestId = requestId
}
}
func WithAutomaticRequestId() FilterSubscribeOption {
return func(params *FilterSubscribeParameters) {
params.requestId = protocol.GenerateRequestId()
}
}
func DefaultSubscriptionOptions() []FilterSubscribeOption {
return []FilterSubscribeOption{
WithAutomaticPeerSelection(),
WithAutomaticRequestId(),
}
}
func UnsubscribeAll() FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.unsubscribeAll = true
}
}
func RequestID(requestId []byte) FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.requestId = requestId
}
}
func AutomaticRequestId() FilterUnsubscribeOption {
return func(params *FilterUnsubscribeParameters) {
params.requestId = protocol.GenerateRequestId()
}
}
func DefaultUnsubscribeOptions() []FilterUnsubscribeOption {
return []FilterUnsubscribeOption{
AutomaticRequestId(),
}
}

View File

@ -36,7 +36,7 @@ type (
wg *sync.WaitGroup
log *zap.Logger
subscriptions *SubscriptionMap
subscriptions *SubscribersMap
}
)
@ -54,7 +54,7 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, timesource timeso
wf.wg = &sync.WaitGroup{}
wf.h = host
wf.subscriptions = NewSubscriptionMap(broadcaster, timesource, params.Timeout)
wf.subscriptions = NewSubscribersMap(params.Timeout)
return wf
}
@ -78,7 +78,7 @@ func (wf *WakuFilter) Start(ctx context.Context) error {
wf.wg.Add(1)
go wf.filterListener(ctx)
wf.log.Info("filter protocol started")
wf.log.Info("filter protocol (full) started")
return nil
}

View File

@ -8,9 +8,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p/core/peer"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
var ErrNotFound = errors.New("not found")
@ -21,36 +18,26 @@ type PeerSet map[peer.ID]struct{}
type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics
type SubscriptionMap struct {
type SubscribersMap struct {
sync.RWMutex
timesource timesource.Timesource
items map[peer.ID]PubsubTopics
interestMap map[string]PeerSet // key: sha256(pubsubTopic-contentTopic) => peers
timeout time.Duration
failedPeers map[peer.ID]time.Time
broadcaster v2.Broadcaster
}
type SubscriptionItem struct {
Key peer.ID
Value PubsubTopics
}
func NewSubscriptionMap(broadcaster v2.Broadcaster, timesource timesource.Timesource, timeout time.Duration) *SubscriptionMap {
return &SubscriptionMap{
timesource: timesource,
func NewSubscribersMap(timeout time.Duration) *SubscribersMap {
return &SubscribersMap{
items: make(map[peer.ID]PubsubTopics),
interestMap: make(map[string]PeerSet),
broadcaster: broadcaster,
timeout: timeout,
failedPeers: make(map[peer.ID]time.Time),
}
}
func (sub *SubscriptionMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) {
func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) {
sub.Lock()
defer sub.Unlock()
@ -83,7 +70,7 @@ func (sub *SubscriptionMap) Set(peerID peer.ID, pubsubTopic string, contentTopic
}
}
func (sub *SubscriptionMap) Get(peerID peer.ID) (PubsubTopics, bool) {
func (sub *SubscribersMap) Get(peerID peer.ID) (PubsubTopics, bool) {
sub.RLock()
defer sub.RUnlock()
@ -92,7 +79,7 @@ func (sub *SubscriptionMap) Get(peerID peer.ID) (PubsubTopics, bool) {
return value, ok
}
func (sub *SubscriptionMap) Has(peerID peer.ID) bool {
func (sub *SubscribersMap) Has(peerID peer.ID) bool {
sub.RLock()
defer sub.RUnlock()
@ -101,7 +88,7 @@ func (sub *SubscriptionMap) Has(peerID peer.ID) bool {
return ok
}
func (sub *SubscriptionMap) Delete(peerID peer.ID, pubsubTopic string, contentTopics []string) error {
func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTopics []string) error {
sub.Lock()
defer sub.Unlock()
@ -146,7 +133,7 @@ func (sub *SubscriptionMap) Delete(peerID peer.ID, pubsubTopic string, contentTo
return nil
}
func (sub *SubscriptionMap) deleteAll(peerID peer.ID) error {
func (sub *SubscribersMap) deleteAll(peerID peer.ID) error {
pubsubTopicMap, ok := sub.items[peerID]
if !ok {
return ErrNotFound
@ -169,14 +156,14 @@ func (sub *SubscriptionMap) deleteAll(peerID peer.ID) error {
return nil
}
func (sub *SubscriptionMap) DeleteAll(peerID peer.ID) error {
func (sub *SubscribersMap) DeleteAll(peerID peer.ID) error {
sub.Lock()
defer sub.Unlock()
return sub.deleteAll(peerID)
}
func (sub *SubscriptionMap) RemoveAll() {
func (sub *SubscribersMap) RemoveAll() {
sub.Lock()
defer sub.Unlock()
@ -185,7 +172,8 @@ func (sub *SubscriptionMap) RemoveAll() {
delete(sub.items, k)
}
}
func (sub *SubscriptionMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID {
func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID {
c := make(chan peer.ID)
onlyPubsubTopicKey := getKey(pubsubTopic, nil)
@ -194,11 +182,17 @@ func (sub *SubscriptionMap) Items(pubsubTopic string, contentTopic string) <-cha
f := func() {
sub.RLock()
defer sub.RUnlock()
for p := range sub.interestMap[onlyPubsubTopicKey] {
c <- p
if peers, ok := sub.interestMap[onlyPubsubTopicKey]; ok {
for p := range peers {
c <- p
}
}
for p := range sub.interestMap[pubsubAndContentTopicKey] {
c <- p
if peers, ok := sub.interestMap[pubsubAndContentTopicKey]; ok {
for p := range peers {
c <- p
}
}
close(c)
}
@ -207,35 +201,7 @@ func (sub *SubscriptionMap) Items(pubsubTopic string, contentTopic string) <-cha
return c
}
func (fm *SubscriptionMap) Notify(msg *pb.WakuMessage, peerID peer.ID) {
/*fm.RLock()
defer fm.RUnlock()
filter, ok := fm.items[peerID]
if !ok {
return
}
envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic)
// Broadcasting message so it's stored
fm.broadcaster.Submit(envelope)
if msg.ContentTopic == "" {
filter.Chan <- envelope
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}*/
}
func (sub *SubscriptionMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) {
func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) {
key := getKey(pubsubTopic, contentTopic)
peerSet, ok := sub.interestMap[key]
if !ok {
@ -245,9 +211,12 @@ func (sub *SubscriptionMap) addToInterestMap(peerID peer.ID, pubsubTopic string,
sub.interestMap[key] = peerSet
}
func (sub *SubscriptionMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) {
func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) {
key := getKey(pubsubTopic, contentTopic)
delete(sub.interestMap, key)
_, exists := sub.interestMap[key]
if exists {
delete(sub.interestMap[key], peerID)
}
}
func getKey(pubsubTopic string, contentTopic *string) string {
@ -260,14 +229,14 @@ func getKey(pubsubTopic string, contentTopic *string) string {
}
}
func (sub *SubscriptionMap) IsFailedPeer(peerID peer.ID) bool {
func (sub *SubscribersMap) IsFailedPeer(peerID peer.ID) bool {
sub.RLock()
defer sub.RUnlock()
_, ok := sub.failedPeers[peerID]
return ok
}
func (sub *SubscriptionMap) FlagAsSuccess(peerID peer.ID) {
func (sub *SubscribersMap) FlagAsSuccess(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()
@ -277,7 +246,7 @@ func (sub *SubscriptionMap) FlagAsSuccess(peerID peer.ID) {
}
}
func (sub *SubscriptionMap) FlagAsFailure(peerID peer.ID) {
func (sub *SubscribersMap) FlagAsFailure(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()

View File

@ -0,0 +1,175 @@
package filterv2
import (
"sync"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
type SubscriptionDetails struct {
sync.RWMutex
id string
mapRef *SubscriptionsMap
closed bool
once sync.Once
peerID peer.ID
pubsubTopic string
contentTopics map[string]struct{}
C chan *protocol.Envelope
}
type SubscriptionSet map[string]*SubscriptionDetails
type PeerSubscription struct {
peerID peer.ID
subscriptionsPerTopic map[string]SubscriptionSet
}
type SubscriptionsMap struct {
sync.RWMutex
items map[peer.ID]*PeerSubscription
}
func NewSubscriptionMap() *SubscriptionsMap {
return &SubscriptionsMap{
items: make(map[peer.ID]*PeerSubscription),
}
}
func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails {
sub.Lock()
defer sub.Unlock()
peerSubscription, ok := sub.items[peerID]
if !ok {
peerSubscription = &PeerSubscription{
peerID: peerID,
subscriptionsPerTopic: make(map[string]SubscriptionSet),
}
sub.items[peerID] = peerSubscription
}
_, ok = peerSubscription.subscriptionsPerTopic[topic]
if !ok {
peerSubscription.subscriptionsPerTopic[topic] = make(SubscriptionSet)
}
details := &SubscriptionDetails{
id: uuid.NewString(),
mapRef: sub,
peerID: peerID,
pubsubTopic: topic,
C: make(chan *protocol.Envelope),
}
for _, ct := range contentTopics {
details.contentTopics[ct] = struct{}{}
}
sub.items[peerID].subscriptionsPerTopic[topic][details.id] = details
return details
}
func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.Lock()
defer sub.Unlock()
peerSubscription, ok := sub.items[subscription.peerID]
if !ok {
return ErrNotFound
}
delete(peerSubscription.subscriptionsPerTopic[subscription.pubsubTopic], subscription.id)
return nil
}
func (s *SubscriptionDetails) Add(contentTopics []string) {
s.Lock()
defer s.Unlock()
for _, ct := range contentTopics {
s.contentTopics[ct] = struct{}{}
}
}
func (s *SubscriptionDetails) Remove(contentTopics []string) {
s.Lock()
defer s.Unlock()
for _, ct := range contentTopics {
delete(s.contentTopics, ct)
}
}
func (s *SubscriptionDetails) closeC() {
s.once.Do(func() {
s.Lock()
defer s.Unlock()
s.closed = true
close(s.C)
})
}
func (s *SubscriptionDetails) Close() error {
s.closeC()
return s.mapRef.Delete(s)
}
func (sub *SubscriptionsMap) clear() {
for _, peerSubscription := range sub.items {
for _, subscriptionSet := range peerSubscription.subscriptionsPerTopic {
for _, subscription := range subscriptionSet {
subscription.closeC()
}
}
}
sub.items = make(map[peer.ID]*PeerSubscription)
}
func (sub *SubscriptionsMap) Clear() {
sub.Lock()
defer sub.Unlock()
sub.clear()
}
func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) {
sub.RLock()
defer sub.RUnlock()
subscriptions, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()]
if ok {
iterateSubscriptionSet(subscriptions, envelope)
}
subscriptionsWithNoPeer, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()]
if ok {
iterateSubscriptionSet(subscriptionsWithNoPeer, envelope)
}
}
func iterateSubscriptionSet(subscriptions SubscriptionSet, envelope *protocol.Envelope) {
for _, subscription := range subscriptions {
func(subscription *SubscriptionDetails) {
subscription.RLock()
defer subscription.RUnlock()
_, ok := subscription.contentTopics[envelope.Message().ContentTopic]
if !ok && len(subscription.contentTopics) != 0 { // TODO: confirm if no content topics are allowed
return
}
if !subscription.closed {
subscription.C <- envelope
}
}(subscription)
}
}