"""Central data class and associated."""
# --- import --------------------------------------------------------------------------------------
from __future__ import annotations
import collections
import operator
import functools
import warnings
import numpy as np
import sys
import h5py
import scipy
from scipy.interpolate import griddata, interp1d
from .._group import Group
from .. import collection as wt_collection
from .. import exceptions as wt_exceptions
from .. import kit as wt_kit
from .. import units as wt_units
from ._axis import Axis, identifier_to_operator, operator_to_identifier
from ._channel import Channel
from ._constant import Constant
from ._variable import Variable
# --- define --------------------------------------------------------------------------------------
__all__ = ["Data"]
# --- class ---------------------------------------------------------------------------------------
[docs]
class Data(Group):
"""Multidimensional dataset."""
class_name = "Data"
[docs]
def __init__(self, *args, **kwargs):
self._axes = []
self._constants = []
Group.__init__(self, *args, **kwargs)
# populate axes, constants from attrs string
for identifier in self.attrs.get("axes", []):
if hasattr(identifier, "decode"):
identifier = identifier.decode()
expression, units = identifier.split("{")
units = units.replace("}", "").strip()
if units == "None":
units = None
# Should not be needed for wt5 >= 1.0.3, kept for opening older wt5 files.
for i in identifier_to_operator.keys():
expression = expression.replace(i, identifier_to_operator[i])
expression = expression.replace(" ", "") # remove all whitespace
axis = Axis(self, expression, units)
self._axes.append(axis)
for identifier in self.attrs.get("constants", []):
if hasattr(identifier, "decode"):
identifier = identifier.decode()
expression, units = identifier.split("{")
units = units.replace("}", "").strip()
if units == "None":
units = None
for i in identifier_to_operator.keys():
expression = expression.replace(i, identifier_to_operator[i])
expression = expression.replace(" ", "") # remove all whitespace
const = Constant(self, expression, units)
self._constants.append(const)
self._current_axis_identities_in_natural_namespace = []
if self.file.mode is not None and self.file.mode != "r":
self._on_constants_updated()
self._on_axes_updated()
# the following are populated if not already recorded
self.channel_names
self.source
self.variable_names
def __repr__(self) -> str:
return "<WrightTools.Data '{0}' {1} at {2}>".format(
self.natural_name, str(self.axis_names), "::".join([self.filepath, self.name])
)
def __dir__(self) -> list:
default = object.__dir__(self)
axes = [ax.natural_name for ax in self.axes]
return default + axes + self._ipython_key_completions_()
def _ipython_key_completions_(self) -> list:
channels = [ch.natural_name for ch in self.channels]
variables = [v.natural_name for v in self.variables]
return channels + variables
@property
def axes(self) -> tuple:
return tuple(self._axes)
@property
def axis_expressions(self) -> tuple:
"""Axis expressions."""
return tuple(a.expression for a in self._axes)
@property
def axis_names(self) -> tuple:
"""Axis names."""
return tuple(a.natural_name for a in self._axes)
@property
def constants(self) -> tuple:
return tuple(self._constants)
@property
def constant_expressions(self) -> tuple:
"""Axis expressions."""
return tuple(a.expression for a in self._constants)
@property
def constant_names(self) -> tuple:
"""Axis names."""
return tuple(a.natural_name for a in self._constants)
@property
def channel_names(self) -> tuple:
"""Channel names."""
if "channel_names" not in self.attrs.keys():
self.attrs["channel_names"] = np.array([], dtype="S")
return tuple(s.decode() for s in self.attrs["channel_names"])
@channel_names.setter
def channel_names(self, value):
"""Set channel names."""
self.attrs["channel_names"] = np.array(value, dtype="S")
@property
def channels(self) -> tuple:
"""Channels."""
return tuple(self[n] for n in self.channel_names)
@property
def datasets(self) -> tuple:
"""Datasets."""
return tuple(v for _, v in self.items() if isinstance(v, h5py.Dataset))
@property
def kind(self):
"""Kind."""
if "kind" not in self.attrs.keys():
self.attrs["kind"] = "None"
value = self.attrs["kind"]
return value if not value == "None" else None
@property
def ndim(self) -> int:
"""Get number of dimensions."""
try:
assert self._ndim is not None
except (AssertionError, AttributeError):
if len(self.variables) == 0:
self._ndim = 0
else:
self._ndim = self.variables[0].ndim
finally:
return self._ndim
@property
def shape(self) -> tuple:
"""Shape."""
try:
assert self._shape is not None
except (AssertionError, AttributeError):
self._shape = wt_kit.joint_shape(*self.variables)
finally:
return self._shape
@property
def size(self) -> int:
"""Size."""
return functools.reduce(operator.mul, self.shape)
@property
def source(self):
"""Source."""
if "source" not in self.attrs.keys():
self.attrs["source"] = "None"
value = self.attrs["source"]
return value if isinstance(value, str) and not value == "None" else None
@property
def units(self) -> tuple:
"""All axis units."""
return tuple(a.units for a in self._axes)
@property
def constant_units(self) -> tuple:
"""All constant units."""
return tuple(a.units for a in self._constants)
@property
def variable_names(self) -> tuple:
"""Variable names."""
if "variable_names" not in self.attrs.keys():
self.attrs["variable_names"] = np.array([], dtype="S")
return tuple(s.decode() for s in self.attrs["variable_names"])
@variable_names.setter
def variable_names(self, value):
"""Set variable names."""
self.attrs["variable_names"] = np.array(value, dtype="S")
@property
def variables(self) -> tuple:
"""Variables."""
try:
assert self._variables is not None
except (AssertionError, AttributeError):
self._variables = [self[n] for n in self.variable_names]
finally:
return tuple(self._variables)
@property
def _leaf(self):
return "{0} {1}".format(self.natural_name, self.shape)
def _on_axes_updated(self):
"""Method to run when axes are changed in any way.
Propagates updated axes properly.
"""
# update attrs
self.attrs["axes"] = np.array([a.identity.encode() for a in self._axes], dtype="S")
# remove old attributes
while len(self._current_axis_identities_in_natural_namespace) > 0:
key = self._current_axis_identities_in_natural_namespace.pop(0)
try:
delattr(self, key)
except AttributeError:
pass # already gone
# populate new attributes
for a in self._axes:
key = a.natural_name
setattr(self, key, a)
self._current_axis_identities_in_natural_namespace.append(key)
def _on_constants_updated(self):
"""Method to run when constants are changed in any way.
Propagates updated constants properly.
"""
# update attrs
self.attrs["constants"] = np.array(
[a.identity.encode() for a in self._constants], dtype="S"
)
def _print_branch(self, prefix, depth, verbose):
def print_leaves(prefix, lis, vline=True):
for i, item in enumerate(lis):
if vline:
a = "│ "
else:
a = " "
if i + 1 == len(lis):
b = "└── "
else:
b = "├── "
s = prefix + a + b + "{0}: {1}".format(i, item._leaf)
print(s)
if verbose:
# axes
print(prefix + f"├── axes ({len(self.axes)})")
print_leaves(prefix, self.axes)
# constants
print(prefix + f"├── constants ({len(self.constants)})")
print_leaves(prefix, self.constants)
# variables
print(prefix + f"├── variables ({len(self.variables)})")
print_leaves(prefix, self.variables)
# channels
print(prefix + f"└── channels ({len(self.channels)})")
print_leaves(prefix, self.channels, vline=False)
else:
# axes
s = f"axes ({len(self.axes)}): "
s += ", ".join(["{0} ({1})".format(a.expression, a.units) for a in self.axes])
print(prefix + "├── " + s)
# constants
s = f"constants ({len(self.constants)}): "
s += ", ".join(
["{0} ({1} {2})".format(a.expression, a.value, a.units) for a in self.constants]
)
print(prefix + "├── " + s)
# channels
s = f"channels ({len(self.channels)}): "
s += ", ".join(self.channel_names)
print(prefix + "└── " + s)
[docs]
def bring_to_front(self, channel):
"""Bring a specific channel to the zero-indexed position in channels.
All other channels get pushed back but remain in order.
Parameters
----------
channel : int or str
Channel index or name.
"""
channel_index = wt_kit.get_index(self.channel_names, channel)
new = list(self.channel_names)
new.insert(0, new.pop(channel_index))
self.channel_names = new
def at(self, parent=None, name=None, **at) -> Data:
"""Return data of a subset of the data at specified axis position(s).
kwargs
------
parent : WrightTools.Collection (optional)
parent of new object. Default is no parent.
name : string (optional)
if specified, name of new data object.
**at :
keys are axis identifiers (Axis.natural_name), values lists of format
[position, unit] if no coordinates are specified, returns a copy of
the array. _specified axes must be one-dimensional in the array_
Returns
-------
out : wt.Data
Data that retains the dimension of unspecified axes.
Example
-------
```
>>> from WrightTools import datasets
>>> data = wt.open(datasets.wt5.v1p0p1_MoS2_TrEE_movie) # axes w2, w1=wm, d2
>>> probed_at_resonance = data.at(w1__e__wm=[690, "nm"])
>>> zero_delay_data = data.at(d2=[0, "fs"])
```
Notes
-----
* _most attrs are not retained in the new data object_.
* some axis expressions use forbidden dictkey characters (e.g. "=").
For these case, you must use the string substitution
(e.g "=" -> "__e__"). Use `Data.axis_names` or `Axis.natural_name`
to check the identifier form of the expression. Also consider using
kit.string2identifier to filter.
See Also
--------
Data.chop : also reduces dimensionality, but returns a collection of one or more
data objects. Kept axes can be sliced into a collection
"""
idx = self._at_to_slice(**at)
return self._from_slice(idx, name=name, parent=parent)
def squeeze(self, name=None, parent=None):
"""Reduce the data to the dimensionality of the (non-trivial) span of the axes.
i.e. if the joint shape of the axes has an array dimension with length 1, this
array dimension is squeezed.
channels and variables that span beyond the axes are removed.
Parameters
----------
name : string (optional)
name of the new Data.
parent : WrightTools Collection instance (optional)
Collection to place the new "chop" collection within. Default is
None (new parent).
Returns
-------
out : wt.Data
new data object. The new data object has dimensions of the
(non-trivial) span of the current axes
Examples
--------
>>> ...
See also
--------
Data.chop: Divide the dataset into its lower-dimensionality components.
...
"""
new = Data(name=name, parent=parent)
attrs = {
k: v
for k, v in self.attrs.items()
if k
not in [
"axes",
"channel_names",
"constants",
"name",
"source",
"item_names",
"variable_names",
]
}
new.attrs.update(attrs)
joint_shape = wt_kit.joint_shape(*[ai[:] for ai in self.axes])
cull_dims = [j == 1 for j in joint_shape]
sl = [0 if cull else slice(None) for cull in cull_dims]
matches_broadcast_axes = lambda a: all(
[a.shape[i] == 1 for i in range(self.ndim) if cull_dims[i]]
)
for v in filter(matches_broadcast_axes, self.variables):
kwargs = v._to_dict()
kwargs["values"] = v[sl]
new.create_variable(**kwargs)
for c in filter(matches_broadcast_axes, self.channels):
kwargs = c._to_dict()
kwargs["values"] = c[sl]
new.create_channel(**kwargs)
# inherit constants
for c in self.constants:
new.create_constant(c.expression)
new.transform(*self.axis_expressions)
return new
[docs]
def chop(self, *args, at=None, parent=None, verbose=True) -> wt_collection.Collection:
"""Divide the dataset into its lower-dimensionality components.
Parameters
----------
axis : str or int (args)
Axes of the returned data objects. Strings refer to the names of
axes in this object, integers refer to their index. Provide multiple
axes to return multidimensional data objects.
at : dict (optional)
Choice of position along an axis. Keys are axis names, values are lists
``[position, input units]``. If exact position does not exist,
the closest valid position is used.
parent : WrightTools Collection instance (optional)
Collection to place the new "chop" collection within. Default is
None (new parent).
verbose : bool (optional)
Toggle talkback. Default is True.
Returns
-------
WrightTools Collection
Collection of chopped data objects.
Examples
--------
>>> data.axis_names
['d2', 'w1', 'w2']
Get all w1 wigners.
>>> datas = data.chop('d2', 'w1')
>>> len(datas)
51
Get 2D frequency at d2=0 fs.
>>> datas = data.chop('w1', 'w2', at={'d2': [0, 'fs']})
>>> len(datas)
0
>>> datas[0].axis_names
['w1', 'w2']
>>> datas[0].d2[:]
0.
See Also
--------
collapse
Collapse the dataset along one axis.
split
Split the dataset while maintaining its dimensionality.
"""
# parse args
args = list(args)
for i, arg in enumerate(args):
if isinstance(arg, int):
args[i] = self._axes[arg].natural_name
elif isinstance(arg, str):
arg = arg.strip()
args[i] = wt_kit.string2identifier(arg, replace=operator_to_identifier)
transform_expression = [self._axes[self.axis_names.index(a)].expression for a in args]
if at is None:
at = {}
# normalize the at keys to the natural name
for k in list(at.keys()):
k = k.strip()
nk = wt_kit.string2identifier(k, replace=operator_to_identifier)
if nk != k:
at[nk] = at[k]
at.pop(k)
# distinguish looping and non-looping indices
at_idx = self._at_to_slice(**at)
at_axes = at_idx != slice(None)
kept = args + [ak for ak in at.keys()]
kept_axes = [self._axes[self.axis_names.index(a)] for a in kept]
removed_axes = [a for a in self._axes if a not in kept_axes]
removed_shape = wt_kit.joint_shape(*removed_axes)
if removed_shape == ():
removed_shape = (1,) * self.ndim
removed_shape = list(removed_shape)
for ax in kept_axes:
if ax.shape.count(1) == ax.ndim - 1:
removed_shape[ax.shape.index(ax.size)] = 1
removed_shape = tuple(removed_shape)
# get output collection
out = wt_collection.Collection(name="chop", parent=parent)
i_digits = int(np.log10(np.prod(removed_shape))) + 1
# iterate
for i, idx in enumerate(np.ndindex(removed_shape)):
name = f"chop{i:0>{i_digits}}"
idx = np.array(idx, dtype=object)
idx[np.array(removed_shape) == 1] = slice(None)
idx[at_axes] = at_idx[at_axes]
self._from_slice(idx, name=name, parent=out)
out[name].transform(*transform_expression)
out.flush()
# return
if verbose:
print("chopped data into %d piece(s)" % len(out), "in", out[0].axis_expressions)
return out
def __getitem__(self, key) -> Data:
"""
data[5, :3]; return new data object with those array slices
"""
if type(key) in [int, slice]:
return self._from_slice(key)
elif (type(key) == tuple) and np.all([type(ki) in [int, slice] for ki in key]):
return self._from_slice(key)
else:
return super().__getitem__(key)
def _at_to_slice(self, **at) -> np.array:
"""create array slice using at"""
for k in at.keys():
if k not in self.axis_names:
raise ValueError(f"Axis identifier {k} not in Axes: {self.axis_names}")
idx = np.array([slice(None)] * len(self.shape))
for axis, point in at.items():
point, units = point
destination_units = self._axes[self.axis_names.index(axis)].units
point = wt_units.converter(point, units, destination_units)
axis_index = self.axis_names.index(axis)
axis = self._axes[axis_index]
idx_index = np.array(axis.shape) > 1
if np.sum(idx_index) > 1:
# we don't know how to handle a position within multiple array
# dimensions
raise wt_exceptions.MultidimensionalAxisError(
"Data._at_to_slice", axis.natural_name
)
idx_index = list(idx_index).index(True)
idx[idx_index] = np.argmin(np.abs(axis[tuple(idx)] - point))
return idx
def _from_slice(self, idx, name=None, parent=None) -> Data:
"""create self from an array slice of the parent self"""
if parent is None:
out = Data(name=name)
else:
out = parent.create_data(name=name)
for v in self.variables:
kwargs = v._to_dict()
kwargs["values"] = v[idx]
out.create_variable(**kwargs)
for c in self.channels:
kwargs = c._to_dict()
kwargs["values"] = c[idx]
out.create_channel(**kwargs)
new_axes = [a.expression for a in self.axes if a[idx].size > 1]
out.transform(*new_axes)
constants = [a for a in self.axis_expressions if a not in new_axes]
new_axis_units = [a.units for a in self.axes if a.expression in out.axis_expressions]
for const in list(self.constant_expressions) + constants:
out.create_constant(const, verbose=False)
for j, units in enumerate(new_axis_units):
out.axes[j].convert(units)
return out
[docs]
def gradient(self, axis, *, channel=0):
"""
Compute the gradient along one axis.
New channels have names ``<channel name>_<axis name>_gradient``.
Parameters
----------
axis : int or str
The axis to differentiate along.
If given as an integer, the axis in the underlying array is used,
and unitary spacing is assumed.
If given as a string, the axis must exist, and be a 1D array-aligned axis.
(i.e. have a shape with a single value which is not ``1``)
The axis to collapse along is inferred from the shape of the axis.
channel : int or str
The channel to differentiate.
Default is the first channel.
"""
# get axis index --------------------------------------------------------------------------
if isinstance(axis, int):
axis_index = axis
elif isinstance(axis, str):
index = self.axis_names.index(axis)
axes = [i for i in range(self.ndim) if self.axes[index].shape[i] > 1]
if len(axes) > 1:
raise wt_exceptions.MultidimensionalAxisError(axis, "collapse")
elif len(axes) == 0:
raise wt_exceptions.ValueError(
"Axis '{}' is a single point, cannot compute gradient".format(axis)
)
axis_index = axes[0]
else:
raise wt_exceptions.TypeError("axis: expected {int, str}, got %s" % type(axis))
channel_index = wt_kit.get_index(self.channel_names, channel)
channel = self.channel_names[channel_index]
if self[channel].shape[axis_index] == 1:
raise wt_exceptions.ValueError(
"Channel '{}' has a single point along Axis '{}', cannot compute gradient".format(
channel, axis
)
)
rtype = np.result_type(self[channel].dtype, float)
new = self.create_channel(
"{}_{}_gradient".format(channel, axis),
values=np.empty(self[channel].shape, dtype=rtype),
)
channel = self[channel]
if axis == axis_index:
new[:] = np.gradient(channel[:], axis=axis_index)
else:
new[:] = np.gradient(channel[:], self[axis].points, axis=axis_index)
[docs]
def moment(self, axis, channel=0, moment=1, *, resultant=None):
"""Take the nth moment the dataset along one axis, adding lower rank channels.
New channels have names ``<channel name>_<axis name>_moment_<moment num>``.
Moment 0 is the integral of the slice.
Moment 1 is the weighted average or "Center of Mass", normalized by the integral
Moment 2 is the variance, the central moment about the center of mass,
normalized by the integral
Moments 3+ are central moments about the center of mass, normalized by the integral
and by the standard deviation to the power of the moment.
Moments, especially higher order moments, are susceptible to noise and baseline.
It is recommended when used with real data to use :meth:`WrightTools.data.Channel.clip`
in conjunction with moments to reduce effects of noise.
Parameters
----------
axis : int or str
The axis to take the moment along.
If given as an integer, the axis with that index is used.
If given as a string, the axis with that name is used.
The axis must exist, and be a 1D array-aligned axis.
(i.e. have a shape with a single value which is not ``1``)
The collapsed axis must be monotonic to produce correct results.
The axis to collapse along is inferred from the shape of the axis.
channel : int or str
The channel to take the moment.
If given as an integer, the channel with that index is used.
If given as a string, the channel with that name is used.
The channel must have values along the axis
(i.e. its shape must not be ``1`` in the dimension for which the axis is not ``1``)
Default is 0, the first channel.
moment : int or tuple of int
The moments to take.
One channel will be created for each number given.
Default is 1, the center of mass.
resultant : tuple of int
The resultant shape after the moment operation.
By default, it is intuited by the axis along which the moment is being taken.
This default only works if that axis is 1D, so resultant is required if a
multidimensional axis is passed as the first argument.
The requirement of monotonicity applies on a per pixel basis.
See Also
--------
collapse
Reduce dimensionality by some mathematical operation
clip
Set values above/below a threshold to a particular value
WrightTools.kit.joint_shape
Useful for setting `resultant` kwarg based off of axes not collapsed.
"""
# get axis index --------------------------------------------------------------------------
axis_index = None
if resultant is not None:
for i, (s, r) in enumerate(zip(wt_kit.joint_shape(*self.axes), resultant)):
if s != r and r == 1 and axis_index is None:
axis_index = i
elif s == r:
continue
else:
raise wt_exceptions.ValueError(
f"Invalid resultant shape '{resultant}' for shape {wt_kit.joint_shape(*self.axes)}. "
+ "Consider using `wt.kit.joint_shape` to join non-collapsed axes."
)
index = wt_kit.get_index(self.axis_names, axis)
if axis_index is None:
axes = [i for i in range(self.ndim) if self.axes[index].shape[i] > 1]
if len(axes) > 1:
raise wt_exceptions.MultidimensionalAxisError(axis, "moment")
elif len(axes) == 0:
raise wt_exceptions.ValueError(
"Axis {} is a single point, cannot compute moment".format(axis)
)
axis_index = axes[0]
warnings.warn("moment", category=wt_exceptions.EntireDatasetInMemoryWarning)
channel_index = wt_kit.get_index(self.channel_names, channel)
channel = self.channel_names[channel_index]
if self[channel].shape[axis_index] == 1:
raise wt_exceptions.ValueError(
"Channel '{}' has a single point along Axis '{}', cannot compute moment".format(
channel, axis
)
)
new_shape = list(self[channel].shape)
new_shape[axis_index] = 1
channel = self[channel]
axis_inp = axis
axis = self.axes[index]
x = axis[:]
if np.any(np.isnan(x)):
raise wt_exceptions.ValueError("Axis '{}' includes NaN".format(axis_inp))
y = np.nan_to_num(channel[:])
try:
moments = tuple(moment)
except TypeError:
moments = (moment,)
multiplier = 1
if 0 in moments:
# May be possible to optimize, probably doesn't need the sum
# only matters for integral, all others normalize by integral
multiplier = np.sign(
np.sum(np.diff(x, axis=axis_index), axis=axis_index, keepdims=True)
)
for moment in moments:
about = 0
norm = 1
if moment > 0:
norm = np.trapz(y, x, axis=axis_index)
norm = np.array(norm)
norm.shape = new_shape
if moment > 1:
about = np.trapz(x * y, x, axis=axis_index)
about = np.array(about)
about.shape = new_shape
about /= norm
if moment > 2:
sigma = np.trapz((x - about) ** 2 * y, x, axis=axis_index)
sigma = np.array(sigma)
sigma.shape = new_shape
sigma /= norm
sigma **= 0.5
norm *= sigma**moment
values = np.trapz((x - about) ** moment * y, x, axis=axis_index)
values = np.array(values)
values.shape = new_shape
values /= norm
if moment == 0:
values *= multiplier
self.create_channel(
"{}_{}_{}_{}".format(channel.natural_name, axis_inp, "moment", moment),
values=values,
)
[docs]
def collapse(self, axis, method="sum"):
"""Collapse the dataset along one axis, adding lower rank channels.
New channels have names ``<channel name>_<axis name>_<method>``.
Parameters
----------
axis : int or str
The axis to collapse along.
If given as an integer, the axis in the underlying array is used.
If given as a string, the axis must exist, and be a 1D array-aligned axis.
(i.e. have a shape with a single value which is not ``1``)
The axis to collapse along is inferred from the shape of the axis.
method : {'average', 'sum', 'max', 'min'} (optional)
The method of collapsing the given axis. Method may also be list
of methods corresponding to the channels of the object. Default
is sum. NaNs are ignored.
Can also be a list, allowing for different treatment for varied channels.
In this case, None indicates that no change to that channel should occur.
See Also
--------
chop
Divide the dataset into its lower-dimensionality components.
split
Split the dataset while maintaining its dimensionality.
moment
Take the moment along a particular axis
"""
if method in ("int", "integrate"):
warnings.warn(
"integrate method of collapse is deprecated, use moment(moment=0) instead",
wt_exceptions.VisibleDeprecationWarning,
)
for channel in self.channel_names:
try:
self.moment(axis, channel, moment=0)
self.rename_channels(
**{self.channel_names[-1]: f"{channel}_{axis}_{method}"}, verbose=False
)
except wt_exceptions.ValueError:
pass # may have some channels which fail, do so silently
return
# get axis index --------------------------------------------------------------------------
if isinstance(axis, int):
axis_index = axis
elif isinstance(axis, str):
index = self.axis_names.index(axis)
axes = [i for i in range(self.ndim) if self.axes[index].shape[i] > 1]
if len(axes) > 1:
raise wt_exceptions.MultidimensionalAxisError(axis, "collapse")
elif len(axes) == 0:
raise wt_exceptions.ValueError(
"Axis {} is a single point, cannot collapse".format(axis)
)
axis_index = axes[0]
else:
raise wt_exceptions.TypeError("axis: expected {int, str}, got %s" % type(axis))
new_shape = list(self.shape)
new_shape[axis_index] = 1
func = {
"sum": np.nansum,
"max": np.nanmax,
"maximum": np.nanmax,
"min": np.nanmin,
"minimum": np.nanmin,
"ave": np.nanmean,
"average": np.nanmean,
"mean": np.nanmean,
}
# methods ---------------------------------------------------------------------------------
if isinstance(method, str):
methods = [method for _ in self.channels]
if isinstance(method, list):
if len(method) == len(self.channels):
methods = method
else:
raise wt_exceptions.ValueError(
"method argument must have same number of elements as there are channels"
)
for m in methods:
if m not in func.keys():
raise wt_exceptions.ValueError("method '{}' not recognized".format(m))
warnings.warn("collapse", category=wt_exceptions.EntireDatasetInMemoryWarning)
# collapse --------------------------------------------------------------------------------
for method, channel in zip(methods, self.channel_names):
if method is None:
continue
if self[channel].shape[axis_index] == 1:
continue # Cannot collapse any further, don't clutter data object
new_shape = list(self[channel].shape)
new_shape[axis_index] = 1
rtype = self[channel].dtype
if method in ["ave", "average", "mean"]:
rtype = np.result_type(self[channel].dtype, float)
new = self.create_channel(
"{}_{}_{}".format(channel, axis, method),
values=np.empty(new_shape, dtype=rtype),
units=self[channel].units,
)
new[:] = func[method](self[channel], axis=axis_index, keepdims=True)
[docs]
def convert(self, destination_units, *, convert_variables=False, verbose=True):
"""Convert all compatable axes and constants to given units.
Parameters
----------
destination_units : str
Destination units.
convert_variables : boolean (optional)
Toggle conversion of stored arrays. Default is False
verbose : bool (optional)
Toggle talkback. Default is True.
See Also
--------
Axis.convert
Convert a single axis object to compatable units. Call on an
axis object in data.axes.
"""
# apply to all compatible axes
for axis in self.axes:
if wt_units.is_valid_conversion(axis.units, destination_units):
orig = axis.units
axis.convert(destination_units, convert_variables=convert_variables)
if verbose:
print(
"axis {} converted from {} to {}".format(
axis.expression, orig, destination_units
)
)
# apply to all compatible constants
for constant in self.constants:
if wt_units.is_valid_conversion(constant.units, destination_units):
orig = constant.units
constant.convert(destination_units, convert_variables=convert_variables)
if verbose:
print(
"constant {} converted from {} to {}".format(
constant.expression, orig, destination_units
)
)
if convert_variables:
for var in self.variables:
if wt_units.is_valid_conversion(var.units, destination_units):
orig = var.units
var.convert(destination_units)
if verbose:
print(
"variable {} converted from {} to {}".format(
var.natural_name, orig, destination_units
)
)
self._on_axes_updated()
self._on_constants_updated()
[docs]
def create_channel(
self, name, values=None, *, shape=None, units=None, dtype=None, **kwargs
) -> Channel:
"""Append a new channel.
Parameters
----------
name : string
Unique name for this channel.
values : array (optional)
Array. If None, an empty array equaling the data shape is
created. Default is None.
shape : tuple of int
Shape to use. Must broadcast with the full shape.
Only used if `values` is None.
Default is the full shape of self.
units : string (optional)
Channel units. Default is None.
dtype : numpy.dtype (optional)
dtype to use for dataset, default is np.float64.
Only used if `values` is None.
kwargs : dict
Additional keyword arguments passed to Channel instantiation.
Returns
-------
Channel
Created channel.
"""
if name in self.channel_names:
warnings.warn(name, wt_exceptions.ObjectExistsWarning)
return self[name]
elif name in self.variable_names:
raise wt_exceptions.NameNotUniqueError(name)
require_kwargs = {"chunks": True}
if values is None:
if shape is None:
require_kwargs["shape"] = self.shape
else:
require_kwargs["shape"] = shape
if dtype is None:
require_kwargs["dtype"] = np.dtype(np.float64)
else:
require_kwargs["dtype"] = dtype
if require_kwargs["dtype"].kind == "f":
require_kwargs["fillvalue"] = np.nan
elif require_kwargs["dtype"].kind == "M":
require_kwargs["fillvalue"] = np.datetime64("NaT")
elif require_kwargs["dtype"].kind == "c":
require_kwargs["fillvalue"] = complex(np.nan, np.nan)
else:
require_kwargs["fillvalue"] = 0
else:
require_kwargs["data"] = values
require_kwargs["shape"] = values.shape
require_kwargs["dtype"] = values.dtype
if np.prod(require_kwargs["shape"]) == 1:
require_kwargs["chunks"] = None
if "compression" in kwargs:
require_kwargs["compression"] = kwargs["compression"]
if "compression_opts" in kwargs:
require_kwargs["compression_opts"] = kwargs["compression_opts"]
if "shuffle" in kwargs:
require_kwargs["shuffle"] = kwargs["shuffle"]
# create dataset
dataset_id = self.require_dataset(name=name, **require_kwargs).id
channel = Channel(self, dataset_id, units=units, **kwargs)
# finish
self.attrs["channel_names"] = np.append(self.attrs["channel_names"], name.encode())
return channel
[docs]
def create_variable(
self, name, values=None, *, shape=None, units=None, dtype=None, **kwargs
) -> Variable:
"""Add new child variable.
Parameters
----------
name : string
Unique identifier.
values : array-like (optional)
Array to populate variable with. If None, an variable will be filled with NaN.
Default is None.
shape : tuple of int
Shape to use. must broadcast with the full shape.
Only used if `values` is None.
Default is the full shape of self.
units : string (optional)
Variable units. Default is None.
dtype : numpy.dtype (optional)
dtype to use for dataset, default is np.float64.
Only used if `values` is None.
kwargs
Additional kwargs to variable instantiation.
Returns
-------
WrightTools Variable
New child variable.
"""
if name in self.variable_names:
warnings.warn(name, wt_exceptions.ObjectExistsWarning)
return self[name]
elif name in self.channel_names:
raise wt_exceptions.NameNotUniqueError(name)
if values is None:
if shape is None:
shape = self.shape
if dtype is None:
dtype = np.dtype(np.float64)
if dtype.kind in "f":
fillvalue = np.nan
elif dtype.kind in "M":
fillvalue = np.datetime64("NaT")
elif dtype.kind in "c":
fillvalue = complex(np.nan, np.nan)
else:
fillvalue = 0
else:
shape = values.shape
dtype = values.dtype
fillvalue = None
require_kwargs = {"chunks": True}
if "compression" in kwargs:
require_kwargs["compression"] = kwargs["compression"]
if "compression_opts" in kwargs:
require_kwargs["compression_opts"] = kwargs["compression_opts"]
if "shuffle" in kwargs:
require_kwargs["shuffle"] = kwargs["shuffle"]
if np.prod(shape) == 1:
require_kwargs["chunks"] = None
# create dataset
id = self.require_dataset(
name=name, data=values, shape=shape, dtype=dtype, fillvalue=fillvalue, **require_kwargs
).id
variable = Variable(self, id, units=units, **kwargs)
# finish
self._variables = None
self.attrs["variable_names"] = np.append(self.attrs["variable_names"], name.encode())
return variable
[docs]
def get_nadir(self, channel=0) -> tuple:
"""Get the coordinates, in units, of the minimum in a channel.
Parameters
----------
channel : int or str (optional)
Channel. Default is 0.
Returns
-------
generator of numbers
Coordinates in units for each axis.
"""
# get channel
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channel = self.channels[channel_index]
# get indicies
idx = channel.argmin()
# finish
return tuple(a[idx] for a in self._axes)
[docs]
def get_zenith(self, channel=0) -> tuple:
"""Get the coordinates, in units, of the maximum in a channel.
Parameters
----------
channel : int or str (optional)
Channel. Default is 0.
Returns
-------
generator of numbers
Coordinates in units for each axis.
"""
# get channel
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channel = self.channels[channel_index]
# get indicies
idx = channel.argmax()
# finish
return tuple(a[idx] for a in self._axes)
[docs]
def heal(self, channel=0, method="linear", fill_value=np.nan, verbose=True):
"""
Remove nans from channel using interpolation.
Parameters
----------
channel : int or str (optional)
Channel to heal. Default is 0.
method : {'linear', 'nearest', 'cubic'} (optional)
The interpolation method. Note that cubic interpolation is only
possible for 1D and 2D data. See `griddata`__ for more information.
Default is linear.
fill_value : number-like (optional)
The value written to pixels that cannot be filled by interpolation.
Default is nan.
verbose : bool (optional)
Toggle talkback. Default is True.
__ http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
.. note:: Healing may take several minutes for large datasets.
Interpolation time goes as nearest, linear, then cubic.
"""
warnings.warn("heal", category=wt_exceptions.EntireDatasetInMemoryWarning)
timer = wt_kit.Timer(verbose=False)
with timer:
# channel
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channel = self.channels[channel_index]
values = self.channels[channel_index][:]
points = [axis[:] for axis in self._axes]
xi = tuple(np.meshgrid(*points, indexing="ij"))
# 'undo' gridding
arr = np.zeros((len(self._axes) + 1, values.size))
for i in range(len(self._axes)):
arr[i] = xi[i].flatten()
arr[-1] = values.flatten()
# remove nans
arr = arr[:, ~np.isnan(arr).any(axis=0)]
# grid data wants tuples
tup = tuple([arr[i] for i in range(len(arr) - 1)])
# grid data
out = griddata(tup, arr[-1], xi, method=method, fill_value=fill_value)
self.channels[channel_index][:] = out
# print
if verbose:
print(
"channel {0} healed in {1} seconds".format(
channel.name, np.around(timer.interval, decimals=3)
)
)
[docs]
def level(self, channel, axis, npts, *, verbose=True):
"""Subtract the average value of npts at the edge of a given axis.
Parameters
----------
channel : int or str
Channel to level.
axis : int
Axis to level along.
npts : int
Number of points to average for each slice. Positive numbers
take points at leading indicies and negative numbers take points
at trailing indicies.
verbose : bool (optional)
Toggle talkback. Default is True.
"""
warnings.warn("level", category=wt_exceptions.EntireDatasetInMemoryWarning)
channel_index = wt_kit.get_index(self.channel_names, channel)
channel = self.channels[channel_index]
# verify npts not zero
npts = int(npts)
if npts == 0:
raise wt_exceptions.ValueError("npts must not be zero")
# get subtrahend
ss = [slice(None)] * self.ndim
if npts > 0:
ss[axis] = slice(0, npts, None)
else:
ss[axis] = slice(npts, None, None)
subtrahend = np.nanmean(channel[ss], axis=axis)
if self.ndim > 1:
subtrahend = np.expand_dims(subtrahend, axis=axis)
# level
channel -= subtrahend
# finish
channel._null = 0
if verbose:
print("channel {0} leveled along axis {1}".format(channel.natural_name, axis))
[docs]
def map_variable(
self, variable, points, input_units="same", *, name=None, parent=None, verbose=True
) -> "Data":
"""Map points of an axis to new points using linear interpolation.
Out-of-bounds points are written nan.
Parameters
----------
variable : string
The variable to map onto.
points : array-like or int
If array, the new points. If int, new points will have the same
limits, with int defining the number of evenly spaced points
between.
input_units : str (optional)
The units of the new points. Default is same, which assumes
the new points have the same units as the variable.
name : string (optional)
The name of the new data object. If None, generated from
natural_name. Default is None.
parent : WrightTools.Collection (optional)
Parent of new data object. If None, data is made at root of a
new temporary file.
verbose : bool (optional)
Toggle talkback. Default is True.
Returns
-------
WrightTools.Data
New data object.
"""
# get variable index
variable_index = wt_kit.get_index(self.variable_names, variable)
variable = self.variables[variable_index]
# get points
if isinstance(points, int):
points = np.linspace(variable.min(), variable.max(), points)
points = np.array(points)
# points dimensionality
if points.ndim < variable.ndim:
for i, d in enumerate(variable.shape):
if d == 1:
points = np.expand_dims(points, axis=i)
# convert points
if input_units == "same":
pass
else:
points = wt_units.converter(points, input_units, variable.units)
# construct new data object
special = [
"name",
"axes",
"constants",
"channel_names",
"variable_names",
"filepath",
"edit_local",
]
kwargs = {k: v for k, v in self.attrs.items() if k not in special}
if name is None:
name = "{0}_{1}_mapped".format(self.natural_name, variable.natural_name)
kwargs["name"] = name
kwargs["parent"] = parent
out = Data(**kwargs)
# mapped variable
values = points
out.create_variable(values=values, **variable.attrs)
# orthogonal variables
for v in self.variables:
if wt_kit.orthogonal(v.shape, variable.shape):
out.create_variable(values=v[:], **v.attrs)
out.transform(*self.axis_expressions)
# interpolate
if self.ndim == 1:
def interpolate(dataset, points):
function = scipy.interpolate.interp1d(variable[:], dataset[:], bounds_error=False)
return function(points)
else:
pts = np.array([a.full.flatten() for a in self.axes]).T
out_pts = np.array([a.full.flatten() for a in out.axes]).T
def interpolate(dataset, points):
values = dataset.full.flatten()
function = scipy.interpolate.LinearNDInterpolator(pts, values, rescale=True)
new = function(out_pts)
new.shape = out.shape
return new
for v in self.variables:
if v.natural_name not in out.variable_names:
out.create_variable(values=interpolate(v, points), **v.attrs)
out.variable_names = self.variable_names # enforce old order
out._variables = None # force regeneration of variables @property
for channel in self.channels:
out.create_channel(values=interpolate(channel, points), **channel.attrs)
# finish
if verbose:
print("data mapped from {0} to {1}".format(self.shape, out.shape))
return out
[docs]
def offset(
self,
points,
offsets,
along,
offset_axis,
units="same",
offset_units="same",
mode="valid",
method="linear",
verbose=True,
):
"""Offset one axis based on another axis' values.
Useful for correcting instrumental artifacts such as zerotune.
Parameters
----------
points : 1D array-like
Points.
offsets : 1D array-like
Offsets.
along : str or int
Axis that points array lies along.
offset_axis : str or int
Axis to offset using offsets.
units : str (optional)
Units of points array.
offset_units : str (optional)
Units of offsets aray.
mode : {'valid', 'full', 'old'} (optional)
Define how far the new axis will extend. Points outside of valid
interpolation range will be written nan.
method : {'linear', 'nearest', 'cubic'} (optional)
The interpolation method. Note that cubic interpolation is only
possible for 1D and 2D data. See `griddata`__ for more information.
Default is linear.
verbose : bool (optional)
Toggle talkback. Default is True.
__ http://docs.scipy.org/doc/scipy/reference/generated/scipy.interpolate.griddata.html
>>> points # an array of w1 points
>>> offsets # an array of d1 corrections
>>> data.offset(points, offsets, 'w1', 'd1')
"""
raise NotImplementedError
# axis ------------------------------------------------------------------------------------
if isinstance(along, int):
axis_index = along
elif isinstance(along, str):
axis_index = self.axis_names.index(along)
else:
raise TypeError("along: expected {int, str}, got %s" % type(along))
axis = self._axes[axis_index]
# values & points -------------------------------------------------------------------------
# get values, points, units
if units == "same":
input_units = axis.units
else:
input_units = units
# check offsets is 1D or 0D
if len(offsets.shape) == 1:
pass
else:
raise RuntimeError("values must be 1D or 0D in offset!")
# check if units is compatible, convert
dictionary = getattr(wt_units, axis.units_kind)
if input_units in dictionary.keys():
pass
else:
raise RuntimeError("units incompatible in offset!")
points = wt_units.converter(points, input_units, axis.units)
# create correction array
function = interp1d(points, offsets, bounds_error=False)
corrections = function(axis[:])
# remove nans
finite_indicies = np.where(np.isfinite(corrections))[0]
left_pad_width = finite_indicies[0]
right_pad_width = len(corrections) - finite_indicies[-1] - 1
corrections = np.pad(
corrections[np.isfinite(corrections)],
(int(left_pad_width), int(right_pad_width)),
mode="edge",
)
# do correction ---------------------------------------------------------------------------
# transpose so axis is last
transpose_order = np.arange(len(self._axes))
transpose_order[axis_index] = len(self._axes) - 1
transpose_order[-1] = axis_index
self.transpose(transpose_order, verbose=False)
# get offset axis index
if isinstance(offset_axis, int):
offset_axis_index = offset_axis
elif isinstance(offset_axis, str):
offset_axis_index = self.axis_names.index(offset_axis)
else:
raise TypeError("offset_axis: expected {int, str}, got %s" % type(offset_axis))
# new points
new_points = [a[:] for a in self._axes]
old_offset_axis_points = self._axes[offset_axis_index][:]
spacing = abs(
(old_offset_axis_points.max() - old_offset_axis_points.min())
/ float(len(old_offset_axis_points))
)
if mode == "old":
new_offset_axis_points = old_offset_axis_points
elif mode == "valid":
_max = old_offset_axis_points.max() + corrections.min()
_min = old_offset_axis_points.min() + corrections.max()
n = int(abs(np.ceil((_max - _min) / spacing)))
new_offset_axis_points = np.linspace(_min, _max, n)
elif mode == "full":
_max = old_offset_axis_points.max() + corrections.max()
_min = old_offset_axis_points.min() + corrections.min()
n = np.ceil((_max - _min) / spacing)
new_offset_axis_points = np.linspace(_min, _max, n)
new_points[offset_axis_index] = new_offset_axis_points
new_xi = tuple(np.meshgrid(*new_points, indexing="ij"))
xi = tuple(np.meshgrid(*[a[:] for a in self._axes], indexing="ij"))
for channel in self.channels:
# 'undo' gridding
arr = np.zeros((len(self._axes) + 1, channel[:].size))
for i in range(len(self._axes)):
arr[i] = xi[i].flatten()
arr[-1] = channel[:].flatten()
# do corrections
corrections = list(corrections)
corrections = corrections * int((len(arr[0]) / len(corrections)))
arr[offset_axis_index] += corrections
# grid data
tup = tuple([arr[i] for i in range(len(arr) - 1)])
# note that rescale is crucial in this operation
out = griddata(tup, arr[-1], new_xi, method=method, fill_value=np.nan, rescale=True)
channel[:] = out
self._axes[offset_axis_index][:] = new_offset_axis_points
# transpose out
self.transpose(transpose_order, verbose=False)
[docs]
def print_tree(self, *, verbose=True):
"""Print a ascii-formatted tree representation of the data contents."""
print("{0} ({1}) {2}".format(self.natural_name, self.filepath, self.shape))
self._print_branch("", depth=0, verbose=verbose)
[docs]
def prune(self, keep_channels=True, *, verbose=True):
"""Remove unused variables and (optionally) channels from the Data object.
Unused variables are those that are not included in either axes or constants.
Unused channels are those not specified in keep_channels, or the first channel.
Parameters
----------
keep_channels : boolean or int or str or tuple
If False, removes all but the first channel.
If int or str, removes all but that index/name channel.
If tuple, removes all channels except those in the tuple by index or name.
Default is True: do not delete channels
verbose : boolean
Toggle talkback. Default is True.
"""
for v in self.variables:
for var in wt_kit.flatten_list([ax.variables for ax in self._axes + self._constants]):
if v == var:
break
else:
self.remove_variable(v.natural_name, implied=False, verbose=verbose)
if keep_channels is not True:
try:
if isinstance(keep_channels, str):
raise TypeError
indexes = tuple(keep_channels)
except TypeError:
indexes = (keep_channels,)
for i, ch in enumerate(self.channels):
if i not in indexes and not ch.natural_name in indexes:
self.remove_channel(ch.natural_name, verbose=verbose)
[docs]
def remove_channel(self, channel, *, verbose=True):
"""Remove channel from data.
Parameters
----------
channel : int or str
Channel index or name to remove.
verbose : boolean (optional)
Toggle talkback. Default is True.
"""
channel_index = wt_kit.get_index(self.channel_names, channel)
new = list(self.channel_names)
name = new.pop(channel_index)
del self[name]
self.channel_names = new
if verbose:
print("channel {0} removed".format(name))
[docs]
def remove_variable(self, variable, *, implied=True, verbose=True):
"""Remove variable from data.
Parameters
----------
variable : int or str
Variable index or name to remove.
implied : boolean (optional)
Toggle deletion of other variables that start with the same
name. Default is True.
verbose : boolean (optional)
Toggle talkback. Default is True.
"""
if isinstance(variable, int):
variable = self.variable_names[variable]
# find all of the implied variables
removed = []
if implied:
for n in self.variable_names:
if n.startswith(variable):
removed.append(n)
else:
removed = [variable]
# check that axes will not be ruined
for n in removed:
for a in self._axes:
if n in [v.natural_name for v in a.variables]:
message = "{0} is contained in axis {1}".format(n, a.expression)
raise RuntimeError(message)
for c in self._constants:
if n in [v.natural_name for v in c.variables]:
warnings.warn(
"Variable being removed used in a constant",
wt_exceptions.WrightToolsWarning,
)
# do removal
for n in removed:
variable_index = wt_kit.get_index(self.variable_names, n)
new = list(self.variable_names)
name = new.pop(variable_index)
del self[name]
self.variable_names = new
self._variables = None
# finish
if verbose:
print("{0} variable(s) removed:".format(len(removed)))
for n in removed:
print(" {0}".format(n))
[docs]
def rename_channels(self, *, verbose=True, **kwargs):
"""Rename a set of channels.
Parameters
----------
kwargs
Keyword arguments of the form current:'new'.
verbose : boolean (optional)
Toggle talkback. Default is True
"""
# ensure that items will remain unique
changed = kwargs.keys()
for k, v in kwargs.items():
if v not in changed and v in self.keys():
raise wt_exceptions.NameNotUniqueError(v)
# compile references to items that are changing
new = {}
for k, v in kwargs.items():
obj = self[k]
index = self.channel_names.index(k)
# rename
new[v] = obj, index
Group._instances.pop(obj.fullpath, None)
obj.natural_name = str(v)
# remove old references
del self[k]
# apply new references
names = list(self.channel_names)
for v, value in new.items():
obj, index = value
self[v] = obj
names[index] = v
self.channel_names = names
# finish
if verbose:
print("{0} channel(s) renamed:".format(len(kwargs)))
for k, v in kwargs.items():
print(" {0} --> {1}".format(k, v))
[docs]
def rename_variables(self, *, implied=True, verbose=True, **kwargs):
"""Rename a set of variables.
Parameters
----------
kwargs
Keyword arguments of the form current:'new'.
implied : boolean (optional)
Toggle inclusion of other variables that start with the same
name. Default is True.
verbose : boolean (optional)
Toggle talkback. Default is True
"""
# find all of the implied variables
kwargs = collections.OrderedDict(kwargs)
if implied:
new = collections.OrderedDict()
for k, v in kwargs.items():
for n in self.variable_names:
if n.startswith(k):
new[n] = n.replace(k, v, 1)
kwargs = new
# ensure that items will remain unique
changed = kwargs.keys()
for k, v in kwargs.items():
if v not in changed and v in self.keys():
raise wt_exceptions.NameNotUniqueError(v)
# compile references to items that are changing
new = {}
for k, v in kwargs.items():
obj = self[k]
index = self.variable_names.index(k)
# rename
new[v] = obj, index
Group._instances.pop(obj.fullpath, None)
obj.natural_name = str(v)
# remove old references
del self[k]
# apply new references
names = list(self.variable_names)
for v, value in new.items():
obj, index = value
self[v] = obj
names[index] = v
self.variable_names = names
units = self.units
new = list(self.axis_expressions)
for i, v in enumerate(kwargs.keys()):
for j, n in enumerate(new):
new[j] = n.replace(v, "{%i}" % i)
for i, n in enumerate(new):
new[i] = n.format(*kwargs.values())
self.transform(*new)
for a, u in zip(self._axes, units):
a.convert(u)
units = self.constant_units
new = list(self.constant_expressions)
for i, v in enumerate(kwargs.keys()):
for j, n in enumerate(new):
new[j] = n.replace(v, "{%i}" % i)
for i, n in enumerate(new):
new[i] = n.format(*kwargs.values())
self.set_constants(*new)
for c, u in zip(self._constants, units):
c.convert(u)
# finish
if verbose:
print("{0} variable(s) renamed:".format(len(kwargs)))
for k, v in kwargs.items():
print(" {0} --> {1}".format(k, v))
[docs]
def share_nans(self):
"""Share not-a-numbers between all channels.
If any channel is nan at a given index, all channels will be nan
at that index after this operation.
Uses the share_nans method found in wt.kit.
"""
def f(_, s, channels):
outs = wt_kit.share_nans(*[c[s] for c in channels])
for c, o in zip(channels, outs):
c[s] = o
self.channels[0].chunkwise(f, self.channels)
[docs]
def smooth(self, factors, channel=None, verbose=True) -> "Data":
"""Smooth a channel using an n-dimenional kaiser window.
Note, all arrays are loaded into memory.
For more info see `Kaiser_window`__ wikipedia entry.
__ https://en.wikipedia.org/wiki/Kaiser_window
Parameters
----------
factors : int or list of int
The smoothing factor. You may provide a list of smoothing factors
for each axis.
channel : int or str or None (optional)
The channel to smooth. If None, all channels will be smoothed.
Default is None.
verbose : bool (optional)
Toggle talkback. Default is True.
"""
warnings.warn("smooth", category=wt_exceptions.EntireDatasetInMemoryWarning)
# get factors -----------------------------------------------------------------------------
if isinstance(factors, list):
pass
else:
dummy = np.zeros(len(self._axes))
dummy[::] = factors
factors = list(dummy)
# get channels ----------------------------------------------------------------------------
if channel is None:
channels = self.channels
else:
if isinstance(channel, int):
channel_index = channel
elif isinstance(channel, str):
channel_index = self.channel_names.index(channel)
else:
raise TypeError("channel: expected {int, str}, got %s" % type(channel))
channels = [self.channels[channel_index]]
# smooth ----------------------------------------------------------------------------------
for channel in channels:
values = channel[:]
for axis_index in range(len(factors)):
factor = factors[axis_index]
# transpose so the axis of interest is last
transpose_order = range(len(values.shape))
# replace axis_index with zero
transpose_order = [
len(values.shape) - 1 if i == axis_index else i for i in transpose_order
]
transpose_order[len(values.shape) - 1] = axis_index
values = values.transpose(transpose_order)
# get kaiser window
beta = 5.0
w = np.kaiser(2 * factor + 1, beta)
# for all slices...
for index in np.ndindex(values[..., 0].shape):
current_slice = values[index]
temp_slice = np.pad(current_slice, int(factor), mode=str("edge"))
values[index] = np.convolve(temp_slice, w / w.sum(), mode=str("valid"))
# transpose out
values = values.transpose(transpose_order)
# return array to channel object
channel[:] = values
if verbose:
print("smoothed data")
[docs]
def split(
self, expression, positions, *, units=None, parent=None, verbose=True
) -> wt_collection.Collection:
"""
Split the data object along a given expression, in units.
Parameters
----------
expression : int or str
The expression to split along. If given as an integer, the axis at that index
is used.
positions : number-type or 1D array-type
The position(s) to split at, in units.
units : str (optional)
The units of the given positions. Default is same, which assumes
input units are identical to first variable units.
parent : WrightTools.Collection (optional)
The parent collection in which to place the 'split' collection.
Default is a new Collection.
verbose : bool (optional)
Toggle talkback. Default is True.
Returns
-------
WrightTools.collection.Collection
A Collection of data objects.
The order of the objects is such that the axis points retain their original order.
See Also
--------
chop
Divide the dataset into its lower-dimensionality components.
collapse
Collapse the dataset along one axis.
"""
# axis ------------------------------------------------------------------------------------
old_expr = self.axis_expressions
old_units = self.units
out = wt_collection.Collection(name="split", parent=parent)
if isinstance(expression, int):
if units is None:
units = self._axes[expression].units
expression = self._axes[expression].expression
elif isinstance(expression, str):
pass
else:
raise TypeError("expression: expected {int, str}, got %s" % type(expression))
self.transform(expression)
if units:
self.convert(units, verbose=False)
try:
positions = [-np.inf] + sorted(list(positions)) + [np.inf]
except TypeError:
positions = [-np.inf, positions, np.inf]
values = self._axes[0].full
masks = [(values >= lo) & (values < hi) for lo, hi in wt_kit.pairwise(positions)]
omasks = []
cuts = []
for mask in masks:
try:
omasks.append(wt_kit.mask_reduce(mask))
cuts.append([i == 1 for i in omasks[-1].shape])
# Ensure at least one axis is kept
if np.all(cuts[-1]):
cuts[-1][0] = False
except ValueError:
omasks.append(None)
cuts.append(None)
for i in range(len(positions) - 1):
out.create_data("split%03i" % i)
for var in self.variables:
for i, (imask, omask, cut) in enumerate(zip(masks, omasks, cuts)):
if omask is None:
# Zero length split
continue
omask = wt_kit.enforce_mask_shape(omask, var.shape)
omask.shape = tuple([s for s, c in zip(omask.shape, cut) if not c])
out_arr = np.full(omask.shape, np.nan)
imask = wt_kit.enforce_mask_shape(imask, var.shape)
out_arr[omask] = var[:][imask]
out[i].create_variable(values=out_arr, **var.attrs, dtype=var.dtype)
for ch in self.channels:
for i, (imask, omask, cut) in enumerate(zip(masks, omasks, cuts)):
if omask is None:
# Zero length split
continue
omask = wt_kit.enforce_mask_shape(omask, ch.shape)
omask.shape = tuple([s for s, c in zip(omask.shape, cut) if not c])
out_arr = np.full(omask.shape, np.nan, dtype=ch.dtype)
imask = wt_kit.enforce_mask_shape(imask, ch.shape)
out_arr[omask] = ch[:][imask]
out[i].create_channel(values=out_arr, **ch.attrs, dtype=ch.dtype)
if verbose:
for d in out.values():
try:
d.transform(expression)
except IndexError:
continue
print("split data into {0} pieces along <{1}>:".format(len(positions) - 1, expression))
for i, (lo, hi) in enumerate(wt_kit.pairwise(positions)):
new_data = out[i]
if new_data.shape == ():
print(" {0} : None".format(i))
else:
new_axis = new_data.axes[0]
print(
" {0} : {1:0.2f} to {2:0.2f} {3} {4}".format(
i, lo, hi, self.axes[0].units, new_axis.shape
)
)
for d in out.values():
try:
d.transform(*old_expr)
keep = []
keep_units = []
for ax, u in zip(d.axes, old_units):
if ax.size > 1:
keep.append(ax.expression)
keep_units.append(u)
else:
d.create_constant(ax.expression, verbose=False)
d.transform(*keep)
for ax, u in zip(d.axes, keep_units):
ax.convert(u)
except IndexError:
continue
tempax = Axis(d, expression)
if all(
np.all(
np.sum(~np.isnan(tempax.masked), axis=tuple(set(range(tempax.ndim)) - {j}))
<= 1
)
for j in range(tempax.ndim)
):
d.create_constant(expression, verbose=False)
self.transform(*old_expr)
for ax, u in zip(self.axes, old_units):
ax.convert(u)
return out
def translate_to_txt(
self, filepath, delimiter="\t", channels=None, variables=None, fmt=".5g", verbose=True
):
"""
Write a serialized, readable list of the channels and variables to
file. Each line (row) denotes a seperate data point. Each column
represents a unique variable or channel. Axes are neglected.
Parameters
----------
filepath: path-like
path to output file.
delimiter: str
separation character. Defaults to "\t"
channels: list[str] (optional)
List of channel names to include. Default is all channels.
variables: list[str] (optional)
List of variable names to include. Default is all variables.
fmt: [python format spec](https://docs.python.org/3/library/string.html#formatspec)
format specifier for variables and channels
Returns
-------
None
Notes
-----
* This is a lossy write procedure; some properties, such as axes, are
not recorded.
* The shape structure of the data is recorded as a series of indices
(`{a_i}`) comprising the first few columns. A vertical line separates
these indexes from variables and channels
* wt5, the native file format for Data objects, is a specific variant
of the HDF5 file format. HDF5 has well-developed tools for working with
generic datasets. See
https://portal.hdfgroup.org/display/knowledge/How+to+convert+an+HDF4+or+HDF5+file+to+ASCII+%28text%29+or+Excel
for more information.
"""
# attrs
import tidy_headers
tidy_headers.write(filepath, {k: v for k, v in self.attrs.items()})
columns = [f"a_{i}" for i in range(self.ndim)]
columns.append("|")
is_broadcast = []
variables = list(self.variable_names) if variables is None else variables
for var in variables:
columns.append(f"{var} ({self[var].units})")
is_broadcast.append([i == 1 for i in self[var].shape])
channels = list(self.channel_names) if channels is None else channels
for ch in channels:
columns.append(f"{ch} ({self[ch].units})")
is_broadcast.append([i == 1 for i in self[ch].shape])
with open(filepath, "a") as f:
f.write(delimiter.join(columns) + "\n")
chunk = ""
for i, ndi in enumerate(np.ndindex(self.shape)):
line = [str(i) for i in ndi]
line += ["|"]
for j, name in enumerate(variables + channels):
arr = self[name]
# broadcast reduced dimensions arrays to full
idxs = tuple(xi * (not yi) for xi, yi in zip(ndi, is_broadcast[j]))
line.append(f"{arr[idxs]:{fmt}}")
if verbose and ((i == 0) or (not (i % 10))):
frac = round(i / self.size, 3)
sys.stdout.write(
f"[{'=' * int(frac * 60): <60}] {frac * 100:0.1f}% ...to_txt\r"
)
sys.stdout.flush()
chunk += delimiter.join(line) + "\n"
if (i % 100) == 99: # write to disk periodically
f.write(chunk)
chunk = ""
if chunk:
f.write(chunk)
if verbose:
sys.stdout.write(f"[{'=' * 60}] {100:0.1f}% ...done! \r")
sys.stdout.flush()
[docs]
def set_constants(self, *constants, verbose=True):
"""Set the constants associated with the data.
Parameters
----------
constants : str
Expressions for the new set of constants.
verbose : boolean (optional)
Toggle talkback. Default is True
See Also
--------
transform
Similar method except for axes.
create_constant
Add an individual constant.
remove_constant
Remove an individual constant.
"""
# create
new = []
current = {c.expression: c for c in self._constants}
for expression in constants:
constant = current.get(expression, Constant(self, expression))
new.append(constant)
self._constants = new
# units
for c in self._constants:
if c.units is None:
c.convert(c.variables[0].units)
# finish
self.flush()
self._on_constants_updated()
[docs]
def create_constant(self, expression, *, verbose=True):
"""Append a constant to the stored list.
Parameters
----------
expression : str
Expression for the new constant.
verbose : boolean (optional)
Toggle talkback. Default is True
See Also
--------
set_constants
Remove and replace all constants.
remove_constant
Remove an individual constant.
"""
if expression in self.constant_expressions:
wt_exceptions.ObjectExistsWarning.warn(expression)
return self.constants[self.constant_expressions.index(expression)]
constant = Constant(self, expression)
if constant.units is None:
constant.convert(constant.variables[0].units)
self._constants.append(constant)
self.flush()
self._on_constants_updated()
if verbose:
print("Constant '{}' added".format(constant.expression))
return constant
[docs]
def remove_constant(self, constant, *, verbose=True):
"""Remove a constant from the stored list.
Parameters
----------
constant : str or Constant or int
Expression for the new constant.
verbose : boolean (optional)
Toggle talkback. Default is True
See Also
--------
set_constants
Remove and replace all constants.
create_constant
Add an individual constant.
"""
if isinstance(constant, (str, int)):
constant_index = wt_kit.get_index(self.constant_expressions, constant)
elif isinstance(constant, Constant):
constant_index = wt_kit.get_index(self.constants, constant)
else:
raise TypeError(f"unsupported type for 'constant': {type(constant).__name__}")
constant = self._constants[constant_index]
self._constants.pop(constant_index)
self.flush()
self._on_constants_updated()
if verbose:
print("Constant '{}' removed".format(constant.expression))
[docs]
def zoom(self, factor, order=1, verbose=True):
"""Zoom the data array using spline interpolation of the requested order.
The number of points along each axis is increased by factor.
See `scipy ndimage`__ for more info.
__ http://docs.scipy.org/doc/scipy/reference/
generated/scipy.ndimage.zoom.html
Parameters
----------
factor : float
The number of points along each axis will increase by this factor.
order : int (optional)
The order of the spline used to interpolate onto new points.
verbose : bool (optional)
Toggle talkback. Default is True.
"""
raise NotImplementedError
import scipy.ndimage
# axes
for axis in self._axes:
axis[:] = scipy.ndimage.zoom(axis[:], factor, order=order)
# channels
for channel in self.channels:
channel[:] = scipy.ndimage.zoom(channel[:], factor, order=order)
# return
if verbose:
print("data zoomed to new shape:", self.shape)