add code comments for package migrate

This commit is contained in:
Matthias Kadenbach 2017-02-09 16:59:30 -08:00
parent d5daf6f6ae
commit 199678e1bc
9 changed files with 259 additions and 17 deletions

View File

@ -1,3 +1,7 @@
// Package migrate reads migrations from sources and runs them against databases.
// Sources are defined by the `source.Driver` and databases by the `database.Driver`
// interface. The driver interfaces are kept "dump", all migration logic is kept
// in this package.
package migrate
import (
@ -10,6 +14,11 @@ import (
"github.com/mattes/migrate/source"
)
// DefaultPrefetchMigrations sets the number of migrations to pre-read
// from the source. This is helpful if the source is remote, but has little
// effect for a local source (i.e. file system).
// Please note that this setting has a major impact on the memory usage,
// since each pre-read migration is buffered in memory. See DefaultBufferSize.
var DefaultPrefetchMigrations = uint(10)
var (
@ -18,10 +27,13 @@ var (
ErrLocked = fmt.Errorf("database locked")
)
// ErrShortLimit is an error returned when not enough migrations
// can be returned by a source for a given limit.
type ErrShortLimit struct {
Short uint
}
// Error implements the error interface.
func (e ErrShortLimit) Error() string {
return fmt.Sprintf("limit %v short", e.Short)
}
@ -32,17 +44,25 @@ type Migrate struct {
databaseName string
databaseDrv database.Driver
// Log accepts a Logger interface
Log Logger
// GracefulStop accepts `true` and will stop executing migrations
// as soon as possible at a safe break point, so that the database
// is not corrpupted.
GracefulStop chan bool
isGracefulStop bool
isLockedMu *sync.Mutex
isLocked bool
// PrefetchMigrations defaults to DefaultPrefetchMigrations,
// but can be set per Migrate instance.
PrefetchMigrations uint
}
// New returns a new Migrate instance from a source URL and a database URL.
// The URL scheme is defined by each driver.
func New(sourceUrl, databaseUrl string) (*Migrate, error) {
m := newCommon()
@ -73,6 +93,10 @@ func New(sourceUrl, databaseUrl string) (*Migrate, error) {
return m, nil
}
// NewWithDatabaseInstance returns a new Migrate instance from a source URL
// and an existing database instance. The source URL scheme is defined by each driver.
// Use any string that can serve as an identifier during logging as databaseName.
// You are responsible for closing the underlying database client if necessary.
func NewWithDatabaseInstance(sourceUrl string, databaseName string, databaseInstance database.Driver) (*Migrate, error) {
m := newCommon()
@ -95,6 +119,10 @@ func NewWithDatabaseInstance(sourceUrl string, databaseName string, databaseInst
return m, nil
}
// NewWithSourceInstance returns a new Migrate instance from an existing source instance
// and a database URL. The database URL scheme is defined by each driver.
// Use any string that can serve as an identifier during logging as sourceName.
// You are responsible for closing the underlying source client if necessary.
func NewWithSourceInstance(sourceName string, sourceInstance source.Driver, databaseUrl string) (*Migrate, error) {
m := newCommon()
@ -117,6 +145,10 @@ func NewWithSourceInstance(sourceName string, sourceInstance source.Driver, data
return m, nil
}
// NewWithInstance returns a new Migrate instance from an existing source and
// database instance. Use any string that can serve as an identifier during logging
// as sourceName and databaseName. You are responsible for closing down
// the underlying source and database client if necessary.
func NewWithInstance(sourceName string, sourceInstance source.Driver, databaseName string, databaseInstance database.Driver) (*Migrate, error) {
m := newCommon()
@ -137,6 +169,7 @@ func newCommon() *Migrate {
}
}
// Close closes the the source and the database.
func (m *Migrate) Close() (sourceErr error, databaseErr error) {
databaseSrvClose := make(chan error)
sourceSrvClose := make(chan error)
@ -152,6 +185,8 @@ func (m *Migrate) Close() (sourceErr error, databaseErr error) {
return <-sourceSrvClose, <-databaseSrvClose
}
// Migrate looks at the currently active migration version,
// then migrates either up or down to the specified version.
func (m *Migrate) Migrate(version uint) error {
if err := m.lock(); err != nil {
return err
@ -168,6 +203,8 @@ func (m *Migrate) Migrate(version uint) error {
return m.unlockErr(m.runMigrations(ret))
}
// Steps looks at the currently active migration version.
// It will migrate up if n > 0, and down if n < 0.
func (m *Migrate) Steps(n int) error {
if n == 0 {
return ErrNoChange
@ -193,6 +230,8 @@ func (m *Migrate) Steps(n int) error {
return m.unlockErr(m.runMigrations(ret))
}
// Up looks at the currently active migration version
// and will migrate all the way up (applying all up migrations).
func (m *Migrate) Up() error {
if err := m.lock(); err != nil {
return err
@ -209,6 +248,8 @@ func (m *Migrate) Up() error {
return m.unlockErr(m.runMigrations(ret))
}
// Down looks at the currently active migration version
// and will migrate all the way down (applying all down migrations).
func (m *Migrate) Down() error {
if err := m.lock(); err != nil {
return err
@ -224,6 +265,7 @@ func (m *Migrate) Down() error {
return m.unlockErr(m.runMigrations(ret))
}
// Drop deletes everyting in the database.
func (m *Migrate) Drop() error {
if err := m.lock(); err != nil {
return err
@ -234,6 +276,8 @@ func (m *Migrate) Drop() error {
return m.unlock()
}
// Version returns the currently active migration version.
// If no migration has been applied, yet, it will return ErrNilVersion.
func (m *Migrate) Version() (uint, error) {
v, err := m.databaseDrv.Version()
if err != nil {
@ -247,6 +291,10 @@ func (m *Migrate) Version() (uint, error) {
return suint(v), nil
}
// read reads either up or down migrations from source `from` to `to`.
// Each migration is then written to the ret channel.
// If an error occurs during reading, that error is written to the ret channel, too.
// Once read is done reading it will close the ret channel.
func (m *Migrate) read(from int, to int, ret chan<- interface{}) {
defer close(ret)
@ -354,6 +402,11 @@ func (m *Migrate) read(from int, to int, ret chan<- interface{}) {
}
}
// readUp reads up migrations from `from` limitted by `limit`.
// limit can be -1, implying no limit and reading until there are no more migrations.
// Each migration is then written to the ret channel.
// If an error occurs during reading, that error is written to the ret channel, too.
// Once readUp is done reading it will close the ret channel.
func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) {
defer close(ret)
@ -441,6 +494,11 @@ func (m *Migrate) readUp(from int, limit int, ret chan<- interface{}) {
}
}
// readDown reads down migrations from `from` limitted by `limit`.
// limit can be -1, implying no limit and reading until there are no more migrations.
// Each migration is then written to the ret channel.
// If an error occurs during reading, that error is written to the ret channel, too.
// Once readDown is done reading it will close the ret channel.
func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) {
defer close(ret)
@ -518,7 +576,12 @@ func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) {
}
}
// ret chan expects *Migration or error
// runMigrations reads *Migration and error from a channel. Any other type
// sent on this channel will result in a panic. Each migration is then
// proxied to the database driver and run against the database.
// Before running a newly received migration it will check if it's supposed
// to stop execution because it might have received a stop signal on the
// GracefulStop channel.
func (m *Migrate) runMigrations(ret <-chan interface{}) error {
for r := range ret {
@ -566,6 +629,8 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
return nil
}
// versionExists checks the source if either the up or down migration for
// the specified migration version exists.
func (m *Migrate) versionExists(version uint) error {
// try up migration first
up, _, err := m.sourceDrv.ReadUp(version)
@ -592,6 +657,9 @@ func (m *Migrate) versionExists(version uint) error {
return os.ErrNotExist
}
// stop returns true if no more migrations should be run against the database
// because a stop signal was received on the GracefulStop channel.
// Calls are cheap and this function is not blocking.
func (m *Migrate) stop() bool {
if m.isGracefulStop {
return true
@ -607,6 +675,8 @@ func (m *Migrate) stop() bool {
}
}
// newMigration is a helper func that returns a *Migration for the
// specified version and targetVersion.
func (m *Migrate) newMigration(version uint, targetVersion int) (*Migration, error) {
var migr *Migration
@ -660,6 +730,8 @@ func (m *Migrate) newMigration(version uint, targetVersion int) (*Migration, err
return migr, nil
}
// lock is a thread safe helper function to lock the database.
// It should be called as late as possible when running migrations.
func (m *Migrate) lock() error {
m.isLockedMu.Lock()
defer m.isLockedMu.Unlock()
@ -675,13 +747,15 @@ func (m *Migrate) lock() error {
return ErrLocked
}
// unlock is a thread safe helper function to unlock the database.
// It should be called as early as possible when no more migrations are
// expected to be executed.
func (m *Migrate) unlock() error {
m.isLockedMu.Lock()
defer m.isLockedMu.Unlock()
if err := m.databaseDrv.Unlock(); err != nil {
// can potentially create deadlock when never succeeds
// TODO: add timeout
// BUG: Can potentially create a deadlock. Add a timeout.
return err
}
@ -689,18 +763,23 @@ func (m *Migrate) unlock() error {
return nil
}
// unlockErr calls unlock and returns a combined error
// if a prevErr is not nil.
func (m *Migrate) unlockErr(prevErr error) error {
if err := m.unlock(); err != nil {
return NewMultiError(prevErr, err)
}
return prevErr
}
// logPrintf writes to m.Log if not nil
func (m *Migrate) logPrintf(format string, v ...interface{}) {
if m.Log != nil {
m.Log.Printf(format, v...)
}
}
// logVerbosePrintf writes to m.Log if not nil. Use for verbose logging output.
func (m *Migrate) logVerbosePrintf(format string, v ...interface{}) {
if m.Log != nil && m.Log.Verbose() {
m.Log.Printf(format, v...)

View File

@ -1,3 +1,5 @@
// Deprecated: package direction is deprecated.
// Will be removed soon.
package direction
type Direction int

View File

@ -1,3 +1,5 @@
// Deprecated: package file is deprecated.
// Will be removed soon.
package file
import (

View File

@ -1,4 +1,5 @@
// Deprecated: package migrate is here to make sure v2 is downwards compatible with v1
// Deprecated: package migrate makes this version backwards compatible.
// Expect this to be removed very soon.
package migrate
import (

View File

@ -2,8 +2,10 @@ package migrate
import (
"bytes"
"database/sql"
"fmt"
"io/ioutil"
"log"
"os"
"testing"
@ -15,7 +17,6 @@ import (
// u = up migration, d = down migration, n = version
// | 1 | - | 3 | 4 | 5 | - | 7 |
// | u d | - | u | u d | d | - | u d |
// var sourceStubMigrations = source.NewMigrations()
var sourceStubMigrations *source.Migrations
@ -54,6 +55,19 @@ func TestNew(t *testing.T) {
}
}
func ExampleNew() {
// Read migrations from /home/mattes/migrations and connect to a local postgres database.
m, err := New("file:///home/mattes/migrations", "postgres://mattes:secret@localhost:5432/database?sslmode=disable")
if err != nil {
log.Fatal(err)
}
// Migrate all the way up ...
if err := m.Up(); err != nil {
log.Fatal(err)
}
}
func TestNewWithDatabaseInstance(t *testing.T) {
dummyDb := &DummyInstance{"database"}
dbInst, err := dStub.WithInstance(dummyDb, &dStub.Config{})
@ -81,6 +95,34 @@ func TestNewWithDatabaseInstance(t *testing.T) {
}
}
func ExampleNewWithDatabaseInstance() {
// Create and use an existing database instance.
db, err := sql.Open("postgres", "postgres://mattes:secret@localhost:5432/database?sslmode=disable")
if err != nil {
log.Fatal(err)
}
defer db.Close()
// Create driver instance from db.
// Check each driver if it supports the WithInstance function.
// `import "github.com/mattes/migrate/database/postgres"`
instance, err := dStub.WithInstance(db, &dStub.Config{})
if err != nil {
log.Fatal(err)
}
// Read migrations from /home/mattes/migrations and connect to a local postgres database.
m, err := NewWithDatabaseInstance("file:///home/mattes/migrations", "postgres", instance)
if err != nil {
log.Fatal(err)
}
// Migrate all the way up ...
if err := m.Up(); err != nil {
log.Fatal(err)
}
}
func TestNewWithSourceInstance(t *testing.T) {
dummySource := &DummyInstance{"source"}
sInst, err := sStub.WithInstance(dummySource, &sStub.Config{})
@ -108,6 +150,29 @@ func TestNewWithSourceInstance(t *testing.T) {
}
}
func ExampleNewWithSourceInstance() {
di := &DummyInstance{"think any client required for a source here"}
// Create driver instance from DummyInstance di.
// Check each driver if it support the WithInstance function.
// `import "github.com/mattes/migrate/source/stub"`
instance, err := sStub.WithInstance(di, &sStub.Config{})
if err != nil {
log.Fatal(err)
}
// Read migrations from Stub and connect to a local postgres database.
m, err := NewWithSourceInstance("stub", instance, "postgres://mattes:secret@localhost:5432/database?sslmode=disable")
if err != nil {
log.Fatal(err)
}
// Migrate all the way up ...
if err := m.Up(); err != nil {
log.Fatal(err)
}
}
func TestNewWithInstance(t *testing.T) {
dummyDb := &DummyInstance{"database"}
dbInst, err := dStub.WithInstance(dummyDb, &dStub.Config{})
@ -141,6 +206,10 @@ func TestNewWithInstance(t *testing.T) {
}
}
func ExampleNewWithInstance() {
// See NewWithDatabaseInstance and NewWithSourceInstance for an example.
}
func TestClose(t *testing.T) {
m, _ := New("stub://", "stub://")
sourceErr, databaseErr := m.Close()

View File

@ -7,25 +7,65 @@ import (
"time"
)
// DefaultBufferSize sets the in memory buffer size (in Bytes) for every
// pre-read migration (see DefaultPrefetchMigrations).
var DefaultBufferSize = uint(100000)
// Migration holds information about a migration.
// It is initially created from data coming from the source and then
// used when run against the database.
type Migration struct {
Identifier string
Version uint
// Identifier can be any string to help identifying
// the migration in the source.
Identifier string
// Version is the version of this migration.
Version uint
// TargetVersion is the migration version after this migration
// has been applied to the database.
TargetVersion int
Body io.ReadCloser
// Body holds an io.ReadCloser to the source.
Body io.ReadCloser
// BufferedBody holds an buffered io.Reader to the underlying Body.
BufferedBody io.Reader
BufferSize uint
// BufferSize defaults to DefaultBufferSize
BufferSize uint
// bufferWriter holds an io.WriteCloser and pipes to BufferBody.
// It's an *Closer for flow control.
bufferWriter io.WriteCloser
Scheduled time.Time
StartedBuffering time.Time
// Scheduled is the time when the migration was scheduled/ queued.
Scheduled time.Time
// StartedBuffering is the time when buffering of the migration source started.
StartedBuffering time.Time
// FinishedBuffering is the time when buffering of the migration source finished.
FinishedBuffering time.Time
FinishedReading time.Time
BytesRead int64
// FinishedReading is the time when the migration source is fully read.
FinishedReading time.Time
// BytesRead holds the number of Bytes read from the migration source.
BytesRead int64
}
// NewMigration returns a new Migration and sets the body, identifier,
// version and targetVersion. Body can be nil, which turns this migration
// into a "NilMigration". If no identifier is provided, it will default to "<empty>".
//
// What is a NilMigration?
// Usually each migration version coming from source is expected to have an
// Up and Down migration. This is not a hard requirement though, leading to
// a situation where only the Up or Down migration is present. So let's say
// the user wants to migrate up to a version that doesn't have the actual Up
// migration, in that case we still want to apply the version, but with an empty
// body. We are calling that a NilMigration, a migration with an empty body.
func NewMigration(body io.ReadCloser, identifier string, version uint, targetVersion int) (*Migration, error) {
tnow := time.Now()
m := &Migration{
@ -54,10 +94,13 @@ func NewMigration(body io.ReadCloser, identifier string, version uint, targetVer
return m, nil
}
// String implements string.Stringer and is used in tests.
func (m *Migration) String() string {
return fmt.Sprintf("%v [%v=>%v]", m.Identifier, m.Version, m.TargetVersion)
}
// StringLong returns a string describing this migration to humans.
// TODO: rename to LogString()
func (m *Migration) StringLong() string {
directionStr := "u"
if m.TargetVersion < int(m.Version) {
@ -66,7 +109,8 @@ func (m *Migration) StringLong() string {
return fmt.Sprintf("%v/%v %v", m.Version, directionStr, m.Identifier)
}
// Buffer buffers up to BufferSize (blocking, call with goroutine)
// Buffer buffers Body up to BufferSize.
// Calling this function blocks. Call with goroutine.
func (m *Migration) Buffer() error {
if m.Body == nil {
return nil

39
migration_test.go Normal file
View File

@ -0,0 +1,39 @@
package migrate
import (
"fmt"
"io/ioutil"
"log"
"strings"
)
func ExampleNewMigration() {
// Create a dummy migration body, this is coming from the source usually.
body := ioutil.NopCloser(strings.NewReader("dumy migration that creates users table"))
// Create a new Migration that represents version 1486686016.
// Once this migration has been applied to the database, the new
// migration version will be 1486689359.
migr, err := NewMigration(body, "create_users_table", 1486686016, 1486689359)
if err != nil {
log.Fatal(err)
}
fmt.Print(migr.StringLong())
// Output:
// 1486686016/u create_users_table
}
func ExampleNewMigration_nilMigration() {
// Create a new Migration that represents a NilMigration.
// Once this migration has been applied to the database, the new
// migration version will be 1486689359.
migr, err := NewMigration(nil, "", 1486686016, 1486689359)
if err != nil {
log.Fatal(err)
}
fmt.Print(migr.StringLong())
// Output:
// 1486686016/u <empty>
}

View File

@ -36,6 +36,9 @@ type Github struct {
migrations *source.Migrations
}
type Config struct {
}
func (g *Github) Open(url string) (source.Driver, error) {
u, err := nurl.Parse(url)
if err != nil {
@ -80,7 +83,7 @@ func (g *Github) Open(url string) (source.Driver, error) {
return gn, nil
}
func WithInstance(client *github.Client) (source.Driver, error) {
func WithInstance(client *github.Client, config *Config) (source.Driver, error) {
gn := &Github{
client: client,
migrations: source.NewMigrations(),

View File

@ -9,10 +9,12 @@ import (
"time"
)
// MultiError holds multiple errors.
type MultiError struct {
Errs []error
}
// NewMultiError returns an error type holding multiple errors.
func NewMultiError(errs ...error) MultiError {
compactErrs := make([]error, 0)
for _, e := range errs {
@ -23,6 +25,7 @@ func NewMultiError(errs ...error) MultiError {
return MultiError{compactErrs}
}
// Error implements error. Mulitple errors are concatenated with 'and's.
func (m MultiError) Error() string {
var strs = make([]string, 0)
for _, e := range m.Errs {
@ -72,7 +75,7 @@ func (b *slowReader) Close() error {
return b.rx.Close()
}
var ErrNoName = fmt.Errorf("no name")
var errNoName = fmt.Errorf("no name")
func nameFromUrl(url string) (string, error) {
u, err := nurl.Parse(url)
@ -81,7 +84,7 @@ func nameFromUrl(url string) (string, error) {
}
if len(u.Scheme) == 0 {
return "", ErrNoName
return "", errNoName
}
return u.Scheme, nil