go-libp2p-pubsub/tracer.go

298 lines
5.8 KiB
Go
Raw Normal View History

package pubsub
import (
2019-11-14 10:23:32 +00:00
"compress/gzip"
"context"
"encoding/json"
"io"
"os"
"sync"
2019-11-14 10:23:32 +00:00
"time"
2019-11-12 19:50:39 +00:00
pb "github.com/libp2p/go-libp2p-pubsub/pb"
2019-11-14 10:23:32 +00:00
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
ggio "github.com/gogo/protobuf/io"
)
// TraceBufferSize is the buffered-event limit applied by lossy tracers: once
// the pending buffer is full, Trace drops new events instead of appending them.
var TraceBufferSize = 1 << 16 // 64K ought to be enough for everyone; famous last words.
2019-11-12 19:50:39 +00:00
type basicTracer struct {
ch chan struct{}
mx sync.Mutex
buf []*pb.TraceEvent
lossy bool
}
func (t *basicTracer) Trace(evt *pb.TraceEvent) {
2019-11-12 19:50:39 +00:00
t.mx.Lock()
if t.lossy && len(t.buf) > TraceBufferSize {
log.Warningf("trace buffer overflow; dropping trace event")
} else {
t.buf = append(t.buf, evt)
}
2019-11-12 19:50:39 +00:00
t.mx.Unlock()
select {
case t.ch <- struct{}{}:
default:
}
}
func (t *basicTracer) Close() {
close(t.ch)
}
// JSONTracer is a tracer that writes events to a file, encoded in ndjson.
// Buffering and signalling come from the embedded basicTracer; the doWrite
// goroutine encodes each buffered event to w.
type JSONTracer struct {
basicTracer
// w receives the JSON-encoded events and is closed by doWrite once the
// tracer's channel has been closed.
w io.WriteCloser
}
// NewJSONTracer creates a new JSONTracer writing traces to file. The file is
// created if needed, opened write-only and truncated, with permissions 0644.
func NewJSONTracer(file string) (*JSONTracer, error) {
	const flags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
	return OpenJSONTracer(file, flags, 0644)
}
2019-11-12 19:50:39 +00:00
// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
2019-11-12 19:50:39 +00:00
tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
2019-11-12 19:50:39 +00:00
func (t *JSONTracer) doWrite() {
var buf []*pb.TraceEvent
2019-11-12 19:50:39 +00:00
enc := json.NewEncoder(t.w)
for {
_, ok := <-t.ch
2019-11-12 19:50:39 +00:00
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := enc.Encode(evt)
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
2019-11-12 19:50:39 +00:00
// Compile-time check that *JSONTracer satisfies EventTracer.
var _ EventTracer = (*JSONTracer)(nil)
// PBTracer is a tracer that writes events to a file, as delimited protobufs.
// Buffering and signalling come from the embedded basicTracer; the doWrite
// goroutine writes each buffered event to w.
type PBTracer struct {
basicTracer
// w receives the delimited protobuf messages and is closed by doWrite once
// the tracer's channel has been closed.
w io.WriteCloser
}
2019-11-12 19:50:39 +00:00
// NewPBTracer creates a new PBTracer writing traces to file. The file is
// created if needed, opened write-only and truncated, with permissions 0644.
func NewPBTracer(file string) (*PBTracer, error) {
	const flags = os.O_CREATE | os.O_WRONLY | os.O_TRUNC
	return OpenPBTracer(file, flags, 0644)
}
// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile
// flags and permissions. On success the writer goroutine is already running;
// if the file cannot be opened the error from os.OpenFile is returned.
func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
	out, err := os.OpenFile(file, flags, perm)
	if err != nil {
		return nil, err
	}

	tracer := &PBTracer{
		basicTracer: basicTracer{ch: make(chan struct{}, 1)},
		w:           out,
	}
	go tracer.doWrite()

	return tracer, nil
}
func (t *PBTracer) doWrite() {
var buf []*pb.TraceEvent
2019-11-12 19:50:39 +00:00
w := ggio.NewDelimitedWriter(t.w)
for {
_, ok := <-t.ch
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := w.WriteMsg(evt)
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
2019-11-12 19:50:39 +00:00
// Compile-time check that *PBTracer satisfies EventTracer.
var _ EventTracer = (*PBTracer)(nil)
2019-11-14 10:23:32 +00:00
// RemoteTracerProtoID is the protocol ID RemoteTracer uses when opening its
// stream to the remote peer.
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
// RemoteTracer is a tracer that sends trace events to a remote peer
type RemoteTracer struct {
basicTracer
// ctx is the parent context for connection and stream-open attempts; once
// it is done, connect and openStream stop retrying.
ctx context.Context
// host is used to connect to the remote peer and open the tracing stream.
host host.Host
// pi identifies the remote peer that receives the trace events.
pi peer.AddrInfo
}
// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) {
tr := &RemoteTracer{ctx: ctx, host: host, pi: pi, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
2019-11-14 10:23:32 +00:00
go tr.doWrite()
return tr, nil
}
func (t *RemoteTracer) doWrite() {
var buf []*pb.TraceEvent
s, err := t.openStream()
if err != nil {
log.Errorf("error opening remote tracer stream: %s", err.Error())
return
}
var batch pb.TraceEventBatch
gzipW := gzip.NewWriter(s)
w := ggio.NewDelimitedWriter(gzipW)
2019-11-14 10:23:32 +00:00
for {
_, ok := <-t.ch
// nil out the buffer to gc events when swapping buffers
2019-11-14 10:23:32 +00:00
for i := range buf {
buf[i] = nil
}
// wait a bit to accumulate a batch
time.Sleep(time.Second)
2019-11-14 10:23:32 +00:00
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
if len(buf) == 0 {
goto end
}
batch.Batch = buf
2019-11-14 10:23:32 +00:00
err = w.WriteMsg(&batch)
if err != nil {
log.Errorf("error writing trace event batch: %s", err)
goto end
}
2019-11-14 10:23:32 +00:00
err = gzipW.Flush()
if err != nil {
log.Errorf("error flushin gzip stream: %s", err)
goto end
2019-11-14 10:23:32 +00:00
}
end:
if !ok {
2019-11-18 15:33:53 +00:00
if err != nil {
s.Reset()
} else {
gzipW.Close()
helpers.FullClose(s)
}
2019-11-14 10:23:32 +00:00
return
}
if err != nil {
2019-11-18 15:33:53 +00:00
s.Reset()
s, err = t.openStream()
if err != nil {
log.Errorf("error opening remote tracer stream: %s", err.Error())
return
}
gzipW.Reset(s)
}
2019-11-14 10:23:32 +00:00
}
}
// connect dials the remote tracer peer, retrying until it succeeds or the
// tracer's context is done. Each attempt is bounded by a one-minute timeout
// and failed attempts are retried after a one-minute pause. It returns nil
// once connected, or an error when the context ends first.
func (t *RemoteTracer) connect() error {
	for {
		dialCtx, cancel := context.WithTimeout(t.ctx, time.Minute)
		err := t.host.Connect(dialCtx, t.pi)
		cancel()

		if err == nil {
			return nil
		}
		if t.ctx.Err() != nil {
			// the tracer context is done; give up with the connect error
			return err
		}

		// wait a minute and try again, to account for transient server downtime
		select {
		case <-t.ctx.Done():
			return t.ctx.Err()
		case <-time.After(time.Minute):
		}
	}
}
// openStream connects to the remote tracer peer and opens a stream speaking
// RemoteTracerProtoID. Each stream-open attempt is bounded by a one-minute
// timeout; on failure it pauses a minute and starts over from connect. It
// returns the stream, or an error once the tracer's context is done or
// connect fails.
func (t *RemoteTracer) openStream() (network.Stream, error) {
	for {
		if err := t.connect(); err != nil {
			return nil, err
		}

		openCtx, cancel := context.WithTimeout(t.ctx, time.Minute)
		s, err := t.host.NewStream(openCtx, t.pi.ID, RemoteTracerProtoID)
		cancel()

		if err == nil {
			return s, nil
		}
		if t.ctx.Err() != nil {
			// the tracer context is done; give up with the stream error
			return nil, err
		}

		// wait a minute and try again, to account for transient server downtime
		select {
		case <-t.ctx.Done():
			return nil, t.ctx.Err()
		case <-time.After(time.Minute):
		}
	}
}
// Compile-time check that *RemoteTracer satisfies EventTracer.
var _ EventTracer = (*RemoteTracer)(nil)