Source code for WrightTools.data._kent

"""Kent Meyer."""

# --- import --------------------------------------------------------------------------------------


import os
import pathlib

import collections

import numpy as np

from scipy.interpolate import griddata

from ._data import Data
from .. import kit as wt_kit


# --- define --------------------------------------------------------------------------------------


__all__ = ["from_KENT"]


# --- from function -------------------------------------------------------------------------------


[docs] def from_KENT( filepaths, name=None, ignore=["wm"], delay_tolerance=0.1, frequency_tolerance=0.5, parent=None, verbose=True, ) -> Data: """Create data object from KENT file(s). Parameters ---------- filepaths : path-like or list of path-like Filepath(s). Can be either a local or remote file (http/ftp). Can be compressed with gz/bz2, decompression based on file name. name : string (optional) Unique dataset identifier. If None (default), autogenerated. ignore : list of strings (optional) Columns to ignore. Default is ['wm']. delay_tolerance : float (optional) Tolerance below-which to ignore delay changes (in picoseconds). Default is 0.1. frequency_tolerance : float (optional) Tolerance below-which to ignore frequency changes (in wavenumbers). Default is 0.5. parent : WrightTools.Collection (optional) Collection to place new data object within. Default is None. verbose : bool (optional) Toggle talkback. Default is True. Returns ------- WrightTools.Data Data from KENT. """ # define columns ------------------------------------------------------------------------------ # axes axes = collections.OrderedDict() axes["w1"] = {"units": "wn", "idx": 0, "label": "1"} axes["w2"] = {"units": "wn", "idx": 1, "label": "2"} axes["wm"] = {"units": "wn", "idx": 2, "label": "m"} axes["d1"] = {"units": "ps", "idx": 3, "label": "1"} axes["d2"] = {"units": "ps", "idx": 4, "label": "2"} for key in axes.keys(): if "w" in key: axes[key]["tolerance"] = frequency_tolerance elif "d" in key: axes[key]["tolerance"] = delay_tolerance # channels channels = collections.OrderedDict() channels["signal"] = {"idx": 5} channels["OPA1"] = {"idx": 6} channels["OPA2"] = {"idx": 7} # do we have a list of files or just one file? ------------------------------------------------ if not isinstance(filepaths, list): filepaths = [filepaths] filestrs = [os.fspath(f) for f in filepaths] filepaths = [pathlib.Path(f) for f in filepaths] # import full array --------------------------------------------------------------------------- ds = np.DataSource(None) arr = [] for f in filestrs: ff = ds.open(f, "rt") arr.append(np.genfromtxt(ff).T) ff.close() arr = np.concatenate(arr, axis=1) # recognize dimensionality of data ------------------------------------------------------------ axes_discover = axes.copy() for key in ignore: if key in axes_discover: axes_discover.pop(key) # remove dimensions that mess up discovery scanned = wt_kit.discover_dimensions(arr, axes_discover) # create data object -------------------------------------------------------------------------- if name is None: name = wt_kit.string2identifier(filepaths[0].name) kwargs = {"name": name, "kind": "KENT", "source": filestrs} if parent is not None: data = parent.create_data(**kwargs) else: data = Data(**kwargs) # grid and fill data -------------------------------------------------------------------------- # variables ndim = len(scanned) for i, key in enumerate(scanned.keys()): for name in key.split("="): shape = [1] * ndim a = scanned[key] shape[i] = a.size a.shape = tuple(shape) units = axes[name]["units"] label = axes[name]["label"] data.create_variable(name=name, values=a, units=units, label=label) for key, dic in axes.items(): if key not in data.variable_names: c = np.mean(arr[dic["idx"]]) if not np.isnan(c): shape = [1] * ndim a = np.array([c]) a.shape = tuple(shape) units = dic["units"] label = dic["label"] data.create_variable(name=key, values=a, units=units, label=label) # channels if len(scanned) == 1: # 1D data for key in channels.keys(): channel = channels[key] zi = arr[channel["idx"]] data.create_channel(name=key, values=zi) else: # all other dimensionalities # channels points = tuple(arr[axes[key.split("=")[0]]["idx"]] for key in scanned.keys()) xi = tuple(np.meshgrid(*scanned.values(), indexing="ij")) for key in channels.keys(): channel = channels[key] zi = arr[channel["idx"]] fill_value = min(zi) grid_i = griddata(points, zi, xi, method="linear", fill_value=fill_value) data.create_channel(name=key, values=grid_i) # axes data.transform(*scanned.keys()) # return -------------------------------------------------------------------------------------- if verbose: print("data created at {0}".format(data.fullpath)) print(" axes: {0}".format(data.axis_names)) print(" shape: {0}".format(data.shape)) return data