From 313770d5389d37c5988f76d1a0edb14d7b8baa2a Mon Sep 17 00:00:00 2001 From: Carlos Lopez Date: Sun, 19 Jul 2020 21:53:18 -0600 Subject: [PATCH] Ldap lookup script --- crc/scripts/ldap_lookup.py | 78 +++++++++++++++++++++++++++ crc/scripts/request_approval.py | 2 +- tests/ldap/test_ldap_lookup_script.py | 51 ++++++++++++++++++ 3 files changed, 130 insertions(+), 1 deletion(-) create mode 100644 crc/scripts/ldap_lookup.py create mode 100644 tests/ldap/test_ldap_lookup_script.py diff --git a/crc/scripts/ldap_lookup.py b/crc/scripts/ldap_lookup.py new file mode 100644 index 00000000..62bd287a --- /dev/null +++ b/crc/scripts/ldap_lookup.py @@ -0,0 +1,78 @@ +import copy + +from crc import app +from crc.api.common import ApiError +from crc.scripts.script import Script +from crc.services.ldap_service import LdapService + + +USER_DETAILS = { + "PIComputingID": { + "value": "", + "data": { + }, + "label": "invalid uid" + } +} + + +class LdapLookup(Script): + """This Script allows to be introduced as part of a workflow and called from there, taking + a UID as input and looking it up through LDAP to return the person's details """ + + def get_description(self): + return """ +Attempts to create a dictionary with person details, using the +provided argument (a UID) and look it up through LDAP. + +Example: +LdapLookup PIComputingID +""" + + def do_task_validate_only(self, task, *args, **kwargs): + self.get_user_info(task, args) + + def do_task(self, task, *args, **kwargs): + args = [arg for arg in args if type(arg) == str] + user_info = self.get_user_info(task, args) + + user_details = copy.deepcopy(USER_DETAILS) + user_details['PIComputingID']['value'] = user_info['uid'] + if len(user_info.keys()) > 1: + user_details['PIComputingID']['label'] = user_info.pop('label') + else: + user_info.pop('uid') + user_details['PIComputingID']['data'] = user_info + return user_details + + def get_user_info(self, task, args): + if len(args) < 1: + raise ApiError(code="missing_argument", + message="Ldap lookup script requires one argument. The " + "UID for the person we want to look up") + + arg = args.pop() # Extracting only one value for now + uid = task.workflow.script_engine.evaluate_expression(task, arg) + if not isinstance(uid, str): + raise ApiError(code="invalid_argument", + message="Ldap lookup script requires one 1 UID argument, of type string.") + user_info_dict = {} + try: + user_info = LdapService.user_info(uid) + user_info_dict = { + "display_name": user_info.display_name, + "given_name": user_info.given_name, + "email_address": user_info.email_address, + "telephone_number": user_info.telephone_number, + "title": user_info.title, + "department": user_info.department, + "affiliation": user_info.affiliation, + "sponsor_type": user_info.sponsor_type, + "uid": user_info.uid, + "label": user_info.proper_name() + } + except: + user_info_dict['uid'] = uid + app.logger.error(f'Ldap lookup failed for UID {uid}') + + return user_info_dict diff --git a/crc/scripts/request_approval.py b/crc/scripts/request_approval.py index 0a4c76ff..a82e17a0 100644 --- a/crc/scripts/request_approval.py +++ b/crc/scripts/request_approval.py @@ -11,7 +11,7 @@ class RequestApproval(Script): return """ Creates an approval request on this workflow, by the given approver_uid(s)," Takes multiple arguments, which should point to data located in current task -or be quoted strings. The order is important. Approvals will be processed +or be quoted strings. The order is important. Approvals will be processed in this order. Example: diff --git a/tests/ldap/test_ldap_lookup_script.py b/tests/ldap/test_ldap_lookup_script.py new file mode 100644 index 00000000..7cb7ff55 --- /dev/null +++ b/tests/ldap/test_ldap_lookup_script.py @@ -0,0 +1,51 @@ +from tests.base_test import BaseTest + +from crc.services.workflow_processor import WorkflowProcessor +from crc.scripts.ldap_lookup import LdapLookup +from crc import db, mail + + +class TestLdapLookupScript(BaseTest): + + def test_get_existing_user_details(self): + self.load_example_data() + self.create_reference_document() + workflow = self.create_workflow('empty_workflow') + processor = WorkflowProcessor(workflow) + task = processor.next_task() + + task.data = { + 'PIComputingID': 'dhf8r' + } + + script = LdapLookup() + user_details = script.do_task(task, workflow.study_id, workflow.id, "PIComputingID") + + self.assertEqual(user_details['PIComputingID']['label'], 'Dan Funk - (dhf8r)') + self.assertEqual(user_details['PIComputingID']['value'], 'dhf8r') + self.assertEqual(user_details['PIComputingID']['data']['display_name'], 'Dan Funk') + self.assertEqual(user_details['PIComputingID']['data']['given_name'], 'Dan') + self.assertEqual(user_details['PIComputingID']['data']['email_address'], 'dhf8r@virginia.edu') + self.assertEqual(user_details['PIComputingID']['data']['telephone_number'], '+1 (434) 924-1723') + self.assertEqual(user_details['PIComputingID']['data']['title'], 'E42:He\'s a hoopy frood') + self.assertEqual(user_details['PIComputingID']['data']['department'], 'E0:EN-Eng Study of Parallel Universes') + self.assertEqual(user_details['PIComputingID']['data']['affiliation'], 'faculty') + self.assertEqual(user_details['PIComputingID']['data']['sponsor_type'], 'Staff') + self.assertEqual(user_details['PIComputingID']['data']['uid'], 'dhf8r') + + def test_get_invalid_user_details(self): + self.load_example_data() + self.create_reference_document() + workflow = self.create_workflow('empty_workflow') + processor = WorkflowProcessor(workflow) + task = processor.next_task() + + task.data = { + 'PIComputingID': 'rec3z' + } + + script = LdapLookup() + user_details = script.do_task(task, workflow.study_id, workflow.id, "PIComputingID") + self.assertEqual(user_details['PIComputingID']['label'], 'invalid uid') + self.assertEqual(user_details['PIComputingID']['value'], 'rec3z') + self.assertEqual(user_details['PIComputingID']['data'], {})