From a7a2e01b4ae612de8f1cc3868b75028723f9b87a Mon Sep 17 00:00:00 2001
From: Dmitry Shulyak
Date: Thu, 12 Apr 2018 16:08:49 +0300
Subject: [PATCH] Peerpool failover (#801)

---
 geth/peers/discv5.go         | 31 ++++++++++++
 geth/peers/peerpool.go       | 95 +++++++++++++++++++++++++++++-------
 geth/peers/peerpool_test.go  | 52 ++++++++++++--------
 geth/peers/topicpool.go      | 38 ++++++++++++---
 geth/peers/topicpool_test.go | 19 ++++++--
 5 files changed, 186 insertions(+), 49 deletions(-)
 create mode 100644 geth/peers/discv5.go

diff --git a/geth/peers/discv5.go b/geth/peers/discv5.go
new file mode 100644
index 000000000..96d794ff2
--- /dev/null
+++ b/geth/peers/discv5.go
@@ -0,0 +1,31 @@
+package peers
+
+import (
+    "net"
+
+    "github.com/ethereum/go-ethereum/p2p"
+    "github.com/ethereum/go-ethereum/p2p/discv5"
+)
+
+// StartDiscv5 starts a discv5 UDP listener.
+// This is done here to avoid patching the p2p server. We can't hold its lock,
+// but no other sub-process should be using discovery at this point.
+func StartDiscv5(server *p2p.Server) (*discv5.Network, error) {
+    addr, err := net.ResolveUDPAddr("udp", server.ListenAddr)
+    if err != nil {
+        return nil, err
+    }
+    conn, err := net.ListenUDP("udp", addr)
+    if err != nil {
+        return nil, err
+    }
+    realaddr := conn.LocalAddr().(*net.UDPAddr)
+    ntab, err := discv5.ListenUDP(server.PrivateKey, conn, realaddr, "", server.NetRestrict)
+    if err != nil {
+        return nil, err
+    }
+    if err := ntab.SetFallbackNodes(server.BootstrapNodesV5); err != nil {
+        return nil, err
+    }
+    return ntab, nil
+}

diff --git a/geth/peers/peerpool.go b/geth/peers/peerpool.go
index 8edec6161..c8ce998b9 100644
--- a/geth/peers/peerpool.go
+++ b/geth/peers/peerpool.go
@@ -9,6 +9,7 @@ import (
     "github.com/ethereum/go-ethereum/event"
     "github.com/ethereum/go-ethereum/log"
     "github.com/ethereum/go-ethereum/p2p"
+    "github.com/ethereum/go-ethereum/p2p/discover"
    "github.com/ethereum/go-ethereum/p2p/discv5"
 
     "github.com/status-im/status-go/geth/params"
@@ -22,6 +23,8 @@ var (
 const (
     // expirationPeriod is the amount of time during which a peer is considered connectable
     expirationPeriod = 60 * time.Minute
+    // discoveryRestartTimeout defines how often the loop will try to restart the discovery server
+    discoveryRestartTimeout = 2 * time.Second
     // DefaultFastSync is a recommended value for aggressive peers search.
     DefaultFastSync = 3 * time.Second
     // DefaultSlowSync is a recommended value for slow (background) peers search.
@@ -44,6 +47,8 @@ type peerInfo struct {
     discoveredTime mclock.AbsTime
     // connected is true if the node was added as a static peer
     connected bool
+    // requested is true when our node requested a disconnect
+    requested bool
 
     node *discv5.Node
 }
@@ -92,42 +97,98 @@ func (p *PeerPool) Start(server *p2p.Server) error {
     return nil
 }
 
+// restartDiscovery restarts the discovery server and starts a search for
+// every topic whose peer count is below the min limit.
+func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
+    if server.DiscV5 == nil {
+        ntab, err := StartDiscv5(server)
+        if err != nil {
+            log.Error("starting discv5 failed", "error", err, "retry in", discoveryRestartTimeout)
+            return err
+        }
+        log.Debug("restarted discovery from peer pool")
+        server.DiscV5 = ntab
+    }
+    for _, t := range p.topics {
+        if !t.BelowMin() || t.SearchRunning() {
+            continue
+        }
+        err := t.StartSearch(server)
+        if err != nil {
+            log.Error("search failed to start", "error", err)
+        }
+    }
+    return nil
+}
+
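A note on the listener setup in StartDiscv5: it follows the standard net package pattern of resolve, bind, then read back the real address (which matters when the configured port is 0). A minimal runnable sketch of just that pattern, using a hypothetical loopback address and no go-ethereum dependencies:

package main

import (
    "fmt"
    "net"
)

func main() {
    // Resolve and bind exactly as StartDiscv5 does; "127.0.0.1:0" is a
    // hypothetical listen address that lets the OS pick a free port.
    addr, err := net.ResolveUDPAddr("udp", "127.0.0.1:0")
    if err != nil {
        panic(err)
    }
    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    // LocalAddr reports the port actually chosen; this is what the
    // discv5 listener is handed as its real address.
    realaddr := conn.LocalAddr().(*net.UDPAddr)
    fmt.Println("discovery would listen on", realaddr)
}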
 // handleServerPeers watches server peer events, notifies topic pools about
 // changes in the peer set, and stops discv5 once all topic pools have
 // collected enough peers.
 func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.PeerEvent) {
+    var retryDiscv5 <-chan time.Time
+
     for {
         select {
         case <-p.quit:
             return
+        case <-retryDiscv5:
+            if err := p.restartDiscovery(server); err != nil {
+                retryDiscv5 = time.After(discoveryRestartTimeout)
+            }
         case event := <-events:
             switch event.Type {
             case p2p.PeerEventTypeDrop:
-                p.mu.Lock()
-                for _, t := range p.topics {
-                    t.ConfirmDropped(server, event.Peer, event.Error)
-                    // TODO(dshulyak) restart discv5 if peers number dropped too low
+                log.Debug("confirm peer dropped", "ID", event.Peer)
+                if p.stopOnMax && p.handleDroppedPeer(server, event.Peer) {
+                    retryDiscv5 = time.After(0)
                 }
-                p.mu.Unlock()
             case p2p.PeerEventTypeAdd:
-                p.mu.Lock()
-                total := 0
-                for _, t := range p.topics {
-                    t.ConfirmAdded(server, event.Peer)
-                    if p.stopOnMax && t.MaxReached() {
-                        total++
-                        t.StopSearch()
-                    }
-                }
-                if p.stopOnMax && total == len(p.config) {
-                    log.Debug("closing discv5 connection")
+                log.Debug("confirm peer added", "ID", event.Peer)
+                if p.stopOnMax && p.handleAddedPeer(server, event.Peer) {
+                    log.Debug("closing discv5 connection", "server", server.Self())
                     server.DiscV5.Close()
+                    server.DiscV5 = nil
                 }
-                p.mu.Unlock()
             }
         }
     }
 }
 
+// handleAddedPeer notifies all topics about an added peer and returns true if
+// all topics have reached their max limit of connections.
+func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) (all bool) {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+    all = true
+    for _, t := range p.topics {
+        t.ConfirmAdded(server, nodeID)
+        if p.stopOnMax && t.MaxReached() {
+            t.StopSearch()
+        } else {
+            all = false
+        }
+    }
+    return all
+}
+
+// handleDroppedPeer notifies every topic about a dropped peer and returns true
+// if any topic's connections dropped below its min limit.
+func (p *PeerPool) handleDroppedPeer(server *p2p.Server, nodeID discover.NodeID) (any bool) {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+    for _, t := range p.topics {
+        confirmed := t.ConfirmDropped(server, nodeID)
+        if confirmed {
+            newPeer := t.AddPeerFromTable(server)
+            if newPeer != nil {
+                log.Debug("added peer from local table", "ID", newPeer.ID)
+            }
+        }
+        log.Debug("search", "topic", t.topic, "below min", t.BelowMin())
+        if t.BelowMin() && !t.SearchRunning() {
+            any = true
+        }
+    }
+    return any
+}
+
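The retryDiscv5 timer above exploits a Go select idiom: receiving from a nil channel blocks forever, so the retry case stays dormant until the channel is armed with time.After, and time.After(0) forces an immediate attempt. A standalone sketch of the idiom, where attempt is a hypothetical stand-in for p.restartDiscovery:

package main

import (
    "fmt"
    "time"
)

func main() {
    // A nil timer channel blocks forever, so the retry case below is
    // dormant until the channel is armed with time.After.
    var retry <-chan time.Time
    attempts := 0
    // attempt is a hypothetical stand-in for p.restartDiscovery:
    // it fails twice before succeeding.
    attempt := func() error {
        attempts++
        if attempts < 3 {
            return fmt.Errorf("discovery not ready")
        }
        return nil
    }
    retry = time.After(0) // arm the first attempt immediately
    for {
        select {
        case <-retry:
            if err := attempt(); err != nil {
                retry = time.After(10 * time.Millisecond) // re-arm only on failure
                continue
            }
            fmt.Println("restarted after", attempts, "attempts")
            return
        }
    }
}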
 // Stop closes the pool's quit channel and all channels that are watched by
 // search queries, and waits until all goroutines exit.
 func (p *PeerPool) Stop() {

diff --git a/geth/peers/peerpool_test.go b/geth/peers/peerpool_test.go
index dafefba3a..37ce65792 100644
--- a/geth/peers/peerpool_test.go
+++ b/geth/peers/peerpool_test.go
@@ -10,6 +10,7 @@ import (
     "github.com/ethereum/go-ethereum/common"
     "github.com/ethereum/go-ethereum/crypto"
     "github.com/ethereum/go-ethereum/p2p"
+    "github.com/ethereum/go-ethereum/p2p/discover"
     "github.com/ethereum/go-ethereum/p2p/discv5"
 
     "github.com/stretchr/testify/suite"
@@ -46,7 +47,8 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
     s.Require().NoError(s.bootnode.Start())
     bootnodeV5 := discv5.NewNode(s.bootnode.DiscV5.Self().ID, net.ParseIP("127.0.0.1"), uint16(port), uint16(port))
 
-    s.peers = make([]*p2p.Server, 2)
+    // one peer to initiate a connection, one peer as the first candidate, one peer for failover
+    s.peers = make([]*p2p.Server, 3)
     for i := range s.peers {
         key, _ := crypto.GenerateKey()
         peer := &p2p.Server{
@@ -66,14 +68,26 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
     }
 }
 
-func (s *PeerPoolSimulationSuite) TestSingleTopicDiscovery() {
+func (s *PeerPoolSimulationSuite) getPeerFromEvent(events <-chan *p2p.PeerEvent, etype p2p.PeerEventType) (nodeID discover.NodeID) {
+    select {
+    case ev := <-events:
+        if ev.Type == etype {
+            return ev.Peer
+        }
+    case <-time.After(5 * time.Second):
+        s.Fail("timed out waiting for a peer")
+        return
+    }
+    return
+}
+
+func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
     topic := discv5.Topic("cap=test")
-    expectedConnections := 1
     // simulation should only rely on fast sync
     config := map[discv5.Topic]params.Limits{
-        topic: {expectedConnections, expectedConnections},
+        topic: {1, 1}, // limits are chosen for simplicity of the simulation
     }
-    peerPool := NewPeerPool(config, 100*time.Millisecond, 100*time.Millisecond, nil, false)
+    peerPool := NewPeerPool(config, 100*time.Millisecond, 100*time.Millisecond, nil, true)
     register := NewRegister(topic)
     s.Require().NoError(register.Start(s.peers[0]))
     defer register.Stop()
@@ -84,20 +98,20 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscovery() {
     defer subscription.Unsubscribe()
     s.NoError(peerPool.Start(s.peers[1]))
     defer peerPool.Stop()
-    connected := 0
-    for {
-        select {
-        case ev := <-events:
-            if ev.Type == p2p.PeerEventTypeAdd {
-                connected++
-            }
-        case <-time.After(5 * time.Second):
-            s.Require().FailNowf("waiting for peers timed out", strconv.Itoa(connected))
-        }
-        if connected == expectedConnections {
-            break
-        }
-    }
+    connected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
+    s.Equal(s.peers[0].Self().ID, connected)
+    time.Sleep(100 * time.Millisecond)
+    s.Require().Nil(s.peers[1].DiscV5)
+    s.peers[0].Stop()
+    disconnected := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop)
+    s.Equal(connected, disconnected)
+    time.Sleep(100 * time.Millisecond)
+    s.Require().NotNil(s.peers[1].DiscV5)
+    register = NewRegister(topic)
+    s.Require().NoError(register.Start(s.peers[2]))
+    defer register.Stop()
+    newConnected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
+    s.Equal(s.peers[2].Self().ID, newConnected)
 }
 
 func (s *PeerPoolSimulationSuite) TearDown() {

diff --git a/geth/peers/topicpool.go b/geth/peers/topicpool.go
index eb337ff9b..ee22d430a 100644
--- a/geth/peers/topicpool.go
+++ b/geth/peers/topicpool.go
@@ -56,6 +56,13 @@ func (t *TopicPool) MaxReached() bool {
     return t.connected == t.limits[1]
 }
 
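The {1, 1} limits in the test rely on the indexing convention used throughout the pool: limits[0] is the min limit (checked by BelowMin, added below) and limits[1] is the max limit (checked by MaxReached). A runnable illustration with a local stand-in for params.Limits:

package main

import "fmt"

// Limits is a hypothetical stand-in for params.Limits, which the pool
// indexes as a {min, max} pair.
type Limits [2]int

func main() {
    limits := Limits{1, 1} // min=1, max=1, as in the failover test above
    connected := 0
    fmt.Println("below min:", connected < limits[0])    // true: restart discovery
    fmt.Println("max reached:", connected == limits[1]) // false: keep searching
}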
+// BelowMin returns true if the current number of peers is below the min limit.
+func (t *TopicPool) BelowMin() bool {
+    t.mu.RLock()
+    defer t.mu.RUnlock()
+    return t.connected < t.limits[0]
+}
+
 // ConfirmAdded is called when a peer was added by the p2p server.
 // 1. Skip a peer if it is not in our peer table.
 // 2. Add a peer to a cache.
@@ -80,6 +87,7 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
     // when the max limit is reached, drop every peer after it
     if t.connected == t.limits[1] {
         log.Debug("max limit is reached, dropping peer", "ID", nodeID, "topic", t.topic)
+        peer.requested = true
         t.removePeer(server, peer)
         return
     }
@@ -99,8 +107,9 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
 // 2. If it was a disconnect request - we could drop that peer ourselves.
 // 3. If the connected number drops below the min limit - switch to fast mode.
 // 4. Delete a peer from cache and peer table.
-// 5. Connect with another valid peer, if such is available.
-func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID, reason string) (new bool) {
+// Returns false if the peer is not in our table or we requested its removal.
+// Otherwise the peer is removed and true is returned.
+func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) bool {
     t.mu.Lock()
     defer t.mu.Unlock()
     // either inbound or connected from another topic
     if !exist {
         return false
     }
-    log.Debug("disconnect reason", "peer", nodeID, "reason", reason)
-    // if requested - we don't need to remove the peer from cache and look for a replacement
-    if reason == p2p.DiscRequested.Error() {
+    log.Debug("disconnect", "ID", nodeID)
+    if peer.requested {
         return false
     }
     if t.SearchRunning() && t.connected == t.limits[0] {
@@ -124,14 +132,21 @@ func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID, r
             log.Error("failed to remove peer from cache", "error", err)
         }
     }
+    return true
+}
+
+// AddPeerFromTable checks if there is a valid peer in the local table and adds it to the server.
+func (t *TopicPool) AddPeerFromTable(server *p2p.Server) *discv5.Node {
+    t.mu.RLock()
+    defer t.mu.RUnlock()
     // TODO use a heap queue and always get a peer that was discovered recently
     for _, peer := range t.peers {
         if !peer.connected && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) {
             t.addPeer(server, peer)
-            return true
+            return peer.node
         }
     }
-    return false
+    return nil
 }
 
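The expiration check in AddPeerFromTable compares monotonic clock readings: mclock.AbsTime is a nanosecond count, so a time.Duration can be added to it after an explicit conversion. A runnable sketch of the same arithmetic:

package main

import (
    "fmt"
    "time"

    "github.com/ethereum/go-ethereum/common/mclock"
)

func main() {
    const expirationPeriod = 60 * time.Minute
    // mclock.Now returns a monotonic reading in nanoseconds; converting
    // the duration to mclock.AbsTime allows the addition, exactly as in
    // the expiration check in AddPeerFromTable above.
    discoveredTime := mclock.Now()
    stillValid := mclock.Now() < discoveredTime+mclock.AbsTime(expirationPeriod)
    fmt.Println("peer still connectable:", stillValid)
}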
 // StartSearch creates discv5 queries and runs a loop to consume found peers.
@@ -169,7 +184,11 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error {
 }
 
 func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.Node, lookup <-chan bool) {
-    t.period <- t.fastSync
+    if t.connected >= t.limits[0] {
+        t.period <- t.slowSync
+    } else {
+        t.period <- t.fastSync
+    }
     selfID := discv5.NodeID(server.Self().ID)
     for {
         select {
@@ -226,6 +245,9 @@ func (t *TopicPool) removePeer(server *p2p.Server, info *peerInfo) {
 
 // StopSearch stops the search and closes the quit channel.
 func (t *TopicPool) StopSearch() {
+    if !t.SearchRunning() {
+        return
+    }
     if t.quit == nil {
         return
     }

diff --git a/geth/peers/topicpool_test.go b/geth/peers/topicpool_test.go
index 252214b76..03fa4e5d5 100644
--- a/geth/peers/topicpool_test.go
+++ b/geth/peers/topicpool_test.go
@@ -63,7 +63,7 @@ func (s *TopicPoolSuite) TestSyncSwitches() {
     s.topicPool.ConfirmAdded(s.peer, discover.NodeID(testPeer.ID))
     s.AssertConsumed(s.topicPool.period, s.topicPool.slowSync, time.Second)
     s.True(s.topicPool.peers[testPeer.ID].connected)
-    s.topicPool.ConfirmDropped(s.peer, discover.NodeID(testPeer.ID), p2p.DiscProtocolError.Error())
+    s.topicPool.ConfirmDropped(s.peer, discover.NodeID(testPeer.ID))
     s.AssertConsumed(s.topicPool.period, s.topicPool.fastSync, time.Second)
 }
 
@@ -82,15 +82,24 @@ func (s *TopicPoolSuite) TestNewPeerSelectedOnDrop() {
     s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID))
     s.False(s.topicPool.peers[peer3.ID].connected)
 
-    s.True(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID), p2p.DiscNetworkError.Error()))
+    s.True(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)))
+    s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
 }
 
 func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
+    // the max limit is 1 because we test that the 2nd peer stays in the local
+    // table when we request to drop it
+    s.topicPool.limits = params.Limits{1, 1}
     peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
+    peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
     s.topicPool.processFoundNode(s.peer, peer1)
+    s.topicPool.processFoundNode(s.peer, peer2)
     s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
-    s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID), p2p.DiscRequested.Error())
-    s.Contains(s.topicPool.peers, peer1.ID)
-    s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID), p2p.DiscProtocolError.Error())
+    s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
+    s.False(s.topicPool.peers[peer1.ID].requested)
+    s.True(s.topicPool.peers[peer2.ID].requested)
+    s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
+    s.Contains(s.topicPool.peers, peer2.ID)
+    s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
     s.NotContains(s.topicPool.peers, peer1.ID)
 }
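Putting the pieces together, this is roughly how a caller would enable the failover behaviour, based on the constructor call and limits used in the test above; server construction is elided and the nil argument is passed exactly as in the test:

config := map[discv5.Topic]params.Limits{
    discv5.Topic("cap=test"): {1, 1}, // min=1, max=1
}
// The final argument is stopOnMax: with it set to true, the pool closes
// discv5 once every topic reaches its max limit and restarts it when any
// topic falls below its min limit.
peerPool := NewPeerPool(config, DefaultFastSync, DefaultSlowSync, nil, true)
if err := peerPool.Start(server); err != nil {
    log.Error("failed to start peer pool", "error", err)
}
defer peerPool.Stop()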