58 lines
1.6 KiB
Python
58 lines
1.6 KiB
Python
from __future__ import annotations
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.db.engine import get_session
|
|
from app.db.models import Permission, Role, User, role_permissions, user_roles
|
|
from app.security.session import get_current_user
|
|
|
|
|
|
def _load_user_permissions(session, user: User) -> set[str]:
    """Return the distinct permission codes granted to *user* via its roles.

    The query walks Permission -> role_permissions -> Role -> user_roles and
    keeps only the rows whose ``user_roles.user_id`` equals ``user.id``.

    Args:
        session: Object exposing ``scalars(stmt)`` used to run the query.
        user: The user whose ``id`` filters the ``user_roles`` rows.

    Returns:
        A set of the ``Permission.code`` values produced by the query.
    """
    query = select(Permission.code).select_from(Permission)
    query = query.join(
        role_permissions, Permission.id == role_permissions.c.permission_id
    )
    query = query.join(Role, Role.id == role_permissions.c.role_id)
    query = query.join(user_roles, Role.id == user_roles.c.role_id)
    query = query.where(user_roles.c.user_id == user.id)
    # Collapsing into a set removes duplicates when several roles grant the
    # same permission code.
    return set(session.scalars(query))
|
|
|
|
|
|
def user_has_permission(session, user: User, code: str) -> bool:
    """Tell whether *user* holds the permission identified by *code*.

    Args:
        session: Passed through to ``_load_user_permissions``.
        user: The user being checked; ``user.is_superuser`` is read first.
        code: Permission code to look up.

    Returns:
        ``True`` when the user is a superuser, when *code* is falsy, or when
        *code* is among the user's loaded permission codes; otherwise
        ``False``.
    """
    # Superuser status is evaluated before the code check; either one
    # short-circuits without loading any permissions.
    if user.is_superuser or not code:
        return True
    return code in _load_user_permissions(session, user)
|
|
|
|
|
|
def request_has_permission(request, code: str) -> bool:
    """Tell whether the user behind *request* holds the permission *code*.

    Args:
        request: Request object; passed to ``get_current_user`` and expected
            to expose a ``state`` attribute used as a per-request cache.
        code: Permission code to look up.

    Returns:
        ``False`` when ``get_current_user`` yields a falsy user. ``True``
        when the user is a superuser or *code* is falsy. Otherwise whether
        *code* is contained in the user's permission codes.
    """
    current = get_current_user(request)
    if not current:
        return False
    if current.is_superuser or not code:
        return True

    state = request.state
    if not hasattr(state, "permissions"):
        # Nothing cached on this request yet: load the codes with a
        # short-lived session and store them on request.state for reuse.
        db = get_session()
        try:
            granted = _load_user_permissions(db, current)
            state.permissions = granted
        finally:
            db.close()
    else:
        granted = state.permissions
    return code in granted
|
|
|
|
|
|
def table_permission_code(table: str, action: str) -> str:
    """Build the permission code string for *action* on *table*.

    Returns:
        A string of the form ``table:<table>:<action>``.
    """
    return "table:{}:{}".format(table, action)
|
|
|
|
|
|
def button_permission_code(code: str) -> str:
    """Build the permission code string for the button identified by *code*.

    Returns:
        A string of the form ``button:<code>``.
    """
    return "button:{}".format(code)
|