Skip to content

Preprocessing API

These functions power credtools munge and credtools chunk.

Munging

Munge summary statistics using smunger integration.

This module provides functionality to reformat and standardize GWAS summary statistics from various formats into a consistent format suitable for fine-mapping.

create_munge_config(sample_files, output_config, interactive=True)

Create configuration file for munging by examining sample files.

Parameters:

Name Type Description Default
sample_files Dict[str, str]

Dictionary mapping identifiers to sample file paths.

required
output_config str

Output path for the configuration file.

required
interactive bool

Whether to use interactive mode for column mapping, by default True.

True
Notes

This function helps users create configuration files by examining the headers of input files and providing suggested column mappings.

Source code in credtools/preprocessing/munge.py
def create_munge_config(
    sample_files: Dict[str, str], output_config: str, interactive: bool = True
) -> None:
    """
    Build a column-mapping configuration from sample files and save it as JSON.

    Parameters
    ----------
    sample_files : Dict[str, str]
        Mapping from an identifier to the path of a sample file whose
        headers are inspected.
    output_config : str
        Path of the JSON configuration file to write.
    interactive : bool, optional
        When truthy, the configuration template for each file is created in
        interactive mode; otherwise it is created non-interactively.
        By default True.

    Notes
    -----
    For every entry of ``sample_files`` the headers are read with
    ``inspect_headers`` and handed to ``create_config_template``. The
    per-identifier results are collected in one dictionary that is dumped
    to ``output_config`` with an indentation of two spaces.
    """
    # Internal munging helpers (adapted from smunger)
    from .munging import create_config_template, inspect_headers

    def _template_for(name: str, path: str):
        """Inspect one sample file and return its configuration template."""
        logger.info(f"Examining headers for {name}: {path}")

        detected = inspect_headers(path)
        logger.info(f"Detected headers: {detected}")

        if not interactive:
            # Non-interactive template creation
            return create_config_template(detected, interactive=False)

        # Interactive column mapping
        logger.info(f"Please specify column mappings for {name}")
        return create_config_template(detected, interactive=True)

    mappings = {
        name: _template_for(name, path) for name, path in sample_files.items()
    }

    # Persist the collected mappings
    import json

    with open(output_config, "w") as handle:
        json.dump(mappings, handle, indent=2)

    logger.info(f"Configuration saved to {output_config}")

munge_sumstats(input_files, output_dir, config_file=None, force_overwrite=False, **kwargs)

Munge summary statistics files using smunger integration.

Parameters:

Name Type Description Default
input_files Union[str, List[str], Dict[str, str]]

Input summary statistics file(s). Can be a single file path (str), a list of file paths (List[str]), or a dictionary mapping ancestry/cohort names to file paths (Dict[str, str]).

required
output_dir str

Output directory for munged files.

required
config_file Optional[str]

Path to configuration file specifying column mappings, by default None.

None
force_overwrite bool

Whether to overwrite existing output files, by default False.

False
**kwargs

Additional arguments passed to smunger functions.

{}

Returns:

Type Description
Dict[str, str]

Dictionary mapping input identifiers to output file paths.

Raises:

Type Description
ImportError

If the internal munging module (adapted from smunger) cannot be imported.

FileNotFoundError

If input files do not exist.

ValueError

If input format is invalid.

Examples:

>>> # Single file
>>> result = munge_sumstats("gwas_eur.txt", "output/")
>>>
>>> # Multiple files with ancestry labels
>>> files = {"EUR": "gwas_eur.txt", "ASN": "gwas_asn.txt"}
>>> result = munge_sumstats(files, "output/")
Source code in credtools/preprocessing/munge.py
def munge_sumstats(
    input_files: Union[str, List[str], Dict[str, str]],
    output_dir: str,
    config_file: Optional[str] = None,
    force_overwrite: bool = False,
    **kwargs,
) -> Dict[str, str]:
    """
    Munge summary statistics files using the internal munging module.

    Parameters
    ----------
    input_files : Union[str, List[str], Dict[str, str]]
        Input summary statistics file(s). Can be:
        - Single file path (str); the file stem is used as identifier
        - List of file paths (List[str]); each file stem is used as identifier
        - Dictionary mapping ancestry/cohort names to file paths (Dict[str, str])
    output_dir : str
        Output directory for munged files. Created if it does not exist.
    config_file : Optional[str], optional
        Path to configuration file specifying column mappings, by default None.
        A path that does not exist is reported with a warning and ignored.
    force_overwrite : bool, optional
        Whether to overwrite existing output files, by default False.
        Existing outputs are skipped (but still returned) when False.
    **kwargs
        Additional arguments passed to ``load_and_munge``.

    Returns
    -------
    Dict[str, str]
        Dictionary mapping input identifiers to output file paths
        (``<output_dir>/<identifier>.munged.txt.gz``), including files that
        were skipped because their output already existed.

    Raises
    ------
    ImportError
        If the internal munging module cannot be imported.
    FileNotFoundError
        If input files do not exist.
    ValueError
        If ``input_files`` has an unsupported type, or if two entries of a
        list input share the same file stem (they would map to the same
        identifier and output file).

    Examples
    --------
    >>> # Single file
    >>> result = munge_sumstats("gwas_eur.txt", "output/")
    >>>
    >>> # Multiple files with ancestry labels
    >>> files = {"EUR": "gwas_eur.txt", "ASN": "gwas_asn.txt"}
    >>> result = munge_sumstats(files, "output/")
    """
    # Use internal munging module (adapted from smunger)
    from .munging import load_and_munge, read_config

    # Normalize input files to dictionary format
    if isinstance(input_files, str):
        # Single file - use filename as key
        input_dict = {Path(input_files).stem: input_files}
    elif isinstance(input_files, list):
        # List of files - use filenames as keys; refuse collisions instead of
        # silently dropping all but the last file with a given stem
        input_dict = {}
        for path in input_files:
            key = Path(path).stem
            if key in input_dict:
                raise ValueError(
                    f"Duplicate identifier '{key}' derived from "
                    f"{input_dict[key]} and {path}; pass a dictionary with "
                    "unique identifiers instead"
                )
            input_dict[key] = path
    elif isinstance(input_files, dict):
        # Already a dictionary
        input_dict = input_files
    else:
        raise ValueError("input_files must be a string, list of strings, or dictionary")

    # Validate input files exist before touching the filesystem
    for file_path in input_dict.values():
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"Input file not found: {file_path}")

    # Create output directory
    os.makedirs(output_dir, exist_ok=True)

    # Load configuration if provided
    config = {}
    if config_file:
        if os.path.exists(config_file):
            logger.info(f"Loading configuration from {config_file}")
            config = read_config(config_file)
        else:
            # Make the fallback visible rather than silently ignoring the path
            logger.warning(
                f"Configuration file not found, proceeding without it: {config_file}"
            )

    # Process each file
    output_files = {}
    n_munged = 0
    n_skipped = 0
    for identifier, input_file in tqdm(input_dict.items(), desc="Munging files"):
        logger.info(f"Processing {identifier}: {input_file}")

        # Define output file path
        output_file = os.path.join(output_dir, f"{identifier}.munged.txt.gz")
        output_files[identifier] = output_file

        # Skip if output exists and not overwriting
        if os.path.exists(output_file) and not force_overwrite:
            logger.info(f"Output file exists, skipping: {output_file}")
            n_skipped += 1
            continue

        try:
            # Apply file-specific config if available
            file_config = config.get(identifier, {})

            # Munge the file using internal munging module
            logger.info(f"Munging {input_file} -> {output_file}")
            munged_data = load_and_munge(input_file, config=file_config, **kwargs)

            # Save munged data
            logger.info(f"Saving munged data to {output_file}")
            munged_data.to_csv(output_file, sep="\t", index=False, compression="gzip")

        except Exception as e:
            logger.error(f"Failed to process {input_file}: {str(e)}")
            # Remove failed output file if it was created
            if os.path.exists(output_file):
                os.remove(output_file)
            raise

        n_munged += 1

    # Report only files that were actually processed in this call
    logger.info(f"Successfully munged {n_munged} files")
    if n_skipped:
        logger.info(f"Skipped {n_skipped} files with existing output")
    return output_files

validate_munged_files(munged_files, required_columns=None)

Validate munged summary statistics files.

Parameters:

Name Type Description Default
munged_files Dict[str, str]

Dictionary mapping identifiers to munged file paths.

required
required_columns Optional[List[str]]

List of required column names to check, by default None.

None

Returns:

Type Description
Dict[str, Dict]

Dictionary with validation results for each file.

Source code in credtools/preprocessing/munge.py
def validate_munged_files(
    munged_files: Dict[str, str], required_columns: Optional[List[str]] = None
) -> Dict[str, Dict]:
    """
    Validate munged summary statistics files.

    Each file is read as a gzip-compressed, tab-separated table. The header
    is compared against ``required_columns`` and the data rows are counted.

    Parameters
    ----------
    munged_files : Dict[str, str]
        Dictionary mapping identifiers to munged file paths.
    required_columns : Optional[List[str]], optional
        List of required column names to check, by default None, in which
        case CHR, BP, SNPID, EA, NEA, EAF, BETA, SE, P and RSID are required.

    Returns
    -------
    Dict[str, Dict]
        Dictionary with validation results for each file. Every result has
        the keys ``file_exists``, ``n_variants``, ``columns``,
        ``missing_columns`` and ``validation_passed``; an ``error`` key with
        the exception message is added when reading the file failed.
    """
    if required_columns is None:
        required_columns = [
            "CHR",
            "BP",
            "SNPID",
            "EA",
            "NEA",
            "EAF",
            "BETA",
            "SE",
            "P",
            "RSID",
        ]

    # Rows per chunk when counting variants; bounds peak memory so that a
    # genome-wide table is never fully loaded just to obtain its length.
    count_chunk_rows = 1_000_000

    validation_results = {}

    for identifier, file_path in munged_files.items():
        logger.info(f"Validating {identifier}: {file_path}")

        result = {
            "file_exists": os.path.exists(file_path),
            "n_variants": 0,
            "columns": [],
            "missing_columns": [],
            "validation_passed": False,
        }
        validation_results[identifier] = result

        if not result["file_exists"]:
            logger.warning(f"{identifier}: File not found: {file_path}")
            continue

        try:
            # Read only the header to check structure
            header = pd.read_csv(file_path, sep="\t", nrows=0, compression="gzip")
            result["columns"] = header.columns.tolist()
            present = set(result["columns"])
            result["missing_columns"] = [
                col for col in required_columns if col not in present
            ]

            # Count total variants chunk by chunk
            result["n_variants"] = sum(
                len(chunk)
                for chunk in pd.read_csv(
                    file_path,
                    sep="\t",
                    compression="gzip",
                    chunksize=count_chunk_rows,
                )
            )

            # Validation passes only when no required column is missing
            result["validation_passed"] = not result["missing_columns"]

            logger.info(
                f"{identifier}: {result['n_variants']} variants, "
                f"columns: {result['columns']}"
            )

            if result["missing_columns"]:
                logger.warning(
                    f"{identifier}: Missing required columns: {result['missing_columns']}"
                )

        except Exception as e:
            # Best-effort validation: record the failure and continue with
            # the remaining files instead of aborting the whole run
            logger.error(f"Error validating {file_path}: {str(e)}")
            result["error"] = str(e)

    return validation_results

Chunking

Chunk whole genome summary statistics into independent loci.

This module provides functionality to identify independent lead SNPs and create regional chunks suitable for fine-mapping analysis across multiple ancestries.

chunk_sumstats(loci_df, sumstats_files, output_dir, threads=1, compress=True)

Chunk summary statistics files into loci-specific files.

Parameters:

Name Type Description Default
loci_df DataFrame

DataFrame with loci coordinates from identify_independent_loci.

required
sumstats_files Dict[str, str]

Dictionary mapping ancestry names to sumstats file paths.

required
output_dir str

Output directory for chunked files.

required
threads int

Number of threads for parallel processing, by default 1.

1
compress bool

Whether to compress output files, by default True.

True

Returns:

Type Description
DataFrame

DataFrame with information about generated files.

Examples:

>>> loci_df = identify_independent_loci(files, "output/")
>>> file_info = chunk_sumstats(loci_df, files, "output/chunks/")
Source code in credtools/preprocessing/chunk.py
def chunk_sumstats(
    loci_df: pd.DataFrame,
    sumstats_files: Dict[str, str],
    output_dir: str,
    threads: int = 1,
    compress: bool = True,
) -> pd.DataFrame:
    """
    Chunk summary statistics files into loci-specific files.

    For every locus and every ancestry listed for that locus, the variants
    with matching ``CHR`` and ``BP`` within ``[start, end]`` (inclusive) are
    written to ``<output_dir>/<ancestry>.<locus_id>.sumstats[.gz]``. A
    summary table is additionally written to ``<output_dir>/chunk_info.txt``.

    Parameters
    ----------
    loci_df : pd.DataFrame
        DataFrame with loci coordinates from identify_independent_loci.
        The columns ``locus_id``, ``chr``, ``start``, ``end`` and
        ``ancestry`` are read; ``ancestry`` may hold several comma-separated
        names.
    sumstats_files : Dict[str, str]
        Dictionary mapping ancestry names to sumstats file paths. Files are
        read as gzip-compressed, tab-separated tables.
    output_dir : str
        Output directory for chunked files. Created if it does not exist.
    threads : int, optional
        Number of threads for parallel processing, by default 1.
        Currently not used by this function; loci are processed sequentially.
    compress : bool, optional
        Whether to compress output files, by default True.

    Returns
    -------
    pd.DataFrame
        DataFrame with information about generated files, with the columns
        ``locus_id``, ``ancestry``, ``chr``, ``start``, ``end``,
        ``n_variants`` and ``sumstats_file``. The columns are present even
        when no file was generated.

    Examples
    --------
    >>> loci_df = identify_independent_loci(files, "output/")
    >>> file_info = chunk_sumstats(loci_df, files, "output/chunks/")
    """
    os.makedirs(output_dir, exist_ok=True)

    # Fixed schema so that an empty result still exposes the expected columns
    # (a column-less frame breaks consumers that group by "locus_id")
    info_columns = [
        "locus_id",
        "ancestry",
        "chr",
        "start",
        "end",
        "n_variants",
        "sumstats_file",
    ]
    file_info_list = []

    # Load all sumstats files
    sumstats_data = {}
    for ancestry, file_path in sumstats_files.items():
        logger.info(f"Loading {ancestry} sumstats: {file_path}")
        sumstats_data[ancestry] = pd.read_csv(file_path, sep="\t", compression="gzip")

    # Output naming/compression is the same for every chunk
    suffix = ".gz" if compress else ""
    compression = "gzip" if compress else None

    # Process each locus
    for _, locus in tqdm(loci_df.iterrows(), total=len(loci_df), desc="Chunking loci"):
        locus_id = locus["locus_id"]
        chrom = locus["chr"]
        start = locus["start"]
        end = locus["end"]

        # Get ancestries for this locus; tolerate whitespace around the
        # separators ("EUR, ASN") while still honouring exact key matches
        ancestries = [
            name if name in sumstats_data else name.strip()
            for name in str(locus["ancestry"]).split(",")
        ]

        # Extract data for each ancestry
        for ancestry in ancestries:
            if ancestry not in sumstats_data:
                logger.warning(f"No data available for ancestry: {ancestry}")
                continue

            # Extract locus data
            ancestry_df = sumstats_data[ancestry]
            in_locus = (
                (ancestry_df["CHR"] == chrom)
                & (ancestry_df["BP"] >= start)
                & (ancestry_df["BP"] <= end)
            )
            locus_data = ancestry_df[in_locus].copy()

            if len(locus_data) == 0:
                logger.warning(f"No variants found for {ancestry} {locus_id}")
                continue

            # Define output file
            output_file = os.path.join(
                output_dir, f"{ancestry}.{locus_id}.sumstats{suffix}"
            )

            # Save chunked data
            locus_data.to_csv(
                output_file, sep="\t", index=False, compression=compression
            )

            # Record file info
            file_info_list.append(
                {
                    "locus_id": locus_id,
                    "ancestry": ancestry,
                    "chr": chrom,
                    "start": start,
                    "end": end,
                    "n_variants": len(locus_data),
                    "sumstats_file": output_file,
                }
            )

    # Create file info DataFrame
    file_info_df = pd.DataFrame(file_info_list, columns=info_columns)

    # Save file info
    info_file = os.path.join(output_dir, "chunk_info.txt")
    file_info_df.to_csv(info_file, sep="\t", index=False)

    logger.info(f"Generated {len(file_info_list)} chunked files")
    logger.info(f"File information saved to {info_file}")

    return file_info_df

create_loci_list_for_credtools(chunk_info_df, ld_info_df=None, output_file='loci_list.txt')

Create loci list file compatible with credtools format.

Parameters:

Name Type Description Default
chunk_info_df DataFrame

DataFrame from chunk_sumstats with file information.

required
ld_info_df Optional[DataFrame]

DataFrame with LD file information, by default None.

None
output_file str

Output file path, by default "loci_list.txt".

'loci_list.txt'

Returns:

Type Description
DataFrame

DataFrame in credtools loci list format.

Source code in credtools/preprocessing/chunk.py
def create_loci_list_for_credtools(
    chunk_info_df: pd.DataFrame,
    ld_info_df: Optional[pd.DataFrame] = None,
    output_file: str = "loci_list.txt",
    sample_size: Union[int, Dict[str, int]] = 50000,
) -> pd.DataFrame:
    """
    Create loci list file compatible with credtools format.

    Parameters
    ----------
    chunk_info_df : pd.DataFrame
        DataFrame from chunk_sumstats with file information. The columns
        ``locus_id``, ``chr``, ``start``, ``end``, ``ancestry`` and
        ``sumstats_file`` are read.
    ld_info_df : Optional[pd.DataFrame], optional
        DataFrame with LD file information, by default None. When given it
        must have ``locus_id`` and ``ancestry`` columns; all fields of the
        first row matching a locus/ancestry pair are merged into the entry.
    output_file : str, optional
        Output file path, by default "loci_list.txt".
    sample_size : Union[int, Dict[str, int]], optional
        Sample size written to the ``sample_size`` column. Either a single
        value used for every row, or a dictionary mapping ancestry names to
        sample sizes. By default 50000, which is only a placeholder.

    Returns
    -------
    pd.DataFrame
        DataFrame in credtools loci list format with the columns
        ``locus_id``, ``chr``, ``start``, ``end``, ``popu``, ``cohort``,
        ``sample_size`` and ``prefix`` (plus any LD columns). The same table
        is written tab-separated to ``output_file``.

    Raises
    ------
    ValueError
        If ``sample_size`` is a dictionary that lacks an ancestry present in
        ``chunk_info_df``.
    """
    base_columns = [
        "locus_id",
        "chr",
        "start",
        "end",
        "popu",
        "cohort",
        "sample_size",
        "prefix",
    ]
    loci_list = []

    # An empty chunk table may carry no columns at all, in which case
    # grouping by "locus_id" would fail; it simply yields an empty loci list.
    if not chunk_info_df.empty:
        # Group by locus_id to create credtools format
        for locus_id, group in chunk_info_df.groupby("locus_id"):
            for _, row in group.iterrows():
                ancestry = row["ancestry"]

                # Extract prefix from sumstats file: drop the last suffix
                # (e.g. ".gz"), then a trailing ".sumstats" if still present.
                # Only the end of the path is touched, so directory names
                # containing ".sumstats" stay intact.
                prefix = str(Path(row["sumstats_file"]).with_suffix(""))
                if prefix.endswith(".sumstats"):
                    prefix = prefix[: -len(".sumstats")]

                # Resolve the sample size for this ancestry
                if isinstance(sample_size, dict):
                    if ancestry not in sample_size:
                        raise ValueError(
                            f"No sample size provided for ancestry: {ancestry}"
                        )
                    n_samples = sample_size[ancestry]
                else:
                    n_samples = sample_size

                locus_entry = {
                    "locus_id": locus_id,
                    "chr": row["chr"],
                    "start": row["start"],
                    "end": row["end"],
                    "popu": ancestry,
                    "cohort": ancestry,  # Use ancestry as cohort for now
                    "sample_size": n_samples,
                    "prefix": prefix,
                }

                # Add LD file info if available
                if ld_info_df is not None:
                    ld_match = ld_info_df[
                        (ld_info_df["locus_id"] == locus_id)
                        & (ld_info_df["ancestry"] == ancestry)
                    ]
                    if len(ld_match) > 0:
                        locus_entry.update(ld_match.iloc[0].to_dict())

                loci_list.append(locus_entry)

    if loci_list:
        loci_df = pd.DataFrame(loci_list)
    else:
        # Keep the credtools schema even when there is nothing to list
        loci_df = pd.DataFrame(columns=base_columns)

    # Save to file
    loci_df.to_csv(output_file, sep="\t", index=False)
    logger.info(f"Created credtools loci list: {output_file}")

    return loci_df

identify_independent_loci(sumstats_files, output_dir, distance_threshold=500000, pvalue_threshold=5e-08, merge_overlapping=True, use_most_sig_if_no_sig=True, min_variants_per_locus=10, **kwargs)

Identify independent loci across multiple ancestries.

Parameters:

Name Type Description Default
sumstats_files Union[Dict[str, str], str]

Dictionary mapping ancestry/cohort names to munged sumstats files, or single file path.

required
output_dir str

Output directory for results.

required
distance_threshold int

Distance threshold in base pairs for independence, by default 500000.

500000
pvalue_threshold float

P-value threshold for significance, by default 5e-8.

5e-08
merge_overlapping bool

Whether to merge overlapping loci across ancestries, by default True.

True
use_most_sig_if_no_sig bool

Whether to use most significant SNP if no significant SNPs found, by default True.

True
min_variants_per_locus int

Minimum number of variants required per locus, by default 10.

10
**kwargs

Additional parameters.

{}

Returns:

Type Description
DataFrame

DataFrame with identified loci coordinates and lead SNPs.

Examples:

>>> files = {"EUR": "eur.munged.txt.gz", "ASN": "asn.munged.txt.gz"}
>>> loci_df = identify_independent_loci(files, "output/")
Source code in credtools/preprocessing/chunk.py
def identify_independent_loci(
    sumstats_files: Union[Dict[str, str], str],
    output_dir: str,
    distance_threshold: int = 500000,
    pvalue_threshold: float = 5e-8,
    merge_overlapping: bool = True,
    use_most_sig_if_no_sig: bool = True,
    min_variants_per_locus: int = 10,
    **kwargs,
) -> pd.DataFrame:
    """
    Identify independent loci across multiple ancestries.

    Parameters
    ----------
    sumstats_files : Union[Dict[str, str], str]
        Dictionary mapping ancestry/cohort names to munged sumstats files,
        or single file path. For a single path the ancestry name is the
        part of the file name before ".munged" (``eur.munged.txt.gz`` ->
        ``eur``), or the file stem when the name has no ".munged" part.
        Files are read as gzip-compressed, tab-separated tables.
    output_dir : str
        Output directory for results. Created if it does not exist.
    distance_threshold : int, optional
        Distance threshold in base pairs for independence, by default 500000.
    pvalue_threshold : float, optional
        P-value threshold for significance, by default 5e-8.
    merge_overlapping : bool, optional
        Whether to merge overlapping loci across ancestries, by default True.
        Only applied when more than one sumstats file is given.
    use_most_sig_if_no_sig : bool, optional
        Whether to use most significant SNP if no significant SNPs found, by default True.
    min_variants_per_locus : int, optional
        Minimum number of variants required per locus, by default 10.
    **kwargs
        Additional parameters. Currently not used by this function.

    Returns
    -------
    pd.DataFrame
        DataFrame with identified loci coordinates and lead SNPs, sorted by
        ``chr`` and ``start`` and carrying a ``locus_id`` column of the form
        ``chr<chr>_<start>_<end>``. When no locus is identified an empty
        DataFrame is returned and no output file is written.

    Examples
    --------
    >>> files = {"EUR": "eur.munged.txt.gz", "ASN": "asn.munged.txt.gz"}
    >>> loci_df = identify_independent_loci(files, "output/")
    """
    os.makedirs(output_dir, exist_ok=True)

    # Normalize input to dictionary
    if isinstance(sumstats_files, str):
        file_name = Path(sumstats_files).name
        # Path.stem only drops the last suffix, so "eur.munged.txt.gz" would
        # become "eur.txt" after removing ".munged"; cut at ".munged" instead
        # so the key matches the identifier used when the file was munged.
        ancestry_key = ""
        if ".munged" in file_name:
            ancestry_key = file_name.split(".munged", 1)[0]
        if not ancestry_key:
            ancestry_key = Path(sumstats_files).stem
        sumstats_files = {ancestry_key: sumstats_files}

    all_loci = []

    # Process each ancestry file
    for ancestry, file_path in tqdm(
        sumstats_files.items(), desc="Processing ancestries"
    ):
        logger.info(f"Processing {ancestry}: {file_path}")

        # Load sumstats
        sumstats = pd.read_csv(file_path, sep="\t", compression="gzip")

        # Identify independent SNPs for this ancestry
        ancestry_loci = _identify_independent_snps_by_distance(
            sumstats=sumstats,
            ancestry=ancestry,
            distance_threshold=distance_threshold,
            pvalue_threshold=pvalue_threshold,
            use_most_sig_if_no_sig=use_most_sig_if_no_sig,
            min_variants_per_locus=min_variants_per_locus,
        )

        all_loci.extend(ancestry_loci)

    # Convert to DataFrame
    loci_df = pd.DataFrame(all_loci)

    if len(loci_df) == 0:
        logger.warning("No loci identified")
        return loci_df

    # Merge overlapping loci across ancestries if requested
    if merge_overlapping and len(sumstats_files) > 1:
        logger.info("Merging overlapping loci across ancestries")
        loci_df = _merge_overlapping_loci(loci_df)

    # Sort by chromosome and position
    loci_df = loci_df.sort_values(["chr", "start"]).reset_index(drop=True)

    # Add locus IDs. Iterating the columns directly keeps each value's own
    # dtype; row-wise iteration can upcast integers to floats and would then
    # produce IDs such as "chr1.0_100.0_200.0".
    loci_df["locus_id"] = [
        f"chr{chrom}_{start}_{end}"
        for chrom, start, end in zip(loci_df["chr"], loci_df["start"], loci_df["end"])
    ]

    # Save results
    output_file = os.path.join(output_dir, "identified_loci.txt")
    loci_df.to_csv(output_file, sep="\t", index=False)
    logger.info(f"Identified {len(loci_df)} loci, saved to {output_file}")

    return loci_df

Prepare Helpers

Prepare LD matrices and final fine-mapping inputs.

This module provides functionality to extract LD matrices from genotype data and create final input files compatible with credtools fine-mapping pipeline.

prepare_finemap_inputs(chunk_info_df, genotype_files, output_dir, threads=1, ld_format='plink', keep_intermediate=False, **kwargs)

Prepare final fine-mapping input files from chunked sumstats and genotype data.

Parameters:

Name Type Description Default
chunk_info_df DataFrame

DataFrame from chunk_sumstats with chunked file information.

required
genotype_files Dict[str, str]

Dictionary mapping ancestry names to genotype file prefixes. Supports PLINK format (.bed/.bim/.fam) and VCF format.

required
output_dir str

Output directory for prepared files.

required
threads int

Number of threads for parallel processing, by default 1.

1
ld_format str

Format for LD computation ("plink", "vcf"), by default "plink".

'plink'
keep_intermediate bool

Whether to keep intermediate files, by default False.

False
**kwargs

Additional parameters.

{}

Returns:

Type Description
DataFrame

DataFrame with information about prepared files.

Examples:

>>> genotype_files = {"EUR": "eur_genotypes", "ASN": "asn_genotypes"}
>>> prepared_df = prepare_finemap_inputs(chunk_info_df, genotype_files, "output/")
Source code in credtools/preprocessing/prepare.py
def prepare_finemap_inputs(
    chunk_info_df: pd.DataFrame,
    genotype_files: Dict[str, str],
    output_dir: str,
    threads: int = 1,
    ld_format: str = "plink",
    keep_intermediate: bool = False,
    **kwargs,
) -> pd.DataFrame:
    """
    Prepare final fine-mapping input files from chunked sumstats and genotype data.

    Parameters
    ----------
    chunk_info_df : pd.DataFrame
        DataFrame from chunk_sumstats with chunked file information.
        Rows are grouped by the ``popu`` column; when only an ``ancestry``
        column is present (as produced by chunk_sumstats) it is used as
        ``popu``.
    genotype_files : Dict[str, str]
        Dictionary mapping ancestry names to genotype file prefixes.
        Supports PLINK format (.bed/.bim/.fam) and VCF format.
        Ancestries without an entry are skipped with a warning.
    output_dir : str
        Output directory for prepared files. Created if it does not exist.
    threads : int, optional
        Number of threads for parallel processing, by default 1.
        Values above 1 process ancestries in a ``Pool`` of that size.
    ld_format : str, optional
        Format for LD computation ("plink", "vcf"), by default "plink".
    keep_intermediate : bool, optional
        Whether to keep intermediate files, by default False.
    **kwargs
        Additional parameters, forwarded as a dictionary to the per-ancestry
        preparation helper.

    Returns
    -------
    pd.DataFrame
        DataFrame with information about prepared files. The same table is
        written tab-separated to ``<output_dir>/prepared_files.txt``.

    Raises
    ------
    KeyError
        If ``chunk_info_df`` has neither a ``popu`` nor an ``ancestry`` column.

    Examples
    --------
    >>> genotype_files = {"EUR": "eur_genotypes", "ASN": "asn_genotypes"}
    >>> prepared_df = prepare_finemap_inputs(chunk_info_df, genotype_files, "output/")
    """
    os.makedirs(output_dir, exist_ok=True)

    # chunk_sumstats labels the population column "ancestry" while the loci
    # list uses "popu"; accept both so either table can be passed in.
    if "popu" not in chunk_info_df.columns:
        if "ancestry" not in chunk_info_df.columns:
            raise KeyError(
                "chunk_info_df must contain a 'popu' or 'ancestry' column"
            )
        # assign() returns a new frame, so the caller's DataFrame is untouched
        chunk_info_df = chunk_info_df.assign(popu=chunk_info_df["ancestry"])

    # Group by ancestry for parallel processing
    ancestry_groups = chunk_info_df.groupby("popu")

    # Prepare arguments for parallel processing
    prepare_args = []
    for ancestry, group in ancestry_groups:
        if ancestry not in genotype_files:
            logger.warning(f"No genotype file specified for ancestry: {ancestry}")
            continue

        prepare_args.append(
            (
                ancestry,
                genotype_files[ancestry],
                group,
                output_dir,
                ld_format,
                keep_intermediate,
                kwargs,
            )
        )

    # Process ancestries in parallel
    if threads > 1:
        logger.info(
            f"Processing {len(prepare_args)} ancestries using {threads} threads"
        )
        with Pool(threads) as pool:
            results = list(
                tqdm(
                    pool.starmap(_prepare_ancestry_files, prepare_args),
                    total=len(prepare_args),
                    desc="Preparing ancestries",
                )
            )
    else:
        logger.info(f"Processing {len(prepare_args)} ancestries sequentially")
        results = [
            _prepare_ancestry_files(*args)
            for args in tqdm(prepare_args, desc="Preparing ancestries")
        ]

    # Combine results
    all_prepared_files = []
    for result in results:
        all_prepared_files.extend(result)

    prepared_df = pd.DataFrame(all_prepared_files)

    # Save results
    output_file = os.path.join(output_dir, "prepared_files.txt")
    prepared_df.to_csv(output_file, sep="\t", index=False)

    logger.info(f"Prepared {len(all_prepared_files)} locus files")
    logger.info(f"Results saved to {output_file}")

    return prepared_df

Low-Level Munging Helpers

Core munging functions for GWAS summary statistics.

Adapted from smunger (https://github.com/Jianhua-Wang/smunger). Original author: Jianhua Wang. License: MIT.

This module provides the main data cleaning and standardization functions for processing GWAS summary statistics.

make_SNPID_unique(df, remove_duplicates=True, col_chr=ColName.CHR, col_bp=ColName.BP, col_ea=ColName.EA, col_nea=ColName.NEA, col_p=ColName.P)

Generate unique SNP identifiers and optionally remove duplicates.

Adapted from smunger.make_SNPID_unique() function.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required
remove_duplicates bool

Whether to remove duplicated SNPs, keeping the one with smallest p-value.

True
col_chr str

Column name for chromosome.

CHR
col_bp str

Column name for base pair position.

BP
col_ea str

Column name for effect allele.

EA
col_nea str

Column name for non-effect allele.

NEA
col_p str

Column name for p-value.

P

Returns:

Type Description
DataFrame

DataFrame with unique SNPID column.

Source code in credtools/preprocessing/munging/core.py
def make_SNPID_unique(
    df: pd.DataFrame,
    remove_duplicates: bool = True,
    col_chr: str = ColName.CHR,
    col_bp: str = ColName.BP,
    col_ea: str = ColName.EA,
    col_nea: str = ColName.NEA,
    col_p: str = ColName.P,
) -> pd.DataFrame:
    """
    Generate unique SNP identifiers and optionally remove duplicates.

    Adapted from smunger.make_SNPID_unique() function.

    The identifier has the form ``chr-bp-allele1-allele2`` where the two
    alleles are ordered alphabetically, so swapping effect and non-effect
    allele yields the same identifier.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame. It is not modified; a copy is returned.
    remove_duplicates : bool, optional
        Whether to remove duplicated SNPs, keeping the one with smallest p-value.
        When False, duplicates are kept and made unique by appending
        ``-1``, ``-2``, ... to every occurrence after the first.
    col_chr : str, optional
        Column name for chromosome.
    col_bp : str, optional
        Column name for base pair position.
    col_ea : str, optional
        Column name for effect allele.
    col_nea : str, optional
        Column name for non-effect allele.
    col_p : str, optional
        Column name for p-value. If the column is absent, the first
        occurrence of each duplicated SNP is kept.

    Returns
    -------
    pd.DataFrame
        DataFrame with unique SNPID column, placed as the first column.
    """
    outdf = df.copy()

    # Sort alleles alphabetically to ensure consistent SNP IDs. Iterating
    # the two columns in lockstep avoids a row-wise DataFrame.apply, which
    # builds one Series per variant and is very slow on genome-wide tables.
    sorted_pairs = [
        sorted((str(ea), str(nea)))
        for ea, nea in zip(outdf[col_ea], outdf[col_nea])
    ]
    allele1 = pd.Series(
        [pair[0] for pair in sorted_pairs], index=outdf.index, dtype=object
    )
    allele2 = pd.Series(
        [pair[1] for pair in sorted_pairs], index=outdf.index, dtype=object
    )

    # Create unique SNPID: chr-bp-allele1-allele2
    outdf[ColName.SNPID] = (
        outdf[col_chr].astype(str)
        + "-"
        + outdf[col_bp].astype(str)
        + "-"
        + allele1
        + "-"
        + allele2
    )

    # Move SNPID to first column
    other_cols = [col for col in outdf.columns if col != ColName.SNPID]
    outdf = outdf[[ColName.SNPID] + other_cols]

    # Handle duplicates
    n_duplicated = outdf.duplicated(subset=[ColName.SNPID]).sum()

    if remove_duplicates and n_duplicated > 0:
        logger.info(f"Removing {n_duplicated} duplicate SNPs")
        if col_p in outdf.columns:
            # Sort by p-value to keep most significant; a stable sort makes
            # the choice among equal p-values deterministic (input order)
            outdf = outdf.sort_values(col_p, kind="mergesort")
        outdf = outdf.drop_duplicates(subset=[ColName.SNPID], keep="first")
        outdf = outdf.sort_values([col_chr, col_bp])
        outdf.reset_index(drop=True, inplace=True)
    elif n_duplicated > 0:
        logger.warning(f"Found {n_duplicated} duplicate SNPs, keeping all")
        # Add suffix to make unique: nothing for the first occurrence,
        # "-1", "-2", ... for the following ones
        occurrence = outdf.groupby(ColName.SNPID).cumcount()
        dup_suffix = ("-" + occurrence.astype(str)).where(occurrence > 0, "")
        outdf[ColName.SNPID] = outdf[ColName.SNPID] + dup_suffix

    logger.debug(f"Created unique SNPIDs: {len(outdf)} variants")

    return outdf

munge(df)

Clean and standardize GWAS summary statistics.

Adapted from smunger.munge() function.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame with GWAS summary statistics.

required

Returns:

Type Description
DataFrame

Cleaned and standardized DataFrame.

Notes

This function performs comprehensive data cleaning, including: (1) removing columns with all NA values; (2) cleaning and validating core columns (CHR, BP, alleles); (3) creating unique SNP identifiers; (4) processing p-values, effect sizes, and other statistics; (5) sorting by chromosome and position.

Source code in credtools/preprocessing/munging/core.py
def munge(df: pd.DataFrame) -> pd.DataFrame:
    """
    Run the full cleaning pipeline over GWAS summary statistics.

    Adapted from smunger.munge() function.

    Parameters
    ----------
    df : pd.DataFrame
        Summary statistics to clean. The frame is copied before any
        processing, so the caller's object is not modified.

    Returns
    -------
    pd.DataFrame
        Cleaned frame, sorted by chromosome and position with a fresh
        0..n-1 index.

    Notes
    -----
    Steps, in order:

    1. Check that the mandatory columns are present
       (``check_mandatory_cols``).
    2. Drop columns that are entirely NA.
    3. Clean chromosome, position and allele columns.
    4. Build unique SNP identifiers.
    5. Clean p-value, beta and standard-error columns, plus the effect
       allele frequency column when it exists.
    6. Sort by chromosome/position and finalize column order and types.
    """
    logger.info("Starting munging process")
    original_rows = len(df)

    # Fail fast before any copying if required columns are absent.
    check_mandatory_cols(df)

    # All subsequent steps operate on a private copy of the input.
    cleaned = _remove_all_na_columns(df.copy())

    # Order matters: SNP identifiers are built from the cleaned core
    # columns, and the statistical columns are processed afterwards.
    pipeline = (
        _munge_chr,
        _munge_bp,
        _munge_alleles,
        make_SNPID_unique,
        _munge_pvalue,
        _munge_beta,
        _munge_se,
    )
    for step in pipeline:
        cleaned = step(cleaned)

    # Frequency column is optional.
    if ColName.EAF in cleaned.columns:
        cleaned = _munge_eaf(cleaned)

    cleaned = cleaned.sort_values([ColName.CHR, ColName.BP]).reset_index(drop=True)

    # Ensure correct column order and types
    cleaned = _finalize_columns(cleaned)

    final_rows = len(cleaned)
    logger.info(
        f"Munging complete: {original_rows} -> {final_rows} rows ({original_rows - final_rows} removed)"
    )

    return cleaned

transform_allele(series)

Transform allele values to standard format.

Source code in credtools/preprocessing/munging/core.py
def transform_allele(series: pd.Series) -> pd.Series:
    """
    Upper-case allele strings and blank out anything that is not pure DNA.

    Parameters
    ----------
    series : pd.Series
        Raw allele values; each value is converted to ``str`` first.

    Returns
    -------
    pd.Series
        Upper-cased alleles, with NaN wherever the value does not consist
        solely of the letters A, C, G and T.
    """
    upper = series.astype(str).str.upper()

    # Anything other than a run of A/C/G/T (including stringified
    # missing values) is treated as invalid.
    is_dna = upper.str.match(r"^[ACGT]+$", na=False)

    return upper.where(is_dna, np.nan)

transform_chr(series)

Transform chromosome values to standard format.

Source code in credtools/preprocessing/munging/core.py
def transform_chr(series: pd.Series) -> pd.Series:
    """
    Convert chromosome labels to numbers.

    Parameters
    ----------
    series : pd.Series
        Raw chromosome labels (e.g. ``"chr1"``, ``"X"``, ``22``).

    Returns
    -------
    pd.Series
        Numeric chromosome codes. ``X``/``x`` becomes 23; any label that
        is still non-numeric after stripping ``chr`` becomes NaN.
    """
    # Work on strings so mixed int/str input is handled uniformly, and
    # strip every case-insensitive occurrence of "chr".
    labels = series.astype(str).str.replace("chr", "", case=False)

    # The X chromosome is encoded as 23.
    labels = labels.replace({"X": "23", "x": "23"})

    # Anything left that is not a number is coerced to NaN.
    return pd.to_numeric(labels, errors="coerce")

Header detection and mapping utilities for GWAS summary statistics.

Adapted from smunger (https://github.com/Jianhua-Wang/smunger). Original author: Jianhua Wang. License: MIT.

This module provides functionality to detect, map, and standardize column headers from various GWAS summary statistics file formats.

apply_header_mapping(df, mapping)

Apply header mapping to DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required
mapping Dict[str, str]

Mapping from current column names to new names.

required

Returns:

Type Description
DataFrame

DataFrame with renamed columns.

Source code in credtools/preprocessing/munging/headers.py
def apply_header_mapping(df: pd.DataFrame, mapping: Dict[str, str]) -> pd.DataFrame:
    """
    Rename DataFrame columns according to a header mapping.

    Parameters
    ----------
    df : pd.DataFrame
        Frame whose columns should be renamed; it is copied, not modified.
    mapping : Dict[str, str]
        Current column name -> new column name. Keys that are not columns
        of ``df`` are ignored.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` with the applicable renames applied.
    """
    renamed = df.copy()

    # Restrict the mapping to columns that are actually present.
    existing_mapping = {
        old: new for old, new in mapping.items() if old in renamed.columns
    }

    if not existing_mapping:
        return renamed

    renamed = renamed.rename(columns=existing_mapping)
    logger.info(f"Applied mapping to {len(existing_mapping)} columns")
    return renamed

create_config_template(headers, interactive=False)

Create configuration template for column mapping.

Parameters:

Name Type Description Default
headers List[str]

List of column headers.

required
interactive bool

Whether to use interactive mode for mapping.

False

Returns:

Type Description
Dict[str, Any]

Configuration dictionary with column mappings.

Source code in credtools/preprocessing/munging/headers.py
def create_config_template(
    headers: List[str], interactive: bool = False
) -> Dict[str, Any]:
    """
    Build a column-mapping configuration for the given headers.

    Parameters
    ----------
    headers : List[str]
        Column headers found in the input file.
    interactive : bool, optional
        When True the interactive builder is used; otherwise the
        automatic one. Defaults to False.

    Returns
    -------
    Dict[str, Any]
        Configuration dictionary produced by the selected builder.
    """
    # Pick the builder first, then delegate with the same argument.
    builder = _create_interactive_config if interactive else _create_automatic_config
    return builder(headers)

inspect_headers(file_path, sep=None, nrows=5)

Inspect file headers and return column names.

Parameters:

Name Type Description Default
file_path str

Path to the input file.

required
sep str

Column separator. If None, will try to auto-detect.

None
nrows int

Number of rows to read for inspection.

5

Returns:

Type Description
List[str]

List of column headers.

Source code in credtools/preprocessing/munging/headers.py
def inspect_headers(
    file_path: str, sep: Optional[str] = None, nrows: int = 5
) -> List[str]:
    """
    Read the column names of a delimited text file.

    Parameters
    ----------
    file_path : str
        File to inspect.
    sep : str, optional
        Column separator. When None, it is obtained from
        ``_detect_separator(file_path)``.
    nrows : int, optional
        How many data rows to read along with the header. Defaults to 5.

    Returns
    -------
    List[str]
        Column names in file order.

    Raises
    ------
    Exception
        Any error raised while reading the file is logged and re-raised
        unchanged.
    """
    delimiter = _detect_separator(file_path) if sep is None else sep

    try:
        # Only a handful of rows is parsed; the header is all we need.
        headers = pd.read_csv(file_path, sep=delimiter, nrows=nrows).columns.tolist()

        logger.info(f"Detected {len(headers)} columns in {file_path}")
        logger.debug(f"Headers: {headers}")
    except Exception as e:
        # Log for context, then let the caller see the original error.
        logger.error(f"Failed to inspect headers in {file_path}: {str(e)}")
        raise

    return headers

map_headers_automatic(headers)

Automatically map headers to standard column names.

Parameters:

Name Type Description Default
headers List[str]

List of column headers from input file.

required

Returns:

Type Description
Dict[str, str]

Mapping from original headers to standard column names.

Source code in credtools/preprocessing/munging/headers.py
def map_headers_automatic(headers: List[str]) -> Dict[str, str]:
    """
    Map input headers to standard column names without user input.

    Parameters
    ----------
    headers : List[str]
        Column headers from the input file.

    Returns
    -------
    Dict[str, str]
        Original header -> mapped name. A header found in
        ``COMMON_COLNAMES`` uses that entry; otherwise the fuzzy matcher
        is consulted, and a header with no match maps to itself.
    """
    mapping = {
        header: (
            COMMON_COLNAMES[header]
            if header in COMMON_COLNAMES
            # Fall back to fuzzy matching; keep the header when that
            # yields nothing.
            else _fuzzy_match_header(header) or header
        )
        for header in headers
    }

    logger.info(
        f"Automatically mapped {sum(1 for v in mapping.values() if v in ColName.sumstat_cols)} columns"
    )

    return mapping

suggest_missing_mappings(headers, mapped_headers)

Suggest mappings for unmapped headers.

Parameters:

Name Type Description Default
headers List[str]

Original headers.

required
mapped_headers Dict[str, str]

Already mapped headers.

required

Returns:

Type Description
Dict[str, str]

Suggestions for unmapped headers.

Source code in credtools/preprocessing/munging/headers.py
def suggest_missing_mappings(
    headers: List[str], mapped_headers: Dict[str, str]
) -> Dict[str, str]:
    """
    Suggest mappings for unmapped headers.

    Only mandatory standard columns that no already-mapped header points
    to are considered. For each such column, the first unmapped header
    (in ``headers`` order) whose fuzzy match equals it is suggested.

    Parameters
    ----------
    headers : List[str]
        Original headers.
    mapped_headers : Dict[str, str]
        Already mapped headers (original header -> standard name).

    Returns
    -------
    Dict[str, str]
        Suggestions for unmapped headers (original header -> mandatory
        standard name), ordered by position in ``headers``.
    """
    # Mandatory standards that nothing maps to yet.
    missing_standards = set(ColName.mandatory_cols) - set(mapped_headers.values())

    suggestions: Dict[str, str] = {}

    # Single ordered pass: each header is fuzzy-matched at most once, and
    # the output order follows the input headers rather than set order.
    for header in headers:
        if not missing_standards:
            break  # every missing standard already has a suggestion
        if header in mapped_headers or header in suggestions:
            continue

        candidate = _fuzzy_match_header(header)
        if candidate in missing_standards:
            suggestions[header] = candidate
            # One suggestion per standard: the first matching header wins.
            missing_standards.discard(candidate)

    logger.info(f"Generated {len(suggestions)} additional mapping suggestions")

    return suggestions

validate_required_columns(df, required=None)

Validate that required columns are present.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame to validate.

required
required List[str]

List of required column names. Uses mandatory columns if None.

None

Returns:

Type Description
bool

True if all required columns are present.

Source code in credtools/preprocessing/munging/headers.py
def validate_required_columns(
    df: pd.DataFrame, required: Optional[List[str]] = None
) -> bool:
    """
    Report whether a DataFrame has every required column.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to check.
    required : List[str], optional
        Column names that must be present. When None,
        ``ColName.mandatory_cols`` is used.

    Returns
    -------
    bool
        True when nothing is missing; False otherwise (the missing names
        are logged at error level).
    """
    expected = ColName.mandatory_cols if required is None else required

    missing = set(expected) - set(df.columns)

    if not missing:
        logger.info("All required columns are present")
        return True

    logger.error(f"Missing required columns: {missing}")
    return False

Validation functions for GWAS summary statistics columns.

Adapted from smunger (https://github.com/Jianhua-Wang/smunger). Original author: Jianhua Wang. License: MIT.

This module provides validation and cleaning functions for individual columns in GWAS summary statistics data.

check_mandatory_cols(df)

Check if DataFrame contains all mandatory columns.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame to validate.

required

Raises:

Type Description
ValueError

If any mandatory columns are missing.

Source code in credtools/preprocessing/munging/validation.py
def check_mandatory_cols(df: pd.DataFrame) -> None:
    """
    Ensure every mandatory column is present in the DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        Frame to check against ``ColName.mandatory_cols``.

    Raises
    ------
    ValueError
        When one or more mandatory columns are absent; the message lists
        the missing names.
    """
    missing_cols = set(ColName.mandatory_cols).difference(df.columns)
    if not missing_cols:
        return
    raise ValueError(f"Missing mandatory columns: {missing_cols}")

validate_allele_consistency(df)

Validate that alleles are consistent and biallelic.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame with EA and NEA columns.

required

Returns:

Type Description
DataFrame

DataFrame with consistent alleles.

Source code in credtools/preprocessing/munging/validation.py
def validate_allele_consistency(df: pd.DataFrame) -> pd.DataFrame:
    """
    Drop rows whose effect / non-effect alleles are unusable.

    Parameters
    ----------
    df : pd.DataFrame
        Frame expected to carry the EA and NEA columns.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` without rows where EA equals NEA or where either
        allele is missing. When either column is absent, an unfiltered
        copy is returned.
    """
    outdf = df.copy()

    if not (ColName.EA in outdf.columns and ColName.NEA in outdf.columns):
        return outdf

    original_count = len(outdf)

    effect = outdf[ColName.EA]
    other = outdf[ColName.NEA]

    # A row survives only if the two alleles differ and both are present.
    keep = (effect != other) & effect.notna() & other.notna()
    outdf = outdf[keep]

    final_count = len(outdf)
    if original_count > final_count:
        logger.debug(
            f"Removed {original_count - final_count} rows with invalid alleles"
        )

    return outdf

validate_and_clean_column(df, col_name, col_type, min_val=None, max_val=None, allow_na=True, exclude_min=False, exclude_max=False, transform_func=None)

Validate and clean a single column in the DataFrame.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame.

required
col_name str

Name of column to validate.

required
col_type type

Target data type for the column.

required
min_val float

Minimum allowed value.

None
max_val float

Maximum allowed value.

None
allow_na bool

Whether NA values are allowed.

True
exclude_min bool

Whether to exclude the minimum value itself.

False
exclude_max bool

Whether to exclude the maximum value itself.

False
transform_func callable

Function to transform values before validation.

None

Returns:

Type Description
DataFrame

DataFrame with cleaned column.

Source code in credtools/preprocessing/munging/validation.py
def validate_and_clean_column(
    df: pd.DataFrame,
    col_name: str,
    col_type: Any,
    min_val: Optional[float] = None,
    max_val: Optional[float] = None,
    allow_na: bool = True,
    exclude_min: bool = False,
    exclude_max: bool = False,
    transform_func: Optional[Callable] = None,
) -> pd.DataFrame:
    """
    Clean one column and drop the rows that fail its validation rules.

    Parameters
    ----------
    df : pd.DataFrame
        Frame containing the column.
    col_name : str
        Column to clean.
    col_type : type
        Type the column is cast to at the end. NumPy integer and float
        scalar types additionally trigger numeric coercion.
    min_val : float, optional
        Lower bound; rows below it are dropped.
    max_val : float, optional
        Upper bound; rows above it are dropped.
    allow_na : bool, optional
        When False, rows with a missing value are dropped and a missing
        column is an error.
    exclude_min : bool, optional
        Treat ``min_val`` itself as out of range.
    exclude_max : bool, optional
        Treat ``max_val`` itself as out of range.
    transform_func : callable, optional
        Applied to the column before any validation.

    Returns
    -------
    pd.DataFrame
        ``df`` itself when the column is absent and NA is allowed;
        otherwise a cleaned copy.

    Raises
    ------
    ValueError
        When the column is absent and ``allow_na`` is False.
    """
    if col_name not in df.columns:
        if allow_na:
            return df
        raise ValueError(f"Required column '{col_name}' not found")

    outdf = df.copy()
    original_count = len(outdf)
    drop_missing = not allow_na

    if transform_func is not None:
        outdf[col_name] = transform_func(outdf[col_name])

    if drop_missing:
        outdf = outdf[outdf[col_name].notna()]

    # Only NumPy numeric scalar types go through numeric coercion.
    numeric_types = (
        np.int8,
        np.int16,
        np.int32,
        np.int64,
        np.float16,
        np.float32,
        np.float64,
    )
    if col_type in numeric_types:
        outdf[col_name] = pd.to_numeric(outdf[col_name], errors="coerce")

        # Values that could not be parsed are NaN now; drop them if required.
        if drop_missing:
            outdf = outdf[outdf[col_name].notna()]

    # Range checks never remove missing values; that is governed solely
    # by ``allow_na`` above.
    if min_val is not None:
        values = outdf[col_name]
        above = values > min_val if exclude_min else values >= min_val
        outdf = outdf[above | values.isna()]

    if max_val is not None:
        values = outdf[col_name]
        below = values < max_val if exclude_max else values <= max_val
        outdf = outdf[below | values.isna()]

    # Final cast to the requested type.
    if col_type == str:
        outdf[col_name] = outdf[col_name].astype(str)
        # Stringified missing values are turned back into real NaN.
        outdf.loc[outdf[col_name] == "nan", col_name] = np.nan
    else:
        outdf[col_name] = outdf[col_name].astype(col_type)

    final_count = len(outdf)
    if original_count > final_count:
        logger.debug(
            f"Column {col_name}: removed {original_count - final_count} invalid rows"
        )

    return outdf

validate_frequency_consistency(df)

Validate frequency column consistency.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame with frequency columns.

required

Returns:

Type Description
DataFrame

DataFrame with consistent frequency data.

Source code in credtools/preprocessing/munging/validation.py
def validate_frequency_consistency(df: pd.DataFrame) -> pd.DataFrame:
    """
    Make the MAF column minor and consistent with EAF.

    Parameters
    ----------
    df : pd.DataFrame
        Frame that may carry MAF and/or EAF columns.

    Returns
    -------
    pd.DataFrame
        Copy of ``df`` in which MAF values above 0.5 are folded to
        ``1 - MAF`` and, when EAF is also present, MAF values differing
        from ``min(EAF, 1 - EAF)`` by more than 0.01 are replaced by it.
    """
    outdf = df.copy()

    has_maf = ColName.MAF in outdf.columns
    if has_maf:
        # A "minor" frequency above 0.5 refers to the other allele.
        over_half = outdf[ColName.MAF] > 0.5
        outdf.loc[over_half, ColName.MAF] = 1 - outdf.loc[over_half, ColName.MAF]

    # The cross-check needs both columns.
    if not (ColName.EAF in outdf.columns and has_maf):
        return outdf

    def _minor(freq):
        # MAF implied by an effect-allele frequency; missing stays missing.
        return min(freq, 1 - freq) if pd.notna(freq) else np.nan

    expected_maf = outdf[ColName.EAF].apply(_minor)

    # 1% absolute tolerance between reported and implied MAF.
    large_discrepancy = (outdf[ColName.MAF] - expected_maf).abs() > 0.01

    if not large_discrepancy.any():
        return outdf

    n_discrepant = large_discrepancy.sum()
    logger.warning(f"Found {n_discrepant} rows with EAF/MAF discrepancies")
    # Trust the value derived from EAF for the discrepant rows.
    outdf.loc[large_discrepancy, ColName.MAF] = expected_maf.loc[large_discrepancy]

    return outdf

validate_pvalue_consistency(df)

Validate p-value consistency with other statistics.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame with p-value and other statistical columns.

required

Returns:

Type Description
DataFrame

DataFrame with consistent p-values.

Source code in credtools/preprocessing/munging/validation.py
def validate_pvalue_consistency(df: pd.DataFrame) -> pd.DataFrame:
    """
    Validate p-value consistency with other statistics.

    When BETA, SE and P are all present, the two-sided p-value implied by
    ``Z = BETA / SE`` is compared with the reported P, and the number of
    rows differing by more than one order of magnitude is logged as a
    warning. No rows or values are changed.

    Parameters
    ----------
    df : pd.DataFrame
        Input DataFrame with p-value and other statistical columns.

    Returns
    -------
    pd.DataFrame
        Copy of ``df``; the check is diagnostic only.
    """
    outdf = df.copy()

    required = (ColName.BETA, ColName.SE, ColName.P)
    if not all(col in outdf.columns for col in required):
        return outdf

    # Only the import can legitimately raise ImportError, so keep the
    # try block that narrow.
    try:
        from scipy.stats import norm
    except ImportError:
        logger.warning("scipy not available, skipping p-value consistency check")
        return outdf

    # Smallest p-value considered; keeps log10 finite for values that
    # underflowed to 0 on either side of the comparison.
    p_floor = 1e-300

    z_score = outdf[ColName.BETA] / outdf[ColName.SE]

    # Two-tailed p-value via the survival function. ``1 - cdf`` cancels to
    # exactly 0 for |Z| above roughly 8, which would flag every strongly
    # significant variant as discrepant.
    expected_p = np.maximum(2 * norm.sf(np.abs(z_score)), p_floor)
    observed_p = np.maximum(outdf[ColName.P], p_floor)

    # Check for large discrepancies (order of magnitude)
    log_ratio = np.log10(observed_p) - np.log10(expected_p)
    large_discrepancy = np.abs(log_ratio) > 1  # 10-fold difference

    if large_discrepancy.any():
        n_discrepant = large_discrepancy.sum()
        logger.warning(f"Found {n_discrepant} rows with P-value discrepancies")

    return outdf

validate_statistical_consistency(df)

Validate statistical consistency (e.g., beta and SE relationship).

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame with statistical columns.

required

Returns:

Type Description
DataFrame

DataFrame with statistically consistent data.

Source code in credtools/preprocessing/munging/validation.py
def validate_statistical_consistency(df: pd.DataFrame) -> pd.DataFrame:
    """
    Drop rows whose standard error cannot be valid.

    Parameters
    ----------
    df : pd.DataFrame
        Frame that may carry BETA and SE columns.

    Returns
    -------
    pd.DataFrame
        When both BETA and SE are present, the rows of a copy of ``df``
        whose SE is positive or missing; otherwise an unfiltered copy.
    """
    outdf = df.copy()

    if ColName.BETA not in outdf.columns or ColName.SE not in outdf.columns:
        return outdf

    std_err = outdf[ColName.SE]

    # Missing SE is tolerated here; zero or negative SE is not.
    mask = std_err.isna() | (std_err > 0)

    removed = len(outdf) - mask.sum()
    if removed > 0:
        logger.debug(f"Removed {removed} rows with invalid SE values")

    return outdf[mask]