mirror of
https://github.com/status-im/status-go.git
synced 2025-01-23 21:19:48 +00:00
653da5bcd0
Currently it is quite easy to introduce concurrency issues when working with a transaction object. For example, a race exists whenever a transaction is processed in a separate goroutine and the caller checks for an error before the event is sent to the Done channel. This change removes all the data that is updated on a transaction and leaves it with only ID, Args and Context (which is not used at the moment). Signed-off-by: Dmitry Shulyak <yashulyak@gmail.com>
232 lines
6.5 KiB
Go
232 lines
6.5 KiB
Go
package queue
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/ethereum/go-ethereum/accounts/keystore"
|
|
gethcommon "github.com/ethereum/go-ethereum/common"
|
|
"github.com/status-im/status-go/geth/account"
|
|
"github.com/status-im/status-go/geth/common"
|
|
"github.com/status-im/status-go/geth/log"
|
|
)
|
|
|
|
// DefaultTxQueueCap is the number of transactions the queue holds before the
// eviction loop starts dropping the oldest entries; it is also the buffer
// size of the eviction channel.
const DefaultTxQueueCap int = 35
|
|
|
|
// ErrQueuedTxExist is returned by Enqueue when a transaction with the same ID
// is already present in the queue.
var ErrQueuedTxExist = errors.New("transaction already exist in queue")

// ErrQueuedTxIDNotFound is returned when no transaction is queued under the
// requested ID.
var ErrQueuedTxIDNotFound = errors.New("transaction hash not found")

// ErrQueuedTxInProgress is returned by LockInprogress when the transaction is
// already marked as being processed.
var ErrQueuedTxInProgress = errors.New("transaction is in progress")

// ErrInvalidCompleteTxSender reports an attempt to complete a transaction from
// an account other than the one that created it.
var ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
|
|
|
|
// remove from queue on any error (except for transient ones) and propagate
|
|
// defined as map[string]bool because errors from ethclient returned wrapped as jsonError
|
|
var transientErrs = map[string]bool{
|
|
keystore.ErrDecrypt.Error(): true, // wrong password
|
|
ErrInvalidCompleteTxSender.Error(): true, // completing tx create from another account
|
|
account.ErrNoAccountSelected.Error(): true, // account not selected
|
|
}
|
|
|
|
// empty is a zero-size placeholder used as the value type of set-like maps
// (see TxQueue.inprogress).
type empty struct{}
|
|
|
|
// TxQueue is capped container that holds pending transactions
|
|
type TxQueue struct {
|
|
mu sync.RWMutex // to guard transactions map
|
|
transactions map[common.QueuedTxID]*common.QueuedTx
|
|
inprogress map[common.QueuedTxID]empty
|
|
|
|
// TODO(dshulyak) research why eviction is done in separate goroutine
|
|
evictableIDs chan common.QueuedTxID
|
|
enqueueTicker chan struct{}
|
|
|
|
// when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc)
|
|
stopped chan struct{}
|
|
stoppedGroup sync.WaitGroup // to make sure that all routines are stopped
|
|
}
|
|
|
|
// New creates a transaction queue.
|
|
func New() *TxQueue {
|
|
log.Info("initializing transaction queue")
|
|
return &TxQueue{
|
|
transactions: make(map[common.QueuedTxID]*common.QueuedTx),
|
|
inprogress: make(map[common.QueuedTxID]empty),
|
|
evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO
|
|
enqueueTicker: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start starts enqueue and eviction loops
|
|
func (q *TxQueue) Start() {
|
|
log.Info("starting transaction queue")
|
|
|
|
if q.stopped != nil {
|
|
return
|
|
}
|
|
|
|
q.stopped = make(chan struct{})
|
|
q.stoppedGroup.Add(1)
|
|
go q.evictionLoop()
|
|
}
|
|
|
|
// Stop stops transaction enqueue and eviction loops
|
|
func (q *TxQueue) Stop() {
|
|
log.Info("stopping transaction queue")
|
|
|
|
if q.stopped == nil {
|
|
return
|
|
}
|
|
|
|
close(q.stopped) // stops all processing loops (enqueue, eviction etc)
|
|
q.stoppedGroup.Wait()
|
|
q.stopped = nil
|
|
|
|
log.Info("finally stopped transaction queue")
|
|
}
|
|
|
|
// evictionLoop frees up queue to accommodate another transaction item
|
|
func (q *TxQueue) evictionLoop() {
|
|
defer HaltOnPanic()
|
|
evict := func() {
|
|
if q.Count() >= DefaultTxQueueCap { // eviction is required to accommodate another/last item
|
|
q.Remove(<-q.evictableIDs)
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-time.After(250 * time.Millisecond): // do not wait for manual ticks, check queue regularly
|
|
evict()
|
|
case <-q.enqueueTicker: // when manually requested
|
|
evict()
|
|
case <-q.stopped:
|
|
log.Info("transaction queue's eviction loop stopped")
|
|
q.stoppedGroup.Done()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one
|
|
func (q *TxQueue) Reset() {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
|
|
q.transactions = make(map[common.QueuedTxID]*common.QueuedTx)
|
|
q.evictableIDs = make(chan common.QueuedTxID, DefaultTxQueueCap)
|
|
q.inprogress = make(map[common.QueuedTxID]empty)
|
|
}
|
|
|
|
// Enqueue enqueues incoming transaction
|
|
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
|
|
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
|
|
q.mu.RLock()
|
|
if _, ok := q.transactions[tx.ID]; ok {
|
|
q.mu.RUnlock()
|
|
return ErrQueuedTxExist
|
|
}
|
|
q.mu.RUnlock()
|
|
|
|
// we can't hold a lock in this part
|
|
log.Debug("notifying eviction loop")
|
|
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
|
|
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
|
|
log.Debug("notified eviction loop")
|
|
|
|
q.mu.Lock()
|
|
q.transactions[tx.ID] = tx
|
|
q.mu.Unlock()
|
|
|
|
// notify handler
|
|
log.Info("calling txEnqueueHandler")
|
|
return nil
|
|
}
|
|
|
|
// Get returns transaction by transaction identifier
|
|
func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) {
|
|
q.mu.RLock()
|
|
defer q.mu.RUnlock()
|
|
|
|
if tx, ok := q.transactions[id]; ok {
|
|
return tx, nil
|
|
}
|
|
return nil, ErrQueuedTxIDNotFound
|
|
}
|
|
|
|
// LockInprogress returns error if transaction is already inprogress.
|
|
func (q *TxQueue) LockInprogress(id common.QueuedTxID) error {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
if _, ok := q.transactions[id]; ok {
|
|
if _, inprogress := q.inprogress[id]; inprogress {
|
|
return ErrQueuedTxInProgress
|
|
}
|
|
q.inprogress[id] = empty{}
|
|
return nil
|
|
}
|
|
return ErrQueuedTxIDNotFound
|
|
}
|
|
|
|
// Remove removes transaction by transaction identifier
|
|
func (q *TxQueue) Remove(id common.QueuedTxID) {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
q.remove(id)
|
|
}
|
|
|
|
func (q *TxQueue) remove(id common.QueuedTxID) {
|
|
delete(q.transactions, id)
|
|
delete(q.inprogress, id)
|
|
}
|
|
|
|
// Done removes transaction from queue if no error or error is not transient
|
|
// and notify subscribers
|
|
func (q *TxQueue) Done(id common.QueuedTxID, hash gethcommon.Hash, err error) error {
|
|
q.mu.Lock()
|
|
defer q.mu.Unlock()
|
|
tx, ok := q.transactions[id]
|
|
if !ok {
|
|
return ErrQueuedTxIDNotFound
|
|
}
|
|
q.done(tx, hash, err)
|
|
return nil
|
|
}
|
|
|
|
func (q *TxQueue) done(tx *common.QueuedTx, hash gethcommon.Hash, err error) {
|
|
delete(q.inprogress, tx.ID)
|
|
// hash is updated only if err is nil, but transaction is not removed from a queue
|
|
if err == nil {
|
|
q.transactions[tx.ID].Result <- common.TransactionResult{Hash: hash, Error: err}
|
|
q.remove(tx.ID)
|
|
return
|
|
}
|
|
if _, transient := transientErrs[err.Error()]; !transient {
|
|
q.transactions[tx.ID].Result <- common.TransactionResult{Error: err}
|
|
q.remove(tx.ID)
|
|
}
|
|
}
|
|
|
|
// Count returns number of currently queued transactions
|
|
func (q *TxQueue) Count() int {
|
|
q.mu.RLock()
|
|
defer q.mu.RUnlock()
|
|
return len(q.transactions)
|
|
}
|
|
|
|
// Has checks whether transaction with a given identifier exists in queue
|
|
func (q *TxQueue) Has(id common.QueuedTxID) bool {
|
|
q.mu.RLock()
|
|
defer q.mu.RUnlock()
|
|
_, ok := q.transactions[id]
|
|
return ok
|
|
}
|