Feature/sanitize jinja template (#389)

* added a helpers class that can be used in jinja templates

* added jinja helpers to script task imports

* added some comments for jinja helper usage

* added tests for jinja helpers

* pyl

* pass jinja helper mappings in so helpers can be used consistently

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2023-07-13 11:05:49 -04:00 committed by GitHub
parent cde64400ea
commit a7d0fbb38c
5 changed files with 185 additions and 47 deletions

View File

@ -3,12 +3,10 @@ import json
import os
import uuid
from collections.abc import Generator
from sys import exc_info
from typing import Any
from typing import TypedDict
import flask.wrappers
import jinja2
import sentry_sdk
from flask import current_app
from flask import g
@ -16,7 +14,6 @@ from flask import jsonify
from flask import make_response
from flask import stream_with_context
from flask.wrappers import Response
from jinja2 import TemplateSyntaxError
from SpiffWorkflow.bpmn.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
@ -53,6 +50,7 @@ from spiffworkflow_backend.services.authorization_service import HumanTaskNotFou
from spiffworkflow_backend.services.authorization_service import UserDoesNotHaveAccessToTaskError
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.jinja_service import JinjaService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
@ -400,7 +398,7 @@ def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappe
task_model.form_ui_schema = ui_form_contents
_munge_form_ui_schema_based_on_hidden_fields_in_task_data(task_model)
_render_instructions_for_end_user(task_model, extensions)
JinjaService.render_instructions_for_end_user(task_model, extensions)
task_model.extensions = extensions
return make_response(jsonify(task_model), 200)
@ -414,22 +412,6 @@ def task_submit(
return _task_submit_shared(process_instance_id, task_guid, body)
def _render_instructions_for_end_user(task_model: TaskModel, extensions: dict | None = None) -> str:
"""Assure any instructions for end user are processed for jinja syntax."""
if extensions is None:
extensions = TaskService.get_extensions_from_task_model(task_model)
if extensions and "instructionsForEndUser" in extensions:
if extensions["instructionsForEndUser"]:
try:
instructions = _render_jinja_template(extensions["instructionsForEndUser"], task_model)
extensions["instructionsForEndUser"] = instructions
return instructions
except TaskModelError as wfe:
wfe.add_note("Failed to render instructions for end user.")
raise ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe) from wfe
return ""
def _interstitial_stream(
process_instance: ProcessInstanceModel, execute_tasks: bool = True, is_locked: bool = False
) -> Generator[str, str | None, None]:
@ -442,8 +424,7 @@ def _interstitial_stream(
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
if task_model is None:
return ""
extensions = TaskService.get_extensions_from_task_model(task_model)
return _render_instructions_for_end_user(task_model, extensions)
return JinjaService.render_instructions_for_end_user(task_model)
processor = ProcessInstanceProcessor(process_instance)
reported_ids = [] # A list of all the ids reported by this endpoint so far.
@ -822,7 +803,7 @@ def _prepare_form_data(form_file: str, task_model: TaskModel, process_model: Pro
file_contents = SpecFileService.get_data(process_model, form_file).decode("utf-8")
try:
form_contents = _render_jinja_template(file_contents, task_model)
form_contents = JinjaService.render_jinja_template(file_contents, task_model)
try:
# form_contents is a str
hot_dict: dict = json.loads(form_contents)
@ -842,30 +823,6 @@ def _prepare_form_data(form_file: str, task_model: TaskModel, process_model: Pro
raise api_error
def _render_jinja_template(unprocessed_template: str, task_model: TaskModel) -> str:
jinja_environment = jinja2.Environment(autoescape=True, lstrip_blocks=True, trim_blocks=True)
try:
template = jinja_environment.from_string(unprocessed_template)
return template.render(**(task_model.get_data()))
except jinja2.exceptions.TemplateError as template_error:
wfe = TaskModelError(str(template_error), task_model=task_model, exception=template_error)
if isinstance(template_error, TemplateSyntaxError):
wfe.line_number = template_error.lineno
wfe.error_line = template_error.source.split("\n")[template_error.lineno - 1]
wfe.add_note("Jinja2 template errors can happen when trying to display task data")
raise wfe from template_error
except Exception as error:
_type, _value, tb = exc_info()
wfe = TaskModelError(str(error), task_model=task_model, exception=error)
while tb:
if tb.tb_frame.f_code.co_filename == "<template>":
wfe.line_number = tb.tb_lineno
wfe.error_line = unprocessed_template.split("\n")[tb.tb_lineno - 1]
tb = tb.tb_next
wfe.add_note("Jinja2 template errors can happen when trying to display task data")
raise wfe from error
def _get_spiff_task_from_process_instance(
task_guid: str,
process_instance: ProcessInstanceModel,

View File

@ -0,0 +1,76 @@
import re
from sys import exc_info
import jinja2
from jinja2 import TemplateSyntaxError
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.task_service import TaskModelError
from spiffworkflow_backend.services.task_service import TaskService
class JinjaHelpers:
"""These are helpers that added to script tasks and to jinja for rendering templates.
Can be used from a jinja template as a filter like:
This is a template for {{ unsanitized_variable | sanitize_for_md }}.
Or as a python-style method call like:
This is a template for {{ sanitize_for_md(unsanitized_variable) }}.
It can also be used from a script task like:
sanitized_variable = sanitize_for_md(unsanitized_variable)
"""
@classmethod
def get_helper_mapping(cls) -> dict:
"""So we can use filter syntax in markdown."""
return {"sanitize_for_md": JinjaHelpers.sanitize_for_md}
@classmethod
def sanitize_for_md(cls, value: str) -> str:
"""Sanitizes given value for markdown."""
sanitized_value = re.sub(r"([|])", r"\\\1", value)
return sanitized_value
class JinjaService:
@classmethod
def render_instructions_for_end_user(cls, task_model: TaskModel, extensions: dict | None = None) -> str:
"""Assure any instructions for end user are processed for jinja syntax."""
if extensions is None:
extensions = TaskService.get_extensions_from_task_model(task_model)
if extensions and "instructionsForEndUser" in extensions:
if extensions["instructionsForEndUser"]:
try:
instructions = cls.render_jinja_template(extensions["instructionsForEndUser"], task_model)
extensions["instructionsForEndUser"] = instructions
return instructions
except TaskModelError as wfe:
wfe.add_note("Failed to render instructions for end user.")
raise ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe) from wfe
return ""
@classmethod
def render_jinja_template(cls, unprocessed_template: str, task_model: TaskModel) -> str:
jinja_environment = jinja2.Environment(autoescape=True, lstrip_blocks=True, trim_blocks=True)
jinja_environment.filters.update(JinjaHelpers.get_helper_mapping())
try:
template = jinja_environment.from_string(unprocessed_template)
return template.render(**(task_model.get_data()), **JinjaHelpers.get_helper_mapping())
except jinja2.exceptions.TemplateError as template_error:
wfe = TaskModelError(str(template_error), task_model=task_model, exception=template_error)
if isinstance(template_error, TemplateSyntaxError):
wfe.line_number = template_error.lineno
wfe.error_line = template_error.source.split("\n")[template_error.lineno - 1]
wfe.add_note("Jinja2 template errors can happen when trying to display task data")
raise wfe from template_error
except Exception as error:
_type, _value, tb = exc_info()
wfe = TaskModelError(str(error), task_model=task_model, exception=error)
while tb:
if tb.tb_frame.f_code.co_filename == "<template>":
wfe.line_number = tb.tb_lineno
wfe.error_line = unprocessed_template.split("\n")[tb.tb_lineno - 1]
tb = tb.tb_next
wfe.add_note("Jinja2 template errors can happen when trying to display task data")
raise wfe from error

View File

@ -71,6 +71,7 @@ from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.custom_parser import MyCustomParser
from spiffworkflow_backend.services.element_units_service import ElementUnitsService
from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.jinja_service import JinjaHelpers
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_tmp_service import ProcessInstanceTmpService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -284,6 +285,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
"time": time,
"timedelta": timedelta,
"uuid": uuid,
**JinjaHelpers.get_helper_mapping(),
}
use_restricted_script_engine = True

View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_eadk5mh" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_1owskjp</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1owskjp" sourceRef="StartEvent_1" targetRef="set_variables" />
<bpmn:sequenceFlow id="Flow_0rd9v57" sourceRef="set_variables" targetRef="display_variables" />
<bpmn:endEvent id="Event_14nu5rm">
<bpmn:incoming>Flow_1oco0t1</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1oco0t1" sourceRef="display_variables" targetRef="Event_14nu5rm" />
<bpmn:scriptTask id="set_variables" name="Set Variables">
<bpmn:incoming>Flow_1owskjp</bpmn:incoming>
<bpmn:outgoing>Flow_0rd9v57</bpmn:outgoing>
<bpmn:script>from_filter = "Sanitized | from | filter"
from_method_call = "Sanitized | from | method | call"
from_script_task = sanitize_for_md("Sanitized | from | script | task")</bpmn:script>
</bpmn:scriptTask>
<bpmn:manualTask id="display_variables" name="Display Variables">
<bpmn:extensionElements>
<spiffworkflow:instructionsForEndUser>* From Filter: {{ from_filter | sanitize_for_md }}
* From Method Call: {{ sanitize_for_md(from_method_call) }}
* From ScriptTask: {{ from_script_task }}</spiffworkflow:instructionsForEndUser>
</bpmn:extensionElements>
<bpmn:incoming>Flow_0rd9v57</bpmn:incoming>
<bpmn:outgoing>Flow_1oco0t1</bpmn:outgoing>
</bpmn:manualTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_eadk5mh">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_14nu5rm_di" bpmnElement="Event_14nu5rm">
<dc:Bounds x="592" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0es22ks_di" bpmnElement="set_variables">
<dc:Bounds x="270" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_093h5uh_di" bpmnElement="display_variables">
<dc:Bounds x="430" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1owskjp_di" bpmnElement="Flow_1owskjp">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0rd9v57_di" bpmnElement="Flow_0rd9v57">
<di:waypoint x="370" y="177" />
<di:waypoint x="430" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1oco0t1_di" bpmnElement="Flow_1oco0t1">
<di:waypoint x="530" y="177" />
<di:waypoint x="592" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -0,0 +1,44 @@
from flask import Flask
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.jinja_service import JinjaService
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
class TestJinjaService(BaseTest):
def test_can_sanitize_string(
self,
app: Flask,
with_db_and_bpmn_file_cleanup: None,
) -> None:
process_model = load_test_spec(
process_model_id="test_group/manual-task-with-sanitized-markdown",
process_model_source_directory="manual-task-with-sanitized-markdown",
)
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
human_task = process_instance.active_human_tasks[0]
task_model = TaskModel.query.filter_by(guid=human_task.task_id).first()
assert task_model is not None
expected_result = "\n".join(
[
r"* From Filter: Sanitized \| from \| filter",
r"* From Method Call: Sanitized \| from \| method \| call",
r"* From ScriptTask: Sanitized \| from \| script \| task",
]
)
result = JinjaService.render_instructions_for_end_user(task_model)
assert result == expected_result
expected_task_data = {
"from_filter": "Sanitized | from | filter",
"from_method_call": "Sanitized | from | method | call",
"from_script_task": "Sanitized \\| from \\| script \\| task",
}
assert task_model.get_data() == expected_task_data