Use channels for communication

This commit is contained in:
mattes 2014-08-12 03:36:07 +02:00
parent 707c9b9c07
commit 2301404557
4 changed files with 149 additions and 109 deletions

View File

@ -45,8 +45,17 @@ func (driver *Driver) FilenameExtension() string {
func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) {
defer close(pipe)
for _, f := range files {
direc := ""
if f.Direction == direction.Up {
direc = " →"
} else if f.Direction == direction.Down {
direc = "← "
}
pipe <- fmt.Sprintf("%s | %s", direc, f.FileName)
tx, err := driver.db.Begin()
if err != nil {
pipe <- err
@ -79,7 +88,6 @@ func (driver *Driver) Migrate(files file.Files, pipe chan interface{}) {
}
return
}
pipe <- fmt.Sprintf("Applied %s", f.FileName)
if err := tx.Commit(); err != nil {
pipe <- err

29
main.go
View File

@ -4,6 +4,7 @@ import (
"flag"
"fmt"
"github.com/mattes/migrate/migrate"
pipep "github.com/mattes/migrate/pipe"
"os"
"strconv"
)
@ -73,7 +74,6 @@ func writePipe(pipe chan interface{}) {
case item, ok := <-pipe:
if !ok {
return
} else {
switch item.(type) {
case string:
@ -99,23 +99,36 @@ func createCmd(url, migrationsPath, name string) {
fmt.Printf("Version %v migration files created in %v:\n", migrationFile.Version, migrationsPath)
fmt.Println(migrationFile.UpFile.FileName)
fmt.Println(migrationFile.DownFile.FileName)
}
func upCmd(url, migrationsPath string) {
writePipe(migrate.UpPipe(url, migrationsPath, nil))
pipe := pipep.New()
go migrate.Up(pipe, url, migrationsPath)
writePipe(pipe)
}
func downCmd(url, migrationsPath string) {
writePipe(migrate.DownPipe(url, migrationsPath, nil))
pipe := pipep.New()
go migrate.Down(pipe, url, migrationsPath)
writePipe(pipe)
}
func redoCmd(url, migrationsPath string) {
writePipe(migrate.RedoPipe(url, migrationsPath, nil))
pipe := pipep.New()
go migrate.Redo(pipe, url, migrationsPath)
writePipe(pipe)
}
func resetCmd(url, migrationsPath string) {
writePipe(migrate.ResetPipe(url, migrationsPath, nil))
pipe := pipep.New()
go migrate.Reset(pipe, url, migrationsPath)
writePipe(pipe)
}
func migrateCmd(url, migrationsPath string, relativeN int) {
pipe := pipep.New()
go migrate.Migrate(pipe, url, migrationsPath, relativeN)
writePipe(pipe)
}
func versionCmd(url, migrationsPath string) {
@ -127,10 +140,6 @@ func versionCmd(url, migrationsPath string) {
fmt.Println(version)
}
func migrateCmd(url, migrationsPath string, relativeN int) {
writePipe(migrate.MigratePipe(url, migrationsPath, relativeN, nil))
}
func helpCmd() {
os.Stderr.WriteString(
`usage: migrate [-path=<path>] [-url=<url>] <command> [<args>]

View File

@ -6,6 +6,7 @@ import (
"github.com/mattes/migrate/driver"
"github.com/mattes/migrate/file"
"github.com/mattes/migrate/migrate/direction"
pipep "github.com/mattes/migrate/pipe"
"io/ioutil"
"path"
"strconv"
@ -30,165 +31,126 @@ func initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath string) (d
return d, &files, version, nil
}
// getErrorsFromPipe selects all received errors and returns them
func getErrorsFromPipe(pipe chan interface{}) []error {
err := make([]error, 0)
if pipe != nil {
for {
select {
case item, ok := <-pipe:
if !ok {
return err
} else {
switch item.(type) {
case error:
err = append(err, item.(error))
}
}
}
}
}
return err
}
// sendErrorAndClosePipe sends the error to the pipe and closes the
// pipe afterwards
func sendErrorAndClosePipe(err error, pipe chan interface{}) {
pipe <- err
close(pipe)
}
func UpPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} {
if pipe == nil {
pipe = make(chan interface{}, 1) // make buffered channel with cap 1
}
func Up(pipe chan interface{}, url, migrationsPath string) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
sendErrorAndClosePipe(err, pipe)
return pipe
go pipep.Close(pipe, err)
return
}
applyMigrationFiles, err := files.ToLastFrom(version)
if err != nil {
sendErrorAndClosePipe(err, pipe)
return pipe
go pipep.Close(pipe, err)
return
}
if len(applyMigrationFiles) > 0 {
go d.Migrate(applyMigrationFiles, pipe)
return pipe
return
} else {
sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe)
return pipe
go pipep.Close(pipe, errors.New("No migration files to apply."))
return
}
}
func Up(url, migrationsPath string) (err []error, ok bool) {
pipe := UpPipe(url, migrationsPath, nil)
err = getErrorsFromPipe(pipe)
func UpSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Up(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
func DownPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} {
if pipe == nil {
pipe = make(chan interface{}, 1) // make buffered channel with cap 1
}
func Down(pipe chan interface{}, url, migrationsPath string) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
sendErrorAndClosePipe(err, pipe)
return pipe
go pipep.Close(pipe, err)
return
}
applyMigrationFiles, err := files.ToFirstFrom(version)
if err != nil {
sendErrorAndClosePipe(err, pipe)
return pipe
go pipep.Close(pipe, err)
return
}
if len(applyMigrationFiles) > 0 {
go d.Migrate(applyMigrationFiles, pipe)
return pipe
return
} else {
sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe)
return pipe
go pipep.Close(pipe, errors.New("No migration files to apply."))
return
}
}
func Down(url, migrationsPath string) (err []error, ok bool) {
pipe := DownPipe(url, migrationsPath, nil)
err = getErrorsFromPipe(pipe)
func DownSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Down(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
func RedoPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} {
if pipe == nil {
pipe = make(chan interface{}, 1) // make buffered channel with cap 1
}
_ = MigratePipe(url, migrationsPath, -1, pipe)
_ = MigratePipe(url, migrationsPath, +1, pipe)
return pipe
func Redo(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Migrate(pipe1, url, migrationsPath, -1)
go pipep.Redirect(pipe1, pipe)
pipep.Wait(pipe1)
go Migrate(pipe, url, migrationsPath, +1)
}
func Redo(url, migrationsPath string) (err []error, ok bool) {
pipe := RedoPipe(url, migrationsPath, nil)
err = getErrorsFromPipe(pipe)
func RedoSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Redo(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
func ResetPipe(url, migrationsPath string, pipe chan interface{}) chan interface{} {
if pipe == nil {
pipe = make(chan interface{}, 1) // make buffered channel with cap 1
}
// TODO check pipe pointer
_ = DownPipe(url, migrationsPath, pipe)
_ = UpPipe(url, migrationsPath, pipe)
return pipe
func Reset(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Down(pipe1, url, migrationsPath)
go pipep.Redirect(pipe1, pipe)
pipep.Wait(pipe1)
go Up(pipe, url, migrationsPath)
}
func Reset(url, migrationsPath string) (err []error, ok bool) {
pipe := ResetPipe(url, migrationsPath, nil)
err = getErrorsFromPipe(pipe)
func ResetSync(url, migrationsPath string) (err []error, ok bool) {
pipe := pipep.New()
go Reset(pipe, url, migrationsPath)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}
func MigratePipe(url, migrationsPath string, relativeN int, pipe chan interface{}) chan interface{} {
if pipe == nil {
pipe = make(chan interface{}, 1) // make buffered channel with cap 1
}
func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) {
d, files, version, err := initDriverAndReadMigrationFilesAndGetVersion(url, migrationsPath)
if err != nil {
sendErrorAndClosePipe(err, pipe)
return pipe
go pipep.Close(pipe, err)
return
}
applyMigrationFiles, err := files.From(version, relativeN)
if err != nil {
sendErrorAndClosePipe(err, pipe)
return pipe
go pipep.Close(pipe, err)
return
}
if len(applyMigrationFiles) > 0 {
if relativeN > 0 {
go d.Migrate(applyMigrationFiles, pipe)
return pipe
return
} else if relativeN < 0 {
go d.Migrate(applyMigrationFiles, pipe)
return pipe
return
} else {
sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe)
return pipe
go pipep.Close(pipe, errors.New("No migration files to apply."))
return
}
}
sendErrorAndClosePipe(errors.New("No migration files to apply."), pipe)
return pipe
go pipep.Close(pipe, errors.New("No migration files to apply."))
return
}
func Migrate(url, migrationsPath string, relativeN int) (err []error, ok bool) {
pipe := MigratePipe(url, migrationsPath, relativeN, nil)
err = getErrorsFromPipe(pipe)
func MigrateSync(url, migrationsPath string, relativeN int) (err []error, ok bool) {
pipe := pipep.New()
go Migrate(pipe, url, migrationsPath, relativeN)
err = pipep.ReadErrors(pipe)
return err, len(err) == 0
}

61
pipe/pipe.go Normal file
View File

@ -0,0 +1,61 @@
package pipe
func New() chan interface{} {
return make(chan interface{}, 0)
}
func Close(pipe chan interface{}, err error) {
if err != nil {
pipe <- err
}
close(pipe)
}
func Redirect(fromPipe, toPipe chan interface{}) {
if fromPipe != nil && toPipe != nil {
for {
select {
case item, ok := <-fromPipe:
if !ok {
return
} else {
toPipe <- item
}
}
}
}
}
func Wait(pipe chan interface{}) {
if pipe != nil {
for {
select {
case _, ok := <-pipe:
if !ok {
return
}
}
}
}
}
// getErrorsFromPipe selects all received errors and returns them
func ReadErrors(pipe chan interface{}) []error {
err := make([]error, 0)
if pipe != nil {
for {
select {
case item, ok := <-pipe:
if !ok {
return err
} else {
switch item.(type) {
case error:
err = append(err, item.(error))
}
}
}
}
}
return err
}