mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-27 05:56:07 +00:00
fix(subscription-map): uniform operations and encapsulation (#853)
* fix(subscription-map): uniform operations and encapsulation * nit: fixes based on comments
This commit is contained in:
parent
67d57a36b8
commit
a0bc53c679
@ -171,7 +171,7 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
|
||||
// unsubscribe on filter
|
||||
errCh, err := s.node.FilterLightnode().Unsubscribe(
|
||||
result, err := s.node.FilterLightnode().Unsubscribe(
|
||||
req.Context(),
|
||||
protocol.NewContentFilter(message.PubsubTopic, message.ContentFilters...),
|
||||
filter.WithRequestID(message.RequestId),
|
||||
@ -190,14 +190,17 @@ func (s *FilterService) unsubscribe(w http.ResponseWriter, req *http.Request) {
|
||||
// on success
|
||||
writeResponse(w, filterSubscriptionResponse{
|
||||
RequestId: message.RequestId,
|
||||
StatusDesc: s.unsubscribeGetMessage(errCh),
|
||||
StatusDesc: s.unsubscribeGetMessage(result),
|
||||
}, http.StatusOK)
|
||||
}
|
||||
|
||||
func (s *FilterService) unsubscribeGetMessage(ch <-chan filter.WakuFilterPushResult) string {
|
||||
func (s *FilterService) unsubscribeGetMessage(result *filter.WakuFilterPushResult) string {
|
||||
if result == nil {
|
||||
return http.StatusText(http.StatusOK)
|
||||
}
|
||||
var peerIds string
|
||||
ind := 0
|
||||
for entry := range ch {
|
||||
for _, entry := range result.Errors() {
|
||||
s.log.Error("can't unsubscribe for ", zap.String("peer", entry.PeerID.String()), zap.Error(entry.Err))
|
||||
if ind != 0 {
|
||||
peerIds += ", "
|
||||
|
@ -150,14 +150,16 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error {
|
||||
return errors.New("peerID is required")
|
||||
}
|
||||
|
||||
pushResult, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
|
||||
result, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
result := <-pushResult
|
||||
|
||||
return result.Err
|
||||
errs := result.Errors()
|
||||
if len(errs) == 0 {
|
||||
return nil
|
||||
}
|
||||
return errs[0].Err
|
||||
}
|
||||
|
||||
type unsubscribeAllResult struct {
|
||||
@ -192,19 +194,19 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) {
|
||||
fOptions = append(fOptions, filter.UnsubscribeAll())
|
||||
}
|
||||
|
||||
pushResult, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
|
||||
result, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
var unsubscribeResult []unsubscribeAllResult
|
||||
|
||||
for result := range pushResult {
|
||||
for _, err := range result.Errors() {
|
||||
ur := unsubscribeAllResult{
|
||||
PeerID: result.PeerID.Pretty(),
|
||||
PeerID: err.PeerID.Pretty(),
|
||||
}
|
||||
if result.Err != nil {
|
||||
ur.Error = result.Err.Error()
|
||||
if err.Err != nil {
|
||||
ur.Error = err.Err.Error()
|
||||
}
|
||||
unsubscribeResult = append(unsubscribeResult, ur)
|
||||
}
|
||||
|
@ -15,6 +15,10 @@ func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
|
||||
return s
|
||||
}
|
||||
|
||||
func (cf ContentTopicSet) ToList() []string {
|
||||
return maps.Keys(cf)
|
||||
}
|
||||
|
||||
// ContentFilter is used to specify the filter to be applied for a FilterNode.
|
||||
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
|
||||
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
|
||||
@ -25,7 +29,7 @@ type ContentFilter struct {
|
||||
}
|
||||
|
||||
func (cf ContentFilter) ContentTopicsList() []string {
|
||||
return maps.Keys(cf.ContentTopics)
|
||||
return cf.ContentTopics.ToList()
|
||||
}
|
||||
|
||||
func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"math"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
@ -46,11 +47,27 @@ type WakuFilterLightNode struct {
|
||||
pm *peermanager.PeerManager
|
||||
}
|
||||
|
||||
type WakuFilterPushResult struct {
|
||||
type WakuFilterPushError struct {
|
||||
Err error
|
||||
PeerID peer.ID
|
||||
}
|
||||
|
||||
type WakuFilterPushResult struct {
|
||||
errs []WakuFilterPushError
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
func (arr *WakuFilterPushResult) Add(err WakuFilterPushError) {
|
||||
arr.Lock()
|
||||
defer arr.Unlock()
|
||||
arr.errs = append(arr.errs, err)
|
||||
}
|
||||
func (arr *WakuFilterPushResult) Errors() []WakuFilterPushError {
|
||||
arr.RLock()
|
||||
defer arr.RUnlock()
|
||||
return arr.errs
|
||||
}
|
||||
|
||||
// NewWakuFilterLightnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
|
||||
// Note that broadcaster is optional.
|
||||
// Takes an optional peermanager if WakuFilterLightnode is being created along with WakuNode.
|
||||
@ -95,7 +112,7 @@ func (wf *WakuFilterLightNode) Stop() {
|
||||
wf.log.Warn("unsubscribing from full nodes", zap.Error(err))
|
||||
}
|
||||
|
||||
for r := range res {
|
||||
for _, r := range res.Errors() {
|
||||
if r.Err != nil {
|
||||
wf.log.Warn("unsubscribing from full nodes", zap.Error(r.Err), logging.HostID("peerID", r.PeerID))
|
||||
}
|
||||
@ -395,59 +412,8 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip
|
||||
return wf.Ping(ctx, subscription.PeerID)
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
wf.subscriptions.RLock()
|
||||
defer wf.subscriptions.RUnlock()
|
||||
|
||||
var output []*subscription.SubscriptionDetails
|
||||
|
||||
for _, peerSubscription := range wf.subscriptions.Items {
|
||||
for _, subscriptions := range peerSubscription.SubsPerPubsubTopic {
|
||||
for _, subscriptionDetail := range subscriptions {
|
||||
output = append(output, subscriptionDetail)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return output
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter protocol.ContentFilter) {
|
||||
wf.subscriptions.Lock()
|
||||
defer wf.subscriptions.Unlock()
|
||||
|
||||
peerSubscription, ok := wf.subscriptions.Items[peerID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
subscriptionDetailList, ok := peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
for subscriptionDetailID, subscriptionDetail := range subscriptionDetailList {
|
||||
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
|
||||
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
|
||||
delete(subscriptionDetailList, subscriptionDetailID)
|
||||
subscriptionDetail.CloseC()
|
||||
}
|
||||
}
|
||||
|
||||
if len(subscriptionDetailList) == 0 {
|
||||
delete(wf.subscriptions.Items[peerID].SubsPerPubsubTopic, contentFilter.PubsubTopic)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
|
||||
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
@ -475,28 +441,18 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))
|
||||
result := &WakuFilterPushResult{}
|
||||
for pTopic, cTopics := range pubSubTopicMap {
|
||||
cFilter := protocol.NewContentFilter(pTopic, cTopics...)
|
||||
for peerID := range wf.subscriptions.Items {
|
||||
if params.selectedPeer != "" && peerID != params.selectedPeer {
|
||||
continue
|
||||
}
|
||||
|
||||
subscriptions, ok := wf.subscriptions.Items[peerID]
|
||||
if !ok || subscriptions == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
wf.cleanupSubscriptions(peerID, cFilter)
|
||||
if len(subscriptions.SubsPerPubsubTopic) == 0 {
|
||||
delete(wf.subscriptions.Items, peerID)
|
||||
}
|
||||
|
||||
if params.wg != nil {
|
||||
params.wg.Add(1)
|
||||
}
|
||||
|
||||
peers, subs := wf.subscriptions.GetSubscription(params.selectedPeer, cFilter)
|
||||
for _, sub := range subs {
|
||||
sub.Remove(cTopics...)
|
||||
}
|
||||
if params.wg != nil {
|
||||
params.wg.Add(len(peers))
|
||||
}
|
||||
// send unsubscribe request to all the peers
|
||||
for _, peerID := range peers {
|
||||
go func(peerID peer.ID) {
|
||||
defer func() {
|
||||
if params.wg != nil {
|
||||
@ -506,10 +462,10 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
|
||||
err := wf.unsubscribeFromServer(ctx, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, cFilter)
|
||||
|
||||
if params.wg != nil {
|
||||
resultChan <- WakuFilterPushResult{
|
||||
result.Add(WakuFilterPushError{
|
||||
Err: err,
|
||||
PeerID: peerID,
|
||||
}
|
||||
})
|
||||
}
|
||||
}(peerID)
|
||||
}
|
||||
@ -518,16 +474,19 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter pr
|
||||
params.wg.Wait()
|
||||
}
|
||||
|
||||
close(resultChan)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
return resultChan, nil
|
||||
func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
|
||||
_, subs := wf.subscriptions.GetSubscription("", protocol.ContentFilter{})
|
||||
return subs
|
||||
}
|
||||
|
||||
// UnsubscribeWithSubscription is used to close a particular subscription
|
||||
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
|
||||
// server unsubscribe is also performed
|
||||
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
|
||||
opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||
opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
@ -542,20 +501,18 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
|
||||
// Close this sub
|
||||
sub.Close()
|
||||
|
||||
resultChan := make(chan WakuFilterPushResult, 1)
|
||||
result := &WakuFilterPushResult{}
|
||||
|
||||
if !wf.subscriptions.Has(sub.PeerID, sub.ContentFilter) {
|
||||
// Last sub for this [peer, contentFilter] pair
|
||||
paramsCopy := params.Copy()
|
||||
paramsCopy.selectedPeer = sub.PeerID
|
||||
err = wf.unsubscribeFromServer(ctx, paramsCopy, sub.ContentFilter)
|
||||
resultChan <- WakuFilterPushResult{
|
||||
params.selectedPeer = sub.PeerID
|
||||
err = wf.unsubscribeFromServer(ctx, params, sub.ContentFilter)
|
||||
result.Add(WakuFilterPushError{
|
||||
Err: err,
|
||||
PeerID: sub.PeerID,
|
||||
}
|
||||
})
|
||||
}
|
||||
close(resultChan)
|
||||
return resultChan, err
|
||||
return result, err
|
||||
|
||||
}
|
||||
|
||||
@ -573,28 +530,23 @@ func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params
|
||||
return err
|
||||
}
|
||||
|
||||
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||
// close all subscribe for selectedPeer or if selectedPeer == "", then all peers
|
||||
// send the unsubscribeAll request to the peers
|
||||
func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
params, err := wf.getUnsubscribeParameters(opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
wf.subscriptions.Lock()
|
||||
defer wf.subscriptions.Unlock()
|
||||
|
||||
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))
|
||||
|
||||
for peerID := range wf.subscriptions.Items {
|
||||
if params.selectedPeer != "" && peerID != params.selectedPeer {
|
||||
continue
|
||||
}
|
||||
|
||||
delete(wf.subscriptions.Items, peerID)
|
||||
|
||||
if params.wg != nil {
|
||||
params.wg.Add(1)
|
||||
}
|
||||
|
||||
peerIds, subs := wf.subscriptions.GetSubscription(params.selectedPeer, protocol.ContentFilter{})
|
||||
for _, sub := range subs {
|
||||
sub.Close()
|
||||
}
|
||||
result := &WakuFilterPushResult{}
|
||||
if params.wg != nil {
|
||||
params.wg.Add(len(peerIds))
|
||||
}
|
||||
for _, peerId := range peerIds {
|
||||
go func(peerID peer.ID) {
|
||||
defer func() {
|
||||
if params.wg != nil {
|
||||
@ -613,25 +565,23 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
|
||||
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
}
|
||||
if params.wg != nil {
|
||||
resultChan <- WakuFilterPushResult{
|
||||
result.Add(WakuFilterPushError{
|
||||
Err: err,
|
||||
PeerID: peerID,
|
||||
}
|
||||
})
|
||||
}
|
||||
}(peerID)
|
||||
}(peerId)
|
||||
}
|
||||
|
||||
if params.wg != nil {
|
||||
params.wg.Wait()
|
||||
}
|
||||
|
||||
close(resultChan)
|
||||
|
||||
return resultChan, nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// UnsubscribeAll is used to stop receiving messages from peer(s). It does not close subscriptions
|
||||
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
|
||||
func (wf *WakuFilterLightNode) UnsubscribeAll(ctx context.Context, opts ...FilterSubscribeOption) (*WakuFilterPushResult, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
|
@ -177,7 +177,10 @@ func (s *FilterTestSuite) waitForMessages(fn func(), subs []*subscription.Subscr
|
||||
s.log.Info("Looking at ", zap.String("pubSubTopic", sub.ContentFilter.PubsubTopic))
|
||||
for i := 0; i < msgCount; i++ {
|
||||
select {
|
||||
case env := <-sub.C:
|
||||
case env, ok := <-sub.C:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
received := WakuMsg{
|
||||
pubSubTopic: env.PubsubTopic(),
|
||||
contentTopic: env.Message().GetContentTopic(),
|
||||
@ -422,10 +425,9 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {
|
||||
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
|
||||
s.Require().NoError(err)
|
||||
|
||||
ch, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, DontWait())
|
||||
_, open := <-ch
|
||||
result, err := s.lightNode.Unsubscribe(s.ctx, contentFilter, DontWait())
|
||||
s.Require().NoError(err)
|
||||
s.Require().False(open)
|
||||
s.Require().Equal(0, len(result.Errors()))
|
||||
|
||||
_, err = s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
|
||||
s.Require().NoError(err)
|
||||
|
122
waku/v2/protocol/subscription/subscription_details.go
Normal file
122
waku/v2/protocol/subscription/subscription_details.go
Normal file
@ -0,0 +1,122 @@
|
||||
package subscription
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||
)
|
||||
|
||||
// Map of SubscriptionDetails.ID to subscriptions
|
||||
type SubscriptionSet map[string]*SubscriptionDetails
|
||||
|
||||
type PeerSubscription struct {
|
||||
PeerID peer.ID
|
||||
SubsPerPubsubTopic map[string]SubscriptionSet
|
||||
}
|
||||
|
||||
type PeerContentFilter struct {
|
||||
PeerID peer.ID `json:"peerID"`
|
||||
PubsubTopic string `json:"pubsubTopics"`
|
||||
ContentTopics []string `json:"contentTopics"`
|
||||
}
|
||||
|
||||
type SubscriptionDetails struct {
|
||||
sync.RWMutex
|
||||
|
||||
ID string `json:"subscriptionID"`
|
||||
mapRef *SubscriptionsMap
|
||||
Closed bool `json:"-"`
|
||||
once sync.Once
|
||||
|
||||
PeerID peer.ID `json:"peerID"`
|
||||
ContentFilter protocol.ContentFilter `json:"contentFilters"`
|
||||
C chan *protocol.Envelope `json:"-"`
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) Add(contentTopics ...string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
for _, ct := range contentTopics {
|
||||
if _, ok := s.ContentFilter.ContentTopics[ct]; !ok {
|
||||
s.ContentFilter.ContentTopics[ct] = struct{}{}
|
||||
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
|
||||
s.mapRef.Lock()
|
||||
s.mapRef.increaseSubFor(s.ContentFilter.PubsubTopic, ct)
|
||||
s.mapRef.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) Remove(contentTopics ...string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
for _, ct := range contentTopics {
|
||||
if _, ok := s.ContentFilter.ContentTopics[ct]; ok {
|
||||
delete(s.ContentFilter.ContentTopics, ct)
|
||||
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
|
||||
s.mapRef.Lock()
|
||||
s.mapRef.decreaseSubFor(s.ContentFilter.PubsubTopic, ct)
|
||||
s.mapRef.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
if len(s.ContentFilter.ContentTopics) == 0 {
|
||||
// err doesn't matter
|
||||
_ = s.mapRef.Delete(s)
|
||||
}
|
||||
}
|
||||
|
||||
// C1 if contentFilter is empty, it means that given subscription is part of contentFilter
|
||||
// C2 if not empty, check matching pubsubsTopic and atleast 1 contentTopic
|
||||
func (s *SubscriptionDetails) isPartOf(contentFilter protocol.ContentFilter) bool {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
if contentFilter.PubsubTopic != "" && // C1
|
||||
s.ContentFilter.PubsubTopic != contentFilter.PubsubTopic { // C2
|
||||
return false
|
||||
}
|
||||
// C1
|
||||
if len(contentFilter.ContentTopics) == 0 {
|
||||
return true
|
||||
}
|
||||
// C2
|
||||
for cTopic := range contentFilter.ContentTopics {
|
||||
if _, ok := s.ContentFilter.ContentTopics[cTopic]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) CloseC() {
|
||||
s.once.Do(func() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.Closed = true
|
||||
close(s.C)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) Close() error {
|
||||
s.CloseC()
|
||||
return s.mapRef.Delete(s)
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) {
|
||||
result := struct {
|
||||
PeerID peer.ID `json:"peerID"`
|
||||
PubsubTopic string `json:"pubsubTopics"`
|
||||
ContentTopics []string `json:"contentTopics"`
|
||||
}{
|
||||
PeerID: s.PeerID,
|
||||
PubsubTopic: s.ContentFilter.PubsubTopic,
|
||||
ContentTopics: s.ContentFilter.ContentTopics.ToList(),
|
||||
}
|
||||
|
||||
return json.Marshal(result)
|
||||
}
|
@ -1,7 +1,6 @@
|
||||
package subscription
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
@ -12,53 +11,51 @@ import (
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
type SubscriptionDetails struct {
|
||||
sync.RWMutex
|
||||
|
||||
ID string `json:"subscriptionID"`
|
||||
mapRef *SubscriptionsMap
|
||||
Closed bool `json:"-"`
|
||||
once sync.Once
|
||||
|
||||
PeerID peer.ID `json:"peerID"`
|
||||
ContentFilter protocol.ContentFilter `json:"contentFilters"`
|
||||
C chan *protocol.Envelope `json:"-"`
|
||||
}
|
||||
|
||||
// Map of SubscriptionDetails.ID to subscriptions
|
||||
type SubscriptionSet map[string]*SubscriptionDetails
|
||||
|
||||
type PeerSubscription struct {
|
||||
PeerID peer.ID
|
||||
SubsPerPubsubTopic map[string]SubscriptionSet
|
||||
}
|
||||
|
||||
type SubscriptionsMap struct {
|
||||
sync.RWMutex
|
||||
logger *zap.Logger
|
||||
Items map[peer.ID]*PeerSubscription
|
||||
logger *zap.Logger
|
||||
items map[peer.ID]*PeerSubscription
|
||||
noOfSubs map[string]map[string]int
|
||||
}
|
||||
|
||||
var ErrNotFound = errors.New("not found")
|
||||
|
||||
func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
|
||||
return &SubscriptionsMap{
|
||||
logger: logger.Named("subscription-map"),
|
||||
Items: make(map[peer.ID]*PeerSubscription),
|
||||
logger: logger.Named("subscription-map"),
|
||||
items: make(map[peer.ID]*PeerSubscription),
|
||||
noOfSubs: map[string]map[string]int{},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *SubscriptionsMap) IsListening(pubsubTopic, contentTopic string) bool {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
return m.noOfSubs[pubsubTopic] != nil && m.noOfSubs[pubsubTopic][contentTopic] > 0
|
||||
}
|
||||
|
||||
func (m *SubscriptionsMap) increaseSubFor(pubsubTopic, contentTopic string) {
|
||||
if m.noOfSubs[pubsubTopic] == nil {
|
||||
m.noOfSubs[pubsubTopic] = map[string]int{}
|
||||
}
|
||||
m.noOfSubs[pubsubTopic][contentTopic] = m.noOfSubs[pubsubTopic][contentTopic] + 1
|
||||
}
|
||||
|
||||
func (m *SubscriptionsMap) decreaseSubFor(pubsubTopic, contentTopic string) {
|
||||
m.noOfSubs[pubsubTopic][contentTopic] = m.noOfSubs[pubsubTopic][contentTopic] - 1
|
||||
}
|
||||
|
||||
func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.ContentFilter) *SubscriptionDetails {
|
||||
sub.Lock()
|
||||
defer sub.Unlock()
|
||||
|
||||
peerSubscription, ok := sub.Items[peerID]
|
||||
peerSubscription, ok := sub.items[peerID]
|
||||
if !ok {
|
||||
peerSubscription = &PeerSubscription{
|
||||
PeerID: peerID,
|
||||
SubsPerPubsubTopic: make(map[string]SubscriptionSet),
|
||||
}
|
||||
sub.Items[peerID] = peerSubscription
|
||||
sub.items[peerID] = peerSubscription
|
||||
}
|
||||
|
||||
_, ok = peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic]
|
||||
@ -74,7 +71,12 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.Content
|
||||
ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
|
||||
}
|
||||
|
||||
sub.Items[peerID].SubsPerPubsubTopic[cf.PubsubTopic][details.ID] = details
|
||||
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
|
||||
for contentTopic := range cf.ContentTopics {
|
||||
sub.increaseSubFor(cf.PubsubTopic, contentTopic)
|
||||
}
|
||||
|
||||
sub.items[peerID].SubsPerPubsubTopic[cf.PubsubTopic][details.ID] = details
|
||||
|
||||
return details
|
||||
}
|
||||
@ -83,7 +85,7 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool {
|
||||
sub.RLock()
|
||||
defer sub.RUnlock()
|
||||
|
||||
_, ok := sub.Items[peerID]
|
||||
_, ok := sub.items[peerID]
|
||||
return ok
|
||||
}
|
||||
|
||||
@ -93,7 +95,7 @@ func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool
|
||||
defer sub.RUnlock()
|
||||
|
||||
// Check if peer exits
|
||||
peerSubscription, ok := sub.Items[peerID]
|
||||
peerSubscription, ok := sub.items[peerID]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
@ -125,67 +127,24 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
|
||||
sub.Lock()
|
||||
defer sub.Unlock()
|
||||
|
||||
peerSubscription, ok := sub.Items[subscription.PeerID]
|
||||
peerSubscription, ok := sub.items[subscription.PeerID]
|
||||
if !ok {
|
||||
return ErrNotFound
|
||||
}
|
||||
|
||||
delete(peerSubscription.SubsPerPubsubTopic[subscription.ContentFilter.PubsubTopic], subscription.ID)
|
||||
contentFilter := subscription.ContentFilter
|
||||
delete(peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic], subscription.ID)
|
||||
|
||||
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
|
||||
for contentTopic := range contentFilter.ContentTopics {
|
||||
sub.decreaseSubFor(contentFilter.PubsubTopic, contentTopic)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) Add(contentTopics ...string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
for _, ct := range contentTopics {
|
||||
s.ContentFilter.ContentTopics[ct] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) Remove(contentTopics ...string) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
for _, ct := range contentTopics {
|
||||
delete(s.ContentFilter.ContentTopics, ct)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) CloseC() {
|
||||
s.once.Do(func() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.Closed = true
|
||||
close(s.C)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) Close() error {
|
||||
s.CloseC()
|
||||
return s.mapRef.Delete(s)
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) Clone() *SubscriptionDetails {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
|
||||
result := &SubscriptionDetails{
|
||||
ID: uuid.NewString(),
|
||||
mapRef: s.mapRef,
|
||||
Closed: false,
|
||||
PeerID: s.PeerID,
|
||||
ContentFilter: protocol.ContentFilter{PubsubTopic: s.ContentFilter.PubsubTopic, ContentTopics: maps.Clone(s.ContentFilter.ContentTopics)},
|
||||
C: make(chan *protocol.Envelope),
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (sub *SubscriptionsMap) clear() {
|
||||
for _, peerSubscription := range sub.Items {
|
||||
for _, peerSubscription := range sub.items {
|
||||
for _, subscriptionSet := range peerSubscription.SubsPerPubsubTopic {
|
||||
for _, subscription := range subscriptionSet {
|
||||
subscription.CloseC()
|
||||
@ -193,7 +152,7 @@ func (sub *SubscriptionsMap) clear() {
|
||||
}
|
||||
}
|
||||
|
||||
sub.Items = make(map[peer.ID]*PeerSubscription)
|
||||
sub.items = make(map[peer.ID]*PeerSubscription)
|
||||
}
|
||||
|
||||
func (sub *SubscriptionsMap) Clear() {
|
||||
@ -206,7 +165,7 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope)
|
||||
sub.RLock()
|
||||
defer sub.RUnlock()
|
||||
|
||||
subscriptions, ok := sub.Items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()]
|
||||
subscriptions, ok := sub.items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()]
|
||||
if ok {
|
||||
iterateSubscriptionSet(sub.logger, subscriptions, envelope)
|
||||
}
|
||||
@ -234,21 +193,24 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) {
|
||||
type resultType struct {
|
||||
PeerID string `json:"peerID"`
|
||||
PubsubTopic string `json:"pubsubTopic"`
|
||||
ContentTopics []string `json:"contentTopics"`
|
||||
}
|
||||
func (m *SubscriptionsMap) GetSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) ([]peer.ID, []*SubscriptionDetails) {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
result := resultType{
|
||||
PeerID: s.PeerID.Pretty(),
|
||||
PubsubTopic: s.ContentFilter.PubsubTopic,
|
||||
}
|
||||
var output []*SubscriptionDetails
|
||||
|
||||
for c := range s.ContentFilter.ContentTopics {
|
||||
result.ContentTopics = append(result.ContentTopics, c)
|
||||
var peerIDs []peer.ID
|
||||
for _, peerSubs := range m.items {
|
||||
if peerID == "" || peerSubs.PeerID == peerID {
|
||||
peerIDs = append(peerIDs, peerID)
|
||||
for _, subs := range peerSubs.SubsPerPubsubTopic {
|
||||
for _, subscriptionDetail := range subs {
|
||||
if subscriptionDetail.isPartOf(contentFilter) {
|
||||
output = append(output, subscriptionDetail)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return json.Marshal(result)
|
||||
return peerIDs, output
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user