Explicitly disable discovery (#886)
* Make it possible to explicitly disable discovery. Discovery will be disabled in the following cases: - if there are no bootnodes, the v5 server will be disabled because there is no point in running it - if the user set NoDiscovery=true in the config, this value will be preserved even if we have bootnodes. So, basically, discovery will always be enabled by default on mobile, unless it is explicitly specified otherwise. When statusd is used, the current behavior is that discovery is disabled by default. I kept that in this change, but it would be better to change it. * Fix leftovers * Add wait group to peer pool to protect from races with p2p.Server * Change fields only when all goroutines are finished * Turn off discovery after topic searches are stopped * Don't set period to nil to avoid race with SearchTopic * Close period chan only when all writers are finished
This commit is contained in:
parent
f7dac0bcc4
commit
375d5ec8c3
|
@ -261,7 +261,7 @@ func makeNodeConfig() (*params.NodeConfig, error) {
|
||||||
nodeConfig.ClusterConfig.BootNodes = nil
|
nodeConfig.ClusterConfig.BootNodes = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeConfig.Discovery = *discovery
|
nodeConfig.NoDiscovery = !(*discovery)
|
||||||
nodeConfig.RequireTopics = map[discv5.Topic]params.Limits(searchTopics)
|
nodeConfig.RequireTopics = map[discv5.Topic]params.Limits(searchTopics)
|
||||||
nodeConfig.RegisterTopics = []discv5.Topic(registerTopics)
|
nodeConfig.RegisterTopics = []discv5.Topic(registerTopics)
|
||||||
|
|
||||||
|
|
|
@ -114,8 +114,8 @@ func defaultEmbeddedNodeConfig(config *params.NodeConfig) *node.Config {
|
||||||
Name: config.Name,
|
Name: config.Name,
|
||||||
Version: config.Version,
|
Version: config.Version,
|
||||||
P2P: p2p.Config{
|
P2P: p2p.Config{
|
||||||
NoDiscovery: true,
|
NoDiscovery: true, // we always use only v5 server
|
||||||
DiscoveryV5: config.Discovery,
|
DiscoveryV5: !config.NoDiscovery,
|
||||||
ListenAddr: config.ListenAddr,
|
ListenAddr: config.ListenAddr,
|
||||||
NAT: nat.Any(),
|
NAT: nat.Any(),
|
||||||
MaxPeers: config.MaxPeers,
|
MaxPeers: config.MaxPeers,
|
||||||
|
|
|
@ -110,11 +110,10 @@ func (n *StatusNode) Start(config *params.NodeConfig, services ...node.ServiceCo
|
||||||
if err := n.setupDeduplicator(); err != nil {
|
if err := n.setupDeduplicator(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if n.config.NoDiscovery {
|
||||||
if n.config.Discovery {
|
return nil
|
||||||
return n.startPeerPool()
|
|
||||||
}
|
}
|
||||||
return nil
|
return n.startPeerPool()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *StatusNode) setupDeduplicator() error {
|
func (n *StatusNode) setupDeduplicator() error {
|
||||||
|
@ -224,7 +223,7 @@ func (n *StatusNode) stop() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *StatusNode) stopPeerPool() error {
|
func (n *StatusNode) stopPeerPool() error {
|
||||||
if !n.config.Discovery {
|
if n.config.NoDiscovery {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package params
|
||||||
|
|
||||||
type cluster struct {
|
type cluster struct {
|
||||||
NetworkID int `json:"networkID"`
|
NetworkID int `json:"networkID"`
|
||||||
Discovery bool `json:"discovery"`
|
|
||||||
StaticNodes []string `json:"staticnodes"`
|
StaticNodes []string `json:"staticnodes"`
|
||||||
BootNodes []string `json:"bootnodes"`
|
BootNodes []string `json:"bootnodes"`
|
||||||
}
|
}
|
||||||
|
@ -17,7 +16,6 @@ var ropstenCluster = cluster{
|
||||||
|
|
||||||
var rinkebyCluster = cluster{
|
var rinkebyCluster = cluster{
|
||||||
NetworkID: 4,
|
NetworkID: 4,
|
||||||
Discovery: true,
|
|
||||||
BootNodes: []string{
|
BootNodes: []string{
|
||||||
"enode://1b843c7697f6fc42a1f606fb3cfaac54e025f06789dc20ad9278be3388967cf21e3a1b1e4be51faecd66c2c3adef12e942b4fcdeb8727657abe60636efb6224f@206.189.6.46:30404",
|
"enode://1b843c7697f6fc42a1f606fb3cfaac54e025f06789dc20ad9278be3388967cf21e3a1b1e4be51faecd66c2c3adef12e942b4fcdeb8727657abe60636efb6224f@206.189.6.46:30404",
|
||||||
"enode://b29100c8468e3e6604817174a15e4d71627458b0dcdbeea169ab2eb4ab2bbc6f24adbb175826726cec69db8fdba6c0dd60b3da598e530ede562180d300728659@206.189.6.48:30404",
|
"enode://b29100c8468e3e6604817174a15e4d71627458b0dcdbeea169ab2eb4ab2bbc6f24adbb175826726cec69db8fdba6c0dd60b3da598e530ede562180d300728659@206.189.6.48:30404",
|
||||||
|
|
|
@ -228,8 +228,8 @@ type NodeConfig struct {
|
||||||
// remote peer identification as well as network traffic encryption.
|
// remote peer identification as well as network traffic encryption.
|
||||||
NodeKeyFile string
|
NodeKeyFile string
|
||||||
|
|
||||||
// Discovery set to true will enabled discovery protocol.
|
// NoDiscovery set to true will disable discovery protocol.
|
||||||
Discovery bool
|
NoDiscovery bool
|
||||||
|
|
||||||
// ListenAddr is an IP address and port of this node (e.g. 127.0.0.1:30303).
|
// ListenAddr is an IP address and port of this node (e.g. 127.0.0.1:30303).
|
||||||
ListenAddr string
|
ListenAddr string
|
||||||
|
@ -576,9 +576,13 @@ func (c *NodeConfig) updateClusterConfig() error {
|
||||||
|
|
||||||
for _, cluster := range clusters {
|
for _, cluster := range clusters {
|
||||||
if cluster.NetworkID == int(c.NetworkID) {
|
if cluster.NetworkID == int(c.NetworkID) {
|
||||||
c.Discovery = cluster.Discovery
|
|
||||||
c.ClusterConfig.BootNodes = cluster.BootNodes
|
c.ClusterConfig.BootNodes = cluster.BootNodes
|
||||||
c.ClusterConfig.StaticNodes = cluster.StaticNodes
|
c.ClusterConfig.StaticNodes = cluster.StaticNodes
|
||||||
|
// no point in running discovery if we don't have bootnodes.
|
||||||
|
// but in case if we do have nodes and NoDiscovery=true we will preserve that value
|
||||||
|
if len(cluster.BootNodes) == 0 {
|
||||||
|
c.NoDiscovery = true
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -608,7 +612,7 @@ func (c *NodeConfig) updateRelativeDirsConfig() error {
|
||||||
|
|
||||||
// updatePeerLimits will set default peer limits expectations based on enabled services.
|
// updatePeerLimits will set default peer limits expectations based on enabled services.
|
||||||
func (c *NodeConfig) updatePeerLimits() {
|
func (c *NodeConfig) updatePeerLimits() {
|
||||||
if !c.Discovery {
|
if c.NoDiscovery {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if c.WhisperConfig.Enabled {
|
if c.WhisperConfig.Enabled {
|
||||||
|
|
|
@ -302,7 +302,7 @@ var loadConfigTestCases = []struct {
|
||||||
func(t *testing.T, dataDir string, nodeConfig *params.NodeConfig, err error) {
|
func(t *testing.T, dataDir string, nodeConfig *params.NodeConfig, err error) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.True(t, nodeConfig.ClusterConfig.Enabled, "cluster configuration is expected to be enabled by default")
|
require.True(t, nodeConfig.ClusterConfig.Enabled, "cluster configuration is expected to be enabled by default")
|
||||||
require.True(t, nodeConfig.Discovery)
|
require.False(t, nodeConfig.NoDiscovery)
|
||||||
require.True(t, len(nodeConfig.ClusterConfig.BootNodes) >= 2)
|
require.True(t, len(nodeConfig.ClusterConfig.BootNodes) >= 2)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -343,11 +343,23 @@ var loadConfigTestCases = []struct {
|
||||||
func(t *testing.T, dataDir string, nodeConfig *params.NodeConfig, err error) {
|
func(t *testing.T, dataDir string, nodeConfig *params.NodeConfig, err error) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotNil(t, nodeConfig.RequireTopics)
|
require.NotNil(t, nodeConfig.RequireTopics)
|
||||||
require.True(t, nodeConfig.Discovery)
|
require.False(t, nodeConfig.NoDiscovery)
|
||||||
require.Contains(t, nodeConfig.RequireTopics, params.WhisperDiscv5Topic)
|
require.Contains(t, nodeConfig.RequireTopics, params.WhisperDiscv5Topic)
|
||||||
require.Equal(t, params.WhisperDiscv5Limits, nodeConfig.RequireTopics[params.WhisperDiscv5Topic])
|
require.Equal(t, params.WhisperDiscv5Limits, nodeConfig.RequireTopics[params.WhisperDiscv5Topic])
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
`no discovery preserved`,
|
||||||
|
`{
|
||||||
|
"NetworkId": 4,
|
||||||
|
"DataDir": "$TMPDIR",
|
||||||
|
"NoDiscovery": true
|
||||||
|
}`,
|
||||||
|
func(t *testing.T, dataDir string, nodeConfig *params.NodeConfig, err error) {
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, nodeConfig.NoDiscovery)
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestLoadNodeConfig tests loading JSON configuration and setting default values.
|
// TestLoadNodeConfig tests loading JSON configuration and setting default values.
|
||||||
|
|
|
@ -75,6 +75,7 @@ type PeerPool struct {
|
||||||
serverSubscription event.Subscription
|
serverSubscription event.Subscription
|
||||||
events chan *p2p.PeerEvent
|
events chan *p2p.PeerEvent
|
||||||
quit chan struct{}
|
quit chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
timeout <-chan time.Time
|
timeout <-chan time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,8 +101,10 @@ func (p *PeerPool) Start(server *p2p.Server) error {
|
||||||
|
|
||||||
p.events = make(chan *p2p.PeerEvent, 20)
|
p.events = make(chan *p2p.PeerEvent, 20)
|
||||||
p.serverSubscription = server.SubscribeEvents(p.events)
|
p.serverSubscription = server.SubscribeEvents(p.events)
|
||||||
|
p.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
p.handleServerPeers(server, p.events)
|
p.handleServerPeers(server, p.events)
|
||||||
|
p.wg.Done()
|
||||||
}()
|
}()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -130,16 +133,16 @@ func (p *PeerPool) stopDiscovery(server *p2p.Server) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, t := range p.topics {
|
||||||
|
t.StopSearch()
|
||||||
|
}
|
||||||
|
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
server.DiscV5.Close()
|
server.DiscV5.Close()
|
||||||
server.DiscV5 = nil
|
server.DiscV5 = nil
|
||||||
p.timeout = nil
|
p.timeout = nil
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
|
|
||||||
for _, t := range p.topics {
|
|
||||||
t.StopSearch()
|
|
||||||
}
|
|
||||||
|
|
||||||
signal.SendDiscoveryStopped()
|
signal.SendDiscoveryStopped()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -262,4 +265,5 @@ func (p *PeerPool) Stop() {
|
||||||
close(p.quit)
|
close(p.quit)
|
||||||
}
|
}
|
||||||
p.serverSubscription.Unsubscribe()
|
p.serverSubscription.Unsubscribe()
|
||||||
|
p.wg.Wait()
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,9 +41,10 @@ type TopicPool struct {
|
||||||
slowMode time.Duration
|
slowMode time.Duration
|
||||||
fastModeTimeout time.Duration
|
fastModeTimeout time.Duration
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
wg sync.WaitGroup // wait group for all goroutines spawn during TopicPool operation
|
discWG sync.WaitGroup
|
||||||
quit chan struct{}
|
poolWG sync.WaitGroup
|
||||||
|
quit chan struct{}
|
||||||
|
|
||||||
running int32
|
running int32
|
||||||
|
|
||||||
|
@ -159,9 +160,9 @@ func (t *TopicPool) limitFastMode(timeout time.Duration) chan struct{} {
|
||||||
|
|
||||||
cancel := make(chan struct{})
|
cancel := make(chan struct{})
|
||||||
|
|
||||||
t.wg.Add(1)
|
t.poolWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer t.wg.Done()
|
defer t.poolWG.Done()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
|
@ -318,15 +319,15 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t.wg.Add(1)
|
t.discWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
server.DiscV5.SearchTopic(t.topic, t.period, found, lookup)
|
server.DiscV5.SearchTopic(t.topic, t.period, found, lookup)
|
||||||
t.wg.Done()
|
t.discWG.Done()
|
||||||
}()
|
}()
|
||||||
t.wg.Add(1)
|
t.poolWG.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
t.handleFoundPeers(server, found, lookup)
|
t.handleFoundPeers(server, found, lookup)
|
||||||
t.wg.Done()
|
t.poolWG.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -409,15 +410,18 @@ func (t *TopicPool) StopSearch() {
|
||||||
case <-t.quit:
|
case <-t.quit:
|
||||||
return
|
return
|
||||||
default:
|
default:
|
||||||
log.Debug("stoping search", "topic", t.topic)
|
|
||||||
if t.fastModeTimeoutCancel != nil {
|
|
||||||
close(t.fastModeTimeoutCancel)
|
|
||||||
t.fastModeTimeoutCancel = nil
|
|
||||||
}
|
|
||||||
close(t.period)
|
|
||||||
t.period = nil
|
|
||||||
t.currentMode = 0
|
|
||||||
close(t.quit)
|
|
||||||
}
|
}
|
||||||
t.wg.Wait()
|
log.Debug("stoping search", "topic", t.topic)
|
||||||
|
close(t.quit)
|
||||||
|
t.mu.Lock()
|
||||||
|
if t.fastModeTimeoutCancel != nil {
|
||||||
|
close(t.fastModeTimeoutCancel)
|
||||||
|
t.fastModeTimeoutCancel = nil
|
||||||
|
}
|
||||||
|
t.currentMode = 0
|
||||||
|
t.mu.Unlock()
|
||||||
|
// wait for poolWG to exit because it writes to period channel
|
||||||
|
t.poolWG.Wait()
|
||||||
|
close(t.period)
|
||||||
|
t.discWG.Wait()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue