2021-04-13 18:54:06 +00:00
package waku
2021-03-18 23:21:45 +00:00
import (
"context"
"crypto/rand"
2021-04-19 00:03:16 +00:00
"database/sql"
2021-03-18 23:21:45 +00:00
"encoding/hex"
2021-04-13 18:54:06 +00:00
"errors"
2021-03-18 23:21:45 +00:00
"fmt"
"net"
"os"
"os/signal"
"syscall"
2021-06-24 13:02:53 +00:00
"time"
2021-03-18 23:21:45 +00:00
2021-03-23 14:46:16 +00:00
"github.com/ethereum/go-ethereum/crypto"
2021-04-13 18:54:06 +00:00
dssql "github.com/ipfs/go-ds-sql"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
2021-04-22 13:07:22 +00:00
"github.com/multiformats/go-multiaddr"
2021-03-18 23:21:45 +00:00
"github.com/spf13/cobra"
"github.com/spf13/viper"
2021-06-28 13:20:23 +00:00
"github.com/status-im/go-waku/waku/metrics"
2021-04-13 18:54:06 +00:00
"github.com/status-im/go-waku/waku/persistence"
"github.com/status-im/go-waku/waku/persistence/sqlite"
2021-03-18 23:21:45 +00:00
"github.com/status-im/go-waku/waku/v2/node"
2021-04-28 20:10:44 +00:00
"github.com/status-im/go-waku/waku/v2/protocol/relay"
2021-03-18 23:21:45 +00:00
)
2021-04-13 18:54:06 +00:00
// log is the package-level logger, registered under the name "wakunode".
var log = logging.Logger("wakunode")
2021-03-18 23:21:45 +00:00
// randomHex reads n bytes from crypto/rand and returns them hex-encoded,
// so the resulting string is 2*n characters long. The error from the random
// source is returned unchanged, together with an empty string.
func randomHex(n int) (string, error) {
	buf := make([]byte, n)
	_, err := rand.Read(buf)
	if err != nil {
		return "", err
	}
	encoded := hex.EncodeToString(buf)
	return encoded, nil
}
2021-04-13 18:54:06 +00:00
// checkError does nothing when err is nil. Otherwise it passes msg and err to
// log.Fatal; a non-empty msg is suffixed with ": " first so the two read as
// "msg: err".
func checkError(err error, msg string) {
	if err == nil {
		return
	}

	prefix := msg
	if prefix != "" {
		prefix += ": "
	}
	log.Fatal(prefix, err)
}
2021-03-18 23:21:45 +00:00
var rootCmd = & cobra . Command {
Use : "waku" ,
Short : "Start a waku node" ,
Long : ` Start a waku node... ` ,
// Uncomment the following line if your bare application
// has an action associated with it:
Run : func ( cmd * cobra . Command , args [ ] string ) {
port , _ := cmd . Flags ( ) . GetInt ( "port" )
2021-04-22 13:07:22 +00:00
enableWs , _ := cmd . Flags ( ) . GetBool ( "ws" )
wsPort , _ := cmd . Flags ( ) . GetInt ( "ws-port" )
2021-04-28 20:10:44 +00:00
wakuRelay , _ := cmd . Flags ( ) . GetBool ( "relay" )
2021-06-10 12:59:51 +00:00
wakuFilter , _ := cmd . Flags ( ) . GetBool ( "filter" )
2021-03-18 23:21:45 +00:00
key , _ := cmd . Flags ( ) . GetString ( "nodekey" )
store , _ := cmd . Flags ( ) . GetBool ( "store" )
2021-04-19 00:03:16 +00:00
useDB , _ := cmd . Flags ( ) . GetBool ( "use-db" )
2021-04-12 17:59:41 +00:00
dbPath , _ := cmd . Flags ( ) . GetString ( "dbpath" )
2021-03-18 23:21:45 +00:00
storenode , _ := cmd . Flags ( ) . GetString ( "storenode" )
staticnodes , _ := cmd . Flags ( ) . GetStringSlice ( "staticnodes" )
2021-06-28 14:14:28 +00:00
filternodes , _ := cmd . Flags ( ) . GetStringSlice ( "filternodes" )
lightpush , _ := cmd . Flags ( ) . GetBool ( "lightpush" )
lightpushnodes , _ := cmd . Flags ( ) . GetStringSlice ( "lightpushnodes" )
2021-04-15 21:23:07 +00:00
topics , _ := cmd . Flags ( ) . GetStringSlice ( "topics" )
2021-06-24 13:02:53 +00:00
keepAlive , _ := cmd . Flags ( ) . GetInt ( "keep-alive" )
2021-06-28 13:20:23 +00:00
enableMetrics , _ := cmd . Flags ( ) . GetBool ( "metrics" )
metricsAddress , _ := cmd . Flags ( ) . GetString ( "metrics-address" )
metricsPort , _ := cmd . Flags ( ) . GetInt ( "metrics-port" )
2021-03-18 23:21:45 +00:00
2021-03-30 14:13:33 +00:00
hostAddr , _ := net . ResolveTCPAddr ( "tcp" , fmt . Sprint ( "0.0.0.0:" , port ) )
2021-03-18 23:21:45 +00:00
2021-04-13 18:54:06 +00:00
var err error
2021-03-18 23:21:45 +00:00
if key == "" {
key , err = randomHex ( 32 )
2021-06-24 13:02:53 +00:00
checkError ( err , "could not generate random key" )
2021-03-18 23:21:45 +00:00
}
2021-03-23 14:46:16 +00:00
prvKey , err := crypto . HexToECDSA ( key )
2021-06-24 13:02:53 +00:00
checkError ( err , "error converting key into valid ecdsa key" )
2021-03-18 23:21:45 +00:00
2021-04-19 00:03:16 +00:00
if dbPath == "" && useDB {
2021-04-13 18:54:06 +00:00
checkError ( errors . New ( "dbpath can't be null" ) , "" )
2021-03-18 23:21:45 +00:00
}
2021-04-19 00:03:16 +00:00
var db * sql . DB
2021-04-13 18:54:06 +00:00
2021-04-19 00:03:16 +00:00
if useDB {
db , err = sqlite . NewDB ( dbPath )
checkError ( err , "Could not connect to DB" )
}
2021-04-13 18:54:06 +00:00
2021-04-19 00:03:16 +00:00
ctx := context . Background ( )
2021-04-13 18:54:06 +00:00
2021-06-28 13:20:23 +00:00
var metricsServer * metrics . Server
if enableMetrics {
metricsServer = metrics . NewMetricsServer ( metricsAddress , metricsPort )
go metricsServer . Start ( )
}
2021-04-18 23:41:42 +00:00
nodeOpts := [ ] node . WakuNodeOption {
node . WithPrivateKey ( prvKey ) ,
node . WithHostAddress ( [ ] net . Addr { hostAddr } ) ,
2021-06-24 13:02:53 +00:00
node . WithKeepAlive ( time . Duration ( keepAlive ) * time . Second ) ,
2021-04-18 23:41:42 +00:00
}
2021-04-13 18:54:06 +00:00
2021-04-22 13:07:22 +00:00
if enableWs {
wsMa , _ := multiaddr . NewMultiaddr ( fmt . Sprintf ( "/ip4/0.0.0.0/tcp/%d/ws" , wsPort ) )
nodeOpts = append ( nodeOpts , node . WithMultiaddress ( [ ] multiaddr . Multiaddr { wsMa } ) )
}
2021-04-19 00:03:16 +00:00
libp2pOpts := node . DefaultLibP2POptions
if useDB {
// Create persistent peerstore
queries , err := sqlite . NewQueries ( "peerstore" , db )
checkError ( err , "Peerstore" )
datastore := dssql . NewDatastore ( db , queries )
opts := pstoreds . DefaultOpts ( )
peerStore , err := pstoreds . NewPeerstore ( ctx , datastore , opts )
checkError ( err , "Peerstore" )
libp2pOpts = append ( libp2pOpts , libp2p . Peerstore ( peerStore ) )
}
nodeOpts = append ( nodeOpts , node . WithLibP2POptions ( libp2pOpts ... ) )
2021-04-28 20:10:44 +00:00
if wakuRelay {
2021-04-18 23:41:42 +00:00
nodeOpts = append ( nodeOpts , node . WithWakuRelay ( ) )
2021-03-18 23:21:45 +00:00
}
2021-06-10 12:59:51 +00:00
if wakuFilter {
nodeOpts = append ( nodeOpts , node . WithWakuFilter ( ) )
}
2021-04-13 18:54:06 +00:00
if store {
2021-04-18 23:41:42 +00:00
nodeOpts = append ( nodeOpts , node . WithWakuStore ( true ) )
2021-04-19 00:03:16 +00:00
if useDB {
dbStore , err := persistence . NewDBStore ( persistence . WithDB ( db ) )
checkError ( err , "DBStore" )
nodeOpts = append ( nodeOpts , node . WithMessageProvider ( dbStore ) )
} else {
nodeOpts = append ( nodeOpts , node . WithMessageProvider ( nil ) )
}
2021-04-18 23:41:42 +00:00
}
2021-04-13 18:54:06 +00:00
2021-06-28 14:14:28 +00:00
if lightpush {
nodeOpts = append ( nodeOpts , node . WithLightPush ( ) )
}
2021-04-18 23:41:42 +00:00
wakuNode , err := node . New ( ctx , nodeOpts ... )
2021-04-12 17:59:41 +00:00
2021-04-18 23:41:42 +00:00
checkError ( err , "Wakunode" )
2021-03-18 23:21:45 +00:00
2021-04-15 21:23:07 +00:00
for _ , t := range topics {
2021-04-28 20:10:44 +00:00
nodeTopic := relay . Topic ( t )
2021-04-15 21:23:07 +00:00
_ , err := wakuNode . Subscribe ( & nodeTopic )
checkError ( err , "Error subscring to topic" )
}
2021-03-18 23:21:45 +00:00
if storenode != "" && ! store {
2021-04-13 18:54:06 +00:00
checkError ( errors . New ( "Store protocol was not started" ) , "" )
2021-03-18 23:21:45 +00:00
} else {
if storenode != "" {
2021-04-18 23:41:42 +00:00
_ , err = wakuNode . AddStorePeer ( storenode )
checkError ( err , "Error adding store peer" )
2021-03-18 23:21:45 +00:00
}
}
if len ( staticnodes ) > 0 {
for _ , n := range staticnodes {
2021-07-29 12:40:54 +00:00
go func ( node string ) {
err = wakuNode . DialPeer ( node )
checkError ( err , "Error dialing peer" )
} ( n )
2021-03-18 23:21:45 +00:00
}
}
2021-06-28 14:14:28 +00:00
if len ( lightpushnodes ) > 0 && ! lightpush {
checkError ( errors . New ( "LightPush protocol was not started" ) , "" )
} else {
if len ( lightpushnodes ) > 0 {
for _ , n := range lightpushnodes {
2021-07-29 12:40:54 +00:00
go func ( node string ) {
_ , err = wakuNode . AddLightPushPeer ( node )
checkError ( err , "Error adding lightpush peer" )
} ( n )
2021-06-28 14:14:28 +00:00
}
}
}
if len ( filternodes ) > 0 && ! wakuFilter {
checkError ( errors . New ( "WakuFilter protocol was not started" ) , "" )
} else {
if len ( filternodes ) > 0 {
for _ , n := range filternodes {
2021-07-29 12:40:54 +00:00
go func ( node string ) {
_ , err = wakuNode . AddFilterPeer ( node )
checkError ( err , "Error adding filter peer" )
} ( n )
2021-06-28 14:14:28 +00:00
}
}
}
2021-03-18 23:21:45 +00:00
// Wait for a SIGINT or SIGTERM signal
ch := make ( chan os . Signal , 1 )
signal . Notify ( ch , syscall . SIGINT , syscall . SIGTERM )
<- ch
fmt . Println ( "\n\n\nReceived signal, shutting down..." )
// shut the node down
2021-03-22 16:45:13 +00:00
wakuNode . Stop ( )
2021-04-19 00:03:16 +00:00
2021-06-28 13:20:23 +00:00
if enableMetrics {
metricsServer . Stop ( ctx )
}
2021-04-19 00:03:16 +00:00
if useDB {
err = db . Close ( )
checkError ( err , "DBClose" )
}
2021-03-18 23:21:45 +00:00
} ,
}
// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute ( ) {
cobra . CheckErr ( rootCmd . Execute ( ) )
}
func init ( ) {
cobra . OnInitialize ( initConfig )
rootCmd . Flags ( ) . Int ( "port" , 9000 , "Libp2p TCP listening port (0 for random)" )
2021-04-22 13:07:22 +00:00
rootCmd . Flags ( ) . Bool ( "ws" , false , "Enable websockets support" )
rootCmd . Flags ( ) . Int ( "ws-port" , 9001 , "Libp2p TCP listening port for websocket connection (0 for random)" )
2021-03-18 23:21:45 +00:00
rootCmd . Flags ( ) . String ( "nodekey" , "" , "P2P node private key as hex (default random)" )
2021-04-28 20:10:44 +00:00
rootCmd . Flags ( ) . StringSlice ( "topics" , [ ] string { string ( relay . DefaultWakuTopic ) } , fmt . Sprintf ( "List of topics to listen (default %s)" , relay . DefaultWakuTopic ) )
2021-03-18 23:21:45 +00:00
rootCmd . Flags ( ) . StringSlice ( "staticnodes" , [ ] string { } , "Multiaddr of peer to directly connect with. Argument may be repeated" )
2021-04-22 13:07:22 +00:00
rootCmd . Flags ( ) . Bool ( "relay" , true , "Enable relay protocol" )
2021-06-10 12:59:51 +00:00
rootCmd . Flags ( ) . Bool ( "filter" , true , "Enable filter protocol" )
2021-03-18 23:21:45 +00:00
rootCmd . Flags ( ) . Bool ( "store" , false , "Enable store protocol" )
2021-06-28 14:14:28 +00:00
rootCmd . Flags ( ) . Bool ( "lightpush" , false , "Enable lightpush protocol" )
2021-04-19 00:03:16 +00:00
rootCmd . Flags ( ) . Bool ( "use-db" , true , "Store messages and peers in a DB, (default: true, use false for in-memory only)" )
2021-04-12 17:59:41 +00:00
rootCmd . Flags ( ) . String ( "dbpath" , "./store.db" , "Path to DB file" )
2021-03-18 23:21:45 +00:00
rootCmd . Flags ( ) . String ( "storenode" , "" , "Multiaddr of peer to connect with for waku store protocol" )
2021-06-24 13:02:53 +00:00
rootCmd . Flags ( ) . Int ( "keep-alive" , 300 , "interval in seconds for pinging peers to keep the connection alive." )
2021-06-28 13:20:23 +00:00
rootCmd . Flags ( ) . StringSlice ( "filternodes" , [ ] string { } , "Multiaddr of peers to to request content filtering of messages. Argument may be repeated" )
rootCmd . Flags ( ) . StringSlice ( "lightpushnodes" , [ ] string { } , "Multiaddr of peers to to request lightpush of published messages. Argument may be repeated" )
rootCmd . Flags ( ) . Bool ( "metrics" , false , "Enable the metrics server" )
rootCmd . Flags ( ) . String ( "metrics-address" , "127.0.0.1" , "Listening address of the metrics server" )
rootCmd . Flags ( ) . Int ( "metrics-port" , 8008 , "Listening HTTP port of the metrics server" )
2021-03-18 23:21:45 +00:00
}
// initConfig is the initializer passed to cobra.OnInitialize in init. Its only
// action is to turn on viper's automatic environment variable lookup.
//
// NOTE(review): the root command reads its settings through cmd.Flags() and no
// viper flag binding is visible in this file — confirm the environment
// variables are actually consumed somewhere.
func initConfig() {
	// read in environment variables that match
	viper.AutomaticEnv()
}