diff --git a/examples/filter2/go.sum b/examples/filter2/go.sum index c6bdf2be..1e6fc48d 100644 --- a/examples/filter2/go.sum +++ b/examples/filter2/go.sum @@ -36,7 +36,7 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= -contrib.go.opencensus.io/exporter/prometheus v0.3.0/go.mod h1:rpCPVQKhiyH8oomWgm34ZmgIdZa8OVYO5WAIygPbBBE= +contrib.go.opencensus.io/exporter/prometheus v0.4.0/go.mod h1:o7cosnyfuPVK0tB8q0QmaQNhGnptITnPQB+z1+qeFB0= dmitri.shuralyov.com/app/changes v0.0.0-20180602232624-0a106ad413e3/go.mod h1:Yl+fi1br7+Rr3LqpNJf1/uxUdtRUV+Tnj0o93V2B9MU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/html/belt v0.0.0-20180602232347-f7d459c86be0/go.mod h1:JLBrvjyP0v+ecvNYvCpyZgu5/xkfAUhi6wJj28eUfSU= @@ -467,7 +467,6 @@ github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlT github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= @@ -923,7 +922,6 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeDPbaTKGT+JTgUa3og= -github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU= github.com/prometheus/client_golang v1.10.0/go.mod h1:WJM3cc3yu7XKBKa/I8WeZm+V3eltZnBwfENSU7mdogU= @@ -942,7 +940,6 @@ github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= -github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= github.com/prometheus/common v0.18.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s= @@ -955,13 +952,11 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= -github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/prometheus/statsd_exporter v0.20.0/go.mod h1:YL3FWCG8JBBtaUSxAg4Gz2ZYu22bS84XM89ZQXXTWmQ= github.com/prometheus/statsd_exporter v0.21.0/go.mod h1:rbT83sZq2V+p73lHhPZfMc3MLCHmSHelCh9hSGYNLTQ= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1027,10 +1022,8 @@ github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnIn github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/src-d/envconfig v1.0.0/go.mod h1:Q9YQZ7BKITldTBnoxsE5gOeB5y66RyPXeue/R4aaNBc= -github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a h1:7P3E7XF9kJ05pPI6gzAk5wzh8i5OZ/BDYjY7srroVQ4= -github.com/status-im/go-waku v0.0.0-20211012131444-baf57d82a30a/go.mod h1:CZV26mf0bzp94VE9QxxhpM4uFByn4UX698VUPalZMgQ= -github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574 h1:v2gpjWKyI+vZZugjjhPDqIhg6uNrGLusHh3ilvbv8/Y= -github.com/status-im/go-waku-rendezvous v0.0.0-20211005020656-b53661c58574/go.mod h1:Fa1uJjMz9MpfZc2tC5xdN9q90xg1VphSnevxWiBbFO0= +github.com/status-im/go-waku-rendezvous v0.0.0-20211016214658-a0d71f947cee h1:IczLt9Rd1QVFd7Llt3Eoresj7betaoIgAbL65YUBVPQ= +github.com/status-im/go-waku-rendezvous v0.0.0-20211016214658-a0d71f947cee/go.mod h1:Fa1uJjMz9MpfZc2tC5xdN9q90xg1VphSnevxWiBbFO0= github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4/go.mod h1:RZLeN1LMWmRsyYjvAu+I6Dm9QmlDaIIt+Y+4Kd7Tp+Q= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= @@ -1311,7 +1304,6 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200331124033-c3d80250170d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200501052902-10377860bb8e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200511232937-7e40ca221e25/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/examples/filter2/main.go b/examples/filter2/main.go index 70920ead..2d6fe161 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -85,7 +85,12 @@ func main() { // // Send FilterRequest from light node to full node - _, filterChan, err := lightNode.SubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}) + cf := filter.ContentFilter{ + Topic: string(pubSubTopic), + ContentTopics: []string{contentTopic}, + } + + _, filterChan, err := lightNode.SubscribeFilter(ctx, cf) if err != nil { panic(err) } @@ -103,7 +108,7 @@ func main() { go func() { // Unsubscribe filter after 5 seconds time.Sleep(5 * time.Second) - lightNode.UnsubscribeFilter(ctx, string(pubSubTopic), []string{contentTopic}) + lightNode.UnsubscribeFilter(ctx, cf) }() // Wait for a SIGINT or SIGTERM signal ch := make(chan os.Signal, 1) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index c184704f..e38caea9 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -466,7 +466,7 @@ func (node *WakuNode) subscribeToTopic(t relay.Topic, subscription *Subscription // Wrapper around WakuFilter.Subscribe // that adds a Filter object to node.filters -func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, contentTopics []string) (filterID string, ch chan *protocol.Envelope, err error) { +func (node *WakuNode) SubscribeFilter(ctx context.Context, f filter.ContentFilter) (filterID string, ch chan *protocol.Envelope, err error) { if node.filter == nil { err = errors.New("WakuFilter is not set") return @@ -477,25 +477,24 @@ func (node *WakuNode) SubscribeFilter(ctx context.Context, topic string, content // Registers for messages that match a specific filter. Triggers the handler whenever a message is received. // ContentFilterChan takes MessagePush structs - var peerID *peer.ID - filterID, peerID, err = node.filter.Subscribe(ctx, topic, contentTopics) - if filterID == "" || err != nil { + subs, err := node.filter.Subscribe(ctx, f) + if subs.RequestID == "" || err != nil { // Failed to subscribe - log.Error("remote subscription to filter failed") + log.Error("remote subscription to filter failed", err) return } ch = make(chan *protocol.Envelope, 1024) // To avoid blocking // Register handler for filter, whether remote subscription succeeded or not - node.filters[filterID] = filter.Filter{ - PeerID: *peerID, - Topic: topic, - ContentFilters: contentTopics, + node.filters[subs.RequestID] = filter.Filter{ + PeerID: subs.Peer, + Topic: f.Topic, + ContentFilters: f.ContentTopics, Chan: ch, } - return + return subs.RequestID, ch, nil } // UnsubscribeFilterByID removes a subscription to a filter node completely @@ -508,7 +507,12 @@ func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string return errors.New("filter not found") } - err := node.filter.Unsubscribe(ctx, f.Topic, f.ContentFilters, f.PeerID) + cf := filter.ContentFilter{ + Topic: f.Topic, + ContentTopics: f.ContentFilters, + } + + err := node.filter.Unsubscribe(ctx, cf, f.PeerID) if err != nil { return err } @@ -521,16 +525,16 @@ func (node *WakuNode) UnsubscribeFilterByID(ctx context.Context, filterID string // Unsubscribe filter removes content topics from a filter subscription. If all // the contentTopics are removed the subscription is dropped completely -func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, contentTopics []string) error { +func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFilter) error { // Remove local filter var idsToRemove []string for id, f := range node.filters { - if f.Topic != topic { + if f.Topic != cf.Topic { continue } // Send message to full node in order to unsubscribe - err := node.filter.Unsubscribe(ctx, topic, contentTopics, f.PeerID) + err := node.filter.Unsubscribe(ctx, cf, f.PeerID) if err != nil { return err } @@ -538,7 +542,7 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, topic string, conte // Iterate filter entries to remove matching content topics // make sure we delete the content filter // if no more topics are left - for _, cfToDelete := range contentTopics { + for _, cfToDelete := range cf.ContentTopics { for i, cf := range f.ContentFilters { if cf == cfToDelete { l := len(f.ContentFilters) - 1 diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 83c97210..93ba7872 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -3,6 +3,7 @@ package filter import ( "context" "encoding/hex" + "errors" "fmt" logging "github.com/ipfs/go-log" @@ -21,7 +22,18 @@ import ( var log = logging.Logger("wakufilter") +var ( + ErrNoPeersAvailable = errors.New("no suitable remote peers") +) + type ( + FilterSubscribeParameters struct { + host host.Host + selectedPeer peer.ID + } + + FilterSubscribeOption func(*FilterSubscribeParameters) + Filter struct { PeerID peer.ID Topic string @@ -29,6 +41,11 @@ type ( Chan chan *protocol.Envelope } + ContentFilter struct { + Topic string + ContentTopics []string + } + // @TODO MAYBE MORE INFO? Filters map[string]Filter @@ -38,6 +55,11 @@ type ( filter pb.FilterRequest // @TODO MAKE THIS A SEQUENCE AGAIN? } + FilterSubscription struct { + RequestID string + Peer peer.ID + } + MessagePushHandler func(requestId string, msg pb.MessagePush) WakuFilter struct { @@ -55,6 +77,29 @@ type ( const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") +func WithPeer(p peer.ID) FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + params.selectedPeer = p + } +} + +func WithAutomaticPeerSelection() FilterSubscribeOption { + return func(params *FilterSubscribeParameters) { + p, err := utils.SelectPeer(params.host, string(FilterID_v20beta1)) + if err == nil { + params.selectedPeer = *p + } else { + log.Info("Error selecting peer: ", err) + } + } +} + +func DefaultOptions() []FilterSubscribeOption { + return []FilterSubscribeOption{ + WithAutomaticPeerSelection(), + } +} + func (filters *Filters) Notify(msg *pb.WakuMessage, requestId string) { for key, filter := range *filters { envelope := protocol.NewEnvelope(msg, filter.Topic) @@ -231,25 +276,33 @@ func (wf *WakuFilter) FilterListener() { // Having a FilterRequest struct, // select a peer with filter support, dial it, // and submit FilterRequest wrapped in FilterRPC -func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics []string) (requestID string, peer *peer.ID, err error) { +func (wf *WakuFilter) Subscribe(ctx context.Context, filter ContentFilter, opts ...FilterSubscribeOption) (subscription *FilterSubscription, err error) { + params := new(FilterSubscribeParameters) + params.host = wf.h + + optList := DefaultOptions() + optList = append(optList, opts...) + for _, opt := range optList { + opt(params) + } + + if params.selectedPeer == "" { + return nil, ErrNoPeersAvailable + } + var contentFilters []*pb.FilterRequest_ContentFilter - for _, ct := range contentTopics { + for _, ct := range filter.ContentTopics { contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) } request := pb.FilterRequest{ Subscribe: true, - Topic: topic, + Topic: filter.Topic, ContentFilters: contentFilters, } - peer, err = utils.SelectPeer(wf.h, string(FilterID_v20beta1)) - if err != nil { - return - } - var conn network.Stream - conn, err = wf.h.NewStream(ctx, *peer, FilterID_v20beta1) + conn, err = wf.h.NewStream(ctx, params.selectedPeer, FilterID_v20beta1) if err != nil { return } @@ -257,7 +310,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics defer conn.Close() // This is the only successful path to subscription - requestID = hex.EncodeToString(protocol.GenerateRequestId()) + requestID := hex.EncodeToString(protocol.GenerateRequestId()) writer := protoio.NewDelimitedWriter(conn) filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request} @@ -268,10 +321,14 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, topic string, contentTopics return } + subscription = new(FilterSubscription) + subscription.Peer = params.selectedPeer + subscription.RequestID = requestID + return } -func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopics []string, peer peer.ID) error { +func (wf *WakuFilter) Unsubscribe(ctx context.Context, filter ContentFilter, peer peer.ID) error { conn, err := wf.h.NewStream(ctx, peer, FilterID_v20beta1) if err != nil { @@ -284,13 +341,13 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, topic string, contentTopi id := protocol.GenerateRequestId() var contentFilters []*pb.FilterRequest_ContentFilter - for _, ct := range contentTopics { + for _, ct := range filter.ContentTopics { contentFilters = append(contentFilters, &pb.FilterRequest_ContentFilter{ContentTopic: ct}) } request := pb.FilterRequest{ Subscribe: false, - Topic: topic, + Topic: filter.Topic, ContentFilters: contentFilters, }