- Add dependency for dep

- Add transaction mode for migrations
This commit is contained in:
DBobrov 2019-01-03 18:00:23 +03:00
parent bcd4f6e7dd
commit 309edb6da6
5 changed files with 155 additions and 23 deletions

83
Gopkg.lock generated
View File

@ -262,6 +262,14 @@
revision = "d523deb1b23d913de5bdada721a6071e71283618"
version = "v1.4.0"
[[projects]]
digest = "1:586ea76dbd0374d6fb649a91d70d652b7fe0ccffb8910a77468e7702e7901f3d"
name = "github.com/go-stack/stack"
packages = ["."]
pruneopts = "UT"
revision = "2fee6af1a9795aafbe0253a0cfbdf668e1fb8a9a"
version = "v1.8.0"
[[projects]]
branch = "master"
digest = "1:d21f3471df3c4109c9b7c5569363a082b9fcde4fea49e4ce332ad307cfc4e359"
@ -413,6 +421,45 @@
revision = "25ecb14adfc7543176f7d85291ec7dba82c6f7e4"
version = "v1.9.0"
[[projects]]
digest = "1:820eeb72f78947f9b2e87bd99a57b5cc791c8fb47676a5c3824d408441ab1546"
name = "github.com/mongodb/mongo-go-driver"
packages = [
"bson",
"bson/bsoncodec",
"bson/bsonrw",
"bson/bsontype",
"bson/primitive",
"event",
"internal",
"mongo",
"mongo/options",
"mongo/readconcern",
"mongo/readpref",
"mongo/writeconcern",
"tag",
"version",
"x/bsonx",
"x/bsonx/bsoncore",
"x/mongo/driver",
"x/mongo/driver/auth",
"x/mongo/driver/auth/internal/gssapi",
"x/mongo/driver/session",
"x/mongo/driver/topology",
"x/mongo/driver/uuid",
"x/network/address",
"x/network/command",
"x/network/compressor",
"x/network/connection",
"x/network/connstring",
"x/network/description",
"x/network/result",
"x/network/wiremessage",
]
pruneopts = "UT"
revision = "29905d4bda472574c8b499e2f93b5b2747d8fbd7"
version = "v0.1.0"
[[projects]]
digest = "1:ee4d4af67d93cc7644157882329023ce9a7bcfce956a079069a9405521c7cc8d"
name = "github.com/opencontainers/go-digest"
@ -440,6 +487,22 @@
revision = "645ef00459ed84a119197bfb8d8205042c6df63d"
version = "v0.8.0"
[[projects]]
branch = "master"
digest = "1:40fdfd6ab85ca32b6935853bbba35935dcb1d796c8135efd85947566c76e662e"
name = "github.com/xdg/scram"
packages = ["."]
pruneopts = "UT"
revision = "7eeb5667e42c09cb51bf7b7c28aea8c56767da90"
[[projects]]
branch = "master"
digest = "1:f5c1d04bc09c644c592b45b9f0bad4030521b1a7d11c7dadbb272d9439fa6e8e"
name = "github.com/xdg/stringprep"
packages = ["."]
pruneopts = "UT"
revision = "73f8eece6fdcd902c185bf651de50f3828bed5ed"
[[projects]]
digest = "1:8e7fcf54cc7c0c3aba7801812ae70343585b3cb663f6cb11ad50c0c28552f359"
name = "go.opencensus.io"
@ -463,6 +526,14 @@
revision = "79993219becaa7e29e3b60cb67f5b8e82dee11d6"
version = "v0.17.0"
[[projects]]
branch = "master"
digest = "1:f92f6956e4059f6a3efc14924d2dd58ba90da25cc57fe07ae3779ef2f5e0c5f2"
name = "golang.org/x/crypto"
packages = ["pbkdf2"]
pruneopts = "UT"
revision = "8d7daa0c54b357f3071e11eaef7efc4e19a417e2"
[[projects]]
branch = "master"
digest = "1:56975096c005d57a2c9c2faefbec772c44c29c084af35facbfca85961469ff6d"
@ -496,6 +567,14 @@
pruneopts = "UT"
revision = "c57b0facaced709681d9f90397429b9430a74754"
[[projects]]
branch = "master"
digest = "1:75515eedc0dc2cb0b40372008b616fa2841d831c63eedd403285ff286c593295"
name = "golang.org/x/sync"
packages = ["semaphore"]
pruneopts = "UT"
revision = "37e7f081c4d4c64e13b10787722085407fe5d15f"
[[projects]]
branch = "master"
digest = "1:f5aa274a0377f85735edc7fedfb0811d3cbc20af91633797cb359e29c3272271"
@ -669,6 +748,10 @@
"github.com/kshvakov/clickhouse",
"github.com/lib/pq",
"github.com/mattn/go-sqlite3",
"github.com/mongodb/mongo-go-driver/bson",
"github.com/mongodb/mongo-go-driver/mongo",
"github.com/mongodb/mongo-go-driver/x/bsonx",
"github.com/mongodb/mongo-go-driver/x/network/connstring",
"golang.org/x/net/context",
"golang.org/x/tools/godoc/vfs",
"golang.org/x/tools/godoc/vfs/mapfs",

View File

@ -81,6 +81,10 @@
branch = "master"
name = "google.golang.org/genproto"
[[constraint]]
name = "github.com/mongodb/mongo-go-driver"
version = "0.1.0"
[[override]]
name = "cloud.google.com/go"
version = "0.27.0"

View File

@ -12,6 +12,7 @@
| URL Query | WithInstance Config | Description |
|------------|---------------------|-------------|
| `x-migrations-table` | `MigrationsTable` | Name of the migrations table |
| `x-transaction-mode` | `TransactionMode` | If set to `true` wrap commands in [transaction](https://docs.mongodb.com/manual/core/transactions). Available only for replica set. |
| `dbname` | `DatabaseName` | The name of the database to connect to |
| `user` | | The user to sign in as. Can be omitted |
| `password` | | The user's password. Can be omitted |

View File

@ -19,7 +19,7 @@ func init() {
database.Register("mongodb", &Mongo{})
}
var DefaultMigrationsTable = "schema_migrations"
var DefaultMigrationsCollection = "schema_migrations"
var (
ErrNoDatabaseName = fmt.Errorf("no database name")
@ -36,6 +36,7 @@ type Mongo struct {
type Config struct {
DatabaseName string
MigrationsCollection string
TransactionMode bool
}
type versionInfo struct {
@ -51,7 +52,7 @@ func WithInstance(instance *mongo.Client, config *Config) (database.Driver, erro
return nil, ErrNoDatabaseName
}
if len(config.MigrationsCollection) == 0 {
config.MigrationsCollection = DefaultMigrationsTable
config.MigrationsCollection = DefaultMigrationsCollection
}
mc := &Mongo{
client: instance,
@ -76,9 +77,11 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) {
}
migrationsCollection := purl.Query().Get("x-migrations-collection")
if len(migrationsCollection) == 0 {
migrationsCollection = DefaultMigrationsTable
migrationsCollection = DefaultMigrationsCollection
}
transactionMode := purl.Query().Get("x-transaction-mode") == "true"
q := migrate.FilterCustomQuery(purl)
q.Scheme = "mongodb"
@ -92,6 +95,7 @@ func (m *Mongo) Open(dsn string) (database.Driver, error) {
mc, err := WithInstance(client, &Config{
DatabaseName: uri.Database,
MigrationsCollection: migrationsCollection,
TransactionMode: transactionMode,
})
if err != nil {
return nil, err
@ -134,11 +138,45 @@ func (m *Mongo) Run(migration io.Reader) error {
if err != nil {
return fmt.Errorf("unmarshaling json error: %s", err)
}
for _, cmd := range cmds {
err := m.db.RunCommand(context.TODO(), cmd).Err()
if err != nil {
if m.config.TransactionMode {
if err := m.executeCommandsWithTransaction(context.TODO(), cmds); err != nil {
return err
}
} else {
if err := m.executeCommands(context.TODO(), cmds); err != nil {
return err
}
}
return nil
}
func (m *Mongo) executeCommandsWithTransaction(ctx context.Context, cmds []bsonx.Doc) error {
err := m.db.Client().UseSession(ctx, func(sessionContext mongo.SessionContext) error {
if err := sessionContext.StartTransaction(); err != nil {
return &database.Error{OrigErr: err, Err: "failed to start transaction"}
}
if err := m.executeCommands(sessionContext, cmds); err != nil {
//When command execution is failed, it's aborting transaction
//If you tried to call abortTransaction, it`s return error that transaction already aborted
return err
}
if err := sessionContext.CommitTransaction(sessionContext); err != nil {
return &database.Error{OrigErr: err, Err: "failed to commit transaction"}
}
return nil
})
if err != nil {
return err
}
return nil
}
func (m *Mongo) executeCommands(ctx context.Context, cmds []bsonx.Doc) error {
for _, cmd := range cmds {
err := m.db.RunCommand(ctx, cmd).Err()
if err != nil {
return &database.Error{OrigErr: err, Err: fmt.Sprintf("failed to execute command:%s", cmd.String())}
}
}
return nil
}

View File

@ -72,26 +72,32 @@ func TestWithAuth(t *testing.T) {
if err != nil {
t.Fatalf("%v", err)
}
driverWithAuth, err := p.Open(fmt.Sprintf("mongodb://deminem:gogo@%s:%v/testMigration", i.Host(), i.Port()))
if err != nil {
t.Fatalf("%v", err)
testcases := []struct {
name string
connectUri string
isErrorExpected bool
}{
{"right auth data", "mongodb://deminem:gogo@%s:%v/testMigration", false},
{"wrong auth data", "mongodb://wrong:auth@%s:%v/testMigration", true},
}
defer driverWithAuth.Close()
insertCMD := []byte(`[{"insert":"hello","documents":[{"wild":"world"}]}]`)
err = driverWithAuth.Run(bytes.NewReader(insertCMD))
if err != nil {
t.Fatalf("%v", err)
}
driverWithWrongAuth, err := p.Open(fmt.Sprintf("mongodb://wrong:auth@%s:%v/testMigration", i.Host(), i.Port()))
if err != nil {
t.Fatalf("%v", err)
}
defer driverWithWrongAuth.Close()
err = driverWithWrongAuth.Run(bytes.NewReader(insertCMD))
if err == nil {
t.Fatal("no error with wrong authorization")
for _, tcase := range testcases {
t.Run(tcase.name, func(t *testing.T) {
mc := &Mongo{}
d, err := mc.Open(fmt.Sprintf(tcase.connectUri, i.Host(), i.Port()))
if err != nil {
t.Fatalf("%v", err)
}
defer d.Close()
err = d.Run(bytes.NewReader(insertCMD))
switch {
case tcase.isErrorExpected && err == nil:
t.Fatalf("no error when expected")
case !tcase.isErrorExpected && err != nil:
t.Fatalf("unexpected error: %v", err)
}
})
}
})
}