Skip to content

qc

Quality control functions for CREDTOOLS data.

adaptive_qc(locus, qc_metrics, *, logLR_threshold=1.0, z_threshold=1.0, z_std_diff_threshold=3.0, r_threshold=0.1, dentist_s_pvalue_threshold=4.0, dentist_s_r2_threshold=0.6, c1b_z_std_diff_threshold=10.0, adaptive_lambda_threshold=0.05)

Two-stage adaptive QC.

Stage A — baseline C1+C2+C3 outlier removal at the supplied thresholds. Stage B — if lambda_s on the cleaned locus still exceeds adaptive_lambda_threshold, augment with C1b (|z_std_diff| > c1b_z_std_diff_threshold & |z| > z_threshold) applied to the ORIGINAL locus's expected_z. Union the two outlier sets, remove from the original locus once, and recompute lambda_s.

The C1b augmentation is applied to the original locus's qc_metrics, NOT to the cleaned locus's metrics, because the residual diagnostics for the flipped/perturbed SNPs are most informative before they are removed.

Parameters

locus : Locus Input locus (unmodified). qc_metrics : Dict[str, pd.DataFrame] Output of locus_qc(LocusSet([locus])). Reused as-is; no recomputation. logLR_threshold, z_threshold, z_std_diff_threshold, r_threshold, dentist_s_pvalue_threshold, dentist_s_r2_threshold : float Baseline (Stage A) thresholds, passed through to identify_outliers. c1b_z_std_diff_threshold : float, optional Threshold on |z_std_diff| for the Stage B C1b rule, by default 10.0. adaptive_lambda_threshold : float, optional Trigger for Stage B; if cleaned-locus lambda_s exceeds this value, the C1b augmentation runs. Default 0.05.

Returns

cleaned_locus : Locus Locus with the union outlier set removed. meta : dict with keys lambda_s_before, lambda_s_after_baseline, lambda_s_after_final, n_outliers_baseline, n_outliers_c1b_extra, outlier_snpids (final union), adaptive_triggered (bool), baseline_thresholds (dict), adaptive_lambda_threshold, c1b_z_std_diff_threshold.

Source code in credtools/qc.py
def adaptive_qc(
    locus: Locus,
    qc_metrics: Dict[str, pd.DataFrame],
    *,
    logLR_threshold: float = 1.0,
    z_threshold: float = 1.0,
    z_std_diff_threshold: float = 3.0,
    r_threshold: float = 0.1,
    dentist_s_pvalue_threshold: float = 4.0,
    dentist_s_r2_threshold: float = 0.6,
    c1b_z_std_diff_threshold: float = 10.0,
    adaptive_lambda_threshold: float = 0.05,
) -> Tuple[Locus, Dict[str, Any]]:
    """Two-stage adaptive QC.

    Stage A — baseline C1+C2+C3 outlier removal at the supplied thresholds.
    Stage B — if ``lambda_s`` on the cleaned locus still exceeds
    ``adaptive_lambda_threshold``, augment with C1b
    (``|z_std_diff| > c1b_z_std_diff_threshold & |z| > z_threshold``)
    applied to the ORIGINAL locus's expected_z. Union the two outlier sets,
    remove from the original locus once, and recompute ``lambda_s``.

    The C1b augmentation is applied to the original locus's qc_metrics, NOT
    to the cleaned locus's metrics, because the residual diagnostics for the
    flipped/perturbed SNPs are most informative before they are removed.

    Parameters
    ----------
    locus : Locus
        Input locus (unmodified).
    qc_metrics : Dict[str, pd.DataFrame]
        Output of ``locus_qc(LocusSet([locus]))``. Reused as-is; no
        recomputation.
    logLR_threshold, z_threshold, z_std_diff_threshold, r_threshold,
    dentist_s_pvalue_threshold, dentist_s_r2_threshold : float
        Baseline (Stage A) thresholds, passed through to ``identify_outliers``.
    c1b_z_std_diff_threshold : float, optional
        Threshold on ``|z_std_diff|`` for the Stage B C1b rule, by default 10.0.
    adaptive_lambda_threshold : float, optional
        Trigger for Stage B; if cleaned-locus lambda_s exceeds this value,
        the C1b augmentation runs. Default 0.05.

    Returns
    -------
    cleaned_locus : Locus
        Locus with the union outlier set removed.
    meta : dict with keys
        ``lambda_s_before``, ``lambda_s_after_baseline``,
        ``lambda_s_after_final``, ``n_outliers_baseline``,
        ``n_outliers_c1b_extra``, ``outlier_snpids`` (final union),
        ``adaptive_triggered`` (bool), ``baseline_thresholds`` (dict),
        ``adaptive_lambda_threshold``, ``c1b_z_std_diff_threshold``.
    """
    cohort = f"{locus.popu}_{locus.cohort}"

    baseline_thresholds = {
        "logLR_threshold": logLR_threshold,
        "z_threshold": z_threshold,
        "z_std_diff_threshold": z_std_diff_threshold,
        "r_threshold": r_threshold,
        "dentist_s_pvalue_threshold": dentist_s_pvalue_threshold,
        "dentist_s_r2_threshold": dentist_s_r2_threshold,
    }

    try:
        lambda_s_before: Optional[float] = float(estimate_s_rss(locus))
    except Exception as exc:  # pragma: no cover - defensive
        logger.warning(f"adaptive_qc: estimate_s_rss(locus) failed: {exc}")
        lambda_s_before = None

    # Stage A: baseline outlier identification (no C1b).
    outlier_df = identify_outliers(
        qc_metrics,
        cohort,
        logLR_threshold=logLR_threshold,
        z_threshold=z_threshold,
        z_std_diff_threshold=z_std_diff_threshold,
        r_threshold=r_threshold,
        dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
        dentist_s_r2_threshold=dentist_s_r2_threshold,
        enable_c1b=False,
    )
    baseline_snps: List[str] = (
        outlier_df["SNPID"].tolist() if not outlier_df.empty else []
    )

    if baseline_snps:
        cleaned_baseline = remove_snps_from_locus(locus, baseline_snps)
        try:
            lambda_s_after_baseline: Optional[float] = float(
                estimate_s_rss(cleaned_baseline)
            )
        except Exception as exc:  # pragma: no cover - defensive
            logger.warning(
                f"adaptive_qc: estimate_s_rss(cleaned_baseline) failed: {exc}"
            )
            lambda_s_after_baseline = None
    else:
        cleaned_baseline = locus
        lambda_s_after_baseline = lambda_s_before

    # Stage B: trigger C1b augmentation if lambda_s remains elevated.
    triggered = (
        lambda_s_after_baseline is not None
        and lambda_s_after_baseline > adaptive_lambda_threshold
    )
    c1b_extra: List[str] = []
    cleaned_locus = cleaned_baseline
    lambda_s_after_final: Optional[float] = lambda_s_after_baseline

    if triggered:
        expected_z = qc_metrics.get("expected_z", pd.DataFrame())
        if not expected_z.empty:
            cohort_expected_z = expected_z[expected_z["cohort"] == cohort]
            if not cohort_expected_z.empty:
                mask = (
                    cohort_expected_z["z_std_diff"].abs() > c1b_z_std_diff_threshold
                ) & (cohort_expected_z["z"].abs() > z_threshold)
                baseline_set = set(baseline_snps)
                c1b_extra = [
                    snp
                    for snp in cohort_expected_z.loc[mask, "SNPID"].tolist()
                    if snp not in baseline_set
                ]

        if c1b_extra:
            union = baseline_snps + c1b_extra
            cleaned_locus = remove_snps_from_locus(locus, union)
            try:
                lambda_s_after_final = float(estimate_s_rss(cleaned_locus))
            except Exception as exc:  # pragma: no cover - defensive
                logger.warning(
                    f"adaptive_qc: estimate_s_rss(cleaned_locus) failed: {exc}"
                )
                lambda_s_after_final = None
        else:
            # No new SNPs from C1b — Stage B was a no-op.
            triggered = False

    final_outliers: List[str] = baseline_snps + c1b_extra

    ls_before_str = f"{lambda_s_before:.4f}" if lambda_s_before is not None else "N/A"
    ls_final_str = (
        f"{lambda_s_after_final:.4f}" if lambda_s_after_final is not None else "N/A"
    )
    logger.info(
        f"adaptive_qc[{locus.locus_id}] "
        f"baseline={len(baseline_snps)} c1b_extra={len(c1b_extra)} "
        f"triggered={triggered} lambda_s {ls_before_str} -> {ls_final_str}"
    )

    meta: Dict[str, Any] = {
        "lambda_s_before": lambda_s_before,
        "lambda_s_after_baseline": lambda_s_after_baseline,
        "lambda_s_after_final": lambda_s_after_final,
        "n_outliers_baseline": len(baseline_snps),
        "n_outliers_c1b_extra": len(c1b_extra),
        "outlier_snpids": final_outliers,
        "adaptive_triggered": triggered,
        "baseline_thresholds": baseline_thresholds,
        "adaptive_lambda_threshold": adaptive_lambda_threshold,
        "c1b_z_std_diff_threshold": c1b_z_std_diff_threshold,
    }
    return cleaned_locus, meta

cochran_q(locus_set)

Compute Cochran-Q statistic for heterogeneity testing across cohorts.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame with SNPID as index and columns: - Q: Cochran-Q test statistic - Q_pvalue: p-value from chi-squared test - I_squared: I² heterogeneity statistic (percentage)

Notes

The Cochran-Q test assesses heterogeneity in effect sizes across studies:

Q = Σ w_i(β_i - β_pooled)²

where: - w_i = 1/SE_i² (inverse variance weights) - β_i = effect size in study i - β_pooled = weighted average effect size

The I² statistic quantifies the proportion of total variation due to heterogeneity rather than chance:

I² = max(0, (Q - df)/Q × 100%)

Interpretation: - Q p-value < 0.05: significant heterogeneity - I² > 50%: substantial heterogeneity - I² > 75%: considerable heterogeneity

High heterogeneity may indicate: - Population differences - Different LD patterns - Batch effects - Population stratification

Source code in credtools/qc.py
def cochran_q(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute Cochran-Q statistic for heterogeneity testing across cohorts.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame with SNPID as index and columns:
        - Q: Cochran-Q test statistic
        - Q_pvalue: p-value from chi-squared test
        - I_squared: I² heterogeneity statistic (percentage)

    Notes
    -----
    The Cochran-Q test assesses heterogeneity in effect sizes across studies:

    Q = Σ w_i(β_i - β_pooled)²

    where:
    - w_i = 1/SE_i² (inverse variance weights)
    - β_i = effect size in study i
    - β_pooled = weighted average effect size

    The I² statistic quantifies the proportion of total variation due to
    heterogeneity rather than chance:

    I² = max(0, (Q - df)/Q × 100%)

    Interpretation:
    - Q p-value < 0.05: significant heterogeneity
    - I² > 50%: substantial heterogeneity
    - I² > 75%: considerable heterogeneity

    High heterogeneity may indicate:
    - Population differences
    - Different LD patterns
    - Batch effects
    - Population stratification
    """
    # Outer join: include SNPs present in any cohort
    merged_df = locus_set.loci[0].original_sumstats[[ColName.SNPID]].copy()
    for i, locus_obj in enumerate(locus_set.loci):
        locus_df = locus_obj.sumstats[
            [ColName.SNPID, ColName.BETA, ColName.SE, ColName.EAF]
        ].copy()
        locus_df.rename(
            columns={
                ColName.BETA: f"BETA_{i}",
                ColName.SE: f"SE_{i}",
                ColName.EAF: f"EAF_{i}",
            },
            inplace=True,
        )
        merged_df = pd.merge(
            merged_df, locus_df, on=ColName.SNPID, how="outer", suffixes=("", f"_{i}")
        )

    n_cohorts: int = len(locus_set.loci)

    # Build weight and effect arrays; NaN where a cohort lacks the SNP
    weight_arr = np.column_stack(
        [1.0 / (merged_df[f"SE_{i}"] ** 2) for i in range(n_cohorts)]
    )
    effect_arr = np.column_stack([merged_df[f"BETA_{i}"] for i in range(n_cohorts)])

    # Per-SNP count of non-missing cohorts
    present = ~np.isnan(effect_arr)
    k_per_snp = present.sum(axis=1)

    # Replace NaN with 0 in weights so they don't contribute
    w = np.where(present, weight_arr, 0.0)
    e = np.where(present, effect_arr, 0.0)

    w_sum = w.sum(axis=1)
    weighted_mean = np.where(w_sum > 0, (w * e).sum(axis=1) / w_sum, 0.0)

    # Q = Σ w_i (β_i - β_pooled)² over non-missing cohorts
    Q = (w * (e - weighted_mean[:, None]) ** 2).sum(axis=1)

    # df = k_i - 1 per SNP
    df = k_per_snp - 1

    # P-value and I² only for SNPs with >= 2 cohorts
    p_value = np.full(len(merged_df), np.nan)
    I_squared = np.full(len(merged_df), np.nan)
    valid = df >= 1
    p_value[valid] = np.maximum(stats.chi2.sf(Q[valid], df[valid]), 1e-300)
    with np.errstate(invalid="ignore"):
        I_squared[valid] = np.maximum(0, (Q[valid] - df[valid]) / Q[valid] * 100)
    Q = np.where(valid, Q, np.nan)

    # Create output dataframe
    output_df = pd.DataFrame(
        {
            "SNPID": merged_df["SNPID"],
            "Q": Q,
            "Q_pvalue": p_value,
            "I_squared": I_squared,
            "k": k_per_snp,
        }
    )
    return output_df.set_index(ColName.SNPID)

compare_maf(locus)

Compare allele frequencies between summary statistics and LD reference.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix.

Returns

pd.DataFrame DataFrame containing the comparison results with columns: - SNPID: SNP identifier - MAF_sumstats: MAF from summary statistics - MAF_ld: MAF from LD reference

Returns empty DataFrame if AF2 column is not available in LD matrix.

Warnings

If AF2 column is not present in the LD matrix, a warning is logged.

Notes

This function compares minor allele frequencies (MAF) between:

  1. Summary statistics (derived from EAF)
  2. LD reference panel (from AF2 column)

Large discrepancies may indicate: - Population stratification - Allele frequency differences between studies - Potential data quality issues

MAF is calculated as min(AF, 1-AF) for both sources.

Source code in credtools/qc.py
def compare_maf(locus: Locus) -> pd.DataFrame:
    """
    Compare allele frequencies between summary statistics and LD reference.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the comparison results with columns:
        - SNPID: SNP identifier
        - MAF_sumstats: MAF from summary statistics
        - MAF_ld: MAF from LD reference

        Returns empty DataFrame if AF2 column is not available in LD matrix.

    Warnings
    --------
    If AF2 column is not present in the LD matrix, a warning is logged.

    Notes
    -----
    This function compares minor allele frequencies (MAF) between:

    1. Summary statistics (derived from EAF)
    2. LD reference panel (from AF2 column)

    Large discrepancies may indicate:
    - Population stratification
    - Allele frequency differences between studies
    - Potential data quality issues

    MAF is calculated as min(AF, 1-AF) for both sources.
    """
    input_locus = locus.copy()
    if "AF2" not in input_locus.ld.map.columns:
        logger.warning("AF2 is not in the LD matrix.")
        return pd.DataFrame()
    input_locus = intersect_sumstat_ld(input_locus)
    df = input_locus.sumstats[[ColName.SNPID, ColName.MAF]].copy()
    df.rename(columns={ColName.MAF: "MAF_sumstats"}, inplace=True)
    df.set_index(ColName.SNPID, inplace=True)
    af_ld = pd.Series(
        index=input_locus.ld.map[ColName.SNPID].tolist(),
        data=input_locus.ld.map["AF2"].values,
    )
    maf_ld = np.minimum(af_ld, 1 - af_ld)
    df["MAF_ld"] = maf_ld
    df[ColName.SNPID] = df.index
    df.reset_index(drop=True, inplace=True)
    return df

compute_dentist_s(locus)

Compute Dentist-S statistic and p-value for outlier detection.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix.

Returns

pd.DataFrame DataFrame containing the results of the Dentist-S test with columns: - SNPID: SNP identifier - t_dentist_s: Dentist-S test statistic - -log10p_dentist_s: -log10 p-value

Notes

Reference: https://github.com/mkanai/slalom/blob/854976f8e19e6fad2db3123eb9249e07ba0e1c1b/slalom.py#L254

The Dentist-S statistic tests for outliers by comparing each variant's z-score to what would be expected based on its LD with the lead variant:

t_dentist_s = (z_j - r_jk * z_k)^2 / (1 - r_jk^2)

where: - z_j: z-score for variant j - z_k: z-score for lead variant k - r_jk: LD correlation between variants j and k

TODO: Use ABF to select lead variant, although in most cases the lead variant is the one with the smallest p-value.

Source code in credtools/qc.py
def compute_dentist_s(locus: Locus) -> pd.DataFrame:
    """
    Compute Dentist-S statistic and p-value for outlier detection.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the results of the Dentist-S test with columns:
        - SNPID: SNP identifier
        - t_dentist_s: Dentist-S test statistic
        - -log10p_dentist_s: -log10 p-value

    Notes
    -----
    Reference: https://github.com/mkanai/slalom/blob/854976f8e19e6fad2db3123eb9249e07ba0e1c1b/slalom.py#L254

    The Dentist-S statistic tests for outliers by comparing each variant's z-score
    to what would be expected based on its LD with the lead variant:

    t_dentist_s = (z_j - r_jk * z_k)^2 / (1 - r_jk^2)

    where:
    - z_j: z-score for variant j
    - z_k: z-score for lead variant k
    - r_jk: LD correlation between variants j and k

    TODO: Use ABF to select lead variant, although in most cases the lead variant
    is the one with the smallest p-value.
    """
    input_locus = locus.copy()
    input_locus = intersect_sumstat_ld(input_locus)
    df = input_locus.sumstats.copy()
    df["Z"] = df[ColName.BETA] / df[ColName.SE]
    lead_idx = df[ColName.P].idxmin()
    # TODO: use abf to select lead variant, although in most cases the lead variant is the one with the smallest p-value
    lead_z = df.loc[lead_idx, "Z"]
    df["r"] = input_locus.ld.r[lead_idx]
    df["r2"] = df["r"] ** 2

    df["t_dentist_s"] = (df["Z"] - df["r"] * lead_z) ** 2 / (1 - df["r"] ** 2)  # type: ignore
    df["t_dentist_s"] = np.where(df["t_dentist_s"] < 0, np.inf, df["t_dentist_s"])
    df.at[lead_idx, "t_dentist_s"] = np.nan
    df["-log10p_dentist_s"] = -stats.chi2.logsf(df["t_dentist_s"], df=1) / np.log(10)

    df = df[[ColName.SNPID, "t_dentist_s", "-log10p_dentist_s", "r2"]].copy()
    # df.set_index(ColName.SNPID, inplace=True)
    # df.index.name = None
    return df

estimate_s_rss(locus, r_tol=1e-08, method='null-mle', eigvens=None, dtype=None)

Estimate s parameter in the susie_rss Model Using Regularized LD.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix. r_tol : float, optional Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8. method : str, optional Method to estimate s, by default "null-mle". Options: "null-mle", "null-partialmle", or "null-pseudomle". eigvens : Optional[Dict[str, np.ndarray]], optional Pre-computed eigenvalues and eigenvectors, by default None. dtype : Optional[np.dtype], optional Data type for computation. If None, uses float64. Use np.float32 for reduced memory usage with minimal precision loss.

Returns

float Estimated s value between 0 and 1 (or potentially > 1 for "null-partialmle").

Raises

ValueError If n <= 1 or if the method is not implemented.

Notes

This function estimates the parameter s, which provides information about the consistency between z-scores and the LD matrix. A larger s indicates a strong inconsistency between z-scores and the LD matrix.

The function implements three estimation methods:

  • "null-mle": Maximum likelihood estimation under the null model
  • "null-partialmle": Partial MLE using null space projection
  • "null-pseudomle": Pseudo-likelihood estimation

The z-scores are transformed using the formula: z_transformed = sqrt(sigma2) * z where sigma2 = (n-1) / (z^2 + n-2)

Source code in credtools/qc.py
def estimate_s_rss(
    locus: Locus,
    r_tol: float = 1e-8,
    method: str = "null-mle",
    eigvens: Optional[Dict[str, np.ndarray]] = None,
    dtype: Optional[np.dtype] = None,
) -> float:
    """
    Estimate s parameter in the susie_rss Model Using Regularized LD.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.
    r_tol : float, optional
        Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8.
    method : str, optional
        Method to estimate s, by default "null-mle".
        Options: "null-mle", "null-partialmle", or "null-pseudomle".
    eigvens : Optional[Dict[str, np.ndarray]], optional
        Pre-computed eigenvalues and eigenvectors, by default None.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses float64.
        Use np.float32 for reduced memory usage with minimal precision loss.

    Returns
    -------
    float
        Estimated s value between 0 and 1 (or potentially > 1 for "null-partialmle").

    Raises
    ------
    ValueError
        If n <= 1 or if the method is not implemented.

    Notes
    -----
    This function estimates the parameter s, which provides information about the
    consistency between z-scores and the LD matrix. A larger s indicates a strong
    inconsistency between z-scores and the LD matrix.

    The function implements three estimation methods:

    - "null-mle": Maximum likelihood estimation under the null model
    - "null-partialmle": Partial MLE using null space projection
    - "null-pseudomle": Pseudo-likelihood estimation

    The z-scores are transformed using the formula:
    z_transformed = sqrt(sigma2) * z
    where sigma2 = (n-1) / (z^2 + n-2)
    """
    # make sure the LD matrix and sumstats file are matched
    input_locus = locus.copy()
    input_locus = intersect_sumstat_ld(input_locus)
    z = (
        input_locus.sumstats[ColName.BETA] / input_locus.sumstats[ColName.SE]
    ).to_numpy()
    n = input_locus.sample_size
    # Check and process input arguments z, R
    z = np.where(np.isnan(z), 0, z)
    if eigvens is not None:
        eigvals = eigvens["eigvals"]
        eigvecs = eigvens["eigvecs"]
    else:
        eigens = get_eigen(input_locus.ld.r, dtype)
        eigvals = eigens["eigvals"]
        eigvecs = eigens["eigvecs"]

    # if np.any(eigvals < -r_tol):
    #     logger.warning("The LD matrix is not positive semidefinite. Negative eigenvalues are set to zero")
    eigvals[eigvals < r_tol] = 0

    if n <= 1:
        raise ValueError("n must be greater than 1")

    sigma2 = (n - 1) / (z**2 + n - 2)
    z = np.sqrt(sigma2) * z

    if method == "null-mle":

        @jit(nopython=True, cache=True)
        def negloglikelihood(s, ztv, d):
            denom = (1 - s) * d + s
            term1 = 0.5 * np.sum(np.log(denom))
            term2 = 0.5 * np.sum((ztv / denom) * ztv)
            return term1 + term2

        ztv = eigvecs.T @ z
        result = minimize_scalar(
            negloglikelihood,
            bounds=(0, 1),
            method="bounded",
            args=(ztv, eigvals),
            options={"xatol": np.sqrt(np.finfo(float).eps)},
        )
        s = result.x  # type: ignore

    elif method == "null-partialmle":
        colspace = np.where(eigvals > 0)[0]
        if len(colspace) == len(z):
            s = 0
        else:
            znull = eigvecs[:, ~np.isin(np.arange(len(z)), colspace)].T @ z
            s = np.sum(znull**2) / len(znull)

    elif method == "null-pseudomle":

        def pseudolikelihood(
            s: float, z: np.ndarray, eigvals: np.ndarray, eigvecs: np.ndarray
        ) -> float:
            precision = eigvecs @ (eigvecs.T / ((1 - s) * eigvals + s))
            postmean = np.zeros_like(z)
            postvar = np.zeros_like(z)
            for i in range(len(z)):
                postmean[i] = -(1 / precision[i, i]) * precision[i, :].dot(z) + z[i]
                postvar[i] = 1 / precision[i, i]
            return -np.sum(stats.norm.logpdf(z, loc=postmean, scale=np.sqrt(postvar)))

        result = minimize_scalar(
            pseudolikelihood,
            bounds=(0, 1),
            method="bounded",
            args=(z, eigvals, eigvecs),
        )
        s = result.x  # type: ignore

    else:
        raise ValueError("The method is not implemented")

    return s  # type: ignore

get_eigen(ldmatrix, dtype=None)

Compute eigenvalues and eigenvectors of LD matrix.

Parameters

ldmatrix : np.ndarray A p by p symmetric, positive semidefinite correlation matrix. dtype : Optional[np.dtype], optional Data type for computation. If None, uses the input matrix dtype. Use np.float32 for reduced memory usage with minimal precision loss.

Returns

Dict[str, np.ndarray] Dictionary containing eigenvalues and eigenvectors with keys: - 'eigvals': eigenvalues array - 'eigvecs': eigenvectors matrix

Notes

TODO: accelerate with joblib for large matrices.

This function uses numpy.linalg.eigh which is optimized for symmetric matrices and returns eigenvalues in ascending order.

Source code in credtools/qc.py
def get_eigen(
    ldmatrix: np.ndarray, dtype: Optional[np.dtype] = None
) -> Dict[str, np.ndarray]:
    """
    Compute eigenvalues and eigenvectors of LD matrix.

    Parameters
    ----------
    ldmatrix : np.ndarray
        A p by p symmetric, positive semidefinite correlation matrix.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses the input matrix dtype.
        Use np.float32 for reduced memory usage with minimal precision loss.

    Returns
    -------
    Dict[str, np.ndarray]
        Dictionary containing eigenvalues and eigenvectors with keys:
        - 'eigvals': eigenvalues array
        - 'eigvecs': eigenvectors matrix

    Notes
    -----
    TODO: accelerate with joblib for large matrices.

    This function uses numpy.linalg.eigh which is optimized for symmetric matrices
    and returns eigenvalues in ascending order.
    """
    if dtype is not None:
        ldmatrix = ldmatrix.astype(dtype)
    eigvals, eigvecs = np.linalg.eigh(ldmatrix)
    return {"eigvals": eigvals, "eigvecs": eigvecs}

identify_outliers(qc_metrics, cohort, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6, enable_c1b=False, c1b_z_std_diff_threshold=10.0)

Identify outlier SNPs based on QC metrics.

Parameters

qc_metrics : Dict[str, pd.DataFrame] Dictionary of QC results from locus_qc function. cohort : str Cohort identifier (format: "popu_cohort"). logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6. enable_c1b : bool, optional If True, also evaluate the C1b "high-z anomalous-residual" rule (|z_std_diff| > c1b_z_std_diff_threshold AND |z| > z_threshold) and emit a C1b_high_z_residual column. Defaults to False to preserve pre-0.5.5 behavior. c1b_z_std_diff_threshold : float, optional Threshold on |z_std_diff| for the C1b rule, by default 10.0 (intentionally conservative).

Returns

pd.DataFrame DataFrame of outlier SNPs. When enable_c1b=False (default) the columns are [SNPID, C1_ld_mismatch, C2_marginal, C3_dentist_s]; when enable_c1b=True the column C1b_high_z_residual is inserted between C1_ld_mismatch and C2_marginal. Each row represents a SNP that triggered at least one criterion.

Source code in credtools/qc.py
def identify_outliers(
    qc_metrics: Dict[str, pd.DataFrame],
    cohort: str,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
    enable_c1b: bool = False,
    c1b_z_std_diff_threshold: float = 10.0,
) -> pd.DataFrame:
    """
    Identify outlier SNPs based on QC metrics.

    Parameters
    ----------
    qc_metrics : Dict[str, pd.DataFrame]
        Dictionary of QC results from locus_qc function.
    cohort : str
        Cohort identifier (format: "popu_cohort").
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.
    enable_c1b : bool, optional
        If True, also evaluate the C1b "high-z anomalous-residual" rule
        (``|z_std_diff| > c1b_z_std_diff_threshold AND |z| > z_threshold``)
        and emit a ``C1b_high_z_residual`` column. Defaults to False to
        preserve pre-0.5.5 behavior.
    c1b_z_std_diff_threshold : float, optional
        Threshold on ``|z_std_diff|`` for the C1b rule, by default 10.0
        (intentionally conservative).

    Returns
    -------
    pd.DataFrame
        DataFrame of outlier SNPs. When ``enable_c1b=False`` (default) the
        columns are ``[SNPID, C1_ld_mismatch, C2_marginal, C3_dentist_s]``;
        when ``enable_c1b=True`` the column ``C1b_high_z_residual`` is
        inserted between ``C1_ld_mismatch`` and ``C2_marginal``. Each row
        represents a SNP that triggered at least one criterion.
    """
    if enable_c1b:
        empty_columns = [
            "SNPID",
            "C1_ld_mismatch",
            "C1b_high_z_residual",
            "C2_marginal",
            "C3_dentist_s",
        ]
    else:
        empty_columns = ["SNPID", "C1_ld_mismatch", "C2_marginal", "C3_dentist_s"]
    empty_result = pd.DataFrame(columns=empty_columns)

    # Collect all SNPIDs and their criterion flags
    all_snpids: set = set()
    c1_snps: set = set()
    c1b_snps: set = set()
    c2_snps: set = set()
    c3_snps: set = set()

    # Get expected_z metrics for this cohort
    expected_z = qc_metrics.get("expected_z", pd.DataFrame())
    cohort_expected_z = pd.DataFrame()
    if not expected_z.empty:
        cohort_expected_z = expected_z[expected_z["cohort"] == cohort]

        if not cohort_expected_z.empty:
            # C1: LD Mismatch Indicators
            ld_mismatch_condition = (cohort_expected_z["logLR"] > logLR_threshold) & (
                cohort_expected_z["z"].abs() > z_threshold
            )

            # C2: Marginally Non-significant SNPs
            # Get lead SNP correlation from dentist_s results
            cohort_dentist_s = qc_metrics.get("dentist_s", pd.DataFrame())
            if not cohort_dentist_s.empty:
                cohort_dentist_s = cohort_dentist_s[
                    cohort_dentist_s["cohort"] == cohort
                ]

                # Merge to get r values
                merged_data = pd.merge(
                    cohort_expected_z[["SNPID", "z", "z_std_diff"]],
                    cohort_dentist_s[["SNPID", "r2"]],
                    on="SNPID",
                    how="left",
                )
                merged_data["r_abs"] = np.sqrt(merged_data["r2"].fillna(0))

                # C2 per SuSiE guidelines (Wang et al., JRSSB 2020): all three
                # conditions are required. The |z| < z_threshold guard is what
                # restricts this criterion to marginally non-significant SNPs;
                # without it, causal / lead SNPs (large |z|, high LD to lead)
                # get misclassified as LD-mismatch outliers and removed.
                marginal_condition = (
                    (merged_data["z_std_diff"].abs() > z_std_diff_threshold)
                    & (merged_data["r_abs"] > r_threshold)
                    & (merged_data["z"].abs() < z_threshold)
                )

                # Collect C1 and C2 SNPs
                c1_hits = merged_data[ld_mismatch_condition.values]["SNPID"].tolist()
                c2_hits = merged_data[marginal_condition]["SNPID"].tolist()
                c1_snps.update(c1_hits)
                c2_snps.update(c2_hits)
                all_snpids.update(c1_hits)
                all_snpids.update(c2_hits)

    # C1b: high-z anomalous-residual rule. Complements C1 by dropping the
    # logLR gate; complements C2 by flipping the |z| direction. Operates only
    # on the kriging residual, no DENTIST-S merge needed.
    if enable_c1b and not cohort_expected_z.empty:
        c1b_condition = (
            cohort_expected_z["z_std_diff"].abs() > c1b_z_std_diff_threshold
        ) & (cohort_expected_z["z"].abs() > z_threshold)
        c1b_hits = cohort_expected_z[c1b_condition]["SNPID"].tolist()
        c1b_snps.update(c1b_hits)
        all_snpids.update(c1b_hits)

    # C3: Dentist-S outliers
    dentist_s = qc_metrics.get("dentist_s", pd.DataFrame())
    if not dentist_s.empty:
        cohort_dentist_s = dentist_s[dentist_s["cohort"] == cohort]

        if not cohort_dentist_s.empty:
            dentist_outlier_condition = (
                cohort_dentist_s["-log10p_dentist_s"] >= dentist_s_pvalue_threshold
            ) & (cohort_dentist_s["r2"] >= dentist_s_r2_threshold)

            c3_hits = cohort_dentist_s[dentist_outlier_condition]["SNPID"].tolist()
            c3_snps.update(c3_hits)
            all_snpids.update(c3_hits)

    if not all_snpids:
        return empty_result

    # Build result DataFrame
    result = pd.DataFrame({"SNPID": sorted(all_snpids)})
    result["C1_ld_mismatch"] = result["SNPID"].isin(c1_snps)
    if enable_c1b:
        result["C1b_high_z_residual"] = result["SNPID"].isin(c1b_snps)
    result["C2_marginal"] = result["SNPID"].isin(c2_snps)
    result["C3_dentist_s"] = result["SNPID"].isin(c3_snps)

    return result

kriging_rss(locus, r_tol=1e-08, s=None, eigvens=None, dtype=None)

Compute distribution of z-scores of variant j given other z-scores, and detect possible allele switch issues.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix. r_tol : float, optional Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8. s : Optional[float], optional An estimated s parameter from estimate_s_rss function, by default None. If None, s will be estimated automatically. eigvens : Optional[Dict[str, np.ndarray]], optional Pre-computed eigenvalues and eigenvectors, by default None. dtype : Optional[np.dtype], optional Data type for computation. If None, uses float64. Use np.float32 for reduced memory usage with minimal precision loss.

Returns

pd.DataFrame DataFrame containing the results of the kriging RSS test with columns: - SNPID: SNP identifier - z: transformed z-score - condmean: conditional mean - condvar: conditional variance - z_std_diff: standardized difference - logLR: log likelihood ratio

Raises

ValueError If n <= 1.

Notes

Under the null hypothesis, the RSS model with regularized LD matrix assumes: z|R,s ~ N(0, (1-s)R + sI)

This function uses a mixture of normals to model the conditional distribution of z_j given other z-scores. The method can help detect:

  • Allele switch issues
  • Outlier variants
  • LD inconsistencies

The algorithm: 1. Computes conditional means and variances for each variant 2. Fits a Gaussian mixture model to capture heterogeneity 3. Calculates likelihood ratios for allele switch detection

Source code in credtools/qc.py
def kriging_rss(
    locus: Locus,
    r_tol: float = 1e-8,
    s: Optional[float] = None,
    eigvens: Optional[Dict[str, np.ndarray]] = None,
    dtype: Optional[np.dtype] = None,
) -> pd.DataFrame:
    """
    Compute distribution of z-scores of variant j given other z-scores, and detect possible allele switch issues.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.
    r_tol : float, optional
        Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8.
    s : Optional[float], optional
        An estimated s parameter from estimate_s_rss function, by default None.
        If None, s will be estimated automatically.
    eigvens : Optional[Dict[str, np.ndarray]], optional
        Pre-computed eigenvalues and eigenvectors, by default None.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses float64.
        Use np.float32 for reduced memory usage with minimal precision loss.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the results of the kriging RSS test with columns:
        - SNPID: SNP identifier
        - z: transformed z-score
        - condmean: conditional mean
        - condvar: conditional variance
        - z_std_diff: standardized difference
        - logLR: log likelihood ratio

    Raises
    ------
    ValueError
        If n <= 1.

    Notes
    -----
    Under the null hypothesis, the RSS model with regularized LD matrix assumes:
    z|R,s ~ N(0, (1-s)R + sI)

    This function uses a mixture of normals to model the conditional distribution
    of z_j given other z-scores. The method can help detect:

    - Allele switch issues
    - Outlier variants
    - LD inconsistencies

    The algorithm:
    1. Computes conditional means and variances for each variant
    2. Fits a Gaussian mixture model to capture heterogeneity
    3. Calculates likelihood ratios for allele switch detection
    """
    # Check and process input arguments z, R
    input_locus = locus.copy()
    input_locus = intersect_sumstat_ld(input_locus)
    z = (
        input_locus.sumstats[ColName.BETA] / input_locus.sumstats[ColName.SE]
    ).to_numpy()
    n = input_locus.sample_size
    z = np.where(np.isnan(z), 0, z)

    # Compute eigenvalues and eigenvectors
    if eigvens is not None:
        eigvals = eigvens["eigvals"]
        eigvecs = eigvens["eigvecs"]
    else:
        eigens = get_eigen(input_locus.ld.r, dtype)
        eigvals = eigens["eigvals"]
        eigvecs = eigens["eigvecs"]
    if s is None:
        s = estimate_s_rss(locus, eigvens={"eigvals": eigvals, "eigvecs": eigvecs})
    eigvals = eigvals[::-1]
    eigvecs = eigvecs[:, ::-1]

    eigvals[eigvals < r_tol] = 0

    if n <= 1:
        raise ValueError("n must be greater than 1")

    sigma2 = (n - 1) / (z**2 + n - 2)
    z = np.sqrt(sigma2) * z

    dinv = 1 / ((1 - s) * eigvals + s)
    dinv[np.isinf(dinv)] = 0
    precision = eigvecs @ (eigvecs * dinv).T

    # Vectorized conditional mean and variance calculation
    diag_prec = np.diag(precision)
    condvar = 1 / diag_prec

    # Compute conditional means vectorized
    precision_off_diag = precision - np.diag(diag_prec)
    condmean = -(precision_off_diag @ z) / diag_prec

    z_std_diff = (z - condmean) / np.sqrt(condvar)

    # Obtain grid
    a_min = 0.8
    z_std_diff_max_sq = np.max(z_std_diff**2)
    a_max = 2 if z_std_diff_max_sq < 1 else 2 * np.sqrt(z_std_diff_max_sq)
    npoint = int(np.ceil(np.log2(a_max / a_min) / np.log2(1.05)))
    # Ensure npoint doesn't exceed number of samples
    npoint = min(npoint, len(z) - 1)
    a_grid = 1.05 ** np.arange(-npoint, 1) * a_max

    # Compute likelihood more efficiently
    sqrt_condvar = np.sqrt(condvar)
    z_centered = z - condmean

    # Use broadcasting to avoid creating large temporary matrices
    matrix_llik = np.empty((len(z), len(a_grid)))
    for i, a in enumerate(a_grid):
        scale = sqrt_condvar * a
        matrix_llik[:, i] = stats.norm.logpdf(z_centered, scale=scale)

    lfactors = np.max(matrix_llik, axis=1)
    matrix_llik -= lfactors[:, np.newaxis]

    # Estimate weight using Gaussian Mixture Model
    # Limit components for better performance and numerical stability
    n_components = min(len(a_grid), max(2, len(z) // 10))
    gmm = GaussianMixture(
        n_components=n_components,
        covariance_type="diag",
        max_iter=500,
        init_params="k-means++",
        random_state=42,
    )
    gmm.fit(matrix_llik)

    # If we reduced components, pad weights with zeros
    if n_components < len(a_grid):
        w = np.zeros(len(a_grid))
        w[:n_components] = gmm.weights_
    else:
        w = gmm.weights_

    # Compute denominators in likelihood ratios
    logl0mix = np.log(np.sum(np.exp(matrix_llik) * (w + 1e-15), axis=1)) + lfactors  # type: ignore

    # Compute numerators in likelihood ratios
    z_plus_condmean = z + condmean
    for i, a in enumerate(a_grid):
        scale = sqrt_condvar * a
        matrix_llik[:, i] = stats.norm.logpdf(z_plus_condmean, scale=scale)

    lfactors = np.max(matrix_llik, axis=1)
    matrix_llik -= lfactors[:, np.newaxis]
    logl1mix = np.log(np.sum(np.exp(matrix_llik) * (w + 1e-15), axis=1)) + lfactors  # type: ignore

    # Compute (log) likelihood ratios
    logLRmix = logl1mix - logl0mix

    res = pd.DataFrame(
        {
            "SNPID": input_locus.sumstats[ColName.SNPID].to_numpy(),
            "z": z,
            "condmean": condmean,
            "condvar": condvar,
            "z_std_diff": z_std_diff,
            "logLR": logLRmix,
        },
        # index=input_locus.sumstats[ColName.SNPID].to_numpy(),
    )
    # TODO: remove variants with logLR > 2 and abs(z) > 2

    return res

ld_4th_moment(locus_set)

Compute the 4th moment of the LD matrix as a measure of LD structure.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame with variants as rows and cohorts as columns, containing the 4th moment values (sum of r^4 - 1 for each variant).

Notes

The 4th moment is calculated as: Σ(r_ij^4) - 1 for each variant i

This metric provides information about: - LD structure complexity - Potential issues with LD matrix quality - Differences in LD patterns between populations

Higher values may indicate: - Strong local LD structure - Potential genotyping errors - Population stratification effects

The function intersects variants across all cohorts to ensure fair comparison.

Source code in credtools/qc.py
def ld_4th_moment(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute the 4th moment of the LD matrix as a measure of LD structure.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame with variants as rows and cohorts as columns, containing
        the 4th moment values (sum of r^4 - 1 for each variant).

    Notes
    -----
    The 4th moment is calculated as:
    Σ(r_ij^4) - 1 for each variant i

    This metric provides information about:
    - LD structure complexity
    - Potential issues with LD matrix quality
    - Differences in LD patterns between populations

    Higher values may indicate:
    - Strong local LD structure
    - Potential genotyping errors
    - Population stratification effects

    The function intersects variants across all cohorts to ensure fair comparison.
    """
    ld_4th_res = []
    # Determine valid cohorts: each cohort must have sumstats ∩ ld.map overlap
    valid_loci = []
    for locus in locus_set.loci:
        cohort_valid_snps = set(locus.sumstats[ColName.SNPID]) & set(
            locus.ld.map[ColName.SNPID]
        )
        if cohort_valid_snps:
            valid_loci.append(locus)
        else:
            logger.warning(
                "Skipping cohort %s_%s in ld_4th_moment: "
                "no overlap between sumstats and LD matrix.",
                locus.popu,
                locus.cohort,
            )
    if not valid_loci:
        return pd.DataFrame()
    # intersect between valid loci — consider both sumstats AND ld.map
    overlap_snps = set(valid_loci[0].sumstats[ColName.SNPID]) & set(
        valid_loci[0].ld.map[ColName.SNPID]
    )
    for locus in valid_loci[1:]:
        locus_valid = set(locus.sumstats[ColName.SNPID]) & set(
            locus.ld.map[ColName.SNPID]
        )
        overlap_snps = overlap_snps & locus_valid
    for locus in valid_loci:
        locus = locus.copy()
        locus.sumstats = locus.sumstats[
            locus.sumstats[ColName.SNPID].isin(overlap_snps)
        ]
        try:
            locus = intersect_sumstat_ld(locus)
        except (ValueError, IndexError):
            logger.warning(
                "Skipping cohort %s_%s in ld_4th_moment: "
                "no overlap after intersection.",
                locus.popu,
                locus.cohort,
            )
            continue
        r_4th = pd.Series(
            index=locus.ld.map[ColName.SNPID], data=np.power(locus.ld.r, 4).sum(axis=0)
        )
        r_4th = r_4th - 1
        r_4th.name = f"{locus.popu}_{locus.cohort}"
        ld_4th_res.append(r_4th)
    if not ld_4th_res:
        return pd.DataFrame()
    return pd.concat(ld_4th_res, axis=1)

ld_decay(locus_set)

Compute LD decay patterns across cohorts.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame containing LD decay information with columns: - distance_kb: distance in kilobases - r2_avg: average r² value at that distance - decay_rate: fitted exponential decay rate parameter - cohort: cohort identifier

Notes

This function analyzes LD decay by:

  1. Computing pairwise distances between all variants
  2. Binning distances into 1kb windows
  3. Calculating average r² within each distance bin
  4. Fitting an exponential decay model: r² = a * exp(-b * distance)

LD decay patterns can reveal: - Population-specific recombination patterns - Effective population size differences - Demographic history effects

Different populations typically show different decay rates due to: - Historical effective population sizes - Admixture patterns - Founder effects

Source code in credtools/qc.py
def ld_decay(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute LD decay patterns across cohorts.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame containing LD decay information with columns:
        - distance_kb: distance in kilobases
        - r2_avg: average r² value at that distance
        - decay_rate: fitted exponential decay rate parameter
        - cohort: cohort identifier

    Notes
    -----
    This function analyzes LD decay by:

    1. Computing pairwise distances between all variants
    2. Binning distances into 1kb windows
    3. Calculating average r² within each distance bin
    4. Fitting an exponential decay model: r² = a * exp(-b * distance)

    LD decay patterns can reveal:
    - Population-specific recombination patterns
    - Effective population size differences
    - Demographic history effects

    Different populations typically show different decay rates due to:
    - Historical effective population sizes
    - Admixture patterns
    - Founder effects
    """

    @jit(nopython=True, cache=True)
    def fit_exp(x: np.ndarray, a: float, b: float) -> np.ndarray:
        return a * np.exp(-b * x)

    binsize = 1000
    decay_res = []
    for locus in locus_set.loci:
        ldmap = locus.ld.map.copy()
        r = locus.ld.r.copy()
        distance_mat = np.array(
            [ldmap["BP"] - ldmap["BP"].values[i] for i in range(len(ldmap))]
        )
        distance_mat = distance_mat[np.tril_indices_from(distance_mat, k=-1)].flatten()
        distance_mat = np.abs(distance_mat)
        r = r[np.tril_indices_from(r, k=-1)].flatten()
        r = np.square(r)
        bins = np.arange(0, ldmap["BP"].max() - ldmap["BP"].min() + binsize, binsize)

        r_sum, _ = np.histogram(distance_mat, bins=bins, weights=r)
        count, _ = np.histogram(distance_mat, bins=bins)

        with np.errstate(divide="ignore", invalid="ignore"):
            r2_avg = np.where(count > 0, r_sum / count, 0)
        popt, _ = curve_fit(fit_exp, bins[1:] / binsize, r2_avg)
        res = pd.DataFrame(
            {
                "distance_kb": bins[1:] / binsize,
                "r2_avg": r2_avg,
                "decay_rate": popt[0],
                "cohort": f"{locus.popu}_{locus.cohort}",
            }
        )
        decay_res.append(res)
    return pd.concat(decay_res, axis=0)

loci_qc(inputs, out_dir, threads=1, remove_outlier=False, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6, enable_c1b=False, c1b_z_std_diff_threshold=10.0, adaptive_qc_flag=False, adaptive_lambda_threshold=0.05)

Perform quality control analysis on multiple loci in parallel.

Parameters

inputs : str Path to input file containing locus information. Must be tab-separated with columns including 'locus_id'. out_dir : str Output directory path where results will be saved. threads : int, optional Number of parallel threads to use, by default 1. remove_outlier : bool, optional Whether to remove outliers and re-run QC on cleaned data, by default False. logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6. enable_c1b : bool, optional Enable the C1b "high-z anomalous-residual" criterion, by default False. c1b_z_std_diff_threshold : float, optional Threshold on |z_std_diff| for the C1b rule, by default 10.0. adaptive_qc_flag : bool, optional Route the per-locus removal step through :func:adaptive_qc (Stage A baseline + Stage B C1b augmentation when lambda_s remains elevated), by default False. adaptive_lambda_threshold : float, optional Trigger lambda_s threshold for Stage B C1b augmentation, by default 0.05.

Returns

Dict[str, Any] Run summary containing counts of successful and failed loci and error details. QC result files are written under out_dir.

Raises

ValueError Raised when threads is less than 1.

Notes

This function processes multiple loci in parallel with the following workflow:

  1. Reads locus information from input file
  2. Groups loci by locus_id
  3. Processes each locus group using multiprocessing
  4. Displays progress bar for user feedback
  5. Saves results organized by locus_id

The input file should contain columns: locus_id, prefix, popu, cohort, sample_size.

Output structure: {out_dir}/{locus_id}/{qc_metric}.txt.gz {out_dir}/{locus_id}/qc.txt.gz {out_dir}/qc.txt.gz

Each locus gets its own subdirectory with compressed QC result files. A global QC summary file is also generated at the output directory root.

Source code in credtools/qc.py
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
2353
2354
2355
2356
2357
2358
2359
2360
2361
2362
2363
2364
2365
2366
2367
2368
2369
2370
2371
2372
2373
2374
2375
2376
2377
2378
2379
2380
2381
2382
2383
2384
2385
2386
2387
2388
2389
2390
2391
2392
2393
2394
2395
2396
2397
2398
2399
2400
2401
2402
2403
2404
2405
2406
2407
2408
2409
2410
2411
2412
2413
2414
2415
2416
2417
2418
2419
2420
2421
2422
2423
2424
2425
2426
2427
2428
2429
2430
2431
2432
2433
2434
2435
2436
2437
2438
2439
2440
2441
2442
2443
2444
2445
2446
2447
2448
2449
2450
2451
2452
2453
2454
2455
2456
2457
2458
2459
2460
2461
2462
2463
2464
2465
2466
2467
2468
2469
2470
2471
2472
2473
2474
2475
2476
2477
2478
2479
2480
2481
2482
2483
2484
2485
2486
2487
2488
2489
2490
2491
2492
2493
2494
2495
def loci_qc(
    inputs: str,
    out_dir: str,
    threads: int = 1,
    remove_outlier: bool = False,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
    enable_c1b: bool = False,
    c1b_z_std_diff_threshold: float = 10.0,
    adaptive_qc_flag: bool = False,
    adaptive_lambda_threshold: float = 0.05,
) -> Dict[str, Any]:
    """
    Perform quality control analysis on multiple loci in parallel.

    Parameters
    ----------
    inputs : str
        Path to input file containing locus information. Must be tab-separated
        with columns including 'locus_id'.
    out_dir : str
        Output directory path where results will be saved.
    threads : int, optional
        Number of parallel threads to use, by default 1.
    remove_outlier : bool, optional
        Whether to remove outliers and re-run QC on cleaned data, by default False.
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.
    enable_c1b : bool, optional
        Enable the C1b "high-z anomalous-residual" criterion, by default False.
    c1b_z_std_diff_threshold : float, optional
        Threshold on ``|z_std_diff|`` for the C1b rule, by default 10.0.
    adaptive_qc_flag : bool, optional
        Route the per-locus removal step through :func:`adaptive_qc` (Stage A
        baseline + Stage B C1b augmentation when lambda_s remains elevated),
        by default False.
    adaptive_lambda_threshold : float, optional
        Trigger lambda_s threshold for Stage B C1b augmentation, by default 0.05.

    Returns
    -------
    Dict[str, Any]
        Run summary containing counts of successful and failed loci and error
        details. QC result files are written under ``out_dir``.

    Raises
    ------
    ValueError
        Raised when ``threads`` is less than 1.

    Notes
    -----
    This function processes multiple loci in parallel with the following workflow:

    1. Reads locus information from input file
    2. Groups loci by locus_id
    3. Processes each locus group using multiprocessing
    4. Displays progress bar for user feedback
    5. Saves results organized by locus_id

    The input file should contain columns: locus_id, prefix, popu, cohort, sample_size.

    Output structure:
    {out_dir}/{locus_id}/{qc_metric}.txt.gz
    {out_dir}/{locus_id}/qc.txt.gz
    {out_dir}/qc.txt.gz

    Each locus gets its own subdirectory with compressed QC result files.
    A global QC summary file is also generated at the output directory root.
    """
    if threads < 1:
        raise ValueError("threads must be a positive integer")

    os.makedirs(out_dir, exist_ok=True)

    loci_info = pd.read_csv(inputs, sep="	")
    loci_info = check_loci_info(loci_info)  # Validate input data

    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeRemainingColumn(),
    )

    locus_groups = [
        (
            locus_id,
            locus_info,
            out_dir,
            remove_outlier,
            logLR_threshold,
            z_threshold,
            z_std_diff_threshold,
            r_threshold,
            dentist_s_pvalue_threshold,
            dentist_s_r2_threshold,
            enable_c1b,
            c1b_z_std_diff_threshold,
            adaptive_qc_flag,
            adaptive_lambda_threshold,
        )
        for locus_id, locus_info in loci_info.groupby("locus_id")
    ]
    all_summaries: list[pd.DataFrame] = []
    all_outlier_summaries: list[pd.DataFrame] = []
    all_cleaned_summaries: list[pd.DataFrame] = []
    all_outlier_details: list[pd.DataFrame] = []
    total_loci = len(locus_groups)
    start_time = datetime.now()
    log_path = Path(out_dir) / "qc_run_summary.log"
    run_summary: Dict[str, Any] = {
        "start_time": start_time.isoformat(),
        "end_time": None,
        "total_loci": total_loci,
        "successful_loci": 0,
        "failed_loci": 0,
        "errors": [],
        "log_path": str(log_path),
        "parameters": {
            "inputs": inputs,
            "out_dir": out_dir,
            "threads": threads,
            "remove_outlier": remove_outlier,
            "logLR_threshold": logLR_threshold,
            "z_threshold": z_threshold,
            "z_std_diff_threshold": z_std_diff_threshold,
            "r_threshold": r_threshold,
            "dentist_s_pvalue_threshold": dentist_s_pvalue_threshold,
            "dentist_s_r2_threshold": dentist_s_r2_threshold,
            "enable_c1b": enable_c1b,
            "c1b_z_std_diff_threshold": c1b_z_std_diff_threshold,
            "adaptive_qc_flag": adaptive_qc_flag,
            "adaptive_lambda_threshold": adaptive_lambda_threshold,
        },
    }
    logger.info(f"QC run started at {start_time.isoformat()}")
    logger.info(f"Log file will be saved to {log_path}")
    logger.info(f"parameters: {run_summary['parameters']}")

    with progress:
        task = progress.add_task("[cyan]Processing loci...", total=total_loci)
        if locus_groups:
            with Pool(processes=threads) as pool:
                for (
                    locus_id,
                    summary,
                    outlier_summary,
                    cleaned_summary,
                    outlier_detail,
                    error,
                ) in pool.imap_unordered(safe_qc_locus_cli, locus_groups):
                    progress.update(task, advance=1)
                    if error is None:
                        run_summary["successful_loci"] += 1
                        if summary is not None and not summary.empty:
                            all_summaries.append(summary)
                        if outlier_summary is not None and not outlier_summary.empty:
                            all_outlier_summaries.append(outlier_summary)
                        if cleaned_summary is not None and not cleaned_summary.empty:
                            all_cleaned_summaries.append(cleaned_summary)
                        if outlier_detail is not None and not outlier_detail.empty:
                            all_outlier_details.append(outlier_detail)
                    else:
                        run_summary["failed_loci"] += 1
                        run_summary["errors"].append(
                            f"Locus {locus_id} failed: {error}"
                        )

    if all_summaries:
        global_summary = pd.concat(all_summaries, ignore_index=True)
        cols = ["locus_id"] + [
            col for col in global_summary.columns if col != "locus_id"
        ]
        global_summary = global_summary[cols]
        global_summary.to_csv(
            f"{out_dir}/qc.txt.gz",
            sep="	",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        logger.info(f"Global QC summary saved to {out_dir}/qc.txt.gz")

    # Save global cleaned QC summary if outlier removal was performed
    if all_cleaned_summaries:
        global_cleaned_summary = pd.concat(all_cleaned_summaries, ignore_index=True)
        cols = ["locus_id"] + [
            col for col in global_cleaned_summary.columns if col != "locus_id"
        ]
        global_cleaned_summary = global_cleaned_summary[cols]
        os.makedirs(f"{out_dir}/cleaned", exist_ok=True)
        global_cleaned_summary.to_csv(
            f"{out_dir}/cleaned/qc_cleaned.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        logger.info(
            f"Global cleaned QC summary saved to {out_dir}/cleaned/qc_cleaned.txt.gz"
        )

    # Save outlier summary if outlier removal was performed
    if all_outlier_summaries:
        global_outlier_summary = pd.concat(all_outlier_summaries, ignore_index=True)
        os.makedirs(f"{out_dir}/cleaned", exist_ok=True)
        global_outlier_summary.to_csv(
            f"{out_dir}/cleaned/outlier_removal_summary.txt.gz",
            sep="	",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        logger.info(
            f"Outlier removal summary saved to {out_dir}/cleaned/outlier_removal_summary.txt.gz"
        )

    # Save global outlier SNP details if any were collected
    if all_outlier_details:
        global_outlier_details = pd.concat(all_outlier_details, ignore_index=True)
        os.makedirs(f"{out_dir}/cleaned", exist_ok=True)
        global_outlier_details.to_csv(
            f"{out_dir}/cleaned/outlier_snps.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
        )
        logger.info(
            f"Global outlier SNP details saved to {out_dir}/cleaned/outlier_snps.txt.gz"
        )

    if all_outlier_summaries:
        # Create cleaned loci info file
        cleaned_loci_info = []
        for _, row in global_outlier_summary.iterrows():
            cleaned_dir = f"{out_dir}/cleaned/{row['locus_id']}"
            # Use the same prefix logic as Locus.prefix property
            import hashlib

            if "+" in row["cohort"]:
                cohort_hash = hashlib.md5(row["cohort"].encode()).hexdigest()[:8]
                num_cohorts = len(row["cohort"].split("+"))
                prefix = f"{row['popu']}_meta{num_cohorts}cohorts_{cohort_hash}"
            else:
                prefix = f"{row['popu']}_{row['cohort']}"
            cleaned_loci_info.append(
                {
                    "locus_id": row["locus_id"],
                    "prefix": f"{cleaned_dir}/{prefix}",
                    "popu": row["popu"],
                    "cohort": row["cohort"],
                    "sample_size": (
                        loci_info[
                            (loci_info["locus_id"] == row["locus_id"])
                            & (loci_info["popu"] == row["popu"])
                            & (loci_info["cohort"] == row["cohort"])
                        ]["sample_size"].iloc[0]
                        if not loci_info.empty
                        else 0
                    ),
                    "chr": (
                        loci_info[loci_info["locus_id"] == row["locus_id"]]["chr"].iloc[
                            0
                        ]
                        if not loci_info.empty
                        else 0
                    ),
                    "start": (
                        loci_info[loci_info["locus_id"] == row["locus_id"]][
                            "start"
                        ].iloc[0]
                        if not loci_info.empty
                        else 0
                    ),
                    "end": (
                        loci_info[loci_info["locus_id"] == row["locus_id"]]["end"].iloc[
                            0
                        ]
                        if not loci_info.empty
                        else 0
                    ),
                    "n_snps": row["cleaned_snps"],
                    "outliers_removed": row["outliers_removed"],
                }
            )

        cleaned_loci_info_df = pd.DataFrame(cleaned_loci_info)
        cleaned_loci_info_df.to_csv(
            f"{out_dir}/cleaned/cleaned_loci_info.txt.gz",
            sep="	",
            index=False,
            compression="gzip",
        )
        logger.info(
            f"Cleaned loci info saved to {out_dir}/cleaned/cleaned_loci_info.txt.gz"
        )

    run_summary["end_time"] = datetime.now().isoformat()
    with log_path.open("w") as log_file:
        log_file.write("=== CREDTOOLS QC RUN SUMMARY ===\n")
        log_file.write(f"Start Time: {run_summary['start_time']}\n")
        log_file.write(f"End Time: {run_summary['end_time']}\n")
        log_file.write(f"Total Loci: {run_summary['total_loci']}\n")
        log_file.write(f"Successful: {run_summary['successful_loci']}\n")
        log_file.write(f"Failed: {run_summary['failed_loci']}\n")
        log_file.write("\n")
        if run_summary["errors"]:
            log_file.write("Error Details:\n")
            for error in run_summary["errors"]:
                log_file.write(f"  - {error}\n")
            log_file.write("\n")

    logger.info(
        "QC completed: %s succeeded, %s failed. Summary written to %s",
        run_summary["successful_loci"],
        run_summary["failed_loci"],
        log_path,
    )

    return run_summary

locus_qc(locus_set, r_tol=0.001, method='null-mle', out_dir=None, dtype=None, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6)

Perform comprehensive quality control analysis for a locus.

Parameters

locus_set : LocusSet LocusSet object containing loci to analyze. r_tol : float, optional Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-3. method : str, optional Method to estimate s parameter, by default "null-mle". Options: "null-mle", "null-partialmle", or "null-pseudomle". out_dir : Optional[str], optional Output directory to save results, by default None. dtype : Optional[np.dtype], optional Data type for computation. If None, uses float64. Use np.float32 for reduced memory usage with minimal precision loss. logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6.

Returns

Dict[str, pd.DataFrame] Dictionary of quality control results with keys: - 'expected_z': kriging RSS results - 'dentist_s': Dentist-S test results - 'compare_maf': MAF comparison results

Note: heterogeneity metrics (ld_4th_moment, ld_decay, cochran_q,
snp_missingness) are now computed before meta-analysis via
``credtools.meta.compute_heterogeneity()``.

Notes

This function performs comprehensive QC including:

Single-locus analyses: - Kriging RSS for outlier detection with combined lambda-s outlier criteria: * LD Mismatch Indicators (SuSiE guidelines): logLR > threshold AND |z| > threshold * Marginally Non-significant SNPs: |z| < threshold AND |z_std_diff| > threshold AND |r| > threshold with lead SNP - Dentist-S for outlier detection - MAF comparison between sumstats and LD reference - LD matrix 4th moment analysis - LD decay pattern analysis

Multi-locus analyses (when applicable): - Cochran-Q heterogeneity testing - SNP missingness across cohorts

TODO: Add LAVA (Local Analysis of Variant Associations) analysis.

If out_dir is provided, results are saved as tab-separated files.

Source code in credtools/qc.py
def locus_qc(
    locus_set: LocusSet,
    r_tol: float = 1e-3,
    method: str = "null-mle",
    out_dir: Optional[str] = None,
    dtype: Optional[np.dtype] = None,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
) -> Dict[str, pd.DataFrame]:
    """
    Perform comprehensive quality control analysis for a locus.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing loci to analyze.
    r_tol : float, optional
        Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-3.
    method : str, optional
        Method to estimate s parameter, by default "null-mle".
        Options: "null-mle", "null-partialmle", or "null-pseudomle".
    out_dir : Optional[str], optional
        Output directory to save results, by default None.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses float64.
        Use np.float32 for reduced memory usage with minimal precision loss.
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary of quality control results with keys:
        - 'expected_z': kriging RSS results
        - 'dentist_s': Dentist-S test results
        - 'compare_maf': MAF comparison results

        Note: heterogeneity metrics (ld_4th_moment, ld_decay, cochran_q,
        snp_missingness) are now computed before meta-analysis via
        ``credtools.meta.compute_heterogeneity()``.

    Notes
    -----
    This function performs comprehensive QC including:

    Single-locus analyses:
    - Kriging RSS for outlier detection with combined lambda-s outlier criteria:
      * LD Mismatch Indicators (SuSiE guidelines): logLR > threshold AND |z| > threshold
      * Marginally Non-significant SNPs: |z| < threshold AND |z_std_diff| > threshold
        AND |r| > threshold with lead SNP
    - Dentist-S for outlier detection
    - MAF comparison between sumstats and LD reference
    - LD matrix 4th moment analysis
    - LD decay pattern analysis

    Multi-locus analyses (when applicable):
    - Cochran-Q heterogeneity testing
    - SNP missingness across cohorts

    TODO: Add LAVA (Local Analysis of Variant Associations) analysis.

    If out_dir is provided, results are saved as tab-separated files.
    """
    qc_metrics = {}
    all_expected_z = []
    all_dentist_s = []
    all_compare_maf = []
    for locus in locus_set.loci:
        logger.info(f"Processing locus: {locus.locus_id}")
        lo = intersect_sumstat_ld(locus)
        # Compute eigendecomposition once and reuse for both functions
        eigens = get_eigen(lo.ld.r, dtype)
        lambda_s = estimate_s_rss(locus, r_tol, method, eigens, dtype)
        expected_z = kriging_rss(locus, r_tol, lambda_s, eigens, dtype)
        expected_z["lambda_s"] = lambda_s
        expected_z["cohort"] = f"{locus.popu}_{locus.cohort}"
        dentist_s = compute_dentist_s(locus)
        dentist_s["cohort"] = f"{locus.popu}_{locus.cohort}"
        compare_maf_res = compare_maf(locus)
        compare_maf_res["cohort"] = f"{locus.popu}_{locus.cohort}"
        all_expected_z.append(expected_z)
        all_dentist_s.append(dentist_s)
        all_compare_maf.append(compare_maf_res)
    all_expected_z = pd.concat(all_expected_z, axis=0)
    all_dentist_s = pd.concat(all_dentist_s, axis=0)
    all_compare_maf = pd.concat(all_compare_maf, axis=0)
    qc_metrics["expected_z"] = all_expected_z
    qc_metrics["dentist_s"] = all_dentist_s
    qc_metrics["compare_maf"] = all_compare_maf

    if out_dir is not None:
        os.makedirs(out_dir, exist_ok=True)
        for metric_name, metric_data in qc_metrics.items():
            metric_data.to_csv(
                f"{out_dir}/{metric_name}.txt.gz",
                sep="\t",
                index=False,
                float_format="%.6f",
                compression="gzip",
            )

    return qc_metrics

locus_qc_summary(qc_metrics, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6, enable_c1b=False, c1b_z_std_diff_threshold=10.0)

Generate summary QC statistics for a locus across all cohorts.

Parameters

qc_metrics : Dict[str, pd.DataFrame] Dictionary of QC results from locus_qc function. logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6. enable_c1b : bool, optional If True, also tabulate the C1b "high-z anomalous-residual" rule into a n_c1b_outlier column. Defaults to False (column omitted). c1b_z_std_diff_threshold : float, optional Threshold on |z_std_diff| for the C1b rule, by default 10.0.

Returns

pd.DataFrame Summary QC statistics with columns: popu, cohort, n_snps, n_1e-5, n_5e-8, maf_corr, lambda_s, n_lambda_s_outlier, n_dentist_s_outlier. When enable_c1b=True, an additional n_c1b_outlier column is appended.

Source code in credtools/qc.py
def locus_qc_summary(
    qc_metrics: Dict[str, pd.DataFrame],
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
    enable_c1b: bool = False,
    c1b_z_std_diff_threshold: float = 10.0,
) -> pd.DataFrame:
    """
    Generate summary QC statistics for a locus across all cohorts.

    Parameters
    ----------
    qc_metrics : Dict[str, pd.DataFrame]
        Dictionary of QC results from locus_qc function.
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.
    enable_c1b : bool, optional
        If True, also tabulate the C1b "high-z anomalous-residual" rule into
        a ``n_c1b_outlier`` column. Defaults to False (column omitted).
    c1b_z_std_diff_threshold : float, optional
        Threshold on ``|z_std_diff|`` for the C1b rule, by default 10.0.

    Returns
    -------
    pd.DataFrame
        Summary QC statistics with columns: popu, cohort, n_snps, n_1e-5, n_5e-8,
        maf_corr, lambda_s, n_lambda_s_outlier, n_dentist_s_outlier. When
        ``enable_c1b=True``, an additional ``n_c1b_outlier`` column is appended.
    """
    summary_rows = []

    # Get unique cohorts from expected_z results
    expected_z = qc_metrics.get("expected_z", pd.DataFrame())
    if expected_z.empty:
        return pd.DataFrame()

    cohorts = expected_z["cohort"].unique()
    logger.info(
        f"Run QC on parameters: logLR_threshold={logLR_threshold}, z_threshold={z_threshold}, "
    )
    logger.info(
        f"z_std_diff_threshold={z_std_diff_threshold}, r_threshold={r_threshold}, "
    )
    logger.info(
        f"dentist_s_pvalue_threshold={dentist_s_pvalue_threshold}, dentist_s_r2_threshold={dentist_s_r2_threshold}"
    )

    for cohort in cohorts:
        logger.info(f"Generating QC summary for cohort: {cohort}")
        # Parse population and cohort from cohort string (format: "popu_cohort")
        popu, cohort_name = cohort.split("_", 1)

        # Filter data for this cohort
        cohort_expected_z = expected_z[expected_z["cohort"] == cohort]
        cohort_dentist_s = qc_metrics.get("dentist_s", pd.DataFrame())
        if not cohort_dentist_s.empty:
            cohort_dentist_s = cohort_dentist_s[cohort_dentist_s["cohort"] == cohort]
        else:
            cohort_dentist_s = pd.DataFrame()
        cohort_compare_maf = qc_metrics.get("compare_maf", pd.DataFrame())
        if not cohort_compare_maf.empty:
            cohort_compare_maf = cohort_compare_maf[
                cohort_compare_maf["cohort"] == cohort
            ]
        else:
            cohort_compare_maf = pd.DataFrame()

        # Calculate metrics
        n_snps = len(cohort_expected_z)

        # Count p-values < 1e-5 and < 5e-8 (need to calculate p-values from z-scores)
        z_scores = cohort_expected_z["z"].abs()
        p_values = np.maximum(2 * (1 - stats.norm.cdf(z_scores)), 1e-300)
        n_1e_5 = int((p_values < 1e-5).sum())
        n_5e_8 = int((p_values < 5e-8).sum())

        # MAF correlation
        if not cohort_compare_maf.empty and len(cohort_compare_maf) > 1:
            maf_corr = float(
                cohort_compare_maf["MAF_sumstats"].corr(cohort_compare_maf["MAF_ld"])
            )
        else:
            maf_corr = np.nan

        # Lambda-s (take the first value since it's the same for all SNPs in the cohort)
        lambda_s = (
            float(cohort_expected_z["lambda_s"].iloc[0])
            if not cohort_expected_z.empty
            else np.nan
        )

        # Calculate combined n_lambda_s_outlier using new criteria
        # 1. LD Mismatch Indicators (SuSiE guidelines): logLR > threshold AND |z| > threshold
        ld_mismatch_condition = (cohort_expected_z["logLR"] > logLR_threshold) & (
            cohort_expected_z["z"].abs() > z_threshold
        )

        # 2. Marginally Non-significant SNPs: |z| < threshold AND |z_std_diff| > threshold AND |r| > threshold with lead SNP
        # Find lead SNP (lowest p-value) and get its correlation with all SNPs
        if not cohort_expected_z.empty:
            # Calculate p-values from z-scores to find lead SNP
            z_scores_abs = cohort_expected_z["z"].abs()
            p_values_lead = np.maximum(2 * (1 - stats.norm.cdf(z_scores_abs)), 1e-300)
            lead_idx = p_values_lead.argmin()

            # Get correlation matrix for this cohort - need to find the corresponding locus
            # Extract cohort info to match with locus
            current_cohort = cohort
            lead_snp_corr = None

            # Find the locus that matches this cohort to get LD matrix
            for locus in [
                li for li in expected_z["cohort"].unique() if li == current_cohort
            ]:
                # Find the locus object - this is a limitation, we need the actual locus object
                # For now, we'll approximate using the dentist_s results which has r2 with lead SNP
                if not cohort_dentist_s.empty:
                    # Use the dentist_s r values as proxy for correlation with lead SNP
                    # Merge expected_z with dentist_s on SNPID to get r values
                    merged_data = pd.merge(
                        cohort_expected_z,
                        cohort_dentist_s[["SNPID", "r2"]],
                        on="SNPID",
                        how="left",
                    )
                    lead_snp_corr = np.sqrt(
                        merged_data["r2"].fillna(0)
                    )  # Convert r2 to |r|
                else:
                    # If no dentist_s data, set correlation to 0 (will not trigger marginal condition)
                    lead_snp_corr = np.zeros(len(cohort_expected_z))

            if lead_snp_corr is None:
                lead_snp_corr = np.zeros(len(cohort_expected_z))

            marginal_condition = (
                (cohort_expected_z["z"].abs() < z_threshold)
                & (cohort_expected_z["z_std_diff"].abs() > z_std_diff_threshold)
                & (lead_snp_corr > r_threshold)
            )
        else:
            marginal_condition = pd.Series([False] * len(cohort_expected_z))

        # Combined outlier count: either LD mismatch OR marginally non-significant
        n_lambda_s_outlier = int((ld_mismatch_condition | marginal_condition).sum())

        # Count dentist-s outliers: -log10p >= threshold AND r2 >= threshold
        if not cohort_dentist_s.empty:
            dentist_condition = (
                cohort_dentist_s["-log10p_dentist_s"] >= dentist_s_pvalue_threshold
            ) & (cohort_dentist_s["r2"] >= dentist_s_r2_threshold)
            n_dentist_s_outlier = int(dentist_condition.sum())
        else:
            n_dentist_s_outlier = 0

        row: Dict[str, Any] = {
            "popu": popu,
            "cohort": cohort_name,
            "n_snps": n_snps,
            "n_1e-5": n_1e_5,
            "n_5e-8": n_5e_8,
            "maf_corr": maf_corr,
            "lambda_s": lambda_s,
            "n_lambda_s_outlier": n_lambda_s_outlier,
            "n_dentist_s_outlier": n_dentist_s_outlier,
        }
        if enable_c1b:
            if not cohort_expected_z.empty:
                c1b_condition = (
                    cohort_expected_z["z_std_diff"].abs() > c1b_z_std_diff_threshold
                ) & (cohort_expected_z["z"].abs() > z_threshold)
                row["n_c1b_outlier"] = int(c1b_condition.sum())
            else:
                row["n_c1b_outlier"] = 0
        summary_rows.append(row)

    return pd.DataFrame(summary_rows)

qc_locus_cli(args)

Quality control for a single locus (command-line interface wrapper).

Parameters

args : tuple 14-tuple (legacy 10-tuple is auto-padded with defaults). Fields: - locus_id : str - locus_info : pd.DataFrame - base_out_dir : str - remove_outlier : bool - logLR_threshold : float - z_threshold : float - z_std_diff_threshold : float - r_threshold : float - dentist_s_pvalue_threshold : float - dentist_s_r2_threshold : float - enable_c1b : bool - c1b_z_std_diff_threshold : float - adaptive_qc_flag : bool - adaptive_lambda_threshold : float

Returns

Tuple[str, pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame]] Tuple containing the locus_id that was processed, its summary QC stats, outlier removal summary, cleaned QC summary, and outlier detail DataFrame if applicable.

Notes

Designed for multiprocessing. When adaptive_qc_flag=True, routes the per-locus removal path through :func:adaptive_qc and persists cleaned/{locus_id}/adaptive_qc_meta.json.

Source code in credtools/qc.py
def qc_locus_cli(
    args: tuple,
) -> Tuple[
    str,
    pd.DataFrame,
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
]:
    """
    Quality control for a single locus (command-line interface wrapper).

    Parameters
    ----------
    args : tuple
        14-tuple (legacy 10-tuple is auto-padded with defaults). Fields:
        - locus_id : str
        - locus_info : pd.DataFrame
        - base_out_dir : str
        - remove_outlier : bool
        - logLR_threshold : float
        - z_threshold : float
        - z_std_diff_threshold : float
        - r_threshold : float
        - dentist_s_pvalue_threshold : float
        - dentist_s_r2_threshold : float
        - enable_c1b : bool
        - c1b_z_std_diff_threshold : float
        - adaptive_qc_flag : bool
        - adaptive_lambda_threshold : float

    Returns
    -------
    Tuple[str, pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame]]
        Tuple containing the locus_id that was processed, its summary QC stats,
        outlier removal summary, cleaned QC summary, and outlier detail DataFrame
        if applicable.

    Notes
    -----
    Designed for multiprocessing. When ``adaptive_qc_flag=True``, routes the
    per-locus removal path through :func:`adaptive_qc` and persists
    ``cleaned/{locus_id}/adaptive_qc_meta.json``.
    """
    (
        locus_id,
        locus_info,
        base_out_dir,
        remove_outlier,
        logLR_threshold,
        z_threshold,
        z_std_diff_threshold,
        r_threshold,
        dentist_s_pvalue_threshold,
        dentist_s_r2_threshold,
        enable_c1b,
        c1b_z_std_diff_threshold,
        adaptive_qc_flag,
        adaptive_lambda_threshold,
    ) = _coerce_qc_locus_args(args)

    # Adaptive mode always removes outliers regardless of the remove_outlier flag.
    effective_remove_outlier = remove_outlier or adaptive_qc_flag

    locus_set = load_locus_set(locus_info)
    qc_metrics = locus_qc(
        locus_set,
        logLR_threshold=logLR_threshold,
        z_threshold=z_threshold,
        z_std_diff_threshold=z_std_diff_threshold,
        r_threshold=r_threshold,
        dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
        dentist_s_r2_threshold=dentist_s_r2_threshold,
    )
    locus_out_dir = f"{base_out_dir}/{locus_id}"
    os.makedirs(locus_out_dir, exist_ok=True)

    # Save individual QC metrics
    for metric_name, metric_data in qc_metrics.items():
        metric_data.to_csv(
            f"{locus_out_dir}/{metric_name}.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )

    # Adaptive mode summary uses C1b column; otherwise stays backward-compatible.
    summary_enable_c1b = enable_c1b or adaptive_qc_flag

    # Generate and save locus-level summary
    summary = locus_qc_summary(
        qc_metrics,
        logLR_threshold=logLR_threshold,
        z_threshold=z_threshold,
        z_std_diff_threshold=z_std_diff_threshold,
        r_threshold=r_threshold,
        dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
        dentist_s_r2_threshold=dentist_s_r2_threshold,
        enable_c1b=summary_enable_c1b,
        c1b_z_std_diff_threshold=c1b_z_std_diff_threshold,
    )
    if not summary.empty:
        summary.to_csv(
            f"{locus_out_dir}/qc.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        # Add locus_id for global aggregation
        summary["locus_id"] = locus_id

    # Handle outlier removal if requested
    outlier_summary = None
    cleaned_summary = None
    outlier_detail = None
    if effective_remove_outlier:
        if adaptive_qc_flag:
            logger.info(f"Running adaptive QC for locus {locus_id}")
            (
                _,
                cleaned_qc_metrics,
                outlier_summary,
                outlier_detail,
            ) = _adaptive_qc_for_locus_set(
                locus_set,
                qc_metrics,
                base_out_dir,
                locus_id,
                logLR_threshold=logLR_threshold,
                z_threshold=z_threshold,
                z_std_diff_threshold=z_std_diff_threshold,
                r_threshold=r_threshold,
                dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
                dentist_s_r2_threshold=dentist_s_r2_threshold,
                c1b_z_std_diff_threshold=c1b_z_std_diff_threshold,
                adaptive_lambda_threshold=adaptive_lambda_threshold,
                enable_c1b_summary=summary_enable_c1b,
            )
        else:
            logger.info(f"Removing outliers and re-running QC for locus {locus_id}")
            (
                _,
                cleaned_qc_metrics,
                outlier_summary,
                outlier_detail,
            ) = remove_outliers_and_rerun_qc(
                locus_set,
                qc_metrics,
                base_out_dir,
                locus_id,
                logLR_threshold=logLR_threshold,
                z_threshold=z_threshold,
                z_std_diff_threshold=z_std_diff_threshold,
                r_threshold=r_threshold,
                dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
                dentist_s_r2_threshold=dentist_s_r2_threshold,
                enable_c1b=enable_c1b,
                c1b_z_std_diff_threshold=c1b_z_std_diff_threshold,
            )

        # Generate and save cleaned QC summary
        cleaned_summary = locus_qc_summary(
            cleaned_qc_metrics,
            logLR_threshold=logLR_threshold,
            z_threshold=z_threshold,
            z_std_diff_threshold=z_std_diff_threshold,
            r_threshold=r_threshold,
            dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
            dentist_s_r2_threshold=dentist_s_r2_threshold,
            enable_c1b=summary_enable_c1b,
            c1b_z_std_diff_threshold=c1b_z_std_diff_threshold,
        )
        if not cleaned_summary.empty:
            cleaned_locus_dir = os.path.join(base_out_dir, "cleaned", locus_id)
            cleaned_summary.to_csv(
                f"{cleaned_locus_dir}/qc.txt.gz",
                sep="\t",
                index=False,
                compression="gzip",
                float_format="%.6f",
            )
            cleaned_summary["locus_id"] = locus_id

        # Save outlier removal summary
        if not outlier_summary.empty:
            outlier_summary.to_csv(
                f"{base_out_dir}/cleaned/{locus_id}/outlier_removal_summary.txt.gz",
                sep="\t",
                index=False,
                compression="gzip",
                float_format="%.6f",
            )

    return locus_id, summary, outlier_summary, cleaned_summary, outlier_detail

remove_outliers_and_rerun_qc(locus_set, qc_metrics, base_out_dir, locus_id, r_tol=0.001, method='null-mle', dtype=None, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6, enable_c1b=False, c1b_z_std_diff_threshold=10.0)

Remove outliers from locus set and re-run QC.

Parameters

locus_set : LocusSet Original LocusSet object. qc_metrics : Dict[str, pd.DataFrame] QC metrics from initial QC run. base_out_dir : str Base output directory. locus_id : str Locus identifier. r_tol : float, optional Tolerance level for eigenvalue check, by default 1e-3. method : str, optional Method to estimate s parameter, by default "null-mle". dtype : Optional[np.dtype], optional Data type for computation, by default None.

Returns

Tuple[LocusSet, Dict[str, pd.DataFrame], pd.DataFrame, pd.DataFrame] Cleaned LocusSet, new QC metrics, summary of outlier removal, and detailed outlier SNP DataFrame with columns: SNPID, C1_ld_mismatch, C2_marginal, C3_dentist_s, locus_id, popu, cohort.

Source code in credtools/qc.py
def remove_outliers_and_rerun_qc(
    locus_set: LocusSet,
    qc_metrics: Dict[str, pd.DataFrame],
    base_out_dir: str,
    locus_id: str,
    r_tol: float = 1e-3,
    method: str = "null-mle",
    dtype: Optional[np.dtype] = None,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
    enable_c1b: bool = False,
    c1b_z_std_diff_threshold: float = 10.0,
) -> Tuple[LocusSet, Dict[str, pd.DataFrame], pd.DataFrame, pd.DataFrame]:
    """
    Remove outliers from locus set and re-run QC.

    Parameters
    ----------
    locus_set : LocusSet
        Original LocusSet object.
    qc_metrics : Dict[str, pd.DataFrame]
        QC metrics from initial QC run.
    base_out_dir : str
        Base output directory.
    locus_id : str
        Locus identifier.
    r_tol : float, optional
        Tolerance level for eigenvalue check, by default 1e-3.
    method : str, optional
        Method to estimate s parameter, by default "null-mle".
    dtype : Optional[np.dtype], optional
        Data type for computation, by default None.

    Returns
    -------
    Tuple[LocusSet, Dict[str, pd.DataFrame], pd.DataFrame, pd.DataFrame]
        Cleaned LocusSet, new QC metrics, summary of outlier removal, and
        detailed outlier SNP DataFrame with columns: SNPID, C1_ld_mismatch,
        C2_marginal, C3_dentist_s, locus_id, popu, cohort.
    """
    cleaned_loci = []
    outlier_summary = []
    all_outlier_details = []

    for locus in locus_set.loci:
        cohort = f"{locus.popu}_{locus.cohort}"

        # Identify outliers for this cohort (returns DataFrame)
        outlier_df = identify_outliers(
            qc_metrics,
            cohort,
            logLR_threshold=logLR_threshold,
            z_threshold=z_threshold,
            z_std_diff_threshold=z_std_diff_threshold,
            r_threshold=r_threshold,
            dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
            dentist_s_r2_threshold=dentist_s_r2_threshold,
            enable_c1b=enable_c1b,
            c1b_z_std_diff_threshold=c1b_z_std_diff_threshold,
        )
        outlier_snps = outlier_df["SNPID"].tolist() if not outlier_df.empty else []

        # Save cleaned data
        cleaned_dir = os.path.join(base_out_dir, "cleaned", locus_id)
        prefix = locus.prefix

        if outlier_snps:
            # Remove outliers
            cleaned_locus = remove_snps_from_locus(locus, outlier_snps)
            save_cleaned_locus(cleaned_locus, cleaned_dir, prefix)

            # Add context columns to outlier detail
            locus_outlier_detail = outlier_df.copy()
            locus_outlier_detail["locus_id"] = locus_id
            locus_outlier_detail["popu"] = locus.popu
            locus_outlier_detail["cohort"] = locus.cohort
            all_outlier_details.append(locus_outlier_detail)

            outlier_summary.append(
                {
                    "locus_id": locus_id,
                    "popu": locus.popu,
                    "cohort": locus.cohort,
                    "original_snps": locus.n_snps,
                    "outliers_removed": len(outlier_snps),
                    "cleaned_snps": cleaned_locus.n_snps,
                    "retention_rate": (
                        cleaned_locus.n_snps / locus.n_snps if locus.n_snps > 0 else 0
                    ),
                }
            )
        else:
            # No outliers detected, but still save to cleaned/ for consistency
            cleaned_locus = locus.copy()
            save_cleaned_locus(cleaned_locus, cleaned_dir, prefix)

            outlier_summary.append(
                {
                    "locus_id": locus_id,
                    "popu": locus.popu,
                    "cohort": locus.cohort,
                    "original_snps": locus.n_snps,
                    "outliers_removed": 0,
                    "cleaned_snps": locus.n_snps,
                    "retention_rate": 1.0,
                }
            )

        cleaned_loci.append(cleaned_locus)

    # Create cleaned LocusSet
    cleaned_locus_set = LocusSet(cleaned_loci)

    # Re-run QC on cleaned data
    logger.info(f"Re-running QC on cleaned data for locus {locus_id}")
    cleaned_qc_metrics = locus_qc(
        cleaned_locus_set,
        r_tol=r_tol,
        method=method,
        out_dir=os.path.join(base_out_dir, "cleaned", locus_id),
        dtype=dtype,
        logLR_threshold=logLR_threshold,
        z_threshold=z_threshold,
        z_std_diff_threshold=z_std_diff_threshold,
        r_threshold=r_threshold,
        dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
        dentist_s_r2_threshold=dentist_s_r2_threshold,
    )

    outlier_summary_df = pd.DataFrame(outlier_summary)

    # Combine all outlier details and save per-locus file
    if all_outlier_details:
        outlier_detail_df = pd.concat(all_outlier_details, ignore_index=True)
    else:
        empty_outlier_cols = ["SNPID", "C1_ld_mismatch"]
        if enable_c1b:
            empty_outlier_cols.append("C1b_high_z_residual")
        empty_outlier_cols.extend(
            ["C2_marginal", "C3_dentist_s", "locus_id", "popu", "cohort"]
        )
        outlier_detail_df = pd.DataFrame(columns=empty_outlier_cols)

    # Save outlier detail file per locus
    outlier_detail_path = os.path.join(cleaned_dir, "outlier_snps.txt.gz")
    os.makedirs(cleaned_dir, exist_ok=True)
    outlier_detail_df.to_csv(
        outlier_detail_path,
        sep="\t",
        index=False,
        compression="gzip",
    )

    return cleaned_locus_set, cleaned_qc_metrics, outlier_summary_df, outlier_detail_df

remove_snps_from_locus(locus, outlier_snps)

Remove outlier SNPs from a Locus object.

Parameters

locus : Locus Input Locus object. outlier_snps : List[str] List of SNP IDs to remove.

Returns

Locus New Locus object with outliers removed.

Source code in credtools/qc.py
def remove_snps_from_locus(locus: Locus, outlier_snps: List[str]) -> Locus:
    """
    Remove outlier SNPs from a Locus object.

    Parameters
    ----------
    locus : Locus
        Input Locus object.
    outlier_snps : List[str]
        List of SNP IDs to remove.

    Returns
    -------
    Locus
        New Locus object with outliers removed.
    """
    if not outlier_snps:
        return locus.copy()

    # Filter sumstats
    cleaned_sumstats = locus.sumstats[
        ~locus.sumstats[ColName.SNPID].isin(outlier_snps)
    ].copy()
    cleaned_sumstats.reset_index(drop=True, inplace=True)

    # Filter LD matrix and map if available
    cleaned_ld = None
    if locus.ld is not None:
        # Get indices of SNPs to keep
        keep_mask = ~locus.ld.map[ColName.SNPID].isin(outlier_snps)
        keep_indices = np.where(keep_mask)[0]

        # Filter LD map
        cleaned_ld_map = locus.ld.map[keep_mask].copy()
        cleaned_ld_map.reset_index(drop=True, inplace=True)

        # Filter LD matrix (remove rows and columns)
        cleaned_r = locus.ld.r[keep_indices, :][:, keep_indices]

        # Create new LDMatrix
        cleaned_ld = LDMatrix(cleaned_ld_map, cleaned_r)

    # Create new Locus object
    cleaned_locus = Locus(
        popu=locus.popu,
        cohort=locus.cohort,
        sample_size=locus.sample_size,
        sumstats=cleaned_sumstats,
        locus_start=locus._locus_start,
        locus_end=locus._locus_end,
        ld=cleaned_ld,
        if_intersect=False,
    )

    logger.info(
        f"Removed {len(outlier_snps)} outliers from locus {locus.locus_id}. "
        f"SNPs reduced from {locus.n_snps} to {cleaned_locus.n_snps}"
    )

    return cleaned_locus

safe_qc_locus_cli(args)

Wrapper for qc_locus_cli that captures exceptions.

Source code in credtools/qc.py
def safe_qc_locus_cli(
    args: tuple,
) -> Tuple[
    str,
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[str],
]:
    """Wrapper for qc_locus_cli that captures exceptions."""
    locus_id = args[0]
    try:
        processed_id, summary, outlier_summary, cleaned_summary, outlier_detail = (
            qc_locus_cli(args)
        )
        return (
            processed_id,
            summary,
            outlier_summary,
            cleaned_summary,
            outlier_detail,
            None,
        )
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.exception("QC failed for locus %s", locus_id)
        return (
            locus_id,
            pd.DataFrame(),
            None,
            None,
            None,
            f"{type(exc).__name__}: {exc}",
        )

save_cleaned_locus(locus, output_dir, prefix)

Save cleaned locus data to files.

Parameters

locus : Locus Cleaned Locus object to save. output_dir : str Output directory path. prefix : str Prefix for output files.

Source code in credtools/qc.py
def save_cleaned_locus(locus: Locus, output_dir: str, prefix: str) -> None:
    """
    Save cleaned locus data to files.

    Parameters
    ----------
    locus : Locus
        Cleaned Locus object to save.
    output_dir : str
        Output directory path.
    prefix : str
        Prefix for output files.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Save summary statistics
    sumstats_path = os.path.join(output_dir, f"{prefix}.sumstats.gz")
    locus.sumstats.to_csv(
        sumstats_path,
        sep="\t",
        index=False,
        compression="gzip",
        float_format="%.6g",
    )
    logger.info(f"Saved cleaned sumstats to {sumstats_path}")

    # Save LD matrix and map if available
    if locus.ld is not None:
        # Save LD matrix as compressed numpy array
        ld_path = os.path.join(output_dir, f"{prefix}.ld.npz")
        np.savez_compressed(ld_path, ld=locus.ld.r.astype(np.float16))
        logger.info(f"Saved cleaned LD matrix to {ld_path}")

        # Save LD map
        ldmap_path = os.path.join(output_dir, f"{prefix}.ldmap.gz")
        locus.ld.map.to_csv(
            ldmap_path,
            sep="\t",
            index=False,
            compression="gzip",
        )
        logger.info(f"Saved cleaned LD map to {ldmap_path}")

snp_missingness(locus_set)

Compute the missingness rate of each cohort across variants.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame with variants as rows and cohorts as columns, where 1 indicates presence and 0 indicates absence of the variant in that cohort.

Warnings

If any cohort has a missing rate > 0.1, a warning is logged.

Notes

This function:

  1. Identifies all unique variants across cohorts
  2. Creates a binary matrix indicating variant presence/absence
  3. Calculates and logs missing rates for each cohort
  4. Issues warnings for cohorts with high missing rates (>10%)

High missing rates may indicate: - Different genotyping platforms - Different quality control criteria - Population-specific variants

Source code in credtools/qc.py
def snp_missingness(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute the missingness rate of each cohort across variants.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame with variants as rows and cohorts as columns, where 1 indicates
        presence and 0 indicates absence of the variant in that cohort.

    Warnings
    --------
    If any cohort has a missing rate > 0.1, a warning is logged.

    Notes
    -----
    This function:

    1. Identifies all unique variants across cohorts
    2. Creates a binary matrix indicating variant presence/absence
    3. Calculates and logs missing rates for each cohort
    4. Issues warnings for cohorts with high missing rates (>10%)

    High missing rates may indicate:
    - Different genotyping platforms
    - Different quality control criteria
    - Population-specific variants
    """
    missingness_df = []
    for locus in locus_set.loci:
        try:
            loc = intersect_sumstat_ld(locus)
        except ValueError:
            logger.warning(
                "Skipping cohort %s_%s in snp_missingness: "
                "no overlap between sumstats and LD matrix.",
                locus.popu,
                locus.cohort,
            )
            continue
        loc = loc.sumstats[[ColName.SNPID]].copy()
        loc[f"{locus.popu}_{locus.cohort}"] = 1
        loc.set_index(ColName.SNPID, inplace=True)
        missingness_df.append(loc)
    if not missingness_df:
        return pd.DataFrame()
    missingness_df = pd.concat(missingness_df, axis=1)
    missingness_df.fillna(0, inplace=True)
    # log warning if missing rate > 0.1
    for col in missingness_df.columns:
        missing_rate = float(
            round(1 - missingness_df[col].sum() / missingness_df.shape[0], 3)
        )
        if missing_rate > 0.1:
            logger.warning(f"The missing rate of {col} is {missing_rate}")
        else:
            logger.info(f"The missing rate of {col} is {missing_rate}")

    return missingness_df