# Shortcuts
#
# Source code for slideflow.project

"""Module for the ``Project`` class and its associated functions.

The ``Project`` class supervises data organization and provides a high-level
API for common functionality, such as tile extraction from whole
slide images, model training and evaluation, feature calculation, and
heatmap generation.
"""
import re
import copy
import csv
import itertools
import requests
import shutil
import json
import multiprocessing
import numpy as np
import os
import pickle
import pandas as pd
import tarfile
import warnings
from tqdm import tqdm
from os.path import basename, exists, join, isdir, dirname
from multiprocessing.managers import DictProxy
from contextlib import contextmanager
from statistics import mean
from types import SimpleNamespace
from typing import (TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple,
                    Union)

import slideflow as sf
from . import errors, project_utils
from .util import log, path_to_name, path_to_ext
from .dataset import Dataset
from .model import ModelParams
from .project_utils import (  # noqa: F401
    auto_dataset, auto_dataset_allow_none, get_validation_settings,
    get_first_nested_directory, get_matching_directory, BreastER, ThyroidBRS,
    LungAdenoSquam
)

if TYPE_CHECKING:
    from slideflow.model import DatasetFeatures, Trainer, BaseFeatureExtractor
    from slideflow.slide import SlideReport
    from slideflow import simclr, mil
    from ConfigSpace import ConfigurationSpace, Configuration
    from smac.facade.smac_bb_facade import SMAC4BB  # noqa: F401


[docs]class Project: """Assists with project organization and execution of common tasks.""" def __init__( self, root: str, use_neptune: bool = False, create: bool = False, **kwargs ) -> None: """Load or create a project at a given directory. If a project does not exist at the given root directory, one can be created if a project configuration was provided via keyword arguments. *Create a project:* .. code-block:: python import slideflow as sf P = sf.Project('/project/path', name=..., ...) *Load an existing project:* .. code-block:: python P = sf.Project('/project/path') Args: root (str): Path to project directory. Keyword Args: name (str): Project name. Defaults to 'MyProject'. annotations (str): Path to annotations CSV file. Defaults to './annotations.csv' dataset_config (str): Path to dataset configuration JSON file. Defaults to './datasets.json'. sources (list(str)): List of dataset sources to include in project. Defaults to 'source1'. models_dir (str): Path to directory in which to save models. Defaults to './models'. eval_dir (str): Path to directory in which to save evaluations. Defaults to './eval'. Raises: slideflow.errors.ProjectError: if project folder does not exist, or the folder exists but kwargs are provided. """ self.root = root if sf.util.is_project(root) and kwargs: raise errors.ProjectError(f"Project already exists at {root}") elif sf.util.is_project(root): self._load(root) elif create: log.info(f"Creating project at {root}...") if not exists(root): os.makedirs(root) self._settings = project_utils._project_config(root, **kwargs) self.save() else: raise errors.ProjectError( f"Project not found at {root}. 
Create a project using " "slideflow.Project(..., create=True), or with " "slideflow.create_project(...)" ) # Create directories, if not already made if not exists(self.models_dir): os.makedirs(self.models_dir) if not exists(self.eval_dir): os.makedirs(self.eval_dir) # Create blank annotations file if one does not exist if not exists(self.annotations) and exists(self.dataset_config): self.create_blank_annotations() # Neptune self.use_neptune = use_neptune @classmethod def from_prompt(cls, root: str, **kwargs: Any) -> "Project": """Initialize a project using an interactive prompt. Creates a project folder and then prompts the user for project settings, saving to "settings.json" in project directory. Args: root (str): Path to project directory. """ if not sf.util.is_project(root): log.info(f'Setting up new project at "{root}"') project_utils.interactive_project_setup(root) obj = cls(root, **kwargs) return obj def __repr__(self): # noqa D105 if self.use_neptune: tail = ", use_neptune={!r}".format(self.use_neptune) else: tail = '' return "Project(root={!r}{})".format(self.root, tail) @property def verbosity(self) -> int: """Current logging verbosity level.""" return sf.getLoggingLevel() @property def annotations(self) -> str: """Path to annotations file.""" return self._read_relative_path(self._settings['annotations']) @annotations.setter def annotations(self, val: str) -> None: if not isinstance(val, str): raise errors.ProjectError("'annotations' must be a path.") self._settings['annotations'] = val @property def dataset_config(self) -> str: """Path to dataset configuration JSON file.""" return self._read_relative_path(self._settings['dataset_config']) @dataset_config.setter def dataset_config(self, val: str) -> None: if not isinstance(val, str): raise errors.ProjectError("'dataset_config' must be path to JSON.") self._settings['dataset_config'] = val @property def eval_dir(self) -> str: """Path to evaluation directory.""" if 'eval_dir' not in self._settings: 
log.debug("Missing eval_dir in project settings, Assuming ./eval") return self._read_relative_path('./eval') else: return self._read_relative_path(self._settings['eval_dir']) @eval_dir.setter def eval_dir(self, val: str) -> None: if not isinstance(val, str): raise errors.ProjectError("'eval_dir' must be a path") self._settings['eval_dir'] = val @property def models_dir(self) -> str: """Path to models directory.""" return self._read_relative_path(self._settings['models_dir']) @models_dir.setter def models_dir(self, val: str) -> None: if not isinstance(val, str): raise errors.ProjectError("'models_dir' must be a path") self._settings['models_dir'] = val @property def name(self) -> str: """Descriptive project name.""" return self._settings['name'] @name.setter def name(self, val: str) -> None: if not isinstance(val, str): raise errors.ProjectError("'name' must be a str") self._settings['name'] = val @property def neptune_workspace(self) -> Optional[str]: """Neptune workspace name.""" if 'neptune_workspace' in self._settings: return self._settings['neptune_workspace'] elif 'NEPTUNE_WORKSPACE' in os.environ: return os.environ['NEPTUNE_WORKSPACE'] else: return None @neptune_workspace.setter def neptune_workspace(self, name: str) -> None: """Neptune workspace name.""" if not isinstance(name, str): raise errors.ProjectError('Neptune workspace must be a string.') self._settings['neptune_workspace'] = name @property def neptune_api(self) -> Optional[str]: """Neptune API token.""" if 'neptune_api' in self._settings: return self._settings['neptune_api'] elif 'NEPTUNE_API_TOKEN' in os.environ: return os.environ['NEPTUNE_API_TOKEN'] else: return None @neptune_api.setter def neptune_api(self, api_token: str) -> None: """Neptune API token.""" if not isinstance(api_token, str): raise errors.ProjectError('API token must be a string.') self._settings['neptune_api'] = api_token @property def sources(self) -> List[str]: """List of dataset sources active in this project.""" if 'sources' 
in self._settings: return self._settings['sources'] elif 'datasets' in self._settings: log.debug("'sources' misnamed 'datasets' in project settings.") return self._settings['datasets'] else: raise ValueError('Unable to find project dataset sources') @sources.setter def sources(self, v: List[str]) -> None: if not isinstance(v, list) or any([not isinstance(v, str) for v in v]): raise errors.ProjectError("'sources' must be a list of str") self._settings['sources'] = v def _load(self, path: str) -> None: """Load a saved and pre-configured project from the specified path.""" if sf.util.is_project(path): self._settings = sf.util.load_json(join(path, 'settings.json')) else: raise errors.ProjectError('Unable to find settings.json.') @contextmanager def _set_eval_dir(self, path: str): _initial = self.eval_dir self.eval_dir = path try: yield finally: self.eval_dir = _initial @contextmanager def _set_models_dir(self, path: str): _initial = self.models_dir self.models_dir = path try: yield finally: self.models_dir = _initial def _read_relative_path(self, path: str) -> str: """Convert relative path within project directory to global path.""" return sf.util.relative_path(path, self.root) def _setup_labels( self, dataset: Dataset, hp: ModelParams, outcomes: List[str], config: Dict, splits: str, eval_k_fold: Optional[int] = None ) -> Tuple[Dataset, Dict, Union[Dict, List]]: """Prepare dataset and labels.""" # Assign labels into int conf_labels = config['outcome_labels'] if hp.model_type() == 'classification': if len(outcomes) == 1 and outcomes[0] not in conf_labels: outcome_label_to_int = { outcomes[0]: { v: int(k) for k, v in conf_labels.items() } } else: outcome_label_to_int = { o: { v: int(k) for k, v in conf_labels[o].items() } for o in conf_labels } else: outcome_label_to_int = None # Get patient-level labels use_float = (hp.model_type() in ['regression', 'survival']) labels, unique = dataset.labels( outcomes, use_float=use_float, assign=outcome_label_to_int ) # Prepare 
labels for validation splitting if hp.model_type() == 'classification' and len(outcomes) > 1: def process_label(v): return '-'.join(map(str, v)) if isinstance(v, list) else v split_labels = {k: process_label(v) for k, v in labels.items()} else: split_labels = labels # If using a specific k-fold, load validation plan if eval_k_fold: log.info(f"Using k-fold iteration {eval_k_fold}") _, eval_dts = dataset.split( hp.model_type(), split_labels, val_strategy=config['validation_strategy'], splits=join(self.root, splits), val_fraction=config['validation_fraction'], val_k_fold=config['validation_k_fold'], k_fold_iter=eval_k_fold ) return eval_dts, labels, unique # Otherwise use all TFRecords else: return dataset, labels, unique def _prepare_trainer( self, model: str, dataset: Dataset, outcomes: Optional[Union[str, List[str]]] = None, checkpoint: Optional[str] = None, eval_k_fold: Optional[int] = None, splits: str = "splits.json", max_tiles: int = 0, mixed_precision: bool = True, allow_tf32: bool = False, input_header: Optional[Union[str, List[str]]] = None, load_method: str = 'weights', custom_objects: Optional[Dict[str, Any]] = None, ) -> Tuple["Trainer", Dataset]: """Prepare a :class:`slideflow.model.Trainer` for eval or prediction. Args: model (str): Path to model to evaluate. dataset (:class:`slideflow.Dataset`): Dataset from which to generate activations. outcomes (str): Str or list of str. Annotation column header specifying the outcome label(s). checkpoint (str, optional): Path to cp.ckpt file, if evaluating saved checkpoint. Defaults to None. eval_k_fold (int, optional): K-fold iteration number to evaluate. Defaults to None. If None, evaluate all tfrecords. splits (str, optional): Filename of JSON file in which to log training/validation splits. Looks for filename in project root. Defaults to "splits.json". max_tiles (int, optional): Maximum number of tiles from each slide to evaluate. Defaults to 0 (include all tiles). 
mixed_precision (bool, optional): Enable mixed precision. Defaults to True. allow_tf32 (bool): Allow internal use of Tensorfloat-32 format. Defaults to False. input_header (str, optional): Annotation column header to use as additional input. Defaults to None. load_method (str): Either 'full' or 'weights'. Method to use when loading a Tensorflow model. If 'full', loads the model with ``tf.keras.models.load_model()``. If 'weights', will read the ``params.json`` configuration file, build the model architecture, and then load weights from the given model with ``Model.load_weights()``. Loading with 'full' may improve compatibility across Slideflow versions. Loading with 'weights' may improve compatibility across hardware & environments. custom_objects (dict, Optional): Dictionary mapping names (strings) to custom classes or functions. Defaults to None. Returns: A tuple containing :class:`slideflow.model.Trainer`: Trainer. :class:`slideflow.Dataset`: Evaluation dataset. """ if eval_k_fold is not None and outcomes is None: raise ValueError('`eval_k_fold` invalid when predicting.') # Load hyperparameters from saved model config = sf.util.get_model_config(model) hp = ModelParams() hp.load_dict(config['hp']) model_name = f"eval-{basename(model)}" # If not provided, detect outcomes from model config predicting = (outcomes is None) if predicting: outcomes = config['outcomes'] assert outcomes is not None outcomes = sf.util.as_list(outcomes) # Filter out slides that are blank in the outcome label, # or blank in any of the input_header categories filter_blank = [o for o in outcomes] if input_header is not None and not isinstance(input_header, list): input_header = [input_header] if input_header is not None: filter_blank += input_header # Set up outcome labels if not predicting: dataset = dataset.filter(filter_blank=filter_blank) eval_dts, labels, unique = self._setup_labels( dataset, hp, outcomes, config, splits, eval_k_fold=eval_k_fold ) else: eval_dts = dataset if sf.backend() 
== 'torch': labels = config['outcome_labels'] else: labels = {} unique = list(config['outcome_labels'].values()) # Set max tiles eval_dts = eval_dts.clip(max_tiles) # Prepare additional slide-level input if input_header: _res = project_utils._setup_input_labels(eval_dts, input_header) inpt_labels, feature_sizes, slide_inp = _res else: inpt_labels = None feature_sizes = None slide_inp = {} n_feat = 0 if feature_sizes is None else sum(feature_sizes) if feature_sizes and n_feat != sum(config['input_feature_sizes']): n_model_feat = sum(config['input_feature_sizes']) raise ValueError( f'Patient feature matrix (size {n_feat}) ' f'is different from model (size {n_model_feat}).' ) # Log model settings and hyperparameters if hp.model_type() == 'classification': outcome_labels = dict(zip(range(len(unique)), unique)) else: outcome_labels = None model_dir = sf.util.get_new_model_dir(self.eval_dir, model_name) # Set missing validation keys to NA for v_end in ('strategy', 'fraction', 'k_fold'): val_key = f'validation_{v_end}' if val_key not in config: config[val_key] = 'NA' eval_config = { 'slideflow_version': sf.__version__, 'project': self.name, 'backend': sf.backend(), 'git_commit': sf.__gitcommit__, 'model_name': model_name, 'model_path': model, 'stage': 'evaluation', 'img_format': config['img_format'], 'tile_px': hp.tile_px, 'tile_um': hp.tile_um, 'model_type': hp.model_type(), 'outcomes': outcomes, 'input_features': input_header, 'input_feature_sizes': feature_sizes, 'input_feature_labels': inpt_labels, 'outcome_labels': outcome_labels, 'dataset_config': self.dataset_config, 'sources': self.sources, 'annotations': self.annotations, 'validation_strategy': config['validation_strategy'], 'validation_fraction': config['validation_fraction'], 'validation_k_fold': config['validation_k_fold'], 'k_fold_i': eval_k_fold, 'filters': dataset.filters, 'pretrain': None, 'resume_training': None, 'checkpoint': checkpoint, 'hp': hp.to_dict(), 'max_tiles': max_tiles, 'min_tiles': 
dataset.min_tiles, } if 'norm_fit' in config: eval_config.update({'norm_fit': config['norm_fit']}) # Build a model using the slide list as input # and the annotations dictionary as output labels trainer = sf.model.build_trainer( hp, outdir=model_dir, labels=labels, config=eval_config, slide_input=slide_inp, mixed_precision=mixed_precision, allow_tf32=allow_tf32, feature_names=input_header, feature_sizes=feature_sizes, outcome_names=outcomes, use_neptune=self.use_neptune, neptune_api=self.neptune_api, neptune_workspace=self.neptune_workspace, load_method=load_method, custom_objects=custom_objects, ) return trainer, eval_dts def _train_hp( self, *, hp_name: str, hp: ModelParams, outcomes: List[str], val_settings: SimpleNamespace, ctx: multiprocessing.context.BaseContext, dataset: Optional[sf.Dataset], filters: Optional[Dict], filter_blank: Optional[Union[str, List[str]]], input_header: Optional[Union[str, List[str]]], min_tiles: int, max_tiles: int, mixed_precision: bool, allow_tf32: bool, splits: str, results_dict: Union[Dict, DictProxy], training_kwargs: Dict, balance_headers: Optional[Union[str, List[str]]], process_isolate: bool = False, **kwargs ) -> None: """Train a model(s) using the specified hyperparameters. Keyword Args: hp_name (str): Name of hyperparameter combination being run. hp (:class:`slideflow.ModelParams`): Model parameters. outcomes (str or list(str)): Annotation outcome headers. val_settings (:class:`types.SimpleNamspace`): Validation settings. ctx (multiprocessing.Context): Multiprocessing context for sharing results from isolated training processes. filters (dict, optional): Dataset filters to use for selecting slides. See :meth:`slideflow.Dataset.filter` for more information. Defaults to None. filter_blank (list(str) or str, optional): Skip slides that have blank values in these patient annotation columns. Defaults to None. input_header (str or list(str)): Annotation col of additional slide-level input. 
min_tiles (int): Only includes tfrecords with >= min_tiles max_tiles (int): Cap maximum tiles per tfrecord. mixed_precision (bool): Train with mixed precision. allow_tf32 (bool): Allow internal use of Tensorfloat-32 format. Defaults to False. splits (str): Location of splits file for logging/reading splits. balance_headers (str, list(str)): Annotation col headers for mini-batch balancing. results_dict (dict): Multiprocessing-friendly dict for sending results from isolated training processes training_kwargs (dict): Keyword arguments for Trainer.train(). """ # --- Prepare dataset --------------------------------------------- # Filter out slides that are blank in the outcome label, # or blank in any of the input_header categories if filter_blank is not None and not isinstance(filter_blank, list): filter_blank = [filter_blank] if filter_blank: filter_blank += [o for o in outcomes] else: filter_blank = [o for o in outcomes] if input_header is not None and not isinstance(input_header, list): input_header = [input_header] if input_header is not None: filter_blank += input_header if dataset is None: dataset = self.dataset(hp.tile_px, hp.tile_um) else: _compatible = sf.util.is_tile_size_compatible( dataset.tile_px, dataset.tile_um, hp.tile_px, hp.tile_um ) if not _compatible: raise errors.IncompatibleTileSizeError( "Dataset tile size (px={}, um={}) does not match provided " "hyperparameters (px={}, um={})".format( dataset.tile_px, dataset.tile_um, hp.tile_px, hp.tile_um ) ) dataset = dataset.filter( filters=filters, filter_blank=filter_blank, min_tiles=min_tiles ) # --- Load labels ------------------------------------------------- use_float = (hp.model_type() in ['regression', 'survival']) labels, unique = dataset.labels(outcomes, use_float=use_float) if hp.model_type() == 'classification' and len(outcomes) == 1: outcome_labels = dict(zip(range(len(unique)), unique)) elif hp.model_type() == 'classification': assert isinstance(unique, dict) outcome_labels = { k: 
dict(zip(range(len(ul)), ul)) # type: ignore for k, ul in unique.items() } else: outcome_labels = dict(zip(range(len(outcomes)), outcomes)) if hp.model_type() != 'regression' and len(outcomes) > 1: log.info('Using multi-outcome approach for classification') # If multiple classification outcomes are used, # create a merged variable for k-fold splitting if hp.model_type() == 'classification' and len(outcomes) > 1: split_labels = { k: '-'.join(map(str, v)) # type: ignore for k, v in labels.items() } else: split_labels = labels # type: ignore # --- Prepare k-fold validation configuration --------------------- results_log_path = os.path.join(self.root, 'results_log.csv') k_header = val_settings.k_fold_header if val_settings.k is not None and not isinstance(val_settings.k, list): val_settings.k = [val_settings.k] if val_settings.strategy == 'k-fold-manual': _, unique_k = dataset.labels(k_header, format='name') valid_k = [kf for kf in unique_k] k_fold = len(valid_k) log.info(f"Manual folds: {', '.join([str(ks) for ks in valid_k])}") if val_settings.k: valid_k = [kf for kf in valid_k if kf in val_settings.k] elif val_settings.strategy in ('k-fold', 'k-fold-preserved-site', 'bootstrap'): k_fold = val_settings.k_fold if val_settings.k is None: valid_k = list(range(1, k_fold+1)) else: valid_k = [ kf for kf in range(1, k_fold+1) if kf in val_settings.k ] else: k_fold = None valid_k = [None] # type: ignore # Create model labels label_string = '-'.join(outcomes) model_name = f'{label_string}-{hp_name}' if k_fold is None: model_iterations = [model_name] else: model_iterations = [f'{model_name}-kfold{k}' for k in valid_k] s_args = SimpleNamespace( model_name=model_name, outcomes=outcomes, k_header=k_header, valid_k=valid_k, split_labels=split_labels, splits=splits, labels=labels, min_tiles=min_tiles, max_tiles=max_tiles, outcome_labels=outcome_labels, filters=filters, training_kwargs=training_kwargs, mixed_precision=mixed_precision, allow_tf32=allow_tf32, ctx=ctx, 
results_dict=results_dict, bal_headers=balance_headers, input_header=input_header, process_isolate=process_isolate, **kwargs ) # --- Train on a specific K-fold -------------------------------------- for k in valid_k: s_args.k = k self._train_split(dataset, hp, val_settings, s_args) # --- Record results -------------------------------------------------- if (not val_settings.source and (val_settings.strategy is None or val_settings.strategy == 'none')): log.info('No validation performed.') else: for mi in model_iterations: if mi not in results_dict or 'epochs' not in results_dict[mi]: log.error(f'Training failed for model {model_name}') else: sf.util.update_results_log( results_log_path, mi, results_dict[mi]['epochs'] ) log.info(f'Training results saved: [green]{results_log_path}') def _train_split( self, dataset: Dataset, hp: ModelParams, val_settings: SimpleNamespace, s_args: SimpleNamespace, ) -> None: """Train a model for a given training/validation split. Args: dataset (:class:`slideflow.Dataset`): Dataset to split into training and validation. hp (:class:`slideflow.ModelParams`): Model parameters. val_settings (:class:`types.SimpleNamspace`): Validation settings. s_args (:class:`types.SimpleNamspace`): Training settings. 
""" # Log current model name and k-fold iteration, if applicable k_msg = '' if s_args.k is not None: k_msg = f' ({val_settings.strategy} #{s_args.k})' if sf.getLoggingLevel() <= 20: print() log.info(f'Training model [bold]{s_args.model_name}[/]{k_msg}...') log.info(f'Hyperparameters: {hp}') if val_settings.dataset: log.info('Val settings: <Dataset manually provided>') else: log.info( f'Val settings: {json.dumps(vars(val_settings), indent=2)}' ) # --- Set up validation data ------------------------------------------ from_wsi = ('from_wsi' in s_args.training_kwargs and s_args.training_kwargs['from_wsi']) # Use an external validation dataset if supplied if val_settings.dataset: train_dts = dataset val_dts = val_settings.dataset is_float = (hp.model_type() in ['regression', 'survival']) val_labels, _ = val_dts.labels(s_args.outcomes, use_float=is_float) s_args.labels.update(val_labels) elif val_settings.source: train_dts = dataset val_dts = Dataset( tile_px=hp.tile_px, tile_um=hp.tile_um, config=self.dataset_config, sources=val_settings.source, annotations=val_settings.annotations, filters=val_settings.filters, filter_blank=val_settings.filter_blank ) is_float = (hp.model_type() in ['regression', 'survival']) val_labels, _ = val_dts.labels(s_args.outcomes, use_float=is_float) s_args.labels.update(val_labels) # Use manual k-fold assignments if indicated elif val_settings.strategy == 'k-fold-manual': t_filters = { s_args.k_header: [j for j in s_args.valid_k if j != s_args.k] } train_dts = dataset.filter(t_filters) val_dts = dataset.filter(filters={s_args.k_header: [s_args.k]}) # No validation elif val_settings.strategy == 'none': train_dts = dataset val_dts = None # Otherwise, calculate k-fold splits else: if val_settings.strategy == 'k-fold-preserved-site': site_labels = dataset.labels( s_args.k_header, format='name' )[0] # type: Any else: site_labels = None train_dts, val_dts = dataset.split( hp.model_type(), s_args.split_labels, val_strategy=val_settings.strategy, 
splits=join(self.root, s_args.splits), val_fraction=val_settings.fraction, val_k_fold=val_settings.k_fold, k_fold_iter=s_args.k, site_labels=site_labels, from_wsi=from_wsi ) # ---- Balance datasets -------------------------------------- # Training if s_args.bal_headers is None: s_args.bal_headers = s_args.outcomes if train_dts.prob_weights and hp.training_balance not in ('none', None): log.warning( "Training dataset already balanced; ignoring hyperparameter " "training_balance={!r}".format(hp.training_balance) ) elif not from_wsi: train_dts = train_dts.balance( s_args.bal_headers, hp.training_balance, force=(hp.model_type() == 'classification') ) elif from_wsi and hp.training_balance not in ('none', None): log.warning( "Balancing / clipping is disabled when `from_wsi=True`" ) # Validation if val_dts and val_dts.prob_weights and hp.validation_balance not in ( 'none', None ): log.warning( "Validation dataset already balanced; ignoring hyperparameter " "validation_balance={!r}".format(hp.validation_balance) ) elif val_dts and not from_wsi: val_dts = val_dts.balance( s_args.bal_headers, hp.validation_balance, force=(hp.model_type() == 'classification') ) elif val_dts and from_wsi and hp.validation_balance not in ( 'none', None ): log.warning( "Balancing / clipping is disabled when `from_wsi=True`" ) # ---- Clip datasets ----------------------------------------- # Training if s_args.max_tiles and train_dts._clip: log.warning( "Training dataset already clipped; ignoring parameter " "max_tiles={!r}".format(s_args.max_tiles) ) elif s_args.max_tiles and not from_wsi: train_dts = train_dts.clip(s_args.max_tiles) elif s_args.max_tiles and from_wsi: log.warning( "Clipping is disabled when `from_wsi=True`" ) # Validation if val_dts and s_args.max_tiles and val_dts._clip: log.warning( "Validation dataset already clipped; ignoring parameter " "max_tiles={!r}".format(s_args.max_tiles) ) elif s_args.max_tiles and val_dts and not from_wsi: val_dts = val_dts.clip(s_args.max_tiles) 
elif s_args.max_tiles and val_dts and from_wsi: log.warning( "Clipping is disabled when `from_wsi=True`" ) # ---- Determine tile counts --------------------------------------- if from_wsi: num_train = len(train_dts.slide_paths()) num_val = 0 if not val_dts else len(val_dts.slide_paths()) log.info( f'Using {num_train} training slides, {num_val} validation' ) else: num_train = len(train_dts.tfrecords()) num_val = 0 if not val_dts else len(val_dts.tfrecords()) log.info( f'Using {num_train} training TFRecords, {num_val} validation' ) # --- Prepare additional slide-level input ---------------------------- if s_args.input_header: _res = project_utils._setup_input_labels( dataset, s_args.input_header, val_dts=val_dts ) inpt_labels, feature_sizes, slide_inp = _res else: inpt_labels = None feature_sizes = None slide_inp = None # --- Initialize model ------------------------------------------------ # Using the project annotation file, assemble slides for training, # as well as the slide annotations dictionary (output labels) full_name = s_args.model_name if s_args.k is not None: full_name += f'-kfold{s_args.k}' model_dir = sf.util.get_new_model_dir(self.models_dir, full_name) # Log model settings and hyperparameters config = { 'slideflow_version': sf.__version__, 'project': self.name, 'backend': sf.backend(), 'git_commit': sf.__gitcommit__, 'model_name': s_args.model_name, 'full_model_name': full_name, 'stage': 'training', 'img_format': train_dts.img_format, 'tile_px': hp.tile_px, 'tile_um': hp.tile_um, 'max_tiles': s_args.max_tiles, 'min_tiles': s_args.min_tiles, 'model_type': hp.model_type(), 'outcomes': s_args.outcomes, 'input_features': s_args.input_header, 'input_feature_sizes': feature_sizes, 'input_feature_labels': inpt_labels, 'outcome_labels': s_args.outcome_labels, 'dataset_config': self.dataset_config, 'sources': self.sources, 'annotations': self.annotations, 'validation_strategy': val_settings.strategy, 'validation_fraction': val_settings.fraction, 
'validation_k_fold': val_settings.k_fold, 'k_fold_i': s_args.k, 'filters': s_args.filters, 'hp': hp.to_dict(), 'training_kwargs': s_args.training_kwargs, } model_kwargs = { 'hp': hp, 'name': full_name, 'feature_names': s_args.input_header, 'feature_sizes': feature_sizes, 'outcome_names': s_args.outcomes, 'outdir': model_dir, 'config': config, 'slide_input': slide_inp, 'labels': s_args.labels, 'mixed_precision': s_args.mixed_precision, 'allow_tf32': s_args.allow_tf32, 'use_neptune': self.use_neptune, 'neptune_api': self.neptune_api, 'neptune_workspace': self.neptune_workspace, 'load_method': s_args.load_method } if s_args.process_isolate: process = s_args.ctx.Process(target=project_utils._train_worker, args=((train_dts, val_dts), model_kwargs, s_args.training_kwargs, s_args.results_dict, self.verbosity)) process.start() log.debug(f'Spawning training process (PID: {process.pid})') process.join() else: project_utils._train_worker( (train_dts, val_dts), model_kwargs, s_args.training_kwargs, s_args.results_dict, self.verbosity ) def add_source( self, name: str, *, slides: Optional[str] = None, roi: Optional[str] = None, tiles: Optional[str] = None, tfrecords: Optional[str] = None, path: Optional[str] = None ) -> None: r"""Add a dataset source to the dataset configuration file. Args: name (str): Dataset source name. Keyword Args: slides (str, optional): Path to directory containing slides. Defaults to None. roi (str, optional): Path to directory containing CSV ROIs. Defaults to None. tiles (str, optional): Path to directory for loose extracted tiles images (\*.jpg, \*.png). Defaults to None. tfrecords (str, optional): Path to directory for storing TFRecords of tiles. Defaults to None. path (str, optional): Path to dataset configuration file. If not provided, uses project default. Defaults to None. 
""" if not path: path = self.dataset_config project_utils.add_source( name, path=path, slides=slides, roi=(roi or join(self._read_relative_path('./roi'), name)), tiles=tiles, tfrecords=(tfrecords or join(self._read_relative_path('./tfrecords'), name)), ) if name not in self.sources: self.sources += [name] self.save() def associate_slide_names(self) -> None: """Automatically associate patients with slides in the annotations.""" dataset = self.dataset(tile_px=0, tile_um=0, verification=None) dataset.update_annotations_with_slidenames(self.annotations) def cell_segmentation( self, diam_um: float, dest: Optional[str] = None, *, filters: Optional[Dict] = None, filter_blank: Optional[Union[str, List[str]]] = None, sources: Union[str, List[str]], **kwargs ) -> None: """Perform cell segmentation on slides, saving segmentation masks. Cells are segmented with `Cellpose <https://www.nature.com/articles/s41592-020-01018-x>`_ from whole-slide images, and segmentation masks are saved in the ``masks/`` subfolder within the project root directory. .. note:: Cell segmentation requires installation of the ``cellpose`` package available via pip: .. code-block:: bash pip install cellpose Args: diam_um (float, optional): Cell segmentation diameter, in microns. dest (str): Destination in which to save cell segmentation masks. If None, will save masks in ``{project_root}/masks`` Defaults to None. Keyword args: batch_size (int): Batch size for cell segmentation. Defaults to 8. cp_thresh (float): Cell probability threshold. All pixels with value above threshold kept for masks, decrease to find more and larger masks. Defaults to 0. diam_mean (int, optional): Cell diameter to detect, in pixels (without image resizing). If None, uses Cellpose defaults (17 for the 'nuclei' model, 30 for all others). downscale (float): Factor by which to downscale generated masks after calculation. Defaults to None (keep masks at original size). 
def create_blank_annotations(
    self,
    filename: Optional[str] = None
) -> None:
    """Create an empty annotations file.

    Writes a CSV with the header ``patient, dataset, category`` and one row
    per slide found in the project's dataset sources (filters are not
    applied). The slide name is written to the first column; the remaining
    columns are left blank.

    Args:
        filename (str): Annotations file destination. If not provided,
            will use project default (``self.annotations``).

    Raises:
        slideflow.errors.AnnotationsError: If the destination file already
            exists, or if the dataset configuration file is missing.

    """
    if filename is None:
        filename = self.annotations
    if exists(filename):
        raise errors.AnnotationsError(
            f"Error creating annotations {filename}; file already exists"
        )
    if not exists(self.dataset_config):
        raise errors.AnnotationsError(
            f"Dataset config {self.dataset_config} missing."
        )
    dataset = Dataset(
        config=self.dataset_config,
        sources=self.sources,
        tile_px=None,
        tile_um=None,
        annotations=None
    )
    all_paths = dataset.slide_paths(apply_filters=False)
    slides = [path_to_name(s) for s in all_paths]

    # The csv module requires files to be opened with newline=''; otherwise
    # the writer's own '\r\n' terminators are translated a second time on
    # platforms with newline translation, producing blank rows.
    with open(filename, 'w', newline='') as csv_outfile:
        csv_writer = csv.writer(csv_outfile, delimiter=',')
        csv_writer.writerow(['patient', 'dataset', 'category'])
        csv_writer.writerows([slide, '', ''] for slide in slides)
    log.info(f"Wrote annotations file to [green]{filename}")
@auto_dataset
def evaluate(
    self,
    model: str,
    outcomes: Union[str, List[str]],
    *,
    dataset: Dataset,
    filters: Optional[Dict] = None,
    filter_blank: Optional[Union[str, List[str]]] = None,
    min_tiles: int = 0,
    checkpoint: Optional[str] = None,
    eval_k_fold: Optional[int] = None,
    splits: str = "splits.json",
    max_tiles: int = 0,
    mixed_precision: bool = True,
    allow_tf32: bool = False,
    input_header: Optional[Union[str, List[str]]] = None,
    load_method: str = 'weights',
    custom_objects: Optional[Dict[str, Any]] = None,
    **kwargs: Any
) -> Dict:
    """Evaluate a saved model on a given set of tfrecords.

    A trainer and evaluation dataset are prepared with
    ``self._prepare_trainer()``, the model (and optionally a checkpoint)
    is loaded, and the result of ``trainer.evaluate()`` is returned.

    Args:
        model (str): Path to model to evaluate.
        outcomes (str): Str or list of str. Annotation column
            header specifying the outcome label(s).

    Keyword Args:
        dataset (:class:`slideflow.Dataset`, optional): Dataset
            to evaluate. If not supplied, will evaluate all project
            tfrecords at the tile_px/tile_um matching the supplied model,
            optionally using provided filters and filter_blank.
        filters (dict, optional): Dataset filters to use for
            selecting slides. See :meth:`slideflow.Dataset.filter` for
            more information. Defaults to None.
            NOTE(review): not read in this method body; presumably
            consumed by the ``@auto_dataset`` decorator - confirm.
        filter_blank (list(str) or str, optional): Skip slides that have
            blank values in these patient annotation columns.
            Defaults to None. (Same note as ``filters``.)
        min_tiles (int, optional): Minimum number of tiles a slide must
            have to be included in evaluation. Defaults to 0.
            (Same note as ``filters``.)
        checkpoint (str, optional): Path to cp.ckpt file, if evaluating a
            saved checkpoint. If given, the model is rebuilt from the
            trainer's hyperparameters and the checkpoint weights are
            loaded into it. Defaults to None.
        eval_k_fold (int, optional): K-fold iteration number to evaluate.
            Defaults to None. If None, will evaluate all tfrecords
            irrespective of K-fold.
        splits (str, optional): Filename of JSON file in which to log
            train/val splits. Looks for filename in project root directory.
            Defaults to "splits.json".
        max_tiles (int, optional): Maximum number of tiles from each slide
            to evaluate. Defaults to 0. If zero, will include all tiles.
        mixed_precision (bool, optional): Enable mixed precision.
            Defaults to True.
        allow_tf32 (bool): Allow internal use of Tensorfloat-32 format.
            Defaults to False.
        input_header (str, optional): Annotation column header to use as
            additional input. Defaults to None.
        load_method (str): Either 'full' or 'weights'. Method to use
            when loading a Tensorflow model. If 'full', loads the model
            with ``tf.keras.models.load_model()``. If 'weights', will read
            the ``params.json`` configuration file, build the model
            architecture, and then load weights from the given model with
            ``Model.load_weights()``. Loading with 'full' may improve
            compatibility across Slideflow versions. Loading with
            'weights' may improve compatibility across hardware &
            environments.
        reduce_method (str, optional): Reduction method for calculating
            slide-level and patient-level predictions for categorical
            outcomes. Options include 'average', 'mean', 'proportion',
            'median', 'sum', 'min', 'max', or a callable function.
            'average' and 'mean' are synonymous, with both options kept
            for backwards compatibility. If 'average' or 'mean', will
            reduce with average of each logit across tiles. If
            'proportion', will convert tile predictions into onehot
            encoding then reduce by averaging these onehot values. For all
            other values, will reduce with the specified function, applied
            via the pandas ``DataFrame.agg()`` function. Defaults to
            'average'.
        save_predictions (bool or str, optional): Save tile, slide, and
            patient-level predictions at each evaluation. May be 'csv',
            'feather', or 'parquet'. If False, will not save predictions.
            Defaults to 'parquet'.
        custom_objects (dict, Optional): Dictionary mapping names
            (strings) to custom classes or functions. Defaults to None.
        **kwargs: Additional keyword arguments to the
            `Trainer.evaluate()` function.

    Returns:
        Dict: Dictionary of keras training results, nested by epoch.

    """
    log.info(f'Evaluating model at [green]{model}')

    # Build the trainer and the dataset that will be evaluated.
    prepare_args = dict(
        model=model,
        dataset=dataset,
        outcomes=outcomes,
        checkpoint=checkpoint,
        eval_k_fold=eval_k_fold,
        splits=splits,
        max_tiles=max_tiles,
        input_header=input_header,
        mixed_precision=mixed_precision,
        allow_tf32=allow_tf32,
        load_method=load_method,
        custom_objects=custom_objects,
    )
    trainer, eval_dts = self._prepare_trainer(**prepare_args)

    # Load the saved model.
    if isinstance(model, str):
        trainer.load(model, training=True)

    # When a checkpoint is given, rebuild the architecture from the
    # hyperparameters and load the checkpoint weights into it.
    if checkpoint:
        slide_feature_sizes = trainer.feature_sizes
        n_features = sum(slide_feature_sizes) if slide_feature_sizes else 0
        trainer.model = trainer.hp.build_model(
            labels=trainer.labels,
            num_slide_features=n_features
        )
        trainer.model.load_weights(checkpoint)

    # Evaluate.
    return trainer.evaluate(eval_dts, **kwargs)
def extract_cells(
    self,
    tile_px: int,
    tile_um: Union[int, str],
    masks_path: Optional[str] = None,
    *,
    filters: Optional[Dict] = None,
    filter_blank: Optional[Union[str, List[str]]] = None,
    **kwargs: Any
) -> Dict[str, "SlideReport"]:
    """Extract images of cells from whole-slide images.

    Image tiles are extracted from cells, with a tile at each cell
    centroid. Requires that cells have already been segmented with
    ``Project.cell_segmentation()``. This function otherwise is similar
    to :meth:`slideflow.Project.extract_tiles`, with tiles saved in
    TFRecords by default.

    Args:
        tile_px (int): Size of tiles to extract at cell centroids (pixels).
        tile_um (int or str): Size of tiles to extract, in microns (int) or
            magnification (str, e.g. "20x").
        masks_path (str, optional): Location of saved masks. If None, will
            look in project default (subfolder '/masks').
            Defaults to None.

    Keyword Args:
        filters (dict, optional): Dataset filters used when building the
            project dataset. Defaults to None.
        filter_blank (list(str) or str, optional): Passed to the project
            dataset as ``filter_blank``. Defaults to None.
        apply_masks (bool): Apply cell segmentation masks to the extracted
            tiles. Defaults to True.
        **kwargs (Any): All other keyword arguments are forwarded to the
            dataset's ``extract_cells()`` method.

    Returns:
        Dictionary mapping slide paths to each slide's SlideReport
        (:class:`slideflow.slide.report.SlideReport`)

    """
    # Fall back to the project's default masks folder.
    masks_dir = join(self.root, 'masks') if masks_path is None else masks_path

    # Build a dataset verified against slides, then delegate extraction.
    dataset_args = dict(
        filters=filters,
        filter_blank=filter_blank,
        verification='slides'
    )
    slide_dataset = self.dataset(tile_px, tile_um, **dataset_args)
    return slide_dataset.extract_cells(masks_path=masks_dir, **kwargs)
source (str, optional): Name of dataset source from which to select slides for extraction. Defaults to None. If not provided, will default to all sources in project. stride_div (int): Stride divisor for tile extraction. A stride of 1 will extract non-overlapping tiles. A stride_div of 2 will extract overlapping tiles, with a stride equal to 50% of the tile width. Defaults to 1. enable_downsample (bool): Enable downsampling for slides. This may result in corrupted image tiles if downsampled slide layers are corrupted or incomplete. Defaults to True. roi_method (str): Either 'inside', 'outside', 'auto', or 'ignore'. Determines how ROIs are used to extract tiles. If 'inside' or 'outside', will extract tiles in/out of an ROI, and skip the slide if an ROI is not available. If 'auto', will extract tiles inside an ROI if available, and across the whole-slide if no ROI is found. If 'ignore', will extract tiles across the whole-slide regardless of whether an ROI is available. Defaults to 'auto'. roi_filter_method (str or float): Method of filtering tiles with ROIs. Either 'center' or float (0-1). If 'center', tiles are filtered with ROIs based on the center of the tile. If float, tiles are filtered based on the proportion of the tile inside the ROI, and ``roi_filter_method`` is interpreted as a threshold. If the proportion of a tile inside the ROI is greater than this number, the tile is included. For example, if ``roi_filter_method=0.7``, a tile that is 80% inside of an ROI will be included, and a tile that is 50% inside of an ROI will be excluded. Defaults to 'center'. skip_extracted (bool): Skip slides that have already been extracted. Defaults to True. tma (bool): Reads slides as Tumor Micro-Arrays (TMAs). Deprecated argument; all slides are now read as standard WSIs. randomize_origin (bool): Randomize pixel starting position during extraction. Defaults to False. buffer (str, optional): Slides will be copied to this directory before extraction. Defaults to None. 
Using an SSD or ramdisk buffer vastly improves tile extraction speed. q_size (int): Size of queue when using a buffer. Defaults to 2. qc (str, optional): 'otsu', 'blur', 'both', or None. Perform blur detection quality control - discarding tiles with detected out-of-focus regions or artifact - and/or otsu's method. Increases tile extraction time. Defaults to None. report (bool): Save a PDF report of tile extraction. Defaults to True. normalizer (str, optional): Normalization strategy. Defaults to None. normalizer_source (str, optional): Stain normalization preset or path to a source image. Valid presets include 'v1', 'v2', and 'v3'. If None, will use the default present ('v3'). Defaults to None. whitespace_fraction (float, optional): Range 0-1. Discard tiles with this fraction of whitespace. If 1, will not perform whitespace filtering. Defaults to 1. whitespace_threshold (int, optional): Range 0-255. Defaults to 230. Threshold above which a pixel (RGB average) is whitespace. grayspace_fraction (float, optional): Range 0-1. Defaults to 0.6. Discard tiles with this fraction of grayspace. If 1, will not perform grayspace filtering. grayspace_threshold (float, optional): Range 0-1. Defaults to 0.05. Pixels in HSV format with saturation below this threshold are considered grayspace. img_format (str, optional): 'png' or 'jpg'. Defaults to 'jpg'. Image format to use in tfrecords. PNG (lossless) for fidelity, JPG (lossy) for efficiency. shuffle (bool, optional): Shuffle tiles prior to storage in tfrecords. Defaults to True. num_threads (int, optional): Number of worker processes for each tile extractor. When using cuCIM slide reading backend, defaults to the total number of available CPU cores, using the 'fork' multiprocessing method. With Libvips, this defaults to the total number of available CPU cores or 32, whichever is lower, using 'spawn' multiprocessing. qc_blur_radius (int, optional): Quality control blur radius for out-of-focus area detection. Used if qc=True. 
def gan_train(
    self,
    dataset: "Dataset",
    *,
    model: str = 'stylegan3',
    outcomes: Optional[Union[str, List[str]]] = None,
    exp_label: Optional[str] = None,
    mirror: bool = True,
    metrics: Optional[Union[str, List[str]]] = None,
    dry_run: bool = False,
    normalizer: Optional[str] = None,
    normalizer_source: Optional[str] = None,
    tile_labels: Optional[str] = None,
    crop: Optional[int] = None,
    resize: Optional[int] = None,
    **kwargs
) -> None:
    """Train a GAN network.

    A new experiment directory is created under ``{project root}/gan``, a
    ``slideflow_config.json`` describing the dataset is written into it,
    and training is delegated to the selected StyleGAN submodule.

    Examples
        Train StyleGAN2 from a Slideflow dataset.

            >>> P = sf.Project('/project/path')
            >>> dataset = P.dataset(tile_px=512, tile_um=400)
            >>> P.gan_train(dataset=dataset, exp_label="MyExperiment", ...)

        Train StyleGAN2 as a class-conditional network.

            >>> P.gan_train(..., outcomes='class_label')

        Train using a pretrained network.

            >>> P.gan_train(..., resume='/path/to/network.pkl')

        Train with multiple GPUs.

            >>> P.gan_train(..., gpus=4)

    Args:
        dataset (:class:`slideflow.Dataset`): Training dataset.

    Keyword Args:
        allow_tf32 (bool): Allow internal use of TensorFloat-32.
            Option only available for StyleGAN2. Defaults to True.
        aug (str): Augmentation mode. Options include 'ada',
            'noaug', 'fixed'. Defaults to 'ada'.
        augpipe (str): Augmentation pipeline. Options include
            'blit', 'geom', 'color', 'filter', 'noise', 'cutout', 'bg',
            'bgc', 'bgcfnc'. Only available for StyleGAN2.
            Defaults to 'bgcfnc'.
        batch (int, optional): Override batch size set by `cfg`.
        cfg (str): StyleGAN2 base configuration. Options include
            'auto', 'stylegan2', 'paper256', 'paper512', 'paper1024', and
            'cifar'. Defaults to 'auto'.
        dry_run (bool): Set up training but do not execute.
            Defaults to False.
        exp_label (str, optional): Experiment label. If None, the label
            'gan_experiment' is used. Defaults to None.
        freezed (int): Freeze this many discriminator layers.
            Defaults to 0.
        fp32 (bool, optional): Disable mixed-precision training. Defaults
            to False.
        gamma (float, optional): Override R1 gamma from configuration
            (set with `cfg`).
        gpus (int): Number GPUs to train on in parallel. Defaults
            to 1.
        kimg (int): Override training duration in kimg (thousand
            images) set by `cfg`. Most configurations default to 25,000
            kimg (25 million images).
        lazy_resume (bool): Allow lazy loading from saved pretrained
            networks, for example to load a non-conditional network
            when training a conditional network. Defaults to False.
        mirror (bool): Randomly flip/rotate images during
            training. Defaults to True.
        metrics (str, list(str), optional): Metrics to calculate during
            training. Options include 'fid50k', 'is50k', 'ppl_zfull',
            'ppl_wfull', 'ppl_zend', 'ppl2_wend', 'ls', and 'pr50k3'.
            Defaults to None.
        model (str): Architecture to train. Valid model architectures
            include "stylegan2" and "stylegan3". Defaults to "stylegan3".
        nhwc (bool): Use NHWC memory format with FP16. Defaults
            to False.
        nobench (bool): Disable cuDNN benchmarking. Defaults to
            False.
        normalizer (str, optional): Normalization strategy. If set, it is
            written to the GAN configuration together with
            ``normalizer_source``. Defaults to None.
        normalizer_source (str, optional): Normalizer source, stored in
            the GAN configuration when ``normalizer`` is set.
            Defaults to None.
        outcomes (str, list(str), optional): Class conditioning outcome
            labels for training a class-conditioned GAN. If not provided,
            trains an unconditioned GAN. Defaults to None.
        tile_labels (str, optional): Path to pandas dataframe with
            tile-level labels. The dataframe should be indexed by tile
            name, where the name of the tile follows the format:
            [slide name]-[tile x coordinate]-[tile y coordinate], e.g.:
            ``slide1-251-666``. The dataframe should have a single column
            with the name 'label'. Labels can be categorical or
            continuous. If categorical, the labels should be onehot
            encoded.
        crop (int, optional): Randomly crop images to this target size
            during training. This permits training a smaller network
            (e.g. 256 x 256) on larger images (e.g. 299 x 299).
            Defaults to None.
        resize (int, optional): Resize images to this target size
            during training. This permits training a smaller network
            (e.g. 256 x 256) on larger images (e.g. 299 x 299).
            If both ``crop`` and ``resize`` are provided, cropping
            will be performed first. Defaults to None.
        resume (str): Load previous network. Options include
            'noresume' , 'ffhq256', 'ffhq512', 'ffhq1024', 'celebahq256',
            'lsundog256', <file>, or <url>. Defaults to 'noresume'.
        snap (int): Snapshot interval for saving network and
            example images. Defaults to 50 ticks.

    Raises:
        ValueError: If ``model`` is not a supported architecture.
        ImportError: If the StyleGAN submodule cannot be imported.

    """
    # Validate the method and import the appropriate submodule
    supported_models = ('stylegan2', 'stylegan3')
    if model not in supported_models:
        raise ValueError(f"Unknown method '{model}'. Valid methods "
                         f"include: {', '.join(supported_models)}")
    try:
        if model == 'stylegan2':
            from slideflow.gan.stylegan2 import stylegan2 as network
        elif model == 'stylegan3':
            from slideflow.gan.stylegan3 import stylegan3 as network  # type: ignore
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "StyleGAN functions require 'slideflow-noncommercial'. "
            "Please install with 'pip install slideflow-noncommercial'"
        ) from err

    if metrics is not None:
        # Logger.warn() is a deprecated alias of Logger.warning().
        log.warning(
            "StyleGAN2 metrics are not fully implemented for Slideflow."
        )

    # Setup directories (exist_ok avoids a check-then-create race)
    gan_root = join(self.root, 'gan')
    os.makedirs(gan_root, exist_ok=True)
    if exp_label is None:
        exp_label = 'gan_experiment'
    gan_dir = sf.util.get_new_model_dir(gan_root, exp_label)

    # Write GAN configuration
    config_loc = join(gan_dir, 'slideflow_config.json')
    config = dict(
        project_path=self.root,
        tile_px=dataset.tile_px,
        tile_um=dataset.tile_um,
        model_type='classification',
        outcome_label_headers=outcomes,
        filters=dataset._filters,
        filter_blank=dataset._filter_blank,
        min_tiles=dataset._min_tiles,
        tile_labels=tile_labels,
        crop=crop,
        resize=resize
    )
    if normalizer:
        config['normalizer_kwargs'] = dict(
            normalizer=normalizer,
            normalizer_source=normalizer_source
        )
    sf.util.write_json(config, config_loc)

    # Train the GAN. The network is conditional when either outcome labels
    # or tile-level labels are supplied.
    network.train.train(
        ctx=None,
        outdir=gan_dir,
        dry_run=dry_run,
        slideflow=config_loc,
        cond=(outcomes is not None or tile_labels is not None),
        mirror=mirror,
        metrics=metrics,
        **kwargs)
noise_mode (str, optional): Either 'const', 'random', or 'none'. Defaults to 'const'. class_idx (int, optional): Class index to generate, for class- conditional networks. Defaults to None. save_projection (bool, optional): Save weight projection for each generated image as an `.npz` file in the out directory. Defaults to False. resize (bool, optional): Crop/resize images to a target micron/pixel size. Defaults to False. gan_um (int, optional): Size of GAN images in microns. Used for cropping/resizing images to a target size. Defaults to None. gan_px (int, optional): Size of GAN images in pixels. Used for cropping/resizing images to a target size. Defaults to None. target_um (int, optional): Crop/resize GAN images to this micron size. Defaults to None. target_px (int, optional): Crop/resize GAN images to this pixel size. Defaults to None. """ from slideflow.gan.stylegan2 import stylegan2 stylegan2.generate.generate_images( network_pkl, outdir=out, seeds=seeds, **kwargs ) @auto_dataset_allow_none def generate_features( self, model: str, dataset: Optional[Dataset] = None, *, filters: Optional[Dict] = None, filter_blank: Optional[Union[str, List[str]]] = None, min_tiles: int = 0, max_tiles: int = 0, outcomes: Optional[List[str]] = None, **kwargs: Any ) -> sf.DatasetFeatures: """Calculate layer activations. See :ref:`Layer activations <dataset_features>` for more information. Args: model (str): Path to model dataset (:class:`slideflow.Dataset`, optional): Dataset from which to generate activations. If not supplied, calculate activations for all tfrecords compatible with the model, optionally using provided filters and filter_blank. Keyword Args: filters (dict, optional): Dataset filters to use for selecting slides. See :meth:`slideflow.Dataset.filter` for more information. Defaults to None. filter_blank (list(str) or str, optional): Skip slides that have blank values in these patient annotation columns. Defaults to None. 
@auto_dataset_allow_none
def generate_feature_bags(
    self,
    model: Union[str, "BaseFeatureExtractor"],
    dataset: Optional[Dataset] = None,
    outdir: str = 'auto',
    *,
    filters: Optional[Dict] = None,
    filter_blank: Optional[Union[str, List[str]]] = None,
    min_tiles: int = 16,
    max_tiles: int = 0,
    **kwargs: Any
) -> str:
    """Generate bags of tile-level features for slides for use with MIL models.

    By default, features are exported to the ``pt_files`` folder
    within the project root directory.

    Args:
        model (str or BaseFeatureExtractor): Model from which to generate
            activations. May be the name of a feature extractor, a path
            to a saved model, or a feature extractor instance.
        dataset (:class:`slideflow.Dataset`, optional): Dataset
            from which to generate activations. If not supplied and
            ``model`` is a path to an existing saved model, a dataset is
            built from the tile_px/tile_um in the model configuration.
        outdir (str, optional): Save exported activations in .pt format.
            Defaults to 'auto', which selects a subfolder of
            ``{project root}/pt_files`` named after the extractor, the
            saved model (plus k-fold suffix, if any), or the extractor's
            tag. If none of these can be determined, the ``pt_files``
            folder itself is used and a warning is logged.

    Keyword Args:
        filters (dict, optional): Dataset filters to use for
            selecting slides. See :meth:`slideflow.Dataset.filter` for
            more information. Defaults to None.
        filter_blank (list(str) or str, optional): Skip slides that have
            blank values in these patient annotation columns.
            Defaults to None.
        min_tiles (int, optional): Only include slides with this minimum
            number of tiles. Defaults to 16.
        max_tiles (int, optional): Only include maximum of this many tiles
            per slide. Defaults to 0 (all tiles).
        layers (list): Which model layer(s) generate activations.
            If ``model`` is a saved model, this defaults to 'postconv'.
            Not used if ``model`` is pretrained feature extractor.
            Defaults to None.
        force_regenerate (bool): Forcibly regenerate activations
            for all slides even if .pt file exists. Defaults to False.
        batch_size (int): Batch size during feature calculation.
            Defaults to 32.
        slide_batch_size (int): Interleave feature calculation across
            this many slides. Higher values may improve performance
            but require more memory. Defaults to 16.
        num_gpus (int): Number of GPUs to use for feature extraction.
            Defaults to 0.
        **kwargs: Additional keyword arguments are forwarded to
            ``Dataset.generate_feature_bags()``.

    Returns:
        Path to directory containing exported .pt files

    Raises:
        ValueError: If ``dataset`` is None and ``model`` is not a path to
            an existing saved model.

    """
    is_saved_model = isinstance(model, str) and exists(model)
    config = None  # Model configuration; read at most once, on demand.

    # Without a dataset, one can only be built from a saved model's config.
    if dataset is None:
        if not is_saved_model:
            raise ValueError(
                'Argument "dataset" is required when "model" is '
                'an imagenet-pretrained model, or otherwise not a '
                'saved Slideflow model.'
            )
        log.debug(f"Auto-building dataset from provided model {model}")
        config = sf.util.get_model_config(model)
        dataset = self.dataset(
            tile_px=config['tile_px'],
            tile_um=config['tile_um'],
            min_tiles=min_tiles
        )

    # Ensure min_tiles is applied to the dataset.
    # NOTE(review): max_tiles is not used in this body; the original
    # comment states it is applied by the decorator - confirm.
    dataset = dataset.filter(min_tiles=min_tiles)

    # Prepare output directory
    if outdir.lower() == 'auto':
        bags_root = join(self.root, 'pt_files')
        # Check if the model is an architecture name
        # (for using an Imagenet pretrained model)
        if isinstance(model, str) and sf.model.is_extractor(model):
            outdir = join(bags_root, model)
        # Check if the model is a trained model
        elif is_saved_model:
            if config is None:
                config = sf.util.get_model_config(model)
            if 'k_fold_i' in config:
                _end = f"_kfold{config['k_fold_i']}"
            else:
                _end = ''
            outdir = join(bags_root, config['model_name'] + _end)
        # Otherwise, it may be a pretrained feature extractor,
        # and the subdirectory can be named by its tag.
        else:
            from slideflow.model.base import BaseFeatureExtractor
            if isinstance(model, BaseFeatureExtractor):
                outdir = join(bags_root, model.tag)
            else:
                # Never pass the literal 'auto' on as a directory name.
                outdir = bags_root
                log.warning(
                    "Unable to determine a model-specific output directory; "
                    f"saving feature bags to {outdir}"
                )

    # Generate feature bags.
    dataset.generate_feature_bags(model, outdir, **kwargs)
    return outdir
If 'auto', will extract tiles inside an ROI if available, and across the whole-slide if no ROI is found. If 'ignore', will extract tiles across the whole-slide regardless of whether an ROI is available. Defaults to 'auto'. num_threads (int, optional): Number of workers threads for each tile extractor. Defaults to the total number of available CPU threads. img_format (str, optional): Image format (png, jpg) to use when extracting tiles from slide. Must match the image format the model was trained on. If 'auto', will use the format logged in the model params.json. skip_completed (bool, optional): Skip heatmaps for slides that already have heatmaps in target directory. show_roi (bool): Show ROI on heatmaps. interpolation (str): Interpolation strategy for predictions. Defaults to None. Includes all matplotlib imshow interpolation options. logit_cmap: Function or a dict used to create heatmap colormap. If None (default), separate heatmaps are generated for each category, with color representing category prediction. Each image tile will generate a list of preds of length O, If logit_cmap is a function, then the logit predictions will be passed, where O is the number of label categories. and the function is expected to return [R, G, B] values. If the logit_cmap is a dictionary, it should map 'r', 'g', and 'b' to label indices; the prediction for these label categories will be mapped to corresponding colors. Thus, the corresponding color will only reflect predictions of up to three labels. Example (this would map predictions for label 0 to red, 3 to green, etc): {'r': 0, 'g': 3, 'b': 1 } verbose (bool): Show verbose output. Defaults to True. vmin (float): Minimimum value to display on heatmap. Defaults to 0. vcenter (float): Center value for color display on heatmap. Defaults to 0.5. vmax (float): Maximum value to display on heatmap. Defaults to 1. 
""" # Prepare arguments for subprocess args = SimpleNamespace(**locals()) del args.self # Prepare dataset config = sf.util.get_model_config(model) args.rois = dataset.rois() # Set resolution / stride resolutions = {'low': 1, 'medium': 2, 'high': 4} try: stride_div = resolutions[resolution] except KeyError: raise ValueError(f"Invalid resolution '{resolution}'.") args.stride_div = stride_div args.verbosity = self.verbosity # Set logging level in subprocess args.img_format = img_format # Attempt to auto-detect supplied model name model_name = os.path.basename(model) if 'model_name' in config: model_name = config['model_name'] # Make output directory outdir = outdir if outdir else join(self.root, 'heatmaps', model_name) if not exists(outdir): os.makedirs(outdir) args.outdir = outdir # Verbose output if verbose: n_poss_slides = len(dataset.slides()) n_slides = len(dataset.slide_paths()) log.info("Generating heatmaps for {} slides.".format(n_slides)) log.info("Model: [green]{}".format(model)) log.info("Tile px: {}".format(config['tile_px'])) log.info("Tile um: {}".format(config['tile_um'])) # Any function loading a slide must be kept in an isolated process, # as loading >1 slide in a single process causes instability. # I suspect this is a libvips or openslide issue but I haven't been # able to identify the root cause. Isolating processes when multiple # slides are to be processed sequentially is a functional workaround. 
for slide in dataset.slide_paths(): name = path_to_name(slide) if (skip_completed and exists(join(outdir, f'{name}-custom.png'))): log.info(f'Skipping completed heatmap for slide {name}') return ctx = multiprocessing.get_context('spawn') process = ctx.Process(target=project_utils._heatmap_worker, args=(slide, args, kwargs)) process.start() process.join() def generate_mosaic( self, df: "DatasetFeatures", dataset: Optional[Dataset] = None, *, filters: Optional[Dict] = None, filter_blank: Optional[Union[str, List[str]]] = None, outcomes: Optional[Union[str, List[str]]] = None, map_slide: Optional[str] = None, show_prediction: Optional[Union[int, str]] = None, predict_on_axes: Optional[List[int]] = None, max_tiles: int = 0, umap_cache: Optional[str] = None, use_float: bool = False, low_memory: bool = False, use_norm: bool = True, umap_kwargs: Dict = {}, **kwargs: Any ) -> sf.Mosaic: """Generate a mosaic map. See :ref:`Mosaic maps <mosaic_map>` for more information. Args: df (:class:`slideflow.DatasetFeatures`): Dataset. dataset (:class:`slideflow.Dataset`, optional): Dataset from which to generate mosaic. If not supplied, will generate mosaic for all tfrecords at the tile_px/tile_um matching the supplied model, optionally using filters/filter_blank. Keyword Args: filters (dict, optional): Dataset filters to use for selecting slides. See :meth:`slideflow.Dataset.filter` for more information. Defaults to None. filter_blank (list(str) or str, optional): Skip slides that have blank values in these patient annotation columns. Defaults to None. outcomes (list, optional): Column name in annotations file from which to read category labels. map_slide (str, optional): None (default), 'centroid' or 'average'. If provided, will map slides using slide-level calculations, either mapping centroid tiles if 'centroid', or calculating node averages across tiles in a slide and mapping slide-level node averages, if 'average'. 
show_prediction (int or str, optional): May be either int or str, corresponding to label category. Predictions for this category will be displayed on the exported UMAP plot. max_tiles (int, optional): Limits tiles taken from each slide. Defaults to 0. umap_cache (str, optional): Path to PKL file in which to save/cache UMAP coordinates. Defaults to None. use_float (bool, optional): Interpret labels as continuous instead of categorical. Defaults to False. umap_kwargs (dict, optional): Dictionary of keyword arguments to pass to the UMAP function. low_memory (bool, optional): Limit memory during UMAP calculations. Defaults to False. use_norm (bool, optional): Display image tiles using the normalizer used during model training (if applicable). Detected from a model's metadata file (params.json). Defaults to True. figsize (Tuple[int, int], optional): Figure size. Defaults to (200, 200). num_tiles_x (int): Specifies the size of the mosaic map grid. expanded (bool): Deprecated argument. Returns: :class:`slideflow.Mosaic`: Mosaic object. """ # Set up paths stats_root = join(self.root, 'stats') mosaic_root = join(self.root, 'mosaic') if not exists(stats_root): os.makedirs(stats_root) if not exists(mosaic_root): os.makedirs(mosaic_root) # Prepare dataset & model if isinstance(df.model, str): config = sf.util.get_model_config(df.model) else: raise ValueError( "Unable to auto-create Mosaic from DatasetFeatures created " "from a loaded Tensorflow/PyTorch model. 
Please use a " "DatasetFeatures object created from a saved Slideflow model, " "or manually create a mosaic with `sf.Mosaic`.") if dataset is None: tile_px, tile_um = config['hp']['tile_px'], config['hp']['tile_um'] dataset = self.dataset(tile_px=tile_px, tile_um=tile_um) else: dataset._assert_size_matches_hp(config['hp']) tile_px = dataset.tile_px # Filter and clip dataset dataset = dataset.filter(filters=filters, filter_blank=filter_blank) dataset = dataset.clip(max_tiles) # Get TFrecords, and prepare a list for focus, if requested tfr = dataset.tfrecords() n_slides = len([t for t in tfr if path_to_name(t) in df.slides]) log.info(f'Generating mosaic from {n_slides} slides') # If a header category is supplied and we are not showing predictions, # then assign slide labels from annotations model_type = config['model_type'] if model_type == 'regression': use_float = True if outcomes and (show_prediction is None): labels, _ = dataset.labels(outcomes, use_float=use_float, format='name') else: labels = {} # type: ignore # If showing predictions, try to automatically load prediction labels if (show_prediction is not None) and (not use_float): outcome_labels = config['outcome_labels'] model_type = model_type if model_type else config['model_type'] log.info(f'Loaded pred labels found at [green]{df.model}') # Create mosaic map from UMAP of layer activations umap = sf.SlideMap.from_features( df, map_slide=map_slide, low_memory=low_memory, **umap_kwargs ) if umap_cache: umap.save_coordinates(umap_cache) # If displaying centroid AND predictions, show slide-level predictions # rather than tile-level predictions if (map_slide == 'centroid') and show_prediction is not None: log.info('Showing slide-level predictions at point of centroid') # If not model has not been assigned, assume classification model model_type = model_type if model_type else 'classification' # Get predictions if model_type == 'classification': s_pred = df.softmax_predict() s_perc = df.softmax_percent() else: 
def generate_mosaic_from_annotations(
    self,
    header_x: str,
    header_y: str,
    *,
    dataset: Dataset,
    model: Optional[str] = None,
    outcomes: Optional[Union[str, List[str]]] = None,
    max_tiles: int = 100,
    use_optimal_tile: bool = False,
    cache: Optional[str] = None,
    batch_size: int = 32,
    **kwargs: Any
) -> sf.Mosaic:
    """Generate a mosaic map with manually supplied x/y coordinates.

    Slides are mapped with slide-level annotations, with x-axis determined
    from ``header_x``, y-axis from ``header_y``. If
    ``use_optimal_tile=False``, the first image tile in each TFRecord is
    displayed. If ``use_optimal_tile=True``, layer activations for all
    tiles in each slide are calculated using the provided model, and the
    tile nearest to centroid is used.

    Args:
        header_x (str): Annotations file header with X-axis coords.
        header_y (str): Annotations file header with Y-axis coords.

    Keyword Args:
        dataset (:class:`slideflow.Dataset`): Dataset object.
        model (str, optional): Path to model to use when generating layer
            activations. Only used (and then required) when
            ``use_optimal_tile`` is True. Defaults to None.
        outcomes (list(str)): Column name(s) in annotations file from which
            to read category labels.
        max_tiles (int, optional): Limits the number of tiles taken from
            each slide. Defaults to 100.
        use_optimal_tile (bool, optional): Use model to calculate layer
            activations for all tiles in each slide, and choosing tile
            nearest centroid for each slide for display.
        cache (str, optional): Path to PKL file to cache node
            activations. Defaults to None.
        batch_size (int, optional): Batch size for model. Defaults to 32.
        figsize (Tuple[int, int], optional): Figure size. Defaults to
            (200, 200).
        num_tiles_x (int): Specifies the size of the mosaic map grid.
        expanded (bool): Deprecated argument.

    Returns:
        slideflow.Mosaic

    Raises:
        ValueError: If ``use_optimal_tile`` is True but no model is given.

    """
    # Validate arguments before touching the filesystem or the dataset.
    if use_optimal_tile and model is None:
        raise ValueError("Optimal tile calculation requires a model.")

    # Setup paths
    os.makedirs(join(self.root, 'stats'), exist_ok=True)
    os.makedirs(join(self.root, 'mosaic'), exist_ok=True)

    # Filter dataset to exclude slides blank in the x and y header columns
    dataset = dataset.filter(filter_blank=[header_x, header_y])
    dataset = dataset.clip(max_tiles)

    # We are assembling a list of slides from the TFRecords path list,
    # because we only want to use slides that have a corresponding TFRecord
    # (some slides did not have a large enough ROI for tile extraction
    # & some slides may be in the annotations but are missing a slide)
    slides = [path_to_name(tfr) for tfr in dataset.tfrecords()]
    labels, _ = dataset.labels([header_x, header_y], use_float=True)

    if use_optimal_tile:
        # Calculate most representative tile in each TFRecord for display
        df = sf.DatasetFeatures(model=model,
                                dataset=dataset,
                                batch_size=batch_size,
                                cache=cache)
        opt_ind, _ = sf.stats.calculate_centroid(df.activations)

        # Restrict mosaic to only slides that had enough tiles to
        # calculate an optimal index from centroid. Membership is tested
        # against the mapping itself rather than a list of its keys.
        sf.util.multi_warn(
            slides,
            lambda x: x not in opt_ind,
            'Unable to calculate optimal tile for {}, skipping'
        )
        shown_slides = list(opt_ind.keys())
        umap_tfr_idx = np.array([opt_ind[slide] for slide in shown_slides])
    else:
        # Take the first tile from each slide/TFRecord
        shown_slides = slides
        # NOTE(review): this yields float zeros, whereas the branch above
        # stores whatever index type calculate_centroid returns - confirm
        # that downstream indexing accepts both.
        umap_tfr_idx = np.zeros(len(slides))

    # Coordinates are built once, for exactly the slides being displayed.
    umap_x = np.array([labels[slide][0]  # type: ignore
                       for slide in shown_slides])
    umap_y = np.array([labels[slide][1]  # type: ignore
                       for slide in shown_slides])

    umap = sf.SlideMap.from_xy(
        x=umap_x,
        y=umap_y,
        slides=np.array(shown_slides),
        tfr_index=umap_tfr_idx,
    )
    if outcomes is not None:
        slide_to_category, _ = dataset.labels(outcomes, format='name')
        umap.label_by_slide(slide_to_category)

    return sf.Mosaic(
        umap,
        tfrecords=dataset.tfrecords(),
        tile_select='centroid' if use_optimal_tile else 'first',
        **kwargs
    )
def dataset(
    self,
    tile_px: Optional[int] = None,
    tile_um: Optional[Union[int, str]] = None,
    *,
    verification: Optional[str] = 'both',
    **kwargs: Any
) -> Dataset:
    """Build a :class:`slideflow.Dataset` from the project settings.

    Args:
        tile_px (int): Tile size in pixels
        tile_um (int or str): Tile size in microns (int) or magnification
            (str, e.g. "20x").

    Keyword Args:
        filters (dict, optional): Dataset filters to use for
            selecting slides. See :meth:`slideflow.Dataset.filter` for
            more information. Defaults to None.
        filter_blank (list(str) or str, optional): Skip slides that have
            blank values in these patient annotation columns.
            Defaults to None.
        min_tiles (int, optional): Min tiles a slide must have.
            Defaults to 0.
        config (str, optional): Path to dataset configuration JSON file.
            Defaults to project default.
        sources (str, list(str), optional): Dataset sources to use from
            configuration. Defaults to project default.
        verification (str, optional): 'tfrecords', 'slides', or 'both'.
            If 'slides', verify all annotations are mapped to slides.
            If 'tfrecords', check that TFRecords exist and update manifest.
            Defaults to 'both'.

    Returns:
        :class:`slideflow.Dataset`: The configured dataset.

    Raises:
        slideflow.errors.DatasetError: If building the dataset raises
            ``FileNotFoundError``.

    """
    # Anything the caller did not specify falls back to the project value.
    for key, attr in (('config', 'dataset_config'), ('sources', 'sources')):
        if key not in kwargs:
            kwargs[key] = getattr(self, attr)

    try:
        # Annotations are only passed along when the file actually exists.
        annotations = (
            self.annotations
            if (self.annotations and exists(self.annotations))
            else None
        )
        dts = Dataset(
            tile_px=tile_px,
            tile_um=tile_um,
            annotations=annotations,
            **kwargs
        )
    except FileNotFoundError:
        raise errors.DatasetError('No datasets configured.')

    check_slides = verification == 'both' or verification == 'slides'
    check_tfrecords = verification == 'both' or verification == 'tfrecords'

    if check_slides:
        log.debug("Verifying slide annotations...")
        dts.verify_annotations_slides()
    if check_tfrecords:
        log.debug("Verifying tfrecords...")
        dts.update_manifest()
    return dts
checkpoint (str, optional): Path to cp.ckpt file, if evaluating a saved checkpoint. Defaults to None. eval_k_fold (int, optional): K-fold iteration number to evaluate. If None, will evaluate all tfrecords irrespective of K-fold. Defaults to None. splits (str, optional): Filename of JSON file in which to log training/validation splits. Looks for filename in project root directory. Defaults to "splits.json". max_tiles (int, optional): Maximum number of tiles from each slide to evaluate. If zero, will include all tiles. Defaults to 0. batch_size (int, optional): Batch size to use during prediction. Defaults to 32. format (str, optional): Format in which to save predictions. Either 'csv', 'feather', or 'parquet'. Defaults to 'parquet'. input_header (str, optional): Annotation column header to use as additional input. Defaults to None. mixed_precision (bool, optional): Enable mixed precision. Defaults to True. allow_tf32 (bool): Allow internal use of Tensorfloat-32 format. Defaults to False. load_method (str): Either 'full' or 'weights'. Method to use when loading a Tensorflow model. If 'full', loads the model with ``tf.keras.models.load_model()``. If 'weights', will read the ``params.json`` configuration file, build the model architecture, and then load weights from the given model with ``Model.load_weights()``. Loading with 'full' may improve compatibility across Slideflow versions. Loading with 'weights' may improve compatibility across hardware & environments. reduce_method (str, optional): Reduction method for calculating slide-level and patient-level predictions for categorical outcomes. Options include 'average', 'mean', 'proportion', 'median', 'sum', 'min', 'max', or a callable function. 'average' and 'mean' are synonymous, with both options kept for backwards compatibility. If 'average' or 'mean', will reduce with average of each logit across tiles. If 'proportion', will convert tile predictions into onehot encoding then reduce by averaging these onehot values. 
For all other values, will reduce with the specified function, applied via the pandas ``DataFrame.agg()`` function. Defaults to 'average'. custom_objects (dict, Optional): Dictionary mapping names (strings) to custom classes or functions. Defaults to None. Returns: Dictionary of predictions dataframes, with the keys 'tile', 'slide', and 'patient'. """ # Perform evaluation log.info('Predicting model results') trainer, eval_dts = self._prepare_trainer( model=model, dataset=dataset, checkpoint=checkpoint, eval_k_fold=eval_k_fold, splits=splits, max_tiles=max_tiles, input_header=input_header, mixed_precision=mixed_precision, allow_tf32=allow_tf32, load_method=load_method, custom_objects=custom_objects, ) # Load the model if isinstance(model, str): trainer.load(model, training=False) if checkpoint: if trainer.feature_sizes: n_features = sum(trainer.feature_sizes) else: n_features = 0 trainer.model = trainer.hp.build_model( labels=trainer.labels, num_slide_features=n_features ) trainer.model.load_weights(checkpoint) # Predict results = trainer.predict( dataset=eval_dts, batch_size=batch_size, format=format, **kwargs ) return results def predict_ensemble( self, model: str, k: Optional[int] = None, epoch: Optional[int] = None, **kwargs ) -> None: """Evaluate an ensemble of models on a given set of tfrecords. Args: model (str): Path to ensemble model to evaluate. Keyword Args: k (int, optional): The k-fold number to be considered to run the prediction. By default it sets to the first k-fold present in the ensemble folder. epoch (int, optional): The epoch number to be considered to run the prediction. By default it sets to the first epoch present in the selected k-fold folder. 
**kwargs (Any): All keyword arguments accepted by :meth:`slideflow.Project.predict()` """ if not exists(model): raise OSError(f"Path {model} not found") config = sf.util.get_ensemble_model_config(model) outcomes = f"{'-'.join(config['outcomes'])}" model_name = f"eval-ensemble-{outcomes}" main_eval_dir = sf.util.get_new_model_dir(self.eval_dir, model_name) member_paths = sorted([ join(model, x) for x in os.listdir(model) if isdir(join(model, x)) ]) # Generate predictions from each ensemble member, # and merge predictions into a single dataframe. for member_id, member_path in enumerate(member_paths): if k: _k_path = get_matching_directory(member_path, f'kfold{k}') else: _k_path = get_first_nested_directory(member_path) if epoch: prediction_path = get_matching_directory( _k_path, f'epoch{epoch}' ) else: prediction_path = get_first_nested_directory(_k_path) # Update the current evaluation directory. member_eval_dir = sf.util.get_new_model_dir( main_eval_dir, f"ensemble_{member_id+1}" ) with self._set_eval_dir(member_eval_dir): self.predict(prediction_path, **kwargs) # If this is the first ensemble member, copy the slide manifest # and params.json file into the ensemble prediction folder. if member_id == 0: _, path = sf.util.get_valid_model_dir(self.eval_dir) shutil.copyfile( join(self.eval_dir, path[0], "slide_manifest.csv"), join(main_eval_dir, "slide_manifest.csv") ) params = sf.util.load_json( join(self.eval_dir, path[0], "params.json") ) params['ensemble_epochs'] = params['hp']['epochs'] del params['hp'] sf.util.write_json( params, join(main_eval_dir, "ensemble_params.json") ) # Create (or add to) the ensemble dataframe. for level in ('slide', 'tile'): project_utils.add_to_ensemble_dataframe( ensemble_path=main_eval_dir, kfold_path=join(self.eval_dir, path[0]), level=level, member_id=member_id ) # Create new ensemble columns and rename fixed columns. 
@auto_dataset
def predict_wsi(
    self,
    model: str,
    outdir: str,
    *,
    dataset: Dataset,
    filters: Optional[Dict] = None,
    filter_blank: Optional[Union[str, List[str]]] = None,
    min_tiles: int = 0,
    stride_div: int = 1,
    enable_downsample: bool = True,
    roi_method: str = 'auto',
    source: Optional[str] = None,
    img_format: str = 'auto',
    randomize_origin: bool = False,
    **kwargs: Any
) -> None:
    """Generate a map of predictions across a whole-slide image.

    One ``<slide name>.pkl`` file is written to ``outdir`` per slide.
    Slides that fail to load, lack a required ROI, or contain corrupt
    tiles are logged and skipped.

    Args:
        model (str): Path to model from which to generate predictions.
        outdir (str): Directory for saving WSI predictions in .pkl format.

    Keyword Args:
        dataset (:class:`slideflow.Dataset`, optional): Dataset
            from which to generate activations. If not supplied, will
            calculate activations for all tfrecords at the tile_px/tile_um
            matching the supplied model.
        filters (dict, optional): Dataset filters to use for
            selecting slides. See :meth:`slideflow.Dataset.filter` for
            more information. Defaults to None.
        filter_blank (list(str) or str, optional): Skip slides that have
            blank values in these patient annotation columns.
            Defaults to None.
        min_tiles (int, optional): Min tiles a slide must have
            to be included. Defaults to 0.
        stride_div (int, optional): Stride divisor for extracting tiles.
            A stride of 1 will extract non-overlapping tiles.
            A stride_div of 2 will extract overlapping tiles, with a stride
            equal to 50% of the tile width. Defaults to 1.
        enable_downsample (bool, optional): Enable downsampling for slides.
            This may result in corrupted image tiles if downsampled slide
            layers are corrupted or incomplete. Defaults to True.
        roi_method (str): Either 'inside', 'outside', 'auto', or 'ignore'.
            Determines how ROIs are used to extract tiles.
            If 'inside' or 'outside', will extract tiles in/out of an ROI,
            and raise errors.MissingROIError if an ROI is not available.
            If 'auto', will extract tiles inside an ROI if available,
            and across the whole-slide if no ROI is found.
            If 'ignore', will extract tiles across the whole-slide
            regardless of whether an ROI is available.
            Defaults to 'auto'.
        source (str or list, optional): Name(s) of dataset sources from
            which to get slides. If None, will use all.
        img_format (str, optional): Image format (png, jpg) to use when
            extracting tiles from slide. Must match the image format
            the model was trained on. If 'auto', will use the format
            logged in the model params.json.
        randomize_origin (bool, optional): Randomize pixel starting
            position during extraction. Defaults to False.
        whitespace_fraction (float, optional): Range 0-1. Defaults to 1.
            Discard tiles with this fraction of whitespace.
            If 1, will not perform whitespace filtering.
        whitespace_threshold (int, optional): Range 0-255. Defaults to 230.
            Threshold above which a pixel (RGB average) is whitespace.
        grayspace_fraction (float, optional): Range 0-1. Defaults to 0.6.
            Discard tiles with this fraction of grayspace.
            If 1, will not perform grayspace filtering.
        grayspace_threshold (float, optional): Range 0-1. Defaults to 0.05.
            Pixels in HSV format with saturation below this are grayspace.

    Raises:
        slideflow.errors.DatasetError: If the dataset has no tile_px
            or tile_um.

    """
    from rich.progress import track

    log.info('Generating WSI prediction / activation maps...')
    os.makedirs(outdir, exist_ok=True)

    sources = sf.util.as_list(source) if source else self.sources
    if dataset.tile_px is None or dataset.tile_um is None:
        raise errors.DatasetError(
            "Dataset must have non-zero tile_px and tile_um"
        )

    # Prepare dataset & model
    if img_format == 'auto':
        img_format = sf.util.get_model_config(model)['img_format']

    # Log extraction parameters
    sf.slide.log_extraction_params(**kwargs)

    # The loop variable has its own name so that it does not overwrite
    # the ``source`` argument.
    for source_name in sources:
        log.info(f'Working on dataset source [bold]{source_name}')
        if dataset._roi_set(source_name):
            roi_dir = dataset.sources[source_name]['roi']
        else:
            roi_dir = None

        # Prepare list of slides for extraction
        slide_list = dataset.slide_paths(source=source_name)
        log.info(f'Generating predictions for {len(slide_list)} slides')

        # Verify slides and estimate total number of tiles
        log.info('Verifying slides...')
        total_tiles = 0
        for slide_path in track(slide_list, transient=True):
            try:
                slide = sf.WSI(slide_path,
                               dataset.tile_px,
                               dataset.tile_um,
                               stride_div,
                               roi_dir=roi_dir,
                               roi_method=roi_method)
            except errors.SlideError as e:
                # Nothing was bound to ``slide``, so there is nothing to
                # release; log the problem and move to the next slide.
                log.error(e)
                continue
            n_est = slide.estimated_num_tiles
            log.debug(f"Estimated tiles for {slide.name}: {n_est}")
            total_tiles += n_est
            # Drop the reference before the next slide is loaded.
            del slide
        log.info(f'Total estimated tiles: {total_tiles}')

        # Predict for each WSI
        for slide_path in slide_list:
            log.info(f'Working on slide {path_to_name(slide_path)}')
            try:
                wsi = sf.WSI(slide_path,
                             dataset.tile_px,
                             dataset.tile_um,
                             stride_div,
                             enable_downsample=enable_downsample,
                             roi_dir=roi_dir,
                             roi_method=roi_method,
                             origin='random' if randomize_origin else (0, 0))
            except (errors.SlideLoadError, errors.MissingROIError) as e:
                log.error(e)
                continue
            try:
                interface = sf.model.Features(model, include_preds=False)
                wsi_grid = interface(wsi, img_format=img_format)

                with open(join(outdir, wsi.name+'.pkl'), 'wb') as file:
                    pickle.dump(wsi_grid, file)

            except errors.TileCorruptionError:
                log.error(f'[green]{path_to_name(slide_path)}[/] is '
                          'corrupt; skipping slide')
                continue
model_name = list(results.keys())[0] last_epoch = sorted(list(results[model_name]['epochs'].keys()), key=lambda x: int(x.replace("epoch", "")))[-1] if len(results[model_name]['epochs']) > 1: log.warning(f"Ambiguous epoch for SMAC. Using '{last_epoch}'.") epoch_results = results[model_name]['epochs'][last_epoch] # Determine metric for optimization. if callable(metric): result = metric(epoch_results) elif metric not in epoch_results: raise errors.SMACError(f"Metric '{metric}' not returned from " "training, unable to optimize.") else: if outcomes not in epoch_results[metric]: raise errors.SMACError( f"Unable to interpret metric {metric} (epoch results: " f"{epoch_results})") result = 1 - mean(epoch_results[metric][outcomes]) all_results.append(result) # Average results across iterations log.info("[green]Result ({})[/]: {:.4f}".format( 'custom' if callable(metric) else f'1-{metric}', result )) return mean(all_results) return smac_runner def smac_search( self, outcomes: Union[str, List[str]], params: ModelParams, smac_configspace: "ConfigurationSpace", exp_label: str = "SMAC", smac_limit: int = 10, smac_metric: str = 'tile_auc', smac_replicates: int = 1, save_checkpoints: bool = False, save_model: bool = False, save_predictions: Union[bool, str] = False, **train_kwargs: Any ) -> Tuple["Configuration", pd.DataFrame]: """Train a model using SMAC3 Bayesian hyperparameter optimization. See :ref:`Bayesian optimization <bayesian_optimization>` for more information. .. note:: The hyperparameter optimization is performed with `SMAC3 <https://automl.github.io/SMAC3/master/>`_ and requires the ``smac`` package available from pip. Args: outcomes (str, List[str]): Outcome label annotation header(s). params (ModelParams): Model parameters for training. smac_configspace (ConfigurationSpace): ConfigurationSpace to determine the SMAC optimization. smac_limit (int): Max number of models to train during optimization. Defaults to 10. 
smac_metric (str, optional): Metric to monitor for optimization. May either be a callable function or a str. If a callable function, must accept the epoch results dict and return a float value. If a str, must be a valid metric, such as 'tile_auc', 'patient_auc', 'r_squared', etc. Defaults to 'tile_auc'. save_checkpoints (bool): Save model checkpoints. Defaults to False. save_model (bool): Save each trained model. Defaults to False. save_predictions (bool or str, optional): Save tile, slide, and patient-level predictions at each evaluation. May be 'csv', 'feather', or 'parquet'. If False, will not save predictions. Defaults to False. Returns: Tuple: Configuration: Optimal hyperparameter configuration returned by SMAC4BB.optimize(). pd.DataFrame: History of hyperparameters resulting metrics. """ from smac.facade.smac_bb_facade import SMAC4BB # noqa: F811 from smac.scenario.scenario import Scenario # Perform SMAC search in a single model folder. smac_path = sf.util.get_new_model_dir(self.models_dir, exp_label) _initial_models_dir = self.models_dir self.models_dir = smac_path # Create SMAC scenario. scenario = Scenario( {'run_obj': 'quality', # Optimize quality (alternatively: runtime) 'runcount-limit': smac_limit, # Max # of function evaluations 'cs': smac_configspace}, {'output_dir': self.models_dir}) train_kwargs['save_checkpoints'] = save_checkpoints train_kwargs['save_model'] = save_model train_kwargs['save_predictions'] = save_predictions smac = SMAC4BB( scenario=scenario, tae_runner=self._get_smac_runner( outcomes=outcomes, params=params, metric=smac_metric, train_kwargs=train_kwargs, n_replicates=smac_replicates, ) ) # Log. 
log.info("Performing Bayesian hyperparameter optimization with SMAC") log.info( "=== SMAC config ==========================================\n" "[bold]Options:[/]\n" f"Metric: {smac_metric}\n" f"Limit: {smac_limit}\n" f"Model replicates: {smac_replicates}\n" "[bold]Base parameters:[/]\n" f"{params}\n\n" "[bold]Configuration space:[/]\n" f"{smac_configspace}\n" "==========================================================" ) # Optimize. best_config = smac.optimize() log.info(f"Best configuration after SMAC optimization: {best_config}") # Process history and write to dataframe. configs = smac.runhistory.get_all_configs() history = pd.DataFrame([c.get_dictionary() for c in configs]) history['metric'] = [smac.runhistory.get_cost(c) for c in configs] history.to_csv(join(self.models_dir, 'run_history.csv'), index=False) self.models_dir = _initial_models_dir return best_config, history def train( self, outcomes: Union[str, List[str]], params: Union[str, ModelParams, List[ModelParams], Dict[str, ModelParams]], *, dataset: Optional[sf.Dataset] = None, exp_label: Optional[str] = None, filters: Optional[Dict] = None, filter_blank: Optional[Union[str, List[str]]] = None, input_header: Optional[Union[str, List[str]]] = None, min_tiles: int = 0, max_tiles: int = 0, splits: str = "splits.json", mixed_precision: bool = True, allow_tf32: bool = False, load_method: str = 'weights', balance_headers: Optional[Union[str, List[str]]] = None, process_isolate: bool = False, **training_kwargs: Any ) -> Dict: """Train model(s). Models are trained using a given set of parameters, outcomes, and (optionally) slide-level inputs. See :ref:`Training <training>` for more information. Examples Method 1 (hyperparameter sweep from a configuration file): >>> P.train('outcome', params='sweep.json', ...) Method 2 (manually specified hyperparameters): >>> hp = sf.ModelParams(...) >>> P.train('outcome', params=hp, ...) 
Method 3 (list of hyperparameters): >>> hp = [sf.ModelParams(...), sf.ModelParams(...)] >>> P.train('outcome', params=hp, ...) Method 4 (dict of hyperparameters): >>> hp = {'HP0': sf.ModelParams(...), ...} >>> P.train('outcome', params=hp, ...) Args: outcomes (str or list(str)): Outcome label annotation header(s). params (:class:`slideflow.ModelParams`, list, dict, or str): Model parameters for training. May provide one ``ModelParams``, a list, or dict mapping model names to params. If multiple params are provided, will train models for each. If JSON file is provided, will interpret as a hyperparameter sweep. See examples below for use. Keyword Args: exp_label (str, optional): Experiment label to add model names. filters (dict, optional): Dataset filters to use for selecting slides. See :meth:`slideflow.Dataset.filter` for more information. Defaults to None. filter_blank (list(str) or str, optional): Skip slides that have blank values in these patient annotation columns. Defaults to None. input_header (list, optional): List of annotation column headers to use as additional slide-level model input. Defaults to None. min_tiles (int): Minimum number of tiles a slide must have to include in training. Defaults to 0. max_tiles (int): Only use up to this many tiles from each slide for training. Defaults to 0 (include all tiles). splits (str, optional): Filename of JSON file in which to log train/val splits. Looks for filename in project root directory. Defaults to "splits.json". mixed_precision (bool, optional): Enable mixed precision. Defaults to True. allow_tf32 (bool): Allow internal use of Tensorfloat-32 format. Defaults to False. load_method (str): Either 'full' or 'weights'. Method to use when loading a Tensorflow model. If 'full', loads the model with ``tf.keras.models.load_model()``. If 'weights', will read the ``params.json`` configuration file, build the model architecture, and then load weights from the given model with ``Model.load_weights()``. 
Loading with 'full' may improve compatibility across Slideflow versions. Loading with 'weights' may improve compatibility across hardware & environments. balance_headers (str or list(str)): Annotation header(s) specifying labels on which to perform mini-batch balancing. If performing category-level balancing and this is set to None, will default to balancing on outcomes. Defaults to None. val_strategy (str): Validation dataset selection strategy. Options include bootstrap, k-fold, k-fold-manual, k-fold-preserved-site, fixed, and none. Defaults to 'k-fold'. val_k_fold (int): Total number of K if using K-fold validation. Defaults to 3. val_k (int): Iteration of K-fold to train, starting at 1. Defaults to None (training all k-folds). val_k_fold_header (str): Annotations file header column for manually specifying k-fold or for preserved-site cross validation. Only used if validation strategy is 'k-fold-manual' or 'k-fold-preserved-site'. Defaults to None for k-fold-manual and 'site' for k-fold-preserved-site. val_fraction (float): Fraction of dataset to use for validation testing, if strategy is 'fixed'. val_source (str): Dataset source to use for validation. Defaults to None (same as training). val_annotations (str): Path to annotations file for validation dataset. Defaults to None (same as training). val_filters (dict): Filters to use for validation dataset. See :meth:`slideflow.Dataset.filter` for more information. Defaults to None (same as training). checkpoint (str, optional): Path to cp.ckpt from which to load weights. Defaults to None. pretrain (str, optional): Either 'imagenet' or path to Tensorflow model from which to load weights. Defaults to 'imagenet'. multi_gpu (bool): Train using multiple GPUs when available. Defaults to False. reduce_method (str, optional): Reduction method for calculating slide-level and patient-level predictions for categorical outcomes. 
Options include 'average', 'mean', 'proportion', 'median', 'sum', 'min', 'max', or a callable function. 'average' and 'mean' are synonymous, with both options kept for backwards compatibility. If 'average' or 'mean', will reduce with average of each logit across tiles. If 'proportion', will convert tile predictions into onehot encoding then reduce by averaging these onehot values. For all other values, will reduce with the specified function, applied via the pandas ``DataFrame.agg()`` function. Defaults to 'average'. resume_training (str, optional): Path to model to continue training. Only valid in Tensorflow backend. Defaults to None. starting_epoch (int): Start training at the specified epoch. Defaults to 0. steps_per_epoch_override (int): If provided, will manually set the number of steps in an epoch. Default epoch length is the number of total tiles. save_predictions (bool or str, optional): Save tile, slide, and patient-level predictions at each evaluation. May be 'csv', 'feather', or 'parquet'. If False, will not save predictions. Defaults to 'parquet'. save_model (bool, optional): Save models when evaluating at specified epochs. Defaults to True. validate_on_batch (int): Perform validation every N batches. Defaults to 0 (only at epoch end). validation_batch_size (int): Validation dataset batch size. Defaults to 32. use_tensorboard (bool): Add tensorboard callback for realtime training monitoring. Defaults to True. validation_steps (int): Number of steps of validation to perform each time doing a mid-epoch validation check. Defaults to 200. 
Returns: Dict with model names mapped to train_acc, val_loss, and val_acc """ # Prepare outcomes if not isinstance(outcomes, list): outcomes = [outcomes] if len(outcomes) > 1: log.info(f'Training with {len(outcomes)} outcomes') log.info(f'Outcomes: {", ".join(outcomes)}') # Prepare hyperparameters if isinstance(params, str): if exists(params): hp_dict = sf.model.read_hp_sweep(params) elif exists(join(self.root, params)): hp_dict = sf.model.read_hp_sweep(join(self.root, params)) else: raise errors.ModelParamsError(f"Unable to find file {params}") elif isinstance(params, ModelParams): hp_dict = {'HP0': params} elif isinstance(params, list): if not all([isinstance(hp, ModelParams) for hp in params]): raise errors.ModelParamsError( 'If params is a list, items must be sf.ModelParams' ) hp_dict = {f'HP{i}': hp for i, hp in enumerate(params)} elif isinstance(params, dict): if not all([isinstance(hp, str) for hp in params.keys()]): raise errors.ModelParamsError( 'If params is a dict, keys must be of type str' ) all_hp = params.values() if not all([isinstance(hp, ModelParams) for hp in all_hp]): raise errors.ModelParamsError( 'If params is a dict, values must be sf.ModelParams' ) hp_dict = params else: raise ValueError(f"Unable to interpret params value {params}") # Get default validation settings from kwargs val_kwargs = { k[4:]: v for k, v in training_kwargs.items() if k[:4] == 'val_' } training_kwargs = { k: v for k, v in training_kwargs.items() if k[:4] != 'val_' } val_settings = get_validation_settings(**val_kwargs) _invalid = ( 'k-fold-manual', 'k-fold-preserved-site', 'k-fold', 'bootstrap' ) if (val_settings.strategy in _invalid) and val_settings.source: _m = f'{val_settings.strategy} invalid with val_source != None' raise ValueError(_m) # Next, prepare the multiprocessing manager (needed to free VRAM after # training and keep track of results) manager = multiprocessing.Manager() results_dict = manager.dict() ctx = multiprocessing.get_context('spawn') # === Train 
with a set of hyperparameters ============================= for hp_name, hp in hp_dict.items(): if exp_label: hp_name = f'{exp_label}-{hp_name}' self._train_hp( hp_name=hp_name, hp=hp, outcomes=outcomes, val_settings=val_settings, ctx=ctx, dataset=dataset, filters=filters, filter_blank=filter_blank, input_header=input_header, min_tiles=min_tiles, max_tiles=max_tiles, mixed_precision=mixed_precision, allow_tf32=allow_tf32, splits=splits, balance_headers=balance_headers, training_kwargs=training_kwargs, results_dict=results_dict, load_method=load_method, process_isolate=process_isolate ) # Print summary of all models log.info('Training complete; validation accuracies:') for model in results_dict: if 'epochs' not in results_dict[model]: continue ep_res = results_dict[model]['epochs'] epochs = [e for e in ep_res if 'epoch' in e] try: last = max([int(e.split('epoch')[-1]) for e in epochs]) final_train_metrics = ep_res[f'epoch{last}']['train_metrics'] except ValueError: pass else: log.info(f'[green]{model}[/] training metrics:') for m in final_train_metrics: log.info(f'{m}: {final_train_metrics[m]}') if 'val_metrics' in ep_res[f'epoch{last}']: final_val_metrics = ep_res[f'epoch{last}']['val_metrics'] log.info(f'[green]{model}[/] validation metrics:') for m in final_val_metrics: log.info(f'{m}: {final_val_metrics[m]}') return dict(results_dict) def train_ensemble( self, outcomes: Union[str, List[str]], params: Union[ModelParams, List[ModelParams], Dict[str, ModelParams]], n_ensembles: Optional[int] = None, **kwargs ) -> List[Dict]: """Train an ensemble of model(s). Trains models using a given set of parameters and outcomes by calling the train function ``n_ensembles`` of times. Args: outcomes (str or list(str)): Outcome label annotation header(s). params (:class:`slideflow.ModelParams`, list or dict): Model parameters for training. May provide one `ModelParams`, a list, or dict mapping model names to params. 
If multiple params are provided, will train an hyper deep ensemble models for them, otherwise a deep ensemble model. Keyword Args: n_ensembles (int, optional): Total models needed in the ensemble. Defaults to 5. **kwargs: All keyword arguments accepted by :meth:`slideflow.Project.train` Returns: List of dictionaries of length ``n_ensembles``, containing training results for each member of the ensemble. """ # Prepare output directory for saving ensemble members if isinstance(outcomes, list): ensemble_name = f"{'-'.join(outcomes)}-ensemble" else: ensemble_name = f"{outcomes}-ensemble" ensemble_path = sf.util.get_new_model_dir( self.models_dir, ensemble_name ) ensemble_results = [] # Process model params arguments if isinstance(params, ModelParams): hyper_deep = False if n_ensembles is None: raise TypeError( "Keyword argument 'n_ensembles' is required if 'params' is" " not a list of ModelParams." ) elif isinstance(params, list): hyper_deep = True if not all([isinstance(hp, ModelParams) for hp in params]): raise errors.ModelParamsError( 'If params is a list, items must be sf.ModelParams' ) hp_list = params n_ensembles = len(hp_list) elif isinstance(params, dict): hyper_deep = True if not all([isinstance(hp, str) for hp in params.keys()]): raise errors.ModelParamsError( 'If params is a dict, keys must be of type str' ) all_hp = params.values() if not all([isinstance(hp, ModelParams) for hp in all_hp]): raise errors.ModelParamsError( 'If params is a dict, values must be sf.ModelParams' ) hp_list = [hp for hp in params.values()] n_ensembles = len(hp_list) print("The hyperparameter name to ensemble member mapping is:") for e, n in enumerate(params.keys()): print(f" - {n} : ensemble_{e+1}") else: raise ValueError(f"Unable to interpret params value {params}") # Check for same epoch value if hyper_deep: for hp in hp_list: if hp.epochs != hp_list[0].epochs: raise errors.ModelParamsNotFoundError( "All hyperparameters must have the same epoch value" ) # Parse validation settings 
val_kwargs = {k[4:]: v for k, v in kwargs.items() if k[:4] == 'val_'} val_settings = get_validation_settings(**val_kwargs) print(f"Val settings: {json.dumps(vars(val_settings), indent=2)}") if not hyper_deep: print(f"\nHyperparameters: {params}") for i in range(n_ensembles): print(f"Training Ensemble {i+1} of {n_ensembles}") # Create the ensemble member folder, which will hold each # k-fold model for the given ensemble member. with sf.util.logging_level(30): member_path = sf.util.get_new_model_dir( ensemble_path, f"ensemble_{i+1}") if hyper_deep: print(f"\nHyperparameters: {hp_list[i]}") with self._set_models_dir(member_path): if hyper_deep: hp = hp_list[i] result = self.train(outcomes, hp, **kwargs) ensemble_results.append(result) else: result = self.train(outcomes, params, **kwargs) ensemble_results.append(result) # Copy the slide manifest and params.json file # into the parent ensemble folder. _, member_models = sf.util.get_valid_model_dir(member_path) if len(member_models): try: shutil.copyfile( join(member_path, member_models[0], "slide_manifest.csv"), join(ensemble_path, "slide_manifest.csv")) params_data = sf.util.load_json( join(member_path, member_models[0], "params.json") ) params_data['ensemble_epochs'] = params_data['hp']['epochs'] del params_data['hp'] params_data['hyper_deep_ensemble'] = hyper_deep sf.util.write_json( params_data, join(ensemble_path, "ensemble_params.json") ) except OSError: log.error( "Unable to find ensemble slide manifest and params.json" ) else: log.error("Unable to find ensemble slide manifest and params.json") # Merge predictions from each ensemble. 
if "save_predictions" in kwargs: if not kwargs['save_predictions']: return ensemble_results project_utils.ensemble_train_predictions(ensemble_path) return ensemble_results def train_simclr( self, simclr_args: "simclr.SimCLR_Args", train_dataset: Dataset, val_dataset: Optional[Dataset] = None, *, exp_label: Optional[str] = None, outcomes: Optional[Union[str, List[str]]] = None, dataset_kwargs: Optional[Dict[str, Any]] = None, normalizer: Optional[Union[str, "sf.norm.StainNormalizer"]] = None, normalizer_source: Optional[str] = None, **kwargs ) -> None: """Train SimCLR model. Models are saved in ``simclr`` folder in the project root directory. See :ref:`simclr_ssl` for more information. Args: simclr_args (slideflow.simclr.SimCLR_Args, optional): SimCLR arguments, as provided by :func:`slideflow.simclr.get_args()`. train_dataset (:class:`slideflow.Dataset`): Training dataset. val_dataset (:class:`slideflow.Dataset`): Validation dataset. Defaults to None. Keyword Args: exp_label (str, optional): Experiment label to add model names. outcomes (str, optional): Annotation column which specifies the outcome, for optionally training a supervised head. Defaults to None. 
dataset_kwargs: All other keyword arguments for :meth:`slideflow.Dataset.tensorflow` **kwargs: All other keyword arguments for :meth:`slideflow.simclr.run_simclr()` """ from slideflow import simclr # Set up SimCLR experiment data directory if exp_label is None: exp_label = 'simclr' if not exists(join(self.root, 'simclr')): os.makedirs(join(self.root, 'simclr')) outdir = sf.util.create_new_model_dir( join(self.root, 'simclr'), exp_label ) # Get base SimCLR args/settings if not provided if not simclr_args: simclr_args = simclr.get_args() assert isinstance(simclr_args, simclr.SimCLR_Args) # Create dataset builder, which SimCLR will use to create # the input pipeline for training builder = simclr.DatasetBuilder( train_dts=train_dataset, val_dts=val_dataset, labels=outcomes, dataset_kwargs=dataset_kwargs, normalizer=normalizer, normalizer_source=normalizer_source ) simclr.run_simclr(simclr_args, builder, model_dir=outdir, **kwargs) def train_mil( self, config: "mil.TrainerConfig", train_dataset: Dataset, val_dataset: Dataset, outcomes: Union[str, List[str]], bags: Union[str, List[str]], *, exp_label: Optional[str] = None, outdir: Optional[str] = None, **kwargs ): r"""Train a multi-instance learning model. Args: config (:class:`slideflow.mil.TrainerConfig`): Training configuration, as obtained by :func:`slideflow.mil.mil_config()`. train_dataset (:class:`slideflow.Dataset`): Training dataset. val_dataset (:class:`slideflow.Dataset`): Validation dataset. outcomes (str): Outcome column (annotation header) from which to derive category labels. bags (str): Either a path to directory with \*.pt files, or a list of paths to individual \*.pt files. Each file should contain exported feature vectors, with each file containing all tile features for one patient. Keyword args: exp_label (str): Experiment label, used for naming the subdirectory in the ``{project root}/mil`` folder, where training history and the model will be saved. 
attention_heatmaps (bool): Calculate and save attention heatmaps on the validation dataset. Defaults to False. interpolation (str, optional): Interpolation strategy for smoothing attention heatmaps. Defaults to 'bicubic'. cmap (str, optional): Matplotlib colormap for heatmap. Can be any valid matplotlib colormap. Defaults to 'inferno'. norm (str, optional): Normalization strategy for assigning heatmap values to colors. Either 'two_slope', or any other valid value for the ``norm`` argument of ``matplotlib.pyplot.imshow``. If 'two_slope', normalizes values less than 0 and greater than 0 separately. Defaults to None. """ from .mil import train_mil if outdir is None: outdir = join(self.root, 'mil') return train_mil( config, train_dataset, val_dataset, outcomes, bags, outdir=outdir, exp_label=exp_label, **kwargs )
# -----------------------------------------------------------------------------

def load(root: str, **kwargs) -> "Project":
    """Load a project at the given root directory.

    Args:
        root (str): Path to project.

    Keyword Args:
        **kwargs: Passed through to the :class:`Project` constructor.

    Returns:
        slideflow.Project

    """
    return Project(root, **kwargs)


def create(
    root: str,
    cfg: Optional[Union[str, Dict]] = None,
    *,
    download: bool = False,
    md5: bool = False,
    **kwargs
) -> "Project":
    """Create a project at the existing folder from a given configuration.

    Supports both manual project creation via keyword arguments, and setting
    up a project through a specified configuration. The configuration may be
    a dictionary or a path to a JSON file containing a dictionary. It must
    have the key 'annotations', which includes a path to an annotations file,
    and may optionally have the following arguments:

    - **name**: Name for the project and dataset.
    - **rois**: Path to .tar.gz file containing compressed ROIs.
    - **slides**: Path in which slides will be stored.
    - **tiles**: Path in which extracted tiles will be stored.
    - **tfrecords**: Path in which TFRecords will be stored.

    .. code-block:: python

        import slideflow as sf

        P = sf.create_project(
            root='path',
            annotations='file.csv',
            slides='path',
            tfrecords='path'
        )

    Annotations files are copied into the created project folder.

    Alternatively, you can create a project using a prespecified configuration,
    of which there are three available:

    - ``sf.project.LungAdenoSquam()``
    - ``sf.project.ThyroidBRS()``
    - ``sf.project.BreastER()``

    When creating a project from a configuration, setting ``download=True``
    will download the annotations file and slides from The Cancer Genome Atlas
    (TCGA).

    .. code-block:: python

        import slideflow as sf

        project = sf.create_project(
            root='path',
            cfg=sf.project.LungAdenoSquam(),
            download=True
        )

    Args:
        root (str): Path at which the Project will be set up.
        cfg (dict, str, optional): Path to configuration file (JSON), or a
            dictionary, containing the key "annotations", and optionally with
            the keys "name", "rois", "slides", "tiles", or "tfrecords".
            Relative "annotations" and "rois" paths in a JSON configuration
            file are resolved against the folder containing that file.
            Defaults to None.

    Keyword Args:
        download (bool): Download any missing slides from the Genomic Data
            Commons (GDC) automatically, using slide names stored in the
            annotations file.
        md5 (bool): Perform MD5 hash verification for all slides using
            the GDC (TCGA) MD5 manifest, which will be downloaded.
        name (str): Set the project name. This has higher priority than any
            supplied configuration, which will be ignored.
        slides (str): Set the destination folder for slides. This has higher
            priority than any supplied configuration, which will be ignored.
        tiles (str): Set the destination folder for tiles. This has higher
            priority than any supplied configuration, which will be ignored.
        tfrecords (str): Set the destination for TFRecords. This has higher
            priority than any supplied configuration, which will be ignored.
        roi_dest (str): Set the destination folder for ROIs.
        dataset_config (str): Path to dataset configuration JSON file for the
            project. Defaults to './datasets.json'.
        sources (list(str)): List of dataset sources to include in project.
            Defaults to 'MyProject'. If None, no dataset source is created.
        models_dir (str): Path to directory in which to save models.
            Defaults to './models'.
        eval_dir (str): Path to directory in which to save evaluations.
            Defaults to './eval'.

    Raises:
        OSError: If a project already exists at ``root``.
        slideflow.errors.ChecksumError: If annotations downloaded from a URL
            fail verification against the configured MD5 checksum.

    Returns:
        slideflow.Project

    """
    # Split keyword arguments into dataset/source configuration (which
    # overrides `cfg`) and arguments forwarded to the Project constructor.
    cfg_names = (
        'annotations', 'name', 'slides', 'tiles', 'tfrecords', 'roi_dest'
    )
    proj_kwargs = {k: v for k, v in kwargs.items() if k not in cfg_names}
    kwargs = {k: v for k, v in kwargs.items() if k in cfg_names}

    # Initial verification
    if sf.util.is_project(root):
        raise OSError(f"A project already exists at {root}")
    if isinstance(cfg, dict):
        cfg = sf.util.EasyDict(cfg)
    if isinstance(cfg, str):
        cfg_path = cfg
        cfg_dir = dirname(cfg_path)
        cfg = sf.util.EasyDict(sf.util.load_json(cfg))

        # Resolve relative paths in configuration file
        if 'annotations' in cfg and exists(join(cfg_dir, cfg.annotations)):
            cfg.annotations = join(cfg_dir, cfg.annotations)
        if 'rois' in cfg and exists(join(cfg_dir, cfg.rois)):
            cfg.rois = join(cfg_dir, cfg.rois)
    elif cfg is None:
        cfg = sf.util.EasyDict(kwargs)
    elif isinstance(cfg, project_utils._ProjectConfig):
        cfg = sf.util.EasyDict(cfg.to_dict())

    # Fill in defaults for anything the configuration did not specify.
    if 'name' not in cfg:
        cfg.name = "MyProject"
    if 'slides' not in cfg:
        cfg.slides = join(root, 'slides')
    if 'tiles' not in cfg:
        cfg.tiles = join(root, 'tiles')
    if 'tfrecords' not in cfg:
        cfg.tfrecords = join(root, 'tfrecords')
    cfg.roi_dest = join(cfg.slides, 'rois')

    # Overwrite any project configuration with user-specified keyword arguments
    cfg.update(kwargs)

    # Set up project at the given directory.
    log.info(f"Setting up project at {root}")
    if 'annotations' in cfg:
        if root.startswith('.'):
            proj_kwargs['annotations'] = join('.', basename(cfg.annotations))
        else:
            proj_kwargs['annotations'] = join(root, basename(cfg.annotations))
    P = sf.Project(root, **proj_kwargs, create=True)

    # Download annotations, if a URL.
    if 'annotations' in cfg and cfg.annotations.startswith('http'):
        log.info(f"Downloading {cfg.annotations}")
        r = requests.get(cfg.annotations)
        with open(proj_kwargs['annotations'], 'wb') as annotations_file:
            annotations_file.write(r.content)
        if 'annotations_md5' not in cfg:
            # A user-supplied URL may come without a checksum; there is
            # nothing to verify against in that case.
            log.warning(
                "No MD5 checksum provided for annotations URL; "
                "skipping verification."
            )
        elif cfg.annotations_md5 != sf.util.md5(proj_kwargs['annotations']):
            raise errors.ChecksumError(
                "Remote annotations URL failed MD5 checksum."
            )
    elif 'annotations' in cfg and not cfg.annotations.startswith('.'):
        try:
            shutil.copy(cfg.annotations, root)
        except shutil.SameFileError:
            # Annotations file is already in the project folder.
            pass

    # Set up the dataset source.
    source_already_exists = False
    requested_sources = proj_kwargs.get('sources')
    if requested_sources is not None and exists(P.dataset_config):
        _dataset_config = sf.util.load_json(P.dataset_config)
        if isinstance(requested_sources, str):
            source_already_exists = requested_sources in _dataset_config
        else:
            source_already_exists = all(
                s in _dataset_config for s in requested_sources
            )
    if (('sources' not in proj_kwargs or requested_sources is not None)
       and not source_already_exists):
        # Create a new dataset source if it does not already exist.
        P.add_source(
            cfg.name,
            slides=cfg.slides,
            roi=cfg.roi_dest,
            tiles=cfg.tiles,
            tfrecords=cfg.tfrecords)

    # Set up ROIs, if provided.
    if 'rois' in cfg and not exists(cfg.roi_dest):
        os.makedirs(cfg.roi_dest)
    if 'rois' in cfg and exists(cfg.rois) and os.path.isdir(cfg.rois):
        # Search the folder for CSV files
        # and copy to the project ROI directory.
        to_copy = [r for r in os.listdir(cfg.rois) if path_to_ext(r) == 'csv']
        log.info("Copying {} ROIs from {} to {}.".format(
            len(to_copy),
            cfg.rois,
            cfg.roi_dest
        ))
        for roi in to_copy:
            shutil.copy(join(cfg.rois, roi), cfg.roi_dest)
    elif 'rois' in cfg and exists(cfg.rois) and os.path.isfile(cfg.rois):
        # Assume ROIs is a tarfile - extract at destination.
        # NOTE(security): extractall() trusts member paths in the archive;
        # only use ROI archives from a trusted source.
        log.info(f"Extrating ROIs from tarfile at {cfg.rois}.")
        with tarfile.open(cfg.rois) as roi_file:
            roi_file.extractall(cfg.roi_dest)

    # Create blank annotations file, if not provided.
    if not exists(P.annotations):
        P.create_blank_annotations()

    # Download slides from GDC (TCGA), if specified.
    if download:
        df = sf.util.get_gdc_manifest()
        slide_manifest = dict(zip(df.filename.values, df.id.values))
        if not exists(cfg.slides):
            os.makedirs(cfg.slides)
        to_download = [s for s in P.dataset().slides()
                       if not exists(join(cfg.slides, f'{s}.svs'))]
        for i, slide in enumerate(to_download):
            sf.util.download_from_tcga(
                slide_manifest[slide+".svs"],
                dest=cfg.slides,
                message=f"Downloading {i+1} of {len(to_download)}...")

    # Perform MD5 hash verification of slides using the GDC manifest.
    if md5:
        df = sf.util.get_gdc_manifest()
        md5_manifest = dict(zip(df.filename.values, df.md5.values))
        # The slides folder may not exist yet if nothing was downloaded.
        if exists(cfg.slides):
            slides_with_md5 = [s for s in os.listdir(cfg.slides)
                               if s in md5_manifest]
        else:
            slides_with_md5 = []
        failed_md5 = []
        for slide in tqdm(slides_with_md5):
            if sf.util.md5(join(cfg.slides, slide)) != md5_manifest[slide]:
                log.info(f"Slide {slide} failed MD5 verification")
                failed_md5.append(slide)
        if not failed_md5:
            log.info(
                f"All {len(slides_with_md5)} slides passed MD5 verification."
            )
        else:
            log.warning(
                f"Warning: {len(failed_md5)} slides failed MD5 verification: "
                f"{', '.join(failed_md5)}"
            )

    return P