Ldap lookup script

This commit is contained in:
Carlos Lopez 2020-07-19 21:53:18 -06:00
parent 7b0eb9733a
commit 313770d538
3 changed files with 130 additions and 1 deletions

View File

@ -0,0 +1,78 @@
import copy
from crc import app
from crc.api.common import ApiError
from crc.scripts.script import Script
from crc.services.ldap_service import LdapService
USER_DETAILS = {
"PIComputingID": {
"value": "",
"data": {
},
"label": "invalid uid"
}
}
class LdapLookup(Script):
"""This Script allows to be introduced as part of a workflow and called from there, taking
a UID as input and looking it up through LDAP to return the person's details """
def get_description(self):
return """
Attempts to create a dictionary with person details, using the
provided argument (a UID) and look it up through LDAP.
Example:
LdapLookup PIComputingID
"""
def do_task_validate_only(self, task, *args, **kwargs):
self.get_user_info(task, args)
def do_task(self, task, *args, **kwargs):
args = [arg for arg in args if type(arg) == str]
user_info = self.get_user_info(task, args)
user_details = copy.deepcopy(USER_DETAILS)
user_details['PIComputingID']['value'] = user_info['uid']
if len(user_info.keys()) > 1:
user_details['PIComputingID']['label'] = user_info.pop('label')
else:
user_info.pop('uid')
user_details['PIComputingID']['data'] = user_info
return user_details
def get_user_info(self, task, args):
if len(args) < 1:
raise ApiError(code="missing_argument",
message="Ldap lookup script requires one argument. The "
"UID for the person we want to look up")
arg = args.pop() # Extracting only one value for now
uid = task.workflow.script_engine.evaluate_expression(task, arg)
if not isinstance(uid, str):
raise ApiError(code="invalid_argument",
message="Ldap lookup script requires one 1 UID argument, of type string.")
user_info_dict = {}
try:
user_info = LdapService.user_info(uid)
user_info_dict = {
"display_name": user_info.display_name,
"given_name": user_info.given_name,
"email_address": user_info.email_address,
"telephone_number": user_info.telephone_number,
"title": user_info.title,
"department": user_info.department,
"affiliation": user_info.affiliation,
"sponsor_type": user_info.sponsor_type,
"uid": user_info.uid,
"label": user_info.proper_name()
}
except:
user_info_dict['uid'] = uid
app.logger.error(f'Ldap lookup failed for UID {uid}')
return user_info_dict

View File

@ -11,7 +11,7 @@ class RequestApproval(Script):
return """
Creates an approval request on this workflow, by the given approver_uid(s),"
Takes multiple arguments, which should point to data located in current task
or be quoted strings. The order is important. Approvals will be processed
or be quoted strings. The order is important. Approvals will be processed
in this order.
Example:

View File

@ -0,0 +1,51 @@
from tests.base_test import BaseTest
from crc.services.workflow_processor import WorkflowProcessor
from crc.scripts.ldap_lookup import LdapLookup
from crc import db, mail
class TestLdapLookupScript(BaseTest):
def test_get_existing_user_details(self):
self.load_example_data()
self.create_reference_document()
workflow = self.create_workflow('empty_workflow')
processor = WorkflowProcessor(workflow)
task = processor.next_task()
task.data = {
'PIComputingID': 'dhf8r'
}
script = LdapLookup()
user_details = script.do_task(task, workflow.study_id, workflow.id, "PIComputingID")
self.assertEqual(user_details['PIComputingID']['label'], 'Dan Funk - (dhf8r)')
self.assertEqual(user_details['PIComputingID']['value'], 'dhf8r')
self.assertEqual(user_details['PIComputingID']['data']['display_name'], 'Dan Funk')
self.assertEqual(user_details['PIComputingID']['data']['given_name'], 'Dan')
self.assertEqual(user_details['PIComputingID']['data']['email_address'], 'dhf8r@virginia.edu')
self.assertEqual(user_details['PIComputingID']['data']['telephone_number'], '+1 (434) 924-1723')
self.assertEqual(user_details['PIComputingID']['data']['title'], 'E42:He\'s a hoopy frood')
self.assertEqual(user_details['PIComputingID']['data']['department'], 'E0:EN-Eng Study of Parallel Universes')
self.assertEqual(user_details['PIComputingID']['data']['affiliation'], 'faculty')
self.assertEqual(user_details['PIComputingID']['data']['sponsor_type'], 'Staff')
self.assertEqual(user_details['PIComputingID']['data']['uid'], 'dhf8r')
def test_get_invalid_user_details(self):
self.load_example_data()
self.create_reference_document()
workflow = self.create_workflow('empty_workflow')
processor = WorkflowProcessor(workflow)
task = processor.next_task()
task.data = {
'PIComputingID': 'rec3z'
}
script = LdapLookup()
user_details = script.do_task(task, workflow.study_id, workflow.id, "PIComputingID")
self.assertEqual(user_details['PIComputingID']['label'], 'invalid uid')
self.assertEqual(user_details['PIComputingID']['value'], 'rec3z')
self.assertEqual(user_details['PIComputingID']['data'], {})