Feature/nonetype debug stuff (#513)

* some debug items w/ burnettk

* removed some db commits from workflow execution service so the passed in save is the only thing that actually commits the transaction

* set max depth to 50000 for now w/ burnettk

* pyl w/ burnettk

* use temp maxdepth fix in SpiffWorkflow w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2023-09-27 17:04:47 -04:00 committed by GitHub
parent 08098dd54e
commit 47f94dccbb
9 changed files with 36 additions and 26 deletions

View File

@@ -2352,7 +2352,7 @@ files = [
[[package]]
name = "SpiffWorkflow"
version = "2.0.0rc0"
description = ""
description = "A workflow framework and BPMN/DMN Processor"
optional = false
python-versions = "*"
files = []
@@ -2365,8 +2365,8 @@ lxml = "*"
[package.source]
type = "git"
url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "90159bd23c3c74fcf480414473e851460a01e92c"
reference = "temp/short-term-max-depth-fix"
resolved_reference = "56f0ba545b2c0f16a4ce7afac951fd3694fb9040"
[[package]]
name = "sqlalchemy"
@@ -2795,4 +2795,4 @@ tests-strict = ["codecov (==2.0.15)", "pytest (==4.6.0)", "pytest (==4.6.0)", "p
[metadata]
lock-version = "2.0"
python-versions = ">=3.10,<3.12"
content-hash = "38add9297ce0c445da94865bbdf82bb37c95b57e056ec195b3c68704236fcec5"
content-hash = "52d201da1a56af3fad40ecd730548993d93a45023d4a94471112adf4e9bd54b5"

View File

@@ -29,7 +29,7 @@ flask-migrate = "*"
flask-restful = "*"
flask-simple-crypt = "^0.3.3"
werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "temp/short-term-max-depth-fix"}
# SpiffWorkflow = {develop = true, path = "../../spiffworkflow/" }
# SpiffWorkflow = {develop = true, path = "../../SpiffWorkflow/" }
sentry-sdk = "^1.10"

View File

@@ -155,6 +155,8 @@ config_from_env(
"SPIFFWORKFLOW_BACKEND_SYSTEM_NOTIFICATION_PROCESS_MODEL_MESSAGE_ID",
default="Message_SystemMessageNotification",
)
# check all tasks listed as child tasks are saved to the database
config_from_env("SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY", default=False)
### for documentation only
# we load the CustomBpmnScriptEngine at import time, where we do not have access to current_app,

View File

@@ -83,5 +83,4 @@ class JsonDataModel(SpiffworkflowBaseDBModel):
def create_and_insert_json_data_from_dict(cls, data: dict) -> str:
json_data_hash = sha256(json.dumps(data, sort_keys=True).encode("utf8")).hexdigest()
cls.insert_or_update_json_data_dict({"hash": json_data_hash, "data": data})
db.session.commit()
return json_data_hash

View File

@@ -246,9 +246,10 @@ def process_instance_list(
)
json_data_hash = JsonDataModel.create_and_insert_json_data_from_dict(body["report_metadata"])
response_json["report_hash"] = json_data_hash
db.session.commit()
response_json["report_hash"] = json_data_hash
return make_response(jsonify(response_json), 200)

View File

@@ -1026,7 +1026,6 @@ class ProcessInstanceProcessor:
def save(self) -> None:
"""Saves the current state of this processor to the database."""
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
self.process_instance_model.status = self.get_status().value
current_app.logger.debug(
f"the_status: {self.process_instance_model.status} for instance {self.process_instance_model.id}"
@@ -1043,6 +1042,7 @@ class ProcessInstanceProcessor:
db.session.add(self.process_instance_model)
db.session.commit()
human_tasks = HumanTaskModel.query.filter_by(
process_instance_id=self.process_instance_model.id, completed=False
).all()
@@ -1477,6 +1477,7 @@ class ProcessInstanceProcessor:
self.save,
)
execution_service.run_and_save(exit_at, save)
self.check_all_tasks()
@classmethod
def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
@@ -1791,3 +1792,17 @@ class ProcessInstanceProcessor:
self.process_instance_model, ProcessInstanceEventType.process_instance_resumed.value
)
db.session.commit()
def check_all_tasks(self) -> None:
if current_app.config["SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY"] is not True:
return
tasks = TaskModel.query.filter_by(process_instance_id=self.process_instance_model.id).all()
missing_child_guids = []
for task in tasks:
for child_task_guid in task.properties_json["children"]:
child_task = TaskModel.query.filter_by(guid=child_task_guid).first()
if child_task is None:
missing_child_guids.append(f"Missing child guid {child_task_guid} for {task.properties_json}")
if len(missing_child_guids) > 0:
raise Exception(missing_child_guids)

View File

@@ -18,6 +18,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_model import PROCESS_MODEL_SUPPORTED_KEYS_FOR_DISK_SERIALIZATION
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.file_system_service import FileSystemService

View File

@@ -463,7 +463,6 @@ class TaskService:
# otherwise sqlalchemy returns several warnings.
for task in human_tasks_to_clear + tasks_to_clear:
db.session.delete(task)
db.session.commit()
bpmn_processes_to_delete = (
BpmnProcessModel.query.filter(BpmnProcessModel.guid.in_(deleted_task_guids)) # type: ignore

View File

@@ -62,7 +62,7 @@ class EngineStepDelegate:
pass
@abstractmethod
def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None:
def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
@abstractmethod
@@ -162,8 +162,8 @@ class ExecutionStrategy:
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.on_exception(bpmn_process_instance)
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.save(bpmn_process_instance)
def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.add_object_to_db_session(bpmn_process_instance)
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
tasks = [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual]
@@ -240,7 +240,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None:
def add_object_to_db_session(self, bpmn_process_instance: BpmnWorkflow) -> None:
# NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace
# excludes COMPLETED. the others were required to get PP1 to go to completion.
# process FUTURE tasks because Boundary events are not processed otherwise.
@@ -249,6 +249,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
# but it didn't quite work in all cases, so we deleted it. you can find it in commit
# 1ead87b4b496525df8cc0e27836c3e987d593dc0 if you are curious.
for waiting_spiff_task in bpmn_process_instance.get_tasks(
max_depth=50000, # TODO: remove when SpiffWorkflow depth calculation is corrected
state=TaskState.WAITING
| TaskState.CANCELLED
| TaskState.READY
@@ -256,15 +257,14 @@ class TaskModelSavingDelegate(EngineStepDelegate):
| TaskState.LIKELY
| TaskState.FUTURE
| TaskState.STARTED
| TaskState.ERROR
| TaskState.ERROR,
):
self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)
self.task_service.save_objects_to_database()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False)
db.session.commit()
self.secondary_engine_step_delegate.add_object_to_db_session(bpmn_process_instance)
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
@@ -380,9 +380,8 @@ class WorkflowExecutionService:
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
if self.process_instance_model.persistence_level != "none":
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except WorkflowTaskException as wte:
ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance_model,
@@ -398,9 +397,7 @@ class WorkflowExecutionService:
finally:
if self.process_instance_model.persistence_level != "none":
self.execution_strategy.save(self.bpmn_process_instance)
db.session.commit()
self.execution_strategy.add_object_to_db_session(self.bpmn_process_instance)
if save:
self.process_instance_saver()
@@ -432,8 +429,6 @@ class WorkflowExecutionService:
bpmn_process.properties_json["bpmn_events"] = []
db.session.add(bpmn_process)
db.session.commit()
def queue_waiting_receive_messages(self) -> None:
waiting_events = self.bpmn_process_instance.waiting_events()
waiting_message_events = filter(lambda e: e.event_type == "MessageEventDefinition", waiting_events)
@ -474,8 +469,6 @@ class WorkflowExecutionService:
bpmn_process.properties_json["correlations"] = bpmn_process_correlations
db.session.add(bpmn_process)
db.session.commit()
class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service."""