mirror of
https://github.com/status-im/status-go.git
synced 2025-01-09 06:12:55 +00:00
f2c6fef64c
This change allows connecting to the mail server that we were using before the app was restarted. A separate loop listens for whisper events, and when we receive an event that a request was completed we update the time on the peer record. Records are stored in leveldb. The body of a record is marshaled using JSON. At this point the only field is a timestamp of when the record was used. This loop doesn't control connections; it only tracks which mail server we ended up using. It works asynchronously to the connection management loop, which tracks events that are related to connection state and expiry of the requests. When the app starts we look into the database and select the most recently used record. This record is added to the connection management loop first, so if this server is available we will stick to using it. If we weren't able to connect to the same server within the configured timeout (5s), we will try to connect to any other server from the list of active servers. closes: #1285
345 lines
9.2 KiB
Go
345 lines
9.2 KiB
Go
package mailservers
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/common"
|
|
"github.com/ethereum/go-ethereum/event"
|
|
"github.com/ethereum/go-ethereum/p2p"
|
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
|
"github.com/status-im/status-go/t/utils"
|
|
whisper "github.com/status-im/whisper/whisperv6"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
type fakePeerEvents struct {
|
|
mu sync.Mutex
|
|
nodes map[enode.ID]struct{}
|
|
input chan *p2p.PeerEvent
|
|
}
|
|
|
|
func (f *fakePeerEvents) Nodes() []enode.ID {
|
|
f.mu.Lock()
|
|
rst := make([]enode.ID, 0, len(f.nodes))
|
|
for n := range f.nodes {
|
|
rst = append(rst, n)
|
|
}
|
|
f.mu.Unlock()
|
|
return rst
|
|
}
|
|
|
|
func (f *fakePeerEvents) AddPeer(node *enode.Node) {
|
|
f.mu.Lock()
|
|
f.nodes[node.ID()] = struct{}{}
|
|
f.mu.Unlock()
|
|
if f.input == nil {
|
|
return
|
|
}
|
|
f.input <- &p2p.PeerEvent{
|
|
Peer: node.ID(),
|
|
Type: p2p.PeerEventTypeAdd,
|
|
}
|
|
}
|
|
|
|
func (f *fakePeerEvents) RemovePeer(node *enode.Node) {
|
|
f.mu.Lock()
|
|
delete(f.nodes, node.ID())
|
|
f.mu.Unlock()
|
|
if f.input == nil {
|
|
return
|
|
}
|
|
f.input <- &p2p.PeerEvent{
|
|
Peer: node.ID(),
|
|
Type: p2p.PeerEventTypeDrop,
|
|
}
|
|
}
|
|
|
|
func newFakePeerAdderRemover() *fakePeerEvents {
|
|
return &fakePeerEvents{nodes: map[enode.ID]struct{}{}}
|
|
}
|
|
|
|
func (f *fakePeerEvents) SubscribeEvents(output chan *p2p.PeerEvent) event.Subscription {
|
|
return event.NewSubscription(func(quit <-chan struct{}) error {
|
|
for {
|
|
select {
|
|
case <-quit:
|
|
return nil
|
|
case ev := <-f.input:
|
|
// will block the same way as in any feed
|
|
output <- ev
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func newFakeServer() *fakePeerEvents {
|
|
srv := newFakePeerAdderRemover()
|
|
srv.input = make(chan *p2p.PeerEvent, 20)
|
|
return srv
|
|
}
|
|
|
|
type fakeEnvelopeEvents struct {
|
|
input chan whisper.EnvelopeEvent
|
|
}
|
|
|
|
func (f fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- whisper.EnvelopeEvent) event.Subscription {
|
|
return event.NewSubscription(func(quit <-chan struct{}) error {
|
|
for {
|
|
select {
|
|
case <-quit:
|
|
return nil
|
|
case ev := <-f.input:
|
|
// will block the same way as in any feed
|
|
output <- ev
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func newFakeEnvelopesEvents() fakeEnvelopeEvents {
|
|
return fakeEnvelopeEvents{
|
|
input: make(chan whisper.EnvelopeEvent),
|
|
}
|
|
}
|
|
|
|
func fillWithRandomNodes(t *testing.T, nodes []*enode.Node) {
|
|
var err error
|
|
for i := range nodes {
|
|
nodes[i], err = RandomNode()
|
|
require.NoError(t, err)
|
|
}
|
|
}
|
|
|
|
func getMapWithRandomNodes(t *testing.T, n int) map[enode.ID]*enode.Node {
|
|
nodes := make([]*enode.Node, n)
|
|
fillWithRandomNodes(t, nodes)
|
|
return nodesToMap(nodes)
|
|
}
|
|
|
|
func mergeOldIntoNew(old, new map[enode.ID]*enode.Node) {
|
|
for n := range old {
|
|
new[n] = old[n]
|
|
}
|
|
}
|
|
|
|
func TestReplaceNodes(t *testing.T) {
|
|
type testCase struct {
|
|
description string
|
|
old map[enode.ID]*enode.Node
|
|
new map[enode.ID]*enode.Node
|
|
target int
|
|
}
|
|
for _, tc := range []testCase{
|
|
{
|
|
"InitialReplace",
|
|
getMapWithRandomNodes(t, 0),
|
|
getMapWithRandomNodes(t, 3),
|
|
2,
|
|
},
|
|
{
|
|
"FullReplace",
|
|
getMapWithRandomNodes(t, 3),
|
|
getMapWithRandomNodes(t, 3),
|
|
2,
|
|
},
|
|
} {
|
|
t.Run(tc.description, func(t *testing.T) {
|
|
peers := newFakePeerAdderRemover()
|
|
state := newInternalState(peers, tc.target, 0)
|
|
state.replaceNodes(tc.old)
|
|
require.Len(t, peers.nodes, len(tc.old))
|
|
for n := range peers.nodes {
|
|
require.Contains(t, tc.old, n)
|
|
}
|
|
state.replaceNodes(tc.new)
|
|
require.Len(t, peers.nodes, len(tc.new))
|
|
for n := range peers.nodes {
|
|
require.Contains(t, tc.new, n)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func TestPartialReplaceNodesBelowTarget(t *testing.T) {
|
|
peers := newFakePeerAdderRemover()
|
|
old := getMapWithRandomNodes(t, 1)
|
|
new := getMapWithRandomNodes(t, 2)
|
|
state := newInternalState(peers, 2, 0)
|
|
state.replaceNodes(old)
|
|
mergeOldIntoNew(old, new)
|
|
state.replaceNodes(new)
|
|
require.Len(t, peers.nodes, len(new))
|
|
}
|
|
|
|
func TestPartialReplaceNodesAboveTarget(t *testing.T) {
|
|
peers := newFakePeerAdderRemover()
|
|
initial, err := RandomNode()
|
|
require.NoError(t, err)
|
|
old := nodesToMap([]*enode.Node{initial})
|
|
new := getMapWithRandomNodes(t, 2)
|
|
state := newInternalState(peers, 1, 0)
|
|
state.replaceNodes(old)
|
|
state.nodeAdded(initial.ID())
|
|
mergeOldIntoNew(old, new)
|
|
state.replaceNodes(new)
|
|
require.Len(t, peers.nodes, 1)
|
|
}
|
|
|
|
func TestConnectionManagerAddDrop(t *testing.T) {
|
|
server := newFakeServer()
|
|
whisper := newFakeEnvelopesEvents()
|
|
target := 1
|
|
connmanager := NewConnectionManager(server, whisper, target, 0)
|
|
connmanager.Start()
|
|
defer connmanager.Stop()
|
|
nodes := []*enode.Node{}
|
|
for _, n := range getMapWithRandomNodes(t, 3) {
|
|
nodes = append(nodes, n)
|
|
}
|
|
// Send 3 random nodes to connection manager.
|
|
connmanager.Notify(nodes)
|
|
var initial enode.ID
|
|
// Wait till connection manager establishes connection with 1 peer.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
initial = nodes[0]
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
// Send an event that peer was dropped.
|
|
select {
|
|
case server.input <- &p2p.PeerEvent{Peer: initial, Type: p2p.PeerEventTypeDrop}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send a drop event")
|
|
}
|
|
// Connection manager should establish connection with any other peer from initial list.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
if nodes[0] == initial {
|
|
return fmt.Errorf("connected node wasn't changed from %s", initial)
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
}
|
|
|
|
func TestConnectionManagerReplace(t *testing.T) {
|
|
server := newFakeServer()
|
|
whisper := newFakeEnvelopesEvents()
|
|
target := 1
|
|
connmanager := NewConnectionManager(server, whisper, target, 0)
|
|
connmanager.Start()
|
|
defer connmanager.Stop()
|
|
nodes := []*enode.Node{}
|
|
for _, n := range getMapWithRandomNodes(t, 3) {
|
|
nodes = append(nodes, n)
|
|
}
|
|
// Send a single node to connection manager.
|
|
connmanager.Notify(nodes[:1])
|
|
// Wait until this node will get connected.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
connected := server.Nodes()
|
|
if len(connected) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(connected))
|
|
}
|
|
if nodes[0].ID() != connected[0] {
|
|
return fmt.Errorf("connected with a wrong peer. expected %s, got %s", nodes[0].ID(), connected[0])
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
// Replace previously sent node with 2 different nodes.
|
|
connmanager.Notify(nodes[1:])
|
|
// Wait until connection manager replaces node connected in the first round.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
connected := server.Nodes()
|
|
if len(connected) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(connected))
|
|
}
|
|
switch connected[0] {
|
|
case nodes[1].ID():
|
|
case nodes[2].ID():
|
|
default:
|
|
return fmt.Errorf("connected with unexpected peer. got %s, expected %+v", connected[0], nodes[1:])
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
}
|
|
|
|
func TestConnectionChangedAfterExpiry(t *testing.T) {
|
|
server := newFakeServer()
|
|
whisperMock := newFakeEnvelopesEvents()
|
|
target := 1
|
|
connmanager := NewConnectionManager(server, whisperMock, target, 0)
|
|
connmanager.Start()
|
|
defer connmanager.Stop()
|
|
nodes := []*enode.Node{}
|
|
for _, n := range getMapWithRandomNodes(t, 2) {
|
|
nodes = append(nodes, n)
|
|
}
|
|
// Send two random nodes to connection manager.
|
|
connmanager.Notify(nodes)
|
|
var initial enode.ID
|
|
// Wait until connection manager establishes connection with one node.
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
initial = nodes[0]
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
hash := common.Hash{1}
|
|
// Send event that history request for connected peer was sent.
|
|
select {
|
|
case whisperMock.input <- whisper.EnvelopeEvent{
|
|
Event: whisper.EventMailServerRequestSent, Peer: initial, Hash: hash}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send a 'sent' event")
|
|
}
|
|
// And eventually expired.
|
|
select {
|
|
case whisperMock.input <- whisper.EnvelopeEvent{
|
|
Event: whisper.EventMailServerRequestExpired, Peer: initial, Hash: hash}:
|
|
case <-time.After(time.Second):
|
|
require.FailNow(t, "can't send an 'expiry' event")
|
|
}
|
|
require.NoError(t, utils.Eventually(func() error {
|
|
nodes := server.Nodes()
|
|
if len(nodes) != target {
|
|
return fmt.Errorf("unexpected number of connected servers: %d", len(nodes))
|
|
}
|
|
if nodes[0] == initial {
|
|
return fmt.Errorf("connected node wasn't changed from %s", initial)
|
|
}
|
|
return nil
|
|
}, time.Second, 100*time.Millisecond))
|
|
}
|
|
|
|
func TestProcessReplacementWaitsForConnections(t *testing.T) {
|
|
srv := newFakePeerAdderRemover()
|
|
target := 1
|
|
timeout := time.Second
|
|
nodes := make([]*enode.Node, 2)
|
|
fillWithRandomNodes(t, nodes)
|
|
events := make(chan *p2p.PeerEvent)
|
|
state := newInternalState(srv, target, timeout)
|
|
state.currentNodes = nodesToMap(nodes)
|
|
go func() {
|
|
select {
|
|
case events <- &p2p.PeerEvent{Peer: nodes[0].ID(), Type: p2p.PeerEventTypeAdd}:
|
|
case <-time.After(time.Second):
|
|
assert.FailNow(t, "can't send a drop event")
|
|
}
|
|
}()
|
|
state.processReplacement(nodes, events)
|
|
require.Len(t, state.connected, 1)
|
|
}
|