206 lines
5.0 KiB
Go
Raw Normal View History

2022-04-01 12:16:46 -04:00
package gojay
import (
"strconv"
"sync"
"time"
)
// MarshalerStream is the interface to implement
// to continuously encode of stream of data.
type MarshalerStream interface {
MarshalStream(enc *StreamEncoder)
}
// A StreamEncoder reads and encodes values to JSON from an input stream.
//
// It implements conext.Context and provide a channel to notify interruption.
type StreamEncoder struct {
mux *sync.RWMutex
*Encoder
nConsumer int
delimiter byte
deadline *time.Time
done chan struct{}
}
// EncodeStream spins up a defined number of non blocking consumers of the MarshalerStream m.
//
// m must implement MarshalerStream. Ideally m is a channel. See example for implementation.
//
// See the documentation for Marshal for details about the conversion of Go value to JSON.
func (s *StreamEncoder) EncodeStream(m MarshalerStream) {
// if a single consumer, just use this encoder
if s.nConsumer == 1 {
go consume(s, s, m)
return
}
// else use this Encoder only for first consumer
// and use new encoders for other consumers
// this is to avoid concurrent writing to same buffer
// resulting in a weird JSON
go consume(s, s, m)
for i := 1; i < s.nConsumer; i++ {
s.mux.RLock()
select {
case <-s.done:
default:
ss := Stream.borrowEncoder(s.w)
ss.mux.Lock()
ss.done = s.done
ss.buf = make([]byte, 0, 512)
ss.delimiter = s.delimiter
go consume(s, ss, m)
ss.mux.Unlock()
}
s.mux.RUnlock()
}
return
}
// LineDelimited sets the delimiter to a new line character.
//
// It will add a new line after each JSON marshaled by the MarshalerStream
func (s *StreamEncoder) LineDelimited() *StreamEncoder {
s.delimiter = '\n'
return s
}
// CommaDelimited sets the delimiter to a comma.
//
// It will add a new line after each JSON marshaled by the MarshalerStream
func (s *StreamEncoder) CommaDelimited() *StreamEncoder {
s.delimiter = ','
return s
}
// NConsumer sets the number of non blocking go routine to consume the stream.
func (s *StreamEncoder) NConsumer(n int) *StreamEncoder {
s.nConsumer = n
return s
}
// Release sends back a Decoder to the pool.
// If a decoder is used after calling Release
// a panic will be raised with an InvalidUsagePooledDecoderError error.
func (s *StreamEncoder) Release() {
s.isPooled = 1
streamEncPool.Put(s)
}
// Done returns a channel that's closed when work is done.
// It implements context.Context
func (s *StreamEncoder) Done() <-chan struct{} {
return s.done
}
// Err returns nil if Done is not yet closed.
// If Done is closed, Err returns a non-nil error explaining why.
// It implements context.Context
func (s *StreamEncoder) Err() error {
return s.err
}
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
func (s *StreamEncoder) Deadline() (time.Time, bool) {
if s.deadline != nil {
return *s.deadline, true
}
return time.Time{}, false
}
// SetDeadline sets the deadline
func (s *StreamEncoder) SetDeadline(t time.Time) {
s.deadline = &t
}
// Value implements context.Context
func (s *StreamEncoder) Value(key interface{}) interface{} {
return nil
}
// Cancel cancels the consumers of the stream, interrupting the stream encoding.
//
// After calling cancel, Done() will return a closed channel.
func (s *StreamEncoder) Cancel(err error) {
s.mux.Lock()
defer s.mux.Unlock()
select {
case <-s.done:
default:
s.err = err
close(s.done)
}
}
// AddObject adds an object to be encoded.
// value must implement MarshalerJSONObject.
func (s *StreamEncoder) AddObject(v MarshalerJSONObject) {
if v.IsNil() {
return
}
s.Encoder.writeByte('{')
v.MarshalJSONObject(s.Encoder)
s.Encoder.writeByte('}')
s.Encoder.writeByte(s.delimiter)
}
// AddString adds a string to be encoded.
func (s *StreamEncoder) AddString(v string) {
s.Encoder.writeByte('"')
s.Encoder.writeString(v)
s.Encoder.writeByte('"')
s.Encoder.writeByte(s.delimiter)
}
// AddArray adds an implementation of MarshalerJSONArray to be encoded.
func (s *StreamEncoder) AddArray(v MarshalerJSONArray) {
s.Encoder.writeByte('[')
v.MarshalJSONArray(s.Encoder)
s.Encoder.writeByte(']')
s.Encoder.writeByte(s.delimiter)
}
// AddInt adds an int to be encoded.
func (s *StreamEncoder) AddInt(value int) {
s.buf = strconv.AppendInt(s.buf, int64(value), 10)
s.Encoder.writeByte(s.delimiter)
}
// AddFloat64 adds a float64 to be encoded.
func (s *StreamEncoder) AddFloat64(value float64) {
s.buf = strconv.AppendFloat(s.buf, value, 'f', -1, 64)
s.Encoder.writeByte(s.delimiter)
}
// AddFloat adds a float64 to be encoded.
func (s *StreamEncoder) AddFloat(value float64) {
s.AddFloat64(value)
}
// Non exposed
func consume(init *StreamEncoder, s *StreamEncoder, m MarshalerStream) {
defer s.Release()
for {
select {
case <-init.Done():
return
default:
m.MarshalStream(s)
if s.Encoder.err != nil {
init.Cancel(s.Encoder.err)
return
}
i, err := s.Encoder.Write()
if err != nil || i == 0 {
init.Cancel(err)
return
}
}
}
}