methods, etc for running user tasks

needed for task_events
This commit is contained in:
mike cullerton 2022-06-29 11:40:52 -04:00
parent e374813609
commit ceef89ff55
6 changed files with 509 additions and 23 deletions

View File

@ -198,11 +198,11 @@ class FormFieldSchema(Schema):
)
# class FormSchema(Schema):
# """FormSchema."""
#
# key = marshmallow.fields.String(required=True, allow_none=False)
# fields = marshmallow.fields.List(marshmallow.fields.Nested(FormFieldSchema))
class FormSchema(Schema):
"""FormSchema."""
key = marshmallow.fields.String(required=True, allow_none=False)
fields = marshmallow.fields.List(marshmallow.fields.Nested(FormFieldSchema))
class TaskSchema(Schema):
@ -230,7 +230,7 @@ class TaskSchema(Schema):
multi_instance_type = EnumField(MultiInstanceType)
documentation = marshmallow.fields.String(required=False, allow_none=True)
# form = marshmallow.fields.Nested(FormSchema, required=False, allow_none=True)
form = marshmallow.fields.Nested(FormSchema, required=False, allow_none=True)
title = marshmallow.fields.String(required=False, allow_none=True)
process_name = marshmallow.fields.String(required=False, allow_none=True)
lane = marshmallow.fields.String(required=False, allow_none=True)

View File

@ -279,7 +279,7 @@ def process_instance_run(
process_group_id: str,
process_model_id: str,
process_instance_id: int,
do_engine_steps: bool = False,
do_engine_steps: bool = True,
) -> flask.wrappers.Response:
"""Process_instance_run."""
process_instance = ProcessInstanceService().get_process_instance(
@ -287,19 +287,20 @@ def process_instance_run(
)
processor = ProcessInstanceProcessor(process_instance)
try:
processor.do_engine_steps()
except Exception as e:
ErrorHandlingService().handle_error(processor, e)
task = processor.bpmn_process_instance.last_task
raise ApiError.from_task(
code="unknown_exception",
message=f"An unknown error occurred. Original error: {e}",
status_code=400,
task=task,
) from e
processor.save()
# ProcessInstanceService.update_task_assignments(processor)
if do_engine_steps:
try:
processor.do_engine_steps()
except Exception as e:
ErrorHandlingService().handle_error(processor, e)
task = processor.bpmn_process_instance.last_task
raise ApiError.from_task(
code="unknown_exception",
message=f"An unknown error occurred. Original error: {e}",
status_code=400,
task=task,
) from e
processor.save()
ProcessInstanceService.update_task_assignments(processor)
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(
processor
@ -308,7 +309,7 @@ def process_instance_run(
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
process_instance_metadata["data"] = process_instance_data
return Response(
json.dumps(process_instance_metadata), status=201, mimetype="application/json"
json.dumps(process_instance_metadata), status=200, mimetype="application/json"
)

View File

@ -1,17 +1,26 @@
"""Process_instance_service."""
import enum
import time
from typing import Any
from typing import Any, List
from typing import Dict
from typing import Optional
from flask import current_app
from flask_bpmn.models.db import db
from SpiffWorkflow.task import Task # type: ignore
from SpiffWorkflow.task import TaskState # type: ignore
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from SpiffWorkflow.bpmn.specs.ManualTask import ManualTask
from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask
from SpiffWorkflow.bpmn.specs.UserTask import UserTask
from SpiffWorkflow.bpmn.specs.events import EndEvent, StartEvent
from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask
from SpiffWorkflow.exceptions import WorkflowTaskExecException
from SpiffWorkflow.specs import CancelTask, StartTask
from spiffworkflow_backend.models.process_instance import ProcessInstanceApi
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task_event import TaskAction
from spiffworkflow_backend.models.task_event import TaskEventModel
from spiffworkflow_backend.models.user import UserModel
@ -19,6 +28,14 @@ from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService
class MultiInstanceType(enum.Enum):
none = "none"
looping = "looping"
parallel = "parallel"
sequential = "sequential"
class ProcessInstanceService:
@ -89,6 +106,12 @@ class ProcessInstanceService:
previous_form_data, next_task_trying_again.data
)
process_instance_api.next_task = ProcessInstanceService.spiff_task_to_api_task(next_task_trying_again, add_docs_and_forms=True)
# Update the state of the task to locked if the current user does not own the task.
# user_uids = WorkflowService.get_users_assigned_to_task(processor, next_task)
# if not UserService.in_list(user_uids, allow_admin_impersonate=True):
# workflow_api.next_task.state = WorkflowService.TASK_STATE_LOCKED
return process_instance_api
@staticmethod
@ -134,3 +157,360 @@ class ProcessInstanceService:
.first()
)
return result
@staticmethod
def update_task_assignments(processor):
"""For every upcoming user task, log a task action
that connects the assigned user(s) to that task. All
existing assignment actions for this workflow are removed from the database,
so that only the current valid actions are available. update_task_assignments
should be called whenever progress is made on a workflow."""
db.session.query(TaskEventModel). \
filter(TaskEventModel.process_instance_id == processor.process_instance_model.id). \
filter(TaskEventModel.action == TaskAction.ASSIGNMENT.value).delete()
db.session.commit()
tasks = processor.get_current_user_tasks()
for task in tasks:
user_ids = ProcessInstanceService.get_users_assigned_to_task(processor, task)
for user_id in user_ids:
ProcessInstanceService().log_task_action(user_id, processor, task, TaskAction.ASSIGNMENT.value)
@staticmethod
def get_users_assigned_to_task(processor, spiff_task) -> List[str]:
if processor.process_instance_model.process_initiator_id is None:
raise ApiError.from_task(code='invalid_workflow',
message='A process instance must have a user_id.',
task=spiff_task)
# # Standalone workflow - we only care about the current user
# elif processor.workflow_model.study_id is None and processor.workflow_model.user_id is not None:
# return [processor.workflow_model.user_id]
# Workflow associated with a study - get all the users
else:
if not hasattr(spiff_task.task_spec, 'lane') or spiff_task.task_spec.lane is None:
current_user = spiff_task.data['current_user']
principal = UserService().get_principal_by_user_id(current_user.id)
return principal.id,
# return [processor.process_instance_model.process_initiator_id]
if spiff_task.task_spec.lane not in spiff_task.data:
return [] # No users are assignable to the task at this moment
lane_users = spiff_task.data[spiff_task.task_spec.lane]
if not isinstance(lane_users, list):
lane_users = [lane_users]
lane_uids = []
for user in lane_users:
if isinstance(user, dict):
if user.get("value"):
lane_uids.append(user['value'])
else:
raise ApiError.from_task(code="task_lane_user_error",
message="Spiff Task %s lane user dict must have a key called 'value' with the user's uid in it." %
spiff_task.task_spec.name, task=spiff_task)
elif isinstance(user, str):
lane_uids.append(user)
else:
raise ApiError.from_task(code="task_lane_user_error",
message="Spiff Task %s lane user is not a string or dict" %
spiff_task.task_spec.name, task=spiff_task)
return lane_uids
@staticmethod
def get_task_type(spiff_task):
task_type = spiff_task.task_spec.__class__.__name__
task_types = [UserTask, ManualTask, BusinessRuleTask, CancelTask, ScriptTask, StartTask, EndEvent, StartEvent]
for t in task_types:
if isinstance(spiff_task.task_spec, t):
task_type = t.__name__
break
else:
task_type = "NoneTask"
return task_type
@staticmethod
def log_task_action(user_uid, processor, spiff_task, action):
task = ProcessInstanceService.spiff_task_to_api_task(spiff_task)
form_data = ProcessInstanceService.extract_form_data(spiff_task.data, spiff_task)
task_event = TaskEventModel(
# study_id=processor.workflow_model.study_id,
user_uid=user_uid,
process_instance_id=processor.process_instance_model.id,
# workflow_spec_id=processor.workflow_model.workflow_spec_id,
action=action,
task_id=str(task.id),
task_name=task.name,
task_title=task.title,
task_type=str(task.type),
task_state=task.state,
task_lane=task.lane,
form_data=form_data,
mi_type=task.multi_instance_type.value, # Some tasks have a repeat behavior.
mi_count=task.multi_instance_count, # This is the number of times the task could repeat.
mi_index=task.multi_instance_index, # And the index of the currently repeating task.
process_name=task.process_name,
# date=datetime.utcnow(), <=== For future reference, NEVER do this. Let the database set the time.
)
db.session.add(task_event)
db.session.commit()
@staticmethod
def extract_form_data(latest_data, task):
"""Extracts data from the latest_data that is directly related to the form that is being
submitted."""
data = {}
if hasattr(task.task_spec, 'form'):
for field in task.task_spec.form.fields:
if field.has_property(Task.FIELD_PROP_REPEAT):
group = field.get_property(Task.FIELD_PROP_REPEAT)
if group in latest_data:
data[group] = latest_data[group]
else:
value = ProcessInstanceService.get_dot_value(field.id, latest_data)
if value is not None:
ProcessInstanceService.set_dot_value(field.id, value, data)
return data
@staticmethod
def get_dot_value(path, source):
### Given a path in dot notation, uas as 'fruit.type' tries to find that value in
### the source, but looking deep in the dictionary.
paths = path.split(".") # [a,b,c]
s = source
index = 0
for p in paths:
index += 1
if isinstance(s, dict) and p in s:
if index == len(paths):
return s[p]
else:
s = s[p]
if path in source:
return source[path]
return None
@staticmethod
def set_dot_value(path, value, target):
### Given a path in dot notation, such as "fruit.type", and a value "apple", will
### set the value in the target dictionary, as target["fruit"]["type"]="apple"
destination = target
paths = path.split(".") # [a,b,c]
index = 0
for p in paths:
index += 1
if p not in destination:
if index == len(paths):
destination[p] = value
else:
destination[p] = {}
destination = destination[p]
return target
@staticmethod
def spiff_task_to_api_task(spiff_task, add_docs_and_forms=False):
task_type = spiff_task.task_spec.__class__.__name__
task_types = [UserTask, ManualTask, BusinessRuleTask, CancelTask, ScriptTask, StartTask, EndEvent, StartEvent]
for t in task_types:
if isinstance(spiff_task.task_spec, t):
task_type = t.__name__
break
else:
task_type = "NoneTask"
info = spiff_task.task_info()
if info["is_looping"]:
mi_type = MultiInstanceType.looping
elif info["is_sequential_mi"]:
mi_type = MultiInstanceType.sequential
elif info["is_parallel_mi"]:
mi_type = MultiInstanceType.parallel
else:
mi_type = MultiInstanceType.none
props = {}
if hasattr(spiff_task.task_spec, 'extensions'):
for key, val in spiff_task.task_spec.extensions.items():
props[key] = val
if hasattr(spiff_task.task_spec, 'lane'):
lane = spiff_task.task_spec.lane
else:
lane = None
task = Task(spiff_task.id,
spiff_task.task_spec.name,
spiff_task.task_spec.description,
task_type,
spiff_task.get_state_name(),
lane,
None,
"",
{},
mi_type,
info["mi_count"],
info["mi_index"],
process_name=spiff_task.task_spec._wf_spec.description,
properties=props
)
# Only process the form and documentation if requested.
# The task should be in a completed or a ready state, and should
# not be a previously completed MI Task.
if add_docs_and_forms:
task.data = spiff_task.data
if hasattr(spiff_task.task_spec, "form"):
task.form = spiff_task.task_spec.form
for i, field in enumerate(task.form.fields):
task.form.fields[i] = ProcessInstanceService.process_options(spiff_task, field)
# If there is a default value, set it.
#if field.id not in task.data and ProcessInstanceService.get_default_value(field, spiff_task) is not None:
# task.data[field.id] = ProcessInstanceService.get_default_value(field, spiff_task)
# task.documentation = ProcessInstanceService._process_documentation(spiff_task)
task.documentation = spiff_task.task_spec.documentation if hasattr(spiff_task.task_spec, 'documentation') else None
# All ready tasks should have a valid name, and this can be computed for
# some tasks, particularly multi-instance tasks that all have the same spec
# but need different labels.
# if spiff_task.state == TaskState.READY:
# task.properties = ProcessInstanceService._process_properties(spiff_task, props)
#
# task.title = ProcessInstanceService.__calculate_title(spiff_task)
if task.properties and "clear_data" in task.properties:
if task.form and task.properties['clear_data'] == 'True':
for i in range(len(task.form.fields)):
task.data.pop(task.form.fields[i].id, None)
# Pass help text through the Jinja parser
if task.form and task.form.fields:
for field in task.form.fields:
if field.properties:
for field_property in field.properties:
if field_property.id == 'help':
jinja_text = JinjaService().get_content(field_property.value, task.data)
field_property.value = jinja_text
return task
@staticmethod
def _process_properties(spiff_task, props):
"""Runs all the property values through the Jinja2 processor to inject data."""
for k, v in props.items():
try:
props[k] = JinjaService.get_content(v, spiff_task.data)
except jinja2.exceptions.TemplateError as ue:
app.logger.error(f'Failed to process task property {str(ue)}', exc_info=True)
return props
@staticmethod
def process_options(spiff_task, field):
if field.type != Task.FIELD_TYPE_ENUM:
return field
if hasattr(field, 'options') and len(field.options) > 1:
return field
elif not (field.has_property(Task.FIELD_PROP_VALUE_COLUMN) or
field.has_property(Task.FIELD_PROP_LABEL_COLUMN)):
raise ApiError.from_task("invalid_enum",
f"For enumerations, you must include options, or a way to generate options from"
f" a spreadsheet or data set. Please set either a spreadsheet name or data name,"
f" along with the value and label columns to use from these sources. Valid params"
f" include: "
f"{Task.FIELD_PROP_SPREADSHEET_NAME}, "
f"{Task.FIELD_PROP_DATA_NAME}, "
f"{Task.FIELD_PROP_VALUE_COLUMN}, "
f"{Task.FIELD_PROP_LABEL_COLUMN}", task=spiff_task)
if field.has_property(Task.FIELD_PROP_SPREADSHEET_NAME):
lookup_model = LookupService.get_lookup_model(spiff_task, field)
data = db.session.query(LookupDataModel).filter(LookupDataModel.lookup_file_model == lookup_model).all()
for d in data:
field.add_option(d.value, d.label)
elif field.has_property(Task.FIELD_PROP_DATA_NAME):
field.options = ProcessInstanceService.get_options_from_task_data(spiff_task, field)
return field
@staticmethod
def get_options_from_task_data(spiff_task, field):
prop = field.get_property(Task.FIELD_PROP_DATA_NAME)
if prop not in spiff_task.data:
raise ApiError.from_task("invalid_enum", f"For enumerations based on task data, task data must have "
f"a property called {prop}", task=spiff_task)
# Get the enum options from the task data
data_model = spiff_task.data[prop]
value_column = field.get_property(Task.FIELD_PROP_VALUE_COLUMN)
label_column = field.get_property(Task.FIELD_PROP_LABEL_COLUMN)
items = data_model.items() if isinstance(data_model, dict) else data_model
options = []
for item in items:
if value_column not in item:
raise ApiError.from_task("invalid_enum",
f"The value column '{value_column}' does not exist for item {item}",
task=spiff_task)
if label_column not in item:
raise ApiError.from_task("invalid_enum",
f"The label column '{label_column}' does not exist for item {item}",
task=spiff_task)
options.append(Box({"id": item[value_column], "name": item[label_column], "data": item}))
return options
@staticmethod
def _process_documentation(spiff_task):
"""Runs the given documentation string through the Jinja2 processor to inject data
create loops, etc... - If a markdown file exists with the same name as the task id,
it will use that file instead of the documentation. """
documentation = spiff_task.task_spec.documentation if hasattr(spiff_task.task_spec, "documentation") else ""
try:
doc_file_name = spiff_task.task_spec.name + ".md"
workflow_id = WorkflowService.workflow_id_from_spiff_task(spiff_task)
workflow = db.session.query(WorkflowModel).filter(WorkflowModel.id == workflow_id).first()
spec_service = WorkflowSpecService()
data = SpecFileService.get_data(spec_service.get_spec(workflow.workflow_spec_id), doc_file_name)
raw_doc = data.decode("utf-8")
except ApiError:
raw_doc = documentation
if not raw_doc:
return ""
try:
return JinjaService.get_content(raw_doc, spiff_task.data)
except jinja2.exceptions.TemplateSyntaxError as tse:
lines = tse.source.splitlines()
error_line = ""
if len(lines) >= tse.lineno - 1:
error_line = tse.source.splitlines()[tse.lineno - 1]
raise ApiError.from_task(code="template_error", message="Jinja Template Error: %s" % str(tse),
task=spiff_task, line_number=tse.lineno, error_line=error_line)
except jinja2.exceptions.TemplateError as te:
# Figure out the line number in the template that caused the error.
cl, exc, tb = sys.exc_info()
line_number = None
error_line = None
for frameSummary in traceback.extract_tb(tb):
if frameSummary.filename == '<template>':
line_number = frameSummary.lineno
lines = documentation.splitlines()
error_line = ""
if len(lines) > line_number:
error_line = lines[line_number - 1]
raise ApiError.from_task(code="template_error", message="Jinja Template Error: %s" % str(te),
task=spiff_task, line_number=line_number, error_line=error_line)
except TypeError as te:
raise ApiError.from_task(code="template_error", message="Jinja Template Error: %s" % str(te),
task=spiff_task)
except Exception as e:
app.logger.error(str(e), exc_info=True)

View File

@ -7,6 +7,7 @@ from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from spiffworkflow_backend.models.user import AdminSessionModel
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.user import UserModel
@ -171,3 +172,9 @@ class UserService:
"You do not have permissions to do this.",
status_code=403,
)
@staticmethod
def get_principal_by_user_id(user_id):
principal = db.session.query(PrincipalModel).filter(PrincipalModel.user_id == user_id).first()
if principal:
return principal

View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_0ed38cx" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="4.2.0">
<bpmn:process id="Process_UserTask" name="UserTask" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0vj0gx7</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_0vj0gx7" sourceRef="StartEvent_1" targetRef="Activity_ScriptTask" />
<bpmn:endEvent id="Event_EndEvent" name="Display Form Data">
<bpmn:documentation>## Name
{{ name }}</bpmn:documentation>
<bpmn:incoming>Flow_1y1c27e</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1y1c27e" sourceRef="Activity_UserTask" targetRef="Event_EndEvent" />
<bpmn:userTask id="Activity_UserTask" name="User Task" camunda:formKey="UserForm">
<bpmn:extensionElements>
<camunda:formData>
<camunda:formField id="name" label="Name" type="string">
<camunda:validation>
<camunda:constraint />
</camunda:validation>
</camunda:formField>
</camunda:formData>
</bpmn:extensionElements>
<bpmn:incoming>Flow_0s4f93a</bpmn:incoming>
<bpmn:outgoing>Flow_1y1c27e</bpmn:outgoing>
</bpmn:userTask>
<bpmn:sequenceFlow id="Flow_0s4f93a" sourceRef="Activity_ScriptTask" targetRef="Activity_UserTask" />
<bpmn:scriptTask id="Activity_ScriptTask" name="Script Task">
<bpmn:incoming>Flow_0vj0gx7</bpmn:incoming>
<bpmn:outgoing>Flow_0s4f93a</bpmn:outgoing>
<bpmn:script>a = 1</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_UserTask">
<bpmndi:BPMNEdge id="Flow_0vj0gx7_di" bpmnElement="Flow_0vj0gx7">
<di:waypoint x="215" y="117" />
<di:waypoint x="250" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1y1c27e_di" bpmnElement="Flow_1y1c27e">
<di:waypoint x="500" y="117" />
<di:waypoint x="562" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0s4f93a_di" bpmnElement="Flow_0s4f93a">
<di:waypoint x="350" y="117" />
<di:waypoint x="400" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_17li8o1_di" bpmnElement="Event_EndEvent">
<dc:Bounds x="562" y="99" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="550" y="142" width="65" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_07bafvv_di" bpmnElement="Activity_UserTask">
<dc:Bounds x="400" y="77" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0zpegud_di" bpmnElement="Activity_ScriptTask">
<dc:Bounds x="250" y="77" width="100" height="80" />
</bpmndi:BPMNShape>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -25,6 +25,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_model import NotificationType
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.models.task_event import TaskEventModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@ -604,10 +605,39 @@ def test_process_instance_run(
assert response.json["data"]["person"] == "Kevin"
def test_process_instance_run_user_task(
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
) -> None:
db.session.query(TaskEventModel).delete()
db.session.query(ProcessInstanceModel).delete()
db.session.commit()
process_group_id = 'my_process_group'
process_model_id = 'user_task'
user = find_or_create_user()
headers = logged_in_headers(user)
response = create_process_instance(
client, process_group_id, process_model_id, headers
)
assert response.json is not None
process_instance_id = response.json["id"]
response = client.post(
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run",
headers=logged_in_headers(user),
)
assert response.json is not None
print(f'test_process_instance_run_user_task: {process_instance_id}')
def test_process_instance_list_with_default_list(
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
) -> None:
"""Test_process_instance_list_with_default_list."""
db.session.query(TaskEventModel).delete()
db.session.query(ProcessInstanceModel).delete()
db.session.commit()
@ -644,6 +674,7 @@ def test_process_instance_list_with_paginated_items(
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
) -> None:
"""Test_process_instance_list_with_paginated_items."""
db.session.query(TaskEventModel).delete()
db.session.query(ProcessInstanceModel).delete()
db.session.commit()
@ -694,6 +725,7 @@ def test_process_instance_list_filter(
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
) -> None:
"""Test_process_instance_list_filter."""
db.session.query(TaskEventModel).delete()
db.session.query(ProcessInstanceModel).delete()
db.session.commit()
@ -839,6 +871,7 @@ def test_error_handler(
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
) -> None:
"""Test_error_handler."""
db.session.query(TaskEventModel).delete()
db.session.query(ProcessInstanceModel).delete()
db.session.commit()