From 748ae8f06a472de25e4c58c2dc59df08121e7bd7 Mon Sep 17 00:00:00 2001 From: Christian Klotz Date: Sun, 4 Jun 2017 21:13:49 +0100 Subject: [PATCH] Add Spanner driver MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Support for Google Cloud Spanner, closes #172. Includes example migrations ported over from the PostgreSQL driver. In all examples provided by Google I’ve come across, camel-case is used for table names, columns, etc. Hence the examples use this naming convention. --- Makefile | 2 +- README.md | 2 +- cli/build_spanner.go | 7 + database/spanner/README.md | 29 ++++ database/spanner/spanner.go | 263 ++++++++++++++++++++++++++++++++++++ 5 files changed, 301 insertions(+), 2 deletions(-) create mode 100644 cli/build_spanner.go create mode 100644 database/spanner/README.md create mode 100644 database/spanner/spanner.go diff --git a/Makefile b/Makefile index 34e737e..466fe3e 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ SOURCE ?= file go-bindata github aws-s3 google-cloud-storage -DATABASE ?= postgres mysql redshift +DATABASE ?= postgres mysql redshift spanner VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-) TEST_FLAGS ?= REPO_OWNER ?= $(shell cd .. && basename "$$(pwd)") diff --git a/README.md b/README.md index 2327f7d..50455e0 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Database drivers run migrations. [Add a new database?](database/driver.go) * [MongoDB](database/mongodb) ([todo #169](https://github.com/mattes/migrate/issues/169)) * [CrateDB](database/crate) ([todo #170](https://github.com/mattes/migrate/issues/170)) * [Shell](database/shell) ([todo #171](https://github.com/mattes/migrate/issues/171)) - * [Google Cloud Spanner](database/spanner) ([todo #172](https://github.com/mattes/migrate/issues/172)) + * [Google Cloud Spanner](database/spanner) diff --git a/cli/build_spanner.go b/cli/build_spanner.go new file mode 100644 index 0000000..7223d82 --- /dev/null +++ b/cli/build_spanner.go @@ -0,0 +1,7 @@ +// +build spanner + +package main + +import ( + _ "github.com/mattes/migrate/database/spanner" +) diff --git a/database/spanner/README.md b/database/spanner/README.md new file mode 100644 index 0000000..195b214 --- /dev/null +++ b/database/spanner/README.md @@ -0,0 +1,29 @@ +# Google Cloud Spanner + +## Usage + +The DSN must be given in the following format. + +`spanner://projects/{projectId}/instances/{instanceId}/databases/{databaseName}` + +See [Google Spanner Documentation](https://cloud.google.com/spanner/docs) for details. + + +| Param | WithInstance Config | Description | +| ----- | ------------------- | ----------- | +| `x-migrations-table` | `MigrationsTable` | Name of the migrations table | +| `url` | `DatabaseName` | The full path to the Spanner database resource. If provided as part of `Config` it must not contain a scheme or query string to match the format `projects/{projectId}/instances/{instanceId}/databases/{databaseName}`| +| `projectId` || The Google Cloud Platform project id +| `instanceId` || The id of the instance running Spanner +| `databaseName` || The name of the Spanner database + + +> **Note:** Google Cloud Spanner migrations can take a considerable amount of +> time. The migrations provided as part of the example take about 6 minutes to +> run on a small instance. +> +> ```log +> 1481574547/u create_users_table (21.354507597s) +> 1496539702/u add_city_to_users (41.647359754s) +> 1496601752/u add_index_on_user_emails (2m12.155787369s) +> 1496602638/u create_books_table (2m30.77299181s) \ No newline at end of file diff --git a/database/spanner/spanner.go b/database/spanner/spanner.go new file mode 100644 index 0000000..ef86fcb --- /dev/null +++ b/database/spanner/spanner.go @@ -0,0 +1,263 @@ +package spanner + +import ( + "fmt" + "io" + "io/ioutil" + "log" + nurl "net/url" + "regexp" + "strings" + + "golang.org/x/net/context" + + "cloud.google.com/go/spanner" + database "cloud.google.com/go/spanner/admin/database/apiv1" + + "github.com/mattes/migrate" + mdb "github.com/mattes/migrate/database" + + "google.golang.org/api/iterator" + adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" +) + +func init() { + db := Spanner{} + mdb.Register("spanner", &db) +} + +// DefaultMigrationsTable is used if no custom table is specified +const DefaultMigrationsTable = "SchemaMigrations" + +// Driver errors +var ( + ErrNilConfig = fmt.Errorf("no config") + ErrNoDatabaseName = fmt.Errorf("no database name") + ErrNoSchema = fmt.Errorf("no schema") + ErrDatabaseDirty = fmt.Errorf("database is dirty") +) + +// Config used for a Spanner instance +type Config struct { + MigrationsTable string + DatabaseName string +} + +// Spanner implements database.Driver for Google Cloud Spanner +type Spanner struct { + adminClient *database.DatabaseAdminClient + dataClient *spanner.Client + + config *Config +} + +// Open implements database.Driver +func (s *Spanner) Open(url string) (mdb.Driver, error) { + purl, err := nurl.Parse(url) + if err != nil { + return nil, err + } + + dbname := strings.Replace(migrate.FilterCustomQuery(purl).String(), "spanner://", "", 1) + + ctx := context.Background() + adminClient, err := database.NewDatabaseAdminClient(ctx) + if err != nil { + return nil, err + } + dataClient, err := spanner.NewClient(ctx, dbname) + if err != nil { + log.Fatal(err) + } + + migrationsTable := purl.Query().Get("x-migrations-table") + if len(migrationsTable) == 0 { + migrationsTable = DefaultMigrationsTable + } + + sx := &Spanner{ + adminClient: adminClient, + dataClient: dataClient, + config: &Config{ + DatabaseName: dbname, + MigrationsTable: migrationsTable, + }, + } + + if err := sx.ensureVersionTable(); err != nil { + return nil, err + } + + return sx, nil +} + +// Close implements database.Driver +func (s *Spanner) Close() error { + s.dataClient.Close() + return s.adminClient.Close() +} + +// Lock implements database.Driver but doesn't do anything because Spanner only +// enqueues the UpdateDatabaseDdlRequest. +func (s *Spanner) Lock() error { + return nil +} + +// Unlock implements database.Driver but no action required, see Lock. +func (s *Spanner) Unlock() error { + return nil +} + +// Run implements database.Driver +func (s *Spanner) Run(migration io.Reader) error { + migr, err := ioutil.ReadAll(migration) + if err != nil { + return err + } + + // run migration + stmt := string(migr[:]) + ctx := context.Background() + + op, err := s.adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{ + Database: s.config.DatabaseName, + Statements: []string{stmt}, + }) + + if err != nil { + return &mdb.Error{OrigErr: err, Err: "migration failed", Query: migr} + } + + if err := op.Wait(ctx); err != nil { + return &mdb.Error{OrigErr: err, Err: "migration failed", Query: migr} + } + + return nil +} + +// SetVersion implements database.Driver +func (s *Spanner) SetVersion(version int, dirty bool) error { + ctx := context.Background() + + _, err := s.dataClient.ReadWriteTransaction(ctx, + func(ctx context.Context, txn *spanner.ReadWriteTransaction) error { + m := []*spanner.Mutation{ + spanner.Delete(s.config.MigrationsTable, spanner.AllKeys()), + spanner.Insert(s.config.MigrationsTable, + []string{"Version", "Dirty"}, + []interface{}{version, dirty}, + )} + return txn.BufferWrite(m) + }) + if err != nil { + return &mdb.Error{OrigErr: err} + } + + return nil +} + +// Version implements database.Driver +func (s *Spanner) Version() (version int, dirty bool, err error) { + ctx := context.Background() + + stmt := spanner.Statement{ + SQL: `SELECT Version, Dirty FROM ` + s.config.MigrationsTable + ` LIMIT 1`, + } + iter := s.dataClient.Single().Query(ctx, stmt) + defer iter.Stop() + + row, err := iter.Next() + switch err { + case iterator.Done: + return mdb.NilVersion, false, nil + case nil: + var v int64 + if err = row.Columns(&v, &dirty); err != nil { + return 0, false, &mdb.Error{OrigErr: err, Query: []byte(stmt.SQL)} + } + version = int(v) + default: + return 0, false, &mdb.Error{OrigErr: err, Query: []byte(stmt.SQL)} + } + + return version, dirty, nil +} + +// Drop implements database.Driver. Retrieves the database schema first and +// creates statements to drop the indexes and tables accordingly. +// Note: The drop statements are created in reverse order to how they're +// provided in the schema. Assuming the schema describes how the database can +// be "build up", it seems logical to "unbuild" the database simply by going the +// opposite direction. More testing +func (s *Spanner) Drop() error { + ctx := context.Background() + res, err := s.adminClient.GetDatabaseDdl(ctx, &adminpb.GetDatabaseDdlRequest{ + Database: s.config.DatabaseName, + }) + if err != nil { + return &mdb.Error{OrigErr: err, Err: "drop failed"} + } + if len(res.Statements) == 0 { + return nil + } + + r := regexp.MustCompile(`(CREATE TABLE\s(\S+)\s)|(CREATE.+INDEX\s(\S+)\s)`) + stmts := make([]string, 0) + for i := len(res.Statements) - 1; i >= 0; i-- { + s := res.Statements[i] + m := r.FindSubmatch([]byte(s)) + + if len(m) == 0 { + continue + } else if tbl := m[2]; len(tbl) > 0 { + stmts = append(stmts, fmt.Sprintf(`DROP TABLE %s`, tbl)) + } else if idx := m[4]; len(idx) > 0 { + stmts = append(stmts, fmt.Sprintf(`DROP INDEX %s`, idx)) + } + } + + op, err := s.adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{ + Database: s.config.DatabaseName, + Statements: stmts, + }) + if err != nil { + return &mdb.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))} + } + if err := op.Wait(ctx); err != nil { + return &mdb.Error{OrigErr: err, Query: []byte(strings.Join(stmts, "; "))} + } + + if err := p.ensureVersionTable(); err != nil { + return err + } + + return nil +} + +func (s *Spanner) ensureVersionTable() error { + ctx := context.Background() + tbl := s.config.MigrationsTable + iter := s.dataClient.Single().Read(ctx, tbl, spanner.AllKeys(), nil) + if err := iter.Do(func(r *spanner.Row) error { return nil }); err == nil { + return nil + } + + stmt := fmt.Sprintf(`CREATE TABLE %s ( + Version INT64 NOT NULL, + Dirty BOOL NOT NULL + ) PRIMARY KEY(Version)`, tbl) + + op, err := s.adminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{ + Database: s.config.DatabaseName, + Statements: []string{stmt}, + }) + + if err != nil { + return &mdb.Error{OrigErr: err, Query: []byte(stmt)} + } + if err := op.Wait(ctx); err != nil { + return &mdb.Error{OrigErr: err, Query: []byte(stmt)} + } + + return nil +}