add scope to filter data
This commit is contained in:
parent
394546dc67
commit
e9dec3292c
13 changed files with 386 additions and 36 deletions
118
processes/utils.py
Normal file
118
processes/utils.py
Normal file
|
@ -0,0 +1,118 @@
|
|||
from django.shortcuts import get_object_or_404
|
||||
from .models import ProcessInstance
|
||||
from common.consts import UserRoles
|
||||
|
||||
|
||||
def scope_instances_queryset(user, queryset=None):
|
||||
"""Return a queryset of ProcessInstance scoped by the user's role.
|
||||
|
||||
If no profile/role, returns an empty queryset.
|
||||
"""
|
||||
qs = queryset if queryset is not None else ProcessInstance.objects.all()
|
||||
profile = getattr(user, 'profile', None)
|
||||
if not profile:
|
||||
return qs.none()
|
||||
try:
|
||||
if profile.has_role(UserRoles.INSTALLER):
|
||||
# Only instances assigned to this installer
|
||||
from installations.models import InstallationAssignment
|
||||
assign_ids = InstallationAssignment.objects.filter(installer=user).values_list('process_instance', flat=True)
|
||||
return qs.filter(id__in=assign_ids)
|
||||
if profile.has_role(UserRoles.BROKER):
|
||||
return qs.filter(broker=profile.broker)
|
||||
if profile.has_role(UserRoles.ACCOUNTANT) or profile.has_role(UserRoles.MANAGER):
|
||||
return qs.filter(broker__affairs__county=profile.county)
|
||||
if profile.has_role(UserRoles.ADMIN):
|
||||
return qs
|
||||
# if profile.has_role(UserRoles.WATER_RESOURCE_MANAGER) or profile.has_role(UserRoles.HEADQUARTER):
|
||||
# return qs.filter(well__county=profile.county)
|
||||
# Fallback: no special scope
|
||||
# return qs
|
||||
except Exception:
|
||||
return qs.none()
|
||||
|
||||
|
||||
def count_incomplete_instances(user):
|
||||
"""Count non-completed, non-deleted requests within the user's scope."""
|
||||
base = ProcessInstance.objects.select_related('well').filter(is_deleted=False).exclude(status='completed')
|
||||
return scope_instances_queryset(user, base).count()
|
||||
|
||||
|
||||
def user_can_access_instance(user, instance: ProcessInstance) -> bool:
|
||||
"""Check if user can access a specific instance based on scoping rules."""
|
||||
try:
|
||||
scoped = scope_instances_queryset(user, ProcessInstance.objects.filter(id=instance.id))
|
||||
return scoped.exists()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def get_scoped_instance_or_404(request, instance_id: int) -> ProcessInstance:
|
||||
"""Return instance only if it's within the user's scope; otherwise 404.
|
||||
|
||||
Use this in any view receiving instance_id from URL to prevent URL tampering.
|
||||
"""
|
||||
base = ProcessInstance.objects.filter(is_deleted=False)
|
||||
qs = scope_instances_queryset(request.user, base)
|
||||
return get_object_or_404(qs, id=instance_id)
|
||||
|
||||
|
||||
def scope_wells_queryset(user, queryset=None):
|
||||
"""Return a queryset of Well scoped by the user's role (parity with instances)."""
|
||||
try:
|
||||
from wells.models import Well
|
||||
qs = queryset if queryset is not None else Well.objects.all()
|
||||
profile = getattr(user, 'profile', None)
|
||||
if not profile:
|
||||
return qs.none()
|
||||
if profile.has_role(UserRoles.ADMIN):
|
||||
return qs
|
||||
if profile.has_role(UserRoles.BROKER):
|
||||
return qs.filter(broker=profile.broker)
|
||||
if profile.has_role(UserRoles.ACCOUNTANT) or profile.has_role(UserRoles.MANAGER):
|
||||
return qs.filter(broker__affairs__county=profile.county)
|
||||
if profile.has_role(UserRoles.INSTALLER):
|
||||
# Wells that have instances assigned to this installer
|
||||
from installations.models import InstallationAssignment
|
||||
assign_ids = InstallationAssignment.objects.filter(installer=user).values_list('process_instance', flat=True)
|
||||
inst_qs = ProcessInstance.objects.filter(id__in=assign_ids)
|
||||
return qs.filter(process_instances__in=inst_qs).distinct()
|
||||
# Fallback
|
||||
return qs.none()
|
||||
except Exception:
|
||||
return qs.none() if 'qs' in locals() else []
|
||||
|
||||
|
||||
def scope_customers_queryset(user, queryset=None):
|
||||
"""Return a queryset of customer Profiles scoped by user's role.
|
||||
|
||||
Assumes queryset is Profiles already filtered to customers, otherwise we filter here.
|
||||
"""
|
||||
try:
|
||||
from accounts.models import Profile
|
||||
qs = queryset if queryset is not None else Profile.objects.all()
|
||||
# Ensure we're only looking at customer profiles
|
||||
from common.consts import UserRoles as UR
|
||||
qs = qs.filter(roles__slug=UR.CUSTOMER.value, is_deleted=False)
|
||||
|
||||
profile = getattr(user, 'profile', None)
|
||||
if not profile:
|
||||
return qs.none()
|
||||
if profile.has_role(UserRoles.ADMIN):
|
||||
return qs
|
||||
if profile.has_role(UserRoles.BROKER):
|
||||
return qs.filter(broker=profile.broker)
|
||||
if profile.has_role(UserRoles.ACCOUNTANT) or profile.has_role(UserRoles.MANAGER):
|
||||
return qs.filter(county=profile.county)
|
||||
if profile.has_role(UserRoles.INSTALLER):
|
||||
# Customers that are representatives of instances assigned to this installer
|
||||
from installations.models import InstallationAssignment
|
||||
assign_ids = InstallationAssignment.objects.filter(installer=user).values_list('process_instance', flat=True)
|
||||
rep_ids = ProcessInstance.objects.filter(id__in=assign_ids).values_list('representative', flat=True)
|
||||
return qs.filter(user_id__in=rep_ids)
|
||||
# Fallback
|
||||
return qs.none()
|
||||
except Exception:
|
||||
return qs.none() if 'qs' in locals() else []
|
||||
|
||||
|
Loading…
Add table
Add a link
Reference in a new issue