From b66188941da3751d923b3367a1321c8b765a5891 Mon Sep 17 00:00:00 2001 From: Victor Farazdagi Date: Thu, 15 Sep 2016 06:08:06 +0300 Subject: [PATCH] Non-blocking StartNode() --- cmd/status/library.go | 10 ----- geth/node.go | 100 +++++++++++++++++++++++++----------------- geth/node_test.go | 25 +++++++++-- geth/txqueue_test.go | 4 +- geth/utils.go | 40 ++++++++++++++--- 5 files changed, 117 insertions(+), 62 deletions(-) diff --git a/cmd/status/library.go b/cmd/status/library.go index 466d2a2d7..b56540409 100644 --- a/cmd/status/library.go +++ b/cmd/status/library.go @@ -1,10 +1,5 @@ package main -/* -#include -#include -extern bool StatusServiceSignalEvent(const char *jsonEvent); -*/ import "C" import ( "encoding/json" @@ -16,11 +11,6 @@ import ( "github.com/status-im/status-go/jail" ) -// export TriggerTestSignal -func TriggerTestSignal() { - C.StatusServiceSignalEvent(C.CString(`{"answer": 42}`)) -} - //export CreateAccount func CreateAccount(password *C.char) *C.char { // This is equivalent to creating an account from the command line, diff --git a/geth/node.go b/geth/node.go index 7fbde6b32..e35e0fda7 100644 --- a/geth/node.go +++ b/geth/node.go @@ -1,6 +1,13 @@ package geth +/* +#include +#include +extern bool StatusServiceSignalEvent( const char *jsonEvent ); +*/ +import "C" import ( + "encoding/json" "errors" "flag" "fmt" @@ -33,6 +40,8 @@ const ( versionOracle = "0xfa7b9770ca4cb04296cac84f37736d4041251cdf" // Ethereum address of the Geth release oracle RPCPort = 8545 // RPC port (replaced in unit tests) + + EventNodeStarted = "node.started" ) var ( @@ -45,17 +54,15 @@ var ( ErrNodeStartFailure = errors.New("could not create the in-memory node object") ) -type NodeNotificationHandler func(jsonEvent string) - type NodeManager struct { - currentNode *node.Node // currently running geth node - ctx *cli.Context // the CLI context used to start the geth node - lightEthereum *les.LightEthereum // LES service - accountManager *accounts.Manager // the account manager attached to the currentNode - SelectedAddress string // address of the account that was processed during the last call to SelectAccount() - whisperService *whisper.Whisper // Whisper service - client *rpc.ClientRestartWrapper // RPC client - notificationHandler NodeNotificationHandler // internal signal handler (used in tests) + currentNode *node.Node // currently running geth node + ctx *cli.Context // the CLI context used to start the geth node + lightEthereum *les.LightEthereum // LES service + accountManager *accounts.Manager // the account manager attached to the currentNode + SelectedAddress string // address of the account that was processed during the last call to SelectAccount() + whisperService *whisper.Whisper // Whisper service + client *rpc.ClientRestartWrapper // RPC client + nodeStarted chan struct{} // channel to wait for node to start } var ( @@ -67,9 +74,6 @@ func NewNodeManager(datadir string, rpcport int) *NodeManager { createOnce.Do(func() { nodeManagerInstance = &NodeManager{} nodeManagerInstance.MakeNode(datadir, rpcport) - nodeManagerInstance.SetNotificationHandler(func(jsonEvent string) { - glog.V(logger.Info).Infof("internal notification received: %s\n", jsonEvent) - }) }) return nodeManagerInstance @@ -80,12 +84,14 @@ func GetNodeManager() *NodeManager { } // createAndStartNode creates a node entity and starts the -// node running locally +// node running locally exposing given RPC port func CreateAndRunNode(datadir string, rpcport int) error { nodeManager := NewNodeManager(datadir, rpcport) if nodeManager.HasNode() { nodeManager.RunNode() + + <-nodeManager.nodeStarted // block until node is ready return nil } @@ -135,33 +141,53 @@ func (m *NodeManager) MakeNode(datadir string, rpcport int) *node.Node { } m.accountManager = m.currentNode.AccountManager() + m.nodeStarted = make(chan struct{}) return m.currentNode } // StartNode starts a geth node entity func (m *NodeManager) RunNode() { - utils.StartNode(m.currentNode) + go func() { + utils.StartNode(m.currentNode) - if m.currentNode.AccountManager() == nil { - glog.V(logger.Warn).Infoln("cannot get account manager") - } - if err := m.currentNode.Service(&m.whisperService); err != nil { - glog.V(logger.Warn).Infoln("cannot get whisper service:", err) - } - if err := m.currentNode.Service(&m.lightEthereum); err != nil { - glog.V(logger.Warn).Infoln("cannot get light ethereum service:", err) - } - m.lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest) - - m.client = rpc.NewClientRestartWrapper(func() *rpc.Client { - client, err := m.currentNode.Attach() - if err != nil { - return nil + if m.currentNode.AccountManager() == nil { + glog.V(logger.Warn).Infoln("cannot get account manager") } - return client - }) - m.currentNode.Wait() + if err := m.currentNode.Service(&m.whisperService); err != nil { + glog.V(logger.Warn).Infoln("cannot get whisper service:", err) + } + if err := m.currentNode.Service(&m.lightEthereum); err != nil { + glog.V(logger.Warn).Infoln("cannot get light ethereum service:", err) + } + m.lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest) + + m.client = rpc.NewClientRestartWrapper(func() *rpc.Client { + client, err := m.currentNode.Attach() + if err != nil { + return nil + } + return client + }) + + m.onNodeStarted() // node started, notify listeners + m.currentNode.Wait() + }() +} + +func (m *NodeManager) onNodeStarted() { + // notify local listener + m.nodeStarted <- struct{}{} + close(m.nodeStarted) + + // send signal up to native app + event := GethEvent{ + Type: EventNodeStarted, + Event: struct{}{}, + } + + body, _ := json.Marshal(&event) + C.StatusServiceSignalEvent(C.CString(string(body))) } func (m *NodeManager) AddPeer(url string) (bool, error) { @@ -251,14 +277,6 @@ func (m *NodeManager) ClientRestartWrapper() (*rpc.ClientRestartWrapper, error) return m.client, nil } -func (m *NodeManager) SetNotificationHandler(fn NodeNotificationHandler) { - m.notificationHandler = fn -} - -func (m *NodeManager) NotificationHandler() NodeNotificationHandler { - return m.notificationHandler -} - func makeDefaultExtra() []byte { var clientInfo = struct { Version uint diff --git a/geth/node_test.go b/geth/node_test.go index aa0c7b86a..a9959d425 100644 --- a/geth/node_test.go +++ b/geth/node_test.go @@ -1,8 +1,11 @@ package geth_test import ( - "github.com/status-im/status-go/geth" + "os" "testing" + "time" + + "github.com/status-im/status-go/geth" ) const ( @@ -17,10 +20,26 @@ const ( whisperMessage5 = "test message 5 (K2 -> K1)" ) -func TestNodeSetup(t *testing.T) { +func TestMain(m *testing.M) { + // make sure you panic if node start signal is not received + signalRecieved := make(chan struct{}, 1) + abortPanic := make(chan bool, 1) + geth.PanicAfter(10*time.Second, abortPanic, "TestNodeSetup") + + geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) { + if jsonEvent == `{"type":"node.started","event":{}}` { + signalRecieved <- struct{}{} + } + }) + err := geth.PrepareTestNode() if err != nil { - t.Error(err) + panic(err) return } + + <-signalRecieved // block and wait for either panic or successful signal + abortPanic <- true + + os.Exit(m.Run()) } diff --git a/geth/txqueue_test.go b/geth/txqueue_test.go index 10c2fb520..4f3d229fd 100644 --- a/geth/txqueue_test.go +++ b/geth/txqueue_test.go @@ -46,11 +46,11 @@ func TestQueuedTransactions(t *testing.T) { // make sure you panic if transaction complete doesn't return completeQueuedTransaction := make(chan bool, 1) - geth.PanicAfter(20*time.Second, completeQueuedTransaction) + geth.PanicAfter(20*time.Second, completeQueuedTransaction, "TestQueuedTransactions") // replace transaction notification handler var txHash = common.Hash{} - geth.GetNodeManager().SetNotificationHandler(func(jsonEvent string) { + geth.SetDefaultNodeNotificationHandler(func(jsonEvent string) { var envelope geth.GethEvent if err := json.Unmarshal([]byte(jsonEvent), &envelope); err != nil { t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent) diff --git a/geth/utils.go b/geth/utils.go index 8e698e1e5..4feeaaccb 100644 --- a/geth/utils.go +++ b/geth/utils.go @@ -1,5 +1,10 @@ package geth +/* +#include +#include +extern bool StatusServiceSignalEvent(const char *jsonEvent); +*/ import "C" import ( "bytes" @@ -21,9 +26,24 @@ const ( testNodeSyncSeconds = 300 ) +type NodeNotificationHandler func(jsonEvent string) + +var notificationHandler NodeNotificationHandler = func(jsonEvent string) { // internal signal handler (used in tests) + glog.V(logger.Info).Infof("notification received (default notification handler): %s\n", jsonEvent) +} + +func SetDefaultNodeNotificationHandler(fn NodeNotificationHandler) { + notificationHandler = fn +} + //export NotifyNode func NotifyNode(jsonEvent *C.char) { - GetNodeManager().NotificationHandler()(C.GoString(jsonEvent)) + notificationHandler(C.GoString(jsonEvent)) +} + +// export TriggerTestSignal +func TriggerTestSignal() { + C.StatusServiceSignalEvent(C.CString(`{"answer": 42}`)) } func CopyFile(dst, src string) error { @@ -93,12 +113,20 @@ func PrepareTestNode() (err error) { // start geth node and wait for it to initialize // internally once.Do() is used, so call below is thread-safe - go CreateAndRunNode(dataDir, 8546) // to avoid conflicts with running react-native app, run on different port - time.Sleep(3 * time.Second) + CreateAndRunNode(dataDir, 8546) // to avoid conflicts with running react-native app, run on different port manager = GetNodeManager() if !manager.HasNode() { - panic("could not obtain geth node") + panic(ErrInvalidGethNode) + } + if !manager.HasClientRestartWrapper() { + panic(ErrInvalidGethNode) + } + if !manager.HasWhisperService() { + panic(ErrInvalidGethNode) + } + if !manager.HasLightEthereumService() { + panic(ErrInvalidGethNode) } manager.AddPeer("enode://409772c7dea96fa59a912186ad5bcdb5e51b80556b3fe447d940f99d9eaadb51d4f0ffedb68efad232b52475dd7bd59b51cee99968b3cc79e2d5684b33c4090c@139.162.166.59:30303") @@ -141,7 +169,7 @@ func PreprocessDataDir(dataDir string) (string, error) { } // PanicAfter throws panic() after waitSeconds, unless abort channel receives notification -func PanicAfter(waitSeconds time.Duration, abort chan bool) { +func PanicAfter(waitSeconds time.Duration, abort chan bool, desc string) { // panic if function takes too long timeout := make(chan bool, 1) @@ -155,7 +183,7 @@ func PanicAfter(waitSeconds time.Duration, abort chan bool) { case <-abort: return case <-timeout: - panic("function takes to long, which generally means we are stuck") + panic("whatever you were doing takes toooo long: " + desc) } }() }