The 3 files from cr-connect ldap.

Most of my changes are in ldap_service. I added users_as_json method, commented out an unused method, and created my own search string.
	The model in ldap.py is unchanged.
	In ldap_response, I added a fake entry to test multiple returned values.
This commit is contained in:
mike cullerton 2021-01-06 11:14:07 -05:00
parent e6c2f5a628
commit 1138d45183
3 changed files with 306 additions and 0 deletions

42
pb/ldap/ldap.py Normal file
View File

@ -0,0 +1,42 @@
from flask_marshmallow.sqla import SQLAlchemyAutoSchema
from marshmallow import EXCLUDE
from sqlalchemy import func
from pb import db
class LdapModel(db.Model):
uid = db.Column(db.String, primary_key=True)
display_name = db.Column(db.String)
given_name = db.Column(db.String)
email_address = db.Column(db.String)
telephone_number = db.Column(db.String)
title = db.Column(db.String)
department = db.Column(db.String)
affiliation = db.Column(db.String)
sponsor_type = db.Column(db.String)
date_cached = db.Column(db.DateTime(timezone=True), default=func.now())
@classmethod
def from_entry(cls, entry):
return LdapModel(uid=entry.uid.value,
display_name=entry.displayName.value,
given_name=", ".join(entry.givenName),
email_address=entry.mail.value,
telephone_number=entry.telephoneNumber.value,
title=", ".join(entry.title),
department=", ".join(entry.uvaDisplayDepartment),
affiliation=", ".join(entry.uvaPersonIAMAffiliation),
sponsor_type=", ".join(entry.uvaPersonSponsoredType))
def proper_name(self):
return f'{self.display_name} - ({self.uid})'
class LdapSchema(SQLAlchemyAutoSchema):
class Meta:
model = LdapModel
load_instance = True
include_relationships = True
include_fk = True # Includes foreign keys
unknown = EXCLUDE

164
pb/ldap/ldap_response.json Normal file
View File

@ -0,0 +1,164 @@
{
"entries": [
{
"dn": "uid=lb3dp,ou=People,o=University of Virginia,c=US",
"raw": {
"cn": [
"Laura Barnes (lb3dp)"
],
"displayName": [
"Laura Barnes"
],
"givenName": [
"Laura"
],
"mail": [
"lb3dp@virginia.edu"
],
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"uvaPerson",
"uidObject"
],
"telephoneNumber": [
"+1 (434) 924-1723"
],
"title": [
"E0:Associate Professor of Systems and Information Engineering"
],
"uvaDisplayDepartment": [
"E0:EN-Eng Sys and Environment"
],
"uvaPersonIAMAffiliation": [
"faculty"
],
"uvaPersonSponsoredType": [
"Staff"
]
}
},
{
"dn": "uid=st1fn,ou=People,o=University of Virginia,c=US",
"raw": {
"cn": [
"Stanley Funkmeister (st1fn)"
],
"displayName": [
"Stanley Funkmeister"
],
"givenName": [
"Stanley"
],
"mail": [
"st1fn@virginia.edu"
],
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"uvaPerson",
"uidObject"
],
"telephoneNumber": [
"+1 (434) 924-1724"
],
"title": [
"E1:Professor of Systems and Information Engineering"
],
"uvaDisplayDepartment": [
"E0:EN-Eng Sys and Environment"
],
"uvaPersonIAMAffiliation": [
"faculty"
],
"uvaPersonSponsoredType": [
"Staff"
]
}
},
{
"dn": "uid=dhf8r,ou=People,o=University of Virginia,c=US",
"raw": {
"cn": [
"Dan Funk (dhf84)"
],
"displayName": [
"Dan Funk"
],
"givenName": [
"Dan"
],
"mail": [
"dhf8r@virginia.edu"
],
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"uvaPerson",
"uidObject"
],
"telephoneNumber": [
"+1 (434) 924-1723"
],
"title": [
"E42:He's a hoopy frood"
],
"uvaDisplayDepartment": [
"E0:EN-Eng Study of Parallel Universes"
],
"uvaPersonIAMAffiliation": [
"faculty"
],
"uvaPersonSponsoredType": [
"Staff"
]
}
},
{
"dn": "uid=lje5u,ou=People,o=University of Virginia,c=US",
"raw": {
"cn": [
"Elder, Lori J (lje5u)"
],
"displayName": [
"Lori Elder"
],
"givenName": [
"Lori"
],
"mail": [
"lje5u@virginia.edu"
],
"objectClass": [
"top",
"person",
"organizationalPerson",
"inetOrgPerson",
"uvaPerson",
"uidObject"
],
"telephoneNumber": [
"+1 (434) 924-1723"
],
"title": [
"E42:The vision"
],
"uvaDisplayDepartment": [
"E0:EN-Phy Anything could go here."
],
"uvaPersonIAMAffiliation": [
"faculty"
],
"uvaPersonSponsoredType": [
"Staff"
]
}
}
]
}

100
pb/ldap/ldap_service.py Normal file
View File

@ -0,0 +1,100 @@
import os
import json
# from attr import asdict
from ldap3.core.exceptions import LDAPExceptionError
from pb import app, db
from ldap3 import Connection, Server, MOCK_SYNC, RESTARTABLE
from pb.ldap.ldap import LdapModel, LdapSchema
# class PBError(Exception):
# pass
#
#
# ApiError = PBError
class LdapService(object):
search_base = "ou=People,o=University of Virginia,c=US"
attributes = ['uid', 'cn', 'sn', 'displayName', 'givenName', 'mail', 'objectClass', 'UvaDisplayDepartment',
'telephoneNumber', 'title', 'uvaPersonIAMAffiliation', 'uvaPersonSponsoredType']
# uid_search_string = "(&(objectclass=person)(uid=%s))"
# user_or_last_name_search = "(&(objectclass=person)(|(uid=%s*)(sn=%s*)))"
# cn_single_search = '(&(objectclass=person)(cn=%s*))'
# cn_double_search = '(&(objectclass=person)(&(cn=%s*)(cn=*%s*)))'
live_search = '(&(objectclass=person)(|(cn=*%s*)(displayName=*%s*)(givenName=*%s*)(mail=*%s*)))'
temp_cache = {}
conn = None
@staticmethod
def __get_conn():
if not LdapService.conn:
if app.config['TESTING'] or app.config['LDAP_URL'] == 'mock':
server = Server('my_fake_server')
conn = Connection(server, client_strategy=MOCK_SYNC)
file_path = os.path.abspath(os.path.join(app.root_path, 'pb', 'ldap', 'ldap_response.json'))
conn.strategy.entries_from_json(file_path)
conn.bind()
else:
server = Server(app.config['LDAP_URL'], connect_timeout=app.config['LDAP_TIMEOUT_SEC'])
conn = Connection(server, auto_bind=True,
receive_timeout=app.config['LDAP_TIMEOUT_SEC'],
client_strategy=RESTARTABLE)
LdapService.conn = conn
return LdapService.conn
# @staticmethod
# def user_info(uva_uid):
# user_info = db.session.query(LdapModel).filter(LdapModel.uid == uva_uid).first()
# if not user_info:
# app.logger.info("No cache for " + uva_uid)
# search_string = LdapService.uid_search_string % uva_uid
# conn = LdapService.__get_conn()
# conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
# if len(conn.entries) < 1:
# raise ApiError("missing_ldap_record", "Unable to locate a user with id %s in LDAP" % uva_uid)
# entry = conn.entries[0]
# user_info = LdapModel.from_entry(entry)
# db.session.add(user_info)
# db.session.commit()
# return user_info
@staticmethod
def search_users(query, limit):
query = query.strip()
if len(query) < 3:
return []
else:
# Search cn, displayName, givenName and mail
search_string = LdapService.live_search % (query, query, query, query)
results = []
app.logger.info(search_string)
try:
conn = LdapService.__get_conn()
conn.search(LdapService.search_base, search_string, attributes=LdapService.attributes)
# Entries are returned as a generator, accessing entries
# can make subsequent calls to the ldap service, so limit
# those here.
count = 0
for entry in conn.entries:
if count > limit:
break
results.append(LdapSchema().dump(LdapModel.from_entry(entry)))
count += 1
except LDAPExceptionError as le:
app.logger.info("Failed to execute ldap search. %s", str(le))
return results
def users_as_json(needle):
users = []
if len(needle) > 2:
result = LdapService.search_users(needle, 15)
for user in result:
users.append({'uid': user['uid'], 'display_name': user['display_name']})
return json.dumps(users)