Add interface to verify trusted MailServers (#1112)

The goal of this PR is to add an interface to verify MailServers. In this PR, MailServers are hardcoded in status-go. The next iteration will use a smart contract.
This commit is contained in:
Adam Babik 2018-07-25 16:48:02 +02:00 committed by GitHub
parent 408ba5a7e6
commit 3b8c6c8260
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 249 additions and 22 deletions

View File

@ -327,6 +327,14 @@ func parseNodesV5(enodes []string) []*discv5.Node {
return nodes return nodes
} }
func parseNodesToNodeID(enodes []string) []discover.NodeID {
nodeIDs := make([]discover.NodeID, 0, len(enodes))
for _, node := range parseNodes(enodes) {
nodeIDs = append(nodeIDs, node.ID)
}
return nodeIDs
}
// whisperTimeSource get timeSource to be used by whisper // whisperTimeSource get timeSource to be used by whisper
func whisperTimeSource(ctx *node.ServiceContext) (func() time.Time, error) { func whisperTimeSource(ctx *node.ServiceContext) (func() time.Time, error) {
var timeSource *timesource.NTPTimeSource var timeSource *timesource.NTPTimeSource

View File

@ -1,8 +1,10 @@
package node package node
import ( import (
"fmt"
"testing" "testing"
"github.com/ethereum/go-ethereum/p2p/discover"
. "github.com/status-im/status-go/t/utils" . "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
@ -57,3 +59,12 @@ func TestMakeNodeMalformedBootnodes(t *testing.T) {
_, err = MakeNode(config, db) _, err = MakeNode(config, db)
require.NoError(t, err) require.NoError(t, err)
} }
func TestParseNodesToNodeID(t *testing.T) {
nodeIDs := parseNodesToNodeID([]string{
"enode://badkey@127.0.0.1:30303",
fmt.Sprintf("enode://%s@127.0.0.1:30303", discover.NodeID{1}),
})
require.Len(t, nodeIDs, 1)
require.Equal(t, discover.NodeID{1}, nodeIDs[0])
}

View File

@ -223,6 +223,7 @@ func (n *StatusNode) startDiscovery() error {
options := peers.NewDefaultOptions() options := peers.NewDefaultOptions()
// TODO(dshulyak) consider adding a flag to define this behaviour // TODO(dshulyak) consider adding a flag to define this behaviour
options.AllowStop = len(n.config.RegisterTopics) == 0 options.AllowStop = len(n.config.RegisterTopics) == 0
options.TrustedMailServers = parseNodesToNodeID(n.config.ClusterConfig.TrustedMailServers)
n.peerPool = peers.NewPeerPool( n.peerPool = peers.NewPeerPool(
n.discovery, n.discovery,
n.config.RequireTopics, n.config.RequireTopics,

View File

@ -12,6 +12,7 @@ type cluster struct {
NetworkID int `json:"networkID"` NetworkID int `json:"networkID"`
StaticNodes []string `json:"staticnodes"` StaticNodes []string `json:"staticnodes"`
BootNodes []string `json:"bootnodes"` BootNodes []string `json:"bootnodes"`
MailServers []string `json:"mailservers"` // list of trusted mail servers
} }
var ropstenCluster = cluster{ var ropstenCluster = cluster{
@ -26,6 +27,14 @@ var ropstenCluster = cluster{
"enode://a6a2a9b3a7cbb0a15da74301537ebba549c990e3325ae78e1272a19a3ace150d03c184b8ac86cc33f1f2f63691e467d49308f02d613277754c4dccd6773b95e8@206.189.243.176:30304", // node-01.do-ams3.eth.beta "enode://a6a2a9b3a7cbb0a15da74301537ebba549c990e3325ae78e1272a19a3ace150d03c184b8ac86cc33f1f2f63691e467d49308f02d613277754c4dccd6773b95e8@206.189.243.176:30304", // node-01.do-ams3.eth.beta
"enode://207e53d9bf66be7441e3daba36f53bfbda0b6099dba9a865afc6260a2d253fb8a56a72a48598a4f7ba271792c2e4a8e1a43aaef7f34857f520c8c820f63b44c8@35.224.15.65:30304", // node-01.gc-us-central1-a.eth.beta "enode://207e53d9bf66be7441e3daba36f53bfbda0b6099dba9a865afc6260a2d253fb8a56a72a48598a4f7ba271792c2e4a8e1a43aaef7f34857f520c8c820f63b44c8@35.224.15.65:30304", // node-01.gc-us-central1-a.eth.beta
}, },
MailServers: []string{
"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.108.78:30504", // mail-01.do-ams3.eth.beta
"enode://7aa648d6e855950b2e3d3bf220c496e0cae4adfddef3e1e6062e6b177aec93bc6cdcf1282cb40d1656932ebfdd565729da440368d7c4da7dbd4d004b1ac02bf8@206.189.108.63:30504", // mail-02.do-ams3.eth.beta
"enode://8a64b3c349a2e0ef4a32ea49609ed6eb3364be1110253c20adc17a3cebbc39a219e5d3e13b151c0eee5d8e0f9a8ba2cd026014e67b41a4ab7d1d5dd67ca27427@206.189.7.30:30504", // mail-03.do-ams3.eth.beta
"enode://7de99e4cb1b3523bd26ca212369540646607c721ad4f3e5c821ed9148150ce6ce2e72631723002210fac1fd52dfa8bbdf3555e05379af79515e1179da37cc3db@35.188.19.210:30504", // mail-01.gc-us-central1-a.eth.beta
"enode://015e22f6cd2b44c8a51bd7a23555e271e0759c7d7f52432719665a74966f2da456d28e154e836bee6092b4d686fe67e331655586c57b718be3997c1629d24167@35.226.21.19:30504", // mail-02.gc-us-central1-a.eth.beta
"enode://531e252ec966b7e83f5538c19bf1cde7381cc7949026a6e499b6e998e695751aadf26d4c98d5a4eabfb7cefd31c3c88d600a775f14ed5781520a88ecd25da3c6@35.225.227.79:30504", // mail-03.gc-us-central1-a.eth.beta
},
} }
var rinkebyCluster = cluster{ var rinkebyCluster = cluster{
@ -40,6 +49,10 @@ var rinkebyCluster = cluster{
"enode://ba41aa829287a0a9076d9bffed97c8ce2e491b99873288c9e886f16fd575306ac6c656db4fbf814f5a9021aec004ffa9c0ae8650f92fd10c12eeb7c364593eb3@51.15.69.147:30303", "enode://ba41aa829287a0a9076d9bffed97c8ce2e491b99873288c9e886f16fd575306ac6c656db4fbf814f5a9021aec004ffa9c0ae8650f92fd10c12eeb7c364593eb3@51.15.69.147:30303",
"enode://28ecf5272b560ca951f4cd7f1eb8bd62da5853b026b46db432c4b01797f5b0114819a090a72acd7f32685365ecd8e00450074fa0673039aefe10f3fb666e0f3f@51.15.76.249:30303", "enode://28ecf5272b560ca951f4cd7f1eb8bd62da5853b026b46db432c4b01797f5b0114819a090a72acd7f32685365ecd8e00450074fa0673039aefe10f3fb666e0f3f@51.15.76.249:30303",
}, },
MailServers: []string{
"enode://43829580446ad138386dadb7fa50b6bd4d99f7c28659a0bc08115f8c0380005922a340962496f6af756a42b94a1522baa38a694fa27de59c3a73d4e08d5dbb31@206.189.6.48:30504",
"enode://70a2004e78399075f566033c42e9a0b1d43c683d4742755bb5457d03191be66a1b48c2b4fb259696839f28646a5828a1958b900860e27897f984ad0fc8482404@206.189.56.154:30504",
},
} }
var mainnetCluster = cluster{ var mainnetCluster = cluster{
@ -54,6 +67,14 @@ var mainnetCluster = cluster{
"enode://a6a2a9b3a7cbb0a15da74301537ebba549c990e3325ae78e1272a19a3ace150d03c184b8ac86cc33f1f2f63691e467d49308f02d613277754c4dccd6773b95e8@206.189.243.176:30304", // node-01.do-ams3.eth.beta "enode://a6a2a9b3a7cbb0a15da74301537ebba549c990e3325ae78e1272a19a3ace150d03c184b8ac86cc33f1f2f63691e467d49308f02d613277754c4dccd6773b95e8@206.189.243.176:30304", // node-01.do-ams3.eth.beta
"enode://207e53d9bf66be7441e3daba36f53bfbda0b6099dba9a865afc6260a2d253fb8a56a72a48598a4f7ba271792c2e4a8e1a43aaef7f34857f520c8c820f63b44c8@35.224.15.65:30304", // node-01.gc-us-central1-a.eth.beta "enode://207e53d9bf66be7441e3daba36f53bfbda0b6099dba9a865afc6260a2d253fb8a56a72a48598a4f7ba271792c2e4a8e1a43aaef7f34857f520c8c820f63b44c8@35.224.15.65:30304", // node-01.gc-us-central1-a.eth.beta
}, },
MailServers: []string{
"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.108.78:30504", // mail-01.do-ams3.eth.beta
"enode://7aa648d6e855950b2e3d3bf220c496e0cae4adfddef3e1e6062e6b177aec93bc6cdcf1282cb40d1656932ebfdd565729da440368d7c4da7dbd4d004b1ac02bf8@206.189.108.63:30504", // mail-02.do-ams3.eth.beta
"enode://8a64b3c349a2e0ef4a32ea49609ed6eb3364be1110253c20adc17a3cebbc39a219e5d3e13b151c0eee5d8e0f9a8ba2cd026014e67b41a4ab7d1d5dd67ca27427@206.189.7.30:30504", // mail-03.do-ams3.eth.beta
"enode://7de99e4cb1b3523bd26ca212369540646607c721ad4f3e5c821ed9148150ce6ce2e72631723002210fac1fd52dfa8bbdf3555e05379af79515e1179da37cc3db@35.188.19.210:30504", // mail-01.gc-us-central1-a.eth.beta
"enode://015e22f6cd2b44c8a51bd7a23555e271e0759c7d7f52432719665a74966f2da456d28e154e836bee6092b4d686fe67e331655586c57b718be3997c1629d24167@35.226.21.19:30504", // mail-02.gc-us-central1-a.eth.beta
"enode://531e252ec966b7e83f5538c19bf1cde7381cc7949026a6e499b6e998e695751aadf26d4c98d5a4eabfb7cefd31c3c88d600a775f14ed5781520a88ecd25da3c6@35.225.227.79:30504", // mail-03.gc-us-central1-a.eth.beta
},
} }
var betaCluster = []cluster{ropstenCluster, rinkebyCluster, mainnetCluster} var betaCluster = []cluster{ropstenCluster, rinkebyCluster, mainnetCluster}

View File

@ -200,6 +200,9 @@ type ClusterConfig struct {
// for a given mode (production vs development) // for a given mode (production vs development)
BootNodes []string BootNodes []string
// TrustedMailServers is a list of verified Mail Servers.
TrustedMailServers []string
// RendezvousNodes is a rendezvous discovery server. // RendezvousNodes is a rendezvous discovery server.
RendezvousNodes []string RendezvousNodes []string
} }
@ -644,6 +647,7 @@ func (c *NodeConfig) updateClusterConfig() error {
if len(cluster.BootNodes) == 0 { if len(cluster.BootNodes) == 0 {
c.NoDiscovery = true c.NoDiscovery = true
} }
c.ClusterConfig.TrustedMailServers = cluster.MailServers
break break
} }
} }

View File

@ -1,6 +1,8 @@
package peers package peers
import ( import (
"context"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover" "github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/discv5" "github.com/ethereum/go-ethereum/p2p/discv5"
@ -8,22 +10,29 @@ import (
"github.com/status-im/status-go/signal" "github.com/status-im/status-go/signal"
) )
// Verifier verifies if a give node is trusted.
type Verifier interface {
VerifyNode(context.Context, discover.NodeID) bool
}
// MailServerDiscoveryTopic topic name for mailserver discovery. // MailServerDiscoveryTopic topic name for mailserver discovery.
const MailServerDiscoveryTopic = "whispermail" const MailServerDiscoveryTopic = "whispermail"
// MailServerDiscoveryLimits default mailserver discovery limits. // MailServerDiscoveryLimits default mailserver discovery limits.
var MailServerDiscoveryLimits = params.Limits{Min: 3, Max: 3} var MailServerDiscoveryLimits = params.Limits{Min: 3, Max: 3}
// newCacheOnlyTopicPool returns instance of CacheOnlyTopicPool.
func newCacheOnlyTopicPool(t *TopicPool) *cacheOnlyTopicPool {
return &cacheOnlyTopicPool{
TopicPool: t,
}
}
// cacheOnlyTopicPool handles a mail server topic pool. // cacheOnlyTopicPool handles a mail server topic pool.
type cacheOnlyTopicPool struct { type cacheOnlyTopicPool struct {
*TopicPool *TopicPool
verifier Verifier
}
// newCacheOnlyTopicPool returns instance of CacheOnlyTopicPool.
func newCacheOnlyTopicPool(t *TopicPool, verifier Verifier) *cacheOnlyTopicPool {
return &cacheOnlyTopicPool{
TopicPool: t,
verifier: verifier,
}
} }
// MaxReached checks if the max allowed peers is reached or not. When true // MaxReached checks if the max allowed peers is reached or not. When true
@ -45,14 +54,34 @@ var sendEnodeDiscovered = signal.SendEnodeDiscovered
// ConfirmAdded calls base TopicPool ConfirmAdded method and sends a signal // ConfirmAdded calls base TopicPool ConfirmAdded method and sends a signal
// confirming the enode has been discovered. // confirming the enode has been discovered.
func (t *cacheOnlyTopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) { func (t *cacheOnlyTopicPool) ConfirmAdded(server *p2p.Server, nodeID discover.NodeID) {
trusted := t.verifier.VerifyNode(context.TODO(), nodeID)
if trusted {
// add to cache only if trusted
t.TopicPool.ConfirmAdded(server, nodeID) t.TopicPool.ConfirmAdded(server, nodeID)
sendEnodeDiscovered(nodeID.String(), string(t.topic)) sendEnodeDiscovered(nodeID.String(), string(t.topic))
t.subtractToLimits()
}
id := discv5.NodeID(nodeID) id := discv5.NodeID(nodeID)
// If a peer was trusted, it was moved to connectedPeers,
// signal was sent and we can safely remove it.
if peer, ok := t.connectedPeers[id]; ok { if peer, ok := t.connectedPeers[id]; ok {
t.removeServerPeer(server, peer) t.removeServerPeer(server, peer)
// Delete it from `connectedPeers` immediately to
// prevent removing it from the cache which logic is
// implemented in TopicPool.
delete(t.connectedPeers, id) delete(t.connectedPeers, id)
t.subtractToLimits() }
// It a peer was not trusted, it is still in pendingPeers.
// We should remove it from the p2p.Server.
if peer, ok := t.pendingPeers[id]; ok {
t.removeServerPeer(server, peer.peerInfo)
// Delete it from `connectedPeers` immediately to
// prevent removing it from the cache which logic is
// implemented in TopicPool.
delete(t.pendingPeers, id)
} }
} }

View File

@ -1,6 +1,7 @@
package peers package peers
import ( import (
"context"
"testing" "testing"
"time" "time"
@ -42,7 +43,7 @@ func (s *CacheOnlyTopicPoolSuite) SetupTest() {
cache, err := newInMemoryCache() cache, err := newInMemoryCache()
s.Require().NoError(err) s.Require().NoError(err)
t := newTopicPool(nil, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache) t := newTopicPool(nil, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
s.topicPool = newCacheOnlyTopicPool(t) s.topicPool = newCacheOnlyTopicPool(t, &testTrueVerifier{})
s.topicPool.running = 1 s.topicPool.running = 1
// This is a buffered channel to simplify testing. // This is a buffered channel to simplify testing.
// If your test generates more than 10 mode changes, // If your test generates more than 10 mode changes,
@ -70,15 +71,15 @@ func (s *CacheOnlyTopicPoolSuite) TestReplacementPeerIsCounted() {
// When we stop searching for peers (when Max limit is reached) // When we stop searching for peers (when Max limit is reached)
s.topicPool.StopSearch(s.peer) s.topicPool.StopSearch(s.peer)
s.True(s.topicPool.MaxReached()) s.True(s.topicPool.MaxReached())
s.Equal(s.topicPool.limits.Max, 0) s.Equal(0, s.topicPool.limits.Max)
s.Equal(s.topicPool.limits.Min, 0) s.Equal(0, s.topicPool.limits.Min)
// Then we should drop all connected peers // Then we should drop all connected peers
s.Equal(len(s.topicPool.connectedPeers), 0) s.Equal(len(s.topicPool.connectedPeers), 0)
// And cached peers should remain // And cached peers should remain
cachedPeers := s.topicPool.cache.GetPeersRange(s.topicPool.topic, s.topicPool.maxCachedPeers) cachedPeers := s.topicPool.cache.GetPeersRange(s.topicPool.topic, s.topicPool.maxCachedPeers)
s.Equal(len(cachedPeers), 1) s.Equal(1, len(cachedPeers))
} }
func (s *CacheOnlyTopicPoolSuite) TestConfirmAddedSignals() { func (s *CacheOnlyTopicPoolSuite) TestConfirmAddedSignals() {
@ -94,3 +95,35 @@ func (s *CacheOnlyTopicPoolSuite) TestConfirmAddedSignals() {
s.Equal((discv5.NodeID{1}).String(), sentNodeID) s.Equal((discv5.NodeID{1}).String(), sentNodeID)
s.Equal(MailServerDiscoveryTopic, sentTopic) s.Equal(MailServerDiscoveryTopic, sentTopic)
} }
func (s *CacheOnlyTopicPoolSuite) TestNotTrustedPeer() {
var signalCalled bool
sendEnodeDiscovered = func(_, _ string) { signalCalled = true }
s.topicPool.limits = params.NewLimits(1, 1)
s.topicPool.maxCachedPeers = 1
s.topicPool.verifier = &testFalseVerifier{}
foundPeer := discv5.NewNode(discv5.NodeID{1}, s.peer.Self().IP, 32311, 32311)
s.topicPool.processFoundNode(s.peer, foundPeer)
s.topicPool.ConfirmAdded(s.peer, discover.NodeID(foundPeer.ID))
s.False(signalCalled)
// limits should not change
s.Equal(1, s.topicPool.limits.Max)
s.Equal(1, s.topicPool.limits.Min)
// not verified peer shoud not be added to the cache
s.Equal(0, len(s.topicPool.cache.GetPeersRange(s.topicPool.topic, s.topicPool.limits.Max)))
}
type testTrueVerifier struct{}
func (v *testTrueVerifier) VerifyNode(context.Context, discover.NodeID) bool {
return true
}
type testFalseVerifier struct{}
func (v *testFalseVerifier) VerifyNode(context.Context, discover.NodeID) bool {
return false
}

View File

@ -14,6 +14,7 @@ import (
"github.com/status-im/status-go/discovery" "github.com/status-im/status-go/discovery"
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/peers/verifier"
"github.com/status-im/status-go/signal" "github.com/status-im/status-go/signal"
) )
@ -53,6 +54,8 @@ type Options struct {
// TopicStopSearchDelay time stopSearch will be waiting for max cached peers to be // TopicStopSearchDelay time stopSearch will be waiting for max cached peers to be
// filled before really stopping the search. // filled before really stopping the search.
TopicStopSearchDelay time.Duration TopicStopSearchDelay time.Duration
// TrustedMailServers is a list of trusted nodes.
TrustedMailServers []discover.NodeID
} }
// NewDefaultOptions returns a struct with default Options. // NewDefaultOptions returns a struct with default Options.
@ -140,7 +143,7 @@ func (p *PeerPool) Start(server *p2p.Server) error {
var topicPool TopicPoolInterface var topicPool TopicPoolInterface
t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache) t := newTopicPool(p.discovery, topic, limits, p.opts.SlowSync, p.opts.FastSync, p.cache)
if topic == MailServerDiscoveryTopic { if topic == MailServerDiscoveryTopic {
topicPool = newCacheOnlyTopicPool(t) topicPool = newCacheOnlyTopicPool(t, verifier.NewLocalVerifier(p.opts.TrustedMailServers))
} else { } else {
topicPool = t topicPool = t
} }

View File

@ -162,7 +162,7 @@ func (s *PeerPoolSimulationSuite) TestPeerPoolCacheEthV5() {
config := map[discv5.Topic]params.Limits{ config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1), topic: params.NewLimits(1, 1),
} }
peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond} peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond, nil}
cache, err := newInMemoryCache() cache, err := newInMemoryCache()
s.Require().NoError(err) s.Require().NoError(err)
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
@ -220,7 +220,7 @@ func (s *PeerPoolSimulationSuite) singleTopicDiscoveryWithFailover() {
config := map[discv5.Topic]params.Limits{ config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation topic: params.NewLimits(1, 1), // limits are chosen for simplicity of the simulation
} }
peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 0} peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 0, nil}
cache, err := newInMemoryCache() cache, err := newInMemoryCache()
s.Require().NoError(err) s.Require().NoError(err)
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
@ -302,7 +302,7 @@ func TestPeerPoolMaxPeersOverflow(t *testing.T) {
defer func() { assert.NoError(t, discovery.Stop()) }() defer func() { assert.NoError(t, discovery.Stop()) }()
require.True(t, discovery.Running()) require.True(t, discovery.Running())
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond} poolOpts := &Options{DefaultFastSync, DefaultSlowSync, 0, true, 100 * time.Millisecond, nil}
pool := NewPeerPool(discovery, nil, nil, poolOpts) pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(peer)) require.NoError(t, pool.Start(peer))
require.Equal(t, signal.EventDiscoveryStarted, <-signals) require.Equal(t, signal.EventDiscoveryStarted, <-signals)
@ -355,7 +355,7 @@ func TestPeerPoolDiscV5Timeout(t *testing.T) {
require.True(t, discovery.Running()) require.True(t, discovery.Running())
// start PeerPool // start PeerPool
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond} poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, true, 100 * time.Millisecond, nil}
pool := NewPeerPool(discovery, nil, nil, poolOpts) pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(server)) require.NoError(t, pool.Start(server))
require.Equal(t, signal.EventDiscoveryStarted, <-signals) require.Equal(t, signal.EventDiscoveryStarted, <-signals)
@ -402,7 +402,7 @@ func TestPeerPoolNotAllowedStopping(t *testing.T) {
require.True(t, discovery.Running()) require.True(t, discovery.Running())
// start PeerPool // start PeerPool
poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond} poolOpts := &Options{DefaultFastSync, DefaultSlowSync, time.Millisecond * 100, false, 100 * time.Millisecond, nil}
pool := NewPeerPool(discovery, nil, nil, poolOpts) pool := NewPeerPool(discovery, nil, nil, poolOpts)
require.NoError(t, pool.Start(server)) require.NoError(t, pool.Start(server))
@ -419,7 +419,7 @@ func (s *PeerPoolSimulationSuite) TestUpdateTopicLimits() {
config := map[discv5.Topic]params.Limits{ config := map[discv5.Topic]params.Limits{
topic: params.NewLimits(1, 1), topic: params.NewLimits(1, 1),
} }
peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond} peerPoolOpts := &Options{100 * time.Millisecond, 100 * time.Millisecond, 0, true, 100 * time.Millisecond, nil}
cache, err := newInMemoryCache() cache, err := newInMemoryCache()
s.Require().NoError(err) s.Require().NoError(err)
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts) peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
@ -447,3 +447,73 @@ func (s *PeerPoolSimulationSuite) TestUpdateTopicLimits() {
s.Equal(5, tp.limits.Min) s.Equal(5, tp.limits.Min)
} }
} }
func (s *PeerPoolSimulationSuite) TestMailServerPeersDiscovery() {
s.setupEthV5()
// Buffered channels must be used because we expect the events
// to be in the same order. Use a buffer length greater than
// the expected number of events to avoid deadlock.
poolEvents := make(chan string, 10)
summaries := make(chan []*p2p.PeerInfo, 10)
signal.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope struct {
Type string
Event json.RawMessage
}
s.NoError(json.Unmarshal([]byte(jsonEvent), &envelope))
switch typ := envelope.Type; typ {
case signal.EventDiscoverySummary:
poolEvents <- envelope.Type
var summary []*p2p.PeerInfo
s.NoError(json.Unmarshal(envelope.Event, &summary))
summaries <- summary
}
})
defer signal.ResetDefaultNodeNotificationHandler()
// subscribe for peer events before starting the peer pool
events := make(chan *p2p.PeerEvent, 20)
subscription := s.peers[1].SubscribeEvents(events)
defer subscription.Unsubscribe()
// create and start topic registry
register := NewRegister(s.discovery[0], MailServerDiscoveryTopic)
s.Require().NoError(register.Start())
// create and start peer pool
config := map[discv5.Topic]params.Limits{
MailServerDiscoveryTopic: params.NewLimits(1, 1),
}
cache, err := newInMemoryCache()
s.Require().NoError(err)
peerPoolOpts := &Options{
100 * time.Millisecond,
100 * time.Millisecond,
0,
true,
100 * time.Millisecond,
[]discover.NodeID{s.peers[0].Self().ID},
}
peerPool := NewPeerPool(s.discovery[1], config, cache, peerPoolOpts)
s.Require().NoError(peerPool.Start(s.peers[1]))
defer peerPool.Stop()
// wait for and verify the mail server peer
connectedPeer := s.getPeerFromEvent(events, p2p.PeerEventTypeAdd)
s.Equal(s.peers[0].Self().ID, connectedPeer)
// wait for a summary event to be sure that ConfirmAdded() was called
s.Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
s.Equal(s.peers[0].Self().ID.String(), (<-summaries)[0].ID)
// check cache
cachedPeers := peerPool.cache.GetPeersRange(MailServerDiscoveryTopic, 5)
s.Require().Len(cachedPeers, 1)
s.Equal(s.peers[0].Self().ID[:], cachedPeers[0].ID[:])
// wait for another summary event as the peer should be removed
s.Equal(signal.EventDiscoverySummary, s.getPoolEvent(poolEvents))
s.Len(<-summaries, 0)
}

View File

@ -325,6 +325,6 @@ func (s *TopicPoolSuite) TestNewTopicPoolInterface() {
s.IsType(&TopicPool{}, t) s.IsType(&TopicPool{}, t)
tp := newTopicPool(nil, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache) tp := newTopicPool(nil, MailServerDiscoveryTopic, limits, 100*time.Millisecond, 200*time.Millisecond, cache)
cacheTP := newCacheOnlyTopicPool(tp) cacheTP := newCacheOnlyTopicPool(tp, &testTrueVerifier{})
s.IsType(&cacheOnlyTopicPool{}, cacheTP) s.IsType(&cacheOnlyTopicPool{}, cacheTP)
} }

View File

@ -0,0 +1,30 @@
package verifier
import (
"context"
"github.com/ethereum/go-ethereum/p2p/discover"
)
// LocalVerifier verifies nodes based on a provided local list.
type LocalVerifier struct {
KnownPeers map[discover.NodeID]struct{}
}
// NewLocalVerifier returns a new LocalVerifier instance.
func NewLocalVerifier(peers []discover.NodeID) *LocalVerifier {
knownPeers := make(map[discover.NodeID]struct{})
for _, peer := range peers {
knownPeers[peer] = struct{}{}
}
return &LocalVerifier{KnownPeers: knownPeers}
}
// VerifyNode checks if a given node is trusted using a local list.
func (v *LocalVerifier) VerifyNode(_ context.Context, nodeID discover.NodeID) bool {
if _, ok := v.KnownPeers[nodeID]; ok {
return true
}
return false
}

View File

@ -0,0 +1,17 @@
package verifier
import (
"context"
"testing"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/stretchr/testify/require"
)
func TestLocalVerifierForNodeIDTypes(t *testing.T) {
nodeID := discover.NodeID{1}
v := NewLocalVerifier([]discover.NodeID{{1}})
require.True(t, v.VerifyNode(context.TODO(), nodeID))
require.False(t, v.VerifyNode(context.TODO(), discover.NodeID{2}))
}