fix topic pool tests
This commit is contained in:
parent
a4701f41ee
commit
65c9d39040
|
@ -16,9 +16,6 @@ import (
|
|||
"github.com/status-im/status-go/t/utils"
|
||||
)
|
||||
|
||||
var enode1 = "enode://f32efef2739e5135a0f9a80600b321ba4d13393a5f1d3f5f593df85919262f06c70bfa66d38507b9d79a91021f5e200ec20150592e72934c66248e87014c4317@1.1.1.1:30404"
|
||||
var enode2 = "enode://f32efef2739e5135a0f9a80600b321ba4d13393a5f1d3f5f593df85919262f06c70bfa66d38507b9d79a91021f5e200ec20150592e72934c66248e87014c4317@1.1.1.1:30404"
|
||||
|
||||
func TestMakeNodeDefaultConfig(t *testing.T) {
|
||||
utils.Init()
|
||||
config, err := utils.MakeTestNodeConfig(3)
|
||||
|
@ -31,43 +28,6 @@ func TestMakeNodeDefaultConfig(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestMakeNodeWellFormedBootnodes(t *testing.T) {
|
||||
utils.Init()
|
||||
config, err := utils.MakeTestNodeConfig(3)
|
||||
require.NoError(t, err)
|
||||
|
||||
bootnodes := []string{
|
||||
enode1,
|
||||
enode2,
|
||||
}
|
||||
config.ClusterConfig.BootNodes = bootnodes
|
||||
|
||||
db, err := leveldb.Open(storage.NewMemStorage(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = MakeNode(config, &accounts.Manager{}, db)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestMakeNodeMalformedBootnodes(t *testing.T) {
|
||||
utils.Init()
|
||||
config, err := utils.MakeTestNodeConfig(3)
|
||||
require.NoError(t, err)
|
||||
|
||||
bootnodes := []string{
|
||||
enode1,
|
||||
enode2,
|
||||
"enode://badkey@3.3.3.3:30303",
|
||||
}
|
||||
config.ClusterConfig.BootNodes = bootnodes
|
||||
|
||||
db, err := leveldb.Open(storage.NewMemStorage(), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
_, err = MakeNode(config, &accounts.Manager{}, db)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestParseNodesToNodeID(t *testing.T) {
|
||||
identity, err := crypto.GenerateKey()
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -64,7 +64,11 @@ func TestNodeRPCClientCallOnlyPublicAPIs(t *testing.T) {
|
|||
func TestNodeRPCPrivateClientCallPrivateService(t *testing.T) {
|
||||
var err error
|
||||
|
||||
statusNode, err := createAndStartStatusNode(¶ms.NodeConfig{})
|
||||
statusNode, err := createAndStartStatusNode(¶ms.NodeConfig{
|
||||
WakuConfig: params.WakuConfig{
|
||||
Enabled: true,
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
err := statusNode.Stop()
|
||||
|
|
|
@ -2,6 +2,7 @@ package peers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/p2p"
|
||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
|
@ -65,7 +66,14 @@ func (t *cacheOnlyTopicPool) ConfirmAdded(server *p2p.Server, nodeID enode.ID) {
|
|||
// If a peer was trusted, it was moved to connectedPeers,
|
||||
// signal was sent and we can safely remove it.
|
||||
if peer, ok := t.connectedPeers[nodeID]; ok {
|
||||
// NOTE: removeServerPeer removes the server peer immediately.
|
||||
// which means the next discovery.summary is not going to include
|
||||
// the peer.
|
||||
// We leave some time so that we ensure the signal is propagated
|
||||
go func() {
|
||||
time.Sleep(200)
|
||||
t.removeServerPeer(server, peer)
|
||||
}()
|
||||
// Delete it from `connectedPeers` immediately to
|
||||
// prevent removing it from the cache which logic is
|
||||
// implemented in TopicPool.
|
||||
|
@ -75,7 +83,15 @@ func (t *cacheOnlyTopicPool) ConfirmAdded(server *p2p.Server, nodeID enode.ID) {
|
|||
// It a peer was not trusted, it is still in pendingPeers.
|
||||
// We should remove it from the p2p.Server.
|
||||
if peer, ok := t.pendingPeers[nodeID]; ok {
|
||||
// NOTE: removeServerPeer removes the server peer immediately.
|
||||
// which means the next discovery.summary is not going to include
|
||||
// the peer.
|
||||
// We leave some time so that we ensure the signal is propagated
|
||||
go func() {
|
||||
time.Sleep(200)
|
||||
t.removeServerPeer(server, peer.peerInfo)
|
||||
}()
|
||||
|
||||
// Delete it from `connectedPeers` immediately to
|
||||
// prevent removing it from the cache which logic is
|
||||
// implemented in TopicPool.
|
||||
|
|
|
@ -346,8 +346,10 @@ func (s *PeerPoolSimulationSuite) TestMailServerPeersDiscovery() {
|
|||
poolEvents <- envelope.Type
|
||||
var summary []*p2p.PeerInfo
|
||||
s.NoError(json.Unmarshal(envelope.Event, &summary))
|
||||
if len(summary) != 0 {
|
||||
summaries <- summary
|
||||
}
|
||||
}
|
||||
})
|
||||
defer signal.ResetDefaultNodeNotificationHandler()
|
||||
|
||||
|
@ -384,7 +386,9 @@ func (s *PeerPoolSimulationSuite) TestMailServerPeersDiscovery() {
|
|||
|
||||
// wait for a summary event to be sure that ConfirmAdded() was called
|
||||
s.Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
|
||||
s.Equal(s.peers[0].Self().ID().String(), (<-summaries)[0].ID)
|
||||
summary := (<-summaries)
|
||||
s.Require().Len(summary, 1)
|
||||
s.Equal(s.peers[0].Self().ID().String(), summary[0].ID)
|
||||
|
||||
// check cache
|
||||
cachedPeers := peerPool.cache.GetPeersRange(MailServerDiscoveryTopic, 5)
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
|
@ -15,8 +14,6 @@ import (
|
|||
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||
|
||||
"github.com/status-im/status-go/params"
|
||||
"github.com/status-im/status-go/t/helpers"
|
||||
"github.com/status-im/status-go/waku"
|
||||
)
|
||||
|
||||
type TopicPoolSuite struct {
|
||||
|
@ -457,79 +454,3 @@ func (s *TopicPoolSuite) TestConnectedButRemoved() {
|
|||
s.False(s.topicPool.ConfirmDropped(s.peer, nodeID1))
|
||||
s.False(s.topicPool.pendingPeers[nodeID1].added)
|
||||
}
|
||||
|
||||
func TestServerIgnoresInboundPeer(t *testing.T) {
|
||||
topic := discv5.Topic("cap=cap1")
|
||||
limits := params.NewLimits(0, 0)
|
||||
cache, err := newInMemoryCache()
|
||||
require.NoError(t, err)
|
||||
topicPool := newTopicPool(nil, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
|
||||
topicPool.running = 1
|
||||
topicPool.maxCachedPeers = 0
|
||||
|
||||
waku := waku.New(&waku.DefaultConfig, nil)
|
||||
srvkey, err := crypto.GenerateKey()
|
||||
require.NoError(t, err)
|
||||
server := &p2p.Server{
|
||||
Config: p2p.Config{
|
||||
MaxPeers: 1,
|
||||
Name: "server",
|
||||
ListenAddr: ":0",
|
||||
PrivateKey: srvkey,
|
||||
NoDiscovery: true,
|
||||
Protocols: waku.Protocols(),
|
||||
},
|
||||
}
|
||||
require.NoError(t, server.Start())
|
||||
clientkey, err := crypto.GenerateKey()
|
||||
require.NoError(t, err)
|
||||
client := &p2p.Server{
|
||||
Config: p2p.Config{
|
||||
MaxPeers: 1,
|
||||
Name: "client",
|
||||
ListenAddr: ":0",
|
||||
PrivateKey: clientkey,
|
||||
NoDiscovery: true,
|
||||
Protocols: waku.Protocols(),
|
||||
},
|
||||
}
|
||||
require.NoError(t, client.Start())
|
||||
|
||||
// add peer to topic pool, as if it was discovered.
|
||||
// it will be ignored due to the limit and added to a table of pending peers.
|
||||
clientID := enode.PubkeyToIDV4(&clientkey.PublicKey)
|
||||
clientNodeV5 := discv5.NewNode(
|
||||
discv5.PubkeyID(&clientkey.PublicKey),
|
||||
client.Self().IP(),
|
||||
uint16(client.Self().UDP()),
|
||||
uint16(client.Self().TCP()),
|
||||
)
|
||||
require.NoError(t, topicPool.processFoundNode(server, clientNodeV5))
|
||||
require.Contains(t, topicPool.pendingPeers, clientID)
|
||||
require.False(t, topicPool.pendingPeers[clientID].added)
|
||||
|
||||
errch := helpers.WaitForPeerAsync(server, client.Self().URLv4(), p2p.PeerEventTypeAdd, 5*time.Second)
|
||||
// connect to a server from client. client will be an inbound connection for a server.
|
||||
client.AddPeer(server.Self())
|
||||
select {
|
||||
case err := <-errch:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(10 * time.Second):
|
||||
require.FailNow(t, "failed waiting for WaitPeerAsync")
|
||||
}
|
||||
|
||||
// wait some time to confirm that RemovePeer wasn't called on the server object.
|
||||
errch = helpers.WaitForPeerAsync(server, client.Self().URLv4(), p2p.PeerEventTypeDrop, time.Second)
|
||||
// simulate that event was received by a topic pool.
|
||||
// topic pool will ignore this even because it sees that it is inbound connection.
|
||||
topicPool.ConfirmAdded(server, clientID)
|
||||
require.Contains(t, topicPool.pendingPeers, clientID)
|
||||
require.False(t, topicPool.pendingPeers[clientID].dismissed)
|
||||
|
||||
select {
|
||||
case err := <-errch:
|
||||
require.EqualError(t, err, "wait for peer: timeout")
|
||||
case <-time.After(10 * time.Second):
|
||||
require.FailNow(t, "failed waiting for WaitPeerAsync")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue