Use cache in topic pool (#955)

It must have slipped when i moved significant chunk of logic
from peer pool to topic pool and because of the != nil protection
this bug was missed
This commit is contained in:
Dmitry Shulyak 2018-05-15 18:05:57 +03:00 committed by GitHub
parent bd68fa15c9
commit f5a8be08de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 34 additions and 25 deletions

View File

@ -5,19 +5,24 @@ import (
"testing" "testing"
"github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/geth/db"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
) )
func TestPeersRange(t *testing.T) { // newInMemoryCache creates a cache for tests
rootDB, err := db.Create("", "status-peers-test") func newInMemoryCache() (*Cache, error) {
require.NoError(t, err) memdb, err := leveldb.Open(storage.NewMemStorage(), nil)
defer func() { if err != nil {
assert.NoError(t, rootDB.Close()) return nil, err
}() }
return NewCache(memdb), nil
}
peersDB := Cache{db: rootDB} func TestPeersRange(t *testing.T) {
peersDB, err := newInMemoryCache()
require.NoError(t, err)
topic := discv5.Topic("test") topic := discv5.Topic("test")
peers := [3]*discv5.Node{ peers := [3]*discv5.Node{
discv5.NewNode(discv5.NodeID{3}, net.IPv4(100, 100, 0, 3), 32311, 32311), discv5.NewNode(discv5.NodeID{3}, net.IPv4(100, 100, 0, 3), 32311, 32311),

View File

@ -116,7 +116,7 @@ func (p *PeerPool) Start(server *p2p.Server) error {
// collect topics and start searching for nodes // collect topics and start searching for nodes
p.topics = make([]*TopicPool, 0, len(p.config)) p.topics = make([]*TopicPool, 0, len(p.config))
for topic, limits := range p.config { for topic, limits := range p.config {
topicPool := NewTopicPool(topic, limits, p.opts.SlowSync, p.opts.FastSync) topicPool := NewTopicPool(topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
if err := topicPool.StartSearch(server); err != nil { if err := topicPool.StartSearch(server); err != nil {
return err return err
} }

View File

@ -133,7 +133,9 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation
} }
peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true} peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true}
peerPool := NewPeerPool(config, nil, peerPoolOpts) cache, err := newInMemoryCache()
s.Require().NoError(err)
peerPool := NewPeerPool(config, cache, peerPoolOpts)
register := NewRegister(topic) register := NewRegister(topic)
s.Require().NoError(register.Start(s.peers[0])) s.Require().NoError(register.Start(s.peers[0]))
// need to wait for topic to get registered, discv5 can query same node // need to wait for topic to get registered, discv5 can query same node
@ -173,6 +175,11 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
s.Require().Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents)) s.Require().Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
summary = <-summaries summary = <-summaries
s.Len(summary, 1) s.Len(summary, 1)
// verify that we are actually using cache
cachedPeers := peerPool.cache.GetPeersRange(topic, 1)
s.Len(cachedPeers, 1)
s.Equal(s.peers[2].Self().ID, discover.NodeID(cachedPeers[0].ID))
} }
// TestPeerPoolMaxPeersOverflow verifies that following scenario will not occur: // TestPeerPoolMaxPeersOverflow verifies that following scenario will not occur:

View File

@ -15,7 +15,7 @@ import (
) )
// NewTopicPool returns instance of TopicPool // NewTopicPool returns instance of TopicPool
func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration) *TopicPool { func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
pool := TopicPool{ pool := TopicPool{
topic: topic, topic: topic,
limits: limits, limits: limits,
@ -25,6 +25,7 @@ func NewTopicPool(topic discv5.Topic, limits params.Limits, slowMode, fastMode t
peerPool: make(map[discv5.NodeID]*peerInfoItem), peerPool: make(map[discv5.NodeID]*peerInfoItem),
peerPoolQueue: make(peerPriorityQueue, 0), peerPoolQueue: make(peerPriorityQueue, 0),
connectedPeers: make(map[discv5.NodeID]*peerInfo), connectedPeers: make(map[discv5.NodeID]*peerInfo),
cache: cache,
} }
heap.Init(&pool.peerPoolQueue) heap.Init(&pool.peerPoolQueue)
@ -198,10 +199,8 @@ func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
// established connection means that the node // established connection means that the node
// is a viable candidate for a connection and can be cached // is a viable candidate for a connection and can be cached
if t.cache != nil { if err := t.cache.AddPeer(peer.node, t.topic); err != nil {
if err := t.cache.AddPeer(peer.node, t.topic); err != nil { log.Error("failed to persist a peer", "error", err)
log.Error("failed to persist a peer", "error", err)
}
} }
// if the upper limit is already reached, drop this peer // if the upper limit is already reached, drop this peer
@ -256,10 +255,8 @@ func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) b
t.removeServerPeer(server, peer) t.removeServerPeer(server, peer)
delete(t.connectedPeers, discV5NodeID) delete(t.connectedPeers, discV5NodeID)
if t.cache != nil { if err := t.cache.RemovePeer(discV5NodeID, t.topic); err != nil {
if err := t.cache.RemovePeer(discV5NodeID, t.topic); err != nil { log.Error("failed to remove peer from cache", "error", err)
log.Error("failed to remove peer from cache", "error", err)
}
} }
// As we removed a peer, update a sync strategy if needed. // As we removed a peer, update a sync strategy if needed.
@ -312,11 +309,9 @@ func (t *TopicPool) StartSearch(server *p2p.Server) error {
found := make(chan *discv5.Node, 5) // 5 reasonable number for concurrently found nodes found := make(chan *discv5.Node, 5) // 5 reasonable number for concurrently found nodes
lookup := make(chan bool, 10) // sufficiently buffered channel, just prevents blocking because of lookup lookup := make(chan bool, 10) // sufficiently buffered channel, just prevents blocking because of lookup
if t.cache != nil { for _, peer := range t.cache.GetPeersRange(t.topic, 5) {
for _, peer := range t.cache.GetPeersRange(t.topic, 5) { log.Debug("adding a peer from cache", "peer", peer)
log.Debug("adding a peer from cache", "peer", peer) found <- peer
found <- peer
}
} }
t.discWG.Add(1) t.discWG.Add(1)

View File

@ -39,7 +39,9 @@ func (s *TopicPoolSuite) SetupTest() {
s.Require().NoError(s.peer.Start()) s.Require().NoError(s.peer.Start())
topic := discv5.Topic("cap=cap1") topic := discv5.Topic("cap=cap1")
limits := params.NewLimits(1, 2) limits := params.NewLimits(1, 2)
s.topicPool = NewTopicPool(topic, limits, 100*time.Millisecond, 200*time.Millisecond) cache, err := newInMemoryCache()
s.Require().NoError(err)
s.topicPool = NewTopicPool(topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.topicPool.running = 1 s.topicPool.running = 1
// This is a buffered channel to simplify testing. // This is a buffered channel to simplify testing.
// If your test generates more than 10 mode changes, // If your test generates more than 10 mode changes,