Shortcuts

Source code for slideflow.stats.slidemap

from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union

import os
import pickle
import numpy as np
import pandas as pd
import slideflow as sf
import warnings
from os.path import join, exists, isdir
from pandas.core.frame import DataFrame
from sklearn.cluster import KMeans
from slideflow import errors
from slideflow.stats import stats_utils
from slideflow.util import log

if TYPE_CHECKING:
    import umap
    from matplotlib.axes import Axes
    from matplotlib.figure import Figure
    from slideflow.model import DatasetFeatures


[docs]class SlideMap: """Two-dimensional slide map for visualization & backend for mosaic maps. Slides are mapped in 2D either explicitly with pre-specified coordinates, or with dimensionality reduction from post-convolutional layer weights, provided from :class:`slideflow.DatasetFeatures`. """ def __init__( self, *, parametric_umap: bool = False ) -> None: """Backend for mapping slides into two dimensional space. Can use a DatasetFeatures object to map slides according to UMAP of features, or map according to pre-specified coordinates. Can be initialized with three methods: from precalculated X/Y coordinates, from a DatasetFeatures object, or from a saved map. Examples Build a SlideMap from a DatasetFeatures object .. code-block:: python dts_ftrs = sf.DatasetFeatures(model, dataset) slidemap = sf.SlideMap.from_features(dts_ftrs) Build a SlideMap from prespecified coordinates .. code-block:: python x = np.array(...) y = np.array(...) slides = ['slide1', 'slide1', 'slide5', ...] slidemap = sf.SlideMap.from_xy( x=x, y=y, slides=slides ) Load a saved SlideMap .. code-block:: python slidemap = sf.SlideMap.load('map.parquet') Args: slides (list(str)): List of slide names """ assert isinstance(parametric_umap, bool), "Expected <bool> for argument 'parametric_umap'" self.data = None # type: DataFrame self.ftrs = None # type: Optional[DatasetFeatures] self.slides = None # type: List[str] self.tfrecords = None # type: List[str] self.parametric_umap = parametric_umap self._umap_normalized_range = None self.map_meta = {} # type: Dict[str, Any] @classmethod def load(cls, path: str): """Load a previously saved SlideMap (UMAP and coordinates). Loads a ``SlideMap`` previously saved with ``SlideMap.save()``. Expects a directory with ``slidemap.parquet``, ``range_clip.npz``, and either ``umap.pkl`` (non-parametric models) or a folder named ``parametric_model``. Examples Save a SlideMap, then load it. .. code-block:: python slidemap.save('/directory/') new_slidemap = sf.SlideMap.load('/directory/') Args: path (str): Directory from which to load a previously saved UMAP. """ log.debug(f"Loading SlideMap from {path}") obj = cls() if isdir(path): # Load coordinates if exists(join(path, 'slidemap.parquet')): obj.load_coordinates(join(path, 'slidemap.parquet')) else: log.warn("Could not find slidemap.parquet; no data loaded.") # Load UMAP if exists(join(path, 'parametric_model')): obj.parametric_umap = True obj.load_umap(path) elif exists(join(path, 'umap.pkl')): obj.load_umap(join(path, 'umap.pkl')) else: log.warn(f"Could not find a valid umap model at {path}. Ensure " "the path is a valid directory with either 'parametric_umap' " "subdirectory or a valid 'umap.pkl'.") # Load range/clip try: obj.load_range_clip(path) except FileNotFoundError: log.warn("Could not find range_clip.npz; results from " "umap_transform() will not be normalized.") if exists(join(path, 'tfrecords.json')): obj.tfrecords = sf.util.load_json(join(path, 'tfrecords.json')) elif path.endswith('.parquet'): obj.load_coordinates(path) else: raise ValueError( f"Unable to determine how to load {path}. Expected " "a path to a directory, or a slidemap.parquet file." ) obj.slides = obj.data.slide.unique() return obj @classmethod def from_xy( cls, x: Union[np.ndarray, List[int], str], y: Union[np.ndarray, List[int], str], slides: Union[np.ndarray, List[str], str], tfr_index: Union[np.ndarray, List[int], str], data: Optional[DataFrame] = None, parametric_umap: bool = False, cache: Optional[str] = None ) -> "SlideMap": """Initializes map from precalculated (x, y) coordinates. Args: slides (list(str)): List of slide names. x (list(int)): X coordinates for each point on the map. Can either be a list of int, or the name of a column in the DataFrame provided to the argument 'data'. y (list(int)): Y coordinates for tfrecords. Can either be a list of int, or the name of a column in the DataFrame provided to the argument 'data'. slides (list(str)): Slide names for each point on the map. Can either be a list of str, or the name of a column in the DataFrame provided to the argument 'data'. tfr_index (list(int)): TFRecord indicies for each point on the map. Can either be a list of int, or the name of a column in the DataFrame provided to the argument 'data'. data (DataFrame, optional): Optional DataFrame which can be used to supply the 'x', 'y', 'slides', and 'tfr_index' data. cache (str, optional): Deprecated """ if cache is not None: warnings.warn( 'Argument "cache" is deprecated for SlideMap. ' 'Instead of using/recalculating SlideMaps with cache, manually ' 'save and load maps with SlideMap.save() and SlideMap.load()', DeprecationWarning ) # Read and verify provided input cols = {'x': x, 'y': y, 'slides': slides, 'tfr_index': tfr_index} for col, col_val in cols.items(): if isinstance(col_val, str) and data is None: raise ValueError( f"Could not interpret input {col_val} for arg {col}. " "Did you mean to supply a DataFrame via 'data'?") elif data is not None: if isinstance(col_val, str) and col_val not in data.columns: raise ValueError(f"Could not find column {col_val}.") elif isinstance(col_val, str): cols[col] = data[col_val].values else: cols[col] = col_val # Verify lengths of provided input if not all(len(cols[c]) == len(cols['x']) for c in cols): raise ValueError( "Length of x, y, slides, and tfr_index must all be equal." ) obj_data = pd.DataFrame({ 'x': pd.Series(cols['x']), 'y': pd.Series(cols['y']), 'slide': pd.Series(cols['slides']), 'tfr_index': pd.Series(cols['tfr_index']) }) obj = cls() obj.slides = obj_data.slide.unique() obj.data = obj_data obj.parametric_umap = parametric_umap return obj @classmethod def from_features( cls, ftrs: "DatasetFeatures", *, exclude_slides: Optional[List[str]] = None, map_slide: Optional[str] = None, parametric_umap: bool = False, umap_dim: int = 2, umap: Optional[Any] = None, recalculate: Optional[bool] = None, # Deprecated cache: Optional[str] = None, # Deprecated **umap_kwargs: Any ) -> "SlideMap": """Initializes map from dataset features. Args: ftrs (:class:`slideflow.DatasetFeatures`): DatasetFeatures. exclude_slides (list, optional): List of slides to exclude. map_slide (str, optional): Either None, 'centroid', or 'average'. If None, will map all tiles from each slide. Defaults to None. umap_dim (int, optional): Number of dimensions for UMAP. Defaults to 2. umap (umap.UMAP, optional): Fit UMAP, to be used instead of fitting a new UMAP. cache (str, optional): Deprecated. recalculate (bool, optional): Deprecated """ if recalculate or cache: warnings.warn( 'Arguments "recalculate" and "cache" are deprecated for SlideMap. ' 'Instead of using/recalculating SlideMaps with cache, manually ' 'save and load maps with SlideMap.save() and SlideMap.load()', DeprecationWarning ) if map_slide is not None and map_slide not in ('centroid', 'average'): raise errors.SlideMapError( "map_slide must be None, 'centroid' or 'average', (got " f"{map_slide})" ) if not exclude_slides: slides = ftrs.slides else: slides = [s for s in ftrs.slides if s not in exclude_slides] obj = cls() obj.slides = slides obj.ftrs = ftrs obj.umap = umap # type: ignore obj.parametric_umap = parametric_umap if map_slide: obj._calculate_from_slides( method=map_slide, **umap_kwargs ) else: obj._calculate_from_tiles( dim=umap_dim, **umap_kwargs ) return obj @classmethod def from_precalculated(cls, *args, **kwargs) -> "SlideMap": """Deprecated class initializer.""" warnings.warn( "sf.SlideMap.from_precalculated() deprecated. Please use " "sf.SlideMap.from_xy() instead.", DeprecationWarning ) return cls.from_xy(*args, **kwargs) @property def x(self): """X coordinates of map.""" return self.data.x.values @property def y(self): """Y coordinates of map.""" return self.data.y.values def _calculate_from_tiles( self, **umap_kwargs: Any ) -> None: """Internal function to guide calculation of UMAP from final layer features / activations, as provided by DatasetFeatures. Keyword Args: dim (int): Number of dimensions for UMAP. Defaults to 2. n_neighbors (int): Number of neighbors for UMAP. Defaults to 50. min_dist (float): Minimum distance for UMAP. Defaults to 0.1. metric (str): UMAP metric. Defaults to 'cosine'. **umap_kwargs (optional): Additional keyword arguments for the UMAP function. """ assert self.ftrs is not None # Calculate UMAP node_activations = np.concatenate([ self.ftrs.activations[slide] for slide in self.slides ]) self.map_meta['num_features'] = self.ftrs.num_features log.info("Calculating UMAP...") coordinates = self.umap_transform(node_activations, **umap_kwargs) # Assemble dataframe tfrecord_indices = np.concatenate([ np.arange(self.ftrs.activations[slide].shape[0]) for slide in self.slides ]) slides = np.array([ slide for slide in self.slides for _ in range(self.ftrs.activations[slide].shape[0]) ]) data_dict = { 'slide': pd.Series(slides), 'x': pd.Series(coordinates[:, 0]), 'tfr_index': pd.Series(tfrecord_indices), } if self.ftrs.locations: locations = np.concatenate([ self.ftrs.locations[slide] for slide in self.slides ]) data_dict['location'] = pd.Series([l for l in locations]).astype(object) if self.ftrs.predictions and isinstance(self.ftrs, sf.DatasetFeatures): predictions = np.concatenate([ self.ftrs.predictions[slide] for slide in self.slides ]) data_dict.update({ 'predicted_class': pd.Series(np.argmax(predictions, axis=1)), 'predictions': pd.Series([l for l in predictions]).astype(object), }) if self.ftrs.uq and self.ftrs.uncertainty != {}: # type: ignore uncertainty = np.concatenate([ self.ftrs.uncertainty[slide] for slide in self.slides ]) data_dict.update({ 'uncertainty': pd.Series( [u for u in uncertainty] ).astype(object) }) if 'dim' not in umap_kwargs or umap_kwargs['dim'] > 1: data_dict.update({ 'y': pd.Series(coordinates[:, 1]), }) self.data = pd.DataFrame(data_dict) def _calculate_from_slides( self, method: str = 'centroid', **umap_kwargs: Any ) -> None: """ Internal function to guide calculation of UMAP from final layer activations for each tile, as provided via DatasetFeatures, and then map only the centroid tile for each slide. Args: method (str, optional): 'centroid' or 'average'. If centroid, will calculate UMAP only from centroid tiles for each slide. If average, will calculate UMAP based on average node activations across all tiles within the slide, then display the centroid tile for each slide. Keyword Args: dim (int): Number of dimensions for UMAP. Defaults to 2. n_neighbors (int): Number of neighbors for UMAP. Defaults to 50. min_dist (float): Minimum distance for UMAP. Defaults to 0.1. metric (str): UMAP metric. Defaults to 'cosine'. **umap_kwargs (optional): Additional keyword arguments for the UMAP function. """ if method not in ('centroid', 'average'): _m = f'Method must be either "centroid" or "average", not {method}' raise errors.SlideMapError(_m) assert self.ftrs is not None # Calculate optimal slide indices and centroid activations log.info("Calculating centroid indices...") opt_idx, centroid_activations = stats_utils.calculate_centroid(self.ftrs.activations) # Restrict mosaic to only slides that had enough tiles to calculate # an optimal index from centroid successful_slides = list(opt_idx.keys()) num_warned = 0 for slide in self.ftrs.slides: if slide not in successful_slides: log.debug(f"No centroid for [green]{slide}[/]; skipping") if num_warned: log.warning(f"No centroid for {num_warned} slides.") log.info(f"Calculating UMAP from slide-level {method}...") if method == 'centroid': umap_input = np.array([ centroid_activations[slide] for slide in self.slides ]) elif method == 'average': umap_input = np.array([ np.mean(self.ftrs.activations[slide], axis=0) for slide in self.slides ]) # Calculate UMAP coordinates = self.umap_transform( umap_input, **umap_kwargs ) # Create dataframe locations = np.stack([ self.ftrs.locations[slide][opt_idx[slide]] for slide in self.slides ]) data_dict = { 'slide': pd.Series(self.slides), 'x': pd.Series(coordinates[:, 0]), 'tfr_index': pd.Series(opt_idx[slide] for slide in self.slides), 'location': pd.Series([l for l in locations]).astype(object) } if self.ftrs.predictions: predictions = np.stack([ self.ftrs.predictions[slide][opt_idx[slide]] for slide in self.slides ]) data_dict.update({ 'predictions': pd.Series([l for l in predictions]).astype(object), 'predicted_class': pd.Series(np.argmax(predictions, axis=1)), }) if self.ftrs.uq and self.ftrs.uncertainty != {}: # type: ignore uncertainty = np.stack([ self.ftrs.uncertainty[slide][opt_idx[slide]] for slide in self.slides ]) data_dict.update({ 'uncertainty': pd.Series( [u for u in uncertainty] ).astype(object) }) if 'dim' not in umap_kwargs or umap_kwargs['dim'] > 1: data_dict.update({ 'y': pd.Series(coordinates[:, 1]), }) self.data = pd.DataFrame(data_dict) def activations(self) -> np.ndarray: """Return associated DatasetFeatures activations as a numpy array corresponding to the points on this SlideMap.""" if self.ftrs is None: raise ValueError( "No associated DatasetFeatures object for reading activations." ) return np.array([ self.ftrs.activations[row.slide][row.tfr_index] for row in self.data.itertuples() ]) def build_mosaic( self, tfrecords: Optional[List[str]] = None, **kwargs ) -> "sf.Mosaic": """Build a mosaic map. Args: tfrecords (list(str), optional): List of tfrecord paths. If SlideMap was created using DatasetFeatures, this argument is not required. Keyword args: num_tiles_x (int, optional): Mosaic map grid size. Defaults to 50. tile_select (str, optional): 'first', 'nearest', or 'centroid'. Determines how to choose a tile for display on each grid space. If 'first', will display the first valid tile in a grid space (fastest; recommended). If 'nearest', will display tile nearest to center of grid space. If 'centroid', for each grid, will calculate which tile is nearest to centroid tile_meta. Defaults to 'nearest'. tile_meta (dict, optional): Tile metadata, used for tile_select. Dictionary should have slide names as keys, mapped to list of metadata (length of list = number of tiles in slide). Defaults to None. normalizer ((str or :class:`slideflow.norm.StainNormalizer`), optional): Normalization strategy to use on image tiles. Defaults to None. normalizer_source (str, optional): Stain normalization preset or path to a source image. Valid presets include 'v1', 'v2', and 'v3'. If None, will use the default present ('v3'). Defaults to None. """ if self.ftrs is None and tfrecords is None: raise ValueError( "If SlideMap was not created using DatasetFeatures, then the " "`tfrecords` argument (list of TFRecord paths) must be supplied " "to `SlideMap.build_mosaic()`" ) elif ((self.ftrs is not None and not len(self.ftrs.tfrecords)) and tfrecords is None): raise ValueError( "The DatasetFeatures object used to create this SlideMap " "did not have paths to TFRecords stored. Please supply a list " "of TFRecord paths to the `tfrecords` argument " "of `SlideMap.build_mosaic()`" ) elif (tfrecords is None and self.ftrs is not None and len(self.ftrs.tfrecords)): return sf.Mosaic(self, tfrecords=self.ftrs.tfrecords, **kwargs) else: return sf.Mosaic(self, tfrecords=tfrecords, **kwargs) def cluster(self, n_clusters: int) -> None: """Performs K-means clustering on data and adds to metadata labels. Clusters are saved to self.data['cluster']. Requires that SlideMap was generated via DatasetFeatures. Examples Perform K-means clustering and apply cluster labels. slidemap.cluster(n_clusters=5) slidemap.plot() Args: n_clusters (int): Number of clusters for K means clustering. """ if self.ftrs is None: raise errors.SlideMapError( "Unable to cluster; no DatasetFeatures provided" ) activations = [ self.ftrs.activations[row.slide][row.tfr_index] for row in self.data.itertuples() ] log.info(f"Calculating K-means clustering (n={n_clusters})") kmeans = KMeans(n_clusters=n_clusters, n_init=10).fit(activations) self.data['cluster'] = kmeans.labels_ self.label('cluster') def neighbors( self, slide_categories: Optional[Dict] = None, algorithm: str = 'kd_tree', method: str = 'map', pca_dim: int = 100 ) -> None: """Calculates neighbors among tiles in this map, assigning neighboring statistics to tile metadata 'num_unique_neighbors' and 'percent_matching_categories'. Args: slide_categories (dict, optional): Maps slides to categories. Defaults to None. If provided, will be used to calculate 'percent_matching_categories' statistic. algorithm (str, optional): NearestNeighbor algorithm, either 'kd_tree', 'ball_tree', or 'brute'. Defaults to 'kd_tree'. method (str, optional): Either 'map', 'pca', or 'features'. How neighbors are determined. If 'map', calculates neighbors based on UMAP coordinates. If 'features', calculates neighbors on the full feature space. If 'pca', reduces features into `pca_dim` space. Defaults to 'map'. """ from sklearn.decomposition import PCA from sklearn.neighbors import NearestNeighbors if self.ftrs is None: raise errors.SlideMapError( "Unable perform neighbor search; no DatasetFeatures provided" ) log.info(f"Initializing neighbor search (method={method})...") if method == 'map': X = np.stack((self.data.x.values, self.data.y.values), axis=-1) elif method == 'features': X = self.activations() elif method == 'pca': log.info(f"Reducing dimensionality with PCA (dim={pca_dim})...") pca = PCA(n_components=pca_dim) features = self.activations() pca.fit(features) X = pca.transform(features) else: raise ValueError(f'Unknown neighbor method {method}.') nbrs = NearestNeighbors( n_neighbors=100, algorithm=algorithm, n_jobs=-1 ).fit(X) log.info("Calculating nearest neighbors...") _, indices = nbrs.kneighbors(X) def num_category_matching(idx_list, idx): list_cat = np.array([ slide_categories[self.data.loc[_i].slide] for _i in idx_list ]) idx_cat = slide_categories[self.data.loc[idx].slide] return (list_cat == idx_cat).sum() log.info('Matching neighbors...') #TODO: accelerate this step with multiprocessing self.data['num_unique_neighbors'] = [ len(self.data.loc[ind].slide.unique()) for ind in indices ] if slide_categories: self.data['percent_matching_categories'] = [ num_category_matching(ind, i) / len(ind) for i, ind in enumerate(indices) ] def filter(self, slides: List[str]) -> None: """Filters map to only show tiles from the given slides. Args: slides (list(str)): List of slide names. """ self.data = self.data.loc[self.data.slide.isin(slides)] def umap_transform( self, array: np.ndarray, *, dim: int = 2, n_neighbors: int = 50, min_dist: float = 0.1, metric: str = 'cosine', **kwargs: Any ) -> np.ndarray: """Transforms a given array using UMAP projection. If a UMAP has not yet been fit, this will fit a new UMAP on the given data. Args: array (np.ndarray): Array to transform with UMAP dimensionality reduction. Keyword Args: dim (int, optional): Number of dimensions for UMAP. Defaults to 2. n_neighbors (int, optional): Number of neighbors for UMAP algorithm. Defaults to 50. min_dist (float, optional): Minimum distance argument for UMAP algorithm. Defaults to 0.1. metric (str, optional): Metric for UMAP algorithm. Defaults to 'cosine'. **kwargs (optional): Additional keyword arguments for the UMAP function. """ import umap # Imported in this function due to long import time if not len(array): raise errors.StatsError("Unable to perform UMAP on empty array.") if self.umap is None: # type: ignore fn = umap.UMAP if not self.parametric_umap else umap.ParametricUMAP self.umap = fn( n_components=dim, verbose=(sf.getLoggingLevel() <= 20), n_neighbors=n_neighbors, min_dist=min_dist, metric=metric, **kwargs ) layout = self.umap.fit_transform(array) # type: ignore (normalized, self._umap_normalized_range, self._umap_normalized_clip) = stats_utils.normalize_layout(layout) else: layout = self.umap.transform(array) # type: ignore if self._umap_normalized_range is not None: normalized = stats_utils.normalize( layout, norm_range=self._umap_normalized_range, norm_clip=self._umap_normalized_clip) else: log.info("No range/clip information available; unable to " "normalize UMAP output.") return layout return normalized def label_by_uncertainty(self, index: int = 0) -> None: """Labels each point with the tile-level uncertainty, if available. Args: index (int, optional): Uncertainty index. Defaults to 0. """ if 'label' in self.data.columns: self.data.drop(columns='label', inplace=True) if self.ftrs is None: raise errors.SlideMapError("DatasetFeatures not provided.") if not self.ftrs.uq or self.ftrs.uncertainty == {}: # type: ignore raise errors.DatasetError( 'Unable to label by uncertainty; UQ estimates not available.' ) else: uq_labels = np.stack(self.data['uncertainty'].values)[:, index] self.data['label'] = uq_labels def label_by_preds(self, index: int) -> None: """Displays each point with label equal to the prediction value (from 0-1) Args: index (int): Logit index. """ if 'label' in self.data.columns: self.data.drop(columns='label', inplace=True) self.data['label'] = np.stack(self.data['predictions'].values)[:, index] def label_by_slide(self, slide_labels: Optional[Dict] = None) -> None: """Displays each point as the name of the corresponding slide. If slide_labels is provided, will use this dict to label slides. Args: slide_labels (dict, optional): Dict mapping slide names to labels. """ if 'label' in self.data.columns: self.data.drop(columns='label', inplace=True) if slide_labels: self.data['label'] = self.data.slide.map(slide_labels) else: self.data['label'] = self.data.slide.values def label(self, meta: str, translate: Optional[Dict] = None) -> None: """Displays each point labeled by tile metadata (e.g. 'predicted_class') Args: meta (str): Data column from which to assign labels. translate (dict, optional): If provided, will translate the read metadata through this dictionary. """ if 'label' in self.data.columns: self.data.drop(columns='label', inplace=True) self.data['label'] = self.data[meta].values if translate: self.data['label'] = self.data['label'].map(translate) def plot( self, subsample: Optional[int] = None, title: Optional[str] = None, cmap: Optional[Dict] = None, xlim: Tuple[float, float] = (-0.05, 1.05), ylim: Tuple[float, float] = (-0.05, 1.05), xlabel: Optional[str] = None, ylabel: Optional[str] = None, legend: Optional[str] = None, ax: Optional["Axes"] = None, loc: Optional[str] = 'center right', ncol: Optional[int] = 1, categorical: Union[str, bool] = 'auto', legend_kwargs: Optional[Dict] = None, **scatter_kwargs: Any, ) -> None: """Plots calculated map. Args: subsample (int, optional): Subsample to only include this many tiles on plot. Defaults to None. title (str, optional): Title for plot. cmap (dict, optional): Dict mapping labels to colors. xlim (list, optional): List of float indicating limit for x-axis. Defaults to (-0.05, 1.05). ylim (list, optional): List of float indicating limit for y-axis. Defaults to (-0.05, 1.05). xlabel (str, optional): Label for x axis. Defaults to None. ylabel (str, optional): Label for y axis. Defaults to None. legend (str, optional): Title for legend. Defaults to None. ax (matplotlib.axes.Axes, optional): Figure axis. If not supplied, will prepare a new figure axis. loc (str, optional): Location for legend, as defined by matplotlib.axes.Axes.legend(). Defaults to 'center right'. ncol (int, optional): Number of columns in legend, as defined by matplotlib.axes.Axes.legend(). Defaults to 1. categorical (str, optional): Specify whether labels are categorical. Determines the colormap. Defaults to 'auto' (will attempt to automatically determine from the labels). legend_kwargs (dict, optional): Dictionary of additional keyword arguments to the matplotlib.axes.Axes.legend() function. **scatter_kwargs (optional): Additional keyword arguments to the seaborn scatterplot function. """ import seaborn as sns import matplotlib.pyplot as plt if legend_kwargs is None: legend_kwargs = dict() # Make plot if ax is None: fig = plt.figure(figsize=(6, 4.5)) ax = fig.add_subplot(111) # Subsampling if subsample: plot_df = self.data.sample(subsample) else: plot_df = self.data x = plot_df.x y = plot_df.y if 'label' in self.data.columns: labels = plot_df.label # Check for categorical labels if (categorical is True or not pd.to_numeric(labels, errors='coerce').notnull().all()): log.debug("Interpreting labels as categorical") scatter_kwargs.update( dict(hue=labels.astype('category')) ) unique = list(labels.unique()) try: unique.sort() except TypeError: log.error( "Unable to sort categories; are some values NaN?" ) if len(unique) >= 12: sns_pal = sns.color_palette("Paired", len(unique)) else: sns_pal = sns.color_palette('hls', len(unique)) if cmap is None: cmap = {unique[i]: sns_pal[i] for i in range(len(unique))} else: log.debug("Interpreting labels as continuous") scatter_kwargs.update(dict(hue=labels)) umap_2d = sns.scatterplot( x=x, y=y, palette=cmap, ax=ax, **scatter_kwargs ) ax.set_ylim(*((None, None) if not ylim else ylim)) ax.set_xlim(*((None, None) if not xlim else xlim)) if 'hue' in scatter_kwargs: ax.legend( loc=loc, ncol=ncol, title=legend, **legend_kwargs ) umap_2d.set(xlabel=xlabel, ylabel=ylabel) if title: ax.set_title(title) def plot_3d( self, z: Optional[np.ndarray] = None, feature: Optional[int] = None, subsample: Optional[int] = None, fig: Optional["Figure"] = None, ) -> None: """Saves a plot of a 3D umap, with the 3rd dimension representing values provided by argument "z". Args: z (list, optional): Values for z axis. Must supply z or feature. Defaults to None. feature (int, optional): Int, feature to plot on 3rd axis. Must supply z or feature. Defaults to None. subsample (int, optional): Subsample to only include this many tiles on plot. Defaults to None. fig (matplotlib.figure.Figure, optional): Figure. If not supplied, will prepare a new figure. """ import matplotlib.pyplot as plt from mpl_toolkits.mplot3d import Axes3D if fig is None: fig = plt.figure() title = f"UMAP with feature {feature} focus" if self.ftrs is None: raise errors.SlideMapError("DatasetFeatures not provided.") if (z is None) and (feature is None): raise errors.SlideMapError("Must supply either 'z' or 'feature'.") # Subsampling if subsample: plot_df = self.data.sample(subsample) else: plot_df = self.data # Get feature activations for 3rd dimension if z is None: z = np.array([ self.ftrs.activations[row.slide][row.tfr_index][feature] for row in plot_df.itertuples() ]) # Plot tiles on a 3D coordinate space with 2 coordinates from UMAP # and 3rd from the value of the excluded feature ax = Axes3D(fig, auto_add_to_figure=False) fig.add_axes(ax) scatter_kw = dict(c=z, cmap='viridis', linewidth=0.5, edgecolor="k") ax.scatter(plot_df.x, plot_df.y, z, **scatter_kw) ax.set_title(title) def save( self, path: str, dpi: int = 300, **kwargs, ): """Save UMAP, plot, coordinates, and normalization values to a directory. The UMAP, plot, coordinates, and normalization values can all be loaded from this directory after saving with ``sf.SlideMap.load(path)``. Args: path (str): Directory in which to save the plot and UMAP. The UMAP image will be saved with the filename "slidemap.png". dpi (int, optional): DPI for final image. Defaults to 300. Keyword args: subsample (int, optional): Subsample to only include this many tiles on plot. Defaults to None. title (str, optional): Title for plot. cmap (dict, optional): Dict mapping labels to colors. xlim (list, optional): List of float indicating limit for x-axis. Defaults to (-0.05, 1.05). ylim (list, optional): List of float indicating limit for y-axis. Defaults to (-0.05, 1.05). xlabel (str, optional): Label for x axis. Defaults to None. ylabel (str, optional): Label for y axis. Defaults to None. legend (str, optional): Title for legend. Defaults to None. **scatter_kwargs (optional): Additional keyword arguments to the seaborn scatterplot function. """ if not exists(path): os.makedirs(path) if path.endswith('.png') or path.endswith('.jpg') or path.endswith('.jpeg'): log.warning( "Path provided to `SlideMap.save()` is a file name, " "not a directory. Will save the figure plot to this location, " "but will not save the associated UMAP. To save both plot and " "UMAP, provide a path to a directory instead." ) self.save_plot(path, dpi=dpi, **kwargs) else: self.save_plot(join(path, "slidemap.png"), dpi=dpi, **kwargs) if self.umap is not None: self.save_umap(path) def save_plot( self, filename: str, dpi: int = 300, **kwargs ): """Save plot of slide map. Args: filename (str): File path to save the image. dpi (int, optional): DPI for final image. Defaults to 300. Keyword args: subsample (int, optional): Subsample to only include this many tiles on plot. Defaults to None. title (str, optional): Title for plot. cmap (dict, optional): Dict mapping labels to colors. xlim (list, optional): List of float indicating limit for x-axis. Defaults to (-0.05, 1.05). ylim (list, optional): List of float indicating limit for y-axis. Defaults to (-0.05, 1.05). xlabel (str, optional): Label for x axis. Defaults to None. ylabel (str, optional): Label for y axis. Defaults to None. legend (str, optional): Title for legend. Defaults to None. **scatter_kwargs (optional): Additional keyword arguments to the seaborn scatterplot function. """ import matplotlib.pyplot as plt with sf.util.matplotlib_backend('Agg'): self.plot(**kwargs) plt.savefig(filename, bbox_inches='tight', dpi=dpi) plt.close() log.info(f"Saved 2D UMAP to [green]{filename}") def save_3d( self, filename: str, dpi: int = 300, **kwargs ): """Save 3D plot of slide map. Args: filename (str): _description_ dpi (int, optional): _description_. Defaults to 300. Keyword args: z (list, optional): Values for z axis. Must supply z or feature. Defaults to None. feature (int, optional): Int, feature to plot on 3rd axis. Must supply z or feature. Defaults to None. subsample (int, optional): Subsample to only include this many tiles on plot. Defaults to None. """ import matplotlib.pyplot as plt with sf.util.matplotlib_backend('Agg'): self.plot_3d(**kwargs) plt.savefig(filename, bbox_inches='tight', dpi=dpi) plt.close() log.info(f"Saved 3D UMAP to [green]{filename}") def save_coordinates(self, path: str) -> None: """Save coordinates only to parquet file. Args: path (str, optional): Save coordinates to this location. """ self.data.to_parquet(path) log.info(f"Wrote slide map coordinates to [green]{path}") def save_umap(self, path: str) -> None: """Save UMAP, coordinates, and normalization information to a directory. Args: path (str, optional): Save UMAP and coordinates to this directory. Coordinates will be saved in this directory with the filename ``slidemap.parquet`` Model will be saved as umap.pkl (parametric) or model.pkl (parametric). """ if self.parametric_umap: self.umap.save(path) else: with open(join(path, 'umap.pkl'), 'wb') as f: pickle.dump(self.umap, f) log.info(f"Wrote UMAP coordinates to [green]{path}") self.save_coordinates(join(path, 'slidemap.parquet')) self.save_range_clip(path) def save_encoder(self, path: str) -> None: """Save Parametric UMAP encoder only.""" if not self.parametric_umap: raise ValueError("SlideMap not built with Parametric UMAP.") self.umap.encoder.save(join(path, 'encoder')) self.save_coordinates(join(path, 'slidemap.parquet')) self.save_range_clip(path) def save_range_clip(self, dest: str) -> None: """Save range/clip information. If ZIP saving is enabled, will save to range_clip.npz, with the attributes ``"range"`` and ``"clip"``. If ZIP saving is disabled (SF_ALLOW_ZIP=0, for databricks compatibility), will save these attributes to range.npy and clip.npy, separately. Args: dest (str): Destination directory. """ if sf.util.zip_allowed(): np.savez( dest + 'range_clip.npz', range=self._umap_normalized_range, clip=self._umap_normalized_clip ) else: np.save(dest + 'range.npy', self._umap_normalized_range) np.save(dest + 'clip.npy', self._umap_normalized_clip) def load_range_clip(self, path: str) -> None: """Load a saved range_clip.npz file for normalizing raw UMAP output. Args: path (str): Path to numpy file (\*.npz) with 'clip' and 'range' keys as generated from ``SlideMap.save()``. """ rc_path, r_path, c_path = None, None, None if exists(path) and path.endswith('.npz'): rc_path = path elif exists(join(path, 'range_clip.npz')): rc_path = join(path, 'range_clip.npz') elif exists(join(path, 'range.npy')) and exists(join(path, 'clip.npy')): r_path = join(path, 'range.npy') c_path = join(path, 'clip.npy') else: raise FileNotFoundError( f"Unable to find range/clip information at {path}." ) if rc_path: loaded = np.load(path) if not ('range' in loaded and 'clip' in loaded): raise ValueError(f"Unable to load {path}; did not find values " "'range' and 'clip'.") self._umap_normalized_clip = loaded['clip'] self._umap_normalized_range = loaded['range'] else: self._umap_normalized_clip = np.load(c_path) self._umap_normalized_range = np.load(r_path) log.info("Loaded range={}, clip={}".format( self._umap_normalized_range, self._umap_normalized_clip )) def load_umap(self, path: str) -> "umap.UMAP": """Load only a UMAP model and not slide coordinates or range_clip.npz. Args: path (str): Path to either umap.pkl or directory with saved parametric UMAP. """ log.debug(f"Loading UMAP at {path}") if self.parametric_umap: from umap.parametric_umap import load_ParametricUMAP self.umap = load_ParametricUMAP(path) else: with open(path, 'rb') as f: self.umap = pickle.load(f) log.info(f"Loaded UMAP from [green]{path}") def load_coordinates(self, path: str) -> None: """Load coordinates from parquet file. Args: path (str, optional): Path to parquet file (.parquet) with SlideMap coordinates. """ log.debug(f"Loading coordinates at {path}") self.data = pd.read_parquet(path) log.info(f"Loaded coordinates from [green]{path}")