From a7d0fbb38c9ba34185e3ccab3964393df744d657 Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Thu, 13 Jul 2023 11:05:49 -0400 Subject: [PATCH] Feature/sanitize jinja template (#389) * added a helpers class that can be used in jinja templates * added jinja helpers to script task imports * added some comments for jinja helper usage * added tests for jinja helpers * pyl * pass jinja helper mappings in so helpers can be used consistently --------- Co-authored-by: jasquat --- .../routes/tasks_controller.py | 51 +------------ .../services/jinja_service.py | 76 +++++++++++++++++++ .../services/process_instance_processor.py | 2 + .../manual_task_with_sanitized_markdown.bpmn | 59 ++++++++++++++ .../unit/test_jinja_service.py | 44 +++++++++++ 5 files changed, 185 insertions(+), 47 deletions(-) create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/services/jinja_service.py create mode 100644 spiffworkflow-backend/tests/data/manual-task-with-sanitized-markdown/manual_task_with_sanitized_markdown.bpmn create mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_jinja_service.py diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py index 29d7f75f4..b8c105cbe 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py @@ -3,12 +3,10 @@ import json import os import uuid from collections.abc import Generator -from sys import exc_info from typing import Any from typing import TypedDict import flask.wrappers -import jinja2 import sentry_sdk from flask import current_app from flask import g @@ -16,7 +14,6 @@ from flask import jsonify from flask import make_response from flask import stream_with_context from flask.wrappers import Response -from jinja2 import TemplateSyntaxError from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore @@ -53,6 +50,7 @@ from spiffworkflow_backend.services.authorization_service import HumanTaskNotFou from spiffworkflow_backend.services.authorization_service import UserDoesNotHaveAccessToTaskError from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService from spiffworkflow_backend.services.file_system_service import FileSystemService +from spiffworkflow_backend.services.jinja_service import JinjaService from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService @@ -400,7 +398,7 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe task_model.form_ui_schema = ui_form_contents _munge_form_ui_schema_based_on_hidden_fields_in_task_data(task_model) - _render_instructions_for_end_user(task_model, extensions) + JinjaService.render_instructions_for_end_user(task_model, extensions) task_model.extensions = extensions return make_response(jsonify(task_model), 200) @@ -414,22 +412,6 @@ def task_submit( return _task_submit_shared(process_instance_id, task_guid, body) -def _render_instructions_for_end_user(task_model: TaskModel, extensions: dict | None = None) -> str: - """Assure any instructions for end user are processed for jinja syntax.""" - if extensions is None: - extensions = TaskService.get_extensions_from_task_model(task_model) - if extensions and "instructionsForEndUser" in extensions: - if extensions["instructionsForEndUser"]: - try: - instructions = _render_jinja_template(extensions["instructionsForEndUser"], task_model) - extensions["instructionsForEndUser"] = instructions - return instructions - except TaskModelError as wfe: - wfe.add_note("Failed to render instructions for end user.") - raise ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe) from wfe - return "" - - def _interstitial_stream( process_instance: ProcessInstanceModel, execute_tasks: bool = True, is_locked: bool = False ) -> Generator[str, str | None, None]: @@ -442,8 +424,7 @@ def _interstitial_stream( task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() if task_model is None: return "" - extensions = TaskService.get_extensions_from_task_model(task_model) - return _render_instructions_for_end_user(task_model, extensions) + return JinjaService.render_instructions_for_end_user(task_model) processor = ProcessInstanceProcessor(process_instance) reported_ids = [] # A list of all the ids reported by this endpoint so far. @@ -822,7 +803,7 @@ def _prepare_form_data(form_file: str, task_model: TaskModel, process_model: Pro file_contents = SpecFileService.get_data(process_model, form_file).decode("utf-8") try: - form_contents = _render_jinja_template(file_contents, task_model) + form_contents = JinjaService.render_jinja_template(file_contents, task_model) try: # form_contents is a str hot_dict: dict = json.loads(form_contents) @@ -842,30 +823,6 @@ def _prepare_form_data(form_file: str, task_model: TaskModel, process_model: Pro raise api_error -def _render_jinja_template(unprocessed_template: str, task_model: TaskModel) -> str: - jinja_environment = jinja2.Environment(autoescape=True, lstrip_blocks=True, trim_blocks=True) - try: - template = jinja_environment.from_string(unprocessed_template) - return template.render(**(task_model.get_data())) - except jinja2.exceptions.TemplateError as template_error: - wfe = TaskModelError(str(template_error), task_model=task_model, exception=template_error) - if isinstance(template_error, TemplateSyntaxError): - wfe.line_number = template_error.lineno - wfe.error_line = template_error.source.split("\n")[template_error.lineno - 1] - wfe.add_note("Jinja2 template errors can happen when trying to display task data") - raise wfe from template_error - except Exception as error: - _type, _value, tb = exc_info() - wfe = TaskModelError(str(error), task_model=task_model, exception=error) - while tb: - if tb.tb_frame.f_code.co_filename == "