Fix rendezvous loop when discovery protocol is stopped; add advertiseaddr flag (#1151)

This commit is contained in:
Adam Babik 2018-08-17 08:25:55 +02:00 committed by GitHub
parent 4afd9e6c6c
commit cf21f981f7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 60 additions and 6 deletions

View File

@ -62,6 +62,7 @@ var (
version = flag.Bool("version", false, "Print version")
listenAddr = flag.String("listenaddr", ":30303", "IP address and port of this node (e.g. 127.0.0.1:30303)")
advertiseAddr = flag.String("advertiseaddr", "", "IP address the node wants to reached with (useful if floating IP is used)")
fleet = flag.String("fleet", params.FleetBeta, "Name of the fleet like 'eth.staging' (default to 'eth.beta')")
standalone = flag.Bool("standalone", true, "Don't actively connect to peers, wait for incoming connections")
bootnodes = flag.String("bootnodes", "", "A list of bootnodes separated by comma")
@ -217,6 +218,7 @@ func makeNodeConfig() (*params.NodeConfig, error) {
}
nodeConfig.ListenAddr = *listenAddr
nodeConfig.AdvertiseAddr = *advertiseAddr
// TODO(divan): move this logic into params package
if *nodeKeyFile != "" {

View File

@ -18,6 +18,7 @@ import (
const (
registrationPeriod = 10 * time.Second
requestTimeout = 5 * time.Second
bucketSize = 10
)
@ -44,6 +45,11 @@ type Rendezvous struct {
mu sync.RWMutex
client *rendezvous.Client
// Root context is used to cancel running requests
// when Rendezvous is stopped.
rootCtx context.Context
cancelRootCtx context.CancelFunc
servers []ma.Multiaddr
registrationPeriod time.Duration
bucketSize int
@ -65,6 +71,7 @@ func (r *Rendezvous) Start() error {
return err
}
r.client = &client
r.rootCtx, r.cancelRootCtx = context.WithCancel(context.Background())
return nil
}
@ -72,13 +79,14 @@ func (r *Rendezvous) Start() error {
func (r *Rendezvous) Stop() error {
r.mu.Lock()
defer r.mu.Unlock()
r.cancelRootCtx()
r.client = nil
return nil
}
func (r *Rendezvous) register(topic string) {
srv := r.servers[rand.Intn(len(r.servers))]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
r.mu.RLock()
defer r.mu.RUnlock()
defer cancel()
@ -104,8 +112,8 @@ func (r *Rendezvous) Register(topic string, stop chan struct{}) error {
// Discover will search for new records every time period fetched from period channel.
func (r *Rendezvous) Discover(
topic string, period <-chan time.Duration,
found chan<- *discv5.Node, lookup chan<- bool) error {
topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool,
) error {
ticker := time.NewTicker(<-period)
for {
select {
@ -117,7 +125,7 @@ func (r *Rendezvous) Discover(
ticker = time.NewTicker(newPeriod)
case <-ticker.C:
srv := r.servers[rand.Intn(len(r.servers))]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
ctx, cancel := context.WithTimeout(r.rootCtx, requestTimeout)
r.mu.RLock()
records, err := r.client.Discover(ctx, srv, topic, r.bucketSize)
r.mu.RUnlock()

View File

@ -4,8 +4,10 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"path/filepath"
"reflect"
"sync"
"time"
@ -185,6 +187,19 @@ func (n *StatusNode) discoveryEnabled() bool {
return n.config != nil && (!n.config.NoDiscovery || n.config.Rendezvous) && n.config.ClusterConfig != nil
}
func (n *StatusNode) discoverNode() *discover.Node {
if !n.isRunning() {
return nil
}
discNode := n.gethNode.Server().Self()
if n.config.AdvertiseAddr != "" {
n.log.Info("using AdvertiseAddr for rendezvous", "addr", n.config.AdvertiseAddr)
discNode.IP = net.ParseIP(n.config.AdvertiseAddr)
}
return discNode
}
func (n *StatusNode) startRendezvous() (discovery.Discovery, error) {
if !n.config.Rendezvous {
return nil, errors.New("rendezvous is not enabled")
@ -200,8 +215,7 @@ func (n *StatusNode) startRendezvous() (discovery.Discovery, error) {
return nil, fmt.Errorf("failed to parse rendezvous node %s: %v", n.config.ClusterConfig.RendezvousNodes[0], err)
}
}
srv := n.gethNode.Server()
return discovery.NewRendezvous(maddrs, srv.PrivateKey, srv.Self())
return discovery.NewRendezvous(maddrs, n.gethNode.Server().PrivateKey, n.discoverNode())
}
func (n *StatusNode) startDiscovery() error {
@ -226,6 +240,12 @@ func (n *StatusNode) startDiscovery() error {
} else {
n.discovery = discoveries[0]
}
log.Debug(
"using discovery",
"instance", reflect.TypeOf(n.discovery),
"registerTopics", n.config.RegisterTopics,
"requireTopics", n.config.RequireTopics,
)
n.register = peers.NewRegister(n.discovery, n.config.RegisterTopics...)
options := peers.NewDefaultOptions()
// TODO(dshulyak) consider adding a flag to define this behaviour

View File

@ -3,6 +3,7 @@ package node
import (
"io/ioutil"
"math"
"net"
"os"
"path"
"reflect"
@ -283,3 +284,22 @@ func TestStatusNodeRendezvousDiscovery(t *testing.T) {
require.True(t, n.discovery.Running())
require.IsType(t, &discovery.Rendezvous{}, n.discovery)
}
func TestStatusNodeDiscoverNode(t *testing.T) {
config := params.NodeConfig{
NoDiscovery: true,
ListenAddr: "127.0.0.1:0",
}
n := New()
require.NoError(t, n.Start(&config))
require.Equal(t, net.ParseIP("127.0.0.1").To4(), n.discoverNode().IP)
config = params.NodeConfig{
NoDiscovery: true,
AdvertiseAddr: "127.0.0.2",
ListenAddr: "127.0.0.1:0",
}
n = New()
require.NoError(t, n.Start(&config))
require.Equal(t, net.ParseIP("127.0.0.2"), n.discoverNode().IP)
}

View File

@ -270,6 +270,10 @@ type NodeConfig struct {
// ListenAddr is an IP address and port of this node (e.g. 127.0.0.1:30303).
ListenAddr string
// AdvertiseAddr is a public IP address the node wants to be found with.
// It is especially useful when using floating IPs attached to a server.
AdvertiseAddr string
// Name sets the instance name of the node. It must not contain the / character.
Name string `validate:"excludes=/"`