221 lines
5.1 KiB
Go
221 lines
5.1 KiB
Go
|
package gitter
|
||
|
|
||
|
import (
|
||
|
"bufio"
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"time"
|
||
|
|
||
|
"github.com/mreiferson/go-httpclient"
|
||
|
)
|
||
|
|
||
|
var defaultConnectionWaitTime time.Duration = 3000 // millis
|
||
|
var defaultConnectionMaxRetries = 5
|
||
|
|
||
|
// Stream initialize stream
|
||
|
func (gitter *Gitter) Stream(roomID string) *Stream {
|
||
|
return &Stream{
|
||
|
url: streamBaseURL + "rooms/" + roomID + "/chatMessages",
|
||
|
Event: make(chan Event),
|
||
|
gitter: gitter,
|
||
|
streamConnection: gitter.newStreamConnection(
|
||
|
defaultConnectionWaitTime,
|
||
|
defaultConnectionMaxRetries),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Implemented to conform with https://developer.gitter.im/docs/streaming-api
|
||
|
func (gitter *Gitter) Listen(stream *Stream) {
|
||
|
|
||
|
defer stream.destroy()
|
||
|
|
||
|
var reader *bufio.Reader
|
||
|
var gitterMessage Message
|
||
|
lastKeepalive := time.Now().Unix()
|
||
|
|
||
|
// connect
|
||
|
stream.connect()
|
||
|
|
||
|
Loop:
|
||
|
for {
|
||
|
|
||
|
// if closed then stop trying
|
||
|
if stream.isClosed() {
|
||
|
stream.Event <- Event{
|
||
|
Data: &GitterConnectionClosed{},
|
||
|
}
|
||
|
break Loop
|
||
|
}
|
||
|
|
||
|
resp := stream.getResponse()
|
||
|
if resp.StatusCode != 200 {
|
||
|
gitter.log(fmt.Sprintf("Unexpected response code %v", resp.StatusCode))
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
//"The JSON stream returns messages as JSON objects that are delimited by carriage return (\r)" <- Not true crap it's (\n) only
|
||
|
reader = bufio.NewReader(resp.Body)
|
||
|
line, err := reader.ReadBytes('\n')
|
||
|
|
||
|
//Check if the line only consists of whitespace
|
||
|
onlyWhitespace := true
|
||
|
for _, b := range line {
|
||
|
if b != ' ' && b != '\t' && b != '\r' && b != '\n' {
|
||
|
onlyWhitespace = false
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if onlyWhitespace {
|
||
|
//"Parsers must be tolerant of occasional extra newline characters placed between messages."
|
||
|
currentKeepalive := time.Now().Unix() //interesting behavior of 100+ keepalives per seconds was observed
|
||
|
if currentKeepalive-lastKeepalive > 10 {
|
||
|
lastKeepalive = currentKeepalive
|
||
|
gitter.log("Keepalive was received")
|
||
|
}
|
||
|
continue
|
||
|
} else if stream.isClosed() {
|
||
|
gitter.log("Stream closed")
|
||
|
continue
|
||
|
} else if err != nil {
|
||
|
gitter.log("ReadBytes error: " + err.Error())
|
||
|
stream.connect()
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// unmarshal the streamed data
|
||
|
err = json.Unmarshal(line, &gitterMessage)
|
||
|
if err != nil {
|
||
|
gitter.log("JSON Unmarshal error: " + err.Error())
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
// we are here, then we got the good message. pipe it forward.
|
||
|
stream.Event <- Event{
|
||
|
Data: &MessageReceived{
|
||
|
Message: gitterMessage,
|
||
|
},
|
||
|
}
|
||
|
}
|
||
|
|
||
|
gitter.log("Listening was completed")
|
||
|
}
|
||
|
|
||
|
// Stream holds stream data.
|
||
|
type Stream struct {
|
||
|
url string
|
||
|
Event chan Event
|
||
|
streamConnection *streamConnection
|
||
|
gitter *Gitter
|
||
|
}
|
||
|
|
||
|
func (stream *Stream) destroy() {
|
||
|
close(stream.Event)
|
||
|
}
|
||
|
|
||
|
type Event struct {
|
||
|
Data interface{}
|
||
|
}
|
||
|
|
||
|
type GitterConnectionClosed struct {
|
||
|
}
|
||
|
|
||
|
type MessageReceived struct {
|
||
|
Message Message
|
||
|
}
|
||
|
|
||
|
// connect and try to reconnect with
|
||
|
func (stream *Stream) connect() {
|
||
|
|
||
|
if stream.streamConnection.retries == stream.streamConnection.currentRetries {
|
||
|
stream.Close()
|
||
|
stream.gitter.log("Number of retries exceeded the max retries number, we are done here")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
res, err := stream.gitter.getResponse(stream.url, stream)
|
||
|
if stream.streamConnection.canceled {
|
||
|
// do nothing
|
||
|
} else if err != nil || res.StatusCode != 200 {
|
||
|
stream.gitter.log("Failed to get response, trying reconnect ")
|
||
|
stream.gitter.log(err)
|
||
|
|
||
|
// sleep and wait
|
||
|
stream.streamConnection.currentRetries++
|
||
|
time.Sleep(time.Millisecond * stream.streamConnection.wait * time.Duration(stream.streamConnection.currentRetries))
|
||
|
|
||
|
// connect again
|
||
|
stream.Close()
|
||
|
stream.connect()
|
||
|
} else {
|
||
|
stream.gitter.log("Response was received")
|
||
|
stream.streamConnection.currentRetries = 0
|
||
|
stream.streamConnection.closed = false
|
||
|
stream.streamConnection.response = res
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type streamConnection struct {
|
||
|
|
||
|
// connection was closed
|
||
|
closed bool
|
||
|
|
||
|
// canceled
|
||
|
canceled bool
|
||
|
|
||
|
// wait time till next try
|
||
|
wait time.Duration
|
||
|
|
||
|
// max tries to recover
|
||
|
retries int
|
||
|
|
||
|
// current streamed response
|
||
|
response *http.Response
|
||
|
|
||
|
// current request
|
||
|
request *http.Request
|
||
|
|
||
|
// current status
|
||
|
currentRetries int
|
||
|
}
|
||
|
|
||
|
// Close the stream connection and stop receiving streamed data
|
||
|
func (stream *Stream) Close() {
|
||
|
conn := stream.streamConnection
|
||
|
conn.closed = true
|
||
|
if conn.response != nil {
|
||
|
stream.gitter.log("Stream connection close response")
|
||
|
defer conn.response.Body.Close()
|
||
|
}
|
||
|
if conn.request != nil {
|
||
|
stream.gitter.log("Stream connection close request")
|
||
|
switch transport := stream.gitter.config.client.Transport.(type) {
|
||
|
case *httpclient.Transport:
|
||
|
stream.streamConnection.canceled = true
|
||
|
transport.CancelRequest(conn.request)
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
}
|
||
|
conn.currentRetries = 0
|
||
|
}
|
||
|
|
||
|
func (stream *Stream) isClosed() bool {
|
||
|
return stream.streamConnection.closed
|
||
|
}
|
||
|
|
||
|
func (stream *Stream) getResponse() *http.Response {
|
||
|
return stream.streamConnection.response
|
||
|
}
|
||
|
|
||
|
// Optional, set stream connection properties
|
||
|
// wait - time in milliseconds of waiting between reconnections. Will grow exponentially.
|
||
|
// retries - number of reconnections retries before dropping the stream.
|
||
|
func (gitter *Gitter) newStreamConnection(wait time.Duration, retries int) *streamConnection {
|
||
|
return &streamConnection{
|
||
|
closed: true,
|
||
|
wait: wait,
|
||
|
retries: retries,
|
||
|
}
|
||
|
}
|