diff --git a/waku/node.go b/waku/node.go index 635509f3..e057215a 100644 --- a/waku/node.go +++ b/waku/node.go @@ -337,10 +337,10 @@ func loadPrivateKeyFromFile(path string) (*ecdsa.PrivateKey, error) { return privKey, nil } -func writePrivateKeyToFile(path string, force bool) error { +func checkForPrivateKeyFile(path string, overwrite bool) error { _, err := os.Stat(path) - if err == nil && !force { + if err == nil && !overwrite { return fmt.Errorf("%s already exists. Use --overwrite to overwrite the file", path) } @@ -348,21 +348,38 @@ func writePrivateKeyToFile(path string, force bool) error { return err } + return nil +} + +func generatePrivateKey() ([]byte, error) { key, err := crypto.GenerateKey() if err != nil { - return err + return nil, err } privKey := libp2pcrypto.PrivKey((*libp2pcrypto.Secp256k1PrivateKey)(key)) b, err := privKey.Raw() if err != nil { - return err + return nil, err } output := make([]byte, hex.EncodedLen(len(b))) hex.Encode(output, b) + return output, nil +} + +func writePrivateKeyToFile(path string, overwrite bool) error { + if err := checkForPrivateKeyFile(path, overwrite); err != nil { + return err + } + + output, err := generatePrivateKey() + if err != nil { + return err + } + return ioutil.WriteFile(path, output, 0600) } diff --git a/waku/v2/node/keepalive.go b/waku/v2/node/keepalive.go new file mode 100644 index 00000000..a5c5e979 --- /dev/null +++ b/waku/v2/node/keepalive.go @@ -0,0 +1,77 @@ +package node + +import ( + "context" + "fmt" + "time" + + "github.com/libp2p/go-libp2p-core/network" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" +) + +const maxAllowedPingFailures = 2 +const maxPublishAttempt = 5 + +// startKeepAlive creates a go routine that periodically pings connected peers. +// This is necessary because TCP connections are automatically closed due to inactivity, +// and doing a ping will avoid this (with a small bandwidth cost) +func (w *WakuNode) startKeepAlive(t time.Duration) { + go func() { + defer w.wg.Done() + log.Info("Setting up ping protocol with duration of ", t) + ticker := time.NewTicker(t) + defer ticker.Stop() + for { + select { + case <-ticker.C: + // Compared to Network's peers collection, + // Peerstore contains all peers ever connected to, + // thus if a host goes down and back again, + // pinging a peer will trigger identification process, + // which is not possible when iterating + // through Network's peer collection, as it will be empty + for _, p := range w.host.Peerstore().Peers() { + if p != w.host.ID() { + w.wg.Add(1) + go w.pingPeer(p) + } + } + case <-w.quit: + return + } + } + }() +} + +func (w *WakuNode) pingPeer(peer peer.ID) { + w.keepAliveMutex.Lock() + defer w.keepAliveMutex.Unlock() + defer w.wg.Done() + + ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second) + defer cancel() + + log.Debug("Pinging ", peer) + pr := ping.Ping(ctx, w.host, peer) + select { + case res := <-pr: + if res.Error != nil { + w.keepAliveFails[peer]++ + log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) + } else { + w.keepAliveFails[peer] = 0 + } + case <-ctx.Done(): + w.keepAliveFails[peer]++ + log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) + } + + if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected { + log.Info("Disconnecting peer ", peer) + if err := w.host.Network().ClosePeer(peer); err != nil { + log.Debug(fmt.Sprintf("Could not close conn to peer %s: %s", peer, err)) + } + w.keepAliveFails[peer] = 0 + } +} diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index c0410651..1fe175fa 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -20,7 +20,6 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" p2pproto "github.com/libp2p/go-libp2p-core/protocol" pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/p2p/protocol/ping" ma "github.com/multiformats/go-multiaddr" "go.opencensus.io/stats" @@ -40,11 +39,6 @@ import ( var log = logging.Logger("wakunode") -const maxAllowedPingFailures = 2 -const maxPublishAttempt = 5 - -type Message []byte - type Peer struct { ID peer.ID Protocols []string @@ -455,13 +449,9 @@ func (w *WakuNode) mountDiscV5() error { return err } - discoveryV5, err := discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, w.wakuFlag, discV5Options...) - if err != nil { - return err - } + w.discoveryV5, err = discv5.NewDiscoveryV5(w.Host(), net.ParseIP(ipStr), port, w.opts.privKey, w.wakuFlag, discV5Options...) - w.discoveryV5 = discoveryV5 - return nil + return err } func (w *WakuNode) mountRendezvous() error { @@ -631,66 +621,3 @@ func (w *WakuNode) Peers() ([]*Peer, error) { } return peers, nil } - -// startKeepAlive creates a go routine that periodically pings connected peers. -// This is necessary because TCP connections are automatically closed due to inactivity, -// and doing a ping will avoid this (with a small bandwidth cost) -func (w *WakuNode) startKeepAlive(t time.Duration) { - go func() { - defer w.wg.Done() - log.Info("Setting up ping protocol with duration of ", t) - ticker := time.NewTicker(t) - defer ticker.Stop() - for { - select { - case <-ticker.C: - // Compared to Network's peers collection, - // Peerstore contains all peers ever connected to, - // thus if a host goes down and back again, - // pinging a peer will trigger identification process, - // which is not possible when iterating - // through Network's peer collection, as it will be empty - for _, p := range w.host.Peerstore().Peers() { - if p != w.host.ID() { - w.wg.Add(1) - go w.pingPeer(p) - } - } - case <-w.quit: - return - } - } - }() -} - -func (w *WakuNode) pingPeer(peer peer.ID) { - w.keepAliveMutex.Lock() - defer w.keepAliveMutex.Unlock() - defer w.wg.Done() - - ctx, cancel := context.WithTimeout(w.ctx, 3*time.Second) - defer cancel() - - log.Debug("Pinging ", peer) - pr := ping.Ping(ctx, w.host, peer) - select { - case res := <-pr: - if res.Error != nil { - w.keepAliveFails[peer]++ - log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, res.Error.Error())) - } else { - w.keepAliveFails[peer] = 0 - } - case <-ctx.Done(): - w.keepAliveFails[peer]++ - log.Debug(fmt.Sprintf("Could not ping %s: %s", peer, ctx.Err())) - } - - if w.keepAliveFails[peer] > maxAllowedPingFailures && w.host.Network().Connectedness(peer) == network.Connected { - log.Info("Disconnecting peer ", peer) - if err := w.host.Network().ClosePeer(peer); err != nil { - log.Debug(fmt.Sprintf("Could not close conn to peer %s: %s", peer, err)) - } - w.keepAliveFails[peer] = 0 - } -} diff --git a/waku/v2/protocol/store/waku_store.go b/waku/v2/protocol/store/waku_store.go index bef39de6..52efee9b 100644 --- a/waku/v2/protocol/store/waku_store.go +++ b/waku/v2/protocol/store/waku_store.go @@ -684,27 +684,21 @@ func (store *WakuStore) Resume(ctx context.Context, pubsubTopic string, peerList Direction: pb.PagingInfo_BACKWARD, }, } - var response *pb.HistoryResponse - if len(peerList) > 0 { - var err error - response, err = store.queryLoop(ctx, rpc, peerList) - if err != nil { - log.Error("failed to resume history", err) - return -1, ErrFailedToResumeHistory - } - } else { - p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3)) + if len(peerList) == 0 { + p, err := utils.SelectPeer(store.h, string(StoreID_v20beta3)) if err != nil { log.Info("Error selecting peer: ", err) return -1, ErrNoPeersAvailable } - response, err = store.queryFrom(ctx, rpc, *p, protocol.GenerateRequestId()) - if err != nil { - log.Error("failed to resume history", err) - return -1, ErrFailedToResumeHistory - } + peerList = append(peerList, *p) + } + + response, err := store.queryLoop(ctx, rpc, peerList) + if err != nil { + log.Error("failed to resume history", err) + return -1, ErrFailedToResumeHistory } for _, msg := range response.Messages {