Dmitry Shulyak 653da5bcd0 Result of tx processing returned as QueuedTxResult
Currently it is quite easy to introduce concurrency issues while working
with transaction object. For example, race issue will exist every time
while transaction is processed in a separate goroutine and caller will
try to check for an error before event to Done channel is sent.

This change removes all the data that is updated on transaction and leaves
it with ID, Args and Context (which is not used at the moment).

Signed-off-by: Dmitry Shulyak <yashulyak@gmail.com>
2018-02-02 09:47:56 +02:00

232 lines
6.5 KiB
Go

package queue
import (
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/geth/account"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log"
)
const (
// DefaultTxQueueCap defines how many items can be queued.
DefaultTxQueueCap = int(35)
)
var (
// ErrQueuedTxExist - transaction was already enqueued
ErrQueuedTxExist = errors.New("transaction already exist in queue")
//ErrQueuedTxIDNotFound - error transaction hash not found
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
//ErrQueuedTxInProgress - error transaction is in progress
ErrQueuedTxInProgress = errors.New("transaction is in progress")
//ErrInvalidCompleteTxSender - error transaction with invalid sender
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
)
// remove from queue on any error (except for transient ones) and propagate
// defined as map[string]bool because errors from ethclient returned wrapped as jsonError
var transientErrs = map[string]bool{
keystore.ErrDecrypt.Error(): true, // wrong password
ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account
account.ErrNoAccountSelected.Error(): true, // account not selected
}
type empty struct{}
// TxQueue is capped container that holds pending transactions
type TxQueue struct {
mu sync.RWMutex // to guard transactions map
transactions map[common.QueuedTxID]*common.QueuedTx
inprogress map[common.QueuedTxID]empty
// TODO(dshulyak) research why eviction is done in separate goroutine
evictableIDs chan common.QueuedTxID
enqueueTicker chan struct{}
// when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc)
stopped chan struct{}
stoppedGroup sync.WaitGroup // to make sure that all routines are stopped
}
// New creates a transaction queue.
func New() *TxQueue {
log.Info("initializing transaction queue")
return &TxQueue{
transactions: make(map[common.QueuedTxID]*common.QueuedTx),
inprogress: make(map[common.QueuedTxID]empty),
evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO
enqueueTicker: make(chan struct{}),
}
}
// Start starts enqueue and eviction loops
func (q *TxQueue) Start() {
log.Info("starting transaction queue")
if q.stopped != nil {
return
}
q.stopped = make(chan struct{})
q.stoppedGroup.Add(1)
go q.evictionLoop()
}
// Stop stops transaction enqueue and eviction loops
func (q *TxQueue) Stop() {
log.Info("stopping transaction queue")
if q.stopped == nil {
return
}
close(q.stopped) // stops all processing loops (enqueue, eviction etc)
q.stoppedGroup.Wait()
q.stopped = nil
log.Info("finally stopped transaction queue")
}
// evictionLoop frees up queue to accommodate another transaction item
func (q *TxQueue) evictionLoop() {
defer HaltOnPanic()
evict := func() {
if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item
q.Remove(<-q.evictableIDs)
}
}
for {
select {
case <-time.After(250 * time.Millisecond): // do not wait for manual ticks, check queue regularly
evict()
case <-q.enqueueTicker: // when manually requested
evict()
case <-q.stopped:
log.Info("transaction queue's eviction loop stopped")
q.stoppedGroup.Done()
return
}
}
}
// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one
func (q *TxQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.transactions = make(map[common.QueuedTxID]*common.QueuedTx)
q.evictableIDs = make(chan common.QueuedTxID, DefaultTxQueueCap)
q.inprogress = make(map[common.QueuedTxID]empty)
}
// Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
q.mu.RLock()
if _, ok := q.transactions[tx.ID]; ok {
q.mu.RUnlock()
return ErrQueuedTxExist
}
q.mu.RUnlock()
// we can't hold a lock in this part
log.Debug("notifying eviction loop")
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
log.Debug("notified eviction loop")
q.mu.Lock()
q.transactions[tx.ID] = tx
q.mu.Unlock()
// notify handler
log.Info("calling txEnqueueHandler")
return nil
}
// Get returns transaction by transaction identifier
func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) {
q.mu.RLock()
defer q.mu.RUnlock()
if tx, ok := q.transactions[id]; ok {
return tx, nil
}
return nil, ErrQueuedTxIDNotFound
}
// LockInprogress returns error if transaction is already inprogress.
func (q *TxQueue) LockInprogress(id common.QueuedTxID) error {
q.mu.Lock()
defer q.mu.Unlock()
if _, ok := q.transactions[id]; ok {
if _, inprogress := q.inprogress[id]; inprogress {
return ErrQueuedTxInProgress
}
q.inprogress[id] = empty{}
return nil
}
return ErrQueuedTxIDNotFound
}
// Remove removes transaction by transaction identifier
func (q *TxQueue) Remove(id common.QueuedTxID) {
q.mu.Lock()
defer q.mu.Unlock()
q.remove(id)
}
func (q *TxQueue) remove(id common.QueuedTxID) {
delete(q.transactions, id)
delete(q.inprogress, id)
}
// Done removes transaction from queue if no error or error is not transient
// and notify subscribers
func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) error {
q.mu.Lock()
defer q.mu.Unlock()
tx, ok := q.transactions[id]
if !ok {
return ErrQueuedTxIDNotFound
}
q.done(tx, hash, err)
return nil
}
func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
delete(q.inprogress, tx.ID)
// hash is updated only if err is nil, but transaction is not removed from a queue
if err == nil {
q.transactions[tx.ID].Result <- common.TransactionResult{Hash: hash, Error: err}
q.remove(tx.ID)
return
}
if _, transient := transientErrs[err.Error()]; !transient {
q.transactions[tx.ID].Result <- common.TransactionResult{Error: err}
q.remove(tx.ID)
}
}
// Count returns number of currently queued transactions
func (q *TxQueue) Count() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.transactions)
}
// Has checks whether transaction with a given identifier exists in queue
func (q *TxQueue) Has(id common.QueuedTxID) bool {
q.mu.RLock()
defer q.mu.RUnlock()
_, ok := q.transactions[id]
return ok
}