status-go/peers/peerpool.go
shashankshampi 14dcd29eee test_: Code Migration from status-cli-tests
author shashankshampi <shashank.sanket1995@gmail.com> 1729780155 +0530
committer shashankshampi <shashank.sanket1995@gmail.com> 1730274350 +0530

test: Code Migration from status-cli-tests
fix_: functional tests (#5979)

* fix_: generate on test-functional

* chore(test)_: fix functional test assertion

---------

Co-authored-by: Siddarth Kumar <siddarthkay@gmail.com>

feat(accounts)_: cherry-pick Persist acceptance of Terms of Use & Privacy policy (#5766) (#5977)

* feat(accounts)_: Persist acceptance of Terms of Use & Privacy policy (#5766)

The original GH issue https://github.com/status-im/status-mobile/issues/21113
came from a request from the Legal team. We must show to Status v1 users the new
terms (Terms of Use & Privacy Policy) right after they upgrade to Status v2
from the stores.

The solution we use is to create a flag in the accounts table, named
hasAcceptedTerms. The flag will be set to true on the first account ever
created in v2 and we provide a native call in mobile/status.go#AcceptTerms,
which allows the client to persist the user's choice in case they are upgrading
(from v1 -> v2, or from a v2 older than this PR).

This solution is not the best because we should store the setting in a separate
table, not in the accounts table.

Related Mobile PR https://github.com/status-im/status-mobile/pull/21124

* fix(test)_: Compare addresses using uppercased strings

---------

Co-authored-by: Icaro Motta <icaro.ldm@gmail.com>

test_: restore account (#5960)

feat_: `LogOnPanic` linter (#5969)

* feat_: LogOnPanic linter

* fix_: add missing defer LogOnPanic

* chore_: make vendor

* fix_: tests, address pr comments

* fix_: address pr comments

fix(ci)_: remove workspace and tmp dir

This ensures we do not encounter weird errors like:
```
+ ln -s /home/jenkins/workspace/go_prs_linux_x86_64_main_PR-5907 /home/jenkins/workspace/go_prs_linux_x86_64_main_PR-5907@tmp/go/src/github.com/status-im/status-go
ln: failed to create symbolic link '/home/jenkins/workspace/go_prs_linux_x86_64_main_PR-5907@tmp/go/src/github.com/status-im/status-go': File exists
script returned exit code 1
```

Signed-off-by: Jakub Sokołowski <jakub@status.im>

chore_: enable windows and macos CI build (#5840)

- Added support for Windows and macOS in CI pipelines
- Added missing dependencies for Windows and x86-64-darwin
- Resolved macOS SDK version compatibility for darwin-x86_64

The `mkShell` override was necessary to ensure compatibility with the newer
macOS SDK (version 11.0) for x86_64. The default SDK (10.12) was causing build failures
because of the missing libs and frameworks. OverrideSDK creates a mapping from
the default SDK in all package categories to the requested SDK (11.0).

fix(contacts)_: fix trust status not being saved to cache when changed (#5965)

Fixes https://github.com/status-im/status-desktop/issues/16392

cleanup

added logger and cleanup

review comments changes

fix_: functional tests (#5979)

* fix_: generate on test-functional

* chore(test)_: fix functional test assertion

---------

Co-authored-by: Siddarth Kumar <siddarthkay@gmail.com>

feat(accounts)_: cherry-pick Persist acceptance of Terms of Use & Privacy policy (#5766) (#5977)

* feat(accounts)_: Persist acceptance of Terms of Use & Privacy policy (#5766)

The original GH issue https://github.com/status-im/status-mobile/issues/21113
came from a request from the Legal team. We must show to Status v1 users the new
terms (Terms of Use & Privacy Policy) right after they upgrade to Status v2
from the stores.

The solution we use is to create a flag in the accounts table, named
hasAcceptedTerms. The flag will be set to true on the first account ever
created in v2 and we provide a native call in mobile/status.go#AcceptTerms,
which allows the client to persist the user's choice in case they are upgrading
(from v1 -> v2, or from a v2 older than this PR).

This solution is not the best because we should store the setting in a separate
table, not in the accounts table.

Related Mobile PR https://github.com/status-im/status-mobile/pull/21124

* fix(test)_: Compare addresses using uppercased strings

---------

Co-authored-by: Icaro Motta <icaro.ldm@gmail.com>

test_: restore account (#5960)

feat_: `LogOnPanic` linter (#5969)

* feat_: LogOnPanic linter

* fix_: add missing defer LogOnPanic

* chore_: make vendor

* fix_: tests, address pr comments

* fix_: address pr comments

chore_: enable windows and macos CI build (#5840)

- Added support for Windows and macOS in CI pipelines
- Added missing dependencies for Windows and x86-64-darwin
- Resolved macOS SDK version compatibility for darwin-x86_64

The `mkShell` override was necessary to ensure compatibility with the newer
macOS SDK (version 11.0) for x86_64. The default SDK (10.12) was causing build failures
because of the missing libs and frameworks. OverrideSDK creates a mapping from
the default SDK in all package categories to the requested SDK (11.0).

fix(contacts)_: fix trust status not being saved to cache when changed (#5965)

Fixes https://github.com/status-im/status-desktop/issues/16392

test_: remove port bind

chore(wallet)_: move route execution code to separate module

chore_: replace geth logger with zap logger (#5962)

closes: #6002

feat(telemetry)_: add metrics for message reliability (#5899)

* feat(telemetry)_: track message reliability

Add metrics for dial errors, missed messages,
missed relevant messages, and confirmed delivery.

* fix_: handle error from json marshal

chore_: use zap logger as request logger

iterates: status-im/status-desktop#16536

test_: unique project per run

test_: use docker compose v2, more concrete project name

fix(codecov)_: ignore folders without tests

Otherwise Codecov reports incorrect numbers when making changes.
https://docs.codecov.com/docs/ignoring-paths

Signed-off-by: Jakub Sokołowski <jakub@status.im>

test_: verify schema of signals during init; fix schema verification warnings (#5947)

fix_: update defaultGorushURL (#6011)

fix(tests)_: use non-standard port to avoid conflicts

We have observed `nimbus-eth2` build failures reporting this port:
```json
{
  "lvl": "NTC",
  "ts": "2024-10-28 13:51:32.308+00:00",
  "msg": "REST HTTP server could not be started",
  "topics": "beacnde",
  "address": "127.0.0.1:5432",
  "reason": "(98) Address already in use"
}
```
https://ci.status.im/job/nimbus-eth2/job/platforms/job/linux/job/x86_64/job/main/job/PR-6683/3/

Signed-off-by: Jakub Sokołowski <jakub@status.im>

fix_: create request logger ad-hoc in tests

Fixes `TestCall` failing when run concurrently.

chore_: configure codecov (#6005)

* chore_: configure codecov

* fix_: after_n_builds
2024-10-30 14:49:26 +05:30

463 lines
12 KiB
Go

package peers
import (
"crypto/ecdsa"
"errors"
"sync"
"time"
"go.uber.org/zap"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/common"
"github.com/status-im/status-go/discovery"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/peers/verifier"
"github.com/status-im/status-go/signal"
)
var (
// ErrDiscv5NotRunning returned when pool is started but discover v5 is not running or not enabled.
ErrDiscv5NotRunning = errors.New("Discovery v5 is not running")
)
// PoolEvent is a type used to for peer pool events.
type PoolEvent string
const (
immediately = 0 * time.Minute
// expirationPeriod is an amount of time while peer is considered as a connectable
expirationPeriod = 60 * time.Minute
// discoveryRestartTimeout defines how often loop will try to start discovery server
discoveryRestartTimeout = 2 * time.Second
// DefaultFastSync is a recommended value for aggressive peers search.
DefaultFastSync = 3 * time.Second
// DefaultSlowSync is a recommended value for slow (background) peers search.
DefaultSlowSync = 30 * time.Second
// DefaultDiscV5Timeout is a timeout after which Discv5 is stopped.
DefaultDiscV5Timeout = 3 * time.Minute
// DefaultTopicFastModeTimeout is a timeout after which sync mode is switched to slow mode.
DefaultTopicFastModeTimeout = 30 * time.Second
// DefaultTopicStopSearchDelay is the default delay when stopping a topic search.
DefaultTopicStopSearchDelay = 10 * time.Second
)
// Options is a struct with PeerPool configuration.
type Options struct {
FastSync time.Duration
SlowSync time.Duration
// After this time, Discovery is stopped even if max peers is not reached.
DiscServerTimeout time.Duration
// AllowStop allows stopping Discovery when reaching max peers or after timeout.
AllowStop bool
// TopicStopSearchDelay time stopSearch will be waiting for max cached peers to be
// filled before really stopping the search.
TopicStopSearchDelay time.Duration
// TrustedMailServers is a list of trusted nodes.
TrustedMailServers []enode.ID
}
// NewDefaultOptions returns a struct with default Options.
func NewDefaultOptions() *Options {
return &Options{
FastSync: DefaultFastSync,
SlowSync: DefaultSlowSync,
DiscServerTimeout: DefaultDiscV5Timeout,
AllowStop: false,
TopicStopSearchDelay: DefaultTopicStopSearchDelay,
}
}
type peerInfo struct {
// discoveredTime last time when node was found by v5
discoveredTime time.Time
// dismissed is true when our node requested a disconnect
dismissed bool
// added is true when the node tries to add this peer to a server
added bool
node *discv5.Node
// store public key separately to make peerInfo more independent from discv5
publicKey *ecdsa.PublicKey
}
func (p *peerInfo) NodeID() enode.ID {
return enode.PubkeyToIDV4(p.publicKey)
}
// PeerPool manages discovered peers and connects them to p2p server
type PeerPool struct {
opts *Options
discovery discovery.Discovery
// config can be set only once per pool life cycle
config map[discv5.Topic]params.Limits
cache *Cache
mu sync.RWMutex
timeoutMu sync.RWMutex
topics []TopicPoolInterface
serverSubscription event.Subscription
events chan *p2p.PeerEvent
quit chan struct{}
wg sync.WaitGroup
timeout <-chan time.Time
updateTopic chan *updateTopicRequest
}
// NewPeerPool creates instance of PeerPool
func NewPeerPool(discovery discovery.Discovery, config map[discv5.Topic]params.Limits, cache *Cache, options *Options) *PeerPool {
return &PeerPool{
opts: options,
discovery: discovery,
config: config,
cache: cache,
}
}
func (p *PeerPool) setDiscoveryTimeout() {
p.timeoutMu.Lock()
defer p.timeoutMu.Unlock()
if p.opts.AllowStop && p.opts.DiscServerTimeout > 0 {
p.timeout = time.After(p.opts.DiscServerTimeout)
}
}
// Start creates topic pool for each topic in config and subscribes to server events.
func (p *PeerPool) Start(server *p2p.Server) error {
if !p.discovery.Running() {
return ErrDiscv5NotRunning
}
p.mu.Lock()
defer p.mu.Unlock()
// init channels
p.quit = make(chan struct{})
p.updateTopic = make(chan *updateTopicRequest)
p.setDiscoveryTimeout()
// subscribe to peer events
p.events = make(chan *p2p.PeerEvent, 20)
p.serverSubscription = server.SubscribeEvents(p.events)
p.wg.Add(1)
go func() {
defer common.LogOnPanic()
p.handleServerPeers(server, p.events)
p.wg.Done()
}()
// collect topics and start searching for nodes
p.topics = make([]TopicPoolInterface, 0, len(p.config))
for topic, limits := range p.config {
var topicPool TopicPoolInterface
t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
if topic == MailServerDiscoveryTopic {
v, err := p.initVerifier()
if err != nil {
return err
}
topicPool = newCacheOnlyTopicPool(t, v)
} else {
topicPool = t
}
if err := topicPool.StartSearch(server); err != nil {
return err
}
p.topics = append(p.topics, topicPool)
}
// discovery must be already started when pool is started
signal.SendDiscoveryStarted()
return nil
}
func (p *PeerPool) initVerifier() (v Verifier, err error) {
return verifier.NewLocalVerifier(p.opts.TrustedMailServers), nil
}
func (p *PeerPool) startDiscovery() error {
if p.discovery.Running() {
return nil
}
if err := p.discovery.Start(); err != nil {
return err
}
p.mu.Lock()
p.setDiscoveryTimeout()
p.mu.Unlock()
signal.SendDiscoveryStarted()
return nil
}
func (p *PeerPool) stopDiscovery(server *p2p.Server) {
if !p.discovery.Running() {
return
}
if err := p.discovery.Stop(); err != nil {
logutils.ZapLogger().Error("discovery errored when stopping", zap.Error(err))
}
for _, t := range p.topics {
t.StopSearch(server)
}
p.timeoutMu.Lock()
p.timeout = nil
p.timeoutMu.Unlock()
signal.SendDiscoveryStopped()
}
// restartDiscovery and search for topics that have peer count below min
func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
if !p.discovery.Running() {
if err := p.startDiscovery(); err != nil {
return err
}
logutils.ZapLogger().Debug("restarted discovery from peer pool")
}
for _, t := range p.topics {
if !t.BelowMin() || t.SearchRunning() {
continue
}
err := t.StartSearch(server)
if err != nil {
logutils.ZapLogger().Error("search failed to start", zap.Error(err))
}
}
return nil
}
// handleServerPeers watches server peer events, notifies topic pools about changes
// in the peer set and stops the discv5 if all topic pools collected enough peers.
//
// @TODO(adam): split it into peers and discovery management loops. This should
// simplify the whole logic and allow to remove `timeout` field from `PeerPool`.
func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.PeerEvent) {
retryDiscv5 := make(chan struct{}, 1)
stopDiscv5 := make(chan struct{}, 1)
queueRetry := func(d time.Duration) {
go func() {
defer common.LogOnPanic()
time.Sleep(d)
select {
case retryDiscv5 <- struct{}{}:
default:
}
}()
}
queueStop := func() {
go func() {
defer common.LogOnPanic()
select {
case stopDiscv5 <- struct{}{}:
default:
}
}()
}
for {
// We use a separate lock for timeout, as this loop should
// always be running, otherwise the p2p.Server will hang.
// Because the handler of events might potentially hang on the
// server, deadlocking if this loop is waiting for the global lock.
// NOTE: this code probably needs to be refactored and simplified
// as it's difficult to follow the asynchronous nature of it.
p.timeoutMu.RLock()
timeout := p.timeout
p.timeoutMu.RUnlock()
select {
case <-p.quit:
logutils.ZapLogger().Debug("stopping DiscV5 because of quit")
p.stopDiscovery(server)
return
case <-timeout:
logutils.ZapLogger().Info("DiscV5 timed out")
p.stopDiscovery(server)
case <-retryDiscv5:
if err := p.restartDiscovery(server); err != nil {
logutils.ZapLogger().Error("starting discv5 failed", zap.Duration("retry", discoveryRestartTimeout), zap.Error(err))
queueRetry(discoveryRestartTimeout)
}
case <-stopDiscv5:
p.handleStopTopics(server)
case req := <-p.updateTopic:
if p.updateTopicLimits(server, req) == nil {
if !p.discovery.Running() {
queueRetry(immediately)
}
}
case event := <-events:
// NOTE: handlePeerEventType needs to be called asynchronously
// as it publishes on the <-events channel, leading to a deadlock
// if events channel is full.
go p.handlePeerEventType(server, event, queueRetry, queueStop)
}
}
}
func (p *PeerPool) handlePeerEventType(server *p2p.Server, event *p2p.PeerEvent, queueRetry func(time.Duration), queueStop func()) {
defer common.LogOnPanic()
p.mu.Lock()
defer p.mu.Unlock()
var shouldRetry bool
var shouldStop bool
switch event.Type {
case p2p.PeerEventTypeDrop:
logutils.ZapLogger().Debug("confirm peer dropped", zap.Stringer("ID", event.Peer))
if p.handleDroppedPeer(server, event.Peer) {
shouldRetry = true
}
case p2p.PeerEventTypeAdd: // skip other events
logutils.ZapLogger().Debug("confirm peer added", zap.Stringer("ID", event.Peer))
p.handleAddedPeer(server, event.Peer)
shouldStop = true
default:
return
}
// First we send the discovery summary
SendDiscoverySummary(server.PeersInfo())
// then we send the stop event
if shouldRetry {
queueRetry(immediately)
} else if shouldStop {
queueStop()
}
}
// handleAddedPeer notifies all topics about added peer.
func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID enode.ID) {
for _, t := range p.topics {
t.ConfirmAdded(server, nodeID)
if p.opts.AllowStop && t.MaxReached() {
t.setStopSearchTimeout(p.opts.TopicStopSearchDelay)
}
}
}
// handleStopTopics stops the search on any topics having reached its max cached
// limit or its delay stop is expired, additionally will stop discovery if all
// peers are stopped.
func (p *PeerPool) handleStopTopics(server *p2p.Server) {
if !p.opts.AllowStop {
return
}
for _, t := range p.topics {
if t.readyToStopSearch() {
t.StopSearch(server)
}
}
if p.allTopicsStopped() {
logutils.ZapLogger().Debug("closing discv5 connection because all topics reached max limit")
p.stopDiscovery(server)
}
}
// allTopicsStopped returns true if all topics are stopped.
func (p *PeerPool) allTopicsStopped() (all bool) {
if !p.opts.AllowStop {
return false
}
all = true
for _, t := range p.topics {
if !t.isStopped() {
all = false
}
}
return all
}
// handleDroppedPeer notifies every topic about dropped peer and returns true if any peer have connections
// below min limit
func (p *PeerPool) handleDroppedPeer(server *p2p.Server, nodeID enode.ID) (any bool) {
for _, t := range p.topics {
confirmed := t.ConfirmDropped(server, nodeID)
if confirmed {
newPeer := t.AddPeerFromTable(server)
if newPeer != nil {
logutils.ZapLogger().Debug("added peer from local table", zap.Stringer("ID", newPeer.ID))
}
}
logutils.ZapLogger().Debug("search", zap.String("topic", string(t.Topic())), zap.Bool("below min", t.BelowMin()))
if t.BelowMin() && !t.SearchRunning() {
any = true
}
}
return any
}
// Stop closes pool quit channel and all channels that are watched by search queries
// and waits till all goroutines will exit.
func (p *PeerPool) Stop() {
// pool wasn't started
if p.quit == nil {
return
}
select {
case <-p.quit:
return
default:
logutils.ZapLogger().Debug("started closing peer pool")
close(p.quit)
}
p.serverSubscription.Unsubscribe()
p.wg.Wait()
}
type updateTopicRequest struct {
Topic string
Limits params.Limits
}
// UpdateTopic updates the pre-existing TopicPool limits.
func (p *PeerPool) UpdateTopic(topic string, limits params.Limits) error {
if _, err := p.getTopic(topic); err != nil {
return err
}
p.updateTopic <- &updateTopicRequest{
Topic: topic,
Limits: limits,
}
return nil
}
func (p *PeerPool) updateTopicLimits(server *p2p.Server, req *updateTopicRequest) error {
t, err := p.getTopic(req.Topic)
if err != nil {
return err
}
t.SetLimits(req.Limits)
return nil
}
func (p *PeerPool) getTopic(topic string) (TopicPoolInterface, error) {
for _, t := range p.topics {
if t.Topic() == discv5.Topic(topic) {
return t, nil
}
}
return nil, errors.New("topic not found")
}