matterbridge/vendor/github.com/sromku/go-gitter/stream.go

221 lines
5.1 KiB
Go

package gitter
import (
"bufio"
"encoding/json"
"fmt"
"net/http"
"time"
"github.com/mreiferson/go-httpclient"
)
var defaultConnectionWaitTime time.Duration = 3000 // millis
var defaultConnectionMaxRetries = 5
// Stream initialize stream
func (gitter *Gitter) Stream(roomID string) *Stream {
return &Stream{
url: streamBaseURL + "rooms/" + roomID + "/chatMessages",
Event: make(chan Event),
gitter: gitter,
streamConnection: gitter.newStreamConnection(
defaultConnectionWaitTime,
defaultConnectionMaxRetries),
}
}
// Implemented to conform with https://developer.gitter.im/docs/streaming-api
func (gitter *Gitter) Listen(stream *Stream) {
defer stream.destroy()
var reader *bufio.Reader
var gitterMessage Message
lastKeepalive := time.Now().Unix()
// connect
stream.connect()
Loop:
for {
// if closed then stop trying
if stream.isClosed() {
stream.Event <- Event{
Data: &GitterConnectionClosed{},
}
break Loop
}
resp := stream.getResponse()
if resp.StatusCode != 200 {
gitter.log(fmt.Sprintf("Unexpected response code %v", resp.StatusCode))
continue
}
//"The JSON stream returns messages as JSON objects that are delimited by carriage return (\r)" <- Not true crap it's (\n) only
reader = bufio.NewReader(resp.Body)
line, err := reader.ReadBytes('\n')
//Check if the line only consists of whitespace
onlyWhitespace := true
for _, b := range line {
if b != ' ' && b != '\t' && b != '\r' && b != '\n' {
onlyWhitespace = false
}
}
if onlyWhitespace {
//"Parsers must be tolerant of occasional extra newline characters placed between messages."
currentKeepalive := time.Now().Unix() //interesting behavior of 100+ keepalives per seconds was observed
if currentKeepalive-lastKeepalive > 10 {
lastKeepalive = currentKeepalive
gitter.log("Keepalive was received")
}
continue
} else if stream.isClosed() {
gitter.log("Stream closed")
continue
} else if err != nil {
gitter.log("ReadBytes error: " + err.Error())
stream.connect()
continue
}
// unmarshal the streamed data
err = json.Unmarshal(line, &gitterMessage)
if err != nil {
gitter.log("JSON Unmarshal error: " + err.Error())
continue
}
// we are here, then we got the good message. pipe it forward.
stream.Event <- Event{
Data: &MessageReceived{
Message: gitterMessage,
},
}
}
gitter.log("Listening was completed")
}
// Stream holds stream data.
type Stream struct {
url string
Event chan Event
streamConnection *streamConnection
gitter *Gitter
}
func (stream *Stream) destroy() {
close(stream.Event)
}
type Event struct {
Data interface{}
}
type GitterConnectionClosed struct {
}
type MessageReceived struct {
Message Message
}
// connect and try to reconnect with
func (stream *Stream) connect() {
if stream.streamConnection.retries == stream.streamConnection.currentRetries {
stream.Close()
stream.gitter.log("Number of retries exceeded the max retries number, we are done here")
return
}
res, err := stream.gitter.getResponse(stream.url, stream)
if stream.streamConnection.canceled {
// do nothing
} else if err != nil || res.StatusCode != 200 {
stream.gitter.log("Failed to get response, trying reconnect ")
stream.gitter.log(err)
// sleep and wait
stream.streamConnection.currentRetries++
time.Sleep(time.Millisecond * stream.streamConnection.wait * time.Duration(stream.streamConnection.currentRetries))
// connect again
stream.Close()
stream.connect()
} else {
stream.gitter.log("Response was received")
stream.streamConnection.currentRetries = 0
stream.streamConnection.closed = false
stream.streamConnection.response = res
}
}
type streamConnection struct {
// connection was closed
closed bool
// canceled
canceled bool
// wait time till next try
wait time.Duration
// max tries to recover
retries int
// current streamed response
response *http.Response
// current request
request *http.Request
// current status
currentRetries int
}
// Close the stream connection and stop receiving streamed data
func (stream *Stream) Close() {
conn := stream.streamConnection
conn.closed = true
if conn.response != nil {
stream.gitter.log("Stream connection close response")
defer conn.response.Body.Close()
}
if conn.request != nil {
stream.gitter.log("Stream connection close request")
switch transport := stream.gitter.config.client.Transport.(type) {
case *httpclient.Transport:
stream.streamConnection.canceled = true
transport.CancelRequest(conn.request)
default:
}
}
conn.currentRetries = 0
}
func (stream *Stream) isClosed() bool {
return stream.streamConnection.closed
}
func (stream *Stream) getResponse() *http.Response {
return stream.streamConnection.response
}
// Optional, set stream connection properties
// wait - time in milliseconds of waiting between reconnections. Will grow exponentially.
// retries - number of reconnections retries before dropping the stream.
func (gitter *Gitter) newStreamConnection(wait time.Duration, retries int) *streamConnection {
return &streamConnection{
closed: true,
wait: wait,
retries: retries,
}
}