Merge pull request #414 from sartography/chore/view-as-382

remove email script test line about UTC stuff
This commit is contained in:
Dan Funk 2021-11-10 15:18:41 -05:00 committed by GitHub
commit 56dc34a209
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 96 additions and 18 deletions

View File

@ -96,6 +96,35 @@ paths:
items:
$ref: "#/components/schemas/DocumentDirectory"
# Context-less LDAP Lookup
/ldap:
parameters:
- name: query
in: query
required: false
description: The string to search for in the Value column of the lookup table.
schema:
type: string
- name: limit
in: query
required: false
description: The total number of records to return, defaults to 10.
schema:
type: integer
get:
operationId: crc.api.workflow.lookup_ldap
summary: Returns a list of LDAP users, with a given query.
tags:
- Users
responses:
'200':
description: An array of all LDAP users.
content:
application/json:
schema:
type: array
# /v1.0/study
/study:
get:

View File

@ -3,9 +3,9 @@ from flask import g, request
from crc import app, session
from crc.api.common import ApiError
from crc.services.user_service import UserService
from crc.models.user import UserModel, UserModelSchema
from crc.services.ldap_service import LdapService, LdapModel
from crc.services.user_service import UserService
"""
.. module:: crc.api.user

View File

@ -400,6 +400,16 @@ def lookup(workflow_id, task_spec_name, field_id, query=None, value=None, limit=
# Just return the data
return lookup_data
def lookup_ldap(query=None, limit=10):
"""
perform a lookup against the LDAP server without needing a provided workflow.
"""
value = None
lookup_data = LookupService._run_ldap_query(query, value, limit)
return lookup_data
def _verify_user_and_role(processor, spiff_task):
"""Assures the currently logged in user can access the given workflow and task, or
raises an error. """

View File

@ -1,7 +1,9 @@
from flask import g
import crc.api.user
from crc import session
from crc.api.common import ApiError
from crc.services.ldap_service import LdapService
from crc.models.user import UserModel, AdminSessionModel
@ -62,8 +64,13 @@ class UserService(object):
if uid is None:
raise ApiError("invalid_uid", "Please provide a valid user uid.")
if UserService.is_different_user(uid):
if UserService.is_different_user(uid):
# Impersonate the user if the given uid is valid.
# If the user is not in the User table, add them to it
ldap_info = LdapService().user_info(uid)
crc.api.user._upsert_user(ldap_info)
impersonate_user = session.query(UserModel).filter(UserModel.uid == uid).first()
if impersonate_user is not None:
@ -115,4 +122,4 @@ class UserService(object):
if admin_session is not None:
return session.query(UserModel).filter(UserModel.uid == admin_session.admin_impersonate_uid).first()
else:
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)
raise ApiError("unauthorized", "You do not have permissions to do this.", status_code=403)

View File

@ -48,12 +48,6 @@ class TestEmailScript(BaseTest):
self.assertEqual(db_emails[0].sender, workflow_api.next_task.data['email_model']['sender'])
self.assertEqual(db_emails[0].subject, workflow_api.next_task.data['email_model']['subject'])
# Make sure timestamp is UTC
self.assertEqual(db_emails[0].timestamp.tzinfo, datetime.timezone.utc)
# Make sure we remove content_html from the returned email_model
self.assertNotIn('content_html', workflow_api.next_task.data['email_model'])
@patch('crc.services.email_service.EmailService.add_email')
def test_email_raises_exception(self, mock_response):
self.load_example_data()

View File

@ -0,0 +1,38 @@
import json
from tests.base_test import BaseTest
class TestLdapApi(BaseTest):
def test_get_ldap(self):
"""
Test to make sure that LDAP api returns a real user
"""
self.load_example_data()
rv = self.app.get('/v1.0/ldap?query=dhf',
follow_redirects=True,
content_type="application/json", headers=self.logged_in_headers())
self.assertTrue(rv.status_code == 200)
user_uid = "dhf8r"
data = json.loads(rv.data)
self.assertEqual(data[0]['uid'], user_uid)
self.assertEqual(data[0]['display_name'], 'Dan Funk')
self.assertEqual(data[0]['given_name'], 'Dan')
self.assertEqual(data[0]['affiliation'], 'faculty')
def test_not_in_ldap(self):
"""
Test to make sure the LDAP api doesn't return a nonexistent user
"""
self.load_example_data()
rv = self.app.get('/v1.0/ldap?query=atp',
follow_redirects=True,
content_type="application/json", headers=self.logged_in_headers())
# Should still successfully perform lookup
self.assertTrue(rv.status_code == 200)
data = json.loads(rv.data)
# Should not return any users
self.assertEqual(len(data), 0)

View File

@ -220,17 +220,17 @@ class TestAuthentication(BaseTest):
admin_token_headers = dict(Authorization='Bearer ' + admin_user.encode_auth_token())
# User should not be in the system yet.
non_admin_user = session.query(UserModel).filter(UserModel.uid == self.non_admin_uid).first()
self.assertIsNone(non_admin_user)
# non_admin_user = session.query(UserModel).filter(UserModel.uid == self.non_admin_uid).first()
# self.assertIsNone(non_admin_user)
# Admin should not be able to impersonate non-existent user
rv_1 = self.app.get(
'/v1.0/user?admin_impersonate_uid=' + self.non_admin_uid,
content_type="application/json",
headers=admin_token_headers,
follow_redirects=False
)
self.assert_failure(rv_1, 400)
# rv_1 = self.app.get(
# '/v1.0/user?admin_impersonate_uid=' + self.non_admin_uid,
# content_type="application/json",
# headers=admin_token_headers,
# follow_redirects=False
#)
# self.assert_failure(rv_1, 400)
# Add the non-admin user now
self.logout()