pass file instead of files to migrate and implement soft shutdown

This commit is contained in:
mattes 2014-08-13 20:06:38 +02:00
parent cb82ec44c0
commit dad5b36dbb
7 changed files with 170 additions and 94 deletions

View File

@ -17,7 +17,7 @@ func (driver *Driver) FilenameExtension() string {
return "sh"
}
func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) {
func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {
defer close(pipe)
return
}

View File

@ -23,14 +23,10 @@ type Driver interface {
FilenameExtension() string
// Migrate is the heart of the driver.
// It will receive a slice of files which the driver should apply
// It will receive a file which the driver should apply
// to its backend or whatever. The migration function should use
// the pipe channel to return any errors or other useful information.
//
// Please note that the order of the migration files is already
// sorted: Ascending for UP migration files, and descending for
// DOWN migration files.
Migrate(files file.Files, pipe chan interface{})
Migrate(file file.File, pipe chan interface{})
// Version returns the current migration version.
Version() (uint64, error)

View File

@ -46,62 +46,61 @@ func (driver *Driver) FilenameExtension() string {
return "sql"
}
func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) {
func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {
defer close(pipe)
for _, f := range files {
pipe <- f
tx, err := driver.db.Begin()
if err != nil {
pipe <- f
tx, err := driver.db.Begin()
if err != nil {
pipe <- err
return
}
if f.Direction == direction.Up {
if _, err := tx.Exec(`INSERT INTO `+tableName+` (version) VALUES ($1)`, f.Version); err != nil {
pipe <- err
return
}
if f.Direction == direction.Up {
if _, err := tx.Exec(`INSERT INTO `+tableName+` (version) VALUES ($1)`, f.Version); err != nil {
pipe <- err
if err := tx.Rollback(); err != nil {
pipe <- err
}
return
}
} else if f.Direction == direction.Down {
if _, err := tx.Exec(`DELETE FROM `+tableName+` WHERE version=$1`, f.Version); err != nil {
pipe <- err
if err := tx.Rollback(); err != nil {
pipe <- err
}
return
}
}
if err := f.ReadContent(); err != nil {
pipe <- err
return
}
if _, err := tx.Exec(string(f.Content)); err != nil {
pqErr := err.(*pq.Error)
offset, err := strconv.Atoi(pqErr.Position)
if err == nil && offset >= 0 {
lineNo, columnNo := file.LineColumnFromOffset(f.Content, offset-1)
errorPart := file.LinesBeforeAndAfter(f.Content, lineNo, 5, 5, true)
pipe <- errors.New(fmt.Sprintf("%s %v: %s in line %v, column %v:\n\n%s", pqErr.Severity, pqErr.Code, pqErr.Message, lineNo, columnNo, string(errorPart)))
} else {
pipe <- errors.New(fmt.Sprintf("%s %v: %s", pqErr.Severity, pqErr.Code, pqErr.Message))
}
if err := tx.Rollback(); err != nil {
pipe <- err
}
return
}
if err := tx.Commit(); err != nil {
} else if f.Direction == direction.Down {
if _, err := tx.Exec(`DELETE FROM `+tableName+` WHERE version=$1`, f.Version); err != nil {
pipe <- err
if err := tx.Rollback(); err != nil {
pipe <- err
}
return
}
}
if err := f.ReadContent(); err != nil {
pipe <- err
return
}
if _, err := tx.Exec(string(f.Content)); err != nil {
pqErr := err.(*pq.Error)
offset, err := strconv.Atoi(pqErr.Position)
if err == nil && offset >= 0 {
lineNo, columnNo := file.LineColumnFromOffset(f.Content, offset-1)
errorPart := file.LinesBeforeAndAfter(f.Content, lineNo, 5, 5, true)
pipe <- errors.New(fmt.Sprintf("%s %v: %s in line %v, column %v:\n\n%s", pqErr.Severity, pqErr.Code, pqErr.Message, lineNo, columnNo, string(errorPart)))
} else {
pipe <- errors.New(fmt.Sprintf("%s %v: %s", pqErr.Severity, pqErr.Code, pqErr.Message))
}
if err := tx.Rollback(); err != nil {
pipe <- err
}
return
}
if err := tx.Commit(); err != nil {
pipe <- err
return
}
}
func (driver *Driver) Version() (uint64, error) {

View File

@ -67,21 +67,21 @@ func TestMigrate(t *testing.T) {
}
pipe := pipep.New()
go d.Migrate(file.Files{files[0]}, pipe)
go d.Migrate(files[0], pipe)
errs := pipep.ReadErrors(pipe)
if len(errs) > 0 {
t.Fatal(errs)
}
pipe = pipep.New()
go d.Migrate(file.Files{files[1]}, pipe)
go d.Migrate(files[1], pipe)
errs = pipep.ReadErrors(pipe)
if len(errs) > 0 {
t.Fatal(errs)
}
pipe = pipep.New()
go d.Migrate(file.Files{files[2]}, pipe)
go d.Migrate(files[2], pipe)
errs = pipep.ReadErrors(pipe)
if len(errs) == 0 {
t.Error("Expected test case to fail")

16
main.go
View File

@ -12,6 +12,7 @@ import (
"github.com/mattes/migrate/migrate/direction"
pipep "github.com/mattes/migrate/pipe"
"os"
// "os/signal"
"strconv"
"time"
)
@ -105,9 +106,12 @@ func writePipe(pipe chan interface{}) {
}
fmt.Printf(" %s\n", f.FileName)
case os.Signal:
fmt.Println("signal", item)
default:
text := fmt.Sprint(item)
fmt.Println(text)
fmt.Println("TEXT", text)
}
}
}
@ -141,7 +145,7 @@ func createCmd(url, migrationsPath, name string) {
func upCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Up(pipe, url, migrationsPath)
go migrate.Up(pipe, pipep.Signal(), url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -149,7 +153,7 @@ func upCmd(url, migrationsPath string) {
func downCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Down(pipe, url, migrationsPath)
go migrate.Down(pipe, pipep.Signal(), url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -157,7 +161,7 @@ func downCmd(url, migrationsPath string) {
func redoCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Redo(pipe, url, migrationsPath)
go migrate.Redo(pipe, pipep.Signal(), url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -165,7 +169,7 @@ func redoCmd(url, migrationsPath string) {
func resetCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Reset(pipe, url, migrationsPath)
go migrate.Reset(pipe, pipep.Signal(), url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -173,7 +177,7 @@ func resetCmd(url, migrationsPath string) {
func migrateCmd(url, migrationsPath string, relativeN int) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Migrate(pipe, url, migrationsPath, relativeN)
go migrate.Migrate(pipe, pipep.Signal(), url, migrationsPath, relativeN)
writePipe(pipe)
printTimer()
}

View File

@ -9,13 +9,43 @@ import (
"github.com/mattes/migrate/migrate/direction"
pipep "github.com/mattes/migrate/pipe"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
)
// Up applies all available migrations
func Up(pipe chan interface{}, url, migrationsPath string) {
// // Up applies all available migrations
// func Up(pipe chan interface{}, url, migrationsPath string) {
// d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
// if err != nil {
// go pipep.Close(pipe, err)
// return
// }
// applyMigrationFiles, err := files.ToLastFrom(version)
// if err != nil {
// go pipep.Close(pipe, err)
// return
// }
// if len(applyMigrationFiles) > 0 {
// for _, f := range applyMigrationFiles {
// pipe1 := pipep.New()
// go d.Migrate(f, pipe1)
// if ok := pipep.WaitAndRedirect(pipe1, pipe); !ok {
// break
// }
// }
// go pipep.Close(pipe, nil)
// return
// } else {
// go pipep.Close(pipe, nil)
// return
// }
// }
func Up(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
go pipep.Close(pipe, err)
@ -29,7 +59,14 @@ func Up(pipe chan interface{}, url, migrationsPath string) {
}
if len(applyMigrationFiles) > 0 {
go d.Migrate(applyMigrationFiles, pipe)
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
break
}
}
go pipep.Close(pipe, nil)
return
} else {
go pipep.Close(pipe, nil)
@ -40,13 +77,13 @@ func Up(pipe chan interface{}, url, migrationsPath string) {
// UpSync is synchronous version of Up
func UpSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Up(pipe, url, migrationsPath)
go Up(pipe, pipep.Signal(), url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Down rolls back all migrations
func Down(pipe chan interface{}, url, migrationsPath string) {
func Down(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
go pipep.Close(pipe, err)
@ -60,7 +97,14 @@ func Down(pipe chan interface{}, url, migrationsPath string) {
}
if len(applyMigrationFiles) > 0 {
go d.Migrate(applyMigrationFiles, pipe)
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
break
}
}
go pipep.Close(pipe, nil)
return
} else {
go pipep.Close(pipe, nil)
@ -71,45 +115,53 @@ func Down(pipe chan interface{}, url, migrationsPath string) {
// DownSync is synchronous version of Down
func DownSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Down(pipe, url, migrationsPath)
go Down(pipe, pipep.Signal(), url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Redo rolls back the most recently applied migration, then runs it again.
func Redo(pipe chan interface{}, url, migrationsPath string) {
func Redo(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
pipe1 := pipep.New()
go Migrate(pipe1, url, migrationsPath, -1)
pipep.WaitAndRedirect(pipe1, pipe)
go Migrate(pipe, url, migrationsPath, +1)
go Migrate(pipe1, signal, url, migrationsPath, -1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
go pipep.Close(pipe, nil)
return
} else {
go Migrate(pipe, pipep.Signal(), url, migrationsPath, +1)
}
}
// RedoSync is synchronous version of Redo
func RedoSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Redo(pipe, url, migrationsPath)
go Redo(pipe, pipep.Signal(), url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Reset runs the down and up migration function
func Reset(pipe chan interface{}, url, migrationsPath string) {
func Reset(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
pipe1 := pipep.New()
go Down(pipe1, url, migrationsPath)
pipep.WaitAndRedirect(pipe1, pipe)
go Up(pipe, url, migrationsPath)
go Down(pipe1, signal, url, migrationsPath)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
go pipep.Close(pipe, nil)
return
} else {
go Up(pipe, signal, url, migrationsPath)
}
}
// ResetSync is synchronous version of Reset
func ResetSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Reset(pipe, url, migrationsPath)
go Reset(pipe, pipep.Signal(), url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Migrate applies relative +n/-n migrations
func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) {
func Migrate(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string, relativeN int) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
go pipep.Close(pipe, err)
@ -121,17 +173,17 @@ func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) {
go pipep.Close(pipe, err)
return
}
if len(applyMigrationFiles) > 0 {
if relativeN > 0 {
go d.Migrate(applyMigrationFiles, pipe)
return
} else if relativeN < 0 {
go d.Migrate(applyMigrationFiles, pipe)
return
} else {
go pipep.Close(pipe, nil)
return
if len(applyMigrationFiles) > 0 && relativeN != 0 {
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
break
}
}
go pipep.Close(pipe, nil)
return
}
go pipep.Close(pipe, nil)
return
@ -140,7 +192,7 @@ func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) {
// MigrateSync is synchronous version of Migrate
func MigrateSync(url, migrationsPath string, relativeN int) (err []error, ok bool) {
pipe := pipep.New()
go Migrate(pipe, url, migrationsPath, relativeN)
go Migrate(pipe, pipep.Signal(), url, migrationsPath, relativeN)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}

View File

@ -1,6 +1,11 @@
// Package pipe has functions for pipe channel handling.
package pipe
import (
"os"
"os/signal"
)
// New creates a new pipe. A pipe is basically a channel.
func New() chan interface{} {
return make(chan interface{}, 0)
@ -14,22 +19,42 @@ func Close(pipe chan interface{}, err error) {
close(pipe)
}
// WaitAndRedirect waits for pipe to be closed and
// redirects all messages from pipe to redirectPipe
// while it waits.
func WaitAndRedirect(pipe, redirectPipe chan interface{}) {
func Signal() chan os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
return c
}
func WaitAndRedirect(pipe, redirectPipe chan interface{}, signal chan os.Signal) (ok bool) {
errorReceived := false
signalsReceived := 0
if pipe != nil && redirectPipe != nil {
for {
select {
case <-signal:
signalsReceived += 1
if signalsReceived > 1 {
os.Exit(5)
} else {
redirectPipe <- " Aborting after this migration ..."
}
case item, ok := <-pipe:
if !ok {
return
return !errorReceived && signalsReceived == 0
} else {
redirectPipe <- item
switch item.(type) {
case error:
errorReceived = true
}
}
}
}
}
return !errorReceived && signalsReceived == 0
}
// ReadErrors selects all received errors and returns them.