Peer pool to manage v5 peers (#736)

Dmitry Shulyak 2018-04-10 09:44:09 +03:00 committed by GitHub
parent a9eb5a7d2b
commit 02309e81e9
15 changed files with 911 additions and 10 deletions

Gopkg.lock (generated, 2 lines changed)

@@ -489,6 +489,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "a21695ec5b17d68c9524831c9cc9321753525f1e404f17c41f5dbc6b20e0b1db"
inputs-digest = "eff125b3ad51a2875bc0ee5f9d190ce65ee9fb774e79d8df0de7a7e4bf4e32f8"
solver-name = "gps-cdcl"
solver-version = 1


@@ -12,6 +12,7 @@ import (
"strings"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/cmd/statusd/debug"
"github.com/status-im/status-go/geth/api"
"github.com/status-im/status-go/geth/node"
@@ -72,6 +73,10 @@ var (
firebaseAuth = flag.String("shh.firebaseauth", "", "FCM Authorization Key used for sending Push Notifications")
syncAndExit = flag.Int("sync-and-exit", -1, "Timeout in minutes for blockchain sync and exit, zero means no timeout unless sync is finished")
// Topics that will be searched for and registered by discovery v5.
searchTopics = topicLimitsFlag{}
registerTopics = topicsFlag{}
)
// All general log messages in this package should be routed through this logger.
@@ -104,6 +109,9 @@ func enhanceLogger(logger *log.Logger, config *params.NodeConfig) error {
}
func main() {
flag.Var(&searchTopics, "topic.search", "Topic that will be searched in discovery v5, e.g. mailserver=1,1")
flag.Var(&registerTopics, "topic.register", "Topic that will be registered using discovery v5.")
flag.Usage = printUsage
flag.Parse()
@@ -269,6 +277,8 @@ func makeNodeConfig() (*params.NodeConfig, error) {
}
nodeConfig.Discovery = *discovery
nodeConfig.RequireTopics = map[discv5.Topic]params.Limits(searchTopics)
nodeConfig.RegisterTopics = []discv5.Topic(registerTopics)
// Even if standalone is true and discovery is disabled,
// it's possible to use bootnodes.

cmd/statusd/topics.go (new file, 48 lines)

@@ -0,0 +1,48 @@
package main
import (
"errors"
"strconv"
"strings"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/geth/params"
)
type topicsFlag []discv5.Topic
func (f *topicsFlag) String() string {
return "discv5 topics"
}
func (f *topicsFlag) Set(value string) error {
*f = append(*f, discv5.Topic(strings.TrimSpace(value)))
return nil
}
type topicLimitsFlag map[discv5.Topic]params.Limits
func (f *topicLimitsFlag) String() string {
return "disv5 topics to limits map"
}
func (f *topicLimitsFlag) Set(value string) error {
parts := strings.Split(strings.TrimSpace(value), "=")
if len(parts) != 2 {
return errors.New("topic must be separated by '=' from limits, e.g. 'topic1=1,1'")
}
limits := strings.Split(parts[1], ",")
if len(limits) != 2 {
return errors.New("min and max limit must be set, e.g. 'topic1=1,1'")
}
minL, err := strconv.Atoi(limits[0])
if err != nil {
return err
}
maxL, err := strconv.Atoi(limits[1])
if err != nil {
return err
}
(*f)[discv5.Topic(parts[0])] = params.Limits{minL, maxL}
return nil
}
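
A minimal usage sketch of the flag types defined above, assuming they are in scope; the topic names and limits are hypothetical, not taken from the commit:

// Equivalent to: statusd -topic.register=whisper -topic.search=mailserver=1,1
registerTopics := topicsFlag{}
_ = registerTopics.Set("whisper") // appends discv5.Topic("whisper")
searchTopics := topicLimitsFlag{}
if err := searchTopics.Set("mailserver=1,1"); err != nil {
	panic(err)
}
// searchTopics now maps "mailserver" to params.Limits{1, 1}: keep at least 1 and at most 1 peer.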

geth/db/db.go (new file, 22 lines)

@@ -0,0 +1,22 @@
package db
import (
"github.com/ethereum/go-ethereum/log"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/opt"
)
// Create opens (or recovers) the status leveldb database at the given path and returns a pointer to it.
func Create(path string) (*leveldb.DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*errors.ErrCorrupted); iscorrupted {
log.Info("database is corrupted trying to recover", "path", path)
db, err = leveldb.RecoverFile(path, nil)
}
if err != nil {
return nil, err
}
return db, nil
}


@@ -18,6 +18,7 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/nat"
"github.com/ethereum/go-ethereum/whisper/mailserver"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
@@ -94,14 +95,12 @@ func defaultEmbeddedNodeConfig(config *params.NodeConfig) *node.Config {
Name: config.Name,
Version: config.Version,
P2P: p2p.Config{
NoDiscovery: !config.Discovery,
DiscoveryV5: config.Discovery,
BootstrapNodes: nil,
BootstrapNodesV5: nil,
ListenAddr: config.ListenAddr,
NAT: nat.Any(),
MaxPeers: config.MaxPeers,
MaxPendingPeers: config.MaxPendingPeers,
NoDiscovery: true,
DiscoveryV5: config.Discovery,
ListenAddr: config.ListenAddr,
NAT: nat.Any(),
MaxPeers: config.MaxPeers,
MaxPendingPeers: config.MaxPendingPeers,
},
IPCPath: makeIPCPath(config),
HTTPCors: []string{"*"},
@@ -120,7 +119,7 @@ func defaultEmbeddedNodeConfig(config *params.NodeConfig) *node.Config {
if config.ClusterConfig.Enabled {
nc.P2P.StaticNodes = parseNodes(config.ClusterConfig.StaticNodes)
nc.P2P.BootstrapNodes = parseNodes(config.ClusterConfig.BootNodes)
nc.P2P.BootstrapNodesV5 = parseNodesV5(config.ClusterConfig.BootNodes)
}
return nc
@@ -229,3 +228,12 @@ func parseNodes(enodes []string) []*discover.Node {
}
return nodes
}
// parseNodesV5 creates list of discv5.Node out of enode strings.
func parseNodesV5(enodes []string) []*discv5.Node {
nodes := make([]*discv5.Node, len(enodes))
for i, enode := range enodes {
nodes[i] = discv5.MustParseNode(enode)
}
return nodes
}


@@ -16,9 +16,12 @@ import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/discover"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
"github.com/status-im/status-go/geth/db"
"github.com/status-im/status-go/geth/mailservice"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/peers"
"github.com/status-im/status-go/geth/rpc"
)
@@ -46,6 +49,10 @@ type StatusNode struct {
config *params.NodeConfig // Status node configuration
gethNode *node.Node // reference to Geth P2P stack/node
register *peers.Register
peerPool *peers.PeerPool
db *leveldb.DB
rpcClient *rpc.Client // reference to RPC client
log log.Logger
}
@@ -97,9 +104,33 @@ func (n *StatusNode) start(config *params.NodeConfig) error {
n.log.Error("Failed to create an RPC client", "error", err)
return RPCClientError(err)
}
if ethNode.Server().DiscV5 != nil {
return n.startPeerPool()
}
return nil
}
func (n *StatusNode) startPeerPool() error {
statusDB, err := db.Create(filepath.Join(n.config.DataDir, params.StatusDatabase))
if err != nil {
return err
}
n.db = statusDB
n.register = peers.NewRegister(n.config.RegisterTopics...)
// TODO(dshulyak) consider adding a flag to define this behaviour
stopOnMax := len(n.config.RegisterTopics) == 0
n.peerPool = peers.NewPeerPool(n.config.RequireTopics,
peers.DefaultFastSync,
peers.DefaultSlowSync,
peers.NewCache(n.db),
stopOnMax,
)
if err := n.register.Start(n.gethNode.Server()); err != nil {
return err
}
return n.peerPool.Start(n.gethNode.Server())
}
// Stop will stop current StatusNode. A stopped node cannot be resumed.
func (n *StatusNode) Stop() error {
n.mu.Lock()
@@ -112,6 +143,9 @@ func (n *StatusNode) stop() error {
if err := n.isAvailable(); err != nil {
return err
}
if n.gethNode.Server().DiscV5 != nil {
n.stopPeerPool()
}
if err := n.gethNode.Stop(); err != nil {
return err
}
@@ -121,6 +155,14 @@ func (n *StatusNode) stop() error {
return nil
}
func (n *StatusNode) stopPeerPool() {
n.register.Stop()
n.peerPool.Stop()
if err := n.db.Close(); err != nil {
n.log.Error("error closing status db", "error", err)
}
}
// ResetChainData removes chain data if node is not running.
func (n *StatusNode) ResetChainData(config *params.NodeConfig) error {
if n.IsRunning() {


@@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/static"
)
@@ -179,6 +180,9 @@ func (c *ClusterConfig) String() string {
return string(data)
}
// Limits represents the minimum and maximum number of peers for a topic
type Limits [2]int
// ----------
// UpstreamRPCConfig
// ----------
@@ -300,6 +304,9 @@ type NodeConfig struct {
// SwarmConfig extra configuration for Swarm and ENS
SwarmConfig *SwarmConfig `json:"SwarmConfig," validate:"structonly"`
RegisterTopics []discv5.Topic `json:"RegisterTopics"`
RequireTopics map[discv5.Topic]Limits `json:"RequireTopics"`
}
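
A minimal sketch, from a caller's perspective, of how these new fields could be populated directly; the topic names and limits are hypothetical:

nodeConfig.RegisterTopics = []discv5.Topic{"whisper"} // topics this node advertises
nodeConfig.RequireTopics = map[discv5.Topic]params.Limits{
	"mailserver": {1, 2}, // keep between 1 and 2 mailserver peers connected
}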
// NewNodeConfig creates new node configuration object


@@ -7,6 +7,9 @@ const (
// DataDir is default data directory used by statusd executable
DataDir = "statusd-data"
// StatusDatabase is the default path of the status database, relative to DataDir.
StatusDatabase = "status-db"
// KeyStoreDir is default directory where private keys are stored, relative to DataDir
KeyStoreDir = "keystore"

geth/peers/cache.go (new file, 62 lines)

@@ -0,0 +1,62 @@
package peers
import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
)
// NewCache returns a Cache instance backed by the given leveldb database.
func NewCache(db *leveldb.DB) *Cache {
return &Cache{db: db}
}
// Cache maintains list of peers that were discovered.
type Cache struct {
db *leveldb.DB
}
func makePeerKey(peerID discv5.NodeID, topic discv5.Topic) []byte {
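// The key is the topic bytes followed by the raw node ID, so peers that share a
// topic are stored contiguously and can be read back with a range scan.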
topicLen := len([]byte(topic))
lth := topicLen + len(peerID)
key := make([]byte, lth)
copy(key[:], topic[:])
copy(key[topicLen:], peerID[:])
return key
}
// AddPeer stores a peer under the key <topic><peer ID>.
func (d *Cache) AddPeer(peer *discv5.Node, topic discv5.Topic) error {
data, err := peer.MarshalText()
if err != nil {
return err
}
return d.db.Put(makePeerKey(peer.ID, topic), data, nil)
}
// RemovePeer deletes a peer from database.
func (d *Cache) RemovePeer(peerID discv5.NodeID, topic discv5.Topic) error {
return d.db.Delete(makePeerKey(peerID, topic), nil)
}
// GetPeersRange returns up to limit peers for a given topic.
func (d *Cache) GetPeersRange(topic discv5.Topic, limit int) (nodes []*discv5.Node) {
topicLen := len([]byte(topic))
key := make([]byte, topicLen)
copy(key[:], []byte(topic))
iterator := d.db.NewIterator(&util.Range{Start: key}, nil)
defer iterator.Release()
count := 0
for iterator.Next() && count < limit {
node := discv5.Node{}
value := iterator.Value()
if err := node.UnmarshalText(value); err != nil {
log.Error("can't unmarshal node", "value", value, "error", err)
continue
}
nodes = append(nodes, &node)
count++
}
return nodes
}

geth/peers/cache_test.go (new file, 48 lines)

@@ -0,0 +1,48 @@
package peers
import (
"io/ioutil"
"net"
"os"
"testing"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/geth/db"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPeersRange(t *testing.T) {
path, err := ioutil.TempDir("/tmp", "status-peers-test-")
require.NoError(t, err)
defer func() {
require.NoError(t, os.RemoveAll(path))
}()
rootDB, err := db.Create(path)
require.NoError(t, err)
defer func() {
assert.NoError(t, rootDB.Close())
}()
peersDB := Cache{db: rootDB}
topic := discv5.Topic("test")
peers := [3]*discv5.Node{
discv5.NewNode(discv5.NodeID{3}, net.IPv4(100, 100, 0, 3), 32311, 32311),
discv5.NewNode(discv5.NodeID{4}, net.IPv4(100, 100, 0, 4), 32311, 32311),
discv5.NewNode(discv5.NodeID{2}, net.IPv4(100, 100, 0, 2), 32311, 32311),
}
for _, peer := range peers {
assert.NoError(t, peersDB.AddPeer(peer, topic))
}
nodes := peersDB.GetPeersRange(topic, 3)
require.Len(t, nodes, 3)
// objects are ordered by the byte order of their keys: 2, 3, 4 in our case.
// The insertion order is intentionally mixed to make it obvious that the range
// is not ordered by insertion time.
assert.Equal(t, peers[2].String(), nodes[0].String())
assert.Equal(t, peers[0].String(), nodes[1].String())
assert.Equal(t, peers[1].String(), nodes[2].String())
assert.NoError(t, peersDB.RemovePeer(peers[1].ID, topic))
require.Len(t, peersDB.GetPeersRange(topic, 3), 2)
}

geth/peers/peerpool.go (new file, 150 lines)

@@ -0,0 +1,150 @@
package peers
import (
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/geth/params"
)
var (
// ErrDiscv5NotRunning is returned when the pool is started but discovery v5 is not running or not enabled.
ErrDiscv5NotRunning = errors.New("Discovery v5 is not running")
)
const (
// expirationPeriod is the amount of time a discovered peer is considered connectable
expirationPeriod = 60 * time.Minute
// DefaultFastSync is a recommended value for an aggressive peer search.
DefaultFastSync = 3 * time.Second
// DefaultSlowSync is a recommended value for a slow (background) peer search.
DefaultSlowSync = 30 * time.Minute
)
// NewPeerPool creates an instance of PeerPool.
func NewPeerPool(config map[discv5.Topic]params.Limits, fastSync, slowSync time.Duration, cache *Cache, stopOnMax bool) *PeerPool {
return &PeerPool{
config: config,
fastSync: fastSync,
slowSync: slowSync,
cache: cache,
stopOnMax: stopOnMax,
}
}
type peerInfo struct {
// discoveredTime is the last time the node was found by discovery v5
discoveredTime mclock.AbsTime
// connected is true if the node was added as a static peer
connected bool
node *discv5.Node
}
// PeerPool manages discovered peers and connects them to p2p server
type PeerPool struct {
// config can be set only once per pool life cycle
config map[discv5.Topic]params.Limits
fastSync time.Duration
slowSync time.Duration
cache *Cache
stopOnMax bool
mu sync.RWMutex
topics []*TopicPool
serverSubscription event.Subscription
quit chan struct{}
wg sync.WaitGroup
}
// Start creates topic pool for each topic in config and subscribes to server events.
func (p *PeerPool) Start(server *p2p.Server) error {
if server.DiscV5 == nil {
return ErrDiscv5NotRunning
}
p.mu.Lock()
defer p.mu.Unlock()
p.quit = make(chan struct{})
p.topics = make([]*TopicPool, 0, len(p.config))
for topic, limits := range p.config {
topicPool := NewTopicPool(topic, limits, p.slowSync, p.fastSync)
if err := topicPool.StartSearch(server); err != nil {
return err
}
p.topics = append(p.topics, topicPool)
}
events := make(chan *p2p.PeerEvent, 20)
p.serverSubscription = server.SubscribeEvents(events)
p.wg.Add(1)
go func() {
p.handleServerPeers(server, events)
p.wg.Done()
}()
return nil
}
// handleServerPeers watches server peer events, notifies topic pools about changes
// in the peer set and stops the discv5 if all topic pools collected enough peers.
func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.PeerEvent) {
for {
select {
case <-p.quit:
return
case event := <-events:
switch event.Type {
case p2p.PeerEventTypeDrop:
p.mu.Lock()
for _, t := range p.topics {
t.ConfirmDropped(server, event.Peer, event.Error)
// TODO(dshulyak) restart discv5 if peers number dropped too low
}
p.mu.Unlock()
case p2p.PeerEventTypeAdd:
p.mu.Lock()
total := 0
for _, t := range p.topics {
t.ConfirmAdded(server, event.Peer)
if p.stopOnMax && t.MaxReached() {
total++
t.StopSearch()
}
}
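// every topic pool has reached its max limit, so discovery is no longer needed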
if p.stopOnMax && total == len(p.config) {
log.Debug("closing discv5 connection")
server.DiscV5.Close()
}
p.mu.Unlock()
}
}
}
}
// Stop closes the pool's quit channel and all channels watched by search queries,
// and waits until all goroutines exit.
func (p *PeerPool) Stop() {
// pool wasn't started
if p.quit == nil {
return
}
select {
case <-p.quit:
return
default:
log.Debug("started closing peer pool")
close(p.quit)
}
p.serverSubscription.Unsubscribe()
for _, t := range p.topics {
t.StopSearch()
}
p.wg.Wait()
}

geth/peers/peerpool_test.go (new file, 108 lines)

@@ -0,0 +1,108 @@
package peers
import (
"fmt"
"net"
"strconv"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/stretchr/testify/suite"
"github.com/status-im/status-go/geth/params"
)
type PeerPoolSimulationSuite struct {
suite.Suite
bootnode *p2p.Server
peers []*p2p.Server
}
func TestPeerPoolSimulationSuite(t *testing.T) {
suite.Run(t, new(PeerPoolSimulationSuite))
}
func (s *PeerPoolSimulationSuite) SetupTest() {
port := 33731
key, _ := crypto.GenerateKey()
name := common.MakeName("bootnode", "1.0")
// 127.0.0.1 is invalidated by discovery v5
s.bootnode = &p2p.Server{
Config: p2p.Config{
MaxPeers: 10,
Name: name,
ListenAddr: fmt.Sprintf("0.0.0.0:%d", 33731),
PrivateKey: key,
DiscoveryV5: true,
NoDiscovery: true,
},
}
port++
s.Require().NoError(s.bootnode.Start())
bootnodeV5 := discv5.NewNode(s.bootnode.DiscV5.Self().ID, net.ParseIP("127.0.0.1"), uint16(port), uint16(port))
s.peers = make([]*p2p.Server, 2)
for i := range s.peers {
key, _ := crypto.GenerateKey()
peer := &p2p.Server{
Config: p2p.Config{
MaxPeers: 10,
Name: common.MakeName("peer-"+strconv.Itoa(i), "1.0"),
ListenAddr: fmt.Sprintf("0.0.0.0:%d", port),
PrivateKey: key,
DiscoveryV5: true,
NoDiscovery: true,
BootstrapNodesV5: []*discv5.Node{bootnodeV5},
},
}
port++
s.NoError(peer.Start())
s.peers[i] = peer
}
}
func (s *PeerPoolSimulationSuite) TestSingleTopicDiscovery() {
topic := discv5.Topic("cap=test")
expectedConnections := 1
// simulation should only rely on fast sync
config := map[discv5.Topic]params.Limits{
topic: {expectedConnections, expectedConnections},
}
peerPool := NewPeerPool(config, 100*time.Millisecond, 100*time.Millisecond, nil, false)
register := NewRegister(topic)
s.Require().NoError(register.Start(s.peers[0]))
defer register.Stop()
// need to wait for topic to get registered, discv5 can query same node
// for a topic only once a minute
events := make(chan *p2p.PeerEvent, 20)
subscription := s.peers[1].SubscribeEvents(events)
defer subscription.Unsubscribe()
s.NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop()
connected := 0
for {
select {
case ev := <-events:
if ev.Type == p2p.PeerEventTypeAdd {
connected++
}
case <-time.After(5 * time.Second):
s.Require().FailNowf("waiting for peers timed out", strconv.Itoa(connected))
}
if connected == expectedConnections {
break
}
}
}
func (s *PeerPoolSimulationSuite) TearDownTest() {
s.bootnode.Stop()
for _, p := range s.peers {
p.Stop()
}
}


@@ -0,0 +1,54 @@
package peers
import (
"sync"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
)
// Register manages discv5 topic registration queries
type Register struct {
topics []discv5.Topic
wg sync.WaitGroup
quit chan struct{}
}
// NewRegister creates instance of topic register
func NewRegister(topics ...discv5.Topic) *Register {
return &Register{topics: topics}
}
// Start starts a topic registration query for every topic
func (r *Register) Start(server *p2p.Server) error {
if server.DiscV5 == nil {
return ErrDiscv5NotRunning
}
r.quit = make(chan struct{})
for _, topic := range r.topics {
r.wg.Add(1)
go func(t discv5.Topic) {
log.Debug("v5 register topic", "topic", t)
server.DiscV5.RegisterTopic(t, r.quit)
r.wg.Done()
}(topic)
}
return nil
}
// Stop terminates all topic registration queries and waits for them to exit
func (r *Register) Stop() {
if r.quit == nil {
return
}
select {
case <-r.quit:
return
default:
close(r.quit)
}
log.Debug("waiting for register queries to exit")
r.wg.Wait()
}

geth/peers/topicpool.go (new file, 243 lines)

@@ -0,0 +1,243 @@
package peers
import (
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/geth/params"
)
// NewTopicPool returns instance of TopicPool
func NewTopicPool(topic discv5.Topic, limits params.Limits, slowSync, fastSync time.Duration) *TopicPool {
return &TopicPool{
topic: topic,
limits: limits,
slowSync: slowSync,
fastSync: fastSync,
peers: map[discv5.NodeID]*peerInfo{},
}
}
// TopicPool manages peers for topic.
type TopicPool struct {
topic discv5.Topic
limits params.Limits
slowSync time.Duration
fastSync time.Duration
quit chan struct{}
running int32
mu sync.RWMutex
discWG sync.WaitGroup
consumerWG sync.WaitGroup
connected int
peers map[discv5.NodeID]*peerInfo
period chan time.Duration
cache *Cache
}
// SearchRunning returns true if search is running
func (t *TopicPool) SearchRunning() bool {
return atomic.LoadInt32(&t.running) == 1
}
// MaxReached returns true if we are connected to the max number of peers.
func (t *TopicPool) MaxReached() bool {
t.mu.RLock()
defer t.mu.RUnlock()
return t.connected == t.limits[1]
}
// ConfirmAdded is called when a peer is added by the p2p server.
// 1. Skip the peer if it is not in our peer table.
// 2. Add the peer to the cache.
// 3. Disconnect the peer if it connected after we reached the max limit of peers
// (we can't know in advance whether a peer will connect, which is why we allow
// a short-lived overflow).
// 4. Switch the search to slow mode if it is running.
func (t *TopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
t.mu.Lock()
defer t.mu.Unlock()
// skip peers that are not in our table (e.g. inbound connections)
peer, exist := t.peers[discv5.NodeID(nodeID)]
if !exist {
return
}
// established connection means that the node is a viable candidate for a connection and can be cached
if t.cache != nil {
if err := t.cache.AddPeer(peer.node, t.topic); err != nil {
log.Error("failed to persist a peer", "error", err)
}
}
// once the max limit is reached, drop every subsequent peer
if t.connected == t.limits[1] {
log.Debug("max limit is reached drop the peer", "ID", nodeID, "topic", t.topic)
t.removePeer(server, peer)
return
}
// don't count same peer twice
if !peer.connected {
log.Debug("marking as connected", "ID", nodeID)
peer.connected = true
t.connected++
}
if t.SearchRunning() && t.connected == t.limits[0] {
t.period <- t.slowSync
}
}
// ConfirmDropped is called when the server receives a drop event.
// 1. Skip the peer if it is not in our peer table.
// 2. If the disconnect was requested, we may have dropped that peer ourselves, so do nothing.
// 3. If the connected number would drop below the min limit, switch to fast mode.
// 4. Delete the peer from the cache and the peer table.
// 5. Connect to another valid peer, if one is available.
func (t *TopicPool) ConfirmDropped(server *p2p.Server, nodeID discover.NodeID, reason string) (new bool) {
t.mu.Lock()
defer t.mu.Unlock()
// either inbound or connected from another topic
peer, exist := t.peers[discv5.NodeID(nodeID)]
if !exist {
return false
}
log.Debug("disconnect reason", "peer", nodeID, "reason", reason)
// if the disconnect was requested, we don't need to remove the peer from the cache or look for a replacement
if reason == p2p.DiscRequested.Error() {
return false
}
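// this drop takes us below the min limit, so switch the search back to fast mode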
if t.SearchRunning() && t.connected == t.limits[0] {
t.period <- t.fastSync
}
t.connected--
t.removePeer(server, peer)
delete(t.peers, discv5.NodeID(nodeID))
if t.cache != nil {
if err := t.cache.RemovePeer(discv5.NodeID(nodeID), t.topic); err != nil {
log.Error("failed to remove peer from cache", "error", err)
}
}
// TODO use a heap queue and always get a peer that was discovered recently
for _, peer := range t.peers {
if !peer.connected && mclock.Now() < peer.discoveredTime+mclock.AbsTime(expirationPeriod) {
t.addPeer(server, peer)
return true
}
}
return false
}
// StartSearch creates discv5 queries and runs a loop to consume found peers.
func (t *TopicPool) StartSearch(server *p2p.Server) error {
if atomic.LoadInt32(&t.running) == 1 {
return nil
}
if server.DiscV5 == nil {
return ErrDiscv5NotRunning
}
t.mu.Lock()
defer t.mu.Unlock()
atomic.StoreInt32(&t.running, 1)
t.quit = make(chan struct{})
t.period = make(chan time.Duration, 2) // capacity 2 allows sending slow and then fast without blocking the producer
found := make(chan *discv5.Node, 5) // 5 is a reasonable buffer for concurrently found nodes
lookup := make(chan bool, 10) // sufficiently buffered, just prevents blocking on lookup events
if t.cache != nil {
for _, peer := range t.cache.GetPeersRange(t.topic, 5) {
log.Debug("adding a peer from cache", "peer", peer)
found <- peer
}
}
t.discWG.Add(1)
go func() {
server.DiscV5.SearchTopic(t.topic, t.period, found, lookup)
t.discWG.Done()
}()
t.consumerWG.Add(1)
go func() {
t.handleFoundPeers(server, found, lookup)
t.consumerWG.Done()
}()
return nil
}
func (t *TopicPool) handleFoundPeers(server *p2p.Server, found <-chan *discv5.Node, lookup <-chan bool) {
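// start the search in fast mode; ConfirmAdded switches it to slow mode once the min limit is reached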
t.period <- t.fastSync
selfID := discv5.NodeID(server.Self().ID)
for {
select {
case <-t.quit:
return
case <-lookup:
case node := <-found:
if node.ID != selfID {
t.processFoundNode(server, node)
}
}
}
}
// processFoundNode is called when a node is discovered by a kademlia search query.
// Two important conditions:
// 1. Every time a node is processed we update its discoveredTime; the peer is
// considered valid later only if it was discovered less than 60 minutes ago.
// 2. If the peer is already connected or the max limit is reached, we do not add it to the p2p server.
func (t *TopicPool) processFoundNode(server *p2p.Server, node *discv5.Node) {
t.mu.Lock()
defer t.mu.Unlock()
if info, exist := t.peers[node.ID]; exist {
info.discoveredTime = mclock.Now()
} else {
t.peers[node.ID] = &peerInfo{
discoveredTime: mclock.Now(),
node: node,
}
}
if t.connected < t.limits[1] && !t.peers[node.ID].connected {
log.Debug("peer found", "ID", node.ID, "topic", t.topic)
t.addPeer(server, t.peers[node.ID])
}
}
func (t *TopicPool) addPeer(server *p2p.Server, info *peerInfo) {
server.AddPeer(discover.NewNode(
discover.NodeID(info.node.ID),
info.node.IP,
info.node.UDP,
info.node.TCP,
))
}
func (t *TopicPool) removePeer(server *p2p.Server, info *peerInfo) {
server.RemovePeer(discover.NewNode(
discover.NodeID(info.node.ID),
info.node.IP,
info.node.UDP,
info.node.TCP,
))
}
// StopSearch stops the discv5 search query and waits for its goroutines to exit.
func (t *TopicPool) StopSearch() {
if t.quit == nil {
return
}
select {
case <-t.quit:
return
default:
log.Debug("stoping search", "topic", t.topic)
close(t.quit)
}
t.consumerWG.Wait()
atomic.StoreInt32(&t.running, 0)
close(t.period)
t.discWG.Wait()
}


@@ -0,0 +1,96 @@
package peers
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/geth/params"
"github.com/stretchr/testify/suite"
)
type TopicPoolSuite struct {
suite.Suite
peer *p2p.Server
topicPool *TopicPool
}
func TestTopicPoolSuite(t *testing.T) {
suite.Run(t, new(TopicPoolSuite))
}
func (s *TopicPoolSuite) SetupTest() {
key, _ := crypto.GenerateKey()
name := common.MakeName("peer", "1.0")
s.peer = &p2p.Server{
Config: p2p.Config{
MaxPeers: 10,
Name: name,
ListenAddr: "0.0.0.0:0",
PrivateKey: key,
NoDiscovery: true,
},
}
s.Require().NoError(s.peer.Start())
topic := discv5.Topic("cap=cap1")
limits := params.Limits{1, 2}
s.topicPool = NewTopicPool(topic, limits, 100*time.Millisecond, 200*time.Millisecond)
s.topicPool.period = make(chan time.Duration, 2)
s.topicPool.running = 1
}
func (s *TopicPoolSuite) TearDownTest() {
s.peer.Stop()
}
func (s *TopicPoolSuite) AssertConsumed(channel chan time.Duration, expected time.Duration, timeout time.Duration) {
select {
case received := <-channel:
s.Equal(expected, received)
case <-time.After(timeout):
s.FailNow("timed out waiting")
}
}
func (s *TopicPoolSuite) TestSyncSwitches() {
testPeer := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, testPeer)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(testPeer.ID))
s.AssertConsumed(s.topicPool.period, s.topicPool.slowSync, time.Second)
s.True(s.topicPool.peers[testPeer.ID].connected)
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(testPeer.ID), p2p.DiscProtocolError.Error())
s.AssertConsumed(s.topicPool.period, s.topicPool.fastSync, time.Second)
}
func (s *TopicPoolSuite) TestNewPeerSelectedOnDrop() {
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
peer3 := discv5.NewNode(discv5.NodeID{3}, s.peer.Self().IP, 32311, 32311)
// add 3 nodes and confirm connection for 1 and 2
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.processFoundNode(s.peer, peer3)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.True(s.topicPool.peers[peer1.ID].connected)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.True(s.topicPool.peers[peer2.ID].connected)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer3.ID))
s.False(s.topicPool.peers[peer3.ID].connected)
s.True(s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID), p2p.DiscNetworkError.Error()))
}
func (s *TopicPoolSuite) TestRequestedDoesntRemove() {
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID), p2p.DiscRequested.Error())
s.Contains(s.topicPool.peers, peer1.ID)
s.topicPool.ConfirmDropped(s.peer, discover.NodeID(peer1.ID), p2p.DiscProtocolError.Error())
s.NotContains(s.topicPool.peers, peer1.ID)
}