Discover mail servers on demand. (#1082)

* [#1076] Discover mail servers on start up.

* [#1076] On-demand mail-server discovery
This commit is contained in:
Adrià Cidre 2018-07-16 09:40:40 +02:00 committed by GitHub
parent 5abc68b11d
commit bfbb02019f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 706 additions and 20 deletions

View File

@ -184,6 +184,7 @@ mock: ##@other Regenerate mocks
mockgen -package=account -destination=account/accounts_mock.go -source=account/accounts.go mockgen -package=account -destination=account/accounts_mock.go -source=account/accounts.go
mockgen -package=jail -destination=jail/cell_mock.go -source=jail/cell.go mockgen -package=jail -destination=jail/cell_mock.go -source=jail/cell.go
mockgen -package=status -destination=services/status/account_mock.go -source=services/status/service.go mockgen -package=status -destination=services/status/account_mock.go -source=services/status/service.go
mockgen -package=peer -destination=services/peer/discoverer_mock.go -source=services/peer/service.go
docker-test: ##@tests Run tests in a docker container with golang. docker-test: ##@tests Run tests in a docker container with golang.
docker run --privileged --rm -it -v "$(shell pwd):$(DOCKER_TEST_WORKDIR)" -w "$(DOCKER_TEST_WORKDIR)" $(DOCKER_TEST_IMAGE) go test ${ARGS} docker run --privileged --rm -it -v "$(shell pwd):$(DOCKER_TEST_WORKDIR)" -w "$(DOCKER_TEST_WORKDIR)" $(DOCKER_TEST_IMAGE) go test ${ARGS}

View File

@ -161,6 +161,10 @@ func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) {
st.SetAccountManager(b.AccountManager()) st.SetAccountManager(b.AccountManager())
} }
if st, err := b.statusNode.PeerService(); err == nil {
st.SetDiscoverer(b.StatusNode())
}
signal.SendNodeReady() signal.SendNodeReady()
return nil return nil

View File

@ -24,6 +24,7 @@ import (
"github.com/status-im/status-go/mailserver" "github.com/status-im/status-go/mailserver"
shhmetrics "github.com/status-im/status-go/metrics/whisper" shhmetrics "github.com/status-im/status-go/metrics/whisper"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/services/peer"
"github.com/status-im/status-go/services/personal" "github.com/status-im/status-go/services/personal"
"github.com/status-im/status-go/services/shhext" "github.com/status-im/status-go/services/shhext"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
@ -38,6 +39,7 @@ var (
ErrLightEthRegistrationFailure = errors.New("failed to register the LES service") ErrLightEthRegistrationFailure = errors.New("failed to register the LES service")
ErrPersonalServiceRegistrationFailure = errors.New("failed to register the personal api service") ErrPersonalServiceRegistrationFailure = errors.New("failed to register the personal api service")
ErrStatusServiceRegistrationFailure = errors.New("failed to register the Status service") ErrStatusServiceRegistrationFailure = errors.New("failed to register the Status service")
ErrPeerServiceRegistrationFailure = errors.New("failed to register the Peer service")
) )
// All general log messages in this package should be routed through this logger. // All general log messages in this package should be routed through this logger.
@ -103,6 +105,11 @@ func MakeNode(config *params.NodeConfig, db *leveldb.DB) (*node.Node, error) {
return nil, fmt.Errorf("%v: %v", ErrStatusServiceRegistrationFailure, err) return nil, fmt.Errorf("%v: %v", ErrStatusServiceRegistrationFailure, err)
} }
// start peer service
if err := activatePeerService(stack); err != nil {
return nil, fmt.Errorf("%v: %v", ErrPeerServiceRegistrationFailure, err)
}
return stack, nil return stack, nil
} }
@ -188,6 +195,13 @@ func activateStatusService(stack *node.Node, config *params.NodeConfig) error {
}) })
} }
func activatePeerService(stack *node.Node) error {
return stack.Register(func(ctx *node.ServiceContext) (node.Service, error) {
svc := peer.New()
return svc, nil
})
}
func registerMailServer(whisperService *whisper.Whisper, config *params.WhisperConfig) (err error) { func registerMailServer(whisperService *whisper.Whisper, config *params.WhisperConfig) (err error) {
// if the Password is already set, do not override it // if the Password is already set, do not override it
if config.MailServerPassword == "" && config.MailServerPasswordFile != "" { if config.MailServerPassword == "" && config.MailServerPasswordFile != "" {

View File

@ -23,6 +23,7 @@ import (
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/peers" "github.com/status-im/status-go/peers"
"github.com/status-im/status-go/rpc" "github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/peer"
"github.com/status-im/status-go/services/status" "github.com/status-im/status-go/services/status"
) )
@ -428,6 +429,19 @@ func (n *StatusNode) StatusService() (st *status.Service, err error) {
return return
} }
// PeerService exposes reference to peer service running on top of the node.
func (n *StatusNode) PeerService() (st *peer.Service, err error) {
n.mu.RLock()
defer n.mu.RUnlock()
err = n.gethService(&st)
if err == node.ErrServiceUnknown {
err = ErrServiceUnknown
}
return
}
// WhisperService exposes reference to Whisper service running on top of the node // WhisperService exposes reference to Whisper service running on top of the node
func (n *StatusNode) WhisperService() (w *whisper.Whisper, err error) { func (n *StatusNode) WhisperService() (w *whisper.Whisper, err error) {
n.mu.RLock() n.mu.RLock()
@ -551,3 +565,14 @@ func (n *StatusNode) ensureSync(ctx context.Context) error {
} }
} }
} }
// Discover sets up the discovery for a specific topic.
func (n *StatusNode) Discover(topic string, max, min int) (err error) {
if n.peerPool == nil {
return errors.New("peerPool not running")
}
return n.peerPool.UpdateTopic(topic, params.Limits{
Max: max,
Min: min,
})
}

View File

@ -328,7 +328,12 @@ type NodeConfig struct {
// SwarmConfig extra configuration for Swarm and ENS // SwarmConfig extra configuration for Swarm and ENS
SwarmConfig *SwarmConfig `json:"SwarmConfig," validate:"structonly"` SwarmConfig *SwarmConfig `json:"SwarmConfig," validate:"structonly"`
// RegisterTopics a list of specific topics where the peer wants to be
// discoverable.
RegisterTopics []discv5.Topic `json:"RegisterTopics"` RegisterTopics []discv5.Topic `json:"RegisterTopics"`
// RequiredTopics list of topics where a client wants to search for
// discoverable peers with the discovery limits.
RequireTopics map[discv5.Topic]Limits `json:"RequireTopics"` RequireTopics map[discv5.Topic]Limits `json:"RequireTopics"`
// StatusServiceEnabled enables status service api // StatusServiceEnabled enables status service api

View File

@ -33,7 +33,7 @@ const (
// APIModules is a list of modules to expose via any type of RPC (HTTP, IPC, in-proc) // APIModules is a list of modules to expose via any type of RPC (HTTP, IPC, in-proc)
// we also expose 2 limited personal APIs by overriding them in `api/backend.go` // we also expose 2 limited personal APIs by overriding them in `api/backend.go`
APIModules = "eth,net,web3,shh,shhext" APIModules = "eth,net,web3,shh,shhext,peer"
// SendTransactionMethodName defines the name for a giving transaction. // SendTransactionMethodName defines the name for a giving transaction.
SendTransactionMethodName = "eth_sendTransaction" SendTransactionMethodName = "eth_sendTransaction"

70
peers/cotopicpool.go Normal file
View File

@ -0,0 +1,70 @@
package peers
import (
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/signal"
)
// MailServerDiscoveryTopic topic name for mailserver discovery.
const MailServerDiscoveryTopic = "mailserver"
// MailServerDiscoveryLimits default mailserver discovery limits.
var MailServerDiscoveryLimits = params.Limits{Min: 3, Max: 3}
// newCacheOnlyTopicPool returns instance of CacheOnlyTopicPool.
func newCacheOnlyTopicPool(t *TopicPool) *cacheOnlyTopicPool {
return &cacheOnlyTopicPool{
TopicPool: t,
}
}
// cacheOnlyTopicPool handles a mail server topic pool.
type cacheOnlyTopicPool struct {
*TopicPool
}
// MaxReached checks if the max allowed peers is reached or not. When true
// peerpool will stop the discovery process on this TopicPool.
// Main difference with basic TopicPool is we want to stop discovery process
// when the number of cached peers eq/exceeds the max limit.
func (t *cacheOnlyTopicPool) MaxReached() bool {
t.mu.RLock()
defer t.mu.RUnlock()
if t.limits.Max == 0 {
return true
}
peers := t.cache.GetPeersRange(t.topic, t.limits.Max)
return len(peers) >= t.limits.Max
}
var sendEnodeDiscovered = signal.SendEnodeDiscovered
// ConfirmAdded calls base TopicPool ConfirmAdded method and sends a signal
// confirming the enode has been discovered.
func (t *cacheOnlyTopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
t.TopicPool.ConfirmAdded(server, nodeID)
sendEnodeDiscovered(nodeID.String(), string(t.topic))
id := discv5.NodeID(nodeID)
if peer, ok := t.connectedPeers[id]; ok {
t.removeServerPeer(server, peer)
delete(t.connectedPeers, id)
t.subtractToLimits()
}
}
// subtractToLimits subtracts one to topic pool limits.
func (t *cacheOnlyTopicPool) subtractToLimits() {
t.mu.Lock()
defer t.mu.Unlock()
if t.limits.Max > 0 {
t.limits.Max = t.limits.Max - 1
}
if t.limits.Min > 0 {
t.limits.Min = t.limits.Min - 1
}
}

96
peers/cotopicpool_test.go Normal file
View File

@ -0,0 +1,96 @@
package peers
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5"
"github.com/status-im/status-go/params"
"github.com/stretchr/testify/suite"
)
type CacheOnlyTopicPoolSuite struct {
suite.Suite
peer *p2p.Server
topicPool *cacheOnlyTopicPool
}
func TestCacheOnlyTopicPoolSuite(t *testing.T) {
suite.Run(t, new(CacheOnlyTopicPoolSuite))
}
func (s *CacheOnlyTopicPoolSuite) SetupTest() {
maxCachedPeersMultiplier = 1
key, _ := crypto.GenerateKey()
name := common.MakeName("peer", "1.0")
s.peer = &p2p.Server{
Config: p2p.Config{
MaxPeers: 10,
Name: name,
ListenAddr: "0.0.0.0:0",
PrivateKey: key,
NoDiscovery: true,
},
}
s.Require().NoError(s.peer.Start())
limits := params.NewLimits(1, 2)
cache, err := newInMemoryCache()
s.Require().NoError(err)
t := newTopicPool(&DiscV5{}, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.topicPool = newCacheOnlyTopicPool(t)
s.topicPool.running = 1
// This is a buffered channel to simplify testing.
// If your test generates more than 10 mode changes,
// override this `period` field or consume from it
// using `AssertConsumed()`.
s.topicPool.period = make(chan time.Duration, 10)
}
func (s *CacheOnlyTopicPoolSuite) TearDown() {
s.peer.Stop()
}
func (s *CacheOnlyTopicPoolSuite) TestReplacementPeerIsCounted() {
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
peer2 := discv5.NewNode(discv5.NodeID{2}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, peer1)
s.topicPool.processFoundNode(s.peer, peer2)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer2.ID))
s.True(s.topicPool.MaxReached())
// When we stop searching for peers (when Max limit is reached)
s.topicPool.StopSearch(s.peer)
s.True(s.topicPool.MaxReached())
s.Equal(s.topicPool.limits.Max, 0)
s.Equal(s.topicPool.limits.Min, 0)
// Then we should drop all connected peers
s.Equal(len(s.topicPool.connectedPeers), 0)
// And cached peers should remain
cachedPeers := s.topicPool.cache.GetPeersRange(s.topicPool.topic, s.topicPool.maxCachedPeers)
s.Equal(len(cachedPeers), 1)
}
func (s *CacheOnlyTopicPoolSuite) TestConfirmAddedSignals() {
sentNodeID := ""
sentTopic := ""
sendEnodeDiscovered = func(enode, topic string) {
sentNodeID = enode
sentTopic = topic
}
peer1 := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(peer1.ID))
s.Equal((discv5.NodeID{1}).String(), sentNodeID)
s.Equal(MailServerDiscoveryTopic, sentTopic)
}

View File

@ -85,12 +85,13 @@ type PeerPool struct {
cache *Cache cache *Cache
mu sync.RWMutex mu sync.RWMutex
topics []*TopicPool topics []TopicPoolInterface
serverSubscription event.Subscription serverSubscription event.Subscription
events chan *p2p.PeerEvent events chan *p2p.PeerEvent
quit chan struct{} quit chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
timeout <-chan time.Time timeout <-chan time.Time
updateTopic chan *updateTopicRequest
} }
// NewPeerPool creates instance of PeerPool // NewPeerPool creates instance of PeerPool
@ -120,6 +121,7 @@ func (p *PeerPool) Start(server *p2p.Server) error {
// init channels // init channels
p.quit = make(chan struct{}) p.quit = make(chan struct{})
p.updateTopic = make(chan *updateTopicRequest)
p.setDiscoveryTimeout() p.setDiscoveryTimeout()
// subscribe to peer events // subscribe to peer events
@ -132,9 +134,15 @@ func (p *PeerPool) Start(server *p2p.Server) error {
}() }()
// collect topics and start searching for nodes // collect topics and start searching for nodes
p.topics = make([]*TopicPool, 0, len(p.config)) p.topics = make([]TopicPoolInterface, 0, len(p.config))
for topic, limits := range p.config { for topic, limits := range p.config {
topicPool := NewTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache) var topicPool TopicPoolInterface
t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
if topic == MailServerDiscoveryTopic {
topicPool = newCacheOnlyTopicPool(t)
} else {
topicPool = t
}
if err := topicPool.StartSearch(server); err != nil { if err := topicPool.StartSearch(server); err != nil {
return err return err
} }
@ -165,13 +173,13 @@ func (p *PeerPool) startDiscovery() error {
return nil return nil
} }
func (p *PeerPool) stopDiscovery() { func (p *PeerPool) stopDiscovery(server *p2p.Server) {
if !p.discovery.Running() { if !p.discovery.Running() {
return return
} }
for _, t := range p.topics { for _, t := range p.topics {
t.StopSearch() t.StopSearch(server)
} }
if err := p.discovery.Stop(); err != nil { if err := p.discovery.Stop(); err != nil {
log.Error("discovery errored when was closed", "err", err) log.Error("discovery errored when was closed", "err", err)
@ -221,18 +229,24 @@ func (p *PeerPool) handleServerPeers(server *p2p.Server, events <-chan *p2p.Peer
select { select {
case <-p.quit: case <-p.quit:
log.Debug("stopping DiscV5 because of quit") log.Debug("stopping DiscV5 because of quit")
p.stopDiscovery() p.stopDiscovery(server)
return return
case <-timeout: case <-timeout:
log.Info("DiscV5 timed out") log.Info("DiscV5 timed out")
p.stopDiscovery() p.stopDiscovery(server)
case <-retryDiscv5: case <-retryDiscv5:
if err := p.restartDiscovery(server); err != nil { if err := p.restartDiscovery(server); err != nil {
retryDiscv5 = time.After(discoveryRestartTimeout) retryDiscv5 = time.After(discoveryRestartTimeout)
log.Error("starting discv5 failed", "error", err, "retry", discoveryRestartTimeout) log.Error("starting discv5 failed", "error", err, "retry", discoveryRestartTimeout)
} }
case <-stopDiscv5: case <-stopDiscv5:
p.handleStopTopics() p.handleStopTopics(server)
case req := <-p.updateTopic:
if p.updateTopicLimits(server, req) == nil {
if !p.discovery.Running() {
retryDiscv5 = time.After(0)
}
}
case event := <-events: case event := <-events:
switch event.Type { switch event.Type {
case p2p.PeerEventTypeDrop: case p2p.PeerEventTypeDrop:
@ -267,18 +281,18 @@ func (p *PeerPool) handleAddedPeer(server *p2p.Server, nodeID discover.NodeID) {
// handleStopTopics stops the search on any topics having reached its max cached // handleStopTopics stops the search on any topics having reached its max cached
// limit or its delay stop is expired, additionally will stop discovery if all // limit or its delay stop is expired, additionally will stop discovery if all
// peers are stopped. // peers are stopped.
func (p *PeerPool) handleStopTopics() { func (p *PeerPool) handleStopTopics(server *p2p.Server) {
if !p.opts.AllowStop { if !p.opts.AllowStop {
return return
} }
for _, t := range p.topics { for _, t := range p.topics {
if t.readyToStopSearch() { if t.readyToStopSearch() {
t.StopSearch() t.StopSearch(server)
} }
} }
if p.allTopicsStopped() { if p.allTopicsStopped() {
log.Debug("closing discv5 connection because all topics reached max limit") log.Debug("closing discv5 connection because all topics reached max limit")
p.stopDiscovery() p.stopDiscovery(server)
} }
} }
@ -309,7 +323,7 @@ func (p *PeerPool) handleDroppedPeer(server *p2p.Server, nodeID discover.NodeID)
log.Debug("added peer from local table", "ID", newPeer.ID) log.Debug("added peer from local table", "ID", newPeer.ID)
} }
} }
log.Debug("search", "topic", t.topic, "below min", t.BelowMin()) log.Debug("search", "topic", t.Topic(), "below min", t.BelowMin())
if t.BelowMin() && !t.SearchRunning() { if t.BelowMin() && !t.SearchRunning() {
any = true any = true
} }
@ -334,3 +348,40 @@ func (p *PeerPool) Stop() {
p.serverSubscription.Unsubscribe() p.serverSubscription.Unsubscribe()
p.wg.Wait() p.wg.Wait()
} }
type updateTopicRequest struct {
Topic string
Limits params.Limits
}
// UpdateTopic updates the pre-existing TopicPool limits.
func (p *PeerPool) UpdateTopic(topic string, limits params.Limits) error {
if _, err := p.getTopic(topic); err != nil {
return err
}
p.updateTopic <- &updateTopicRequest{
Topic: topic,
Limits: limits,
}
return nil
}
func (p *PeerPool) updateTopicLimits(server *p2p.Server, req *updateTopicRequest) error {
t, err := p.getTopic(req.Topic)
if err != nil {
return err
}
t.SetLimits(req.Limits)
return nil
}
func (p *PeerPool) getTopic(topic string) (TopicPoolInterface, error) {
for _, t := range p.topics {
if t.Topic() == discv5.Topic(topic) {
return t, nil
}
}
return nil, errors.New("topic not found")
}

View File

@ -138,7 +138,8 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCache() {
// check if cache is passed to topic pools // check if cache is passed to topic pools
for _, topicPool := range peerPool.topics { for _, topicPool := range peerPool.topics {
s.Equal(cache, topicPool.cache) tp := topicPool.(*TopicPool)
s.Equal(cache, tp.cache)
} }
} }
@ -366,3 +367,39 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) {
<-time.After(pool.opts.DiscServerTimeout * 2) <-time.After(pool.opts.DiscServerTimeout * 2)
require.True(t, discovery.Running()) require.True(t, discovery.Running())
} }
func (s *PeerPoolSimulationSuite) TestUpdateTopicLimits() {
var err error
topic := discv5.Topic("cap=test")
config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1),
}
peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond}
cache, err := newInMemoryCache()
s.Require().NoError(err)
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
// start peer pool
s.Require().NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop()
for _, topicPool := range peerPool.topics {
tp := topicPool.(*TopicPool)
s.Equal(1, tp.limits.Max)
s.Equal(1, tp.limits.Min)
}
// Updating TopicPool's limits
err = peerPool.UpdateTopic("cap=test", params.NewLimits(5, 10))
s.Require().NoError(err)
time.Sleep(1 * time.Millisecond)
for _, topicPool := range peerPool.topics {
tp := topicPool.(*TopicPool)
tp.mu.RLock()
defer tp.mu.RUnlock()
s.Equal(10, tp.limits.Max)
s.Equal(5, tp.limits.Min)
}
}

View File

@ -23,8 +23,25 @@ const (
// to get the maximum number of cached peers allowed. // to get the maximum number of cached peers allowed.
var maxCachedPeersMultiplier = 2 var maxCachedPeersMultiplier = 2
// NewTopicPool returns instance of TopicPool // TopicPoolInterface the TopicPool interface.
func NewTopicPool(discovery Discovery, topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool { type TopicPoolInterface interface {
StopSearch(server *p2p.Server)
BelowMin() bool
SearchRunning() bool
StartSearch(server *p2p.Server) error
ConfirmDropped(server *p2p.Server, nodeID discover.NodeID) bool
AddPeerFromTable(server *p2p.Server) *discv5.Node
MaxReached() bool
ConfirmAdded(server *p2p.Server, nodeID discover.NodeID)
isStopped() bool
Topic() discv5.Topic
SetLimits(limits params.Limits)
setStopSearchTimeout(delay time.Duration)
readyToStopSearch() bool
}
// newTopicPool returns instance of TopicPool.
func newTopicPool(discovery Discovery, topic discv5.Topic, limits params.Limits, slowMode, fastMode time.Duration, cache *Cache) *TopicPool {
pool := TopicPool{ pool := TopicPool{
discovery: discovery, discovery: discovery,
topic: topic, topic: topic,
@ -469,7 +486,7 @@ func (t *TopicPool) isStopped() bool {
} }
// StopSearch stops the closes stop // StopSearch stops the closes stop
func (t *TopicPool) StopSearch() { func (t *TopicPool) StopSearch(server *p2p.Server) {
if !atomic.CompareAndSwapInt32(&t.running, 1, 0) { if !atomic.CompareAndSwapInt32(&t.running, 1, 0) {
return return
} }
@ -495,3 +512,16 @@ func (t *TopicPool) StopSearch() {
close(t.period) close(t.period)
t.discWG.Wait() t.discWG.Wait()
} }
// Topic exposes the internal discovery topic.
func (t *TopicPool) Topic() discv5.Topic {
return t.topic
}
// SetLimits set the limits for the current TopicPool.
func (t *TopicPool) SetLimits(limits params.Limits) {
t.mu.Lock()
defer t.mu.Unlock()
t.limits = limits
}

View File

@ -42,7 +42,7 @@ func (s *TopicPoolSuite) SetupTest() {
limits := params.NewLimits(1, 2) limits := params.NewLimits(1, 2)
cache, err := newInMemoryCache() cache, err := newInMemoryCache()
s.Require().NoError(err) s.Require().NoError(err)
s.topicPool = NewTopicPool(&DiscV5{}, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache) s.topicPool = newTopicPool(&DiscV5{}, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.topicPool.running = 1 s.topicPool.running = 1
// This is a buffered channel to simplify testing. // This is a buffered channel to simplify testing.
// If your test generates more than 10 mode changes, // If your test generates more than 10 mode changes,
@ -314,3 +314,17 @@ func (s *TopicPoolSuite) TestMaxCachedPeers() {
cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5) cached = s.topicPool.cache.GetPeersRange(s.topicPool.topic, 5)
s.Equal(3, len(cached)) s.Equal(3, len(cached))
} }
func (s *TopicPoolSuite) TestNewTopicPoolInterface() {
limits := params.NewLimits(1, 2)
cache, err := newInMemoryCache()
s.Require().NoError(err)
topic := discv5.Topic("cap=cap1")
t := newTopicPool(&DiscV5{}, topic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.IsType(&TopicPool{}, t)
tp := newTopicPool(&DiscV5{}, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
cacheTP := newCacheOnlyTopicPool(tp)
s.IsType(&cacheOnlyTopicPool{}, cacheTP)
}

48
services/peer/api.go Normal file
View File

@ -0,0 +1,48 @@
package peer
import (
"context"
"errors"
)
var (
// ErrInvalidTopic error returned when the requested topic is not valid.
ErrInvalidTopic = errors.New("topic not valid")
// ErrInvalidRange error returned when max-min range is not valid.
ErrInvalidRange = errors.New("invalid range, Min should be lower or equal to Max")
// ErrDiscovererNotProvided error when discoverer is not being provided.
ErrDiscovererNotProvided = errors.New("discoverer not provided")
)
// PublicAPI represents a set of APIs from the `web3.peer` namespace.
type PublicAPI struct {
s *Service
}
// NewAPI creates an instance of the peer API.
func NewAPI(s *Service) *PublicAPI {
return &PublicAPI{s: s}
}
// DiscoverRequest json request for peer_discover.
type DiscoverRequest struct {
Topic string `json:"topic"`
Max int `json:"max"`
Min int `json:"min"`
}
// Discover is an implementation of `peer_discover` or `web3.peer.discover` API.
func (api *PublicAPI) Discover(context context.Context, req DiscoverRequest) (err error) {
if api.s.d == nil {
return ErrDiscovererNotProvided
}
if len(req.Topic) == 0 {
return ErrInvalidTopic
}
if req.Max < req.Min {
return ErrInvalidRange
}
return api.s.d.Discover(req.Topic, req.Max, req.Min)
}

126
services/peer/api_test.go Normal file
View File

@ -0,0 +1,126 @@
package peer
import (
"context"
"errors"
"testing"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/suite"
)
func TestPeerSuite(t *testing.T) {
suite.Run(t, new(PeerSuite))
}
type PeerSuite struct {
suite.Suite
api *PublicAPI
s *Service
d *MockDiscoverer
}
func (s *PeerSuite) SetupTest() {
ctrl := gomock.NewController(s.T())
s.d = NewMockDiscoverer(ctrl)
s.s = New()
s.api = NewAPI(s.s)
}
var discovertests = []struct {
name string
expectedError error
prepareExpectations func(*PeerSuite)
request DiscoverRequest
}{
{
name: "success discover",
expectedError: nil,
prepareExpectations: func(s *PeerSuite) {
s.d.EXPECT().Discover("topic", 10, 1).Return(nil)
},
request: DiscoverRequest{
Topic: "topic",
Max: 10,
Min: 1,
},
},
{
name: "range 0",
expectedError: nil,
prepareExpectations: func(s *PeerSuite) {
s.d.EXPECT().Discover("topic", 10, 10).Return(nil)
},
request: DiscoverRequest{
Topic: "topic",
Max: 10,
Min: 10,
},
},
{
name: "invalid topic",
expectedError: ErrInvalidTopic,
prepareExpectations: func(s *PeerSuite) {},
request: DiscoverRequest{
Topic: "",
Max: 10,
Min: 1,
},
},
{
name: "invalid range",
expectedError: ErrInvalidRange,
prepareExpectations: func(s *PeerSuite) {},
request: DiscoverRequest{
Topic: "topic",
Max: 1,
Min: 10,
},
},
{
name: "success discover",
expectedError: nil,
prepareExpectations: func(s *PeerSuite) {
s.d.EXPECT().Discover("topic", 10, 1).Return(nil)
},
request: DiscoverRequest{
Topic: "topic",
Max: 10,
Min: 1,
},
},
{
name: "errored discover",
expectedError: errors.New("could not create the specified account : foo"),
prepareExpectations: func(s *PeerSuite) {
s.d.EXPECT().Discover("topic", 10, 1).Return(errors.New("could not create the specified account : foo"))
},
request: DiscoverRequest{
Topic: "topic",
Max: 10,
Min: 1,
},
},
}
func (s *PeerSuite) TestDiscover() {
for _, tc := range discovertests {
s.T().Run(tc.name, func(t *testing.T) {
s.s.SetDiscoverer(s.d)
tc.prepareExpectations(s)
var ctx context.Context
err := s.api.Discover(ctx, tc.request)
s.Equal(tc.expectedError, err, "failed scenario : "+tc.name)
})
}
}
func (s *PeerSuite) TestDiscoverWihEmptyDiscoverer() {
var ctx context.Context
s.Equal(ErrDiscovererNotProvided, s.api.Discover(ctx, DiscoverRequest{
Topic: "topic",
Max: 10,
Min: 1,
}))
}

View File

@ -0,0 +1,45 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: services/peer/service.go
// Package peer is a generated GoMock package.
package peer
import (
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockDiscoverer is a mock of Discoverer interface
type MockDiscoverer struct {
ctrl *gomock.Controller
recorder *MockDiscovererMockRecorder
}
// MockDiscovererMockRecorder is the mock recorder for MockDiscoverer
type MockDiscovererMockRecorder struct {
mock *MockDiscoverer
}
// NewMockDiscoverer creates a new mock instance
func NewMockDiscoverer(ctrl *gomock.Controller) *MockDiscoverer {
mock := &MockDiscoverer{ctrl: ctrl}
mock.recorder = &MockDiscovererMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockDiscoverer) EXPECT() *MockDiscovererMockRecorder {
return m.recorder
}
// Discover mocks base method
func (m *MockDiscoverer) Discover(topic string, max, min int) error {
ret := m.ctrl.Call(m, "Discover", topic, max, min)
ret0, _ := ret[0].(error)
return ret0
}
// Discover indicates an expected call of Discover
func (mr *MockDiscovererMockRecorder) Discover(topic, max, min interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Discover", reflect.TypeOf((*MockDiscoverer)(nil).Discover), topic, max, min)
}

59
services/peer/service.go Normal file
View File

@ -0,0 +1,59 @@
package peer
import (
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rpc"
)
// Make sure that Service implements node.Service interface.
var _ node.Service = (*Service)(nil)
// Discoverer manages peer discovery.
type Discoverer interface {
Discover(topic string, max, min int) error
}
// Service it manages all endpoints for peer operations.
type Service struct {
d Discoverer
}
// New returns a new Service.
func New() *Service {
return &Service{}
}
// Protocols returns a new protocols list. In this case, there are none.
func (s *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
}
// APIs returns a list of new APIs.
func (s *Service) APIs() []rpc.API {
return []rpc.API{
{
Namespace: "peer",
Version: "1.0",
Service: NewAPI(s),
Public: false,
},
}
}
// SetDiscoverer sets discoverer for the API calls.
func (s *Service) SetDiscoverer(d Discoverer) {
s.d = d
}
// Start is run when a service is started.
// It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Start(server *p2p.Server) error {
return nil
}
// Stop is run when a service is stopped.
// It does nothing in this case but is required by `node.Service` interface.
func (s *Service) Stop() error {
return nil
}

View File

@ -17,6 +17,9 @@ const (
// EventMailServerRequestExpired is triggered when request TTL ends // EventMailServerRequestExpired is triggered when request TTL ends
EventMailServerRequestExpired = "mailserver.request.expired" EventMailServerRequestExpired = "mailserver.request.expired"
// EventEnodeDiscovered is tiggered when enode has been discovered.
EventEnodeDiscovered = "enode.discovered"
) )
// EnvelopeSignal includes hash of the envelope. // EnvelopeSignal includes hash of the envelope.
@ -55,3 +58,18 @@ func SendMailServerRequestCompleted(requestID common.Hash, lastEnvelopeHash comm
func SendMailServerRequestExpired(hash common.Hash) { func SendMailServerRequestExpired(hash common.Hash) {
send(EventMailServerRequestExpired, EnvelopeSignal{hash}) send(EventMailServerRequestExpired, EnvelopeSignal{hash})
} }
// EnodeDiscoveredSignal includes enode address and topic
type EnodeDiscoveredSignal struct {
Enode string `json:"enode"`
Topic string `json:"topic"`
}
// SendEnodeDiscovered tiggered when an enode is discovered.
// finds a new enode.
func SendEnodeDiscovered(enode, topic string) {
send(EventEnodeDiscovered, EnodeDiscoveredSignal{
Enode: enode,
Topic: topic,
})
}

View File

@ -0,0 +1,43 @@
package services
import (
"testing"
"github.com/status-im/status-go/params"
"github.com/stretchr/testify/suite"
. "github.com/status-im/status-go/t/utils"
)
func TestPeerAPISuite(t *testing.T) {
s := new(PeerAPISuite)
s.upstream = false
suite.Run(t, s)
}
func TestPeerAPISuiteUpstream(t *testing.T) {
s := new(PeerAPISuite)
s.upstream = true
suite.Run(t, s)
}
type PeerAPISuite struct {
BaseJSONRPCSuite
upstream bool
}
func (s *PeerAPISuite) TestAccessiblePeerAPIs() {
if s.upstream && GetNetworkID() == params.StatusChainNetworkID {
s.T().Skip()
return
}
err := s.SetupTest(s.upstream, true, false)
s.NoError(err)
defer func() {
err := s.Backend.StopNode()
s.NoError(err)
}()
// These peer APIs should be available
s.AssertAPIMethodExported("peer_discover")
}