Skip to content

qc

Quality control functions for CREDTOOLS data.

cochran_q(locus_set)

Compute Cochran-Q statistic for heterogeneity testing across cohorts.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame with SNPID as index and columns: - Q: Cochran-Q test statistic - Q_pvalue: p-value from chi-squared test - I_squared: I² heterogeneity statistic (percentage)

Notes

The Cochran-Q test assesses heterogeneity in effect sizes across studies:

Q = Σ w_i(β_i - β_pooled)²

where: - w_i = 1/SE_i² (inverse variance weights) - β_i = effect size in study i - β_pooled = weighted average effect size

The I² statistic quantifies the proportion of total variation due to heterogeneity rather than chance:

I² = max(0, (Q - df)/Q × 100%)

Interpretation: - Q p-value < 0.05: significant heterogeneity - I² > 50%: substantial heterogeneity - I² > 75%: considerable heterogeneity

High heterogeneity may indicate: - Population differences - Different LD patterns - Batch effects - Population stratification

Source code in credtools/qc.py
def cochran_q(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute Cochran-Q statistic for heterogeneity testing across cohorts.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame with SNPID as index and columns:
        - Q: Cochran-Q test statistic
        - Q_pvalue: p-value from chi-squared test
        - I_squared: I² heterogeneity statistic (percentage)

    Notes
    -----
    The Cochran-Q test assesses heterogeneity in effect sizes across studies:

    Q = Σ w_i(β_i - β_pooled)²

    where:
    - w_i = 1/SE_i² (inverse variance weights)
    - β_i = effect size in study i
    - β_pooled = weighted average effect size

    The I² statistic quantifies the proportion of total variation due to
    heterogeneity rather than chance:

    I² = max(0, (Q - df)/Q × 100%)

    Interpretation:
    - Q p-value < 0.05: significant heterogeneity
    - I² > 50%: substantial heterogeneity
    - I² > 75%: considerable heterogeneity

    High heterogeneity may indicate:
    - Population differences
    - Different LD patterns
    - Batch effects
    - Population stratification
    """
    # Outer join: include SNPs present in any cohort
    merged_df = locus_set.loci[0].original_sumstats[[ColName.SNPID]].copy()
    for i, locus_obj in enumerate(locus_set.loci):
        locus_df = locus_obj.sumstats[
            [ColName.SNPID, ColName.BETA, ColName.SE, ColName.EAF]
        ].copy()
        locus_df.rename(
            columns={
                ColName.BETA: f"BETA_{i}",
                ColName.SE: f"SE_{i}",
                ColName.EAF: f"EAF_{i}",
            },
            inplace=True,
        )
        merged_df = pd.merge(
            merged_df, locus_df, on=ColName.SNPID, how="outer", suffixes=("", f"_{i}")
        )

    n_cohorts: int = len(locus_set.loci)

    # Build weight and effect arrays; NaN where a cohort lacks the SNP
    weight_arr = np.column_stack(
        [1.0 / (merged_df[f"SE_{i}"] ** 2) for i in range(n_cohorts)]
    )
    effect_arr = np.column_stack([merged_df[f"BETA_{i}"] for i in range(n_cohorts)])

    # Per-SNP count of non-missing cohorts
    present = ~np.isnan(effect_arr)
    k_per_snp = present.sum(axis=1)

    # Replace NaN with 0 in weights so they don't contribute
    w = np.where(present, weight_arr, 0.0)
    e = np.where(present, effect_arr, 0.0)

    w_sum = w.sum(axis=1)
    weighted_mean = np.where(w_sum > 0, (w * e).sum(axis=1) / w_sum, 0.0)

    # Q = Σ w_i (β_i - β_pooled)² over non-missing cohorts
    Q = (w * (e - weighted_mean[:, None]) ** 2).sum(axis=1)

    # df = k_i - 1 per SNP
    df = k_per_snp - 1

    # P-value and I² only for SNPs with >= 2 cohorts
    p_value = np.full(len(merged_df), np.nan)
    I_squared = np.full(len(merged_df), np.nan)
    valid = df >= 1
    p_value[valid] = np.maximum(stats.chi2.sf(Q[valid], df[valid]), 1e-300)
    with np.errstate(invalid="ignore"):
        I_squared[valid] = np.maximum(0, (Q[valid] - df[valid]) / Q[valid] * 100)
    Q = np.where(valid, Q, np.nan)

    # Create output dataframe
    output_df = pd.DataFrame(
        {
            "SNPID": merged_df["SNPID"],
            "Q": Q,
            "Q_pvalue": p_value,
            "I_squared": I_squared,
            "k": k_per_snp,
        }
    )
    return output_df.set_index(ColName.SNPID)

compare_maf(locus)

Compare allele frequencies between summary statistics and LD reference.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix.

Returns

pd.DataFrame DataFrame containing the comparison results with columns: - SNPID: SNP identifier - MAF_sumstats: MAF from summary statistics - MAF_ld: MAF from LD reference

Returns empty DataFrame if AF2 column is not available in LD matrix.

Warnings

If AF2 column is not present in the LD matrix, a warning is logged.

Notes

This function compares minor allele frequencies (MAF) between:

  1. Summary statistics (derived from EAF)
  2. LD reference panel (from AF2 column)

Large discrepancies may indicate: - Population stratification - Allele frequency differences between studies - Potential data quality issues

MAF is calculated as min(AF, 1-AF) for both sources.

Source code in credtools/qc.py
def compare_maf(locus: Locus) -> pd.DataFrame:
    """
    Compare allele frequencies between summary statistics and LD reference.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the comparison results with columns:
        - SNPID: SNP identifier
        - MAF_sumstats: MAF from summary statistics
        - MAF_ld: MAF from LD reference

        Returns empty DataFrame if AF2 column is not available in LD matrix.

    Warnings
    --------
    If AF2 column is not present in the LD matrix, a warning is logged.

    Notes
    -----
    This function compares minor allele frequencies (MAF) between:

    1. Summary statistics (derived from EAF)
    2. LD reference panel (from AF2 column)

    Large discrepancies may indicate:
    - Population stratification
    - Allele frequency differences between studies
    - Potential data quality issues

    MAF is calculated as min(AF, 1-AF) for both sources.
    """
    input_locus = locus.copy()
    if "AF2" not in input_locus.ld.map.columns:
        logger.warning("AF2 is not in the LD matrix.")
        return pd.DataFrame()
    input_locus = intersect_sumstat_ld(input_locus)
    df = input_locus.sumstats[[ColName.SNPID, ColName.MAF]].copy()
    df.rename(columns={ColName.MAF: "MAF_sumstats"}, inplace=True)
    df.set_index(ColName.SNPID, inplace=True)
    af_ld = pd.Series(
        index=input_locus.ld.map[ColName.SNPID].tolist(),
        data=input_locus.ld.map["AF2"].values,
    )
    maf_ld = np.minimum(af_ld, 1 - af_ld)
    df["MAF_ld"] = maf_ld
    df[ColName.SNPID] = df.index
    df.reset_index(drop=True, inplace=True)
    return df

compute_dentist_s(locus)

Compute Dentist-S statistic and p-value for outlier detection.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix.

Returns

pd.DataFrame DataFrame containing the results of the Dentist-S test with columns: - SNPID: SNP identifier - t_dentist_s: Dentist-S test statistic - -log10p_dentist_s: -log10 p-value

Notes

Reference: https://github.com/mkanai/slalom/blob/854976f8e19e6fad2db3123eb9249e07ba0e1c1b/slalom.py#L254

The Dentist-S statistic tests for outliers by comparing each variant's z-score to what would be expected based on its LD with the lead variant:

t_dentist_s = (z_j - r_jk * z_k)^2 / (1 - r_jk^2)

where: - z_j: z-score for variant j - z_k: z-score for lead variant k - r_jk: LD correlation between variants j and k

TODO: Use ABF to select lead variant, although in most cases the lead variant is the one with the smallest p-value.

Source code in credtools/qc.py
def compute_dentist_s(locus: Locus) -> pd.DataFrame:
    """
    Compute Dentist-S statistic and p-value for outlier detection.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the results of the Dentist-S test with columns:
        - SNPID: SNP identifier
        - t_dentist_s: Dentist-S test statistic
        - -log10p_dentist_s: -log10 p-value

    Notes
    -----
    Reference: https://github.com/mkanai/slalom/blob/854976f8e19e6fad2db3123eb9249e07ba0e1c1b/slalom.py#L254

    The Dentist-S statistic tests for outliers by comparing each variant's z-score
    to what would be expected based on its LD with the lead variant:

    t_dentist_s = (z_j - r_jk * z_k)^2 / (1 - r_jk^2)

    where:
    - z_j: z-score for variant j
    - z_k: z-score for lead variant k
    - r_jk: LD correlation between variants j and k

    TODO: Use ABF to select lead variant, although in most cases the lead variant
    is the one with the smallest p-value.
    """
    input_locus = locus.copy()
    input_locus = intersect_sumstat_ld(input_locus)
    df = input_locus.sumstats.copy()
    df["Z"] = df[ColName.BETA] / df[ColName.SE]
    lead_idx = df[ColName.P].idxmin()
    # TODO: use abf to select lead variant, although in most cases the lead variant is the one with the smallest p-value
    lead_z = df.loc[lead_idx, "Z"]
    df["r"] = input_locus.ld.r[lead_idx]
    df["r2"] = df["r"] ** 2

    df["t_dentist_s"] = (df["Z"] - df["r"] * lead_z) ** 2 / (1 - df["r"] ** 2)  # type: ignore
    df["t_dentist_s"] = np.where(df["t_dentist_s"] < 0, np.inf, df["t_dentist_s"])
    df.at[lead_idx, "t_dentist_s"] = np.nan
    df["-log10p_dentist_s"] = -stats.chi2.logsf(df["t_dentist_s"], df=1) / np.log(10)

    df = df[[ColName.SNPID, "t_dentist_s", "-log10p_dentist_s", "r2"]].copy()
    # df.set_index(ColName.SNPID, inplace=True)
    # df.index.name = None
    return df

estimate_s_rss(locus, r_tol=1e-08, method='null-mle', eigvens=None, dtype=None)

Estimate s parameter in the susie_rss Model Using Regularized LD.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix. r_tol : float, optional Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8. method : str, optional Method to estimate s, by default "null-mle". Options: "null-mle", "null-partialmle", or "null-pseudomle". eigvens : Optional[Dict[str, np.ndarray]], optional Pre-computed eigenvalues and eigenvectors, by default None. dtype : Optional[np.dtype], optional Data type for computation. If None, uses float64. Use np.float32 for reduced memory usage with minimal precision loss.

Returns

float Estimated s value between 0 and 1 (or potentially > 1 for "null-partialmle").

Raises

ValueError If n <= 1 or if the method is not implemented.

Notes

This function estimates the parameter s, which provides information about the consistency between z-scores and the LD matrix. A larger s indicates a strong inconsistency between z-scores and the LD matrix.

The function implements three estimation methods:

  • "null-mle": Maximum likelihood estimation under the null model
  • "null-partialmle": Partial MLE using null space projection
  • "null-pseudomle": Pseudo-likelihood estimation

The z-scores are transformed using the formula: z_transformed = sqrt(sigma2) * z where sigma2 = (n-1) / (z^2 + n-2)

Source code in credtools/qc.py
def estimate_s_rss(
    locus: Locus,
    r_tol: float = 1e-8,
    method: str = "null-mle",
    eigvens: Optional[Dict[str, np.ndarray]] = None,
    dtype: Optional[np.dtype] = None,
) -> float:
    """
    Estimate s parameter in the susie_rss Model Using Regularized LD.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.
    r_tol : float, optional
        Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8.
    method : str, optional
        Method to estimate s, by default "null-mle".
        Options: "null-mle", "null-partialmle", or "null-pseudomle".
    eigvens : Optional[Dict[str, np.ndarray]], optional
        Pre-computed eigenvalues and eigenvectors, by default None.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses float64.
        Use np.float32 for reduced memory usage with minimal precision loss.

    Returns
    -------
    float
        Estimated s value between 0 and 1 (or potentially > 1 for "null-partialmle").

    Raises
    ------
    ValueError
        If n <= 1 or if the method is not implemented.

    Notes
    -----
    This function estimates the parameter s, which provides information about the
    consistency between z-scores and the LD matrix. A larger s indicates a strong
    inconsistency between z-scores and the LD matrix.

    The function implements three estimation methods:

    - "null-mle": Maximum likelihood estimation under the null model
    - "null-partialmle": Partial MLE using null space projection
    - "null-pseudomle": Pseudo-likelihood estimation

    The z-scores are transformed using the formula:
    z_transformed = sqrt(sigma2) * z
    where sigma2 = (n-1) / (z^2 + n-2)
    """
    # make sure the LD matrix and sumstats file are matched
    input_locus = locus.copy()
    input_locus = intersect_sumstat_ld(input_locus)
    z = (
        input_locus.sumstats[ColName.BETA] / input_locus.sumstats[ColName.SE]
    ).to_numpy()
    n = input_locus.sample_size
    # Check and process input arguments z, R
    z = np.where(np.isnan(z), 0, z)
    if eigvens is not None:
        eigvals = eigvens["eigvals"]
        eigvecs = eigvens["eigvecs"]
    else:
        eigens = get_eigen(input_locus.ld.r, dtype)
        eigvals = eigens["eigvals"]
        eigvecs = eigens["eigvecs"]

    # if np.any(eigvals < -r_tol):
    #     logger.warning("The LD matrix is not positive semidefinite. Negative eigenvalues are set to zero")
    eigvals[eigvals < r_tol] = 0

    if n <= 1:
        raise ValueError("n must be greater than 1")

    sigma2 = (n - 1) / (z**2 + n - 2)
    z = np.sqrt(sigma2) * z

    if method == "null-mle":

        @jit(nopython=True, cache=True)
        def negloglikelihood(s, ztv, d):
            denom = (1 - s) * d + s
            term1 = 0.5 * np.sum(np.log(denom))
            term2 = 0.5 * np.sum((ztv / denom) * ztv)
            return term1 + term2

        ztv = eigvecs.T @ z
        result = minimize_scalar(
            negloglikelihood,
            bounds=(0, 1),
            method="bounded",
            args=(ztv, eigvals),
            options={"xatol": np.sqrt(np.finfo(float).eps)},
        )
        s = result.x  # type: ignore

    elif method == "null-partialmle":
        colspace = np.where(eigvals > 0)[0]
        if len(colspace) == len(z):
            s = 0
        else:
            znull = eigvecs[:, ~np.isin(np.arange(len(z)), colspace)].T @ z
            s = np.sum(znull**2) / len(znull)

    elif method == "null-pseudomle":

        def pseudolikelihood(
            s: float, z: np.ndarray, eigvals: np.ndarray, eigvecs: np.ndarray
        ) -> float:
            precision = eigvecs @ (eigvecs.T / ((1 - s) * eigvals + s))
            postmean = np.zeros_like(z)
            postvar = np.zeros_like(z)
            for i in range(len(z)):
                postmean[i] = -(1 / precision[i, i]) * precision[i, :].dot(z) + z[i]
                postvar[i] = 1 / precision[i, i]
            return -np.sum(stats.norm.logpdf(z, loc=postmean, scale=np.sqrt(postvar)))

        result = minimize_scalar(
            pseudolikelihood,
            bounds=(0, 1),
            method="bounded",
            args=(z, eigvals, eigvecs),
        )
        s = result.x  # type: ignore

    else:
        raise ValueError("The method is not implemented")

    return s  # type: ignore

get_eigen(ldmatrix, dtype=None)

Compute eigenvalues and eigenvectors of LD matrix.

Parameters

ldmatrix : np.ndarray A p by p symmetric, positive semidefinite correlation matrix. dtype : Optional[np.dtype], optional Data type for computation. If None, uses the input matrix dtype. Use np.float32 for reduced memory usage with minimal precision loss.

Returns

Dict[str, np.ndarray] Dictionary containing eigenvalues and eigenvectors with keys: - 'eigvals': eigenvalues array - 'eigvecs': eigenvectors matrix

Notes

TODO: accelerate with joblib for large matrices.

This function uses numpy.linalg.eigh which is optimized for symmetric matrices and returns eigenvalues in ascending order.

Source code in credtools/qc.py
def get_eigen(
    ldmatrix: np.ndarray, dtype: Optional[np.dtype] = None
) -> Dict[str, np.ndarray]:
    """
    Compute eigenvalues and eigenvectors of LD matrix.

    Parameters
    ----------
    ldmatrix : np.ndarray
        A p by p symmetric, positive semidefinite correlation matrix.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses the input matrix dtype.
        Use np.float32 for reduced memory usage with minimal precision loss.

    Returns
    -------
    Dict[str, np.ndarray]
        Dictionary containing eigenvalues and eigenvectors with keys:
        - 'eigvals': eigenvalues array
        - 'eigvecs': eigenvectors matrix

    Notes
    -----
    TODO: accelerate with joblib for large matrices.

    This function uses numpy.linalg.eigh which is optimized for symmetric matrices
    and returns eigenvalues in ascending order.
    """
    if dtype is not None:
        ldmatrix = ldmatrix.astype(dtype)
    eigvals, eigvecs = np.linalg.eigh(ldmatrix)
    return {"eigvals": eigvals, "eigvecs": eigvecs}

identify_outliers(qc_metrics, cohort, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6)

Identify outlier SNPs based on QC metrics.

Parameters

qc_metrics : Dict[str, pd.DataFrame] Dictionary of QC results from locus_qc function. cohort : str Cohort identifier (format: "popu_cohort"). logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6.

Returns

pd.DataFrame DataFrame of outlier SNPs with columns: SNPID, C1_ld_mismatch, C2_marginal, C3_dentist_s. Each row represents a SNP that triggered at least one outlier criterion.

Source code in credtools/qc.py
def identify_outliers(
    qc_metrics: Dict[str, pd.DataFrame],
    cohort: str,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
) -> pd.DataFrame:
    """
    Identify outlier SNPs based on QC metrics.

    Parameters
    ----------
    qc_metrics : Dict[str, pd.DataFrame]
        Dictionary of QC results from locus_qc function.
    cohort : str
        Cohort identifier (format: "popu_cohort").
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.

    Returns
    -------
    pd.DataFrame
        DataFrame of outlier SNPs with columns: SNPID, C1_ld_mismatch,
        C2_marginal, C3_dentist_s. Each row represents a SNP that triggered
        at least one outlier criterion.
    """
    empty_result = pd.DataFrame(
        columns=["SNPID", "C1_ld_mismatch", "C2_marginal", "C3_dentist_s"]
    )

    # Collect all SNPIDs and their criterion flags
    all_snpids: set = set()
    c1_snps: set = set()
    c2_snps: set = set()
    c3_snps: set = set()

    # Get expected_z metrics for this cohort
    expected_z = qc_metrics.get("expected_z", pd.DataFrame())
    if not expected_z.empty:
        cohort_expected_z = expected_z[expected_z["cohort"] == cohort]

        if not cohort_expected_z.empty:
            # C1: LD Mismatch Indicators
            ld_mismatch_condition = (cohort_expected_z["logLR"] > logLR_threshold) & (
                cohort_expected_z["z"].abs() > z_threshold
            )

            # C2: Marginally Non-significant SNPs
            # Get lead SNP correlation from dentist_s results
            cohort_dentist_s = qc_metrics.get("dentist_s", pd.DataFrame())
            if not cohort_dentist_s.empty:
                cohort_dentist_s = cohort_dentist_s[
                    cohort_dentist_s["cohort"] == cohort
                ]

                # Merge to get r values
                merged_data = pd.merge(
                    cohort_expected_z[["SNPID", "z", "z_std_diff"]],
                    cohort_dentist_s[["SNPID", "r2"]],
                    on="SNPID",
                    how="left",
                )
                merged_data["r_abs"] = np.sqrt(merged_data["r2"].fillna(0))

                marginal_condition = (
                    merged_data["z_std_diff"].abs() > z_std_diff_threshold
                ) & (merged_data["r_abs"] > r_threshold)

                # Collect C1 and C2 SNPs
                c1_hits = merged_data[ld_mismatch_condition.values]["SNPID"].tolist()
                c2_hits = merged_data[marginal_condition]["SNPID"].tolist()
                c1_snps.update(c1_hits)
                c2_snps.update(c2_hits)
                all_snpids.update(c1_hits)
                all_snpids.update(c2_hits)

    # C3: Dentist-S outliers
    dentist_s = qc_metrics.get("dentist_s", pd.DataFrame())
    if not dentist_s.empty:
        cohort_dentist_s = dentist_s[dentist_s["cohort"] == cohort]

        if not cohort_dentist_s.empty:
            dentist_outlier_condition = (
                cohort_dentist_s["-log10p_dentist_s"] >= dentist_s_pvalue_threshold
            ) & (cohort_dentist_s["r2"] >= dentist_s_r2_threshold)

            c3_hits = cohort_dentist_s[dentist_outlier_condition]["SNPID"].tolist()
            c3_snps.update(c3_hits)
            all_snpids.update(c3_hits)

    if not all_snpids:
        return empty_result

    # Build result DataFrame
    result = pd.DataFrame({"SNPID": sorted(all_snpids)})
    result["C1_ld_mismatch"] = result["SNPID"].isin(c1_snps)
    result["C2_marginal"] = result["SNPID"].isin(c2_snps)
    result["C3_dentist_s"] = result["SNPID"].isin(c3_snps)

    return result

kriging_rss(locus, r_tol=1e-08, s=None, eigvens=None, dtype=None)

Compute distribution of z-scores of variant j given other z-scores, and detect possible allele switch issues.

Parameters

locus : Locus Locus object containing summary statistics and LD matrix. r_tol : float, optional Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8. s : Optional[float], optional An estimated s parameter from estimate_s_rss function, by default None. If None, s will be estimated automatically. eigvens : Optional[Dict[str, np.ndarray]], optional Pre-computed eigenvalues and eigenvectors, by default None. dtype : Optional[np.dtype], optional Data type for computation. If None, uses float64. Use np.float32 for reduced memory usage with minimal precision loss.

Returns

pd.DataFrame DataFrame containing the results of the kriging RSS test with columns: - SNPID: SNP identifier - z: transformed z-score - condmean: conditional mean - condvar: conditional variance - z_std_diff: standardized difference - logLR: log likelihood ratio

Raises

ValueError If n <= 1.

Notes

Under the null hypothesis, the RSS model with regularized LD matrix assumes: z|R,s ~ N(0, (1-s)R + sI)

This function uses a mixture of normals to model the conditional distribution of z_j given other z-scores. The method can help detect:

  • Allele switch issues
  • Outlier variants
  • LD inconsistencies

The algorithm: 1. Computes conditional means and variances for each variant 2. Fits a Gaussian mixture model to capture heterogeneity 3. Calculates likelihood ratios for allele switch detection

Source code in credtools/qc.py
def kriging_rss(
    locus: Locus,
    r_tol: float = 1e-8,
    s: Optional[float] = None,
    eigvens: Optional[Dict[str, np.ndarray]] = None,
    dtype: Optional[np.dtype] = None,
) -> pd.DataFrame:
    """
    Compute distribution of z-scores of variant j given other z-scores, and detect possible allele switch issues.

    Parameters
    ----------
    locus : Locus
        Locus object containing summary statistics and LD matrix.
    r_tol : float, optional
        Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-8.
    s : Optional[float], optional
        An estimated s parameter from estimate_s_rss function, by default None.
        If None, s will be estimated automatically.
    eigvens : Optional[Dict[str, np.ndarray]], optional
        Pre-computed eigenvalues and eigenvectors, by default None.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses float64.
        Use np.float32 for reduced memory usage with minimal precision loss.

    Returns
    -------
    pd.DataFrame
        DataFrame containing the results of the kriging RSS test with columns:
        - SNPID: SNP identifier
        - z: transformed z-score
        - condmean: conditional mean
        - condvar: conditional variance
        - z_std_diff: standardized difference
        - logLR: log likelihood ratio

    Raises
    ------
    ValueError
        If n <= 1.

    Notes
    -----
    Under the null hypothesis, the RSS model with regularized LD matrix assumes:
    z|R,s ~ N(0, (1-s)R + sI)

    This function uses a mixture of normals to model the conditional distribution
    of z_j given other z-scores. The method can help detect:

    - Allele switch issues
    - Outlier variants
    - LD inconsistencies

    The algorithm:
    1. Computes conditional means and variances for each variant
    2. Fits a Gaussian mixture model to capture heterogeneity
    3. Calculates likelihood ratios for allele switch detection
    """
    # Check and process input arguments z, R
    input_locus = locus.copy()
    input_locus = intersect_sumstat_ld(input_locus)
    z = (
        input_locus.sumstats[ColName.BETA] / input_locus.sumstats[ColName.SE]
    ).to_numpy()
    n = input_locus.sample_size
    z = np.where(np.isnan(z), 0, z)

    # Compute eigenvalues and eigenvectors
    if eigvens is not None:
        eigvals = eigvens["eigvals"]
        eigvecs = eigvens["eigvecs"]
    else:
        eigens = get_eigen(input_locus.ld.r, dtype)
        eigvals = eigens["eigvals"]
        eigvecs = eigens["eigvecs"]
    if s is None:
        s = estimate_s_rss(locus, eigvens={"eigvals": eigvals, "eigvecs": eigvecs})
    eigvals = eigvals[::-1]
    eigvecs = eigvecs[:, ::-1]

    eigvals[eigvals < r_tol] = 0

    if n <= 1:
        raise ValueError("n must be greater than 1")

    sigma2 = (n - 1) / (z**2 + n - 2)
    z = np.sqrt(sigma2) * z

    dinv = 1 / ((1 - s) * eigvals + s)
    dinv[np.isinf(dinv)] = 0
    precision = eigvecs @ (eigvecs * dinv).T

    # Vectorized conditional mean and variance calculation
    diag_prec = np.diag(precision)
    condvar = 1 / diag_prec

    # Compute conditional means vectorized
    precision_off_diag = precision - np.diag(diag_prec)
    condmean = -(precision_off_diag @ z) / diag_prec

    z_std_diff = (z - condmean) / np.sqrt(condvar)

    # Obtain grid
    a_min = 0.8
    z_std_diff_max_sq = np.max(z_std_diff**2)
    a_max = 2 if z_std_diff_max_sq < 1 else 2 * np.sqrt(z_std_diff_max_sq)
    npoint = int(np.ceil(np.log2(a_max / a_min) / np.log2(1.05)))
    # Ensure npoint doesn't exceed number of samples
    npoint = min(npoint, len(z) - 1)
    a_grid = 1.05 ** np.arange(-npoint, 1) * a_max

    # Compute likelihood more efficiently
    sqrt_condvar = np.sqrt(condvar)
    z_centered = z - condmean

    # Use broadcasting to avoid creating large temporary matrices
    matrix_llik = np.empty((len(z), len(a_grid)))
    for i, a in enumerate(a_grid):
        scale = sqrt_condvar * a
        matrix_llik[:, i] = stats.norm.logpdf(z_centered, scale=scale)

    lfactors = np.max(matrix_llik, axis=1)
    matrix_llik -= lfactors[:, np.newaxis]

    # Estimate weight using Gaussian Mixture Model
    # Limit components for better performance and numerical stability
    n_components = min(len(a_grid), max(2, len(z) // 10))
    gmm = GaussianMixture(
        n_components=n_components,
        covariance_type="diag",
        max_iter=500,
        init_params="k-means++",
        random_state=42,
    )
    gmm.fit(matrix_llik)

    # If we reduced components, pad weights with zeros
    if n_components < len(a_grid):
        w = np.zeros(len(a_grid))
        w[:n_components] = gmm.weights_
    else:
        w = gmm.weights_

    # Compute denominators in likelihood ratios
    logl0mix = np.log(np.sum(np.exp(matrix_llik) * (w + 1e-15), axis=1)) + lfactors  # type: ignore

    # Compute numerators in likelihood ratios
    z_plus_condmean = z + condmean
    for i, a in enumerate(a_grid):
        scale = sqrt_condvar * a
        matrix_llik[:, i] = stats.norm.logpdf(z_plus_condmean, scale=scale)

    lfactors = np.max(matrix_llik, axis=1)
    matrix_llik -= lfactors[:, np.newaxis]
    logl1mix = np.log(np.sum(np.exp(matrix_llik) * (w + 1e-15), axis=1)) + lfactors  # type: ignore

    # Compute (log) likelihood ratios
    logLRmix = logl1mix - logl0mix

    res = pd.DataFrame(
        {
            "SNPID": input_locus.sumstats[ColName.SNPID].to_numpy(),
            "z": z,
            "condmean": condmean,
            "condvar": condvar,
            "z_std_diff": z_std_diff,
            "logLR": logLRmix,
        },
        # index=input_locus.sumstats[ColName.SNPID].to_numpy(),
    )
    # TODO: remove variants with logLR > 2 and abs(z) > 2

    return res

ld_4th_moment(locus_set)

Compute the 4th moment of the LD matrix as a measure of LD structure.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame with variants as rows and cohorts as columns, containing the 4th moment values (sum of r^4 - 1 for each variant).

Notes

The 4th moment is calculated as: Σ(r_ij^4) - 1 for each variant i

This metric provides information about: - LD structure complexity - Potential issues with LD matrix quality - Differences in LD patterns between populations

Higher values may indicate: - Strong local LD structure - Potential genotyping errors - Population stratification effects

The function intersects variants across all cohorts to ensure fair comparison.

Source code in credtools/qc.py
def ld_4th_moment(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute the 4th moment of the LD matrix as a measure of LD structure.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame with variants as rows and cohorts as columns, containing
        the 4th moment values (sum of r^4 - 1 for each variant).

    Notes
    -----
    The 4th moment is calculated as:
    Σ(r_ij^4) - 1 for each variant i

    This metric provides information about:
    - LD structure complexity
    - Potential issues with LD matrix quality
    - Differences in LD patterns between populations

    Higher values may indicate:
    - Strong local LD structure
    - Potential genotyping errors
    - Population stratification effects

    The function intersects variants across all cohorts to ensure fair comparison.
    """
    ld_4th_res = []
    # Determine valid cohorts: each cohort must have sumstats ∩ ld.map overlap
    valid_loci = []
    for locus in locus_set.loci:
        cohort_valid_snps = set(locus.sumstats[ColName.SNPID]) & set(
            locus.ld.map[ColName.SNPID]
        )
        if cohort_valid_snps:
            valid_loci.append(locus)
        else:
            logger.warning(
                "Skipping cohort %s_%s in ld_4th_moment: "
                "no overlap between sumstats and LD matrix.",
                locus.popu,
                locus.cohort,
            )
    if not valid_loci:
        return pd.DataFrame()
    # intersect between valid loci — consider both sumstats AND ld.map
    overlap_snps = set(valid_loci[0].sumstats[ColName.SNPID]) & set(
        valid_loci[0].ld.map[ColName.SNPID]
    )
    for locus in valid_loci[1:]:
        locus_valid = set(locus.sumstats[ColName.SNPID]) & set(
            locus.ld.map[ColName.SNPID]
        )
        overlap_snps = overlap_snps & locus_valid
    for locus in valid_loci:
        locus = locus.copy()
        locus.sumstats = locus.sumstats[
            locus.sumstats[ColName.SNPID].isin(overlap_snps)
        ]
        try:
            locus = intersect_sumstat_ld(locus)
        except (ValueError, IndexError):
            logger.warning(
                "Skipping cohort %s_%s in ld_4th_moment: "
                "no overlap after intersection.",
                locus.popu,
                locus.cohort,
            )
            continue
        r_4th = pd.Series(
            index=locus.ld.map[ColName.SNPID], data=np.power(locus.ld.r, 4).sum(axis=0)
        )
        r_4th = r_4th - 1
        r_4th.name = f"{locus.popu}_{locus.cohort}"
        ld_4th_res.append(r_4th)
    if not ld_4th_res:
        return pd.DataFrame()
    return pd.concat(ld_4th_res, axis=1)

ld_decay(locus_set)

Compute LD decay patterns across cohorts.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame containing LD decay information with columns: - distance_kb: distance in kilobases - r2_avg: average r² value at that distance - decay_rate: fitted exponential decay rate parameter - cohort: cohort identifier

Notes

This function analyzes LD decay by:

  1. Computing pairwise distances between all variants
  2. Binning distances into 1kb windows
  3. Calculating average r² within each distance bin
  4. Fitting an exponential decay model: r² = a * exp(-b * distance)

LD decay patterns can reveal: - Population-specific recombination patterns - Effective population size differences - Demographic history effects

Different populations typically show different decay rates due to: - Historical effective population sizes - Admixture patterns - Founder effects

Source code in credtools/qc.py
def ld_decay(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute LD decay patterns across cohorts.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame containing LD decay information with columns:
        - distance_kb: distance in kilobases
        - r2_avg: average r² value at that distance
        - decay_rate: fitted exponential decay rate parameter
        - cohort: cohort identifier

    Notes
    -----
    This function analyzes LD decay by:

    1. Computing pairwise distances between all variants
    2. Binning distances into 1kb windows
    3. Calculating average r² within each distance bin
    4. Fitting an exponential decay model: r² = a * exp(-b * distance)

    LD decay patterns can reveal:
    - Population-specific recombination patterns
    - Effective population size differences
    - Demographic history effects

    Different populations typically show different decay rates due to:
    - Historical effective population sizes
    - Admixture patterns
    - Founder effects
    """

    @jit(nopython=True, cache=True)
    def fit_exp(x: np.ndarray, a: float, b: float) -> np.ndarray:
        return a * np.exp(-b * x)

    binsize = 1000
    decay_res = []
    for locus in locus_set.loci:
        ldmap = locus.ld.map.copy()
        r = locus.ld.r.copy()
        distance_mat = np.array(
            [ldmap["BP"] - ldmap["BP"].values[i] for i in range(len(ldmap))]
        )
        distance_mat = distance_mat[np.tril_indices_from(distance_mat, k=-1)].flatten()
        distance_mat = np.abs(distance_mat)
        r = r[np.tril_indices_from(r, k=-1)].flatten()
        r = np.square(r)
        bins = np.arange(0, ldmap["BP"].max() - ldmap["BP"].min() + binsize, binsize)

        r_sum, _ = np.histogram(distance_mat, bins=bins, weights=r)
        count, _ = np.histogram(distance_mat, bins=bins)

        with np.errstate(divide="ignore", invalid="ignore"):
            r2_avg = np.where(count > 0, r_sum / count, 0)
        popt, _ = curve_fit(fit_exp, bins[1:] / binsize, r2_avg)
        res = pd.DataFrame(
            {
                "distance_kb": bins[1:] / binsize,
                "r2_avg": r2_avg,
                "decay_rate": popt[0],
                "cohort": f"{locus.popu}_{locus.cohort}",
            }
        )
        decay_res.append(res)
    return pd.concat(decay_res, axis=0)

loci_qc(inputs, out_dir, threads=1, remove_outlier=False, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6)

Perform quality control analysis on multiple loci in parallel.

Parameters

inputs : str Path to input file containing locus information. Must be tab-separated with columns including 'locus_id'. out_dir : str Output directory path where results will be saved. threads : int, optional Number of parallel threads to use, by default 1. remove_outlier : bool, optional Whether to remove outliers and re-run QC on cleaned data, by default False. logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6.

Returns

Dict[str, Any] Run summary containing counts of successful and failed loci and error details. QC result files are written under out_dir.

Raises

ValueError Raised when threads is less than 1.

Notes

This function processes multiple loci in parallel with the following workflow:

  1. Reads locus information from input file
  2. Groups loci by locus_id
  3. Processes each locus group using multiprocessing
  4. Displays progress bar for user feedback
  5. Saves results organized by locus_id

The input file should contain columns: locus_id, prefix, popu, cohort, sample_size.

Output structure: {out_dir}/{locus_id}/{qc_metric}.txt.gz {out_dir}/{locus_id}/qc.txt.gz {out_dir}/qc.txt.gz

Each locus gets its own subdirectory with compressed QC result files. A global QC summary file is also generated at the output directory root.

Source code in credtools/qc.py
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
def loci_qc(
    inputs: str,
    out_dir: str,
    threads: int = 1,
    remove_outlier: bool = False,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
) -> Dict[str, Any]:
    """
    Perform quality control analysis on multiple loci in parallel.

    Parameters
    ----------
    inputs : str
        Path to input file containing locus information. Must be tab-separated
        with columns including 'locus_id'.
    out_dir : str
        Output directory path where results will be saved.
    threads : int, optional
        Number of parallel threads to use, by default 1.
    remove_outlier : bool, optional
        Whether to remove outliers and re-run QC on cleaned data, by default False.
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.

    Returns
    -------
    Dict[str, Any]
        Run summary containing counts of successful and failed loci and error
        details. QC result files are written under ``out_dir``.

    Raises
    ------
    ValueError
        Raised when ``threads`` is less than 1.

    Notes
    -----
    This function processes multiple loci in parallel with the following workflow:

    1. Reads locus information from input file
    2. Groups loci by locus_id
    3. Processes each locus group using multiprocessing
    4. Displays progress bar for user feedback
    5. Saves results organized by locus_id

    The input file should contain columns: locus_id, prefix, popu, cohort, sample_size.

    Output structure:
    {out_dir}/{locus_id}/{qc_metric}.txt.gz
    {out_dir}/{locus_id}/qc.txt.gz
    {out_dir}/qc.txt.gz

    Each locus gets its own subdirectory with compressed QC result files.
    A global QC summary file is also generated at the output directory root.
    """
    if threads < 1:
        raise ValueError("threads must be a positive integer")

    os.makedirs(out_dir, exist_ok=True)

    loci_info = pd.read_csv(inputs, sep="	")
    loci_info = check_loci_info(loci_info)  # Validate input data

    progress = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
        TimeRemainingColumn(),
    )

    locus_groups = [
        (
            locus_id,
            locus_info,
            out_dir,
            remove_outlier,
            logLR_threshold,
            z_threshold,
            z_std_diff_threshold,
            r_threshold,
            dentist_s_pvalue_threshold,
            dentist_s_r2_threshold,
        )
        for locus_id, locus_info in loci_info.groupby("locus_id")
    ]
    all_summaries: list[pd.DataFrame] = []
    all_outlier_summaries: list[pd.DataFrame] = []
    all_cleaned_summaries: list[pd.DataFrame] = []
    all_outlier_details: list[pd.DataFrame] = []
    total_loci = len(locus_groups)
    start_time = datetime.now()
    log_path = Path(out_dir) / "qc_run_summary.log"
    run_summary: Dict[str, Any] = {
        "start_time": start_time.isoformat(),
        "end_time": None,
        "total_loci": total_loci,
        "successful_loci": 0,
        "failed_loci": 0,
        "errors": [],
        "log_path": str(log_path),
        "parameters": {
            "inputs": inputs,
            "out_dir": out_dir,
            "threads": threads,
            "remove_outlier": remove_outlier,
            "logLR_threshold": logLR_threshold,
            "z_threshold": z_threshold,
            "z_std_diff_threshold": z_std_diff_threshold,
            "r_threshold": r_threshold,
            "dentist_s_pvalue_threshold": dentist_s_pvalue_threshold,
            "dentist_s_r2_threshold": dentist_s_r2_threshold,
        },
    }
    logger.info(f"QC run started at {start_time.isoformat()}")
    logger.info(f"Log file will be saved to {log_path}")
    logger.info(f"parameters: {run_summary['parameters']}")

    with progress:
        task = progress.add_task("[cyan]Processing loci...", total=total_loci)
        if locus_groups:
            with Pool(processes=threads) as pool:
                for (
                    locus_id,
                    summary,
                    outlier_summary,
                    cleaned_summary,
                    outlier_detail,
                    error,
                ) in pool.imap_unordered(safe_qc_locus_cli, locus_groups):
                    progress.update(task, advance=1)
                    if error is None:
                        run_summary["successful_loci"] += 1
                        if summary is not None and not summary.empty:
                            all_summaries.append(summary)
                        if outlier_summary is not None and not outlier_summary.empty:
                            all_outlier_summaries.append(outlier_summary)
                        if cleaned_summary is not None and not cleaned_summary.empty:
                            all_cleaned_summaries.append(cleaned_summary)
                        if outlier_detail is not None and not outlier_detail.empty:
                            all_outlier_details.append(outlier_detail)
                    else:
                        run_summary["failed_loci"] += 1
                        run_summary["errors"].append(
                            f"Locus {locus_id} failed: {error}"
                        )

    if all_summaries:
        global_summary = pd.concat(all_summaries, ignore_index=True)
        cols = ["locus_id"] + [
            col for col in global_summary.columns if col != "locus_id"
        ]
        global_summary = global_summary[cols]
        global_summary.to_csv(
            f"{out_dir}/qc.txt.gz",
            sep="	",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        logger.info(f"Global QC summary saved to {out_dir}/qc.txt.gz")

    # Save global cleaned QC summary if outlier removal was performed
    if all_cleaned_summaries:
        global_cleaned_summary = pd.concat(all_cleaned_summaries, ignore_index=True)
        cols = ["locus_id"] + [
            col for col in global_cleaned_summary.columns if col != "locus_id"
        ]
        global_cleaned_summary = global_cleaned_summary[cols]
        os.makedirs(f"{out_dir}/cleaned", exist_ok=True)
        global_cleaned_summary.to_csv(
            f"{out_dir}/cleaned/qc_cleaned.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        logger.info(
            f"Global cleaned QC summary saved to {out_dir}/cleaned/qc_cleaned.txt.gz"
        )

    # Save outlier summary if outlier removal was performed
    if all_outlier_summaries:
        global_outlier_summary = pd.concat(all_outlier_summaries, ignore_index=True)
        os.makedirs(f"{out_dir}/cleaned", exist_ok=True)
        global_outlier_summary.to_csv(
            f"{out_dir}/cleaned/outlier_removal_summary.txt.gz",
            sep="	",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        logger.info(
            f"Outlier removal summary saved to {out_dir}/cleaned/outlier_removal_summary.txt.gz"
        )

    # Save global outlier SNP details if any were collected
    if all_outlier_details:
        global_outlier_details = pd.concat(all_outlier_details, ignore_index=True)
        os.makedirs(f"{out_dir}/cleaned", exist_ok=True)
        global_outlier_details.to_csv(
            f"{out_dir}/cleaned/outlier_snps.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
        )
        logger.info(
            f"Global outlier SNP details saved to {out_dir}/cleaned/outlier_snps.txt.gz"
        )

    if all_outlier_summaries:
        # Create cleaned loci info file
        cleaned_loci_info = []
        for _, row in global_outlier_summary.iterrows():
            cleaned_dir = f"{out_dir}/cleaned/{row['locus_id']}"
            # Use the same prefix logic as Locus.prefix property
            import hashlib

            if "+" in row["cohort"]:
                cohort_hash = hashlib.md5(row["cohort"].encode()).hexdigest()[:8]
                num_cohorts = len(row["cohort"].split("+"))
                prefix = f"{row['popu']}_meta{num_cohorts}cohorts_{cohort_hash}"
            else:
                prefix = f"{row['popu']}_{row['cohort']}"
            cleaned_loci_info.append(
                {
                    "locus_id": row["locus_id"],
                    "prefix": f"{cleaned_dir}/{prefix}",
                    "popu": row["popu"],
                    "cohort": row["cohort"],
                    "sample_size": (
                        loci_info[
                            (loci_info["locus_id"] == row["locus_id"])
                            & (loci_info["popu"] == row["popu"])
                            & (loci_info["cohort"] == row["cohort"])
                        ]["sample_size"].iloc[0]
                        if not loci_info.empty
                        else 0
                    ),
                    "chr": (
                        loci_info[loci_info["locus_id"] == row["locus_id"]]["chr"].iloc[
                            0
                        ]
                        if not loci_info.empty
                        else 0
                    ),
                    "start": (
                        loci_info[loci_info["locus_id"] == row["locus_id"]][
                            "start"
                        ].iloc[0]
                        if not loci_info.empty
                        else 0
                    ),
                    "end": (
                        loci_info[loci_info["locus_id"] == row["locus_id"]]["end"].iloc[
                            0
                        ]
                        if not loci_info.empty
                        else 0
                    ),
                    "n_snps": row["cleaned_snps"],
                    "outliers_removed": row["outliers_removed"],
                }
            )

        cleaned_loci_info_df = pd.DataFrame(cleaned_loci_info)
        cleaned_loci_info_df.to_csv(
            f"{out_dir}/cleaned/cleaned_loci_info.txt.gz",
            sep="	",
            index=False,
            compression="gzip",
        )
        logger.info(
            f"Cleaned loci info saved to {out_dir}/cleaned/cleaned_loci_info.txt.gz"
        )

    run_summary["end_time"] = datetime.now().isoformat()
    with log_path.open("w") as log_file:
        log_file.write("=== CREDTOOLS QC RUN SUMMARY ===\n")
        log_file.write(f"Start Time: {run_summary['start_time']}\n")
        log_file.write(f"End Time: {run_summary['end_time']}\n")
        log_file.write(f"Total Loci: {run_summary['total_loci']}\n")
        log_file.write(f"Successful: {run_summary['successful_loci']}\n")
        log_file.write(f"Failed: {run_summary['failed_loci']}\n")
        log_file.write("\n")
        if run_summary["errors"]:
            log_file.write("Error Details:\n")
            for error in run_summary["errors"]:
                log_file.write(f"  - {error}\n")
            log_file.write("\n")

    logger.info(
        "QC completed: %s succeeded, %s failed. Summary written to %s",
        run_summary["successful_loci"],
        run_summary["failed_loci"],
        log_path,
    )

    return run_summary

locus_qc(locus_set, r_tol=0.001, method='null-mle', out_dir=None, dtype=None, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6)

Perform comprehensive quality control analysis for a locus.

Parameters

locus_set : LocusSet LocusSet object containing loci to analyze. r_tol : float, optional Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-3. method : str, optional Method to estimate s parameter, by default "null-mle". Options: "null-mle", "null-partialmle", or "null-pseudomle". out_dir : Optional[str], optional Output directory to save results, by default None. dtype : Optional[np.dtype], optional Data type for computation. If None, uses float64. Use np.float32 for reduced memory usage with minimal precision loss. logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6.

Returns

Dict[str, pd.DataFrame] Dictionary of quality control results with keys: - 'expected_z': kriging RSS results - 'dentist_s': Dentist-S test results - 'compare_maf': MAF comparison results

Note: heterogeneity metrics (ld_4th_moment, ld_decay, cochran_q,
snp_missingness) are now computed before meta-analysis via
``credtools.meta.compute_heterogeneity()``.

Notes

This function performs comprehensive QC including:

Single-locus analyses: - Kriging RSS for outlier detection with combined lambda-s outlier criteria: * LD Mismatch Indicators (SuSiE guidelines): logLR > threshold AND |z| > threshold * Marginally Non-significant SNPs: |z| < threshold AND |z_std_diff| > threshold AND |r| > threshold with lead SNP - Dentist-S for outlier detection - MAF comparison between sumstats and LD reference - LD matrix 4th moment analysis - LD decay pattern analysis

Multi-locus analyses (when applicable): - Cochran-Q heterogeneity testing - SNP missingness across cohorts

TODO: Add LAVA (Local Analysis of Variant Associations) analysis.

If out_dir is provided, results are saved as tab-separated files.

Source code in credtools/qc.py
def locus_qc(
    locus_set: LocusSet,
    r_tol: float = 1e-3,
    method: str = "null-mle",
    out_dir: Optional[str] = None,
    dtype: Optional[np.dtype] = None,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
) -> Dict[str, pd.DataFrame]:
    """
    Perform comprehensive quality control analysis for a locus.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing loci to analyze.
    r_tol : float, optional
        Tolerance level for eigenvalue check of positive semidefinite matrix, by default 1e-3.
    method : str, optional
        Method to estimate s parameter, by default "null-mle".
        Options: "null-mle", "null-partialmle", or "null-pseudomle".
    out_dir : Optional[str], optional
        Output directory to save results, by default None.
    dtype : Optional[np.dtype], optional
        Data type for computation. If None, uses float64.
        Use np.float32 for reduced memory usage with minimal precision loss.
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.

    Returns
    -------
    Dict[str, pd.DataFrame]
        Dictionary of quality control results with keys:
        - 'expected_z': kriging RSS results
        - 'dentist_s': Dentist-S test results
        - 'compare_maf': MAF comparison results

        Note: heterogeneity metrics (ld_4th_moment, ld_decay, cochran_q,
        snp_missingness) are now computed before meta-analysis via
        ``credtools.meta.compute_heterogeneity()``.

    Notes
    -----
    This function performs comprehensive QC including:

    Single-locus analyses:
    - Kriging RSS for outlier detection with combined lambda-s outlier criteria:
      * LD Mismatch Indicators (SuSiE guidelines): logLR > threshold AND |z| > threshold
      * Marginally Non-significant SNPs: |z| < threshold AND |z_std_diff| > threshold
        AND |r| > threshold with lead SNP
    - Dentist-S for outlier detection
    - MAF comparison between sumstats and LD reference
    - LD matrix 4th moment analysis
    - LD decay pattern analysis

    Multi-locus analyses (when applicable):
    - Cochran-Q heterogeneity testing
    - SNP missingness across cohorts

    TODO: Add LAVA (Local Analysis of Variant Associations) analysis.

    If out_dir is provided, results are saved as tab-separated files.
    """
    qc_metrics = {}
    all_expected_z = []
    all_dentist_s = []
    all_compare_maf = []
    for locus in locus_set.loci:
        logger.info(f"Processing locus: {locus.locus_id}")
        lo = intersect_sumstat_ld(locus)
        # Compute eigendecomposition once and reuse for both functions
        eigens = get_eigen(lo.ld.r, dtype)
        lambda_s = estimate_s_rss(locus, r_tol, method, eigens, dtype)
        expected_z = kriging_rss(locus, r_tol, lambda_s, eigens, dtype)
        expected_z["lambda_s"] = lambda_s
        expected_z["cohort"] = f"{locus.popu}_{locus.cohort}"
        dentist_s = compute_dentist_s(locus)
        dentist_s["cohort"] = f"{locus.popu}_{locus.cohort}"
        compare_maf_res = compare_maf(locus)
        compare_maf_res["cohort"] = f"{locus.popu}_{locus.cohort}"
        all_expected_z.append(expected_z)
        all_dentist_s.append(dentist_s)
        all_compare_maf.append(compare_maf_res)
    all_expected_z = pd.concat(all_expected_z, axis=0)
    all_dentist_s = pd.concat(all_dentist_s, axis=0)
    all_compare_maf = pd.concat(all_compare_maf, axis=0)
    qc_metrics["expected_z"] = all_expected_z
    qc_metrics["dentist_s"] = all_dentist_s
    qc_metrics["compare_maf"] = all_compare_maf

    if out_dir is not None:
        os.makedirs(out_dir, exist_ok=True)
        for metric_name, metric_data in qc_metrics.items():
            metric_data.to_csv(
                f"{out_dir}/{metric_name}.txt.gz",
                sep="\t",
                index=False,
                float_format="%.6f",
                compression="gzip",
            )

    return qc_metrics

locus_qc_summary(qc_metrics, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6)

Generate summary QC statistics for a locus across all cohorts.

Parameters

qc_metrics : Dict[str, pd.DataFrame] Dictionary of QC results from locus_qc function. logLR_threshold : float, optional LogLR threshold for LD mismatch detection, by default 2. z_threshold : float, optional Z-score threshold for both LD mismatch and marginal SNP detection, by default 2. z_std_diff_threshold : float, optional Z_std_diff threshold for marginal SNP outlier detection, by default 3. r_threshold : float, optional Correlation threshold with lead SNP for marginal SNP detection, by default 0.8. dentist_s_pvalue_threshold : float, optional -log10 p-value threshold for Dentist-S outlier detection, by default 4. dentist_s_r2_threshold : float, optional R² threshold for Dentist-S outlier detection, by default 0.6.

Returns

pd.DataFrame Summary QC statistics with columns: popu, cohort, n_snps, n_1e-5, n_5e-8, maf_corr, lambda_s, n_lambda_s_outlier, n_dentist_s_outlier.

Source code in credtools/qc.py
def locus_qc_summary(
    qc_metrics: Dict[str, pd.DataFrame],
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
) -> pd.DataFrame:
    """
    Generate summary QC statistics for a locus across all cohorts.

    Parameters
    ----------
    qc_metrics : Dict[str, pd.DataFrame]
        Dictionary of QC results from locus_qc function.
    logLR_threshold : float, optional
        LogLR threshold for LD mismatch detection, by default 2.
    z_threshold : float, optional
        Z-score threshold for both LD mismatch and marginal SNP detection, by default 2.
    z_std_diff_threshold : float, optional
        Z_std_diff threshold for marginal SNP outlier detection, by default 3.
    r_threshold : float, optional
        Correlation threshold with lead SNP for marginal SNP detection, by default 0.8.
    dentist_s_pvalue_threshold : float, optional
        -log10 p-value threshold for Dentist-S outlier detection, by default 4.
    dentist_s_r2_threshold : float, optional
        R² threshold for Dentist-S outlier detection, by default 0.6.

    Returns
    -------
    pd.DataFrame
        Summary QC statistics with columns: popu, cohort, n_snps, n_1e-5, n_5e-8,
        maf_corr, lambda_s, n_lambda_s_outlier, n_dentist_s_outlier.
    """
    summary_rows = []

    # Get unique cohorts from expected_z results
    expected_z = qc_metrics.get("expected_z", pd.DataFrame())
    if expected_z.empty:
        return pd.DataFrame()

    cohorts = expected_z["cohort"].unique()
    logger.info(
        f"Run QC on parameters: logLR_threshold={logLR_threshold}, z_threshold={z_threshold}, "
    )
    logger.info(
        f"z_std_diff_threshold={z_std_diff_threshold}, r_threshold={r_threshold}, "
    )
    logger.info(
        f"dentist_s_pvalue_threshold={dentist_s_pvalue_threshold}, dentist_s_r2_threshold={dentist_s_r2_threshold}"
    )

    for cohort in cohorts:
        logger.info(f"Generating QC summary for cohort: {cohort}")
        # Parse population and cohort from cohort string (format: "popu_cohort")
        popu, cohort_name = cohort.split("_", 1)

        # Filter data for this cohort
        cohort_expected_z = expected_z[expected_z["cohort"] == cohort]
        cohort_dentist_s = qc_metrics.get("dentist_s", pd.DataFrame())
        if not cohort_dentist_s.empty:
            cohort_dentist_s = cohort_dentist_s[cohort_dentist_s["cohort"] == cohort]
        else:
            cohort_dentist_s = pd.DataFrame()
        cohort_compare_maf = qc_metrics.get("compare_maf", pd.DataFrame())
        if not cohort_compare_maf.empty:
            cohort_compare_maf = cohort_compare_maf[
                cohort_compare_maf["cohort"] == cohort
            ]
        else:
            cohort_compare_maf = pd.DataFrame()

        # Calculate metrics
        n_snps = len(cohort_expected_z)

        # Count p-values < 1e-5 and < 5e-8 (need to calculate p-values from z-scores)
        z_scores = cohort_expected_z["z"].abs()
        p_values = np.maximum(2 * (1 - stats.norm.cdf(z_scores)), 1e-300)
        n_1e_5 = int((p_values < 1e-5).sum())
        n_5e_8 = int((p_values < 5e-8).sum())

        # MAF correlation
        if not cohort_compare_maf.empty and len(cohort_compare_maf) > 1:
            maf_corr = float(
                cohort_compare_maf["MAF_sumstats"].corr(cohort_compare_maf["MAF_ld"])
            )
        else:
            maf_corr = np.nan

        # Lambda-s (take the first value since it's the same for all SNPs in the cohort)
        lambda_s = (
            float(cohort_expected_z["lambda_s"].iloc[0])
            if not cohort_expected_z.empty
            else np.nan
        )

        # Calculate combined n_lambda_s_outlier using new criteria
        # 1. LD Mismatch Indicators (SuSiE guidelines): logLR > threshold AND |z| > threshold
        ld_mismatch_condition = (cohort_expected_z["logLR"] > logLR_threshold) & (
            cohort_expected_z["z"].abs() > z_threshold
        )

        # 2. Marginally Non-significant SNPs: |z| < threshold AND |z_std_diff| > threshold AND |r| > threshold with lead SNP
        # Find lead SNP (lowest p-value) and get its correlation with all SNPs
        if not cohort_expected_z.empty:
            # Calculate p-values from z-scores to find lead SNP
            z_scores_abs = cohort_expected_z["z"].abs()
            p_values_lead = np.maximum(2 * (1 - stats.norm.cdf(z_scores_abs)), 1e-300)
            lead_idx = p_values_lead.argmin()

            # Get correlation matrix for this cohort - need to find the corresponding locus
            # Extract cohort info to match with locus
            current_cohort = cohort
            lead_snp_corr = None

            # Find the locus that matches this cohort to get LD matrix
            for locus in [
                li for li in expected_z["cohort"].unique() if li == current_cohort
            ]:
                # Find the locus object - this is a limitation, we need the actual locus object
                # For now, we'll approximate using the dentist_s results which has r2 with lead SNP
                if not cohort_dentist_s.empty:
                    # Use the dentist_s r values as proxy for correlation with lead SNP
                    # Merge expected_z with dentist_s on SNPID to get r values
                    merged_data = pd.merge(
                        cohort_expected_z,
                        cohort_dentist_s[["SNPID", "r2"]],
                        on="SNPID",
                        how="left",
                    )
                    lead_snp_corr = np.sqrt(
                        merged_data["r2"].fillna(0)
                    )  # Convert r2 to |r|
                else:
                    # If no dentist_s data, set correlation to 0 (will not trigger marginal condition)
                    lead_snp_corr = np.zeros(len(cohort_expected_z))

            if lead_snp_corr is None:
                lead_snp_corr = np.zeros(len(cohort_expected_z))

            marginal_condition = (
                (cohort_expected_z["z"].abs() < z_threshold)
                & (cohort_expected_z["z_std_diff"].abs() > z_std_diff_threshold)
                & (lead_snp_corr > r_threshold)
            )
        else:
            marginal_condition = pd.Series([False] * len(cohort_expected_z))

        # Combined outlier count: either LD mismatch OR marginally non-significant
        n_lambda_s_outlier = int((ld_mismatch_condition | marginal_condition).sum())

        # Count dentist-s outliers: -log10p >= threshold AND r2 >= threshold
        if not cohort_dentist_s.empty:
            dentist_condition = (
                cohort_dentist_s["-log10p_dentist_s"] >= dentist_s_pvalue_threshold
            ) & (cohort_dentist_s["r2"] >= dentist_s_r2_threshold)
            n_dentist_s_outlier = int(dentist_condition.sum())
        else:
            n_dentist_s_outlier = 0

        summary_rows.append(
            {
                "popu": popu,
                "cohort": cohort_name,
                "n_snps": n_snps,
                "n_1e-5": n_1e_5,
                "n_5e-8": n_5e_8,
                "maf_corr": maf_corr,
                "lambda_s": lambda_s,
                "n_lambda_s_outlier": n_lambda_s_outlier,
                "n_dentist_s_outlier": n_dentist_s_outlier,
            }
        )

    return pd.DataFrame(summary_rows)

qc_locus_cli(args)

Quality control for a single locus (command-line interface wrapper).

Parameters

args : Tuple[str, pd.DataFrame, str, bool, float, float, float, float, float, float] Tuple containing: - locus_id : str Locus identifier - locus_info : pd.DataFrame DataFrame with locus information - base_out_dir : str Base output directory - remove_outlier : bool Whether to remove outliers and re-run QC - logLR_threshold : float LogLR threshold for LD mismatch detection - z_threshold : float Z-score threshold for both LD mismatch and marginal SNP detection - z_std_diff_threshold : float Z_std_diff threshold for marginal SNP outlier detection - r_threshold : float Correlation threshold with lead SNP for marginal SNP detection - dentist_s_pvalue_threshold : float -log10 p-value threshold for Dentist-S outlier detection - dentist_s_r2_threshold : float R² threshold for Dentist-S outlier detection

Returns

Tuple[str, pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame]] Tuple containing the locus_id that was processed, its summary QC stats, outlier removal summary, cleaned QC summary, and outlier detail DataFrame if applicable.

Notes

This function is designed for multiprocessing and:

  1. Loads the locus set from the provided information
  2. Performs comprehensive QC analysis
  3. Creates locus-specific output directory
  4. Saves all QC results as compressed files
  5. Generates and saves locus-level summary QC statistics
  6. Returns the processed locus_id and summary for global aggregation

Output files are saved as: {base_out_dir}/{locus_id}/{qc_metric}.txt.gz {base_out_dir}/{locus_id}/qc.txt.gz

Source code in credtools/qc.py
def qc_locus_cli(
    args: Tuple[str, pd.DataFrame, str, bool, float, float, float, float, float, float],
) -> Tuple[
    str,
    pd.DataFrame,
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
]:
    """
    Quality control for a single locus (command-line interface wrapper).

    Parameters
    ----------
    args : Tuple[str, pd.DataFrame, str, bool, float, float, float, float, float, float]
        Tuple containing:
        - locus_id : str
            Locus identifier
        - locus_info : pd.DataFrame
            DataFrame with locus information
        - base_out_dir : str
            Base output directory
        - remove_outlier : bool
            Whether to remove outliers and re-run QC
        - logLR_threshold : float
            LogLR threshold for LD mismatch detection
        - z_threshold : float
            Z-score threshold for both LD mismatch and marginal SNP detection
        - z_std_diff_threshold : float
            Z_std_diff threshold for marginal SNP outlier detection
        - r_threshold : float
            Correlation threshold with lead SNP for marginal SNP detection
        - dentist_s_pvalue_threshold : float
            -log10 p-value threshold for Dentist-S outlier detection
        - dentist_s_r2_threshold : float
            R² threshold for Dentist-S outlier detection

    Returns
    -------
    Tuple[str, pd.DataFrame, Optional[pd.DataFrame], Optional[pd.DataFrame], Optional[pd.DataFrame]]
        Tuple containing the locus_id that was processed, its summary QC stats,
        outlier removal summary, cleaned QC summary, and outlier detail DataFrame
        if applicable.

    Notes
    -----
    This function is designed for multiprocessing and:

    1. Loads the locus set from the provided information
    2. Performs comprehensive QC analysis
    3. Creates locus-specific output directory
    4. Saves all QC results as compressed files
    5. Generates and saves locus-level summary QC statistics
    6. Returns the processed locus_id and summary for global aggregation

    Output files are saved as:
    {base_out_dir}/{locus_id}/{qc_metric}.txt.gz
    {base_out_dir}/{locus_id}/qc.txt.gz
    """
    (
        locus_id,
        locus_info,
        base_out_dir,
        remove_outlier,
        logLR_threshold,
        z_threshold,
        z_std_diff_threshold,
        r_threshold,
        dentist_s_pvalue_threshold,
        dentist_s_r2_threshold,
    ) = args
    locus_set = load_locus_set(locus_info)
    qc_metrics = locus_qc(
        locus_set,
        logLR_threshold=logLR_threshold,
        z_threshold=z_threshold,
        z_std_diff_threshold=z_std_diff_threshold,
        r_threshold=r_threshold,
        dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
        dentist_s_r2_threshold=dentist_s_r2_threshold,
    )
    locus_out_dir = f"{base_out_dir}/{locus_id}"
    os.makedirs(locus_out_dir, exist_ok=True)

    # Save individual QC metrics
    for metric_name, metric_data in qc_metrics.items():
        metric_data.to_csv(
            f"{locus_out_dir}/{metric_name}.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )

    # Generate and save locus-level summary
    summary = locus_qc_summary(
        qc_metrics,
        logLR_threshold=logLR_threshold,
        z_threshold=z_threshold,
        z_std_diff_threshold=z_std_diff_threshold,
        r_threshold=r_threshold,
        dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
        dentist_s_r2_threshold=dentist_s_r2_threshold,
    )
    if not summary.empty:
        summary.to_csv(
            f"{locus_out_dir}/qc.txt.gz",
            sep="\t",
            index=False,
            compression="gzip",
            float_format="%.6f",
        )
        # Add locus_id for global aggregation
        summary["locus_id"] = locus_id

    # Handle outlier removal if requested
    outlier_summary = None
    cleaned_summary = None
    outlier_detail = None
    if remove_outlier:
        logger.info(f"Removing outliers and re-running QC for locus {locus_id}")
        (
            _,
            cleaned_qc_metrics,
            outlier_summary,
            outlier_detail,
        ) = remove_outliers_and_rerun_qc(
            locus_set,
            qc_metrics,
            base_out_dir,
            locus_id,
            logLR_threshold=logLR_threshold,
            z_threshold=z_threshold,
            z_std_diff_threshold=z_std_diff_threshold,
            r_threshold=r_threshold,
            dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
            dentist_s_r2_threshold=dentist_s_r2_threshold,
        )

        # Generate and save cleaned QC summary
        cleaned_summary = locus_qc_summary(
            cleaned_qc_metrics,
            logLR_threshold=logLR_threshold,
            z_threshold=z_threshold,
            z_std_diff_threshold=z_std_diff_threshold,
            r_threshold=r_threshold,
            dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
            dentist_s_r2_threshold=dentist_s_r2_threshold,
        )
        if not cleaned_summary.empty:
            cleaned_locus_dir = os.path.join(base_out_dir, "cleaned", locus_id)
            cleaned_summary.to_csv(
                f"{cleaned_locus_dir}/qc.txt.gz",
                sep="\t",
                index=False,
                compression="gzip",
                float_format="%.6f",
            )
            cleaned_summary["locus_id"] = locus_id

        # Save outlier removal summary
        if not outlier_summary.empty:
            outlier_summary.to_csv(
                f"{base_out_dir}/cleaned/{locus_id}/outlier_removal_summary.txt.gz",
                sep="\t",
                index=False,
                compression="gzip",
                float_format="%.6f",
            )

    return locus_id, summary, outlier_summary, cleaned_summary, outlier_detail

remove_outliers_and_rerun_qc(locus_set, qc_metrics, base_out_dir, locus_id, r_tol=0.001, method='null-mle', dtype=None, logLR_threshold=2, z_threshold=2, z_std_diff_threshold=3, r_threshold=0.8, dentist_s_pvalue_threshold=4, dentist_s_r2_threshold=0.6)

Remove outliers from locus set and re-run QC.

Parameters

locus_set : LocusSet Original LocusSet object. qc_metrics : Dict[str, pd.DataFrame] QC metrics from initial QC run. base_out_dir : str Base output directory. locus_id : str Locus identifier. r_tol : float, optional Tolerance level for eigenvalue check, by default 1e-3. method : str, optional Method to estimate s parameter, by default "null-mle". dtype : Optional[np.dtype], optional Data type for computation, by default None.

Returns

Tuple[LocusSet, Dict[str, pd.DataFrame], pd.DataFrame, pd.DataFrame] Cleaned LocusSet, new QC metrics, summary of outlier removal, and detailed outlier SNP DataFrame with columns: SNPID, C1_ld_mismatch, C2_marginal, C3_dentist_s, locus_id, popu, cohort.

Source code in credtools/qc.py
def remove_outliers_and_rerun_qc(
    locus_set: LocusSet,
    qc_metrics: Dict[str, pd.DataFrame],
    base_out_dir: str,
    locus_id: str,
    r_tol: float = 1e-3,
    method: str = "null-mle",
    dtype: Optional[np.dtype] = None,
    logLR_threshold: float = 2,
    z_threshold: float = 2,
    z_std_diff_threshold: float = 3,
    r_threshold: float = 0.8,
    dentist_s_pvalue_threshold: float = 4,
    dentist_s_r2_threshold: float = 0.6,
) -> Tuple[LocusSet, Dict[str, pd.DataFrame], pd.DataFrame, pd.DataFrame]:
    """
    Remove outliers from locus set and re-run QC.

    Parameters
    ----------
    locus_set : LocusSet
        Original LocusSet object.
    qc_metrics : Dict[str, pd.DataFrame]
        QC metrics from initial QC run.
    base_out_dir : str
        Base output directory.
    locus_id : str
        Locus identifier.
    r_tol : float, optional
        Tolerance level for eigenvalue check, by default 1e-3.
    method : str, optional
        Method to estimate s parameter, by default "null-mle".
    dtype : Optional[np.dtype], optional
        Data type for computation, by default None.

    Returns
    -------
    Tuple[LocusSet, Dict[str, pd.DataFrame], pd.DataFrame, pd.DataFrame]
        Cleaned LocusSet, new QC metrics, summary of outlier removal, and
        detailed outlier SNP DataFrame with columns: SNPID, C1_ld_mismatch,
        C2_marginal, C3_dentist_s, locus_id, popu, cohort.
    """
    cleaned_loci = []
    outlier_summary = []
    all_outlier_details = []

    for locus in locus_set.loci:
        cohort = f"{locus.popu}_{locus.cohort}"

        # Identify outliers for this cohort (returns DataFrame)
        outlier_df = identify_outliers(
            qc_metrics,
            cohort,
            logLR_threshold=logLR_threshold,
            z_threshold=z_threshold,
            z_std_diff_threshold=z_std_diff_threshold,
            r_threshold=r_threshold,
            dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
            dentist_s_r2_threshold=dentist_s_r2_threshold,
        )
        outlier_snps = outlier_df["SNPID"].tolist() if not outlier_df.empty else []

        # Save cleaned data
        cleaned_dir = os.path.join(base_out_dir, "cleaned", locus_id)
        prefix = locus.prefix

        if outlier_snps:
            # Remove outliers
            cleaned_locus = remove_snps_from_locus(locus, outlier_snps)
            save_cleaned_locus(cleaned_locus, cleaned_dir, prefix)

            # Add context columns to outlier detail
            locus_outlier_detail = outlier_df.copy()
            locus_outlier_detail["locus_id"] = locus_id
            locus_outlier_detail["popu"] = locus.popu
            locus_outlier_detail["cohort"] = locus.cohort
            all_outlier_details.append(locus_outlier_detail)

            outlier_summary.append(
                {
                    "locus_id": locus_id,
                    "popu": locus.popu,
                    "cohort": locus.cohort,
                    "original_snps": locus.n_snps,
                    "outliers_removed": len(outlier_snps),
                    "cleaned_snps": cleaned_locus.n_snps,
                    "retention_rate": (
                        cleaned_locus.n_snps / locus.n_snps if locus.n_snps > 0 else 0
                    ),
                }
            )
        else:
            # No outliers detected, but still save to cleaned/ for consistency
            cleaned_locus = locus.copy()
            save_cleaned_locus(cleaned_locus, cleaned_dir, prefix)

            outlier_summary.append(
                {
                    "locus_id": locus_id,
                    "popu": locus.popu,
                    "cohort": locus.cohort,
                    "original_snps": locus.n_snps,
                    "outliers_removed": 0,
                    "cleaned_snps": locus.n_snps,
                    "retention_rate": 1.0,
                }
            )

        cleaned_loci.append(cleaned_locus)

    # Create cleaned LocusSet
    cleaned_locus_set = LocusSet(cleaned_loci)

    # Re-run QC on cleaned data
    logger.info(f"Re-running QC on cleaned data for locus {locus_id}")
    cleaned_qc_metrics = locus_qc(
        cleaned_locus_set,
        r_tol=r_tol,
        method=method,
        out_dir=os.path.join(base_out_dir, "cleaned", locus_id),
        dtype=dtype,
        logLR_threshold=logLR_threshold,
        z_threshold=z_threshold,
        z_std_diff_threshold=z_std_diff_threshold,
        r_threshold=r_threshold,
        dentist_s_pvalue_threshold=dentist_s_pvalue_threshold,
        dentist_s_r2_threshold=dentist_s_r2_threshold,
    )

    outlier_summary_df = pd.DataFrame(outlier_summary)

    # Combine all outlier details and save per-locus file
    if all_outlier_details:
        outlier_detail_df = pd.concat(all_outlier_details, ignore_index=True)
    else:
        outlier_detail_df = pd.DataFrame(
            columns=[
                "SNPID",
                "C1_ld_mismatch",
                "C2_marginal",
                "C3_dentist_s",
                "locus_id",
                "popu",
                "cohort",
            ]
        )

    # Save outlier detail file per locus
    outlier_detail_path = os.path.join(cleaned_dir, "outlier_snps.txt.gz")
    os.makedirs(cleaned_dir, exist_ok=True)
    outlier_detail_df.to_csv(
        outlier_detail_path,
        sep="\t",
        index=False,
        compression="gzip",
    )

    return cleaned_locus_set, cleaned_qc_metrics, outlier_summary_df, outlier_detail_df

remove_snps_from_locus(locus, outlier_snps)

Remove outlier SNPs from a Locus object.

Parameters

locus : Locus Input Locus object. outlier_snps : List[str] List of SNP IDs to remove.

Returns

Locus New Locus object with outliers removed.

Source code in credtools/qc.py
def remove_snps_from_locus(locus: Locus, outlier_snps: List[str]) -> Locus:
    """
    Remove outlier SNPs from a Locus object.

    Parameters
    ----------
    locus : Locus
        Input Locus object.
    outlier_snps : List[str]
        List of SNP IDs to remove.

    Returns
    -------
    Locus
        New Locus object with outliers removed.
    """
    if not outlier_snps:
        return locus.copy()

    # Filter sumstats
    cleaned_sumstats = locus.sumstats[
        ~locus.sumstats[ColName.SNPID].isin(outlier_snps)
    ].copy()
    cleaned_sumstats.reset_index(drop=True, inplace=True)

    # Filter LD matrix and map if available
    cleaned_ld = None
    if locus.ld is not None:
        # Get indices of SNPs to keep
        keep_mask = ~locus.ld.map[ColName.SNPID].isin(outlier_snps)
        keep_indices = np.where(keep_mask)[0]

        # Filter LD map
        cleaned_ld_map = locus.ld.map[keep_mask].copy()
        cleaned_ld_map.reset_index(drop=True, inplace=True)

        # Filter LD matrix (remove rows and columns)
        cleaned_r = locus.ld.r[keep_indices, :][:, keep_indices]

        # Create new LDMatrix
        cleaned_ld = LDMatrix(cleaned_ld_map, cleaned_r)

    # Create new Locus object
    cleaned_locus = Locus(
        popu=locus.popu,
        cohort=locus.cohort,
        sample_size=locus.sample_size,
        sumstats=cleaned_sumstats,
        locus_start=locus._locus_start,
        locus_end=locus._locus_end,
        ld=cleaned_ld,
        if_intersect=False,
    )

    logger.info(
        f"Removed {len(outlier_snps)} outliers from locus {locus.locus_id}. "
        f"SNPs reduced from {locus.n_snps} to {cleaned_locus.n_snps}"
    )

    return cleaned_locus

safe_qc_locus_cli(args)

Wrapper for qc_locus_cli that captures exceptions.

Source code in credtools/qc.py
def safe_qc_locus_cli(
    args: Tuple[str, pd.DataFrame, str, bool, float, float, float, float, float, float],
) -> Tuple[
    str,
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[pd.DataFrame],
    Optional[str],
]:
    """Wrapper for qc_locus_cli that captures exceptions."""
    locus_id = args[0]
    try:
        processed_id, summary, outlier_summary, cleaned_summary, outlier_detail = (
            qc_locus_cli(args)
        )
        return (
            processed_id,
            summary,
            outlier_summary,
            cleaned_summary,
            outlier_detail,
            None,
        )
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.exception("QC failed for locus %s", locus_id)
        return (
            locus_id,
            pd.DataFrame(),
            None,
            None,
            None,
            f"{type(exc).__name__}: {exc}",
        )

save_cleaned_locus(locus, output_dir, prefix)

Save cleaned locus data to files.

Parameters

locus : Locus Cleaned Locus object to save. output_dir : str Output directory path. prefix : str Prefix for output files.

Source code in credtools/qc.py
def save_cleaned_locus(locus: Locus, output_dir: str, prefix: str) -> None:
    """
    Save cleaned locus data to files.

    Parameters
    ----------
    locus : Locus
        Cleaned Locus object to save.
    output_dir : str
        Output directory path.
    prefix : str
        Prefix for output files.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Save summary statistics
    sumstats_path = os.path.join(output_dir, f"{prefix}.sumstats.gz")
    locus.sumstats.to_csv(
        sumstats_path,
        sep="\t",
        index=False,
        compression="gzip",
        float_format="%.6g",
    )
    logger.info(f"Saved cleaned sumstats to {sumstats_path}")

    # Save LD matrix and map if available
    if locus.ld is not None:
        # Save LD matrix as compressed numpy array
        ld_path = os.path.join(output_dir, f"{prefix}.ld.npz")
        np.savez_compressed(ld_path, ld=locus.ld.r.astype(np.float16))
        logger.info(f"Saved cleaned LD matrix to {ld_path}")

        # Save LD map
        ldmap_path = os.path.join(output_dir, f"{prefix}.ldmap.gz")
        locus.ld.map.to_csv(
            ldmap_path,
            sep="\t",
            index=False,
            compression="gzip",
        )
        logger.info(f"Saved cleaned LD map to {ldmap_path}")

snp_missingness(locus_set)

Compute the missingness rate of each cohort across variants.

Parameters

locus_set : LocusSet LocusSet object containing multiple loci/cohorts.

Returns

pd.DataFrame DataFrame with variants as rows and cohorts as columns, where 1 indicates presence and 0 indicates absence of the variant in that cohort.

Warnings

If any cohort has a missing rate > 0.1, a warning is logged.

Notes

This function:

  1. Identifies all unique variants across cohorts
  2. Creates a binary matrix indicating variant presence/absence
  3. Calculates and logs missing rates for each cohort
  4. Issues warnings for cohorts with high missing rates (>10%)

High missing rates may indicate: - Different genotyping platforms - Different quality control criteria - Population-specific variants

Source code in credtools/qc.py
def snp_missingness(locus_set: LocusSet) -> pd.DataFrame:
    """
    Compute the missingness rate of each cohort across variants.

    Parameters
    ----------
    locus_set : LocusSet
        LocusSet object containing multiple loci/cohorts.

    Returns
    -------
    pd.DataFrame
        DataFrame with variants as rows and cohorts as columns, where 1 indicates
        presence and 0 indicates absence of the variant in that cohort.

    Warnings
    --------
    If any cohort has a missing rate > 0.1, a warning is logged.

    Notes
    -----
    This function:

    1. Identifies all unique variants across cohorts
    2. Creates a binary matrix indicating variant presence/absence
    3. Calculates and logs missing rates for each cohort
    4. Issues warnings for cohorts with high missing rates (>10%)

    High missing rates may indicate:
    - Different genotyping platforms
    - Different quality control criteria
    - Population-specific variants
    """
    missingness_df = []
    for locus in locus_set.loci:
        try:
            loc = intersect_sumstat_ld(locus)
        except ValueError:
            logger.warning(
                "Skipping cohort %s_%s in snp_missingness: "
                "no overlap between sumstats and LD matrix.",
                locus.popu,
                locus.cohort,
            )
            continue
        loc = loc.sumstats[[ColName.SNPID]].copy()
        loc[f"{locus.popu}_{locus.cohort}"] = 1
        loc.set_index(ColName.SNPID, inplace=True)
        missingness_df.append(loc)
    if not missingness_df:
        return pd.DataFrame()
    missingness_df = pd.concat(missingness_df, axis=1)
    missingness_df.fillna(0, inplace=True)
    # log warning if missing rate > 0.1
    for col in missingness_df.columns:
        missing_rate = float(
            round(1 - missingness_df[col].sum() / missingness_df.shape[0], 3)
        )
        if missing_rate > 0.1:
            logger.warning(f"The missing rate of {col} is {missing_rate}")
        else:
            logger.info(f"The missing rate of {col} is {missing_rate}")

    return missingness_df