feat(c-bindings): filterv2

This commit is contained in:
Richard Ramos 2023-06-22 14:55:51 -04:00 committed by richΛrd
parent f0f3543df8
commit 0381b92531
11 changed files with 577 additions and 68 deletions

View File

@ -91,7 +91,24 @@ Fields:
### `FilterSubscription` type
The criteria to create subscription to a light node in JSON Format:
The criteria to create subscription to a filter full node in JSON Format:
```ts
{
contentTopics: string[];
pubsubTopic: string;
}
```
Fields:
- `contentTopics`: Array of content topics.
- `topic`: pubsub topic.
### `LegacyFilterSubscription` type
The criteria to create subscription to a filter full node in JSON Format:
```ts
{
@ -274,7 +291,7 @@ interface JsonConfig {
relayTopics?: Array<string>;
gossipsubParameters?: GossipSubParameters;
minPeersToPublish?: number;
filter?: boolean;
legacyFilter?: boolean;
discV5?: boolean;
discV5BootstrapNodes?: Array<string>;
discV5UDPPort?: number;
@ -310,7 +327,7 @@ If a key is `undefined`, or `null`, a default value will be set.
- `gossipSubParameters`: custom gossipsub parameters. See `GossipSubParameters` section for defaults
- `minPeersToPublish`: The minimum number of peers required on a topic to allow broadcasting a message.
Default `0`.
- `filter`: Enable filter protocol.
- `legacyFilter`: Enable Legacy Filter protocol.
Default `false`.
- `discV5`: Enable DiscoveryV5.
Default `false`
@ -918,7 +935,7 @@ For example:
### `extern char* waku_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs)`
Creates a subscription in a lightnode for messages that matches a content filter and optionally a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor).
Creates a subscription to a filter full node matching a content filter..
**Parameters**
@ -935,6 +952,161 @@ Creates a subscription in a lightnode for messages that matches a content filter
**Returns**
A [`JsonResponse`](#jsonresponse-type).
If the execution is successful, the `result` field will contain the subscription details.
For example:
```json
{
"result": {
"peerID": "....",
"pubsubTopic": "...",
"contentTopics": [...]
}
}
```
**Events**
When a message is received, a ``"message"` event` is emitted containing the message, pubsub topic, and node ID in which
the message was received.
The `event` type is [`JsonMessageEvent`](#jsonmessageevent-type).
For Example:
```json
{
"type": "message",
"event": {
"pubsubTopic": "/waku/2/default-waku/proto",
"messageId": "0x6496491e40dbe0b6c3a2198c2426b16301688a2daebc4f57ad7706115eac3ad1",
"wakuMessage": {
"payload": "TODO",
"contentTopic": "/my-app/1/notification/proto",
"version": 1,
"timestamp": 1647826358000000000
}
}
}
```
### `extern char* waku_filter_ping(char* peerID, int timeoutMs)`
Used to know if a service node has an active subscription for this client
**Parameters**
1. `char* peerID`: Peer ID to check for an active subscription
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
**Returns**
A [`JsonResponse`](#jsonresponse-type).
If the execution is successful, the `result` field is set to `true`.
For example:
```json
{
"result": true
}
```
### `extern char* waku_filter_unsubscribe(filterJSON *C.char, char* peerID, int timeoutMs)`
Sends a requests to a service node to stop pushing messages matching this filter to this client. It might be used to modify an existing subscription by providing a subset of the original filter criteria
**Parameters**
1. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type) criteria to unsubscribe from
2. `char* peerID`: Peer ID to unsubscribe from
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
**Returns**
A [`JsonResponse`](#jsonresponse-type).
If the execution is successful, the `result` field is set to `true`.
For example:
```json
{
"result": true
}
```
### `extern char* waku_filter_unsubscribe_all(char* peerID, int timeoutMs)`
Sends a requests to a service node (or all service nodes) to stop pushing messages
**Parameters**
1. `char* peerID`: Peer ID to unsubscribe from
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
Use `NULL` to unsubscribe from all peers with active subscriptions
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
**Returns**
A [`JsonResponse`](#jsonresponse-type).
If the execution is successful, the `result` field will contain an array with information about the state of each unsubscription attempt (one per peer)
For example:
```json
{
"result": [
{
"peerID": ....,
"error": "" // Empty if succesful
},
...
]
}
```
## Waku Legacy Filter
### `extern char* waku_legacy_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs)`
Creates a subscription in a lightnode for messages that matches a content filter and optionally a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor).
**Parameters**
1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#legacyfiltersubscription-type) to subscribe to.
2. `char* peerID`: Peer ID to subscribe to.
The peer must be already known.
It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid)
or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms).
Use `NULL` to automatically select a node.
3. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.
Use `0` for no timeout.
**Returns**
A [`JsonResponse`](#jsonresponse-type).
If the execution is successful, the `result` field is set to `true`.
@ -971,13 +1143,13 @@ For Example:
}
```
### `extern char* waku_filter_unsubscribe(char* filterJSON, int timeoutMs)`
### `extern char* waku_legacy_filter_unsubscribe(char* filterJSON, int timeoutMs)`
Removes subscriptions in a light node matching a content filter and, optionally, a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor).
**Parameters**
1. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type).
1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#filtersubscription-type).
2. `int timeoutMs`: Timeout value in milliseconds to execute the call.
If the function execution takes longer than this value,
the execution will be canceled and an error returned.

View File

@ -60,7 +60,7 @@ func main() {}
// - seenMessagesTTLSeconds: configures when a previously seen message ID can be forgotten about. Default `120` seconds
//
// - minPeersToPublish: The minimum number of peers required on a topic to allow broadcasting a message. Default `0`
// - filter: Enable Filter. Default `false`
// - legacyFilter: Enable LegacyFilter. Default `false`
// - discV5: Enable DiscoveryV5. Default `false`
// - discV5BootstrapNodes: Array of bootstrap nodes ENR
// - discV5UDPPort: UDP port for DiscoveryV5

View File

@ -6,39 +6,63 @@ import (
mobile "github.com/waku-org/go-waku/mobile"
)
//export waku_filter_subscribe
// Creates a subscription to a light node matching a content filter and, optionally, a pubSub topic.
// Creates a subscription to a filter full node matching a content filter.
// filterJSON must contain a JSON with this format:
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "pubsubTopic": "the pubsub topic" // optional
// }
//
// {
// "pubsubTopic": "the pubsub topic" // mandatory
// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10
// }
//
// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
// It returns a json object containing the peerID to which we are subscribed to and the details of the subscription
//
//export waku_filter_subscribe
func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int) *C.char {
response := mobile.FilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms))
return C.CString(response)
}
//export waku_filter_unsubscribe
// Removes subscriptions in a light node matching a content filter and, optionally, a pubSub topic.
// filterJSON must contain a JSON with this format:
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "pubsubTopic": "the pubsub topic" // optional
// }
// Used to know if a service node has an active subscription for this client
// peerID should contain the ID of a peer we are subscribed to, supporting the filter protocol
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
func waku_filter_unsubscribe(filterJSON *C.char, ms C.int) *C.char {
response := mobile.FilterUnsubscribe(C.GoString(filterJSON), int(ms))
//
//export waku_filter_ping
func waku_filter_ping(peerID *C.char, ms C.int) *C.char {
response := mobile.FilterPing(C.GoString(peerID), int(ms))
return C.CString(response)
}
// Sends a requests to a service node to stop pushing messages matching this filter to this client.
// It might be used to modify an existing subscription by providing a subset of the original filter
// criteria
//
// {
// "pubsubTopic": "the pubsub topic" // mandatory
// "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10
// }
//
// peerID should contain the ID of a peer this client is subscribed to.
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
//
//export waku_filter_unsubscribe
func waku_filter_unsubscribe(filterJSON *C.char, peerID *C.char, ms C.int) *C.char {
response := mobile.FilterUnsubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms))
return C.CString(response)
}
// Sends a requests to a service node (or all service nodes) to stop pushing messages
// peerID should contain the ID of a peer this client is subscribed to, or can be NULL to
// stop all active subscriptions
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
//
//export waku_filter_unsubscribe_all
func waku_filter_unsubscribe_all(peerID *C.char, ms C.int) *C.char {
response := mobile.FilterUnsubscribeAll(C.GoString(peerID), int(ms))
return C.CString(response)
}

View File

@ -0,0 +1,50 @@
package main
import (
"C"
mobile "github.com/waku-org/go-waku/mobile"
)
// Creates a subscription to a light node matching a content filter and, optionally, a pubSub topic.
// filterJSON must contain a JSON with this format:
//
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "pubsubTopic": "the pubsub topic" // optional
// }
//
// peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
//
//export waku_legacy_filter_subscribe
func waku_legacy_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int) *C.char {
response := mobile.LegacyFilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms))
return C.CString(response)
}
// Removes subscriptions in a light node matching a content filter and, optionally, a pubSub topic.
// filterJSON must contain a JSON with this format:
//
// {
// "contentFilters": [ // mandatory
// {
// "contentTopic": "the content topic"
// }, ...
// ],
// "pubsubTopic": "the pubsub topic" // optional
// }
//
// If ms is greater than 0, the subscription must happen before the timeout
// (in milliseconds) is reached, or an error will be returned
//
//export waku_legacy_filter_unsubscribe
func waku_legacy_filter_unsubscribe(filterJSON *C.char, ms C.int) *C.char {
response := mobile.LegacyFilterUnsubscribe(C.GoString(filterJSON), int(ms))
return C.CString(response)
}

View File

@ -107,10 +107,12 @@ func NewNode(configJSON string) string {
opts = append(opts, node.WithWakuRelayAndMinPeers(*config.MinPeersToPublish, pubsubOpt...))
}
if *config.EnableFilter {
if *config.EnableLegacyFilter {
opts = append(opts, node.WithLegacyWakuFilter(false))
}
opts = append(opts, node.WithWakuFilterLightNode())
if *config.EnableStore {
var db *sql.DB
var migrationFn func(*sql.DB) error

View File

@ -3,33 +3,29 @@ package gowaku
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
)
type FilterArgument struct {
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"`
Topic string `json:"pubsubTopic,omitempty"`
ContentTopics []string `json:"contentTopics,omitempty"`
}
func toContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) {
func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
var f FilterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return legacy_filter.ContentFilter{}, err
return filter.ContentFilter{}, err
}
result := legacy_filter.ContentFilter{
Topic: f.Topic,
}
for _, cf := range f.ContentFilters {
result.ContentTopics = append(result.ContentTopics, cf.ContentTopic)
}
return result, err
return filter.ContentFilter{
Topic: f.Topic,
ContentTopics: f.ContentTopics,
}, nil
}
func FilterSubscribe(filterJSON string, peerID string, ms int) string {
@ -52,32 +48,63 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) string {
ctx = context.Background()
}
var fOptions []legacy_filter.FilterSubscribeOption
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return MakeJSONResponse(err)
}
fOptions = append(fOptions, legacy_filter.WithPeer(p))
fOptions = append(fOptions, filter.WithPeer(p))
} else {
fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection())
fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
}
_, f, err := wakuState.node.LegacyFilter().Subscribe(ctx, cf, fOptions...)
subscriptionDetails, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
if err != nil {
return MakeJSONResponse(err)
}
go func(f legacy_filter.Filter) {
for envelope := range f.Chan {
go func(subscriptionDetails *filter.SubscriptionDetails) {
for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope))
}
}(f)
}(subscriptionDetails)
return PrepareJSONResponse(true, nil)
return PrepareJSONResponse(subscriptionDetails, nil)
}
func FilterUnsubscribe(filterJSON string, ms int) string {
func FilterPing(peerID string, ms int) string {
if wakuState.node == nil {
return MakeJSONResponse(errWakuNodeNotReady)
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
}
var pID peer.ID
var err error
if peerID != "" {
pID, err = peer.Decode(peerID)
if err != nil {
return MakeJSONResponse(err)
}
} else {
return MakeJSONResponse(errors.New("peerID is required"))
}
err = wakuState.node.FilterLightnode().Ping(ctx, pID)
return MakeJSONResponse(err)
}
func FilterUnsubscribe(filterJSON string, peerID string, ms int) string {
cf, err := toContentFilter(filterJSON)
if err != nil {
return MakeJSONResponse(err)
@ -97,10 +124,74 @@ func FilterUnsubscribe(filterJSON string, ms int) string {
ctx = context.Background()
}
err = wakuState.node.LegacyFilter().UnsubscribeFilter(ctx, cf)
var fOptions []filter.FilterUnsubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return MakeJSONResponse(err)
}
fOptions = append(fOptions, filter.Peer(p))
} else {
return MakeJSONResponse(errors.New("peerID is required"))
}
pushResult, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
if err != nil {
return MakeJSONResponse(err)
}
return MakeJSONResponse(nil)
result := <-pushResult
return MakeJSONResponse(result.Err)
}
type unsubscribeAllResult struct {
PeerID string `json:"peerID"`
Error string `json:"error"`
}
func FilterUnsubscribeAll(peerID string, ms int) string {
if wakuState.node == nil {
return MakeJSONResponse(errWakuNodeNotReady)
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
}
var fOptions []filter.FilterUnsubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return MakeJSONResponse(err)
}
fOptions = append(fOptions, filter.Peer(p))
} else {
fOptions = append(fOptions, filter.UnsubscribeAll())
}
pushResult, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
if err != nil {
return MakeJSONResponse(err)
}
var unsubscribeResult []unsubscribeAllResult
for result := range pushResult {
ur := unsubscribeAllResult{
PeerID: result.PeerID.Pretty(),
}
if result.Err != nil {
ur.Error = result.Err.Error()
}
unsubscribeResult = append(unsubscribeResult, ur)
}
return PrepareJSONResponse(unsubscribeResult, nil)
}

108
mobile/api_legacy_filter.go Normal file
View File

@ -0,0 +1,108 @@
package gowaku
import (
"context"
"encoding/json"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
)
type LegacyFilterArgument struct {
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"`
}
func toLegacyContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) {
var f LegacyFilterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return legacy_filter.ContentFilter{}, err
}
result := legacy_filter.ContentFilter{
Topic: f.Topic,
}
for _, cf := range f.ContentFilters {
result.ContentTopics = append(result.ContentTopics, cf.ContentTopic)
}
return result, err
}
// Deprecated: Use FilterSubscribe instead
func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) string {
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
return MakeJSONResponse(err)
}
if wakuState.node == nil {
return MakeJSONResponse(errWakuNodeNotReady)
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
}
var fOptions []legacy_filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
return MakeJSONResponse(err)
}
fOptions = append(fOptions, legacy_filter.WithPeer(p))
} else {
fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection())
}
_, f, err := wakuState.node.LegacyFilter().Subscribe(ctx, cf, fOptions...)
if err != nil {
return MakeJSONResponse(err)
}
go func(f legacy_filter.Filter) {
for envelope := range f.Chan {
send("message", toSubscriptionMessage(envelope))
}
}(f)
return PrepareJSONResponse(true, nil)
}
// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead
func LegacyFilterUnsubscribe(filterJSON string, ms int) string {
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
return MakeJSONResponse(err)
}
if wakuState.node == nil {
return MakeJSONResponse(errWakuNodeNotReady)
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = context.Background()
}
err = wakuState.node.LegacyFilter().UnsubscribeFilter(ctx, cf)
if err != nil {
return MakeJSONResponse(err)
}
return MakeJSONResponse(nil)
}

View File

@ -17,7 +17,7 @@ type wakuConfig struct {
EnableRelay *bool `json:"relay"`
RelayTopics []string `json:"relayTopics,omitempty"`
GossipSubParams *GossipSubParams `json:"gossipsubParams,omitempty"`
EnableFilter *bool `json:"filter,omitempty"`
EnableLegacyFilter *bool `json:"legacyFilter,omitempty"`
MinPeersToPublish *int `json:"minPeersToPublish,omitempty"`
EnableDiscV5 *bool `json:"discV5,omitempty"`
DiscV5BootstrapNodes []string `json:"discV5BootstrapNodes,omitempty"`
@ -33,7 +33,7 @@ var defaultPort = 60000
var defaultKeepAliveInterval = 20
var defaultEnableRelay = true
var defaultMinPeersToPublish = 0
var defaultEnableFilter = false
var defaultEnableLegacyFilter = false
var defaultEnableDiscV5 = false
var defaultDiscV5UDPPort = uint(9000)
var defaultLogLevel = "INFO"
@ -207,8 +207,8 @@ func getConfig(configJSON string) (wakuConfig, error) {
config.EnableRelay = &defaultEnableRelay
}
if config.EnableFilter == nil {
config.EnableFilter = &defaultEnableFilter
if config.EnableLegacyFilter == nil {
config.EnableLegacyFilter = &defaultEnableLegacyFilter
}
if config.EnableDiscV5 == nil {

View File

@ -50,8 +50,8 @@ type ContentFilter struct {
}
type WakuFilterPushResult struct {
err error
peerID peer.ID
Err error
PeerID peer.ID
}
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
@ -292,6 +292,41 @@ func (wf *WakuFilterLightnode) Subscriptions() []*SubscriptionDetails {
return output
}
func (wf *WakuFilterLightnode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) {
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()
peerSubscription, ok := wf.subscriptions.items[peerID]
if !ok {
return
}
subscriptionDetailList, ok := peerSubscription.subscriptionsPerTopic[contentFilter.Topic]
if !ok {
return
}
for subscriptionDetailID, subscriptionDetail := range subscriptionDetailList {
subscriptionDetail.Remove(contentFilter.ContentTopics...)
if len(subscriptionDetail.ContentTopics) == 0 {
delete(subscriptionDetailList, subscriptionDetailID)
} else {
subscriptionDetailList[subscriptionDetailID] = subscriptionDetail
}
}
if len(subscriptionDetailList) == 0 {
delete(wf.subscriptions.items[peerID].subscriptionsPerTopic, contentFilter.Topic)
} else {
wf.subscriptions.items[peerID].subscriptionsPerTopic[contentFilter.Topic] = subscriptionDetailList
}
if len(wf.subscriptions.items[peerID].subscriptionsPerTopic) == 0 {
delete(wf.subscriptions.items, peerID)
}
}
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterUnsubscribeOption) (<-chan WakuFilterPushResult, error) {
if contentFilter.Topic == "" {
@ -329,11 +364,14 @@ func (wf *WakuFilterLightnode) Unsubscribe(ctx context.Context, contentFilter Co
contentFilter)
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
return
}
wf.cleanupSubscriptions(peerID, contentFilter)
resultChan <- WakuFilterPushResult{
err: err,
peerID: peerID,
Err: err,
PeerID: peerID,
}
}(peerID)
}
@ -375,7 +413,7 @@ func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...Filte
}
localWg.Add(1)
go func(peerID peer.ID) {
defer wf.wg.Done()
defer localWg.Done()
err := wf.request(
ctx,
&FilterSubscribeParameters{selectedPeer: peerID},
@ -385,9 +423,13 @@ func (wf *WakuFilterLightnode) UnsubscribeAll(ctx context.Context, opts ...Filte
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
wf.subscriptions.Lock()
delete(wf.subscriptions.items, peerID)
defer wf.subscriptions.Unlock()
resultChan <- WakuFilterPushResult{
err: err,
peerID: peerID,
Err: err,
PeerID: peerID,
}
}(peerID)
}

View File

@ -23,5 +23,5 @@ func NewFilterError(code int, message string) FilterError {
}
func (e *FilterError) Error() string {
return fmt.Sprintf("error %d: %s", e.Code, e.Message)
return fmt.Sprintf("%d - %s", e.Code, e.Message)
}

View File

@ -1,6 +1,7 @@
package filter
import (
"encoding/json"
"sync"
"github.com/google/uuid"
@ -238,3 +239,22 @@ func iterateSubscriptionSet(logger *zap.Logger, subscriptions SubscriptionSet, e
}(subscription)
}
}
func (s *SubscriptionDetails) MarshalJSON() ([]byte, error) {
type resultType struct {
PeerID string `json:"peerID"`
PubsubTopic string `json:"pubsubTopic"`
ContentTopics []string `json:"contentTopics"`
}
result := resultType{
PeerID: s.PeerID.Pretty(),
PubsubTopic: s.PubsubTopic,
}
for c := range s.ContentTopics {
result.ContentTopics = append(result.ContentTopics, c)
}
return json.Marshal(result)
}