Allow for different Python Environments when executing scripts within SpiffWorkflow (#121)

This commit is contained in:
jbirddog 2023-02-02 10:24:55 -05:00 committed by GitHub
parent 6a0848f895
commit a815863727
4 changed files with 168 additions and 26 deletions

View File

@ -1825,7 +1825,7 @@ lxml = "*"
type = "git"
url = "https://github.com/sartography/SpiffWorkflow"
reference = "main"
resolved_reference = "98c6294f1240aee599cd98bcee58d121cb57b331"
resolved_reference = "64737498caa36c25b12f5216bdc9c30338b2a1fa"
[[package]]
name = "SQLAlchemy"
@ -2863,10 +2863,7 @@ orjson = [
{file = "orjson-3.8.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:b68a42a31f8429728183c21fb440c21de1b62e5378d0d73f280e2d894ef8942e"},
{file = "orjson-3.8.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:ff13410ddbdda5d4197a4a4c09969cb78c722a67550f0a63c02c07aadc624833"},
{file = "orjson-3.8.0-cp310-none-win_amd64.whl", hash = "sha256:2d81e6e56bbea44be0222fb53f7b255b4e7426290516771592738ca01dbd053b"},
{file = "orjson-3.8.0-cp311-cp311-macosx_10_7_x86_64.whl", hash = "sha256:200eae21c33f1f8b02a11f5d88d76950cd6fd986d88f1afe497a8ae2627c49aa"},
{file = "orjson-3.8.0-cp311-cp311-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:9529990f3eab54b976d327360aa1ff244a4b12cb5e4c5b3712fcdd96e8fe56d4"},
{file = "orjson-3.8.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:e2defd9527651ad39ec20ae03c812adf47ef7662bdd6bc07dabb10888d70dc62"},
{file = "orjson-3.8.0-cp311-none-win_amd64.whl", hash = "sha256:b21c7af0ff6228ca7105f54f0800636eb49201133e15ddb80ac20c1ce973ef07"},
{file = "orjson-3.8.0-cp37-cp37m-macosx_10_7_x86_64.whl", hash = "sha256:9e6ac22cec72d5b39035b566e4b86c74b84866f12b5b0b6541506a080fb67d6d"},
{file = "orjson-3.8.0-cp37-cp37m-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:e2f4a5542f50e3d336a18cb224fc757245ca66b1fd0b70b5dd4471b8ff5f2b0e"},
{file = "orjson-3.8.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e1418feeb8b698b9224b1f024555895169d481604d5d884498c1838d7412794c"},

View File

@ -26,8 +26,10 @@ from lxml import etree # type: ignore
from lxml.etree import XMLSyntaxError # type: ignore
from RestrictedPython import safe_globals # type: ignore
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
from SpiffWorkflow.bpmn.PythonScriptEngine import Box # type: ignore
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore
from SpiffWorkflow.bpmn.PythonScriptEngineEnvironment import BasePythonScriptEngineEnvironment # type: ignore
from SpiffWorkflow.bpmn.PythonScriptEngineEnvironment import Box
from SpiffWorkflow.bpmn.PythonScriptEngineEnvironment import BoxedTaskDataEnvironment
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.specs.BpmnProcessSpec import BpmnProcessSpec # type: ignore
from SpiffWorkflow.bpmn.specs.events.EndEvent import EndEvent # type: ignore
@ -150,6 +152,132 @@ class ProcessInstanceLockedBySomethingElseError(Exception):
pass
class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # type: ignore
def __init__(self, environment_globals: Dict[str, Any]):
"""BoxedTaskDataBasedScriptEngineEnvironment."""
self._last_result: Dict[str, Any] = {}
super().__init__(environment_globals)
def execute(
self,
script: str,
context: Dict[str, Any],
external_methods: Optional[Dict[str, Any]] = None,
) -> None:
super().execute(script, context, external_methods)
self._last_result = context
def last_result(self) -> Dict[str, Any]:
return self._last_result
def clear_state(self) -> None:
pass
def preserve_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
def restore_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
def finalize_result(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
def revise_state_with_task_data(self, task: SpiffTask) -> None:
pass
class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment): # type: ignore
PYTHON_ENVIRONMENT_STATE_KEY = "spiff__python_env_state"
def __init__(self, environment_globals: Dict[str, Any]):
"""NonTaskDataBasedScriptEngineEnvironment."""
self.state: Dict[str, Any] = {}
self.non_user_defined_keys = set(
[*environment_globals.keys()] + ["__builtins__", "current_user"]
)
super().__init__(environment_globals)
def evaluate(
self,
expression: str,
context: Dict[str, Any],
external_methods: Optional[dict[str, Any]] = None,
) -> Any:
# TODO: once integrated look at the tests that fail without Box
Box.convert_to_box(context)
state = {}
state.update(self.globals)
state.update(external_methods or {})
state.update(self.state)
state.update(context)
return eval(expression, state) # noqa
def execute(
self,
script: str,
context: Dict[str, Any],
external_methods: Optional[Dict[str, Any]] = None,
) -> None:
# TODO: once integrated look at the tests that fail without Box
Box.convert_to_box(context)
self.state.update(self.globals)
self.state.update(external_methods or {})
self.state.update(context)
exec(script, self.state) # noqa
self.state = self._user_defined_state(external_methods)
# the task data needs to be updated with the current state so data references can be resolved properly.
# the state will be removed later once the task is completed.
context.update(self.state)
def _user_defined_state(
self, external_methods: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
keys_to_filter = self.non_user_defined_keys
if external_methods is not None:
keys_to_filter |= set(external_methods.keys())
return {
k: v
for k, v in self.state.items()
if k not in keys_to_filter and not callable(v)
}
def last_result(self) -> Dict[str, Any]:
return self.state
def clear_state(self) -> None:
self.state = {}
def preserve_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
key = self.PYTHON_ENVIRONMENT_STATE_KEY
state = self._user_defined_state()
bpmn_process_instance.data[key] = state
def restore_state(self, bpmn_process_instance: BpmnWorkflow) -> None:
key = self.PYTHON_ENVIRONMENT_STATE_KEY
self.state = bpmn_process_instance.data.get(key, {})
def finalize_result(self, bpmn_process_instance: BpmnWorkflow) -> None:
bpmn_process_instance.data.update(self._user_defined_state())
def revise_state_with_task_data(self, task: SpiffTask) -> None:
state_keys = set(self.state.keys())
task_data_keys = set(task.data.keys())
state_keys_to_remove = state_keys - task_data_keys
task_data_keys_to_keep = task_data_keys - state_keys
self.state = {
k: v for k, v in self.state.items() if k not in state_keys_to_remove
}
task.data = {k: v for k, v in task.data.items() if k in task_data_keys_to_keep}
class CustomScriptEngineEnvironment(BoxedTaskDataBasedScriptEngineEnvironment):
pass
class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
"""This is a custom script processor that can be easily injected into Spiff Workflow.
@ -179,7 +307,9 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
default_globals.update(safe_globals)
default_globals["__builtins__"]["__import__"] = _import
super().__init__(default_globals=default_globals)
environment = CustomScriptEngineEnvironment(default_globals)
super().__init__(environment=environment)
def __get_augment_methods(self, task: SpiffTask) -> Dict[str, Callable]:
"""__get_augment_methods."""
@ -392,7 +522,7 @@ class ProcessInstanceProcessor:
validate_only,
subprocesses=subprocesses,
)
self.bpmn_process_instance.script_engine = self._script_engine
self.set_script_engine(self.bpmn_process_instance)
self.add_user_info_to_process_instance(self.bpmn_process_instance)
except MissingSpecError as ke:
@ -438,6 +568,18 @@ class ProcessInstanceProcessor:
bpmn_process_spec, subprocesses
)
@staticmethod
def set_script_engine(bpmn_process_instance: BpmnWorkflow) -> None:
ProcessInstanceProcessor._script_engine.environment.restore_state(
bpmn_process_instance
)
bpmn_process_instance.script_engine = ProcessInstanceProcessor._script_engine
def preserve_script_engine_state(self) -> None:
ProcessInstanceProcessor._script_engine.environment.preserve_state(
self.bpmn_process_instance
)
def current_user(self) -> Any:
"""Current_user."""
current_user = None
@ -470,11 +612,12 @@ class ProcessInstanceProcessor:
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow:
"""Get_bpmn_process_instance_from_workflow_spec."""
return BpmnWorkflow(
bpmn_process_instance = BpmnWorkflow(
spec,
script_engine=ProcessInstanceProcessor._script_engine,
subprocess_specs=subprocesses,
)
ProcessInstanceProcessor.set_script_engine(bpmn_process_instance)
return bpmn_process_instance
@staticmethod
def __get_bpmn_process_instance(
@ -501,9 +644,7 @@ class ProcessInstanceProcessor:
finally:
spiff_logger.setLevel(original_spiff_logger_log_level)
bpmn_process_instance.script_engine = (
ProcessInstanceProcessor._script_engine
)
ProcessInstanceProcessor.set_script_engine(bpmn_process_instance)
else:
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_workflow_spec(
@ -1384,25 +1525,25 @@ class ProcessInstanceProcessor:
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps."""
step_details = []
def did_complete_task(task: SpiffTask) -> None:
self._script_engine.environment.revise_state_with_task_data(task)
step_details.append(self.spiff_step_details_mapping())
try:
self.bpmn_process_instance.refresh_waiting_tasks(
#
# commenting out to see if this helps with the growing spiff steps/db issue
#
# will_refresh_task=lambda t: self.increment_spiff_step(),
# did_refresh_task=lambda t: step_details.append(
# self.spiff_step_details_mapping()
# ),
)
self.bpmn_process_instance.refresh_waiting_tasks()
self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at,
will_complete_task=lambda t: self.increment_spiff_step(),
did_complete_task=lambda t: step_details.append(
self.spiff_step_details_mapping()
),
did_complete_task=did_complete_task,
)
if self.bpmn_process_instance.is_completed():
self._script_engine.environment.finalize_result(
self.bpmn_process_instance
)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
@ -1466,6 +1607,7 @@ class ProcessInstanceProcessor:
def serialize(self) -> str:
"""Serialize."""
self.check_task_data_size()
self.preserve_script_engine_state()
return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore
def next_user_tasks(self) -> list[SpiffTask]:

View File

@ -45,6 +45,7 @@ class ScriptUnitTestRunner:
context = input_context.copy()
try:
cls._script_engine.environment.clear_state()
cls._script_engine._execute(context=context, script=script)
except SyntaxError as ex:
return ScriptUnitTestResult(
@ -77,6 +78,7 @@ class ScriptUnitTestRunner:
error=f"Failed to execute script: {error_message}",
)
context = cls._script_engine.environment.last_result()
result_as_boolean = context == expected_output_context
script_unit_test_result = ScriptUnitTestResult(

View File

@ -87,7 +87,8 @@ class TestGetLocaltime(BaseTest):
)
assert spiff_task
data = spiff_task.data
data = ProcessInstanceProcessor._script_engine.environment.last_result()
some_time = data["some_time"]
localtime = data["localtime"]
timezone = data["timezone"]