From cb15ca6e744beb12e0a15ab5a86b073f008e5ed3 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 13 Nov 2018 14:58:26 +0100 Subject: [PATCH] Mark peers that were added by peer pool (#1271) * Mark peers that were added by peer pool * Implement thorough test to verify that connection was indeed preserved * Add more debug logs and remove caching of the peers --- peers/peerpool.go | 2 + peers/topicpool.go | 15 ++++-- peers/topicpool_test.go | 110 +++++++++++++++++++++++++++++++++++++--- 3 files changed, 116 insertions(+), 11 deletions(-) diff --git a/peers/peerpool.go b/peers/peerpool.go index c75916719..2d61a6219 100644 --- a/peers/peerpool.go +++ b/peers/peerpool.go @@ -79,6 +79,8 @@ type peerInfo struct { discoveredTime mclock.AbsTime // dismissed is true when our node requested a disconnect dismissed bool + // added is true when the node tries to add this peer to a server + added bool node *discv5.Node } diff --git a/peers/topicpool.go b/peers/topicpool.go index 11e9c7431..e5afd33bf 100644 --- a/peers/topicpool.go +++ b/peers/topicpool.go @@ -22,7 +22,7 @@ const ( // maxCachedPeersMultiplier peers max limit will be multiplied by this number // to get the maximum number of cached peers allowed. -var maxCachedPeersMultiplier = 2 +var maxCachedPeersMultiplier = 1 // TopicPoolInterface the TopicPool interface. type TopicPoolInterface interface { @@ -270,9 +270,11 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) { discV5NodeID := discv5.NodeID(nodeID) - // inbound connection peerInfoItem, ok := t.pendingPeers[discV5NodeID] - if !ok { + inbound := !ok || !peerInfoItem.added + log.Debug("peer added event", "peer", nodeID.String(), "inbound", inbound) + // inbound connection + if inbound { return } peer := peerInfoItem.peerInfo // get explicit reference @@ -453,7 +455,9 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) { node: node, }) } - + log.Debug( + "adding peer to a server", "peer", node.ID.String(), + "connected", len(t.connectedPeers), "max", t.maxCachedPeers) // the upper limit is not reached, so let's add this peer if len(t.connectedPeers) < t.maxCachedPeers { t.addServerPeer(server, t.pendingPeers[node.ID].peerInfo) @@ -463,6 +467,7 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) { } func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) { + info.added = true server.AddPeer(discover.NewNode( discover.NodeID(info.node.ID), info.node.IP, @@ -472,6 +477,8 @@ func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) { } func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) { + log.Debug("request to remove a peer", "id", info.node.ID.String()) + info.added = false server.RemovePeer(discover.NewNode( discover.NodeID(info.node.ID), info.node.IP, diff --git a/peers/topicpool_test.go b/peers/topicpool_test.go index b043bc477..ee1c7e673 100644 --- a/peers/topicpool_test.go +++ b/peers/topicpool_test.go @@ -10,6 +10,9 @@ import ( "github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discv5" "github.com/status-im/status-go/params" + "github.com/status-im/status-go/t/helpers" + "github.com/status-im/whisper/whisperv6" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" ) @@ -70,14 +73,9 @@ func (s *TopicPoolSuite) TestUsingCache() { peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) s.topicPool.processFoundNode(s.peer, peer1) - s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) - s.Equal([]*discv5.Node{peer1}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10)) - - // Add a new peer which exceeds the upper limit. - // It should still be added to the cache and - // not removed when dropped. peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311) s.topicPool.processFoundNode(s.peer, peer2) + s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) s.Equal([]*discv5.Node{peer1, peer2}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10)) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID)) @@ -253,12 +251,13 @@ func (s *TopicPoolSuite) TestReplacementPeerIsCounted() { s.topicPool.processFoundNode(s.peer, peer2) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) - s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID)) s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)) + s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID)) s.NotContains(s.topicPool.pendingPeers, peer1.ID) s.NotContains(s.topicPool.connectedPeers, peer1.ID) s.Contains(s.topicPool.pendingPeers, peer2.ID) + s.topicPool.pendingPeers[peer2.ID].added = true s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID)) s.True(s.topicPool.MaxReached()) } @@ -328,3 +327,100 @@ func (s *TopicPoolSuite) TestNewTopicPoolInterface() { cacheTP := newCacheOnlyTopicPool(tp, &testTrueVerifier{}) s.IsType(&cacheOnlyTopicPool{}, cacheTP) } + +func (s *TopicPoolSuite) TestIgnoreInboundConnection() { + s.topicPool.limits = params.NewLimits(0, 0) + s.topicPool.maxCachedPeers = 0 + peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) + s.topicPool.processFoundNode(s.peer, peer1) + s.Contains(s.topicPool.pendingPeers, peer1.ID) + s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) + s.Contains(s.topicPool.pendingPeers, peer1.ID) + s.False(s.topicPool.pendingPeers[peer1.ID].dismissed) + s.NotContains(s.topicPool.connectedPeers, peer1.ID) +} + +func (s *TopicPoolSuite) TestConnectedButRemoved() { + s.topicPool.limits = params.NewLimits(0, 0) + s.topicPool.maxCachedPeers = 1 + peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311) + s.topicPool.processFoundNode(s.peer, peer1) + s.Contains(s.topicPool.pendingPeers, peer1.ID) + s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID)) + s.Contains(s.topicPool.connectedPeers, peer1.ID) + s.False(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))) + s.False(s.topicPool.pendingPeers[peer1.ID].added) +} + +func TestServerIgnoresInboundPeer(t *testing.T) { + topic := discv5.Topic("cap=cap1") + limits := params.NewLimits(0, 0) + cache, err := newInMemoryCache() + require.NoError(t, err) + topicPool := newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache) + topicPool.running = 1 + topicPool.maxCachedPeers = 0 + + whisper := whisperv6.New(nil) + srvkey, err := crypto.GenerateKey() + require.NoError(t, err) + server := &p2p.Server{ + Config: p2p.Config{ + MaxPeers: 1, + Name: "server", + ListenAddr: ":0", + PrivateKey: srvkey, + NoDiscovery: true, + Protocols: whisper.Protocols(), + }, + } + require.NoError(t, server.Start()) + clientkey, err := crypto.GenerateKey() + require.NoError(t, err) + client := &p2p.Server{ + Config: p2p.Config{ + MaxPeers: 1, + Name: "client", + ListenAddr: ":0", + PrivateKey: clientkey, + NoDiscovery: true, + Protocols: whisper.Protocols(), + }, + } + require.NoError(t, client.Start()) + + // add peer to topic pool, as if it was discovered. + // it will be ignored due to the limit and added to a table of pending peers. + clientID := discv5.NodeID(client.Self().ID) + topicPool.processFoundNode(server, discv5.NewNode( + clientID, + client.Self().IP, + client.Self().UDP, client.Self().TCP)) + require.Contains(t, topicPool.pendingPeers, clientID) + require.False(t, topicPool.pendingPeers[clientID].added) + + errch := helpers.WaitForPeerAsync(server, client.Self().String(), p2p.PeerEventTypeAdd, 5*time.Second) + // connect to a server from client. client will be an inbound connection for a server. + client.AddPeer(server.Self()) + select { + case err := <-errch: + require.NoError(t, err) + case <-time.After(10 * time.Second): + require.FailNow(t, "failed waiting for WaitPeerAsync") + } + + // wait some time to confirm that RemovePeer wasn't called on the server object. + errch = helpers.WaitForPeerAsync(server, client.Self().String(), p2p.PeerEventTypeDrop, time.Second) + // simulate that event was received by a topic pool. + // topic pool will ignore this even because it sees that it is inbound connection. + topicPool.ConfirmAdded(server, client.Self().ID) + require.Contains(t, topicPool.pendingPeers, clientID) + require.False(t, topicPool.pendingPeers[clientID].dismissed) + + select { + case err := <-errch: + require.EqualError(t, err, "wait for peer: timeout") + case <-time.After(10 * time.Second): + require.FailNow(t, "failed waiting for WaitPeerAsync") + } +}