diff --git a/go.mod b/go.mod index 223c9560..8563b305 100644 --- a/go.mod +++ b/go.mod @@ -76,7 +76,7 @@ require ( github.com/golang/mock v1.6.0 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect - github.com/google/uuid v1.3.0 // indirect + github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 0dd198af..9acc8300 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -79,7 +79,8 @@ type WakuNode struct { discoveryV5 Service peerExchange Service filter ReceptorService - filterV2 ReceptorService + filterV2Full ReceptorService + filterV2Light Service store ReceptorService rlnRelay RLNRelay @@ -210,7 +211,8 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) { w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...) w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...) - w.filterV2 = filterv2.NewWakuFilter(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) + w.filterV2Full = filterv2.NewWakuFilter(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...) + w.filterV2Light = filterv2.NewWakuFilterPush(w.host, w.bcaster, w.timesource, w.log) w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log) if w.opts.enableSwap { @@ -357,13 +359,20 @@ func (w *WakuNode) Start(ctx context.Context) error { } if w.opts.enableFilterV2FullNode { - err := w.filterV2.Start(ctx) + err := w.filterV2Full.Start(ctx) if err != nil { return err } w.log.Info("Subscribing filterV2 to broadcaster") - w.bcaster.Register(nil, w.filterV2.MessageChannel()) + w.bcaster.Register(nil, w.filterV2Full.MessageChannel()) + } + + if w.opts.enableFilterV2LightNode { + err := w.filterV2Light.Start(ctx) + if err != nil { + return err + } } err = w.setupENR(ctx, w.ListenAddresses()) @@ -407,7 +416,7 @@ func (w *WakuNode) Stop() { w.lightPush.Stop() w.store.Stop() w.filter.Stop() - w.filterV2.Stop() + w.filterV2Full.Stop() w.peerExchange.Stop() if w.opts.enableDiscV5 { @@ -503,6 +512,14 @@ func (w *WakuNode) Filter() *filter.WakuFilter { return nil } +// FilterV2 is used to access any operation related to Waku Filter protocol +func (w *WakuNode) FilterV2() *filterv2.WakuFilterPush { + if result, ok := w.filterV2Light.(*filterv2.WakuFilterPush); ok { + return result + } + return nil +} + // Lightpush is used to access any operation related to Waku Lightpush protocol func (w *WakuNode) Lightpush() *lightpush.WakuLightPush { if result, ok := w.lightPush.(*lightpush.WakuLightPush); ok { diff --git a/waku/v2/protocol/filterv2/client.go b/waku/v2/protocol/filterv2/client.go index 5f91ef2d..40167089 100644 --- a/waku/v2/protocol/filterv2/client.go +++ b/waku/v2/protocol/filterv2/client.go @@ -1,7 +1,270 @@ package filterv2 -import libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" +import ( + "context" + "encoding/hex" + "errors" + "math" + "sync" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" + libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-msgio/protoio" + "github.com/waku-org/go-waku/logging" + v2 "github.com/waku-org/go-waku/waku/v2" + "github.com/waku-org/go-waku/waku/v2/metrics" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/timesource" + "go.opencensus.io/tag" + "go.uber.org/zap" +) // FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow // filter service nodes to push messages matching registered subscriptions to this client. const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-beta1") + +var ( + ErrNoPeersAvailable = errors.New("no suitable remote peers") +) + +type WakuFilterPush struct { + cancel context.CancelFunc + ctx context.Context + h host.Host + broadcaster v2.Broadcaster + timesource timesource.Timesource + wg *sync.WaitGroup + log *zap.Logger + subscriptions *SubscriptionsMap +} + +type ContentFilter struct { + Topic string + ContentTopics []string +} + +// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options +func NewWakuFilterPush(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger) *WakuFilterPush { + wf := new(WakuFilterPush) + wf.log = log.Named("filter") + wf.broadcaster = broadcaster + wf.timesource = timesource + wf.wg = &sync.WaitGroup{} + wf.h = host + + return wf +} + +func (wf *WakuFilterPush) Start(ctx context.Context) error { + wf.wg.Wait() // Wait for any goroutines to stop + + ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) + if err != nil { + wf.log.Error("creating tag map", zap.Error(err)) + return errors.New("could not start waku filter") + } + + ctx, cancel := context.WithCancel(ctx) + wf.cancel = cancel + wf.ctx = ctx + wf.subscriptions = NewSubscriptionMap() + + wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx)) + + wf.wg.Add(1) + + // TODO: go wf.keepAliveSubscriptions(ctx) + + wf.log.Info("filter protocol (light) started") + + return nil +} + +// Stop unmounts the filter protocol +func (wf *WakuFilterPush) Stop() { + if wf.cancel == nil { + return + } + + wf.cancel() + + wf.h.RemoveStreamHandler(FilterPushID_v20beta1) + + wf.UnsubscribeAll(wf.ctx) + + wf.subscriptions.Clear() + + wf.wg.Wait() +} + +func (wf *WakuFilterPush) onRequest(ctx context.Context) func(s network.Stream) { + return func(s network.Stream) { + defer s.Close() + logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) + + reader := protoio.NewDelimitedReader(s, math.MaxInt32) + + messagePush := &pb.MessagePushV2{} + err := reader.ReadMsg(messagePush) + if err != nil { + logger.Error("reading message push", zap.Error(err)) + return + } + + wf.notify(s.Conn().RemotePeer(), messagePush.PubsubTopic, messagePush.WakuMessage) + + logger.Info("received message push") + } +} + +func (wf *WakuFilterPush) notify(remotePeerID peer.ID, pubsubTopic string, msg *pb.WakuMessage) { + envelope := protocol.NewEnvelope(msg, wf.timesource.Now().UnixNano(), pubsubTopic) + + // Broadcasting message so it's stored + wf.broadcaster.Submit(envelope) + + // Notify filter subscribers + wf.subscriptions.Notify(remotePeerID, envelope) +} + +func (wf *WakuFilterPush) request(ctx context.Context, params *FilterSubscribeParameters, reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { + err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(params.selectedPeer)) + if err != nil { + return err + } + + var conn network.Stream + conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) + if err != nil { + return err + } + + writer := protoio.NewDelimitedWriter(conn) + + request := &pb.FilterSubscribeRequest{ + RequestId: hex.EncodeToString(params.requestId), + FilterSubscribeType: reqType, + PubsubTopic: contentFilter.Topic, + ContentTopics: contentFilter.ContentTopics, + } + + wf.log.Debug("sending FilterSubscribeRequest", zap.Stringer("request", request)) + err = writer.WriteMsg(request) + if err != nil { + wf.log.Error("sending FilterSubscribeRequest", zap.Error(err)) + return err + } + + defer conn.Close() + return nil +} + +// Subscribe setups a subscription to receive messages that match a specific content filter +func (wf *WakuFilterPush) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) error { + // TODO: validate content filters + + params := new(FilterSubscribeParameters) + params.log = wf.log + params.host = wf.h + + optList := DefaultSubscriptionOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + + if params.selectedPeer == "" { + return ErrNoPeersAvailable + } + + err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, contentFilter) + if err != nil { + return err + } + + return nil +} + +// SubscriptionChannel is used to obtain an object from which you could receive messages received via filter protocol +func (wf *WakuFilterPush) SubscriptionChannel(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails { + return wf.subscriptions.NewSubscription(peerID, topic, contentTopics) +} + +func (wf *WakuFilterPush) getUnsubscribeParameters(opts ...FilterUnsubscribeOption) (*FilterUnsubscribeParameters, error) { + params := new(FilterUnsubscribeParameters) + params.log = wf.log + for _, opt := range opts { + opt(params) + } + + if !params.unsubscribeAll && params.selectedPeer == "" { + return nil, ErrNoPeersAvailable + } + + return params, nil +} + +// Unsubscribe is used to stop receiving messages from a peer that match a content filter +func (wf *WakuFilterPush) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) error { + // TODO: checks if a subscription exists with the chosen criteria + + params, err := wf.getUnsubscribeParameters(opts...) + if err != nil { + return err + } + + for peerID := range wf.subscriptions.items { + if !params.unsubscribeAll && peerID != params.selectedPeer { + continue + } + + go func(peerID peer.ID) { + defer wf.wg.Done() + err := wf.request( + ctx, + &FilterSubscribeParameters{selectedPeer: peerID}, + pb.FilterSubscribeRequest_UNSUBSCRIBE, + ContentFilter{}) + if err != nil { + wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) + } + }(peerID) + } + + return nil +} + +// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions +func (wf *WakuFilterPush) UnsubscribeAll(ctx context.Context, opts ...FilterUnsubscribeOption) error { + params, err := wf.getUnsubscribeParameters(opts...) + if err != nil { + return err + } + + wf.subscriptions.Lock() + defer wf.subscriptions.Unlock() + + wf.wg.Add(len(wf.subscriptions.items)) + for peerID := range wf.subscriptions.items { + if !params.unsubscribeAll && peerID != params.selectedPeer { + continue + } + + go func(peerID peer.ID) { + defer wf.wg.Done() + err := wf.request( + ctx, + &FilterSubscribeParameters{selectedPeer: peerID}, + pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, + ContentFilter{}) + if err != nil { + wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) + } + }(peerID) + } + + return nil +} diff --git a/waku/v2/protocol/filterv2/options.go b/waku/v2/protocol/filterv2/options.go new file mode 100644 index 00000000..504116d6 --- /dev/null +++ b/waku/v2/protocol/filterv2/options.go @@ -0,0 +1,108 @@ +package filterv2 + +import ( + "context" + + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/utils" + "go.uber.org/zap" +) + +type ( + FilterSubscribeParameters struct { + host host.Host + selectedPeer peer.ID + requestId []byte + log *zap.Logger + } + + FilterUnsubscribeParameters struct { + unsubscribeAll bool + selectedPeer peer.ID + requestId []byte + log *zap.Logger + } + + FilterSubscribeOption func(*FilterSubscribeParameters) + FilterUnsubscribeOption func(*FilterUnsubscribeParameters) +) + +func WithPeer(p peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + params.selectedPeer = p + } +} + +// WithAutomaticPeerSelection is an option used to randomly select a peer from the peer store. +// If a list of specific peers is passed, the peer will be chosen from that list assuming it +// supports the chosen protocol, otherwise it will chose a peer from the node peerstore +func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeer(params.host, string(FilterSubscribeID_v20beta1), fromThesePeers, params.log) + if err == nil { + params.selectedPeer = p + } else { + params.log.Info("selecting peer", zap.Error(err)) + } + } +} + +// WithFastestPeerSelection is an option used to select a peer from the peer store +// with the lowest ping If a list of specific peers is passed, the peer will be chosen +// from that list assuming it supports the chosen protocol, otherwise it will chose a +// peer from the node peerstore +func WithFastestPeerSelection(ctx context.Context, fromThesePeers ...peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeerWithLowestRTT(ctx, params.host, string(FilterSubscribeID_v20beta1), fromThesePeers, params.log) + if err == nil { + params.selectedPeer = p + } else { + params.log.Info("selecting peer", zap.Error(err)) + } + } +} + +func WithRequestId(requestId []byte) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + params.requestId = requestId + } +} + +func WithAutomaticRequestId() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + params.requestId = protocol.GenerateRequestId() + } +} + +func DefaultSubscriptionOptions() []FilterSubscribeOption { + return []FilterSubscribeOption{ + WithAutomaticPeerSelection(), + WithAutomaticRequestId(), + } +} + +func UnsubscribeAll() FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.unsubscribeAll = true + } +} + +func RequestID(requestId []byte) FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.requestId = requestId + } +} + +func AutomaticRequestId() FilterUnsubscribeOption { + return func(params *FilterUnsubscribeParameters) { + params.requestId = protocol.GenerateRequestId() + } +} + +func DefaultUnsubscribeOptions() []FilterUnsubscribeOption { + return []FilterUnsubscribeOption{ + AutomaticRequestId(), + } +} diff --git a/waku/v2/protocol/filterv2/server.go b/waku/v2/protocol/filterv2/server.go index 05c1bc3d..213e9198 100644 --- a/waku/v2/protocol/filterv2/server.go +++ b/waku/v2/protocol/filterv2/server.go @@ -36,7 +36,7 @@ type ( wg *sync.WaitGroup log *zap.Logger - subscriptions *SubscriptionMap + subscriptions *SubscribersMap } ) @@ -54,7 +54,7 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, timesource timeso wf.wg = &sync.WaitGroup{} wf.h = host - wf.subscriptions = NewSubscriptionMap(broadcaster, timesource, params.Timeout) + wf.subscriptions = NewSubscribersMap(params.Timeout) return wf } @@ -78,7 +78,7 @@ func (wf *WakuFilter) Start(ctx context.Context) error { wf.wg.Add(1) go wf.filterListener(ctx) - wf.log.Info("filter protocol started") + wf.log.Info("filter protocol (full) started") return nil } diff --git a/waku/v2/protocol/filterv2/subscription_map.go b/waku/v2/protocol/filterv2/subscribers_map.go similarity index 65% rename from waku/v2/protocol/filterv2/subscription_map.go rename to waku/v2/protocol/filterv2/subscribers_map.go index b19f8315..cebfe30b 100644 --- a/waku/v2/protocol/filterv2/subscription_map.go +++ b/waku/v2/protocol/filterv2/subscribers_map.go @@ -8,9 +8,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/libp2p/go-libp2p/core/peer" - v2 "github.com/waku-org/go-waku/waku/v2" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" - "github.com/waku-org/go-waku/waku/v2/timesource" ) var ErrNotFound = errors.New("not found") @@ -21,36 +18,26 @@ type PeerSet map[peer.ID]struct{} type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics -type SubscriptionMap struct { +type SubscribersMap struct { sync.RWMutex - timesource timesource.Timesource items map[peer.ID]PubsubTopics interestMap map[string]PeerSet // key: sha256(pubsubTopic-contentTopic) => peers timeout time.Duration failedPeers map[peer.ID]time.Time - - broadcaster v2.Broadcaster } -type SubscriptionItem struct { - Key peer.ID - Value PubsubTopics -} - -func NewSubscriptionMap(broadcaster v2.Broadcaster, timesource timesource.Timesource, timeout time.Duration) *SubscriptionMap { - return &SubscriptionMap{ - timesource: timesource, +func NewSubscribersMap(timeout time.Duration) *SubscribersMap { + return &SubscribersMap{ items: make(map[peer.ID]PubsubTopics), interestMap: make(map[string]PeerSet), - broadcaster: broadcaster, timeout: timeout, failedPeers: make(map[peer.ID]time.Time), } } -func (sub *SubscriptionMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) { +func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) { sub.Lock() defer sub.Unlock() @@ -83,7 +70,7 @@ func (sub *SubscriptionMap) Set(peerID peer.ID, pubsubTopic string, contentTopic } } -func (sub *SubscriptionMap) Get(peerID peer.ID) (PubsubTopics, bool) { +func (sub *SubscribersMap) Get(peerID peer.ID) (PubsubTopics, bool) { sub.RLock() defer sub.RUnlock() @@ -92,7 +79,7 @@ func (sub *SubscriptionMap) Get(peerID peer.ID) (PubsubTopics, bool) { return value, ok } -func (sub *SubscriptionMap) Has(peerID peer.ID) bool { +func (sub *SubscribersMap) Has(peerID peer.ID) bool { sub.RLock() defer sub.RUnlock() @@ -101,7 +88,7 @@ func (sub *SubscriptionMap) Has(peerID peer.ID) bool { return ok } -func (sub *SubscriptionMap) Delete(peerID peer.ID, pubsubTopic string, contentTopics []string) error { +func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTopics []string) error { sub.Lock() defer sub.Unlock() @@ -146,7 +133,7 @@ func (sub *SubscriptionMap) Delete(peerID peer.ID, pubsubTopic string, contentTo return nil } -func (sub *SubscriptionMap) deleteAll(peerID peer.ID) error { +func (sub *SubscribersMap) deleteAll(peerID peer.ID) error { pubsubTopicMap, ok := sub.items[peerID] if !ok { return ErrNotFound @@ -169,14 +156,14 @@ func (sub *SubscriptionMap) deleteAll(peerID peer.ID) error { return nil } -func (sub *SubscriptionMap) DeleteAll(peerID peer.ID) error { +func (sub *SubscribersMap) DeleteAll(peerID peer.ID) error { sub.Lock() defer sub.Unlock() return sub.deleteAll(peerID) } -func (sub *SubscriptionMap) RemoveAll() { +func (sub *SubscribersMap) RemoveAll() { sub.Lock() defer sub.Unlock() @@ -185,7 +172,8 @@ func (sub *SubscriptionMap) RemoveAll() { delete(sub.items, k) } } -func (sub *SubscriptionMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID { + +func (sub *SubscribersMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID { c := make(chan peer.ID) onlyPubsubTopicKey := getKey(pubsubTopic, nil) @@ -194,11 +182,17 @@ func (sub *SubscriptionMap) Items(pubsubTopic string, contentTopic string) <-cha f := func() { sub.RLock() defer sub.RUnlock() - for p := range sub.interestMap[onlyPubsubTopicKey] { - c <- p + + if peers, ok := sub.interestMap[onlyPubsubTopicKey]; ok { + for p := range peers { + c <- p + } } - for p := range sub.interestMap[pubsubAndContentTopicKey] { - c <- p + + if peers, ok := sub.interestMap[pubsubAndContentTopicKey]; ok { + for p := range peers { + c <- p + } } close(c) } @@ -207,35 +201,7 @@ func (sub *SubscriptionMap) Items(pubsubTopic string, contentTopic string) <-cha return c } -func (fm *SubscriptionMap) Notify(msg *pb.WakuMessage, peerID peer.ID) { - /*fm.RLock() - defer fm.RUnlock() - - filter, ok := fm.items[peerID] - if !ok { - return - } - - envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic) - - // Broadcasting message so it's stored - fm.broadcaster.Submit(envelope) - - if msg.ContentTopic == "" { - filter.Chan <- envelope - } - - // TODO: In case of no topics we should either trigger here for all messages, - // or we should not allow such filter to exist in the first place. - for _, contentTopic := range filter.ContentFilters { - if msg.ContentTopic == contentTopic { - filter.Chan <- envelope - break - } - }*/ -} - -func (sub *SubscriptionMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { +func (sub *SubscribersMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { key := getKey(pubsubTopic, contentTopic) peerSet, ok := sub.interestMap[key] if !ok { @@ -245,9 +211,12 @@ func (sub *SubscriptionMap) addToInterestMap(peerID peer.ID, pubsubTopic string, sub.interestMap[key] = peerSet } -func (sub *SubscriptionMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { +func (sub *SubscribersMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) { key := getKey(pubsubTopic, contentTopic) - delete(sub.interestMap, key) + _, exists := sub.interestMap[key] + if exists { + delete(sub.interestMap[key], peerID) + } } func getKey(pubsubTopic string, contentTopic *string) string { @@ -260,14 +229,14 @@ func getKey(pubsubTopic string, contentTopic *string) string { } } -func (sub *SubscriptionMap) IsFailedPeer(peerID peer.ID) bool { +func (sub *SubscribersMap) IsFailedPeer(peerID peer.ID) bool { sub.RLock() defer sub.RUnlock() _, ok := sub.failedPeers[peerID] return ok } -func (sub *SubscriptionMap) FlagAsSuccess(peerID peer.ID) { +func (sub *SubscribersMap) FlagAsSuccess(peerID peer.ID) { sub.Lock() defer sub.Unlock() @@ -277,7 +246,7 @@ func (sub *SubscriptionMap) FlagAsSuccess(peerID peer.ID) { } } -func (sub *SubscriptionMap) FlagAsFailure(peerID peer.ID) { +func (sub *SubscribersMap) FlagAsFailure(peerID peer.ID) { sub.Lock() defer sub.Unlock() diff --git a/waku/v2/protocol/filterv2/subscriptions_map.go b/waku/v2/protocol/filterv2/subscriptions_map.go new file mode 100644 index 00000000..89005a29 --- /dev/null +++ b/waku/v2/protocol/filterv2/subscriptions_map.go @@ -0,0 +1,175 @@ +package filterv2 + +import ( + "sync" + + "github.com/google/uuid" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +type SubscriptionDetails struct { + sync.RWMutex + + id string + mapRef *SubscriptionsMap + closed bool + once sync.Once + + peerID peer.ID + pubsubTopic string + contentTopics map[string]struct{} + C chan *protocol.Envelope +} + +type SubscriptionSet map[string]*SubscriptionDetails + +type PeerSubscription struct { + peerID peer.ID + subscriptionsPerTopic map[string]SubscriptionSet +} + +type SubscriptionsMap struct { + sync.RWMutex + items map[peer.ID]*PeerSubscription +} + +func NewSubscriptionMap() *SubscriptionsMap { + return &SubscriptionsMap{ + items: make(map[peer.ID]*PeerSubscription), + } +} + +func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, topic string, contentTopics []string) *SubscriptionDetails { + sub.Lock() + defer sub.Unlock() + + peerSubscription, ok := sub.items[peerID] + if !ok { + peerSubscription = &PeerSubscription{ + peerID: peerID, + subscriptionsPerTopic: make(map[string]SubscriptionSet), + } + sub.items[peerID] = peerSubscription + } + + _, ok = peerSubscription.subscriptionsPerTopic[topic] + if !ok { + peerSubscription.subscriptionsPerTopic[topic] = make(SubscriptionSet) + } + + details := &SubscriptionDetails{ + id: uuid.NewString(), + mapRef: sub, + peerID: peerID, + pubsubTopic: topic, + C: make(chan *protocol.Envelope), + } + + for _, ct := range contentTopics { + details.contentTopics[ct] = struct{}{} + } + + sub.items[peerID].subscriptionsPerTopic[topic][details.id] = details + + return details +} + +func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error { + sub.Lock() + defer sub.Unlock() + + peerSubscription, ok := sub.items[subscription.peerID] + if !ok { + return ErrNotFound + } + + delete(peerSubscription.subscriptionsPerTopic[subscription.pubsubTopic], subscription.id) + + return nil +} + +func (s *SubscriptionDetails) Add(contentTopics []string) { + s.Lock() + defer s.Unlock() + + for _, ct := range contentTopics { + s.contentTopics[ct] = struct{}{} + } +} + +func (s *SubscriptionDetails) Remove(contentTopics []string) { + s.Lock() + defer s.Unlock() + + for _, ct := range contentTopics { + delete(s.contentTopics, ct) + } +} + +func (s *SubscriptionDetails) closeC() { + s.once.Do(func() { + s.Lock() + defer s.Unlock() + + s.closed = true + close(s.C) + }) + +} + +func (s *SubscriptionDetails) Close() error { + s.closeC() + return s.mapRef.Delete(s) +} + +func (sub *SubscriptionsMap) clear() { + for _, peerSubscription := range sub.items { + for _, subscriptionSet := range peerSubscription.subscriptionsPerTopic { + for _, subscription := range subscriptionSet { + subscription.closeC() + } + } + } + + sub.items = make(map[peer.ID]*PeerSubscription) +} + +func (sub *SubscriptionsMap) Clear() { + sub.Lock() + defer sub.Unlock() + sub.clear() +} + +func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope) { + sub.RLock() + defer sub.RUnlock() + + subscriptions, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()] + if ok { + iterateSubscriptionSet(subscriptions, envelope) + } + + subscriptionsWithNoPeer, ok := sub.items[peerID].subscriptionsPerTopic[envelope.PubsubTopic()] + if ok { + iterateSubscriptionSet(subscriptionsWithNoPeer, envelope) + } +} + +func iterateSubscriptionSet(subscriptions SubscriptionSet, envelope *protocol.Envelope) { + for _, subscription := range subscriptions { + func(subscription *SubscriptionDetails) { + subscription.RLock() + defer subscription.RUnlock() + + _, ok := subscription.contentTopics[envelope.Message().ContentTopic] + if !ok && len(subscription.contentTopics) != 0 { // TODO: confirm if no content topics are allowed + return + } + + if !subscription.closed { + subscription.C <- envelope + } + }(subscription) + } +}