mirror of https://github.com/status-im/consul.git
Begins split out of snapshots from the main FSM class.
This commit is contained in:
parent
c8e763667f
commit
f53f521072
|
@ -7,7 +7,6 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/armon/go-metrics"
|
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
|
@ -61,20 +60,6 @@ type FSM struct {
|
||||||
gc *state.TombstoneGC
|
gc *state.TombstoneGC
|
||||||
}
|
}
|
||||||
|
|
||||||
// consulSnapshot is used to provide a snapshot of the current
|
|
||||||
// state in a way that can be accessed concurrently with operations
|
|
||||||
// that may modify the live state.
|
|
||||||
type consulSnapshot struct {
|
|
||||||
state *state.Snapshot
|
|
||||||
}
|
|
||||||
|
|
||||||
// snapshotHeader is the first entry in our snapshot
|
|
||||||
type snapshotHeader struct {
|
|
||||||
// LastIndex is the last index that affects the data.
|
|
||||||
// This is used when we do the restore for watchers.
|
|
||||||
LastIndex uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
// New is used to construct a new FSM with a blank state.
|
// New is used to construct a new FSM with a blank state.
|
||||||
func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) {
|
func New(gc *state.TombstoneGC, logOutput io.Writer) (*FSM, error) {
|
||||||
stateNew, err := state.NewStateStore(gc)
|
stateNew, err := state.NewStateStore(gc)
|
||||||
|
@ -140,7 +125,7 @@ func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
|
||||||
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Since(start))
|
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Since(start))
|
||||||
}(time.Now())
|
}(time.Now())
|
||||||
|
|
||||||
return &consulSnapshot{c.state.Snapshot()}, nil
|
return &snapshot{c.state.Snapshot()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Restore streams in the snapshot and replaces the current state store with a
|
// Restore streams in the snapshot and replaces the current state store with a
|
||||||
|
@ -290,273 +275,3 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
||||||
stateOld.Abandon()
|
stateOld.Abandon()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *consulSnapshot) Persist(sink raft.SnapshotSink) error {
|
|
||||||
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
|
|
||||||
defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now())
|
|
||||||
|
|
||||||
// Register the nodes
|
|
||||||
encoder := codec.NewEncoder(sink, msgpackHandle)
|
|
||||||
|
|
||||||
// Write the header
|
|
||||||
header := snapshotHeader{
|
|
||||||
LastIndex: s.state.LastIndex(),
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(&header); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.persistNodes(sink, encoder); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.persistSessions(sink, encoder); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.persistACLs(sink, encoder); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.persistKVs(sink, encoder); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.persistTombstones(sink, encoder); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.persistPreparedQueries(sink, encoder); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.persistAutopilot(sink, encoder); err != nil {
|
|
||||||
sink.Cancel()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
|
|
||||||
encoder *codec.Encoder) error {
|
|
||||||
|
|
||||||
// Get all the nodes
|
|
||||||
nodes, err := s.state.Nodes()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register each node
|
|
||||||
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
|
||||||
n := node.(*structs.Node)
|
|
||||||
req := structs.RegisterRequest{
|
|
||||||
Node: n.Node,
|
|
||||||
Address: n.Address,
|
|
||||||
TaggedAddresses: n.TaggedAddresses,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register the node itself
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(&req); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register each service this node has
|
|
||||||
services, err := s.state.Services(n.Node)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for service := services.Next(); service != nil; service = services.Next() {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
req.Service = service.(*structs.ServiceNode).ToNodeService()
|
|
||||||
if err := encoder.Encode(&req); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register each check this node has
|
|
||||||
req.Service = nil
|
|
||||||
checks, err := s.state.Checks(n.Node)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for check := checks.Next(); check != nil; check = checks.Next() {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
req.Check = check.(*structs.HealthCheck)
|
|
||||||
if err := encoder.Encode(&req); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Save the coordinates separately since they are not part of the
|
|
||||||
// register request interface. To avoid copying them out, we turn
|
|
||||||
// them into batches with a single coordinate each.
|
|
||||||
coords, err := s.state.Coordinates()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
for coord := coords.Next(); coord != nil; coord = coords.Next() {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
updates := structs.Coordinates{coord.(*structs.Coordinate)}
|
|
||||||
if err := encoder.Encode(&updates); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
|
|
||||||
encoder *codec.Encoder) error {
|
|
||||||
sessions, err := s.state.Sessions()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(session.(*structs.Session)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
|
|
||||||
encoder *codec.Encoder) error {
|
|
||||||
acls, err := s.state.ACLs()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for acl := acls.Next(); acl != nil; acl = acls.Next() {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(acl.(*structs.ACL)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
bs, err := s.state.ACLBootstrap()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if bs != nil {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(bs); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink,
|
|
||||||
encoder *codec.Encoder) error {
|
|
||||||
entries, err := s.state.KVs()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
|
|
||||||
encoder *codec.Encoder) error {
|
|
||||||
stones, err := s.state.Tombstones()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for stone := stones.Next(); stone != nil; stone = stones.Next() {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// For historical reasons, these are serialized in the snapshots
|
|
||||||
// as KV entries. We want to keep the snapshot format compatible
|
|
||||||
// with pre-0.6 versions for now.
|
|
||||||
s := stone.(*state.Tombstone)
|
|
||||||
fake := &structs.DirEntry{
|
|
||||||
Key: s.Key,
|
|
||||||
RaftIndex: structs.RaftIndex{
|
|
||||||
ModifyIndex: s.Index,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(fake); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) persistPreparedQueries(sink raft.SnapshotSink,
|
|
||||||
encoder *codec.Encoder) error {
|
|
||||||
queries, err := s.state.PreparedQueries()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, query := range queries {
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(query); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) persistAutopilot(sink raft.SnapshotSink,
|
|
||||||
encoder *codec.Encoder) error {
|
|
||||||
autopilot, err := s.state.Autopilot()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if autopilot == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := encoder.Encode(autopilot); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *consulSnapshot) Release() {
|
|
||||||
s.state.Close()
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,295 @@
|
||||||
|
package fsm
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
"github.com/hashicorp/go-msgpack/codec"
|
||||||
|
"github.com/hashicorp/raft"
|
||||||
|
)
|
||||||
|
|
||||||
|
// snapshot is used to provide a snapshot of the current
|
||||||
|
// state in a way that can be accessed concurrently with operations
|
||||||
|
// that may modify the live state.
|
||||||
|
type snapshot struct {
|
||||||
|
state *state.Snapshot
|
||||||
|
}
|
||||||
|
|
||||||
|
// snapshotHeader is the first entry in our snapshot
|
||||||
|
type snapshotHeader struct {
|
||||||
|
// LastIndex is the last index that affects the data.
|
||||||
|
// This is used when we do the restore for watchers.
|
||||||
|
LastIndex uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) Persist(sink raft.SnapshotSink) error {
|
||||||
|
defer metrics.MeasureSince([]string{"consul", "fsm", "persist"}, time.Now())
|
||||||
|
defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now())
|
||||||
|
|
||||||
|
// Register the nodes
|
||||||
|
encoder := codec.NewEncoder(sink, msgpackHandle)
|
||||||
|
|
||||||
|
// Write the header
|
||||||
|
header := snapshotHeader{
|
||||||
|
LastIndex: s.state.LastIndex(),
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(&header); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.persistNodes(sink, encoder); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.persistSessions(sink, encoder); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.persistACLs(sink, encoder); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.persistKVs(sink, encoder); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.persistTombstones(sink, encoder); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.persistPreparedQueries(sink, encoder); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.persistAutopilot(sink, encoder); err != nil {
|
||||||
|
sink.Cancel()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistNodes(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
|
||||||
|
// Get all the nodes
|
||||||
|
nodes, err := s.state.Nodes()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register each node
|
||||||
|
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
||||||
|
n := node.(*structs.Node)
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Node: n.Node,
|
||||||
|
Address: n.Address,
|
||||||
|
TaggedAddresses: n.TaggedAddresses,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register the node itself
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register each service this node has
|
||||||
|
services, err := s.state.Services(n.Node)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Service = service.(*structs.ServiceNode).ToNodeService()
|
||||||
|
if err := encoder.Encode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Register each check this node has
|
||||||
|
req.Service = nil
|
||||||
|
checks, err := s.state.Checks(n.Node)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for check := checks.Next(); check != nil; check = checks.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
req.Check = check.(*structs.HealthCheck)
|
||||||
|
if err := encoder.Encode(&req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Save the coordinates separately since they are not part of the
|
||||||
|
// register request interface. To avoid copying them out, we turn
|
||||||
|
// them into batches with a single coordinate each.
|
||||||
|
coords, err := s.state.Coordinates()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for coord := coords.Next(); coord != nil; coord = coords.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
updates := structs.Coordinates{coord.(*structs.Coordinate)}
|
||||||
|
if err := encoder.Encode(&updates); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistSessions(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
sessions, err := s.state.Sessions()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for session := sessions.Next(); session != nil; session = sessions.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(session.(*structs.Session)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistACLs(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
acls, err := s.state.ACLs()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for acl := acls.Next(); acl != nil; acl = acls.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(acl.(*structs.ACL)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bs, err := s.state.ACLBootstrap()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if bs != nil {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(bs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistKVs(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
entries, err := s.state.KVs()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for entry := entries.Next(); entry != nil; entry = entries.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistTombstones(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
stones, err := s.state.Tombstones()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for stone := stones.Next(); stone != nil; stone = stones.Next() {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// For historical reasons, these are serialized in the snapshots
|
||||||
|
// as KV entries. We want to keep the snapshot format compatible
|
||||||
|
// with pre-0.6 versions for now.
|
||||||
|
s := stone.(*state.Tombstone)
|
||||||
|
fake := &structs.DirEntry{
|
||||||
|
Key: s.Key,
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
ModifyIndex: s.Index,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(fake); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistPreparedQueries(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
queries, err := s.state.PreparedQueries()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, query := range queries {
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(query); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) persistAutopilot(sink raft.SnapshotSink,
|
||||||
|
encoder *codec.Encoder) error {
|
||||||
|
autopilot, err := s.state.Autopilot()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if autopilot == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := encoder.Encode(autopilot); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *snapshot) Release() {
|
||||||
|
s.state.Close()
|
||||||
|
}
|
Loading…
Reference in New Issue