2026-04-24

Session 10: Run execution + event stream (Phase 9 core complete)

Engineering log for session 10.

Baseline: session 9 landed the scaffold — 29 routes, 14 tables, 14 tests, 3766 LOC added to pycaret-server/. The server could CRUD workspaces / projects / experiments but couldn't actually run a PyCaret experiment.

Theme: the owner's request was "let's do the next major phase": wire the engine's Experiment verbs to the platform. A client POSTs to /api/v1/experiments/{id}/runs with a plan (setup / create / compare) and a data source (sklearn_dataset or data_inline). The server enqueues a job and dispatches it to a thread-pool worker. The worker builds the right pycaret.tasks.*Experiment, attaches a DB-backed BaseLogger subclass so every engine Event becomes an events row, pickles the fitted pipeline to the artifact dir, and writes the leaderboard JSON back onto the Run row. If anyone is listening on the WebSocket, the events are also fanned out live.

ADDED: pycaret_server/runs/ subsystem

  • ADDED pycaret_server/runs/broker.py: EventBroker, a thread-safe fan-out that bridges the worker thread's synchronous BaseLogger.emit to async WebSocket consumers. Subscribers register an asyncio.Queue; publish(run_id, event_dict) dispatches through loop.call_soon_threadsafe(queue.put_nowait, event) so producers can live on any thread. close_run(run_id) pushes an END sentinel to drain outstanding subscribers when a run reaches a terminal state. Module-level singleton event_broker; clear() for test fixtures.
  • ADDED pycaret_server/runs/logger_bridge.py: DBEventLogger(pycaret.logging.BaseLogger). Overrides emit() to open a short-lived SQLAlchemy session, write a single Event row (kind + message + payload + duration_ms + emitted_at), then republish via event_broker.publish. The per-emit session avoids holding SQLite connections open across long-running verbs. Resolves session_factory via get_session() (the defining-module function) so test monkeypatches take effect.
  • ADDED pycaret_server/runs/plans.py: the "plan" abstraction, made up of a PlanName Literal ("setup" | "create" | "compare"), a PlanOutcome dataclass (leaderboard + best_model + extra), and execute_plan(exp, plan, *, model_id, plan_params), which maps each plan onto the right engine verb call. Also load_sklearn_dataset(name) covering iris / wine / breast_cancer / diabetes (no network dependency) and load_inline(rows) for JSON dict-of-lists payloads.
  • ADDED pycaret_server/runs/orchestrator.py: RunOrchestrator, a ThreadPoolExecutor(max_workers=2) wrapped with submit / wait_for / shutdown. _execute(spec) transitions the Run row through queued → running → succeeded|failed, loads data, instantiates the right Experiment subclass by TaskType, attaches the DBEventLogger, calls exp.fit(df), and executes the plan. It then pickles outcome.best_model to ${PYCARET_ARTIFACT_DIR}/runs/<run_id>/pipeline.pkl with a SHA-256 checksum, writes an Artifact row, and stores the leaderboard JSON + a two-key summary (rows, best) on the Run row. Every exception is captured into Run.error; a finally: closes the event stream via event_broker.close_run. Module-level singleton via get_orchestrator() / reset_orchestrator() (used in test fixtures + the app lifespan teardown).
  • ADDED pycaret_server/runs/__init__.py: exports EventBroker, event_broker, RunOrchestrator, get_orchestrator.
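
The broker's fan-out can be illustrated with a minimal sketch. It is not the actual broker.py: it assumes a per-run list of (loop, queue) pairs guarded by a threading.Lock, and the shape of the END sentinel (a dict with kind "run.closed") is a guess made for the demo. Only the method names subscribe-style registration, publish, close_run and clear, plus the call_soon_threadsafe dispatch, come from the log.

```python
import asyncio
import threading
from collections import defaultdict

# Assumed sentinel shape; the real END object is not shown in the log.
END = {"kind": "run.closed"}


class EventBroker:
    """Fan out events from any thread to asyncio.Queue subscribers."""

    def __init__(self):
        self._lock = threading.Lock()
        # run_id -> list of (owning event loop, queue) pairs
        self._subs = defaultdict(list)

    def subscribe(self, run_id):
        # Called from a coroutine, so the subscriber's loop can be recorded.
        loop = asyncio.get_running_loop()
        queue = asyncio.Queue()
        with self._lock:
            self._subs[run_id].append((loop, queue))
        return queue

    def publish(self, run_id, event):
        # Copy the subscriber list under the lock, then schedule each put
        # on the loop that owns the queue. Safe from worker threads.
        with self._lock:
            targets = list(self._subs.get(run_id, ()))
        for loop, queue in targets:
            loop.call_soon_threadsafe(queue.put_nowait, event)

    def close_run(self, run_id):
        # Push the sentinel, then drop the run's subscribers.
        self.publish(run_id, END)
        with self._lock:
            self._subs.pop(run_id, None)

    def clear(self):
        with self._lock:
            self._subs.clear()


async def demo():
    broker = EventBroker()
    queue = broker.subscribe("run-1")

    def worker():
        # Stand-in for the engine emitting events on a pool thread.
        broker.publish("run-1", {"kind": "experiment.started"})
        broker.publish("run-1", {"kind": "experiment.fitted"})
        broker.close_run("run-1")

    thread = threading.Thread(target=worker)
    thread.start()
    received = []
    while True:
        event = await queue.get()
        received.append(event["kind"])
        if event is END:
            break
    thread.join()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because call_soon_threadsafe schedules callbacks in FIFO order, a single producer thread's events reach each queue in the order they were published, with the sentinel last.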

ADDED: HTTP + WebSocket routes

  • ADDED pycaret_server/api/runs.py: a single APIRouter(tags=["runs"]) mounted at /api/v1 hosting:
    • POST /experiments/{experiment_id}/runs — validates plan / model_id / data source, snapshots the Experiment config into Run.snapshot, persists a queued Run, enqueues a RunSpec with the orchestrator, returns 202.
    • GET /experiments/{experiment_id}/runs — all runs for an experiment, newest first.
    • GET /runs/{run_id} — status + leaderboard + metrics_summary + error.
    • GET /runs/{run_id}/events?limit=&after_id= — paginated replay (polling clients; UI uses the WebSocket).
    • POST /runs/{run_id}/wait?timeout_s=30 — blocking wait; refreshes the Run and returns. Useful for notebooks + tests.
    • WS /runs/{run_id}/events/ws?token=<jwt> — authenticates via query-param JWT (browser WebSocket can't set headers), replays stored events first so the client sees full history, then subscribes to the broker until run.closed. Non-terminal subscribers get live fan-out; late-joiners on a terminal run get the replay + sentinel then disconnect.
  • ADDED pycaret_server/api/schemas.py: RunCreate, RunResponse, and EventResponse Pydantic models.
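
The submit-time checks on POST /experiments/{experiment_id}/runs might look roughly like the sketch below. The field names plan, model_id, sklearn_dataset and data_inline come from this log, but the exact JSON shape of RunCreate is not shown here, so the flat dict layout, the helper name validate_run_request and the error strings are all assumptions. The real route presumably does this through the Pydantic model and returns a 400.

```python
from typing import Optional

VALID_PLANS = ("setup", "create", "compare")


def validate_run_request(body: dict) -> Optional[str]:
    """Return an error message for a bad request body, or None if it passes."""
    plan = body.get("plan")
    if plan not in VALID_PLANS:
        return "unknown plan"
    # A create run trains one specific model, so it needs to know which.
    if plan == "create" and not body.get("model_id"):
        return "plan 'create' requires model_id"
    # Exactly how the two data sources are nested is an assumption.
    if not body.get("sklearn_dataset") and not body.get("data_inline"):
        return "a data source is required: sklearn_dataset or data_inline"
    return None


if __name__ == "__main__":
    print(validate_run_request({"plan": "setup", "sklearn_dataset": "iris"}))
    print(validate_run_request({"plan": "create", "sklearn_dataset": "iris"}))
```

The three failure branches correspond to the three bad shapes that test_submit_run_validation exercises: no data source, create without model_id, and an unknown plan.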

ADDED: app wiring

  • ADDED pycaret_server/app.py now mounts the runs router and tears down the orchestrator in the lifespan finally so ephemeral worker threads stop between test runs and on graceful server shutdown.
  • ADDED pycaret_server/api/__init__.py exports the runs submodule.
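
The lifespan teardown can be sketched with the standard library alone. This is an illustration under assumptions, not the code in app.py: a bare ThreadPoolExecutor stands in for RunOrchestrator, and the lifespan function is only shaped like a FastAPI lifespan handler (an async context manager that takes the app). The names get_orchestrator and reset_orchestrator are from the log; their bodies here are guesses.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from contextlib import asynccontextmanager
from typing import Optional

# Module-level singleton; a plain executor stands in for RunOrchestrator.
_pool: Optional[ThreadPoolExecutor] = None


def get_orchestrator() -> ThreadPoolExecutor:
    """Create the worker pool lazily on first use."""
    global _pool
    if _pool is None:
        _pool = ThreadPoolExecutor(max_workers=2)
    return _pool


def reset_orchestrator() -> None:
    """Stop the worker threads and forget the singleton."""
    global _pool
    if _pool is not None:
        _pool.shutdown(wait=True)
        _pool = None


@asynccontextmanager
async def lifespan(app):
    # `app` is unused in this sketch; a real handler receives the application.
    try:
        yield
    finally:
        # Runs on graceful shutdown and at the end of each test's app context.
        reset_orchestrator()


async def demo() -> bool:
    async with lifespan(app=None):
        future = get_orchestrator().submit(sum, [1, 2, 3])
        assert future.result() == 6
    # After the context exits, the singleton has been torn down.
    return _pool is None


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Putting the reset in finally means the pool is shut down even when startup or a request handler raises, which is what keeps stray worker threads from leaking between tests.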

TESTS

  • TESTS pycaret-server/tests/test_runs.py: 6 new integration tests with per-test SQLite + orchestrator reset:
    • test_submit_run_validation — covers 3 bad shapes (no data source, create without model_id, unknown plan). 400 each.
    • test_setup_run_lifecycle — submit a setup run on iris, block via /wait, assert status=succeeded + duration_ms > 0 + events contain experiment.started and experiment.fitted.
    • test_create_run_produces_artifact — submit a create run with model_id=lr on iris, block, assert the pipeline pickle exists on disk under ${artifact_dir}/runs/<id>/pipeline.pkl.
    • test_list_runs_for_experiment — empty list, submit 2 runs (iris + wine), list returns 2.
    • test_websocket_replay_after_run_finishes — submit + wait, then open the WS; receive the event replay followed by run.closed. Verifies the broker handles late subscribers against terminal runs.
    • test_ws_rejects_unauth — WebSocket with no ?token= is closed with code 4401.
  • TESTS Server suite total: 20/20 green in ~10 s (14 existing + 6 new).
  • TESTS Combined engine + server: 52/52 green.

INTERNAL

  • INTERNAL Thread + asyncio bridge: EventBroker.publish is safe to call from worker threads because each subscriber records its owning event loop at subscribe time; call_soon_threadsafe then schedules the Queue.put_nowait on that loop. This is asyncio's documented mechanism for scheduling a callback from another thread.
  • INTERNAL Test monkeypatch safety: any module-level from pycaret_server.db import session_factory would have frozen the reference at import time and missed the per-test rebind. Switched to from pycaret_server.db import get_session (a wrapper function that resolves session_factory from its defining module's globals at call time) in logger_bridge.py, orchestrator.py, and api/runs.py. Pattern to reuse in any future module.
  • INTERNAL Pipeline pickling: uses cloudpickle (already a PyCaret core dep) with a pickle fallback. The SHA-256 checksum is stored in the Artifact row so integrity can be validated later when the pipeline is loaded for serving.
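
The monkeypatch pitfall is easy to reproduce in isolation. The sketch below builds a throwaway module as a stand-in for pycaret_server.db; its contents are hypothetical (the real session_factory returns SQLAlchemy sessions, not strings). It shows why a copied reference misses a later rebind while a wrapper defined in the same module picks it up.

```python
import types

# Throwaway stand-in for pycaret_server.db (hypothetical contents).
# exec() is used only so get_session's globals are this module's namespace,
# exactly as they would be for a function defined in a real db.py.
db = types.ModuleType("fake_db")
exec(
    "session_factory = lambda: 'production-session'\n"
    "def get_session():\n"
    "    # Looks up session_factory in this module's globals on every call.\n"
    "    return session_factory()\n",
    db.__dict__,
)

# Equivalent of `from fake_db import session_factory`: copies the reference once.
frozen_factory = db.session_factory
# Equivalent of `from fake_db import get_session`: the wrapper resolves late.
get_session = db.get_session

# What a per-test fixture's monkeypatch does: rebind the module attribute.
db.session_factory = lambda: "per-test-session"

if __name__ == "__main__":
    print(frozen_factory())  # still calls the original factory
    print(get_session())     # calls the rebound factory
```

The importing module never needs to change when tests swap the factory, because the only name it holds is the wrapper.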

Session 10 delta summary

Metric                          Session 9 end   Session 10 end
Server LOC                      ~1,800          ~2,400 (+600)
API routes (under /api/v1)      21              26 + 1 WebSocket
Tests in server suite           14              20
Total tests (engine + server)   46              52
runs/* subsystem                0 files         5 files, ~580 LOC