"""morie.otis_all_analyze -- RichResult-emitting analyses for ALL 28
OTIS datasets.
This module is the **comprehensive** OTIS analysis surface. It pairs
with `morie.otis_datasets` (the registry/loader) and extends the
narrower `morie.otis_analyze` (which only covered the restricted-
confinement subset).
For each dataset id (b01..d07), this module exposes:
analyze_<id>(df=None) -> RichResult
If df is None, the analysis loads from
data/datasets/OTIS/<csv_filename>.
Plus a top-level driver:
analyze_all(*, out_dir=None) -> dict[str, RichResult]
which runs every dataset's analysis, writes per-dataset .txt and .json
under data/manifest/outputs/otis/, and returns the in-memory results.
Plus the Ruhela-formulation (RF) high-level entry points:
analyze_a01_ruhela_formulations(df=None) -- full DLRM on a01
-- alias: analyze_a01_dlrm
analyze_b01_ruhela_formulations(df=None) -- full DLRM on b01
-- alias: analyze_b01_dlrm
analyze_b02_ruhela_formulations(df=None) -- gender -> seg days on b02
-- alias: analyze_b02_dlrm
analyze_a01_with_csi_context() -- a01 causal + Toronto CSI
analyze_c_doob_chi2() -- χ² + Cramer's V on c-series
-- homage to Doob's chi-square tradition
analyze_d_doob_chi2() -- d-series + Alert χ²
Naming abbreviations:
RF -- Ruhela formulation
RDF -- Ruhela Dual Formulation (RF + Naive-arm sensitivity)
DLRM -- Doob-Levinsky-Ruhela-Medina (methodology attribution)
The DLRM methodology attribution and the wider acknowledgements
(Prof. Jauregui, Prof. Laniyonu) live at the top of
``morie.otis_causal``'s module docstring.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from .fn._richresult import RichResult
from .otis_datasets import (
DATASET_REGISTRY,
load_otis_dataset,
)
# Repository root.  NOTE(review): parents[5] assumes this file sits exactly
# five directory levels below the project root — confirm package layout.
PROJECT = Path(__file__).resolve().parents[5]
# Default output directory for the per-dataset .txt/.json files written by
# the analyze_all driver (see module docstring).
DEFAULT_OUT = PROJECT / "data/manifest/outputs/otis"
# ── Generic helpers ────────────────────────────────────────────────
def _year_col(df: pd.DataFrame) -> str | None:
for c in ("EndFiscalYear", "Year"):
if c in df.columns:
return c
return None
def _to_int(x: Any) -> int:
try:
return int(x)
except Exception:
return 0
def _summary_lines(df: pd.DataFrame, ds_id: str) -> list[tuple[str, Any]]:
    """Build the shared (label, value) summary pairs for dataset *ds_id*.

    Always reports dataset id/description, series, and shape; adds the
    year span when a year column with numeric values exists, and the
    total of the registry's primary metric when that column is present.
    """
    meta = DATASET_REGISTRY[ds_id]
    lines: list[tuple[str, Any]] = [
        ("Dataset", f"{ds_id} -- {meta.description}"),
        ("Series", meta.series),
        ("Rows", int(df.shape[0])),
        ("Columns", int(df.shape[1])),
    ]
    year_col = _year_col(df)
    if year_col and df.shape[0]:
        years = pd.to_numeric(df[year_col], errors="coerce").dropna()
        if years.size:
            lines.append(("Years covered",
                          f"{int(years.min())}–{int(years.max())}"))
    if meta.primary_metric in df.columns:
        metric = pd.to_numeric(df[meta.primary_metric], errors="coerce")
        lines.append((f"Total {meta.primary_metric}", int(metric.sum())))
    return lines
def _crosstab(df: pd.DataFrame,
row: str, col: str, value: str,
aggfunc: str = "sum",
top_rows: int = 20) -> dict | None:
if not all(c in df.columns for c in (row, col, value)):
return None
pivot = df.pivot_table(
index=row, columns=col,
values=value, aggfunc=aggfunc, fill_value=0,
)
pivot["TOTAL"] = pivot.sum(axis=1)
pivot = pivot.sort_values("TOTAL", ascending=False).head(top_rows)
return {
"title": f"{value} by {row} × {col}:",
"headers": [row] + [str(c) for c in pivot.columns],
"rows": [[str(idx)] + [int(v) if pd.notna(v) else 0
for v in row_vals]
for idx, row_vals in pivot.iterrows()],
}
def _year_trend(df: pd.DataFrame, value: str,
year_col: str | None = None) -> dict | None:
yc = year_col or _year_col(df)
if not yc or value not in df.columns:
return None
g = df.groupby(yc)[value].sum().sort_index()
return {
"title": f"{value} by {yc}:",
"headers": [yc, value],
"rows": [[int(y), int(v)] for y, v in g.items()],
}
# ── b-series analyses ──────────────────────────────────────────────
def analyze_b01(df: pd.DataFrame | None = None) -> RichResult:
    """Person-level segregation placements (76,934 rows)."""
    if df is None:
        df = load_otis_dataset("b01")

    def _n_flagged(col: pd.Series) -> int:
        # Boolean flag columns compare against True; numeric encodings
        # (0/1) compare against 1 — mirrors both CSV export styles.
        return int((col == True).sum() if col.dtype == bool  # noqa: E712
                   else (col == 1).sum())

    n_rows = df.shape[0]
    duration = df["NumberConsecutiveDays_Segregation"]
    summary = _summary_lines(df, "b01")
    summary.append(("Unique individuals",
                    int(df["UniqueIndividual_ID"].nunique())))
    summary.append(("Mean consecutive days", float(duration.mean())))
    summary.append(("Median consecutive days", int(duration.median())))
    summary.append(("Max consecutive days", int(duration.max())))
    # Placement-reason flag columns (SegReason_*): count + share of rows.
    reason_rows = []
    for flag in df.columns:
        if not flag.startswith("SegReason_"):
            continue
        n_yes = _n_flagged(df[flag])
        if n_yes > 0:
            reason_rows.append([flag.replace("SegReason_", ""), n_yes,
                                f"{100*n_yes/n_rows:.1f}%"])
    reason_rows.sort(key=lambda r: -r[1])
    # The three mental-health/suicide alert flags, where present.
    alert_rows = []
    for alert in ("MentalHealth_Alert", "SuicideRisk_Alert",
                  "SuicideWatch_Alert"):
        if alert in df.columns:
            n_yes = _n_flagged(df[alert])
            alert_rows.append([alert, n_yes, f"{100*n_yes/n_rows:.1f}%"])
    trend = _year_trend(df, "Number_Of_Placements")
    return RichResult(
        title="b01 -- Segregation placements (person-level detail)",
        summary_lines=summary,
        tables=[
            {"title": "Reasons for placement (count, % of rows):",
             "headers": ["Reason", "Count", "Percent"], "rows": reason_rows},
            {"title": "Alert flags on placements:",
             "headers": ["Alert", "Count", "Percent"], "rows": alert_rows},
            trend if trend else {"title": "(no year trend)",
                                 "headers": [], "rows": []},
        ],
        payload={"n_rows": int(df.shape[0]),
                 "n_individuals": int(df["UniqueIndividual_ID"].nunique())},
    )
def analyze_b02(df: pd.DataFrame | None = None) -> RichResult:
    """Aggregate days in segregation per person per year."""
    if df is None:
        df = load_otis_dataset("b02")
    summary = _summary_lines(df, "b02")
    days = pd.to_numeric(df["TotalAggregatedDays_Segregation"],
                         errors="coerce").dropna()
    summary.append(("Mean total days", float(days.mean())))
    summary.append(("Median total days", int(days.median())))
    summary.append(("Max total days", int(days.max())))
    trend = _year_trend(df, "TotalAggregatedDays_Segregation")
    cross = _crosstab(df, "Gender", "Region_MostRecentPlacement",
                      "TotalAggregatedDays_Segregation")
    return RichResult(
        title="b02 -- Segregation total days per person per fiscal year",
        summary_lines=summary,
        tables=[
            trend if trend else {"title": "(none)", "headers": [], "rows": []},
            cross if cross else {"title": "(none)", "headers": [], "rows": []},
        ],
    )
def analyze_b03(df: pd.DataFrame | None = None) -> RichResult:
    """Segregation placements by alert × institution."""
    if df is None:
        df = load_otis_dataset("b03")
    by_presence = _crosstab(df, "Alert_Type", "Alert_Presence",
                            "Number_SegregationPlacements")
    by_region = _crosstab(df, "Region_AtTimeOfPlacement", "Alert_Type",
                          "Number_SegregationPlacements")
    return RichResult(
        title="b03 -- Segregation placements by alert/hold flag × institution",
        summary_lines=_summary_lines(df, "b03"),
        tables=[by_presence or {}, by_region or {}],
    )
def analyze_b04(df: pd.DataFrame | None = None) -> RichResult:
    """Placement durations (max/median/mode) by region & gender."""
    if df is None:
        df = load_otis_dataset("b04")
    # Rows carry pre-computed duration measures, so "max" keeps each cell.
    durations = _crosstab(df, "Region_AtTimeOfPlacement", "Measure",
                          "NumberConsecutiveDays_Segregation",
                          aggfunc="max")
    return RichResult(
        title="b04 -- Placement durations by region & gender",
        summary_lines=_summary_lines(df, "b04"),
        tables=[durations or {}],
    )
def analyze_b05(df: pd.DataFrame | None = None) -> RichResult:
    """Distribution of placements by binned duration."""
    if df is None:
        df = load_otis_dataset("b05")
    by_bin = _crosstab(df, "Consecutive_Duration", "EndFiscalYear",
                       "Number_SegregationPlacements")
    return RichResult(
        title="b05 -- Distribution of placements by binned duration",
        summary_lines=_summary_lines(df, "b05"),
        tables=[by_bin or {}],
    )
def analyze_b06(df: pd.DataFrame | None = None) -> RichResult:
    """Reasons for placement × institution × gender."""
    if df is None:
        df = load_otis_dataset("b06")
    by_year = _crosstab(df, "Reason", "EndFiscalYear",
                        "Number_SegregationPlacements")
    by_gender = _crosstab(df, "Reason", "Gender",
                          "Number_SegregationPlacements")
    return RichResult(
        title="b06 -- Reasons for placement by institution & gender",
        summary_lines=_summary_lines(df, "b06"),
        tables=[by_year or {}, by_gender or {}],
    )
def analyze_b07(df: pd.DataFrame | None = None) -> RichResult:
    """Alerts × gender."""
    if df is None:
        df = load_otis_dataset("b07")
    summary = _summary_lines(df, "b07")
    detail_rows = []
    for rec in df.to_dict("records"):
        n_with = _to_int(rec["Number_Segregation_Placements_With_Alert"])
        n_without = _to_int(rec["Number_Segregation_Placements_Without_Alert"])
        total = n_with + n_without
        # Share of placements that carried the alert; 0 when the row is empty.
        pct = (n_with / total * 100) if total > 0 else 0.0
        detail_rows.append([_to_int(rec["EndFiscalYear"]),
                            str(rec["Alert_Type"]), str(rec["Gender"]),
                            n_with, n_without, f"{pct:.1f}%"])
    return RichResult(
        title="b07 -- Segregation placements with/without alert × gender",
        summary_lines=summary,
        tables=[{"title": "By alert × gender × year:",
                 "headers": ["Year", "Alert_Type", "Gender", "With_Alert",
                             "Without_Alert", "% with alert"],
                 "rows": detail_rows}],
    )
def analyze_b08(df: pd.DataFrame | None = None) -> RichResult:
    """Durations (median/mode) by institution & gender."""
    if df is None:
        df = load_otis_dataset("b08")
    durations = _crosstab(df, "Institution_AtTimeOfPlacement", "Measure",
                          "NumberConsecutiveDays_Segregation",
                          aggfunc="max", top_rows=15)
    return RichResult(
        title="b08 -- Placement durations by institution & gender",
        summary_lines=_summary_lines(df, "b08"),
        tables=[durations or {}],
    )
def analyze_b09(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals by number of segregation placements × gender."""
    if df is None:
        df = load_otis_dataset("b09")
    by_count = _crosstab(df, "NumberPlacements_Segregation", "Gender",
                         "NumberIndividuals_Segregation")
    return RichResult(
        title="b09 -- Individuals by number of placements × gender",
        summary_lines=_summary_lines(df, "b09"),
        tables=[by_count or {}],
    )
# ── c-series analyses ──────────────────────────────────────────────
def analyze_c01(df: pd.DataFrame | None = None) -> RichResult:
    """Total individuals × custody/RC/seg × gender."""
    if df is None:
        df = load_otis_dataset("c01")
    summary = _summary_lines(df, "c01")
    # Per-row RC/custody and seg/custody ratios alongside raw cohort sizes.
    rate_rows = []
    for rec in df.to_dict("records"):
        cust = _to_int(rec["NumberIndividuals_InCustody"])
        rc = _to_int(rec["NumberIndividuals_RestrictiveConfinement"])
        seg = _to_int(rec["NumberIndividuals_Segregation"])
        rate_rows.append([_to_int(rec["EndFiscalYear"]), str(rec["Gender"]),
                          cust, rc, seg,
                          f"{(rc/cust*100) if cust else 0:.1f}%",
                          f"{(seg/cust*100) if cust else 0:.1f}%"])
    return RichResult(
        title="c01 -- Total individuals × custody/RC/seg × gender",
        summary_lines=summary,
        tables=[{"title": "Cohort sizes + ratios:",
                 "headers": ["Year", "Gender", "Custody", "RC", "Seg",
                             "RC/custody", "Seg/custody"],
                 "rows": rate_rows}],
    )
def analyze_c02(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals in RC/seg by institution."""
    if df is None:
        df = load_otis_dataset("c02")
    by_institution = _crosstab(df, "Institution_MostRecentPlacement",
                               "EndFiscalYear",
                               "NumberIndividuals_RestrictiveConfinement",
                               top_rows=15)
    return RichResult(
        title="c02 -- Individuals in RC/seg by institution × region × gender",
        summary_lines=_summary_lines(df, "c02"),
        tables=[by_institution or {}],
    )
def analyze_c03(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals × race × gender."""
    if df is None:
        df = load_otis_dataset("c03")
    summary = _summary_lines(df, "c03")
    # Sum cohort counts per race, then derive RC/custody and seg/custody
    # disparity ratios from the totals.
    totals = df.groupby("Race", as_index=False).agg(
        cust=("NumberIndividuals_InCustody", "sum"),
        rc=("NumberIndividuals_RestrictiveConfinement", "sum"),
        seg=("NumberIndividuals_Segregation", "sum"),
    )
    race_rows = []
    for rec in totals.to_dict("records"):
        cust = _to_int(rec["cust"])
        rc = _to_int(rec["rc"])
        seg = _to_int(rec["seg"])
        race_rows.append([str(rec["Race"]), cust, rc, seg,
                          f"{(rc/cust*100) if cust else 0:.1f}%",
                          f"{(seg/cust*100) if cust else 0:.1f}%"])
    race_rows.sort(key=lambda row: -row[1])
    return RichResult(
        title="c03 -- Individuals × race × gender",
        summary_lines=summary,
        tables=[{"title": "By race (totals across years/genders):",
                 "headers": ["Race", "Custody", "RC", "Seg",
                             "RC/custody", "Seg/custody"],
                 "rows": race_rows}],
        interpretation=(
            "Race × confinement disparities are visible in the "
            "RC/custody and Seg/custody ratios. Compare across race "
            "categories -- the gap between Indigenous and White rates is "
            "a documented Ontario-corrections finding."
        ),
    )
def analyze_c04(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals × race × region."""
    if df is None:
        df = load_otis_dataset("c04")
    by_race = _crosstab(df, "Race", "Region_MostRecentPlacement",
                        "NumberIndividuals_RestrictiveConfinement")
    return RichResult(
        title="c04 -- Individuals in RC/seg × race × region",
        summary_lines=_summary_lines(df, "c04"),
        tables=[by_race or {}],
    )
def analyze_c05(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals × religion × region."""
    if df is None:
        df = load_otis_dataset("c05")
    by_religion = _crosstab(df, "Religion", "Region_MostRecentPlacement",
                            "NumberIndividuals_RestrictiveConfinement")
    return RichResult(
        title="c05 -- Individuals in RC/seg × religion × region",
        summary_lines=_summary_lines(df, "c05"),
        tables=[by_religion or {}],
    )
def analyze_c06(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals × age × region."""
    if df is None:
        df = load_otis_dataset("c06")
    by_age = _crosstab(df, "Age_Category", "Region_MostRecentPlacement",
                       "NumberIndividuals_RestrictiveConfinement")
    return RichResult(
        title="c06 -- Individuals in RC/seg × age category × region",
        summary_lines=_summary_lines(df, "c06"),
        tables=[by_age or {}],
    )
def analyze_c07(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals × alerts × gender."""
    if df is None:
        df = load_otis_dataset("c07")
    rc_by_gender = _crosstab(df, "Alert_Type", "Gender",
                             "NumberIndividuals_RestrictiveConfinement")
    seg_by_year = _crosstab(df, "Alert_Type", "EndFiscalYear",
                            "NumberIndividuals_Segregation")
    return RichResult(
        title="c07 -- Individuals in custody/RC/seg × alert type × gender",
        summary_lines=_summary_lines(df, "c07"),
        tables=[rc_by_gender or {}, seg_by_year or {}],
    )
def analyze_c08(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals × religion × gender."""
    if df is None:
        df = load_otis_dataset("c08")
    by_religion = _crosstab(df, "Religion", "Gender",
                            "NumberIndividuals_RestrictiveConfinement")
    return RichResult(
        title="c08 -- Individuals × religion × gender",
        summary_lines=_summary_lines(df, "c08"),
        tables=[by_religion or {}],
    )
def analyze_c09(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals × age × gender."""
    if df is None:
        df = load_otis_dataset("c09")
    by_age = _crosstab(df, "Age_Category", "Gender",
                       "NumberIndividuals_RestrictiveConfinement")
    return RichResult(
        title="c09 -- Individuals × age category × gender",
        summary_lines=_summary_lines(df, "c09"),
        tables=[by_age or {}],
    )
def analyze_c10(df: pd.DataFrame | None = None) -> RichResult:
    """RC/seg aggregate durations by institution."""
    if df is None:
        df = load_otis_dataset("c10")
    durations = _crosstab(df, "Institution_MostRecentPlacement", "Measure",
                          "TotalAggregatedDays_RestrictiveConfinement",
                          aggfunc="max", top_rows=15)
    return RichResult(
        title="c10 -- RC/seg aggregate durations by institution",
        summary_lines=_summary_lines(df, "c10"),
        tables=[durations or {}],
    )
def analyze_c11(df: pd.DataFrame | None = None) -> RichResult:
    """Individuals by aggregate-duration bin."""
    if df is None:
        df = load_otis_dataset("c11")
    by_bin = _crosstab(df, "Aggregate_Duration", "EndFiscalYear",
                       "NumberIndividuals_RestrictiveConfinement")
    return RichResult(
        title="c11 -- Individuals by binned aggregate duration",
        summary_lines=_summary_lines(df, "c11"),
        tables=[by_bin or {}],
    )
def analyze_c12(df: pd.DataFrame | None = None) -> RichResult:
    """RC/seg aggregate durations by region & gender."""
    if df is None:
        df = load_otis_dataset("c12")
    durations = _crosstab(df, "Region_MostRecentPlacement", "Measure",
                          "TotalAggregatedDays_RestrictiveConfinement",
                          aggfunc="max")
    return RichResult(
        title="c12 -- RC/seg aggregate durations by region & gender",
        summary_lines=_summary_lines(df, "c12"),
        tables=[durations or {}],
    )
# ── d-series analyses ──────────────────────────────────────────────
def analyze_d01(df: pd.DataFrame | None = None) -> RichResult:
    """Person-level deaths in custody.

    Builds value-counts tables by region, housing unit, medical cause,
    and means of death.  The published CSV names the last two columns
    ``MedicalCauseofDeath`` / ``MeansofDeath`` (lowercase "o"); older
    exports use ``...OfDeath``.  Both casings are accepted.

    Fix vs. the previous version: when *neither* casing was present the
    fallback name still hit ``df[...]`` and raised ``KeyError`` (and the
    region/housing columns were never guarded at all).  Missing columns
    now render as a "(column missing)" placeholder table instead.
    """
    if df is None:
        df = load_otis_dataset("d01")
    summary = _summary_lines(df, "d01")
    summary.append(("Distinct individuals",
                    int(df["UniqueIndividual_ID"].nunique())))

    def _first_present(*names: str) -> str | None:
        # First candidate column that actually exists; None when absent.
        return next((n for n in names if n in df.columns), None)

    def _vc_table(title: str, header: str, col: str | None) -> dict:
        # Value-counts table for one column, or a labelled placeholder
        # when the column is missing from this export.
        if col is None or col not in df.columns:
            return {"title": f"{title} (column missing)",
                    "headers": [], "rows": []}
        return {"title": title,
                "headers": [header, "Deaths"],
                "rows": [[k, int(v)] for k, v in
                         df[col].value_counts().items()]}

    cause_col = _first_present("MedicalCauseofDeath", "MedicalCauseOfDeath")
    means_col = _first_present("MeansofDeath", "MeansOfDeath")
    return RichResult(
        title="d01 -- Custodial deaths (person-level)",
        summary_lines=summary,
        tables=[
            _vc_table("By region:", "Region", "Region_AtTimeOfDeath"),
            _vc_table("By housing unit type:", "HousingUnit",
                      "HousingUnit_Type"),
            _vc_table("By medical cause:", "Cause", cause_col),
            _vc_table("By means of death:", "Means", means_col),
        ],
    )
def _d_simple(ds_id: str, by: str) -> callable:
    """Factory for the simple d-series analyzers.

    Produces an analyzer that shows the year trend of custodial deaths
    plus a *by* × Year crosstab for dataset *ds_id* (used for d02-d05).
    """
    def _fn(df: pd.DataFrame | None = None) -> RichResult:
        if df is None:
            df = load_otis_dataset(ds_id)
        meta = DATASET_REGISTRY[ds_id]
        trend = _year_trend(df, "Number_CustodialDeaths")
        cross = _crosstab(df, by, "Year", "Number_CustodialDeaths")
        return RichResult(
            title=f"{ds_id} -- {meta.description}",
            summary_lines=_summary_lines(df, ds_id),
            tables=[
                trend if trend else {"title": "(none)",
                                     "headers": [], "rows": []},
                cross if cross else {"title": "(none)",
                                     "headers": [], "rows": []},
            ],
        )
    return _fn
# d02..d05 share the _d_simple template; they differ only in which
# demographic column is crossed against Year.
analyze_d02 = _d_simple("d02", "Gender")
analyze_d03 = _d_simple("d03", "Race")
analyze_d04 = _d_simple("d04", "Religion")
analyze_d05 = _d_simple("d05", "Age_Category")
def analyze_d06(df: pd.DataFrame | None = None) -> RichResult:
    """Custodial deaths × alert × medical cause (d06)."""
    if df is None:
        df = load_otis_dataset("d06")
    by_cause = _crosstab(df, "MedicalCauseOfDeath", "Alert_Type",
                         "Number_CustodialDeaths")
    return RichResult(
        title="d06 -- Custodial deaths × alert × medical cause",
        summary_lines=_summary_lines(df, "d06"),
        tables=[by_cause or {}],
    )
def analyze_d07(df: pd.DataFrame | None = None) -> RichResult:
    """Custodial deaths × alert × housing unit (d07)."""
    if df is None:
        df = load_otis_dataset("d07")
    by_unit = _crosstab(df, "HousingUnit_Type", "Alert_Type",
                        "Number_CustodialDeaths")
    return RichResult(
        title="d07 -- Custodial deaths × alert × housing unit",
        summary_lines=_summary_lines(df, "d07"),
        tables=[by_unit or {}],
    )
# ── Master driver ──────────────────────────────────────────────────
# Dataset-id -> analyzer dispatch table covering the b-, c- and d-series
# (a01 is exposed separately via analyze_a01 and is not registered here).
_ANALYSES = {
    "b01": analyze_b01, "b02": analyze_b02, "b03": analyze_b03,
    "b04": analyze_b04, "b05": analyze_b05, "b06": analyze_b06,
    "b07": analyze_b07, "b08": analyze_b08, "b09": analyze_b09,
    "c01": analyze_c01, "c02": analyze_c02, "c03": analyze_c03,
    "c04": analyze_c04, "c05": analyze_c05, "c06": analyze_c06,
    "c07": analyze_c07, "c08": analyze_c08, "c09": analyze_c09,
    "c10": analyze_c10, "c11": analyze_c11, "c12": analyze_c12,
    "d01": analyze_d01, "d02": analyze_d02, "d03": analyze_d03,
    "d04": analyze_d04, "d05": analyze_d05, "d06": analyze_d06,
    "d07": analyze_d07,
}
def analyze_a01(df: pd.DataFrame | None = None) -> RichResult:
    """High-level a01 (Restrictive Confinement Detailed) analysis.

    Wraps the full causal pipeline that the OTIS-RC research runs
    against ``a01_restrictive_confinement_detailed_dataset.csv``:

    1. Person-year aggregation via 8-state alert-combo encoding
       (``morie.otis_causal.make_pair_alert_to_volatility_a01``).
    2. MatchIt 1:1 NN PSM with caliper.
    3. IRM-DML with cross-fitted random-forest nuisances.
    4. Multi-way clustered standard errors (id, region, year).

    Returns a :class:`RichResult` with both ATE and ATTE estimates.
    """
    from . import otis_causal as oc
    if df is not None:
        data, T, Y, covs = oc.make_pair_alert_to_volatility_ruhela(df)
    else:
        data, T, Y, covs = oc.make_pair_alert_to_volatility_a01()
    fit = oc.otis_irm_dml(
        data, treatment=T, outcome=Y, covariates=covs,
        cluster_cols="UniqueIndividual_ID",
        match_first=True,
    )

    def _r4(v):
        # Uniform 4-decimal rounding for the summary display.
        return round(v, 4)

    summary = [
        ("Source file", "a01_restrictive_confinement_detailed_dataset.csv"),
        ("Person-years (after MatchIt)", fit["n"]),
        ("ATE", _r4(fit["ate"])),
        ("ATE 95% CI", (_r4(fit["ate_ci95"][0]), _r4(fit["ate_ci95"][1]))),
        ("ATTE", _r4(fit["atte"])),
        ("ATTE 95% CI", (_r4(fit["atte_ci95"][0]), _r4(fit["atte_ci95"][1]))),
        ("Standard error type", fit["se_kind"]),
    ]
    return RichResult(
        title=("OTIS a01 -- high alert complexity (ac ≥ 2) "
               "-> regional volatility (vm count)"),
        summary_lines=summary,
        interpretation=(
            "MatchIt-then-IRM-DML reproduction of the published "
            "res_pool finding on the canonical Restrictive "
            "Confinement Detailed Dataset."),
        payload=fit,
    )
def _dual_irm_dml_on(df: pd.DataFrame,
                     *,
                     ds_id: str,
                     source_label: str,
                     title: str,
                     interpretation: str,
                     cluster_col: str = "EndFiscalYear",
                     include_match_first: bool = True,
                     include_per_year: bool = True,
                     ) -> RichResult:
    """Shared full-dual IRM-DML driver for a01 and b01 analyzers.
    Runs Ruhela + Naive arms side by side on `df`, optionally adds the
    match_first PSM-pruned Ruhela fit and per-fiscal-year Ruhela ATEs.
    Both `analyze_a01_dual` and `analyze_b01_dual` delegate here.
    """
    # Local import — presumably to avoid a circular import with
    # otis_causal; confirm before hoisting to module level.
    from . import otis_causal as oc
    both = oc.make_pair_alert_to_volatility_all(df)
    rows = []
    payloads: dict = {}
    # Fit the same IRM-DML spec on each arm; one display row per arm,
    # full result dict (minus the raw data) kept in the payload.
    for arm in ("ruhela", "naive"):
        data, T, Y, cov = both[arm]
        res = oc.otis_irm_dml(
            data, treatment=T, outcome=Y, covariates=cov,
            cluster_cols=[cluster_col],
        )
        rows.append([
            arm.capitalize(), T, Y, int(res["n"]),
            f"{100*res['p_treat']:.1f}%",
            f"{res['ate']:+.4f}",
            f"[{res['ate_ci95'][0]:+.3f}, {res['ate_ci95'][1]:+.3f}]",
            f"{res['ate_pval']:.2e}",
            f"{res['atte']:+.4f}",
            f"[{res['atte_ci95'][0]:+.3f}, {res['atte_ci95'][1]:+.3f}]",
        ])
        payloads[arm] = {k: v for k, v in res.items()
                         if k not in {"data"}}
    extra_rows = []
    if include_match_first:
        # Extra Ruhela fit with PSM pruning (match_first=True) ahead of
        # the IRM-DML stage.
        data_r, T_r, Y_r, cov_r = both["ruhela"]
        res_mf = oc.otis_irm_dml(
            data_r, treatment=T_r, outcome=Y_r, covariates=cov_r,
            cluster_cols=[cluster_col], match_first=True,
        )
        extra_rows.append([
            "Ruhela + match_first", T_r, Y_r, int(res_mf["n"]),
            f"{100*res_mf['p_treat']:.1f}%",
            f"{res_mf['ate']:+.4f}",
            f"[{res_mf['ate_ci95'][0]:+.3f}, {res_mf['ate_ci95'][1]:+.3f}]",
            f"{res_mf['ate_pval']:.2e}",
            f"{res_mf['atte']:+.4f}",
            f"[{res_mf['atte_ci95'][0]:+.3f}, {res_mf['atte_ci95'][1]:+.3f}]",
        ])
        payloads["ruhela_match_first"] = {k: v for k, v in res_mf.items()
                                          if k not in {"data"}}
    per_year_rows = []
    if include_per_year:
        # Per-fiscal-year Ruhela fits; best-effort — any failure collapses
        # the whole per-year table into a single error row.
        data_r, T_r, Y_r, cov_r = both["ruhela"]
        try:
            yr_results = oc.otis_per_year_irm_dml(
                data_r, treatment=T_r, outcome=Y_r, covariates=cov_r,
            )
            payloads["ruhela_per_year"] = {
                str(k): {kk: vv for kk, vv in r.items()
                         if kk not in {"data"}}
                for k, r in yr_results.items() if isinstance(r, dict)}
            for y, r in sorted(yr_results.items()):
                # Only dict results with an "ate" key are displayable.
                if isinstance(r, dict) and "ate" in r:
                    per_year_rows.append([
                        int(y), int(r.get("n", 0)),
                        f"{r['ate']:+.4f}",
                        f"[{r['ate_ci95'][0]:+.3f}, {r['ate_ci95'][1]:+.3f}]",
                        f"{r['ate_pval']:.2e}",
                    ])
        except Exception as e:  # noqa: BLE001
            per_year_rows = [[f"err: {type(e).__name__}", str(e)[:80]]]
    summary = [
        ("Source file", source_label),
        ("Dataset id", ds_id),
        ("Cluster axis", cluster_col),
        ("Person-years", int(both["ruhela"][0].shape[0])),
        ("Ruhela ATE",
         f"{payloads['ruhela']['ate']:+.4f} "
         f"(SE {payloads['ruhela']['ate_se']:.4f})"),
        ("Naive ATE",
         f"{payloads['naive']['ate']:+.4f} "
         f"(SE {payloads['naive']['ate_se']:.4f})"),
    ]
    if include_match_first:
        summary.append(("Ruhela ATE (match_first PSM)",
                        f"{payloads['ruhela_match_first']['ate']:+.4f} "
                        f"(SE {payloads['ruhela_match_first']['ate_se']:.4f}, "
                        f"matched n={payloads['ruhela_match_first']['n']})"))
    if include_per_year and per_year_rows:
        # The isinstance filter skips the error row emitted on failure.
        summary.append(("Per-year Ruhela ATEs",
                        ", ".join(f"FY{r[0]}={r[2]}" for r in per_year_rows
                                  if isinstance(r[0], int))))
    tables = [{
        "title": (f"Side-by-side dual on {ds_id}: T = high-alert-complexity "
                  f"(arm-specific), Y = vm; cluster = {cluster_col}"),
        "headers": ["Arm", "T", "Y", "n", "p_treat",
                    "ATE", "ATE 95% CI", "ATE p", "ATTE", "ATTE 95% CI"],
        "rows": rows + extra_rows,
    }]
    if include_per_year and per_year_rows:
        tables.append({
            "title": f"Per-fiscal-year Ruhela IRM-DML on {ds_id}:",
            "headers": ["FY", "n", "ATE", "ATE 95% CI", "ATE p"],
            "rows": per_year_rows,
        })
    return RichResult(
        title=title,
        summary_lines=summary,
        tables=tables,
        interpretation=interpretation,
        payload={"arms": payloads, "cluster_col": cluster_col,
                 "ds_id": ds_id},
    )
def _ruhela_formulations_on(df: pd.DataFrame,
*,
ds_id: str,
source_label: str,
title: str,
interpretation: str,
cluster_col: str = "EndFiscalYear",
treatment: str | None = None,
outcome: str | None = None,
covariates: list[str] | None = None,
naive_pair_fn=None,
include_atc: bool = True,
include_plr: bool = True,
include_multi_se: bool = True,
include_superlearner: bool = True,
propensity_calibration: str = "none",
) -> RichResult:
"""Ruhela formulations (full DLRM).
Runs the complete OTIS-RC methodology arc on a Ruhela formulation --
a (treatment, outcome, covariates) design choice for a specific
OTIS dataset. Defaults to the author's canonical alert-complexity -> vm
formulation (via ``make_pair_alert_to_volatility_all``); pass
explicit ``treatment``/``outcome``/``covariates`` for dataset-
specific Ruhela formulations on b02-b09 / c-series / d-series.
DLRM (each estimator runs on the Ruhela arm):
1. IPW (Hájek) -- single-robust; Lunceford-Davidian SE
2. AIPW -- doubly-robust; RRZ 1994 IF, cross-fit
3. g-computation -- single-robust outcome model + bootstrap
4. PSM 1:1 NN -- Austin 2011 caliper, ATT
5. PSM subclass (5 strata) -- Rosenbaum-Rubin 1983, ATE
6. IRM-DML -- Chernozhukov 2018, ATE+ATTE+ATC,
cluster-robust SE
7. PSM->IRM-DML (match_first) -- the author's MatchIt-then-DoubleML pipeline
8. ATC (AIPW-flavour) -- E[Y(1)-Y(0) | D=0]
9. PLR DML -- homogeneous-effect double ML
10. SuperLearner-stacked AIPW -- RF+ridge+GLM+mean convex stack
Plus IRM-DML SE comparison: pooled (iid), cluster on EndFiscalYear,
cluster on UniqueIndividual_ID, multi-way (year × id) -- same point
estimate, four standard errors.
Parameters
----------
df : pd.DataFrame
Source dataset.
ds_id : str
Dataset id (a01, b01, b02, c01...).
cluster_col : str
Primary cluster axis for IRM-DML SE.
treatment, outcome, covariates : optional explicit Ruhela formulation.
If None, falls back to ``make_pair_alert_to_volatility_all`` (a01/b01).
naive_pair_fn : callable returning (df, T, Y, cov), optional.
If supplied, runs Naive-arm sensitivity (e.g. any-flag, vm-binary).
Default None: skip Naive arm (used for b02+/c/d formulations).
include_atc / include_plr / include_multi_se / include_superlearner :
Toggles for the extended ensemble.
propensity_calibration : "none" | "platt" | "isotonic"
Calibration of propensity scores in IPW/AIPW.
"""
from . import otis_causal as oc
payloads: dict = {"ensemble": [], "match_first": None,
"ds_id": ds_id, "cluster_col": cluster_col,
"propensity_calibration": propensity_calibration}
# Build the Ruhela arm: explicit triple, or default to alert-complexity
if treatment is not None and outcome is not None and covariates is not None:
# Explicit dataset-specific Ruhela formulation
data_r = df[[treatment, outcome] + list(covariates)].dropna().copy()
T_r, Y_r, cov_r = treatment, outcome, list(covariates)
formulation_label = f"{ds_id}: T={treatment}, Y={outcome}"
naive_arm = None
if naive_pair_fn is not None:
try:
naive_arm = naive_pair_fn(df)
except Exception: # noqa: BLE001
naive_arm = None
else:
# Default a01/b01 alert-complexity formulation with naive sensitivity
both = oc.make_pair_alert_to_volatility_all(df)
data_r, T_r, Y_r, cov_r = both["ruhela"]
formulation_label = f"{ds_id}: T_high_ac -> Y_vm_count (canonical)"
naive_arm = both["naive"]
payloads["formulation"] = {"label": formulation_label,
"treatment": T_r, "outcome": Y_r,
"covariates": cov_r,
"n_rows": int(data_r.shape[0])}
rows: list = []
# Helper to format a CausalEstimate row
def _row(idx: str, est, label: str = None,
estimand: str = "ATE") -> list:
if est is None or not hasattr(est, "ate"):
return [idx, "--", "err", "--", "--", "--", "--"]
ci = est.ate_ci95
return [
idx if not label else f"{idx} ({label})",
estimand,
f"{est.ate:+.4f}",
f"{est.ate_se:.4f}",
f"[{ci[0]:+.3f}, {ci[1]:+.3f}]",
f"{est.ate_pval:.2e}",
", ".join(est.notes[:2]) if est.notes else "--",
]
# 1. IPW
try:
ipw = oc.otis_ipw(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r,
propensity_calibration=propensity_calibration)
rows.append(_row("1. IPW (Hájek)", ipw))
payloads["ensemble"].append({"estimator": "IPW",
"ate": float(ipw.ate),
"se": float(ipw.ate_se),
"p": float(ipw.ate_pval),
"n": int(ipw.n), "notes": ipw.notes})
except Exception as e: # noqa: BLE001
rows.append(["1. IPW", "--", "err", str(e)[:30], "--", "--", "--"])
# 2. AIPW
try:
aipw = oc.otis_aipw(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r,
propensity_calibration=propensity_calibration)
rows.append(_row("2. AIPW (RRZ DR)", aipw))
payloads["ensemble"].append({"estimator": "AIPW",
"ate": float(aipw.ate),
"se": float(aipw.ate_se),
"p": float(aipw.ate_pval),
"n": int(aipw.n), "notes": aipw.notes})
except Exception as e: # noqa: BLE001
rows.append(["2. AIPW", "--", "err", str(e)[:30], "--", "--", "--"])
# 3. g-computation
try:
gc = oc.otis_gcomputation(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r, n_bootstrap=200)
rows.append(_row("3. g-computation", gc))
payloads["ensemble"].append({"estimator": "g-computation",
"ate": float(gc.ate),
"se": float(gc.ate_se),
"p": float(gc.ate_pval),
"n": int(gc.n), "notes": gc.notes})
except Exception as e: # noqa: BLE001
rows.append(["3. g-computation", "--", "err", str(e)[:30], "--", "--", "--"])
# 4. PSM 1:1 NN
try:
psm = oc.otis_psm(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r, k=1)
rows.append(_row("4. PSM 1:1 NN", psm, estimand="ATT"))
payloads["ensemble"].append({"estimator": "PSM-NN",
"att": float(psm.ate),
"se": float(psm.ate_se),
"p": float(psm.ate_pval),
"n": int(psm.n), "notes": psm.notes})
except Exception as e: # noqa: BLE001
rows.append(["4. PSM 1:1 NN", "--", "err", str(e)[:30], "--", "--", "--"])
# 5. PSM subclass
try:
psm_sc = oc.otis_psm_subclass(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r, n_strata=5)
rows.append(_row("5. PSM subclass (5)", psm_sc))
payloads["ensemble"].append({"estimator": "PSM-subclass",
"ate": float(psm_sc.ate),
"se": float(psm_sc.ate_se),
"p": float(psm_sc.ate_pval),
"n": int(psm_sc.n),
"notes": psm_sc.notes})
except Exception as e: # noqa: BLE001
rows.append(["5. PSM subclass", "--", "err", str(e)[:30], "--", "--", "--"])
# 6. IRM-DML -- primary (cluster on cluster_col), now with ATC
irm_results: dict = {}
try:
dml = oc.otis_irm_dml(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r,
cluster_cols=[cluster_col]
if cluster_col in data_r.columns else None)
rows.append(["6. IRM-DML", "ATE",
f"{dml['ate']:+.4f}", f"{dml['ate_se']:.4f}",
f"[{dml['ate_ci95'][0]:+.3f}, {dml['ate_ci95'][1]:+.3f}]",
f"{dml['ate_pval']:.2e}",
f"cluster={dml['se_kind']}"])
rows.append(["6. IRM-DML", "ATTE",
f"{dml['atte']:+.4f}", f"{dml['atte_se']:.4f}",
f"[{dml['atte_ci95'][0]:+.3f}, {dml['atte_ci95'][1]:+.3f}]",
f"{dml['atte_pval']:.2e}", "--"])
if include_atc and dml.get("atc") is not None:
rows.append(["6. IRM-DML", "ATC",
f"{dml['atc']:+.4f}",
f"{dml.get('atc_se', float('nan')):.4f}",
(f"[{dml['atc_ci95'][0]:+.3f}, "
f"{dml['atc_ci95'][1]:+.3f}]"
if dml.get("atc_ci95") else "--"),
(f"{dml['atc_pval']:.2e}"
if dml.get("atc_pval") is not None else "--"),
"--"])
irm_results = dml
payloads["ensemble"].append({"estimator": "IRM-DML",
"ate": float(dml["ate"]),
"ate_se": float(dml["ate_se"]),
"atte": float(dml["atte"]),
"atte_se": float(dml["atte_se"]),
"atc": (float(dml["atc"])
if dml.get("atc") is not None
else None),
"n": int(dml["n"])})
except Exception as e: # noqa: BLE001
rows.append(["6. IRM-DML", "--", "err", str(e)[:30], "--", "--", "--"])
# 7. PSM -> IRM-DML
try:
dml_mf = oc.otis_irm_dml(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r,
cluster_cols=[cluster_col]
if cluster_col in data_r.columns else None,
match_first=True)
rows.append(["7. PSM->IRM-DML (match_first)", "ATE",
f"{dml_mf['ate']:+.4f}", f"{dml_mf['ate_se']:.4f}",
f"[{dml_mf['ate_ci95'][0]:+.3f}, "
f"{dml_mf['ate_ci95'][1]:+.3f}]",
f"{dml_mf['ate_pval']:.2e}",
f"matched_n={dml_mf['n']}"])
payloads["match_first"] = {k: v for k, v in dml_mf.items()
if k != "data"}
except Exception as e: # noqa: BLE001
rows.append(["7. PSM->IRM-DML", "--", "err", str(e)[:30], "--", "--", "--"])
# 8. ATC (AIPW-style)
if include_atc:
try:
atc = oc.otis_atc(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r)
rows.append(_row("8. ATC (AIPW)", atc, estimand="ATC"))
payloads["ensemble"].append({"estimator": "ATC-AIPW",
"atc": float(atc.ate),
"se": float(atc.ate_se),
"p": float(atc.ate_pval),
"n": int(atc.n), "notes": atc.notes})
except Exception as e: # noqa: BLE001
rows.append(["8. ATC (AIPW)", "--", "err",
str(e)[:30], "--", "--", "--"])
# 9. PLR (Partially Linear DML)
if include_plr:
try:
plr = oc.otis_plr(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r)
rows.append(_row("9. PLR (Chernozhukov 2018)", plr,
estimand="θ"))
payloads["ensemble"].append({"estimator": "PLR",
"theta": float(plr.ate),
"se": float(plr.ate_se),
"p": float(plr.ate_pval),
"n": int(plr.n), "notes": plr.notes})
except Exception as e: # noqa: BLE001
rows.append(["9. PLR", "--", "err", str(e)[:30], "--", "--", "--"])
# 10. SuperLearner-stacked AIPW
if include_superlearner:
try:
sl = oc.otis_aipw_superlearner(data_r, treatment=T_r,
outcome=Y_r, covariates=cov_r,
propensity_calibration=propensity_calibration)
rows.append(_row("10. SuperLearner AIPW", sl))
payloads["ensemble"].append({"estimator": "SuperLearner-AIPW",
"ate": float(sl.ate),
"se": float(sl.ate_se),
"p": float(sl.ate_pval),
"n": int(sl.n),
"notes": sl.notes})
except Exception as e: # noqa: BLE001
rows.append(["10. SuperLearner AIPW", "--", "err",
str(e)[:30], "--", "--", "--"])
# === Multi-SE comparison on IRM-DML primary point estimate ===
multi_se_rows: list = []
if include_multi_se and irm_results:
ate_pt = irm_results.get("ate", float("nan"))
# Try four SE flavours: pooled, EFY-cluster, UID-cluster, multi-way
flavours = [
("pooled (iid)", None),
("cluster: EndFiscalYear",
["EndFiscalYear"] if "EndFiscalYear" in data_r.columns else None),
("cluster: UniqueIndividual_ID",
["UniqueIndividual_ID"]
if "UniqueIndividual_ID" in data_r.columns else None),
("multi-way: yr × id",
["EndFiscalYear", "UniqueIndividual_ID"]
if ("EndFiscalYear" in data_r.columns
and "UniqueIndividual_ID" in data_r.columns) else None),
]
for label, cl in flavours:
if cl is None and label != "pooled (iid)":
multi_se_rows.append([label, "--", "n/a (col missing)",
"--", "--"])
continue
try:
fit = oc.otis_irm_dml(data_r, treatment=T_r, outcome=Y_r,
covariates=cov_r, cluster_cols=cl)
multi_se_rows.append([
label,
f"{fit['ate']:+.4f}",
f"{fit['ate_se']:.4f}",
f"[{fit['ate_ci95'][0]:+.3f}, {fit['ate_ci95'][1]:+.3f}]",
f"{fit['ate_pval']:.2e}",
])
except Exception as e: # noqa: BLE001
multi_se_rows.append([label, "--", "err",
str(e)[:30], "--"])
payloads["multi_se"] = multi_se_rows
# === Naive-arm sensitivity (only if formulation defines a naive pair) ===
naive_rows = []
if naive_arm is not None:
data_n, T_n, Y_n, cov_n = naive_arm
for label_n, fn in [
("IPW", oc.otis_ipw),
("AIPW", oc.otis_aipw),
("g-comp", oc.otis_gcomputation),
]:
try:
r = fn(data_n, treatment=T_n, outcome=Y_n,
covariates=cov_n)
naive_rows.append([label_n, f"{r.ate:+.4f}",
f"{r.ate_se:.4f}",
f"[{r.ate_ci95[0]:+.3f}, "
f"{r.ate_ci95[1]:+.3f}]",
f"{r.ate_pval:.2e}"])
except Exception as e: # noqa: BLE001
naive_rows.append([label_n, "err", str(e)[:30],
"--", "--"])
payloads["naive_arm"] = naive_rows
# === Balance & propensity diagnostics on Ruhela arm ===
balance_summary = "--"
try:
balance = oc.otis_balance(data_r, treatment=T_r,
covariates=cov_r)
smd_data = balance.payload if hasattr(balance, "payload") else {}
max_smd = smd_data.get("max_abs_smd", "n/a")
balance_summary = (f"max |SMD| = {max_smd:.3f}"
if isinstance(max_smd, (int, float))
else str(max_smd))
except Exception as e: # noqa: BLE001
balance_summary = f"err: {type(e).__name__}: {e}"
summary = [
("Source file", source_label),
("Dataset id", ds_id),
("Formulation", formulation_label),
("Cluster axis (primary)", cluster_col),
("n", int(data_r.shape[0])),
("Treated prevalence",
f"{100*float(data_r[T_r].mean() if data_r[T_r].dtype != object else (data_r[T_r] == data_r[T_r].mode()[0]).mean()):.1f}%"
if T_r in data_r.columns else "n/a"),
("Pre-balance (covariate SMD)", balance_summary),
("Propensity calibration", propensity_calibration),
]
tables = [{
"title": (f"Ruhela formulations (full DLRM, "
f"alias DLRM) on {ds_id} -- {formulation_label}:"),
"headers": ["Estimator", "Estimand",
"Estimate", "SE", "95% CI", "p-value", "Notes"],
"rows": rows,
}]
if include_multi_se and multi_se_rows:
tables.append({
"title": ("IRM-DML SE comparison: pooled vs clustered on "
"EndFiscalYear, UniqueIndividual_ID, multi-way "
"(year × id):"),
"headers": ["SE flavour", "ATE", "SE", "95% CI", "p-value"],
"rows": multi_se_rows,
})
if naive_rows:
tables.append({
"title": ("Naive-arm sensitivity (any-flag, vm-binary): "
"concordance with Ruhela in sign, smaller magnitude"),
"headers": ["Estimator", "ATE", "SE", "95% CI", "p-value"],
"rows": naive_rows,
})
return RichResult(
title=title,
summary_lines=summary,
tables=tables,
interpretation=interpretation,
payload=payloads,
)
# Backward-compat alias (renamed 2026-05-09)
[docs]
def analyze_a01_dual(*args, **kwargs):
    """Deprecated alias for ``analyze_a01_ruhela_formulations``."""
    import warnings
    warnings.warn(
        "analyze_a01_dual is deprecated; use analyze_a01_ruhela_formulations",
        DeprecationWarning,
        stacklevel=2,
    )
    return analyze_a01_ruhela_formulations(*args, **kwargs)
# ── Per-dataset Ruhela formulations (b02-b09 / c01-c12 / d01-d07) ──
#
# OTIS data shape varies by dataset: a01/b01 are panel with alerts +
# region pair (canonical Ruhela formulation applies); b02-b09 / c-series
# are aggregate or have only some Ruhela variables; d-series carries
# no alert columns. Each function below picks a (T, Y, covariates)
# triple that's well-defined for the dataset's actual columns. These
# formulations are therefore tailored to each dataset's actually
# available variables.
def _female_indicator(s) -> "pd.Series":
return (s.astype(str).str.lower() == "female").astype(int)
def _region_toronto_indicator(s) -> "pd.Series":
return (s.astype(str).str.lower() == "toronto").astype(int)
def _age_50plus_indicator(s) -> "pd.Series":
return s.astype(str).str.contains("50", na=False).astype(int)
[docs]
def analyze_ruhela_per_year(df: "pd.DataFrame",
                            *,
                            ds_id: str,
                            treatment: str,
                            outcome: str,
                            covariates: list[str],
                            year_col: str = "EndFiscalYear",
                            cluster_col: str | None = "EndFiscalYear",
                            propensity_calibration: str = "none",
                            ) -> RichResult:
    """Per-fiscal-year full DLRM on each Ruhela formulation.

    Runs the complete DLRM (IPW + AIPW + g-comp + PSM-NN +
    PSM-subclass + IRM-DML + match_first + ATC + PLR + SuperLearner)
    separately on each fiscal year. ~7× the standard per-year IRM-DML
    runtime. Mirrors a per-year extension of the author's ``res_by_year``.

    Parameters
    ----------
    df : pd.DataFrame
        Per-row dataset already reduced to the formulation's columns.
    ds_id : str
        Dataset id used for labelling only.
    treatment, outcome, covariates
        The Ruhela (T, Y, X) triple, forwarded to
        ``otis_causal.otis_per_year_irm_dml``.
    year_col : str
        Column used to split the data into per-fiscal-year slices.
    cluster_col : str | None
        Single cluster axis for the IRM-DML SE; ``None`` means iid.
    propensity_calibration : str
        Forwarded to the propensity-based estimators.

    Returns
    -------
    RichResult
        One long table with one row per (year, estimator).
    """
    from . import otis_causal as oc
    # The heavy lifting (per-year fitting of all estimators) happens in
    # otis_per_year_irm_dml; this function only flattens its nested
    # dict-of-dicts into a display table and a RichResult.
    cl = ([cluster_col] if cluster_col else None)
    by_year = oc.otis_per_year_irm_dml(
        df, treatment=treatment, outcome=outcome,
        covariates=covariates, year_col=year_col,
        cluster_cols=cl, full_ensemble=True,
        propensity_calibration=propensity_calibration,
    )
    # Build a long table: one row per (year, estimator)
    long_rows: list = []
    for yr, year_results in sorted(by_year.items()):
        if not isinstance(year_results, dict):
            # Defensive: skip malformed per-year entries.
            continue
        # NOTE(review): ``n`` is never read below (each estimator row
        # reports its own n via est.get('n')); candidate for removal.
        n = year_results.get("n", "?")
        # The simple estimators share a common payload shape:
        # {ate, se, ci95, p, n} or {"error": ...} on failure.
        for est_label in ("ipw", "aipw", "gcomp", "psm_nn",
                          "psm_subclass", "atc", "plr",
                          "superlearner"):
            est = year_results.get(est_label, {})
            if "error" in est:
                long_rows.append([yr, est_label.upper(),
                                  "err", "--", "--", "--", est.get("error", "")[:30]])
                continue
            ci = est.get("ci95", (float("nan"), float("nan")))
            long_rows.append([
                yr, est_label.upper(),
                f"{est.get('ate', float('nan')):+.4f}",
                f"{est.get('se', float('nan')):.4f}",
                f"[{ci[0]:+.3f}, {ci[1]:+.3f}]",
                f"{est.get('p', float('nan')):.2e}",
                f"n={est.get('n', '?')}",
            ])
        # IRM-DML: report ATE + ATTE + ATC inline
        irm = year_results.get("irm_dml", {})
        if "error" in irm:
            long_rows.append([yr, "IRM-DML", "err", "--", "--", "--",
                              irm.get("error", "")[:30]])
        else:
            for label_irm, key, key_se in [
                ("IRM-DML ATE", "ate", "ate_se"),
                ("IRM-DML ATTE", "atte", "atte_se"),
                ("IRM-DML ATC", "atc", "atc_se"),
            ]:
                v = irm.get(key)
                vse = irm.get(key_se)
                if v is None or vse is None:
                    # Estimand not produced for this year (e.g. no ATC).
                    continue
                # CI here is a plain Wald interval (point ± 1.96·SE).
                long_rows.append([
                    yr, label_irm,
                    f"{v:+.4f}", f"{vse:.4f}",
                    f"[{v - 1.96 * vse:+.3f}, {v + 1.96 * vse:+.3f}]",
                    "--", f"n={irm.get('n', '?')} {irm.get('se_kind', '')}",
                ])
        # match_first (PSM pre-matching, then IRM-DML on the matched set)
        mf = year_results.get("match_first", {})
        if "error" in mf:
            long_rows.append([yr, "PSM->IRM-DML", "err", "--", "--", "--",
                              mf.get("error", "")[:30]])
        else:
            v = mf.get("ate", float("nan"))
            vse = mf.get("ate_se", float("nan"))
            long_rows.append([
                yr, "PSM->IRM-DML",
                f"{v:+.4f}", f"{vse:.4f}",
                f"[{v - 1.96 * vse:+.3f}, {v + 1.96 * vse:+.3f}]",
                "--", f"matched_n={mf.get('n', '?')}",
            ])
    # "Years analysed" counts years that produced anything beyond a bare
    # {"error": ...} single-key dict.
    summary = [
        ("Dataset id", ds_id),
        ("Treatment", treatment),
        ("Outcome", outcome),
        ("Year column", year_col),
        ("Cluster axis", cluster_col or "iid"),
        ("Years analysed", len([y for y in by_year if not
                                (isinstance(by_year[y], dict)
                                 and "error" in by_year[y]
                                 and len(by_year[y]) == 1)])),
        ("Estimators per year", 10),
    ]
    return RichResult(
        title=(f"OTIS {ds_id} -- per-fiscal-year full DLRM-on-Ruhela-formulations "
               "suite (10 estimators × N years)"),
        summary_lines=summary,
        tables=[{
            "title": ("Per-year × estimator: ATE / SE / 95% CI / p / n. "
                      "Triangulation across IPW + AIPW + g-comp + PSM "
                      "(NN+subclass) + IRM-DML (ATE+ATTE+ATC) + "
                      "match_first + ATC-AIPW + PLR + SuperLearner."),
            "headers": ["FY", "Estimator", "Estimate", "SE",
                        "95% CI", "p", "Notes"],
            "rows": long_rows,
        }],
        interpretation=(
            "Per-year × per-estimator triangulation. Stable signs "
            "across estimators within each FY = no-misspecification "
            "robustness; stable magnitudes across FYs = temporal "
            "stationarity. Use this to identify FYs where some "
            "estimators disagree -- typical sign of finite-sample "
            "fragility or genuine effect heterogeneity."
        ),
        payload={"by_year": by_year,
                 "ds_id": ds_id,
                 "treatment": treatment,
                 "outcome": outcome},
    )
[docs]
def analyze_a01_ruhela_per_year(df: "pd.DataFrame | None" = None,
                                ) -> RichResult:
    """Per-year full DLRM on Ruhela formulations on a01.

    Parameters
    ----------
    df : pd.DataFrame | None
        Optional pre-loaded a01 frame. NOTE(review): ``df`` is loaded
        here but never used -- ``make_pair_alert_to_volatility_a01()``
        is called with no arguments, so a caller-supplied ``df`` has no
        effect. Compare ``analyze_b01_ruhela_per_year``, which passes
        ``df`` through to its pair-maker. Confirm whether the a01
        pair-maker should accept (and be given) ``df``.
    """
    from . import otis_causal as oc
    from .otis_datasets import load_otis_dataset
    if df is None:
        df = load_otis_dataset("a01")
    # NOTE(review): ``df`` is not forwarded to the pair-maker below;
    # presumably it loads a01 internally -- verify against otis_causal.
    data, T, Y, cov = oc.make_pair_alert_to_volatility_a01()
    return analyze_ruhela_per_year(
        data, ds_id="a01", treatment=T, outcome=Y, covariates=cov,
        cluster_col="EndFiscalYear")
[docs]
def analyze_b01_ruhela_per_year(df: "pd.DataFrame | None" = None,
                                ) -> RichResult:
    """Per-year full DLRM on Ruhela formulations on b01."""
    from . import otis_causal as oc
    from .otis_datasets import load_otis_dataset

    # Load from the registry only when the caller supplied nothing.
    frame = load_otis_dataset("b01") if df is None else df
    data, T, Y, cov = oc.make_pair_alert_to_volatility_ruhela(frame)
    return analyze_ruhela_per_year(
        data, ds_id="b01", treatment=T, outcome=Y, covariates=cov,
        cluster_col="EndFiscalYear")
# ── Aggregate Ruhela formulations on b03-b09 + c-series ──────────
#
# Aggregate datasets carry pre-summed counts, not individual rows. The
# propensity-score / cross-fitted-ML machinery in the per-row Ruhela
# formulations doesn't apply. The shape-appropriate analog is a
# Poisson + Negative-Binomial GLM with year-FE + treatment-FE +
# demographic-FE, returning an IRR table with 95% CI. This is still a
# treatment-vs-reference contrast on a count outcome -- the aggregate-
# level analog of the per-row Ruhela formulation.
def _ruhela_aggregate_on(df: "pd.DataFrame",
                         *,
                         ds_id: str,
                         source_label: str,
                         title: str,
                         interpretation: str,
                         treatment: str,
                         outcome: str,
                         covariates: "list[str] | None" = None,
                         year_col: str = "EndFiscalYear",
                         cluster_group: "str | None" = None,
                         ) -> RichResult:
    """Aggregate Ruhela formulation: Poisson + NB IRR on count outcomes.

    For aggregate datasets where rows are pre-summed counts grouped by
    demographic / institutional cells. By default fits both Poisson and
    Negative-Binomial GLMs with the chosen treatment + covariates +
    year fixed effect; reports IRR with Wald 95% CI and AIC dispersion
    diagnostics.

    When ``cluster_group`` is supplied, switches to a **GEE marginal
    model** (Liang-Zeger 1986) clustered on that grouping variable,
    with cluster-robust SE. Use this when one categorical has too many
    levels to enter as a fixed effect (e.g. ~76 institutions on a 148-
    row table) -- the FE specification becomes collinear and fits fail.
    GEE marginal model is what OTIS-RC's ``glmmTMB nbinom2`` does in
    spirit: report population-level IRR with cluster-robust inference.

    Parameters
    ----------
    df : pre-aggregated DataFrame
    treatment : column whose IRR we want (typically a binary indicator
        derived from a categorical column)
    outcome : count column
    covariates : additional categorical variables to include as fixed
        effects
    year_col : year fixed effect column (defaults to EndFiscalYear)
    cluster_group : if set, fit GEE clustered on this grouping variable
        instead of including it as a categorical FE

    Returns
    -------
    RichResult
        One table with one row per fitted model (GLM families, plus
        GEE flavours when ``cluster_group`` is set).
    """
    try:
        # Lazy import: statsmodels is an optional extra for this module.
        import statsmodels.api as sm
        import statsmodels.formula.api as smf
    except ImportError:
        return RichResult(
            title=title,
            warnings=["statsmodels unavailable -- install morie[stats]"],
            interpretation="Aggregate Poisson/NB GLM requires statsmodels.",
        )
    cov = list(covariates or [])
    cluster_cols = [cluster_group] if cluster_group else []
    # Dedup column selection to avoid duplicate-column bug when
    # cluster_group, year_col, treatment, etc. overlap.
    _all_cols = list(dict.fromkeys(
        [treatment, outcome, year_col, *cov, *cluster_cols]))
    work = df[_all_cols].dropna().copy()
    # Coerce the outcome to integer counts; rows that fail numeric
    # coercion are dropped rather than crashing the fit.
    work[outcome] = pd.to_numeric(work[outcome], errors="coerce")
    work = work.dropna(subset=[outcome])
    work[outcome] = work[outcome].astype(int)
    if work.empty or work[outcome].sum() == 0:
        # Degenerate table: nothing to model.
        return RichResult(
            title=title,
            warnings=["empty cell-table or zero outcome count"],
        )
    # Build a formula string with categorical wrappers for non-numeric covs
    parts = [f"C({treatment})"]
    for c in cov:
        parts.append(f"C({c})")
    parts.append(f"C({year_col})")
    formula = f"{outcome} ~ " + " + ".join(parts)
    rows: list = []
    payloads: dict = {"formula": formula, "ds_id": ds_id,
                      "n_rows": int(work.shape[0]),
                      "treatment": treatment, "outcome": outcome,
                      "cluster_group": cluster_group,
                      "estimands": []}
    aic_pois = float("nan")
    aic_nb = float("nan")
    # === GLM fits (always run for IRR estimate + AIC) ===
    for fam_label, family in (
            ("Poisson", sm.families.Poisson()),
            ("NB", sm.families.NegativeBinomial(alpha=1.0)),
    ):
        try:
            model = smf.glm(formula, data=work, family=family).fit()
            # Pick the first coefficient whose patsy name mentions the
            # treatment (names look like "C(T_x)[T.1]").
            # NOTE(review): substring match could catch an unintended
            # term if another column name contains the treatment string
            # -- verify when adding new formulations.
            t_coef_keys = [k for k in model.params.index
                           if treatment in k]
            if not t_coef_keys:
                rows.append([fam_label, treatment, "no coef", "--", "--",
                             "--", "--"])
                continue
            t_key = t_coef_keys[0]
            beta = float(model.params[t_key])
            se = float(model.bse[t_key])
            # IRR and its Wald 95% CI on the exponentiated scale.
            irr = float(np.exp(beta))
            ci_lo = float(np.exp(beta - 1.96 * se))
            ci_hi = float(np.exp(beta + 1.96 * se))
            pval = float(model.pvalues[t_key])
            aic = float(model.aic)
            # Stash AICs for the overdispersion diagnostic below.
            if fam_label == "Poisson":
                aic_pois = aic
            else:
                aic_nb = aic
            rows.append([
                fam_label, t_key, round(irr, 4),
                f"[{ci_lo:.3f}, {ci_hi:.3f}]",
                f"{pval:.2e}", round(aic, 1),
                f"n={int(work.shape[0])}, total_y={int(work[outcome].sum())}",
            ])
            payloads["estimands"].append({
                "family": fam_label, "coef": t_key,
                "irr": irr, "ci95_low": ci_lo, "ci95_high": ci_hi,
                "p": pval, "aic": aic, "beta": beta, "se": se,
            })
        except Exception as e:  # noqa: BLE001
            # Fit failures become a visible table row, not a crash.
            rows.append([fam_label, treatment, "fit failed",
                         str(type(e).__name__), str(e)[:30],
                         "--", "--"])
    # === GEE clustered fits when cluster_group set ===
    # Iterate over (Family × WorkingCorrelation): Poisson + NB,
    # Exchangeable + Independence. Independence = sandwich-only
    # cluster-robust SE without modelling within-cluster correlation
    # (the conservative sensitivity baseline). Exchangeable = constant
    # within-cluster correlation (the standard OTIS-RC analog).
    if cluster_group is not None:
        n_groups = int(work[cluster_group].nunique())
        gee_specs = [
            ("Poisson", "Exch", sm.families.Poisson(),
             sm.cov_struct.Exchangeable()),
            ("Poisson", "Indep", sm.families.Poisson(),
             sm.cov_struct.Independence()),
            ("NB", "Exch", sm.families.NegativeBinomial(alpha=1.0),
             sm.cov_struct.Exchangeable()),
            ("NB", "Indep", sm.families.NegativeBinomial(alpha=1.0),
             sm.cov_struct.Independence()),
        ]
        for fam_lbl, corr_lbl, family, cov_struct in gee_specs:
            row_label = (f"GEE-{fam_lbl} (cluster:{cluster_group}, "
                         f"{corr_lbl})")
            try:
                gee_model = smf.gee(
                    formula, groups=cluster_group, data=work,
                    family=family, cov_struct=cov_struct,
                ).fit()
                # Same treatment-coefficient lookup as the GLM branch.
                t_coef_keys = [k for k in gee_model.params.index
                               if treatment in k]
                if not t_coef_keys:
                    rows.append([row_label, treatment, "no coef",
                                 "--", "--", "--", "--"])
                    continue
                t_key = t_coef_keys[0]
                beta = float(gee_model.params[t_key])
                se = float(gee_model.bse[t_key])
                irr = float(np.exp(beta))
                ci_lo = float(np.exp(beta - 1.96 * se))
                ci_hi = float(np.exp(beta + 1.96 * se))
                pval = float(gee_model.pvalues[t_key])
                rows.append([
                    row_label, t_key, round(irr, 4),
                    f"[{ci_lo:.3f}, {ci_hi:.3f}]",
                    f"{pval:.2e}", "--",
                    f"groups={n_groups}, n_obs={int(work.shape[0])}",
                ])
                payloads["estimands"].append({
                    "family": f"GEE-{fam_lbl}",
                    "working_correlation": corr_lbl,
                    "cluster_group": cluster_group,
                    "n_groups": n_groups,
                    "coef": t_key, "irr": irr,
                    "ci95_low": ci_lo, "ci95_high": ci_hi,
                    "p": pval, "beta": beta, "se": se,
                })
            except Exception as e:  # noqa: BLE001
                rows.append([
                    row_label, treatment, "fit failed",
                    str(type(e).__name__), str(e)[:30],
                    "--", "--",
                ])
    # Positive gap (Poisson AIC above NB AIC) suggests overdispersion.
    overdispersion = (round(aic_pois - aic_nb, 1)
                      if np.isfinite(aic_pois) and np.isfinite(aic_nb)
                      else "n/a")
    summary = [
        ("Source file", source_label),
        ("Dataset id", ds_id),
        ("Treatment column", treatment),
        ("Outcome (count)", outcome),
        ("Year FE", year_col),
        ("Covariate FEs", ", ".join(cov) if cov else "none"),
        ("Cluster group (GEE marginal model)",
         cluster_group or "--"),
        ("Cells", int(work.shape[0])),
        ("Total outcome count", int(work[outcome].sum())),
        ("Overdispersion (Poisson AIC − NB AIC)", overdispersion),
    ]
    return RichResult(
        title=title,
        summary_lines=summary,
        tables=[{
            "title": ("Aggregate Ruhela formulation -- Poisson + NB IRR "
                      + ("+ GEE clustered " if cluster_group else "")
                      + "for treatment effect on count outcome:"),
            "headers": ["Family", "Coefficient", "IRR", "95% CI",
                        "p-value", "AIC", "Notes"],
            "rows": rows,
        }],
        interpretation=interpretation + (
            "\n\nIRR > 1 ⇒ treatment increases the count rate; IRR < 1 ⇒ "
            "treatment decreases it. Concordance between Poisson and NB "
            "indicates equidispersion; large gap (Poisson AIC ≫ NB AIC) "
            "indicates overdispersion -- trust NB."
            + (" GEE-Poisson with cluster-robust SE is the population-"
               "level marginal-model alternative that respects within-"
               f"{cluster_group} correlation; trust the GEE row when "
               "the GLM with that variable as a fixed effect fails to "
               "fit due to collinearity."
               if cluster_group else "")
        ),
        payload=payloads,
    )
[docs]
def analyze_b03_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on b03 (Alert × Institution).

    T = Alert_Presence indicator -> Y = Number_SegregationPlacements.
    Year-FE + Region-FE + Alert_Type-FE; Institution as **GEE cluster
    group** (24 levels, marginal model with cluster-robust SE).
    """
    from .otis_datasets import load_otis_dataset

    frame = load_otis_dataset("b03") if df is None else df
    frame = frame.dropna(subset=["Alert_Presence",
                                 "Number_SegregationPlacements"]).copy()
    # Normalise Alert_Presence to a 0/1 treatment indicator.
    yes_tokens = ["yes", "y", "true", "1"]
    normalized = frame["Alert_Presence"].astype(str).str.strip().str.lower()
    frame["T_alert"] = normalized.isin(yes_tokens).astype(int)
    return _ruhela_aggregate_on(
        frame, ds_id="b03",
        source_label=("b03_segregation_placements_alerts_and_hold_flags"
                      "_by_institution.csv"),
        title=("OTIS b03 -- Aggregate Ruhela formulation: "
               "Alert presence -> Number of segregation placements"),
        interpretation=(
            "Aggregate Ruhela formulation on b03 testing whether an "
            "alert-flagged person-cell receives more segregation "
            "placements than a no-alert cell. GEE-Poisson clustered "
            "on institution (24 levels) is the marginal-model "
            "inference; Poisson + NB GLM rows give the population "
            "IRR with FE on alert type, region, and year."
        ),
        treatment="T_alert",
        outcome="Number_SegregationPlacements",
        covariates=["Alert_Type", "Region_AtTimeOfPlacement"],
        cluster_group="Institution_AtTimeOfPlacement",
    )
[docs]
def analyze_b07_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on b07 (alerts × gender summary).

    b07 is a wide table with paired columns:
    Number_Segregation_Placements_With_Alert
    Number_Segregation_Placements_Without_Alert
    We pivot to long form (one row per alert-state × gender × year),
    then T = with_alert indicator -> Y = count. Year + Gender + Alert_Type
    fixed effects.
    """
    from .otis_datasets import load_otis_dataset

    frame = load_otis_dataset("b07") if df is None else df
    frame = frame.dropna(subset=[
        "EndFiscalYear", "Alert_Type", "Gender",
        "Number_Segregation_Placements_With_Alert",
        "Number_Segregation_Placements_Without_Alert",
    ]).copy()
    # Pivot to long: one stacked slice per alert state (with first,
    # without second -- matches the original concat order).
    keys = ["EndFiscalYear", "Alert_Type", "Gender"]
    pieces = []
    for count_col, alert_flag in (
            ("Number_Segregation_Placements_With_Alert", 1),
            ("Number_Segregation_Placements_Without_Alert", 0),
    ):
        piece = frame[keys + [count_col]].rename(
            columns={count_col: "n_placements"})
        piece["T_alert"] = alert_flag
        pieces.append(piece)
    long_df = pd.concat(pieces, ignore_index=True)
    return _ruhela_aggregate_on(
        long_df, ds_id="b07",
        source_label=("b07_segregation_placements_alerts_and_hold_flags"
                      "_by_gender.csv"),
        title=("OTIS b07 -- Aggregate Ruhela formulation: "
               "With-alert vs without-alert × gender -> "
               "Number of segregation placements"),
        interpretation=(
            "Aggregate Ruhela formulation on b07 testing whether "
            "alert-flagged person-cells receive more segregation "
            "placements than no-alert cells in the same gender × "
            "alert-type stratum. Poisson + NB GLM IRR. Aggregate "
            "counts; no individual-level inference."
        ),
        treatment="T_alert",
        outcome="n_placements",
        covariates=["Alert_Type", "Gender"],
    )
[docs]
def analyze_c01_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on c01 -- gender disparity in RC.

    c01 has Gender × {InCustody, RestrictiveConfinement, Segregation}
    counts. T = Female indicator -> Y = NumberIndividuals_RC,
    Year-FE.
    """
    from .otis_datasets import load_otis_dataset

    frame = load_otis_dataset("c01") if df is None else df
    frame = frame.dropna(subset=[
        "EndFiscalYear", "Gender",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    frame["T_female"] = _female_indicator(frame["Gender"])
    return _ruhela_aggregate_on(
        frame, ds_id="c01",
        source_label=("c01_individuals_in_segregation_and_restrictive_"
                      "confinement_total_individuals.csv"),
        title=("OTIS c01 -- Aggregate Ruhela formulation: "
               "Female indicator -> Number of individuals in RC"),
        interpretation=(
            "Aggregate Ruhela formulation on c01 testing gender "
            "disparity in restrictive-confinement totals at the "
            "population aggregate level. Year-FE only (gender is "
            "the sole demographic cross). Aggregate counts; no "
            "individual-level inference."
        ),
        treatment="T_female",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=[],
    )
[docs]
def analyze_c07_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on c07 -- alert × gender -> RC count.

    Counterpart of the per-row Ruhela formulation on a01/b01 at the
    aggregate level. Useful triangulation: c07 reports
    NumberIndividuals_InCustody / RC / Segregation cross-tabulated
    by Alert_Type × Gender × FiscalYear. The Ruhela formulation here
    contrasts an indicator of alert presence (vs. no-alert reference)
    with the number-in-RC count.
    """
    from .otis_datasets import load_otis_dataset

    frame = load_otis_dataset("c07") if df is None else df
    frame = frame.dropna(subset=[
        "EndFiscalYear", "Alert_Type", "Gender",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    # Treat any Alert_Type other than "No Alert" / "None" as alert-present
    no_alert_strs = {"no alert", "none", "no_alert", "no"}
    alert_norm = frame["Alert_Type"].astype(str).str.strip().str.lower()
    frame["T_alert_present"] = (~alert_norm.isin(no_alert_strs)).astype(int)
    # Both arms must be populated; a one-sided indicator is degenerate.
    n_treated = int(frame["T_alert_present"].sum())
    if n_treated == 0 or n_treated == len(frame):
        return RichResult(
            title="c07 aggregate Ruhela",
            warnings=["c07 has no no-alert reference rows -- "
                      "treatment indicator is degenerate"],
        )
    return _ruhela_aggregate_on(
        frame, ds_id="c07",
        source_label=("c07_individuals_in_segregation_and_restrictive_"
                      "confinement_alerts_and_hold_flags.csv"),
        title=("OTIS c07 -- Aggregate Ruhela formulation: "
               "Alert presence × Gender -> Individuals in RC"),
        interpretation=(
            "Aggregate Ruhela formulation on c07 -- the population-level "
            "counterpart of the canonical (T_high_ac, vm) per-row "
            "formulation on a01/b01. Tests whether alert-flagged person-"
            "cells produce higher RC counts at the aggregate level."
        ),
        treatment="T_alert_present",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Alert_Type", "Gender"],
    )
# ── Disparity-focused aggregate Ruhela formulations ────────────────
[docs]
def analyze_b06_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on b06 -- disciplinary reason -> seg.

    T = (Reason contains "Disciplinary") -> Y = Number of seg placements.
    Year-FE + Region-FE + Gender-FE; Institution as **GEE cluster
    group** (24 levels) for marginal-model cluster-robust inference.
    """
    from .otis_datasets import load_otis_dataset

    frame = load_otis_dataset("b06") if df is None else df
    frame = frame.dropna(subset=[
        "EndFiscalYear", "Reason", "Gender",
        "Region_AtTimeOfPlacement", "Institution_AtTimeOfPlacement",
        "Number_SegregationPlacements"]).copy()
    # Case-insensitive substring match on the placement reason.
    reason_lower = frame["Reason"].astype(str).str.lower()
    frame["T_disciplinary"] = reason_lower.str.contains(
        "disciplinary", na=False).astype(int)
    return _ruhela_aggregate_on(
        frame, ds_id="b06",
        source_label=("b06_segregation_placements_reason_for_placement_"
                      "by_institution.csv"),
        title=("OTIS b06 -- Aggregate Ruhela formulation: "
               "Disciplinary reason -> Number of seg placements"),
        interpretation=(
            "Aggregate RF on b06 testing whether disciplinary "
            "segregation reasons produce more placements than non-"
            "disciplinary (security, protection, medical). GEE-Poisson "
            "clustered on institution gives cluster-robust marginal "
            "inference."
        ),
        treatment="T_disciplinary",
        outcome="Number_SegregationPlacements",
        covariates=["Gender", "Region_AtTimeOfPlacement"],
        cluster_group="Institution_AtTimeOfPlacement",
    )
[docs]
def analyze_c03_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on c03 -- racial disparity in RC.

    T = (Race == "Indigenous") indicator -> Y = NumberIndividuals_RC.
    Year-FE + Gender-FE. Indigenous vs all-other contrast tests
    overrepresentation.
    """
    from .otis_datasets import load_otis_dataset

    frame = load_otis_dataset("c03") if df is None else df
    frame = frame.dropna(subset=[
        "EndFiscalYear", "Race", "Gender",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    race_lower = frame["Race"].astype(str).str.lower()
    frame["T_indigenous"] = race_lower.eq("indigenous").astype(int)
    return _ruhela_aggregate_on(
        frame, ds_id="c03",
        source_label=("c03_individuals_in_segregation_and_restrictive_"
                      "confinement_race_by_gender.csv"),
        title=("OTIS c03 -- Aggregate Ruhela formulation: "
               "Indigenous race -> Individuals in RC"),
        interpretation=(
            "Aggregate Ruhela formulation on c03 testing Indigenous "
            "overrepresentation in restrictive confinement at the "
            "population aggregate level. Indigenous adults make up "
            "~5% of Ontario population but a much higher share of "
            "RC placements; this formulation quantifies the IRR."
        ),
        treatment="T_indigenous",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Gender"],
    )
def analyze_c04_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on c04 -- racial disparity by region.

    T = (Race == "Indigenous") -> Y = NumberIndividuals_RC.
    Year-FE + Region-FE.

    :param df: pre-loaded c04 frame; loaded from disk when ``None``.
    :returns: RichResult from :func:`_ruhela_aggregate_on`.
    """
    # Redundant local re-import of load_otis_dataset removed (module-level
    # import already in scope).
    if df is None:
        df = load_otis_dataset("c04")
    work = df.dropna(subset=[
        "EndFiscalYear", "Race", "Region_MostRecentPlacement",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    # Case-insensitive equality so capitalisation variants all match.
    work["T_indigenous"] = (work["Race"].astype(str).str.lower()
                            == "indigenous").astype(int)
    return _ruhela_aggregate_on(
        work, ds_id="c04",
        source_label=("c04_individuals_in_segregation_and_restrictive_"
                      "confinement_race_by_region.csv"),
        title=("OTIS c04 -- Aggregate Ruhela formulation: "
               "Indigenous race -> Individuals in RC, by region"),
        interpretation=(
            "Aggregate Ruhela formulation on c04 testing Indigenous "
            "overrepresentation in RC controlling for region. Region-"
            "FE absorbs cross-region differences in Indigenous "
            "population share so the Indigenous coefficient measures "
            "within-region disparity."
        ),
        treatment="T_indigenous",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Region_MostRecentPlacement"],
    )
def analyze_c06_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on c06 -- age-50+ disparity in RC.

    T = (Age_Category contains "50") -> Y = NumberIndividuals_RC.
    Year-FE + Region-FE.

    :param df: pre-loaded c06 frame; loaded from disk when ``None``.
    :returns: RichResult from :func:`_ruhela_aggregate_on`.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c06")
    work = df.dropna(subset=[
        "EndFiscalYear", "Age_Category", "Region_MostRecentPlacement",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    work["T_50plus"] = _age_50plus_indicator(work["Age_Category"])
    return _ruhela_aggregate_on(
        work, ds_id="c06",
        source_label=("c06_individuals_in_segregation_and_restrictive_"
                      "confinement_age_category_by_region.csv"),
        title=("OTIS c06 -- Aggregate Ruhela formulation: "
               "Age 50+ -> Individuals in RC, by region"),
        interpretation=(
            "Aggregate Ruhela formulation on c06 testing age-50+ "
            "overrepresentation in RC controlling for region. Older "
            "adults face different segregation pathways than younger "
            "adults; this formulation quantifies the population-level "
            "IRR."
        ),
        treatment="T_50plus",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Region_MostRecentPlacement"],
    )
def analyze_c09_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on c09 -- age-50+ disparity by gender.

    T = (Age_Category contains "50") -> Y = NumberIndividuals_RC,
    with Gender (+ year, supplied by the aggregate helper) as control.

    :param df: pre-loaded c09 frame; loaded from disk when ``None``.
    :returns: RichResult from :func:`_ruhela_aggregate_on`.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c09")
    work = df.dropna(subset=[
        "EndFiscalYear", "Age_Category", "Gender",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    work["T_50plus"] = _age_50plus_indicator(work["Age_Category"])
    return _ruhela_aggregate_on(
        work, ds_id="c09",
        source_label=("c09_individuals_in_segregation_and_restrictive_"
                      "confinement_age_category_by_gender.csv"),
        title=("OTIS c09 -- Aggregate Ruhela formulation: "
               "Age 50+ -> Individuals in RC, by gender"),
        interpretation=(
            "Aggregate Ruhela formulation on c09 testing age-50+ "
            "overrepresentation in RC controlling for gender."
        ),
        treatment="T_50plus",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Gender"],
    )
# ── Alternative-treatment Ruhela formulations on a01 (per-row) ────
#
# The canonical a01 Ruhela formulation is T_high_ac -> Y_vm_count
# (alert-complexity -> regional-volatility). The framework also supports
# alternative treatments on the same panel data: gender, age, region.
# Each runs the full 10-estimator Ruhela suite on the a01 cell-level
# frame, with the alternative T while keeping Y = vm count and the
# usual demographic covariates (excluding the variable being treated).
def _a01_ruhela_cell_frame(df: "pd.DataFrame | None" = None,
                           ) -> "tuple[pd.DataFrame, list[str]]":
    """Build the a01 cell-level (id × year) frame with vm count.

    :param df: optional raw a01 frame; loaded from disk when ``None``
        (backward-compatible generalization -- previous signature took
        no arguments and always loaded from disk).
    :returns: ``(cell_frame, covariate_names)``.
    """
    # Local import kept deliberately: otis_causal is imported lazily
    # here, presumably to avoid an import cycle -- confirm before moving.
    from . import otis_causal as oc
    if df is None:
        df = load_otis_dataset("a01")
    data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    return data, ["Gender", "Age_Category", "EndFiscalYear"]
def analyze_a01_ruhela_alt_gender(df: "pd.DataFrame | None" = None,
                                  ) -> RichResult:
    """Alt-T Ruhela formulation on a01 -- gender -> regional volatility.

    Same per-row Ruhela cell frame as the canonical formulation, but
    T = Female indicator -> Y = vm count. Covariates exclude Gender
    (the variable being treated). Runs the full 10-estimator DLRM ensemble.
    Tests whether women experience more or fewer regional transfers
    within a fiscal year of restrictive confinement.

    :param df: optional raw a01 frame. Previously accepted but silently
        ignored; now used to build the cell frame when given.
    """
    if df is None:
        cell_data, _ = _a01_ruhela_cell_frame()
    else:
        # Build the same cell frame from the caller-supplied raw frame.
        from . import otis_causal as oc
        cell_data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    cell_data = cell_data.copy()
    cell_data["T_female"] = _female_indicator(cell_data["Gender"])
    return _ruhela_formulations_on(
        cell_data, ds_id="a01-altG",
        source_label=("a01_restrictive_confinement_detailed_dataset.csv "
                      "(alt-T: Gender)"),
        title=("OTIS a01 -- Alt-T Ruhela formulation: "
               "T=Female -> Y=vm count (regional volatility)"),
        interpretation=(
            "Alternative-treatment Ruhela formulation on the a01 cell "
            "frame. Same Y (vm count) as the canonical formulation; T "
            "swapped to Female indicator. Tests whether gender alone "
            "drives intra-year regional volatility."
        ),
        treatment="T_female",
        outcome="Y_vm_count",
        covariates=["Age_Category", "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
def analyze_a01_ruhela_alt_age(df: "pd.DataFrame | None" = None,
                               ) -> RichResult:
    """Alt-T Ruhela formulation on a01 -- age 50+ -> regional volatility.

    :param df: optional raw a01 frame. Previously accepted but silently
        ignored; now used to build the cell frame when given.
    """
    if df is None:
        cell_data, _ = _a01_ruhela_cell_frame()
    else:
        # Build the same cell frame from the caller-supplied raw frame.
        from . import otis_causal as oc
        cell_data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    cell_data = cell_data.copy()
    cell_data["T_50plus"] = _age_50plus_indicator(cell_data["Age_Category"])
    return _ruhela_formulations_on(
        cell_data, ds_id="a01-altA",
        source_label=("a01_restrictive_confinement_detailed_dataset.csv "
                      "(alt-T: Age 50+)"),
        title=("OTIS a01 -- Alt-T Ruhela formulation: "
               "T=Age 50+ -> Y=vm count"),
        interpretation=(
            "Alternative-treatment Ruhela formulation: age 50+ as the "
            "binary treatment. Tests whether older adults experience "
            "different intra-year regional churn than younger adults."
        ),
        treatment="T_50plus",
        outcome="Y_vm_count",
        covariates=["Gender", "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
def analyze_a01_ruhela_alt_toronto(df: "pd.DataFrame | None" = None,
                                   ) -> RichResult:
    """Alt-T Ruhela formulation on a01 -- Toronto-region -> regional volatility.

    T = (Region_AtTimeOfPlacement == "Toronto") indicator. Tests
    whether Toronto-region placements produce different volatility
    than placements outside Toronto, controlling for demographics.

    :param df: optional raw a01 frame. Previously accepted but silently
        ignored; now used to build the cell frame when given.
    """
    if df is None:
        cell_data, _ = _a01_ruhela_cell_frame()
    else:
        # Build the same cell frame from the caller-supplied raw frame.
        from . import otis_causal as oc
        cell_data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    cell_data = cell_data.copy()
    # Cell-level frame has regA (region at placement)
    if "regA" not in cell_data.columns:
        return RichResult(
            title="a01 Alt-T Toronto Ruhela",
            warnings=["regA column missing from cell frame"],
        )
    cell_data["T_toronto"] = _region_toronto_indicator(cell_data["regA"])
    return _ruhela_formulations_on(
        cell_data, ds_id="a01-altT",
        source_label=("a01_restrictive_confinement_detailed_dataset.csv "
                      "(alt-T: Toronto region)"),
        title=("OTIS a01 -- Alt-T Ruhela formulation: "
               "T=Toronto region -> Y=vm count"),
        interpretation=(
            "Alternative-treatment Ruhela formulation: Toronto-region "
            "placement as the binary treatment. Tests whether the "
            "Toronto region's institutional density translates into "
            "different intra-year regional churn."
        ),
        treatment="T_toronto",
        outcome="Y_vm_count",
        covariates=["Gender", "Age_Category", "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
# ── b01 alt-T Ruhela formulations (parallel to a01 alt-T) ──────────
def _b01_ruhela_cell_frame(df: "pd.DataFrame | None" = None,
                           ) -> "pd.DataFrame":
    """Build the b01 cell-level (id × year) frame with vm count.

    Same structure as a01's cell frame: per-(UniqueIndividual_ID,
    EndFiscalYear) aggregation with regA/regB preserved.

    :param df: optional raw b01 frame; loaded from disk when ``None``
        (backward-compatible generalization).
    """
    # Local import kept deliberately: otis_causal is imported lazily
    # here, presumably to avoid an import cycle -- confirm before moving.
    from . import otis_causal as oc
    if df is None:
        df = load_otis_dataset("b01")
    data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    return data
def analyze_b01_ruhela_alt_gender(df: "pd.DataFrame | None" = None,
                                  ) -> RichResult:
    """Alt-T Ruhela formulation on b01 -- gender -> regional volatility.

    Same as analyze_a01_ruhela_alt_gender but on b01 (Segregation
    Detailed) cell frame. Tests whether women experience more or
    fewer regional transfers within a fiscal year of segregation.

    :param df: optional raw b01 frame. Previously accepted but silently
        ignored; now used to build the cell frame when given.
    """
    if df is None:
        cell_data = _b01_ruhela_cell_frame()
    else:
        # Build the same cell frame from the caller-supplied raw frame.
        from . import otis_causal as oc
        cell_data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    cell_data = cell_data.copy()
    cell_data["T_female"] = _female_indicator(cell_data["Gender"])
    return _ruhela_formulations_on(
        cell_data, ds_id="b01-altG",
        source_label=("b01_segregation_detailed_dataset.csv "
                      "(alt-T: Gender)"),
        title=("OTIS b01 -- Alt-T Ruhela formulation: "
               "T=Female -> Y=vm count (regional volatility)"),
        interpretation=(
            "Alternative-treatment Ruhela formulation on the b01 cell "
            "frame. Same Y (vm count) as canonical; T = Female "
            "indicator. Tests whether gender alone drives intra-year "
            "regional volatility on segregation records (vs the per-"
            "day RC records of a01)."
        ),
        treatment="T_female",
        outcome="Y_vm_count",
        covariates=["Age_Category", "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
def analyze_b01_ruhela_alt_age(df: "pd.DataFrame | None" = None,
                               ) -> RichResult:
    """Alt-T Ruhela formulation on b01 -- age 50+ -> regional volatility.

    :param df: optional raw b01 frame. Previously accepted but silently
        ignored; now used to build the cell frame when given.
    """
    if df is None:
        cell_data = _b01_ruhela_cell_frame()
    else:
        # Build the same cell frame from the caller-supplied raw frame.
        from . import otis_causal as oc
        cell_data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    cell_data = cell_data.copy()
    cell_data["T_50plus"] = _age_50plus_indicator(cell_data["Age_Category"])
    return _ruhela_formulations_on(
        cell_data, ds_id="b01-altA",
        source_label=("b01_segregation_detailed_dataset.csv "
                      "(alt-T: Age 50+)"),
        title=("OTIS b01 -- Alt-T Ruhela formulation: "
               "T=Age 50+ -> Y=vm count"),
        interpretation=(
            "Alt-T RF on b01: age 50+ as binary treatment. Tests "
            "whether older adults experience different intra-year "
            "regional churn than younger adults on segregation records."
        ),
        treatment="T_50plus",
        outcome="Y_vm_count",
        covariates=["Gender", "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
def analyze_b01_ruhela_alt_toronto(df: "pd.DataFrame | None" = None,
                                   ) -> RichResult:
    """Alt-T Ruhela formulation on b01 -- Toronto-region -> regional volatility.

    :param df: optional raw b01 frame. Previously accepted but silently
        ignored; now used to build the cell frame when given.
    """
    if df is None:
        cell_data = _b01_ruhela_cell_frame()
    else:
        # Build the same cell frame from the caller-supplied raw frame.
        from . import otis_causal as oc
        cell_data, _, _, _ = oc.make_pair_alert_to_volatility_ruhela(df)
    cell_data = cell_data.copy()
    if "regA" not in cell_data.columns:
        return RichResult(
            title="b01 Alt-T Toronto Ruhela",
            warnings=["regA column missing from cell frame"],
        )
    cell_data["T_toronto"] = _region_toronto_indicator(cell_data["regA"])
    return _ruhela_formulations_on(
        cell_data, ds_id="b01-altT",
        source_label=("b01_segregation_detailed_dataset.csv "
                      "(alt-T: Toronto region)"),
        title=("OTIS b01 -- Alt-T Ruhela formulation: "
               "T=Toronto region -> Y=vm count"),
        interpretation=(
            "Alt-T RF on b01: Toronto-region placement as binary "
            "treatment. Tests whether Toronto's institutional density "
            "translates into different intra-year regional churn on "
            "segregation records."
        ),
        treatment="T_toronto",
        outcome="Y_vm_count",
        covariates=["Gender", "Age_Category", "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
# ── Subgroup-analysis variants on a01 (effect-heterogeneity by gender) ──
def analyze_a01_ruhela_subgroup_female(df: "pd.DataFrame | None" = None,
                                       ) -> RichResult:
    """Canonical a01 Ruhela formulation on Female-only subset.

    Tests effect-heterogeneity-by-gender: same T_high_ac -> vm
    contrast, but only on female cells. Compare ATE here to
    analyze_a01_ruhela_subgroup_male to see if alert-complexity
    matters more or less for women than men.

    :param df: pre-loaded a01 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed; the lazy
    # otis_causal import is kept (presumably avoids an import cycle).
    from . import otis_causal as oc
    if df is None:
        df = load_otis_dataset("a01")
    data, T, Y, cov = oc.make_pair_alert_to_volatility_ruhela(df)
    sub = data[_female_indicator(data["Gender"]) == 1].copy()
    # Small-sample guard: too few female cells makes the 10-estimator
    # ensemble unreliable.
    if sub.shape[0] < 100:
        return RichResult(
            title="a01 subgroup Female Ruhela",
            warnings=[f"only {sub.shape[0]} female cells; too few"],
        )
    # Gender is constant within the subgroup -- drop it as a covariate.
    cov_no_gender = [c for c in cov if c != "Gender"]
    return _ruhela_formulations_on(
        sub, ds_id="a01-subF",
        source_label=("a01_restrictive_confinement_detailed_dataset.csv "
                      "(subgroup: Female)"),
        title=("OTIS a01 -- Subgroup Ruhela formulation: "
               "Female-only cell frame, T_high_ac -> vm count"),
        interpretation=(
            "Subgroup analysis: canonical Ruhela formulation restricted "
            "to female cells. Compare to analyze_a01_ruhela_subgroup_male "
            "for effect-heterogeneity-by-gender. If the female ATE is "
            "smaller / larger than the male ATE, alert-complexity-driven "
            "regional volatility is gender-conditional."
        ),
        treatment=T, outcome=Y, covariates=cov_no_gender,
        cluster_col="EndFiscalYear",
    )
def analyze_a01_ruhela_subgroup_male(df: "pd.DataFrame | None" = None,
                                     ) -> RichResult:
    """Canonical a01 Ruhela formulation on Male-only subset.

    :param df: pre-loaded a01 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed; the lazy
    # otis_causal import is kept (presumably avoids an import cycle).
    from . import otis_causal as oc
    if df is None:
        df = load_otis_dataset("a01")
    data, T, Y, cov = oc.make_pair_alert_to_volatility_ruhela(df)
    sub = data[_female_indicator(data["Gender"]) == 0].copy()
    # Mirror the female variant's small-sample guard for consistency.
    if sub.shape[0] < 100:
        return RichResult(
            title="a01 subgroup Male Ruhela",
            warnings=[f"only {sub.shape[0]} male cells; too few"],
        )
    # Gender is constant within the subgroup -- drop it as a covariate.
    cov_no_gender = [c for c in cov if c != "Gender"]
    return _ruhela_formulations_on(
        sub, ds_id="a01-subM",
        source_label=("a01_restrictive_confinement_detailed_dataset.csv "
                      "(subgroup: Male)"),
        title=("OTIS a01 -- Subgroup Ruhela formulation: "
               "Male-only cell frame, T_high_ac -> vm count"),
        interpretation=(
            "Subgroup analysis: canonical Ruhela formulation restricted "
            "to male cells. Companion to analyze_a01_ruhela_subgroup_female "
            "for effect-heterogeneity-by-gender."
        ),
        treatment=T, outcome=Y, covariates=cov_no_gender,
        cluster_col="EndFiscalYear",
    )
def analyze_b01_ruhela_subgroup_female(df: "pd.DataFrame | None" = None,
                                       ) -> RichResult:
    """Canonical b01 Ruhela formulation on Female-only subset.

    Sister to analyze_a01_ruhela_subgroup_female. Tests effect-
    heterogeneity-by-gender on segregation records.

    :param df: pre-loaded b01 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    from . import otis_causal as oc
    if df is None:
        df = load_otis_dataset("b01")
    data, T, Y, cov = oc.make_pair_alert_to_volatility_ruhela(df)
    sub = data[_female_indicator(data["Gender"]) == 1].copy()
    # Small-sample guard before the estimator ensemble.
    if sub.shape[0] < 100:
        return RichResult(
            title="b01 subgroup Female Ruhela",
            warnings=[f"only {sub.shape[0]} female cells; too few"],
        )
    # Gender is constant within the subgroup -- drop it as a covariate.
    cov_no_gender = [c for c in cov if c != "Gender"]
    return _ruhela_formulations_on(
        sub, ds_id="b01-subF",
        source_label=("b01_segregation_detailed_dataset.csv "
                      "(subgroup: Female)"),
        title=("OTIS b01 -- Subgroup Ruhela formulation: "
               "Female-only cell frame, T_high_ac -> vm count"),
        interpretation=(
            "Subgroup analysis on b01 (segregation records, per-"
            "placement). Compare to analyze_a01_ruhela_subgroup_female "
            "to see whether the gender-conditional pattern in alert-"
            "complexity -> vm holds across the two unit-of-analysis "
            "variants (per-day RC vs per-placement seg)."
        ),
        treatment=T, outcome=Y, covariates=cov_no_gender,
        cluster_col="EndFiscalYear",
    )
def analyze_b01_ruhela_subgroup_male(df: "pd.DataFrame | None" = None,
                                     ) -> RichResult:
    """Canonical b01 Ruhela formulation on Male-only subset.

    :param df: pre-loaded b01 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    from . import otis_causal as oc
    if df is None:
        df = load_otis_dataset("b01")
    data, T, Y, cov = oc.make_pair_alert_to_volatility_ruhela(df)
    sub = data[_female_indicator(data["Gender"]) == 0].copy()
    # Mirror the female variant's small-sample guard for consistency.
    if sub.shape[0] < 100:
        return RichResult(
            title="b01 subgroup Male Ruhela",
            warnings=[f"only {sub.shape[0]} male cells; too few"],
        )
    # Gender is constant within the subgroup -- drop it as a covariate.
    cov_no_gender = [c for c in cov if c != "Gender"]
    return _ruhela_formulations_on(
        sub, ds_id="b01-subM",
        source_label=("b01_segregation_detailed_dataset.csv "
                      "(subgroup: Male)"),
        title=("OTIS b01 -- Subgroup Ruhela formulation: "
               "Male-only cell frame, T_high_ac -> vm count"),
        interpretation=(
            "Male-only subgroup analysis on b01. Companion to "
            "analyze_b01_ruhela_subgroup_female for effect-"
            "heterogeneity-by-gender on segregation records."
        ),
        treatment=T, outcome=Y, covariates=cov_no_gender,
        cluster_col="EndFiscalYear",
    )
# ── b02 alt-T variants (region, age) ───────────────────────────────
def analyze_b02_ruhela_alt_region(df: "pd.DataFrame | None" = None,
                                  ) -> RichResult:
    """b02 alt-T: T = Toronto region -> Y = total seg days within FY.

    Sister to analyze_b02_ruhela_formulations (T = Female). Tests
    regional disparity in seg-day burden controlling for gender +
    age + year.

    :param df: pre-loaded b02 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("b02")
    work = df.dropna(subset=["Gender",
                             "TotalAggregatedDays_Segregation",
                             "Region_MostRecentPlacement",
                             "Age_Category", "EndFiscalYear"]).copy()
    work["T_toronto"] = _region_toronto_indicator(
        work["Region_MostRecentPlacement"])
    return _ruhela_formulations_on(
        work, ds_id="b02-altR",
        source_label=("b02_segregation_detailed_total_days.csv "
                      "(alt-T: Toronto region)"),
        title=("OTIS b02 -- Alt-T Ruhela formulation: "
               "T=Toronto region -> Y=Total seg days within FY"),
        interpretation=(
            "Alt-T RF on b02: Toronto-region indicator. Tests whether "
            "Toronto's institutional density translates into different "
            "seg-day burden than other regions, controlling for gender, "
            "age, and year."
        ),
        treatment="T_toronto",
        outcome="TotalAggregatedDays_Segregation",
        covariates=["Gender", "Age_Category", "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
def analyze_b02_ruhela_alt_age(df: "pd.DataFrame | None" = None,
                               ) -> RichResult:
    """b02 alt-T: T = Age 50+ -> Y = total seg days within FY.

    :param df: pre-loaded b02 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("b02")
    work = df.dropna(subset=["Gender",
                             "TotalAggregatedDays_Segregation",
                             "Region_MostRecentPlacement",
                             "Age_Category", "EndFiscalYear"]).copy()
    work["T_50plus"] = _age_50plus_indicator(work["Age_Category"])
    return _ruhela_formulations_on(
        work, ds_id="b02-altA",
        source_label=("b02_segregation_detailed_total_days.csv "
                      "(alt-T: Age 50+)"),
        title=("OTIS b02 -- Alt-T Ruhela formulation: "
               "T=Age 50+ -> Y=Total seg days within FY"),
        interpretation=(
            "Alt-T RF on b02: Age 50+ indicator. Tests whether older "
            "adults experience different seg-day burden than younger "
            "adults, controlling for gender, region, and year."
        ),
        treatment="T_50plus",
        outcome="TotalAggregatedDays_Segregation",
        covariates=["Gender", "Region_MostRecentPlacement",
                    "EndFiscalYear"],
        cluster_col="EndFiscalYear",
    )
# ── More aggregate disparity RF: c05, c08, b04, b08 ────────────────
def analyze_c05_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on c05 -- non-Christian religion -> individuals in RC.

    T = (Religion != "Christian" and not "No Religion" and not
    "Unknown") indicator. Tests population-level religious-minority
    overrepresentation in RC, controlling for region.

    :param df: pre-loaded c05 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c05")
    work = df.dropna(subset=[
        "EndFiscalYear", "Religion", "Region_MostRecentPlacement",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    # Normalised (stripped, lower-cased) labels excluded from "minority".
    excluded = {"christian", "no religion", "unknown or not reported"}
    work["T_minority_religion"] = (
        ~work["Religion"].astype(str).str.strip().str.lower().isin(excluded)
    ).astype(int)
    return _ruhela_aggregate_on(
        work, ds_id="c05",
        source_label=("c05_individuals_in_segregation_and_restrictive_"
                      "confinement_religion_by_region.csv"),
        title=("OTIS c05 -- Aggregate Ruhela formulation: "
               "Non-Christian/non-majority religion -> Individuals in RC"),
        interpretation=(
            "Aggregate RF on c05 testing whether religious-minority "
            "(non-Christian, non-no-religion, non-unknown) cells "
            "produce more or fewer individuals in RC, controlling for "
            "region and year. The 'minority' construct is reductive "
            "but tractable for an aggregate population-level signal."
        ),
        treatment="T_minority_religion",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Region_MostRecentPlacement"],
    )
def analyze_c08_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on c08 -- non-Christian religion × gender -> RC.

    T = (Religion != Christian / no religion / unknown) indicator.
    Year + Gender FE. Companion to c05 with gender control.

    :param df: pre-loaded c08 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c08")
    work = df.dropna(subset=[
        "EndFiscalYear", "Religion", "Gender",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    # Same normalised exclusion set as c05 for cross-dataset consistency.
    excluded = {"christian", "no religion", "unknown or not reported"}
    work["T_minority_religion"] = (
        ~work["Religion"].astype(str).str.strip().str.lower().isin(excluded)
    ).astype(int)
    return _ruhela_aggregate_on(
        work, ds_id="c08",
        source_label=("c08_individuals_in_segregation_and_restrictive_"
                      "confinement_religion_by_gender.csv"),
        title=("OTIS c08 -- Aggregate Ruhela formulation: "
               "Non-Christian/non-majority religion × Gender -> "
               "Individuals in RC"),
        interpretation=(
            "Aggregate RF on c08, parallel to c05 but with gender "
            "control. Tests the same religious-minority overrepresentation "
            "question; gender + year FE absorb cross-gender baseline."
        ),
        treatment="T_minority_religion",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Gender"],
    )
def analyze_b04_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on b04 -- gender disparity in median seg duration.

    b04 has Region × Gender × Measure (max/median/mode) duration. We
    filter to Measure == "Median" and test T = Female -> Y = duration.

    :param df: pre-loaded b04 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("b04")
    work = df.dropna(subset=[
        "EndFiscalYear", "Region_AtTimeOfPlacement", "Gender",
        "Measure", "NumberConsecutiveDays_Segregation"]).copy()
    # Keep only the Median measure rows (max is outlier-driven,
    # mode is privacy-suppressed in many cells).
    work = work[work["Measure"].astype(str).str.strip()
                == "Median"].copy()
    if work.empty:
        return RichResult(
            title="b04 aggregate Ruhela",
            warnings=["b04 has no Median rows after filter"],
        )
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="b04",
        source_label=("b04_segregation_placements_consecutive_durations"
                      "_by_region.csv"),
        title=("OTIS b04 -- Aggregate Ruhela formulation: "
               "Female -> median consecutive seg duration"),
        interpretation=(
            "Aggregate RF on b04 testing gender disparity in the "
            "MEDIAN consecutive duration of segregation placements, "
            "by region. Median rather than max because max is "
            "outlier-driven; mode is privacy-suppressed in many cells."
        ),
        treatment="T_female",
        outcome="NumberConsecutiveDays_Segregation",
        covariates=["Region_AtTimeOfPlacement"],
    )
def analyze_b08_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on b08 -- gender disparity in median seg duration
    by institution.

    Companion to b04; b08 adds Institution_AtTimeOfPlacement as a
    finer-grained spatial dimension. Region-FE; Institution as **GEE
    cluster group** (24 levels).

    :param df: pre-loaded b08 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("b08")
    work = df.dropna(subset=[
        "EndFiscalYear", "Region_AtTimeOfPlacement",
        "Institution_AtTimeOfPlacement", "Gender",
        "Measure", "NumberConsecutiveDays_Segregation"]).copy()
    # Keep only the Median measure rows (same rationale as b04).
    work = work[work["Measure"].astype(str).str.strip()
                == "Median"].copy()
    if work.empty:
        return RichResult(
            title="b08 aggregate Ruhela",
            warnings=["b08 has no Median rows after filter"],
        )
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="b08",
        source_label=("b08_segregation_placements_consecutive_durations"
                      "_by_institution.csv"),
        title=("OTIS b08 -- Aggregate Ruhela formulation: "
               "Female -> median seg duration, GEE-clustered on institution"),
        interpretation=(
            "Aggregate RF on b08 testing gender disparity in median "
            "segregation duration with region-FE + GEE-Poisson "
            "clustered on institution (24 levels). Companion to b04 "
            "with finer-grained spatial control."
        ),
        treatment="T_female",
        outcome="NumberConsecutiveDays_Segregation",
        covariates=["Region_AtTimeOfPlacement"],
        cluster_group="Institution_AtTimeOfPlacement",
    )
# ── Final aggregate RF coverage: b09 / c02 / c10 / c12 + d-series ──
def analyze_b09_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on b09 -- gender disparity in placement-count distribution.

    b09 has rows of (Year × NumberPlacements_Segregation × Gender ×
    NumberIndividuals_Segregation). We test T = Female -> Y =
    NumberIndividuals_Segregation, with NumberPlacements as a
    categorical FE so we adjust for the bin-cell structure.

    :param df: pre-loaded b09 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("b09")
    work = df.dropna(subset=[
        "EndFiscalYear", "NumberPlacements_Segregation", "Gender",
        "NumberIndividuals_Segregation"]).copy()
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="b09",
        source_label=("b09_individuals_in_segregation_number_of_times"
                      "_in_segregation.csv"),
        title=("OTIS b09 -- Aggregate Ruhela formulation: "
               "Female -> Number of individuals in segregation"),
        interpretation=(
            "Aggregate RF on b09 testing gender disparity in the "
            "individuals-in-segregation count distribution, "
            "controlling for the placement-count bin (categorical FE)."
        ),
        treatment="T_female",
        outcome="NumberIndividuals_Segregation",
        covariates=["NumberPlacements_Segregation"],
    )
def analyze_c02_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on c02 -- gender disparity by institution.

    Companion to c01 with institution-level granularity. T = Female ->
    Y = NumberIndividuals_RC. Region + year as fixed effects;
    **Institution as a GEE cluster group** (marginal model with
    cluster-robust SE) -- Institution has too many levels (~76) for an
    FE specification on the 148-row table without collinearity.
    Mirrors OTIS-RC's ``glmmTMB nbinom2`` clustered approach.

    :param df: pre-loaded c02 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c02")
    work = df.dropna(subset=[
        "EndFiscalYear", "Region_MostRecentPlacement",
        "Institution_MostRecentPlacement", "Gender",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="c02",
        source_label=("c02_individuals_in_segregation_and_restrictive_"
                      "confinement_by_institution.csv"),
        title=("OTIS c02 -- Aggregate Ruhela formulation: "
               "Female -> Individuals in RC, GEE-clustered on institution"),
        interpretation=(
            "Aggregate RF on c02 testing gender disparity in RC counts. "
            "Region + year as fixed effects; institution enters as a "
            "GEE cluster group (~76 levels too many for FE specification "
            "on this 148-row table). The GEE-Poisson row reports the "
            "population-level IRR with cluster-robust SE -- the "
            "marginal-model analogue of OTIS-RC's glmmTMB nbinom2 "
            "approach with institution random intercept."
        ),
        treatment="T_female",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=["Region_MostRecentPlacement"],
        cluster_group="Institution_MostRecentPlacement",
    )
def analyze_c10_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on c10 -- gender disparity in median RC days.

    c10 has Region × Institution × Gender × Measure
    (Maximum/Median/Mode) duration. Filter to Median; T = Female ->
    Y = TotalAggregatedDays_RC. Region-FE + Year-FE; Institution as
    **GEE cluster group** (25 levels) for cluster-robust marginal
    inference.

    :param df: pre-loaded c10 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c10")
    work = df.dropna(subset=[
        "EndFiscalYear", "Region_MostRecentPlacement",
        "Institution_MostRecentPlacement", "Gender", "Measure",
        "TotalAggregatedDays_RestrictiveConfinement"]).copy()
    # Keep only the Median measure rows.
    work = work[work["Measure"].astype(str).str.strip()
                == "Median"].copy()
    if work.empty:
        return RichResult(
            title="c10 aggregate Ruhela",
            warnings=["c10 has no Median rows after filter"],
        )
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="c10",
        source_label=("c10_segregation_and_restrictive_confinement_"
                      "aggregate_durations_by_institution.csv"),
        title=("OTIS c10 -- Aggregate Ruhela formulation: "
               "Female -> median RC days, GEE-clustered on institution"),
        interpretation=(
            "Aggregate RF on c10 testing gender disparity in median "
            "total RC days per individual. Region + year FE; "
            "Institution as GEE cluster group (25 levels) for cluster-"
            "robust marginal inference."
        ),
        treatment="T_female",
        outcome="TotalAggregatedDays_RestrictiveConfinement",
        covariates=["Region_MostRecentPlacement"],
        cluster_group="Institution_MostRecentPlacement",
    )
def analyze_c12_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate RF on c12 -- gender disparity in median RC days by region.

    :param df: pre-loaded c12 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c12")
    work = df.dropna(subset=[
        "EndFiscalYear", "Region_MostRecentPlacement", "Gender",
        "Measure", "TotalAggregatedDays_RestrictiveConfinement"]).copy()
    # Keep only the Median measure rows (same rationale as c10/b04).
    work = work[work["Measure"].astype(str).str.strip()
                == "Median"].copy()
    if work.empty:
        return RichResult(
            title="c12 aggregate Ruhela",
            warnings=["c12 has no Median rows after filter"],
        )
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="c12",
        source_label=("c12_segregation_and_restrictive_confinement_"
                      "aggregate_durations_by_region.csv"),
        title=("OTIS c12 -- Aggregate Ruhela formulation: "
               "Female -> median RC days, by region"),
        interpretation=(
            "Aggregate RF on c12, the region-level companion to c10. "
            "Tests gender disparity in median total RC days with "
            "region-FE only (less granular than c10's institution-FE)."
        ),
        treatment="T_female",
        outcome="TotalAggregatedDays_RestrictiveConfinement",
        covariates=["Region_MostRecentPlacement"],
    )
def analyze_c11_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """Aggregate Ruhela formulation on c11 -- long-duration cell concentration.

    c11 has rows of (FY × Aggregate_Duration_bin × NumberIndividuals_RC
    × NumberIndividuals_Seg). 11 duration bins from "1 day" to
    "Greater than 30 days". Rather than treat duration as the
    treatment-FE (it IS the categorical the data is binned by), we
    test T = (long-duration bin: ≥16 days) -> Y = NumberIndividuals_RC.
    Year-FE only.

    :param df: pre-loaded c11 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c11")
    work = df.dropna(subset=[
        "EndFiscalYear", "Aggregate_Duration",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    # The 4 bins covering >= 16 days define the "long duration" arm.
    long_bins = {
        "16 to 20 days", "21 to 25 days", "26 to 30 days",
        "Greater than 30 days",
    }
    work["T_long_duration"] = work["Aggregate_Duration"].astype(str).isin(
        long_bins).astype(int)
    return _ruhela_aggregate_on(
        work, ds_id="c11",
        source_label=("c11_individuals_in_segregation_and_restrictive_"
                      "confinement_aggregate_lengths.csv"),
        title=("OTIS c11 -- Aggregate Ruhela formulation: "
               "Long-duration bin (≥16 days) -> Individuals in RC"),
        interpretation=(
            "Aggregate RF on c11 testing whether long-duration bins "
            "(≥16 days) account for a disproportionate share of "
            "individuals in restrictive confinement. The 11 duration "
            "bins are binarised into long (4 bins, ≥16 days) vs short "
            "(7 bins, ≤15 days). IRR > 1 ⇒ long-duration bins concentrate "
            "more individuals; IRR < 1 ⇒ short-duration bins dominate. "
            "Year-FE only."
        ),
        treatment="T_long_duration",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=[],
    )
# ── Region-cluster GEE variants of canonical formulations ──────────
def analyze_c01_ruhela_aggregate_region_cluster(
        df: "pd.DataFrame | None" = None) -> RichResult:
    """c01 alternative: Female -> RC count, GEE-clustered on year.

    Sister analyzer to analyze_c01_ruhela_aggregate. The c01 case has
    only 6 cells (3 years × 2 genders), so cluster_group="EndFiscalYear"
    gives a 3-cluster GEE -- extreme small-cluster bias. This variant
    is shipped for transparency: shows that with so few clusters,
    Poisson and GEE-Poisson give nearly identical numbers because
    there's no clustering structure to exploit.

    :param df: pre-loaded c01 frame; loaded from disk when ``None``.
    """
    # Redundant local re-import of load_otis_dataset removed.
    if df is None:
        df = load_otis_dataset("c01")
    work = df.dropna(subset=[
        "EndFiscalYear", "Gender",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="c01-RC",
        source_label=("c01_individuals_in_segregation_and_restrictive_"
                      "confinement_total_individuals.csv "
                      "(year-clustered variant)"),
        title=("OTIS c01 -- Aggregate Ruhela formulation, year-clustered: "
               "Female -> Individuals in RC"),
        interpretation=(
            "Year-clustered GEE variant of c01. With only 3 fiscal "
            "years (= 3 clusters), the GEE inference is unreliable -- "
            "this analyzer is shipped for transparency, not as the "
            "primary inference. The standard c01 analyzer remains the "
            "default."
        ),
        treatment="T_female",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=[],
        cluster_group="EndFiscalYear",
    )
def analyze_c04_ruhela_aggregate_region_cluster(
        df: "pd.DataFrame | None" = None) -> RichResult:
    """c04 alternative: Indigenous -> RC, GEE-clustered on region.

    Sister to analyze_c04_ruhela_aggregate (which uses region as FE).
    With 5 regions, the GEE-cluster:Region inference treats region as
    a marginal-model grouping. Useful when wide region heterogeneity
    in case-mix matters for SE.

    Parameters
    ----------
    df : pd.DataFrame | None
        Pre-loaded c04 frame; loaded from the OTIS registry when None.

    Returns
    -------
    RichResult
        Output of :func:`_ruhela_aggregate_on` for the region-clustered
        Indigenous -> RC-count formulation.
    """
    # load_otis_dataset is imported at module level; the former
    # function-local re-import was redundant and has been removed.
    if df is None:
        df = load_otis_dataset("c04")
    # Keep only rows with every column the formulation needs.
    work = df.dropna(subset=[
        "EndFiscalYear", "Race", "Region_MostRecentPlacement",
        "NumberIndividuals_RestrictiveConfinement"]).copy()
    # Binary treatment: Race equals "indigenous" (case-insensitive).
    work["T_indigenous"] = (work["Race"].astype(str).str.lower()
                            == "indigenous").astype(int)
    return _ruhela_aggregate_on(
        work, ds_id="c04-RC",
        source_label=("c04_individuals_in_segregation_and_restrictive_"
                      "confinement_race_by_region.csv "
                      "(region-clustered variant)"),
        title=("OTIS c04 -- Aggregate Ruhela formulation, region-clustered: "
               "Indigenous -> Individuals in RC"),
        interpretation=(
            "Region-clustered GEE variant of c04. The standard c04 "
            "analyzer enters region as FE; this variant treats region "
            "as a GEE cluster group (5 regions). Cluster-robust SE "
            "accounts for within-region case-mix correlation. Both "
            "should give similar IRR; the SE differs."
        ),
        treatment="T_indigenous",
        outcome="NumberIndividuals_RestrictiveConfinement",
        covariates=[],
        cluster_group="Region_MostRecentPlacement",
    )
# ── d-series Ruhela formulations on death counts ───────────────────
def analyze_d02_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """d02 -- gender -> custodial death count, with year FE.

    Parameters
    ----------
    df : pd.DataFrame | None
        Pre-loaded d02 frame; loaded from the OTIS registry when None.

    Returns
    -------
    RichResult
        Output of :func:`_ruhela_aggregate_on` for the Female ->
        custodial-death-count formulation (calendar-year FE).
    """
    # load_otis_dataset is imported at module level; the former
    # function-local re-import was redundant and has been removed.
    if df is None:
        df = load_otis_dataset("d02")
    work = df.dropna(subset=["Year", "Gender",
                             "Number_CustodialDeaths"]).copy()
    work["T_female"] = _female_indicator(work["Gender"])
    return _ruhela_aggregate_on(
        work, ds_id="d02",
        source_label="d02_deaths_in_custody_gender.csv",
        title=("OTIS d02 -- Aggregate Ruhela formulation: "
               "Female -> Number of custodial deaths"),
        interpretation=(
            "Aggregate RF on d02 testing gender disparity in custodial "
            "death counts. Calendar-year aggregate, very small N (6 "
            "rows = 3 years × 2 genders). Wide CI expected."
        ),
        treatment="T_female",
        outcome="Number_CustodialDeaths",
        covariates=[],
        year_col="Year",
    )
def analyze_d03_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """d03 -- Indigenous -> custodial death count, with year FE.

    Parameters
    ----------
    df : pd.DataFrame | None
        Pre-loaded d03 frame; loaded from the OTIS registry when None.

    Returns
    -------
    RichResult
        Output of :func:`_ruhela_aggregate_on` for the Indigenous ->
        custodial-death-count formulation (calendar-year FE).
    """
    # load_otis_dataset is imported at module level; the former
    # function-local re-import was redundant and has been removed.
    if df is None:
        df = load_otis_dataset("d03")
    work = df.dropna(subset=["Year", "Race",
                             "Number_CustodialDeaths"]).copy()
    # Binary treatment: Race equals "indigenous" (case-insensitive).
    work["T_indigenous"] = (work["Race"].astype(str).str.lower()
                            == "indigenous").astype(int)
    return _ruhela_aggregate_on(
        work, ds_id="d03",
        source_label="d03_deaths_in_custody_race.csv",
        title=("OTIS d03 -- Aggregate Ruhela formulation: "
               "Indigenous -> Number of custodial deaths"),
        interpretation=(
            "Aggregate RF on d03 testing Indigenous overrepresentation "
            "in custodial death counts. Small-sample warning: 17 rows; "
            "Indigenous-specific cells may be privacy-suppressed in "
            "some years."
        ),
        treatment="T_indigenous",
        outcome="Number_CustodialDeaths",
        covariates=[],
        year_col="Year",
    )
def analyze_d04_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """d04 -- non-majority religion -> custodial death count.

    Parameters
    ----------
    df : pd.DataFrame | None
        Pre-loaded d04 frame; loaded from the OTIS registry when None.

    Returns
    -------
    RichResult
        Output of :func:`_ruhela_aggregate_on` for the
        non-majority-religion -> custodial-death-count formulation.
    """
    # load_otis_dataset is imported at module level; the former
    # function-local re-import was redundant and has been removed.
    if df is None:
        df = load_otis_dataset("d04")
    work = df.dropna(subset=["Year", "Religion",
                             "Number_CustodialDeaths"]).copy()
    # "Majority" categories excluded from the treatment group; everything
    # else (after trimming/lower-casing) counts as minority religion.
    excluded = {"christian", "no religion", "unknown or not reported"}
    work["T_minority_religion"] = (
        ~work["Religion"].astype(str).str.strip().str.lower().isin(excluded)
    ).astype(int)
    return _ruhela_aggregate_on(
        work, ds_id="d04",
        source_label="d04_deaths_in_custody_religion.csv",
        title=("OTIS d04 -- Aggregate Ruhela formulation: "
               "Non-majority religion -> Number of custodial deaths"),
        interpretation=(
            "Aggregate RF on d04 testing whether non-majority-religion "
            "cells produce higher death counts. 20 rows total. Small-"
            "sample warning."
        ),
        treatment="T_minority_religion",
        outcome="Number_CustodialDeaths",
        covariates=[],
        year_col="Year",
    )
def analyze_d05_ruhela_aggregate(df: "pd.DataFrame | None" = None,
                                 ) -> RichResult:
    """d05 -- age 50+ -> custodial death count.

    Parameters
    ----------
    df : pd.DataFrame | None
        Pre-loaded d05 frame; loaded from the OTIS registry when None.

    Returns
    -------
    RichResult
        Output of :func:`_ruhela_aggregate_on` for the age-50+ ->
        custodial-death-count formulation (calendar-year FE).
    """
    # load_otis_dataset is imported at module level; the former
    # function-local re-import was redundant and has been removed.
    if df is None:
        df = load_otis_dataset("d05")
    work = df.dropna(subset=["Year", "Age_Category",
                             "Number_CustodialDeaths"]).copy()
    work["T_50plus"] = _age_50plus_indicator(work["Age_Category"])
    return _ruhela_aggregate_on(
        work, ds_id="d05",
        source_label="d05_deaths_in_custody_age_category.csv",
        title=("OTIS d05 -- Aggregate Ruhela formulation: "
               "Age 50+ -> Number of custodial deaths"),
        interpretation=(
            "Aggregate RF on d05 testing age-50+ overrepresentation "
            "in custodial death counts. Old-age mortality is well-"
            "documented in custody literature; this formulation "
            "quantifies the IRR at the population aggregate level."
        ),
        treatment="T_50plus",
        outcome="Number_CustodialDeaths",
        covariates=[],
        year_col="Year",
    )
# ── Ruhela formulations grid summary ───────────────────────────────
def analyze_ruhela_grid() -> RichResult:
    """One-page Ruhela formulations grid summary (24 aggregates).

    Runs every aggregate Ruhela formulation analyzer + the region-cluster
    GEE variants, and presents a single comparison table.

    Returns
    -------
    RichResult
        One table with a row per dataset: the "primary" estimate chosen
        by estimator priority (GEE-NB cluster > GEE-NB > GEE-Poisson >
        NB GLM > Poisson GLM), or an error/warning marker row.
    """
    aggregates_to_run = [
        ("b03", analyze_b03_ruhela_aggregate),
        ("b04", analyze_b04_ruhela_aggregate),
        ("b06", analyze_b06_ruhela_aggregate),
        ("b07", analyze_b07_ruhela_aggregate),
        ("b08", analyze_b08_ruhela_aggregate),
        ("b09", analyze_b09_ruhela_aggregate),
        ("c01", analyze_c01_ruhela_aggregate),
        ("c01-RC", analyze_c01_ruhela_aggregate_region_cluster),
        ("c02", analyze_c02_ruhela_aggregate),
        ("c03", analyze_c03_ruhela_aggregate),
        ("c04", analyze_c04_ruhela_aggregate),
        ("c04-RC", analyze_c04_ruhela_aggregate_region_cluster),
        ("c05", analyze_c05_ruhela_aggregate),
        ("c06", analyze_c06_ruhela_aggregate),
        ("c07", analyze_c07_ruhela_aggregate),
        ("c08", analyze_c08_ruhela_aggregate),
        ("c09", analyze_c09_ruhela_aggregate),
        ("c10", analyze_c10_ruhela_aggregate),
        ("c11", analyze_c11_ruhela_aggregate),
        ("c12", analyze_c12_ruhela_aggregate),
        ("d02", analyze_d02_ruhela_aggregate),
        ("d03", analyze_d03_ruhela_aggregate),
        ("d04", analyze_d04_ruhela_aggregate),
        ("d05", analyze_d05_ruhela_aggregate),
    ]
    rows: list = []
    for ds_id, fn in aggregates_to_run:
        try:
            r = fn()
        except Exception as e:  # noqa: BLE001
            rows.append([ds_id, "--", "load_err", str(type(e).__name__),
                         str(e)[:30], "--", "--"])
            continue
        if r.warnings:
            rows.append([ds_id, "--", "warn",
                         (r.warnings or [""])[0][:50],
                         "--", "--", "--"])
            continue
        if not (r.tables and r.tables[0].get("rows")):
            rows.append([ds_id, "--", "no rows", "--", "--", "--", "--"])
            continue

        # Estimator priority (most -> least appropriate):
        #   1. GEE-NB Exch      -- cluster-robust + overdispersion-aware (OTIS-RC analog)
        #   2. GEE-Poisson Exch -- cluster-robust, equidispersed
        #   3. NB GLM           -- overdispersion-aware, no cluster
        #   4. Poisson GLM      -- fallback
        def _find(label_substr):
            # First non-failed estimator row whose label contains the substring.
            return next((row for row in r.tables[0]["rows"]
                         if label_substr in str(row[0])
                         and row[2] != "fit failed"), None)

        # BUGFIX: the previous selection expression
        #   _find("GEE-NB") and _find("GEE-NB (cluster") or _find("GEE-NB")
        # labelled a plain GEE-NB fallback as "GEE-NB (cluster)". Select the
        # cluster variant first, then plain GEE-NB, with honest type labels.
        primary = _find("GEE-NB (cluster")
        primary_type = "GEE-NB (cluster)" if primary else None
        if primary is None:
            primary = _find("GEE-NB")
            primary_type = "GEE-NB" if primary else None
        if primary is None:
            primary = _find("GEE-Poisson")
            primary_type = "GEE-Poisson (cluster)" if primary else None
        if primary is None:
            primary = next((row for row in r.tables[0]["rows"]
                            if row[0] == "NB" and row[2] != "fit failed"),
                           None)
            primary_type = "NB GLM" if primary else None
        if primary is None:
            primary = next((row for row in r.tables[0]["rows"]
                            if row[0] == "Poisson"
                            and row[2] != "fit failed"), None)
            primary_type = "Poisson GLM" if primary else None
        title = (r.title or "")[:60]
        if primary:
            rows.append([
                ds_id, primary_type, title,
                primary[2], primary[3], primary[4],
                primary[6][:30] if len(primary) > 6 else "--",
            ])
        else:
            rows.append([
                ds_id, "all failed", title,
                "--", "--", "--", "--",
            ])
    return RichResult(
        title=("OTIS Ruhela formulations grid summary -- one-page IRR/ATE"
               " comparison across all aggregate RF analyzers"),
        summary_lines=[
            ("Aggregate datasets covered", len(aggregates_to_run)),
            ("Per-row datasets covered (a01, b01, b02; alt-T on a01)",
             "see analyze_a01/b01/b02_ruhela_formulations + alt-T variants"),
            ("Doob chi² datasets (c, d)",
             "see analyze_c_doob_chi2 / analyze_d_doob_chi2"),
            ("Primary estimator priority",
             "GEE-NB > GEE-Poisson > NB GLM > Poisson GLM"),
        ],
        tables=[{
            "title": ("Aggregate Ruhela formulations -- primary IRR per "
                      "dataset (GEE cluster-robust > NB GLM):"),
            "headers": ["DS", "Type", "Formulation", "IRR",
                        "95% CI", "p", "Notes"],
            "rows": rows,
        }],
        interpretation=(
            "One-page comparison of every aggregate Ruhela formulation "
            "currently shipped. The primary IRR is the GEE-Poisson "
            "cluster-robust estimate when available (more conservative "
            "inference); otherwise the NB GLM estimate. IRR > 1 ⇒ "
            "treatment increases the count; IRR < 1 ⇒ decreases. Per-row "
            "a01/b01 RFs are the canonical individual-level contrasts. "
            "See docs/source/methods/ruhela_formulations.md."
        ),
        payload={"n_aggregates": len(aggregates_to_run)},
    )
def analyze_ruhela_master(*, include_per_row: bool = False) -> RichResult:
    """Paper-ready master report -- every Ruhela formulation in one RichResult.

    Sections:
      1. Aggregate Ruhela formulations (24 datasets) -- IRR comparison
      2. Per-row Ruhela formulations on a01/b01/b02 -- primary IRM-DML ATE
         (only when ``include_per_row=True``; ~5-7 min runtime)
      3. Doob chi-square family on c-series + d-series
    For thesis / paper writing.

    Parameters
    ----------
    include_per_row : bool
        If True, also runs the slow per-row RFs on a01/b01/b02 (full
        10-estimator DLRM). Default False -- runs in ~1 second from
        cached aggregates only.

    Returns
    -------
    RichResult
        Up to six tables (one per section below). Sections 3-6 are each
        wrapped in a broad try/except so a failing companion module
        silently drops its section rather than aborting the report.
    """
    sections: list = []
    # === Section 1: aggregate grid (always) ===
    grid = analyze_ruhela_grid()
    if grid.tables:
        sections.append({
            "title": ("§1 Aggregate Ruhela formulations -- primary "
                      "IRR per dataset:"),
            "headers": grid.tables[0]["headers"],
            "rows": grid.tables[0]["rows"],
        })
    # === Section 2: per-row RFs (optional, slow) ===
    per_row_rows: list = []
    if include_per_row:
        for ds_id, fn in [
            ("a01", analyze_a01_ruhela_formulations),
            ("b01", analyze_b01_ruhela_formulations),
            ("b02", analyze_b02_ruhela_formulations),
        ]:
            try:
                r = fn()
            except Exception as e:  # noqa: BLE001
                per_row_rows.append([ds_id, "load_err",
                                     str(type(e).__name__),
                                     str(e)[:30], "--", "--"])
                continue
            if r.warnings:
                per_row_rows.append([ds_id, "warn",
                                     (r.warnings or [""])[0][:50],
                                     "--", "--", "--"])
                continue
            if not (r.tables and r.tables[0].get("rows")):
                per_row_rows.append([ds_id, "no rows", "--", "--", "--", "--"])
                continue
            # Keep only the headline IRM-DML ATE row out of the full
            # 10-estimator table produced by the per-row analyzer.
            irm_row = next((row for row in r.tables[0]["rows"]
                            if "IRM-DML" in str(row[0])
                            and "ATE" in str(row[1])), None)
            if irm_row:
                per_row_rows.append([
                    ds_id, "IRM-DML ATE",
                    irm_row[2], irm_row[3], irm_row[4], irm_row[5],
                ])
            else:
                per_row_rows.append([ds_id, "no IRM-DML ATE",
                                     "--", "--", "--", "--"])
    if per_row_rows:
        sections.append({
            "title": ("§2 Per-row Ruhela formulations (canonical "
                      "T_high_ac -> vm_count) -- IRM-DML ATE:"),
            "headers": ["DS", "Estimator", "ATE", "SE", "95% CI", "p"],
            "rows": per_row_rows,
        })
    # === Section 3: Doob chi-square (always, fast) ===
    try:
        c_doob = analyze_c_doob_chi2()
        d_doob = analyze_d_doob_chi2()
        doob_rows: list = []
        for label, r in [("c-series", c_doob), ("d-series", d_doob)]:
            if r.tables and r.tables[0].get("rows"):
                # Pick the χ² rows (first three rows of the first table;
                # columns beyond the fifth are dropped).
                for row in r.tables[0]["rows"][:3]:
                    doob_rows.append([label, *row[:5]])
        if doob_rows:
            sections.append({
                "title": ("§3 Doob chi-square family -- Pearson χ² + "
                          "Cramer's V on aggregate contingency tables:"),
                "headers": ["Series", "Slice/measure", "χ²", "dof",
                            "p", "Cramer's V"],
                "rows": doob_rows,
            })
    except Exception:  # noqa: BLE001
        pass
    # === Section 4: Sprott-Doob CRIMSL/Schulich federal SIU evidence ===
    # Federal-aggregate analyses extending the Ruhela formulations to the
    # national-level Sprott / Doob / Iftene CRIMSL + Schulich Law tables.
    # hasattr guards let this section degrade gracefully against older
    # sprott_doob module versions that lack the optional tables.
    try:
        from . import sprott_doob as _sd
        sd_rows = []
        # Feb 2021 Mandela classification (Table 19) headline
        for r in _sd.TABLE19_MANDELA_CLASSIFICATION:
            sd_rows.append(["SD-2021-Feb T19", r["category"],
                            f"{r['percent']:.1f}%", r["n"], "--", "--"])
        # Feb 2021 regional torture rates (Table 23) headline
        for r in _sd.TABLE23_REGIONAL_TORTURE_RATES:
            sd_rows.append(["SD-2021-Feb T23", r["region"],
                            "--", "--",
                            f"{r['solitary_rate']:.2f}/1k",
                            f"{r['torture_rate']:.2f}/1k"])
        # Feb 2021 -- additional headline rows from Tables 11, 12, 22
        if hasattr(_sd, "TABLE11_CHISQ"):
            sd_rows.append(["SD-2021-Feb T11",
                            "Region × stay-length χ²",
                            f"χ²={_sd.TABLE11_CHISQ['chi2']}, "
                            f"df={_sd.TABLE11_CHISQ['df']}",
                            f"p<{_sd.TABLE11_CHISQ['p']}",
                            "--", "--"])
        if hasattr(_sd, "TABLE12_REGIONAL_OVERREP"):
            for r in _sd.TABLE12_REGIONAL_OVERREP:
                # Over-/under-representation ratio: SIU share vs population share.
                ratio = r["siu_pct"] / r["pop_pct"]
                sd_rows.append(["SD-2021-Feb T12",
                                f"{r['region']} over-/under-rep",
                                f"SIU {r['siu_pct']:.1f}%",
                                f"Pop {r['pop_pct']:.1f}%",
                                f"{ratio:.2f}×", "--"])
        if hasattr(_sd, "TABLE22_CHISQ"):
            sd_rows.append(["SD-2021-Feb T22",
                            "Region × Mandela χ²",
                            f"χ²={_sd.TABLE22_CHISQ['chi2']}, "
                            f"df={_sd.TABLE22_CHISQ['df']}",
                            f"p<{_sd.TABLE22_CHISQ['p']}",
                            "--", "--"])
        # May 2021 -- IEDM headline
        h = _sd.HEADLINE_MAY2021
        sd_rows.append(["SDI-2021-May", "n_iedm_stays_reviewed", "--",
                        h["n_iedm_stays_reviewed"], "--", "--"])
        sd_rows.append(["SDI-2021-May", "% IEDM stay-in (Table 9)",
                        f"{h['pct_stay_in_decisions_among_rendered']}%",
                        "--", "--", "--"])
        sd_rows.append(["SDI-2021-May", "% pre-empted by CSC (T9)",
                        f"{h['pct_csc_moved_prisoner_before_iedm']}%",
                        "--", "--", "--"])
        sd_rows.append(["SDI-2021-May", "long-stay no-IEDM record (T15)",
                        "--", h["n_long_stay_no_iedm_record_min76d"],
                        "--", "--"])
        sd_rows.append(["SDI-2021-May", "Indigenous share (T1)",
                        f"{h['indigenous_share_of_reviewed_stays_pct']:.1f}%",
                        "--", "--", "--"])
        # May 2021 -- additional headline rows from Tables 5, 8, 10
        if hasattr(_sd, "TABLE5_MAY2021_CHISQ"):
            sd_rows.append(["SDI-2021-May T5",
                            "Race × #IEDM-reviews χ²",
                            f"χ²={_sd.TABLE5_MAY2021_CHISQ['chi2']}, "
                            f"df={_sd.TABLE5_MAY2021_CHISQ['df']}",
                            f"p<{_sd.TABLE5_MAY2021_CHISQ['p']}",
                            "--", "--"])
        if hasattr(_sd, "TABLE8_MAY2021_RACE_X_STAY_121PLUS"):
            for r in _sd.TABLE8_MAY2021_RACE_X_STAY_121PLUS:
                sd_rows.append(["SDI-2021-May T8",
                                f"{r['race']} ≥121d share",
                                f"{r['pct_121_380']:.1f}%",
                                r["n_121_380"], "--", "--"])
        if hasattr(_sd, "TABLE10_MAY2021_CHISQ"):
            sd_rows.append(["SDI-2021-May T10",
                            "Per-IEDM 'remain' rate variance χ²",
                            f"χ²={_sd.TABLE10_MAY2021_CHISQ['chi2']}, "
                            f"df={_sd.TABLE10_MAY2021_CHISQ['df']}",
                            f"p<{_sd.TABLE10_MAY2021_CHISQ['p']}",
                            "37.5%-85.7%", "--"])
        if sd_rows:
            sections.append({
                "title": ("§4 RF-extended (federal): Sprott-Doob CRIMSL "
                          "+ Sprott-Doob-Iftene Schulich Law SIU "
                          "analyses (national-aggregate companion):"),
                "headers": ["Source", "Item", "Percent/Stat",
                            "N", "Solitary/1k", "Torture/1k"],
                "rows": sd_rows,
            })
    except Exception:  # noqa: BLE001
        pass
    # === Section 5: Doob T-539-20 affidavit -- CCRSO national tables ===
    try:
        from . import doob_trends as _dt
        doob_nat_rows = []
        # Table 1: 5-yr avg releases by type
        for r in _dt.CCRSO_TABLE1_RELEASES:
            doob_nat_rows.append([
                "Doob-T1", r["type"],
                f"{r['revoke_violent']:.1f} ({r['revoke_violent_pct']:.2f}%)",
                f"{r['success']:.1f} ({r['success_pct']:.1f}%)",
                f"{r['total']:.1f}",
            ])
        # Table 3: age over-/under-rep IRR (one-line per row)
        for r in _dt.CCRSO_TABLE3_AGE:
            # Custody-share over adult-population-share ratio for the age band.
            irr_c = r["csc_in_custody_pct"] / r["canada_adult_pop_pct"]
            doob_nat_rows.append([
                "Doob-T3", f"age {r['age_group']}",
                f"adult {r['canada_adult_pop_pct']:.1f}%",
                f"custody {r['csc_in_custody_pct']:.1f}%",
                f"IRR_custody={irr_c:.2f}",
            ])
        if doob_nat_rows:
            sections.append({
                "title": ("§5 RF-extended (federal): Doob T-539-20 "
                          "affidavit -- CCRSO 2018 prisoner-flow & age "
                          "tables (national-aggregate companion):"),
                "headers": ["Source", "Item", "Col A", "Col B", "Col C"],
                "rows": doob_nat_rows,
            })
    except Exception:  # noqa: BLE001
        pass
    # === Section 6: Mandela-RF on OTIS provincial data ===
    # Bridges Sprott-Doob's federal Mandela classifier into the
    # Ruhela formulations on Ontario provincial OTIS data.
    try:
        c11_man = analyze_c11_mandela_classification()
        b05_man = analyze_b05_mandela_classification()
        cmp_man = analyze_otis_mandela_provincial_vs_federal()
        man_rows = []
        # Provincial b05 (placements) per year
        if b05_man.tables and b05_man.tables[0].get("rows"):
            for r in b05_man.tables[0]["rows"]:
                man_rows.append(["b05 placement", str(r[0]),
                                 r[2], r[4], r[5]])
        # Provincial c11 (individuals) Segregation per year
        if c11_man.payload.get("rows"):
            for r in c11_man.payload["rows"]:
                if r[1] == "Segregation":
                    man_rows.append([f"c11 individuals ({r[1]})",
                                     str(r[0]), r[3], r[5], r[6]])
        # Federal-vs-provincial cross-comparison summary row
        if cmp_man.payload.get("max_provincial_torture_pct") is not None:
            gap = cmp_man.payload["gap_pp"]
            max_pct = cmp_man.payload["max_provincial_torture_pct"]
            max_year = cmp_man.payload["max_provincial_torture_year"]
            man_rows.append([
                "Federal SIU vs Prov peak (Mandela torture)",
                f"max {max_year}",
                "--", f"prov {max_pct:.1f}%",
                f"gap {gap:+.1f} pp vs federal 9.9%",
            ])
        if man_rows:
            sections.append({
                "title": ("§6 Mandela-RF -- Ontario provincial Mandela "
                          "classification (OTIS b05/c11) + federal "
                          "cross-comparison:"),
                "headers": ["Source", "Year/Slice", "Solitary %",
                            "Torture %", "N or note"],
                "rows": man_rows,
            })
    except Exception:  # noqa: BLE001
        pass
    return RichResult(
        title=("OTIS Ruhela formulations -- paper-ready master report "
               "(provincial + federal-aggregate companion)"),
        summary_lines=[
            ("Sections", len(sections)),
            ("Aggregate RFs", "24 (b03-b09 + c01-c12 + d02-d05 + region-cluster variants)"),
            ("Per-row RFs included", include_per_row),
            ("Doob chi-square", "c-series + d-series families"),
            ("Methodology attribution", "DLRM (Doob-Levinsky-Ruhela-Medina)"),
            ("Acknowledgements (separate)", "Jauregui, A. Laniyonu"),
            ("Federal RF extensions",
             "Sprott-Doob CRIMSL (Feb 2021) + Sprott-Doob-Iftene "
             "Schulich Law (May 2021) + Doob T-539-20 (CCRSO 2018)"),
            ("Federal panel context",
             "morie.siuiap (Sapers chair, Doob, Sprott)"),
        ],
        tables=sections,
        interpretation=(
            "Master report compiling every Ruhela formulation analyzer "
            "shipped in morie. Sections: (1) aggregate IRR grid on "
            "OTIS, cluster-robust where high-cardinality grouping "
            "applies; (2) per-row IRM-DML ATE on a01/b01/b02 (when "
            "include_per_row=True); (3) Doob χ² on c-series + d-series "
            "demographic contingency tables; (4) RF-extended federal "
            "companion -- Sprott-Doob CRIMSL Feb 2021 Mandela-Rules "
            "classifier (28.4% solitary, 9.9% torture) + regional "
            "torture rates (Pacific 22.6× Ontario), and Sprott-Doob-"
            "Iftene Schulich Law May 2021 IEDM-review evaluation "
            "(N=265 stays; 87% stay-in rate; 30% pre-empted; 12 IEDMs "
            "varied 38%-86%); (5) Doob T-539-20 affidavit CCRSO 2018 "
            "tables (release outcomes, prisoner flow, age over-/under-"
            "representation). Together these brackets the argument "
            "from federal aggregates (Sections 4-5) down to provincial "
            "individual-level evidence (Sections 1-3). Methodology "
            "attribution remains DLRM; the federal-aggregate evidence "
            "is replicated FROM Sprott / Doob / Iftene published work, "
            "The man who moves a mountain begins by carrying away small stones. -- Confucius"
        ),
        payload={"include_per_row": include_per_row,
                 "n_sections": len(sections)},
    )
def analyze_b02_dlrm(*args, **kwargs):
    """DLRM short alias for ``analyze_b02_ruhela_formulations``.

    Forwards all positional and keyword arguments unchanged.
    """
    return analyze_b02_ruhela_formulations(*args, **kwargs)
def analyze_a01_dlrm(*args, **kwargs):
    """DLRM short alias for ``analyze_a01_ruhela_formulations``.

    Forwards all positional and keyword arguments unchanged.
    """
    return analyze_a01_ruhela_formulations(*args, **kwargs)
def analyze_b01_dlrm(*args, **kwargs):
    """DLRM short alias for ``analyze_b01_ruhela_formulations``.

    Forwards all positional and keyword arguments unchanged.
    """
    return analyze_b01_ruhela_formulations(*args, **kwargs)
# ── Mandela-RF: Mandela Rules classification on OTIS provincial data ─
#
# Bridges the Sprott-Doob federal Mandela classifier
# (`morie.sprott_doob.classify_mandela`) into the Ruhela formulations
# framework on Ontario provincial OTIS data.
#
# Mandela Rule 43 thresholds applied to OTIS duration bins:
# ≤ 15 days -> "Solitary Confinement" (Mandela Rule 44 if conditions met)
# ≥ 16 days -> "Torture" (Mandela Rule 43 -- prolonged solitary confinement)
#
# OTIS b05 has bins: 1d, 2d, 3d, 4d, 5d, 6-10d, 11-15d, 16-20d, 21-25d,
# 26-30d, >30d. The first 7 bins are "≤15 days"; the last 4 are "≥16 days".
#
# Caveat: the Mandela Rules' full definition requires "22+ hours per day
# without meaningful human contact". OTIS exposes duration but NOT
# hours-out-of-cell. So the Ontario classification is a DURATION-ONLY
# proxy for the Mandela threshold: a placement of ≥16 days CROSSES the
# Mandela 'prolonged' line, but whether it formally meets the full
# torture definition depends on hours-out-of-cell data we don't have.
# The user should treat the b05/c11 figures as a CEILING -- actual
# Mandela-classified torture rates may be lower if conditions are
# better than the federal SIU regime.
# OTIS duration-bin labels at or below the 15-day Mandela Rule 43
# threshold (classified "Solitary Confinement" by _classify_otis_bins).
_OTIS_MANDELA_SOLITARY_BINS: list[str] = [
    "1 day", "2 days", "3 days", "4 days", "5 days",
    "6 to 10 days", "11 to 15 days",
]
# OTIS duration-bin labels strictly above 15 days -- "prolonged" under
# Mandela Rule 43 (classified "Torture" by _classify_otis_bins).
_OTIS_MANDELA_TORTURE_BINS: list[str] = [
    "16 to 20 days", "21 to 25 days", "26 to 30 days",
    "Greater than 30 days",
]
def _classify_otis_bins(duration_str: str) -> str:
    """Map an OTIS duration-bin label to a Mandela class.

    Bins above the 15-day line classify as torture, bins at or below it
    as solitary confinement; any unrecognised label maps to "Unknown".
    The two bin lists are disjoint, so check order does not matter.
    """
    if duration_str in _OTIS_MANDELA_TORTURE_BINS:
        return "Torture (≥16d)"
    if duration_str in _OTIS_MANDELA_SOLITARY_BINS:
        return "Solitary Confinement (≤15d)"
    return "Unknown"
def analyze_b05_mandela_classification(
        df: "pd.DataFrame | None" = None,
) -> RichResult:
    """Mandela-RF on b05 -- per-placement Mandela classification by year.

    Applies the Sprott-Doob 15-day Mandela threshold to OTIS b05 (Ontario
    provincial segregation placement counts by binned duration). Reports
    proportions of placements ≤15 days (solitary) vs ≥16 days (torture)
    per fiscal year + a federal-vs-provincial comparison.

    Parameters
    ----------
    df : pd.DataFrame | None
        Pre-loaded b05 frame; loaded from the OTIS registry when None.
    """
    df = df if df is not None else load_otis_dataset("b05")
    df = df.copy()  # never mutate a caller-supplied frame
    # Classify every duration bin against the 15-day Mandela threshold.
    df["mandela_class"] = df["Consecutive_Duration"].astype(str).apply(
        _classify_otis_bins)
    # Per-year totals
    rows = []
    for year, ydf in df.groupby("EndFiscalYear", sort=True):
        n_solitary = int(ydf.loc[
            ydf["mandela_class"] == "Solitary Confinement (≤15d)",
            "Number_SegregationPlacements"].sum())
        n_torture = int(ydf.loc[
            ydf["mandela_class"] == "Torture (≥16d)",
            "Number_SegregationPlacements"].sum())
        total = n_solitary + n_torture
        if total == 0:
            # Year contributed only "Unknown"-bin placements -- skip it.
            continue
        sol_pct = 100.0 * n_solitary / total
        tor_pct = 100.0 * n_torture / total
        rows.append([int(year), n_solitary, f"{sol_pct:.1f}%",
                     n_torture, f"{tor_pct:.1f}%", total])
    # Federal-vs-provincial comparison row (using 2023 + Sprott-Doob Feb
    # 2021 federal numbers as reference points).
    sd_federal_solitary_pct = 28.4  # Sprott-Doob T19
    sd_federal_torture_pct = 9.9
    sd_federal_n = 1960
    if rows:
        # groupby(sort=True) ⇒ rows are in fiscal-year order, so the
        # last row is the most recent year.
        latest_year_row = rows[-1]  # most recent year
        # Parse the torture percentage back out of its "NN.N%" cell.
        prov_torture_pct = float(
            latest_year_row[4].rstrip("%"))
    else:
        prov_torture_pct = float("nan")
    return RichResult(
        title=("Mandela-RF on OTIS b05 -- per-placement Mandela "
               "classification (Ontario provincial segregation, by "
               "fiscal year)"),
        summary_lines=[
            ("Source", "OTIS b05 (segregation placement counts × duration)"),
            ("Mandela threshold", "Rule 43 -- 15 days (≤15 = solitary; "
             "≥16 = torture)"),
            ("Caveat",
             "Duration-only proxy (no hours-out-of-cell in OTIS); "
             "treat as upper bound -- actual Mandela torture rate may "
             "be lower if conditions are less restrictive than "
             "federal SIUs"),
            ("Federal SD-2021 reference (CSC SIUs N=1960)",
             f"Solitary {sd_federal_solitary_pct}%, "
             f"Torture {sd_federal_torture_pct}%"),
            ("Latest provincial torture rate",
             f"{prov_torture_pct:.2f}% -- vs federal "
             f"{sd_federal_torture_pct}%; "
             f"{'higher' if prov_torture_pct > sd_federal_torture_pct else 'lower'}"),
        ],
        tables=[{
            "title": ("OTIS b05 Mandela-class by fiscal year (placements "
                      "as the unit):"),
            "headers": ["Fiscal year", "Solitary N", "Solitary %",
                        "Torture N", "Torture %", "Total"],
            "rows": rows,
        }],
        interpretation=(
            "Applies the Sprott-Doob Mandela-Rules duration threshold "
            "(15 days, Rule 43) to OTIS provincial segregation "
            "placements. The 'torture' bin (≥16 days) here counts "
            "placements crossing the prolonged-solitary line; "
            "comparable to Sprott-Doob's federal 9.9% torture rate, "
            "though the OTIS data lacks hours-out-of-cell, so this "
            "is a duration-only proxy. Compare to Sprott-Doob "
            "Federal Court Affidavit Table 19 (federal SIU regime) "
            "and Sprott-Doob Feb 2021 paper for the federal-aggregate "
            "context."
        ),
        payload={
            "rows": rows,
            "federal_reference": {
                "solitary_pct": sd_federal_solitary_pct,
                "torture_pct": sd_federal_torture_pct,
                "n": sd_federal_n,
            },
        },
    )
def analyze_c11_mandela_classification(
        df: "pd.DataFrame | None" = None,
) -> RichResult:
    """Mandela-RF on c11 -- per-individual Mandela classification by year.

    Applies the 15-day threshold to OTIS c11 (Ontario provincial counts of
    INDIVIDUALS by binned aggregate duration). Reports both restrictive-
    confinement and segregation-only views; the latter is the more
    conservative and directly comparable to Sprott-Doob federal SIUs.

    Parameters
    ----------
    df : pd.DataFrame | None
        Pre-loaded c11 frame; loaded from the OTIS registry when None.
    """
    df = df if df is not None else load_otis_dataset("c11")
    df = df.copy()  # never mutate a caller-supplied frame
    # Classify every duration bin against the 15-day Mandela threshold.
    df["mandela_class"] = df["Aggregate_Duration"].astype(str).apply(
        _classify_otis_bins)
    # BUGFIX: a leftover placeholder groupby loop (body `pass`) used to
    # iterate the whole frame here for nothing before `rows` was rebuilt;
    # the dead loop has been removed.
    # Per-year totals across both 'kind' columns
    rows = []
    for year, ydf in df.groupby("EndFiscalYear", sort=True):
        for kind_col, kind_label in [
            ("NumberIndividuals_Segregation", "Segregation"),
            ("NumberIndividuals_RestrictiveConfinement",
             "Restrictive confinement"),
        ]:
            n_solitary = int(ydf.loc[
                ydf["mandela_class"] == "Solitary Confinement (≤15d)",
                kind_col].sum())
            n_torture = int(ydf.loc[
                ydf["mandela_class"] == "Torture (≥16d)",
                kind_col].sum())
            total = n_solitary + n_torture
            if total == 0:
                # This year/view contributed only "Unknown" bins -- skip.
                continue
            sol_pct = 100.0 * n_solitary / total
            tor_pct = 100.0 * n_torture / total
            rows.append([int(year), kind_label,
                         n_solitary, f"{sol_pct:.1f}%",
                         n_torture, f"{tor_pct:.1f}%", total])
    return RichResult(
        title=("Mandela-RF on OTIS c11 -- per-individual Mandela "
               "classification (Ontario provincial restrictive-"
               "confinement & segregation, by fiscal year)"),
        summary_lines=[
            ("Source", "OTIS c11 (individuals × aggregate duration)"),
            ("Mandela threshold", "Rule 43 -- 15 days "
             "(≤15 = solitary; ≥16 = torture)"),
            ("Two views",
             "Segregation (closer match to federal SIU); "
             "Restrictive Confinement (broader Ontario superset)"),
            ("Caveat",
             "Duration-only proxy -- no hours-out-of-cell in OTIS"),
            ("Federal SD-2021 reference (N=1960)",
             "Solitary 28.4%, Torture 9.9%"),
        ],
        tables=[{
            "title": ("OTIS c11 Mandela-class by year × confinement "
                      "type (individuals as the unit):"),
            "headers": ["Year", "Type", "Solitary N", "Solitary %",
                        "Torture N", "Torture %", "Total"],
            "rows": rows,
        }],
        interpretation=(
            "Per-individual Mandela classification on OTIS c11. The "
            "'Segregation' view is most directly comparable to the "
            "Sprott-Doob federal SIU classification (28.4% solitary, "
            "9.9% torture across N=1960). The 'Restrictive Confinement' "
            "view is broader, including Ontario's mediated-conditions "
            "isolation; expect lower torture proportions there. "
            "Both views are duration-only proxies because OTIS does "
            "not expose hours-out-of-cell."
        ),
        payload={"rows": rows},
    )
def analyze_otis_mandela_provincial_vs_federal() -> RichResult:
    """Side-by-side comparison: provincial OTIS vs federal SIU Mandela
    rates.

    Cross-references `analyze_c11_mandela_classification` (Ontario
    provincial individuals) against the Sprott-Doob Feb 2021 federal
    SIU figures (Table 19, N=1960).

    Returns
    -------
    RichResult
        One comparison table (federal reference row first, then the
        provincial Segregation and Restrictive-Confinement rows) plus a
        payload with the peak provincial torture rate and its gap, in
        percentage points, versus the federal rate.
    """
    from . import sprott_doob as _sd
    c11_r = analyze_c11_mandela_classification()
    # Federal reference from Sprott-Doob Table 19
    fed_solitary_pct = next(r["percent"]
                            for r in _sd.TABLE19_MANDELA_CLASSIFICATION
                            if r["category"] == "Solitary Confinement")
    fed_torture_pct = next(r["percent"]
                           for r in _sd.TABLE19_MANDELA_CLASSIFICATION
                           if r["category"] == "Torture")
    fed_n = sum(r["n"]
                for r in _sd.TABLE19_MANDELA_CLASSIFICATION)
    rows = []
    # Federal reference row
    rows.append(["Federal SIUs (CSC, Sprott-Doob T19)",
                 "Nov 2019 - Sept 2020", "Person-stays",
                 f"{fed_solitary_pct}%", f"{fed_torture_pct}%",
                 fed_n])
    # Provincial Segregation rows from c11
    # (fix: dropped pointless f-prefixes on the constant source labels)
    for r in c11_r.payload["rows"]:
        if r[1] == "Segregation":
            rows.append(["Ontario Provincial Segregation",
                         str(r[0]), "Individuals",
                         r[3], r[5], r[6]])
    # Provincial RC rows
    for r in c11_r.payload["rows"]:
        if r[1] == "Restrictive confinement":
            rows.append(["Ontario Provincial RC (broader)",
                         str(r[0]), "Individuals",
                         r[3], r[5], r[6]])
    # Compute the largest provincial-vs-federal torture-rate gap
    seg_rows = [r for r in c11_r.payload["rows"]
                if r[1] == "Segregation"]
    if seg_rows:
        # Torture % lives in column 5 as an "NN.N%" string.
        max_torture_pct = max(
            float(r[5].rstrip("%")) for r in seg_rows)
        max_torture_year = next(r[0] for r in seg_rows
                                if abs(float(r[5].rstrip("%"))
                                       - max_torture_pct) < 0.01)
        gap_pp = max_torture_pct - fed_torture_pct
    else:
        max_torture_pct = float("nan")
        max_torture_year = None
        gap_pp = float("nan")
    return RichResult(
        title=("Mandela-RF cross-comparison -- Ontario provincial "
               "(OTIS) vs federal SIUs (Sprott-Doob T19)"),
        summary_lines=[
            ("Federal SIU reference",
             f"Solitary {fed_solitary_pct}%, "
             f"Torture {fed_torture_pct}% (N={fed_n})"),
            ("Provincial peak torture-rate (Segregation)",
             f"{max_torture_pct:.1f}% in {max_torture_year}"),
            ("Provincial-vs-federal gap (peak)",
             f"{gap_pp:+.1f} percentage points "
             f"({'higher' if gap_pp > 0 else 'lower'} provincially)"),
            ("Caveat",
             "Federal: person-stays + full Mandela operationalization "
             "(hours-out-of-cell + duration). Provincial: individuals "
             "+ duration only. Cross-walks should note this."),
        ],
        tables=[{
            "title": "Federal vs Provincial Mandela classification:",
            "headers": ["Source", "Period", "Unit",
                        "Solitary %", "Torture %", "N"],
            "rows": rows,
        }],
        interpretation=(
            "Cross-comparison of Mandela-Rules classifications -- "
            "federal SIUs (Sprott-Doob's headline 9.9% torture) vs "
            "Ontario provincial (OTIS c11). Note the unit and "
            "operationalization differences: federal numbers are "
            "person-stays with the full hours-out-of-cell + duration "
            "operationalization; provincial numbers are individuals "
            "with duration-only (no h-o-of-cell in OTIS). The "
            "comparison is therefore directionally informative but "
            "not perfectly apples-to-apples. The KEY question this "
            "supports: is Ontario provincial restrictive-confinement "
            "duration approaching, exceeding, or remaining below the "
            "federal SIU torture-classified rate? Use the provincial "
            "Segregation view for the closest match."
        ),
        payload={
            "federal": {"solitary_pct": fed_solitary_pct,
                        "torture_pct": fed_torture_pct, "n": fed_n},
            "max_provincial_torture_pct": max_torture_pct,
            "max_provincial_torture_year": max_torture_year,
            "gap_pp": gap_pp,
        },
    )
# ── Doob chi-square aggregate analyzers ────────────────────────────
#
# Prof. Anthony N. Doob (U. of Toronto, member of the federal SIU IAP
# alongside Howard Sapers and Jane Sprott) has a long career applying
# chi-square independence tests in Canadian corrections research.
# These analyzers -- applied to the c-series (aggregate counts) and
# d-series (deaths) -- are framed as a homage to that tradition: classic
# Pearson chi-square + Cramer's V on every meaningful 2-way slice of
# the aggregate tables, plus year-over-year trend tests on death counts.
#
# These pair with the Ruhela formulations on a01/b01 (where individual-
# level causal inference applies) and the SIU IAP federal context
# (see ``morie.siuiap`` for citations).
[docs]
def analyze_c_doob_chi2(*,
    contingency_value: str = "NumberIndividuals_RestrictiveConfinement",
) -> RichResult:
    """Doob chi-square family on c-series demographic contingency tables.

    An homage to Prof. Doob's chi-square tradition in Canadian
    corrections research.  Delegates to ``analyze_c_aggregate``, which
    runs Pearson χ² + Cramer's V over every meaningful 2-way slice of
    the c-series datasets using ``contingency_value`` as the count
    column.

    Sibling: ``analyze_d_doob_chi2`` (death counts).  For the federal
    SIU Implementation Advisory Panel context (Doob, Sprott, Sapers
    et al.) see ``morie.siuiap``.
    """
    # Pure delegation -- the named entry point exists for discoverability.
    return analyze_c_aggregate(contingency_value=contingency_value)
# [docs] -- Sphinx "view source" link artifact from the docs scrape (not code)
def analyze_d_doob_chi2() -> RichResult:
    """Doob chi-square on d-series death data + yearly trend.

    An homage to Prof. Doob's chi-square tradition.  Delegates to
    ``analyze_d_aggregate`` and therefore carries the identical payload
    (yearly Poisson 95% CI + Alert × MedicalCause χ² + Alert × Housing
    χ²).
    """
    # Pure delegation -- the named entry point exists for discoverability.
    return analyze_d_aggregate()
# Backward-compat alias (renamed 2026-05-09)
# [docs] -- Sphinx "view source" link artifact from the docs scrape (not code)
def analyze_b01_dual(*args, **kwargs):
    """Deprecated alias for ``analyze_b01_ruhela_formulations``."""
    # Kept for callers predating the 2026-05-09 rename; warn, then
    # forward everything unchanged.
    import warnings
    warnings.warn(
        "analyze_b01_dual is deprecated; use analyze_b01_ruhela_formulations",
        DeprecationWarning, stacklevel=2)
    return analyze_b01_ruhela_formulations(*args, **kwargs)
# ── c/d-series aggregate analyzers ──────────────────────────────────
#
# c-series and d-series datasets are pre-aggregated (counts already
# grouped by demographics) so the per-(id, year) Ruhela dual cannot be
# computed on them. The shape-appropriate machinery is contingency-
# table χ² + Cramer's V (c-series) and yearly trend rate ratios
# (d-series). See data/datasets/OTIS/OTIS_DATA_DICTIONARY.md.
def _chi2_cramer(table: "pd.DataFrame") -> dict:
"""Chi² test of independence + Cramer's V on a 2-way contingency.
Returns a dict with chi2, dof, pvalue, cramer_v, n. Returns NaN
fields when the table fails the χ² minimum-cell-count rule of
thumb (expected counts < 5 in any cell).
"""
from scipy import stats as sps
if table.size == 0:
return {"chi2": float("nan"), "dof": 0,
"pvalue": float("nan"), "cramer_v": float("nan"),
"n": 0, "min_cell": 0}
arr = table.values
n = float(arr.sum())
min_cell = int(arr.min())
try:
chi2, p, dof, expected = sps.chi2_contingency(arr)
# Cramer's V via χ² / (n × min(rows-1, cols-1))
denom = n * (min(table.shape) - 1) if min(table.shape) > 1 else 1.0
v = float(np.sqrt(chi2 / denom)) if denom > 0 else float("nan")
return {"chi2": float(chi2), "dof": int(dof),
"pvalue": float(p), "cramer_v": v,
"n": int(n), "min_cell": min_cell,
"min_expected": float(expected.min())}
except Exception as e: # noqa: BLE001
return {"chi2": float("nan"), "dof": 0,
"pvalue": float("nan"), "cramer_v": float("nan"),
"n": int(n), "min_cell": min_cell,
"error": f"{type(e).__name__}: {e}"}
def _contingency_chi2(df: "pd.DataFrame", *, row: str, col: str,
value: str) -> tuple["pd.DataFrame", dict]:
"""Build a 2-way contingency from a long-format df (sum value over
other cols) and run χ² + Cramer's V on it.
"""
if not all(c in df.columns for c in (row, col, value)):
return pd.DataFrame(), {"error": "missing columns",
"n": 0, "chi2": float("nan"),
"cramer_v": float("nan"),
"pvalue": float("nan")}
g = df.dropna(subset=[row, col, value]).copy()
g[value] = pd.to_numeric(g[value], errors="coerce")
g = g.dropna(subset=[value])
table = (g.groupby([row, col])[value].sum()
.unstack(fill_value=0))
stats = _chi2_cramer(table)
return table, stats
# [docs] -- Sphinx "view source" link artifact from the docs scrape (not code)
def analyze_c_aggregate(*,
    contingency_value: str = "NumberIndividuals_RestrictiveConfinement",
) -> RichResult:
    """Chi² + Cramer's V family on every c-series 2-way slice.

    The c-series datasets arrive pre-aggregated (counts already grouped
    by demographics), so contingency-table independence is the natural
    statistic.  Each (categorical_a × categorical_b) slice below is
    tested with χ² + Cramer's V using ``contingency_value`` as the
    primary count column.
    """
    from .otis_datasets import load_otis_dataset

    # (dataset id, row dimension, column dimension) for each slice.
    slice_specs = [
        ("c03", "Race", "Gender"),
        ("c04", "Race", "Region_MostRecentPlacement"),
        ("c05", "Religion", "Region_MostRecentPlacement"),
        ("c06", "Age_Category", "Region_MostRecentPlacement"),
        ("c07", "Alert_Type", "Gender"),
        ("c08", "Religion", "Gender"),
        ("c09", "Age_Category", "Gender"),
    ]
    table_rows = []
    per_dataset: dict = {}
    for dataset_id, row_name, col_name in slice_specs:
        try:
            frame = load_otis_dataset(dataset_id)
            _, st = _contingency_chi2(
                frame, row=row_name, col=col_name,
                value=contingency_value)
        except Exception as e:  # noqa: BLE001
            # Loading / slicing failed: record NaN stats + the error.
            st = {"chi2": float("nan"), "cramer_v": float("nan"),
                  "pvalue": float("nan"), "n": 0,
                  "error": f"{type(e).__name__}: {e}"}
        # Human-readable cells; "n/a" whenever the statistic is NaN.
        chi2_cell = "n/a"
        if np.isfinite(st.get("chi2", float("nan"))):
            chi2_cell = round(st["chi2"], 3)
        p_cell = "n/a"
        if np.isfinite(st.get("pvalue", float("nan"))):
            p_cell = f"{st['pvalue']:.2e}"
        v_cell = "n/a"
        if np.isfinite(st.get("cramer_v", float("nan"))):
            v_cell = round(st["cramer_v"], 4)
        table_rows.append([
            dataset_id, f"{row_name} × {col_name}",
            int(st.get("n", 0)),
            chi2_cell,
            int(st.get("dof", 0)),
            p_cell,
            v_cell,
            int(st.get("min_cell", 0)),
        ])
        per_dataset[dataset_id] = {"row": row_name, "col": col_name,
                                   "stats": st}
    return RichResult(
        title=("OTIS c-series -- χ² + Cramer's V family on demographic "
               "contingency tables"),
        summary_lines=[
            ("Contingency value column", contingency_value),
            ("Slices tested", len(table_rows)),
        ],
        tables=[{
            "title": ("Contingency χ² and Cramer's V on c-series slices "
                      f"(value = {contingency_value}):"),
            "headers": ["Dataset", "Slice", "n",
                        "χ²", "dof", "p", "Cramer's V", "min cell"],
            "rows": table_rows,
        }],
        interpretation=(
            "Cramer's V is the canonical effect-size measure for χ² "
            "independence on 2-way contingency tables: V≈0 -> "
            "independence, V->1 -> strong association. p<.05 with "
            "V<0.1 ⇒ statistically detectable but practically tiny "
            "(typical of large-n contingency); V>0.3 is a noticeable "
            "association. All slices are population-aggregate (year-"
            "summed); the c-series anonymisation does not permit "
            "individual-level inference."
        ),
        payload={"slices": per_dataset,
                 "contingency_value": contingency_value},
    )
# [docs] -- Sphinx "view source" link artifact from the docs scrape (not code)
def analyze_d_aggregate() -> RichResult:
    """Yearly trend + alert-cause/housing χ² on d-series death data.

    Three things on the d-series:

    1. Per-year custodial-death count (d02-d05 aggregate over the
       categorical dim) with Poisson 95% CIs.
    2. Year-over-year rate ratio (last full year / first year).
    3. χ² + Cramer's V on d06 (Alert × MedicalCauseOfDeath) and
       d07 (Alert × Housing_Type).

    All measures are at the population aggregate level -- d-series has
    no per-individual alert columns, so the Ruhela dual is structurally
    impossible here.
    """
    from .otis_datasets import load_otis_dataset
    from scipy import stats as sps
    payloads: dict = {}
    # 1. Yearly counts from d01 (canonical per-death record)
    d01 = load_otis_dataset("d01")
    yearly = (d01.dropna(subset=["Year"])
              .groupby("Year").size().rename("n_deaths"))
    yearly_rows = []
    for y, n in yearly.items():
        # Poisson 95% CI via chi² quantile (Garwood exact interval:
        # lo = χ²(0.025, 2n)/2, hi = χ²(0.975, 2(n+1))/2; lo is pinned
        # to 0 for n == 0, where the chi² quantile is undefined).
        lo = sps.chi2.ppf(0.025, 2 * n) / 2 if n > 0 else 0
        hi = sps.chi2.ppf(0.975, 2 * (n + 1)) / 2
        yearly_rows.append([
            int(y), int(n), f"[{lo:.1f}, {hi:.1f}]",
        ])
    payloads["d01_yearly_counts"] = {int(y): int(n) for y, n in yearly.items()}
    # 2. Year-over-year rate ratio (d01 last/first)
    rr_text = "n/a"
    if len(yearly) >= 2:
        first_y, last_y = int(yearly.index.min()), int(yearly.index.max())
        n_first = int(yearly.loc[first_y])
        n_last = int(yearly.loc[last_y])
        if n_first > 0:
            rr = n_last / n_first
            # Approximate Wald 95% CI on log-RR:
            # SE = sqrt(1/n_first + 1/n_last); the 1e-9 floor guards
            # log(0) when n_last == 0.
            log_rr = np.log(max(rr, 1e-9))
            se_log_rr = np.sqrt(1.0 / max(n_first, 1) + 1.0 / max(n_last, 1))
            ci = (np.exp(log_rr - 1.96 * se_log_rr),
                  np.exp(log_rr + 1.96 * se_log_rr))
            rr_text = (f"{rr:.3f} ({last_y}/{first_y}; "
                       f"n={n_last}/{n_first}; 95% CI [{ci[0]:.3f}, "
                       f"{ci[1]:.3f}])")
            # NOTE: this payload entry is only written when n_first > 0
            # (it references rr/ci); a zero-count first year leaves
            # "yoy_rate_ratio" absent from the payload and rr_text at
            # "n/a".
            payloads["yoy_rate_ratio"] = {"first_year": first_y,
                                          "last_year": last_y,
                                          "n_first": n_first,
                                          "n_last": n_last,
                                          "rr": rr,
                                          "ci95_low": float(ci[0]),
                                          "ci95_high": float(ci[1])}
    # 3. d06 Alert × MedicalCauseOfDeath
    d06 = load_otis_dataset("d06")
    table_d06, stats_d06 = _contingency_chi2(
        d06, row="Alert_Type", col="MedicalCauseOfDeath",
        value="Number_CustodialDeaths")
    payloads["d06_chi2"] = stats_d06
    # 4. d07 Alert × Housing_Type
    d07 = load_otis_dataset("d07")
    table_d07, stats_d07 = _contingency_chi2(
        d07, row="Alert_Type", col="Housing_Type",
        value="Number_CustodialDeaths")
    payloads["d07_chi2"] = stats_d07
    # Render both χ² results as display rows ("n/a" for NaN statistics).
    chi_rows = []
    for ds, slice_label, st in [
        ("d06", "Alert × MedicalCause", stats_d06),
        ("d07", "Alert × Housing_Type", stats_d07),
    ]:
        chi_rows.append([
            ds, slice_label, int(st.get("n", 0)),
            (round(st["chi2"], 3) if np.isfinite(st.get("chi2", float("nan")))
             else "n/a"),
            int(st.get("dof", 0)),
            (f"{st['pvalue']:.2e}" if np.isfinite(st.get("pvalue", float("nan")))
             else "n/a"),
            (round(st["cramer_v"], 4)
             if np.isfinite(st.get("cramer_v", float("nan"))) else "n/a"),
            int(st.get("min_cell", 0)),
        ])
    return RichResult(
        title=("OTIS d-series -- yearly death counts + "
               "Alert × Cause/Housing χ²"),
        summary_lines=[
            ("d01 total deaths", int(yearly.sum())),
            ("d01 year range",
             f"{int(yearly.index.min())}–{int(yearly.index.max())}"
             if not yearly.empty else "n/a"),
            ("Year-over-year RR", rr_text),
            ("d06 (Alert × MedicalCause) Cramer's V",
             round(stats_d06["cramer_v"], 4)
             if np.isfinite(stats_d06.get("cramer_v", float("nan")))
             else "n/a"),
            ("d07 (Alert × Housing_Type) Cramer's V",
             round(stats_d07["cramer_v"], 4)
             if np.isfinite(stats_d07.get("cramer_v", float("nan")))
             else "n/a"),
        ],
        tables=[{
            "title": "d01 yearly custodial-death counts (Poisson 95% CI):",
            "headers": ["Year", "Deaths", "95% CI"],
            "rows": yearly_rows,
        }, {
            "title": ("d06 / d07 Alert-flag × outcome contingency χ² + "
                      "Cramer's V:"),
            "headers": ["Dataset", "Slice", "n",
                        "χ²", "dof", "p", "Cramer's V", "min cell"],
            "rows": chi_rows,
        }],
        interpretation=(
            "d-series carries no per-individual alert columns -- the "
            "Ruhela alert-complexity dual is structurally impossible "
            "on these data. The natural alternatives are: (1) yearly "
            "death-count trends (small N: 116 total deaths over 3 "
            "calendar years), and (2) Cramer's V on the d06 (Alert × "
            "MedicalCause) and d07 (Alert × Housing) aggregate "
            "tables. Note d-series uses calendar Year, not "
            "EndFiscalYear. Yearly counts in this window are too "
            "small to support change-point detection (Pettitt requires "
            "≥10 timepoints typically)."
        ),
        payload=payloads,
    )
# [docs] -- Sphinx "view source" link artifact from the docs scrape (not code)
def analyze_a01_with_csi_context(df: pd.DataFrame | None = None,
                                 *,
                                 variant: str = "total",
                                 rebase_to_year: int | None = 2023,
                                 ) -> RichResult:
    """OTIS a01 causal pipeline + Toronto Crime Severity Index context.

    Wires together three independent morie subsystems:

    1. ``morie.otis_all_analyze.analyze_a01`` -- the MatchIt-then-
       IRM-DML causal estimate of high-alert-complexity (ac ≥ 2) on
       regional volatility (vm) within OTIS a01.
    2. ``morie.otis_tps_overlay`` -- year-by-year correlation between
       OTIS Toronto-region segregation counts and TPS incident counts.
    3. ``morie.tps_csi.analyze_csi_from_tps_dataframes`` -- Statistics
       Canada Crime Severity Index per year + per ward, weighted by
       offence-specific severity weights and population-adjusted.

    The narrative answer: "Did the OTIS-observed increase in high-alert-
    complexity placements happen during a period of rising or falling
    Toronto crime severity?" The CSI rebased to ``rebase_to_year``
    (default 2023, OTIS's first FY) gives a grandma-readable index.

    Parameters
    ----------
    df : pd.DataFrame, optional
        a01 DataFrame; if None, ``otis_all_analyze.analyze_a01()`` loads
        the canonical local copy.
    variant : {"total", "violent"}
        StatsCan CSI variant.
    rebase_to_year : int | None
        Anchor year for the CSI index column. ``None`` skips rebasing.
    """
    from . import tps_csi
    from .tps_datasets import TPS_REGISTRY
    # 1. OTIS causal estimate (loads a01 itself when df is None).
    otis_result = analyze_a01(df)
    # 2. Load the TPS per-category CSVs the CSI needs; only the two
    #    columns the computation reads are parsed.
    csi_dfs = {}
    csi_load_errors = []
    for cat in tps_csi.CSI_CATEGORIES:
        meta = TPS_REGISTRY.get(cat)
        if meta is None:
            continue
        try:
            p = meta.csv_path()
            if not p.exists():
                csi_load_errors.append(f"{cat}: file missing")
                continue
            tps_df = pd.read_csv(p, usecols=lambda c: c in
                                 {"OCC_YEAR", "HOOD_158"})
            csi_dfs[cat] = tps_df
        except Exception as e:  # noqa: BLE001
            csi_load_errors.append(f"{cat}: {type(e).__name__}: {e}")
    if not csi_dfs:
        # Degrade gracefully: return the causal result alone with the
        # load failures surfaced as warnings.
        otis_result.warnings = list(otis_result.warnings or []) + [
            "CSI context unavailable: no TPS CSVs loaded",
            *csi_load_errors,
        ]
        return otis_result
    # 3. CSI per year; optionally rebased so rebase_to_year == 100.
    csi_result = tps_csi.analyze_csi_from_tps_dataframes(
        csi_dfs, variant=variant)
    by_year_full = pd.DataFrame(csi_result.payload["by_year"])
    if rebase_to_year is not None and not by_year_full.empty:
        try:
            # Rebuild actual per-(year, category) incident counts from
            # the loaded TPS frames, then recompute the rebased index.
            # (An earlier revision first called csi_per_year with
            # all-zero counts and discarded the result; that dead --
            # and failure-prone -- call has been removed.)
            counts_by_year: dict[int, dict[str, int]] = {}
            for cat, tps_df in csi_dfs.items():
                if "OCC_YEAR" in tps_df.columns:
                    for y, n in tps_df.groupby("OCC_YEAR").size().items():
                        try:
                            counts_by_year.setdefault(int(y), {})[cat] = int(n)
                        except (ValueError, TypeError):
                            # Non-numeric year entry: skip it.
                            continue
            by_year_full = tps_csi.csi_per_year(
                counts_by_year, variant=variant,
                rebase_to_year=rebase_to_year)
        except (ValueError, KeyError) as e:
            # Keep the un-rebased frame and record why rebasing failed.
            csi_load_errors.append(
                f"rebase_to_year={rebase_to_year} failed: {e}")
    # Restrict to OTIS overlap (2023-2025)
    otis_years = {2023, 2024, 2025}
    overlap = (by_year_full[by_year_full["year"].isin(otis_years)]
               if "year" in by_year_full.columns else by_year_full)
    csi_table_rows = []
    for _, r in overlap.iterrows():
        # csi_index is absent when rebasing was skipped or failed.
        idx_val = (round(float(r["csi_index"]), 2)
                   if "csi_index" in r.index and pd.notna(r["csi_index"])
                   else "--")
        csi_table_rows.append([
            int(r["year"]),
            int(r["raw_weighted_sum"]),
            int(r["total_count"]),
            round(float(r["csi_per_capita"]), 2),
            idx_val,
        ])
    # Append the CSI context to the causal result's summary.
    summary = list(otis_result.summary_lines or []) + [
        ("-- CSI context (StatsCan), variant", variant),
        ("-- Rebase year", rebase_to_year if rebase_to_year else "none"),
        ("-- Years with CSI data",
         sorted(by_year_full["year"].astype(int).tolist())
         if "year" in by_year_full.columns else "n/a"),
    ]
    interpretation = (
        (otis_result.interpretation or "") + "\n\n"
        f"CSI context: Toronto's Crime Severity Index ({variant}) "
        f"is shown alongside the OTIS estimate, rebased to "
        f"{rebase_to_year}=100 for grandma-readable trend. The OTIS-"
        "observed regional volatility effect happens INSIDE this "
        "broader Toronto crime-severity environment; rising CSI "
        "alongside rising vm-effect would suggest a coupling, falling "
        "CSI with stable vm-effect would suggest the OTIS dynamic is "
        "internal to corrections rather than a reflection of street "
        "crime trends."
    )
    return RichResult(
        title=("OTIS a01 (alert->vm) + Toronto Crime Severity Index "
               "context"),
        summary_lines=summary,
        tables=[{
            "title": (f"Toronto CSI by year (variant={variant}, "
                      f"rebased to {rebase_to_year}=100):"),
            "headers": ["year", "weighted_sum", "incidents",
                        "csi_per_capita", "csi_index"],
            "rows": csi_table_rows,
        }],
        interpretation=interpretation,
        payload={"otis": otis_result.payload,
                 "csi_overlap_years": csi_table_rows,
                 "csi_full_payload": csi_result.payload,
                 "variant": variant,
                 "rebase_to_year": rebase_to_year},
        warnings=(list(otis_result.warnings or []) + csi_load_errors
                  if csi_load_errors else otis_result.warnings),
    )
# Register a01's analyzer in the module-wide dispatch table that
# analyze_all() iterates.
_ANALYSES["a01"] = analyze_a01
# [docs] -- Sphinx "view source" link artifact from the docs scrape (not code)
def analyze_all(out_dir: Path | None = None) -> dict[str, RichResult]:
    """Run every registered OTIS analysis and write per-dataset outputs.

    Parameters
    ----------
    out_dir : Path | str, optional
        Output directory (created if missing).  Defaults to
        ``DEFAULT_OUT`` (data/manifest/outputs/otis/).  A plain string
        is accepted and coerced to ``Path``.

    Returns
    -------
    dict mapping dataset id -> RichResult.  A failing analysis never
    aborts the run: it yields a placeholder RichResult whose
    ``warnings`` carry the exception.

    Side effects: writes ``otis_<id>.txt`` (rendered result) and
    ``otis_<id>.json`` (payload) per dataset into ``out_dir``.
    """
    # Coerce so str callers work too; `or` also maps None to the default.
    out_dir = Path(out_dir) if out_dir is not None else DEFAULT_OUT
    out_dir.mkdir(parents=True, exist_ok=True)
    results: dict[str, RichResult] = {}
    for ds_id, fn in _ANALYSES.items():
        try:
            r = fn()
            results[ds_id] = r
            # Write UTF-8 explicitly: outputs contain non-ASCII glyphs
            # (χ², ×, –) and the platform default codec may not be UTF-8.
            (out_dir / f"otis_{ds_id}.txt").write_text(
                str(r), encoding="utf-8")
            (out_dir / f"otis_{ds_id}.json").write_text(
                json.dumps(r.payload, indent=2, default=str,
                           ensure_ascii=False),
                encoding="utf-8",
            )
        except Exception as e:  # noqa: BLE001
            results[ds_id] = RichResult(
                title=f"{ds_id} (failed)",
                warnings=[f"{type(e).__name__}: {e}"],
            )
    return results
# ----------------------------------------------------------------------
# MRM-prefixed aliases
# ----------------------------------------------------------------------
#
# Same callables, MRM-prefixed names. Prefer the *_mrm form in new
# code; the *_ruhela_* form remains as an alias for backward
# compatibility with existing notebooks and scripts.
#
# Grouped by dataset series: a01 (individual-level causal), b01/b02
# (individual-level), b03-b09 and c01-c12 and d02-d05 (aggregate-only
# analyzers).  Each line is a plain module-level rebinding -- no
# wrapping, no behavior change.
analyze_a01_mrm_alt_age = analyze_a01_ruhela_alt_age
analyze_a01_mrm_alt_gender = analyze_a01_ruhela_alt_gender
analyze_a01_mrm_alt_toronto = analyze_a01_ruhela_alt_toronto
analyze_a01_mrm = analyze_a01_ruhela_formulations
analyze_a01_mrm_per_year = analyze_a01_ruhela_per_year
analyze_a01_mrm_subgroup_female = analyze_a01_ruhela_subgroup_female
analyze_a01_mrm_subgroup_male = analyze_a01_ruhela_subgroup_male
analyze_b01_mrm_alt_age = analyze_b01_ruhela_alt_age
analyze_b01_mrm_alt_gender = analyze_b01_ruhela_alt_gender
analyze_b01_mrm_alt_toronto = analyze_b01_ruhela_alt_toronto
analyze_b01_mrm = analyze_b01_ruhela_formulations
analyze_b01_mrm_per_year = analyze_b01_ruhela_per_year
analyze_b01_mrm_subgroup_female = analyze_b01_ruhela_subgroup_female
analyze_b01_mrm_subgroup_male = analyze_b01_ruhela_subgroup_male
analyze_b02_mrm_alt_age = analyze_b02_ruhela_alt_age
analyze_b02_mrm_alt_region = analyze_b02_ruhela_alt_region
analyze_b02_mrm = analyze_b02_ruhela_formulations
analyze_b03_mrm_aggregate = analyze_b03_ruhela_aggregate
analyze_b04_mrm_aggregate = analyze_b04_ruhela_aggregate
analyze_b06_mrm_aggregate = analyze_b06_ruhela_aggregate
analyze_b07_mrm_aggregate = analyze_b07_ruhela_aggregate
analyze_b08_mrm_aggregate = analyze_b08_ruhela_aggregate
analyze_b09_mrm_aggregate = analyze_b09_ruhela_aggregate
analyze_c01_mrm_aggregate = analyze_c01_ruhela_aggregate
analyze_c01_mrm_aggregate_region_cluster = analyze_c01_ruhela_aggregate_region_cluster
analyze_c02_mrm_aggregate = analyze_c02_ruhela_aggregate
analyze_c03_mrm_aggregate = analyze_c03_ruhela_aggregate
analyze_c04_mrm_aggregate = analyze_c04_ruhela_aggregate
analyze_c04_mrm_aggregate_region_cluster = analyze_c04_ruhela_aggregate_region_cluster
analyze_c05_mrm_aggregate = analyze_c05_ruhela_aggregate
analyze_c06_mrm_aggregate = analyze_c06_ruhela_aggregate
analyze_c07_mrm_aggregate = analyze_c07_ruhela_aggregate
analyze_c08_mrm_aggregate = analyze_c08_ruhela_aggregate
analyze_c09_mrm_aggregate = analyze_c09_ruhela_aggregate
analyze_c10_mrm_aggregate = analyze_c10_ruhela_aggregate
analyze_c11_mrm_aggregate = analyze_c11_ruhela_aggregate
analyze_c12_mrm_aggregate = analyze_c12_ruhela_aggregate
analyze_d02_mrm_aggregate = analyze_d02_ruhela_aggregate
analyze_d03_mrm_aggregate = analyze_d03_ruhela_aggregate
analyze_d04_mrm_aggregate = analyze_d04_ruhela_aggregate
analyze_d05_mrm_aggregate = analyze_d05_ruhela_aggregate