add EnableSignals() and DisableSignals() and roll back some api design changes

This commit is contained in:
mattes 2014-08-13 23:00:27 +02:00
parent dad5b36dbb
commit 69df0f6de7
4 changed files with 47 additions and 61 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/mattes/migrate/file"
"github.com/mattes/migrate/migrate/direction"
"strconv"
"time"
)
type Driver struct {
@ -51,6 +52,8 @@ func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {
pipe <- f
time.Sleep(3 * time.Second)
tx, err := driver.db.Begin()
if err != nil {
pipe <- err

10
main.go
View File

@ -145,7 +145,7 @@ func createCmd(url, migrationsPath, name string) {
func upCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Up(pipe, pipep.Signal(), url, migrationsPath)
go migrate.Up(pipe, url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -153,7 +153,7 @@ func upCmd(url, migrationsPath string) {
func downCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Down(pipe, pipep.Signal(), url, migrationsPath)
go migrate.Down(pipe, url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -161,7 +161,7 @@ func downCmd(url, migrationsPath string) {
func redoCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Redo(pipe, pipep.Signal(), url, migrationsPath)
go migrate.Redo(pipe, url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -169,7 +169,7 @@ func redoCmd(url, migrationsPath string) {
func resetCmd(url, migrationsPath string) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Reset(pipe, pipep.Signal(), url, migrationsPath)
go migrate.Reset(pipe, url, migrationsPath)
writePipe(pipe)
printTimer()
}
@ -177,7 +177,7 @@ func resetCmd(url, migrationsPath string) {
func migrateCmd(url, migrationsPath string, relativeN int) {
timerStart = time.Now()
pipe := pipep.New()
go migrate.Migrate(pipe, pipep.Signal(), url, migrationsPath, relativeN)
go migrate.Migrate(pipe, url, migrationsPath, relativeN)
writePipe(pipe)
printTimer()
}

View File

@ -10,42 +10,13 @@ import (
pipep "github.com/mattes/migrate/pipe"
"io/ioutil"
"os"
"os/signal"
"path"
"strconv"
"strings"
)
// // Up applies all available migrations
// func Up(pipe chan interface{}, url, migrationsPath string) {
// d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
// if err != nil {
// go pipep.Close(pipe, err)
// return
// }
// applyMigrationFiles, err := files.ToLastFrom(version)
// if err != nil {
// go pipep.Close(pipe, err)
// return
// }
// if len(applyMigrationFiles) > 0 {
// for _, f := range applyMigrationFiles {
// pipe1 := pipep.New()
// go d.Migrate(f, pipe1)
// if ok := pipep.WaitAndRedirect(pipe1, pipe); !ok {
// break
// }
// }
// go pipep.Close(pipe, nil)
// return
// } else {
// go pipep.Close(pipe, nil)
// return
// }
// }
func Up(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
func Up(pipe chan interface{}, url, migrationsPath string) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
go pipep.Close(pipe, err)
@ -62,7 +33,7 @@ func Up(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
break
}
}
@ -77,13 +48,13 @@ func Up(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string
// UpSync is synchronous version of Up
func UpSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Up(pipe, pipep.Signal(), url, migrationsPath)
go Up(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Down rolls back all migrations
func Down(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
func Down(pipe chan interface{}, url, migrationsPath string) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
go pipep.Close(pipe, err)
@ -100,7 +71,7 @@ func Down(pipe chan interface{}, signal chan os.Signal, url, migrationsPath stri
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
break
}
}
@ -115,53 +86,53 @@ func Down(pipe chan interface{}, signal chan os.Signal, url, migrationsPath stri
// DownSync is synchronous version of Down
func DownSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Down(pipe, pipep.Signal(), url, migrationsPath)
go Down(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Redo rolls back the most recently applied migration, then runs it again.
func Redo(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
func Redo(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Migrate(pipe1, signal, url, migrationsPath, -1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
go Migrate(pipe1, url, migrationsPath, -1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
go pipep.Close(pipe, nil)
return
} else {
go Migrate(pipe, pipep.Signal(), url, migrationsPath, +1)
go Migrate(pipe, url, migrationsPath, +1)
}
}
// RedoSync is synchronous version of Redo
func RedoSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Redo(pipe, pipep.Signal(), url, migrationsPath)
go Redo(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Reset runs the down and up migration function
func Reset(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) {
func Reset(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Down(pipe1, signal, url, migrationsPath)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
go Down(pipe1, url, migrationsPath)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
go pipep.Close(pipe, nil)
return
} else {
go Up(pipe, signal, url, migrationsPath)
go Up(pipe, url, migrationsPath)
}
}
// ResetSync is synchronous version of Reset
func ResetSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Reset(pipe, pipep.Signal(), url, migrationsPath)
go Reset(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
// Migrate applies relative +n/-n migrations
func Migrate(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string, relativeN int) {
func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
go pipep.Close(pipe, err)
@ -178,7 +149,7 @@ func Migrate(pipe chan interface{}, signal chan os.Signal, url, migrationsPath s
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
break
}
}
@ -192,7 +163,7 @@ func Migrate(pipe chan interface{}, signal chan os.Signal, url, migrationsPath s
// MigrateSync is synchronous version of Migrate
func MigrateSync(url, migrationsPath string, relativeN int) (err []error, ok bool) {
pipe := pipep.New()
go Migrate(pipe, pipep.Signal(), url, migrationsPath, relativeN)
go Migrate(pipe, url, migrationsPath, relativeN)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
@ -284,3 +255,22 @@ func initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath string) (d
func NewPipe() chan interface{} {
return pipep.New()
}
var handleSignals = true
func EnableSignals() {
handleSignals = true
}
func DisableSignals() {
handleSignals = false
}
func handleSignal() chan os.Signal {
if handleSignals {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
return c
}
return nil
}

View File

@ -3,7 +3,6 @@ package pipe
import (
"os"
"os/signal"
)
// New creates a new pipe. A pipe is basically a channel.
@ -19,12 +18,6 @@ func Close(pipe chan interface{}, err error) {
close(pipe)
}
func Signal() chan os.Signal {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
return c
}
func WaitAndRedirect(pipe, redirectPipe chan interface{}, signal chan os.Signal) (ok bool) {
errorReceived := false
signalsReceived := 0