64 lines
1.7 KiB
Python
64 lines
1.7 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
from typing import Any
|
|
|
|
from cryptography.fernet import Fernet, InvalidToken
|
|
|
|
from app.core.config import settings
|
|
|
|
|
|
def _ensure_parent_dir(path: str) -> None:
|
|
parent = os.path.dirname(path)
|
|
if parent:
|
|
os.makedirs(parent, exist_ok=True)
|
|
|
|
|
|
def get_or_create_fernet_key(path: str | None = None) -> bytes:
|
|
key_path = path or settings.fernet_key_path
|
|
_ensure_parent_dir(key_path)
|
|
|
|
if os.path.exists(key_path):
|
|
with open(key_path, "rb") as f:
|
|
return f.read().strip()
|
|
|
|
key = Fernet.generate_key()
|
|
# best-effort set 0o600 (not always supported on some FS)
|
|
try:
|
|
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
|
|
fd = os.open(key_path, flags, 0o600)
|
|
with os.fdopen(fd, "wb") as f:
|
|
f.write(key)
|
|
f.write(b"\n")
|
|
except FileExistsError:
|
|
# race: another process wrote it
|
|
with open(key_path, "rb") as f:
|
|
return f.read().strip()
|
|
except OSError:
|
|
with open(key_path, "wb") as f:
|
|
f.write(key)
|
|
f.write(b"\n")
|
|
return key
|
|
|
|
|
|
def _fernet() -> Fernet:
|
|
return Fernet(get_or_create_fernet_key())
|
|
|
|
|
|
def encrypt_json(obj: dict[str, Any]) -> str:
|
|
data = json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8")
|
|
return _fernet().encrypt(data).decode("utf-8")
|
|
|
|
|
|
def decrypt_json(token: str) -> dict[str, Any]:
|
|
if not token:
|
|
return {}
|
|
try:
|
|
raw = _fernet().decrypt(token.encode("utf-8"))
|
|
except InvalidToken as e:
|
|
raise ValueError("Invalid secret_cfg token (Fernet)") from e
|
|
return json.loads(raw.decode("utf-8"))
|
|
|
|
|