From 2d03146d3b9c8000ebee55b68aa08033f7c3464d Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 27 Aug 2014 17:01:10 -0700 Subject: [PATCH] agent: Adding event ingestion --- command/agent/agent.go | 10 ++++++++++ command/agent/user_event.go | 16 +++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 08964d3257..2170d0cee9 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -50,6 +50,15 @@ type Agent struct { // eventCh is used to receive user events eventCh chan serf.UserEvent + // eventBuf stores the most recent events in a ring buffer + // using eventIndex as the next index to insert into. This + // is guarded by eventLock. When an insert happens, the + // eventNotify group is notified. + eventBuf []*userEventEnc + eventIndex int + eventLock sync.RWMutex + eventNotify consul.NotifyGroup + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -93,6 +102,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { checkMonitors: make(map[string]*CheckMonitor), checkTTLs: make(map[string]*CheckTTL), eventCh: make(chan serf.UserEvent, 1024), + eventBuf: make([]*userEventEnc, 256), shutdownCh: make(chan struct{}), } diff --git a/command/agent/user_event.go b/command/agent/user_event.go index 501b58d4ed..58580af74c 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -115,7 +115,8 @@ func (a *Agent) handleEvents() { continue } - // TODO: Process event + // Ingest the event + a.ingestUserEvent(msg) case <-a.shutdownCh: return @@ -195,6 +196,19 @@ func (a *Agent) shouldProcessUserEvent(msg *userEventEnc) bool { return true } +// ingestUserEvent is used to process an event that passes filtering +func (a *Agent) ingestUserEvent(msg *userEventEnc) { + a.eventLock.Lock() + defer func() { + a.eventLock.Unlock() + a.eventNotify.Notify() + }() + + idx := a.eventIndex + a.eventBuf[idx] = msg + a.eventIndex = (idx + 1) % len(a.eventBuf) +} + // Decode is used to decode a MsgPack encoded object func decodeUserEvent(buf []byte, out interface{}) error { return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)