[#1054] : Introducing t/helpers and WaitForPeer (#1064)

* [#1054] : Introducing t/helpers and WaitForPeer

* waitForPeer to receive a p2p.Server
This commit is contained in:
Adrià Cidre 2018-06-27 09:55:25 +02:00 committed by GitHub
parent ab7b9b914c
commit 3ca120c2aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 92 additions and 51 deletions

View File

@ -1,11 +1,12 @@
package node
import (
"testing"
. "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/require"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
"testing"
)
var enode1 = "enode://f32efef2739e5135a0f9a80600b321ba4d13393a5f1d3f5f593df85919262f06c70bfa66d38507b9d79a91021f5e200ec20150592e72934c66248e87014c4317@1.1.1.1:30404"

View File

@ -14,6 +14,7 @@ import (
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/syndtr/goleveldb/leveldb"
@ -77,6 +78,18 @@ func (n *StatusNode) GethNode() *node.Node {
return n.gethNode
}
// Server retrieves the currently running P2P network layer.
func (n *StatusNode) Server() *p2p.Server {
n.mu.RLock()
defer n.mu.RUnlock()
if n.gethNode == nil {
return nil
}
return n.gethNode.Server()
}
func (n *StatusNode) startWithDB(config *params.NodeConfig, db *leveldb.DB, services []node.ServiceConstructor) error {
if err := n.createNode(config, db); err != nil {
return err

View File

@ -1,7 +1,6 @@
package node
import (
"errors"
"io/ioutil"
"math"
"os"
@ -17,6 +16,7 @@ import (
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/t/helpers"
"github.com/stretchr/testify/require"
)
@ -181,7 +181,7 @@ func TestStatusNodeAddPeer(t *testing.T) {
require.NoError(t, n.Start(&config))
defer func() { require.NoError(t, n.Stop()) }()
errCh := waitForPeerAsync(n, peerURL, p2p.PeerEventTypeAdd, time.Second*5)
errCh := helpers.WaitForPeerAsync(n.Server(), peerURL, p2p.PeerEventTypeAdd, time.Second*5)
// checks after node is started
require.NoError(t, n.AddPeer(peerURL))
@ -228,16 +228,17 @@ func TestStatusNodeReconnectStaticPeers(t *testing.T) {
connected, err := isPeerConnected(n, peerURL)
require.NoError(t, err)
if !connected {
errCh = waitForPeerAsync(n, peerURL, p2p.PeerEventTypeAdd, time.Second*30)
errCh = helpers.WaitForPeerAsync(n.Server(), peerURL, p2p.PeerEventTypeAdd, time.Second*30)
require.NoError(t, <-errCh)
}
require.Equal(t, 1, n.PeerCount())
require.Equal(t, peer.Server().Self().ID.String(), n.GethNode().Server().PeersInfo()[0].ID)
// reconnect static peers
errDropCh := waitForPeerAsync(n, peerURL, p2p.PeerEventTypeDrop, time.Second*30)
errDropCh := helpers.WaitForPeerAsync(n.Server(), peerURL, p2p.PeerEventTypeDrop, time.Second*30)
// it takes at least 30 seconds to bring back previously connected peer
errAddCh := waitForPeerAsync(n, peerURL, p2p.PeerEventTypeAdd, time.Second*60)
errAddCh := helpers.WaitForPeerAsync(n.Server(), peerURL, p2p.PeerEventTypeAdd, time.Second*60)
require.NoError(t, n.ReconnectStaticPeers())
// first check if a peer gets disconnected
require.NoError(t, <-errDropCh)
@ -264,48 +265,3 @@ func isPeerConnected(node *StatusNode, peerURL string) (bool, error) {
return false, nil
}
func waitForPeer(node *StatusNode, peerURL string, eventType p2p.PeerEventType, timeout time.Duration, subscribed chan struct{}) error {
if !node.IsRunning() {
return ErrNoRunningNode
}
parsedPeer, err := discover.ParseNode(peerURL)
if err != nil {
return err
}
server := node.GethNode().Server()
ch := make(chan *p2p.PeerEvent)
subscription := server.SubscribeEvents(ch)
defer subscription.Unsubscribe()
close(subscribed)
for {
select {
case ev := <-ch:
if ev.Type == eventType && ev.Peer == parsedPeer.ID {
return nil
}
case err := <-subscription.Err():
if err != nil {
return err
}
case <-time.After(timeout):
return errors.New("wait for peer: timeout")
}
}
}
func waitForPeerAsync(node *StatusNode, peerURL string, eventType p2p.PeerEventType, timeout time.Duration) <-chan error {
subscribed := make(chan struct{})
errCh := make(chan error)
go func() {
errCh <- waitForPeer(node, peerURL, eventType, timeout, subscribed)
}()
<-subscribed
return errCh
}

View File

@ -6,13 +6,17 @@ import (
"io/ioutil"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/p2p"
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/params"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/status-im/status-go/t/helpers"
. "github.com/status-im/status-go/t/utils"
)
@ -127,7 +131,13 @@ func (s *DebugAPISuite) addPeerToCurrentNode(dir string) {
node2 := s.newPeer("test2", dir).GethNode()
s.NotNil(node2)
errCh := helpers.WaitForPeerAsync(s.Backend.StatusNode().Server(),
node2.Server().Self().String(),
p2p.PeerEventTypeAdd,
time.Second*5)
node1.Server().AddPeer(node2.Server().Self())
require.NoError(s.T(), <-errCh)
}
// newNode creates, configures and starts a new peer.

61
t/helpers/peers.go Normal file
View File

@ -0,0 +1,61 @@
package helpers
import (
"errors"
"time"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
)
var (
// ErrNoRunningNode node is not running.
ErrNoRunningNode = errors.New("there is no running node")
// ErrEmptyPeerURL provided peer URL is empty
ErrEmptyPeerURL = errors.New("empty peer url")
)
// waitForPeer waits for a peer to be added
func waitForPeer(p *p2p.Server, u string, e p2p.PeerEventType, t time.Duration, subscribed chan struct{}) error {
if p == nil {
return ErrNoRunningNode
}
if u == "" {
return ErrEmptyPeerURL
}
parsedPeer, err := discover.ParseNode(u)
if err != nil {
return err
}
ch := make(chan *p2p.PeerEvent)
subscription := p.SubscribeEvents(ch)
defer subscription.Unsubscribe()
close(subscribed)
for {
select {
case ev := <-ch:
if ev.Type == e && ev.Peer == parsedPeer.ID {
return nil
}
case err := <-subscription.Err():
if err != nil {
return err
}
case <-time.After(t):
return errors.New("wait for peer: timeout")
}
}
}
// WaitForPeerAsync waits for a peer to be added asynchronously
func WaitForPeerAsync(p *p2p.Server, u string, e p2p.PeerEventType, t time.Duration) <-chan error {
subscribed := make(chan struct{})
errCh := make(chan error)
go func() {
errCh <- waitForPeer(p, u, e, t, subscribed)
}()
<-subscribed
return errCh
}