miner: remove contention on currentMu for pending data retrievals (#16497)

This commit is contained in:
Ryan Schneider 2018-04-16 00:56:20 -07:00 committed by Péter Szilágyi
parent 60516c83b0
commit 2a1fc3d155
1 changed files with 33 additions and 18 deletions

View File

@ -117,6 +117,10 @@ type worker struct {
currentMu sync.Mutex currentMu sync.Mutex
current *Work current *Work
snapshotMu sync.RWMutex
snapshotBlock *types.Block
snapshotState *state.StateDB
uncleMu sync.Mutex uncleMu sync.Mutex
possibleUncles map[common.Hash]*types.Block possibleUncles map[common.Hash]*types.Block
@ -171,32 +175,28 @@ func (self *worker) setExtra(extra []byte) {
} }
func (self *worker) pending() (*types.Block, *state.StateDB) { func (self *worker) pending() (*types.Block, *state.StateDB) {
if atomic.LoadInt32(&self.mining) == 0 {
// return a snapshot to avoid contention on currentMu mutex
self.snapshotMu.RLock()
defer self.snapshotMu.RUnlock()
return self.snapshotBlock, self.snapshotState.Copy()
}
self.currentMu.Lock() self.currentMu.Lock()
defer self.currentMu.Unlock() defer self.currentMu.Unlock()
if atomic.LoadInt32(&self.mining) == 0 {
return types.NewBlock(
self.current.header,
self.current.txs,
nil,
self.current.receipts,
), self.current.state.Copy()
}
return self.current.Block, self.current.state.Copy() return self.current.Block, self.current.state.Copy()
} }
func (self *worker) pendingBlock() *types.Block { func (self *worker) pendingBlock() *types.Block {
if atomic.LoadInt32(&self.mining) == 0 {
// return a snapshot to avoid contention on currentMu mutex
self.snapshotMu.RLock()
defer self.snapshotMu.RUnlock()
return self.snapshotBlock
}
self.currentMu.Lock() self.currentMu.Lock()
defer self.currentMu.Unlock() defer self.currentMu.Unlock()
if atomic.LoadInt32(&self.mining) == 0 {
return types.NewBlock(
self.current.header,
self.current.txs,
nil,
self.current.receipts,
)
}
return self.current.Block return self.current.Block
} }
@ -268,6 +268,7 @@ func (self *worker) update() {
txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs) txset := types.NewTransactionsByPriceAndNonce(self.current.signer, txs)
self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase) self.current.commitTransactions(self.mux, txset, self.chain, self.coinbase)
self.updateSnapshot()
self.currentMu.Unlock() self.currentMu.Unlock()
} else { } else {
// If we're mining, but nothing is being processed, wake on new transactions // If we're mining, but nothing is being processed, wake on new transactions
@ -489,6 +490,7 @@ func (self *worker) commitNewWork() {
self.unconfirmed.Shift(work.Block.NumberU64() - 1) self.unconfirmed.Shift(work.Block.NumberU64() - 1)
} }
self.push(work) self.push(work)
self.updateSnapshot()
} }
func (self *worker) commitUncle(work *Work, uncle *types.Header) error { func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
@ -506,6 +508,19 @@ func (self *worker) commitUncle(work *Work, uncle *types.Header) error {
return nil return nil
} }
func (self *worker) updateSnapshot() {
self.snapshotMu.Lock()
defer self.snapshotMu.Unlock()
self.snapshotBlock = types.NewBlock(
self.current.header,
self.current.txs,
nil,
self.current.receipts,
)
self.snapshotState = self.current.state.Copy()
}
func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) { func (env *Work) commitTransactions(mux *event.TypeMux, txs *types.TransactionsByPriceAndNonce, bc *core.BlockChain, coinbase common.Address) {
gp := new(core.GasPool).AddGas(env.header.GasLimit) gp := new(core.GasPool).AddGas(env.header.GasLimit)