Add signals for notifying react about discovery activity (#859)

* Add signal for notifying react on discovery activity

* Whitelist expected signals in e2e tests
This commit is contained in:
Dmitry Shulyak 2018-04-25 10:13:59 +03:00 committed by GitHub
parent b37fda7731
commit f3e2631c1d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 158 additions and 19 deletions

48
geth/peers/README.md Normal file
View File

@ -0,0 +1,48 @@
Peer pool signals
=================
Peer pool sends 3 types of signals.
Discovery started signal will be sent once discovery server is started.
And every time node will have to re-start discovery server because peer number dropped too low.
```json
{
"type": "discovery.started",
"event": null
}
```
Discovery stopped signal will be sent once discovery found max limit of peers
for every registered topic.
```json
{
"type": "discovery.stopped",
"event": null
}
```
Discovery summary signal will be sent every time new peer is added or removed
from a cluster. It will contain a map with capability as a key and total numbers
of peers with that capability as a value.
```json
{
"type": "discovery.summary",
"event": {
"shh/6": 1
}
}
```
Or if we don't have any peers:
```json
{
"type": "discovery.summary",
"event": {}
}
```

View File

@ -32,11 +32,6 @@ const (
DefaultFastSync = 3 * time.Second DefaultFastSync = 3 * time.Second
// DefaultSlowSync is a recommended value for slow (background) peers search. // DefaultSlowSync is a recommended value for slow (background) peers search.
DefaultSlowSync = 30 * time.Minute DefaultSlowSync = 30 * time.Minute
// Discv5Closed is sent when discv5 is closed
Discv5Closed PoolEvent = "discv5.closed"
// Discv5Started is sent when discv5 is started
Discv5Started PoolEvent = "discv5.started"
) )
// NewPeerPool creates instance of PeerPool // NewPeerPool creates instance of PeerPool
@ -74,8 +69,6 @@ type PeerPool struct {
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
feed event.Feed
} }
// Start creates topic pool for each topic in config and subscribes to server events. // Start creates topic pool for each topic in config and subscribes to server events.
@ -94,6 +87,7 @@ func (p *PeerPool) Start(server *p2p.Server) error {
} }
p.topics = append(p.topics, topicPool) p.topics = append(p.topics, topicPool)
} }
SendDiscoveryStarted() // discovery must be started when pool is started
events := make(chan *p2p.PeerEvent, 20) events := make(chan *p2p.PeerEvent, 20)
p.serverSubscription = server.SubscribeEvents(events) p.serverSubscription = server.SubscribeEvents(events)
@ -115,7 +109,7 @@ func (p *PeerPool) restartDiscovery(server *p2p.Server) error {
} }
log.Debug("restarted discovery from peer pool") log.Debug("restarted discovery from peer pool")
server.DiscV5 = ntab server.DiscV5 = ntab
p.feed.Send(Discv5Started) SendDiscoveryStarted()
} }
for _, t := range p.topics { for _, t := range p.topics {
if !t.BelowMin() || t.SearchRunning() { if !t.BelowMin() || t.SearchRunning() {
@ -152,6 +146,7 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
case p2p.PeerEventTypeAdd: case p2p.PeerEventTypeAdd:
p.handleAddedEvent(server, event) p.handleAddedEvent(server, event)
} }
SendDiscoverySummary(server.PeersInfo())
} }
} }
} }
@ -167,7 +162,7 @@ func (p *PeerPool) handleAddedEvent(server *p2p.Server, event *p2p.PeerEvent) {
log.Debug("closing discv5 connection", "server", server.Self()) log.Debug("closing discv5 connection", "server", server.Self())
server.DiscV5.Close() server.DiscV5.Close()
server.DiscV5 = nil server.DiscV5 = nil
p.feed.Send(Discv5Closed) SendDiscoveryStopped()
} }
} }

View File

@ -1,6 +1,7 @@
package peers package peers
import ( import (
"encoding/json"
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
@ -12,10 +13,12 @@ import (
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"github.com/status-im/status-go/geth/params" "github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
) )
type PeerPoolSimulationSuite struct { type PeerPoolSimulationSuite struct {
@ -52,6 +55,7 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
s.peers = make([]*p2p.Server, 3) s.peers = make([]*p2p.Server, 3)
for i := range s.peers { for i := range s.peers {
key, _ := crypto.GenerateKey() key, _ := crypto.GenerateKey()
whisper := whisperv6.New(nil)
peer := &p2p.Server{ peer := &p2p.Server{
Config: p2p.Config{ Config: p2p.Config{
MaxPeers: 10, MaxPeers: 10,
@ -61,6 +65,7 @@ func (s *PeerPoolSimulationSuite) SetupTest() {
DiscoveryV5: true, DiscoveryV5: true,
NoDiscovery: true, NoDiscovery: true,
BootstrapNodesV5: []*discv5.Node{bootnodeV5}, BootstrapNodesV5: []*discv5.Node{bootnodeV5},
Protocols: whisper.Protocols(),
}, },
} }
port++ port++
@ -82,21 +87,43 @@ func (s *PeerPoolSimulationSuite) getPeerFromEvent(events <-chan *p2p.PeerEvent,
return return
} }
func (s *PeerPoolSimulationSuite) getPoolEvent(events <-chan PoolEvent) PoolEvent { func (s *PeerPoolSimulationSuite) getPoolEvent(events <-chan string) string {
select { select {
case ev := <-events: case ev := <-events:
return ev return ev
case <-time.After(200 * time.Millisecond): case <-time.After(time.Second):
s.Fail("timed out waiting for a peer") s.Fail("timed out waiting for a peer")
return "" return ""
} }
} }
func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() { func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
poolEvents := make(chan string, 1)
summaries := make(chan map[string]int, 1)
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
fmt.Println(jsonEvent)
var envelope struct {
Type string
Event json.RawMessage
}
s.NoError(json.Unmarshal([]byte(jsonEvent), &envelope))
switch envelope.Type {
case DiscoveryStarted:
poolEvents <- envelope.Type
case DiscoveryStopped:
poolEvents <- envelope.Type
case DiscoverySummary:
poolEvents <- envelope.Type
var summary map[string]int
s.NoError(json.Unmarshal(envelope.Event, &summary))
summaries <- summary
}
})
topic := discv5.Topic("cap=test") topic := discv5.Topic("cap=test")
// simulation should only rely on fast sync // simulation should only rely on fast sync
config := map[discv5.Topic]params.Limits{ config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1), // limits a chosen for simplicity of the simulation topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation
} }
peerPool := NewPeerPool(config, 100*time.Millisecond, 100*time.Millisecond, nil, true) peerPool := NewPeerPool(config, 100*time.Millisecond, 100*time.Millisecond, nil, true)
register := NewRegister(topic) register := NewRegister(topic)
@ -109,23 +136,39 @@ func (s *PeerPoolSimulationSuite) TestSingleTopicDiscoveryWithFailover() {
defer subscription.Unsubscribe() defer subscription.Unsubscribe()
s.NoError(peerPool.Start(s.peers[1])) s.NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop() defer peerPool.Stop()
poolEvents := make(chan PoolEvent) s.Equal(DiscoveryStarted, s.getPoolEvent(poolEvents))
poolSub := peerPool.feed.Subscribe(poolEvents)
defer poolSub.Unsubscribe()
connected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd) connected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
s.Equal(s.peers[0].Self().ID, connected) s.Equal(s.peers[0].Self().ID, connected)
s.Equal(Discv5Closed, s.getPoolEvent(poolEvents)) s.Equal(DiscoveryStopped, s.getPoolEvent(poolEvents))
s.Require().Nil(s.peers[1].DiscV5) s.Require().Nil(s.peers[1].DiscV5)
s.Require().Equal(DiscoverySummary, s.getPoolEvent(poolEvents))
summary := <-summaries
s.Len(summary, 1)
s.Contains(summary, "shh/6")
s.Equal(summary["shh/6"], 1)
s.peers[0].Stop() s.peers[0].Stop()
disconnected := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop) disconnected := s.getPeerFromEvent(events, p2p.PeerEventTypeDrop)
s.Equal(connected, disconnected) s.Equal(connected, disconnected)
s.Equal(Discv5Started, s.getPoolEvent(poolEvents))
s.Require().Equal(DiscoverySummary, s.getPoolEvent(poolEvents))
summary = <-summaries
s.Len(summary, 0)
s.Equal(DiscoveryStarted, s.getPoolEvent(poolEvents))
s.Require().NotNil(s.peers[1].DiscV5) s.Require().NotNil(s.peers[1].DiscV5)
register = NewRegister(topic) register = NewRegister(topic)
s.Require().NoError(register.Start(s.peers[2])) s.Require().NoError(register.Start(s.peers[2]))
defer register.Stop() defer register.Stop()
newConnected := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd) s.Equal(s.peers[2].Self().ID, s.getPeerFromEvent(events, p2p.PeerEventTypeAdd))
s.Equal(s.peers[2].Self().ID, newConnected)
s.Equal(DiscoveryStopped, s.getPoolEvent(poolEvents))
s.Require().Equal(DiscoverySummary, s.getPoolEvent(poolEvents))
summary = <-summaries
s.Len(summary, 1)
s.Contains(summary, "shh/6")
s.Equal(summary["shh/6"], 1)
} }
func (s *PeerPoolSimulationSuite) TearDown() { func (s *PeerPoolSimulationSuite) TearDown() {

45
geth/peers/signal.go Normal file
View File

@ -0,0 +1,45 @@
package peers
import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/geth/signal"
)
const (
// DiscoveryStarted is sent when node discv5 was started.
DiscoveryStarted = "discovery.started"
// DiscoveryStopped is sent when discv5 server was stopped.
DiscoveryStopped = "discovery.stopped"
// DiscoverySummary is sent when peer is added or removed.
// it will be a map with capability=peer count k/v's.
DiscoverySummary = "discovery.summary"
)
// SendDiscoveryStarted sends discovery.started signal.
func SendDiscoveryStarted() {
signal.Send(signal.Envelope{
Type: DiscoveryStarted,
})
}
// SendDiscoveryStopped sends discovery.stopped signal.
func SendDiscoveryStopped() {
signal.Send(signal.Envelope{
Type: DiscoveryStopped,
})
}
// SendDiscoverySummary sends discovery.summary signal.
func SendDiscoverySummary(peers []*p2p.PeerInfo) {
summary := map[string]int{}
for i := range peers {
for _, cap := range peers[i].Caps {
summary[cap]++
}
}
signal.Send(signal.Envelope{
Type: DiscoverySummary,
Event: summary,
})
}

View File

@ -201,6 +201,14 @@ func (s *APITestSuite) TestEventsNodeStartStop() {
var envelope signal.Envelope var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope) err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err) s.NoError(err)
// whitelist types that we are interested in
switch envelope.Type {
case signal.EventNodeStarted:
case signal.EventNodeStopped:
case signal.EventNodeReady:
default:
return
}
envelopes <- envelope envelopes <- envelope
}) })