go-libp2p-pubsub/tracer.go

139 lines
2.6 KiB
Go
Raw Normal View History

package pubsub
import (
"encoding/json"
"io"
"os"
"sync"
2019-11-12 19:50:39 +00:00
ggio "github.com/gogo/protobuf/io"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
)
2019-11-12 19:50:39 +00:00
type basicTracer struct {
ch chan struct{}
mx sync.Mutex
buf []interface{}
}
2019-11-12 19:50:39 +00:00
func (t *basicTracer) Trace(evt interface{}) {
t.mx.Lock()
t.buf = append(t.buf, evt)
t.mx.Unlock()
select {
case t.ch <- struct{}{}:
default:
}
}
func (t *basicTracer) Close() {
close(t.ch)
}
// JSONTracer is a tracer that writes events to a file, encoded in ndjson.
type JSONTracer struct {
basicTracer
w io.WriteCloser
}
// NewJsonTracer creates a new JSONTracer writing traces to file.
func NewJSONTracer(file string) (*JSONTracer, error) {
return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
2019-11-12 19:50:39 +00:00
// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
2019-11-12 19:50:39 +00:00
tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
2019-11-12 19:50:39 +00:00
func (t *JSONTracer) doWrite() {
var buf []interface{}
enc := json.NewEncoder(t.w)
for {
_, ok := <-t.ch
2019-11-12 19:50:39 +00:00
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := enc.Encode(evt)
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
2019-11-12 19:50:39 +00:00
var _ EventTracer = (*JSONTracer)(nil)
// PBTracer is a tracer that writes events to a file, as delimited protobufs.
type PBTracer struct {
basicTracer
w io.WriteCloser
}
2019-11-12 19:50:39 +00:00
func NewPBTracer(file string) (*PBTracer, error) {
return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
tr := &PBTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
func (t *PBTracer) doWrite() {
var buf []interface{}
2019-11-12 19:50:39 +00:00
w := ggio.NewDelimitedWriter(t.w)
for {
_, ok := <-t.ch
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
2019-11-12 19:50:39 +00:00
err := w.WriteMsg(evt.(*pb.TraceEvent))
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
2019-11-12 19:50:39 +00:00
var _ EventTracer = (*PBTracer)(nil)