rename signal to interrupt and cleanup

This commit is contained in:
mattes 2014-08-13 23:15:03 +02:00
parent 69df0f6de7
commit e245a4664c
4 changed files with 35 additions and 28 deletions

View File

@ -9,7 +9,6 @@ import (
"github.com/mattes/migrate/file"
"github.com/mattes/migrate/migrate/direction"
"strconv"
"time"
)
type Driver struct {
@ -52,8 +51,6 @@ func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {
pipe <- f
time.Sleep(3 * time.Second)
tx, err := driver.db.Begin()
if err != nil {
pipe <- err

View File

@ -12,7 +12,6 @@ import (
"github.com/mattes/migrate/migrate/direction"
pipep "github.com/mattes/migrate/pipe"
"os"
// "os/signal"
"strconv"
"time"
)
@ -106,12 +105,9 @@ func writePipe(pipe chan interface{}) {
}
fmt.Printf(" %s\n", f.FileName)
case os.Signal:
fmt.Println("signal", item)
default:
text := fmt.Sprint(item)
fmt.Println("TEXT", text)
fmt.Println(text)
}
}
}

View File

@ -33,7 +33,7 @@ func Up(pipe chan interface{}, url, migrationsPath string) {
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
break
}
}
@ -71,7 +71,7 @@ func Down(pipe chan interface{}, url, migrationsPath string) {
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
break
}
}
@ -95,7 +95,7 @@ func DownSync(url, migrationsPath string) (err []error, ok bool) {
func Redo(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Migrate(pipe1, url, migrationsPath, -1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
go pipep.Close(pipe, nil)
return
} else {
@ -115,7 +115,7 @@ func RedoSync(url, migrationsPath string) (err []error, ok bool) {
func Reset(pipe chan interface{}, url, migrationsPath string) {
pipe1 := pipep.New()
go Down(pipe1, url, migrationsPath)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
go pipep.Close(pipe, nil)
return
} else {
@ -149,7 +149,7 @@ func Migrate(pipe chan interface{}, url, migrationsPath string, relativeN int) {
for _, f := range applyMigrationFiles {
pipe1 := pipep.New()
go d.Migrate(f, pipe1)
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleSignal()); !ok {
if ok := pipep.WaitAndRedirect(pipe1, pipe, handleInterrupts()); !ok {
break
}
}
@ -256,18 +256,28 @@ func NewPipe() chan interface{} {
return pipep.New()
}
var handleSignals = true
// interrupts is an internal variable that holds the state of
// interrupt handling
var interrupts = true
func EnableSignals() {
handleSignals = true
// Graceful enables interrupts checking. Once the first ^C is received
// it will finish the currently running migration and abort execution
// of the next migration. If ^C is received twice, it will stop
// execution immediately.
func Graceful() {
interrupts = true
}
func DisableSignals() {
handleSignals = false
// NonGraceful disables interrupts checking. The first received ^C will
// stop execution immediately.
func NonGraceful() {
interrupts = false
}
func handleSignal() chan os.Signal {
if handleSignals {
// interrupts returns a signal channel if interrupts checking is
// enabled. nil otherwise.
func handleInterrupts() chan os.Signal {
if interrupts {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
return c

View File

@ -18,25 +18,29 @@ func Close(pipe chan interface{}, err error) {
close(pipe)
}
func WaitAndRedirect(pipe, redirectPipe chan interface{}, signal chan os.Signal) (ok bool) {
// WaitAndRedirect waits for pipe to be closed and
// redirects all messages from pipe to redirectPipe
// while it waits. It also checks if there was an
// interrupt send and will quit gracefully if yes.
func WaitAndRedirect(pipe, redirectPipe chan interface{}, interrupt chan os.Signal) (ok bool) {
errorReceived := false
signalsReceived := 0
interruptsReceived := 0
if pipe != nil && redirectPipe != nil {
for {
select {
case <-signal:
signalsReceived += 1
if signalsReceived > 1 {
case <-interrupt:
interruptsReceived += 1
if interruptsReceived > 1 {
os.Exit(5)
} else {
// add white space at beginning for ^C splitting
redirectPipe <- " Aborting after this migration ..."
}
case item, ok := <-pipe:
if !ok {
return !errorReceived && signalsReceived == 0
return !errorReceived && interruptsReceived == 0
} else {
redirectPipe <- item
switch item.(type) {
@ -47,7 +51,7 @@ func WaitAndRedirect(pipe, redirectPipe chan interface{}, signal chan os.Signal)
}
}
}
return !errorReceived && signalsReceived == 0
return !errorReceived && interruptsReceived == 0
}
// ReadErrors selects all received errors and returns them.