Celery docs and debug (#1328)

* added script to import process instance and updated some docs for redis and celery w/ burnettk

* pyl and removed bad gitignore w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
jasquat 2024-04-02 20:13:18 +00:00 committed by GitHub
parent 0c945c7eea
commit b21a3b3ec7
7 changed files with 106 additions and 32 deletions

View File

@@ -0,0 +1,41 @@
# Redis Celery Broker
SpiffWorkflow can be configured to use Celery for efficient background processing.
Redis can serve as both the broker and the result backend for Celery.
If configured this way, there will be a queue called "celery", and you can inspect it from redis-cli like this:
```sh
redis-cli LLEN celery # how many queued entries
redis-cli LRANGE celery 0 -1 # get all queued entries. Be careful if you have a lot.
```
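For orientation, a minimal Celery app wired to Redis as both broker and backend looks roughly like this (the connection URL is an assumption for illustration; the backend's real app lives in `src.spiffworkflow_backend.background_processing.celery_worker`):
```python
from celery import Celery

# minimal sketch: Redis as both the broker (the queue) and the result backend.
# the URL is a placeholder, not the project's actual configuration.
app = Celery(
    "spiffworkflow_backend",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
```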
If you want to purge all entries from the queue:
```sh
poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker purge
```
If you want to inspect jobs that are currently being processed by workers:
```sh
poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker inspect active
```
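Celery's standard `inspect` subcommands can also show tasks that are scheduled (queued with an eta/countdown) or reserved (prefetched by a worker but not yet started):
```sh
poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker inspect scheduled
poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker inspect reserved
```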
When we publish a message to the queue, we log a line like this at the INFO level:
```sh
Queueing process instance (3) for celery (9622ff55-9f23-4a94-b4a0-4e0a615a8d14)
```
If you want to get the results of this job after the worker processes it, you would run a query like this:
```sh
redis-cli get celery-task-meta-9622ff55-9f23-4a94-b4a0-4e0a615a8d14
```
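The same lookup can be done through the Celery API. Here is a sketch assuming the app object is importable from the worker module shown above (the attribute name `celery_app` is a guess for illustration):
```python
from celery.result import AsyncResult

# hypothetical import; adjust to however the worker module actually exposes its app
from src.spiffworkflow_backend.background_processing.celery_worker import celery_app

result = AsyncResult("9622ff55-9f23-4a94-b4a0-4e0a615a8d14", app=celery_app)
print(result.status)  # PENDING, STARTED, SUCCESS, FAILURE, etc.
print(result.result)  # the deserialized return value once the worker finishes
```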
If you wanted to get ALL of the results, you could use a hilarious command like:
```sh
echo 'keys celery-task-meta-*' | redis-cli | sed 's/^/get /' | redis-cli
```
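Note that `KEYS` blocks Redis while it walks the entire keyspace, so on a busy broker a gentler (if less hilarious) variant uses redis-cli's `--scan` mode:
```sh
redis-cli --scan --pattern 'celery-task-meta-*' | sed 's/^/get /' | redis-cli
```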

View File

@@ -6,7 +6,6 @@
Getting_Started/quick_start.md
```
```{toctree}
:maxdepth: 1
:caption: Building Diagrams
@@ -48,6 +47,7 @@ DevOps_installation_integration/permission_url.md
DevOps_installation_integration/configure_connector_proxy.md
DevOps_installation_integration/deploy_aws_lambda.md
DevOps_installation_integration/Secrets.md
DevOps_installation_integration/redis_celery_broker.md
```
```{toctree}
@@ -71,6 +71,6 @@ wish_list/wish_list.md
## Indices and tables
* [](genindex)
* [](modindex)
* [](search)
- [](genindex)
- [](modindex)
- [](search)

View File

@@ -0,0 +1,28 @@
import json
import sys

from spiffworkflow_backend import create_app
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService


def main(process_model_identifier: str, filepath: str) -> None:
    app = create_app()
    with app.app_context():
        # run as the system user; identifiers use ":" on the command line but "/" internally
        user = UserService.find_or_create_system_user()
        process_model = ProcessModelService.get_process_model(process_model_identifier.replace(":", "/"))
        process_instance, _ = ProcessInstanceService.create_process_instance(process_model, user)

        # read the exported BPMN process JSON and persist it as the state of the new instance
        with open(filepath) as f:
            bpmn_process_json = f.read()
        bpmn_process_dict = json.loads(bpmn_process_json)
        ProcessInstanceProcessor.persist_bpmn_process_dict(
            bpmn_process_dict, bpmn_definition_to_task_definitions_mappings={}, process_instance_model=process_instance
        )


if len(sys.argv) < 3:
    raise Exception("usage: [script] [process_model_identifier] [bpmn_json_file_path]")

main(sys.argv[1], sys.argv[2])
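The script takes a process model identifier (colon-separated, which it converts to the internal "/" form) and a path to the exported BPMN process JSON. A hypothetical invocation, since the script's path is not shown in this diff:
```sh
poetry run python bin/import_process_instance.py my-group:my-model /tmp/bpmn_process.json
```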

View File

@@ -36,6 +36,9 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
execution_strategy_name="queue_instructions_for_end_user",
additional_processing_identifier=proc_index,
)
# currently, whenever we get a task_guid, it means that task (a future task) is now ready to run.
# there is an assumption that it was successfully processed by run_process_instance_with_processor above.
# we might want to check that assumption.
if task_guid is not None:
future_task = FutureTaskModel.query.filter_by(completed=False, guid=task_guid).first()
if future_task is not None:
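A hedged sketch of what checking that assumption might look like; `TaskModel` and the exact column and state names are illustrative guesses, not necessarily the project's API:
```python
# illustrative only: confirm the task actually completed before marking
# the corresponding future task as done (names are guesses)
task_model = TaskModel.query.filter_by(guid=task_guid).first()
if task_model is not None and task_model.state == "COMPLETED":
    future_task.completed = True
    db.session.add(future_task)
    db.session.commit()
```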

View File

@@ -49,7 +49,8 @@ def queue_future_task_if_appropriate(process_instance: ProcessInstanceModel, eta
# (maybe due to subsecond stuff, maybe because of clock skew within the cluster of computers running spiff)
# celery_task_process_instance_run.apply_async(kwargs=args_to_celery, countdown=countdown + 1) # type: ignore
celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, kwargs=args_to_celery, countdown=countdown)
async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, kwargs=args_to_celery, countdown=countdown)
current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})")
return True
return False
@@ -70,6 +71,7 @@ def queue_process_instance_if_appropriate(process_instance: ProcessInstanceModel
# )
if should_queue_process_instance(process_instance, execution_mode):
celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
async_result = celery.current_app.send_task(CELERY_TASK_PROCESS_INSTANCE_RUN, (process_instance.id,))
current_app.logger.info(f"Queueing process instance ({process_instance.id}) for celery ({async_result.task_id})")
return True
return False
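Both call sites dispatch by task name via `celery.current_app.send_task`, which returns an `AsyncResult` immediately; its `task_id` is the UUID that shows up in the new log lines and in the `celery-task-meta-*` redis keys. A condensed sketch with illustrative arguments:
```python
from celery import current_app as celery_app

async_result = celery_app.send_task(
    CELERY_TASK_PROCESS_INSTANCE_RUN,    # the task's registered name (a string constant)
    kwargs={"process_instance_id": 42},  # illustrative id
    countdown=5,                         # run roughly 5 seconds from now
)
print(async_result.task_id)  # the UUID used in logs and result keys
```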

View File

@@ -21,6 +21,9 @@ from sqlalchemy import or_
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_enabled_for_process_model,
)
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate,
)
from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError
@@ -487,6 +490,7 @@ def _task_submit_shared(
)
with sentry_sdk.start_span(op="task", description="complete_form_task"):
with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3):
ProcessInstanceService.complete_form_task(
processor=processor,
spiff_task=spiff_task,
@@ -495,6 +499,7 @@ def _task_submit_shared(
human_task=human_task,
execution_mode=execution_mode,
)
queue_process_instance_if_appropriate(process_instance, execution_mode)
# currently task_model has the potential to be None. This should be removable once
# we backfill the human_task table for task_guid and make that column not nullable

View File

@@ -19,9 +19,7 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from SpiffWorkflow.util.task import TaskState # type: ignore
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate,
)
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import should_queue_process_instance
from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.exceptions.error import HumanTaskAlreadyCompletedError
@@ -477,14 +475,11 @@ class ProcessInstanceService:
Abstracted here because we need to do it multiple times when completing all tasks in
a multi-instance task.
"""
with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3):
ProcessInstanceService.update_form_task_data(processor.process_instance_model, spiff_task, data, user)
processor.complete_task(spiff_task, human_task, user=user)
if queue_process_instance_if_appropriate(processor.process_instance_model, execution_mode):
return
else:
with ProcessInstanceQueueService.dequeued(processor.process_instance_model, max_attempts=3):
# the caller is responsible for the actual queueing of the process instance, which gives it better control over dequeuing
if not should_queue_process_instance(processor.process_instance_model, execution_mode):
if not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(processor.process_instance_model):
with sentry_sdk.start_span(op="task", description="backend_do_engine_steps"):
execution_strategy_name = None