mirror of
https://github.com/status-im/status-go.git
synced 2025-01-23 13:11:11 +00:00
ad326fa290
Allows runnning a store node depending on node config settings.
982 lines
22 KiB
Go
982 lines
22 KiB
Go
// Package migrate reads migrations from sources and runs them against databases.
|
|
// Sources are defined by the `source.Driver` and databases by the `database.Driver`
|
|
// interface. The driver interfaces are kept "dumb", all migration logic is kept
|
|
// in this package.
|
|
package migrate
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/hashicorp/go-multierror"
|
|
|
|
"github.com/golang-migrate/migrate/v4/database"
|
|
iurl "github.com/golang-migrate/migrate/v4/internal/url"
|
|
"github.com/golang-migrate/migrate/v4/source"
|
|
)
|
|
|
|
// DefaultPrefetchMigrations sets the number of migrations to pre-read
|
|
// from the source. This is helpful if the source is remote, but has little
|
|
// effect for a local source (i.e. file system).
|
|
// Please note that this setting has a major impact on the memory usage,
|
|
// since each pre-read migration is buffered in memory. See DefaultBufferSize.
|
|
var DefaultPrefetchMigrations = uint(10)
|
|
|
|
// DefaultLockTimeout sets the max time a database driver has to acquire a lock.
|
|
var DefaultLockTimeout = 15 * time.Second
|
|
|
|
var (
|
|
ErrNoChange = errors.New("no change")
|
|
ErrNilVersion = errors.New("no migration")
|
|
ErrInvalidVersion = errors.New("version must be >= -1")
|
|
ErrLocked = errors.New("database locked")
|
|
ErrLockTimeout = errors.New("timeout: can't acquire database lock")
|
|
)
|
|
|
|
// ErrShortLimit is an error returned when not enough migrations
|
|
// can be returned by a source for a given limit.
|
|
type ErrShortLimit struct {
|
|
Short uint
|
|
}
|
|
|
|
// Error implements the error interface.
|
|
func (e ErrShortLimit) Error() string {
|
|
return fmt.Sprintf("limit %v short", e.Short)
|
|
}
|
|
|
|
type ErrDirty struct {
|
|
Version int
|
|
}
|
|
|
|
func (e ErrDirty) Error() string {
|
|
return fmt.Sprintf("Dirty database version %v. Fix and force version.", e.Version)
|
|
}
|
|
|
|
type Migrate struct {
|
|
sourceName string
|
|
sourceDrv source.Driver
|
|
databaseName string
|
|
databaseDrv database.Driver
|
|
|
|
// Log accepts a Logger interface
|
|
Log Logger
|
|
|
|
// GracefulStop accepts `true` and will stop executing migrations
|
|
// as soon as possible at a safe break point, so that the database
|
|
// is not corrupted.
|
|
GracefulStop chan bool
|
|
isLockedMu *sync.Mutex
|
|
|
|
isGracefulStop bool
|
|
isLocked bool
|
|
|
|
// PrefetchMigrations defaults to DefaultPrefetchMigrations,
|
|
// but can be set per Migrate instance.
|
|
PrefetchMigrations uint
|
|
|
|
// LockTimeout defaults to DefaultLockTimeout,
|
|
// but can be set per Migrate instance.
|
|
LockTimeout time.Duration
|
|
}
|
|
|
|
// New returns a new Migrate instance from a source URL and a database URL.
|
|
// The URL scheme is defined by each driver.
|
|
func New(sourceURL, databaseURL string) (*Migrate, error) {
|
|
m := newCommon()
|
|
|
|
sourceName, err := iurl.SchemeFromURL(sourceURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.sourceName = sourceName
|
|
|
|
databaseName, err := iurl.SchemeFromURL(databaseURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.databaseName = databaseName
|
|
|
|
sourceDrv, err := source.Open(sourceURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.sourceDrv = sourceDrv
|
|
|
|
databaseDrv, err := database.Open(databaseURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.databaseDrv = databaseDrv
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// NewWithDatabaseInstance returns a new Migrate instance from a source URL
|
|
// and an existing database instance. The source URL scheme is defined by each driver.
|
|
// Use any string that can serve as an identifier during logging as databaseName.
|
|
// You are responsible for closing the underlying database client if necessary.
|
|
func NewWithDatabaseInstance(sourceURL string, databaseName string, databaseInstance database.Driver) (*Migrate, error) {
|
|
m := newCommon()
|
|
|
|
sourceName, err := iurl.SchemeFromURL(sourceURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.sourceName = sourceName
|
|
|
|
m.databaseName = databaseName
|
|
|
|
sourceDrv, err := source.Open(sourceURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.sourceDrv = sourceDrv
|
|
|
|
m.databaseDrv = databaseInstance
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// NewWithSourceInstance returns a new Migrate instance from an existing source instance
|
|
// and a database URL. The database URL scheme is defined by each driver.
|
|
// Use any string that can serve as an identifier during logging as sourceName.
|
|
// You are responsible for closing the underlying source client if necessary.
|
|
func NewWithSourceInstance(sourceName string, sourceInstance source.Driver, databaseURL string) (*Migrate, error) {
|
|
m := newCommon()
|
|
|
|
databaseName, err := iurl.SchemeFromURL(databaseURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.databaseName = databaseName
|
|
|
|
m.sourceName = sourceName
|
|
|
|
databaseDrv, err := database.Open(databaseURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
m.databaseDrv = databaseDrv
|
|
|
|
m.sourceDrv = sourceInstance
|
|
|
|
return m, nil
|
|
}
|
|
|
|
// NewWithInstance returns a new Migrate instance from an existing source and
|
|
// database instance. Use any string that can serve as an identifier during logging
|
|
// as sourceName and databaseName. You are responsible for closing down
|
|
// the underlying source and database client if necessary.
|
|
func NewWithInstance(sourceName string, sourceInstance source.Driver, databaseName string, databaseInstance database.Driver) (*Migrate, error) {
|
|
m := newCommon()
|
|
|
|
m.sourceName = sourceName
|
|
m.databaseName = databaseName
|
|
|
|
m.sourceDrv = sourceInstance
|
|
m.databaseDrv = databaseInstance
|
|
|
|
return m, nil
|
|
}
|
|
|
|
func newCommon() *Migrate {
|
|
return &Migrate{
|
|
GracefulStop: make(chan bool, 1),
|
|
PrefetchMigrations: DefaultPrefetchMigrations,
|
|
LockTimeout: DefaultLockTimeout,
|
|
isLockedMu: &sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
// Close closes the source and the database.
|
|
func (m *Migrate) Close() (source error, database error) {
|
|
databaseSrvClose := make(chan error)
|
|
sourceSrvClose := make(chan error)
|
|
|
|
m.logVerbosePrintf("Closing source and database\n")
|
|
|
|
go func() {
|
|
databaseSrvClose <- m.databaseDrv.Close()
|
|
}()
|
|
|
|
go func() {
|
|
sourceSrvClose <- m.sourceDrv.Close()
|
|
}()
|
|
|
|
return <-sourceSrvClose, <-databaseSrvClose
|
|
}
|
|
|
|
// Migrate looks at the currently active migration version,
|
|
// then migrates either up or down to the specified version.
|
|
func (m *Migrate) Migrate(version uint) error {
|
|
if err := m.lock(); err != nil {
|
|
return err
|
|
}
|
|
|
|
curVersion, dirty, err := m.databaseDrv.Version()
|
|
if err != nil {
|
|
return m.unlockErr(err)
|
|
}
|
|
|
|
if dirty {
|
|
return m.unlockErr(ErrDirty{curVersion})
|
|
}
|
|
|
|
ret := make(chan interface{}, m.PrefetchMigrations)
|
|
go m.read(curVersion, int(version), ret)
|
|
|
|
return m.unlockErr(m.runMigrations(ret))
|
|
}
|
|
|
|
// Steps looks at the currently active migration version.
|
|
// It will migrate up if n > 0, and down if n < 0.
|
|
func (m *Migrate) Steps(n int) error {
|
|
if n == 0 {
|
|
return ErrNoChange
|
|
}
|
|
|
|
if err := m.lock(); err != nil {
|
|
return err
|
|
}
|
|
|
|
curVersion, dirty, err := m.databaseDrv.Version()
|
|
if err != nil {
|
|
return m.unlockErr(err)
|
|
}
|
|
|
|
if dirty {
|
|
return m.unlockErr(ErrDirty{curVersion})
|
|
}
|
|
|
|
ret := make(chan interface{}, m.PrefetchMigrations)
|
|
|
|
if n > 0 {
|
|
go m.readUp(curVersion, n, ret)
|
|
} else {
|
|
go m.readDown(curVersion, -n, ret)
|
|
}
|
|
|
|
return m.unlockErr(m.runMigrations(ret))
|
|
}
|
|
|
|
// Up looks at the currently active migration version
|
|
// and will migrate all the way up (applying all up migrations).
|
|
func (m *Migrate) Up() error {
|
|
if err := m.lock(); err != nil {
|
|
return err
|
|
}
|
|
|
|
curVersion, dirty, err := m.databaseDrv.Version()
|
|
if err != nil {
|
|
return m.unlockErr(err)
|
|
}
|
|
|
|
if dirty {
|
|
return m.unlockErr(ErrDirty{curVersion})
|
|
}
|
|
|
|
ret := make(chan interface{}, m.PrefetchMigrations)
|
|
|
|
go m.readUp(curVersion, -1, ret)
|
|
return m.unlockErr(m.runMigrations(ret))
|
|
}
|
|
|
|
// Down looks at the currently active migration version
|
|
// and will migrate all the way down (applying all down migrations).
|
|
func (m *Migrate) Down() error {
|
|
if err := m.lock(); err != nil {
|
|
return err
|
|
}
|
|
|
|
curVersion, dirty, err := m.databaseDrv.Version()
|
|
if err != nil {
|
|
return m.unlockErr(err)
|
|
}
|
|
|
|
if dirty {
|
|
return m.unlockErr(ErrDirty{curVersion})
|
|
}
|
|
|
|
ret := make(chan interface{}, m.PrefetchMigrations)
|
|
go m.readDown(curVersion, -1, ret)
|
|
return m.unlockErr(m.runMigrations(ret))
|
|
}
|
|
|
|
// Drop deletes everything in the database.
|
|
func (m *Migrate) Drop() error {
|
|
if err := m.lock(); err != nil {
|
|
return err
|
|
}
|
|
if err := m.databaseDrv.Drop(); err != nil {
|
|
return m.unlockErr(err)
|
|
}
|
|
return m.unlock()
|
|
}
|
|
|
|
// Run runs any migration provided by you against the database.
|
|
// It does not check any currently active version in database.
|
|
// Usually you don't need this function at all. Use Migrate,
|
|
// Steps, Up or Down instead.
|
|
func (m *Migrate) Run(migration ...*Migration) error {
|
|
if len(migration) == 0 {
|
|
return ErrNoChange
|
|
}
|
|
|
|
if err := m.lock(); err != nil {
|
|
return err
|
|
}
|
|
|
|
curVersion, dirty, err := m.databaseDrv.Version()
|
|
if err != nil {
|
|
return m.unlockErr(err)
|
|
}
|
|
|
|
if dirty {
|
|
return m.unlockErr(ErrDirty{curVersion})
|
|
}
|
|
|
|
ret := make(chan interface{}, m.PrefetchMigrations)
|
|
|
|
go func() {
|
|
defer close(ret)
|
|
for _, migr := range migration {
|
|
if m.PrefetchMigrations > 0 && migr.Body != nil {
|
|
m.logVerbosePrintf("Start buffering %v\n", migr.LogString())
|
|
} else {
|
|
m.logVerbosePrintf("Scheduled %v\n", migr.LogString())
|
|
}
|
|
|
|
ret <- migr
|
|
go func(migr *Migration) {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}(migr)
|
|
}
|
|
}()
|
|
|
|
return m.unlockErr(m.runMigrations(ret))
|
|
}
|
|
|
|
// Force sets a migration version.
|
|
// It does not check any currently active version in database.
|
|
// It resets the dirty state to false.
|
|
func (m *Migrate) Force(version int) error {
|
|
if version < -1 {
|
|
return ErrInvalidVersion
|
|
}
|
|
|
|
if err := m.lock(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := m.databaseDrv.SetVersion(version, false); err != nil {
|
|
return m.unlockErr(err)
|
|
}
|
|
|
|
return m.unlock()
|
|
}
|
|
|
|
// Version returns the currently active migration version.
|
|
// If no migration has been applied, yet, it will return ErrNilVersion.
|
|
func (m *Migrate) Version() (version uint, dirty bool, err error) {
|
|
v, d, err := m.databaseDrv.Version()
|
|
if err != nil {
|
|
return 0, false, err
|
|
}
|
|
|
|
if v == database.NilVersion {
|
|
return 0, false, ErrNilVersion
|
|
}
|
|
|
|
return suint(v), d, nil
|
|
}
|
|
|
|
// read reads either up or down migrations from source `from` to `to`.
|
|
// Each migration is then written to the ret channel.
|
|
// If an error occurs during reading, that error is written to the ret channel, too.
|
|
// Once read is done reading it will close the ret channel.
|
|
func (m *Migrate) read(from int, to int, ret chan<- interface{}) {
|
|
defer close(ret)
|
|
|
|
// check if from version exists
|
|
if from >= 0 {
|
|
if err := m.versionExists(suint(from)); err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
}
|
|
|
|
// check if to version exists
|
|
if to >= 0 {
|
|
if err := m.versionExists(suint(to)); err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
}
|
|
|
|
// no change?
|
|
if from == to {
|
|
ret <- ErrNoChange
|
|
return
|
|
}
|
|
|
|
if from < to {
|
|
// it's going up
|
|
// apply first migration if from is nil version
|
|
if from == -1 {
|
|
firstVersion, err := m.sourceDrv.First()
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
migr, err := m.newMigration(firstVersion, int(firstVersion))
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
|
|
from = int(firstVersion)
|
|
}
|
|
|
|
// run until we reach target ...
|
|
for from < to {
|
|
if m.stop() {
|
|
return
|
|
}
|
|
|
|
next, err := m.sourceDrv.Next(suint(from))
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
migr, err := m.newMigration(next, int(next))
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
|
|
from = int(next)
|
|
}
|
|
|
|
} else {
|
|
// it's going down
|
|
// run until we reach target ...
|
|
for from > to && from >= 0 {
|
|
if m.stop() {
|
|
return
|
|
}
|
|
|
|
prev, err := m.sourceDrv.Prev(suint(from))
|
|
if errors.Is(err, os.ErrNotExist) && to == -1 {
|
|
// apply nil migration
|
|
migr, err := m.newMigration(suint(from), -1)
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
|
|
return
|
|
|
|
} else if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
migr, err := m.newMigration(suint(from), int(prev))
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
|
|
from = int(prev)
|
|
}
|
|
}
|
|
}
|
|
|
|
// readUp reads up migrations from `from` limitted by `limit`.
|
|
// limit can be -1, implying no limit and reading until there are no more migrations.
|
|
// Each migration is then written to the ret channel.
|
|
// If an error occurs during reading, that error is written to the ret channel, too.
|
|
// Once readUp is done reading it will close the ret channel.
|
|
func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) {
|
|
defer close(ret)
|
|
|
|
// check if from version exists
|
|
if from >= 0 {
|
|
if err := m.versionExists(suint(from)); err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
}
|
|
|
|
if limit == 0 {
|
|
ret <- ErrNoChange
|
|
return
|
|
}
|
|
|
|
count := 0
|
|
for count < limit || limit == -1 {
|
|
if m.stop() {
|
|
return
|
|
}
|
|
|
|
// apply first migration if from is nil version
|
|
if from == -1 {
|
|
firstVersion, err := m.sourceDrv.First()
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
migr, err := m.newMigration(firstVersion, int(firstVersion))
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
from = int(firstVersion)
|
|
count++
|
|
continue
|
|
}
|
|
|
|
// apply next migration
|
|
next, err := m.sourceDrv.Next(suint(from))
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
// no limit, but no migrations applied?
|
|
if limit == -1 && count == 0 {
|
|
ret <- ErrNoChange
|
|
return
|
|
}
|
|
|
|
// no limit, reached end
|
|
if limit == -1 {
|
|
return
|
|
}
|
|
|
|
// reached end, and didn't apply any migrations
|
|
if limit > 0 && count == 0 {
|
|
ret <- os.ErrNotExist
|
|
return
|
|
}
|
|
|
|
// applied less migrations than limit?
|
|
if count < limit {
|
|
ret <- ErrShortLimit{suint(limit - count)}
|
|
return
|
|
}
|
|
}
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
migr, err := m.newMigration(next, int(next))
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
from = int(next)
|
|
count++
|
|
}
|
|
}
|
|
|
|
// readDown reads down migrations from `from` limitted by `limit`.
|
|
// limit can be -1, implying no limit and reading until there are no more migrations.
|
|
// Each migration is then written to the ret channel.
|
|
// If an error occurs during reading, that error is written to the ret channel, too.
|
|
// Once readDown is done reading it will close the ret channel.
|
|
func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) {
|
|
defer close(ret)
|
|
|
|
// check if from version exists
|
|
if from >= 0 {
|
|
if err := m.versionExists(suint(from)); err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
}
|
|
|
|
if limit == 0 {
|
|
ret <- ErrNoChange
|
|
return
|
|
}
|
|
|
|
// no change if already at nil version
|
|
if from == -1 && limit == -1 {
|
|
ret <- ErrNoChange
|
|
return
|
|
}
|
|
|
|
// can't go over limit if already at nil version
|
|
if from == -1 && limit > 0 {
|
|
ret <- os.ErrNotExist
|
|
return
|
|
}
|
|
|
|
count := 0
|
|
for count < limit || limit == -1 {
|
|
if m.stop() {
|
|
return
|
|
}
|
|
|
|
prev, err := m.sourceDrv.Prev(suint(from))
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
// no limit or haven't reached limit, apply "first" migration
|
|
if limit == -1 || limit-count > 0 {
|
|
firstVersion, err := m.sourceDrv.First()
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
migr, err := m.newMigration(firstVersion, -1)
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
count++
|
|
}
|
|
|
|
if count < limit {
|
|
ret <- ErrShortLimit{suint(limit - count)}
|
|
}
|
|
return
|
|
}
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
migr, err := m.newMigration(suint(from), int(prev))
|
|
if err != nil {
|
|
ret <- err
|
|
return
|
|
}
|
|
|
|
ret <- migr
|
|
go func() {
|
|
if err := migr.Buffer(); err != nil {
|
|
m.logErr(err)
|
|
}
|
|
}()
|
|
from = int(prev)
|
|
count++
|
|
}
|
|
}
|
|
|
|
// runMigrations reads *Migration and error from a channel. Any other type
|
|
// sent on this channel will result in a panic. Each migration is then
|
|
// proxied to the database driver and run against the database.
|
|
// Before running a newly received migration it will check if it's supposed
|
|
// to stop execution because it might have received a stop signal on the
|
|
// GracefulStop channel.
|
|
func (m *Migrate) runMigrations(ret <-chan interface{}) error {
|
|
for r := range ret {
|
|
|
|
if m.stop() {
|
|
return nil
|
|
}
|
|
|
|
switch r := r.(type) {
|
|
case error:
|
|
return r
|
|
|
|
case *Migration:
|
|
migr := r
|
|
|
|
// set version with dirty state
|
|
if err := m.databaseDrv.SetVersion(migr.TargetVersion, true); err != nil {
|
|
return err
|
|
}
|
|
|
|
if migr.Body != nil {
|
|
m.logVerbosePrintf("Read and execute %v\n", migr.LogString())
|
|
if err := m.databaseDrv.Run(migr.BufferedBody); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// set clean state
|
|
if err := m.databaseDrv.SetVersion(migr.TargetVersion, false); err != nil {
|
|
return err
|
|
}
|
|
|
|
endTime := time.Now()
|
|
readTime := migr.FinishedReading.Sub(migr.StartedBuffering)
|
|
runTime := endTime.Sub(migr.FinishedReading)
|
|
|
|
// log either verbose or normal
|
|
if m.Log != nil {
|
|
if m.Log.Verbose() {
|
|
m.logPrintf("Finished %v (read %v, ran %v)\n", migr.LogString(), readTime, runTime)
|
|
} else {
|
|
m.logPrintf("%v (%v)\n", migr.LogString(), readTime+runTime)
|
|
}
|
|
}
|
|
|
|
default:
|
|
return fmt.Errorf("unknown type: %T with value: %+v", r, r)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// versionExists checks the source if either the up or down migration for
|
|
// the specified migration version exists.
|
|
func (m *Migrate) versionExists(version uint) (result error) {
|
|
// try up migration first
|
|
up, _, err := m.sourceDrv.ReadUp(version)
|
|
if err == nil {
|
|
defer func() {
|
|
if errClose := up.Close(); errClose != nil {
|
|
result = multierror.Append(result, errClose)
|
|
}
|
|
}()
|
|
}
|
|
if errors.Is(err, os.ErrExist) {
|
|
return nil
|
|
} else if !errors.Is(err, os.ErrNotExist) {
|
|
return err
|
|
}
|
|
|
|
// then try down migration
|
|
down, _, err := m.sourceDrv.ReadDown(version)
|
|
if err == nil {
|
|
defer func() {
|
|
if errClose := down.Close(); errClose != nil {
|
|
result = multierror.Append(result, errClose)
|
|
}
|
|
}()
|
|
}
|
|
if errors.Is(err, os.ErrExist) {
|
|
return nil
|
|
} else if !errors.Is(err, os.ErrNotExist) {
|
|
return err
|
|
}
|
|
|
|
err = fmt.Errorf("no migration found for version %d: %w", version, err)
|
|
m.logErr(err)
|
|
return err
|
|
}
|
|
|
|
// stop returns true if no more migrations should be run against the database
|
|
// because a stop signal was received on the GracefulStop channel.
|
|
// Calls are cheap and this function is not blocking.
|
|
func (m *Migrate) stop() bool {
|
|
if m.isGracefulStop {
|
|
return true
|
|
}
|
|
|
|
select {
|
|
case <-m.GracefulStop:
|
|
m.isGracefulStop = true
|
|
return true
|
|
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// newMigration is a helper func that returns a *Migration for the
|
|
// specified version and targetVersion.
|
|
func (m *Migrate) newMigration(version uint, targetVersion int) (*Migration, error) {
|
|
var migr *Migration
|
|
|
|
if targetVersion >= int(version) {
|
|
r, identifier, err := m.sourceDrv.ReadUp(version)
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
// create "empty" migration
|
|
migr, err = NewMigration(nil, "", version, targetVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
} else if err != nil {
|
|
return nil, err
|
|
|
|
} else {
|
|
// create migration from up source
|
|
migr, err = NewMigration(r, identifier, version, targetVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
} else {
|
|
r, identifier, err := m.sourceDrv.ReadDown(version)
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
// create "empty" migration
|
|
migr, err = NewMigration(nil, "", version, targetVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
} else if err != nil {
|
|
return nil, err
|
|
|
|
} else {
|
|
// create migration from down source
|
|
migr, err = NewMigration(r, identifier, version, targetVersion)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
if m.PrefetchMigrations > 0 && migr.Body != nil {
|
|
m.logVerbosePrintf("Start buffering %v\n", migr.LogString())
|
|
} else {
|
|
m.logVerbosePrintf("Scheduled %v\n", migr.LogString())
|
|
}
|
|
|
|
return migr, nil
|
|
}
|
|
|
|
// lock is a thread safe helper function to lock the database.
|
|
// It should be called as late as possible when running migrations.
|
|
func (m *Migrate) lock() error {
|
|
m.isLockedMu.Lock()
|
|
defer m.isLockedMu.Unlock()
|
|
|
|
if m.isLocked {
|
|
return ErrLocked
|
|
}
|
|
|
|
// create done channel, used in the timeout goroutine
|
|
done := make(chan bool, 1)
|
|
defer func() {
|
|
done <- true
|
|
}()
|
|
|
|
// use errchan to signal error back to this context
|
|
errchan := make(chan error, 2)
|
|
|
|
// start timeout goroutine
|
|
timeout := time.After(m.LockTimeout)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return
|
|
case <-timeout:
|
|
errchan <- ErrLockTimeout
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// now try to acquire the lock
|
|
go func() {
|
|
if err := m.databaseDrv.Lock(); err != nil {
|
|
errchan <- err
|
|
} else {
|
|
errchan <- nil
|
|
}
|
|
}()
|
|
|
|
// wait until we either receive ErrLockTimeout or error from Lock operation
|
|
err := <-errchan
|
|
if err == nil {
|
|
m.isLocked = true
|
|
}
|
|
return err
|
|
}
|
|
|
|
// unlock is a thread safe helper function to unlock the database.
|
|
// It should be called as early as possible when no more migrations are
|
|
// expected to be executed.
|
|
func (m *Migrate) unlock() error {
|
|
m.isLockedMu.Lock()
|
|
defer m.isLockedMu.Unlock()
|
|
|
|
if err := m.databaseDrv.Unlock(); err != nil {
|
|
// BUG: Can potentially create a deadlock. Add a timeout.
|
|
return err
|
|
}
|
|
|
|
m.isLocked = false
|
|
return nil
|
|
}
|
|
|
|
// unlockErr calls unlock and returns a combined error
|
|
// if a prevErr is not nil.
|
|
func (m *Migrate) unlockErr(prevErr error) error {
|
|
if err := m.unlock(); err != nil {
|
|
return multierror.Append(prevErr, err)
|
|
}
|
|
return prevErr
|
|
}
|
|
|
|
// logPrintf writes to m.Log if not nil
|
|
func (m *Migrate) logPrintf(format string, v ...interface{}) {
|
|
if m.Log != nil {
|
|
m.Log.Printf(format, v...)
|
|
}
|
|
}
|
|
|
|
// logVerbosePrintf writes to m.Log if not nil. Use for verbose logging output.
|
|
func (m *Migrate) logVerbosePrintf(format string, v ...interface{}) {
|
|
if m.Log != nil && m.Log.Verbose() {
|
|
m.Log.Printf(format, v...)
|
|
}
|
|
}
|
|
|
|
// logErr writes error to m.Log if not nil
|
|
func (m *Migrate) logErr(err error) {
|
|
if m.Log != nil {
|
|
m.Log.Printf("error: %v", err)
|
|
}
|
|
}
|