# Source code for morie.otis_churn (extracted from rendered docs; see module docstring below)

"""morie.otis_churn -- Goffmanian institutional-churn analyses on OTIS.

Goffman (1961) describes 'total institutions' as totalizing environments
that produce a *cyclical* relationship to the inmate. This module
operationalises the cyclical / mortifying / embedding dimensions as
formal statistical tests:

    repeat_placement_concentration(b09)   -- Gini + power-law fit + KS test
    within_year_placement_count(b01)      -- per-(id,year) cell-size distribution
    within_year_region_diversity(b01)     -- intra-year region cycling
    mortification_cooccurrence(b01)       -- joint alert chi² + Cramer's V
    disciplinary_medical_overlap(b01)     -- disc × med-protect co-occurrence
    embedding_distribution(b02)           -- total-days survival (lognormal vs Pareto AIC)
    intra_year_transition_matrix(a01)     -- Markov regA->regA' within FY
    path_complexity_gini(b01)             -- Gini split by (year × region)
    region_alert_state_richness(b01)      -- distinct (region × combo) per cell
    regC_demog_contingency(b01)           -- multi-region × Gender × Age χ²
    irr_glmm_vm(b01)                      -- Poisson + NB IRR for vm

Each emits a RichResult.

NOTE: OTIS `UniqueIndividual_ID` is anonymised as `YYYY-XXXXX-AA`, where
`AA` is the dataset acronym (`RC` for a01, `SG` for b01/b02, `DC` for d01).
The YYYY prefix locks each ID to one fiscal year, AND IDs are re-randomized
per dataset file even within the same year. So longitudinal individual-level
linkage and cross-dataset linkage are both **not possible by design**.
All metrics above are either aggregate (b09 counts), within-year individual
(b01), or within-year total-days (b02). See `docs/source/methods/otis_linkage.md`.
"""

from __future__ import annotations

import json
from pathlib import Path

import numpy as np
import pandas as pd
from scipy import stats as sps

from .fn._richresult import RichResult

PROJECT = Path(__file__).resolve().parents[5]
DEFAULT_OUT = PROJECT / "data/manifest/outputs/otis_churn"


def _load(ds_id: str) -> pd.DataFrame:
    """Fetch OTIS dataset *ds_id* via the project loader.

    The import is deferred to call time — presumably to avoid a circular
    import with ``otis_datasets``; confirm against that module.
    """
    from .otis_datasets import load_otis_dataset
    frame = load_otis_dataset(ds_id)
    return frame


# ── 1. Repeat-placement concentration ──────────────────────────────


def gini(x: np.ndarray) -> float:
    """Return the Gini coefficient of *x* (0 = perfect equality).

    Uses the standard sorted-cumulative-sum formula
    G = (n + 1 - 2 * sum(cum) / total) / n. Empty or all-zero input
    yields 0.0 by convention.
    """
    if x.size == 0 or x.sum() == 0:
        return 0.0
    ascending = np.sort(x.astype(float))
    n = ascending.size
    running = np.cumsum(ascending)
    return float((n + 1 - 2 * np.sum(running) / running[-1]) / n)
def _powerlaw_mle(x: np.ndarray, x_min: float = 1) -> tuple[float, float]: """Hill-MLE for discrete power-law: alpha = 1 + n / sum(ln(x_i / (x_min - 0.5))) Returns (alpha, std_err). """ x = x[x >= x_min].astype(float) n = x.size if n < 5: return float("nan"), float("nan") s = np.sum(np.log(x / (x_min - 0.5))) alpha = 1 + n / s if s > 0 else float("nan") se = (alpha - 1) / np.sqrt(n) if np.isfinite(alpha) else float("nan") return alpha, se def _parse_placement_bin(label: str) -> float: """Parse OTIS b09 placement-count bin labels. '1 placement' -> 1 '2 placements' -> 2 '6 to 10 placements' -> 8 (midpoint) '11 to 15 placements' -> 13 (midpoint) """ import re as _re s = str(label).strip().lower() rng = _re.search(r"(\d+)\s+to\s+(\d+)", s) if rng: a, b = int(rng.group(1)), int(rng.group(2)) return (a + b) / 2.0 n = _re.search(r"(\d+)", s) return float(n.group(1)) if n else float("nan")
def repeat_placement_concentration(df: pd.DataFrame | None = None) -> RichResult:
    """Goffman's 'cyclical inmate' -- heavy concentration of placements
    among a small fraction of individuals.

    Uses OTIS b09 (NumberPlacements × NumberIndividuals).

    Reports Gini, Hill power-law alpha, top-decile share, and a KS test
    against an exponential null on the reconstructed per-individual
    placement-count vector. Emits a ``RichResult``.
    """
    df = df if df is not None else _load("b09")
    if "NumberPlacements_Segregation" not in df.columns:
        return RichResult(title="Repeat-placement concentration",
                          warnings=["b09 columns missing"])
    # Reconstruct individual-level placement count vector
    # row = (count_value, n_indivs at that count)
    rows = df.dropna(subset=["NumberPlacements_Segregation",
                             "NumberIndividuals_Segregation"])
    # Bin labels ('6 to 10 placements') become midpoints; unparsable -> NaN.
    counts = rows["NumberPlacements_Segregation"].apply(_parse_placement_bin).values
    multipliers = rows["NumberIndividuals_Segregation"].astype(int).values
    if counts.size == 0:
        return RichResult(title="Repeat-placement concentration",
                          warnings=["b09 has no usable rows"])
    # Expand: for each (count, n_indiv) row, repeat `count` n_indiv times
    expanded = np.repeat(counts, multipliers)
    g = gini(expanded)
    alpha, alpha_se = _powerlaw_mle(expanded, x_min=1)
    # Top-decile share (n is also reused below as the individual count)
    sorted_desc = np.sort(expanded)[::-1]
    n = sorted_desc.size
    top10pct = sorted_desc[: max(1, n // 10)].sum()
    total = sorted_desc.sum()
    top10_share = top10pct / total if total > 0 else 0.0
    # KS test vs exponential (needs >=5 points and non-degenerate spread)
    if expanded.size >= 5 and expanded.std() > 0:
        scale = expanded.mean()
        ks = sps.kstest(expanded, "expon", args=(0, scale))
        ks_p = float(ks.pvalue)
    else:
        ks_p = float("nan")
    return RichResult(
        title="Goffmanian: repeat-placement concentration",
        summary_lines=[
            ("OTIS source", "b09 -- placements per individual"),
            ("Individuals (in seg)", int(n)),
            ("Mean placements/person", round(float(expanded.mean()), 2)),
            ("Median placements/person", float(np.median(expanded))),
            ("Max placements/person", int(expanded.max())),
            ("Gini coefficient", round(g, 4)),
            ("Top-10% share of placements", f"{100*top10_share:.1f}%"),
            ("Power-law α (Hill MLE)",
             round(alpha, 3) if np.isfinite(alpha) else "n/a"),
            ("Power-law α SE",
             round(alpha_se, 3) if np.isfinite(alpha_se) else "n/a"),
            ("KS-test p-value vs exponential",
             round(ks_p, 6) if np.isfinite(ks_p) else "n/a"),
        ],
        interpretation=(
            f"Gini = {g:.3f} on placements/person. Goffmanian total-"
            "institution dynamics predict heavy-tail concentration "
            "(few individuals account for many placements). "
            f"Power-law α ≈ {alpha:.2f} -- typical Goffmanian range is "
            "1.5–2.5 (preferential-attachment-style cycling). KS-test "
            f"p={ks_p:.4f} against exponential null: rejection ⇒ "
            "non-exponential heavy tail."
        ),
        payload={"gini": g, "alpha_mle": alpha, "alpha_se": alpha_se,
                 "ks_pvalue": ks_p, "top10_share": top10_share, "n": int(n)},
    )
# ── 2. Within-year placement-count distribution ───────────────────
def within_year_placement_count(df: pd.DataFrame | None = None) -> RichResult:
    """Distribution of segregation placements per (individual × fiscal year) cell.

    Because OTIS `UniqueIndividual_ID` is anonymised per fiscal year
    (`YYYY-XXXXX-AA`), each (id, year) cell is one anonymous person-year.
    This metric describes how concentrated the placements are *within*
    a fiscal year -- Goffman's intra-year cycling. Cross-year
    readmission is **not measurable** in OTIS by design.
    See OTIS_LINKAGE.md.
    """
    df = df if df is not None else _load("b01")
    needed = {"UniqueIndividual_ID", "EndFiscalYear"}
    if not needed.issubset(df.columns):
        return RichResult(title="Within-year placement count",
                          warnings=["b01 missing required cols"])
    g = df.dropna(subset=["UniqueIndividual_ID", "EndFiscalYear"])
    if g.empty:
        return RichResult(title="Within-year placement count",
                          warnings=["no usable rows"])
    # One row per placement -> cell size = placements per person-year.
    counts = g.groupby(["UniqueIndividual_ID", "EndFiscalYear"]).size()
    cnt_arr = counts.to_numpy()
    return RichResult(
        title="Within-year placement count (Goffmanian intra-year cycling)",
        summary_lines=[
            ("Distinct (id × year) cells", int(counts.size)),
            ("Mean placements / person-year", round(float(cnt_arr.mean()), 3)),
            ("Median", float(np.median(cnt_arr))),
            ("Q1 / Q3", f"{float(np.quantile(cnt_arr, 0.25)):.1f} / "
                        f"{float(np.quantile(cnt_arr, 0.75)):.1f}"),
            ("Max placements in one FY", int(cnt_arr.max())),
            ("Cells with 1 placement", int((cnt_arr == 1).sum())),
            ("Cells with 2 placements", int((cnt_arr == 2).sum())),
            ("Cells with 3+ placements", int((cnt_arr >= 3).sum())),
            ("% with multiple placements within FY",
             f"{100*float((cnt_arr > 1).mean()):.1f}%"),
            ("Gini of placement counts", round(gini(cnt_arr), 3)),
        ],
        interpretation=(
            f"Of {counts.size} person-year cells, "
            f"{(cnt_arr > 1).sum()} ({100*float((cnt_arr > 1).mean()):.1f}%) "
            "received more than one segregation placement within the same "
            "fiscal year. Gini = "
            f"{gini(cnt_arr):.3f}. This is the intra-year concentration "
            "of segregation exposure; cross-year readmission cannot be "
            "measured because OTIS IDs are year-locked."
        ),
        payload={"n_cells": int(counts.size),
                 "mean_count": float(cnt_arr.mean()),
                 "median_count": float(np.median(cnt_arr)),
                 "max_count": int(cnt_arr.max()),
                 "frac_multi": float((cnt_arr > 1).mean()),
                 "gini": float(gini(cnt_arr))},
        warnings=[
            "OTIS UniqueIndividual_ID is year-locked (YYYY-XXXXX-AA); "
            "this metric measures intra-year placement count only.",
        ],
    )
# ── 3. Within-year region diversity ────────────────────────────────
def within_year_region_diversity(df: pd.DataFrame | None = None) -> RichResult:
    """Distribution of distinct regions per individual *within* one fiscal year.

    Because OTIS `UniqueIndividual_ID` is anonymised per fiscal year
    (`YYYY-XXXXX-AA`), this metric measures **intra-year** region cycling
    only -- not multi-year mobility. See OTIS_LINKAGE.md.

    NOTE(review): groups by UniqueIndividual_ID alone (not (id, year));
    this is equivalent to a person-year cell only because IDs embed the
    year prefix -- confirm against the b01 ID scheme.
    """
    df = df if df is not None else _load("b01")
    if "Region_AtTimeOfPlacement" not in df.columns or \
            "UniqueIndividual_ID" not in df.columns:
        return RichResult(title="Within-year region diversity",
                          warnings=["b01 missing region/individual cols"])
    g = df.dropna(subset=["UniqueIndividual_ID", "Region_AtTimeOfPlacement"])
    # Distinct regions touched by each (year-locked) individual.
    n_regions_per = g.groupby("UniqueIndividual_ID")["Region_AtTimeOfPlacement"].nunique()
    if n_regions_per.size == 0:
        return RichResult(title="Within-year region diversity",
                          warnings=["no usable rows"])
    return RichResult(
        title="Within-year region diversity (Goffmanian intra-year mobility)",
        summary_lines=[
            ("Distinct (id × year) cells", int(n_regions_per.size)),
            ("Mean #regions per person-year",
             round(float(n_regions_per.mean()), 3)),
            ("In just 1 region", int((n_regions_per == 1).sum())),
            ("In 2 regions", int((n_regions_per == 2).sum())),
            ("In 3+ regions", int((n_regions_per >= 3).sum())),
            ("% multi-region within FY",
             f"{100*float((n_regions_per > 1).mean()):.1f}%"),
        ],
        tables=[{
            "title": "Counts by # regions visited within one FY:",
            "headers": ["#regions", "n_person_years", "%"],
            "rows": [[int(k), int(v), f"{100*v/n_regions_per.size:.1f}%"]
                     for k, v in n_regions_per.value_counts().sort_index().items()],
        }],
        interpretation=(
            f"{100*(n_regions_per > 1).mean():.1f}% of person-years involve "
            "movement across more than one region within the same fiscal "
            "year -- intra-year cross-staff-regime mobility. Multi-year "
            "mobility cannot be measured because OTIS IDs are year-locked."
        ),
        warnings=[
            "OTIS UniqueIndividual_ID is year-locked (YYYY-XXXXX-AA); "
            "this metric is intra-year only.",
        ],
    )
# ── 4. Mortification co-occurrence ────────────────────────────────
def mortification_cooccurrence(df: pd.DataFrame | None = None) -> RichResult:
    """Goffman's 'mortification of self' is operationalised here as
    concurrent stigmatising-alert flags on a single placement.

    Tabulates how many of the three alert flags co-occur per placement,
    and tests MentalHealth × SuicideRisk independence (chi² + Cramer's V).
    """
    df = df if df is not None else _load("b01")
    cols = ["MentalHealth_Alert", "SuicideRisk_Alert", "SuicideWatch_Alert"]
    have = [c for c in cols if c in df.columns]
    if len(have) < 2:
        return RichResult(title="Mortification co-occurrence",
                          warnings=["need ≥2 alert columns"])

    # Convert Yes/No strings to bool
    def _yn(s):
        return s.astype(str).str.strip().str.lower().eq("yes")

    flags = pd.DataFrame({c: _yn(df[c]) for c in have})
    n_flags = flags.sum(axis=1)          # alerts active per placement
    counts = n_flags.value_counts().sort_index()
    n = int(flags.shape[0])
    # Chi² for independence of MentalHealth & SuicideRisk
    if "MentalHealth_Alert" in have and "SuicideRisk_Alert" in have:
        ct = pd.crosstab(flags["MentalHealth_Alert"], flags["SuicideRisk_Alert"])
        try:
            chi2, p, dof, _ = sps.chi2_contingency(ct)
            cramer_v = float(np.sqrt(chi2 / (ct.values.sum()
                                             * max(1, min(ct.shape) - 1))))
        except Exception:
            # Degenerate table (e.g. a constant column) -> report NaN.
            chi2, p, dof, cramer_v = float("nan"), float("nan"), 0, float("nan")
    else:
        chi2, p, dof, cramer_v = float("nan"), float("nan"), 0, float("nan")
    return RichResult(
        title="Goffmanian: mortification co-occurrence",
        summary_lines=[
            ("Alerts considered", ", ".join(have)),
            ("Placements", int(n)),
            ("0 alerts", int((n_flags == 0).sum())),
            ("1 alert", int((n_flags == 1).sum())),
            ("2 alerts", int((n_flags == 2).sum())),
            ("3 alerts", int((n_flags == 3).sum())),
            ("MH × Suicide-Risk χ²", round(float(chi2), 3)),
            ("χ² p-value", round(float(p), 6)),
            ("Cramer's V", round(float(cramer_v), 4)),
        ],
        tables=[{
            "title": "# concurrent alerts per placement:",
            "headers": ["#alerts", "n_placements", "%"],
            "rows": [[int(k), int(v), f"{100*v/n:.1f}%"]
                     for k, v in counts.items()],
        }],
        interpretation=(
            f"Cramer's V = {cramer_v:.3f} between MH and Suicide-Risk "
            "alerts. Higher V = stronger co-occurrence (Goffman's "
            f"mortification stack). χ² p={p:.4g}: rejection of "
            "independence ⇒ alerts cluster on the same placements rather "
            "than being independently assigned."
        ),
    )
# ── 5. Disciplinary × medical-protection overlap ──────────────────
def disciplinary_medical_overlap(df: pd.DataFrame | None = None) -> RichResult:
    """Goffman's 'tinkering trades' tension: the same institution classifies
    the same person via *medical* AND *disciplinary* rationales.
    We look at co-occurrence on individual records.

    Columns are discovered by prefix: ``SegReason_Disciplinary*`` vs any
    ``SegReason_*`` containing 'Medical'.
    """
    df = df if df is not None else _load("b01")
    disc_cols = [c for c in df.columns if c.startswith("SegReason_Disciplinary")]
    med_cols = [c for c in df.columns
                if c.startswith("SegReason_") and "Medical" in c]
    if not disc_cols or not med_cols:
        return RichResult(title="Disciplinary × medical overlap",
                          warnings=["missing disciplinary or medical "
                                    "SegReason columns"])

    # Row-wise: any of the given Yes/No columns set to "yes"?
    def _yn_any(cols):
        return df[cols].apply(
            lambda s: s.astype(str).str.strip().str.lower().eq("yes")
        ).any(axis=1)

    has_disc = _yn_any(disc_cols)
    has_med = _yn_any(med_cols)
    ct = pd.crosstab(has_disc, has_med, rownames=["disc"], colnames=["med"])
    try:
        chi2, p, _, _ = sps.chi2_contingency(ct)
        n = ct.values.sum()
        cramer_v = float(np.sqrt(chi2 / (n * max(1, min(ct.shape) - 1))))
    except Exception:
        # Degenerate crosstab -> NaN statistics rather than a crash.
        chi2, p, cramer_v = float("nan"), float("nan"), float("nan")
    return RichResult(
        title="Goffmanian: disciplinary × medical-protection overlap",
        summary_lines=[
            ("Placements", int(df.shape[0])),
            ("Disciplinary cols",
             ", ".join(c.replace('SegReason_', '') for c in disc_cols)),
            ("Medical-protection cols",
             ", ".join(c.replace('SegReason_', '') for c in med_cols)),
            ("Both flagged", int((has_disc & has_med).sum())),
            ("Disciplinary only", int((has_disc & ~has_med).sum())),
            ("Medical only", int((~has_disc & has_med).sum())),
            ("Neither", int((~has_disc & ~has_med).sum())),
            ("χ² statistic", round(float(chi2), 3)),
            ("χ² p-value", round(float(p), 6)),
            ("Cramer's V", round(float(cramer_v), 4)),
        ],
        interpretation=(
            "Goffman's 'tinkering trades' surface where punitive and "
            "therapeutic logics co-classify the same person. "
            f"Cramer's V={cramer_v:.3f} measures their dependence. "
            f"χ² p={p:.4g} -- rejection ⇒ joint flagging is non-random."
        ),
    )
# ── 6. Total-days embedding distribution ──────────────────────────
def embedding_distribution(df: pd.DataFrame | None = None) -> RichResult:
    """TotalAggregatedDays_Segregation distribution per individual per fiscal
    year. Compare lognormal vs Pareto fit via AIC.

    AIC uses k=2 for lognormal/Pareto (shape + scale, loc pinned to 0)
    and k=1 for exponential (scale only).
    """
    df = df if df is not None else _load("b02")
    if "TotalAggregatedDays_Segregation" not in df.columns:
        return RichResult(title="Embedding distribution",
                          warnings=["b02 missing TotalAggregatedDays"])
    days = pd.to_numeric(df["TotalAggregatedDays_Segregation"],
                         errors="coerce").dropna()
    days = days[days > 0]  # positive support required by all three fits
    if days.size < 50:
        return RichResult(title="Embedding distribution",
                          warnings=[f"only {days.size} valid rows"])
    x = days.values.astype(float)
    # MLE fits (each guarded: scipy can fail to converge on odd data)
    try:
        ln_shape, ln_loc, ln_scale = sps.lognorm.fit(x, floc=0)
        ln_aic = 2 * 2 - 2 * sps.lognorm.logpdf(x, ln_shape, ln_loc, ln_scale).sum()
    except Exception:
        ln_aic = float("nan")
    try:
        pa_b, pa_loc, pa_scale = sps.pareto.fit(x, floc=0)
        pa_aic = 2 * 2 - 2 * sps.pareto.logpdf(x, pa_b, pa_loc, pa_scale).sum()
    except Exception:
        pa_aic = float("nan")
    try:
        ex_loc, ex_scale = sps.expon.fit(x, floc=0)
        ex_aic = 2 * 1 - 2 * sps.expon.logpdf(x, ex_loc, ex_scale).sum()
    except Exception:
        ex_aic = float("nan")
    fits = [("lognormal", ln_aic), ("pareto", pa_aic), ("exponential", ex_aic)]
    fits_valid = [f for f in fits if np.isfinite(f[1])]
    best = min(fits_valid, key=lambda f: f[1])[0] if fits_valid else "n/a"
    return RichResult(
        title="Goffmanian: institutional embedding (total days) distribution",
        summary_lines=[
            ("Records", int(days.size)),
            ("Mean total days", round(float(days.mean()), 1)),
            ("Median", float(days.median())),
            ("Max", int(days.max())),
            ("Lognormal AIC",
             round(float(ln_aic), 1) if np.isfinite(ln_aic) else "n/a"),
            ("Pareto AIC",
             round(float(pa_aic), 1) if np.isfinite(pa_aic) else "n/a"),
            ("Exponential AIC",
             round(float(ex_aic), 1) if np.isfinite(ex_aic) else "n/a"),
            ("Best fit (lowest AIC)", best),
        ],
        interpretation=(
            f"Best fit: {best}. Pareto / lognormal beats exponential "
            "= heavy-tailed, consistent with Goffman's dichotomy between "
            "the casual short-stayer and the deeply-embedded long-stayer."
        ),
        payload={"lognormal_aic": float(ln_aic) if np.isfinite(ln_aic) else None,
                 "pareto_aic": float(pa_aic) if np.isfinite(pa_aic) else None,
                 "exponential_aic": float(ex_aic) if np.isfinite(ex_aic) else None,
                 "best_fit": best, "n": int(days.size)},
    )
# ── Master driver ────────────────────────────────────────────────── # ── 7. Within-year region transition matrix (Markov) ────────────── def _binarise(s: pd.Series) -> pd.Series: """Tolerant 'Yes/No/True/False' -> 0/1.""" if pd.api.types.is_numeric_dtype(s): return (s.fillna(0).astype(int) > 0).astype(int) return s.astype(str).str.strip().str.lower().isin( {"yes", "y", "true", "t", "1"}).astype(int)
def intra_year_transition_matrix(df: pd.DataFrame | None = None) -> RichResult:
    """Within-fiscal-year region transition matrix (first-order Markov) on a01.

    Orders each individual's placements within a fiscal year, counts
    regA -> regA' transitions, and reports the row-normalised transition
    matrix P, its stationary distribution (left eigenvector at eigenvalue
    1), the diagonal stay-in-region share, and an off-diagonal Theil-T
    concentration index. All transitions are intra-year because OTIS IDs
    are year-locked. Emits a ``RichResult``.

    (Docstring fixed: previous version carried an unrelated placeholder
    quote instead of a contract description.)
    """
    df = df if df is not None else _load("a01")
    needed = {"UniqueIndividual_ID", "EndFiscalYear", "Region_AtTimeOfPlacement"}
    if not needed.issubset(df.columns):
        return RichResult(title="Intra-year region transition matrix",
                          warnings=["a01 missing required cols"])
    base = df.dropna(subset=list(needed)).copy()
    base["regA"] = base["Region_AtTimeOfPlacement"].astype(str)
    # NOTE(review): ordering is by (id, year) only -- within a cell the
    # transition order follows the file's row order; confirm a01 rows are
    # chronologically sorted.
    base = base.sort_values(["UniqueIndividual_ID", "EndFiscalYear"])
    base["regA_prev"] = base.groupby(
        ["UniqueIndividual_ID", "EndFiscalYear"])["regA"].shift(1)
    edges = base.dropna(subset=["regA_prev"])
    if edges.empty:
        return RichResult(title="Intra-year region transition matrix",
                          warnings=["no within-year transitions"])
    counts = (edges.groupby(["regA_prev", "regA"]).size()
              .unstack(fill_value=0))
    # Make square (every region as both row and col)
    regions = sorted(set(counts.index) | set(counts.columns))
    counts = counts.reindex(index=regions, columns=regions, fill_value=0)
    row_sums = counts.sum(axis=1).replace(0, np.nan)
    P = (counts.div(row_sums, axis=0)).fillna(0.0)
    # Stationary distribution via left-eigvec of largest eigval
    P_arr = P.to_numpy()
    if P_arr.shape[0] >= 2:
        eigvals, eigvecs = np.linalg.eig(P_arr.T)
        idx = int(np.argmin(np.abs(eigvals - 1.0)))
        v = np.real(eigvecs[:, idx])
        v = v / v.sum() if v.sum() != 0 else v
        stationary = dict(zip(regions, [round(float(x), 4) for x in v]))
    else:
        stationary = {}
    # Off-diagonal Theil-T (concentration of cross-region moves)
    off = counts.values.copy()
    np.fill_diagonal(off, 0)
    off_total = off.sum()
    if off_total > 0:
        p = off.flatten() / off_total
        p = p[p > 0]
        theil = float((p * np.log(p * len(p))).sum())
    else:
        theil = float("nan")
    diag_share = float(np.diag(counts.values).sum() / counts.values.sum())
    return RichResult(
        title="Intra-year region transition matrix (a01 Markov)",
        summary_lines=[
            ("Transitions observed", int(counts.values.sum())),
            ("Region states", len(regions)),
            ("Diagonal share (stay-in-region)", f"{100*diag_share:.1f}%"),
            ("Off-diagonal Theil-T",
             round(theil, 4) if np.isfinite(theil) else "n/a"),
            ("Stationary π (regA)", stationary),
        ],
        tables=[{
            "title": "Transition probability matrix P(regA->regA'):",
            "headers": ["from \\ to"] + regions,
            "rows": [[r] + [round(float(P.loc[r, c]), 3) for c in regions]
                     for r in regions],
        }],
        interpretation=(
            f"Within-fiscal-year region-stickiness is "
            f"{100*diag_share:.1f}% (diagonal share). The off-diagonal "
            f"mass is concentrated with Theil-T = {theil:.3f} "
            "(higher = more concentrated transitions, lower = more uniform "
            "cross-region mixing). All transitions are intra-year -- see "
            "OTIS_LINKAGE.md."
        ),
        payload={"diag_share": diag_share, "theil_off": theil,
                 "stationary": stationary,
                 "n_transitions": int(counts.values.sum())},
        warnings=[
            "OTIS UniqueIndividual_ID is year-locked (YYYY-XXXXX-AA); "
            "this transition matrix is intra-year only.",
        ],
    )
# ── 8. Path complexity Gini split by (year, region) ───────────────
def path_complexity_gini(df: pd.DataFrame | None = None) -> RichResult:
    """Gini of placements-per-(id × year) cells, split by (year, region).

    Extends `within_year_placement_count` by reporting per-region and
    per-year Gini coefficients, exposing where Goffmanian cycling is
    most concentrated.
    """
    df = df if df is not None else _load("b01")
    needed = {"UniqueIndividual_ID", "EndFiscalYear", "Region_AtTimeOfPlacement"}
    if not needed.issubset(df.columns):
        return RichResult(title="Path complexity Gini",
                          warnings=["b01 missing required cols"])
    g = df.dropna(subset=list(needed))
    # Cell = (person, year, region) with its placement count.
    cells = (g.groupby(["UniqueIndividual_ID", "EndFiscalYear",
                        "Region_AtTimeOfPlacement"])
             .size().reset_index(name="n_placements"))
    if cells.empty:
        return RichResult(title="Path complexity Gini",
                          warnings=["no usable rows"])
    overall_gini = gini(cells["n_placements"].to_numpy())
    by_yr = (cells.groupby("EndFiscalYear")["n_placements"]
             .apply(lambda s: round(gini(s.to_numpy()), 4)))
    by_rg = (cells.groupby("Region_AtTimeOfPlacement")["n_placements"]
             .apply(lambda s: round(gini(s.to_numpy()), 4)))
    by_yrg = (cells.groupby(
        ["EndFiscalYear", "Region_AtTimeOfPlacement"])["n_placements"]
        .apply(lambda s: round(gini(s.to_numpy()), 4))
        .reset_index(name="gini"))
    return RichResult(
        title="Path complexity Gini (b01 placement counts)",
        summary_lines=[
            ("Overall Gini", round(overall_gini, 4)),
            ("Cells", int(cells.shape[0])),
            ("Total placements", int(cells["n_placements"].sum())),
            ("Gini by fiscal year", dict(by_yr)),
            ("Gini by region", dict(by_rg)),
        ],
        tables=[{
            "title": "Gini by (year × region):",
            "headers": ["EndFiscalYear", "Region", "Gini"],
            # NOTE(review): int(EndFiscalYear) assumes numeric FY labels --
            # confirm b01 does not use ranges like '2019-20'.
            "rows": [[int(r["EndFiscalYear"]),
                      str(r["Region_AtTimeOfPlacement"]),
                      float(r["gini"])]
                     for _, r in by_yrg.iterrows()],
        }],
        interpretation=(
            f"Overall placement-count Gini = {overall_gini:.3f}. "
            "Higher region- or year-specific Gini ⇒ that cell has more "
            "Goffmanian heavy-tail cycling (a few people accumulating "
            "many placements). All values are intra-year by construction."
        ),
        payload={"overall_gini": float(overall_gini),
                 "by_year": {int(k): float(v) for k, v in by_yr.items()},
                 "by_region": {str(k): float(v) for k, v in by_rg.items()}},
    )
# ── 9. Region × alert state-richness (b01) ────────────────────────
def region_alert_state_richness(df: pd.DataFrame | None = None) -> RichResult:
    """Distinct (region × alert-combo) states occupied per (id × year).

    Goffman's mortification × institutional-mobility hybrid: how many
    distinct *role-zones* (region paired with alert presentation) does a
    person traverse within one fiscal year?
    """
    df = df if df is not None else _load("b01")
    needed = {"UniqueIndividual_ID", "EndFiscalYear",
              "Region_AtTimeOfPlacement",
              "MentalHealth_Alert", "SuicideRisk_Alert", "SuicideWatch_Alert"}
    if not needed.issubset(df.columns):
        return RichResult(title="Region × alert state richness",
                          warnings=["b01 missing required cols"])
    base = df.dropna(subset=list(needed)).copy()
    a_mh = _binarise(base["MentalHealth_Alert"])
    a_sr = _binarise(base["SuicideRisk_Alert"])
    a_sw = _binarise(base["SuicideWatch_Alert"])
    # 3-bit alert combo: MH=4, SR=2, SW=1 -> 0..7
    base["combo"] = (a_mh * 4 + a_sr * 2 + a_sw * 1).astype(int)
    # State key = "<region>:c<combo>"
    base["state"] = (base["Region_AtTimeOfPlacement"].astype(str)
                     + ":c" + base["combo"].astype(str))
    n_states = (base.groupby(["UniqueIndividual_ID", "EndFiscalYear"])
                ["state"].nunique())
    if n_states.empty:
        return RichResult(title="Region × alert state richness",
                          warnings=["no usable rows"])
    arr = n_states.to_numpy()
    return RichResult(
        title="Region × alert state richness (Goffmanian role-zone sweep)",
        summary_lines=[
            ("Person-year cells", int(n_states.size)),
            ("Mean distinct states / cell", round(float(arr.mean()), 3)),
            ("Median", float(np.median(arr))),
            ("Max distinct states", int(arr.max())),
            ("Cells in 1 state", int((arr == 1).sum())),
            ("Cells in 2 states", int((arr == 2).sum())),
            ("Cells in 3+ states", int((arr >= 3).sum())),
            ("% cells with multi-state",
             f"{100*float((arr > 1).mean()):.1f}%"),
        ],
        interpretation=(
            f"{100*(arr > 1).mean():.1f}% of person-years span multiple "
            "(region × alert-combo) states within one fiscal year -- a "
            "Goffmanian sweep across institutional role-zones. Possible "
            "states = 5 regions × 8 combos = 40."
        ),
        payload={"mean_states": float(arr.mean()),
                 "median_states": float(np.median(arr)),
                 "max_states": int(arr.max()),
                 "frac_multi": float((arr > 1).mean())},
    )
# ── 10. Multi-region path × demographic contingency (b01) ─────────
def regC_demog_contingency(df: pd.DataFrame | None = None) -> RichResult:
    """Multi-region path indicator (regC = number of distinct regions
    visited within FY) × Gender × Age contingency, with chi² + Cramer's V.

    Gender stats require an exact 2×2 table; Age requires ≥2 categories.
    Both apply a minimum expected cell count of 5 before testing.
    """
    df = df if df is not None else _load("b01")
    needed = {"UniqueIndividual_ID", "EndFiscalYear",
              "Region_AtTimeOfPlacement", "Gender", "Age_Category"}
    if not needed.issubset(df.columns):
        return RichResult(title="Multi-region path × demographics",
                          warnings=["b01 missing required cols"])
    base = df.dropna(subset=list(needed)).copy()
    # Collapse to person-year cells; demographics taken from first row.
    cell = (base.groupby(["UniqueIndividual_ID", "EndFiscalYear"])
            .agg(regC=("Region_AtTimeOfPlacement", "nunique"),
                 Gender=("Gender", "first"),
                 Age_Category=("Age_Category", "first"))
            .reset_index())
    cell["multi_region"] = (cell["regC"] >= 2).astype(int)
    if cell.empty:
        return RichResult(title="Multi-region path × demographics",
                          warnings=["no usable rows"])
    # Gender × multi_region χ² + Cramer's V
    tab_g = pd.crosstab(cell["Gender"], cell["multi_region"])
    if tab_g.shape == (2, 2) and tab_g.values.min() >= 5:
        chi_g, p_g, dof_g, _ = sps.chi2_contingency(tab_g.values)
        v_g = float(np.sqrt(chi_g / (tab_g.values.sum()
                                     * (min(tab_g.shape) - 1))))
    else:
        chi_g = p_g = float("nan")
        v_g = float("nan")
    # Age × multi_region
    tab_a = pd.crosstab(cell["Age_Category"], cell["multi_region"])
    if tab_a.shape[0] >= 2 and tab_a.values.min() >= 5:
        chi_a, p_a, dof_a, _ = sps.chi2_contingency(tab_a.values)
        v_a = float(np.sqrt(chi_a / (tab_a.values.sum()
                                     * (min(tab_a.shape) - 1))))
    else:
        chi_a = p_a = float("nan")
        v_a = float("nan")
    return RichResult(
        title="Multi-region path × demographics (chi² + Cramer's V)",
        summary_lines=[
            ("Person-year cells", int(cell.shape[0])),
            ("% multi-region",
             f"{100*float(cell['multi_region'].mean()):.1f}%"),
            ("Gender χ²", round(chi_g, 3) if np.isfinite(chi_g) else "n/a"),
            ("Gender χ² p-value",
             round(p_g, 6) if np.isfinite(p_g) else "n/a"),
            ("Gender Cramer's V",
             round(v_g, 4) if np.isfinite(v_g) else "n/a"),
            ("Age χ²", round(chi_a, 3) if np.isfinite(chi_a) else "n/a"),
            ("Age χ² p-value", round(p_a, 6) if np.isfinite(p_a) else "n/a"),
            ("Age Cramer's V", round(v_a, 4) if np.isfinite(v_a) else "n/a"),
        ],
        tables=[{
            "title": "Gender × multi-region (intra-FY):",
            "headers": ["Gender", "single-region", "multi-region"],
            "rows": [[str(g),
                      int(tab_g.loc[g, 0]) if 0 in tab_g.columns else 0,
                      int(tab_g.loc[g, 1]) if 1 in tab_g.columns else 0]
                     for g in tab_g.index],
        }, {
            "title": "Age × multi-region (intra-FY):",
            "headers": ["Age_Category", "single-region", "multi-region"],
            "rows": [[str(a),
                      int(tab_a.loc[a, 0]) if 0 in tab_a.columns else 0,
                      int(tab_a.loc[a, 1]) if 1 in tab_a.columns else 0]
                     for a in tab_a.index],
        }],
        interpretation=(
            "Cramer's V quantifies association strength of Gender and Age "
            "with within-fiscal-year multi-region cycling. V≈0 -> "
            "independence, V->1 -> strong association. All measures are "
            "intra-year (OTIS IDs are year-locked)."
        ),
        payload={"frac_multi": float(cell["multi_region"].mean()),
                 "chi2_gender": float(chi_g) if np.isfinite(chi_g) else None,
                 "p_gender": float(p_g) if np.isfinite(p_g) else None,
                 "v_gender": float(v_g) if np.isfinite(v_g) else None,
                 "chi2_age": float(chi_a) if np.isfinite(chi_a) else None,
                 "p_age": float(p_a) if np.isfinite(p_a) else None,
                 "v_age": float(v_a) if np.isfinite(v_a) else None},
    )
# ── 11. Incidence-rate-ratio Poisson/NB on vm count (b01) ─────────
def irr_glmm_vm(df: pd.DataFrame | None = None) -> RichResult:
    """Poisson + Negative-Binomial GLM IRR table for vm ~ T_high_ac plus
    demographic covariates, on the (UniqueIndividual_ID, FY) cell.

    A simplified implementation of the OTIS-RC `glmmTMB` pipeline:
    no random effect (statsmodels can't easily do mixed NB on this
    scale). Provides Poisson and NB2 IRR side by side as a robustness
    pair.
    """
    df = df if df is not None else _load("b01")
    try:
        from .otis_causal import make_pair_alert_to_volatility_ruhela
        import statsmodels.api as sm
        import statsmodels.formula.api as smf
    except ImportError as e:
        # statsmodels is optional; degrade to a warning result.
        return RichResult(title="IRR Poisson/NB GLM (vm)",
                          warnings=[f"missing dep: {e}"])
    # T = treatment column name, Y = outcome column name (from helper).
    py, T, Y, _ = make_pair_alert_to_volatility_ruhela(df)
    if py.empty or py[Y].sum() == 0:
        return RichResult(title="IRR Poisson/NB GLM (vm)",
                          warnings=["empty cell-table or zero outcome"])
    py = py.copy()
    py["yr"] = py["EndFiscalYear"].astype("category")
    py["sg"] = py["Gender"].astype("category")
    py["ag"] = py["Age_Category"].astype("category")
    formula = f"{Y} ~ {T} + C(yr) + C(sg) + C(ag)"
    out_rows = []
    # NOTE(review): NB2 alpha is fixed at 1.0, not estimated -- the AIC
    # comparison with Poisson is therefore only indicative.
    for family_label, family in (
        ("Poisson", sm.families.Poisson()),
        ("NegBin2", sm.families.NegativeBinomial(alpha=1.0)),
    ):
        try:
            model = smf.glm(formula, data=py, family=family).fit()
            beta = float(model.params.get(T, np.nan))
            se = float(model.bse.get(T, np.nan))
            irr = float(np.exp(beta)) if np.isfinite(beta) else float("nan")
            ci_lo = float(np.exp(beta - 1.96 * se)) if np.isfinite(se) else float("nan")
            ci_hi = float(np.exp(beta + 1.96 * se)) if np.isfinite(se) else float("nan")
            pval = float(model.pvalues.get(T, np.nan))
            out_rows.append([family_label, round(irr, 4),
                             f"[{ci_lo:.3f}, {ci_hi:.3f}]",
                             round(pval, 6), round(float(model.aic), 2)])
        except Exception as e:  # noqa: BLE001
            out_rows.append([family_label, "fit failed",
                             str(type(e).__name__), "--", "--"])
    return RichResult(
        title="IRR Poisson/NB GLM -- vm ~ T_high_ac + demog",
        summary_lines=[
            ("Cells (id × year)", int(py.shape[0])),
            ("Mean vm / cell", round(float(py[Y].mean()), 3)),
            ("Treatment T_high_ac (ac≥2) prevalence",
             f"{100*float(py[T].mean()):.1f}%"),
        ],
        tables=[{
            "title": f"IRR for {T} on {Y} (covariate-adjusted):",
            "headers": ["Family", "IRR", "95% CI", "p-value", "AIC"],
            "rows": out_rows,
        }],
        interpretation=(
            "IRR ≈ exp(β) on the alert-complexity treatment. "
            "Poisson assumes equidispersion; NB2 relaxes that. "
            "Concordance between the two is a robustness signal. "
            "All inference is on intra-year (id × FY) cells."
        ),
        payload={"n_cells": int(py.shape[0]),
                 "mean_vm": float(py[Y].mean()),
                 "irr_results": out_rows},
        warnings=[
            "No random-effect / cluster-robust SE in this implementation. "
            "For paper-grade SEs, use morie.otis_causal.otis_irm_dml.",
        ],
    )
def analyze_all(out_dir: Path | None = None) -> dict[str, RichResult]:
    """Run every churn analysis, writing .txt/.json artefacts per metric.

    Each metric is isolated in its own try/except so one failure does not
    abort the batch; failures are recorded as warning-only RichResults.
    Returns {metric_name: RichResult}.
    """
    out_dir = out_dir or DEFAULT_OUT
    out_dir.mkdir(parents=True, exist_ok=True)
    results: dict[str, RichResult] = {}
    for name, fn in [
        ("repeat_placement_concentration", repeat_placement_concentration),
        ("within_year_placement_count", within_year_placement_count),
        ("within_year_region_diversity", within_year_region_diversity),
        ("mortification_cooccurrence", mortification_cooccurrence),
        ("disciplinary_medical_overlap", disciplinary_medical_overlap),
        ("embedding_distribution", embedding_distribution),
        ("intra_year_transition_matrix", intra_year_transition_matrix),
        ("path_complexity_gini", path_complexity_gini),
        ("region_alert_state_richness", region_alert_state_richness),
        ("regC_demog_contingency", regC_demog_contingency),
        ("irr_glmm_vm", irr_glmm_vm),
    ]:
        try:
            r = fn()
            results[name] = r
            # Human-readable dump + machine-readable payload side by side.
            (out_dir / f"churn_{name}.txt").write_text(str(r))
            (out_dir / f"churn_{name}.json").write_text(
                json.dumps(r.payload, indent=2, default=str,
                           ensure_ascii=False)
            )
        except Exception as e:  # noqa: BLE001
            results[name] = RichResult(
                title=f"churn {name} (failed)",
                warnings=[f"{type(e).__name__}: {e}"],
            )
    return results