From 155a8b241cd4b871f43d86e3ca4c109bfd205393 Mon Sep 17 00:00:00 2001 From: Teddy Schmitz Date: Fri, 27 Oct 2017 10:56:10 +0800 Subject: [PATCH 1/2] Use db.Conn to fix postgres lock/unlock --- .travis.yml | 6 +++-- database/postgres/postgres.go | 31 +++++++++++++++--------- database/postgres/postgres_test.go | 39 +++++++++++++++++++++++++++++- 3 files changed, 62 insertions(+), 14 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9535b36..fce76d8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -5,6 +5,8 @@ go: - 1.7 - 1.8 - 1.9 + - 1.9.1 + - 1.9.2 env: - MIGRATE_TEST_CONTAINER_BOOT_DELAY=10 @@ -38,7 +40,7 @@ deploy: secure: EFow50BI448HVb/uQ1Kk2Kq0xzmwIYq3V67YyymXIuqSCodvXEsMiBPUoLrxEknpPEIc67LEQTNdfHBgvyHk6oRINWAfie+7pr5tKrpOTF9ghyxoN1PlO8WKQCqwCvGMBCnc5ur5rvzp0bqfpV2rs5q9/nngy3kBuEvs12V7iho= skip_cleanup: true on: - go: 1.8 + go: 1.9 repo: mattes/migrate tags: true file: @@ -56,7 +58,7 @@ deploy: package_glob: '*.deb' skip_cleanup: true on: - go: 1.8 + go: 1.9 repo: mattes/migrate tags: true diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index fb2d61c..601e9dd 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -1,3 +1,4 @@ +// +build go1.9 package postgres import ( @@ -10,6 +11,7 @@ import ( "github.com/lib/pq" "github.com/mattes/migrate" "github.com/mattes/migrate/database" + "context" ) func init() { @@ -33,7 +35,8 @@ type Config struct { } type Postgres struct { - db *sql.DB + // Locking and unlocking need to use the same connection + db *sql.Conn isLocked bool // Open and WithInstance need to garantuee that config is never nil @@ -65,8 +68,14 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { config.MigrationsTable = DefaultMigrationsTable } + conn, err := instance.Conn(context.Background()) + + if err != nil { + return nil, err + } + px := &Postgres{ - db: instance, + db: conn, config: config, } @@ -123,7 +132,7 @@ func (p *Postgres) Lock() error { // or return false if the lock cannot be acquired immediately. query := `SELECT pg_try_advisory_lock($1)` var success bool - if err := p.db.QueryRow(query, aid).Scan(&success); err != nil { + if err := p.db.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil { return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } @@ -146,7 +155,7 @@ func (p *Postgres) Unlock() error { } query := `SELECT pg_advisory_unlock($1)` - if _, err := p.db.Exec(query, aid); err != nil { + if _, err := p.db.ExecContext(context.Background(), query, aid); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } p.isLocked = false @@ -161,7 +170,7 @@ func (p *Postgres) Run(migration io.Reader) error { // run migration query := string(migr[:]) - if _, err := p.db.Exec(query); err != nil { + if _, err := p.db.ExecContext(context.Background(), query); err != nil { // TODO: cast to postgress error and get line number return database.Error{OrigErr: err, Err: "migration failed", Query: migr} } @@ -170,7 +179,7 @@ func (p *Postgres) Run(migration io.Reader) error { } func (p *Postgres) SetVersion(version int, dirty bool) error { - tx, err := p.db.Begin() + tx, err := p.db.BeginTx(context.Background(), &sql.TxOptions{}) if err != nil { return &database.Error{OrigErr: err, Err: "transaction start failed"} } @@ -198,7 +207,7 @@ func (p *Postgres) SetVersion(version int, dirty bool) error { func (p *Postgres) Version() (version int, dirty bool, err error) { query := `SELECT version, dirty FROM "` + p.config.MigrationsTable + `" LIMIT 1` - err = p.db.QueryRow(query).Scan(&version, &dirty) + err = p.db.QueryRowContext(context.Background(),query).Scan(&version, &dirty) switch { case err == sql.ErrNoRows: return database.NilVersion, false, nil @@ -219,7 +228,7 @@ func (p *Postgres) Version() (version int, dirty bool, err error) { func (p *Postgres) Drop() error { // select all tables in current schema query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema())` - tables, err := p.db.Query(query) + tables, err := p.db.QueryContext(context.Background(),query) if err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } @@ -241,7 +250,7 @@ func (p *Postgres) Drop() error { // delete one by one ... for _, t := range tableNames { query = `DROP TABLE IF EXISTS ` + t + ` CASCADE` - if _, err := p.db.Exec(query); err != nil { + if _, err := p.db.ExecContext(context.Background(), query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } } @@ -257,7 +266,7 @@ func (p *Postgres) ensureVersionTable() error { // check if migration table exists var count int query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1` - if err := p.db.QueryRow(query, p.config.MigrationsTable).Scan(&count); err != nil { + if err := p.db.QueryRowContext(context.Background(),query, p.config.MigrationsTable).Scan(&count); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } if count == 1 { @@ -266,7 +275,7 @@ func (p *Postgres) ensureVersionTable() error { // if not, create the empty migration table query = `CREATE TABLE "` + p.config.MigrationsTable + `" (version bigint not null primary key, dirty boolean not null)` - if _, err := p.db.Exec(query); err != nil { + if _, err := p.db.ExecContext(context.Background(),query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } return nil diff --git a/database/postgres/postgres_test.go b/database/postgres/postgres_test.go index 9a367a0..7cdd0ad 100644 --- a/database/postgres/postgres_test.go +++ b/database/postgres/postgres_test.go @@ -12,6 +12,7 @@ import ( "github.com/lib/pq" dt "github.com/mattes/migrate/database/testing" mt "github.com/mattes/migrate/testing" + "context" ) var versions = []mt.Version{ @@ -69,7 +70,7 @@ func TestMultiStatement(t *testing.T) { // make sure second table exists var exists bool - if err := d.(*Postgres).db.QueryRow("SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'bar' AND table_schema = (SELECT current_schema()))").Scan(&exists); err != nil { + if err := d.(*Postgres).db.QueryRowContext(context.Background(), "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'bar' AND table_schema = (SELECT current_schema()))").Scan(&exists); err != nil { t.Fatal(err) } if !exists { @@ -148,3 +149,39 @@ func TestWithSchema(t *testing.T) { func TestWithInstance(t *testing.T) { } + +func TestPostgres_Lock(t *testing.T) { + mt.ParallelTest(t, versions, isReady, + func(t *testing.T, i mt.Instance) { + p := &Postgres{} + addr := fmt.Sprintf("postgres://postgres@%v:%v/postgres?sslmode=disable", i.Host(), i.Port()) + d, err := p.Open(addr) + if err != nil { + t.Fatalf("%v", err) + } + + dt.Test(t, d, []byte("SELECT 1")) + + ps := d.(*Postgres) + + err = ps.Lock() + if err != nil { + t.Fatal(err) + } + + err = ps.Unlock() + if err != nil { + t.Fatal(err) + } + + err = ps.Lock() + if err != nil { + t.Fatal(err) + } + + err = ps.Unlock() + if err != nil { + t.Fatal(err) + } + }) +} \ No newline at end of file From 1519c59188e35638247173d265e3facbde4f6222 Mon Sep 17 00:00:00 2001 From: Dale Hui Date: Tue, 20 Feb 2018 15:40:20 -0800 Subject: [PATCH 2/2] Rename Postgres.db to Postgres.conn --- database/postgres/postgres.go | 24 ++++++++++++------------ database/postgres/postgres_test.go | 2 +- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/database/postgres/postgres.go b/database/postgres/postgres.go index 81eab5f..bbab60f 100644 --- a/database/postgres/postgres.go +++ b/database/postgres/postgres.go @@ -37,7 +37,7 @@ type Config struct { type Postgres struct { // Locking and unlocking need to use the same connection - db *sql.Conn + conn *sql.Conn isLocked bool // Open and WithInstance need to garantuee that config is never nil @@ -76,7 +76,7 @@ func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) { } px := &Postgres{ - db: conn, + conn: conn, config: config, } @@ -115,7 +115,7 @@ func (p *Postgres) Open(url string) (database.Driver, error) { } func (p *Postgres) Close() error { - return p.db.Close() + return p.conn.Close() } // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS @@ -133,7 +133,7 @@ func (p *Postgres) Lock() error { // or return false if the lock cannot be acquired immediately. query := `SELECT pg_try_advisory_lock($1)` var success bool - if err := p.db.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil { + if err := p.conn.QueryRowContext(context.Background(), query, aid).Scan(&success); err != nil { return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)} } @@ -156,7 +156,7 @@ func (p *Postgres) Unlock() error { } query := `SELECT pg_advisory_unlock($1)` - if _, err := p.db.ExecContext(context.Background(), query, aid); err != nil { + if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } p.isLocked = false @@ -171,7 +171,7 @@ func (p *Postgres) Run(migration io.Reader) error { // run migration query := string(migr[:]) - if _, err := p.db.ExecContext(context.Background(), query); err != nil { + if _, err := p.conn.ExecContext(context.Background(), query); err != nil { // TODO: cast to postgress error and get line number return database.Error{OrigErr: err, Err: "migration failed", Query: migr} } @@ -180,7 +180,7 @@ func (p *Postgres) Run(migration io.Reader) error { } func (p *Postgres) SetVersion(version int, dirty bool) error { - tx, err := p.db.BeginTx(context.Background(), &sql.TxOptions{}) + tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{}) if err != nil { return &database.Error{OrigErr: err, Err: "transaction start failed"} } @@ -208,7 +208,7 @@ func (p *Postgres) SetVersion(version int, dirty bool) error { func (p *Postgres) Version() (version int, dirty bool, err error) { query := `SELECT version, dirty FROM "` + p.config.MigrationsTable + `" LIMIT 1` - err = p.db.QueryRowContext(context.Background(), query).Scan(&version, &dirty) + err = p.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty) switch { case err == sql.ErrNoRows: return database.NilVersion, false, nil @@ -229,7 +229,7 @@ func (p *Postgres) Version() (version int, dirty bool, err error) { func (p *Postgres) Drop() error { // select all tables in current schema query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema())` - tables, err := p.db.QueryContext(context.Background(), query) + tables, err := p.conn.QueryContext(context.Background(), query) if err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } @@ -251,7 +251,7 @@ func (p *Postgres) Drop() error { // delete one by one ... for _, t := range tableNames { query = `DROP TABLE IF EXISTS ` + t + ` CASCADE` - if _, err := p.db.ExecContext(context.Background(), query); err != nil { + if _, err := p.conn.ExecContext(context.Background(), query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } } @@ -267,7 +267,7 @@ func (p *Postgres) ensureVersionTable() error { // check if migration table exists var count int query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1` - if err := p.db.QueryRowContext(context.Background(), query, p.config.MigrationsTable).Scan(&count); err != nil { + if err := p.conn.QueryRowContext(context.Background(), query, p.config.MigrationsTable).Scan(&count); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } if count == 1 { @@ -276,7 +276,7 @@ func (p *Postgres) ensureVersionTable() error { // if not, create the empty migration table query = `CREATE TABLE "` + p.config.MigrationsTable + `" (version bigint not null primary key, dirty boolean not null)` - if _, err := p.db.ExecContext(context.Background(), query); err != nil { + if _, err := p.conn.ExecContext(context.Background(), query); err != nil { return &database.Error{OrigErr: err, Query: []byte(query)} } return nil diff --git a/database/postgres/postgres_test.go b/database/postgres/postgres_test.go index 68b9d6f..591752f 100644 --- a/database/postgres/postgres_test.go +++ b/database/postgres/postgres_test.go @@ -73,7 +73,7 @@ func TestMultiStatement(t *testing.T) { // make sure second table exists var exists bool - if err := d.(*Postgres).db.QueryRowContext(context.Background(), "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'bar' AND table_schema = (SELECT current_schema()))").Scan(&exists); err != nil { + if err := d.(*Postgres).conn.QueryRowContext(context.Background(), "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'bar' AND table_schema = (SELECT current_schema()))").Scan(&exists); err != nil { t.Fatal(err) } if !exists {