Source code for limix.qc._quant_gauss

from limix._bits import dask, numpy, pandas, xarray


def quantile_gaussianize(X, axis=1, inplace=False):
    r"""Normalize a sequence of values via rank and Normal c.d.f.

    It defaults to column-wise normalization.

    Tuples and lists are converted to a float array first; every other
    input is dispatched to a container-specific helper (NumPy array, pandas
    series / data frame, Dask array / data frame, xarray data array).

    Parameters
    ----------
    X : array_like
        Array of values.
    axis : int, optional
        Axis value. Defaults to `1`.
    inplace : bool, optional
        Defaults to `False`.

    Returns
    -------
    array_like
        Gaussian-normalized values.

    Raises
    ------
    ValueError
        If `X` exposes an integer ``dtype``, or if `inplace=True` is
        requested for a tuple or list.
    NotImplementedError
        If `X` is a Dask series or a type not handled by any helper.

    Examples
    --------
    .. doctest::

        >>> from limix.qc import quantile_gaussianize
        >>>
        >>> qg = quantile_gaussianize([-1, 0, 2])
        >>> print(qg) # doctest: +FLOAT_CMP
        [-0.67448975  0.          0.67448975]
    """
    from numpy import issubdtype, integer, asarray

    if hasattr(X, "dtype") and issubdtype(X.dtype, integer):
        raise ValueError("Integer type is not supported.")

    if isinstance(X, (tuple, list)):
        if inplace:
            # The converted array below is a new object, so the caller's
            # sequence could never be updated in place.
            raise ValueError("Can't use `inplace=True` for {}.".format(type(X)))
        X = asarray(X, float)

    # NOTE: `numpy`, `pandas`, `dask` and `xarray` are the type-checking
    # helper namespaces imported from `limix._bits` at the top of the file,
    # not the third-party packages of the same names.
    if numpy.is_array(X):
        X = _qg_numpy(X, axis, inplace)
    elif pandas.is_series(X):
        X = _qg_pandas_series(X, axis, inplace)
    elif pandas.is_dataframe(X):
        X = _qg_pandas_dataframe(X, axis, inplace)
    elif dask.is_array(X):
        X = _qg_dask_array(X, axis, inplace)
    elif dask.is_series(X):
        raise NotImplementedError("Dask series are not supported.")
    elif dask.is_dataframe(X):
        X = _qg_dask_dataframe(X, axis, inplace)
    elif xarray.is_dataarray(X):
        X = _qg_xarray_dataarray(X, axis, inplace)
    else:
        raise NotImplementedError("Unsupported type: {}.".format(type(X)))

    return X
def _qg_numpy(X, axis, inplace):
    """Quantile-Gaussianize a NumPy array.

    Parameters
    ----------
    X : numpy.ndarray
        Values to normalize. A 1-D array is handled as a single column.
    axis : int
        Axis swapped with axis ``1`` before the computation, which then runs
        along axis ``0``.
    inplace : bool
        When true the result is written into `X`; otherwise into a copy.

    Returns
    -------
    numpy.ndarray
        Array with the shape of the input holding the normalized values.
    """
    import warnings

    from numpy import apply_along_axis, isfinite
    from numpy.ma import masked_invalid
    from numpy_sugar import nanrankdata
    from scipy.stats import norm

    orig_shape = X.shape
    if X.ndim == 1:
        # Promote a vector to one column so the 2-D code path applies.
        X = X.reshape(orig_shape + (1,))

    if not inplace:
        X = X.copy()

    D = X.swapaxes(1, axis)
    D = masked_invalid(D)
    # Values are negated before ranking; the ranks are later fed to the
    # inverse survival function (`norm.isf`).
    D *= -1

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        D = nanrankdata(D)
        # Scale ranks by (number of finite entries + 1) along axis 0.
        D = D / (isfinite(D).sum(axis=0) + 1)
        D = apply_along_axis(norm.isf, 0, D)

    D = D.swapaxes(1, axis)
    X[:] = D

    return X.reshape(orig_shape)


def _qg_pandas_series(X, axis, inplace):
    """Quantile-Gaussianize a pandas series.

    Returns `X` itself when `inplace` is true, otherwise a normalized copy.
    """
    if not inplace:
        X = X.copy()

    # `to_numpy()` may hand back a read-only view of the series' buffer
    # (pandas Copy-on-Write), so let `_qg_numpy` work on its own copy and
    # assign the returned values instead of writing through that buffer.
    X[:] = _qg_numpy(X.to_numpy(), axis, False)
    return X


def _qg_pandas_dataframe(x, axis, inplace):
    """Quantile-Gaussianize a pandas data frame.

    Returns `x` itself when `inplace` is true, otherwise a normalized copy.
    """
    if not inplace:
        x = x.copy()

    # See `_qg_pandas_series`: never write through the `to_numpy()` buffer,
    # which can be read-only.
    x[:] = _qg_numpy(x.to_numpy(), axis, False)
    return x


def _qg_dask_array(x, axis, inplace):
    """Quantile-Gaussianize a Dask array.

    Raises
    ------
    NotImplementedError
        If `inplace` is true.
    """
    if inplace:
        raise NotImplementedError(
            "`inplace=True` is not supported for Dask arrays."
        )

    import dask.array as da
    from numpy_sugar import nanrankdata
    from scipy.stats import norm

    x = x.swapaxes(1, axis)
    # `dask` is the module-level helper namespace from `limix._bits`; the
    # import above only binds `da`.
    x = dask.array_shape_reveal(x)
    shape = da.compute(*x.shape)
    x = da.ma.masked_array(x)
    x *= -1
    # `shape[0]` is forwarded so `_dask_apply` resizes each 1-D result to
    # the full axis length.
    x = da.apply_along_axis(_dask_apply, 0, x, nanrankdata, shape[0])
    x = x / (da.isfinite(x).sum(axis=0) + 1)
    x = da.apply_along_axis(_dask_apply, 0, x, norm.isf, shape[0])
    return x.swapaxes(1, axis)


def _qg_dask_dataframe(x, axis, inplace):
    """Quantile-Gaussianize a Dask data frame.

    The frame is converted to a Dask array, normalized, rechunked to the
    array's original chunks and converted back with the original columns
    and index.

    Raises
    ------
    NotImplementedError
        If `inplace` is true.
    """
    if inplace:
        raise NotImplementedError(
            "`inplace=True` is not supported for Dask data frames."
        )

    d = x.to_dask_array(lengths=True)
    orig_chunks = d.chunks
    d = _qg_dask_array(d, axis, False).rechunk(orig_chunks)
    return d.to_dask_dataframe(columns=x.columns, index=x.index)


def _qg_xarray_dataarray(X, axis, inplace):
    """Quantile-Gaussianize an xarray data array.

    The underlying `X.data` is normalized by the Dask or NumPy helper,
    depending on its type, and assigned back to `X.data`.
    """
    if not inplace:
        X = X.copy(deep=True)

    data = X.data

    if dask.is_array(data):
        data = _qg_dask_array(data, axis, inplace)
    else:
        data = _qg_numpy(data, axis, inplace)

    X.data = data
    return X


def _dask_apply(x, func1d, length):
    """Apply `func1d` to `x` and resize the result to `length` elements.

    Runtime warnings raised while calling `func1d` are suppressed.
    """
    import warnings

    from numpy import resize

    with warnings.catch_warnings():
        warnings.filterwarnings("ignore", category=RuntimeWarning)
        x = func1d(x)

    return resize(x, length)