compress entire stream in remote tracer
This commit is contained in:
parent
db8e2195ea
commit
abe4763c01
65
tracer.go
65
tracer.go
|
@ -1,7 +1,6 @@
|
|||
package pubsub
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
@ -173,7 +172,10 @@ func (t *RemoteTracer) doWrite() {
|
|||
return
|
||||
}
|
||||
|
||||
w := ggio.NewDelimitedWriter(s)
|
||||
var batch pb.TraceEventBatch
|
||||
|
||||
gzipW := gzip.NewWriter(s)
|
||||
w := ggio.NewDelimitedWriter(gzipW)
|
||||
|
||||
for {
|
||||
_, ok := <-t.ch
|
||||
|
@ -196,53 +198,36 @@ func (t *RemoteTracer) doWrite() {
|
|||
goto end
|
||||
}
|
||||
|
||||
{
|
||||
batch := &pb.TraceEventBatch{Batch: buf}
|
||||
blob, err := batch.Marshal()
|
||||
if err != nil {
|
||||
log.Errorf("error marshalling trace event batch: %s", err.Error())
|
||||
goto end
|
||||
}
|
||||
batch.Batch = buf
|
||||
|
||||
// compress batch
|
||||
var cbuf bytes.Buffer
|
||||
gzipW := gzip.NewWriter(&cbuf)
|
||||
_, err = gzipW.Write(blob)
|
||||
if err != nil {
|
||||
log.Errorf("error compressing trace event batch: %s", err.Error())
|
||||
goto end
|
||||
}
|
||||
err = gzipW.Close()
|
||||
if err != nil {
|
||||
log.Errorf("error compressing trace event batch: %s", err.Error())
|
||||
goto end
|
||||
}
|
||||
err = w.WriteMsg(&batch)
|
||||
if err != nil {
|
||||
log.Errorf("error writing trace event batch: %s", err)
|
||||
goto end
|
||||
}
|
||||
|
||||
cblob := cbuf.Bytes()
|
||||
cbatch := &pb.CompressedTraceEventBatch{Data: cblob}
|
||||
err = w.WriteMsg(cbatch)
|
||||
if err != nil {
|
||||
log.Errorf("error writing trace event data: %s", err.Error())
|
||||
if !ok {
|
||||
goto end
|
||||
}
|
||||
|
||||
// reset output
|
||||
s.Reset()
|
||||
s, err = t.openStream()
|
||||
if err != nil {
|
||||
log.Errorf("error opening remote tracer stream: %s", err.Error())
|
||||
return
|
||||
}
|
||||
w = ggio.NewDelimitedWriter(s)
|
||||
}
|
||||
err = gzipW.Flush()
|
||||
if err != nil {
|
||||
log.Errorf("error flushin gzip stream: %s", err)
|
||||
goto end
|
||||
}
|
||||
|
||||
end:
|
||||
if !ok {
|
||||
gzipW.Close()
|
||||
helpers.FullClose(s)
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
s, err = t.openStream()
|
||||
if err != nil {
|
||||
log.Errorf("error opening remote tracer stream: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
gzipW.Reset(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue