diff --git a/migrate.go b/migrate.go index c53edc1..31f318f 100644 --- a/migrate.go +++ b/migrate.go @@ -1,3 +1,7 @@ +// Package migrate reads migrations from sources and runs them against databases. +// Sources are defined by the `source.Driver` and databases by the `database.Driver` +// interface. The driver interfaces are kept "dump", all migration logic is kept +// in this package. package migrate import ( @@ -10,6 +14,11 @@ import ( "github.com/mattes/migrate/source" ) +// DefaultPrefetchMigrations sets the number of migrations to pre-read +// from the source. This is helpful if the source is remote, but has little +// effect for a local source (i.e. file system). +// Please note that this setting has a major impact on the memory usage, +// since each pre-read migration is buffered in memory. See DefaultBufferSize. var DefaultPrefetchMigrations = uint(10) var ( @@ -18,10 +27,13 @@ var ( ErrLocked = fmt.Errorf("database locked") ) +// ErrShortLimit is an error returned when not enough migrations +// can be returned by a source for a given limit. type ErrShortLimit struct { Short uint } +// Error implements the error interface. func (e ErrShortLimit) Error() string { return fmt.Sprintf("limit %v short", e.Short) } @@ -32,17 +44,25 @@ type Migrate struct { databaseName string databaseDrv database.Driver + // Log accepts a Logger interface Log Logger + // GracefulStop accepts `true` and will stop executing migrations + // as soon as possible at a safe break point, so that the database + // is not corrpupted. GracefulStop chan bool isGracefulStop bool isLockedMu *sync.Mutex isLocked bool + // PrefetchMigrations defaults to DefaultPrefetchMigrations, + // but can be set per Migrate instance. PrefetchMigrations uint } +// New returns a new Migrate instance from a source URL and a database URL. +// The URL scheme is defined by each driver. func New(sourceUrl, databaseUrl string) (*Migrate, error) { m := newCommon() @@ -73,6 +93,10 @@ func New(sourceUrl, databaseUrl string) (*Migrate, error) { return m, nil } +// NewWithDatabaseInstance returns a new Migrate instance from a source URL +// and an existing database instance. The source URL scheme is defined by each driver. +// Use any string that can serve as an identifier during logging as databaseName. +// You are responsible for closing the underlying database client if necessary. func NewWithDatabaseInstance(sourceUrl string, databaseName string, databaseInstance database.Driver) (*Migrate, error) { m := newCommon() @@ -95,6 +119,10 @@ func NewWithDatabaseInstance(sourceUrl string, databaseName string, databaseInst return m, nil } +// NewWithSourceInstance returns a new Migrate instance from an existing source instance +// and a database URL. The database URL scheme is defined by each driver. +// Use any string that can serve as an identifier during logging as sourceName. +// You are responsible for closing the underlying source client if necessary. func NewWithSourceInstance(sourceName string, sourceInstance source.Driver, databaseUrl string) (*Migrate, error) { m := newCommon() @@ -117,6 +145,10 @@ func NewWithSourceInstance(sourceName string, sourceInstance source.Driver, data return m, nil } +// NewWithInstance returns a new Migrate instance from an existing source and +// database instance. Use any string that can serve as an identifier during logging +// as sourceName and databaseName. You are responsible for closing down +// the underlying source and database client if necessary. func NewWithInstance(sourceName string, sourceInstance source.Driver, databaseName string, databaseInstance database.Driver) (*Migrate, error) { m := newCommon() @@ -137,6 +169,7 @@ func newCommon() *Migrate { } } +// Close closes the the source and the database. func (m *Migrate) Close() (sourceErr error, databaseErr error) { databaseSrvClose := make(chan error) sourceSrvClose := make(chan error) @@ -152,6 +185,8 @@ func (m *Migrate) Close() (sourceErr error, databaseErr error) { return <-sourceSrvClose, <-databaseSrvClose } +// Migrate looks at the currently active migration version, +// then migrates either up or down to the specified version. func (m *Migrate) Migrate(version uint) error { if err := m.lock(); err != nil { return err @@ -168,6 +203,8 @@ func (m *Migrate) Migrate(version uint) error { return m.unlockErr(m.runMigrations(ret)) } +// Steps looks at the currently active migration version. +// It will migrate up if n > 0, and down if n < 0. func (m *Migrate) Steps(n int) error { if n == 0 { return ErrNoChange @@ -193,6 +230,8 @@ func (m *Migrate) Steps(n int) error { return m.unlockErr(m.runMigrations(ret)) } +// Up looks at the currently active migration version +// and will migrate all the way up (applying all up migrations). func (m *Migrate) Up() error { if err := m.lock(); err != nil { return err @@ -209,6 +248,8 @@ func (m *Migrate) Up() error { return m.unlockErr(m.runMigrations(ret)) } +// Down looks at the currently active migration version +// and will migrate all the way down (applying all down migrations). func (m *Migrate) Down() error { if err := m.lock(); err != nil { return err @@ -224,6 +265,7 @@ func (m *Migrate) Down() error { return m.unlockErr(m.runMigrations(ret)) } +// Drop deletes everyting in the database. func (m *Migrate) Drop() error { if err := m.lock(); err != nil { return err @@ -234,6 +276,8 @@ func (m *Migrate) Drop() error { return m.unlock() } +// Version returns the currently active migration version. +// If no migration has been applied, yet, it will return ErrNilVersion. func (m *Migrate) Version() (uint, error) { v, err := m.databaseDrv.Version() if err != nil { @@ -247,6 +291,10 @@ func (m *Migrate) Version() (uint, error) { return suint(v), nil } +// read reads either up or down migrations from source `from` to `to`. +// Each migration is then written to the ret channel. +// If an error occurs during reading, that error is written to the ret channel, too. +// Once read is done reading it will close the ret channel. func (m *Migrate) read(from int, to int, ret chan<- interface{}) { defer close(ret) @@ -354,6 +402,11 @@ func (m *Migrate) read(from int, to int, ret chan<- interface{}) { } } +// readUp reads up migrations from `from` limitted by `limit`. +// limit can be -1, implying no limit and reading until there are no more migrations. +// Each migration is then written to the ret channel. +// If an error occurs during reading, that error is written to the ret channel, too. +// Once readUp is done reading it will close the ret channel. func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { defer close(ret) @@ -441,6 +494,11 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) { } } +// readDown reads down migrations from `from` limitted by `limit`. +// limit can be -1, implying no limit and reading until there are no more migrations. +// Each migration is then written to the ret channel. +// If an error occurs during reading, that error is written to the ret channel, too. +// Once readDown is done reading it will close the ret channel. func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) { defer close(ret) @@ -518,7 +576,12 @@ func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) { } } -// ret chan expects *Migration or error +// runMigrations reads *Migration and error from a channel. Any other type +// sent on this channel will result in a panic. Each migration is then +// proxied to the database driver and run against the database. +// Before running a newly received migration it will check if it's supposed +// to stop execution because it might have received a stop signal on the +// GracefulStop channel. func (m *Migrate) runMigrations(ret <-chan interface{}) error { for r := range ret { @@ -566,6 +629,8 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error { return nil } +// versionExists checks the source if either the up or down migration for +// the specified migration version exists. func (m *Migrate) versionExists(version uint) error { // try up migration first up, _, err := m.sourceDrv.ReadUp(version) @@ -592,6 +657,9 @@ func (m *Migrate) versionExists(version uint) error { return os.ErrNotExist } +// stop returns true if no more migrations should be run against the database +// because a stop signal was received on the GracefulStop channel. +// Calls are cheap and this function is not blocking. func (m *Migrate) stop() bool { if m.isGracefulStop { return true @@ -607,6 +675,8 @@ func (m *Migrate) stop() bool { } } +// newMigration is a helper func that returns a *Migration for the +// specified version and targetVersion. func (m *Migrate) newMigration(version uint, targetVersion int) (*Migration, error) { var migr *Migration @@ -660,6 +730,8 @@ func (m *Migrate) newMigration(version uint, targetVersion int) (*Migration, err return migr, nil } +// lock is a thread safe helper function to lock the database. +// It should be called as late as possible when running migrations. func (m *Migrate) lock() error { m.isLockedMu.Lock() defer m.isLockedMu.Unlock() @@ -675,13 +747,15 @@ func (m *Migrate) lock() error { return ErrLocked } +// unlock is a thread safe helper function to unlock the database. +// It should be called as early as possible when no more migrations are +// expected to be executed. func (m *Migrate) unlock() error { m.isLockedMu.Lock() defer m.isLockedMu.Unlock() if err := m.databaseDrv.Unlock(); err != nil { - // can potentially create deadlock when never succeeds - // TODO: add timeout + // BUG: Can potentially create a deadlock. Add a timeout. return err } @@ -689,18 +763,23 @@ func (m *Migrate) unlock() error { return nil } +// unlockErr calls unlock and returns a combined error +// if a prevErr is not nil. func (m *Migrate) unlockErr(prevErr error) error { if err := m.unlock(); err != nil { return NewMultiError(prevErr, err) } return prevErr } + +// logPrintf writes to m.Log if not nil func (m *Migrate) logPrintf(format string, v ...interface{}) { if m.Log != nil { m.Log.Printf(format, v...) } } +// logVerbosePrintf writes to m.Log if not nil. Use for verbose logging output. func (m *Migrate) logVerbosePrintf(format string, v ...interface{}) { if m.Log != nil && m.Log.Verbose() { m.Log.Printf(format, v...) diff --git a/migrate/direction/direction.go b/migrate/direction/direction.go index a89f0e1..a67a759 100644 --- a/migrate/direction/direction.go +++ b/migrate/direction/direction.go @@ -1,3 +1,5 @@ +// Deprecated: package direction is deprecated. +// Will be removed soon. package direction type Direction int diff --git a/migrate/file/file.go b/migrate/file/file.go index 9497c19..cfc569a 100644 --- a/migrate/file/file.go +++ b/migrate/file/file.go @@ -1,3 +1,5 @@ +// Deprecated: package file is deprecated. +// Will be removed soon. package file import ( diff --git a/migrate/v1compat.go b/migrate/v1compat.go index 5d4c839..41a5ac4 100644 --- a/migrate/v1compat.go +++ b/migrate/v1compat.go @@ -1,4 +1,5 @@ -// Deprecated: package migrate is here to make sure v2 is downwards compatible with v1 +// Deprecated: package migrate makes this version backwards compatible. +// Expect this to be removed very soon. package migrate import ( diff --git a/migrate_test.go b/migrate_test.go index d777a76..9cb04bf 100644 --- a/migrate_test.go +++ b/migrate_test.go @@ -2,8 +2,10 @@ package migrate import ( "bytes" + "database/sql" "fmt" "io/ioutil" + "log" "os" "testing" @@ -15,7 +17,6 @@ import ( // u = up migration, d = down migration, n = version // | 1 | - | 3 | 4 | 5 | - | 7 | // | u d | - | u | u d | d | - | u d | -// var sourceStubMigrations = source.NewMigrations() var sourceStubMigrations *source.Migrations @@ -54,6 +55,19 @@ func TestNew(t *testing.T) { } } +func ExampleNew() { + // Read migrations from /home/mattes/migrations and connect to a local postgres database. + m, err := New("file:///home/mattes/migrations", "postgres://mattes:secret@localhost:5432/database?sslmode=disable") + if err != nil { + log.Fatal(err) + } + + // Migrate all the way up ... + if err := m.Up(); err != nil { + log.Fatal(err) + } +} + func TestNewWithDatabaseInstance(t *testing.T) { dummyDb := &DummyInstance{"database"} dbInst, err := dStub.WithInstance(dummyDb, &dStub.Config{}) @@ -81,6 +95,34 @@ func TestNewWithDatabaseInstance(t *testing.T) { } } +func ExampleNewWithDatabaseInstance() { + // Create and use an existing database instance. + db, err := sql.Open("postgres", "postgres://mattes:secret@localhost:5432/database?sslmode=disable") + if err != nil { + log.Fatal(err) + } + defer db.Close() + + // Create driver instance from db. + // Check each driver if it supports the WithInstance function. + // `import "github.com/mattes/migrate/database/postgres"` + instance, err := dStub.WithInstance(db, &dStub.Config{}) + if err != nil { + log.Fatal(err) + } + + // Read migrations from /home/mattes/migrations and connect to a local postgres database. + m, err := NewWithDatabaseInstance("file:///home/mattes/migrations", "postgres", instance) + if err != nil { + log.Fatal(err) + } + + // Migrate all the way up ... + if err := m.Up(); err != nil { + log.Fatal(err) + } +} + func TestNewWithSourceInstance(t *testing.T) { dummySource := &DummyInstance{"source"} sInst, err := sStub.WithInstance(dummySource, &sStub.Config{}) @@ -108,6 +150,29 @@ func TestNewWithSourceInstance(t *testing.T) { } } +func ExampleNewWithSourceInstance() { + di := &DummyInstance{"think any client required for a source here"} + + // Create driver instance from DummyInstance di. + // Check each driver if it support the WithInstance function. + // `import "github.com/mattes/migrate/source/stub"` + instance, err := sStub.WithInstance(di, &sStub.Config{}) + if err != nil { + log.Fatal(err) + } + + // Read migrations from Stub and connect to a local postgres database. + m, err := NewWithSourceInstance("stub", instance, "postgres://mattes:secret@localhost:5432/database?sslmode=disable") + if err != nil { + log.Fatal(err) + } + + // Migrate all the way up ... + if err := m.Up(); err != nil { + log.Fatal(err) + } +} + func TestNewWithInstance(t *testing.T) { dummyDb := &DummyInstance{"database"} dbInst, err := dStub.WithInstance(dummyDb, &dStub.Config{}) @@ -141,6 +206,10 @@ func TestNewWithInstance(t *testing.T) { } } +func ExampleNewWithInstance() { + // See NewWithDatabaseInstance and NewWithSourceInstance for an example. +} + func TestClose(t *testing.T) { m, _ := New("stub://", "stub://") sourceErr, databaseErr := m.Close() diff --git a/migration.go b/migration.go index 9d578e7..e91ead8 100644 --- a/migration.go +++ b/migration.go @@ -7,25 +7,65 @@ import ( "time" ) +// DefaultBufferSize sets the in memory buffer size (in Bytes) for every +// pre-read migration (see DefaultPrefetchMigrations). var DefaultBufferSize = uint(100000) +// Migration holds information about a migration. +// It is initially created from data coming from the source and then +// used when run against the database. type Migration struct { - Identifier string - Version uint + // Identifier can be any string to help identifying + // the migration in the source. + Identifier string + + // Version is the version of this migration. + Version uint + + // TargetVersion is the migration version after this migration + // has been applied to the database. TargetVersion int - Body io.ReadCloser + // Body holds an io.ReadCloser to the source. + Body io.ReadCloser + + // BufferedBody holds an buffered io.Reader to the underlying Body. BufferedBody io.Reader - BufferSize uint + + // BufferSize defaults to DefaultBufferSize + BufferSize uint + + // bufferWriter holds an io.WriteCloser and pipes to BufferBody. + // It's an *Closer for flow control. bufferWriter io.WriteCloser - Scheduled time.Time - StartedBuffering time.Time + // Scheduled is the time when the migration was scheduled/ queued. + Scheduled time.Time + + // StartedBuffering is the time when buffering of the migration source started. + StartedBuffering time.Time + + // FinishedBuffering is the time when buffering of the migration source finished. FinishedBuffering time.Time - FinishedReading time.Time - BytesRead int64 + + // FinishedReading is the time when the migration source is fully read. + FinishedReading time.Time + + // BytesRead holds the number of Bytes read from the migration source. + BytesRead int64 } +// NewMigration returns a new Migration and sets the body, identifier, +// version and targetVersion. Body can be nil, which turns this migration +// into a "NilMigration". If no identifier is provided, it will default to "". +// +// What is a NilMigration? +// Usually each migration version coming from source is expected to have an +// Up and Down migration. This is not a hard requirement though, leading to +// a situation where only the Up or Down migration is present. So let's say +// the user wants to migrate up to a version that doesn't have the actual Up +// migration, in that case we still want to apply the version, but with an empty +// body. We are calling that a NilMigration, a migration with an empty body. func NewMigration(body io.ReadCloser, identifier string, version uint, targetVersion int) (*Migration, error) { tnow := time.Now() m := &Migration{ @@ -54,10 +94,13 @@ func NewMigration(body io.ReadCloser, identifier string, version uint, targetVer return m, nil } +// String implements string.Stringer and is used in tests. func (m *Migration) String() string { return fmt.Sprintf("%v [%v=>%v]", m.Identifier, m.Version, m.TargetVersion) } +// StringLong returns a string describing this migration to humans. +// TODO: rename to LogString() func (m *Migration) StringLong() string { directionStr := "u" if m.TargetVersion < int(m.Version) { @@ -66,7 +109,8 @@ func (m *Migration) StringLong() string { return fmt.Sprintf("%v/%v %v", m.Version, directionStr, m.Identifier) } -// Buffer buffers up to BufferSize (blocking, call with goroutine) +// Buffer buffers Body up to BufferSize. +// Calling this function blocks. Call with goroutine. func (m *Migration) Buffer() error { if m.Body == nil { return nil diff --git a/migration_test.go b/migration_test.go new file mode 100644 index 0000000..ffbbdc0 --- /dev/null +++ b/migration_test.go @@ -0,0 +1,39 @@ +package migrate + +import ( + "fmt" + "io/ioutil" + "log" + "strings" +) + +func ExampleNewMigration() { + // Create a dummy migration body, this is coming from the source usually. + body := ioutil.NopCloser(strings.NewReader("dumy migration that creates users table")) + + // Create a new Migration that represents version 1486686016. + // Once this migration has been applied to the database, the new + // migration version will be 1486689359. + migr, err := NewMigration(body, "create_users_table", 1486686016, 1486689359) + if err != nil { + log.Fatal(err) + } + + fmt.Print(migr.StringLong()) + // Output: + // 1486686016/u create_users_table +} + +func ExampleNewMigration_nilMigration() { + // Create a new Migration that represents a NilMigration. + // Once this migration has been applied to the database, the new + // migration version will be 1486689359. + migr, err := NewMigration(nil, "", 1486686016, 1486689359) + if err != nil { + log.Fatal(err) + } + + fmt.Print(migr.StringLong()) + // Output: + // 1486686016/u +} diff --git a/source/github/github.go b/source/github/github.go index 06af167..dff70f7 100644 --- a/source/github/github.go +++ b/source/github/github.go @@ -36,6 +36,9 @@ type Github struct { migrations *source.Migrations } +type Config struct { +} + func (g *Github) Open(url string) (source.Driver, error) { u, err := nurl.Parse(url) if err != nil { @@ -80,7 +83,7 @@ func (g *Github) Open(url string) (source.Driver, error) { return gn, nil } -func WithInstance(client *github.Client) (source.Driver, error) { +func WithInstance(client *github.Client, config *Config) (source.Driver, error) { gn := &Github{ client: client, migrations: source.NewMigrations(), diff --git a/util.go b/util.go index 73647ec..a92e359 100644 --- a/util.go +++ b/util.go @@ -9,10 +9,12 @@ import ( "time" ) +// MultiError holds multiple errors. type MultiError struct { Errs []error } +// NewMultiError returns an error type holding multiple errors. func NewMultiError(errs ...error) MultiError { compactErrs := make([]error, 0) for _, e := range errs { @@ -23,6 +25,7 @@ func NewMultiError(errs ...error) MultiError { return MultiError{compactErrs} } +// Error implements error. Mulitple errors are concatenated with 'and's. func (m MultiError) Error() string { var strs = make([]string, 0) for _, e := range m.Errs { @@ -72,7 +75,7 @@ func (b *slowReader) Close() error { return b.rx.Close() } -var ErrNoName = fmt.Errorf("no name") +var errNoName = fmt.Errorf("no name") func nameFromUrl(url string) (string, error) { u, err := nurl.Parse(url) @@ -81,7 +84,7 @@ func nameFromUrl(url string) (string, error) { } if len(u.Scheme) == 0 { - return "", ErrNoName + return "", errNoName } return u.Scheme, nil