diff --git a/trace.go b/trace.go index 744c01e..5b3a558 100644 --- a/trace.go +++ b/trace.go @@ -1,10 +1,6 @@ package pubsub import ( - "encoding/json" - "io" - "os" - "sync" "time" "github.com/libp2p/go-libp2p-core/peer" @@ -18,74 +14,6 @@ type EventTracer interface { Trace(evt interface{}) } -// JSONTracer is a tracer that writes events to a file, encoded in json. -type JSONTracer struct { - w io.WriteCloser - ch chan struct{} - mx sync.Mutex - buf []interface{} -} - -// NewJsonTracer creates a new JSON tracer writing to file. -func NewJSONTracer(file string) (*JSONTracer, error) { - return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) -} - -// OpenJsonTracer creates a new JSON tracer, with explicit control of OpenFile flags and permissions. -func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) { - f, err := os.OpenFile(file, flags, perm) - if err != nil { - return nil, err - } - - tr := &JSONTracer{w: f, ch: make(chan struct{}, 1)} - go tr.doWrite() - - return tr, nil -} - -func (t *JSONTracer) Trace(evt interface{}) { - t.mx.Lock() - t.buf = append(t.buf, evt) - t.mx.Unlock() - - select { - case t.ch <- struct{}{}: - default: - } -} - -func (t *JSONTracer) Close() { - close(t.ch) -} - -func (t *JSONTracer) doWrite() { - var buf []interface{} - enc := json.NewEncoder(t.w) - for { - _, ok := <-t.ch - - t.mx.Lock() - tmp := t.buf - t.buf = buf[:0] - buf = tmp - t.mx.Unlock() - - for i, evt := range buf { - err := enc.Encode(evt) - if err != nil { - log.Errorf("error writing event trace: %s", err.Error()) - } - buf[i] = nil - } - - if !ok { - t.w.Close() - return - } - } -} - // pubsub tracer details type pubsubTracer struct { tracer EventTracer diff --git a/tracer.go b/tracer.go new file mode 100644 index 0000000..efa2ac6 --- /dev/null +++ b/tracer.go @@ -0,0 +1,76 @@ +package pubsub + +import ( + "encoding/json" + "io" + "os" + "sync" +) + +// JSONTracer is a tracer that writes events to a file, encoded in json. +type JSONTracer struct { + w io.WriteCloser + ch chan struct{} + mx sync.Mutex + buf []interface{} +} + +// NewJsonTracer creates a new JSON tracer writing to file. +func NewJSONTracer(file string) (*JSONTracer, error) { + return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) +} + +// OpenJsonTracer creates a new JSON tracer, with explicit control of OpenFile flags and permissions. +func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) { + f, err := os.OpenFile(file, flags, perm) + if err != nil { + return nil, err + } + + tr := &JSONTracer{w: f, ch: make(chan struct{}, 1)} + go tr.doWrite() + + return tr, nil +} + +func (t *JSONTracer) Trace(evt interface{}) { + t.mx.Lock() + t.buf = append(t.buf, evt) + t.mx.Unlock() + + select { + case t.ch <- struct{}{}: + default: + } +} + +func (t *JSONTracer) Close() { + close(t.ch) +} + +func (t *JSONTracer) doWrite() { + var buf []interface{} + enc := json.NewEncoder(t.w) + for { + _, ok := <-t.ch + + t.mx.Lock() + tmp := t.buf + t.buf = buf[:0] + buf = tmp + t.mx.Unlock() + + for i, evt := range buf { + err := enc.Encode(evt) + if err != nil { + log.Errorf("error writing event trace: %s", err.Error()) + } + buf[i] = nil + } + + if !ok { + t.w.Close() + return + } + } +}