add ldap endpoint that doesnt require workflow info

This commit is contained in:
alicia pritchett 2021-10-28 16:19:39 -04:00
parent b80e0507fe
commit 702b8eaed6
6 changed files with 73 additions and 12 deletions

View File

@ -96,6 +96,35 @@ paths:
items:
$ref: "#/components/schemas/DocumentDirectory"
# Context-less LDAP Lookup
/ldap:
parameters:
- name: query
in: query
required: false
description: The string to search for in the Value column of the lookup table.
schema:
type: string
- name: limit
in: query
required: false
description: The total number of records to return, defaults to 10.
schema:
type: integer
get:
operationId: crc.api.workflow.lookup_ldap
summary: Returns a list of LDAP users, with a given query.
tags:
- Users
responses:
'200':
description: An array of all LDAP users.
content:
application/json:
schema:
type: array
# /v1.0/study
/study:
get:

View File

@ -3,9 +3,9 @@ from flask import g, request
from crc import app, session
from crc.api.common import ApiError
from crc.services.user_service import UserService
from crc.models.user import UserModel, UserModelSchema
from crc.services.ldap_service import LdapService, LdapModel
from crc.services.user_service import UserService
"""
.. module:: crc.api.user

View File

@ -400,6 +400,16 @@ def lookup(workflow_id, task_spec_name, field_id, query=None, value=None, limit=
# Just return the data
return lookup_data
def lookup_ldap(query=None, limit=10):
"""
perform a lookup against the LDAP server without needing a provided workflow.
"""
value = None
lookup_data = LookupService._run_ldap_query(query, value, limit)
return lookup_data
def _verify_user_and_role(processor, spiff_task):
"""Assures the currently logged in user can access the given workflow and task, or
raises an error. """

View File

@ -1,7 +1,9 @@
from flask import g
import crc.api.user
from crc import session
from crc.api.common import ApiError
from crc.services.ldap_service import LdapService
from crc.models.user import UserModel, AdminSessionModel
@ -62,8 +64,13 @@ class UserService(object):
if uid is None:
raise ApiError("invalid_uid", "Please provide a valid user uid.")
if UserService.is_different_user(uid):
if UserService.is_different_user(uid):
# Impersonate the user if the given uid is valid.
# If the user is not in the User table, add them to it
ldap_info = LdapService().user_info(uid)
crc.api.user._upsert_user(ldap_info)
impersonate_user = session.query(UserModel).filter(UserModel.uid == uid).first()
if impersonate_user is not None:
@ -115,4 +122,4 @@ class UserService(object):
if admin_session is not None:
return session.query(UserModel).filter(UserModel.uid == admin_session.admin_impersonate_uid).first()
else:
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)

View File

@ -0,0 +1,15 @@
from tests.base_test import BaseTest
class TestLdapApi(BaseTest):
def test_get_ldap(self):
"""
Test to make sure that LDAP api point returns a 200 code
"""
self.load_example_data()
rv = self.app.get('/v1.0/ldap?query=atp',
follow_redirects=True,
content_type="application/json", headers=self.logged_in_headers())
self.assertTrue(rv.status_code == 200)

View File

@ -220,17 +220,17 @@ class TestAuthentication(BaseTest):
admin_token_headers = dict(Authorization='Bearer ' + admin_user.encode_auth_token())
# User should not be in the system yet.
non_admin_user = session.query(UserModel).filter(UserModel.uid == self.non_admin_uid).first()
self.assertIsNone(non_admin_user)
# non_admin_user = session.query(UserModel).filter(UserModel.uid == self.non_admin_uid).first()
# self.assertIsNone(non_admin_user)
# Admin should not be able to impersonate non-existent user
rv_1 = self.app.get(
'/v1.0/user?admin_impersonate_uid=' + self.non_admin_uid,
content_type="application/json",
headers=admin_token_headers,
follow_redirects=False
)
self.assert_failure(rv_1, 400)
# rv_1 = self.app.get(
# '/v1.0/user?admin_impersonate_uid=' + self.non_admin_uid,
# content_type="application/json",
# headers=admin_token_headers,
# follow_redirects=False
#)
# self.assert_failure(rv_1, 400)
# Add the non-admin user now
self.logout()