Merge pull request #173 from sartography/feature/save_tasks_one_at_a_time

Feature/save tasks one at a time
Kevin Burnett 2023-03-10 14:01:27 -08:00 committed by GitHub
commit cd49a6b743
9 changed files with 620 additions and 126 deletions

View File

@ -17,10 +17,10 @@ per-file-ignores =
# THEN, test_hey.py will NOT be excluding D103
# asserts are ok in tests
spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103
spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103,D107
# prefer naming functions descriptively rather than forcing comments
spiffworkflow-backend/src/*:D100,D101,D102,D103
spiffworkflow-backend/src/*:D100,D101,D102,D103,D107
spiffworkflow-backend/bin/keycloak_test_server.py:B950,D
spiffworkflow-backend/conftest.py:S105

View File

@ -17,10 +17,10 @@ per-file-ignores =
# THEN, test_hey.py will NOT be excluding D103
# asserts are ok in tests
tests/*:S101,D100,D101,D102,D103
tests/*:S101,D100,D101,D102,D103,D107
# prefer naming functions descriptively rather than forcing comments
src/*:D100,D101,D102,D103
src/*:D100,D101,D102,D103,D107
bin/keycloak_test_server.py:B950,D
conftest.py:S105

View File

@ -50,7 +50,6 @@ from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore
from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.task import TaskStateNames
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from sqlalchemy import text
@ -93,6 +92,7 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.services.user_service import UserService
from spiffworkflow_backend.services.workflow_execution_service import (
execution_strategy_named,
@ -100,6 +100,9 @@ from spiffworkflow_backend.services.workflow_execution_service import (
from spiffworkflow_backend.services.workflow_execution_service import (
StepDetailLoggingDelegate,
)
from spiffworkflow_backend.services.workflow_execution_service import (
TaskModelSavingDelegate,
)
from spiffworkflow_backend.services.workflow_execution_service import (
WorkflowExecutionService,
)
@ -152,6 +155,10 @@ class SpiffStepDetailIsMissingError(Exception):
pass
class TaskNotFoundError(Exception):
pass
class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # type: ignore
def __init__(self, environment_globals: Dict[str, Any]):
"""BoxedTaskDataBasedScriptEngineEnvironment."""
@ -802,7 +809,7 @@ class ProcessInstanceProcessor:
if start_in_seconds is None:
start_in_seconds = time.time()
task_json = self.get_task_json_from_spiff_task(spiff_task)
task_json = self.get_task_dict_from_spiff_task(spiff_task)
return {
"process_instance_id": self.process_instance_model.id,
@ -1034,91 +1041,12 @@ class ProcessInstanceProcessor:
bpmn_process_definition_parent
)
def _add_bpmn_process(
self,
bpmn_process_dict: dict,
bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None,
) -> BpmnProcessModel:
tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data = bpmn_process_dict.pop("data")
bpmn_process = None
if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
).first()
elif self.process_instance_model.bpmn_process_id is not None:
bpmn_process = self.process_instance_model.bpmn_process
if bpmn_process is None:
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode(
"utf8"
)
bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=bpmn_process_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(
hash=bpmn_process_data_hash, data=bpmn_process_data
)
db.session.add(json_data)
bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None:
self.process_instance_model.bpmn_process = bpmn_process
elif bpmn_process.parent_process_id is None:
bpmn_process.parent_process_id = bpmn_process_parent.id
db.session.add(bpmn_process)
for task_id, task_properties in tasks.items():
task_data_dict = task_properties.pop("data")
state_int = task_properties["state"]
task = TaskModel.query.filter_by(guid=task_id).first()
if task is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
task.state = TaskStateNames[state_int]
task.properties_json = task_properties
task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
task_data_hash = sha256(task_data_json).hexdigest()
if task.json_data_hash != task_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=task_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
db.session.add(json_data)
task.json_data_hash = task_data_hash
db.session.add(task)
return bpmn_process
def _add_bpmn_json_records(self) -> None:
"""Adds serialized_bpmn_definition and process_instance_data records to the db session.
Expects the save method to commit it.
"""
bpmn_dict = json.loads(self.serialize())
# with open('tmp2.json', 'w') as f: f.write(json.dumps(bpmn_dict))
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
process_instance_data_dict = {}
bpmn_spec_dict = {}
@ -1132,14 +1060,14 @@ class ProcessInstanceProcessor:
# if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict)
# FIXME: Update tasks in did_complete_task instead to set the final info.
# We would need to cache all tasks up front, though, before each task runs.
# Maybe always do this for the first run - we just need to know it's the first run.
subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict)
bpmn_process_parent = TaskService.add_bpmn_process(
process_instance_data_dict, self.process_instance_model
)
for subprocess_task_id, subprocess_properties in subprocesses.items():
self._add_bpmn_process(
TaskService.add_bpmn_process(
subprocess_properties,
self.process_instance_model,
bpmn_process_parent,
bpmn_process_guid=subprocess_task_id,
)
@ -1691,8 +1619,13 @@ class ProcessInstanceProcessor:
step_delegate = StepDetailLoggingDelegate(
self.increment_spiff_step, spiff_step_details_mapping_builder
)
task_model_delegate = TaskModelSavingDelegate(
secondary_engine_step_delegate=step_delegate,
serializer=self._serializer,
process_instance=self.process_instance_model,
)
execution_strategy = execution_strategy_named(
execution_strategy_name, step_delegate
execution_strategy_name, task_model_delegate
)
execution_service = WorkflowExecutionService(
self.bpmn_process_instance,
@ -1871,7 +1804,7 @@ class ProcessInstanceProcessor:
)
return user_tasks # type: ignore
def get_task_json_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]:
def get_task_dict_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]:
default_registry = DefaultRegistry()
task_data = default_registry.convert(spiff_task.data)
python_env = default_registry.convert(
@ -1884,17 +1817,29 @@ class ProcessInstanceProcessor:
return task_json
def complete_task(
self, task: SpiffTask, human_task: HumanTaskModel, user: UserModel
self, spiff_task: SpiffTask, human_task: HumanTaskModel, user: UserModel
) -> None:
"""Complete_task."""
self.bpmn_process_instance.complete_task_from_id(task.id)
task_model = TaskModel.query.filter_by(guid=human_task.task_id).first()
if task_model is None:
raise TaskNotFoundError(
"Cannot find a task with guid"
f" {self.process_instance_model.id} and task_id is {human_task.task_id}"
)
task_model.start_in_seconds = time.time()
self.bpmn_process_instance.complete_task_from_id(spiff_task.id)
task_model.end_in_seconds = time.time()
human_task.completed_by_user_id = user.id
human_task.completed = True
db.session.add(human_task)
# FIXME: remove when we switch over to using tasks only
details_model = (
SpiffStepDetailsModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
task_id=str(task.id),
task_id=str(spiff_task.id),
task_state="READY",
)
.order_by(SpiffStepDetailsModel.id.desc()) # type: ignore
@ -1903,13 +1848,19 @@ class ProcessInstanceProcessor:
if details_model is None:
raise SpiffStepDetailIsMissingError(
"Cannot find a ready spiff_step_detail entry for process instance"
f" {self.process_instance_model.id} and task_id is {task.id}"
f" {self.process_instance_model.id} and task_id is {spiff_task.id}"
)
details_model.task_state = task.get_state_name()
details_model.task_state = spiff_task.get_state_name()
details_model.end_in_seconds = time.time()
details_model.task_json = self.get_task_json_from_spiff_task(task)
details_model.task_json = self.get_task_dict_from_spiff_task(spiff_task)
db.session.add(details_model)
# #######
TaskService.update_task_model_and_add_to_db_session(
task_model, spiff_task, self._serializer
)
# this is what actually commits the db transaction (covering the other session adds above as well)
self.save()
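
Note: both complete_task (via TaskService) and the new task_service module persist task state by name rather than by SpiffWorkflow's integer state value. A minimal illustration of that mapping, assuming SpiffWorkflow's TaskState and TaskStateNames as imported above (illustrative only, not part of this commit):

from SpiffWorkflow.task import TaskState  # type: ignore
from SpiffWorkflow.task import TaskStateNames

# TaskModel.state stores the readable name ("COMPLETED", "READY", ...) so rows
# can be filtered and logged without knowing SpiffWorkflow's bitmask values.
state_int = TaskState.COMPLETED
assert TaskStateNames[state_int] == "COMPLETED"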

View File

@ -0,0 +1,200 @@
import json
from hashlib import sha256
from typing import Optional
from typing import Tuple
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskStateNames
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
class TaskService:
@classmethod
def update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict
) -> None:
task_data_json = json.dumps(task_data_dict, sort_keys=True)
task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest()
if task_model.json_data_hash != task_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=task_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
db.session.add(json_data)
task_model.json_data_hash = task_data_hash
@classmethod
def update_task_model_and_add_to_db_session(
cls,
task_model: TaskModel,
spiff_task: SpiffTask,
serializer: BpmnWorkflowSerializer,
) -> None:
"""Updates properties_json and data on given task_model.
This will NOT update start_in_seconds or end_in_seconds.
"""
new_properties_json = serializer.task_to_dict(spiff_task)
spiff_task_data = new_properties_json.pop("data")
task_model.properties_json = new_properties_json
task_model.state = TaskStateNames[new_properties_json["state"]]
cls.update_task_data_on_task_model(task_model, spiff_task_data)
db.session.add(task_model)
@classmethod
def find_or_create_task_model_from_spiff_task(
cls,
spiff_task: SpiffTask,
process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer,
) -> TaskModel:
spiff_task_guid = str(spiff_task.id)
task_model: Optional[TaskModel] = TaskModel.query.filter_by(
guid=spiff_task_guid
).first()
if task_model is None:
bpmn_process = cls.task_bpmn_process(
spiff_task, process_instance, serializer
)
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
if task_model is None:
task_model = TaskModel(
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id
)
return task_model
@classmethod
def task_subprocess(
cls, spiff_task: SpiffTask
) -> Tuple[Optional[str], Optional[BpmnWorkflow]]:
top_level_workflow = spiff_task.workflow._get_outermost_workflow()
my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of
my_sp = None
my_sp_id = None
if my_wf != top_level_workflow:
# All the subprocesses are at the top level, so you can just compare them
for sp_id, sp in top_level_workflow.subprocesses.items():
if sp == my_wf:
my_sp = sp
my_sp_id = sp_id
break
return (str(my_sp_id), my_sp)
@classmethod
def task_bpmn_process(
cls,
spiff_task: SpiffTask,
process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer,
) -> BpmnProcessModel:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
bpmn_process: Optional[BpmnProcessModel] = None
if subprocess is None:
bpmn_process = process_instance.bpmn_process
# This is the top level workflow, which has no guid
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
if process_instance.bpmn_process_id is None:
bpmn_process = cls.add_bpmn_process(
serializer.workflow_to_dict(
spiff_task.workflow._get_outermost_workflow()
),
process_instance,
)
db.session.commit()
else:
bpmn_process = BpmnProcessModel.query.filter_by(
guid=subprocess_guid
).first()
if bpmn_process is None:
bpmn_process = cls.add_bpmn_process(
serializer.workflow_to_dict(subprocess),
process_instance,
process_instance.bpmn_process,
subprocess_guid,
)
db.session.commit()
return bpmn_process
@classmethod
def add_bpmn_process(
cls,
bpmn_process_dict: dict,
process_instance: ProcessInstanceModel,
bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None,
) -> BpmnProcessModel:
tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data_dict = bpmn_process_dict.pop("data")
bpmn_process = None
if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
).first()
elif process_instance.bpmn_process_id is not None:
bpmn_process = process_instance.bpmn_process
bpmn_process_is_new = False
if bpmn_process is None:
bpmn_process_is_new = True
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True)
bpmn_process_data_hash = sha256(
bpmn_process_data_json.encode("utf8")
).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=bpmn_process_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(
hash=bpmn_process_data_hash, data=bpmn_process_data_dict
)
db.session.add(json_data)
bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None:
process_instance.bpmn_process = bpmn_process
elif bpmn_process.parent_process_id is None:
bpmn_process.parent_process_id = bpmn_process_parent.id
db.session.add(bpmn_process)
if bpmn_process_is_new:
for task_id, task_properties in tasks.items():
task_data_dict = task_properties.pop("data")
state_int = task_properties["state"]
task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task_model = TaskModel(
guid=task_id, bpmn_process_id=bpmn_process.id
)
task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties
TaskService.update_task_data_on_task_model(task_model, task_data_dict)
db.session.add(task_model)
return bpmn_process
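
The json_data handling above (in update_task_data_on_task_model and add_bpmn_process) is content-addressed storage: serialize with sorted keys, hash with sha256, and insert a JsonDataModel row only when that hash has not been seen before. A minimal, database-free sketch of the same idea, using an in-memory dict in place of the JsonDataModel table (the helper name and store are illustrative, not part of the codebase):

import json
from hashlib import sha256

json_data_store: dict = {}  # stand-in for the JsonDataModel table, keyed by content hash

def upsert_json_data(data: dict) -> str:
    # sort_keys makes logically-equal dicts serialize to identical bytes,
    # so equal payloads always produce the same hash
    data_json = json.dumps(data, sort_keys=True).encode("utf8")
    data_hash = sha256(data_json).hexdigest()
    if data_hash not in json_data_store:
        json_data_store[data_hash] = data
    return data_hash

# two tasks carrying identical data share a single stored payload;
# each task model only records the hash
hash_one = upsert_json_data({"set_in_top_level_script": 1})
hash_two = upsert_json_data({"set_in_top_level_script": 1})
assert hash_one == hash_two
assert len(json_data_store) == 1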

View File

@ -2,7 +2,9 @@ import logging
import time
from typing import Callable
from typing import List
from typing import Optional
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
@ -16,18 +18,23 @@ from spiffworkflow_backend.models.message_instance_correlation import (
)
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.services.task_service import TaskService
class EngineStepDelegate:
"""Interface of sorts for a concrete engine step delegate."""
def will_complete_task(self, task: SpiffTask) -> None:
def will_complete_task(self, spiff_task: SpiffTask) -> None:
pass
def did_complete_task(self, task: SpiffTask) -> None:
def did_complete_task(self, spiff_task: SpiffTask) -> None:
pass
def save(self) -> None:
def save(self, commit: bool = False) -> None:
pass
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
@ -35,6 +42,75 @@ SpiffStepIncrementer = Callable[[], None]
SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict]
class TaskModelSavingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of saving a task model to the database.
It can also be given another EngineStepDelegate.
"""
def __init__(
self,
serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance
self.current_task_model: Optional[TaskModel] = None
self.serializer = serializer
def should_update_task_model(self) -> bool:
"""We need to figure out if we have previously save task info on this process intance.
Use the bpmn_process_id to do this.
"""
return self.process_instance.bpmn_process_id is not None
# return False
def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_update_task_model():
self.current_task_model = (
TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance, self.serializer
)
)
self.current_task_model.start_in_seconds = time.time()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task)
def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time()
TaskService.update_task_model_and_add_to_db_session(
self.current_task_model, spiff_task, self.serializer
)
db.session.add(self.current_task_model)
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, _commit: bool = True) -> None:
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(commit=False)
db.session.commit()
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED
):
task_model = TaskModel.query.filter_by(
guid=str(waiting_spiff_task.id)
).first()
if task_model is None:
task_model = TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task, self.process_instance, self.serializer
)
TaskService.update_task_model_and_add_to_db_session(
task_model, waiting_spiff_task, self.serializer
)
db.session.commit()
class StepDetailLoggingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of logging spiff step details.
@ -65,28 +141,29 @@ class StepDetailLoggingDelegate(EngineStepDelegate):
"Transactional Subprocess",
}
def should_log(self, task: SpiffTask) -> bool:
def should_log(self, spiff_task: SpiffTask) -> bool:
return (
task.task_spec.spec_type in self.tasks_to_log
and not task.task_spec.name.endswith(".EndJoin")
spiff_task.task_spec.spec_type in self.tasks_to_log
and not spiff_task.task_spec.name.endswith(".EndJoin")
)
def will_complete_task(self, task: SpiffTask) -> None:
if self.should_log(task):
def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_log(spiff_task):
self.current_task_start_in_seconds = time.time()
self.increment_spiff_step()
def did_complete_task(self, task: SpiffTask) -> None:
if self.should_log(task):
def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_log(spiff_task):
self.step_details.append(
self.spiff_step_details_mapping(
task, self.current_task_start_in_seconds, time.time()
spiff_task, self.current_task_start_in_seconds, time.time()
)
)
def save(self) -> None:
def save(self, commit: bool = True) -> None:
db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details)
db.session.commit()
if commit:
db.session.commit()
class ExecutionStrategy:
@ -116,6 +193,7 @@ class GreedyExecutionStrategy(ExecutionStrategy):
will_complete_task=self.delegate.will_complete_task,
did_complete_task=self.delegate.did_complete_task,
)
self.delegate.after_engine_steps(bpmn_process_instance)
class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
@ -136,12 +214,12 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
]
)
while engine_steps:
for task in engine_steps:
if task.task_spec.spec_type == "Service Task":
for spiff_task in engine_steps:
if spiff_task.task_spec.spec_type == "Service Task":
return
self.delegate.will_complete_task(task)
task.complete()
self.delegate.did_complete_task(task)
self.delegate.will_complete_task(spiff_task)
spiff_task.complete()
self.delegate.did_complete_task(spiff_task)
engine_steps = list(
[
@ -151,6 +229,8 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
]
)
self.delegate.after_engine_steps(bpmn_process_instance)
def execution_strategy_named(
name: str, delegate: EngineStepDelegate
@ -224,6 +304,13 @@ class WorkflowExecutionService:
correlation_keys=self.bpmn_process_instance.correlations,
)
db.session.add(message_instance)
bpmn_process = self.process_instance_model.bpmn_process
if bpmn_process is not None:
bpmn_process_correlations = self.bpmn_process_instance.correlations
bpmn_process.properties_json["correlations"] = bpmn_process_correlations
db.session.add(bpmn_process)
db.session.commit()
def queue_waiting_receive_messages(self) -> None:
@ -261,6 +348,14 @@ class WorkflowExecutionService:
)
message_instance.correlation_rules.append(message_correlation)
db.session.add(message_instance)
bpmn_process = self.process_instance_model.bpmn_process
if bpmn_process is not None:
bpmn_process_correlations = self.bpmn_process_instance.correlations
bpmn_process.properties_json["correlations"] = bpmn_process_correlations
db.session.add(bpmn_process)
db.session.commit()
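
The TaskModelSavingDelegate above is a wrapper: it does its own task-model bookkeeping and forwards every hook to an optional secondary delegate, so step-detail logging and task persistence happen in one engine pass with a single commit. A hedged sketch of that wrapping pattern, using a hypothetical TimingDelegate that is not part of this codebase (hook names mirror the EngineStepDelegate interface above):

import time
from typing import Optional

class EngineStepDelegate:
    # same shape as the interface above: every hook defaults to a no-op
    def will_complete_task(self, spiff_task) -> None:
        pass

    def did_complete_task(self, spiff_task) -> None:
        pass

    def save(self, commit: bool = False) -> None:
        pass

class TimingDelegate(EngineStepDelegate):
    # records per-task durations and forwards each hook to a wrapped delegate
    def __init__(self, secondary: Optional[EngineStepDelegate] = None) -> None:
        self.secondary = secondary
        self.durations: dict = {}
        self._started_at = 0.0

    def will_complete_task(self, spiff_task) -> None:
        self._started_at = time.time()
        if self.secondary:
            self.secondary.will_complete_task(spiff_task)

    def did_complete_task(self, spiff_task) -> None:
        self.durations[str(spiff_task)] = time.time() - self._started_at
        if self.secondary:
            self.secondary.did_complete_task(spiff_task)

    def save(self, commit: bool = False) -> None:
        # mirror TaskModelSavingDelegate.save: let the wrapped delegate queue its
        # work without committing, then perform the single commit in one place
        if self.secondary:
            self.secondary.save(commit=False)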

View File

@ -2,39 +2,52 @@
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_ManualTask" name="Manual Task" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_1xlck7g</bpmn:outgoing>
<bpmn:outgoing>Flow_0stlaxe</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1xlck7g" sourceRef="StartEvent_1" targetRef="Activity_Hello" />
<bpmn:endEvent id="Event_0ia26nb">
<bpmn:endEvent id="end_event_of_manual_task_model">
<bpmn:incoming>Flow_0nnh2x9</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0nnh2x9" sourceRef="Activity_Hello" targetRef="Event_0ia26nb" />
<bpmn:sequenceFlow id="Flow_0nnh2x9" sourceRef="Activity_Hello" targetRef="end_event_of_manual_task_model" />
<bpmn:manualTask id="Activity_Hello" name="Hello">
<bpmn:extensionElements>
<spiffworkflow:instructionsForEndUser>## Hello</spiffworkflow:instructionsForEndUser>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1xlck7g</bpmn:incoming>
<bpmn:incoming>Flow_1pmem7s</bpmn:incoming>
<bpmn:outgoing>Flow_0nnh2x9</bpmn:outgoing>
</bpmn:manualTask>
<bpmn:sequenceFlow id="Flow_0stlaxe" sourceRef="StartEvent_1" targetRef="the_script" />
<bpmn:sequenceFlow id="Flow_1pmem7s" sourceRef="the_script" targetRef="Activity_Hello" />
<bpmn:scriptTask id="the_script">
<bpmn:incoming>Flow_0stlaxe</bpmn:incoming>
<bpmn:outgoing>Flow_1pmem7s</bpmn:outgoing>
<bpmn:script>the_new_var = "HEY"</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_ManualTask">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0ia26nb_di" bpmnElement="Event_0ia26nb">
<dc:Bounds x="432" y="159" width="36" height="36" />
<bpmndi:BPMNShape id="Event_0ia26nb_di" bpmnElement="end_event_of_manual_task_model">
<dc:Bounds x="592" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1rcj16n_di" bpmnElement="Activity_Hello">
<dc:Bounds x="420" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1vokg57_di" bpmnElement="the_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1xlck7g_di" bpmnElement="Flow_1xlck7g">
<bpmndi:BPMNEdge id="Flow_0nnh2x9_di" bpmnElement="Flow_0nnh2x9">
<di:waypoint x="520" y="177" />
<di:waypoint x="592" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0stlaxe_di" bpmnElement="Flow_0stlaxe">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0nnh2x9_di" bpmnElement="Flow_0nnh2x9">
<bpmndi:BPMNEdge id="Flow_1pmem7s_di" bpmnElement="Flow_1pmem7s">
<di:waypoint x="370" y="177" />
<di:waypoint x="432" y="177" />
<di:waypoint x="420" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>

View File

@ -0,0 +1,112 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:spiffworkflow="http://spiffworkflow.org/bpmn/schema/1.0/core" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="top_level_process" name="Manual Task" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0stlaxe</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:endEvent id="end_event_of_manual_task_model">
<bpmn:incoming>Flow_1and8ze</bpmn:incoming>
</bpmn:endEvent>
<bpmn:manualTask id="manual_task" name="Hello">
<bpmn:extensionElements>
<spiffworkflow:instructionsForEndUser>## Hello</spiffworkflow:instructionsForEndUser>
</bpmn:extensionElements>
<bpmn:incoming>Flow_1fktmf7</bpmn:incoming>
<bpmn:outgoing>Flow_09gjylo</bpmn:outgoing>
</bpmn:manualTask>
<bpmn:sequenceFlow id="Flow_0stlaxe" sourceRef="StartEvent_1" targetRef="top_level_script" />
<bpmn:scriptTask id="top_level_script">
<bpmn:incoming>Flow_0stlaxe</bpmn:incoming>
<bpmn:outgoing>Flow_1fktmf7</bpmn:outgoing>
<bpmn:script>set_in_top_level_script = 1</bpmn:script>
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_1fktmf7" sourceRef="top_level_script" targetRef="manual_task" />
<bpmn:sequenceFlow id="Flow_1i7syph" sourceRef="top_level_subprocess" targetRef="top_level_call_activity" />
<bpmn:sequenceFlow id="Flow_1and8ze" sourceRef="top_level_call_activity" targetRef="end_event_of_manual_task_model" />
<bpmn:sequenceFlow id="Flow_09gjylo" sourceRef="manual_task" targetRef="top_level_subprocess" />
<bpmn:subProcess id="top_level_subprocess">
<bpmn:incoming>Flow_09gjylo</bpmn:incoming>
<bpmn:outgoing>Flow_1i7syph</bpmn:outgoing>
<bpmn:startEvent id="Event_0g7txdo">
<bpmn:outgoing>Flow_00k1tii</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_00k1tii" sourceRef="Event_0g7txdo" targetRef="top_level_subprocess_script" />
<bpmn:endEvent id="Event_0zi0szr">
<bpmn:incoming>Flow_1b4o55k</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_1b4o55k" sourceRef="top_level_subprocess_script" targetRef="Event_0zi0szr" />
<bpmn:scriptTask id="top_level_subprocess_script">
<bpmn:incoming>Flow_00k1tii</bpmn:incoming>
<bpmn:outgoing>Flow_1b4o55k</bpmn:outgoing>
<bpmn:script>set_in_top_level_subprocess = 1</bpmn:script>
</bpmn:scriptTask>
</bpmn:subProcess>
<bpmn:callActivity id="top_level_call_activity" calledElement="test_process_to_call">
<bpmn:incoming>Flow_1i7syph</bpmn:incoming>
<bpmn:outgoing>Flow_1and8ze</bpmn:outgoing>
</bpmn:callActivity>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="top_level_process">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1vokg57_di" bpmnElement="top_level_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1rcj16n_di" bpmnElement="manual_task">
<dc:Bounds x="400" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0ia26nb_di" bpmnElement="end_event_of_manual_task_model">
<dc:Bounds x="812" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_04hrmow_di" bpmnElement="top_level_call_activity">
<dc:Bounds x="680" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_19a46sv_di" bpmnElement="top_level_subprocess">
<dc:Bounds x="530" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0stlaxe_di" bpmnElement="Flow_0stlaxe">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1fktmf7_di" bpmnElement="Flow_1fktmf7">
<di:waypoint x="370" y="177" />
<di:waypoint x="400" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1i7syph_di" bpmnElement="Flow_1i7syph">
<di:waypoint x="630" y="177" />
<di:waypoint x="680" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1and8ze_di" bpmnElement="Flow_1and8ze">
<di:waypoint x="780" y="177" />
<di:waypoint x="812" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_09gjylo_di" bpmnElement="Flow_09gjylo">
<di:waypoint x="500" y="177" />
<di:waypoint x="530" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
<bpmndi:BPMNDiagram id="BPMNDiagram_01cbxj3">
<bpmndi:BPMNPlane id="BPMNPlane_07qyo6y" bpmnElement="top_level_subprocess">
<bpmndi:BPMNShape id="Event_0g7txdo_di" bpmnElement="Event_0g7txdo">
<dc:Bounds x="362" y="132" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0zi0szr_di" bpmnElement="Event_0zi0szr">
<dc:Bounds x="562" y="132" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0g000aa_di" bpmnElement="top_level_subprocess_script">
<dc:Bounds x="430" y="110" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_00k1tii_di" bpmnElement="Flow_00k1tii">
<di:waypoint x="398" y="150" />
<di:waypoint x="430" y="150" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1b4o55k_di" bpmnElement="Flow_1b4o55k">
<di:waypoint x="530" y="150" />
<di:waypoint x="562" y="150" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="test_process_to_call" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_06g687y</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_06g687y" sourceRef="StartEvent_1" targetRef="test_process_to_call_script" />
<bpmn:endEvent id="Event_1nn875f">
<bpmn:incoming>Flow_01e21r0</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_01e21r0" sourceRef="test_process_to_call_script" targetRef="Event_1nn875f" />
<bpmn:scriptTask id="test_process_to_call_script">
<bpmn:incoming>Flow_06g687y</bpmn:incoming>
<bpmn:outgoing>Flow_01e21r0</bpmn:outgoing>
<bpmn:script>set_in_test_process_to_call_script = 1</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="test_process_to_call">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1nn875f_di" bpmnElement="Event_1nn875f">
<dc:Bounds x="432" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_059upl6_di" bpmnElement="test_process_to_call_script">
<dc:Bounds x="270" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_06g687y_di" bpmnElement="Flow_06g687y">
<di:waypoint x="215" y="177" />
<di:waypoint x="270" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_01e21r0_di" bpmnElement="Flow_01e21r0">
<di:waypoint x="370" y="177" />
<di:waypoint x="432" y="177" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -10,6 +10,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.authorization_service import (
@ -292,6 +293,89 @@ class TestProcessInstanceProcessor(BaseTest):
assert spiff_task is not None
assert spiff_task.state == TaskState.COMPLETED
def test_properly_saves_tasks_when_running(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_does_not_recreate_human_tasks_on_multiple_saves."""
self.create_process_group(
client, with_super_admin_user, "test_group", "test_group"
)
initiator_user = self.find_or_create_user("initiator_user")
finance_user_three = self.find_or_create_user("testuser3")
assert initiator_user.principal is not None
assert finance_user_three.principal is not None
AuthorizationService.import_permissions_from_yaml_file()
finance_group = GroupModel.query.filter_by(identifier="Finance Team").first()
assert finance_group is not None
process_model = load_test_spec(
process_model_id="test_group/manual_task_with_subprocesses",
process_model_source_directory="manual_task_with_subprocesses",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
assert len(process_instance.active_human_tasks) == 1
initial_human_task_id = process_instance.active_human_tasks[0].id
# save again to ensure we attempt to process the human tasks again
processor.save()
assert len(process_instance.active_human_tasks) == 1
assert initial_human_task_id == process_instance.active_human_tasks[0].id
processor = ProcessInstanceProcessor(process_instance)
human_task_one = process_instance.active_human_tasks[0]
spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier(
human_task_one.task_name, processor.bpmn_process_instance
)
ProcessInstanceService.complete_form_task(
processor, spiff_manual_task, {}, initiator_user, human_task_one
)
# look the process instance up again to ensure all bpmn json is rebuilt from scratch from the db
process_instance_relookup = ProcessInstanceModel.query.filter_by(
id=process_instance.id
).first()
processor_final = ProcessInstanceProcessor(process_instance_relookup)
assert process_instance_relookup.status == "complete"
first_data_set = {"set_in_top_level_script": 1}
second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1}}
third_data_set = {
**second_data_set,
**{"set_in_test_process_to_call_script": 1},
}
expected_task_data = {
"top_level_script": first_data_set,
"manual_task": first_data_set,
"top_level_subprocess_script": second_data_set,
"top_level_subprocess": second_data_set,
"test_process_to_call_script": third_data_set,
"top_level_call_activity": third_data_set,
"end_event_of_manual_task_model": third_data_set,
}
all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks()
assert len(all_spiff_tasks) > 1
for spiff_task in all_spiff_tasks:
assert spiff_task.state == TaskState.COMPLETED
spiff_task_name = spiff_task.task_spec.name
if spiff_task_name in expected_task_data:
spiff_task_data = expected_task_data[spiff_task_name]
failure_message = (
f"Found unexpected task data on {spiff_task_name}. "
f"Expected: {spiff_task_data}, Found: {spiff_task.data}"
)
assert spiff_task.data == spiff_task_data, failure_message
def test_does_not_recreate_human_tasks_on_multiple_saves(
self,
app: Flask,