Squashed 'spiffworkflow-backend/' changes from a195512fd..795df3526

795df3526 use instance path instead of root_path for nox w/ burnettk
d4113651e task_json is not optional w/ burnettk
c93dd2822 Merge commit '89bfc25f35bdfd57eb9ccf6f3a9a3de76e68cf93'
44ba4738f Merge commit 'de30945eec9161570080b4858da967a7628ec86c'
f894ebee2 Bulk insert spiff logs and step details (#26)

git-subtree-dir: spiffworkflow-backend
git-subtree-split: 795df35260fe24562cf3ab2b92169ecad37d3d55
This commit is contained in:
jasquat 2022-11-09 17:01:17 -05:00
parent 89bfc25f35
commit 127ef67f12
8 changed files with 115 additions and 73 deletions

View File

@ -1,8 +1,8 @@
"""empty message """empty message
Revision ID: 50dd2e016d94 Revision ID: fd00c59e1f60
Revises: Revises:
Create Date: 2022-11-08 16:28:18.991635 Create Date: 2022-11-09 14:04:14.169379
""" """
from alembic import op from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '50dd2e016d94' revision = 'fd00c59e1f60'
down_revision = None down_revision = None
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -45,6 +45,29 @@ def upgrade():
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uri') sa.UniqueConstraint('uri')
) )
op.create_table('spiff_logging',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_name', sa.String(length=255), nullable=True),
sa.Column('bpmn_task_type', sa.String(length=255), nullable=True),
sa.Column('spiff_task_guid', sa.String(length=50), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('message', sa.String(length=255), nullable=True),
sa.Column('current_user_id', sa.Integer(), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('spiff_step_details',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.Column('task_json', sa.JSON(), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('completed_by_user_id', sa.Integer(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('user', op.create_table('user',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=255), nullable=False), sa.Column('username', sa.String(length=255), nullable=False),
@ -216,33 +239,6 @@ def upgrade():
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq') sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq')
) )
op.create_table('spiff_logging',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_name', sa.String(length=255), nullable=True),
sa.Column('bpmn_task_type', sa.String(length=255), nullable=True),
sa.Column('spiff_task_guid', sa.String(length=50), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('message', sa.String(length=255), nullable=True),
sa.Column('current_user_id', sa.Integer(), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['current_user_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('spiff_step_details',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.Column('task_json', sa.JSON(), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('completed_by_user_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['completed_by_user_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('active_task_user', op.create_table('active_task_user',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('active_task_id', sa.Integer(), nullable=False), sa.Column('active_task_id', sa.Integer(), nullable=False),
@ -276,8 +272,6 @@ def downgrade():
op.drop_index(op.f('ix_active_task_user_user_id'), table_name='active_task_user') op.drop_index(op.f('ix_active_task_user_user_id'), table_name='active_task_user')
op.drop_index(op.f('ix_active_task_user_active_task_id'), table_name='active_task_user') op.drop_index(op.f('ix_active_task_user_active_task_id'), table_name='active_task_user')
op.drop_table('active_task_user') op.drop_table('active_task_user')
op.drop_table('spiff_step_details')
op.drop_table('spiff_logging')
op.drop_table('permission_assignment') op.drop_table('permission_assignment')
op.drop_table('message_instance') op.drop_table('message_instance')
op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation') op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation')
@ -302,6 +296,8 @@ def downgrade():
op.drop_index(op.f('ix_message_correlation_property_identifier'), table_name='message_correlation_property') op.drop_index(op.f('ix_message_correlation_property_identifier'), table_name='message_correlation_property')
op.drop_table('message_correlation_property') op.drop_table('message_correlation_property')
op.drop_table('user') op.drop_table('user')
op.drop_table('spiff_step_details')
op.drop_table('spiff_logging')
op.drop_table('permission_target') op.drop_table('permission_target')
op.drop_index(op.f('ix_message_model_name'), table_name='message_model') op.drop_index(op.f('ix_message_model_name'), table_name='message_model')
op.drop_index(op.f('ix_message_model_identifier'), table_name='message_model') op.drop_index(op.f('ix_message_model_identifier'), table_name='message_model')

View File

@ -80,10 +80,8 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_initiator = relationship("UserModel") process_initiator = relationship("UserModel")
active_tasks = relationship("ActiveTaskModel", cascade="delete") # type: ignore active_tasks = relationship("ActiveTaskModel", cascade="delete") # type: ignore
spiff_logs = relationship("SpiffLoggingModel", cascade="delete") # type: ignore
message_instances = relationship("MessageInstanceModel", cascade="delete") # type: ignore message_instances = relationship("MessageInstanceModel", cascade="delete") # type: ignore
message_correlations = relationship("MessageCorrelationModel", cascade="delete") # type: ignore message_correlations = relationship("MessageCorrelationModel", cascade="delete") # type: ignore
spiff_step_details = relationship("SpiffStepDetailsModel", cascade="delete") # type: ignore
bpmn_json: str | None = deferred(db.Column(db.JSON)) # type: ignore bpmn_json: str | None = deferred(db.Column(db.JSON)) # type: ignore
start_in_seconds: int | None = db.Column(db.Integer) start_in_seconds: int | None = db.Column(db.Integer)

View File

@ -4,10 +4,6 @@ from typing import Optional
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
@dataclass @dataclass
@ -16,7 +12,7 @@ class SpiffLoggingModel(SpiffworkflowBaseDBModel):
__tablename__ = "spiff_logging" __tablename__ = "spiff_logging"
id: int = db.Column(db.Integer, primary_key=True) id: int = db.Column(db.Integer, primary_key=True)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore process_instance_id: int = db.Column(db.Integer, nullable=False)
bpmn_process_identifier: str = db.Column(db.String(255), nullable=False) bpmn_process_identifier: str = db.Column(db.String(255), nullable=False)
bpmn_task_identifier: str = db.Column(db.String(255), nullable=False) bpmn_task_identifier: str = db.Column(db.String(255), nullable=False)
bpmn_task_name: str = db.Column(db.String(255), nullable=True) bpmn_task_name: str = db.Column(db.String(255), nullable=True)
@ -24,5 +20,5 @@ class SpiffLoggingModel(SpiffworkflowBaseDBModel):
spiff_task_guid: str = db.Column(db.String(50), nullable=False) spiff_task_guid: str = db.Column(db.String(50), nullable=False)
timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False) timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False)
message: Optional[str] = db.Column(db.String(255), nullable=True) message: Optional[str] = db.Column(db.String(255), nullable=True)
current_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True) current_user_id: int = db.Column(db.Integer, nullable=True)
spiff_step: int = db.Column(db.Integer, nullable=False) spiff_step: int = db.Column(db.Integer, nullable=False)

View File

@ -3,12 +3,8 @@ from dataclasses import dataclass
from flask_bpmn.models.db import db from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
from sqlalchemy.orm import deferred from sqlalchemy.orm import deferred
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
@dataclass @dataclass
class SpiffStepDetailsModel(SpiffworkflowBaseDBModel): class SpiffStepDetailsModel(SpiffworkflowBaseDBModel):
@ -16,8 +12,8 @@ class SpiffStepDetailsModel(SpiffworkflowBaseDBModel):
__tablename__ = "spiff_step_details" __tablename__ = "spiff_step_details"
id: int = db.Column(db.Integer, primary_key=True) id: int = db.Column(db.Integer, primary_key=True)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore process_instance_id: int = db.Column(db.Integer, nullable=False)
spiff_step: int = db.Column(db.Integer, nullable=False) spiff_step: int = db.Column(db.Integer, nullable=False)
task_json: str | None = deferred(db.Column(db.JSON, nullable=False)) # type: ignore task_json: str = deferred(db.Column(db.JSON, nullable=False)) # type: ignore
timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False) timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False)
completed_by_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True) completed_by_user_id: int = db.Column(db.Integer, nullable=True)

View File

@ -521,7 +521,7 @@ def process_instance_log_list(
) )
.order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore .order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore
.join( .join(
UserModel, isouter=True UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True
) # isouter since if we don't have a user, we still want the log ) # isouter since if we don't have a user, we still want the log
.add_columns( .add_columns(
UserModel.username, UserModel.username,
@ -790,6 +790,12 @@ def process_instance_delete(process_instance_id: int) -> flask.wrappers.Response
# (Pdb) db.session.delete # (Pdb) db.session.delete
# <bound method delete of <sqlalchemy.orm.scoping.scoped_session object at 0x103eaab30>> # <bound method delete of <sqlalchemy.orm.scoping.scoped_session object at 0x103eaab30>>
db.session.query(SpiffLoggingModel).filter_by(
process_instance_id=process_instance.id
).delete()
db.session.query(SpiffStepDetailsModel).filter_by(
process_instance_id=process_instance.id
).delete()
db.session.delete(process_instance) db.session.delete(process_instance)
db.session.commit() db.session.commit()
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json") return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")

View File

@ -193,6 +193,17 @@ def setup_logger(app: Flask) -> None:
class DBHandler(logging.Handler): class DBHandler(logging.Handler):
"""DBHandler.""" """DBHandler."""
def __init__(self) -> None:
"""__init__."""
self.logs: list[dict] = []
super().__init__()
def bulk_insert_logs(self) -> None:
"""Bulk_insert_logs."""
db.session.bulk_insert_mappings(SpiffLoggingModel, self.logs)
db.session.commit()
self.logs = []
def emit(self, record: logging.LogRecord) -> None: def emit(self, record: logging.LogRecord) -> None:
"""Emit.""" """Emit."""
# if we do not have a process instance id then do not log and assume we are running a script unit test # if we do not have a process instance id then do not log and assume we are running a script unit test
@ -211,17 +222,19 @@ class DBHandler(logging.Handler):
if hasattr(record, "spiff_step") and record.spiff_step is not None # type: ignore if hasattr(record, "spiff_step") and record.spiff_step is not None # type: ignore
else 1 else 1
) )
spiff_log = SpiffLoggingModel( self.logs.append(
process_instance_id=record.process_instance_id, # type: ignore {
bpmn_process_identifier=bpmn_process_identifier, "process_instance_id": record.process_instance_id, # type: ignore
spiff_task_guid=spiff_task_guid, "bpmn_process_identifier": bpmn_process_identifier,
bpmn_task_name=bpmn_task_name, "spiff_task_guid": spiff_task_guid,
bpmn_task_identifier=bpmn_task_identifier, "bpmn_task_name": bpmn_task_name,
bpmn_task_type=bpmn_task_type, "bpmn_task_identifier": bpmn_task_identifier,
message=message, "bpmn_task_type": bpmn_task_type,
timestamp=timestamp, "message": message,
current_user_id=current_user_id, "timestamp": timestamp,
spiff_step=spiff_step, "current_user_id": current_user_id,
"spiff_step": spiff_step,
}
) )
db.session.add(spiff_log) if len(self.logs) % 1000 == 0:
db.session.commit() self.bulk_insert_logs()

View File

@ -559,7 +559,7 @@ class ProcessInstanceProcessor:
"lane_assignment_id": lane_assignment_id, "lane_assignment_id": lane_assignment_id,
} }
def save_spiff_step_details(self) -> None: def spiff_step_details_mapping(self) -> dict:
"""SaveSpiffStepDetails.""" """SaveSpiffStepDetails."""
bpmn_json = self.serialize() bpmn_json = self.serialize()
wf_json = json.loads(bpmn_json) wf_json = json.loads(bpmn_json)
@ -570,13 +570,29 @@ class ProcessInstanceProcessor:
# TODO want to just save the tasks, something wasn't immediately working # TODO want to just save the tasks, something wasn't immediately working
# so after the flow works with the full wf_json revisit this # so after the flow works with the full wf_json revisit this
task_json = wf_json task_json = wf_json
return {
"process_instance_id": self.process_instance_model.id,
"spiff_step": self.process_instance_model.spiff_step or 1,
"task_json": task_json,
"timestamp": round(time.time()),
"completed_by_user_id": self.current_user().id,
}
def spiff_step_details(self) -> SpiffStepDetailsModel:
"""SaveSpiffStepDetails."""
details_mapping = self.spiff_step_details_mapping()
details_model = SpiffStepDetailsModel( details_model = SpiffStepDetailsModel(
process_instance_id=self.process_instance_model.id, process_instance_id=details_mapping["process_instance_id"],
spiff_step=self.process_instance_model.spiff_step or 1, spiff_step=details_mapping["spiff_step"],
task_json=task_json, task_json=details_mapping["task_json"],
timestamp=round(time.time()), timestamp=details_mapping["timestamp"],
completed_by_user_id=self.current_user().id, completed_by_user_id=details_mapping["completed_by_user_id"],
) )
return details_model
def save_spiff_step_details(self) -> None:
"""SaveSpiffStepDetails."""
details_model = self.spiff_step_details()
db.session.add(details_model) db.session.add(details_model)
db.session.commit() db.session.commit()
@ -989,25 +1005,47 @@ class ProcessInstanceProcessor:
self.process_instance_model.spiff_step = spiff_step self.process_instance_model.spiff_step = spiff_step
current_app.config["THREAD_LOCAL_DATA"].spiff_step = spiff_step current_app.config["THREAD_LOCAL_DATA"].spiff_step = spiff_step
db.session.add(self.process_instance_model) db.session.add(self.process_instance_model)
db.session.commit()
# TODO remove after done with the performance improvements
# to use delete the _ prefix here and add it to the real def below
def _do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps."""
import cProfile
from pstats import SortKey
with cProfile.Profile() as pr:
self._do_engine_steps(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE)
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None: def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps.""" """Do_engine_steps."""
step_details = []
try: try:
self.bpmn_process_instance.refresh_waiting_tasks( self.bpmn_process_instance.refresh_waiting_tasks(
will_refresh_task=lambda t: self.increment_spiff_step(), will_refresh_task=lambda t: self.increment_spiff_step(),
did_refresh_task=lambda t: self.save_spiff_step_details(), did_refresh_task=lambda t: step_details.append(
self.spiff_step_details_mapping()
),
) )
self.bpmn_process_instance.do_engine_steps( self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at, exit_at=exit_at,
will_complete_task=lambda t: self.increment_spiff_step(), will_complete_task=lambda t: self.increment_spiff_step(),
did_complete_task=lambda t: self.save_spiff_step_details(), did_complete_task=lambda t: step_details.append(
self.spiff_step_details_mapping()
),
) )
self.process_bpmn_messages() self.process_bpmn_messages()
self.queue_waiting_receive_messages() self.queue_waiting_receive_messages()
db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details)
spiff_logger = logging.getLogger("spiff")
for handler in spiff_logger.handlers:
if hasattr(handler, "bulk_insert_logs"):
handler.bulk_insert_logs() # type: ignore
db.session.commit()
except WorkflowTaskExecException as we: except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we raise ApiError.from_workflow_exception("task_error", str(we), we) from we

View File

@ -19,7 +19,6 @@ class ExampleDataLoader:
display_name: str = "", display_name: str = "",
description: str = "", description: str = "",
display_order: int = 0, display_order: int = 0,
# from_tests: bool = False,
bpmn_file_name: Optional[str] = None, bpmn_file_name: Optional[str] = None,
process_model_source_directory: Optional[str] = None, process_model_source_directory: Optional[str] = None,
) -> ProcessModelInfo: ) -> ProcessModelInfo:
@ -58,9 +57,9 @@ class ExampleDataLoader:
if bpmn_file_name: if bpmn_file_name:
file_name_matcher = bpmn_file_name_with_extension file_name_matcher = bpmn_file_name_with_extension
# file_glob = "" # we need instance_path here for nox tests
file_glob = os.path.join( file_glob = os.path.join(
current_app.root_path, current_app.instance_path,
"..", "..",
"..", "..",
"tests", "tests",