cr-connect-workflow/crc/scripts/complete_template.py

172 lines
8.1 KiB
Python

import copy
import re
from io import BytesIO
import jinja2
from docx.shared import Inches
from docxtpl import DocxTemplate, Listing, InlineImage
from crc import session
from crc.api.common import ApiError
from crc.models.file import CONTENT_TYPES, FileModel, FileDataModel
from crc.models.workflow import WorkflowModel
from crc.scripts.script import Script
from crc.services.file_service import FileService
from crc.services.jinja_service import JinjaService
from crc.services.workflow_processor import WorkflowProcessor
class CompleteTemplate(Script):
def get_description(self):
return """Using the Jinja template engine, takes data available in the current task, and uses it to populate
a word document that contains Jinja markup. Please see https://docxtpl.readthedocs.io/en/latest/
for more information on exact syntax.
Takes two arguments:
1. The name of a MS Word docx file to use as a template.
2. The 'code' of the IRB Document as set in the irb_documents.xlsx file."
"""
def do_task_validate_only(self, task, study_id, workflow_id, *args, **kwargs):
"""For validation only, process the template, but do not store it in the database."""
workflow = session.query(WorkflowModel).filter(WorkflowModel.id == workflow_id).first()
self.process_template(task, study_id, workflow, *args, **kwargs)
def do_task(self, task, study_id, workflow_id, *args, **kwargs):
workflow = session.query(WorkflowModel).filter(WorkflowModel.id == workflow_id).first()
final_document_stream = self.process_template(task, study_id, workflow, *args, **kwargs)
file_name = args[0]
irb_doc_code = args[1]
FileService.add_workflow_file(workflow_id=workflow_id,
task_spec_name=task.get_name(),
name=file_name,
content_type=CONTENT_TYPES['docx'],
binary_data=final_document_stream.read(),
irb_doc_code=irb_doc_code)
def process_template(self, task, study_id, workflow=None, *args, **kwargs):
"""Entry point, mostly worried about wiring it all up."""
if len(args) < 2 or len(args) > 3:
raise ApiError(code="missing_argument",
message="The CompleteTemplate script requires 2 arguments. The first argument is "
"the name of the docx template to use. The second "
"argument is a code for the document, as "
"set in the reference document %s. " % FileService.DOCUMENT_LIST)
task_study_id = task.workflow.data[WorkflowProcessor.STUDY_ID_KEY]
file_name = args[0]
if task_study_id != study_id:
raise ApiError(code="invalid_argument",
message="The given task does not match the given study.")
file_data_model = None
if workflow is not None:
# Get the workflow specification file with the given name.
file_data_models = FileService.get_spec_data_files(
workflow_spec_id=workflow.workflow_spec_id,
workflow_id=workflow.id,
name=file_name)
if len(file_data_models) > 0:
file_data_model = file_data_models[0]
else:
raise ApiError(code="invalid_argument",
message="Uable to locate a file with the given name.")
# Get images from file/files fields
if len(args) == 3:
image_file_data = self.get_image_file_data(args[2], task)
else:
image_file_data = None
return self.make_template(BytesIO(file_data_model.data), task.data, image_file_data)
def get_image_file_data(self, fields_str, task):
image_file_data = []
images_field_str = re.sub(r'[\[\]]', '', fields_str)
images_field_keys = [v.strip() for v in images_field_str.strip().split(',')]
for field_key in images_field_keys:
if field_key in task.data:
v = task.data[field_key]
file_ids = v if isinstance(v, list) else [v]
for file_id in file_ids:
if isinstance(file_id, str) and file_id.isnumeric():
file_id = int(file_id)
if file_id is not None and isinstance(file_id, int):
if not task.workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY]:
# Get the actual image data
image_file_model = session.query(FileModel).filter_by(id=file_id).first()
image_file_data_model = FileService.get_file_data(file_id, image_file_model)
if image_file_data_model is not None:
image_file_data.append(image_file_data_model)
else:
raise ApiError(
code="not_a_file_id",
message="The CompleteTemplate script requires 2-3 arguments. The third argument should "
"be a comma-delimited list of File IDs")
return image_file_data
def make_template(self, binary_stream, task_data, image_file_data=None):
# TODO: Move this into the jinja_service?
doc = DocxTemplate(binary_stream)
doc_context = copy.deepcopy(task_data)
doc_context = self.rich_text_update(doc_context)
doc_context = self.append_images(doc, doc_context, image_file_data)
jinja_env = jinja2.Environment(autoescape=True)
try:
doc.render(doc_context, jinja_env)
except Exception as e:
print (e)
# try:
# doc = JinjaService.get_word_document_content(binary_stream, task_data, image_file_data)
# except jinja2.exceptions.TemplateError as te:
# # raise ApiError.from_task(code="bad_template",
# # message="There was a problem compiling your template.",
# # task=self.task)
# print(te)
# except TypeError as te:
# print(te)
target_stream = BytesIO()
doc.save(target_stream)
target_stream.seek(0) # move to the beginning of the stream.
return target_stream
def append_images(self, template, context, image_file_data):
context['images'] = {}
if image_file_data is not None:
for file_data_model in image_file_data:
fm = file_data_model.file_model
if fm is not None:
context['images'][fm.id] = {
'name': fm.name,
'url': '/v1.0/file/%s/data' % fm.id,
'image': self.make_image(file_data_model, template)
}
return context
def make_image(self, file_data_model, template):
return InlineImage(template, BytesIO(file_data_model.data), width=Inches(6.5))
def rich_text_update(self, context):
"""This is a bit of a hack. If we find that /n characters exist in the data, we want
these to come out in the final document without requiring someone to predict it in the
template. Ideally we would use the 'RichText' feature of the python-docx library, but
that requires we both escape it here, and in the Docx template. There is a thing called
a 'listing' in python-docx library that only requires we use it on the way in, and the
template doesn't have to think about it. So running with that for now."""
# loop through the content, identify anything that has a newline character in it, and
# wrap that sucker in a 'listing' function.
if isinstance(context, dict):
for k, v in context.items():
context[k] = self.rich_text_update(v)
elif isinstance(context, list):
for i in range(len(context)):
context[i] = self.rich_text_update(context[i])
elif isinstance(context, str) and '\n' in context:
return Listing(context)
return context