From 127ef67f120ef4035a5f214f6b140488eca60a8d Mon Sep 17 00:00:00 2001 From: jasquat Date: Wed, 9 Nov 2022 17:01:17 -0500 Subject: [PATCH] Squashed 'spiffworkflow-backend/' changes from a195512fd..795df3526 795df3526 use instance path instead of root_path for nox w/ burnettk d4113651e task_json is not optional w/ burnettk c93dd2822 Merge commit '89bfc25f35bdfd57eb9ccf6f3a9a3de76e68cf93' 44ba4738f Merge commit 'de30945eec9161570080b4858da967a7628ec86c' f894ebee2 Bulk insert spiff logs and step details (#26) git-subtree-dir: spiffworkflow-backend git-subtree-split: 795df35260fe24562cf3ab2b92169ecad37d3d55 --- .../{50dd2e016d94_.py => fd00c59e1f60_.py} | 60 +++++++++---------- .../models/process_instance.py | 2 - .../models/spiff_logging.py | 8 +-- .../models/spiff_step_details.py | 10 +--- .../routes/process_api_blueprint.py | 8 ++- .../services/logging_service.py | 39 ++++++++---- .../services/process_instance_processor.py | 56 ++++++++++++++--- .../helpers/example_data.py | 5 +- 8 files changed, 115 insertions(+), 73 deletions(-) rename migrations/versions/{50dd2e016d94_.py => fd00c59e1f60_.py} (97%) diff --git a/migrations/versions/50dd2e016d94_.py b/migrations/versions/fd00c59e1f60_.py similarity index 97% rename from migrations/versions/50dd2e016d94_.py rename to migrations/versions/fd00c59e1f60_.py index a702c5a4..f240843a 100644 --- a/migrations/versions/50dd2e016d94_.py +++ b/migrations/versions/fd00c59e1f60_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 50dd2e016d94 +Revision ID: fd00c59e1f60 Revises: -Create Date: 2022-11-08 16:28:18.991635 +Create Date: 2022-11-09 14:04:14.169379 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '50dd2e016d94' +revision = 'fd00c59e1f60' down_revision = None branch_labels = None depends_on = None @@ -45,6 +45,29 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('uri') ) + op.create_table('spiff_logging', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_instance_id', sa.Integer(), nullable=False), + sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=False), + sa.Column('bpmn_task_identifier', sa.String(length=255), nullable=False), + sa.Column('bpmn_task_name', sa.String(length=255), nullable=True), + sa.Column('bpmn_task_type', sa.String(length=255), nullable=True), + sa.Column('spiff_task_guid', sa.String(length=50), nullable=False), + sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False), + sa.Column('message', sa.String(length=255), nullable=True), + sa.Column('current_user_id', sa.Integer(), nullable=True), + sa.Column('spiff_step', sa.Integer(), nullable=False), + sa.PrimaryKeyConstraint('id') + ) + op.create_table('spiff_step_details', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_instance_id', sa.Integer(), nullable=False), + sa.Column('spiff_step', sa.Integer(), nullable=False), + sa.Column('task_json', sa.JSON(), nullable=False), + sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False), + sa.Column('completed_by_user_id', sa.Integer(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) op.create_table('user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('username', sa.String(length=255), nullable=False), @@ -216,33 +239,6 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq') ) - op.create_table('spiff_logging', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('process_instance_id', sa.Integer(), nullable=False), - sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=False), - sa.Column('bpmn_task_identifier', sa.String(length=255), nullable=False), - sa.Column('bpmn_task_name', sa.String(length=255), nullable=True), - sa.Column('bpmn_task_type', sa.String(length=255), nullable=True), - sa.Column('spiff_task_guid', sa.String(length=50), nullable=False), - sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False), - sa.Column('message', sa.String(length=255), nullable=True), - sa.Column('current_user_id', sa.Integer(), nullable=True), - sa.Column('spiff_step', sa.Integer(), nullable=False), - sa.ForeignKeyConstraint(['current_user_id'], ['user.id'], ), - sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_table('spiff_step_details', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('process_instance_id', sa.Integer(), nullable=False), - sa.Column('spiff_step', sa.Integer(), nullable=False), - sa.Column('task_json', sa.JSON(), nullable=False), - sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False), - sa.Column('completed_by_user_id', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint(['completed_by_user_id'], ['user.id'], ), - sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), - sa.PrimaryKeyConstraint('id') - ) op.create_table('active_task_user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('active_task_id', sa.Integer(), nullable=False), @@ -276,8 +272,6 @@ def downgrade(): op.drop_index(op.f('ix_active_task_user_user_id'), table_name='active_task_user') op.drop_index(op.f('ix_active_task_user_active_task_id'), table_name='active_task_user') op.drop_table('active_task_user') - op.drop_table('spiff_step_details') - op.drop_table('spiff_logging') op.drop_table('permission_assignment') op.drop_table('message_instance') op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation') @@ -302,6 +296,8 @@ def downgrade(): op.drop_index(op.f('ix_message_correlation_property_identifier'), table_name='message_correlation_property') op.drop_table('message_correlation_property') op.drop_table('user') + op.drop_table('spiff_step_details') + op.drop_table('spiff_logging') op.drop_table('permission_target') op.drop_index(op.f('ix_message_model_name'), table_name='message_model') op.drop_index(op.f('ix_message_model_identifier'), table_name='message_model') diff --git a/src/spiffworkflow_backend/models/process_instance.py b/src/spiffworkflow_backend/models/process_instance.py index 8d4cb538..37b65157 100644 --- a/src/spiffworkflow_backend/models/process_instance.py +++ b/src/spiffworkflow_backend/models/process_instance.py @@ -80,10 +80,8 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): process_initiator = relationship("UserModel") active_tasks = relationship("ActiveTaskModel", cascade="delete") # type: ignore - spiff_logs = relationship("SpiffLoggingModel", cascade="delete") # type: ignore message_instances = relationship("MessageInstanceModel", cascade="delete") # type: ignore message_correlations = relationship("MessageCorrelationModel", cascade="delete") # type: ignore - spiff_step_details = relationship("SpiffStepDetailsModel", cascade="delete") # type: ignore bpmn_json: str | None = deferred(db.Column(db.JSON)) # type: ignore start_in_seconds: int | None = db.Column(db.Integer) diff --git a/src/spiffworkflow_backend/models/spiff_logging.py b/src/spiffworkflow_backend/models/spiff_logging.py index 58f13cd4..b0b90887 100644 --- a/src/spiffworkflow_backend/models/spiff_logging.py +++ b/src/spiffworkflow_backend/models/spiff_logging.py @@ -4,10 +4,6 @@ from typing import Optional from flask_bpmn.models.db import db from flask_bpmn.models.db import SpiffworkflowBaseDBModel -from sqlalchemy import ForeignKey - -from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.models.user import UserModel @dataclass @@ -16,7 +12,7 @@ class SpiffLoggingModel(SpiffworkflowBaseDBModel): __tablename__ = "spiff_logging" id: int = db.Column(db.Integer, primary_key=True) - process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore + process_instance_id: int = db.Column(db.Integer, nullable=False) bpmn_process_identifier: str = db.Column(db.String(255), nullable=False) bpmn_task_identifier: str = db.Column(db.String(255), nullable=False) bpmn_task_name: str = db.Column(db.String(255), nullable=True) @@ -24,5 +20,5 @@ class SpiffLoggingModel(SpiffworkflowBaseDBModel): spiff_task_guid: str = db.Column(db.String(50), nullable=False) timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False) message: Optional[str] = db.Column(db.String(255), nullable=True) - current_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True) + current_user_id: int = db.Column(db.Integer, nullable=True) spiff_step: int = db.Column(db.Integer, nullable=False) diff --git a/src/spiffworkflow_backend/models/spiff_step_details.py b/src/spiffworkflow_backend/models/spiff_step_details.py index 1706c2e9..56499e91 100644 --- a/src/spiffworkflow_backend/models/spiff_step_details.py +++ b/src/spiffworkflow_backend/models/spiff_step_details.py @@ -3,12 +3,8 @@ from dataclasses import dataclass from flask_bpmn.models.db import db from flask_bpmn.models.db import SpiffworkflowBaseDBModel -from sqlalchemy import ForeignKey from sqlalchemy.orm import deferred -from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.models.user import UserModel - @dataclass class SpiffStepDetailsModel(SpiffworkflowBaseDBModel): @@ -16,8 +12,8 @@ class SpiffStepDetailsModel(SpiffworkflowBaseDBModel): __tablename__ = "spiff_step_details" id: int = db.Column(db.Integer, primary_key=True) - process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore + process_instance_id: int = db.Column(db.Integer, nullable=False) spiff_step: int = db.Column(db.Integer, nullable=False) - task_json: str | None = deferred(db.Column(db.JSON, nullable=False)) # type: ignore + task_json: str = deferred(db.Column(db.JSON, nullable=False)) # type: ignore timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False) - completed_by_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True) + completed_by_user_id: int = db.Column(db.Integer, nullable=True) diff --git a/src/spiffworkflow_backend/routes/process_api_blueprint.py b/src/spiffworkflow_backend/routes/process_api_blueprint.py index 2f60e994..938e6cf3 100644 --- a/src/spiffworkflow_backend/routes/process_api_blueprint.py +++ b/src/spiffworkflow_backend/routes/process_api_blueprint.py @@ -521,7 +521,7 @@ def process_instance_log_list( ) .order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore .join( - UserModel, isouter=True + UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True ) # isouter since if we don't have a user, we still want the log .add_columns( UserModel.username, @@ -790,6 +790,12 @@ def process_instance_delete(process_instance_id: int) -> flask.wrappers.Response # (Pdb) db.session.delete # > + db.session.query(SpiffLoggingModel).filter_by( + process_instance_id=process_instance.id + ).delete() + db.session.query(SpiffStepDetailsModel).filter_by( + process_instance_id=process_instance.id + ).delete() db.session.delete(process_instance) db.session.commit() return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") diff --git a/src/spiffworkflow_backend/services/logging_service.py b/src/spiffworkflow_backend/services/logging_service.py index 13f66e00..c4e8c8ae 100644 --- a/src/spiffworkflow_backend/services/logging_service.py +++ b/src/spiffworkflow_backend/services/logging_service.py @@ -193,6 +193,17 @@ def setup_logger(app: Flask) -> None: class DBHandler(logging.Handler): """DBHandler.""" + def __init__(self) -> None: + """__init__.""" + self.logs: list[dict] = [] + super().__init__() + + def bulk_insert_logs(self) -> None: + """Bulk_insert_logs.""" + db.session.bulk_insert_mappings(SpiffLoggingModel, self.logs) + db.session.commit() + self.logs = [] + def emit(self, record: logging.LogRecord) -> None: """Emit.""" # if we do not have a process instance id then do not log and assume we are running a script unit test @@ -211,17 +222,19 @@ class DBHandler(logging.Handler): if hasattr(record, "spiff_step") and record.spiff_step is not None # type: ignore else 1 ) - spiff_log = SpiffLoggingModel( - process_instance_id=record.process_instance_id, # type: ignore - bpmn_process_identifier=bpmn_process_identifier, - spiff_task_guid=spiff_task_guid, - bpmn_task_name=bpmn_task_name, - bpmn_task_identifier=bpmn_task_identifier, - bpmn_task_type=bpmn_task_type, - message=message, - timestamp=timestamp, - current_user_id=current_user_id, - spiff_step=spiff_step, + self.logs.append( + { + "process_instance_id": record.process_instance_id, # type: ignore + "bpmn_process_identifier": bpmn_process_identifier, + "spiff_task_guid": spiff_task_guid, + "bpmn_task_name": bpmn_task_name, + "bpmn_task_identifier": bpmn_task_identifier, + "bpmn_task_type": bpmn_task_type, + "message": message, + "timestamp": timestamp, + "current_user_id": current_user_id, + "spiff_step": spiff_step, + } ) - db.session.add(spiff_log) - db.session.commit() + if len(self.logs) % 1000 == 0: + self.bulk_insert_logs() diff --git a/src/spiffworkflow_backend/services/process_instance_processor.py b/src/spiffworkflow_backend/services/process_instance_processor.py index 0ed85511..a9f7ac66 100644 --- a/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/src/spiffworkflow_backend/services/process_instance_processor.py @@ -559,7 +559,7 @@ class ProcessInstanceProcessor: "lane_assignment_id": lane_assignment_id, } - def save_spiff_step_details(self) -> None: + def spiff_step_details_mapping(self) -> dict: """SaveSpiffStepDetails.""" bpmn_json = self.serialize() wf_json = json.loads(bpmn_json) @@ -570,13 +570,29 @@ class ProcessInstanceProcessor: # TODO want to just save the tasks, something wasn't immediately working # so after the flow works with the full wf_json revisit this task_json = wf_json + return { + "process_instance_id": self.process_instance_model.id, + "spiff_step": self.process_instance_model.spiff_step or 1, + "task_json": task_json, + "timestamp": round(time.time()), + "completed_by_user_id": self.current_user().id, + } + + def spiff_step_details(self) -> SpiffStepDetailsModel: + """SaveSpiffStepDetails.""" + details_mapping = self.spiff_step_details_mapping() details_model = SpiffStepDetailsModel( - process_instance_id=self.process_instance_model.id, - spiff_step=self.process_instance_model.spiff_step or 1, - task_json=task_json, - timestamp=round(time.time()), - completed_by_user_id=self.current_user().id, + process_instance_id=details_mapping["process_instance_id"], + spiff_step=details_mapping["spiff_step"], + task_json=details_mapping["task_json"], + timestamp=details_mapping["timestamp"], + completed_by_user_id=details_mapping["completed_by_user_id"], ) + return details_model + + def save_spiff_step_details(self) -> None: + """SaveSpiffStepDetails.""" + details_model = self.spiff_step_details() db.session.add(details_model) db.session.commit() @@ -989,25 +1005,47 @@ class ProcessInstanceProcessor: self.process_instance_model.spiff_step = spiff_step current_app.config["THREAD_LOCAL_DATA"].spiff_step = spiff_step db.session.add(self.process_instance_model) - db.session.commit() + + # TODO remove after done with the performance improvements + # to use delete the _ prefix here and add it to the real def below + def _do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: + """__do_engine_steps.""" + import cProfile + from pstats import SortKey + + with cProfile.Profile() as pr: + self._do_engine_steps(exit_at=exit_at, save=save) + pr.print_stats(sort=SortKey.CUMULATIVE) def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: """Do_engine_steps.""" + step_details = [] try: self.bpmn_process_instance.refresh_waiting_tasks( will_refresh_task=lambda t: self.increment_spiff_step(), - did_refresh_task=lambda t: self.save_spiff_step_details(), + did_refresh_task=lambda t: step_details.append( + self.spiff_step_details_mapping() + ), ) self.bpmn_process_instance.do_engine_steps( exit_at=exit_at, will_complete_task=lambda t: self.increment_spiff_step(), - did_complete_task=lambda t: self.save_spiff_step_details(), + did_complete_task=lambda t: step_details.append( + self.spiff_step_details_mapping() + ), ) self.process_bpmn_messages() self.queue_waiting_receive_messages() + db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details) + spiff_logger = logging.getLogger("spiff") + for handler in spiff_logger.handlers: + if hasattr(handler, "bulk_insert_logs"): + handler.bulk_insert_logs() # type: ignore + db.session.commit() + except WorkflowTaskExecException as we: raise ApiError.from_workflow_exception("task_error", str(we), we) from we diff --git a/tests/spiffworkflow_backend/helpers/example_data.py b/tests/spiffworkflow_backend/helpers/example_data.py index ac1e8dc8..251ba19c 100644 --- a/tests/spiffworkflow_backend/helpers/example_data.py +++ b/tests/spiffworkflow_backend/helpers/example_data.py @@ -19,7 +19,6 @@ class ExampleDataLoader: display_name: str = "", description: str = "", display_order: int = 0, - # from_tests: bool = False, bpmn_file_name: Optional[str] = None, process_model_source_directory: Optional[str] = None, ) -> ProcessModelInfo: @@ -58,9 +57,9 @@ class ExampleDataLoader: if bpmn_file_name: file_name_matcher = bpmn_file_name_with_extension - # file_glob = "" + # we need instance_path here for nox tests file_glob = os.path.join( - current_app.root_path, + current_app.instance_path, "..", "..", "tests",