Better ldap searching.

This commit is contained in:
Dan Funk 2020-05-29 15:17:51 -04:00
parent 079a0c1feb
commit 4e4cc7884c
2 changed files with 36 additions and 14 deletions

View File

@ -1,5 +1,7 @@
import os
from ldap3.core.exceptions import LDAPExceptionError
from crc import app
from ldap3 import Connection, Server, MOCK_SYNC
@ -38,7 +40,9 @@ class LdapService(object):
attributes = ['uid', 'cn', 'sn', 'displayName', 'givenName', 'mail', 'objectClass', 'UvaDisplayDepartment',
'telephoneNumber', 'title', 'uvaPersonIAMAffiliation', 'uvaPersonSponsoredType']
uid_search_string = "(&(objectclass=person)(uid=%s))"
user_or_last_name_search_string = "(&(objectclass=person)(|(uid=%s*)(sn=%s*)))"
user_or_last_name_search = "(&(objectclass=person)(|(uid=%s*)(sn=%s*)))"
cn_single_search = '(&(objectclass=person)(cn=%s*))'
cn_double_search = '(&(objectclass=person)(&(cn=%s*)(cn=*%s*)))'
def __init__(self):
if app.config['TESTING']:
@ -67,18 +71,33 @@ class LdapService(object):
return LdapUserInfo.from_entry(entry)
def search_users(self, query, limit):
if len(query) < 3: return []
search_string = LdapService.user_or_last_name_search_string % (query, query)
self.conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
# Entries are returned as a generator, accessing entries
# can make subsequent calls to the ldap service, so limit
# those here.
count = 0
if len(query.strip()) < 3:
return []
elif query.endswith(' '):
search_string = LdapService.cn_single_search % (query.strip())
elif query.strip().count(',') == 1:
f, l = query.split(",")
search_string = LdapService.cn_double_search % (l.strip(), f.strip())
elif query.strip().count(' ') == 1:
f,l = query.split(" ")
search_string = LdapService.cn_double_search % (f, l)
else:
# Search by user_id or last name
search_string = LdapService.user_or_last_name_search % (query, query)
results = []
for entry in self.conn.entries:
if count > limit:
break
results.append(LdapUserInfo.from_entry(entry))
count += 1
print(search_string)
try:
self.conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
# Entries are returned as a generator, accessing entries
# can make subsequent calls to the ldap service, so limit
# those here.
count = 0
for entry in self.conn.entries:
if count > limit:
break
results.append(LdapUserInfo.from_entry(entry))
count += 1
except LDAPExceptionError as le:
app.logger.info("Failed to execute ldap search. %s", str(le))
return results

View File

@ -114,6 +114,9 @@ class TestLookupService(BaseTest):
results = LookupService.lookup(workflow, "AllTheNames", "1 (!-Something", limit=10)
self.assertEquals("1 Something", results[0].label, "special characters don't flake out")
results = LookupService.lookup(workflow, "AllTheNames", "Dan Funk (dhf", limit=10)
self.assertEquals(results)
# 1018 10000 Something Industry
# 1019 1000 Something Industry