status-go/geth/txqueue/txqueue.go

294 lines
8.1 KiB
Go

package txqueue
import (
"errors"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/accounts/keystore"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/geth/common"
"github.com/status-im/status-go/geth/log"
)
const (
// DefaultTxQueueCap defines how many items can be queued.
DefaultTxQueueCap = int(35)
// DefaultTxSendQueueCap defines how many items can be passed to sendTransaction() w/o blocking.
DefaultTxSendQueueCap = int(70)
// DefaultTxSendCompletionTimeout defines how many seconds to wait before returning result in sentTransaction().
DefaultTxSendCompletionTimeout = 300
)
var (
//ErrQueuedTxIDNotFound - error transaction hash not found
ErrQueuedTxIDNotFound = errors.New("transaction hash not found")
//ErrQueuedTxTimedOut - error transaction sending timed out
ErrQueuedTxTimedOut = errors.New("transaction sending timed out")
//ErrQueuedTxDiscarded - error transaction discarded
ErrQueuedTxDiscarded = errors.New("transaction has been discarded")
//ErrQueuedTxInProgress - error transaction in progress
ErrQueuedTxInProgress = errors.New("transaction is in progress")
//ErrQueuedTxAlreadyProcessed - error transaction has already processed
ErrQueuedTxAlreadyProcessed = errors.New("transaction has been already processed")
//ErrInvalidCompleteTxSender - error transaction with invalid sender
ErrInvalidCompleteTxSender = errors.New("transaction can only be completed by the same account which created it")
)
// TxQueue is capped container that holds pending transactions
type TxQueue struct {
transactions map[common.QueuedTxID]*common.QueuedTx
mu sync.RWMutex // to guard transactions map
evictableIDs chan common.QueuedTxID
enqueueTicker chan struct{}
incomingPool chan *common.QueuedTx
// when this channel is closed, all queue channels processing must cease (incoming queue, processing queued items etc)
stopped chan struct{}
stoppedGroup sync.WaitGroup // to make sure that all routines are stopped
// when items are enqueued notify subscriber
txEnqueueHandler common.EnqueuedTxHandler
// when tx is returned (either successfully or with error) notify subscriber
txReturnHandler common.EnqueuedTxReturnHandler
}
// NewTransactionQueue make new transaction queue
func NewTransactionQueue() *TxQueue {
log.Info("initializing transaction queue")
return &TxQueue{
transactions: make(map[common.QueuedTxID]*common.QueuedTx),
evictableIDs: make(chan common.QueuedTxID, DefaultTxQueueCap), // will be used to evict in FIFO
enqueueTicker: make(chan struct{}),
incomingPool: make(chan *common.QueuedTx, DefaultTxSendQueueCap),
}
}
// Start starts enqueue and eviction loops
func (q *TxQueue) Start() {
log.Info("starting transaction queue")
if q.stopped != nil {
return
}
q.stopped = make(chan struct{})
q.stoppedGroup.Add(2)
go q.evictionLoop()
go q.enqueueLoop()
}
// Stop stops transaction enqueue and eviction loops
func (q *TxQueue) Stop() {
log.Info("stopping transaction queue")
if q.stopped == nil {
return
}
close(q.stopped) // stops all processing loops (enqueue, eviction etc)
q.stoppedGroup.Wait()
q.stopped = nil
log.Info("finally stopped transaction queue")
}
// evictionLoop frees up queue to accommodate another transaction item
func (q *TxQueue) evictionLoop() {
defer HaltOnPanic()
evict := func() {
if len(q.transactions) >= DefaultTxQueueCap { // eviction is required to accommodate another/last item
q.Remove(<-q.evictableIDs)
}
}
for {
select {
case <-time.After(250 * time.Millisecond): // do not wait for manual ticks, check queue regularly
evict()
case <-q.enqueueTicker: // when manually requested
evict()
case <-q.stopped:
log.Info("transaction queue's eviction loop stopped")
q.stoppedGroup.Done()
return
}
}
}
// enqueueLoop process incoming enqueue requests
func (q *TxQueue) enqueueLoop() {
defer HaltOnPanic()
// enqueue incoming transactions
for {
select {
case queuedTx := <-q.incomingPool:
log.Info("transaction enqueued requested", "tx", queuedTx.ID)
err := q.Enqueue(queuedTx)
log.Warn("transaction enqueued error", "tx", err)
log.Info("transaction enqueued", "tx", queuedTx.ID)
case <-q.stopped:
log.Info("transaction queue's enqueue loop stopped")
q.stoppedGroup.Done()
return
}
}
}
// Reset is to be used in tests only, as it simply creates new transaction map, w/o any cleanup of the previous one
func (q *TxQueue) Reset() {
q.mu.Lock()
defer q.mu.Unlock()
q.transactions = make(map[common.QueuedTxID]*common.QueuedTx)
q.evictableIDs = make(chan common.QueuedTxID, DefaultTxQueueCap)
}
// EnqueueAsync enqueues incoming transaction in async manner, returns as soon as possible
func (q *TxQueue) EnqueueAsync(tx *common.QueuedTx) error {
q.incomingPool <- tx
return nil
}
// Enqueue enqueues incoming transaction
func (q *TxQueue) Enqueue(tx *common.QueuedTx) error {
log.Info(fmt.Sprintf("enqueue transaction: %s", tx.ID))
if q.txEnqueueHandler == nil { //discard, until handler is provided
log.Info("there is no txEnqueueHandler")
return nil
}
log.Info("before enqueueTicker")
q.enqueueTicker <- struct{}{} // notify eviction loop that we are trying to insert new item
log.Info("before evictableIDs")
q.evictableIDs <- tx.ID // this will block when we hit DefaultTxQueueCap
log.Info("after evictableIDs")
q.mu.Lock()
q.transactions[tx.ID] = tx
q.mu.Unlock()
// notify handler
log.Info("calling txEnqueueHandler")
q.txEnqueueHandler(tx)
return nil
}
// Get returns transaction by transaction identifier
func (q *TxQueue) Get(id common.QueuedTxID) (*common.QueuedTx, error) {
q.mu.RLock()
defer q.mu.RUnlock()
if tx, ok := q.transactions[id]; ok {
return tx, nil
}
return nil, ErrQueuedTxIDNotFound
}
// Remove removes transaction by transaction identifier
func (q *TxQueue) Remove(id common.QueuedTxID) {
q.mu.Lock()
defer q.mu.Unlock()
delete(q.transactions, id)
}
// StartProcessing marks a transaction as in progress. It's thread-safe and
// prevents from processing the same transaction multiple times.
func (q *TxQueue) StartProcessing(tx *common.QueuedTx) error {
q.mu.Lock()
defer q.mu.Unlock()
if tx.Hash != (gethcommon.Hash{}) || tx.Err != nil {
return ErrQueuedTxAlreadyProcessed
}
if tx.InProgress {
return ErrQueuedTxInProgress
}
tx.InProgress = true
return nil
}
// StopProcessing removes the "InProgress" flag from the transaction.
func (q *TxQueue) StopProcessing(tx *common.QueuedTx) {
q.mu.Lock()
defer q.mu.Unlock()
tx.InProgress = false
}
// Count returns number of currently queued transactions
func (q *TxQueue) Count() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.transactions)
}
// Has checks whether transaction with a given identifier exists in queue
func (q *TxQueue) Has(id common.QueuedTxID) bool {
q.mu.RLock()
defer q.mu.RUnlock()
_, ok := q.transactions[id]
return ok
}
// SetEnqueueHandler sets callback handler, that is triggered on enqueue operation
func (q *TxQueue) SetEnqueueHandler(fn common.EnqueuedTxHandler) {
q.txEnqueueHandler = fn
}
// SetTxReturnHandler sets callback handler, that is triggered when transaction is finished executing
func (q *TxQueue) SetTxReturnHandler(fn common.EnqueuedTxReturnHandler) {
q.txReturnHandler = fn
}
// NotifyOnQueuedTxReturn is invoked when transaction is ready to return
// Transaction can be in error state, or executed successfully at this point.
func (q *TxQueue) NotifyOnQueuedTxReturn(queuedTx *common.QueuedTx, err error) {
if q == nil {
return
}
// discard, if transaction is not found
if queuedTx == nil {
return
}
// on success, remove item from the queue and stop propagating
if err == nil {
q.Remove(queuedTx.ID)
return
}
// error occurred, send upward notification
if q.txReturnHandler == nil { // discard, until handler is provided
return
}
// remove from queue on any error (except for transient ones) and propagate
transientErrs := map[error]bool{
keystore.ErrDecrypt: true, // wrong password
ErrInvalidCompleteTxSender: true, // completing tx create from another account
}
if !transientErrs[err] { // remove only on unrecoverable errors
q.Remove(queuedTx.ID)
}
// notify handler
q.txReturnHandler(queuedTx, err)
}