run_pyl and a changes to the way we loop through tasks for the interstitial page.

This commit is contained in:
Dan 2023-04-26 15:50:14 -04:00
parent 18789d9ea6
commit 2f17ee0579
7 changed files with 100 additions and 74 deletions

View File

@ -18,13 +18,13 @@ def setup_database_uri(app: Flask) -> None:
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None:
database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}"
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite":
app.config["SQLALCHEMY_DATABASE_URI"] = (
f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
)
app.config[
"SQLALCHEMY_DATABASE_URI"
] = f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres":
app.config["SQLALCHEMY_DATABASE_URI"] = (
f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
)
app.config[
"SQLALCHEMY_DATABASE_URI"
] = f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
else:
# use pswd to trick flake8 with hardcoded passwords
db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD")

View File

@ -127,9 +127,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
def serialized_with_metadata(self) -> dict[str, Any]:
process_instance_attributes = self.serialized
process_instance_attributes["process_metadata"] = self.process_metadata
process_instance_attributes["process_model_with_diagram_identifier"] = (
self.process_model_with_diagram_identifier
)
process_instance_attributes[
"process_model_with_diagram_identifier"
] = self.process_model_with_diagram_identifier
return process_instance_attributes
@property

View File

@ -20,6 +20,7 @@ from flask import make_response
from flask import stream_with_context
from flask.wrappers import Response
from jinja2 import TemplateSyntaxError
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
@ -385,46 +386,65 @@ def _render_instructions_for_end_user(task_model: TaskModel, extensions: Optiona
def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[str, Optional[str], None]:
processor = ProcessInstanceProcessor(process_instance)
reported_ids = [] # bit of an issue with end tasks showing as getting completed twice.
spiff_task = processor.next_task()
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
last_task = None
while last_task != spiff_task:
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
extensions = TaskService.get_extensions_from_task_model(task_model)
instructions = _render_instructions_for_end_user(task_model, extensions)
if instructions and spiff_task.id not in reported_ids:
reported_ids.append(spiff_task.id)
task.properties = extensions
yield f"data: {current_app.json.dumps(task)} \n\n"
last_task = spiff_task
try:
processor.do_engine_steps(execution_strategy_name="one_at_a_time")
processor.do_engine_steps(execution_strategy_name="run_until_user_message")
processor.save() # Fixme - maybe find a way not to do this on every loop?
except WorkflowTaskException as wfe:
api_error = ApiError.from_workflow_exception(
"engine_steps_error", "Failed complete an automated task.", exp=wfe
)
yield f"data: {current_app.json.dumps(api_error)} \n\n"
except Exception as e:
api_error = ApiError(
error_code="engine_steps_error",
message=f"Failed complete an automated task. Error was: {str(e)}",
status_code=400,
)
yield f"data: {current_app.json.dumps(api_error)} \n\n"
reported_ids = [] # A list of all the ids reported by this endpoint so far.
def get_reportable_tasks():
return processor.bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.STARTED | TaskState.READY | TaskState.ERROR
)
tasks = get_reportable_tasks()
while True:
for spiff_task in tasks:
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
extensions = TaskService.get_extensions_from_task_model(task_model)
instructions = _render_instructions_for_end_user(task_model, extensions)
if instructions and spiff_task.id not in reported_ids:
task = ProcessInstanceService.spiff_task_to_api_task(processor, spiff_task)
task.properties = extensions
yield f"data: {current_app.json.dumps(task)} \n\n"
reported_ids.append(spiff_task.id)
if spiff_task.state == TaskState.READY:
try:
processor.do_engine_steps(execution_strategy_name="run_until_user_message")
processor.save() # Fixme - maybe find a way not to do this on every loop?
except WorkflowTaskException as wfe:
api_error = ApiError.from_workflow_exception(
"engine_steps_error", "Failed complete an automated task.", exp=wfe
)
yield f"data: {current_app.json.dumps(api_error)} \n\n"
return
except Exception as e:
api_error = ApiError(
error_code="engine_steps_error",
message=f"Failed complete an automated task. Error was: {str(e)}",
status_code=400,
)
yield f"data: {current_app.json.dumps(api_error)} \n\n"
return
processor.bpmn_process_instance.refresh_waiting_tasks()
ready_engine_task_count = get_ready_engine_step_count(processor.bpmn_process_instance)
if ready_engine_task_count == 0:
break # No more tasks to report
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
if task.id not in reported_ids:
yield f"data: {current_app.json.dumps(task)} \n\n"
# Note, this has to be done in case someone leaves the page,
# which can otherwise cancel this function and leave completed tasks un-registered.
spiff_task = processor.next_task()
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
# Always provide some response, in the event no instructions were provided.
if len(reported_ids) == 0:
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
yield f"data: {current_app.json.dumps(task)} \n\n"
def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow):
return len(
list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
)
def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str, Optional[str], None]:
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)

View File

@ -423,9 +423,9 @@ class ProcessInstanceProcessor:
tld.process_instance_id = process_instance_model.id
# we want this to be the fully qualified path to the process model including all group subcomponents
current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = (
f"{process_instance_model.process_model_identifier}"
)
current_app.config[
"THREAD_LOCAL_DATA"
].process_model_identifier = f"{process_instance_model.process_model_identifier}"
self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService()
@ -585,9 +585,9 @@ class ProcessInstanceProcessor:
bpmn_subprocess_definition.bpmn_identifier
] = bpmn_process_definition_dict
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
bpmn_subprocess_definition.bpmn_identifier
)
bpmn_subprocess_definition_bpmn_identifiers[
bpmn_subprocess_definition.id
] = bpmn_subprocess_definition.bpmn_identifier
task_definitions = TaskDefinitionModel.query.filter(
TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore

View File

@ -220,14 +220,15 @@ class TaskService:
if task_model.state == "COMPLETED":
event_type = ProcessInstanceEventType.task_completed.value
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
process_instance_event, _process_instance_error_detail = (
ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance,
event_type,
task_guid=task_model.guid,
timestamp=timestamp,
add_to_db_session=False,
)
(
process_instance_event,
_process_instance_error_detail,
) = ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance,
event_type,
task_guid=task_model.guid,
timestamp=timestamp,
add_to_db_session=False,
)
self.process_instance_events[task_model.guid] = process_instance_event
@ -454,7 +455,6 @@ class TaskService:
spiff_task,
self.bpmn_definition_to_task_definitions_mappings,
)
self.update_task_model(task_model, spiff_task)
self.task_models[task_model.guid] = task_model

View File

@ -299,28 +299,33 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy):
"""When you want to run tasks until you hit something to report to the end user."""
"""When you want to run tasks until you hit something to report to the end user.
def get_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
return list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if t.task_spec.spec_type not in ["User Task", "Manual Task"]
and not (
hasattr(t.task_spec, "extensions") and t.task_spec.extensions.get("instructionsForEndUser", None)
)
]
)
Note that this will run at least one engine step if possible,
but will stop if it hits instructions after the first task.
"""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
engine_steps = self.get_engine_steps(bpmn_process_instance)
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
if len(engine_steps) > 0:
self.delegate.will_complete_task(engine_steps[0])
engine_steps[0].run()
self.delegate.did_complete_task(engine_steps[0])
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
while engine_steps:
for task in engine_steps:
if hasattr(task.task_spec, "extensions") and task.task_spec.extensions.get(
"instructionsForEndUser", None
):
should_continue = False
break
self.delegate.will_complete_task(task)
task.run()
self.delegate.did_complete_task(task)
engine_steps = self.get_engine_steps(bpmn_process_instance)
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
self.delegate.after_engine_steps(bpmn_process_instance)

View File

@ -1408,6 +1408,7 @@ export default function ProcessInstanceListTable({
);
};
// eslint-disable-next-line sonarjs/cognitive-complexity
const buildTable = () => {
const headerLabels: Record<string, string> = {
id: 'Id',
@ -1449,11 +1450,11 @@ export default function ProcessInstanceListTable({
buttonElement = (
<Button
kind={
hasAccessToCompleteTask && row.task_id ? 'secondary' : 'tertiary'
hasAccessToCompleteTask && row.task_id ? 'secondary' : 'ghost'
}
href={interstitialUrl}
>
Go
{hasAccessToCompleteTask && row.task_id ? 'Go' : 'View'}
</Button>
);
currentRow.push(<td>{buttonElement}</td>);