from __future__ import annotations

from datetime import datetime, timezone
from typing import Any

from sqlalchemy import select
from sqlalchemy.orm import Session

from app.db.models import Job, JobLog, JobStatus

# Persistence helpers for ``Job`` and ``JobLog`` rows. Every function takes the
# SQLAlchemy ``Session`` to operate on; the two writers commit on that session.


def get_job(session: Session, job_id: str) -> Job | None:
    """Return the ``Job`` whose primary key is *job_id*, or ``None`` if absent."""
    return session.get(Job, job_id)


def list_enabled_jobs(session: Session) -> list[Job]:
    """Return every ``Job`` whose ``enabled`` column is true, as a list."""
    stmt = select(Job).where(Job.enabled.is_(True))
    return list(session.scalars(stmt))


def update_job_last_run_at(session: Session, job_id: str, dt: datetime) -> None:
    """Set ``last_run_at`` on the job identified by *job_id* and commit.

    Does nothing (and does not commit) when no such job exists.
    """
    job = session.get(Job, job_id)
    if job is None:
        return
    # The instance returned by ``session.get`` is already tracked by the
    # session, so the attribute change is flushed by ``commit`` without an
    # explicit ``session.add``.
    job.last_run_at = dt
    session.commit()


def create_job_log(
    session: Session,
    *,
    job_id: str,
    status: JobStatus,
    snapshot_params: dict[str, Any],
    message: str = "",
    traceback: str = "",
    run_log: str = "",
    celery_task_id: str = "",
    attempt: int = 0,
    started_at: datetime | None = None,
    finished_at: datetime | None = None,
) -> JobLog:
    """Insert a ``JobLog`` row, commit it, and return the refreshed instance.

    All arguments after *session* are keyword-only and are passed through to
    the ``JobLog`` constructor unchanged, except *started_at*: when it is
    ``None`` the current UTC time is used instead.

    Returns:
        The newly persisted ``JobLog``, refreshed from the database after the
        commit.
    """
    if started_at is None:
        # Naive UTC timestamp, i.e. the same value ``datetime.utcnow()``
        # produced, without calling the API deprecated since Python 3.12.
        started_at = datetime.now(timezone.utc).replace(tzinfo=None)

    log = JobLog(
        job_id=job_id,
        status=status,
        snapshot_params=snapshot_params,
        message=message,
        traceback=traceback,
        run_log=run_log,
        celery_task_id=celery_task_id,
        attempt=attempt,
        started_at=started_at,
        finished_at=finished_at,
    )
    session.add(log)
    session.commit()
    # Reload the row after the commit before handing it back to the caller.
    session.refresh(log)
    return log


def get_job_log(session: Session, log_id: int) -> JobLog | None:
    """Return the ``JobLog`` whose primary key is *log_id*, or ``None`` if absent."""
    return session.get(JobLog, log_id)