cr-connect-workflow/crc/api/approval.py

138 lines
4.9 KiB
Python
Raw Normal View History

2020-06-22 15:24:58 +00:00
import csv
import io
import json
import pickle
from base64 import b64decode
from datetime import datetime
2020-06-22 15:24:58 +00:00
from flask import g, make_response
2020-06-02 01:45:09 +00:00
from crc import db, session
from crc.api.common import ApiError
from crc.models.approval import Approval, ApprovalModel, ApprovalSchema, ApprovalStatus
from crc.models.workflow import WorkflowModel
2020-05-24 05:53:48 +00:00
from crc.services.approval_service import ApprovalService
from crc.services.ldap_service import LdapService
# Returns counts of approvals in each status group assigned to the given user.
# The goal is to return results as quickly as possible.
def get_approval_counts(as_user=None):
uid = as_user or g.user.uid
db_user_approvals = db.session.query(ApprovalModel)\
.filter_by(approver_uid=uid)\
.filter(ApprovalModel.status != ApprovalStatus.CANCELED.name)\
.all()
study_ids = [a.study_id for a in db_user_approvals]
db_other_approvals = db.session.query(ApprovalModel)\
.filter(ApprovalModel.study_id.in_(study_ids))\
.filter(ApprovalModel.approver_uid != uid)\
.filter(ApprovalModel.status != ApprovalStatus.CANCELED.name)\
.all()
# Make a dict of the other approvals where the key is the study id and the value is the approval
# TODO: This won't work if there are more than 2 approvals with the same study_id
other_approvals = {}
for approval in db_other_approvals:
other_approvals[approval.study_id] = approval
counts = {}
for name, value in ApprovalStatus.__members__.items():
counts[name] = 0
for approval in db_user_approvals:
# Check if another approval has the same study id
if approval.study_id in other_approvals:
other_approval = other_approvals[approval.study_id]
# Other approval takes precedence over this one
if other_approval.id < approval.id:
if other_approval.status == ApprovalStatus.PENDING.name:
counts[ApprovalStatus.AWAITING.name] += 1
elif other_approval.status == ApprovalStatus.DECLINED.name:
counts[ApprovalStatus.DECLINED.name] += 1
elif other_approval.status == ApprovalStatus.CANCELED.name:
counts[ApprovalStatus.CANCELED.name] += 1
elif other_approval.status == ApprovalStatus.APPROVED.name:
counts[approval.status] += 1
else:
counts[approval.status] += 1
else:
counts[approval.status] += 1
return counts
def get_all_approvals(status=None):
approvals = ApprovalService.get_all_approvals(include_cancelled=status is True)
results = ApprovalSchema(many=True).dump(approvals)
return results
def get_approvals(status=None, as_user=None):
#status = ApprovalStatus.PENDING.value
user = g.user.uid
if as_user:
user = as_user
approvals = ApprovalService.get_approvals_per_user(user, status,
include_cancelled=False)
2020-05-24 05:53:48 +00:00
results = ApprovalSchema(many=True).dump(approvals)
return results
def get_approvals_for_study(study_id=None):
db_approvals = ApprovalService.get_approvals_for_study(study_id)
approvals = [Approval.from_model(approval_model) for approval_model in db_approvals]
results = ApprovalSchema(many=True).dump(approvals)
return results
2020-06-22 20:07:57 +00:00
def get_health_attesting_csv():
records = ApprovalService.get_health_attesting_records()
2020-06-22 15:24:58 +00:00
si = io.StringIO()
cw = csv.writer(si)
cw.writerows(records)
output = make_response(si.getvalue())
output.headers["Content-Disposition"] = "attachment; filename=health_attesting.csv"
output.headers["Content-type"] = "text/csv"
return output
# ----- Begin descent into madness ---- #
def get_csv():
"""A damn lie, it's a json file. A huge bit of a one-off for RRT, but 3 weeks of midnight work can convince a
man to do just about anything"""
2020-06-22 20:07:57 +00:00
content = ApprovalService.get_not_really_csv_content()
return content
# ----- come back to the world of the living ---- #
2020-05-24 05:53:48 +00:00
def update_approval(approval_id, body):
if approval_id is None:
raise ApiError('unknown_approval', 'Please provide a valid Approval ID.')
approval_model = session.query(ApprovalModel).get(approval_id)
if approval_model is None:
2020-05-24 05:53:48 +00:00
raise ApiError('unknown_approval', 'The approval "' + str(approval_id) + '" is not recognized.')
if approval_model.approver_uid != g.user.uid:
raise ApiError("not_your_approval", "You may not modify this approval. It belongs to another user.")
approval_model.status = body['status']
approval_model.message = body['message']
approval_model.date_approved = datetime.now()
session.add(approval_model)
session.commit()
2020-06-05 19:39:52 +00:00
# Called only to send emails
approver = body['approver']['uid']
ApprovalService.update_approval(approval_id, approver)
result = ApprovalSchema().dump(approval_model)
2020-05-24 05:53:48 +00:00
return result