freeleaps-authentication/backend/infra/permission/role_handler.py

196 lines
8.0 KiB
Python
Raw Permalink Normal View History

2025-10-30 03:26:05 +00:00
from typing import Optional, List, Tuple
from fastapi.exceptions import RequestValidationError
from backend.models.permission.models import RoleDoc, PermissionDoc, UserRoleDoc
from bson import ObjectId
from datetime import datetime
class RoleHandler:
def __init__(self):
pass
async def create_role(self, role_key: str, role_name: str, role_description: Optional[str], role_level: int) -> Optional[RoleDoc]:
"""Create a new role, ensuring role_key and role_name are unique and not empty"""
if not role_key or not role_name:
raise RequestValidationError("role_key and role_name are required.")
if await RoleDoc.find_one({str(RoleDoc.role_key): role_key}) or await RoleDoc.find_one(
{str(RoleDoc.role_name): role_name}):
raise RequestValidationError("role_key or role_name has already been created.")
doc = RoleDoc(
role_key=role_key,
role_name=role_name,
role_description=role_description,
permission_ids=[],
role_level=role_level,
created_at=datetime.now(),
updated_at=datetime.now()
)
await doc.create()
return doc
async def update_role(self, role_id: str, role_key: str, role_name: str,
role_description: Optional[str], role_level: int) -> Optional[
RoleDoc]:
"""Update an existing role, ensuring role_key and role_name are unique and not empty"""
if not role_id or not role_key or not role_name:
raise RequestValidationError("role_id, role_key and role_name are required.")
doc = await RoleDoc.get(role_id)
if not doc:
raise RequestValidationError("role not found.")
if doc.is_default:
raise RequestValidationError("Default role cannot be updated.")
# Check for uniqueness (exclude self)
conflict = await RoleDoc.find_one({
"$and": [
{"_id": {"$ne": role_id}},
{"$or": [
{str(RoleDoc.role_key): role_key},
{str(RoleDoc.role_name): role_name}
]}
]
})
if conflict:
raise RequestValidationError("role_key or role_name already exists.")
doc.role_key = role_key
doc.role_name = role_name
doc.role_description = role_description
doc.role_level = role_level
doc.updated_at = datetime.now()
await doc.save()
return doc
async def create_or_update_role(self, role_key: str, role_name: str, role_level: int, custom_role_id: Optional[str], role_description: Optional[str] = None) -> Optional[RoleDoc]:
"""Create or update a role document"""
# Input validation
if not role_key or not role_name:
raise RequestValidationError("role_key and role_name are required.")
def create_new_doc():
return RoleDoc(
role_key=role_key,
role_name=role_name,
role_description=role_description,
role_level=role_level,
permission_ids=[],
created_at=datetime.now(),
updated_at=datetime.now()
)
def update_doc_fields(doc):
doc.role_key = role_key
doc.role_name = role_name
doc.role_description = role_description
doc.role_level = role_level
doc.updated_at = datetime.now()
# Check if role with this key already exists
existing_doc = await RoleDoc.find_one(
{str(RoleDoc.role_key): role_key}
)
if existing_doc:
# If role with this key already exists
if custom_role_id and str(custom_role_id) != str(existing_doc.id):
# Different ID provided - replace the document
id_conflict = await RoleDoc.get(custom_role_id)
if id_conflict:
raise RequestValidationError("Role with the provided ID already exists.")
new_doc = create_new_doc()
new_doc.id = custom_role_id
await new_doc.create()
await existing_doc.delete()
return new_doc
else:
# Same ID or no ID provided - update existing document
update_doc_fields(existing_doc)
await existing_doc.save()
return existing_doc
else:
# If no existing document with this key, create new document
new_doc = create_new_doc()
if custom_role_id:
id_conflict = await RoleDoc.get(custom_role_id)
if id_conflict:
raise RequestValidationError("Role with the provided ID already exists.")
new_doc.id = custom_role_id
await new_doc.create()
2025-10-30 03:26:05 +00:00
return new_doc
async def query_roles(self, role_key: Optional[str], role_name: Optional[str], skip: int = 0, limit: int = 10) -> \
Tuple[List[RoleDoc], int]:
"""Query roles with pagination and fuzzy search by role_key and role_name"""
query = {}
if role_key:
query["role_key"] = {"$regex": role_key, "$options": "i"}
if role_name:
query["role_name"] = {"$regex": role_name, "$options": "i"}
cursor = RoleDoc.find(query)
total = await cursor.count()
docs = await cursor.skip(skip).limit(limit).to_list()
return docs, total
async def query_roles_no_pagination(
self,
role_id: Optional[str] = None,
role_key: Optional[str] = None,
role_name: Optional[str] = None
) -> Tuple[List[RoleDoc], int]:
"""Query roles fuzzy search without pagination"""
query = {}
if role_id:
try:
query["_id"] = ObjectId(role_id) # Convert string to ObjectId for MongoDB
except Exception:
raise RequestValidationError("Invalid role_id format. Must be a valid ObjectId.")
if role_key:
query["role_key"] = {"$regex": role_key, "$options": "i"}
if role_name:
query["role_name"] = {"$regex": role_name, "$options": "i"}
cursor = RoleDoc.find(query)
total = await cursor.count()
docs = await cursor.to_list()
return docs, total
async def assign_permissions_to_role(self, role_id: str, permission_ids: List[str]) -> Optional[RoleDoc]:
"""Assign permissions to a role by updating the permission_ids field"""
if not role_id or not permission_ids:
raise RequestValidationError("role_id and permission_ids are required.")
doc = await RoleDoc.get(role_id)
if not doc:
raise RequestValidationError("Role not found.")
# Validate that all permission_ids exist in the permission collection
for permission_id in permission_ids:
permission_doc = await PermissionDoc.get(permission_id)
if not permission_doc:
raise RequestValidationError(f"Permission with id {permission_id} not found.")
# Remove duplicates from permission_ids
unique_permission_ids = list(dict.fromkeys(permission_ids))
doc.permission_ids = unique_permission_ids
doc.updated_at = datetime.now()
await doc.save()
return doc
async def delete_role(self, role_id: str) -> None:
"""Delete a role document after checking if it is referenced by any user and is not default"""
if not role_id:
raise RequestValidationError("role_id is required.")
# Check if any user references this role
user_role = await UserRoleDoc.find_one({"role_ids": str(role_id)})
if user_role:
raise RequestValidationError("Role is referenced by a user and cannot be deleted.")
doc = await RoleDoc.get(role_id)
if not doc:
raise RequestValidationError("Role not found.")
# Check if the role is default
if doc.is_default:
raise RequestValidationError("Default role cannot be deleted.")
await doc.delete()