# Source code for svgbit.core.io

from __future__ import annotations

import gzip
from pathlib import Path

import anndata
import h5py
import numpy as np
import pandas as pd
import scipy.io
from .STDataset import STDataset


def load_10X(read_path, make_sparse=True) -> STDataset:
    """
    Load 10X Genomics Space Ranger outputs and generate STDataset.

    Both position file layouts are supported: the headerless
    ``spatial/tissue_positions_list.csv`` is tried first and, when it does
    not exist, ``spatial/tissue_positions.csv`` (with a header row) is read
    instead. In both cases ``X`` is taken from the column coordinate and
    ``Y`` from the row coordinate.

    Parameters
    ==========
    read_path : str or pathlib.Path
        A location points to 10X outs dir. Assume directories
        ``filtered_feature_bc_matrix`` and ``spatial`` are in this path.

    make_sparse : bool, default True
        Whether to use sparse DataFrame in order to save memory.

    Returns
    =======
    dataset : STDataset
        A STDataset instance generated from read_path. Its
        ``_array_coordinate`` attribute is set to the array (grid)
        coordinates of the spots.

    Raises
    ======
    FileNotFoundError
        If the matrix files or both position files are missing.
    """
    read_path = Path(read_path)
    mat_dir = read_path / "filtered_feature_bc_matrix"
    spatial_dir = read_path / "spatial"

    # The second tab-separated field of features.tsv is used as gene name.
    with gzip.open(mat_dir / "features.tsv.gz", "rt") as f:
        gene_name = [
            line.strip().split("\t")[1] for line in f if line.strip()
        ]
    with gzip.open(mat_dir / "barcodes.tsv.gz", "rt") as f:
        spot_name = [line.strip() for line in f if line.strip()]

    # matrix.mtx is stored genes x spots. Transpose the SciPy matrix before
    # building the frame; this is much cheaper than transposing a sparse
    # DataFrame afterwards.
    expression = scipy.io.mmread(mat_dir / "matrix.mtx.gz").T
    count_df = pd.DataFrame.sparse.from_spmatrix(
        expression,
        index=spot_name,
        columns=gene_name,
    )

    try:
        # Headerless layout: after index_col=0 the remaining columns are
        # labelled 1..5 by position.
        positions = pd.read_csv(
            spatial_dir / "tissue_positions_list.csv",
            index_col=0,
            header=None,
        )
    except FileNotFoundError:
        positions = pd.read_csv(
            spatial_dir / "tissue_positions.csv",
            index_col=0,
            header=0,
        )
        # Column-then-row order, matching the [3, 2] / [5, 4] selection used
        # for the headerless layout so X/Y mean the same thing in both.
        array_columns = ["array_col", "array_row"]
        pixel_columns = ["pxl_col_in_fullres", "pxl_row_in_fullres"]
    else:
        array_columns = [3, 2]
        pixel_columns = [5, 4]

    # Align both coordinate tables with the spots of the count matrix.
    array_coor = positions[array_columns].reindex(index=count_df.index)
    array_coor.columns = ["X", "Y"]
    coor_df = positions[pixel_columns].reindex(index=count_df.index)
    coor_df.columns = ["X", "Y"]

    if not make_sparse:
        count_df = count_df.sparse.to_dense()

    dataset = STDataset(count_df, coor_df, make_sparse=make_sparse)
    dataset._array_coordinate = array_coor
    return dataset
def load_anndata_h5(read_path, **kwargs) -> STDataset:
    """
    Load anndata saved h5ad file and generate STDataset.

    .. note::
        ``load_anndata_h5`` will try to use anndata.X as expression matrix.
        Spot coordinates are read from ``anndata.obsm["spatial"]``, which
        must provide exactly two columns (used as ``X`` and ``Y``).

    Parameters
    ==========
    read_path : str or pathlib.Path
        File name to read from.

    **kwargs
        Additional keyword arguments passed to anndata.read_h5ad

    Returns
    =======
    dataset : STDataset
        A STDataset instance generated from read_path.

    Raises
    ======
    ValueError
        If the file holds no expression matrix (``anndata.X`` is None).
    KeyError
        If ``anndata.obsm`` has no ``"spatial"`` entry.
    """
    # Function-scope import: the module itself only imports ``scipy.io``.
    from scipy import sparse

    adata = anndata.read_h5ad(read_path, **kwargs)

    expression = adata.X
    if expression is None:
        raise ValueError("anndata.X is empty; no expression matrix to load")
    # anndata.X may be a dense ndarray; from_spmatrix needs a SciPy sparse
    # matrix, so wrap dense input instead of failing.
    if not sparse.issparse(expression):
        expression = sparse.csr_matrix(expression)
    # Promote float32 to float64 as the original code did. The cast is done
    # on the SciPy matrix (checking the matrix dtype rather than the first
    # cell) so it works for empty data and the result stays sparse no matter
    # how DataFrame.astype treats sparse columns.
    if expression.dtype == np.float32:
        expression = expression.astype(np.float64)

    count_df = pd.DataFrame.sparse.from_spmatrix(
        expression,
        index=adata.obs.index,
        columns=adata.var.index,
    )

    coor_df = pd.DataFrame(adata.obsm["spatial"])
    coor_df.index = count_df.index
    coor_df.columns = ["X", "Y"]

    return STDataset(count_df, coor_df)
def load_table(read_path, **kwargs) -> STDataset:
    """
    Load text file and generate STDataset.

    Support tables in following format:

    ======== === === ======== ====================
    geneID   X   Y   counts   spot_name (optional)
    ======== === === ======== ====================
    gene_1   1   1   1        spot_1
    gene_2   1   1   2        spot_1
    gene_1   1   2   1        spot_2
    gene_3   1   2   3        spot_2
    ...      ... ... ...      ...
    ======== === === ======== ====================

    .. note::
        The gene identifier is taken from the row index of the parsed table
        and ``X``, ``Y``, ``counts`` (and the optional spot name) from the
        first columns by position. Pass ``pd.read_csv`` options (for example
        ``index_col=0``) so that ``geneID`` ends up as the index.

    Counts of rows sharing the same spot and gene are summed. When no spot
    name column exists, spots are named ``"{X}x{Y}"``.

    Parameters
    ==========
    read_path : str or pathlib.Path
        File name to read from.

    **kwargs
        Additional keyword arguments passed to pd.read_csv

    Returns
    =======
    dataset : STDataset
        A STDataset instance generated from read_path.
    """
    read_df = pd.read_csv(read_path, **kwargs)
    # The spot name column is optional; decide once instead of catching an
    # IndexError on every row.
    has_spot_name = read_df.shape[1] > 3

    count_dict = {}
    coor_dict = {}
    for gene, row in read_df.iterrows():
        # Strictly positional access; integer keys on a label-indexed Series
        # are a deprecated fallback.
        x, y, count = row.iloc[0], row.iloc[1], row.iloc[2]
        spot_name = row.iloc[3] if has_spot_name else f"{x}x{y}"

        # The first occurrence of a spot defines its coordinates.
        if spot_name not in coor_dict:
            coor_dict[spot_name] = {"X": x, "Y": y}
        spot_counts = count_dict.setdefault(spot_name, {})
        if gene in spot_counts:
            spot_counts[gene] += count
        else:
            spot_counts[gene] = count

    # Outer keys are spots, so transpose to get a spots x genes matrix;
    # genes never seen in a spot become 0.
    count_df = pd.DataFrame.from_dict(count_dict).fillna(0).T
    coor_df = pd.DataFrame.from_dict(coor_dict).T
    coor_df = coor_df.reindex(index=count_df.index)
    return STDataset(count_df, coor_df)


def load_gef(read_path, slot="bin20") -> STDataset:
    """
    Load gef file and generate STDataset.

    Reads ``geneExp/<slot>/gene`` and ``geneExp/<slot>/expression`` from the
    HDF5 file. Each gene record is read as ``(name, offset, count)`` and
    selects ``count`` consecutive rows of the expression table starting at
    ``offset``; each expression row is read as ``(x, y, count)``. Spots are
    named ``"{x}x{y}"`` and repeated counts for the same spot and gene are
    summed.

    Parameters
    ==========
    read_path : str or pathlib.Path
        File name to read from.

    slot : str, default "bin20"
        Name of the group under ``geneExp`` to load.

    Returns
    =======
    dataset : STDataset
        A STDataset instance generated from read_path.
    """
    count_dict = {}
    coor_dict = {}
    # Open read-only and let the context manager close the file even when
    # parsing fails.
    with h5py.File(read_path, "r") as f:
        bin_group = f["geneExp"][slot]
        expression = bin_group["expression"]
        for record in bin_group["gene"]:
            gene_symbol = record[0].decode("utf8")
            # Convert to Python ints first: arithmetic on the stored integer
            # types may wrap around. The slice end is exclusive, so it is
            # offset + count (subtracting one would drop the last row of
            # every gene).
            begin_line = int(record[1])
            end_line = begin_line + int(record[2])
            for line in expression[begin_line:end_line]:
                x, y, count = int(line[0]), int(line[1]), int(line[2])
                spot_name = f"{x}x{y}"
                if spot_name not in coor_dict:
                    coor_dict[spot_name] = {"X": x, "Y": y}
                spot_counts = count_dict.setdefault(spot_name, {})
                if gene_symbol in spot_counts:
                    spot_counts[gene_symbol] += count
                else:
                    spot_counts[gene_symbol] = count

    # Outer keys are spots, so transpose to get a spots x genes matrix;
    # genes never seen in a spot become 0.
    count_df = pd.DataFrame.from_dict(count_dict).fillna(0).T
    coor_df = pd.DataFrame.from_dict(coor_dict).T
    coor_df = coor_df.reindex(index=count_df.index)
    return STDataset(count_df, coor_df)