From 2b225e90e71e9379b216f08b08b4e99ea50b768c Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Mon, 6 Dec 2021 09:43:00 +0100 Subject: [PATCH] feat: Implement logic for publish from node --- waku/node.go | 2 +- waku/options.go | 7 +- waku/try/try.go | 36 ++++++++ waku/try/try_test.go | 89 +++++++++++++++++++ waku/v2/node/wakunode2.go | 37 +++++++- waku/v2/node/wakuoptions.go | 12 +++ waku/v2/protocol/filter/waku_filter_test.go | 2 +- waku/v2/protocol/lightpush/waku_lightpush.go | 8 ++ .../protocol/lightpush/waku_lightpush_test.go | 2 +- waku/v2/protocol/relay/waku_relay.go | 17 +++- waku/v2/protocol/relay/waku_relay_test.go | 2 +- waku/v2/rpc/admin_test.go | 2 +- waku/v2/rpc/filter_test.go | 2 +- waku/v2/rpc/relay_test.go | 2 +- 14 files changed, 206 insertions(+), 14 deletions(-) create mode 100644 waku/try/try.go create mode 100644 waku/try/try_test.go diff --git a/waku/node.go b/waku/node.go index 2941cedc..857a3741 100644 --- a/waku/node.go +++ b/waku/node.go @@ -162,7 +162,7 @@ func Execute(options Options) { if !options.Relay.Disable { var wakurelayopts []pubsub.Option wakurelayopts = append(wakurelayopts, pubsub.WithPeerExchange(options.Relay.PeerExchange)) - nodeOpts = append(nodeOpts, node.WithWakuRelay(wakurelayopts...)) + nodeOpts = append(nodeOpts, node.WithWakuRelayAndMinPeers(options.Relay.MinRelayPeersToPublish, wakurelayopts...)) } if options.RendezvousServer.Enable { diff --git a/waku/options.go b/waku/options.go index ff226546..d34359ca 100644 --- a/waku/options.go +++ b/waku/options.go @@ -20,9 +20,10 @@ type DiscV5Options struct { } type RelayOptions struct { - Disable bool `long:"no-relay" description:"Disable relay protocol"` - Topics []string `long:"topics" description:"List of topics to listen"` - PeerExchange bool `long:"peer-exchange" description:"Enable GossipSub Peer Exchange"` + Disable bool `long:"no-relay" description:"Disable relay protocol"` + Topics []string `long:"topics" description:"List of topics to listen"` + PeerExchange bool `long:"peer-exchange" description:"Enable GossipSub Peer Exchange"` + MinRelayPeersToPublish int `long:"min-relay-peers-to-publish" description:"Minimum number of peers to publish to Relay" default:"1"` } type FilterOptions struct { diff --git a/waku/try/try.go b/waku/try/try.go new file mode 100644 index 00000000..69c791db --- /dev/null +++ b/waku/try/try.go @@ -0,0 +1,36 @@ +package try + +import "errors" + +// MaxRetries is the maximum number of retries before bailing. +var MaxRetries = 10 + +var errMaxRetriesReached = errors.New("exceeded retry limit") + +// Func represents functions that can be retried. +type Func func(attempt int) (retry bool, err error) + +// Do keeps trying the function until the second argument +// returns false, or no error is returned. +func Do(fn Func) error { + var err error + var cont bool + attempt := 1 + for { + cont, err = fn(attempt) + if !cont || err == nil { + break + } + attempt++ + if attempt > MaxRetries { + return errMaxRetriesReached + } + } + return err +} + +// IsMaxRetries checks whether the error is due to hitting the +// maximum number of retries or not. +func IsMaxRetries(err error) bool { + return err == errMaxRetriesReached +} diff --git a/waku/try/try_test.go b/waku/try/try_test.go new file mode 100644 index 00000000..828a95a0 --- /dev/null +++ b/waku/try/try_test.go @@ -0,0 +1,89 @@ +package try + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTry(t *testing.T) { + MaxRetries = 20 + SomeFunction := func() (string, error) { + return "", nil + } + err := Do(func(attempt int) (bool, error) { + var err error + _, err = SomeFunction() + return attempt < 5, err // try 5 times + }) + require.NoError(t, err) +} + +func TestTryPanic(t *testing.T) { + SomeFunction := func() (string, error) { + panic("something went badly wrong") + } + err := Do(func(attempt int) (retry bool, err error) { + retry = attempt < 5 // try 5 times + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + _, err = SomeFunction() + return + }) + require.Error(t, err) +} + +func TestTryDoSuccessful(t *testing.T) { + callCount := 0 + err := Do(func(attempt int) (bool, error) { + callCount++ + return attempt < 5, nil + }) + require.NoError(t, err) + require.Equal(t, callCount, 1) +} + +func TestTryDoFailed(t *testing.T) { + wrongErr := errors.New("something went wrong") + callCount := 0 + err := Do(func(attempt int) (bool, error) { + callCount++ + return attempt < 5, wrongErr + }) + require.Equal(t, err, wrongErr) + require.Equal(t, callCount, 5) +} + +func TestTryPanics(t *testing.T) { + wrongErr := errors.New("something went wrong") + callCount := 0 + err := Do(func(attempt int) (retry bool, err error) { + retry = attempt < 5 + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic: %v", r) + } + }() + callCount++ + if attempt > 2 { + panic("I don't like three") + } + err = wrongErr + return + }) + require.Equal(t, err.Error(), "panic: I don't like three") + require.Equal(t, callCount, 5) +} + +func TestRetryLimit(t *testing.T) { + err := Do(func(attempt int) (bool, error) { + return true, errors.New("nope") + }) + require.Error(t, err) + require.Equal(t, IsMaxRetries(err), true) +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 132b956b..788b5618 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -2,12 +2,14 @@ package node import ( "context" + "errors" "fmt" "net" "strconv" "sync" "time" + "github.com/ethereum/go-ethereum/common/hexutil" logging "github.com/ipfs/go-log" "github.com/libp2p/go-libp2p" @@ -23,11 +25,13 @@ import ( "go.opencensus.io/stats" rendezvous "github.com/status-im/go-waku-rendezvous" + "github.com/status-im/go-waku/waku/try" v2 "github.com/status-im/go-waku/waku/v2" "github.com/status-im/go-waku/waku/v2/discv5" "github.com/status-im/go-waku/waku/v2/metrics" "github.com/status-im/go-waku/waku/v2/protocol/filter" "github.com/status-im/go-waku/waku/v2/protocol/lightpush" + "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/relay" "github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/protocol/swap" @@ -37,6 +41,7 @@ import ( var log = logging.Logger("wakunode") const maxAllowedPingFailures = 2 +const maxPublishAttempt = 5 type Message []byte @@ -273,7 +278,7 @@ func (w *WakuNode) Start() error { w.opts.wOpts = append(w.opts.wOpts, pubsub.WithDiscovery(w.discoveryV5, w.opts.discV5Opts...)) } - err := w.mountRelay(w.opts.wOpts...) + err := w.mountRelay(w.opts.minRelayPeersToPublish, w.opts.wOpts...) if err != nil { return err } @@ -377,9 +382,35 @@ func (w *WakuNode) Broadcaster() v2.Broadcaster { return w.bcaster } -func (w *WakuNode) mountRelay(opts ...pubsub.Option) error { +func (w *WakuNode) Publish(ctx context.Context, msg *pb.WakuMessage) error { + if !w.opts.enableLightPush && !w.opts.enableRelay { + return errors.New("cannot publish message, relay and lightpush are disabled") + } + + hash, _ := msg.Hash() + err := try.Do(func(attempt int) (bool, error) { + var err error + if !w.relay.EnoughPeersToPublish() { + if !w.lightPush.IsStarted() { + err = errors.New("not enought peers for relay and lightpush is not yet started") + } else { + log.Debug("publishing message via lightpush", hexutil.Encode(hash)) + _, err = w.Lightpush().Publish(ctx, msg) + } + } else { + log.Debug("publishing message via relay", hexutil.Encode(hash)) + _, err = w.Relay().Publish(ctx, msg) + } + + return attempt < maxPublishAttempt, err + }) + + return err +} + +func (w *WakuNode) mountRelay(minRelayPeersToPublish int, opts ...pubsub.Option) error { var err error - w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, opts...) + w.relay, err = relay.NewWakuRelay(w.ctx, w.host, w.bcaster, minRelayPeersToPublish, opts...) if err != nil { return err } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 5956997f..f074bff2 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -23,6 +23,9 @@ import ( // Default clientId const clientId string = "Go Waku v2 node" +// Default minRelayPeersToPublish +const defaultMinRelayPeersToPublish = 1 + type WakuNodeParameters struct { hostAddr *net.TCPAddr advertiseAddr *net.IP @@ -36,6 +39,8 @@ type WakuNodeParameters struct { isFilterFullNode bool wOpts []pubsub.Option + minRelayPeersToPublish int + enableStore bool shouldResume bool storeMsgs bool @@ -157,9 +162,16 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption { // WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption // accepts a list of WakuRelay gossipsub option to setup the protocol func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption { + return WithWakuRelayAndMinPeers(defaultMinRelayPeersToPublish, opts...) +} + +// WithWakuRelayAndMinPeers enables the Waku V2 Relay protocol. This WakuNodeOption +// accepts a min peers require to publish and a list of WakuRelay gossipsub option to setup the protocol +func WithWakuRelayAndMinPeers(minRelayPeersToPublish int, opts ...pubsub.Option) WakuNodeOption { return func(params *WakuNodeParameters) error { params.enableRelay = true params.wOpts = opts + params.minRelayPeersToPublish = minRelayPeersToPublish return nil } } diff --git a/waku/v2/protocol/filter/waku_filter_test.go b/waku/v2/protocol/filter/waku_filter_test.go index 612e54f1..ef3e4a00 100644 --- a/waku/v2/protocol/filter/waku_filter_test.go +++ b/waku/v2/protocol/filter/waku_filter_test.go @@ -22,7 +22,7 @@ func makeWakuRelay(t *testing.T, topic string, broadcaster v2.Broadcaster) (*rel host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster) + relay, err := relay.NewWakuRelay(context.Background(), host, broadcaster, 0) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) diff --git a/waku/v2/protocol/lightpush/waku_lightpush.go b/waku/v2/protocol/lightpush/waku_lightpush.go index 4f521d84..e83f50dd 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush.go +++ b/waku/v2/protocol/lightpush/waku_lightpush.go @@ -31,6 +31,8 @@ type WakuLightPush struct { h host.Host relay *relay.WakuRelay ctx context.Context + + started bool } func NewWakuLightPush(ctx context.Context, h host.Host, relay *relay.WakuRelay) *WakuLightPush { @@ -49,6 +51,7 @@ func (wakuLP *WakuLightPush) Start() error { wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest) log.Info("Light Push protocol started") + wakuLP.started = true return nil } @@ -178,8 +181,13 @@ func (wakuLP *WakuLightPush) request(ctx context.Context, req *pb.PushRequest, o return pushResponseRPC.Response, nil } +func (wakuLP *WakuLightPush) IsStarted() bool { + return wakuLP.started +} + func (wakuLP *WakuLightPush) Stop() { wakuLP.h.RemoveStreamHandler(LightPushID_v20beta1) + wakuLP.started = false } func (wakuLP *WakuLightPush) PublishToTopic(ctx context.Context, message *pb.WakuMessage, topic string, opts ...LightPushOption) ([]byte, error) { diff --git a/waku/v2/protocol/lightpush/waku_lightpush_test.go b/waku/v2/protocol/lightpush/waku_lightpush_test.go index 12df018d..c928be4f 100644 --- a/waku/v2/protocol/lightpush/waku_lightpush_test.go +++ b/waku/v2/protocol/lightpush/waku_lightpush_test.go @@ -24,7 +24,7 @@ func makeWakuRelay(t *testing.T, topic string) (*relay.WakuRelay, *relay.Subscri host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) + relay, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0) require.NoError(t, err) sub, err := relay.SubscribeToTopic(context.Background(), topic) diff --git a/waku/v2/protocol/relay/waku_relay.go b/waku/v2/protocol/relay/waku_relay.go index 9bff7395..e61086d6 100644 --- a/waku/v2/protocol/relay/waku_relay.go +++ b/waku/v2/protocol/relay/waku_relay.go @@ -34,6 +34,8 @@ type WakuRelay struct { bcaster v2.Broadcaster + minPeersToPublish int + // TODO: convert to concurrent maps topicsMutex sync.Mutex wakuRelayTopics map[string]*pubsub.Topic @@ -50,13 +52,14 @@ func msgIdFn(pmsg *pubsub_pb.Message) string { return string(hash[:]) } -func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, opts ...pubsub.Option) (*WakuRelay, error) { +func NewWakuRelay(ctx context.Context, h host.Host, bcaster v2.Broadcaster, minPeersToPublish int, opts ...pubsub.Option) (*WakuRelay, error) { w := new(WakuRelay) w.host = h w.wakuRelayTopics = make(map[string]*pubsub.Topic) w.relaySubs = make(map[string]*pubsub.Subscription) w.subscriptions = make(map[string][]*Subscription) w.bcaster = bcaster + w.minPeersToPublish = minPeersToPublish // default options required by WakuRelay opts = append(opts, pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) @@ -153,6 +156,10 @@ func (w *WakuRelay) PublishToTopic(ctx context.Context, message *pb.WakuMessage, return nil, errors.New("message can't be null") } + if !w.EnoughPeersToPublishToTopic(topic) { + return nil, errors.New("not enougth peers to publish") + } + pubSubTopic, err := w.upsertTopic(topic) if err != nil { @@ -191,6 +198,14 @@ func (w *WakuRelay) Stop() { w.subscriptions = nil } +func (w *WakuRelay) EnoughPeersToPublish() bool { + return w.EnoughPeersToPublishToTopic(DefaultWakuTopic) +} + +func (w *WakuRelay) EnoughPeersToPublishToTopic(topic string) bool { + return len(w.PubSub().ListPeers(topic)) >= w.minPeersToPublish +} + func (w *WakuRelay) SubscribeToTopic(ctx context.Context, topic string) (*Subscription, error) { // Subscribes to a PubSub topic. // NOTE The data field SHOULD be decoded as a WakuMessage. diff --git a/waku/v2/protocol/relay/waku_relay_test.go b/waku/v2/protocol/relay/waku_relay_test.go index 3fc5b6b9..8021efd2 100644 --- a/waku/v2/protocol/relay/waku_relay_test.go +++ b/waku/v2/protocol/relay/waku_relay_test.go @@ -19,7 +19,7 @@ func TestWakuRelay(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := NewWakuRelay(context.Background(), host, nil) + relay, err := NewWakuRelay(context.Background(), host, nil, 0) defer relay.Stop() require.NoError(t, err) diff --git a/waku/v2/rpc/admin_test.go b/waku/v2/rpc/admin_test.go index 2b67fb29..5b46cc81 100644 --- a/waku/v2/rpc/admin_test.go +++ b/waku/v2/rpc/admin_test.go @@ -32,7 +32,7 @@ func TestV1Peers(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - relay, err := relay.NewWakuRelay(context.Background(), host, nil) + relay, err := relay.NewWakuRelay(context.Background(), host, nil, 0) require.NoError(t, err) defer relay.Stop() diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 36f5853f..c8cd669b 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -38,7 +38,7 @@ func TestFilterSubscription(t *testing.T) { host, err := tests.MakeHost(context.Background(), port, rand.Reader) require.NoError(t, err) - node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10)) + node, err := relay.NewWakuRelay(context.Background(), host, v2.NewBroadcaster(10), 0) require.NoError(t, err) _, err = node.SubscribeToTopic(context.Background(), testTopic) diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index a6d97669..8bcbcb6c 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -13,7 +13,7 @@ import ( ) func makeRelayService(t *testing.T) *RelayService { - options := node.WithWakuRelay() + options := node.WithWakuRelayAndMinPeers(0) n, err := node.New(context.Background(), options) require.NoError(t, err) err = n.Start()