feat: improvements on filter protocol (server)

This commit is contained in:
Richard Ramos 2023-02-07 18:28:46 -04:00 committed by RichΛrd
parent c3be19f922
commit f255adffd9
19 changed files with 1726 additions and 24 deletions

View File

@ -297,6 +297,12 @@ var (
Destination: &options.Filter.Enable,
EnvVars: []string{"WAKUNODE2_FILTER"},
})
FilterV2Flag = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "use-filterv2",
Usage: "Use filterV2 protocol (experimental)",
Destination: &options.Filter.UseV2,
EnvVars: []string{"WAKUNODE2_FILTERV2"},
})
LightClient = altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "light-client",
Usage: "Don't accept filter subscribers",

View File

@ -58,6 +58,7 @@ func main() {
SwapPaymentThreshold,
SwapDisconnectThreshold,
FilterFlag,
FilterV2Flag,
LightClient,
FilterNode,
FilterTimeout,

View File

@ -11,6 +11,7 @@ import (
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
@ -81,13 +82,12 @@ func execute(options Options) {
}
if options.Filter.Enable {
opts = append(opts, node.WithWakuFilter(false))
if options.Filter.UseV2 {
opts = append(opts, node.WithWakuFilterV2LightNode())
} else {
opts = append(opts, node.WithWakuFilter(false))
}
}
if options.LightPush.Enable {
opts = append(opts, node.WithLightPush())
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -109,7 +109,11 @@ func execute(options Options) {
return
}
err = addPeer(wakuNode, options.Filter.Node, string(filter.FilterID_v20beta1))
if options.Filter.UseV2 {
err = addPeer(wakuNode, options.Filter.Node, string(filterv2.FilterSubscribeID_v20beta1))
} else {
err = addPeer(wakuNode, options.Filter.Node, string(filter.FilterID_v20beta1))
}
if err != nil {
fmt.Println(err.Error())
return

View File

@ -117,7 +117,7 @@ func getFlags() []cli.Flag {
},
&cli.BoolFlag{
Name: "store",
Usage: "Enable relay protocol",
Usage: "Enable store protocol",
Value: true,
Destination: &options.Store.Enable,
},
@ -133,6 +133,11 @@ func getFlags() []cli.Flag {
Usage: "Enable filter protocol",
Destination: &options.Filter.Enable,
},
&cli.BoolFlag{
Name: "use-filterv2",
Usage: "Use filterV2 protocol (experimental)",
Destination: &options.Filter.UseV2,
},
&cli.GenericFlag{
Name: "filternode",
Usage: "Multiaddr of a peer that supports filter protocol.",

View File

@ -16,7 +16,7 @@ require (
github.com/libp2p/go-libp2p v0.23.2
github.com/muesli/reflow v0.3.0
github.com/multiformats/go-multiaddr v0.7.0
github.com/urfave/cli/v2 v2.20.2
github.com/urfave/cli/v2 v2.23.7
github.com/waku-org/go-waku v0.2.3-0.20221109195301-b2a5a68d28ba
github.com/waku-org/go-zerokit-rln v0.1.7-wakuorg
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e
@ -25,6 +25,7 @@ require (
)
require (
github.com/BurntSushi/toml v1.2.1 // indirect
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
@ -154,5 +155,6 @@ require (
golang.org/x/tools v0.1.12 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
)

View File

@ -56,6 +56,8 @@ github.com/Azure/go-autorest/autorest/mocks v0.3.0/go.mod h1:a8FDP3DYzQ4RYfVAxAN
github.com/Azure/go-autorest/logger v0.1.0/go.mod h1:oExouG+K6PryycPJfVSxi/koC6LSNgds39diKLz7Vrc=
github.com/Azure/go-autorest/tracing v0.5.0/go.mod h1:r/s2XiOKccPW3HrqB+W0TQzfbtp2fGCgRFtBroKn4Dk=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/toml v1.2.1 h1:9F2/+DoOYIOksmaJFPw1tGFy1eDnIJXg+UHjuD8lTak=
github.com/BurntSushi/toml v1.2.1/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@ -751,8 +753,8 @@ github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef h1:wHSqTBrZ
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/urfave/cli/v2 v2.20.2 h1:dKA0LUjznZpwmmbrc0pOgcLTEilnHeM8Av9Yng77gHM=
github.com/urfave/cli/v2 v2.20.2/go.mod h1:1CNUng3PtjQMtRzJO4FMXBQvkGtuYRxxiR9xMa7jMwI=
github.com/urfave/cli/v2 v2.23.7 h1:YHDQ46s3VghFHFf1DdF+Sh7H4RqhcM+t0TmZRJx4oJY=
github.com/urfave/cli/v2 v2.23.7/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=

View File

@ -59,6 +59,7 @@ func nodePeerID(node *multiaddr.Multiaddr) (peer.ID, error) {
// restricted devices.
type FilterOptions struct {
Enable bool
UseV2 bool
Node *multiaddr.Multiaddr
}

View File

@ -160,7 +160,14 @@ func Execute(options Options) {
}
if options.Filter.Enable {
nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout)))
if options.Filter.UseV2 {
if !options.Filter.DisableFullNode {
nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filter.WithTimeout(options.Filter.Timeout)))
}
nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode())
} else {
nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout)))
}
}
if options.Store.Enable {
@ -233,7 +240,12 @@ func Execute(options Options) {
addPeers(wakuNode, options.Store.Nodes, string(store.StoreID_v20beta4))
addPeers(wakuNode, options.LightPush.Nodes, string(lightpush.LightPushID_v20beta1))
addPeers(wakuNode, options.Filter.Nodes, string(filter.FilterID_v20beta1))
if options.Filter.UseV2 {
addPeers(wakuNode, options.Filter.Nodes, string(filter.FilterID_v20beta1))
} else {
addPeers(wakuNode, options.Filter.Nodes, string(filter.FilterID_v20beta1))
}
if err = wakuNode.Start(ctx); err != nil {
logger.Fatal("starting waku node", zap.Error(err))

View File

@ -47,6 +47,7 @@ type RLNRelayOptions struct {
// restricted devices.
type FilterOptions struct {
Enable bool
UseV2 bool
DisableFullNode bool
Nodes []multiaddr.Multiaddr
Timeout time.Duration

View File

@ -32,6 +32,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
@ -78,6 +79,7 @@ type WakuNode struct {
discoveryV5 Service
peerExchange Service
filter ReceptorService
filterV2 ReceptorService
store ReceptorService
rlnRelay RLNRelay
@ -208,6 +210,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
w.filterV2 = filterv2.NewWakuFilter(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
if w.opts.enableSwap {
@ -353,6 +356,16 @@ func (w *WakuNode) Start(ctx context.Context) error {
w.bcaster.Register(nil, w.filter.MessageChannel())
}
if w.opts.enableFilterV2FullNode {
err := w.filterV2.Start(ctx)
if err != nil {
return err
}
w.log.Info("Subscribing filterV2 to broadcaster")
w.bcaster.Register(nil, w.filterV2.MessageChannel())
}
err = w.setupENR(ctx, w.ListenAddresses())
if err != nil {
return err
@ -394,6 +407,7 @@ func (w *WakuNode) Stop() {
w.lightPush.Stop()
w.store.Stop()
w.filter.Stop()
w.filterV2.Stop()
w.peerExchange.Stop()
if w.opts.enableDiscV5 {

View File

@ -60,12 +60,14 @@ type WakuNodeParameters struct {
logger *zap.Logger
noDefaultWakuTopic bool
enableRelay bool
enableFilter bool
isFilterFullNode bool
filterOpts []filter.Option
wOpts []pubsub.Option
noDefaultWakuTopic bool
enableRelay bool
enableFilter bool
isFilterFullNode bool
enableFilterV2LightNode bool
enableFilterV2FullNode bool
filterOpts []filter.Option
wOpts []pubsub.Option
minRelayPeersToPublish int
@ -310,7 +312,7 @@ func WithPeerExchange() WakuNodeOption {
}
}
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
// WithWakuFilter enables the Waku Filter protocol. This WakuNodeOption
// accepts a list of WakuFilter gossipsub options to setup the protocol
func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
@ -321,6 +323,24 @@ func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
}
}
// WithWakuFilterV2 enables the Waku Filter V2 protocol for lightnode functionality
func WithWakuFilterV2LightNode() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilterV2LightNode = true
return nil
}
}
// WithWakuFilterV2FullNode enables the Waku Filter V2 protocol full node functionality.
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
func WithWakuFilterV2FullNode(filterOpts ...filter.Option) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.enableFilterV2FullNode = true
params.filterOpts = filterOpts
return nil
}
}
// WithWakuStore enables the Waku V2 Store protocol and if the messages should
// be stored or not in a message provider. If resumeNodes are specified, the
// store will attempt to resume message history using those nodes

View File

@ -80,7 +80,7 @@ func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, isFullNode bool,
wf.h = host
wf.isFullNode = isFullNode
wf.filters = NewFilterMap(broadcaster, timesource)
wf.subscribers = NewSubscribers(params.timeout)
wf.subscribers = NewSubscribers(params.Timeout)
return wf
}
@ -287,7 +287,7 @@ func (wf *WakuFilter) requestSubscription(ctx context.Context, filter ContentFil
writer := protoio.NewDelimitedWriter(conn)
filterRPC := &pb.FilterRPC{RequestId: requestID, Request: &request}
wf.log.Info("sending filterRPC", zap.Stringer("rpc", filterRPC))
wf.log.Debug("sending filterRPC", zap.Stringer("rpc", filterRPC))
err = writer.WriteMsg(filterRPC)
if err != nil {
wf.log.Error("sending filterRPC", zap.Error(err))

View File

@ -20,7 +20,7 @@ type (
FilterSubscribeOption func(*FilterSubscribeParameters)
FilterParameters struct {
timeout time.Duration
Timeout time.Duration
}
Option func(*FilterParameters)
@ -28,7 +28,7 @@ type (
func WithTimeout(timeout time.Duration) Option {
return func(params *FilterParameters) {
params.timeout = timeout
params.Timeout = timeout
}
}

View File

@ -0,0 +1,7 @@
package filterv2
import libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
// FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow
// filter service nodes to push messages matching registered subscriptions to this client.
const FilterPushID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-push/2.0.0-beta1")

View File

@ -0,0 +1,272 @@
package filterv2
import (
"context"
"errors"
"math"
"net/http"
"sync"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
libp2pProtocol "github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-msgio/protoio"
"github.com/waku-org/go-waku/logging"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/metrics"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.opencensus.io/tag"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// FilterSubscribeID_v20beta1 is the current Waku Filter protocol identifier for servers to
// allow filter clients to subscribe, modify, refresh and unsubscribe a desired set of filter criteria
const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe/2.0.0-beta1")
type (
WakuFilter struct {
cancel context.CancelFunc
h host.Host
msgC chan *protocol.Envelope
wg *sync.WaitGroup
log *zap.Logger
subscriptions *SubscriptionMap
}
)
// NewWakuFilter returns a new instance of Waku Filter struct setup according to the chosen parameter and options
func NewWakuFilter(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...filter.Option) *WakuFilter {
wf := new(WakuFilter)
wf.log = log.Named("filterv2-fullnode")
params := new(filter.FilterParameters)
optList := filter.DefaultOptions()
optList = append(optList, opts...)
for _, opt := range optList {
opt(params)
}
wf.wg = &sync.WaitGroup{}
wf.h = host
wf.subscriptions = NewSubscriptionMap(broadcaster, timesource, params.Timeout)
return wf
}
func (wf *WakuFilter) Start(ctx context.Context) error {
wf.wg.Wait() // Wait for any goroutines to stop
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil {
wf.log.Error("creating tag map", zap.Error(err))
return errors.New("could not start waku filter")
}
ctx, cancel := context.WithCancel(ctx)
wf.h.SetStreamHandlerMatch(FilterSubscribeID_v20beta1, protocol.PrefixTextMatch(string(FilterSubscribeID_v20beta1)), wf.onRequest(ctx))
wf.cancel = cancel
wf.msgC = make(chan *protocol.Envelope, 1024)
wf.wg.Add(1)
go wf.filterListener(ctx)
wf.log.Info("filter protocol started")
return nil
}
func (wf *WakuFilter) onRequest(ctx context.Context) func(s network.Stream) {
return func(s network.Stream) {
defer s.Close()
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
reader := protoio.NewDelimitedReader(s, math.MaxInt32)
subscribeRequest := &pb.FilterSubscribeRequest{}
err := reader.ReadMsg(subscribeRequest)
if err != nil {
logger.Error("reading request", zap.Error(err))
return
}
logger = logger.With(zap.String("requestID", subscribeRequest.RequestId))
switch subscribeRequest.FilterSubscribeType {
case pb.FilterSubscribeRequest_SUBSCRIBE:
wf.subscribe(s, logger, subscribeRequest)
case pb.FilterSubscribeRequest_SUBSCRIBER_PING:
wf.ping(s, logger, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE:
wf.unsubscribe(s, logger, subscribeRequest)
case pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL:
wf.unsubscribeAll(s, logger, subscribeRequest)
}
logger.Info("received request")
}
}
func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest, statusCode int, description ...string) {
response := &pb.FilterSubscribeResponse{
RequestId: request.RequestId,
StatusCode: uint32(statusCode),
}
if len(description) != 0 {
response.StatusDesc = description[0]
} else {
response.StatusDesc = http.StatusText(statusCode)
}
writer := protoio.NewDelimitedWriter(s)
err := writer.WriteMsg(response)
if err != nil {
logger.Error("sending response", zap.Error(err))
}
}
func (wf *WakuFilter) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
if exists {
reply(s, logger, request, http.StatusOK)
} else {
reply(s, logger, request, http.StatusNotFound)
}
}
func (wf *WakuFilter) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
}
peerID := s.Conn().RemotePeer()
wf.subscriptions.Set(peerID, request.PubsubTopic, request.ContentTopics)
reply(s, logger, request, http.StatusOK)
}
func (wf *WakuFilter) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
if request.PubsubTopic == "" {
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
}
err := wf.subscriptions.Delete(s.Conn().RemotePeer(), request.PubsubTopic, request.ContentTopics)
if err != nil {
reply(s, logger, request, http.StatusNotFound)
} else {
reply(s, logger, request, http.StatusOK)
}
}
func (wf *WakuFilter) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
if err != nil {
reply(s, logger, request, http.StatusNotFound)
} else {
reply(s, logger, request, http.StatusOK)
}
}
func (wf *WakuFilter) filterListener(ctx context.Context) {
defer wf.wg.Done()
// This function is invoked for each message received
// on the full node in context of Waku2-Filter
handle := func(envelope *protocol.Envelope) error {
msg := envelope.Message()
pubsubTopic := envelope.PubsubTopic()
logger := wf.log.With(logging.HexBytes("envelopeHash", envelope.Hash()))
g := new(errgroup.Group)
// Each subscriber is a light node that earlier on invoked
// a FilterRequest on this node
for subscriber := range wf.subscriptions.Items(pubsubTopic, msg.ContentTopic) {
logger := logger.With(logging.HostID("subscriber", subscriber))
subscriber := subscriber // https://golang.org/doc/faq#closures_and_goroutines
// Do a message push to light node
logger.Info("pushing message to light node")
g.Go(func() (err error) {
err = wf.pushMessage(ctx, subscriber, envelope)
if err != nil {
logger.Error("pushing message", zap.Error(err))
}
return err
})
}
return g.Wait()
}
for m := range wf.msgC {
if err := handle(m); err != nil {
wf.log.Error("handling message", zap.Error(err))
}
}
}
func (wf *WakuFilter) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
logger := wf.log.With(logging.HostID("peer", peerID))
messagePush := &pb.MessagePushV2{
PubsubTopic: env.PubsubTopic(),
WakuMessage: env.Message(),
}
// We connect first so dns4 addresses are resolved (NewStream does not do it)
err := wf.h.Connect(ctx, wf.h.Peerstore().PeerInfo(peerID))
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)
logger.Error("connecting to peer", zap.Error(err))
return err
}
conn, err := wf.h.NewStream(ctx, peerID, FilterPushID_v20beta1)
if err != nil {
wf.subscriptions.FlagAsFailure(peerID)
logger.Error("opening peer stream", zap.Error(err))
//waku_filter_errors.inc(labelValues = [dialFailure])
return err
}
defer conn.Close()
writer := protoio.NewDelimitedWriter(conn)
err = writer.WriteMsg(messagePush)
if err != nil {
logger.Error("pushing messages to peer", zap.Error(err))
wf.subscriptions.FlagAsFailure(peerID)
return nil
}
wf.subscriptions.FlagAsSuccess(peerID)
return nil
}
// Stop unmounts the filter protocol
func (wf *WakuFilter) Stop() {
if wf.cancel == nil {
return
}
wf.h.RemoveStreamHandler(FilterSubscribeID_v20beta1)
wf.cancel()
close(wf.msgC)
wf.wg.Wait()
}
func (wf *WakuFilter) MessageChannel() chan *protocol.Envelope {
return wf.msgC
}

View File

@ -0,0 +1,293 @@
package filterv2
import (
"encoding/hex"
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p/core/peer"
v2 "github.com/waku-org/go-waku/waku/v2"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/timesource"
)
var ErrNotFound = errors.New("not found")
type ContentTopicSet map[string]struct{}
type PeerSet map[peer.ID]struct{}
type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics
type SubscriptionMap struct {
sync.RWMutex
timesource timesource.Timesource
items map[peer.ID]PubsubTopics
interestMap map[string]PeerSet // key: sha256(pubsubTopic-contentTopic) => peers
timeout time.Duration
failedPeers map[peer.ID]time.Time
broadcaster v2.Broadcaster
}
type SubscriptionItem struct {
Key peer.ID
Value PubsubTopics
}
func NewSubscriptionMap(broadcaster v2.Broadcaster, timesource timesource.Timesource, timeout time.Duration) *SubscriptionMap {
return &SubscriptionMap{
timesource: timesource,
items: make(map[peer.ID]PubsubTopics),
interestMap: make(map[string]PeerSet),
broadcaster: broadcaster,
timeout: timeout,
failedPeers: make(map[peer.ID]time.Time),
}
}
func (sub *SubscriptionMap) Set(peerID peer.ID, pubsubTopic string, contentTopics []string) {
sub.Lock()
defer sub.Unlock()
pubsubTopicMap, ok := sub.items[peerID]
if !ok {
pubsubTopicMap = make(PubsubTopics)
}
contentTopicsMap, ok := pubsubTopicMap[pubsubTopic]
if !ok {
contentTopicsMap = make(ContentTopicSet)
}
for _, c := range contentTopics {
contentTopicsMap[c] = struct{}{}
}
pubsubTopicMap[pubsubTopic] = contentTopicsMap
sub.items[peerID] = pubsubTopicMap
if len(contentTopics) == 0 {
// Interested in all messages for a pubsub topic
sub.addToInterestMap(peerID, pubsubTopic, nil)
} else {
for _, c := range contentTopics {
c := c
sub.addToInterestMap(peerID, pubsubTopic, &c)
}
}
}
func (sub *SubscriptionMap) Get(peerID peer.ID) (PubsubTopics, bool) {
sub.RLock()
defer sub.RUnlock()
value, ok := sub.items[peerID]
return value, ok
}
func (sub *SubscriptionMap) Has(peerID peer.ID) bool {
sub.RLock()
defer sub.RUnlock()
_, ok := sub.items[peerID]
return ok
}
func (sub *SubscriptionMap) Delete(peerID peer.ID, pubsubTopic string, contentTopics []string) error {
sub.Lock()
defer sub.Unlock()
pubsubTopicMap, ok := sub.items[peerID]
if !ok {
return ErrNotFound
}
contentTopicsMap, ok := pubsubTopicMap[pubsubTopic]
if !ok {
return ErrNotFound
}
if len(contentTopics) == 0 {
// Remove all content topics related to this pubsub topic
for c := range contentTopicsMap {
c := c
delete(contentTopicsMap, c)
sub.removeFromInterestMap(peerID, pubsubTopic, &c)
}
delete(pubsubTopicMap, pubsubTopic)
sub.removeFromInterestMap(peerID, pubsubTopic, nil)
} else {
// Removing content topics individually
for _, c := range contentTopics {
c := c
delete(contentTopicsMap, c)
sub.removeFromInterestMap(peerID, pubsubTopic, &c)
}
// No more content topics available. Removing subscription completely
if len(contentTopicsMap) == 0 {
delete(pubsubTopicMap, pubsubTopic)
sub.removeFromInterestMap(peerID, pubsubTopic, nil)
}
}
pubsubTopicMap[pubsubTopic] = contentTopicsMap
sub.items[peerID] = pubsubTopicMap
return nil
}
func (sub *SubscriptionMap) deleteAll(peerID peer.ID) error {
pubsubTopicMap, ok := sub.items[peerID]
if !ok {
return ErrNotFound
}
for pubsubTopic, contentTopicsMap := range pubsubTopicMap {
// Remove all content topics related to this pubsub topic
for c := range contentTopicsMap {
c := c
delete(contentTopicsMap, c)
sub.removeFromInterestMap(peerID, pubsubTopic, &c)
}
delete(pubsubTopicMap, pubsubTopic)
sub.removeFromInterestMap(peerID, pubsubTopic, nil)
}
delete(sub.items, peerID)
return nil
}
func (sub *SubscriptionMap) DeleteAll(peerID peer.ID) error {
sub.Lock()
defer sub.Unlock()
return sub.deleteAll(peerID)
}
func (sub *SubscriptionMap) RemoveAll() {
sub.Lock()
defer sub.Unlock()
for k /*, _ v*/ := range sub.items {
//close(v.Chan)
delete(sub.items, k)
}
}
func (sub *SubscriptionMap) Items(pubsubTopic string, contentTopic string) <-chan peer.ID {
c := make(chan peer.ID)
onlyPubsubTopicKey := getKey(pubsubTopic, nil)
pubsubAndContentTopicKey := getKey(pubsubTopic, &contentTopic)
f := func() {
sub.RLock()
defer sub.RUnlock()
for p := range sub.interestMap[onlyPubsubTopicKey] {
c <- p
}
for p := range sub.interestMap[pubsubAndContentTopicKey] {
c <- p
}
close(c)
}
go f()
return c
}
func (fm *SubscriptionMap) Notify(msg *pb.WakuMessage, peerID peer.ID) {
/*fm.RLock()
defer fm.RUnlock()
filter, ok := fm.items[peerID]
if !ok {
return
}
envelope := protocol.NewEnvelope(msg, fm.timesource.Now().UnixNano(), filter.Topic)
// Broadcasting message so it's stored
fm.broadcaster.Submit(envelope)
if msg.ContentTopic == "" {
filter.Chan <- envelope
}
// TODO: In case of no topics we should either trigger here for all messages,
// or we should not allow such filter to exist in the first place.
for _, contentTopic := range filter.ContentFilters {
if msg.ContentTopic == contentTopic {
filter.Chan <- envelope
break
}
}*/
}
func (sub *SubscriptionMap) addToInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) {
key := getKey(pubsubTopic, contentTopic)
peerSet, ok := sub.interestMap[key]
if !ok {
peerSet = make(PeerSet)
}
peerSet[peerID] = struct{}{}
sub.interestMap[key] = peerSet
}
func (sub *SubscriptionMap) removeFromInterestMap(peerID peer.ID, pubsubTopic string, contentTopic *string) {
key := getKey(pubsubTopic, contentTopic)
delete(sub.interestMap, key)
}
func getKey(pubsubTopic string, contentTopic *string) string {
pubsubTopicBytes := []byte(pubsubTopic)
if contentTopic == nil {
return hex.EncodeToString(crypto.Keccak256(pubsubTopicBytes))
} else {
key := append(pubsubTopicBytes, []byte(*contentTopic)...)
return hex.EncodeToString(crypto.Keccak256(key))
}
}
func (sub *SubscriptionMap) IsFailedPeer(peerID peer.ID) bool {
sub.RLock()
defer sub.RUnlock()
_, ok := sub.failedPeers[peerID]
return ok
}
func (sub *SubscriptionMap) FlagAsSuccess(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()
_, ok := sub.failedPeers[peerID]
if ok {
delete(sub.failedPeers, peerID)
}
}
func (sub *SubscriptionMap) FlagAsFailure(peerID peer.ID) {
sub.Lock()
defer sub.Unlock()
lastFailure, ok := sub.failedPeers[peerID]
if ok {
elapsedTime := time.Since(lastFailure)
if elapsedTime > sub.timeout {
sub.deleteAll(peerID)
}
} else {
sub.failedPeers[peerID] = time.Now()
}
}

View File

@ -6,3 +6,4 @@ package pb
//go:generate protoc -I. --gofast_out=. ./waku_store.proto
//go:generate protoc -I. --gofast_out=. ./waku_swap.proto
//go:generate protoc -I. --gofast_out=. ./waku_peer_exchange.proto
//go:generate protoc -I. --gofast_out=. ./waku_filter_v2.proto

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,36 @@
syntax = "proto3";
// 12/WAKU2-FILTER rfc: https://rfc.vac.dev/spec/12/
package pb;
import "waku_message.proto";
// Protocol identifier: /vac/waku/filter-subscribe/2.0.0-beta1
message FilterSubscribeRequest {
enum FilterSubscribeType {
SUBSCRIBER_PING = 0;
SUBSCRIBE = 1;
UNSUBSCRIBE = 2;
UNSUBSCRIBE_ALL = 3;
}
string request_id = 1;
FilterSubscribeType filter_subscribe_type = 2;
// Filter criteria
string pubsub_topic = 10;
repeated string content_topics = 11;
}
message FilterSubscribeResponse {
string request_id = 1;
uint32 status_code = 10;
string status_desc = 11;
}
// Protocol identifier: /vac/waku/filter-push/2.0.0-beta1
message MessagePushV2 {
WakuMessage waku_message = 1;
string pubsub_topic = 2;
}