mirror of
https://github.com/status-im/matterbridge.git
synced 2025-01-17 09:41:10 +00:00
Add vendor matterbridge/gozulipbot
This commit is contained in:
parent
c6c92e273d
commit
1605fbc012
256
vendor/github.com/matterbridge/gozulipbot/bot.go
generated
vendored
Normal file
256
vendor/github.com/matterbridge/gozulipbot/bot.go
generated
vendored
Normal file
@ -0,0 +1,256 @@
|
||||
package gozulipbot
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Bot struct {
|
||||
APIKey string
|
||||
APIURL string
|
||||
Email string
|
||||
Queues []*Queue
|
||||
Streams []string
|
||||
Client Doer
|
||||
Backoff time.Duration
|
||||
Retries int64
|
||||
}
|
||||
|
||||
type Doer interface {
|
||||
Do(*http.Request) (*http.Response, error)
|
||||
}
|
||||
|
||||
// Init adds an http client to an existing bot struct.
|
||||
func (b *Bot) Init() *Bot {
|
||||
b.Client = &http.Client{}
|
||||
return b
|
||||
}
|
||||
|
||||
// GetStreamList gets the raw http response when requesting all public streams.
|
||||
func (b *Bot) GetStreamList() (*http.Response, error) {
|
||||
req, err := b.constructRequest("GET", "streams", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
type StreamJSON struct {
|
||||
Msg string `json:"msg"`
|
||||
Streams []struct {
|
||||
StreamID int `json:"stream_id"`
|
||||
InviteOnly bool `json:"invite_only"`
|
||||
Description string `json:"description"`
|
||||
Name string `json:"name"`
|
||||
} `json:"streams"`
|
||||
Result string `json:"result"`
|
||||
}
|
||||
|
||||
// GetStreams returns a list of all public streams
|
||||
func (b *Bot) GetStreams() ([]string, error) {
|
||||
resp, err := b.GetStreamList()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var sj StreamJSON
|
||||
err = json.Unmarshal(body, &sj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var streams []string
|
||||
for _, s := range sj.Streams {
|
||||
streams = append(streams, s.Name)
|
||||
}
|
||||
|
||||
return streams, nil
|
||||
}
|
||||
|
||||
// GetStreams returns a list of all public streams
|
||||
func (b *Bot) GetRawStreams() (StreamJSON, error) {
|
||||
var sj StreamJSON
|
||||
resp, err := b.GetStreamList()
|
||||
if err != nil {
|
||||
return sj, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return sj, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal(body, &sj)
|
||||
if err != nil {
|
||||
return sj, err
|
||||
}
|
||||
return sj, nil
|
||||
}
|
||||
|
||||
// Subscribe will set the bot to receive messages from the given streams.
|
||||
// If no streams are given, it will subscribe the bot to the streams in the bot struct.
|
||||
func (b *Bot) Subscribe(streams []string) (*http.Response, error) {
|
||||
if streams == nil {
|
||||
streams = b.Streams
|
||||
}
|
||||
|
||||
var toSubStreams []map[string]string
|
||||
for _, name := range streams {
|
||||
toSubStreams = append(toSubStreams, map[string]string{"name": name})
|
||||
}
|
||||
|
||||
bodyBts, err := json.Marshal(toSubStreams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body := "subscriptions=" + string(bodyBts)
|
||||
|
||||
req, err := b.constructRequest("POST", "users/me/subscriptions", body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
// Unsubscribe will remove the bot from the given streams.
|
||||
// If no streams are given, nothing will happen and the function will error.
|
||||
func (b *Bot) Unsubscribe(streams []string) (*http.Response, error) {
|
||||
if len(streams) == 0 {
|
||||
return nil, fmt.Errorf("No streams were provided")
|
||||
}
|
||||
|
||||
body := `delete=["` + strings.Join(streams, `","`) + `"]`
|
||||
|
||||
req, err := b.constructRequest("PATCH", "users/me/subscriptions", body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
func (b *Bot) ListSubscriptions() (*http.Response, error) {
|
||||
req, err := b.constructRequest("GET", "users/me/subscriptions", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
Messages EventType = "messages"
|
||||
Subscriptions EventType = "subscriptions"
|
||||
RealmUser EventType = "realm_user"
|
||||
Pointer EventType = "pointer"
|
||||
)
|
||||
|
||||
type Narrow string
|
||||
|
||||
const (
|
||||
NarrowPrivate Narrow = `[["is", "private"]]`
|
||||
NarrowAt Narrow = `[["is", "mentioned"]]`
|
||||
)
|
||||
|
||||
// RegisterEvents adds a queue to the bot. It includes the EventTypes and
|
||||
// Narrow given. If neither is given, it will default to all Messages.
|
||||
func (b *Bot) RegisterEvents(ets []EventType, n Narrow) (*Queue, error) {
|
||||
resp, err := b.RawRegisterEvents(ets, n)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q := &Queue{Bot: b}
|
||||
err = json.Unmarshal(body, q)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if q.LastEventID < q.MaxMessageID {
|
||||
q.LastEventID = q.MaxMessageID
|
||||
}
|
||||
|
||||
b.Queues = append(b.Queues, q)
|
||||
|
||||
return q, nil
|
||||
}
|
||||
|
||||
func (b *Bot) RegisterAll() (*Queue, error) {
|
||||
return b.RegisterEvents(nil, "")
|
||||
}
|
||||
|
||||
func (b *Bot) RegisterAt() (*Queue, error) {
|
||||
return b.RegisterEvents(nil, NarrowAt)
|
||||
}
|
||||
|
||||
func (b *Bot) RegisterPrivate() (*Queue, error) {
|
||||
return b.RegisterEvents(nil, NarrowPrivate)
|
||||
}
|
||||
|
||||
func (b *Bot) RegisterSubscriptions() (*Queue, error) {
|
||||
events := []EventType{Subscriptions}
|
||||
return b.RegisterEvents(events, "")
|
||||
}
|
||||
|
||||
// RawRegisterEvents tells Zulip to include message events in the bots events queue.
|
||||
// Passing nil as the slice of EventType will default to receiving Messages
|
||||
func (b *Bot) RawRegisterEvents(ets []EventType, n Narrow) (*http.Response, error) {
|
||||
// default to Messages if no EventTypes given
|
||||
query := `event_types=["message"]`
|
||||
|
||||
if len(ets) != 0 {
|
||||
query = `event_types=["`
|
||||
for i, s := range ets {
|
||||
query += fmt.Sprintf("%s", s)
|
||||
if i != len(ets)-1 {
|
||||
query += `", "`
|
||||
}
|
||||
}
|
||||
query += `"]`
|
||||
}
|
||||
|
||||
if n != "" {
|
||||
query += fmt.Sprintf("&narrow=%s", n)
|
||||
}
|
||||
query += fmt.Sprintf("&all_public_streams=true")
|
||||
req, err := b.constructRequest("POST", "register", query)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
// constructRequest makes a zulip request and ensures the proper headers are set.
|
||||
func (b *Bot) constructRequest(method, endpoint, body string) (*http.Request, error) {
|
||||
url := b.APIURL + endpoint
|
||||
req, err := http.NewRequest(method, url, strings.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
|
||||
req.SetBasicAuth(b.Email, b.APIKey)
|
||||
|
||||
return req, nil
|
||||
}
|
32
vendor/github.com/matterbridge/gozulipbot/flag.go
generated
vendored
Normal file
32
vendor/github.com/matterbridge/gozulipbot/flag.go
generated
vendored
Normal file
@ -0,0 +1,32 @@
|
||||
package gozulipbot
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func (b *Bot) GetConfigFromFlags() error {
|
||||
var (
|
||||
apiKey = flag.String("apikey", "", "bot api key")
|
||||
apiURL = flag.String("apiurl", "", "url of zulip server")
|
||||
email = flag.String("email", "", "bot email address")
|
||||
backoff = flag.Duration("backoff", 1*time.Second, "backoff base duration")
|
||||
)
|
||||
flag.Parse()
|
||||
|
||||
if *apiKey == "" {
|
||||
return fmt.Errorf("--apikey is required")
|
||||
}
|
||||
if *apiURL == "" {
|
||||
return fmt.Errorf("--apiurl is required")
|
||||
}
|
||||
if *email == "" {
|
||||
return fmt.Errorf("--email is required")
|
||||
}
|
||||
b.APIKey = *apiKey
|
||||
b.APIURL = *apiURL
|
||||
b.Email = *email
|
||||
b.Backoff = *backoff
|
||||
return nil
|
||||
}
|
263
vendor/github.com/matterbridge/gozulipbot/message.go
generated
vendored
Normal file
263
vendor/github.com/matterbridge/gozulipbot/message.go
generated
vendored
Normal file
@ -0,0 +1,263 @@
|
||||
package gozulipbot
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// A Message is all of the necessary metadata to post on Zulip.
|
||||
// It can be either a public message, where Topic is set, or a private message,
|
||||
// where there is at least one element in Emails.
|
||||
//
|
||||
// If the length of Emails is not 0, functions will always assume it is a private message.
|
||||
type Message struct {
|
||||
Stream string
|
||||
Topic string
|
||||
Emails []string
|
||||
Content string
|
||||
}
|
||||
|
||||
type EventMessage struct {
|
||||
AvatarURL string `json:"avatar_url"`
|
||||
Client string `json:"client"`
|
||||
Content string `json:"content"`
|
||||
ContentType string `json:"content_type"`
|
||||
DisplayRecipient DisplayRecipient `json:"display_recipient"`
|
||||
GravatarHash string `json:"gravatar_hash"`
|
||||
ID int `json:"id"`
|
||||
RecipientID int `json:"recipient_id"`
|
||||
SenderDomain string `json:"sender_domain"`
|
||||
SenderEmail string `json:"sender_email"`
|
||||
SenderFullName string `json:"sender_full_name"`
|
||||
SenderID int `json:"sender_id"`
|
||||
SenderShortName string `json:"sender_short_name"`
|
||||
Subject string `json:"subject"`
|
||||
SubjectLinks []interface{} `json:"subject_links"`
|
||||
StreamID int `json:"stream_id"`
|
||||
Timestamp int `json:"timestamp"`
|
||||
Type string `json:"type"`
|
||||
Queue *Queue `json:"-"`
|
||||
}
|
||||
|
||||
type DisplayRecipient struct {
|
||||
Users []User `json:"users,omitempty"`
|
||||
Topic string `json:"topic,omitempty"`
|
||||
}
|
||||
|
||||
type User struct {
|
||||
Domain string `json:"domain"`
|
||||
Email string `json:"email"`
|
||||
FullName string `json:"full_name"`
|
||||
ID int `json:"id"`
|
||||
IsMirrorDummy bool `json:"is_mirror_dummy"`
|
||||
ShortName string `json:"short_name"`
|
||||
}
|
||||
|
||||
func (d *DisplayRecipient) UnmarshalJSON(b []byte) (err error) {
|
||||
topic, users := "", make([]User, 1)
|
||||
if err = json.Unmarshal(b, &topic); err == nil {
|
||||
d.Topic = topic
|
||||
return
|
||||
}
|
||||
if err = json.Unmarshal(b, &users); err == nil {
|
||||
d.Users = users
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Message posts a message to Zulip. If any emails have been set on the message,
|
||||
// the message will be re-routed to the PrivateMessage function.
|
||||
func (b *Bot) Message(m Message) (*http.Response, error) {
|
||||
if m.Content == "" {
|
||||
return nil, fmt.Errorf("content cannot be empty")
|
||||
}
|
||||
|
||||
// if any emails are set, this is a private message
|
||||
if len(m.Emails) != 0 {
|
||||
return b.PrivateMessage(m)
|
||||
}
|
||||
|
||||
// otherwise it's a stream message
|
||||
if m.Stream == "" {
|
||||
return nil, fmt.Errorf("stream cannot be empty")
|
||||
}
|
||||
if m.Topic == "" {
|
||||
return nil, fmt.Errorf("topic cannot be empty")
|
||||
}
|
||||
req, err := b.constructMessageRequest(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
// PrivateMessage sends a message to the users in the message email slice.
|
||||
func (b *Bot) PrivateMessage(m Message) (*http.Response, error) {
|
||||
if len(m.Emails) == 0 {
|
||||
return nil, fmt.Errorf("there must be at least one recipient")
|
||||
}
|
||||
req, err := b.constructMessageRequest(m)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
// Respond sends a given message as a response to whatever context from which
|
||||
// an EventMessage was received.
|
||||
func (b *Bot) Respond(e EventMessage, response string) (*http.Response, error) {
|
||||
if response == "" {
|
||||
return nil, fmt.Errorf("Message response cannot be blank")
|
||||
}
|
||||
m := Message{
|
||||
Stream: e.DisplayRecipient.Topic,
|
||||
Topic: e.Subject,
|
||||
Content: response,
|
||||
}
|
||||
if m.Topic != "" {
|
||||
return b.Message(m)
|
||||
}
|
||||
// private message
|
||||
if m.Stream == "" {
|
||||
emails, err := b.privateResponseList(e)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.Emails = emails
|
||||
return b.Message(m)
|
||||
}
|
||||
return nil, fmt.Errorf("EventMessage is not understood: %v\n", e)
|
||||
}
|
||||
|
||||
// privateResponseList gets the list of other users in a private multiple
|
||||
// message conversation.
|
||||
func (b *Bot) privateResponseList(e EventMessage) ([]string, error) {
|
||||
var out []string
|
||||
for _, u := range e.DisplayRecipient.Users {
|
||||
if u.Email != b.Email {
|
||||
out = append(out, u.Email)
|
||||
}
|
||||
}
|
||||
if len(out) == 0 {
|
||||
return nil, fmt.Errorf("EventMessage had no Users within the DisplayRecipient")
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// constructMessageRequest is a helper for simplifying sending a message.
|
||||
func (b *Bot) constructMessageRequest(m Message) (*http.Request, error) {
|
||||
to := m.Stream
|
||||
mtype := "stream"
|
||||
|
||||
le := len(m.Emails)
|
||||
if le != 0 {
|
||||
mtype = "private"
|
||||
}
|
||||
if le == 1 {
|
||||
to = m.Emails[0]
|
||||
}
|
||||
if le > 1 {
|
||||
to = ""
|
||||
for i, e := range m.Emails {
|
||||
to += e
|
||||
if i != le-1 {
|
||||
to += ","
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
values := url.Values{}
|
||||
values.Set("type", mtype)
|
||||
values.Set("to", to)
|
||||
values.Set("content", m.Content)
|
||||
if mtype == "stream" {
|
||||
values.Set("subject", m.Topic)
|
||||
}
|
||||
|
||||
return b.constructRequest("POST", "messages", values.Encode())
|
||||
}
|
||||
|
||||
func (b *Bot) UpdateMessage(id string, content string) (*http.Response, error) {
|
||||
//mid, _ := strconv.Atoi(id)
|
||||
values := url.Values{}
|
||||
values.Set("content", content)
|
||||
req, err := b.constructRequest("PATCH", "messages/"+id, values.Encode())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
// React adds an emoji reaction to an EventMessage.
|
||||
func (b *Bot) React(e EventMessage, emoji string) (*http.Response, error) {
|
||||
url := fmt.Sprintf("messages/%d/emoji_reactions/%s", e.ID, emoji)
|
||||
req, err := b.constructRequest("PUT", url, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
// Unreact removes an emoji reaction from an EventMessage.
|
||||
func (b *Bot) Unreact(e EventMessage, emoji string) (*http.Response, error) {
|
||||
url := fmt.Sprintf("messages/%d/emoji_reactions/%s", e.ID, emoji)
|
||||
req, err := b.constructRequest("DELETE", url, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return b.Client.Do(req)
|
||||
}
|
||||
|
||||
type Emoji struct {
|
||||
Author string `json:"author"`
|
||||
DisplayURL string `json:"display_url"`
|
||||
SourceURL string `json:"source_url"`
|
||||
}
|
||||
|
||||
type EmojiResponse struct {
|
||||
Emoji map[string]*Emoji `json:"emoji"`
|
||||
Msg string `json:"msg"`
|
||||
Result string `json:"result"`
|
||||
}
|
||||
|
||||
// RealmEmoji gets the custom emoji information for the Zulip instance.
|
||||
func (b *Bot) RealmEmoji() (map[string]*Emoji, error) {
|
||||
req, err := b.constructRequest("GET", "realm/emoji", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := b.Client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var emjResp EmojiResponse
|
||||
err = json.Unmarshal(body, &emjResp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return emjResp.Emoji, nil
|
||||
}
|
||||
|
||||
// RealmEmojiSet makes a set of the names of the custom emoji in the Zulip instance.
|
||||
func (b *Bot) RealmEmojiSet() (map[string]struct{}, error) {
|
||||
emj, err := b.RealmEmoji()
|
||||
if err != nil {
|
||||
return nil, nil
|
||||
}
|
||||
out := map[string]struct{}{}
|
||||
for k, _ := range emj {
|
||||
out[k] = struct{}{}
|
||||
}
|
||||
return out, nil
|
||||
}
|
203
vendor/github.com/matterbridge/gozulipbot/queue.go
generated
vendored
Normal file
203
vendor/github.com/matterbridge/gozulipbot/queue.go
generated
vendored
Normal file
@ -0,0 +1,203 @@
|
||||
package gozulipbot
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
HeartbeatError = fmt.Errorf("EventMessage is a heartbeat")
|
||||
UnauthorizedError = fmt.Errorf("Request is unauthorized")
|
||||
BackoffError = fmt.Errorf("Too many requests")
|
||||
UnknownError = fmt.Errorf("Error was unknown")
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
ID string `json:"queue_id"`
|
||||
LastEventID int `json:"last_event_id"`
|
||||
MaxMessageID int `json:"max_message_id"`
|
||||
Bot *Bot `json:"-"`
|
||||
}
|
||||
|
||||
func (q *Queue) EventsChan() (chan EventMessage, func()) {
|
||||
end := false
|
||||
endFunc := func() {
|
||||
end = true
|
||||
}
|
||||
|
||||
out := make(chan EventMessage)
|
||||
go func() {
|
||||
defer close(out)
|
||||
for {
|
||||
backoffTime := time.Now().Add(q.Bot.Backoff * time.Duration(math.Pow10(int(atomic.LoadInt64(&q.Bot.Retries)))))
|
||||
minTime := time.Now().Add(q.Bot.Backoff)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
ems, err := q.GetEvents()
|
||||
switch {
|
||||
case err == HeartbeatError:
|
||||
time.Sleep(time.Until(minTime))
|
||||
continue
|
||||
case err == BackoffError:
|
||||
time.Sleep(time.Until(backoffTime))
|
||||
atomic.AddInt64(&q.Bot.Retries, 1)
|
||||
case err == UnauthorizedError:
|
||||
// TODO? have error channel when ending the continuously running process?
|
||||
return
|
||||
default:
|
||||
atomic.StoreInt64(&q.Bot.Retries, 0)
|
||||
}
|
||||
if err != nil {
|
||||
// TODO: handle unknown error
|
||||
// For now, handle this like an UnauthorizedError and end the func.
|
||||
return
|
||||
}
|
||||
for _, em := range ems {
|
||||
out <- em
|
||||
}
|
||||
// Always make sure we wait the minimum time before asking again.
|
||||
time.Sleep(time.Until(minTime))
|
||||
}
|
||||
}()
|
||||
|
||||
return out, endFunc
|
||||
}
|
||||
|
||||
// EventsCallback will repeatedly call provided callback function with
|
||||
// the output of continual queue.GetEvents calls.
|
||||
// It returns a function which can be called to end the calls.
|
||||
//
|
||||
// It will end early if it receives an UnauthorizedError, or an unknown error.
|
||||
// Note, it will never return a HeartbeatError.
|
||||
func (q *Queue) EventsCallback(fn func(EventMessage, error)) func() {
|
||||
end := false
|
||||
endFunc := func() {
|
||||
end = true
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
backoffTime := time.Now().Add(q.Bot.Backoff * time.Duration(math.Pow10(int(atomic.LoadInt64(&q.Bot.Retries)))))
|
||||
minTime := time.Now().Add(q.Bot.Backoff)
|
||||
if end {
|
||||
return
|
||||
}
|
||||
ems, err := q.GetEvents()
|
||||
switch {
|
||||
case err == HeartbeatError:
|
||||
time.Sleep(time.Until(minTime))
|
||||
continue
|
||||
case err == BackoffError:
|
||||
time.Sleep(time.Until(backoffTime))
|
||||
atomic.AddInt64(&q.Bot.Retries, 1)
|
||||
case err == UnauthorizedError:
|
||||
// TODO? have error channel when ending the continuously running process?
|
||||
return
|
||||
default:
|
||||
atomic.StoreInt64(&q.Bot.Retries, 0)
|
||||
}
|
||||
if err != nil {
|
||||
// TODO: handle unknown error
|
||||
// For now, handle this like an UnauthorizedError and end the func.
|
||||
return
|
||||
}
|
||||
for _, em := range ems {
|
||||
fn(em, err)
|
||||
}
|
||||
// Always make sure we wait the minimum time before asking again.
|
||||
time.Sleep(time.Until(minTime))
|
||||
}
|
||||
}()
|
||||
|
||||
return endFunc
|
||||
}
|
||||
|
||||
// GetEvents is a blocking call that waits for and parses a list of EventMessages.
|
||||
// There will usually only be one EventMessage returned.
|
||||
// When a heartbeat is returned, GetEvents will return a HeartbeatError.
|
||||
// When an http status code above 400 is returned, one of a BackoffError,
|
||||
// UnauthorizedError, or UnknownError will be returned.
|
||||
func (q *Queue) GetEvents() ([]EventMessage, error) {
|
||||
resp, err := q.RawGetEvents()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
switch {
|
||||
case resp.StatusCode == 429:
|
||||
return nil, BackoffError
|
||||
case resp.StatusCode == 403:
|
||||
return nil, UnauthorizedError
|
||||
case resp.StatusCode >= 400:
|
||||
return nil, UnknownError
|
||||
}
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
msgs, err := q.ParseEventMessages(body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
// RawGetEvents is a blocking call that receives a response containing a list
|
||||
// of events (a.k.a. received messages) since the last message id in the queue.
|
||||
func (q *Queue) RawGetEvents() (*http.Response, error) {
|
||||
values := url.Values{}
|
||||
values.Set("queue_id", q.ID)
|
||||
values.Set("last_event_id", strconv.Itoa(q.LastEventID))
|
||||
|
||||
url := "events?" + values.Encode()
|
||||
|
||||
req, err := q.Bot.constructRequest("GET", url, "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return q.Bot.Client.Do(req)
|
||||
}
|
||||
|
||||
func (q *Queue) ParseEventMessages(rawEventResponse []byte) ([]EventMessage, error) {
|
||||
rawResponse := map[string]json.RawMessage{}
|
||||
err := json.Unmarshal(rawEventResponse, &rawResponse)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
events := []map[string]json.RawMessage{}
|
||||
err = json.Unmarshal(rawResponse["events"], &events)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
messages := []EventMessage{}
|
||||
for _, event := range events {
|
||||
// if the event is a heartbeat, return a special error
|
||||
if string(event["type"]) == `"heartbeat"` {
|
||||
return nil, HeartbeatError
|
||||
}
|
||||
var msg EventMessage
|
||||
err = json.Unmarshal(event["message"], &msg)
|
||||
// TODO? should this check be here
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msg.Queue = q
|
||||
messages = append(messages, msg)
|
||||
}
|
||||
|
||||
return messages, nil
|
||||
}
|
8
vendor/manifest
vendored
8
vendor/manifest
vendored
@ -405,6 +405,14 @@
|
||||
"branch": "work",
|
||||
"notests": true
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/matterbridge/gozulipbot",
|
||||
"repository": "https://github.com/matterbridge/gozulipbot",
|
||||
"vcs": "git",
|
||||
"revision": "b6bb12d33544893aa68904652704cf1a86ea3d18",
|
||||
"branch": "work",
|
||||
"notests": true
|
||||
},
|
||||
{
|
||||
"importpath": "github.com/mattermost/mattermost-server/einterfaces",
|
||||
"repository": "https://github.com/mattermost/mattermost-server",
|
||||
|
Loading…
x
Reference in New Issue
Block a user