91 lines
2.3 KiB
Python
91 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta
|
|
from uuid import uuid4
|
|
|
|
from fastapi import Request, Response
|
|
from sqlalchemy import delete, select
|
|
|
|
from app.core.config import settings
|
|
from app.db.engine import get_session
|
|
from app.db.models import Session
|
|
|
|
|
|
SESSION_COOKIE_NAME = "session_id"
|
|
|
|
|
|
def _now_utc() -> datetime:
|
|
return datetime.utcnow()
|
|
|
|
|
|
def create_session(user_id: int, request: Request) -> str:
|
|
session_id = uuid4().hex
|
|
expires_at = _now_utc() + timedelta(minutes=settings.session_expiry_minutes)
|
|
db = get_session()
|
|
try:
|
|
record = Session(
|
|
id=session_id,
|
|
user_id=user_id,
|
|
expires_at=expires_at,
|
|
ip=request.client.host if request.client else "",
|
|
user_agent=request.headers.get("User-Agent", ""),
|
|
)
|
|
db.add(record)
|
|
db.commit()
|
|
finally:
|
|
db.close()
|
|
return session_id
|
|
|
|
|
|
def delete_session(session_id: str) -> None:
|
|
db = get_session()
|
|
try:
|
|
db.execute(delete(Session).where(Session.id == session_id))
|
|
db.commit()
|
|
finally:
|
|
db.close()
|
|
|
|
|
|
def set_session_cookie(response: Response, session_id: str) -> None:
|
|
response.set_cookie(
|
|
SESSION_COOKIE_NAME,
|
|
session_id,
|
|
httponly=True,
|
|
samesite="lax",
|
|
max_age=settings.session_expiry_minutes * 60,
|
|
)
|
|
|
|
|
|
def clear_session_cookie(response: Response) -> None:
|
|
response.delete_cookie(SESSION_COOKIE_NAME)
|
|
|
|
|
|
def get_current_user(request: Request) -> User | None:
|
|
if hasattr(request.state, "user"):
|
|
return request.state.user
|
|
|
|
session_id = request.cookies.get(SESSION_COOKIE_NAME)
|
|
if not session_id:
|
|
request.state.user = None
|
|
return None
|
|
|
|
db = get_session()
|
|
try:
|
|
record = db.scalar(select(Session).where(Session.id == session_id))
|
|
if not record:
|
|
request.state.user = None
|
|
return None
|
|
if record.expires_at <= _now_utc():
|
|
db.execute(delete(Session).where(Session.id == session_id))
|
|
db.commit()
|
|
request.state.user = None
|
|
return None
|
|
user = db.get(User, record.user_id)
|
|
if not user or not user.is_active:
|
|
request.state.user = None
|
|
return None
|
|
request.state.user = user
|
|
return user
|
|
finally:
|
|
db.close()
|