# Source code for WrightTools.data._pycmds

"""PyCMDS."""

# --- import --------------------------------------------------------------------------------------


import itertools
import os
import pathlib

import h5py
import numpy as np

import tidy_headers

from ._data import Data
from .. import kit as wt_kit
from .. import units as wt_units


# --- define --------------------------------------------------------------------------------------


# Public API of this module: the single PyCMDS loader entry point.
__all__ = ["from_PyCMDS"]


# --- from function -------------------------------------------------------------------------------


def from_PyCMDS(filepath, name=None, parent=None, verbose=True, *, collapse=True) -> Data:
    """Create a data object from a single PyCMDS output file.

    Parameters
    ----------
    filepath : path-like
        Path to the .data file
        Can be either a local or remote file (http/ftp).
        Can be compressed with gz/bz2, decompression based on file name.
    name : str or None (optional)
        The name to be applied to the new data object. If None, name is read
        from file.
    parent : WrightTools.Collection (optional)
        Collection to place new data object within. Default is None.
    verbose : bool (optional)
        Toggle talkback. Default is True.
    collapse : bool (optional, keyword-only)
        If True (default), read the whole array into memory and collapse
        dimensions whose values are constant within tolerance.
        If False, create variables/channels at full shape and fill them
        frame-by-frame (lower memory usage for large files).

    Returns
    -------
    data
        A Data instance.
    """
    filestr = os.fspath(filepath)

    # header
    # np.DataSource(None) transparently handles remote (http/ftp) and
    # gz/bz2-compressed files, using a temporary directory for downloads
    ds = np.DataSource(None)
    file_ = ds.open(filestr, "rt")
    headers = tidy_headers.read(file_)
    file_.seek(0)

    # name
    if name is None:  # name not given in method arguments
        data_name = headers["data name"]
    else:
        data_name = name
    if data_name == "":  # name not given in PyCMDS
        data_name = headers["data origin"]

    # create data object
    kwargs = {
        "name": data_name,
        "kind": "PyCMDS",
        "source": filestr,
        "created": headers["file created"],
    }
    if parent is not None:
        data = parent.create_data(**kwargs)
    else:
        data = Data(**kwargs)

    if collapse:
        # array: read everything at once; transpose so arr[i] is column i
        arr = np.genfromtxt(file_).T

    # get axes and scanned variables
    # NOTE: loop variable renamed from `name` to avoid shadowing the
    # method argument (data_name has already been resolved above)
    axes = []
    for axis_name, identity, units in zip(
        headers["axis names"], headers["axis identities"], headers["axis units"]
    ):
        # points and centers
        points = np.array(headers[axis_name + " points"])
        if axis_name + " centers" in headers.keys():
            centers = headers[axis_name + " centers"]
        else:
            centers = None
        # create
        axis = {
            "points": points,
            "units": units,
            "name": axis_name,
            "identity": identity,
            "centers": centers,
        }
        axes.append(axis)

    shape = tuple([a["points"].size for a in axes])

    for i, ax in enumerate(axes):
        # points variable: broadcastable shape, full only along its own axis
        sh = [1] * len(shape)
        sh[i] = len(ax["points"])
        data.create_variable(
            name=ax["name"] + "_points", values=np.array(ax["points"]).reshape(sh)
        )
        if ax["centers"] is not None:
            # centers variable: full shape except along its own axis;
            # collapse any dimension the recorded centers do not span
            centers = np.array(ax["centers"])
            sh = list(shape)
            sh[i] = 1
            for j, s in enumerate(sh):
                if centers.size % s:
                    sh[j] = 1
            data.create_variable(
                name=ax["name"] + "_centers", values=np.array(centers.reshape(sh))
            )

    # get assorted remaining things
    # variables and channels
    try:
        signed = iter(headers["channel signed"])
    except KeyError:
        # older files lack the "channel signed" header: treat all as unsigned
        signed = itertools.repeat(False)
    for index, (kind, column_name) in enumerate(zip(headers["kind"], headers["name"])):
        if collapse:
            _collapse_read_in(
                data, headers, axes, arr, signed, index, kind, column_name, shape
            )
        else:
            _no_collapse_create(data, headers, signed, index, kind, column_name, shape)
    if not collapse:
        _no_collapse_fill(data, headers, file_, shape, verbose)
    file_.close()

    # axes
    for a in axes:
        expression = a["identity"]
        if expression.startswith("D"):
            expression = expression[1:]
        # BUGFIX: str.replace returns a new string; the original code
        # discarded the result, so "=D" was never actually stripped
        expression = expression.replace("=D", "=")
        a["expression"] = expression
    data.transform(*[a["expression"] for a in axes])
    for a, u in zip(data.axes, headers["axis units"]):
        if u is not None:
            a.convert(u)

    if (
        headers["system name"] == "fs"
        and int(headers["PyCMDS version"].split(".")[0]) == 0
        and int(headers["PyCMDS version"].split(".")[1]) < 10
    ):
        # in versions of PyCMDS up to (and including) 0.9.0
        # there was an incorrect hard-coded conversion factor between mm and fs
        # this ONLY applied to Newport MFA stages
        # we apply this correction knowing that Newport MFAs were only used on the "fs" system
        # and knowing that the Newport MFAs were always assigned as "d1", "d2" and "d3"
        # ---Blaise 2019-04-09
        for delay in ("d1", "d2", "d3", "d1_points", "d2_points", "d3_points"):
            if delay not in data.variable_names:
                continue
            data[delay][:] *= 6000.671281903963041 / 6671.281903963041
            if verbose:
                print(f"Correction factor applied to {delay}")

    # return
    if verbose:
        print("data created at {0}".format(data.fullpath))
        print("  axes: {0}".format(data.axis_names))
        print("  shape: {0}".format(data.shape))
    return data
def _collapse_read_in(data, headers, axes, arr, signed, index, kind, name, shape):
    """Ingest one recorded column (collapse mode).

    Reshapes column ``index`` of ``arr`` to ``shape``, collapses any
    dimension along which the values are constant (within a per-kind
    tolerance), then stores the result as a variable or channel on ``data``.
    """
    # pad with NaN in case the acquisition was interrupted mid-scan
    values = np.full(np.prod(shape), np.nan)
    values[: len(arr[index])] = arr[index]
    values.shape = shape
    if name == "time":
        for dim in range(len(shape)):
            tolerance = 1e-6
            mean = np.expand_dims(np.nanmean(values, axis=dim), dim)
            values, meanexp = wt_kit.share_nans(values, mean)
            if np.allclose(meanexp, values, atol=tolerance, rtol=0, equal_nan=True):
                values = mean
        data.create_variable(name="labtime", values=values)
    if kind == "hardware":
        # sadly, recorded tolerances are not reliable
        # so a bit of hard-coded hacking is needed
        # if this ends up being too fragile, we might have to use the points arrays
        # ---Blaise 2018-01-09
        units = headers["units"][index]
        label = headers["label"][index]
        if (
            "w" in name
            and name.startswith(tuple(data.variable_names))
            and name not in headers["axis names"]
        ):
            # derived wavelength variable: inherit the collapsed shape of its parent
            inherited_shape = data[name.split("_")[0]].shape
            for dim, extent in enumerate(inherited_shape):
                if extent == 1:
                    values = np.expand_dims(np.mean(values, axis=dim), dim)
        else:
            tolerance = headers["tolerance"][index]
            units = headers["units"][index]
            for i in range(len(shape)):
                if tolerance is None:
                    break
                if "d" in name:
                    # This is a hack because delay is particularly
                    # unreliable in tolerance. And 3 fs vs 3 ps is a huge
                    # difference... KFS 2019-2-27
                    tolerance = 3.0 if units == "fs" else 0.1
                if "zero" in name:
                    tolerance = 1e-10
                if name in headers["axis names"]:
                    # scanned axes must keep their own dimension exactly
                    if (
                        i == headers["axis names"].index(name)
                        or f"{name}_centers" in data.variable_names
                    ):
                        tolerance = 1e-10
                    else:
                        tolerance = np.inf
                mean = np.expand_dims(np.nanmean(values, axis=i), i)
                values, meanexp = wt_kit.share_nans(values, mean)
                if np.allclose(meanexp, values, atol=tolerance, rtol=0, equal_nan=True):
                    values = mean
        if name in headers["axis names"]:
            # fill interrupted-scan NaNs from the setpoint (points) array
            points = np.array(headers[name + " points"])
            pointsshape = [1] * values.ndim
            for i, ax in enumerate(axes):
                if ax["name"] == name:
                    pointsshape[i] = len(points)
                    break
            points.shape = pointsshape
            # NOTE: `i` is the matched axis index from the loop above
            points = wt_units.converter(points, headers["axis units"][i], units)
            for dim in range(points.ndim):
                if points.shape[dim] == 1:
                    points = np.repeat(points, values.shape[dim], axis=dim)
            if points.size <= values.size:
                values[np.isnan(values)] = points[np.isnan(values)]
        data.create_variable(name, values=values, units=units, label=label)
    if kind == "channel":
        data.create_channel(name=name, values=values, shape=values.shape, signed=next(signed))


def _no_collapse_create(data, headers, signed, index, kind, name, shape):
    """Create one empty variable/channel at full shape (no-collapse mode)."""
    sh = shape
    # array-detector files ("wa" present): only the array columns span the
    # last dimension; every other column is constant along it
    if "wa" in headers["name"] and name not in ("wa", "array", "array_signal"):
        sh = list(sh)
        sh[-1] = 1
        sh = tuple(sh)
    if name == "time":
        data.create_variable(name="labtime", dtype=np.dtype(np.float64), shape=sh)
    if kind == "hardware":
        units = headers["units"][index]
        label = headers["label"][index]
        data.create_variable(name, shape=sh, dtype=np.dtype(np.float64), units=units, label=label)
    if kind == "channel":
        data.create_channel(name=name, shape=sh, dtype=np.dtype(np.float64), signed=next(signed))


def _no_collapse_fill(data, headers, file_, shape, verbose):
    """Fill pre-created datasets frame-by-frame from the open file (no-collapse mode)."""
    frame_size = shape[-1]
    file_.seek(0)
    arr = np.genfromtxt(file_, max_rows=frame_size)
    while arr.size > 0:
        # leading columns of each frame hold the integer scan indices
        index = tuple(arr[0, 0 : len(shape) - 1].astype(np.int64))
        if verbose:
            print(index)
        for col, (kind, name) in enumerate(zip(headers["kind"], headers["name"])):
            if kind is None and name != "time":
                continue
            if name == "time":
                name = "labtime"
            # write through h5py.Group.__getitem__ to reach the raw dataset,
            # bypassing any WrightTools item lookup on `data`
            if "wa" not in headers["name"] or name in ("wa", "array", "array_signal"):
                h5py.Group.__getitem__(data, name)[index + (...,)] = arr[:, col]
            else:
                # non-array column: constant within the frame, store scalar
                h5py.Group.__getitem__(data, name)[index + (...,)] = arr[0, col]
        arr = np.genfromtxt(file_, max_rows=frame_size)