Bulk insert spiff logs and step details (#26)

jbirddog 2022-11-09 15:43:12 -05:00 committed by GitHub
parent f1b0de8297
commit 1382b25d37
7 changed files with 112 additions and 69 deletions

View File

@@ -1,8 +1,8 @@
"""empty message
Revision ID: 50dd2e016d94
Revision ID: fd00c59e1f60
Revises:
Create Date: 2022-11-08 16:28:18.991635
Create Date: 2022-11-09 14:04:14.169379
"""
from alembic import op
@@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '50dd2e016d94'
revision = 'fd00c59e1f60'
down_revision = None
branch_labels = None
depends_on = None
@@ -45,6 +45,29 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uri')
)
op.create_table('spiff_logging',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_name', sa.String(length=255), nullable=True),
sa.Column('bpmn_task_type', sa.String(length=255), nullable=True),
sa.Column('spiff_task_guid', sa.String(length=50), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('message', sa.String(length=255), nullable=True),
sa.Column('current_user_id', sa.Integer(), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('spiff_step_details',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.Column('task_json', sa.JSON(), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('completed_by_user_id', sa.Integer(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('username', sa.String(length=255), nullable=False),
@@ -216,33 +239,6 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq')
)
op.create_table('spiff_logging',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_identifier', sa.String(length=255), nullable=False),
sa.Column('bpmn_task_name', sa.String(length=255), nullable=True),
sa.Column('bpmn_task_type', sa.String(length=255), nullable=True),
sa.Column('spiff_task_guid', sa.String(length=50), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('message', sa.String(length=255), nullable=True),
sa.Column('current_user_id', sa.Integer(), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['current_user_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('spiff_step_details',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('spiff_step', sa.Integer(), nullable=False),
sa.Column('task_json', sa.JSON(), nullable=False),
sa.Column('timestamp', sa.DECIMAL(precision=17, scale=6), nullable=False),
sa.Column('completed_by_user_id', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['completed_by_user_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('active_task_user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('active_task_id', sa.Integer(), nullable=False),
@@ -276,8 +272,6 @@ def downgrade():
op.drop_index(op.f('ix_active_task_user_user_id'), table_name='active_task_user')
op.drop_index(op.f('ix_active_task_user_active_task_id'), table_name='active_task_user')
op.drop_table('active_task_user')
op.drop_table('spiff_step_details')
op.drop_table('spiff_logging')
op.drop_table('permission_assignment')
op.drop_table('message_instance')
op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation')
@@ -302,6 +296,8 @@ def downgrade():
op.drop_index(op.f('ix_message_correlation_property_identifier'), table_name='message_correlation_property')
op.drop_table('message_correlation_property')
op.drop_table('user')
op.drop_table('spiff_step_details')
op.drop_table('spiff_logging')
op.drop_table('permission_target')
op.drop_index(op.f('ix_message_model_name'), table_name='message_model')
op.drop_index(op.f('ix_message_model_identifier'), table_name='message_model')
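Because the project is still regenerating its baseline migration (down_revision = None), the foreign-key removal simply lands in the recreated spiff_logging and spiff_step_details tables, and the downgrade() drop order moves to match. For an environment that had already applied revision 50dd2e016d94, a roughly equivalent incremental migration would drop the constraints in place. A sketch only; the constraint names are guesses (MySQL auto-generates names such as spiff_logging_ibfk_1), so verify them with SHOW CREATE TABLE first:

"""Drop FKs from spiff_logging and spiff_step_details (illustrative sketch)."""
from alembic import op

# Placeholder identifiers; a real revision id comes from `alembic revision`.
revision = 'xxxxxxxxxxxx'
down_revision = '50dd2e016d94'


def upgrade():
    # Constraint names are assumptions; check the live schema before running.
    op.drop_constraint('spiff_logging_ibfk_1', 'spiff_logging', type_='foreignkey')
    op.drop_constraint('spiff_logging_ibfk_2', 'spiff_logging', type_='foreignkey')
    op.drop_constraint('spiff_step_details_ibfk_1', 'spiff_step_details', type_='foreignkey')
    op.drop_constraint('spiff_step_details_ibfk_2', 'spiff_step_details', type_='foreignkey')


def downgrade():
    # Recreate the constraints dropped above (names again assumed).
    op.create_foreign_key('spiff_logging_ibfk_1', 'spiff_logging', 'process_instance', ['process_instance_id'], ['id'])
    op.create_foreign_key('spiff_logging_ibfk_2', 'spiff_logging', 'user', ['current_user_id'], ['id'])
    op.create_foreign_key('spiff_step_details_ibfk_1', 'spiff_step_details', 'process_instance', ['process_instance_id'], ['id'])
    op.create_foreign_key('spiff_step_details_ibfk_2', 'spiff_step_details', 'user', ['completed_by_user_id'], ['id'])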

View File

@@ -80,10 +80,8 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_initiator = relationship("UserModel")
active_tasks = relationship("ActiveTaskModel", cascade="delete") # type: ignore
spiff_logs = relationship("SpiffLoggingModel", cascade="delete") # type: ignore
message_instances = relationship("MessageInstanceModel", cascade="delete") # type: ignore
message_correlations = relationship("MessageCorrelationModel", cascade="delete") # type: ignore
spiff_step_details = relationship("SpiffStepDetailsModel", cascade="delete") # type: ignore
bpmn_json: str | None = deferred(db.Column(db.JSON)) # type: ignore
start_in_seconds: int | None = db.Column(db.Integer)
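The spiff_logs and spiff_step_details relationships disappear from ProcessInstanceModel along with the foreign keys, since SQLAlchemy can no longer derive the join on its own. If the ORM-level convenience were ever wanted back without reintroducing database constraints, the join can be spelled out instead. A sketch of that alternative, not something this commit does:

# FK-less relationship: an explicit primaryjoin plus foreign() tells SQLAlchemy
# which side holds the referencing value. Note that cascade="delete" here is an
# ORM-level cascade that loads and deletes children row by row, which is the
# per-row overhead this commit is trying to avoid.
from sqlalchemy.orm import relationship

spiff_logs = relationship(
    "SpiffLoggingModel",
    primaryjoin="ProcessInstanceModel.id == foreign(SpiffLoggingModel.process_instance_id)",
    cascade="delete",
)  # type: ignore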

View File

@@ -4,10 +4,6 @@ from typing import Optional
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
@dataclass
@@ -16,7 +12,7 @@ class SpiffLoggingModel(SpiffworkflowBaseDBModel):
__tablename__ = "spiff_logging"
id: int = db.Column(db.Integer, primary_key=True)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
process_instance_id: int = db.Column(db.Integer, nullable=False)
bpmn_process_identifier: str = db.Column(db.String(255), nullable=False)
bpmn_task_identifier: str = db.Column(db.String(255), nullable=False)
bpmn_task_name: str = db.Column(db.String(255), nullable=True)
@@ -24,5 +20,5 @@ class SpiffLoggingModel(SpiffworkflowBaseDBModel):
spiff_task_guid: str = db.Column(db.String(50), nullable=False)
timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False)
message: Optional[str] = db.Column(db.String(255), nullable=True)
current_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True)
current_user_id: int = db.Column(db.Integer, nullable=True)
spiff_step: int = db.Column(db.Integer, nullable=False)

View File

@@ -3,12 +3,8 @@ from dataclasses import dataclass
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
from sqlalchemy.orm import deferred
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
@dataclass
class SpiffStepDetailsModel(SpiffworkflowBaseDBModel):
@@ -16,8 +12,8 @@ class SpiffStepDetailsModel(SpiffworkflowBaseDBModel):
__tablename__ = "spiff_step_details"
id: int = db.Column(db.Integer, primary_key=True)
process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=False) # type: ignore
process_instance_id: int = db.Column(db.Integer, nullable=False)
spiff_step: int = db.Column(db.Integer, nullable=False)
task_json: str | None = deferred(db.Column(db.JSON, nullable=False)) # type: ignore
timestamp: float = db.Column(db.DECIMAL(17, 6), nullable=False)
completed_by_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=True)
completed_by_user_id: int = db.Column(db.Integer, nullable=True)
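Both models now hold process_instance_id and their user id columns as plain integers. That keeps bulk inserts free of constraint checks, but queries have to spell out join conditions themselves, which is exactly what the route change below does in process_instance_log_list. A minimal standalone sketch of the same pattern (import paths assumed from the surrounding files; process_instance is assumed to be an already-loaded ProcessInstanceModel):

from flask_bpmn.models.db import db
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel
from spiffworkflow_backend.models.user import UserModel

# Without a ForeignKey the ORM cannot infer how spiff_logging relates to user,
# so the ON clause is explicit; isouter keeps log rows that have no user.
logs = (
    db.session.query(SpiffLoggingModel)
    .filter_by(process_instance_id=process_instance.id)
    .join(UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True)
    .add_columns(UserModel.username)
    .all()
)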

View File

@@ -521,7 +521,7 @@ def process_instance_log_list(
)
.order_by(SpiffLoggingModel.timestamp.desc()) # type: ignore
.join(
UserModel, isouter=True
UserModel, UserModel.id == SpiffLoggingModel.current_user_id, isouter=True
) # isouter since if we don't have a user, we still want the log
.add_columns(
UserModel.username,
@@ -790,6 +790,12 @@ def process_instance_delete(process_instance_id: int) -> flask.wrappers.Response
# (Pdb) db.session.delete
# <bound method delete of <sqlalchemy.orm.scoping.scoped_session object at 0x103eaab30>>
db.session.query(SpiffLoggingModel).filter_by(
process_instance_id=process_instance.id
).delete()
db.session.query(SpiffStepDetailsModel).filter_by(
process_instance_id=process_instance.id
).delete()
db.session.delete(process_instance)
db.session.commit()
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
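With no database-level cascade left, process_instance_delete now removes the log and step-detail rows itself before deleting the instance. A hedged variation of the same cleanup with an explicit rollback, in case one of the deletes fails partway (the try/except is an assumption for illustration, not part of the commit):

# Names come from the route above; the rollback keeps a failed delete from
# leaving the instance gone while its log or step-detail rows remain.
try:
    db.session.query(SpiffLoggingModel).filter_by(
        process_instance_id=process_instance.id
    ).delete()
    db.session.query(SpiffStepDetailsModel).filter_by(
        process_instance_id=process_instance.id
    ).delete()
    db.session.delete(process_instance)
    db.session.commit()
except Exception:
    db.session.rollback()
    raise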

View File

@@ -193,6 +193,17 @@ def setup_logger(app: Flask) -> None:
class DBHandler(logging.Handler):
"""DBHandler."""
def __init__(self) -> None:
"""__init__."""
self.logs: list[dict] = []
super().__init__()
def bulk_insert_logs(self) -> None:
"""Bulk_insert_logs."""
db.session.bulk_insert_mappings(SpiffLoggingModel, self.logs)
db.session.commit()
self.logs = []
def emit(self, record: logging.LogRecord) -> None:
"""Emit."""
# if we do not have a process instance id then do not log and assume we are running a script unit test
@@ -211,17 +222,19 @@ class DBHandler(logging.Handler):
if hasattr(record, "spiff_step") and record.spiff_step is not None # type: ignore
else 1
)
spiff_log = SpiffLoggingModel(
process_instance_id=record.process_instance_id, # type: ignore
bpmn_process_identifier=bpmn_process_identifier,
spiff_task_guid=spiff_task_guid,
bpmn_task_name=bpmn_task_name,
bpmn_task_identifier=bpmn_task_identifier,
bpmn_task_type=bpmn_task_type,
message=message,
timestamp=timestamp,
current_user_id=current_user_id,
spiff_step=spiff_step,
self.logs.append(
{
"process_instance_id": record.process_instance_id, # type: ignore
"bpmn_process_identifier": bpmn_process_identifier,
"spiff_task_guid": spiff_task_guid,
"bpmn_task_name": bpmn_task_name,
"bpmn_task_identifier": bpmn_task_identifier,
"bpmn_task_type": bpmn_task_type,
"message": message,
"timestamp": timestamp,
"current_user_id": current_user_id,
"spiff_step": spiff_step,
}
)
db.session.add(spiff_log)
db.session.commit()
if len(self.logs) % 1000 == 0:
self.bulk_insert_logs()
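The handler now buffers log records as plain dicts and writes them with bulk_insert_mappings, one batched INSERT per 1000 records instead of an add() and commit() per record. A minimal before/after sketch of the write pattern (import paths assumed from this repository; mappings stands in for the buffered list of column dicts):

from flask_bpmn.models.db import db
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel

mappings: list[dict] = []  # hypothetical buffer of {"column_name": value} dicts

# before this commit: one INSERT and one COMMIT per log record
for mapping in mappings:
    db.session.add(SpiffLoggingModel(**mapping))
    db.session.commit()

# after this commit: a single batched INSERT, then one COMMIT, for the buffer
db.session.bulk_insert_mappings(SpiffLoggingModel, mappings)
db.session.commit()

Because emit() only flushes when len(self.logs) % 1000 == 0, anything short of a full batch stays in memory until bulk_insert_logs() is called explicitly; the processor change below does that at the end of do_engine_steps.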

View File

@@ -559,7 +559,7 @@ class ProcessInstanceProcessor:
"lane_assignment_id": lane_assignment_id,
}
def save_spiff_step_details(self) -> None:
def spiff_step_details_mapping(self) -> dict:
"""SaveSpiffStepDetails."""
bpmn_json = self.serialize()
wf_json = json.loads(bpmn_json)
@@ -570,13 +570,29 @@ class ProcessInstanceProcessor:
# TODO want to just save the tasks, something wasn't immediately working
# so after the flow works with the full wf_json revisit this
task_json = wf_json
return {
"process_instance_id": self.process_instance_model.id,
"spiff_step": self.process_instance_model.spiff_step or 1,
"task_json": task_json,
"timestamp": round(time.time()),
"completed_by_user_id": self.current_user().id,
}
def spiff_step_details(self) -> SpiffStepDetailsModel:
"""SaveSpiffStepDetails."""
details_mapping = self.spiff_step_details_mapping()
details_model = SpiffStepDetailsModel(
process_instance_id=self.process_instance_model.id,
spiff_step=self.process_instance_model.spiff_step or 1,
task_json=task_json,
timestamp=round(time.time()),
completed_by_user_id=self.current_user().id,
process_instance_id=details_mapping["process_instance_id"],
spiff_step=details_mapping["spiff_step"],
task_json=details_mapping["task_json"],
timestamp=details_mapping["timestamp"],
completed_by_user_id=details_mapping["completed_by_user_id"],
)
return details_model
def save_spiff_step_details(self) -> None:
"""SaveSpiffStepDetails."""
details_model = self.spiff_step_details()
db.session.add(details_model)
db.session.commit()
@@ -989,25 +1005,47 @@ class ProcessInstanceProcessor:
self.process_instance_model.spiff_step = spiff_step
current_app.config["THREAD_LOCAL_DATA"].spiff_step = spiff_step
db.session.add(self.process_instance_model)
db.session.commit()
# TODO remove after done with the performance improvements
# to use, delete the _ prefix here and add it to the real def below
def _do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps."""
import cProfile
from pstats import SortKey
with cProfile.Profile() as pr:
self._do_engine_steps(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE)
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps."""
step_details = []
try:
self.bpmn_process_instance.refresh_waiting_tasks(
will_refresh_task=lambda t: self.increment_spiff_step(),
did_refresh_task=lambda t: self.save_spiff_step_details(),
did_refresh_task=lambda t: step_details.append(
self.spiff_step_details_mapping()
),
)
self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at,
will_complete_task=lambda t: self.increment_spiff_step(),
did_complete_task=lambda t: self.save_spiff_step_details(),
did_complete_task=lambda t: step_details.append(
self.spiff_step_details_mapping()
),
)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details)
spiff_logger = logging.getLogger("spiff")
for handler in spiff_logger.handlers:
if hasattr(handler, "bulk_insert_logs"):
handler.bulk_insert_logs() # type: ignore
db.session.commit()
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
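do_engine_steps now collects the dicts produced by spiff_step_details_mapping() and writes them in one bulk_insert_mappings call, committing them together with any logs the DBHandler has buffered. One caveat worth noting when extending this: SQLAlchemy's bulk API bypasses the ORM unit of work, so relationship cascades and ORM persistence events do not fire and each dict must carry every required column itself. A small illustrative sketch (import paths assumed from this repository, values made up):

import time

from flask_bpmn.models.db import db
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel

# Each dict mirrors the keys returned by spiff_step_details_mapping(); primary
# keys are not populated back onto the dicts unless return_defaults=True.
step_details = [
    {
        "process_instance_id": 1,  # made-up id for illustration
        "spiff_step": 1,
        "task_json": {"tasks": {}},
        "timestamp": round(time.time()),
        "completed_by_user_id": None,
    }
]
db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details)
db.session.commit()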