Source code for gwcs.wcs

# Licensed under a 3-clause BSD style license - see LICENSE.rst
import functools
import itertools
import numpy as np
from astropy.modeling.core import Model
from astropy.modeling import utils as mutils

from .api import GWCSAPIMixin
from . import coordinate_frames
from .utils import CoordinateFrameError
from .utils import _toindex
from . import utils


__all__ = ['WCS']


[docs]class WCS(GWCSAPIMixin): """ Basic WCS class. Parameters ---------- forward_transform : `~astropy.modeling.Model` or a list The transform between ``input_frame`` and ``output_frame``. A list of (frame, transform) tuples where ``frame`` is the starting frame and ``transform`` is the transform from this frame to the next one or ``output_frame``. The last tuple is (transform, None), where None indicates the end of the pipeline. input_frame : str, `~gwcs.coordinate_frames.CoordinateFrame` A coordinates object or a string name. output_frame : str, `~gwcs.coordinate_frames.CoordinateFrame` A coordinates object or a string name. name : str a name for this WCS """ def __init__(self, forward_transform=None, input_frame='detector', output_frame=None, name=""): #self.low_level_wcs = self self._available_frames = [] self._pipeline = [] self._name = name self._array_shape = None self._initialize_wcs(forward_transform, input_frame, output_frame) def _initialize_wcs(self, forward_transform, input_frame, output_frame): if forward_transform is not None: if isinstance(forward_transform, Model): if output_frame is None: raise CoordinateFrameError("An output_frame must be specified" "if forward_transform is a model.") _input_frame, inp_frame_obj = self._get_frame_name(input_frame) _output_frame, outp_frame_obj = self._get_frame_name(output_frame) super(WCS, self).__setattr__(_input_frame, inp_frame_obj) super(WCS, self).__setattr__(_output_frame, outp_frame_obj) self._pipeline = [(input_frame, forward_transform.copy()), (output_frame, None)] elif isinstance(forward_transform, list): for item in forward_transform: name, frame_obj = self._get_frame_name(item[0]) super(WCS, self).__setattr__(name, frame_obj) #self._pipeline.append((name, item[1])) self._pipeline = forward_transform else: raise TypeError("Expected forward_transform to be a model or a " "(frame, transform) list, got {0}".format( type(forward_transform))) else: # Initialize a WCS without a forward_transform - allows building a WCS programmatically. if output_frame is None: raise CoordinateFrameError("An output_frame must be specified" "if forward_transform is None.") _input_frame, inp_frame_obj = self._get_frame_name(input_frame) _output_frame, outp_frame_obj = self._get_frame_name(output_frame) super(WCS, self).__setattr__(_input_frame, inp_frame_obj) super(WCS, self).__setattr__(_output_frame, outp_frame_obj) self._pipeline = [(_input_frame, None), (_output_frame, None)]
[docs] def get_transform(self, from_frame, to_frame): """ Return a transform between two coordinate frames. Parameters ---------- from_frame : str or `~gwcs.coordinate_frame.CoordinateFrame` Initial coordinate frame name of object. to_frame : str, or instance of `~gwcs.cordinate_frames.CoordinateFrame` End coordinate frame name or object. Returns ------- transform : `~astropy.modeling.Model` Transform between two frames. """ if not self._pipeline: return None try: from_ind = self._get_frame_index(from_frame) except ValueError: raise CoordinateFrameError("Frame {0} is not in the available " "frames".format(from_frame)) try: to_ind = self._get_frame_index(to_frame) except ValueError: raise CoordinateFrameError("Frame {0} is not in the available frames".format(to_frame)) if to_ind < from_ind: transforms = np.array(self._pipeline[to_ind: from_ind])[:, 1].tolist() transforms = [tr.inverse for tr in transforms[::-1]] elif to_ind == from_ind: return None else: transforms = np.array(self._pipeline[from_ind: to_ind])[:, 1].copy() return functools.reduce(lambda x, y: x | y, transforms)
[docs] def set_transform(self, from_frame, to_frame, transform): """ Set/replace the transform between two coordinate frames. Parameters ---------- from_frame : str or `~gwcs.coordinate_frame.CoordinateFrame` Initial coordinate frame. to_frame : str, or instance of `~gwcs.cordinate_frames.CoordinateFrame` End coordinate frame. transform : `~astropy.modeling.Model` Transform between ``from_frame`` and ``to_frame``. """ from_name, from_obj = self._get_frame_name(from_frame) to_name, to_obj = self._get_frame_name(to_frame) if not self._pipeline: if from_name != self._input_frame: raise CoordinateFrameError( "Expected 'from_frame' to be {0}".format(self._input_frame)) if to_frame != self._output_frame: raise CoordinateFrameError( "Expected 'to_frame' to be {0}".format(self._output_frame)) try: from_ind = self._get_frame_index(from_name) except ValueError: raise CoordinateFrameError("Frame {0} is not in the available frames".format(from_name)) try: to_ind = self._get_frame_index(to_name) except ValueError: raise CoordinateFrameError("Frame {0} is not in the available frames".format(to_name)) if from_ind + 1 != to_ind: raise ValueError("Frames {0} and {1} are not in sequence".format(from_name, to_name)) self._pipeline[from_ind] = (self._pipeline[from_ind][0], transform)
@property def forward_transform(self): """ Return the total forward transform - from input to output coordinate frame. """ if self._pipeline: return functools.reduce(lambda x, y: x | y, [step[1] for step in self._pipeline[: -1]]) else: return None @property def backward_transform(self): """ Return the total backward transform if available - from output to input coordinate system. Raises ------ NotImplementedError : An analytical inverse does not exist. """ try: backward = self.forward_transform.inverse except NotImplementedError as err: raise NotImplementedError("Could not construct backward transform. \n{0}".format(err)) return backward def _get_frame_index(self, frame): """ Return the index in the pipeline where this frame is locate. """ if isinstance(frame, coordinate_frames.CoordinateFrame): frame = frame.name frame_names = [getattr(item[0], "name", item[0]) for item in self._pipeline] return frame_names.index(frame) def _get_frame_name(self, frame): """ Return the name of the frame and a ``CoordinateFrame`` object. Parameters ---------- frame : str, `~gwcs.coordinate_frames.CoordinateFrame` Coordinate frame. Returns ------- name : str The name of the frame frame_obj : `~gwcs.coordinate_frames.CoordinateFrame` Frame instance or None (if `frame` is str) """ if isinstance(frame, str): name = frame frame_obj = None else: name = frame.name frame_obj = frame return name, frame_obj
[docs] def __call__(self, *args, **kwargs): """ Executes the forward transform. args : float or array-like Inputs in the input coordinate system, separate inputs for each dimension. with_units : bool If ``True`` returns a `~astropy.coordinates.SkyCoord` or `~astropy.units.Quantity` object, by using the units of the output cooridnate frame. Optional, default=False. with_bounding_box : bool, optional If True(default) values in the result which correspond to any of the inputs being outside the bounding_box are set to ``fill_value``. fill_value : float, optional Output value for inputs outside the bounding_box (default is np.nan). """ transform = self.forward_transform if transform is None: raise NotImplementedError("WCS.forward_transform is not implemented.") with_units = kwargs.pop("with_units", False) if 'with_bounding_box' not in kwargs: kwargs['with_bounding_box'] = True if 'fill_value' not in kwargs: kwargs['fill_value'] = np.nan if self.bounding_box is not None: # Currently compound models do not attempt to combine individual model # bounding boxes. Get the forward transform and assign the ounding_box to it # before evaluating it. The order Model.bounding_box is reversed. axes_ind = self._get_axes_indices() if transform.n_inputs > 1: transform.bounding_box = np.array(self.bounding_box)[axes_ind][::-1] else: transform.bounding_box = self.bounding_box result = transform(*args, **kwargs) if with_units: if self.output_frame.naxes == 1: result = self.output_frame.coordinates(result) else: result = self.output_frame.coordinates(*result) return result
[docs] def invert(self, *args, **kwargs): """ Invert coordinates. The analytical inverse of the forward transform is used, if available. If not an iterative method is used. Parameters ---------- args : float, array like, `~astropy.coordinates.SkyCoord` or `~astropy.units.Unit` coordinates to be inverted kwargs : dict keyword arguments to be passed to the iterative invert method. with_bounding_box : bool, optional If True(default) values in the result which correspond to any of the inputs being outside the bounding_box are set to ``fill_value``. fill_value : float, optional Output value for inputs outside the bounding_box (default is np.nan). """ if not utils.isnumerical(args[0]): args = self.output_frame.coordinate_to_quantity(*args) if not self.forward_transform.uses_quantity: args = utils.get_values(self.output_frame.unit, *args) with_units = kwargs.pop('with_units', False) if 'with_bounding_box' not in kwargs: kwargs['with_bounding_box'] = True if 'fill_value' not in kwargs: kwargs['fill_value'] = np.nan try: result = self.backward_transform(*args, **kwargs) except (NotImplementedError, KeyError): result = self._invert(*args, **kwargs) if with_units and self.input_frame: if self.input_frame.naxes == 1: return self.input_frame.coordinates(result) else: return self.input_frame.coordinates(*result) else: return result
def _invert(self, *args, **kwargs): """ Implement iterative inverse here. """ raise NotImplementedError
[docs] def transform(self, from_frame, to_frame, *args, **kwargs): """ Transform positions between two frames. Parameters ---------- from_frame : str or `~gwcs.coordinate_frames.CoordinateFrame` Initial coordinate frame. to_frame : str, or instance of `~gwcs.cordinate_frames.CoordinateFrame` Coordinate frame into which to transform. args : float or array-like Inputs in ``from_frame``, separate inputs for each dimension. output_with_units : bool If ``True`` - returns a `~astropy.coordinates.SkyCoord` or `~astropy.units.Quantity` object. with_bounding_box : bool, optional If True(default) values in the result which correspond to any of the inputs being outside the bounding_box are set to ``fill_value``. fill_value : float, optional Output value for inputs outside the bounding_box (default is np.nan). """ transform = self.get_transform(from_frame, to_frame) if not utils.isnumerical(args[0]): inp_frame = getattr(self, from_frame) args = inp_frame.coordinate_to_quantity(*args) if not transform.uses_quantity: args = utils.get_values(inp_frame.unit, *args) with_units = kwargs.pop("with_units", False) if 'with_bounding_box' not in kwargs: kwargs['with_bounding_box'] = True if 'fill_value' not in kwargs: kwargs['fill_value'] = np.nan result = transform(*args, **kwargs) if with_units: to_frame_name, to_frame_obj = self._get_frame_name(to_frame) if to_frame_obj is not None: if to_frame_obj.naxes == 1: result = to_frame_obj.coordinates(result) else: result = to_frame_obj.coordinates(*result) else: raise TypeError("Coordinate objects could not be created because" "frame {0} is not defined.".format(to_frame_name)) return result
@property def available_frames(self): """ List all frames in this WCS object. Returns ------- available_frames : dict {frame_name: frame_object or None} """ if self._pipeline: return [getattr(frame[0], "name", frame[0]) for frame in self._pipeline] else: return None
[docs] def insert_transform(self, frame, transform, after=False): """ Insert a transform before (default) or after a coordinate frame. Append (or prepend) a transform to the transform connected to frame. Parameters ---------- frame : str or `~gwcs.coordinate_frame.CoordinateFrame` Coordinate frame which sets the point of insertion. transform : `~astropy.modeling.Model` New transform to be inserted in the pipeline after : bool If True, the new transform is inserted in the pipeline immediately after ``frame``. """ name, _ = self._get_frame_name(frame) frame_ind = self._get_frame_index(name) if not after: fr, current_transform = self._pipeline[frame_ind - 1] self._pipeline[frame_ind - 1] = (fr, current_transform | transform) else: fr, current_transform = self._pipeline[frame_ind] self._pipeline[frame_ind] = (fr, transform | current_transform)
@property def unit(self): """The unit of the coordinates in the output coordinate system.""" if self._pipeline: try: return getattr(self, self._pipeline[-1][0].name).unit except AttributeError: return None else: return None @property def output_frame(self): """Return the output coordinate frame.""" if self._pipeline: frame = self._pipeline[-1][0] if not isinstance(frame, str): frame = frame.name return getattr(self, frame) else: return None @property def input_frame(self): """Return the input coordinate frame.""" if self._pipeline: frame = self._pipeline[0][0] if not isinstance(frame, str): frame = frame.name return getattr(self, frame) else: return None @property def name(self): """Return the name for this WCS.""" return self._name @name.setter def name(self, value): """Set the name for the WCS.""" self._name = value @property def pipeline(self): """Return the pipeline structure.""" return self._pipeline @property def bounding_box(self): """ Return the range of acceptable values for each input axis. The order of the axes is `~gwcs.coordinate_frames.CoordinateFrame.axes_order`. """ frames = self.available_frames transform_0 = self.get_transform(frames[0], frames[1]) try: bb = transform_0.bounding_box except NotImplementedError: return None if transform_0.n_inputs == 1: return bb try: axes_order = self.input_frame.axes_order except AttributeError: axes_order = np.arange(transform_0.n_inputs) # Model.bounding_box is in python order, need to reverse it first. bb = np.array(bb[::-1])[np.array(axes_order)] return tuple(tuple(item) for item in bb) @bounding_box.setter def bounding_box(self, value): """ Set the range of acceptable values for each input axis. The order of the axes is `~gwcs.coordinate_frames.CoordinateFrame.axes_order`. For two inputs and axes_order(0, 1) the bounding box is ((xlow, xhigh), (ylow, yhigh)). Parameters ---------- value : tuple or None Tuple of tuples with ("low", high") values for the range. """ frames = self.available_frames transform_0 = self.get_transform(frames[0], frames[1]) if value is None: transform_0.bounding_box = value else: try: # Make sure the dimensions of the new bbox are correct. mutils._BoundingBox.validate(transform_0, value) except: raise # get the sorted order of axes' indices axes_ind = self._get_axes_indices() if transform_0.n_inputs == 1: transform_0.bounding_box = value else: # The axes in bounding_box in modeling follow python order transform_0.bounding_box = np.array(value)[axes_ind][::-1] self.set_transform(frames[0], frames[1], transform_0) def _get_axes_indices(self): try: axes_ind = np.argsort(self.input_frame.axes_order) except AttributeError: # the case of a frame being a string axes_ind = np.arange(self.forward_transform.n_inputs) return axes_ind def __str__(self): from astropy.table import Table col1 = [item[0] for item in self._pipeline] col2 = [] for item in self._pipeline[: -1]: model = item[1] if model.name is not None: col2.append(model.name) else: col2.append(model.__class__.__name__) col2.append(None) t = Table([col1, col2], names=['From', 'Transform']) return str(t) def __repr__(self): fmt = "<WCS(output_frame={0}, input_frame={1}, forward_transform={2})>".format( self.output_frame, self.input_frame, self.forward_transform) return fmt
[docs] def footprint(self, bounding_box=None, center=False, axis_type="all"): """ Return the footprint in world coordinates. Parameters ---------- bounding_box : tuple of floats: (start, stop) `prop: bounding_box` center : bool If `True` use the center of the pixel, otherwise use the corner. axis_type : str A supported ``output_frame.axes_type`` or "all" (default). One of ['spatial', 'spectral', 'temporal'] or a custom type. Returns ------- coord : ndarray Array of coordinates in the output_frame mapping corners to the output frame. For spatial coordinates the order is clockwise, starting from the bottom left corner. """ def _order_clockwise(v): return np.asarray([[v[0][0], v[1][0]], [v[0][0], v[1][1]], [v[0][1], v[1][1]], [v[0][1], v[1][0]]]).T if bounding_box is None: if self.bounding_box is None: raise TypeError("Need a valid bounding_box to compute the footprint.") bb = self.bounding_box else: bb = bounding_box all_spatial = all([t.lower() == "spatial" for t in self.output_frame.axes_type]) if all_spatial: vertices = _order_clockwise(bb) else: vertices = np.array(list(itertools.product(*bb))).T if center: vertices = _toindex(vertices) result = np.asarray(self.__call__(*vertices, **{'with_bounding_box': False})) axis_type = axis_type.lower() if axis_type == 'spatial' and all_spatial: return result.T if axis_type != "all": axtyp_ind = np.array([t.lower() for t in self.output_frame.axes_type]) == axis_type if not axtyp_ind.any(): raise ValueError('This WCS does not have axis of type "{}".'.format(axis_type)) result = np.asarray([(r.min(), r.max()) for r in result[axtyp_ind]]) if axis_type == "spatial": result = _order_clockwise(result) else: result.sort() result = np.squeeze(result) return result.T