2022-04-06 11:48:16 +02:00

178 lines
3.5 KiB
Go

package stm
import (
"math/rand"
"reflect"
"runtime/pprof"
"sync"
"time"
)
var (
txPool = sync.Pool{New: func() interface{} {
expvars.Add("new txs", 1)
tx := &Tx{
reads: make(map[*Var]VarValue),
writes: make(map[*Var]interface{}),
watching: make(map[*Var]struct{}),
}
tx.cond.L = &tx.mu
return tx
}}
failedCommitsProfile *pprof.Profile
)
const (
profileFailedCommits = false
sleepBetweenRetries = false
)
func init() {
if profileFailedCommits {
failedCommitsProfile = pprof.NewProfile("stmFailedCommits")
}
}
func newTx() *Tx {
tx := txPool.Get().(*Tx)
tx.tries = 0
tx.completed = false
return tx
}
func WouldBlock(fn Operation) (block bool) {
tx := newTx()
tx.reset()
_, block = catchRetry(fn, tx)
if len(tx.watching) != 0 {
panic("shouldn't have installed any watchers")
}
tx.recycle()
return
}
// Atomically executes the atomic function fn.
func Atomically(op Operation) interface{} {
expvars.Add("atomically", 1)
// run the transaction
tx := newTx()
retry:
tx.tries++
tx.reset()
if sleepBetweenRetries {
shift := int64(tx.tries - 1)
const maxShift = 30
if shift > maxShift {
shift = maxShift
}
ns := int64(1) << shift
d := time.Duration(rand.Int63n(ns))
if d > 100*time.Microsecond {
tx.updateWatchers()
time.Sleep(time.Duration(ns))
}
}
tx.mu.Lock()
ret, retry := catchRetry(op, tx)
tx.mu.Unlock()
if retry {
expvars.Add("retries", 1)
// wait for one of the variables we read to change before retrying
tx.wait()
goto retry
}
// verify the read log
tx.lockAllVars()
if tx.inputsChanged() {
tx.unlock()
expvars.Add("failed commits", 1)
if profileFailedCommits {
failedCommitsProfile.Add(new(int), 0)
}
goto retry
}
// commit the write log and broadcast that variables have changed
tx.commit()
tx.mu.Lock()
tx.completed = true
tx.cond.Broadcast()
tx.mu.Unlock()
tx.unlock()
expvars.Add("commits", 1)
tx.recycle()
return ret
}
// AtomicGet is a helper function that atomically reads a value.
func AtomicGet(v *Var) interface{} {
return v.value.Load().(VarValue).Get()
}
// AtomicSet is a helper function that atomically writes a value.
func AtomicSet(v *Var, val interface{}) {
v.mu.Lock()
v.changeValue(val)
v.mu.Unlock()
}
// Compose is a helper function that composes multiple transactions into a
// single transaction.
func Compose(fns ...Operation) Operation {
return func(tx *Tx) interface{} {
for _, f := range fns {
f(tx)
}
return nil
}
}
// Select runs the supplied functions in order. Execution stops when a
// function succeeds without calling Retry. If no functions succeed, the
// entire selection will be retried.
func Select(fns ...Operation) Operation {
return func(tx *Tx) interface{} {
switch len(fns) {
case 0:
// empty Select blocks forever
tx.Retry()
panic("unreachable")
case 1:
return fns[0](tx)
default:
oldWrites := tx.writes
tx.writes = make(map[*Var]interface{}, len(oldWrites))
for k, v := range oldWrites {
tx.writes[k] = v
}
ret, retry := catchRetry(fns[0], tx)
if retry {
tx.writes = oldWrites
return Select(fns[1:]...)(tx)
} else {
return ret
}
}
}
}
type Operation func(*Tx) interface{}
func VoidOperation(f func(*Tx)) Operation {
return func(tx *Tx) interface{} {
f(tx)
return nil
}
}
func AtomicModify(v *Var, f interface{}) {
r := reflect.ValueOf(f)
Atomically(VoidOperation(func(tx *Tx) {
cur := reflect.ValueOf(tx.Get(v))
out := r.Call([]reflect.Value{cur})
if lenOut := len(out); lenOut != 1 {
panic(lenOut)
}
tx.Set(v, out[0].Interface())
}))
}