Resolves an FSM snapshot TODO.

This adds checks for sink write calls before we continue the refactor, which
will resolve the other TODO comment we deleted as part of this change.
This commit is contained in:
James Phillips 2017-11-28 17:26:16 -08:00
parent aa61159b74
commit e810697e06
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
1 changed files with 33 additions and 18 deletions

View File

@ -15,13 +15,6 @@ import (
"github.com/hashicorp/raft"
)
// TODO (slackpad) - There are two refactors we should do here:
//
// 1. Register the different types from the state store and make the FSM more
// generic, especially around snapshot/restore. Those should really just
// pass the encoder into a WriteSnapshot() kind of method.
// 2. Check all the error return values from all the Write() calls.
// msgpackHandle is a shared handle for encoding/decoding msgpack payloads
var msgpackHandle = &codec.MsgpackHandle{}
@ -592,7 +585,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
}
// Register the node itself
sink.Write([]byte{byte(structs.RegisterRequestType)})
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
return err
}
if err := encoder.Encode(&req); err != nil {
return err
}
@ -603,7 +598,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
return err
}
for service := services.Next(); service != nil; service = services.Next() {
sink.Write([]byte{byte(structs.RegisterRequestType)})
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
return err
}
req.Service = service.(*structs.ServiceNode).ToNodeService()
if err := encoder.Encode(&req); err != nil {
return err
@ -617,7 +614,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
return err
}
for check := checks.Next(); check != nil; check = checks.Next() {
sink.Write([]byte{byte(structs.RegisterRequestType)})
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {
return err
}
req.Check = check.(*structs.HealthCheck)
if err := encoder.Encode(&req); err != nil {
return err
@ -633,7 +632,9 @@ func (s *consulSnapshot) persistNodes(sink raft.SnapshotSink,
return err
}
for coord := coords.Next(); coord != nil; coord = coords.Next() {
sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)})
if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil {
return err
}
updates := structs.Coordinates{coord.(*structs.Coordinate)}
if err := encoder.Encode(&updates); err != nil {
return err
@ -650,7 +651,9 @@ func (s *consulSnapshot) persistSessions(sink raft.SnapshotSink,
}
for session := sessions.Next(); session != nil; session = sessions.Next() {
sink.Write([]byte{byte(structs.SessionRequestType)})
if _, err := sink.Write([]byte{byte(structs.SessionRequestType)}); err != nil {
return err
}
if err := encoder.Encode(session.(*structs.Session)); err != nil {
return err
}
@ -666,7 +669,9 @@ func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
}
for acl := acls.Next(); acl != nil; acl = acls.Next() {
sink.Write([]byte{byte(structs.ACLRequestType)})
if _, err := sink.Write([]byte{byte(structs.ACLRequestType)}); err != nil {
return err
}
if err := encoder.Encode(acl.(*structs.ACL)); err != nil {
return err
}
@ -677,7 +682,9 @@ func (s *consulSnapshot) persistACLs(sink raft.SnapshotSink,
return err
}
if bs != nil {
sink.Write([]byte{byte(structs.ACLBootstrapRequestType)})
if _, err := sink.Write([]byte{byte(structs.ACLBootstrapRequestType)}); err != nil {
return err
}
if err := encoder.Encode(bs); err != nil {
return err
}
@ -694,7 +701,9 @@ func (s *consulSnapshot) persistKVs(sink raft.SnapshotSink,
}
for entry := entries.Next(); entry != nil; entry = entries.Next() {
sink.Write([]byte{byte(structs.KVSRequestType)})
if _, err := sink.Write([]byte{byte(structs.KVSRequestType)}); err != nil {
return err
}
if err := encoder.Encode(entry.(*structs.DirEntry)); err != nil {
return err
}
@ -710,7 +719,9 @@ func (s *consulSnapshot) persistTombstones(sink raft.SnapshotSink,
}
for stone := stones.Next(); stone != nil; stone = stones.Next() {
sink.Write([]byte{byte(structs.TombstoneRequestType)})
if _, err := sink.Write([]byte{byte(structs.TombstoneRequestType)}); err != nil {
return err
}
// For historical reasons, these are serialized in the snapshots
// as KV entries. We want to keep the snapshot format compatible
@ -737,7 +748,9 @@ func (s *consulSnapshot) persistPreparedQueries(sink raft.SnapshotSink,
}
for _, query := range queries {
sink.Write([]byte{byte(structs.PreparedQueryRequestType)})
if _, err := sink.Write([]byte{byte(structs.PreparedQueryRequestType)}); err != nil {
return err
}
if err := encoder.Encode(query); err != nil {
return err
}
@ -755,7 +768,9 @@ func (s *consulSnapshot) persistAutopilot(sink raft.SnapshotSink,
return nil
}
sink.Write([]byte{byte(structs.AutopilotRequestType)})
if _, err := sink.Write([]byte{byte(structs.AutopilotRequestType)}); err != nil {
return err
}
if err := encoder.Encode(autopilot); err != nil {
return err
}