package peers

import (
	"net"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/p2p"
	"github.com/ethereum/go-ethereum/p2p/discv5"
	"github.com/ethereum/go-ethereum/p2p/enode"

	"github.com/status-im/status-go/params"
	"github.com/status-im/status-go/t/helpers"
	"github.com/status-im/status-go/whisper/v6"
)

type TopicPoolSuite struct {
	suite.Suite

	peer      *p2p.Server
	topicPool *TopicPool
}

func TestTopicPoolSuite(t *testing.T) {
	suite.Run(t, new(TopicPoolSuite))
}

func (s *TopicPoolSuite) createDiscV5Node(ip net.IP, port uint16) (enode.ID, *discv5.Node) {
	id, err := crypto.GenerateKey()
	s.Require().NoError(err)
	nodeID := enode.PubkeyToIDV4(&id.PublicKey)
	nodeV5 := discv5.NewNode(discv5.PubkeyID(&id.PublicKey), ip, port, port)
	return nodeID, nodeV5
}

func (s *TopicPoolSuite) SetupTest() {
	maxCachedPeersMultiplier = 1
	key, _ := crypto.GenerateKey()
	name := common.MakeName("peer", "1.0")
	s.peer = &p2p.Server{
		Config: p2p.Config{
			MaxPeers:    10,
			Name:        name,
			ListenAddr:  "0.0.0.0:0",
			PrivateKey:  key,
			NoDiscovery: true,
		},
	}
	s.Require().NoError(s.peer.Start())
	topic := discv5.Topic("cap=cap1")
	limits := params.NewLimits(1, 2)
	cache, err := newInMemoryCache()
	s.Require().NoError(err)
	s.topicPool = newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
	s.topicPool.running = 1
	// This is a buffered channel to simplify testing.
	// If your test generates more than 10 mode changes,
	// override this `period` field or consume from it
	// using `AssertConsumed()`.
	s.topicPool.period = make(chan time.Duration, 10)
}

func (s *TopicPoolSuite) TearDown() {
	s.peer.Stop()
}

func (s *TopicPoolSuite) AssertConsumed(channel <-chan time.Duration, expected time.Duration, timeout time.Duration) {
	select {
	case received := <-channel:
		s.Equal(expected, received)
	case <-time.After(timeout):
		s.FailNow("timed out waiting")
	}
}

func (s *TopicPoolSuite) TestUsingCache() {
	s.topicPool.limits = params.NewLimits(1, 1)
	s.topicPool.maxCachedPeers = 1

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))

	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID2)

	cached := s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10)
	s.Contains(cached, peer1)
	s.Contains(cached, peer2)

	s.topicPool.ConfirmDropped(s.peer, nodeID2)
	cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10)
	s.Contains(cached, peer1)
	s.Contains(cached, peer2)

	// A peer that drops by itself, should be removed from the cache.
	s.topicPool.ConfirmDropped(s.peer, nodeID1)
	s.Equal([]*discv5.Node{peer2}, s.topicPool.cache.GetPeersRange(s.topicPool.topic, 10))
}

func (s *TopicPoolSuite) TestSyncSwitches() {
	nodeID, peer := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer))
	s.topicPool.ConfirmAdded(s.peer, nodeID)
	s.AssertConsumed(s.topicPool.period, s.topicPool.slowMode, time.Second)
	s.NotNil(s.topicPool.connectedPeers[nodeID])
	s.topicPool.ConfirmDropped(s.peer, nodeID)
	s.AssertConsumed(s.topicPool.period, s.topicPool.fastMode, time.Second)
}

func (s *TopicPoolSuite) TestTimeoutFastMode() {
	s.topicPool.fastModeTimeout = time.Millisecond * 50

	// set fast mode
	s.topicPool.mu.Lock()
	s.topicPool.setSyncMode(s.topicPool.fastMode)
	s.topicPool.mu.Unlock()
	s.Equal(s.topicPool.fastMode, <-s.topicPool.period)

	// switch to slow mode after `fastModeTimeout`
	select {
	case mode := <-s.topicPool.period:
		s.Equal(s.topicPool.slowMode, mode)
	case <-time.After(s.topicPool.fastModeTimeout * 2):
		s.FailNow("timed out")
	}
}

func (s *TopicPoolSuite) TestSetSyncMode() {
	s.topicPool.fastModeTimeout = 0

	// set fast mode
	s.topicPool.setSyncMode(s.topicPool.fastMode)
	s.Equal(s.topicPool.fastMode, <-s.topicPool.period)
	s.Equal(s.topicPool.fastMode, s.topicPool.currentMode)

	// skip setting the same mode
	s.topicPool.setSyncMode(s.topicPool.fastMode)
	select {
	case <-s.topicPool.period:
		s.FailNow("should not have update the mode")
	default:
		// pass
	}

	// switch to slow mode
	cancel := make(chan struct{})
	s.topicPool.fastModeTimeoutCancel = cancel // should be set to nil
	s.topicPool.setSyncMode(s.topicPool.slowMode)
	s.Equal(s.topicPool.slowMode, <-s.topicPool.period)
	s.Equal(s.topicPool.slowMode, s.topicPool.currentMode)
	select {
	case <-cancel:
		s.Nil(s.topicPool.fastModeTimeoutCancel)
	default:
		s.FailNow("cancel should be closed")
	}
}

func (s *TopicPoolSuite) TestNewPeerSelectedOnDrop() {
	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)

	// add 3 nodes and confirm connection for 1 and 2
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
	s.Len(s.topicPool.pendingPeers, 3)
	s.Len(s.topicPool.discoveredPeersQueue, 0)
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.Contains(s.topicPool.connectedPeers, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID2)
	s.Contains(s.topicPool.connectedPeers, nodeID2)
	s.topicPool.ConfirmAdded(s.peer, nodeID3)
	s.topicPool.ConfirmDropped(s.peer, nodeID3)
	s.Contains(s.topicPool.pendingPeers, nodeID3)
	s.Len(s.topicPool.pendingPeers, 1)
	s.Len(s.topicPool.discoveredPeersQueue, 1)
	// drop peer1
	s.True(s.topicPool.ConfirmDropped(s.peer, nodeID1))
	s.NotContains(s.topicPool.connectedPeers, nodeID1)
	// add peer from the pool
	s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
	s.Len(s.topicPool.pendingPeers, 1)
	s.Len(s.topicPool.discoveredPeersQueue, 0)
}

type mockConstantClock struct{}

func (mockConstantClock) Now() time.Time { return time.Unix(0, 0) }

// Regression test for https://github.com/status-im/nim-status-client/issues/522
func (s *TopicPoolSuite) TestNewPeerLackOfClockPrecision() {
	_, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	_, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	_, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)

	s.topicPool.maxPendingPeers = 2

	s.topicPool.clock = mockConstantClock{}

	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
}

func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
	// max limit is 1 because we test that 2nd peer will stay in local table
	// when we request to drop it
	s.topicPool.limits = params.NewLimits(1, 1)
	s.topicPool.maxCachedPeers = 1

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)

	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID2)
	s.False(s.topicPool.connectedPeers[nodeID1].dismissed)
	s.True(s.topicPool.connectedPeers[nodeID2].dismissed)
	s.topicPool.ConfirmDropped(s.peer, nodeID2)
	s.Contains(s.topicPool.pendingPeers, nodeID2)
	s.NotContains(s.topicPool.connectedPeers, nodeID2)
	s.topicPool.ConfirmDropped(s.peer, nodeID1)
	s.NotContains(s.topicPool.pendingPeers, nodeID1)
	s.NotContains(s.topicPool.connectedPeers, nodeID1)
}

func (s *TopicPoolSuite) TestTheMostRecentPeerIsSelected() {
	s.topicPool.limits = params.NewLimits(1, 1)
	s.topicPool.maxCachedPeers = 1

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)

	// after these operations, peer1 is confirmed and peer3 and peer2
	// was added to the pool; peer3 is the most recent one
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID2)
	s.topicPool.ConfirmAdded(s.peer, nodeID3)

	s.topicPool.ConfirmDropped(s.peer, nodeID2)
	s.topicPool.ConfirmDropped(s.peer, nodeID3)
	// peer1 has dropped
	s.topicPool.ConfirmDropped(s.peer, nodeID1)
	// and peer3 is take from the pool as the most recent
	s.True(s.topicPool.pendingPeers[nodeID2].discoveredTime.Before(s.topicPool.pendingPeers[nodeID3].discoveredTime))
	s.Equal(peer3.ID, s.topicPool.AddPeerFromTable(s.peer).ID)
}

func (s *TopicPoolSuite) TestSelectPeerAfterMaxLimit() {
	s.topicPool.limits = params.NewLimits(1, 1)
	s.topicPool.maxCachedPeers = 1

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)

	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID2)
	s.topicPool.ConfirmDropped(s.peer, nodeID2)
	s.Len(s.topicPool.pendingPeers, 1)
	s.Contains(s.topicPool.pendingPeers, nodeID2)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
	s.Len(s.topicPool.pendingPeers, 2)
	s.Contains(s.topicPool.pendingPeers, nodeID3)
	s.Equal(peer3, s.topicPool.AddPeerFromTable(s.peer))
}

func (s *TopicPoolSuite) TestReplacementPeerIsCounted() {
	s.topicPool.limits = params.NewLimits(1, 1)
	s.topicPool.maxCachedPeers = 1

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID2)
	s.topicPool.ConfirmDropped(s.peer, nodeID2)
	s.topicPool.ConfirmDropped(s.peer, nodeID1)

	s.NotContains(s.topicPool.pendingPeers, nodeID1)
	s.NotContains(s.topicPool.connectedPeers, nodeID1)
	s.Contains(s.topicPool.pendingPeers, nodeID2)
	s.topicPool.pendingPeers[nodeID2].added = true
	s.topicPool.ConfirmAdded(s.peer, nodeID2)
	s.True(s.topicPool.MaxReached())
}

func (s *TopicPoolSuite) TestPeerDontAddTwice() {
	s.topicPool.limits = params.NewLimits(1, 1)
	s.topicPool.maxCachedPeers = 1

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	_, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	// peer2 already added to p2p server no reason to add it again
	s.Nil(s.topicPool.AddPeerFromTable(s.peer))
}

func (s *TopicPoolSuite) TestMaxCachedPeers() {
	s.topicPool.limits = params.NewLimits(1, 1)
	s.topicPool.maxCachedPeers = 3
	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer2))
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer3))
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID2)
	s.topicPool.ConfirmAdded(s.peer, nodeID3)

	s.Equal(3, len(s.topicPool.connectedPeers))
	s.False(s.topicPool.connectedPeers[nodeID1].dismissed)
	s.True(s.topicPool.connectedPeers[nodeID2].dismissed)
	s.True(s.topicPool.connectedPeers[nodeID3].dismissed)

	cached := s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5)
	s.Equal(3, len(cached))

	cachedMap := make(map[discv5.NodeID]*discv5.Node)
	for _, peer := range cached {
		cachedMap[peer.ID] = peer
	}

	s.topicPool.ConfirmDropped(s.peer, nodeID2)
	s.topicPool.ConfirmDropped(s.peer, nodeID3)

	s.Contains(cachedMap, peer1.ID)
	s.Contains(cachedMap, peer2.ID)
	s.Contains(cachedMap, peer3.ID)

	s.Contains(s.topicPool.connectedPeers, nodeID1)
	s.NotContains(s.topicPool.connectedPeers, nodeID2)
	s.NotContains(s.topicPool.connectedPeers, nodeID3)

	s.NotContains(s.topicPool.pendingPeers, nodeID1)
	s.Contains(s.topicPool.pendingPeers, nodeID2)
	s.Contains(s.topicPool.pendingPeers, nodeID3)

	s.True(s.topicPool.maxCachedPeersReached())
	cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5)
	s.Equal(3, len(cached))
}

func (s *TopicPoolSuite) TestMaxPendingPeers() {
	s.topicPool.maxPendingPeers = 2

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID2, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	nodeID3, peer3 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	pk1, _ := peer1.ID.Pubkey()
	pk2, _ := peer2.ID.Pubkey()
	pk3, _ := peer3.ID.Pubkey()

	s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1})
	s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer2, publicKey: pk2})
	s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer3, publicKey: pk3})

	s.Equal(2, len(s.topicPool.pendingPeers))
	s.Require().NotContains(s.topicPool.pendingPeers, nodeID1)
	s.Require().Contains(s.topicPool.pendingPeers, nodeID2)
	s.Require().Contains(s.topicPool.pendingPeers, nodeID3)

	// maxPendingPeers = 0 means no limits.
	s.topicPool.maxPendingPeers = 0
	s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1})

	s.Equal(3, len(s.topicPool.pendingPeers))
	s.Require().Contains(s.topicPool.pendingPeers, nodeID1)
	s.Require().Contains(s.topicPool.pendingPeers, nodeID2)
	s.Require().Contains(s.topicPool.pendingPeers, nodeID3)
}

func (s *TopicPoolSuite) TestQueueDuplicatePeers() {
	_, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	_, peer2 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	pk1, _ := peer1.ID.Pubkey()
	pk2, _ := peer2.ID.Pubkey()
	peerInfo1 := &peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1}
	peerInfo2 := &peerInfo{discoveredTime: time.Now(), node: peer2, publicKey: pk2}

	s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer1, publicKey: pk1})
	s.topicPool.addToPendingPeers(&peerInfo{discoveredTime: time.Now(), node: peer2, publicKey: pk2})
	s.topicPool.addToQueue(peerInfo1)
	s.topicPool.addToQueue(peerInfo2)

	s.Equal(2, len(s.topicPool.discoveredPeersQueue))
	s.Equal(2, len(s.topicPool.discoveredPeers))

	s.topicPool.addToQueue(peerInfo1)

	s.Equal(2, len(s.topicPool.discoveredPeersQueue))
	s.Equal(2, len(s.topicPool.discoveredPeers))

	peer := s.topicPool.popFromQueue()

	s.Equal(1, len(s.topicPool.discoveredPeersQueue))
	s.Equal(1, len(s.topicPool.discoveredPeers))
	s.Require().NotContains(s.topicPool.discoveredPeers, peer.NodeID())
}

func (s *TopicPoolSuite) TestNewTopicPoolInterface() {
	limits := params.NewLimits(1, 2)
	cache, err := newInMemoryCache()
	s.Require().NoError(err)

	topic := discv5.Topic("cap=cap1")
	t := newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
	s.IsType(&TopicPool{}, t)

	tp := newTopicPool(nil, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
	cacheTP := newCacheOnlyTopicPool(tp, &testTrueVerifier{})
	s.IsType(&cacheOnlyTopicPool{}, cacheTP)
}

func (s *TopicPoolSuite) TestIgnoreInboundConnection() {
	s.topicPool.limits = params.NewLimits(0, 0)
	s.topicPool.maxCachedPeers = 0

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Contains(s.topicPool.pendingPeers, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.Contains(s.topicPool.pendingPeers, nodeID1)
	s.False(s.topicPool.pendingPeers[nodeID1].dismissed)
	s.NotContains(s.topicPool.connectedPeers, nodeID1)
}

func (s *TopicPoolSuite) TestConnectedButRemoved() {
	s.topicPool.limits = params.NewLimits(0, 0)
	s.topicPool.maxCachedPeers = 1

	nodeID1, peer1 := s.createDiscV5Node(s.peer.Self().IP(), 32311)
	s.Require().NoError(s.topicPool.processFoundNode(s.peer, peer1))
	s.Contains(s.topicPool.pendingPeers, nodeID1)
	s.topicPool.ConfirmAdded(s.peer, nodeID1)
	s.Contains(s.topicPool.connectedPeers, nodeID1)
	s.False(s.topicPool.ConfirmDropped(s.peer, nodeID1))
	s.False(s.topicPool.pendingPeers[nodeID1].added)
}

func TestServerIgnoresInboundPeer(t *testing.T) {
	topic := discv5.Topic("cap=cap1")
	limits := params.NewLimits(0, 0)
	cache, err := newInMemoryCache()
	require.NoError(t, err)
	topicPool := newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
	topicPool.running = 1
	topicPool.maxCachedPeers = 0

	whisper := whisper.New(nil)
	srvkey, err := crypto.GenerateKey()
	require.NoError(t, err)
	server := &p2p.Server{
		Config: p2p.Config{
			MaxPeers:    1,
			Name:        "server",
			ListenAddr:  ":0",
			PrivateKey:  srvkey,
			NoDiscovery: true,
			Protocols:   whisper.Protocols(),
		},
	}
	require.NoError(t, server.Start())
	clientkey, err := crypto.GenerateKey()
	require.NoError(t, err)
	client := &p2p.Server{
		Config: p2p.Config{
			MaxPeers:    1,
			Name:        "client",
			ListenAddr:  ":0",
			PrivateKey:  clientkey,
			NoDiscovery: true,
			Protocols:   whisper.Protocols(),
		},
	}
	require.NoError(t, client.Start())

	// add peer to topic pool, as if it was discovered.
	// it will be ignored due to the limit and added to a table of pending peers.
	clientID := enode.PubkeyToIDV4(&clientkey.PublicKey)
	clientNodeV5 := discv5.NewNode(
		discv5.PubkeyID(&clientkey.PublicKey),
		client.Self().IP(),
		uint16(client.Self().UDP()),
		uint16(client.Self().TCP()),
	)
	require.NoError(t, topicPool.processFoundNode(server, clientNodeV5))
	require.Contains(t, topicPool.pendingPeers, clientID)
	require.False(t, topicPool.pendingPeers[clientID].added)

	errch := helpers.WaitForPeerAsync(server, client.Self().URLv4(), p2p.PeerEventTypeAdd, 5*time.Second)
	// connect to a server from client. client will be an inbound connection for a server.
	client.AddPeer(server.Self())
	select {
	case err := <-errch:
		require.NoError(t, err)
	case <-time.After(10 * time.Second):
		require.FailNow(t, "failed waiting for WaitPeerAsync")
	}

	// wait some time to confirm that RemovePeer wasn't called on the server object.
	errch = helpers.WaitForPeerAsync(server, client.Self().URLv4(), p2p.PeerEventTypeDrop, time.Second)
	// simulate that event was received by a topic pool.
	// topic pool will ignore this even because it sees that it is inbound connection.
	topicPool.ConfirmAdded(server, clientID)
	require.Contains(t, topicPool.pendingPeers, clientID)
	require.False(t, topicPool.pendingPeers[clientID].dismissed)

	select {
	case err := <-errch:
		require.EqualError(t, err, "wait for peer: timeout")
	case <-time.After(10 * time.Second):
		require.FailNow(t, "failed waiting for WaitPeerAsync")
	}
}