From aad57e952efb92e595ebb9ecaa77ecec7c5ed9eb Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 28 Aug 2014 11:15:55 -0700 Subject: [PATCH] agent: Methods to get the user events --- command/agent/user_event.go | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/command/agent/user_event.go b/command/agent/user_event.go index c1e7fb1ff1..0540627acf 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -36,6 +36,9 @@ type UserEvent struct { // Version of the user event. Automatically generated. Version int `codec:"v"` + + // LTime is the lamport time. Automatically generated. + LTime uint64 `codec:"-"` } // validateUserEventParams is used to sanity check the inputs @@ -97,6 +100,7 @@ func (a *Agent) handleEvents() { a.logger.Printf("[ERR] agent: Failed to decode event: %v", err) continue } + msg.LTime = uint64(e.LTime) // Skip if we don't pass filtering if !a.shouldProcessUserEvent(msg) { @@ -197,6 +201,39 @@ func (a *Agent) ingestUserEvent(msg *UserEvent) { a.eventIndex = (idx + 1) % len(a.eventBuf) } +// UserEvents is used to return a slice of the most recent +// user events. +func (a *Agent) UserEvents() []*UserEvent { + n := len(a.eventBuf) + out := make([]*UserEvent, n) + a.eventLock.RLock() + defer a.eventLock.RUnlock() + + // Check if the buffer is full + if a.eventBuf[a.eventIndex] != nil { + if a.eventIndex == 0 { + copy(out, a.eventBuf) + } else { + copy(out, a.eventBuf[a.eventIndex:]) + copy(out[n-a.eventIndex:], a.eventBuf[:a.eventIndex]) + } + } else { + // We haven't filled the buffer yet + copy(out, a.eventBuf[:a.eventIndex]) + out = out[:a.eventIndex] + } + return out +} + +// LastUserEvent is used to return the lastest user event. +// This will return nil if there is no recent event. +func (a *Agent) LastUserEvent() *UserEvent { + a.eventLock.RLock() + defer a.eventLock.RUnlock() + idx := (a.eventIndex - 1) % len(a.eventBuf) + return a.eventBuf[idx] +} + // Decode is used to decode a MsgPack encoded object func decodeUserEvent(buf []byte, out interface{}) error { return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)