117 lines
3.3 KiB
Go
Raw Normal View History

package statsd
import (
"sync/atomic"
"time"
)
// A statsdWriter offers a standard interface regardless of the underlying
// protocol. For now UDS and UPD writers are available.
// Attention: the underlying buffer of `data` is reused after a `statsdWriter.Write` call.
// `statsdWriter.Write` must be synchronous.
type statsdWriter interface {
Write(data []byte) (n int, err error)
SetWriteTimeout(time.Duration) error
Close() error
}
// SenderMetrics contains metrics about the health of the sender
type SenderMetrics struct {
TotalSentBytes uint64
TotalSentPayloads uint64
TotalDroppedPayloads uint64
TotalDroppedBytes uint64
TotalDroppedPayloadsQueueFull uint64
TotalDroppedBytesQueueFull uint64
TotalDroppedPayloadsWriter uint64
TotalDroppedBytesWriter uint64
}
type sender struct {
transport statsdWriter
pool *bufferPool
queue chan *statsdBuffer
metrics SenderMetrics
stop chan struct{}
}
func newSender(transport statsdWriter, queueSize int, pool *bufferPool) *sender {
sender := &sender{
transport: transport,
pool: pool,
queue: make(chan *statsdBuffer, queueSize),
stop: make(chan struct{}),
}
go sender.sendLoop()
return sender
}
func (s *sender) send(buffer *statsdBuffer) {
select {
case s.queue <- buffer:
default:
atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
atomic.AddUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 1)
atomic.AddUint64(&s.metrics.TotalDroppedBytesQueueFull, uint64(len(buffer.bytes())))
s.pool.returnBuffer(buffer)
}
}
func (s *sender) write(buffer *statsdBuffer) {
_, err := s.transport.Write(buffer.bytes())
if err != nil {
atomic.AddUint64(&s.metrics.TotalDroppedPayloads, 1)
atomic.AddUint64(&s.metrics.TotalDroppedBytes, uint64(len(buffer.bytes())))
atomic.AddUint64(&s.metrics.TotalDroppedPayloadsWriter, 1)
atomic.AddUint64(&s.metrics.TotalDroppedBytesWriter, uint64(len(buffer.bytes())))
} else {
atomic.AddUint64(&s.metrics.TotalSentPayloads, 1)
atomic.AddUint64(&s.metrics.TotalSentBytes, uint64(len(buffer.bytes())))
}
s.pool.returnBuffer(buffer)
}
func (s *sender) flushMetrics() SenderMetrics {
return SenderMetrics{
TotalSentBytes: atomic.SwapUint64(&s.metrics.TotalSentBytes, 0),
TotalSentPayloads: atomic.SwapUint64(&s.metrics.TotalSentPayloads, 0),
TotalDroppedPayloads: atomic.SwapUint64(&s.metrics.TotalDroppedPayloads, 0),
TotalDroppedBytes: atomic.SwapUint64(&s.metrics.TotalDroppedBytes, 0),
TotalDroppedPayloadsQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsQueueFull, 0),
TotalDroppedBytesQueueFull: atomic.SwapUint64(&s.metrics.TotalDroppedBytesQueueFull, 0),
TotalDroppedPayloadsWriter: atomic.SwapUint64(&s.metrics.TotalDroppedPayloadsWriter, 0),
TotalDroppedBytesWriter: atomic.SwapUint64(&s.metrics.TotalDroppedBytesWriter, 0),
}
}
func (s *sender) sendLoop() {
for {
select {
case buffer := <-s.queue:
s.write(buffer)
case <-s.stop:
return
}
}
}
func (s *sender) flush() {
for {
select {
case buffer := <-s.queue:
s.write(buffer)
default:
return
}
}
}
func (s *sender) close() error {
s.flush()
err := s.transport.Close()
close(s.stop)
return err
}