2026-04-24

Session 11: Phase 9 finish (data sources, deployments, cancel, Alembic)

Engineering log for session 11.

Baseline: session 10 landed the runs subsystem with live event streaming. The server could execute runs but had no way to ingest real data, no way to serve a trained model, no way to cancel, and no proper migration story.

Theme: the owner's brief was "lets continue with our roadmap development as per agreed and logical sequence." In practice that means closing Phase 9 so the backend is feature-complete before the frontend starts.

ADDED — data sources (CSV upload + S3/Postgres connectors)

  • ADDED pycaret_server/api/data_sources.py — new router, 5 endpoints:
    • POST /api/v1/workspaces/{ws_id}/data-sources/upload — streams a multipart CSV to ${ARTIFACT_DIR}/data-sources/<uuid>.csv, enforces a 64 MB cap as it goes (no "copy to disk then reject"), computes SHA-256 in the same pass, samples with pd.read_csv(nrows=5) for column metadata, records row count via a line scan. Returns the DataSource row.
    • POST /api/v1/workspaces/{ws_id}/data-sources — register an s3 or postgres connector config (no connectivity check — deferred to dispatch time).
    • GET /api/v1/workspaces/{ws_id}/data-sources — list.
    • GET /api/v1/data-sources/{id} — fetch.
    • DELETE /api/v1/data-sources/{id} — delete + unlink the uploaded file when kind="csv_upload".
  • ADDED pycaret_server/runs/plans.py::load_csv(path) — tiny loader called from the orchestrator's _load_data when RunSpec.data_source_path is set.
  • CHANGED pycaret_server/runs/orchestrator.py::RunSpec gains data_source_path + target_override; _load_data picks the right source based on which field is populated.
  • CHANGED pycaret_server/api/runs.py::submit_run resolves data_source_id against the DataSource table, rejects cross-workspace references with 400, rejects non-csv_upload kinds with 400 (for now), and snapshots both the effective target and the data_source_id into Run.snapshot.
  • CHANGED pycaret_server/api/schemas.py::RunCreate adds data_source_id + target fields; existing sklearn_dataset/data_inline unchanged.
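The one-pass upload described above (cap enforced while streaming, SHA-256 computed in the same loop, rows counted by a line scan) can be sketched with the standard library alone. This is a minimal illustration, not the code in data_sources.py: the function name stream_to_disk, the UploadTooLarge exception, and the 1 MiB chunk size are assumptions. In the FastAPI route, src would presumably be the UploadFile's underlying .file object.

```python
import hashlib
import io
import os
import tempfile
from typing import BinaryIO

# Assumed constants for this sketch; 64 MB matches the cap in the log.
MAX_UPLOAD_BYTES = 64 * 1024 * 1024
CHUNK_SIZE = 1024 * 1024  # read 1 MiB per iteration


class UploadTooLarge(Exception):
    """Raised as soon as the running byte count crosses the cap."""


def stream_to_disk(
    src: BinaryIO,
    dest_path: str,
    max_bytes: int = MAX_UPLOAD_BYTES,
    chunk_size: int = CHUNK_SIZE,
) -> tuple[int, str, int]:
    """Copy src to dest_path in a single pass.

    Returns (size_bytes, sha256_hex, line_count). If the cap is exceeded,
    the partially written file is deleted and UploadTooLarge is raised.
    """
    digest = hashlib.sha256()
    size = 0
    newlines = 0
    last_byte = b""
    try:
        with open(dest_path, "wb") as out:
            while chunk := src.read(chunk_size):
                size += len(chunk)
                if size > max_bytes:
                    # Reject mid-stream instead of after the whole body lands.
                    raise UploadTooLarge(f"upload exceeds {max_bytes} bytes")
                digest.update(chunk)            # hash in the same pass
                newlines += chunk.count(b"\n")  # line scan for the row count
                last_byte = chunk[-1:]
                out.write(chunk)
    except BaseException:
        # Never leave a half-written file behind (the file is closed by now).
        if os.path.exists(dest_path):
            os.remove(dest_path)
        raise
    # A final line without a trailing newline still counts as a line.
    line_count = newlines + (1 if size and last_byte != b"\n" else 0)
    return size, digest.hexdigest(), line_count


if __name__ == "__main__":
    payload = b"sepal_length,species\n5.1,setosa\n4.9,setosa\n"
    with tempfile.TemporaryDirectory() as tmp:
        size, sha, lines = stream_to_disk(io.BytesIO(payload), os.path.join(tmp, "x.csv"))
        print(size, lines - 1)  # bytes written, data rows (header excluded)
```

Subtracting one for the header gives the row count. Note that any pure line scan treats a quoted CSV field containing an embedded newline as an extra row, so the count is an approximation for such files; the column sample via pd.read_csv(nrows=5) is a separate, cheap read.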

ADDED — deployments + in-house serving

  • ADDED pycaret_server/serving.py: DeploymentRegistry, an in-process, thread-safe slug→pipeline cache. Pipelines are loaded on first prediction via cloudpickle (with a pickle fallback) and evicted on deployment delete. A rolling 100-sample latency window tracks p50/p95 without adding a histogram dependency. Exposed as a module-level singleton, with reset_registry() for test fixtures.
  • ADDED pycaret_server/api/deployments.py — new router, 9 endpoints:
    • POST /api/v1/runs/{run_id}/promote — validates the run succeeded + has a pipeline_pickle artifact; creates a workspace-scoped pipelines row pointing at the artifact path (reuses the SHA-256 that the orchestrator stamped).
    • GET /api/v1/workspaces/{ws_id}/pipelines / GET /api/v1/pipelines/{id} — list + fetch.
    • DELETE /api/v1/pipelines/{id} — refuses with 409 when any Deployment still references the pipeline (mirrors the FK's ON DELETE RESTRICT).
    • POST /api/v1/pipelines/{id}/deployments — create a Deployment. Validates endpoint_slug against ^[a-z0-9][a-z0-9-]{1,62}[a-z0-9]$ and auth_mode against workspace|api-key|public. Global uniqueness on slug.
    • GET /api/v1/workspaces/{ws_id}/deployments / GET /api/v1/deployments/{id} — list + fetch (with live p50/p95).
    • DELETE /api/v1/deployments/{id} — evicts the registry entry + deletes the row.
    • POST /api/v1/deployments/{slug}/predict — the serving endpoint. Only the workspace auth_mode is enforced in v1 (api-key and public are reserved in the schema but not enforced yet). Every request ticks inference_count, last_inference_at, p50_latency_ms and p95_latency_ms; errors tick error_count. Request contract: {"rows": [{...}, ...]} in, {"deployment_id", "endpoint_slug", "predictions": [{"index", "prediction"}, ...], "latency_ms", "request_id"} out.

ADDED — run cancellation

  • ADDED pycaret_server/runs/orchestrator.py:
    • _CancelledError — private exception mapped to Run.status = "cancelled".
    • RunOrchestrator.cancel(run_id) — thread-safe setter on a per-run threading.Event stored in self._cancel_events.
    • _checkpoint() closure in _execute — polls the event at stage boundaries (pre-load, post-load, post-fit, post-plan). Raises _CancelledError when set.
    • _cleanup() done-callback now pops both the Future and the cancel event.
  • ADDED POST /api/v1/runs/{id}/cancel route — returns the current row; a no-op when the run is already terminal.
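The cooperative scheme above (a per-run threading.Event, a _checkpoint() closure polled between stages, cleanup in a done-callback) is sketched below as a cut-down, hypothetical MiniOrchestrator; it is not the real RunOrchestrator. Here stages are plain callables and a checkpoint runs before each stage and after the last one, whereas the real orchestrator checks at pre-load, post-load, post-fit and post-plan.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable


class _CancelledError(Exception):
    """Raised at a checkpoint once cancel() has been requested for the run."""


class MiniOrchestrator:
    """Hypothetical, cut-down stand-in for RunOrchestrator (cancellation only)."""

    def __init__(self, max_workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._lock = threading.Lock()
        self._cancel_events: dict[str, threading.Event] = {}
        self._futures: dict[str, Future] = {}
        self.status: dict[str, str] = {}

    def submit(self, run_id: str, stages: list[Callable[[], None]]) -> Future:
        event = threading.Event()
        with self._lock:
            self._cancel_events[run_id] = event
            self.status[run_id] = "queued"
            future = self._pool.submit(self._execute, run_id, stages, event)
            self._futures[run_id] = future
        # Fires when the future finishes (immediately if it already has).
        future.add_done_callback(lambda _f: self._cleanup(run_id))
        return future

    def cancel(self, run_id: str) -> None:
        # Only sets a flag; unknown or already-finished runs are a no-op.
        with self._lock:
            event = self._cancel_events.get(run_id)
        if event is not None:
            event.set()

    def _execute(self, run_id: str, stages: list[Callable[[], None]],
                 event: threading.Event) -> None:
        def _checkpoint() -> None:
            if event.is_set():
                raise _CancelledError(run_id)

        self.status[run_id] = "running"
        try:
            for stage in stages:
                _checkpoint()  # stage boundary: cancellation is noticed here
                stage()        # a running stage is never interrupted
            _checkpoint()
            self.status[run_id] = "succeeded"
        except _CancelledError:
            self.status[run_id] = "cancelled"
        except Exception:
            self.status[run_id] = "failed"

    def _cleanup(self, run_id: str) -> None:
        # Pop both the Future and the cancel event, as the log describes.
        with self._lock:
            self._futures.pop(run_id, None)
            self._cancel_events.pop(run_id, None)

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)
```

A stage that is already running finishes before the next checkpoint sees the flag, which is why a cancelled run can still end up succeeded if the worker has already passed its last checkpoint.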

ADDED — Alembic baseline + schema bootstrap

  • ADDED pycaret-server/alembic.ini — script location, file template (date + slug), UTC timezone, no post-write hooks (we run ruff format ourselves).
  • ADDED pycaret_server/migrations/env.py — pulls database_url from get_settings() (overridable via ALEMBIC_URL), sets target_metadata = Base.metadata, uses render_as_batch=True on SQLite, and enables compare_type=True + compare_server_default=True so autogenerate catches column type and server-default changes.
  • ADDED pycaret_server/migrations/script.py.mako — typed PEP-604 template (str | None), compatible with Python 3.11+.
  • ADDED pycaret_server/migrations/versions/20260424_0213_9f9b7c770df0_baseline_schema.py — autogenerated baseline. Creates all 14 application tables, 24 indexes and 5 unique constraints, plus alembic_version. Confirmed end-to-end: alembic upgrade head on a fresh SQLite database yields 15 tables (14 + alembic_version).
  • ADDED pycaret_server/db/bootstrap.py::ensure_schema(engine, dev_auto_migrate=True) — the bridge between dev SQLite (one-command workflow) and prod (explicit migrations). If alembic_version is present → no-op. Else, if a legacy create_all-seeded DB is detected (has a users table, no alembic_version) → auto-stamp to baseline. Else, if the DB is empty and dev_auto_migrate=True → alembic upgrade head. Else raise (prod safeguard).
  • ADDED pycaret-server migrate [--url ...] [--revision head] — CLI subcommand that calls alembic.command.upgrade with the live engine URL, so ops can deploy without a vendored alembic CLI.
  • CHANGED pycaret_server/app.py::_lifespan — now calls ensure_schema instead of Base.metadata.create_all. It also resets the DeploymentRegistry singleton in the finally: block so reload mode doesn't cache stale pipelines across processes.
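The ensure_schema decision tree reduces to a pure function of the table names already present, which keeps it easy to test. The sketch below is an illustration under assumptions: SchemaAction, decide_schema_action and sqlite_tables are hypothetical names, and table discovery reads SQLite's sqlite_master directly (a real implementation would more likely use SQLAlchemy's inspect(engine).get_table_names() to stay backend-agnostic). In the real module the stamp and upgrade branches would presumably call alembic.command.stamp and alembic.command.upgrade.

```python
import sqlite3
from enum import Enum


class SchemaAction(Enum):
    NOOP = "noop"              # alembic_version exists: already managed
    STAMP_BASELINE = "stamp"   # legacy create_all DB: stamp, don't re-create
    UPGRADE_HEAD = "upgrade"   # empty DB with dev auto-migrate: run migrations
    REFUSE = "refuse"          # anything else: raise and let the operator decide


def decide_schema_action(tables: set[str], dev_auto_migrate: bool) -> SchemaAction:
    """Map the existing table names to the bootstrap branch from the log."""
    if "alembic_version" in tables:
        return SchemaAction.NOOP
    if "users" in tables:  # legacy marker: seeded by create_all, never stamped
        return SchemaAction.STAMP_BASELINE
    if not tables and dev_auto_migrate:
        return SchemaAction.UPGRADE_HEAD
    return SchemaAction.REFUSE  # prod safeguard


def sqlite_tables(conn: sqlite3.Connection) -> set[str]:
    """User tables in a SQLite database, skipping internal sqlite_* tables."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type = 'table' AND name NOT LIKE 'sqlite_%'"
    ).fetchall()
    return {name for (name,) in rows}
```

Keeping the decision separate from the side effects means each branch, including the REFUSE safeguard, can be unit-tested without running any migration.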

TESTS

  • TESTS tests/test_phase9_finish.py — 10 new integration tests, ~350 LOC:
    • Data sources: test_csv_upload_and_run_from_it (uploads an iris CSV, submits a create run that reads from it, asserts succeeded), test_register_s3_connector (happy path + 2 bad shapes), test_data_source_delete_cleans_file.
    • Run cancel: test_cancel_queued_run (cancels mid-flight; the accepted outcome is cancelled, or succeeded if the worker raced past every checkpoint), test_cancel_terminal_run_is_noop.
    • Deployments: test_promote_run_and_serve_predictions (the full curl flow: train → promote → deploy → predict × 2 rows → verify inference_count=2 + p50 non-null), test_promote_rejects_unfinished_run, test_delete_pipeline_with_active_deployment_fails (409), test_deployment_slug_collision (409) + bad-format (400).
    • Alembic: test_alembic_baseline_creates_schema — invokes alembic upgrade head in a subprocess against a fresh SQLite database and asserts the 15-table result set.
  • TESTS Server suite total: 30/30 green in ~55 s (14 + 6 + 10).
  • TESTS Combined engine + server: 62/62 green.

INTERNAL

  • INTERNAL Route style — data-sources' upload_csv uses FastAPI's Annotated[UploadFile, File()] / Annotated[str, Form()] form (the current recommendation), not the older file: UploadFile = File(...) default-argument form. Ruff B008 flagged the latter, and we've adopted the Annotated form as the style contract for future file-upload endpoints.
  • INTERNAL No Alembic in CI yet — the server suite still uses Base.metadata.create_all in its per-test fixture (fast path — ~40 ms vs alembic's ~700 ms). test_alembic_baseline_creates_schema is the single canary that the baseline migration actually applies; it runs alembic upgrade head in a subprocess to keep the inner test fast.
  • INTERNAL Cancellation semantics — cancellation is cooperative, not preemptive. An engine verb deep inside sklearn code (a long compare_models, say) cannot be interrupted mid-fit. The _checkpoint() calls catch cancellation between verbs, which is enough for the UI to feel responsive on a multi-stage plan but is no substitute for a real signal-handling worker.

Session 11 delta summary

| Metric | Session 10 end | Session 11 end |
|---|---|---|
| Server LOC | ~2,400 | ~3,600 (+1,200) |
| API routes | 26 + 1 WS | 39 + 1 WS |
| Server tests | 20 | 30 |
| Combined tests | 52 | 62 |
| Alembic revisions | 0 | 1 (baseline) |