from __future__ import absolute_import, division, print_function
import logging
import numpy as np
from astropy.wcs import WCS
from glue.utils import unbroadcast, broadcast_to, axis_correlation_matrix
__all__ = ['Coordinates', 'AffineCoordinates', 'WCSCoordinates', 'coordinates_from_header', 'coordinates_from_wcs']
[docs]class Coordinates(object):
"""
Base class for coordinate transformation
"""
def __init__(self):
pass
[docs] def pixel2world(self, *args):
"""
Convert pixel to world coordinates, preserving input type/shape.
Parameters
----------
*pixel : scalars lists, or Numpy arrays
The pixel coordinates (0-based) to convert
Returns
-------
*world : Numpy arrays
The corresponding world coordinates
"""
return args
[docs] def world2pixel(self, *args):
"""
Convert world to pixel coordinates, preserving input type/shape.
Parameters
----------
*world : scalars lists, or Numpy arrays
The world coordinates to convert
Returns
-------
*pixel : Numpy arrays
The corresponding pixel coordinates
"""
return args
[docs] def default_world_coords(self, ndim):
return np.zeros(ndim, dtype=float)
# PY3: pixel2world_single_axis(self, *pixel, axis=None)
[docs] def pixel2world_single_axis(self, *pixel, **kwargs):
"""
Convert pixel to world coordinates, preserving input type/shape.
This is a wrapper around pixel2world which returns the result for just
one axis, and also determines whether the calculation can be sped up
if broadcasting is present in the input arrays.
Parameters
----------
*pixel : scalars lists, or Numpy arrays
The pixel coordinates (0-based) to convert
axis : int, optional
If only one axis is needed, it should be specified since the
calculation will be much more efficient.
Returns
-------
world : `numpy.ndarray`
The world coordinates for the requested axis
"""
# PY3: the following is needed for Python 2
axis = kwargs.get('axis', None)
if axis is None:
raise ValueError("axis needs to be set")
if np.size(pixel[0]) == 0:
return np.array([], dtype=float)
original_shape = pixel[0].shape
pixel_new = []
# NOTE: the axis passed to this function is the WCS axis not the Numpy
# axis, so we need to convert it as needed.
dep_axes = self.dependent_axes(len(pixel) - 1 - axis)
for ip, p in enumerate(pixel):
if (len(pixel) - 1 - ip) in dep_axes:
pixel_new.append(unbroadcast(p))
else:
pixel_new.append(p.flat[0])
pixel = np.broadcast_arrays(*pixel_new)
result = self.pixel2world(*pixel)
return broadcast_to(result[axis], original_shape)
[docs] def world2pixel_single_axis(self, *world, **kwargs):
"""
Convert world to pixel coordinates, preserving input type/shape.
This is a wrapper around world2pixel which returns the result for just
one axis, and also determines whether the calculation can be sped up
if broadcasting is present in the input arrays.
Parameters
----------
*world : scalars lists, or Numpy arrays
The world coordinates to convert
axis : int, optional
If only one axis is needed, it should be specified since the
calculation will be much more efficient.
Returns
-------
pixel : `numpy.ndarray`
The pixel coordinates for the requested axis
"""
# PY3: the following is needed for Python 2
axis = kwargs.get('axis', None)
if axis is None:
raise ValueError("axis needs to be set")
if np.size(world[0]) == 0:
return np.array([], dtype=float)
original_shape = world[0].shape
world_new = []
# NOTE: the axis passed to this function is the WCS axis not the Numpy
# axis, so we need to convert it as needed.
dep_axes = self.dependent_axes(len(world) - 1 - axis)
for iw, w in enumerate(world):
if (len(world) - 1 - iw) in dep_axes:
world_new.append(unbroadcast(w))
else:
world_new.append(w.flat[0])
world = np.broadcast_arrays(*world_new)
result = self.world2pixel(*world)
return broadcast_to(result[axis], original_shape)
[docs] def world_axis(self, data, axis):
"""
Find the world coordinates along a given dimension, and which for now
we center on the pixel origin.
Parameters
----------
data : `~glue.core.data.Data`
The data to compute the coordinate axis for (this is used to
determine the size of the axis)
axis : int
The axis to compute, in Numpy axis order
Notes
-----
This method computes the axis values using pixel positions at the
center of the data along all other axes. This will therefore only give
the correct result for non-dependent axes (which can be checked using
the ``dependent_axes`` method).
"""
pixel = []
for i, s in enumerate(data.shape):
if i == axis:
pixel.append(np.arange(data.shape[axis]))
else:
pixel.append(np.repeat((s - 1) / 2, data.shape[axis]))
return self.pixel2world_single_axis(*pixel[::-1],
axis=data.ndim - 1 - axis)
[docs] def world_axis_unit(self, axis):
"""
Return the unit of the world coordinate given by ``axis`` (assuming the
Numpy axis order)
"""
return ''
[docs] def axis_label(self, axis):
return "World {}".format(axis)
[docs] def dependent_axes(self, axis):
"""Return a tuple of which world-axes are non-independent
from a given pixel axis
The axis index is given in numpy ordering convention (note that
opposite the fits convention)
"""
return (axis,)
def __gluestate__(self, context):
return {} # no state
@classmethod
def __setgluestate__(cls, rec, context):
return cls()
[docs]class WCSCoordinates(Coordinates):
"""
Class for coordinate transformation based on the WCS FITS
standard. This class does not take into account
distortions.
Parameters
----------
header : :class:`astropy.io.fits.Header`
FITS header (derived from WCS if not given)
wcs : :class:`astropy.wcs.WCS`
WCS object to use, if different from header
References
----------
* Greisen & Calabretta (2002), Astronomy and Astrophysics, 395, 1061
* Calabretta & Greisen (2002), Astronomy and Astrophysics, 395, 1077
* Greisen, Calabretta, Valdes & Allen (2006), Astronomy and
Astrophysics, 446, 747
"""
def __init__(self, header=None, wcs=None):
super(WCSCoordinates, self).__init__()
if header is None and wcs is None:
raise ValueError('Must provide either FITS header or WCS or both')
if header is None:
header = wcs.to_header()
self._header = header
try:
naxis = header['NAXIS']
except (KeyError, TypeError):
naxis = None
wcs = wcs or WCS(header, naxis=naxis)
# update WCS interface if using old API
mapping = {'wcs_pix2world': 'wcs_pix2sky',
'wcs_world2pix': 'wcs_sky2pix',
'all_pix2world': 'all_pix2sky'}
for k, v in mapping.items():
if not hasattr(wcs, k):
setattr(wcs, k, getattr(wcs, v))
self._wcs = wcs
# Pre-compute dependent axes. The matrix returned by
# axis_correlation_matrix is (n_world, n_pixel) but we want to know
# which pixel coordinates are linked to which other pixel coordinates.
# So to do this we take a column from the matrix and find if there are
# any entries in common with all other columns in the matrix.
matrix = axis_correlation_matrix(wcs)[::-1, ::-1]
self._dependent_axes = []
for axis in range(wcs.naxis):
world_dep = matrix[:, axis:axis + 1]
dependent = tuple(np.nonzero((world_dep & matrix).any(axis=0))[0])
self._dependent_axes.append(dependent)
[docs] def world_axis_unit(self, axis):
return str(self._wcs.wcs.cunit[self._wcs.naxis - 1 - axis])
[docs] @property
def wcs(self):
return self._wcs
[docs] def dependent_axes(self, axis):
return self._dependent_axes[axis]
def __setstate__(self, state):
self.__dict__ = state
# wcs object doesn't seem to unpickle properly. reconstruct it
from astropy.wcs import WCS
try:
naxis = self._header['NAXIS']
except (KeyError, TypeError):
naxis = None
self._wcs = WCS(self._header, naxis=naxis)
[docs] def pixel2world(self, *pixel):
# PY3: can just do pix2world(*pixel, 0)
if np.size(pixel[0]) == 0:
return tuple(np.array([], dtype=float) for p in pixel)
else:
return self._wcs.wcs_pix2world(*(tuple(pixel) + (0,)))
[docs] def world2pixel(self, *world):
# PY3: can just do world2pix(*world, 0)
if np.size(world[0]) == 0:
return tuple(np.array([], dtype=float) for w in world)
else:
return self._wcs.wcs_world2pix(*(tuple(world) + (0,)))
[docs] def default_world_coords(self, ndim):
if ndim != self._wcs.naxis:
raise ValueError("Requested default world coordinates for {0} "
"dimensions, WCS has {1}".format(ndim, self._wcs.naxis))
return self._wcs.wcs.crval
[docs] def axis_label(self, axis):
header = self._header
num = _get_ndim(header) - axis # number orientation reversed
ax = self._header.get('CTYPE%i' % num)
if ax is not None:
if len(ax) == 8 or '-' in ax: # assume standard format
ax = ax[:5].split('-')[0].title()
else:
ax = ax.title()
translate = dict(
Glon='Galactic Longitude',
Glat='Galactic Latitude',
Ra='Right Ascension',
Dec='Declination',
Velo='Velocity',
Freq='Frequency'
)
return translate.get(ax, ax)
unit = self._header.get('CUNIT%i' % num)
if unit is not None:
return "World {} ({})".format(axis, unit)
return super(WCSCoordinates, self).axis_label(axis)
def __gluestate__(self, context):
return dict(header=self._wcs.to_header_string())
@classmethod
def __setgluestate__(cls, rec, context):
from astropy.io import fits
return cls(fits.Header.fromstring(rec['header']))
[docs]class AffineCoordinates(WCSCoordinates):
"""
Coordinates determined via an affine transformation represented by an
augmented matrix of shape N+1 x N+1 matrix, where N is the number of pixel
and world coordinates. The last column of the matrix should be used for
the translation term, and the last row should be set to 0 except for the
last column which should be 1.
Note that the order of the dimensions in the matrix (x, y) should be the
opposite of the order of the order of dimensions of Numpy arrays (y, x).
"""
# Note that for now the easiest way to implement this is to sub-class
# WCS, which means that this will automatically work with WCSAxes. In
# future it would be good to make this independent from WCS but it will
# require changes to WCSAxes.
def __init__(self, matrix, units=None, labels=None):
if matrix.ndim != 2:
raise ValueError("Affine matrix should be two-dimensional")
if matrix.shape[0] != matrix.shape[1]:
raise ValueError("Affine matrix should be square")
if np.any(matrix[-1, :-1] != 0) or matrix[-1, -1] != 1:
raise ValueError("Last row of matrix should be zeros and a one")
if units is not None and len(units) != matrix.shape[0] - 1:
raise ValueError("Expected {0} units, got {1}".format(matrix.shape[0] - 1, len(units)))
if labels is not None and len(labels) != matrix.shape[0] - 1:
raise ValueError("Expected {0} labels, got {1}".format(matrix.shape[0] - 1, len(labels)))
self.matrix = matrix
self.units = units
self.labels = labels
wcs = WCS(naxis=self.matrix.shape[0] - 1)
wcs.wcs.cd = self.matrix[:-1, :-1]
wcs.wcs.crpix = np.ones(wcs.naxis)
wcs.wcs.crval = self.matrix[:-1, -1]
if labels is not None:
wcs.wcs.ctype = labels
if units is not None:
wcs.wcs.cunit = units
super(AffineCoordinates, self).__init__(wcs=wcs)
def __gluestate__(self, context):
return dict(matrix=context.do(self.matrix),
labels=self.labels,
units=self.units)
@classmethod
def __setgluestate__(cls, rec, context):
return cls(context.object(rec['matrix']),
units=rec['units'],
labels=rec['labels'])
def _get_ndim(header):
if 'NAXIS' in header:
return header['NAXIS']
if 'WCSAXES' in header:
return header['WCSAXES']
return None
[docs]def coordinates_from_wcs(wcs):
"""
Convert an Astropy WCS object into a glue Coordinates object.
Parameters
----------
wcs : :class:`astropy.wcs.WCS`
The WCS object to use
Returns
-------
coordinates : :class:`~glue.core.coordinates.Coordinates`
"""
from astropy.io import fits
hdr_str = wcs.wcs.to_header()
hdr = fits.Header.fromstring(hdr_str)
try:
return WCSCoordinates(hdr, wcs)
except (AttributeError, TypeError) as e:
print(e)
return Coordinates()
def header_from_string(string):
"""
Convert a string to a FITS header.
"""
from astropy.io import fits
return fits.Header.fromstring(string, sep='\n')