Merge pull request #14323 from fjl/ethash-verify-headers-fix

consensus/ethash: simplify concurrency in VerifyHeaders
Authored by Péter Szilágyi on 2017-04-13 10:32:28 +03:00, committed by GitHub
commit d5d910e8b6
2 changed files with 52 additions and 85 deletions

consensus/ethash/consensus.go

@@ -22,7 +22,6 @@ import (
 	"fmt"
 	"math/big"
 	"runtime"
-	"sync/atomic"
 	"time"

 	"github.com/ethereum/go-ethereum/common"
@@ -46,7 +45,6 @@ var (
 // codebase, inherently breaking if the engine is swapped out. Please put common
 // error types into the consensus package.
 var (
-	errInvalidChain   = errors.New("invalid header chain")
 	errLargeBlockTime = errors.New("timestamp too big")
 	errZeroBlockTime  = errors.New("timestamp equals parent's")
 	errTooManyUncles  = errors.New("too many uncles")
@@ -90,111 +88,80 @@ func (ethash *Ethash) VerifyHeader(chain consensus.ChainReader, header *types.He
 // a results channel to retrieve the async verifications.
 func (ethash *Ethash) VerifyHeaders(chain consensus.ChainReader, headers []*types.Header, seals []bool) (chan<- struct{}, <-chan error) {
 	// If we're running a full engine faking, accept any input as valid
-	if ethash.fakeFull {
+	if ethash.fakeFull || len(headers) == 0 {
 		abort, results := make(chan struct{}), make(chan error, len(headers))
 		for i := 0; i < len(headers); i++ {
 			results <- nil
 		}
 		return abort, results
 	}
 	// Spawn as many workers as allowed threads
 	workers := runtime.GOMAXPROCS(0)
 	if len(headers) < workers {
 		workers = len(headers)
 	}
-	// Create a task channel and spawn the verifiers
-	type result struct {
-		index int
-		err   error
-	}
-	inputs := make(chan int, workers)
-	outputs := make(chan result, len(headers))
-
-	var badblock uint64
+
+	// Create a task channel and spawn the verifiers
+	var (
+		inputs = make(chan int)
+		done   = make(chan int, workers)
+		errors = make([]error, len(headers))
+		abort  = make(chan struct{})
+	)
 	for i := 0; i < workers; i++ {
 		go func() {
 			for index := range inputs {
-				// If we've found a bad block already before this, stop validating
-				if bad := atomic.LoadUint64(&badblock); bad != 0 && bad <= headers[index].Number.Uint64() {
-					outputs <- result{index: index, err: errInvalidChain}
-					continue
-				}
-				// We need to look up the first parent
-				var parent *types.Header
-				if index == 0 {
-					parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
-				} else if headers[index-1].Hash() == headers[index].ParentHash {
-					parent = headers[index-1]
-				}
-				// Ensure the validation is useful and execute it
-				var failure error
-				switch {
-				case chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()-1) != nil:
-					outputs <- result{index: index, err: nil}
-				case parent == nil:
-					failure = consensus.ErrUnknownAncestor
-					outputs <- result{index: index, err: failure}
-				default:
-					failure = ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
-					outputs <- result{index: index, err: failure}
-				}
-				// If a validation failure occurred, mark subsequent blocks invalid
-				if failure != nil {
-					number := headers[index].Number.Uint64()
-					if prev := atomic.LoadUint64(&badblock); prev == 0 || prev > number {
-						// This two step atomic op isn't thread-safe in that `badblock` might end
-						// up slightly higher than the block number of the first failure (if many
-						// workers try to write at the same time), but it's fine as we're mostly
-						// interested to avoid large useless work, we don't care about 1-2 extra
-						// runs. Doing "full thread safety" would involve mutexes, which would be
-						// a noticeable sync overhead on the fast spinning worker routines.
-						atomic.StoreUint64(&badblock, number)
-					}
-				}
+				errors[index] = ethash.verifyHeaderWorker(chain, headers, seals, index)
+				done <- index
 			}
 		}()
 	}
-	// Feed item indices to the workers until done, sorting and feeding the results to the caller
-	dones := make([]bool, len(headers))
-	errors := make([]error, len(headers))
-
-	abort := make(chan struct{})
-	returns := make(chan error, len(headers))
-
+
+	errorsOut := make(chan error, len(headers))
 	go func() {
 		defer close(inputs)
-
-		input, output := 0, 0
-		for i := 0; i < len(headers)*2; i++ {
-			var res result
-
-			// If there are tasks left, push to workers
-			if input < len(headers) {
-				select {
-				case inputs <- input:
-					input++
-					continue
-				case <-abort:
-					return
-				case res = <-outputs:
-				}
-			} else {
-				// Otherwise keep waiting for results
-				select {
-				case <-abort:
-					return
-				case res = <-outputs:
-				}
-			}
-			// A result arrived, save and propagate if next
-			dones[res.index], errors[res.index] = true, res.err
-			for output < len(headers) && dones[output] {
-				returns <- errors[output]
-				output++
+		var (
+			in, out = 0, 0
+			checked = make([]bool, len(headers))
+			inputs  = inputs
+		)
+		for {
+			select {
+			case inputs <- in:
+				if in++; in == len(headers) {
+					// Reached end of headers. Stop sending to workers.
+					inputs = nil
+				}
+			case index := <-done:
+				for checked[index] = true; checked[out]; out++ {
+					errorsOut <- errors[out]
+					if out == len(headers)-1 {
+						return
+					}
+				}
+			case <-abort:
+				return
 			}
 		}
 	}()
-	return abort, returns
+	return abort, errorsOut
+}
+
+func (ethash *Ethash) verifyHeaderWorker(chain consensus.ChainReader, headers []*types.Header, seals []bool, index int) error {
+	var parent *types.Header
+	if index == 0 {
+		parent = chain.GetHeader(headers[0].ParentHash, headers[0].Number.Uint64()-1)
+	} else if headers[index-1].Hash() == headers[index].ParentHash {
+		parent = headers[index-1]
+	}
+	if parent == nil {
+		return consensus.ErrUnknownAncestor
+	}
+	if chain.GetHeader(headers[index].Hash(), headers[index].Number.Uint64()) != nil {
+		return nil // known block
+	}
+	return ethash.verifyHeader(chain, headers[index], parent, false, seals[index])
 }

 // VerifyUncles verifies that the given block's uncles conform to the consensus
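
The interesting part of the new dispatcher is the combination of two Go idioms: the send case of the select is disabled by setting the channel variable to nil once every index has been handed out (a send on a nil channel blocks forever, so that case simply goes dormant), and results that complete out of order are re-serialized through a checked bitmap plus an advancing cursor. Below is a minimal, self-contained sketch of that pattern outside go-ethereum; the task slice, the sleep-based "work", and the doubling are illustrative stand-ins, not anything from the real code.

package main

import (
	"fmt"
	"time"
)

func main() {
	tasks := []int{50, 40, 30, 20, 10} // later tasks finish first on purpose

	var (
		inputs  = make(chan int)
		done    = make(chan int, 4)
		results = make([]int, len(tasks))
		abort   = make(chan struct{})
	)
	// Workers: pull an index, do the "work", signal completion.
	for w := 0; w < 4; w++ {
		go func() {
			for index := range inputs {
				time.Sleep(time.Duration(tasks[index]) * time.Millisecond)
				results[index] = tasks[index] * 2
				done <- index
			}
		}()
	}
	out := make(chan int, len(tasks))
	go func() {
		defer close(inputs)
		var (
			in, next = 0, 0
			checked  = make([]bool, len(tasks))
			inputs   = inputs // shadow the channel so it can be nil-ed locally
		)
		for {
			select {
			case inputs <- in:
				if in++; in == len(tasks) {
					inputs = nil // all tasks scheduled: this send case goes dormant
				}
			case index := <-done:
				// Flush every contiguously finished result, in input order.
				for checked[index] = true; next < len(tasks) && checked[next]; next++ {
					out <- results[next]
				}
				if next == len(tasks) {
					return
				}
			case <-abort:
				return
			}
		}
	}()
	for i := 0; i < len(tasks); i++ {
		fmt.Println(<-out) // 100 80 60 40 20: input order despite reversed completion
	}
}

In VerifyHeaders itself, abort is handed back to the caller, who closes it to cancel verification early; errorsOut plays the role that out plays in this sketch.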

core/dao_test.go

@@ -62,7 +62,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 		gspec.MustCommit(db)
 		bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})

-		blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1))
+		blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
 		for j := 0; j < len(blocks)/2; j++ {
 			blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
 		}
@@ -83,7 +83,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 		gspec.MustCommit(db)
 		bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})

-		blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1))
+		blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
 		for j := 0; j < len(blocks)/2; j++ {
 			blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
 		}
@@ -105,7 +105,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 	gspec.MustCommit(db)
 	bc, _ := NewBlockChain(db, conConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})

-	blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()+1))
+	blocks := conBc.GetBlocksFromHash(conBc.CurrentBlock().Hash(), int(conBc.CurrentBlock().NumberU64()))
 	for j := 0; j < len(blocks)/2; j++ {
 		blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
 	}
@@ -121,7 +121,7 @@ func TestDAOForkRangeExtradata(t *testing.T) {
 	gspec.MustCommit(db)
 	bc, _ = NewBlockChain(db, proConf, ethash.NewFaker(), new(event.TypeMux), vm.Config{})

-	blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()+1))
+	blocks = proBc.GetBlocksFromHash(proBc.CurrentBlock().Hash(), int(proBc.CurrentBlock().NumberU64()))
 	for j := 0; j < len(blocks)/2; j++ {
 		blocks[j], blocks[len(blocks)-1-j] = blocks[len(blocks)-1-j], blocks[j]
 	}