fix a bug where pipe messages were dropped by accident

This commit is contained in:
mattes 2014-08-13 02:34:24 +02:00
parent 6715db583b
commit 5ee5242bb1
3 changed files with 15 additions and 25 deletions

View File

@ -10,7 +10,6 @@ import (
pipep "github.com/mattes/migrate/pipe" pipep "github.com/mattes/migrate/pipe"
"os" "os"
"strconv" "strconv"
"strings"
"time" "time"
) )
@ -97,9 +96,9 @@ func writePipe(pipe chan interface{}) {
f := item.(file.File) f := item.(file.File)
if f.Direction == direction.Up { if f.Direction == direction.Up {
fmt.Print("[-> ]") fmt.Print("o--> ")
} else if f.Direction == direction.Down { } else if f.Direction == direction.Down {
fmt.Print("[ <-]") fmt.Print(" <--o")
} }
fmt.Printf(" %s\n", f.FileName) fmt.Printf(" %s\n", f.FileName)

View File

@ -92,8 +92,7 @@ func DownSync(url, migrationsPath string) (err []error, ok bool) {
func Redo(pipe chan interface{}, url, migrationsPath string) { func Redo(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New() pipe1 := pipep.New()
go Migrate(pipe1, url, migrationsPath, -1) go Migrate(pipe1, url, migrationsPath, -1)
go pipep.Redirect(pipe1, pipe) pipep.WaitAndRedirect(pipe1, pipe)
pipep.Wait(pipe1)
go Migrate(pipe, url, migrationsPath, +1) go Migrate(pipe, url, migrationsPath, +1)
} }
@ -107,8 +106,7 @@ func RedoSync(url, migrationsPath string) (err []error, ok bool) {
func Reset(pipe chan interface{}, url, migrationsPath string) { func Reset(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New() pipe1 := pipep.New()
go Down(pipe1, url, migrationsPath) go Down(pipe1, url, migrationsPath)
go pipep.Redirect(pipe1, pipe) pipep.WaitAndRedirect(pipe1, pipe)
pipep.Wait(pipe1)
go Up(pipe, url, migrationsPath) go Up(pipe, url, migrationsPath)
} }

View File

@ -1,9 +1,11 @@
package pipe package pipe
// New creates a new pipe. A pipe is basically a channel.
func New() chan interface{} { func New() chan interface{} {
return make(chan interface{}, 0) return make(chan interface{}, 0)
} }
// Close closes pipe and optionally sends an error
func Close(pipe chan interface{}, err error) { func Close(pipe chan interface{}, err error) {
if err != nil { if err != nil {
pipe <- err pipe <- err
@ -11,35 +13,26 @@ func Close(pipe chan interface{}, err error) {
close(pipe) close(pipe)
} }
func Redirect(fromPipe, toPipe chan interface{}) { // WaitAndRedirect waits for pipe to be closed and
if fromPipe != nil && toPipe != nil { // redirects all messages from pipe to redirectPipe
// while it waits.
func WaitAndRedirect(pipe, redirectPipe chan interface{}) {
if pipe != nil && redirectPipe != nil {
for { for {
select { select {
case item, ok := <-fromPipe: case item, ok := <-pipe:
if !ok { if !ok {
return return
} else { } else {
toPipe <- item redirectPipe <- item
} }
} }
} }
} }
} }
func Wait(pipe chan interface{}) { // ReadErrors selects all received errors and returns them.
if pipe != nil { // This is helpful for synchronous migration functions.
for {
select {
case _, ok := <-pipe:
if !ok {
return
}
}
}
}
}
// getErrorsFromPipe selects all received errors and returns them
func ReadErrors(pipe chan interface{}) []error { func ReadErrors(pipe chan interface{}) []error {
err := make([]error, 0) err := make([]error, 0)
if pipe != nil { if pipe != nil {