mirror of https://github.com/status-im/consul.git
Enhance the output of consul snapshot inspect (#8787)
This commit is contained in:
parent
5b2833d1a6
commit
9bb348c6c7
|
@ -0,0 +1,3 @@
|
|||
```release-note:feature
|
||||
cli: update `snapshot inspect` command to provide more detailed snapshot data
|
||||
```
|
|
@ -159,28 +159,7 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
restore := stateNew.Restore()
|
||||
defer restore.Abort()
|
||||
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(old, structs.MsgpackHandle)
|
||||
|
||||
// Read in the header
|
||||
var header snapshotHeader
|
||||
if err := dec.Decode(&header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Populate the new state
|
||||
msgType := make([]byte, 1)
|
||||
for {
|
||||
// Read the message type
|
||||
_, err := old.Read(msgType)
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Decode
|
||||
msg := structs.MessageType(msgType[0])
|
||||
handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
|
||||
switch {
|
||||
case msg == structs.ChunkingStateType:
|
||||
chunkState := &raftchunking.State{
|
||||
|
@ -194,13 +173,18 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
}
|
||||
case restorers[msg] != nil:
|
||||
fn := restorers[msg]
|
||||
if err := fn(&header, restore, dec); err != nil {
|
||||
if err := fn(header, restore, dec); err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return fmt.Errorf("Unrecognized msg type %d", msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if err := ReadSnapshot(old, handler); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := restore.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -218,3 +202,35 @@ func (c *FSM) Restore(old io.ReadCloser) error {
|
|||
stateOld.Abandon()
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadSnapshot decodes each message type and utilizes the handler function to
|
||||
// process each message type individually
|
||||
func ReadSnapshot(r io.Reader, handler func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error) error {
|
||||
// Create a decoder
|
||||
dec := codec.NewDecoder(r, structs.MsgpackHandle)
|
||||
|
||||
// Read in the header
|
||||
var header SnapshotHeader
|
||||
if err := dec.Decode(&header); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Populate the new state
|
||||
msgType := make([]byte, 1)
|
||||
for {
|
||||
// Read the message type
|
||||
_, err := r.Read(msgType)
|
||||
if err == io.EOF {
|
||||
return nil
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Decode
|
||||
msg := structs.MessageType(msgType[0])
|
||||
|
||||
if err := handler(&header, msg, dec); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,8 +20,8 @@ type snapshot struct {
|
|||
chunkState *raftchunking.State
|
||||
}
|
||||
|
||||
// snapshotHeader is the first entry in our snapshot
|
||||
type snapshotHeader struct {
|
||||
// SnapshotHeader is the first entry in our snapshot
|
||||
type SnapshotHeader struct {
|
||||
// LastIndex is the last index that affects the data.
|
||||
// This is used when we do the restore for watchers.
|
||||
LastIndex uint64
|
||||
|
@ -40,7 +40,7 @@ func registerPersister(fn persister) {
|
|||
}
|
||||
|
||||
// restorer is a function used to load back a snapshot of the FSM state.
|
||||
type restorer func(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error
|
||||
type restorer func(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error
|
||||
|
||||
// restorers is a map of restore functions by message type.
|
||||
var restorers map[structs.MessageType]restorer
|
||||
|
@ -62,7 +62,7 @@ func (s *snapshot) Persist(sink raft.SnapshotSink) error {
|
|||
defer metrics.MeasureSince([]string{"fsm", "persist"}, time.Now())
|
||||
|
||||
// Write the header
|
||||
header := snapshotHeader{
|
||||
header := SnapshotHeader{
|
||||
LastIndex: s.state.LastIndex(),
|
||||
}
|
||||
encoder := codec.NewEncoder(sink, structs.MsgpackHandle)
|
||||
|
|
|
@ -506,7 +506,7 @@ func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder)
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreRegistration(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.RegisterRequest
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -517,7 +517,7 @@ func restoreRegistration(header *snapshotHeader, restore *state.Restore, decoder
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreKV(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreKV(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.DirEntry
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -528,7 +528,7 @@ func restoreKV(header *snapshotHeader, restore *state.Restore, decoder *codec.De
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreTombstone(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreTombstone(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.DirEntry
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -547,7 +547,7 @@ func restoreTombstone(header *snapshotHeader, restore *state.Restore, decoder *c
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreSession(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreSession(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.Session
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -558,7 +558,7 @@ func restoreSession(header *snapshotHeader, restore *state.Restore, decoder *cod
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreACL(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreACL(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACL
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -571,7 +571,7 @@ func restoreACL(header *snapshotHeader, restore *state.Restore, decoder *codec.D
|
|||
}
|
||||
|
||||
// DEPRECATED (ACL-Legacy-Compat) - remove once v1 acl compat is removed
|
||||
func restoreACLBootstrap(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreACLBootstrap(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACLBootstrap
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -582,7 +582,7 @@ func restoreACLBootstrap(header *snapshotHeader, restore *state.Restore, decoder
|
|||
return restore.IndexRestore(&state.IndexEntry{Key: "acl-token-bootstrap", Value: req.ModifyIndex})
|
||||
}
|
||||
|
||||
func restoreCoordinates(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreCoordinates(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.Coordinates
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -593,7 +593,7 @@ func restoreCoordinates(header *snapshotHeader, restore *state.Restore, decoder
|
|||
return nil
|
||||
}
|
||||
|
||||
func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restorePreparedQuery(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.PreparedQuery
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -604,7 +604,7 @@ func restorePreparedQuery(header *snapshotHeader, restore *state.Restore, decode
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreAutopilot(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req autopilot.Config
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -615,7 +615,7 @@ func restoreAutopilot(header *snapshotHeader, restore *state.Restore, decoder *c
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreLegacyIntention(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreLegacyIntention(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.Intention
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -627,7 +627,7 @@ func restoreLegacyIntention(header *snapshotHeader, restore *state.Restore, deco
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreConnectCA(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreConnectCA(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.CARoot
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -638,7 +638,7 @@ func restoreConnectCA(header *snapshotHeader, restore *state.Restore, decoder *c
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreConnectCAProviderState(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreConnectCAProviderState(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.CAConsulProviderState
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -649,7 +649,7 @@ func restoreConnectCAProviderState(header *snapshotHeader, restore *state.Restor
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreConnectCAConfig(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreConnectCAConfig(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.CAConfiguration
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -660,7 +660,7 @@ func restoreConnectCAConfig(header *snapshotHeader, restore *state.Restore, deco
|
|||
return nil
|
||||
}
|
||||
|
||||
func restoreIndex(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreIndex(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req state.IndexEntry
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -668,7 +668,7 @@ func restoreIndex(header *snapshotHeader, restore *state.Restore, decoder *codec
|
|||
return restore.IndexRestore(&req)
|
||||
}
|
||||
|
||||
func restoreToken(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreToken(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACLToken
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -688,7 +688,7 @@ func restoreToken(header *snapshotHeader, restore *state.Restore, decoder *codec
|
|||
return restore.ACLToken(&req)
|
||||
}
|
||||
|
||||
func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restorePolicy(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACLPolicy
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -696,7 +696,7 @@ func restorePolicy(header *snapshotHeader, restore *state.Restore, decoder *code
|
|||
return restore.ACLPolicy(&req)
|
||||
}
|
||||
|
||||
func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreConfigEntry(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ConfigEntryRequest
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -704,7 +704,7 @@ func restoreConfigEntry(header *snapshotHeader, restore *state.Restore, decoder
|
|||
return restore.ConfigEntry(req.Entry)
|
||||
}
|
||||
|
||||
func restoreRole(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreRole(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACLRole
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -712,7 +712,7 @@ func restoreRole(header *snapshotHeader, restore *state.Restore, decoder *codec.
|
|||
return restore.ACLRole(&req)
|
||||
}
|
||||
|
||||
func restoreBindingRule(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreBindingRule(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACLBindingRule
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -720,7 +720,7 @@ func restoreBindingRule(header *snapshotHeader, restore *state.Restore, decoder
|
|||
return restore.ACLBindingRule(&req)
|
||||
}
|
||||
|
||||
func restoreAuthMethod(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreAuthMethod(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.ACLAuthMethod
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -728,7 +728,7 @@ func restoreAuthMethod(header *snapshotHeader, restore *state.Restore, decoder *
|
|||
return restore.ACLAuthMethod(&req)
|
||||
}
|
||||
|
||||
func restoreFederationState(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreFederationState(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.FederationStateRequest
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
@ -736,7 +736,7 @@ func restoreFederationState(header *snapshotHeader, restore *state.Restore, deco
|
|||
return restore.FederationState(req.State)
|
||||
}
|
||||
|
||||
func restoreSystemMetadata(header *snapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
func restoreSystemMetadata(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.SystemMetadataEntry
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
|
|
|
@ -72,6 +72,46 @@ const (
|
|||
SystemMetadataRequestType = 31
|
||||
)
|
||||
|
||||
// if a new request type is added above it must be
|
||||
// added to the map below
|
||||
|
||||
// requestTypeStrings is used for snapshot enhance
|
||||
// any new request types added must be placed here
|
||||
var requestTypeStrings = map[MessageType]string{
|
||||
RegisterRequestType: "Register",
|
||||
DeregisterRequestType: "Deregister",
|
||||
KVSRequestType: "KVS",
|
||||
SessionRequestType: "Session",
|
||||
ACLRequestType: "ACL", // DEPRECATED (ACL-Legacy-Compat)
|
||||
TombstoneRequestType: "Tombstone",
|
||||
CoordinateBatchUpdateType: "CoordinateBatchUpdate",
|
||||
PreparedQueryRequestType: "PreparedQuery",
|
||||
TxnRequestType: "Txn",
|
||||
AutopilotRequestType: "Autopilot",
|
||||
AreaRequestType: "Area",
|
||||
ACLBootstrapRequestType: "ACLBootstrap",
|
||||
IntentionRequestType: "Intention",
|
||||
ConnectCARequestType: "ConnectCA",
|
||||
ConnectCAProviderStateType: "ConnectCAProviderState",
|
||||
ConnectCAConfigType: "ConnectCAConfig", // FSM snapshots only.
|
||||
IndexRequestType: "Index", // FSM snapshots only.
|
||||
ACLTokenSetRequestType: "ACLToken",
|
||||
ACLTokenDeleteRequestType: "ACLTokenDelete",
|
||||
ACLPolicySetRequestType: "ACLPolicy",
|
||||
ACLPolicyDeleteRequestType: "ACLPolicyDelete",
|
||||
ConnectCALeafRequestType: "ConnectCALeaf",
|
||||
ConfigEntryRequestType: "ConfigEntry",
|
||||
ACLRoleSetRequestType: "ACLRole",
|
||||
ACLRoleDeleteRequestType: "ACLRoleDelete",
|
||||
ACLBindingRuleSetRequestType: "ACLBindingRule",
|
||||
ACLBindingRuleDeleteRequestType: "ACLBindingRuleDelete",
|
||||
ACLAuthMethodSetRequestType: "ACLAuthMethod",
|
||||
ACLAuthMethodDeleteRequestType: "ACLAuthMethodDelete",
|
||||
ChunkingStateType: "ChunkingState",
|
||||
FederationStateRequestType: "FederationState",
|
||||
SystemMetadataRequestType: "SystemMetadata",
|
||||
}
|
||||
|
||||
const (
|
||||
// IgnoreUnknownTypeFlag is set along with a MessageType
|
||||
// to indicate that the message type can be safely ignored
|
||||
|
@ -2442,6 +2482,21 @@ func (r *KeyringResponses) New() interface{} {
|
|||
return new(KeyringResponses)
|
||||
}
|
||||
|
||||
// String converts message type int to string
|
||||
func (m MessageType) String() string {
|
||||
s, ok := requestTypeStrings[m]
|
||||
if ok {
|
||||
return s
|
||||
}
|
||||
|
||||
s, ok = enterpriseRequestType(m)
|
||||
if ok {
|
||||
return s
|
||||
}
|
||||
return "Unknown(" + strconv.Itoa(int(m)) + ")"
|
||||
|
||||
}
|
||||
|
||||
// UpstreamDownstream pairs come from individual proxy registrations, which can be updated independently.
|
||||
type UpstreamDownstream struct {
|
||||
Upstream ServiceName
|
||||
|
|
|
@ -131,6 +131,10 @@ func (_ *HealthCheck) Validate() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func enterpriseRequestType(m MessageType) (string, bool) {
|
||||
return "", false
|
||||
}
|
||||
|
||||
// CheckIDs returns the IDs for all checks associated with a session, regardless of type
|
||||
func (s *Session) CheckIDs() []types.CheckID {
|
||||
// Merge all check IDs into a single slice, since they will be handled the same way
|
||||
|
|
|
@ -4,11 +4,20 @@ import (
|
|||
"bytes"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
|
||||
"github.com/hashicorp/consul/agent/consul/fsm"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/command/flags"
|
||||
"github.com/hashicorp/consul/snapshot"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/raft"
|
||||
"github.com/mitchellh/cli"
|
||||
)
|
||||
|
||||
|
@ -31,12 +40,13 @@ func (c *cmd) init() {
|
|||
|
||||
func (c *cmd) Run(args []string) int {
|
||||
if err := c.flags.Parse(args); err != nil {
|
||||
c.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
var file string
|
||||
|
||||
args = c.flags.Args()
|
||||
|
||||
switch len(args) {
|
||||
case 0:
|
||||
c.UI.Error("Missing FILE argument")
|
||||
|
@ -56,12 +66,45 @@ func (c *cmd) Run(args []string) int {
|
|||
}
|
||||
defer f.Close()
|
||||
|
||||
meta, err := snapshot.Verify(f)
|
||||
readFile, meta, err := snapshot.Read(hclog.New(nil), f)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error verifying snapshot: %s", err))
|
||||
c.UI.Error(fmt.Sprintf("Error reading snapshot: %s", err))
|
||||
}
|
||||
defer func() {
|
||||
if err := readFile.Close(); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed to close temp snapshot: %v", err))
|
||||
}
|
||||
if err := os.Remove(readFile.Name()); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Failed to clean up temp snapshot: %v", err))
|
||||
}
|
||||
}()
|
||||
|
||||
stats, totalSize, err := enhance(readFile)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error extracting snapshot data: %s", err))
|
||||
return 1
|
||||
}
|
||||
// Outputs the original style of inspect information
|
||||
legacy, err := c.legacyStats(meta)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error outputting snapshot data: %s", err))
|
||||
}
|
||||
c.UI.Info(legacy.String())
|
||||
|
||||
// Outputs the more detailed snapshot information
|
||||
enhanced, err := c.readStats(stats, totalSize)
|
||||
if err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error outputting enhanced snapshot data: %s", err))
|
||||
return 1
|
||||
}
|
||||
c.UI.Info(enhanced.String())
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// legacyStats outputs the expected stats from the original snapshot
|
||||
// inspect command
|
||||
func (c *cmd) legacyStats(meta *raft.SnapshotMeta) (bytes.Buffer, error) {
|
||||
var b bytes.Buffer
|
||||
tw := tabwriter.NewWriter(&b, 0, 2, 6, ' ', 0)
|
||||
fmt.Fprintf(tw, "ID\t%s\n", meta.ID)
|
||||
|
@ -69,14 +112,142 @@ func (c *cmd) Run(args []string) int {
|
|||
fmt.Fprintf(tw, "Index\t%d\n", meta.Index)
|
||||
fmt.Fprintf(tw, "Term\t%d\n", meta.Term)
|
||||
fmt.Fprintf(tw, "Version\t%d\n", meta.Version)
|
||||
if err = tw.Flush(); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error rendering snapshot info: %s", err))
|
||||
return 1
|
||||
if err := tw.Flush(); err != nil {
|
||||
return b, err
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
type typeStats struct {
|
||||
Name string
|
||||
Sum int
|
||||
Count int
|
||||
}
|
||||
|
||||
// countingReader helps keep track of the bytes we have read
|
||||
// when reading snapshots
|
||||
type countingReader struct {
|
||||
wrappedReader io.Reader
|
||||
read int
|
||||
}
|
||||
|
||||
func (r *countingReader) Read(p []byte) (n int, err error) {
|
||||
n, err = r.wrappedReader.Read(p)
|
||||
if err == nil {
|
||||
r.read += n
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
// enhance utilizes ReadSnapshot to populate the struct with
|
||||
// all of the snapshot's itemized data
|
||||
func enhance(file io.Reader) (map[structs.MessageType]typeStats, int, error) {
|
||||
stats := make(map[structs.MessageType]typeStats)
|
||||
cr := &countingReader{wrappedReader: file}
|
||||
totalSize := 0
|
||||
handler := func(header *fsm.SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
|
||||
name := structs.MessageType.String(msg)
|
||||
s := stats[msg]
|
||||
if s.Name == "" {
|
||||
s.Name = name
|
||||
}
|
||||
var val interface{}
|
||||
err := dec.Decode(&val)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decode msg type %v, error %v", name, err)
|
||||
}
|
||||
|
||||
c.UI.Info(b.String())
|
||||
size := cr.read - totalSize
|
||||
s.Sum += size
|
||||
s.Count++
|
||||
totalSize = cr.read
|
||||
stats[msg] = s
|
||||
return nil
|
||||
}
|
||||
if err := fsm.ReadSnapshot(cr, handler); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return stats, totalSize, nil
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// readStats takes the information generated from enhance and creates human
|
||||
// readable output from it
|
||||
func (c *cmd) readStats(stats map[structs.MessageType]typeStats, totalSize int) (bytes.Buffer, error) {
|
||||
// Output stats in size-order
|
||||
ss := make([]typeStats, 0, len(stats))
|
||||
|
||||
for _, s := range stats {
|
||||
ss = append(ss, s)
|
||||
}
|
||||
|
||||
// Sort the stat slice
|
||||
sort.Slice(ss, func(i, j int) bool { return ss[i].Sum > ss[j].Sum })
|
||||
|
||||
var b bytes.Buffer
|
||||
|
||||
tw := tabwriter.NewWriter(&b, 8, 8, 6, ' ', 0)
|
||||
fmt.Fprintln(tw, "\n Type\tCount\tSize\t")
|
||||
fmt.Fprintf(tw, " %s\t%s\t%s\t", "----", "----", "----")
|
||||
// For each different type generate new output
|
||||
for _, s := range ss {
|
||||
fmt.Fprintf(tw, "\n %s\t%d\t%s\t", s.Name, s.Count, ByteSize(uint64(s.Sum)))
|
||||
}
|
||||
fmt.Fprintf(tw, "\n %s\t%s\t%s\t", "----", "----", "----")
|
||||
fmt.Fprintf(tw, "\n Total\t\t%s\t", ByteSize(uint64(totalSize)))
|
||||
|
||||
if err := tw.Flush(); err != nil {
|
||||
c.UI.Error(fmt.Sprintf("Error rendering snapshot info: %s", err))
|
||||
return b, err
|
||||
}
|
||||
|
||||
return b, nil
|
||||
|
||||
}
|
||||
|
||||
// ByteSize returns a human-readable byte string of the form 10MB, 12.5KB, and so forth. The following units are available:
|
||||
// TB: Terabyte
|
||||
// GB: Gigabyte
|
||||
// MB: Megabyte
|
||||
// KB: Kilobyte
|
||||
// B: Byte
|
||||
// The unit that results in the smallest number greater than or equal to 1 is always chosen.
|
||||
// From https://github.com/cloudfoundry/bytefmt/blob/master/bytes.go
|
||||
|
||||
const (
|
||||
BYTE = 1 << (10 * iota)
|
||||
KILOBYTE
|
||||
MEGABYTE
|
||||
GIGABYTE
|
||||
TERABYTE
|
||||
)
|
||||
|
||||
func ByteSize(bytes uint64) string {
|
||||
unit := ""
|
||||
value := float64(bytes)
|
||||
|
||||
switch {
|
||||
case bytes >= TERABYTE:
|
||||
unit = "TB"
|
||||
value = value / TERABYTE
|
||||
case bytes >= GIGABYTE:
|
||||
unit = "GB"
|
||||
value = value / GIGABYTE
|
||||
case bytes >= MEGABYTE:
|
||||
unit = "MB"
|
||||
value = value / MEGABYTE
|
||||
case bytes >= KILOBYTE:
|
||||
unit = "KB"
|
||||
value = value / KILOBYTE
|
||||
case bytes >= BYTE:
|
||||
unit = "B"
|
||||
case bytes == 0:
|
||||
return "0"
|
||||
}
|
||||
|
||||
result := strconv.FormatFloat(value, 'f', 1, 64)
|
||||
result = strings.TrimSuffix(result, ".0")
|
||||
return result + unit
|
||||
}
|
||||
|
||||
func (c *cmd) Synopsis() string {
|
||||
|
|
|
@ -1,17 +1,36 @@
|
|||
package inspect
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"flag"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/mitchellh/cli"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// update allows golden files to be updated based on the current output.
|
||||
var update = flag.Bool("update", false, "update golden files")
|
||||
|
||||
// golden reads and optionally writes the expected data to the golden file,
|
||||
// returning the contents as a string.
|
||||
func golden(t *testing.T, name, got string) string {
|
||||
t.Helper()
|
||||
|
||||
golden := filepath.Join("testdata", name+".golden")
|
||||
if *update && got != "" {
|
||||
err := ioutil.WriteFile(golden, []byte(got), 0644)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
expected, err := ioutil.ReadFile(golden)
|
||||
require.NoError(t, err)
|
||||
|
||||
return string(expected)
|
||||
}
|
||||
|
||||
func TestSnapshotInspectCommand_noTabs(t *testing.T) {
|
||||
t.Parallel()
|
||||
if strings.ContainsRune(New(cli.NewMockUi()).Help(), '\t') {
|
||||
|
@ -60,53 +79,19 @@ func TestSnapshotInspectCommand_Validation(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestSnapshotInspectCommand(t *testing.T) {
|
||||
t.Parallel()
|
||||
a := agent.NewTestAgent(t, ``)
|
||||
defer a.Shutdown()
|
||||
client := a.Client()
|
||||
|
||||
dir := testutil.TempDir(t, "snapshot")
|
||||
file := filepath.Join(dir, "backup.tgz")
|
||||
|
||||
// Save a snapshot of the current Consul state
|
||||
f, err := os.Create(file)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
snap, _, err := client.Snapshot().Save(nil)
|
||||
if err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if _, err := io.Copy(f, snap); err != nil {
|
||||
f.Close()
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
filepath := "./testdata/backup.snap"
|
||||
|
||||
// Inspect the snapshot
|
||||
ui := cli.NewMockUi()
|
||||
c := New(ui)
|
||||
args := []string{file}
|
||||
args := []string{filepath}
|
||||
|
||||
code := c.Run(args)
|
||||
if code != 0 {
|
||||
t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String())
|
||||
}
|
||||
|
||||
output := ui.OutputWriter.String()
|
||||
for _, key := range []string{
|
||||
"ID",
|
||||
"Size",
|
||||
"Index",
|
||||
"Term",
|
||||
"Version",
|
||||
} {
|
||||
if !strings.Contains(output, key) {
|
||||
t.Fatalf("bad %#v, missing %q", output, key)
|
||||
}
|
||||
}
|
||||
want := golden(t, t.Name(), ui.OutputWriter.String())
|
||||
require.Equal(t, want, ui.OutputWriter.String())
|
||||
}
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
ID 2-13-1602222343947
|
||||
Size 5141
|
||||
Index 13
|
||||
Term 2
|
||||
Version 1
|
||||
|
||||
|
||||
Type Count Size
|
||||
---- ---- ----
|
||||
Register 3 1.7KB
|
||||
ConnectCA 1 1.2KB
|
||||
ConnectCAProviderState 1 1.1KB
|
||||
Index 12 344B
|
||||
Autopilot 1 199B
|
||||
ConnectCAConfig 1 197B
|
||||
FederationState 1 139B
|
||||
SystemMetadata 1 68B
|
||||
ChunkingState 1 12B
|
||||
---- ---- ----
|
||||
Total 5KB
|
Binary file not shown.
|
@ -160,13 +160,12 @@ func concludeGzipRead(decomp *gzip.Reader) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Restore takes the snapshot from the reader and attempts to apply it to the
|
||||
// given Raft instance.
|
||||
func Restore(logger hclog.Logger, in io.Reader, r *raft.Raft) error {
|
||||
// Read a snapshot into a temporary file. The caller is responsible for removing the file.
|
||||
func Read(logger hclog.Logger, in io.Reader) (*os.File, *raft.SnapshotMeta, error) {
|
||||
// Wrap the reader in a gzip decompressor.
|
||||
decomp, err := gzip.NewReader(in)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decompress snapshot: %v", err)
|
||||
return nil, nil, fmt.Errorf("failed to decompress snapshot: %v", err)
|
||||
}
|
||||
defer func() {
|
||||
if err := decomp.Close(); err != nil {
|
||||
|
@ -178,9 +177,37 @@ func Restore(logger hclog.Logger, in io.Reader, r *raft.Raft) error {
|
|||
// we can avoid buffering in memory.
|
||||
snap, err := ioutil.TempFile("", "snapshot")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create temp snapshot file: %v", err)
|
||||
return nil, nil, fmt.Errorf("failed to create temp snapshot file: %v", err)
|
||||
}
|
||||
|
||||
// Read the archive.
|
||||
var metadata raft.SnapshotMeta
|
||||
if err := read(decomp, &metadata, snap); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to read snapshot file: %v", err)
|
||||
}
|
||||
|
||||
if err := concludeGzipRead(decomp); err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
// Sync and rewind the file so it's ready to be read again.
|
||||
if err := snap.Sync(); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to sync temp snapshot: %v", err)
|
||||
}
|
||||
if _, err := snap.Seek(0, 0); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to rewind temp snapshot: %v", err)
|
||||
}
|
||||
return snap, &metadata, nil
|
||||
}
|
||||
|
||||
// Restore takes the snapshot from the reader and attempts to apply it to the
|
||||
// given Raft instance.
|
||||
func Restore(logger hclog.Logger, in io.Reader, r *raft.Raft) error {
|
||||
snap, metadata, err := Read(logger, in)
|
||||
defer func() {
|
||||
if snap == nil {
|
||||
return
|
||||
}
|
||||
if err := snap.Close(); err != nil {
|
||||
logger.Error("Failed to close temp snapshot", "error", err)
|
||||
}
|
||||
|
@ -188,27 +215,12 @@ func Restore(logger hclog.Logger, in io.Reader, r *raft.Raft) error {
|
|||
logger.Error("Failed to clean up temp snapshot", "error", err)
|
||||
}
|
||||
}()
|
||||
|
||||
// Read the archive.
|
||||
var metadata raft.SnapshotMeta
|
||||
if err := read(decomp, &metadata, snap); err != nil {
|
||||
return fmt.Errorf("failed to read snapshot file: %v", err)
|
||||
}
|
||||
|
||||
if err := concludeGzipRead(decomp); err != nil {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Sync and rewind the file so it's ready to be read again.
|
||||
if err := snap.Sync(); err != nil {
|
||||
return fmt.Errorf("failed to sync temp snapshot: %v", err)
|
||||
}
|
||||
if _, err := snap.Seek(0, 0); err != nil {
|
||||
return fmt.Errorf("failed to rewind temp snapshot: %v", err)
|
||||
}
|
||||
|
||||
// Feed the snapshot into Raft.
|
||||
if err := r.Restore(&metadata, snap, 0); err != nil {
|
||||
if err := r.Restore(metadata, snap, 0); err != nil {
|
||||
return fmt.Errorf("Raft error when restoring snapshot: %v", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -41,6 +41,20 @@ Size 667
|
|||
Index 5
|
||||
Term 2
|
||||
Version 1
|
||||
|
||||
Type Count Size
|
||||
---- ---- ----
|
||||
Register 3 1.7KB
|
||||
ConnectCA 1 1.2KB
|
||||
ConnectCAProviderState 1 1.1KB
|
||||
Index 12 344B
|
||||
AutopilotRequest 1 199B
|
||||
ConnectCAConfig 1 197B
|
||||
FederationState 1 139B
|
||||
SystemMetadata 1 68B
|
||||
ChunkingState 1 12B
|
||||
---- ---- ----
|
||||
Total 5KB
|
||||
```
|
||||
|
||||
Please see the [HTTP API](/api/snapshot) documentation for
|
||||
|
|
Loading…
Reference in New Issue