mirror of
https://github.com/status-im/matterbridge.git
synced 2025-01-12 23:34:24 +00:00
e3ffbcadd8
* zulip: Treat unknown errors with a 10-second backoff. An unknown error (including an unauthorized error) would fall through with no calls to time.Sleep, resulting in hammering the server as quickly as possible. Add a 10-second sleep in the default error case. The heartbeat is left with no explicit sleep, but all other codepaths now contain one. * version: Move version information into a separate package. This will allow it to be accessed by other sections of the code. * zulip: Use the matterbridge version in the user-agent. Co-authored-by: Wim <wim@42.be>
217 lines
4.7 KiB
Go
217 lines
4.7 KiB
Go
package bzulip
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/42wim/matterbridge/bridge"
|
|
"github.com/42wim/matterbridge/bridge/config"
|
|
"github.com/42wim/matterbridge/bridge/helper"
|
|
"github.com/42wim/matterbridge/version"
|
|
gzb "github.com/matterbridge/gozulipbot"
|
|
)
|
|
|
|
type Bzulip struct {
|
|
q *gzb.Queue
|
|
bot *gzb.Bot
|
|
streams map[int]string
|
|
*bridge.Config
|
|
sync.RWMutex
|
|
}
|
|
|
|
func New(cfg *bridge.Config) bridge.Bridger {
|
|
return &Bzulip{Config: cfg, streams: make(map[int]string)}
|
|
}
|
|
|
|
func (b *Bzulip) Connect() error {
|
|
bot := gzb.Bot{APIKey: b.GetString("token"), APIURL: b.GetString("server") + "/api/v1/", Email: b.GetString("login"), UserAgent: fmt.Sprintf("matterbridge/%s", version.Release)}
|
|
bot.Init()
|
|
q, err := bot.RegisterAll()
|
|
b.q = q
|
|
b.bot = &bot
|
|
if err != nil {
|
|
b.Log.Errorf("Connect() %#v", err)
|
|
return err
|
|
}
|
|
// init stream
|
|
b.getChannel(0)
|
|
b.Log.Info("Connection succeeded")
|
|
go b.handleQueue()
|
|
return nil
|
|
}
|
|
|
|
func (b *Bzulip) Disconnect() error {
|
|
return nil
|
|
}
|
|
|
|
func (b *Bzulip) JoinChannel(channel config.ChannelInfo) error {
|
|
return nil
|
|
}
|
|
|
|
func (b *Bzulip) Send(msg config.Message) (string, error) {
|
|
b.Log.Debugf("=> Receiving %#v", msg)
|
|
|
|
// Delete message
|
|
if msg.Event == config.EventMsgDelete {
|
|
if msg.ID == "" {
|
|
return "", nil
|
|
}
|
|
_, err := b.bot.UpdateMessage(msg.ID, "")
|
|
return "", err
|
|
}
|
|
|
|
// Upload a file if it exists
|
|
if msg.Extra != nil {
|
|
for _, rmsg := range helper.HandleExtra(&msg, b.General) {
|
|
b.sendMessage(rmsg)
|
|
}
|
|
if len(msg.Extra["file"]) > 0 {
|
|
return b.handleUploadFile(&msg)
|
|
}
|
|
}
|
|
|
|
// edit the message if we have a msg ID
|
|
if msg.ID != "" {
|
|
_, err := b.bot.UpdateMessage(msg.ID, msg.Username+msg.Text)
|
|
return "", err
|
|
}
|
|
|
|
// Post normal message
|
|
return b.sendMessage(msg)
|
|
}
|
|
|
|
func (b *Bzulip) getChannel(id int) string {
|
|
if name, ok := b.streams[id]; ok {
|
|
return name
|
|
}
|
|
streams, err := b.bot.GetRawStreams()
|
|
if err != nil {
|
|
b.Log.Errorf("getChannel: %#v", err)
|
|
return ""
|
|
}
|
|
for _, stream := range streams.Streams {
|
|
b.streams[stream.StreamID] = stream.Name
|
|
}
|
|
if name, ok := b.streams[id]; ok {
|
|
return name
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (b *Bzulip) handleQueue() error {
|
|
for {
|
|
messages, err := b.q.GetEvents()
|
|
switch err {
|
|
case gzb.BackoffError:
|
|
time.Sleep(time.Second * 5)
|
|
case gzb.NoJSONError:
|
|
b.Log.Error("Response wasn't JSON, server down or restarting? sleeping 10 seconds")
|
|
time.Sleep(time.Second * 10)
|
|
case gzb.BadEventQueueError:
|
|
b.Log.Info("got a bad event queue id error, reconnecting")
|
|
b.bot.Queues = nil
|
|
for {
|
|
b.q, err = b.bot.RegisterAll()
|
|
if err != nil {
|
|
b.Log.Errorf("reconnecting failed: %s. Sleeping 10 seconds", err)
|
|
time.Sleep(time.Second * 10)
|
|
}
|
|
break
|
|
}
|
|
case gzb.HeartbeatError:
|
|
b.Log.Debug("heartbeat received.")
|
|
default:
|
|
b.Log.Debugf("receiving error: %#v", err)
|
|
time.Sleep(time.Second * 10)
|
|
}
|
|
if err != nil {
|
|
continue
|
|
}
|
|
for _, m := range messages {
|
|
b.Log.Debugf("== Receiving %#v", m)
|
|
// ignore our own messages
|
|
if m.SenderEmail == b.GetString("login") {
|
|
continue
|
|
}
|
|
|
|
avatarURL := m.AvatarURL
|
|
if !strings.HasPrefix(avatarURL, "http") {
|
|
avatarURL = b.GetString("server") + avatarURL
|
|
}
|
|
|
|
rmsg := config.Message{
|
|
Username: m.SenderFullName,
|
|
Text: m.Content,
|
|
Channel: b.getChannel(m.StreamID) + "/topic:" + m.Subject,
|
|
Account: b.Account,
|
|
UserID: strconv.Itoa(m.SenderID),
|
|
Avatar: avatarURL,
|
|
}
|
|
b.Log.Debugf("<= Sending message from %s on %s to gateway", rmsg.Username, b.Account)
|
|
b.Log.Debugf("<= Message is %#v", rmsg)
|
|
b.Remote <- rmsg
|
|
}
|
|
|
|
time.Sleep(time.Second * 3)
|
|
}
|
|
}
|
|
|
|
func (b *Bzulip) sendMessage(msg config.Message) (string, error) {
|
|
topic := ""
|
|
if strings.Contains(msg.Channel, "/topic:") {
|
|
res := strings.Split(msg.Channel, "/topic:")
|
|
topic = res[1]
|
|
msg.Channel = res[0]
|
|
}
|
|
m := gzb.Message{
|
|
Stream: msg.Channel,
|
|
Topic: topic,
|
|
Content: msg.Username + msg.Text,
|
|
}
|
|
resp, err := b.bot.Message(m)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
if resp != nil {
|
|
defer resp.Body.Close()
|
|
res, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
var jr struct {
|
|
ID int `json:"id"`
|
|
}
|
|
err = json.Unmarshal(res, &jr)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return strconv.Itoa(jr.ID), nil
|
|
}
|
|
return "", nil
|
|
}
|
|
|
|
func (b *Bzulip) handleUploadFile(msg *config.Message) (string, error) {
|
|
for _, f := range msg.Extra["file"] {
|
|
fi := f.(config.FileInfo)
|
|
if fi.Comment != "" {
|
|
msg.Text += fi.Comment + ": "
|
|
}
|
|
if fi.URL != "" {
|
|
msg.Text = fi.URL
|
|
if fi.Comment != "" {
|
|
msg.Text = fi.Comment + ": " + fi.URL
|
|
}
|
|
}
|
|
_, err := b.sendMessage(*msg)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
}
|
|
return "", nil
|
|
}
|