diff --git a/miner/agent.go b/miner/agent.go index 95d835bd7..e922ea153 100644 --- a/miner/agent.go +++ b/miner/agent.go @@ -27,10 +27,10 @@ import ( type CpuAgent struct { mu sync.Mutex - workCh chan *Work + taskCh chan *Package + returnCh chan<- *Package stop chan struct{} quitCurrentOp chan struct{} - returnCh chan<- *Result chain consensus.ChainReader engine consensus.Engine @@ -43,13 +43,17 @@ func NewCpuAgent(chain consensus.ChainReader, engine consensus.Engine) *CpuAgent chain: chain, engine: engine, stop: make(chan struct{}, 1), - workCh: make(chan *Work, 1), + taskCh: make(chan *Package, 1), } return agent } -func (self *CpuAgent) Work() chan<- *Work { return self.workCh } -func (self *CpuAgent) SetReturnCh(ch chan<- *Result) { self.returnCh = ch } +func (self *CpuAgent) AssignTask(p *Package) { + if atomic.LoadInt32(&self.started) == 1 { + self.taskCh <- p + } +} +func (self *CpuAgent) DeliverTo(ch chan<- *Package) { self.returnCh = ch } func (self *CpuAgent) Start() { if !atomic.CompareAndSwapInt32(&self.started, 0, 1) { @@ -67,7 +71,7 @@ done: // Empty work channel for { select { - case <-self.workCh: + case <-self.taskCh: default: break done } @@ -78,13 +82,13 @@ func (self *CpuAgent) update() { out: for { select { - case work := <-self.workCh: + case p := <-self.taskCh: self.mu.Lock() if self.quitCurrentOp != nil { close(self.quitCurrentOp) } self.quitCurrentOp = make(chan struct{}) - go self.mine(work, self.quitCurrentOp) + go self.mine(p, self.quitCurrentOp) self.mu.Unlock() case <-self.stop: self.mu.Lock() @@ -98,10 +102,11 @@ out: } } -func (self *CpuAgent) mine(work *Work, stop <-chan struct{}) { - if result, err := self.engine.Seal(self.chain, work.Block, stop); result != nil { - log.Info("Successfully sealed new block", "number", result.Number(), "hash", result.Hash()) - self.returnCh <- &Result{work, result} +func (self *CpuAgent) mine(p *Package, stop <-chan struct{}) { + var err error + if p.Block, err = self.engine.Seal(self.chain, p.Block, stop); p.Block != nil { + log.Info("Successfully sealed new block", "number", p.Block.Number(), "hash", p.Block.Hash()) + self.returnCh <- p } else { if err != nil { log.Warn("Block sealing failed", "err", err) diff --git a/miner/worker.go b/miner/worker.go index f1194fa18..ae695f019 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -51,17 +51,16 @@ const ( chainSideChanSize = 10 ) -// Agent can register themself with the worker +// Agent can register themselves with the worker type Agent interface { - Work() chan<- *Work - SetReturnCh(chan<- *Result) + AssignTask(*Package) + DeliverTo(chan<- *Package) Start() Stop() } -// Work is the workers current environment and holds -// all of the current state information -type Work struct { +// Env is the workers current environment and holds all of the current state information. +type Env struct { config *params.ChainConfig signer types.Signer @@ -72,8 +71,6 @@ type Work struct { tcount int // tx count in cycle gasPool *core.GasPool // available gas used to pack transactions - Block *types.Block // the new block - header *types.Header txs []*types.Transaction receipts []*types.Receipt @@ -81,9 +78,11 @@ type Work struct { createdAt time.Time } -type Result struct { - Work *Work - Block *types.Block +// Package contains all information for consensus engine sealing and result submitting. +type Package struct { + Receipts []*types.Receipt + State *state.StateDB + Block *types.Block } // worker is the main object which takes care of applying messages to the new state @@ -103,7 +102,7 @@ type worker struct { chainSideSub event.Subscription agents map[Agent]struct{} - recv chan *Result + recv chan *Package eth Backend chain *core.BlockChain @@ -114,7 +113,7 @@ type worker struct { extra []byte currentMu sync.Mutex - current *Work + current *Env snapshotMu sync.RWMutex snapshotBlock *types.Block @@ -126,7 +125,6 @@ type worker struct { unconfirmed *unconfirmedBlocks // set of locally mined blocks pending canonicalness confirmations // atomic status counters - atWork int32 // The number of in-flight consensus engine work. running int32 // The indicator whether the consensus engine is running or not. } @@ -140,7 +138,7 @@ func newWorker(config *params.ChainConfig, engine consensus.Engine, eth Backend, chainHeadCh: make(chan core.ChainHeadEvent, chainHeadChanSize), chainSideCh: make(chan core.ChainSideEvent, chainSideChanSize), chainDb: eth.ChainDb(), - recv: make(chan *Result, resultQueueSize), + recv: make(chan *Package, resultQueueSize), chain: eth.BlockChain(), proc: eth.BlockChain().Validator(), possibleUncles: make(map[common.Hash]*types.Block), @@ -203,7 +201,6 @@ func (self *worker) stop() { for agent := range self.agents { agent.Stop() } - atomic.StoreInt32(&self.atWork, 0) } func (self *worker) isRunning() bool { @@ -214,7 +211,7 @@ func (self *worker) register(agent Agent) { self.mu.Lock() defer self.mu.Unlock() self.agents[agent] = struct{}{} - agent.SetReturnCh(self.recv) + agent.DeliverTo(self.recv) if self.isRunning() { agent.Start() } @@ -284,26 +281,24 @@ func (self *worker) update() { func (self *worker) wait() { for { for result := range self.recv { - atomic.AddInt32(&self.atWork, -1) if result == nil { continue } block := result.Block - work := result.Work // Update the block hash in all logs since it is now available and not when the // receipt/log of individual transactions were created. - for _, r := range work.receipts { + for _, r := range result.Receipts { for _, l := range r.Logs { l.BlockHash = block.Hash() } } - for _, log := range work.state.Logs() { + for _, log := range result.State.Logs() { log.BlockHash = block.Hash() } self.currentMu.Lock() - stat, err := self.chain.WriteBlockWithState(block, work.receipts, work.state) + stat, err := self.chain.WriteBlockWithState(block, result.Receipts, result.State) self.currentMu.Unlock() if err != nil { log.Error("Failed writing block to chain", "err", err) @@ -313,7 +308,7 @@ func (self *worker) wait() { self.mux.Post(core.NewMinedBlockEvent{Block: block}) var ( events []interface{} - logs = work.state.Logs() + logs = result.State.Logs() ) events = append(events, core.ChainEvent{Block: block, Hash: block.Hash(), Logs: logs}) if stat == core.CanonStatTy { @@ -328,12 +323,9 @@ func (self *worker) wait() { } // push sends a new work task to currently live miner agents. -func (self *worker) push(work *Work) { +func (self *worker) push(p *Package) { for agent := range self.agents { - atomic.AddInt32(&self.atWork, 1) - if ch := agent.Work(); ch != nil { - ch <- work - } + agent.AssignTask(p) } } @@ -343,7 +335,7 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error if err != nil { return err } - work := &Work{ + env := &Env{ config: self.config, signer: types.NewEIP155Signer(self.config.ChainID), state: state, @@ -357,15 +349,15 @@ func (self *worker) makeCurrent(parent *types.Block, header *types.Header) error // when 08 is processed ancestors contain 07 (quick block) for _, ancestor := range self.chain.GetBlocksFromHash(parent.Hash(), 7) { for _, uncle := range ancestor.Uncles() { - work.family.Add(uncle.Hash()) + env.family.Add(uncle.Hash()) } - work.family.Add(ancestor.Hash()) - work.ancestors.Add(ancestor.Hash()) + env.family.Add(ancestor.Hash()) + env.ancestors.Add(ancestor.Hash()) } // Keep track of transactions which return errors so they can be removed - work.tcount = 0 - self.current = work + env.tcount = 0 + self.current = env return nil } @@ -431,9 +423,9 @@ func (self *worker) commitNewWork() { return } // Create the current work task and check any fork transitions needed - work := self.current + env := self.current if self.config.DAOForkSupport && self.config.DAOForkBlock != nil && self.config.DAOForkBlock.Cmp(header.Number) == 0 { - misc.ApplyDAOHardFork(work.state) + misc.ApplyDAOHardFork(env.state) } // compute uncles for the new block. @@ -445,7 +437,7 @@ func (self *worker) commitNewWork() { if len(uncles) == 2 { break } - if err := self.commitUncle(work, uncle.Header()); err != nil { + if err := self.commitUncle(env, uncle.Header()); err != nil { log.Trace("Bad uncle found and will be removed", "hash", hash) log.Trace(fmt.Sprint(uncle)) @@ -459,17 +451,23 @@ func (self *worker) commitNewWork() { delete(self.possibleUncles, hash) } + var ( + emptyBlock *types.Block + fullBlock *types.Block + ) + // Create an empty block based on temporary copied state for sealing in advance without waiting block // execution finished. - if work.Block, err = self.engine.Finalize(self.chain, header, work.state.Copy(), nil, uncles, nil); err != nil { + emptyState := env.state.Copy() + if emptyBlock, err = self.engine.Finalize(self.chain, header, emptyState, nil, uncles, nil); err != nil { log.Error("Failed to finalize block for temporary sealing", "err", err) } else { // Push empty work in advance without applying pending transaction. // The reason is transactions execution can cost a lot and sealer need to // take advantage of this part time. if self.isRunning() { - log.Info("Commit new empty mining work", "number", work.Block.Number(), "uncles", len(uncles)) - self.push(work) + log.Info("Commit new empty mining work", "number", emptyBlock.Number(), "uncles", len(uncles)) + self.push(&Package{nil, emptyState, emptyBlock}) } } @@ -480,34 +478,34 @@ func (self *worker) commitNewWork() { return } txs := types.NewTransactionsByPriceAndNonce(self.current.signer, pending) - work.commitTransactions(self.mux, txs, self.chain, self.coinbase) + env.commitTransactions(self.mux, txs, self.chain, self.coinbase) // Create the full block to seal with the consensus engine - if work.Block, err = self.engine.Finalize(self.chain, header, work.state, work.txs, uncles, work.receipts); err != nil { + if fullBlock, err = self.engine.Finalize(self.chain, header, env.state, env.txs, uncles, env.receipts); err != nil { log.Error("Failed to finalize block for sealing", "err", err) return } // We only care about logging if we're actually mining. if self.isRunning() { - log.Info("Commit new full mining work", "number", work.Block.Number(), "txs", work.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) - self.unconfirmed.Shift(work.Block.NumberU64() - 1) - self.push(work) + log.Info("Commit new full mining work", "number", fullBlock.Number(), "txs", env.tcount, "uncles", len(uncles), "elapsed", common.PrettyDuration(time.Since(tstart))) + self.unconfirmed.Shift(fullBlock.NumberU64() - 1) + self.push(&Package{env.receipts, env.state, fullBlock}) } self.updateSnapshot() } -func (self *worker) commitUncle(work *Work, uncle *types.Header) error { +func (self *worker) commitUncle(env *Env, uncle *types.Header) error { hash := uncle.Hash() - if work.uncles.Contains(hash) { + if env.uncles.Contains(hash) { return fmt.Errorf("uncle not unique") } - if !work.ancestors.Contains(uncle.ParentHash) { + if !env.ancestors.Contains(uncle.ParentHash) { return fmt.Errorf("uncle's parent unknown (%x)", uncle.ParentHash[0:4]) } - if work.family.Contains(hash) { + if env.family.Contains(hash) { return fmt.Errorf("uncle already in family (%x)", hash) } - work.uncles.Add(uncle.Hash()) + env.uncles.Add(uncle.Hash()) return nil } @@ -533,7 +531,7 @@ func (self *worker) updateSnapshot() { self.snapshotState = self.current.state.Copy() } -func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { +func (env *Env) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { if env.gasPool == nil { env.gasPool = new(core.GasPool).AddGas(env.header.GasLimit) } @@ -618,7 +616,7 @@ func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsB } } -func (env *Work) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) { +func (env *Env) commitTransaction(tx *types.Transaction, bc *core.BlockChain, coinbase common.Address, gp *core.GasPool) (error, []*types.Log) { snap := env.state.Snapshot() receipt, _, err := core.ApplyTransaction(env.config, bc, &coinbase, gp, env.state, env.header, tx, &env.header.GasUsed, vm.Config{})