2023-09-29 10:43:25 +05:30
package subscription
2023-02-08 19:33:06 -04:00
import (
2024-07-15 19:47:27 +05:30
"context"
2023-09-29 10:43:25 +05:30
"errors"
2023-02-08 19:33:06 -04:00
"sync"
"github.com/google/uuid"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
2023-05-08 17:33:10 -04:00
"go.uber.org/zap"
2023-09-19 15:52:11 +03:00
"golang.org/x/exp/maps"
2023-02-08 19:33:06 -04:00
)
type SubscriptionsMap struct {
sync . RWMutex
2023-11-04 14:16:24 +07:00
logger * zap . Logger
items map [ peer . ID ] * PeerSubscription
noOfSubs map [ string ] map [ string ] int
2023-02-08 19:33:06 -04:00
}
2023-09-29 10:43:25 +05:30
// ErrNotFound is returned when a subscription that should be removed is not
// present in the map.
var ErrNotFound = errors.New("not found")
2023-05-08 17:33:10 -04:00
func NewSubscriptionMap ( logger * zap . Logger ) * SubscriptionsMap {
2023-02-08 19:33:06 -04:00
return & SubscriptionsMap {
2023-11-04 14:16:24 +07:00
logger : logger . Named ( "subscription-map" ) ,
items : make ( map [ peer . ID ] * PeerSubscription ) ,
noOfSubs : map [ string ] map [ string ] int { } ,
2023-02-08 19:33:06 -04:00
}
}
2023-12-01 06:27:13 +05:30
func ( m * SubscriptionsMap ) Count ( ) int {
2024-01-26 14:15:15 +05:30
m . RLock ( )
2023-12-01 06:27:13 +05:30
defer m . RUnlock ( )
return len ( m . items )
}
2023-11-04 14:16:24 +07:00
// IsListening reports whether the subscription counter for the given
// (pubsubTopic, contentTopic) pair is greater than zero.
func (m *SubscriptionsMap) IsListening(pubsubTopic, contentTopic string) bool {
	m.RLock()
	defer m.RUnlock()

	perContentTopic := m.noOfSubs[pubsubTopic]
	if perContentTopic == nil {
		return false
	}
	return perContentTopic[contentTopic] > 0
}
// increaseSubFor adds one to the counter of the (pubsubTopic, contentTopic)
// pair, creating the per-pubsub-topic map on first use. It takes no lock
// itself.
func (m *SubscriptionsMap) increaseSubFor(pubsubTopic, contentTopic string) {
	counters := m.noOfSubs[pubsubTopic]
	if counters == nil {
		counters = make(map[string]int)
		m.noOfSubs[pubsubTopic] = counters
	}
	counters[contentTopic]++
}
// decreaseSubFor subtracts one from the counter of the (pubsubTopic,
// contentTopic) pair. It takes no lock itself.
//
// A pubsub topic without any counters is ignored instead of triggering a
// write into a nil map. Counters never drop below zero, and entries that
// reach zero are removed so the bookkeeping does not grow without bound.
func (m *SubscriptionsMap) decreaseSubFor(pubsubTopic, contentTopic string) {
	counters := m.noOfSubs[pubsubTopic]
	if counters == nil {
		return
	}

	remaining := counters[contentTopic] - 1
	if remaining > 0 {
		counters[contentTopic] = remaining
		return
	}

	// Nothing left for this content topic: drop the entry, and the whole
	// pubsub topic once its last content topic is gone.
	delete(counters, contentTopic)
	if len(counters) == 0 {
		delete(m.noOfSubs, pubsubTopic)
	}
}
2023-09-29 10:43:25 +05:30
func ( sub * SubscriptionsMap ) NewSubscription ( peerID peer . ID , cf protocol . ContentFilter ) * SubscriptionDetails {
2023-02-08 19:33:06 -04:00
sub . Lock ( )
defer sub . Unlock ( )
2023-11-04 14:16:24 +07:00
peerSubscription , ok := sub . items [ peerID ]
2023-02-08 19:33:06 -04:00
if ! ok {
peerSubscription = & PeerSubscription {
2023-09-29 10:43:25 +05:30
PeerID : peerID ,
SubsPerPubsubTopic : make ( map [ string ] SubscriptionSet ) ,
2023-02-08 19:33:06 -04:00
}
2023-11-04 14:16:24 +07:00
sub . items [ peerID ] = peerSubscription
2023-02-08 19:33:06 -04:00
}
2023-09-29 10:43:25 +05:30
_ , ok = peerSubscription . SubsPerPubsubTopic [ cf . PubsubTopic ]
2023-02-08 19:33:06 -04:00
if ! ok {
2023-09-29 10:43:25 +05:30
peerSubscription . SubsPerPubsubTopic [ cf . PubsubTopic ] = make ( SubscriptionSet )
2023-02-08 19:33:06 -04:00
}
details := & SubscriptionDetails {
2023-05-08 17:33:10 -04:00
ID : uuid . NewString ( ) ,
2023-02-14 18:19:38 -04:00
mapRef : sub ,
2023-05-08 17:33:10 -04:00
PeerID : peerID ,
C : make ( chan * protocol . Envelope , 1024 ) ,
2023-09-29 10:43:25 +05:30
ContentFilter : protocol . ContentFilter { PubsubTopic : cf . PubsubTopic , ContentTopics : maps . Clone ( cf . ContentTopics ) } ,
2024-06-07 15:35:04 +05:30
Closing : make ( chan bool ) ,
2023-02-08 19:33:06 -04:00
}
2023-11-04 14:16:24 +07:00
// Increase the number of subscriptions for this (pubsubTopic, contentTopic) pair
for contentTopic := range cf . ContentTopics {
sub . increaseSubFor ( cf . PubsubTopic , contentTopic )
}
sub . items [ peerID ] . SubsPerPubsubTopic [ cf . PubsubTopic ] [ details . ID ] = details
2023-02-08 19:33:06 -04:00
return details
}
2023-04-20 10:44:06 -04:00
func ( sub * SubscriptionsMap ) IsSubscribedTo ( peerID peer . ID ) bool {
sub . RLock ( )
defer sub . RUnlock ( )
2023-11-04 14:16:24 +07:00
_ , ok := sub . items [ peerID ]
2023-04-20 10:44:06 -04:00
return ok
}
2023-09-19 15:52:11 +03:00
// Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided
2023-09-29 10:43:25 +05:30
func ( sub * SubscriptionsMap ) Has ( peerID peer . ID , cf protocol . ContentFilter ) bool {
2023-04-20 10:44:06 -04:00
sub . RLock ( )
defer sub . RUnlock ( )
2023-03-08 11:58:51 -04:00
// Check if peer exits
2023-11-04 14:16:24 +07:00
peerSubscription , ok := sub . items [ peerID ]
2023-03-08 11:58:51 -04:00
if ! ok {
return false
}
2023-09-12 18:04:43 +05:30
//TODO: Handle pubsubTopic as null
2023-03-08 11:58:51 -04:00
// Check if pubsub topic exists
2023-09-29 10:43:25 +05:30
subscriptions , ok := peerSubscription . SubsPerPubsubTopic [ cf . PubsubTopic ]
2023-03-08 11:58:51 -04:00
if ! ok {
return false
}
// Check if the content topic exists within the list of subscriptions for this peer
2023-09-21 13:36:04 +03:00
for _ , ct := range cf . ContentTopicsList ( ) {
2023-03-08 11:58:51 -04:00
found := false
for _ , subscription := range subscriptions {
2023-09-19 15:52:11 +03:00
_ , exists := subscription . ContentFilter . ContentTopics [ ct ]
2023-03-08 11:58:51 -04:00
if exists {
found = true
break
}
}
if ! found {
return false
}
}
return true
}
2024-07-01 19:48:00 +05:30
// Caller has to acquire lock before invoking this method.This is done to avoid possible deadlock
func ( sub * SubscriptionsMap ) DeleteNoLock ( subscription * SubscriptionDetails ) error {
2023-02-08 19:33:06 -04:00
2023-11-04 14:16:24 +07:00
peerSubscription , ok := sub . items [ subscription . PeerID ]
2023-02-08 19:33:06 -04:00
if ! ok {
return ErrNotFound
}
2023-11-04 14:16:24 +07:00
contentFilter := subscription . ContentFilter
delete ( peerSubscription . SubsPerPubsubTopic [ contentFilter . PubsubTopic ] , subscription . ID )
2023-02-08 19:33:06 -04:00
2024-06-12 18:33:57 +05:30
if len ( peerSubscription . SubsPerPubsubTopic [ contentFilter . PubsubTopic ] ) == 0 {
sub . logger . Debug ( "no more subs for pubsubTopic for this peer" , zap . Stringer ( "id" , subscription . PeerID ) , zap . String ( "pubsubtopic" , contentFilter . PubsubTopic ) )
delete ( peerSubscription . SubsPerPubsubTopic , contentFilter . PubsubTopic )
}
2023-11-04 14:16:24 +07:00
// Decrease the number of subscriptions for this (pubsubTopic, contentTopic) pair
for contentTopic := range contentFilter . ContentTopics {
sub . decreaseSubFor ( contentFilter . PubsubTopic , contentTopic )
2023-02-08 19:33:06 -04:00
}
2023-02-15 14:43:51 -04:00
2024-06-07 15:35:04 +05:30
if len ( peerSubscription . SubsPerPubsubTopic ) == 0 {
sub . logger . Debug ( "no more subs for peer" , zap . Stringer ( "id" , subscription . PeerID ) )
delete ( sub . items , subscription . PeerID )
}
2023-11-04 14:16:24 +07:00
return nil
2023-02-15 14:43:51 -04:00
}
2023-02-08 19:33:06 -04:00
func ( sub * SubscriptionsMap ) clear ( ) {
2023-11-04 14:16:24 +07:00
for _ , peerSubscription := range sub . items {
2023-09-29 10:43:25 +05:30
for _ , subscriptionSet := range peerSubscription . SubsPerPubsubTopic {
2023-02-08 19:33:06 -04:00
for _ , subscription := range subscriptionSet {
2023-09-29 10:43:25 +05:30
subscription . CloseC ( )
2023-02-08 19:33:06 -04:00
}
}
}
2023-11-04 14:16:24 +07:00
sub . items = make ( map [ peer . ID ] * PeerSubscription )
2023-02-08 19:33:06 -04:00
}
// Clear takes the write lock and removes everything from the map via clear.
func (m *SubscriptionsMap) Clear() {
	m.Lock()
	defer m.Unlock()

	m.clear()
}
2024-07-15 19:47:27 +05:30
func ( sub * SubscriptionsMap ) Notify ( ctx context . Context , peerID peer . ID , envelope * protocol . Envelope ) {
2023-02-08 19:33:06 -04:00
sub . RLock ( )
defer sub . RUnlock ( )
2023-11-04 14:16:24 +07:00
subscriptions , ok := sub . items [ peerID ] . SubsPerPubsubTopic [ envelope . PubsubTopic ( ) ]
2023-02-08 19:33:06 -04:00
if ok {
2024-07-15 19:47:27 +05:30
iterateSubscriptionSet ( ctx , sub . logger , subscriptions , envelope )
2023-02-08 19:33:06 -04:00
}
}
2024-07-15 19:47:27 +05:30
func iterateSubscriptionSet ( ctx context . Context , logger * zap . Logger , subscriptions SubscriptionSet , envelope * protocol . Envelope ) {
2023-02-08 19:33:06 -04:00
for _ , subscription := range subscriptions {
func ( subscription * SubscriptionDetails ) {
subscription . RLock ( )
defer subscription . RUnlock ( )
2023-09-19 15:52:11 +03:00
_ , ok := subscription . ContentFilter . ContentTopics [ envelope . Message ( ) . ContentTopic ]
2023-08-30 17:35:08 +07:00
if ! ok { // only send the msg to subscriptions that have matching contentTopic
2023-02-08 19:33:06 -04:00
return
}
2023-05-08 17:33:10 -04:00
if ! subscription . Closed {
select {
2024-07-15 19:47:27 +05:30
case <- ctx . Done ( ) :
return
2023-05-08 17:33:10 -04:00
case subscription . C <- envelope :
default :
logger . Warn ( "can't deliver message to subscription. subscriber too slow" )
}
2023-02-08 19:33:06 -04:00
}
} ( subscription )
}
}
2023-06-22 14:55:51 -04:00
2024-01-26 14:15:15 +05:30
func ( m * SubscriptionsMap ) GetSubscriptionsForPeer ( peerID peer . ID , contentFilter protocol . ContentFilter ) [ ] * SubscriptionDetails {
2023-11-04 14:16:24 +07:00
m . RLock ( )
defer m . RUnlock ( )
2023-06-22 14:55:51 -04:00
2023-11-04 14:16:24 +07:00
var output [ ] * SubscriptionDetails
for _ , peerSubs := range m . items {
if peerID == "" || peerSubs . PeerID == peerID {
for _ , subs := range peerSubs . SubsPerPubsubTopic {
for _ , subscriptionDetail := range subs {
if subscriptionDetail . isPartOf ( contentFilter ) {
output = append ( output , subscriptionDetail )
}
}
}
}
2023-06-22 14:55:51 -04:00
}
2023-11-20 09:27:22 -04:00
return output
2023-06-22 14:55:51 -04:00
}
2024-01-26 14:15:15 +05:30
2024-05-22 11:45:53 +05:30
// GetAllSubscriptionsForPeer returns every subscription held by the first
// entry whose PeerID equals peerID, across all of its pubsub topics. The
// result is nil when no entry matches.
func (m *SubscriptionsMap) GetAllSubscriptionsForPeer(peerID peer.ID) []*SubscriptionDetails {
	m.RLock()
	defer m.RUnlock()

	var collected []*SubscriptionDetails
	for _, entry := range m.items {
		if entry.PeerID != peerID {
			continue
		}
		for _, topicSubs := range entry.SubsPerPubsubTopic {
			for _, details := range topicSubs {
				collected = append(collected, details)
			}
		}
		// Stop at the first matching entry.
		break
	}
	return collected
}
// GetSubscribedPeers returns the IDs of all peers that have an entry in the
// map, in no particular order.
func (m *SubscriptionsMap) GetSubscribedPeers() peer.IDSlice {
	m.RLock()
	defer m.RUnlock()

	var peers peer.IDSlice = maps.Keys(m.items)
	return peers
}
2024-01-26 14:15:15 +05:30
// GetAllSubscriptions calls GetSubscriptionsForPeer with an empty peer ID,
// which selects every peer, and a zero-value content filter.
// NOTE(review): which subscriptions a zero-value filter matches depends on
// isPartOf, which is defined elsewhere — presumably all of them; confirm.
func (m *SubscriptionsMap) GetAllSubscriptions() []*SubscriptionDetails {
	var (
		anyPeer  peer.ID
		noFilter protocol.ContentFilter
	)
	return m.GetSubscriptionsForPeer(anyPeer, noFilter)
}