From 737f966dbe127c5ee4e587daa8be4454f4123db7 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Tue, 3 Jul 2018 14:27:04 +0300 Subject: [PATCH] Decouple discovery from p2p server --- node/status_node.go | 49 +++++++++++++--------- peers/discv5.go | 92 +++++++++++++++++++++++++++++++++++------ peers/peerpool.go | 52 ++++++++++++----------- peers/peerpool_test.go | 67 ++++++++++++++++++------------ peers/topic_register.go | 16 +++---- peers/topicpool.go | 11 +++-- peers/topicpool_test.go | 2 +- 7 files changed, 194 insertions(+), 95 deletions(-) diff --git a/node/status_node.go b/node/status_node.go index 8ce9efa2b..866d620c7 100644 --- a/node/status_node.go +++ b/node/status_node.go @@ -48,9 +48,10 @@ type StatusNode struct { rpcClient *rpc.Client // reference to public RPC client rpcPrivateClient *rpc.Client // reference to private RPC client (can call private APIs) - register *peers.Register - peerPool *peers.PeerPool - db *leveldb.DB // used as a cache for PeerPool + discovery peers.Discovery + register *peers.Register + peerPool *peers.PeerPool + db *leveldb.DB // used as a cache for PeerPool log log.Logger } @@ -104,10 +105,10 @@ func (n *StatusNode) startWithDB(config *params.NodeConfig, db *leveldb.DB, serv return err } - if n.config.NoDiscovery { - return nil + if n.discoveryEnabled() { + return n.startDiscovery() } - return n.startPeerPool() + return nil } // Start starts current StatusNode, will fail if it's already started. @@ -176,17 +177,29 @@ func (n *StatusNode) setupRPCClient() (err error) { return } -func (n *StatusNode) startPeerPool() error { - n.register = peers.NewRegister(n.config.RegisterTopics...) +func (n *StatusNode) discoveryEnabled() bool { + return n.config != nil && !n.config.NoDiscovery && n.config.ClusterConfig != nil +} + +func (n *StatusNode) startDiscovery() error { + n.discovery = peers.NewDiscV5( + n.gethNode.Server().PrivateKey, + n.config.ListenAddr, + parseNodesV5(n.config.ClusterConfig.BootNodes)) + n.register = peers.NewRegister(n.discovery, n.config.RegisterTopics...) options := peers.NewDefaultOptions() // TODO(dshulyak) consider adding a flag to define this behaviour options.AllowStop = len(n.config.RegisterTopics) == 0 n.peerPool = peers.NewPeerPool( + n.discovery, n.config.RequireTopics, peers.NewCache(n.db), options, ) - if err := n.register.Start(n.gethNode.Server()); err != nil { + if err := n.discovery.Start(); err != nil { + return err + } + if err := n.register.Start(); err != nil { return err } return n.peerPool.Start(n.gethNode.Server()) @@ -206,11 +219,13 @@ func (n *StatusNode) Stop() error { // stop will stop current StatusNode. A stopped node cannot be resumed. func (n *StatusNode) stop() error { - if err := n.stopPeerPool(); err != nil { - n.log.Error("Error stopping the PeerPool", "error", err) + if n.discoveryEnabled() { + if err := n.stopDiscovery(); err != nil { + n.log.Error("Error stopping the PeerPool", "error", err) + } + n.register = nil + n.peerPool = nil } - n.register = nil - n.peerPool = nil if err := n.gethNode.Stop(); err != nil { return err @@ -234,14 +249,10 @@ func (n *StatusNode) stop() error { return nil } -func (n *StatusNode) stopPeerPool() error { - if n.config == nil || n.config.NoDiscovery { - return nil - } - +func (n *StatusNode) stopDiscovery() error { n.register.Stop() n.peerPool.Stop() - return nil + return n.discovery.Stop() } // ResetChainData removes chain data if node is not running. diff --git a/peers/discv5.go b/peers/discv5.go index 96d794ff2..36ae159a4 100644 --- a/peers/discv5.go +++ b/peers/discv5.go @@ -1,31 +1,97 @@ package peers import ( + "crypto/ecdsa" "net" + "sync" + "time" - "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p/discv5" ) -// StartDiscv5 starts discv5 udp listener. -// This is done here to avoid patching p2p server, we can't hold a lock of course -// but no other sub-process should use discovery -func StartDiscv5(server *p2p.Server) (*discv5.Network, error) { - addr, err := net.ResolveUDPAddr("udp", server.ListenAddr) +// Discovery is an abstract interface for using different discovery providers. +type Discovery interface { + Running() bool + Start() error + Stop() error + Register(topic string, stop chan struct{}) error + Discover(topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool) error +} + +// NewDiscV5 creates instances of discovery v5 facade. +func NewDiscV5(prv *ecdsa.PrivateKey, laddr string, bootnodes []*discv5.Node) *DiscV5 { + return &DiscV5{ + prv: prv, + laddr: laddr, + bootnodes: bootnodes, + } +} + +// DiscV5 is a facade for ethereum discv5 implementation. +type DiscV5 struct { + mu sync.Mutex + net *discv5.Network + + prv *ecdsa.PrivateKey + laddr string + bootnodes []*discv5.Node +} + +// Running returns true if v5 server is started. +func (d *DiscV5) Running() bool { + d.mu.Lock() + defer d.mu.Unlock() + return d.net != nil +} + +// Start creates v5 server and stores pointer to it. +func (d *DiscV5) Start() error { + d.mu.Lock() + defer d.mu.Unlock() + log.Debug("Starting discovery", "listen address", d.laddr) + addr, err := net.ResolveUDPAddr("udp", d.laddr) if err != nil { - return nil, err + return err } conn, err := net.ListenUDP("udp", addr) if err != nil { - return nil, err + return err } realaddr := conn.LocalAddr().(*net.UDPAddr) - ntab, err := discv5.ListenUDP(server.PrivateKey, conn, realaddr, "", server.NetRestrict) + ntab, err := discv5.ListenUDP(d.prv, conn, realaddr, "", nil) if err != nil { - return nil, err + return err } - if err := ntab.SetFallbackNodes(server.BootstrapNodesV5); err != nil { - return nil, err + if err := ntab.SetFallbackNodes(d.bootnodes); err != nil { + return err } - return ntab, nil + d.net = ntab + return nil +} + +// Stop closes v5 server listener and removes pointer. +func (d *DiscV5) Stop() error { + d.mu.Lock() + defer d.mu.Unlock() + if d.net == nil { + return nil + } + d.net.Close() + d.net = nil + return nil +} + +// Register creates a register request in v5 server. +// It will block until stop is closed. +func (d *DiscV5) Register(topic string, stop chan struct{}) error { + d.net.RegisterTopic(discv5.Topic(topic), stop) + return nil +} + +// Discover creates search request in v5 server. Results will be published to found channel. +// It will block until period is closed. +func (d *DiscV5) Discover(topic string, period <-chan time.Duration, found chan<- *discv5.Node, lookup chan<- bool) error { + d.net.SearchTopic(discv5.Topic(topic), period, found, lookup) + return nil } diff --git a/peers/peerpool.go b/peers/peerpool.go index cdfa9c50d..cb288b1db 100644 --- a/peers/peerpool.go +++ b/peers/peerpool.go @@ -78,6 +78,8 @@ type peerInfo struct { type PeerPool struct { opts *Options + discovery Discovery + // config can be set only once per pool life cycle config map[discv5.Topic]params.Limits cache *Cache @@ -92,11 +94,12 @@ type PeerPool struct { } // NewPeerPool creates instance of PeerPool -func NewPeerPool(config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool { +func NewPeerPool(discovery Discovery, config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool { return &PeerPool{ - opts: options, - config: config, - cache: cache, + opts: options, + discovery: discovery, + config: config, + cache: cache, } } @@ -108,7 +111,7 @@ func (p *PeerPool) setDiscoveryTimeout() { // Start creates topic pool for each topic in config and subscribes to server events. func (p *PeerPool) Start(server *p2p.Server) error { - if server.DiscV5 == nil { + if !p.discovery.Running() { return ErrDiscv5NotRunning } @@ -131,7 +134,7 @@ func (p *PeerPool) Start(server *p2p.Server) error { // collect topics and start searching for nodes p.topics = make([]*TopicPool, 0, len(p.config)) for topic, limits := range p.config { - topicPool := NewTopicPool(topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache) + topicPool := NewTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache) if err := topicPool.StartSearch(server); err != nil { return err } @@ -144,18 +147,16 @@ func (p *PeerPool) Start(server *p2p.Server) error { return nil } -func (p *PeerPool) startDiscovery(server *p2p.Server) error { - if server.DiscV5 != nil { +func (p *PeerPool) startDiscovery() error { + if p.discovery.Running() { return nil } - ntab, err := StartDiscv5(server) - if err != nil { + if err := p.discovery.Start(); err != nil { return err } p.mu.Lock() - server.DiscV5 = ntab p.setDiscoveryTimeout() p.mu.Unlock() @@ -164,18 +165,19 @@ func (p *PeerPool) startDiscovery(server *p2p.Server) error { return nil } -func (p *PeerPool) stopDiscovery(server *p2p.Server) { - if server.DiscV5 == nil { +func (p *PeerPool) stopDiscovery() { + if !p.discovery.Running() { return } for _, t := range p.topics { t.StopSearch() } + if err := p.discovery.Stop(); err != nil { + log.Error("discovery errored when was closed", "err", err) + } p.mu.Lock() - server.DiscV5.Close() - server.DiscV5 = nil p.timeout = nil p.mu.Unlock() @@ -184,8 +186,8 @@ func (p *PeerPool) stopDiscovery(server *p2p.Server) { // restartDiscovery and search for topics that have peer count below min func (p *PeerPool) restartDiscovery(server *p2p.Server) error { - if server.DiscV5 == nil { - if err := p.startDiscovery(server); err != nil { + if !p.discovery.Running() { + if err := p.startDiscovery(); err != nil { return err } log.Debug("restarted discovery from peer pool") @@ -218,19 +220,19 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer select { case <-p.quit: - log.Debug("stopping DiscV5 because of quit", "server", server.Self()) - p.stopDiscovery(server) + log.Debug("stopping DiscV5 because of quit") + p.stopDiscovery() return case <-timeout: - log.Info("DiscV5 timed out", "server", server.Self()) - p.stopDiscovery(server) + log.Info("DiscV5 timed out") + p.stopDiscovery() case <-retryDiscv5: if err := p.restartDiscovery(server); err != nil { retryDiscv5 = time.After(discoveryRestartTimeout) log.Error("starting discv5 failed", "error", err, "retry", discoveryRestartTimeout) } case <-stopDiscv5: - p.handleStopTopics(server) + p.handleStopTopics() case event := <-events: switch event.Type { case p2p.PeerEventTypeDrop: @@ -265,7 +267,7 @@ func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) { // handleStopTopics stops the search on any topics having reached its max cached // limit or its delay stop is expired, additionally will stop discovery if all // peers are stopped. -func (p *PeerPool) handleStopTopics(server *p2p.Server) { +func (p *PeerPool) handleStopTopics() { if !p.opts.AllowStop { return } @@ -275,8 +277,8 @@ func (p *PeerPool) handleStopTopics(server *p2p.Server) { } } if p.allTopicsStopped() { - log.Debug("closing discv5 connection because all topics reached max limit", "server", server.Self()) - p.stopDiscovery(server) + log.Debug("closing discv5 connection because all topics reached max limit") + p.stopDiscovery() } } diff --git a/peers/peerpool_test.go b/peers/peerpool_test.go index 55fd7e7ee..d8d93b4ef 100644 --- a/peers/peerpool_test.go +++ b/peers/peerpool_test.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/whisper/whisperv6" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -27,9 +28,10 @@ import ( type PeerPoolSimulationSuite struct { suite.Suite - bootnode *p2p.Server - peers []*p2p.Server - port uint16 + bootnode *p2p.Server + peers []*p2p.Server + discovery []Discovery + port uint16 } func TestPeerPoolSimulationSuite(t *testing.T) { @@ -63,6 +65,7 @@ func (s *PeerPoolSimulationSuite) SetupTest() { // 1 peer to initiate connection, 1 peer as a first candidate, 1 peer - for failover s.peers = make([]*p2p.Server, 3) + s.discovery = make([]Discovery, 3) for i := range s.peers { key, _ := crypto.GenerateKey() whisper := whisperv6.New(nil) @@ -72,7 +75,6 @@ func (s *PeerPoolSimulationSuite) SetupTest() { Name: common.MakeName("peer-"+strconv.Itoa(i), "1.0"), ListenAddr: fmt.Sprintf("0.0.0.0:%d", s.nextPort()), PrivateKey: key, - DiscoveryV5: true, NoDiscovery: true, BootstrapNodesV5: []*discv5.Node{bootnodeV5}, Protocols: whisper.Protocols(), @@ -80,13 +82,17 @@ func (s *PeerPoolSimulationSuite) SetupTest() { } s.NoError(peer.Start()) s.peers[i] = peer + d := NewDiscV5(key, peer.ListenAddr, peer.BootstrapNodesV5) + s.NoError(d.Start()) + s.discovery[i] = d } } func (s *PeerPoolSimulationSuite) TearDown() { s.bootnode.Stop() - for _, p := range s.peers { - p.Stop() + for i := range s.peers { + s.peers[i].Stop() + s.NoError(s.discovery[i].Stop()) } } @@ -124,7 +130,7 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCache() { peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond} cache, err := newInMemoryCache() s.Require().NoError(err) - peerPool := NewPeerPool(config, cache, peerPoolOpts) + peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) // start peer pool s.Require().NoError(peerPool.Start(s.peers[1])) @@ -173,12 +179,11 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 0} cache, err := newInMemoryCache() s.Require().NoError(err) - peerPool := NewPeerPool(config, cache, peerPoolOpts) + peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) // create and start topic registry - register := NewRegister(topic) - err = register.Start(s.peers[0]) - s.Require().NoError(err) + register := NewRegister(s.discovery[0], topic) + s.Require().NoError(register.Start()) // subscribe for peer events before starting the peer pool events := make(chan *p2p.PeerEvent, 20) @@ -210,8 +215,8 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { s.Equal(signal.EventDiscoveryStarted, s.getPoolEvent(poolEvents)) // register the second peer - err = register.Start(s.peers[2]) - s.Require().NoError(err) + register = NewRegister(s.discovery[2], topic) + s.Require().NoError(register.Start()) defer register.Stop() s.Equal(s.peers[2].Self().ID, s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)) // Discovery can be stopped again. @@ -243,23 +248,25 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) { peer := &p2p.Server{ Config: p2p.Config{ PrivateKey: key, - DiscoveryV5: true, NoDiscovery: true, }, } require.NoError(t, peer.Start()) defer peer.Stop() - require.NotNil(t, peer.DiscV5) + discovery := NewDiscV5(key, peer.ListenAddr, nil) + require.NoError(t, discovery.Start()) + defer func() { assert.NoError(t, discovery.Stop()) }() + require.True(t, discovery.Running()) poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond} - pool := NewPeerPool(nil, nil, poolOpts) + pool := NewPeerPool(discovery, nil, nil, poolOpts) require.NoError(t, pool.Start(peer)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) // without config, it will stop the discovery because all topic pools are satisfied pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd} require.Equal(t, signal.EventDiscoverySummary, <-signals) require.Equal(t, signal.EventDiscoveryStopped, <-signals) - require.Nil(t, peer.DiscV5) + require.False(t, discovery.Running()) // another peer added after discovery is stopped should not panic pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeAdd} } @@ -292,17 +299,20 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { server := &p2p.Server{ Config: p2p.Config{ PrivateKey: key, - DiscoveryV5: true, NoDiscovery: true, }, } require.NoError(t, server.Start()) defer server.Stop() - require.NotNil(t, server.DiscV5) + + discovery := NewDiscV5(key, server.ListenAddr, nil) + require.NoError(t, discovery.Start()) + defer func() { assert.NoError(t, discovery.Stop()) }() + require.True(t, discovery.Running()) // start PeerPool poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond} - pool := NewPeerPool(nil, nil, poolOpts) + pool := NewPeerPool(discovery, nil, nil, poolOpts) require.NoError(t, pool.Start(server)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) @@ -313,12 +323,12 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { case <-time.After(pool.opts.DiscServerTimeout * 2): t.Fatal("timed out") } - require.Nil(t, server.DiscV5) + require.False(t, discovery.Running()) // timeout after discovery restart require.NoError(t, pool.restartDiscovery(server)) require.Equal(t, signal.EventDiscoveryStarted, <-signals) - require.NotNil(t, server.DiscV5) + require.True(t, discovery.Running()) pool.events <- &p2p.PeerEvent{Type: p2p.PeerEventTypeDrop} // required to turn the loop and pick up new timeout select { case sig := <-signals: @@ -326,7 +336,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) { case <-time.After(pool.opts.DiscServerTimeout * 2): t.Fatal("timed out") } - require.Nil(t, server.DiscV5) + require.False(t, discovery.Running()) } func TestPeerPoolNotAllowedStopping(t *testing.T) { @@ -336,20 +346,23 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) { server := &p2p.Server{ Config: p2p.Config{ PrivateKey: key, - DiscoveryV5: true, NoDiscovery: true, }, } require.NoError(t, server.Start()) defer server.Stop() - require.NotNil(t, server.DiscV5) + + discovery := NewDiscV5(key, server.ListenAddr, nil) + require.NoError(t, discovery.Start()) + defer func() { assert.NoError(t, discovery.Stop()) }() + require.True(t, discovery.Running()) // start PeerPool poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond} - pool := NewPeerPool(nil, nil, poolOpts) + pool := NewPeerPool(discovery, nil, nil, poolOpts) require.NoError(t, pool.Start(server)) // wait 2x timeout duration <-time.After(pool.opts.DiscServerTimeout * 2) - require.NotNil(t, server.DiscV5) + require.True(t, discovery.Running()) } diff --git a/peers/topic_register.go b/peers/topic_register.go index e7dac02ac..06c0ee1fc 100644 --- a/peers/topic_register.go +++ b/peers/topic_register.go @@ -4,26 +4,26 @@ import ( "sync" "github.com/ethereum/go-ethereum/log" - "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/discv5" ) // Register manages register topic queries type Register struct { - topics []discv5.Topic + discovery Discovery + topics []discv5.Topic wg sync.WaitGroup quit chan struct{} } // NewRegister creates instance of topic register -func NewRegister(topics ...discv5.Topic) *Register { - return &Register{topics: topics} +func NewRegister(discovery Discovery, topics ...discv5.Topic) *Register { + return &Register{discovery: discovery, topics: topics} } // Start topic register query for every topic -func (r *Register) Start(server *p2p.Server) error { - if server.DiscV5 == nil { +func (r *Register) Start() error { + if !r.discovery.Running() { return ErrDiscv5NotRunning } r.quit = make(chan struct{}) @@ -31,7 +31,9 @@ func (r *Register) Start(server *p2p.Server) error { r.wg.Add(1) go func(t discv5.Topic) { log.Debug("v5 register topic", "topic", t) - server.DiscV5.RegisterTopic(t, r.quit) + if err := r.discovery.Register(string(t), r.quit); err != nil { + log.Error("error registering topic", "topic", t, "error", err) + } r.wg.Done() }(topic) } diff --git a/peers/topicpool.go b/peers/topicpool.go index b567d29f8..ab7a84750 100644 --- a/peers/topicpool.go +++ b/peers/topicpool.go @@ -24,8 +24,9 @@ const ( var maxCachedPeersMultiplier = 2 // NewTopicPool returns instance of TopicPool -func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool { +func NewTopicPool(discovery Discovery, topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool { pool := TopicPool{ + discovery: discovery, topic: topic, limits: limits, fastMode: fastMode, @@ -44,6 +45,8 @@ func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode t // TopicPool manages peers for topic. type TopicPool struct { + discovery Discovery + // configuration topic discv5.Topic limits params.Limits @@ -351,7 +354,7 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error { if atomic.LoadInt32(&t.running) == 1 { return nil } - if server.DiscV5 == nil { + if !t.discovery.Running() { return ErrDiscv5NotRunning } atomic.StoreInt32(&t.running, 1) @@ -378,7 +381,9 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error { t.discWG.Add(1) go func() { - server.DiscV5.SearchTopic(t.topic, t.period, found, lookup) + if err := t.discovery.Discover(string(t.topic), t.period, found, lookup); err != nil { + log.Error("error searching foro", "topic", t.topic, "err", err) + } t.discWG.Done() }() t.poolWG.Add(1) diff --git a/peers/topicpool_test.go b/peers/topicpool_test.go index bab62f4c9..91868c038 100644 --- a/peers/topicpool_test.go +++ b/peers/topicpool_test.go @@ -42,7 +42,7 @@ func (s *TopicPoolSuite) SetupTest() { limits := params.NewLimits(1, 2) cache, err := newInMemoryCache() s.Require().NoError(err) - s.topicPool = NewTopicPool(topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache) + s.topicPool = NewTopicPool(&DiscV5{}, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache) s.topicPool.running = 1 // This is a buffered channel to simplify testing. // If your test generates more than 10 mode changes,