feat: update lightpush API for autosharding (#774)

* feat: update lightpush API to make pubSubTopic optional as per autosharding

* Extract contentFilter and subscriptions out of filter to reuse in relay (#779)

* chore: extract contentFilter outside filter package

* chore: move subscription outside of filter so that it can be modified and reused for relay

* Feat: filter select peer for sharding (#783)

* update selectPeer to support pubsubTopic based selection
This commit is contained in:
Prem Chaitanya Prathi 2023-09-29 10:43:25 +05:30 committed by GitHub
parent dfd104dbac
commit 47c961dcbb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 417 additions and 205 deletions

View File

@ -63,9 +63,9 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
} }
if options.Filter.Enable { if options.Filter.Enable {
cf := filter.ContentFilter{ cf := protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic, PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: filter.NewContentTopicSet(options.ContentTopic), ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
} }
var filterOpt filter.FilterSubscribeOption var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID() peerID, err := options.Filter.NodePeerID()
@ -269,7 +269,7 @@ func (c *Chat) SendMessage(line string) {
err := c.publish(tCtx, line) err := c.publish(tCtx, line)
if err != nil { if err != nil {
if err.Error() == "validation failed" { if err.Error() == "validation failed" {
err = errors.New("message rate violation!") err = errors.New("message rate violation")
} }
c.ui.ErrorMessage(err) c.ui.ErrorMessage(err)
} }
@ -524,7 +524,7 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second) ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second)
defer cancel() defer cancel()
err = c.node.DialPeerWithInfo(ctx, n) err = c.node.DialPeerWithInfo(ctx, info)
if err != nil { if err != nil {
c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err)) c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err))

View File

@ -97,8 +97,8 @@ func main() {
} }
// Send FilterRequest from light node to full node // Send FilterRequest from light node to full node
cf := filter.ContentFilter{ cf := protocol.ContentFilter{
ContentTopics: filter.NewContentTopicSet(contentTopic), ContentTopics: protocol.NewContentTopicSet(contentTopic),
} }
theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf) theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf)

View File

@ -1083,7 +1083,7 @@ Publish a message using Waku Lightpush.
1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type). 1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type).
2. `char* pubsubTopic`: pubsub topic on which to publish the message. 2. `char* pubsubTopic`: pubsub topic on which to publish the message.
If `NULL`, it uses the default pubsub topic. If `NULL`, it derives the pubsub topic from content-topic based on autosharding.
3. `char* peerID`: Peer ID supporting the lightpush protocol. 3. `char* peerID`: Peer ID supporting the lightpush protocol.
The peer must be already known. The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)

View File

@ -6,7 +6,7 @@ package main
import "C" import "C"
import "github.com/waku-org/go-waku/library" import "github.com/waku-org/go-waku/library"
// Publish a message using waku lightpush. Use NULL for topic to use the default pubsub topic.. // Publish a message using waku lightpush. Use NULL for topic to derive the pubsub topic from the contentTopic.
// peerID should contain the ID of a peer supporting the lightpush protocol. Use NULL to automatically select a node // peerID should contain the ID of a peer supporting the lightpush protocol. Use NULL to automatically select a node
// If ms is greater than 0, the broadcast of the message must happen before the timeout // If ms is greater than 0, the broadcast of the message must happen before the timeout
// (in milliseconds) is reached, or an error will be returned // (in milliseconds) is reached, or an error will be returned

View File

@ -7,7 +7,9 @@ import (
"time" "time"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter" "github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
) )
type filterArgument struct { type filterArgument struct {
@ -15,22 +17,22 @@ type filterArgument struct {
ContentTopics []string `json:"contentTopics,omitempty"` ContentTopics []string `json:"contentTopics,omitempty"`
} }
func toContentFilter(filterJSON string) (filter.ContentFilter, error) { func toContentFilter(filterJSON string) (protocol.ContentFilter, error) {
var f filterArgument var f filterArgument
err := json.Unmarshal([]byte(filterJSON), &f) err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil { if err != nil {
return filter.ContentFilter{}, err return protocol.ContentFilter{}, err
} }
return filter.ContentFilter{ return protocol.ContentFilter{
PubsubTopic: f.PubsubTopic, PubsubTopic: f.PubsubTopic,
ContentTopics: filter.NewContentTopicSet(f.ContentTopics...), ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...),
}, nil }, nil
} }
type subscribeResult struct { type subscribeResult struct {
Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"` Subscriptions []*subscription.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"` Error string `json:"error,omitempty"`
} }
// FilterSubscribe is used to create a subscription to a filter node to receive messages // FilterSubscribe is used to create a subscription to a filter node to receive messages
@ -71,7 +73,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
} }
for _, subscriptionDetails := range subscriptions { for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *filter.SubscriptionDetails) { go func(subscriptionDetails *subscription.SubscriptionDetails) {
for envelope := range subscriptionDetails.C { for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope)) send("message", toSubscriptionMessage(envelope))
} }

View File

@ -37,16 +37,20 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms
lpOptions = append(lpOptions, lightpush.WithAutomaticPeerSelection()) lpOptions = append(lpOptions, lightpush.WithAutomaticPeerSelection())
} }
hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, pubsubTopic, lpOptions...) if pubsubTopic != "" {
lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic))
}
hash, err := wakuState.node.Lightpush().PublishToTopic(ctx, msg, lpOptions...)
return hexutil.Encode(hash), err return hexutil.Encode(hash), err
} }
// LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol // LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol
func LightpushPublish(messageJSON string, topic string, peerID string, ms int) (string, error) { func LightpushPublish(messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) {
msg, err := wakuMessage(messageJSON) msg, err := wakuMessage(messageJSON)
if err != nil { if err != nil {
return "", err return "", err
} }
return lightpushPublish(msg, getTopic(topic), peerID, ms) return lightpushPublish(msg, getTopic(pubsubTopic), peerID, ms)
} }

View File

@ -847,11 +847,21 @@ func (w *WakuNode) Peers() ([]*Peer, error) {
return peers, nil return peers, nil
} }
func (w *WakuNode) PeersByShard(cluster uint16, shard uint16) peer.IDSlice { // PeersByShard filters peers based on shard information following static sharding
func (w *WakuNode) PeersByStaticShard(cluster uint16, shard uint16) peer.IDSlice {
pTopic := wakuprotocol.NewStaticShardingPubsubTopic(cluster, shard).String() pTopic := wakuprotocol.NewStaticShardingPubsubTopic(cluster, shard).String()
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic) return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
} }
// PeersByContentTopics filters peers based on contentTopic
func (w *WakuNode) PeersByContentTopic(contentTopic string) peer.IDSlice {
pTopic, err := wakuprotocol.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return nil
}
return w.peerstore.(wps.WakuPeerstore).PeersByPubSubTopic(pTopic)
}
func (w *WakuNode) findRelayNodes(ctx context.Context) { func (w *WakuNode) findRelayNodes(ctx context.Context) {
defer w.wg.Done() defer w.wg.Done()

View File

@ -127,7 +127,7 @@ func (pm *PeerManager) connectivityLoop(ctx context.Context) {
} }
// GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction // GroupPeersByDirection returns all the connected peers in peer store grouped by Inbound or outBound direction
func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) { func (pm *PeerManager) GroupPeersByDirection(specificPeers ...peer.ID) (inPeers peer.IDSlice, outPeers peer.IDSlice, err error) {
if len(specificPeers) == 0 { if len(specificPeers) == 0 {
specificPeers = pm.host.Network().Peers() specificPeers = pm.host.Network().Peers()
} }
@ -150,9 +150,9 @@ func (pm *PeerManager) GroupPeersByDirection(specificPeers []peer.ID) (inPeers p
// getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers. // getRelayPeers - Returns list of in and out peers supporting WakuRelayProtocol within specifiedPeers.
// If specifiedPeers is empty, it checks within all peers in peerStore. // If specifiedPeers is empty, it checks within all peers in peerStore.
func (pm *PeerManager) getRelayPeers(specificPeers []peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) { func (pm *PeerManager) getRelayPeers(specificPeers ...peer.ID) (inRelayPeers peer.IDSlice, outRelayPeers peer.IDSlice) {
//Group peers by their connected direction inbound or outbound. //Group peers by their connected direction inbound or outbound.
inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers) inPeers, outPeers, err := pm.GroupPeersByDirection(specificPeers...)
if err != nil { if err != nil {
return return
} }
@ -206,7 +206,7 @@ func (pm *PeerManager) connectToRelayPeers() {
//Check for out peer connections and connect to more peers. //Check for out peer connections and connect to more peers.
pm.ensureMinRelayConnsPerTopic() pm.ensureMinRelayConnsPerTopic()
inRelayPeers, outRelayPeers := pm.getRelayPeers(nil) inRelayPeers, outRelayPeers := pm.getRelayPeers()
pm.logger.Info("number of relay peers connected", pm.logger.Info("number of relay peers connected",
zap.Int("in", inRelayPeers.Len()), zap.Int("in", inRelayPeers.Len()),
zap.Int("out", outRelayPeers.Len())) zap.Int("out", outRelayPeers.Len()))
@ -417,22 +417,32 @@ func (pm *PeerManager) addPeerToServiceSlot(proto protocol.ID, peerID peer.ID) {
pm.serviceSlots.getPeers(proto).add(peerID) pm.serviceSlots.getPeers(proto).add(peerID)
} }
// SelectPeerByContentTopic is used to return a random peer that supports a given protocol for given contentTopic.
// If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol and contentTopic, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeerByContentTopic(proto protocol.ID, contentTopic string, specificPeers ...peer.ID) (peer.ID, error) {
pubsubTopic, err := waku_proto.GetPubSubTopicFromContentTopic(contentTopic)
if err != nil {
return "", err
}
return pm.SelectPeer(proto, pubsubTopic, specificPeers...)
}
// SelectPeer is used to return a random peer that supports a given protocol. // SelectPeer is used to return a random peer that supports a given protocol.
// If a list of specific peers is passed, the peer will be chosen from that list assuming // If a list of specific peers is passed, the peer will be chosen from that list assuming
// it supports the chosen protocol, otherwise it will chose a peer from the service slot. // it supports the chosen protocol, otherwise it will chose a peer from the service slot.
// If a peer cannot be found in the service slot, a peer will be selected from node peerstore // If a peer cannot be found in the service slot, a peer will be selected from node peerstore
func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (peer.ID, error) { // if pubSubTopic is specified, peer is selected from list that support the pubSubTopic
func (pm *PeerManager) SelectPeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peer.ID, error) {
// @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service. // @TODO We need to be more strategic about which peers we dial. Right now we just set one on the service.
// Ideally depending on the query and our set of peers we take a subset of ideal peers. // Ideally depending on the query and our set of peers we take a subset of ideal peers.
// This will require us to check for various factors such as: // This will require us to check for various factors such as:
// - which topics they track // - which topics they track
// - latency? // - latency?
//Try to fetch from serviceSlot if peerID := pm.selectServicePeer(proto, pubSubTopic, specificPeers...); peerID != nil {
if slot := pm.serviceSlots.getPeers(proto); slot != nil { return *peerID, nil
if peerID, err := slot.getRandom(); err == nil {
return peerID, nil
}
} }
// if not found in serviceSlots or proto == WakuRelayIDv200 // if not found in serviceSlots or proto == WakuRelayIDv200
@ -440,6 +450,36 @@ func (pm *PeerManager) SelectPeer(proto protocol.ID, specificPeers []peer.ID) (p
if err != nil { if err != nil {
return "", err return "", err
} }
if pubSubTopic != "" {
filteredPeers = pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, filteredPeers...)
}
return utils.SelectRandomPeer(filteredPeers, pm.logger) return utils.SelectRandomPeer(filteredPeers, pm.logger)
} }
func (pm *PeerManager) selectServicePeer(proto protocol.ID, pubSubTopic string, specificPeers ...peer.ID) (peerIDPtr *peer.ID) {
peerIDPtr = nil
//Try to fetch from serviceSlot
if slot := pm.serviceSlots.getPeers(proto); slot != nil {
if pubSubTopic == "" {
if peerID, err := slot.getRandom(); err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not retrieve random peer from slot", zap.Error(err))
}
} else { //PubsubTopic based selection
keys := make([]peer.ID, 0, len(slot.m))
for i := range slot.m {
keys = append(keys, i)
}
selectedPeers := pm.host.Peerstore().(wps.WakuPeerstore).PeersByPubSubTopic(pubSubTopic, keys...)
peerID, err := utils.SelectRandomPeer(selectedPeers, pm.logger)
if err == nil {
peerIDPtr = &peerID
} else {
pm.logger.Debug("could not select random peer", zap.Error(err))
}
}
}
return
}

View File

@ -13,6 +13,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/tests"
wps "github.com/waku-org/go-waku/waku/v2/peerstore" wps "github.com/waku-org/go-waku/waku/v2/peerstore"
wakuproto "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
) )
@ -65,7 +66,7 @@ func TestServiceSlots(t *testing.T) {
/////////////// ///////////////
// select peer from pm, currently only h2 is set in pm // select peer from pm, currently only h2 is set in pm
peerID, err := pm.SelectPeer(protocol, nil) peerID, err := pm.SelectPeer(protocol, "")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, peerID, h2.ID()) require.Equal(t, peerID, h2.ID())
@ -74,7 +75,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// check that returned peer is h2 or h3 peer // check that returned peer is h2 or h3 peer
peerID, err = pm.SelectPeer(protocol, nil) peerID, err = pm.SelectPeer(protocol, "")
require.NoError(t, err) require.NoError(t, err)
if peerID == h2.ID() || peerID == h3.ID() { if peerID == h2.ID() || peerID == h3.ID() {
//Test success //Test success
@ -90,7 +91,7 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer h4.Close() defer h4.Close()
_, err = pm.SelectPeer(protocol1, nil) _, err = pm.SelectPeer(protocol1, "")
require.Error(t, err, utils.ErrNoPeersAvailable) require.Error(t, err, utils.ErrNoPeersAvailable)
// add h4 peer for protocol1 // add h4 peer for protocol1
@ -98,10 +99,47 @@ func TestServiceSlots(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
//Test peer selection for protocol1 //Test peer selection for protocol1
peerID, err = pm.SelectPeer(protocol1, nil) peerID, err = pm.SelectPeer(protocol1, "")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, peerID, h4.ID()) require.Equal(t, peerID, h4.ID())
_, err = pm.SelectPeerByContentTopic(protocol1, "")
require.Error(t, wakuproto.ErrInvalidFormat, err)
}
func TestPeerSelection(t *testing.T) {
ctx, pm, deferFn := initTest(t)
defer deferFn()
h2, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h2.Close()
h3, err := tests.MakeHost(ctx, 0, rand.Reader)
require.NoError(t, err)
defer h3.Close()
protocol := libp2pProtocol.ID("test/protocol")
_, err = pm.AddPeer(getAddr(h2), wps.Static, []string{"/waku/rs/2/1", "/waku/rs/2/2"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
_, err = pm.AddPeer(getAddr(h3), wps.Static, []string{"/waku/rs/2/1"}, libp2pProtocol.ID(protocol))
require.NoError(t, err)
_, err = pm.SelectPeer(protocol, "")
require.NoError(t, err)
peerID, err := pm.SelectPeer(protocol, "/waku/rs/2/2")
require.NoError(t, err)
require.Equal(t, h2.ID(), peerID)
_, err = pm.SelectPeer(protocol, "/waku/rs/2/3")
require.Error(t, utils.ErrNoPeersAvailable, err)
_, err = pm.SelectPeer(protocol, "/waku/rs/2/1")
require.NoError(t, err)
} }
func TestDefaultProtocol(t *testing.T) { func TestDefaultProtocol(t *testing.T) {
@ -111,7 +149,7 @@ func TestDefaultProtocol(t *testing.T) {
// check peer for default protocol // check peer for default protocol
/////////////// ///////////////
//Test empty peer selection for relay protocol //Test empty peer selection for relay protocol
_, err := pm.SelectPeer(relay.WakuRelayID_v200, nil) _, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.Error(t, err, utils.ErrNoPeersAvailable) require.Error(t, err, utils.ErrNoPeersAvailable)
/////////////// ///////////////
@ -126,7 +164,7 @@ func TestDefaultProtocol(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
// since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol. // since we are not passing peerList, selectPeer fn using filterByProto checks in PeerStore for peers with same protocol.
peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, nil) peerID, err := pm.SelectPeer(relay.WakuRelayID_v200, "")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, peerID, h5.ID()) require.Equal(t, peerID, h5.ID())
} }
@ -146,12 +184,12 @@ func TestAdditionAndRemovalOfPeer(t *testing.T) {
_, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2) _, err = pm.AddPeer(getAddr(h6), wps.Static, []string{""}, protocol2)
require.NoError(t, err) require.NoError(t, err)
peerID, err := pm.SelectPeer(protocol2, nil) peerID, err := pm.SelectPeer(protocol2, "")
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, peerID, h6.ID()) require.Equal(t, peerID, h6.ID())
pm.RemovePeer(peerID) pm.RemovePeer(peerID)
_, err = pm.SelectPeer(protocol2, nil) _, err = pm.SelectPeer(protocol2, "")
require.Error(t, err, utils.ErrNoPeersAvailable) require.Error(t, err, utils.ErrNoPeersAvailable)
} }

View File

@ -59,7 +59,7 @@ type WakuPeerstore interface {
RemovePubSubTopic(p peer.ID, topic string) error RemovePubSubTopic(p peer.ID, topic string) error
PubSubTopics(p peer.ID) ([]string, error) PubSubTopics(p peer.ID) ([]string, error)
SetPubSubTopics(p peer.ID, topics []string) error SetPubSubTopics(p peer.ID, topics []string) error
PeersByPubSubTopic(pubSubTopic string) peer.IDSlice PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice
} }
// NewWakuPeerstore creates a new WakuPeerStore object // NewWakuPeerstore creates a new WakuPeerStore object
@ -208,9 +208,13 @@ func (ps *WakuPeerstoreImpl) PubSubTopics(p peer.ID) ([]string, error) {
} }
// PeersByPubSubTopic Returns list of peers by pubSubTopic // PeersByPubSubTopic Returns list of peers by pubSubTopic
func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string) peer.IDSlice { // If specifiPeers are listed, filtering is done from them otherwise from all peers in peerstore
func (ps *WakuPeerstoreImpl) PeersByPubSubTopic(pubSubTopic string, specificPeers ...peer.ID) peer.IDSlice {
if specificPeers == nil {
specificPeers = ps.Peers()
}
var result peer.IDSlice var result peer.IDSlice
for _, p := range ps.Peers() { for _, p := range specificPeers {
topics, err := ps.PubSubTopics(p) topics, err := ps.PubSubTopics(p)
if err == nil { if err == nil {
for _, topic := range topics { for _, topic := range topics {

View File

@ -0,0 +1,30 @@
package protocol
import "golang.org/x/exp/maps"
type ContentTopicSet map[string]struct{}
func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
for _, ct := range contentTopics {
s[ct] = struct{}{}
}
return s
}
// ContentFilter is used to specify the filter to be applied for a FilterNode.
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
type ContentFilter struct {
PubsubTopic string
ContentTopics ContentTopicSet
}
func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}
func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}

View File

@ -21,9 +21,9 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb" "github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb" wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/maps"
) )
// FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow // FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow
@ -41,27 +41,10 @@ type WakuFilterLightNode struct {
timesource timesource.Timesource timesource timesource.Timesource
metrics Metrics metrics Metrics
log *zap.Logger log *zap.Logger
subscriptions *SubscriptionsMap subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager pm *peermanager.PeerManager
} }
// ContentFilter is used to specify the filter to be applied for a FilterNode.
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
type ContentFilter struct {
PubsubTopic string
ContentTopics ContentTopicSet
}
func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}
func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}
type WakuFilterPushResult struct { type WakuFilterPushResult struct {
Err error Err error
PeerID peer.ID PeerID peer.ID
@ -95,7 +78,7 @@ func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
} }
func (wf *WakuFilterLightNode) start() error { func (wf *WakuFilterLightNode) start() error {
wf.subscriptions = NewSubscriptionMap(wf.log) wf.subscriptions = subscription.NewSubscriptionMap(wf.log)
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context())) wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context()))
wf.log.Info("filter-push protocol started") wf.log.Info("filter-push protocol started")
@ -144,7 +127,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
pubSubTopic := "" pubSubTopic := ""
//For now returning failure, this will get addressed with autosharding changes for filter. //For now returning failure, this will get addressed with autosharding changes for filter.
if messagePush.PubsubTopic == nil { if messagePush.PubsubTopic == nil {
pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic) pubSubTopic, err = protocol.GetPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic)
if err != nil { if err != nil {
logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err)) logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err))
wf.metrics.RecordError(decodeRPCFailure) wf.metrics.RecordError(decodeRPCFailure)
@ -153,7 +136,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
} else { } else {
pubSubTopic = *messagePush.PubsubTopic pubSubTopic = *messagePush.PubsubTopic
} }
if !wf.subscriptions.Has(s.Conn().RemotePeer(), NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) { if !wf.subscriptions.Has(s.Conn().RemotePeer(), protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
logger.Warn("received messagepush with invalid subscription parameters", logger.Warn("received messagepush with invalid subscription parameters",
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic), logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
@ -181,7 +164,7 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,
} }
func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters, func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error { reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1) conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil { if err != nil {
wf.metrics.RecordError(dialFailure) wf.metrics.RecordError(dialFailure)
@ -230,18 +213,8 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
return nil return nil
} }
func getPubSubTopicFromContentTopic(cTopicString string) (string, error) {
cTopic, err := protocol.StringToContentTopic(cTopicString)
if err != nil {
return "", fmt.Errorf("%s : %s", err.Error(), cTopicString)
}
pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount)
return pTopic.String(), nil
}
// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics // This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) { func contentFilterToPubSubTopicMap(contentFilter protocol.ContentFilter) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string) pubSubTopicMap := make(map[string][]string)
if contentFilter.PubsubTopic != "" { if contentFilter.PubsubTopic != "" {
@ -249,7 +222,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st
} else { } else {
//Parse the content-Topics to figure out shards. //Parse the content-Topics to figure out shards.
for _, cTopicString := range contentFilter.ContentTopicsList() { for _, cTopicString := range contentFilter.ContentTopicsList() {
pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString) pTopicStr, err := protocol.GetPubSubTopicFromContentTopic(cTopicString)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -267,7 +240,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. // If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request. // This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics. // Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) { func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock() wf.RLock()
defer wf.RUnlock() defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil { if err := wf.ErrOnNotRunning(); err != nil {
@ -303,11 +276,11 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
return nil, err return nil, err
} }
failedContentTopics := []string{} failedContentTopics := []string{}
subscriptions := make([]*SubscriptionDetails, 0) subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap { for pubSubTopic, cTopics := range pubSubTopicMap {
var cFilter ContentFilter var cFilter protocol.ContentFilter
cFilter.PubsubTopic = pubSubTopic cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = NewContentTopicSet(cTopics...) cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil { if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
@ -325,7 +298,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
} }
// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol // FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) { func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) (*subscription.SubscriptionDetails, error) {
wf.RLock() wf.RLock()
defer wf.RUnlock() defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil { if err := wf.ErrOnNotRunning(); err != nil {
@ -361,10 +334,10 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
ctx, ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()}, &FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()},
pb.FilterSubscribeRequest_SUBSCRIBER_PING, pb.FilterSubscribeRequest_SUBSCRIBER_PING,
ContentFilter{}) protocol.ContentFilter{})
} }
func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error { func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error {
wf.RLock() wf.RLock()
defer wf.RUnlock() defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil { if err := wf.ErrOnNotRunning(); err != nil {
@ -374,7 +347,7 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip
return wf.Ping(ctx, subscription.PeerID) return wf.Ping(ctx, subscription.PeerID)
} }
func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails { func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
wf.RLock() wf.RLock()
defer wf.RUnlock() defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil { if err := wf.ErrOnNotRunning(); err != nil {
@ -384,10 +357,10 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
wf.subscriptions.RLock() wf.subscriptions.RLock()
defer wf.subscriptions.RUnlock() defer wf.subscriptions.RUnlock()
var output []*SubscriptionDetails var output []*subscription.SubscriptionDetails
for _, peerSubscription := range wf.subscriptions.items { for _, peerSubscription := range wf.subscriptions.Items {
for _, subscriptions := range peerSubscription.subsPerPubsubTopic { for _, subscriptions := range peerSubscription.SubsPerPubsubTopic {
for _, subscriptionDetail := range subscriptions { for _, subscriptionDetail := range subscriptions {
output = append(output, subscriptionDetail) output = append(output, subscriptionDetail)
} }
@ -397,16 +370,16 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
return output return output
} }
func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) { func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter protocol.ContentFilter) {
wf.subscriptions.Lock() wf.subscriptions.Lock()
defer wf.subscriptions.Unlock() defer wf.subscriptions.Unlock()
peerSubscription, ok := wf.subscriptions.items[peerID] peerSubscription, ok := wf.subscriptions.Items[peerID]
if !ok { if !ok {
return return
} }
subscriptionDetailList, ok := peerSubscription.subsPerPubsubTopic[contentFilter.PubsubTopic] subscriptionDetailList, ok := peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]
if !ok { if !ok {
return return
} }
@ -415,18 +388,18 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...) subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 { if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
delete(subscriptionDetailList, subscriptionDetailID) delete(subscriptionDetailList, subscriptionDetailID)
subscriptionDetail.closeC() subscriptionDetail.CloseC()
} }
} }
if len(subscriptionDetailList) == 0 { if len(subscriptionDetailList) == 0 {
delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic) delete(wf.subscriptions.Items[peerID].SubsPerPubsubTopic, contentFilter.PubsubTopic)
} }
} }
// Unsubscribe is used to stop receiving messages from a peer that match a content filter // Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock() wf.RLock()
defer wf.RUnlock() defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil { if err := wf.ErrOnNotRunning(); err != nil {
@ -450,22 +423,22 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
if err != nil { if err != nil {
return nil, err return nil, err
} }
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))
for pTopic, cTopics := range pubSubTopicMap { for pTopic, cTopics := range pubSubTopicMap {
cFilter := NewContentFilter(pTopic, cTopics...) cFilter := protocol.NewContentFilter(pTopic, cTopics...)
for peerID := range wf.subscriptions.items { for peerID := range wf.subscriptions.Items {
if params.selectedPeer != "" && peerID != params.selectedPeer { if params.selectedPeer != "" && peerID != params.selectedPeer {
continue continue
} }
subscriptions, ok := wf.subscriptions.items[peerID] subscriptions, ok := wf.subscriptions.Items[peerID]
if !ok || subscriptions == nil { if !ok || subscriptions == nil {
continue continue
} }
wf.cleanupSubscriptions(peerID, cFilter) wf.cleanupSubscriptions(peerID, cFilter)
if len(subscriptions.subsPerPubsubTopic) == 0 { if len(subscriptions.SubsPerPubsubTopic) == 0 {
delete(wf.subscriptions.items, peerID) delete(wf.subscriptions.Items, peerID)
} }
if params.wg != nil { if params.wg != nil {
@ -501,7 +474,8 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
// UnsubscribeWithSubscription is used to close a particular subscription // UnsubscribeWithSubscription is used to close a particular subscription
// If there are no more subscriptions matching the passed [peer, contentFilter] pair, // If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed // server unsubscribe is also performed
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) { func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock() wf.RLock()
defer wf.RUnlock() defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil { if err := wf.ErrOnNotRunning(); err != nil {
@ -531,7 +505,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,
} }
func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter ContentFilter) error { func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter protocol.ContentFilter) error {
err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter) err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter)
if err != nil { if err != nil {
ferr, ok := err.(*FilterError) ferr, ok := err.(*FilterError)
@ -554,14 +528,14 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
wf.subscriptions.Lock() wf.subscriptions.Lock()
defer wf.subscriptions.Unlock() defer wf.subscriptions.Unlock()
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))
for peerID := range wf.subscriptions.items { for peerID := range wf.subscriptions.Items {
if params.selectedPeer != "" && peerID != params.selectedPeer { if params.selectedPeer != "" && peerID != params.selectedPeer {
continue continue
} }
delete(wf.subscriptions.items, peerID) delete(wf.subscriptions.Items, peerID)
if params.wg != nil { if params.wg != nil {
params.wg.Add(1) params.wg.Add(1)
@ -578,7 +552,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
ctx, ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL, pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
ContentFilter{}) protocol.ContentFilter{})
if err != nil { if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap" "go.uber.org/zap"
@ -40,8 +41,8 @@ type FilterTestSuite struct {
fullNode *WakuFilterFullNode fullNode *WakuFilterFullNode
fullNodeHost host.Host fullNodeHost host.Host
wg *sync.WaitGroup wg *sync.WaitGroup
contentFilter ContentFilter contentFilter protocol.ContentFilter
subDetails []*SubscriptionDetails subDetails []*subscription.SubscriptionDetails
log *zap.Logger log *zap.Logger
} }
@ -145,8 +146,8 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope)
s.wg.Wait() s.wg.Wait()
} }
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*SubscriptionDetails { func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*subscription.SubscriptionDetails {
s.contentFilter = ContentFilter{pubsubTopic, NewContentTopicSet(contentTopic)} s.contentFilter = protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)}
subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err) s.Require().NoError(err)
@ -378,7 +379,7 @@ func (s *FilterTestSuite) TestRunningGuard() {
s.lightNode.Stop() s.lightNode.Stop()
contentFilter := ContentFilter{"test", NewContentTopicSet("test")} contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")}
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
@ -394,7 +395,7 @@ func (s *FilterTestSuite) TestRunningGuard() {
func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {
contentFilter := ContentFilter{"test", NewContentTopicSet("test")} contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")}
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err) s.Require().NoError(err)
@ -508,9 +509,9 @@ func (s *FilterTestSuite) TestAutoShard() {
s.Require().NoError(err) s.Require().NoError(err)
}, s.subDetails[0].C) }, s.subDetails[0].C)
_, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{ _, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.testTopic, PubsubTopic: s.testTopic,
ContentTopics: NewContentTopicSet(newContentTopic), ContentTopics: protocol.NewContentTopicSet(newContentTopic),
}) })
s.Require().NoError(err) s.Require().NoError(err)

View File

@ -60,7 +60,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) FilterSubscribeOption
if params.pm == nil { if params.pm == nil {
p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log) p, err = utils.SelectPeer(params.host, FilterSubscribeID_v20beta1, fromThesePeers, params.log)
} else { } else {
p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, fromThesePeers) p, err = params.pm.SelectPeer(FilterSubscribeID_v20beta1, "", fromThesePeers...)
} }
if err == nil { if err == nil {
params.selectedPeer = p params.selectedPeer = p

View File

@ -8,23 +8,14 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
) )
var ErrNotFound = errors.New("not found")
type ContentTopicSet map[string]struct{}
func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
for _, ct := range contentTopics {
s[ct] = struct{}{}
}
return s
}
type PeerSet map[peer.ID]struct{} type PeerSet map[peer.ID]struct{}
type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics type PubsubTopics map[string]protocol.ContentTopicSet // pubsubTopic => contentTopics
var errNotFound = errors.New("not found")
type SubscribersMap struct { type SubscribersMap struct {
sync.RWMutex sync.RWMutex
@ -65,7 +56,7 @@ func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics
contentTopicsMap, ok := pubsubTopicMap[pubsubTopic] contentTopicsMap, ok := pubsubTopicMap[pubsubTopic]
if !ok { if !ok {
contentTopicsMap = make(ContentTopicSet) contentTopicsMap = make(protocol.ContentTopicSet)
} }
for _, c := range contentTopics { for _, c := range contentTopics {
@ -106,12 +97,12 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop
pubsubTopicMap, ok := sub.items[peerID] pubsubTopicMap, ok := sub.items[peerID]
if !ok { if !ok {
return ErrNotFound return errNotFound
} }
contentTopicsMap, ok := pubsubTopicMap[pubsubTopic] contentTopicsMap, ok := pubsubTopicMap[pubsubTopic]
if !ok { if !ok {
return ErrNotFound return errNotFound
} }
// Removing content topics individually // Removing content topics individually
@ -140,7 +131,7 @@ func (sub *SubscribersMap) Delete(peerID peer.ID, pubsubTopic string, contentTop
func (sub *SubscribersMap) deleteAll(peerID peer.ID) error { func (sub *SubscribersMap) deleteAll(peerID peer.ID) error {
pubsubTopicMap, ok := sub.items[peerID] pubsubTopicMap, ok := sub.items[peerID]
if !ok { if !ok {
return ErrNotFound return errNotFound
} }
for pubsubTopic, contentTopicsMap := range pubsubTopicMap { for pubsubTopic, contentTopicsMap := range pubsubTopicMap {

View File

@ -144,15 +144,10 @@ func (wakuLP *WakuLightPush) onRequest(ctx context.Context) func(s network.Strea
} }
} }
func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, opts ...Option) (*pb.PushResponse, error) { // request sends a message via lightPush protocol to either a specified peer or peer that is selected.
params := new(lightPushParameters) func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, params *lightPushParameters) (*pb.PushResponse, error) {
params.host = wakuLP.h if params == nil {
params.log = wakuLP.log return nil, errors.New("lightpush params are mandatory")
params.pm = wakuLP.pm
optList := append(DefaultOptions(wakuLP.h), opts...)
for _, opt := range optList {
opt(params)
} }
if params.selectedPeer == "" { if params.selectedPeer == "" {
@ -215,23 +210,40 @@ func (wakuLP *WakuLightPush) Stop() {
wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1)
} }
// PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol // Optional PublishToTopic is used to broadcast a WakuMessage to a pubsub topic via lightpush protocol
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, topic string, opts ...Option) ([]byte, error) { // If pubSubTopic is not provided, then contentTopic is use to derive the relevant pubSubTopic via autosharding.
func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
if message == nil { if message == nil {
return nil, errors.New("message can't be null") return nil, errors.New("message can't be null")
} }
params := new(lightPushParameters)
params.host = wakuLP.h
params.log = wakuLP.log
params.pm = wakuLP.pm
optList := append(DefaultOptions(wakuLP.h), opts...)
for _, opt := range optList {
opt(params)
}
if params.pubsubTopic == "" {
var err error
params.pubsubTopic, err = protocol.GetPubSubTopicFromContentTopic(message.ContentTopic)
if err != nil {
return nil, err
}
}
req := new(pb.PushRequest) req := new(pb.PushRequest)
req.Message = message req.Message = message
req.PubsubTopic = topic req.PubsubTopic = params.pubsubTopic
response, err := wakuLP.request(ctx, req, opts...) response, err := wakuLP.request(ctx, req, params)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if response.IsSuccess { if response.IsSuccess {
hash := message.Hash(topic) hash := message.Hash(params.pubsubTopic)
wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash)) wakuLP.log.Info("waku.lightpush published", logging.HexString("hash", hash))
return hash, nil return hash, nil
} }
@ -239,7 +251,8 @@ func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *wpb.Wa
return nil, errors.New(response.Info) return nil, errors.New(response.Info)
} }
// Publish is used to broadcast a WakuMessage to the default waku pubsub topic via lightpush protocol // Publish is used to broadcast a WakuMessage to the pubSubTopic (which is derived from the contentTopic) via lightpush protocol
// If auto-sharding is not to be used, then PublishToTopic API should be used
func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) { func (wakuLP *WakuLightPush) Publish(ctx context.Context, message *wpb.WakuMessage, opts ...Option) ([]byte, error) {
return wakuLP.PublishToTopic(ctx, message, relay.DefaultWakuTopic, opts...) return wakuLP.PublishToTopic(ctx, message, opts...)
} }

View File

@ -17,6 +17,7 @@ type lightPushParameters struct {
requestID []byte requestID []byte
pm *peermanager.PeerManager pm *peermanager.PeerManager
log *zap.Logger log *zap.Logger
pubsubTopic string
} }
// Option is the type of options accepted when performing LightPush protocol requests // Option is the type of options accepted when performing LightPush protocol requests
@ -40,7 +41,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
if params.pm == nil { if params.pm == nil {
p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log) p, err = utils.SelectPeer(params.host, LightPushID_v20beta1, fromThesePeers, params.log)
} else { } else {
p, err = params.pm.SelectPeer(LightPushID_v20beta1, fromThesePeers) p, err = params.pm.SelectPeer(LightPushID_v20beta1, "", fromThesePeers...)
} }
if err == nil { if err == nil {
params.selectedPeer = p params.selectedPeer = p
@ -50,6 +51,12 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) Option {
} }
} }
func WithPubSubTopic(pubsubTopic string) Option {
return func(params *lightPushParameters) {
params.pubsubTopic = pubsubTopic
}
}
// WithFastestPeerSelection is an option used to select a peer from the peer store // WithFastestPeerSelection is an option used to select a peer from the peer store
// with the lowest ping. If a list of specific peers is passed, the peer will be chosen // with the lowest ping. If a list of specific peers is passed, the peer will be chosen
// from that list assuming it supports the chosen protocol, otherwise it will chose a peer // from that list assuming it supports the chosen protocol, otherwise it will chose a peer

View File

@ -13,13 +13,12 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource" "github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
) )
func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscription, host.Host) { func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Subscription, host.Host) {
port, err := tests.FindFreePort(t, "", 5) port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err) require.NoError(t, err)
@ -34,7 +33,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri
err = relay.Start(context.Background()) err = relay.Start(context.Background())
require.NoError(t, err) require.NoError(t, err)
sub, err := relay.SubscribeToTopic(context.Background(), topic) sub, err := relay.SubscribeToTopic(context.Background(), pusubTopic)
require.NoError(t, err) require.NoError(t, err)
return relay, sub, host return relay, sub, host
@ -87,13 +86,8 @@ func TestWakuLightPush(t *testing.T) {
err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1) err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1)
require.NoError(t, err) require.NoError(t, err)
msg1 := tests.CreateWakuMessage("test1", utils.GetUnixEpoch())
msg2 := tests.CreateWakuMessage("test2", utils.GetUnixEpoch()) msg2 := tests.CreateWakuMessage("test2", utils.GetUnixEpoch())
req := new(pb.PushRequest)
req.Message = msg1
req.PubsubTopic = string(testTopic)
// Wait for the mesh connection to happen between node1 and node2 // Wait for the mesh connection to happen between node1 and node2
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -102,23 +96,19 @@ func TestWakuLightPush(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
<-sub1.Ch <-sub1.Ch
<-sub1.Ch
}() }()
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
<-sub2.Ch <-sub2.Ch
<-sub2.Ch
}() }()
// Verifying successful request var lpOptions []Option
resp, err := client.request(ctx, req) lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
require.NoError(t, err)
require.True(t, resp.IsSuccess)
// Checking that msg hash is correct // Checking that msg hash is correct
hash, err := client.PublishToTopic(ctx, msg2, testTopic) hash, err := client.PublishToTopic(ctx, msg2, lpOptions...)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), string(testTopic)).Hash(), hash) require.Equal(t, protocol.NewEnvelope(msg2, utils.GetUnixEpoch(), string(testTopic)).Hash(), hash)
wg.Wait() wg.Wait()
@ -147,6 +137,90 @@ func TestWakuLightPushNoPeers(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger()) client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger())
client.SetHost(clientHost) client.SetHost(clientHost)
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), testTopic) var lpOptions []Option
lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
_, err = client.PublishToTopic(ctx, tests.CreateWakuMessage("test", utils.GetUnixEpoch()), lpOptions...)
require.Errorf(t, err, "no suitable remote peers") require.Errorf(t, err, "no suitable remote peers")
} }
// Node1: Relay
// Node2: Relay+Lightpush
// Client that will lightpush a message
//
// Node1 and Node 2 are peers
// Client and Node 2 are peers
// Client will use lightpush request, sending the message to Node2
//
// Client send a successful message using lightpush
// Node2 receive the message and broadcast it
// Node1 receive the message
func TestWakuLightPushAutoSharding(t *testing.T) {
contentTopic := "0/test/1/testTopic/proto"
cTopic1, err := protocol.StringToContentTopic(contentTopic)
require.NoError(t, err)
//Computing pubSubTopic only for filterFullNode.
pubSubTopicInst := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount)
pubSubTopic := pubSubTopicInst.String()
node1, sub1, host1 := makeWakuRelay(t, pubSubTopic)
defer node1.Stop()
defer sub1.Unsubscribe()
node2, sub2, host2 := makeWakuRelay(t, pubSubTopic)
defer node2.Stop()
defer sub2.Unsubscribe()
ctx := context.Background()
lightPushNode2 := NewWakuLightPush(node2, nil, prometheus.DefaultRegisterer, utils.Logger())
lightPushNode2.SetHost(host2)
err = lightPushNode2.Start(ctx)
require.NoError(t, err)
defer lightPushNode2.Stop()
port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)
clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger())
client.SetHost(clientHost)
host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200)
require.NoError(t, err)
err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID()))
require.NoError(t, err)
clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1)
require.NoError(t, err)
msg1 := tests.CreateWakuMessage(contentTopic, utils.GetUnixEpoch())
// Wait for the mesh connection to happen between node1 and node2
time.Sleep(2 * time.Second)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
<-sub1.Ch
}()
wg.Add(1)
go func() {
defer wg.Done()
<-sub2.Ch
}()
// Verifying successful request
hash1, err := client.Publish(ctx, msg1)
require.NoError(t, err)
require.Equal(t, protocol.NewEnvelope(msg1, utils.GetUnixEpoch(), string(pubSubTopic)).Hash(), hash1)
wg.Wait()
}

View File

@ -37,7 +37,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) PeerExchangeOption {
if params.pm == nil { if params.pm == nil {
p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log) p, err = utils.SelectPeer(params.host, PeerExchangeID_v20alpha1, fromThesePeers, params.log)
} else { } else {
p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, fromThesePeers) p, err = params.pm.SelectPeer(PeerExchangeID_v20alpha1, "", fromThesePeers...)
} }
if err == nil { if err == nil {
params.selectedPeer = p params.selectedPeer = p

View File

@ -230,3 +230,13 @@ func GetShardFromContentTopic(topic ContentTopic, shardCount int) StaticSharding
return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard)) return NewStaticShardingPubsubTopic(ClusterIndex, uint16(shard))
} }
func GetPubSubTopicFromContentTopic(cTopicString string) (string, error) {
cTopic, err := StringToContentTopic(cTopicString)
if err != nil {
return "", fmt.Errorf("%s : %s", err.Error(), cTopicString)
}
pTopic := GetShardFromContentTopic(cTopic, GenerationZeroShardsCount)
return pTopic.String(), nil
}

View File

@ -111,7 +111,7 @@ func WithAutomaticPeerSelection(fromThesePeers ...peer.ID) HistoryRequestOption
if params.s.pm == nil { if params.s.pm == nil {
p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log) p, err = utils.SelectPeer(params.s.h, StoreID_v20beta4, fromThesePeers, params.s.log)
} else { } else {
p, err = params.s.pm.SelectPeer(StoreID_v20beta4, fromThesePeers) p, err = params.s.pm.SelectPeer(StoreID_v20beta4, "", fromThesePeers...)
} }
if err == nil { if err == nil {
params.selectedPeer = p params.selectedPeer = p

View File

@ -1,7 +1,8 @@
package filter package subscription
import ( import (
"encoding/json" "encoding/json"
"errors"
"sync" "sync"
"github.com/google/uuid" "github.com/google/uuid"
@ -20,7 +21,7 @@ type SubscriptionDetails struct {
once sync.Once once sync.Once
PeerID peer.ID PeerID peer.ID
ContentFilter ContentFilter ContentFilter protocol.ContentFilter
C chan *protocol.Envelope C chan *protocol.Envelope
} }
@ -28,39 +29,41 @@ type SubscriptionDetails struct {
type SubscriptionSet map[string]*SubscriptionDetails type SubscriptionSet map[string]*SubscriptionDetails
type PeerSubscription struct { type PeerSubscription struct {
peerID peer.ID PeerID peer.ID
subsPerPubsubTopic map[string]SubscriptionSet SubsPerPubsubTopic map[string]SubscriptionSet
} }
type SubscriptionsMap struct { type SubscriptionsMap struct {
sync.RWMutex sync.RWMutex
logger *zap.Logger logger *zap.Logger
items map[peer.ID]*PeerSubscription Items map[peer.ID]*PeerSubscription
} }
var ErrNotFound = errors.New("not found")
func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap { func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
return &SubscriptionsMap{ return &SubscriptionsMap{
logger: logger.Named("subscription-map"), logger: logger.Named("subscription-map"),
items: make(map[peer.ID]*PeerSubscription), Items: make(map[peer.ID]*PeerSubscription),
} }
} }
func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf ContentFilter) *SubscriptionDetails { func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.ContentFilter) *SubscriptionDetails {
sub.Lock() sub.Lock()
defer sub.Unlock() defer sub.Unlock()
peerSubscription, ok := sub.items[peerID] peerSubscription, ok := sub.Items[peerID]
if !ok { if !ok {
peerSubscription = &PeerSubscription{ peerSubscription = &PeerSubscription{
peerID: peerID, PeerID: peerID,
subsPerPubsubTopic: make(map[string]SubscriptionSet), SubsPerPubsubTopic: make(map[string]SubscriptionSet),
} }
sub.items[peerID] = peerSubscription sub.Items[peerID] = peerSubscription
} }
_, ok = peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] _, ok = peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic]
if !ok { if !ok {
peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] = make(SubscriptionSet) peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic] = make(SubscriptionSet)
} }
details := &SubscriptionDetails{ details := &SubscriptionDetails{
@ -68,10 +71,10 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf ContentFilter) *
mapRef: sub, mapRef: sub,
PeerID: peerID, PeerID: peerID,
C: make(chan *protocol.Envelope, 1024), C: make(chan *protocol.Envelope, 1024),
ContentFilter: ContentFilter{cf.PubsubTopic, maps.Clone(cf.ContentTopics)}, ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
} }
sub.items[peerID].subsPerPubsubTopic[cf.PubsubTopic][details.ID] = details sub.Items[peerID].SubsPerPubsubTopic[cf.PubsubTopic][details.ID] = details
return details return details
} }
@ -80,23 +83,23 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool {
sub.RLock() sub.RLock()
defer sub.RUnlock() defer sub.RUnlock()
_, ok := sub.items[peerID] _, ok := sub.Items[peerID]
return ok return ok
} }
// Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided // Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided
func (sub *SubscriptionsMap) Has(peerID peer.ID, cf ContentFilter) bool { func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool {
sub.RLock() sub.RLock()
defer sub.RUnlock() defer sub.RUnlock()
// Check if peer exits // Check if peer exits
peerSubscription, ok := sub.items[peerID] peerSubscription, ok := sub.Items[peerID]
if !ok { if !ok {
return false return false
} }
//TODO: Handle pubsubTopic as null //TODO: Handle pubsubTopic as null
// Check if pubsub topic exists // Check if pubsub topic exists
subscriptions, ok := peerSubscription.subsPerPubsubTopic[cf.PubsubTopic] subscriptions, ok := peerSubscription.SubsPerPubsubTopic[cf.PubsubTopic]
if !ok { if !ok {
return false return false
} }
@ -122,12 +125,12 @@ func (sub *SubscriptionsMap) Delete(subscription *SubscriptionDetails) error {
sub.Lock() sub.Lock()
defer sub.Unlock() defer sub.Unlock()
peerSubscription, ok := sub.items[subscription.PeerID] peerSubscription, ok := sub.Items[subscription.PeerID]
if !ok { if !ok {
return ErrNotFound return ErrNotFound
} }
delete(peerSubscription.subsPerPubsubTopic[subscription.ContentFilter.PubsubTopic], subscription.ID) delete(peerSubscription.SubsPerPubsubTopic[subscription.ContentFilter.PubsubTopic], subscription.ID)
return nil return nil
} }
@ -150,7 +153,7 @@ func (s *SubscriptionDetails) Remove(contentTopics ...string) {
} }
} }
func (s *SubscriptionDetails) closeC() { func (s *SubscriptionDetails) CloseC() {
s.once.Do(func() { s.once.Do(func() {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
@ -161,7 +164,7 @@ func (s *SubscriptionDetails) closeC() {
} }
func (s *SubscriptionDetails) Close() error { func (s *SubscriptionDetails) Close() error {
s.closeC() s.CloseC()
return s.mapRef.Delete(s) return s.mapRef.Delete(s)
} }
@ -174,7 +177,7 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails {
mapRef: s.mapRef, mapRef: s.mapRef,
Closed: false, Closed: false,
PeerID: s.PeerID, PeerID: s.PeerID,
ContentFilter: ContentFilter{s.ContentFilter.PubsubTopic, maps.Clone(s.ContentFilter.ContentTopics)}, ContentFilter: protocol.ContentFilter{PubsubTopic: s.ContentFilter.PubsubTopic, ContentTopics: maps.Clone(s.ContentFilter.ContentTopics)},
C: make(chan *protocol.Envelope), C: make(chan *protocol.Envelope),
} }
@ -182,15 +185,15 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails {
} }
func (sub *SubscriptionsMap) clear() { func (sub *SubscriptionsMap) clear() {
for _, peerSubscription := range sub.items { for _, peerSubscription := range sub.Items {
for _, subscriptionSet := range peerSubscription.subsPerPubsubTopic { for _, subscriptionSet := range peerSubscription.SubsPerPubsubTopic {
for _, subscription := range subscriptionSet { for _, subscription := range subscriptionSet {
subscription.closeC() subscription.CloseC()
} }
} }
} }
sub.items = make(map[peer.ID]*PeerSubscription) sub.Items = make(map[peer.ID]*PeerSubscription)
} }
func (sub *SubscriptionsMap) Clear() { func (sub *SubscriptionsMap) Clear() {
@ -203,7 +206,7 @@ func (sub *SubscriptionsMap) Notify(peerID peer.ID, envelope *protocol.Envelope)
sub.RLock() sub.RLock()
defer sub.RUnlock() defer sub.RUnlock()
subscriptions, ok := sub.items[peerID].subsPerPubsubTopic[envelope.PubsubTopic()] subscriptions, ok := sub.Items[peerID].SubsPerPubsubTopic[envelope.PubsubTopic()]
if ok { if ok {
iterateSubscriptionSet(sub.logger, subscriptions, envelope) iterateSubscriptionSet(sub.logger, subscriptions, envelope)
} }

View File

@ -1,4 +1,4 @@
package filter package subscription
import ( import (
"context" "context"
@ -6,18 +6,29 @@ import (
"testing" "testing"
"time" "time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/test"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests" "github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/utils" "github.com/waku-org/go-waku/waku/v2/utils"
) )
const PUBSUB_TOPIC = "/test/topic"
func createPeerID(t *testing.T) peer.ID {
peerId, err := test.RandPeerID()
assert.NoError(t, err)
return peerId
}
func TestSubscriptionMapAppend(t *testing.T) { func TestSubscriptionMapAppend(t *testing.T) {
fmap := NewSubscriptionMap(utils.Logger()) fmap := NewSubscriptionMap(utils.Logger())
peerID := createPeerID(t) peerID := createPeerID(t)
contentTopics := NewContentTopicSet("ct1", "ct2") contentTopics := protocol.NewContentTopicSet("ct1", "ct2")
sub := fmap.NewSubscription(peerID, ContentFilter{PUBSUB_TOPIC, contentTopics}) sub := fmap.NewSubscription(peerID, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC, ContentTopics: contentTopics})
_, found := sub.ContentFilter.ContentTopics["ct1"] _, found := sub.ContentFilter.ContentTopics["ct1"]
require.True(t, found) require.True(t, found)
_, found = sub.ContentFilter.ContentTopics["ct2"] _, found = sub.ContentFilter.ContentTopics["ct2"]
@ -44,12 +55,12 @@ func TestSubscriptionMapAppend(t *testing.T) {
func TestSubscriptionClear(t *testing.T) { func TestSubscriptionClear(t *testing.T) {
fmap := NewSubscriptionMap(utils.Logger()) fmap := NewSubscriptionMap(utils.Logger())
contentTopics := NewContentTopicSet("ct1", "ct2") contentTopics := protocol.NewContentTopicSet("ct1", "ct2")
var subscriptions = []*SubscriptionDetails{ var subscriptions = []*SubscriptionDetails{
fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "1", contentTopics}), fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: contentTopics}),
fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "2", contentTopics}), fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: contentTopics}),
fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "3", contentTopics}), fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "3", ContentTopics: contentTopics}),
} }
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@ -84,9 +95,9 @@ func TestSubscriptionsNotify(t *testing.T) {
p1 := createPeerID(t) p1 := createPeerID(t)
p2 := createPeerID(t) p2 := createPeerID(t)
var subscriptions = []*SubscriptionDetails{ var subscriptions = []*SubscriptionDetails{
fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1", "ct2")}), fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}),
fmap.NewSubscription(p2, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1")}), fmap.NewSubscription(p2, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1")}),
fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "2", NewContentTopicSet("ct1", "ct2")}), fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}),
} }
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)