mirror of https://github.com/status-im/migrate.git
Add driver for Neo4j
This commit is contained in:
parent
8fe1a56731
commit
e303d64469
|
@ -0,0 +1,20 @@
|
|||
# Neo4j Driver
|
||||
|
||||
* Runs migrations in transcations.
|
||||
That means that if a migration failes, it will be safely rolled back.
|
||||
|
||||
* Stores migration version details with the label ``SchemaMigrations``.
|
||||
An unique constraint for the field :SchemaMigrations(version) will be auto-generated.
|
||||
|
||||
* Neo4j cannot perform schema and data updates in a transaction, therefore it's necessary to use different migration files
|
||||
|
||||
## Usage
|
||||
|
||||
```bash
|
||||
migrate -url neo4j://user:password@host:port/db/data -path ./db/migrations create add_field_to_table
|
||||
migrate -url neo4j://user:password@host:port/db/data -path ./db/migrations up
|
||||
migrate help # for more info
|
||||
```
|
||||
## Author
|
||||
|
||||
* Carlos Forero, https://github.com/carlosforero
|
|
@ -0,0 +1,157 @@
|
|||
// Package neo4j implements the Driver interface.
|
||||
package neo4j
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"bytes"
|
||||
"strings"
|
||||
"errors"
|
||||
"github.com/jmcvetta/neoism"
|
||||
"github.com/mattes/migrate/driver"
|
||||
"github.com/mattes/migrate/file"
|
||||
"github.com/mattes/migrate/migrate/direction"
|
||||
)
|
||||
|
||||
type Driver struct {
|
||||
db *neoism.Database
|
||||
}
|
||||
|
||||
const labelName = "SchemaMigration"
|
||||
|
||||
func (driver *Driver) Initialize(url string) error {
|
||||
url = strings.Replace(url,"neo4j","http",1)
|
||||
|
||||
db, err := neoism.Connect(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
driver.db = db
|
||||
|
||||
if err := driver.ensureVersionConstraintExists(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (driver *Driver) Close() error {
|
||||
driver.db = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (driver *Driver) FilenameExtension() string {
|
||||
return "cql"
|
||||
}
|
||||
|
||||
func (driver *Driver) ensureVersionConstraintExists() error {
|
||||
uc, _ := driver.db.UniqueConstraints("SchemaMigration", "version")
|
||||
if len(uc) == 0 {
|
||||
_, err := driver.db.CreateUniqueConstraint("SchemaMigration", "version")
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (driver *Driver) setVersion(d direction.Direction, v uint64, invert bool) error {
|
||||
|
||||
cqUp := neoism.CypherQuery {
|
||||
Statement: `CREATE (n:SchemaMigration {version: {Version}}) RETURN n`,
|
||||
Parameters: neoism.Props{"Version": v},
|
||||
}
|
||||
|
||||
cqDown := neoism.CypherQuery {
|
||||
Statement: `MATCH (n:SchemaMigration {version: {Version}}) DELETE n`,
|
||||
Parameters: neoism.Props{"Version": v},
|
||||
}
|
||||
|
||||
var cq neoism.CypherQuery
|
||||
switch d {
|
||||
case direction.Up:
|
||||
if invert { cq = cqDown } else { cq = cqUp }
|
||||
case direction.Down:
|
||||
if invert { cq = cqUp } else { cq = cqDown }
|
||||
}
|
||||
return driver.db.Cypher(&cq)
|
||||
}
|
||||
|
||||
func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {
|
||||
var err error
|
||||
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// Invert version direction if we couldn't apply the changes for some reason.
|
||||
if err := driver.setVersion(f.Direction, f.Version, true); err != nil {
|
||||
pipe <- err
|
||||
}
|
||||
pipe <- err
|
||||
}
|
||||
close(pipe)
|
||||
}()
|
||||
|
||||
pipe <- f
|
||||
|
||||
|
||||
if err = driver.setVersion(f.Direction, f.Version, false); err != nil {
|
||||
pipe <- err
|
||||
return
|
||||
}
|
||||
|
||||
if err = f.ReadContent(); err != nil {
|
||||
pipe <- err
|
||||
return
|
||||
}
|
||||
|
||||
cQueries := []*neoism.CypherQuery{}
|
||||
|
||||
// Neoism doesn't support multiple statements per query.
|
||||
cqlStmts := bytes.Split(f.Content, []byte(";"))
|
||||
|
||||
for _, cqlStmt := range cqlStmts {
|
||||
cqlStmt = bytes.TrimSpace(cqlStmt)
|
||||
if len(cqlStmt) > 0 {
|
||||
cq := neoism.CypherQuery{Statement: string(cqlStmt)}
|
||||
cQueries = append( cQueries, &cq )
|
||||
}
|
||||
}
|
||||
|
||||
var tx *neoism.Tx
|
||||
|
||||
tx, err = driver.db.Begin(cQueries)
|
||||
if err != nil {
|
||||
pipe <- err
|
||||
for _, err := range tx.Errors {
|
||||
pipe <- errors.New(fmt.Sprintf("%v", err.Message))
|
||||
}
|
||||
if err = tx.Rollback(); err != nil {
|
||||
pipe <- err
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if err = tx.Commit(); err != nil {
|
||||
pipe <- err
|
||||
for _, err := range tx.Errors {
|
||||
pipe <- errors.New(fmt.Sprintf("%v", err.Message))
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (driver *Driver) Version() (uint64, error) {
|
||||
res := []struct {Version uint64 `json:"n.version"`}{}
|
||||
|
||||
cq := neoism.CypherQuery{
|
||||
Statement: `MATCH (n:SchemaMigration)
|
||||
RETURN n.version ORDER BY n.version DESC LIMIT 1`,
|
||||
Result: &res,
|
||||
}
|
||||
|
||||
if err := driver.db.Cypher(&cq); err != nil || len(res) == 0 {
|
||||
return 0, err
|
||||
}
|
||||
return res[0].Version, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
driver.RegisterDriver("neo4j", &Driver{})
|
||||
}
|
|
@ -0,0 +1,98 @@
|
|||
package neo4j
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"os"
|
||||
|
||||
"github.com/jmcvetta/neoism"
|
||||
"github.com/mattes/migrate/file"
|
||||
"github.com/mattes/migrate/migrate/direction"
|
||||
pipep "github.com/mattes/migrate/pipe"
|
||||
)
|
||||
|
||||
// TestMigrate runs some additional tests on Migrate().
|
||||
// Basic testing is already done in migrate/migrate_test.go
|
||||
func TestMigrate(t *testing.T) {
|
||||
host := os.Getenv("NEO4J_PORT_7474_TCP_ADDR")
|
||||
port := os.Getenv("NEO4J_PORT_7474_TCP_PORT")
|
||||
|
||||
driverUrl := "http://neo4j:test@" + host + ":" + port + "/db/data"
|
||||
|
||||
// prepare clean database
|
||||
db, err := neoism.Connect(driverUrl)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
cq := neoism.CypherQuery{
|
||||
Statement: `DROP INDEX ON :Yolo(name)`,
|
||||
}
|
||||
|
||||
// If an error dropping the index then ignore it
|
||||
db.Cypher(&cq)
|
||||
|
||||
driverUrl = "neo4j://neo4j:test@" + host + ":" + port + "/db/data"
|
||||
|
||||
d := &Driver{}
|
||||
if err := d.Initialize(driverUrl); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
files := []file.File{
|
||||
{
|
||||
Path: "/foobar",
|
||||
FileName: "001_foobar.up.cql",
|
||||
Version: 1,
|
||||
Name: "foobar",
|
||||
Direction: direction.Up,
|
||||
Content: []byte(`
|
||||
CREATE INDEX ON :Yolo(name)
|
||||
`),
|
||||
},
|
||||
{
|
||||
Path: "/foobar",
|
||||
FileName: "001_foobar.down.cql",
|
||||
Version: 1,
|
||||
Name: "foobar",
|
||||
Direction: direction.Down,
|
||||
Content: []byte(`
|
||||
DROP INDEX ON :Yolo(name)
|
||||
`),
|
||||
},
|
||||
{
|
||||
Path: "/foobar",
|
||||
FileName: "002_foobar.up.cql",
|
||||
Version: 2,
|
||||
Name: "foobar",
|
||||
Direction: direction.Up,
|
||||
Content: []byte(`
|
||||
CREATE INDEX :Yolo(name) THIS WILL CAUSE AN ERROR
|
||||
`),
|
||||
},
|
||||
}
|
||||
|
||||
pipe := pipep.New()
|
||||
go d.Migrate(files[0], pipe)
|
||||
errs := pipep.ReadErrors(pipe)
|
||||
if len(errs) > 0 {
|
||||
t.Fatal(errs)
|
||||
}
|
||||
|
||||
pipe = pipep.New()
|
||||
go d.Migrate(files[1], pipe)
|
||||
errs = pipep.ReadErrors(pipe)
|
||||
if len(errs) > 0 {
|
||||
t.Fatal(errs)
|
||||
}
|
||||
|
||||
pipe = pipep.New()
|
||||
go d.Migrate(files[2], pipe)
|
||||
errs = pipep.ReadErrors(pipe)
|
||||
if len(errs) == 0 {
|
||||
t.Error("Expected test case to fail")
|
||||
}
|
||||
|
||||
if err := d.Close(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
1
main.go
1
main.go
|
@ -16,6 +16,7 @@ import (
|
|||
_ "github.com/mattes/migrate/driver/mysql"
|
||||
_ "github.com/mattes/migrate/driver/postgres"
|
||||
_ "github.com/mattes/migrate/driver/sqlite3"
|
||||
_ "github.com/mattes/migrate/driver/neo4j"
|
||||
"github.com/mattes/migrate/file"
|
||||
"github.com/mattes/migrate/migrate"
|
||||
"github.com/mattes/migrate/migrate/direction"
|
||||
|
|
Loading…
Reference in New Issue