diff --git a/tracer.go b/tracer.go index efa2ac6..fe6e47f 100644 --- a/tracer.go +++ b/tracer.go @@ -5,35 +5,19 @@ import ( "io" "os" "sync" + + ggio "github.com/gogo/protobuf/io" + + pb "github.com/libp2p/go-libp2p-pubsub/pb" ) -// JSONTracer is a tracer that writes events to a file, encoded in json. -type JSONTracer struct { - w io.WriteCloser +type basicTracer struct { ch chan struct{} mx sync.Mutex buf []interface{} } -// NewJsonTracer creates a new JSON tracer writing to file. -func NewJSONTracer(file string) (*JSONTracer, error) { - return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) -} - -// OpenJsonTracer creates a new JSON tracer, with explicit control of OpenFile flags and permissions. -func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) { - f, err := os.OpenFile(file, flags, perm) - if err != nil { - return nil, err - } - - tr := &JSONTracer{w: f, ch: make(chan struct{}, 1)} - go tr.doWrite() - - return tr, nil -} - -func (t *JSONTracer) Trace(evt interface{}) { +func (t *basicTracer) Trace(evt interface{}) { t.mx.Lock() t.buf = append(t.buf, evt) t.mx.Unlock() @@ -44,10 +28,34 @@ func (t *JSONTracer) Trace(evt interface{}) { } } -func (t *JSONTracer) Close() { +func (t *basicTracer) Close() { close(t.ch) } +// JSONTracer is a tracer that writes events to a file, encoded in ndjson. +type JSONTracer struct { + basicTracer + w io.WriteCloser +} + +// NewJsonTracer creates a new JSONTracer writing traces to file. +func NewJSONTracer(file string) (*JSONTracer, error) { + return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) +} + +// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions. +func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) { + f, err := os.OpenFile(file, flags, perm) + if err != nil { + return nil, err + } + + tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}} + go tr.doWrite() + + return tr, nil +} + func (t *JSONTracer) doWrite() { var buf []interface{} enc := json.NewEncoder(t.w) @@ -74,3 +82,57 @@ func (t *JSONTracer) doWrite() { } } } + +var _ EventTracer = (*JSONTracer)(nil) + +// PBTracer is a tracer that writes events to a file, as delimited protobufs. +type PBTracer struct { + basicTracer + w io.WriteCloser +} + +func NewPBTracer(file string) (*PBTracer, error) { + return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) +} + +// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions. +func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) { + f, err := os.OpenFile(file, flags, perm) + if err != nil { + return nil, err + } + + tr := &PBTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}} + go tr.doWrite() + + return tr, nil +} + +func (t *PBTracer) doWrite() { + var buf []interface{} + w := ggio.NewDelimitedWriter(t.w) + for { + _, ok := <-t.ch + + t.mx.Lock() + tmp := t.buf + t.buf = buf[:0] + buf = tmp + t.mx.Unlock() + + for i, evt := range buf { + err := w.WriteMsg(evt.(*pb.TraceEvent)) + if err != nil { + log.Errorf("error writing event trace: %s", err.Error()) + } + buf[i] = nil + } + + if !ok { + t.w.Close() + return + } + } +} + +var _ EventTracer = (*PBTracer)(nil)