Backend do_engine_steps performance improvements (#129)
Co-authored-by: Dan <daniel.h.funk@gmail.com>
This commit is contained in:
parent
ab9614c6b4
commit
c00338e951
|
@ -3634,4 +3634,4 @@
|
|||
"clientPolicies" : {
|
||||
"policies" : [ ]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -243,5 +243,5 @@ class DBHandler(logging.Handler):
|
|||
# so at some point we are going to insert logs.
|
||||
# we don't want to insert on every log, so we will insert every 100 logs, which is just about as fast as inserting
|
||||
# on every 1,000 logs. if we get deadlocks in the database, this can be changed to 1 in order to insert on every log.
|
||||
if len(self.logs) % 1 == 0:
|
||||
if len(self.logs) >= 100:
|
||||
self.bulk_insert_logs()
|
||||
|
|
|
@ -149,7 +149,7 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty
|
|||
self._last_result = context
|
||||
|
||||
def last_result(self) -> Dict[str, Any]:
|
||||
return self._last_result
|
||||
return {k: v for k, v in self._last_result.items()}
|
||||
|
||||
def clear_state(self) -> None:
|
||||
pass
|
||||
|
@ -226,7 +226,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
|
|||
}
|
||||
|
||||
def last_result(self) -> Dict[str, Any]:
|
||||
return self.state
|
||||
return {k: v for k, v in self.state.items()}
|
||||
|
||||
def clear_state(self) -> None:
|
||||
self.state = {}
|
||||
|
@ -254,8 +254,13 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
|
|||
}
|
||||
task.data = {k: v for k, v in task.data.items() if k in task_data_keys_to_keep}
|
||||
|
||||
if hasattr(task.task_spec, "_result_variable"):
|
||||
result_variable = task.task_spec._result_variable(task)
|
||||
if result_variable in task.data:
|
||||
self.state[result_variable] = task.data.pop(result_variable)
|
||||
|
||||
class CustomScriptEngineEnvironment(BoxedTaskDataBasedScriptEngineEnvironment):
|
||||
|
||||
class CustomScriptEngineEnvironment(NonTaskDataBasedScriptEngineEnvironment):
|
||||
pass
|
||||
|
||||
|
||||
|
@ -685,9 +690,13 @@ class ProcessInstanceProcessor:
|
|||
|
||||
def spiff_step_details_mapping(self) -> dict:
|
||||
"""SaveSpiffStepDetails."""
|
||||
bpmn_json = self.serialize()
|
||||
wf_json = json.loads(bpmn_json)
|
||||
task_json = {"tasks": wf_json["tasks"], "subprocesses": wf_json["subprocesses"]}
|
||||
# bpmn_json = self.serialize()
|
||||
# wf_json = json.loads(bpmn_json)
|
||||
task_json: Dict[str, Any] = {
|
||||
# "tasks": wf_json["tasks"],
|
||||
# "subprocesses": wf_json["subprocesses"],
|
||||
# "python_env": self._script_engine.environment.last_result(),
|
||||
}
|
||||
|
||||
return {
|
||||
"process_instance_id": self.process_instance_model.id,
|
||||
|
@ -700,13 +709,7 @@ class ProcessInstanceProcessor:
|
|||
def spiff_step_details(self) -> SpiffStepDetailsModel:
|
||||
"""SaveSpiffStepDetails."""
|
||||
details_mapping = self.spiff_step_details_mapping()
|
||||
details_model = SpiffStepDetailsModel(
|
||||
process_instance_id=details_mapping["process_instance_id"],
|
||||
spiff_step=details_mapping["spiff_step"],
|
||||
task_json=details_mapping["task_json"],
|
||||
timestamp=details_mapping["timestamp"],
|
||||
# completed_by_user_id=details_mapping["completed_by_user_id"],
|
||||
)
|
||||
details_model = SpiffStepDetailsModel(**details_mapping)
|
||||
return details_model
|
||||
|
||||
def extract_metadata(self, process_model_info: ProcessModelInfo) -> None:
|
||||
|
@ -1490,16 +1493,42 @@ class ProcessInstanceProcessor:
|
|||
"""Do_engine_steps."""
|
||||
step_details = []
|
||||
|
||||
tasks_to_log = {
|
||||
"BPMN Task",
|
||||
"Script Task",
|
||||
"Service Task"
|
||||
# "End Event",
|
||||
# "Default Start Event",
|
||||
# "Exclusive Gateway",
|
||||
# "End Join",
|
||||
# "End Event",
|
||||
# "Default Throwing Event",
|
||||
# "Subprocess"
|
||||
}
|
||||
|
||||
def should_log(task: SpiffTask) -> bool:
|
||||
if (
|
||||
task.task_spec.spec_type in tasks_to_log
|
||||
and not task.task_spec.name.endswith(".EndJoin")
|
||||
):
|
||||
return True
|
||||
return False
|
||||
|
||||
def will_complete_task(task: SpiffTask) -> None:
|
||||
if should_log(task):
|
||||
self.increment_spiff_step()
|
||||
|
||||
def did_complete_task(task: SpiffTask) -> None:
|
||||
self._script_engine.environment.revise_state_with_task_data(task)
|
||||
step_details.append(self.spiff_step_details_mapping())
|
||||
if should_log(task):
|
||||
self._script_engine.environment.revise_state_with_task_data(task)
|
||||
step_details.append(self.spiff_step_details_mapping())
|
||||
|
||||
try:
|
||||
self.bpmn_process_instance.refresh_waiting_tasks()
|
||||
|
||||
self.bpmn_process_instance.do_engine_steps(
|
||||
exit_at=exit_at,
|
||||
will_complete_task=lambda t: self.increment_spiff_step(),
|
||||
will_complete_task=will_complete_task,
|
||||
did_complete_task=did_complete_task,
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in New Issue