Initial refactoring + vendor update (after rebase)

Victor Farazdagi 2016-09-08 12:45:12 +03:00
parent 4babe9101d
commit 5fb4aef1cc
1092 changed files with 4188 additions and 4610 deletions

View File

@@ -1,4 +1,4 @@
package main
package common
import (
"github.com/ethereum/go-ethereum/les/status"

common/utils.go (new file, 26 lines added)
View File

@@ -0,0 +1,26 @@
package common
import (
"io"
"os"
)
func copyFile(dst, src string) error {
s, err := os.Open(src)
if err != nil {
return err
}
defer s.Close()
d, err := os.Create(dst)
if err != nil {
return err
}
defer d.Close()
if _, err := io.Copy(d, s); err != nil {
return err
}
return nil
}
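For orientation, a minimal sketch (not part of the commit) of how this unexported helper could be called from elsewhere in package common; the paths are illustrative only, and "path/filepath" would need to be added to the imports above.
// ensureStaticNodes is a hypothetical caller: it copies a bundled
// static-nodes.json into dataDir using copyFile defined above.
func ensureStaticNodes(dataDir string) error {
	return copyFile(filepath.Join(dataDir, "static-nodes.json"), filepath.Join("data", "static-nodes.json"))
}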

View File

@@ -8,7 +8,7 @@ import (
"testing"
"github.com/btcsuite/btcd/chaincfg"
"github.com/status-im/status-go/src/extkeys"
"github.com/status-im/status-go/extkeys"
)
func TestBIP32Vectors(t *testing.T) {
@@ -122,7 +122,7 @@ tests:
}
if !extKey.IsPrivate {
t.Errorf("Master node must feature private key")
t.Error("Master node must feature private key")
continue
}
@@ -416,7 +416,7 @@ func TestErrors(t *testing.T) {
}
// Generate a new key and neuter it to a public extended key.
mnemonic := extkeys.NewMnemonic()
mnemonic := extkeys.NewMnemonic(extkeys.Salt)
phrase, err := mnemonic.MnemonicPhrase(128, extkeys.EnglishLanguage)
if err != nil {
@@ -505,12 +505,12 @@ func TestBIP44ChildDerivation(t *testing.T) {
extKey, err := extkeys.NewKeyFromString(keyString)
if err != nil {
t.Errorf("NewKeyFromString: cannot create extended key")
t.Error("NewKeyFromString: cannot create extended key")
}
accounKey1, err := extKey.BIP44Child(extkeys.CoinTypeETH, 0)
if err != nil {
t.Errorf("Error dering BIP44-compliant key")
t.Error("Error dering BIP44-compliant key")
}
if accounKey1.String() != derivedKey1String {
t.Errorf("BIP44Child: key mismatch -- got: %v, want: %v", accounKey1.String(), derivedKey1String)
@@ -519,7 +519,7 @@ func TestBIP44ChildDerivation(t *testing.T) {
accounKey2, err := extKey.BIP44Child(extkeys.CoinTypeETH, 1)
if err != nil {
t.Errorf("Error dering BIP44-compliant key")
t.Error("Error dering BIP44-compliant key")
}
if accounKey2.String() != derivedKey2String {
t.Errorf("BIP44Child: key mismatch -- got: %v, want: %v", accounKey2.String(), derivedKey2String)

File diff suppressed because one or more lines are too long

View File

@@ -1,10 +1,11 @@
package extkeys
package extkeys_test
import (
"encoding/json"
"fmt"
"os"
"testing"
"github.com/status-im/status-go/extkeys"
)
type VectorsFile struct {
@@ -19,13 +20,13 @@ type Vector struct {
// TestMnemonicPhrase
func TestMnemonicPhrase(t *testing.T) {
mnemonic := NewMnemonic()
mnemonic := extkeys.NewMnemonic(extkeys.Salt)
// test mnemonic generation
t.Logf("Test mnemonic generation:")
t.Log("Test mnemonic generation:")
for _, language := range mnemonic.AvailableLanguages() {
phrase, err := mnemonic.MnemonicPhrase(128, language)
t.Logf("Mnemonic (%s): %s", Languages[language], phrase)
t.Logf("Mnemonic (%s): %s", extkeys.Languages[language], phrase)
if err != nil {
t.Errorf("Test failed: could not create seed: %s", err)
@@ -42,11 +43,11 @@ func TestMnemonicPhrase(t *testing.T) {
t.Error(err)
}
t.Logf("Test against pre-computed seed vectors:")
t.Log("Test against pre-computed seed vectors:")
stats := map[string]int{}
for _, vector := range vectorsFile.vectors {
stats[vector.language] += 1
mnemonic.salt = vector.salt
mnemonic := extkeys.NewMnemonic(vector.salt)
seed := mnemonic.MnemonicSeed(vector.mnemonic, vector.password)
if fmt.Sprintf("%x", seed) != vector.seed {
t.Errorf("Test failed (%s): incorrect seed (%x) generated (expected: %s)", vector.language, seed, vector.seed)

View File

@@ -1,177 +1,14 @@
package main
import (
"errors"
"flag"
"fmt"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/release"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/whisper"
"gopkg.in/urfave/cli.v1"
"io"
"os"
"path"
"path/filepath"
"runtime"
)
const (
clientIdentifier = "Geth" // Client identifier to advertise over the network
versionMajor = 1 // Major version component of the current release
versionMinor = 5 // Minor version component of the current release
versionPatch = 0 // Patch version component of the current release
versionMeta = "unstable" // Version metadata to append to the version string
versionOracle = "0xfa7b9770ca4cb04296cac84f37736d4041251cdf" // Ethereum address of the Geth release oracle
)
var (
vString string // Combined textual representation of the version
rConfig release.Config // Structured version information and release oracle config
currentNode *node.Node // currently running geth node
c *cli.Context // the CLI context used to start the geth node
accountSync *[]node.Service // the object used to sync accounts between geth services
lightEthereum *les.LightEthereum // LES service
accountManager *accounts.Manager // the account manager attached to the currentNode
selectedAddress string // address of the account that was processed during the last call to SelectAccount()
whisperService *whisper.Whisper // whisper service
datadir string // data directory for geth
rpcport int = 8545 // RPC port (replaced in unit tests)
client rpc.Client
gitCommit = "rely on linker: -ldflags -X main.GitCommit"
buildStamp = "rely on linker: -ldflags -X main.buildStamp"
)
var (
ErrDataDirPreprocessingFailed = errors.New("Failed to pre-process data directory")
)
func main() {
fmt.Printf("Status\nGit Commit: %s\nBuild Time: %s\n", gitCommit, buildStamp)
}
// MakeNode create a geth node entity
func MakeNode(inputDir string) *node.Node {
datadir := inputDir
// TODO remove admin rpcapi flag
set := flag.NewFlagSet("test", 0)
set.Bool("lightkdf", true, "Reduce key-derivation RAM & CPU usage at some expense of KDF strength")
set.Bool("shh", true, "whisper")
set.Bool("light", true, "disable eth")
set.Bool("testnet", true, "light test network")
set.Bool("rpc", true, "enable rpc")
set.String("rpcaddr", "localhost", "host for RPC")
set.Int("rpcport", rpcport, "rpc port")
set.String("rpccorsdomain", "*", "allow all domains")
set.String("verbosity", "3", "verbosity level")
set.String("rpcapi", "db,eth,net,web3,shh,personal,admin", "rpc api(s)")
set.String("datadir", datadir, "data directory for geth")
set.String("logdir", datadir, "log dir for glog")
c = cli.NewContext(nil, set, nil)
// Construct the textual version string from the individual components
vString = fmt.Sprintf("%d.%d.%d", versionMajor, versionMinor, versionPatch)
// Construct the version release oracle configuration
rConfig.Oracle = common.HexToAddress(versionOracle)
rConfig.Major = uint32(versionMajor)
rConfig.Minor = uint32(versionMinor)
rConfig.Patch = uint32(versionPatch)
utils.DebugSetup(c)
currentNode, accountSync = utils.MakeSystemNode(clientIdentifier, vString, rConfig, makeDefaultExtra(), c)
return currentNode
}
// StartNode starts a geth node entity
func RunNode(nodeIn *node.Node) {
utils.StartNode(nodeIn)
if err := nodeIn.Service(&accountManager); err != nil {
glog.V(logger.Warn).Infoln("cannot get account manager:", err)
}
if err := nodeIn.Service(&whisperService); err != nil {
glog.V(logger.Warn).Infoln("cannot get whisper service:", err)
}
if err := nodeIn.Service(&lightEthereum); err != nil {
glog.V(logger.Warn).Infoln("cannot get light ethereum service:", err)
}
lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest)
client, _ = nodeIn.Attach()
nodeIn.Wait()
}
func makeDefaultExtra() []byte {
var clientInfo = struct {
Version uint
Name string
GoVersion string
Os string
}{uint(versionMajor<<16 | versionMinor<<8 | versionPatch), clientIdentifier, runtime.Version(), runtime.GOOS}
extra, err := rlp.EncodeToBytes(clientInfo)
if err != nil {
glog.V(logger.Warn).Infoln("error setting canonical miner information:", err)
}
if uint64(len(extra)) > params.MaximumExtraDataSize.Uint64() {
glog.V(logger.Warn).Infoln("error setting canonical miner information: extra exceeds", params.MaximumExtraDataSize)
glog.V(logger.Debug).Infof("extra: %x\n", extra)
return nil
}
return extra
}
func preprocessDataDir(dataDir string) (string, error) {
testDataDir := path.Join(dataDir, "testnet", "keystore")
if _, err := os.Stat(testDataDir); os.IsNotExist(err) {
if err := os.MkdirAll(testDataDir, 0755); err != nil {
return dataDir, ErrDataDirPreprocessingFailed
}
}
// copy over static peer nodes list (LES auto-discovery is not stable yet)
dst := filepath.Join(dataDir, "testnet", "static-nodes.json")
if _, err := os.Stat(dst); os.IsNotExist(err) {
src := filepath.Join("data", "static-nodes.json")
if err := copyFile(dst, src); err != nil {
return dataDir, err
}
}
return dataDir, nil
}
func copyFile(dst, src string) error {
s, err := os.Open(src)
if err != nil {
return err
}
defer s.Close()
d, err := os.Create(dst)
if err != nil {
return err
}
defer d.Close()
if _, err := io.Copy(d, s); err != nil {
return err
}
return nil
}

src/node.go (new file, 162 lines added)
View File

@@ -0,0 +1,162 @@
package main
import (
"errors"
"flag"
"fmt"
"os"
"path"
"path/filepath"
"runtime"
"github.com/ethereum/go-ethereum/accounts"
"github.com/ethereum/go-ethereum/cmd/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/les"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/release"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/whisper"
"gopkg.in/urfave/cli.v1"
)
const (
clientIdentifier = "Geth" // Client identifier to advertise over the network
versionMajor = 1 // Major version component of the current release
versionMinor = 5 // Minor version component of the current release
versionPatch = 0 // Patch version component of the current release
versionMeta = "unstable" // Version metadata to append to the version string
versionOracle = "0xfa7b9770ca4cb04296cac84f37736d4041251cdf" // Ethereum address of the Geth release oracle
)
var (
vString string // Combined textual representation of the version
rConfig release.Config // Structured version information and release oracle config
currentNode *node.Node // currently running geth node
c *cli.Context // the CLI context used to start the geth node
lightEthereum *les.LightEthereum // LES service
accountManager *accounts.Manager // the account manager attached to the currentNode
selectedAddress string // address of the account that was processed during the last call to SelectAccount()
whisperService *whisper.Whisper // whisper service
datadir string // data directory for geth
rpcport int = 8545 // RPC port (replaced in unit tests)
client *rpc.Client
)
var (
ErrDataDirPreprocessingFailed = errors.New("Failed to pre-process data directory")
)
// MakeNode creates a geth node entity
func MakeNode(inputDir string) *node.Node {
datadir := inputDir
// TODO remove admin rpcapi flag
set := flag.NewFlagSet("test", 0)
set.Bool("lightkdf", true, "Reduce key-derivation RAM & CPU usage at some expense of KDF strength")
set.Bool("shh", true, "whisper")
set.Bool("light", true, "disable eth")
set.Bool("testnet", true, "light test network")
set.Bool("rpc", true, "enable rpc")
set.String("rpcaddr", "localhost", "host for RPC")
set.Int("rpcport", rpcport, "rpc port")
set.String("rpccorsdomain", "*", "allow all domains")
set.String("verbosity", "3", "verbosity level")
set.String("rpcapi", "db,eth,net,web3,shh,personal,admin", "rpc api(s)")
set.String("datadir", datadir, "data directory for geth")
set.String("logdir", datadir, "log dir for glog")
c = cli.NewContext(nil, set, nil)
// Construct the textual version string from the individual components
vString = fmt.Sprintf("%d.%d.%d", versionMajor, versionMinor, versionPatch)
// Construct the version release oracle configuration
rConfig.Oracle = common.HexToAddress(versionOracle)
rConfig.Major = uint32(versionMajor)
rConfig.Minor = uint32(versionMinor)
rConfig.Patch = uint32(versionPatch)
utils.DebugSetup(c)
currentNode = makeNode(c, clientIdentifier, vString)
return currentNode
}
func makeNode(ctx *cli.Context, name, version string) *node.Node {
nodeIn := utils.MakeNode(ctx, name, version)
utils.RegisterEthService(ctx, nodeIn, rConfig, makeDefaultExtra())
// Whisper must be explicitly enabled, but is auto-enabled in --dev mode.
shhEnabled := ctx.GlobalBool(utils.WhisperEnabledFlag.Name)
shhAutoEnabled := !ctx.GlobalIsSet(utils.WhisperEnabledFlag.Name) && ctx.GlobalIsSet(utils.DevModeFlag.Name)
if shhEnabled || shhAutoEnabled {
utils.RegisterShhService(nodeIn)
}
return nodeIn
}
// RunNode starts a geth node entity
func RunNode(nodeIn *node.Node) {
utils.StartNode(nodeIn)
if err := nodeIn.Service(&accountManager); err != nil {
glog.V(logger.Warn).Infoln("cannot get account manager:", err)
}
if err := nodeIn.Service(&whisperService); err != nil {
glog.V(logger.Warn).Infoln("cannot get whisper service:", err)
}
if err := nodeIn.Service(&lightEthereum); err != nil {
glog.V(logger.Warn).Infoln("cannot get light ethereum service:", err)
}
lightEthereum.StatusBackend.SetTransactionQueueHandler(onSendTransactionRequest)
client, _ = nodeIn.Attach()
nodeIn.Wait()
}
func makeDefaultExtra() []byte {
var clientInfo = struct {
Version uint
Name string
GoVersion string
Os string
}{uint(versionMajor<<16 | versionMinor<<8 | versionPatch), clientIdentifier, runtime.Version(), runtime.GOOS}
extra, err := rlp.EncodeToBytes(clientInfo)
if err != nil {
glog.V(logger.Warn).Infoln("error setting canonical miner information:", err)
}
if uint64(len(extra)) > params.MaximumExtraDataSize.Uint64() {
glog.V(logger.Warn).Infoln("error setting canonical miner information: extra exceeds", params.MaximumExtraDataSize)
glog.V(logger.Debug).Infof("extra: %x\n", extra)
return nil
}
return extra
}
func preprocessDataDir(dataDir string) (string, error) {
testDataDir := path.Join(dataDir, "testnet", "keystore")
if _, err := os.Stat(testDataDir); os.IsNotExist(err) {
if err := os.MkdirAll(testDataDir, 0755); err != nil {
return dataDir, ErrDataDirPreprocessingFailed
}
}
// copy over static peer nodes list (LES auto-discovery is not stable yet)
dst := filepath.Join(dataDir, "testnet", "static-nodes.json")
if _, err := os.Stat(dst); os.IsNotExist(err) {
src := filepath.Join("data", "static-nodes.json")
if err := copyFile(dst, src); err != nil {
return dataDir, err
}
}
return dataDir, nil
}
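Putting the pieces of this new file together, a hedged sketch (caller name and flow assumed, not taken from the diff) of how the two exported helpers might be driven: MakeNode builds the configured geth node and RunNode starts it and blocks until shutdown.
// startStatusNode is a hypothetical driver, not part of the commit.
func startStatusNode(dataDir string) {
	if _, err := preprocessDataDir(dataDir); err != nil {
		fmt.Println("data dir preprocessing failed:", err) // treated as non-fatal in this sketch
	}
	n := MakeNode(dataDir) // configures flags, version info and services
	RunNode(n)             // starts the node and blocks on n.Wait()
}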

View File

@@ -1,260 +0,0 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package backends
import (
"encoding/json"
"fmt"
"math/big"
"sync"
"sync/atomic"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
// This nil assignment ensures compile time that rpcBackend implements bind.ContractBackend.
var _ bind.ContractBackend = (*rpcBackend)(nil)
// rpcBackend implements bind.ContractBackend, and acts as the data provider to
// Ethereum contracts bound to Go structs. It uses an RPC connection to delegate
// all its functionality.
//
// Note: The current implementation is a blocking one. This should be replaced
// by a proper async version when a real RPC client is created.
type rpcBackend struct {
client rpc.Client // RPC client connection to interact with an API server
autoid uint32 // ID number to use for the next API request
lock sync.Mutex // Singleton access until we get to request multiplexing
}
// NewRPCBackend creates a new binding backend to an RPC provider that can be
// used to interact with remote contracts.
func NewRPCBackend(client rpc.Client) bind.ContractBackend {
return &rpcBackend{
client: client,
}
}
// request is a JSON RPC request package assembled internally from the client
// method calls.
type request struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Method string `json:"method"` // Remote procedure name to invoke on the server
Params []interface{} `json:"params"` // List of parameters to pass through (keep types simple)
}
// response is a JSON RPC response package sent back from the API server.
type response struct {
JSONRPC string `json:"jsonrpc"` // Version of the JSON RPC protocol, always set to 2.0
ID int `json:"id"` // Auto incrementing ID number for this request
Error *failure `json:"error"` // Any error returned by the remote side
Result json.RawMessage `json:"result"` // Whatever the remote side sends us in reply
}
// failure is a JSON RPC response error field sent back from the API server.
type failure struct {
Code int `json:"code"` // JSON RPC error code associated with the failure
Message string `json:"message"` // Specific error message of the failure
}
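For concreteness, a hedged illustration (method and values invented, not taken from the commit) of the JSON-RPC 2.0 envelopes these three structs marshal to and from:
// Request sent by the backend:
//	{"jsonrpc":"2.0","id":1,"method":"eth_gasPrice","params":[]}
// Matching reply decoded into response (or into failure via the error field):
//	{"jsonrpc":"2.0","id":1,"result":"0x4a817c800"}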
// request forwards an API request to the RPC server, and parses the response.
//
// This is currently painfully non-concurrent, but it will have to do until we
// find the time for niceties like this :P
func (b *rpcBackend) request(ctx context.Context, method string, params []interface{}) (json.RawMessage, error) {
b.lock.Lock()
defer b.lock.Unlock()
if ctx == nil {
ctx = context.Background()
}
// Ugly hack to serialize an empty list properly
if params == nil {
params = []interface{}{}
}
// Assemble the request object
reqID := int(atomic.AddUint32(&b.autoid, 1))
req := &request{
JSONRPC: "2.0",
ID: reqID,
Method: method,
Params: params,
}
if err := b.client.Send(req); err != nil {
return nil, err
}
res := new(response)
errc := make(chan error, 1)
go func() {
errc <- b.client.Recv(res)
}()
select {
case err := <-errc:
if err != nil {
return nil, err
}
case <-ctx.Done():
return nil, ctx.Err()
}
if res.Error != nil {
if res.Error.Message == bind.ErrNoCode.Error() {
return nil, bind.ErrNoCode
}
return nil, fmt.Errorf("remote error: %s", res.Error.Message)
}
return res.Result, nil
}
// HasCode implements ContractVerifier.HasCode by retrieving any code associated
// with the contract from the remote node, and checking its size.
func (b *rpcBackend) HasCode(ctx context.Context, contract common.Address, pending bool) (bool, error) {
// Execute the RPC code retrieval
block := "latest"
if pending {
block = "pending"
}
res, err := b.request(ctx, "eth_getCode", []interface{}{contract.Hex(), block})
if err != nil {
return false, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return false, err
}
// Convert the response back to a Go byte slice and return
return len(common.FromHex(hex)) > 0, nil
}
// ContractCall implements ContractCaller.ContractCall, delegating the execution of
// a contract call to the remote node, returning the reply to for local processing.
func (b *rpcBackend) ContractCall(ctx context.Context, contract common.Address, data []byte, pending bool) ([]byte, error) {
// Pack up the request into an RPC argument
args := struct {
To common.Address `json:"to"`
Data string `json:"data"`
}{
To: contract,
Data: common.ToHex(data),
}
// Execute the RPC call and retrieve the response
block := "latest"
if pending {
block = "pending"
}
res, err := b.request(ctx, "eth_call", []interface{}{args, block})
if err != nil {
return nil, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
// Convert the response back to a Go byte slice and return
return common.FromHex(hex), nil
}
// PendingAccountNonce implements ContractTransactor.PendingAccountNonce, delegating
// the current account nonce retrieval to the remote node.
func (b *rpcBackend) PendingAccountNonce(ctx context.Context, account common.Address) (uint64, error) {
res, err := b.request(ctx, "eth_getTransactionCount", []interface{}{account.Hex(), "pending"})
if err != nil {
return 0, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return 0, err
}
nonce, ok := new(big.Int).SetString(hex, 0)
if !ok {
return 0, fmt.Errorf("invalid nonce hex: %s", hex)
}
return nonce.Uint64(), nil
}
// SuggestGasPrice implements ContractTransactor.SuggestGasPrice, delegating the
// gas price oracle request to the remote node.
func (b *rpcBackend) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
res, err := b.request(ctx, "eth_gasPrice", nil)
if err != nil {
return nil, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
price, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, fmt.Errorf("invalid price hex: %s", hex)
}
return price, nil
}
// EstimateGasLimit implements ContractTransactor.EstimateGasLimit, delegating
// the gas estimation to the remote node.
func (b *rpcBackend) EstimateGasLimit(ctx context.Context, sender common.Address, contract *common.Address, value *big.Int, data []byte) (*big.Int, error) {
// Pack up the request into an RPC argument
args := struct {
From common.Address `json:"from"`
To *common.Address `json:"to"`
Value *rpc.HexNumber `json:"value"`
Data string `json:"data"`
}{
From: sender,
To: contract,
Data: common.ToHex(data),
Value: rpc.NewHexNumber(value),
}
// Execute the RPC call and retrieve the response
res, err := b.request(ctx, "eth_estimateGas", []interface{}{args})
if err != nil {
return nil, err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return nil, err
}
estimate, ok := new(big.Int).SetString(hex, 0)
if !ok {
return nil, fmt.Errorf("invalid estimate hex: %s", hex)
}
return estimate, nil
}
// SendTransaction implements ContractTransactor.SendTransaction, delegating the
// raw transaction injection to the remote node.
func (b *rpcBackend) SendTransaction(ctx context.Context, tx *types.Transaction) error {
data, err := rlp.EncodeToBytes(tx)
if err != nil {
return err
}
res, err := b.request(ctx, "eth_sendRawTransaction", []interface{}{common.ToHex(data)})
if err != nil {
return err
}
var hex string
if err := json.Unmarshal(res, &hex); err != nil {
return err
}
return nil
}

View File

@@ -1,55 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of go-ethereum.
//
// go-ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// go-ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with go-ethereum. If not, see <http://www.gnu.org/licenses/>.
package utils
import (
"fmt"
"strings"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
"gopkg.in/urfave/cli.v1"
)
// NewRemoteRPCClient returns a RPC client which connects to a running geth instance.
// Depending on the given context this can either be a IPC or a HTTP client.
func NewRemoteRPCClient(ctx *cli.Context) (rpc.Client, error) {
if ctx.Args().Present() {
endpoint := ctx.Args().First()
return NewRemoteRPCClientFromString(endpoint)
}
// use IPC by default
return rpc.NewIPCClient(node.DefaultIPCEndpoint())
}
// NewRemoteRPCClientFromString returns a RPC client which connects to the given
// endpoint. It must start with either `ipc:` or `rpc:` (HTTP).
func NewRemoteRPCClientFromString(endpoint string) (rpc.Client, error) {
if strings.HasPrefix(endpoint, "ipc:") {
return rpc.NewIPCClient(endpoint[4:])
}
if strings.HasPrefix(endpoint, "rpc:") {
return rpc.NewHTTPClient(endpoint[4:])
}
if strings.HasPrefix(endpoint, "http://") {
return rpc.NewHTTPClient(endpoint)
}
if strings.HasPrefix(endpoint, "ws:") {
return rpc.NewWSClient(endpoint)
}
return nil, fmt.Errorf("invalid endpoint")
}
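A hedged usage sketch (endpoints invented, function name hypothetical) of the prefix convention the removed helper implemented:
// exampleRemoteClients is not part of the commit.
func exampleRemoteClients() error {
	if _, err := NewRemoteRPCClientFromString("ipc:/tmp/geth.ipc"); err != nil { // IPC socket path after the "ipc:" prefix
		return err
	}
	_, err := NewRemoteRPCClientFromString("http://localhost:8545") // plain HTTP endpoint, passed through as-is
	return err
}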

View File

@@ -1,225 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package compiler
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
var (
versionRegexp = regexp.MustCompile("[0-9]+\\.[0-9]+\\.[0-9]+")
legacyRegexp = regexp.MustCompile("0\\.(9\\..*|1\\.[01])")
paramsLegacy = []string{
"--binary", // Request to output the contract in binary (hexadecimal).
"file", //
"--json-abi", // Request to output the contract's JSON ABI interface.
"file", //
"--natspec-user", // Request to output the contract's Natspec user documentation.
"file", //
"--natspec-dev", // Request to output the contract's Natspec developer documentation.
"file",
"--add-std",
"1",
}
paramsNew = []string{
"--bin", // Request to output the contract in binary (hexadecimal).
"--abi", // Request to output the contract's JSON ABI interface.
"--userdoc", // Request to output the contract's Natspec user documentation.
"--devdoc", // Request to output the contract's Natspec developer documentation.
"--add-std", // include standard lib contracts
"--optimize", // code optimizer switched on
"-o", // output directory
}
)
type Contract struct {
Code string `json:"code"`
Info ContractInfo `json:"info"`
}
type ContractInfo struct {
Source string `json:"source"`
Language string `json:"language"`
LanguageVersion string `json:"languageVersion"`
CompilerVersion string `json:"compilerVersion"`
CompilerOptions string `json:"compilerOptions"`
AbiDefinition interface{} `json:"abiDefinition"`
UserDoc interface{} `json:"userDoc"`
DeveloperDoc interface{} `json:"developerDoc"`
}
type Solidity struct {
solcPath string
version string
fullVersion string
legacy bool
}
func New(solcPath string) (sol *Solidity, err error) {
// set default solc
if len(solcPath) == 0 {
solcPath = "solc"
}
solcPath, err = exec.LookPath(solcPath)
if err != nil {
return
}
cmd := exec.Command(solcPath, "--version")
var out bytes.Buffer
cmd.Stdout = &out
err = cmd.Run()
if err != nil {
return
}
fullVersion := out.String()
version := versionRegexp.FindString(fullVersion)
legacy := legacyRegexp.MatchString(version)
sol = &Solidity{
solcPath: solcPath,
version: version,
fullVersion: fullVersion,
legacy: legacy,
}
glog.V(logger.Info).Infoln(sol.Info())
return
}
func (sol *Solidity) Info() string {
return fmt.Sprintf("%s\npath: %s", sol.fullVersion, sol.solcPath)
}
func (sol *Solidity) Version() string {
return sol.version
}
// Compile builds and returns all the contracts contained within a source string.
func (sol *Solidity) Compile(source string) (map[string]*Contract, error) {
// Short circuit if no source code was specified
if len(source) == 0 {
return nil, errors.New("solc: empty source string")
}
// Create a safe place to dump compilation output
wd, err := ioutil.TempDir("", "solc")
if err != nil {
return nil, fmt.Errorf("solc: failed to create temporary build folder: %v", err)
}
defer os.RemoveAll(wd)
// Assemble the compiler command, change to the temp folder and capture any errors
stderr := new(bytes.Buffer)
var params []string
if sol.legacy {
params = paramsLegacy
} else {
params = paramsNew
params = append(params, wd)
}
compilerOptions := strings.Join(params, " ")
cmd := exec.Command(sol.solcPath, params...)
cmd.Stdin = strings.NewReader(source)
cmd.Stderr = stderr
if err := cmd.Run(); err != nil {
return nil, fmt.Errorf("solc: %v\n%s", err, string(stderr.Bytes()))
}
// Sanity check that something was actually built
matches, _ := filepath.Glob(filepath.Join(wd, "*.bin*"))
if len(matches) < 1 {
return nil, fmt.Errorf("solc: no build results found")
}
// Compilation succeeded, assemble and return the contracts
contracts := make(map[string]*Contract)
for _, path := range matches {
_, file := filepath.Split(path)
base := strings.Split(file, ".")[0]
// Parse the individual compilation results (code binary, ABI definitions, user and dev docs)
var binary []byte
binext := ".bin"
if sol.legacy {
binext = ".binary"
}
if binary, err = ioutil.ReadFile(filepath.Join(wd, base+binext)); err != nil {
return nil, fmt.Errorf("solc: error reading compiler output for code: %v", err)
}
var abi interface{}
if blob, err := ioutil.ReadFile(filepath.Join(wd, base+".abi")); err != nil {
return nil, fmt.Errorf("solc: error reading abi definition: %v", err)
} else if err = json.Unmarshal(blob, &abi); err != nil {
return nil, fmt.Errorf("solc: error parsing abi definition: %v", err)
}
var userdoc interface{}
if blob, err := ioutil.ReadFile(filepath.Join(wd, base+".docuser")); err != nil {
return nil, fmt.Errorf("solc: error reading user doc: %v", err)
} else if err = json.Unmarshal(blob, &userdoc); err != nil {
return nil, fmt.Errorf("solc: error parsing user doc: %v", err)
}
var devdoc interface{}
if blob, err := ioutil.ReadFile(filepath.Join(wd, base+".docdev")); err != nil {
return nil, fmt.Errorf("solc: error reading dev doc: %v", err)
} else if err = json.Unmarshal(blob, &devdoc); err != nil {
return nil, fmt.Errorf("solc: error parsing dev doc: %v", err)
}
// Assemble the final contract
contracts[base] = &Contract{
Code: "0x" + string(binary),
Info: ContractInfo{
Source: source,
Language: "Solidity",
LanguageVersion: sol.version,
CompilerVersion: sol.version,
CompilerOptions: compilerOptions,
AbiDefinition: abi,
UserDoc: userdoc,
DeveloperDoc: devdoc,
},
}
}
return contracts, nil
}
func SaveInfo(info *ContractInfo, filename string) (contenthash common.Hash, err error) {
infojson, err := json.Marshal(info)
if err != nil {
return
}
contenthash = common.BytesToHash(crypto.Keccak256(infojson))
err = ioutil.WriteFile(filename, infojson, 0600)
return
}
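As a rough usage sketch (function name, contract source and printing invented, not from the commit), the removed solc wrapper was driven roughly like this:
// exampleCompile is hypothetical.
func exampleCompile() error {
	sol, err := New("") // empty path falls back to "solc" found on PATH
	if err != nil {
		return err
	}
	contracts, err := sol.Compile("contract Greeter { function greet() {} }")
	if err != nil {
		return err
	}
	for name, c := range contracts {
		fmt.Printf("%s: %d hex chars of code, compiler %s\n", name, len(c.Code), c.Info.CompilerVersion)
	}
	return nil
}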

View File

@@ -1,108 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package downloader
import (
"sync"
"golang.org/x/net/context"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/rpc"
)
// PublicDownloaderAPI provides an API which gives information about the current synchronisation status.
// It offers only methods that operates on data that can be available to anyone without security risks.
type PublicDownloaderAPI struct {
d *Downloader
mux *event.TypeMux
muSyncSubscriptions sync.Mutex
syncSubscriptions map[string]rpc.Subscription
}
// NewPublicDownloaderAPI create a new PublicDownloaderAPI.
func NewPublicDownloaderAPI(d *Downloader, m *event.TypeMux) *PublicDownloaderAPI {
api := &PublicDownloaderAPI{d: d, mux: m, syncSubscriptions: make(map[string]rpc.Subscription)}
go api.run()
return api
}
func (api *PublicDownloaderAPI) run() {
sub := api.mux.Subscribe(StartEvent{}, DoneEvent{}, FailedEvent{})
for event := range sub.Chan() {
var notification interface{}
switch event.Data.(type) {
case StartEvent:
result := &SyncingResult{Syncing: true}
result.Status.Origin, result.Status.Current, result.Status.Height, result.Status.Pulled, result.Status.Known = api.d.Progress()
notification = result
case DoneEvent, FailedEvent:
notification = false
}
api.muSyncSubscriptions.Lock()
for id, sub := range api.syncSubscriptions {
if sub.Notify(notification) == rpc.ErrNotificationNotFound {
delete(api.syncSubscriptions, id)
}
}
api.muSyncSubscriptions.Unlock()
}
}
// Progress gives progress indications when the node is synchronising with the Ethereum network.
type Progress struct {
Origin uint64 `json:"startingBlock"`
Current uint64 `json:"currentBlock"`
Height uint64 `json:"highestBlock"`
Pulled uint64 `json:"pulledStates"`
Known uint64 `json:"knownStates"`
}
// SyncingResult provides information about the current synchronisation status for this node.
type SyncingResult struct {
Syncing bool `json:"syncing"`
Status Progress `json:"status"`
}
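To make the shape concrete, a hedged example (numbers invented) of the JSON these two structs produce for a node that is mid-sync:
//	{"syncing":true,"status":{"startingBlock":0,"currentBlock":1024,
//	  "highestBlock":4096,"pulledStates":12,"knownStates":48}}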
// Syncing provides information when this nodes starts synchronising with the Ethereum network and when it's finished.
func (api *PublicDownloaderAPI) Syncing(ctx context.Context) (rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
subscription, err := notifier.NewSubscription(func(id string) {
api.muSyncSubscriptions.Lock()
delete(api.syncSubscriptions, id)
api.muSyncSubscriptions.Unlock()
})
if err != nil {
return nil, err
}
api.muSyncSubscriptions.Lock()
api.syncSubscriptions[subscription.ID()] = subscription
api.muSyncSubscriptions.Unlock()
return subscription, nil
}

View File

@@ -1,655 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package filters
import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/internal/ethapi"
"github.com/ethereum/go-ethereum/rpc"
"golang.org/x/net/context"
)
var (
filterTickerTime = 5 * time.Minute
)
// byte will be inferred
const (
unknownFilterTy = iota
blockFilterTy
transactionFilterTy
logFilterTy
)
// PublicFilterAPI offers support to create and manage filters. This will allow external clients to retrieve various
// information related to the Ethereum protocol such als blocks, transactions and logs.
type PublicFilterAPI struct {
apiBackend ethapi.Backend
quit chan struct{}
chainDb ethdb.Database
mux *event.TypeMux
filterManager *FilterSystem
filterMapMu sync.RWMutex
filterMapping map[string]int // maps between filter internal filter identifiers and external filter identifiers
logMu sync.RWMutex
logQueue map[int]*logQueue
blockMu sync.RWMutex
blockQueue map[int]*hashQueue
transactionMu sync.RWMutex
transactionQueue map[int]*hashQueue
transactMu sync.Mutex
}
// NewPublicFilterAPI returns a new PublicFilterAPI instance.
func NewPublicFilterAPI(apiBackend ethapi.Backend) *PublicFilterAPI {
svc := &PublicFilterAPI{
apiBackend: apiBackend,
mux: apiBackend.EventMux(),
chainDb: apiBackend.ChainDb(),
filterManager: NewFilterSystem(apiBackend.EventMux()),
filterMapping: make(map[string]int),
logQueue: make(map[int]*logQueue),
blockQueue: make(map[int]*hashQueue),
transactionQueue: make(map[int]*hashQueue),
}
go svc.start()
return svc
}
// Stop quits the work loop.
func (s *PublicFilterAPI) Stop() {
close(s.quit)
}
// start the work loop, wait and process events.
func (s *PublicFilterAPI) start() {
timer := time.NewTicker(2 * time.Second)
defer timer.Stop()
done:
for {
select {
case <-timer.C:
s.logMu.Lock()
for id, filter := range s.logQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.logQueue, id)
}
}
s.logMu.Unlock()
s.blockMu.Lock()
for id, filter := range s.blockQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.blockQueue, id)
}
}
s.blockMu.Unlock()
s.transactionMu.Lock()
for id, filter := range s.transactionQueue {
if time.Since(filter.timeout) > filterTickerTime {
s.filterManager.Remove(id)
delete(s.transactionQueue, id)
}
}
s.transactionMu.Unlock()
case <-s.quit:
break done
}
}
}
// NewBlockFilter create a new filter that returns blocks that are included into the canonical chain.
func (s *PublicFilterAPI) NewBlockFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
s.blockMu.Lock()
filter := New(s.apiBackend)
id, err := s.filterManager.Add(filter, ChainFilter)
if err != nil {
return "", err
}
s.blockQueue[id] = &hashQueue{timeout: time.Now()}
filter.BlockCallback = func(block *types.Block, logs vm.Logs) {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if queue := s.blockQueue[id]; queue != nil {
queue.add(block.Hash())
}
}
defer s.blockMu.Unlock()
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// NewPendingTransactionFilter creates a filter that returns new pending transactions.
func (s *PublicFilterAPI) NewPendingTransactionFilter() (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
filter := New(s.apiBackend)
id, err := s.filterManager.Add(filter, PendingTxFilter)
if err != nil {
return "", err
}
s.transactionQueue[id] = &hashQueue{timeout: time.Now()}
filter.TransactionCallback = func(tx *types.Transaction) {
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
if queue := s.transactionQueue[id]; queue != nil {
queue.add(tx.Hash())
}
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// newLogFilter creates a new log filter.
func (s *PublicFilterAPI) newLogFilter(earliest, latest int64, addresses []common.Address, topics [][]common.Hash, callback func(log *vm.Log, removed bool)) (int, error) {
s.logMu.Lock()
defer s.logMu.Unlock()
filter := New(s.apiBackend)
id, err := s.filterManager.Add(filter, LogFilter)
if err != nil {
return 0, err
}
s.logQueue[id] = &logQueue{timeout: time.Now()}
filter.SetBeginBlock(earliest)
filter.SetEndBlock(latest)
filter.SetAddresses(addresses)
filter.SetTopics(topics)
filter.LogCallback = func(log *vm.Log, removed bool) {
if callback != nil {
callback(log, removed)
} else {
s.logMu.Lock()
defer s.logMu.Unlock()
if queue := s.logQueue[id]; queue != nil {
queue.add(vmlog{log, removed})
}
}
}
return id, nil
}
// Logs creates a subscription that fires for all new log that match the given filter criteria.
func (s *PublicFilterAPI) Logs(ctx context.Context, args NewFilterArgs) (rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return nil, rpc.ErrNotificationsUnsupported
}
var (
externalId string
subscription rpc.Subscription
err error
)
if externalId, err = newFilterId(); err != nil {
return nil, err
}
// uninstall filter when subscription is unsubscribed/cancelled
if subscription, err = notifier.NewSubscription(func(string) {
s.UninstallFilter(externalId)
}); err != nil {
return nil, err
}
notifySubscriber := func(log *vm.Log, removed bool) {
rpcLog := toRPCLogs(vm.Logs{log}, removed)
if err := subscription.Notify(rpcLog); err != nil {
subscription.Cancel()
}
}
// from and to block number are not used since subscriptions don't allow you to travel to "time"
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(-1, -1, args.Addresses, args.Topics, notifySubscriber)
} else {
id, err = s.newLogFilter(-1, -1, nil, args.Topics, notifySubscriber)
}
if err != nil {
subscription.Cancel()
return nil, err
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return subscription, err
}
// NewFilterArgs represents a request to create a new filter.
type NewFilterArgs struct {
FromBlock rpc.BlockNumber
ToBlock rpc.BlockNumber
Addresses []common.Address
Topics [][]common.Hash
}
// UnmarshalJSON sets *args fields with given data.
func (args *NewFilterArgs) UnmarshalJSON(data []byte) error {
type input struct {
From *rpc.BlockNumber `json:"fromBlock"`
ToBlock *rpc.BlockNumber `json:"toBlock"`
Addresses interface{} `json:"address"`
Topics []interface{} `json:"topics"`
}
var raw input
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
if raw.From == nil || raw.From.Int64() < 0 {
args.FromBlock = rpc.LatestBlockNumber
} else {
args.FromBlock = *raw.From
}
if raw.ToBlock == nil || raw.ToBlock.Int64() < 0 {
args.ToBlock = rpc.LatestBlockNumber
} else {
args.ToBlock = *raw.ToBlock
}
args.Addresses = []common.Address{}
if raw.Addresses != nil {
// raw.Address can contain a single address or an array of addresses
var addresses []common.Address
if strAddrs, ok := raw.Addresses.([]interface{}); ok {
for i, addr := range strAddrs {
if strAddr, ok := addr.(string); ok {
if len(strAddr) >= 2 && strAddr[0] == '0' && (strAddr[1] == 'x' || strAddr[1] == 'X') {
strAddr = strAddr[2:]
}
if decAddr, err := hex.DecodeString(strAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
return fmt.Errorf("invalid address given")
}
} else {
return fmt.Errorf("invalid address on index %d", i)
}
}
} else if singleAddr, ok := raw.Addresses.(string); ok {
if len(singleAddr) >= 2 && singleAddr[0] == '0' && (singleAddr[1] == 'x' || singleAddr[1] == 'X') {
singleAddr = singleAddr[2:]
}
if decAddr, err := hex.DecodeString(singleAddr); err == nil {
addresses = append(addresses, common.BytesToAddress(decAddr))
} else {
return fmt.Errorf("invalid address given")
}
} else {
return errors.New("invalid address(es) given")
}
args.Addresses = addresses
}
// helper function which parses a string to a topic hash
topicConverter := func(raw string) (common.Hash, error) {
if len(raw) == 0 {
return common.Hash{}, nil
}
if len(raw) >= 2 && raw[0] == '0' && (raw[1] == 'x' || raw[1] == 'X') {
raw = raw[2:]
}
if len(raw) != 2*common.HashLength {
return common.Hash{}, errors.New("invalid topic(s)")
}
if decAddr, err := hex.DecodeString(raw); err == nil {
return common.BytesToHash(decAddr), nil
}
return common.Hash{}, errors.New("invalid topic(s)")
}
// topics is an array consisting of strings and/or arrays of strings.
// JSON null values are converted to common.Hash{} and ignored by the filter manager.
if len(raw.Topics) > 0 {
args.Topics = make([][]common.Hash, len(raw.Topics))
for i, t := range raw.Topics {
if t == nil { // ignore topic when matching logs
args.Topics[i] = []common.Hash{common.Hash{}}
} else if topic, ok := t.(string); ok { // match specific topic
top, err := topicConverter(topic)
if err != nil {
return err
}
args.Topics[i] = []common.Hash{top}
} else if topics, ok := t.([]interface{}); ok { // or case e.g. [null, "topic0", "topic1"]
for _, rawTopic := range topics {
if rawTopic == nil {
args.Topics[i] = append(args.Topics[i], common.Hash{})
} else if topic, ok := rawTopic.(string); ok {
parsed, err := topicConverter(topic)
if err != nil {
return err
}
args.Topics[i] = append(args.Topics[i], parsed)
} else {
return fmt.Errorf("invalid topic(s)")
}
}
} else {
return fmt.Errorf("invalid topic(s)")
}
}
}
return nil
}
// NewFilter creates a new filter and returns the filter id. It can be uses to retrieve logs.
func (s *PublicFilterAPI) NewFilter(args NewFilterArgs) (string, error) {
externalId, err := newFilterId()
if err != nil {
return "", err
}
var id int
if len(args.Addresses) > 0 {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), args.Addresses, args.Topics, nil)
} else {
id, err = s.newLogFilter(args.FromBlock.Int64(), args.ToBlock.Int64(), nil, args.Topics, nil)
}
if err != nil {
return "", err
}
s.filterMapMu.Lock()
s.filterMapping[externalId] = id
s.filterMapMu.Unlock()
return externalId, nil
}
// GetLogs returns the logs matching the given argument.
func (s *PublicFilterAPI) GetLogs(ctx context.Context, args NewFilterArgs) ([]vmlog, error) {
filter := New(s.apiBackend)
filter.SetBeginBlock(args.FromBlock.Int64())
filter.SetEndBlock(args.ToBlock.Int64())
filter.SetAddresses(args.Addresses)
filter.SetTopics(args.Topics)
logs, err := filter.Find(ctx)
return toRPCLogs(logs, false), err
}
// UninstallFilter removes the filter with the given filter id.
func (s *PublicFilterAPI) UninstallFilter(filterId string) bool {
s.filterMapMu.Lock()
defer s.filterMapMu.Unlock()
id, ok := s.filterMapping[filterId]
if !ok {
return false
}
defer s.filterManager.Remove(id)
delete(s.filterMapping, filterId)
if _, ok := s.logQueue[id]; ok {
s.logMu.Lock()
defer s.logMu.Unlock()
delete(s.logQueue, id)
return true
}
if _, ok := s.blockQueue[id]; ok {
s.blockMu.Lock()
defer s.blockMu.Unlock()
delete(s.blockQueue, id)
return true
}
if _, ok := s.transactionQueue[id]; ok {
s.transactionMu.Lock()
defer s.transactionMu.Unlock()
delete(s.transactionQueue, id)
return true
}
return false
}
// getFilterType is a helper utility that determine the type of filter for the given filter id.
func (s *PublicFilterAPI) getFilterType(id int) byte {
if _, ok := s.blockQueue[id]; ok {
return blockFilterTy
} else if _, ok := s.transactionQueue[id]; ok {
return transactionFilterTy
} else if _, ok := s.logQueue[id]; ok {
return logFilterTy
}
return unknownFilterTy
}
// blockFilterChanged returns a collection of block hashes for the block filter with the given id.
func (s *PublicFilterAPI) blockFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.blockQueue[id] != nil {
res := s.blockQueue[id].get()
return res
}
return nil
}
// transactionFilterChanged returns a collection of transaction hashes for the pending
// transaction filter with the given id.
func (s *PublicFilterAPI) transactionFilterChanged(id int) []common.Hash {
s.blockMu.Lock()
defer s.blockMu.Unlock()
if s.transactionQueue[id] != nil {
return s.transactionQueue[id].get()
}
return nil
}
// logFilterChanged returns a collection of logs for the log filter with the given id.
func (s *PublicFilterAPI) logFilterChanged(id int) []vmlog {
s.logMu.Lock()
defer s.logMu.Unlock()
if s.logQueue[id] != nil {
return s.logQueue[id].get()
}
return nil
}
// GetFilterLogs returns the logs for the filter with the given id.
func (s *PublicFilterAPI) GetFilterLogs(ctx context.Context, filterId string) ([]vmlog, error) {
id, ok := s.filterMapping[filterId]
if !ok {
return toRPCLogs(nil, false), nil
}
if filter := s.filterManager.Get(id); filter != nil {
logs, err := filter.Find(ctx)
return toRPCLogs(logs, false), err
}
return toRPCLogs(nil, false), nil
}
// GetFilterChanges returns the logs for the filter with the given id since last time is was called.
// This can be used for polling.
func (s *PublicFilterAPI) GetFilterChanges(filterId string) interface{} {
s.filterMapMu.Lock()
id, ok := s.filterMapping[filterId]
s.filterMapMu.Unlock()
if !ok { // filter not found
return []interface{}{}
}
switch s.getFilterType(id) {
case blockFilterTy:
return returnHashes(s.blockFilterChanged(id))
case transactionFilterTy:
return returnHashes(s.transactionFilterChanged(id))
case logFilterTy:
return s.logFilterChanged(id)
}
return []interface{}{}
}
type vmlog struct {
*vm.Log
Removed bool `json:"removed"`
}
type logQueue struct {
mu sync.Mutex
logs []vmlog
timeout time.Time
id int
}
func (l *logQueue) add(logs ...vmlog) {
l.mu.Lock()
defer l.mu.Unlock()
l.logs = append(l.logs, logs...)
}
func (l *logQueue) get() []vmlog {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.logs
l.logs = nil
return tmp
}
type hashQueue struct {
mu sync.Mutex
hashes []common.Hash
timeout time.Time
id int
}
func (l *hashQueue) add(hashes ...common.Hash) {
l.mu.Lock()
defer l.mu.Unlock()
l.hashes = append(l.hashes, hashes...)
}
func (l *hashQueue) get() []common.Hash {
l.mu.Lock()
defer l.mu.Unlock()
l.timeout = time.Now()
tmp := l.hashes
l.hashes = nil
return tmp
}
// newFilterId generates a new random filter identifier that can be exposed to the outer world. By publishing random
// identifiers it is not feasible for DApp's to guess filter id's for other DApp's and uninstall or poll for them
// causing the affected DApp to miss data.
func newFilterId() (string, error) {
var subid [16]byte
n, _ := rand.Read(subid[:])
if n != 16 {
return "", errors.New("Unable to generate filter id")
}
return "0x" + hex.EncodeToString(subid[:]), nil
}
// toRPCLogs is a helper that will convert a vm.Logs array to an structure which
// can hold additional information about the logs such as whether it was deleted.
// Additionally when nil is given it will by default instead create an empty slice
// instead. This is required by the RPC specification.
func toRPCLogs(logs vm.Logs, removed bool) []vmlog {
convertedLogs := make([]vmlog, len(logs))
for i, log := range logs {
convertedLogs[i] = vmlog{Log: log, Removed: removed}
}
return convertedLogs
}
// returnHashes is a helper that will return an empty hash array case the given hash array is nil, otherwise is will
// return the given hashes. The RPC interfaces defines that always an array is returned.
func returnHashes(hashes []common.Hash) []common.Hash {
if hashes == nil {
return []common.Hash{}
}
return hashes
}

View File

@@ -1,185 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
// package filters implements an ethereum filtering system for block,
// transactions and log events.
package filters
import (
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/event"
)
// FilterType determines the type of filter and is used to put the filter in to
// the correct bucket when added.
type FilterType byte
const (
ChainFilter FilterType = iota // new block events filter
PendingTxFilter // pending transaction filter
LogFilter // new or removed log filter
PendingLogFilter // pending log filter
)
// FilterSystem manages filters that filter specific events such as
// block, transaction and log events. The Filtering system can be used to listen
// for specific LOG events fired by the EVM (Ethereum Virtual Machine).
type FilterSystem struct {
filterMu sync.RWMutex
filterId int
chainFilters map[int]*Filter
pendingTxFilters map[int]*Filter
logFilters map[int]*Filter
pendingLogFilters map[int]*Filter
// generic is an ugly hack for Get
generic map[int]*Filter
sub event.Subscription
}
// NewFilterSystem returns a newly allocated filter manager
func NewFilterSystem(mux *event.TypeMux) *FilterSystem {
fs := &FilterSystem{
chainFilters: make(map[int]*Filter),
pendingTxFilters: make(map[int]*Filter),
logFilters: make(map[int]*Filter),
pendingLogFilters: make(map[int]*Filter),
generic: make(map[int]*Filter),
}
fs.sub = mux.Subscribe(
core.PendingLogsEvent{},
core.RemovedLogsEvent{},
core.ChainEvent{},
core.TxPreEvent{},
vm.Logs(nil),
)
go fs.filterLoop()
return fs
}
// Stop quits the filter loop required for polling events
func (fs *FilterSystem) Stop() {
fs.sub.Unsubscribe()
}
// Add adds a filter to the filter manager
func (fs *FilterSystem) Add(filter *Filter, filterType FilterType) (int, error) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
id := fs.filterId
filter.created = time.Now()
switch filterType {
case ChainFilter:
fs.chainFilters[id] = filter
case PendingTxFilter:
fs.pendingTxFilters[id] = filter
case LogFilter:
fs.logFilters[id] = filter
case PendingLogFilter:
fs.pendingLogFilters[id] = filter
default:
return 0, fmt.Errorf("unknown filter type %v", filterType)
}
fs.generic[id] = filter
fs.filterId++
return id, nil
}
// Remove removes a filter by filter id
func (fs *FilterSystem) Remove(id int) {
fs.filterMu.Lock()
defer fs.filterMu.Unlock()
delete(fs.chainFilters, id)
delete(fs.pendingTxFilters, id)
delete(fs.logFilters, id)
delete(fs.pendingLogFilters, id)
delete(fs.generic, id)
}
func (fs *FilterSystem) Get(id int) *Filter {
fs.filterMu.RLock()
defer fs.filterMu.RUnlock()
return fs.generic[id]
}
// filterLoop waits for specific events from ethereum and fires their handlers
// when the filter matches the requirements.
func (fs *FilterSystem) filterLoop() {
for event := range fs.sub.Chan() {
switch ev := event.Data.(type) {
case core.ChainEvent:
fs.filterMu.RLock()
for _, filter := range fs.chainFilters {
if filter.BlockCallback != nil && !filter.created.After(event.Time) {
filter.BlockCallback(ev.Block, ev.Logs)
}
}
fs.filterMu.RUnlock()
case core.TxPreEvent:
fs.filterMu.RLock()
for _, filter := range fs.pendingTxFilters {
if filter.TransactionCallback != nil && !filter.created.After(event.Time) {
filter.TransactionCallback(ev.Tx)
}
}
fs.filterMu.RUnlock()
case vm.Logs:
fs.filterMu.RLock()
for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, log := range filter.FilterLogs(ev) {
filter.LogCallback(log, false)
}
}
}
fs.filterMu.RUnlock()
case core.RemovedLogsEvent:
fs.filterMu.RLock()
for _, filter := range fs.logFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, removedLog := range filter.FilterLogs(ev.Logs) {
filter.LogCallback(removedLog, true)
}
}
}
fs.filterMu.RUnlock()
case core.PendingLogsEvent:
fs.filterMu.RLock()
for _, filter := range fs.pendingLogFilters {
if filter.LogCallback != nil && !filter.created.After(event.Time) {
for _, pendingLog := range ev.Logs {
filter.LogCallback(pendingLog, false)
}
}
}
fs.filterMu.RUnlock()
}
}
}

View File

@@ -1,142 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strings"
"github.com/rs/cors"
)
const (
maxHTTPRequestContentLength = 1024 * 128
)
// httpClient connects to a geth RPC server over HTTP.
type httpClient struct {
endpoint *url.URL // HTTP-RPC server endpoint
httpClient http.Client // reuse connection
lastRes []byte // HTTP requests are synchronous, store last response
}
// NewHTTPClient creates a new RPC client that connects to a geth RPC server
// over HTTP.
func NewHTTPClient(endpoint string) (Client, error) {
url, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
return &httpClient{endpoint: url}, nil
}
// Send serializes the given msg to JSON and sends it to the RPC server.
// Since HTTP is synchronous the response is stored until Recv is called.
func (client *httpClient) Send(msg interface{}) error {
var body []byte
var err error
client.lastRes = nil
if body, err = json.Marshal(msg); err != nil {
return err
}
resp, err := client.httpClient.Post(client.endpoint.String(), "application/json", bytes.NewReader(body))
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode == http.StatusOK {
client.lastRes, err = ioutil.ReadAll(resp.Body)
return err
}
return fmt.Errorf("request failed: %s", resp.Status)
}
// Recv will try to deserialize the last received response into the given msg.
func (client *httpClient) Recv(msg interface{}) error {
return json.Unmarshal(client.lastRes, &msg)
}
// Close is not necessary for httpClient
func (client *httpClient) Close() {
}
// SupportedModules will return the collection of offered RPC modules.
func (client *httpClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
}
// httpReadWriteNopCloser wraps an io.Reader and io.Writer with a no-op Close method.
type httpReadWriteNopCloser struct {
io.Reader
io.Writer
}
// Close does nothing and always returns nil
func (t *httpReadWriteNopCloser) Close() error {
return nil
}
// newJSONHTTPHandler creates an HTTP handler that parses incoming JSON requests,
// forwards them to the given API provider and sends the response back to the caller.
func newJSONHTTPHandler(srv *Server) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.ContentLength > maxHTTPRequestContentLength {
http.Error(w,
fmt.Sprintf("content length too large (%d>%d)", r.ContentLength, maxHTTPRequestContentLength),
http.StatusRequestEntityTooLarge)
return
}
w.Header().Set("content-type", "application/json")
// create a codec that reads directly from the request body until
// EOF and writes the response to w, then order the server to process
// a single request.
codec := NewJSONCodec(&httpReadWriteNopCloser{r.Body, w})
defer codec.Close()
srv.ServeSingleRequest(codec, OptionMethodInvocation)
}
}
// NewHTTPServer creates a new HTTP RPC server around an API provider.
func NewHTTPServer(corsString string, srv *Server) *http.Server {
var allowedOrigins []string
for _, domain := range strings.Split(corsString, ",") {
allowedOrigins = append(allowedOrigins, strings.TrimSpace(domain))
}
c := cors.New(cors.Options{
AllowedOrigins: allowedOrigins,
AllowedMethods: []string{"POST", "GET"},
})
handler := c.Handler(newJSONHTTPHandler(srv))
return &http.Server{
Handler: handler,
}
}
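
A sketch of driving this HTTP client from another package (the endpoint address and the request payload are illustrative, and the import path assumes the usual go-ethereum rpc package; NewHTTPClient, Send, Recv and Close are the calls defined above):

package main

import (
    "fmt"
    "log"

    "github.com/ethereum/go-ethereum/rpc"
)

func main() {
    client, err := rpc.NewHTTPClient("http://127.0.0.1:8545") // illustrative endpoint
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Send stores the HTTP response internally; Recv unmarshals it.
    req := map[string]interface{}{
        "jsonrpc": "2.0",
        "id":      1,
        "method":  "web3_clientVersion",
        "params":  []interface{}{},
    }
    if err := client.Send(req); err != nil {
        log.Fatal(err)
    }
    var resp map[string]interface{}
    if err := client.Recv(&resp); err != nil {
        log.Fatal(err)
    }
    fmt.Println("node reports:", resp["result"])
}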

View File

@ -1,61 +0,0 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"encoding/json"
"io"
"net"
)
// inProcClient is an in-process buffer stream attached to an RPC server.
type inProcClient struct {
server *Server
cl io.Closer
enc *json.Encoder
dec *json.Decoder
}
// Close tears down the request channel of the in-proc client.
func (c *inProcClient) Close() {
c.cl.Close()
}
// NewInProcRPCClient creates an in-process buffer stream attachment to a given
// RPC server.
func NewInProcRPCClient(handler *Server) Client {
p1, p2 := net.Pipe()
go handler.ServeCodec(NewJSONCodec(p1), OptionMethodInvocation|OptionSubscriptions)
return &inProcClient{handler, p2, json.NewEncoder(p2), json.NewDecoder(p2)}
}
// Send marshals a message into JSON and injects it into the client
// request channel.
func (c *inProcClient) Send(msg interface{}) error {
return c.enc.Encode(msg)
}
// Recv reads a message from the response channel and tries to parse it into the
// given msg interface.
func (c *inProcClient) Recv(msg interface{}) error {
return c.dec.Decode(msg)
}
// SupportedModules returns the collection of modules the RPC server offers.
func (c *inProcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(c)
}
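
A sketch of attaching the in-process client to a server (imports omitted; rpc.NewServer, Server.RegisterName and the calc_add method-naming convention are assumptions about the rest of this package, while NewInProcRPCClient, Send, Recv and Close come from the code above):

// CalcService is a toy API exposed over the in-process transport.
type CalcService struct{}

func (s *CalcService) Add(a, b int) int { return a + b }

func inProcExample() error {
    server := rpc.NewServer() // assumed constructor for *Server
    if err := server.RegisterName("calc", new(CalcService)); err != nil {
        return err
    }

    client := rpc.NewInProcRPCClient(server)
    defer client.Close()

    req := map[string]interface{}{
        "jsonrpc": "2.0",
        "id":      1,
        "method":  "calc_add", // assumed service_method naming
        "params":  []int{2, 3},
    }
    if err := client.Send(req); err != nil {
        return err
    }
    var resp map[string]interface{}
    if err := client.Recv(&resp); err != nil {
        return err
    }
    fmt.Println(resp["result"]) // expected: 5
    return nil
}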

View File

@ -1,84 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"encoding/json"
"net"
)
// CreateIPCListener creates a listener; on Unix platforms this is a unix socket, on Windows a named pipe
func CreateIPCListener(endpoint string) (net.Listener, error) {
return ipcListen(endpoint)
}
// ipcClient represents an IPC RPC client. It connects to the given endpoint and communicates with a node using
// JSON serialization.
type ipcClient struct {
endpoint string
conn net.Conn
out *json.Encoder
in *json.Decoder
}
// NewIPCClient creates a new IPC client that connects to the given endpoint. Messages are JSON encoded and decoded.
// On Unix the endpoint is assumed to be the full path to a unix socket; on Windows it is an identifier for a
// named pipe.
func NewIPCClient(endpoint string) (Client, error) {
conn, err := newIPCConnection(endpoint)
if err != nil {
return nil, err
}
return &ipcClient{endpoint: endpoint, conn: conn, in: json.NewDecoder(conn), out: json.NewEncoder(conn)}, nil
}
// Send serializes the given message and sends it to the server.
// If sending fails it tries to reconnect once and sends the message again.
func (client *ipcClient) Send(msg interface{}) error {
if err := client.out.Encode(msg); err == nil {
return nil
}
// retry once
client.conn.Close()
conn, err := newIPCConnection(client.endpoint)
if err != nil {
return err
}
client.conn = conn
client.in = json.NewDecoder(conn)
client.out = json.NewEncoder(conn)
return client.out.Encode(msg)
}
// Recv reads a message from the connection and tries to parse it. It assumes the received message is JSON encoded.
func (client *ipcClient) Recv(msg interface{}) error {
return client.in.Decode(&msg)
}
// Close will close the underlying IPC connection
func (client *ipcClient) Close() {
client.conn.Close()
}
// SupportedModules will return the collection of offered RPC modules.
func (client *ipcClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
}
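
A sketch of the IPC client in use (imports omitted; the socket path is illustrative):

func printModules() error {
    client, err := rpc.NewIPCClient("/home/user/.ethereum/geth.ipc") // illustrative path
    if err != nil {
        return err
    }
    defer client.Close()

    // Ask the server which RPC modules it offers.
    modules, err := client.SupportedModules()
    if err != nil {
        return err
    }
    for name, version := range modules {
        fmt.Printf("%s v%s\n", name, version)
    }
    return nil
}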

View File

@ -1,297 +0,0 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/context"
)
var (
// ErrNotificationsUnsupported is returned when the connection doesn't support notifications
ErrNotificationsUnsupported = errors.New("notifications not supported")
// ErrNotificationNotFound is returned when the notification for the given id is not found
ErrNotificationNotFound = errors.New("notification not found")
// errNotifierStopped is returned when the notifier is stopped (e.g. codec is closed)
errNotifierStopped = errors.New("unable to send notification")
// errNotificationQueueFull is returned when there are too many notifications in the queue
errNotificationQueueFull = errors.New("too many pending notifications")
)
// unsubSignal is a signal that the subscription is unsubscribed. It is used to flush buffered
// notifications that might be pending in the internal queue.
var unsubSignal = new(struct{})
// UnsubscribeCallback defines a callback that is called when a subscription ends.
// It receives the subscription id as argument.
type UnsubscribeCallback func(id string)
// notification is a helper object that holds event data for a subscription
type notification struct {
sub *bufferedSubscription // target subscription
data interface{} // event data
}
// A Notifier describes the interface for objects that can create subscriptions
type Notifier interface {
// Create a new subscription. The given callback is called when this subscription
// is cancelled (e.g. the client sends an unsubscribe request or the connection is closed).
NewSubscription(UnsubscribeCallback) (Subscription, error)
// Cancel subscription
Unsubscribe(id string) error
}
type notifierKey struct{}
// NotifierFromContext returns the Notifier value stored in ctx, if any.
func NotifierFromContext(ctx context.Context) (Notifier, bool) {
n, ok := ctx.Value(notifierKey{}).(Notifier)
return n, ok
}
// Subscription defines the interface for objects that can notify subscribers
type Subscription interface {
// Inform client of an event
Notify(data interface{}) error
// Unique identifier
ID() string
// Cancel subscription
Cancel() error
}
// bufferedSubscription is a subscription that uses a bufferedNotifier to send
// notifications to subscribers.
type bufferedSubscription struct {
id string
unsubOnce sync.Once // call unsub method once
unsub UnsubscribeCallback // called on Unsubscribed
notifier *bufferedNotifier // forward notifications to
pending chan interface{} // closed once the subscription is activated
flushed chan interface{} // closed when all buffered notifications are sent
lastNotification time.Time // last time a notification was sent
}
// ID returns the subscription identifier that the client uses to refer to this instance.
func (s *bufferedSubscription) ID() string {
return s.id
}
// Cancel informs the notifier that this subscription is cancelled by the API
func (s *bufferedSubscription) Cancel() error {
return s.notifier.Unsubscribe(s.id)
}
// Notify the subscriber of a particular event.
func (s *bufferedSubscription) Notify(data interface{}) error {
return s.notifier.send(s.id, data)
}
// bufferedNotifier is a notifier that queues notifications in an internal queue and
// sends them as fast as possible to the client from this queue. It will stop if the
// queue grows past a given size.
type bufferedNotifier struct {
codec ServerCodec // underlying connection
mu sync.Mutex // guard internal state
subscriptions map[string]*bufferedSubscription // keep track of subscriptions associated with codec
queueSize int // max number of items in queue
queue chan *notification // notification queue
stopped bool // indication if this notifier is ordered to stop
}
// newBufferedNotifier returns a notifier that queues notifications in an internal queue
// from which notifications are sent as fast as possible to the client. If the queue size
// limit is reached (client is unable to keep up) it stops and closes the codec.
func newBufferedNotifier(codec ServerCodec, size int) *bufferedNotifier {
notifier := &bufferedNotifier{
codec: codec,
subscriptions: make(map[string]*bufferedSubscription),
queue: make(chan *notification, size),
queueSize: size,
}
go notifier.run()
return notifier
}
// NewSubscription creates a new subscription that forwards events to this instance's internal
// queue. The given callback is called when the subscription is unsubscribed/cancelled.
func (n *bufferedNotifier) NewSubscription(callback UnsubscribeCallback) (Subscription, error) {
id, err := newSubscriptionID()
if err != nil {
return nil, err
}
n.mu.Lock()
defer n.mu.Unlock()
if n.stopped {
return nil, errNotifierStopped
}
sub := &bufferedSubscription{
id: id,
unsub: callback,
notifier: n,
pending: make(chan interface{}),
flushed: make(chan interface{}),
lastNotification: time.Now(),
}
n.subscriptions[id] = sub
return sub, nil
}
// Unsubscribe removes the given subscription. If the subscription is not found ErrNotificationNotFound is returned.
func (n *bufferedNotifier) Unsubscribe(subid string) error {
n.mu.Lock()
sub, found := n.subscriptions[subid]
n.mu.Unlock()
if found {
// send the unsubscribe signal, this will cause the notifier not to accept new events
// for this subscription and will close the flushed channel after the last (buffered)
// notification was sent to the client.
if err := n.send(subid, unsubSignal); err != nil {
return err
}
// wait for confirmation that all (buffered) events are sent for this subscription.
// this ensures that the unsubscribe method response is not sent before all buffered
// events for this subscription are sent.
<-sub.flushed
return nil
}
return ErrNotificationNotFound
}
// send enqueues the given data for the subscription with the given id on the internal queue. It returns
// an error when the notifier is stopped or the queue is full. If data is the unsubscribe signal it
// removes the subscription with the given id from the subscription collection.
func (n *bufferedNotifier) send(id string, data interface{}) error {
n.mu.Lock()
defer n.mu.Unlock()
if n.stopped {
return errNotifierStopped
}
var (
subscription *bufferedSubscription
found bool
)
// check if the subscription is associated with this connection; it might have been cancelled
// (unsubscribed or the connection closed)
if subscription, found = n.subscriptions[id]; !found {
glog.V(logger.Error).Infof("received notification for unknown subscription %s\n", id)
return ErrNotificationNotFound
}
// received the unsubscribe signal. Add it to the queue to make sure any pending notifications
// for this subscription are sent. When the run loop receives this signal it will signal that
// all pending subscriptions are flushed and that the confirmation of the unsubscribe can be
// sent to the user. Remove the subscription to make sure new notifications are not accepted.
if data == unsubSignal {
delete(n.subscriptions, id)
if subscription.unsub != nil {
subscription.unsubOnce.Do(func() { subscription.unsub(id) })
}
}
subscription.lastNotification = time.Now()
if len(n.queue) >= n.queueSize {
glog.V(logger.Warn).Infoln("too many buffered notifications -> close connection")
n.codec.Close()
return errNotificationQueueFull
}
n.queue <- &notification{subscription, data}
return nil
}
// run reads notifications from the internal queue and sends them to the client. In case of an
// error, or when the codec is closed, it cancels all active subscriptions and returns.
func (n *bufferedNotifier) run() {
defer func() {
n.mu.Lock()
defer n.mu.Unlock()
n.stopped = true
close(n.queue)
// on exit call unsubscribe callback
for id, sub := range n.subscriptions {
if sub.unsub != nil {
sub.unsubOnce.Do(func() { sub.unsub(id) })
}
close(sub.flushed)
delete(n.subscriptions, id)
}
}()
for {
select {
case notification := <-n.queue:
// It can happen that an event is raised before the RPC server was able to send the sub
// id to the client. Therefore subscriptions are marked as pending until the sub id has been
// sent. The RPC server will activate the subscription by closing the pending chan.
<-notification.sub.pending
if notification.data == unsubSignal {
// unsubSignal is the last accepted message for this subscription. Raise the signal
// that all buffered notifications are sent by closing the flushed channel. This
// indicates that the response for the unsubscribe can be sent to the client.
close(notification.sub.flushed)
} else {
msg := n.codec.CreateNotification(notification.sub.id, notification.data)
if err := n.codec.Write(msg); err != nil {
n.codec.Close()
// unable to send notification to client, unsubscribe all subscriptions
glog.V(logger.Warn).Infof("unable to send notification - %v\n", err)
return
}
}
case <-n.codec.Closed(): // connection was closed
glog.V(logger.Debug).Infoln("codec closed, stop subscriptions")
return
}
}
}
// activate marks the subscription as active. This causes the notifications for this subscription to be
// forwarded to the client.
func (n *bufferedNotifier) activate(subid string) {
n.mu.Lock()
defer n.mu.Unlock()
if sub, found := n.subscriptions[subid]; found {
close(sub.pending)
}
}
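
A sketch of the server-side subscription pattern this notifier enables (imports omitted; the TickerAPI type and its integer tick payload are illustrative, and it is assumed the RPC server places a Notifier into the request context for connections that support subscriptions; NotifierFromContext, NewSubscription, Notify and ErrNotificationsUnsupported come from the code above):

// TickerAPI is an illustrative API whose Ticks method streams integers.
type TickerAPI struct{}

func (api *TickerAPI) Ticks(ctx context.Context) (rpc.Subscription, error) {
    notifier, supported := rpc.NotifierFromContext(ctx)
    if !supported {
        return nil, rpc.ErrNotificationsUnsupported
    }

    sub, err := notifier.NewSubscription(func(id string) {
        // called once when the client unsubscribes or the connection closes
    })
    if err != nil {
        return nil, err
    }

    go func() {
        for i := 0; ; i++ {
            if err := sub.Notify(i); err != nil {
                return // subscription cancelled or notifier stopped
            }
            time.Sleep(time.Second)
        }
    }()
    return sub, nil
}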

View File

@ -1,182 +0,0 @@
// Copyright 2015 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package rpc
import (
"fmt"
"net/http"
"os"
"strings"
"sync"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"golang.org/x/net/websocket"
"gopkg.in/fatih/set.v0"
)
// wsReaderWriterCloser reads and writes payloads from and to a websocket connection.
type wsReaderWriterCloser struct {
c *websocket.Conn
}
// Read will read incoming payload data into p.
func (rw *wsReaderWriterCloser) Read(p []byte) (int, error) {
return rw.c.Read(p)
}
// Write writes p to the websocket.
func (rw *wsReaderWriterCloser) Write(p []byte) (int, error) {
return rw.c.Write(p)
}
// Close closes the websocket connection.
func (rw *wsReaderWriterCloser) Close() error {
return rw.c.Close()
}
// wsHandshakeValidator returns a handler that verifies the origin during the
// websocket upgrade process. When '*' is specified as an allowed origin, all
// connections are accepted.
func wsHandshakeValidator(allowedOrigins []string) func(*websocket.Config, *http.Request) error {
origins := set.New()
allowAllOrigins := false
for _, origin := range allowedOrigins {
if origin == "*" {
allowAllOrigins = true
}
if origin != "" {
origins.Add(strings.ToLower(origin))
}
}
// allow localhost if no allowedOrigins are specified.
if len(origins.List()) == 0 {
origins.Add("http://localhost")
if hostname, err := os.Hostname(); err == nil {
origins.Add("http://" + strings.ToLower(hostname))
}
}
glog.V(logger.Debug).Infof("Allowed origin(s) for WS RPC interface %v\n", origins.List())
f := func(cfg *websocket.Config, req *http.Request) error {
origin := strings.ToLower(req.Header.Get("Origin"))
if allowAllOrigins || origins.Has(origin) {
return nil
}
glog.V(logger.Debug).Infof("origin '%s' not allowed on WS-RPC interface\n", origin)
return fmt.Errorf("origin %s not allowed", origin)
}
return f
}
// NewWSServer creates a new websocket RPC server around an API provider.
func NewWSServer(allowedOrigins string, handler *Server) *http.Server {
return &http.Server{
Handler: websocket.Server{
Handshake: wsHandshakeValidator(strings.Split(allowedOrigins, ",")),
Handler: func(conn *websocket.Conn) {
handler.ServeCodec(NewJSONCodec(&wsReaderWriterCloser{conn}),
OptionMethodInvocation|OptionSubscriptions)
},
},
}
}
// wsClient represents an RPC client that communicates over websockets with an
// RPC server.
type wsClient struct {
endpoint string
connMu sync.Mutex
conn *websocket.Conn
}
// NewWSClient creates a new RPC client that communicates with an RPC server
// that is listening on the given endpoint using JSON encoding.
func NewWSClient(endpoint string) (Client, error) {
return &wsClient{endpoint: endpoint}, nil
}
// connection will return a websocket connection to the RPC server. It will
// (re)connect when necessary.
func (client *wsClient) connection() (*websocket.Conn, error) {
if client.conn != nil {
return client.conn, nil
}
origin, err := os.Hostname()
if err != nil {
return nil, err
}
origin = "http://" + origin
client.conn, err = websocket.Dial(client.endpoint, "", origin)
return client.conn, err
}
// SupportedModules returns the collection of modules the RPC server offers.
func (client *wsClient) SupportedModules() (map[string]string, error) {
return SupportedModules(client)
}
// Send writes the JSON serialized msg to the websocket. It will create a new
// websocket connection to the server if the client is currently not connected.
func (client *wsClient) Send(msg interface{}) (err error) {
client.connMu.Lock()
defer client.connMu.Unlock()
var conn *websocket.Conn
if conn, err = client.connection(); err == nil {
if err = websocket.JSON.Send(conn, msg); err != nil {
client.conn.Close()
client.conn = nil
}
}
return err
}
// Recv reads a JSON message from the websocket and unmarshals it into msg.
func (client *wsClient) Recv(msg interface{}) (err error) {
client.connMu.Lock()
defer client.connMu.Unlock()
var conn *websocket.Conn
if conn, err = client.connection(); err == nil {
if err = websocket.JSON.Receive(conn, msg); err != nil {
client.conn.Close()
client.conn = nil
}
}
return
}
// Close closes the underlying websocket connection.
func (client *wsClient) Close() {
client.connMu.Lock()
defer client.connMu.Unlock()
if client.conn != nil {
client.conn.Close()
client.conn = nil
}
}
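
A sketch tying the websocket server and client together (imports omitted; the listen address and the allow-all origin are illustrative; net.Listen and http.Server.Serve are standard library calls, while NewWSServer, NewWSClient, SupportedModules and Close come from the code above):

func wsRoundTrip(srv *rpc.Server) error {
    wsServer := rpc.NewWSServer("*", srv) // "*" accepts any origin

    listener, err := net.Listen("tcp", "127.0.0.1:8546")
    if err != nil {
        return err
    }
    go wsServer.Serve(listener)

    client, err := rpc.NewWSClient("ws://127.0.0.1:8546")
    if err != nil {
        return err
    }
    defer client.Close()

    modules, err := client.SupportedModules()
    if err != nil {
        return err
    }
    fmt.Println("modules over WS:", modules)
    return nil
}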

Some files were not shown because too many files have changed in this diff