diff --git a/docs/api/filter.md b/docs/api/filter.md new file mode 100644 index 00000000..c713d4fa --- /dev/null +++ b/docs/api/filter.md @@ -0,0 +1,82 @@ +Receive messages using Waku Filter +=== +WakuFilter is a protocol that enables subscribing to messages that a peer receives. You can find Waku Filter's specifications on [Vac RFC](https://rfc.vac.dev/spec/12/). This is a more lightweight version of WakuRelay specifically designed for bandwidth restricted devices. This is due to the fact that light nodes subscribe to full-nodes and only receive the messages they desire. + +## Create a waku instance +```go +package main + +import ( + "context" + "fmt" + + "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol/relay" +) + +... +wakuNode, err := node.New(context.Background(), + node.WithWakuFilter(false), +) +if err != nil { + fmt.Println(err) + return +} + +if err := wakuNode.Start(); err != nil { + fmt.Println(err) + return +} +... + +``` + +### Options +One of these options must be specified when instantiating a node supporting the waku relay protocol + +- `WithWakuRelay(opts ...pubsub.Option)` - enables the waku relay protocol and receives an optional list of pubsub options to tune or configure the gossipsub parameters. Supported options can be seen [here](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#Option). The recommended [parameter configuration](https://rfc.vac.dev/spec/29/) is used by default. +- `WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)` - enables the waku relay protocol, specifying the minimum number of peers a topic should have to send a message. It also receives an optional list of pubsub [options](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#Option) + + +## Adding a peer and receiving messages +```go +... + +peerAddr, err := multiaddr.NewMultiaddr("/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ") +if err != nil { + panic(err) +} + +_, err = wakuNode.AddPeer(peerAddr, string(filter.FilterID_v20beta1)) +if err != nil { + panic(err) +} + +// Setting the content filter +cf := filter.ContentFilter{ + Topic: "/waku/2/default-waku/proto", + ContentTopics: []string{contentTopic}, +} + +_, subscription, err := wakuNode.Filter().Subscribe(context.Background(), cf) +if err != nil { + panic(err) +} + +for env := range subscription.Chan { + fmt.Println("Received msg:", string(value.Message().Payload)) +} +... +``` +To receive messages sent via the relay protocol, you need to subscribe to a pubsub topic. This can be done via any of these functions: +- `wakuNode.Filter().Subscribe(ctx, contentFilter, opts)` - subscribes to receive messages that match a specific contentFilter + +This function return a `Filter` subscrition. To stop receiving messages in this channel `wakuNode.UnsubscribeByFilter(ctx, theFilter)` can be executed which will close the subscription and channel for the filter. `wakuNode.Unsubscribe` and `wakuNode.UnsubscribeFilterByID` are also available for similar purposes + +If no options are specified when subscribing to a filter node, go-waku will automatically choose the peer to subscribe to. This behaviour can be controlled via options: + +### Options + +- `filter.WithPeer(peerID)` - use an specific peer ID (which should be part of the node peerstore) to receive the messages from +- `filter.WithAutomaticPeerSelection(host)` - automatically select a peer that supports filter protocol from the peerstore to receive the messages from +- `filter.WithFastestPeerSelection(ctx)` - automatically select a peer based on its ping reply time diff --git a/waku/v2/protocol/filter/waku_filter.go b/waku/v2/protocol/filter/waku_filter.go index 7b41431c..5b0d4ef7 100644 --- a/waku/v2/protocol/filter/waku_filter.go +++ b/waku/v2/protocol/filter/waku_filter.go @@ -28,6 +28,7 @@ var ( type ( Filter struct { + filterID string PeerID peer.ID Topic string ContentFilters []string @@ -57,11 +58,10 @@ type ( } ) -// NOTE This is just a start, the design of this protocol isn't done yet. It -// should be direct payload exchange (a la req-resp), not be coupled with the -// relay protocol. +// FilterID_v20beta1 is the current Waku Filter protocol identifier const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1") +// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) { wf := new(WakuFilter) wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode)) @@ -90,7 +90,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *za wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) wf.wg.Add(1) - go wf.FilterListener() + go wf.filterListener() wf.log.Info("filter protocol started") return wf, nil @@ -177,7 +177,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er return nil } -func (wf *WakuFilter) FilterListener() { +func (wf *WakuFilter) filterListener() { defer wf.wg.Done() // This function is invoked for each message received @@ -282,6 +282,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil return } +// Unsubscribe is used to stop receiving messages from a peer that match a content filter func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error { // We connect first so dns4 addresses are resolved (NewStream does not do it) err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer)) @@ -320,6 +321,7 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt return nil } +// Stop unmounts the filter protocol func (wf *WakuFilter) Stop() { close(wf.MsgC) @@ -328,6 +330,7 @@ func (wf *WakuFilter) Stop() { wf.wg.Wait() } +// Subscribe setups a subscription to receive messages that match a specific content filter func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) { // TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID @@ -344,6 +347,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi filterID = remoteSubs.RequestID theFilter = Filter{ + filterID: filterID, PeerID: remoteSubs.Peer, Topic: f.Topic, ContentFilters: f.ContentTopics, @@ -355,6 +359,16 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi return } +// UnsubscribeFilterByID removes a subscription to a filter node completely +// using using a filter. It also closes the filter channel +func (wf *WakuFilter) UnsubscribeByFilter(ctx context.Context, filter Filter) error { + err := wf.UnsubscribeFilterByID(ctx, filter.filterID) + if err != nil { + close(filter.Chan) + } + return err +} + // UnsubscribeFilterByID removes a subscription to a filter node completely // using the filterID returned when the subscription was created func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string) error {