Adding python env size

This commit is contained in:
Jon Herron 2023-02-22 10:55:09 -05:00
parent 2fa4623876
commit d5fa8eae21
2 changed files with 15 additions and 5 deletions

View File

@ -33,5 +33,5 @@ class GetDataSizes(Script):
task = script_attributes_context.task
return {
"cumulative_task_data_size": ProcessInstanceProcessor.get_task_data_size(task.workflow),
"python_env_size": 0,
"python_env_size": ProcessInstanceProcessor.get_python_env_size(task.workflow),
}

View File

@ -152,6 +152,11 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty
super().execute(script, context, external_methods)
self._last_result = context
def user_defined_state(
self, external_methods: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
return {}
def last_result(self) -> Dict[str, Any]:
return {k: v for k, v in self._last_result.items()}
@ -218,13 +223,13 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
for key_to_drop in context_keys_to_drop:
context.pop(key_to_drop)
self.state = self._user_defined_state(external_methods)
self.state = self.user_defined_state(external_methods)
# the task data needs to be updated with the current state so data references can be resolved properly.
# the state will be removed later once the task is completed.
context.update(self.state)
def _user_defined_state(
def user_defined_state(
self, external_methods: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
keys_to_filter = self.non_user_defined_keys
@ -245,7 +250,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
def preserve_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
key = self.PYTHON_ENVIRONMENT_STATE_KEY
state = self._user_defined_state()
state = self.user_defined_state()
bpmn_process_instance.data[key] = state
def restore_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
@ -253,7 +258,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
self.state = bpmn_process_instance.data.get(key, {})
def finalize_result(self, bpmn_process_instance: BpmnWorkflow) -> None:
bpmn_process_instance.data.update(self._user_defined_state())
bpmn_process_instance.data.update(self.user_defined_state())
def revise_state_with_task_data(self, task: SpiffTask) -> None:
state_keys = set(self.state.keys())
@ -1617,6 +1622,11 @@ class ProcessInstanceProcessor:
task_data = [task.data for task in tasks_to_check]
return cls._get_data_size(task_data)
@classmethod
def get_python_env_size(cls, bpmn_process_instance: BpmnWorkflow) -> int:
user_defined_state = bpmn_process_instance.script_engine.environment.user_defined_state()
return cls._get_data_size(user_defined_state)
def check_task_data_size(self) -> None:
"""CheckTaskDataSize."""
task_data_len = self.get_task_data_size(self.bpmn_process_instance)