consul: First pass at tombstone reaping

This commit is contained in:
Armon Dadgar 2014-12-15 15:28:56 -08:00
parent bcb10cff11
commit bf74361fa7
3 changed files with 86 additions and 13 deletions

View File

@ -85,8 +85,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applySessionOperation(buf[1:], log.Index) return c.applySessionOperation(buf[1:], log.Index)
case structs.ACLRequestType: case structs.ACLRequestType:
return c.applyACLOperation(buf[1:], log.Index) return c.applyACLOperation(buf[1:], log.Index)
case structs.TombstoneReapRequestType: case structs.TombstoneRequestType:
return c.applyTombstoneReapOperation(buf[1:], log.Index) return c.applyTombstoneOperation(buf[1:], log.Index)
default: default:
panic(fmt.Errorf("failed to apply request: %#v", buf)) panic(fmt.Errorf("failed to apply request: %#v", buf))
} }
@ -217,12 +217,18 @@ func (c *consulFSM) applyACLOperation(buf []byte, index uint64) interface{} {
} }
} }
func (c *consulFSM) applyTombstoneReapOperation(buf []byte, index uint64) interface{} { func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{} {
var req structs.TombstoneReapRequest var req structs.TombstoneRequest
if err := structs.Decode(buf, &req); err != nil { if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err)) panic(fmt.Errorf("failed to decode request: %v", err))
} }
return nil switch req.Op {
case structs.TombstoneReap:
return c.state.ReapTombstones(req.ReapIndex)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Tombstone operation '%s'", req.Op)
return fmt.Errorf("Invalid Tombstone operation '%s'", req.Op)
}
} }
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) { func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {

View File

@ -536,12 +536,13 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
// through Raft to ensure consistency. We do this outside the leader loop // through Raft to ensure consistency. We do this outside the leader loop
// to avoid blocking. // to avoid blocking.
func (s *Server) reapTombstones(index uint64) { func (s *Server) reapTombstones(index uint64) {
req := structs.TombstoneReapRequest{ req := structs.TombstoneRequest{
Datacenter: s.config.Datacenter, Datacenter: s.config.Datacenter,
Op: structs.TombstoneReap,
ReapIndex: index, ReapIndex: index,
WriteRequest: structs.WriteRequest{Token: s.config.ACLToken}, WriteRequest: structs.WriteRequest{Token: s.config.ACLToken},
} }
_, err := s.raftApply(structs.TombstoneReapRequestType, &req) _, err := s.raftApply(structs.TombstoneRequestType, &req)
if err != nil { if err != nil {
s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v", s.logger.Printf("[ERR] consul: failed to reap tombstones up to %d: %v",
index, err) index, err)

View File

@ -296,12 +296,6 @@ func (s *StateStore) initialize() error {
Unique: true, Unique: true,
Fields: []string{"Key"}, Fields: []string{"Key"},
}, },
"id_prefix": &MDBIndex{
Virtual: true,
RealIndex: "id",
Fields: []string{"Key"},
IdxFunc: DefaultIndexPrefixFunc,
},
}, },
Decoder: func(buf []byte) interface{} { Decoder: func(buf []byte) interface{} {
out := new(structs.DirEntry) out := new(structs.DirEntry)
@ -1386,6 +1380,71 @@ func (s *StateStore) kvsSet(
return true, tx.Commit() return true, tx.Commit()
} }
// ReapTombstones is used to delete all the tombstones with a ModifyIndex
// less than or equal to the given index. This is used to prevent unbounded
// storage growth of the tombstones.
//
// It returns nil once the deletions have been committed. An error is
// returned if the transaction cannot be started, if the scan or any delete
// fails, or if a delete does not remove exactly one entry; in those cases
// Commit is never called and the deferred Abort runs on the transaction.
func (s *StateStore) ReapTombstones(index uint64) error {
	tx, err := s.tombstoneTable.StartTxn(false, nil)
	if err != nil {
		return fmt.Errorf("failed to start txn: %v", err)
	}
	defer tx.Abort()

	// Scan the tombstone table for all the entries that are
	// eligible for GC. This could be improved by indexing on
	// ModifyIndex and doing a less-than-equals scan, however
	// we don't currently support numeric indexes internally.
	// Luckily, this is a low frequency operation.
	var toDelete []string
	streamCh := make(chan interface{}, 128)
	doneCh := make(chan struct{})
	go func() {
		// Closing doneCh tells the caller that toDelete is complete
		// and will no longer be written by this goroutine.
		defer close(doneCh)
		for raw := range streamCh {
			ent := raw.(*structs.DirEntry)
			if ent.ModifyIndex <= index {
				toDelete = append(toDelete, ent.Key)
			}
		}
	}()
	if err := s.tombstoneTable.StreamTxn(streamCh, tx, "id"); err != nil {
		s.logger.Printf("[ERR] consul.state: Failed to scan tombstones: %v", err)
		return fmt.Errorf("failed to scan tombstones: %v", err)
	}

	// Wait for the collector goroutine to drain the buffered channel before
	// touching toDelete. Without this, entries still sitting in streamCh
	// would be missed and toDelete would be read while it is being appended
	// to. This relies on StreamTxn closing streamCh when it returns, which
	// the range loop above already depends on in order to terminate.
	<-doneCh

	// Delete each tombstone
	if len(toDelete) > 0 {
		s.logger.Printf("[DEBUG] consul.state: Reaping %d tombstones", len(toDelete))
	}
	for _, key := range toDelete {
		num, err := s.tombstoneTable.DeleteTxn(tx, "id", key)
		if err != nil {
			s.logger.Printf("[ERR] consul.state: Failed to delete tombstone: %v", err)
			return fmt.Errorf("failed to delete tombstone: %v", err)
		}
		// Each key came from the scan inside this same transaction, so
		// anything other than exactly one deletion is treated as a failure.
		if num != 1 {
			return fmt.Errorf("failed to delete tombstone '%s'", key)
		}
	}
	return tx.Commit()
}
// TombstoneRestore inserts the given entry into the tombstone table inside
// its own transaction and commits it. It is intended to be called only
// while restoring state, not during normal operation.
//
// Any error from starting the transaction, inserting the entry, or
// committing is returned unchanged.
func (s *StateStore) TombstoneRestore(d *structs.DirEntry) error {
	txn, startErr := s.tombstoneTable.StartTxn(false, nil)
	if startErr != nil {
		return startErr
	}
	// Abort is deferred unconditionally, so it also runs after Commit.
	defer txn.Abort()

	insertErr := s.tombstoneTable.InsertTxn(txn, d)
	if insertErr != nil {
		return insertErr
	}
	return txn.Commit()
}
// SessionCreate is used to create a new session. The // SessionCreate is used to create a new session. The
// ID will be populated on a successful return // ID will be populated on a successful return
func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error { func (s *StateStore) SessionCreate(index uint64, session *structs.Session) error {
@ -1852,6 +1911,13 @@ func (s *StateSnapshot) KVSDump(stream chan<- interface{}) error {
return s.store.kvsTable.StreamTxn(stream, s.tx, "id") return s.store.kvsTable.StreamTxn(stream, s.tx, "id")
} }
// TombstoneDump streams every entry of the tombstone table over the given
// channel, walking the "id" index within the snapshot's transaction. The
// entries sent are *structs.DirEntry objects. This will block and should be
// invoked in a goroutine.
func (s *StateSnapshot) TombstoneDump(stream chan<- interface{}) error {
	tombstones := s.store.tombstoneTable
	return tombstones.StreamTxn(stream, s.tx, "id")
}
// SessionList is used to list all the open sessions // SessionList is used to list all the open sessions
func (s *StateSnapshot) SessionList() ([]*structs.Session, error) { func (s *StateSnapshot) SessionList() ([]*structs.Session, error) {
res, err := s.store.sessionTable.GetTxn(s.tx, "id") res, err := s.store.sessionTable.GetTxn(s.tx, "id")