mirror of
https://github.com/status-im/consul.git
synced 2025-01-22 03:29:43 +00:00
Adding ability to snapshot StateStore
This commit is contained in:
parent
b7beda2a63
commit
21478ce3a7
@ -5,7 +5,9 @@ import (
|
||||
"fmt"
|
||||
"github.com/hashicorp/consul/rpc"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"log"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
// nextDBIndex is used to generate a new ID
|
||||
@ -25,6 +27,7 @@ const (
|
||||
queryServices
|
||||
queryServiceNodes
|
||||
queryServiceTagNodes
|
||||
queryAllServices
|
||||
)
|
||||
|
||||
// The StateStore is responsible for maintaining all the Consul
|
||||
@ -76,6 +79,7 @@ func (s *StateStore) initialize() error {
|
||||
pragmas := []string{
|
||||
"pragma journal_mode=memory;",
|
||||
"pragma foreign_keys=ON;",
|
||||
"pragma read_uncommitted=true;",
|
||||
}
|
||||
for _, p := range pragmas {
|
||||
if _, err := s.db.Exec(p); err != nil {
|
||||
@ -108,6 +112,7 @@ func (s *StateStore) initialize() error {
|
||||
queryServices: "SELECT DISTINCT service, tag FROM services",
|
||||
queryServiceNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.node=n.name",
|
||||
queryServiceTagNodes: "SELECT n.name, n.address, s.tag, s.port from nodes n, services s WHERE s.service=? AND s.tag=? AND s.node=n.name",
|
||||
queryAllServices: "SELECT * FROM services",
|
||||
}
|
||||
for name, query := range queries {
|
||||
stmt, err := s.db.Prepare(query)
|
||||
@ -281,3 +286,59 @@ func parseServiceNodes(rows *sql.Rows, err error) rpc.ServiceNodes {
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
|
||||
// Snapshot is used to create a point in time snapshot
|
||||
func (s *StateStore) Snapshot() (*StateStore, error) {
|
||||
defer func(start time.Time) {
|
||||
log.Printf("[INFO] StateStore Snapshot created in %v", time.Now().Sub(start))
|
||||
}(time.Now())
|
||||
|
||||
// Create a new state store
|
||||
state, err := NewStateStore()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start a Tx on the new DB
|
||||
tx, err := state.db.Begin()
|
||||
if err != nil {
|
||||
state.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create the new statements we need
|
||||
ensureNode := tx.Stmt(state.prepared[queryEnsureNode])
|
||||
ensureService := tx.Stmt(state.prepared[queryEnsureService])
|
||||
|
||||
// Copy all the nodes
|
||||
nodes := s.Nodes()
|
||||
for i := 0; i < len(nodes); i += 2 {
|
||||
if _, err := ensureNode.Exec(nodes[i], nodes[i+1]); err != nil {
|
||||
state.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Copy all the services
|
||||
var node, service, tag string
|
||||
var port int
|
||||
rows, err := s.prepared[queryAllServices].Query()
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&node, &service, &tag, &port); err != nil {
|
||||
state.Close()
|
||||
return nil, err
|
||||
}
|
||||
if _, err := ensureService.Exec(node, service, tag, port); err != nil {
|
||||
state.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the Txn
|
||||
if err := tx.Commit(); err != nil {
|
||||
state.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return state, nil
|
||||
}
|
||||
|
@ -295,3 +295,79 @@ func TestServiceTagNodes(t *testing.T) {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStoreSnapshot(t *testing.T) {
|
||||
store, err := NewStateStore()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer store.Close()
|
||||
|
||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.EnsureService("foo", "db", "master", 8000); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
if err := store.EnsureService("bar", "db", "slave", 8000); err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
|
||||
// Take a snapshot
|
||||
snap, err := store.Snapshot()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v")
|
||||
}
|
||||
defer snap.Close()
|
||||
|
||||
// Check snapshot has old values
|
||||
nodes := snap.Nodes()
|
||||
if len(nodes) != 4 {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
|
||||
// Ensure we get the service entries
|
||||
services := snap.NodeServices("foo")
|
||||
if services["db"].Tag != "master" {
|
||||
t.Fatalf("bad: %v", services)
|
||||
}
|
||||
|
||||
services = snap.NodeServices("bar")
|
||||
if services["db"].Tag != "slave" {
|
||||
t.Fatalf("bad: %v", services)
|
||||
}
|
||||
|
||||
// Make some changes!
|
||||
if err := store.EnsureService("foo", "db", "slave", 8000); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := store.EnsureService("bar", "db", "master", 8000); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := store.EnsureNode("baz", "127.0.0.3"); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check snapshot has old values
|
||||
nodes = snap.Nodes()
|
||||
if len(nodes) != 4 {
|
||||
t.Fatalf("bad: %v", nodes)
|
||||
}
|
||||
|
||||
// Ensure old service entries
|
||||
services = snap.NodeServices("foo")
|
||||
if services["db"].Tag != "master" {
|
||||
t.Fatalf("bad: %v", services)
|
||||
}
|
||||
|
||||
services = snap.NodeServices("bar")
|
||||
if services["db"].Tag != "slave" {
|
||||
t.Fatalf("bad: %v", services)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user