From 2301404557e7536346cc0e17fe20221d0f696589 Mon Sep 17 00:00:00 2001 From: mattes Date: Tue, 12 Aug 2014 03:36:07 +0200 Subject: [PATCH] Use channels for communication --- driver/postgres/postgres.go | 10 ++- main.go | 29 ++++--- migrate/migrate.go | 158 ++++++++++++++---------------------- pipe/pipe.go | 61 ++++++++++++++ 4 files changed, 149 insertions(+), 109 deletions(-) create mode 100644 pipe/pipe.go diff --git a/driver/postgres/postgres.go b/driver/postgres/postgres.go index 2485476..fed0864 100644 --- a/driver/postgres/postgres.go +++ b/driver/postgres/postgres.go @@ -45,8 +45,17 @@ func (driver *Driver) FilenameExtension() string { func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) { defer close(pipe) + for _, f := range files { + direc := "" + if f.Direction == direction.Up { + direc = " →" + } else if f.Direction == direction.Down { + direc = "← " + } + pipe <- fmt.Sprintf("%s | %s", direc, f.FileName) + tx, err := driver.db.Begin() if err != nil { pipe <- err @@ -79,7 +88,6 @@ func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) { } return } - pipe <- fmt.Sprintf("Applied %s", f.FileName) if err := tx.Commit(); err != nil { pipe <- err diff --git a/main.go b/main.go index 4b2b612..395785c 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "github.com/mattes/migrate/migrate" + pipep "github.com/mattes/migrate/pipe" "os" "strconv" ) @@ -73,7 +74,6 @@ func writePipe(pipe chan interface{}) { case item, ok := <-pipe: if !ok { return - } else { switch item.(type) { case string: @@ -99,23 +99,36 @@ func createCmd(url, migrationsPath, name string) { fmt.Printf("Version %v migration files created in %v:\n", migrationFile.Version, migrationsPath) fmt.Println(migrationFile.UpFile.FileName) fmt.Println(migrationFile.DownFile.FileName) - } func upCmd(url, migrationsPath string) { - writePipe(migrate.UpPipe(url, migrationsPath, nil)) + pipe := pipep.New() + go migrate.Up(pipe, url, migrationsPath) + writePipe(pipe) } func downCmd(url, migrationsPath string) { - writePipe(migrate.DownPipe(url, migrationsPath, nil)) + pipe := pipep.New() + go migrate.Down(pipe, url, migrationsPath) + writePipe(pipe) } func redoCmd(url, migrationsPath string) { - writePipe(migrate.RedoPipe(url, migrationsPath, nil)) + pipe := pipep.New() + go migrate.Redo(pipe, url, migrationsPath) + writePipe(pipe) } func resetCmd(url, migrationsPath string) { - writePipe(migrate.ResetPipe(url, migrationsPath, nil)) + pipe := pipep.New() + go migrate.Reset(pipe, url, migrationsPath) + writePipe(pipe) +} + +func migrateCmd(url, migrationsPath string, relativeN int) { + pipe := pipep.New() + go migrate.Migrate(pipe, url, migrationsPath, relativeN) + writePipe(pipe) } func versionCmd(url, migrationsPath string) { @@ -127,10 +140,6 @@ func versionCmd(url, migrationsPath string) { fmt.Println(version) } -func migrateCmd(url, migrationsPath string, relativeN int) { - writePipe(migrate.MigratePipe(url, migrationsPath, relativeN, nil)) -} - func helpCmd() { os.Stderr.WriteString( `usage: migrate [-path=] [-url=] [] diff --git a/migrate/migrate.go b/migrate/migrate.go index 8deeec4..58e65c5 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -6,6 +6,7 @@ import ( "github.com/mattes/migrate/driver" "github.com/mattes/migrate/file" "github.com/mattes/migrate/migrate/direction" + pipep "github.com/mattes/migrate/pipe" "io/ioutil" "path" "strconv" @@ -30,165 +31,126 @@ func initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath string) (d return d, &files, version, nil } -// getErrorsFromPipe selects all received errors and returns them -func getErrorsFromPipe(pipe chan interface{}) []error { - err := make([]error, 0) - if pipe != nil { - for { - select { - case item, ok := <-pipe: - if !ok { - return err - } else { - switch item.(type) { - case error: - err = append(err, item.(error)) - } - } - } - } - } - return err -} - -// sendErrorAndClosePipe sends the error to the pipe and closes the -// pipe afterwards -func sendErrorAndClosePipe(err error, pipe chan interface{}) { - pipe <- err - close(pipe) -} - -func UpPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} { - if pipe == nil { - pipe = make(chan interface{}, 1) // make buffered channel with cap 1 - } - +func Up(pipe chan interface{}, url, migrationsPath string) { d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath) if err != nil { - sendErrorAndClosePipe(err, pipe) - return pipe + go pipep.Close(pipe, err) + return } applyMigrationFiles, err := files.ToLastFrom(version) if err != nil { - sendErrorAndClosePipe(err, pipe) - return pipe + go pipep.Close(pipe, err) + return } if len(applyMigrationFiles) > 0 { go d.Migrate(applyMigrationFiles, pipe) - return pipe + return } else { - sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe) - return pipe + go pipep.Close(pipe, errors.New("No migration files to apply.")) + return } } -func Up(url, migrationsPath string) (err []error, ok bool) { - pipe := UpPipe(url, migrationsPath, nil) - err = getErrorsFromPipe(pipe) +func UpSync(url, migrationsPath string) (err []error, ok bool) { + pipe := pipep.New() + go Up(pipe, url, migrationsPath) + err = pipep.ReadErrors(pipe) return err, len(err) == 0 } -func DownPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} { - if pipe == nil { - pipe = make(chan interface{}, 1) // make buffered channel with cap 1 - } - +func Down(pipe chan interface{}, url, migrationsPath string) { d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath) if err != nil { - sendErrorAndClosePipe(err, pipe) - return pipe + go pipep.Close(pipe, err) + return } applyMigrationFiles, err := files.ToFirstFrom(version) if err != nil { - sendErrorAndClosePipe(err, pipe) - return pipe + go pipep.Close(pipe, err) + return } if len(applyMigrationFiles) > 0 { go d.Migrate(applyMigrationFiles, pipe) - return pipe + return } else { - sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe) - return pipe + go pipep.Close(pipe, errors.New("No migration files to apply.")) + return } } -func Down(url, migrationsPath string) (err []error, ok bool) { - pipe := DownPipe(url, migrationsPath, nil) - err = getErrorsFromPipe(pipe) +func DownSync(url, migrationsPath string) (err []error, ok bool) { + pipe := pipep.New() + go Down(pipe, url, migrationsPath) + err = pipep.ReadErrors(pipe) return err, len(err) == 0 } -func RedoPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} { - if pipe == nil { - pipe = make(chan interface{}, 1) // make buffered channel with cap 1 - } - - _ = MigratePipe(url, migrationsPath, -1, pipe) - _ = MigratePipe(url, migrationsPath, +1, pipe) - return pipe +func Redo(pipe chan interface{}, url, migrationsPath string) { + pipe1 := pipep.New() + go Migrate(pipe1, url, migrationsPath, -1) + go pipep.Redirect(pipe1, pipe) + pipep.Wait(pipe1) + go Migrate(pipe, url, migrationsPath, +1) } -func Redo(url, migrationsPath string) (err []error, ok bool) { - pipe := RedoPipe(url, migrationsPath, nil) - err = getErrorsFromPipe(pipe) +func RedoSync(url, migrationsPath string) (err []error, ok bool) { + pipe := pipep.New() + go Redo(pipe, url, migrationsPath) + err = pipep.ReadErrors(pipe) return err, len(err) == 0 } -func ResetPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} { - if pipe == nil { - pipe = make(chan interface{}, 1) // make buffered channel with cap 1 - } - // TODO check pipe pointer - _ = DownPipe(url, migrationsPath, pipe) - _ = UpPipe(url, migrationsPath, pipe) - return pipe +func Reset(pipe chan interface{}, url, migrationsPath string) { + pipe1 := pipep.New() + go Down(pipe1, url, migrationsPath) + go pipep.Redirect(pipe1, pipe) + pipep.Wait(pipe1) + go Up(pipe, url, migrationsPath) } -func Reset(url, migrationsPath string) (err []error, ok bool) { - pipe := ResetPipe(url, migrationsPath, nil) - err = getErrorsFromPipe(pipe) +func ResetSync(url, migrationsPath string) (err []error, ok bool) { + pipe := pipep.New() + go Reset(pipe, url, migrationsPath) + err = pipep.ReadErrors(pipe) return err, len(err) == 0 } -func MigratePipe(url, migrationsPath string, relativeN int, pipe chan interface{}) chan interface{} { - if pipe == nil { - pipe = make(chan interface{}, 1) // make buffered channel with cap 1 - } - +func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) { d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath) if err != nil { - sendErrorAndClosePipe(err, pipe) - return pipe + go pipep.Close(pipe, err) + return } applyMigrationFiles, err := files.From(version, relativeN) if err != nil { - sendErrorAndClosePipe(err, pipe) - return pipe + go pipep.Close(pipe, err) + return } if len(applyMigrationFiles) > 0 { if relativeN > 0 { go d.Migrate(applyMigrationFiles, pipe) - return pipe + return } else if relativeN < 0 { go d.Migrate(applyMigrationFiles, pipe) - return pipe + return } else { - sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe) - return pipe + go pipep.Close(pipe, errors.New("No migration files to apply.")) + return } } - sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe) - return pipe + go pipep.Close(pipe, errors.New("No migration files to apply.")) + return } -func Migrate(url, migrationsPath string, relativeN int) (err []error, ok bool) { - pipe := MigratePipe(url, migrationsPath, relativeN, nil) - err = getErrorsFromPipe(pipe) +func MigrateSync(url, migrationsPath string, relativeN int) (err []error, ok bool) { + pipe := pipep.New() + go Migrate(pipe, url, migrationsPath, relativeN) + err = pipep.ReadErrors(pipe) return err, len(err) == 0 } diff --git a/pipe/pipe.go b/pipe/pipe.go new file mode 100644 index 0000000..669159c --- /dev/null +++ b/pipe/pipe.go @@ -0,0 +1,61 @@ +package pipe + +func New() chan interface{} { + return make(chan interface{}, 0) +} + +func Close(pipe chan interface{}, err error) { + if err != nil { + pipe <- err + } + close(pipe) +} + +func Redirect(fromPipe, toPipe chan interface{}) { + if fromPipe != nil && toPipe != nil { + for { + select { + case item, ok := <-fromPipe: + if !ok { + return + } else { + toPipe <- item + } + } + } + } +} + +func Wait(pipe chan interface{}) { + if pipe != nil { + for { + select { + case _, ok := <-pipe: + if !ok { + return + } + } + } + } +} + +// getErrorsFromPipe selects all received errors and returns them +func ReadErrors(pipe chan interface{}) []error { + err := make([]error, 0) + if pipe != nil { + for { + select { + case item, ok := <-pipe: + if !ok { + return err + } else { + switch item.(type) { + case error: + err = append(err, item.(error)) + } + } + } + } + } + return err +}