tests are now passing w/ burnettk

This commit is contained in:
jasquat 2023-03-16 16:14:41 -04:00
parent 80d9fa8ad6
commit 5c0d72ab91
6 changed files with 126 additions and 87 deletions

View File

@ -50,6 +50,7 @@ class TaskModel(SpiffworkflowBaseDBModel):
id: int = db.Column(db.Integer, primary_key=True) id: int = db.Column(db.Integer, primary_key=True)
guid: str = db.Column(db.String(36), nullable=False, unique=True, index=True) guid: str = db.Column(db.String(36), nullable=False, unique=True, index=True)
bpmn_process_id: int = db.Column(ForeignKey(BpmnProcessModel.id), nullable=False) # type: ignore bpmn_process_id: int = db.Column(ForeignKey(BpmnProcessModel.id), nullable=False) # type: ignore
bpmn_process = relationship(BpmnProcessModel)
process_instance_id: int = db.Column(ForeignKey("process_instance.id"), nullable=False) process_instance_id: int = db.Column(ForeignKey("process_instance.id"), nullable=False)
# find this by looking up the "workflow_name" and "task_spec" from the properties_json # find this by looking up the "workflow_name" and "task_spec" from the properties_json

View File

@ -326,6 +326,10 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
environment = CustomScriptEngineEnvironment(default_globals) environment = CustomScriptEngineEnvironment(default_globals)
# right now spiff is executing script tasks on ready so doing this
# so we know when something fails and we can save it to our database.
self.failing_spiff_task: Optional[SpiffTask] = None
super().__init__(environment=environment) super().__init__(environment=environment)
def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]: def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]:
@ -385,11 +389,14 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None: def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None:
"""Execute.""" """Execute."""
try: try:
# reset failing task just in case
self.failing_spiff_task = None
methods = self.__get_augment_methods(task) methods = self.__get_augment_methods(task)
if external_methods: if external_methods:
methods.update(external_methods) methods.update(external_methods)
super().execute(task, script, methods) super().execute(task, script, methods)
except WorkflowException as e: except WorkflowException as e:
self.failing_spiff_task = task
raise e raise e
except Exception as e: except Exception as e:
raise self.create_task_exec_exception(task, script, e) from e raise self.create_task_exec_exception(task, script, e) from e
@ -1558,7 +1565,6 @@ class ProcessInstanceProcessor:
serializer=self._serializer, serializer=self._serializer,
process_instance=self.process_instance_model, process_instance=self.process_instance_model,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
# script_engine=self._script_engine,
) )
if execution_strategy_name is None: if execution_strategy_name is None:
@ -1572,7 +1578,13 @@ class ProcessInstanceProcessor:
self._script_engine.environment.finalize_result, self._script_engine.environment.finalize_result,
self.save, self.save,
) )
try:
execution_service.do_engine_steps(exit_at, save) execution_service.do_engine_steps(exit_at, save)
finally:
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
# script engine on a class variable.
if hasattr(self._script_engine, "failing_spiff_task") and self._script_engine.failing_spiff_task is not None:
self._script_engine.failing_spiff_task = None
# log the spiff step details so we know what is processing the process # log the spiff step details so we know what is processing the process
# instance when a human task has a timer event. # instance when a human task has a timer event.

View File

@ -202,14 +202,9 @@ class TaskService:
bpmn_process.properties_json = bpmn_process_dict bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True) bpmn_process_json_data = cls.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_data_dict)
bpmn_process_data_hash = sha256(bpmn_process_data_json.encode("utf8")).hexdigest() if bpmn_process_json_data is not None:
if bpmn_process.json_data_hash != bpmn_process_data_hash: new_json_data_dicts[bpmn_process_json_data['hash']] = bpmn_process_json_data
new_json_data_dicts[bpmn_process_data_hash] = {
"hash": bpmn_process_data_hash,
"data": bpmn_process_data_dict,
}
bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None: if bpmn_process_parent is None:
process_instance.bpmn_process = bpmn_process process_instance.bpmn_process = bpmn_process
@ -261,6 +256,18 @@ class TaskService:
return (bpmn_process, new_task_models, new_json_data_dicts) return (bpmn_process, new_task_models, new_json_data_dicts)
@classmethod
def update_task_data_on_bpmn_process(
cls, bpmn_process: BpmnProcessModel, bpmn_process_data_dict: dict
) -> Optional[JsonDataDict]:
bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True)
bpmn_process_data_hash: str = sha256(bpmn_process_data_json.encode("utf8")).hexdigest()
json_data_dict: Optional[JsonDataDict] = None
if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data_dict = {"hash": bpmn_process_data_hash, "data": bpmn_process_data_dict}
bpmn_process.json_data_hash = bpmn_process_data_hash
return json_data_dict
@classmethod @classmethod
def _update_task_data_on_task_model( def _update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict, task_model_data_column: str cls, task_model: TaskModel, task_data_dict: dict, task_model_data_column: str

View File

@ -36,7 +36,7 @@ class EngineStepDelegate:
def did_complete_task(self, spiff_task: SpiffTask) -> None: def did_complete_task(self, spiff_task: SpiffTask) -> None:
pass pass
def save(self, commit: bool = False) -> None: def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None:
pass pass
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
@ -58,13 +58,11 @@ class TaskModelSavingDelegate(EngineStepDelegate):
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict, bpmn_definition_to_task_definitions_mappings: dict,
# script_engine,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
) -> None: ) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance self.process_instance = process_instance
self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
# self.script_engine = script_engine
self.current_task_model: Optional[TaskModel] = None self.current_task_model: Optional[TaskModel] = None
self.current_task_start_in_seconds: Optional[float] = None self.current_task_start_in_seconds: Optional[float] = None
@ -72,7 +70,41 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.json_data_dicts: dict[str, JsonDataDict] = {} self.json_data_dicts: dict[str, JsonDataDict] = {}
self.serializer = serializer self.serializer = serializer
def should_update_task_model(self) -> bool: def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model():
self.current_task_start_in_seconds = time.time()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task)
def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self._should_update_task_model():
self._update_task_model_with_spiff_task(spiff_task)
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None:
script_engine = bpmn_process_instance.script_engine
if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None:
failing_spiff_task = script_engine.failing_spiff_task
self._update_task_model_with_spiff_task(failing_spiff_task)
db.session.bulk_save_objects(self.task_models.values())
TaskService.insert_or_update_json_data_records(self.json_data_dicts)
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False)
db.session.commit()
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
if self._should_update_task_model():
# excludes FUTURE and COMPLETED. the others were required to get PP1 to go to completion.
for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY
):
self._update_task_model_with_spiff_task(waiting_spiff_task)
def _should_update_task_model(self) -> bool:
"""We need to figure out if we have previously save task info on this process intance. """We need to figure out if we have previously save task info on this process intance.
Use the bpmn_process_id to do this. Use the bpmn_process_id to do this.
@ -85,31 +117,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if json_data_dict is not None: if json_data_dict is not None:
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
def will_complete_task(self, spiff_task: SpiffTask) -> None: def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask) -> None:
if self.should_update_task_model(): bpmn_process, task_model, new_task_models, new_json_data_dicts = (
# _bpmn_process, task_model, new_task_models, new_json_data_dicts = (
# TaskService.find_or_create_task_model_from_spiff_task(
# spiff_task,
# self.process_instance,
# self.serializer,
# bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
# )
# )
# self.current_task_model = task_model
# self.task_models.update(new_task_models)
# self.json_data_dicts.update(new_json_data_dicts)
# self.current_task_model.start_in_seconds = time.time()
self.current_task_start_in_seconds = time.time()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.will_complete_task(spiff_task)
def did_complete_task(self, spiff_task: SpiffTask) -> None:
# if self.current_task_model and self.should_update_task_model():
if self.should_update_task_model():
# if spiff_task.task_spec.name == 'top_level_script':
# import pdb; pdb.set_trace()
# spiff_task.workflow.script_engine.environment.revise_state_with_task_data(spiff_task)
_bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, spiff_task,
self.process_instance, self.process_instance,
@ -117,43 +126,13 @@ class TaskModelSavingDelegate(EngineStepDelegate):
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
) )
task_model.start_in_seconds = self.current_task_start_in_seconds or time.time() bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process(bpmn_process or task_model.bpmn_process, spiff_task.workflow.data)
task_model.end_in_seconds = time.time() self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts)
json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer) json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer)
self._update_json_data_dicts_using_list(json_data_dict_list)
self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts)
self.task_models[task_model.guid] = task_model
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, _commit: bool = True) -> None:
db.session.bulk_save_objects(self.task_models.values())
TaskService.insert_or_update_json_data_records(self.json_data_dicts)
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(commit=False)
db.session.commit()
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
if self.should_update_task_model():
# excludes FUTURE and COMPLETED. the others were required to get PP1 to go to completion.
for waiting_spiff_task in bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY
):
_bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task,
self.process_instance,
self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
)
)
self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts)
json_data_dict_list = TaskService.update_task_model(task_model, waiting_spiff_task, self.serializer)
self.task_models[task_model.guid] = task_model self.task_models[task_model.guid] = task_model
if bpmn_process_json_data is not None:
json_data_dict_list.append(bpmn_process_json_data)
self._update_json_data_dicts_using_list(json_data_dict_list) self._update_json_data_dicts_using_list(json_data_dict_list)
@ -203,7 +182,7 @@ class StepDetailLoggingDelegate(EngineStepDelegate):
self.spiff_step_details_mapping(spiff_task, self.current_task_start_in_seconds, time.time()) self.spiff_step_details_mapping(spiff_task, self.current_task_start_in_seconds, time.time())
) )
def save(self, commit: bool = True) -> None: def save(self, _bpmn_process_instance: BpmnWorkflow, commit: bool = True) -> None:
db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details) db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details)
if commit: if commit:
db.session.commit() db.session.commit()
@ -215,18 +194,20 @@ class ExecutionStrategy:
def __init__(self, delegate: EngineStepDelegate): def __init__(self, delegate: EngineStepDelegate):
"""__init__.""" """__init__."""
self.delegate = delegate self.delegate = delegate
self.bpmn_process_instance = None
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
pass pass
def save(self) -> None: def save(self) -> None:
self.delegate.save() self.delegate.save(self.bpmn_process_instance)
class GreedyExecutionStrategy(ExecutionStrategy): class GreedyExecutionStrategy(ExecutionStrategy):
"""The common execution strategy. This will greedily run all engine steps without stopping.""" """The common execution strategy. This will greedily run all engine steps without stopping."""
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance
bpmn_process_instance.do_engine_steps( bpmn_process_instance.do_engine_steps(
exit_at=exit_at, exit_at=exit_at,
will_complete_task=self.delegate.will_complete_task, will_complete_task=self.delegate.will_complete_task,
@ -243,6 +224,7 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
""" """
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance
engine_steps = list( engine_steps = list(
[ [
t t
@ -310,6 +292,7 @@ class WorkflowExecutionService:
) )
try: try:
# import pdb; pdb.set_trace()
self.bpmn_process_instance.refresh_waiting_tasks() self.bpmn_process_instance.refresh_waiting_tasks()
# TODO: implicit re-entrant locks here `with_dequeued` # TODO: implicit re-entrant locks here `with_dequeued`

View File

@ -1,5 +1,6 @@
"""Test Process Api Blueprint.""" """Test Process Api Blueprint."""
import io import io
from SpiffWorkflow.task import TaskState
import json import json
import os import os
import time import time
@ -2067,7 +2068,7 @@ class TestProcessApi(BaseTest):
# TODO: make sure the system notification process is run on exceptions # TODO: make sure the system notification process is run on exceptions
... ...
def test_task_data_is_set_even_if_process_instance_errors( def test_task_data_is_set_even_if_process_instance_errors_through_the_api(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,
@ -2093,6 +2094,7 @@ class TestProcessApi(BaseTest):
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance) spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance)
assert spiff_task is not None assert spiff_task is not None
assert spiff_task.state == TaskState.WAITING
assert spiff_task.data == {"my_var": "THE VAR"} assert spiff_task.data == {"my_var": "THE VAR"}
def test_process_model_file_create( def test_process_model_file_create(

View File

@ -5,7 +5,8 @@ import pytest
from flask import g from flask import g
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from SpiffWorkflow.task import TaskState # type: ignore from SpiffWorkflow.task import TaskState
from spiffworkflow_backend.exceptions.api_error import ApiError # type: ignore
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -318,6 +319,7 @@ class TestProcessInstanceProcessor(BaseTest):
**{"set_in_test_process_to_call_script": 1}, **{"set_in_test_process_to_call_script": 1},
} }
fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}} fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}}
fifth_data_set = {**fourth_data_set, **{'validate_only': False, 'set_top_level_process_script_after_gate': 1}}
expected_task_data = { expected_task_data = {
"top_level_script": first_data_set, "top_level_script": first_data_set,
"manual_task": first_data_set, "manual_task": first_data_set,
@ -345,6 +347,7 @@ class TestProcessInstanceProcessor(BaseTest):
assert task_definition.bpmn_identifier == spiff_task_name assert task_definition.bpmn_identifier == spiff_task_name
assert task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier assert task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier
message = f"{base_failure_message} Expected: {expected_python_env_data}. Received: {task.json_data()}" message = f"{base_failure_message} Expected: {expected_python_env_data}. Received: {task.json_data()}"
# TODO: if we split out env data again we will need to use it here instead of json_data
# assert task.python_env_data() == expected_python_env_data, message # assert task.python_env_data() == expected_python_env_data, message
assert task.json_data() == expected_python_env_data, message assert task.json_data() == expected_python_env_data, message
spiff_tasks_checked_once.append(spiff_task.task_spec.name) spiff_tasks_checked_once.append(spiff_task.task_spec.name)
@ -357,6 +360,8 @@ class TestProcessInstanceProcessor(BaseTest):
assert_spiff_task_is_in_process("top_level_subprocess_script", "top_level_subprocess") assert_spiff_task_is_in_process("top_level_subprocess_script", "top_level_subprocess")
assert_spiff_task_is_in_process("top_level_script", "top_level_process") assert_spiff_task_is_in_process("top_level_script", "top_level_process")
assert processor.get_data() == fifth_data_set
def test_does_not_recreate_human_tasks_on_multiple_saves( def test_does_not_recreate_human_tasks_on_multiple_saves(
self, self,
app: Flask, app: Flask,
@ -469,3 +474,32 @@ class TestProcessInstanceProcessor(BaseTest):
# EDIT: when using feature/remove-loop-reset branch of SpiffWorkflow, these should be different. # EDIT: when using feature/remove-loop-reset branch of SpiffWorkflow, these should be different.
assert human_task_two.task_id != human_task_one.task_id assert human_task_two.task_id != human_task_one.task_id
def test_task_data_is_set_even_if_process_instance_errors(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_task_data_is_set_even_if_process_instance_errors."""
process_model = load_test_spec(
process_model_id="group/error_with_task_data",
bpmn_file_name="script_error_with_task_data.bpmn",
process_model_source_directory="error",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=with_super_admin_user
)
processor = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError):
processor.do_engine_steps(save=True)
process_instance_final = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
processor_final = ProcessInstanceProcessor(process_instance_final)
spiff_task = processor_final.get_task_by_bpmn_identifier("script_task_two", processor_final.bpmn_process_instance)
assert spiff_task is not None
assert spiff_task.state == TaskState.WAITING
assert spiff_task.data == {"my_var": "THE VAR"}