115 lines
3.9 KiB
Python
115 lines
3.9 KiB
Python
from __future__ import annotations
|
||
|
||
import re
|
||
from pathlib import Path
|
||
from typing import Dict, List, Tuple
|
||
|
||
import pandas as pd
|
||
|
||
from ad_user_creator.exceptions import InputValidationError
|
||
from ad_user_creator.models import UserInputRecord
|
||
|
||
# Column headers that must all be present in the input table (enforced by
# _validate_headers). In parse_input_file they feed, in order: display name,
# username, email, department OU, base group, project groups, resource groups.
REQUIRED_HEADERS = ["姓名", "用户名", "邮箱", "部门 OU", "基础组", "项目组", "资源组"]
|
||
# A username is one or more ASCII letters, digits, underscores, or hyphens.
USERNAME_PATTERN = re.compile(r"^[A-Za-z0-9_-]+$")
|
||
# Loose shape check only: "<local>@<domain>.<tld>" where no part contains
# whitespace or an extra "@". This is not a full address validation.
EMAIL_PATTERN = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
|
||
|
||
|
||
def _split_groups(value: object) -> List[str]:
|
||
if value is None:
|
||
return []
|
||
text = str(value).strip()
|
||
if not text or text.lower() == "nan":
|
||
return []
|
||
normalized = text.replace(",", ",")
|
||
groups = [item.strip() for item in normalized.split(",") if item.strip()]
|
||
deduped: List[str] = []
|
||
seen = set()
|
||
for group in groups:
|
||
if group not in seen:
|
||
deduped.append(group)
|
||
seen.add(group)
|
||
return deduped
|
||
|
||
|
||
def _read_table(input_path: str) -> pd.DataFrame:
|
||
file_path = Path(input_path)
|
||
if not file_path.exists():
|
||
raise InputValidationError(f"输入文件不存在: {input_path}")
|
||
suffix = file_path.suffix.lower()
|
||
if suffix not in {".csv", ".xlsx"}:
|
||
raise InputValidationError(f"仅支持 .csv 和 .xlsx,当前为: {suffix}")
|
||
|
||
if suffix == ".csv":
|
||
try:
|
||
return pd.read_csv(file_path, encoding="utf-8-sig")
|
||
except UnicodeDecodeError:
|
||
return pd.read_csv(file_path, encoding="utf-8")
|
||
return pd.read_excel(file_path, engine="openpyxl")
|
||
|
||
|
||
def _validate_headers(df: pd.DataFrame) -> None:
    """Check that *df* contains every column named in REQUIRED_HEADERS.

    Args:
        df: The table whose column labels are inspected.

    Raises:
        InputValidationError: If one or more required headers are absent; the
            message lists them in REQUIRED_HEADERS order.
    """
    columns = df.columns
    missing: List[str] = []
    for header in REQUIRED_HEADERS:
        if header in columns:
            continue
        missing.append(header)

    if not missing:
        return
    raise InputValidationError(f"输入文件缺少列: {', '.join(missing)}")
|
||
|
||
|
||
def parse_input_file(input_path: str) -> List[Tuple[UserInputRecord, Dict[str, str]]]:
    """Read the user sheet at *input_path* and validate it row by row.

    Args:
        input_path: Path of the .csv or .xlsx file to parse.

    Returns:
        One ``(record, raw)`` pair per data row, in table order. ``record`` is
        the UserInputRecord built from the row. ``raw`` maps every required
        header to its cleaned text, with the two group lists re-joined
        using ",".

    Raises:
        InputValidationError: On the first row that has a blank required
            field, a username not matching USERNAME_PATTERN, or an email not
            matching EMAIL_PATTERN. Errors raised while reading the file or
            checking its headers propagate unchanged.
    """
    table = _read_table(input_path)
    _validate_headers(table)
    table = table.fillna("")

    # Single-valued columns; every one of them must be non-blank.
    text_headers = ("姓名", "用户名", "邮箱", "部门 OU", "基础组")
    results: List[Tuple[UserInputRecord, Dict[str, str]]] = []

    for row_label, row in table.iterrows():
        # Reported line number: the header row is line 1 and, assuming the
        # default 0-based row labels, the first data row is line 2.
        line_no = row_label + 2

        cells = {header: str(row[header]).strip() for header in text_headers}
        project_groups = _split_groups(row["项目组"])
        resource_groups = _split_groups(row["资源组"])

        required_missing = [header for header in text_headers if not cells[header]]
        if required_missing:
            raise InputValidationError(
                f"第 {line_no} 行缺少必填字段: {', '.join(required_missing)}"
            )

        sam_account_name = cells["用户名"]
        if USERNAME_PATTERN.match(sam_account_name) is None:
            raise InputValidationError(
                f"第 {line_no} 行用户名非法: {sam_account_name},只允许字母数字下划线短横线"
            )

        email = cells["邮箱"]
        if EMAIL_PATTERN.match(email) is None:
            raise InputValidationError(f"第 {line_no} 行邮箱格式非法: {email}")

        record = UserInputRecord(
            display_name=cells["姓名"],
            sam_account_name=sam_account_name,
            email=email,
            dept_ou=cells["部门 OU"],
            base_group=cells["基础组"],
            project_groups=project_groups,
            resource_groups=resource_groups,
        )

        # Same key order as the required headers: text columns first, then
        # the two group columns.
        raw = dict(cells)
        raw["项目组"] = ",".join(project_groups)
        raw["资源组"] = ",".join(resource_groups)

        results.append((record, raw))

    return results
|