go-waku/library/legacy_filter.go

106 lines
2.7 KiB
Go
Raw Normal View History

2023-08-10 13:30:38 +00:00
package library
2023-06-22 18:55:51 +00:00
import (
"context"
"encoding/json"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
)
2023-08-10 13:30:38 +00:00
type legacyFilterArgument struct {
2023-06-22 18:55:51 +00:00
Topic string `json:"pubsubTopic,omitempty"`
ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"`
}
func toLegacyContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) {
2023-08-10 13:30:38 +00:00
var f legacyFilterArgument
2023-06-22 18:55:51 +00:00
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return legacy_filter.ContentFilter{}, err
}
result := legacy_filter.ContentFilter{
Topic: f.Topic,
}
for _, cf := range f.ContentFilters {
result.ContentTopics = append(result.ContentTopics, cf.ContentTopic)
}
return result, err
}
2023-08-10 13:30:38 +00:00
// LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages
2023-06-22 18:55:51 +00:00
// Deprecated: Use FilterSubscribe instead
func LegacyFilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error {
2023-06-22 18:55:51 +00:00
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
2023-08-10 13:30:38 +00:00
return err
2023-06-22 18:55:51 +00:00
}
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
2023-06-22 18:55:51 +00:00
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
2023-06-22 18:55:51 +00:00
defer cancel()
} else {
ctx = instance.ctx
2023-06-22 18:55:51 +00:00
}
var fOptions []legacy_filter.FilterSubscribeOption
if peerID != "" {
p, err := peer.Decode(peerID)
if err != nil {
2023-08-10 13:30:38 +00:00
return err
2023-06-22 18:55:51 +00:00
}
fOptions = append(fOptions, legacy_filter.WithPeer(p))
} else {
fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection())
}
_, f, err := instance.node.LegacyFilter().Subscribe(ctx, cf, fOptions...)
2023-06-22 18:55:51 +00:00
if err != nil {
2023-08-10 13:30:38 +00:00
return err
2023-06-22 18:55:51 +00:00
}
go func(f legacy_filter.Filter) {
for envelope := range f.Chan {
send(instance, "message", toSubscriptionMessage(envelope))
2023-06-22 18:55:51 +00:00
}
}(f)
2023-08-10 13:30:38 +00:00
return nil
2023-06-22 18:55:51 +00:00
}
2023-08-10 13:30:38 +00:00
// LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node
2023-06-22 18:55:51 +00:00
// Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead
func LegacyFilterUnsubscribe(instance *WakuInstance, filterJSON string, ms int) error {
2023-06-22 18:55:51 +00:00
cf, err := toLegacyContentFilter(filterJSON)
if err != nil {
2023-08-10 13:30:38 +00:00
return err
2023-06-22 18:55:51 +00:00
}
if err := validateInstance(instance, MustBeStarted); err != nil {
return err
2023-06-22 18:55:51 +00:00
}
var ctx context.Context
var cancel context.CancelFunc
if ms > 0 {
ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond)
2023-06-22 18:55:51 +00:00
defer cancel()
} else {
ctx = instance.ctx
2023-06-22 18:55:51 +00:00
}
return instance.node.LegacyFilter().UnsubscribeFilter(ctx, cf)
2023-06-22 18:55:51 +00:00
}