From 209c056258769d79b9df4b54f680b8ebc2833be8 Mon Sep 17 00:00:00 2001 From: Matthias Kadenbach Date: Sat, 11 Feb 2017 19:15:54 -0800 Subject: [PATCH] add database lock timeouts in migrate and in tests --- Makefile | 6 ++- README.md | 2 +- cli/README.md | 15 +++---- cli/main.go | 17 ++++---- database/postgres/postgres.go | 55 ++++++++++++++----------- database/postgres/postgres_test.go | 8 +--- database/testing/testing.go | 22 +++++++++- migrate.go | 65 +++++++++++++++++++++++++----- 8 files changed, 133 insertions(+), 57 deletions(-) diff --git a/Makefile b/Makefile index 945b61d..baa8705 100644 --- a/Makefile +++ b/Makefile @@ -51,6 +51,10 @@ test-with-flags: @go test ./migrate/... +kill-orphaned-docker-containers: + docker rm -f $(shell docker ps -aq --filter label=migrate_test) + + html-coverage: go tool cover -html=.coverage/combined.txt @@ -113,7 +117,7 @@ endef .PHONY: build-cli clean test-short test test-with-flags deps html-coverage \ restore-import-paths rewrite-import-paths list-external-deps release \ - docs kill-docs open-docs + docs kill-docs open-docs kill-orphaned-docker-containers SHELL = /bin/bash RAND = $(shell echo $$RANDOM) diff --git a/README.md b/README.md index bb09ed6..91eb79f 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ $ migrate -database postgres://localhost:5432/database up 2 * To help prevent database corruptions, it supports graceful stops via `GracefulStop chan bool`. * Bring your own logger. * Uses `io.Reader` streams internally for low memory overhead. - * Thread-safe. + * Thread-safe and no goroutine leaks. __[Go Documentation](https://godoc.org/github.com/mattes/migrate)__ diff --git a/cli/README.md b/cli/README.md index 07819b0..b1d65d2 100644 --- a/cli/README.md +++ b/cli/README.md @@ -40,13 +40,14 @@ Usage: migrate OPTIONS COMMAND [arg...] migrate [ -version | -help ] Options: - -source Location of the migrations (driver://url) - -path Shorthand for -source=file://path - -database Run migrations against this database (driver://url) - -prefetch N Number of migrations to load in advance before executing (default 10) - -verbose Print verbose logging - -version Print version - -help Print usage + -source Location of the migrations (driver://url) + -path Shorthand for -source=file://path + -database Run migrations against this database (driver://url) + -prefetch N Number of migrations to load in advance before executing (default 10) + -lock-timeout N Allow N seconds to acquire database lock (default 15) + -verbose Print verbose logging + -version Print version + -help Print usage Commands: goto V Migrate to version V diff --git a/cli/main.go b/cli/main.go index d73d59b..57bfacb 100644 --- a/cli/main.go +++ b/cli/main.go @@ -20,6 +20,7 @@ func main() { versionPtr := flag.Bool("version", false, "") verbosePtr := flag.Bool("verbose", false, "") prefetchPtr := flag.Uint("prefetch", 10, "") + lockTimeoutPtr := flag.Uint("lock-timeout", 15, "") pathPtr := flag.String("path", "", "") databasePtr := flag.String("database", "", "") sourcePtr := flag.String("source", "", "") @@ -30,13 +31,14 @@ func main() { migrate [ -version | -help ] Options: - -source Location of the migrations (driver://url) - -path Shorthand for -source=file://path - -database Run migrations against this database (driver://url) - -prefetch N Number of migrations to load in advance before executing (default 10) - -verbose Print verbose logging - -version Print version - -help Print usage + -source Location of the migrations (driver://url) + -path Shorthand for -source=file://path + -database Run migrations against this database (driver://url) + -prefetch N Number of migrations to load in advance before executing (default 10) + -lock-timeout N Allow N seconds to acquire database lock (default 15) + -verbose Print verbose logging + -version Print version + -help Print usage Commands: goto V Migrate to version V @@ -81,6 +83,7 @@ Commands: if migraterErr == nil { migrater.Log = log migrater.PrefetchMigrations = *prefetchPtr + migrater.LockTimeout = time.Duration(int64(*lockTimeoutPtr)) * time.Second // handle Ctrl+c signals := make(chan os.Signal, 1) diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index 1353c02..ff2b2e4 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -16,27 +16,44 @@ func init() { database.Register("postgres", &Postgres{}) } +var ( + ErrNilConfig = fmt.Errorf("no config") + ErrNoDatabaseName = fmt.Errorf("no database name") +) + type Config struct { + // DatbaseName is the name of the database + DatabaseName string } func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { - return &Postgres{ + if config == nil { + return nil, ErrNilConfig + } + + if len(config.DatabaseName) == 0 { + return nil, ErrNoDatabaseName + } + + px := &Postgres{ db: instance, config: config, - }, nil + } + + if err := px.ensureVersionTable(); err != nil { + return nil, err + } + + return px, nil } type Postgres struct { db *sql.DB - url *nurl.URL isLocked bool - config *Config -} -var ( - ErrNoSqlInstance = fmt.Errorf("expected *sql.DB") - ErrNoDatabaseName = fmt.Errorf("no database name") -) + // Open and WithInstance need to garantuee that config is never nil + config *Config +} const tableName = "schema_migrations" @@ -51,15 +68,14 @@ func (p *Postgres) Open(url string) (database.Driver, error) { return nil, err } - if err := db.Ping(); err != nil { + px, err := WithInstance(db, &Config{ + DatabaseName: purl.Path, + }) + if err != nil { return nil, err } - px := &Postgres{ - db: db, - url: purl, - } - if err := px.ensureVersionTable(); err != nil { + if err := db.Ping(); err != nil { return nil, err } @@ -211,14 +227,7 @@ const AdvisoryLockIdSalt uint = 1486364155 // inspired by rails migrations, see https://goo.gl/8o9bCT func (p *Postgres) generateAdvisoryLockId() (string, error) { - if p.url == nil { - return "", ErrNoDatabaseName - } - dbname := p.url.Path - if len(dbname) == 0 { - return "", ErrNoDatabaseName - } - sum := crc32.ChecksumIEEE([]byte(dbname)) + sum := crc32.ChecksumIEEE([]byte(p.config.DatabaseName)) sum = sum * uint32(AdvisoryLockIdSalt) return fmt.Sprintf("%v", sum), nil } diff --git a/database/postgres/postgres_test.go b/database/postgres/postgres_test.go index d30fb3f..a0541f4 100644 --- a/database/postgres/postgres_test.go +++ b/database/postgres/postgres_test.go @@ -7,7 +7,6 @@ import ( "database/sql" "fmt" "io" - nurl "net/url" "testing" "github.com/lib/pq" @@ -113,12 +112,7 @@ func TestWithInstance(t *testing.T) { func TestGenerateAdvisoryLockId(t *testing.T) { p := &Postgres{} - - if _, err := p.generateAdvisoryLockId(); err == nil { - t.Errorf("expected err not to be nil") - } - - p.url = &nurl.URL{Path: "database_name"} + p.config = &Config{DatabaseName: "database_name"} id, err := p.generateAdvisoryLockId() if err != nil { t.Errorf("expected err to be nil, got %v", err) diff --git a/database/testing/testing.go b/database/testing/testing.go index 0e087c6..65d5dc5 100644 --- a/database/testing/testing.go +++ b/database/testing/testing.go @@ -5,8 +5,10 @@ package testing import ( "bytes" + "fmt" "io" "testing" + "time" "github.com/mattes/migrate/database" ) @@ -35,7 +37,25 @@ func TestNilVersion(t *testing.T, d database.Driver) { } func TestLockAndUnlock(t *testing.T, d database.Driver) { - // TODO: add timeouts, in case something goes wrong + // add a timeout, in case there is a deadlock + done := make(chan bool, 1) + go func() { + timeout := time.After(15 * time.Second) + for { + select { + case <-done: + return + case <-timeout: + panic(fmt.Sprintf("Timeout after 15 seconds. Looks like a deadlock in Lock/UnLock.\n%#v", d)) + } + } + }() + defer func() { + done <- true + }() + + // run the locking test ... + if err := d.Lock(); err != nil { t.Fatal(err) } diff --git a/migrate.go b/migrate.go index 8a6c7f0..1c794ec 100644 --- a/migrate.go +++ b/migrate.go @@ -21,10 +21,14 @@ import ( // since each pre-read migration is buffered in memory. See DefaultBufferSize. var DefaultPrefetchMigrations = uint(10) +// DefaultLockTimeout sets the max time a database driver has to acquire a lock. +var DefaultLockTimeout = 15 * time.Second + var ( - ErrNoChange = fmt.Errorf("no change") - ErrNilVersion = fmt.Errorf("no migration") - ErrLocked = fmt.Errorf("database locked") + ErrNoChange = fmt.Errorf("no change") + ErrNilVersion = fmt.Errorf("no migration") + ErrLocked = fmt.Errorf("database locked") + ErrLockTimeout = fmt.Errorf("timeout: can't acquire database lock") ) // ErrShortLimit is an error returned when not enough migrations @@ -59,6 +63,10 @@ type Migrate struct { // PrefetchMigrations defaults to DefaultPrefetchMigrations, // but can be set per Migrate instance. PrefetchMigrations uint + + // LockTimeout defaults to DefaultLockTimeout, + // but can be set per Migrate instance. + LockTimeout time.Duration } // New returns a new Migrate instance from a source URL and a database URL. @@ -165,6 +173,7 @@ func newCommon() *Migrate { return &Migrate{ GracefulStop: make(chan bool, 1), PrefetchMigrations: DefaultPrefetchMigrations, + LockTimeout: DefaultLockTimeout, isLockedMu: &sync.Mutex{}, } } @@ -174,6 +183,8 @@ func (m *Migrate) Close() (source error, database error) { databaseSrvClose := make(chan error) sourceSrvClose := make(chan error) + m.logVerbosePrintf("Closing source and database\n") + go func() { databaseSrvClose <- m.databaseDrv.Close() }() @@ -768,15 +779,49 @@ func (m *Migrate) lock() error { m.isLockedMu.Lock() defer m.isLockedMu.Unlock() - if !m.isLocked { - if err := m.databaseDrv.Lock(); err != nil { - return err - } - m.isLocked = true - return nil + if m.isLocked { + return ErrLocked } - return ErrLocked + // create done channel, used in the timeout goroutine + done := make(chan bool, 1) + defer func() { + done <- true + }() + + // use errchan to signal error back to this context + errchan := make(chan error, 2) + + // start timeout goroutine + timeout := time.After(m.LockTimeout) + go func() { + for { + select { + case <-done: + return + case <-timeout: + errchan <- ErrLockTimeout + return + } + } + }() + + // now try to acquire the lock + go func() { + if err := m.databaseDrv.Lock(); err != nil { + errchan <- err + } else { + errchan <- nil + } + return + }() + + // wait until we either recieve ErrLockTimeout or error from Lock operation + err := <-errchan + if err == nil { + m.isLocked = true + } + return err } // unlock is a thread safe helper function to unlock the database.