Decouple discovery from p2p server

This commit is contained in:
Dmitry 2018-07-03 14:27:04 +03:00 committed by Dmitry Shulyak
parent 8ce522f430
commit 737f966dbe
7 changed files with 194 additions and 95 deletions

View File

@ -48,9 +48,10 @@ type StatusNode struct {
rpcClient *rpc.Client // reference to public RPC client
rpcPrivateClient *rpc.Client // reference to private RPC client (can call private APIs)
register *peers.Register
peerPool *peers.PeerPool
db *leveldb.DB // used as a cache for PeerPool
discovery peers.Discovery
register *peers.Register
peerPool *peers.PeerPool
db *leveldb.DB // used as a cache for PeerPool
log log.Logger
}
@ -104,10 +105,10 @@ func (n *StatusNode) startWithDB(config *params.NodeConfig, db *leveldb.DB, serv
return err
}
if n.config.NoDiscovery {
return nil
if n.discoveryEnabled() {
return n.startDiscovery()
}
return n.startPeerPool()
return nil
}
// Start starts current StatusNode, will fail if it's already started.
@ -176,17 +177,29 @@ func (n *StatusNode) setupRPCClient() (err error) {
return
}
func (n *StatusNode) startPeerPool() error {
n.register = peers.NewRegister(n.config.RegisterTopics...)
func (n *StatusNode) discoveryEnabled() bool {
return n.config != nil && !n.config.NoDiscovery && n.config.ClusterConfig != nil
}
func (n *StatusNode) startDiscovery() error {
n.discovery = peers.NewDiscV5(
n.gethNode.Server().PrivateKey,
n.config.ListenAddr,
parseNodesV5(n.config.ClusterConfig.BootNodes))
n.register = peers.NewRegister(n.discovery, n.config.RegisterTopics...)
options := peers.NewDefaultOptions()
// TODO(dshulyak) consider adding a flag to define this behaviour
options.AllowStop = len(n.config.RegisterTopics) == 0
n.peerPool = peers.NewPeerPool(
n.discovery,
n.config.RequireTopics,
peers.NewCache(n.db),
options,
)
if err := n.register.Start(n.gethNode.Server()); err != nil {
if err := n.discovery.Start(); err != nil {
return err
}
if err := n.register.Start(); err != nil {
return err
}
return n.peerPool.Start(n.gethNode.Server())
@ -206,11 +219,13 @@ func (n *StatusNode) Stop() error {
// stop will stop current StatusNode. A stopped node cannot be resumed.
func (n *StatusNode) stop() error {
if err := n.stopPeerPool(); err != nil {
n.log.Error("Error stopping the PeerPool", "error", err)
if n.discoveryEnabled() {
if err := n.stopDiscovery(); err != nil {
n.log.Error("Error stopping the PeerPool", "error", err)
}
n.register = nil
n.peerPool = nil
}
n.register = nil
n.peerPool = nil
if err := n.gethNode.Stop(); err != nil {
return err
@ -234,14 +249,10 @@ func (n *StatusNode) stop() error {
return nil
}
func (n *StatusNode) stopPeerPool() error {
if n.config == nil || n.config.NoDiscovery {
return nil
}
func (n *StatusNode) stopDiscovery() error {
n.register.Stop()
n.peerPool.Stop()
return nil
return n.discovery.Stop()
}
// ResetChainData removes chain data if node is not running.

View File

@ -1,31 +1,97 @@
package peers
import (
"crypto/ecdsa"
"net"
"sync"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discv5"
)
// StartDiscv5 starts discv5 udp listener.
// This is done here to avoid patching p2p server, we can't hold a lock of course
// but no other sub-process should use discovery
func StartDiscv5(server *p2p.Server) (*discv5.Network, error) {
addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
// Discovery is an abstract interface for using different discovery providers.
type Discovery interface {
Running() bool
Start() error
Stop() error
Register(topic string, stop chan struct{}) error
Discover(topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool) error
}
// NewDiscV5 creates instances of discovery v5 facade.
func NewDiscV5(prv *ecdsa.PrivateKey, laddr string, bootnodes []*discv5.Node) *DiscV5 {
return &DiscV5{
prv: prv,
laddr: laddr,
bootnodes: bootnodes,
}
}
// DiscV5 is a facade for ethereum discv5 implementation.
type DiscV5 struct {
mu sync.Mutex
net *discv5.Network
prv *ecdsa.PrivateKey
laddr string
bootnodes []*discv5.Node
}
// Running returns true if v5 server is started.
func (d *DiscV5) Running() bool {
d.mu.Lock()
defer d.mu.Unlock()
return d.net != nil
}
// Start creates v5 server and stores pointer to it.
func (d *DiscV5) Start() error {
d.mu.Lock()
defer d.mu.Unlock()
log.Debug("Starting discovery", "listen address", d.laddr)
addr, err := net.ResolveUDPAddr("udp", d.laddr)
if err != nil {
return nil, err
return err
}
conn, err := net.ListenUDP("udp", addr)
if err != nil {
return nil, err
return err
}
realaddr := conn.LocalAddr().(*net.UDPAddr)
ntab, err := discv5.ListenUDP(server.PrivateKey, conn, realaddr, "", server.NetRestrict)
ntab, err := discv5.ListenUDP(d.prv, conn, realaddr, "", nil)
if err != nil {
return nil, err
return err
}
if err := ntab.SetFallbackNodes(server.BootstrapNodesV5); err != nil {
return nil, err
if err := ntab.SetFallbackNodes(d.bootnodes); err != nil {
return err
}
return ntab, nil
d.net = ntab
return nil
}
// Stop closes v5 server listener and removes pointer.
func (d *DiscV5) Stop() error {
d.mu.Lock()
defer d.mu.Unlock()
if d.net == nil {
return nil
}
d.net.Close()
d.net = nil
return nil
}
// Register creates a register request in v5 server.
// It will block until stop is closed.
func (d *DiscV5) Register(topic string, stop chan struct{}) error {
d.net.RegisterTopic(discv5.Topic(topic), stop)
return nil
}
// Discover creates search request in v5 server. Results will be published to found channel.
// It will block until period is closed.
func (d *DiscV5) Discover(topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool) error {
d.net.SearchTopic(discv5.Topic(topic), period, found, lookup)
return nil
}

View File

@ -78,6 +78,8 @@ type peerInfo struct {
type PeerPool struct {
opts *Options
discovery Discovery
// config can be set only once per pool life cycle
config map[discv5.Topic]params.Limits
cache *Cache
@ -92,11 +94,12 @@ type PeerPool struct {
}
// NewPeerPool creates instance of PeerPool
func NewPeerPool(config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool {
func NewPeerPool(discovery Discovery, config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool {
return &PeerPool{
opts: options,
config: config,
cache: cache,
opts: options,
discovery: discovery,
config: config,
cache: cache,
}
}
@ -108,7 +111,7 @@ func (p *PeerPool) setDiscoveryTimeout() {
// Start creates topic pool for each topic in config and subscribes to server events.
func (p *PeerPool) Start(server *p2p.Server) error {
if server.DiscV5 == nil {
if !p.discovery.Running() {
return ErrDiscv5NotRunning
}
@ -131,7 +134,7 @@ func (p *PeerPool) Start(server *p2p.Server) error {
// collect topics and start searching for nodes
p.topics = make([]*TopicPool, 0, len(p.config))
for topic, limits := range p.config {
topicPool := NewTopicPool(topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
topicPool := NewTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
if err := topicPool.StartSearch(server); err != nil {
return err
}
@ -144,18 +147,16 @@ func (p *PeerPool) Start(server *p2p.Server) error {
return nil
}
func (p *PeerPool) startDiscovery(server *p2p.Server) error {
if server.DiscV5 != nil {
func (p *PeerPool) startDiscovery() error {
if p.discovery.Running() {
return nil
}
ntab, err := StartDiscv5(server)
if err != nil {
if err := p.discovery.Start(); err != nil {
return err
}
p.mu.Lock()
server.DiscV5 = ntab
p.setDiscoveryTimeout()
p.mu.Unlock()
@ -164,18 +165,19 @@ func (p *PeerPool) startDiscovery(server *p2p.Server) error {
return nil
}
func (p *PeerPool) stopDiscovery(server *p2p.Server) {
if server.DiscV5 == nil {
func (p *PeerPool) stopDiscovery() {
if !p.discovery.Running() {
return
}
for _, t := range p.topics {
t.StopSearch()
}
if err := p.discovery.Stop(); err != nil {
log.Error("discovery errored when was closed", "err", err)
}
p.mu.Lock()
server.DiscV5.Close()
server.DiscV5 = nil
p.timeout = nil
p.mu.Unlock()
@ -184,8 +186,8 @@ func (p *PeerPool) stopDiscovery(server *p2p.Server) {
// restartDiscovery and search for topics that have peer count below min
func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
if server.DiscV5 == nil {
if err := p.startDiscovery(server); err != nil {
if !p.discovery.Running() {
if err := p.startDiscovery(); err != nil {
return err
}
log.Debug("restarted discovery from peer pool")
@ -218,19 +220,19 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
select {
case <-p.quit:
log.Debug("stopping DiscV5 because of quit", "server", server.Self())
p.stopDiscovery(server)
log.Debug("stopping DiscV5 because of quit")
p.stopDiscovery()
return
case <-timeout:
log.Info("DiscV5 timed out", "server", server.Self())
p.stopDiscovery(server)
log.Info("DiscV5 timed out")
p.stopDiscovery()
case <-retryDiscv5:
if err := p.restartDiscovery(server); err != nil {
retryDiscv5 = time.After(discoveryRestartTimeout)
log.Error("starting discv5 failed", "error", err, "retry", discoveryRestartTimeout)
}
case <-stopDiscv5:
p.handleStopTopics(server)
p.handleStopTopics()
case event := <-events:
switch event.Type {
case p2p.PeerEventTypeDrop:
@ -265,7 +267,7 @@ func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) {
// handleStopTopics stops the search on any topics having reached its max cached
// limit or its delay stop is expired, additionally will stop discovery if all
// peers are stopped.
func (p *PeerPool) handleStopTopics(server *p2p.Server) {
func (p *PeerPool) handleStopTopics() {
if !p.opts.AllowStop {
return
}
@ -275,8 +277,8 @@ func (p *PeerPool) handleStopTopics(server *p2p.Server) {
}
}
if p.allTopicsStopped() {
log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self())
p.stopDiscovery(server)
log.Debug("closing discv5 connection because all topics reached max limit")
p.stopDiscovery()
}
}

View File

@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
@ -27,9 +28,10 @@ import (
type PeerPoolSimulationSuite struct {
suite.Suite
bootnode *p2p.Server
peers []*p2p.Server
port uint16
bootnode *p2p.Server
peers []*p2p.Server
discovery []Discovery
port uint16
}
func TestPeerPoolSimulationSuite(t *testing.T) {
@ -63,6 +65,7 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
// 1 peer to initiate connection, 1 peer as a first candidate, 1 peer - for failover
s.peers = make([]*p2p.Server, 3)
s.discovery = make([]Discovery, 3)
for i := range s.peers {
key, _ := crypto.GenerateKey()
whisper := whisperv6.New(nil)
@ -72,7 +75,6 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
Name: common.MakeName("peer-"+strconv.Itoa(i), "1.0"),
ListenAddr: fmt.Sprintf("0.0.0.0:%d", s.nextPort()),
PrivateKey: key,
DiscoveryV5: true,
NoDiscovery: true,
BootstrapNodesV5: []*discv5.Node{bootnodeV5},
Protocols: whisper.Protocols(),
@ -80,13 +82,17 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
}
s.NoError(peer.Start())
s.peers[i] = peer
d := NewDiscV5(key, peer.ListenAddr, peer.BootstrapNodesV5)
s.NoError(d.Start())
s.discovery[i] = d
}
}
func (s *PeerPoolSimulationSuite) TearDown() {
s.bootnode.Stop()
for _, p := range s.peers {
p.Stop()
for i := range s.peers {
s.peers[i].Stop()
s.NoError(s.discovery[i].Stop())
}
}
@ -124,7 +130,7 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCache() {
peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond}
cache, err := newInMemoryCache()
s.Require().NoError(err)
peerPool := NewPeerPool(config, cache, peerPoolOpts)
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
// start peer pool
s.Require().NoError(peerPool.Start(s.peers[1]))
@ -173,12 +179,11 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 0}
cache, err := newInMemoryCache()
s.Require().NoError(err)
peerPool := NewPeerPool(config, cache, peerPoolOpts)
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
// create and start topic registry
register := NewRegister(topic)
err = register.Start(s.peers[0])
s.Require().NoError(err)
register := NewRegister(s.discovery[0], topic)
s.Require().NoError(register.Start())
// subscribe for peer events before starting the peer pool
events := make(chan *p2p.PeerEvent, 20)
@ -210,8 +215,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
s.Equal(signal.EventDiscoveryStarted, s.getPoolEvent(poolEvents))
// register the second peer
err = register.Start(s.peers[2])
s.Require().NoError(err)
register = NewRegister(s.discovery[2], topic)
s.Require().NoError(register.Start())
defer register.Stop()
s.Equal(s.peers[2].Self().ID, s.getPeerFromEvent(events, p2p.PeerEventTypeAdd))
// Discovery can be stopped again.
@ -243,23 +248,25 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) {
peer := &p2p.Server{
Config: p2p.Config{
PrivateKey: key,
DiscoveryV5: true,
NoDiscovery: true,
},
}
require.NoError(t, peer.Start())
defer peer.Stop()
require.NotNil(t, peer.DiscV5)
discovery := NewDiscV5(key, peer.ListenAddr, nil)
require.NoError(t, discovery.Start())
defer func() { assert.NoError(t, discovery.Stop()) }()
require.True(t, discovery.Running())
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond}
pool := NewPeerPool(nil, nil, poolOpts)
pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(peer))
require.Equal(t, signal.EventDiscoveryStarted, <-signals)
// without config, it will stop the discovery because all topic pools are satisfied
pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd}
require.Equal(t, signal.EventDiscoverySummary, <-signals)
require.Equal(t, signal.EventDiscoveryStopped, <-signals)
require.Nil(t, peer.DiscV5)
require.False(t, discovery.Running())
// another peer added after discovery is stopped should not panic
pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd}
}
@ -292,17 +299,20 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
server := &p2p.Server{
Config: p2p.Config{
PrivateKey: key,
DiscoveryV5: true,
NoDiscovery: true,
},
}
require.NoError(t, server.Start())
defer server.Stop()
require.NotNil(t, server.DiscV5)
discovery := NewDiscV5(key, server.ListenAddr, nil)
require.NoError(t, discovery.Start())
defer func() { assert.NoError(t, discovery.Stop()) }()
require.True(t, discovery.Running())
// start PeerPool
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond}
pool := NewPeerPool(nil, nil, poolOpts)
pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(server))
require.Equal(t, signal.EventDiscoveryStarted, <-signals)
@ -313,12 +323,12 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
case <-time.After(pool.opts.DiscServerTimeout * 2):
t.Fatal("timed out")
}
require.Nil(t, server.DiscV5)
require.False(t, discovery.Running())
// timeout after discovery restart
require.NoError(t, pool.restartDiscovery(server))
require.Equal(t, signal.EventDiscoveryStarted, <-signals)
require.NotNil(t, server.DiscV5)
require.True(t, discovery.Running())
pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeDrop} // required to turn the loop and pick up new timeout
select {
case sig := <-signals:
@ -326,7 +336,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
case <-time.After(pool.opts.DiscServerTimeout * 2):
t.Fatal("timed out")
}
require.Nil(t, server.DiscV5)
require.False(t, discovery.Running())
}
func TestPeerPoolNotAllowedStopping(t *testing.T) {
@ -336,20 +346,23 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) {
server := &p2p.Server{
Config: p2p.Config{
PrivateKey: key,
DiscoveryV5: true,
NoDiscovery: true,
},
}
require.NoError(t, server.Start())
defer server.Stop()
require.NotNil(t, server.DiscV5)
discovery := NewDiscV5(key, server.ListenAddr, nil)
require.NoError(t, discovery.Start())
defer func() { assert.NoError(t, discovery.Stop()) }()
require.True(t, discovery.Running())
// start PeerPool
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond}
pool := NewPeerPool(nil, nil, poolOpts)
pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(server))
// wait 2x timeout duration
<-time.After(pool.opts.DiscServerTimeout * 2)
require.NotNil(t, server.DiscV5)
require.True(t, discovery.Running())
}

View File

@ -4,26 +4,26 @@ import (
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
)
// Register manages register topic queries
type Register struct {
topics []discv5.Topic
discovery Discovery
topics []discv5.Topic
wg sync.WaitGroup
quit chan struct{}
}
// NewRegister creates instance of topic register
func NewRegister(topics ...discv5.Topic) *Register {
return &Register{topics: topics}
func NewRegister(discovery Discovery, topics ...discv5.Topic) *Register {
return &Register{discovery: discovery, topics: topics}
}
// Start topic register query for every topic
func (r *Register) Start(server *p2p.Server) error {
if server.DiscV5 == nil {
func (r *Register) Start() error {
if !r.discovery.Running() {
return ErrDiscv5NotRunning
}
r.quit = make(chan struct{})
@ -31,7 +31,9 @@ func (r *Register) Start(server *p2p.Server) error {
r.wg.Add(1)
go func(t discv5.Topic) {
log.Debug("v5 register topic", "topic", t)
server.DiscV5.RegisterTopic(t, r.quit)
if err := r.discovery.Register(string(t), r.quit); err != nil {
log.Error("error registering topic", "topic", t, "error", err)
}
r.wg.Done()
}(topic)
}

View File

@ -24,8 +24,9 @@ const (
var maxCachedPeersMultiplier = 2
// NewTopicPool returns instance of TopicPool
func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
func NewTopicPool(discovery Discovery, topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
pool := TopicPool{
discovery: discovery,
topic: topic,
limits: limits,
fastMode: fastMode,
@ -44,6 +45,8 @@ func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode t
// TopicPool manages peers for topic.
type TopicPool struct {
discovery Discovery
// configuration
topic discv5.Topic
limits params.Limits
@ -351,7 +354,7 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error {
if atomic.LoadInt32(&t.running) == 1 {
return nil
}
if server.DiscV5 == nil {
if !t.discovery.Running() {
return ErrDiscv5NotRunning
}
atomic.StoreInt32(&t.running, 1)
@ -378,7 +381,9 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error {
t.discWG.Add(1)
go func() {
server.DiscV5.SearchTopic(t.topic, t.period, found, lookup)
if err := t.discovery.Discover(string(t.topic), t.period, found, lookup); err != nil {
log.Error("error searching foro", "topic", t.topic, "err", err)
}
t.discWG.Done()
}()
t.poolWG.Add(1)

View File

@ -42,7 +42,7 @@ func (s *TopicPoolSuite) SetupTest() {
limits := params.NewLimits(1, 2)
cache, err := newInMemoryCache()
s.Require().NoError(err)
s.topicPool = NewTopicPool(topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.topicPool = NewTopicPool(&DiscV5{}, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.topicPool.running = 1
// This is a buffered channel to simplify testing.
// If your test generates more than 10 mode changes,