2026-04-24
Session 11: Phase 9 finish: data sources, deployments, cancel, alembic
Engineering log for session 11.
Baseline: session 10 landed the runs subsystem with live event streaming. The server could execute runs but had no way to ingest real data, no way to serve a trained model, no way to cancel, and no proper migration story.
Theme: owner: "lets continue with our roadmap development as per agreed and logical sequence." Close Phase 9 so the backend is feature-complete before the frontend starts.
ADDED - data sources (CSV upload + S3/Postgres connectors)
ADDED - `pycaret_server/api/data_sources.py` - new router, 5 endpoints:
- `POST /api/v1/workspaces/{ws_id}/data-sources/upload` - streams a multipart CSV to `${ARTIFACT_DIR}/data-sources/<uuid>.csv`, enforces a 64 MB cap as it goes (no "copy to disk then reject"), computes SHA-256 in the same pass, samples with `pd.read_csv(nrows=5)` for column metadata, records row count via a line scan. Returns the DataSource row.
- `POST /api/v1/workspaces/{ws_id}/data-sources` - register an `s3` or `postgres` connector config (no connectivity check; deferred to dispatch time).
- `GET /api/v1/workspaces/{ws_id}/data-sources` - list.
- `GET /api/v1/data-sources/{id}` - fetch.
- `DELETE /api/v1/data-sources/{id}` - delete + unlink the uploaded file when `kind="csv_upload"`.
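The single-pass upload described above (size cap and SHA-256 computed while the bytes are written) can be sketched roughly as follows. This is a minimal illustration, not the router's actual code: `stream_to_disk`, `UploadTooLarge`, and the 1 MB chunk size are hypothetical names and values, and the sketch reads from any binary file-like object rather than a FastAPI `UploadFile`.

```python
import hashlib
from pathlib import Path
from typing import BinaryIO

MAX_UPLOAD_BYTES = 64 * 1024 * 1024  # the 64 MB cap mentioned in the log
CHUNK_SIZE = 1024 * 1024             # assumed read size, not from the log


class UploadTooLarge(Exception):
    """Hypothetical error; a route would translate it into an HTTP error."""


def stream_to_disk(
    src: BinaryIO, dest: Path, max_bytes: int = MAX_UPLOAD_BYTES
) -> tuple[str, int]:
    """Copy src to dest chunk by chunk; return (sha256_hex, byte_count).

    The cap is checked before each chunk is written, so an oversized
    upload is rejected mid-stream instead of after a full copy.
    """
    digest = hashlib.sha256()
    size = 0
    try:
        with dest.open("wb") as out:
            while chunk := src.read(CHUNK_SIZE):
                size += len(chunk)
                if size > max_bytes:
                    raise UploadTooLarge(f"upload exceeds {max_bytes} bytes")
                digest.update(chunk)  # hash in the same pass as the write
                out.write(chunk)
    except UploadTooLarge:
        dest.unlink(missing_ok=True)  # do not leave a partial file behind
        raise
    return digest.hexdigest(), size
```

Removing the partial file on rejection is a choice made for this sketch; the log does not say how the real endpoint cleans up.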
ADDED - `pycaret_server/runs/plans.py::load_csv(path)` - tiny loader called from the orchestrator's `_load_data` when `RunSpec.data_source_path` is set.

CHANGED - `pycaret_server/runs/orchestrator.py::RunSpec` gains `data_source_path` + `target_override`; `_load_data` picks the right source based on which field is populated.

CHANGED - `pycaret_server/api/runs.py::submit_run` resolves `data_source_id` against the DataSource table, rejects cross-workspace references with 400, rejects non-`csv_upload` kinds with 400 (for now), snapshots both the effective target and the data_source_id into `Run.snapshot`.

CHANGED - `pycaret_server/api/schemas.py::RunCreate` adds `data_source_id` + `target` fields; existing `sklearn_dataset` / `data_inline` unchanged.
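The `data_source_id` checks in `submit_run` could look something like the sketch below. It uses an in-memory dict in place of the DataSource table, and `DataSource`, `ApiError`, and `resolve_data_source` are hypothetical stand-ins. The 404 for an unknown id is an assumption; the log only specifies the two 400 cases.

```python
from dataclasses import dataclass


@dataclass
class DataSource:
    """Hypothetical stand-in for the DataSource ORM row."""
    id: str
    workspace_id: str
    kind: str        # "csv_upload", "s3", or "postgres"
    path: str = ""   # only meaningful for csv_upload


class ApiError(Exception):
    """Hypothetical error type carrying an HTTP status code."""

    def __init__(self, status: int, detail: str) -> None:
        super().__init__(detail)
        self.status = status
        self.detail = detail


def resolve_data_source(
    sources: dict[str, DataSource], ws_id: str, data_source_id: str
) -> str:
    """Return the CSV path for a run, or raise ApiError."""
    source = sources.get(data_source_id)
    if source is None:
        # Assumption: the log does not say what an unknown id returns.
        raise ApiError(404, "data source not found")
    if source.workspace_id != ws_id:
        raise ApiError(400, "data source belongs to another workspace")
    if source.kind != "csv_upload":
        raise ApiError(400, f"kind {source.kind!r} cannot back a run yet")
    return source.path
```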
ADDED - deployments + in-house serving
ADDED - `pycaret_server/serving.py` - `DeploymentRegistry`, an in-process thread-safe slug→pipeline cache. Pipelines are loaded on first prediction via cloudpickle (with pickle fallback) and evicted on deployment delete. Rolling 100-sample latency window tracks p50/p95 without adding a histogram dep. Module-level singleton with `reset_registry()` for test fixtures.

ADDED - `pycaret_server/api/deployments.py` - new router, 9 endpoints:
- `POST /api/v1/runs/{run_id}/promote` - validates that the run succeeded and has a `pipeline_pickle` artifact; creates a workspace-scoped `pipelines` row pointing at the artifact path (reuses the SHA-256 that the orchestrator stamped).
- `GET /api/v1/workspaces/{ws_id}/pipelines` / `GET /api/v1/pipelines/{id}` - list + fetch.
- `DELETE /api/v1/pipelines/{id}` - refuses with 409 when any `Deployment` still references the pipeline (mirrors the FK's `ON DELETE RESTRICT`).
- `POST /api/v1/pipelines/{id}/deployments` - create a Deployment. Validates `endpoint_slug` against `^[a-z0-9][a-z0-9-]{1,62}[a-z0-9]$` and `auth_mode` against `workspace|api-key|public`. Global uniqueness on slug.
- `GET /api/v1/workspaces/{ws_id}/deployments` / `GET /api/v1/deployments/{id}` - list + fetch (with live p50/p95).
- `DELETE /api/v1/deployments/{id}` - evicts the registry entry + deletes the row.
- `POST /api/v1/deployments/{slug}/predict` - the serving endpoint. Only the `workspace` auth_mode is enforced for v1 (`api-key` and `public` are schema-reserved but not enforceable yet). Ticks `inference_count` + `last_inference_at` + `p50_latency_ms` + `p95_latency_ms` on every request; errors tick `error_count`. Request contract: `{"rows": [{...}, ...]}` in, `{"deployment_id", "endpoint_slug", "predictions": [{"index", "prediction"}, ...], "latency_ms", "request_id"}` out.
ADDED - run cancellation
ADDED - `pycaret_server/runs/orchestrator.py`:
- `_CancelledError` - private exception mapped to `Run.status = "cancelled"`.
- `RunOrchestrator.cancel(run_id)` - thread-safe setter on a per-run `threading.Event` stored in `self._cancel_events`.
- `_checkpoint()` closure in `_execute` - polls the event at stage boundaries (pre-load, post-load, post-fit, post-plan). Raises `_CancelledError` when set.
- `_cleanup()` done-callback now pops both the Future and the cancel event.

ADDED - `POST /api/v1/runs/{id}/cancel` route - returns the current row; no-op when the run is already terminal.
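The cooperative pattern described above (a per-run `threading.Event` polled at stage boundaries) can be sketched in a few lines. `MiniOrchestrator` is a hypothetical, stripped-down stand-in for `RunOrchestrator`: statuses live in a dict rather than the `Run` table, and stages are plain callables.

```python
import threading
from collections.abc import Callable
from concurrent.futures import Future, ThreadPoolExecutor


class _CancelledError(Exception):
    """Raised by a checkpoint once the run's cancel event is set."""


class MiniOrchestrator:
    def __init__(self) -> None:
        self._pool = ThreadPoolExecutor(max_workers=1)
        self._lock = threading.Lock()
        self._cancel_events: dict[str, threading.Event] = {}
        self.status: dict[str, str] = {}

    def submit(self, run_id: str, stages: list[Callable[[], None]]) -> Future:
        event = threading.Event()
        with self._lock:
            self._cancel_events[run_id] = event
            self.status[run_id] = "queued"
        future = self._pool.submit(self._execute, run_id, stages, event)
        future.add_done_callback(lambda _f: self._cleanup(run_id))
        return future

    def cancel(self, run_id: str) -> bool:
        """Request cancellation; False means unknown or already finished."""
        with self._lock:
            event = self._cancel_events.get(run_id)
        if event is None:
            return False
        event.set()
        return True

    def _execute(self, run_id, stages, event) -> None:
        def _checkpoint() -> None:
            if event.is_set():
                raise _CancelledError(run_id)

        try:
            self.status[run_id] = "running"
            _checkpoint()          # before the first stage
            for stage in stages:
                stage()            # cannot be interrupted mid-stage
                _checkpoint()      # only between stages
            self.status[run_id] = "succeeded"
        except _CancelledError:
            self.status[run_id] = "cancelled"

    def _cleanup(self, run_id: str) -> None:
        with self._lock:
            self._cancel_events.pop(run_id, None)
```

A cancel request that arrives after the last checkpoint has no effect and the run still ends as `succeeded`.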
ADDED - Alembic baseline + schema bootstrap
ADDED - `pycaret-server/alembic.ini` - script location, file template (date + slug), UTC timezone, no post-write hooks (we run `ruff format` ourselves).

ADDED - `pycaret_server/migrations/env.py` - pulls `database_url` from `get_settings()` (overridable via `ALEMBIC_URL`), `target_metadata = Base.metadata`, `render_as_batch=True` on SQLite, `compare_type=True` + `compare_server_default=True` so autogenerate catches column type changes.

ADDED - `pycaret_server/migrations/script.py.mako` - typed PEP-604 template (`str | None`) compatible with Python 3.11+.

ADDED - `pycaret_server/migrations/versions/20260424_0213_9f9b7c770df0_baseline_schema.py` - autogen-produced baseline. Creates all 14 tables + 24 indexes + 5 unique constraints + `alembic_version`. Confirmed end-to-end: fresh SQLite `alembic upgrade head` yields 15 tables.

ADDED - `pycaret_server/db/bootstrap.py::ensure_schema(engine, dev_auto_migrate=True)` - the bridge between dev SQLite (one-command workflow) and prod (explicit migrations). If `alembic_version` is present → no-op. Else if a legacy `create_all`-seeded DB is detected (has `users` table, no `alembic_version`) → auto-stamp to baseline. Else if empty + `dev_auto_migrate=True` → `alembic upgrade head`. Else raise (prod safeguard).

ADDED - `pycaret-server migrate [--url ...] [--revision head]` - CLI subcommand that calls `alembic.command.upgrade` with the live engine URL, so ops can deploy without a vendored alembic CLI.

CHANGED - `pycaret_server/app.py::_lifespan` - now calls `ensure_schema` instead of `Base.metadata.create_all`. Also resets the `DeploymentRegistry` singleton in the `finally:` block to stop reload mode from caching stale pipelines across processes.
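The `ensure_schema` decision tree above reduces to a pure function over the set of existing table names. The sketch below covers only that decision, using stdlib `sqlite3` for inspection; `BootstrapAction`, `list_tables`, and `decide_bootstrap` are hypothetical names, and the real function would go on to call Alembic's stamp or upgrade commands.

```python
import sqlite3
from enum import Enum


class BootstrapAction(Enum):
    NOOP = "noop"                      # already under Alembic control
    STAMP_BASELINE = "stamp_baseline"  # legacy create_all-seeded database
    UPGRADE_HEAD = "upgrade_head"      # empty dev database


def list_tables(conn: sqlite3.Connection) -> set[str]:
    """Table names in a SQLite database (SQLite-specific inspection)."""
    rows = conn.execute(
        "SELECT name FROM sqlite_master WHERE type = 'table'"
    ).fetchall()
    return {name for (name,) in rows}


def decide_bootstrap(tables: set[str], dev_auto_migrate: bool) -> BootstrapAction:
    """Mirror the four branches described in the log, in the same order."""
    if "alembic_version" in tables:
        return BootstrapAction.NOOP
    if "users" in tables:
        return BootstrapAction.STAMP_BASELINE
    if not tables and dev_auto_migrate:
        return BootstrapAction.UPGRADE_HEAD
    raise RuntimeError(
        "schema is not managed by Alembic and auto-migrate is disabled; "
        "run the migrate command explicitly"
    )
```

Keeping the decision separate from the side effects makes each branch testable without a migration run, which matters given that the log cites ~700 ms per Alembic upgrade.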
TESTS
TESTS - `tests/test_phase9_finish.py` - 10 new integration tests, ~350 LOC:
- Data sources: `test_csv_upload_and_run_from_it` (upload an iris CSV, submit a `create` run that reads from it, assert succeeded), `test_register_s3_connector` (happy path + 2 bad shapes), `test_data_source_delete_cleans_file`.
- Run cancel: `test_cancel_queued_run` (cancel mid-flight; outcome is `cancelled`, or `succeeded` if the worker raced past every checkpoint), `test_cancel_terminal_run_is_noop`.
- Deployments: `test_promote_run_and_serve_predictions` (the full curl flow: train → promote → deploy → predict × 2 rows → verify inference_count=2 + p50 non-null), `test_promote_rejects_unfinished_run`, `test_delete_pipeline_with_active_deployment_fails` (409), `test_deployment_slug_collision` (409) + bad-format (400).
- Alembic: `test_alembic_baseline_creates_schema` - subprocess-invokes `alembic upgrade head` on a fresh SQLite, asserts 15-table result set.
TESTS - Server suite total: 30/30 green in ~55 s (14 + 6 + 10).

TESTS - Combined engine + server: 62/62 green.
INTERNAL
INTERNAL - Route style - data-sources' `upload_csv` uses FastAPI's `Annotated[UploadFile, File()]` / `Annotated[str, Form()]` form (the modern recommendation), not the older `file: UploadFile = File(...)` default-argument form. Ruff B008 caught the latter; picked up as a style contract for future file-upload endpoints.

INTERNAL - No Alembic in CI yet - the server suite still uses `Base.metadata.create_all` in its per-test fixture (the fast path: ~40 ms vs alembic's ~700 ms). The `test_alembic_baseline_creates_schema` test is the single canary that the baseline migration actually applies; it runs `alembic upgrade head` in a subprocess so the rest of the suite stays on the fast path.

INTERNAL - Cancellation semantics - cancellation is cooperative, not preemptive. An engine verb that is deep inside sklearn code (a long `compare_models`, say) cannot be interrupted mid-fit. The `_checkpoint()` calls catch cancellation between verbs, which is enough for the UI to feel responsive on a multi-stage plan but is not a substitute for a real signal-handling worker.
Session 11 delta summary
| Metric | Session 10 end | Session 11 end |
|---|---|---|
| Server LOC | ~2,400 | ~3,600 (+1,200) |
| API routes | 26 + 1 WS | 39 + 1 WS |
| Server tests | 20 | 30 |
| Combined tests | 52 | 62 |
| Alembic revisions | 0 | 1 baseline |