65 lines
1.5 KiB
Python
65 lines
1.5 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.db.models import Job, JobLog, JobStatus
|
|
|
|
|
|
def get_job(session: Session, job_id: str) -> Job | None:
    """Look up a single ``Job`` by its identifier.

    Delegates to ``session.get`` and passes its result through unchanged,
    so the caller receives ``None`` when the lookup finds nothing.
    """
    found = session.get(Job, job_id)
    return found
|
|
|
|
|
|
def list_enabled_jobs(session: Session) -> list[Job]:
    """Return every ``Job`` whose ``enabled`` column is true, as a list."""
    # Build the statement first, then execute it, so each step is named.
    enabled_only = select(Job).where(Job.enabled.is_(True))
    rows = session.scalars(enabled_only)
    return list(rows)
|
|
|
|
|
|
def update_job_last_run_at(session: Session, job_id: str, dt: datetime) -> None:
    """Set ``last_run_at`` on the job identified by *job_id* and commit.

    When ``session.get`` finds no such job the function returns without
    writing anything and without committing.

    Args:
        session: Session used for the lookup and the commit.
        job_id: Identifier passed to ``session.get(Job, ...)``.
        dt: Value assigned to ``job.last_run_at``.
    """
    job = session.get(Job, job_id)
    # Compare against None explicitly: "missing" is the only case we want to
    # skip, and truthiness of a mapped object is not a reliable proxy for it.
    if job is None:
        return

    job.last_run_at = dt
    # No session.add() here: an instance returned by session.get() is already
    # attached to this session, so the attribute change is flushed on commit.
    session.commit()
|
|
|
|
|
|
def create_job_log(
    session: Session,
    *,
    job_id: str,
    status: JobStatus,
    snapshot_params: dict[str, Any],
    message: str = "",
    traceback: str = "",
    run_log: str = "",
    celery_task_id: str = "",
    attempt: int = 0,
    started_at: datetime | None = None,
    finished_at: datetime | None = None,
) -> JobLog:
    """Insert a ``JobLog`` row, commit it, and return the refreshed instance.

    All arguments after *session* are keyword-only and are forwarded to the
    ``JobLog`` constructor under the same names.

    Args:
        session: Session used to add, commit and refresh the new row.
        job_id: Identifier of the job this log entry belongs to.
        status: Status value stored on the log entry.
        snapshot_params: Parameter mapping stored on the log entry.
        message: Optional message text (defaults to an empty string).
        traceback: Optional traceback text (defaults to an empty string).
        run_log: Optional run-log text (defaults to an empty string).
        celery_task_id: Optional task id (defaults to an empty string).
        attempt: Attempt counter (defaults to 0).
        started_at: Start timestamp; when ``None`` the current UTC time is
            used, as a naive ``datetime``.
        finished_at: Finish timestamp, stored as given (may be ``None``).

    Returns:
        The ``JobLog`` instance after ``session.refresh``.
    """
    if started_at is None:
        # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
        # "now" and strip tzinfo so the stored value stays a naive UTC
        # datetime, exactly as before.
        from datetime import timezone

        started_at = datetime.now(timezone.utc).replace(tzinfo=None)

    log = JobLog(
        job_id=job_id,
        status=status,
        snapshot_params=snapshot_params,
        message=message,
        traceback=traceback,
        run_log=run_log,
        celery_task_id=celery_task_id,
        attempt=attempt,
        started_at=started_at,
        finished_at=finished_at,
    )
    session.add(log)
    session.commit()
    # Reload the row's state after the commit before handing it back.
    session.refresh(log)
    return log
|
|
|
|
|
|
def get_job_log(session: Session, log_id: int) -> JobLog | None:
    """Look up a single ``JobLog`` by its identifier.

    Delegates to ``session.get`` and passes its result through unchanged,
    so the caller receives ``None`` when the lookup finds nothing.
    """
    entry = session.get(JobLog, log_id)
    return entry
|
|
|
|
|