mirror of
https://github.com/status-im/go-waku.git
synced 2025-01-24 20:49:46 +00:00
refactor: rename filter to legacy filter and set filterv2 as filter
This commit is contained in:
parent
c3ef173b2c
commit
0b01e4bb16
@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/payload"
|
"github.com/waku-org/go-waku/waku/v2/payload"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
||||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
@ -58,27 +59,47 @@ func NewChat(ctx context.Context, node *node.WakuNode, options Options) *Chat {
|
|||||||
chat.ui = NewUIModel(chat.uiReady, chat.inputChan)
|
chat.ui = NewUIModel(chat.uiReady, chat.inputChan)
|
||||||
|
|
||||||
if options.Filter.Enable {
|
if options.Filter.Enable {
|
||||||
cf := filter.ContentFilter{
|
if options.Filter.UseV2 {
|
||||||
Topic: relay.DefaultWakuTopic,
|
cf := filter.ContentFilter{
|
||||||
ContentTopics: []string{options.ContentTopic},
|
Topic: relay.DefaultWakuTopic,
|
||||||
}
|
ContentTopics: []string{options.ContentTopic},
|
||||||
var err error
|
}
|
||||||
|
var filterOpt filter.FilterSubscribeOption
|
||||||
var filterOpt filter.FilterSubscribeOption
|
peerID, err := options.Filter.NodePeerID()
|
||||||
peerID, err := options.Filter.NodePeerID()
|
if err != nil {
|
||||||
if err != nil {
|
filterOpt = filter.WithAutomaticPeerSelection()
|
||||||
filterOpt = filter.WithAutomaticPeerSelection()
|
} else {
|
||||||
|
filterOpt = filter.WithPeer(peerID)
|
||||||
|
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
|
||||||
|
}
|
||||||
|
theFilter, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt)
|
||||||
|
if err != nil {
|
||||||
|
chat.ui.ErrorMessage(err)
|
||||||
|
} else {
|
||||||
|
chat.C = theFilter.C
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
filterOpt = filter.WithPeer(peerID)
|
// TODO: remove
|
||||||
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
|
cf := legacy_filter.ContentFilter{
|
||||||
|
Topic: relay.DefaultWakuTopic,
|
||||||
|
ContentTopics: []string{options.ContentTopic},
|
||||||
|
}
|
||||||
|
var filterOpt legacy_filter.FilterSubscribeOption
|
||||||
|
peerID, err := options.Filter.NodePeerID()
|
||||||
|
if err != nil {
|
||||||
|
filterOpt = legacy_filter.WithAutomaticPeerSelection()
|
||||||
|
} else {
|
||||||
|
filterOpt = legacy_filter.WithPeer(peerID)
|
||||||
|
chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID))
|
||||||
|
}
|
||||||
|
_, theFilter, err := node.LegacyFilter().Subscribe(ctx, cf, filterOpt)
|
||||||
|
if err != nil {
|
||||||
|
chat.ui.ErrorMessage(err)
|
||||||
|
} else {
|
||||||
|
chat.C = theFilter.Chan
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_, theFilter, err := node.Filter().Subscribe(ctx, cf, filterOpt)
|
|
||||||
if err != nil {
|
|
||||||
chat.ui.ErrorMessage(err)
|
|
||||||
} else {
|
|
||||||
chat.C = theFilter.Chan
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
sub, err := node.Relay().Subscribe(ctx)
|
sub, err := node.Relay().Subscribe(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -12,7 +12,7 @@ import (
|
|||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/waku-org/go-waku/waku/v2/node"
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
@ -89,9 +89,9 @@ func execute(options Options) {
|
|||||||
|
|
||||||
if options.Filter.Enable {
|
if options.Filter.Enable {
|
||||||
if options.Filter.UseV2 {
|
if options.Filter.UseV2 {
|
||||||
opts = append(opts, node.WithWakuFilterV2LightNode())
|
opts = append(opts, node.WithWakuFilterLightNode())
|
||||||
} else {
|
} else {
|
||||||
opts = append(opts, node.WithWakuFilter(false))
|
opts = append(opts, node.WithLegacyWakuFilter(false))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
@ -116,9 +116,9 @@ func execute(options Options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if options.Filter.UseV2 {
|
if options.Filter.UseV2 {
|
||||||
err = addPeer(wakuNode, options.Filter.Node, filterv2.FilterSubscribeID_v20beta1)
|
err = addPeer(wakuNode, options.Filter.Node, filter.FilterSubscribeID_v20beta1)
|
||||||
} else {
|
} else {
|
||||||
err = addPeer(wakuNode, options.Filter.Node, filter.FilterID_v20beta1)
|
err = addPeer(wakuNode, options.Filter.Node, legacy_filter.FilterID_v20beta1)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println(err.Error())
|
fmt.Println(err.Error())
|
||||||
|
@ -60,7 +60,7 @@ func main() {
|
|||||||
node.WithPrivateKey(prvKey1),
|
node.WithPrivateKey(prvKey1),
|
||||||
node.WithHostAddress(hostAddr1),
|
node.WithHostAddress(hostAddr1),
|
||||||
node.WithWakuRelay(),
|
node.WithWakuRelay(),
|
||||||
node.WithWakuFilter(true),
|
node.WithWakuFilterFullNode(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
@ -74,13 +74,13 @@ func main() {
|
|||||||
lightNode, err := node.New(
|
lightNode, err := node.New(
|
||||||
node.WithPrivateKey(prvKey2),
|
node.WithPrivateKey(prvKey2),
|
||||||
node.WithHostAddress(hostAddr2),
|
node.WithHostAddress(hostAddr2),
|
||||||
node.WithWakuFilter(false),
|
node.WithWakuFilterLightNode(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], filter.FilterID_v20beta1)
|
_, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], filter.FilterSubscribeID_v20beta1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Info("Error adding filter peer on light node ", err)
|
log.Info("Error adding filter peer on light node ", err)
|
||||||
}
|
}
|
||||||
@ -100,13 +100,13 @@ func main() {
|
|||||||
ContentTopics: []string{contentTopic},
|
ContentTopics: []string{contentTopic},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, theFilter, err := lightNode.Filter().Subscribe(ctx, cf)
|
theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for env := range theFilter.Chan {
|
for env := range theFilter.C {
|
||||||
log.Info("Light node received msg, ", string(env.Message().Payload))
|
log.Info("Light node received msg, ", string(env.Message().Payload))
|
||||||
}
|
}
|
||||||
log.Info("Message channel closed!")
|
log.Info("Message channel closed!")
|
||||||
@ -118,7 +118,7 @@ func main() {
|
|||||||
go func() {
|
go func() {
|
||||||
// Unsubscribe filter after 5 seconds
|
// Unsubscribe filter after 5 seconds
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
lightNode.Filter().UnsubscribeFilter(ctx, cf)
|
lightNode.FilterLightnode().Unsubscribe(ctx, cf)
|
||||||
}()
|
}()
|
||||||
// Wait for a SIGINT or SIGTERM signal
|
// Wait for a SIGINT or SIGTERM signal
|
||||||
ch := make(chan os.Signal, 1)
|
ch := make(chan os.Signal, 1)
|
||||||
|
@ -107,7 +107,7 @@ func NewNode(configJSON string) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if *config.EnableFilter {
|
if *config.EnableFilter {
|
||||||
opts = append(opts, node.WithWakuFilter(false))
|
opts = append(opts, node.WithLegacyWakuFilter(false))
|
||||||
}
|
}
|
||||||
|
|
||||||
if *config.EnableStore {
|
if *config.EnableStore {
|
||||||
|
@ -6,8 +6,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type FilterArgument struct {
|
type FilterArgument struct {
|
||||||
@ -15,14 +15,14 @@ type FilterArgument struct {
|
|||||||
ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"`
|
ContentFilters []*pb.FilterRequest_ContentFilter `json:"contentFilters,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
|
func toContentFilter(filterJSON string) (legacy_filter.ContentFilter, error) {
|
||||||
var f FilterArgument
|
var f FilterArgument
|
||||||
err := json.Unmarshal([]byte(filterJSON), &f)
|
err := json.Unmarshal([]byte(filterJSON), &f)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return filter.ContentFilter{}, err
|
return legacy_filter.ContentFilter{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result := filter.ContentFilter{
|
result := legacy_filter.ContentFilter{
|
||||||
Topic: f.Topic,
|
Topic: f.Topic,
|
||||||
}
|
}
|
||||||
for _, cf := range f.ContentFilters {
|
for _, cf := range f.ContentFilters {
|
||||||
@ -52,23 +52,23 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) string {
|
|||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
var fOptions []filter.FilterSubscribeOption
|
var fOptions []legacy_filter.FilterSubscribeOption
|
||||||
if peerID != "" {
|
if peerID != "" {
|
||||||
p, err := peer.Decode(peerID)
|
p, err := peer.Decode(peerID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return MakeJSONResponse(err)
|
return MakeJSONResponse(err)
|
||||||
}
|
}
|
||||||
fOptions = append(fOptions, filter.WithPeer(p))
|
fOptions = append(fOptions, legacy_filter.WithPeer(p))
|
||||||
} else {
|
} else {
|
||||||
fOptions = append(fOptions, filter.WithAutomaticPeerSelection())
|
fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection())
|
||||||
}
|
}
|
||||||
|
|
||||||
_, f, err := wakuState.node.Filter().Subscribe(ctx, cf, fOptions...)
|
_, f, err := wakuState.node.LegacyFilter().Subscribe(ctx, cf, fOptions...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return MakeJSONResponse(err)
|
return MakeJSONResponse(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go func(f filter.Filter) {
|
go func(f legacy_filter.Filter) {
|
||||||
for envelope := range f.Chan {
|
for envelope := range f.Chan {
|
||||||
send("message", toSubscriptionMessage(envelope))
|
send("message", toSubscriptionMessage(envelope))
|
||||||
}
|
}
|
||||||
@ -97,7 +97,7 @@ func FilterUnsubscribe(filterJSON string, ms int) string {
|
|||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
|
|
||||||
err = wakuState.node.Filter().UnsubscribeFilter(ctx, cf)
|
err = wakuState.node.LegacyFilter().UnsubscribeFilter(ctx, cf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return MakeJSONResponse(err)
|
return MakeJSONResponse(err)
|
||||||
}
|
}
|
||||||
|
13
waku/node.go
13
waku/node.go
@ -41,7 +41,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
|
||||||
"github.com/waku-org/go-waku/waku/v2/node"
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
|
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
@ -165,13 +165,12 @@ func Execute(options Options) {
|
|||||||
|
|
||||||
if options.Filter.Enable {
|
if options.Filter.Enable {
|
||||||
if options.Filter.UseV2 {
|
if options.Filter.UseV2 {
|
||||||
|
nodeOpts = append(nodeOpts, node.WithWakuFilterLightNode())
|
||||||
if !options.Filter.DisableFullNode {
|
if !options.Filter.DisableFullNode {
|
||||||
nodeOpts = append(nodeOpts, node.WithWakuFilterV2LightNode())
|
nodeOpts = append(nodeOpts, node.WithWakuFilterFullNode(filter.WithTimeout(options.Filter.Timeout)))
|
||||||
} else {
|
|
||||||
nodeOpts = append(nodeOpts, node.WithWakuFilterV2FullNode(filterv2.WithTimeout(options.Filter.Timeout)))
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode, filter.WithTimeout(options.Filter.Timeout)))
|
nodeOpts = append(nodeOpts, node.WithLegacyWakuFilter(!options.Filter.DisableFullNode, legacy_filter.WithTimeout(options.Filter.Timeout)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -262,9 +261,9 @@ func Execute(options Options) {
|
|||||||
addPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
|
addPeers(wakuNode, options.Rendezvous.Nodes, rendezvous.RendezvousID)
|
||||||
|
|
||||||
if options.Filter.UseV2 {
|
if options.Filter.UseV2 {
|
||||||
addPeers(wakuNode, options.Filter.Nodes, filter.FilterID_v20beta1)
|
addPeers(wakuNode, options.Filter.Nodes, legacy_filter.FilterID_v20beta1)
|
||||||
} else {
|
} else {
|
||||||
addPeers(wakuNode, options.Filter.Nodes, filter.FilterID_v20beta1)
|
addPeers(wakuNode, options.Filter.Nodes, legacy_filter.FilterID_v20beta1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = wakuNode.Start(ctx); err != nil {
|
if err = wakuNode.Start(ctx); err != nil {
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
"github.com/waku-org/go-waku/logging"
|
"github.com/waku-org/go-waku/logging"
|
||||||
"github.com/waku-org/go-waku/waku/v2/metrics"
|
"github.com/waku-org/go-waku/waku/v2/metrics"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
@ -131,7 +131,7 @@ func (w *WakuNode) Status() (isOnline bool, hasHistory bool) {
|
|||||||
if !hasStore && protocol == store.StoreID_v20beta4 {
|
if !hasStore && protocol == store.StoreID_v20beta4 {
|
||||||
hasStore = true
|
hasStore = true
|
||||||
}
|
}
|
||||||
if !hasFilter && protocol == filter.FilterID_v20beta1 {
|
if !hasFilter && protocol == legacy_filter.FilterID_v20beta1 {
|
||||||
hasFilter = true
|
hasFilter = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,7 +33,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/waku/v2/discv5"
|
"github.com/waku-org/go-waku/waku/v2/discv5"
|
||||||
"github.com/waku-org/go-waku/waku/v2/metrics"
|
"github.com/waku-org/go-waku/waku/v2/metrics"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
|
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange"
|
||||||
@ -76,17 +76,17 @@ type WakuNode struct {
|
|||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
timesource timesource.Timesource
|
timesource timesource.Timesource
|
||||||
|
|
||||||
relay Service
|
relay Service
|
||||||
lightPush Service
|
lightPush Service
|
||||||
peerConnector PeerConnectorService
|
peerConnector PeerConnectorService
|
||||||
discoveryV5 Service
|
discoveryV5 Service
|
||||||
peerExchange Service
|
peerExchange Service
|
||||||
rendezvous Service
|
rendezvous Service
|
||||||
filter ReceptorService
|
legacyFilter ReceptorService
|
||||||
filterV2Full ReceptorService
|
filterFullnode ReceptorService
|
||||||
filterV2Light Service
|
filterLightnode Service
|
||||||
store ReceptorService
|
store ReceptorService
|
||||||
rlnRelay RLNRelay
|
rlnRelay RLNRelay
|
||||||
|
|
||||||
wakuFlag utils.WakuEnrBitfield
|
wakuFlag utils.WakuEnrBitfield
|
||||||
|
|
||||||
@ -179,7 +179,7 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||||||
w.log = params.logger.Named("node2")
|
w.log = params.logger.Named("node2")
|
||||||
w.wg = &sync.WaitGroup{}
|
w.wg = &sync.WaitGroup{}
|
||||||
w.keepAliveFails = make(map[peer.ID]int)
|
w.keepAliveFails = make(map[peer.ID]int)
|
||||||
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableFilter, w.opts.enableStore, w.opts.enableRelay)
|
w.wakuFlag = utils.NewWakuEnrBitfield(w.opts.enableLightPush, w.opts.enableLefacyFilter, w.opts.enableStore, w.opts.enableRelay)
|
||||||
|
|
||||||
if params.enableNTP {
|
if params.enableNTP {
|
||||||
w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log)
|
w.timesource = timesource.NewNTPTimesource(w.opts.ntpURLs, w.log)
|
||||||
@ -225,9 +225,9 @@ func New(opts ...WakuNodeOption) (*WakuNode, error) {
|
|||||||
|
|
||||||
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log)
|
w.rendezvous = rendezvous.NewRendezvous(w.host, w.opts.enableRendezvousServer, w.opts.rendezvousDB, w.opts.enableRendezvous, rendezvousPoints, w.peerConnector, w.log)
|
||||||
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
|
w.relay = relay.NewWakuRelay(w.host, w.bcaster, w.opts.minRelayPeersToPublish, w.timesource, w.log, w.opts.wOpts...)
|
||||||
w.filter = filter.NewWakuFilter(w.host, w.bcaster, w.opts.isFilterFullNode, w.timesource, w.log, w.opts.filterOpts...)
|
w.legacyFilter = legacy_filter.NewWakuFilter(w.host, w.bcaster, w.opts.isLegacyFilterFullnode, w.timesource, w.log, w.opts.legacyFilterOpts...)
|
||||||
w.filterV2Full = filterv2.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterV2Opts...)
|
w.filterFullnode = filter.NewWakuFilterFullnode(w.host, w.bcaster, w.timesource, w.log, w.opts.filterOpts...)
|
||||||
w.filterV2Light = filterv2.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log)
|
w.filterLightnode = filter.NewWakuFilterLightnode(w.host, w.bcaster, w.timesource, w.log)
|
||||||
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
|
w.lightPush = lightpush.NewWakuLightPush(w.host, w.Relay(), w.log)
|
||||||
|
|
||||||
if params.storeFactory != nil {
|
if params.storeFactory != nil {
|
||||||
@ -356,28 +356,28 @@ func (w *WakuNode) Start(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.opts.enableFilter {
|
if w.opts.enableLefacyFilter {
|
||||||
err := w.filter.Start(ctx)
|
err := w.legacyFilter.Start(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.log.Info("Subscribing filter to broadcaster")
|
w.log.Info("Subscribing filter to broadcaster")
|
||||||
w.bcaster.Register(nil, w.filter.MessageChannel())
|
w.bcaster.Register(nil, w.legacyFilter.MessageChannel())
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.opts.enableFilterV2FullNode {
|
if w.opts.enableFilterFullnode {
|
||||||
err := w.filterV2Full.Start(ctx)
|
err := w.filterFullnode.Start(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
w.log.Info("Subscribing filterV2 to broadcaster")
|
w.log.Info("Subscribing filterV2 to broadcaster")
|
||||||
w.bcaster.Register(nil, w.filterV2Full.MessageChannel())
|
w.bcaster.Register(nil, w.filterFullnode.MessageChannel())
|
||||||
}
|
}
|
||||||
|
|
||||||
if w.opts.enableFilterV2LightNode {
|
if w.opts.enableFilterLightNode {
|
||||||
err := w.filterV2Light.Start(ctx)
|
err := w.filterLightnode.Start(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -434,8 +434,8 @@ func (w *WakuNode) Stop() {
|
|||||||
w.relay.Stop()
|
w.relay.Stop()
|
||||||
w.lightPush.Stop()
|
w.lightPush.Stop()
|
||||||
w.store.Stop()
|
w.store.Stop()
|
||||||
w.filter.Stop()
|
w.legacyFilter.Stop()
|
||||||
w.filterV2Full.Stop()
|
w.filterFullnode.Stop()
|
||||||
w.peerExchange.Stop()
|
w.peerExchange.Stop()
|
||||||
|
|
||||||
if w.opts.enableDiscV5 {
|
if w.opts.enableDiscV5 {
|
||||||
@ -523,17 +523,25 @@ func (w *WakuNode) Store() store.Store {
|
|||||||
return w.store.(store.Store)
|
return w.store.(store.Store)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter is used to access any operation related to Waku Filter protocol
|
// LegacyFilter is used to access any operation related to Waku LegacyFilter protocol
|
||||||
func (w *WakuNode) Filter() *filter.WakuFilter {
|
func (w *WakuNode) LegacyFilter() *legacy_filter.WakuFilter {
|
||||||
if result, ok := w.filter.(*filter.WakuFilter); ok {
|
if result, ok := w.legacyFilter.(*legacy_filter.WakuFilter); ok {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterV2 is used to access any operation related to Waku Filter protocol
|
// FilterLightnode is used to access any operation related to Waku Filter protocol Full node feature
|
||||||
func (w *WakuNode) FilterV2() *filterv2.WakuFilterLightnode {
|
func (w *WakuNode) FilterFullnode() *filter.WakuFilterFullNode {
|
||||||
if result, ok := w.filterV2Light.(*filterv2.WakuFilterLightnode); ok {
|
if result, ok := w.filterFullnode.(*filter.WakuFilterFullNode); ok {
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilterFullnode is used to access any operation related to Waku Filter protocol Light node feature
|
||||||
|
func (w *WakuNode) FilterLightnode() *filter.WakuFilterLightnode {
|
||||||
|
if result, ok := w.filterLightnode.(*filter.WakuFilterLightnode); ok {
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
@ -14,7 +14,7 @@ import (
|
|||||||
"github.com/waku-org/go-waku/tests"
|
"github.com/waku-org/go-waku/tests"
|
||||||
"github.com/waku-org/go-waku/waku/persistence"
|
"github.com/waku-org/go-waku/waku/persistence"
|
||||||
"github.com/waku-org/go-waku/waku/persistence/sqlite"
|
"github.com/waku-org/go-waku/waku/persistence/sqlite"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
@ -175,7 +175,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
|
|||||||
wakuNode1, err := New(
|
wakuNode1, err := New(
|
||||||
WithHostAddress(hostAddr1),
|
WithHostAddress(hostAddr1),
|
||||||
WithWakuRelay(),
|
WithWakuRelay(),
|
||||||
WithWakuFilter(true),
|
WithLegacyWakuFilter(true),
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = wakuNode1.Start(ctx)
|
err = wakuNode1.Start(ctx)
|
||||||
@ -192,7 +192,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
wakuNode2, err := New(
|
wakuNode2, err := New(
|
||||||
WithHostAddress(hostAddr2),
|
WithHostAddress(hostAddr2),
|
||||||
WithWakuFilter(false),
|
WithLegacyWakuFilter(false),
|
||||||
WithWakuStore(),
|
WithWakuStore(),
|
||||||
WithMessageProvider(dbStore),
|
WithMessageProvider(dbStore),
|
||||||
)
|
)
|
||||||
@ -206,7 +206,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
|
|||||||
|
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
_, filter, err := wakuNode2.Filter().Subscribe(ctx, filter.ContentFilter{
|
_, filter, err := wakuNode2.LegacyFilter().Subscribe(ctx, legacy_filter.ContentFilter{
|
||||||
Topic: string(relay.DefaultWakuTopic),
|
Topic: string(relay.DefaultWakuTopic),
|
||||||
})
|
})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -241,7 +241,7 @@ func TestDecoupledStoreFromRelay(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
wakuNode3, err := New(
|
wakuNode3, err := New(
|
||||||
WithHostAddress(hostAddr3),
|
WithHostAddress(hostAddr3),
|
||||||
WithWakuFilter(false),
|
WithLegacyWakuFilter(false),
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
err = wakuNode3.Start(ctx)
|
err = wakuNode3.Start(ctx)
|
||||||
|
@ -27,7 +27,7 @@ import (
|
|||||||
ma "github.com/multiformats/go-multiaddr"
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
manet "github.com/multiformats/go-multiaddr/net"
|
manet "github.com/multiformats/go-multiaddr/net"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
"github.com/waku-org/go-waku/waku/v2/rendezvous"
|
||||||
@ -64,15 +64,15 @@ type WakuNodeParameters struct {
|
|||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
logLevel logging.LogLevel
|
logLevel logging.LogLevel
|
||||||
|
|
||||||
noDefaultWakuTopic bool
|
noDefaultWakuTopic bool
|
||||||
enableRelay bool
|
enableRelay bool
|
||||||
enableFilter bool
|
enableLefacyFilter bool
|
||||||
isFilterFullNode bool
|
isLegacyFilterFullnode bool
|
||||||
enableFilterV2LightNode bool
|
enableFilterLightNode bool
|
||||||
enableFilterV2FullNode bool
|
enableFilterFullnode bool
|
||||||
filterOpts []filter.Option
|
legacyFilterOpts []legacy_filter.Option
|
||||||
filterV2Opts []filterv2.Option
|
filterOpts []filter.Option
|
||||||
wOpts []pubsub.Option
|
wOpts []pubsub.Option
|
||||||
|
|
||||||
minRelayPeersToPublish int
|
minRelayPeersToPublish int
|
||||||
|
|
||||||
@ -319,31 +319,31 @@ func WithPeerExchange() WakuNodeOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithWakuFilter enables the Waku Filter protocol. This WakuNodeOption
|
// WithLegacyWakuFilter enables the legacy Waku Filter protocol. This WakuNodeOption
|
||||||
// accepts a list of WakuFilter gossipsub options to setup the protocol
|
// accepts a list of WakuFilter gossipsub options to setup the protocol
|
||||||
func WithWakuFilter(fullNode bool, filterOpts ...filter.Option) WakuNodeOption {
|
func WithLegacyWakuFilter(fullnode bool, filterOpts ...legacy_filter.Option) WakuNodeOption {
|
||||||
return func(params *WakuNodeParameters) error {
|
return func(params *WakuNodeParameters) error {
|
||||||
params.enableFilter = true
|
params.enableLefacyFilter = true
|
||||||
params.isFilterFullNode = fullNode
|
params.isLegacyFilterFullnode = fullnode
|
||||||
params.filterOpts = filterOpts
|
params.legacyFilterOpts = filterOpts
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithWakuFilterV2 enables the Waku Filter V2 protocol for lightnode functionality
|
// WithWakuFilter enables the Waku Filter V2 protocol for lightnode functionality
|
||||||
func WithWakuFilterV2LightNode() WakuNodeOption {
|
func WithWakuFilterLightNode() WakuNodeOption {
|
||||||
return func(params *WakuNodeParameters) error {
|
return func(params *WakuNodeParameters) error {
|
||||||
params.enableFilterV2LightNode = true
|
params.enableFilterLightNode = true
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithWakuFilterV2FullNode enables the Waku Filter V2 protocol full node functionality.
|
// WithWakuFilterFullNode enables the Waku Filter V2 protocol full node functionality.
|
||||||
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
|
// This WakuNodeOption accepts a list of WakuFilter options to setup the protocol
|
||||||
func WithWakuFilterV2FullNode(filterOpts ...filterv2.Option) WakuNodeOption {
|
func WithWakuFilterFullNode(filterOpts ...filter.Option) WakuNodeOption {
|
||||||
return func(params *WakuNodeParameters) error {
|
return func(params *WakuNodeParameters) error {
|
||||||
params.enableFilterV2FullNode = true
|
params.enableFilterFullnode = true
|
||||||
params.filterV2Opts = filterOpts
|
params.filterOpts = filterOpts
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -38,7 +38,7 @@ func TestWakuOptions(t *testing.T) {
|
|||||||
WithPrivateKey(prvKey),
|
WithPrivateKey(prvKey),
|
||||||
WithLibP2POptions(),
|
WithLibP2POptions(),
|
||||||
WithWakuRelay(),
|
WithWakuRelay(),
|
||||||
WithWakuFilter(true),
|
WithLegacyWakuFilter(true),
|
||||||
WithDiscoveryV5(123, nil, false),
|
WithDiscoveryV5(123, nil, false),
|
||||||
WithWakuStore(),
|
WithWakuStore(),
|
||||||
WithMessageProvider(&persistence.DBStore{}),
|
WithMessageProvider(&persistence.DBStore{}),
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -18,7 +18,7 @@ import (
|
|||||||
v2 "github.com/waku-org/go-waku/waku/v2"
|
v2 "github.com/waku-org/go-waku/waku/v2"
|
||||||
"github.com/waku-org/go-waku/waku/v2/metrics"
|
"github.com/waku-org/go-waku/waku/v2/metrics"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
||||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"go.opencensus.io/tag"
|
"go.opencensus.io/tag"
|
||||||
@ -82,7 +82,7 @@ func (wf *WakuFilterLightnode) Start(ctx context.Context) error {
|
|||||||
|
|
||||||
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx))
|
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(ctx))
|
||||||
|
|
||||||
wf.log.Info("filter protocol (light) started")
|
wf.log.Info("filter-push protocol started")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,3 +1,3 @@
|
|||||||
package pb
|
package pb
|
||||||
|
|
||||||
//go:generate protoc -I./../../pb/. -I. --go_opt=paths=source_relative --go_opt=Mwaku_filter.proto=github.com/waku-org/go-waku/waku/v2/protocol/filter/pb --go_opt=Mwaku_message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./waku_filter.proto
|
//go:generate protoc -I./../../pb/. -I. --go_opt=paths=source_relative --go_opt=Mwaku_filter_v2.proto=github.com/waku-org/go-waku/waku/v2/protocol/filter/pb --go_opt=Mwaku_message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./waku_filter_v2.proto
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -17,7 +17,7 @@ import (
|
|||||||
v2 "github.com/waku-org/go-waku/waku/v2"
|
v2 "github.com/waku-org/go-waku/waku/v2"
|
||||||
"github.com/waku-org/go-waku/waku/v2/metrics"
|
"github.com/waku-org/go-waku/waku/v2/metrics"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
"go.opencensus.io/tag"
|
"go.opencensus.io/tag"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -30,7 +30,7 @@ const FilterSubscribeID_v20beta1 = libp2pProtocol.ID("/vac/waku/filter-subscribe
|
|||||||
const peerHasNoSubscription = "peer has no subscriptions"
|
const peerHasNoSubscription = "peer has no subscriptions"
|
||||||
|
|
||||||
type (
|
type (
|
||||||
WakuFilterFull struct {
|
WakuFilterFullNode struct {
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
h host.Host
|
h host.Host
|
||||||
msgC chan *protocol.Envelope
|
msgC chan *protocol.Envelope
|
||||||
@ -44,8 +44,8 @@ type (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
|
// NewWakuFilterFullnode returns a new instance of Waku Filter struct setup according to the chosen parameter and options
|
||||||
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFull {
|
func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesource timesource.Timesource, log *zap.Logger, opts ...Option) *WakuFilterFullNode {
|
||||||
wf := new(WakuFilterFull)
|
wf := new(WakuFilterFullNode)
|
||||||
wf.log = log.Named("filterv2-fullnode")
|
wf.log = log.Named("filterv2-fullnode")
|
||||||
|
|
||||||
params := new(FilterParameters)
|
params := new(FilterParameters)
|
||||||
@ -63,7 +63,7 @@ func NewWakuFilterFullnode(host host.Host, broadcaster v2.Broadcaster, timesourc
|
|||||||
return wf
|
return wf
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) Start(ctx context.Context) error {
|
func (wf *WakuFilterFullNode) Start(ctx context.Context) error {
|
||||||
wf.wg.Wait() // Wait for any goroutines to stop
|
wf.wg.Wait() // Wait for any goroutines to stop
|
||||||
|
|
||||||
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
|
||||||
@ -82,12 +82,12 @@ func (wf *WakuFilterFull) Start(ctx context.Context) error {
|
|||||||
wf.wg.Add(1)
|
wf.wg.Add(1)
|
||||||
go wf.filterListener(ctx)
|
go wf.filterListener(ctx)
|
||||||
|
|
||||||
wf.log.Info("filter protocol (full) started")
|
wf.log.Info("filter-subscriber protocol started")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) onRequest(ctx context.Context) func(s network.Stream) {
|
func (wf *WakuFilterFullNode) onRequest(ctx context.Context) func(s network.Stream) {
|
||||||
return func(s network.Stream) {
|
return func(s network.Stream) {
|
||||||
defer s.Close()
|
defer s.Close()
|
||||||
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer()))
|
||||||
@ -137,7 +137,7 @@ func reply(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequ
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) ping(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
||||||
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
|
exists := wf.subscriptions.Has(s.Conn().RemotePeer())
|
||||||
|
|
||||||
if exists {
|
if exists {
|
||||||
@ -147,7 +147,7 @@ func (wf *WakuFilterFull) ping(s network.Stream, logger *zap.Logger, request *pb
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) subscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
||||||
if request.PubsubTopic == "" {
|
if request.PubsubTopic == "" {
|
||||||
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
||||||
return
|
return
|
||||||
@ -186,7 +186,7 @@ func (wf *WakuFilterFull) subscribe(s network.Stream, logger *zap.Logger, reques
|
|||||||
reply(s, logger, request, http.StatusOK)
|
reply(s, logger, request, http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) unsubscribe(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
||||||
if request.PubsubTopic == "" {
|
if request.PubsubTopic == "" {
|
||||||
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
reply(s, logger, request, http.StatusBadRequest, "pubsubtopic can't be empty")
|
||||||
return
|
return
|
||||||
@ -209,7 +209,7 @@ func (wf *WakuFilterFull) unsubscribe(s network.Stream, logger *zap.Logger, requ
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
func (wf *WakuFilterFullNode) unsubscribeAll(s network.Stream, logger *zap.Logger, request *pb.FilterSubscribeRequest) {
|
||||||
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
|
err := wf.subscriptions.DeleteAll(s.Conn().RemotePeer())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
|
reply(s, logger, request, http.StatusNotFound, peerHasNoSubscription)
|
||||||
@ -218,7 +218,7 @@ func (wf *WakuFilterFull) unsubscribeAll(s network.Stream, logger *zap.Logger, r
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) filterListener(ctx context.Context) {
|
func (wf *WakuFilterFullNode) filterListener(ctx context.Context) {
|
||||||
defer wf.wg.Done()
|
defer wf.wg.Done()
|
||||||
|
|
||||||
// This function is invoked for each message received
|
// This function is invoked for each message received
|
||||||
@ -255,7 +255,7 @@ func (wf *WakuFilterFull) filterListener(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
|
func (wf *WakuFilterFullNode) pushMessage(ctx context.Context, peerID peer.ID, env *protocol.Envelope) error {
|
||||||
logger := wf.log.With(logging.HostID("peer", peerID))
|
logger := wf.log.With(logging.HostID("peer", peerID))
|
||||||
|
|
||||||
messagePush := &pb.MessagePushV2{
|
messagePush := &pb.MessagePushV2{
|
||||||
@ -297,7 +297,7 @@ func (wf *WakuFilterFull) pushMessage(ctx context.Context, peerID peer.ID, env *
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Stop unmounts the filter protocol
|
// Stop unmounts the filter protocol
|
||||||
func (wf *WakuFilterFull) Stop() {
|
func (wf *WakuFilterFullNode) Stop() {
|
||||||
if wf.cancel == nil {
|
if wf.cancel == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -311,6 +311,6 @@ func (wf *WakuFilterFull) Stop() {
|
|||||||
wf.wg.Wait()
|
wf.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (wf *WakuFilterFull) MessageChannel() chan *protocol.Envelope {
|
func (wf *WakuFilterFullNode) MessageChannel() chan *protocol.Envelope {
|
||||||
return wf.msgC
|
return wf.msgC
|
||||||
}
|
}
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
@ -1,4 +1,4 @@
|
|||||||
package filterv2
|
package filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,3 +0,0 @@
|
|||||||
package pb
|
|
||||||
|
|
||||||
//go:generate protoc -I./../../pb/. -I. --go_opt=paths=source_relative --go_opt=Mwaku_filter_v2.proto=github.com/waku-org/go-waku/waku/v2/protocol/filterv2/pb --go_opt=Mwaku_message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./waku_filter_v2.proto
|
|
@ -1,4 +1,4 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
@ -1,4 +1,4 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
@ -1,11 +1,11 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Subscriber struct {
|
type Subscriber struct {
|
@ -1,4 +1,4 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
||||||
@ -7,7 +7,7 @@ import (
|
|||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/core/test"
|
"github.com/libp2p/go-libp2p/core/test"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||||
)
|
)
|
||||||
|
|
||||||
const TOPIC = "/test/topic"
|
const TOPIC = "/test/topic"
|
3
waku/v2/protocol/legacy_filter/pb/generate.go
Normal file
3
waku/v2/protocol/legacy_filter/pb/generate.go
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
package pb
|
||||||
|
|
||||||
|
//go:generate protoc -I./../../pb/. -I. --go_opt=paths=source_relative --go_opt=Mwaku_filter.proto=github.com/waku-org/go-waku/waku/v2/protocol/filter/pb --go_opt=Mwaku_message.proto=github.com/waku-org/go-waku/waku/v2/protocol/pb --go_out=. ./waku_filter.proto
|
@ -1,4 +1,4 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@ -16,7 +16,7 @@ import (
|
|||||||
v2 "github.com/waku-org/go-waku/waku/v2"
|
v2 "github.com/waku-org/go-waku/waku/v2"
|
||||||
"github.com/waku-org/go-waku/waku/v2/metrics"
|
"github.com/waku-org/go-waku/waku/v2/metrics"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
@ -1,4 +1,4 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,4 +1,4 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -1,4 +1,4 @@
|
|||||||
package filter
|
package legacy_filter
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
@ -9,7 +9,7 @@ import (
|
|||||||
|
|
||||||
"github.com/waku-org/go-waku/waku/v2/node"
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filterv2"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
"github.com/waku-org/go-waku/waku/v2/protocol/store"
|
||||||
@ -55,9 +55,9 @@ func (a *AdminService) PostV1Peers(req *http.Request, args *PeersArgs, reply *Su
|
|||||||
}
|
}
|
||||||
|
|
||||||
func isWakuProtocol(protocol protocol.ID) bool {
|
func isWakuProtocol(protocol protocol.ID) bool {
|
||||||
return protocol == filter.FilterID_v20beta1 ||
|
return protocol == legacy_filter.FilterID_v20beta1 ||
|
||||||
protocol == filterv2.FilterPushID_v20beta1 ||
|
protocol == filter.FilterPushID_v20beta1 ||
|
||||||
protocol == filterv2.FilterSubscribeID_v20beta1 ||
|
protocol == filter.FilterSubscribeID_v20beta1 ||
|
||||||
protocol == relay.WakuRelayID_v200 ||
|
protocol == relay.WakuRelayID_v200 ||
|
||||||
protocol == lightpush.LightPushID_v20beta1 ||
|
protocol == lightpush.LightPushID_v20beta1 ||
|
||||||
protocol == store.StoreID_v20beta4
|
protocol == store.StoreID_v20beta4
|
||||||
|
@ -7,8 +7,8 @@ import (
|
|||||||
|
|
||||||
"github.com/waku-org/go-waku/waku/v2/node"
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol"
|
"github.com/waku-org/go-waku/waku/v2/protocol"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
@ -44,13 +44,13 @@ func NewFilterService(node *node.WakuNode, cacheCapacity int, log *zap.Logger) *
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeContentFilter(args *FilterContentArgs) filter.ContentFilter {
|
func makeContentFilter(args *FilterContentArgs) legacy_filter.ContentFilter {
|
||||||
var contentTopics []string
|
var contentTopics []string
|
||||||
for _, contentFilter := range args.ContentFilters {
|
for _, contentFilter := range args.ContentFilters {
|
||||||
contentTopics = append(contentTopics, contentFilter.ContentTopic)
|
contentTopics = append(contentTopics, contentFilter.ContentTopic)
|
||||||
}
|
}
|
||||||
|
|
||||||
return filter.ContentFilter{
|
return legacy_filter.ContentFilter{
|
||||||
Topic: args.Topic,
|
Topic: args.Topic,
|
||||||
ContentTopics: contentTopics,
|
ContentTopics: contentTopics,
|
||||||
}
|
}
|
||||||
@ -82,10 +82,10 @@ func (f *FilterService) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
|
func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
|
||||||
_, _, err := f.node.Filter().Subscribe(
|
_, _, err := f.node.LegacyFilter().Subscribe(
|
||||||
req.Context(),
|
req.Context(),
|
||||||
makeContentFilter(args),
|
makeContentFilter(args),
|
||||||
filter.WithAutomaticPeerSelection(),
|
legacy_filter.WithAutomaticPeerSelection(),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
f.log.Error("subscribing to topic", zap.String("topic", args.Topic), zap.Error(err))
|
f.log.Error("subscribing to topic", zap.String("topic", args.Topic), zap.Error(err))
|
||||||
@ -100,7 +100,7 @@ func (f *FilterService) PostV1Subscription(req *http.Request, args *FilterConten
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
|
func (f *FilterService) DeleteV1Subscription(req *http.Request, args *FilterContentArgs, reply *SuccessReply) error {
|
||||||
err := f.node.Filter().UnsubscribeFilter(
|
err := f.node.LegacyFilter().UnsubscribeFilter(
|
||||||
req.Context(),
|
req.Context(),
|
||||||
makeContentFilter(args),
|
makeContentFilter(args),
|
||||||
)
|
)
|
||||||
|
@ -12,8 +12,8 @@ import (
|
|||||||
"github.com/waku-org/go-waku/tests"
|
"github.com/waku-org/go-waku/tests"
|
||||||
v2 "github.com/waku-org/go-waku/waku/v2"
|
v2 "github.com/waku-org/go-waku/waku/v2"
|
||||||
"github.com/waku-org/go-waku/waku/v2/node"
|
"github.com/waku-org/go-waku/waku/v2/node"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
|
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_filter/pb"
|
||||||
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
|
||||||
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
|
||||||
"github.com/waku-org/go-waku/waku/v2/timesource"
|
"github.com/waku-org/go-waku/waku/v2/timesource"
|
||||||
@ -25,7 +25,7 @@ var testTopic = "test"
|
|||||||
func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
|
func makeFilterService(t *testing.T, isFullNode bool) *FilterService {
|
||||||
var nodeOpts []node.WakuNodeOption
|
var nodeOpts []node.WakuNodeOption
|
||||||
|
|
||||||
nodeOpts = append(nodeOpts, node.WithWakuFilter(isFullNode))
|
nodeOpts = append(nodeOpts, node.WithLegacyWakuFilter(isFullNode))
|
||||||
if isFullNode {
|
if isFullNode {
|
||||||
nodeOpts = append(nodeOpts, node.WithWakuRelay())
|
nodeOpts = append(nodeOpts, node.WithWakuRelay())
|
||||||
}
|
}
|
||||||
@ -57,7 +57,7 @@ func TestFilterSubscription(t *testing.T) {
|
|||||||
_, err = node.SubscribeToTopic(context.Background(), testTopic)
|
_, err = node.SubscribeToTopic(context.Background(), testTopic)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
f := filter.NewWakuFilter(host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
|
f := legacy_filter.NewWakuFilter(host, v2.NewBroadcaster(10), false, timesource.NewDefaultClock(), utils.Logger())
|
||||||
err = f.Start(context.Background())
|
err = f.Start(context.Background())
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
@ -73,7 +73,7 @@ func TestFilterSubscription(t *testing.T) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = d.node.AddPeer(addr, filter.FilterID_v20beta1)
|
_, err = d.node.AddPeer(addr, legacy_filter.FilterID_v20beta1)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}
|
args := &FilterContentArgs{Topic: testTopic, ContentFilters: []*pb.FilterRequest_ContentFilter{{ContentTopic: "ct"}}}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user