"""Persistence helpers for ``Job`` and ``JobLog`` rows.

Every function takes an explicit SQLAlchemy ``Session``. The functions that
write (``update_job_last_run_at``, ``create_job_log``, ``update_job_log``)
commit the session themselves before returning.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.db.models import Job, JobLog, JobStatus


def get_job(session: Session, job_id: str) -> Job | None:
    """Return the ``Job`` with primary key *job_id*, or ``None`` if absent."""
    return session.get(Job, job_id)


def list_enabled_jobs(session: Session) -> list[Job]:
    """Return every ``Job`` whose ``enabled`` column is true."""
    stmt = select(Job).where(Job.enabled.is_(True))
    return list(session.scalars(stmt))


def update_job_last_run_at(session: Session, job_id: str, dt: datetime) -> None:
    """Set ``last_run_at`` on the job identified by *job_id* and commit.

    Does nothing (and does not commit) when no such job exists.
    """
    job = session.get(Job, job_id)
    # Explicit None check: do not depend on the truthiness of a mapped object.
    if job is None:
        return
    job.last_run_at = dt
    session.add(job)
    session.commit()


def create_job_log(
    session: Session,
    *,
    job_id: str,
    status: JobStatus,
    snapshot_params: dict[str, Any],
    message: str = "",
    traceback: str = "",
    run_log: str = "",
    celery_task_id: str = "",
    attempt: int = 0,
    started_at: datetime | None = None,
    finished_at: datetime | None = None,
) -> JobLog:
    """Insert a new ``JobLog`` row, commit, and return the refreshed instance.

    All arguments after *session* are keyword-only and are passed straight to
    the ``JobLog`` constructor. When *started_at* is not supplied, the current
    UTC time is used as a naive ``datetime``.
    """
    if not started_at:
        # Same naive-UTC value that ``datetime.utcnow()`` produced, without
        # calling the API deprecated since Python 3.12.
        started_at = datetime.now(timezone.utc).replace(tzinfo=None)

    log = JobLog(
        job_id=job_id,
        status=status,
        snapshot_params=snapshot_params,
        message=message,
        traceback=traceback,
        run_log=run_log,
        celery_task_id=celery_task_id,
        attempt=attempt,
        started_at=started_at,
        finished_at=finished_at,
    )
    session.add(log)
    session.commit()
    # Reload the row so attributes populated on insert are available.
    session.refresh(log)
    return log


def get_job_log(session: Session, log_id: int) -> JobLog | None:
    """Return the ``JobLog`` with primary key *log_id*, or ``None`` if absent."""
    return session.get(JobLog, log_id)


def update_job_log(
    session: Session,
    log_id: int,
    *,
    status: JobStatus | None = None,
    message: str | None = None,
    traceback: str | None = None,
    run_log: str | None = None,
    celery_task_id: str | None = None,
    attempt: int | None = None,
    finished_at: datetime | None = None,
) -> JobLog | None:
    """Partially update a ``JobLog`` row, commit, and return it refreshed.

    Only the keyword arguments that are not ``None`` are written; a ``None``
    argument means "leave this column unchanged", so this function cannot be
    used to reset a column to ``None``.

    Returns ``None`` (without committing) when no row has id *log_id*.
    """
    log = session.get(JobLog, log_id)
    # Explicit None check: do not depend on the truthiness of a mapped object.
    if log is None:
        return None

    if status is not None:
        log.status = status
    if message is not None:
        log.message = message
    if traceback is not None:
        log.traceback = traceback
    if run_log is not None:
        log.run_log = run_log
    if celery_task_id is not None:
        log.celery_task_id = celery_task_id
    if attempt is not None:
        log.attempt = attempt
    if finished_at is not None:
        log.finished_at = finished_at

    session.add(log)
    session.commit()
    session.refresh(log)
    return log