diff --git a/docs/DevOps_installation_integration/redis_celery_broker.md b/docs/DevOps_installation_integration/redis_celery_broker.md new file mode 100644 index 00000000..0340ad4b --- /dev/null +++ b/docs/DevOps_installation_integration/redis_celery_broker.md @@ -0,0 +1,41 @@ +# Redis Celery Broker + +SpiffWorkflow can be configured to use celery for efficient processing. +Redis can be used as both a broker and backend for celery. + +If configured in this way, there will be a queue called "celery" and you can inspect it from redis-cli like this: + +```sh +redis-cli LLEN celery # how many queued entries +redis-cli LRANGE celery 0 -1 # get all queued entries. Be careful if you have a lot. +``` + +If you want to purge all entries from the queue: + +```sh +poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker purge +``` + +If you want to inspect jobs that are currently being processed by workers: + +```sh +poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker inspect active +``` + +When we publish a message to the queue, we log a message like this at the log level info: + +```sh +Queueing process instance (3) for celery (9622ff55-9f23-4a94-b4a0-4e0a615a8d14) +``` + +If you want to get the results of this job after the worker processes it, you would run a query like this: + +```sh +redis-cli get celery-task-meta-9622ff55-9f23-4a94-b4a0-4e0a615a8d14 +``` + +As such, if you wanted to get ALL of the results, you could use a hilarious command like: + +```sh +echo 'keys celery-task-meta-*' | redis-cli | sed 's/^/get /' | redis-cli +``` diff --git a/docs/index.md b/docs/index.md index a0d66081..aef19d70 100644 --- a/docs/index.md +++ b/docs/index.md @@ -6,7 +6,6 @@ Getting_Started/quick_start.md ``` - ```{toctree} :maxdepth: 1 :caption: Building Diagrams @@ -48,6 +47,7 @@ DevOps_installation_integration/permission_url.md DevOps_installation_integration/configure_connector_proxy.md 
DevOps_installation_integration/deploy_aws_lambda.md DevOps_installation_integration/Secrets.md +DevOps_installation_integration/redis_celery_broker.md ``` ```{toctree} @@ -71,6 +71,6 @@ wish_list/wish_list.md ## Indices and tables -* [](genindex) -* [](modindex) -* [](search) +- [](genindex) +- [](modindex) +- [](search) diff --git a/spiffworkflow-backend/bin/import_process_instance_from_bpmn_json.py b/spiffworkflow-backend/bin/import_process_instance_from_bpmn_json.py new file mode 100644 index 00000000..3d8858df --- /dev/null +++ b/spiffworkflow-backend/bin/import_process_instance_from_bpmn_json.py @@ -0,0 +1,28 @@ +import json +import sys + +from spiffworkflow_backend import create_app +from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor +from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService +from spiffworkflow_backend.services.process_model_service import ProcessModelService +from spiffworkflow_backend.services.user_service import UserService + + +def main(process_model_identifier: str, filepath: str) -> None: + app = create_app() + with app.app_context(): + user = UserService.find_or_create_system_user() + process_model = ProcessModelService.get_process_model(process_model_identifier.replace(":", "/")) + process_instance, _ = ProcessInstanceService.create_process_instance(process_model, user) + with open(filepath) as f: + bpmn_process_json = f.read() + bpmn_process_dict = json.loads(bpmn_process_json) + ProcessInstanceProcessor.persist_bpmn_process_dict( + bpmn_process_dict, bpmn_definition_to_task_definitions_mappings={}, process_instance_model=process_instance + ) + + +if len(sys.argv) < 3: + raise Exception("usage: [script] [process_model_identifier] [bpmn_json_file_path]") + +main(sys.argv[1], sys.argv[2]) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py 
b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index 4c30ffbd..d25190b9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -36,6 +36,9 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | execution_strategy_name="queue_instructions_for_end_user", additional_processing_identifier=proc_index, ) + # currently, whenever we get a task_guid, that means that that task, which was a future task, is ready to run. + # there is an assumption that it was successfully processed by run_process_instance_with_processor above. + # we might want to check that assumption. if task_guid is not None: future_task = FutureTaskModel.query.filter_by(completed=False, guid=task_guid).first() if future_task is not None: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py index 80cd519b..7d54e1ff 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task_producer.py @@ -49,7 +49,8 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta # (maybe due to subsecond stuff, maybe because of clock skew within the cluster of computers running spiff) # celery_task_process_instance_run.apply_async(kwargs=args_to_celery, countdown=countdown + 1) # type: ignore - celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, kwargs=args_to_celery, countdown=countdown) + async_result = 
celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, kwargs=args_to_celery, countdown=countdown) + current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})") return True return False @@ -70,6 +71,7 @@ def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel # ) if should_queue_process_instance(process_instance, execution_mode): - celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,)) + async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,)) + current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})") return True return False diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py index 249be47d..f9db9bb5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_api_blueprint.py @@ -21,6 +21,9 @@ from sqlalchemy import or_ from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_enabled_for_process_model, ) +from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( + queue_process_instance_if_appropriate, +) from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError @@ -487,14 +490,16 @@ def _task_submit_shared( ) with sentry_sdk.start_span(op="task", description="complete_form_task"): - ProcessInstanceService.complete_form_task( - processor=processor, - spiff_task=spiff_task, - data=body, - user=g.user, - human_task=human_task, - 
execution_mode=execution_mode, - ) + with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3): + ProcessInstanceService.complete_form_task( + processor=processor, + spiff_task=spiff_task, + data=body, + user=g.user, + human_task=human_task, + execution_mode=execution_mode, + ) + queue_process_instance_if_appropriate(process_instance, execution_mode) # currently task_model has the potential to be None. This should be removable once # we backfill the human_task table for task_guid and make that column not nullable diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index 33374657..0950d24c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -19,9 +19,7 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from SpiffWorkflow.util.task import TaskState # type: ignore -from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( - queue_process_instance_if_appropriate, -) +from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import should_queue_process_instance from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError @@ -477,23 +475,20 @@ class ProcessInstanceService: Abstracted here because we need to do it multiple times when completing all tasks in a multi-instance task. 
""" - with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3): - ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user) - processor.complete_task(spiff_task, human_task, user=user) + ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user) + processor.complete_task(spiff_task, human_task, user=user) - if queue_process_instance_if_appropriate(processor.process_instance_model, execution_mode): - return - else: - with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3): - if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model): - with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"): - execution_strategy_name = None - if execution_mode == ProcessInstanceExecutionMode.synchronous.value: - execution_strategy_name = "greedy" + # the caller needs to handle the actual queueing of the process instance for better dequeueing ability + if not should_queue_process_instance(processor.process_instance_model, execution_mode): + if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model): + with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"): + execution_strategy_name = None + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + execution_strategy_name = "greedy" - # maybe move this out once we have the interstitial page since this is - # here just so we can get the next human task - processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) + # maybe move this out once we have the interstitial page since this is + # here just so we can get the next human task + processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) @staticmethod def spiff_task_to_api_task(