mirror of https://github.com/status-im/migrate.git
add database lock timeouts in migrate and in tests
This commit is contained in:
parent
cd6e62049c
commit
209c056258
6
Makefile
6
Makefile
|
@ -51,6 +51,10 @@ test-with-flags:
|
|||
@go test ./migrate/...
|
||||
|
||||
|
||||
kill-orphaned-docker-containers:
|
||||
docker rm -f $(shell docker ps -aq --filter label=migrate_test)
|
||||
|
||||
|
||||
html-coverage:
|
||||
go tool cover -html=.coverage/combined.txt
|
||||
|
||||
|
@ -113,7 +117,7 @@ endef
|
|||
|
||||
.PHONY: build-cli clean test-short test test-with-flags deps html-coverage \
|
||||
restore-import-paths rewrite-import-paths list-external-deps release \
|
||||
docs kill-docs open-docs
|
||||
docs kill-docs open-docs kill-orphaned-docker-containers
|
||||
|
||||
SHELL = /bin/bash
|
||||
RAND = $(shell echo $$RANDOM)
|
||||
|
|
|
@ -66,7 +66,7 @@ $ migrate -database postgres://localhost:5432/database up 2
|
|||
* To help prevent database corruptions, it supports graceful stops via `GracefulStop chan bool`.
|
||||
* Bring your own logger.
|
||||
* Uses `io.Reader` streams internally for low memory overhead.
|
||||
* Thread-safe.
|
||||
* Thread-safe and no goroutine leaks.
|
||||
|
||||
__[Go Documentation](https://godoc.org/github.com/mattes/migrate)__
|
||||
|
||||
|
|
|
@ -40,13 +40,14 @@ Usage: migrate OPTIONS COMMAND [arg...]
|
|||
migrate [ -version | -help ]
|
||||
|
||||
Options:
|
||||
-source Location of the migrations (driver://url)
|
||||
-path Shorthand for -source=file://path
|
||||
-database Run migrations against this database (driver://url)
|
||||
-prefetch N Number of migrations to load in advance before executing (default 10)
|
||||
-verbose Print verbose logging
|
||||
-version Print version
|
||||
-help Print usage
|
||||
-source Location of the migrations (driver://url)
|
||||
-path Shorthand for -source=file://path
|
||||
-database Run migrations against this database (driver://url)
|
||||
-prefetch N Number of migrations to load in advance before executing (default 10)
|
||||
-lock-timeout N Allow N seconds to acquire database lock (default 15)
|
||||
-verbose Print verbose logging
|
||||
-version Print version
|
||||
-help Print usage
|
||||
|
||||
Commands:
|
||||
goto V Migrate to version V
|
||||
|
|
17
cli/main.go
17
cli/main.go
|
@ -20,6 +20,7 @@ func main() {
|
|||
versionPtr := flag.Bool("version", false, "")
|
||||
verbosePtr := flag.Bool("verbose", false, "")
|
||||
prefetchPtr := flag.Uint("prefetch", 10, "")
|
||||
lockTimeoutPtr := flag.Uint("lock-timeout", 15, "")
|
||||
pathPtr := flag.String("path", "", "")
|
||||
databasePtr := flag.String("database", "", "")
|
||||
sourcePtr := flag.String("source", "", "")
|
||||
|
@ -30,13 +31,14 @@ func main() {
|
|||
migrate [ -version | -help ]
|
||||
|
||||
Options:
|
||||
-source Location of the migrations (driver://url)
|
||||
-path Shorthand for -source=file://path
|
||||
-database Run migrations against this database (driver://url)
|
||||
-prefetch N Number of migrations to load in advance before executing (default 10)
|
||||
-verbose Print verbose logging
|
||||
-version Print version
|
||||
-help Print usage
|
||||
-source Location of the migrations (driver://url)
|
||||
-path Shorthand for -source=file://path
|
||||
-database Run migrations against this database (driver://url)
|
||||
-prefetch N Number of migrations to load in advance before executing (default 10)
|
||||
-lock-timeout N Allow N seconds to acquire database lock (default 15)
|
||||
-verbose Print verbose logging
|
||||
-version Print version
|
||||
-help Print usage
|
||||
|
||||
Commands:
|
||||
goto V Migrate to version V
|
||||
|
@ -81,6 +83,7 @@ Commands:
|
|||
if migraterErr == nil {
|
||||
migrater.Log = log
|
||||
migrater.PrefetchMigrations = *prefetchPtr
|
||||
migrater.LockTimeout = time.Duration(int64(*lockTimeoutPtr)) * time.Second
|
||||
|
||||
// handle Ctrl+c
|
||||
signals := make(chan os.Signal, 1)
|
||||
|
|
|
@ -16,27 +16,44 @@ func init() {
|
|||
database.Register("postgres", &Postgres{})
|
||||
}
|
||||
|
||||
var (
|
||||
ErrNilConfig = fmt.Errorf("no config")
|
||||
ErrNoDatabaseName = fmt.Errorf("no database name")
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
// DatbaseName is the name of the database
|
||||
DatabaseName string
|
||||
}
|
||||
|
||||
func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
|
||||
return &Postgres{
|
||||
if config == nil {
|
||||
return nil, ErrNilConfig
|
||||
}
|
||||
|
||||
if len(config.DatabaseName) == 0 {
|
||||
return nil, ErrNoDatabaseName
|
||||
}
|
||||
|
||||
px := &Postgres{
|
||||
db: instance,
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if err := px.ensureVersionTable(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return px, nil
|
||||
}
|
||||
|
||||
type Postgres struct {
|
||||
db *sql.DB
|
||||
url *nurl.URL
|
||||
isLocked bool
|
||||
config *Config
|
||||
}
|
||||
|
||||
var (
|
||||
ErrNoSqlInstance = fmt.Errorf("expected *sql.DB")
|
||||
ErrNoDatabaseName = fmt.Errorf("no database name")
|
||||
)
|
||||
// Open and WithInstance need to garantuee that config is never nil
|
||||
config *Config
|
||||
}
|
||||
|
||||
const tableName = "schema_migrations"
|
||||
|
||||
|
@ -51,15 +68,14 @@ func (p *Postgres) Open(url string) (database.Driver, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if err := db.Ping(); err != nil {
|
||||
px, err := WithInstance(db, &Config{
|
||||
DatabaseName: purl.Path,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
px := &Postgres{
|
||||
db: db,
|
||||
url: purl,
|
||||
}
|
||||
if err := px.ensureVersionTable(); err != nil {
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -211,14 +227,7 @@ const AdvisoryLockIdSalt uint = 1486364155
|
|||
|
||||
// inspired by rails migrations, see https://goo.gl/8o9bCT
|
||||
func (p *Postgres) generateAdvisoryLockId() (string, error) {
|
||||
if p.url == nil {
|
||||
return "", ErrNoDatabaseName
|
||||
}
|
||||
dbname := p.url.Path
|
||||
if len(dbname) == 0 {
|
||||
return "", ErrNoDatabaseName
|
||||
}
|
||||
sum := crc32.ChecksumIEEE([]byte(dbname))
|
||||
sum := crc32.ChecksumIEEE([]byte(p.config.DatabaseName))
|
||||
sum = sum * uint32(AdvisoryLockIdSalt)
|
||||
return fmt.Sprintf("%v", sum), nil
|
||||
}
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"database/sql"
|
||||
"fmt"
|
||||
"io"
|
||||
nurl "net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/lib/pq"
|
||||
|
@ -113,12 +112,7 @@ func TestWithInstance(t *testing.T) {
|
|||
|
||||
func TestGenerateAdvisoryLockId(t *testing.T) {
|
||||
p := &Postgres{}
|
||||
|
||||
if _, err := p.generateAdvisoryLockId(); err == nil {
|
||||
t.Errorf("expected err not to be nil")
|
||||
}
|
||||
|
||||
p.url = &nurl.URL{Path: "database_name"}
|
||||
p.config = &Config{DatabaseName: "database_name"}
|
||||
id, err := p.generateAdvisoryLockId()
|
||||
if err != nil {
|
||||
t.Errorf("expected err to be nil, got %v", err)
|
||||
|
|
|
@ -5,8 +5,10 @@ package testing
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mattes/migrate/database"
|
||||
)
|
||||
|
@ -35,7 +37,25 @@ func TestNilVersion(t *testing.T, d database.Driver) {
|
|||
}
|
||||
|
||||
func TestLockAndUnlock(t *testing.T, d database.Driver) {
|
||||
// TODO: add timeouts, in case something goes wrong
|
||||
// add a timeout, in case there is a deadlock
|
||||
done := make(chan bool, 1)
|
||||
go func() {
|
||||
timeout := time.After(15 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-timeout:
|
||||
panic(fmt.Sprintf("Timeout after 15 seconds. Looks like a deadlock in Lock/UnLock.\n%#v", d))
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
// run the locking test ...
|
||||
|
||||
if err := d.Lock(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
|
65
migrate.go
65
migrate.go
|
@ -21,10 +21,14 @@ import (
|
|||
// since each pre-read migration is buffered in memory. See DefaultBufferSize.
|
||||
var DefaultPrefetchMigrations = uint(10)
|
||||
|
||||
// DefaultLockTimeout sets the max time a database driver has to acquire a lock.
|
||||
var DefaultLockTimeout = 15 * time.Second
|
||||
|
||||
var (
|
||||
ErrNoChange = fmt.Errorf("no change")
|
||||
ErrNilVersion = fmt.Errorf("no migration")
|
||||
ErrLocked = fmt.Errorf("database locked")
|
||||
ErrNoChange = fmt.Errorf("no change")
|
||||
ErrNilVersion = fmt.Errorf("no migration")
|
||||
ErrLocked = fmt.Errorf("database locked")
|
||||
ErrLockTimeout = fmt.Errorf("timeout: can't acquire database lock")
|
||||
)
|
||||
|
||||
// ErrShortLimit is an error returned when not enough migrations
|
||||
|
@ -59,6 +63,10 @@ type Migrate struct {
|
|||
// PrefetchMigrations defaults to DefaultPrefetchMigrations,
|
||||
// but can be set per Migrate instance.
|
||||
PrefetchMigrations uint
|
||||
|
||||
// LockTimeout defaults to DefaultLockTimeout,
|
||||
// but can be set per Migrate instance.
|
||||
LockTimeout time.Duration
|
||||
}
|
||||
|
||||
// New returns a new Migrate instance from a source URL and a database URL.
|
||||
|
@ -165,6 +173,7 @@ func newCommon() *Migrate {
|
|||
return &Migrate{
|
||||
GracefulStop: make(chan bool, 1),
|
||||
PrefetchMigrations: DefaultPrefetchMigrations,
|
||||
LockTimeout: DefaultLockTimeout,
|
||||
isLockedMu: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
@ -174,6 +183,8 @@ func (m *Migrate) Close() (source error, database error) {
|
|||
databaseSrvClose := make(chan error)
|
||||
sourceSrvClose := make(chan error)
|
||||
|
||||
m.logVerbosePrintf("Closing source and database\n")
|
||||
|
||||
go func() {
|
||||
databaseSrvClose <- m.databaseDrv.Close()
|
||||
}()
|
||||
|
@ -768,15 +779,49 @@ func (m *Migrate) lock() error {
|
|||
m.isLockedMu.Lock()
|
||||
defer m.isLockedMu.Unlock()
|
||||
|
||||
if !m.isLocked {
|
||||
if err := m.databaseDrv.Lock(); err != nil {
|
||||
return err
|
||||
}
|
||||
m.isLocked = true
|
||||
return nil
|
||||
if m.isLocked {
|
||||
return ErrLocked
|
||||
}
|
||||
|
||||
return ErrLocked
|
||||
// create done channel, used in the timeout goroutine
|
||||
done := make(chan bool, 1)
|
||||
defer func() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
// use errchan to signal error back to this context
|
||||
errchan := make(chan error, 2)
|
||||
|
||||
// start timeout goroutine
|
||||
timeout := time.After(m.LockTimeout)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-timeout:
|
||||
errchan <- ErrLockTimeout
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// now try to acquire the lock
|
||||
go func() {
|
||||
if err := m.databaseDrv.Lock(); err != nil {
|
||||
errchan <- err
|
||||
} else {
|
||||
errchan <- nil
|
||||
}
|
||||
return
|
||||
}()
|
||||
|
||||
// wait until we either recieve ErrLockTimeout or error from Lock operation
|
||||
err := <-errchan
|
||||
if err == nil {
|
||||
m.isLocked = true
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// unlock is a thread safe helper function to unlock the database.
|
||||
|
|
Loading…
Reference in New Issue