Non-blocking StartNode()

This commit is contained in:
Victor Farazdagi 2016-09-15 06:08:06 +03:00
parent edd8763c3c
commit b66188941d
5 changed files with 117 additions and 62 deletions

View File

@ -1,10 +1,5 @@
package main
/*
#include <stddef.h>
#include <stdbool.h>
extern bool StatusServiceSignalEvent(const char *jsonEvent);
*/
import "C"
import (
"encoding/json"
@ -16,11 +11,6 @@ import (
"github.com/status-im/status-go/jail"
)
// export TriggerTestSignal
func TriggerTestSignal() {
C.StatusServiceSignalEvent(C.CString(`{"answer": 42}`))
}
//export CreateAccount
func CreateAccount(password *C.char) *C.char {
// This is equivalent to creating an account from the command line,

View File

@ -1,6 +1,13 @@
package geth
/*
#include <stddef.h>
#include <stdbool.h>
extern bool StatusServiceSignalEvent( const char *jsonEvent );
*/
import "C"
import (
"encoding/json"
"errors"
"flag"
"fmt"
@ -33,6 +40,8 @@ const (
versionOracle = "0xfa7b9770ca4cb04296cac84f37736d4041251cdf" // Ethereum address of the Geth release oracle
RPCPort = 8545 // RPC port (replaced in unit tests)
EventNodeStarted = "node.started"
)
var (
@ -45,17 +54,15 @@ var (
ErrNodeStartFailure = errors.New("could not create the in-memory node object")
)
type NodeNotificationHandler func(jsonEvent string)
type NodeManager struct {
currentNode *node.Node // currently running geth node
ctx *cli.Context // the CLI context used to start the geth node
lightEthereum *les.LightEthereum // LES service
accountManager *accounts.Manager // the account manager attached to the currentNode
SelectedAddress string // address of the account that was processed during the last call to SelectAccount()
whisperService *whisper.Whisper // Whisper service
client *rpc.ClientRestartWrapper // RPC client
notificationHandler NodeNotificationHandler // internal signal handler (used in tests)
currentNode *node.Node // currently running geth node
ctx *cli.Context // the CLI context used to start the geth node
lightEthereum *les.LightEthereum // LES service
accountManager *accounts.Manager // the account manager attached to the currentNode
SelectedAddress string // address of the account that was processed during the last call to SelectAccount()
whisperService *whisper.Whisper // Whisper service
client *rpc.ClientRestartWrapper // RPC client
nodeStarted chan struct{} // channel to wait for node to start
}
var (
@ -67,9 +74,6 @@ func NewNodeManager(datadir string, rpcport int) *NodeManager {
createOnce.Do(func() {
nodeManagerInstance = &NodeManager{}
nodeManagerInstance.MakeNode(datadir, rpcport)
nodeManagerInstance.SetNotificationHandler(func(jsonEvent string) {
glog.V(logger.Info).Infof("internal notification received: %s\n", jsonEvent)
})
})
return nodeManagerInstance
@ -80,12 +84,14 @@ func GetNodeManager() *NodeManager {
}
// createAndStartNode creates a node entity and starts the
// node running locally
// node running locally exposing given RPC port
func CreateAndRunNode(datadir string, rpcport int) error {
nodeManager := NewNodeManager(datadir, rpcport)
if nodeManager.HasNode() {
nodeManager.RunNode()
<-nodeManager.nodeStarted // block until node is ready
return nil
}
@ -135,33 +141,53 @@ func (m *NodeManager) MakeNode(datadir string, rpcport int) *node.Node {
}
m.accountManager = m.currentNode.AccountManager()
m.nodeStarted = make(chan struct{})
return m.currentNode
}
// StartNode starts a geth node entity
func (m *NodeManager) RunNode() {
utils.StartNode(m.currentNode)
go func() {
utils.StartNode(m.currentNode)
if m.currentNode.AccountManager() == nil {
glog.V(logger.Warn).Infoln("cannot get account manager")
}
if err := m.currentNode.Service(&m.whisperService); err != nil {
glog.V(logger.Warn).Infoln("cannot get whisper service:", err)
}
if err := m.currentNode.Service(&m.lightEthereum); err != nil {
glog.V(logger.Warn).Infoln("cannot get light ethereum service:", err)
}
m.lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest)
m.client = rpc.NewClientRestartWrapper(func() *rpc.Client {
client, err := m.currentNode.Attach()
if err != nil {
return nil
if m.currentNode.AccountManager() == nil {
glog.V(logger.Warn).Infoln("cannot get account manager")
}
return client
})
m.currentNode.Wait()
if err := m.currentNode.Service(&m.whisperService); err != nil {
glog.V(logger.Warn).Infoln("cannot get whisper service:", err)
}
if err := m.currentNode.Service(&m.lightEthereum); err != nil {
glog.V(logger.Warn).Infoln("cannot get light ethereum service:", err)
}
m.lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest)
m.client = rpc.NewClientRestartWrapper(func() *rpc.Client {
client, err := m.currentNode.Attach()
if err != nil {
return nil
}
return client
})
m.onNodeStarted() // node started, notify listeners
m.currentNode.Wait()
}()
}
func (m *NodeManager) onNodeStarted() {
// notify local listener
m.nodeStarted <- struct{}{}
close(m.nodeStarted)
// send signal up to native app
event := GethEvent{
Type: EventNodeStarted,
Event: struct{}{},
}
body, _ := json.Marshal(&event)
C.StatusServiceSignalEvent(C.CString(string(body)))
}
func (m *NodeManager) AddPeer(url string) (bool, error) {
@ -251,14 +277,6 @@ func (m *NodeManager) ClientRestartWrapper() (*rpc.ClientRestartWrapper, error)
return m.client, nil
}
func (m *NodeManager) SetNotificationHandler(fn NodeNotificationHandler) {
m.notificationHandler = fn
}
func (m *NodeManager) NotificationHandler() NodeNotificationHandler {
return m.notificationHandler
}
func makeDefaultExtra() []byte {
var clientInfo = struct {
Version uint

View File

@ -1,8 +1,11 @@
package geth_test
import (
"github.com/status-im/status-go/geth"
"os"
"testing"
"time"
"github.com/status-im/status-go/geth"
)
const (
@ -17,10 +20,26 @@ const (
whisperMessage5 = "test message 5 (K2 -> K1)"
)
func TestNodeSetup(t *testing.T) {
func TestMain(m *testing.M) {
// make sure you panic if node start signal is not received
signalRecieved := make(chan struct{}, 1)
abortPanic := make(chan bool, 1)
geth.PanicAfter(10*time.Second, abortPanic, "TestNodeSetup")
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
if jsonEvent == `{"type":"node.started","event":{}}` {
signalRecieved <- struct{}{}
}
})
err := geth.PrepareTestNode()
if err != nil {
t.Error(err)
panic(err)
return
}
<-signalRecieved // block and wait for either panic or successful signal
abortPanic <- true
os.Exit(m.Run())
}

View File

@ -46,11 +46,11 @@ func TestQueuedTransactions(t *testing.T) {
// make sure you panic if transaction complete doesn't return
completeQueuedTransaction := make(chan bool, 1)
geth.PanicAfter(20*time.Second, completeQueuedTransaction)
geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions")
// replace transaction notification handler
var txHash = common.Hash{}
geth.GetNodeManager().SetNotificationHandler(func(jsonEvent string) {
geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) {
var envelope geth.GethEvent
if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil {
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)

View File

@ -1,5 +1,10 @@
package geth
/*
#include <stddef.h>
#include <stdbool.h>
extern bool StatusServiceSignalEvent(const char *jsonEvent);
*/
import "C"
import (
"bytes"
@ -21,9 +26,24 @@ const (
testNodeSyncSeconds = 300
)
type NodeNotificationHandler func(jsonEvent string)
var notificationHandler NodeNotificationHandler = func(jsonEvent string) { // internal signal handler (used in tests)
glog.V(logger.Info).Infof("notification received (default notification handler): %s\n", jsonEvent)
}
func SetDefaultNodeNotificationHandler(fn NodeNotificationHandler) {
notificationHandler = fn
}
//export NotifyNode
func NotifyNode(jsonEvent *C.char) {
GetNodeManager().NotificationHandler()(C.GoString(jsonEvent))
notificationHandler(C.GoString(jsonEvent))
}
// export TriggerTestSignal
func TriggerTestSignal() {
C.StatusServiceSignalEvent(C.CString(`{"answer": 42}`))
}
func CopyFile(dst, src string) error {
@ -93,12 +113,20 @@ func PrepareTestNode() (err error) {
// start geth node and wait for it to initialize
// internally once.Do() is used, so call below is thread-safe
go CreateAndRunNode(dataDir, 8546) // to avoid conflicts with running react-native app, run on different port
time.Sleep(3 * time.Second)
CreateAndRunNode(dataDir, 8546) // to avoid conflicts with running react-native app, run on different port
manager = GetNodeManager()
if !manager.HasNode() {
panic("could not obtain geth node")
panic(ErrInvalidGethNode)
}
if !manager.HasClientRestartWrapper() {
panic(ErrInvalidGethNode)
}
if !manager.HasWhisperService() {
panic(ErrInvalidGethNode)
}
if !manager.HasLightEthereumService() {
panic(ErrInvalidGethNode)
}
manager.AddPeer("enode://409772c7dea96fa59a912186ad5bcdb5e51b80556b3fe447d940f99d9eaadb51d4f0ffedb68efad232b52475dd7bd59b51cee99968b3cc79e2d5684b33c4090c@139.162.166.59:30303")
@ -141,7 +169,7 @@ func PreprocessDataDir(dataDir string) (string, error) {
}
// PanicAfter throws panic() after waitSeconds, unless abort channel receives notification
func PanicAfter(waitSeconds time.Duration, abort chan bool) {
func PanicAfter(waitSeconds time.Duration, abort chan bool, desc string) {
// panic if function takes too long
timeout := make(chan bool, 1)
@ -155,7 +183,7 @@ func PanicAfter(waitSeconds time.Duration, abort chan bool) {
case <-abort:
return
case <-timeout:
panic("function takes to long, which generally means we are stuck")
panic("whatever you were doing takes toooo long: " + desc)
}
}()
}