Split-out Slack user and channel management (#762)

This commit is contained in:
Duco van Amstel 2019-03-12 21:52:36 +00:00 committed by Wim
parent eabf2a4582
commit 74699a8262
4 changed files with 239 additions and 230 deletions

View File

@ -158,7 +158,6 @@ linters-settings:
- regexpMust
- singleCaseSwitch
- sloppyLen
- sloppyReassign
- switchTrue
- typeSwitchVar
- typeUnparen

View File

@ -1,7 +1,6 @@
package bslack
import (
"context"
"fmt"
"regexp"
"strings"
@ -9,220 +8,9 @@ import (
"github.com/42wim/matterbridge/bridge/config"
"github.com/nlopes/slack"
"github.com/sirupsen/logrus"
)
func (b *Bslack) getUser(id string) *slack.User {
b.usersMutex.RLock()
user, ok := b.users[id]
b.usersMutex.RUnlock()
if ok {
return user
}
b.populateUser(id)
b.usersMutex.RLock()
defer b.usersMutex.RUnlock()
return b.users[id]
}
func (b *Bslack) getUsername(id string) string {
if user := b.getUser(id); user != nil {
if user.Profile.DisplayName != "" {
return user.Profile.DisplayName
}
return user.Name
}
b.Log.Warnf("Could not find user with ID '%s'", id)
return ""
}
func (b *Bslack) getAvatar(id string) string {
if user := b.getUser(id); user != nil {
return user.Profile.Image48
}
return ""
}
func (b *Bslack) getChannel(channel string) (*slack.Channel, error) {
if strings.HasPrefix(channel, "ID:") {
return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))
}
return b.getChannelByName(channel)
}
func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) {
return b.getChannelBy(name, b.channelsByName)
}
func (b *Bslack) getChannelByID(ID string) (*slack.Channel, error) {
return b.getChannelBy(ID, b.channelsByID)
}
func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {
b.channelsMutex.RLock()
defer b.channelsMutex.RUnlock()
if channel, ok := lookupMap[lookupKey]; ok {
return channel, nil
}
return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey)
}
const minimumRefreshInterval = 10 * time.Second
func (b *Bslack) populateUser(userID string) {
b.usersMutex.RLock()
_, exists := b.users[userID]
b.usersMutex.RUnlock()
if exists {
// already in cache
return
}
user, err := b.sc.GetUserInfo(userID)
if err != nil {
b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err)
return
}
b.usersMutex.Lock()
b.users[userID] = user
b.usersMutex.Unlock()
}
func (b *Bslack) populateUsers(wait bool) {
b.refreshMutex.Lock()
if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) {
b.Log.Debugf("Not refreshing user list as it was done less than %v ago.",
minimumRefreshInterval)
b.refreshMutex.Unlock()
return
}
for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second)
b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()
newUsers := map[string]*slack.User{}
pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
count := 0
for {
var err error
pagination, err = pagination.Next(context.Background())
time.Sleep(time.Second)
if err != nil {
if pagination.Done(err) {
break
}
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve users: %#v", err)
return
}
continue
}
for i := range pagination.Users {
newUsers[pagination.Users[i].ID] = &pagination.Users[i]
}
b.Log.Debugf("getting %d users", len(pagination.Users))
count++
// more > 2000 users, slack will complain and ratelimit. break
if count > 10 {
b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")
break
}
}
b.usersMutex.Lock()
defer b.usersMutex.Unlock()
b.users = newUsers
b.refreshMutex.Lock()
defer b.refreshMutex.Unlock()
b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval)
b.refreshInProgress = false
}
func (b *Bslack) populateChannels(wait bool) {
b.refreshMutex.Lock()
if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) {
b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.",
minimumRefreshInterval)
b.refreshMutex.Unlock()
return
}
for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second)
b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()
newChannelsByID := map[string]*slack.Channel{}
newChannelsByName := map[string]*slack.Channel{}
newChannelMembers := make(map[string][]string)
// We only retrieve public and private channels, not IMs
// and MPIMs as those do not have a channel name.
queryParams := &slack.GetConversationsParameters{
ExcludeArchived: "true",
Types: []string{"public_channel,private_channel"},
}
for {
channels, nextCursor, err := b.sc.GetConversations(queryParams)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve channels: %#v", err)
return
}
continue
}
for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i]
newChannelsByName[channels[i].Name] = &channels[i]
// also find all the members in every channel
// comment for now, issues on big slacks
/*
members, err := b.getUsersInConversation(channels[i].ID)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve channel members: %#v", err)
return
}
continue
}
newChannelMembers[channels[i].ID] = members
*/
}
if nextCursor == "" {
break
}
queryParams.Cursor = nextCursor
}
b.channelsMutex.Lock()
defer b.channelsMutex.Unlock()
b.channelsByID = newChannelsByID
b.channelsByName = newChannelsByName
b.channelMembersMutex.Lock()
defer b.channelMembersMutex.Unlock()
b.channelMembers = newChannelMembers
b.refreshMutex.Lock()
defer b.refreshMutex.Unlock()
b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval)
b.refreshInProgress = false
}
// populateReceivedMessage shapes the initial Matterbridge message that we will forward to the
// router before we apply message-dependent modifications.
func (b *Bslack) populateReceivedMessage(ev *slack.MessageEvent) (*config.Message, error) {
@ -315,7 +103,7 @@ func (b *Bslack) populateMessageWithBotInfo(ev *slack.MessageEvent, rmsg *config
break
}
if err = b.handleRateLimit(err); err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Could not retrieve bot information: %#v", err)
return err
}
@ -404,16 +192,6 @@ func (b *Bslack) replaceCodeFence(text string) string {
return codeFenceRE.ReplaceAllString(text, "```")
}
func (b *Bslack) handleRateLimit(err error) error {
rateLimit, ok := err.(*slack.RateLimitedError)
if !ok {
return err
}
b.Log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter)
time.Sleep(rateLimit.RetryAfter)
return nil
}
// getUsersInConversation returns an array of userIDs that are members of channelID
func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
channelMembers := []string{}
@ -424,7 +202,7 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
members, nextCursor, err := b.sc.GetUsersInConversation(queryParams)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
return channelMembers, fmt.Errorf("Could not retrieve users in channels: %#v", err)
}
continue
@ -439,3 +217,13 @@ func (b *Bslack) getUsersInConversation(channelID string) ([]string, error) {
}
return channelMembers, nil
}
func handleRateLimit(log *logrus.Entry, err error) error {
rateLimit, ok := err.(*slack.RateLimitedError)
if !ok {
return err
}
log.Infof("Rate-limited by Slack. Sleeping for %v", rateLimit.RetryAfter)
time.Sleep(rateLimit.RetryAfter)
return nil
}

View File

@ -351,7 +351,7 @@ func (b *Bslack) updateTopicOrPurpose(msg *config.Message, channelInfo *slack.Ch
if err == nil {
return nil
}
if err = b.handleRateLimit(err); err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
return err
}
}
@ -392,7 +392,7 @@ func (b *Bslack) deleteMessage(msg *config.Message, channelInfo *slack.Channel)
return true, nil
}
if err = b.handleRateLimit(err); err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Failed to delete user message from Slack: %#v", err)
return true, err
}
@ -411,7 +411,7 @@ func (b *Bslack) editMessage(msg *config.Message, channelInfo *slack.Channel) (b
return true, nil
}
if err = b.handleRateLimit(err); err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Failed to edit user message on Slack: %#v", err)
return true, err
}
@ -431,7 +431,7 @@ func (b *Bslack) postMessage(msg *config.Message, channelInfo *slack.Channel) (s
return id, nil
}
if err = b.handleRateLimit(err); err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Failed to sent user message to Slack: %#v", err)
return "", err
}

View File

@ -0,0 +1,222 @@
package bslack
import (
"context"
"fmt"
"strings"
"time"
"github.com/nlopes/slack"
)
const minimumRefreshInterval = 10 * time.Second
func (b *Bslack) getUser(id string) *slack.User {
b.usersMutex.RLock()
user, ok := b.users[id]
b.usersMutex.RUnlock()
if ok {
return user
}
b.populateUser(id)
b.usersMutex.RLock()
defer b.usersMutex.RUnlock()
return b.users[id]
}
func (b *Bslack) getUsername(id string) string {
if user := b.getUser(id); user != nil {
if user.Profile.DisplayName != "" {
return user.Profile.DisplayName
}
return user.Name
}
b.Log.Warnf("Could not find user with ID '%s'", id)
return ""
}
func (b *Bslack) getAvatar(id string) string {
if user := b.getUser(id); user != nil {
return user.Profile.Image48
}
return ""
}
func (b *Bslack) populateUser(userID string) {
b.usersMutex.RLock()
_, exists := b.users[userID]
b.usersMutex.RUnlock()
if exists {
// already in cache
return
}
user, err := b.sc.GetUserInfo(userID)
if err != nil {
b.Log.Debugf("GetUserInfo failed for %v: %v", userID, err)
return
}
b.usersMutex.Lock()
b.users[userID] = user
b.usersMutex.Unlock()
}
func (b *Bslack) populateUsers(wait bool) {
b.refreshMutex.Lock()
if !wait && (time.Now().Before(b.earliestUserRefresh) || b.refreshInProgress) {
b.Log.Debugf("Not refreshing user list as it was done less than %v ago.",
minimumRefreshInterval)
b.refreshMutex.Unlock()
return
}
for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second)
b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()
newUsers := map[string]*slack.User{}
pagination := b.sc.GetUsersPaginated(slack.GetUsersOptionLimit(200))
count := 0
for {
var err error
pagination, err = pagination.Next(context.Background())
time.Sleep(time.Second)
if err != nil {
if pagination.Done(err) {
break
}
if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Could not retrieve users: %#v", err)
return
}
continue
}
for i := range pagination.Users {
newUsers[pagination.Users[i].ID] = &pagination.Users[i]
}
b.Log.Debugf("getting %d users", len(pagination.Users))
count++
// more > 2000 users, slack will complain and ratelimit. break
if count > 10 {
b.Log.Info("Large slack detected > 2000 users, skipping loading complete userlist.")
break
}
}
b.usersMutex.Lock()
defer b.usersMutex.Unlock()
b.users = newUsers
b.refreshMutex.Lock()
defer b.refreshMutex.Unlock()
b.earliestUserRefresh = time.Now().Add(minimumRefreshInterval)
b.refreshInProgress = false
}
func (b *Bslack) getChannel(channel string) (*slack.Channel, error) {
if strings.HasPrefix(channel, "ID:") {
return b.getChannelByID(strings.TrimPrefix(channel, "ID:"))
}
return b.getChannelByName(channel)
}
func (b *Bslack) getChannelByName(name string) (*slack.Channel, error) {
return b.getChannelBy(name, b.channelsByName)
}
func (b *Bslack) getChannelByID(id string) (*slack.Channel, error) {
return b.getChannelBy(id, b.channelsByID)
}
func (b *Bslack) getChannelBy(lookupKey string, lookupMap map[string]*slack.Channel) (*slack.Channel, error) {
b.channelsMutex.RLock()
defer b.channelsMutex.RUnlock()
if channel, ok := lookupMap[lookupKey]; ok {
return channel, nil
}
return nil, fmt.Errorf("%s: channel %s not found", b.Account, lookupKey)
}
func (b *Bslack) populateChannels(wait bool) {
b.refreshMutex.Lock()
if !wait && (time.Now().Before(b.earliestChannelRefresh) || b.refreshInProgress) {
b.Log.Debugf("Not refreshing channel list as it was done less than %v seconds ago.",
minimumRefreshInterval)
b.refreshMutex.Unlock()
return
}
for b.refreshInProgress {
b.refreshMutex.Unlock()
time.Sleep(time.Second)
b.refreshMutex.Lock()
}
b.refreshInProgress = true
b.refreshMutex.Unlock()
newChannelsByID := map[string]*slack.Channel{}
newChannelsByName := map[string]*slack.Channel{}
newChannelMembers := make(map[string][]string)
// We only retrieve public and private channels, not IMs
// and MPIMs as those do not have a channel name.
queryParams := &slack.GetConversationsParameters{
ExcludeArchived: "true",
Types: []string{"public_channel,private_channel"},
}
for {
channels, nextCursor, err := b.sc.GetConversations(queryParams)
if err != nil {
if err = handleRateLimit(b.Log, err); err != nil {
b.Log.Errorf("Could not retrieve channels: %#v", err)
return
}
continue
}
for i := range channels {
newChannelsByID[channels[i].ID] = &channels[i]
newChannelsByName[channels[i].Name] = &channels[i]
// also find all the members in every channel
// comment for now, issues on big slacks
/*
members, err := b.getUsersInConversation(channels[i].ID)
if err != nil {
if err = b.handleRateLimit(err); err != nil {
b.Log.Errorf("Could not retrieve channel members: %#v", err)
return
}
continue
}
newChannelMembers[channels[i].ID] = members
*/
}
if nextCursor == "" {
break
}
queryParams.Cursor = nextCursor
}
b.channelsMutex.Lock()
defer b.channelsMutex.Unlock()
b.channelsByID = newChannelsByID
b.channelsByName = newChannelsByName
b.channelMembersMutex.Lock()
defer b.channelMembersMutex.Unlock()
b.channelMembers = newChannelMembers
b.refreshMutex.Lock()
defer b.refreshMutex.Unlock()
b.earliestChannelRefresh = time.Now().Add(minimumRefreshInterval)
b.refreshInProgress = false
}