package raft import ( "fmt" "io" "time" "github.com/armon/go-metrics" ) // FSM provides an interface that can be implemented by // clients to make use of the replicated log. type FSM interface { // Apply log is invoked once a log entry is committed. // It returns a value which will be made available in the // ApplyFuture returned by Raft.Apply method if that // method was called on the same Raft node as the FSM. Apply(*Log) interface{} // Snapshot is used to support log compaction. This call should // return an FSMSnapshot which can be used to save a point-in-time // snapshot of the FSM. Apply and Snapshot are not called in multiple // threads, but Apply will be called concurrently with Persist. This means // the FSM should be implemented in a fashion that allows for concurrent // updates while a snapshot is happening. Snapshot() (FSMSnapshot, error) // Restore is used to restore an FSM from a snapshot. It is not called // concurrently with any other command. The FSM must discard all previous // state. Restore(io.ReadCloser) error } // BatchingFSM extends the FSM interface to add an ApplyBatch function. This can // optionally be implemented by clients to enable multiple logs to be applied to // the FSM in batches. Up to MaxAppendEntries could be sent in a batch. type BatchingFSM interface { // ApplyBatch is invoked once a batch of log entries has been committed and // are ready to be applied to the FSM. ApplyBatch will take in an array of // log entries. These log entries will be in the order they were committed, // will not have gaps, and could be of a few log types. Clients should check // the log type prior to attempting to decode the data attached. Presently // the LogCommand and LogConfiguration types will be sent. // // The returned slice must be the same length as the input and each response // should correlate to the log at the same index of the input. The returned // values will be made available in the ApplyFuture returned by Raft.Apply // method if that method was called on the same Raft node as the FSM. ApplyBatch([]*Log) []interface{} FSM } // FSMSnapshot is returned by an FSM in response to a Snapshot // It must be safe to invoke FSMSnapshot methods with concurrent // calls to Apply. type FSMSnapshot interface { // Persist should dump all necessary state to the WriteCloser 'sink', // and call sink.Close() when finished or call sink.Cancel() on error. Persist(sink SnapshotSink) error // Release is invoked when we are finished with the snapshot. Release() } // runFSM is a long running goroutine responsible for applying logs // to the FSM. This is done async of other logs since we don't want // the FSM to block our internal operations. func (r *Raft) runFSM() { var lastIndex, lastTerm uint64 batchingFSM, batchingEnabled := r.fsm.(BatchingFSM) configStore, configStoreEnabled := r.fsm.(ConfigurationStore) commitSingle := func(req *commitTuple) { // Apply the log if a command or config change var resp interface{} // Make sure we send a response defer func() { // Invoke the future if given if req.future != nil { req.future.response = resp req.future.respond(nil) } }() switch req.log.Type { case LogCommand: start := time.Now() resp = r.fsm.Apply(req.log) metrics.MeasureSince([]string{"raft", "fsm", "apply"}, start) case LogConfiguration: if !configStoreEnabled { // Return early to avoid incrementing the index and term for // an unimplemented operation. return } start := time.Now() configStore.StoreConfiguration(req.log.Index, DecodeConfiguration(req.log.Data)) metrics.MeasureSince([]string{"raft", "fsm", "store_config"}, start) } // Update the indexes lastIndex = req.log.Index lastTerm = req.log.Term } commitBatch := func(reqs []*commitTuple) { if !batchingEnabled { for _, ct := range reqs { commitSingle(ct) } return } // Only send LogCommand and LogConfiguration log types. LogBarrier types // will not be sent to the FSM. shouldSend := func(l *Log) bool { switch l.Type { case LogCommand, LogConfiguration: return true } return false } var lastBatchIndex, lastBatchTerm uint64 sendLogs := make([]*Log, 0, len(reqs)) for _, req := range reqs { if shouldSend(req.log) { sendLogs = append(sendLogs, req.log) } lastBatchIndex = req.log.Index lastBatchTerm = req.log.Term } var responses []interface{} if len(sendLogs) > 0 { start := time.Now() responses = batchingFSM.ApplyBatch(sendLogs) metrics.MeasureSince([]string{"raft", "fsm", "applyBatch"}, start) metrics.AddSample([]string{"raft", "fsm", "applyBatchNum"}, float32(len(reqs))) // Ensure we get the expected responses if len(sendLogs) != len(responses) { panic("invalid number of responses") } } // Update the indexes lastIndex = lastBatchIndex lastTerm = lastBatchTerm var i int for _, req := range reqs { var resp interface{} // If the log was sent to the FSM, retrieve the response. if shouldSend(req.log) { resp = responses[i] i++ } if req.future != nil { req.future.response = resp req.future.respond(nil) } } } restore := func(req *restoreFuture) { // Open the snapshot meta, source, err := r.snapshots.Open(req.ID) if err != nil { req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err)) return } defer source.Close() // Attempt to restore if err := fsmRestoreAndMeasure(r.fsm, source); err != nil { req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err)) return } // Update the last index and term lastIndex = meta.Index lastTerm = meta.Term req.respond(nil) } snapshot := func(req *reqSnapshotFuture) { // Is there something to snapshot? if lastIndex == 0 { req.respond(ErrNothingNewToSnapshot) return } // Start a snapshot start := time.Now() snap, err := r.fsm.Snapshot() metrics.MeasureSince([]string{"raft", "fsm", "snapshot"}, start) // Respond to the request req.index = lastIndex req.term = lastTerm req.snapshot = snap req.respond(err) } for { select { case ptr := <-r.fsmMutateCh: switch req := ptr.(type) { case []*commitTuple: commitBatch(req) case *restoreFuture: restore(req) default: panic(fmt.Errorf("bad type passed to fsmMutateCh: %#v", ptr)) } case req := <-r.fsmSnapshotCh: snapshot(req) case <-r.shutdownCh: return } } } // fsmRestoreAndMeasure wraps the Restore call on an FSM to consistently measure // and report timing metrics. The caller is still responsible for calling Close // on the source in all cases. func fsmRestoreAndMeasure(fsm FSM, source io.ReadCloser) error { start := time.Now() if err := fsm.Restore(source); err != nil { return err } metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start) metrics.SetGauge([]string{"raft", "fsm", "lastRestoreDuration"}, float32(time.Since(start).Milliseconds())) return nil }