chose node type when starting filter and make relay optional (#113)

This commit is contained in:
Richard Ramos 2021-10-30 10:29:34 -04:00 committed by GitHub
parent f2be4a8e7a
commit d94802f739
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 63 additions and 35 deletions

View File

@ -48,8 +48,15 @@ func NewChat(ctx context.Context, n *node.WakuNode, selfID peer.ID, contentTopic
} }
if useLightPush { if useLightPush {
chat.C = make(filter.ContentFilterChan) cf := filter.ContentFilter{
n.SubscribeFilter(ctx, string(relay.GetTopic(nil)), []string{contentTopic}, chat.C) Topic: string(relay.GetTopic(nil)),
ContentTopics: []string{contentTopic},
}
var err error
_, chat.C, err = n.SubscribeFilter(ctx, cf)
if err != nil {
return nil, err
}
} else { } else {
sub, err := n.Subscribe(ctx, nil) sub, err := n.Subscribe(ctx, nil)
if err != nil { if err != nil {

View File

@ -77,7 +77,7 @@ func main() {
opts := []node.WakuNodeOption{ opts := []node.WakuNodeOption{
node.WithPrivateKey(prvKey), node.WithPrivateKey(prvKey),
node.WithHostAddress([]net.Addr{hostAddr}), node.WithHostAddress([]*net.TCPAddr{hostAddr}),
node.WithWakuStore(false, true), node.WithWakuStore(false, true),
node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second), node.WithKeepAlive(time.Duration(*keepAliveFlag) * time.Second),
} }
@ -87,7 +87,7 @@ func main() {
} }
if *filterFlag { if *filterFlag {
opts = append(opts, node.WithWakuFilter()) opts = append(opts, node.WithWakuFilter(false))
} }
if *lightPushFlag || *lightPushNodeFlag != "" { if *lightPushFlag || *lightPushNodeFlag != "" {

View File

@ -54,9 +54,9 @@ func main() {
fullNode, err := node.New(ctx, fullNode, err := node.New(ctx,
node.WithPrivateKey(prvKey1), node.WithPrivateKey(prvKey1),
node.WithHostAddress([]net.Addr{hostAddr1}), node.WithHostAddress([]*net.TCPAddr{hostAddr1}),
node.WithWakuRelay(), node.WithWakuRelay(),
node.WithWakuFilter(), node.WithWakuFilter(true),
) )
err = fullNode.Start() err = fullNode.Start()
@ -66,8 +66,8 @@ func main() {
lightNode, err := node.New(ctx, lightNode, err := node.New(ctx,
node.WithPrivateKey(prvKey2), node.WithPrivateKey(prvKey2),
node.WithHostAddress([]net.Addr{hostAddr2}), node.WithHostAddress([]*net.TCPAddr{hostAddr2}),
node.WithWakuFilter(), node.WithWakuFilter(false),
) )
_, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], filter.FilterID_v20beta1) _, err = lightNode.AddPeer(fullNode.ListenAddresses()[0], filter.FilterID_v20beta1)

View File

@ -139,7 +139,7 @@ func Execute(options Options) {
} }
if options.Filter.Enable { if options.Filter.Enable {
nodeOpts = append(nodeOpts, node.WithWakuFilter()) nodeOpts = append(nodeOpts, node.WithWakuFilter(!options.Filter.DisableFullNode))
} }
if options.Store.Enable { if options.Store.Enable {
@ -178,10 +178,12 @@ func Execute(options Options) {
options.Relay.Topics = []string{string(relay.DefaultWakuTopic)} options.Relay.Topics = []string{string(relay.DefaultWakuTopic)}
} }
for _, t := range options.Relay.Topics { if !options.Relay.Disable {
nodeTopic := relay.Topic(t) for _, t := range options.Relay.Topics {
_, err := wakuNode.Subscribe(ctx, &nodeTopic) nodeTopic := relay.Topic(t)
failOnErr(err, "Error subscring to topic") _, err := wakuNode.Subscribe(ctx, &nodeTopic)
failOnErr(err, "Error subscring to topic")
}
} }
for _, n := range options.StaticNodes { for _, n := range options.StaticNodes {

View File

@ -16,8 +16,9 @@ type RelayOptions struct {
} }
type FilterOptions struct { type FilterOptions struct {
Enable bool `long:"filter" description:"Enable filter protocol"` Enable bool `long:"filter" description:"Enable filter protocol"`
Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"` DisableFullNode bool `long:"no-subscribers" description:"Don't accept filter subscribers"`
Nodes []string `long:"filter-node" description:"Multiaddr of a peer that supports filter protocol. Option may be repeated"`
} }
// LightpushOptions are settings used to enable the lightpush protocol. This is // LightpushOptions are settings used to enable the lightpush protocol. This is

View File

@ -155,9 +155,11 @@ func (w *WakuNode) Start() error {
w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...)) w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(rendezvous, w.opts.rendezvousOpts...))
} }
err := w.mountRelay(w.opts.enableRelay, w.opts.wOpts...) if w.opts.enableRelay {
if err != nil { err := w.mountRelay(w.opts.wOpts...)
return err if err != nil {
return err
}
} }
if w.opts.enableLightPush { if w.opts.enableLightPush {
@ -242,15 +244,16 @@ func (w *WakuNode) Filter() *filter.WakuFilter {
return w.filter return w.filter
} }
func (w *WakuNode) mountRelay(shouldRelayMessages bool, opts ...pubsub.Option) error { func (w *WakuNode) mountRelay(opts ...pubsub.Option) error {
var err error var err error
w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...) w.relay, err = relay.NewWakuRelay(w.ctx, w.host, opts...)
if err != nil {
return err
}
if shouldRelayMessages { _, err = w.Subscribe(w.ctx, nil)
_, err := w.Subscribe(w.ctx, nil) if err != nil {
if err != nil { return err
return err
}
} }
// TODO: rlnRelay // TODO: rlnRelay
@ -265,7 +268,7 @@ func (w *WakuNode) mountFilter() error {
} }
} }
w.filter = filter.NewWakuFilter(w.ctx, w.host, filterHandler) w.filter = filter.NewWakuFilter(w.ctx, w.host, w.opts.isFilterFullNode, filterHandler)
return nil return nil
} }
@ -572,10 +575,6 @@ func (node *WakuNode) UnsubscribeFilter(ctx context.Context, cf filter.ContentFi
} }
func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) { func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topic *relay.Topic) ([]byte, error) {
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set")
}
if message == nil { if message == nil {
return nil, errors.New("message can't be null") return nil, errors.New("message can't be null")
} }
@ -584,6 +583,10 @@ func (node *WakuNode) Publish(ctx context.Context, message *pb.WakuMessage, topi
return node.LightPush(ctx, message, topic) return node.LightPush(ctx, message, topic)
} }
if node.relay == nil {
return nil, errors.New("WakuRelay hasn't been set")
}
hash, err := node.relay.Publish(ctx, message, topic) hash, err := node.relay.Publish(ctx, message, topic)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -28,9 +28,10 @@ type WakuNodeParameters struct {
privKey *crypto.PrivKey privKey *crypto.PrivKey
libP2POpts []libp2p.Option libP2POpts []libp2p.Option
enableRelay bool enableRelay bool
enableFilter bool enableFilter bool
wOpts []pubsub.Option isFilterFullNode bool
wOpts []pubsub.Option
enableStore bool enableStore bool
shouldResume bool shouldResume bool
@ -151,9 +152,10 @@ func WithRendezvousServer(storage rendezvous.Storage) WakuNodeOption {
// WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption // WithWakuFilter enables the Waku V2 Filter protocol. This WakuNodeOption
// accepts a list of WakuFilter gossipsub options to setup the protocol // accepts a list of WakuFilter gossipsub options to setup the protocol
func WithWakuFilter(opts ...pubsub.Option) WakuNodeOption { func WithWakuFilter(fullNode bool) WakuNodeOption {
return func(params *WakuNodeParameters) error { return func(params *WakuNodeParameters) error {
params.enableFilter = true params.enableFilter = true
params.isFilterFullNode = fullNode
return nil return nil
} }
} }

View File

@ -66,6 +66,7 @@ type (
ctx context.Context ctx context.Context
h host.Host h host.Host
subscribers []Subscriber subscribers []Subscriber
isFullNode bool
pushHandler MessagePushHandler pushHandler MessagePushHandler
MsgC chan *protocol.Envelope MsgC chan *protocol.Envelope
} }
@ -145,7 +146,7 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages") log.Info("filter light node, received a message push. ", len(filterRPCRequest.Push.Messages), " messages")
stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages)))) stats.Record(wf.ctx, metrics.Messages.M(int64(len(filterRPCRequest.Push.Messages))))
} else if filterRPCRequest.Request != nil { } else if filterRPCRequest.Request != nil && wf.isFullNode {
// We're on a full node. // We're on a full node.
// This is a filter request coming from a light node. // This is a filter request coming from a light node.
if filterRPCRequest.Request.Subscribe { if filterRPCRequest.Request.Subscribe {
@ -197,10 +198,13 @@ func (wf *WakuFilter) onRequest(s network.Stream) {
stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers)))) stats.Record(wf.ctx, metrics.FilterSubscriptions.M(int64(len(wf.subscribers))))
} }
} else {
log.Error("can't serve request")
return
} }
} }
func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandler) *WakuFilter { func NewWakuFilter(ctx context.Context, host host.Host, isFullNode bool, handler MessagePushHandler) *WakuFilter {
ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter")) ctx, err := tag.New(ctx, tag.Insert(metrics.KeyType, "filter"))
if err != nil { if err != nil {
log.Error(err) log.Error(err)
@ -211,10 +215,17 @@ func NewWakuFilter(ctx context.Context, host host.Host, handler MessagePushHandl
wf.MsgC = make(chan *protocol.Envelope) wf.MsgC = make(chan *protocol.Envelope)
wf.h = host wf.h = host
wf.pushHandler = handler wf.pushHandler = handler
wf.isFullNode = isFullNode
wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest) wf.h.SetStreamHandlerMatch(FilterID_v20beta1, protocol.PrefixTextMatch(string(FilterID_v20beta1)), wf.onRequest)
go wf.FilterListener() go wf.FilterListener()
if wf.isFullNode {
log.Info("Filter protocol started")
} else {
log.Info("Filter protocol started (only client mode)")
}
return wf return wf
} }

View File

@ -42,6 +42,8 @@ func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay)
if relay != nil { if relay != nil {
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest)
log.Info("Light Push protocol started") log.Info("Light Push protocol started")
} else {
log.Info("Light Push protocol started (only client mode)")
} }
return wakuLP return wakuLP