From 7db7109a5b53c339f00e9c05ac826b3dbd1f98e1 Mon Sep 17 00:00:00 2001 From: zsfelfoldi Date: Wed, 13 Jan 2016 19:35:48 +0100 Subject: [PATCH] cmd, eth: added light client and light server modes --- cmd/geth/chaincmd.go | 2 +- cmd/geth/main.go | 30 ++++++++++++ cmd/geth/usage.go | 4 ++ cmd/utils/flags.go | 58 +++++++++++++++++++++-- contracts/release/release.go | 16 +++++-- eth/backend.go | 61 +++++++++++++++++++----- eth/bind.go | 8 ++-- eth/downloader/downloader.go | 27 ++++++++--- eth/downloader/downloader_test.go | 2 +- eth/filters/api.go | 18 +++++--- eth/filters/filter.go | 77 ++++++++++++++++++------------- eth/filters/filter_system_test.go | 33 +++++++++++++ eth/filters/filter_test.go | 36 ++++++++------- eth/handler.go | 10 +++- eth/handler_test.go | 2 +- eth/helper_test.go | 2 +- ethdb/database.go | 6 ++- 17 files changed, 298 insertions(+), 94 deletions(-) diff --git a/cmd/geth/chaincmd.go b/cmd/geth/chaincmd.go index c1bbbd8dc..7a9cf4ac2 100644 --- a/cmd/geth/chaincmd.go +++ b/cmd/geth/chaincmd.go @@ -180,7 +180,7 @@ func exportChain(ctx *cli.Context) error { func removeDB(ctx *cli.Context) error { stack := utils.MakeNode(ctx, clientIdentifier, gitCommit) - dbdir := stack.ResolvePath("chaindata") + dbdir := stack.ResolvePath(utils.ChainDbName(ctx)) if !common.FileExist(dbdir) { fmt.Println(dbdir, "does not exist") return nil diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 551705208..75abdc813 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -41,6 +41,7 @@ import ( "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/discover" "gopkg.in/urfave/cli.v1" ) @@ -118,6 +119,10 @@ participating. utils.KeyStoreDirFlag, utils.OlympicFlag, utils.FastSyncFlag, + utils.LightModeFlag, + utils.NoDefSrvFlag, + utils.LightServFlag, + utils.LightPeersFlag, utils.LightKDFFlag, utils.CacheFlag, utils.TrieCacheGenFlag, @@ -280,6 +285,31 @@ func startNode(ctx *cli.Context, stack *node.Node) { // Start up the node itself utils.StartNode(stack) + if ctx.GlobalBool(utils.LightModeFlag.Name) && !ctx.GlobalBool(utils.NoDefSrvFlag.Name) { + // add default light server; test phase only + addPeer := func(url string) { + node, err := discover.ParseNode(url) + if err == nil { + stack.Server().AddPeer(node) + } + } + + if ctx.GlobalBool(utils.TestNetFlag.Name) { + // TestNet (John Gerryts @phonikg) + addPeer("enode://d72af45ba9b60851a8077a4eb07700484b585e5f2e55024e0c93b7ec7d114f2e3fa3c8f3a3358f89da00a609f5a062415deb857ada863b8cdad02b0b0bc90da3@50.112.52.169:30301") + } else { + if ctx.GlobalBool(utils.OpposeDAOFork.Name) { + // Classic (Azure) + addPeer("enode://fc3d7b57e5d317946bf421411632ec98d5ffcbf94548cd7bc10088e4fef176670f8ec70280d301a9d0b22fe498203f62b323da15b3acc18b02a1fee2a06b7d3f@40.118.3.223:30305") + } else { + // MainNet (Azure) + addPeer("enode://feaf206a308a669a789be45f4dadcb351246051727f12415ad69e44f8080daf0569c10fe1d9944d245dd1f3e1c89cedda8ce03d7e3d5ed8975a35cad4b4f7ec1@40.118.3.223:30303") + // MainNet (John Gerryts @phonikg) + addPeer("enode://02b80f0d47c7c157c069d0584067a284cdf188b9267666234b872e70d936a803ad20ea27f78ef1fd6425ae4b7108907e1875adbca96b038004114ac4d1e529a3@50.112.52.169:30300") + } + } + } + // Unlock any account specifically requested accman := stack.AccountManager() passwords := utils.MakePasswordList(ctx) diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 4c4e27630..75560d017 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -72,6 +72,10 @@ var AppHelpFlagGroups = []flagGroup{ utils.DevModeFlag, utils.IdentityFlag, utils.FastSyncFlag, + utils.LightModeFlag, + utils.NoDefSrvFlag, + utils.LightServFlag, + utils.LightPeersFlag, utils.LightKDFFlag, }, }, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index edb0873f6..40d96a3b5 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -39,6 +39,8 @@ import ( "github.com/ethereum/go-ethereum/eth" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/les" + "github.com/ethereum/go-ethereum/light" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/metrics" @@ -145,6 +147,24 @@ var ( Name: "fast", Usage: "Enable fast syncing through state downloads", } + LightModeFlag = cli.BoolFlag{ + Name: "light", + Usage: "Enable light client mode", + } + NoDefSrvFlag = cli.BoolFlag{ + Name: "nodefsrv", + Usage: "Don't add default LES server (only for test version)", + } + LightServFlag = cli.IntFlag{ + Name: "lightserv", + Usage: "Maximum percentage of time allowed for serving LES requests (0-90)", + Value: 0, + } + LightPeersFlag = cli.IntFlag{ + Name: "lightpeers", + Usage: "Maximum number of LES client peers", + Value: 20, + } LightKDFFlag = cli.BoolFlag{ Name: "lightkdf", Usage: "Reduce key-derivation RAM & CPU usage at some expense of KDF strength", @@ -680,6 +700,11 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) { Etherbase: MakeEtherbase(stack.AccountManager(), ctx), ChainConfig: MakeChainConfig(ctx, stack), FastSync: ctx.GlobalBool(FastSyncFlag.Name), + LightMode: ctx.GlobalBool(LightModeFlag.Name), + NoDefSrv: ctx.GlobalBool(NoDefSrvFlag.Name), + LightServ: ctx.GlobalInt(LightServFlag.Name), + LightPeers: ctx.GlobalInt(LightPeersFlag.Name), + MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name), DatabaseCache: ctx.GlobalInt(CacheFlag.Name), DatabaseHandles: MakeDatabaseHandles(), NetworkId: ctx.GlobalInt(NetworkIdFlag.Name), @@ -714,6 +739,7 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) { } ethConf.Genesis = core.TestNetGenesisBlock() state.StartingNonce = 1048576 // (2**20) + light.StartingNonce = 1048576 // (2**20) case ctx.GlobalBool(DevModeFlag.Name): ethConf.Genesis = core.OlympicGenesisBlock() @@ -727,10 +753,23 @@ func RegisterEthService(ctx *cli.Context, stack *node.Node, extra []byte) { state.MaxTrieCacheGen = uint16(gen) } - if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { - return eth.New(ctx, ethConf) - }); err != nil { - Fatalf("Failed to register the Ethereum service: %v", err) + if ethConf.LightMode { + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + return les.New(ctx, ethConf) + }); err != nil { + Fatalf("Failed to register the Ethereum light node service: %v", err) + } + } else { + if err := stack.Register(func(ctx *node.ServiceContext) (node.Service, error) { + fullNode, err := eth.New(ctx, ethConf) + if fullNode != nil && ethConf.LightServ > 0 { + ls, _ := les.NewLesServer(fullNode, ethConf) + fullNode.AddLesServer(ls) + } + return fullNode, err + }); err != nil { + Fatalf("Failed to register the Ethereum full node service: %v", err) + } } } @@ -830,14 +869,23 @@ func MakeChainConfigFromDb(ctx *cli.Context, db ethdb.Database) *core.ChainConfi return config } +func ChainDbName(ctx *cli.Context) string { + if ctx.GlobalBool(LightModeFlag.Name) { + return "lightchaindata" + } else { + return "chaindata" + } +} + // MakeChainDatabase open an LevelDB using the flags passed to the client and will hard crash if it fails. func MakeChainDatabase(ctx *cli.Context, stack *node.Node) ethdb.Database { var ( cache = ctx.GlobalInt(CacheFlag.Name) handles = MakeDatabaseHandles() + name = ChainDbName(ctx) ) - chainDb, err := stack.OpenDatabase("chaindata", cache, handles) + chainDb, err := stack.OpenDatabase(name, cache, handles) if err != nil { Fatalf("Could not open database: %v", err) } diff --git a/contracts/release/release.go b/contracts/release/release.go index 5a6665dba..8d1710197 100644 --- a/contracts/release/release.go +++ b/contracts/release/release.go @@ -27,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/eth" + "github.com/ethereum/go-ethereum/internal/ethapi" + "github.com/ethereum/go-ethereum/les" "github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/node" @@ -60,12 +62,20 @@ type ReleaseService struct { // releases and notify the user of such. func NewReleaseService(ctx *node.ServiceContext, config Config) (node.Service, error) { // Retrieve the Ethereum service dependency to access the blockchain + var apiBackend ethapi.Backend var ethereum *eth.Ethereum - if err := ctx.Service(ðereum); err != nil { - return nil, err + if err := ctx.Service(ðereum); err == nil { + apiBackend = ethereum.ApiBackend + } else { + var ethereum *les.LightEthereum + if err := ctx.Service(ðereum); err == nil { + apiBackend = ethereum.ApiBackend + } else { + return nil, err + } } // Construct the release service - contract, err := NewReleaseOracle(config.Oracle, eth.NewContractBackend(ethereum)) + contract, err := NewReleaseOracle(config.Oracle, eth.NewContractBackend(apiBackend)) if err != nil { return nil, err } diff --git a/eth/backend.go b/eth/backend.go index 9c5e11a59..10018aeaa 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -66,9 +66,14 @@ var ( type Config struct { ChainConfig *core.ChainConfig // chain configuration - NetworkId int // Network ID to use for selecting peers to connect to - Genesis string // Genesis JSON to seed the chain database with - FastSync bool // Enables the state download based fast synchronisation algorithm + NetworkId int // Network ID to use for selecting peers to connect to + Genesis string // Genesis JSON to seed the chain database with + FastSync bool // Enables the state download based fast synchronisation algorithm + LightMode bool // Running in light client mode + NoDefSrv bool // No default LES server + LightServ int // Maximum percentage of time allowed for serving LES requests + LightPeers int // Maximum number of LES client peers + MaxPeers int // Maximum number of global peers SkipBcVersionCheck bool // e.g. blockchain export DatabaseCache int @@ -100,6 +105,12 @@ type Config struct { TestGenesisState ethdb.Database // Genesis state to seed the database with (testing only!) } +type LesServer interface { + Start() + Stop() + Protocols() []p2p.Protocol +} + // Ethereum implements the Ethereum full node service. type Ethereum struct { chainConfig *core.ChainConfig @@ -111,6 +122,7 @@ type Ethereum struct { txMu sync.Mutex blockchain *core.BlockChain protocolManager *ProtocolManager + lesServer LesServer // DB interfaces chainDb ethdb.Database // Block chain database @@ -119,7 +131,7 @@ type Ethereum struct { httpclient *httpclient.HTTPClient accountManager *accounts.Manager - apiBackend *EthApiBackend + ApiBackend *EthApiBackend miner *miner.Miner Mining bool @@ -135,10 +147,14 @@ type Ethereum struct { netRPCService *ethapi.PublicNetAPI } +func (s *Ethereum) AddLesServer(ls LesServer) { + s.lesServer = ls +} + // New creates a new Ethereum object (including the // initialisation of the common Ethereum object) func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { - chainDb, err := createDB(ctx, config) + chainDb, err := CreateDB(ctx, config, "chaindata") if err != nil { return nil, err } @@ -217,7 +233,18 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { newPool := core.NewTxPool(eth.chainConfig, eth.EventMux(), eth.blockchain.State, eth.blockchain.GasLimit) eth.txPool = newPool - if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.FastSync, config.NetworkId, eth.eventMux, eth.txPool, eth.pow, eth.blockchain, chainDb); err != nil { + maxPeers := config.MaxPeers + if config.LightServ > 0 { + // if we are running a light server, limit the number of ETH peers so that we reserve some space for incoming LES connections + // temporary solution until the new peer connectivity API is finished + halfPeers := maxPeers / 2 + maxPeers -= config.LightPeers + if maxPeers < halfPeers { + maxPeers = halfPeers + } + } + + if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.FastSync, config.NetworkId, maxPeers, eth.eventMux, eth.txPool, eth.pow, eth.blockchain, chainDb); err != nil { return nil, err } eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.pow) @@ -233,14 +260,14 @@ func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) { GpobaseCorrectionFactor: config.GpobaseCorrectionFactor, } gpo := gasprice.NewGasPriceOracle(eth.blockchain, chainDb, eth.eventMux, gpoParams) - eth.apiBackend = &EthApiBackend{eth, gpo} + eth.ApiBackend = &EthApiBackend{eth, gpo} return eth, nil } -// createDB creates the chain database. -func createDB(ctx *node.ServiceContext, config *Config) (ethdb.Database, error) { - db, err := ctx.OpenDatabase("chaindata", config.DatabaseCache, config.DatabaseHandles) +// CreateDB creates the chain database. +func CreateDB(ctx *node.ServiceContext, config *Config, name string) (ethdb.Database, error) { + db, err := ctx.OpenDatabase(name, config.DatabaseCache, config.DatabaseHandles) if db, ok := db.(*ethdb.LDBDatabase); ok { db.Meter("eth/db/chaindata/") } @@ -288,7 +315,7 @@ func CreatePoW(config *Config) (*ethash.Ethash, error) { // APIs returns the collection of RPC services the ethereum package offers. // NOTE, some of these services probably need to be moved to somewhere else. func (s *Ethereum) APIs() []rpc.API { - return append(ethapi.GetAPIs(s.apiBackend, s.solcPath), []rpc.API{ + return append(ethapi.GetAPIs(s.ApiBackend, s.solcPath), []rpc.API{ { Namespace: "eth", Version: "1.0", @@ -391,7 +418,11 @@ func (s *Ethereum) Downloader() *downloader.Downloader { return s.protocolManage // Protocols implements node.Service, returning all the currently configured // network protocols to start. func (s *Ethereum) Protocols() []p2p.Protocol { - return s.protocolManager.SubProtocols + if s.lesServer == nil { + return s.protocolManager.SubProtocols + } else { + return append(s.protocolManager.SubProtocols, s.lesServer.Protocols()...) + } } // Start implements node.Service, starting all internal goroutines needed by the @@ -402,6 +433,9 @@ func (s *Ethereum) Start(srvr *p2p.Server) error { s.StartAutoDAG() } s.protocolManager.Start() + if s.lesServer != nil { + s.lesServer.Start() + } return nil } @@ -413,6 +447,9 @@ func (s *Ethereum) Stop() error { } s.blockchain.Stop() s.protocolManager.Stop() + if s.lesServer != nil { + s.lesServer.Stop() + } s.txPool.Stop() s.miner.Stop() s.eventMux.Stop() diff --git a/eth/bind.go b/eth/bind.go index 532e94460..0931c5f3b 100644 --- a/eth/bind.go +++ b/eth/bind.go @@ -43,11 +43,11 @@ type ContractBackend struct { // NewContractBackend creates a new native contract backend using an existing // Etheruem object. -func NewContractBackend(eth *Ethereum) *ContractBackend { +func NewContractBackend(apiBackend ethapi.Backend) *ContractBackend { return &ContractBackend{ - eapi: ethapi.NewPublicEthereumAPI(eth.apiBackend), - bcapi: ethapi.NewPublicBlockChainAPI(eth.apiBackend), - txapi: ethapi.NewPublicTransactionPoolAPI(eth.apiBackend), + eapi: ethapi.NewPublicEthereumAPI(apiBackend), + bcapi: ethapi.NewPublicBlockChainAPI(apiBackend), + txapi: ethapi.NewPublicTransactionPoolAPI(apiBackend), } } diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index da20e48a2..b1f4b8169 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -164,13 +164,13 @@ type Downloader struct { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn, +func New(mode SyncMode, stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, hasBlockAndState blockAndStateCheckFn, getHeader headerRetrievalFn, getBlock blockRetrievalFn, headHeader headHeaderRetrievalFn, headBlock headBlockRetrievalFn, headFastBlock headFastBlockRetrievalFn, commitHeadBlock headBlockCommitterFn, getTd tdRetrievalFn, insertHeaders headerChainInsertFn, insertBlocks blockChainInsertFn, insertReceipts receiptChainInsertFn, rollback chainRollbackFn, dropPeer peerDropFn) *Downloader { dl := &Downloader{ - mode: FullSync, + mode: mode, mux: mux, queue: newQueue(stateDb), peers: newPeerSet(), @@ -1179,10 +1179,23 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { for i, header := range rollback { hashes[i] = header.Hash() } - lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, d.headFastBlock().Number(), d.headBlock().Number() + lastHeader, lastFastBlock, lastBlock := d.headHeader().Number, common.Big0, common.Big0 + if d.headFastBlock != nil { + lastFastBlock = d.headFastBlock().Number() + } + if d.headBlock != nil { + lastBlock = d.headBlock().Number() + } d.rollback(hashes) + curFastBlock, curBlock := common.Big0, common.Big0 + if d.headFastBlock != nil { + curFastBlock = d.headFastBlock().Number() + } + if d.headBlock != nil { + curBlock = d.headBlock().Number() + } glog.V(logger.Warn).Infof("Rolled back %d headers (LH: %d->%d, FB: %d->%d, LB: %d->%d)", - len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, d.headFastBlock().Number(), lastBlock, d.headBlock().Number()) + len(hashes), lastHeader, d.headHeader().Number, lastFastBlock, curFastBlock, lastBlock, curBlock) // If we're already past the pivot point, this could be an attack, thread carefully if rollback[len(rollback)-1].Number.Uint64() > pivot { @@ -1229,8 +1242,10 @@ func (d *Downloader) processHeaders(origin uint64, td *big.Int) error { // L: Sync begins, and finds common ancestor at 11 // L: Request new headers up from 11 (R's TD was higher, it must have something) // R: Nothing to give - if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { - return errStallingPeer + if d.mode != LightSync { + if !gotHeaders && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 { + return errStallingPeer + } } // If fast or light syncing, ensure promised headers are indeed delivered. This is // needed to detect scenarios where an attacker feeds a bad pivot and then bails out diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 2849712ab..ff8cd1044 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -96,7 +96,7 @@ func newTester() *downloadTester { tester.stateDb, _ = ethdb.NewMemDatabase() tester.stateDb.Put(genesis.Root().Bytes(), []byte{0x00}) - tester.downloader = New(tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, + tester.downloader = New(FullSync, tester.stateDb, new(event.TypeMux), tester.hasHeader, tester.hasBlock, tester.getHeader, tester.getBlock, tester.headHeader, tester.headBlock, tester.headFastBlock, tester.commitHeadBlock, tester.getTd, tester.insertHeaders, tester.insertBlocks, tester.insertReceipts, tester.rollback, tester.dropPeer) diff --git a/eth/filters/api.go b/eth/filters/api.go index fa4bef283..834513262 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -52,6 +52,8 @@ type filter struct { // PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various // information related to the Ethereum protocol such als blocks, transactions and logs. type PublicFilterAPI struct { + backend Backend + useMipMap bool mux *event.TypeMux quit chan struct{} chainDb ethdb.Database @@ -316,7 +318,7 @@ func (api *PublicFilterAPI) NewFilter(crit FilterCriteria) rpc.ID { // GetLogs returns logs matching the given argument that are stored within the state. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getlogs -func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log { +func (api *PublicFilterAPI) GetLogs(ctx context.Context, crit FilterCriteria) ([]Log, error) { if crit.FromBlock == nil { crit.FromBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } @@ -324,13 +326,14 @@ func (api *PublicFilterAPI) GetLogs(crit FilterCriteria) []Log { crit.ToBlock = big.NewInt(rpc.LatestBlockNumber.Int64()) } - filter := New(api.chainDb) + filter := New(api.backend, api.useMipMap) filter.SetBeginBlock(crit.FromBlock.Int64()) filter.SetEndBlock(crit.ToBlock.Int64()) filter.SetAddresses(crit.Addresses) filter.SetTopics(crit.Topics) - return returnLogs(filter.Find()) + logs, err := filter.Find(ctx) + return returnLogs(logs), err } // UninstallFilter removes the filter with the given filter id. @@ -354,22 +357,23 @@ func (api *PublicFilterAPI) UninstallFilter(id rpc.ID) bool { // If the filter could not be found an empty array of logs is returned. // // https://github.com/ethereum/wiki/wiki/JSON-RPC#eth_getfilterlogs -func (api *PublicFilterAPI) GetFilterLogs(id rpc.ID) []Log { +func (api *PublicFilterAPI) GetFilterLogs(ctx context.Context, id rpc.ID) ([]Log, error) { api.filtersMu.Lock() f, found := api.filters[id] api.filtersMu.Unlock() if !found || f.typ != LogsSubscription { - return []Log{} + return []Log{}, nil } - filter := New(api.chainDb) + filter := New(api.backend, api.useMipMap) filter.SetBeginBlock(f.crit.FromBlock.Int64()) filter.SetEndBlock(f.crit.ToBlock.Int64()) filter.SetAddresses(f.crit.Addresses) filter.SetTopics(f.crit.Topics) - return returnLogs(filter.Find()) + logs, err := filter.Find(ctx) + return returnLogs(logs), err } // GetFilterChanges returns the logs for the filter with the given id since diff --git a/eth/filters/filter.go b/eth/filters/filter.go index d181d0892..4004af300 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -24,10 +24,23 @@ import ( "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/rpc" + "golang.org/x/net/context" ) +type Backend interface { + ChainDb() ethdb.Database + EventMux() *event.TypeMux + HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) + GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) +} + // Filter can be used to retrieve and filter logs type Filter struct { + backend Backend + useMipMap bool + created time.Time db ethdb.Database @@ -38,8 +51,12 @@ type Filter struct { // New creates a new filter which uses a bloom filter on blocks to figure out whether // a particular block is interesting or not. -func New(db ethdb.Database) *Filter { - return &Filter{db: db} +func New(backend Backend, useMipMap bool) *Filter { + return &Filter{ + backend: backend, + useMipMap: useMipMap, + db: backend.ChainDb(), + } } // SetBeginBlock sets the earliest block for filtering. @@ -66,30 +83,29 @@ func (f *Filter) SetTopics(topics [][]common.Hash) { } // Run filters logs with the current parameters set -func (f *Filter) Find() []Log { - latestHash := core.GetHeadBlockHash(f.db) - latestBlock := core.GetBlock(f.db, latestHash, core.GetBlockNumber(f.db, latestHash)) - if latestBlock == nil { - return []Log{} +func (f *Filter) Find(ctx context.Context) ([]Log, error) { + head, _ := f.backend.HeaderByNumber(ctx, rpc.LatestBlockNumber) + if head == nil { + return nil, nil } + headBlockNumber := head.Number.Uint64() var beginBlockNo uint64 = uint64(f.begin) if f.begin == -1 { - beginBlockNo = latestBlock.NumberU64() + beginBlockNo = headBlockNumber } - - endBlockNo := uint64(f.end) + var endBlockNo uint64 = uint64(f.end) if f.end == -1 { - endBlockNo = latestBlock.NumberU64() + endBlockNo = headBlockNumber } // if no addresses are present we can't make use of fast search which // uses the mipmap bloom filters to check for fast inclusion and uses // higher range probability in order to ensure at least a false positive - if len(f.addresses) == 0 { - return f.getLogs(beginBlockNo, endBlockNo) + if !f.useMipMap || len(f.addresses) == 0 { + return f.getLogs(ctx, beginBlockNo, endBlockNo) } - return f.mipFind(beginBlockNo, endBlockNo, 0) + return f.mipFind(beginBlockNo, endBlockNo, 0), nil } func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) { @@ -107,7 +123,8 @@ func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) { start := uint64(math.Max(float64(num), float64(start))) end := uint64(math.Min(float64(num+level-1), float64(end))) if depth+1 == len(core.MIPMapLevels) { - logs = append(logs, f.getLogs(start, end)...) + l, _ := f.getLogs(context.Background(), start, end) + logs = append(logs, l...) } else { logs = append(logs, f.mipFind(start, end, depth+1)...) } @@ -122,28 +139,22 @@ func (f *Filter) mipFind(start, end uint64, depth int) (logs []Log) { return logs } -func (f *Filter) getLogs(start, end uint64) (logs []Log) { - var block *types.Block - +func (f *Filter) getLogs(ctx context.Context, start, end uint64) (logs []Log, err error) { for i := start; i <= end; i++ { - hash := core.GetCanonicalHash(f.db, i) - if hash != (common.Hash{}) { - block = core.GetBlock(f.db, hash, i) - } else { // block not found - return logs - } - if block == nil { // block not found/written - return logs + header, err := f.backend.HeaderByNumber(ctx, rpc.BlockNumber(i)) + if header == nil || err != nil { + return logs, err } // Use bloom filtering to see if this block is interesting given the // current parameters - if f.bloomFilter(block) { + if f.bloomFilter(header.Bloom) { // Get the logs of the block - var ( - receipts = core.GetBlockReceipts(f.db, block.Hash(), i) - unfiltered []Log - ) + receipts, err := f.backend.GetReceipts(ctx, header.Hash()) + if err != nil { + return nil, err + } + var unfiltered []Log for _, receipt := range receipts { rl := make([]Log, len(receipt.Logs)) for i, l := range receipt.Logs { @@ -155,7 +166,7 @@ func (f *Filter) getLogs(start, end uint64) (logs []Log) { } } - return logs + return logs, nil } func includes(addresses []common.Address, a common.Address) bool { @@ -229,7 +240,7 @@ func bloomFilter(bloom types.Bloom, addresses []common.Address, topics [][]commo for _, sub := range topics { var included bool for _, topic := range sub { - if (topic == common.Hash{}) || types.BloomLookup(block.Bloom(), topic) { + if (topic == common.Hash{}) || types.BloomLookup(bloom, topic) { included = true break } diff --git a/eth/filters/filter_system_test.go b/eth/filters/filter_system_test.go index 1bd4d502d..48d6811c0 100644 --- a/eth/filters/filter_system_test.go +++ b/eth/filters/filter_system_test.go @@ -22,6 +22,8 @@ import ( "testing" "time" + "golang.org/x/net/context" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -38,6 +40,37 @@ var ( api = NewPublicFilterAPI(backend, false) ) +type testBackend struct { + mux *event.TypeMux + db ethdb.Database +} + +func (b *testBackend) ChainDb() ethdb.Database { + return b.db +} + +func (b *testBackend) EventMux() *event.TypeMux { + return b.mux +} + +func (b *testBackend) HeaderByNumber(ctx context.Context, blockNr rpc.BlockNumber) (*types.Header, error) { + var hash common.Hash + var num uint64 + if blockNr == rpc.LatestBlockNumber { + hash = core.GetHeadBlockHash(b.db) + num = core.GetBlockNumber(b.db, hash) + } else { + num = uint64(blockNr) + hash = core.GetCanonicalHash(b.db, num) + } + return core.GetHeader(b.db, hash, num), nil +} + +func (b *testBackend) GetReceipts(ctx context.Context, blockHash common.Hash) (types.Receipts, error) { + num := core.GetBlockNumber(b.db, blockHash) + return core.GetBlockReceipts(b.db, blockHash, num), nil +} + // TestBlockSubscription tests if a block subscription returns block hashes for posted chain events. // It creates multiple subscriptions: // - one at the start and should receive all posted chain events and a second (blockHashes) diff --git a/eth/filters/filter_test.go b/eth/filters/filter_test.go index 7b714f5d5..e0b24046c 100644 --- a/eth/filters/filter_test.go +++ b/eth/filters/filter_test.go @@ -22,6 +22,8 @@ import ( "os" "testing" + "golang.org/x/net/context" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" @@ -48,6 +50,7 @@ func BenchmarkMipmaps(b *testing.B) { var ( db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + backend = &testBackend{mux, db} key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr1 = crypto.PubkeyToAddress(key1.PublicKey) addr2 = common.BytesToAddress([]byte("jeff")) @@ -100,13 +103,13 @@ func BenchmarkMipmaps(b *testing.B) { } b.ResetTimer() - filter := New(db) + filter := New(backend, true) filter.SetAddresses([]common.Address{addr1, addr2, addr3, addr4}) filter.SetBeginBlock(0) filter.SetEndBlock(-1) for i := 0; i < b.N; i++ { - logs := filter.Find() + logs, _ := filter.Find(context.Background()) if len(logs) != 4 { b.Fatal("expected 4 log, got", len(logs)) } @@ -122,6 +125,7 @@ func TestFilters(t *testing.T) { var ( db, _ = ethdb.NewLDBDatabase(dir, 0, 0) + backend = &testBackend{mux, db} key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") addr = crypto.PubkeyToAddress(key1.PublicKey) @@ -201,23 +205,23 @@ func TestFilters(t *testing.T) { } } - filter := New(db) + filter := New(backend, true) filter.SetAddresses([]common.Address{addr}) filter.SetTopics([][]common.Hash{[]common.Hash{hash1, hash2, hash3, hash4}}) filter.SetBeginBlock(0) filter.SetEndBlock(-1) - logs := filter.Find() + logs, _ := filter.Find(context.Background()) if len(logs) != 4 { t.Error("expected 4 log, got", len(logs)) } - filter = New(db) + filter = New(backend, true) filter.SetAddresses([]common.Address{addr}) filter.SetTopics([][]common.Hash{[]common.Hash{hash3}}) filter.SetBeginBlock(900) filter.SetEndBlock(999) - logs = filter.Find() + logs, _ = filter.Find(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) } @@ -225,12 +229,12 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(db) + filter = New(backend, true) filter.SetAddresses([]common.Address{addr}) filter.SetTopics([][]common.Hash{[]common.Hash{hash3}}) filter.SetBeginBlock(990) filter.SetEndBlock(-1) - logs = filter.Find() + logs, _ = filter.Find(context.Background()) if len(logs) != 1 { t.Error("expected 1 log, got", len(logs)) } @@ -238,44 +242,44 @@ func TestFilters(t *testing.T) { t.Errorf("expected log[0].Topics[0] to be %x, got %x", hash3, logs[0].Topics[0]) } - filter = New(db) + filter = New(backend, true) filter.SetTopics([][]common.Hash{[]common.Hash{hash1, hash2}}) filter.SetBeginBlock(1) filter.SetEndBlock(10) - logs = filter.Find() + logs, _ = filter.Find(context.Background()) if len(logs) != 2 { t.Error("expected 2 log, got", len(logs)) } failHash := common.BytesToHash([]byte("fail")) - filter = New(db) + filter = New(backend, true) filter.SetTopics([][]common.Hash{[]common.Hash{failHash}}) filter.SetBeginBlock(0) filter.SetEndBlock(-1) - logs = filter.Find() + logs, _ = filter.Find(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } failAddr := common.BytesToAddress([]byte("failmenow")) - filter = New(db) + filter = New(backend, true) filter.SetAddresses([]common.Address{failAddr}) filter.SetBeginBlock(0) filter.SetEndBlock(-1) - logs = filter.Find() + logs, _ = filter.Find(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } - filter = New(db) + filter = New(backend, true) filter.SetTopics([][]common.Hash{[]common.Hash{failHash}, []common.Hash{hash1}}) filter.SetBeginBlock(0) filter.SetEndBlock(-1) - logs = filter.Find() + logs, _ = filter.Find(context.Background()) if len(logs) != 0 { t.Error("expected 0 log, got", len(logs)) } diff --git a/eth/handler.go b/eth/handler.go index e478990f7..9d6b1ced2 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -68,6 +68,7 @@ type ProtocolManager struct { blockchain *core.BlockChain chaindb ethdb.Database chainconfig *core.ChainConfig + maxPeers int downloader *downloader.Downloader fetcher *fetcher.Fetcher @@ -94,7 +95,7 @@ type ProtocolManager struct { // NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable // with the ethereum network. -func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { +func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, maxPeers int, mux *event.TypeMux, txpool txPool, pow pow.PoW, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) { // Create the protocol manager with the base fields manager := &ProtocolManager{ networkId: networkId, @@ -103,6 +104,7 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, blockchain: blockchain, chaindb: chaindb, chainconfig: config, + maxPeers: maxPeers, peers: newPeerSet(), newPeerCh: make(chan *peer), noMorePeers: make(chan struct{}), @@ -156,7 +158,7 @@ func NewProtocolManager(config *core.ChainConfig, fastSync bool, networkId int, return nil, errIncompatibleConfig } // Construct the different synchronisation mechanisms - manager.downloader = downloader.New(chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeaderByHash, + manager.downloader = downloader.New(downloader.FullSync, chaindb, manager.eventMux, blockchain.HasHeader, blockchain.HasBlockAndState, blockchain.GetHeaderByHash, blockchain.GetBlockByHash, blockchain.CurrentHeader, blockchain.CurrentBlock, blockchain.CurrentFastBlock, blockchain.FastSyncCommitHead, blockchain.GetTdByHash, blockchain.InsertHeaderChain, manager.insertChain, blockchain.InsertReceiptChain, blockchain.Rollback, manager.removePeer) @@ -253,6 +255,10 @@ func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *p // handle is the callback invoked to manage the life cycle of an eth peer. When // this function terminates, the peer is disconnected. func (pm *ProtocolManager) handle(p *peer) error { + if pm.peers.Len() >= pm.maxPeers { + return p2p.DiscTooManyPeers + } + glog.V(logger.Debug).Infof("%v: peer connected [%s]", p, p.Name()) // Execute the Ethereum handshake diff --git a/eth/handler_test.go b/eth/handler_test.go index f0f18d0a6..64449afda 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -469,7 +469,7 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool config = &core.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} blockchain, _ = core.NewBlockChain(db, config, pow, evmux) ) - pm, err := NewProtocolManager(config, false, NetworkId, evmux, new(testTxPool), pow, blockchain, db) + pm, err := NewProtocolManager(config, false, NetworkId, 1000, evmux, new(testTxPool), pow, blockchain, db) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } diff --git a/eth/helper_test.go b/eth/helper_test.go index 732fe89ee..d5295b398 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -62,7 +62,7 @@ func newTestProtocolManager(fastSync bool, blocks int, generator func(int, *core panic(err) } - pm, err := NewProtocolManager(chainConfig, fastSync, NetworkId, evmux, &testTxPool{added: newtx}, pow, blockchain, db) + pm, err := NewProtocolManager(chainConfig, fastSync, NetworkId, 1000, evmux, &testTxPool{added: newtx}, pow, blockchain, db) if err != nil { return nil, err } diff --git a/ethdb/database.go b/ethdb/database.go index 479c54b60..96d9a5982 100644 --- a/ethdb/database.go +++ b/ethdb/database.go @@ -40,13 +40,15 @@ var OpenFileLimit = 64 // cacheRatio specifies how the total allotted cache is distributed between the // various system databases. var cacheRatio = map[string]float64{ - "chaindata": 1.0, + "chaindata": 1.0, + "lightchaindata": 1.0, } // handleRatio specifies how the total allotted file descriptors is distributed // between the various system databases. var handleRatio = map[string]float64{ - "chaindata": 1.0, + "chaindata": 1.0, + "lightchaindata": 1.0, } type LDBDatabase struct {