"""Central data class and associated."""
# --- import --------------------------------------------------------------------------------------
import collections
import operator
import functools
import warnings
import numpy as np
import h5py
import scipy
from scipy.interpolate import griddata, interp1d
from skimage.transform import downscale_local_mean
from .._group import Group
from .. import collection as wt_collection
from .. import exceptions as wt_exceptions
from .. import kit as wt_kit
from .. import units as wt_units
from ._axis import Axis, identifier_to_operator
from ._channel import Channel
from ._constant import Constant
from ._variable import Variable
# --- define --------------------------------------------------------------------------------------
__all__ = ["Data"]
# --- class ---------------------------------------------------------------------------------------
[docs]class Data(Group):
"""Multidimensional dataset."""
class_name = "Data"
[docs] def __init__(self, *args, **kwargs):
self._axes = []
self._constants = []
Group.__init__(self, *args, **kwargs)
# populate axes from attrs string
for identifier in self.attrs.get("axes", []):
identifier = identifier.decode()
expression, units = identifier.split("{")
units = units.replace("}", "")
for i in identifier_to_operator.keys():
expression = expression.replace(i, identifier_to_operator[i])
expression = expression.replace(" ", "") # remove all whitespace
axis = Axis(self, expression, units.strip())
self._axes.append(axis)
for identifier in self.attrs.get("constants", []):
identifier = identifier.decode()
expression, units = identifier.split("{")
units = units.replace("}", "")
for i in identifier_to_operator.keys():
expression = expression.replace(i, identifier_to_operator[i])
expression = expression.replace(" ", "") # remove all whitespace
const = Constant(self, expression, units.strip())
self._constants.append(const)
self._current_axis_identities_in_natural_namespace = []
self._on_constants_updated()
self._on_axes_updated()
# the following are populated if not already recorded
self.channel_names
self.source
self.variable_names
def __repr__(self) -> str:
return "<WrightTools.Data '{0}' {1} at {2}>".format(
self.natural_name, str(self.axis_names), "::".join([self.filepath, self.name])
)
@property
def axes(self) -> tuple:
return tuple(self._axes)
@property
def axis_expressions(self) -> tuple:
"""Axis expressions."""
return tuple(a.expression for a in self._axes)
@property
def axis_names(self) -> tuple:
"""Axis names."""
return tuple(a.natural_name for a in self._axes)
@property
def constants(self) -> tuple:
return tuple(self._constants)
@property
def constant_expressions(self) -> tuple:
"""Axis expressions."""
return tuple(a.expression for a in self._constants)
@property
def constant_names(self) -> tuple:
"""Axis names."""
return tuple(a.natural_name for a in self._constants)
@property
def channel_names(self) -> tuple:
"""Channel names."""
if "channel_names" not in self.attrs.keys():
self.attrs["channel_names"] = np.array([], dtype="S")
return tuple(s.decode() for s in self.attrs["channel_names"])
@channel_names.setter
def channel_names(self, value):
"""Set channel names."""
self.attrs["channel_names"] = np.array(value, dtype="S")
@property
def channels(self) -> tuple:
"""Channels."""
return tuple(self[n] for n in self.channel_names)
@property
def datasets(self) -> tuple:
"""Datasets."""
return tuple(v for _, v in self.items() if isinstance(v, h5py.Dataset))
@property
def kind(self):
"""Kind."""
if "kind" not in self.attrs.keys():
self.attrs["kind"] = "None"
value = self.attrs["kind"]
return value if not value == "None" else None
@property
def ndim(self) -> int:
"""Get number of dimensions."""
try:
assert self._ndim is not None
except (AssertionError, AttributeError):
if len(self.variables) == 0:
self._ndim = 0
else:
self._ndim = self.variables[0].ndim
finally:
return self._ndim
@property
def shape(self) -> tuple:
"""Shape."""
try:
assert self._shape is not None
except (AssertionError, AttributeError):
self._shape = wt_kit.joint_shape(*self.variables)
finally:
return self._shape
@property
def size(self) -> int:
"""Size."""
return functools.reduce(operator.mul, self.shape)
@property
def source(self):
"""Source."""
if "source" not in self.attrs.keys():
self.attrs["source"] = "None"
value = self.attrs["source"]
return value if not value == "None" else None
@property
def units(self) -> tuple:
"""All axis units."""
return tuple(a.units for a in self._axes)
@property
def constant_units(self) -> tuple:
"""All constant units."""
return tuple(a.units for a in self._constants)
@property
def variable_names(self) -> tuple:
"""Variable names."""
if "variable_names" not in self.attrs.keys():
self.attrs["variable_names"] = np.array([], dtype="S")
return tuple(s.decode() for s in self.attrs["variable_names"])
@variable_names.setter
def variable_names(self, value):
"""Set variable names."""
self.attrs["variable_names"] = np.array(value, dtype="S")
@property
def variables(self) -> tuple:
"""Variables."""
try:
assert self._variables is not None
except (AssertionError, AttributeError):
self._variables = [self[n] for n in self.variable_names]
finally:
return tuple(self._variables)
@property
def _leaf(self):
return "{0} {1}".format(self.natural_name, self.shape)
def _on_axes_updated(self):
"""Method to run when axes are changed in any way.
Propagates updated axes properly.
"""
# update attrs
self.attrs["axes"] = [a.identity.encode() for a in self._axes]
# remove old attributes
while len(self._current_axis_identities_in_natural_namespace) > 0:
key = self._current_axis_identities_in_natural_namespace.pop(0)
try:
delattr(self, key)
except AttributeError:
pass # already gone
# populate new attributes
for a in self._axes:
key = a.natural_name
setattr(self, key, a)
self._current_axis_identities_in_natural_namespace.append(key)
def _on_constants_updated(self):
"""Method to run when constants are changed in any way.
Propagates updated constants properly.
"""
# update attrs
self.attrs["constants"] = [a.identity.encode() for a in self._constants]
def _print_branch(self, prefix, depth, verbose):
def print_leaves(prefix, lis, vline=True):
for i, item in enumerate(lis):
if vline:
a = "│ "
else:
a = " "
if i + 1 == len(lis):
b = "└── "
else:
b = "├── "
s = prefix + a + b + "{0}: {1}".format(i, item._leaf)
print(s)
if verbose:
# axes
print(prefix + "├── axes")
print_leaves(prefix, self.axes)
# constants
print(prefix + "├── constants")
print_leaves(prefix, self.constants)
# variables
print(prefix + "├── variables")
print_leaves(prefix, self.variables)
# channels
print(prefix + "└── channels")
print_leaves(prefix, self.channels, vline=False)
else:
# axes
s = "axes: "
s += ", ".join(["{0} ({1})".format(a.expression, a.units) for a in self.axes])
print(prefix + "├── " + s)
# constants
s = "constants: "
s += ", ".join(
["{0} ({1} {2})".format(a.expression, a.value, a.units) for a in self.constants]
)
print(prefix + "├── " + s)
# channels
s = "channels: "
s += ", ".join(self.channel_names)
print(prefix + "└── " + s)
[docs] def bring_to_front(self, channel):
"""Bring a specific channel to the zero-indexed position in channels.
All other channels get pushed back but remain in order.
Parameters
----------
channel : int or str
Channel index or name.
"""
channel_index = wt_kit.get_index(self.channel_names, channel)
new = list(self.channel_names)
new.insert(0, new.pop(channel_index))
self.channel_names = new
[docs] def chop(self, *args, at={}, parent=None, verbose=True) -> wt_collection.Collection:
"""Divide the dataset into its lower-dimensionality components.
Parameters
----------
axis : str or int (args)
Axes of the returned data objects. Strings refer to the names of
axes in this object, integers refer to their index. Provide multiple
axes to return multidimensional data objects.
at : dict (optional)
Choice of position along an axis. Keys are axis names, values are lists
``[position, input units]``. If exact position does not exist,
the closest valid position is used.
parent : WrightTools Collection instance (optional)
Collection to place the new "chop" collection within. Default is
None (new parent).
verbose : bool (optional)
Toggle talkback. Default is True.
Returns
-------
WrightTools Collection
Collection of chopped data objects.
Examples
--------
>>> data.axis_names
['d2', 'w1', 'w2']
Get all w1 wigners.
>>> datas = data.chop('d2', 'w1')
>>> len(datas)
51
Get 2D frequency at d2=0 fs.
>>> datas = data.chop('w1', 'w2', at={'d2': [0, 'fs']})
>>> len(datas)
0
>>> datas[0].axis_names
['w1', 'w2']
>>> datas[0].d2[:]
0.
See Also
--------
collapse
Collapse the dataset along one axis.
split
Split the dataset while maintaining its dimensionality.
"""
from ._axis import operators, operator_to_identifier
# parse args
args = list(args)
for i, arg in enumerate(args):
if isinstance(arg, int):
args[i] = self._axes[arg].natural_name
elif isinstance(arg, str):
# same normalization that occurs in the natural_name @property
arg = arg.strip()
for op in operators:
arg = arg.replace(op, operator_to_identifier[op])
args[i] = wt_kit.string2identifier(arg)
# normalize the at keys to the natural name
for k in [ak for ak in at.keys() if type(ak) == str]:
for op in operators:
if op in k:
nk = k.replace(op, operator_to_identifier[op])
at[nk] = at[k]
at.pop(k)
k = nk
# get output collection
out = wt_collection.Collection(name="chop", parent=parent)
# get output shape
kept = args + [ak for ak in at.keys() if type(ak) == str]
kept_axes = [self._axes[self.axis_names.index(a)] for a in kept]
removed_axes = [a for a in self._axes if a not in kept_axes]
removed_shape = wt_kit.joint_shape(*removed_axes)
if removed_shape == ():
removed_shape = (1,) * self.ndim
removed_shape = list(removed_shape)
for i in at.keys():
if type(i) == int:
removed_shape[i] = 1
removed_shape = tuple(removed_shape)
# iterate
i = 0
for idx in np.ndindex(removed_shape):
idx = np.array(idx, dtype=object)
idx[np.array(removed_shape) == 1] = slice(None)
for axis, point in at.items():
if type(axis) == int:
idx[axis] = point
continue
point, units = point
destination_units = self._axes[self.axis_names.index(axis)].units
point = wt_units.converter(point, units, destination_units)
axis_index = self.axis_names.index(axis)
axis = self._axes[axis_index]
idx_index = np.array(axis.shape) > 1
if np.sum(idx_index) > 1:
raise wt_exceptions.MultidimensionalAxisError("chop", axis.natural_name)
idx_index = list(idx_index).index(True)
idx[idx_index] = np.argmin(np.abs(axis[tuple(idx)] - point))
data = out.create_data(name="chop%03i" % i)
for v in self.variables:
kwargs = {}
kwargs["name"] = v.natural_name
kwargs["values"] = v[idx]
kwargs["units"] = v.units
kwargs["label"] = v.label
kwargs.update(v.attrs)
data.create_variable(**kwargs)
for c in self.channels:
kwargs = {}
kwargs["name"] = c.natural_name
kwargs["values"] = c[idx]
kwargs["units"] = c.units
kwargs["label"] = c.label
kwargs["signed"] = c.signed
kwargs.update(c.attrs)
data.create_channel(**kwargs)
new_axes = [a.expression for a in kept_axes if a.expression not in at.keys()]
new_axis_units = [a.units for a in kept_axes if a.expression not in at.keys()]
data.transform(*new_axes)
for const in self.constant_expressions:
data.create_constant(const, verbose=False)
for ax in self.axis_expressions:
if ax not in new_axes:
data.create_constant(ax, verbose=False)
for j, units in enumerate(new_axis_units):
data.axes[j].convert(units)
i += 1
out.flush()
# return
if verbose:
print("chopped data into %d piece(s)" % len(out), "in", new_axes)
return out
[docs] def gradient(self, axis, *, channel=0):
"""
Compute the gradient along one axis.
New channels have names ``<channel name>_<axis name>_gradient``.
Parameters
----------
axis : int or str
The axis to differentiate along.
If given as an integer, the axis in the underlying array is used,
and unitary spacing is assumed.
If given as a string, the axis must exist, and be a 1D array-aligned axis.
(i.e. have a shape with a single value which is not ``1``)
The axis to collapse along is inferred from the shape of the axis.
channel : int or str
The channel to differentiate.
Default is the first channel.
"""
# get axis index --------------------------------------------------------------------------
if isinstance(axis, int):
axis_index = axis
elif isinstance(axis, str):
index = self.axis_names.index(axis)
axes = [i for i in range(self.ndim) if self.axes[index].shape[i] > 1]
if len(axes) > 1:
raise wt_exceptions.MultidimensionalAxisError(axis, "collapse")
elif len(axes) == 0:
raise wt_exceptions.ValueError(
"Axis '{}' is a single point, cannot compute gradient".format(axis)
)
axis_index = axes[0]
else:
raise wt_exceptions.TypeError("axis: expected {int, str}, got %s" % type(axis))
channel_index = wt_kit.get_index(self.channel_names, channel)
channel = self.channel_names[channel_index]
if self[channel].shape[axis_index] == 1:
raise wt_exceptions.ValueError(
"Channel '{}' has a single point along Axis '{}', cannot compute gradient".format(
channel, axis
)
)
rtype = np.result_type(self[channel].dtype, float)
new = self.create_channel(
"{}_{}_gradient".format(channel, axis),
values=np.empty(self[channel].shape, dtype=rtype),
)
channel = self[channel]
if axis == axis_index:
new[:] = np.gradient(channel[:], axis=axis_index)
else:
new[:] = np.gradient(channel[:], self[axis].points, axis=axis_index)
[docs] def moment(self, axis, channel=0, moment=1):
"""Take the nth moment the dataset along one axis, adding lower rank channels.
New channels have names ``<channel name>_<axis name>_moment_<moment num>``.
Moment 0 is the integral of the slice.
Moment 1 is the weighted average or "Center of Mass", normalized by the integral
Moment 2 is the variance, the central moment about the center of mass,
normalized by the integral
Moments 3+ are central moments about the center of mass, normalized by the integral
and by the standard deviation to the power of the moment.
Moments, especially higher order moments, are susceptible to noise and baseline.
It is recommended when used with real data to use :meth:`WrightTools.data.Channel.clip`
in conjunction with moments to reduce effects of noise.
Parameters
----------
axis : int or str
The axis to take the moment along.
If given as an integer, the axis with that index is used.
If given as a string, the axis with that name is used.
The axis must exist, and be a 1D array-aligned axis.
(i.e. have a shape with a single value which is not ``1``)
The axis to collapse along is inferred from the shape of the axis.
channel : int or str
The channel to take the moment.
If given as an integer, the channel with that index is used.
If given as a string, the channel with that name is used.
The channel must have values along the axis
(i.e. its shape must not be ``1`` in the dimension for which the axis is not ``1``)
Default is 0, the first channel.
moment : int or tuple of int
The moments to take.
One channel will be created for each number given.
Default is 1, the center of mass.
See Also
--------
collapse
Reduce dimensionality by some mathematical operation
clip
Set values above/below a threshold to a particular value
"""
# get axis index --------------------------------------------------------------------------
index = wt_kit.get_index(self.axis_names, axis)
axes = [i for i in range(self.ndim) if self.axes[index].shape[i] > 1]
if len(axes) > 1:
raise wt_exceptions.MultidimensionalAxisError(axis, "moment")
elif len(axes) == 0:
raise wt_exceptions.ValueError(
"Axis {} is a single point, cannot compute moment".format(axis)
)
axis_index = axes[0]
warnings.warn("moment", category=wt_exceptions.EntireDatasetInMemoryWarning)
channel_index = wt_kit.get_index(self.channel_names, channel)
channel = self.channel_names[channel_index]
if self[channel].shape[axis_index] == 1:
raise wt_exceptions.ValueError(
"Channel '{}' has a single point along Axis '{}', cannot compute moment".format(
channel, axis
)
)
new_shape = list(self[channel].shape)
new_shape[axis_index] = 1
channel = self[channel]
axis_inp = axis
axis = self.axes[index]
x = axis[:]
if np.any(np.isnan(x)):
raise wt_exceptions.ValueError("Axis '{}' includes NaN".format(axis_inp))
y = np.nan_to_num(channel[:])
try:
moments = tuple(moment)
except TypeError:
moments = (moment,)
if 0 in moments:
# Sort axis, so that integrals come out with expected sign
# only matters for integral, all others normalize by integral
sli = [slice(None) for _ in range(x.ndim)]
sort = np.argsort(x.flat)
sli[axis_index] = sort
sli = tuple(sli)
x = x[sli]
y = y[sli]
for moment in moments:
about = 0
norm = 1
if moment > 0:
norm = np.trapz(y, x, axis=axis_index)
norm = np.array(norm)
norm.shape = new_shape
if moment > 1:
about = np.trapz(x * y, x, axis=axis_index)
about = np.array(about)
about.shape = new_shape
about /= norm
if moment > 2:
sigma = np.trapz((x - about) ** 2 * y, x, axis=axis_index)
sigma = np.array(sigma)
sigma.shape = new_shape
sigma /= norm
sigma **= 0.5
norm *= sigma ** moment
values = np.trapz((x - about) ** moment * y, x, axis=axis_index)
values = np.array(values)
values.shape = new_shape
values /= norm
self.create_channel(
"{}_{}_{}_{}".format(channel.natural_name, axis_inp, "moment", moment),
values=values,
)
[docs] def collapse(self, axis, method="sum"):
"""Collapse the dataset along one axis, adding lower rank channels.
New channels have names ``<channel name>_<axis name>_<method>``.
Parameters
----------
axis : int or str
The axis to collapse along.
If given as an integer, the axis in the underlying array is used.
If given as a string, the axis must exist, and be a 1D array-aligned axis.
(i.e. have a shape with a single value which is not ``1``)
The axis to collapse along is inferred from the shape of the axis.
method : {'average', 'sum', 'max', 'min'} (optional)
The method of collapsing the given axis. Method may also be list
of methods corresponding to the channels of the object. Default
is sum. NaNs are ignored.
Can also be a list, allowing for different treatment for varied channels.
In this case, None indicates that no change to that channel should occur.
See Also
--------
chop
Divide the dataset into its lower-dimensionality components.
split
Split the dataset while maintaining its dimensionality.
moment
Take the moment along a particular axis
"""
if method in ("int", "integrate"):
warnings.warn(
"integrate method of collapse is deprecated, use moment(moment=0) instead",
wt_exceptions.VisibleDeprecationWarning,
)
for channel in self.channel_names:
try:
self.moment(axis, channel, moment=0)
self.rename_channels(
**{self.channel_names[-1]: f"{channel}_{axis}_{method}"}, verbose=False
)
except wt_exceptions.ValueError:
pass # may have some channels which fail, do so silently
return
# get axis index --------------------------------------------------------------------------
if isinstance(axis, int):
axis_index = axis
elif isinstance(axis, str):
index = self.axis_names.index(axis)
axes = [i for i in range(self.ndim) if self.axes[index].shape[i] > 1]
if len(axes) > 1:
raise wt_exceptions.MultidimensionalAxisError(axis, "collapse")
elif len(axes) == 0:
raise wt_exceptions.ValueError(
"Axis {} is a single point, cannot collapse".format(axis)
)
axis_index = axes[0]
else:
raise wt_exceptions.TypeError("axis: expected {int, str}, got %s" % type(axis))
new_shape = list(self.shape)
new_shape[axis_index] = 1
func = {
"sum": np.nansum,
"max": np.nanmax,
"maximum": np.nanmax,
"min": np.nanmin,
"minimum": np.nanmin,
"ave": np.nanmean,
"average": np.nanmean,
"mean": np.nanmean,
}
# methods ---------------------------------------------------------------------------------
if isinstance(method, str):
methods = [method for _ in self.channels]
if isinstance(method, list):
if len(method) == len(self.channels):
methods = method
else:
raise wt_exceptions.ValueError(
"method argument must have same number of elements as there are channels"
)
for m in methods:
if m not in func.keys():
raise wt_exceptions.ValueError("method '{}' not recognized".format(m))
warnings.warn("collapse", category=wt_exceptions.EntireDatasetInMemoryWarning)
# collapse --------------------------------------------------------------------------------
for method, channel in zip(methods, self.channel_names):
if method is None:
continue
if self[channel].shape[axis_index] == 1:
continue # Cannot collapse any further, don't clutter data object
new_shape = list(self[channel].shape)
new_shape[axis_index] = 1
rtype = self[channel].dtype
if method in ["ave", "average", "mean"]:
rtype = np.result_type(self[channel].dtype, float)
new = self.create_channel(
"{}_{}_{}".format(channel, axis, method),
values=np.empty(new_shape, dtype=rtype),
units=self[channel].units,
)
new[:] = func[method](self[channel], axis=axis_index, keepdims=True)
[docs] def convert(self, destination_units, *, convert_variables=False, verbose=True):
"""Convert all compatable axes and constants to given units.
Parameters
----------
destination_units : str
Destination units.
convert_variables : boolean (optional)
Toggle conversion of stored arrays. Default is False
verbose : bool (optional)
Toggle talkback. Default is True.
See Also
--------
Axis.convert
Convert a single axis object to compatable units. Call on an
axis object in data.axes.
"""
# get kind of units
units_kind = wt_units.kind(destination_units)
# apply to all compatible axes
for axis in self.axes:
if axis.units_kind == units_kind:
orig = axis.units
axis.convert(destination_units, convert_variables=convert_variables)
if verbose:
print(
"axis {} converted from {} to {}".format(
axis.expression, orig, destination_units
)
)
# apply to all compatible constants
for constant in self.constants:
if constant.units_kind == units_kind:
orig = constant.units
constant.convert(destination_units, convert_variables=convert_variables)
if verbose:
print(
"constant {} converted from {} to {}".format(
constant.expression, orig, destination_units
)
)
if convert_variables:
for var in self.variables:
if wt_units.kind(var.units) == units_kind:
orig = var.units
var.convert(destination_units)
if verbose:
print(
"variable {} converted from {} to {}".format(
var.natural_name, orig, destination_units
)
)
self._on_axes_updated()
self._on_constants_updated()
[docs] def create_channel(
self, name, values=None, *, shape=None, units=None, dtype=None, **kwargs
) -> Channel:
"""Append a new channel.
Parameters
----------
name : string
Unique name for this channel.
values : array (optional)
Array. If None, an empty array equaling the data shape is
created. Default is None.
shape : tuple of int
Shape to use. Must broadcast with the full shape.
Only used if `values` is None.
Default is the full shape of self.
units : string (optional)
Channel units. Default is None.
dtype : numpy.dtype (optional)
dtype to use for dataset, default is np.float64.
Only used if `values` is None.
kwargs : dict
Additional keyword arguments passed to Channel instantiation.
Returns
-------
Channel
Created channel.
"""
if name in self.channel_names:
warnings.warn(name, wt_exceptions.ObjectExistsWarning)
return self[name]
require_kwargs = {}
if values is None:
if shape is None:
require_kwargs["shape"] = self.shape
else:
require_kwargs["shape"] = shape
if dtype is None:
require_kwargs["dtype"] = np.dtype(np.float64)
else:
require_kwargs["dtype"] = dtype
if require_kwargs["dtype"].kind in "fcmM":
require_kwargs["fillvalue"] = np.nan
else:
require_kwargs["fillvalue"] = 0
else:
require_kwargs["data"] = values
require_kwargs["shape"] = values.shape
require_kwargs["dtype"] = values.dtype
# create dataset
dataset_id = self.require_dataset(name=name, chunks=True, **require_kwargs).id
channel = Channel(self, dataset_id, units=units, **kwargs)
# finish
self.attrs["channel_names"] = np.append(self.attrs["channel_names"], name.encode())
return channel
[docs] def create_variable(
self, name, values=None, *, shape=None, units=None, dtype=None, **kwargs
) -> Variable:
"""Add new child variable.
Parameters
----------
name : string
Unique identifier.
values : array-like (optional)
Array to populate variable with. If None, an variable will be filled with NaN.
Default is None.
shape : tuple of int
Shape to use. must broadcast with the full shape.
Only used if `values` is None.
Default is the full shape of self.
units : string (optional)
Variable units. Default is None.
dtype : numpy.dtype (optional)
dtype to use for dataset, default is np.float64.
Only used if `values` is None.
kwargs
Additional kwargs to variable instantiation.
Returns
-------
WrightTools Variable
New child variable.
"""
if name in self.variable_names:
warnings.warn(name, wt_exceptions.ObjectExistsWarning)
return self[name]
if values is None:
if shape is None:
shape = self.shape
if dtype is None:
dtype = np.dtype(np.float64)
if dtype.kind in "fcmM":
fillvalue = np.nan
else:
fillvalue = 0
else:
shape = values.shape
dtype = values.dtype
fillvalue = None
# create dataset
id = self.require_dataset(
name=name, data=values, shape=shape, dtype=dtype, fillvalue=fillvalue
).id
variable = Variable(self, id, units=units, **kwargs)
# finish
self._variables = None
self.attrs["variable_names"] = np.append(self.attrs["variable_names"], name.encode())
return variable
[docs] def downscale(self, tup, name=None, parent=None) -> "Data":
"""Down sample the data array using local averaging.
See `skimage.transform.downscale_local_mean`__ for more info.
__ http://scikit-image.org/docs/0.12.x/api/
skimage.transform.html#skimage.transform.downscale_local_mean
Parameters
----------
tup : tuple of ints
The collection of step sizes by which each axis is binned.
Each axis is sliced with step size determined by the tuple.
To keep an axis sampling unchanged, use 1 or None
name : string (optional)
The name of the string. Default is None.
parent : WrightTools Collection instance (optional)
Collection to place the downscaled data object. Default is
None (new parent).
Returns
-------
WrightTools Data instance
New data object with the downscaled channels and axes
See Also
--------
zoom
Zoom the data array using spline interpolation of the requested order.
"""
if name is None:
name = self.natural_name + "_downscaled"
if parent is None:
newdata = Data(name=name)
else:
parent.create_data(name=name)
for channel in self.channels:
name = channel.natural_name
newdata.create_channel(
name=name, values=downscale_local_mean(channel[:], tup), units=channel.units
)
args = []
for i, axis in enumerate(self.axes):
if len(axis.variables) > 1:
raise NotImplementedError("downscale only works with simple axes currently")
variable = axis.variables[0]
name = variable.natural_name
args.append(name)
slices = [slice(None, None, step) for step in tup]
newdata.create_variable(name=name, values=variable[slices], units=variable.units)
newdata.transform(*args)
return newdata
[docs] def get_nadir(self, channel=0) -> tuple:
"""Get the coordinates, in units, of the minimum in a channel.
Parameters
----------
channel : int or str (optional)
Channel. Default is 0.
Returns
-------
generator of numbers
Coordinates in units for each axis.
"""
# get channel
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channel = self.channels[channel_index]
# get indicies
idx = channel.argmin()
# finish
return tuple(a[idx] for a in self._axes)
[docs] def get_zenith(self, channel=0) -> tuple:
"""Get the coordinates, in units, of the maximum in a channel.
Parameters
----------
channel : int or str (optional)
Channel. Default is 0.
Returns
-------
generator of numbers
Coordinates in units for each axis.
"""
# get channel
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channel = self.channels[channel_index]
# get indicies
idx = channel.argmax()
# finish
return tuple(a[idx] for a in self._axes)
[docs] def heal(self, channel=0, method="linear", fill_value=np.nan, verbose=True):
"""
Remove nans from channel using interpolation.
Parameters
----------
channel : int or str (optional)
Channel to heal. Default is 0.
method : {'linear', 'nearest', 'cubic'} (optional)
The interpolation method. Note that cubic interpolation is only
possible for 1D and 2D data. See `griddata`__ for more information.
Default is linear.
fill_value : number-like (optional)
The value written to pixels that cannot be filled by interpolation.
Default is nan.
verbose : bool (optional)
Toggle talkback. Default is True.
__ http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
.. note:: Healing may take several minutes for large datasets.
Interpolation time goes as nearest, linear, then cubic.
"""
warnings.warn("heal", category=wt_exceptions.EntireDatasetInMemoryWarning)
timer = wt_kit.Timer(verbose=False)
with timer:
# channel
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channel = self.channels[channel_index]
values = self.channels[channel_index][:]
points = [axis[:] for axis in self._axes]
xi = tuple(np.meshgrid(*points, indexing="ij"))
# 'undo' gridding
arr = np.zeros((len(self._axes) + 1, values.size))
for i in range(len(self._axes)):
arr[i] = xi[i].flatten()
arr[-1] = values.flatten()
# remove nans
arr = arr[:, ~np.isnan(arr).any(axis=0)]
# grid data wants tuples
tup = tuple([arr[i] for i in range(len(arr) - 1)])
# grid data
out = griddata(tup, arr[-1], xi, method=method, fill_value=fill_value)
self.channels[channel_index][:] = out
# print
if verbose:
print(
"channel {0} healed in {1} seconds".format(
channel.name, np.around(timer.interval, decimals=3)
)
)
[docs] def level(self, channel, axis, npts, *, verbose=True):
"""Subtract the average value of npts at the edge of a given axis.
Parameters
----------
channel : int or str
Channel to level.
axis : int
Axis to level along.
npts : int
Number of points to average for each slice. Positive numbers
take points at leading indicies and negative numbers take points
at trailing indicies.
verbose : bool (optional)
Toggle talkback. Default is True.
"""
warnings.warn("level", category=wt_exceptions.EntireDatasetInMemoryWarning)
channel_index = wt_kit.get_index(self.channel_names, channel)
channel = self.channels[channel_index]
# verify npts not zero
npts = int(npts)
if npts == 0:
raise wt_exceptions.ValueError("npts must not be zero")
# get subtrahend
ss = [slice(None)] * self.ndim
if npts > 0:
ss[axis] = slice(0, npts, None)
else:
ss[axis] = slice(npts, None, None)
subtrahend = np.nanmean(channel[ss], axis=axis)
if self.ndim > 1:
subtrahend = np.expand_dims(subtrahend, axis=axis)
# level
channel -= subtrahend
# finish
channel._null = 0
if verbose:
print("channel {0} leveled along axis {1}".format(channel.natural_name, axis))
[docs] def map_variable(
self, variable, points, input_units="same", *, name=None, parent=None, verbose=True
) -> "Data":
"""Map points of an axis to new points using linear interpolation.
Out-of-bounds points are written nan.
Parameters
----------
variable : string
The variable to map onto.
points : array-like or int
If array, the new points. If int, new points will have the same
limits, with int defining the number of evenly spaced points
between.
input_units : str (optional)
The units of the new points. Default is same, which assumes
the new points have the same units as the axis.
name : string (optional)
The name of the new data object. If None, generated from
natural_name. Default is None.
parent : WrightTools.Collection (optional)
Parent of new data object. If None, data is made at root of a
new temporary file.
verbose : bool (optional)
Toggle talkback. Default is True.
Returns
-------
WrightTools.Data
New data object.
"""
# get variable index
variable_index = wt_kit.get_index(self.variable_names, variable)
variable = self.variables[variable_index]
# get points
if isinstance(points, int):
points = np.linspace(variable.min(), variable.max(), points)
points = np.array(points)
# points dimensionality
if points.ndim < variable.ndim:
for i, d in enumerate(variable.shape):
if d == 1:
points = np.expand_dims(points, axis=i)
# convert points
if input_units == "same":
pass
else:
points = wt_units.converter(points, input_units, variable.units)
# construct new data object
special = ["name", "axes", "constants", "channel_names", "variable_names"]
kwargs = {k: v for k, v in self.attrs.items() if k not in special}
if name is None:
name = "{0}_{1}_mapped".format(self.natural_name, variable.natural_name)
kwargs["name"] = name
kwargs["parent"] = parent
out = Data(**kwargs)
# mapped variable
values = points
out.create_variable(values=values, **variable.attrs)
# orthogonal variables
for v in self.variables:
if wt_kit.orthogonal(v.shape, variable.shape):
out.create_variable(values=v[:], **v.attrs)
out.transform(*self.axis_expressions)
# interpolate
if self.ndim == 1:
def interpolate(dataset, points):
function = scipy.interpolate.interp1d(variable[:], dataset[:], bounds_error=False)
return function(points)
else:
pts = np.array([a.full.flatten() for a in self.axes]).T
out_pts = np.array([a.full.flatten() for a in out.axes]).T
def interpolate(dataset, points):
values = dataset.full.flatten()
function = scipy.interpolate.LinearNDInterpolator(pts, values, rescale=True)
new = function(out_pts)
new.shape = out.shape
return new
for v in self.variables:
if v.natural_name not in out.variable_names:
out.create_variable(values=interpolate(v, points), **v.attrs)
out.variable_names = self.variable_names # enforce old order
out._variables = None # force regeneration of variables @property
for channel in self.channels:
out.create_channel(values=interpolate(channel, points), **channel.attrs)
# finish
if verbose:
print("data mapped from {0} to {1}".format(self.shape, out.shape))
return out
[docs] def offset(
self,
points,
offsets,
along,
offset_axis,
units="same",
offset_units="same",
mode="valid",
method="linear",
verbose=True,
):
"""Offset one axis based on another axis' values.
Useful for correcting instrumental artifacts such as zerotune.
Parameters
----------
points : 1D array-like
Points.
offsets : 1D array-like
Offsets.
along : str or int
Axis that points array lies along.
offset_axis : str or int
Axis to offset using offsets.
units : str (optional)
Units of points array.
offset_units : str (optional)
Units of offsets aray.
mode : {'valid', 'full', 'old'} (optional)
Define how far the new axis will extend. Points outside of valid
interpolation range will be written nan.
method : {'linear', 'nearest', 'cubic'} (optional)
The interpolation method. Note that cubic interpolation is only
possible for 1D and 2D data. See `griddata`__ for more information.
Default is linear.
verbose : bool (optional)
Toggle talkback. Default is True.
__ http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
>>> points # an array of w1 points
>>> offsets # an array of d1 corrections
>>> data.offset(points, offsets, 'w1', 'd1')
"""
raise NotImplementedError
# axis ------------------------------------------------------------------------------------
if isinstance(along, int):
axis_index = along
elif isinstance(along, str):
axis_index = self.axis_names.index(along)
else:
raise TypeError("along: expected {int, str}, got %s" % type(along))
axis = self._axes[axis_index]
# values & points -------------------------------------------------------------------------
# get values, points, units
if units == "same":
input_units = axis.units
else:
input_units = units
# check offsets is 1D or 0D
if len(offsets.shape) == 1:
pass
else:
raise RuntimeError("values must be 1D or 0D in offset!")
# check if units is compatible, convert
dictionary = getattr(wt_units, axis.units_kind)
if input_units in dictionary.keys():
pass
else:
raise RuntimeError("units incompatible in offset!")
points = wt_units.converter(points, input_units, axis.units)
# create correction array
function = interp1d(points, offsets, bounds_error=False)
corrections = function(axis[:])
# remove nans
finite_indicies = np.where(np.isfinite(corrections))[0]
left_pad_width = finite_indicies[0]
right_pad_width = len(corrections) - finite_indicies[-1] - 1
corrections = np.pad(
corrections[np.isfinite(corrections)],
(int(left_pad_width), int(right_pad_width)),
mode="edge",
)
# do correction ---------------------------------------------------------------------------
# transpose so axis is last
transpose_order = np.arange(len(self._axes))
transpose_order[axis_index] = len(self._axes) - 1
transpose_order[-1] = axis_index
self.transpose(transpose_order, verbose=False)
# get offset axis index
if isinstance(offset_axis, int):
offset_axis_index = offset_axis
elif isinstance(offset_axis, str):
offset_axis_index = self.axis_names.index(offset_axis)
else:
raise TypeError("offset_axis: expected {int, str}, got %s" % type(offset_axis))
# new points
new_points = [a[:] for a in self._axes]
old_offset_axis_points = self._axes[offset_axis_index][:]
spacing = abs(
(old_offset_axis_points.max() - old_offset_axis_points.min())
/ float(len(old_offset_axis_points))
)
if mode == "old":
new_offset_axis_points = old_offset_axis_points
elif mode == "valid":
_max = old_offset_axis_points.max() + corrections.min()
_min = old_offset_axis_points.min() + corrections.max()
n = int(abs(np.ceil((_max - _min) / spacing)))
new_offset_axis_points = np.linspace(_min, _max, n)
elif mode == "full":
_max = old_offset_axis_points.max() + corrections.max()
_min = old_offset_axis_points.min() + corrections.min()
n = np.ceil((_max - _min) / spacing)
new_offset_axis_points = np.linspace(_min, _max, n)
new_points[offset_axis_index] = new_offset_axis_points
new_xi = tuple(np.meshgrid(*new_points, indexing="ij"))
xi = tuple(np.meshgrid(*[a[:] for a in self._axes], indexing="ij"))
for channel in self.channels:
# 'undo' gridding
arr = np.zeros((len(self._axes) + 1, channel[:].size))
for i in range(len(self._axes)):
arr[i] = xi[i].flatten()
arr[-1] = channel[:].flatten()
# do corrections
corrections = list(corrections)
corrections = corrections * int((len(arr[0]) / len(corrections)))
arr[offset_axis_index] += corrections
# grid data
tup = tuple([arr[i] for i in range(len(arr) - 1)])
# note that rescale is crucial in this operation
out = griddata(tup, arr[-1], new_xi, method=method, fill_value=np.nan, rescale=True)
channel[:] = out
self._axes[offset_axis_index][:] = new_offset_axis_points
# transpose out
self.transpose(transpose_order, verbose=False)
[docs] def print_tree(self, *, verbose=True):
"""Print a ascii-formatted tree representation of the data contents."""
print("{0} ({1})".format(self.natural_name, self.filepath))
self._print_branch("", depth=0, verbose=verbose)
[docs] def prune(self, keep_channels=True, *, verbose=True):
"""Remove unused variables and (optionally) channels from the Data object.
Unused variables are those that are not included in either axes or constants.
Unused channels are those not specified in keep_channels, or the first channel.
Parameters
----------
keep_channels : boolean or int or str or tuple
If False, removes all but the first channel.
If int or str, removes all but that index/name channel.
If tuple, removes all channels except those in the tuple by index or name.
Default is True: do not delete channels
verbose : boolean
Toggle talkback. Default is True.
"""
for v in self.variables:
for var in wt_kit.flatten_list([ax.variables for ax in self._axes + self._constants]):
if v == var:
break
else:
self.remove_variable(v.natural_name, implied=False, verbose=verbose)
if keep_channels is not True:
try:
if isinstance(keep_channels, str):
raise TypeError
indexes = tuple(keep_channels)
except TypeError:
indexes = (keep_channels,)
for i, ch in enumerate(self.channels):
if i not in indexes and not ch.natural_name in indexes:
self.remove_channel(ch.natural_name, verbose=verbose)
[docs] def remove_channel(self, channel, *, verbose=True):
"""Remove channel from data.
Parameters
----------
channel : int or str
Channel index or name to remove.
verbose : boolean (optional)
Toggle talkback. Default is True.
"""
channel_index = wt_kit.get_index(self.channel_names, channel)
new = list(self.channel_names)
name = new.pop(channel_index)
del self[name]
self.channel_names = new
if verbose:
print("channel {0} removed".format(name))
[docs] def remove_variable(self, variable, *, implied=True, verbose=True):
"""Remove variable from data.
Parameters
----------
variable : int or str
Variable index or name to remove.
implied : boolean (optional)
Toggle deletion of other variables that start with the same
name. Default is True.
verbose : boolean (optional)
Toggle talkback. Default is True.
"""
if isinstance(variable, int):
variable = self.variable_names[variable]
# find all of the implied variables
removed = []
if implied:
for n in self.variable_names:
if n.startswith(variable):
removed.append(n)
else:
removed = [variable]
# check that axes will not be ruined
for n in removed:
for a in self._axes:
if n in [v.natural_name for v in a.variables]:
message = "{0} is contained in axis {1}".format(n, a.expression)
raise RuntimeError(message)
for c in self._constants:
if n in [v.natural_name for v in c.variables]:
warnings.warn(
"Variable being removed used in a constant",
wt_exceptions.WrightToolsWarning,
)
# do removal
for n in removed:
variable_index = wt_kit.get_index(self.variable_names, n)
new = list(self.variable_names)
name = new.pop(variable_index)
del self[name]
self.variable_names = new
self._variables = None
# finish
if verbose:
print("{0} variable(s) removed:".format(len(removed)))
for n in removed:
print(" {0}".format(n))
[docs] def rename_channels(self, *, verbose=True, **kwargs):
"""Rename a set of channels.
Parameters
----------
kwargs
Keyword arguments of the form current:'new'.
verbose : boolean (optional)
Toggle talkback. Default is True
"""
# ensure that items will remain unique
changed = kwargs.keys()
for k, v in kwargs.items():
if v not in changed and v in self.keys():
raise wt_exceptions.NameNotUniqueError(v)
# compile references to items that are changing
new = {}
for k, v in kwargs.items():
obj = self[k]
index = self.channel_names.index(k)
# rename
new[v] = obj, index
Group._instances.pop(obj.fullpath, None)
obj.natural_name = str(v)
# remove old references
del self[k]
# apply new references
names = list(self.channel_names)
for v, value in new.items():
obj, index = value
self[v] = obj
names[index] = v
self.channel_names = names
# finish
if verbose:
print("{0} channel(s) renamed:".format(len(kwargs)))
for k, v in kwargs.items():
print(" {0} --> {1}".format(k, v))
[docs] def rename_variables(self, *, implied=True, verbose=True, **kwargs):
"""Rename a set of variables.
Parameters
----------
kwargs
Keyword arguments of the form current:'new'.
implied : boolean (optional)
Toggle inclusion of other variables that start with the same
name. Default is True.
verbose : boolean (optional)
Toggle talkback. Default is True
"""
# find all of the implied variables
kwargs = collections.OrderedDict(kwargs)
if implied:
new = collections.OrderedDict()
for k, v in kwargs.items():
for n in self.variable_names:
if n.startswith(k):
new[n] = n.replace(k, v, 1)
kwargs = new
# ensure that items will remain unique
changed = kwargs.keys()
for k, v in kwargs.items():
if v not in changed and v in self.keys():
raise wt_exceptions.NameNotUniqueError(v)
# compile references to items that are changing
new = {}
for k, v in kwargs.items():
obj = self[k]
index = self.variable_names.index(k)
# rename
new[v] = obj, index
Group._instances.pop(obj.fullpath, None)
obj.natural_name = str(v)
# remove old references
del self[k]
# apply new references
names = list(self.variable_names)
for v, value in new.items():
obj, index = value
self[v] = obj
names[index] = v
self.variable_names = names
units = self.units
new = list(self.axis_expressions)
for i, v in enumerate(kwargs.keys()):
for j, n in enumerate(new):
new[j] = n.replace(v, "{%i}" % i)
for i, n in enumerate(new):
new[i] = n.format(*kwargs.values())
self.transform(*new)
for a, u in zip(self._axes, units):
a.convert(u)
units = self.constant_units
new = list(self.constant_expressions)
for i, v in enumerate(kwargs.keys()):
for j, n in enumerate(new):
new[j] = n.replace(v, "{%i}" % i)
for i, n in enumerate(new):
new[i] = n.format(*kwargs.values())
self.set_constants(*new)
for c, u in zip(self._constants, units):
c.convert(u)
# finish
if verbose:
print("{0} variable(s) renamed:".format(len(kwargs)))
for k, v in kwargs.items():
print(" {0} --> {1}".format(k, v))
[docs] def share_nans(self):
"""Share not-a-numbers between all channels.
If any channel is nan at a given index, all channels will be nan
at that index after this operation.
Uses the share_nans method found in wt.kit.
"""
def f(_, s, channels):
outs = wt_kit.share_nans(*[c[s] for c in channels])
for c, o in zip(channels, outs):
c[s] = o
self.channels[0].chunkwise(f, self.channels)
[docs] def smooth(self, factors, channel=None, verbose=True) -> "Data":
"""Smooth a channel using an n-dimenional kaiser window.
Note, all arrays are loaded into memory.
For more info see `Kaiser_window`__ wikipedia entry.
__ https://en.wikipedia.org/wiki/Kaiser_window
Parameters
----------
factors : int or list of int
The smoothing factor. You may provide a list of smoothing factors
for each axis.
channel : int or str or None (optional)
The channel to smooth. If None, all channels will be smoothed.
Default is None.
verbose : bool (optional)
Toggle talkback. Default is True.
"""
warnings.warn("smooth", category=wt_exceptions.EntireDatasetInMemoryWarning)
# get factors -----------------------------------------------------------------------------
if isinstance(factors, list):
pass
else:
dummy = np.zeros(len(self._axes))
dummy[::] = factors
factors = list(dummy)
# get channels ----------------------------------------------------------------------------
if channel is None:
channels = self.channels
else:
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channels = [self.channels[channel_index]]
# smooth ----------------------------------------------------------------------------------
for channel in channels:
values = channel[:]
for axis_index in range(len(factors)):
factor = factors[axis_index]
# transpose so the axis of interest is last
transpose_order = range(len(values.shape))
# replace axis_index with zero
transpose_order = [
len(values.shape) - 1 if i == axis_index else i for i in transpose_order
]
transpose_order[len(values.shape) - 1] = axis_index
values = values.transpose(transpose_order)
# get kaiser window
beta = 5.0
w = np.kaiser(2 * factor + 1, beta)
# for all slices...
for index in np.ndindex(values[..., 0].shape):
current_slice = values[index]
temp_slice = np.pad(current_slice, int(factor), mode=str("edge"))
values[index] = np.convolve(temp_slice, w / w.sum(), mode=str("valid"))
# transpose out
values = values.transpose(transpose_order)
# return array to channel object
channel[:] = values
if verbose:
print("smoothed data")
[docs] def split(
self, expression, positions, *, units=None, parent=None, verbose=True
) -> wt_collection.Collection:
"""
Split the data object along a given expression, in units.
Parameters
----------
expression : int or str
The expression to split along. If given as an integer, the axis at that index
is used.
positions : number-type or 1D array-type
The position(s) to split at, in units.
units : str (optional)
The units of the given positions. Default is same, which assumes
input units are identical to first variable units.
parent : WrightTools.Collection (optional)
The parent collection in which to place the 'split' collection.
Default is a new Collection.
verbose : bool (optional)
Toggle talkback. Default is True.
Returns
-------
WrightTools.collection.Collection
A Collection of data objects.
The order of the objects is such that the axis points retain their original order.
See Also
--------
chop
Divide the dataset into its lower-dimensionality components.
collapse
Collapse the dataset along one axis.
"""
# axis ------------------------------------------------------------------------------------
old_expr = self.axis_expressions
old_units = self.units
out = wt_collection.Collection(name="split", parent=parent)
if isinstance(expression, int):
if units is None:
units = self._axes[expression].units
expression = self._axes[expression].expression
elif isinstance(expression, str):
pass
else:
raise TypeError("expression: expected {int, str}, got %s" % type(expression))
self.transform(expression)
if units:
self.convert(units)
try:
positions = [-np.inf] + sorted(list(positions)) + [np.inf]
except TypeError:
positions = [-np.inf, positions, np.inf]
values = self._axes[0].full
masks = [(values >= lo) & (values < hi) for lo, hi in wt_kit.pairwise(positions)]
omasks = []
cuts = []
for mask in masks:
try:
omasks.append(wt_kit.mask_reduce(mask))
cuts.append([i == 1 for i in omasks[-1].shape])
# Ensure at least one axis is kept
if np.all(cuts[-1]):
cuts[-1][0] = False
except ValueError:
omasks.append(None)
cuts.append(None)
for i in range(len(positions) - 1):
out.create_data("split%03i" % i)
for var in self.variables:
for i, (imask, omask, cut) in enumerate(zip(masks, omasks, cuts)):
if omask is None:
# Zero length split
continue
omask = wt_kit.enforce_mask_shape(omask, var.shape)
omask.shape = tuple([s for s, c in zip(omask.shape, cut) if not c])
out_arr = np.full(omask.shape, np.nan)
imask = wt_kit.enforce_mask_shape(imask, var.shape)
out_arr[omask] = var[:][imask]
out[i].create_variable(values=out_arr, **var.attrs)
for ch in self.channels:
for i, (imask, omask, cut) in enumerate(zip(masks, omasks, cuts)):
if omask is None:
# Zero length split
continue
omask = wt_kit.enforce_mask_shape(omask, ch.shape)
omask.shape = tuple([s for s, c in zip(omask.shape, cut) if not c])
out_arr = np.full(omask.shape, np.nan)
imask = wt_kit.enforce_mask_shape(imask, ch.shape)
out_arr[omask] = ch[:][imask]
out[i].create_channel(values=out_arr, **ch.attrs)
if verbose:
for d in out.values():
try:
d.transform(expression)
except IndexError:
continue
print("split data into {0} pieces along <{1}>:".format(len(positions) - 1, expression))
for i, (lo, hi) in enumerate(wt_kit.pairwise(positions)):
new_data = out[i]
if new_data.shape == ():
print(" {0} : None".format(i))
else:
new_axis = new_data.axes[0]
print(
" {0} : {1:0.2f} to {2:0.2f} {3} {4}".format(
i, lo, hi, new_axis.units, new_axis.shape
)
)
for d in out.values():
try:
d.transform(*old_expr)
keep = []
keep_units = []
for ax in d.axes:
if ax.size > 1:
keep.append(ax.expression)
keep_units.append(ax.units)
else:
d.create_constant(ax.expression, verbose=False)
d.transform(*keep)
for ax, u in zip(d.axes, keep_units):
ax.convert(u)
except IndexError:
continue
tempax = Axis(d, expression)
if all(
np.all(
np.sum(~np.isnan(tempax.masked), axis=tuple(set(range(tempax.ndim)) - {j}))
<= 1
)
for j in range(tempax.ndim)
):
d.create_constant(expression, verbose=False)
self.transform(*old_expr)
for ax, u in zip(self.axes, old_units):
ax.convert(u)
return out
[docs] def set_constants(self, *constants, verbose=True):
"""Set the constants associated with the data.
Parameters
----------
constants : str
Expressions for the new set of constants.
verbose : boolean (optional)
Toggle talkback. Default is True
See Also
--------
transform
Similar method except for axes.
create_constant
Add an individual constant.
remove_constant
Remove an individual constant.
"""
# create
new = []
current = {c.expression: c for c in self._constants}
for expression in constants:
constant = current.get(expression, Constant(self, expression))
new.append(constant)
self._constants = new
# units
for c in self._constants:
if c.units is None:
c.convert(c.variables[0].units)
# finish
self.flush()
self._on_constants_updated()
[docs] def create_constant(self, expression, *, verbose=True):
"""Append a constant to the stored list.
Parameters
----------
expression : str
Expression for the new constant.
verbose : boolean (optional)
Toggle talkback. Default is True
See Also
--------
set_constants
Remove and replace all constants.
remove_constant
Remove an individual constant.
"""
if expression in self.constant_expressions:
wt_exceptions.ObjectExistsWarning.warn(expression)
return self.constants[self.constant_expressions.index(expression)]
constant = Constant(self, expression)
if constant.units is None:
constant.convert(constant.variables[0].units)
self._constants.append(constant)
self.flush()
self._on_constants_updated()
if verbose:
print("Constant '{}' added".format(constant.expression))
return constant
[docs] def remove_constant(self, constant, *, verbose=True):
"""Remove a constant from the stored list.
Parameters
----------
constant : str or Constant or int
Expression for the new constant.
verbose : boolean (optional)
Toggle talkback. Default is True
See Also
--------
set_constants
Remove and replace all constants.
create_constant
Add an individual constant.
"""
if isinstance(constant, (str, int)):
constant_index = wt_kit.get_index(self.constant_expressions, constant)
elif isinstance(constant, Constant):
constant_index = wt_kit.get_index(self.constants, constant)
constant = self._constants[constant_index]
self._constants.pop(constant_index)
self.flush()
self._on_constants_updated()
if verbose:
print("Constant '{}' removed".format(constant.expression))
[docs] def zoom(self, factor, order=1, verbose=True):
"""Zoom the data array using spline interpolation of the requested order.
The number of points along each axis is increased by factor.
See `scipy ndimage`__ for more info.
__ http://docs.scipy.org/doc/scipy/reference/
generated/scipy.ndimage.interpolation.zoom.html
Parameters
----------
factor : float
The number of points along each axis will increase by this factor.
order : int (optional)
The order of the spline used to interpolate onto new points.
verbose : bool (optional)
Toggle talkback. Default is True.
See Also
--------
downscale
Down-sample the data array using local averaging.
"""
raise NotImplementedError
import scipy.ndimage
# axes
for axis in self._axes:
axis[:] = scipy.ndimage.interpolation.zoom(axis[:], factor, order=order)
# channels
for channel in self.channels:
channel[:] = scipy.ndimage.interpolation.zoom(channel[:], factor, order=order)
# return
if verbose:
print("data zoomed to new shape:", self.shape)