add cassandra driver and function to retrieve networkSettings to get port bound to 9042

This commit is contained in:
Jolan Malassigne 2017-05-24 09:59:18 +02:00 committed by Christian Klotz
parent 6ecd671cfc
commit 89879968bb
9 changed files with 332 additions and 11 deletions

View File

@ -1,5 +1,5 @@
SOURCE ?= file go-bindata github aws-s3 google-cloud-storage SOURCE ?= file go-bindata github aws-s3 google-cloud-storage
DATABASE ?= postgres mysql redshift sqlite3 spanner DATABASE ?= postgres mysql redshift cassandra sqlite3 spanner
VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-) VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-)
TEST_FLAGS ?= TEST_FLAGS ?=
REPO_OWNER ?= $(shell cd .. && basename "$$(pwd)") REPO_OWNER ?= $(shell cd .. && basename "$$(pwd)")

View File

@ -24,8 +24,8 @@ Database drivers run migrations. [Add a new database?](database/driver.go)
* [PostgreSQL](database/postgres) * [PostgreSQL](database/postgres)
* [Redshift](database/redshift) * [Redshift](database/redshift)
* [Ql](database/ql) * [Ql](database/ql)
* [Cassandra](database/cassandra) ([todo #164](https://github.com/mattes/migrate/issues/164)) * [Cassandra](database/cassandra)
* [SQLite](database/sqlite3) * [SQLite](database/sqlite)
* [MySQL/ MariaDB](database/mysql) * [MySQL/ MariaDB](database/mysql)
* [Neo4j](database/neo4j) ([todo #167](https://github.com/mattes/migrate/issues/167)) * [Neo4j](database/neo4j) ([todo #167](https://github.com/mattes/migrate/issues/167))
* [MongoDB](database/mongodb) ([todo #169](https://github.com/mattes/migrate/issues/169)) * [MongoDB](database/mongodb) ([todo #169](https://github.com/mattes/migrate/issues/169))

7
cli/build_cassandra.go Normal file
View File

@ -0,0 +1,7 @@
// +build cassandra
package main
import (
_ "github.com/mattes/migrate/database/cassandra"
)

View File

@ -0,0 +1,29 @@
# Cassandra
* Drop command will not work on Cassandra 2.X because it rely on
system_schema table which comes with 3.X
* Other commands should work properly but are **not tested**
## Usage
`cassandra://host:port/keyspace?param1=value&param2=value2`
| URL Query | Default value | Description |
|------------|-------------|-----------|
| `x-migrations-table` | schema_migrations | Name of the migrations table |
| `port` | 9042 | The port to bind to |
| `consistency` | ALL | Migration consistency
| `protocol` | | Cassandra protocol version (3 or 4)
| `timeout` | 1 minute | Migration timeout
`timeout` is parsed using [time.ParseDuration(s string)](https://golang.org/pkg/time/#ParseDuration)
## Upgrading from v1
1. Write down the current migration version from schema_migrations
2. `DROP TABLE schema_migrations`
4. Download and install the latest migrate version.
5. Force the current migration version with `migrate force <current_version>`.

View File

@ -0,0 +1,223 @@
package cassandra
import (
"fmt"
"io"
"io/ioutil"
nurl "net/url"
"github.com/gocql/gocql"
"time"
"github.com/mattes/migrate/database"
"strconv"
)
func init() {
db := new(Cassandra)
database.Register("cassandra", db)
}
var DefaultMigrationsTable = "schema_migrations"
var dbLocked = false
var (
ErrNilConfig = fmt.Errorf("no config")
ErrNoKeyspace = fmt.Errorf("no keyspace provided")
ErrDatabaseDirty = fmt.Errorf("database is dirty")
)
type Config struct {
MigrationsTable string
KeyspaceName string
}
type Cassandra struct {
session *gocql.Session
isLocked bool
// Open and WithInstance need to guarantee that config is never nil
config *Config
}
func (p *Cassandra) Open(url string) (database.Driver, error) {
u, err := nurl.Parse(url)
if err != nil {
return nil, err
}
// Check for missing mandatory attributes
if len(u.Path) == 0 {
return nil, ErrNoKeyspace
}
migrationsTable := u.Query().Get("x-migrations-table")
if len(migrationsTable) == 0 {
migrationsTable = DefaultMigrationsTable
}
p.config = &Config{
KeyspaceName: u.Path,
MigrationsTable: migrationsTable,
}
cluster := gocql.NewCluster(u.Host)
cluster.Keyspace = u.Path[1:len(u.Path)]
cluster.Consistency = gocql.All
cluster.Timeout = 1 * time.Minute
// Retrieve query string configuration
if len(u.Query().Get("consistency")) > 0 {
var consistency gocql.Consistency
consistency, err = parseConsistency(u.Query().Get("consistency"))
if err != nil {
return nil, err
}
cluster.Consistency = consistency
}
if len(u.Query().Get("protocol")) > 0 {
var protoversion int
protoversion, err = strconv.Atoi(u.Query().Get("protocol"))
if err != nil {
return nil, err
}
cluster.ProtoVersion = protoversion
}
if len(u.Query().Get("timeout")) > 0 {
var timeout time.Duration
timeout, err = time.ParseDuration(u.Query().Get("timeout"))
if err != nil {
return nil, err
}
cluster.Timeout = timeout
}
p.session, err = cluster.CreateSession()
if err != nil {
return nil, err
}
if err := p.ensureVersionTable(); err != nil {
return nil, err
}
return p, nil
}
func (p *Cassandra) Close() error {
p.session.Close()
return nil
}
func (p *Cassandra) Lock() error {
if (dbLocked) {
return database.ErrLocked
}
dbLocked = true
return nil
}
func (p *Cassandra) Unlock() error {
dbLocked = false
return nil
}
func (p *Cassandra) Run(migration io.Reader) error {
migr, err := ioutil.ReadAll(migration)
if err != nil {
return err
}
// run migration
query := string(migr[:])
if err := p.session.Query(query).Exec(); err != nil {
// TODO: cast to Cassandra error and get line number
return database.Error{OrigErr: err, Err: "migration failed", Query: migr}
}
return nil
}
func (p *Cassandra) SetVersion(version int, dirty bool) error {
query := `TRUNCATE "` + p.config.MigrationsTable + `"`
if err := p.session.Query(query).Exec(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
if version >= 0 {
query = `INSERT INTO "` + p.config.MigrationsTable + `" (version, dirty) VALUES (?, ?)`
if err := p.session.Query(query, version, dirty).Exec(); err != nil {
return &database.Error{OrigErr: err, Query: []byte(query)}
}
}
return nil
}
// Return current keyspace version
func (p *Cassandra) Version() (version int, dirty bool, err error) {
query := `SELECT version, dirty FROM "` + p.config.MigrationsTable + `" LIMIT 1`
err = p.session.Query(query).Scan(&version, &dirty)
switch {
case err == gocql.ErrNotFound:
return database.NilVersion, false, nil
case err != nil:
if _, ok := err.(*gocql.Error); ok {
return database.NilVersion, false, nil
}
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}
default:
return version, dirty, nil
}
}
func (p *Cassandra) Drop() error {
// select all tables in current schema
query := fmt.Sprintf(`SELECT table_name from system_schema.tables WHERE keyspace_name='%s'`, p.config.KeyspaceName[1:]) // Skip '/' character
iter := p.session.Query(query).Iter()
var tableName string
for iter.Scan(&tableName) {
err := p.session.Query(fmt.Sprintf(`DROP TABLE %s`, tableName)).Exec()
if err != nil {
return err
}
}
// Re-create the version table
if err := p.ensureVersionTable(); err != nil {
return err
}
return nil
}
// Ensure version table exists
func (p *Cassandra) ensureVersionTable() error {
err := p.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", p.config.MigrationsTable)).Exec()
if err != nil {
return err
}
if _, _, err = p.Version(); err != nil {
return err
}
return nil
}
// ParseConsistency wraps gocql.ParseConsistency
// to return an error instead of a panicking.
func parseConsistency(consistencyStr string) (consistency gocql.Consistency, err error) {
defer func() {
if r := recover(); r != nil {
var ok bool
err, ok = r.(error)
if !ok {
err = fmt.Errorf("Failed to parse consistency \"%s\": %v", consistencyStr, r)
}
}
}()
consistency = gocql.ParseConsistency(consistencyStr)
return consistency, nil
}

View File

@ -0,0 +1,53 @@
package cassandra
import (
"fmt"
"testing"
dt "github.com/mattes/migrate/database/testing"
mt "github.com/mattes/migrate/testing"
"github.com/gocql/gocql"
"time"
"strconv"
)
var versions = []mt.Version{
{"cassandra:3.0.10", []string{}},
{"cassandra:3.0", []string{}},
}
func isReady(i mt.Instance) bool {
// Cassandra exposes 5 ports (7000, 7001, 7199, 9042 & 9160)
// We only need the port bound to 9042, but we can only access to the first one
// through 'i.Port()' (which calls DockerContainer.firstPortMapping())
// So we need to get port mapping to retrieve correct port number bound to 9042
portMap := i.NetworkSettings().Ports
port, _ := strconv.Atoi(portMap["9042/tcp"][0].HostPort)
cluster := gocql.NewCluster(i.Host())
cluster.Port = port
//cluster.ProtoVersion = 4
cluster.Consistency = gocql.All
cluster.Timeout = 1 * time.Minute
p, err := cluster.CreateSession()
if err != nil {
return false
}
// Create keyspace for tests
p.Query("CREATE KEYSPACE testks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor':1}").Exec()
return true
}
func Test(t *testing.T) {
mt.ParallelTest(t, versions, isReady,
func(t *testing.T, i mt.Instance) {
p := &Cassandra{}
portMap := i.NetworkSettings().Ports
port, _ := strconv.Atoi(portMap["9042/tcp"][0].HostPort)
addr := fmt.Sprintf("cassandra://%v:%v/testks", i.Host(), port)
d, err := p.Open(addr)
if err != nil {
t.Fatalf("%v", err)
}
dt.Test(t, d, []byte("SELECT table_name from system_schema.tables"))
})
}

View File

@ -32,6 +32,8 @@ var drivers = make(map[string]Driver)
// All other functions are tested by tests in database/testing. // All other functions are tested by tests in database/testing.
// Saves you some time and makes sure all database drivers behave the same way. // Saves you some time and makes sure all database drivers behave the same way.
// 5. Call Register in init(). // 5. Call Register in init().
// 6. Create a migrate/cli/build_<driver-name>.go file
// 7. Add driver name in 'DATABASE' variable in Makefile
// //
// Guidelines: // Guidelines:
// * Don't try to correct user input. Don't assume things. // * Don't try to correct user input. Don't assume things.
@ -71,7 +73,7 @@ type Driver interface {
// Dirty means, a previous migration failed and user interaction is required. // Dirty means, a previous migration failed and user interaction is required.
Version() (version int, dirty bool, err error) Version() (version int, dirty bool, err error)
// Drop deletes everyting in the database. // Drop deletes everything in the database.
Drop() error Drop() error
} }

View File

@ -3,7 +3,7 @@ package testing
import ( import (
"bufio" "bufio"
"context" // TODO: is issue with go < 1.7? "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@ -12,7 +12,6 @@ import (
"strings" "strings"
"testing" "testing"
"time" "time"
dockertypes "github.com/docker/docker/api/types" dockertypes "github.com/docker/docker/api/types"
dockercontainer "github.com/docker/docker/api/types/container" dockercontainer "github.com/docker/docker/api/types/container"
dockernetwork "github.com/docker/docker/api/types/network" dockernetwork "github.com/docker/docker/api/types/network"
@ -202,6 +201,11 @@ func (d *DockerContainer) Port() uint {
return port return port
} }
func (d *DockerContainer) NetworkSettings() dockertypes.NetworkSettings {
netSettings := d.ContainerJSON.NetworkSettings
return *netSettings
}
type dockerImagePullOutput struct { type dockerImagePullOutput struct {
Status string `json:"status"` Status string `json:"status"`
ProgressDetails struct { ProgressDetails struct {

View File

@ -6,6 +6,8 @@ import (
"strconv" "strconv"
"testing" "testing"
"time" "time"
dockertypes "github.com/docker/docker/api/types"
) )
type IsReadyFunc func(Instance) bool type IsReadyFunc func(Instance) bool
@ -87,5 +89,6 @@ func containerLogs(t *testing.T, c *DockerContainer) []byte {
type Instance interface { type Instance interface {
Host() string Host() string
Port() uint Port() uint
NetworkSettings() dockertypes.NetworkSettings
KeepForDebugging() KeepForDebugging()
} }