go-libp2p-pubsub/tracer.go

302 lines
6.1 KiB
Go
Raw Normal View History

package pubsub
import (
2019-11-14 10:23:32 +00:00
"compress/gzip"
"context"
"encoding/json"
"io"
"os"
"sync"
2019-11-14 10:23:32 +00:00
"time"
2019-11-12 19:50:39 +00:00
pb "github.com/libp2p/go-libp2p-pubsub/pb"
2019-11-14 10:23:32 +00:00
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
2019-11-14 10:23:32 +00:00
"github.com/libp2p/go-libp2p-core/protocol"
ggio "github.com/gogo/protobuf/io"
)
var (
	// TraceBufferSize is the cap on the number of events a lossy tracer
	// (such as RemoteTracer) keeps buffered; once the buffer is that full,
	// further events are dropped instead of queued.
	// 64K ought to be enough for everyone; famous last words.
	TraceBufferSize = 1 << 16

	// MinTraceBatchSize is the number of buffered events RemoteTracer waits
	// for (for at most one second) before shipping a batch to the remote peer.
	MinTraceBatchSize = 16
)

// Rejection reasons, as human-readable strings.
const (
	rejectBlacklstedPeer      = "blacklisted peer"
	rejectBlacklistedSource   = "blacklisted source"
	rejectMissingSignature    = "missing signature"
	rejectInvalidSignature    = "invalid signature"
	rejectValidationQueueFull = "validation queue full"
	rejectValidationThrottled = "validation throttled"
	rejectValidationFailed    = "validation failed"
	rejectSelfOrigin          = "self originated message"
)
2019-11-12 19:50:39 +00:00
type basicTracer struct {
2019-11-18 22:29:23 +00:00
ch chan struct{}
mx sync.Mutex
buf []*pb.TraceEvent
lossy bool
closed bool
}
func (t *basicTracer) Trace(evt *pb.TraceEvent) {
2019-11-12 19:50:39 +00:00
t.mx.Lock()
2019-11-18 22:29:23 +00:00
if t.closed {
t.mx.Unlock()
return
}
if t.lossy && len(t.buf) > TraceBufferSize {
log.Warningf("trace buffer overflow; dropping trace event")
} else {
t.buf = append(t.buf, evt)
}
2019-11-12 19:50:39 +00:00
t.mx.Unlock()
select {
case t.ch <- struct{}{}:
default:
}
}
func (t *basicTracer) Close() {
2019-11-18 22:29:23 +00:00
t.mx.Lock()
defer t.mx.Unlock()
if !t.closed {
t.closed = true
close(t.ch)
}
2019-11-12 19:50:39 +00:00
}
// JSONTracer is a tracer that writes events to a file, encoded in ndjson
// (one JSON document per line).
type JSONTracer struct {
	basicTracer

	// w is the destination; the writer goroutine closes it on shutdown.
	w io.WriteCloser
}
// NewJsonTracer creates a new JSONTracer writing traces to file.
func NewJSONTracer(file string) (*JSONTracer, error) {
return OpenJSONTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
2019-11-12 19:50:39 +00:00
// OpenJSONTracer creates a new JSONTracer, with explicit control of OpenFile flags and permissions.
func OpenJSONTracer(file string, flags int, perm os.FileMode) (*JSONTracer, error) {
f, err := os.OpenFile(file, flags, perm)
if err != nil {
return nil, err
}
2019-11-12 19:50:39 +00:00
tr := &JSONTracer{w: f, basicTracer: basicTracer{ch: make(chan struct{}, 1)}}
go tr.doWrite()
return tr, nil
}
2019-11-12 19:50:39 +00:00
func (t *JSONTracer) doWrite() {
var buf []*pb.TraceEvent
2019-11-12 19:50:39 +00:00
enc := json.NewEncoder(t.w)
for {
_, ok := <-t.ch
2019-11-12 19:50:39 +00:00
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := enc.Encode(evt)
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
2019-11-12 19:50:39 +00:00
// Compile-time check that *JSONTracer implements EventTracer.
var _ EventTracer = (*JSONTracer)(nil)
// PBTracer is a tracer that writes events to a file, as delimited protobufs.
type PBTracer struct {
basicTracer
w io.WriteCloser
}
2019-11-12 19:50:39 +00:00
// NewPBTracer creates a new PBTracer writing traces to file.
// The file is created if it does not exist and truncated if it does; new
// files get mode 0644.
func NewPBTracer(file string) (*PBTracer, error) {
	return OpenPBTracer(file, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
}
// OpenPBTracer creates a new PBTracer, with explicit control of OpenFile flags and permissions.
//
// The file is opened via os.OpenFile(file, flags, perm). A background
// goroutine writes buffered events to it until the tracer is closed.
func OpenPBTracer(file string, flags int, perm os.FileMode) (*PBTracer, error) {
	out, err := os.OpenFile(file, flags, perm)
	if err != nil {
		return nil, err
	}

	tracer := new(PBTracer)
	tracer.w = out
	// capacity 1 lets one wakeup stay pending while the writer is busy
	tracer.ch = make(chan struct{}, 1)
	go tracer.doWrite()

	return tracer, nil
}
func (t *PBTracer) doWrite() {
var buf []*pb.TraceEvent
2019-11-12 19:50:39 +00:00
w := ggio.NewDelimitedWriter(t.w)
for {
_, ok := <-t.ch
t.mx.Lock()
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
for i, evt := range buf {
err := w.WriteMsg(evt)
if err != nil {
log.Errorf("error writing event trace: %s", err.Error())
}
buf[i] = nil
}
if !ok {
t.w.Close()
return
}
}
}
2019-11-12 19:50:39 +00:00
// Compile-time check that *PBTracer implements EventTracer.
var _ EventTracer = (*PBTracer)(nil)
2019-11-14 10:23:32 +00:00
const RemoteTracerProtoID = protocol.ID("/libp2p/pubsub/tracer/1.0.0")
// RemoteTracer is a tracer that sends trace events to a remote peer
type RemoteTracer struct {
basicTracer
ctx context.Context
host host.Host
peer peer.ID
2019-11-14 10:23:32 +00:00
}
// NewRemoteTracer constructs a RemoteTracer, tracing to the peer identified by pi
func NewRemoteTracer(ctx context.Context, host host.Host, pi peer.AddrInfo) (*RemoteTracer, error) {
tr := &RemoteTracer{ctx: ctx, host: host, peer: pi.ID, basicTracer: basicTracer{ch: make(chan struct{}, 1), lossy: true}}
2019-11-19 00:28:23 +00:00
host.Peerstore().AddAddrs(pi.ID, pi.Addrs, peerstore.PermanentAddrTTL)
2019-11-14 10:23:32 +00:00
go tr.doWrite()
return tr, nil
}
func (t *RemoteTracer) doWrite() {
var buf []*pb.TraceEvent
s, err := t.openStream()
if err != nil {
log.Errorf("error opening remote tracer stream: %s", err.Error())
return
}
var batch pb.TraceEventBatch
gzipW := gzip.NewWriter(s)
w := ggio.NewDelimitedWriter(gzipW)
2019-11-14 10:23:32 +00:00
for {
_, ok := <-t.ch
// deadline for batch accumulation
deadline := time.Now().Add(time.Second)
2019-11-14 10:23:32 +00:00
t.mx.Lock()
2019-11-19 00:28:23 +00:00
for len(t.buf) < MinTraceBatchSize && time.Now().Before(deadline) {
t.mx.Unlock()
time.Sleep(100 * time.Millisecond)
2019-11-19 00:28:23 +00:00
t.mx.Lock()
}
2019-11-14 10:23:32 +00:00
tmp := t.buf
t.buf = buf[:0]
buf = tmp
t.mx.Unlock()
if len(buf) == 0 {
goto end
}
batch.Batch = buf
2019-11-14 10:23:32 +00:00
err = w.WriteMsg(&batch)
if err != nil {
log.Errorf("error writing trace event batch: %s", err)
goto end
}
2019-11-14 10:23:32 +00:00
err = gzipW.Flush()
if err != nil {
log.Errorf("error flushin gzip stream: %s", err)
goto end
2019-11-14 10:23:32 +00:00
}
end:
// nil out the buffer to gc consumed events
for i := range buf {
buf[i] = nil
}
2019-11-14 10:23:32 +00:00
if !ok {
2019-11-18 15:33:53 +00:00
if err != nil {
s.Reset()
} else {
gzipW.Close()
helpers.FullClose(s)
}
2019-11-14 10:23:32 +00:00
return
}
if err != nil {
2019-11-18 15:33:53 +00:00
s.Reset()
s, err = t.openStream()
if err != nil {
log.Errorf("error opening remote tracer stream: %s", err.Error())
return
}
gzipW.Reset(s)
}
2019-11-14 10:23:32 +00:00
}
}
func (t *RemoteTracer) openStream() (network.Stream, error) {
for {
ctx, cancel := context.WithTimeout(t.ctx, time.Minute)
s, err := t.host.NewStream(ctx, t.peer, RemoteTracerProtoID)
2019-11-14 10:23:32 +00:00
cancel()
if err != nil {
if t.ctx.Err() != nil {
return nil, err
}
// wait a minute and try again, to account for transient server downtime
select {
case <-time.After(time.Minute):
continue
case <-t.ctx.Done():
return nil, t.ctx.Err()
}
}
return s, nil
}
}
// Compile-time check that *RemoteTracer implements EventTracer.
var _ EventTracer = (*RemoteTracer)(nil)