mirror of https://github.com/status-im/go-waku.git
Feat/autoshard filter (#723)
* feat: update filter client to support autosharding * chore: add filter tests for autoshard * chore:update filter API docs for autosharding * chore: docs changes to indicate sharding impact on pubSubTopic * fix: handle partial errors during subscribe and return failed content-topic details
This commit is contained in:
parent
bf90ab4d1b
commit
054bdae1de
|
@ -75,11 +75,11 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
|
|||
filterOpt = filter.WithPeer(peerID)
|
||||
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
|
||||
}
|
||||
theFilter, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
|
||||
theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
|
||||
if err != nil {
|
||||
chat.ui.ErrorMessage(err)
|
||||
} else {
|
||||
chat.C = theFilter.C
|
||||
chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified.
|
||||
}
|
||||
} else {
|
||||
|
||||
|
|
|
@ -21,28 +21,20 @@ The app will run 2 nodes ("full" node and "light" node), with light node subscri
|
|||
|
||||
## Flow description
|
||||
|
||||
1. Light node submits a FilterRequest through WakuNode.SubscribeFilter. This request is submitted to a particular peer.
|
||||
Filter is stored in WakuNode.filters map. That's it.
|
||||
DONE
|
||||
2. Full node: we read incoming messages in WakuFilter.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId.
|
||||
3. In WakuFilter.onRequest():
|
||||
3.1. We check whether it's a MessagePush or FilterRequest.
|
||||
3.2. If it's a MessagePush, then we're on a light node. Invoke pushHandler coming from WakuNode.mountFilter()
|
||||
3.3. If it's a FilterRequest, add a subscriber.
|
||||
4. WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object.
|
||||
It denotes a pubsub topic subscription.
|
||||
All envelopes are then submitted to node.broadcaster.
|
||||
|
||||
|
||||
## Nim code flow
|
||||
1. Light node: WakuFilter.subscribe(). Find a peer, wrileLP(FilterRequest). Store requestId in WakuNode.filters along with a ContentFilterHandler proc.
|
||||
2. Full node: WakuFilter inherits LPProtocol. LPProtocol.handler invokes readLP() to read FilterRPC messages
|
||||
3. this handler function has a signature (conn: Connection, proto: string).
|
||||
3.1. it checks whether a MessagePush or FilterRequest is received.
|
||||
3.2. (light node) if it's a MessagePush, then we're on a light node. Invoke pushHandler of MessagePushHandler type. This pushHandler comes from WakuNode.mountFilter(). It iterates through all registered WakuNode.filters (stored in step 1) and invokes their ContentFilterHandler proc.
|
||||
3.3. (full node) if it's a FilterRequest, create a Subscriber and add to WakuFilter.subscribers seq
|
||||
4. (full node) Each time a message is received through GossipSub in wakunode.subscribe.defaultHandler(), we iterate through subscriptions.
|
||||
5. (full node) One of these subscriptions is a filter subscription added by WakuNode.mountFilter(), which in turn is returned from WakuFilter.subscription()
|
||||
6. (full node) This subscription iterates through subscribers added by WakuFilter.handler() fn (subscribers being light nodes)
|
||||
7. (full node) Once subscriber peer is found, a message is pushed directly to the peer (go to step 3.2)
|
||||
|
||||
### Light Node
|
||||
1. A light node is created with option WithWakuFilterLightNode.
|
||||
2. Starting this node sets stream handler on wakunode.Host for WakuFilterProtocolId.
|
||||
3. Light node submits a FilterSubscribeRequest through WakuFilterLightNode.Subscribe. This request is submitted to a particular peer.
|
||||
Filter is stored in WakuFilterLightNode.subscriptions map. That's it.
|
||||
4. Now we wait on WakuFilterLightNode.onRequest to process any further messages.
|
||||
5. On receiving a message check and notify all subscribers on relevant channel (which is part of subscription obbject).
|
||||
6. If a broadcaster is specified,
|
||||
WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object.It denotes a pubsub topic subscription.All envelopes are then submitted to node.broadcaster.
|
||||
### Full Node
|
||||
1. Full node is created with option WithWakuFilterFullNode.
|
||||
2. We read incoming messages in WithWakuFilterFullNode.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId.
|
||||
3. In WakuFilter.onRequest
|
||||
* We check the type of FilterRequest and handle accordingly.
|
||||
* If it's a FilterRequest for subscribe, add a subscriber.
|
||||
* If it is a SubscriberPing request, check if subscriptions exists or not and respond accordingly.
|
||||
* If it is an unsubscribe/unsubscribeAll request, check and remove relevant subscriptions.
|
||||
|
|
|
@ -26,7 +26,7 @@ var log = logging.Logger("filter2")
|
|||
|
||||
var pubSubTopic = protocol.DefaultPubsubTopic()
|
||||
|
||||
const contentTopic = "test"
|
||||
const contentTopic = "/filter2test/1/testTopic/proto"
|
||||
|
||||
func main() {
|
||||
hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60000")
|
||||
|
@ -98,7 +98,6 @@ func main() {
|
|||
|
||||
// Send FilterRequest from light node to full node
|
||||
cf := filter.ContentFilter{
|
||||
PubsubTopic: pubSubTopic.String(),
|
||||
ContentTopics: filter.NewContentTopicSet(contentTopic),
|
||||
}
|
||||
|
||||
|
@ -108,7 +107,7 @@ func main() {
|
|||
}
|
||||
|
||||
go func() {
|
||||
for env := range theFilter.C {
|
||||
for env := range theFilter[0].C { //Safely picking first subscriptions since only 1 contentTopic is subscribed
|
||||
log.Info("Light node received msg, ", string(env.Message().Payload))
|
||||
}
|
||||
log.Info("Message channel closed!")
|
||||
|
|
|
@ -88,7 +88,7 @@ The criteria to create subscription to a filter full node in JSON Format:
|
|||
Fields:
|
||||
|
||||
- `contentTopics`: Array of content topics.
|
||||
- `topic`: pubsub topic.
|
||||
- `topic`: Optional pubsub topic when using contentTopics as per Autosharding. In case of named or static-sharding, pubSub topic is mandatory.
|
||||
|
||||
|
||||
### `LegacyFilterSubscription` type
|
||||
|
@ -884,15 +884,21 @@ Creates a subscription to a filter full node matching a content filter..
|
|||
|
||||
A status code. Refer to the [`Status codes`](#status-codes) section for possible values.
|
||||
|
||||
If the function is executed succesfully, `onOkCb` will receive the subscription details.
|
||||
If the function is executed succesfully, `onOkCb` will receive the following subscription details along with any partial errors.
|
||||
|
||||
For example:
|
||||
|
||||
```json
|
||||
{
|
||||
"peerID": "....",
|
||||
"pubsubTopic": "...",
|
||||
"contentTopics": [...]
|
||||
"subscriptions" : [
|
||||
{
|
||||
"ID": "<subscriptionID>",
|
||||
"peerID": "....",
|
||||
"pubsubTopic": "...",
|
||||
"contentTopics": [...]
|
||||
}
|
||||
],
|
||||
"error" : "subscriptions failed for contentTopics:<topicA>,.." // Empty if all subscriptions are succesful
|
||||
}
|
||||
```
|
||||
|
||||
|
|
|
@ -10,14 +10,14 @@ import "github.com/waku-org/go-waku/library"
|
|||
// filterJSON must contain a JSON with this format:
|
||||
//
|
||||
// {
|
||||
// "pubsubTopic": "the pubsub topic" // mandatory
|
||||
// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding.
|
||||
// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10
|
||||
// }
|
||||
//
|
||||
// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node
|
||||
// If ms is greater than 0, the subscription must happen before the timeout
|
||||
// (in milliseconds) is reached, or an error will be returned
|
||||
// It returns a json object containing the peerID to which we are subscribed to and the details of the subscription
|
||||
// It returns a json object containing the details of the subscriptions along with any errors in case of partial failures
|
||||
//
|
||||
//export waku_filter_subscribe
|
||||
func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, onOkCb C.WakuCallBack, onErrCb C.WakuCallBack) C.int {
|
||||
|
@ -42,7 +42,7 @@ func waku_filter_ping(peerID *C.char, ms C.int, onErrCb C.WakuCallBack) C.int {
|
|||
// criteria
|
||||
//
|
||||
// {
|
||||
// "pubsubTopic": "the pubsub topic" // mandatory
|
||||
// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding.
|
||||
// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10
|
||||
// }
|
||||
//
|
||||
|
|
|
@ -28,6 +28,11 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
type subscribeResult struct {
|
||||
Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// FilterSubscribe is used to create a subscription to a filter node to receive messages
|
||||
func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
|
||||
cf, err := toContentFilter(filterJSON)
|
||||
|
@ -60,18 +65,22 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
|
|||
fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
|
||||
}
|
||||
|
||||
subscriptionDetails, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
|
||||
if err != nil {
|
||||
subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
|
||||
if err != nil && subscriptions == nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
go func(subscriptionDetails *filter.SubscriptionDetails) {
|
||||
for envelope := range subscriptionDetails.C {
|
||||
send("message", toSubscriptionMessage(envelope))
|
||||
}
|
||||
}(subscriptionDetails)
|
||||
|
||||
return marshalJSON(subscriptionDetails)
|
||||
for _, subscriptionDetails := range subscriptions {
|
||||
go func(subscriptionDetails *filter.SubscriptionDetails) {
|
||||
for envelope := range subscriptionDetails.C {
|
||||
send("message", toSubscriptionMessage(envelope))
|
||||
}
|
||||
}(subscriptionDetails)
|
||||
}
|
||||
var subResult subscribeResult
|
||||
subResult.Subscriptions = subscriptions
|
||||
subResult.Error = err.Error()
|
||||
return marshalJSON(subResult)
|
||||
}
|
||||
|
||||
// FilterPing is used to determine if a peer has an active subscription
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
|
@ -44,6 +45,10 @@ type WakuFilterLightNode struct {
|
|||
pm *peermanager.PeerManager
|
||||
}
|
||||
|
||||
// ContentFilter is used to specify the filter to be applied for a FilterNode.
|
||||
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
|
||||
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
|
||||
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
|
||||
type ContentFilter struct {
|
||||
PubsubTopic string
|
||||
ContentTopics ContentTopicSet
|
||||
|
@ -117,7 +122,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
|
|||
return func(s network.Stream) {
|
||||
defer s.Close()
|
||||
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
||||
|
||||
if !wf.subscriptions.IsSubscribedTo(s.Conn().RemotePeer()) {
|
||||
logger.Warn("received message push from unknown peer", logging.HostID("peerID", s.Conn().RemotePeer()))
|
||||
wf.metrics.RecordError(unknownPeerMessagePush)
|
||||
|
@ -133,15 +137,21 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
|
|||
wf.metrics.RecordError(decodeRPCFailure)
|
||||
return
|
||||
}
|
||||
pubSubTopic := ""
|
||||
//For now returning failure, this will get addressed with autosharding changes for filter.
|
||||
if messagePush.PubsubTopic == nil {
|
||||
logger.Error("empty pubsub topic")
|
||||
wf.metrics.RecordError(decodeRPCFailure)
|
||||
return
|
||||
pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic)
|
||||
if err != nil {
|
||||
logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err))
|
||||
wf.metrics.RecordError(decodeRPCFailure)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
pubSubTopic = *messagePush.PubsubTopic
|
||||
}
|
||||
if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) {
|
||||
if !wf.subscriptions.Has(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage.ContentTopic) {
|
||||
logger.Warn("received messagepush with invalid subscription parameters",
|
||||
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic),
|
||||
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
|
||||
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
|
||||
wf.metrics.RecordError(invalidSubscriptionMessage)
|
||||
return
|
||||
|
@ -149,7 +159,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
|
|||
|
||||
wf.metrics.RecordMessage()
|
||||
|
||||
wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage)
|
||||
wf.notify(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage)
|
||||
|
||||
logger.Info("received message push")
|
||||
}
|
||||
|
@ -200,7 +210,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
|
|||
wf.metrics.RecordError(decodeRPCFailure)
|
||||
return err
|
||||
}
|
||||
|
||||
if filterSubscribeResponse.RequestId != request.RequestId {
|
||||
wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId))
|
||||
wf.metrics.RecordError(requestIDMismatch)
|
||||
|
@ -217,18 +226,50 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
|
|||
return nil
|
||||
}
|
||||
|
||||
func getPubSubTopicFromContentTopic(cTopicString string) (string, error) {
|
||||
cTopic, err := protocol.StringToContentTopic(cTopicString)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("%s : %s", err.Error(), cTopicString)
|
||||
}
|
||||
pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount)
|
||||
|
||||
return pTopic.String(), nil
|
||||
}
|
||||
|
||||
// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
|
||||
func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) {
|
||||
pubSubTopicMap := make(map[string][]string)
|
||||
|
||||
if contentFilter.PubsubTopic != "" {
|
||||
pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList()
|
||||
} else {
|
||||
//Parse the content-Topics to figure out shards.
|
||||
for _, cTopicString := range contentFilter.ContentTopicsList() {
|
||||
pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, ok := pubSubTopicMap[pTopicStr]
|
||||
if !ok {
|
||||
pubSubTopicMap[pTopicStr] = []string{}
|
||||
}
|
||||
pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString)
|
||||
}
|
||||
}
|
||||
return pubSubTopicMap, nil
|
||||
}
|
||||
|
||||
// Subscribe setups a subscription to receive messages that match a specific content filter
|
||||
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) {
|
||||
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
|
||||
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
|
||||
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
|
||||
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) {
|
||||
wf.RLock()
|
||||
defer wf.RUnlock()
|
||||
if err := wf.ErrOnNotRunning(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if contentFilter.PubsubTopic == "" {
|
||||
return nil, errors.New("pubsub topic is required")
|
||||
}
|
||||
|
||||
if len(contentFilter.ContentTopics) == 0 {
|
||||
return nil, errors.New("at least one content topic is required")
|
||||
}
|
||||
|
@ -253,15 +294,36 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
|
|||
return nil, ErrNoPeersAvailable
|
||||
}
|
||||
|
||||
existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter)
|
||||
if existingSub != nil {
|
||||
return existingSub, nil
|
||||
} else {
|
||||
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, contentFilter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
failedContentTopics := []string{}
|
||||
subscriptions := make([]*SubscriptionDetails, 0)
|
||||
for pubSubTopic, cTopics := range pubSubTopicMap {
|
||||
var cFilter ContentFilter
|
||||
cFilter.PubsubTopic = pubSubTopic
|
||||
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
|
||||
existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter)
|
||||
if existingSub != nil {
|
||||
subscriptions = append(subscriptions, existingSub)
|
||||
} else {
|
||||
//TO OPTIMIZE: Should we parallelize these, if so till how many batches?
|
||||
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
|
||||
if err != nil {
|
||||
wf.log.Error("Failed to subscribe for conentTopics ",
|
||||
zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
|
||||
zap.Error(err))
|
||||
failedContentTopics = append(failedContentTopics, cTopics...)
|
||||
}
|
||||
subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter))
|
||||
}
|
||||
return wf.subscriptions.NewSubscription(params.selectedPeer, contentFilter), nil
|
||||
}
|
||||
|
||||
if len(failedContentTopics) > 0 {
|
||||
return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ","))
|
||||
} else {
|
||||
return subscriptions, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -377,10 +439,6 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if contentFilter.PubsubTopic == "" {
|
||||
return nil, errors.New("pubsub topic is required")
|
||||
}
|
||||
|
||||
if len(contentFilter.ContentTopics) == 0 {
|
||||
return nil, errors.New("at least one content topic is required")
|
||||
}
|
||||
|
@ -394,57 +452,66 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
|
|||
return nil, err
|
||||
}
|
||||
|
||||
pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
|
||||
for peerID := range wf.subscriptions.items {
|
||||
if params.selectedPeer != "" && peerID != params.selectedPeer {
|
||||
continue
|
||||
}
|
||||
for pTopic, cTopics := range pubSubTopicMap {
|
||||
var cFilter ContentFilter
|
||||
cFilter.PubsubTopic = pTopic
|
||||
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
|
||||
for peerID := range wf.subscriptions.items {
|
||||
if params.selectedPeer != "" && peerID != params.selectedPeer {
|
||||
continue
|
||||
}
|
||||
|
||||
subscriptions, ok := wf.subscriptions.items[peerID]
|
||||
if !ok || subscriptions == nil {
|
||||
continue
|
||||
}
|
||||
subscriptions, ok := wf.subscriptions.items[peerID]
|
||||
if !ok || subscriptions == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
wf.cleanupSubscriptions(peerID, contentFilter)
|
||||
if len(subscriptions.subsPerPubsubTopic) == 0 {
|
||||
delete(wf.subscriptions.items, peerID)
|
||||
}
|
||||
|
||||
if params.wg != nil {
|
||||
params.wg.Add(1)
|
||||
}
|
||||
|
||||
go func(peerID peer.ID) {
|
||||
defer func() {
|
||||
if params.wg != nil {
|
||||
params.wg.Done()
|
||||
}
|
||||
}()
|
||||
|
||||
err := wf.request(
|
||||
ctx,
|
||||
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
|
||||
pb.FilterSubscribeRequest_UNSUBSCRIBE,
|
||||
contentFilter)
|
||||
if err != nil {
|
||||
ferr, ok := err.(*FilterError)
|
||||
if ok && ferr.Code == http.StatusNotFound {
|
||||
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
} else {
|
||||
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
return
|
||||
}
|
||||
wf.cleanupSubscriptions(peerID, cFilter)
|
||||
if len(subscriptions.subsPerPubsubTopic) == 0 {
|
||||
delete(wf.subscriptions.items, peerID)
|
||||
}
|
||||
|
||||
if params.wg != nil {
|
||||
resultChan <- WakuFilterPushResult{
|
||||
Err: err,
|
||||
PeerID: peerID,
|
||||
}
|
||||
params.wg.Add(1)
|
||||
}
|
||||
}(peerID)
|
||||
}
|
||||
|
||||
go func(peerID peer.ID) {
|
||||
defer func() {
|
||||
if params.wg != nil {
|
||||
params.wg.Done()
|
||||
}
|
||||
}()
|
||||
|
||||
err := wf.request(
|
||||
ctx,
|
||||
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
|
||||
pb.FilterSubscribeRequest_UNSUBSCRIBE,
|
||||
cFilter)
|
||||
if err != nil {
|
||||
ferr, ok := err.(*FilterError)
|
||||
if ok && ferr.Code == http.StatusNotFound {
|
||||
wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
} else {
|
||||
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if params.wg != nil {
|
||||
resultChan <- WakuFilterPushResult{
|
||||
Err: err,
|
||||
PeerID: peerID,
|
||||
}
|
||||
}
|
||||
}(peerID)
|
||||
}
|
||||
}
|
||||
if params.wg != nil {
|
||||
params.wg.Wait()
|
||||
}
|
||||
|
|
|
@ -42,7 +42,7 @@ type FilterTestSuite struct {
|
|||
fullNodeHost host.Host
|
||||
wg *sync.WaitGroup
|
||||
contentFilter ContentFilter
|
||||
subDetails *SubscriptionDetails
|
||||
subDetails []*SubscriptionDetails
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
|
@ -142,7 +142,7 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope)
|
|||
s.wg.Wait()
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) *SubscriptionDetails {
|
||||
func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*SubscriptionDetails {
|
||||
s.contentFilter = ContentFilter{pubsubTopic, NewContentTopicSet(contentTopic)}
|
||||
|
||||
subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
|
||||
|
@ -181,7 +181,8 @@ func (s *FilterTestSuite) SetupTest() {
|
|||
s.testTopic = "/waku/2/go/filter/test"
|
||||
s.testContentTopic = "TopicA"
|
||||
|
||||
s.lightNode = s.makeWakuFilterLightNode(true, false)
|
||||
s.lightNode = s.makeWakuFilterLightNode(true, true)
|
||||
|
||||
//TODO: Add tests to verify broadcaster.
|
||||
|
||||
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic)
|
||||
|
@ -202,19 +203,18 @@ func (s *FilterTestSuite) TearDownTest() {
|
|||
}
|
||||
|
||||
func (s *FilterTestSuite) TestWakuFilter() {
|
||||
|
||||
// Initial subscribe
|
||||
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
|
||||
|
||||
// Should be received
|
||||
s.waitForMsg(func() {
|
||||
s.publishMsg(s.testTopic, s.testContentTopic, "first")
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
// Wrong content topic
|
||||
s.waitForTimeout(func() {
|
||||
s.publishMsg(s.testTopic, "TopicB", "second")
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
_, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID()))
|
||||
s.Require().NoError(err)
|
||||
|
@ -224,11 +224,10 @@ func (s *FilterTestSuite) TestWakuFilter() {
|
|||
// Should not receive after unsubscribe
|
||||
s.waitForTimeout(func() {
|
||||
s.publishMsg(s.testTopic, s.testContentTopic, "third")
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) TestSubscriptionPing() {
|
||||
|
||||
err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
|
||||
s.Require().Error(err)
|
||||
filterErr, ok := err.(*FilterError)
|
||||
|
@ -243,7 +242,6 @@ func (s *FilterTestSuite) TestSubscriptionPing() {
|
|||
}
|
||||
|
||||
func (s *FilterTestSuite) TestPeerFailure() {
|
||||
|
||||
broadcaster2 := relay.NewBroadcaster(10)
|
||||
s.Require().NoError(broadcaster2.Start(context.Background()))
|
||||
|
||||
|
@ -260,7 +258,7 @@ func (s *FilterTestSuite) TestPeerFailure() {
|
|||
|
||||
s.waitForMsg(func() {
|
||||
s.publishMsg(s.testTopic, s.testContentTopic)
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
// Failure is removed
|
||||
s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID()))
|
||||
|
@ -287,15 +285,13 @@ func (s *FilterTestSuite) TestPeerFailure() {
|
|||
}
|
||||
|
||||
func (s *FilterTestSuite) TestCreateSubscription() {
|
||||
|
||||
// Initial subscribe
|
||||
s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID())
|
||||
|
||||
s.waitForMsg(func() {
|
||||
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) TestModifySubscription() {
|
||||
|
@ -307,7 +303,7 @@ func (s *FilterTestSuite) TestModifySubscription() {
|
|||
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
// Subscribe to another content_topic
|
||||
newContentTopic := "Topic_modified"
|
||||
|
@ -317,7 +313,7 @@ func (s *FilterTestSuite) TestModifySubscription() {
|
|||
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) TestMultipleMessages() {
|
||||
|
@ -329,16 +325,17 @@ func (s *FilterTestSuite) TestMultipleMessages() {
|
|||
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
s.waitForMsg(func() {
|
||||
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}, s.subDetails.C)
|
||||
}, s.subDetails[0].C)
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) TestRunningGuard() {
|
||||
|
||||
s.lightNode.Stop()
|
||||
|
||||
contentFilter := ContentFilter{"test", NewContentTopicSet("test")}
|
||||
|
@ -356,6 +353,7 @@ func (s *FilterTestSuite) TestRunningGuard() {
|
|||
}
|
||||
|
||||
func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {
|
||||
|
||||
contentFilter := ContentFilter{"test", NewContentTopicSet("test")}
|
||||
|
||||
_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
|
||||
|
@ -376,6 +374,7 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {
|
|||
}
|
||||
|
||||
func (s *FilterTestSuite) TestStartStop() {
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
s.lightNode = s.makeWakuFilterLightNode(false, false)
|
||||
|
@ -403,3 +402,87 @@ func (s *FilterTestSuite) TestStartStop() {
|
|||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) TestAutoShard() {
|
||||
|
||||
//Workaround as could not find a way to reuse setup test with params
|
||||
// Stop what is run in setup
|
||||
s.fullNode.Stop()
|
||||
s.lightNode.Stop()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds
|
||||
s.ctx = ctx
|
||||
s.ctxCancel = cancel
|
||||
|
||||
cTopic1Str := "0/test/1/testTopic/proto"
|
||||
cTopic1, err := protocol.StringToContentTopic(cTopic1Str)
|
||||
s.Require().NoError(err)
|
||||
//Computing pubSubTopic only for filterFullNode.
|
||||
pubSubTopic := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount)
|
||||
s.testContentTopic = cTopic1Str
|
||||
s.testTopic = pubSubTopic.String()
|
||||
|
||||
s.lightNode = s.makeWakuFilterLightNode(true, false)
|
||||
s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String())
|
||||
|
||||
s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL)
|
||||
err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1)
|
||||
s.Require().NoError(err)
|
||||
|
||||
s.log.Info("Testing Autoshard:CreateSubscription")
|
||||
s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID())
|
||||
s.waitForMsg(func() {
|
||||
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
// Wrong content topic
|
||||
s.waitForTimeout(func() {
|
||||
s.publishMsg(s.testTopic, "TopicB", "second")
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
_, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID()))
|
||||
s.Require().NoError(err)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Should not receive after unsubscribe
|
||||
s.waitForTimeout(func() {
|
||||
s.publishMsg(s.testTopic, s.testContentTopic, "third")
|
||||
}, s.subDetails[0].C)
|
||||
|
||||
s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID())
|
||||
|
||||
s.log.Info("Testing Autoshard:SubscriptionPing")
|
||||
err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID())
|
||||
s.Require().NoError(err)
|
||||
|
||||
// Test ModifySubscription Subscribe to another content_topic
|
||||
s.log.Info("Testing Autoshard:ModifySubscription")
|
||||
|
||||
newContentTopic := "0/test/1/testTopic1/proto"
|
||||
s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID())
|
||||
|
||||
s.waitForMsg(func() {
|
||||
_, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}, s.subDetails[0].C)
|
||||
_, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{
|
||||
PubsubTopic: s.testTopic,
|
||||
ContentTopics: NewContentTopicSet(newContentTopic),
|
||||
})
|
||||
s.Require().NoError(err)
|
||||
|
||||
_, err = s.lightNode.UnsubscribeAll(s.ctx)
|
||||
s.Require().NoError(err)
|
||||
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) BeforeTest(suiteName, testName string) {
|
||||
s.log.Info("Executing ", zap.String("testName", testName))
|
||||
}
|
||||
|
||||
func (s *FilterTestSuite) AfterTest(suiteName, testName string) {
|
||||
s.log.Info("Finished executing ", zap.String("testName", testName))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue