# Source code for WrightTools.data._colors

"""COLORS."""

# --- import --------------------------------------------------------------------------------------


import os
import pathlib
import collections

import numpy as np

from scipy.interpolate import griddata

from ._data import Data
from .. import kit as wt_kit


# --- define --------------------------------------------------------------------------------------


# Public API of this module: the only name exported by a star-import.
__all__ = ["from_COLORS"]


# --- from function -------------------------------------------------------------------------------


def _colors_layout(cols):
    """Return the column layout of a COLORS dat file format.

    Parameters
    ----------
    cols : {'v0', 'v1', 'v2'}
        Format identifier.

    Returns
    -------
    tuple of collections.OrderedDict
        ``(axes, channels)``. Both map a column name to a dictionary holding
        the column index (``"idx"``) and a ``"label"``; axes additionally carry
        ``"units"`` and ``"tolerance"``. Fresh dictionaries are built on every
        call so callers may modify them freely.

    Raises
    ------
    ValueError
        If ``cols`` is not one of the known formats.
    """
    axes = collections.OrderedDict()
    channels = collections.OrderedDict()
    if cols == "v2":
        axes["w1"] = {"idx": 1, "units": "nm", "tolerance": 0.5, "label": "1"}
        axes["w2"] = {"idx": 3, "units": "nm", "tolerance": 0.5, "label": "2"}
        axes["w3"] = {"idx": 5, "units": "nm", "tolerance": 0.5, "label": "3"}
        axes["wm"] = {"idx": 7, "units": "nm", "tolerance": 0.5, "label": "m"}
        axes["wa"] = {"idx": 8, "units": "nm", "tolerance": 1.0, "label": "a"}
        axes["d0"] = {"idx": 10, "units": "fs", "tolerance": 25.0, "label": "0"}
        axes["d1"] = {"idx": 12, "units": "fs", "tolerance": 4.0, "label": "1"}
        axes["d2"] = {"idx": 14, "units": "fs", "tolerance": 4.0, "label": "2"}
        axes["m0"] = {"idx": 22, "units": None, "tolerance": 10.0, "label": 0}
        axes["m1"] = {"idx": 23, "units": None, "tolerance": 10.0, "label": 1}
        axes["m2"] = {"idx": 24, "units": None, "tolerance": 10.0, "label": 2}
        axes["m3"] = {"idx": 25, "units": None, "tolerance": 10.0, "label": 3}
        axes["m4"] = {"idx": 26, "units": None, "tolerance": 15.0, "label": 4}
        axes["m5"] = {"idx": 27, "units": None, "tolerance": 15.0, "label": 5}
        axes["m6"] = {"idx": 28, "units": None, "tolerance": 15.0, "label": 6}
        channels["ai0"] = {"idx": 16, "label": "0"}
        channels["ai1"] = {"idx": 17, "label": "1"}
        channels["ai2"] = {"idx": 18, "label": "2"}
        channels["ai3"] = {"idx": 19, "label": "3"}
        channels["ai4"] = {"idx": 20, "label": "4"}
        channels["mc"] = {"idx": 21, "label": "a"}
    elif cols == "v1":
        axes["w1"] = {"idx": 1, "units": "nm", "tolerance": 0.5, "label": "1"}
        axes["w2"] = {"idx": 3, "units": "nm", "tolerance": 0.5, "label": "2"}
        axes["wm"] = {"idx": 5, "units": "nm", "tolerance": 0.5, "label": "m"}
        axes["d1"] = {"idx": 6, "units": "fs", "tolerance": 3.0, "label": "1"}
        axes["d2"] = {"idx": 7, "units": "fs", "tolerance": 3.0, "label": "2"}
        channels["ai0"] = {"idx": 8, "label": "0"}
        channels["ai1"] = {"idx": 9, "label": "1"}
        channels["ai2"] = {"idx": 10, "label": "2"}
        channels["ai3"] = {"idx": 11, "label": "3"}
    elif cols == "v0":
        axes["w1"] = {"idx": 1, "units": "nm", "tolerance": 0.5, "label": "1"}
        axes["w2"] = {"idx": 3, "units": "nm", "tolerance": 0.5, "label": "2"}
        axes["wm"] = {"idx": 5, "units": "nm", "tolerance": 0.5, "label": "m"}
        axes["d1"] = {"idx": 6, "units": "fs", "tolerance": 3.0, "label": "1"}
        axes["d2"] = {"idx": 8, "units": "fs", "tolerance": 3.0, "label": "2"}
        channels["ai0"] = {"idx": 10, "label": "0"}
        channels["ai1"] = {"idx": 11, "label": "1"}
        channels["ai2"] = {"idx": 12, "label": "2"}
        channels["ai3"] = {"idx": 13, "label": "3"}
    else:
        raise ValueError(
            "cols must be one of 'v0', 'v1', 'v2' (got {0!r})".format(cols)
        )
    return axes, channels


# NOTE: the list default of ``ignore`` is only ever read, never mutated, so
# sharing it between calls is harmless; it is kept to leave the signature as is.
def from_COLORS(
    filepaths,
    name=None,
    cols=None,
    invert_d1=True,
    ignore=["w3", "wa", "dref", "m0", "m1", "m2", "m3", "m4", "m5", "m6"],
    parent=None,
    verbose=True,
):
    """Create data object from COLORS file(s).

    Parameters
    ----------
    filepaths : path-like or list of path-like
        Filepath(s).
        Can be either a local or remote file (http/ftp).
        Can be compressed with gz/bz2, decompression based on file name.
    name : string (optional)
        Unique dataset identifier. If None (default), autogenerated.
    cols : {'v0', 'v1', 'v2'} (optional)
        Format of COLORS dat file. If None, autorecognized. Default is None.
    invert_d1 : boolean (optional)
        Toggle inversion of D1 at import time. Default is True.
    ignore : list of strings (optional)
        Columns to ignore.
    parent : WrightTools.Collection (optional)
        Collection to place new data object within. Default is None.
    verbose : bool (optional)
        Toggle talkback. Default is True.

    Returns
    -------
    WrightTools.Data
        Data from COLORS.

    Raises
    ------
    ValueError
        If ``cols`` is not a known format, or if the format cannot be
        recognized from the number of columns in the first file.
    """
    # do we have a list of files or just one file? ------------------------------------------------
    if isinstance(filepaths, (list, tuple)):
        filestrs = [os.fspath(f) for f in filepaths]
        filepaths = [pathlib.Path(f) for f in filepaths]
    else:
        filestrs = [os.fspath(filepaths)]
        filepaths = [pathlib.Path(filepaths)]
    # np.DataSource left the main namespace in NumPy 2.0; this path exists in 1.x and 2.x
    ds = np.lib.npyio.DataSource(None)
    # define format of dat file -------------------------------------------------------------------
    if not cols:
        # the number of columns in the first file identifies the format
        with ds.open(filestrs[0], "rt") as f:
            num_cols = len(np.genfromtxt(f).T)
        if num_cols in (28, 35, 41):
            cols = "v2"
        elif num_cols in (20,):
            cols = "v1"
        elif num_cols in (15, 16, 19):
            cols = "v0"
        else:
            raise ValueError(
                "could not recognize COLORS format from {0} columns".format(num_cols)
            )
        if verbose:
            print("cols recognized as", cols, "(%d)" % num_cols)
    axes, channels = _colors_layout(cols)
    # import full array ---------------------------------------------------------------------------
    loaded = []
    for filestr in filestrs:
        # context manager guarantees the handle is released even if parsing fails
        with ds.open(filestr, "rt") as f:
            loaded.append(np.genfromtxt(f).T)
    arr = np.concatenate(loaded, axis=1)
    if invert_d1:
        idx = axes["d1"]["idx"]
        arr[idx] = -arr[idx]
    # recognize dimensionality of data ------------------------------------------------------------
    axes_discover = axes.copy()
    for key in ignore:
        axes_discover.pop(key, None)  # remove dimensions that mess up discovery
    scanned = wt_kit.discover_dimensions(arr, axes_discover)
    # create data object --------------------------------------------------------------------------
    if name is None:
        name = wt_kit.string2identifier(filepaths[0].name)
    kwargs = {"name": name, "kind": "COLORS", "source": filestrs}
    if parent is not None:
        data = parent.create_data(**kwargs)
    else:
        data = Data(**kwargs)
    # grid and fill data --------------------------------------------------------------------------
    # variables
    ndim = len(scanned)
    for i, key in enumerate(scanned.keys()):
        # a key such as "w1=w2" names several columns that were scanned together
        for axis_name in key.split("="):
            shape = [1] * ndim
            shape[i] = scanned[key].size
            # each scanned variable varies along its own dimension only
            values = scanned[key].reshape(shape)
            units = axes[axis_name]["units"]
            label = axes[axis_name]["label"]
            data.create_variable(name=axis_name, values=values, units=units, label=label)
    for key, dic in axes.items():
        if key in data.variable_names:
            continue
        if dic["idx"] >= len(arr):
            # the v2 layout addresses column 28, which a 28-column file (accepted as v2
            # during autorecognition) does not have; skip columns absent from the file
            continue
        # columns that were not scanned are stored as a single averaged value
        c = np.mean(arr[dic["idx"]])
        if not np.isnan(c):
            values = np.array([c]).reshape([1] * ndim)
            data.create_variable(
                name=key, values=values, units=dic["units"], label=dic["label"]
            )
    # channels
    points = tuple(arr[axes[key.split("=")[0]]["idx"]] for key in scanned.keys())
    if len(scanned) == 1:  # 1D data
        (xi,) = scanned.values()
        for key, channel in channels.items():
            zi = arr[channel["idx"]]
            grid_i = griddata(points, zi, xi, method="nearest")
            data.create_channel(name=key, values=grid_i)
    else:  # all other dimensionalities
        xi = tuple(np.meshgrid(*scanned.values(), indexing="ij"))
        for key, channel in channels.items():
            zi = arr[channel["idx"]]
            # builtin min kept on purpose: np.min would treat NaN entries differently
            fill_value = min(zi)
            grid_i = griddata(points, zi, xi, method="linear", fill_value=fill_value)
            data.create_channel(name=key, values=grid_i)
    # axes
    data.transform(*scanned.keys())
    # return --------------------------------------------------------------------------------------
    if verbose:
        print("data created at {0}".format(data.fullpath))
        print("  axes: {0}".format(data.axis_names))
        print("  shape: {0}".format(data.shape))
    return data