Add Spanner driver

Support for Google Cloud Spanner, closes #172. Includes example migrations ported over from the PostgreSQL driver.

In all examples provided by Google I’ve come across, camel-case is used for table names, columns, etc. Hence the examples use this naming convention.
This commit is contained in:
Christian Klotz 2017-06-04 21:13:49 +01:00
parent 3682bcf9d2
commit 748ae8f06a
5 changed files with 301 additions and 2 deletions

View File

@ -1,5 +1,5 @@
SOURCE ?= file go-bindata github aws-s3 google-cloud-storage
DATABASE ?= postgres mysql redshift
DATABASE ?= postgres mysql redshift spanner
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
TEST_FLAGS ?=
REPO_OWNER ?= $(shell cd .. && basename "$$(pwd)")

View File

@ -31,7 +31,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
* [MongoDB](database/mongodb) ([todo #169](https://github.com/mattes/migrate/issues/169))
* [CrateDB](database/crate) ([todo #170](https://github.com/mattes/migrate/issues/170))
* [Shell](database/shell) ([todo #171](https://github.com/mattes/migrate/issues/171))
* [Google Cloud Spanner](database/spanner) ([todo #172](https://github.com/mattes/migrate/issues/172))
* [Google Cloud Spanner](database/spanner)

7
cli/build_spanner.go Normal file
View File

@ -0,0 +1,7 @@
// +build spanner
package main
import (
_ "github.com/mattes/migrate/database/spanner"
)

View File

@ -0,0 +1,29 @@
# Google Cloud Spanner
## Usage
The DSN must be given in the following format.
`spanner://projects/{projectId}/instances/{instanceId}/databases/{databaseName}`
See [Google Spanner Documentation](https://cloud.google.com/spanner/docs) for details.
| Param | WithInstance Config | Description |
| ----- | ------------------- | ----------- |
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
| `url` | `DatabaseName` | The full path to the Spanner database resource. If provided as part of `Config` it must not contain a scheme or query string to match the format `projects/{projectId}/instances/{instanceId}/databases/{databaseName}`|
| `projectId` || The Google Cloud Platform project id
| `instanceId` || The id of the instance running Spanner
| `databaseName` || The name of the Spanner database
> **Note:** Google Cloud Spanner migrations can take a considerable amount of
> time. The migrations provided as part of the example take about 6 minutes to
> run on a small instance.
>
> ```log
> 1481574547/u create_users_table (21.354507597s)
> 1496539702/u add_city_to_users (41.647359754s)
> 1496601752/u add_index_on_user_emails (2m12.155787369s)
> 1496602638/u create_books_table (2m30.77299181s)

263
database/spanner/spanner.go Normal file
View File

@ -0,0 +1,263 @@
package spanner
import (
"fmt"
"io"
"io/ioutil"
"log"
nurl "net/url"
"regexp"
"strings"
"golang.org/x/net/context"
"cloud.google.com/go/spanner"
database "cloud.google.com/go/spanner/admin/database/apiv1"
"github.com/mattes/migrate"
mdb "github.com/mattes/migrate/database"
"google.golang.org/api/iterator"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
)
func init() {
db := Spanner{}
mdb.Register("spanner", &db)
}
// DefaultMigrationsTable is used if no custom table is specified
const DefaultMigrationsTable = "SchemaMigrations"
// Driver errors
var (
ErrNilConfig = fmt.Errorf("no config")
ErrNoDatabaseName = fmt.Errorf("no database name")
ErrNoSchema = fmt.Errorf("no schema")
ErrDatabaseDirty = fmt.Errorf("database is dirty")
)
// Config used for a Spanner instance
type Config struct {
MigrationsTable string
DatabaseName string
}
// Spanner implements database.Driver for Google Cloud Spanner
type Spanner struct {
adminClient *database.DatabaseAdminClient
dataClient *spanner.Client
config *Config
}
// Open implements database.Driver
func (s *Spanner) Open(url string) (mdb.Driver, error) {
purl, err := nurl.Parse(url)
if err != nil {
return nil, err
}
dbname := strings.Replace(migrate.FilterCustomQuery(purl).String(), "spanner://", "", 1)
ctx := context.Background()
adminClient, err := database.NewDatabaseAdminClient(ctx)
if err != nil {
return nil, err
}
dataClient, err := spanner.NewClient(ctx, dbname)
if err != nil {
log.Fatal(err)
}
migrationsTable := purl.Query().Get("x-migrations-table")
if len(migrationsTable) == 0 {
migrationsTable = DefaultMigrationsTable
}
sx := &Spanner{
adminClient: adminClient,
dataClient: dataClient,
config: &Config{
DatabaseName: dbname,
MigrationsTable: migrationsTable,
},
}
if err := sx.ensureVersionTable(); err != nil {
return nil, err
}
return sx, nil
}
// Close implements database.Driver
func (s *Spanner) Close() error {
s.dataClient.Close()
return s.adminClient.Close()
}
// Lock implements database.Driver but doesn't do anything because Spanner only
// enqueues the UpdateDatabaseDdlRequest.
func (s *Spanner) Lock() error {
return nil
}
// Unlock implements database.Driver but no action required, see Lock.
func (s *Spanner) Unlock() error {
return nil
}
// Run implements database.Driver
func (s *Spanner) Run(migration io.Reader) error {
migr, err := ioutil.ReadAll(migration)
if err != nil {
return err
}
// run migration
stmt := string(migr[:])
ctx := context.Background()
op, err := s.adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: s.config.DatabaseName,
Statements: []string{stmt},
})
if err != nil {
return &mdb.Error{OrigErr: err, Err: "migration failed", Query: migr}
}
if err := op.Wait(ctx); err != nil {
return &mdb.Error{OrigErr: err, Err: "migration failed", Query: migr}
}
return nil
}
// SetVersion implements database.Driver
func (s *Spanner) SetVersion(version int, dirty bool) error {
ctx := context.Background()
_, err := s.dataClient.ReadWriteTransaction(ctx,
func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
m := []*spanner.Mutation{
spanner.Delete(s.config.MigrationsTable, spanner.AllKeys()),
spanner.Insert(s.config.MigrationsTable,
[]string{"Version", "Dirty"},
[]interface{}{version, dirty},
)}
return txn.BufferWrite(m)
})
if err != nil {
return &mdb.Error{OrigErr: err}
}
return nil
}
// Version implements database.Driver
func (s *Spanner) Version() (version int, dirty bool, err error) {
ctx := context.Background()
stmt := spanner.Statement{
SQL: `SELECT Version, Dirty FROM ` + s.config.MigrationsTable + ` LIMIT 1`,
}
iter := s.dataClient.Single().Query(ctx, stmt)
defer iter.Stop()
row, err := iter.Next()
switch err {
case iterator.Done:
return mdb.NilVersion, false, nil
case nil:
var v int64
if err = row.Columns(&v, &dirty); err != nil {
return 0, false, &mdb.Error{OrigErr: err, Query: []byte(stmt.SQL)}
}
version = int(v)
default:
return 0, false, &mdb.Error{OrigErr: err, Query: []byte(stmt.SQL)}
}
return version, dirty, nil
}
// Drop implements database.Driver. Retrieves the database schema first and
// creates statements to drop the indexes and tables accordingly.
// Note: The drop statements are created in reverse order to how they're
// provided in the schema. Assuming the schema describes how the database can
// be "build up", it seems logical to "unbuild" the database simply by going the
// opposite direction. More testing
func (s *Spanner) Drop() error {
ctx := context.Background()
res, err := s.adminClient.GetDatabaseDdl(ctx, &adminpb.GetDatabaseDdlRequest{
Database: s.config.DatabaseName,
})
if err != nil {
return &mdb.Error{OrigErr: err, Err: "drop failed"}
}
if len(res.Statements) == 0 {
return nil
}
r := regexp.MustCompile(`(CREATE TABLE\s(\S+)\s)|(CREATE.+INDEX\s(\S+)\s)`)
stmts := make([]string, 0)
for i := len(res.Statements) - 1; i >= 0; i-- {
s := res.Statements[i]
m := r.FindSubmatch([]byte(s))
if len(m) == 0 {
continue
} else if tbl := m[2]; len(tbl) > 0 {
stmts = append(stmts, fmt.Sprintf(`DROP TABLE %s`, tbl))
} else if idx := m[4]; len(idx) > 0 {
stmts = append(stmts, fmt.Sprintf(`DROP INDEX %s`, idx))
}
}
op, err := s.adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: s.config.DatabaseName,
Statements: stmts,
})
if err != nil {
return &mdb.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
}
if err := op.Wait(ctx); err != nil {
return &mdb.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))}
}
if err := p.ensureVersionTable(); err != nil {
return err
}
return nil
}
func (s *Spanner) ensureVersionTable() error {
ctx := context.Background()
tbl := s.config.MigrationsTable
iter := s.dataClient.Single().Read(ctx, tbl, spanner.AllKeys(), nil)
if err := iter.Do(func(r *spanner.Row) error { return nil }); err == nil {
return nil
}
stmt := fmt.Sprintf(`CREATE TABLE %s (
Version INT64 NOT NULL,
Dirty BOOL NOT NULL
) PRIMARY KEY(Version)`, tbl)
op, err := s.adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: s.config.DatabaseName,
Statements: []string{stmt},
})
if err != nil {
return &mdb.Error{OrigErr: err, Query: []byte(stmt)}
}
if err := op.Wait(ctx); err != nil {
return &mdb.Error{OrigErr: err, Query: []byte(stmt)}
}
return nil
}