401 lines
11 KiB
Go
401 lines
11 KiB
Go
package mailservers
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/t/utils"
|
|
)
|
|
|
|
type fakePeerEvents struct {
|
|
mu sync.Mutex
|
|
nodes map[types.EnodeID]struct{}
|
|
input chan *p2p.PeerEvent
|
|
}
|
|
|
|
func (f *fakePeerEvents) Nodes() []types.EnodeID {
|
|
f.mu.Lock()
|
|
rst := make([]types.EnodeID, 0, len(f.nodes))
|
|
for n := range f.nodes {
|
|
rst = append(rst, n)
|
|
}
|
|
f.mu.Unlock()
|
|
return rst
|
|
}
|
|
|
|
func (f *fakePeerEvents) AddPeer(node *enode.Node) {
|
|
f.mu.Lock()
|
|
f.nodes[types.EnodeID(node.ID())] = struct{}{}
|
|
f.mu.Unlock()
|
|
if f.input == nil {
|
|
return
|
|
}
|
|
f.input <- &p2p.PeerEvent{
|
|
Peer: node.ID(),
|
|
Type: p2p.PeerEventTypeAdd,
|
|
}
|
|
}
|
|
|
|
func (f *fakePeerEvents) RemovePeer(node *enode.Node) {
|
|
f.mu.Lock()
|
|
delete(f.nodes, types.EnodeID(node.ID()))
|
|
f.mu.Unlock()
|
|
if f.input == nil {
|
|
return
|
|
}
|
|
f.input <- &p2p.PeerEvent{
|
|
Peer: node.ID(),
|
|
Type: p2p.PeerEventTypeDrop,
|
|
}
|
|
}
|
|
|
|
func newFakePeerAdderRemover() *fakePeerEvents {
|
|
return &fakePeerEvents{nodes: map[types.EnodeID]struct{}{}}
|
|
}
|
|
|
|
func (f *fakePeerEvents) SubscribeEvents(output chan *p2p.PeerEvent) event.Subscription {
|
|
return event.NewSubscription(func(quit <-chan struct{}) error {
|
|
for {
|
|
select {
|
|
case <-quit:
|
|
return nil
|
|
case ev := <-f.input:
|
|
// will block the same way as in any feed
|
|
output <- ev
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func newFakeServer() *fakePeerEvents {
|
|
srv := newFakePeerAdderRemover()
|
|
srv.input = make(chan *p2p.PeerEvent, 20)
|
|
return srv
|
|
}
|
|
|
|
type fakeEnvelopeEvents struct {
|
|
input chan types.EnvelopeEvent
|
|
}
|
|
|
|
func (f *fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- types.EnvelopeEvent) types.Subscription {
|
|
return event.NewSubscription(func(quit <-chan struct{}) error {
|
|
for {
|
|
select {
|
|
case <-quit:
|
|
return nil
|
|
case ev := <-f.input:
|
|
// will block the same way as in any feed
|
|
output <- ev
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func newFakeEnvelopesEvents() *fakeEnvelopeEvents {
|
|
return &fakeEnvelopeEvents{
|
|
input: make(chan types.EnvelopeEvent),
|
|
}
|
|
}
|
|
|
|
func fillWithRandomNodes(t *testing.T, nodes []*enode.Node) {
|
|
var err error
|
|
for i := range nodes {
|
|
nodes[i], err = RandomNode()
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
func getMapWithRandomNodes(t *testing.T, n int) map[types.EnodeID]*enode.Node {
|
|
nodes := make([]*enode.Node, n)
|
|
fillWithRandomNodes(t, nodes)
|
|
return nodesToMap(nodes)
|
|
}
|
|
|
|
func mergeOldIntoNew(old, new map[types.EnodeID]*enode.Node) {
|
|
for n := range old {
|
|
new[n] = old[n]
|
|
}
|
|
}
|
|
|
|
func TestReplaceNodes(t *testing.T) {
|
|
type testCase struct {
|
|
description string
|
|
old map[types.EnodeID]*enode.Node
|
|
new map[types.EnodeID]*enode.Node
|
|
target int
|
|
}
|
|
for _, tc := range []testCase{
|
|
{
|
|
"InitialReplace",
|
|
getMapWithRandomNodes(t, 0),
|
|
getMapWithRandomNodes(t, 3),
|
|
2,
|
|
},
|
|
{
|
|
"FullReplace",
|
|
getMapWithRandomNodes(t, 3),
|
|
getMapWithRandomNodes(t, 3),
|
|
2,
|
|
},
|
|
} {
|
|
t.Run(tc.description, func(t *testing.T) {
|
|
peers := newFakePeerAdderRemover()
|
|
state := newInternalState(peers, tc.target, 0)
|
|
state.replaceNodes(tc.old)
|
|
require.Len(t, peers.nodes, len(tc.old))
|
|
for n := range peers.nodes {
|
|
require.Contains(t, tc.old, n)
|
|
}
|
|
state.replaceNodes(tc.new)
|
|
require.Len(t, peers.nodes, len(tc.new))
|
|
for n := range peers.nodes {
|
|
require.Contains(t, tc.new, n)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPartialReplaceNodesBelowTarget(t *testing.T) {
|
|
peers := newFakePeerAdderRemover()
|
|
old := getMapWithRandomNodes(t, 1)
|
|
new := getMapWithRandomNodes(t, 2)
|
|
state := newInternalState(peers, 2, 0)
|
|
state.replaceNodes(old)
|
|
mergeOldIntoNew(old, new)
|
|
state.replaceNodes(new)
|
|
require.Len(t, peers.nodes, len(new))
|
|
}
|
|
|
|
func TestPartialReplaceNodesAboveTarget(t *testing.T) {
|
|
peers := newFakePeerAdderRemover()
|
|
initial, err := RandomNode()
|
|
require.NoError(t, err)
|
|
old := nodesToMap([]*enode.Node{initial})
|
|
new := getMapWithRandomNodes(t, 2)
|
|
state := newInternalState(peers, 1, 0)
|
|
state.replaceNodes(old)
|
|
state.nodeAdded(types.EnodeID(initial.ID()))
|
|
mergeOldIntoNew(old, new)
|
|
state.replaceNodes(new)
|
|
require.Len(t, peers.nodes, 1)
|
|
}
|
|
|
|
func TestConnectionManagerAddDrop(t *testing.T) {
|
|
server := newFakeServer()
|
|
whisper := newFakeEnvelopesEvents()
|
|
target := 1
|
|
connmanager := NewConnectionManager(server, whisper, target, 1, 0)
|
|
connmanager.Start()
|
|
defer connmanager.Stop()
|
|
nodes := []*enode.Node{}
|
|
for _, n := range getMapWithRandomNodes(t, 3) {
|
|
nodes = append(nodes, n)
|
|
}
|
|
// Send 3 random nodes to connection manager.
|
|
connmanager.Notify(nodes)
|
|
var initial enode.ID
|
|
// Wait till connection manager establishes connection with 1 peer.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
initial = enode.ID(nodes[0])
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
// Send an event that peer was dropped.
|
|
select {
|
|
case server.input <- &p2p.PeerEvent{Peer: initial, Type: p2p.PeerEventTypeDrop}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send a drop event")
|
|
}
|
|
// Connection manager should establish connection with any other peer from initial list.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
if enode.ID(nodes[0]) == initial {
|
|
return fmt.Errorf("connected node wasn't changed from %s", initial)
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
}
|
|
|
|
func TestConnectionManagerReplace(t *testing.T) {
|
|
server := newFakeServer()
|
|
whisper := newFakeEnvelopesEvents()
|
|
target := 1
|
|
connmanager := NewConnectionManager(server, whisper, target, 1, 0)
|
|
connmanager.Start()
|
|
defer connmanager.Stop()
|
|
nodes := []*enode.Node{}
|
|
for _, n := range getMapWithRandomNodes(t, 3) {
|
|
nodes = append(nodes, n)
|
|
}
|
|
// Send a single node to connection manager.
|
|
connmanager.Notify(nodes[:1])
|
|
// Wait until this node will get connected.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
connected := server.Nodes()
|
|
if len(connected) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(connected))
|
|
}
|
|
if types.EnodeID(nodes[0].ID()) != connected[0] {
|
|
return fmt.Errorf("connected with a wrong peer. expected %s, got %s", nodes[0].ID(), connected[0])
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
// Replace previously sent node with 2 different nodes.
|
|
connmanager.Notify(nodes[1:])
|
|
// Wait until connection manager replaces node connected in the first round.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
connected := server.Nodes()
|
|
if len(connected) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(connected))
|
|
}
|
|
switch enode.ID(connected[0]) {
|
|
case nodes[1].ID():
|
|
case nodes[2].ID():
|
|
default:
|
|
return fmt.Errorf("connected with unexpected peer. got %s, expected %+v", connected[0], nodes[1:])
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
}
|
|
|
|
func setupTestConnectionAfterExpiry(t *testing.T, server *fakePeerEvents, whisperMock *fakeEnvelopeEvents, target, maxFailures int, hash types.Hash) (*ConnectionManager, types.EnodeID) {
|
|
connmanager := NewConnectionManager(server, whisperMock, target, maxFailures, 0)
|
|
connmanager.Start()
|
|
nodes := []*enode.Node{}
|
|
for _, n := range getMapWithRandomNodes(t, 2) {
|
|
nodes = append(nodes, n)
|
|
}
|
|
// Send two random nodes to connection manager.
|
|
connmanager.Notify(nodes)
|
|
var initial types.EnodeID
|
|
// Wait until connection manager establishes connection with one node.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
initial = nodes[0]
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
// Send event that history request for connected peer was sent.
|
|
select {
|
|
case whisperMock.input <- types.EnvelopeEvent{
|
|
Event: types.EventMailServerRequestSent, Peer: initial, Hash: hash}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send a 'sent' event")
|
|
}
|
|
return connmanager, initial
|
|
}
|
|
|
|
func TestConnectionChangedAfterExpiry(t *testing.T) {
|
|
server := newFakeServer()
|
|
whisperMock := newFakeEnvelopesEvents()
|
|
target := 1
|
|
maxFailures := 1
|
|
hash := types.Hash{1}
|
|
connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash)
|
|
defer connmanager.Stop()
|
|
|
|
// And eventually expired.
|
|
select {
|
|
case whisperMock.input <- types.EnvelopeEvent{
|
|
Event: types.EventMailServerRequestExpired, Peer: initial, Hash: hash}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send an 'expiry' event")
|
|
}
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
if nodes[0] == initial {
|
|
return fmt.Errorf("connected node wasn't changed from %s", initial)
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
}
|
|
|
|
func TestConnectionChangedAfterSecondExpiry(t *testing.T) {
|
|
server := newFakeServer()
|
|
whisperMock := newFakeEnvelopesEvents()
|
|
target := 1
|
|
maxFailures := 2
|
|
hash := types.Hash{1}
|
|
connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash)
|
|
defer connmanager.Stop()
|
|
|
|
// First expired is sent. Nothing should happen.
|
|
select {
|
|
case whisperMock.input <- types.EnvelopeEvent{
|
|
Event: types.EventMailServerRequestExpired, Peer: initial, Hash: hash}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send an 'expiry' event")
|
|
}
|
|
|
|
// we use 'eventually' as 'consistently' because this function will retry for a given timeout while error is received
|
|
require.EqualError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
if nodes[0] == initial {
|
|
return fmt.Errorf("connected node wasn't changed from %s", initial)
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond), fmt.Sprintf("connected node wasn't changed from %s", initial))
|
|
|
|
// second expiry event
|
|
select {
|
|
case whisperMock.input <- types.EnvelopeEvent{
|
|
Event: types.EventMailServerRequestExpired, Peer: initial, Hash: hash}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send an 'expiry' event")
|
|
}
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
if nodes[0] == initial {
|
|
return fmt.Errorf("connected node wasn't changed from %s", initial)
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
}
|
|
|
|
func TestProcessReplacementWaitsForConnections(t *testing.T) {
|
|
srv := newFakePeerAdderRemover()
|
|
target := 1
|
|
timeout := time.Second
|
|
nodes := make([]*enode.Node, 2)
|
|
fillWithRandomNodes(t, nodes)
|
|
events := make(chan *p2p.PeerEvent)
|
|
state := newInternalState(srv, target, timeout)
|
|
state.currentNodes = nodesToMap(nodes)
|
|
go func() {
|
|
select {
|
|
case events <- &p2p.PeerEvent{Peer: nodes[0].ID(), Type: p2p.PeerEventTypeAdd}:
|
|
case <-time.After(time.Second):
|
|
assert.FailNow(t, "can't send a drop event")
|
|
}
|
|
}()
|
|
state.processReplacement(nodes, events)
|
|
require.Len(t, state.connected, 1)
|
|
}
|