mirror of https://github.com/status-im/go-waku.git
refactor: peer selection and filter API (#87)
This commit is contained in:
parent
5f3df9343c
commit
e482075fa9
|
@ -36,7 +36,7 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
|
|||
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
|
||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||
collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE=
|
||||
contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm34ZmgIdZa8OVYO5WAIygPbBBE=
|
||||
contrib.go.opencensus.io/exporter/prometheus v0.4.0/go.mod h1:o7cosnyfuPVK0tB8q0QmaQNhGnptITnPQB+z1+qeFB0=
|
||||
dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU=
|
||||
|
@ -467,7 +467,6 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT
|
|||
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
|
||||
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
|
||||
|
@ -923,7 +922,6 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
|||
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og=
|
||||
github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU=
|
||||
github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU=
|
||||
|
@ -942,7 +940,6 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
|
|||
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
|
||||
github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc=
|
||||
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
|
||||
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
|
||||
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
|
||||
github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
|
||||
github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
|
||||
|
@ -955,13 +952,11 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
|
|||
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
|
||||
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
|
||||
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
|
||||
github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
|
||||
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
|
||||
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
|
||||
github.com/prometheus/statsd_exporter v0.20.0/go.mod h1:YL3FWCG8JBBtaUSxAg4Gz2ZYu22bS84XM89ZQXXTWmQ=
|
||||
github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ=
|
||||
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
|
||||
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
|
||||
|
@ -1027,10 +1022,8 @@ github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn
|
|||
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
|
||||
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
|
||||
github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc=
|
||||
github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a h1:7P3E7XF9kJ05pPI6gzAk5wzh8i5OZ/BDYjY7srroVQ4=
|
||||
github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a/go.mod h1:CZV26mf0bzp94VE9QxxhpM4uFByn4UX698VUPalZMgQ=
|
||||
github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574 h1:v2gpjWKyI+vZZugjjhPDqIhg6uNrGLusHh3ilvbv8/Y=
|
||||
github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574/go.mod h1:Fa1uJjMz9MpfZc2tC5xdN9q90xg1VphSnevxWiBbFO0=
|
||||
github.com/status-im/go-waku-rendezvous v0.0.0-20211016214658-a0d71f947cee h1:IczLt9Rd1QVFd7Llt3Eoresj7betaoIgAbL65YUBVPQ=
|
||||
github.com/status-im/go-waku-rendezvous v0.0.0-20211016214658-a0d71f947cee/go.mod h1:Fa1uJjMz9MpfZc2tC5xdN9q90xg1VphSnevxWiBbFO0=
|
||||
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q=
|
||||
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
|
||||
|
@ -1311,7 +1304,6 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
|
|
@ -85,7 +85,12 @@ func main() {
|
|||
//
|
||||
|
||||
// Send FilterRequest from light node to full node
|
||||
_, filterChan, err := lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic})
|
||||
cf := filter.ContentFilter{
|
||||
Topic: string(pubSubTopic),
|
||||
ContentTopics: []string{contentTopic},
|
||||
}
|
||||
|
||||
_, filterChan, err := lightNode.SubscribeFilter(ctx, cf)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
@ -103,7 +108,7 @@ func main() {
|
|||
go func() {
|
||||
// Unsubscribe filter after 5 seconds
|
||||
time.Sleep(5 * time.Second)
|
||||
lightNode.UnsubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic})
|
||||
lightNode.UnsubscribeFilter(ctx, cf)
|
||||
}()
|
||||
// Wait for a SIGINT or SIGTERM signal
|
||||
ch := make(chan os.Signal, 1)
|
||||
|
|
|
@ -466,7 +466,7 @@ func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription
|
|||
|
||||
// Wrapper around WakuFilter.Subscribe
|
||||
// that adds a Filter object to node.filters
|
||||
func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string) (filterID string, ch chan *protocol.Envelope, err error) {
|
||||
func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) {
|
||||
if node.filter == nil {
|
||||
err = errors.New("WakuFilter is not set")
|
||||
return
|
||||
|
@ -477,25 +477,24 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, content
|
|||
|
||||
// Registers for messages that match a specific filter. Triggers the handler whenever a message is received.
|
||||
// ContentFilterChan takes MessagePush structs
|
||||
var peerID *peer.ID
|
||||
filterID, peerID, err = node.filter.Subscribe(ctx, topic, contentTopics)
|
||||
if filterID == "" || err != nil {
|
||||
subs, err := node.filter.Subscribe(ctx, f)
|
||||
if subs.RequestID == "" || err != nil {
|
||||
// Failed to subscribe
|
||||
log.Error("remote subscription to filter failed")
|
||||
log.Error("remote subscription to filter failed", err)
|
||||
return
|
||||
}
|
||||
|
||||
ch = make(chan *protocol.Envelope, 1024) // To avoid blocking
|
||||
|
||||
// Register handler for filter, whether remote subscription succeeded or not
|
||||
node.filters[filterID] = filter.Filter{
|
||||
PeerID: *peerID,
|
||||
Topic: topic,
|
||||
ContentFilters: contentTopics,
|
||||
node.filters[subs.RequestID] = filter.Filter{
|
||||
PeerID: subs.Peer,
|
||||
Topic: f.Topic,
|
||||
ContentFilters: f.ContentTopics,
|
||||
Chan: ch,
|
||||
}
|
||||
|
||||
return
|
||||
return subs.RequestID, ch, nil
|
||||
}
|
||||
|
||||
// UnsubscribeFilterByID removes a subscription to a filter node completely
|
||||
|
@ -508,7 +507,12 @@ func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string
|
|||
return errors.New("filter not found")
|
||||
}
|
||||
|
||||
err := node.filter.Unsubscribe(ctx, f.Topic, f.ContentFilters, f.PeerID)
|
||||
cf := filter.ContentFilter{
|
||||
Topic: f.Topic,
|
||||
ContentTopics: f.ContentFilters,
|
||||
}
|
||||
|
||||
err := node.filter.Unsubscribe(ctx, cf, f.PeerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -521,16 +525,16 @@ func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string
|
|||
|
||||
// Unsubscribe filter removes content topics from a filter subscription. If all
|
||||
// the contentTopics are removed the subscription is dropped completely
|
||||
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) error {
|
||||
func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFilter) error {
|
||||
// Remove local filter
|
||||
var idsToRemove []string
|
||||
for id, f := range node.filters {
|
||||
if f.Topic != topic {
|
||||
if f.Topic != cf.Topic {
|
||||
continue
|
||||
}
|
||||
|
||||
// Send message to full node in order to unsubscribe
|
||||
err := node.filter.Unsubscribe(ctx, topic, contentTopics, f.PeerID)
|
||||
err := node.filter.Unsubscribe(ctx, cf, f.PeerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -538,7 +542,7 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, conte
|
|||
// Iterate filter entries to remove matching content topics
|
||||
// make sure we delete the content filter
|
||||
// if no more topics are left
|
||||
for _, cfToDelete := range contentTopics {
|
||||
for _, cfToDelete := range cf.ContentTopics {
|
||||
for i, cf := range f.ContentFilters {
|
||||
if cf == cfToDelete {
|
||||
l := len(f.ContentFilters) - 1
|
||||
|
|
|
@ -3,6 +3,7 @@ package filter
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
logging "github.com/ipfs/go-log"
|
||||
|
@ -21,7 +22,18 @@ import (
|
|||
|
||||
var log = logging.Logger("wakufilter")
|
||||
|
||||
var (
|
||||
ErrNoPeersAvailable = errors.New("no suitable remote peers")
|
||||
)
|
||||
|
||||
type (
|
||||
FilterSubscribeParameters struct {
|
||||
host host.Host
|
||||
selectedPeer peer.ID
|
||||
}
|
||||
|
||||
FilterSubscribeOption func(*FilterSubscribeParameters)
|
||||
|
||||
Filter struct {
|
||||
PeerID peer.ID
|
||||
Topic string
|
||||
|
@ -29,6 +41,11 @@ type (
|
|||
Chan chan *protocol.Envelope
|
||||
}
|
||||
|
||||
ContentFilter struct {
|
||||
Topic string
|
||||
ContentTopics []string
|
||||
}
|
||||
|
||||
// @TODO MAYBE MORE INFO?
|
||||
Filters map[string]Filter
|
||||
|
||||
|
@ -38,6 +55,11 @@ type (
|
|||
filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN?
|
||||
}
|
||||
|
||||
FilterSubscription struct {
|
||||
RequestID string
|
||||
Peer peer.ID
|
||||
}
|
||||
|
||||
MessagePushHandler func(requestId string, msg pb.MessagePush)
|
||||
|
||||
WakuFilter struct {
|
||||
|
@ -55,6 +77,29 @@ type (
|
|||
|
||||
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
|
||||
|
||||
func WithPeer(p peer.ID) FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
params.selectedPeer = p
|
||||
}
|
||||
}
|
||||
|
||||
func WithAutomaticPeerSelection() FilterSubscribeOption {
|
||||
return func(params *FilterSubscribeParameters) {
|
||||
p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1))
|
||||
if err == nil {
|
||||
params.selectedPeer = *p
|
||||
} else {
|
||||
log.Info("Error selecting peer: ", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DefaultOptions() []FilterSubscribeOption {
|
||||
return []FilterSubscribeOption{
|
||||
WithAutomaticPeerSelection(),
|
||||
}
|
||||
}
|
||||
|
||||
func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) {
|
||||
for key, filter := range *filters {
|
||||
envelope := protocol.NewEnvelope(msg, filter.Topic)
|
||||
|
@ -231,25 +276,33 @@ func (wf *WakuFilter) FilterListener() {
|
|||
// Having a FilterRequest struct,
|
||||
// select a peer with filter support, dial it,
|
||||
// and submit FilterRequest wrapped in FilterRPC
|
||||
func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (requestID string, peer *peer.ID, err error) {
|
||||
func (wf *WakuFilter) Subscribe(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) {
|
||||
params := new(FilterSubscribeParameters)
|
||||
params.host = wf.h
|
||||
|
||||
optList := DefaultOptions()
|
||||
optList = append(optList, opts...)
|
||||
for _, opt := range optList {
|
||||
opt(params)
|
||||
}
|
||||
|
||||
if params.selectedPeer == "" {
|
||||
return nil, ErrNoPeersAvailable
|
||||
}
|
||||
|
||||
var contentFilters []*pb.FilterRequest_ContentFilter
|
||||
for _, ct := range contentTopics {
|
||||
for _, ct := range filter.ContentTopics {
|
||||
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
|
||||
}
|
||||
|
||||
request := pb.FilterRequest{
|
||||
Subscribe: true,
|
||||
Topic: topic,
|
||||
Topic: filter.Topic,
|
||||
ContentFilters: contentFilters,
|
||||
}
|
||||
|
||||
peer, err = utils.SelectPeer(wf.h, string(FilterID_v20beta1))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
var conn network.Stream
|
||||
conn, err = wf.h.NewStream(ctx, *peer, FilterID_v20beta1)
|
||||
conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterID_v20beta1)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
@ -257,7 +310,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics
|
|||
defer conn.Close()
|
||||
|
||||
// This is the only successful path to subscription
|
||||
requestID = hex.EncodeToString(protocol.GenerateRequestId())
|
||||
requestID := hex.EncodeToString(protocol.GenerateRequestId())
|
||||
|
||||
writer := protoio.NewDelimitedWriter(conn)
|
||||
filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request}
|
||||
|
@ -268,10 +321,14 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics
|
|||
return
|
||||
}
|
||||
|
||||
subscription = new(FilterSubscription)
|
||||
subscription.Peer = params.selectedPeer
|
||||
subscription.RequestID = requestID
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) error {
|
||||
func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, peer peer.ID) error {
|
||||
conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1)
|
||||
|
||||
if err != nil {
|
||||
|
@ -284,13 +341,13 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopi
|
|||
id := protocol.GenerateRequestId()
|
||||
|
||||
var contentFilters []*pb.FilterRequest_ContentFilter
|
||||
for _, ct := range contentTopics {
|
||||
for _, ct := range filter.ContentTopics {
|
||||
contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct})
|
||||
}
|
||||
|
||||
request := pb.FilterRequest{
|
||||
Subscribe: false,
|
||||
Topic: topic,
|
||||
Topic: filter.Topic,
|
||||
ContentFilters: contentFilters,
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue