package transactions import ( "context" "math/big" "sync" "time" ethereum "github.com/ethereum/go-ethereum" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/core/types" "github.com/status-im/status-go/geth/account" "github.com/status-im/status-go/geth/rpc" ) const ( // SendTxDefaultErrorCode is sent by default, when error is not nil, but type is unknown/unexpected. SendTxDefaultErrorCode = SendTransactionDefaultErrorCode // DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction(). DefaultTxSendCompletionTimeout = 300 * time.Second defaultGas = 90000 defaultTimeout = time.Minute ) // RPCClientProvider is an interface that provides a way // to obtain an rpc.Client. type RPCClientProvider interface { RPCClient() *rpc.Client } // Manager provides means to manage internal Status Backend (injected into LES) type Manager struct { rpcClientProvider RPCClientProvider txQueue *TxQueue ethTxClient EthTransactor notify bool completionTimeout time.Duration rpcCallTimeout time.Duration networkID uint64 addrLock *AddrLocker localNonce sync.Map log log.Logger } // NewManager returns a new Manager. func NewManager(rpcClientProvider RPCClientProvider) *Manager { return &Manager{ rpcClientProvider: rpcClientProvider, txQueue: newQueue(), addrLock: &AddrLocker{}, notify: true, completionTimeout: DefaultTxSendCompletionTimeout, rpcCallTimeout: defaultTimeout, localNonce: sync.Map{}, log: log.New("package", "status-go/geth/transactions.Manager"), } } // DisableNotificactions turns off notifications on enqueue and return of tx. // It is not thread safe and must be called only before manager is started. func (m *Manager) DisableNotificactions() { m.notify = false } // Start starts accepting new transactions into the queue. func (m *Manager) Start(networkID uint64) { m.log.Info("start Manager") m.networkID = networkID m.ethTxClient = NewEthTxClient(m.rpcClientProvider.RPCClient()) m.txQueue.Start() } // Stop stops accepting new transactions into the queue. func (m *Manager) Stop() { m.log.Info("stop Manager") m.txQueue.Stop() } // TransactionQueue returns a reference to the queue. func (m *Manager) TransactionQueue() *TxQueue { return m.txQueue } // QueueTransaction puts a transaction into the queue. func (m *Manager) QueueTransaction(tx *QueuedTx) error { if !tx.Args.Valid() { return ErrInvalidSendTxArgs } to := "" if tx.Args.To != nil { to = tx.Args.To.Hex() } m.log.Info("queue a new transaction", "id", tx.ID, "from", tx.Args.From.Hex(), "to", to) if err := m.txQueue.Enqueue(tx); err != nil { return err } if m.notify { NotifyOnEnqueue(tx) } return nil } func (m *Manager) txDone(tx *QueuedTx, hash gethcommon.Hash, err error) { if err := m.txQueue.Done(tx.ID, hash, err); err == ErrQueuedTxIDNotFound { m.log.Warn("transaction is already removed from a queue", "ID", tx.ID) return } if m.notify { NotifyOnReturn(tx, err) } } // WaitForTransaction adds a transaction to the queue and blocks // until it's completed, discarded or times out. func (m *Manager) WaitForTransaction(tx *QueuedTx) Result { m.log.Info("wait for transaction", "id", tx.ID) // now wait up until transaction is: // - completed (via CompleteQueuedTransaction), // - discarded (via DiscardQueuedTransaction) // - or times out for { select { case rst := <-tx.Result: return rst case <-time.After(m.completionTimeout): m.txDone(tx, gethcommon.Hash{}, ErrQueuedTxTimedOut) } } } // NotifyErrored sends a notification for the given transaction func (m *Manager) NotifyErrored(id string, inputError error) error { tx, err := m.txQueue.Get(id) if err != nil { m.log.Warn("error getting a queued transaction", "err", err) return err } if m.notify { NotifyOnReturn(tx, inputError) } return nil } // CompleteTransaction instructs backend to complete sending of a given transaction. func (m *Manager) CompleteTransaction(id string, account *account.SelectedExtKey) (hash gethcommon.Hash, err error) { m.log.Info("complete transaction", "id", id) tx, err := m.txQueue.Get(id) if err != nil { m.log.Warn("error getting a queued transaction", "err", err) return hash, err } if err := m.txQueue.LockInprogress(id); err != nil { m.log.Warn("can't process transaction", "err", err) return hash, err } if err := m.validateAccount(tx, account); err != nil { m.txDone(tx, hash, err) return hash, err } hash, err = m.completeTransaction(account, tx) m.log.Info("finally completed transaction", "id", tx.ID, "hash", hash, "err", err) m.txDone(tx, hash, err) return hash, err } // make sure that only account which created the tx can complete it func (m *Manager) validateAccount(tx *QueuedTx, selectedAccount *account.SelectedExtKey) error { if selectedAccount == nil { return account.ErrNoAccountSelected } // make sure that only account which created the tx can complete it if tx.Args.From.Hex() != selectedAccount.Address.Hex() { m.log.Warn("queued transaction does not belong to the selected account", "err", ErrInvalidCompleteTxSender) return ErrInvalidCompleteTxSender } return nil } func (m *Manager) completeTransaction(selectedAccount *account.SelectedExtKey, queuedTx *QueuedTx) (hash gethcommon.Hash, err error) { m.log.Info("complete transaction", "id", queuedTx.ID) m.addrLock.LockAddr(queuedTx.Args.From) var localNonce uint64 if val, ok := m.localNonce.Load(queuedTx.Args.From); ok { localNonce = val.(uint64) } var nonce uint64 defer func() { // nonce should be incremented only if tx completed without error // if upstream node returned nonce higher than ours we will stick to it if err == nil { m.localNonce.Store(queuedTx.Args.From, nonce+1) } m.addrLock.UnlockAddr(queuedTx.Args.From) }() ctx, cancel := context.WithTimeout(context.Background(), m.rpcCallTimeout) defer cancel() nonce, err = m.ethTxClient.PendingNonceAt(ctx, queuedTx.Args.From) if err != nil { return hash, err } // if upstream node returned nonce higher than ours we will use it, as it probably means // that another client was used for sending transactions if localNonce > nonce { nonce = localNonce } args := queuedTx.Args if !args.Valid() { return hash, ErrInvalidSendTxArgs } gasPrice := (*big.Int)(args.GasPrice) if args.GasPrice == nil { ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) defer cancel() gasPrice, err = m.ethTxClient.SuggestGasPrice(ctx) if err != nil { return hash, err } } chainID := big.NewInt(int64(m.networkID)) value := (*big.Int)(args.Value) toAddr := gethcommon.Address{} if args.To != nil { toAddr = *args.To } var gas uint64 if args.Gas == nil { ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) defer cancel() gas, err = m.ethTxClient.EstimateGas(ctx, ethereum.CallMsg{ From: args.From, To: args.To, GasPrice: gasPrice, Value: value, Data: args.GetInput(), }) if err != nil { return hash, err } if gas < defaultGas { m.log.Info("default gas will be used. estimated gas", gas, "is lower than", defaultGas) gas = defaultGas } } else { gas = uint64(*args.Gas) } m.log.Info( "preparing raw transaction", "from", args.From.Hex(), "to", toAddr.Hex(), "gas", gas, "gasPrice", gasPrice, "value", value, ) tx := types.NewTransaction(nonce, toAddr, value, gas, gasPrice, args.GetInput()) signedTx, err := types.SignTx(tx, types.NewEIP155Signer(chainID), selectedAccount.AccountKey.PrivateKey) if err != nil { return hash, err } ctx, cancel = context.WithTimeout(context.Background(), m.rpcCallTimeout) defer cancel() if err := m.ethTxClient.SendTransaction(ctx, signedTx); err != nil { return hash, err } return signedTx.Hash(), nil } // DiscardTransaction discards a given transaction from transaction queue func (m *Manager) DiscardTransaction(id string) error { tx, err := m.txQueue.Get(id) if err != nil { return err } err = m.txQueue.Done(id, gethcommon.Hash{}, ErrQueuedTxDiscarded) if m.notify { NotifyOnReturn(tx, ErrQueuedTxDiscarded) } return err } // SendTransactionRPCHandler is a handler for eth_sendTransaction method. // It accepts one param which is a slice with a map of transaction params. func (m *Manager) SendTransactionRPCHandler(ctx context.Context, args ...interface{}) (interface{}, error) { m.log.Info("SendTransactionRPCHandler called") tx := Create(ctx, m.rpcCalltoSendTxArgs(args...)) if err := m.QueueTransaction(tx); err != nil { return nil, err } rst := m.WaitForTransaction(tx) if rst.Error != nil { return nil, rst.Error } return rst.Hash.Hex(), nil } func (m *Manager) rpcCalltoSendTxArgs(args ...interface{}) SendTxArgs { var err error var fromAddr, toAddr gethcommon.Address rpcCall := rpc.Call{Params: args} fromAddr, err = rpcCall.ParseFromAddress() if err != nil { fromAddr = gethcommon.HexToAddress("0x0") } toAddr, err = rpcCall.ParseToAddress() if err != nil { toAddr = gethcommon.HexToAddress("0x0") } input := rpcCall.ParseInput() data := rpcCall.ParseData() return SendTxArgs{ To: &toAddr, From: fromAddr, Value: rpcCall.ParseValue(), Input: input, Data: data, Gas: rpcCall.ParseGas(), GasPrice: rpcCall.ParseGasPrice(), } }