from __future__ import annotations
import warnings
from collections import Counter
from copy import deepcopy
from typing import Optional, Tuple, Union
from pathlib import Path
import numpy as np
import pandas as pd
from libpysal.weights import KNN
from libpysal.weights import W as libpysal_W
from . import cluster, density, moran
DataFrames = Union[pd.DataFrame, np.ndarray, Path, str]
[docs]class STDataset(object):
"""
STDataset: A meta class for discribing Spatial Transcriptomics data.
Parameters
==========
count_df : np.ndarray, pd.DataFrame, str or Path
Expression matrix for Spatial Transcriptomics Data. If ``str`` or ``Path``
is given, svgbit will try to read file with given path with pandas.
Default shape: (spot * gene)
coordinate_df : np.ndarray, pd.DataFrame, str or Path
Coordinates for Spatial Transcriptomics Data. If ``str`` or ``Path``
is given, svgbit will try to read file with given path with pandas.
Default shape: (spot * 2)
count_transpose : bool, default False
Whether to transpose count matrix.
coordinate_transpose : bool, default False
Whether to transpose coordinate dataframe.
count_df_kwargs : dict, default {}
Keyword arguments pass to ``pandas.read_csv`` if ``str`` or ``Path`` is
given to ``count_df``.
coordinate_df_kwargs : dict, default {}
Keyword arguments pass to ``pandas.read_csv`` if ``str`` or ``Path`` is
given to ``coordinate_df``.
make_sparse : bool, default True
Whether to use sparse DataFrame in order to save memory.
check_duplicate_genes : bool, default True
Whether to check duplicated gene names.
sort_spots : bool, default True
Whether to sort spots with spots' name.
"""
def __init__(
self,
count_df: DataFrames,
coordinate_df: DataFrames,
count_transpose: bool = False,
coordinate_transpose: bool = False,
count_df_kwargs: dict = {},
coordinate_df_kwargs: dict = {},
make_sparse: bool = True,
check_duplicate_genes: bool = True,
sort_spots: bool = True,
) -> None:
# attributes initial
self._count_df: Optional[pd.DataFrame] = None
self._coordinate_df: Optional[pd.DataFrame] = None
self._normalizer: Optional[str] = None
self._weight: Optional[libpysal_W] = None
self._weight_type: Tuple[Optional[str], Optional[str]] = (None, None)
self._hotspot_df: Optional[pd.DataFrame] = None
self._local_moran_i: Optional[pd.DataFrame] = None
self._local_moran_p: Optional[pd.DataFrame] = None
self._AI: Optional[pd.Series] = None
self._Di: Optional[pd.DataFrame] = None
self._svg_cluster: Optional[pd.Series] = None
self._spot_type: Optional[pd.DataFrame] = None
self._array_coordinate: Optional[pd.DataFrame] = None
# dataframes check
if isinstance(count_df, pd.DataFrame):
self._count_df = deepcopy(count_df)
elif isinstance(count_df, np.ndarray):
self._count_df = pd.DataFrame(count_df)
else:
self._count_df = pd.read_csv(count_df, **count_df_kwargs)
if count_transpose:
self._count_df = self._count_df.T
if isinstance(coordinate_df, pd.DataFrame):
self._coordinate_df = deepcopy(coordinate_df)
elif isinstance(coordinate_df, np.ndarray):
self._coordinate_df = pd.DataFrame(coordinate_df)
else:
self._coordinate_df = pd.read_csv(
coordinate_df,
**coordinate_df_kwargs,
)
if coordinate_transpose:
self._coordinate_df = self._coordinate_df.T
if sort_spots:
self._count_df.sort_index(inplace=True)
self._coordinate_df = self._coordinate_df.reindex(
index=self.count_df.index)
self._coordinate_df.columns = ["X", "Y"]
err = "Expression matrix and coordinate file have different number of spots."
assert self._count_df.shape[0] == self._coordinate_df.shape[0], err
err = "Spots' name mismatch!"
assert all(self._count_df.index == self._coordinate_df.index), err
self._count_df.fillna(0, inplace=True)
# Rename duplicated columns
if check_duplicate_genes:
genes = []
flag = 0
c = Counter(self._count_df.columns)
gene_suffix = {}
for gene_name in self._count_df.columns:
if c[gene_name] > 1:
flag = 1
if gene_name in gene_suffix:
gene_suffix[gene_name] += 1
gene_name += f".{gene_suffix[gene_name]}"
else:
gene_suffix[gene_name] = 0
genes.append(gene_name)
if flag:
self._count_df.columns = genes
print("Duplicated column names found. Auto rename.")
warnings.warn("Duplicated column names found. Auto rename.")
if make_sparse:
self.to_sparse()
def __repr__(self) -> str:
descr = f"STDataset with n_spots x n_genes = {self.n_spots} x {self.n_genes}"
descr = f"{descr}\nApplied normalizers: {self._normalizer}"
descr = f"{descr}\nAssigned attributes: "
flag = 0
for attr in ["weight", "hotspot_df", "AI", "svg_cluster"]:
if getattr(self, attr) is not None:
descr += f"{attr}, "
flag = 1
if flag:
descr = descr[:-2]
return descr
def __str__(self) -> str:
return self.__repr__()
def __getitem__(self, pos) -> STDataset:
"""
Return a sub STDataset instance with empty attributes.
"""
try:
count_sub = self.count_df.loc[pos]
except TypeError:
count_sub = self.count_df.iloc[pos]
coor_sub = self.coordinate_df.loc[count_sub.index, ]
return STDataset(
count_sub,
coor_sub,
check_duplicate_genes=False,
sort_spots=False,
make_sparse=pd.api.types.is_sparse(self.count_df.iloc[:, 1]),
)
def __del__(self) -> None:
del self._count_df
del self._coordinate_df
del self._normalizer
del self._hotspot_df
del self._weight
del self._weight_type
del self._local_moran_i
del self._local_moran_p
del self._AI
del self._Di
del self._svg_cluster
del self._spot_type
del self._array_coordinate
def to_dense(self) -> None:
"""Convert count_df with sparse values to dense."""
self._count_df = self._count_df.sparse.to_dense()
def to_sparse(self) -> None:
"""Convert count_df with dense values to sparse."""
is_int = [i == "int" for i in self.count_df.dtypes]
if all(is_int):
dt = "int64"
else:
dt = "float64"
self._count_df = self._count_df.astype(pd.SparseDtype(dt, 0))
[docs] def acquire_weight(self, k: int = 6, **kwargs) -> None:
"""
Acquire weight for analysis.
Parameters
==========
k : int, default 6
Number of nearest neighbors for KNN network.
**kwargs
Additional keyword arguments passed to the libpysal.weights.KNN call.
"""
self._weight = KNN(self._coordinate_df, k=k, **kwargs)
self._weight_type = ("KNN", str(k))
[docs] def acquire_hotspot(self, **kwargs) -> None:
"""
Acquire hotspot matrix.
Parameters
==========
**kwargs
Additional keyword arguments passed to local_moran call.
"""
if self._weight is None:
self.acquire_weight()
hotspot, i_value, p_value = moran.local_moran(
gene_expression_df=self.count_df,
weights=self.weight,
**kwargs,
)
self._hotspot_df = hotspot.reindex(
index=self.spots,
columns=self.genes,
)
self._local_moran_i = i_value.astype(pd.SparseDtype("float", 0))
self._local_moran_p = p_value.astype(pd.SparseDtype("float", 0))
[docs] def acquire_density(self, cores: int = density.cpu_count()) -> None:
"""
Acquire local Di and global AI value.
Parameters
==========
cores : int
Number of threads to run svgbit. Use all available cpus by default.
"""
if self._hotspot_df is None:
self.acquire_hotspot()
results = density.hotspot_AI(
hotspot_df=self._hotspot_df,
weight_df=-np.log(self._local_moran_p),
knn=self._weight,
cores=cores,
)
self._AI = results[0].reindex(index=self.genes)
self._Di = results[1].astype(pd.SparseDtype("float", 0)).reindex(
index=self.spots, columns=self.genes)
[docs] def find_clusters(
self,
n_svgs: int = 1000,
n_svg_clusters: int = 8,
threshold: float = 0.3,
) -> None:
"""
Find SVG clusters.
Parameters
==========
n_svgs : int, default 1000
Number of SVGs to find clusters.
n_svg_clusters : int, default 8
Number of SVG clusters to find.
threshold : float, dafault 0.3
min value to identify multiple svg clusters to spot.
"""
results = cluster.cluster(
self._hotspot_df,
self._AI,
n_svgs=n_svgs,
n_svg_clusters=n_svg_clusters,
threshold=threshold,
)
self._svg_cluster = results[0]
self._spot_type = results[1].reindex(index=self.spots)
@property
def count_df(self) -> pd.DataFrame:
"""Expression matrix."""
return self._count_df
@property
def coordinate_df(self) -> pd.DataFrame:
"""Coordinate information."""
return self._coordinate_df
@property
def n_spots(self) -> int:
"""Number of total spots."""
return self._count_df.shape[0]
@property
def spots(self) -> pd.Index:
"""An Index for spots' names."""
return self._count_df.index
@property
def n_genes(self) -> int:
"""Number of total genes."""
return self._count_df.shape[1]
@property
def genes(self) -> pd.Index:
"""An Index for genes' names."""
return self._count_df.columns
@property
def weight(self) -> libpysal_W:
"""Weight used by svgbit. Use KNN if not specified."""
return self._weight
@weight.setter
def weight(self, value: libpysal_W):
self._weight = value
self._weight_type = ("User specified weight", None)
@property
def weight_type(self) -> Tuple[Optional[str], Optional[str]]:
"""
What kind of weight is used. The second element indicates parameter k
used by KNN in default.
"""
return self._weight_type
@property
def hotspot_df(self) -> pd.DataFrame:
"""Hotspot matrix."""
return self._hotspot_df
@property
def AI(self) -> pd.Series:
"""A Series for AI value."""
return self._AI
@property
def Di(self) -> pd.DataFrame:
"""A DataFrame for local Di value."""
return self._Di
@property
def svg_cluster(self) -> pd.Series:
"""SVG cluster result."""
return self._svg_cluster
@property
def spot_type(self) -> pd.DataFrame:
"""A pd.DataFrame for spot type."""
return self._spot_type
if __name__ == "__main__":
pass