drop completed_tasks from the task_api object

Show the "next" task if no task is provided on the task_show api endpoint
Adding interstitial endpoint
Rename run to run_and_save
Remove repeated code from execution strategy
Adding interstital frontend page
This commit is contained in:
Dan 2023-04-14 15:44:59 -04:00
parent fe8c86e288
commit 8a6426efec
12 changed files with 151 additions and 59 deletions

View File

@ -1817,6 +1817,27 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ServiceTask"
/tasks/{process_instance_id}:
parameters:
- name: process_instance_id
in: path
required: true
description: The unique id of an existing process instance.
schema:
type: integer
get:
tags:
- Tasks
operationId: spiffworkflow_backend.routes.tasks_controller.interstitial
summary: An SSE (Server Sent Events) endpoint that returns what tasks are currently active (running, waiting, or the final END event)
responses:
"200":
description: One task
content:
application/json:
schema:
$ref: "#/components/schemas/Task"
/tasks/{process_instance_id}/{task_guid}:
parameters:

View File

@ -205,7 +205,6 @@ class ProcessInstanceApi:
next_task: Task | None,
process_model_identifier: str,
process_model_display_name: str,
completed_tasks: int,
updated_at_in_seconds: int,
) -> None:
"""__init__."""
@ -214,7 +213,6 @@ class ProcessInstanceApi:
self.next_task = next_task # The next task that requires user input.
self.process_model_identifier = process_model_identifier
self.process_model_display_name = process_model_display_name
self.completed_tasks = completed_tasks
self.updated_at_in_seconds = updated_at_in_seconds

View File

@ -1,7 +1,9 @@
"""APIs for dealing with process groups, process models, and process instances."""
import json
import os
import time
import uuid
from datetime import datetime
from sys import exc_info
from typing import Any
from typing import Dict
@ -12,7 +14,7 @@ from typing import Union
import flask.wrappers
import jinja2
import sentry_sdk
from flask import current_app
from flask import current_app, stream_with_context
from flask import g
from flask import jsonify
from flask import make_response
@ -262,7 +264,7 @@ def manual_complete_task(
)
def task_show(process_instance_id: int, task_guid: str) -> flask.wrappers.Response:
def task_show(process_instance_id: int, task_guid: str = "next") -> flask.wrappers.Response:
"""Task_show."""
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
@ -277,12 +279,16 @@ def task_show(process_instance_id: int, task_guid: str) -> flask.wrappers.Respon
process_instance.process_model_identifier,
)
_find_human_task_or_raise(process_instance_id, task_guid)
# _find_human_task_or_raise(process_instance_id, task_guid)
form_schema_file_name = ""
form_ui_schema_file_name = ""
processor = ProcessInstanceProcessor(process_instance)
spiff_task = _get_spiff_task_from_process_instance(task_guid, process_instance, processor=processor)
if task_guid == "next":
spiff_task = processor.next_task()
task_guid = spiff_task.id
else:
spiff_task = _get_spiff_task_from_process_instance(task_guid, process_instance, processor=processor)
extensions = spiff_task.task_spec.extensions
if "properties" in extensions:
@ -344,7 +350,11 @@ def task_show(process_instance_id: int, task_guid: str) -> flask.wrappers.Respon
task.form_ui_schema = ui_form_contents
_munge_form_ui_schema_based_on_hidden_fields_in_task_data(task)
_render_instructions_for_end_user(spiff_task, task)
return make_response(jsonify(task), 200)
def _render_instructions_for_end_user(spiff_task: SpiffTask, task: Task):
"""Assure any instructions for end user are processed for jinja syntax."""
if task.properties and "instructionsForEndUser" in task.properties:
if task.properties["instructionsForEndUser"]:
try:
@ -354,7 +364,7 @@ def task_show(process_instance_id: int, task_guid: str) -> flask.wrappers.Respon
except WorkflowTaskException as wfe:
wfe.add_note("Failed to render instructions for end user.")
raise ApiError.from_workflow_exception("instructions_error", str(wfe), exp=wfe) from wfe
return make_response(jsonify(task), 200)
return ""
def process_data_show(
@ -380,6 +390,24 @@ def process_data_show(
200,
)
def interstitial(process_instance_id: int):
process_instance = _find_process_instance_by_id_or_raise(process_instance_id)
processor = ProcessInstanceProcessor(process_instance)
def get_data():
spiff_task = processor.next_task()
last_task = None
while last_task != spiff_task:
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
_render_instructions_for_end_user(spiff_task, task)
yield f'data: {current_app.json.dumps(task)} \n\n'
last_task = spiff_task
processor.do_engine_steps(execution_strategy_name="one_at_a_time")
spiff_task = processor.next_task()
return
# return Response(get_data(), mimetype='text/event-stream')
return Response(stream_with_context(get_data()), mimetype='text/event-stream')
def _task_submit_shared(
process_instance_id: int,
@ -462,9 +490,15 @@ def _task_submit_shared(
)
if next_human_task_assigned_to_me:
return make_response(jsonify(HumanTaskModel.to_task(next_human_task_assigned_to_me)), 200)
elif processor.next_task():
task = ProcessInstanceService.spiff_task_to_api_task(processor, processor.next_task())
return make_response(jsonify(task), 200)
return Response(json.dumps({"ok": True}), status=202, mimetype="application/json")
return Response(json.dumps(
{"ok": True,
"process_model_identifier": process_instance.process_model_identifier,
"process_instance_id": process_instance_id
}), status=202, mimetype="application/json")
def task_submit(
process_instance_id: int,

View File

@ -1642,7 +1642,7 @@ class ProcessInstanceProcessor:
self.save,
)
try:
execution_service.run(exit_at, save)
execution_service.run_and_save(exit_at, save)
finally:
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
# script engine on a class variable.

View File

@ -155,7 +155,6 @@ class ProcessInstanceService:
next_task=None,
process_model_identifier=processor.process_model_identifier,
process_model_display_name=processor.process_model_display_name,
completed_tasks=processor.process_instance_model.completed_tasks,
updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds,
)
@ -442,6 +441,8 @@ class ProcessInstanceService:
spiff_task.get_state_name(),
lane=lane,
process_identifier=spiff_task.task_spec._wf_spec.name,
process_instance_id=processor.process_instance_model.id,
process_model_identifier=processor.process_model_identifier,
properties=props,
parent=parent_id,
event_definition=serialized_task_spec.get("event_definition"),

View File

@ -1,6 +1,6 @@
import copy
import time
from typing import Callable
from typing import Callable, List
from typing import Optional
from typing import Set
from uuid import UUID
@ -206,6 +206,15 @@ class ExecutionStrategy:
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.save(bpmn_process_instance)
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
return list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
class GreedyExecutionStrategy(ExecutionStrategy):
"""The common execution strategy. This will greedily run all engine steps without stopping."""
@ -234,7 +243,6 @@ class GreedyExecutionStrategy(ExecutionStrategy):
if non_human_waiting_task is not None:
self.run_until_user_input_required(exit_at)
class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
"""For illustration purposes, not currently integrated.
@ -243,30 +251,45 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
"""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance
engine_steps = list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
while engine_steps:
for spiff_task in engine_steps:
if spiff_task.task_spec.spec_type == "Service Task":
return
self.delegate.will_complete_task(spiff_task)
spiff_task.complete()
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
self.delegate.after_engine_steps(bpmn_process_instance)
engine_steps = list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
if bpmn_process_instance._is_engine_task(t.task_spec)
]
)
class RunUntilUserMessageExecutionStrategy(ExecutionStrategy):
"""When you want to run tasks until you hit something to report to the end user, or
until there are no other engine steps to complete."""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
while engine_steps:
for spiff_task in engine_steps:
self.delegate.will_complete_task(spiff_task)
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
if spiff_task.task_spec.properties.get("instructionsForEndUser", None) is not None:
break
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
self.delegate.after_engine_steps(bpmn_process_instance)
class OneAtATimeExecutionStrategy(ExecutionStrategy):
"""When you want to run only one engine step at a time."""
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
if len(engine_steps) > 0:
spiff_task = engine_steps[0]
self.delegate.will_complete_task(spiff_task)
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
self.delegate.after_engine_steps(bpmn_process_instance)
@ -274,6 +297,8 @@ def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> Executi
cls = {
"greedy": GreedyExecutionStrategy,
"run_until_service_task": RunUntilServiceTaskExecutionStrategy,
"run_until_user_message": RunUntilUserMessageExecutionStrategy,
"one_at_a_time": OneAtATimeExecutionStrategy,
}[name]
return cls(delegate)
@ -282,7 +307,6 @@ def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> Executi
ProcessInstanceCompleter = Callable[[BpmnWorkflow], None]
ProcessInstanceSaver = Callable[[], None]
class WorkflowExecutionService:
"""Provides the driver code for workflow execution."""
@ -306,16 +330,7 @@ class WorkflowExecutionService:
# run
# execution_strategy.spiff_run
# spiff.[some_run_task_method]
def run(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps."""
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
if tripped:
raise AssertionError(
"The current thread has not obtained a lock for this process"
f" instance ({self.process_instance_model.id})."
)
try:
def run_and_save(self, exit_at: None = None, save: bool = False) -> None:
self.bpmn_process_instance.refresh_waiting_tasks()
# TODO: implicit re-entrant locks here `with_dequeued`
@ -324,17 +339,9 @@ class WorkflowExecutionService:
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except SpiffWorkflowException as swe:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally:
self.execution_strategy.save(self.bpmn_process_instance)
db.session.commit()
if save:
self.process_instance_saver()
self.execution_strategy.spiff_run(self.bpmn_process_instance, exit_at)
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
@ -407,11 +414,11 @@ class WorkflowExecutionService:
class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service."""
def run(self, exit_at: None = None, save: bool = False) -> None:
def run_and_save(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps."""
import cProfile
from pstats import SortKey
with cProfile.Profile() as pr:
super().run(exit_at=exit_at, save=save)
super().run_and_save(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE)

View File

@ -17,6 +17,7 @@
"@casl/ability": "^6.3.2",
"@casl/react": "^3.1.0",
"@ginkgo-bioworks/react-json-schema-form-builder": "^2.9.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@monaco-editor/react": "^4.4.5",
"@mui/material": "^5.10.14",
"@react-icons/all-files": "^4.1.0",
@ -4473,6 +4474,11 @@
"@lezer/common": "^1.0.0"
}
},
"node_modules/@microsoft/fetch-event-source": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
"integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA=="
},
"node_modules/@monaco-editor/loader": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.3.2.tgz",
@ -8066,7 +8072,7 @@
},
"node_modules/bpmn-js-spiffworkflow": {
"version": "0.0.8",
"resolved": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#24a71ec5e2cbbefce58be4b4610151db4a55a8e1",
"resolved": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#69135655f8a5282bcdaef82705c3d522ef5b4464",
"license": "MIT",
"dependencies": {
"inherits": "^2.0.4",
@ -35584,6 +35590,11 @@
"@lezer/common": "^1.0.0"
}
},
"@microsoft/fetch-event-source": {
"version": "2.0.1",
"resolved": "https://registry.npmjs.org/@microsoft/fetch-event-source/-/fetch-event-source-2.0.1.tgz",
"integrity": "sha512-W6CLUJ2eBMw3Rec70qrsEW0jOm/3twwJv21mrmj2yORiaVmVYGS4sSS5yUwvQc1ZlDLYGPnClVWmUUMagKNsfA=="
},
"@monaco-editor/loader": {
"version": "1.3.2",
"resolved": "https://registry.npmjs.org/@monaco-editor/loader/-/loader-1.3.2.tgz",
@ -38227,7 +38238,7 @@
}
},
"bpmn-js-spiffworkflow": {
"version": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#24a71ec5e2cbbefce58be4b4610151db4a55a8e1",
"version": "git+ssh://git@github.com/sartography/bpmn-js-spiffworkflow.git#69135655f8a5282bcdaef82705c3d522ef5b4464",
"from": "bpmn-js-spiffworkflow@sartography/bpmn-js-spiffworkflow#main",
"requires": {
"inherits": "^2.0.4",

View File

@ -12,6 +12,7 @@
"@casl/ability": "^6.3.2",
"@casl/react": "^3.1.0",
"@ginkgo-bioworks/react-json-schema-form-builder": "^2.9.0",
"@microsoft/fetch-event-source": "^2.0.1",
"@monaco-editor/react": "^4.4.5",
"@mui/material": "^5.10.14",
"@react-icons/all-files": "^4.1.0",

View File

@ -8,6 +8,7 @@ import NavigationBar from './components/NavigationBar';
import HomePageRoutes from './routes/HomePageRoutes';
import ErrorBoundary from './components/ErrorBoundary';
import AdminRoutes from './routes/AdminRoutes';
import ProcessRoutes from './routes/ProcessRoutes';
import { AbilityContext } from './contexts/Can';
import UserService from './services/UserService';
@ -35,6 +36,7 @@ export default function App() {
<Routes>
<Route path="/*" element={<HomePageRoutes />} />
<Route path="/tasks/*" element={<HomePageRoutes />} />
<Route path="/process/*" element={<ProcessRoutes />} />
<Route path="/admin/*" element={<AdminRoutes />} />
</Routes>
</ErrorBoundary>

View File

@ -7,6 +7,7 @@ import MyTasks from './MyTasks';
import CompletedInstances from './CompletedInstances';
import CreateNewInstance from './CreateNewInstance';
import InProgressInstances from './InProgressInstances';
import ProcessInterstitial from './ProcessInterstitial';
export default function HomePageRoutes() {
const location = useLocation();
@ -55,6 +56,7 @@ export default function HomePageRoutes() {
<Route path="my-tasks" element={<MyTasks />} />
<Route path=":process_instance_id/:task_id" element={<TaskShow />} />
<Route path="grouped" element={<InProgressInstances />} />
<Route path="process/:process_instance_id/interstitial" element={<ProcessInterstitial />} />
<Route path="completed-instances" element={<CompletedInstances />} />
<Route path="create-new-instance" element={<CreateNewInstance />} />
</Routes>

View File

@ -17,6 +17,7 @@ import {
import MDEditor from '@uiw/react-md-editor';
// eslint-disable-next-line import/no-named-as-default
import Form from '../themes/carbon';
import Loading from '../themes/carbon';
import HttpService from '../services/HttpService';
import useAPIError from '../hooks/UseApiError';
import { modifyProcessIdentifierForPathParam } from '../helpers';
@ -90,6 +91,8 @@ function TypeAheadWidget({
);
}
class UnexpectedHumanTaskType extends Error {
constructor(message: string) {
super(message);
@ -108,6 +111,7 @@ export default function TaskShow() {
const params = useParams();
const navigate = useNavigate();
const [disabled, setDisabled] = useState(false);
const [refreshSeconds, setRefreshSeconds] = useState(0);
// save current form data so that we can avoid validations in certain situations
const [currentFormObject, setCurrentFormObject] = useState<any>({});
@ -117,6 +121,7 @@ export default function TaskShow() {
// eslint-disable-next-line sonarjs/no-duplicate-string
const supportedHumanTaskTypes = ['User Task', 'Manual Task'];
useEffect(() => {
const processResult = (result: ProcessInstanceTask) => {
setTask(result);
@ -157,7 +162,15 @@ export default function TaskShow() {
if (result.ok) {
navigate(`/tasks`);
} else if (result.process_instance_id) {
navigate(`/tasks/${result.process_instance_id}/${result.id}`);
if (result.type in supportedHumanTaskTypes) {
navigate(`/tasks/${result.process_instance_id}/${result.id}`);
} else {
navigate(
`/process/${modifyProcessIdentifierForPathParam(
result.process_model_identifier
)}/${result.process_instance_id}/interstitial`
);
}
} else {
addError(result);
}
@ -344,8 +357,10 @@ export default function TaskShow() {
</Button>
);
} else {
throw new UnexpectedHumanTaskType(
`Invalid task type given: ${task.type}. Only supported types: ${supportedHumanTaskTypes}`
return (
<p>
<i>Page will refresh in {refreshSeconds} seconds.</i>
</p>
);
}
reactFragmentToHideSubmitButton = (

View File

@ -8,7 +8,7 @@ const HttpMethods = {
DELETE: 'DELETE',
};
const getBasicHeaders = (): object => {
export const getBasicHeaders = (): Record<string, string> => {
if (UserService.isLoggedIn()) {
return {
Authorization: `Bearer ${UserService.getAccessToken()}`,