2020-07-30 16:40:53 +00:00
|
|
|
from flask import g
|
2020-07-27 18:38:57 +00:00
|
|
|
|
2020-07-30 16:40:53 +00:00
|
|
|
from crc import session
|
2020-07-27 18:38:57 +00:00
|
|
|
from crc.api.common import ApiError
|
2020-07-30 16:40:53 +00:00
|
|
|
from crc.models.user import UserModel, AdminSessionModel
|
2020-07-27 18:38:57 +00:00
|
|
|
|
|
|
|
|
|
|
|
class UserService(object):
|
|
|
|
"""Provides common tools for working with users"""
|
|
|
|
|
2020-07-30 02:47:47 +00:00
|
|
|
# Returns true if the current user is logged in.
|
2020-07-27 18:38:57 +00:00
|
|
|
@staticmethod
|
|
|
|
def has_user():
|
2020-07-30 16:40:53 +00:00
|
|
|
return 'token' in g and \
|
|
|
|
bool(g.token) and \
|
|
|
|
'user' in g and \
|
|
|
|
bool(g.user)
|
2020-07-27 18:38:57 +00:00
|
|
|
|
2020-07-30 02:47:47 +00:00
|
|
|
# Returns true if the current user is an admin.
|
2020-07-27 18:38:57 +00:00
|
|
|
@staticmethod
|
2020-07-30 02:47:47 +00:00
|
|
|
def user_is_admin():
|
|
|
|
return UserService.has_user() and g.user.is_admin()
|
|
|
|
|
|
|
|
# Returns true if the current admin user is impersonating another user.
|
|
|
|
@staticmethod
|
|
|
|
def admin_is_impersonating():
|
2020-07-30 16:40:53 +00:00
|
|
|
if UserService.user_is_admin():
|
|
|
|
adminSession: AdminSessionModel = UserService.get_admin_session()
|
|
|
|
return adminSession is not None
|
|
|
|
|
|
|
|
else:
|
|
|
|
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)
|
2020-07-27 18:38:57 +00:00
|
|
|
|
2020-07-30 02:47:47 +00:00
|
|
|
# Returns true if the given user uid is different from the current user's uid.
|
|
|
|
@staticmethod
|
|
|
|
def is_different_user(uid):
|
|
|
|
return UserService.has_user() and uid is not None and uid is not g.user.uid
|
|
|
|
|
|
|
|
@staticmethod
|
2020-07-30 16:40:53 +00:00
|
|
|
def current_user(allow_admin_impersonate=False) -> UserModel:
|
2020-07-27 18:38:57 +00:00
|
|
|
if not UserService.has_user():
|
|
|
|
raise ApiError("logged_out", "You are no longer logged in.", status_code=401)
|
|
|
|
|
2020-07-30 14:17:02 +00:00
|
|
|
# Admins can pretend to be different users and act on a user's behalf in
|
2020-07-27 18:38:57 +00:00
|
|
|
# some circumstances.
|
2020-07-30 16:40:53 +00:00
|
|
|
if UserService.user_is_admin() and allow_admin_impersonate and UserService.admin_is_impersonating():
|
|
|
|
return UserService.get_admin_session_user()
|
2020-07-27 18:38:57 +00:00
|
|
|
else:
|
|
|
|
return g.user
|
|
|
|
|
2020-07-30 14:17:02 +00:00
|
|
|
# Admins can pretend to be different users and act on a user's behalf in some circumstances.
|
2020-07-30 02:47:47 +00:00
|
|
|
# This method allows an admin user to start impersonating another user with the given uid.
|
|
|
|
# Stops impersonating if the uid is None or invalid.
|
|
|
|
@staticmethod
|
2020-07-30 16:40:53 +00:00
|
|
|
def start_impersonating(uid=None):
|
2020-07-30 02:47:47 +00:00
|
|
|
if not UserService.has_user():
|
|
|
|
raise ApiError("logged_out", "You are no longer logged in.", status_code=401)
|
|
|
|
|
2020-07-30 16:40:53 +00:00
|
|
|
if not UserService.user_is_admin():
|
|
|
|
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)
|
|
|
|
|
|
|
|
if uid is None:
|
|
|
|
raise ApiError("invalid_uid", "Please provide a valid user uid.")
|
|
|
|
|
2020-07-30 02:47:47 +00:00
|
|
|
if not UserService.admin_is_impersonating() and UserService.is_different_user(uid):
|
|
|
|
# Impersonate the user if the given uid is valid.
|
2020-07-30 16:40:53 +00:00
|
|
|
impersonate_user = session.query(UserModel).filter(UserModel.uid == uid).first()
|
2020-07-30 02:47:47 +00:00
|
|
|
|
2020-07-30 16:40:53 +00:00
|
|
|
if impersonate_user is not None:
|
|
|
|
g.impersonate_user = impersonate_user
|
|
|
|
|
|
|
|
# Store the uid and user session token.
|
|
|
|
session.add(AdminSessionModel(token=g.token, admin_impersonate_uid=uid))
|
|
|
|
session.commit()
|
|
|
|
else:
|
|
|
|
raise ApiError("invalid_uid", "The uid provided is not valid.")
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def stop_impersonating():
|
|
|
|
if not UserService.has_user():
|
|
|
|
raise ApiError("logged_out", "You are no longer logged in.", status_code=401)
|
|
|
|
|
|
|
|
# Clear out the current impersonating user.
|
|
|
|
if 'impersonate_user' in g:
|
|
|
|
del g.impersonate_user
|
|
|
|
|
|
|
|
admin_session: AdminSessionModel = UserService.get_admin_session()
|
|
|
|
if admin_session:
|
|
|
|
session.delete(admin_session)
|
|
|
|
session.commit()
|
2020-07-30 14:17:02 +00:00
|
|
|
|
2020-07-27 18:38:57 +00:00
|
|
|
@staticmethod
|
|
|
|
def in_list(uids, allow_admin_impersonate=False):
|
|
|
|
"""Returns true if the current user's id is in the given list of ids. False if there
|
|
|
|
is no user, or the user is not in the list."""
|
|
|
|
if UserService.has_user(): # If someone is logged in, lock tasks that don't belong to them.
|
|
|
|
user = UserService.current_user(allow_admin_impersonate)
|
|
|
|
if user.uid in uids:
|
|
|
|
return True
|
|
|
|
return False
|
2020-07-30 16:40:53 +00:00
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_admin_session() -> AdminSessionModel:
|
|
|
|
if UserService.user_is_admin():
|
|
|
|
return session.query(AdminSessionModel).filter(AdminSessionModel.token == g.token).first()
|
|
|
|
else:
|
|
|
|
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def get_admin_session_user() -> UserModel:
|
|
|
|
if UserService.user_is_admin():
|
|
|
|
admin_session = UserService.get_admin_session()
|
|
|
|
|
|
|
|
if admin_session is not None:
|
|
|
|
return session.query(UserModel).filter(UserModel.uid == admin_session.admin_impersonate_uid).first()
|
|
|
|
else:
|
|
|
|
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)
|