Mark peers that were added by peer pool (#1271)

* Mark peers that were added by peer pool

* Implement a thorough test to verify that the connection was indeed preserved

* Add more debug logs and remove caching of the peers
Dmitry Shulyak 2018-11-13 14:58:26 +01:00 committed by GitHub
parent 04ba4d44e8
commit cb15ca6e74
3 changed files with 116 additions and 11 deletions
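
In short, the pool now marks every peer it adds to the p2p server itself, so a later "peer added" event can be classified as outbound (initiated by the pool) or inbound (initiated by the remote side), and inbound connections are left untouched. A condensed sketch of the mechanism, paraphrasing the hunks below rather than quoting the committed code verbatim:

    // addServerPeer marks the peer before handing it to the server;
    // removeServerPeer clears the mark again (see the hunks below).
    func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) {
        info.added = true // this peer was added by the pool (outbound)
        server.AddPeer(discover.NewNode(
            discover.NodeID(info.node.ID), info.node.IP, info.node.UDP, info.node.TCP))
    }

    // ConfirmAdded ignores "peer added" events for peers the pool never added itself.
    func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
        peerInfoItem, ok := t.pendingPeers[discv5.NodeID(nodeID)]
        inbound := !ok || !peerInfoItem.added
        if inbound {
            return // inbound connection: preserve it, do not manage it
        }
        // ... otherwise promote the peer from pendingPeers to connectedPeers as before
    }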


@@ -79,6 +79,8 @@ type peerInfo struct {
discoveredTime mclock.AbsTime
// dismissed is true when our node requested a disconnect
dismissed bool
// added is true when the node tries to add this peer to a server
added bool
node *discv5.Node
}


@@ -22,7 +22,7 @@ const (
// maxCachedPeersMultiplier peers max limit will be multiplied by this number
// to get the maximum number of cached peers allowed.
var maxCachedPeersMultiplier = 2
var maxCachedPeersMultiplier = 1
// TopicPoolInterface the TopicPool interface.
type TopicPoolInterface interface {
@@ -270,9 +270,11 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
discV5NodeID := discv5.NodeID(nodeID)
// inbound connection
peerInfoItem, ok := t.pendingPeers[discV5NodeID]
if !ok {
inbound := !ok || !peerInfoItem.added
log.Debug("peer added event", "peer", nodeID.String(), "inbound", inbound)
// inbound connection
if inbound {
return
}
peer := peerInfoItem.peerInfo // get explicit reference
@@ -453,7 +455,9 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) {
node: node,
})
}
log.Debug(
"adding peer to a server", "peer", node.ID.String(),
"connected", len(t.connectedPeers), "max", t.maxCachedPeers)
// the upper limit is not reached, so let's add this peer
if len(t.connectedPeers) < t.maxCachedPeers {
t.addServerPeer(server, t.pendingPeers[node.ID].peerInfo)
@@ -463,6 +467,7 @@ func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) {
}
func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) {
info.added = true
server.AddPeer(discover.NewNode(
discover.NodeID(info.node.ID),
info.node.IP,
@@ -472,6 +477,8 @@ func (t *TopicPool) addServerPeer(server *p2p.Server, info *peerInfo) {
}
func (t *TopicPool) removeServerPeer(server *p2p.Server, info *peerInfo) {
log.Debug("request to remove a peer", "id", info.node.ID.String())
info.added = false
server.RemovePeer(discover.NewNode(
discover.NodeID(info.node.ID),
info.node.IP,


@@ -10,6 +10,9 @@ import (
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
@@ -70,14 +73,9 @@ func (s *TopicPoolSuite) TestUsingCache() {
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.Equal([]*discv5.Node{peer1}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
// Add a new peer which exceeds the upper limit.
// It should still be added to the cache and
// not removed when dropped.
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.Equal([]*discv5.Node{peer1, peer2}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
@@ -253,12 +251,13 @@ func (s *TopicPoolSuite) TestReplacementPeerIsCounted() {
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer2.ID))
s.NotContains(s.topicPool.pendingPeers, peer1.ID)
s.NotContains(s.topicPool.connectedPeers, peer1.ID)
s.Contains(s.topicPool.pendingPeers, peer2.ID)
s.topicPool.pendingPeers[peer2.ID].added = true
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.True(s.topicPool.MaxReached())
}
@@ -328,3 +327,100 @@ func (s *TopicPoolSuite) TestNewTopicPoolInterface() {
cacheTP := newCacheOnlyTopicPool(tp, &testTrueVerifier{})
s.IsType(&cacheOnlyTopicPool{}, cacheTP)
}
func (s *TopicPoolSuite) TestIgnoreInboundConnection() {
s.topicPool.limits = params.NewLimits(0, 0)
s.topicPool.maxCachedPeers = 0
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.Contains(s.topicPool.pendingPeers, peer1.ID)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.Contains(s.topicPool.pendingPeers, peer1.ID)
s.False(s.topicPool.pendingPeers[peer1.ID].dismissed)
s.NotContains(s.topicPool.connectedPeers, peer1.ID)
}
func (s *TopicPoolSuite) TestConnectedButRemoved() {
s.topicPool.limits = params.NewLimits(0, 0)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.Contains(s.topicPool.pendingPeers, peer1.ID)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.Contains(s.topicPool.connectedPeers, peer1.ID)
s.False(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID)))
s.False(s.topicPool.pendingPeers[peer1.ID].added)
}
func TestServerIgnoresInboundPeer(t *testing.T) {
topic := discv5.Topic("cap=cap1")
limits := params.NewLimits(0, 0)
cache, err := newInMemoryCache()
require.NoError(t, err)
topicPool := newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
topicPool.running = 1
topicPool.maxCachedPeers = 0
whisper := whisperv6.New(nil)
srvkey, err := crypto.GenerateKey()
require.NoError(t, err)
server := &p2p.Server{
Config: p2p.Config{
MaxPeers: 1,
Name: "server",
ListenAddr: ":0",
PrivateKey: srvkey,
NoDiscovery: true,
Protocols: whisper.Protocols(),
},
}
require.NoError(t, server.Start())
clientkey, err := crypto.GenerateKey()
require.NoError(t, err)
client := &p2p.Server{
Config: p2p.Config{
MaxPeers: 1,
Name: "client",
ListenAddr: ":0",
PrivateKey: clientkey,
NoDiscovery: true,
Protocols: whisper.Protocols(),
},
}
require.NoError(t, client.Start())
// add peer to topic pool, as if it was discovered.
// it will be ignored due to the limit and added to a table of pending peers.
clientID := discv5.NodeID(client.Self().ID)
topicPool.processFoundNode(server, discv5.NewNode(
clientID,
client.Self().IP,
client.Self().UDP, client.Self().TCP))
require.Contains(t, topicPool.pendingPeers, clientID)
require.False(t, topicPool.pendingPeers[clientID].added)
errch := helpers.WaitForPeerAsync(server, client.Self().String(), p2p.PeerEventTypeAdd, 5*time.Second)
// connect to the server from the client. the client will be an inbound connection for the server.
client.AddPeer(server.Self())
select {
case err := <-errch:
require.NoError(t, err)
case <-time.After(10 * time.Second):
require.FailNow(t, "failed waiting for WaitPeerAsync")
}
// wait some time to confirm that RemovePeer wasn't called on the server object.
errch = helpers.WaitForPeerAsync(server, client.Self().String(), p2p.PeerEventTypeDrop, time.Second)
// simulate that event was received by a topic pool.
// topic pool will ignore this event because it sees that it is an inbound connection.
topicPool.ConfirmAdded(server, client.Self().ID)
require.Contains(t, topicPool.pendingPeers, clientID)
require.False(t, topicPool.pendingPeers[clientID].dismissed)
select {
case err := <-errch:
require.EqualError(t, err, "wait for peer: timeout")
case <-time.After(10 * time.Second):
require.FailNow(t, "failed waiting for WaitPeerAsync")
}
}