mirror of
synced 2025-02-23 05:08:32 +00:00
We now cache the LDAP records - so we look in our own database for the record before calling out to ldap for the details when given a straight up computing id like dhf8r. Added "date_approved" to the approval model. And moved the approver and primary investigator into real associated models to make it easier to dump. Fixed a problem with the validation that was causing it to throw incorrect errors on valid workflows. Getting it to behave a little more like the front end behaves, and respecting the read-only fields. But it was mainly to do with always returning all the data with each form submission.
83 lines
3.5 KiB
83 lines
3.5 KiB
import os
from attr import asdict
from ldap3.core.exceptions import LDAPExceptionError
from crc import app, db
from ldap3 import Connection, Server, MOCK_SYNC
from crc.api.common import ApiError
from crc.models.ldap import LdapModel, LdapSchema
class LdapService(object):
search_base = "ou=People,o=University of Virginia,c=US"
attributes = ['uid', 'cn', 'sn', 'displayName', 'givenName', 'mail', 'objectClass', 'UvaDisplayDepartment',
'telephoneNumber', 'title', 'uvaPersonIAMAffiliation', 'uvaPersonSponsoredType']
uid_search_string = "(&(objectclass=person)(uid=%s))"
user_or_last_name_search = "(&(objectclass=person)(|(uid=%s*)(sn=%s*)))"
cn_single_search = '(&(objectclass=person)(cn=%s*))'
cn_double_search = '(&(objectclass=person)(&(cn=%s*)(cn=*%s*)))'
def __init__(self):
if app.config['TESTING']:
server = Server('my_fake_server')
self.conn = Connection(server, client_strategy=MOCK_SYNC)
file_path = os.path.abspath(os.path.join(app.root_path, '..', 'tests', 'data', 'ldap_response.json'))
server = Server(app.config['LDAP_URL'], connect_timeout=app.config['LDAP_TIMEOUT_SEC'])
self.conn = Connection(server,
def __del__(self):
if self.conn:
def user_info(self, uva_uid):
user_info = db.session.query(LdapModel).filter(LdapModel.uid == uva_uid).first()
if not user_info:
search_string = LdapService.uid_search_string % uva_uid
self.conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
if len(self.conn.entries) < 1:
raise ApiError("missing_ldap_record", "Unable to locate a user with id %s in LDAP" % uva_uid)
entry = self.conn.entries[0]
user_info = LdapModel.from_entry(entry)
return user_info
def search_users(self, query, limit):
if len(query.strip()) < 3:
return []
elif query.endswith(' '):
search_string = LdapService.cn_single_search % (query.strip())
elif query.strip().count(',') == 1:
f, l = query.split(",")
search_string = LdapService.cn_double_search % (l.strip(), f.strip())
elif query.strip().count(' ') == 1:
f,l = query.split(" ")
search_string = LdapService.cn_double_search % (f, l)
# Search by user_id or last name
search_string = LdapService.user_or_last_name_search % (query, query)
results = []
self.conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
# Entries are returned as a generator, accessing entries
# can make subsequent calls to the ldap service, so limit
# those here.
count = 0
for entry in self.conn.entries:
if count > limit:
count += 1
except LDAPExceptionError as le:
app.logger.info("Failed to execute ldap search. %s", str(le))
return results