jasquat 18600189c8 Feature/background proc with celery (#788)
* WIP: some initial test code to test out celery w/ burnettk

* some cleanup for celery and added base model to put tasks waiting on timers

* removed dup bpmn file

* some more cleanup and added strategy to queue instructions

* some minor code changes w/ burnettk

* remove the unused next_task key from api calls since nobody uses it w/ burnettk essweine

* added migration for future tasks and added test to make sure we are inserting into it w/ burnettk essweine

* ensure future task run at time can be updated w/ burnettk

* added table to queue instructions for end user in w/ burnettk

* added test to ensure we are storing instructions for end users w/ burnettk

* added progress page to display new instructions to user

* ignore dup instructions on db insert w/ burnettk

* some more updates for celery w/ burnettk

* some pyl and test fixes w/ burnettk

* fixed tests w/ burnettk

* WIP: added in page to show instructions on pi show page w/ burnettk

* pi show page is fully using not interstitial now w/ burnettk

* fixed broken test w/ burnettk

* moved background processing items to own module w/ burnettk

* fixed apscheduler start script

* updated celery task queue to handle future tasks and upgraded black and set its line-length to match ruff w/ burnettk

* added support to run future tasks using countdown w/ burnettk

* build image for celery branch w/ burnettk

* poet does not exist in the image w/ burnettk

* start blocking scheduler should always start the scheduler w/ burnettk

* add init and stuff for this branch

* make this work not just on my mac

* send other args to only

* added running status for process instance and use that on fe to go to show page and added additional identifier to locking system to isolate celery workers better w/ burnettk

* fixed typing error that typeguard found, not sure why mypy did not w/ burnettk

* do not check for no instructions on interstitial page for cypress tests on frontend w/ burnettk

* do not queue process instances twice w/ burnettk

* removed bad file w/ burnettk

* queue tasks using strings to avoid circular imports when attmepting to queue w/ burnettk

* only queue imminent new timer events and mock celery

* some keyboard shortcut support on frontend and added ability to force run a process instance over the api w/ burnettk

* some styles added for the shortcut menu w/ burnettk

* pyl w/ burnettk

* fixed test w/ burnettk

* removed temporary celery script and added support for celery worker in run server locally w/ burnettk

* cleaned up migrations w/ burnettk

* created new migration to clean up old migrations

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
Co-authored-by: burnettk <burnettk@users.noreply.github.com>
2023-12-05 11:41:59 -05:00

104 lines
4.4 KiB
Python

import faulthandler
import os
from typing import Any
import connexion # type: ignore
import flask.app
import flask.json
import sqlalchemy
from flask.json.provider import DefaultJSONProvider
from flask_cors import CORS # type: ignore
from flask_mail import Mail # type: ignore
import spiffworkflow_backend.load_database_models # noqa: F401
from spiffworkflow_backend.background_processing.apscheduler import start_apscheduler_if_appropriate
from spiffworkflow_backend.background_processing.celery import init_celery_if_appropriate
from spiffworkflow_backend.config import setup_config
from spiffworkflow_backend.exceptions.api_error import api_error_blueprint
from spiffworkflow_backend.helpers.api_version import V1_API_PATH_PREFIX
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import migrate
from spiffworkflow_backend.routes.authentication_controller import _set_new_access_token_in_cookie
from spiffworkflow_backend.routes.authentication_controller import verify_token
from spiffworkflow_backend.routes.openid_blueprint.openid_blueprint import openid_blueprint
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.monitoring_service import configure_sentry
from spiffworkflow_backend.services.monitoring_service import setup_prometheus_metrics
class MyJSONEncoder(DefaultJSONProvider):
def default(self, obj: Any) -> Any:
if hasattr(obj, "serialized"):
return obj.serialized()
elif isinstance(obj, sqlalchemy.engine.row.Row): # type: ignore
return_dict = {}
row_mapping = obj._mapping
for row_key in row_mapping.keys():
row_value = row_mapping[row_key]
if hasattr(row_value, "serialized"):
return_dict.update(row_value.serialized())
elif hasattr(row_value, "__dict__"):
return_dict.update(row_value.__dict__)
else:
return_dict.update({row_key: row_value})
if "_sa_instance_state" in return_dict:
return_dict.pop("_sa_instance_state")
return return_dict
return super().default(obj)
def dumps(self, obj: Any, **kwargs: Any) -> Any:
kwargs.setdefault("default", self.default)
return super().dumps(obj, **kwargs)
def create_app() -> flask.app.Flask:
faulthandler.enable()
# We need to create the sqlite database in a known location.
# If we rely on the app.instance_path without setting an environment
# variable, it will be one thing when we run flask db upgrade in the
# noxfile and another thing when the tests actually run.
# instance_path is described more at https://flask.palletsprojects.com/en/2.1.x/config/
connexion_app = connexion.FlaskApp(__name__, server_args={"instance_path": os.environ.get("FLASK_INSTANCE_PATH")})
app = connexion_app.app
app.config["CONNEXION_APP"] = connexion_app
app.config["SESSION_TYPE"] = "filesystem"
setup_prometheus_metrics(app, connexion_app)
setup_config(app)
db.init_app(app)
migrate.init_app(app, db)
app.register_blueprint(user_blueprint)
app.register_blueprint(api_error_blueprint)
app.register_blueprint(openid_blueprint, url_prefix="/openid")
# preflight options requests will be allowed if they meet the requirements of the url regex.
# we will add an Access-Control-Max-Age header to the response to tell the browser it doesn't
# need to continually keep asking for the same path.
origins_re = [r"^https?:\/\/%s(.*)" % o.replace(".", r"\.") for o in app.config["SPIFFWORKFLOW_BACKEND_CORS_ALLOW_ORIGINS"]]
CORS(app, origins=origins_re, max_age=3600, supports_credentials=True)
connexion_app.add_api("api.yml", base_path=V1_API_PATH_PREFIX)
mail = Mail(app)
app.config["MAIL_APP"] = mail
app.json = MyJSONEncoder(app)
configure_sentry(app)
app.before_request(verify_token)
app.before_request(AuthorizationService.check_for_permission)
app.after_request(_set_new_access_token_in_cookie)
# The default is true, but we want to preserve the order of keys in the json
# This is particularly helpful for forms that are generated from json schemas.
app.json.sort_keys = False
start_apscheduler_if_appropriate(app)
init_celery_if_appropriate(app)
return app # type: ignore