87 lines
2.7 KiB
Python
87 lines
2.7 KiB
Python
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
from typing import Any
|
||
|
||
from cryptography.fernet import Fernet, InvalidToken
|
||
|
||
from app.core.config import settings
|
||
|
||
|
||
def _ensure_parent_dir(path: str) -> None:
|
||
parent = os.path.dirname(path)
|
||
if parent:
|
||
os.makedirs(parent, exist_ok=True)
|
||
|
||
|
||
def get_or_create_fernet_key(path: str | None = None) -> bytes:
|
||
key_path = path or settings.fernet_key_path
|
||
_ensure_parent_dir(key_path)
|
||
|
||
if os.path.exists(key_path):
|
||
with open(key_path, "rb") as f:
|
||
return f.read().strip()
|
||
|
||
key = Fernet.generate_key()
|
||
# best-effort set 0o600 (not always supported on some FS)
|
||
try:
|
||
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
|
||
fd = os.open(key_path, flags, 0o600)
|
||
with os.fdopen(fd, "wb") as f:
|
||
f.write(key)
|
||
f.write(b"\n")
|
||
except FileExistsError:
|
||
# race: another process wrote it
|
||
with open(key_path, "rb") as f:
|
||
return f.read().strip()
|
||
except OSError:
|
||
with open(key_path, "wb") as f:
|
||
f.write(key)
|
||
f.write(b"\n")
|
||
return key
|
||
|
||
|
||
def _fernet() -> Fernet:
|
||
return Fernet(get_or_create_fernet_key())
|
||
|
||
|
||
def encrypt_json(obj: dict[str, Any]) -> str:
|
||
data = json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8")
|
||
return _fernet().encrypt(data).decode("utf-8")
|
||
|
||
|
||
def decrypt_json(token: str) -> dict[str, Any]:
|
||
if not token:
|
||
return {}
|
||
token = token.strip()
|
||
# 常见脏数据:被包了引号
|
||
if (token.startswith('"') and token.endswith('"')) or (token.startswith("'") and token.endswith("'")):
|
||
token = token[1:-1].strip()
|
||
# 常见脏数据:中间混入换行/空白(复制粘贴/渲染导致)
|
||
token = "".join(token.split())
|
||
|
||
# 兼容:历史/手工输入导致误存明文 JSON
|
||
if token.startswith("{"):
|
||
try:
|
||
obj = json.loads(token)
|
||
if isinstance(obj, dict):
|
||
return obj
|
||
except Exception:
|
||
pass
|
||
|
||
# 兼容:末尾 padding '=' 被裁剪导致 base64 解码失败(len % 4 != 0)
|
||
data_len = len(token.rstrip("="))
|
||
# base64 非 padding 字符长度为 4n+1 时不可恢复:大概率是 token 被截断/丢字符
|
||
if data_len % 4 == 1:
|
||
raise ValueError("Invalid secret_cfg token (looks truncated). Please re-save secret_cfg to re-encrypt.")
|
||
if token and (len(token) % 4) != 0:
|
||
token = token + ("=" * (-len(token) % 4))
|
||
try:
|
||
raw = _fernet().decrypt(token.encode("utf-8"))
|
||
except InvalidToken as e:
|
||
raise ValueError("Invalid secret_cfg token (Fernet)") from e
|
||
return json.loads(raw.decode("utf-8"))
|
||
|
||
|