cr-connect-workflow/crc/services/ldap_service.py

109 lines
4.6 KiB
Python
Raw Normal View History

2021-02-11 16:17:21 +00:00
import logging
import os
2021-02-11 16:17:21 +00:00
import ssl
from os import path
from attr import asdict
2020-05-29 19:17:51 +00:00
from ldap3.core.exceptions import LDAPExceptionError
from crc import app, db
2021-02-11 16:17:21 +00:00
from ldap3 import Connection, Server, MOCK_SYNC, RESTARTABLE, SASL, DIGEST_MD5, ALL, Tls, EXTERNAL, SYNC
from crc.api.common import ApiError
from crc.models.ldap import LdapModel, LdapSchema
class LdapService(object):
search_base = "ou=People,o=University of Virginia,c=US"
attributes = ['uid', 'cn', 'sn', 'displayName', 'givenName', 'mail', 'objectClass', 'UvaDisplayDepartment',
'telephoneNumber', 'title', 'uvaPersonIAMAffiliation', 'uvaPersonSponsoredType']
uid_search_string = "(&(objectclass=person)(uid=%s))"
2020-05-29 19:17:51 +00:00
user_or_last_name_search = "(&(objectclass=person)(|(uid=%s*)(sn=%s*)))"
cn_single_search = '(&(objectclass=person)(cn=%s*))'
cn_double_search = '(&(objectclass=person)(&(cn=%s*)(cn=*%s*)))'
temp_cache = {}
conn = None
@staticmethod
def __get_conn():
if not LdapService.conn:
2021-02-11 16:17:21 +00:00
if app.config['LDAP_URL'] == 'mock':
server = Server('my_fake_server')
conn = Connection(server, client_strategy=MOCK_SYNC)
file_path = os.path.abspath(os.path.join(app.root_path, '..', 'tests', 'data', 'ldap_response.json'))
conn.strategy.entries_from_json(file_path)
2020-06-04 19:38:45 +00:00
conn.bind()
2021-02-11 16:17:21 +00:00
elif "LDAP_USER" in app.config and app.config['LDAP_USER'].strip() != '':
server = Server(host=app.config['LDAP_URL'], use_ssl=True)
conn = Connection(server, auto_bind=True,
user=app.config['LDAP_USER'],
password=app.config['LDAP_PASS'],
receive_timeout=app.config['LDAP_TIMEOUT_SEC'],
client_strategy=RESTARTABLE)
else:
server = Server(app.config['LDAP_URL'], connect_timeout=app.config['LDAP_TIMEOUT_SEC'])
2020-06-04 19:38:45 +00:00
conn = Connection(server, auto_bind=True,
2021-02-11 16:17:21 +00:00
receive_timeout=app.config['LDAP_TIMEOUT_SEC'],
client_strategy=RESTARTABLE)
LdapService.conn = conn
return LdapService.conn
2021-02-24 17:05:06 +00:00
@staticmethod
def user_exists(uva_uid):
try:
x = LdapService.user_info(uva_uid)
except:
return False
return True
@staticmethod
def user_info(uva_uid):
uva_uid = uva_uid.lower()
user_info = db.session.query(LdapModel).filter(LdapModel.uid == uva_uid).first()
if not user_info:
app.logger.info("No cache for " + uva_uid)
search_string = LdapService.uid_search_string % uva_uid
conn = LdapService.__get_conn()
conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
if len(conn.entries) < 1:
raise ApiError("missing_ldap_record", "Unable to locate a user with id %s in LDAP" % uva_uid)
entry = conn.entries[0]
user_info = LdapModel.from_entry(entry)
db.session.add(user_info)
db.session.commit()
return user_info
@staticmethod
def search_users(query, limit):
2020-05-29 19:17:51 +00:00
if len(query.strip()) < 3:
return []
elif query.endswith(' '):
search_string = LdapService.cn_single_search % (query.strip())
elif query.strip().count(',') == 1:
f, l = query.split(",")
search_string = LdapService.cn_double_search % (l.strip(), f.strip())
elif query.strip().count(' ') == 1:
f,l = query.split(" ")
search_string = LdapService.cn_double_search % (f, l)
else:
# Search by user_id or last name
search_string = LdapService.user_or_last_name_search % (query, query)
results = []
2020-06-05 02:37:28 +00:00
app.logger.info(search_string)
2020-05-29 19:17:51 +00:00
try:
conn = LdapService.__get_conn()
conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
2020-05-29 19:17:51 +00:00
# Entries are returned as a generator, accessing entries
# can make subsequent calls to the ldap service, so limit
# those here.
count = 0
for entry in conn.entries:
2020-05-29 19:17:51 +00:00
if count > limit:
break
results.append(LdapSchema().dump(LdapModel.from_entry(entry)))
2020-05-29 19:17:51 +00:00
count += 1
except LDAPExceptionError as le:
app.logger.info("Failed to execute ldap search. %s", str(le))
return results