Unfactor to fix size calculation/type hint issues
This commit is contained in:
parent
a4c0dd52ae
commit
2afc19175e
|
@ -22,7 +22,8 @@ class GetDataSizes(Script):
|
|||
|
||||
def get_description(self) -> str:
|
||||
"""Get_description."""
|
||||
return """Returns a dictionary of information about the size of task data and the python environment for the currently running process."""
|
||||
return """Returns a dictionary of information about the size of task data and
|
||||
the python environment for the currently running process."""
|
||||
|
||||
def run(
|
||||
self,
|
||||
|
@ -32,11 +33,11 @@ class GetDataSizes(Script):
|
|||
) -> Any:
|
||||
"""Run."""
|
||||
task = script_attributes_context.task
|
||||
cumulative_task_data_size = ProcessInstanceProcessor.get_task_data_size(
|
||||
task.workflow
|
||||
)
|
||||
python_env_size = ProcessInstanceProcessor.get_python_env_size(task.workflow)
|
||||
return {
|
||||
"cumulative_task_data_size": ProcessInstanceProcessor.get_task_data_size(
|
||||
task.workflow
|
||||
),
|
||||
"python_env_size": ProcessInstanceProcessor.get_python_env_size(
|
||||
task.workflow
|
||||
),
|
||||
"cumulative_task_data_size": cumulative_task_data_size,
|
||||
"python_env_size": python_env_size,
|
||||
}
|
||||
|
|
|
@ -1607,27 +1607,27 @@ class ProcessInstanceProcessor:
|
|||
except WorkflowTaskException as we:
|
||||
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
|
||||
|
||||
@classmethod
|
||||
def _get_data_size(cls, data: Dict[Any, Any]) -> int:
|
||||
data_to_check = list(filter(len, data))
|
||||
|
||||
try:
|
||||
return len(json.dumps(data_to_check))
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
@classmethod
|
||||
def get_task_data_size(cls, bpmn_process_instance: BpmnWorkflow) -> int:
|
||||
tasks_to_check = bpmn_process_instance.get_tasks(TaskState.FINISHED_MASK)
|
||||
task_data = [task.data for task in tasks_to_check]
|
||||
return cls._get_data_size(task_data)
|
||||
task_data_to_check = list(filter(len, task_data))
|
||||
|
||||
try:
|
||||
return len(json.dumps(task_data_to_check))
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
@classmethod
|
||||
def get_python_env_size(cls, bpmn_process_instance: BpmnWorkflow) -> int:
|
||||
user_defined_state = (
|
||||
bpmn_process_instance.script_engine.environment.user_defined_state()
|
||||
)
|
||||
return cls._get_data_size(user_defined_state)
|
||||
|
||||
try:
|
||||
return len(json.dumps(user_defined_state))
|
||||
except Exception:
|
||||
return 0
|
||||
|
||||
def check_task_data_size(self) -> None:
|
||||
"""CheckTaskDataSize."""
|
||||
|
|
Loading…
Reference in New Issue