Generalize signing requests.

We need to be able to sign more than just transactions to make DApps
work properly. This change separates signing requests from the
transactions and make it more general to prepare to intoduce different
types of signing requests.

This change is designed to preserve status APIs, so it is
backward-comparible with the current API bindings.
This commit is contained in:
Igor Mandrigin 2018-04-09 10:18:22 +02:00 committed by Igor Mandrigin
parent f3e53441d9
commit a9eb5a7d2b
24 changed files with 1076 additions and 1249 deletions

View File

@ -12,6 +12,7 @@ import (
"github.com/status-im/status-go/geth/node"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/sign"
)
// StatusAPI provides API to access Status related functionality.
@ -49,9 +50,14 @@ func (api *StatusAPI) JailManager() jail.Manager {
return api.b.JailManager()
}
// TxQueueManager returns reference to account manager
func (api *StatusAPI) TxQueueManager() *transactions.Manager {
return api.b.TxQueueManager()
// Transactor returns reference to a status transactor
func (api *StatusAPI) Transactor() *transactions.Transactor {
return api.b.Transactor()
}
// PendingSignRequests returns reference to a list of current sign requests
func (api *StatusAPI) PendingSignRequests() *sign.PendingRequests {
return api.b.PendingSignRequests()
}
// StartNode start Status node, fails if node is already started
@ -156,7 +162,7 @@ func (api *StatusAPI) CompleteTransaction(id string, password string) (gethcommo
}
// CompleteTransactions instructs backend to complete sending of multiple transactions
func (api *StatusAPI) CompleteTransactions(ids []string, password string) map[string]transactions.Result {
func (api *StatusAPI) CompleteTransactions(ids []string, password string) map[string]sign.Result {
return api.b.CompleteTransactions(ids, password)
}

View File

@ -16,6 +16,7 @@ import (
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/sign"
)
const (
@ -32,14 +33,15 @@ var (
// StatusBackend implements Status.im service
type StatusBackend struct {
mu sync.Mutex
statusNode *node.StatusNode
accountManager *account.Manager
txQueueManager *transactions.Manager
jailManager jail.Manager
newNotification fcm.NotificationConstructor
connectionState ConnectionState
log log.Logger
mu sync.Mutex
statusNode *node.StatusNode
pendingSignRequests *sign.PendingRequests
accountManager *account.Manager
transactor *transactions.Transactor
jailManager jail.Manager
newNotification fcm.NotificationConstructor
connectionState ConnectionState
log log.Logger
}
// NewStatusBackend create a new NewStatusBackend instance
@ -47,18 +49,20 @@ func NewStatusBackend() *StatusBackend {
defer log.Info("Status backend initialized")
statusNode := node.New()
pendingSignRequests := sign.NewPendingRequests()
accountManager := account.NewManager(statusNode)
txQueueManager := transactions.NewManager(statusNode)
transactor := transactions.NewTransactor(pendingSignRequests)
jailManager := jail.New(statusNode)
notificationManager := fcm.NewNotification(fcmServerKey)
return &StatusBackend{
statusNode: statusNode,
accountManager: accountManager,
jailManager: jailManager,
txQueueManager: txQueueManager,
newNotification: notificationManager,
log: log.New("package", "status-go/geth/api.StatusBackend"),
pendingSignRequests: pendingSignRequests,
statusNode: statusNode,
accountManager: accountManager,
jailManager: jailManager,
transactor: transactor,
newNotification: notificationManager,
log: log.New("package", "status-go/geth/api.StatusBackend"),
}
}
@ -77,9 +81,14 @@ func (b *StatusBackend) JailManager() jail.Manager {
return b.jailManager
}
// TxQueueManager returns reference to transactions manager
func (b *StatusBackend) TxQueueManager() *transactions.Manager {
return b.txQueueManager
// Transactor returns reference to a status transactor
func (b *StatusBackend) Transactor() *transactions.Transactor {
return b.transactor
}
// PendingSignRequests returns reference to a list of current sign requests
func (b *StatusBackend) PendingSignRequests() *sign.PendingRequests {
return b.pendingSignRequests
}
// IsNodeRunning confirm that node is running
@ -117,9 +126,9 @@ func (b *StatusBackend) startNode(config *params.NodeConfig) (err error) {
return err
}
signal.Send(signal.Envelope{Type: signal.EventNodeStarted})
// tx queue manager should be started after node is started, it depends
// on rpc client being created
b.txQueueManager.Start(config.NetworkID)
b.transactor.SetNetworkID(config.NetworkID)
b.transactor.SetRPCClient(b.statusNode.RPCClient())
if err := b.registerHandlers(); err != nil {
b.log.Error("Handler registration failed", "err", err)
}
@ -142,7 +151,6 @@ func (b *StatusBackend) stopNode() error {
if !b.IsNodeRunning() {
return node.ErrNoRunningNode
}
b.txQueueManager.Stop()
b.jailManager.Stop()
defer signal.Send(signal.Envelope{Type: signal.EventNodeStopped})
return b.statusNode.Stop()
@ -193,18 +201,7 @@ func (b *StatusBackend) CallRPC(inputJSON string) string {
// SendTransaction creates a new transaction and waits until it's complete.
func (b *StatusBackend) SendTransaction(ctx context.Context, args transactions.SendTxArgs) (hash gethcommon.Hash, err error) {
if ctx == nil {
ctx = context.Background()
}
tx := transactions.Create(ctx, args)
if err = b.txQueueManager.QueueTransaction(tx); err != nil {
return hash, err
}
rst := b.txQueueManager.WaitForTransaction(tx)
if rst.Error != nil {
return hash, rst.Error
}
return rst.Hash, nil
return b.transactor.SendTransaction(ctx, args)
}
func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedExtKey, error) {
@ -227,21 +224,15 @@ func (b *StatusBackend) getVerifiedAccount(password string) (*account.SelectedEx
// CompleteTransaction instructs backend to complete sending of a given transaction
func (b *StatusBackend) CompleteTransaction(id string, password string) (hash gethcommon.Hash, err error) {
selectedAccount, err := b.getVerifiedAccount(password)
if err != nil {
_ = b.txQueueManager.NotifyErrored(id, err)
return hash, err
}
return b.txQueueManager.CompleteTransaction(id, selectedAccount)
return b.pendingSignRequests.Approve(id, password, b.getVerifiedAccount)
}
// CompleteTransactions instructs backend to complete sending of multiple transactions
func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[string]transactions.Result {
results := make(map[string]transactions.Result)
func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[string]sign.Result {
results := make(map[string]sign.Result)
for _, txID := range ids {
txHash, txErr := b.CompleteTransaction(txID, password)
results[txID] = transactions.Result{
results[txID] = sign.Result{
Hash: txHash,
Error: txErr,
}
@ -251,7 +242,7 @@ func (b *StatusBackend) CompleteTransactions(ids []string, password string) map[
// DiscardTransaction discards a given transaction from transaction queue
func (b *StatusBackend) DiscardTransaction(id string) error {
return b.txQueueManager.DiscardTransaction(id)
return b.pendingSignRequests.Discard(id)
}
// DiscardTransactions discards given multiple transactions from transaction queue
@ -273,13 +264,30 @@ func (b *StatusBackend) registerHandlers() error {
if rpcClient == nil {
return node.ErrRPCClient
}
rpcClient.RegisterHandler("eth_accounts", func(context.Context, ...interface{}) (interface{}, error) {
rpcClient.RegisterHandler(params.AccountsMethodName, func(context.Context, ...interface{}) (interface{}, error) {
return b.AccountManager().Accounts()
})
rpcClient.RegisterHandler("eth_sendTransaction", b.txQueueManager.SendTransactionRPCHandler)
rpcClient.RegisterHandler(params.SendTransactionMethodName, func(ctx context.Context, rpcParams ...interface{}) (interface{}, error) {
txArgs, err := transactions.RPCCalltoSendTxArgs(rpcParams...)
if err != nil {
return nil, err
}
hash, err := b.SendTransaction(ctx, txArgs)
if err != nil {
return nil, err
}
return hash.Hex(), err
})
return nil
}
//
// ConnectionChange handles network state changes logic.
func (b *StatusBackend) ConnectionChange(state ConnectionState) {
b.log.Info("Network state change", "old", b.connectionState, "new", state)

View File

@ -34,6 +34,10 @@ const (
// SendTransactionMethodName defines the name for a giving transaction.
SendTransactionMethodName = "eth_sendTransaction"
// AccountsMethodName defines the name for listing the currently signed accounts.
AccountsMethodName = "eth_accounts"
// PersonalSignMethodName defines the name for `personal.sign` API.
PersonalSignMethodName = "personal_sign"
// WSPort is a WS-RPC port (replaced in unit tests)
WSPort = 8546

View File

@ -1,10 +0,0 @@
package transactions
import "errors"
var (
//ErrQueuedTxTimedOut - error transaction sending timed out
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
//ErrQueuedTxDiscarded - error transaction discarded
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
)

View File

@ -1,234 +0,0 @@
package transactions
import (
"errors"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/geth/account"
)
const (
// DefaultTxQueueCap defines how many items can be queued.
DefaultTxQueueCap = int(35)
)
var (
// ErrQueuedTxExist - transaction was already enqueued
ErrQueuedTxExist = errors.New("transaction already exist in queue")
//ErrQueuedTxIDNotFound - error transaction hash not found
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
//ErrQueuedTxInProgress - error transaction is in progress
ErrQueuedTxInProgress = errors.New("transaction is in progress")
//ErrInvalidCompleteTxSender - error transaction with invalid sender
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
)
// remove from queue on any error (except for transient ones) and propagate
// defined as map[string]bool because errors from ethclient returned wrapped as jsonError
var transientErrs = map[string]bool{
keystore.ErrDecrypt.Error(): true, // wrong password
ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account
account.ErrNoAccountSelected.Error(): true, // account not selected
}
type empty struct{}
// TxQueue is capped container that holds pending transactions
type TxQueue struct {
mu sync.RWMutex // to guard transactions map
transactions map[string]*QueuedTx
inprogress map[string]empty
// TODO(dshulyak) research why eviction is done in separate goroutine
evictableIDs chan string
enqueueTicker chan struct{}
// when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc)
stopped chan struct{}
stoppedGroup sync.WaitGroup // to make sure that all routines are stopped
log log.Logger
}
// newQueue creates a transaction queue.
func newQueue() *TxQueue {
logger := log.New("package", "status-go/geth/transactions.TxQueue")
logger.Info("initializing transaction queue")
return &TxQueue{
transactions: make(map[string]*QueuedTx),
inprogress: make(map[string]empty),
evictableIDs: make(chan string, DefaultTxQueueCap), // will be used to evict in FIFO
enqueueTicker: make(chan struct{}),
log: logger,
}
}
// Start starts enqueue and eviction loops
func (q *TxQueue) Start() {
q.log.Info("starting transaction queue")
if q.stopped != nil {
return
}
q.stopped = make(chan struct{})
q.stoppedGroup.Add(1)
go q.evictionLoop()
}
// Stop stops transaction enqueue and eviction loops
func (q *TxQueue) Stop() {
q.log.Info("stopping transaction queue")
if q.stopped == nil {
return
}
close(q.stopped) // stops all processing loops (enqueue, eviction etc)
q.stoppedGroup.Wait()
q.stopped = nil
q.log.Info("finally stopped transaction queue")
}
// evictionLoop frees up queue to accommodate another transaction item
func (q *TxQueue) evictionLoop() {
defer haltOnPanic()
evict := func() {
if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item
q.Remove(<-q.evictableIDs)
}
}
for {
select {
case <-time.After(250 * time.Millisecond): // do not wait for manual ticks, check queue regularly
evict()
case <-q.enqueueTicker: // when manually requested
evict()
case <-q.stopped:
q.log.Info("transaction queue's eviction loop stopped")
q.stoppedGroup.Done()
return
}
}
}
// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one
func (q *TxQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.transactions = make(map[string]*QueuedTx)
q.evictableIDs = make(chan string, DefaultTxQueueCap)
q.inprogress = make(map[string]empty)
}
// Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *QueuedTx) error {
q.log.Info("enqueue transaction", "ID", tx.ID)
q.mu.RLock()
if _, ok := q.transactions[tx.ID]; ok {
q.mu.RUnlock()
return ErrQueuedTxExist
}
q.mu.RUnlock()
// we can't hold a lock in this part
q.log.Debug("notifying eviction loop")
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
q.log.Debug("notified eviction loop")
q.mu.Lock()
q.transactions[tx.ID] = tx
q.mu.Unlock()
// notify handler
q.log.Info("calling txEnqueueHandler")
return nil
}
// Get returns transaction by transaction identifier
func (q *TxQueue) Get(id string) (*QueuedTx, error) {
q.mu.RLock()
defer q.mu.RUnlock()
if tx, ok := q.transactions[id]; ok {
return tx, nil
}
return nil, ErrQueuedTxIDNotFound
}
// LockInprogress returns error if transaction is already inprogress.
func (q *TxQueue) LockInprogress(id string) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.transactions[id]; ok {
if _, inprogress := q.inprogress[id]; inprogress {
return ErrQueuedTxInProgress
}
q.inprogress[id] = empty{}
return nil
}
return ErrQueuedTxIDNotFound
}
// Remove removes transaction by transaction identifier
func (q *TxQueue) Remove(id string) {
q.mu.Lock()
defer q.mu.Unlock()
q.remove(id)
}
func (q *TxQueue) remove(id string) {
delete(q.transactions, id)
delete(q.inprogress, id)
}
// Done removes transaction from queue if no error or error is not transient
// and notify subscribers
func (q *TxQueue) Done(id string, hash gethcommon.Hash, err error) error {
q.mu.Lock()
defer q.mu.Unlock()
tx, ok := q.transactions[id]
if !ok {
return ErrQueuedTxIDNotFound
}
q.done(tx, hash, err)
return nil
}
func (q *TxQueue) done(tx *QueuedTx, hash gethcommon.Hash, err error) {
delete(q.inprogress, tx.ID)
// hash is updated only if err is nil, but transaction is not removed from a queue
if err == nil {
q.transactions[tx.ID].Result <- Result{Hash: hash, Error: err}
q.remove(tx.ID)
return
}
if _, transient := transientErrs[err.Error()]; !transient {
q.transactions[tx.ID].Result <- Result{Error: err}
q.remove(tx.ID)
}
}
// Count returns number of currently queued transactions
func (q *TxQueue) Count() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.transactions)
}
// Has checks whether transaction with a given identifier exists in queue
func (q *TxQueue) Has(id string) bool {
q.mu.RLock()
defer q.mu.RUnlock()
_, ok := q.transactions[id]
return ok
}

View File

@ -1,131 +0,0 @@
package transactions
import (
"context"
"errors"
"testing"
"github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/suite"
)
func TestQueueTestSuite(t *testing.T) {
suite.Run(t, new(QueueTestSuite))
}
type QueueTestSuite struct {
suite.Suite
queue *TxQueue
}
func (s *QueueTestSuite) SetupTest() {
s.queue = newQueue()
s.queue.Start()
}
func (s *QueueTestSuite) TearDownTest() {
s.queue.Stop()
}
func (s *QueueTestSuite) TestLockInprogressTransaction() {
tx := Create(context.Background(), SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
enquedTx, err := s.queue.Get(tx.ID)
s.NoError(err)
s.NoError(s.queue.LockInprogress(tx.ID))
s.Equal(tx, enquedTx)
// verify that tx was marked as being inprogress
s.Equal(ErrQueuedTxInProgress, s.queue.LockInprogress(tx.ID))
}
func (s *QueueTestSuite) TestGetTransaction() {
tx := Create(context.Background(), SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
for i := 2; i > 0; i-- {
enquedTx, err := s.queue.Get(tx.ID)
s.NoError(err)
s.Equal(tx, enquedTx)
}
}
func (s *QueueTestSuite) TestAlreadyEnqueued() {
tx := Create(context.Background(), SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
s.Equal(ErrQueuedTxExist, s.queue.Enqueue(tx))
// try to enqueue another tx to double check locking
tx = Create(context.Background(), SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
}
func (s *QueueTestSuite) testDone(hash gethcommon.Hash, err error) *QueuedTx {
tx := Create(context.Background(), SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
s.NoError(s.queue.Done(tx.ID, hash, err))
return tx
}
func (s *QueueTestSuite) TestDoneSuccess() {
hash := gethcommon.Hash{1}
tx := s.testDone(hash, nil)
// event is sent only if transaction was removed from a queue
select {
case rst := <-tx.Result:
s.NoError(rst.Error)
s.Equal(hash, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}
}
func (s *QueueTestSuite) TestDoneTransientError() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
s.True(s.queue.Has(tx.ID))
_, inp := s.queue.inprogress[tx.ID]
s.False(inp)
}
func (s *QueueTestSuite) TestDoneError() {
hash := gethcommon.Hash{1}
err := errors.New("test")
tx := s.testDone(hash, err)
// event is sent only if transaction was removed from a queue
select {
case rst := <-tx.Result:
s.Equal(err, rst.Error)
s.NotEqual(hash, rst.Hash)
s.Equal(gethcommon.Hash{}, rst.Hash)
s.False(s.queue.Has(tx.ID))
default:
s.Fail("No event was sent to Done channel")
}
}
func (s QueueTestSuite) TestMultipleDone() {
hash := gethcommon.Hash{1}
err := keystore.ErrDecrypt
tx := s.testDone(hash, err)
s.NoError(s.queue.Done(tx.ID, hash, nil))
s.Equal(ErrQueuedTxIDNotFound, s.queue.Done(tx.ID, hash, errors.New("timeout")))
}
func (s *QueueTestSuite) TestEviction() {
var first *QueuedTx
for i := 0; i < DefaultTxQueueCap; i++ {
tx := Create(context.Background(), SendTxArgs{})
if first == nil {
first = tx
}
s.NoError(s.queue.Enqueue(tx))
}
s.Equal(DefaultTxQueueCap, s.queue.Count())
tx := Create(context.Background(), SendTxArgs{})
s.NoError(s.queue.Enqueue(tx))
s.Equal(DefaultTxQueueCap, s.queue.Count())
s.False(s.queue.Has(first.ID))
}

View File

@ -9,40 +9,32 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/status-im/status-go/geth/rpc"
)
// EthTransactor provides methods to create transactions for ethereum network.
type EthTransactor interface {
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
ethereum.GasEstimator
ethereum.GasPricer
ethereum.TransactionSender
// rpcWrapper wraps provides convenient interface for ethereum RPC APIs we need for sending transactions
type rpcWrapper struct {
rpcClient *rpc.Client
}
// EthTxClient wraps common API methods that are used to send transaction.
type EthTxClient struct {
c *rpc.Client
}
// NewEthTxClient returns a new EthTxClient for client
func NewEthTxClient(client *rpc.Client) *EthTxClient {
return &EthTxClient{c: client}
func newRPCWrapper(client *rpc.Client) *rpcWrapper {
return &rpcWrapper{rpcClient: client}
}
// PendingNonceAt returns the account nonce of the given account in the pending state.
// This is the nonce that should be used for the next transaction.
func (ec *EthTxClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
func (w *rpcWrapper) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
var result hexutil.Uint64
err := ec.c.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending")
err := w.rpcClient.CallContext(ctx, &result, "eth_getTransactionCount", account, "pending")
return uint64(result), err
}
// SuggestGasPrice retrieves the currently suggested gas price to allow a timely
// execution of a transaction.
func (ec *EthTxClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
func (w *rpcWrapper) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
var hex hexutil.Big
if err := ec.c.CallContext(ctx, &hex, "eth_gasPrice"); err != nil {
if err := w.rpcClient.CallContext(ctx, &hex, "eth_gasPrice"); err != nil {
return nil, err
}
return (*big.Int)(&hex), nil
@ -52,9 +44,9 @@ func (ec *EthTxClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
// the current pending state of the backend blockchain. There is no guarantee that this is
// the true gas limit requirement as other transactions may be added or removed by miners,
// but it should provide a basis for setting a reasonable default.
func (ec *EthTxClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
func (w *rpcWrapper) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) {
var hex hexutil.Uint64
err := ec.c.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg))
err := w.rpcClient.CallContext(ctx, &hex, "eth_estimateGas", toCallArg(msg))
if err != nil {
return 0, err
}
@ -65,12 +57,12 @@ func (ec *EthTxClient) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (u
//
// If the transaction was a contract creation use the TransactionReceipt method to get the
// contract address after the transaction has been mined.
func (ec *EthTxClient) SendTransaction(ctx context.Context, tx *types.Transaction) error {
func (w *rpcWrapper) SendTransaction(ctx context.Context, tx *types.Transaction) error {
data, err := rlp.EncodeToBytes(tx)
if err != nil {
return err
}
return ec.c.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data))
return w.rpcClient.CallContext(ctx, nil, "eth_sendRawTransaction", common.ToHex(data))
}
func toCallArg(msg ethereum.CallMsg) interface{} {

View File

@ -1,47 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: geth/transactions/txqueue_manager_test.go
// Package transactions is a generated GoMock package.
package transactions
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
rpc "github.com/status-im/status-go/geth/rpc"
)
// MocktestRPCClientProvider is a mock of testRPCClientProvider interface
type MocktestRPCClientProvider struct {
ctrl *gomock.Controller
recorder *MocktestRPCClientProviderMockRecorder
}
// MocktestRPCClientProviderMockRecorder is the mock recorder for MocktestRPCClientProvider
type MocktestRPCClientProviderMockRecorder struct {
mock *MocktestRPCClientProvider
}
// NewMocktestRPCClientProvider creates a new mock instance
func NewMocktestRPCClientProvider(ctrl *gomock.Controller) *MocktestRPCClientProvider {
mock := &MocktestRPCClientProvider{ctrl: ctrl}
mock.recorder = &MocktestRPCClientProviderMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MocktestRPCClientProvider) EXPECT() *MocktestRPCClientProviderMockRecorder {
return m.recorder
}
// RPCClient mocks base method
func (m *MocktestRPCClientProvider) RPCClient() *rpc.Client {
ret := m.ctrl.Call(m, "RPCClient")
ret0, _ := ret[0].(*rpc.Client)
return ret0
}
// RPCClient indicates an expected call of RPCClient
func (mr *MocktestRPCClientProviderMockRecorder) RPCClient() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RPCClient", reflect.TypeOf((*MocktestRPCClientProvider)(nil).RPCClient))
}

View File

@ -0,0 +1,205 @@
package transactions
import (
"context"
"math/big"
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/geth/account"
"github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/sign"
)
const (
// sendTxTimeout defines how many seconds to wait before returning result in sentTransaction().
sendTxTimeout = 300 * time.Second
rpcCallTimeout = time.Minute
defaultGas = 90000
)
// Transactor validates, signs transactions.
// It uses upstream to propagate transactions to the Ethereum network.
type Transactor struct {
pendingSignRequests *sign.PendingRequests
sender ethereum.TransactionSender
pendingNonceProvider PendingNonceProvider
gasCalculator GasCalculator
sendTxTimeout time.Duration
rpcCallTimeout time.Duration
networkID uint64
addrLock *AddrLocker
localNonce sync.Map
log log.Logger
}
// NewTransactor returns a new Manager.
func NewTransactor(signRequests *sign.PendingRequests) *Transactor {
return &Transactor{
pendingSignRequests: signRequests,
addrLock: &AddrLocker{},
sendTxTimeout: sendTxTimeout,
rpcCallTimeout: rpcCallTimeout,
localNonce: sync.Map{},
log: log.New("package", "status-go/geth/transactions.Manager"),
}
}
// SetNetworkID selects a correct network.
func (t *Transactor) SetNetworkID(networkID uint64) {
t.networkID = networkID
}
// SetRPCClient an RPC client.
func (t *Transactor) SetRPCClient(rpcClient *rpc.Client) {
rpcWrapper := newRPCWrapper(rpcClient)
t.sender = rpcWrapper
t.pendingNonceProvider = rpcWrapper
t.gasCalculator = rpcWrapper
}
// SendTransaction is an implementation of eth_sendTransaction. It queues the tx to the sign queue.
func (t *Transactor) SendTransaction(ctx context.Context, args SendTxArgs) (gethcommon.Hash, error) {
if ctx == nil {
ctx = context.Background()
}
completeFunc := func(acc *account.SelectedExtKey) (gethcommon.Hash, error) {
return t.validateAndPropagate(acc, args)
}
request, err := t.pendingSignRequests.Add(ctx, args, completeFunc)
if err != nil {
return gethcommon.Hash{}, err
}
rst := t.pendingSignRequests.Wait(request.ID, t.sendTxTimeout)
return rst.Hash, rst.Error
}
// make sure that only account which created the tx can complete it
func (t *Transactor) validateAccount(args SendTxArgs, selectedAccount *account.SelectedExtKey) error {
if selectedAccount == nil {
return account.ErrNoAccountSelected
}
if args.From.Hex() != selectedAccount.Address.Hex() {
err := sign.ErrInvalidCompleteTxSender
t.log.Error("queued transaction does not belong to the selected account", "err", err)
return err
}
return nil
}
func (t *Transactor) validateAndPropagate(selectedAccount *account.SelectedExtKey, args SendTxArgs) (hash gethcommon.Hash, err error) {
// TODO (mandrigin): Send sign request ID as a parameter to this function and uncoment the log message
// m.log.Info("complete transaction", "id", queuedTx.ID)
if err := t.validateAccount(args, selectedAccount); err != nil {
return hash, err
}
if !args.Valid() {
return hash, ErrInvalidSendTxArgs
}
t.addrLock.LockAddr(args.From)
var localNonce uint64
if val, ok := t.localNonce.Load(args.From); ok {
localNonce = val.(uint64)
}
var nonce uint64
defer func() {
// nonce should be incremented only if tx completed without error
// if upstream node returned nonce higher than ours we will stick to it
if err == nil {
t.localNonce.Store(args.From, nonce+1)
}
t.addrLock.UnlockAddr(args.From)
}()
ctx, cancel := context.WithTimeout(context.Background(), t.rpcCallTimeout)
defer cancel()
nonce, err = t.pendingNonceProvider.PendingNonceAt(ctx, args.From)
if err != nil {
return hash, err
}
// if upstream node returned nonce higher than ours we will use it, as it probably means
// that another client was used for sending transactions
if localNonce > nonce {
nonce = localNonce
}
gasPrice := (*big.Int)(args.GasPrice)
if args.GasPrice == nil {
ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout)
defer cancel()
gasPrice, err = t.gasCalculator.SuggestGasPrice(ctx)
if err != nil {
return hash, err
}
}
chainID := big.NewInt(int64(t.networkID))
value := (*big.Int)(args.Value)
var gas uint64
if args.Gas == nil {
ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout)
defer cancel()
gas, err = t.gasCalculator.EstimateGas(ctx, ethereum.CallMsg{
From: args.From,
To: args.To,
GasPrice: gasPrice,
Value: value,
Data: args.GetInput(),
})
if err != nil {
return hash, err
}
if gas < defaultGas {
t.log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas)
gas = defaultGas
}
} else {
gas = uint64(*args.Gas)
}
var tx *types.Transaction
if args.To != nil {
t.log.Info("New transaction",
"From", args.From,
"To", *args.To,
"Gas", gas,
"GasPrice", gasPrice,
"Value", value,
)
tx = types.NewTransaction(nonce, *args.To, value, gas, gasPrice, args.GetInput())
} else {
// contract creation is rare enough to log an expected address
t.log.Info("New contract",
"From", args.From,
"Gas", gas,
"GasPrice", gasPrice,
"Value", value,
"Contract address", crypto.CreateAddress(args.From, nonce),
)
tx = types.NewContractCreation(nonce, value, gas, gasPrice, args.GetInput())
}
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey)
if err != nil {
return hash, err
}
ctx, cancel = context.WithTimeout(context.Background(), t.rpcCallTimeout)
defer cancel()
if err := t.sender.SendTransaction(ctx, signedTx); err != nil {
return hash, err
}
return signedTx.Hash(), nil
}

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"math/big"
"sync"
"testing"
"time"
@ -26,52 +25,52 @@ import (
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/rpc"
"github.com/status-im/status-go/geth/transactions/fake"
"github.com/status-im/status-go/sign"
. "github.com/status-im/status-go/t/utils"
)
func simpleVerifyFunc(acc *account.SelectedExtKey) func(string) (*account.SelectedExtKey, error) {
return func(string) (*account.SelectedExtKey, error) {
return acc, nil
}
}
func TestTxQueueTestSuite(t *testing.T) {
suite.Run(t, new(TxQueueTestSuite))
}
type TxQueueTestSuite struct {
suite.Suite
rpcClientMockCtrl *gomock.Controller
rpcClientMock *MocktestRPCClientProvider
server *gethrpc.Server
client *gethrpc.Client
txServiceMockCtrl *gomock.Controller
txServiceMock *fake.MockPublicTransactionPoolAPI
nodeConfig *params.NodeConfig
manager *Manager
manager *Transactor
}
func (s *TxQueueTestSuite) SetupTest() {
s.rpcClientMockCtrl = gomock.NewController(s.T())
s.txServiceMockCtrl = gomock.NewController(s.T())
s.rpcClientMock = NewMocktestRPCClientProvider(s.rpcClientMockCtrl)
s.server, s.txServiceMock = fake.NewTestServer(s.txServiceMockCtrl)
s.client = gethrpc.DialInProc(s.server)
rpclient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{})
s.rpcClientMock.EXPECT().RPCClient().Return(rpclient)
rpcClient, _ := rpc.NewClient(s.client, params.UpstreamRPCConfig{})
// expected by simulated backend
chainID := gethparams.AllEthashProtocolChanges.ChainId.Uint64()
nodeConfig, err := params.NewNodeConfig("/tmp", "", chainID, true)
s.Require().NoError(err)
s.nodeConfig = nodeConfig
s.manager = NewManager(s.rpcClientMock)
s.manager.DisableNotificactions()
s.manager.completionTimeout = time.Second
s.manager = NewTransactor(sign.NewPendingRequests())
s.manager.sendTxTimeout = time.Second
s.manager.rpcCallTimeout = time.Second
s.manager.Start(chainID)
s.manager.SetNetworkID(chainID)
s.manager.SetRPCClient(rpcClient)
}
func (s *TxQueueTestSuite) TearDownTest() {
s.manager.Stop()
s.rpcClientMockCtrl.Finish()
s.txServiceMockCtrl.Finish()
s.server.Stop()
s.client.Close()
@ -83,38 +82,44 @@ var (
testNonce = hexutil.Uint64(10)
)
func (s *TxQueueTestSuite) setupTransactionPoolAPI(tx *QueuedTx, returnNonce, resultNonce hexutil.Uint64, account *account.SelectedExtKey, txErr error) {
func (s *TxQueueTestSuite) completeFunc(args SendTxArgs) func(acc *account.SelectedExtKey) (gethcommon.Hash, error) {
return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) {
return s.manager.validateAndPropagate(acc, args)
}
}
func (s *TxQueueTestSuite) setupTransactionPoolAPI(args SendTxArgs, returnNonce, resultNonce hexutil.Uint64, account *account.SelectedExtKey, txErr error) {
// Expect calls to gas functions only if there are no user defined values.
// And also set the expected gas and gas price for RLP encoding the expected tx.
var usedGas hexutil.Uint64
var usedGasPrice *big.Int
s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), account.Address, gethrpc.PendingBlockNumber).Return(&returnNonce, nil)
if tx.Args.GasPrice == nil {
if args.GasPrice == nil {
usedGasPrice = (*big.Int)(testGasPrice)
s.txServiceMock.EXPECT().GasPrice(gomock.Any()).Return(usedGasPrice, nil)
} else {
usedGasPrice = (*big.Int)(tx.Args.GasPrice)
usedGasPrice = (*big.Int)(args.GasPrice)
}
if tx.Args.Gas == nil {
if args.Gas == nil {
s.txServiceMock.EXPECT().EstimateGas(gomock.Any(), gomock.Any()).Return(testGas, nil)
usedGas = testGas
} else {
usedGas = *tx.Args.Gas
usedGas = *args.Gas
}
// Prepare the transaction anD RLP encode it.
data := s.rlpEncodeTx(tx, s.nodeConfig, account, &resultNonce, usedGas, usedGasPrice)
data := s.rlpEncodeTx(args, s.nodeConfig, account, &resultNonce, usedGas, usedGasPrice)
// Expect the RLP encoded transaction.
s.txServiceMock.EXPECT().SendRawTransaction(gomock.Any(), data).Return(gethcommon.Hash{}, txErr)
}
func (s *TxQueueTestSuite) rlpEncodeTx(tx *QueuedTx, config *params.NodeConfig, account *account.SelectedExtKey, nonce *hexutil.Uint64, gas hexutil.Uint64, gasPrice *big.Int) hexutil.Bytes {
func (s *TxQueueTestSuite) rlpEncodeTx(args SendTxArgs, config *params.NodeConfig, account *account.SelectedExtKey, nonce *hexutil.Uint64, gas hexutil.Uint64, gasPrice *big.Int) hexutil.Bytes {
newTx := types.NewTransaction(
uint64(*nonce),
gethcommon.Address(*tx.Args.To),
tx.Args.Value.ToInt(),
gethcommon.Address(*args.To),
args.Value.ToInt(),
uint64(gas),
gasPrice,
[]byte(tx.Args.Input),
[]byte(args.Input),
)
chainID := big.NewInt(int64(config.NetworkID))
signedTx, err := types.SignTx(newTx, types.NewEIP155Signer(chainID), account.AccountKey.PrivateKey)
@ -160,141 +165,103 @@ func (s *TxQueueTestSuite) TestCompleteTransaction() {
for _, testCase := range testCases {
s.T().Run(testCase.name, func(t *testing.T) {
s.SetupTest()
tx := Create(context.Background(), SendTxArgs{
args := SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
Gas: testCase.gas,
GasPrice: testCase.gasPrice,
})
s.setupTransactionPoolAPI(tx, testNonce, testNonce, selectedAccount, nil)
}
s.setupTransactionPoolAPI(args, testNonce, testNonce, selectedAccount, nil)
s.NoError(s.manager.QueueTransaction(tx))
w := make(chan struct{})
var (
hash gethcommon.Hash
err error
sendHash gethcommon.Hash
err error
)
go func() {
hash, err = s.manager.CompleteTransaction(tx.ID, selectedAccount)
s.NoError(err)
var sendErr error
sendHash, sendErr = s.manager.SendTransaction(context.Background(), args)
s.NoError(sendErr)
close(w)
}()
rst := s.manager.WaitForTransaction(tx)
// Check that error is assigned to the transaction.
s.NoError(rst.Error)
// Transaction should be already removed from the queue.
s.False(s.manager.TransactionQueue().Has(tx.ID))
for i := 10; i > 0; i-- {
if s.manager.pendingSignRequests.Count() > 0 {
break
}
time.Sleep(time.Millisecond)
}
req := s.manager.pendingSignRequests.First()
s.NotNil(req)
approveHash, err := s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount))
s.NoError(err)
s.NoError(WaitClosed(w, time.Second))
s.Equal(hash, rst.Hash)
// Transaction should be already removed from the queue.
s.False(s.manager.pendingSignRequests.Has(req.ID))
s.Equal(sendHash, approveHash)
})
}
}
func (s *TxQueueTestSuite) TestCompleteTransactionMultipleTimes() {
key, _ := crypto.GenerateKey()
selectedAccount := &account.SelectedExtKey{
Address: account.FromAddress(TestConfig.Account1.Address),
AccountKey: &keystore.Key{PrivateKey: key},
}
tx := Create(context.Background(), SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
})
s.setupTransactionPoolAPI(tx, testNonce, testNonce, selectedAccount, nil)
err := s.manager.QueueTransaction(tx)
s.NoError(err)
var (
wg sync.WaitGroup
mu sync.Mutex
completedTx int
inprogressTx int
txCount = 3
)
for i := 0; i < txCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := s.manager.CompleteTransaction(tx.ID, selectedAccount)
mu.Lock()
defer mu.Unlock()
if err == nil {
completedTx++
} else if err == ErrQueuedTxInProgress {
inprogressTx++
} else {
s.Fail("tx failed with unexpected error: ", err.Error())
}
}()
}
rst := s.manager.WaitForTransaction(tx)
// Check that error is assigned to the transaction.
s.NoError(rst.Error)
// Transaction should be already removed from the queue.
s.False(s.manager.TransactionQueue().Has(tx.ID))
// Wait for all CompleteTransaction calls.
wg.Wait()
s.Equal(1, completedTx, "only 1 tx expected to be completed")
s.Equal(txCount-1, inprogressTx, "txs expected to be reported as inprogress")
}
func (s *TxQueueTestSuite) TestAccountMismatch() {
selectedAccount := &account.SelectedExtKey{
Address: account.FromAddress(TestConfig.Account2.Address),
}
tx := Create(context.Background(), SendTxArgs{
args := SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
})
}
s.NoError(s.manager.QueueTransaction(tx))
go func() {
s.manager.SendTransaction(context.Background(), args) // nolint: errcheck
}()
_, err := s.manager.CompleteTransaction(tx.ID, selectedAccount)
s.Equal(err, ErrInvalidCompleteTxSender)
for i := 10; i > 0; i-- {
if s.manager.pendingSignRequests.Count() > 0 {
break
}
time.Sleep(time.Millisecond)
}
req := s.manager.pendingSignRequests.First()
s.NotNil(req)
_, err := s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount))
s.Equal(err, sign.ErrInvalidCompleteTxSender)
// Transaction should stay in the queue as mismatched accounts
// is a recoverable error.
s.True(s.manager.TransactionQueue().Has(tx.ID))
s.True(s.manager.pendingSignRequests.Has(req.ID))
}
func (s *TxQueueTestSuite) TestDiscardTransaction() {
tx := Create(context.Background(), SendTxArgs{
args := SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
})
s.NoError(s.manager.QueueTransaction(tx))
}
w := make(chan struct{})
go func() {
s.NoError(s.manager.DiscardTransaction(tx.ID))
_, err := s.manager.SendTransaction(context.Background(), args)
s.Equal(sign.ErrSignReqDiscarded, err)
close(w)
}()
rst := s.manager.WaitForTransaction(tx)
s.Equal(ErrQueuedTxDiscarded, rst.Error)
// Transaction should be already removed from the queue.
s.False(s.manager.TransactionQueue().Has(tx.ID))
for i := 10; i > 0; i-- {
if s.manager.pendingSignRequests.Count() > 0 {
break
}
time.Sleep(time.Millisecond)
}
req := s.manager.pendingSignRequests.First()
s.NotNil(req)
err := s.manager.pendingSignRequests.Discard(req.ID)
s.NoError(err)
s.NoError(WaitClosed(w, time.Second))
}
func (s *TxQueueTestSuite) TestCompletionTimedOut() {
tx := Create(context.Background(), SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
})
s.NoError(s.manager.QueueTransaction(tx))
rst := s.manager.WaitForTransaction(tx)
s.Equal(ErrQueuedTxTimedOut, rst.Error)
}
// TestLocalNonce verifies that local nonce will be used unless
// upstream nonce is updated and higher than a local
// in test we will run 3 transaction with nonce zero returned by upstream
@ -310,49 +277,60 @@ func (s *TxQueueTestSuite) TestLocalNonce() {
AccountKey: &keystore.Key{PrivateKey: key},
}
nonce := hexutil.Uint64(0)
go func() {
approved := 0
for {
// 3 in a cycle, then 2
if approved >= txCount+2 {
return
}
req := s.manager.pendingSignRequests.First()
if req == nil {
time.Sleep(time.Millisecond)
} else {
s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) // nolint: errcheck
}
}
}()
for i := 0; i < txCount; i++ {
tx := Create(context.Background(), SendTxArgs{
args := SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
})
s.setupTransactionPoolAPI(tx, nonce, hexutil.Uint64(i), selectedAccount, nil)
s.NoError(s.manager.QueueTransaction(tx))
hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount)
rst := s.manager.WaitForTransaction(tx)
// simple sanity checks
}
s.setupTransactionPoolAPI(args, nonce, hexutil.Uint64(i), selectedAccount, nil)
_, err := s.manager.SendTransaction(context.Background(), args)
s.NoError(err)
s.NoError(rst.Error)
s.Equal(rst.Hash, hash)
resultNonce, _ := s.manager.localNonce.Load(tx.Args.From)
resultNonce, _ := s.manager.localNonce.Load(args.From)
s.Equal(uint64(i)+1, resultNonce.(uint64))
}
nonce = hexutil.Uint64(5)
tx := Create(context.Background(), SendTxArgs{
args := SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
})
s.setupTransactionPoolAPI(tx, nonce, nonce, selectedAccount, nil)
s.NoError(s.manager.QueueTransaction(tx))
hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount)
rst := s.manager.WaitForTransaction(tx)
}
s.setupTransactionPoolAPI(args, nonce, nonce, selectedAccount, nil)
_, err := s.manager.SendTransaction(context.Background(), args)
s.NoError(err)
s.NoError(rst.Error)
s.Equal(rst.Hash, hash)
resultNonce, _ := s.manager.localNonce.Load(tx.Args.From)
resultNonce, _ := s.manager.localNonce.Load(args.From)
s.Equal(uint64(nonce)+1, resultNonce.(uint64))
testErr := errors.New("test")
s.txServiceMock.EXPECT().GetTransactionCount(gomock.Any(), selectedAccount.Address, gethrpc.PendingBlockNumber).Return(nil, testErr)
tx = Create(context.Background(), SendTxArgs{
args = SendTxArgs{
From: account.FromAddress(TestConfig.Account1.Address),
To: account.ToAddress(TestConfig.Account2.Address),
})
s.NoError(s.manager.QueueTransaction(tx))
_, err = s.manager.CompleteTransaction(tx.ID, selectedAccount)
rst = s.manager.WaitForTransaction(tx)
}
_, err = s.manager.SendTransaction(context.Background(), args)
s.EqualError(testErr, err.Error())
s.EqualError(testErr, rst.Error.Error())
resultNonce, _ = s.manager.localNonce.Load(tx.Args.From)
resultNonce, _ = s.manager.localNonce.Load(args.From)
s.Equal(uint64(nonce)+1, resultNonce.(uint64))
}
@ -367,13 +345,27 @@ func (s *TxQueueTestSuite) TestContractCreation() {
Address: testaddr,
AccountKey: &keystore.Key{PrivateKey: key},
}
s.manager.ethTxClient = backend
tx := Create(context.Background(), SendTxArgs{
s.manager.sender = backend
s.manager.gasCalculator = backend
s.manager.pendingNonceProvider = backend
tx := SendTxArgs{
From: testaddr,
Input: hexutil.Bytes(gethcommon.FromHex(contract.ENSBin)),
})
s.NoError(s.manager.QueueTransaction(tx))
hash, err := s.manager.CompleteTransaction(tx.ID, selectedAccount)
}
go func() {
for i := 1000; i > 0; i-- {
req := s.manager.pendingSignRequests.First()
if req == nil {
time.Sleep(time.Millisecond)
} else {
s.manager.pendingSignRequests.Approve(req.ID, "", simpleVerifyFunc(selectedAccount)) // nolint: errcheck
break
}
}
}()
hash, err := s.manager.SendTransaction(context.Background(), tx)
s.NoError(err)
backend.Commit()
receipt, err := backend.TransactionReceipt(context.TODO(), hash)

View File

@ -1,333 +0,0 @@
package transactions
import (
"context"
"encoding/json"
"errors"
"math/big"
"sync"
"time"
ethereum "github.com/ethereum/go-ethereum"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/geth/account"
"github.com/status-im/status-go/geth/rpc"
)
const (
// SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected.
SendTxDefaultErrorCode = SendTransactionDefaultErrorCode
// DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction().
DefaultTxSendCompletionTimeout = 300 * time.Second
defaultGas = 90000
defaultTimeout = time.Minute
)
var (
// ErrUnexpectedArgs returned when args are of unexpected length.
ErrUnexpectedArgs = errors.New("unexpected args")
)
// RPCClientProvider is an interface that provides a way
// to obtain an rpc.Client.
type RPCClientProvider interface {
RPCClient() *rpc.Client
}
// Manager provides means to manage internal Status Backend (injected into LES)
type Manager struct {
rpcClientProvider RPCClientProvider
txQueue *TxQueue
ethTxClient EthTransactor
notify bool
completionTimeout time.Duration
rpcCallTimeout time.Duration
networkID uint64
addrLock *AddrLocker
localNonce sync.Map
log log.Logger
}
// NewManager returns a new Manager.
func NewManager(rpcClientProvider RPCClientProvider) *Manager {
return &Manager{
rpcClientProvider: rpcClientProvider,
txQueue: newQueue(),
addrLock: &AddrLocker{},
notify: true,
completionTimeout: DefaultTxSendCompletionTimeout,
rpcCallTimeout: defaultTimeout,
localNonce: sync.Map{},
log: log.New("package", "status-go/geth/transactions.Manager"),
}
}
// DisableNotificactions turns off notifications on enqueue and return of tx.
// It is not thread safe and must be called only before manager is started.
func (m *Manager) DisableNotificactions() {
m.notify = false
}
// Start starts accepting new transactions into the queue.
func (m *Manager) Start(networkID uint64) {
m.log.Info("start Manager")
m.networkID = networkID
m.ethTxClient = NewEthTxClient(m.rpcClientProvider.RPCClient())
m.txQueue.Start()
}
// Stop stops accepting new transactions into the queue.
func (m *Manager) Stop() {
m.log.Info("stop Manager")
m.txQueue.Stop()
}
// TransactionQueue returns a reference to the queue.
func (m *Manager) TransactionQueue() *TxQueue {
return m.txQueue
}
// QueueTransaction puts a transaction into the queue.
func (m *Manager) QueueTransaction(tx *QueuedTx) error {
if !tx.Args.Valid() {
return ErrInvalidSendTxArgs
}
to := "<nil>"
if tx.Args.To != nil {
to = tx.Args.To.Hex()
}
m.log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to)
if err := m.txQueue.Enqueue(tx); err != nil {
return err
}
if m.notify {
NotifyOnEnqueue(tx)
}
return nil
}
func (m *Manager) txDone(tx *QueuedTx, hash gethcommon.Hash, err error) {
if err := m.txQueue.Done(tx.ID, hash, err); err == ErrQueuedTxIDNotFound {
m.log.Warn("transaction is already removed from a queue", "ID", tx.ID)
return
}
if m.notify {
NotifyOnReturn(tx, err)
}
}
// WaitForTransaction adds a transaction to the queue and blocks
// until it's completed, discarded or times out.
func (m *Manager) WaitForTransaction(tx *QueuedTx) Result {
m.log.Info("wait for transaction", "id", tx.ID)
// now wait up until transaction is:
// - completed (via CompleteQueuedTransaction),
// - discarded (via DiscardQueuedTransaction)
// - or times out
for {
select {
case rst := <-tx.Result:
return rst
case <-time.After(m.completionTimeout):
m.txDone(tx, gethcommon.Hash{}, ErrQueuedTxTimedOut)
}
}
}
// NotifyErrored sends a notification for the given transaction
func (m *Manager) NotifyErrored(id string, inputError error) error {
tx, err := m.txQueue.Get(id)
if err != nil {
m.log.Warn("error getting a queued transaction", "err", err)
return err
}
if m.notify {
NotifyOnReturn(tx, inputError)
}
return nil
}
// CompleteTransaction instructs backend to complete sending of a given transaction.
func (m *Manager) CompleteTransaction(id string, account *account.SelectedExtKey) (hash gethcommon.Hash, err error) {
m.log.Info("complete transaction", "id", id)
tx, err := m.txQueue.Get(id)
if err != nil {
m.log.Warn("error getting a queued transaction", "err", err)
return hash, err
}
if err := m.txQueue.LockInprogress(id); err != nil {
m.log.Warn("can't process transaction", "err", err)
return hash, err
}
if err := m.validateAccount(tx, account); err != nil {
m.txDone(tx, hash, err)
return hash, err
}
hash, err = m.completeTransaction(account, tx)
m.log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err)
m.txDone(tx, hash, err)
return hash, err
}
// make sure that only account which created the tx can complete it
func (m *Manager) validateAccount(tx *QueuedTx, selectedAccount *account.SelectedExtKey) error {
if selectedAccount == nil {
return account.ErrNoAccountSelected
}
// make sure that only account which created the tx can complete it
if tx.Args.From.Hex() != selectedAccount.Address.Hex() {
m.log.Warn("queued transaction does not belong to the selected account", "err", ErrInvalidCompleteTxSender)
return ErrInvalidCompleteTxSender
}
return nil
}
func (m *Manager) completeTransaction(selectedAccount *account.SelectedExtKey, queuedTx *QueuedTx) (hash gethcommon.Hash, err error) {
m.log.Info("complete transaction", "id", queuedTx.ID)
m.addrLock.LockAddr(queuedTx.Args.From)
var localNonce uint64
if val, ok := m.localNonce.Load(queuedTx.Args.From); ok {
localNonce = val.(uint64)
}
var nonce uint64
defer func() {
// nonce should be incremented only if tx completed without error
// if upstream node returned nonce higher than ours we will stick to it
if err == nil {
m.localNonce.Store(queuedTx.Args.From, nonce+1)
}
m.addrLock.UnlockAddr(queuedTx.Args.From)
}()
ctx, cancel := context.WithTimeout(context.Background(), m.rpcCallTimeout)
defer cancel()
nonce, err = m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From)
if err != nil {
return hash, err
}
// if upstream node returned nonce higher than ours we will use it, as it probably means
// that another client was used for sending transactions
if localNonce > nonce {
nonce = localNonce
}
args := queuedTx.Args
if !args.Valid() {
return hash, ErrInvalidSendTxArgs
}
gasPrice := (*big.Int)(args.GasPrice)
if args.GasPrice == nil {
ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout)
defer cancel()
gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx)
if err != nil {
return hash, err
}
}
chainID := big.NewInt(int64(m.networkID))
value := (*big.Int)(args.Value)
var gas uint64
if args.Gas == nil {
ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout)
defer cancel()
gas, err = m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{
From: args.From,
To: args.To,
GasPrice: gasPrice,
Value: value,
Data: args.GetInput(),
})
if err != nil {
return hash, err
}
if gas < defaultGas {
m.log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas)
gas = defaultGas
}
} else {
gas = uint64(*args.Gas)
}
var tx *types.Transaction
if args.To != nil {
m.log.Info("New transaction",
"From", args.From,
"To", *args.To,
"Gas", gas,
"GasPrice", gasPrice,
"Value", value,
)
tx = types.NewTransaction(nonce, *args.To, value, gas, gasPrice, args.GetInput())
} else {
// contract creation is rare enough to log an expected address
m.log.Info("New contract",
"From", args.From,
"Gas", gas,
"GasPrice", gasPrice,
"Value", value,
"Contract address", crypto.CreateAddress(args.From, nonce),
)
tx = types.NewContractCreation(nonce, value, gas, gasPrice, args.GetInput())
}
signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey)
if err != nil {
return hash, err
}
ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout)
defer cancel()
if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil {
return hash, err
}
return signedTx.Hash(), nil
}
// DiscardTransaction discards a given transaction from transaction queue
func (m *Manager) DiscardTransaction(id string) error {
tx, err := m.txQueue.Get(id)
if err != nil {
return err
}
err = m.txQueue.Done(id, gethcommon.Hash{}, ErrQueuedTxDiscarded)
if m.notify {
NotifyOnReturn(tx, ErrQueuedTxDiscarded)
}
return err
}
// SendTransactionRPCHandler is a handler for eth_sendTransaction method.
// It accepts one param which is a slice with a map of transaction params.
func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) {
m.log.Debug("SendTransactionRPCHandler called", "ARGS", args)
if len(args) != 1 {
return nil, ErrUnexpectedArgs
}
data, err := json.Marshal(args[0])
if err != nil {
return nil, err
}
var txArgs SendTxArgs
if err := json.Unmarshal(data, &txArgs); err != nil {
return nil, err
}
tx := Create(ctx, txArgs)
if err := m.QueueTransaction(tx); err != nil {
return nil, err
}
rst := m.WaitForTransaction(tx)
if rst.Error != nil {
return nil, rst.Error
}
return rst.Hash.Hex(), nil
}

View File

@ -3,29 +3,30 @@ package transactions
import (
"bytes"
"context"
"encoding/json"
"errors"
ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
)
// errors
var (
// ErrInvalidSendTxArgs is returned when the structure of SendTxArgs is ambigious.
ErrInvalidSendTxArgs = errors.New("Transaction arguments are invalid (are both 'input' and 'data' fields used?)")
// ErrUnexpectedArgs returned when args are of unexpected length.
ErrUnexpectedArgs = errors.New("unexpected args")
)
// Result is a JSON returned from transaction complete function (used internally)
type Result struct {
Hash common.Hash
Error error
// PendingNonceProvider provides information about nonces.
type PendingNonceProvider interface {
PendingNonceAt(ctx context.Context, account common.Address) (uint64, error)
}
// QueuedTx holds enough information to complete the queued transaction.
type QueuedTx struct {
ID string
Context context.Context
Args SendTxArgs
Result chan Result
// GasCalculator provides methods for estimating and pricing gas.
type GasCalculator interface {
ethereum.GasEstimator
ethereum.GasPricer
}
// SendTxArgs represents the arguments to submit a new transaction into the transaction pool.
@ -68,3 +69,20 @@ func (args SendTxArgs) GetInput() hexutil.Bytes {
func isNilOrEmpty(bytes hexutil.Bytes) bool {
return bytes == nil || len(bytes) == 0
}
// RPCCalltoSendTxArgs creates SendTxArgs based on RPC parameters
func RPCCalltoSendTxArgs(args ...interface{}) (SendTxArgs, error) {
var txArgs SendTxArgs
if len(args) != 1 {
return txArgs, ErrUnexpectedArgs
}
data, err := json.Marshal(args[0])
if err != nil {
return txArgs, err
}
if err := json.Unmarshal(data, &txArgs); err != nil {
return txArgs, err
}
return txArgs, nil
}

View File

@ -1,89 +0,0 @@
package transactions
import (
"context"
"errors"
"fmt"
"io"
"os"
"reflect"
"runtime/debug"
"github.com/pborman/uuid"
"github.com/status-im/status-go/geth/signal"
)
const (
// MessageIDKey is a key for message ID
// This ID is required to track from which chat a given send transaction request is coming.
MessageIDKey = contextKey("message_id")
)
type contextKey string // in order to make sure that our context key does not collide with keys from other packages
//ErrTxQueueRunFailure - error running transaction queue
var ErrTxQueueRunFailure = errors.New("error running transaction queue")
// haltOnPanic recovers from panic, logs issue, sends upward notification, and exits
func haltOnPanic() {
if r := recover(); r != nil {
err := fmt.Errorf("%v: %v", ErrTxQueueRunFailure, r)
// send signal up to native app
signal.Send(signal.Envelope{
Type: signal.EventNodeCrashed,
Event: signal.NodeCrashEvent{
Error: err,
},
})
fatalf(err) // os.exit(1) is called internally
}
}
// messageIDFromContext returns message id from context (if exists)
func messageIDFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}
if messageID, ok := ctx.Value(MessageIDKey).(string); ok {
return messageID
}
return ""
}
// fatalf is used to halt the execution.
// When called the function prints stack end exits.
// Failure is logged into both StdErr and StdOut.
func fatalf(reason interface{}, args ...interface{}) {
// decide on output stream
w := io.MultiWriter(os.Stdout, os.Stderr)
outf, _ := os.Stdout.Stat() // nolint: gas
errf, _ := os.Stderr.Stat() // nolint: gas
if outf != nil && errf != nil && os.SameFile(outf, errf) {
w = os.Stderr
}
// find out whether error or string has been passed as a reason
r := reflect.ValueOf(reason)
if r.Kind() == reflect.String {
fmt.Fprintf(w, "Fatal Failure: %v\n%v\n", reason.(string), args)
} else {
fmt.Fprintf(w, "Fatal Failure: %v\n", reason.(error))
}
debug.PrintStack()
os.Exit(1)
}
// Create returns a transaction object.
func Create(ctx context.Context, args SendTxArgs) *QueuedTx {
return &QueuedTx{
ID: uuid.New(),
Context: ctx,
Args: args,
Result: make(chan Result, 1),
}
}

View File

@ -34,6 +34,7 @@ import (
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/sign"
"github.com/status-im/status-go/static"
. "github.com/status-im/status-go/t/utils" //nolint: golint
)
@ -767,10 +768,8 @@ func testAccountLogout(t *testing.T) bool {
}
func testCompleteTransaction(t *testing.T) bool {
txQueueManager := statusAPI.TxQueueManager()
txQueue := txQueueManager.TransactionQueue()
signRequests := statusAPI.PendingSignRequests()
txQueue.Reset()
EnsureNodeSync(statusAPI.StatusNode().EnsureSync)
// log into account from which transactions will be sent
@ -792,7 +791,7 @@ func testCompleteTransaction(t *testing.T) bool {
t.Errorf("cannot unmarshal event's JSON: %s. Error %q", jsonEvent, err)
return
}
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
t.Logf("transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))
@ -839,7 +838,7 @@ func testCompleteTransaction(t *testing.T) bool {
return false
}
if txQueue.Count() != 0 {
if signRequests.Count() != 0 {
t.Error("tx queue must be empty at this point")
return false
}
@ -848,8 +847,7 @@ func testCompleteTransaction(t *testing.T) bool {
}
func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyclo
txQueue := statusAPI.TxQueueManager().TransactionQueue()
txQueue.Reset()
signRequests := statusAPI.PendingSignRequests()
// log into account from which transactions will be sent
if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil {
@ -870,7 +868,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID = event["id"].(string)
t.Logf("transaction queued (will be completed in a single call, once aggregated): {id: %s}\n", txID)
@ -917,7 +915,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc
}
results := resultsStruct.Results
if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != transactions.ErrQueuedTxIDNotFound.Error() {
if len(results) != (testTxCount+1) || results["invalid-tx-id"].Error != sign.ErrSignReqNotFound.Error() {
t.Errorf("cannot complete txs: %v", results)
return
}
@ -942,7 +940,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
for _, txID := range parsedIDs {
if txQueue.Has(string(txID)) {
if signRequests.Has(string(txID)) {
t.Errorf("txqueue should not have test tx at this point (it should be completed): %s", txID)
return
}
@ -972,7 +970,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc
return false
}
if txQueue.Count() != 0 {
if signRequests.Count() != 0 {
t.Error("tx queue must be empty at this point")
return false
}
@ -981,8 +979,7 @@ func testCompleteMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyc
}
func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
txQueue := statusAPI.TxQueueManager().TransactionQueue()
txQueue.Reset()
signRequests := statusAPI.PendingSignRequests()
// log into account from which transactions will be sent
if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil {
@ -1003,12 +1000,12 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID = event["id"].(string)
t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID)
if !txQueue.Has(string(txID)) {
if !signRequests.Has(string(txID)) {
t.Errorf("txqueue should still have test tx: %s", txID)
return
}
@ -1028,13 +1025,12 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
// try completing discarded transaction
_, err := statusAPI.CompleteTransaction(string(txID), TestConfig.Account1.Password)
if err != transactions.ErrQueuedTxIDNotFound {
if err != sign.ErrSignReqNotFound {
t.Error("expects tx not found, but call to CompleteTransaction succeeded")
return
}
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
if txQueue.Has(string(txID)) {
if signRequests.Has(string(txID)) {
t.Errorf("txqueue should not have test tx at this point (it should be discarded): %s", txID)
return
}
@ -1042,19 +1038,19 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
completeQueuedTransaction <- struct{}{} // so that timeout is aborted
}
if envelope.Type == transactions.EventTransactionFailed {
if envelope.Type == sign.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
expectedErrMessage := sign.ErrSignReqDiscarded.Error()
if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return
}
receivedErrCode := event["error_code"].(string)
if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) {
if receivedErrCode != strconv.Itoa(sign.SendTransactionDiscardedErrorCode) {
t.Errorf("unexpected error code received: got %v", receivedErrCode)
return
}
@ -1069,7 +1065,8 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
To: account.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
if err != transactions.ErrQueuedTxDiscarded {
time.Sleep(1 * time.Second)
if err != sign.ErrSignReqDiscarded {
t.Errorf("expected error not thrown: %v", err)
return false
}
@ -1079,7 +1076,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
return false
}
if txQueue.Count() != 0 {
if signRequests.Count() != 0 {
t.Error("tx queue must be empty at this point")
return false
}
@ -1093,8 +1090,7 @@ func testDiscardTransaction(t *testing.T) bool { //nolint: gocyclo
}
func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocyclo
txQueue := statusAPI.TxQueueManager().TransactionQueue()
txQueue.Reset()
signRequests := statusAPI.PendingSignRequests()
// log into account from which transactions will be sent
if err := statusAPI.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password); err != nil {
@ -1116,12 +1112,12 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
t.Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID = event["id"].(string)
t.Logf("transaction queued (will be discarded soon): {id: %s}\n", txID)
if !txQueue.Has(string(txID)) {
if !signRequests.Has(string(txID)) {
t.Errorf("txqueue should still have test tx: %s", txID)
return
}
@ -1129,19 +1125,19 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
txIDs <- txID
}
if envelope.Type == transactions.EventTransactionFailed {
if envelope.Type == sign.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
t.Logf("transaction return event received: {id: %s}\n", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
expectedErrMessage := sign.ErrSignReqDiscarded.Error()
if receivedErrMessage != expectedErrMessage {
t.Errorf("unexpected error message received: got %v", receivedErrMessage)
return
}
receivedErrCode := event["error_code"].(string)
if receivedErrCode != strconv.Itoa(transactions.SendTransactionDiscardedErrorCode) {
if receivedErrCode != strconv.Itoa(sign.SendTransactionDiscardedErrorCode) {
t.Errorf("unexpected error code received: got %v", receivedErrCode)
return
}
@ -1160,7 +1156,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
To: account.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
if err != transactions.ErrQueuedTxDiscarded {
if err != sign.ErrSignReqDiscarded {
t.Errorf("expected error not thrown: %v", err)
return
}
@ -1191,7 +1187,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
}
discardResults := discardResultsStruct.Results
if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != transactions.ErrQueuedTxIDNotFound.Error() {
if len(discardResults) != 1 || discardResults["invalid-tx-id"].Error != sign.ErrSignReqNotFound.Error() {
t.Errorf("cannot discard txs: %v", discardResults)
return
}
@ -1213,7 +1209,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
t.Errorf("tx id not set in result: expected id is %s", txID)
return
}
if txResult.Error != transactions.ErrQueuedTxIDNotFound.Error() {
if txResult.Error != sign.ErrSignReqNotFound.Error() {
t.Errorf("invalid error for %s", txResult.Hash)
return
}
@ -1225,7 +1221,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
for _, txID := range parsedIDs {
if txQueue.Has(string(txID)) {
if signRequests.Has(string(txID)) {
t.Errorf("txqueue should not have test tx at this point (it should be discarded): %s", txID)
return
}
@ -1254,7 +1250,7 @@ func testDiscardMultipleQueuedTransactions(t *testing.T) bool { //nolint: gocycl
return false
}
if txQueue.Count() != 0 {
if signRequests.Count() != 0 {
t.Error("tx queue must be empty at this point")
return false
}
@ -1430,7 +1426,7 @@ func startTestNode(t *testing.T) <-chan struct{} {
return
}
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
}
if envelope.Type == signal.EventNodeStarted {
t.Log("Node started, but we wait till it be ready")

13
sign/README.md Normal file
View File

@ -0,0 +1,13 @@
# sign
`sign` package represents the API and signals for sending and receiving
signature request to and from our API user.
When a method is called that requires an additional signature confirmation from
a user (like, a transaction), it gets it's sign request.
Client of the API is then nofified of the sign request.
Client has a chance to approve the sign request (by providing a valid password)
or to discard it. When the request is approved, the locked functinality is
executed.

36
sign/errors.go Normal file
View File

@ -0,0 +1,36 @@
package sign
import (
"errors"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/status-im/status-go/geth/account"
)
// TODO (mandrigin): Change values of these errors when API change is made.
var (
//ErrSignReqNotFound - error transaction hash not found
ErrSignReqNotFound = errors.New("transaction hash not found")
//ErrSignReqInProgress - error transaction is in progress
ErrSignReqInProgress = errors.New("transaction is in progress")
// TODO (mandrigin): to be moved to `transactions` package
//ErrInvalidCompleteTxSender - error transaction with invalid sender
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
//ErrSignReqTimedOut - error transaction sending timed out
ErrSignReqTimedOut = errors.New("transaction sending timed out")
//ErrSignReqDiscarded - error transaction discarded
ErrSignReqDiscarded = errors.New("transaction has been discarded")
)
// remove from queue on any error (except for transient ones) and propagate
// defined as map[string]bool because errors from ethclient returned wrapped as jsonError
var transientErrs = map[string]bool{
keystore.ErrDecrypt.Error(): true, // wrong password
ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account
account.ErrNoAccountSelected.Error(): true, // account not selected
}
func isTransient(err error) bool {
_, transient := transientErrs[err.Error()]
return transient
}

View File

@ -1,6 +1,8 @@
package transactions
package sign
import (
"context"
"github.com/ethereum/go-ethereum/accounts/keystore"
"github.com/status-im/status-go/geth/signal"
)
@ -25,57 +27,73 @@ const (
SendTransactionDiscardedErrorCode
)
const (
// MessageIDKey is a key for message ID
// This ID is required to track from which chat a given send transaction request is coming.
MessageIDKey = contextKey("message_id")
)
type contextKey string // in order to make sure that our context key does not collide with keys from other packages
// messageIDFromContext returns message id from context (if exists)
func messageIDFromContext(ctx context.Context) string {
if ctx == nil {
return ""
}
if messageID, ok := ctx.Value(MessageIDKey).(string); ok {
return messageID
}
return ""
}
var txReturnCodes = map[error]int{
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
ErrQueuedTxTimedOut: SendTransactionTimeoutErrorCode,
ErrQueuedTxDiscarded: SendTransactionDiscardedErrorCode,
nil: SendTransactionNoErrorCode,
keystore.ErrDecrypt: SendTransactionPasswordErrorCode,
ErrSignReqTimedOut: SendTransactionTimeoutErrorCode,
ErrSignReqDiscarded: SendTransactionDiscardedErrorCode,
}
// SendTransactionEvent is a signal sent on a send transaction request
type SendTransactionEvent struct {
ID string `json:"id"`
Args SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
ID string `json:"id"`
Args interface{} `json:"args"`
MessageID string `json:"message_id"`
}
// NotifyOnEnqueue returns handler that processes incoming tx queue requests
func NotifyOnEnqueue(queuedTx *QueuedTx) {
func NotifyOnEnqueue(request *Request) {
signal.Send(signal.Envelope{
Type: EventTransactionQueued,
Event: SendTransactionEvent{
ID: queuedTx.ID,
Args: queuedTx.Args,
MessageID: messageIDFromContext(queuedTx.Context),
ID: request.ID,
Args: request.Meta,
MessageID: messageIDFromContext(request.context),
},
})
}
// ReturnSendTransactionEvent is a JSON returned whenever transaction send is returned
type ReturnSendTransactionEvent struct {
ID string `json:"id"`
Args SendTxArgs `json:"args"`
MessageID string `json:"message_id"`
ErrorMessage string `json:"error_message"`
ErrorCode int `json:"error_code,string"`
ID string `json:"id"`
Args interface{} `json:"args"`
MessageID string `json:"message_id"`
ErrorMessage string `json:"error_message"`
ErrorCode int `json:"error_code,string"`
}
// NotifyOnReturn returns handler that processes responses from internal tx manager
func NotifyOnReturn(queuedTx *QueuedTx, err error) {
func NotifyOnReturn(request *Request, err error) {
// we don't want to notify a user if tx was sent successfully
if err == nil {
return
}
// discard notifications with empty tx
if queuedTx == nil {
return
}
signal.Send(signal.Envelope{
Type: EventTransactionFailed,
Event: ReturnSendTransactionEvent{
ID: queuedTx.ID,
Args: queuedTx.Args,
MessageID: messageIDFromContext(queuedTx.Context),
ID: request.ID,
Args: request.Meta,
MessageID: messageIDFromContext(request.context),
ErrorMessage: err.Error(),
ErrorCode: sendTransactionErrorCode(err),
},
@ -86,5 +104,5 @@ func sendTransactionErrorCode(err error) int {
if code, ok := txReturnCodes[err]; ok {
return code
}
return SendTxDefaultErrorCode
return SendTransactionDefaultErrorCode
}

172
sign/pending_requests.go Normal file
View File

@ -0,0 +1,172 @@
package sign
import (
"context"
"sync"
"time"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/geth/account"
)
type verifyFunc func(string) (*account.SelectedExtKey, error)
// PendingRequests is a capped container that holds pending signing requests.
type PendingRequests struct {
mu sync.RWMutex // to guard transactions map
requests map[string]*Request
log log.Logger
}
// NewPendingRequests creates a new requests list
func NewPendingRequests() *PendingRequests {
logger := log.New("package", "status-go/sign.PendingRequests")
return &PendingRequests{
requests: make(map[string]*Request),
log: logger,
}
}
// Add a new signing request.
func (rs *PendingRequests) Add(ctx context.Context, meta Meta, completeFunc completeFunc) (*Request, error) {
rs.mu.Lock()
defer rs.mu.Unlock()
request := newRequest(ctx, meta, completeFunc)
rs.requests[request.ID] = request
rs.log.Info("signing request is created", "ID", request.ID)
go NotifyOnEnqueue(request)
return request, nil
}
// Get returns a signing request by it's ID.
func (rs *PendingRequests) Get(id string) (*Request, error) {
rs.mu.RLock()
defer rs.mu.RUnlock()
if request, ok := rs.requests[id]; ok {
return request, nil
}
return nil, ErrSignReqNotFound
}
// First returns a first signing request (if exists, nil otherwise).
func (rs *PendingRequests) First() *Request {
rs.mu.RLock()
defer rs.mu.RUnlock()
for _, req := range rs.requests {
return req
}
return nil
}
// Approve a signing request by it's ID. Requires a valid password and a verification function.
func (rs *PendingRequests) Approve(id string, password string, verify verifyFunc) (hash gethcommon.Hash, err error) {
rs.log.Info("complete transaction", "id", id)
request, err := rs.tryLock(id)
if err != nil {
rs.log.Warn("can't process transaction", "err", err)
return hash, err
}
selectedAccount, err := verify(password)
if err != nil {
rs.complete(request, hash, err)
return hash, err
}
hash, err = request.completeFunc(selectedAccount)
rs.log.Info("finally completed transaction", "id", request.ID, "hash", hash, "err", err)
rs.complete(request, hash, err)
return hash, err
}
// Discard remove a signing request from the list of pending requests.
func (rs *PendingRequests) Discard(id string) error {
request, err := rs.Get(id)
if err != nil {
return err
}
rs.complete(request, gethcommon.Hash{}, ErrSignReqDiscarded)
return nil
}
// Wait blocks until a request with a specified ID is completed (approved or discarded)
func (rs *PendingRequests) Wait(id string, timeout time.Duration) Result {
request, err := rs.Get(id)
if err != nil {
return Result{Error: err}
}
for {
select {
case rst := <-request.result:
return rst
case <-time.After(timeout):
rs.complete(request, gethcommon.Hash{}, ErrSignReqTimedOut)
}
}
}
// Count returns number of currently pending requests
func (rs *PendingRequests) Count() int {
rs.mu.RLock()
defer rs.mu.RUnlock()
return len(rs.requests)
}
// Has checks whether a pending request with a given identifier exists in the list
func (rs *PendingRequests) Has(id string) bool {
rs.mu.RLock()
defer rs.mu.RUnlock()
_, ok := rs.requests[id]
return ok
}
// tryLock is used to avoid double-completion of the same request.
// it returns a request instance if it isn't processing yet, returns an error otherwise.
func (rs *PendingRequests) tryLock(id string) (*Request, error) {
rs.mu.Lock()
defer rs.mu.Unlock()
if tx, ok := rs.requests[id]; ok {
if tx.locked {
return nil, ErrSignReqInProgress
}
tx.locked = true
return tx, nil
}
return nil, ErrSignReqNotFound
}
// complete removes the request from the list if there is no error or an error is non-transient
func (rs *PendingRequests) complete(request *Request, hash gethcommon.Hash, err error) {
rs.mu.Lock()
defer rs.mu.Unlock()
request.locked = false
go NotifyOnReturn(request, err)
if err != nil && isTransient(err) {
return
}
delete(rs.requests, request.ID)
// hash is updated only if err is nil, but transaction is not removed from a queue
result := Result{Error: err}
if err == nil {
result.Hash = hash
}
request.result <- result
}

View File

@ -0,0 +1,216 @@
package sign
import (
"context"
"errors"
"testing"
"time"
"github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/geth/account"
"github.com/stretchr/testify/suite"
)
const (
correctPassword = "password-correct"
wrongPassword = "password-wrong"
)
func testVerifyFunc(password string) (*account.SelectedExtKey, error) {
if password == correctPassword {
return nil, nil
}
return nil, keystore.ErrDecrypt
}
func TestPendingRequestsSuite(t *testing.T) {
suite.Run(t, new(PendingRequestsSuite))
}
type PendingRequestsSuite struct {
suite.Suite
pendingRequests *PendingRequests
}
func (s *PendingRequestsSuite) SetupTest() {
s.pendingRequests = NewPendingRequests()
}
func (s *PendingRequestsSuite) defaultCompleteFunc() completeFunc {
hash := gethcommon.Hash{1}
return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) {
s.Nil(acc, "account should be `nil`")
return hash, nil
}
}
func (s *PendingRequestsSuite) delayedCompleteFunc() completeFunc {
hash := gethcommon.Hash{1}
return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) {
time.Sleep(10 * time.Millisecond)
s.Nil(acc, "account should be `nil`")
return hash, nil
}
}
func (s *PendingRequestsSuite) errorCompleteFunc(err error) completeFunc {
hash := gethcommon.Hash{1}
return func(acc *account.SelectedExtKey) (gethcommon.Hash, error) {
s.Nil(acc, "account should be `nil`")
return hash, err
}
}
func (s *PendingRequestsSuite) TestGet() {
req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc())
s.NoError(err)
for i := 2; i > 0; i-- {
actualRequest, err := s.pendingRequests.Get(req.ID)
s.NoError(err)
s.Equal(req, actualRequest)
}
}
func (s *PendingRequestsSuite) testComplete(password string, hash gethcommon.Hash, completeFunc completeFunc) (string, error) {
req, err := s.pendingRequests.Add(context.Background(), nil, completeFunc)
s.NoError(err)
s.True(s.pendingRequests.Has(req.ID), "sign request should exist")
hash2, err := s.pendingRequests.Approve(req.ID, password, testVerifyFunc)
s.Equal(hash, hash2, "hashes should match")
return req.ID, err
}
func (s *PendingRequestsSuite) TestCompleteSuccess() {
id, err := s.testComplete(correctPassword, gethcommon.Hash{1}, s.defaultCompleteFunc())
s.NoError(err, "no errors should be there")
s.False(s.pendingRequests.Has(id), "sign request should not exist")
}
func (s *PendingRequestsSuite) TestCompleteTransientError() {
hash := gethcommon.Hash{}
id, err := s.testComplete(wrongPassword, hash, s.errorCompleteFunc(keystore.ErrDecrypt))
s.Equal(keystore.ErrDecrypt, err, "error value should be preserved")
s.True(s.pendingRequests.Has(id))
// verify that you are able to re-approve it after a transient error
_, err = s.pendingRequests.tryLock(id)
s.NoError(err)
}
func (s *PendingRequestsSuite) TestCompleteError() {
hash := gethcommon.Hash{1}
expectedError := errors.New("test")
id, err := s.testComplete(correctPassword, hash, s.errorCompleteFunc(expectedError))
s.Equal(expectedError, err, "error value should be preserved")
s.False(s.pendingRequests.Has(id))
}
func (s PendingRequestsSuite) TestMultipleComplete() {
id, err := s.testComplete(correctPassword, gethcommon.Hash{1}, s.defaultCompleteFunc())
s.NoError(err, "no errors should be there")
_, err = s.pendingRequests.Approve(id, correctPassword, testVerifyFunc)
s.Equal(ErrSignReqNotFound, err)
}
func (s PendingRequestsSuite) TestConcurrentComplete() {
req, err := s.pendingRequests.Add(context.Background(), nil, s.delayedCompleteFunc())
s.NoError(err)
s.True(s.pendingRequests.Has(req.ID), "sign request should exist")
approved := 0
tried := 0
for i := 10; i > 0; i-- {
go func() {
_, err = s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc)
if err == nil {
approved++
}
tried++
}()
}
s.pendingRequests.Wait(req.ID, 10*time.Second)
s.False(s.pendingRequests.Has(req.ID), "sign request should exist")
s.Equal(approved, 1, "request should be approved only once")
s.Equal(tried, 10, "request should be tried to approve 10 times")
}
func (s PendingRequestsSuite) TestWaitSuccess() {
req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc())
s.NoError(err)
s.True(s.pendingRequests.Has(req.ID), "sign request should exist")
go func() {
_, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc)
s.NoError(err)
}()
result := s.pendingRequests.Wait(req.ID, 1*time.Second)
s.NoError(result.Error)
}
func (s PendingRequestsSuite) TestDiscard() {
req, err := s.pendingRequests.Add(context.Background(), nil, s.defaultCompleteFunc())
s.NoError(err)
s.True(s.pendingRequests.Has(req.ID), "sign request should exist")
s.Equal(ErrSignReqNotFound, s.pendingRequests.Discard(""))
go func() {
// enough to make it be called after Wait
time.Sleep(time.Millisecond)
s.NoError(s.pendingRequests.Discard(req.ID))
}()
result := s.pendingRequests.Wait(req.ID, 1*time.Second)
s.Equal(ErrSignReqDiscarded, result.Error)
}
func (s PendingRequestsSuite) TestWaitFail() {
expectedError := errors.New("test-wait-fail")
req, err := s.pendingRequests.Add(context.Background(), nil, s.errorCompleteFunc(expectedError))
s.NoError(err)
s.True(s.pendingRequests.Has(req.ID), "sign request should exist")
go func() {
_, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc)
s.Equal(expectedError, err)
}()
result := s.pendingRequests.Wait(req.ID, 1*time.Second)
s.Equal(expectedError, result.Error)
}
func (s PendingRequestsSuite) TestWaitTimeout() {
req, err := s.pendingRequests.Add(context.Background(), nil, s.delayedCompleteFunc())
s.NoError(err)
s.True(s.pendingRequests.Has(req.ID), "sign request should exist")
go func() {
_, err := s.pendingRequests.Approve(req.ID, correctPassword, testVerifyFunc)
s.NoError(err)
}()
result := s.pendingRequests.Wait(req.ID, 0*time.Second)
s.Equal(result.Error, ErrSignReqTimedOut)
}

36
sign/request.go Normal file
View File

@ -0,0 +1,36 @@
package sign
import (
"context"
"github.com/ethereum/go-ethereum/common"
"github.com/pborman/uuid"
"github.com/status-im/status-go/geth/account"
)
type completeFunc func(*account.SelectedExtKey) (common.Hash, error)
// Meta represents any metadata that could be attached to a signing request.
// It will be JSON-serialized and used in notifications to the API consumer.
type Meta interface{}
// Request is a single signing request.
type Request struct {
ID string
Meta Meta
context context.Context
locked bool
completeFunc completeFunc
result chan Result
}
func newRequest(ctx context.Context, meta Meta, completeFunc completeFunc) *Request {
return &Request{
ID: uuid.New(),
Meta: meta,
context: ctx,
locked: false,
completeFunc: completeFunc,
result: make(chan Result, 1),
}
}

9
sign/result.go Normal file
View File

@ -0,0 +1,9 @@
package sign
import "github.com/ethereum/go-ethereum/common"
// Result is a result of a signing request, error or successful
type Result struct {
Hash common.Hash
Error error
}

View File

@ -13,7 +13,7 @@ import (
"github.com/status-im/status-go/geth/jail"
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/sign"
e2e "github.com/status-im/status-go/t/e2e"
. "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
@ -126,7 +126,7 @@ func (s *JailRPCTestSuite) TestContractDeployment() {
unmarshalErr := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(unmarshalErr, "cannot unmarshal JSON: %s", jsonEvent)
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
s.T().Logf("transaction queued and will be completed shortly, id: %v", event["id"])
@ -284,7 +284,7 @@ func (s *JailRPCTestSuite) TestJailVMPersistence() {
s.T().Errorf("cannot unmarshal event's JSON: %s", jsonEvent)
return
}
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
s.T().Logf("Transaction queued (will be completed shortly): {id: %s}\n", event["id"].(string))

View File

@ -5,6 +5,7 @@ import (
whisper "github.com/ethereum/go-ethereum/whisper/whisperv6"
"github.com/status-im/status-go/geth/api"
"github.com/status-im/status-go/sign"
"github.com/status-im/status-go/geth/node"
"github.com/status-im/status-go/geth/signal"
@ -123,9 +124,14 @@ func (s *BackendTestSuite) WhisperService() *whisper.Whisper {
return whisperService
}
// TxQueueManager returns a reference to the TxQueueManager.
func (s *BackendTestSuite) TxQueueManager() *transactions.Manager {
return s.Backend.TxQueueManager()
// Transactor returns a reference to the Transactor.
func (s *BackendTestSuite) Transactor() *transactions.Transactor {
return s.Backend.Transactor()
}
// PendingSignRequests returns a reference to PendingSignRequests.
func (s *BackendTestSuite) PendingSignRequests() *sign.PendingRequests {
return s.Backend.PendingSignRequests()
}
func importTestAccounts(keyStoreDir string) (err error) {

View File

@ -6,7 +6,6 @@ import (
"fmt"
"math/big"
"reflect"
"sync"
"testing"
"time"
@ -18,6 +17,7 @@ import (
"github.com/status-im/status-go/geth/params"
"github.com/status-im/status-go/geth/signal"
"github.com/status-im/status-go/geth/transactions"
"github.com/status-im/status-go/sign"
e2e "github.com/status-im/status-go/t/e2e"
. "github.com/status-im/status-go/t/utils"
"github.com/stretchr/testify/suite"
@ -50,12 +50,11 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransaction() {
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)
if sg.Type == transactions.EventTransactionQueued {
if sg.Type == sign.EventTransactionQueued {
event := sg.Event.(map[string]interface{})
txID := event["id"].(string)
txHash, err = s.Backend.CompleteTransaction(string(txID), TestConfig.Account1.Password)
s.NoError(err, "cannot complete queued transaction %s", txID)
close(transactionCompleted)
}
})
@ -102,7 +101,7 @@ func (s *TransactionsTestSuite) TestCallRPCSendTransactionUpstream() {
err := json.Unmarshal([]byte(rawSignal), &signalEnvelope)
s.NoError(err)
if signalEnvelope.Type == transactions.EventTransactionQueued {
if signalEnvelope.Type == sign.EventTransactionQueued {
event := signalEnvelope.Event.(map[string]interface{})
txID := event["id"].(string)
@ -155,11 +154,12 @@ func (s *TransactionsTestSuite) TestEmptyToFieldPreserved() {
}
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)
if sg.Type == transactions.EventTransactionQueued {
var event transactions.SendTransactionEvent
if sg.Type == sign.EventTransactionQueued {
var event sign.SendTransactionEvent
s.NoError(json.Unmarshal(sg.Event, &event))
s.NotNil(event.Args.From)
s.Nil(event.Args.To)
args := event.Args.(map[string]interface{})
s.NotNil(args["from"])
s.Nil(args["to"])
_, err := s.Backend.CompleteTransaction(event.ID, TestConfig.Account1.Password)
s.NoError(err)
close(transactionCompleted)
@ -203,6 +203,8 @@ func (s *TransactionsTestSuite) TestSendContractTxCollision() {
}
s.testSendContractTx(initFunc, nil, "")
s.NoError(s.Backend.AccountManager().Logout())
// Scenario 2: Both fields are filled with different values, expect an error
inverted := func(source []byte) []byte {
inverse := make([]byte, len(source))
@ -245,7 +247,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
@ -271,7 +273,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc
)
s.EqualError(
err,
transactions.ErrInvalidCompleteTxSender.Error(),
sign.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
)
@ -282,7 +284,11 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc
string(event["id"].(string)),
TestConfig.Account1.Password,
)
s.NoError(err, fmt.Sprintf("cannot complete queued transaction[%v]", event["id"]))
if expectedError != nil {
s.Equal(expectedError, err)
} else {
s.NoError(err, fmt.Sprintf("cannot complete queued transaction[%v]", event["id"]))
}
log.Info("contract transaction complete", "URL", "https://ropsten.etherscan.io/tx/"+txHash.Hex())
close(completeQueuedTransaction)
@ -319,7 +325,7 @@ func (s *TransactionsTestSuite) testSendContractTx(setInputAndDataValue initFunc
s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid")
s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed")
s.Zero(s.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point")
s.Zero(s.PendingSignRequests().Count(), "tx queue must be empty at this point")
}
func (s *TransactionsTestSuite) TestSendEther() {
@ -341,7 +347,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
@ -365,7 +371,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
string(event["id"].(string)), TestConfig.Account1.Password)
s.EqualError(
err,
transactions.ErrInvalidCompleteTxSender.Error(),
sign.ErrInvalidCompleteTxSender.Error(),
fmt.Sprintf("expected error on queued transaction[%v] not thrown", event["id"]),
)
@ -399,7 +405,7 @@ func (s *TransactionsTestSuite) TestSendEther() {
s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid")
s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed")
s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point")
s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point")
}
func (s *TransactionsTestSuite) TestSendEtherTxUpstream() {
@ -424,7 +430,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() {
err = json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, "cannot unmarshal JSON: %s", jsonEvent)
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
log.Info("transaction queued (will be completed shortly)", "id", event["id"].(string))
@ -456,7 +462,7 @@ func (s *TransactionsTestSuite) TestSendEtherTxUpstream() {
}
s.Equal(txHash.Hex(), txHashCheck.Hex(), "transaction hash returned from SendTransaction is invalid")
s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point")
s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point")
}
func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
@ -478,7 +484,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := string(event["id"].(string))
log.Info("transaction queued (will be failed and completed on the second call)", "id", txID)
@ -488,7 +494,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
_, err = s.Backend.CompleteTransaction(txID, TestConfig.Account1.Password+"wrong")
s.EqualError(err, keystore.ErrDecrypt.Error())
s.Equal(1, s.TxQueueManager().TransactionQueue().Count(), "txqueue cannot be empty, as tx has failed")
s.Equal(1, s.PendingSignRequests().Count(), "txqueue cannot be empty, as tx has failed")
// now try to complete transaction, but with the correct password
txHash, err = s.Backend.CompleteTransaction(txID, TestConfig.Account1.Password)
@ -498,7 +504,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
close(completeQueuedTransaction)
}
if envelope.Type == transactions.EventTransactionFailed {
if envelope.Type == sign.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))
@ -529,7 +535,7 @@ func (s *TransactionsTestSuite) TestDoubleCompleteQueuedTransactions() {
s.Equal(txHashCheck.Hex(), txHash.Hex(), "transaction hash returned from SendTransaction is invalid")
s.False(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction was never queued or completed")
s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point")
s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point")
s.True(txFailedEventCalled, "expected tx failure signal is not received")
}
@ -539,9 +545,6 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset()
// log into account from which transactions will be sent
s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
@ -554,12 +557,12 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := string(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID)
s.True(s.Backend.TxQueueManager().TransactionQueue().Has(txID), "txqueue should still have test tx")
s.True(s.Backend.PendingSignRequests().Has(txID), "txqueue should still have test tx")
// discard
err := s.Backend.DiscardTransaction(txID)
@ -570,18 +573,18 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
s.EqualError(err, "transaction hash not found", "expects tx not found, but call to CompleteTransaction succeeded")
time.Sleep(1 * time.Second) // make sure that tx complete signal propagates
s.False(s.Backend.TxQueueManager().TransactionQueue().Has(txID),
s.False(s.Backend.PendingSignRequests().Has(txID),
fmt.Sprintf("txqueue should not have test tx at this point (it should be discarded): %s", txID))
close(completeQueuedTransaction)
}
if envelope.Type == transactions.EventTransactionFailed {
if envelope.Type == sign.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
expectedErrMessage := sign.ErrSignReqDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)
receivedErrCode := event["error_code"].(string)
@ -597,16 +600,16 @@ func (s *TransactionsTestSuite) TestDiscardQueuedTransaction() {
To: account.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
s.EqualError(err, transactions.ErrQueuedTxDiscarded.Error(), "transaction is expected to be discarded")
s.EqualError(err, sign.ErrSignReqDiscarded.Error(), "transaction is expected to be discarded")
select {
case <-completeQueuedTransaction:
case <-time.After(time.Minute):
case <-time.After(10 * time.Second):
s.FailNow("test timed out")
}
s.True(reflect.DeepEqual(txHashCheck, gethcommon.Hash{}), "transaction returned hash, while it shouldn't")
s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point")
s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point")
s.True(txFailedEventCalled, "expected tx failure signal is not received")
}
@ -614,8 +617,6 @@ func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactions() {
s.setupLocalNode()
defer s.StopTestBackend()
s.TxQueueManager().TransactionQueue().Reset()
// log into account from which transactions will be sent
err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
s.NoError(err)
@ -629,9 +630,6 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
EnsureNodeSync(s.Backend.StatusNode().EnsureSync)
// reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset()
// log into account from which transactions will be sent
s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
@ -645,22 +643,22 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
var envelope signal.Envelope
err := json.Unmarshal([]byte(jsonEvent), &envelope)
s.NoError(err)
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := string(event["id"].(string))
log.Info("transaction queued (will be discarded soon)", "id", txID)
s.True(s.Backend.TxQueueManager().TransactionQueue().Has(txID),
s.True(s.Backend.PendingSignRequests().Has(txID),
"txqueue should still have test tx")
txIDs <- txID
}
if envelope.Type == transactions.EventTransactionFailed {
if envelope.Type == sign.EventTransactionFailed {
event := envelope.Event.(map[string]interface{})
log.Info("transaction return event received", "id", event["id"].(string))
receivedErrMessage := event["error_message"].(string)
expectedErrMessage := transactions.ErrQueuedTxDiscarded.Error()
expectedErrMessage := sign.ErrSignReqDiscarded.Error()
s.Equal(receivedErrMessage, expectedErrMessage)
receivedErrCode := event["error_code"].(string)
@ -682,11 +680,11 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
To: account.ToAddress(TestConfig.Account2.Address),
Value: (*hexutil.Big)(big.NewInt(1000000000000)),
})
require.EqualError(err, transactions.ErrQueuedTxDiscarded.Error())
require.EqualError(err, sign.ErrSignReqDiscarded.Error())
require.Equal(gethcommon.Hash{}, txHashCheck, "transaction returned hash, while it shouldn't")
}
txQueueManager := s.Backend.TxQueueManager()
signRequests := s.Backend.PendingSignRequests()
// wait for transactions, and discard immediately
discardTxs := func(txIDs []string) {
@ -710,7 +708,7 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
for _, txID := range txIDs {
require.False(
txQueueManager.TransactionQueue().Has(txID),
signRequests.Has(txID),
"txqueue should not have test tx at this point (it should be discarded): %s",
txID,
)
@ -735,8 +733,9 @@ func (s *TransactionsTestSuite) TestDiscardMultipleQueuedTransactions() {
case <-time.After(1 * time.Minute):
s.FailNow("test timed out")
}
time.Sleep(5 * time.Second)
s.Zero(s.Backend.TxQueueManager().TransactionQueue().Count(), "tx queue must be empty at this point")
s.Zero(s.Backend.PendingSignRequests().Count(), "tx queue must be empty at this point")
}
func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() {
@ -752,68 +751,13 @@ func (s *TransactionsTestSuite) TestNonExistentQueuedTransactions() {
// try completing non-existing transaction
_, err := s.Backend.CompleteTransaction("some-bad-transaction-id", TestConfig.Account1.Password)
s.Error(err, "error expected and not received")
s.EqualError(err, transactions.ErrQueuedTxIDNotFound.Error())
}
func (s *TransactionsTestSuite) TestEvictionOfQueuedTransactions() {
s.StartTestBackend()
defer s.StopTestBackend()
var m sync.Mutex
txCount := 0
txIDs := [transactions.DefaultTxQueueCap + 5 + 10]string{}
signal.SetDefaultNodeNotificationHandler(func(rawSignal string) {
var sg signal.Envelope
err := json.Unmarshal([]byte(rawSignal), &sg)
s.NoError(err)
if sg.Type == transactions.EventTransactionQueued {
event := sg.Event.(map[string]interface{})
txID := event["id"].(string)
m.Lock()
txIDs[txCount] = string(txID)
txCount++
m.Unlock()
}
})
// reset queue
s.Backend.TxQueueManager().TransactionQueue().Reset()
// log into account from which transactions will be sent
s.NoError(s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password))
txQueue := s.Backend.TxQueueManager().TransactionQueue()
s.Zero(txQueue.Count(), "transaction count should be zero")
for j := 0; j < 10; j++ {
go s.Backend.SendTransaction(context.TODO(), transactions.SendTxArgs{}) // nolint: errcheck
}
time.Sleep(2 * time.Second)
s.Equal(10, txQueue.Count(), "transaction count should be 10")
for i := 0; i < transactions.DefaultTxQueueCap+5; i++ { // stress test by hitting with lots of goroutines
go s.Backend.SendTransaction(context.TODO(), transactions.SendTxArgs{}) // nolint: errcheck
}
time.Sleep(5 * time.Second)
s.True(txQueue.Count() <= transactions.DefaultTxQueueCap, "transaction count should be %d (or %d): got %d", transactions.DefaultTxQueueCap, transactions.DefaultTxQueueCap-1, txQueue.Count())
m.Lock()
for _, txID := range txIDs {
txQueue.Remove(txID)
}
m.Unlock()
s.Zero(txQueue.Count(), "transaction count should be zero: %d", txQueue.Count())
s.EqualError(err, sign.ErrSignReqNotFound.Error())
}
func (s *TransactionsTestSuite) TestCompleteMultipleQueuedTransactionsUpstream() {
s.setupUpstreamNode()
defer s.StopTestBackend()
s.TxQueueManager().TransactionQueue().Reset()
// log into account from which transactions will be sent
err := s.Backend.SelectAccount(TestConfig.Account1.Address, TestConfig.Account1.Password)
s.NoError(err)
@ -849,7 +793,7 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) {
err := json.Unmarshal([]byte(jsonEvent), &envelope)
require.NoError(err, fmt.Sprintf("cannot unmarshal JSON: %s", jsonEvent))
if envelope.Type == transactions.EventTransactionQueued {
if envelope.Type == sign.EventTransactionQueued {
event := envelope.Event.(map[string]interface{})
txID := string(event["id"].(string))
log.Info("transaction queued (will be completed in a single call, once aggregated)", "id", txID)
@ -892,7 +836,7 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) {
for _, txID := range txIDs {
s.False(
s.Backend.TxQueueManager().TransactionQueue().Has(txID),
s.Backend.PendingSignRequests().Has(txID),
"txqueue should not have test tx at this point (it should be completed)",
)
}
@ -918,5 +862,5 @@ func (s *TransactionsTestSuite) sendConcurrentTransactions(testTxCount int) {
s.FailNow("test timed out")
}
s.Zero(s.TxQueueManager().TransactionQueue().Count(), "queue should be empty")
s.Zero(s.PendingSignRequests().Count(), "queue should be empty")
}