run_pyl and a changes to the way we loop through tasks for the interstitial page.

This commit is contained in:
Dan 2023-04-26 15:50:14 -04:00
parent ecee5a795b
commit 85c34de9df
7 changed files with 100 additions and 74 deletions

View File

@ -18,13 +18,13 @@ def setup_database_uri(app: Flask) -> None:
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None: if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None:
database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}" database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}"
if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite": if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite":
app.config["SQLALCHEMY_DATABASE_URI"] = ( app.config[
f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3" "SQLALCHEMY_DATABASE_URI"
) ] = f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres": elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres":
app.config["SQLALCHEMY_DATABASE_URI"] = ( app.config[
f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}" "SQLALCHEMY_DATABASE_URI"
) ] = f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
else: else:
# use pswd to trick flake8 with hardcoded passwords # use pswd to trick flake8 with hardcoded passwords
db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD") db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD")

View File

@ -127,9 +127,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
def serialized_with_metadata(self) -> dict[str, Any]: def serialized_with_metadata(self) -> dict[str, Any]:
process_instance_attributes = self.serialized process_instance_attributes = self.serialized
process_instance_attributes["process_metadata"] = self.process_metadata process_instance_attributes["process_metadata"] = self.process_metadata
process_instance_attributes["process_model_with_diagram_identifier"] = ( process_instance_attributes[
self.process_model_with_diagram_identifier "process_model_with_diagram_identifier"
) ] = self.process_model_with_diagram_identifier
return process_instance_attributes return process_instance_attributes
@property @property

View File

@ -20,6 +20,7 @@ from flask import make_response
from flask import stream_with_context from flask import stream_with_context
from flask.wrappers import Response from flask.wrappers import Response
from jinja2 import TemplateSyntaxError from jinja2 import TemplateSyntaxError
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
@ -385,21 +386,26 @@ def _render_instructions_for_end_user(task_model: TaskModel, extensions: Optiona
def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[str, Optional[str], None]: def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[str, Optional[str], None]:
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
reported_ids = [] # bit of an issue with end tasks showing as getting completed twice. reported_ids = [] # A list of all the ids reported by this endpoint so far.
spiff_task = processor.next_task()
def get_reportable_tasks():
return processor.bpmn_process_instance.get_tasks(
TaskState.WAITING | TaskState.STARTED | TaskState.READY | TaskState.ERROR
)
tasks = get_reportable_tasks()
while True:
for spiff_task in tasks:
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
last_task = None
while last_task != spiff_task:
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
extensions = TaskService.get_extensions_from_task_model(task_model) extensions = TaskService.get_extensions_from_task_model(task_model)
instructions = _render_instructions_for_end_user(task_model, extensions) instructions = _render_instructions_for_end_user(task_model, extensions)
if instructions and spiff_task.id not in reported_ids: if instructions and spiff_task.id not in reported_ids:
reported_ids.append(spiff_task.id) task = ProcessInstanceService.spiff_task_to_api_task(processor, spiff_task)
task.properties = extensions task.properties = extensions
yield f"data: {current_app.json.dumps(task)} \n\n" yield f"data: {current_app.json.dumps(task)} \n\n"
last_task = spiff_task reported_ids.append(spiff_task.id)
if spiff_task.state == TaskState.READY:
try: try:
processor.do_engine_steps(execution_strategy_name="one_at_a_time")
processor.do_engine_steps(execution_strategy_name="run_until_user_message") processor.do_engine_steps(execution_strategy_name="run_until_user_message")
processor.save() # Fixme - maybe find a way not to do this on every loop? processor.save() # Fixme - maybe find a way not to do this on every loop?
except WorkflowTaskException as wfe: except WorkflowTaskException as wfe:
@ -407,6 +413,7 @@ def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[st
"engine_steps_error", "Failed complete an automated task.", exp=wfe "engine_steps_error", "Failed complete an automated task.", exp=wfe
) )
yield f"data: {current_app.json.dumps(api_error)} \n\n" yield f"data: {current_app.json.dumps(api_error)} \n\n"
return
except Exception as e: except Exception as e:
api_error = ApiError( api_error = ApiError(
error_code="engine_steps_error", error_code="engine_steps_error",
@ -414,17 +421,30 @@ def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[st
status_code=400, status_code=400,
) )
yield f"data: {current_app.json.dumps(api_error)} \n\n" yield f"data: {current_app.json.dumps(api_error)} \n\n"
return
processor.bpmn_process_instance.refresh_waiting_tasks()
ready_engine_task_count = get_ready_engine_step_count(processor.bpmn_process_instance)
if ready_engine_task_count == 0:
break # No more tasks to report
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
if task.id not in reported_ids:
yield f"data: {current_app.json.dumps(task)} \n\n"
# Note, this has to be done in case someone leaves the page, # Note, this has to be done in case someone leaves the page,
# which can otherwise cancel this function and leave completed tasks un-registered. # which can otherwise cancel this function and leave completed tasks un-registered.
spiff_task = processor.next_task() spiff_task = processor.next_task()
task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
# Always provide some response, in the event no instructions were provided. def get_ready_engine_step_count(bpmn_process_instance: BpmnWorkflow):
if len(reported_ids) == 0: return len(
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task()) list(
yield f"data: {current_app.json.dumps(task)} \n\n" [
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
)
def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str, Optional[str], None]: def _dequeued_interstitial_stream(process_instance_id: int) -> Generator[str, Optional[str], None]:
process_instance = _find_process_instance_by_id_or_raise(process_instance_id) process_instance = _find_process_instance_by_id_or_raise(process_instance_id)

View File

@ -423,9 +423,9 @@ class ProcessInstanceProcessor:
tld.process_instance_id = process_instance_model.id tld.process_instance_id = process_instance_model.id
# we want this to be the fully qualified path to the process model including all group subcomponents # we want this to be the fully qualified path to the process model including all group subcomponents
current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = ( current_app.config[
f"{process_instance_model.process_model_identifier}" "THREAD_LOCAL_DATA"
) ].process_model_identifier = f"{process_instance_model.process_model_identifier}"
self.process_instance_model = process_instance_model self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService() self.process_model_service = ProcessModelService()
@ -585,9 +585,9 @@ class ProcessInstanceProcessor:
bpmn_subprocess_definition.bpmn_identifier bpmn_subprocess_definition.bpmn_identifier
] = bpmn_process_definition_dict ] = bpmn_process_definition_dict
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {} spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = ( bpmn_subprocess_definition_bpmn_identifiers[
bpmn_subprocess_definition.bpmn_identifier bpmn_subprocess_definition.id
) ] = bpmn_subprocess_definition.bpmn_identifier
task_definitions = TaskDefinitionModel.query.filter( task_definitions = TaskDefinitionModel.query.filter(
TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore

View File

@ -220,15 +220,16 @@ class TaskService:
if task_model.state == "COMPLETED": if task_model.state == "COMPLETED":
event_type = ProcessInstanceEventType.task_completed.value event_type = ProcessInstanceEventType.task_completed.value
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time() timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
process_instance_event, _process_instance_error_detail = ( (
ProcessInstanceTmpService.add_event_to_process_instance( process_instance_event,
_process_instance_error_detail,
) = ProcessInstanceTmpService.add_event_to_process_instance(
self.process_instance, self.process_instance,
event_type, event_type,
task_guid=task_model.guid, task_guid=task_model.guid,
timestamp=timestamp, timestamp=timestamp,
add_to_db_session=False, add_to_db_session=False,
) )
)
self.process_instance_events[task_model.guid] = process_instance_event self.process_instance_events[task_model.guid] = process_instance_event
self.update_bpmn_process(spiff_task.workflow, bpmn_process) self.update_bpmn_process(spiff_task.workflow, bpmn_process)
@ -454,7 +455,6 @@ class TaskService:
spiff_task, spiff_task,
self.bpmn_definition_to_task_definitions_mappings, self.bpmn_definition_to_task_definitions_mappings,
) )
self.update_task_model(task_model, spiff_task) self.update_task_model(task_model, spiff_task)
self.task_models[task_model.guid] = task_model self.task_models[task_model.guid] = task_model

View File

@ -299,28 +299,33 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy): class RunUntilUserTaskOrMessageExecutionStrategy(ExecutionStrategy):
"""When you want to run tasks until you hit something to report to the end user.""" """When you want to run tasks until you hit something to report to the end user.
def get_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: Note that this will run at least one engine step if possible,
return list( but will stop if it hits instructions after the first task.
[ """
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if t.task_spec.spec_type not in ["User Task", "Manual Task"]
and not (
hasattr(t.task_spec, "extensions") and t.task_spec.extensions.get("instructionsForEndUser", None)
)
]
)
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
engine_steps = self.get_engine_steps(bpmn_process_instance) bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
if len(engine_steps) > 0:
self.delegate.will_complete_task(engine_steps[0])
engine_steps[0].run()
self.delegate.did_complete_task(engine_steps[0])
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
while engine_steps: while engine_steps:
for task in engine_steps: for task in engine_steps:
if hasattr(task.task_spec, "extensions") and task.task_spec.extensions.get(
"instructionsForEndUser", None
):
should_continue = False
break
self.delegate.will_complete_task(task) self.delegate.will_complete_task(task)
task.run() task.run()
self.delegate.did_complete_task(task) self.delegate.did_complete_task(task)
engine_steps = self.get_engine_steps(bpmn_process_instance) engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
self.delegate.after_engine_steps(bpmn_process_instance) self.delegate.after_engine_steps(bpmn_process_instance)

View File

@ -1408,6 +1408,7 @@ export default function ProcessInstanceListTable({
); );
}; };
// eslint-disable-next-line sonarjs/cognitive-complexity
const buildTable = () => { const buildTable = () => {
const headerLabels: Record<string, string> = { const headerLabels: Record<string, string> = {
id: 'Id', id: 'Id',
@ -1449,11 +1450,11 @@ export default function ProcessInstanceListTable({
buttonElement = ( buttonElement = (
<Button <Button
kind={ kind={
hasAccessToCompleteTask && row.task_id ? 'secondary' : 'tertiary' hasAccessToCompleteTask && row.task_id ? 'secondary' : 'ghost'
} }
href={interstitialUrl} href={interstitialUrl}
> >
Go {hasAccessToCompleteTask && row.task_id ? 'Go' : 'View'}
</Button> </Button>
); );
currentRow.push(<td>{buttonElement}</td>); currentRow.push(<td>{buttonElement}</td>);