diff --git a/transaction_pool.go b/transaction_pool.go new file mode 100644 index 000000000..f645afd06 --- /dev/null +++ b/transaction_pool.go @@ -0,0 +1,161 @@ +package main + +import ( + "container/list" + "errors" + "github.com/ethereum/ethutil-go" + "log" + "math/big" + "sync" +) + +const ( + txPoolQueueSize = 50 +) + +func FindTx(pool *list.List, finder func(*ethutil.Transaction, *list.Element) bool) *ethutil.Transaction { + for e := pool.Front(); e != nil; e = e.Next() { + if tx, ok := e.Value.(*ethutil.Transaction); ok { + if finder(tx, e) { + return tx + } + } + } + + return nil +} + +// The tx pool a thread safe transaction pool handler. In order to +// guarantee a non blocking pool we use a queue channel which can be +// independently read without needing access to the actual pool. If the +// pool is being drained or synced for whatever reason the transactions +// will simple queue up and handled when the mutex is freed. +type TxPool struct { + server *Server + // The mutex for accessing the Tx pool. + mutex sync.Mutex + // Queueing channel for reading and writing incoming + // transactions to + queueChan chan *ethutil.Transaction + // Quiting channel + quit chan bool + + pool *list.List +} + +func NewTxPool(s *Server) *TxPool { + return &TxPool{ + server: s, + mutex: sync.Mutex{}, + pool: list.New(), + queueChan: make(chan *ethutil.Transaction, txPoolQueueSize), + quit: make(chan bool), + } +} + +// Blocking function. Don't use directly. Use QueueTransaction instead +func (pool *TxPool) addTransaction(tx *ethutil.Transaction) { + pool.mutex.Lock() + defer pool.mutex.Unlock() + + pool.pool.PushBack(tx) +} + +// Process transaction validates the Tx and processes funds from the +// sender to the recipient. +func (pool *TxPool) processTransaction(tx *ethutil.Transaction) error { + // Get the last block so we can retrieve the sender and receiver from + // the merkle trie + block := pool.server.blockManager.bc.LastBlock + // Something has gone horribly wrong if this happens + if block == nil { + return errors.New("No last block on the block chain") + } + + var sender, receiver *ethutil.Ether + + // Get the sender + data := block.State().Get(string(tx.Sender())) + // If it doesn't exist create a new account. Of course trying to send funds + // from this account will fail since it will hold 0 Wei + if data == "" { + sender = ethutil.NewEther(big.NewInt(0)) + } else { + sender = ethutil.NewEtherFromData([]byte(data)) + } + // Defer the update. Whatever happens it should be persisted + defer block.State().Update(string(tx.Sender()), string(sender.RlpEncode())) + + // Make sure there's enough in the sender's account. Having insufficient + // funds won't invalidate this transaction but simple ignores it. + if sender.Amount.Cmp(tx.Value) < 0 { + return errors.New("Insufficient amount in sender's account") + } + + // Subtract the amount from the senders account + sender.Amount.Sub(sender.Amount, tx.Value) + // Increment the nonce making each tx valid only once to prevent replay + // attacks + sender.Nonce += 1 + + // Get the receiver + data = block.State().Get(tx.Recipient) + // If the receiver doesn't exist yet, create a new account to which the + // funds will be send. + if data == "" { + receiver = ethutil.NewEther(big.NewInt(0)) + } else { + receiver = ethutil.NewEtherFromData([]byte(data)) + } + // Defer the update + defer block.State().Update(tx.Recipient, string(receiver.RlpEncode())) + + // Add the amount to receivers account which should conclude this transaction + receiver.Amount.Add(receiver.Amount, tx.Value) + + return nil +} + +func (pool *TxPool) queueHandler() { +out: + for { + select { + case tx := <-pool.queueChan: + // Process the transaction + err := pool.processTransaction(tx) + if err != nil { + log.Println("Error processing Tx", err) + } else { + // Call blocking version. At this point it + // doesn't matter since this is a goroutine + pool.addTransaction(tx) + } + case <-pool.quit: + break out + } + } +} + +func (pool *TxPool) QueueTransaction(tx *ethutil.Transaction) { + pool.queueChan <- tx +} + +func (pool *TxPool) Flush() { + pool.mutex.Lock() + + defer pool.mutex.Unlock() + + pool.mutex.Unlock() +} + +func (pool *TxPool) Start() { + go pool.queueHandler() +} + +func (pool *TxPool) Stop() { + log.Println("[TXP] Stopping...") + + close(pool.quit) + + pool.Flush() +}