diff --git a/examples/chat2/main.go b/examples/chat2/main.go index 9a2eb36c..e5a6fa66 100644 --- a/examples/chat2/main.go +++ b/examples/chat2/main.go @@ -30,12 +30,12 @@ func main() { nickFlag := flag.String("nick", "", "nickname to use in chat. will be generated if empty") fleetFlag := flag.String("fleet", "wakuv2.prod", "Select the fleet to connect to. (wakuv2.prod, wakuv2.test)") contentTopicFlag := flag.String("contenttopic", DefaultContentTopic, "content topic to use for the chat") - nodeKeyFlag := flag.String("nodekey", "", "private key for this node. will be generated if empty") - staticNodeFlag := flag.String("staticnode", "", "connects to a node. will get a random node from fleets.status.im if empty") - storeNodeFlag := flag.String("storenode", "", "connects to a store node to retrieve messages. will get a random node from fleets.status.im if empty") + nodeKeyFlag := flag.String("nodekey", "", "private key for this node. Will be generated if empty") + staticNodeFlag := flag.String("staticnode", "", "connects to a node. Will get a random node from fleets.status.im if empty") + storeNodeFlag := flag.String("storenode", "", "connects to a store node to retrieve messages. Will get a random node from fleets.status.im if empty") port := flag.Int("port", 0, "port. Will be random if 0") payloadV1Flag := flag.Bool("payloadV1", false, "use Waku v1 payload encoding/encryption. default false") - + keepAliveFlag := flag.Int64("keep-alive", 300, "interval in seconds for pinging peers to keep the connection alive.") flag.Parse() hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", *port)) @@ -63,6 +63,7 @@ func main() { node.WithHostAddress([]net.Addr{hostAddr}), node.WithWakuRelay(), node.WithWakuStore(false), + node.WithKeepAlive((*keepAliveFlag)*time.Second), ) if err != nil { fmt.Print(err) diff --git a/waku/node.go b/waku/node.go index 3e084395..ebae2da6 100644 --- a/waku/node.go +++ b/waku/node.go @@ -11,6 +11,7 @@ import ( "os" "os/signal" "syscall" + "time" "github.com/ethereum/go-ethereum/crypto" dssql "github.com/ipfs/go-ds-sql" @@ -65,6 +66,7 @@ var rootCmd = &cobra.Command{ storenode, _ := cmd.Flags().GetString("storenode") staticnodes, _ := cmd.Flags().GetStringSlice("staticnodes") topics, _ := cmd.Flags().GetStringSlice("topics") + keepAlive, _ := cmd.Flags().GetInt("keep-alive") hostAddr, _ := net.ResolveTCPAddr("tcp", fmt.Sprint("0.0.0.0:", port)) @@ -72,10 +74,11 @@ var rootCmd = &cobra.Command{ if key == "" { key, err = randomHex(32) - checkError(err, "Could not generate random key") + checkError(err, "could not generate random key") } prvKey, err := crypto.HexToECDSA(key) + checkError(err, "error converting key into valid ecdsa key") if dbPath == "" && useDB { checkError(errors.New("dbpath can't be null"), "") @@ -93,6 +96,7 @@ var rootCmd = &cobra.Command{ nodeOpts := []node.WakuNodeOption{ node.WithPrivateKey(prvKey), node.WithHostAddress([]net.Addr{hostAddr}), + node.WithKeepAlive(time.Duration(keepAlive) * time.Second), } if enableWs { @@ -199,6 +203,7 @@ func init() { rootCmd.Flags().Bool("use-db", true, "Store messages and peers in a DB, (default: true, use false for in-memory only)") rootCmd.Flags().String("dbpath", "./store.db", "Path to DB file") rootCmd.Flags().String("storenode", "", "Multiaddr of peer to connect with for waku store protocol") + rootCmd.Flags().Int("keep-alive", 300, "interval in seconds for pinging peers to keep the connection alive.") } func initConfig() { diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index a799235a..f1051b67 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/protocol" @@ -38,6 +39,8 @@ type WakuNode struct { filter *filter.WakuFilter lightPush *lightpush.WakuLightPush + ping *ping.PingService + subscriptions map[relay.Topic][]*Subscription subscriptionsMutex sync.Mutex @@ -47,6 +50,7 @@ type WakuNode struct { ctx context.Context cancel context.CancelFunc + quit chan struct{} } func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { @@ -84,6 +88,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.ctx = ctx w.subscriptions = make(map[relay.Topic][]*Subscription) w.opts = params + w.quit = make(chan struct{}) if params.enableRelay { err := w.mountRelay(params.wOpts...) @@ -111,6 +116,10 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { w.mountLightPush() } + if params.keepAliveInterval > time.Duration(0) { + w.startKeepAlive(params.keepAliveInterval) + } + for _, addr := range w.ListenAddresses() { log.Info("Listening on ", addr) } @@ -123,6 +132,8 @@ func (w *WakuNode) Stop() { defer w.subscriptionsMutex.Unlock() defer w.cancel() + close(w.quit) + for _, topic := range w.relay.Topics() { for _, sub := range w.subscriptions[topic] { sub.Unsubscribe() @@ -495,3 +506,25 @@ func (w *WakuNode) ClosePeerById(id peer.ID) error { func (w *WakuNode) PeerCount() int { return len(w.host.Network().Peers()) } + +func (w *WakuNode) startKeepAlive(t time.Duration) { + log.Info("Setting up ping protocol with duration of", t) + + w.ping = ping.NewPingService(w.host) + ticker := time.NewTicker(t) + go func() { + for { + select { + case <-ticker.C: + for _, peer := range w.host.Network().Peers() { + log.Info("Pinging", peer) + w.ping.Ping(w.ctx, peer) + } + case <-w.quit: + ticker.Stop() + return + } + } + }() + +} diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 946cf5d9..7c3a5703 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -3,6 +3,7 @@ package node import ( "crypto/ecdsa" "net" + "time" "github.com/libp2p/go-libp2p" connmgr "github.com/libp2p/go-libp2p-connmgr" @@ -28,6 +29,8 @@ type WakuNodeParameters struct { store *store.WakuStore filter *filter.WakuFilter + keepAliveInterval time.Duration + enableLightPush bool } @@ -130,6 +133,13 @@ func WithLightPush() WakuNodeOption { } } +func WithKeepAlive(t time.Duration) WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.keepAliveInterval = t + return nil + } +} + // Default options used in the libp2p node var DefaultLibP2POptions = []libp2p.Option{ libp2p.DefaultTransports,