status-go/peers/topicpool_test.go
Andrea Maria Piana 58fcf809ea
Bug/fix pending peers segfault (#2004)
* Replace mclock with time in peer pools

we used mclock as golang before 1.9 did not support monotonic clocks,
https://github.com/gavv/monotime, it does now https://golang.org/pkg/time/
so we can fallback on the system implementation, which will return
nanoseconds with a resolution that is system dependent.

* Handle case where peer have same discovered time

In some system the resolution of the clock is not high enough so
multiple peers are added on the same nanosecond.

This result in the peer just added being immediately removed.

This code adds a check making sure we don't assume that a peer is added.

Another approach would be to make sure to include the peer in the list,
so prevent the peer just being added to be evicted, but it's slightly
more complicated and the resolution is generally accurate enough for our
purpose so that peers will be fresh enough if they have the same
discovered time.

It also adds a regression test, I had to use an interface to stub the
clock.

Fixes: https://github.com/status-im/nim-status-client/issues/522

* bump version to 0.55.3
2020-07-24 11:14:05 -04:00

536 lines
19 KiB
Go

package peers
import (
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/whisper/v6"
)
type TopicPoolSuite struct {
suite.Suite
peer *p2p.Server
topicPool *TopicPool
}
func TestTopicPoolSuite(t *testing.T) {
suite.Run(t, new(TopicPoolSuite))
}
func (s *TopicPoolSuite) createDiscV5Node(ip net.IP, port uint16) (enode.ID, *discv5.Node) {
id, err := crypto.GenerateKey()
s.Require().NoError(err)
nodeID := enode.PubkeyToIDV4(&id.PublicKey)
nodeV5 := discv5.NewNode(discv5.PubkeyID(&id.PublicKey), ip, port, port)
return nodeID, nodeV5
}
func (s *TopicPoolSuite) SetupTest() {
maxCachedPeersMultiplier = 1
key, _ := crypto.GenerateKey()
name := common.MakeName("peer", "1.0")
s.peer = &p2p.Server{
Config: p2p.Config{
MaxPeers: 10,
Name: name,
ListenAddr: "0.0.0.0:0",
PrivateKey: key,
NoDiscovery: true,
},
}
s.Require().NoError(s.peer.Start())
topic := discv5.Topic("cap=cap1")
limits := params.NewLimits(1, 2)
cache, err := newInMemoryCache()
s.Require().NoError(err)
s.topicPool = newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.topicPool.running = 1
// This is a buffered channel to simplify testing.
// If your test generates more than 10 mode changes,
// override this `period` field or consume from it
// using `AssertConsumed()`.
s.topicPool.period = make(chan time.Duration, 10)
}
func (s *TopicPoolSuite) TearDown() {
s.peer.Stop()
}
func (s *TopicPoolSuite) AssertConsumed(channel <-chan time.Duration, expected time.Duration, timeout time.Duration) {
select {
case received := <-channel:
s.Equal(expected, received)
case <-time.After(timeout):
s.FailNow("timed out waiting")
}
}
func (s *TopicPoolSuite) TestUsingCache() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID2)
cached := s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10)
s.Contains(cached, peer1)
s.Contains(cached, peer2)
s.topicPool.ConfirmDropped(s.peer, nodeID2)
cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10)
s.Contains(cached, peer1)
s.Contains(cached, peer2)
// A peer that drops by itself, should be removed from the cache.
s.topicPool.ConfirmDropped(s.peer, nodeID1)
s.Equal([]*discv5.Node{peer2}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
}
func (s *TopicPoolSuite) TestSyncSwitches() {
nodeID, peer := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer))
s.topicPool.ConfirmAdded(s.peer, nodeID)
s.AssertConsumed(s.topicPool.period, s.topicPool.slowMode, time.Second)
s.NotNil(s.topicPool.connectedPeers[nodeID])
s.topicPool.ConfirmDropped(s.peer, nodeID)
s.AssertConsumed(s.topicPool.period, s.topicPool.fastMode, time.Second)
}
func (s *TopicPoolSuite) TestTimeoutFastMode() {
s.topicPool.fastModeTimeout = time.Millisecond * 50
// set fast mode
s.topicPool.mu.Lock()
s.topicPool.setSyncMode(s.topicPool.fastMode)
s.topicPool.mu.Unlock()
s.Equal(s.topicPool.fastMode, <-s.topicPool.period)
// switch to slow mode after `fastModeTimeout`
select {
case mode := <-s.topicPool.period:
s.Equal(s.topicPool.slowMode, mode)
case <-time.After(s.topicPool.fastModeTimeout * 2):
s.FailNow("timed out")
}
}
func (s *TopicPoolSuite) TestSetSyncMode() {
s.topicPool.fastModeTimeout = 0
// set fast mode
s.topicPool.setSyncMode(s.topicPool.fastMode)
s.Equal(s.topicPool.fastMode, <-s.topicPool.period)
s.Equal(s.topicPool.fastMode, s.topicPool.currentMode)
// skip setting the same mode
s.topicPool.setSyncMode(s.topicPool.fastMode)
select {
case <-s.topicPool.period:
s.FailNow("should not have update the mode")
default:
// pass
}
// switch to slow mode
cancel := make(chan struct{})
s.topicPool.fastModeTimeoutCancel = cancel // should be set to nil
s.topicPool.setSyncMode(s.topicPool.slowMode)
s.Equal(s.topicPool.slowMode, <-s.topicPool.period)
s.Equal(s.topicPool.slowMode, s.topicPool.currentMode)
select {
case <-cancel:
s.Nil(s.topicPool.fastModeTimeoutCancel)
default:
s.FailNow("cancel should be closed")
}
}
func (s *TopicPoolSuite) TestNewPeerSelectedOnDrop() {
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
// add 3 nodes and confirm connection for 1 and 2
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
s.Len(s.topicPool.pendingPeers, 3)
s.Len(s.topicPool.discoveredPeersQueue, 0)
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.Contains(s.topicPool.connectedPeers, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID2)
s.Contains(s.topicPool.connectedPeers, nodeID2)
s.topicPool.ConfirmAdded(s.peer, nodeID3)
s.topicPool.ConfirmDropped(s.peer, nodeID3)
s.Contains(s.topicPool.pendingPeers, nodeID3)
s.Len(s.topicPool.pendingPeers, 1)
s.Len(s.topicPool.discoveredPeersQueue, 1)
// drop peer1
s.True(s.topicPool.ConfirmDropped(s.peer, nodeID1))
s.NotContains(s.topicPool.connectedPeers, nodeID1)
// add peer from the pool
s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
s.Len(s.topicPool.pendingPeers, 1)
s.Len(s.topicPool.discoveredPeersQueue, 0)
}
type mockConstantClock struct{}
func (mockConstantClock) Now() time.Time { return time.Unix(0, 0) }
// Regression test for https://github.com/status-im/nim-status-client/issues/522
func (s *TopicPoolSuite) TestNewPeerLackOfClockPrecision() {
_, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
_, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
_, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.topicPool.maxPendingPeers = 2
s.topicPool.clock = mockConstantClock{}
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
}
func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
// max limit is 1 because we test that 2nd peer will stay in local table
// when we request to drop it
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID2)
s.False(s.topicPool.connectedPeers[nodeID1].dismissed)
s.True(s.topicPool.connectedPeers[nodeID2].dismissed)
s.topicPool.ConfirmDropped(s.peer, nodeID2)
s.Contains(s.topicPool.pendingPeers, nodeID2)
s.NotContains(s.topicPool.connectedPeers, nodeID2)
s.topicPool.ConfirmDropped(s.peer, nodeID1)
s.NotContains(s.topicPool.pendingPeers, nodeID1)
s.NotContains(s.topicPool.connectedPeers, nodeID1)
}
func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
// after these operations, peer1 is confirmed and peer3 and peer2
// was added to the pool; peer3 is the most recent one
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID2)
s.topicPool.ConfirmAdded(s.peer, nodeID3)
s.topicPool.ConfirmDropped(s.peer, nodeID2)
s.topicPool.ConfirmDropped(s.peer, nodeID3)
// peer1 has dropped
s.topicPool.ConfirmDropped(s.peer, nodeID1)
// and peer3 is take from the pool as the most recent
s.True(s.topicPool.pendingPeers[nodeID2].discoveredTime.Before(s.topicPool.pendingPeers[nodeID3].discoveredTime))
s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
}
func (s *TopicPoolSuite) TestSelectPeerAfterMaxLimit() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID2)
s.topicPool.ConfirmDropped(s.peer, nodeID2)
s.Len(s.topicPool.pendingPeers, 1)
s.Contains(s.topicPool.pendingPeers, nodeID2)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
s.Len(s.topicPool.pendingPeers, 2)
s.Contains(s.topicPool.pendingPeers, nodeID3)
s.Equal(peer3, s.topicPool.AddPeerFromTable(s.peer))
}
func (s *TopicPoolSuite) TestReplacementPeerIsCounted() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID2)
s.topicPool.ConfirmDropped(s.peer, nodeID2)
s.topicPool.ConfirmDropped(s.peer, nodeID1)
s.NotContains(s.topicPool.pendingPeers, nodeID1)
s.NotContains(s.topicPool.connectedPeers, nodeID1)
s.Contains(s.topicPool.pendingPeers, nodeID2)
s.topicPool.pendingPeers[nodeID2].added = true
s.topicPool.ConfirmAdded(s.peer, nodeID2)
s.True(s.topicPool.MaxReached())
}
func (s *TopicPoolSuite) TestPeerDontAddTwice() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
_, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.topicPool.ConfirmAdded(s.peer, nodeID1)
// peer2 already added to p2p server no reason to add it again
s.Nil(s.topicPool.AddPeerFromTable(s.peer))
}
func (s *TopicPoolSuite) TestMaxCachedPeers() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 3
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID2)
s.topicPool.ConfirmAdded(s.peer, nodeID3)
s.Equal(3, len(s.topicPool.connectedPeers))
s.False(s.topicPool.connectedPeers[nodeID1].dismissed)
s.True(s.topicPool.connectedPeers[nodeID2].dismissed)
s.True(s.topicPool.connectedPeers[nodeID3].dismissed)
cached := s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5)
s.Equal(3, len(cached))
cachedMap := make(map[discv5.NodeID]*discv5.Node)
for _, peer := range cached {
cachedMap[peer.ID] = peer
}
s.topicPool.ConfirmDropped(s.peer, nodeID2)
s.topicPool.ConfirmDropped(s.peer, nodeID3)
s.Contains(cachedMap, peer1.ID)
s.Contains(cachedMap, peer2.ID)
s.Contains(cachedMap, peer3.ID)
s.Contains(s.topicPool.connectedPeers, nodeID1)
s.NotContains(s.topicPool.connectedPeers, nodeID2)
s.NotContains(s.topicPool.connectedPeers, nodeID3)
s.NotContains(s.topicPool.pendingPeers, nodeID1)
s.Contains(s.topicPool.pendingPeers, nodeID2)
s.Contains(s.topicPool.pendingPeers, nodeID3)
s.True(s.topicPool.maxCachedPeersReached())
cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5)
s.Equal(3, len(cached))
}
func (s *TopicPoolSuite) TestMaxPendingPeers() {
s.topicPool.maxPendingPeers = 2
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
pk1, _ := peer1.ID.Pubkey()
pk2, _ := peer2.ID.Pubkey()
pk3, _ := peer3.ID.Pubkey()
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1})
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer2, publicKey: pk2})
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer3, publicKey: pk3})
s.Equal(2, len(s.topicPool.pendingPeers))
s.Require().NotContains(s.topicPool.pendingPeers, nodeID1)
s.Require().Contains(s.topicPool.pendingPeers, nodeID2)
s.Require().Contains(s.topicPool.pendingPeers, nodeID3)
// maxPendingPeers = 0 means no limits.
s.topicPool.maxPendingPeers = 0
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1})
s.Equal(3, len(s.topicPool.pendingPeers))
s.Require().Contains(s.topicPool.pendingPeers, nodeID1)
s.Require().Contains(s.topicPool.pendingPeers, nodeID2)
s.Require().Contains(s.topicPool.pendingPeers, nodeID3)
}
func (s *TopicPoolSuite) TestQueueDuplicatePeers() {
_, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
_, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
pk1, _ := peer1.ID.Pubkey()
pk2, _ := peer2.ID.Pubkey()
peerInfo1 := &peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1}
peerInfo2 := &peerInfo{discoveredTime: time.Now(), node: peer2, publicKey: pk2}
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1})
s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer2, publicKey: pk2})
s.topicPool.addToQueue(peerInfo1)
s.topicPool.addToQueue(peerInfo2)
s.Equal(2, len(s.topicPool.discoveredPeersQueue))
s.Equal(2, len(s.topicPool.discoveredPeers))
s.topicPool.addToQueue(peerInfo1)
s.Equal(2, len(s.topicPool.discoveredPeersQueue))
s.Equal(2, len(s.topicPool.discoveredPeers))
peer := s.topicPool.popFromQueue()
s.Equal(1, len(s.topicPool.discoveredPeersQueue))
s.Equal(1, len(s.topicPool.discoveredPeers))
s.Require().NotContains(s.topicPool.discoveredPeers, peer.NodeID())
}
func (s *TopicPoolSuite) TestNewTopicPoolInterface() {
limits := params.NewLimits(1, 2)
cache, err := newInMemoryCache()
s.Require().NoError(err)
topic := discv5.Topic("cap=cap1")
t := newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.IsType(&TopicPool{}, t)
tp := newTopicPool(nil, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
cacheTP := newCacheOnlyTopicPool(tp, &testTrueVerifier{})
s.IsType(&cacheOnlyTopicPool{}, cacheTP)
}
func (s *TopicPoolSuite) TestIgnoreInboundConnection() {
s.topicPool.limits = params.NewLimits(0, 0)
s.topicPool.maxCachedPeers = 0
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Contains(s.topicPool.pendingPeers, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.Contains(s.topicPool.pendingPeers, nodeID1)
s.False(s.topicPool.pendingPeers[nodeID1].dismissed)
s.NotContains(s.topicPool.connectedPeers, nodeID1)
}
func (s *TopicPoolSuite) TestConnectedButRemoved() {
s.topicPool.limits = params.NewLimits(0, 0)
s.topicPool.maxCachedPeers = 1
nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
s.Contains(s.topicPool.pendingPeers, nodeID1)
s.topicPool.ConfirmAdded(s.peer, nodeID1)
s.Contains(s.topicPool.connectedPeers, nodeID1)
s.False(s.topicPool.ConfirmDropped(s.peer, nodeID1))
s.False(s.topicPool.pendingPeers[nodeID1].added)
}
func TestServerIgnoresInboundPeer(t *testing.T) {
topic := discv5.Topic("cap=cap1")
limits := params.NewLimits(0, 0)
cache, err := newInMemoryCache()
require.NoError(t, err)
topicPool := newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
topicPool.running = 1
topicPool.maxCachedPeers = 0
whisper := whisper.New(nil)
srvkey, err := crypto.GenerateKey()
require.NoError(t, err)
server := &p2p.Server{
Config: p2p.Config{
MaxPeers: 1,
Name: "server",
ListenAddr: ":0",
PrivateKey: srvkey,
NoDiscovery: true,
Protocols: whisper.Protocols(),
},
}
require.NoError(t, server.Start())
clientkey, err := crypto.GenerateKey()
require.NoError(t, err)
client := &p2p.Server{
Config: p2p.Config{
MaxPeers: 1,
Name: "client",
ListenAddr: ":0",
PrivateKey: clientkey,
NoDiscovery: true,
Protocols: whisper.Protocols(),
},
}
require.NoError(t, client.Start())
// add peer to topic pool, as if it was discovered.
// it will be ignored due to the limit and added to a table of pending peers.
clientID := enode.PubkeyToIDV4(&clientkey.PublicKey)
clientNodeV5 := discv5.NewNode(
discv5.PubkeyID(&clientkey.PublicKey),
client.Self().IP(),
uint16(client.Self().UDP()),
uint16(client.Self().TCP()),
)
require.NoError(t, topicPool.processFoundNode(server, clientNodeV5))
require.Contains(t, topicPool.pendingPeers, clientID)
require.False(t, topicPool.pendingPeers[clientID].added)
errch := helpers.WaitForPeerAsync(server, client.Self().URLv4(), p2p.PeerEventTypeAdd, 5*time.Second)
// connect to a server from client. client will be an inbound connection for a server.
client.AddPeer(server.Self())
select {
case err := <-errch:
require.NoError(t, err)
case <-time.After(10 * time.Second):
require.FailNow(t, "failed waiting for WaitPeerAsync")
}
// wait some time to confirm that RemovePeer wasn't called on the server object.
errch = helpers.WaitForPeerAsync(server, client.Self().URLv4(), p2p.PeerEventTypeDrop, time.Second)
// simulate that event was received by a topic pool.
// topic pool will ignore this even because it sees that it is inbound connection.
topicPool.ConfirmAdded(server, clientID)
require.Contains(t, topicPool.pendingPeers, clientID)
require.False(t, topicPool.pendingPeers[clientID].dismissed)
select {
case err := <-errch:
require.EqualError(t, err, "wait for peer: timeout")
case <-time.After(10 * time.Second):
require.FailNow(t, "failed waiting for WaitPeerAsync")
}
}