Merge pull request #145 from sartography/data_size_script
Data size script
commit c65aeafbe3
@@ -0,0 +1,46 @@
+"""Get_data_sizes."""
+from typing import Any
+
+from spiffworkflow_backend.models.script_attributes_context import (
+    ScriptAttributesContext,
+)
+from spiffworkflow_backend.scripts.script import Script
+from spiffworkflow_backend.services.process_instance_processor import (
+    ProcessInstanceProcessor,
+)
+
+
+class GetDataSizes(Script):
+    """GetDataSizes."""
+
+    @staticmethod
+    def requires_privileged_permissions() -> bool:
+        """We have deemed this function safe to run without elevated permissions."""
+        return False
+
+    def get_description(self) -> str:
+        """Get_description."""
+        return """Returns a dictionary of information about the size of task data and
+        the python environment for the currently running process."""
+
+    def run(
+        self,
+        script_attributes_context: ScriptAttributesContext,
+        *_args: Any,
+        **kwargs: Any
+    ) -> Any:
+        """Run."""
+        workflow = script_attributes_context.task.workflow
+        task_data_size = ProcessInstanceProcessor.get_task_data_size(workflow)
+        task_data_keys_by_task = {
+            t.task_spec.name: sorted(t.data.keys())
+            for t in ProcessInstanceProcessor.get_tasks_with_data(workflow)
+        }
+        python_env_size = ProcessInstanceProcessor.get_python_env_size(workflow)
+        python_env_keys = workflow.script_engine.environment.user_defined_state().keys()
+        return {
+            "python_env_size": python_env_size,
+            "python_env_keys": sorted(python_env_keys),
+            "task_data_size": task_data_size,
+            "task_data_keys_by_task": task_data_keys_by_task,
+        }
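Usage sketch (not part of the diff): like the other built-in scripts in spiffworkflow-backend, this one is presumably exposed to BPMN script tasks under its snake_case name, so a script task could call it roughly as shown below. The invocation is an assumption based on that convention; only the key names come from run() above.

    # hypothetical BPMN script-task body
    data_sizes = get_data_sizes()

    # per run() above, data_sizes is a dict with these keys:
    #   "task_data_size"         - len(json.dumps(...)) over finished tasks that still carry data
    #   "task_data_keys_by_task" - {task_spec_name: sorted list of that task's data keys}
    #   "python_env_size"        - serialized size of the script engine's user-defined state
    #   "python_env_keys"        - sorted keys of that user-defined state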
@@ -152,6 +152,11 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty
         super().execute(script, context, external_methods)
         self._last_result = context
 
+    def user_defined_state(
+        self, external_methods: Optional[Dict[str, Any]] = None
+    ) -> Dict[str, Any]:
+        return {}
+
     def last_result(self) -> Dict[str, Any]:
         return {k: v for k, v in self._last_result.items()}
 
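The boxed, task-data-based environment keeps its state in the task data itself, so its user_defined_state() hook simply returns an empty dict. One consequence (a sketch, not something the diff asserts) is that get_python_env_size() would report only the size of an empty JSON object for that environment:

    import json

    # user_defined_state() -> {} for BoxedTaskDataBasedScriptEngineEnvironment,
    # so the reported python env size is just len("{}").
    assert len(json.dumps({})) == 2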
@@ -218,13 +223,13 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
             for key_to_drop in context_keys_to_drop:
                 context.pop(key_to_drop)
 
-            self.state = self._user_defined_state(external_methods)
+            self.state = self.user_defined_state(external_methods)
 
             # the task data needs to be updated with the current state so data references can be resolved properly.
             # the state will be removed later once the task is completed.
             context.update(self.state)
 
-    def _user_defined_state(
+    def user_defined_state(
         self, external_methods: Optional[Dict[str, Any]] = None
     ) -> Dict[str, Any]:
         keys_to_filter = self.non_user_defined_keys
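Dropping the leading underscore makes the state accessor part of the environment's public interface, which is what the new ProcessInstanceProcessor helpers below rely on:

    # call path enabled by the rename (as used by get_python_env_size below)
    state = bpmn_process_instance.script_engine.environment.user_defined_state()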
@@ -245,7 +250,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
 
     def preserve_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
         key = self.PYTHON_ENVIRONMENT_STATE_KEY
-        state = self._user_defined_state()
+        state = self.user_defined_state()
         bpmn_process_instance.data[key] = state
 
     def restore_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
@@ -253,7 +258,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
         self.state = bpmn_process_instance.data.get(key, {})
 
     def finalize_result(self, bpmn_process_instance: BpmnWorkflow) -> None:
-        bpmn_process_instance.data.update(self._user_defined_state())
+        bpmn_process_instance.data.update(self.user_defined_state())
 
     def revise_state_with_task_data(self, task: SpiffTask) -> None:
         state_keys = set(self.state.keys())
@@ -1604,16 +1609,40 @@ class ProcessInstanceProcessor:
         except WorkflowTaskException as we:
             raise ApiError.from_workflow_exception("task_error", str(we), we) from we
 
-    def check_task_data_size(self) -> None:
-        """CheckTaskDataSize."""
-        tasks_to_check = self.bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
-        task_data = [task.data for task in tasks_to_check]
-        task_data_to_check = list(filter(len, task_data))
+    @classmethod
+    def get_tasks_with_data(
+        cls, bpmn_process_instance: BpmnWorkflow
+    ) -> List[SpiffTask]:
+        return [
+            task
+            for task in bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
+            if len(task.data) > 0
+        ]
+
+    @classmethod
+    def get_task_data_size(cls, bpmn_process_instance: BpmnWorkflow) -> int:
+        tasks_with_data = cls.get_tasks_with_data(bpmn_process_instance)
+        all_task_data = [task.data for task in tasks_with_data]
 
         try:
-            task_data_len = len(json.dumps(task_data_to_check))
+            return len(json.dumps(all_task_data))
         except Exception:
-            task_data_len = 0
+            return 0
+
+    @classmethod
+    def get_python_env_size(cls, bpmn_process_instance: BpmnWorkflow) -> int:
+        user_defined_state = (
+            bpmn_process_instance.script_engine.environment.user_defined_state()
+        )
+
+        try:
+            return len(json.dumps(user_defined_state))
+        except Exception:
+            return 0
+
+    def check_task_data_size(self) -> None:
+        """CheckTaskDataSize."""
+        task_data_len = self.get_task_data_size(self.bpmn_process_instance)
 
         # Not sure what the number here should be but this now matches the mysql
         # max_allowed_packet variable on dev - 1073741824
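Both new size helpers measure data the same way: serialize with json.dumps and report the string length, falling back to 0 when the data is not JSON-serializable. A minimal standalone sketch of that pattern (the helper name is illustrative, not from this diff):

    import json
    from typing import Any

    def json_size_or_zero(value: Any) -> int:
        """Length of the JSON serialization of value, or 0 if it cannot be serialized."""
        try:
            return len(json.dumps(value))
        except Exception:
            return 0

    print(json_size_or_zero([{"a": 1}, {"b": [1, 2, 3]}]))  # 28
    print(json_size_or_zero({"f": lambda x: x}))            # 0 (lambdas are not JSON-serializable)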