Session 10: Run execution + event stream (Phase 9 core complete)
Engineering log for session 10.
Baseline: session 9 landed the scaffold — 29 routes, 14 tables, 14 tests, 3766 LOC added to pycaret-server/. The server could CRUD workspaces / projects / experiments but couldn't actually run a PyCaret experiment.
Theme: owner request, "let's do the next major phase." Wire the engine's `Experiment` verbs to the platform. A client POSTs to `/api/v1/experiments/{id}/runs` with a plan (`setup` / `create` / `compare`) and a data source (`sklearn_dataset` or `data_inline`). The server enqueues a job and dispatches it to a thread-pool worker. The worker builds the right `pycaret.tasks.*Experiment`, wires in a DB-backed `BaseLogger` subclass so every engine `Event` becomes an `events` row, pickles the fitted pipeline to the artifact dir, and writes the leaderboard JSON back onto the `Run` row. If anyone is listening on the WebSocket, events are also fanned out live.
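The worker side of this flow can be modelled with the standard library alone. The sketch below is hypothetical and deliberately simplified: `MiniOrchestrator`, `RunRecord`, and the zero-argument `plan` callable are illustrative names; an in-memory dict stands in for the Run table; the callable stands in for building a `pycaret.tasks.*Experiment` and executing a plan; plain `pickle` replaces the server's `cloudpickle`-with-fallback; and `on_closed` stands in for `event_broker.close_run`. It shows the `queued → running → succeeded|failed` transitions, exception capture into an error field, and the pickle-plus-SHA-256 artifact step.

```python
from __future__ import annotations

import hashlib
import pickle
import tempfile
import traceback
from concurrent.futures import Future, ThreadPoolExecutor
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable


@dataclass
class RunRecord:
    """Hypothetical stand-in for the Run row."""
    run_id: str
    status: str = "queued"
    leaderboard: list[dict[str, Any]] = field(default_factory=list)
    artifact_sha256: str | None = None
    error: str | None = None


class MiniOrchestrator:
    """Sketch of a thread-pool run executor; not the server's RunOrchestrator."""

    def __init__(self, artifact_dir: Path, on_closed: Callable[[str], None]) -> None:
        self._pool = ThreadPoolExecutor(max_workers=2)
        self._futures: dict[str, Future] = {}
        self.runs: dict[str, RunRecord] = {}
        self.artifact_dir = artifact_dir
        self.on_closed = on_closed

    def submit(self, run_id: str, plan: Callable[[], tuple[Any, list[dict[str, Any]]]]) -> None:
        self.runs[run_id] = RunRecord(run_id)  # recorded as "queued" before dispatch
        self._futures[run_id] = self._pool.submit(self._execute, run_id, plan)

    def wait_for(self, run_id: str, timeout_s: float = 30.0) -> RunRecord:
        # Raises a TimeoutError if the run is still going after timeout_s.
        self._futures[run_id].result(timeout=timeout_s)
        return self.runs[run_id]

    def shutdown(self) -> None:
        self._pool.shutdown(wait=True)

    def _execute(self, run_id: str, plan: Callable[[], tuple[Any, list[dict[str, Any]]]]) -> None:
        run = self.runs[run_id]
        run.status = "running"
        try:
            best_model, leaderboard = plan()
            out_dir = self.artifact_dir / "runs" / run_id
            out_dir.mkdir(parents=True, exist_ok=True)
            blob = pickle.dumps(best_model)
            (out_dir / "pipeline.pkl").write_bytes(blob)
            run.artifact_sha256 = hashlib.sha256(blob).hexdigest()
            run.leaderboard = leaderboard
            run.status = "succeeded"
        except Exception:
            # Record the failure instead of letting it escape the worker thread.
            run.error = traceback.format_exc()
            run.status = "failed"
        finally:
            # Always signal stream closure, whatever the outcome.
            self.on_closed(run_id)


# Usage: one run that succeeds, one whose plan raises.
tmp = Path(tempfile.mkdtemp())
closed: list[str] = []
orch = MiniOrchestrator(tmp, closed.append)
orch.submit("r1", lambda: ({"model": "lr"}, [{"Model": "lr", "Accuracy": 0.9}]))
orch.submit("r2", lambda: 1 / 0)
ok = orch.wait_for("r1")
bad = orch.wait_for("r2")
orch.shutdown()
```

Catching exceptions inside `_execute` means `wait_for` never re-raises a plan failure; callers read the status and error off the record instead, which mirrors how the log describes `Run.error`.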
ADDED — `pycaret_server/runs/` subsystem
ADDED — `pycaret_server/runs/broker.py` — `EventBroker`, a thread-safe fan-out that bridges the worker thread's synchronous `BaseLogger.emit` to async WebSocket consumers. Subscribers register an `asyncio.Queue`; `publish(run_id, event_dict)` dispatches through `loop.call_soon_threadsafe(queue.put_nowait, event)`, so producers can live on any thread. `close_run(run_id)` pushes an `END` sentinel to drain outstanding subscribers when a run reaches a terminal state. Module-level singleton `event_broker`; `clear()` for test fixtures.
ADDED — `pycaret_server/runs/logger_bridge.py` — `DBEventLogger(pycaret.logging.BaseLogger)`. Overrides `emit()` to open a short-lived SQLAlchemy session, write a single `Event` row (kind + message + payload + duration_ms + emitted_at), then republish via `event_broker.publish`. A per-emit session avoids holding SQLite connections open across long-running verbs. Resolves `session_factory` via `get_session()` (a function in the defining module) so test monkeypatches take effect.
ADDED — `pycaret_server/runs/plans.py` — the "plan" abstraction: a `PlanName` `Literal` (`"setup" | "create" | "compare"`), a `PlanOutcome` dataclass (leaderboard + best_model + extra), and `execute_plan(exp, plan, *, model_id, plan_params)`, which maps each plan onto the right engine verb call. Also `load_sklearn_dataset(name)`, covering iris / wine / breast_cancer / diabetes (no network dependency), and `load_inline(rows)` for JSON dict-of-lists payloads.
ADDED — `pycaret_server/runs/orchestrator.py` — `RunOrchestrator`: a `ThreadPoolExecutor(max_workers=2)` wrapped with submit / wait_for / shutdown. `_execute(spec)` transitions the Run row through `queued → running → succeeded|failed`, loads data, instantiates the right `Experiment` subclass by `TaskType`, attaches the `DBEventLogger`, calls `exp.fit(df)`, executes the plan, pickles `outcome.best_model` to `${PYCARET_ARTIFACT_DIR}/runs/<run_id>/pipeline.pkl` with a SHA-256 checksum, writes an `Artifact` row, and stores the leaderboard JSON plus a two-key summary (`rows`, `best`) on the Run row. Every exception is captured into `Run.error`; a `finally:` block closes the event stream via `event_broker.close_run`. Module-level singleton via `get_orchestrator()` / `reset_orchestrator()` (used in test fixtures and the app lifespan teardown).
ADDED — `pycaret_server/runs/__init__.py` — exports `EventBroker`, `event_broker`, `RunOrchestrator`, `get_orchestrator`.
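The thread-to-asyncio fan-out used by the broker can be sketched with only `asyncio` and `threading`. `MiniBroker` and the `END` object are hypothetical names; `subscribe`, `publish`, and `close_run` mirror the behaviour described above, but the bodies are illustrative rather than the server's code. The key detail is that each subscriber records the loop it was created on, and producers hand items to that loop with `loop.call_soon_threadsafe` instead of touching the `asyncio.Queue` directly, because `asyncio.Queue` is not thread-safe.

```python
import asyncio
import threading
from collections import defaultdict
from typing import Any

END = object()  # hypothetical "run closed" sentinel


class MiniBroker:
    """Illustrative thread-to-asyncio fan-out; not the server's EventBroker."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # run_id -> [(owning event loop, subscriber queue), ...]
        self._subs: dict[str, list[tuple[asyncio.AbstractEventLoop, asyncio.Queue]]] = defaultdict(list)

    def subscribe(self, run_id: str) -> asyncio.Queue:
        # Call from inside the consumer's running loop; that loop is remembered.
        loop = asyncio.get_running_loop()
        queue: asyncio.Queue = asyncio.Queue()
        with self._lock:
            self._subs[run_id].append((loop, queue))
        return queue

    def publish(self, run_id: str, event: Any) -> None:
        # Safe from any thread: each put is scheduled on the queue's own loop.
        with self._lock:
            targets = list(self._subs.get(run_id, ()))
        for loop, queue in targets:
            loop.call_soon_threadsafe(queue.put_nowait, event)

    def close_run(self, run_id: str) -> None:
        # Wake every subscriber with the sentinel, then forget them.
        self.publish(run_id, END)
        with self._lock:
            self._subs.pop(run_id, None)


async def demo() -> list[Any]:
    broker = MiniBroker()
    queue = broker.subscribe("run-1")

    def worker() -> None:
        # Plays the role of the thread-pool worker emitting engine events.
        for i in range(3):
            broker.publish("run-1", {"kind": "step", "i": i})
        broker.close_run("run-1")

    thread = threading.Thread(target=worker)
    thread.start()
    received: list[Any] = []
    while (item := await queue.get()) is not END:
        received.append(item)
    thread.join()
    return received


events = asyncio.run(demo())
```

Callbacks scheduled on a loop run in registration order, so a single producer's events arrive in order and the sentinel arrives last.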
ADDED — HTTP + WebSocket routes
ADDED — `pycaret_server/api/runs.py` — a single `APIRouter(tags=["runs"])` mounted at `/api/v1`, hosting:
- `POST /experiments/{experiment_id}/runs`: validates plan / model_id / data source, snapshots the Experiment config into `Run.snapshot`, persists a queued Run, enqueues a `RunSpec` with the orchestrator, and returns 202.
- `GET /experiments/{experiment_id}/runs`: all runs for an experiment, newest first.
- `GET /runs/{run_id}`: status + leaderboard + metrics_summary + error.
- `GET /runs/{run_id}/events?limit=&after_id=`: paginated replay for polling clients (the UI uses the WebSocket).
- `POST /runs/{run_id}/wait?timeout_s=30`: blocking wait; refreshes the Run and returns it. Useful for notebooks and tests.
- `WS /runs/{run_id}/events/ws?token=<jwt>`: authenticates via a query-param JWT (the browser WebSocket API can't set custom headers), replays stored events first so the client sees the full history, then subscribes to the broker until `run.closed`. Subscribers to a non-terminal run get live fan-out; late joiners on a terminal run get the replay plus the sentinel, then disconnect.
ADDED — `pycaret_server/api/schemas.py` — `RunCreate`, `RunResponse`, `EventResponse` Pydantic models.
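The validation step of the POST route (plan / model_id / data source) can be expressed as a plain, framework-free function. This is a hypothetical sketch: the field names `plan`, `model_id`, `sklearn_dataset`, and `data_inline` come from this log, but `validate_run_request`, its error strings, and the `(status, message)` return shape are assumptions; the real `RunCreate` model and route may validate differently (for example, the log does not say what happens when both data sources are supplied, so the sketch does not check that).

```python
from typing import Any, Optional

VALID_PLANS = {"setup", "create", "compare"}


def validate_run_request(body: dict[str, Any]) -> tuple[int, Optional[str]]:
    """Return (http_status, error_message); 202 means the run may be enqueued."""
    plan = body.get("plan")
    if plan not in VALID_PLANS:
        return 400, f"unknown plan: {plan!r}"
    # Only the 'create' plan needs a specific model to fit.
    if plan == "create" and not body.get("model_id"):
        return 400, "plan 'create' requires model_id"
    # At least one data source must be present.
    if body.get("sklearn_dataset") is None and body.get("data_inline") is None:
        return 400, "a data source (sklearn_dataset or data_inline) is required"
    return 202, None
```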
ADDED — app wiring
ADDED — `pycaret_server/app.py` now mounts the runs router and tears down the orchestrator in the lifespan `finally`, so ephemeral worker threads stop between test runs and on graceful server shutdown.
ADDED — `pycaret_server/api/__init__.py` exports the `runs` submodule.
TESTS
TESTS — `pycaret-server/tests/test_runs.py` — 6 new integration tests, each with a per-test SQLite database and orchestrator reset:
- `test_submit_run_validation`: covers 3 bad shapes (no data source, `create` without `model_id`, unknown plan). 400 each.
- `test_setup_run_lifecycle`: submit a `setup` run on iris, block via `/wait`, assert status=succeeded + duration_ms > 0 + events contain `experiment.started` and `experiment.fitted`.
- `test_create_run_produces_artifact`: submit a `create` run with `model_id=lr` on iris, block, assert the pipeline pickle exists on disk under `${artifact_dir}/runs/<id>/pipeline.pkl`.
- `test_list_runs_for_experiment`: empty list, submit 2 runs (iris + wine), list returns 2.
- `test_websocket_replay_after_run_finishes`: submit + wait, then open the WS; receive the event replay followed by `run.closed`. Verifies the broker handles late subscribers against terminal runs.
- `test_ws_rejects_unauth`: a WebSocket with no `?token=` is closed with code 4401.
TESTS — Server suite total: 20/20 green in ~10 s (14 existing + 6 new).
TESTS — Combined engine + server: 52/52 green.
INTERNAL
INTERNAL — Thread + asyncio bridge — `EventBroker.publish` is safe to call from worker threads because each subscriber records its owning event loop at subscribe time; `call_soon_threadsafe` schedules the `Queue.put_nowait` onto that loop. This is the mechanism asyncio documents for cross-thread wakeups: `asyncio.Queue` is not thread-safe, so it must only be touched from its own loop.
INTERNAL — Test monkeypatch safety — any module-level `from pycaret_server.db import session_factory` would have frozen the reference at import time and missed the per-test rebind. Switched to `from pycaret_server.db import get_session` (a wrapper function that resolves `session_factory` from its defining module's globals at call time) in `logger_bridge.py`, `orchestrator.py`, and `api/runs.py`. Pattern to reuse in any future module.
INTERNAL — Pipeline pickling — uses `cloudpickle` (already a PyCaret core dep) with a `pickle` fallback. The SHA-256 checksum is captured in the `Artifact` row for later integrity validation when the pipeline is loaded for serving.
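The import-time binding problem behind the monkeypatch note is easy to reproduce without a database. In this hypothetical sketch, an in-memory module registered as `fake_db` plays the role of `pycaret_server.db`; `session_factory` and `get_session` follow the log's names, while the string "sessions" are placeholders. Plain attribute assignment stands in for what pytest's `monkeypatch.setattr(module, "session_factory", ...)` does. A name pulled out with `from ... import` keeps pointing at the old object after the rebind, whereas a function that reads its module's globals at call time sees the new one.

```python
import sys
import types

# Stand-in for pycaret_server.db, built in memory so the example is self-contained.
db = types.ModuleType("fake_db")
exec(
    "session_factory = lambda: 'prod-session'\n"
    "def get_session():\n"
    "    # session_factory is looked up in this module's globals at call time.\n"
    "    return session_factory()\n",
    db.__dict__,
)
sys.modules["fake_db"] = db

# Copies the current reference into this namespace (the problematic pattern).
from fake_db import session_factory as frozen_factory  # noqa: E402
# Imports the late-binding wrapper (the pattern the log switched to).
from fake_db import get_session  # noqa: E402

# What a per-test fixture's monkeypatch rebind does to the module attribute.
db.session_factory = lambda: "test-session"

frozen_result = frozen_factory()  # still the original factory
late_result = get_session()       # resolves the rebound factory
```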
Session 10 delta summary
| Metric | Session 9 end | Session 10 end |
|---|---|---|
| Server LOC | ~1,800 | ~2,400 (+600) |
| API routes (under /api/v1) | 21 | 26 + 1 WebSocket |
| Tests in server suite | 14 | 20 |
| Total tests (engine + server) | 46 | 52 |
| `runs/*` subsystem | 0 files | 5 files, ~580 LOC |