Merge pull request #130 from sartography/save_step_data
POC for saving some data about each step
commit 5c998f9909
@@ -50,6 +50,8 @@ if [[ "${1:-}" == "clean" ]]; then
     docker exec -it postgres-spiff psql -U spiffworkflow_backend spiffworkflow_backend_testing -c "create database spiffworkflow_backend_local_development;"
    fi
  fi
+elif [[ "${1:-}" == "migrate" ]]; then
+  tasks="$tasks migrate"
 fi
 
 tasks="$tasks upgrade"
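Note: the new elif branch appends only "migrate" to the task list, so callers can run pending migrations without recreating the database, while "clean" still rebuilds everything; the wrapper script's name is not shown in this hunk, so the exact invocation depends on the file being edited.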
@@ -0,0 +1,38 @@
+"""empty message
+
+Revision ID: e05ca5cdc312
+Revises: ca9b79dde5cc
+Create Date: 2023-02-08 12:21:41.722774
+
+"""
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects import mysql
+
+# revision identifiers, used by Alembic.
+revision = 'e05ca5cdc312'
+down_revision = 'ca9b79dde5cc'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('spiff_step_details', sa.Column('task_state', sa.String(length=50), nullable=False))
+    op.add_column('spiff_step_details', sa.Column('task_id', sa.String(length=50), nullable=False))
+    op.add_column('spiff_step_details', sa.Column('bpmn_task_identifier', sa.String(length=255), nullable=False))
+    op.add_column('spiff_step_details', sa.Column('engine_step_start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True))
+    op.add_column('spiff_step_details', sa.Column('engine_step_end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True))
+    op.drop_column('spiff_step_details', 'timestamp')
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('spiff_step_details', sa.Column('timestamp', mysql.DECIMAL(precision=17, scale=6), nullable=False))
+    op.drop_column('spiff_step_details', 'engine_step_end_in_seconds')
+    op.drop_column('spiff_step_details', 'engine_step_start_in_seconds')
+    op.drop_column('spiff_step_details', 'bpmn_task_identifier')
+    op.drop_column('spiff_step_details', 'task_id')
+    op.drop_column('spiff_step_details', 'task_state')
+    # ### end Alembic commands ###
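Two POC-sized caveats in this migration: the dropped timestamp values are not carried into the new engine_step_* columns, and three NOT NULL string columns are added without a server default, which fails outright on a populated table under PostgreSQL (MySQL silently backfills implicit defaults). If existing rows matter, a variant like the following sketch (not part of this commit) keeps the upgrade safe:

    # sketch only: an explicit server_default lets the ALTER succeed on tables
    # that already contain rows; the placeholder can be dropped in a follow-up
    op.add_column(
        'spiff_step_details',
        sa.Column('task_state', sa.String(length=50), nullable=False, server_default=''),
    )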
@@ -27,4 +27,10 @@ class SpiffStepDetailsModel(SpiffworkflowBaseDBModel):
     )
     spiff_step: int = db.Column(db.Integer, nullable=False)
     task_json: dict = deferred(db.Column(db.JSON, nullable=False))  # type: ignore
-    timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False)
+    task_id: str = db.Column(db.String(50), nullable=False)
+    task_state: str = db.Column(db.String(50), nullable=False)
+    bpmn_task_identifier: str = db.Column(db.String(255), nullable=False)
+
+    # timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False)
+    engine_step_start_in_seconds: float | None = db.Column(db.DECIMAL(17, 6))
+    engine_step_end_in_seconds: float | None = db.Column(db.DECIMAL(17, 6))
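DECIMAL(17, 6) gives the two new duration columns epoch seconds at microsecond resolution: 11 integer digits comfortably hold Unix timestamps for millennia, and 6 fractional digits match the precision of time.time(). A quick standalone check:

    import time
    from decimal import Decimal

    now = Decimal(str(round(time.time(), 6)))
    print(now)                               # e.g. 1675862501.722774
    assert len(now.as_tuple().digits) <= 17  # fits within DECIMAL(17, 6)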
@@ -556,20 +556,30 @@ def process_instance_task_list(
     get_task_data: bool = False,
 ) -> flask.wrappers.Response:
     """Process_instance_task_list."""
+    step_detail_query = db.session.query(SpiffStepDetailsModel).filter(
+        SpiffStepDetailsModel.process_instance_id == process_instance.id,
+    )
+
     if spiff_step > 0:
-        step_detail = (
-            db.session.query(SpiffStepDetailsModel)
-            .filter(
-                SpiffStepDetailsModel.process_instance_id == process_instance.id,
-                SpiffStepDetailsModel.spiff_step == spiff_step,
-            )
-            .first()
+        step_detail_query = step_detail_query.filter(
+            SpiffStepDetailsModel.spiff_step <= spiff_step
         )
-        if step_detail is not None and process_instance.bpmn_json is not None:
-            bpmn_json = json.loads(process_instance.bpmn_json)
-            bpmn_json["tasks"] = step_detail.task_json["tasks"]
-            bpmn_json["subprocesses"] = step_detail.task_json["subprocesses"]
-            process_instance.bpmn_json = json.dumps(bpmn_json)
+
+    step_details = step_detail_query.all()
+    bpmn_json = json.loads(process_instance.bpmn_json or "{}")
+    tasks = bpmn_json["tasks"]
+
+    # if step_detail is not None and process_instance.bpmn_json is not None:
+    for step_detail in step_details:
+        if step_detail.task_id in tasks:
+            task_data = (
+                step_detail.task_json["task_data"] | step_detail.task_json["python_env"]
+            )
+            if task_data is None:
+                task_data = {}
+            tasks[step_detail.task_id]["data"] = task_data
+
+    process_instance.bpmn_json = json.dumps(bpmn_json)
 
     processor = ProcessInstanceProcessor(process_instance)
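The merge inside the loop relies on the Python 3.9+ dict union operator: a | b builds a new dict in which keys from the right-hand operand win on collisions, so values recorded from the script engine's python_env override the plain task data. It also means the "if task_data is None" guard is effectively dead code, since a union of two dicts is always a dict; worth a follow-up cleanup. The semantics in isolation:

    task_data = {"x": 1, "shared": "from_task_data"}
    python_env = {"y": 2, "shared": "from_python_env"}

    merged = task_data | python_env  # right-hand operand wins on collisions
    print(merged)                    # {'x': 1, 'shared': 'from_python_env', 'y': 2}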
@@ -30,6 +30,7 @@ from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine  # type: ig
 from SpiffWorkflow.bpmn.PythonScriptEngineEnvironment import BasePythonScriptEngineEnvironment  # type: ignore
 from SpiffWorkflow.bpmn.PythonScriptEngineEnvironment import Box
 from SpiffWorkflow.bpmn.PythonScriptEngineEnvironment import BoxedTaskDataEnvironment
+from SpiffWorkflow.bpmn.serializer.helpers.registry import DefaultRegistry  # type: ignore
 from SpiffWorkflow.bpmn.serializer.task_spec import (  # type: ignore
     EventBasedGatewayConverter,
 )
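The newly imported DefaultRegistry is SpiffWorkflow's serializer registry; spiff_step_details_mapping below uses its convert method to turn raw task data into JSON-safe structures before they are stored in the task_json column. A minimal sketch, assuming the values involved are plain JSON types or types the registry has converters for:

    from SpiffWorkflow.bpmn.serializer.helpers.registry import DefaultRegistry

    registry = DefaultRegistry()
    # plain JSON-friendly values pass through; registered types such as
    # datetimes are rewritten into serializable tagged structures
    converted = registry.convert({"count": 3, "name": "step one"})
    print(converted)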
@@ -205,6 +206,13 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
         self.state.update(context)
         exec(script, self.state)  # noqa
+
+        # since the task data is not directly mutated when the script executes, need to determine which keys
+        # have been deleted from the environment and remove them from task data if present.
+        context_keys_to_drop = context.keys() - self.state.keys()
+
+        for key_to_drop in context_keys_to_drop:
+            context.pop(key_to_drop)
+
         self.state = self._user_defined_state(external_methods)
 
         # the task data needs to be updated with the current state so data references can be resolved properly.
@@ -674,22 +682,44 @@ class ProcessInstanceProcessor:
             "lane_assignment_id": lane_assignment_id,
         }
 
-    def spiff_step_details_mapping(self) -> dict:
+    def spiff_step_details_mapping(
+        self,
+        spiff_task: Optional[SpiffTask] = None,
+        start_in_seconds: Optional[float] = 0,
+        end_in_seconds: Optional[float] = 0,
+    ) -> dict:
         """SaveSpiffStepDetails."""
         # bpmn_json = self.serialize()
         # wf_json = json.loads(bpmn_json)
+        default_registry = DefaultRegistry()
+
+        if spiff_task is None:
+            # TODO: safer to pass in task vs use last task?
+            spiff_task = self.bpmn_process_instance.last_task
+
+        if spiff_task is None:
+            return {}
+
+        task_data = default_registry.convert(spiff_task.data)
+        python_env = default_registry.convert(
+            self._script_engine.environment.last_result()
+        )
+
         task_json: Dict[str, Any] = {
             # "tasks": wf_json["tasks"],
             # "subprocesses": wf_json["subprocesses"],
-            # "python_env": self._script_engine.environment.last_result(),
+            "task_data": task_data,
+            "python_env": python_env,
         }
 
         return {
             "process_instance_id": self.process_instance_model.id,
             "spiff_step": self.process_instance_model.spiff_step or 1,
             "task_json": task_json,
-            "timestamp": round(time.time()),
-            # "completed_by_user_id": self.current_user().id,
+            "task_id": str(spiff_task.id),
+            "task_state": spiff_task.state,
+            "bpmn_task_identifier": spiff_task.task_spec.name,
+            "engine_step_start_in_seconds": start_in_seconds,
+            "engine_step_end_in_seconds": end_in_seconds,
         }
 
     def spiff_step_details(self) -> SpiffStepDetailsModel:
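The keys of the returned dict line up one-to-one with the columns added to SpiffStepDetailsModel in this same commit, so the natural consumer is a keyword-expansion constructor. A hypothetical sketch of such a call site — processor, spiff_task, and started_at are illustrative names, not part of this diff:

    import time

    # hypothetical call site; assumes a processor built earlier in the request
    details_mapping = processor.spiff_step_details_mapping(
        spiff_task, started_at, time.time()
    )
    if details_mapping:  # an empty dict means there was no task to record
        db.session.add(SpiffStepDetailsModel(**details_mapping))
        db.session.commit()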
@@ -1492,6 +1522,9 @@ class ProcessInstanceProcessor:
             # "Subprocess"
         }
 
+        # making a dictionary to ensure we are not shadowing variables in the other methods
+        current_task_start_in_seconds = {}
+
         def should_log(task: SpiffTask) -> bool:
             if (
                 task.task_spec.spec_type in tasks_to_log
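The shadowing comment refers to Python closure scoping: assigning to a plain variable inside will_complete_task would rebind a new local (or require nonlocal), so the code mutates a shared dict instead. A compact demonstration of why that works:

    def make_handlers():
        start = {}  # mutated in place, visible to both closures

        def will_complete():
            start["time"] = 1.0  # no rebinding, just mutation of the shared dict

        def did_complete():
            return start["time"]  # reads the value the other closure stored

        will_complete()
        return did_complete()

    print(make_handlers())  # 1.0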
@@ -1502,12 +1535,17 @@ class ProcessInstanceProcessor:
 
         def will_complete_task(task: SpiffTask) -> None:
             if should_log(task):
+                current_task_start_in_seconds["time"] = time.time()
                 self.increment_spiff_step()
 
         def did_complete_task(task: SpiffTask) -> None:
             if should_log(task):
                 self._script_engine.environment.revise_state_with_task_data(task)
-                step_details.append(self.spiff_step_details_mapping())
+                step_details.append(
+                    self.spiff_step_details_mapping(
+                        task, current_task_start_in_seconds["time"], time.time()
+                    )
+                )
 
         try:
             self.bpmn_process_instance.refresh_waiting_tasks()
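The timing handshake pairs the two engine hooks: will_complete_task records a wall-clock start, and did_complete_task reads it back alongside a fresh time.time() as the end, feeding the new engine_step_start_in_seconds / engine_step_end_in_seconds columns. Both hooks gate on the same should_log(task) predicate, which should keep the current_task_start_in_seconds["time"] read from raising KeyError, provided the predicate evaluates the same way in both hooks for a given task. The pattern in miniature:

    import time

    start = {}

    def will_complete_task() -> None:
        start["time"] = time.time()         # wall-clock start of the engine step

    def did_complete_task() -> float:
        return time.time() - start["time"]  # elapsed engine time for the step

    will_complete_task()
    time.sleep(0.01)
    print(f"{did_complete_task():.6f}s")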