diff --git a/driver/bash/bash.go b/driver/bash/bash.go index 928d76e..b0b6c6d 100644 --- a/driver/bash/bash.go +++ b/driver/bash/bash.go @@ -17,7 +17,7 @@ func (driver *Driver) FilenameExtension() string { return "sh" } -func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) { +func (driver *Driver) Migrate(f file.File, pipe chan interface{}) { defer close(pipe) return } diff --git a/driver/driver.go b/driver/driver.go index ebaa86d..c92ea29 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -23,14 +23,10 @@ type Driver interface { FilenameExtension() string // Migrate is the heart of the driver. - // It will receive a slice of files which the driver should apply + // It will receive a file which the driver should apply // to its backend or whatever. The migration function should use // the pipe channel to return any errors or other useful information. - // - // Please note that the order of the migration files is already - // sorted: Ascending for UP migration files, and descending for - // DOWN migration files. - Migrate(files file.Files, pipe chan interface{}) + Migrate(file file.File, pipe chan interface{}) // Version returns the current migration version. Version() (uint64, error) diff --git a/driver/postgres/postgres.go b/driver/postgres/postgres.go index c7553a0..6849509 100644 --- a/driver/postgres/postgres.go +++ b/driver/postgres/postgres.go @@ -46,62 +46,61 @@ func (driver *Driver) FilenameExtension() string { return "sql" } -func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) { +func (driver *Driver) Migrate(f file.File, pipe chan interface{}) { defer close(pipe) - for _, f := range files { - pipe <- f - tx, err := driver.db.Begin() - if err != nil { + pipe <- f + + tx, err := driver.db.Begin() + if err != nil { + pipe <- err + return + } + + if f.Direction == direction.Up { + if _, err := tx.Exec(`INSERT INTO `+tableName+` (version) VALUES ($1)`, f.Version); err != nil { pipe <- err - return - } - - if f.Direction == direction.Up { - if _, err := tx.Exec(`INSERT INTO `+tableName+` (version) VALUES ($1)`, f.Version); err != nil { - pipe <- err - if err := tx.Rollback(); err != nil { - pipe <- err - } - return - } - } else if f.Direction == direction.Down { - if _, err := tx.Exec(`DELETE FROM `+tableName+` WHERE version=$1`, f.Version); err != nil { - pipe <- err - if err := tx.Rollback(); err != nil { - pipe <- err - } - return - } - } - - if err := f.ReadContent(); err != nil { - pipe <- err - return - } - - if _, err := tx.Exec(string(f.Content)); err != nil { - pqErr := err.(*pq.Error) - offset, err := strconv.Atoi(pqErr.Position) - if err == nil && offset >= 0 { - lineNo, columnNo := file.LineColumnFromOffset(f.Content, offset-1) - errorPart := file.LinesBeforeAndAfter(f.Content, lineNo, 5, 5, true) - pipe <- errors.New(fmt.Sprintf("%s %v: %s in line %v, column %v:\n\n%s", pqErr.Severity, pqErr.Code, pqErr.Message, lineNo, columnNo, string(errorPart))) - } else { - pipe <- errors.New(fmt.Sprintf("%s %v: %s", pqErr.Severity, pqErr.Code, pqErr.Message)) - } - if err := tx.Rollback(); err != nil { pipe <- err } return } - - if err := tx.Commit(); err != nil { + } else if f.Direction == direction.Down { + if _, err := tx.Exec(`DELETE FROM `+tableName+` WHERE version=$1`, f.Version); err != nil { pipe <- err + if err := tx.Rollback(); err != nil { + pipe <- err + } return } } + + if err := f.ReadContent(); err != nil { + pipe <- err + return + } + + if _, err := tx.Exec(string(f.Content)); err != nil { + pqErr := err.(*pq.Error) + offset, err := strconv.Atoi(pqErr.Position) + if err == nil && offset >= 0 { + lineNo, columnNo := file.LineColumnFromOffset(f.Content, offset-1) + errorPart := file.LinesBeforeAndAfter(f.Content, lineNo, 5, 5, true) + pipe <- errors.New(fmt.Sprintf("%s %v: %s in line %v, column %v:\n\n%s", pqErr.Severity, pqErr.Code, pqErr.Message, lineNo, columnNo, string(errorPart))) + } else { + pipe <- errors.New(fmt.Sprintf("%s %v: %s", pqErr.Severity, pqErr.Code, pqErr.Message)) + } + + if err := tx.Rollback(); err != nil { + pipe <- err + } + return + } + + if err := tx.Commit(); err != nil { + pipe <- err + return + } } func (driver *Driver) Version() (uint64, error) { diff --git a/driver/postgres/postgres_test.go b/driver/postgres/postgres_test.go index e099b0b..54487bf 100644 --- a/driver/postgres/postgres_test.go +++ b/driver/postgres/postgres_test.go @@ -67,21 +67,21 @@ func TestMigrate(t *testing.T) { } pipe := pipep.New() - go d.Migrate(file.Files{files[0]}, pipe) + go d.Migrate(files[0], pipe) errs := pipep.ReadErrors(pipe) if len(errs) > 0 { t.Fatal(errs) } pipe = pipep.New() - go d.Migrate(file.Files{files[1]}, pipe) + go d.Migrate(files[1], pipe) errs = pipep.ReadErrors(pipe) if len(errs) > 0 { t.Fatal(errs) } pipe = pipep.New() - go d.Migrate(file.Files{files[2]}, pipe) + go d.Migrate(files[2], pipe) errs = pipep.ReadErrors(pipe) if len(errs) == 0 { t.Error("Expected test case to fail") diff --git a/main.go b/main.go index 8ab0f0a..eff1807 100644 --- a/main.go +++ b/main.go @@ -12,6 +12,7 @@ import ( "github.com/mattes/migrate/migrate/direction" pipep "github.com/mattes/migrate/pipe" "os" + // "os/signal" "strconv" "time" ) @@ -105,9 +106,12 @@ func writePipe(pipe chan interface{}) { } fmt.Printf(" %s\n", f.FileName) + case os.Signal: + fmt.Println("signal", item) + default: text := fmt.Sprint(item) - fmt.Println(text) + fmt.Println("TEXT", text) } } } @@ -141,7 +145,7 @@ func createCmd(url, migrationsPath, name string) { func upCmd(url, migrationsPath string) { timerStart = time.Now() pipe := pipep.New() - go migrate.Up(pipe, url, migrationsPath) + go migrate.Up(pipe, pipep.Signal(), url, migrationsPath) writePipe(pipe) printTimer() } @@ -149,7 +153,7 @@ func upCmd(url, migrationsPath string) { func downCmd(url, migrationsPath string) { timerStart = time.Now() pipe := pipep.New() - go migrate.Down(pipe, url, migrationsPath) + go migrate.Down(pipe, pipep.Signal(), url, migrationsPath) writePipe(pipe) printTimer() } @@ -157,7 +161,7 @@ func downCmd(url, migrationsPath string) { func redoCmd(url, migrationsPath string) { timerStart = time.Now() pipe := pipep.New() - go migrate.Redo(pipe, url, migrationsPath) + go migrate.Redo(pipe, pipep.Signal(), url, migrationsPath) writePipe(pipe) printTimer() } @@ -165,7 +169,7 @@ func redoCmd(url, migrationsPath string) { func resetCmd(url, migrationsPath string) { timerStart = time.Now() pipe := pipep.New() - go migrate.Reset(pipe, url, migrationsPath) + go migrate.Reset(pipe, pipep.Signal(), url, migrationsPath) writePipe(pipe) printTimer() } @@ -173,7 +177,7 @@ func resetCmd(url, migrationsPath string) { func migrateCmd(url, migrationsPath string, relativeN int) { timerStart = time.Now() pipe := pipep.New() - go migrate.Migrate(pipe, url, migrationsPath, relativeN) + go migrate.Migrate(pipe, pipep.Signal(), url, migrationsPath, relativeN) writePipe(pipe) printTimer() } diff --git a/migrate/migrate.go b/migrate/migrate.go index 9f5ca86..5b6dfed 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -9,13 +9,43 @@ import ( "github.com/mattes/migrate/migrate/direction" pipep "github.com/mattes/migrate/pipe" "io/ioutil" + "os" "path" "strconv" "strings" ) -// Up applies all available migrations -func Up(pipe chan interface{}, url, migrationsPath string) { +// // Up applies all available migrations +// func Up(pipe chan interface{}, url, migrationsPath string) { +// d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath) +// if err != nil { +// go pipep.Close(pipe, err) +// return +// } + +// applyMigrationFiles, err := files.ToLastFrom(version) +// if err != nil { +// go pipep.Close(pipe, err) +// return +// } + +// if len(applyMigrationFiles) > 0 { +// for _, f := range applyMigrationFiles { +// pipe1 := pipep.New() +// go d.Migrate(f, pipe1) +// if ok := pipep.WaitAndRedirect(pipe1, pipe); !ok { +// break +// } +// } +// go pipep.Close(pipe, nil) +// return +// } else { +// go pipep.Close(pipe, nil) +// return +// } +// } + +func Up(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) { d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath) if err != nil { go pipep.Close(pipe, err) @@ -29,7 +59,14 @@ func Up(pipe chan interface{}, url, migrationsPath string) { } if len(applyMigrationFiles) > 0 { - go d.Migrate(applyMigrationFiles, pipe) + for _, f := range applyMigrationFiles { + pipe1 := pipep.New() + go d.Migrate(f, pipe1) + if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok { + break + } + } + go pipep.Close(pipe, nil) return } else { go pipep.Close(pipe, nil) @@ -40,13 +77,13 @@ func Up(pipe chan interface{}, url, migrationsPath string) { // UpSync is synchronous version of Up func UpSync(url, migrationsPath string) (err []error, ok bool) { pipe := pipep.New() - go Up(pipe, url, migrationsPath) + go Up(pipe, pipep.Signal(), url, migrationsPath) err = pipep.ReadErrors(pipe) return err, len(err) == 0 } // Down rolls back all migrations -func Down(pipe chan interface{}, url, migrationsPath string) { +func Down(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) { d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath) if err != nil { go pipep.Close(pipe, err) @@ -60,7 +97,14 @@ func Down(pipe chan interface{}, url, migrationsPath string) { } if len(applyMigrationFiles) > 0 { - go d.Migrate(applyMigrationFiles, pipe) + for _, f := range applyMigrationFiles { + pipe1 := pipep.New() + go d.Migrate(f, pipe1) + if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok { + break + } + } + go pipep.Close(pipe, nil) return } else { go pipep.Close(pipe, nil) @@ -71,45 +115,53 @@ func Down(pipe chan interface{}, url, migrationsPath string) { // DownSync is synchronous version of Down func DownSync(url, migrationsPath string) (err []error, ok bool) { pipe := pipep.New() - go Down(pipe, url, migrationsPath) + go Down(pipe, pipep.Signal(), url, migrationsPath) err = pipep.ReadErrors(pipe) return err, len(err) == 0 } // Redo rolls back the most recently applied migration, then runs it again. -func Redo(pipe chan interface{}, url, migrationsPath string) { +func Redo(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) { pipe1 := pipep.New() - go Migrate(pipe1, url, migrationsPath, -1) - pipep.WaitAndRedirect(pipe1, pipe) - go Migrate(pipe, url, migrationsPath, +1) + go Migrate(pipe1, signal, url, migrationsPath, -1) + if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok { + go pipep.Close(pipe, nil) + return + } else { + go Migrate(pipe, pipep.Signal(), url, migrationsPath, +1) + } } // RedoSync is synchronous version of Redo func RedoSync(url, migrationsPath string) (err []error, ok bool) { pipe := pipep.New() - go Redo(pipe, url, migrationsPath) + go Redo(pipe, pipep.Signal(), url, migrationsPath) err = pipep.ReadErrors(pipe) return err, len(err) == 0 } // Reset runs the down and up migration function -func Reset(pipe chan interface{}, url, migrationsPath string) { +func Reset(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string) { pipe1 := pipep.New() - go Down(pipe1, url, migrationsPath) - pipep.WaitAndRedirect(pipe1, pipe) - go Up(pipe, url, migrationsPath) + go Down(pipe1, signal, url, migrationsPath) + if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok { + go pipep.Close(pipe, nil) + return + } else { + go Up(pipe, signal, url, migrationsPath) + } } // ResetSync is synchronous version of Reset func ResetSync(url, migrationsPath string) (err []error, ok bool) { pipe := pipep.New() - go Reset(pipe, url, migrationsPath) + go Reset(pipe, pipep.Signal(), url, migrationsPath) err = pipep.ReadErrors(pipe) return err, len(err) == 0 } // Migrate applies relative +n/-n migrations -func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) { +func Migrate(pipe chan interface{}, signal chan os.Signal, url, migrationsPath string, relativeN int) { d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath) if err != nil { go pipep.Close(pipe, err) @@ -121,17 +173,17 @@ func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) { go pipep.Close(pipe, err) return } - if len(applyMigrationFiles) > 0 { - if relativeN > 0 { - go d.Migrate(applyMigrationFiles, pipe) - return - } else if relativeN < 0 { - go d.Migrate(applyMigrationFiles, pipe) - return - } else { - go pipep.Close(pipe, nil) - return + + if len(applyMigrationFiles) > 0 && relativeN != 0 { + for _, f := range applyMigrationFiles { + pipe1 := pipep.New() + go d.Migrate(f, pipe1) + if ok := pipep.WaitAndRedirect(pipe1, pipe, signal); !ok { + break + } } + go pipep.Close(pipe, nil) + return } go pipep.Close(pipe, nil) return @@ -140,7 +192,7 @@ func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) { // MigrateSync is synchronous version of Migrate func MigrateSync(url, migrationsPath string, relativeN int) (err []error, ok bool) { pipe := pipep.New() - go Migrate(pipe, url, migrationsPath, relativeN) + go Migrate(pipe, pipep.Signal(), url, migrationsPath, relativeN) err = pipep.ReadErrors(pipe) return err, len(err) == 0 } diff --git a/pipe/pipe.go b/pipe/pipe.go index c8c3bc4..f07d122 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -1,6 +1,11 @@ // Package pipe has functions for pipe channel handling. package pipe +import ( + "os" + "os/signal" +) + // New creates a new pipe. A pipe is basically a channel. func New() chan interface{} { return make(chan interface{}, 0) @@ -14,22 +19,42 @@ func Close(pipe chan interface{}, err error) { close(pipe) } -// WaitAndRedirect waits for pipe to be closed and -// redirects all messages from pipe to redirectPipe -// while it waits. -func WaitAndRedirect(pipe, redirectPipe chan interface{}) { +func Signal() chan os.Signal { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + return c +} + +func WaitAndRedirect(pipe, redirectPipe chan interface{}, signal chan os.Signal) (ok bool) { + errorReceived := false + signalsReceived := 0 + if pipe != nil && redirectPipe != nil { for { select { + + case <-signal: + signalsReceived += 1 + if signalsReceived > 1 { + os.Exit(5) + } else { + redirectPipe <- " Aborting after this migration ..." + } + case item, ok := <-pipe: if !ok { - return + return !errorReceived && signalsReceived == 0 } else { redirectPipe <- item + switch item.(type) { + case error: + errorReceived = true + } } } } } + return !errorReceived && signalsReceived == 0 } // ReadErrors selects all received errors and returns them.