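"""Seed the standard RBAC roles and their permissions.

Defines the built-in ``system_admin`` and ``readonly_viewer`` roles, the
permission codes each of them receives, and ``seed_standard_rbac()``, which
creates any missing roles and permissions in the database.
"""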
from __future__ import annotations
|
|
|
|
from sqlalchemy import select
|
|
|
|
from app.db.engine import get_session
|
|
from app.db.models import Permission, Role
|
|
|
|
|
|
# Names of the two built-in roles that seed_standard_rbac() creates.
SYSTEM_ADMIN_ROLE = "system_admin"
READONLY_VIEWER_ROLE = "readonly_viewer"
|
|
|
|
# Permission codes granted to the SYSTEM_ADMIN_ROLE role by seed_standard_rbac().
# Each code has three colon-separated parts; the first is "table" or "button".
ADMIN_PERMISSION_CODES: set[str] = {
    # Table-level permissions (read/write per admin table).
    "table:jobs:read",
    "table:jobs:write",
    "table:job_logs:read",
    "table:job_logs:write",
    "table:users:read",
    "table:users:write",
    "table:roles:read",
    "table:roles:write",
    "table:permissions:read",
    "table:permissions:write",
    "table:ldap_configs:read",
    "table:ldap_configs:write",
    "table:ldap_groups:read",
    "table:ldap_groups:write",
    "table:ldap_group_roles:read",
    "table:ldap_group_roles:write",
    # Audit logs have a read code only; no write code is defined here.
    "table:audit_logs:read",
    # Button-level (individual action) permissions.
    "button:job:run",
    "button:job:run_now",
    "button:job:view_logs",
    "button:job:disable",
    "button:job:clear_logs",
    "button:job:delete_with_logs",
    "button:joblog:retry",
    "button:ldap:sync",
    "button:ldap:test",
}
|
|
|
|
# Permission codes granted to the READONLY_VIEWER_ROLE role by
# seed_standard_rbac(): read access to jobs and job logs, nothing else.
READONLY_PERMISSION_CODES: set[str] = {
    "table:jobs:read",
    "table:job_logs:read",
}
|
|
|
|
# Description text for each permission code. _ensure_permissions() stores it
# on a newly created Permission and back-fills it when an existing Permission
# has an empty description. The texts are runtime data and stay as written.
PERMISSION_DESCRIPTIONS: dict[str, str] = {
    # Table-level permissions.
    "table:jobs:read": "查看任务菜单和任务详情",
    "table:jobs:write": "创建、编辑、删除任务",
    "table:job_logs:read": "查看任务日志菜单和日志详情",
    "table:job_logs:write": "管理任务日志",
    "table:users:read": "查看用户菜单和用户详情",
    "table:users:write": "管理用户",
    "table:roles:read": "查看角色菜单和角色详情",
    "table:roles:write": "管理角色",
    "table:permissions:read": "查看权限菜单和权限详情",
    "table:permissions:write": "管理权限",
    "table:ldap_configs:read": "查看 LDAP 配置",
    "table:ldap_configs:write": "管理 LDAP 配置",
    "table:ldap_groups:read": "查看 LDAP 组",
    "table:ldap_groups:write": "管理 LDAP 组",
    "table:ldap_group_roles:read": "查看 LDAP 组角色映射",
    "table:ldap_group_roles:write": "管理 LDAP 组角色映射",
    "table:audit_logs:read": "查看审计日志",
    # Button-level (individual action) permissions.
    "button:job:run": "从任务列表立即运行任务",
    "button:job:run_now": "通过 SQLAdmin 动作立即运行任务",
    "button:job:view_logs": "从任务查看关联日志",
    "button:job:disable": "停用任务",
    "button:job:clear_logs": "清理任务日志",
    "button:job:delete_with_logs": "删除任务及日志",
    "button:joblog:retry": "重试任务日志",
    "button:ldap:sync": "同步 LDAP 用户角色",
    "button:ldap:test": "测试 LDAP 连接",
}
|
|
|
|
|
|
def _ensure_permissions(session, codes: set[str]) -> dict[str, Permission]:
    """Make sure a ``Permission`` exists for every code in *codes*.

    Missing permissions are created with the description found in
    ``PERMISSION_DESCRIPTIONS`` (an empty string when the code has no entry).
    An existing permission keeps its description unless that description is
    empty and ``PERMISSION_DESCRIPTIONS`` has an entry for the code, in which
    case the description is back-filled.

    Args:
        session: Database session used for the lookup and the inserts
            (presumably a SQLAlchemy ORM ``Session`` -- confirm against
            ``get_session``). Nothing is committed here; the caller owns
            the transaction.
        codes: Permission codes that must exist.

    Returns:
        Mapping of permission code to ``Permission`` covering every code in
        *codes*; empty when *codes* is empty.
    """
    if not codes:
        # Nothing to look up or create, so skip the query entirely.
        return {}

    # A sorted list gives in_() a plain sequence and keeps both the query
    # parameters and the insert order deterministic.
    ordered_codes = sorted(codes)
    by_code = {
        perm.code: perm
        for perm in session.scalars(
            select(Permission).where(Permission.code.in_(ordered_codes))
        )
    }

    created_any = False
    for code in ordered_codes:
        perm = by_code.get(code)
        if perm is None:
            perm = Permission(
                code=code,
                description=PERMISSION_DESCRIPTIONS.get(code, ""),
            )
            session.add(perm)
            by_code[code] = perm
            created_any = True
        elif not perm.description and code in PERMISSION_DESCRIPTIONS:
            # The object was loaded through this session, so the attribute
            # change is tracked without another session.add().
            perm.description = PERMISSION_DESCRIPTIONS[code]

    if created_any:
        # One flush for all new rows instead of one per permission.
        session.flush()
    return by_code
|
|
|
|
|
|
def _ensure_role(session, *, name: str, description: str, permission_codes: set[str]) -> Role:
    """Create the role *name* if needed and grant it any missing permissions.

    The operation is additive: permissions already attached to the role are
    never removed, even when they are not listed in *permission_codes*, and
    a non-empty existing description is never overwritten.

    Args:
        session: Database session used for all reads and writes. Nothing is
            committed here; the caller owns the transaction.
        name: Role name used to look the role up, and to create it when no
            role with that name is found.
        description: Description for a newly created role, or for an
            existing role whose description is empty.
        permission_codes: Codes of the permissions the role must have at
            least; missing ``Permission`` rows are created as a side effect.

    Returns:
        The existing or newly created ``Role``.
    """
    role = session.scalar(select(Role).where(Role.name == name))
    if role is None:
        role = Role(name=name, description=description)
        session.add(role)
        # Flush the new role before permissions are attached to it.
        session.flush()
    elif not role.description:
        # Already tracked by the session; no extra session.add() needed.
        role.description = description

    perms_by_code = _ensure_permissions(session, permission_codes)
    already_granted = {perm.code for perm in role.permissions}
    # Sorted so permissions are attached in a deterministic order.
    for code in sorted(permission_codes - already_granted):
        role.permissions.append(perms_by_code[code])
    return role
|
|
|
|
|
|
def seed_standard_rbac() -> None:
    """Create or top up the built-in RBAC roles and their permissions.

    Ensures that the ``SYSTEM_ADMIN_ROLE`` and ``READONLY_VIEWER_ROLE`` roles
    exist and hold at least the permissions in ``ADMIN_PERMISSION_CODES`` and
    ``READONLY_PERMISSION_CODES`` respectively. A single commit is issued
    after both roles have been processed, and the session is closed whether
    or not seeding succeeds.
    """
    # (role name, description, permission codes) for each built-in role.
    standard_roles = (
        (
            SYSTEM_ADMIN_ROLE,
            "系统管理员：通过角色获得完整后台管理权限",
            ADMIN_PERMISSION_CODES,
        ),
        (
            READONLY_VIEWER_ROLE,
            "只读查看者：仅可查看任务和任务日志",
            READONLY_PERMISSION_CODES,
        ),
    )

    db = get_session()
    try:
        for role_name, role_description, codes in standard_roles:
            _ensure_role(
                db,
                name=role_name,
                description=role_description,
                permission_codes=codes,
            )
        db.commit()
    finally:
        db.close()
|