consul/vendor/github.com/hashicorp/go-raftchunking/fsm.go

package raftchunking

import (
	"io"

	"github.com/golang/protobuf/proto"
	"github.com/hashicorp/errwrap"
	"github.com/hashicorp/go-raftchunking/types"
	"github.com/hashicorp/raft"
)

var _ raft.FSM = (*ChunkingFSM)(nil)
var _ raft.ConfigurationStore = (*ChunkingConfigurationStore)(nil)

// ChunkingSuccess wraps the response from the underlying FSM's Apply once all
// chunks of an operation have been reassembled and applied; see the note on
// ChunkingFSM below.
type ChunkingSuccess struct {
	Response interface{}
}

// ChunkingFSM is an FSM that implements chunking; it's the sister of
// ChunkingApply.
//
// N.B.: If a term change happens the final apply from the client will have a
// nil result and not be passed through to the underlying FSM. To detect this,
// the final apply to the underlying FSM is wrapped in ChunkingSuccess.
type ChunkingFSM struct {
	underlying raft.FSM
	store      ChunkStorage
	lastTerm   uint64
}

// ChunkingConfigurationStore is a ChunkingFSM that also forwards configuration
// updates to an underlying raft.ConfigurationStore.
type ChunkingConfigurationStore struct {
	*ChunkingFSM
	underlyingConfigurationStore raft.ConfigurationStore
}

// NewChunkingFSM returns a ChunkingFSM that reassembles chunked log entries
// before applying them to the underlying FSM. If store is nil, in-memory
// chunk storage is used.
func NewChunkingFSM(underlying raft.FSM, store ChunkStorage) *ChunkingFSM {
	ret := &ChunkingFSM{
		underlying: underlying,
		store:      store,
	}
	if store == nil {
		ret.store = NewInmemChunkStorage()
	}
	return ret
}

// NewChunkingConfigurationStore is like NewChunkingFSM, but for an underlying
// FSM that also implements raft.ConfigurationStore.
func NewChunkingConfigurationStore(underlying raft.ConfigurationStore, store ChunkStorage) *ChunkingConfigurationStore {
	ret := &ChunkingConfigurationStore{
		ChunkingFSM: &ChunkingFSM{
			underlying: underlying,
			store:      store,
		},
		underlyingConfigurationStore: underlying,
	}
	if store == nil {
		ret.ChunkingFSM.store = NewInmemChunkStorage()
	}
	return ret
}
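
// A minimal usage sketch (an assumption for illustration, not part of this
// file): the chunking FSM simply wraps the application's FSM before it is
// handed to raft.NewRaft, so that entries chunked by ChunkingApply are
// reassembled before the application sees them. Names such as appFSM,
// logStore, stableStore, snaps, and transport are hypothetical.
//
//	fsm := NewChunkingFSM(appFSM, nil) // nil falls back to in-memory chunk storage
//	r, err := raft.NewRaft(raft.DefaultConfig(), fsm, logStore, stableStore, snaps, transport)
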
// Apply applies the log, handling chunking as needed. The return value is an
// error, nil (for an intermediate chunk), the underlying Apply's return value
// (for a non-chunked log), or that value wrapped in ChunkingSuccess (for the
// final chunk of an operation). A caller-side sketch follows the function.
func (c *ChunkingFSM) Apply(l *raft.Log) interface{} {
	// Not chunking or wrong type, pass through
	if l.Type != raft.LogCommand || l.Extensions == nil {
		return c.underlying.Apply(l)
	}

	if l.Term != c.lastTerm {
		// The term has changed. A raft library client that was applying
		// chunks should get an error that it's no longer the leader and
		// bail, and any client (Consul, Vault, etc.) should then retry the
		// full chunking operation automatically, which will be under a
		// different opnum. So it is safe in this case to clear the map.
		if err := c.store.RestoreChunks(nil); err != nil {
			return err
		}
		c.lastTerm = l.Term
	}

	// Get chunk info from extensions
	var ci types.ChunkInfo
	if err := proto.Unmarshal(l.Extensions, &ci); err != nil {
		return errwrap.Wrapf("error unmarshaling chunk info: {{err}}", err)
	}

	// Store the current chunk and find out if all chunks have arrived
	done, err := c.store.StoreChunk(&ChunkInfo{
		OpNum:       ci.OpNum,
		SequenceNum: ci.SequenceNum,
		NumChunks:   ci.NumChunks,
		Term:        l.Term,
		Data:        l.Data,
	})
	if err != nil {
		return err
	}
	if !done {
		return nil
	}

	// All chunks are here; get the full set and clear storage of the op
	chunks, err := c.store.FinalizeOp(ci.OpNum)
	if err != nil {
		return err
	}

	finalData := make([]byte, 0, len(chunks)*raft.SuggestedMaxDataSize)
	for _, chunk := range chunks {
		finalData = append(finalData, chunk.Data...)
	}

	// Use the latest log's values with the final data
	logToApply := &raft.Log{
		Index:      l.Index,
		Term:       l.Term,
		Type:       l.Type,
		Data:       finalData,
		Extensions: ci.NextExtensions,
	}

	return ChunkingSuccess{Response: c.Apply(logToApply)}
}
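
// A minimal caller-side sketch (an assumption for illustration, not part of
// this file): after raft.Apply of the final chunk produced by ChunkingApply,
// the future's response is either ChunkingSuccess wrapping the underlying
// FSM's result, or nil when a term change interrupted the operation and the
// whole chunked apply should be retried. The names future and handleResult
// are hypothetical.
//
//	switch resp := future.Response().(type) {
//	case ChunkingSuccess:
//		handleResult(resp.Response) // the underlying FSM's return value
//	case nil:
//		// term changed mid-operation; retry the full chunked apply
//	}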

// Snapshot passes through to the underlying FSM.
func (c *ChunkingFSM) Snapshot() (raft.FSMSnapshot, error) {
	return c.underlying.Snapshot()
}

// Restore passes through to the underlying FSM.
func (c *ChunkingFSM) Restore(rc io.ReadCloser) error {
	return c.underlying.Restore(rc)
}

// Underlying returns the wrapped FSM. Note: this is used in tests via the
// raft package's test helper functions, even though it's not used in client
// code.
func (c *ChunkingFSM) Underlying() raft.FSM {
	return c.underlying
}

// CurrentState returns the current state of the chunk storage.
func (c *ChunkingFSM) CurrentState() (*State, error) {
	chunks, err := c.store.GetChunks()
	if err != nil {
		return nil, err
	}
	return &State{
		ChunkMap: chunks,
	}, nil
}

// RestoreState restores the chunk storage to the given state; a nil state
// restores it to empty.
func (c *ChunkingFSM) RestoreState(state *State) error {
	// If nil we'll restore to blank, so create a new state with a nil map
	if state == nil {
		state = new(State)
	}
	return c.store.RestoreChunks(state.ChunkMap)
}
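
// A minimal sketch (an assumption for illustration, not part of this file):
// an application that wants partially received chunks to survive its own
// snapshot/restore cycle could capture and replay the chunk state around
// those operations. The variable chunkingFSM is hypothetical.
//
//	state, err := chunkingFSM.CurrentState() // capture while snapshotting
//	// ... persist state alongside the snapshot, then on restore:
//	err = chunkingFSM.RestoreState(state)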

// StoreConfiguration passes the configuration through to the underlying
// raft.ConfigurationStore.
func (c *ChunkingConfigurationStore) StoreConfiguration(index uint64, configuration raft.Configuration) {
	c.underlyingConfigurationStore.StoreConfiguration(index, configuration)
}