2014-08-13 00:38:29 +00:00
|
|
|
// Package pipe has functions for pipe channel handling.
|
2014-08-12 01:36:07 +00:00
|
|
|
package pipe
|
|
|
|
|
2014-08-13 18:06:38 +00:00
|
|
|
import (
|
|
|
|
"os"
|
|
|
|
)
|
|
|
|
|
2014-08-13 00:34:24 +00:00
|
|
|
// New creates a new pipe. A pipe is an unbuffered channel that can carry
// values of any type, so every send blocks until a receiver takes the item.
func New() chan interface{} {
	return make(chan interface{})
}
|
|
|
|
|
2014-08-13 00:38:29 +00:00
|
|
|
// Close closes pipe. If err is non-nil it is sent on pipe first, so the
// receiver sees the error before it observes the close.
//
// A nil pipe is ignored: sending on a nil channel would block forever and
// closing one panics. The other helpers in this package also treat a nil
// pipe as "nothing to do".
func Close(pipe chan interface{}, err error) {
	if pipe == nil {
		return
	}
	if err != nil {
		pipe <- err
	}
	close(pipe)
}
|
|
|
|
|
2014-08-13 21:15:03 +00:00
|
|
|
// WaitAndRedirect forwards every item received on pipe to redirectPipe
// until pipe is closed, and reports whether the run finished cleanly.
//
// While waiting it also watches interrupt: the first signal sends a notice
// to redirectPipe and keeps waiting for pipe to close; a second signal
// terminates the process with exit status 5. A closed interrupt channel is
// ignored instead of being counted as a signal.
//
// It returns true only if no error value was seen on pipe and no interrupt
// was received. If pipe or redirectPipe is nil, nothing is forwarded and it
// returns true immediately.
func WaitAndRedirect(pipe, redirectPipe chan interface{}, interrupt chan os.Signal) (ok bool) {
	if pipe == nil || redirectPipe == nil {
		return true
	}

	errorReceived := false
	interruptsReceived := 0

	for {
		select {
		case _, open := <-interrupt:
			if !open {
				// A closed channel is always ready to receive; disable this
				// case so it cannot be mistaken for repeated interrupts.
				interrupt = nil
				continue
			}
			interruptsReceived++
			if interruptsReceived > 1 {
				os.Exit(5)
			}
			// add white space at beginning for ^C splitting
			redirectPipe <- " Aborting after this migration ... Hit again to force quit."

		case item, open := <-pipe:
			if !open {
				return !errorReceived && interruptsReceived == 0
			}
			redirectPipe <- item
			if _, isErr := item.(error); isErr {
				errorReceived = true
			}
		}
	}
}
|
|
|
|
|
2014-08-13 00:34:24 +00:00
|
|
|
// ReadErrors drains pipe until it is closed and returns every item that is
// an error, in the order received. All other items are discarded.
// This is helpful for synchronous migration functions.
//
// The result is always a non-nil slice; it is empty when no error was
// received or when pipe is nil.
func ReadErrors(pipe chan interface{}) []error {
	// Keep the result non-nil so callers observe the same value as before
	// even when nothing was collected.
	errs := make([]error, 0)
	if pipe == nil {
		// Ranging over a nil channel would block forever.
		return errs
	}
	for item := range pipe {
		if e, isErr := item.(error); isErr {
			errs = append(errs, e)
		}
	}
	return errs
}
|