Source code for cas.file_utils

import csv
import json
import pathlib
from importlib import resources
from typing import Optional

import anndata
from cap_anndata import CapAnnDataDF
from cas_schema import schemas
from ruamel.yaml import YAML

from cas.model import CellTypeAnnotation


def read_json_file(file_path):
    """
    Reads and parses a JSON file into a Python dictionary.

    Args:
        file_path (str): The path to the JSON file.

    Returns:
        dict: The JSON data as a Python dictionary. Returns None if the file
            does not exist or if there is an issue parsing the JSON content.

    Example:
        json_data = read_json_file('path/to/your/file.json')
        if json_data is not None:
            # Use the parsed JSON data as a dictionary
            print(json_data)
    """
    try:
        with open(file_path, "r") as json_stream:
            return json.load(json_stream)
    except (FileNotFoundError, json.JSONDecodeError) as e:
        # Best-effort read: report the problem and signal failure with None.
        print(f"Error reading JSON file: {e}")
        return None
def read_cas_json_file(file_path: str) -> CellTypeAnnotation:
    """
    Reads and parses a JSON file into a CAS object.

    Args:
        file_path (str): The path to the JSON file.

    Returns:
        dict: The JSON data as a CAS object.
    """
    parsed_json = read_json_file(file_path)
    return CellTypeAnnotation.from_dict(parsed_json)
def read_cas_from_anndata(anndata_path: str) -> CellTypeAnnotation:
    """
    Reads the CAS json from the anndata uns and parses into a CAS object.

    Args:
        anndata_path: The path to the Anndata file.

    Returns:
        CellTypeAnnotation object.

    Raises:
        Exception: If the AnnData file could not be read or its `uns` does not
            contain a 'cas' entry.
    """
    input_anndata = read_anndata_file(anndata_path)
    # read_anndata_file returns None on failure; compare explicitly against
    # None instead of relying on AnnData truthiness.
    if input_anndata is not None and "cas" in input_anndata.uns:
        return CellTypeAnnotation.from_dict(json.loads(input_anndata.uns["cas"]))
    # Fixed typo in the user-facing message: "it's" -> "its".
    raise Exception("Given Anndata file doesn't have a 'cas' object in its uns.")
[docs] def write_json_file( cas: CellTypeAnnotation, out_file: str, print_undefined: bool = False ): """ Writes cell type annotation object to a json file. :param cas: cell type annotation object to serialize. :param out_file: output file path. :param print_undefined: prints null values to the output json if true. Omits undefined values from the json output if """ cas.set_exclude_none_values(not print_undefined) output_data = cas.to_json(indent=2) with open(out_file, "w") as out_file: out_file.write(output_data)
def write_dict_to_json_file(output_file_path: str, dictionary: dict):
    """Serialize `dictionary` to `output_file_path` as indented JSON.

    :param output_file_path: path of the json file to write.
    :param dictionary: data to serialize.
    """
    with open(output_file_path, "w") as output_stream:
        json.dump(dictionary, output_stream, indent=2)
def read_anndata_file(file_path: str) -> Optional[anndata.AnnData]:
    """Load anndata object from a file.

    Args:
        file_path: The path to the file containing the anndata object.

    Returns:
        The loaded anndata object if successful, else None.
    """
    try:
        # Backed mode ("r") keeps the matrix on disk instead of in memory.
        return anndata.read_h5ad(file_path, backed="r")
    except Exception as e:
        print(f"An error occurred while loading the file: {e}")
        return None
def read_table_to_dict(table_path, id_column=0, generated_ids=False):
    """
    Reads table file content into a dict. Key is the id column value and the
    value is dict representation of the row values.

    Args:
        table_path: Path of the table file (.tsv or .csv)
        id_column: Id column becomes the key of the dict. This column should be
            unique. Default value is first column.
        generated_ids: If 'True', uses row number as the key of the dict.
            Initial key is 1 (row 0 holds the headers). [Fixed: previous doc
            said 0, contradicting read_csv_to_dict's behavior.]

    Returns:
        Function provides two return values: first; headers of the table and
        second; the table content dict. Key of the content is the id column
        value and the values are dict of row values.

    Raises:
        Exception: If the table file is neither a tsv nor a csv file.
    """
    if table_path.endswith(".tsv"):
        return read_tsv_to_dict(
            table_path, id_column=id_column, generated_ids=generated_ids
        )
    if table_path.endswith(".csv"):
        return read_csv_to_dict(
            table_path, id_column=id_column, generated_ids=generated_ids
        )
    raise Exception("Table file should be either tsv or csv file.")
def read_tsv_to_dict(tsv_path, id_column=0, generated_ids=False):
    """
    Reads tsv file content into a dict. Key is the first column value and the
    value is dict representation of the row values (each header is a key and
    column value is the value).

    Args:
        tsv_path: Path of the TSV file
        id_column: Id column becomes the key of the dict. This column should be
            unique. Default value is first column.
        generated_ids: If 'True', uses row number as the key of the dict.
            Initial key is 1 (row 0 holds the headers). [Fixed: previous doc
            said 0, contradicting read_csv_to_dict's behavior.]

    Returns:
        Function provides two return values: first; headers of the table and
        second; the TSV content dict. Key of the content is the first column
        value and the values are dict of row values.
    """
    # Delegate to the CSV reader with a tab delimiter.
    return read_csv_to_dict(
        tsv_path, id_column=id_column, delimiter="\t", generated_ids=generated_ids
    )
def read_csv_to_dict(
    csv_path,
    id_column=0,
    id_column_name="",
    delimiter=",",
    id_to_lower=False,
    generated_ids=False,
):
    """
    Reads csv file content into a dict. Key is the id column value and the
    value is dict representation of the row values (each header is a key and
    column value is the value). [Fixed: doc previously said "tsv".]

    Args:
        csv_path: Path of the CSV file
        id_column: Id column becomes the keys of the dict. This column should
            be unique. Default is the first column.
        id_column_name: Alternative to the numeric id_column, id_column_name
            specifies id_column by its header string.
        delimiter: Value delimiter. Default is comma.
        id_to_lower: applies string lowercase operation to the key
        generated_ids: If 'True', uses row number as the key of the dict.
            Initial key is 1 (row 0 holds the headers).

    Returns:
        Function provides two return values: first; headers of the table and
        second; the CSV content dict. Key of the content is the id column
        value and the values are dict of row values.
    """
    records = dict()
    headers = []
    # newline="" is required by the csv module so that quoted fields
    # containing embedded newlines are parsed correctly.
    with open(csv_path, newline="") as fd:
        reader = csv.reader(fd, delimiter=delimiter, quotechar='"')
        for row_count, row in enumerate(reader):
            _id = str(row[id_column]).strip()
            if id_to_lower:
                _id = _id.lower()
            if generated_ids:
                _id = row_count
            if row_count == 0:
                # Header row: remember the column names and, if requested,
                # resolve the id column by its header name.
                headers = [str(header).strip() for header in row]
                if id_column_name and id_column_name in headers:
                    id_column = headers.index(id_column_name)
            else:
                records[_id] = {
                    headers[column_num]: column_value
                    for column_num, column_value in enumerate(row)
                }
    return headers, records
def read_json_config(file_path: str) -> dict:
    """
    Reads the configuration object from the given path.

    :param file_path: path to the json file
    :return: configuration object (List of data column config items)
    """
    with open(file_path, "r") as config_stream:
        try:
            return json.load(config_stream)
        except Exception as error:
            # Wrap any parse/read failure with the offending path.
            raise Exception("JSON read failed:" + file_path + " " + str(error))
def read_yaml_config(file_path: str) -> dict:
    """
    Reads the configuration object from the given path.

    :param file_path: path to the yaml file
    :return: configuration object (List of data column config items)
    """
    with open(file_path, "r") as config_stream:
        try:
            # Safe loader: never executes arbitrary YAML tags.
            yaml_reader = YAML(typ="safe")
            return yaml_reader.load(config_stream)
        except Exception as error:
            raise Exception("Yaml read failed:" + file_path + " " + str(error))
def read_config(file_path: str) -> dict:
    """
    Reads the configuration object from the given path.

    :param file_path: path to the configuration file
    :return: configuration object (List of data column config items)
    """
    suffix = pathlib.Path(file_path).suffix
    # Dispatch on the file extension; only json and yaml are supported.
    if suffix == ".json":
        return read_json_config(file_path)
    if suffix in (".yaml", ".yml"):
        return read_yaml_config(file_path)
    raise Exception(
        "Given configuration file extension is not supported. "
        "Try a json or yaml file instead of :" + file_path
    )
[docs] def update_obs(obs: CapAnnDataDF, data: dict): """ Updates the obs with data dict. Args: obs: Dataset representing the obs field in the AnnData file. data: Dictionary containing flattened data. """ for key, value in data.items(): if key in obs: obs.remove_column(key) obs[key] = value.values
[docs] def update_uns(uns: CapAnnDataDF, data: dict): """ Updates the uns with data dict. Args: uns: The HDF5 group to write data to. data: Dictionary containing the data to be written. """ for key, value in data.items(): if isinstance(value, list) and all(isinstance(item, str) for item in value): uns[key] = ", ".join(sorted(value)) else: uns[key] = value
def get_cas_schema_names() -> dict:
    """
    Returns the list of available CAS schema names.

    Returns:
        dict: The available CAS schema names (short name -> schema file name).
    """
    schema_files = {
        "base": "general_schema.json",
        "cap": "CAP_schema.json",
        "bican": "BICAN_schema.json",
    }
    return schema_files
def get_cas_schema(schema_name: Optional[str] = "base") -> dict:
    """
    Reads the schema file from the CAS module and returns as a dictionary.

    Args:
        schema_name: The name of the schema to be returned. Default is 'base'.

    Returns:
        dict: The schema as a dictionary.

    Raises:
        ValueError: If the schema name is not one of the supported schemas.
    """
    if not schema_name:
        # None or empty string falls back to the default schema.
        schema_name = "base"
    schema_name = schema_name.strip().lower()
    # Look the table up once instead of calling get_cas_schema_names() thrice.
    schema_names = get_cas_schema_names()
    if schema_name not in schema_names:
        raise ValueError(
            "Schema name should be one of: " + ", ".join(schema_names.keys())
        )
    schema_file = resources.files(schemas) / schema_names[schema_name]
    with schema_file.open("rt") as f:
        # json.load reads from the stream directly (was json.loads(f.read())).
        return json.load(f)