mirror of https://github.com/status-im/go-waku.git
chore: add docs to filter protocol
This commit is contained in:
parent
8cf5f80529
commit
6043f6db2e
|
@ -0,0 +1,82 @@
|
||||||
|
Receive messages using Waku Filter
|
||||||
|
===
|
||||||
|
WakuFilter is a protocol that enables subscribing to messages that a peer receives. You can find Waku Filter's specifications on [Vac RFC](https://rfc.vac.dev/spec/12/). This is a more lightweight version of WakuRelay specifically designed for bandwidth restricted devices. This is due to the fact that light nodes subscribe to full-nodes and only receive the messages they desire.
|
||||||
|
|
||||||
|
## Create a waku instance
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/status-im/go-waku/waku/v2/node"
|
||||||
|
"github.com/status-im/go-waku/waku/v2/protocol/relay"
|
||||||
|
)
|
||||||
|
|
||||||
|
...
|
||||||
|
wakuNode, err := node.New(context.Background(),
|
||||||
|
node.WithWakuFilter(false),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := wakuNode.Start(); err != nil {
|
||||||
|
fmt.Println(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
...
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
### Options
|
||||||
|
One of these options must be specified when instantiating a node supporting the waku relay protocol
|
||||||
|
|
||||||
|
- `WithWakuRelay(opts ...pubsub.Option)` - enables the waku relay protocol and receives an optional list of pubsub options to tune or configure the gossipsub parameters. Supported options can be seen [here](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#Option). The recommended [parameter configuration](https://rfc.vac.dev/spec/29/) is used by default.
|
||||||
|
- `WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option)` - enables the waku relay protocol, specifying the minimum number of peers a topic should have to send a message. It also receives an optional list of pubsub [options](https://pkg.go.dev/github.com/libp2p/go-libp2p-pubsub#Option)
|
||||||
|
|
||||||
|
|
||||||
|
## Adding a peer and receiving messages
|
||||||
|
```go
|
||||||
|
...
|
||||||
|
|
||||||
|
peerAddr, err := multiaddr.NewMultiaddr("/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ")
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = wakuNode.AddPeer(peerAddr, string(filter.FilterID_v20beta1))
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setting the content filter
|
||||||
|
cf := filter.ContentFilter{
|
||||||
|
Topic: "/waku/2/default-waku/proto",
|
||||||
|
ContentTopics: []string{contentTopic},
|
||||||
|
}
|
||||||
|
|
||||||
|
_, subscription, err := wakuNode.Filter().Subscribe(context.Background(), cf)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for env := range subscription.Chan {
|
||||||
|
fmt.Println("Received msg:", string(value.Message().Payload))
|
||||||
|
}
|
||||||
|
...
|
||||||
|
```
|
||||||
|
To receive messages sent via the relay protocol, you need to subscribe to a pubsub topic. This can be done via any of these functions:
|
||||||
|
- `wakuNode.Filter().Subscribe(ctx, contentFilter, opts)` - subscribes to receive messages that match a specific contentFilter
|
||||||
|
|
||||||
|
This function return a `Filter` subscrition. To stop receiving messages in this channel `wakuNode.UnsubscribeByFilter(ctx, theFilter)` can be executed which will close the subscription and channel for the filter. `wakuNode.Unsubscribe` and `wakuNode.UnsubscribeFilterByID` are also available for similar purposes
|
||||||
|
|
||||||
|
If no options are specified when subscribing to a filter node, go-waku will automatically choose the peer to subscribe to. This behaviour can be controlled via options:
|
||||||
|
|
||||||
|
### Options
|
||||||
|
|
||||||
|
- `filter.WithPeer(peerID)` - use an specific peer ID (which should be part of the node peerstore) to receive the messages from
|
||||||
|
- `filter.WithAutomaticPeerSelection(host)` - automatically select a peer that supports filter protocol from the peerstore to receive the messages from
|
||||||
|
- `filter.WithFastestPeerSelection(ctx)` - automatically select a peer based on its ping reply time
|
|
@ -28,6 +28,7 @@ var (
|
||||||
|
|
||||||
type (
|
type (
|
||||||
Filter struct {
|
Filter struct {
|
||||||
|
filterID string
|
||||||
PeerID peer.ID
|
PeerID peer.ID
|
||||||
Topic string
|
Topic string
|
||||||
ContentFilters []string
|
ContentFilters []string
|
||||||
|
@ -57,11 +58,10 @@ type (
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// NOTE This is just a start, the design of this protocol isn't done yet. It
|
// FilterID_v20beta1 is the current Waku Filter protocol identifier
|
||||||
// should be direct payload exchange (a la req-resp), not be coupled with the
|
|
||||||
// relay protocol.
|
|
||||||
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
|
const FilterID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter/2.0.0-beta1")
|
||||||
|
|
||||||
|
// NewWakuRelay returns a new instance of Waku Filter struct setup according to the chosen parameter and options
|
||||||
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
|
func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *zap.Logger, opts ...Option) (*WakuFilter, error) {
|
||||||
wf := new(WakuFilter)
|
wf := new(WakuFilter)
|
||||||
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
|
wf.log = log.Named("filter").With(zap.Bool("fullNode", isFullNode))
|
||||||
|
@ -90,7 +90,7 @@ func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, log *za
|
||||||
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
|
||||||
|
|
||||||
wf.wg.Add(1)
|
wf.wg.Add(1)
|
||||||
go wf.FilterListener()
|
go wf.filterListener()
|
||||||
|
|
||||||
wf.log.Info("filter protocol started")
|
wf.log.Info("filter protocol started")
|
||||||
return wf, nil
|
return wf, nil
|
||||||
|
@ -177,7 +177,7 @@ func (wf *WakuFilter) pushMessage(subscriber Subscriber, msg *pb.WakuMessage) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilter) FilterListener() {
|
func (wf *WakuFilter) filterListener() {
|
||||||
defer wf.wg.Done()
|
defer wf.wg.Done()
|
||||||
|
|
||||||
// This function is invoked for each message received
|
// This function is invoked for each message received
|
||||||
|
@ -282,6 +282,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unsubscribe is used to stop receiving messages from a peer that match a content filter
|
||||||
func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error {
|
func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilter, peer peer.ID) error {
|
||||||
// We connect first so dns4 addresses are resolved (NewStream does not do it)
|
// We connect first so dns4 addresses are resolved (NewStream does not do it)
|
||||||
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer))
|
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peer))
|
||||||
|
@ -320,6 +321,7 @@ func (wf *WakuFilter) Unsubscribe(ctx context.Context, contentFilter ContentFilt
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Stop unmounts the filter protocol
|
||||||
func (wf *WakuFilter) Stop() {
|
func (wf *WakuFilter) Stop() {
|
||||||
close(wf.MsgC)
|
close(wf.MsgC)
|
||||||
|
|
||||||
|
@ -328,6 +330,7 @@ func (wf *WakuFilter) Stop() {
|
||||||
wf.wg.Wait()
|
wf.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Subscribe setups a subscription to receive messages that match a specific content filter
|
||||||
func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) {
|
func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...FilterSubscribeOption) (filterID string, theFilter Filter, err error) {
|
||||||
// TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID
|
// TODO: check if there's an existing pubsub topic that uses the same peer. If so, reuse filter, and return same channel and filterID
|
||||||
|
|
||||||
|
@ -344,6 +347,7 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi
|
||||||
|
|
||||||
filterID = remoteSubs.RequestID
|
filterID = remoteSubs.RequestID
|
||||||
theFilter = Filter{
|
theFilter = Filter{
|
||||||
|
filterID: filterID,
|
||||||
PeerID: remoteSubs.Peer,
|
PeerID: remoteSubs.Peer,
|
||||||
Topic: f.Topic,
|
Topic: f.Topic,
|
||||||
ContentFilters: f.ContentTopics,
|
ContentFilters: f.ContentTopics,
|
||||||
|
@ -355,6 +359,16 @@ func (wf *WakuFilter) Subscribe(ctx context.Context, f ContentFilter, opts ...Fi
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UnsubscribeFilterByID removes a subscription to a filter node completely
|
||||||
|
// using using a filter. It also closes the filter channel
|
||||||
|
func (wf *WakuFilter) UnsubscribeByFilter(ctx context.Context, filter Filter) error {
|
||||||
|
err := wf.UnsubscribeFilterByID(ctx, filter.filterID)
|
||||||
|
if err != nil {
|
||||||
|
close(filter.Chan)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// UnsubscribeFilterByID removes a subscription to a filter node completely
|
// UnsubscribeFilterByID removes a subscription to a filter node completely
|
||||||
// using the filterID returned when the subscription was created
|
// using the filterID returned when the subscription was created
|
||||||
func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string) error {
|
func (wf *WakuFilter) UnsubscribeFilterByID(ctx context.Context, filterID string) error {
|
||||||
|
|
Loading…
Reference in New Issue