go-waku/library/filter.go

218 lines
5.3 KiB
Go
Raw Permalink Normal View History

2023-08-10 13:30:38 +00:00
package library
import (
"context"
"encoding/json"
2023-06-22 18:55:51 +00:00
"errors"
"time"
2022-10-19 19:39:32 +00:00
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
2023-06-22 18:55:51 +00:00
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/utils"
)
2023-08-10 13:30:38 +00:00
type filterArgument struct {
PubsubTopic string `json:"pubsubTopic,omitempty"`
2023-06-22 18:55:51 +00:00
ContentTopics []string `json:"contentTopics,omitempty"`
}
func toContentFilter(filterJSON string) (protocol.ContentFilter, error) {
2023-08-10 13:30:38 +00:00
var f filterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return protocol.ContentFilter{}, err
}
return protocol.ContentFilter{
PubsubTopic: f.PubsubTopic,
ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...),
2023-06-22 18:55:51 +00:00
}, nil
}
type subscribeResult struct {
Subscriptions []*subscription.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
}
2023-08-10 13:30:38 +00:00
// FilterSubscribe is used to create a subscription to a filter node to receive messages
func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) (string, error) {
cf, err := toContentFilter(filterJSON)
if err != nil {
2023-08-10 13:30:38 +00:00
return "", err
}
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = instance.ctx
}
2023-06-22 18:55:51 +00:00
var fOptions []filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
2023-08-10 13:30:38 +00:00
return "", err
}
2023-06-22 18:55:51 +00:00
fOptions = append(fOptions, filter.WithPeer(p))
} else {
2023-06-22 18:55:51 +00:00
fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
}
subscriptions, err := instance.node.FilterLightnode().Subscribe(ctx, cf, fOptions...)
if err != nil && subscriptions == nil {
2023-08-10 13:30:38 +00:00
return "", err
}
for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *subscription.SubscriptionDetails) {
defer utils.LogOnPanic()
for envelope := range subscriptionDetails.C {
send(instance, "message", toSubscriptionMessage(envelope))
}
}(subscriptionDetails)
}
var subResult subscribeResult
subResult.Subscriptions = subscriptions
2023-10-28 23:37:53 +00:00
if err != nil {
subResult.Error = err.Error()
}
return marshalJSON(subResult)
2023-06-22 18:55:51 +00:00
}
2023-08-10 13:30:38 +00:00
// FilterPing is used to determine if a peer has an active subscription
func FilterPing(instance *WakuInstance, peerID string, ms int) error {
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
2023-06-22 18:55:51 +00:00
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
2023-06-22 18:55:51 +00:00
defer cancel()
} else {
ctx = instance.ctx
2023-06-22 18:55:51 +00:00
}
var pID peer.ID
var err error
if peerID != "" {
pID, err = peer.Decode(peerID)
if err != nil {
2023-08-10 13:30:38 +00:00
return err
2023-06-22 18:55:51 +00:00
}
} else {
2023-08-10 13:30:38 +00:00
return errors.New("peerID is required")
2023-06-22 18:55:51 +00:00
}
return instance.node.FilterLightnode().Ping(ctx, pID)
}
2023-08-10 13:30:38 +00:00
// FilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error {
cf, err := toContentFilter(filterJSON)
if err != nil {
2023-08-10 13:30:38 +00:00
return err
}
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
defer cancel()
} else {
ctx = instance.ctx
}
var fOptions []filter.FilterSubscribeOption
2023-06-22 18:55:51 +00:00
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
2023-08-10 13:30:38 +00:00
return err
2023-06-22 18:55:51 +00:00
}
fOptions = append(fOptions, filter.WithPeer(p))
2023-06-22 18:55:51 +00:00
} else {
2023-08-10 13:30:38 +00:00
return errors.New("peerID is required")
2023-06-22 18:55:51 +00:00
}
result, err := instance.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...)
if err != nil {
2023-08-10 13:30:38 +00:00
return err
}
errs := result.Errors()
if len(errs) == 0 {
return nil
}
return errs[0].Err
2023-06-22 18:55:51 +00:00
}
type unsubscribeAllResult struct {
PeerID string `json:"peerID"`
Error string `json:"error"`
}
2023-08-10 13:30:38 +00:00
// FilterUnsubscribeAll is used to remove an active subscription to a peer. If no peerID is defined, it will stop all active filter subscriptions
func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string, error) {
if err := validateInstance(instance, MustBeStarted); err != nil {
return "", err
2023-06-22 18:55:51 +00:00
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
2023-06-22 18:55:51 +00:00
defer cancel()
} else {
ctx = instance.ctx
2023-06-22 18:55:51 +00:00
}
var fOptions []filter.FilterSubscribeOption
2023-06-22 18:55:51 +00:00
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
2023-08-10 13:30:38 +00:00
return "", err
2023-06-22 18:55:51 +00:00
}
fOptions = append(fOptions, filter.WithPeer(p))
2023-06-22 18:55:51 +00:00
} else {
fOptions = append(fOptions, filter.UnsubscribeAll())
}
result, err := instance.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...)
2023-06-22 18:55:51 +00:00
if err != nil {
2023-08-10 13:30:38 +00:00
return "", err
2023-06-22 18:55:51 +00:00
}
var unsubscribeResult []unsubscribeAllResult
for _, err := range result.Errors() {
2023-06-22 18:55:51 +00:00
ur := unsubscribeAllResult{
PeerID: err.PeerID.String(),
2023-06-22 18:55:51 +00:00
}
if err.Err != nil {
ur.Error = err.Err.Error()
2023-06-22 18:55:51 +00:00
}
unsubscribeResult = append(unsubscribeResult, ur)
}
2023-08-10 13:30:38 +00:00
return marshalJSON(unsubscribeResult)
}