From 0381b9253124200e36f66a0bdfe61ea75d771711 Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 22 Jun 2023 14:55:51 -0400 Subject: [PATCH] feat(c-bindings): filterv2 --- library/README.md | 184 ++++++++++++++++++- library/api.go | 2 +- library/api_filter.go | 70 ++++--- library/api_legacy_filter.go | 50 +++++ mobile/api.go | 4 +- mobile/api_filter.go | 141 +++++++++++--- mobile/api_legacy_filter.go | 108 +++++++++++ mobile/config.go | 8 +- waku/v2/protocol/filter/client.go | 56 +++++- waku/v2/protocol/filter/common.go | 2 +- waku/v2/protocol/filter/subscriptions_map.go | 20 ++ 11 files changed, 577 insertions(+), 68 deletions(-) create mode 100644 library/api_legacy_filter.go create mode 100644 mobile/api_legacy_filter.go diff --git a/library/README.md b/library/README.md index d4fdc0f1..a67bf2e8 100644 --- a/library/README.md +++ b/library/README.md @@ -91,7 +91,24 @@ Fields: ### `FilterSubscription` type -The criteria to create subscription to a light node in JSON Format: +The criteria to create subscription to a filter full node in JSON Format: + +```ts +{ + contentTopics: string[]; + pubsubTopic: string; +} +``` + +Fields: + +- `contentTopics`: Array of content topics. +- `topic`: pubsub topic. + + +### `LegacyFilterSubscription` type + +The criteria to create subscription to a filter full node in JSON Format: ```ts { @@ -274,7 +291,7 @@ interface JsonConfig { relayTopics?: Array; gossipsubParameters?: GossipSubParameters; minPeersToPublish?: number; - filter?: boolean; + legacyFilter?: boolean; discV5?: boolean; discV5BootstrapNodes?: Array; discV5UDPPort?: number; @@ -310,7 +327,7 @@ If a key is `undefined`, or `null`, a default value will be set. - `gossipSubParameters`: custom gossipsub parameters. See `GossipSubParameters` section for defaults - `minPeersToPublish`: The minimum number of peers required on a topic to allow broadcasting a message. Default `0`. -- `filter`: Enable filter protocol. +- `legacyFilter`: Enable Legacy Filter protocol. Default `false`. - `discV5`: Enable DiscoveryV5. Default `false` @@ -918,7 +935,7 @@ For example: ### `extern char* waku_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs)` -Creates a subscription in a lightnode for messages that matches a content filter and optionally a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor). +Creates a subscription to a filter full node matching a content filter.. **Parameters** @@ -935,6 +952,161 @@ Creates a subscription in a lightnode for messages that matches a content filter **Returns** +A [`JsonResponse`](#jsonresponse-type). +If the execution is successful, the `result` field will contain the subscription details. + +For example: + +```json +{ + "result": { + "peerID": "....", + "pubsubTopic": "...", + "contentTopics": [...] + } +} +``` + +**Events** + +When a message is received, a ``"message"` event` is emitted containing the message, pubsub topic, and node ID in which +the message was received. + +The `event` type is [`JsonMessageEvent`](#jsonmessageevent-type). + +For Example: + +```json +{ + "type": "message", + "event": { + "pubsubTopic": "/waku/2/default-waku/proto", + "messageId": "0x6496491e40dbe0b6c3a2198c2426b16301688a2daebc4f57ad7706115eac3ad1", + "wakuMessage": { + "payload": "TODO", + "contentTopic": "/my-app/1/notification/proto", + "version": 1, + "timestamp": 1647826358000000000 + } + } +} +``` + + +### `extern char* waku_filter_ping(char* peerID, int timeoutMs)` + +Used to know if a service node has an active subscription for this client + +**Parameters** + +1. `char* peerID`: Peer ID to check for an active subscription + The peer must be already known. + It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) + or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). +2. `int timeoutMs`: Timeout value in milliseconds to execute the call. + If the function execution takes longer than this value, + the execution will be canceled and an error returned. + Use `0` for no timeout. + +**Returns** + +A [`JsonResponse`](#jsonresponse-type). +If the execution is successful, the `result` field is set to `true`. + +For example: + +```json +{ + "result": true +} +``` + + +### `extern char* waku_filter_unsubscribe(filterJSON *C.char, char* peerID, int timeoutMs)` + +Sends a requests to a service node to stop pushing messages matching this filter to this client. It might be used to modify an existing subscription by providing a subset of the original filter criteria +**Parameters** + +1. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type) criteria to unsubscribe from +2. `char* peerID`: Peer ID to unsubscribe from + The peer must be already known. + It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) + or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). +3. `int timeoutMs`: Timeout value in milliseconds to execute the call. + If the function execution takes longer than this value, + the execution will be canceled and an error returned. + Use `0` for no timeout. + +**Returns** + +A [`JsonResponse`](#jsonresponse-type). +If the execution is successful, the `result` field is set to `true`. + +For example: +```json +{ + "result": true +} +``` + + +### `extern char* waku_filter_unsubscribe_all(char* peerID, int timeoutMs)` + +Sends a requests to a service node (or all service nodes) to stop pushing messages + +**Parameters** + +1. `char* peerID`: Peer ID to unsubscribe from + The peer must be already known. + It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) + or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). + Use `NULL` to unsubscribe from all peers with active subscriptions +2. `int timeoutMs`: Timeout value in milliseconds to execute the call. + If the function execution takes longer than this value, + the execution will be canceled and an error returned. + Use `0` for no timeout. + +**Returns** + +A [`JsonResponse`](#jsonresponse-type). +If the execution is successful, the `result` field will contain an array with information about the state of each unsubscription attempt (one per peer) + +For example: + +```json +{ + "result": [ + { + "peerID": ...., + "error": "" // Empty if succesful + }, + ... + ] +} +``` + + +## Waku Legacy Filter + +### `extern char* waku_legacy_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs)` + +Creates a subscription in a lightnode for messages that matches a content filter and optionally a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor). + +**Parameters** + +1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#legacyfiltersubscription-type) to subscribe to. +2. `char* peerID`: Peer ID to subscribe to. + The peer must be already known. + It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) + or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). + Use `NULL` to automatically select a node. +3. `int timeoutMs`: Timeout value in milliseconds to execute the call. + If the function execution takes longer than this value, + the execution will be canceled and an error returned. + Use `0` for no timeout. + +**Returns** + A [`JsonResponse`](#jsonresponse-type). If the execution is successful, the `result` field is set to `true`. @@ -971,13 +1143,13 @@ For Example: } ``` -### `extern char* waku_filter_unsubscribe(char* filterJSON, int timeoutMs)` +### `extern char* waku_legacy_filter_unsubscribe(char* filterJSON, int timeoutMs)` Removes subscriptions in a light node matching a content filter and, optionally, a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor). **Parameters** -1. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type). +1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#filtersubscription-type). 2. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. diff --git a/library/api.go b/library/api.go index 82727d9d..a84165b8 100644 --- a/library/api.go +++ b/library/api.go @@ -60,7 +60,7 @@ func main() {} // - seenMessagesTTLSeconds: configures when a previously seen message ID can be forgotten about. Default `120` seconds // // - minPeersToPublish: The minimum number of peers required on a topic to allow broadcasting a message. Default `0` -// - filter: Enable Filter. Default `false` +// - legacyFilter: Enable LegacyFilter. Default `false` // - discV5: Enable DiscoveryV5. Default `false` // - discV5BootstrapNodes: Array of bootstrap nodes ENR // - discV5UDPPort: UDP port for DiscoveryV5 diff --git a/library/api_filter.go b/library/api_filter.go index a25199c2..604d133a 100644 --- a/library/api_filter.go +++ b/library/api_filter.go @@ -6,39 +6,63 @@ import ( mobile "github.com/waku-org/go-waku/mobile" ) -//export waku_filter_subscribe -// Creates a subscription to a light node matching a content filter and, optionally, a pubSub topic. +// Creates a subscription to a filter full node matching a content filter. // filterJSON must contain a JSON with this format: -// { -// "contentFilters": [ // mandatory -// { -// "contentTopic": "the content topic" -// }, ... -// ], -// "pubsubTopic": "the pubsub topic" // optional -// } +// +// { +// "pubsubTopic": "the pubsub topic" // mandatory +// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 +// } +// // peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node // If ms is greater than 0, the subscription must happen before the timeout // (in milliseconds) is reached, or an error will be returned +// It returns a json object containing the peerID to which we are subscribed to and the details of the subscription +// +//export waku_filter_subscribe func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int) *C.char { response := mobile.FilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms)) return C.CString(response) } -//export waku_filter_unsubscribe -// Removes subscriptions in a light node matching a content filter and, optionally, a pubSub topic. -// filterJSON must contain a JSON with this format: -// { -// "contentFilters": [ // mandatory -// { -// "contentTopic": "the content topic" -// }, ... -// ], -// "pubsubTopic": "the pubsub topic" // optional -// } +// Used to know if a service node has an active subscription for this client +// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol // If ms is greater than 0, the subscription must happen before the timeout // (in milliseconds) is reached, or an error will be returned -func waku_filter_unsubscribe(filterJSON *C.char, ms C.int) *C.char { - response := mobile.FilterUnsubscribe(C.GoString(filterJSON), int(ms)) +// +//export waku_filter_ping +func waku_filter_ping(peerID *C.char, ms C.int) *C.char { + response := mobile.FilterPing(C.GoString(peerID), int(ms)) + return C.CString(response) +} + +// Sends a requests to a service node to stop pushing messages matching this filter to this client. +// It might be used to modify an existing subscription by providing a subset of the original filter +// criteria +// +// { +// "pubsubTopic": "the pubsub topic" // mandatory +// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 +// } +// +// peerID should contain the ID of a peer this client is subscribed to. +// If ms is greater than 0, the subscription must happen before the timeout +// (in milliseconds) is reached, or an error will be returned +// +//export waku_filter_unsubscribe +func waku_filter_unsubscribe(filterJSON *C.char, peerID *C.char, ms C.int) *C.char { + response := mobile.FilterUnsubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms)) + return C.CString(response) +} + +// Sends a requests to a service node (or all service nodes) to stop pushing messages +// peerID should contain the ID of a peer this client is subscribed to, or can be NULL to +// stop all active subscriptions +// If ms is greater than 0, the subscription must happen before the timeout +// (in milliseconds) is reached, or an error will be returned +// +//export waku_filter_unsubscribe_all +func waku_filter_unsubscribe_all(peerID *C.char, ms C.int) *C.char { + response := mobile.FilterUnsubscribeAll(C.GoString(peerID), int(ms)) return C.CString(response) } diff --git a/library/api_legacy_filter.go b/library/api_legacy_filter.go new file mode 100644 index 00000000..7926ac61 --- /dev/null +++ b/library/api_legacy_filter.go @@ -0,0 +1,50 @@ +package main + +import ( + "C" + + mobile "github.com/waku-org/go-waku/mobile" +) + +// Creates a subscription to a light node matching a content filter and, optionally, a pubSub topic. +// filterJSON must contain a JSON with this format: +// +// { +// "contentFilters": [ // mandatory +// { +// "contentTopic": "the content topic" +// }, ... +// ], +// "pubsubTopic": "the pubsub topic" // optional +// } +// +// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node +// If ms is greater than 0, the subscription must happen before the timeout +// (in milliseconds) is reached, or an error will be returned +// +//export waku_legacy_filter_subscribe +func waku_legacy_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int) *C.char { + response := mobile.LegacyFilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms)) + return C.CString(response) +} + +// Removes subscriptions in a light node matching a content filter and, optionally, a pubSub topic. +// filterJSON must contain a JSON with this format: +// +// { +// "contentFilters": [ // mandatory +// { +// "contentTopic": "the content topic" +// }, ... +// ], +// "pubsubTopic": "the pubsub topic" // optional +// } +// +// If ms is greater than 0, the subscription must happen before the timeout +// (in milliseconds) is reached, or an error will be returned +// +//export waku_legacy_filter_unsubscribe +func waku_legacy_filter_unsubscribe(filterJSON *C.char, ms C.int) *C.char { + response := mobile.LegacyFilterUnsubscribe(C.GoString(filterJSON), int(ms)) + return C.CString(response) +} diff --git a/mobile/api.go b/mobile/api.go index ed1e803c..815725be 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -107,10 +107,12 @@ func NewNode(configJSON string) string { opts = append(opts, node.WithWakuRelayAndMinPeers(*config.MinPeersToPublish, pubsubOpt...)) } - if *config.EnableFilter { + if *config.EnableLegacyFilter { opts = append(opts, node.WithLegacyWakuFilter(false)) } + opts = append(opts, node.WithWakuFilterLightNode()) + if *config.EnableStore { var db *sql.DB var migrationFn func(*sql.DB) error diff --git a/mobile/api_filter.go b/mobile/api_filter.go index d168b360..c69701c1 100644 --- a/mobile/api_filter.go +++ b/mobile/api_filter.go @@ -3,33 +3,29 @@ package gowaku import ( "context" "encoding/json" + "errors" "time" "github.com/libp2p/go-libp2p/core/peer" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" - "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/filter" ) type FilterArgument struct { - Topic string `json:"pubsubTopic,omitempty"` - ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"` + Topic string `json:"pubsubTopic,omitempty"` + ContentTopics []string `json:"contentTopics,omitempty"` } -func toContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) { +func toContentFilter(filterJSON string) (filter.ContentFilter, error) { var f FilterArgument err := json.Unmarshal([]byte(filterJSON), &f) if err != nil { - return legacy_filter.ContentFilter{}, err + return filter.ContentFilter{}, err } - result := legacy_filter.ContentFilter{ - Topic: f.Topic, - } - for _, cf := range f.ContentFilters { - result.ContentTopics = append(result.ContentTopics, cf.ContentTopic) - } - - return result, err + return filter.ContentFilter{ + Topic: f.Topic, + ContentTopics: f.ContentTopics, + }, nil } func FilterSubscribe(filterJSON string, peerID string, ms int) string { @@ -52,32 +48,63 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) string { ctx = context.Background() } - var fOptions []legacy_filter.FilterSubscribeOption + var fOptions []filter.FilterSubscribeOption if peerID != "" { p, err := peer.Decode(peerID) if err != nil { return MakeJSONResponse(err) } - fOptions = append(fOptions, legacy_filter.WithPeer(p)) + fOptions = append(fOptions, filter.WithPeer(p)) } else { - fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection()) + fOptions = append(fOptions, filter.WithAutomaticPeerSelection()) } - _, f, err := wakuState.node.LegacyFilter().Subscribe(ctx, cf, fOptions...) + subscriptionDetails, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) if err != nil { return MakeJSONResponse(err) } - go func(f legacy_filter.Filter) { - for envelope := range f.Chan { + go func(subscriptionDetails *filter.SubscriptionDetails) { + for envelope := range subscriptionDetails.C { send("message", toSubscriptionMessage(envelope)) } - }(f) + }(subscriptionDetails) - return PrepareJSONResponse(true, nil) + return PrepareJSONResponse(subscriptionDetails, nil) } -func FilterUnsubscribe(filterJSON string, ms int) string { +func FilterPing(peerID string, ms int) string { + if wakuState.node == nil { + return MakeJSONResponse(errWakuNodeNotReady) + } + + var ctx context.Context + var cancel context.CancelFunc + + if ms > 0 { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + defer cancel() + } else { + ctx = context.Background() + } + + var pID peer.ID + var err error + if peerID != "" { + pID, err = peer.Decode(peerID) + if err != nil { + return MakeJSONResponse(err) + } + } else { + return MakeJSONResponse(errors.New("peerID is required")) + } + + err = wakuState.node.FilterLightnode().Ping(ctx, pID) + + return MakeJSONResponse(err) +} + +func FilterUnsubscribe(filterJSON string, peerID string, ms int) string { cf, err := toContentFilter(filterJSON) if err != nil { return MakeJSONResponse(err) @@ -97,10 +124,74 @@ func FilterUnsubscribe(filterJSON string, ms int) string { ctx = context.Background() } - err = wakuState.node.LegacyFilter().UnsubscribeFilter(ctx, cf) + var fOptions []filter.FilterUnsubscribeOption + if peerID != "" { + p, err := peer.Decode(peerID) + if err != nil { + return MakeJSONResponse(err) + } + fOptions = append(fOptions, filter.Peer(p)) + } else { + return MakeJSONResponse(errors.New("peerID is required")) + } + + pushResult, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...) if err != nil { return MakeJSONResponse(err) } - return MakeJSONResponse(nil) + result := <-pushResult + + return MakeJSONResponse(result.Err) +} + +type unsubscribeAllResult struct { + PeerID string `json:"peerID"` + Error string `json:"error"` +} + +func FilterUnsubscribeAll(peerID string, ms int) string { + if wakuState.node == nil { + return MakeJSONResponse(errWakuNodeNotReady) + } + + var ctx context.Context + var cancel context.CancelFunc + + if ms > 0 { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + defer cancel() + } else { + ctx = context.Background() + } + + var fOptions []filter.FilterUnsubscribeOption + if peerID != "" { + p, err := peer.Decode(peerID) + if err != nil { + return MakeJSONResponse(err) + } + fOptions = append(fOptions, filter.Peer(p)) + } else { + fOptions = append(fOptions, filter.UnsubscribeAll()) + } + + pushResult, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...) + if err != nil { + return MakeJSONResponse(err) + } + + var unsubscribeResult []unsubscribeAllResult + + for result := range pushResult { + ur := unsubscribeAllResult{ + PeerID: result.PeerID.Pretty(), + } + if result.Err != nil { + ur.Error = result.Err.Error() + } + unsubscribeResult = append(unsubscribeResult, ur) + } + + return PrepareJSONResponse(unsubscribeResult, nil) } diff --git a/mobile/api_legacy_filter.go b/mobile/api_legacy_filter.go new file mode 100644 index 00000000..dd3f227c --- /dev/null +++ b/mobile/api_legacy_filter.go @@ -0,0 +1,108 @@ +package gowaku + +import ( + "context" + "encoding/json" + "time" + + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter" + "github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb" +) + +type LegacyFilterArgument struct { + Topic string `json:"pubsubTopic,omitempty"` + ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"` +} + +func toLegacyContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) { + var f LegacyFilterArgument + err := json.Unmarshal([]byte(filterJSON), &f) + if err != nil { + return legacy_filter.ContentFilter{}, err + } + + result := legacy_filter.ContentFilter{ + Topic: f.Topic, + } + for _, cf := range f.ContentFilters { + result.ContentTopics = append(result.ContentTopics, cf.ContentTopic) + } + + return result, err +} + +// Deprecated: Use FilterSubscribe instead +func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) string { + cf, err := toLegacyContentFilter(filterJSON) + if err != nil { + return MakeJSONResponse(err) + } + + if wakuState.node == nil { + return MakeJSONResponse(errWakuNodeNotReady) + } + + var ctx context.Context + var cancel context.CancelFunc + + if ms > 0 { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + defer cancel() + } else { + ctx = context.Background() + } + + var fOptions []legacy_filter.FilterSubscribeOption + if peerID != "" { + p, err := peer.Decode(peerID) + if err != nil { + return MakeJSONResponse(err) + } + fOptions = append(fOptions, legacy_filter.WithPeer(p)) + } else { + fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection()) + } + + _, f, err := wakuState.node.LegacyFilter().Subscribe(ctx, cf, fOptions...) + if err != nil { + return MakeJSONResponse(err) + } + + go func(f legacy_filter.Filter) { + for envelope := range f.Chan { + send("message", toSubscriptionMessage(envelope)) + } + }(f) + + return PrepareJSONResponse(true, nil) +} + +// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead +func LegacyFilterUnsubscribe(filterJSON string, ms int) string { + cf, err := toLegacyContentFilter(filterJSON) + if err != nil { + return MakeJSONResponse(err) + } + + if wakuState.node == nil { + return MakeJSONResponse(errWakuNodeNotReady) + } + + var ctx context.Context + var cancel context.CancelFunc + + if ms > 0 { + ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + defer cancel() + } else { + ctx = context.Background() + } + + err = wakuState.node.LegacyFilter().UnsubscribeFilter(ctx, cf) + if err != nil { + return MakeJSONResponse(err) + } + + return MakeJSONResponse(nil) +} diff --git a/mobile/config.go b/mobile/config.go index 47b56887..02e2a274 100644 --- a/mobile/config.go +++ b/mobile/config.go @@ -17,7 +17,7 @@ type wakuConfig struct { EnableRelay *bool `json:"relay"` RelayTopics []string `json:"relayTopics,omitempty"` GossipSubParams *GossipSubParams `json:"gossipsubParams,omitempty"` - EnableFilter *bool `json:"filter,omitempty"` + EnableLegacyFilter *bool `json:"legacyFilter,omitempty"` MinPeersToPublish *int `json:"minPeersToPublish,omitempty"` EnableDiscV5 *bool `json:"discV5,omitempty"` DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes,omitempty"` @@ -33,7 +33,7 @@ var defaultPort = 60000 var defaultKeepAliveInterval = 20 var defaultEnableRelay = true var defaultMinPeersToPublish = 0 -var defaultEnableFilter = false +var defaultEnableLegacyFilter = false var defaultEnableDiscV5 = false var defaultDiscV5UDPPort = uint(9000) var defaultLogLevel = "INFO" @@ -207,8 +207,8 @@ func getConfig(configJSON string) (wakuConfig, error) { config.EnableRelay = &defaultEnableRelay } - if config.EnableFilter == nil { - config.EnableFilter = &defaultEnableFilter + if config.EnableLegacyFilter == nil { + config.EnableLegacyFilter = &defaultEnableLegacyFilter } if config.EnableDiscV5 == nil { diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 24b7b8b6..ce617d96 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -50,8 +50,8 @@ type ContentFilter struct { } type WakuFilterPushResult struct { - err error - peerID peer.ID + Err error + PeerID peer.ID } // NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options @@ -292,6 +292,41 @@ func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails { return output } +func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) { + wf.subscriptions.Lock() + defer wf.subscriptions.Unlock() + + peerSubscription, ok := wf.subscriptions.items[peerID] + if !ok { + return + } + + subscriptionDetailList, ok := peerSubscription.subscriptionsPerTopic[contentFilter.Topic] + if !ok { + return + } + + for subscriptionDetailID, subscriptionDetail := range subscriptionDetailList { + subscriptionDetail.Remove(contentFilter.ContentTopics...) + if len(subscriptionDetail.ContentTopics) == 0 { + delete(subscriptionDetailList, subscriptionDetailID) + } else { + subscriptionDetailList[subscriptionDetailID] = subscriptionDetail + } + } + + if len(subscriptionDetailList) == 0 { + delete(wf.subscriptions.items[peerID].subscriptionsPerTopic, contentFilter.Topic) + } else { + wf.subscriptions.items[peerID].subscriptionsPerTopic[contentFilter.Topic] = subscriptionDetailList + } + + if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 { + delete(wf.subscriptions.items, peerID) + } + +} + // Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) { if contentFilter.Topic == "" { @@ -329,11 +364,14 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co contentFilter) if err != nil { wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) + return } + wf.cleanupSubscriptions(peerID, contentFilter) + resultChan <- WakuFilterPushResult{ - err: err, - peerID: peerID, + Err: err, + PeerID: peerID, } }(peerID) } @@ -375,7 +413,7 @@ func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...Filte } localWg.Add(1) go func(peerID peer.ID) { - defer wf.wg.Done() + defer localWg.Done() err := wf.request( ctx, &FilterSubscribeParameters{selectedPeer: peerID}, @@ -385,9 +423,13 @@ func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...Filte wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) } + wf.subscriptions.Lock() + delete(wf.subscriptions.items, peerID) + defer wf.subscriptions.Unlock() + resultChan <- WakuFilterPushResult{ - err: err, - peerID: peerID, + Err: err, + PeerID: peerID, } }(peerID) } diff --git a/waku/v2/protocol/filter/common.go b/waku/v2/protocol/filter/common.go index 84fe2c2e..39866768 100644 --- a/waku/v2/protocol/filter/common.go +++ b/waku/v2/protocol/filter/common.go @@ -23,5 +23,5 @@ func NewFilterError(code int, message string) FilterError { } func (e *FilterError) Error() string { - return fmt.Sprintf("error %d: %s", e.Code, e.Message) + return fmt.Sprintf("%d - %s", e.Code, e.Message) } diff --git a/waku/v2/protocol/filter/subscriptions_map.go b/waku/v2/protocol/filter/subscriptions_map.go index 8ab0e35c..66d1e166 100644 --- a/waku/v2/protocol/filter/subscriptions_map.go +++ b/waku/v2/protocol/filter/subscriptions_map.go @@ -1,6 +1,7 @@ package filter import ( + "encoding/json" "sync" "github.com/google/uuid" @@ -238,3 +239,22 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e }(subscription) } } + +func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) { + type resultType struct { + PeerID string `json:"peerID"` + PubsubTopic string `json:"pubsubTopic"` + ContentTopics []string `json:"contentTopics"` + } + + result := resultType{ + PeerID: s.PeerID.Pretty(), + PubsubTopic: s.PubsubTopic, + } + + for c := range s.ContentTopics { + result.ContentTopics = append(result.ContentTopics, c) + } + + return json.Marshal(result) +}