463 lines
12 KiB
Go
463 lines
12 KiB
Go
package peers
|
|
|
|
import (
|
|
"crypto/ecdsa"
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/discv5"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
|
|
"github.com/status-im/status-go/common"
|
|
"github.com/status-im/status-go/discovery"
|
|
"github.com/status-im/status-go/logutils"
|
|
"github.com/status-im/status-go/params"
|
|
"github.com/status-im/status-go/peers/verifier"
|
|
"github.com/status-im/status-go/signal"
|
|
)
|
|
|
|
var (
|
|
// ErrDiscv5NotRunning returned when pool is started but discover v5 is not running or not enabled.
|
|
ErrDiscv5NotRunning = errors.New("Discovery v5 is not running")
|
|
)
|
|
|
|
// PoolEvent is a type used to for peer pool events.
|
|
type PoolEvent string
|
|
|
|
const (
|
|
immediately = 0 * time.Minute
|
|
// expirationPeriod is an amount of time while peer is considered as a connectable
|
|
expirationPeriod = 60 * time.Minute
|
|
// discoveryRestartTimeout defines how often loop will try to start discovery server
|
|
discoveryRestartTimeout = 2 * time.Second
|
|
// DefaultFastSync is a recommended value for aggressive peers search.
|
|
DefaultFastSync = 3 * time.Second
|
|
// DefaultSlowSync is a recommended value for slow (background) peers search.
|
|
DefaultSlowSync = 30 * time.Second
|
|
// DefaultDiscV5Timeout is a timeout after which Discv5 is stopped.
|
|
DefaultDiscV5Timeout = 3 * time.Minute
|
|
// DefaultTopicFastModeTimeout is a timeout after which sync mode is switched to slow mode.
|
|
DefaultTopicFastModeTimeout = 30 * time.Second
|
|
// DefaultTopicStopSearchDelay is the default delay when stopping a topic search.
|
|
DefaultTopicStopSearchDelay = 10 * time.Second
|
|
)
|
|
|
|
// Options is a struct with PeerPool configuration.
|
|
type Options struct {
|
|
FastSync time.Duration
|
|
SlowSync time.Duration
|
|
// After this time, Discovery is stopped even if max peers is not reached.
|
|
DiscServerTimeout time.Duration
|
|
// AllowStop allows stopping Discovery when reaching max peers or after timeout.
|
|
AllowStop bool
|
|
// TopicStopSearchDelay time stopSearch will be waiting for max cached peers to be
|
|
// filled before really stopping the search.
|
|
TopicStopSearchDelay time.Duration
|
|
// TrustedMailServers is a list of trusted nodes.
|
|
TrustedMailServers []enode.ID
|
|
}
|
|
|
|
// NewDefaultOptions returns a struct with default Options.
|
|
func NewDefaultOptions() *Options {
|
|
return &Options{
|
|
FastSync: DefaultFastSync,
|
|
SlowSync: DefaultSlowSync,
|
|
DiscServerTimeout: DefaultDiscV5Timeout,
|
|
AllowStop: false,
|
|
TopicStopSearchDelay: DefaultTopicStopSearchDelay,
|
|
}
|
|
}
|
|
|
|
type peerInfo struct {
|
|
// discoveredTime last time when node was found by v5
|
|
discoveredTime time.Time
|
|
// dismissed is true when our node requested a disconnect
|
|
dismissed bool
|
|
// added is true when the node tries to add this peer to a server
|
|
added bool
|
|
|
|
node *discv5.Node
|
|
// store public key separately to make peerInfo more independent from discv5
|
|
publicKey *ecdsa.PublicKey
|
|
}
|
|
|
|
func (p *peerInfo) NodeID() enode.ID {
|
|
return enode.PubkeyToIDV4(p.publicKey)
|
|
}
|
|
|
|
// PeerPool manages discovered peers and connects them to p2p server
|
|
type PeerPool struct {
|
|
opts *Options
|
|
|
|
discovery discovery.Discovery
|
|
|
|
// config can be set only once per pool life cycle
|
|
config map[discv5.Topic]params.Limits
|
|
cache *Cache
|
|
|
|
mu sync.RWMutex
|
|
timeoutMu sync.RWMutex
|
|
topics []TopicPoolInterface
|
|
serverSubscription event.Subscription
|
|
events chan *p2p.PeerEvent
|
|
quit chan struct{}
|
|
wg sync.WaitGroup
|
|
timeout <-chan time.Time
|
|
updateTopic chan *updateTopicRequest
|
|
}
|
|
|
|
// NewPeerPool creates instance of PeerPool
|
|
func NewPeerPool(discovery discovery.Discovery, config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool {
|
|
return &PeerPool{
|
|
opts: options,
|
|
discovery: discovery,
|
|
config: config,
|
|
cache: cache,
|
|
}
|
|
}
|
|
|
|
func (p *PeerPool) setDiscoveryTimeout() {
|
|
p.timeoutMu.Lock()
|
|
defer p.timeoutMu.Unlock()
|
|
if p.opts.AllowStop && p.opts.DiscServerTimeout > 0 {
|
|
p.timeout = time.After(p.opts.DiscServerTimeout)
|
|
}
|
|
}
|
|
|
|
// Start creates topic pool for each topic in config and subscribes to server events.
|
|
func (p *PeerPool) Start(server *p2p.Server) error {
|
|
if !p.discovery.Running() {
|
|
return ErrDiscv5NotRunning
|
|
}
|
|
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
// init channels
|
|
p.quit = make(chan struct{})
|
|
p.updateTopic = make(chan *updateTopicRequest)
|
|
p.setDiscoveryTimeout()
|
|
|
|
// subscribe to peer events
|
|
p.events = make(chan *p2p.PeerEvent, 20)
|
|
p.serverSubscription = server.SubscribeEvents(p.events)
|
|
p.wg.Add(1)
|
|
go func() {
|
|
defer common.LogOnPanic()
|
|
p.handleServerPeers(server, p.events)
|
|
p.wg.Done()
|
|
}()
|
|
|
|
// collect topics and start searching for nodes
|
|
p.topics = make([]TopicPoolInterface, 0, len(p.config))
|
|
for topic, limits := range p.config {
|
|
var topicPool TopicPoolInterface
|
|
t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
|
|
if topic == MailServerDiscoveryTopic {
|
|
v, err := p.initVerifier()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
topicPool = newCacheOnlyTopicPool(t, v)
|
|
} else {
|
|
topicPool = t
|
|
}
|
|
if err := topicPool.StartSearch(server); err != nil {
|
|
return err
|
|
}
|
|
p.topics = append(p.topics, topicPool)
|
|
}
|
|
|
|
// discovery must be already started when pool is started
|
|
signal.SendDiscoveryStarted()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *PeerPool) initVerifier() (v Verifier, err error) {
|
|
return verifier.NewLocalVerifier(p.opts.TrustedMailServers), nil
|
|
}
|
|
|
|
func (p *PeerPool) startDiscovery() error {
|
|
if p.discovery.Running() {
|
|
return nil
|
|
}
|
|
|
|
if err := p.discovery.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
p.mu.Lock()
|
|
p.setDiscoveryTimeout()
|
|
p.mu.Unlock()
|
|
|
|
signal.SendDiscoveryStarted()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *PeerPool) stopDiscovery(server *p2p.Server) {
|
|
if !p.discovery.Running() {
|
|
return
|
|
}
|
|
|
|
if err := p.discovery.Stop(); err != nil {
|
|
logutils.ZapLogger().Error("discovery errored when stopping", zap.Error(err))
|
|
}
|
|
for _, t := range p.topics {
|
|
t.StopSearch(server)
|
|
}
|
|
|
|
p.timeoutMu.Lock()
|
|
p.timeout = nil
|
|
p.timeoutMu.Unlock()
|
|
|
|
signal.SendDiscoveryStopped()
|
|
}
|
|
|
|
// restartDiscovery and search for topics that have peer count below min
|
|
func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
|
|
if !p.discovery.Running() {
|
|
if err := p.startDiscovery(); err != nil {
|
|
return err
|
|
}
|
|
logutils.ZapLogger().Debug("restarted discovery from peer pool")
|
|
}
|
|
for _, t := range p.topics {
|
|
if !t.BelowMin() || t.SearchRunning() {
|
|
continue
|
|
}
|
|
err := t.StartSearch(server)
|
|
if err != nil {
|
|
logutils.ZapLogger().Error("search failed to start", zap.Error(err))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// handleServerPeers watches server peer events, notifies topic pools about changes
|
|
// in the peer set and stops the discv5 if all topic pools collected enough peers.
|
|
//
|
|
// @TODO(adam): split it into peers and discovery management loops. This should
|
|
// simplify the whole logic and allow to remove `timeout` field from `PeerPool`.
|
|
func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.PeerEvent) {
|
|
retryDiscv5 := make(chan struct{}, 1)
|
|
stopDiscv5 := make(chan struct{}, 1)
|
|
|
|
queueRetry := func(d time.Duration) {
|
|
go func() {
|
|
defer common.LogOnPanic()
|
|
time.Sleep(d)
|
|
select {
|
|
case retryDiscv5 <- struct{}{}:
|
|
default:
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
queueStop := func() {
|
|
go func() {
|
|
defer common.LogOnPanic()
|
|
select {
|
|
case stopDiscv5 <- struct{}{}:
|
|
default:
|
|
}
|
|
}()
|
|
|
|
}
|
|
|
|
for {
|
|
// We use a separate lock for timeout, as this loop should
|
|
// always be running, otherwise the p2p.Server will hang.
|
|
// Because the handler of events might potentially hang on the
|
|
// server, deadlocking if this loop is waiting for the global lock.
|
|
// NOTE: this code probably needs to be refactored and simplified
|
|
// as it's difficult to follow the asynchronous nature of it.
|
|
p.timeoutMu.RLock()
|
|
timeout := p.timeout
|
|
p.timeoutMu.RUnlock()
|
|
|
|
select {
|
|
case <-p.quit:
|
|
logutils.ZapLogger().Debug("stopping DiscV5 because of quit")
|
|
p.stopDiscovery(server)
|
|
return
|
|
case <-timeout:
|
|
logutils.ZapLogger().Info("DiscV5 timed out")
|
|
p.stopDiscovery(server)
|
|
case <-retryDiscv5:
|
|
if err := p.restartDiscovery(server); err != nil {
|
|
logutils.ZapLogger().Error("starting discv5 failed", zap.Duration("retry", discoveryRestartTimeout), zap.Error(err))
|
|
queueRetry(discoveryRestartTimeout)
|
|
}
|
|
case <-stopDiscv5:
|
|
p.handleStopTopics(server)
|
|
case req := <-p.updateTopic:
|
|
if p.updateTopicLimits(server, req) == nil {
|
|
if !p.discovery.Running() {
|
|
queueRetry(immediately)
|
|
}
|
|
}
|
|
case event := <-events:
|
|
// NOTE: handlePeerEventType needs to be called asynchronously
|
|
// as it publishes on the <-events channel, leading to a deadlock
|
|
// if events channel is full.
|
|
go p.handlePeerEventType(server, event, queueRetry, queueStop)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *PeerPool) handlePeerEventType(server *p2p.Server, event *p2p.PeerEvent, queueRetry func(time.Duration), queueStop func()) {
|
|
defer common.LogOnPanic()
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
var shouldRetry bool
|
|
var shouldStop bool
|
|
switch event.Type {
|
|
case p2p.PeerEventTypeDrop:
|
|
logutils.ZapLogger().Debug("confirm peer dropped", zap.Stringer("ID", event.Peer))
|
|
if p.handleDroppedPeer(server, event.Peer) {
|
|
shouldRetry = true
|
|
}
|
|
case p2p.PeerEventTypeAdd: // skip other events
|
|
logutils.ZapLogger().Debug("confirm peer added", zap.Stringer("ID", event.Peer))
|
|
p.handleAddedPeer(server, event.Peer)
|
|
shouldStop = true
|
|
default:
|
|
return
|
|
}
|
|
|
|
// First we send the discovery summary
|
|
SendDiscoverySummary(server.PeersInfo())
|
|
|
|
// then we send the stop event
|
|
if shouldRetry {
|
|
queueRetry(immediately)
|
|
} else if shouldStop {
|
|
queueStop()
|
|
}
|
|
}
|
|
|
|
// handleAddedPeer notifies all topics about added peer.
|
|
func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID enode.ID) {
|
|
for _, t := range p.topics {
|
|
t.ConfirmAdded(server, nodeID)
|
|
if p.opts.AllowStop && t.MaxReached() {
|
|
t.setStopSearchTimeout(p.opts.TopicStopSearchDelay)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleStopTopics stops the search on any topics having reached its max cached
|
|
// limit or its delay stop is expired, additionally will stop discovery if all
|
|
// peers are stopped.
|
|
func (p *PeerPool) handleStopTopics(server *p2p.Server) {
|
|
if !p.opts.AllowStop {
|
|
return
|
|
}
|
|
for _, t := range p.topics {
|
|
if t.readyToStopSearch() {
|
|
t.StopSearch(server)
|
|
}
|
|
}
|
|
if p.allTopicsStopped() {
|
|
logutils.ZapLogger().Debug("closing discv5 connection because all topics reached max limit")
|
|
p.stopDiscovery(server)
|
|
}
|
|
}
|
|
|
|
// allTopicsStopped returns true if all topics are stopped.
|
|
func (p *PeerPool) allTopicsStopped() (all bool) {
|
|
if !p.opts.AllowStop {
|
|
return false
|
|
}
|
|
all = true
|
|
for _, t := range p.topics {
|
|
if !t.isStopped() {
|
|
all = false
|
|
}
|
|
}
|
|
return all
|
|
}
|
|
|
|
// handleDroppedPeer notifies every topic about dropped peer and returns true if any peer have connections
|
|
// below min limit
|
|
func (p *PeerPool) handleDroppedPeer(server *p2p.Server, nodeID enode.ID) (any bool) {
|
|
for _, t := range p.topics {
|
|
confirmed := t.ConfirmDropped(server, nodeID)
|
|
if confirmed {
|
|
newPeer := t.AddPeerFromTable(server)
|
|
if newPeer != nil {
|
|
logutils.ZapLogger().Debug("added peer from local table", zap.Stringer("ID", newPeer.ID))
|
|
}
|
|
}
|
|
logutils.ZapLogger().Debug("search", zap.String("topic", string(t.Topic())), zap.Bool("below min", t.BelowMin()))
|
|
if t.BelowMin() && !t.SearchRunning() {
|
|
any = true
|
|
}
|
|
}
|
|
return any
|
|
}
|
|
|
|
// Stop closes pool quit channel and all channels that are watched by search queries
|
|
// and waits till all goroutines will exit.
|
|
func (p *PeerPool) Stop() {
|
|
// pool wasn't started
|
|
if p.quit == nil {
|
|
return
|
|
}
|
|
select {
|
|
case <-p.quit:
|
|
return
|
|
default:
|
|
logutils.ZapLogger().Debug("started closing peer pool")
|
|
close(p.quit)
|
|
}
|
|
p.serverSubscription.Unsubscribe()
|
|
p.wg.Wait()
|
|
}
|
|
|
|
type updateTopicRequest struct {
|
|
Topic string
|
|
Limits params.Limits
|
|
}
|
|
|
|
// UpdateTopic updates the pre-existing TopicPool limits.
|
|
func (p *PeerPool) UpdateTopic(topic string, limits params.Limits) error {
|
|
if _, err := p.getTopic(topic); err != nil {
|
|
return err
|
|
}
|
|
|
|
p.updateTopic <- &updateTopicRequest{
|
|
Topic: topic,
|
|
Limits: limits,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *PeerPool) updateTopicLimits(server *p2p.Server, req *updateTopicRequest) error {
|
|
t, err := p.getTopic(req.Topic)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
t.SetLimits(req.Limits)
|
|
return nil
|
|
}
|
|
|
|
func (p *PeerPool) getTopic(topic string) (TopicPoolInterface, error) {
|
|
for _, t := range p.topics {
|
|
if t.Topic() == discv5.Topic(topic) {
|
|
return t, nil
|
|
}
|
|
}
|
|
return nil, errors.New("topic not found")
|
|
}
|