# Source: slideflow.slide.utils

"""Utility functions and constants for slide reading."""

import slideflow as sf
import cv2
import csv
import io
import numpy as np
import shapely.validation as sv
import shapely.geometry as sg
import shapely.affinity as sa
import xml.etree.ElementTree as ET

from shapely.ops import unary_union, polygonize
from PIL import Image, ImageDraw
from slideflow import errors, log
from types import SimpleNamespace
from typing import Union, List, Tuple, Optional, Dict

# Constants
DEFAULT_JPG_MPP = 1  # Fallback microns-per-pixel when a JPG has no MPP metadata
# OpenSlide property keys used to read slide metadata
OPS_LEVEL_COUNT = 'openslide.level-count'
OPS_MPP_X = 'openslide.mpp-x'
OPS_VENDOR = 'openslide.vendor'
# OpenSlide bounds properties (non-empty region of the slide)
OPS_BOUNDS_HEIGHT = 'openslide.bounds-height'
OPS_BOUNDS_WIDTH = 'openslide.bounds-width'
OPS_BOUNDS_X = 'openslide.bounds-x'
OPS_BOUNDS_Y = 'openslide.bounds-y'
TIF_EXIF_KEY_MPP = 65326  # EXIF tag key used to read MPP from TIFF files (per name; verify against readers)
# Generic width/height property keys
OPS_WIDTH = 'width'
OPS_HEIGHT = 'height'
# Default thresholds/fractions for whitespace & grayspace tile filtering
DEFAULT_WHITESPACE_THRESHOLD = 230
DEFAULT_WHITESPACE_FRACTION = 1.0
DEFAULT_GRAYSPACE_THRESHOLD = 0.05
DEFAULT_GRAYSPACE_FRACTION = 0.6
# Sentinel values forcing whitespace/grayspace computation
FORCE_CALCULATE_WHITESPACE = -1
FORCE_CALCULATE_GRAYSPACE = -1
# Transform codes for tile rotation/flipping (slideflow-specific values;
# NOTE(review): these do not match cv2.ROTATE_* constants)
ROTATE_90_CLOCKWISE = 1
ROTATE_180_CLOCKWISE = 2
ROTATE_270_CLOCKWISE = 3
FLIP_HORIZONTAL = 4
FLIP_VERTICAL = 5


def OPS_LEVEL_HEIGHT(level: int) -> str:
    """Return the OpenSlide property key for the height of a pyramid level."""
    return 'openslide.level[{}].height'.format(level)


def OPS_LEVEL_WIDTH(level: int) -> str:
    """Return the OpenSlide property key for the width of a pyramid level."""
    return 'openslide.level[{}].width'.format(level)


def OPS_LEVEL_DOWNSAMPLE(level: int) -> str:
    """Return the OpenSlide property key for a pyramid level's downsample factor."""
    return 'openslide.level[{}].downsample'.format(level)

# -----------------------------------------------------------------------------
# Classes

class ROI:
    """Object container for an ROI polygon annotation.

    Stores the exterior coordinates of a region of interest along with any
    interior holes (themselves ``ROI`` objects), and lazily constructs the
    corresponding shapely polygon and triangulated mesh on first access.
    """

    def __init__(
        self,
        name: str,
        coordinates: Union[np.ndarray, List[Tuple[int, int]]],
        *,
        label: Optional[str] = None,
        holes: Optional[Union[List["ROI"], Dict[int, "ROI"]]] = None
    ) -> None:
        """Create an ROI from exterior coordinates.

        Args:
            name (str): Name of the ROI.
            coordinates (np.ndarray or list): Exterior (x, y) coordinates.

        Keyword args:
            label (str, optional): Label category for the ROI.
            holes (list or dict, optional): Interior holes. May be a list
                of ROIs (converted to an index-keyed dict) or a dict
                mapping keys to ROIs.

        Raises:
            errors.InvalidROIError: If the coordinates do not form a
                valid polygon.
        """
        self.name = name
        self._label = label if label else None
        # Holes are stored internally as a dict. Fix: a list argument was
        # previously stored unconverted, which broke every subsequent
        # .values()/.items() call on self.holes.
        if holes is None:
            self.holes = {}  # type: Dict[int, "ROI"]
        elif isinstance(holes, dict):
            self.holes = holes
        else:
            self.holes = dict(enumerate(holes))
        self._poly = None        # Lazily-built shapely polygon cache.
        self._triangles = None   # Lazily-built triangle mesh cache.
        self.coordinates = np.array(coordinates)
        self.validate()

    def __repr__(self):
        return f"<ROI (coords={len(self.coordinates)} label={self.label})>"

    @property
    def description(self) -> str:
        """Return a description of the ROI, including any hole names."""
        if not self.holes:
            return self.name
        else:
            return self.name + ' (holes: {})'.format(', '.join(
                [h.name for h in self.holes.values()]
            ))

    @property
    def label(self) -> Optional[str]:
        """Return the label of the ROI."""
        return self._label

    @label.setter
    def label(self, label: str) -> None:
        """Set the label of the ROI, propagating it to all holes."""
        self._label = label
        for h in self.holes.values():
            h.label = label

    # --- Polygons ------------------------------------------------------------

    @property
    def poly(self) -> sg.Polygon:
        """Return the shapely polygon object (built on first access)."""
        if self._poly is None:
            self.update_polygon()
        return self._poly

    @property
    def triangles(self) -> np.ndarray:
        """Return the triangulated mesh (built on first access)."""
        if self._triangles is None:
            self._triangles = self.create_triangles()
        return self._triangles

    def make_polygon(self) -> sg.Polygon:
        """Create a shapely polygon from the coordinates.

        Holes not contained within the exterior polygon are discarded.

        Raises:
            ValueError: If the coordinates do not form a valid polygon.

        Returns:
            sg.Polygon: Shapely polygon object.

        """
        poly = sv.make_valid(sg.Polygon(self.coordinates))
        to_delete = []
        for h, hole in self.holes.items():
            if not poly.contains(hole.poly):
                # Hole is not contained within the polygon,
                # so remove it from the list of holes.
                to_delete.append(h)
            poly = poly.difference(hole.poly)
        for h in to_delete:
            del self.holes[h]
        return poly

    def update_polygon(self) -> None:
        """Rebuild the shapely polygon and invalidate the triangle cache."""
        self._poly = self.make_polygon()
        self._triangles = None

    def scaled_poly(self, scale: float) -> sg.Polygon:
        """Create a polygon with coordinates divided by ``scale``."""
        poly = sv.make_valid(sg.Polygon(self.scaled_coords(scale)))
        for h in self.holes.values():
            poly = poly.difference(h.scaled_poly(scale))
        return poly

    def poly_coords(self) -> np.ndarray:
        """Return the exterior coordinates of the polygon.

        For MultiPolygon/GeometryCollection geometries, exterior coordinates
        of all member polygons are concatenated. Consecutive duplicate
        points are removed.
        """
        if self.poly.geom_type in ('MultiPolygon', 'GeometryCollection'):
            valid_polys = [p for p in self.poly.geoms if p.geom_type == 'Polygon']
            if not len(valid_polys):
                return np.array([])
            else:
                coords = np.concatenate([
                    np.stack(p.exterior.coords.xy, axis=-1)
                    for p in valid_polys
                ])
        elif self.poly.geom_type == 'Polygon':
            coords = np.stack(self.poly.exterior.coords.xy, axis=-1)
        else:
            # This should have been caught by the validate function
            raise errors.InvalidROIError(f"Unrecognized ROI polygon geometry: {self.poly.geom_type}")
        # Remove duplicate points
        coords = np.concatenate([
            # Take the first coordinate
            np.expand_dims(coords[0], 0),
            # Only take subsequent coordinates if they are not repeating
            coords[1:][~np.all(coords[:-1] == coords[1:], axis=-1)]
        ], axis=0)
        return coords

    def simplify(self, tolerance: float = 5) -> None:
        """Simplify the polygon (and all holes) in place.

        Args:
            tolerance (float): Simplification tolerance, passed to
                ``shapely.geometry.Polygon.simplify``. Defaults to 5.
        """
        if self.poly.geom_type in ('MultiPolygon', 'GeometryCollection'):
            poly_s = sg.MultiPolygon([p.simplify(tolerance) for p in self.poly.geoms if p.geom_type == 'Polygon'])
            if not len(poly_s.geoms):
                # Polygon is empty after simplification, and thus cannot be simplified.
                log.warning(f"ROI {self.name} is empty after simplification.")
            else:
                self.coordinates = np.concatenate([np.stack(p.exterior.coords.xy, axis=-1) for p in poly_s.geoms])
        elif self.poly.geom_type == 'Polygon':
            poly_s = self.poly.simplify(tolerance=tolerance)
            self.coordinates = np.stack(poly_s.exterior.coords.xy, axis=-1)
        else:
            # This should have been caught by the validate function
            raise errors.InvalidROIError(f"Unrecognized ROI polygon geometry: {self.poly.geom_type}")
        for hole in self.holes.values():
            hole.simplify(tolerance)
        self.update_polygon()

    def invert(self, width: int, height: int) -> "ROI":
        """Invert the ROI within the bounds of a whole-slide image.

        Args:
            width (int): Width of the whole-slide image.
            height (int): Height of the whole-slide image.

        Returns:
            ROI: Inverted ROI of shape (width, height) with this ROI as a hole.

        """
        # Ensure polygon is generated
        self.update_polygon()
        # Calculate polygon bounding box (whole-slide)
        roi_wsi_coords = np.array([[0., 0.], [0., height], [width, height], [width, 0.]])
        # Create the inverted ROI
        inverted_ROI = ROI(name=self.name, coordinates=roi_wsi_coords)
        # Add the hole to the ROI
        inverted_ROI.add_hole(self)
        return inverted_ROI

    def create_triangles(self) -> Optional[np.ndarray]:
        """Create a triangulated mesh from the polygon.

        Returns:
            np.ndarray, optional: Triangle vertices, or None if the polygon
            is invalid, not a simple polygon, or a hole lies outside it.
        """

        def as_open_array(array):
            # Drop the closing vertex if the ring is explicitly closed.
            if (array[0] == array[-1]).all():
                return array[:-1]
            else:
                return array

        # First, ensure the polygon is valid
        if not self.polygon_is_valid():
            sf.log.error(
                "Unable to create triangles; ROI polygon is invalid."
            )
            return None
        if self.poly.geom_type != 'Polygon' or any([h.poly.geom_type != 'Polygon' for h in self.holes.values()]):
            sf.log.error(
                "Unable to create triangles; ROI is not a simple polygon."
            )
            return None

        # Vertices of the hole boundaries
        hole_vertices = {
            h: as_open_array(hole.poly_coords())
            for h, hole in self.holes.items()
        }

        # Filter out holes that are too small
        valid_holes = [hole for h, hole in self.holes.items() if len(hole_vertices[h]) > 3]
        hole_vertices = [v for h, v in hole_vertices.items() if len(v) > 3]

        # Verify all holes are contained within the polygon
        for hole in valid_holes:
            poly = sv.make_valid(sg.Polygon(self.poly_coords()))
            if not poly.contains(hole.poly):
                # Hole is not contained within the polygon
                return None

        # Vertices of representative points within each hole
        hole_points = [
            hole.poly.representative_point().coords[0]
            for hole in valid_holes
        ]

        if not len(hole_vertices):
            hole_vertices = None
            hole_points = None

        # Build triangles.
        triangle_vertices = sf.util.create_triangles(
            as_open_array(self.poly_coords()),
            hole_vertices=hole_vertices,
            hole_points=hole_points
        )
        return triangle_vertices

    # --- Holes ---------------------------------------------------------------

    def add_hole(self, roi: "ROI") -> None:
        """Add a hole to the ROI and rebuild the polygon."""
        hole_name = self.get_next_hole_name()
        self.holes[hole_name] = roi
        self.update_polygon()

    def remove_hole(self, roi: Union["ROI", str]) -> None:
        """Remove a hole from the ROI and rebuild the polygon.

        Args:
            roi (ROI or str): Hole to remove, as an object or by name.

        Raises:
            ValueError: If no matching hole is found.
        """
        if isinstance(roi, str):
            roi = self.get_hole(roi)
        # Fix: the previous implementation attempted to delete using the
        # (unhashable) list of matching keys, which always raised TypeError.
        matching = [h for h, r in self.holes.items() if r == roi]
        if not matching:
            raise ValueError("Hole not found in ROI.")
        for h in matching:
            del self.holes[h]
        self.update_polygon()

    def get_hole(self, name: str) -> "ROI":
        """Get a hole by name.

        Raises:
            ValueError: If no hole with the given name exists.
        """
        for h in self.holes.values():
            if h.name == name:
                return h
        raise ValueError(f"No hole found with name {name}")

    def get_next_hole_name(self) -> int:
        """Get the next available (unused) integer hole key.

        Note: annotation corrected from ``str``; this has always returned
        an int used as a dict key.
        """
        # Fix: len() alone can collide with an existing key after a hole
        # has been removed; scan forward to the first free index.
        idx = len(self.holes)
        while idx in self.holes:
            idx += 1
        return idx

    # --- Other functions -----------------------------------------------------

    def validate(self) -> None:
        """Validate the exterior coordinates form a valid polygon.

        Raises:
            errors.InvalidROIError: If the polygon cannot be built, has
                fewer than 4 coordinates, or has an unsupported geometry.
        """
        try:
            poly_coords = self.poly_coords()
        except ValueError as e:
            raise errors.InvalidROIError(f"Invalid ROI ({self.name}): {e}")
        if len(poly_coords) < 4:
            raise errors.InvalidROIError(f"Invalid ROI ({self.name}): ROI must contain at least 4 coordinates.")
        if self.poly.geom_type not in ('Polygon', 'MultiPolygon', 'GeometryCollection'):
            raise errors.InvalidROIError(
                f"Invalid ROI ({self.name}): ROI must either be a Polygon, or "
                "MultiPolygon/GeometryCollection containing at least one polygon."
            )
        if self.poly.geom_type in ('MultiPolygon', 'GeometryCollection'):
            if not len([p for p in self.poly.geoms if p.geom_type == 'Polygon']):
                raise errors.InvalidROIError(
                    f"Invalid ROI ({self.name}): ROI must contain at least one valid polygon."
                )

    def polygon_is_valid(self) -> bool:
        """Check that the polygon (and all hole polygons) are valid."""
        poly = sg.Polygon(self.coordinates)
        if not len(list(polygonize(unary_union(poly)))):
            # Polygon is self-intersecting
            return False
        if not poly.is_valid:
            # Polygon is invalid
            return False
        return all([h.polygon_is_valid() for h in self.holes.values()])

    def scaled_coords(self, scale: float) -> np.ndarray:
        """Return coordinates divided by ``scale``."""
        return np.multiply(self.coordinates, 1/scale)


class QCMask:
    """A 2D boolean quality-control mask.

    Wraps a boolean numpy array together with a filter threshold
    (presumably the masked-area fraction above which a tile is excluded —
    TODO confirm against callers) and a flag indicating whether the mask
    was derived from an ROI.
    """

    def __init__(
        self,
        mask: np.ndarray,
        filter_threshold: float = 0.6,
        is_roi: bool = False
    ) -> None:
        """Validate and store the mask.

        Args:
            mask (np.ndarray): 2D boolean array.
            filter_threshold (float): Value in [0, 1]. Defaults to 0.6.
            is_roi (bool): Whether this mask represents an ROI.

        Raises:
            ValueError: If the threshold is out of range or the mask is
                not a 2D boolean numpy array.
        """
        # Validation order is preserved: threshold range, array type,
        # dimensionality, then dtype.
        if not (0 <= filter_threshold <= 1):
            raise ValueError('filter_threshold must be between 0 and 1')
        if not isinstance(mask, np.ndarray):
            raise ValueError('mask must be a numpy array')
        if mask.ndim != 2:
            raise ValueError('mask must be a 2D array')
        if mask.dtype != bool:
            raise ValueError('mask must be a boolean array')

        self.mask = mask
        self.is_roi = is_roi
        self.filter_threshold = filter_threshold

    def __repr__(self):
        return f"<QCMask (shape={self.shape}), filter_threshold={self.filter_threshold}, is_roi={self.is_roi}>"

    @property
    def shape(self):
        """Shape of the underlying mask array."""
        return self.mask.shape


class Alignment:
    """Container for slide alignment parameters.

    Holds a translation origin and scale, an optional per-tile alignment
    grid (``coord``), and optional fitted-plane parameters
    (``centroid``/``normal``, set via :meth:`from_fit`).
    """

    def __init__(
        self,
        origin: Tuple[int, int],
        scale: float,
        coord: Optional[np.ndarray] = None
    ) -> None:
        self.origin = origin
        self.scale = scale
        self.coord = coord
        # Plane-fit parameters; populated only by from_fit() or load().
        self.centroid = None  # type: Tuple[float, float]
        self.normal = None    # type: Tuple[float, float]

    def __repr__(self):
        return f"<Alignment (origin={self.origin}, coord={self.coord}, centroid={self.centroid}, normal={self.normal})>"

    @classmethod
    def from_fit(cls, origin, scale, centroid, normal):
        """Build an alignment from a translation plus a fitted plane."""
        alignment = cls(origin, scale, None)
        alignment.centroid = centroid
        alignment.normal = normal
        return alignment

    @classmethod
    def from_translation(cls, origin, scale):
        """Build an alignment from a translation/scale only."""
        return cls(origin, scale, None)

    @classmethod
    def from_coord(cls, origin, coord):
        """Build an alignment from an origin and per-tile coordinate grid."""
        return cls(origin, None, coord)

    def save(self, path):
        """Serialize alignment parameters to an ``.npz`` file at ``path``."""
        arrays = dict(
            origin=np.array(self.origin),
            scale=np.array(self.scale)
        )
        if self.coord is not None:
            arrays['coord'] = self.coord
        if self.centroid is not None:
            arrays['centroid'] = np.array(self.centroid)
        if self.normal is not None:
            arrays['normal'] = np.array(self.normal)
        np.savez(path, **arrays)

    @classmethod
    def load(cls, path):
        """Restore an Alignment previously written by :meth:`save`."""
        data = np.load(path, allow_pickle=True)

        def _get(key):
            # Optional keys may be absent from older files.
            return data[key] if key in data else None

        alignment = cls(tuple(data['origin']), data['scale'], _get('coord'))
        alignment.centroid = _get('centroid')
        alignment.normal = _get('normal')
        return alignment


# -----------------------------------------------------------------------------
# Functions

def numpy2jpg(img: np.ndarray) -> bytes:
    """Encode an RGB(A) image array as JPEG bytes.

    Args:
        img (np.ndarray): Image in RGB order (RGBA alpha is dropped).

    Returns:
        bytes: JPEG-encoded image. (Return annotation corrected from
        ``str``; ``tobytes()`` returns bytes.)
    """
    if img.shape[-1] == 4:
        # Drop the alpha channel.
        img = img[:, :, 0:3]
    # OpenCV encoders expect BGR channel order.
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    return cv2.imencode(".jpg", img)[1].tobytes()   # Default quality = 95%


def numpy2png(img: np.ndarray) -> bytes:
    """Encode an RGB(A) image array as PNG bytes.

    Args:
        img (np.ndarray): Image in RGB order (RGBA alpha is dropped).

    Returns:
        bytes: PNG-encoded image. (Return annotation corrected from
        ``str``; ``tobytes()`` returns bytes.)
    """
    if img.shape[-1] == 4:
        # Drop the alpha channel.
        img = img[:, :, 0:3]
    # OpenCV encoders expect BGR channel order.
    img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    return cv2.imencode(".png", img)[1].tobytes()


def predict(
    slide: str,
    model: str,
    *,
    stride_div: int = 1,
    **kwargs
) -> Tuple[np.ndarray, Optional[np.ndarray]]:
    """Generate a whole-slide prediction from a saved model.

    Args:
        slide (str): Path to slide.
        model (str): Path to saved model trained in Slideflow.

    Keyword args:
        stride_div (int, optional): Divisor for stride when convoluting
            across slide. Defaults to 1.
        roi_dir (str, optional): Directory in which slide ROI is contained.
            Defaults to None.
        rois (list, optional): List of paths to slide ROIs. Alternative to
            providing roi_dir. Defaults to None.
        roi_method (str): Either 'inside', 'outside', 'auto', or 'ignore'.
            Determines how ROIs are used to extract tiles.
            If 'inside' or 'outside', will extract tiles in/out of an ROI,
            and raise errors.MissingROIError if an ROI is not available.
            If 'auto', will extract tiles inside an ROI if available,
            and across the whole-slide if no ROI is found.
            If 'ignore', will extract tiles across the whole-slide
            regardless of whether an ROI is available.
            Defaults to 'auto'.
        batch_size (int, optional): Batch size for calculating predictions.
            Defaults to 32.
        num_threads (int, optional): Number of tile worker threads. Cannot
            supply both ``num_threads`` (uses thread pool) and
            ``num_processes`` (uses multiprocessing pool). Defaults to
            CPU core count.
        num_processes (int, optional): Number of child processes to spawn
            for multiprocessing pool. Defaults to None (does not use
            multiprocessing).
        enable_downsample (bool, optional): Enable the use of downsampled
            slide image layers. Defaults to True.
        img_format (str, optional): Image format (png, jpg) to use when
            extracting tiles from slide. Must match the image format the
            model was trained on. If 'auto', will use the format logged in
            the model params.json. Defaults to 'auto'.
        generator_kwargs (dict, optional): Keyword arguments passed to the
            :meth:`slideflow.WSI.build_generator()`.
        device (torch.device, optional): PyTorch device. Defaults to
            initializing a new CUDA device.

    Returns:
        np.ndarray: Predictions for each outcome, with shape = (num_classes, )

        np.ndarray, optional: Uncertainty for each outcome, if the model was
        trained with uncertainty, with shape = (num_classes,)
    """
    # Imported here to avoid a circular import at module load time.
    from slideflow import Heatmap

    log.info("Calculating whole-slide prediction...")
    heatmap = Heatmap(slide, model, generate=True, stride_div=stride_div, **kwargs)
    assert heatmap.predictions is not None
    # Average predictions across all tiles.
    # NOTE(review): .filled() implies the heatmap arrays are numpy masked
    # arrays — confirm against Heatmap's implementation.
    preds = heatmap.predictions.reshape(-1, heatmap.predictions.shape[-1])
    preds = np.nanmean(preds, axis=0).filled()
    if heatmap.uncertainty is not None:
        unc = heatmap.uncertainty.reshape(-1, heatmap.uncertainty.shape[-1])
        unc = np.nanmean(unc, axis=0).filled()
        return preds, unc
    else:
        return preds
def log_extraction_params(**kwargs) -> None: """Log tile extraction parameters.""" if 'whitespace_fraction' not in kwargs: ws_f = DEFAULT_WHITESPACE_FRACTION else: ws_f = kwargs['whitespace_fraction'] if 'whitespace_threshold' not in kwargs: ws_t = DEFAULT_WHITESPACE_THRESHOLD else: ws_t = kwargs['whitespace_threshold'] if 'grayspace_fraction' not in kwargs: gs_f = DEFAULT_GRAYSPACE_FRACTION else: gs_f = kwargs['grayspace_fraction'] if 'grayspace_threshold' not in kwargs: gs_t = DEFAULT_GRAYSPACE_THRESHOLD else: gs_t = kwargs['grayspace_threshold'] if 'normalizer' in kwargs: log.info(f'Extracting tiles using [magenta]{kwargs["normalizer"]}[/] ' 'normalization') if ws_f < 1: log.info('Filtering tiles by whitespace fraction') excl = f'(exclude if >={ws_f*100:.0f}% whitespace)' log.debug(f'Whitespace defined as RGB avg > {ws_t} {excl}') if gs_f < 1: log.info('Filtering tiles by grayspace fraction') excl = f'(exclude if >={gs_f*100:.0f}% grayspace)' log.debug(f'Grayspace defined as HSV avg < {gs_t} {excl}') def draw_roi( img: Union[np.ndarray, str], coords: List[List[int]], color: str = 'red', linewidth: int = 5 ) -> np.ndarray: """Draw ROIs on image. Args: img (Union[np.ndarray, str]): Image. coords (List[List[int]]): ROI coordinates. Returns: np.ndarray: Image as numpy array. 
""" annPolys = [sg.Polygon(b) for b in coords] if isinstance(img, np.ndarray): annotated_img = Image.fromarray(img) elif isinstance(img, str): annotated_img = Image.open(io.BytesIO(img)) # type: ignore else: raise ValueError("Expected img to be a numpy array or bytes, got: {}".format( type(img) )) draw = ImageDraw.Draw(annotated_img) for poly in annPolys: if poly.geom_type in ('MultiPolygon', 'GeometryCollection'): for p in poly.geoms: if p.is_empty or p.geom_type != 'Polygon': continue x, y = p.exterior.coords.xy zipped = list(zip(x.tolist(), y.tolist())) draw.line(zipped, joint='curve', fill=color, width=linewidth) else: x, y = poly.exterior.coords.xy zipped = list(zip(x.tolist(), y.tolist())) draw.line(zipped, joint='curve', fill=color, width=linewidth) return np.asarray(annotated_img) def roi_coords_from_image( c: List[int], args: SimpleNamespace ) -> Tuple[List[int], List[np.ndarray], List[List[int]]]: # Scale ROI according to downsample level extract_scale = (args.extract_px / args.full_extract_px) # Scale ROI according to image resizing resize_scale = (args.tile_px / args.extract_px) def proc_coords(_coords): # Offset coordinates to extraction window _coords = np.add(_coords, np.array([-1 * c[0], -1 * c[1]])) # Rescale according to downsampling and resizing _coords = np.multiply(_coords, (extract_scale * resize_scale)) return _coords # Filter out ROIs not in this tile coords = [] ll = np.array([0, 0]) ur = np.array([args.tile_px, args.tile_px]) for roi in args.rois: coord = proc_coords(roi.coordinates) idx = np.all(np.logical_and(ll <= coord, coord <= ur), axis=1) coords_in_tile = coord[idx] if len(coords_in_tile) > 3: coords += [coords_in_tile] for hole in roi.holes.values(): hole_coord = proc_coords(hole.coordinates) hole_idx = np.all(np.logical_and(ll <= hole_coord, hole_coord <= ur), axis=1) hole_coords_in_tile = hole_coord[hole_idx] if len(hole_coords_in_tile) > 3: coords += [hole_coords_in_tile] # Convert outer ROI to bounding box that fits within tile 
boxes = [] yolo_anns = [] for coord in coords: max_vals = np.max(coord, axis=0) min_vals = np.min(coord, axis=0) max_x = min(max_vals[0], args.tile_px) max_y = min(max_vals[1], args.tile_px) min_x = max(min_vals[0], 0) min_y = max(0, min_vals[1]) width = (max_x - min_x) / args.tile_px height = (max_y - min_y) / args.tile_px x_center = ((max_x + min_x) / 2) / args.tile_px y_center = ((max_y + min_y) / 2) / args.tile_px yolo_anns += [[x_center, y_center, width, height]] boxes += [np.array([ [min_x, min_y], [min_x, max_y], [max_x, max_y], [max_x, min_y] ])] return coords, boxes, yolo_anns def xml_to_csv(path: str) -> str: """Create a QuPath format CSV ROI file from an ImageScope-format XML. ImageScope-formatted XMLs are expected to have "Region" and "Vertex" attributes. The "Region" attribute should have an "ID" sub-attribute. Args: path (str): ImageScope XML ROI file path Returns: str: Path to new CSV file. Raises: slideflow.errors.ROIError: If the XML could not be converted. """ tree = ET.parse(path) root = tree.getroot() new_csv_file = path[:-4] + '.csv' required_attributes = ['.//Region', './/Vertex'] if not all(root.findall(a) for a in required_attributes): raise errors.ROIError( f"No ROIs found in the XML file {path}. Check that the XML " "file attributes are named correctly named in ImageScope " "format with 'Region' and 'Vertex' tags." ) with open(new_csv_file, 'w', newline='') as csvfile: csvwriter = csv.writer(csvfile) csvwriter.writerow(['ROI_name', 'X_base', 'Y_base']) for region in root.findall('.//Region'): id_tag = region.get('Id') if not id_tag: raise errors.ROIError( "No ID attribute found for Region. Check xml file and " "ensure it adheres to ImageScope format." ) roi_name = 'ROI_' + str(id_tag) vertices = region.findall('.//Vertex') if not vertices: raise errors.ROIError( "No Vertex found in ROI. Check xml file and ensure it " "adheres to ImageScope format." 
) csvwriter.writerows([ [roi_name, vertex.get('X'), vertex.get('Y')] for vertex in vertices ]) return new_csv_file def get_scaled_and_intersecting_polys( polys: "sg.Polygon", tile: "sg.Polygon", scale: float, origin: Tuple[int, int] ): """Get scaled and intersecting polygons for a given tile. Args: polys (sg.Polygon): Shapely polygon containing union of all ROIs, in base dimensions. tile (sg.Polygon): Shapely polygon representing the tile, in base dimensions. scale (float): Scale factor. full_stride (int): Full stride, indictating the number of pixels in between each tile. grid_idx (Tuple[int, int]): Grid index of the tile (x, y). Returns: sg.Polygon: ROI polygons intersecting the tile, scaled to the tile size and with the origin with reference to the tile. """ topleft, topright = origin A = polys.intersection(tile) # Translate polygons so the intersection origin is at (0, 0) B = sa.translate(A, -topleft, -topright) # Scale to the target tile size C = sa.scale(B, xfact=scale, yfact=scale, origin=(0, 0)) return C def _align_to_matrix(im1: np.ndarray, im2: np.ndarray, warp_matrix: np.ndarray) -> np.ndarray: """Align an image to a warp matrix.""" # Use the warpAffine function to apply the transformation return cv2.warpAffine(im1, warp_matrix, (im2.shape[1], im2.shape[0]), flags=cv2.INTER_LINEAR + cv2.WARP_INVERSE_MAP) def _find_translation_matrix( im1: np.ndarray, im2: np.ndarray, *, denoise: bool = True, h: float = 30, block_size: int = 7, search_window: int = 21, n_iterations: int = 10000, termination_eps = 1e-10, warp_matrix: Optional[np.ndarray] = None ) -> np.ndarray: """ Align two images using only scaling and translation. :param im1: The image to be aligned. :param im2: The reference image. :return: Aligned image of im1. 
""" # Convert images to grayscale im1_gray = cv2.cvtColor(im1, cv2.COLOR_BGR2GRAY) im2_gray = cv2.cvtColor(im2, cv2.COLOR_BGR2GRAY) # De-noising if denoise: im1_gray = cv2.fastNlMeansDenoising(im1_gray, None, h, block_size, search_window) im2_gray = cv2.fastNlMeansDenoising(im2_gray, None, h, block_size, search_window) # Transform images to normalize contrast im1_gray = cv2.equalizeHist(im1_gray) im2_gray = cv2.equalizeHist(im2_gray) # Define the motion model warp_mode = cv2.MOTION_TRANSLATION # Define 2x3 matrix to store the transformation if warp_matrix is None: warp_matrix = np.eye(2, 3, dtype=np.float32) # Set the number of iterations and termination criteria criteria = (cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, n_iterations, termination_eps) # Use findTransformECC to compute the transformation _, warp_matrix = cv2.findTransformECC(im2_gray, im1_gray, warp_matrix, warp_mode, criteria) return warp_matrix # type: ignore def align_image(im1: np.ndarray, im2: np.ndarray) -> np.ndarray: """ Align two images using only scaling and translation. :param im1: The image to be aligned. :param im2: The reference image. :return: Aligned image of im1. """ warp_matrix = _find_translation_matrix(im1, im2) return _align_to_matrix(im1, im2, warp_matrix) def align_by_translation( im1: np.ndarray, im2: np.ndarray, round: bool = False, calculate_mse: bool = False, **kwargs ) -> Union[Union[Tuple[float, float], Tuple[int, int]], Tuple[Union[Tuple[float, float], Tuple[int, int]], float]]: """ Find the (x, y) translation that aligns im1 to im2. Args: im1 (np.ndarray): Target for alignment. im2 (np.ndarray): Image to align. round (bool): Round to the nearest int. Defaults to False. calculate_mse (bool): Return the mean squared error (MSE) of alignment. Defaults to False. """ try: warp_matrix = _find_translation_matrix(im1, im2, **kwargs) except cv2.error: raise errors.AlignmentError( "Could not align images. 
Check that the images are the same " "size, that they are not rotated or flipped, and that they have " "overlapping regions." ) alignment = -warp_matrix[0, 2], -warp_matrix[1, 2] if round: alignment = (int(np.round(alignment[0])), int(np.round(alignment[1]))) if calculate_mse: aligned_im1 = _align_to_matrix(im1, im2, warp_matrix) mse = compute_alignment_mse(aligned_im1, im2) return alignment, mse else: return alignment def compute_alignment_mse( imageA: np.ndarray, imageB: np.ndarray, flatten: bool = True ) -> float: """ Compute the Mean Squared Error between two images in their overlapping region, excluding areas that are black (0, 0, 0) in either image. :param imageA: First image. :param imageB: Second image. :return: Mean Squared Error (MSE) between the images in the valid overlapping region. """ # Remove the alpha channel from both images if flatten: imageA = imageA[:, :, 0:3] imageB = imageB[:, :, 0:3] assert imageA.shape == imageB.shape, "Image sizes must match." # Create a combined mask where neither of the images is black combined_mask = np.logical_not(np.logical_or(imageA == 0, imageB == 0)) # Compute MSE only for valid regions diff = (imageA.astype("float") - imageB.astype("float")) ** 2 err = np.sum(diff[combined_mask]) / np.sum(combined_mask) return err def best_fit_plane(points): # Ensure the input is a numpy array points = np.array(points) # 1. Center the data centroid = points.mean(axis=0) centered_points = points - centroid # 2. Compute the covariance matrix cov_matrix = np.cov(centered_points, rowvar=False) # 3. Compute the eigenvalues and eigenvectors eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix) # 4. Get the eigenvector corresponding to the smallest eigenvalue normal_vector = eigenvectors[:, np.argmin(eigenvalues)] # The equation of the plane is `normal_vector . 
(x - centroid) = 0` return centroid, normal_vector def z_on_plane(x, y, centroid, normal): cx, cy, cz = centroid nx, ny, nz = normal if nz == 0: raise ValueError("Normal vector's Z component is zero. Can't compute Z value for the given X, Y.") z = cz + (nx * (cx - x) + ny * (cy - y)) / nz return z def calc_alignment(c, us, them, n=None): idx, (x, y, xi, yi) = c our_tile = us[xi, yi] try: their_tile = them[xi, yi] except IndexError: return None, c if our_tile is None or their_tile is None: return None, c if n is not None: our_tile = n.transform(our_tile[:, :, 0:3]) their_tile = n.transform(their_tile[:, :, 0:3]) try: rough_alignment = sf.slide.utils._find_translation_matrix(their_tile, our_tile, h=50, search_window=53) except cv2.error: rough_alignment = None log.debug("Initial rough alignment failed at x={}, y={} (grid {}, {})".format( x, y, xi, yi )) else: log.debug("Initial rough alignment complete at x={}, y={} (grid {}, {}): {}".format( x, y, xi, yi, (int(np.round(-rough_alignment[0, 2])), int(np.round(-rough_alignment[1, 2]))) )) try: return align_by_translation(their_tile, our_tile, round=True, warp_matrix=rough_alignment), c except errors.AlignmentError as e: return 'error', c # ----------------------------------------------------------------------------- # Internals def _update_kw_with_defaults(kwargs) -> Dict: """Updates a set of keyword arguments with default extraction values. for whitepsace/grayspace filtering. 
""" if kwargs['whitespace_fraction'] is None: kwargs['whitespace_fraction'] = DEFAULT_WHITESPACE_FRACTION if kwargs['whitespace_threshold'] is None: kwargs['whitespace_threshold'] = DEFAULT_WHITESPACE_THRESHOLD if kwargs['grayspace_fraction'] is None: kwargs['grayspace_fraction'] = DEFAULT_GRAYSPACE_FRACTION if kwargs['grayspace_threshold'] is None: kwargs['grayspace_threshold'] = DEFAULT_GRAYSPACE_THRESHOLD if kwargs['img_format'] is None: kwargs['img_format'] = 'jpg' return kwargs def _polyArea(x: List[float], y: List[float]) -> float: return 0.5*np.abs(np.dot(x, np.roll(y, 1))-np.dot(y, np.roll(x, 1))) def _convert_img_to_format(image: np.ndarray, img_format: str) -> str: if img_format.lower() == 'png': return cv2.imencode( '.png', cv2.cvtColor(image, cv2.COLOR_RGB2BGR) )[1].tobytes() elif img_format.lower() in ('jpg', 'jpeg'): return cv2.imencode( '.jpg', cv2.cvtColor(image, cv2.COLOR_RGB2BGR), [int(cv2.IMWRITE_JPEG_QUALITY), 100] )[1].tostring() else: raise ValueError(f"Unknown image format {img_format}")