diff --git a/tracer.go b/tracer.go index dbef80a..3893cb3 100644 --- a/tracer.go +++ b/tracer.go @@ -1,7 +1,6 @@ package pubsub import ( - "bytes" "compress/gzip" "context" "encoding/json" @@ -173,7 +172,10 @@ func (t *RemoteTracer) doWrite() { return } - w := ggio.NewDelimitedWriter(s) + var batch pb.TraceEventBatch + + gzipW := gzip.NewWriter(s) + w := ggio.NewDelimitedWriter(gzipW) for { _, ok := <-t.ch @@ -196,53 +198,36 @@ func (t *RemoteTracer) doWrite() { goto end } - { - batch := &pb.TraceEventBatch{Batch: buf} - blob, err := batch.Marshal() - if err != nil { - log.Errorf("error marshalling trace event batch: %s", err.Error()) - goto end - } + batch.Batch = buf - // compress batch - var cbuf bytes.Buffer - gzipW := gzip.NewWriter(&cbuf) - _, err = gzipW.Write(blob) - if err != nil { - log.Errorf("error compressing trace event batch: %s", err.Error()) - goto end - } - err = gzipW.Close() - if err != nil { - log.Errorf("error compressing trace event batch: %s", err.Error()) - goto end - } + err = w.WriteMsg(&batch) + if err != nil { + log.Errorf("error writing trace event batch: %s", err) + goto end + } - cblob := cbuf.Bytes() - cbatch := &pb.CompressedTraceEventBatch{Data: cblob} - err = w.WriteMsg(cbatch) - if err != nil { - log.Errorf("error writing trace event data: %s", err.Error()) - if !ok { - goto end - } - - // reset output - s.Reset() - s, err = t.openStream() - if err != nil { - log.Errorf("error opening remote tracer stream: %s", err.Error()) - return - } - w = ggio.NewDelimitedWriter(s) - } + err = gzipW.Flush() + if err != nil { + log.Errorf("error flushin gzip stream: %s", err) + goto end } end: if !ok { + gzipW.Close() helpers.FullClose(s) return } + + if err != nil { + s, err = t.openStream() + if err != nil { + log.Errorf("error opening remote tracer stream: %s", err.Error()) + return + } + + gzipW.Reset(s) + } } }