process instance report refactor (#977)

* moved some code blocks from main process instance report service method to own methods w/ burnettk

* added notes of how to implement on backend w/ burnettk

* fixed based on some code rabbit suggestions

* pyl

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-02-06 14:08:03 -05:00 committed by GitHub
parent fcb52cbe67
commit 64d880509a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -183,13 +183,11 @@ class ProcessInstanceReportService:
@classmethod
def process_instance_metadata_as_columns(cls, process_model_identifier: str | None = None) -> list[ReportMetadataColumn]:
columns_for_metadata_query = (
db.session.query(ProcessInstanceMetadataModel.key)
.order_by(ProcessInstanceMetadataModel.key)
.distinct() # type: ignore
columns_for_metadata_query = db.session.query(ProcessInstanceMetadataModel.key.distinct()).order_by( # type: ignore
ProcessInstanceMetadataModel.key
)
if process_model_identifier:
columns_for_metadata_query = columns_for_metadata_query.join(ProcessInstanceModel)
columns_for_metadata_query = columns_for_metadata_query.join(ProcessInstanceModel) # type: ignore
columns_for_metadata_query = columns_for_metadata_query.filter(
ProcessInstanceModel.process_model_identifier == process_model_identifier
)
@ -392,6 +390,12 @@ class ProcessInstanceReportService:
if filter_found is False:
filters.append(new_filter)
# When we say we want to filter by "waiting for group" or "waiting for specific user," what we probably assume is that
# there are human tasks that those people can actually complete right now.
# We don't exactly have that in the query directly, but if you pass a filter for user_group_identifier, it will get into
# this function, and if you pass any statuses, and if they are all "active" then it will do what you want, which is
# to look for only HumanTaskModel.completed.is_(False). So...we should probably make the widget add filters for
# both user_group_identifier and status. And we should make a method that does a similar thing for waiting for users.
@classmethod
def filter_by_user_group_identifier(
cls,
@ -428,175 +432,74 @@ class ProcessInstanceReportService:
UserGroupAssignmentModel,
UserGroupAssignmentModel.group_id == GroupModel.id,
)
# FIXME: this may be problematic
# if user_group_identifier filter is set to something you are not in
process_instance_query = process_instance_query.filter(UserGroupAssignmentModel.user_id == user.id)
return process_instance_query
@classmethod
def run_process_instance_report(
def filter_by_instances_with_tasks_waiting_for_me(
cls,
process_instance_query: Query,
user: UserModel,
) -> Query:
process_instance_query = process_instance_query.filter(ProcessInstanceModel.process_initiator_id != user.id)
process_instance_query = process_instance_query.join(
HumanTaskModel,
and_(
HumanTaskModel.process_instance_id == ProcessInstanceModel.id,
HumanTaskModel.completed.is_(False), # type: ignore
),
).join(
HumanTaskUserModel,
and_(HumanTaskUserModel.human_task_id == HumanTaskModel.id, HumanTaskUserModel.user_id == user.id),
)
user_group_assignment_for_lane_assignment = aliased(UserGroupAssignmentModel)
process_instance_query = process_instance_query.outerjoin( # type: ignore
user_group_assignment_for_lane_assignment,
and_(
user_group_assignment_for_lane_assignment.group_id == HumanTaskModel.lane_assignment_id,
user_group_assignment_for_lane_assignment.user_id == user.id,
),
).filter(
# it should show up in your "Waiting for me" list IF:
# 1) task is not assigned to a group OR
# 2) you are not in the group
# In the case of number 2, it probably means you were added to the task individually by an admin
or_(
HumanTaskModel.lane_assignment_id.is_(None), # type: ignore
user_group_assignment_for_lane_assignment.group_id.is_(None),
)
)
return process_instance_query
@classmethod
def filter_by_instances_with_tasks_completed_by_me(
cls,
process_instance_query: Query,
user: UserModel,
) -> Query:
process_instance_query = process_instance_query.filter(ProcessInstanceModel.process_initiator_id != user.id)
process_instance_query = process_instance_query.join( # type: ignore
HumanTaskModel,
and_(
HumanTaskModel.process_instance_id == ProcessInstanceModel.id,
HumanTaskModel.completed_by_user_id == user.id,
),
)
return process_instance_query
@classmethod
def add_where_clauses_for_process_instance_metadata_filters(
cls,
process_instance_query: Query,
report_metadata: ReportMetadata,
user: UserModel | None = None,
page: int = 1,
per_page: int = 100,
) -> dict:
process_instance_query = ProcessInstanceModel.query
# Always join that hot user table for good performance at serialization time.
process_instance_query = process_instance_query.options(selectinload(ProcessInstanceModel.process_initiator))
filters = report_metadata["filter_by"]
restrict_human_tasks_to_user = None
for value in cls.check_filter_value(filters, "process_model_identifier"):
process_model = ProcessModelService.get_process_model(
f"{value}",
)
process_instance_query = process_instance_query.filter_by(process_model_identifier=process_model.id)
# this can never happen. obviously the class has the columns it defines. this is just to appease mypy.
if ProcessInstanceModel.start_in_seconds is None or ProcessInstanceModel.end_in_seconds is None:
raise (
ApiError(
error_code="unexpected_condition",
message="Something went very wrong",
status_code=500,
)
)
for value in cls.check_filter_value(filters, "start_from"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.start_in_seconds >= value)
for value in cls.check_filter_value(filters, "start_to"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.start_in_seconds <= value)
for value in cls.check_filter_value(filters, "end_from"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.end_in_seconds >= value)
for value in cls.check_filter_value(filters, "end_to"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.end_in_seconds <= value)
process_status = cls.get_filter_value(filters, "process_status")
if process_status is not None:
process_instance_query = process_instance_query.filter(
ProcessInstanceModel.status.in_(process_status.split(",")) # type: ignore
)
has_active_status = cls.get_filter_value(filters, "has_active_status")
if has_active_status:
process_instance_query = process_instance_query.filter(
ProcessInstanceModel.status.in_(ProcessInstanceModel.active_statuses()) # type: ignore
)
for value in cls.check_filter_value(filters, "process_initiator_username"):
initiator = UserModel.query.filter_by(username=value).first()
process_initiator_id = -1
if initiator:
process_initiator_id = initiator.id
process_instance_query = process_instance_query.filter_by(process_initiator_id=process_initiator_id)
instances_with_tasks_completed_by_me = cls.get_filter_value(filters, "instances_with_tasks_completed_by_me")
instances_with_tasks_waiting_for_me = cls.get_filter_value(filters, "instances_with_tasks_waiting_for_me")
user_group_identifier = cls.get_filter_value(filters, "user_group_identifier")
# builtin only - for the for-me paths
with_relation_to_me = cls.get_filter_value(filters, "with_relation_to_me")
if (
not instances_with_tasks_completed_by_me
and not user_group_identifier
and not instances_with_tasks_waiting_for_me
and with_relation_to_me is True
):
if user is None:
raise ProcessInstanceReportCannotBeRunError("A user must be specified to run report with with_relation_to_me")
process_instance_query = process_instance_query.outerjoin(HumanTaskModel).outerjoin(
HumanTaskUserModel,
and_(
HumanTaskModel.id == HumanTaskUserModel.human_task_id,
HumanTaskUserModel.user_id == user.id,
),
)
process_instance_query = process_instance_query.filter(
or_(
HumanTaskUserModel.id.is_not(None),
ProcessInstanceModel.process_initiator_id == user.id,
)
)
if instances_with_tasks_completed_by_me is True and instances_with_tasks_waiting_for_me is True:
raise ProcessInstanceReportMetadataInvalidError(
"Cannot set both 'instances_with_tasks_completed_by_me' and 'instances_with_tasks_waiting_for_me' to"
" true. You must choose one."
)
# ensure we only join with HumanTaskModel once
human_task_already_joined = False
if instances_with_tasks_completed_by_me is True:
if user is None:
raise ProcessInstanceReportCannotBeRunError(
"A user must be specified to run report with instances_with_tasks_completed_by_me."
)
process_instance_query = process_instance_query.filter(ProcessInstanceModel.process_initiator_id != user.id)
process_instance_query = process_instance_query.join(
HumanTaskModel,
and_(
HumanTaskModel.process_instance_id == ProcessInstanceModel.id,
HumanTaskModel.completed_by_user_id == user.id,
),
)
human_task_already_joined = True
# this excludes some tasks you can complete, because that's the way the requirements were described.
# if it's assigned to one of your groups, it does not get returned by this query.
if instances_with_tasks_waiting_for_me is True:
if user is None:
raise ProcessInstanceReportCannotBeRunError(
"A user must be specified to run report with instances_with_tasks_waiting_for_me."
)
process_instance_query = process_instance_query.filter(ProcessInstanceModel.process_initiator_id != user.id)
process_instance_query = process_instance_query.join(
HumanTaskModel,
and_(
HumanTaskModel.process_instance_id == ProcessInstanceModel.id,
HumanTaskModel.completed.is_(False), # type: ignore
),
).join(
HumanTaskUserModel,
and_(HumanTaskUserModel.human_task_id == HumanTaskModel.id, HumanTaskUserModel.user_id == user.id),
)
user_group_assignment_for_lane_assignment = aliased(UserGroupAssignmentModel)
process_instance_query = process_instance_query.outerjoin(
user_group_assignment_for_lane_assignment,
and_(
user_group_assignment_for_lane_assignment.group_id == HumanTaskModel.lane_assignment_id,
user_group_assignment_for_lane_assignment.user_id == user.id,
),
).filter(
# it should show up in your "Waiting for me" list IF:
# 1) task is not assigned to a group OR
# 2) you are not in the group
# In the case of number 2, it probably means you were added to the task individually by an admin
or_(
HumanTaskModel.lane_assignment_id.is_(None), # type: ignore
user_group_assignment_for_lane_assignment.group_id.is_(None),
)
)
human_task_already_joined = True
restrict_human_tasks_to_user = user
if user_group_identifier is not None:
if user is None:
raise ProcessInstanceReportCannotBeRunError("A user must be specified to run report with a group identifier.")
process_instance_query = cls.filter_by_user_group_identifier(
process_instance_query=process_instance_query,
user_group_identifier=user_group_identifier,
user=user,
human_task_already_joined=human_task_already_joined,
process_status=process_status,
instances_with_tasks_waiting_for_me=instances_with_tasks_waiting_for_me,
)
instance_metadata_aliases = {}
if report_metadata["columns"] is None or len(report_metadata["columns"]) < 1:
report_metadata["columns"] = cls.builtin_column_options()
instance_metadata_aliases: dict[str, Any],
) -> Query:
for column in report_metadata["columns"]:
if column["accessor"] in cls.non_metadata_columns():
continue
@ -634,10 +537,17 @@ class ProcessInstanceReportService:
join_conditions.append(
or_(instance_metadata_alias.value.is_not(None), instance_metadata_alias.value != "")
)
process_instance_query = process_instance_query.join(
process_instance_query = process_instance_query.join( # type: ignore
instance_metadata_alias, and_(*join_conditions), isouter=isouter
).add_columns(func.max(instance_metadata_alias.value).label(column["accessor"]))
return process_instance_query
@classmethod
def generate_order_by_query_array(
cls,
report_metadata: ReportMetadata,
instance_metadata_aliases: dict[str, Any],
) -> list:
order_by_query_array = []
order_by_array = report_metadata["order_by"]
if len(order_by_array) < 1:
@ -654,9 +564,156 @@ class ProcessInstanceReportService:
order_by_query_array.append(func.max(instance_metadata_aliases[attribute].value).desc())
else:
order_by_query_array.append(func.max(instance_metadata_aliases[attribute].value).asc())
return order_by_query_array
@classmethod
def get_basic_query(
cls,
filters: list[FilterValue],
) -> Query:
process_instance_query: Query = ProcessInstanceModel.query
# Always join that hot user table for good performance at serialization time.
process_instance_query = process_instance_query.options(selectinload(ProcessInstanceModel.process_initiator))
for value in cls.check_filter_value(filters, "process_model_identifier"):
process_model = ProcessModelService.get_process_model(
f"{value}",
)
process_instance_query = process_instance_query.filter_by(process_model_identifier=process_model.id)
# this can never happen. obviously the class has the columns it defines. this is just to appease mypy.
if ProcessInstanceModel.start_in_seconds is None or ProcessInstanceModel.end_in_seconds is None:
raise (
ApiError(
error_code="unexpected_condition",
message="Something went very wrong",
status_code=500,
)
)
for value in cls.check_filter_value(filters, "start_from"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.start_in_seconds >= value)
for value in cls.check_filter_value(filters, "start_to"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.start_in_seconds <= value)
for value in cls.check_filter_value(filters, "end_from"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.end_in_seconds >= value)
for value in cls.check_filter_value(filters, "end_to"):
process_instance_query = process_instance_query.filter(ProcessInstanceModel.end_in_seconds <= value)
has_active_status = cls.get_filter_value(filters, "has_active_status")
if has_active_status:
process_instance_query = process_instance_query.filter(
ProcessInstanceModel.status.in_(ProcessInstanceModel.active_statuses()) # type: ignore
)
for value in cls.check_filter_value(filters, "process_initiator_username"):
initiator = UserModel.query.filter_by(username=value).first()
process_initiator_id = -1
if initiator:
process_initiator_id = initiator.id
process_instance_query = process_instance_query.filter_by(process_initiator_id=process_initiator_id)
return process_instance_query
@classmethod
def run_process_instance_report(
cls,
report_metadata: ReportMetadata,
user: UserModel | None = None,
page: int = 1,
per_page: int = 100,
) -> dict:
restrict_human_tasks_to_user = None
filters = report_metadata["filter_by"]
process_instance_query = cls.get_basic_query(filters)
process_status = cls.get_filter_value(filters, "process_status")
if process_status is not None:
process_instance_query = process_instance_query.filter(
ProcessInstanceModel.status.in_(process_status.split(",")) # type: ignore
)
instances_with_tasks_completed_by_me = cls.get_filter_value(filters, "instances_with_tasks_completed_by_me")
instances_with_tasks_waiting_for_me = cls.get_filter_value(filters, "instances_with_tasks_waiting_for_me")
user_group_identifier = cls.get_filter_value(filters, "user_group_identifier")
# builtin only - for the for-me paths
with_relation_to_me = cls.get_filter_value(filters, "with_relation_to_me")
if (
not instances_with_tasks_completed_by_me
and not user_group_identifier
and not instances_with_tasks_waiting_for_me
and with_relation_to_me is True
):
if user is None:
raise ProcessInstanceReportCannotBeRunError("A user must be specified to run report with with_relation_to_me")
process_instance_query = process_instance_query.outerjoin(HumanTaskModel).outerjoin( # type: ignore
HumanTaskUserModel,
and_(
HumanTaskModel.id == HumanTaskUserModel.human_task_id,
HumanTaskUserModel.user_id == user.id,
),
)
process_instance_query = process_instance_query.filter(
or_(
HumanTaskUserModel.id.is_not(None),
ProcessInstanceModel.process_initiator_id == user.id,
)
)
if instances_with_tasks_completed_by_me is True and instances_with_tasks_waiting_for_me is True:
raise ProcessInstanceReportMetadataInvalidError(
"Cannot set both 'instances_with_tasks_completed_by_me' and 'instances_with_tasks_waiting_for_me' to"
" true. You must choose one."
)
# ensure we only join with HumanTaskModel once
human_task_already_joined = False
if instances_with_tasks_completed_by_me is True:
if user is None:
raise ProcessInstanceReportCannotBeRunError(
"A user must be specified to run report with instances_with_tasks_completed_by_me."
)
process_instance_query = cls.filter_by_instances_with_tasks_completed_by_me(process_instance_query, user)
human_task_already_joined = True
# this excludes some tasks you can complete, because that's the way the requirements were described.
# if it's assigned to one of your groups, it does not get returned by this query.
if instances_with_tasks_waiting_for_me is True:
if user is None:
raise ProcessInstanceReportCannotBeRunError(
"A user must be specified to run report with instances_with_tasks_waiting_for_me."
)
human_task_already_joined = True
restrict_human_tasks_to_user = user
process_instance_query = cls.filter_by_instances_with_tasks_waiting_for_me(
process_instance_query=process_instance_query,
user=user,
)
if user_group_identifier is not None:
if user is None:
raise ProcessInstanceReportCannotBeRunError("A user must be specified to run report with a group identifier.")
process_instance_query = cls.filter_by_user_group_identifier(
process_instance_query=process_instance_query,
user_group_identifier=user_group_identifier,
user=user,
human_task_already_joined=human_task_already_joined,
process_status=process_status,
instances_with_tasks_waiting_for_me=instances_with_tasks_waiting_for_me,
)
instance_metadata_aliases: dict[str, Any] = {}
if report_metadata["columns"] is None or len(report_metadata["columns"]) < 1:
report_metadata["columns"] = cls.builtin_column_options()
process_instance_query = cls.add_where_clauses_for_process_instance_metadata_filters(
process_instance_query, report_metadata, instance_metadata_aliases
)
order_by_query_array = cls.generate_order_by_query_array(report_metadata, instance_metadata_aliases)
process_instances = (
process_instance_query.group_by(ProcessInstanceModel.id)
process_instance_query.group_by(ProcessInstanceModel.id) # type: ignore
.add_columns(ProcessInstanceModel.id)
.order_by(*order_by_query_array)
.paginate(page=page, per_page=per_page, error_out=False)