Merge pull request #128 from andrei-m/fix-redshift

Fix Redshift migrations driver
This commit is contained in:
Dale Hui 2018-11-14 10:32:07 -08:00 committed by GitHub
commit 7f00868584
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 638 additions and 25 deletions

View File

@ -1,6 +1,21 @@
Redshift
===
# Redshift
This provides a Redshift driver for migrations. It is used whenever the URL of the database starts with `redshift://`.
`redshift://user:password@host:port/dbname?query`
| URL Query | WithInstance Config | Description |
|------------|---------------------|-------------|
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `search_path` | | This variable specifies the order in which schemas are searched when an object is referenced by a simple name with no schema specified. |
| `user` | | The user to sign in as |
| `password` | | The user's password |
| `host` | | The host to connect to. Values that start with / are for unix domain sockets. (default is localhost) |
| `port` | | The port to bind to. (default is 5439) |
| `fallback_application_name` | | An application_name to fall back to if one isn't provided. |
| `connect_timeout` | | Maximum wait for connection, in seconds. Zero or not specified means wait indefinitely. |
| `sslcert` | | Cert file location. The file must contain PEM encoded data. |
| `sslkey` | | Key file location. The file must contain PEM encoded data. |
| `sslrootcert` | | The location of the root certificate file. The file must contain PEM encoded data. |
| `sslmode` | | Whether or not to use SSL (disable\|require\|verify-ca\|verify-full) |
Redshift is PostgreSQL compatible but has some specific features (or lack thereof) that require slightly different behavior.

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS users;

View File

@ -0,0 +1,5 @@
CREATE TABLE users (
user_id integer unique,
name varchar(40),
email varchar(40)
);

View File

@ -0,0 +1 @@
ALTER TABLE users DROP COLUMN IF EXISTS city;

View File

@ -0,0 +1,3 @@
ALTER TABLE users ADD COLUMN city varchar(100);

View File

@ -0,0 +1 @@
DROP INDEX IF EXISTS users_email_index;

View File

@ -0,0 +1,3 @@
CREATE UNIQUE INDEX CONCURRENTLY users_email_index ON users (email);
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS books;

View File

@ -0,0 +1,5 @@
CREATE TABLE books (
user_id integer,
name varchar(40),
author varchar(40)
);

View File

@ -0,0 +1 @@
DROP TABLE IF EXISTS movies;

View File

@ -0,0 +1,5 @@
CREATE TABLE movies (
user_id integer,
name varchar(40),
director varchar(40)
);

View File

@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.

View File

@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.

View File

@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.

View File

@ -0,0 +1 @@
-- Lorem ipsum dolor sit amet, consectetur adipiscing elit. Aenean sed interdum velit, tristique iaculis justo. Pellentesque ut porttitor dolor. Donec sit amet pharetra elit. Cras vel ligula ex. Phasellus posuere.

View File

@ -1,46 +1,310 @@
// +build go1.9
package redshift
import (
"net/url"
"context"
"database/sql"
"fmt"
"io"
"io/ioutil"
nurl "net/url"
"strconv"
"strings"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"
"github.com/golang-migrate/migrate/v4/database/postgres"
"github.com/lib/pq"
)
// init registers the driver under the name 'redshift'
func init() {
db := new(Redshift)
db.Driver = new(postgres.Postgres)
database.Register("redshift", db)
db := Redshift{}
database.Register("redshift", &db)
}
var DefaultMigrationsTable = "schema_migrations"
var (
ErrNilConfig = fmt.Errorf("no config")
ErrNoDatabaseName = fmt.Errorf("no database name")
)
type Config struct {
MigrationsTable string
DatabaseName string
}
// Redshift is a wrapper around the PostgreSQL driver which implements Redshift-specific behavior.
//
// Currently, the only different behaviour is the lack of locking in Redshift. The (Un)Lock() method(s) have been overridden from the PostgreSQL adapter to simply return nil.
type Redshift struct {
// The wrapped PostgreSQL driver.
database.Driver
isLocked bool
conn *sql.Conn
db *sql.DB
// Open and WithInstance need to garantuee that config is never nil
config *Config
}
// Open implements the database.Driver interface by parsing the URL, switching the scheme from "redshift" to "postgres", and delegating to the underlying PostgreSQL driver.
func (driver *Redshift) Open(dsn string) (database.Driver, error) {
parsed, err := url.Parse(dsn)
func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
if config == nil {
return nil, ErrNilConfig
}
if err := instance.Ping(); err != nil {
return nil, err
}
query := `SELECT CURRENT_DATABASE()`
var databaseName string
if err := instance.QueryRow(query).Scan(&databaseName); err != nil {
return nil, &database.Error{OrigErr: err, Query: []byte(query)}
}
if len(databaseName) == 0 {
return nil, ErrNoDatabaseName
}
config.DatabaseName = databaseName
if len(config.MigrationsTable) == 0 {
config.MigrationsTable = DefaultMigrationsTable
}
conn, err := instance.Conn(context.Background())
if err != nil {
return nil, err
}
parsed.Scheme = "postgres"
psql, err := driver.Driver.Open(parsed.String())
px := &Redshift{
conn: conn,
db: instance,
config: config,
}
if err := px.ensureVersionTable(); err != nil {
return nil, err
}
return px, nil
}
func (p *Redshift) Open(url string) (database.Driver, error) {
purl, err := nurl.Parse(url)
if err != nil {
return nil, err
}
purl.Scheme = "postgres"
db, err := sql.Open("postgres", migrate.FilterCustomQuery(purl).String())
if err != nil {
return nil, err
}
return &Redshift{Driver: psql}, nil
migrationsTable := purl.Query().Get("x-migrations-table")
if len(migrationsTable) == 0 {
migrationsTable = DefaultMigrationsTable
}
// Lock implements the database.Driver interface by not locking and returning nil.
func (driver *Redshift) Lock() error { return nil }
px, err := WithInstance(db, &Config{
DatabaseName: purl.Path,
MigrationsTable: migrationsTable,
})
if err != nil {
return nil, err
}
// Unlock implements the database.Driver interface by not unlocking and returning nil.
func (driver *Redshift) Unlock() error { return nil }
return px, nil
}
func (p *Redshift) Close() error {
connErr := p.conn.Close()
dbErr := p.db.Close()
if connErr != nil || dbErr != nil {
return fmt.Errorf("conn: %v, db: %v", connErr, dbErr)
}
return nil
}
// Redshift does not support advisory lock functions: https://docs.aws.amazon.com/redshift/latest/dg/c_unsupported-postgresql-functions.html
func (p *Redshift) Lock() error {
if p.isLocked {
return database.ErrLocked
}
p.isLocked = true
return nil
}
func (p *Redshift) Unlock() error {
p.isLocked = false
return nil
}
func (p *Redshift) Run(migration io.Reader) error {
migr, err := ioutil.ReadAll(migration)
if err != nil {
return err
}
// run migration
query := string(migr[:])
if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
if pgErr, ok := err.(*pq.Error); ok {
var line uint
var col uint
var lineColOK bool
if pgErr.Position != "" {
if pos, err := strconv.ParseUint(pgErr.Position, 10, 64); err == nil {
line, col, lineColOK = computeLineFromPos(query, int(pos))
}
}
message := fmt.Sprintf("migration failed: %s", pgErr.Message)
if lineColOK {
message = fmt.Sprintf("%s (column %d)", message, col)
}
if pgErr.Detail != "" {
message = fmt.Sprintf("%s, %s", message, pgErr.Detail)
}
return database.Error{OrigErr: err, Err: message, Query: migr, Line: line}
}
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
}
return nil
}
func computeLineFromPos(s string, pos int) (line uint, col uint, ok bool) {
// replace crlf with lf
s = strings.Replace(s, "\r\n", "\n", -1)
// pg docs: pos uses index 1 for the first character, and positions are measured in characters not bytes
runes := []rune(s)
if pos > len(runes) {
return 0, 0, false
}
sel := runes[:pos]
line = uint(runesCount(sel, newLine) + 1)
col = uint(pos - 1 - runesLastIndex(sel, newLine))
return line, col, true
}
const newLine = '\n'
func runesCount(input []rune, target rune) int {
var count int
for _, r := range input {
if r == target {
count++
}
}
return count
}
func runesLastIndex(input []rune, target rune) int {
for i := len(input) - 1; i >= 0; i-- {
if input[i] == target {
return i
}
}
return -1
}
func (p *Redshift) SetVersion(version int, dirty bool) error {
tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
if err != nil {
return &database.Error{OrigErr: err, Err: "transaction start failed"}
}
query := `DELETE FROM "` + p.config.MigrationsTable + `"`
if _, err := tx.Exec(query); err != nil {
tx.Rollback()
return &database.Error{OrigErr: err, Query: []byte(query)}
}
if version >= 0 {
query = `INSERT INTO "` + p.config.MigrationsTable + `" (version, dirty) VALUES ($1, $2)`
if _, err := tx.Exec(query, version, dirty); err != nil {
tx.Rollback()
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}
if err := tx.Commit(); err != nil {
return &database.Error{OrigErr: err, Err: "transaction commit failed"}
}
return nil
}
func (p *Redshift) Version() (version int, dirty bool, err error) {
query := `SELECT version, dirty FROM "` + p.config.MigrationsTable + `" LIMIT 1`
err = p.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
switch {
case err == sql.ErrNoRows:
return database.NilVersion, false, nil
case err != nil:
if e, ok := err.(*pq.Error); ok {
if e.Code.Name() == "undefined_table" {
return database.NilVersion, false, nil
}
}
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}
default:
return version, dirty, nil
}
}
func (p *Redshift) Drop() error {
// select all tables in current schema
query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema()) AND table_type='BASE TABLE'`
tables, err := p.conn.QueryContext(context.Background(), query)
if err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
defer tables.Close()
// delete one table after another
tableNames := make([]string, 0)
for tables.Next() {
var tableName string
if err := tables.Scan(&tableName); err != nil {
return err
}
if len(tableName) > 0 {
tableNames = append(tableNames, tableName)
}
}
if len(tableNames) > 0 {
// delete one by one ...
for _, t := range tableNames {
query = `DROP TABLE IF EXISTS ` + t + ` CASCADE`
if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}
if err := p.ensureVersionTable(); err != nil {
return err
}
}
return nil
}
func (p *Redshift) ensureVersionTable() error {
// check if migration table exists
var count int
query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_name = $1 AND table_schema = (SELECT current_schema()) LIMIT 1`
if err := p.conn.QueryRowContext(context.Background(), query, p.config.MigrationsTable).Scan(&count); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
if count == 1 {
return nil
}
// if not, create the empty migration table
query = `CREATE TABLE "` + p.config.MigrationsTable + `" (version bigint not null primary key, dirty boolean not null)`
if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
return nil
}

View File

@ -0,0 +1,304 @@
package redshift
// error codes https://github.com/lib/pq/blob/master/error.go
import (
"bytes"
"context"
"database/sql"
sqldriver "database/sql/driver"
"fmt"
"io"
"strconv"
"strings"
"testing"
dt "github.com/golang-migrate/migrate/v4/database/testing"
mt "github.com/golang-migrate/migrate/v4/testing"
)
var versions = []mt.Version{
{Image: "postgres:8"},
}
func redshiftConnectionString(host string, port uint) string {
return connectionString("redshift", host, port)
}
func pgConnectionString(host string, port uint) string {
return connectionString("postgres", host, port)
}
func connectionString(schema, host string, port uint) string {
return fmt.Sprintf("%s://postgres@%s:%v/postgres?sslmode=disable", schema, host, port)
}
func isReady(i mt.Instance) bool {
db, err := sql.Open("postgres", pgConnectionString(i.Host(), i.Port()))
if err != nil {
return false
}
defer db.Close()
if err = db.Ping(); err != nil {
switch err {
case sqldriver.ErrBadConn, io.EOF:
return false
default:
fmt.Println(err)
}
return false
}
return true
}
func Test(t *testing.T) {
mt.ParallelTest(t, versions, isReady,
func(t *testing.T, i mt.Instance) {
p := &Redshift{}
addr := redshiftConnectionString(i.Host(), i.Port())
d, err := p.Open(addr)
if err != nil {
t.Fatalf("%v", err)
}
defer d.Close()
dt.Test(t, d, []byte("SELECT 1"))
})
}
func TestMultiStatement(t *testing.T) {
mt.ParallelTest(t, versions, isReady,
func(t *testing.T, i mt.Instance) {
p := &Redshift{}
addr := redshiftConnectionString(i.Host(), i.Port())
d, err := p.Open(addr)
if err != nil {
t.Fatalf("%v", err)
}
defer d.Close()
if err := d.Run(bytes.NewReader([]byte("CREATE TABLE foo (foo text); CREATE TABLE bar (bar text);"))); err != nil {
t.Fatalf("expected err to be nil, got %v", err)
}
// make sure second table exists
var exists bool
if err := d.(*Redshift).conn.QueryRowContext(context.Background(), "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'bar' AND table_schema = (SELECT current_schema()))").Scan(&exists); err != nil {
t.Fatal(err)
}
if !exists {
t.Fatalf("expected table bar to exist")
}
})
}
func TestErrorParsing(t *testing.T) {
mt.ParallelTest(t, versions, isReady,
func(t *testing.T, i mt.Instance) {
p := &Redshift{}
addr := redshiftConnectionString(i.Host(), i.Port())
d, err := p.Open(addr)
if err != nil {
t.Fatalf("%v", err)
}
defer d.Close()
wantErr := `migration failed: syntax error at or near "TABLEE" (column 37) in line 1: CREATE TABLE foo ` +
`(foo text); CREATE TABLEE bar (bar text); (details: pq: syntax error at or near "TABLEE")`
if err := d.Run(bytes.NewReader([]byte("CREATE TABLE foo (foo text); CREATE TABLEE bar (bar text);"))); err == nil {
t.Fatal("expected err but got nil")
} else if err.Error() != wantErr {
t.Fatalf("expected '%s' but got '%s'", wantErr, err.Error())
}
})
}
func TestFilterCustomQuery(t *testing.T) {
mt.ParallelTest(t, versions, isReady,
func(t *testing.T, i mt.Instance) {
p := &Redshift{}
addr := fmt.Sprintf("postgres://postgres@%v:%v/postgres?sslmode=disable&x-custom=foobar", i.Host(), i.Port())
d, err := p.Open(addr)
if err != nil {
t.Fatalf("%v", err)
}
defer d.Close()
})
}
func TestWithSchema(t *testing.T) {
mt.ParallelTest(t, versions, isReady,
func(t *testing.T, i mt.Instance) {
p := &Redshift{}
addr := redshiftConnectionString(i.Host(), i.Port())
d, err := p.Open(addr)
if err != nil {
t.Fatalf("%v", err)
}
defer d.Close()
// create foobar schema
if err := d.Run(bytes.NewReader([]byte("CREATE SCHEMA foobar AUTHORIZATION postgres"))); err != nil {
t.Fatal(err)
}
if err := d.SetVersion(1, false); err != nil {
t.Fatal(err)
}
// re-connect using that schema
d2, err := p.Open(fmt.Sprintf("postgres://postgres@%v:%v/postgres?sslmode=disable&search_path=foobar", i.Host(), i.Port()))
if err != nil {
t.Fatalf("%v", err)
}
defer d2.Close()
version, _, err := d2.Version()
if err != nil {
t.Fatal(err)
}
if version != -1 {
t.Fatal("expected NilVersion")
}
// now update version and compare
if err := d2.SetVersion(2, false); err != nil {
t.Fatal(err)
}
version, _, err = d2.Version()
if err != nil {
t.Fatal(err)
}
if version != 2 {
t.Fatal("expected version 2")
}
// meanwhile, the public schema still has the other version
version, _, err = d.Version()
if err != nil {
t.Fatal(err)
}
if version != 1 {
t.Fatal("expected version 2")
}
})
}
func TestWithInstance(t *testing.T) {
}
func TestRedshift_Lock(t *testing.T) {
mt.ParallelTest(t, versions, isReady,
func(t *testing.T, i mt.Instance) {
p := &Redshift{}
addr := pgConnectionString(i.Host(), i.Port())
d, err := p.Open(addr)
if err != nil {
t.Fatalf("%v", err)
}
dt.Test(t, d, []byte("SELECT 1"))
ps := d.(*Redshift)
err = ps.Lock()
if err != nil {
t.Fatal(err)
}
err = ps.Unlock()
if err != nil {
t.Fatal(err)
}
err = ps.Lock()
if err != nil {
t.Fatal(err)
}
err = ps.Unlock()
if err != nil {
t.Fatal(err)
}
})
}
func Test_computeLineFromPos(t *testing.T) {
testcases := []struct {
pos int
wantLine uint
wantCol uint
input string
wantOk bool
}{
{
15, 2, 6, "SELECT *\nFROM foo", true, // foo table does not exists
},
{
16, 3, 6, "SELECT *\n\nFROM foo", true, // foo table does not exists, empty line
},
{
25, 3, 7, "SELECT *\nFROM foo\nWHERE x", true, // x column error
},
{
27, 5, 7, "SELECT *\n\nFROM foo\n\nWHERE x", true, // x column error, empty lines
},
{
10, 2, 1, "SELECT *\nFROMM foo", true, // FROMM typo
},
{
11, 3, 1, "SELECT *\n\nFROMM foo", true, // FROMM typo, empty line
},
{
17, 2, 8, "SELECT *\nFROM foo", true, // last character
},
{
18, 0, 0, "SELECT *\nFROM foo", false, // invalid position
},
}
for i, tc := range testcases {
t.Run("tc"+strconv.Itoa(i), func(t *testing.T) {
run := func(crlf bool, nonASCII bool) {
var name string
if crlf {
name = "crlf"
} else {
name = "lf"
}
if nonASCII {
name += "-nonascii"
} else {
name += "-ascii"
}
t.Run(name, func(t *testing.T) {
input := tc.input
if crlf {
input = strings.Replace(input, "\n", "\r\n", -1)
}
if nonASCII {
input = strings.Replace(input, "FROM", "FRÖM", -1)
}
gotLine, gotCol, gotOK := computeLineFromPos(input, tc.pos)
if tc.wantOk {
t.Logf("pos %d, want %d:%d, %#v", tc.pos, tc.wantLine, tc.wantCol, input)
}
if gotOK != tc.wantOk {
t.Fatalf("expected ok %v but got %v", tc.wantOk, gotOK)
}
if gotLine != tc.wantLine {
t.Fatalf("expected line %d but got %d", tc.wantLine, gotLine)
}
if gotCol != tc.wantCol {
t.Fatalf("expected col %d but got %d", tc.wantCol, gotCol)
}
})
}
run(false, false)
run(true, false)
run(false, true)
run(true, true)
})
}
}