agent: Adding event ingestion

This commit is contained in:
Armon Dadgar 2014-08-27 17:01:10 -07:00
parent 314743c111
commit 2d03146d3b
2 changed files with 25 additions and 1 deletions

View File

@ -50,6 +50,15 @@ type Agent struct {
// eventCh is used to receive user events // eventCh is used to receive user events
eventCh chan serf.UserEvent eventCh chan serf.UserEvent
// eventBuf stores the most recent events in a ring buffer
// using eventIndex as the next index to insert into. This
// is guarded by eventLock. When an insert happens, the
// eventNotify group is notified.
eventBuf []*userEventEnc
eventIndex int
eventLock sync.RWMutex
eventNotify consul.NotifyGroup
shutdown bool shutdown bool
shutdownCh chan struct{} shutdownCh chan struct{}
shutdownLock sync.Mutex shutdownLock sync.Mutex
@ -93,6 +102,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
checkMonitors: make(map[string]*CheckMonitor), checkMonitors: make(map[string]*CheckMonitor),
checkTTLs: make(map[string]*CheckTTL), checkTTLs: make(map[string]*CheckTTL),
eventCh: make(chan serf.UserEvent, 1024), eventCh: make(chan serf.UserEvent, 1024),
eventBuf: make([]*userEventEnc, 256),
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }

View File

@ -115,7 +115,8 @@ func (a *Agent) handleEvents() {
continue continue
} }
// TODO: Process event // Ingest the event
a.ingestUserEvent(msg)
case <-a.shutdownCh: case <-a.shutdownCh:
return return
@ -195,6 +196,19 @@ func (a *Agent) shouldProcessUserEvent(msg *userEventEnc) bool {
return true return true
} }
// ingestUserEvent is used to process an event that passes filtering
func (a *Agent) ingestUserEvent(msg *userEventEnc) {
a.eventLock.Lock()
defer func() {
a.eventLock.Unlock()
a.eventNotify.Notify()
}()
idx := a.eventIndex
a.eventBuf[idx] = msg
a.eventIndex = (idx + 1) % len(a.eventBuf)
}
// Decode is used to decode a MsgPack encoded object // Decode is used to decode a MsgPack encoded object
func decodeUserEvent(buf []byte, out interface{}) error { func decodeUserEvent(buf []byte, out interface{}) error {
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)