from __future__ import absolute_import, division, print_function
import warnings
import numpy as np
from numpy.lib.stride_tricks import as_strided
import pandas as pd
import bottleneck as bt
from glue.external.six import string_types
from glue.external.six.moves import range
__all__ = ['unique', 'shape_to_string', 'view_shape', 'stack_view',
'coerce_numeric', 'check_sorted', 'broadcast_to', 'unbroadcast',
'iterate_chunks', 'combine_slices', 'nanmean', 'nanmedian', 'nansum',
'nanmin', 'nanmax', 'format_minimal', 'compute_statistic',
'categorical_ndarray', 'index_lookup', 'ensure_numerical']
[docs]def unbroadcast(array):
"""
Given an array, return a new array that is the smallest subset of the
original array that can be re-broadcasted back to the original array.
See https://stackoverflow.com/questions/40845769/un-broadcasting-numpy-arrays
for more details.
"""
if array.ndim == 0:
return array
new_shape = np.where(np.array(array.strides) == 0, 1, array.shape)
return as_strided(array, shape=new_shape)
[docs]def unique(array):
"""
Return the unique elements of the array U, as well as
the index array I such that U[I] == array
Parameters
----------
array : `numpy.ndarray`
The array to use
Returns
-------
U : `numpy.ndarray`
The unique elements of the array
I : `numpy.ndarray`
The indices such that ``U[I] == array``
"""
# numpy.unique doesn't handle mixed-types on python3,
# so we use pandas
array = np.asarray(array)
I, U = pd.factorize(array, sort=True)
return U.astype(array.dtype), I
[docs]def shape_to_string(shape):
"""
On Windows, shape tuples use long ints which results in formatted shapes
such as (2L, 3L). This function ensures that the shape is always formatted
without the Ls.
"""
return "({0})".format(", ".join(str(int(item)) for item in shape))
[docs]def view_shape(shape, view):
"""
Return the shape of a view of an array.
Returns equivalent of ``np.zeros(shape)[view].shape`` but with minimal
memory usage.
Parameters
----------
shape : tuple
The shape of the array
view : slice
A valid index into a Numpy array, or None
"""
if view is None:
return shape
else:
return np.broadcast_to(1, shape)[view].shape
[docs]def stack_view(shape, *views):
shp = tuple(slice(0, s, 1) for s in shape)
result = np.broadcast_arrays(*np.ogrid[shp])
for v in views:
if isinstance(v, string_types) and v == 'transpose':
result = [r.T for r in result]
continue
result = [r[v] for r in result]
return tuple(result)
[docs]def coerce_numeric(arr):
"""
Coerce an array into a numeric array, replacing non-numeric elements with
nans.
If the array is already a numeric type, it is returned unchanged
Parameters
----------
arr : `numpy.ndarray`
The array to coerce
"""
# Already numeric type
if np.issubdtype(arr.dtype, np.number):
return arr
# Numpy datetime64 format
if np.issubdtype(arr.dtype, np.datetime64):
return arr
# Convert booleans to integers
if np.issubdtype(arr.dtype, np.bool_):
return arr.astype(np.int)
# a string dtype, or anything else
try:
return pd.to_numeric(arr, errors='coerce')
except AttributeError: # pandas < 0.19
return pd.Series(arr).convert_objects(convert_numeric=True).values
[docs]def check_sorted(array):
"""
Return `True` if the array is sorted, `False` otherwise.
"""
# this ignores NANs, and does the right thing if nans
# are concentrated at beginning or end of array
# otherwise, it will miss things at nan/finite boundaries
array = np.asarray(array)
return not (array[:-1] > array[1:]).any()
def pretty_number(numbers):
"""
Convert a list/array of numbers into a nice list of strings
Parameters
----------
numbers : list
The numbers to convert
"""
try:
return [pretty_number(n) for n in numbers]
except TypeError:
pass
n = numbers
if n == 0:
result = '0'
elif (abs(n) < 1e-3) or (abs(n) > 1e3):
result = "%0.3e" % n
elif abs(int(n) - n) < 1e-3 and int(n) != 0:
result = "%i" % n
else:
result = "%0.3f" % n
if result.find('.') != -1:
result = result.rstrip('0')
return result
[docs]def broadcast_to(array, shape):
"""
Compatibility function - can be removed once we support only Numpy 1.10
and above
"""
try:
return np.broadcast_to(array, shape)
except AttributeError:
array = np.asarray(array)
return np.broadcast_arrays(array, np.ones(shape, array.dtype))[0]
def find_chunk_shape(shape, n_max=None):
"""
Given the shape of an n-dimensional array, and the maximum number of
elements in a chunk, return the largest chunk shape to use for iteration.
This currently assumes the optimal chunk shape to return is for C-contiguous
arrays.
"""
if n_max is None:
return tuple(shape)
block_shape = []
max_repeat_remaining = n_max
for size in shape[::-1]:
if max_repeat_remaining > size:
block_shape.append(size)
max_repeat_remaining = max_repeat_remaining // size
else:
block_shape.append(max_repeat_remaining)
max_repeat_remaining = 1
return tuple(block_shape[::-1])
[docs]def iterate_chunks(shape, chunk_shape=None, n_max=None):
"""
Given a data shape and a chunk shape (or maximum chunk size), iteratively
return slice objects that can be used to slice the array.
"""
# Shortcut - if there are any 0 elements in the shape, there are no
# chunks to iterate over.
if np.prod(shape) == 0:
return
if chunk_shape is None and n_max is None:
raise ValueError('Either chunk_shape or n_max should be specified')
elif chunk_shape is not None and n_max is not None:
raise ValueError('Either chunk_shape or n_max should be specified (not both)')
elif chunk_shape is None:
chunk_shape = find_chunk_shape(shape, n_max)
else:
if len(chunk_shape) != len(shape):
raise ValueError('chunk_shape should have the same length as shape')
elif any(x > y for (x, y) in zip(chunk_shape, shape)):
raise ValueError('chunk_shape should fit within shape')
ndim = len(chunk_shape)
start_index = [0] * ndim
shape = list(shape)
while start_index <= shape:
end_index = [min(start_index[i] + chunk_shape[i], shape[i]) for i in range(ndim)]
slices = tuple([slice(start_index[i], end_index[i]) for i in range(ndim)])
yield slices
# Update chunk index. What we do is to increment the
# counter for the first dimension, and then if it
# exceeds the number of elements in that direction,
# cycle back to zero and advance in the next dimension,
# and so on.
start_index[0] += chunk_shape[0]
for i in range(ndim - 1):
if start_index[i] >= shape[i]:
start_index[i] = 0
start_index[i + 1] += chunk_shape[i + 1]
# We can now check whether the iteration is finished
if start_index[-1] >= shape[-1]:
break
[docs]def combine_slices(slice1, slice2, length):
"""
Given two slices that can be applied to a 1D array and the length of that
array, this returns a new slice which is the one that should be applied to
the array instead of slice2 if slice1 has already been applied.
"""
beg1, end1, step1 = slice1.indices(length)
beg2, end2, step2 = slice2.indices(length)
if step1 < 0 or step2 < 0:
raise ValueError("combine_slices does not support slices with negative step")
if beg2 >= end1 or end2 <= beg1:
return slice(0, 0, 1)
beg = max(beg1, beg2)
end = min(end1, end2)
if (beg - beg2) % step2 != 0:
beg += step2 - ((beg - beg2) % step2)
# Now we want to find the two first overlap indices inside the overlap
# range. Loop over indices of second slice (but with min/max constraints
# of first added) and check if they are valid indices given slice1
indices = []
for idx in range(beg, end, step2):
if (idx - beg1) % step1 == 0:
indices.append((idx - beg1) // step1)
if len(indices) == 2:
break
if len(indices) == 0:
return slice(0, 0, 1)
elif len(indices) == 1:
return slice(indices[0], indices[0] + 1, 1)
else:
end_new = (end - beg1) // step1
if (end - beg1) % step1 != 0:
end_new += 1
return slice(indices[0], end_new, indices[1] - indices[0])
def _move_tuple_axes_first(array, axis):
"""
Bottleneck can only take integer axis, not tuple, so this function takes all
the axes to be operated on and combines them into the first dimension of the
array so that we can then use axis=0
"""
# Figure out how many axes we are operating over
naxis = len(axis)
# Add remaining axes to the axis tuple
axis += tuple(i for i in range(array.ndim) if i not in axis)
# The new position of each axis is just in order
destination = tuple(range(array.ndim))
# Reorder the array so that the axes being operated on are at the beginning
array_new = np.moveaxis(array, axis, destination)
# Figure out the size of the product of the dimensions being operated on
first = np.prod(array_new.shape[:naxis])
# Collapse the dimensions being operated on into a single dimension so that
# we can then use axis=0 with the bottleneck functions
array_new = array_new.reshape((first,) + array_new.shape[naxis:])
return array_new
[docs]def nanmean(array, axis=None):
if isinstance(axis, tuple):
array = _move_tuple_axes_first(array, axis=axis)
axis = 0
return bt.nanmean(array, axis=axis)
[docs]def nansum(array, axis=None):
if isinstance(axis, tuple):
array = _move_tuple_axes_first(array, axis=axis)
axis = 0
return bt.nansum(array, axis=axis)
[docs]def nanmin(array, axis=None):
if isinstance(axis, tuple):
array = _move_tuple_axes_first(array, axis=axis)
axis = 0
return bt.nanmin(array, axis=axis)
[docs]def nanmax(array, axis=None):
if isinstance(axis, tuple):
array = _move_tuple_axes_first(array, axis=axis)
axis = 0
return bt.nanmax(array, axis=axis)
PLAIN_FUNCTIONS = {'minimum': np.min,
'maximum': np.max,
'mean': np.mean,
'median': np.median,
'sum': np.sum,
'percentile': np.percentile}
NAN_FUNCTIONS = {'minimum': nanmin,
'maximum': nanmax,
'mean': nanmean,
'median': nanmedian,
'sum': nansum,
'percentile': np.nanpercentile}
[docs]def compute_statistic(statistic, data, mask=None, axis=None, finite=True,
positive=False, percentile=None):
"""
Compute a statistic for the data.
Parameters
----------
statistic : {'minimum', 'maximum', 'mean', 'median', 'sum', 'percentile'}
The statistic to compute
data : `numpy.ndarray`
The data to compute the statistic for.
mask : `numpy.ndarray`
The mask to apply when computing the statistic.
axis : None or int or tuple of int
If specified, the axis/axes to compute the statistic over.
finite : bool, optional
Whether to include only finite values in the statistic. This should
be `True` to ignore NaN/Inf values
positive : bool, optional
Whether to include only (strictly) positive values in the statistic.
This is used for example when computing statistics of data shown in
log space.
percentile : float, optional
If ``statistic`` is ``'percentile'``, the ``percentile`` argument
should be given and specify the percentile to calculate in the
range [0:100]
"""
data = np.asanyarray(data)
if mask is not None:
mask = np.asanyarray(mask, dtype=bool)
# NOTE: this function should not ever have to use glue-specific objects.
# The aim is to eventually use a fast C implementation of this function.
if statistic not in PLAIN_FUNCTIONS:
raise ValueError("Unrecognized statistic: {0}".format(statistic))
if (finite or positive or mask is not None) and data.dtype.kind != 'M':
keep = np.ones(data.shape, dtype=bool)
if finite:
keep &= np.isfinite(data)
if positive:
keep &= data > 0
if mask is not None:
keep &= mask
if axis is None:
data = data[keep]
else:
# We need to force a copy since we are editing the values and we
# might as well convert to float just in case
data = np.array(data, dtype=float)
data[~keep] = np.nan
function = NAN_FUNCTIONS[statistic]
else:
function = PLAIN_FUNCTIONS[statistic]
if data.size == 0:
return np.nan
if isinstance(axis, tuple) and len(axis) == 0:
return data
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
if statistic == 'percentile':
return function(data, percentile, axis=axis)
else:
return function(data, axis=axis)
[docs]class categorical_ndarray(np.ndarray):
"""
A Numpy array subclass that includes properties to find the categories and
unique integer codes for array values.
"""
_jitter = None
def __new__(cls, value, dtype=None, copy=True, order=None, subok=False,
ndmin=0, categories=None):
result = np.array(value, dtype=dtype, copy=copy, order=order,
subok=True, ndmin=ndmin).view(categorical_ndarray)
if categories is not None:
result.categories = categories
return result
def __array_finalize__(self, obj):
if isinstance(obj, categorical_ndarray):
self.categories = obj.categories
def _update_categories_and_codes(self):
if hasattr(self, '_categories'):
self._codes = index_lookup(self, self._categories)
else:
self._categories, self._codes = unique(self)
self._categories.setflags(write=False)
self._codes = self._codes.astype(float)
self._codes.setflags(write=False)
@property
def categories(self):
if not hasattr(self, '_categories'):
self._update_categories_and_codes()
return self._categories
[docs] @categories.setter
def categories(self, value):
self._categories = value
[docs] @property
def codes(self):
if not hasattr(self, '_codes'):
self._update_categories_and_codes()
if self._jitter is None:
return self._codes
else:
return self._codes + self._jitter
[docs] def jitter(self, method=None):
"""
Jitter the codes.
Parameters
----------
method : {None, 'uniform'}
If `None`, not jittering is done (or any jittering is undone).
If ``'uniform'``, the codes are randomized by a uniformly
distributed random variable.
"""
if method is None:
self._jitter = None
elif method == 'uniform':
self._jitter = np.random.random(self.shape)
self._jitter -= 0.5
else:
raise ValueError("method should be None or 'uniform'")
[docs]def ensure_numerical(values):
if isinstance(values, categorical_ndarray):
return values.codes
else:
return values
[docs]def index_lookup(data, items):
"""
Lookup which index in items each data value is equal to
Parameters
----------
data
An array-like object
items
Array-like of unique values
Returns
-------
array
If result[i] is finite, then data[i] = categories[result[i]]
Otherwise, data[i] is not in the categories list
"""
# np.searchsorted doesn't work on mixed types in Python3
ndata, ncat = len(data), len(items)
data = pd.DataFrame({'data': data, 'row': np.arange(ndata)})
cats = pd.DataFrame({'items': items,
'cat_row': np.arange(ncat)})
m = pd.merge(data, cats, left_on='data', right_on='items')
result = np.zeros(ndata, dtype=float) * np.nan
result[np.array(m.row)] = m.cat_row
return result