tests are now passing w/ burnettk

jasquat 2023-03-10 16:52:57 -05:00
parent e13e703825
commit 7806e6c460
4 changed files with 97 additions and 39 deletions

View File

@@ -50,7 +50,6 @@ from SpiffWorkflow.serializer.exceptions import MissingSpecError  # type: ignore
 from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
 from SpiffWorkflow.task import TaskState
-from SpiffWorkflow.task import TaskStateNames
 from SpiffWorkflow.util.deep_merge import DeepMerge  # type: ignore
 from sqlalchemy import text
@@ -612,6 +611,7 @@ class ProcessInstanceProcessor:
         ).first()
         bpmn_process_dict = {"data": json_data.data, "tasks": {}}
         bpmn_process_dict.update(bpmn_process.properties_json)
+        print(f"bpmn_process_dict: {bpmn_process_dict}")
         tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all()
         for task in tasks:
             json_data = JsonDataModel.query.filter_by(hash=task.json_data_hash).first()
@@ -1062,7 +1062,9 @@ class ProcessInstanceProcessor:
         self._add_bpmn_process_definitions(bpmn_spec_dict)
         subprocesses = process_instance_data_dict.pop("subprocesses")
-        bpmn_process_parent = TaskService.add_bpmn_process(process_instance_data_dict, self.process_instance_model)
+        bpmn_process_parent = TaskService.add_bpmn_process(
+            process_instance_data_dict, self.process_instance_model
+        )
         for subprocess_task_id, subprocess_properties in subprocesses.items():
             TaskService.add_bpmn_process(
                 subprocess_properties,
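
Note on the save-path shape this last hunk reformats: the serialized workflow dict carries a "subprocesses" mapping from subprocess task guid to that subprocess's own serialized dict, and each entry is persisted as its own bpmn_process row under the top-level parent. A minimal sketch with illustrative variable names (the argument shape for subprocess rows is taken from the task_service.py hunks below):

workflow_dict = serializer.workflow_to_dict(bpmn_process_instance)
subprocesses = workflow_dict.pop("subprocesses")
bpmn_process_parent = TaskService.add_bpmn_process(workflow_dict, process_instance_model)
for subprocess_task_id, subprocess_properties in subprocesses.items():
    TaskService.add_bpmn_process(
        subprocess_properties,      # the subprocess's serialized workflow dict
        process_instance_model,
        bpmn_process_parent,        # parent bpmn_process row
        str(subprocess_task_id),    # bpmn_process_guid
    )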

View File

@@ -1,10 +1,10 @@
 import json
 from hashlib import sha256
-from typing import Tuple
-from typing import Any
 from typing import Optional
+from typing import Tuple

-from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer, BpmnWorkflow  # type: ignore
+from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow  # type: ignore
+from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
 from SpiffWorkflow.task import TaskStateNames
@@ -20,8 +20,8 @@ class TaskService:
     def update_task_data_on_task_model(
         cls, task_model: TaskModel, task_data_dict: dict
     ) -> None:
-        task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
-        task_data_hash = sha256(task_data_json).hexdigest()
+        task_data_json = json.dumps(task_data_dict, sort_keys=True)
+        task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest()
         if task_model.json_data_hash != task_data_hash:
             json_data = (
                 db.session.query(JsonDataModel.id)
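
The reorder above keeps the canonical JSON string in its own variable before hashing; the underlying pattern is content-addressed storage, where identical task data always maps to the same JsonDataModel row. A standalone sketch:

import json
from hashlib import sha256

# sort_keys canonicalizes the dump, so equal dicts hash identically
# regardless of key insertion order
task_data_dict = {"b": 2, "a": 1}
task_data_json = json.dumps(task_data_dict, sort_keys=True)  # '{"a": 1, "b": 2}'
task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest()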
@@ -45,34 +45,38 @@ class TaskService:
         This will NOT update start_in_seconds or end_in_seconds.
         """
         new_properties_json = serializer.task_to_dict(spiff_task)
+        spiff_task_data = new_properties_json.pop("data")
         task_model.properties_json = new_properties_json
         task_model.state = TaskStateNames[new_properties_json["state"]]
-        cls.update_task_data_on_task_model(task_model, spiff_task.data)
+        cls.update_task_data_on_task_model(task_model, spiff_task_data)
         db.session.add(task_model)

     @classmethod
     def find_or_create_task_model_from_spiff_task(
-        cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel,
-        serializer: BpmnWorkflowSerializer
+        cls,
+        spiff_task: SpiffTask,
+        process_instance: ProcessInstanceModel,
+        serializer: BpmnWorkflowSerializer,
     ) -> TaskModel:
         spiff_task_guid = str(spiff_task.id)
         task_model: Optional[TaskModel] = TaskModel.query.filter_by(
             guid=spiff_task_guid
         ).first()
         if task_model is None:
-            bpmn_process = cls.task_bpmn_process(spiff_task, process_instance, serializer)
-            task_model = TaskModel.query.filter_by(
-                guid=spiff_task_guid
-            ).first()
+            bpmn_process = cls.task_bpmn_process(
+                spiff_task, process_instance, serializer
+            )
+            task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
             if task_model is None:
                 task_model = TaskModel(
                     guid=spiff_task_guid, bpmn_process_id=bpmn_process.id
                 )
+                db.session.commit()
         return task_model

     @classmethod
-    def task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]:
+    def task_subprocess(
+        cls, spiff_task: SpiffTask
+    ) -> Tuple[Optional[str], Optional[BpmnWorkflow]]:
         top_level_workflow = spiff_task.workflow._get_outermost_workflow()
         my_wf = spiff_task.workflow  # This is the workflow the spiff_task is part of
         my_sp = None
@@ -88,26 +92,37 @@ class TaskService:
     @classmethod
     def task_bpmn_process(
-        cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel,
-        serializer: BpmnWorkflowSerializer
+        cls,
+        spiff_task: SpiffTask,
+        process_instance: ProcessInstanceModel,
+        serializer: BpmnWorkflowSerializer,
     ) -> BpmnProcessModel:
         subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
+        bpmn_process: Optional[BpmnProcessModel] = None
         if subprocess is None:
+            bpmn_process = process_instance.bpmn_process
             # This is the top level workflow, which has no guid
-            return process_instance.bpmn_process
+            # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
+            if process_instance.bpmn_process_id is None:
+                bpmn_process = cls.add_bpmn_process(
+                    serializer.workflow_to_dict(
+                        spiff_task.workflow._get_outermost_workflow()
+                    ),
+                    process_instance,
+                )
+                db.session.commit()
         else:
-            # import pdb; pdb.set_trace()
-            bpmn_process: Optional[BpmnProcessModel] = BpmnProcessModel.query.filter_by(
+            bpmn_process = BpmnProcessModel.query.filter_by(
                 guid=subprocess_guid
             ).first()
-            # import pdb; pdb.set_trace()
             if bpmn_process is None:
-                bpmn_process = cls.add_bpmn_process(serializer.workflow_to_dict(subprocess), process_instance, process_instance.bpmn_process, subprocess_guid)
+                bpmn_process = cls.add_bpmn_process(
+                    serializer.workflow_to_dict(subprocess),
+                    process_instance,
+                    process_instance.bpmn_process,
+                    subprocess_guid,
+                )
                 db.session.commit()
-                # spiff_task_guid = str(spiff_task.id)
-                # raise Exception(
-                #     f"Could not find bpmn_process for task {spiff_task_guid}"
-                # )
         return bpmn_process
     @classmethod
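
The branching in task_bpmn_process leans on the contract of task_subprocess: (None, None) for tasks in the top-level workflow, (guid, subprocess_workflow) for tasks inside a subprocess. A small usage sketch (the print calls are illustrative only):

subprocess_guid, subprocess = TaskService.task_subprocess(spiff_task)
if subprocess is None:
    # top-level workflow: persisted on the process instance, no guid
    print("task belongs to the top-level bpmn_process")
else:
    # subprocess: persisted as its own bpmn_process row keyed by guid
    print(f"task belongs to subprocess {subprocess_guid}")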
@@ -119,7 +134,7 @@ class TaskService:
         bpmn_process_guid: Optional[str] = None,
     ) -> BpmnProcessModel:
         tasks = bpmn_process_dict.pop("tasks")
-        bpmn_process_data = bpmn_process_dict.pop("data")
+        bpmn_process_data_dict = bpmn_process_dict.pop("data")

         bpmn_process = None
         if bpmn_process_parent is not None:
@@ -136,10 +151,10 @@ class TaskService:
         bpmn_process.properties_json = bpmn_process_dict

-        bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode(
-            "utf8"
-        )
-        bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest()
+        bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True)
+        bpmn_process_data_hash = sha256(
+            bpmn_process_data_json.encode("utf8")
+        ).hexdigest()
         if bpmn_process.json_data_hash != bpmn_process_data_hash:
             json_data = (
                 db.session.query(JsonDataModel.id)
@@ -148,7 +163,7 @@ class TaskService:
             )
             if json_data is None:
                 json_data = JsonDataModel(
-                    hash=bpmn_process_data_hash, data=bpmn_process_data
+                    hash=bpmn_process_data_hash, data=bpmn_process_data_dict
                 )
                 db.session.add(json_data)
             bpmn_process.json_data_hash = bpmn_process_data_hash
@@ -173,7 +188,9 @@ class TaskService:
             # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
             # if task_definition is None:
             #     subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
-            task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
+            task_model = TaskModel(
+                guid=task_id, bpmn_process_id=bpmn_process.id
+            )
             task_model.state = TaskStateNames[state_int]
             task_model.properties_json = task_properties
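
For reference, TaskStateNames (from SpiffWorkflow.task) maps SpiffWorkflow's integer task-state values to their string names, which is what both hunks store on task_model.state. A quick sketch, assuming the SpiffWorkflow version this commit targets:

from SpiffWorkflow.task import TaskState, TaskStateNames

state_int = TaskState.COMPLETED
print(TaskStateNames[state_int])  # "COMPLETED"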

View File

@@ -1,6 +1,5 @@
 import logging
 import time
-from typing import Any
 from typing import Callable
 from typing import List
 from typing import Optional
@@ -35,6 +34,9 @@ class EngineStepDelegate:
     def save(self, commit: bool = False) -> None:
         pass

+    def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
+        pass
+

 SpiffStepIncrementer = Callable[[], None]
 SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict]
@@ -92,6 +94,22 @@ class TaskModelSavingDelegate(EngineStepDelegate):
             self.secondary_engine_step_delegate.save(commit=False)
         db.session.commit()

+    def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
+        for waiting_spiff_task in bpmn_process_instance.get_tasks(
+            TaskState.WAITING | TaskState.CANCELLED
+        ):
+            task_model = TaskModel.query.filter_by(
+                guid=str(waiting_spiff_task.id)
+            ).first()
+            if task_model is None:
+                task_model = TaskService.find_or_create_task_model_from_spiff_task(
+                    waiting_spiff_task, self.process_instance, self.serializer
+                )
+            TaskService.update_task_model_and_add_to_db_session(
+                task_model, waiting_spiff_task, self.serializer
+            )
+        db.session.commit()
+

 class StepDetailLoggingDelegate(EngineStepDelegate):
     """Engine step delegate that takes care of logging spiff step details.
@@ -175,6 +193,7 @@ class GreedyExecutionStrategy(ExecutionStrategy):
             will_complete_task=self.delegate.will_complete_task,
             did_complete_task=self.delegate.did_complete_task,
         )
+        self.delegate.after_engine_steps(bpmn_process_instance)


 class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
@@ -210,6 +229,8 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
             ]
         )

+        self.delegate.after_engine_steps(bpmn_process_instance)
+

 def execution_strategy_named(
     name: str, delegate: EngineStepDelegate
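
after_engine_steps is a standard hook: the base EngineStepDelegate defines it as a no-op, TaskModelSavingDelegate overrides it to flush task models for WAITING/CANCELLED tasks, and both execution strategies invoke it after their run loop. A stripped-down sketch of the shape, with hypothetical names:

class Delegate:
    def after_engine_steps(self, workflow) -> None:
        pass  # default: do nothing

class SavingDelegate(Delegate):
    def after_engine_steps(self, workflow) -> None:
        print(f"persisting waiting tasks for {workflow}")

class Strategy:
    def __init__(self, delegate: Delegate):
        self.delegate = delegate

    def do_engine_steps(self, workflow) -> None:
        # ... run ready tasks ...
        self.delegate.after_engine_steps(workflow)  # hook fires once at the end

Strategy(SavingDelegate()).do_engine_steps("workflow-1")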
@@ -283,6 +304,13 @@ class WorkflowExecutionService:
                 correlation_keys=self.bpmn_process_instance.correlations,
             )
             db.session.add(message_instance)
+
+        bpmn_process = self.process_instance_model.bpmn_process
+        if bpmn_process is not None:
+            bpmn_process_correlations = self.bpmn_process_instance.correlations
+            bpmn_process.properties_json["correlations"] = bpmn_process_correlations
+            db.session.add(bpmn_process)
+
         db.session.commit()

     def queue_waiting_receive_messages(self) -> None:
@@ -320,6 +348,14 @@ class WorkflowExecutionService:
             )
             message_instance.correlation_rules.append(message_correlation)
         db.session.add(message_instance)
+
+        bpmn_process = self.process_instance_model.bpmn_process
+        if bpmn_process is not None:
+            bpmn_process_correlations = self.bpmn_process_instance.correlations
+            bpmn_process.properties_json["correlations"] = bpmn_process_correlations
+            db.session.add(bpmn_process)
+
         db.session.commit()
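
Both hunks copy the live SpiffWorkflow correlations onto the bpmn_process row's properties_json. One hedged caveat: if properties_json is a plain SQLAlchemy JSON column, an in-place key assignment is not tracked automatically; flag_modified is the usual way to mark it dirty (the commit itself relies on db.session.add alone). A sketch under that assumption:

from sqlalchemy.orm.attributes import flag_modified

bpmn_process = process_instance_model.bpmn_process
if bpmn_process is not None:
    bpmn_process.properties_json["correlations"] = bpmn_process_instance.correlations
    flag_modified(bpmn_process, "properties_json")  # mark the JSON column as changed
    db.session.add(bpmn_process)
    db.session.commit()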

View File

@@ -347,9 +347,12 @@ class TestProcessInstanceProcessor(BaseTest):
         processor_final = ProcessInstanceProcessor(process_instance_relookup)
         assert process_instance_relookup.status == "complete"

-        first_data_set = {'set_in_top_level_script': 1}
-        second_data_set = {**first_data_set, **{'set_in_top_level_subprocess': 1}}
-        third_data_set = {**second_data_set, **{'set_in_test_process_to_call_script': 1}}
+        first_data_set = {"set_in_top_level_script": 1}
+        second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1}}
+        third_data_set = {
+            **second_data_set,
+            **{"set_in_test_process_to_call_script": 1},
+        }
         expected_task_data = {
             "top_level_script": first_data_set,
             "manual_task": first_data_set,