mirror of https://github.com/status-im/migrate.git
fixes mysql lock failure
I believe this closes #297 as well. I have been working on adding testing of migrations and it requires acquiring the lock in mysql multiple times to go up and down. After nailing this down to GET_LOCK returning a failure for every subsequent GET_LOCK call after the first, I decided it was time to rtfm and lo and behold: https://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_release-lock RELEASE_LOCK will not work if called from a different thread than GET_LOCK. The simplest solution using the golang database/sql pkg appears to be to just get a single conn to use for every operation. since migrations are happening sequentially, I don't think this will be a performance hit (possible improvement). now calling Lock() and Unlock() multiple times will work; prior to this patch, every call to RELEASE_LOCK would fail. this required minimal changes to use the *sql.Conn methods instead of the *sql.DB methods. other changes: * upped time out to 10s on GET_LOCK, 1s timeout can too easily leave us in a state where we think we have the lock but it has timed out (during the operation). * fixes SetVersion which was not using the tx it was intending to, and fixed a bug where the transaction could have been left open since Rollback or Commit may never have been called. I added a test but it does not seem to come up in the previous patch, at least easily, I tried some shenanigans. At least, this behavior should be fixed with this patch and hopefully the test / comment is advisory enough. thank you for maintaining this library, it has been relatively easy to integrate with and most stuff works (pg works great :)
This commit is contained in:
parent
53dd06057f
commit
636c911d33
|
@ -1,6 +1,7 @@
|
|||
package mysql
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"database/sql"
|
||||
|
@ -35,7 +36,9 @@ type Config struct {
|
|||
}
|
||||
|
||||
type Mysql struct {
|
||||
db *sql.DB
|
||||
// mysql RELEASE_LOCK must be called from the same conn, so
|
||||
// just do everything over a single conn anyway.
|
||||
db *sql.Conn
|
||||
isLocked bool
|
||||
|
||||
config *Config
|
||||
|
@ -67,8 +70,13 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
|
|||
config.MigrationsTable = DefaultMigrationsTable
|
||||
}
|
||||
|
||||
conn, err := instance.Conn(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mx := &Mysql{
|
||||
db: instance,
|
||||
db: conn,
|
||||
config: config,
|
||||
}
|
||||
|
||||
|
@ -162,9 +170,9 @@ func (m *Mysql) Lock() error {
|
|||
return err
|
||||
}
|
||||
|
||||
query := "SELECT GET_LOCK(?, 1)"
|
||||
query := "SELECT GET_LOCK(?, 10)"
|
||||
var success bool
|
||||
if err := m.db.QueryRow(query, aid).Scan(&success); err != nil {
|
||||
if err := m.db.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil {
|
||||
return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
|
||||
}
|
||||
|
||||
|
@ -188,10 +196,14 @@ func (m *Mysql) Unlock() error {
|
|||
}
|
||||
|
||||
query := `SELECT RELEASE_LOCK(?)`
|
||||
if _, err := m.db.Exec(query, aid); err != nil {
|
||||
if _, err := m.db.ExecContext(context.Background(), query, aid); err != nil {
|
||||
return &database.Error{OrigErr: err, Query: []byte(query)}
|
||||
}
|
||||
|
||||
// NOTE: RELEASE_LOCK could return NULL or (or 0 if the code is changed),
|
||||
// in which case isLocked should be true until the timeout expires -- synchronizing
|
||||
// these states is likely not worth trying to do; reconsider the necessity of isLocked.
|
||||
|
||||
m.isLocked = false
|
||||
return nil
|
||||
}
|
||||
|
@ -203,7 +215,7 @@ func (m *Mysql) Run(migration io.Reader) error {
|
|||
}
|
||||
|
||||
query := string(migr[:])
|
||||
if _, err := m.db.Exec(query); err != nil {
|
||||
if _, err := m.db.ExecContext(context.Background(), query); err != nil {
|
||||
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
|
||||
}
|
||||
|
||||
|
@ -211,19 +223,20 @@ func (m *Mysql) Run(migration io.Reader) error {
|
|||
}
|
||||
|
||||
func (m *Mysql) SetVersion(version int, dirty bool) error {
|
||||
tx, err := m.db.Begin()
|
||||
tx, err := m.db.BeginTx(context.Background(), &sql.TxOptions{})
|
||||
if err != nil {
|
||||
return &database.Error{OrigErr: err, Err: "transaction start failed"}
|
||||
}
|
||||
|
||||
query := "TRUNCATE `" + m.config.MigrationsTable + "`"
|
||||
if _, err := m.db.Exec(query); err != nil {
|
||||
if _, err := tx.ExecContext(context.Background(), query); err != nil {
|
||||
tx.Rollback()
|
||||
return &database.Error{OrigErr: err, Query: []byte(query)}
|
||||
}
|
||||
|
||||
if version >= 0 {
|
||||
query := "INSERT INTO `" + m.config.MigrationsTable + "` (version, dirty) VALUES (?, ?)"
|
||||
if _, err := m.db.Exec(query, version, dirty); err != nil {
|
||||
if _, err := tx.ExecContext(context.Background(), query, version, dirty); err != nil {
|
||||
tx.Rollback()
|
||||
return &database.Error{OrigErr: err, Query: []byte(query)}
|
||||
}
|
||||
|
@ -238,7 +251,7 @@ func (m *Mysql) SetVersion(version int, dirty bool) error {
|
|||
|
||||
func (m *Mysql) Version() (version int, dirty bool, err error) {
|
||||
query := "SELECT version, dirty FROM `" + m.config.MigrationsTable + "` LIMIT 1"
|
||||
err = m.db.QueryRow(query).Scan(&version, &dirty)
|
||||
err = m.db.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
|
||||
switch {
|
||||
case err == sql.ErrNoRows:
|
||||
return database.NilVersion, false, nil
|
||||
|
@ -259,7 +272,7 @@ func (m *Mysql) Version() (version int, dirty bool, err error) {
|
|||
func (m *Mysql) Drop() error {
|
||||
// select all tables
|
||||
query := `SHOW TABLES LIKE '%'`
|
||||
tables, err := m.db.Query(query)
|
||||
tables, err := m.db.QueryContext(context.Background(), query)
|
||||
if err != nil {
|
||||
return &database.Error{OrigErr: err, Query: []byte(query)}
|
||||
}
|
||||
|
@ -281,7 +294,7 @@ func (m *Mysql) Drop() error {
|
|||
// delete one by one ...
|
||||
for _, t := range tableNames {
|
||||
query = "DROP TABLE IF EXISTS `" + t + "` CASCADE"
|
||||
if _, err := m.db.Exec(query); err != nil {
|
||||
if _, err := m.db.ExecContext(context.Background(), query); err != nil {
|
||||
return &database.Error{OrigErr: err, Query: []byte(query)}
|
||||
}
|
||||
}
|
||||
|
@ -297,7 +310,7 @@ func (m *Mysql) ensureVersionTable() error {
|
|||
// check if migration table exists
|
||||
var result string
|
||||
query := `SHOW TABLES LIKE "` + m.config.MigrationsTable + `"`
|
||||
if err := m.db.QueryRow(query).Scan(&result); err != nil {
|
||||
if err := m.db.QueryRowContext(context.Background(), query).Scan(&result); err != nil {
|
||||
if err != sql.ErrNoRows {
|
||||
return &database.Error{OrigErr: err, Query: []byte(query)}
|
||||
}
|
||||
|
@ -307,7 +320,7 @@ func (m *Mysql) ensureVersionTable() error {
|
|||
|
||||
// if not, create the empty migration table
|
||||
query = "CREATE TABLE `" + m.config.MigrationsTable + "` (version bigint not null primary key, dirty boolean not null)"
|
||||
if _, err := m.db.Exec(query); err != nil {
|
||||
if _, err := m.db.ExecContext(context.Background(), query); err != nil {
|
||||
return &database.Error{OrigErr: err, Query: []byte(query)}
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -63,3 +63,37 @@ func Test(t *testing.T) {
|
|||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestLockWorks(t *testing.T) {
|
||||
mt.ParallelTest(t, versions, isReady,
|
||||
func(t *testing.T, i mt.Instance) {
|
||||
p := &Mysql{}
|
||||
addr := fmt.Sprintf("mysql://root:root@tcp(%v:%v)/public", i.Host(), i.Port())
|
||||
d, err := p.Open(addr)
|
||||
if err != nil {
|
||||
t.Fatalf("%v", err)
|
||||
}
|
||||
dt.Test(t, d, []byte("SELECT 1"))
|
||||
|
||||
ms := d.(*Mysql)
|
||||
|
||||
err = ms.Lock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = ms.Unlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// make sure the 2nd lock works (RELEASE_LOCK is very finicky)
|
||||
err = ms.Lock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
err = ms.Unlock()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue