package mailservers import ( "fmt" "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/t/utils" ) type fakePeerEvents struct { mu sync.Mutex nodes map[types.EnodeID]struct{} input chan *p2p.PeerEvent } func (f *fakePeerEvents) Nodes() []types.EnodeID { f.mu.Lock() rst := make([]types.EnodeID, 0, len(f.nodes)) for n := range f.nodes { rst = append(rst, n) } f.mu.Unlock() return rst } func (f *fakePeerEvents) AddPeer(node *enode.Node) { f.mu.Lock() f.nodes[types.EnodeID(node.ID())] = struct{}{} f.mu.Unlock() if f.input == nil { return } f.input <- &p2p.PeerEvent{ Peer: node.ID(), Type: p2p.PeerEventTypeAdd, } } func (f *fakePeerEvents) RemovePeer(node *enode.Node) { f.mu.Lock() delete(f.nodes, types.EnodeID(node.ID())) f.mu.Unlock() if f.input == nil { return } f.input <- &p2p.PeerEvent{ Peer: node.ID(), Type: p2p.PeerEventTypeDrop, } } func newFakePeerAdderRemover() *fakePeerEvents { return &fakePeerEvents{nodes: map[types.EnodeID]struct{}{}} } func (f *fakePeerEvents) SubscribeEvents(output chan *p2p.PeerEvent) event.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { for { select { case <-quit: return nil case ev := <-f.input: // will block the same way as in any feed output <- ev } } }) } func newFakeServer() *fakePeerEvents { srv := newFakePeerAdderRemover() srv.input = make(chan *p2p.PeerEvent, 20) return srv } type fakeEnvelopeEvents struct { input chan types.EnvelopeEvent } func (f *fakeEnvelopeEvents) SubscribeEnvelopeEvents(output chan<- types.EnvelopeEvent) types.Subscription { return event.NewSubscription(func(quit <-chan struct{}) error { for { select { case <-quit: return nil case ev := <-f.input: // will block the same way as in any feed output <- ev } } }) } func newFakeEnvelopesEvents() *fakeEnvelopeEvents { return &fakeEnvelopeEvents{ input: make(chan types.EnvelopeEvent), } } func fillWithRandomNodes(t *testing.T, nodes []*enode.Node) { var err error for i := range nodes { nodes[i], err = RandomNode() require.NoError(t, err) } } func getMapWithRandomNodes(t *testing.T, n int) map[types.EnodeID]*enode.Node { nodes := make([]*enode.Node, n) fillWithRandomNodes(t, nodes) return nodesToMap(nodes) } func mergeOldIntoNew(old, new map[types.EnodeID]*enode.Node) { for n := range old { new[n] = old[n] } } func TestReplaceNodes(t *testing.T) { type testCase struct { description string old map[types.EnodeID]*enode.Node new map[types.EnodeID]*enode.Node target int } for _, tc := range []testCase{ { "InitialReplace", getMapWithRandomNodes(t, 0), getMapWithRandomNodes(t, 3), 2, }, { "FullReplace", getMapWithRandomNodes(t, 3), getMapWithRandomNodes(t, 3), 2, }, } { t.Run(tc.description, func(t *testing.T) { peers := newFakePeerAdderRemover() state := newInternalState(peers, tc.target, 0) state.replaceNodes(tc.old) require.Len(t, peers.nodes, len(tc.old)) for n := range peers.nodes { require.Contains(t, tc.old, n) } state.replaceNodes(tc.new) require.Len(t, peers.nodes, len(tc.new)) for n := range peers.nodes { require.Contains(t, tc.new, n) } }) } } func TestPartialReplaceNodesBelowTarget(t *testing.T) { peers := newFakePeerAdderRemover() old := getMapWithRandomNodes(t, 1) new := getMapWithRandomNodes(t, 2) state := newInternalState(peers, 2, 0) state.replaceNodes(old) mergeOldIntoNew(old, new) state.replaceNodes(new) require.Len(t, peers.nodes, len(new)) } func TestPartialReplaceNodesAboveTarget(t *testing.T) { peers := newFakePeerAdderRemover() initial, err := RandomNode() require.NoError(t, err) old := nodesToMap([]*enode.Node{initial}) new := getMapWithRandomNodes(t, 2) state := newInternalState(peers, 1, 0) state.replaceNodes(old) state.nodeAdded(types.EnodeID(initial.ID())) mergeOldIntoNew(old, new) state.replaceNodes(new) require.Len(t, peers.nodes, 1) } func TestConnectionManagerAddDrop(t *testing.T) { server := newFakeServer() whisper := newFakeEnvelopesEvents() target := 1 connmanager := NewConnectionManager(server, whisper, target, 1, 0) connmanager.Start() defer connmanager.Stop() nodes := []*enode.Node{} for _, n := range getMapWithRandomNodes(t, 3) { nodes = append(nodes, n) } // Send 3 random nodes to connection manager. connmanager.Notify(nodes) var initial enode.ID // Wait till connection manager establishes connection with 1 peer. require.NoError(t, utils.Eventually(func() error { nodes := server.Nodes() if len(nodes) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) } initial = enode.ID(nodes[0]) return nil }, time.Second, 100*time.Millisecond)) // Send an event that peer was dropped. select { case server.input <- &p2p.PeerEvent{Peer: initial, Type: p2p.PeerEventTypeDrop}: case <-time.After(time.Second): require.FailNow(t, "can't send a drop event") } // Connection manager should establish connection with any other peer from initial list. require.NoError(t, utils.Eventually(func() error { nodes := server.Nodes() if len(nodes) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) } if enode.ID(nodes[0]) == initial { return fmt.Errorf("connected node wasn't changed from %s", initial) } return nil }, time.Second, 100*time.Millisecond)) } func TestConnectionManagerReplace(t *testing.T) { server := newFakeServer() whisper := newFakeEnvelopesEvents() target := 1 connmanager := NewConnectionManager(server, whisper, target, 1, 0) connmanager.Start() defer connmanager.Stop() nodes := []*enode.Node{} for _, n := range getMapWithRandomNodes(t, 3) { nodes = append(nodes, n) } // Send a single node to connection manager. connmanager.Notify(nodes[:1]) // Wait until this node will get connected. require.NoError(t, utils.Eventually(func() error { connected := server.Nodes() if len(connected) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(connected)) } if types.EnodeID(nodes[0].ID()) != connected[0] { return fmt.Errorf("connected with a wrong peer. expected %s, got %s", nodes[0].ID(), connected[0]) } return nil }, time.Second, 100*time.Millisecond)) // Replace previously sent node with 2 different nodes. connmanager.Notify(nodes[1:]) // Wait until connection manager replaces node connected in the first round. require.NoError(t, utils.Eventually(func() error { connected := server.Nodes() if len(connected) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(connected)) } switch enode.ID(connected[0]) { case nodes[1].ID(): case nodes[2].ID(): default: return fmt.Errorf("connected with unexpected peer. got %s, expected %+v", connected[0], nodes[1:]) } return nil }, time.Second, 100*time.Millisecond)) } func setupTestConnectionAfterExpiry(t *testing.T, server *fakePeerEvents, whisperMock *fakeEnvelopeEvents, target, maxFailures int, hash types.Hash) (*ConnectionManager, types.EnodeID) { connmanager := NewConnectionManager(server, whisperMock, target, maxFailures, 0) connmanager.Start() nodes := []*enode.Node{} for _, n := range getMapWithRandomNodes(t, 2) { nodes = append(nodes, n) } // Send two random nodes to connection manager. connmanager.Notify(nodes) var initial types.EnodeID // Wait until connection manager establishes connection with one node. require.NoError(t, utils.Eventually(func() error { nodes := server.Nodes() if len(nodes) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) } initial = nodes[0] return nil }, time.Second, 100*time.Millisecond)) // Send event that history request for connected peer was sent. select { case whisperMock.input <- types.EnvelopeEvent{ Event: types.EventMailServerRequestSent, Peer: initial, Hash: hash}: case <-time.After(time.Second): require.FailNow(t, "can't send a 'sent' event") } return connmanager, initial } func TestConnectionChangedAfterExpiry(t *testing.T) { server := newFakeServer() whisperMock := newFakeEnvelopesEvents() target := 1 maxFailures := 1 hash := types.Hash{1} connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash) defer connmanager.Stop() // And eventually expired. select { case whisperMock.input <- types.EnvelopeEvent{ Event: types.EventMailServerRequestExpired, Peer: initial, Hash: hash}: case <-time.After(time.Second): require.FailNow(t, "can't send an 'expiry' event") } require.NoError(t, utils.Eventually(func() error { nodes := server.Nodes() if len(nodes) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) } if nodes[0] == initial { return fmt.Errorf("connected node wasn't changed from %s", initial) } return nil }, time.Second, 100*time.Millisecond)) } func TestConnectionChangedAfterSecondExpiry(t *testing.T) { server := newFakeServer() whisperMock := newFakeEnvelopesEvents() target := 1 maxFailures := 2 hash := types.Hash{1} connmanager, initial := setupTestConnectionAfterExpiry(t, server, whisperMock, target, maxFailures, hash) defer connmanager.Stop() // First expired is sent. Nothing should happen. select { case whisperMock.input <- types.EnvelopeEvent{ Event: types.EventMailServerRequestExpired, Peer: initial, Hash: hash}: case <-time.After(time.Second): require.FailNow(t, "can't send an 'expiry' event") } // we use 'eventually' as 'consistently' because this function will retry for a given timeout while error is received require.EqualError(t, utils.Eventually(func() error { nodes := server.Nodes() if len(nodes) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) } if nodes[0] == initial { return fmt.Errorf("connected node wasn't changed from %s", initial) } return nil }, time.Second, 100*time.Millisecond), fmt.Sprintf("connected node wasn't changed from %s", initial)) // second expiry event select { case whisperMock.input <- types.EnvelopeEvent{ Event: types.EventMailServerRequestExpired, Peer: initial, Hash: hash}: case <-time.After(time.Second): require.FailNow(t, "can't send an 'expiry' event") } require.NoError(t, utils.Eventually(func() error { nodes := server.Nodes() if len(nodes) != target { return fmt.Errorf("unexpected number of connected servers: %d", len(nodes)) } if nodes[0] == initial { return fmt.Errorf("connected node wasn't changed from %s", initial) } return nil }, time.Second, 100*time.Millisecond)) } func TestProcessReplacementWaitsForConnections(t *testing.T) { srv := newFakePeerAdderRemover() target := 1 timeout := time.Second nodes := make([]*enode.Node, 2) fillWithRandomNodes(t, nodes) events := make(chan *p2p.PeerEvent) state := newInternalState(srv, target, timeout) state.currentNodes = nodesToMap(nodes) go func() { select { case events <- &p2p.PeerEvent{Peer: nodes[0].ID(), Type: p2p.PeerEventTypeAdd}: case <-time.After(time.Second): assert.FailNow(t, "can't send a drop event") } }() state.processReplacement(nodes, events) require.Len(t, state.connected, 1) }