Vastai-ConnectHub/app/security/fernet.py

64 lines
1.7 KiB
Python

from __future__ import annotations
import json
import os
from typing import Any
from cryptography.fernet import Fernet, InvalidToken
from app.core.config import settings
def _ensure_parent_dir(path: str) -> None:
parent = os.path.dirname(path)
if parent:
os.makedirs(parent, exist_ok=True)
def get_or_create_fernet_key(path: str | None = None) -> bytes:
key_path = path or settings.fernet_key_path
_ensure_parent_dir(key_path)
if os.path.exists(key_path):
with open(key_path, "rb") as f:
return f.read().strip()
key = Fernet.generate_key()
# best-effort set 0o600 (not always supported on some FS)
try:
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
fd = os.open(key_path, flags, 0o600)
with os.fdopen(fd, "wb") as f:
f.write(key)
f.write(b"\n")
except FileExistsError:
# race: another process wrote it
with open(key_path, "rb") as f:
return f.read().strip()
except OSError:
with open(key_path, "wb") as f:
f.write(key)
f.write(b"\n")
return key
def _fernet() -> Fernet:
return Fernet(get_or_create_fernet_key())
def encrypt_json(obj: dict[str, Any]) -> str:
data = json.dumps(obj, ensure_ascii=False, separators=(",", ":"), sort_keys=True).encode("utf-8")
return _fernet().encrypt(data).decode("utf-8")
def decrypt_json(token: str) -> dict[str, Any]:
if not token:
return {}
try:
raw = _fernet().decrypt(token.encode("utf-8"))
except InvalidToken as e:
raise ValueError("Invalid secret_cfg token (Fernet)") from e
return json.loads(raw.decode("utf-8"))