adapt to new event package

This commit is contained in:
Felix Lange 2014-10-14 19:38:38 +02:00
parent 83a4b8b49b
commit 0aea5fc4a3
3 changed files with 96 additions and 131 deletions

View File

@ -11,9 +11,9 @@ import (
"github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethpipe" "github.com/ethereum/eth-go/ethpipe"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/event"
"github.com/ethereum/go-ethereum/utils" "github.com/ethereum/go-ethereum/utils"
"github.com/obscuren/otto" "github.com/obscuren/otto"
) )
@ -25,9 +25,8 @@ type JSRE struct {
Vm *otto.Otto Vm *otto.Otto
pipe *ethpipe.JSPipe pipe *ethpipe.JSPipe
blockChan chan ethreact.Event events event.Subscription
changeChan chan ethreact.Event quitChan chan bool
quitChan chan bool
objectCb map[string][]otto.Value objectCb map[string][]otto.Value
} }
@ -51,8 +50,7 @@ func NewJSRE(ethereum *eth.Ethereum) *JSRE {
ethereum, ethereum,
otto.New(), otto.New(),
ethpipe.NewJSPipe(ethereum), ethpipe.NewJSPipe(ethereum),
make(chan ethreact.Event, 10), nil,
make(chan ethreact.Event, 10),
make(chan bool), make(chan bool),
make(map[string][]otto.Value), make(map[string][]otto.Value),
} }
@ -68,8 +66,8 @@ func NewJSRE(ethereum *eth.Ethereum) *JSRE {
go re.mainLoop() go re.mainLoop()
// Subscribe to events // Subscribe to events
reactor := ethereum.Reactor() mux := ethereum.EventMux()
reactor.Subscribe("newBlock", re.blockChan) re.events = mux.Subscribe(ethchain.NewBlockEvent{})
re.Bind("eth", &JSEthereum{re.pipe, re.Vm, ethereum}) re.Bind("eth", &JSEthereum{re.pipe, re.Vm, ethereum})
@ -105,25 +103,16 @@ func (self *JSRE) Require(file string) error {
} }
func (self *JSRE) Stop() { func (self *JSRE) Stop() {
self.events.Unsubscribe()
// Kill the main loop // Kill the main loop
self.quitChan <- true self.quitChan <- true
close(self.blockChan)
close(self.quitChan) close(self.quitChan)
close(self.changeChan)
jsrelogger.Infoln("stopped") jsrelogger.Infoln("stopped")
} }
func (self *JSRE) mainLoop() { func (self *JSRE) mainLoop() {
out: for _ = range self.events.Chan() {
for {
select {
case <-self.quitChan:
break out
case block := <-self.blockChan:
if _, ok := block.Resource.(*ethchain.Block); ok {
}
}
} }
} }
@ -201,13 +190,13 @@ func (self *JSRE) watch(call otto.FunctionCall) otto.Value {
if storageCallback { if storageCallback {
self.objectCb[addr+storageAddr] = append(self.objectCb[addr+storageAddr], cb) self.objectCb[addr+storageAddr] = append(self.objectCb[addr+storageAddr], cb)
event := "storage:" + string(ethutil.Hex2Bytes(addr)) + ":" + string(ethutil.Hex2Bytes(storageAddr)) // event := "storage:" + string(ethutil.Hex2Bytes(addr)) + ":" + string(ethutil.Hex2Bytes(storageAddr))
self.ethereum.Reactor().Subscribe(event, self.changeChan) // self.ethereum.EventMux().Subscribe(event, self.changeChan)
} else { } else {
self.objectCb[addr] = append(self.objectCb[addr], cb) self.objectCb[addr] = append(self.objectCb[addr], cb)
event := "object:" + string(ethutil.Hex2Bytes(addr)) // event := "object:" + string(ethutil.Hex2Bytes(addr))
self.ethereum.Reactor().Subscribe(event, self.changeChan) // self.ethereum.EventMux().Subscribe(event, self.changeChan)
} }
return otto.UndefinedValue() return otto.UndefinedValue()

View File

@ -5,8 +5,8 @@ import (
"github.com/ethereum/eth-go/ethchain" "github.com/ethereum/eth-go/ethchain"
"github.com/ethereum/eth-go/ethpipe" "github.com/ethereum/eth-go/ethpipe"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethstate" "github.com/ethereum/eth-go/ethstate"
"github.com/ethereum/eth-go/event"
"github.com/ethereum/go-ethereum/javascript" "github.com/ethereum/go-ethereum/javascript"
"gopkg.in/qml.v1" "gopkg.in/qml.v1"
) )
@ -28,9 +28,7 @@ type ExtApplication struct {
*ethpipe.JSPipe *ethpipe.JSPipe
eth ethchain.EthManager eth ethchain.EthManager
blockChan chan ethreact.Event events event.Subscription
messageChan chan ethreact.Event
quitChan chan bool
watcherQuitChan chan bool watcherQuitChan chan bool
filters map[string]*ethchain.Filter filters map[string]*ethchain.Filter
@ -40,19 +38,14 @@ type ExtApplication struct {
} }
func NewExtApplication(container AppContainer, lib *UiLib) *ExtApplication { func NewExtApplication(container AppContainer, lib *UiLib) *ExtApplication {
app := &ExtApplication{ return &ExtApplication{
ethpipe.NewJSPipe(lib.eth), JSPipe: ethpipe.NewJSPipe(lib.eth),
lib.eth, eth: lib.eth,
make(chan ethreact.Event, 100), watcherQuitChan: make(chan bool),
make(chan ethreact.Event, 100), filters: make(map[string]*ethchain.Filter),
make(chan bool), container: container,
make(chan bool), lib: lib,
make(map[string]*ethchain.Filter),
container,
lib,
} }
return app
} }
func (app *ExtApplication) run() { func (app *ExtApplication) run() {
@ -67,14 +60,13 @@ func (app *ExtApplication) run() {
return return
} }
// Subscribe to events
mux := app.lib.eth.EventMux()
app.events = mux.Subscribe(ethchain.NewBlockEvent{}, ethstate.Messages(nil))
// Call the main loop // Call the main loop
go app.mainLoop() go app.mainLoop()
// Subscribe to events
reactor := app.lib.eth.Reactor()
reactor.Subscribe("newBlock", app.blockChan)
reactor.Subscribe("messages", app.messageChan)
app.container.NewWatcher(app.watcherQuitChan) app.container.NewWatcher(app.watcherQuitChan)
win := app.container.Window() win := app.container.Window()
@ -85,42 +77,29 @@ func (app *ExtApplication) run() {
} }
func (app *ExtApplication) stop() { func (app *ExtApplication) stop() {
// Clean up app.events.Unsubscribe()
reactor := app.lib.eth.Reactor()
reactor.Unsubscribe("newBlock", app.blockChan)
// Kill the main loop // Kill the main loop
app.quitChan <- true
app.watcherQuitChan <- true app.watcherQuitChan <- true
close(app.blockChan)
close(app.quitChan)
app.container.Destroy() app.container.Destroy()
} }
func (app *ExtApplication) mainLoop() { func (app *ExtApplication) mainLoop() {
out: for ev := range app.events.Chan() {
for { switch ev := ev.(type) {
select { case ethchain.NewBlockEvent:
case <-app.quitChan: app.container.NewBlock(ev.Block)
break out
case block := <-app.blockChan: case ethstate.Messages:
if block, ok := block.Resource.(*ethchain.Block); ok { for id, filter := range app.filters {
app.container.NewBlock(block) msgs := filter.FilterMessages(ev)
} if len(msgs) > 0 {
case msg := <-app.messageChan: app.container.Messages(msgs, id)
if messages, ok := msg.Resource.(ethstate.Messages); ok {
for id, filter := range app.filters {
msgs := filter.FilterMessages(messages)
if len(msgs) > 0 {
app.container.Messages(msgs, id)
}
} }
} }
} }
} }
} }
func (self *ExtApplication) Watch(filterOptions map[string]interface{}, identifier string) { func (self *ExtApplication) Watch(filterOptions map[string]interface{}, identifier string) {

View File

@ -19,7 +19,6 @@ import (
"github.com/ethereum/eth-go/ethlog" "github.com/ethereum/eth-go/ethlog"
"github.com/ethereum/eth-go/ethminer" "github.com/ethereum/eth-go/ethminer"
"github.com/ethereum/eth-go/ethpipe" "github.com/ethereum/eth-go/ethpipe"
"github.com/ethereum/eth-go/ethreact"
"github.com/ethereum/eth-go/ethutil" "github.com/ethereum/eth-go/ethutil"
"github.com/ethereum/eth-go/ethwire" "github.com/ethereum/eth-go/ethwire"
"gopkg.in/qml.v1" "gopkg.in/qml.v1"
@ -376,15 +375,6 @@ func (gui *Gui) update() {
gui.win.Root().Call("addPlugin", plugin.Path, "") gui.win.Root().Call("addPlugin", plugin.Path, "")
} }
var (
blockChan = make(chan ethreact.Event, 100)
txChan = make(chan ethreact.Event, 100)
objectChan = make(chan ethreact.Event, 100)
peerChan = make(chan ethreact.Event, 100)
chainSyncChan = make(chan ethreact.Event, 100)
miningChan = make(chan ethreact.Event, 100)
)
peerUpdateTicker := time.NewTicker(5 * time.Second) peerUpdateTicker := time.NewTicker(5 * time.Second)
generalUpdateTicker := time.NewTicker(500 * time.Millisecond) generalUpdateTicker := time.NewTicker(500 * time.Millisecond)
statsUpdateTicker := time.NewTicker(5 * time.Second) statsUpdateTicker := time.NewTicker(5 * time.Second)
@ -397,61 +387,82 @@ func (gui *Gui) update() {
lastBlockLabel := gui.getObjectByName("lastBlockLabel") lastBlockLabel := gui.getObjectByName("lastBlockLabel")
miningLabel := gui.getObjectByName("miningLabel") miningLabel := gui.getObjectByName("miningLabel")
events := gui.eth.EventMux().Subscribe(
eth.ChainSyncEvent{},
eth.PeerListEvent{},
ethchain.NewBlockEvent{},
ethchain.TxEvent{},
ethminer.Event{},
)
// nameReg := gui.pipe.World().Config().Get("NameReg")
// mux.Subscribe("object:"+string(nameReg.Address()), objectChan)
go func() { go func() {
defer events.Unsubscribe()
for { for {
select { select {
case b := <-blockChan: case ev, isopen := <-events.Chan():
block := b.Resource.(*ethchain.Block) if !isopen {
gui.processBlock(block, false) return
if bytes.Compare(block.Coinbase, gui.address()) == 0 {
gui.setWalletValue(gui.eth.StateManager().CurrentState().GetAccount(gui.address()).Balance, nil)
} }
case txMsg := <-txChan: switch ev := ev.(type) {
tx := txMsg.Resource.(*ethchain.Transaction) case ethchain.NewBlockEvent:
gui.processBlock(ev.Block, false)
if txMsg.Name == "newTx:pre" { if bytes.Compare(ev.Block.Coinbase, gui.address()) == 0 {
object := state.GetAccount(gui.address()) gui.setWalletValue(gui.eth.StateManager().CurrentState().GetAccount(gui.address()).Balance, nil)
if bytes.Compare(tx.Sender(), gui.address()) == 0 {
unconfirmedFunds.Sub(unconfirmedFunds, tx.Value)
} else if bytes.Compare(tx.Recipient, gui.address()) == 0 {
unconfirmedFunds.Add(unconfirmedFunds, tx.Value)
} }
gui.setWalletValue(object.Balance, unconfirmedFunds) case ethchain.TxEvent:
tx := ev.Tx
if ev.Type == ethchain.TxPre {
object := state.GetAccount(gui.address())
gui.insertTransaction("pre", tx) if bytes.Compare(tx.Sender(), gui.address()) == 0 {
} else { unconfirmedFunds.Sub(unconfirmedFunds, tx.Value)
object := state.GetAccount(gui.address()) } else if bytes.Compare(tx.Recipient, gui.address()) == 0 {
if bytes.Compare(tx.Sender(), gui.address()) == 0 { unconfirmedFunds.Add(unconfirmedFunds, tx.Value)
object.SubAmount(tx.Value) }
//gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "send") gui.setWalletValue(object.Balance, unconfirmedFunds)
gui.txDb.Put(tx.Hash(), tx.RlpEncode())
} else if bytes.Compare(tx.Recipient, gui.address()) == 0 {
object.AddAmount(tx.Value)
//gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "recv") gui.insertTransaction("pre", tx)
gui.txDb.Put(tx.Hash(), tx.RlpEncode())
} else if ev.Type == ethchain.TxPost {
object := state.GetAccount(gui.address())
if bytes.Compare(tx.Sender(), gui.address()) == 0 {
object.SubAmount(tx.Value)
//gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "send")
gui.txDb.Put(tx.Hash(), tx.RlpEncode())
} else if bytes.Compare(tx.Recipient, gui.address()) == 0 {
object.AddAmount(tx.Value)
//gui.getObjectByName("transactionView").Call("addTx", ethpipe.NewJSTx(tx), "recv")
gui.txDb.Put(tx.Hash(), tx.RlpEncode())
}
gui.setWalletValue(object.Balance, nil)
state.UpdateStateObject(object)
} }
gui.setWalletValue(object.Balance, nil) // case object:
// gui.loadAddressBook()
state.UpdateStateObject(object) case eth.PeerListEvent:
gui.setPeerInfo()
case ethminer.Event:
if ev.Type == ethminer.Started {
gui.miner = ev.Miner
} else {
gui.miner = nil
}
} }
case <-objectChan:
gui.loadAddressBook()
case <-peerChan:
gui.setPeerInfo()
case <-peerUpdateTicker.C: case <-peerUpdateTicker.C:
gui.setPeerInfo() gui.setPeerInfo()
case msg := <-miningChan:
if msg.Name == "miner:start" {
gui.miner = msg.Resource.(*ethminer.Miner)
} else {
gui.miner = nil
}
case <-generalUpdateTicker.C: case <-generalUpdateTicker.C:
statusText := "#" + gui.eth.BlockChain().CurrentBlock.Number.String() statusText := "#" + gui.eth.BlockChain().CurrentBlock.Number.String()
lastBlockLabel.Set("text", statusText) lastBlockLabel.Set("text", statusText)
@ -478,20 +489,6 @@ func (gui *Gui) update() {
} }
} }
}() }()
reactor := gui.eth.Reactor()
reactor.Subscribe("newBlock", blockChan)
reactor.Subscribe("newTx:pre", txChan)
reactor.Subscribe("newTx:post", txChan)
reactor.Subscribe("chainSync", chainSyncChan)
reactor.Subscribe("miner:start", miningChan)
reactor.Subscribe("miner:stop", miningChan)
nameReg := gui.pipe.World().Config().Get("NameReg")
reactor.Subscribe("object:"+string(nameReg.Address()), objectChan)
reactor.Subscribe("peerList", peerChan)
} }
func (gui *Gui) setStatsPane() { func (gui *Gui) setStatsPane() {